# Source code for cdm_reader_mapper.cdm_mapper.reader

"""
Read Common Data Model (CDM) mapping tables.

Created on Thu Apr 11 13:45:38 2019

Reads files with the CDM table format from a file system to a pandas.DataFrame.

All CDM fields are read as objects. Null values are read with the specified null value
in the table files, or as NaN if the na_values argument is set to a specific null
value in the file.

Reads the full set of files (default), a subset or a single table, as controlled
by cdm_subset:

    - When reading multiple tables, the resulting dataframe is multi-indexed in
        the columns, with (table-name, field) as column names. Merging of tables
        occurs on the report_id field.
    - When reading a single table, the resulting dataframe has simple indexing
        in the columns.

Reads the full set of fields (default) or a subset of it, as controlled by
param col_subset:

    - When reading multiple tables (default or subset), the col_subset is a
        dictionary like: col_subset = {table0:[columns],...tablen:[columns]}
        If a table is not specified in col_subset, all its fields are read.
    - When reading a single table, the col_subset is a list like:
        col_subset = [columns]
    - It is assumed that all column names conform to the cdm field names

The full table set (header, observations-"*") is assumed to be in the same directory.

Filenames for tables are assumed to be:
    tableName-<tb_id>.<extension>
with:
    valid tableName: as declared in properties.cdm_tables
    tb_id: any identifier including wildcards if required
    extension: defaulting to the value of data_format (e.g. 'parquet')

When specifying a subset of tables, valid names are those in properties.cdm_tables

@author: iregon
"""

from __future__ import annotations

import glob
import os

from typing import get_args

import pandas as pd

from cdm_reader_mapper.common import get_filename, logging_hdlr
from cdm_reader_mapper.core.databundle import DataBundle

from ..properties import SupportedFileTypes
from .properties import cdm_tables

from .utils.conversions import convert_to_str_df, convert_from_str_df
from .utils.utilities import get_cdm_subset, get_usecols


# pandas reader callable used for each supported input data format.
READERS = dict(
    csv=pd.read_csv,
    parquet=pd.read_parquet,
    feather=pd.read_feather,
)

# Name of the keyword argument through which the column selection is handed
# to the reader of each data format.
READER_KWARGS = dict(csv="usecols", parquet="columns", feather="columns")


def _read_file(
    ifile: str,
    table: str,
    col_subset: str | list | None,
    data_format: SupportedFileTypes,
    **kwargs,
) -> pd.DataFrame:
    """
    Read one table file with the pandas reader registered for ``data_format``.

    Parameters
    ----------
    ifile : str
        Path of the file to read.
    table : str
        Table name, handed to ``get_usecols`` together with ``col_subset``.
    col_subset : str, list or None
        Column selection, resolved for ``table`` by ``get_usecols``.
    data_format : SupportedFileTypes
        Key used to look up the reader in ``READERS`` and the name of its
        column-selection keyword in ``READER_KWARGS``.
    **kwargs
        Additional keyword arguments for the reader. They are applied after
        the column selection, so they win on a keyword name clash.

    Returns
    -------
    pandas.DataFrame
        Whatever the selected reader returns for ``ifile``.
    """
    selected_columns = get_usecols(table, col_subset)
    read = READERS[data_format]
    options = {READER_KWARGS[data_format]: selected_columns}
    options.update(kwargs)
    return read(ifile, **options)


def _read_single_file(
    ifile: str,
    data_format: SupportedFileTypes,
    cdm_subset: str | list | None = None,
    col_subset: str | list | None = None,
    null_label: str = "null",
    **kwargs,
) -> pd.DataFrame:
    """
    Read a single table file and index it by ``report_id``.

    Parameters
    ----------
    ifile : str
        Path of the file to read.
    data_format : SupportedFileTypes
        Format of ``ifile``; forwarded to ``_read_file``.
    cdm_subset : str or list, optional
        Table name, or a list whose first element is the table name.
    col_subset : str or list, optional
        Column selection forwarded to ``_read_file``.
    null_label : str
        Rows whose ``report_id`` equals this label are dropped.
    **kwargs
        Additional keyword arguments forwarded to ``_read_file``.

    Returns
    -------
    pandas.DataFrame
        A fresh empty frame if the file holds no data; otherwise the table
        indexed by ``report_id`` (the column itself is kept).
    """
    # Only the first entry of a list-like subset names the table to read.
    table = cdm_subset[0] if isinstance(cdm_subset, list) else cdm_subset

    raw = _read_file(
        ifile,
        table=table,
        data_format=data_format,
        col_subset=col_subset,
        **kwargs,
    )
    if raw.empty:
        return pd.DataFrame()

    indexed = raw.set_index("report_id", drop=False)
    if null_label not in indexed.index:
        return indexed
    return indexed.drop(index=null_label)


def _read_multiple_files(
    inp_dir: str,
    data_format: SupportedFileTypes,
    prefix: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
    separator: str | None = "-",
    cdm_subset: str | list | None = None,
    col_subset: str | list | None = None,
    null_label: str = "null",
    logger=None,
    **kwargs,
) -> list[pd.DataFrame]:
    """
    Read one file per requested CDM table from a directory.

    Parameters
    ----------
    inp_dir : str
        Directory that is searched for table files.
    data_format : SupportedFileTypes
        Format of the files; forwarded to ``_read_single_file``.
    prefix : str, optional
        Leading component of the file name pattern.
    suffix : str, optional
        Trailing component of the file name pattern. It is matched with a
        leading wildcard (``*<suffix>``).
    extension : str, optional
        File extension handed to ``get_filename``.
    separator : str, optional
        Separator handed to ``get_filename`` to join the pattern components.
    cdm_subset : str or list, optional
        Table name(s) to read. Names not present in ``cdm_tables`` are
        skipped with a warning.
    col_subset : str or list, optional
        Column selection forwarded to ``_read_single_file``.
    null_label : str
        Forwarded to ``_read_single_file``.
    logger : optional
        Logger used for progress and warning messages. If None, one is
        created with ``logging_hdlr.init_logger``.
    **kwargs
        Additional keyword arguments forwarded to ``_read_single_file``.

    Returns
    -------
    list of pandas.DataFrame
        One frame per table that could be read and is not empty. The columns
        of each frame are a MultiIndex of ``(table, field)``.

    Raises
    ------
    FileNotFoundError
        If no file at all matches the general ``prefix``/``suffix`` pattern.
    """
    # The signature allows logger=None; fall back to a module logger so the
    # logging calls below cannot fail with AttributeError.
    if logger is None:
        logger = logging_hdlr.init_logger(__name__, level="INFO")

    suffix_pattern = "*" if suffix in (None, "*") else f"*{suffix}"

    # See if there's anything at all:
    pattern = get_filename(
        [prefix, suffix_pattern], path=inp_dir, extension=extension, separator=separator
    )
    if not glob.glob(pattern):
        raise FileNotFoundError(f"No files found matching pattern {pattern}")

    tables = cdm_subset if isinstance(cdm_subset, list) else [cdm_subset]

    df_list = []
    for table in tables:
        if table not in cdm_tables:
            logger.warning(f"Requested table {table} not defined in CDM")
            continue

        logger.info(f"Getting file path for pattern {table}")
        name_parts = [table]
        if prefix:
            name_parts.insert(0, prefix)
        if suffix:
            name_parts.append(suffix_pattern)
        table_pattern = get_filename(
            name_parts, path=inp_dir, extension=extension, separator=separator
        )
        paths = glob.glob(table_pattern)

        # Exactly one file must match; report the two failure modes separately
        # so the log does not claim "multiple files" when none were found.
        if not paths:
            logger.warning(
                f"Pattern {table_pattern} resulted in no files for table {table}. "
                "Cannot securely retrieve cdm table(s)"
            )
            continue
        if len(paths) > 1:
            logger.warning(
                f"Pattern {table_pattern} resulted in multiple files for table {table}: {paths} "
                "Cannot securely retrieve cdm table(s)"
            )
            continue

        dfi = _read_single_file(
            paths[0],
            data_format=data_format,
            cdm_subset=[table],
            col_subset=col_subset,
            null_label=null_label,
            **kwargs,
        )

        if dfi.empty:
            logger.warning(
                f"Table {table} empty in file system, not added to the final DF"
            )
            continue

        # Prefix every field with its table name so tables can be concatenated
        # side by side without column name clashes.
        dfi.columns = pd.MultiIndex.from_product([[table], dfi.columns])
        df_list.append(dfi)

    return df_list


def read_tables(
    source: str,
    data_format: SupportedFileTypes = "parquet",
    prefix: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
    separator: str | None = "-",
    cdm_subset: str | list | None = None,
    col_subset: str | list | dict | None = None,
    delimiter: str = "|",
    na_values: str | None = None,
    null_label: str = "null",
    imodel: str | None = None,
    from_str: bool | None = None,
    to_str: bool | None = None,
    **kwargs,
) -> DataBundle:
    """
    Read CDM-table-like files from file system into a DataBundle.

    Parameters
    ----------
    source: str
        The file (including path) or the path to the file(s) to be read.
    data_format: {"csv", "parquet", "feather"}, default: "parquet"
        Format of input data file(s).
    prefix: str, optional
        Prefix of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
        Could be used if `source` is a valid directory path.
    suffix: str, optional
        Suffix of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
        Could be used if `source` is a valid directory path.
    extension: str, optional
        Extension of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
        Could be used if `source` is a valid directory path.
        Default: the value of `data_format`.
    separator : str, optional
        Separator to join the file name pattern components.
        Default: "-"
    cdm_subset: str or list, optional
        Specifies a subset of tables or a single table.

        - If `source` is a directory:
          The resulting data is multi-indexed at the columns, with
          (table-name, field) as column names. Tables are concatenated
          column-wise on their report_id index.

        - If `source` is a single file:
          The resulting data has simple indexing for the columns.

        Required if `source` is a valid file name.
    col_subset: str, list or dict, optional
        Specify the section or sections of the file to read.

        - For multiple sections of the tables:
          e.g col_subset = {table0:[columns0],...tableN:[columnsN]}

        - For a single section:
          e.g. list type object col_subset = [columns]

        This variable assumes that all column names conform to the cdm field names.
    delimiter: str
        Character or regex pattern to treat as the delimiter while reading with pandas.read_csv.
        Only applied if `data_format` is "csv".
        Default: '|'
    na_values: Hashable, Iterable of Hashable or dict of {Hashable: Iterable}, optional
        Additional strings to recognize as Na/NaN while reading input file with pandas.read_csv.
        Only applied if `data_format` is "csv".
        For more details see: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
    null_label: str
        Rows whose report_id equals this label are dropped.
        Default: null
    imodel: str, optional
        Passed on to the string conversion selected by `from_str` or `to_str`;
        not used otherwise.
    from_str: bool, optional
        If True, the merged data is passed through ``convert_from_str_df``.
    to_str: bool, optional
        If True and `from_str` is not True, the merged data is passed through
        ``convert_to_str_df``.
    **kwargs
        Additional keyword arguments passed on to the pandas reader. For
        "csv" they override the defaults set by this function.

    Returns
    -------
    cdm_reader_mapper.DataBundle

    Raises
    ------
    ValueError
        If `data_format` is not supported or if no non-empty table was read.
    FileNotFoundError
        If `source` is neither an existing file nor an existing directory.

    See Also
    --------
    read: Read either original marine-meteorological data or MDF data or CDM tables from disk.
    read_data : Read MDF data and validation mask from disk.
    read_mdf : Read original marine-meteorological data from disk.
    write: Write either MDF data or CDM tables to disk.
    write_tables: Write CDM tables to disk.
    write_data : Write MDF data and validation mask to disk.
    """
    logger = logging_hdlr.init_logger(__name__, level="INFO")

    supported_file_types = get_args(SupportedFileTypes)
    if data_format not in supported_file_types:
        raise ValueError(
            f"data_format must be one of {supported_file_types}, not {data_format}."
        )

    # Because how the printers are written, they modify the original data frame!,
    # also removing rows with empty observation_value in observation_tables
    # (NOTE(review): comment kept from the original; it refers to code outside
    # this function - confirm it is still relevant here.)
    if data_format == "csv":
        # Read every field as object and disable pandas' default NaN
        # detection, so only `na_values` are turned into NaN. Caller-supplied
        # kwargs come last and therefore take precedence.
        kwargs = {
            "delimiter": delimiter,
            "dtype": "object",
            "na_values": na_values,
            "keep_default_na": False,
            **kwargs,
        }

    # See if subset, if any of the tables is not as specs
    cdm_subset = get_cdm_subset(cdm_subset)
    extension = extension or data_format

    if os.path.isfile(source):
        single = _read_single_file(
            source,
            data_format=data_format,
            cdm_subset=cdm_subset,
            col_subset=col_subset,
            null_label=null_label,
            **kwargs,
        )
        df_list = [] if single.empty else [single]
    elif os.path.isdir(source):
        df_list = _read_multiple_files(
            source,
            data_format=data_format,
            prefix=prefix,
            suffix=suffix,
            extension=extension,
            separator=separator,
            cdm_subset=cdm_subset,
            col_subset=col_subset,
            null_label=null_label,
            logger=logger,
            **kwargs,
        )
    else:
        raise FileNotFoundError(
            f"Source is neither a valid file name nor a valid directory path: {source}."
        )

    if not df_list:
        raise ValueError("All tables empty in file system.")

    # Tables are aligned on their report_id index; afterwards the index is
    # replaced by a plain positional one.
    merged = pd.concat(df_list, axis=1, join="outer")
    merged = merged.reset_index(drop=True)

    if from_str is True:
        merged = convert_from_str_df(merged, imodel, cdm_subset=cdm_subset)
    elif to_str is True:
        merged = convert_to_str_df(merged, imodel, cdm_subset=cdm_subset)

    return DataBundle(data=merged, columns=merged.columns, mode="tables")