"""Common Data Model (CDM) MDF reader."""
from __future__ import annotations
from collections.abc import Callable, Iterable
from pathlib import Path
from typing import Any, get_args
import pandas as pd
from cdm_reader_mapper import DataBundle
from ..common.json_dict import open_json_file
from ..properties import SupportedFileTypes
from .utils.filereader import FileReader
from .utils.utilities import as_list, as_path, read_csv, read_feather, read_parquet, validate_arg
READERS: dict[str, Callable[..., tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]]] = {
"csv": read_csv,
"parquet": read_parquet,
"feather": read_feather,
}
[docs]
def validate_read_mdf_args(
*,
source: str | Path,
imodel: str | None = None,
ext_schema_path: str | Path | None = None,
ext_schema_file: str | Path | None = None,
year_init: int | None = None,
year_end: int | None = None,
chunksize: int | None = None,
skiprows: int | None = None,
) -> None:
"""
Validate arguments for reading an MDF file.
This function performs validation on file paths and numeric arguments
required for reading an MDF dataset.
Parameters
----------
source : str or Path-like
Source of input dataset.
imodel : str, optional
Name of data model, e.g. icoads_r300_d721.
ext_schema_path : str or Path-like, optional
Directory of external schema file.
ext_schema_file : str or Path-like, optional
Path of external schema file.
year_init : int, optional
Initial valid year.
year_end : int, optional
End valid year.
chunksize : int, optional
Number of lines to read from the file per chunk.
skiprows : int, optional
Number of lines to skip at the start of the file.
Raises
------
FileNotFoundError
If the source file does not exist.
ValueError
- If one of `imodel` or `ext_schema_path/ext_schema_file` is not provided.
- If `chunksize` is 0 or negative.
- If `skiprows` is negative.
- If `year_init` is greater than `year_end`.
- If any input parameter does not match requested types.
"""
source = as_path(source, "source")
if not source.exists():
raise FileNotFoundError(f"Source file not found: {source}")
if not imodel and not (ext_schema_path or ext_schema_file):
raise ValueError("One of imodel or ext_schema_path/ext_schema_file must be provided")
validate_arg("chunksize", chunksize, int)
if chunksize is not None and chunksize <= 0:
raise ValueError("chunksize must be a real positive integer")
validate_arg("skiprows", skiprows, int)
if skiprows is not None and skiprows < 0:
raise ValueError("skiprows must be a positive integer.")
validate_arg("year_init", year_init, int)
validate_arg("year_end", year_end, int)
if year_init is not None and year_end is not None:
if year_init > year_end:
raise ValueError("year_init must be <= year_end")
[docs]
def read_mdf(
source: str,
imodel: str | None = None,
ext_schema_path: str | Path | None = None,
ext_schema_file: str | Path | None = None,
ext_table_path: str | Path | None = None,
year_init: int | None = None,
year_end: int | None = None,
encoding: str | None = None,
chunksize: int | None = None,
skiprows: int | None = None,
convert_flag: bool = True,
converter_dict: dict[str, Any] | None = None,
converter_kwargs: dict[str, Any] | None = None,
decode_flag: bool = True,
decoder_dict: dict[str, Any] | None = None,
validate_flag: bool = True,
sections: str | list[str] | None = None,
excludes: str | list[str] | None = None,
pd_kwargs: dict[str, Any] | None = None,
xr_kwargs: dict[str, Any] | None = None,
) -> DataBundle:
"""
Read data files compliant with a user specific data model.
Reads a data file to a pandas DataFrame using a pre-defined data model.
Read data is validates against its data model producing a boolean mask
on output.
The data model needs to be input to the module as a named model
(included in the module) or as the path to a valid data model.
Parameters
----------
source : str
The file (including path) to be read.
imodel : str
Name of internally available input data model, e.g. icoads_r300_d704.
ext_schema_path : str or Path-like, optional
The path to the external input data model schema file.
The schema file must have the same name as the directory.
One of ``imodel`` and ``ext_schema_path`` or ``ext_schema_file`` must be set.
ext_schema_file : str or Path-like, optional
The external input data model schema file.
One of ``imodel`` and ``ext_schema_path`` or ``ext_schema_file`` must be set.
ext_table_path : str or Path-like, optional
The path to the external table file.
The table file must have the same name as the directory.
year_init : str or int, optional
Left border of time axis.
year_end : str or int, optional
Right border of time axis.
encoding : str, optional
The encoding of the input file. Overrides the value in the imodel schema file.
chunksize : int, optional
Number of reports per chunk.
skiprows : int, optional
Number of initial rows to skip from file.
convert_flag : bool, default: True
If True convert entries by using a pre-defined data model.
converter_dict : dict of {Hashable: func}, optional
Functions for converting values in specific columns.
If None use information from a pre-defined data model.
converter_kwargs : dict of {Hashable: kwargs}, optional
Key-word arguments for converting values in specific columns.
If None use information from a pre-defined data model.
decode_flag : bool, default: True
If True decode entries by using a pre-defined data model.
decoder_dict : dict of {Hashable: func}, optional
Functions for decoding values in specific columns.
If None use information from a pre-defined data model.
validate_flag : bool, default: True
Validate data entries by using a pre-defined data model.
sections : list, optional
List with subset of data model sections to output.
If None read pre-defined data model sections.
excludes : str or list of str, optional
MDF Sections to exclude.
pd_kwargs : dict, optional
Additional pandas arguments.
xr_kwargs : dict, optional
Additional xarray arguments.
Returns
-------
cdm_reader_mapper.DataBundle
DaaBundle containing MDF data.
See Also
--------
read: Read either original marine-meteorological or MDF data or CDM tables from disk.
read_data : Read MDF data and validation mask from disk.
read_tables : Read CDM tables from disk.
write: Write either MDF data or CDM tables to disk.
write_data : Write MDF data and validation mask to disk.
write_tables : Write CDM tables to disk.
"""
skiprows = skiprows or 0
validate_read_mdf_args(
source=source,
imodel=imodel,
ext_schema_path=ext_schema_path,
ext_schema_file=ext_schema_file,
year_init=year_init,
year_end=year_end,
chunksize=chunksize,
skiprows=skiprows,
)
pd_kwargs = pd_kwargs or {}
pd_kwargs.setdefault("encoding", encoding)
pd_kwargs.setdefault("chunksize", chunksize)
pd_kwargs.setdefault("skiprows", skiprows)
xr_kwargs = xr_kwargs or {}
convert_kwargs = dict(
convert_flag=convert_flag,
converter_dict=converter_dict,
converter_kwargs=converter_kwargs,
)
decode_kwargs = dict(
decode_flag=decode_flag,
decoder_dict=decoder_dict,
)
validate_kwargs = dict(
validate_flag=validate_flag,
ext_table_path=ext_table_path,
)
sections = as_list(sections)
excludes = as_list(excludes)
validate_arg("sections", sections, list)
validate_arg("excludes", excludes, list)
select_kwargs = dict(
sections=sections,
excludes=excludes,
year_init=year_init,
year_end=year_end,
)
filereader = FileReader(
imodel=imodel,
ext_schema_path=ext_schema_path,
ext_schema_file=ext_schema_file,
)
return filereader.read(
source=source,
pd_kwargs=pd_kwargs,
xr_kwargs=xr_kwargs,
convert_kwargs=convert_kwargs,
decode_kwargs=decode_kwargs,
validate_kwargs=validate_kwargs,
select_kwargs=select_kwargs,
)
def _read_data(
data_file: str,
mask_file: str | None,
reader: Callable[..., Any],
col_subset: str | list[str] | tuple[str] | None,
data_kwargs: dict[str, Any],
mask_kwargs: dict[str, Any],
) -> tuple[pd.DataFrame, pd.DataFrame, dict[str, Any]]:
"""
Helper function for reading data files from disk.
Parameters
----------
data_file : str
Path to data file.
mask_file : str
Path to mask file.
reader : Callable
Function to read `data_file` and/or `mask_file`.
col_subset : str or list of str or tuple of str
Subset of columns to select from `data_file` and/or `mask_file`.
data_kwargs : dict
Keyword-arguments to read `data_file`.
mask_kwargs : dict
Keyword-arguments to read `mask_file`.
Returns
-------
tuple of pd.DataFrame, pd.DataFrame and dict
Data as pd.DataFrame, mask as pd.DataFrame and information dictionary.
"""
data, info = reader(
data_file,
col_subset=col_subset,
**data_kwargs,
)
if mask_file is None:
mask = pd.DataFrame()
else:
mask, _ = reader(
mask_file,
col_subset=col_subset,
column_names=info["columns"],
**mask_kwargs,
)
return data, mask, info
[docs]
def read_data(
data_file: str,
mask_file: str | None = None,
info_file: str | None = None,
data_format: SupportedFileTypes = "parquet",
imodel: str | None = None,
col_subset: str | list[str] | tuple[str] | None = None,
encoding: str | None = None,
delimiter: str | None = None,
**kwargs: Any,
) -> DataBundle:
r"""
Read MDF data which is already on a pre-defined data model.
Parameters
----------
data_file : str
The data file (including path) to be read.
mask_file : str, optional
The validation file (including path) to be read.
info_file : str, optional
The information file (including path) to be read.
data_format : {"csv", "parquet", "feather"}, default: "parquet"
Format of input data file(s).
imodel : str, optional
Name of internally available input data model, e.g. icoads_r300_d704.
col_subset : str, tuple or list, optional
Specify the section or sections of the file to write.
- For multiple sections of the tables:
e.g col_subset = [columns0,...,columnsN]
- For a single section:
e.g. list type object col_subset = [columns]
Column labels could be both string or tuple.
encoding : str, optional
The encoding of the input file. Overrides the value in the imodel schema file.
delimiter : str, optional
The delimiter used in the input file. Overrides the value in the imodel schema file.
\**kwargs : Any
Key-word arguments that will be passed to read fuunction.
Returns
-------
cdm_reader_mapper.DataBundle
DataBundle containing MDF data.
See Also
--------
read: Read original marine-meteorological data as well as MDF data or CDM tables from disk.
read_mdf : Read original marine-meteorological data from disk.
read_tables : Read CDM tables from disk.
write: Write both MDF data or CDM tables to disk.
write_data : Write MDF data and validation mask to disk.
write_tables : Write CDM tables to disk.
"""
supported_file_types = get_args(SupportedFileTypes)
if data_format not in supported_file_types:
raise ValueError(f"data_format must be one of {supported_file_types}, not {data_format}.")
data_kwargs = kwargs.copy()
mask_kwargs = kwargs.copy()
parse_dates = False
if data_format == "csv":
info_dict = open_json_file(info_file) if info_file else {}
dtype = info_dict.get("dtypes", "object")
parse_dates = info_dict.get("parse_dates", False)
encoding = encoding or info_dict.get("encoding")
delimiter = delimiter or info_dict.get("delimiter")
data_kwargs.setdefault("dtype", dtype)
data_kwargs.setdefault("parse_dates", parse_dates)
data_kwargs.setdefault("encoding", encoding)
mask_kwargs.setdefault("dtype", "boolean")
mask_kwargs.setdefault("delimiter", delimiter)
data, mask, info = _read_data(
data_file=data_file,
mask_file=mask_file,
reader=READERS[data_format],
col_subset=col_subset,
data_kwargs=data_kwargs,
mask_kwargs=mask_kwargs,
)
return DataBundle(
data=data,
columns=info["columns"],
dtypes=info["dtypes"],
parse_dates=parse_dates,
mask=mask,
imodel=imodel,
encoding=encoding,
)