Source code for cdm_reader_mapper.mdf_reader.utils.convert_and_decode

"""Internal pandas converting operators."""

from __future__ import annotations
from collections.abc import Callable
from decimal import Decimal, InvalidOperation
from typing import Any, get_args

import pandas as pd

from .. import properties
from .utilities import convert_str_boolean


numeric_types = get_args(properties.NumericTypes)


[docs] def max_decimal_places(*decimals: Decimal) -> int: r""" Return the maximum number of decimal places among Decimal values. Parameters ---------- \*decimals : Decimal One or more Decimal values. Returns ------- int Maximum number of decimal places. """ return max(-int(d.as_tuple().exponent) if int(d.as_tuple().exponent) < 0 else 0 for d in decimals)
[docs] def to_numeric(x: Any, scale: Decimal, offset: Decimal) -> Decimal | bool: """ Convert a value to a scaled Decimal with offset applied. Parameters ---------- x : Any Input value to convert. scale : Decimal Scale factor. offset : Decimal Offset value. Returns ------- Decimal | bool Converted Decimal value, boolean, or False if invalid. Notes ----- - Boolean values are returned unchanged - Empty or invalid values return False - Strings are stripped and spaces replaced with zeros - Result is quantized to the maximum decimal precision of input, scale, or offset """ x = convert_str_boolean(x) if isinstance(x, bool): return x if isinstance(x, str): x = x.strip() x = x.replace(" ", "0") try: x_dec = Decimal(str(x)) decimal_places = max_decimal_places(offset, scale, x_dec) result = offset + x_dec * scale if decimal_places == 0: return result return result.quantize(Decimal("1." + "0" * decimal_places)) except (InvalidOperation, TypeError, ValueError): return False
[docs] class Decoders: """ Registry-based decoder dispatcher for column-wise decoding. Currently supports Base36 decoding for numeric-like fields. Parameters ---------- dtype : str Target data type name (e.g. numeric field type). encoding : str, default "base36" Encoding scheme to use. """ def __init__(self, dtype: str, encoding: str = "base36") -> None: """ Initialize a Decoders instance. Parameters ---------- dtype : str Target data type name (e.g. numeric field type). encoding : str, default "base36" Encoding scheme to use. """ self.dtype = dtype self.encoding = encoding self._registry = {"key": self.base36} for numeric_type in numeric_types: self._registry[numeric_type] = self.base36
[docs] def decoder(self) -> Callable[[pd.Series], pd.Series] | None: """ Return the decoder function for the configured dtype and encoding. Returns ------- Callable or None Decoder function accepting a pandas Series, or None if encoding is unsupported. Raises ------ KeyError If no decoder is registered for the given dtype. """ if self.encoding != "base36": return None try: return self._registry[self.dtype] except KeyError as exc: raise KeyError(f"No converter registered for '{self.dtype}'") from exc
[docs] def base36(self, data: pd.Series) -> pd.Series: """ Decode a pandas Series from Base36 to stringified base-10 integers. Boolean values are preserved. Invalid values raise ValueError via `int(..., 36)`. Parameters ---------- data : pd.Series Input Series containing base36-encoded values. Returns ------- pd.Series Decoded Series with stringified integers or booleans. """ def _base36(x: Any) -> Any: """ Decode a value from Base36 to stringified base-10 integer. Parameters ---------- x : Any Value to be decoded. Returns ------- Any Converted value. """ x = convert_str_boolean(x) if isinstance(x, bool): return x return str(int(str(x), 36)) return data.apply(_base36)
[docs] class Converters: """ Registry-based converter for pandas Series. Converts object-typed Series into numeric, datetime, or cleaned object representations based on the configured dtype. Parameters ---------- dtype : str Target output dtype identifier. """ def __init__(self, dtype: str) -> None: """ Initialize a Converters instance. Parameters ---------- dtype : str Target output dtype identifier. """ self.dtype = dtype self.numeric_scale = 1.0 if self.dtype == "float" else 1 self.numeric_offset = 0.0 if self.dtype == "float" else 0 self.preprocessing_functions: dict[str, Callable[[Any], Any]] = { "PPPP": lambda x: 10_000 + int(x) if isinstance(x, str) and x.startswith("0") else x } self._registry: dict[str, Callable[..., Any]] = { "datetime": self.object_to_datetime, "str": self.object_to_object, "object": self.object_to_object, "key": self.object_to_object, } for numeric_type in numeric_types: self._registry[numeric_type] = self.object_to_numeric
[docs] def converter(self) -> Callable[..., pd.Series]: """ Return the converter function registered for the configured dtype. Returns ------- Callable Converter function. Raises ------ KeyError If no converter is registered for the dtype. """ if self.dtype in self._registry: return self._registry[self.dtype] raise KeyError(f"No converter registered for '{self.dtype}'")
[docs] def object_to_numeric( self, data: pd.Series, scale: float | int | None = None, offset: float | int | None = None, ) -> pd.Series: """ Convert object Series to numeric using Decimal arithmetic. - Right spaces are treated as zeros - Optional scale and offset may be applied - Boolean values are preserved - Invalid conversions return False Parameters ---------- data : pd.Series Object-typed Series. scale : numeric, optional Scale factor. offset : numeric, optional Offset value. Returns ------- pd.Series Converted Series. """ if data.dtype != "object": return data scale_val = scale if scale else self.numeric_scale offset_val = offset if offset else self.numeric_offset scale_dec = Decimal(str(scale_val)) offset_dec = Decimal(str(offset_val)) column_name = data.name if column_name in self.preprocessing_functions: data = data.apply(self.preprocessing_functions[column_name]) return data.apply(lambda x: to_numeric(x, scale_dec, offset_dec))
[docs] def object_to_object( self, data: pd.Series, disable_white_strip: bool | str = False, ) -> pd.Series: """ Clean object Series by stripping whitespace and nullifying empty strings. Parameters ---------- data : pd.Series Object-typed Series. disable_white_strip : bool or {"l", "r"}, default False Control whitespace stripping behavior. Returns ------- pd.Series Cleaned Series. """ if data.dtype != "object": return data if not disable_white_strip: data = data.str.strip() elif disable_white_strip == "l": data = data.str.rstrip() elif disable_white_strip == "r": data = data.str.lstrip() return data.apply(lambda x: None if isinstance(x, str) and (x.isspace() or not x) else x)
[docs] def object_to_datetime( self, data: pd.Series, datetime_format: str = "%Y%m%d", ) -> pd.Series: """ Convert object Series to pandas datetime. Invalid values are coerced to NaT. Parameters ---------- data : pd.Series Object-typed Series. datetime_format : str, default "%Y%m%d" Datetime parsing format. Returns ------- pd.Series Datetime Series. """ if data.dtype != "object": return data return pd.to_datetime(data, format=datetime_format, errors="coerce")
[docs] def convert_and_decode( data: pd.DataFrame, convert_flag: bool = True, decode_flag: bool = True, converter_dict: dict[str, Callable[[pd.Series], pd.Series]] | None = None, converter_kwargs: dict[str, dict[str, Any]] | None = None, decoder_dict: dict[str, Callable[[pd.Series], pd.Series]] | None = None, ) -> pd.DataFrame: """ Convert and decode data entries by using a pre-defined data model. Overwrite attribute `data` with converted and/or decoded data. Parameters ---------- data : pd.DataFrame Data to convert and decode. convert_flag : bool, default True If True, apply converters to the columns defined in `converter_dict`. decode_flag : bool, default True If True, apply decoders to the columns defined in `decoder_dict`. converter_dict : dict[str, callable], optional Column-specific converter functions. If None, defaults to empty dict. converter_kwargs : dict[str, dict], optional Keyword arguments for each converter function. decoder_dict : dict[str, callable], optional Column-specific decoder functions. If None, defaults to empty dict. Returns ------- pd.DataFrame DataFrame with converted and decoded columns. """ converter_dict = converter_dict or {} converter_kwargs = converter_kwargs or {} decoder_dict = decoder_dict or {} if decode_flag: for column, dec_func in decoder_dict.items(): if column in data.columns: decoded = dec_func(data[column]) decoded.index = data[column].index data[column] = decoded if convert_flag: for column, conv_func in converter_dict.items(): if column in data.columns: kwargs = converter_kwargs.get(column, {}) converted = conv_func(data[column], **kwargs) converted.index = data[column].index data[column] = converted return data