"""Common Data Model (CDM) MDF writer."""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Iterable, get_args
import pandas as pd
from .utils.utilities import join, update_column_names, update_dtypes
from ..common import get_filename
from ..common.iterators import (
ParquetStreamReader,
is_valid_iterator,
parquet_stream_from_iterable,
)
from ..properties import SupportedFileTypes
# Maps each ``data_format`` name to the name of the pandas.DataFrame method
# that ``write_data`` looks up with ``getattr`` to write that format.
WRITERS = {
    "csv": "to_csv",
    "parquet": "to_parquet",
    "feather": "to_feather",
}
def _normalize_data_chunks(
data: pd.DataFrame | Iterable[pd.DataFrame] | None,
) -> list | ParquetStreamReader:
"""Helper function to normalize data chunks."""
if data is None:
data = pd.DataFrame()
if isinstance(data, pd.DataFrame):
return [data]
if is_valid_iterator(data):
if not isinstance(data, ParquetStreamReader):
data = parquet_stream_from_iterable(data)
return data.copy()
if isinstance(data, (list, tuple)):
return parquet_stream_from_iterable(data)
raise TypeError(f"Unsupported data type found: {type(data)}.")
# [docs]  (Sphinx viewcode link label; HTML-extraction residue, not code)
def write_data(
    data: pd.DataFrame | Iterable[pd.DataFrame],
    mask: pd.DataFrame | Iterable[pd.DataFrame] | None = None,
    data_format: SupportedFileTypes = "parquet",
    dtypes: pd.Series | dict | None = None,
    parse_dates: list | bool = False,
    encoding: str = "utf-8",
    out_dir: str = ".",
    prefix: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
    filename: str | dict | None = None,
    separator: str | None = "_",
    col_subset: str | list[str] | tuple[str] | None = None,
    delimiter: str = ",",
    **kwargs,
) -> None:
    """Write MDF data (and optionally its validation mask) to disk.

    Parameters
    ----------
    data : pandas.DataFrame or Iterable[pandas.DataFrame]
        Data to export, either as one frame or as a sequence of chunks.
    mask : pandas.DataFrame or Iterable[pandas.DataFrame], optional
        Validation mask to export. Must be of the same type as ``data``.
        If omitted, no mask file is written.
    data_format : {"csv", "parquet", "feather"}, default: "parquet"
        Format of the output data file(s).
    dtypes : dict or pandas.Series, optional
        Data types of ``data``. Stored in the JSON information file.
        Any other value is treated as an empty mapping.
    parse_dates : list or bool, default: False
        Columns to parse as dates when reading the data back.
        Stored in the JSON information file; a boolean is treated as an
        empty list. For more information see :py:func:`pandas.read_csv`.
    encoding : str, default: "utf-8"
        Encoding of the CSV output; also recorded in the information file.
    out_dir : str, default: "."
        Path to the output directory. It is created if it does not exist.
    prefix : str, optional
        First component of the output file names.
    suffix : str, optional
        Last component of the output file names.
    extension : str, optional
        Extension of the data and mask files.
        By default, the extension equals ``data_format``.
    filename : str or dict, optional
        Name of the output file(s).
        NOTE: this argument is not consulted by the current implementation;
        file names are always built from ``prefix``, ``"data"``/``"mask"``/
        ``"info"`` and ``suffix``.
    separator : str, optional
        Separator used to join the file name components (default "_").
    col_subset : str, tuple or list, optional
        Column label or list of column labels to write.
        Column labels may be strings or tuples.
    delimiter : str, default: ","
        Field delimiter passed as ``sep`` to ``DataFrame.to_csv``.
    **kwargs
        Extra keyword arguments forwarded to ``DataFrame.to_csv``.
        They are ignored for the other formats.

    Raises
    ------
    ValueError
        If ``data_format`` is not supported or if ``mask`` is given and its
        type differs from the type of ``data``.
    TypeError
        If ``data`` or ``mask`` is of an unsupported type.

    See Also
    --------
    write: Write either MDF data or CDM tables to disk.
    write_tables : Write CDM tables to disk.
    read: Read either original marine-meteorological data or MDF data or CDM tables from disk.
    read_data : Read MDF data and validation mask from disk.
    read_mdf : Read original marine-meteorological data from disk.
    read_tables : Read CDM tables from disk.

    Note
    ----
    Use this function after reading MDF data.
    The JSON information file is only written for ``data_format="csv"``.
    """
    supported_file_types = get_args(SupportedFileTypes)
    if data_format not in supported_file_types:
        raise ValueError(
            f"data_format must be one of {supported_file_types}, not {data_format}."
        )

    if mask is not None and not isinstance(mask, type(data)):
        raise ValueError("type of 'data' and type of 'mask' do not match.")

    extension = extension or data_format

    if not isinstance(dtypes, (dict, pd.Series)):
        dtypes = {}
    if isinstance(parse_dates, bool):
        parse_dates = []

    data_chunks = _normalize_data_chunks(data)
    if mask is None:
        # Pair EVERY data chunk with an empty mask. Zipping against a single
        # placeholder frame would stop after the first chunk and silently
        # drop the rest of a chunked input.
        chunk_pairs = ((chunk, pd.DataFrame()) for chunk in data_chunks)
    else:
        chunk_pairs = zip(data_chunks, _normalize_data_chunks(mask))

    info = {
        "dtypes": {k: str(v) for k, v in dtypes.items()},
        "parse_dates": [join(p) for p in parse_dates],
    }

    logging.info("WRITING DATA TO FILES IN: %s", out_dir)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    filename_data = get_filename(
        [prefix, "data", suffix], path=out_dir, extension=extension, separator=separator
    )
    filename_mask = get_filename(
        [prefix, "mask", suffix], path=out_dir, extension=extension, separator=separator
    )
    filename_info = get_filename(
        [prefix, "info", suffix], path=out_dir, extension="json", separator=separator
    )

    writer = WRITERS[data_format]

    for i, (data_df, mask_df) in enumerate(chunk_pairs):
        is_first = i == 0

        if col_subset is not None:
            data_df = data_df[col_subset]
            # An empty mask has no columns to select from (and is never
            # written), so subsetting it would only raise a KeyError.
            if not mask_df.empty:
                mask_df = mask_df[col_subset]

        # Selecting a single label yields a Series; writers expect frames.
        if isinstance(data_df, pd.Series):
            data_df = data_df.to_frame()
        if isinstance(mask_df, pd.Series):
            mask_df = mask_df.to_frame()

        # CSV: the first chunk creates the file with a header, later chunks
        # are appended without one.
        mode = "w" if is_first else "a"
        header = [join(c) for c in data_df.columns] if is_first else False

        if is_first:
            info["dtypes"] = update_dtypes(info["dtypes"], data_df.columns)
            for col in data_df.columns:
                info["dtypes"] = update_column_names(info["dtypes"], col, join(col))
            info["parse_dates"] = [p for p in info["parse_dates"] if p in header]
            info["encoding"] = encoding

        write_kwargs = {}
        if data_format == "csv":
            write_kwargs = dict(
                header=header,
                mode=mode,
                index=False,
                sep=delimiter,
                encoding=encoding,
                **kwargs,
            )
        elif data_format == "parquet":
            write_kwargs = dict(
                engine="pyarrow",
                compression="snappy",
            )

        # NOTE(review): for parquet/feather no append mode is passed, so with
        # chunked input every chunk is written to the same path; presumably
        # only the last chunk survives - TODO confirm and handle if needed.
        getattr(data_df, writer)(filename_data, **write_kwargs)
        if not mask_df.empty:
            getattr(mask_df, writer)(filename_mask, **write_kwargs)

    if data_format == "csv":
        with open(filename_info, "w", encoding="utf-8") as file_obj:
            json.dump(info, file_obj, indent=4)