# Source code for cdm_reader_mapper.common.select

# noqa: D100
"""
Common Data Model (CDM) pandas selection operators.

Created on Wed Jul  3 09:48:18 2019

@author: iregon
"""
from __future__ import annotations

from typing import Iterable

import pandas as pd

from .iterators import ParquetStreamReader, ProcessFunction, process_function


def _concat_indexes(idx_dict):
    """
    Merge index pieces into de-duplicated selected and rejected indexes.

    Parameters
    ----------
    idx_dict : mapping or sequence
        Container whose entry ``0`` holds the selected index piece(s) and
        whose entry ``1`` holds the rejected index piece(s). Each entry is
        passed as-is to ``pandas.Index.append``.

    Returns
    -------
    tuple of pandas.Index
        ``(selected_idx, rejected_idx)``, each with duplicates dropped.
    """
    # Start from an empty Index so a single Index and a list of Index
    # objects are both handled by ``append``.
    selected_idx, rejected_idx = (
        pd.Index([]).append(idx_dict[position]).drop_duplicates()
        for position in (0, 1)
    )
    return selected_idx, rejected_idx


def _reset_index(data, reset_index=False):
    """
    Optionally replace the index of ``data`` with a fresh default index.

    Parameters
    ----------
    data : object exposing ``reset_index``
        Object to post-process (e.g. a pandas DataFrame).
    reset_index : bool, optional
        When this is exactly ``False`` the input is returned untouched; any
        other value triggers ``data.reset_index(drop=True)``.

    Returns
    -------
    Same type as ``data``
        The original object, or a copy with the old index discarded.
    """
    # Identity check on purpose: only the literal ``False`` skips the reset.
    return data if reset_index is False else data.reset_index(drop=True)


def _split_df(
    df: pd.DataFrame,
    mask: pd.Series,
    inverse: bool = False,
    return_rejected: bool = False,
):
    """
    Split ``df`` into selected and rejected rows using a boolean row mask.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame to be split.
    mask : pandas.Series
        Boolean row mask; ``True`` marks rows to select.
    inverse : bool, optional
        If ``True``, the mask is negated before it is applied, so rows
        flagged ``False`` are selected instead.
    return_rejected : bool, optional
        If ``True``, the rejected rows are returned as the second output.
        Otherwise the second output is an empty slice of ``df`` that keeps
        its columns and dtypes.

    Returns
    -------
    tuple
        ``(selected, rejected, selected_idx, rejected_idx)`` where the two
        index objects hold the original labels of the selected and the
        rejected rows respectively.
    """
    # Resolve ``inverse`` once so the frames and the returned indexes are
    # always derived from the same effective mask. Previously the indexes
    # ignored ``inverse`` and described the opposite frames.
    keep = ~mask if inverse else mask
    drop = ~keep

    selected = df[keep]
    rejected = df[drop] if return_rejected else df.iloc[0:0]

    selected_idx = mask.index[keep]
    rejected_idx = mask.index[drop]
    return selected, rejected, selected_idx, rejected_idx


def _split_by_boolean_df(df: pd.DataFrame, mask: pd.DataFrame, boolean: bool, **kwargs):
    """
    Reduce a boolean mask frame to one row mask and split ``df`` with it.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame to be split.
    mask : pandas.DataFrame
        Boolean mask frame. If it is empty, every row of ``df`` is flagged
        with ``boolean``.
    boolean : bool
        If truthy, a row is flagged when all of its mask columns are True.
        Otherwise a row is flagged when none of its mask columns is True.
    **kwargs
        Forwarded to ``_split_df`` (``inverse``, ``return_rejected``).

    Returns
    -------
    tuple
        Whatever ``_split_df`` returns for the derived row mask.
    """
    if not mask.empty:
        if boolean:
            row_mask = mask.all(axis=1)
        else:
            row_mask = ~mask.any(axis=1)
        # Any missing entries in the reduced mask fall back to ``boolean``.
        row_mask = row_mask.fillna(boolean)
    else:
        row_mask = pd.Series(boolean, index=df.index)
    return _split_df(df=df, mask=row_mask, **kwargs)


def _split_by_column_df(
    df: pd.DataFrame,
    col: str,
    values: Iterable,
    **kwargs,
):
    """
    Split ``df`` by whether the entries of one column are in ``values``.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame to be split.
    col : str
        Column whose entries are tested.
    values : Iterable
        Values that flag a row for selection.
    **kwargs
        Forwarded to ``_split_df`` (``inverse``, ``return_rejected``).

    Returns
    -------
    tuple
        Whatever ``_split_df`` returns for the membership mask.
    """
    membership = df[col].isin(values)
    membership.name = col

    return _split_df(df=df, mask=membership, **kwargs)


def _split_by_index_df(
    df: pd.DataFrame,
    index,
    **kwargs,
):
    """
    Split ``df`` by whether each row label is contained in ``index``.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame to be split.
    index : label or iterable of labels
        Index label(s) that flag a row for selection. A single ``str`` or
        ``bytes`` value is treated as one label, not as an iterable of
        characters.
    **kwargs
        Forwarded to ``_split_df`` (``inverse``, ``return_rejected``).

    Returns
    -------
    tuple
        Whatever ``_split_df`` returns for the label-membership mask.
    """
    # Strings are iterable but represent a single label; wrapping them keeps
    # ``pd.Index`` from being handed a bare scalar, which it rejects.
    is_collection = isinstance(index, Iterable) and not isinstance(
        index, (str, bytes)
    )
    labels = pd.Index(index if is_collection else [index])
    row_mask = pd.Series(df.index.isin(labels), index=df.index)
    return _split_df(df=df, mask=row_mask, **kwargs)


# Keyword arguments shared by every ``ProcessFunction`` built in this module.
# ``non_data_proc`` post-processes the two index outputs with
# ``_concat_indexes``.
# NOTE(review): the meaning of ``makecopy`` and ``"acc"`` is defined in
# ``.iterators`` and is not visible here - confirm there.
PSR_KWARGS = dict(
    makecopy=False,
    non_data_output="acc",
    non_data_proc=_concat_indexes,
)


def split_by_boolean(
    data: pd.DataFrame | Iterable[pd.DataFrame],
    mask: pd.DataFrame | Iterable[pd.DataFrame],
    boolean: bool,
    reset_index: bool = False,
    inverse: bool = False,
    return_rejected: bool = False,
) -> tuple[
    pd.DataFrame | ParquetStreamReader,
    pd.DataFrame | ParquetStreamReader,
    pd.Index | pd.MultiIndex,
    pd.Index | pd.MultiIndex,
]:
    """
    Split a DataFrame using a boolean mask.

    Parameters
    ----------
    data : pandas.DataFrame or Iterable[pd.DataFrame]
        DataFrame to be split.
    mask : pandas.DataFrame or Iterable[pd.DataFrame]
        Boolean mask with the same length as ``data``.
    boolean : bool
        Determines mask interpretation:

        - ``True``: select rows where **all** mask columns are True.
        - ``False``: select rows where **no** mask column is True.
    reset_index : bool, optional
        If ``True``, reset the index of returned DataFrames.
    inverse : bool, optional
        If ``True``, invert the selection.
    return_rejected : bool, optional
        If ``True``, return rejected rows as the second output.
        If ``False``, the rejected output is empty but dtype-preserving.

    Returns
    -------
    (pandas.DataFrame or ParquetStreamReader,
     pandas.DataFrame or ParquetStreamReader,
     pd.Index or pd.MultiIndex,
     pd.Index or pd.MultiIndex)
        Selected rows, rejected rows, original indexes of selection and
        original indexes of rejection.
    """

    # ``_reset_index`` is registered as post-processing and is driven by the
    # ``reset_index`` keyword of the wrapped function.
    # NOTE(review): exact dispatch lives in ``process_function`` - confirm there.
    @process_function(postprocessing={"func": _reset_index, "kwargs": "reset_index"})
    def _split_by_boolean(reset_index=reset_index):
        return ProcessFunction(
            data=data,
            func=_split_by_boolean_df,
            func_args=(mask, boolean),
            func_kwargs={"inverse": inverse, "return_rejected": return_rejected},
            **PSR_KWARGS,
        )

    result = _split_by_boolean()
    return tuple(result)


def split_by_boolean_true(
    data: pd.DataFrame,
    mask: pd.DataFrame,
    reset_index: bool = False,
    inverse: bool = False,
    return_rejected: bool = False,
) -> tuple[
    pd.DataFrame | ParquetStreamReader,
    pd.DataFrame | ParquetStreamReader,
    pd.Index | pd.MultiIndex,
    pd.Index | pd.MultiIndex,
]:
    """
    Split rows where all mask columns are ``True``.

    Thin wrapper around ``split_by_boolean`` with ``boolean=True``.

    Parameters
    ----------
    data : pandas.DataFrame
        DataFrame to be split.
    mask : pandas.DataFrame
        Boolean mask with the same length as ``data``.
    reset_index : bool, optional
        If ``True``, reset indices in returned DataFrames.
    inverse : bool, optional
        If ``True``, invert the selection.
    return_rejected : bool, optional
        If ``True``, return rejected rows as the second output.
        If ``False``, the rejected output is empty but dtype-preserving.

    Returns
    -------
    (pandas.DataFrame or ParquetStreamReader,
     pandas.DataFrame or ParquetStreamReader,
     pd.Index or pd.MultiIndex,
     pd.Index or pd.MultiIndex)
        Selected rows, rejected rows, original indexes of selection and
        original indexes of rejection.
    """
    return split_by_boolean(
        data,
        mask,
        True,
        reset_index=reset_index,
        inverse=inverse,
        return_rejected=return_rejected,
    )


def split_by_boolean_false(
    data: pd.DataFrame,
    mask: pd.DataFrame,
    reset_index: bool = False,
    inverse: bool = False,
    return_rejected: bool = False,
) -> tuple[
    pd.DataFrame | ParquetStreamReader,
    pd.DataFrame | ParquetStreamReader,
    pd.Index | pd.MultiIndex,
    pd.Index | pd.MultiIndex,
]:
    """
    Split rows where no mask column is ``True``.

    Thin wrapper around ``split_by_boolean`` with ``boolean=False``.

    Parameters
    ----------
    data : pandas.DataFrame
        DataFrame to be split.
    mask : pandas.DataFrame
        Boolean mask with the same length as ``data``.
    reset_index : bool, optional
        If ``True``, reset indices in returned DataFrames.
    inverse : bool, optional
        If ``True``, invert the selection.
    return_rejected : bool, optional
        If ``True``, return rejected rows as the second output.
        If ``False``, the rejected output is empty but dtype-preserving.

    Returns
    -------
    (pandas.DataFrame or ParquetStreamReader,
     pandas.DataFrame or ParquetStreamReader,
     pd.Index or pd.MultiIndex,
     pd.Index or pd.MultiIndex)
        Selected rows, rejected rows, original indexes of selection and
        original indexes of rejection.
    """
    return split_by_boolean(
        data,
        mask,
        False,
        reset_index=reset_index,
        inverse=inverse,
        return_rejected=return_rejected,
    )


def split_by_column_entries(
    data: pd.DataFrame,
    selection: dict[str, Iterable],
    reset_index: bool = False,
    inverse: bool = False,
    return_rejected: bool = False,
) -> tuple[
    pd.DataFrame | ParquetStreamReader,
    pd.DataFrame | ParquetStreamReader,
    pd.Index | pd.MultiIndex,
    pd.Index | pd.MultiIndex,
]:
    """
    Split a DataFrame based on matching values in a given column.

    Parameters
    ----------
    data : pandas.DataFrame
        DataFrame to be split.
    selection : dict
        Mapping of a column name to an iterable of allowed values.
        Example: ``{"city": ["London", "Berlin"]}``.
        Only the first entry of the mapping is used.
    reset_index : bool, optional
        Whether to reset index in returned DataFrames.
    inverse : bool, optional
        If ``True``, invert the selection.
    return_rejected : bool, optional
        If ``True``, return rejected rows as the second output.
        If ``False``, the rejected output is empty but dtype-preserving.

    Returns
    -------
    (pandas.DataFrame or ParquetStreamReader,
     pandas.DataFrame or ParquetStreamReader,
     pd.Index or pd.MultiIndex,
     pd.Index or pd.MultiIndex)
        Selected rows, rejected rows, original indexes of selection and
        original indexes of rejection.

    Raises
    ------
    ValueError
        If ``selection`` is empty.
    """
    # Fail with a clear message instead of leaking a bare ``StopIteration``
    # from ``next`` when no column was given.
    if not selection:
        raise ValueError(
            "selection must map one column name to an iterable of values."
        )
    col, values = next(iter(selection.items()))

    # ``_reset_index`` is registered as post-processing and is driven by the
    # ``reset_index`` keyword of the wrapped function.
    # NOTE(review): exact dispatch lives in ``process_function`` - confirm there.
    @process_function(postprocessing={"func": _reset_index, "kwargs": "reset_index"})
    def _split_by_column_entries(reset_index=reset_index):
        return ProcessFunction(
            data=data,
            func=_split_by_column_df,
            func_args=(col, values),
            func_kwargs={"inverse": inverse, "return_rejected": return_rejected},
            **PSR_KWARGS,
        )

    result = _split_by_column_entries()
    return tuple(result)


def split_by_index(
    data: pd.DataFrame,
    index,
    reset_index: bool = False,
    inverse: bool = False,
    return_rejected: bool = False,
) -> tuple[
    pd.DataFrame | ParquetStreamReader,
    pd.DataFrame | ParquetStreamReader,
    pd.Index | pd.MultiIndex,
    pd.Index | pd.MultiIndex,
]:
    """
    Split a DataFrame by selecting specific index labels.

    Parameters
    ----------
    data : pandas.DataFrame
        DataFrame to be split.
    index : label or sequence of labels
        Index values to select.
    reset_index : bool, optional
        If ``True``, reset index in returned DataFrames.
    inverse : bool, optional
        If ``True``, select rows **not** in ``index``.
    return_rejected : bool, optional
        If ``True``, return rejected rows as the second output.
        If ``False``, the rejected output is empty but dtype-preserving.

    Returns
    -------
    (pandas.DataFrame or ParquetStreamReader,
     pandas.DataFrame or ParquetStreamReader,
     pd.Index or pd.MultiIndex,
     pd.Index or pd.MultiIndex)
        Selected rows, rejected rows, original indexes of selection and
        original indexes of rejection.
    """

    # ``_reset_index`` is registered as post-processing and is driven by the
    # ``reset_index`` keyword of the wrapped function.
    # NOTE(review): exact dispatch lives in ``process_function`` - confirm there.
    @process_function(postprocessing={"func": _reset_index, "kwargs": "reset_index"})
    def _split_by_index(reset_index=reset_index):
        return ProcessFunction(
            data=data,
            func=_split_by_index_df,
            func_args=(index,),
            func_kwargs={"inverse": inverse, "return_rejected": return_rejected},
            **PSR_KWARGS,
        )

    result = _split_by_index()
    return tuple(result)