Source code for cdm_reader_mapper.mdf_reader.writer

"""Common Data Model (CDM) MDF writer."""

from __future__ import annotations
import json
import logging
from collections.abc import Iterable, Sequence
from pathlib import Path
from typing import Any, get_args

import pandas as pd

from ..common import get_filename
from ..common.iterators import (
    ParquetStreamReader,
    is_valid_iterator,
    parquet_stream_from_iterable,
)
from ..properties import SupportedFileTypes
from .utils.utilities import join, update_column_names, update_dtypes


# Map every supported output format to the name of the ``pandas.DataFrame``
# method that writes it (each method is named ``to_<format>``).
WRITERS = {fmt: f"to_{fmt}" for fmt in ("csv", "parquet", "feather")}


def _validate_write_inputs(
    data: pd.DataFrame | Iterable[pd.DataFrame],
    mask: pd.DataFrame | Iterable[pd.DataFrame],
    data_format: str,
    supported: Sequence[str],
) -> None:
    """
    Validate writing inputs.

    Parameters
    ----------
    data : pd.DataFrame or Iterable of pd.DataFrame
        Data to export.
    mask : pd.DataFrame or Iterable of pd.DataFrame
        Validation mask to export.
    data_format : str
        Format of output data file(s).
    supported : Sequence of str
        Names of supported data models.

    Raises
    ------
    ValueError
        If `data_format` is not in `supported`.
        If type of `data` does not match type of `mask`.
    """
    if data_format not in supported:
        raise ValueError(f"data_format must be one of {supported}, not {data_format}.")

    if mask is not None and not isinstance(mask, type(data)):
        raise ValueError("Type of 'data' and type of 'mask' do not match.")


def _build_info(dtypes: dict[Any, Any], parse_dates: list[Any]) -> dict[str, Any]:
    """
    Build information dictionary.

    Parameters
    ----------
    dtypes : dict
        Dictionary of data types on `data`.
    parse_dates : list of Any
        Information of how to parse dates.

    Returns
    -------
    dict
        Dictionary including information about both data types and parse dates.
    """
    return {
        "dtypes": {k: str(v) for k, v in dtypes.items()},
        "parse_dates": [join(p) for p in parse_dates],
    }


def _get_write_kwargs(data_format: str, header: Any, mode: str, encoding: str, delimiter: str, kwargs: dict[str, Any]) -> dict[str, Any]:
    """
    Build keyword arguments for writing data in different formats.

    Parameters
    ----------
    data_format : str
        Output format, e.g. 'csv' or 'parquet'.
    header : Any
        Header configuration used for CSV output.
    mode : str
        File write mode (e.g. 'w', 'a').
    encoding : str
        Encoding used for text-based formats.
    delimiter : str
        Column separator used for CSV output.
    kwargs : dict[str, Any]
        Additional format-specific keyword arguments.

    Returns
    -------
    dict[str, Any]
        Keyword arguments passed to the underlying writer.

    Notes
    -----
    - For 'csv', returns full pandas-compatible kwargs.
    - For 'parquet', returns fixed engine and compression settings.
    """
    if data_format == "csv":
        return dict(
            header=header,
            mode=mode,
            index=False,
            sep=delimiter,
            encoding=encoding,
            **kwargs,
        )
    if data_format == "parquet":
        return dict(engine="pyarrow", compression="snappy")
    return {}


def _normalize_data_chunks(
    data: pd.DataFrame | Iterable[pd.DataFrame] | None,
) -> list[pd.DataFrame] | ParquetStreamReader:
    """
    Helper function to normalize data chunks.

    Parameters
    ----------
    data : pd.DataFrame of Iterable of pd.DataFrame or None
        Data to be normalized.

    Returns
    -------
    list of pd.DataFrame or ParquetStreamReader
        Normalized data.

    Raises
    ------
    TypeError
        If `data` has an unsupported data type.
    """
    if data is None:
        data = pd.DataFrame()
    if isinstance(data, pd.DataFrame):
        return [data]
    if is_valid_iterator(data):
        if not isinstance(data, ParquetStreamReader):
            data = parquet_stream_from_iterable(data)
        return data.copy()
    if isinstance(data, (list, tuple)):
        return parquet_stream_from_iterable(data)
    raise TypeError(f"Unsupported data type found: {type(data)}.")


def _select_columns(
    df: pd.DataFrame | pd.Series,
    col_subset: str | list[str] | tuple[str] | None,
) -> pd.DataFrame:
    """Restrict `df` to `col_subset` (when given) and always return a DataFrame."""
    if col_subset is not None:
        df = df[col_subset]
    # Selecting a single label returns a Series; the writers need a DataFrame.
    return df.to_frame() if isinstance(df, pd.Series) else df


def _update_info(
    info: dict[str, Any],
    columns: pd.Index,
    header: list[str],
    encoding: str,
) -> None:
    """Align the information dictionary (in place) with the first chunk's columns."""
    info["dtypes"] = update_dtypes(info["dtypes"], columns)
    if isinstance(info["dtypes"], dict):
        for col in columns:
            info["dtypes"] = update_column_names(info["dtypes"], col, join(col))
    # Only keep date specifications that match a column of the written header.
    info["parse_dates"] = [p for p in info["parse_dates"] if p in header]
    info["encoding"] = encoding


def _concat_chunks(chunks: list[pd.DataFrame]) -> pd.DataFrame:
    """Combine buffered chunks into one DataFrame; a lone chunk is returned unchanged."""
    return chunks[0] if len(chunks) == 1 else pd.concat(chunks)


def write_data(
    data: pd.DataFrame | Iterable[pd.DataFrame],
    mask: pd.DataFrame | Iterable[pd.DataFrame] | None = None,
    data_format: SupportedFileTypes = "parquet",
    dtypes: pd.Series | dict[Any, Any] | None = None,
    parse_dates: list[Any] | bool = False,
    encoding: str = "utf-8",
    out_dir: str = ".",
    prefix: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
    filename: str | dict[str, str] | None = None,
    separator: str | None = "_",
    col_subset: str | list[str] | tuple[str] | None = None,
    delimiter: str = ",",
    **kwargs: Any,
) -> None:
    r"""
    Write pandas.DataFrame to MDF file on file system.

    Parameters
    ----------
    data : pandas.DataFrame or Iterable[pd.DataFrame]
        Data to export.
    mask : pandas.DataFrame or Iterable[pd.DataFrame], optional
        Validation mask to export. Empty mask chunks are not written.
    data_format : {"csv", "parquet", "feather"}, default: "parquet"
        Format of output data file(s).
    dtypes : dict, optional
        Dictionary of data types on `data`.
        Dumped together with `parse_dates` to a json information file
        (only when `data_format` is 'csv').
    parse_dates : list | bool, default: False
        Information of how to parse dates in :py:attr:`data`.
        Dumped together with `dtypes` to a json information file
        (only when `data_format` is 'csv').
        For more information see :py:func:`pandas.read_csv`.
    encoding : str, default: "utf-8"
        Encoding passed to `to_csv` and recorded in the information file.
    out_dir : str, default: "."
        Path to the output directory.
    prefix : str, optional
        Prefix of file name structure: `<prefix>-data-*<suffix>.<extension>`.
    suffix : str, optional
        Suffix of file name structure: `<prefix>-data-*<suffix>.<extension>`.
    extension : str, optional
        Extension of file name structure: `<prefix>-data-*<suffix>.<extension>`.
        By default, extension depends on `data_format`.
    filename : str or dict, optional
        Name of the output file name(s).
        NOTE: currently not used by this function; output file names are
        always built from `prefix`, `suffix` and `extension`.
    separator : str, optional
        Separator to join the file name pattern components (default "_").
    col_subset : str, tuple or list, optional
        Specify the section or sections of the file to write.

        - For multiple sections of the tables:
          e.g col_subset = [columns0,...,columnsN]

        - For a single section:
          e.g. list type object col_subset = [columns]

        Column labels could be both string or tuple.
    delimiter : str, default: ","
        Field delimiter passed as ``sep`` to `to_csv`; only used when
        `data_format` is 'csv'.
    \**kwargs : Any
        Additional keyword-arguments passed to `to_csv` when `data_format` is 'csv'.

    Raises
    ------
    ValueError
        If `data_format` is not one of 'csv', 'parquet' or 'feather'.
        If type of `data` and type of `mask` do not match.
        If `data` and `mask` yield a different number of chunks.

    See Also
    --------
    write : Write either MDF data or CDM tables to disk.
    write_tables : Write CDM tables to disk.
    read : Read either original marine-meteorological data or MDF data or CDM tables from disk.
    read_data : Read MDF data and validation mask from disk.
    read_mdf : Read original marine-meteorological data from disk.
    read_tables : Read CDM tables from disk.

    Notes
    -----
    Use this function after reading MDF data.

    CSV output is streamed chunk by chunk (first chunk with mode 'w', the
    rest appended). ``to_parquet``/``to_feather`` always overwrite their
    target, so for those formats all chunks are collected in memory and
    written once.
    """
    supported_file_types = get_args(SupportedFileTypes)
    _validate_write_inputs(data, mask, data_format, supported_file_types)

    extension = extension or data_format

    dtypes = dtypes if isinstance(dtypes, (dict, pd.Series)) else {}
    parse_dates = [] if isinstance(parse_dates, bool) else parse_dates

    data_chunks = _normalize_data_chunks(data)
    if mask is None:
        # Pair every data chunk with "no mask". Normalizing ``None`` would yield
        # a single empty DataFrame, which breaks ``zip(strict=True)`` for
        # multi-chunk data and cannot be indexed with ``col_subset``.
        chunk_pairs = ((data_df, None) for data_df in data_chunks)
    else:
        chunk_pairs = zip(data_chunks, _normalize_data_chunks(mask), strict=True)

    info = _build_info(dtypes, parse_dates)

    logging.info("WRITING DATA TO FILES IN: %s", out_dir)
    out_dir_path = Path(out_dir)
    out_dir_path.mkdir(parents=True, exist_ok=True)

    filename_data = get_filename([prefix, "data", suffix], path=out_dir_path, extension=extension, separator=separator)
    filename_mask = get_filename([prefix, "mask", suffix], path=out_dir_path, extension=extension, separator=separator)
    filename_info = get_filename([prefix, "info", suffix], path=out_dir_path, extension="json", separator=separator)

    writer = WRITERS[data_format]
    stream_csv = data_format == "csv"
    buffered_data: list[pd.DataFrame] = []
    buffered_mask: list[pd.DataFrame] = []
    mask_started = False

    for i, (data_df, mask_df) in enumerate(chunk_pairs):
        data_df = _select_columns(data_df, col_subset)

        # Empty masks are never written, so skip column selection on them.
        has_mask = mask_df is not None and not mask_df.empty
        if has_mask:
            mask_df = _select_columns(mask_df, col_subset)
            has_mask = not mask_df.empty

        header = [join(c) for c in data_df.columns] if i == 0 else False
        if i == 0:
            _update_info(info, data_df.columns, header, encoding)

        if not stream_csv:
            buffered_data.append(data_df)
            if has_mask:
                buffered_mask.append(mask_df)
            continue

        data_kwargs = _get_write_kwargs(data_format, header, "w" if i == 0 else "a", encoding, delimiter, kwargs)
        getattr(data_df, writer)(filename_data, **data_kwargs)

        if has_mask:
            # The mask file tracks its own state so that it gets a header even
            # when the first mask chunk(s) were empty.
            mask_header = False if mask_started else [join(c) for c in mask_df.columns]
            mask_kwargs = _get_write_kwargs(
                data_format, mask_header, "a" if mask_started else "w", encoding, delimiter, kwargs
            )
            getattr(mask_df, writer)(filename_mask, **mask_kwargs)
            mask_started = True

    if buffered_data:
        write_kwargs = _get_write_kwargs(data_format, None, "w", encoding, delimiter, kwargs)
        getattr(_concat_chunks(buffered_data), writer)(filename_data, **write_kwargs)
        if buffered_mask:
            getattr(_concat_chunks(buffered_mask), writer)(filename_mask, **write_kwargs)

    if data_format == "csv":
        with Path(filename_info).open("w", encoding="utf-8") as f:
            json.dump(info, f, indent=4)