Source code for cdm_reader_mapper.cdm_mapper.mapper

"""
Map Common Data Model (CDM).

Created on Thu Apr 11 13:45:38 2019

Maps data contained in a pandas DataFrame (or Iterable[pd.DataFrame]) to
the C3S Climate Data Store Common Data Model (CDM) header and observational
tables using the mapping information available in the tool's mapping library
for the input data model.

@author: iregon
"""

from __future__ import annotations
from collections.abc import Iterable, Sequence
from logging import Logger
from typing import Any, get_args

import pandas as pd

from cdm_reader_mapper.common import logging_hdlr
from cdm_reader_mapper.common.iterators import (
    ParquetStreamReader,
    ProcessFunction,
    process_function,
)

from . import properties
from .codes.codes import get_code_table
from .tables.tables import get_cdm_atts, get_imodel_maps
from .utils.conversions import convert_from_str_series
from .utils.mapping_functions import MappingFunctions


def _is_empty(value: Any) -> bool:
    """
    Check whether a value is considered empty.

    Parameters
    ----------
    value : Any
        Value to be checked.

    Returns
    -------
    bool
        True if value is not empty else False.
    """
    if value is None:
        return True

    if hasattr(value, "empty"):
        return bool(value.empty)

    if not value:
        return True

    return False


def _drop_duplicated_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Drop duplicates from list.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame to drop duplictaed rows.

    Returns
    -------
    pd.DataFrame
        Input DataFrame with deleted duplicated rows.
    """
    list_cols = [col for col in df.columns if df[col].apply(lambda x: isinstance(x, list)).any()]

    for col in list_cols:
        df[col] = df[col].apply(lambda x: tuple(x) if isinstance(x, list) else x)

    df.drop_duplicates(ignore_index=True, inplace=True)

    for col in list_cols:
        if df[col].apply(lambda x: isinstance(x, tuple)).any():
            df[col] = df[col].apply(lambda x: list(x) if isinstance(x, tuple) else x)

    return df


def _get_nested_value(ndict: dict[Any, Any] | None, keys: list[Any]) -> Any | None:
    """
    Traverse nested dictionaries along a sequence of keys.

    Parameters
    ----------
    ndict : dict or None
        Input dictionary that may contain nested values.
    keys : list of Any
        List of keys in `ndict` containing nested values.

    Returns
    -------
    Any | None
        Any if `ndict` is not a dictionary or none of `keys` is in `ndict`, otherwise single nested value.
    """
    if not isinstance(ndict, dict):
        return None

    for key in keys:
        value = ndict.get(key)
        if value is None:
            return None

        if isinstance(value, dict):
            ndict = value
        else:
            return value

    return None


def _transform(
    data: pd.DataFrame | pd.Series,
    imodel_functions: MappingFunctions,
    transform: str,
    kwargs: dict[str, Any],
    logger: Logger,
) -> pd.Series:
    """
    Apply a transformation function from imodel_functions to a pandas Series.

    Parameters
    ----------
    data : pd.DataFrame or pd.Series
        Input data to be transformed.
    imodel_functions : MappingFunctions
        Class containing mapping functions.
    transform : str
        Name of the mapping function.
    kwargs : dict
        Key-word arguments used by mapping function.
    logger : Logger
        Logger instance used for logging.

    Returns
    -------
    pd.Series
        Transformed Series.

    Notes
    -----
    Logs error if `transform` is not a valid mapping function name.
    """
    logger.debug("Applying transform: %s", transform)

    if kwargs:
        logger.debug("With kwargs: %s", ", ".join(kwargs.keys()))
    try:
        trans_func = getattr(imodel_functions, transform)
    except AttributeError:
        logger.error("Transform '%s' not found in imodel_functions", transform)
        return data

    return trans_func(data, **kwargs)


def _code_table(
    data: pd.DataFrame | pd.Series,
    data_model: str,
    code_table: str,
    logger: Logger,
) -> pd.Series:
    """
    Map values in a Series or DataFrame using a (possibly nested) code table.

    Parameters
    ----------
    data : pd.DataFrame or pd.Series
        Input data to be mapped.
    data_model : str
        Name of the data model (e.g. `icoads_r300_d701`).
    code_table : str
        Name of the code table (e.g. `at_units`).
    logger : Logger
        Logger instance used for logging.

    Returns
    -------
    pd.Series
        Series mapped by a code table.
    """
    logger.debug("Mapping code table: %s", code_table)
    table_map = get_code_table(*data_model.split("_"), code_table=code_table)

    df = data.to_frame() if isinstance(data, pd.Series) else data.copy()

    df = df.astype(str)

    df.columns = ["_".join(col) if isinstance(col, tuple) else str(col) for col in df.columns]

    def _map_col(col: Any) -> Any:
        """
        Map column using a code table.

        Parameters
        ----------
        col : Any
            Column values.

        Returns
        -------
        Any
            Mapped values.
        """
        return _get_nested_value(table_map, col.tolist())

    return df.apply(_map_col, axis=1)


def _default(
    default: Any,
    length: int,
) -> list[Any]:
    """
    Return a list of a given length filled with the default value.

    Parameters
    ----------
    default : Any
        Arbitrary default value.
    length : int
        Length of output list.

    Returns
    -------
    list
        List of length `length` containing value `default`.
    """
    return [default] * length


def _fill_value(data: pd.DataFrame | pd.Series, fill_value: Any | None) -> pd.DataFrame | pd.Series:
    """
    Fill missing values in series.

    Parameters
    ----------
    data : pd.DataFrame or pd.Series
        Input data to be filled with `fill_value`.
    fill_value : Any or None
        Fill value used to fill missing values in `data`.

    Returns
    -------
    pd.DataFrame or pd.Series
        Output data filled with `fill_value`.
    """
    if fill_value is None:
        return data
    return data.fillna(value=fill_value).infer_objects(copy=False)


def _extract_input_data(
    data: pd.DataFrame | pd.Series, elements: list[str | tuple[str, str]], default: Any, logger: Logger
) -> tuple[pd.DataFrame | pd.Series, bool]:
    """
    Extract the relevant input data based on `elements`.

    Parameters
    ----------
    data : pd.DataFrame | pd.Series
        Input data containing relevant information.
    elements : list of str or list of tuple
        Elements to be extracted.
    default : Any
        Default value if `elements` is empty.
    logger : Logger
        Logger instance used for logging.

    Returns
    -------
    tuple of pd.DataFrame or tuple of pd.Series
        Data containing relevant information only.

    Notes
    -----
    - Logs `elements` as 'DEBUG'.
    - Logs WARNING if one of `elements` is not in columns of `data`.
    """

    def _return_default(bool: bool) -> tuple[pd.Series, bool]:
        """
        Create series containing default values.

        Parameters
        ----------
        bool : bool
            True if default value is defined, otherwise False.

        Returns
        -------
        tuple of pd.Series and bool
            Data containing default value and boolean value whether default value is defined.
        """
        return pd.Series(_default(default, len(data)), index=data.index), bool

    if not elements:
        if default is None:
            bool = False
        else:
            bool = True
        return _return_default(bool)

    logger.debug("\telements: %s", " ".join(map(str, elements)))

    cols = data.columns

    for e in elements:
        if e not in cols:
            logger.warning("Missing element from input data: %s", e)
            return _return_default(True)

    result = data[elements[0]] if len(elements) == 1 else data[elements]

    if _is_empty(result):
        return _return_default(True)

    return result, False


def _column_mapping(
    data: pd.DataFrame | pd.Series,
    imapping: dict[str, Any],
    imodel_functions: MappingFunctions,
    atts: dict[str, Any],
    codes_subset: str | tuple[str, str] | list[str | tuple[str, str]],
    column: str | tuple[str, str],
    logger: Logger,
) -> pd.Series:
    """
    Map a column (or multiple elements) in input data according to mapping rules.

    Parameters
    ----------
    data : pd.DataFrame or pd.Series
        Input data to be mapped.
    imapping : dict
        Mapping dictionary.
    imodel_functions : MappingFunctions
        Class containing mapping functions.
    atts : dict
        Attribute dictionary.
    codes_subset : str or tuple of str or list of str or list of tuple of str
        Subset to be mapped.
    column : str or tuple of str
        Output column name.
    logger : Logger
        Logger instance used for logging.

    Returns
    -------
    pd.Series
        Mapped output data.
    """
    elements: list[str | tuple[str, str]] = imapping.get("elements", [])
    transform: str | None = imapping.get("transform")
    kwargs: dict[str, Any] = imapping.get("kwargs", {})
    code_table: str | None = imapping.get("code_table")
    default: Any = imapping.get("default")
    fill_value: Any = imapping.get("fill_value")

    if codes_subset and code_table not in codes_subset:
        code_table = None

    extracted, used_default = _extract_input_data(
        data,
        elements,
        default,
        logger,
    )

    if not used_default:
        if transform:
            extracted = _transform(
                extracted,
                imodel_functions,
                transform,
                kwargs,
                logger=logger,
            )
        elif code_table:
            extracted = _code_table(
                extracted,
                imodel_functions.imodel,
                code_table,
                logger=logger,
            )

    if not isinstance(extracted, pd.Series):
        extracted = pd.Series(extracted, index=data.index, copy=False)

    extracted.name = column

    if fill_value is not None:
        extracted = _fill_value(extracted, fill_value)

    atts_combined = {**atts, **imapping}
    return convert_from_str_series(
        extracted,
        atts_combined,
    )


def _table_mapping(
    data: pd.DataFrame,
    mapping: dict[str, Any],
    atts: dict[str, Any],
    imodel_functions: MappingFunctions,
    codes_subset: str | tuple[str, str] | list[str | tuple[str, str]],
    cdm_complete: bool,
    drop_missing_obs: bool,
    drop_duplicates: bool,
    logger: Logger,
) -> pd.DataFrame:
    """
    Map a CDM table in input data according to mapping rules.

    Parameters
    ----------
    data : pd.DataFrame
        Input data to be mapped.
    mapping : dict
        Mapping dictionary containing column-specific mapping information.
    atts : dict
        Attribute dictionary.
    imodel_functions : MappingFunctions
        Class containing mapping functions.
    codes_subset : str or tuple of str or list of str or list of tuple of str
        Subset of codes to be mapped.
    cdm_complete : bool
        If True process complete CDM table.
    drop_missing_obs : bool
        If True drop lines if column `observation_value` is missing.
    drop_duplicates : bool
        If True drop duplicated rows.
    logger : Logger
        Logger instance used for logging.

    Returns
    -------
    pd.DataFrame
        Mapped CDM table.

    Notes
    -----
    Logs each entry in `columns` as DEBUG.
    """
    columns = list(atts) if cdm_complete else [c for c in atts if c in data.columns]
    out = {}

    for column in columns:
        logger.debug("	Element: %s", column)

        out[column] = _column_mapping(
            data,
            mapping.get(column, {}),
            imodel_functions,
            atts[column],
            codes_subset,
            column,
            logger,
        )

    if not out:
        return pd.DataFrame(index=data.index)

    table_df = pd.DataFrame(out, index=data.index)

    if drop_missing_obs is True and "observation_value" in table_df:
        table_df = table_df.dropna(subset=["observation_value"])

    if drop_duplicates:
        table_df = _drop_duplicated_rows(table_df)

    return table_df


def _prepare_cdm_tables(cdm_subset: str | Sequence[str]) -> dict[str, Any]:
    """
    Prepare table buffers and attributes for CDM tables.

    Parameters
    ----------
    cdm_subset : str or tuple of str or Sequence of str or Sequence of tuple of str
        Subset of CDM tables.

    Returns
    -------
    dict
        CDM table-specific mapping attributes.
    """
    if isinstance(cdm_subset, str):
        cdm_subset = [cdm_subset]

    cdm_atts = get_cdm_atts(cdm_subset)
    if not cdm_atts:
        return {}

    tables = {table: atts for table, atts in cdm_atts.items()}

    return tables


def _map_data_model(
    data: pd.DataFrame,
    imodel_maps: dict[str, Any],
    imodel_functions: MappingFunctions,
    cdm_tables: dict[str, Any],
    codes_subset: str | tuple[str, str] | list[str | tuple[str, str]],
    cdm_complete: bool,
    drop_missing_obs: bool,
    drop_duplicates: bool,
    logger: Logger,
) -> tuple[pd.DataFrame, pd.Index | pd.MultiIndex]:
    """
    Process one chunk of input data.

    Parameters
    ----------
    data : pd.DataFrame
        Input data to be mapped.
    imodel_maps : dict
        A imodel-specific mapping dictionary.
    imodel_functions : MappingFunctions
        Class containing mapping functions.
    cdm_tables : str or tuple of str or Sequence of str or Sequence of tuple of str
        Subset of CDM tables.
    codes_subset : str or tuple of str or list of str or list of tuple of str
        Subset of codes to be mapped.
    cdm_complete : bool
        If True process complete CDM table.
    drop_missing_obs : bool
        If True drop lines if column `observation_value` is missing.
    drop_duplicates : bool
        If True drop duplicated rows.
    logger : Logger
        Logger instance used for logging.

    Returns
    -------
    tuple of pd.DataFrame and pd.Index or pd.MultiIndex
        Data containing all requested CDM tables and their columns.
    """
    if ":" in data.columns[0]:
        data.columns = pd.MultiIndex.from_tuples(col.split(":") for col in data.columns)

    all_tables = []
    for table, table_atts in cdm_tables.items():
        logger.debug("Table: %s", table)
        table_maps = imodel_maps[table]
        table_df = _table_mapping(
            data=data,
            mapping=table_maps,
            atts=table_atts,
            imodel_functions=imodel_functions,
            codes_subset=codes_subset,
            cdm_complete=cdm_complete,
            drop_missing_obs=drop_missing_obs,
            drop_duplicates=drop_duplicates,
            logger=logger,
        )

        table_df.columns = pd.MultiIndex.from_product([[table], table_df.columns])
        all_tables.append(table_df)

    tables_df = pd.concat(all_tables, axis=1, join="outer").reset_index(drop=True)
    columns = tables_df.columns
    return tables_df, columns


[docs] def map_model( data: pd.DataFrame | Iterable[pd.DataFrame], imodel: str | None, cdm_subset: str | list[str] | None = None, codes_subset: str | list[str] | None = None, cdm_complete: bool = True, drop_missing_obs: bool = True, drop_duplicates: bool = True, log_level: str = "INFO", ) -> pd.DataFrame | ParquetStreamReader: """ Map a pandas DataFrame to the CDM header and observational tables. Parameters ---------- data : pandas.DataFrame or Iterable[pd.DataFrame] Input data to map. imodel : str A specific mapping from generic data model to CDM, like map a SID-DCK from IMMA1’s core and attachments to CDM in a specific way, e.g. ``icoads_r300_d704``. cdm_subset : str or list, optional Subset of CDM model tables to map. Defaults to the full set of CDM tables defined for the imodel. codes_subset : str or list, optional Subset of code mapping tables to map. Default to the full set of code mapping tables defined for the imodel. cdm_complete : bool, default: True If True map entire CDM tables list. drop_missing_obs : bool, default: True If True Drop observations without a valid observation value, e.g. no air_temperature value. drop_duplicates : bool, default: True If True drop duplicated rows. log_level : str, default: INFO Level of logging information to save. Returns ------- cdm_tables: pandas.DataFrame DataFrame with MultiIndex columns (cdm_table, column_name). Raises ------ ValueError - If `imodel` is not defined. - If first split entry ('_') of `imodel` is not defined. - If mapping does not return a DataFame. TypeError - If type of `imodel` is not supported. - If anything during mapping fails. """ @process_function() def _map_model() -> ProcessFunction: """ Map model for both DataFrames and ParquetStreamReader. Returns ------- ProcessFunction Tuple of pd.DataFrame and its columns. """ return ProcessFunction( data=data, func=_map_data_model, func_kwargs={ "imodel_maps": imodel_maps, "imodel_functions": imodel_functions, "cdm_tables": cdm_tables, "codes_subset": codes_subset, "cdm_complete": cdm_complete, "drop_missing_obs": drop_missing_obs, "drop_duplicates": drop_duplicates, "logger": logger, }, makecopy=False, ) logger = logging_hdlr.init_logger(__name__, level=log_level) if imodel is None: raise ValueError("Input data model 'imodel' is not defined.") if not isinstance(imodel, str): raise TypeError(f"Input data model type is not supported: {type(imodel)}") data_model = imodel.split("_") if data_model[0] not in get_args(properties.SupportedDataModels): raise ValueError(f"Input data model {data_model[0]} not supported") if not cdm_subset: cdm_subset = properties.cdm_tables imodel_maps = get_imodel_maps(*data_model, cdm_tables=cdm_subset) imodel_functions = MappingFunctions(imodel) cdms = list(imodel_maps.keys()) cdm_tables = _prepare_cdm_tables(cdms) results = _map_model() if not isinstance(results, tuple): raise TypeError(f"result is not a tuple, {type(results)}.") result, columns = tuple(results) if isinstance(result, pd.DataFrame): return result if isinstance(result, ParquetStreamReader): result.columns = columns return result raise ValueError(f"result mus be a pd.DataFrame or ParquetStreamReader, not {type(result)}.")