Source code for cdm_reader_mapper.duplicates.duplicates

"""Common Data Model (CDM) pandas duplicate check."""

from __future__ import annotations
import datetime
from collections.abc import Iterable
from copy import deepcopy
from typing import Any

import numpy as np
import pandas as pd
import recordlinkage as rl

from ._duplicate_settings import Compare, _compare_kwargs, _histories, _method_kwargs


def convert_series(df: pd.DataFrame, conversion: dict[Any, Any]) -> pd.DataFrame:
    """
    Convert data types in a DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame.
    conversion : dict
        Conversion dictionary containing column names and target data types
        as key-value pairs. Besides anything accepted by
        :meth:`pandas.Series.astype`, a value may be the name of a built-in
        converter; currently only ``"convert_date_to_float"`` is supported.

    Returns
    -------
    pd.DataFrame
        Copy of `df` with converted data types. Remaining missing values
        (in all columns) are filled with 9999.0.

    Raises
    ------
    TypeError
        If a target is neither a valid dtype nor a known converter name.
    """

    def convert_date_to_float(date: pd.Series | pd.DatetimeIndex) -> pd.Series:
        """
        Convert datetime values to float seconds relative to the minimum value.

        Parameters
        ----------
        date : pd.Series or pd.DatetimeIndex
            Datetime-like values to convert.

        Returns
        -------
        pd.Series
            Float values representing seconds since the minimum datetime in `date`.
        """
        date = date.astype("datetime64[ns]")
        return (date - date.min()) / np.timedelta64(1, "s")

    # Named converters that may appear in `conversion` instead of a dtype.
    # An explicit table replaces the former ``locals()[method]`` lookup. That
    # lookup turned an invalid dtype into an unrelated KeyError.
    converters = {"convert_date_to_float": convert_date_to_float}

    df = df.copy()
    for column, method in conversion.items():
        converter = converters.get(method) if isinstance(method, str) else None
        if converter is not None:
            df[column] = converter(df[column])
        else:
            df[column] = df[column].astype(method)
    df = df.infer_objects().fillna(9999.0)
    return df
def add_history(
    df: pd.DataFrame,
    indexes: Iterable[int],
    messages: Iterable[str] | None = None,
) -> pd.DataFrame:
    """
    Append duplicate information to the 'history' column of a DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing a 'history' column.
    indexes : list[int] or pd.Index
        Row index labels (used with ``.loc``) where history should be updated.
    messages : iterable of str, optional
        Messages to append. Defaults to the values of the module-level
        ``_histories`` mapping.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with updated 'history' column for the selected rows.

    Notes
    -----
    - If 'history' column does not exist, it will be created with empty strings.
    - Each message is appended as ``"; <timestamp>. <message>"``. The timestamp
      is UTC in "YYYY-MM-DD HH:MM:SS" format and shared by all messages.
    """

    def _datetime_now() -> str:
        """
        Get the current UTC datetime.

        Returns
        -------
        str
            Current UTC datetime formatted as "%Y-%m-%d %H:%M:%S".
        """
        # timezone.utc exists on all Python 3 versions. This avoids the
        # deprecated, naive datetime.utcnow() fallback.
        now = datetime.datetime.now(datetime.timezone.utc)
        return now.strftime("%Y-%m-%d %H:%M:%S")

    if messages is None:
        # Append the message texts, not ``(key, value)`` tuples. Iterating
        # ``.items()`` would format each tuple's repr into the history.
        messages = _histories.values()

    df = df.copy()
    if "history" not in df.columns:
        df["history"] = ""

    history_tstmp = _datetime_now()
    addition = "".join(f"; {history_tstmp}. {msg}" for msg in messages)
    df.loc[indexes, "history"] = df.loc[indexes, "history"] + addition
    return df
def add_duplicates(df: pd.DataFrame, dups: pd.DataFrame) -> pd.DataFrame:
    """
    Add duplicate information to the DataFrame based on the `dups` table.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing a 'report_id' column.
    dups : pd.DataFrame
        DataFrame whose index corresponds to row labels in `df`. The first
        column holds, per row, a list of duplicates given either as report
        IDs (str) or as index labels of `df`.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with a 'duplicates' column containing duplicates as a
        sorted string list, e.g., "{ID1,ID2}".

    Notes
    -----
    - If a row has no duplicates, its 'duplicates' column is left unchanged.
    - Non-string entries are looked up in `report_id` by index *label*.
      Earlier versions used positional lookup, which was only correct for a
      sorted 0..n-1 index.
    - If a row label appears several times in `dups`, its lists are merged.
    """
    df = df.copy()
    if "duplicates" not in df.columns:
        df["duplicates"] = ""

    report_ids = df["report_id"]
    dtypes = df.dtypes

    # Nothing to add (also covers a `dups` frame without any column).
    if dups.empty:
        return df.astype(dtypes)

    def _resolve(entries: Any) -> list[Any]:
        """Translate one list of duplicates into report IDs."""
        entries = list(entries)
        if len(entries) == 0:
            return []
        if isinstance(entries[0], str):
            return entries
        return report_ids.loc[entries].tolist()

    # Gather all IDs per row label first, so repeated labels in `dups` are merged.
    collected: dict[Any, list[Any]] = {}
    for idx, entries in dups.iloc[:, 0].items():
        collected.setdefault(idx, []).extend(_resolve(entries))

    for idx, ids in collected.items():
        if not ids or idx not in df.index:
            continue
        df.loc[idx, "duplicates"] = "{" + ",".join(sorted(ids)) + "}"

    return df.astype(dtypes)
def add_report_quality(df: pd.DataFrame, indexes_bad: Iterable[int]) -> pd.DataFrame:
    """
    Mark the given rows as bad reports in the 'report_quality' column.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame holding at least a 'report_quality' column.
    indexes_bad : iterable of int
        Index labels (used with ``.loc``) of the rows to flag with quality 1.

    Returns
    -------
    pd.DataFrame
        Copy of `df` whose 'report_quality' column is cast to ``int`` and set
        to 1 for every row listed in `indexes_bad`.
    """
    flagged = df.copy()
    quality = flagged["report_quality"].astype(int)
    flagged["report_quality"] = quality
    flagged.loc[indexes_bad, "report_quality"] = 1
    return flagged
class DupDetect:
    """
    Detect, flag, and remove duplicate entries in a DataFrame.

    Detection uses a comparison matrix produced by recordlinkage.

    Parameters
    ----------
    data : pd.DataFrame
        Original dataset.
    compared : pd.DataFrame
        Comparison matrix of the dataset. It is indexed by record pairs, with
        one column per compared field.
    method : str
        Duplicate detection method used for recordlinkage indexing.
    method_kwargs : dict
        Keyword arguments for recordlinkage indexing method.
    compare_kwargs : dict
        Keyword arguments used for recordlinkage.Compare.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        compared: pd.DataFrame,
        method: str,
        method_kwargs: dict[Any, Any],
        compare_kwargs: dict[Any, Any],
    ) -> None:
        """
        Initialize a DupDetect instance.

        Parameters
        ----------
        data : pd.DataFrame
            Original dataset (stored as a copy).
        compared : pd.DataFrame
            Comparison matrix of the dataset.
        method : str
            Duplicate detection method used for recordlinkage indexing.
        method_kwargs : dict
            Keyword arguments for recordlinkage indexing method.
        compare_kwargs : dict
            Keyword arguments used for recordlinkage.Compare.
        """
        self.data = data.copy()
        self.compared = compared
        self.method = method
        self.method_kwargs = method_kwargs
        self.compare_kwargs = compare_kwargs

    def _get_limit(self, limit: str | float | None) -> float:
        """
        Resolve the duplicate threshold limit.

        Parameters
        ----------
        limit : str or float
            'default', None, or a numeric limit.

        Returns
        -------
        float
            Threshold for total score to consider duplicates (0.991 by default).
        """
        default_limit = 0.991
        if limit is None or limit == "default":
            return default_limit
        return float(limit)

    def _get_equal_musts(self) -> list[str]:
        """
        Identify columns that must be equal for duplicates.

        Returns
        -------
        list[str]
            All keys of ``compare_kwargs`` that are also columns of the data.
            Tuple keys are expanded into their elements.
        """
        equal_musts: list[str] = []
        for value in self.compare_kwargs.keys():
            if isinstance(value, str):
                value_lst = [value]
            else:
                value_lst = list(value)
            equal_musts.extend(v for v in value_lst if v in self.data.columns)
        return equal_musts

    def _total_score(self) -> None:
        """Compute the total similarity score for each row in `self.compared`."""
        pcmax = self.compared.shape[1]
        # 1.0 when every per-column score is at its maximum of 1.
        self.score = 1 - (abs(self.compared.sum(axis=1) - pcmax) / pcmax)

    def get_duplicates(
        self,
        keep: str | int = "first",
        limit: str | float | None = "default",
        equal_musts: str | list[str] | None = None,
        overwrite: bool = True,
    ) -> pd.DataFrame:
        """
        Identify duplicate matches based on the comparison matrix.

        Parameters
        ----------
        keep : str or int
            Which entry to keep: 'first', 'last', or -1, 0.
        limit : str or float, optional, default: default
            Minimum total similarity score to consider as duplicate.
        equal_musts : str or list[str], optional
            Columns that must exactly match. Defaults to all compare_kwargs
            columns present in the data.
        overwrite : bool, default: True
            Whether to recompute the total similarity score. The score is
            always computed if it does not exist yet.

        Returns
        -------
        pd.DataFrame
            Rows of the comparison matrix that are considered duplicates.

        Raises
        ------
        ValueError
            If `keep` is not one of 'first', 'last', -1 or 0.
        """
        if keep not in ["first", "last", -1, 0]:
            raise ValueError("keep has to be one of 'first', 'last', -1 or 0.")

        # Translate keep into the pair-index level to keep; drop is the other level.
        if keep == "first":
            keep = -1
        elif keep == "last":
            keep = 0

        self.keep = keep
        if keep == 0:
            self.drop = -1
        elif keep == -1:
            self.drop = 0

        # Without this guard, overwrite=False on a fresh instance raised
        # AttributeError because `self.score` did not exist yet.
        if overwrite is True or not hasattr(self, "score"):
            self._total_score()

        self.limit = self._get_limit(limit)
        cond = self.score >= self.limit

        if equal_musts is None:
            equal_musts = self._get_equal_musts()
        if isinstance(equal_musts, str):
            equal_musts = [equal_musts]
        for must in equal_musts:
            cond = cond & (self.compared[must])

        self.matches = self.compared[cond]
        return self.matches

    def flag_duplicates(
        self,
        keep: str | int = "first",
        limit: str | float | None = "default",
        equal_musts: str | list[str] | None = None,
    ) -> pd.DataFrame:
        r"""
        Get result dataset with flagged duplicates.

        Parameters
        ----------
        keep : str or int, default: first
            Which entry should be kept in result dataset.
        limit : str, int or float, optional
            Minimum total score that has to be reached to be declared as a duplicate.
            Defaults to .991.
        equal_musts : str or list, optional
            Hashable of column name(s) that must totally be equal to be declared as a duplicate.
            Default: All column names found in compare_kwargs.

        Returns
        -------
        pd.DataFrame
            Input DataFrame with flagged duplicates, including duplicate_status_ and quality_flag_.

        References
        ----------
        .. _duplicate_status: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/duplicate_status/duplicate_status.html
        .. _quality_flag: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/quality_flag/quality_flag.html
        """

        def _get_similars(drop_dict: dict[str | int, Any], keeps: Any) -> tuple[Any, Any]:
            """
            Map a pair's drop value to its keep value when the drop is itself a keep.

            Parameters
            ----------
            drop_dict : dict
                One row of the pair frame. It is indexed by the enclosing
                `drop` and `keep` column names.
            keeps : Any
                Collection of current keep values.

            Returns
            -------
            tuple of Any and Any
                ``(drop, keep)``, converted to int where possible, if the row's
                drop value occurs in `keeps`; otherwise ``(None, None)``.
            """
            if drop_dict[drop] in keeps:
                drops = drop_dict[drop]
                keeps = drop_dict[keep]
                try:
                    return int(drops), int(keeps)
                except ValueError:
                    return drops, keeps
            return None, None

        def _get_duplicates(x: pd.DataFrame, last: Any) -> pd.Series:
            """
            Extract unique values of one column of a group.

            Parameters
            ----------
            x : pd.DataFrame
                Group of the pair frame.
            last : Any
                Column name whose unique values are collected.

            Returns
            -------
            pd.Series
                Series with a single key "dups" holding the list of unique values.
            """
            b = list(set(x[last].values))
            return pd.Series({"dups": b})

        def _delete_values_equal_keys(dictionary: dict[Any, Any]) -> tuple[dict[Any, Any], list[Any]]:
            """
            Remove entries where keys and values are identical.

            Parameters
            ----------
            dictionary : dict
                Input mapping of keys to values.

            Returns
            -------
            tuple of dict and list of Any
                The filtered dictionary, and the values removed because key == value.
            """
            new_dictionary = {}
            drops = []
            for k, v in dictionary.items():
                if k == v:
                    drops.append(v)
                    continue
                new_dictionary[k] = v
            return new_dictionary, drops

        def replace_keeps_and_drops(df: pd.DataFrame, keep: Any) -> pd.DataFrame:
            """
            Iteratively re-point keep values that are themselves drops.

            Parameters
            ----------
            df : pd.DataFrame
                Pair frame with one drop column and one keep column.
            keep : Any
                Name of the keep column.

            Returns
            -------
            pd.DataFrame
                Pair frame after replacement. The loop stops once no
                replacement target is itself replaced.
            """
            # NOTE(review): `keeps` is captured once, before the loop, and not
            # refreshed after replacements. Preserved as-is.
            keeps = df[keep].values
            while True:
                df = df.sort_index()
                replaces = df.apply(lambda row, keeps=keeps: _get_similars(row, keeps), axis=1)
                replaces = {k: v for k, v in dict(replaces.values).items() if k is not None}
                replaces, drops = _delete_values_equal_keys(replaces)
                keys = replaces.keys()
                values = replaces.values()
                if len(drops) > 0:
                    df = df.drop(drops, axis="index")
                df[keep] = df[keep].replace(replaces)
                if not set(keys).intersection(values):
                    return df

        # Always sets self.matches, self.keep and self.drop. The former
        # hasattr(self, "matches") fallback could never trigger and was removed.
        self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts)

        result = self.data.copy()
        dtypes = result.dtypes
        result["duplicate_status"] = 0

        # One column per pair-index level. Pick the drop and keep columns
        # according to the levels chosen in get_duplicates.
        indexes = self.matches.index
        indexes_df = indexes.to_frame()
        drop = indexes_df.columns[self.drop]
        keep = indexes_df.columns[self.keep]
        indexes_df = indexes_df.drop_duplicates(subset=[drop])
        indexes_df = replace_keeps_and_drops(indexes_df, keep)

        dup_keep = indexes_df.groupby(indexes_df[keep]).apply(
            lambda x: _get_duplicates(x, drop),
            include_groups=False,
        )
        dup_drop = indexes_df.groupby(indexes_df[drop]).apply(
            lambda x: _get_duplicates(x, keep),
            include_groups=False,
        )
        duplicates = pd.concat([dup_keep, dup_drop])

        indexes_good = indexes_df[keep].values.tolist()
        indexes_bad = indexes_df[drop].values.tolist()
        indexes = indexes_good + indexes_bad

        # Kept records get status 1, dropped records get status 3.
        result.loc[indexes_good, "duplicate_status"] = 1
        result.loc[indexes_bad, "duplicate_status"] = 3
        result = add_report_quality(result, indexes_bad=indexes_bad)
        result = add_history(result, indexes)
        result = result.sort_index(ascending=True)
        result = add_duplicates(result, duplicates)
        self.result = result.astype(dtypes)
        self.data = self.data.sort_index(ascending=True)
        return self.result

    def remove_duplicates(
        self,
        keep: str | int = "first",
        limit: str | float | None = "default",
        equal_musts: str | list[str] | None = None,
    ) -> pd.DataFrame:
        """
        Remove duplicate entries from the dataset.

        Parameters
        ----------
        keep : str or int
            Which entry to keep: 'first', 'last', -1 or 0.
        limit : str or float, optional
            Minimum similarity score to declare duplicates.
        equal_musts : str or list[str], optional
            Columns that must exactly match.

        Returns
        -------
        pd.DataFrame
            Dataset without duplicates, sorted by index.
        """
        self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts)
        result = self.data.copy()
        # Labels at the "drop" level of each matched pair are removed.
        drops = self.matches.index.get_level_values(self.drop)
        result = result.drop(drops)
        self.result = result.sort_index(ascending=True)
        self.data = self.data.sort_index(ascending=True)
        return self.result
def set_comparer(compare_dict: dict[Any, Any]) -> Compare:
    """
    Create a Compare object and register one comparison per column.

    Parameters
    ----------
    compare_dict : dict
        Mapping ``{column_name: {"method": <name>, "kwargs": {...}}}``. "method"
        names a method of the Compare object (e.g. "exact", "numeric",
        "date2"). "kwargs" is optional and forwarded to that method.

    Returns
    -------
    Compare
        Compare object with all comparisons added. Its ``conversion``
        attribute maps columns to the dtype (or converter name) their data
        needs before comparison.

    Raises
    ------
    KeyError
        If an entry of `compare_dict` has no "method" key.
    """
    # Data conversion required by each comparison method; other methods need none.
    conversion_targets = {
        "numeric": float,
        "date": "datetime64[ns]",
        "date2": "convert_date_to_float",
    }

    comparer = Compare()
    comparer.conversion = {}

    for column, settings in compare_dict.items():
        try:
            method = settings["method"]
        except KeyError as err:
            raise KeyError(
                "compare_kwargs must be hierarchically ordered: {<column_name>: {'method': <compare_method>}}. 'method' not found"
            ) from err

        method_kwargs = settings.get("kwargs", {})
        add_comparison = getattr(comparer, method)
        add_comparison(column, column, label=column, **method_kwargs)

        target = conversion_targets.get(method)
        if target is not None:
            comparer.conversion[column] = target

    return comparer
def remove_ignores(dic: dict[Any, Any], columns: str | list[str]) -> dict[Any, Any]:
    """
    Drop dictionary entries that refer to ignored columns.

    An entry is dropped when its key or its value is an ignored column. List
    values are pruned of ignored columns, and the entry is dropped if nothing
    remains.

    Parameters
    ----------
    dic : dict
        Dictionary to filter (left unmodified).
    columns : str or list[str]
        Column(s) to ignore.

    Returns
    -------
    dict
        New, filtered dictionary.
    """
    ignored = [columns] if isinstance(columns, str) else columns

    filtered = {}
    for key, value in dic.items():
        if key in ignored or value in ignored:
            continue
        if isinstance(value, list):
            remaining = [item for item in value if item not in ignored]
            if not remaining:
                continue
            value = remaining
        filtered[key] = value
    return filtered
def change_offsets(dic: dict[Any, Any], dic_o: dict[Any, Any]) -> dict[Any, Any]:
    """
    Update the 'offset' value in compare dictionary kwargs.

    Parameters
    ----------
    dic : dict
        Original compare dictionary, ``{column: {"method": ..., "kwargs": {...}}}``.
        It is not modified.
    dic_o : dict
        Dictionary mapping column names to new offsets. Columns that are not
        in `dic` are ignored.

    Returns
    -------
    dict
        Deep copy of `dic` with modified offsets. An entry without "kwargs"
        gets a new kwargs dict. This matches ``set_comparer``, which treats
        missing "kwargs" as empty.
    """
    # Copy so the caller's (possibly user-supplied) dictionary is not mutated.
    updated = deepcopy(dic)
    for key, offset in dic_o.items():
        if key not in updated:
            continue
        updated[key].setdefault("kwargs", {})["offset"] = offset
    return updated
def reindex_nulls(df: pd.DataFrame, null_label: Any) -> pd.DataFrame:
    """
    Reorder rows in ascending order of their number of missing values.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame.
    null_label : Any
        Missing value representative. Cells equal to it count as missing, as
        do NaN-like cells and iterables containing a missing element.

    Returns
    -------
    pd.DataFrame
        DataFrame whose rows are ordered so that rows with fewer missing
        values appear first. Original row order is preserved for rows with
        the same count. Rows are reordered by position, so a DataFrame with
        duplicate index labels keeps exactly its original rows.
    """

    def is_missing(x: Any) -> bool:
        """
        Determine whether a value is considered missing.

        A value is missing if it is NaN (per ``pandas.isna``) or equal to
        ``null_label``. For a list, tuple or numpy array, it is missing if any
        element is missing (checked recursively).

        Parameters
        ----------
        x : Any
            Value to check for missingness.

        Returns
        -------
        bool
            True if the value (or any nested value) is missing, otherwise False.
        """
        if isinstance(x, (list, tuple, np.ndarray)):
            return any(is_missing(x_) for x_ in x)
        if pd.isna(x):
            return True
        if x == null_label:
            return True
        return False

    def count_nulls(row: pd.Series) -> int:
        """
        Count the number of missing values in a pandas Series.

        Parameters
        ----------
        row : pd.Series
            Input row or Series to evaluate.

        Returns
        -------
        int
            Number of missing values in the Series.
        """
        return sum(is_missing(x) for x in row)

    null_counts = df.apply(count_nulls, axis=1)
    if null_counts.empty:
        return df

    # Positional stable ordering. ``df.loc[sorted_labels]`` would repeat rows
    # whenever the index contains duplicate labels.
    order = np.argsort(null_counts.to_numpy(), kind="stable")
    return df.iloc[order]
class Comparer:
    """
    Compute pairwise recordlinkage comparisons for a DataFrame.

    A recordlinkage indexer generates candidate pairs. A Compare object built
    by ``set_comparer`` then scores them. The data can optionally be converted
    with the Compare object's conversion table first.

    Parameters
    ----------
    data : pd.DataFrame
        The dataset to compare.
    method : str
        Name of an indexer class in `recordlinkage.index`, e.g. 'SortedNeighbourhood'.
    method_kwargs : dict
        Keyword arguments used to instantiate the indexer.
    compare_kwargs : dict
        Column/comparison specification passed to ``set_comparer``.
    pairs_df : list[pd.DataFrame], optional
        DataFrames handed to the indexer. Defaults to the (possibly converted)
        `data` alone.
    convert_data : bool, default False
        If True, convert `data` with ``convert_series`` before comparing.

    Attributes
    ----------
    compared : pd.DataFrame
        Comparison result for all candidate pairs.
    data : pd.DataFrame
        The (possibly converted) copy of the data that was compared.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        method: str,
        method_kwargs: dict[Any, Any],
        compare_kwargs: dict[Any, Any],
        pairs_df: list[pd.DataFrame] | None = None,
        convert_data: bool = False,
    ) -> None:
        """
        Build the indexer and comparer, then compute the comparisons.

        Parameters
        ----------
        data : pd.DataFrame
            The dataset to compare.
        method : str
            Name of an indexer class in `recordlinkage.index`.
        method_kwargs : dict
            Keyword arguments used to instantiate the indexer.
        compare_kwargs : dict
            Column/comparison specification passed to ``set_comparer``.
        pairs_df : list[pd.DataFrame], optional
            DataFrames handed to the indexer; defaults to ``[data]``.
        convert_data : bool, default False
            Whether to convert `data` using the comparer's conversion table.
        """
        indexer = getattr(rl.index, method)(**method_kwargs)
        comparer = set_comparer(compare_kwargs)

        prepared = convert_series(data, comparer.conversion) if convert_data is True else data.copy()
        frames = [prepared] if pairs_df is None else pairs_df

        candidate_pairs = indexer.index(*frames)
        self.compared = comparer.compute(candidate_pairs, prepared)
        self.data = prepared
def duplicate_check(
    data: pd.DataFrame,
    method: str = "SortedNeighbourhood",
    method_kwargs: dict[Any, Any] | None = None,
    compare_kwargs: dict[Any, Any] | None = None,
    table_name: str | None = None,
    ignore_columns: str | list[str] | None = None,
    ignore_entries: dict[str, Any] | None = None,
    offsets: dict[str, Any] | None = None,
    reindex_by_null: bool = True,
    null_label: Any = "null",
) -> DupDetect:
    """
    Run a duplicate check on a dataset using recordlinkage.

    Returns a DupDetect object.

    Parameters
    ----------
    data : pandas.DataFrame
        Dataset for duplicate check.
    method : str, default: SortedNeighbourhood
        Duplicate check method for recordlinkage.
    method_kwargs : dict, optional
        Keyword arguments for recordlinkage duplicate check.
        Defaults to _method_kwargs.
    compare_kwargs : dict, optional
        Keyword arguments for recordlinkage.Compare object.
        Defaults to _compare_kwargs.
    table_name : str, optional
        Name of the CDM table to be selected from data.
    ignore_columns : str or list, optional
        Name of data columns to be ignored for duplicate check.
    ignore_entries : dict, optional
        Key: Column name. Value: value (or list of values) to be ignored.
        E.g. ignore_entries={"station_speed": "null"}.
    offsets : dict, optional
        Change offsets for recordlinkage Compare object.
        Key: Column name. Value: new offset.
        E.g. offsets={"latitude": 0.1}.
    reindex_by_null : bool, optional
        If True, rows are reordered in ascending order of their number of
        nulls. Index labels are kept.
    null_label : str, optional
        Null label which is used if `reindex_by_null` is True.

    Returns
    -------
    cdm_reader_mapper.DupDetect
        A DupDetect instance.

    Notes
    -----
    `method_kwargs` and `compare_kwargs` are deep-copied, so dictionaries
    supplied by the caller are never modified.
    """
    if reindex_by_null is True:
        # Only the row order changes; index labels are preserved. This is
        # presumably so that rows with fewer nulls come first in the candidate
        # pairs. TODO confirm.
        data = reindex_nulls(data, null_label=null_label)

    if table_name:
        data = data[table_name]

    # Work on private copies: later steps (e.g. change_offsets) and the
    # returned DupDetect must not alias caller-owned dictionaries.
    method_kwargs = deepcopy(method_kwargs) if method_kwargs else deepcopy(_method_kwargs)
    compare_kwargs = deepcopy(compare_kwargs) if compare_kwargs else deepcopy(_compare_kwargs)

    if ignore_columns:
        method_kwargs = remove_ignores(method_kwargs, ignore_columns)
        compare_kwargs = remove_ignores(compare_kwargs, ignore_columns)

    if offsets:
        compare_kwargs = change_offsets(compare_kwargs, offsets)

    comparer = Comparer(
        data=data,
        method=method,
        method_kwargs=method_kwargs,
        compare_kwargs=compare_kwargs,
        convert_data=True,
    )
    compared = comparer.compared
    data_ = comparer.data

    if ignore_entries is None:
        return DupDetect(data, compared, method, method_kwargs, compare_kwargs)

    compared_parts = [compared]
    for column_, entry_ in ignore_entries.items():
        if not isinstance(entry_, list):
            entry_ = [entry_]

        # Split rows into those holding an ignored entry (d2) and the rest (d1).
        entries = data[column_].isin(entry_)
        d1 = data.mask(entries).dropna(how="all")
        d2 = data.where(entries).dropna(how="all")
        if d1.empty or d2.empty:
            continue

        # Compare d2 against d1 without the ignored column.
        method_kwargs_ = remove_ignores(method_kwargs, column_)
        compare_kwargs_ = remove_ignores(compare_kwargs, column_)
        compared_ = Comparer(
            data=data_,
            method=method,
            method_kwargs=method_kwargs_,
            compare_kwargs=compare_kwargs_,
            pairs_df=[d2, d1],
        ).compared
        # NOTE(review): this forces every ignore_entries column to 1 for these
        # pairs, not only `column_`. Preserved as-is; confirm intent.
        compared_[list(ignore_entries.keys())] = 1
        compared_parts.append(compared_)

    compared = pd.concat(compared_parts)
    return DupDetect(data, compared, method, method_kwargs, compare_kwargs)