Source code for cdm_reader_mapper.common.inspect

"""
Common Data Model (CDM) pandas inspection operators.

Created on Wed Jul  3 09:48:18 2019

@author: iregon
"""

from __future__ import annotations
from collections.abc import Iterable, Mapping
from typing import Any

import pandas as pd

from .iterators import ParquetStreamReader, ProcessFunction, process_function


def merge_sum_dicts(dicts: list[Mapping[str, Any]]) -> dict[str, Any]:
    """
    Recursively merge dictionaries, summing the values at the leaves.

    Parameters
    ----------
    dicts : list of Mapping
        Dictionaries to merge. They are only read; none of them is modified.

    Returns
    -------
    dict
        Merged dictionary. A key that occurs in a single input keeps that
        input's value object unchanged. For a key that occurs more than once,
        values that are both mappings are merged recursively and any other
        pair of values is combined with ``+``.

    Raises
    ------
    TypeError
        If two values sharing a key cannot be added with ``+`` (for example a
        mapping and a number).
    """
    merged: dict[str, Any] = {}
    for mapping in dicts:
        for key, value in mapping.items():
            if key not in merged:
                merged[key] = value
                continue

            current = merged[key]
            if isinstance(current, Mapping) and isinstance(value, Mapping):
                merged[key] = merge_sum_dicts([current, value])
            else:
                # Use ``+`` rather than ``+=``: ``current`` may still be the
                # very object stored in one of the inputs, and in-place
                # addition would modify that input for mutable values such
                # as lists.
                merged[key] = current + value
    return merged
def _count_by_cat(df: pd.DataFrame, columns: list[Any]) -> dict[Any, int]: """ Count unique values in a pandas DataFrame, including NaNs. Parameters ---------- df : pd.DataFrame DataFrame to count unique values. columns : list of Any Column names for counting unique values. Returns ------- dict Dictionary containing name and amount of unique values. """ count_dict: dict[Any, int] = {} for column in columns: counts = df[column].value_counts(dropna=False) counts.index = counts.index.where(~counts.index.isna(), "nan") count_dict[column] = counts.to_dict() return count_dict
@process_function()
def count_by_cat(
    data: pd.DataFrame | Iterable[pd.DataFrame],
    columns: str | tuple[str, str] | list[str | tuple[str, str]] | None = None,
) -> dict[str | tuple[str, str], dict[Any, int]]:
    """
    Count unique values per column in a DataFrame or an iterable of DataFrames.

    Parameters
    ----------
    data : pandas.DataFrame or Iterable[pd.DataFrame]
        Input dataset.
    columns : str, tuple, list or None, optional
        Column(s) to count values in. A string or a tuple is treated as ONE
        column label (a tuple being a MultiIndex column label); use a list to
        select several columns. If None, all columns of `data` are used, which
        requires `data` to be a single DataFrame.

    Returns
    -------
    dict[str | tuple[str, str], dict[Any, int]]
        Dictionary where each key is a column label and each value is a
        dictionary mapping the values found in that column (missing values
        are reported as 'nan') to their counts.

    Raises
    ------
    TypeError
        If `columns` is None and `data` is not a pandas DataFrame.

    Notes
    -----
    - Counting is done by `_count_by_cat`; `merge_sum_dicts` is handed to
      `ProcessFunction` as `non_data_proc`, presumably to combine the counts
      obtained from successive DataFrames (confirm against `ProcessFunction`).
    """
    if columns is None:
        if not isinstance(data, pd.DataFrame):
            raise TypeError(f"data must be a pandas DataFrame, not {type(data)}.")
        columns = list(data.columns)
    elif isinstance(columns, (str, tuple)):
        # A tuple is a single (MultiIndex) column label according to the
        # signature; ``list(columns)`` would split it into its levels and
        # select the wrong columns.
        columns = [columns]
    else:
        columns = list(columns)

    result = ProcessFunction(
        data=data,
        func=_count_by_cat,
        func_kwargs={"columns": columns},
        non_data_output="acc",
        makecopy=False,
        non_data_proc=merge_sum_dicts,
    )
    # The decorator converts the ProcessFunction to the requested return data type.
    return result  # type: ignore[return-value]
def _get_length(data: pd.DataFrame) -> int: """ Get length pd.DataFrame. Parameters ---------- data : pd.DataFrame DataFrame to get length. Returns ------- int Length of `data`. """ return len(data)
@process_function()
def get_length(data: pd.DataFrame | Iterable[pd.DataFrame] | ParquetStreamReader) -> int:
    """
    Get the total number of rows in a pandas object.

    Parameters
    ----------
    data : pandas.DataFrame, Iterable[pd.DataFrame] or ParquetStreamReader
        Input dataset.

    Returns
    -------
    int
        Total number of rows.

    Notes
    -----
    - If `data` exposes a ``_row_count`` attribute, that value is returned
      (converted to ``int``) and `data` is not processed any further.
    - Otherwise `_get_length` is applied through `ProcessFunction`, with
      ``sum`` passed as `non_data_proc`.
    """
    # A private sentinel distinguishes "attribute absent" from any real value.
    missing = object()
    known_rows = getattr(data, "_row_count", missing)
    if known_rows is not missing:
        return int(known_rows)

    # The decorator converts the ProcessFunction to the requested return data type.
    return ProcessFunction(  # type: ignore[return-value]
        data=data,
        func=_get_length,
        non_data_output="acc",
        makecopy=True,
        non_data_proc=sum,
    )