"""Utility function for reading and writing CDM tables."""
from __future__ import annotations
from collections.abc import Iterable
from typing import Any
from .. import properties
[docs]
def dict_to_tuple_list(dic: dict[Any, Any]) -> list[tuple[Any, Any]]:
    """
    Flatten a dictionary into a list of (key, value) pairs.

    List values are expanded so that every list item yields its own pair
    with the owning key; any other value yields exactly one pair.

    Parameters
    ----------
    dic : dict
        Mapping whose values are either lists or single (scalar) objects.

    Returns
    -------
    list of tuple
        The (key, value) pairs in dictionary iteration order. Items of a
        list value appear consecutively, in list order. An empty list value
        contributes no pairs.

    Examples
    --------
    >>> dict_to_tuple_list({"A": [1, 2], "B": 3})
    [('A', 1), ('A', 2), ('B', 3)]
    """
    # Non-list values are wrapped in a one-element list so that a single
    # nested comprehension covers both the scalar and the list case.
    return [
        (key, item)
        for key, value in dic.items()
        for item in (value if isinstance(value, list) else [value])
    ]
[docs]
def get_cdm_subset(cdm_subset: str | Iterable[str] | None) -> list[str]:
    """
    Turn a CDM subset specification into a validated list of table names.

    Parameters
    ----------
    cdm_subset : str, Iterable of str or None
        Tables to select:

        - ``None``: every table in `properties.cdm_tables` is selected.
        - ``str``: a single table name.
        - any other iterable: its items are collected into a list.

        A non-iterable, non-string value is treated like a single name and
        therefore goes through the same validation.

    Returns
    -------
    list of str
        Table names, each of which is contained in `properties.cdm_tables`.

    Raises
    ------
    ValueError
        If an entry is not contained in `properties.cdm_tables`. The first
        offending entry (in input order) is reported.
    """
    if cdm_subset is None:
        return list(properties.cdm_tables)

    # Strings are iterable too, so they must be singled out explicitly;
    # otherwise a table name would be split into characters.
    is_single_entry = isinstance(cdm_subset, str) or not isinstance(cdm_subset, Iterable)
    tables = [cdm_subset] if is_single_entry else list(cdm_subset)

    for name in tables:
        if name in properties.cdm_tables:
            continue
        raise ValueError(f"Invalid CDM subset '{name}'. must be one of {properties.cdm_tables}.")
    return tables
[docs]
def get_usecols(tb: str, col_subset: str | Iterable[str] | dict[str, Any] | None) -> list[str] | None:
"""
Normalize a column subset specification for use with pandas.read_csv.
This function converts various forms of column subset input into a
standardized list of column names suitable for the `usecols` argument
in `pandas.read_csv`.
Parameters
----------
tb : str
Table name. Only used if `col_subset` is a dictionary.
col_subset : str, Iterable of str, dict, or None
Column subset specification. Acceptable formats:
- A single column name as a string.
- An iterable of column names (list, tuple, set, etc.).
- A dictionary mapping table names to column lists.
- None (read all columns).
Returns
-------
list of str or None
Normalized list of column names suitable for pandas `usecols`,
or None if no restriction is applied.
Raises
------
TypeError
If `col_subset` is not a string, iterable, dict, or None.
Notes
-----
1. If `col_subset` is a string, it is returned as a single-element list.
2. If `col_subset` is an iterable of strings (e.g., list, tuple, set),
it is converted to a list.
3. If `col_subset` is a dictionary, it is interpreted as a mapping
{table_name: list_of_columns} and returns the entry corresponding
to the given table `tb` (or None if missing).
4. If `col_subset` is None, the function returns None, meaning all columns
should be read.
"""
if isinstance(col_subset, str):
return [col_subset]
if isinstance(col_subset, dict):
return col_subset.get(tb)
if col_subset is None:
return None
# Any other iterable ? convert to list
try:
return list(col_subset)
except TypeError as err:
raise TypeError(f"col_subset must be str, iterable of str, dict, or None, got {type(col_subset)}") from err
[docs]
def adjust_filename(filename: str, table: str = "", extension: str = "psv") -> str:
    """
    Adjust a filename by optionally prepending a table name and appending an extension.

    Parameters
    ----------
    filename : str
        Original filename.
    table : str, optional
        Table name to prepend if not already present in the filename (default is "").
    extension : str, optional
        File extension to append if the filename has none (default is "psv").
        Leading dots are ignored, so "psv" and ".psv" are equivalent.

    Returns
    -------
    str
        Adjusted filename with optional table prefix and file extension.

    Notes
    -----
    1. If `table` does not occur anywhere in the filename (substring test),
       it is prepended, separated by a dash. An empty `table` is never
       prepended.
    2. If the filename does not contain a '.', the specified `extension`
       is appended after a single dot.

    Examples
    --------
    >>> adjust_filename("data", table="header")
    'header-data.psv'
    >>> adjust_filename("header-data.psv", table="header")
    'header-data.psv'
    >>> adjust_filename("data.txt", table="header")
    'header-data.txt'
    >>> adjust_filename("data", table="header", extension=".csv")
    'header-data.csv'
    """
    # "" is a substring of every string, so the default table adds no prefix.
    if table not in filename:
        filename = f"{table}-{filename}"
    if "." not in filename:
        # Drop leading dots so that an extension given as ".psv" does not
        # produce a doubled separator ("name..psv").
        suffix = extension.lstrip(".")
        filename = f"{filename}.{suffix}"
    return filename