"""
Read Common Data Model (CDM) mapping tables.
Created on Thu Apr 11 13:45:38 2019
Reads files with the CDM table format from a file system to a pandas.DataFrame.
All CDM fields are read as objects. Null values are read with the specified null value
in the table files, or as NaN if the na_values argument is set to a specific null
value in the file.
Reads the full set of files (default), a subset or a single table, as controlled
by cdm_subset:
- When reading multiple tables, the resulting dataframe is multi-indexed in
the columns, with (table-name, field) as column names. Merging of tables
occurs on the report_id field.
- When reading a single table, the resulting dataframe has simple indexing
in the columns.
Reads the full set of fields (default) or a subset of it, as controlled by
param col_subset:
- When reading multiple tables (default or subset), the col_subset is a
dictionary like: col_subset = {table0:[columns],...tablen:[columns]}
If a table is not specified in col_subset, all its fields are read.
- When reading a single table, the col_subset is a list like:
col_subset = [columns]
- It is assumed that all column names conform to the CDM field names.
The full table set (header, observations-"*") is assumed to be in the same directory.
Filenames for tables are assumed to be:
tableName-<tb_id>.<extension>
with:
valid tableName: as declared in properties.cdm_tables
tb_id: any identifier including wildcards if required
extension: defaulting to the value of data_format (e.g. 'parquet')
When specifying a subset of tables, valid names are those in properties.cdm_tables
@author: iregon
"""
from __future__ import annotations
import glob
import os
from typing import get_args
import pandas as pd
from cdm_reader_mapper.common import get_filename, logging_hdlr
from cdm_reader_mapper.core.databundle import DataBundle
from ..properties import SupportedFileTypes
from .properties import cdm_tables
from .utils.conversions import convert_to_str_df, convert_from_str_df
from .utils.utilities import get_cdm_subset, get_usecols
# pandas reader callable used for each supported input data format.
READERS = {
    "csv": pd.read_csv,
    "parquet": pd.read_parquet,
    "feather": pd.read_feather,
}
# Name of the keyword argument through which the column selection is handed
# to the matching reader in ``READERS`` (the readers do not share one name).
READER_KWARGS = {
    "csv": "usecols",
    "parquet": "columns",
    "feather": "columns",
}
def _read_file(
    ifile: str,
    table: str,
    col_subset: str | list | None,
    data_format: SupportedFileTypes,
    **kwargs,
) -> pd.DataFrame:
    """
    Load one table file with the pandas reader registered for ``data_format``.

    Parameters
    ----------
    ifile: str
        Path of the file to read.
    table: str
        Name of the table stored in ``ifile``; forwarded to ``get_usecols``.
    col_subset: str, list or None
        Column selection; forwarded to ``get_usecols`` together with ``table``.
    data_format: {"csv", "parquet", "feather"}
        Key used to look up the reader in ``READERS`` and the name of its
        column-selection keyword in ``READER_KWARGS``.
    **kwargs
        Extra keyword arguments handed to the reader. An entry with the same
        name as the column-selection keyword overrides the computed selection.

    Returns
    -------
    pandas.DataFrame
        Whatever the selected pandas reader returns.
    """
    selected_columns = get_usecols(table, col_subset)
    read = READERS[data_format]
    # The readers do not agree on the keyword used to restrict the columns.
    column_keyword = READER_KWARGS[data_format]
    options = {column_keyword: selected_columns}
    options.update(kwargs)
    return read(ifile, **options)
def _read_single_file(
    ifile: str,
    data_format: SupportedFileTypes,
    cdm_subset: str | list | None = None,
    col_subset: str | list | None = None,
    null_label: str = "null",
    **kwargs,
) -> pd.DataFrame:
    """
    Read a single table file and index it by its ``report_id`` column.

    Parameters
    ----------
    ifile: str
        Path of the file to read.
    data_format: {"csv", "parquet", "feather"}
        Format of ``ifile``.
    cdm_subset: str, list or None
        Table name, or a list whose first element is the table name.
    col_subset: str, list or None
        Column selection forwarded to ``_read_file``.
    null_label: str
        Rows whose ``report_id`` equals this label are removed.
    **kwargs
        Extra keyword arguments forwarded to ``_read_file``.

    Returns
    -------
    pandas.DataFrame
        The table indexed by ``report_id`` (the column itself is kept), or a
        new empty DataFrame when the file holds no rows.
    """
    tables = cdm_subset if isinstance(cdm_subset, list) else [cdm_subset]
    raw = _read_file(
        ifile,
        table=tables[0],
        data_format=data_format,
        col_subset=col_subset,
        **kwargs,
    )
    if raw.empty:
        return pd.DataFrame()

    indexed = raw.set_index("report_id", drop=False)
    if null_label not in indexed.index:
        return indexed
    # Discard the rows carrying the null marker as report identifier.
    return indexed.drop(index=null_label)
def _read_multiple_files(
    inp_dir: str,
    data_format: SupportedFileTypes,
    prefix: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
    separator: str | None = "-",
    cdm_subset: str | list | None = None,
    col_subset: str | list | None = None,
    null_label: str = "null",
    logger=None,
    **kwargs,
) -> list[pd.DataFrame]:
    """
    Read every requested CDM table found in a directory.

    Parameters
    ----------
    inp_dir: str
        Directory that holds the table files.
    data_format: {"csv", "parquet", "feather"}
        Format of the table files.
    prefix: str, optional
        Leading component of the file name pattern.
    suffix: str, optional
        Trailing component of the file name pattern. It is matched as
        ``*<suffix>``; ``None`` and ``"*"`` both match anything.
    extension: str, optional
        File extension handed to ``get_filename``.
    separator: str, optional
        Separator handed to ``get_filename`` to join the pattern components.
    cdm_subset: str, list or None
        Table name or list of table names to read.
    col_subset: str, list or None
        Column selection forwarded to ``_read_single_file``.
    null_label: str
        Label forwarded to ``_read_single_file``.
    logger: optional
        Logger used for progress and warning messages. When ``None`` a logger
        is created with ``logging_hdlr.init_logger``.
    **kwargs
        Extra keyword arguments forwarded to ``_read_single_file``.

    Returns
    -------
    list of pandas.DataFrame
        One frame per table that could be read, with columns turned into a
        ``(table, field)`` MultiIndex. A table is skipped with a warning when
        it is not in ``cdm_tables``, when its pattern does not match exactly
        one file, or when its file is empty.

    Raises
    ------
    FileNotFoundError
        If no file in ``inp_dir`` matches the general file name pattern.
    """
    # The default ``logger=None`` used to crash on the first log call.
    if logger is None:
        logger = logging_hdlr.init_logger(__name__, level="INFO")

    suffix_pattern = "*" if suffix is None or suffix == "*" else f"*{suffix}"

    # See if there's anything at all:
    pattern = get_filename(
        [prefix, suffix_pattern], path=inp_dir, extension=extension, separator=separator
    )
    if not glob.glob(pattern):
        raise FileNotFoundError(f"No files found matching pattern {pattern}")

    tables = cdm_subset if isinstance(cdm_subset, list) else [cdm_subset]

    df_list = []
    for table in tables:
        if table not in cdm_tables:
            logger.warning(f"Requested table {table} not defined in CDM")
            continue

        logger.info(f"Getting file path for pattern {table}")
        name_parts = [table]
        if prefix:
            name_parts.insert(0, prefix)
        # The wildcard suffix is only part of the per-table pattern when a
        # suffix was actually requested.
        if suffix:
            name_parts.append(suffix_pattern)
        table_pattern = get_filename(
            name_parts, path=inp_dir, extension=extension, separator=separator
        )

        matches = glob.glob(table_pattern)
        if not matches:
            # Report a missing file as such instead of as "multiple files".
            logger.warning(
                f"Pattern {table_pattern} resulted in no files for table {table}. "
                "Cannot securely retrieve cdm table(s)"
            )
            continue
        if len(matches) > 1:
            logger.warning(
                f"Pattern {table_pattern} resulted in multiple files for table {table}: {matches} "
                "Cannot securely retrieve cdm table(s)"
            )
            continue

        dfi = _read_single_file(
            matches[0],
            data_format=data_format,
            cdm_subset=[table],
            col_subset=col_subset,
            null_label=null_label,
            **kwargs,
        )
        if dfi.empty:
            logger.warning(
                f"Table {table} empty in file system, not added to the final DF"
            )
            continue

        # Qualify every field with its table name so frames can be combined.
        dfi.columns = pd.MultiIndex.from_product([[table], dfi.columns])
        df_list.append(dfi)
    return df_list
# [docs]
def read_tables(
    source: str,
    data_format: SupportedFileTypes = "parquet",
    prefix: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
    separator: str | None = "-",
    cdm_subset: str | list | None = None,
    col_subset: str | list | dict | None = None,
    delimiter: str = "|",
    na_values: str | None = None,
    null_label: str = "null",
    imodel: str | None = None,
    from_str: bool | None = None,
    to_str: bool | None = None,
    **kwargs,
) -> DataBundle:
    """
    Read CDM-table-like files from file system to a pandas.DataFrame.

    Parameters
    ----------
    source: str
        The file (including path) or the path to the file(s) to be read.
    data_format: {"csv", "parquet", "feather"}, default: "parquet"
        Format of input data file(s).
    prefix: str, optional
        Prefix of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
        Only used if `source` is a valid directory path.
    suffix: str, optional
        Suffix of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
        Only used if `source` is a valid directory path.
    extension: str, optional
        Extension of file name structure: ``<prefix>-<table>-*<suffix>.<extension>``.
        Only used if `source` is a valid directory path.
        Default: the value of `data_format`.
    separator : str, optional
        Separator to join the file name pattern components.
        Default: "-"
    cdm_subset: str or list, optional
        Specifies a subset of tables or a single table.

        - For multiple tables:
          The resulting pandas.DataFrame is multi-indexed in the columns,
          with (table-name, field) as column names. Tables are combined on
          the report_id field.
        - For a single table:
          The resulting pandas.DataFrame has simple indexing in the columns.

        Required if `source` is a valid file name.
    col_subset: str, list or dict, optional
        Specify the section or sections of the file to read.

        - For multiple sections of the tables:
          e.g. col_subset = {table0:[columns0],...tableN:[columnsN]}
        - For a single section:
          e.g. list type object col_subset = [columns]

        It is assumed that all column names conform to the CDM field names.
    delimiter: str
        Character or regex pattern to treat as the delimiter while reading with
        pandas.read_csv. Only used if `data_format` is "csv".
        Default: '|'
    na_values: Hashable, Iterable of Hashable or dict of {Hashable: Iterable}, optional
        Additional strings to recognize as Na/NaN while reading input file with
        pandas.read_csv. Only used if `data_format` is "csv".
        For more details see: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
    null_label: str
        Label marking non valid report identifiers; rows with this label as
        report_id are dropped.
        Default: null
    imodel: str, optional
        Forwarded to the string conversion selected by `from_str` or `to_str`.
    from_str: bool, optional
        If True, the combined table is passed through ``convert_from_str_df``.
    to_str: bool, optional
        If True and `from_str` is not True, the combined table is passed
        through ``convert_to_str_df``.
    **kwargs
        Additional keyword arguments forwarded to the pandas reader. For
        "csv" they take precedence over the defaults set by this function.

    Returns
    -------
    cdm_reader_mapper.DataBundle

    Raises
    ------
    ValueError
        If `data_format` is not supported or if no table with data is found.
    FileNotFoundError
        If `source` is neither an existing file nor an existing directory.

    See Also
    --------
    read: Read either original marine-meteorological data or MDF data or CDM tables from disk.
    read_data : Read MDF data and validation mask from disk.
    read_mdf : Read original marine-meteorological data from disk.
    write: Write either MDF data or CDM tables to disk.
    write_tables: Write CDM tables to disk.
    write_data : Write MDF data and validation mask to disk.
    """
    logger = logging_hdlr.init_logger(__name__, level="INFO")

    supported_file_types = get_args(SupportedFileTypes)
    if data_format not in supported_file_types:
        raise ValueError(
            f"data_format must be one of {supported_file_types}, not {data_format}."
        )

    if data_format == "csv":
        # Read every field as object and keep pandas from guessing nulls;
        # options given by the caller win over these defaults.
        csv_defaults = {
            "delimiter": delimiter,
            "dtype": "object",
            "na_values": na_values,
            "keep_default_na": False,
        }
        kwargs = {**csv_defaults, **kwargs}

    cdm_subset = get_cdm_subset(cdm_subset)
    if not extension:
        extension = data_format

    source_is_file = os.path.isfile(source)
    if not source_is_file and not os.path.isdir(source):
        raise FileNotFoundError(
            f"Source is neither a valid file name nor a valid directory path: {source}."
        )

    if source_is_file:
        single_table = _read_single_file(
            source,
            data_format=data_format,
            cdm_subset=cdm_subset,
            col_subset=col_subset,
            null_label=null_label,
            **kwargs,
        )
        df_list = [] if single_table.empty else [single_table]
    else:
        df_list = _read_multiple_files(
            source,
            data_format=data_format,
            prefix=prefix,
            suffix=suffix,
            extension=extension,
            separator=separator,
            cdm_subset=cdm_subset,
            col_subset=col_subset,
            null_label=null_label,
            logger=logger,
            **kwargs,
        )

    if not df_list:
        raise ValueError("All tables empty in file system.")

    # The frames are indexed by report_id, so a column-wise outer
    # concatenation lines the tables up per report.
    merged = pd.concat(df_list, axis=1, join="outer").reset_index(drop=True)

    if from_str is True:
        merged = convert_from_str_df(merged, imodel, cdm_subset=cdm_subset)
    elif to_str is True:
        merged = convert_to_str_df(merged, imodel, cdm_subset=cdm_subset)

    return DataBundle(data=merged, columns=merged.columns, mode="tables")