Source code for cdm_reader_mapper.common.iterators

"""Utilities for handling pandas TextParser objects safely."""

from __future__ import annotations
import inspect
import itertools
from collections.abc import Callable, Generator, Iterable, Iterator, Sequence
from functools import wraps
from pathlib import Path
from tempfile import TemporaryDirectory
from types import TracebackType
from typing import (
    Any,
    Literal,
)

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import xarray as xr


class ProcessFunction:
    r"""
    Bundle input data with a callable and the arguments it should be called with.

    Parameters
    ----------
    data : pd.DataFrame, pd.Series, Iterable of pd.DataFrame or Iterable of pd.Series
        Input data to be processed.
    func : Callable
        A callable that will be applied to `data`.
    func_args : Any, list of Any or tuple of Any, optional
        Positional arguments to pass to `func`.
    func_kwargs : dict, optional
        Keyword arguments to pass to `func`.
    \**kwargs : Any
        Additional metadata or configuration parameters stored with the instance.
    """

    def __init__(
        self,
        data: pd.DataFrame | pd.Series | Iterable[pd.DataFrame] | Iterable[pd.Series],
        func: Callable[..., Any],
        func_args: Any | list[Any] | tuple[Any] | None = None,
        func_kwargs: dict[str, Any] | None = None,
        **kwargs: Any,
    ):
        r"""
        Store the data, the callable and its normalised arguments.

        Parameters
        ----------
        data : pd.DataFrame, pd.Series, Iterable of pd.DataFrame or Iterable of pd.Series
            Input data to be processed. Stored as-is, without validation.
        func : Callable
            A callable that will be applied to `data`.
        func_args : Any, list of Any or tuple of Any, optional
            Positional arguments to pass to `func`. ``None`` becomes an empty
            tuple, a list or tuple is kept unchanged, and any other value is
            wrapped in a one-element tuple.
        func_kwargs : dict, optional
            Keyword arguments to pass to `func`. ``None`` becomes an empty dict.
        \**kwargs : Any
            Additional metadata or configuration parameters stored with the instance.

        Raises
        ------
        ValueError
            If `func` is not callable.
        """
        self.data = data

        if not callable(func):
            raise ValueError(f"Function {func} is not callable.")
        self.func = func

        # Normalise the positional arguments to a sequence.
        if func_args is None:
            positional = ()
        elif isinstance(func_args, (list, tuple)):
            positional = func_args
        else:
            positional = (func_args,)
        self.func_args = positional

        self.func_kwargs = {} if func_kwargs is None else func_kwargs
        self.kwargs = kwargs
class ParquetStreamReader:
    """
    Chunk-stream wrapper that mimics ``pandas.io.parsers.TextFileReader``.

    Parameters
    ----------
    source : list, tuple, iterator or callable
        Data source yielding ``pandas.DataFrame`` or ``pandas.Series`` chunks.
        A callable is invoked once, without arguments, and must return an
        iterator over the chunks.

    Attributes
    ----------
    columns : list or pandas.Index
        Column labels taken from the first chunk. Empty list if the stream is
        empty or the first chunk is not a pandas object.
    dtypes : dict or pandas.Series
        Data types taken from the first chunk. Empty dict in the same cases.
    attrs : dict
        User-defined metadata associated with the stream.

    Notes
    -----
    - The stream is consumed as it is iterated.
    - Use `copy()` to create an independent iterator.
    - `read()` and `reset_index()` load all remaining chunks into memory and
      may not be suitable for large datasets.
    """

    def __init__(
        self,
        source: (
            list[pd.DataFrame | pd.Series]
            | tuple[pd.DataFrame | pd.Series]
            | Iterator[pd.DataFrame | pd.Series]
            | Callable[[], Iterator[pd.DataFrame | pd.Series]]
        ),
    ):
        """
        Initialize a ParquetStreamReader instance.

        Parameters
        ----------
        source : list, tuple, iterator or callable
            Data source yielding ``pandas.DataFrame`` or ``pandas.Series``
            chunks. A callable is invoked once, without arguments, and must
            return an iterator over the chunks.

        Raises
        ------
        TypeError
            If ``source`` is not a list, tuple, iterator, or callable.
        """
        self._closed = False
        self._buffer: list[pd.DataFrame | pd.Series] = []
        self.attrs: dict[str, Any] = {}
        # Defaults cover an empty stream and a first chunk that is not a pandas
        # object, so `copy()` and `empty` never hit a missing attribute.
        self.columns = []
        self.dtypes = {}

        if isinstance(source, (tuple, list)):
            source = iter(source)

        if callable(source):
            # Factory that produces the iterator.
            self._factory = source
        elif isinstance(source, Iterator):
            self._factory = lambda: source
        else:
            raise TypeError("ParquetStreamReader expects an iterator or a factory callable.")

        self._generator = self._factory()

        # Peek at the first chunk to learn the schema, then push it back.
        try:
            first = next(self._generator)
        except StopIteration:
            return

        if isinstance(first, pd.DataFrame):
            self.columns = first.columns
            self.dtypes = first.dtypes
        elif isinstance(first, pd.Series):
            self.columns = [first.name]
            self.dtypes = {first.name: first.dtype}
        self.prepend(first)

    def __iter__(self) -> Iterator[pd.DataFrame | pd.Series]:
        """
        Return the iterator interface.

        Returns
        -------
        Iterator[pd.DataFrame | pd.Series]
            The stream itself.
        """
        return self

    def __next__(self) -> pd.DataFrame | pd.Series:
        """
        Return the next chunk from the stream.

        Returns
        -------
        pandas.DataFrame or pandas.Series
            The next chunk of data. Prepended chunks are served before the
            underlying iterator is advanced.

        Raises
        ------
        StopIteration
            If no more data is available.
        ValueError
            If the stream has been closed.
        """
        if self._closed:
            raise ValueError("I/O operation on closed stream.")
        if self._buffer:
            return self._buffer.pop(0)
        return next(self._generator)

    def __getitem__(self, item: str) -> Any:
        """
        Retrieve a value from the stream's metadata.

        Parameters
        ----------
        item : str
            Key in the `attrs` dictionary.

        Returns
        -------
        Any
            The stored metadata value.

        Raises
        ------
        KeyError
            If `item` is not a key of `attrs`.
        """
        return self.attrs[item]

    def __setitem__(self, item: str, value: Any) -> None:
        """
        Update an existing metadata entry of the stream.

        Parameters
        ----------
        item : str
            Key that must already exist in `attrs`.
        value : Any
            Value to assign.

        Raises
        ------
        TypeError
            If `item` is not an existing key of `attrs`.
        """
        if item not in self.attrs:
            raise TypeError("'ParquetStreamReader' object does not support item assignment.")
        # `attrs` is a plain dict: assign by key (setattr on a dict raises).
        self.attrs[item] = value

    def prepend(self, chunk: pd.DataFrame | pd.Series) -> None:
        """
        Push a chunk back onto the front of the stream.

        Useful for peeking at the first chunk without losing it.

        Parameters
        ----------
        chunk : pandas.DataFrame or pandas.Series
            The chunk to prepend.
        """
        # Inserting at position 0 makes the most recently prepended chunk the
        # next one returned by `__next__`.
        self._buffer.insert(0, chunk)

    def get_chunk(self) -> pd.DataFrame | pd.Series:
        """
        Return the next available chunk.

        This is equivalent to calling ``next(reader)`` and is provided for API
        compatibility with pandas readers.

        Returns
        -------
        pandas.DataFrame or pandas.Series
            The next chunk of data.
        """
        return next(self)

    def read(self) -> pd.DataFrame:
        """
        Read all remaining data into a single object.

        This consumes the entire stream and concatenates all remaining chunks
        with ``pandas.concat``.

        Returns
        -------
        pandas.DataFrame
            Concatenated result of all remaining chunks. Returns an empty
            DataFrame if the stream is exhausted.

        Warnings
        --------
        This operation loads all data into memory and may not be suitable for
        large datasets.
        """
        chunks = list(self)
        if not chunks:
            return pd.DataFrame()
        return pd.concat(chunks)

    def copy(self) -> ParquetStreamReader:
        """
        Create an independent copy of the stream.

        Returns
        -------
        ParquetStreamReader
            A new stream reader with independent iteration state.

        Raises
        ------
        ValueError
            If the stream has been closed.
        """
        if self._closed:
            raise ValueError("Cannot copy a closed stream.")

        # Split the underlying iterator so both readers see every chunk.
        self._generator, new_gen = itertools.tee(self._generator)

        # Bypass __init__ so the source is not consumed a second time.
        copy_stream = self.__class__.__new__(self.__class__)
        copy_stream._closed = self._closed
        copy_stream._buffer = self._buffer.copy()
        copy_stream._generator = new_gen
        copy_stream._factory = self._factory
        copy_stream.columns = self.columns.copy()
        copy_stream.dtypes = self.dtypes.copy()
        copy_stream.attrs = self.attrs.copy()
        return copy_stream

    @property
    def empty(self) -> bool:
        """
        Check whether the stream has any remaining data.

        Returns
        -------
        bool
            True if the stream is exhausted, False otherwise.

        Raises
        ------
        ValueError
            If the stream has been closed.

        Notes
        -----
        This property creates a temporary copy of the stream to check for
        remaining elements without consuming the original.
        """
        copy_stream = self.copy()
        try:
            next(copy_stream)
        except StopIteration:
            return True
        return False

    def reset_index(self, drop: bool = False) -> ParquetStreamReader:
        """
        Reset the index across all chunks to a continuous range.

        The remaining chunks of this stream are consumed and held in memory.

        Parameters
        ----------
        drop : bool, default False
            If False, the new index is also inserted as the first column,
            named "index". If True, no column is inserted.

        Returns
        -------
        ParquetStreamReader
            A new stream reader with reindexed chunks.

        Raises
        ------
        ValueError
            If the stream has been closed.
        """
        if self._closed:
            raise ValueError("Cannot copy a closed stream.")

        offset = 0
        chunks: list[pd.DataFrame] = []
        for df in self:
            df = df.copy()
            n = len(df)
            indexes = range(offset, offset + n)
            df.index = indexes
            if drop is False:
                df.insert(0, "index", indexes)
            offset += n
            chunks.append(df)

        return ParquetStreamReader(lambda: iter(chunks))

    def close(self) -> None:
        """
        Mark the stream as closed.

        After closing, iterating, copying or resetting the index raises
        ``ValueError``.
        """
        self._closed = True

    def __enter__(self) -> ParquetStreamReader:
        """
        Enter the runtime context for use in a ``with`` statement.

        Returns
        -------
        ParquetStreamReader
            The stream instance.
        """
        return self

    def __exit__(self, _exc_type: type | None, _exc_val: BaseException | None, _exc_tb: TracebackType | None) -> None:
        """
        Exit the runtime context and close the stream.

        Parameters
        ----------
        _exc_type : type or None
            The type of the exception raised within the context, if any.
        _exc_val : BaseException or None
            The exception instance raised within the context, if any.
        _exc_tb : TracebackType or None
            The traceback associated with the exception, if any.
        """
        self.close()
def _sort_chunk_outputs( outputs: tuple[Any, ...], capture_meta: bool, requested_types: tuple[type, ...] ) -> tuple[list[pd.DataFrame | pd.Series], list[Any]]: """ Separate DataFrames from metadata in the function output. Parameters ---------- outputs : tuple of Any Tuple of objects returned by a processing function. capture_meta : bool If True, non-data outputs are collected as metadata. If False, they are ignored. requested_types : tuple of tuple Types that should be considered as valid data objects (e.g., `pd.DataFrame`, `pd.Series`). Returns ------- tuple of list of pd.DataFrame or pd.Series and list of Any A tuple containing: - A list of extracted data objects (flattened if nested in lists) - A list of metadata objects (empty if `capture_meta` is False) """ data, meta = [], [] for out in outputs: if isinstance(out, requested_types): data.append(out) elif isinstance(out, list) and out and isinstance(out[0], requested_types): data.extend(out) elif capture_meta: # Only capture metadata from the first chunk meta.append(out) return data, meta def _initialize_storage( first_batch: list[pd.DataFrame | pd.Series], ) -> tuple[list[TemporaryDirectory[str]], list[tuple[type, Any]]]: """ Create temp directories and captures schemas from the first chunk. Parameters ---------- first_batch : list of pandas.DataFrame or pandas.Series The first batch of data objects used to initialize storage. Each element determines the schema and corresponding temporary storage. Returns ------- tuple of list of TemporaryDirectory of str and list of tuple of type and Any A tuple containing: - A list of temporary directories, one for each object in `first_batch`. - A list of schema descriptors, where each entry is a tuple of: (object type, schema information). For DataFrames, the schema is the columns; for Series, it is the name. Raises ------ TypeError If an element in `first_batch` is not a pandas DataFrame or Series. 
""" temp_dirs: list[TemporaryDirectory[str]] = [] schemas: list[tuple[type, Any]] = [] for obj in first_batch: if isinstance(obj, pd.DataFrame): schemas.append((pd.DataFrame, obj.columns)) elif isinstance(obj, pd.Series): schemas.append((pd.Series, obj.name)) else: raise TypeError(f"Unsupported data type: {type(obj)}.Use one of [pd.DataFrame, pd.Series].") temp_dirs.append(TemporaryDirectory()) return temp_dirs, schemas def _write_chunks_to_disk( batch: list[pd.DataFrame | pd.Series], temp_dirs: list[TemporaryDirectory[str]], chunk_counter: int, ) -> None: """ Write the current batch of DataFrames to their respective temp directories. Parameters ---------- batch : list of pandas.DataFrame or pandas.Series A batch of data objects to be written to disk. Series objects are converted to single-column DataFrames before writing. temp_dirs : list of TemporaryDirectory of str Temporary directories corresponding to each element in `batch`. Each batch item is written into its matching directory. chunk_counter : int Sequential counter used to generate unique filenames for each chunk written to disk. """ for i, data_out in enumerate(batch): if isinstance(data_out, pd.Series): data_out = data_out.to_frame() file_path = Path(temp_dirs[i].name) / f"part_{chunk_counter:05d}.parquet" table = pa.Table.from_pandas(data_out, preserve_index=True) pq.write_table(table, file_path, compression="snappy") def _parquet_generator(temp_dir: TemporaryDirectory[str], data_type: type, schema: str | None) -> Generator[pd.DataFrame | pd.Series]: """ Yield DataFrames from a temp directory, restoring schema. Parameters ---------- temp_dir : TemporaryDirectory of str Temporary directory containing Parquet chunk files to be read. The directory is cleaned up after iteration completes (even if an exception occurs). data_type : type Expected output type for each chunk. If ``pd.Series``, each Parquet file is converted to a Series; otherwise, a DataFrame is returned. 
schema : str or None Metadata used to restore Series structure. When ``data_type`` is ``pd.Series``, this is used as the Series name. Returns ------- Generator of pd.DataFrame or pd.Series A generator yielding reconstructed DataFrames or Series objects from the Parquet files stored in `temp_dir`. """ try: files = sorted(Path(temp_dir.name).glob("*.parquet")) for f in files: df = pd.read_parquet(f) if data_type is pd.Series: s = df.iloc[:, 0].copy() s.name = schema yield s else: yield df finally: temp_dir.cleanup() def _validate_chunk(items: tuple[Any, ...], requested_types: tuple[type, ...]) -> None: """ Validate that the current chunk contains supported data types. Parameters ---------- items : tuple of Any A tuple of synchronized items (one per reader) for the current chunk. requested_types : tuple of type Requested data types (e.g., pd.DataFrame, pd.Series). Raises ------ TypeError If the first item in the chunk is not of a supported type. """ if not isinstance(items[0], requested_types): raise TypeError(f"Unsupported data type: {type(items[0])}, expected {requested_types}") def _process_result( result: Any, requested_types: tuple[type, ...], non_data_output: str, chunk_counter: int ) -> tuple[list[pd.DataFrame | pd.Series], list[Any]]: """ Normalize and split a function result into data and metadata outputs. Parameters ---------- result : Any Output returned by the user-provided processing function. requested_types : tuple of type Types considered valid data outputs. non_data_output : str Mode controlling metadata capture (e.g., "acc" for accumulate). chunk_counter : int Current chunk index (used to determine metadata capture behavior). 
Returns ------- tuple of list of pd.DataFrame or pd.Series and list of Any A tuple containing: - A list of extracted data objects (flattened if nested in lists) - A list of metadata objects (empty if `capture_meta` is False) """ if not isinstance(result, tuple): result = (result,) capture_meta = non_data_output == "acc" or chunk_counter == 0 return _sort_chunk_outputs(result, capture_meta, requested_types) def _accumulate_meta(store: dict[int, list[Any]], meta: Iterable[Any]) -> None: """ Accumulate metadata outputs across chunks. Parameters ---------- store : dict of int to list of Any Dictionary mapping metadata index to collected values across chunks. meta : iterable of Any Metadata values extracted from the current chunk. """ for i, m in enumerate(meta): store.setdefault(i, []).append(m) def _handle_data_write( data: list[Any], temp_dirs: list[TemporaryDirectory[str]] | None, schemas: list[tuple[type, Any]] | None, chunk_counter: int, ) -> tuple[list[TemporaryDirectory[str]], list[tuple[type, Any]]]: """ Initialize storage (if needed) and write data chunks to disk. Parameters ---------- data : list of Any Data outputs for the current chunk (e.g., DataFrames). temp_dirs : list of TemporaryDirectory or None Temporary directories used for storing chunked parquet files. schemas : list of tuple of type and Any or None Schema information associated with each data stream. chunk_counter : int Index of the current chunk (used for file naming/order). Returns ------- tuple of list[TemporaryDirectory[str]], list[tuple[type, Any]] Initialized or updated temporary directories and schemas. """ if temp_dirs is None or schemas is None: temp_dirs, schemas = _initialize_storage(data) _write_chunks_to_disk(data, temp_dirs, chunk_counter) return temp_dirs, schemas def _finalize_non_data( store: dict[int, list[Any]], proc: Callable[..., Any] | None, args: tuple[Any, ...] | None, kwargs: dict[str, Any] | None, ) -> Any: """ Finalize aggregated non-data (metadata) outputs. 
Parameters ---------- store : dict of int to list of Any Collected metadata grouped by output position. proc : Callable or None Optional post-processing function applied to aggregated metadata. args : tuple of Any or None Positional arguments for the post-processing function. kwargs : dict of str to Any or None Keyword arguments for the post-processing function. Returns ------- Any Final processed metadata output. May be a single value, list, or dict. """ if len(store) == 1: output: Any = next(iter(store.values())) else: output = store if callable(proc): output = proc(output, *(args or ()), **(kwargs or {})) if isinstance(output, list) and len(output) == 1: return output[0] return output def _build_output( temp_dirs: list[TemporaryDirectory[str]], schemas: list[tuple[type, Any]], output_non_data: Any, ) -> tuple[ParquetStreamReader, ...] | tuple[Any, ...] | Any: """ Construct final output combining data stream readers and metadata. Parameters ---------- temp_dirs : list of TemporaryDirectory Temporary directories containing written parquet chunks. schemas : list of tuple[type, Any] Schema definitions corresponding to each data stream. output_non_data : Any Final processed metadata output. Returns ------- tuple of ParquetStreamReader or tuple of Any Tuple containing: - ParquetStreamReader objects (one per data stream) - Followed by metadata outputs """ def _make_parquet_reader(d: TemporaryDirectory[str], t: type, s: str | None) -> ParquetStreamReader: """ Create a ParquetStreamReader bound to a temporary directory and schema. Parameters ---------- d : TemporaryDirectory Temporary directory for saving parquet files. t : type Type of data to be saved. s : str or None Metadata used to restore Series structure. Returns ------- ParquetStreamReader Data as ParquetStreamReader to save. 
""" return ParquetStreamReader(lambda: _parquet_generator(d, t, s)) final_iterators: list[ParquetStreamReader] = [_make_parquet_reader(d, t, s) for d, (t, s) in zip(temp_dirs, schemas, strict=True)] if not isinstance(output_non_data, tuple): output_non_data = [output_non_data] else: output_non_data = list(output_non_data) return tuple(final_iterators + output_non_data) def _process_chunks( readers: list[ParquetStreamReader], func: Callable[..., Any], requested_types: tuple[type, ...], static_args: list[Any], static_kwargs: dict[str, Any], non_data_output: str, non_data_proc: Callable[..., Any] | None, non_data_proc_args: tuple[Any] | None, non_data_proc_kwargs: dict[str, Any] | None, ) -> ( tuple[ParquetStreamReader, ...] # when data is produced | tuple[Any, ...] # non-data outputs that are a tuple | Any # single value or list (no data produced) ): """ Process chunks. Parameters ---------- readers : list of ParquetStreamReader Input stream readers providing chunked data to be processed in lockstep. func : Callable Function applied to each synchronized set of chunks from ``readers``. requested_types : tuple of type Types considered valid data outputs (e.g., pd.DataFrame, pd.Series). Used to separate data from metadata returned by `func`. static_args : list of Any Additional positional arguments passed unchanged to `func`. static_kwargs : dict Additional keyword arguments passed unchanged to `func`. non_data_output : str Controls how non-data outputs (metadata) are aggregated across chunks. Typically determines whether they are accumulated or processed differently. non_data_proc : Callable or None Optional function applied to aggregated non-data outputs after processing. non_data_proc_args : tuple of Any or None Positional arguments for `non_data_proc`. non_data_proc_kwargs : dict or None Keyword arguments for `non_data_proc`. 
Returns ------- tuple of ParquetStreamReader or tuple of Any or Any If data outputs are produced: A tuple containing ParquetStreamReader objects (one per data stream) followed by processed non-data outputs. If no data outputs are produced: The aggregated non-data output, which may be a single value, list, or tuple depending on processing configuration. Raises ------ TypeError If input chunks are not of the expected `requested_types`. ValueError If the input iterables are empty or schema initialization fails. """ # State variables temp_dirs: list[TemporaryDirectory[str]] | None = None schemas: list[tuple[type, Any]] | None = None output_non_data_dict: dict[int, list[Any]] = {} chunk_counter: int = 0 for items in zip(*readers, strict=True): _validate_chunk(items, requested_types) result = func(*items, *static_args, **static_kwargs) data, meta = _process_result(result, requested_types, non_data_output, chunk_counter) _accumulate_meta(output_non_data_dict, meta) if data: temp_dirs, schemas = _handle_data_write(data, temp_dirs, schemas, chunk_counter) chunk_counter += 1 if chunk_counter == 0: raise ValueError("Iterable is empty.") output_non_data = _finalize_non_data( output_non_data_dict, non_data_proc, non_data_proc_args, non_data_proc_kwargs, ) if temp_dirs is None: return output_non_data if schemas is None: raise ValueError("Could not set schemas.") return _build_output(temp_dirs, schemas, output_non_data) def _prepare_readers( reader: Iterator[pd.DataFrame | pd.Series], func_args: Sequence[Any], func_kwargs: dict[str, Any], makecopy: bool, ) -> tuple[list[ParquetStreamReader], list[Any], dict[str, Any]]: """ Prepare readers for chunking. Parameters ---------- reader : Iterator of pandas.DataFrame or pandas.Series Primary input iterator to be converted into a ParquetStreamReader. func_args : Sequence of Any Positional arguments for the processing function. 
Any elements that can be converted into ParquetStreamReader instances are treated as additional data streams; others are passed through as static values. func_kwargs : dict Keyword arguments for the processing function. Values that can be converted into ParquetStreamReader instances are treated as streams; others are treated as static keyword arguments. makecopy : bool If True, all constructed ParquetStreamReader objects are copied so that iteration is independent across consumers. Returns ------- tuple of list of ParquetStreamReader or list of Any or dict A tuple containing: - A list of ParquetStreamReader objects used for chunked iteration (including the primary reader and any detected in args/kwargs). - A list of non-stream positional arguments to be passed to the processing function. - A dictionary of non-stream keyword arguments to be passed to the processing function. Raises ------ TypeError If the primary reader cannot be converted into a ParquetStreamReader. """ reader = ensure_parquet_reader(reader) if not isinstance(reader, ParquetStreamReader): raise TypeError(f"reader is not a ParquetStreamReader: {type(reader)}") args_reader: list[ParquetStreamReader] = [] args: list[Any] = [] for arg in func_args: converted = ensure_parquet_reader(arg) if isinstance(converted, ParquetStreamReader): args_reader.append(converted) else: args.append(converted) kwargs = {} for k, v in func_kwargs.items(): converted = ensure_parquet_reader(v) if isinstance(converted, ParquetStreamReader): args_reader.append(converted) else: kwargs[k] = converted readers = [reader] + args_reader if makecopy: readers = [r.copy() for r in readers] return readers, args, kwargs
def parquet_stream_from_iterable(
    iterable: Iterable[pd.DataFrame | pd.Series],
) -> ParquetStreamReader:
    """
    Stream an iterable of DataFrame/Series to parquet and return a disk-backed reader.

    Chunks are written to a temporary directory one at a time as the iterable
    is consumed, so the input is never collected in memory as a whole.

    Parameters
    ----------
    iterable : Iterable of pd.DataFrame or pd.Series
        An iterable of pandas DataFrame or Series objects to be streamed to disk.

    Returns
    -------
    ParquetStreamReader
        A disk-backed stream reader that reads the chunks back from the
        Parquet files stored in the temporary directory.

    Raises
    ------
    ValueError
        If the input iterable is empty.
    TypeError
        If the first element is not a pandas DataFrame or Series, or if a
        later chunk is not an instance of the first chunk's type.
    """
    iterator = iter(iterable)

    try:
        first = next(iterator)
    except StopIteration as err:
        raise ValueError("Iterable is empty.") from err

    data_type: type
    if isinstance(first, pd.DataFrame):
        data_type, schema = pd.DataFrame, first.columns
    elif isinstance(first, pd.Series):
        data_type, schema = pd.Series, first.name
    else:
        raise TypeError("Iterable must contain pd.DataFrame or pd.Series objects.")

    temp_dir = TemporaryDirectory()
    temp_dirs = [temp_dir]

    try:
        _write_chunks_to_disk([first], temp_dirs, chunk_counter=0)

        for idx, chunk in enumerate(iterator, start=1):
            if not isinstance(chunk, type(first)):
                raise TypeError("All chunks must be of the same type.")

            _write_chunks_to_disk([chunk], temp_dirs, chunk_counter=idx)
    except BaseException:
        # No reader will take ownership of the directory on failure, so it has
        # to be removed here rather than left to garbage collection.
        temp_dir.cleanup()
        raise

    return ParquetStreamReader(lambda: _parquet_generator(temp_dir, data_type, schema))
def is_valid_iterator(reader: Any) -> bool:
    """
    Tell whether an object implements the iterator protocol.

    Parameters
    ----------
    reader : Any
        Object to be checked.

    Returns
    -------
    bool
        True if `reader` is an instance of ``collections.abc.Iterator``,
        otherwise False. Plain iterables such as lists are not iterators.
    """
    implements_iterator_protocol = isinstance(reader, Iterator)
    return implements_iterator_protocol
def ensure_parquet_reader(obj: Any) -> Any:
    """
    Convert iterators into a ParquetStreamReader and pass everything else through.

    Parameters
    ----------
    obj : Any
        A ParquetStreamReader, an iterator of pd.DataFrame or pd.Series
        objects, or a static value.

    Returns
    -------
    Any
        `obj` itself if it is already a ParquetStreamReader or is not an
        iterator; otherwise a ParquetStreamReader built from the iterator.
    """
    is_plain_iterator = not isinstance(obj, ParquetStreamReader) and is_valid_iterator(obj)
    if is_plain_iterator:
        return parquet_stream_from_iterable(obj)
    return obj
def process_disk_backed(
    reader: Iterator[pd.DataFrame | pd.Series],
    func: Callable[..., Any],
    func_args: tuple[Any, ...] | None = None,
    func_kwargs: dict[str, Any] | None = None,
    requested_types: type | tuple[type, ...] = (pd.DataFrame, pd.Series),
    non_data_output: Literal["first", "acc"] = "first",
    non_data_proc: Callable[..., Any] | None = None,
    non_data_proc_args: tuple[Any, ...] | None = None,
    non_data_proc_kwargs: dict[str, Any] | None = None,
    makecopy: bool = True,
) -> tuple[Any, ...] | None:
    """
    Consume a stream of chunks, process them, and return the collected results.

    Data outputs are cached to disk (Parquet) and returned as stream readers.

    Parameters
    ----------
    reader : Iterator of pd.DataFrame or pd.Series
        Input stream of DataFrame or Series objects to be processed in chunks.
    func : Callable
        Function applied to each synchronized set of chunks from the stream.
        May return data objects (pd.DataFrame or pd.Series) and/or metadata.
    func_args : tuple of Any, optional
        Additional positional arguments passed to `func`. Defaults to empty tuple.
    func_kwargs : dict, optional
        Additional keyword arguments passed to `func`. Defaults to empty dict.
    requested_types : type or tuple of type, default (pd.DataFrame, pd.Series)
        Types treated as data outputs from `func`. A list is accepted and
        converted to a tuple. All other outputs are treated as metadata.
    non_data_output : {"first", "acc"}, default "first"
        Strategy for handling non-data outputs:
        - "first": only the first chunk's metadata is kept
        - "acc": accumulate metadata across all chunks
    non_data_proc : Callable, optional
        Optional function applied to aggregated non-data outputs after processing.
    non_data_proc_args : tuple of Any, optional
        Positional arguments for `non_data_proc`.
    non_data_proc_kwargs : dict, optional
        Keyword arguments for `non_data_proc`.
    makecopy : bool, default True
        If True, copies of the input streams are used internally.

    Returns
    -------
    tuple of Any or None
        A tuple containing:
        - One or more ParquetStreamReader objects for chunked data outputs
          (if any data was produced)
        - Processed non-data outputs (metadata), optionally transformed by
          `non_data_proc`

    Raises
    ------
    ValueError
        If `non_data_proc` is provided but not callable.
    """
    # Validate first: preparing the readers may consume iterators and write
    # temporary files, which must not happen for a call that is going to fail.
    if non_data_proc is not None:
        if not callable(non_data_proc):
            raise ValueError(f"Function {non_data_proc} is not callable.")
        if non_data_proc_args is None:
            non_data_proc_args = ()
        if non_data_proc_kwargs is None:
            non_data_proc_kwargs = {}

    if func_args is None:
        func_args = ()
    if func_kwargs is None:
        func_kwargs = {}

    # isinstance() rejects lists, so a list of types is turned into a tuple.
    if isinstance(requested_types, (list, tuple)):
        requested_types = tuple(requested_types)
    else:
        requested_types = (requested_types,)

    readers, static_args, static_kwargs = _prepare_readers(reader, func_args, func_kwargs, makecopy)

    return _process_chunks(
        readers,
        func,
        requested_types,
        static_args,
        static_kwargs,
        non_data_output,
        non_data_proc,
        non_data_proc_args,
        non_data_proc_kwargs,
    )
def _process_function(results: Any, data_only: bool = False) -> Any:
    """
    Run a ProcessFunction, or hand back anything else untouched.

    Parameters
    ----------
    results : Any
        Object to evaluate. A `ProcessFunction` instance is executed with its
        stored data, function and arguments; any other object is returned
        unchanged.
    data_only : bool, default False
        Only used on the disk-backed path: if True, the first element of the
        processed result is returned instead of the full result.

    Returns
    -------
    Any
        - `results` itself if it is not a ProcessFunction.
        - The direct return value of the stored function if the stored data is
          a pandas or xarray object.
        - Otherwise the result of `process_disk_backed` (or its first element
          when `data_only` is True), or None if that result is None.

    Raises
    ------
    TypeError
        If the stored data is neither an in-memory object, an iterator, a
        list, a tuple, nor a ParquetStreamReader.
    """
    if not isinstance(results, ProcessFunction):
        return results

    payload = results.data
    call_args = results.func_args
    call_kwargs = results.func_kwargs

    # In-memory objects are processed directly, without any disk caching.
    in_memory_types = (pd.DataFrame, pd.Series, xr.Dataset, xr.DataArray)
    if isinstance(payload, in_memory_types):
        return results.func(payload, *call_args, **call_kwargs)

    # Plain iterators and list/tuple containers are spooled to disk first.
    is_raw_iterator = is_valid_iterator(payload) and not isinstance(payload, ParquetStreamReader)
    if is_raw_iterator or isinstance(payload, (list, tuple)):
        payload = parquet_stream_from_iterable(payload)

    if not isinstance(payload, ParquetStreamReader):
        raise TypeError(f"Unsupported data type: {type(payload)}")

    outcome = process_disk_backed(
        payload,
        results.func,
        func_args=tuple(call_args),
        func_kwargs=call_kwargs,
        **results.kwargs,
    )

    if outcome is None:
        return None
    return outcome[0] if data_only is True else outcome
def process_function(
    data_only: bool = False,
    postprocessing: dict[str, Any] | None = None,
) -> Callable[..., Any]:
    """
    Decorator to apply a function to both in-memory pandas objects and chunk streams.

    Parameters
    ----------
    data_only : bool, default False
        If True, only the primary data output is returned from a disk-backed
        result.
    postprocessing : dict, optional
        Optional configuration for a postprocessing step applied to each
        DataFrame/Series/stream output. Expected keys:
        - "func": callable applied to each data output
        - "kwargs": name or collection of names of arguments of the original
          call that are forwarded to "func" as keyword arguments

    Returns
    -------
    Callable
        A decorator that wraps a function so it can operate on both in-memory
        pandas objects and disk-backed ParquetStreamReader streams.

    Raises
    ------
    ValueError
        Raised by the wrapped function at call time if the postprocessing
        function is not callable.
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        """
        Wrap `func` so its result is executed and optionally postprocessed.

        Parameters
        ----------
        func : Callable
            Function to be wrapped.

        Returns
        -------
        Callable
            Wrapped function that supports streaming and optional postprocessing.
        """
        sig = inspect.signature(func)

        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            r"""
            Execute the decorated function and handle streaming and postprocessing.

            Parameters
            ----------
            \*args : Any
                Positional arguments passed to the decorated function.
            \**kwargs : Any
                Keyword arguments passed to the decorated function.

            Returns
            -------
            Any
                Result of the decorated function after execution and, if
                configured, postprocessing. A single data result stays a
                single object; any other iterable result is returned as a tuple.
            """
            # Resolve the call (including defaults) so postprocessing can look
            # up argument values by name.
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            original_call = bound_args.arguments.copy()

            results = _process_function(
                func(*args, **kwargs),
                data_only=data_only,
            )

            if postprocessing is None:
                return results

            postproc_func = postprocessing.get("func")
            if not callable(postproc_func):
                raise ValueError(f"Function {postproc_func} is not callable.")

            postproc_names = postprocessing.get("kwargs", {})
            if isinstance(postproc_names, str):
                postproc_names = [postproc_names]
            postproc_kwargs = {k: original_call[k] for k in postproc_names}

            data_types = (pd.DataFrame, pd.Series, ParquetStreamReader)

            # A single data object must be postprocessed as a whole; iterating
            # it would walk column labels or consume the stream chunk by chunk.
            if isinstance(results, data_types):
                return postproc_func(results, **postproc_kwargs)

            return tuple(
                postproc_func(result, **postproc_kwargs) if isinstance(result, data_types) else result
                for result in results
            )

        return wrapper

    return decorator