Source code for cdm_reader_mapper.common.replace

"""
Common Data Model (CDM) pandas replacement operators.

Created on Wed Jul  3 09:48:18 2019

Replace columns in the left DataFrame with columns from the right DataFrame.

Replacement is matched on a pivot column, which may have the same name in both
DataFrames (pivot_c) or a different name in each (pivot_l and pivot_r).

One or multiple columns can be replaced, and multi-indexed columns are supported
(tested only on the left DataFrame so far).

Replacement arguments:
    - rep_c: string or list of column name(s) to replace; the names are the same in left and right
    - rep_map: dictionary {col_l: col_r, ...} used when the names differ

@author: iregon
"""

from __future__ import annotations
from collections.abc import Iterable

import pandas as pd

from .iterators import ParquetStreamReader, ProcessFunction, process_function


def _replace_columns(
    df_l: pd.DataFrame,
    df_r: pd.DataFrame,
    pivot_c: str | None = None,
    pivot_l: str | None = None,
    pivot_r: str | None = None,
    rep_c: str | list[str] | None = None,
    rep_map: dict[str, str] | None = None,
) -> pd.DataFrame:
    """
    Helper function to replace columns in DataFrame.

    Parameters
    ----------
    df_l : pandas.DataFrame or Iterable[pd.dataFrame]
        The left DataFrame whose columns will be replaced.
    df_r : pandas.DataFrame or Iterable[pd.dataFrame]
        The right DataFrame providing replacement values.
    pivot_c : str, optional
        A single pivot column present in both DataFrames.
        Overrides `pivot_l` and `pivot_r`.
    pivot_l : str, optional
        Pivot column in `df_l`. Used only when `pivot_c` is not supplied.
    pivot_r : str, optional
        Pivot column in `df_r`. Used only when `pivot_c` is not supplied.
    rep_c : str or list of str, optional
        One or more column names to replace in `df_l`.
        Ignored if `rep_map` is supplied.
    rep_map : dict, optional
        Mapping between left and right column names as `{left_col: right_col}`.

    Returns
    -------
    pandas.DataFrame
        Updated DataFrame with replacements applied.

    Raises
    ------
    TypeError
        If `df_l` or `df_r` is not a pandas DataFrame.
    ValueError
        - If one of `pivot_l` and `pivot_r` is not defined.
        - If `rep_map` and `rep_c` is not defined.
        - If replacement source columns not found in `df_r`.
    """
    # Check inargs
    if not isinstance(df_l, pd.DataFrame) or not isinstance(df_r, pd.DataFrame):
        raise TypeError("Input left and right data must be pandas DataFrames.")

    if pivot_c is not None:
        pivot_l = pivot_r = pivot_c

    if pivot_l is None or pivot_r is None:
        raise ValueError("Pivot columns must be declared using `pivot_c` or both `pivot_l` and `pivot_r`.")

    if rep_map is None:
        if rep_c is None:
            raise ValueError("Replacement columns must be declared using `rep_c` or `rep_map`.")

        if isinstance(rep_c, str):
            rep_c = [rep_c]
        rep_map = {col: col for col in rep_c}

    missing_cols = [src for src in rep_map.values() if src not in df_r.columns]
    if missing_cols:
        raise ValueError(f"Replacement source columns not found in right DataFrame: {missing_cols}.")

    out = df_l.copy()
    right_lookup = df_r[[pivot_r, *rep_map.values()]].set_index(pivot_r).rename(columns={v: k for k, v in rep_map.items()})

    # Align once using reindex (vectorized, C-level)
    aligned = right_lookup.reindex(out[pivot_l].values)

    # Assign columns directly (fastest path)
    for col in aligned.columns:
        out[col] = aligned[col].values

    return out


def replace_columns(
    df_l: pd.DataFrame | Iterable[pd.DataFrame],
    df_r: pd.DataFrame | Iterable[pd.DataFrame],
    pivot_c: str | None = None,
    pivot_l: str | None = None,
    pivot_r: str | None = None,
    rep_c: str | list[str] | None = None,
    rep_map: dict[str, str] | None = None,
) -> pd.DataFrame | ParquetStreamReader:
    """
    Replace columns in one DataFrame using row-matching from another.

    This function works for both a pd.DataFrame and any Iterable of pandas DataFrames.
    The actual replacement is performed by `_replace_columns`, which is handed
    to the `process_function` machinery together with `df_l` as the data.

    Parameters
    ----------
    df_l : pandas.DataFrame or Iterable[pd.DataFrame]
        The left data whose columns will be replaced.
    df_r : pandas.DataFrame or Iterable[pd.DataFrame]
        The right data providing replacement values.
    pivot_c : str, optional
        A single pivot column present in both DataFrames.
        Overrides `pivot_l` and `pivot_r`.
    pivot_l : str, optional
        Pivot column in `df_l`. Used only when `pivot_c` is not supplied.
    pivot_r : str, optional
        Pivot column in `df_r`. Used only when `pivot_c` is not supplied.
    rep_c : str or list of str, optional
        One or more column names to replace in `df_l`.
        Ignored if `rep_map` is supplied.
    rep_map : dict, optional
        Mapping between left and right column names as `{left_col: right_col}`.

    Returns
    -------
    pd.DataFrame or ParquetStreamReader
        Updated data with replacements applied.

    Raises
    ------
    TypeError
        If the data handed to `_replace_columns` is not a pandas DataFrame.
    ValueError
        - If one of `pivot_l` and `pivot_r` is not defined.
        - If `rep_map` and `rep_c` is not defined.
        - If replacement source columns not found in `df_r`.
        - If the processed result is neither a pandas DataFrame nor a
          ParquetStreamReader.

    Notes
    -----
    The errors listed for `_replace_columns` presumably propagate unchanged
    through `process_function`; verify against `.iterators`.
    """

    @process_function(data_only=True)
    def _replace_columns_hlp() -> ProcessFunction:
        """
        Describe the replacement job for the `process_function` decorator.

        Returns
        -------
        ProcessFunction
            Job description binding `df_l` as data and `_replace_columns` as
            the function to apply, with `df_r` and the pivot/replacement
            options as its arguments.
        """
        return ProcessFunction(
            data=df_l,
            func=_replace_columns,
            func_args=(df_r,),
            func_kwargs={
                "pivot_c": pivot_c,
                "pivot_l": pivot_l,
                "pivot_r": pivot_r,
                "rep_c": rep_c,
                "rep_map": rep_map,
            },
            makecopy=False,
        )

    result = _replace_columns_hlp()

    if isinstance(result, pd.DataFrame):
        # Re-wrap so callers always receive a plain pandas DataFrame object.
        return pd.DataFrame(result)
    elif isinstance(result, ParquetStreamReader):
        return result
    raise ValueError(f"result mus be a pd.DataFrame or ParquetStreamReader, not {type(result)}.")