Source code for cdm_reader_mapper.cdm_mapper.codes.codes
"""
Created on Thu Apr 11 13:45:38 2019.
Module to handle data models mappings to C3S Climate Data Store
Common Data Model (CMD) tables within the cdm tool.
@author: iregon
"""
from __future__ import annotations
import ast
import datetime
from pathlib import Path
from typing import Any
from cdm_reader_mapper.common.json_dict import (
collect_json_files,
combine_dicts,
open_json_file,
)
from .. import properties
def _eval(s: str) -> Any:
"""
Safely evaluate a string as a Python literal.
Parameters
----------
s : str
Input string to evaluate.
Returns
-------
Any
Evaluated Python object if parsing succeeds, otherwise the original string.
"""
try:
return ast.literal_eval(s)
except (SyntaxError, ValueError):
return s
def _to_int(x: Any) -> int | None:
"""
Convert input to an integer if possible.
Parameters
----------
x : Any
Input value to convert.
Returns
-------
int or None
Integer representation of `x` if conversion succeeds, otherwise None.
"""
try:
return int(x)
except (TypeError, ValueError):
return None
def _expand_integer_range_key(d: Any) -> Any:
"""
Expand dictionary keys that are integer ranges into individual year keys.
Keys that can be evaluated to a list of the form [start, end, step]
are expanded into individual string keys covering that range. The special
value "yyyy" for the upper bound is replaced with the current year.
Parameters
----------
d : Any
Input object, typically a dictionary with string keys.
Returns
-------
Any
Dictionary with expanded keys if input is a dict, otherwise the input
unchanged.
"""
if not isinstance(d, dict):
return d
expanded: dict[str, Any] = {}
for k, v in d.items():
v = _expand_integer_range_key(v)
k_eval = _eval(k)
if isinstance(k_eval, list) and len(k_eval) >= 2:
lower = _to_int(k_eval[0])
upper = _to_int(k_eval[1] if k_eval[1] != "yyyy" else datetime.date.today().year)
step = _to_int(k_eval[2] if len(k_eval) > 2 else 1)
if lower is None or upper is None or step is None:
continue
for i in range(lower, upper + 1, step):
expanded[str(i)] = v
elif not isinstance(k_eval, list):
expanded[k] = v
return expanded
[docs]
def open_code_table(ifile: str | Path) -> Any:
"""
Open code table from json file on disk.
Parameters
----------
ifile : str or Path-like
Path to the JSON file containing the code table.
Returns
-------
Any
Parsed JSON content with integer range keys expanded into explicit keys.
"""
json_dict = open_json_file(ifile)
return _expand_integer_range_key(json_dict)
[docs]
def get_code_table(data_model: str, *sub_models: str, code_table: str | None = None) -> dict[str, dict[str, Any]]:
r"""
Load code tables into dictionary.
Combine JSON code table files from a specified data model,
optional submodels, and common code tables.
Parameters
----------
data_model : str
The main data model name, e.g., `icoads`.
\*sub_models : str
Optional submodel names, e.g. `r300`, `d721`.
code_table : str
Name of the code table to load. If None, return empty dictionary.
Returns
-------
dict
Combined dictionary of code tables. Nested tables are merged recursively.
"""
common_files = collect_json_files("common", base=f"{properties._base}.codes", name=code_table)
table_files = collect_json_files(data_model, *sub_models, base=f"{properties._base}.codes", name=code_table)
table_files = common_files + table_files
tables = [open_code_table(ifile) for ifile in table_files]
return combine_dicts(tables)