Source code for cdm_reader_mapper.mdf_reader.reader

"""Common Data Model (CDM) MDF reader."""

from __future__ import annotations
from collections.abc import Callable, Iterable
from pathlib import Path
from typing import Any, get_args

import pandas as pd

from cdm_reader_mapper import DataBundle

from ..common.json_dict import open_json_file
from ..properties import SupportedFileTypes
from .utils.filereader import FileReader
from .utils.utilities import as_list, as_path, read_csv, read_feather, read_parquet, validate_arg


# Dispatch table from data-format name to the function that loads that format.
# Per the annotation, each reader returns a ``(data, info)`` pair where ``data``
# is a DataFrame or an iterable of DataFrames and ``info`` is a dictionary.
# ``read_data`` selects its reader from here by the ``data_format`` argument.
READERS: dict[str, Callable[..., tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]]] = {
    "csv": read_csv,
    "parquet": read_parquet,
    "feather": read_feather,
}


def validate_read_mdf_args(
    *,
    source: str | Path,
    imodel: str | None = None,
    ext_schema_path: str | Path | None = None,
    ext_schema_file: str | Path | None = None,
    year_init: int | None = None,
    year_end: int | None = None,
    chunksize: int | None = None,
    skiprows: int | None = None,
) -> None:
    """
    Check the arguments of an MDF read request before any data is read.

    The checks run in a fixed order (source, schema selection, chunksize,
    skiprows, years) and the first failing check raises.

    Parameters
    ----------
    source : str or Path-like
        Source of input dataset.
    imodel : str, optional
        Name of data model, e.g. icoads_r300_d721.
    ext_schema_path : str or Path-like, optional
        Directory of external schema file.
    ext_schema_file : str or Path-like, optional
        Path of external schema file.
    year_init : int, optional
        Initial valid year.
    year_end : int, optional
        End valid year.
    chunksize : int, optional
        Number of lines to read from the file per chunk.
    skiprows : int, optional
        Number of lines to skip at the start of the file.

    Raises
    ------
    FileNotFoundError
        If the source file does not exist.
    ValueError
        - If none of `imodel`, `ext_schema_path` and `ext_schema_file` is provided.
        - If `chunksize` is 0 or negative.
        - If `skiprows` is negative.
        - If `year_init` is greater than `year_end`.

    Notes
    -----
    Path conversion is delegated to ``as_path`` and type checks to
    ``validate_arg``; any exception raised by those helpers propagates
    unchanged.
    """
    source_path = as_path(source, "source")
    if not source_path.exists():
        raise FileNotFoundError(f"Source file not found: {source_path}")

    # A data model must be selectable either by name or via an external schema.
    schema_given = imodel or ext_schema_path or ext_schema_file
    if not schema_given:
        raise ValueError("One of imodel or ext_schema_path/ext_schema_file must be provided")

    validate_arg("chunksize", chunksize, int)
    if chunksize is not None:
        if chunksize <= 0:
            raise ValueError("chunksize must be a real positive integer")

    validate_arg("skiprows", skiprows, int)
    if skiprows is not None:
        if skiprows < 0:
            raise ValueError("skiprows must be a positive integer.")

    validate_arg("year_init", year_init, int)
    validate_arg("year_end", year_end, int)

    # The range check only applies when both bounds are supplied.
    both_years_given = year_init is not None and year_end is not None
    if both_years_given and year_init > year_end:
        raise ValueError("year_init must be <= year_end")
def read_mdf(
    source: str,
    imodel: str | None = None,
    ext_schema_path: str | Path | None = None,
    ext_schema_file: str | Path | None = None,
    ext_table_path: str | Path | None = None,
    year_init: int | None = None,
    year_end: int | None = None,
    encoding: str | None = None,
    chunksize: int | None = None,
    skiprows: int | None = None,
    convert_flag: bool = True,
    converter_dict: dict[str, Any] | None = None,
    converter_kwargs: dict[str, Any] | None = None,
    decode_flag: bool = True,
    decoder_dict: dict[str, Any] | None = None,
    validate_flag: bool = True,
    sections: str | list[str] | None = None,
    excludes: str | list[str] | None = None,
    pd_kwargs: dict[str, Any] | None = None,
    xr_kwargs: dict[str, Any] | None = None,
) -> DataBundle:
    """
    Read data files compliant with a user specific data model.

    Reads a data file to a pandas DataFrame using a pre-defined data model.
    Read data is validated against its data model producing a boolean mask on output.

    The data model needs to be input to the module as a named model
    (included in the module) or as the path to a valid data model.

    Parameters
    ----------
    source : str
        The file (including path) to be read.
    imodel : str, optional
        Name of internally available input data model, e.g. icoads_r300_d704.
    ext_schema_path : str or Path-like, optional
        The path to the external input data model schema file.
        The schema file must have the same name as the directory.
        One of ``imodel`` and ``ext_schema_path`` or ``ext_schema_file`` must be set.
    ext_schema_file : str or Path-like, optional
        The external input data model schema file.
        One of ``imodel`` and ``ext_schema_path`` or ``ext_schema_file`` must be set.
    ext_table_path : str or Path-like, optional
        The path to the external table file.
        The table file must have the same name as the directory.
    year_init : int, optional
        Left border of time axis.
    year_end : int, optional
        Right border of time axis.
    encoding : str, optional
        The encoding of the input file. Overrides the value in the imodel schema file.
    chunksize : int, optional
        Number of reports per chunk.
    skiprows : int, optional
        Number of initial rows to skip from file. ``None`` is treated as 0.
    convert_flag : bool, default: True
        If True convert entries by using a pre-defined data model.
    converter_dict : dict of {Hashable: func}, optional
        Functions for converting values in specific columns.
        If None use information from a pre-defined data model.
    converter_kwargs : dict of {Hashable: kwargs}, optional
        Key-word arguments for converting values in specific columns.
        If None use information from a pre-defined data model.
    decode_flag : bool, default: True
        If True decode entries by using a pre-defined data model.
    decoder_dict : dict of {Hashable: func}, optional
        Functions for decoding values in specific columns.
        If None use information from a pre-defined data model.
    validate_flag : bool, default: True
        Validate data entries by using a pre-defined data model.
    sections : str or list of str, optional
        Subset of data model sections to output.
        If None read pre-defined data model sections.
    excludes : str or list of str, optional
        MDF sections to exclude.
    pd_kwargs : dict, optional
        Additional pandas arguments. Entries given here take precedence over
        ``encoding``, ``chunksize`` and ``skiprows``. The dictionary is copied,
        never modified in place.
    xr_kwargs : dict, optional
        Additional xarray arguments. The dictionary is copied, never modified
        in place.

    Returns
    -------
    cdm_reader_mapper.DataBundle
        DataBundle containing MDF data.

    Raises
    ------
    FileNotFoundError
        If ``source`` does not exist (raised by ``validate_read_mdf_args``).
    ValueError
        If the argument combination is rejected by ``validate_read_mdf_args``.

    See Also
    --------
    read: Read either original marine-meteorological or MDF data or CDM tables from disk.
    read_data : Read MDF data and validation mask from disk.
    read_tables : Read CDM tables from disk.
    write: Write either MDF data or CDM tables to disk.
    write_data : Write MDF data and validation mask to disk.
    write_tables : Write CDM tables to disk.
    """
    skiprows = skiprows or 0

    validate_read_mdf_args(
        source=source,
        imodel=imodel,
        ext_schema_path=ext_schema_path,
        ext_schema_file=ext_schema_file,
        year_init=year_init,
        year_end=year_end,
        chunksize=chunksize,
        skiprows=skiprows,
    )

    # Copy before filling in defaults: the setdefault calls below must not
    # write into a dictionary the caller still owns and may reuse.
    pd_kwargs = dict(pd_kwargs) if pd_kwargs else {}
    pd_kwargs.setdefault("encoding", encoding)
    pd_kwargs.setdefault("chunksize", chunksize)
    pd_kwargs.setdefault("skiprows", skiprows)

    xr_kwargs = dict(xr_kwargs) if xr_kwargs else {}

    convert_kwargs = dict(
        convert_flag=convert_flag,
        converter_dict=converter_dict,
        converter_kwargs=converter_kwargs,
    )

    decode_kwargs = dict(
        decode_flag=decode_flag,
        decoder_dict=decoder_dict,
    )

    validate_kwargs = dict(
        validate_flag=validate_flag,
        ext_table_path=ext_table_path,
    )

    # Normalise the section selectors first, then type-check the result.
    sections = as_list(sections)
    excludes = as_list(excludes)

    validate_arg("sections", sections, list)
    validate_arg("excludes", excludes, list)

    select_kwargs = dict(
        sections=sections,
        excludes=excludes,
        year_init=year_init,
        year_end=year_end,
    )

    filereader = FileReader(
        imodel=imodel,
        ext_schema_path=ext_schema_path,
        ext_schema_file=ext_schema_file,
    )

    return filereader.read(
        source=source,
        pd_kwargs=pd_kwargs,
        xr_kwargs=xr_kwargs,
        convert_kwargs=convert_kwargs,
        decode_kwargs=decode_kwargs,
        validate_kwargs=validate_kwargs,
        select_kwargs=select_kwargs,
    )
def _read_data(
    data_file: str,
    mask_file: str | None,
    reader: Callable[..., Any],
    col_subset: str | list[str] | tuple[str] | None,
    data_kwargs: dict[str, Any],
    mask_kwargs: dict[str, Any],
) -> tuple[pd.DataFrame, pd.DataFrame, dict[str, Any]]:
    """
    Load a data file and, if given, its accompanying mask file.

    Parameters
    ----------
    data_file : str
        Path to data file.
    mask_file : str or None
        Path to mask file. If None, an empty DataFrame is returned as mask.
    reader : Callable
        Function used to read `data_file` and `mask_file`. It is called with
        the file path plus keyword arguments and must return a two-element
        result whose second element supports ``["columns"]``.
    col_subset : str or list of str or tuple of str or None
        Subset of columns, forwarded to `reader` for both files.
    data_kwargs : dict
        Keyword-arguments to read `data_file`.
    mask_kwargs : dict
        Keyword-arguments to read `mask_file`.

    Returns
    -------
    tuple of pd.DataFrame, pd.DataFrame and dict
        Data, mask and the information dictionary returned for the data file.
    """
    frame, meta = reader(data_file, col_subset=col_subset, **data_kwargs)

    if mask_file is not None:
        # The mask is read with the column names reported for the data file.
        flags, _ = reader(
            mask_file,
            col_subset=col_subset,
            column_names=meta["columns"],
            **mask_kwargs,
        )
        return frame, flags, meta

    return frame, pd.DataFrame(), meta
def read_data(
    data_file: str,
    mask_file: str | None = None,
    info_file: str | None = None,
    data_format: SupportedFileTypes = "parquet",
    imodel: str | None = None,
    col_subset: str | list[str] | tuple[str] | None = None,
    encoding: str | None = None,
    delimiter: str | None = None,
    **kwargs: Any,
) -> DataBundle:
    r"""
    Read MDF data which is already on a pre-defined data model.

    Parameters
    ----------
    data_file : str
        The data file (including path) to be read.
    mask_file : str, optional
        The validation file (including path) to be read.
    info_file : str, optional
        The information file (including path) to be read.
        Only consulted when ``data_format`` is "csv".
    data_format : {"csv", "parquet", "feather"}, default: "parquet"
        Format of input data file(s).
    imodel : str, optional
        Name of internally available input data model, e.g. icoads_r300_d704.
    col_subset : str, tuple or list, optional
        Specify the section or sections of the file to read.

        - For multiple sections of the tables:
          e.g col_subset = [columns0,...,columnsN]

        - For a single section:
          e.g. list type object col_subset = [columns]

        Column labels could be both string or tuple.
    encoding : str, optional
        The encoding of the input file. Takes precedence over the value
        stored in ``info_file``.
    delimiter : str, optional
        The delimiter used in the input file. Takes precedence over the value
        stored in ``info_file``. Applied to both the data file and the mask file.
    \**kwargs : Any
        Key-word arguments that will be passed to the read function for both
        the data file and the mask file.

    Returns
    -------
    cdm_reader_mapper.DataBundle
        DataBundle containing MDF data.

    Raises
    ------
    ValueError
        If ``data_format`` is not one of the supported file types.

    See Also
    --------
    read: Read original marine-meteorological data as well as MDF data or CDM tables from disk.
    read_mdf : Read original marine-meteorological data from disk.
    read_tables : Read CDM tables from disk.
    write: Write both MDF data or CDM tables to disk.
    write_data : Write MDF data and validation mask to disk.
    write_tables : Write CDM tables to disk.
    """
    supported_file_types = get_args(SupportedFileTypes)
    if data_format not in supported_file_types:
        raise ValueError(f"data_format must be one of {supported_file_types}, not {data_format}.")

    # Separate copies so reader defaults for the data file and the mask file
    # never bleed into each other or into the caller's kwargs.
    data_kwargs = kwargs.copy()
    mask_kwargs = kwargs.copy()
    parse_dates = False

    if data_format == "csv":
        info_dict = open_json_file(info_file) if info_file else {}
        dtype = info_dict.get("dtypes", "object")
        parse_dates = info_dict.get("parse_dates", False)
        # Explicit arguments win over the values recorded in the info file.
        encoding = encoding or info_dict.get("encoding")
        delimiter = delimiter or info_dict.get("delimiter")

        data_kwargs.setdefault("dtype", dtype)
        data_kwargs.setdefault("parse_dates", parse_dates)
        data_kwargs.setdefault("encoding", encoding)
        # The data file uses the same delimiter as the mask file. Only forward
        # it when one is known, so the reader's own default still applies
        # otherwise.
        if delimiter is not None:
            data_kwargs.setdefault("delimiter", delimiter)

        mask_kwargs.setdefault("dtype", "boolean")
        mask_kwargs.setdefault("delimiter", delimiter)

    data, mask, info = _read_data(
        data_file=data_file,
        mask_file=mask_file,
        reader=READERS[data_format],
        col_subset=col_subset,
        data_kwargs=data_kwargs,
        mask_kwargs=mask_kwargs,
    )

    return DataBundle(
        data=data,
        columns=info["columns"],
        dtypes=info["dtypes"],
        parse_dates=parse_dates,
        mask=mask,
        imodel=imodel,
        encoding=encoding,
    )