Source code for cdm_reader_mapper.duplicates.duplicates

"""Common Data Model (CDM) pandas duplicate check."""

from __future__ import annotations

import datetime
from copy import deepcopy

from typing import Iterable

import numpy as np
import pandas as pd
import recordlinkage as rl

from ._duplicate_settings import Compare, _compare_kwargs, _histories, _method_kwargs


def convert_series(df: pd.DataFrame, conversion: dict) -> pd.DataFrame:
    """Convert data types in a DataFrame.

    Parameters
    ----------
    df: pandas.DataFrame
        Input DataFrame. It is not modified; a copy is converted.
    conversion: dict
        Conversion dictionary containing columns and
        new data type as key-value pairs. A value is either anything
        accepted by ``pandas.Series.astype`` or the name of one of the
        named converters defined in this function
        (currently ``"convert_date_to_float"``).

    Returns
    -------
    pandas.DataFrame
        Converted copy of `df` in which remaining missing values
        are replaced by ``9999.0``.
    """

    def convert_date_to_float(date):
        # Seconds elapsed since the earliest timestamp of the column.
        date = date.astype("datetime64[ns]")
        return (date - date.min()) / np.timedelta64(1, "s")

    # Explicit dispatch table instead of a ``locals()`` lookup: an unrelated
    # TypeError raised by ``astype`` is no longer turned into a KeyError.
    converters = {"convert_date_to_float": convert_date_to_float}

    df = df.copy()
    for column, method in conversion.items():
        converter = converters.get(method) if isinstance(method, str) else None
        if converter is None:
            df[column] = df[column].astype(method)
        else:
            df[column] = converter(df[column])

    df = df.infer_objects(copy=False).fillna(9999.0)
    return df


def add_history(df: pd.DataFrame, indexes: Iterable[int]) -> pd.DataFrame:
    """
    Append duplicate information to the 'history' column of a DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing a 'history' column.
    indexes : list[int] or pd.Index
        Row indexes where history should be updated.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with updated 'history' column for the selected rows.

    Notes
    -----
    - If 'history' column does not exist, it will be created with empty strings.
    - Each message is prefixed with a UTC timestamp in "YYYY-MM-DD HH:MM:SS" format.
    - The input DataFrame is not modified.
    """

    def _datetime_now():
        # ``datetime.timezone.utc`` exists on every supported Python 3
        # version, so no fallback to the deprecated ``utcnow()`` is needed.
        now = datetime.datetime.now(datetime.timezone.utc)
        return now.strftime("%Y-%m-%d %H:%M:%S")

    df = df.copy()

    if "history" not in df.columns:
        df["history"] = ""

    # ``indexes`` is used twice below; materialise it so that one-shot
    # iterables (e.g. generators) are not exhausted after the first use.
    indexes = list(indexes)

    history_tstmp = _datetime_now()
    # NOTE(review): iterating ``_histories.items()`` formats each entry as a
    # (key, value) tuple; ``.values()`` may have been intended - confirm
    # against ``_duplicate_settings`` before changing the emitted text.
    addition = "".join(f"; {history_tstmp}. {add}" for add in _histories.items())
    df.loc[indexes, "history"] = df.loc[indexes, "history"] + addition
    return df


def add_duplicates(df: pd.DataFrame, dups: pd.DataFrame) -> pd.DataFrame:
    """
    Add duplicate information to the DataFrame based on the `dups` table.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing a 'report_id' column.
    dups : pd.DataFrame
        DataFrame where the index corresponds to rows in `df` and
        the values are lists of duplicate indices or duplicate IDs.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with a 'duplicates' column containing duplicates
        as a sorted string list, e.g., "{ID1,ID2}".

    Notes
    -----
    - If a row has no duplicates, its 'duplicates' column is left unchanged.
    - Supports duplicates represented either by IDs (str) or by indices (int) of `report_id`.
    - Integer entries are resolved positionally (``iloc``) against 'report_id'.
    - Only rows whose label occurs in ``dups.index`` are touched, so the
      other columns keep their dtypes without a row-wise round trip.
    """

    def _format_duplicates(label):
        # First cell of the matching ``dups`` row holds the duplicate list.
        members = dups.loc[label].to_list()[0]
        if len(members) == 0:
            # Nothing to report for this row; caller keeps the old value.
            return None
        if isinstance(members[0], str):
            ids = sorted(members)
        else:
            ids = sorted(report_ids.iloc[members].tolist())
        return "{" + ",".join(ids) + "}"

    df = df.copy()

    if "duplicates" not in df.columns:
        df["duplicates"] = ""

    report_ids = df["report_id"]

    mask = df.index.isin(dups.index)
    if not mask.any():
        return df

    current = df.loc[mask, "duplicates"].tolist()
    updated = []
    for label, old in zip(df.index[mask], current):
        new = _format_duplicates(label)
        updated.append(old if new is None else new)

    df.loc[mask, "duplicates"] = updated
    return df


def add_report_quality(df: pd.DataFrame, indexes_bad: Iterable[int]) -> pd.DataFrame:
    """
    Mark the given rows as bad in the 'report_quality' column.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing at least a 'report_quality' column.
        The input is left untouched; a copy is updated.
    indexes_bad : iterable of int
        Row labels to flag as bad quality (value=1).

    Returns
    -------
    pd.DataFrame
        Copy of `df` whose 'report_quality' column is cast to int and
        set to 1 for every label in `indexes_bad`.
    """
    flagged = df.copy()
    # Cast first so that the flag is stored as an integer.
    quality = flagged["report_quality"].astype(int)
    quality.loc[indexes_bad] = 1
    flagged["report_quality"] = quality
    return flagged


class DupDetect:
    """
    Class to detect, flag, and remove duplicate entries in a DataFrame using
    a comparison matrix from recordlinkage.

    Parameters
    ----------
    data : pd.DataFrame
        Original dataset.
    compared : pd.DataFrame
        Comparison matrix of the dataset.
    method : str
        Duplicate detection method used for recordlinkage indexing.
    method_kwargs : dict
        Keyword arguments for recordlinkage indexing method.
    compare_kwargs : dict
        Keyword arguments used for recordlinkage.Compare.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        compared: pd.DataFrame,
        method: str,
        method_kwargs: dict,
        compare_kwargs: dict,
    ):
        # Work on a copy so that the caller's frame is never modified.
        self.data = data.copy()
        self.compared = compared
        self.method = method
        self.method_kwargs = method_kwargs
        self.compare_kwargs = compare_kwargs

    def _get_limit(self, limit: str | float | None) -> float:
        """
        Resolve the duplicate threshold limit.

        Parameters
        ----------
        limit : str or float
            'default', None, or a numeric limit.

        Returns
        -------
        float
            Threshold for total score to consider duplicates.
        """
        default_limit = 0.991
        if limit in ("default", None):
            return default_limit
        return float(limit)

    def _get_equal_musts(self) -> list[str]:
        """
        Identify columns that must be equal for duplicates.

        The candidates are the keys of ``self.compare_kwargs``; only those
        that are columns of ``self.data`` are returned.

        Returns
        -------
        list[str]
            Columns that must match exactly to consider duplicates.
        """
        equal_musts = []
        for value in self.compare_kwargs.keys():
            if not isinstance(value, list):
                value = [value]
            for v in value:
                if v in self.data.columns:
                    equal_musts.append(v)
        return equal_musts

    def _total_score(self) -> None:
        """Compute total similarity score for each row in `self.compared`."""
        pcmax = self.compared.shape[1]
        self.score = 1 - (abs(self.compared.sum(axis=1) - pcmax) / pcmax)

    def get_duplicates(
        self,
        keep: str | int = "first",
        limit: str | float | None = "default",
        equal_musts: str | list[str] | None = None,
        overwrite: bool = True,
    ) -> pd.DataFrame:
        """
        Identify duplicate matches based on the comparison matrix.

        Parameters
        ----------
        keep : str or int
            Which entry to keep: 'first', 'last', or -1, 0.
        limit : str or float, optional
            Threshold of total similarity score to consider as duplicate.
        equal_musts : str or list[str], optional
            Columns that must exactly match.
            Default: the columns returned by ``_get_equal_musts``.
        overwrite : bool
            Whether to recompute matches if already calculated. Matches are
            always computed when none have been calculated yet.

        Returns
        -------
        pd.DataFrame
            DataFrame containing matched duplicates.

        Raises
        ------
        ValueError
            If `keep` is not one of 'first', 'last', -1 or 0.
        """
        if keep not in ["first", "last", -1, 0]:
            raise ValueError("keep has to be one of 'first', 'last', -1 or 0.")

        # ``keep``/``drop`` are stored as positions of the pair-index level
        # holding the record to keep and the record to drop, respectively.
        if keep == "first":
            keep = -1
        elif keep == "last":
            keep = 0
        self.keep = keep
        if keep == 0:
            self.drop = -1
        elif keep == -1:
            self.drop = 0

        # Also compute on the very first call with ``overwrite=False``;
        # otherwise ``self.matches`` would not exist yet.
        if overwrite is True or not hasattr(self, "matches"):
            self._total_score()
            self.limit = self._get_limit(limit)
            cond = self.score >= self.limit
            if equal_musts is None:
                equal_musts = self._get_equal_musts()
            if isinstance(equal_musts, str):
                equal_musts = [equal_musts]
            for must in equal_musts:
                # The per-column score is combined with ``&``, i.e. it is
                # interpreted as a boolean here.
                cond = cond & (self.compared[must])
            self.matches = self.compared[cond]
        return self.matches

    def flag_duplicates(
        self,
        keep: str | int = "first",
        limit: str | float | None = "default",
        equal_musts: str | list[str] | None = None,
    ) -> pd.DataFrame:
        r"""Get result dataset with flagged duplicates.

        Parameters
        ----------
        keep: str, ["first", "last"]
            Which entry should be kept in result dataset.
        limit: float, optional
            Limit of total score that has to be exceeded to be declared as a duplicate.
            Default: .991
        equal_musts: str or list, optional
            Hashable of column name(s) that must totally be equal to be declared as a duplicate.
            Default: All column names found in compare_kwargs that are columns of the data.

        Returns
        -------
        pandas.DataFrame
            Input DataFrame with flagged duplicates.
            Flags for ``duplicate_status``: see `duplicate_status`_
            Flags for ``report_quality``: see `quality_flag`_

        .. _duplicate_status: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/duplicate_status/duplicate_status.html
        .. _quality_flag: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/quality_flag/quality_flag.html
        """

        def _get_similars(drop, keeps):
            # For a pair whose drop-label also occurs among the keep-labels,
            # return (drop-label, keep-label); otherwise return None.
            if drop[drop_] in keeps:
                drops = drop[drop_]
                keeps = drop[keep_]
                try:
                    return int(drops), int(keeps)
                except ValueError:
                    return drops, keeps

        def _get_duplicates(x, last):
            # Unique labels of column ``last`` within one group.
            b = list(set(x[last].values))
            return pd.Series({"dups": b})

        def _delete_values_equal_keys(dictionary):
            # Split off self-referencing entries (key == value).
            dictionary_ = {}
            drops_ = []
            for k, v in dictionary.items():
                if k == v:
                    drops_.append(v)
                    continue
                dictionary_[k] = v
            return dictionary_, drops_

        def replace_keeps_and_drops(df, keep_):
            # Redirect keep-labels that are themselves dropped elsewhere and
            # repeat until no replacement key is also a replacement value.
            while True:
                df = df.sort_index()
                keeps = df[keep_].values
                replaces = df.apply(lambda x: _get_similars(x, keeps), axis=1)
                replaces = dict(replaces.dropna().values)
                replaces, drops_ = _delete_values_equal_keys(replaces)
                keys = replaces.keys()
                values = replaces.values()
                if len(drops_) > 0:
                    df = df.drop(drops_, axis="index")
                df[keep_] = df[keep_].replace(replaces)
                if not set(keys).intersection(values):
                    return df

        # Always (re)computes ``self.matches`` because ``overwrite`` defaults
        # to True, so no further existence check is required afterwards.
        self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts)

        result = self.data.copy()
        dtypes = result.dtypes
        result["duplicate_status"] = 0

        indexes = self.matches.index
        indexes_df = indexes.to_frame()
        drop_ = indexes_df.columns[self.drop]
        keep_ = indexes_df.columns[self.keep]
        indexes_df = indexes_df.drop_duplicates(subset=[drop_])
        indexes_df = replace_keeps_and_drops(indexes_df, keep_)

        dup_keep = indexes_df.groupby(indexes_df[keep_]).apply(
            lambda x: _get_duplicates(x, drop_),
            include_groups=False,
        )
        dup_drop = indexes_df.groupby(indexes_df[drop_]).apply(
            lambda x: _get_duplicates(x, keep_),
            include_groups=False,
        )
        duplicates = pd.concat([dup_keep, dup_drop])

        indexes_good = indexes_df[keep_].values.tolist()
        indexes_bad = indexes_df[drop_].values.tolist()
        indexes = indexes_good + indexes_bad

        result.loc[indexes_good, "duplicate_status"] = 1
        result.loc[indexes_bad, "duplicate_status"] = 3
        result = add_report_quality(result, indexes_bad=indexes_bad)
        result = add_history(result, indexes)
        result = result.sort_index(ascending=True)
        result = add_duplicates(result, duplicates)

        self.result = result.astype(dtypes)
        self.data = self.data.sort_index(ascending=True)
        return self.result

    def remove_duplicates(
        self,
        keep: str | int = "first",
        limit: str | float | None = "default",
        equal_musts: str | list[str] | None = None,
    ) -> pd.DataFrame:
        """
        Remove duplicate entries from the dataset.

        Parameters
        ----------
        keep : str or int
            Which entry to keep ('first' or 'last').
        limit : str or float, optional
            Minimum similarity score to declare duplicates.
        equal_musts : str or list[str], optional
            Columns that must exactly match.

        Returns
        -------
        pd.DataFrame
            Dataset without duplicates.
        """
        self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts)

        result = self.data.copy()
        # Labels on the "drop" level of the matched pairs are removed.
        drops = self.matches.index.get_level_values(self.drop)
        result = result.drop(drops)

        self.result = result.sort_index(ascending=True)
        self.data = self.data.sort_index(ascending=True)
        return self.result
def set_comparer(compare_dict) -> Compare:
    """
    Build a recordlinkage Compare object with optional conversion dictionary.

    Parameters
    ----------
    compare_dict: dict
        Dictionary of columns to compare, e.g.,
        {"column_name": {"method": "exact" | "numeric" | "date2", "kwargs": {...}}}
        The "kwargs" entry is optional.

    Returns
    -------
    recordlinkage.Compare
        Compare object with added comparison methods and a 'conversion' attribute.

    Raises
    ------
    KeyError
        If an entry of `compare_dict` has no "method" key.
    """
    # Target data type (or named converter) per comparison method.
    conversions = {
        "numeric": float,
        "date": "datetime64[ns]",
        "date2": "convert_date_to_float",
    }

    comparer = Compare()
    comparer.conversion = {}

    for column, c_dict in compare_dict.items():
        try:
            method = c_dict["method"]
        except KeyError as err:
            raise KeyError(
                "compare_kwargs must be hierarchically ordered: {<column_name>: {'method': <compare_method>}}. 'method' not found"
            ) from err

        kwargs = c_dict.get("kwargs", {})

        # Register the comparison of the column with itself under its own label.
        getattr(comparer, method)(
            column,
            column,
            label=column,
            **kwargs,
        )

        if method in conversions:
            comparer.conversion[column] = conversions[method]

    return comparer


def remove_ignores(dic: dict, columns: str | list[str]) -> dict:
    """
    Remove dictionary entries where keys or values match ignored columns.

    Parameters
    ----------
    dic : dict
        Original dictionary to filter.
    columns : str or list[str]
        Column(s) to ignore.

    Returns
    -------
    dict
        Filtered dictionary without the ignored columns. List values are
        reduced to their non-ignored members; entries whose list becomes
        empty are dropped.
    """
    new_dict = {}
    if isinstance(columns, str):
        columns = [columns]

    for k, v in dic.items():
        if k in columns:
            continue
        if v in columns:
            continue
        if isinstance(v, list):
            v2 = [v_ for v_ in v if v_ not in columns]
            if len(v2) == 0:
                continue
            v = v2
        new_dict[k] = v

    return new_dict


def change_offsets(dic: dict, dic_o: dict) -> dict:
    """
    Update the 'offset' value in compare dictionary kwargs.

    Parameters
    ----------
    dic : dict
        Original compare dictionary. It is updated in place.
    dic_o : dict
        Dictionary mapping column names to new offsets.
        Columns that are not part of `dic` are ignored.

    Returns
    -------
    dict
        Updated compare dictionary with modified offsets.
    """
    for key, offset in dic_o.items():
        if key not in dic:
            continue
        # "kwargs" is optional in a compare entry (see ``set_comparer``),
        # so create it on demand instead of failing with a KeyError.
        dic[key].setdefault("kwargs", {})["offset"] = offset
    return dic


def reindex_nulls(df: pd.DataFrame, null_label) -> pd.DataFrame:
    """
    Reindex a DataFrame in ascending order based on the number of nulls in each row.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame.
    null_label
        Value that marks a cell as null. Cells equal to `null_label`,
        cells for which ``pd.isna`` is true, and list/tuple/array cells
        containing such a value are counted as nulls.

    Returns
    -------
    pd.DataFrame
        DataFrame reindexed so that rows with fewer nulls appear first.
        Original row order is preserved for rows with the same null count.
    """

    def is_missing(x):
        if isinstance(x, (list, tuple, np.ndarray)):
            return any(is_missing(x_) for x_ in x)
        if pd.isna(x):
            return True
        if x == null_label:
            return True
        return False

    def count_nulls(row):
        return sum(is_missing(x) for x in row)

    null_counts = df.apply(count_nulls, axis=1)

    if null_counts.empty:
        return df

    # A stable sort keeps the original order among rows with equal counts.
    sorted_index = null_counts.sort_values(kind="stable").index
    return df.loc[sorted_index]


class Comparer:
    """
    Wrapper around recordlinkage.Compare to compute pairwise comparisons on a DataFrame.

    This class initializes a recordlinkage indexer and Compare object,
    optionally converting the data types before computing the comparisons.

    Parameters
    ----------
    data : pd.DataFrame
        The dataset to compare.
    method : str
        The indexing method from `recordlinkage.index`, e.g., 'SortedNeighbourhood'.
    method_kwargs : dict
        Keyword arguments to pass to the indexing method.
    compare_kwargs : dict
        Dictionary specifying columns and comparison methods for recordlinkage.Compare.
    pairs_df : list[pd.DataFrame], optional
        Optional pre-split DataFrames to pass to the indexer. Defaults to `[data]`.
    convert_data : bool, default False
        Whether to convert data using `compare_kwargs` conversion dictionary.
    """

    def __init__(
        self,
        data,
        method,
        method_kwargs,
        compare_kwargs,
        pairs_df=None,
        convert_data=False,
    ):
        indexer = getattr(rl.index, method)(**method_kwargs)
        comparer = set_comparer(compare_kwargs)

        if convert_data is True:
            data_cp = convert_series(data, comparer.conversion)
        else:
            data_cp = data.copy()

        if pairs_df is None:
            pairs_df = [data_cp]

        pairs = indexer.index(*pairs_df)
        # Comparison matrix of the candidate pairs and the data it is based on.
        self.compared = comparer.compute(pairs, data_cp)
        self.data = data_cp
def duplicate_check(
    data,
    method="SortedNeighbourhood",
    method_kwargs=None,
    compare_kwargs=None,
    table_name=None,
    ignore_columns=None,
    ignore_entries=None,
    offsets=None,
    reindex_by_null=True,
    null_label="null",
) -> DupDetect:
    """
    Run a duplicate check on a dataset using recordlinkage.

    Returns a DupDetect object.

    Parameters
    ----------
    data: pandas.DataFrame
        Dataset for duplicate check.
    method: str
        Duplicate check method for recordlinkage.
        Default: SortedNeighbourhood
    method_kwargs: dict, optional
        Keyword arguments for recordlinkage duplicate check.
        Default: _method_kwargs
    compare_kwargs: dict, optional
        Keyword arguments for recordlinkage.Compare object.
        Default: _compare_kwargs
    table_name: str, optional
        Name of the CDM table to be selected from data.
    ignore_columns: str or list, optional
        Name of data columns to be ignored for duplicate check.
    ignore_entries: dict, optional
        Key: Column name
        Value: value (or list of values) to be ignored
        E.g. ignore_entries={"station_speed": "null"}
    offsets: dict, optional
        Change offsets for recordlinkage Compare object.
        Key: Column name
        Value: new offset
        E.g. offsets={"latitude": 0.1}
    reindex_by_null: bool, optional
        If True data is re-indexed in ascending order according to the number of nulls in each row.
    null_label: str, optional
        Null label which is used if `reindex_by_null` is True.

    Returns
    -------
    cdm_reader_mapper.DupDetect
    """
    if reindex_by_null is True:
        data = reindex_nulls(data, null_label=null_label)

    # NOTE(review): the previous code called ``data.reset_index(drop=True)``
    # here and discarded the result, so the row labels were never reset.
    # The dead call was removed without changing behaviour; confirm whether
    # a positional relabelling was actually intended.
    index = data.index

    if table_name:
        data = data[table_name]

    # Fall back to (copies of) the package defaults so they are never mutated.
    if not method_kwargs:
        method_kwargs = deepcopy(_method_kwargs)
    if not compare_kwargs:
        compare_kwargs = deepcopy(_compare_kwargs)

    if ignore_columns:
        method_kwargs = remove_ignores(method_kwargs, ignore_columns)
        compare_kwargs = remove_ignores(compare_kwargs, ignore_columns)

    if offsets:
        compare_kwargs = change_offsets(compare_kwargs, offsets)

    dtypes = data.dtypes

    comparison = Comparer(
        data=data,
        method=method,
        method_kwargs=method_kwargs,
        compare_kwargs=compare_kwargs,
        convert_data=True,
    )
    compared = comparison.compared
    converted = comparison.data

    if ignore_entries is None:
        return DupDetect(data, compared, method, method_kwargs, compare_kwargs)

    compared = [compared]

    for column_, entry_ in ignore_entries.items():
        if not isinstance(entry_, list):
            entry_ = [entry_]

        # Split the rows into those holding an ignored entry (d2) and the rest (d1).
        entries = data[column_].isin(entry_)
        d1 = data.mask(entries).dropna(how="all")
        d2 = data.where(entries).dropna(how="all")
        if d1.empty:
            continue
        if d2.empty:
            continue

        # Compare both groups against each other without the ignored column.
        method_kwargs_ = remove_ignores(method_kwargs, column_)
        compare_kwargs_ = remove_ignores(compare_kwargs, column_)
        compared_ = Comparer(
            data=converted,
            method=method,
            method_kwargs=method_kwargs_,
            compare_kwargs=compare_kwargs_,
            pairs_df=[d2, d1],
        ).compared
        # Ignored columns count as fully matching for these pairs.
        compared_[list(ignore_entries.keys())] = 1
        compared.append(compared_)

    compared = pd.concat(compared)

    # Non-inplace so that a frame owned by the caller is never modified.
    data = data.set_index(index)
    data = data.astype(dtypes)
    return DupDetect(data, compared, method, method_kwargs, compare_kwargs)