Source code for cdm_reader_mapper.cdm_mapper.reader

"""
Read Common Data Model (CDM) mapping tables.

Created on Thu Apr 11 13:45:38 2019

Reads files with the CDM table format from a file system to a pandas.DataFrame.

All CDM fields are read as objects. Null values are read with the null value specified
in the table files, or as NaN if the na_values argument is set to the specific null
value used in the file.

Reads the full set of files (default), a subset or a single table, as controlled
by cdm_subset:

    - When reading multiple tables, the resulting dataframe is multi-indexed in
        the columns, with (table-name, field) as column names. Merging of tables
        occurs on the report_id field.
    - When reading a single table, the resulting dataframe has simple indexing
        in the columns.

Reads the full set of fields (default) or a subset of it, as controlled by
param col_subset:

    - When reading multiple tables (default or subset), the col_subset is a
        dictionary like: col_subset = {table0:[columns],...tablen:[columns]}
        If a table is not specified in col_subset, all its fields are read.
    - When reading a single table, the col_subset is a list like:
        col_subset = [columns]
    - It is assumed that all column names conform to the CDM field names.

The full table set (header, observations-"*") is assumed to be in the same directory.

Filenames for tables are assumed to be:
    tableName-<tb_id>.<extension>
with:
    valid tableName: as declared in properties.cdm_tables
    tb_id: any identifier including wildcards if required
    extension: defaulting to 'psv'

When specifying a subset of tables, valid names are those in properties.cdm_tables

@author: iregon
"""

from __future__ import annotations
import logging
import pathlib
from collections.abc import Callable
from pathlib import Path
from typing import Any, get_args

import pandas as pd

from cdm_reader_mapper.common import get_filename, logging_hdlr
from cdm_reader_mapper.core.databundle import DataBundle

from ..properties import SupportedFileTypes
from .properties import cdm_tables
from .utils.conversions import convert_from_str_df, convert_to_str_df
from .utils.utilities import get_cdm_subset, get_usecols


# Dispatch table: data format name -> pandas reader function used by
# ``_read_file`` to load a file of that format.
READERS: dict[str, Callable[..., pd.DataFrame | pd.Series]] = {
    "csv": pd.read_csv,
    "parquet": pd.read_parquet,
    "feather": pd.read_feather,
}

# Name of the keyword argument through which each reader in ``READERS``
# receives the column selection (``usecols`` for csv, ``columns`` otherwise).
# The keys must stay in sync with ``READERS``.
READER_KWARGS: dict[str, str] = {
    "csv": "usecols",
    "parquet": "columns",
    "feather": "columns",
}


def _read_file(
    ifile: str | Path,
    table: str,
    col_subset: str | list[str] | dict[str, Any] | None,
    data_format: SupportedFileTypes,
    **kwargs: Any,
) -> pd.DataFrame:
    r"""
    Load one file with the pandas reader registered for `data_format`.

    Parameters
    ----------
    ifile : str or Path
        Location of the file to load.
    table : str
        Name of the table; forwarded to ``get_usecols`` together with
        `col_subset` to resolve the column selection.
    col_subset : str, list of str or dict or None
        Column subset specification forwarded to ``get_usecols``.
    data_format : SupportedFileTypes
        Key into ``READERS`` / ``READER_KWARGS`` selecting the reader and
        the name of its column-selection keyword.
    \**kwargs : Any
        Extra keyword arguments handed to the reader. A key equal to the
        reader's column-selection keyword overrides the resolved selection.

    Returns
    -------
    pd.DataFrame
        Whatever the selected reader returns for `ifile`.
    """
    selected_columns = get_usecols(table, col_subset)
    read_func = READERS[data_format]
    column_keyword = READER_KWARGS[data_format]

    # Caller-supplied kwargs are unpacked last so they take precedence.
    return read_func(ifile, **{column_keyword: selected_columns, **kwargs})


def _read_single_file(
    ifile: str | Path,
    data_format: SupportedFileTypes,
    cdm_subset: str | list[str],
    col_subset: str | list[str] | dict[str, Any] | None,
    null_label: str = "null",
    **kwargs: Any,
) -> pd.DataFrame:
    r"""
    Load one CDM table file and index it by ``report_id``.

    Parameters
    ----------
    ifile : str or Path
        Location of the file to load.
    data_format : SupportedFileTypes
        Format of the file; selects the reader in ``_read_file``.
    cdm_subset : str or list of str
        Table name, or a sequence of names of which only the first is used.
    col_subset : str, list of str or dict or None
        Column subset specification forwarded to ``_read_file``.
    null_label : str, default: null
        Index label whose rows are dropped after indexing by ``report_id``.
    \**kwargs : Any
        Extra keyword arguments forwarded to ``_read_file``.

    Returns
    -------
    pd.DataFrame
        The table indexed by ``report_id`` (the column itself is kept),
        without rows whose index equals `null_label`. A fresh empty
        DataFrame is returned when the file holds no data.
    """
    table_names = [cdm_subset] if isinstance(cdm_subset, str) else list(cdm_subset)

    frame = _read_file(
        ifile,
        table=table_names[0],
        data_format=data_format,
        col_subset=col_subset,
        **kwargs,
    )
    if frame.empty:
        return pd.DataFrame()

    # Keep report_id as a regular column in addition to using it as index.
    indexed = frame.set_index("report_id", drop=False)
    has_null_rows = null_label in indexed.index
    return indexed.drop(index=null_label) if has_null_rows else indexed


def _read_multiple_files(
    inp_dir: str,
    data_format: SupportedFileTypes,
    prefix: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
    separator: str | None = "-",
    cdm_subset: str | list[str] | None = None,
    col_subset: str | list[str] | dict[str, Any] | None = None,
    null_label: str = "null",
    logger: logging.Logger | None = None,
    **kwargs: Any,
) -> list[pd.DataFrame]:
    r"""
    Read multiple CDM files from a directory into a list of DataFrames.

    Files are matched using naming patterns derived from prefix, suffix,
    and table names. Each successfully read table is returned as a DataFrame
    with a hierarchical column index.

    Parameters
    ----------
    inp_dir : str
        Directory containing input files.
    data_format : SupportedFileTypes
        File format used to read the files.
    prefix : str, optional
        Prefix used in file name matching.
    suffix : str, optional
        Suffix used in file name matching.
    extension : str, optional
        File extension to filter files.
    separator : str, default "-"
        Separator used in file naming patterns.
    cdm_subset : str or list of str, optional
        CDM table(s) to read. Must not be None. A single string is treated
        as one table name; any other iterable is read as a sequence of names.
    col_subset : str or list of str or dict, optional
        Column subset specification used to filter columns.
    null_label : str, default: null
        Label used to identify and remove null-index rows.
    logger : logging.Logger or None
        Logger used for informational and warning messages. Must not be None.
    \**kwargs : Any
        Additional keyword arguments passed to file readers.

    Returns
    -------
    list of pandas.DataFrame
        List of DataFrames, one per successfully read table, each with
        a MultiIndex column structure ``(table, field)``. Tables that are
        unknown, match no file or several files, or are empty are skipped
        with a warning.

    Raises
    ------
    ValueError
        If `cdm_subset` or `logger` is None.
    FileNotFoundError
        If no files match the constructed file pattern.
    """
    if cdm_subset is None:
        raise ValueError("cdm_subset must be a string or a list of strings, not None.")
    if logger is None:
        raise ValueError("logger must be a logging.logger, not None.")

    # A missing suffix and an explicit "*" both mean "match anything".
    suffix_pattern = "*" if suffix is None or suffix == "*" else f"*{suffix}"

    # See if there's anything at all before looking at individual tables.
    full_pattern = get_filename([prefix, suffix_pattern], path=inp_dir, extension=extension, separator=separator)
    path_pattern = Path(full_pattern)
    if not any(path_pattern.parent.glob(path_pattern.name)):
        raise FileNotFoundError(f"No files found matching pattern {full_pattern}")

    # Same normalisation as in _read_single_file: only a bare string is
    # wrapped, so tuples and other iterables of names are iterated properly.
    tables = [cdm_subset] if isinstance(cdm_subset, str) else list(cdm_subset)

    df_list = []
    for table in tables:
        if table not in cdm_tables:
            logger.warning("Requested table %s not defined in CDM", table)
            continue

        logger.info("Getting file path for pattern %s", table)
        table_pattern = [table]
        if prefix:
            table_pattern = [prefix] + table_pattern
        if suffix:
            table_pattern = table_pattern + [suffix_pattern]

        full_table_pattern = get_filename(table_pattern, path=inp_dir, extension=extension, separator=separator)
        table_path_pattern = Path(full_table_pattern)
        paths = list(table_path_pattern.parent.glob(table_path_pattern.name))

        # Exactly one file per table is required; report the two failure
        # modes separately so the log reflects what actually happened.
        if not paths:
            logger.warning(
                "Pattern %s resulted in no files for table %s. Cannot retrieve cdm table", table_pattern, table
            )
            continue
        if len(paths) > 1:
            logger.warning(
                "Pattern %s resulted in multiple files for table %s: %s Cannot securely retrieve cdm table(s)", table_pattern, table, paths
            )
            continue

        dfi = _read_single_file(
            paths[0],
            data_format=data_format,
            cdm_subset=[table],
            col_subset=col_subset,
            null_label=null_label,
            **kwargs,
        )

        if dfi.empty:
            logger.warning("Table %s empty in file system, not added to the final DF", table)
            continue

        # Prefix every column with the table name: (table, field).
        dfi.columns = pd.MultiIndex.from_product([[table], dfi.columns])
        df_list.append(dfi)

    return df_list


def read_tables(
    source: str,
    data_format: SupportedFileTypes = "parquet",
    prefix: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
    separator: str | None = "-",
    cdm_subset: str | list[str] | None = None,
    col_subset: str | list[str] | dict[str, Any] | None = None,
    delimiter: str = "|",
    na_values: str | None = None,
    null_label: str = "null",
    from_str: bool | None = None,
    to_str: bool | None = None,
    imodel: str | None = None,
    **kwargs: Any,
) -> DataBundle:
    r"""
    Read CDM-table-like files from the file system into a DataBundle.

    Parameters
    ----------
    source : str
        The file (including path) or the directory holding the file(s) to be read.
    data_format : {"csv", "parquet", "feather"}, default: "parquet"
        Format of the input data file(s).
    prefix : str, optional
        Prefix of the file name structure ``<prefix>-<table>-*<suffix>.<extension>``.
        Can be used if `source` is a valid directory path.
    suffix : str, optional
        Suffix of the file name structure ``<prefix>-<table>-*<suffix>.<extension>``.
        Can be used if `source` is a valid directory path.
    extension : str, optional
        Extension of the file name structure ``<prefix>-<table>-*<suffix>.<extension>``.
        Can be used if `source` is a valid directory path.
        Falls back to `data_format` when not given.
    separator : str, default: -
        Separator joining the file name pattern components.
    cdm_subset : str or list, optional
        Specifies a subset of tables or a single table.

        - For multiple tables: the resulting pandas.DataFrame is multi-indexed
          in the columns, with (table-name, field) as column names. Tables are
          merged via the report_id field.
        - For a single table: the resulting pandas.DataFrame has simple
          indexing in the columns.

        Required if `source` is a valid file name.
    col_subset : str, list or dict, optional
        Specifies the section or sections of the file to read.

        - For multiple sections of the tables:
          e.g. ``col_subset = {table0: [columns0], ..., tableN: [columnsN]}``
        - For a single section:
          e.g. list type object ``col_subset = [columns]``

        It is assumed that all column names conform to the CDM field names.
    delimiter : str, default: |
        Character or regex pattern to treat as the delimiter while reading
        with pandas.read_csv. Only used when `data_format` is "csv".
    na_values : Hashable, Iterable of Hashable or dict of {Hashable: Iterable}, optional
        Additional strings to recognize as NA/NaN while reading the input file
        with pandas.read_csv. Only used when `data_format` is "csv".
        For more details see:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
    null_label : str, default: null
        String used to label non-valid values in `data`.
    from_str : bool, optional
        If True, convert original string data to `imodel`-specific data types.
        Takes precedence over `to_str` when both are True.
    to_str : bool, optional
        If True, convert original `imodel`-specific data types to strings.
    imodel : str, optional
        Name of the data model, e.g. icoads.
        Must be set if either `from_str` or `to_str` is set.
    \**kwargs : Any
        Additional keyword arguments passed to the data reader.

    Returns
    -------
    cdm_reader_mapper.DataBundle
        DataBundle instance containing the successfully read CDM table(s).

    Raises
    ------
    ValueError
        If `data_format` is not supported, or if no table with data was read.
    FileNotFoundError
        If `source` is neither an existing file nor an existing directory.

    See Also
    --------
    read: Read either original marine-meteorological data or MDF data or CDM tables from disk.
    read_data : Read MDF data and validation mask from disk.
    read_mdf : Read original marine-meteorological data from disk.
    write: Write either MDF data or CDM tables to disk.
    write_tables: Write CDM tables to disk.
    write_data : Write MDF data and validation mask to disk.
    """
    logger = logging_hdlr.init_logger(__name__, level="INFO")

    allowed_formats = get_args(SupportedFileTypes)
    if data_format not in allowed_formats:
        raise ValueError(f"data_format must be one of {allowed_formats}, not {data_format}.")

    if data_format == "csv":
        # Defaults for pandas.read_csv: every field is read as object and the
        # built-in NA strings are disabled. Caller kwargs override them.
        csv_defaults = {
            "delimiter": delimiter,
            "dtype": "object",
            "na_values": na_values,
            "keep_default_na": False,
        }
        kwargs = {**csv_defaults, **kwargs}

    # Normalise the requested table selection via get_cdm_subset.
    cdm_subset = get_cdm_subset(cdm_subset)
    extension = extension or data_format

    source_path = Path(source)
    if source_path.is_file():
        single_table = _read_single_file(
            source,
            data_format=data_format,
            cdm_subset=cdm_subset,
            col_subset=col_subset,
            null_label=null_label,
            **kwargs,
        )
        df_list = [] if single_table.empty else [single_table]
    elif source_path.is_dir():
        df_list = _read_multiple_files(
            source,
            data_format=data_format,
            prefix=prefix,
            suffix=suffix,
            extension=extension,
            separator=separator,
            cdm_subset=cdm_subset,
            col_subset=col_subset,
            null_label=null_label,
            logger=logger,
            **kwargs,
        )
    else:
        raise FileNotFoundError(f"Source is neither a valid file name nor a valid directory path: {source}.")

    if not df_list:
        raise ValueError("All tables empty in file system.")

    # Align the tables column-wise on their shared index, then discard it.
    merged = pd.concat(df_list, axis=1, join="outer").reset_index(drop=True)

    if from_str is True:
        merged = convert_from_str_df(merged, imodel, cdm_subset=cdm_subset)
    elif to_str is True:
        merged = convert_to_str_df(merged, imodel, cdm_subset=cdm_subset)

    return DataBundle(data=merged, columns=merged.columns, mode="tables")