# Source code for cdm_reader_mapper.mdf_reader.reader

"""Common Data Model (CDM) MDF reader."""

from __future__ import annotations

from pathlib import Path
from typing import Callable, Any, get_args

import pandas as pd

from cdm_reader_mapper import DataBundle

from ..common.json_dict import open_json_file

from .utils.filereader import FileReader
from .utils.utilities import validate_arg

from .utils.utilities import as_list, as_path, read_csv, read_parquet, read_feather

from ..properties import SupportedFileTypes

# Reader function for each supported on-disk format, keyed by the
# ``data_format`` name that ``read_data`` uses to look it up.
READERS = dict(
    csv=read_csv,
    parquet=read_parquet,
    feather=read_feather,
)


def validate_read_mdf_args(
    *,
    source: str | Path,
    imodel: str | None = None,
    ext_schema_path: str | Path | None = None,
    ext_schema_file: str | Path | None = None,
    year_init: int | None = None,
    year_end: int | None = None,
    chunksize: int | None = None,
    skiprows: int | None = None,
):
    """
    Check the arguments of an MDF read request before any data is read.

    The checks run in a fixed order: source path, data model selection,
    ``chunksize``, ``skiprows`` and finally the year range. The first failing
    check raises.

    Parameters
    ----------
    source : str or Path
        File to be read; it must exist.
    imodel : str, optional
        Name of an input data model.
    ext_schema_path : str or Path, optional
        Path to an external schema.
    ext_schema_file : str or Path, optional
        External schema file.
    year_init : int, optional
        Lower bound of the year range.
    year_end : int, optional
        Upper bound of the year range.
    chunksize : int, optional
        Must be strictly positive when given.
    skiprows : int, optional
        Must be zero or positive when given.

    Raises
    ------
    FileNotFoundError
        If the source file does not exist.
    ValueError
        If none of ``imodel``, ``ext_schema_path`` and ``ext_schema_file`` is
        given, or if a numeric constraint is violated.
    """
    source_path = as_path(source, "source")
    if not source_path.exists():
        raise FileNotFoundError(f"Source file not found: {source_path}")

    # At least one way of selecting the data model is mandatory.
    if not (imodel or ext_schema_path or ext_schema_file):
        raise ValueError(
            "One of imodel or ext_schema_path/ext_schema_file must be provided"
        )

    # (name, value, out-of-range predicate, error message); processed in order
    # so that chunksize problems are reported before skiprows problems.
    numeric_checks = (
        (
            "chunksize",
            chunksize,
            lambda value: value <= 0,
            "chunksize must be a positive integer",
        ),
        (
            "skiprows",
            skiprows,
            lambda value: value < 0,
            "skiprows must be >= 0",
        ),
    )
    for arg_name, arg_value, out_of_range, message in numeric_checks:
        validate_arg(arg_name, arg_value, int)
        if arg_value is not None and out_of_range(arg_value):
            raise ValueError(message)

    # The range is only checked when both bounds are supplied.
    both_years_given = year_init is not None and year_end is not None
    if both_years_given and year_init > year_end:
        raise ValueError("year_init must be <= year_end")


def read_mdf(
    source,
    imodel: str | None = None,
    ext_schema_path: str | None = None,
    ext_schema_file: str | None = None,
    ext_table_path: str | None = None,
    year_init: int | None = None,
    year_end: int | None = None,
    encoding: str | None = None,
    chunksize: int | None = None,
    skiprows: int | None = None,
    convert_flag: bool = True,
    converter_dict: dict | None = None,
    converter_kwargs: dict | None = None,
    decode_flag: bool = True,
    decoder_dict: dict | None = None,
    validate_flag: bool = True,
    sections: str | list | None = None,
    excludes: str | list | None = None,
    pd_kwargs: dict | None = None,
    xr_kwargs: dict | None = None,
) -> DataBundle:
    """Read data files compliant with a user specific data model.

    Reads a data file to a pandas DataFrame using a pre-defined data model.
    Read data is validated against its data model producing a boolean mask
    on output.

    The data model needs to be input to the module as a named model
    (included in the module) or as the path to a valid data model.

    Parameters
    ----------
    source: str
        The file (including path) to be read.
    imodel: str, optional
        Name of internally available input data model.
        e.g. icoads_r300_d704
    ext_schema_path: str, optional
        The path to the external input data model schema file.
        The schema file must have the same name as the directory.
        One of ``imodel`` and ``ext_schema_path`` or ``ext_schema_file``
        must be set.
    ext_schema_file: str, optional
        The external input data model schema file.
        One of ``imodel`` and ``ext_schema_path`` or ``ext_schema_file``
        must be set.
    ext_table_path: str, optional
        External table path; forwarded to the validation step.
    year_init: int, optional
        Left border of time axis.
    year_end: int, optional
        Right border of time axis.
    encoding : str, optional
        The encoding of the input file.
        Overrides the value in the imodel schema file.
    chunksize : int, optional
        Number of reports per chunk.
    skiprows : int, optional
        Number of initial rows to skip from file, default: 0
    convert_flag: bool, default: True
        If True convert entries by using a pre-defined data model.
    converter_dict: dict of {Hashable: func}, optional
        Functions for converting values in specific columns.
        If None use information from a pre-defined data model.
    converter_kwargs: dict of {Hashable: kwargs}, optional
        Key-word arguments for converting values in specific columns.
        If None use information from a pre-defined data model.
    decode_flag: bool, default: True
        If True decode entries by using a pre-defined data model.
    decoder_dict: dict of {Hashable: func}, optional
        Functions for decoding values in specific columns.
        If None use information from a pre-defined data model.
    validate_flag: bool, default: True
        Validate data entries by using a pre-defined data model.
    sections : str or list, optional
        Subset of data model sections to output.
        If None read pre-defined data model sections.
    excludes : str or list, optional
        Sections to exclude; forwarded to the section selection together
        with ``sections``.
    pd_kwargs: dict, optional
        Additional pandas arguments. ``encoding``, ``chunksize`` and
        ``skiprows`` are added as defaults when not already present.
        The passed dictionary itself is not modified.
    xr_kwargs: dict, optional
        Additional xarray arguments

    Returns
    -------
    cdm_reader_mapper.DataBundle

    Raises
    ------
    FileNotFoundError
        If ``source`` does not exist.
    ValueError
        If no data model is given or ``chunksize``, ``skiprows`` or the year
        range are invalid (see ``validate_read_mdf_args``).

    See Also
    --------
    read: Read either original marine-meteorological or MDF data or CDM tables from disk.
    read_data : Read MDF data and validation mask from disk.
    read_tables : Read CDM tables from disk.
    write: Write either MDF data or CDM tables to disk.
    write_data : Write MDF data and validation mask to disk.
    write_tables : Write CDM tables to disk.
    """
    # ``None`` means "skip nothing".
    skiprows = skiprows or 0

    validate_read_mdf_args(
        source=source,
        imodel=imodel,
        ext_schema_path=ext_schema_path,
        ext_schema_file=ext_schema_file,
        year_init=year_init,
        year_end=year_end,
        chunksize=chunksize,
        skiprows=skiprows,
    )

    # Work on a copy: the defaults added below must not leak into a
    # dictionary owned by the caller.
    pd_kwargs = dict(pd_kwargs) if pd_kwargs else {}
    pd_kwargs.setdefault("encoding", encoding)
    pd_kwargs.setdefault("chunksize", chunksize)
    pd_kwargs.setdefault("skiprows", skiprows)

    xr_kwargs = xr_kwargs or {}

    convert_kwargs = dict(
        convert_flag=convert_flag,
        converter_dict=converter_dict,
        converter_kwargs=converter_kwargs,
    )

    decode_kwargs = dict(
        decode_flag=decode_flag,
        decoder_dict=decoder_dict,
    )

    validate_kwargs = dict(
        validate_flag=validate_flag,
        ext_table_path=ext_table_path,
    )

    sections = as_list(sections)
    excludes = as_list(excludes)

    validate_arg("sections", sections, list)
    validate_arg("excludes", excludes, list)

    select_kwargs = dict(
        sections=sections,
        excludes=excludes,
        year_init=year_init,
        year_end=year_end,
    )

    filereader = FileReader(
        imodel=imodel,
        ext_schema_path=ext_schema_path,
        ext_schema_file=ext_schema_file,
    )

    return filereader.read(
        source=source,
        pd_kwargs=pd_kwargs,
        xr_kwargs=xr_kwargs,
        convert_kwargs=convert_kwargs,
        decode_kwargs=decode_kwargs,
        validate_kwargs=validate_kwargs,
        select_kwargs=select_kwargs,
    )
def _read_data( data_file: str, mask_file: str | None, reader: Callable[..., Any], col_subset: str | list | tuple | None, data_kwargs: dict, mask_kwargs: dict, ): """Helper function for reading data files from disk.""" data, info = reader( data_file, col_subset=col_subset, **data_kwargs, ) if mask_file is None: mask = pd.DataFrame() else: mask, _ = reader( mask_file, col_subset=col_subset, column_names=info["columns"], **mask_kwargs, ) return data, mask, info
def read_data(
    data_file: str,
    mask_file: str | None = None,
    info_file: str | None = None,
    data_format: SupportedFileTypes = "parquet",
    imodel: str | None = None,
    col_subset: str | list | tuple | None = None,
    encoding: str | None = None,
    delimiter: str | None = None,
    **kwargs,
) -> DataBundle:
    """Read MDF data which is already on a pre-defined data model.

    Parameters
    ----------
    data_file: str
        The data file (including path) to be read.
    mask_file: str, optional
        The validation file (including path) to be read.
    info_file: str, optional
        The information file (including path) to be read.
        Only used when ``data_format`` is "csv".
    data_format: {"csv", "parquet", "feather"}, default: "parquet"
        Format of input data file(s).
    imodel: str, optional
        Name of internally available input data model.
        e.g. icoads_r300_d704
    col_subset: str, tuple or list, optional
        Specify the section or sections of the file to read.

        - For multiple sections of the tables:
          e.g col_subset = [columns0,...,columnsN]

        - For a single section:
          e.g. list type object col_subset = [columns]

        Column labels could be both string or tuple.
    encoding : str, optional
        The encoding of the input file.
        Overrides the value stored in ``info_file`` (csv only).
    delimiter : str, optional
        The delimiter used in the input file(s).
        Overrides the value stored in ``info_file`` (csv only).
    **kwargs
        Additional keyword arguments forwarded to the reader for both the
        data and the mask file.

    Returns
    -------
    cdm_reader_mapper.DataBundle

    Raises
    ------
    ValueError
        If ``data_format`` is not a supported file type.

    See Also
    --------
    read: Read original marine-meteorological data as well as MDF data or CDM tables from disk.
    read_mdf : Read original marine-meteorological data from disk.
    read_tables : Read CDM tables from disk.
    write: Write both MDF data or CDM tables to disk.
    write_data : Write MDF data and validation mask to disk.
    write_tables : Write CDM tables to disk.
    """
    supported_file_types = get_args(SupportedFileTypes)
    if data_format not in supported_file_types:
        raise ValueError(
            f"data_format must be one of {supported_file_types}, not {data_format}."
        )

    # Independent copies so data- and mask-specific defaults do not mix.
    data_kwargs = kwargs.copy()
    mask_kwargs = kwargs.copy()
    parse_dates = False

    if data_format == "csv":
        info_dict = open_json_file(info_file) if info_file else {}

        dtype = info_dict.get("dtypes", "object")
        parse_dates = info_dict.get("parse_dates", False)
        # Explicit arguments win over the values stored in the info file.
        encoding = encoding or info_dict.get("encoding")
        delimiter = delimiter or info_dict.get("delimiter")

        data_kwargs.setdefault("dtype", dtype)
        data_kwargs.setdefault("parse_dates", parse_dates)
        data_kwargs.setdefault("encoding", encoding)
        # The delimiter describes the data file as well as the mask file, so
        # the data reader needs it too.
        data_kwargs.setdefault("delimiter", delimiter)

        mask_kwargs.setdefault("dtype", "boolean")
        mask_kwargs.setdefault("delimiter", delimiter)

    data, mask, info = _read_data(
        data_file=data_file,
        mask_file=mask_file,
        reader=READERS[data_format],
        col_subset=col_subset,
        data_kwargs=data_kwargs,
        mask_kwargs=mask_kwargs,
    )

    return DataBundle(
        data=data,
        columns=info["columns"],
        dtypes=info["dtypes"],
        parse_dates=parse_dates,
        mask=mask,
        imodel=imodel,
        encoding=encoding,
    )