"""Auxiliary functions and class for reading, converting, decoding and validating MDF files."""
from __future__ import annotations
import logging
from collections.abc import Iterable, Mapping, Sequence
from dataclasses import replace
from pathlib import Path
from typing import Any
import dask # noqa: F401
import h5netcdf # noqa: F401
import h5py # noqa: F401
import pandas as pd
import xarray as xr
from cdm_reader_mapper.common.iterators import ProcessFunction, process_function
from cdm_reader_mapper.core.databundle import DataBundle
from .. import properties
from .convert_and_decode import convert_and_decode
from .parser import (
ParserConfig,
build_parser_config,
parse_netcdf,
parse_pandas,
update_pd_config,
update_xr_config,
)
from .utilities import remove_boolean_values
from .validators import validate
def _merge_kwargs(*dicts: Mapping[str, Any]) -> dict[str, Any]:
r"""
Merge multiple keyword-argument dictionarie.
Parameters
----------
\*dicts : Sequence[Mapping[str, Any]]
Sequence of dictionaries to be merged together.
Returns
-------
dict
A combined dictionary.
"""
merged = {}
for d in dicts:
for k in d:
if k in merged:
raise ValueError(f"Duplicate kwarg '{k}' in open_data()")
merged[k] = d[k]
return merged
def _apply_multiindex(df: pd.DataFrame) -> pd.DataFrame:
"""
Convert tuple-based columns to a pandas MultiIndex.
Parameters
----------
df : pd.DataFrame
Data for which tuple-based columns should be converted to a pandas MultiIndex.
Returns
-------
pd.DataFrame
Data with converted column type.
"""
if not df.columns.map(lambda x: isinstance(x, tuple)).all():
return df
df.columns = pd.MultiIndex.from_tuples(
[col if isinstance(col, tuple) else (None, col) for col in df.columns],
)
return df
def _select_years(
df: pd.DataFrame,
selection: tuple[int | None, int | None],
year_col: str | tuple[str, str] | None,
) -> pd.DataFrame:
"""
Filter rows of a DataFrame by a year range.
Parameters
----------
df : pd.DataFrame
Data for which rows should be selected.
selection : tuple of int or None
Left and right borders of range.
year_col : str or tuple of str
Column that contains year information.
Returns
-------
pd.DataFrame
Data with selected rows only.
"""
year_init, year_end = selection
if year_init is None and year_end is None:
return df
if year_col is None:
return df
years = pd.to_numeric(df[year_col], errors="coerce")
mask = pd.Series(True, index=df.index)
if year_init is not None:
mask &= years >= year_init
if year_end is not None:
mask &= years <= year_end
mask &= years.notna()
return df.loc[mask].reset_index(drop=True)
# Sphinx "[docs]" source-link marker (extraction residue, not code).
class FileReader:
    """
    Class to read marine-meteorological data.

    Provides a high-level interface to read, parse, filter, convert,
    decode, and validate data from multiple sources (FWF, CSV, NetCDF).

    Parameters
    ----------
    imodel : str or None
        Name of the data model (e.g., 'ICOADS').
    ext_schema_path : str or Path, optional
        Directory of external MDF schema file.
    ext_schema_file : str or Path, optional
        Path to external MDF schema file.
    """

    def __init__(
        self,
        imodel: str | None,
        ext_schema_path: str | Path | None = None,
        ext_schema_file: str | Path | None = None,
    ):
        """
        Initialize FileReader with a data model and parser configuration.

        Parameters
        ----------
        imodel : str or None
            Name of the data model (e.g., 'ICOADS').
        ext_schema_path : str or Path, optional
            Directory of external MDF schema file.
        ext_schema_file : str or Path, optional
            Path to external MDF schema file.
        """
        self.imodel = imodel
        self.config: ParserConfig = build_parser_config(
            imodel=imodel,
            ext_schema_path=ext_schema_path,
            ext_schema_file=ext_schema_file,
        )

    def _process_data(
        self,
        data: pd.DataFrame | Iterable[pd.DataFrame],
        convert_flag: bool = False,
        decode_flag: bool = False,
        converter_dict: dict[str, Any] | None = None,
        converter_kwargs: dict[str, Any] | None = None,
        decoder_dict: dict[str, Any] | None = None,
        validate_flag: bool = False,
        ext_table_path: str | None = None,
        sections: Sequence[str] | None = None,
        excludes: Sequence[str] | None = None,
        year_init: int | None = None,
        year_end: int | None = None,
        config: ParserConfig | None = None,
        parse_mode: str = "pandas",
    ) -> tuple[pd.DataFrame, pd.DataFrame, ParserConfig]:
        """
        Core processing of raw data: parse, filter, convert, decode, validate.

        Parameters
        ----------
        data : pandas.DataFrame or Iterable[pd.DataFrame]
            Input data.
        convert_flag : bool
            Whether to apply converters.
        decode_flag : bool
            Whether to apply decoders.
        converter_dict : dict, optional
            Mapping of columns to converter functions. Falls back to the
            parser configuration when empty or None.
        converter_kwargs : dict, optional
            Keyword arguments for converters. Falls back to the parser
            configuration when empty or None.
        decoder_dict : dict, optional
            Mapping of columns to decoder functions. Falls back to the
            parser configuration when empty or None.
        validate_flag : bool
            Whether to apply validation.
        ext_table_path : str, optional
            Path to external validation tables.
        sections : sequence of str, optional
            Sections to include.
        excludes : sequence of str, optional
            Sections to exclude.
        year_init : int, optional
            Initial year for filtering.
        year_end : int, optional
            End year for filtering.
        config : ParserConfig, optional
            Parser configuration; defaults to the configuration built in
            ``__init__``.
        parse_mode : str
            Parsing backend ('pandas' or 'netcdf').

        Returns
        -------
        tuple of (data, mask, config)
            - data : pandas.DataFrame with parsed, filtered, converted data
            - mask : pandas.DataFrame with boolean mask for validation
              (all True when validation is switched off)
            - config : ParserConfig updated with final columns

        Raises
        ------
        ValueError
            If `parse_mode` is neither 'pandas' nor 'netcdf'.
        """
        config = config or self.config

        if parse_mode == "pandas":
            data = parse_pandas(data, config.order_specs, sections, excludes)
        elif parse_mode == "netcdf":
            data = parse_netcdf(data, config.order_specs, sections, excludes)
        else:
            raise ValueError("parse_mode must be 'pandas' or 'netcdf'")

        data = _apply_multiindex(data)

        # Model-specific settings are keyed by the leading part of the model
        # name (everything before the first underscore).
        if self.imodel:
            data_model = self.imodel.split("_")[0]
            factorize = properties.factorize.get(data_model)
            year_col = properties.year_column.get(data_model)
        else:
            factorize = None
            year_col = None

        if factorize:
            source_column = factorize["source"]
            target_column = factorize["target"]
            codes, _ = pd.factorize(data[source_column])
            # Attach the frame's own index: column assignment aligns a Series
            # by label, so a bare ``pd.Series(codes)`` (labels 0..n-1) would
            # yield NaN/misplaced codes whenever ``data.index`` differs.
            data[target_column] = pd.Series(codes, index=data.index)

        data = _select_years(data, (year_init, year_end), year_col)

        converter_dict = converter_dict or config.convert_decode["converter_dict"]
        converter_kwargs = converter_kwargs or config.convert_decode["converter_kwargs"]
        decoder_dict = decoder_dict or config.convert_decode["decoder_dict"]

        data = convert_and_decode(
            data,
            convert_flag=convert_flag,
            decode_flag=decode_flag,
            converter_dict=converter_dict,
            converter_kwargs=converter_kwargs,
            decoder_dict=decoder_dict,
        )

        if validate_flag:
            mask = validate(
                data,
                imodel=self.imodel,
                ext_table_path=ext_table_path,
                attributes=config.validation,
                disables=config.disable_reads,
            )
        else:
            # Without validation every cell is reported as valid.
            mask = pd.DataFrame(True, index=data.index, columns=data.columns)

        data = remove_boolean_values(data, config.dtypes)
        config = replace(config, columns=data.columns)

        if config.encoding not in [None, "utf-8"]:
            # Re-encode text columns with the source encoding and decode the
            # resulting bytes as UTF-8.
            # NOTE(review): this assumes the selected columns hold text only
            # and that the bytes are valid UTF-8 - confirm.
            object_columns = data.select_dtypes(include=["object", "string"]).columns
            for object_column in object_columns:
                data[object_column] = data[object_column].str.encode(config.encoding).str.decode("utf-8")

        return data, mask, config

    @process_function()
    def open_data(
        self,
        source: str,
        open_with: str = "pandas",
        pd_kwargs: dict[str, Any] | None = None,
        xr_kwargs: dict[str, Any] | None = None,
        convert_kwargs: dict[str, Any] | None = None,
        decode_kwargs: dict[str, Any] | None = None,
        validate_kwargs: dict[str, Any] | None = None,
        select_kwargs: dict[str, Any] | None = None,
    ) -> (
        tuple[pd.DataFrame, pd.DataFrame, ParserConfig]
        | tuple[Iterable[pd.DataFrame], Iterable[pd.DataFrame], ParserConfig]
    ):
        """
        Open and parse source data according to parser configuration.

        Parameters
        ----------
        source : str
            Path or pattern for input file(s).
        open_with : str
            Parser backend: 'pandas' or 'netcdf'.
        pd_kwargs : dict, optional
            Additional key-word arguments for parsing pandas-readable data.
            The passed dictionary is copied, not modified.
        xr_kwargs : dict, optional
            Additional key-word arguments for parsing xarray-readable data.
        convert_kwargs : dict, optional
            Additional key-word arguments for data conversion.
        decode_kwargs : dict, optional
            Additional key-word arguments for data decoding.
        validate_kwargs : dict, optional
            Additional key-word arguments for data validation.
        select_kwargs : dict, optional
            Additional key-word arguments for selecting/filtering data.

        Returns
        -------
        tuple
            (data, mask, config) or chunked equivalents if using Iterable[pd.DataFrame].

        Raises
        ------
        ValueError
            If `open_with` is neither 'pandas' nor 'netcdf', or if the same
            key occurs in more than one of the convert/decode/validate/select
            keyword dictionaries.
        """

        @process_function()
        def _open_data() -> ProcessFunction:
            """
            Open source data according to parser configuration.

            Returns
            -------
            ProcessFunction
                Containing data, mask and a configuration dictionary.
            """
            # ``to_parse`` and ``func_kwargs`` are closure variables that are
            # bound further down, before this function is called.
            return ProcessFunction(
                data=to_parse,
                func=self._process_data,
                func_kwargs=func_kwargs,
                makecopy=False,
            )

        # Work on copies so the defaults set below never leak into the
        # caller's dictionaries.
        pd_kwargs = dict(pd_kwargs or {})
        xr_kwargs = dict(xr_kwargs or {})
        convert_kwargs = convert_kwargs or {}
        decode_kwargs = decode_kwargs or {}
        validate_kwargs = validate_kwargs or {}
        select_kwargs = select_kwargs or {}

        func_kwargs = _merge_kwargs(
            convert_kwargs,
            decode_kwargs,
            validate_kwargs,
            select_kwargs,
        )
        func_kwargs["parse_mode"] = open_with

        if open_with == "netcdf":
            to_parse = xr.open_mfdataset(source, **xr_kwargs).squeeze()
            config = update_xr_config(to_parse, self.config)
        elif open_with == "pandas":
            config = update_pd_config(pd_kwargs, self.config)
            pd_kwargs["encoding"] = config.encoding
            # Caller-supplied values win; these are only fallbacks.
            pd_kwargs.setdefault("widths", [properties.MAX_FULL_REPORT_WIDTH])
            pd_kwargs.setdefault("header", None)
            pd_kwargs.setdefault("quotechar", "\0")
            pd_kwargs.setdefault("escapechar", "\0")
            pd_kwargs.setdefault("dtype", object)
            pd_kwargs.setdefault("skip_blank_lines", False)
            to_parse = pd.read_fwf(source, **pd_kwargs)
        else:
            raise ValueError("open_with must be 'pandas' or 'netcdf'")

        func_kwargs["config"] = config

        result = _open_data()
        return tuple(result)

    def read(
        self,
        source: str,
        pd_kwargs: dict[str, Any] | None = None,
        xr_kwargs: dict[str, Any] | None = None,
        convert_kwargs: dict[str, Any] | None = None,
        decode_kwargs: dict[str, Any] | None = None,
        validate_kwargs: dict[str, Any] | None = None,
        select_kwargs: dict[str, Any] | None = None,
    ) -> DataBundle:
        """
        Read and process data from the given source.

        Parameters
        ----------
        source : str
            Path to input file(s).
        pd_kwargs : dict, optional
            Additional key-word arguments for parsing pandas-readable data.
        xr_kwargs : dict, optional
            Additional key-word arguments for parsing xarray-readable data.
        convert_kwargs : dict, optional
            Additional key-word arguments for data conversion.
        decode_kwargs : dict, optional
            Additional key-word arguments for data decoding.
        validate_kwargs : dict, optional
            Additional key-word arguments for data validation.
        select_kwargs : dict, optional
            Additional key-word arguments for selecting/filtering data.

        Returns
        -------
        DataBundle
            Container with processed data, mask, columns, dtypes, and metadata.

        Raises
        ------
        RuntimeError
            If ``open_data`` does not return a 3-tuple.

        Notes
        -----
        All kwargs are forwarded to ``open_data`` to customize the
        parsing, conversion, decoding, validation, and selection steps.
        The backend is looked up in ``properties.open_file`` by the data
        model name and defaults to 'pandas'.
        """
        logging.info("EXTRACTING DATA FROM MODEL: %s", self.imodel)
        logging.info("Reading and parsing source data...")

        if self.imodel is None:
            open_with = "pandas"
        else:
            open_with = properties.open_file.get(self.imodel, "pandas")

        result = self.open_data(
            source,
            open_with=open_with,
            pd_kwargs=pd_kwargs,
            xr_kwargs=xr_kwargs,
            convert_kwargs=convert_kwargs,
            decode_kwargs=decode_kwargs,
            validate_kwargs=validate_kwargs,
            select_kwargs=select_kwargs,
        )

        if not isinstance(result, tuple) or len(result) != 3:
            raise RuntimeError("open_data() must return (data, mask, config)")

        data, mask, config = result

        return DataBundle(
            data=data,
            columns=config.columns,
            dtypes=config.dtypes,
            parse_dates=config.parse_dates,
            encoding="utf-8",
            mask=mask,
            imodel=self.imodel,
        )