Source code for cdm_reader_mapper.cdm_mapper.mapper

"""
Map Common Data Model (CDM).

Created on Thu Apr 11 13:45:38 2019

Maps data contained in a pandas DataFrame (or Iterable[pd.DataFrame]) to
the C3S Climate Data Store Common Data Model (CDM) header and observational
tables using the mapping information available in the tool's mapping library
for the input data model.

@author: iregon
"""

from __future__ import annotations

from typing import Any, Iterable, get_args

import pandas as pd

from cdm_reader_mapper.common import logging_hdlr

from cdm_reader_mapper.common.iterators import (
    ParquetStreamReader,
    ProcessFunction,
    process_function,
)

from . import properties
from .codes.codes import get_code_table
from .utils.conversions import convert_from_str_series
from .tables.tables import get_cdm_atts, get_imodel_maps
from .utils.mapping_functions import mapping_functions


def _is_empty(value):
    """Check whether a value is considered empty."""
    if value is None:
        return True

    if hasattr(value, "empty"):
        return bool(value.empty)

    if not value:
        return True

    return False


def _drop_duplicated_rows(df) -> pd.DataFrame:
    """Drop duplicates from list."""
    list_cols = [
        col for col in df.columns if df[col].apply(lambda x: isinstance(x, list)).any()
    ]

    for col in list_cols:
        df[col] = df[col].apply(lambda x: tuple(x) if isinstance(x, list) else x)

    df.drop_duplicates(ignore_index=True, inplace=True)

    for col in list_cols:
        if df[col].apply(lambda x: isinstance(x, tuple)).any():
            df[col] = df[col].apply(lambda x: list(x) if isinstance(x, tuple) else x)

    return df


def _get_nested_value(ndict, keys) -> Any | None:
    """Traverse nested dictionaries along a sequence of keys."""
    if not isinstance(ndict, dict):
        return None

    current = ndict
    for key in keys:
        if not isinstance(current, dict):
            return None

        value = current.get(key)
        if value is None:
            return None

        if isinstance(value, dict):
            current = value
        else:
            return value

    return None


def _transform(
    data,
    imodel_functions,
    transform,
    kwargs,
    logger,
) -> pd.Series:
    """Apply a transformation function from imodel_functions to a pandas Series."""
    logger.debug(f"Applying transform: {transform}")

    if kwargs:
        logger.debug(f"With kwargs: {', '.join(kwargs.keys())}")
    try:
        trans_func = getattr(imodel_functions, transform)
    except AttributeError:
        logger.error(f"Transform '{transform}' not found in imodel_functions")
        return data

    return trans_func(data, **kwargs)


def _code_table(
    data,
    data_model,
    code_table,
    logger,
) -> pd.Series:
    """Map values in a Series or DataFrame using a (possibly nested) code table."""
    logger.debug(f"Mapping code table: {code_table}")
    table_map = get_code_table(*data_model.split("_"), code_table=code_table)

    df = data.to_frame() if isinstance(data, pd.Series) else data.copy()

    df = df.astype(str)

    df.columns = [
        "_".join(col) if isinstance(col, tuple) else str(col) for col in df.columns
    ]

    def _map_col(col):
        return _get_nested_value(table_map, col.tolist())

    return df.apply(_map_col, axis=1)


def _default(
    default,
    length,
) -> list:
    """Return a list of a given length filled with the default value."""
    return [default] * length


def _fill_value(series, fill_value) -> pd.Series:
    """Fill missing values in series."""
    if fill_value is None:
        return series
    return series.fillna(value=fill_value).infer_objects(copy=False)


def _extract_input_data(idata, elements, default, logger):
    """Extract the relevant input data based on `elements`."""

    def _return_default(bool):
        return pd.Series(_default(default, len(idata)), index=idata.index), bool

    if not elements:
        if default is None:
            bool = False
        else:
            bool = True
        return _return_default(bool)

    logger.debug(f"\telements: {' '.join(map(str, elements))}")

    cols = idata.columns

    for e in elements:
        if e not in cols:
            logger.warning(f"Missing element from input data: {e}")
            return _return_default(True)

    data = idata[elements[0]] if len(elements) == 1 else idata[elements]

    if _is_empty(data):
        return _return_default(True)

    return data, False


def _column_mapping(
    idata,
    imapping,
    imodel_functions,
    atts,
    codes_subset,
    column,
    logger,
):
    """Map a column (or multiple elements) in input data according to mapping rules."""
    elements = imapping.get("elements")
    transform = imapping.get("transform")
    kwargs = imapping.get("kwargs", {})
    code_table = imapping.get("code_table")
    default = imapping.get("default")
    fill_value = imapping.get("fill_value")

    if codes_subset and code_table not in codes_subset:
        code_table = None

    data, used_default = _extract_input_data(
        idata,
        elements,
        default,
        logger,
    )

    if not used_default:
        if transform:
            data = _transform(
                data,
                imodel_functions,
                transform,
                kwargs,
                logger=logger,
            )
        elif code_table:
            data = _code_table(
                data,
                imodel_functions.imodel,
                code_table,
                logger=logger,
            )

    if not isinstance(data, pd.Series):
        data = pd.Series(data, index=idata.index, copy=False)

    data.name = column

    if fill_value is not None:
        data = _fill_value(data, fill_value)

    atts_combined = {**atts, **imapping}
    return convert_from_str_series(
        data,
        atts_combined,
    )


def _table_mapping(
    idata,
    mapping,
    atts,
    imodel_functions,
    codes_subset,
    cdm_complete,
    drop_missing_obs,
    drop_duplicates,
    logger,
) -> pd.DataFrame:
    columns = list(atts) if cdm_complete else [c for c in atts if c in idata.columns]
    out = {}

    for column in columns:
        logger.debug(f"\tElement: {column}")

        out[column] = _column_mapping(
            idata,
            mapping.get(column, {}),
            imodel_functions,
            atts[column],
            codes_subset,
            column,
            logger,
        )

    if not out:
        return pd.DataFrame(index=idata.index)

    table_df = pd.DataFrame(out, index=idata.index)

    if drop_missing_obs is True and "observation_value" in table_df:
        table_df = table_df.dropna(subset=["observation_value"])

    if drop_duplicates:
        table_df = _drop_duplicated_rows(table_df)

    return table_df


def _prepare_cdm_tables(cdm_subset):
    """Prepare table buffers and attributes for CDM tables."""
    if isinstance(cdm_subset, str):
        cdm_subset = [cdm_subset]

    cdm_atts = get_cdm_atts(cdm_subset)
    if not cdm_atts:
        return {}

    tables = {}
    for table, atts in cdm_atts.items():
        tables[table] = atts

    return tables


def _map_data_model(
    idata,
    imodel_maps,
    imodel_functions,
    cdm_tables,
    codes_subset,
    cdm_complete,
    drop_missing_obs,
    drop_duplicates,
    logger,
):
    """Process one chunk of input data."""
    if ":" in idata.columns[0]:
        idata.columns = pd.MultiIndex.from_tuples(
            col.split(":") for col in idata.columns
        )

    all_tables = []
    for table, table_atts in cdm_tables.items():
        logger.debug(f"Table: {table}")
        table_maps = imodel_maps[table]
        table_df = _table_mapping(
            idata=idata,
            mapping=table_maps,
            atts=table_atts,
            imodel_functions=imodel_functions,
            codes_subset=codes_subset,
            cdm_complete=cdm_complete,
            drop_missing_obs=drop_missing_obs,
            drop_duplicates=drop_duplicates,
            logger=logger,
        )

        table_df.columns = pd.MultiIndex.from_product([[table], table_df.columns])
        all_tables.append(table_df)

    tables_df = pd.concat(all_tables, axis=1, join="outer").reset_index(drop=True)
    columns = tables_df.columns
    return tables_df, columns


[docs] def map_model( data: pd.DataFrame | Iterable[pd.DataFrame], imodel: str, cdm_subset: str | list[str] | None = None, codes_subset: str | list[str] | None = None, cdm_complete: bool = True, drop_missing_obs: bool = True, drop_duplicates: bool = True, log_level: str = "INFO", ) -> pd.DataFrame | ParquetStreamReader: """Map a pandas DataFrame to the CDM header and observational tables. Parameters ---------- data: pandas.DataFrame or Iterable[pd.DataFrame] input data to map. imodel: str A specific mapping from generic data model to CDM, like map a SID-DCK from IMMA1’s core and attachments to CDM in a specific way. e.g. ``icoads_r300_d704`` cdm_subset: str or list, optional subset of CDM model tables to map. Defaults to the full set of CDM tables defined for the imodel. codes_subset: str or list, optional subset of code mapping tables to map. Default to the full set of code mapping tables defined for the imodel. cdm_complete: bool If True map entire CDM tables list. Default: True drop_missing_obs: bool If True Drop observations without a valid observation value (e.g. no air_temperature value). Default: True drop_duplicates: bool If True drop duplicated rows. Default: True log_level: str level of logging information to save. Default: INFO. Returns ------- cdm_tables: pandas.DataFrame DataFrame with MultiIndex columns (cdm_table, column_name). """ @process_function() def _map_model(): return ProcessFunction( data=data, func=_map_data_model, func_kwargs={ "imodel_maps": imodel_maps, "imodel_functions": imodel_functions, "cdm_tables": cdm_tables, "codes_subset": codes_subset, "cdm_complete": cdm_complete, "drop_missing_obs": drop_missing_obs, "drop_duplicates": drop_duplicates, "logger": logger, }, makecopy=False, ) logger = logging_hdlr.init_logger(__name__, level=log_level) if imodel is None: raise ValueError("Input data model 'imodel' is not defined.") if not isinstance(imodel, str): raise TypeError(f"Input data model type is not supported: {type(imodel)}") data_model = imodel.split("_") if data_model[0] not in get_args(properties.SupportedDataModels): raise ValueError("Input data model " f"{data_model[0]}" " not supported") if not cdm_subset: cdm_subset = properties.cdm_tables imodel_maps = get_imodel_maps(*data_model, cdm_tables=cdm_subset) imodel_functions = mapping_functions(imodel) cdm_tables = _prepare_cdm_tables(imodel_maps.keys()) results = _map_model() result, columns = tuple(results) if isinstance(result, pd.DataFrame): return result if isinstance(result, ParquetStreamReader): result.columns = columns return result raise ValueError( f"result mus be a pd.DataFrame or ParquetStreamReader, not {type(result)}." )