"""
Read Common Data Model (CDM) mapping tables.
Created on Thu Apr 11 13:45:38 2019
Reads files with the CDM table format from a file system to a pandas.Dataframe.
All CDM fields are read as objects. Null values are read with the specified null value
in the table files, or as NaN if the na_values argument is set to the a specific null
value in the file.
Reads the full set of files (default), a subset or a single table, as controlled
by cdm_subset:
- When reading multiple tables, the resulting dataframe is multi-indexed in
the columns, with (table-name, field) as column names. Merging of tables
occurs on the report_id field.
- When reading a single table, the resulting dataframe has simple indexing
in the columns.
Reads the full set of fields (default) or a subset of it, as controlled by
param col_subset:
- When reading multiple tables (default or subset), the col_subset is a
dictionary like: col_subset = {table0:[columns],...tablen:[columns]}
If a table is not specified in col_subset, all its fields are read.
- When reading a single table, the col_subset is a list like:
col_subset = [columns]
- It is assumed that the column names are all conform to the cdm field names
The full table set (header, observations-"*") is assumed to be in the same directory.
Filenames for tables are assumed to be:
tableName-<tb_id>.<extension>
with:
valid tableName: as declared in properties.cdm_tables
tb_id: any identifier including wildcards if required
extension: defaulting to 'psv'
When specifying a subset of tables, valid names are those in properties.cdm_tables
@author: iregon
"""
from __future__ import annotations
import logging
import pathlib
from collections.abc import Callable
from pathlib import Path
from typing import Any, get_args
import pandas as pd
from cdm_reader_mapper.common import get_filename, logging_hdlr
from cdm_reader_mapper.core.databundle import DataBundle
from ..properties import SupportedFileTypes
from .properties import cdm_tables
from .utils.conversions import convert_from_str_df, convert_to_str_df
from .utils.utilities import get_cdm_subset, get_usecols
READERS: dict[str, Callable[..., pd.DataFrame | pd.Series]] = {
"csv": pd.read_csv,
"parquet": pd.read_parquet,
"feather": pd.read_feather,
}
READER_KWARGS = {
"csv": "usecols",
"parquet": "columns",
"feather": "columns",
}
def _read_file(
ifile: str | Path,
table: str,
col_subset: str | list[str] | dict[str, Any] | None,
data_format: SupportedFileTypes,
**kwargs: Any,
) -> pd.DataFrame:
r"""
Read a single file into a DataFrame using a format-specific reader.
Parameters
----------
ifile : str or Path
Path to the input file.
table : str
Table name used to determine column selection.
col_subset : str, list of str or dict or None
Column subset specification used to filter columns during reading.
data_format : SupportedFileTypes
File format used to select the appropriate reader.
\**kwargs : Any
Additional keyword arguments passed to the underlying reader.
Returns
-------
pd.DataFrame
DataFrame containing the selected data from the file.
"""
usecols = get_usecols(table, col_subset)
reader = READERS[data_format]
reader_kwargs = {READER_KWARGS[data_format]: usecols, **kwargs}
return reader(ifile, **reader_kwargs)
def _read_single_file(
ifile: str | Path,
data_format: SupportedFileTypes,
cdm_subset: str | list[str],
col_subset: str | list[str] | dict[str, Any] | None,
null_label: str = "null",
**kwargs: Any,
) -> pd.DataFrame:
r"""
Read and preprocess a single CDM file into a DataFrame.
Parameters
----------
ifile : str or Path
Path to the input file.
data_format : SupportedFileTypes
File format used to read the file.
cdm_subset : str or list of str
CDM table(s) to read. Only the first entry is used.
col_subset : str, list of str or dict or None
Column subset specification used to filter columns.
null_label : str, default: null
Label used to identify and remove null-index rows.
\**kwargs : Any
Additional keyword arguments passed to the reader.
Returns
-------
pd.DataFrame
Processed DataFrame indexed by `report_id`. Returns an empty
DataFrame if no data is found.
"""
if isinstance(cdm_subset, str):
cdm_subset = [cdm_subset]
else:
cdm_subset = list(cdm_subset)
df = _read_file(
ifile,
table=cdm_subset[0],
data_format=data_format,
col_subset=col_subset,
**kwargs,
)
if df.empty:
return pd.DataFrame()
df = df.set_index("report_id", drop=False)
if null_label in df.index:
return df.drop(index=null_label)
return df
def _read_multiple_files(
inp_dir: str,
data_format: SupportedFileTypes,
prefix: str | None = None,
suffix: str | None = None,
extension: str | None = None,
separator: str | None = "-",
cdm_subset: str | list[str] | None = None,
col_subset: str | list[str] | dict[str, Any] | None = None,
null_label: str = "null",
logger: logging.Logger | None = None,
**kwargs: Any,
) -> list[pd.DataFrame]:
r"""
Read multiple CDM files from a directory into a list of DataFrames.
Files are matched using naming patterns derived from prefix, suffix,
and table names. Each successfully read table is returned as a DataFrame
with a hierarchical column index.
Parameters
----------
inp_dir : str
Directory containing input files.
data_format : SupportedFileTypes
File format used to read the files.
prefix : str, optional
Prefix used in file name matching.
suffix : str, optional
Suffix used in file name matching.
extension : str, optional
File extension to filter files.
separator : str, default "-"
Separator used in file naming patterns.
cdm_subset : str or list of str, optional
CDM table(s) to read. Must not be None.
col_subset : str or list of str or dict, optional
Column subset specification used to filter columns.
null_label : str, default: null
Label used to identify and remove null-index rows.
logger : logging.Logger or None
Logger used for informational and warning messages. Must not be None.
\**kwargs : Any
Additional keyword arguments passed to file readers.
Returns
-------
list of pandas.DataFrame
List of DataFrames, one per successfully read table, each with
a MultiIndex column structure.
Raises
------
ValueError
If `cdm_subset` or `logger` is None.
FileNotFoundError
If no files match the constructed file pattern.
"""
if cdm_subset is None:
raise ValueError("cdm_subset must be a string or a list of strings, not None.")
if logger is None:
raise ValueError("logger must be a logging.logger, not None.")
if suffix is None:
suffix_pattern = "*"
elif suffix == "*":
suffix_pattern = "*"
else:
suffix_pattern = f"*{suffix}"
# See if there's anything at all:
full_pattern = get_filename([prefix, suffix_pattern], path=inp_dir, extension=extension, separator=separator)
path_pattern = Path(full_pattern)
base_dir = path_pattern.parent
file_pattern = path_pattern.name
files = list(base_dir.glob(file_pattern))
if len(files) == 0:
raise FileNotFoundError(f"No files found matching pattern {full_pattern}")
df_list = []
if not isinstance(cdm_subset, list):
cdm_subset = [cdm_subset]
for table in cdm_subset:
if table not in cdm_tables:
logger.warning("Requested table %s not defined in CDM", table)
continue
logger.info("Getting file path for pattern %s", table)
table_pattern = [table]
if prefix:
table_pattern = [prefix] + table_pattern
if suffix:
table_pattern = table_pattern + [suffix_pattern]
full_table_pattern = get_filename(table_pattern, path=inp_dir, extension=extension, separator=separator)
table_path_pattern = Path(full_table_pattern)
table_base_dir = table_path_pattern.parent
table_file_pattern = table_path_pattern.name
paths = list(table_base_dir.glob(table_file_pattern))
if len(paths) != 1:
logger.warning(
"Pattern %s resulted in multiple files for table %s: %s Cannot securely retrieve cdm table(s)", table_pattern, table, paths
)
continue
dfi = _read_single_file(
paths[0],
data_format=data_format,
cdm_subset=[table],
col_subset=col_subset,
null_label=null_label,
**kwargs,
)
if dfi.empty:
logger.warning("Table %s empty in file system, not added to the final DF", table)
continue
dfi.columns = pd.MultiIndex.from_product([[table], dfi.columns])
df_list.append(dfi)
return df_list
[docs]
def read_tables(
source: str,
data_format: SupportedFileTypes = "parquet",
prefix: str | None = None,
suffix: str | None = None,
extension: str | None = None,
separator: str | None = "-",
cdm_subset: str | list[str] | None = None,
col_subset: str | list[str] | dict[str, Any] | None = None,
delimiter: str = "|",
na_values: str | None = None,
null_label: str = "null",
from_str: bool | None = None,
to_str: bool | None = None,
imodel: str | None = None,
**kwargs: Any,
) -> DataBundle:
r"""
Read CDM-table-like files from file system to a pandas.DataFrame.
Parameters
----------
source : str
The file (including path) or the path to the file(s) to be read.
data_format : {"csv", "parquet", "feather"}, default: "parquet"
Format of input data file(s).
prefix : str, optional
Prefix of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
Could de used if `source` is a valid directory path.
suffix : str, optional
Suffix of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
Could de used if `source` is a valid directory path.
extension : str, optional
Extension of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
Could de used if `source` is a valid directory path.
separator : str, default: -
Separator to join the file name pattern components.
cdm_subset : str or list, optional
Specifies a subset of tables or a single table.
- For multiple subsets of tables:
This function returns a pandas.DataFrame that is multi-index at
the columns, with (table-name, field) as column names. Tables are merged via the report_id field.
- For a single table:
This function returns a pandas.DataFrame with a simple indexing for the columns.
Required if `source` is a valid file name.
col_subset : str, list or dict, optional
Specify the section or sections of the file to read.
- For multiple sections of the tables:
e.g col_subset = {table0:[columns0],...tableN:[columnsN]}
- For a single section:
e.g. list type object col_subset = [columns]
This variable assumes that the column names are all conform to the cdm field names.
delimiter : str, default: |
Character or regex pattern to treat as the delimiter while reading with pandas.read_csv.
na_values : Hashable, Iterable of Hashable or dict of {Hashable: Iterable}, optional
Additional strings to recognize as Na/NaN while reading input file with pandas.read_csv.
For more details see: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
null_label : str, default: null
String how to label non valid values in `data`.
from_str : bool, optional
If True convert original string data to `imodel`-specific data types.
to_str : bool, optional
If True convert original `imodel`-specific data types to strings.
imodel : str , optional
Name of data model, e.g. icoads.
Must be set if either `from_str` or `to_str` is set.
\**kwargs : Any
Additional keyword-arguments pass to data reader.
Returns
-------
cdm_reader_mapper.DataBundle
DataBundle instance containing successfully read CDM table(s).
See Also
--------
read: Read either original marine-meteorological data or MDF data or CDM tables from disk.
read_data : Read MDF data and validation mask from disk.
read_mdf : Read original marine-meteorological data from disk.
write: Write either MDF data or CDM tables to disk.
write_tables: Write CDM tables to disk.
write_data : Write MDF data and validation mask to disk.
"""
logger = logging_hdlr.init_logger(__name__, level="INFO")
supported_file_types = get_args(SupportedFileTypes)
if data_format not in supported_file_types:
raise ValueError(f"data_format must be one of {supported_file_types}, not {data_format}.")
# Because how the printers are written, they modify the original data frame!,
# also removing rows with empty observation_value in observation_tables
if data_format == "csv":
kwargs = {
"delimiter": delimiter,
"dtype": "object",
"na_values": na_values,
"keep_default_na": False,
**kwargs,
}
# See if subset, if any of the tables is not as specs
cdm_subset = get_cdm_subset(cdm_subset)
extension = extension or data_format
if pathlib.Path(source).is_file():
df_list = [
_read_single_file(
source,
data_format=data_format,
cdm_subset=cdm_subset,
col_subset=col_subset,
null_label=null_label,
**kwargs,
)
]
if df_list[0].empty:
df_list = []
elif pathlib.Path(source).is_dir():
df_list = _read_multiple_files(
source,
data_format=data_format,
prefix=prefix,
suffix=suffix,
extension=extension,
separator=separator,
cdm_subset=cdm_subset,
col_subset=col_subset,
null_label=null_label,
logger=logger,
**kwargs,
)
else:
raise FileNotFoundError(f"Source is neither a valid file name nor a valid directory path: {source}.")
if len(df_list) == 0:
raise ValueError("All tables empty in file system.")
merged = pd.concat(df_list, axis=1, join="outer")
merged = merged.reset_index(drop=True)
if from_str is True:
merged = convert_from_str_df(merged, imodel, cdm_subset=cdm_subset)
elif to_str is True:
merged = convert_to_str_df(merged, imodel, cdm_subset=cdm_subset)
return DataBundle(data=merged, columns=merged.columns, mode="tables")