Source code for cdm_reader_mapper.core.databundle

"""Common Data Model (CDM) DataBundle class."""

from __future__ import annotations
from collections.abc import Iterable, Sequence
from typing import Any, Literal

import pandas as pd

from cdm_reader_mapper.cdm_mapper.mapper import map_model
from cdm_reader_mapper.common import (
    count_by_cat,
    get_length,
    replace_columns,
    split_by_boolean_false,
    split_by_boolean_true,
    split_by_column_entries,
    split_by_index,
)
from cdm_reader_mapper.common.iterators import ParquetStreamReader, is_valid_iterator
from cdm_reader_mapper.duplicates.duplicates import DupDetect, duplicate_check
from cdm_reader_mapper.metmetpy import (
    correct_datetime,
    correct_pt,
    validate_datetime,
    validate_id,
)

from ._utilities import (
    SubscriptableMethod,
    _copy,
    _normalize_data_input,
    _normalize_mask_input,
    _validate_mode,
    combine_attribute_values,
    reader_method,
)
from .writer import write


properties = {
    "data",
    "columns",
    "dtypes",
    "mask",
    "imodel",
    "mode",
    "parse_dates",
    "encoding",
}


[docs] class DataBundle: r""" Container for tabular data and associated metadata. This class wraps either an in-memory `pd.DataFrame` or a `ParquetStreamReader` for chunked, disk-backed processing. It provides a unified interface for accessing DataFrame-like attributes and methods, transparently handling streaming data where required. Parameters ---------- data : pandas.DataFrame or Iterable[pandas.DataFrame] or ParquetStreamReader, optional Input data. If an iterable is provided, it is converted into a `ParquetStreamReader` for streaming. columns : pandas.Index or pandas.MultiIndex or list, optional Column labels used when initializing empty data. dtypes : pandas.Series or dict, optional Data types for columns. parse_dates : list or bool, optional Instructions for parsing dates. encoding : str, optional Encoding associated with the data. mask : pandas.DataFrame or Iterable[pandas.DataFrame] or ParquetStreamReader, optional Boolean mask aligned with `data`. If not provided, an empty mask is created. imodel : str, optional Name of the input data model. mode : {"data", "tables"}, default "data" Data representation mode. Examples -------- Getting a :py:class:`~DataBundle` while reading data from disk. >>> from cdm_reader_mapper import read_mdf >>> db = read_mdf(source="file_on_disk", imodel="custom_model_name") Constructing a :py:class:`~DataBundle` from already read MDf data. >>> from cdm_reader_mapper import DataBundle >>> read = read_mdf(source="file_on_disk", imodel="custom_model_name") >>> data_ = read.data >>> mask_ = read.mask >>> db = DataBundle(data=data_, mask=mask_) Constructing a :py:class:`~DataBundle` from already read CDM data. >>> from cdm_reader_mapper import read_tables >>> tables = read_tables("path_to_files").data >>> db = DataBundle(data=tables, mode="tables") """ def __init__( self, data: pd.DataFrame | Iterable[pd.DataFrame] | None = None, columns: pd.Index | pd.MultiIndex | list[Any] | None = None, dtypes: pd.Series | dict[str | tuple[str, str], Any] | None = None, parse_dates: list[Any] | bool | None = None, encoding: str | None = None, mask: pd.DataFrame | Iterable[pd.DataFrame] | None = None, imodel: str | None = None, mode: Literal["data", "tables"] = "data", ) -> None: """ Initialization of a DataBundle instance. Parameters ---------- data : pandas.DataFrame or Iterable[pandas.DataFrame] or ParquetStreamReader, optional Input data. If an iterable is provided, it is converted into a `ParquetStreamReader` for streaming. columns : pandas.Index or pandas.MultiIndex or list, optional Column labels used when initializing empty data. dtypes : pandas.Series or dict, optional Data types for columns. parse_dates : list or bool, optional Instructions for parsing dates. encoding : str, optional Encoding associated with the data. mask : pandas.DataFrame or Iterable[pandas.DataFrame] or ParquetStreamReader, optional Boolean mask aligned with `data`. If not provided, an empty mask is created. imodel : str, optional Name of the input data model. mode : {"data", "tables"}, default "data" Data representation mode. Raises ------ ValueError If `mode` is invalid. TypeError If `data` and/or `mask` has an unsupported type. """ _validate_mode(mode) data = _normalize_data_input(data, columns, dtypes) mask = _normalize_mask_input(mask, data) self._data: pd.DataFrame | ParquetStreamReader = data self._columns = columns self._dtypes = dtypes self._parse_dates = parse_dates self._encoding = encoding self._mask: pd.DataFrame | ParquetStreamReader = mask self._imodel = imodel self._mode = mode self.DupDetect: DupDetect | None = None def __len__(self) -> int: """ Length of :py:attr:`data`. Returns ------- int Number of rows in the underlying data. Raises ------ TypeError If the computed length is not an integer. """ length = get_length(self._data) if isinstance(length, int): return length raise TypeError(f"Length is not an integer: {length}, {type(length)}") def __getattr__(self, attr: str) -> Any: """ Apply attribute to :py:attr:`data` if attribute is not defined for :py:class:`~DataBundle` . Parameters ---------- attr : str Name of the attribute. Returns ------- Any Attribute value, callable wrapper, or computed result. Raises ------ AttributeError If the attribute does not exist. ValueError If the data stream is empty. TypeError If the underlying data type is unsupported. """ if attr.startswith("__") and attr.endswith("__"): raise AttributeError(f"DataBundle object has no attribute {attr}.") data = self._data if isinstance(data, pd.DataFrame): attr_func = getattr(data, attr) if not callable(attr_func): return attr_func return SubscriptableMethod(attr_func) if isinstance(data, ParquetStreamReader): # This allows db.read(), db.close(), db.get_chunk() to work if hasattr(data, attr): return getattr(data, attr) data = data.copy() try: first_chunk = data.get_chunk() except (StopIteration, ValueError) as err: raise ValueError("Cannot access attribute on empty data stream.") from err if not hasattr(first_chunk, attr): # Restore state before raising error data.prepend(first_chunk) raise AttributeError(f"DataFrame chunk has no attribute '{attr}'.") attr_value = getattr(first_chunk, attr) if callable(attr_value): # METHOD CALL (e.g., .dropna(), .fillna()) # Put the chunk BACK so the reader_method sees the full stream. data.prepend(first_chunk) def wrapped_reader_method(*args: Any, **kwargs: Any) -> ParquetStreamReader | None: return reader_method(data, attr, *args, **kwargs) return SubscriptableMethod(wrapped_reader_method) else: # PROPERTY ACCESS (e.g., .shape, .dtypes) # DO NOT put the chunk back yet. Pass the 'first_value' # and the 'data' iterator (which is now at chunk 2) to the combiner. # The combiner will consume the rest. return combine_attribute_values(attr_value, data, attr) raise TypeError(f"'data' is {type(data)}, expected DataFrame or ParquetStreamReader.") def __repr__(self) -> str: """ Return a string representation for :py:attr:`data`. Returns ------- str String representation for the underlying data. """ return self._data.__repr__() def __setitem__(self, item: Any, value: Any) -> None: """ Make class support item assignment for :py:attr:`data`. Parameters ---------- item : Any Column name or property key. value : Any Value to assign. """ if isinstance(item, str) and item in properties: setattr(self, item, value) else: self._data[item] = value def __getitem__(self, item: Any) -> Any: """ Make class subscriptable. Parameters ---------- item : Any Key or column name. Returns ------- Any Item `item` of underlying data. """ if isinstance(item, str): if hasattr(self, item): return getattr(self, item) return self._data.__getitem__(item) def _return_property(self, property: str) -> Any: """ Return an internal property if it exists. Parameters ---------- property : str Name of the attribute. Returns ------- Any Internal property `property`. """ if hasattr(self, property): return getattr(self, property) @property def data(self) -> pd.DataFrame | ParquetStreamReader: """ Underlying MDF data. Returns ------- pd.DataFrame or ParquetStreamReader Underlying MDf data. """ return self._return_property("_data") @data.setter def data(self, value: pd.DataFrame | ParquetStreamReader) -> None: """ Set the underlying MDF data. Parameters ---------- value : pandas.DataFrame or ParquetStreamReader Value to be set. """ self._data = value @property def columns(self) -> pd.Index | pd.MultiIndex: """ Column labels of :py:attr:`data`. Returns ------- pd.Index or pd.MultiIndex Column labels of the underlying MDf data. """ return self._data.columns @columns.setter def columns(self, value: pd.Index | pd.MultiIndex | list[Any]) -> None: """ Set column labels of the underlying MDF data. Parameters ---------- value : pandas.Index or pandas.MultiIndex or list Value to be set. """ self._columns = value @property def dtypes(self) -> pd.Series | dict[str, Any] | None: """ Dictionary of data types on :py:attr:`data`. Returns ------- pd.Series or dict or None Data types of underlying MDF data. """ return self._return_property("_dtypes") @property def parse_dates(self) -> list[Any] | bool | None: """ Information of how to parse dates in :py:attr:`data`. Returns ------- list or bool or None Information of how to parse dates in underlying MDF data. See Also -------- :py:func:`pd.read_csv` : Read CSV file using pandas. """ parse_dates_ = self._return_property("_parse_dates") if parse_dates_ is None: return None if isinstance(parse_dates_, (list, bool)): return parse_dates_ raise TypeError(f"parse_dates has type {type(parse_dates_)}; expected list[Any], bool, or None.") @property def encoding(self) -> str | None: """ A string representing the encoding to use in the :py:attr:`data`. Returns ------- str or None String representing the encoding to use in the underlying MDF data. See Also -------- :py:func:`pd.to_csv` : Write data with encoding to CSV file. """ encoding_ = self._return_property("_encoding") if encoding_ is None: return None if isinstance(encoding_, str): return encoding_ raise TypeError(f"encoding has type {type(encoding_)}; expected str or None.") @property def mask(self) -> pd.DataFrame | ParquetStreamReader: """ MDF validation mask. Returns ------- pd.DataFrame or ParquetStreamReader Validation mask of the underlying MDF data. """ return self._return_property("_mask") @mask.setter def mask(self, value: pd.DataFrame | ParquetStreamReader) -> None: """ Set the validation mask of underlying MDF data. Parameters ---------- value : pd.DataFrame or ParquetStreamReader Value to be set. """ self._mask = value @property def imodel(self) -> str | None: """ Name of the MDF/CDM input model. Returns ------- str or None Name of the MDF/CDM input model if available. """ imodel_ = self._return_property("_imodel") if imodel_ is None: return None if isinstance(imodel_, str): return imodel_ raise TypeError(f"imodel has type {type(imodel_)}; expected str or None.") @imodel.setter def imodel(self, value: str) -> None: """ Set the data model name of underlying MDF data. Parameters ---------- value : str Value to be set. """ self._imodel = value @property def mode(self) -> str: """ Data mode. Returns ------- str Current data mode. Raises ------ TypeError If mode of the underlying data is not a string. """ mode_ = self._return_property("_mode") if isinstance(mode_, str): return mode_ raise TypeError(f"mode_ has type {type(mode_)}; expected str.") @mode.setter def mode(self, value: Literal["data", "tables"]) -> None: """ Set the data mode name of underlying data. Parameters ---------- value : {'data', 'tables'} Value to be set Raises ------ ValueError If `value` is not one of `data` or `tables`. """ if value not in ("data", "tables"): raise ValueError("value must be one of 'data' or 'tables'.") self._mode = value def _return_db(self, db: DataBundle, inplace: bool) -> DataBundle | None: """ Return the resulting DataBundle depending on inplace mode. Parameters ---------- db : DataBundle The DataBundle instance containing updated data. inplace : bool If True modifications are applied in place and None is returned. Returns ------- :py:class:`~DataBundle` or None Returns `db` if `inplace` is False, otherwise None. """ if inplace is True: return None return db def _get_db(self, inplace: bool) -> DataBundle | None: """ Retrieve the target DataBundle for modification. Parameters ---------- inplace : bool If True return the current instance; otherwise return a copy. Returns ------- :py:class:`~DataBundle` or None The DataBundle instance to operate on. """ if inplace is True: return self return self.copy() def _stack(self, other: DataBundle | Sequence[DataBundle], datasets: str | Sequence[str], inplace: bool, **kwargs: Any) -> DataBundle | None: r""" Concatenate datasets from multiple DataBundle instances. Parameters ---------- other : :py:class:`~DataBundle` or Sequence of :py:class:`~DataBundle` Other DataBundle instances whose data should be stacked with the current instance. datasets : str or Sequence of str Dataset attribute name(s) to be concatenated (e.g., "data", "mask"). inplace : bool If True modify the current instance in place. \**kwargs : Any Additional keyword-arguments for stacking DataFrames. Returns ------- :py:class:`~DataBundle` or None Updated DataBundle if ``inplace`` is False, otherwise None. Raises ------ ValueError If any dataset is an iterator instead of a pandas DataFrame. """ db_cp = self._get_db(inplace) if isinstance(other, DataBundle): other = [other] if isinstance(datasets, str): datasets = [datasets] for data in datasets: data_attr = f"_{data}" df_cp = getattr(db_cp, data_attr, pd.DataFrame()) if is_valid_iterator(df_cp): raise ValueError("Data must be a pd.DataFrame not a iterable of pd.DataFrames.") to_concat = [df_cp] to_concat.extend(getattr(o, data_attr) for o in other if hasattr(o, data_attr)) if not any(d.empty for d in to_concat): concatenated = pd.concat(to_concat, **kwargs) else: concatenated = pd.DataFrame() concatenated = concatenated.reset_index(drop=True) setattr(db_cp, data_attr, concatenated) if db_cp is None: return None return self._return_db(db_cp, inplace)
[docs] def add(self, addition: dict[str, pd.DataFrame | pd.Series], inplace: bool = False) -> DataBundle | None: """ Adding information to a :py:class:`~DataBundle`. Parameters ---------- addition : dict Additional elements to add to the :py:class:`~DataBundle`. inplace : bool, default: False If True add datasets in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with added datasets. Returns ------- :py:class:`~DataBundle` or None DataBundle with added information or None if "inplace=True". Examples -------- >>> tables = read_tables("path_to_files") >>> db = db.add({"data": tables}) """ db_ = self._get_db(inplace) for name, data in addition.items(): data_cp = _copy(data) setattr(db_, f"_{name}", data_cp) if db_ is None: return None return self._return_db(db_, inplace)
[docs] def copy(self) -> DataBundle: """ Make deep copy of a :py:class:`~DataBundle`. Returns ------- :py:class:`~DataBundle` Copy of a DataBundle. Examples -------- >>> db2 = db.copy() """ db = DataBundle() for key, value in self.__dict__.items(): value = _copy(value) setattr(db, key, value) return db
[docs] def stack_v( self, other: DataBundle | Sequence[DataBundle], datasets: str | Sequence[str] | Literal["data", "mask"] = ("data", "mask"), inplace: bool = False, **kwargs: Any, ) -> DataBundle | None: r""" Stack multiple :py:class:`~DataBundle`'s vertically. Parameters ---------- other : :py:class:`~DataBundle` or Sequence of :py:class:`~DataBundle` List of other :py:class:`~DataBundle` to stack vertically. datasets : str or Sequence of str, default: (data, mask) List of datasets to be stacked. inplace : bool, default: False If True overwrite datasets in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with stacked datasets. \**kwargs : Any Additional keyword-arguments for stacking DataFrames vertically. Returns ------- :py:class:`~DataBundle` or None Vertically stacked DataBundle or None if "inplace=True". See Also -------- DataBundle.stack_h : Stack multiple DataBundle's horizontally. Notes ----- * This is only working with pd.DataFrames, not with iterables of pd.DataFrames! * The DataFrames in the :py:class:`~DataBundle` have to have the same data columns! Examples -------- >>> db = db1.stack_v(db2, datasets=["data", "mask"]) """ return self._stack(other, datasets, inplace, **kwargs)
[docs] def stack_h( self, other: DataBundle | Sequence[DataBundle], datasets: str | Sequence[str] | Literal["data", "mask"] = ("data", "mask"), inplace: bool = False, **kwargs: Any, ) -> DataBundle | None: r""" Stack multiple :py:class:`~DataBundle`'s horizontally. Parameters ---------- other : :py:class:`~DataBundle` or Sequence of :py:class:`~DataBundle` List of other :py:class:`~DataBundle` to stack horizontally. datasets : str or Sequence of str, default: [data, mask] List of datasets to be stacked. inplace : bool, default: False If True overwrite `datasets` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with stacked datasets. \**kwargs : Any Additional keyword-arguments for stacking DataFrames horizontally. Returns ------- :py:class:`~DataBundle` or None Horizontally stacked DataBundle or None if ``inplace=True``. See Also -------- DataBundle.stack_v : Stack multiple DataBundle's vertically. Notes ----- * This is only working with pd.DataFrames, not with iterables of pd.DataFrames! * The DataFrames in the :py:class:`~DataBundle` may have different data columns! Examples -------- >>> db = db1.stack_h(db2, datasets=["data", "mask"]) """ return self._stack(other, datasets, inplace, axis=1, join="outer", **kwargs)
[docs] def select_where_all_true(self, inplace: bool = False, do_mask: bool = True, **kwargs: Any) -> DataBundle | None: r""" Select rows from :py:attr:`data` where all column entries in :py:attr:`mask` are True. Parameters ---------- inplace : bool, default: False If True overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with valid values only in :py:attr:`data`. do_mask : bool, default: True If True also do selection on :py:attr:`mask`. \**kwargs : Any Additional keyword-arguments for splitting `data` where all entries are True. Returns ------- :py:class:`~DataBundle` or None DataBundle containing rows where all column entries in :py:attr:`mask` are True or None if ``inplace=True``. See Also -------- DataBundle.select_where_all_false : Select rows from `data` where all entries in `mask` are False. DataBundle.select_where_entry_isin : Select rows from `data` where column entries are in a specific value list. DataBundle.select_where_index_isin : Select rows from `data` within specific index list. Notes ----- For more information see :py:func:`split_by_boolean_true` Examples -------- Select without overwriting the old data. >>> db_selected = db.select_where_all_true() Select overwriting the old data. >>> db.select_where_all_true(inplace=True) >>> df_selected = db.data """ db_ = self._get_db(inplace) if db_ is None: return None _mask = _copy(db_._mask) db_._data, _, selected_idx, _ = split_by_boolean_true(db_._data, _mask, **kwargs) if do_mask is True: db_._mask, _, _, _ = split_by_index(db_._mask, selected_idx, **kwargs) return self._return_db(db_, inplace)
[docs] def select_where_all_false(self, inplace: bool = False, do_mask: bool = True, **kwargs: Any) -> DataBundle | None: r""" Select rows from :py:attr:`data` where all column entries in :py:attr:`mask` are False. Parameters ---------- inplace : bool, default: False If True overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with invalid values only in :py:attr:`data`. do_mask : bool, default: True If True also do selection on :py:attr:`mask`. \**kwargs : Any Additional keyword-arguments for splitting `data` where all entries are False. Returns ------- :py:class:`~DataBundle` or None DataBundle containing rows where all column entries in :py:attr:`mask` are False or None if ``inplace=True``. See Also -------- DataBundle.select_where_all_true : Select rows from `data` where all entries in `mask` are True. DataBundle.select_where_entry_isin : Select rows from `data` where column entries are in a specific value list. DataBundle.select_where_index_isin : Select rows from `data` within specific index list. Notes ----- For more information see :py:func:`split_by_boolean_false` Examples -------- Select without overwriting the old data. >>> db_selected = db.select_where_all_false() Select valid values only with overwriting the old data. >>> db.select_where_all_false(inplace=True) >>> df_selected = db.data """ db_ = self._get_db(inplace) if db_ is None: return None _mask = _copy(db_._mask) db_._data, _, selected_idx, _ = split_by_boolean_false(db_._data, _mask, **kwargs) if do_mask is True: db_._mask, _, _, _ = split_by_index(db_._mask, selected_idx, **kwargs) return self._return_db(db_, inplace)
[docs] def select_where_entry_isin( self, selection: dict[str | tuple[str, str], Sequence[Any]], inplace: bool = False, do_mask: bool = True, **kwargs: Any ) -> DataBundle | None: r""" Select rows from :py:attr:`data` where column entries are in a specific value list. Parameters ---------- selection : dict Keys: Column names in :py:attr:`data`. Values: Specific value list. inplace : bool, default: False If ``True`` overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with selected columns only in :py:attr:`data`. do_mask : bool, default: True If True also do selection on :py:attr:`mask`. \**kwargs : Any Additional keyword-arguments for splitting `data` where entries within a specific value list. Returns ------- :py:class:`~DataBundle` or None DataBundle containing rows where column entries are in a specific value list or None if ``inplace=True``. See Also -------- DataBundle.select_where_index_isin : Select rows from `data` within specific index list. DataBundle.select_where_all_true : Select rows from `data` where all entries in `mask` are True. DataBundle.select_where_all_false : Select rows from `data` where all entries in `mask` are False. Notes ----- For more information see :py:func:`split_by_column_entries` Examples -------- Select without overwriting the old data. >>> db_selected = db.select_where_entry_isin( ... selection={("c1", "B1"): [26, 41]}, ... ) Select with overwriting the old data. >>> db.select_where_entry_isin(selection={("c1", "B1"): [26, 41]}, inplace=True) >>> df_selected = db.data """ db_ = self._get_db(inplace) if db_ is None: return None db_._data, _, selected_idx, _ = split_by_column_entries(db_._data, selection, **kwargs) if do_mask is True: db_._mask, _, _, _ = split_by_index(db_._mask, selected_idx, **kwargs) return self._return_db(db_, inplace)
[docs] def select_where_index_isin(self, index: list[int], inplace: bool = False, do_mask: bool = True, **kwargs: Any) -> DataBundle | None: r""" Select rows from :py:attr:`data` where indexes within a specific index list. Parameters ---------- index : list of int Specific index list. inplace : bool, default: False If ``True`` overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with selected rows only in :py:attr:`data`. do_mask : bool, default: True If True also do selection on :py:attr:`mask`. \**kwargs : Any Additional keyword-arguments for splitting `data` where indexes within a specific index list. Returns ------- :py:class:`~DataBundle` or None DataBundle containing rows where indexes are within a specific index list or None if ``inplace=True``. See Also -------- DataBundle.select_where_entry_isin : Select rows from `data` where column entries are in a specific value list. DataBundle.select_where_all_true : Select rows from `data` where all entries in `mask` are True. DataBundle.select_where_all_false : Select rows from `data` where all entries in `mask` are False. Notes ----- For more information see :py:func:`split_by_index` Examples -------- Select without overwriting the old data. >>> db_selected = db.select_where_index_isin([0, 2, 4]) Select with overwriting the old data. >>> db.select_where_index_isin(index=[0, 2, 4], inplace=True) >>> df_selected = db.data """ db_ = self._get_db(inplace) if db_ is None: return None db_._data, _, selected_idx, _ = split_by_index(db_._data, index, **kwargs) if do_mask is True: db_._mask, _, _, _ = split_by_index(db_._mask, selected_idx, **kwargs) return self._return_db(db_, inplace)
[docs] def split_by_boolean_true(self, do_mask: bool = True, **kwargs: Any) -> tuple[DataBundle, DataBundle]: r""" Split :py:attr:`data` by rows where all column entries in :py:attr:`mask` are True. Parameters ---------- do_mask : bool, default: True If True also do selection on :py:attr:`mask`. \**kwargs : Any Additional keyword-arguments for splitting `data` where mask is False. Returns ------- tuple First :py:class:`~DataBundle` including rows where all column entries in :py:attr:`mask` are True. Second :py:class:`~DataBundle` including rows where all column entries in :py:attr:`mask` are False. See Also -------- DataBundle.split_by_boolean_false : Split `data` by rows where all entries in `mask` are False. DataBundle.split_by_column_entries : Split `data` by rows where column entries are in a specific value list. DataBundle.split_by_index : Split `data` by rows within specific index list. Notes ----- For more information see :py:func:`split_by_boolean_true` Examples -------- Split DataBundle. >>> db_true, db_false = db.split_by_boolean_true() """ db1_ = self.copy() db2_ = self.copy() _mask = _copy(db1_._mask) db1_._data, db2_._data, selected_idx, _ = split_by_boolean_true(db1_._data, _mask, return_rejected=True, **kwargs) if do_mask is True: db1_._mask, db2_._mask, _, _ = split_by_index(db1_._mask, selected_idx, return_rejected=True, **kwargs) return db1_, db2_
[docs] def split_by_boolean_false(self, do_mask: bool = True, **kwargs: Any) -> tuple[DataBundle, DataBundle]: r""" Split :py:attr:`data` by rows where all column entries in :py:attr:`mask` are False. Parameters ---------- do_mask : bool, default: True If True also do selection on :py:attr:`mask`. \**kwargs : Any Additional keyword-arguments for splitting `data` where mask is False. Returns ------- tuple First :py:class:`~DataBundle` including rows where all column entries in :py:attr:`mask` are False. Second :py:class:`~DataBundle` including rows where all column entries in :py:attr:`mask` are True. See Also -------- DataBundle.split_by_boolean_false : Split `data` by rows where all entries in `mask` are True. DataBundle.split_by_column_entries : Split `data` by rows where column entries are in a specific value list. DataBundle.split_by_index : Split `data` by rows within specific index list. Notes ----- For more information see :py:func:`split_by_boolean_false` Examples -------- Split DataBundle. >>> db_false, db_true = db.split_by_boolean_false() """ db1_ = self.copy() db2_ = self.copy() _mask = _copy(db1_._mask) db1_._data, db2_._data, selected_idx, _ = split_by_boolean_false(db1_._data, _mask, return_rejected=True, **kwargs) if do_mask is True: db1_._mask, db2_._mask, _, _ = split_by_index(db1_._mask, selected_idx, return_rejected=True, **kwargs) return db1_, db2_
[docs] def split_by_column_entries( self, selection: dict[str | tuple[str, str], Sequence[Any]], do_mask: bool = True, **kwargs: Any ) -> tuple[DataBundle, DataBundle]: r""" Split :py:attr:`data` by rows where column entries are in a specific value list. Parameters ---------- selection : dict Keys: Column names in :py:attr:`data`. Values: Specific value list. do_mask : bool, default: True If True also do selection on :py:attr:`mask`. \**kwargs : Any Additional keyword-arguments for splitting `data` by column entries. Returns ------- tuple First :py:class:`~DataBundle` including rows where column entries are in a specific value list. Second :py:class:`~DataBundle` including rows where column entries are not in a specific value list. See Also -------- DataBundle.split_by_index : Split `data` by rows within specific index list. DataBundle.split_by_boolean_true : Split `data` by rows where all entries in `mask` are True. DataBundle.split_by_boolean_false : Split `data` by rows where all entries in `mask` are False. Notes ----- For more information see :py:func:`split_by_column_entries` Examples -------- Split DataBundle. >>> db_isin, db_isnotin = db.split_by_column_entries( ... selection={("c1", "B1"): [26, 41]}, ... ) """ db1_ = self.copy() db2_ = self.copy() db1_._data, db2_._data, selected_idx, _ = split_by_column_entries(db1_._data, selection, return_rejected=True, **kwargs) if do_mask is True: db1_._mask, db2_._mask, _, _ = split_by_index(db1_._mask, selected_idx, return_rejected=True, **kwargs) return db1_, db2_
[docs] def split_by_index(self, index: list[int], do_mask: bool = True, **kwargs: Any) -> tuple[DataBundle, DataBundle]: r""" Split :py:attr:`data` by rows within specific index list. Parameters ---------- index : list of int Specific index list. do_mask : bool, default: True If True also do selection on :py:attr:`mask`. \**kwargs : Any Additional keyword-arguments for splitting `data` by index. Returns ------- tuple First :py:class:`~DataBundle` including rows within specific index list. Second :py:class:`~DataBundle` including rows outside specific index list. See Also -------- DataBundle.split_by_column_entries : Select columns from `data` with specific values. DataBundle.split_by_boolean_true : Split `data` by rows where all entries in `mask` are True. DataBundle.split_by_boolean_false : Split `data` by rows where all entries in `mask` are False. Notes ----- For more information see :py:func:`split_by_index` Examples -------- Split DataBundle. >>> db_isin, db_isnotin = db.split_by_index([0, 2, 4]) """ db1_ = self.copy() db2_ = self.copy() db1_._data, db2_._data, _, _ = split_by_index(db1_._data, index, return_rejected=True, **kwargs) if do_mask is True: db1_._mask, db2_._mask, _, _ = split_by_index(db1_._mask, index, return_rejected=True, **kwargs) return db1_, db2_
[docs] def unique(self, **kwargs: Any) -> dict[str | tuple[str, str], dict[Any, int]]: r""" Get unique values of :py:attr:`data`. Parameters ---------- \**kwargs : Any Additional keyword-arguments for getting unique values. Returns ------- dict Dictionary with unique values. Notes ----- For more information see :py:func:`unique` Examples -------- >>> db.unique(columns=("c1", "B1")) """ return count_by_cat(self._data, **kwargs) # type: ignore[no-any-return]
[docs] def replace_columns(self, df_corr: pd.DataFrame, subset: str | None = None, inplace: bool = False, **kwargs: Any) -> DataBundle | None: r""" Replace columns in :py:attr:`data`. Parameters ---------- df_corr : pd.DataFrame Data to be inplaced. subset : str or list of str, optional Select subset by columns. This option is useful for multi-indexed :py:attr:`data`. inplace : bool, default: False If True overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with replaced column names in :py:attr:`data`. \**kwargs : Any Additional keyword-arguments for replacing columns. Returns ------- :py:class:`~DataBundle` or None DataBundle with replaced column names or None if "inplace=True". Notes ----- For more information see :py:func:`replace_columns` Examples -------- >>> import pandas as pd >>> df_corr = pd.read_csv("correction_file_on_disk") >>> df_repl = db.replace_columns(df_corr) """ if not isinstance(self._data, (pd.DataFrame, pd.Series)): raise TypeError("Data must be a pd.DataFrame or pd.Series, not a {type(self._data)}.") db_ = self._get_db(inplace) if db_ is None: return None if subset is None: db_._data = replace_columns(df_l=db_._data, df_r=df_corr, **kwargs) else: db_._data[subset] = replace_columns(df_l=db_._data[subset], df_r=df_corr, **kwargs) db_._columns = db_._data.columns return self._return_db(db_, inplace)
[docs] def correct_datetime(self, imodel: str | None = None, inplace: bool = False, **kwargs: Any) -> DataBundle | None: r""" Correct datetime information in :py:attr:`data`. Parameters ---------- imodel : str, optional Name of the MFD/CDM data model. inplace : bool, default: False If True overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with datetime-corrected values in :py:attr:`data`. \**kwargs : Any Additional keyword-arguments for correcting datetime. Returns ------- :py:class:`~DataBundle` or None DataBundle with corrected datetime information or None if "inplace=True". See Also -------- DataBundle.correct_pt : Correct platform type information in `data`. DataBundle.validate_datetime: Validate datetime information in `data`. DataBundle.validate_id : Validate station id information in `data`. Notes ----- For more information see :py:func:`correct_datetime` Examples -------- >>> df_dt = db.correct_datetime() """ imodel = imodel or self._imodel db_ = self._get_db(inplace) if db_ is None: return None db_._data = correct_datetime(db_._data, imodel, **kwargs) return self._return_db(db_, inplace)
[docs] def validate_datetime(self, imodel: str | None = None, **kwargs: Any) -> pd.DataFrame: r""" Validate datetime information in :py:attr:`data`. Parameters ---------- imodel : str, optional Name of the MFD/CDM data model. \**kwargs : Any Additional keyword-arguments for validating datetime. Returns ------- pd.DataFrame DataFrame containing True and False values for each index in :py:attr:`data`. True: All datetime information in :py:attr:`data` row are valid. False: At least one datetime information in :py:attr:`data` row is invalid. See Also -------- DataBundle.validate_id : Validate station id information in `data`. DataBundle.correct_datetime : Correct datetime information in `data`. DataBundle.correct_pt : Correct platform type information in `data`. Notes ----- For more information see :py:func:`validate_datetime` Examples -------- >>> val_dt = db.validate_datetime() """ imodel = imodel or self._imodel return validate_datetime(self._data, imodel, **kwargs)
[docs] def correct_pt(self, imodel: str | None = None, inplace: bool = False, **kwargs: Any) -> DataBundle | None: r""" Correct platform type information in :py:attr:`data`. Parameters ---------- imodel : str, optional Name of the MFD/CDM data model. inplace : bool, default: True If True overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with platform-corrected values in :py:attr:`data`. \**kwargs : Any Additional keyword-arguments for correcting platform type. Returns ------- :py:class:`~DataBundle` or None DataBundle with corrected platform type information or None if "inplace=True". See Also -------- DataBundle.correct_datetime : Correct datetime information in `data`. DataBundle.validate_id : Validate station id information in `data`. DataBundle.validate_datetime : Validate datetime information in `data`. Notes ----- For more information see :py:func:`correct_pt` Examples -------- >>> df_pt = db.correct_pt() """ imodel = imodel or self._imodel db_ = self._get_db(inplace) if db_ is None: return None db_._data = correct_pt(db_._data, imodel, **kwargs) return self._return_db(db_, inplace)
[docs] def validate_id(self, imodel: str | None = None, **kwargs: Any) -> pd.DataFrame: r""" Validate station id information in :py:attr:`data`. Parameters ---------- imodel : str, optional Name of the MFD/CDM data model. \**kwargs : Any Additional keyword-arguments for validating station id. Returns ------- pd.DataFrame DataFrame containing True and False values for each index in :py:attr:`data`. True: All station ID information in :py:attr:`data` row are valid. False: At least one station ID information in :py:attr:`data` row is invalid. See Also -------- DataBundle.validate_datetime : Validate datetime information in `data`. DataBundle.correct_pt : Correct platform type information in `data`. DataBundle.correct_datetime : Correct datetime information in `data`. Notes ----- For more information see :py:func:`validate_id` Examples -------- >>> val_dt = db.validate_id() """ imodel = imodel or self._imodel return validate_id(self._data, imodel, **kwargs)
[docs] def map_model(self, imodel: str | None = None, inplace: bool = False, **kwargs: Any) -> DataBundle | None: r""" Map :py:attr:`data` to the Common Data Model. Parameters ---------- imodel : str, optional Name of the MFD/CDM data model. inplace : bool, default: False If True overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with :py:attr:`data` as CDM tables. \**kwargs : Any Additional keyword-arguments for mapping to CDM. Returns ------- :py:class:`~DataBundle` or None DataBundle containing :py:attr:`data` mapped to the CDM or None if ``inplace=True``. Notes ----- For more information see :py:func:`map_model` Examples -------- >>> cdm_tables = db.map_model() """ imodel = imodel or self._imodel db_ = self._get_db(inplace) if db_ is None: return None _tables = map_model(db_._data, imodel, **kwargs) db_._mode = "tables" db_._columns = _tables.columns db_._data = _tables return self._return_db(db_, inplace)
[docs] def write( self, dtypes: dict[str | tuple[str, str], str | type] | None = None, parse_dates: list[str | tuple[str, str]] | bool | None = None, encoding: str | None = None, mode: Literal["data", "tables"] | None = None, **kwargs: Any, ) -> None: r""" Write :py:attr:`data` on disk. Parameters ---------- dtypes : dict, optional Data types of `data`. parse_dates : list or bool, optional Information how to parse dates on `data`. encoding : str, optional The encoding of the input file. Overrides the value in the imodel schema file. mode : {data, tables}, optional Data mode. \**kwargs : Any Additional keword-arguments for writing data in disk. See Also -------- write_data : Write MDF data and validation mask to disk. write_tables: Write CDM tables to disk. read: Read original marine-meteorological data as well as MDF data or CDM tables from disk. read_data: Read MDF data and validation mask from disk. read_mdf : Read original marine-meteorological data from disk. Notes ----- If :py:attr:`mode` is "data" write data using :py:func:`write_data`. If :py:attr:`mode` is "tables" write data using :py:func:`write_tables`. Examples -------- >>> db.write() read_tables : Read CDM tables from disk. """ dtypes = dtypes or self._dtypes parse_dates = parse_dates or self._parse_dates encoding = encoding or self._encoding mode = mode or self._mode write( data=self._data, mask=self._mask, dtypes=dtypes, parse_dates=parse_dates, encoding=encoding, mode=mode, **kwargs, )
[docs] def duplicate_check(self, inplace: bool = False, **kwargs: Any) -> DataBundle | None: r""" Duplicate check in :py:attr:`data`. Parameters ---------- inplace : bool, default: False If True overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with :py:attr:`data` as CDM tables. \**kwargs : Any Additional keyword-arguments for duplicate check. Returns ------- :py:class:`~DataBundle` or None DataBundle containing new :py:class:`~DupDetect` class for further duplicate check methods or None if "inplace=True". See Also -------- DataBundle.get_duplicates : Get duplicate matches in `data`. DataBundle.flag_duplicates : Flag detected duplicates in `data`. DataBundle.remove_duplicates : Remove detected duplicates in `data`. Notes ----- Following columns have to be provided: * `longitude` * `latitude` * `primary_station_id` * `report_timestamp` * `station_course` * `station_speed` This adds a new class :py:class:`~DupDetect` to :py:class:`~DataBundle`. This class is necessary for further duplicate check methods. For more information see :py:func:`duplicate_check` Examples -------- >>> db.duplicate_check() """ db_ = self._get_db(inplace) if db_ is None: return None if db_._mode == "tables" and "header" in db_._data: data = db_._data["header"] else: data = db_._data db_.DupDetect = duplicate_check(data, **kwargs) return self._return_db(db_, inplace)
[docs] def flag_duplicates(self, inplace: bool = False, **kwargs: Any) -> DataBundle | None: r""" Flag detected duplicates in :py:attr:`data`. Parameters ---------- inplace : bool, default: False If True overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with :py:attr:`data` containing flagged duplicates. \**kwargs : Any Additional keyword-arguments for flagging duplicates. Returns ------- :py:class:`~DataBundle` or None DataBundle containing duplicate flags in :py:attr:`data` or None if "inplace=True". Raises ------ RuntimeError Before flagging duplicates, a duplictate check has to be done, :py:func:`DataBundle.duplicate_check`. See Also -------- DataBundle.remove_duplicates : Remove detected duplicates in `data`. DataBundle.get_duplicates : Get duplicate matches in `data`. DataBundle.duplicate_check : Duplicate check in `data`. Notes ----- For more information see :py:func:`DupDetect.flag_duplicates` Examples -------- Flag duplicates without overwriting :py:attr:`data`. >>> flagged_tables = db.flag_duplicates() Flag duplicates with overwriting :py:attr:`data`. >>> db.flag_duplicates(inplace=True) >>> flagged_tables = db.data """ db_ = self._get_db(inplace) if db_ is None: return None if db_.DupDetect is None: raise RuntimeError("Before flagging duplicates, a duplictate check has to be done: 'db.duplicate_check()'") db_.DupDetect.flag_duplicates(**kwargs) if db_._mode == "tables" and "header" in db_._data: db_._data["header"] = db_.DupDetect.result else: db_._data = db_.DupDetect.result return self._return_db(db_, inplace)
[docs] def get_duplicates(self, **kwargs: Any) -> pd.DataFrame: r""" Get duplicate matches in :py:attr:`data`. Parameters ---------- \**kwargs : Any Additional keyword-arguments used for getting duplicates. Returns ------- pd.DataFrame DataFrame containing duplicate matches. Raises ------ RuntimeError Before getting duplicates, a duplictate check has to be done, :py:func:`DataBundle.duplicate_check`. See Also -------- DataBundle.remove_duplicates : Remove detected duplicates in `data`. DataBundle.flag_duplicates : Flag detected duplicates in `data`. DataBundle.duplicate_check : Duplicate check in `data`. Notes ----- For more information see :py:func:`DupDetect.get_duplicates` Examples -------- >>> matches = db.get_duplicates() """ if self.DupDetect is None: raise RuntimeError("Before getting duplicates, a duplictate check has to be done: 'db.duplicate_check()'") return self.DupDetect.get_duplicates(**kwargs)
[docs] def remove_duplicates(self, inplace: bool = False, **kwargs: Any) -> DataBundle | None: r""" Remove detected duplicates in :py:attr:`data`. Parameters ---------- inplace : bool, default: False If True overwrite :py:attr:`data` in :py:class:`~DataBundle` else return a copy of :py:class:`~DataBundle` with :py:attr:`data` containing no duplicates. \**kwargs : Any Additional keyword-arguments used to remove duplicates. Returns ------- :py:class:`~DataBundle` or None DataBundle without duplicated rows or None if "inplace=True". Raises ------ RuntimeError Before removing duplicates, a duplictate check has to be done, :py:func:`DataBundle.duplicate_check`. See Also -------- DataBundle.flag_duplicates : Flag detected duplicates in `data`. DataBundle.get_duplicates : Get duplicate matches in `data`. DataBundle.duplicate_check : Duplicate check in `data`. Notes ----- For more information see :py:func:`DupDetect.remove_duplicates` Examples -------- Remove duplicates without overwriting :py:attr:`data`. >>> removed_tables = db.remove_duplicates() Remove duplicates with overwriting :py:attr:`data`. >>> db.remove_duplicates(inplace=True) >>> removed_tables = db.data """ db_ = self._get_db(inplace) if db_ is None: return None if db_.DupDetect is None: raise RuntimeError("Before removing duplicates, a duplictate check has to be done: 'db.duplicate_check()'") db_.DupDetect.remove_duplicates(**kwargs) header_ = db_.DupDetect.result if not isinstance(db_._data, pd.DataFrame): raise TypeError("data has unsupported type: {type(db_._data)}.") db_._data = db_._data[db_._data.index.isin(header_.index)] return self._return_db(db_, inplace)