# Source code for cdm_reader_mapper.core._utilities

"""Common Data Model (CDM) DataBundle class."""

from __future__ import annotations
from collections.abc import Iterable, Iterator
from copy import deepcopy
from typing import Any

import numpy as np
import pandas as pd

from cdm_reader_mapper.common.iterators import (
    ParquetStreamReader,
    is_valid_iterator,
    parquet_stream_from_iterable,
    process_disk_backed,
)


# Names of the attributes gathered in a single set (sorted here for easy
# scanning; set order is irrelevant). Presumably these are the public
# DataBundle properties -- TODO confirm against where `properties` is used.
properties = {
    "columns",
    "data",
    "dtypes",
    "encoding",
    "imodel",
    "mask",
    "mode",
    "parse_dates",
}


def _copy(value: Any) -> Any:
    """
    Make copy of value.

    Parameters
    ----------
    value : Any
        Value to make a copy of.

    Returns
    -------
    Any
        Copy of `value`.
    """
    if isinstance(value, dict):
        return deepcopy(value)
    elif isinstance(value, (pd.DataFrame, pd.Series)):
        return value.copy()
    elif isinstance(value, ParquetStreamReader):
        return value.copy()
    elif hasattr(value, "copy"):
        return value.copy()
    return value


def method(attr_func: Any, *args: Any, **kwargs: Any) -> Any:
    r"""
    Handle both method calls and subscriptable attributes.

    Parameters
    ----------
    attr_func : Any
        A callable object (e.g., function or method) or a subscriptable
        object (e.g., list, tuple, dict, array-like, or a pandas indexer
        such as ``DataFrame.loc``).
    \*args : Any
        Positional arguments passed to `attr_func`, or used as the
        index/key when `attr_func` is subscriptable. A single positional
        argument is used as the key itself (``attr_func[a]``); several are
        combined into a tuple key (``attr_func[a, b]``).
    \**kwargs : Any
        Keyword arguments passed to `attr_func`. Ignored if `attr_func` is
        not callable.

    Returns
    -------
    Any
        The result of calling ``attr_func(*args, **kwargs)`` if it is
        callable, or the result of subscripting `attr_func` with the key
        built from `args` otherwise.

    Raises
    ------
    ValueError
        If `attr_func` is neither callable nor subscriptable, or if
        indexing fails due to an invalid key or index.
    """
    if callable(attr_func):
        return attr_func(*args, **kwargs)

    # Mirror Python's own subscript syntax: ``obj[a]`` receives ``a`` while
    # ``obj[a, b]`` receives ``(a, b)``. Wrapping a single key in a 1-tuple
    # breaks plain sequences and mappings (``[1, 2][(0,)]`` -> TypeError,
    # ``{"k": 1}[("k",)]`` -> KeyError).
    key = args[0] if len(args) == 1 else args
    try:
        return attr_func[key]
    except (KeyError, IndexError) as err:
        raise ValueError("Invalid key or index for subscriptable attribute.") from err
    except (ValueError, TypeError, AttributeError) as err:
        raise ValueError("Attribute is neither callable nor subscriptable.") from err
def reader_method(
    data: pd.DataFrame | ParquetStreamReader,
    attr: str,
    *args: Any,
    process_kwargs: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ParquetStreamReader | None:
    r"""
    Apply a DataFrame attribute or method to every chunk of `data`.

    The per-chunk work is delegated to ``process_disk_backed`` so chunked
    data is handled as a stream instead of being loaded into memory at once.

    Parameters
    ----------
    data : pd.DataFrame or ParquetStreamReader
        Input data to operate on.
    attr : str
        Name of the attribute or method to apply to each chunk.
    \*args : Any
        Positional arguments forwarded to the attribute or method (or used
        as the subscript key when the attribute is not callable).
    process_kwargs : dict, optional
        Extra keyword arguments forwarded to ``process_disk_backed``.
    \**kwargs : Any
        Keyword arguments forwarded verbatim to the attribute or method.

    Returns
    -------
    ParquetStreamReader or None
        The first element of the ``process_disk_backed`` result (a new stream
        with the transformed chunks), or None when ``process_disk_backed``
        returns None.
    """
    streaming_options = {} if process_kwargs is None else process_kwargs

    def _per_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
        """Look up `attr` on `chunk`, then call or subscript it with the captured arguments."""
        return method(getattr(chunk, attr), *args, **kwargs)

    outcome = process_disk_backed(
        data,
        _per_chunk,
        makecopy=False,
        **streaming_options,
    )
    # ``process_disk_backed`` yields (reader, extra_outputs); only the reader is returned.
    return None if outcome is None else outcome[0]
def combine_attribute_values(
    first_value: Any,
    iterator: Iterator[Any] | ParquetStreamReader,
    attr: str,
) -> Any:
    """
    Collect values of an attribute across all chunks and combine them.

    The remaining chunks of `iterator` are consumed. The merge strategy
    depends on `first_value` (and, for non-additive attributes, on `attr`):

    * ``pd.Index`` -- successive ``union`` of every chunk's index.
    * ``bool`` -- ``all`` when ``attr == "empty"`` (the stream is empty only
      if every chunk is), ``any`` for any other flag.
    * ``attr == "ndim"`` -- the first value (dimensionality does not grow
      with the number of chunks).
    * int / float, including NumPy scalars -- sum over all chunks.
    * 2-tuple such as ``shape`` -- first elements summed, second taken from
      the first chunk.
    * list / ``np.ndarray`` -- ``np.concatenate``.
    * ``pd.DataFrame`` / ``pd.Series`` -- ``pd.concat``.
    * anything else -- list of the per-chunk values.

    Parameters
    ----------
    first_value : Any
        The value from the first chunk (already read).
    iterator : Iterator/ParquetStreamReader
        The stream positioned at the second chunk.
    attr : str
        The attribute name to fetch from remaining chunks.

    Returns
    -------
    Any
        Combined attribute values of `iterator`.
    """
    combined_values = [first_value]
    # Consume the rest of the stream, fetching the same attribute per chunk.
    combined_values.extend(getattr(chunk, attr) for chunk in iterator)

    if isinstance(first_value, pd.Index):
        combined_index = first_value
        for idx in combined_values[1:]:
            combined_index = combined_index.union(idx)
        return combined_index
    if isinstance(first_value, (bool, np.bool_)):
        # bool subclasses int, so it must be handled before the numeric
        # branch: summing flags would e.g. report the whole stream as
        # "empty" (truthy) as soon as a single chunk is empty.
        if attr == "empty":
            return all(combined_values)
        return any(combined_values)
    if attr == "ndim":
        # Every chunk has the same dimensionality; it is not additive.
        return first_value
    if isinstance(first_value, (int, float, np.integer, np.floating)):
        return sum(combined_values)
    if isinstance(first_value, tuple) and len(first_value) == 2:
        # A 2-tuple is treated as a shape (rows, cols): rows add up across
        # chunks while the column count is taken from the first chunk.
        n_rows = sum(value[0] for value in combined_values)
        return (n_rows, first_value[1])
    if isinstance(first_value, (list, np.ndarray)):
        return np.concatenate(combined_values)
    if isinstance(first_value, (pd.DataFrame, pd.Series)):
        return pd.concat(combined_values)
    return combined_values
class SubscriptableMethod:
    """
    Wrapper forwarding both calls and subscript access to `func`.

    Parameters
    ----------
    func : Any
        Underlying callable or subscriptable object.
    """

    def __init__(self, func: Any) -> None:
        """
        Store the wrapped object.

        Parameters
        ----------
        func : Any
            Underlying callable or subscriptable object.
        """
        self.func = func

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        r"""
        Forward a call to the wrapped object.

        Parameters
        ----------
        \*args : Any
            Positional arguments passed to the callable.
        \**kwargs : Any
            Keyword arguments passed to the callable.

        Returns
        -------
        Any
            Whatever ``self.func(*args, **kwargs)`` returns.
        """
        target = self.func
        return target(*args, **kwargs)

    def __getitem__(self, item: Any) -> Any:
        """
        Forward subscript access to the wrapped object.

        Parameters
        ----------
        item : Any
            Index or key used for subscripting.

        Returns
        -------
        Any
            Whatever ``self.func[item]`` returns.

        Raises
        ------
        NotImplementedError
            If the wrapped object does not support subscripting (i.e.
            subscripting it raises TypeError).
        """
        target = self.func
        try:
            found = target[item]
        except TypeError as exc:
            raise NotImplementedError(
                "Calling subscriptable methods have not been implemented for chunked data yet."
            ) from exc
        return found
def _validate_mode(mode: str) -> None: """ Validate the data bundle mode. Parameters ---------- mode : str Mode string to validate. Raises ------ ValueError If the mode is not one of the supported values. """ if mode not in {"data", "tables"}: raise ValueError(f"'mode' {mode} is not valid, use one of ['data', 'tables'].") def _normalize_data_input( data: pd.DataFrame | Iterable[pd.DataFrame] | None, columns: pd.Index | pd.MultiIndex | list[Any] | None, dtypes: pd.Series | dict[str | tuple[str, str], Any] | None, ) -> pd.DataFrame | ParquetStreamReader: """ Normalize and validate the input data. Parameters ---------- data : pd.DataFrame, Iterable of pd.DataFrame, or None Input data. columns : pd.Index, pd.MultiIndex, or list, optional Column labels used when initializing empty data. dtypes : pd.Series or dict, optional Data types for columns. Returns ------- pd.DataFrame or ParquetStreamReader Normalized data representation. Raises ------ TypeError If the data type is unsupported. """ if data is None: data = pd.DataFrame(columns=columns, dtype=dtypes) if isinstance(data, (list, tuple)): data = iter(data) if is_valid_iterator(data) and not isinstance(data, ParquetStreamReader): data = parquet_stream_from_iterable(data) if not isinstance(data, (pd.DataFrame, ParquetStreamReader)): raise TypeError(f"'data' has unsupported type {type(data)}.") return data def _normalize_mask_input( mask: pd.DataFrame | Iterable[pd.DataFrame] | None, data: pd.DataFrame | ParquetStreamReader, ) -> pd.DataFrame | ParquetStreamReader: """ Normalize and validate the mask aligned with the input data. Parameters ---------- mask : pd.DataFrame, Iterable of pd.DataFrame, or None Input mask. data : pd.DataFrame or ParquetStreamReader Normalized data used to infer mask structure when mask is None. Returns ------- pd.DataFrame or ParquetStreamReader Normalized mask. Raises ------ TypeError If the mask type is unsupported. 
""" if mask is None: if isinstance(data, pd.DataFrame): mask = pd.DataFrame(columns=data.columns, index=data.index, dtype=bool) else: data_cp = data.copy() mask = [pd.DataFrame(columns=df.columns, index=df.index, dtype=bool) for df in data_cp] if isinstance(mask, (list, tuple)): mask = iter(mask) if is_valid_iterator(mask) and not isinstance(mask, ParquetStreamReader): mask = parquet_stream_from_iterable(mask) if not isinstance(mask, (pd.DataFrame, ParquetStreamReader)): raise TypeError(f"mask has unsupported type {type(mask)}") return mask