Source code for cdm_reader_mapper.cdm_mapper.codes.codes

"""
Created on Thu Apr 11 13:45:38 2019.

Module to handle data models mappings to C3S Climate Data Store
Common Data Model (CMD) tables within the cdm tool.

@author: iregon
"""

from __future__ import annotations
import ast
import datetime
from pathlib import Path
from typing import Any

from cdm_reader_mapper.common.json_dict import (
    collect_json_files,
    combine_dicts,
    open_json_file,
)

from .. import properties


def _eval(s: str) -> Any:
    """
    Safely evaluate a string as a Python literal.

    Parameters
    ----------
    s : str
        Input string to evaluate.

    Returns
    -------
    Any
        Evaluated Python object if parsing succeeds, otherwise the original string.
    """
    try:
        return ast.literal_eval(s)
    except (SyntaxError, ValueError):
        return s


def _to_int(x: Any) -> int | None:
    """
    Convert input to an integer if possible.

    Parameters
    ----------
    x : Any
        Input value to convert.

    Returns
    -------
    int or None
        Integer representation of `x` if conversion succeeds, otherwise None.
    """
    try:
        return int(x)
    except (TypeError, ValueError):
        return None


def _expand_integer_range_key(d: Any) -> Any:
    """
    Expand dictionary keys that are integer ranges into individual year keys.

    Keys that can be evaluated to a list of the form [start, end, step]
    are expanded into individual string keys covering that range. The special
    value "yyyy" for the upper bound is replaced with the current year.

    Parameters
    ----------
    d : Any
        Input object, typically a dictionary with string keys.

    Returns
    -------
    Any
        Dictionary with expanded keys if input is a dict, otherwise the input
        unchanged.
    """
    if not isinstance(d, dict):
        return d

    expanded: dict[str, Any] = {}

    for k, v in d.items():
        v = _expand_integer_range_key(v)

        k_eval = _eval(k)

        if isinstance(k_eval, list) and len(k_eval) >= 2:
            lower = _to_int(k_eval[0])
            upper = _to_int(k_eval[1] if k_eval[1] != "yyyy" else datetime.date.today().year)
            step = _to_int(k_eval[2] if len(k_eval) > 2 else 1)

            if lower is None or upper is None or step is None:
                continue

            for i in range(lower, upper + 1, step):
                expanded[str(i)] = v
        elif not isinstance(k_eval, list):
            expanded[k] = v

    return expanded


[docs] def open_code_table(ifile: str | Path) -> Any: """ Open code table from json file on disk. Parameters ---------- ifile : str or Path-like Path to the JSON file containing the code table. Returns ------- Any Parsed JSON content with integer range keys expanded into explicit keys. """ json_dict = open_json_file(ifile) return _expand_integer_range_key(json_dict)
[docs] def get_code_table(data_model: str, *sub_models: str, code_table: str | None = None) -> dict[str, dict[str, Any]]: r""" Load code tables into dictionary. Combine JSON code table files from a specified data model, optional submodels, and common code tables. Parameters ---------- data_model : str The main data model name, e.g., `icoads`. \*sub_models : str Optional submodel names, e.g. `r300`, `d721`. code_table : str Name of the code table to load. If None, return empty dictionary. Returns ------- dict Combined dictionary of code tables. Nested tables are merged recursively. """ common_files = collect_json_files("common", base=f"{properties._base}.codes", name=code_table) table_files = collect_json_files(data_model, *sub_models, base=f"{properties._base}.codes", name=code_table) table_files = common_files + table_files tables = [open_code_table(ifile) for ifile in table_files] return combine_dicts(tables)