Source code for balance.utils.data_transformation

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from __future__ import annotations

import copy
import logging
import warnings
from itertools import combinations
from typing import Any, List, Tuple

import numpy as np
import pandas as pd
from balance.utils.input_validation import choose_variables
from balance.utils.pandas_utils import _safe_fillna_and_infer

logger: logging.Logger = logging.getLogger(__package__)


def add_na_indicator(
    df: pd.DataFrame, replace_val_obj: str = "_NA", replace_val_num: int = 0
) -> pd.DataFrame:
    """If a column in the DataFrame contains NAs, replace these with 0 for
    numerical columns or "_NA" for non-numerical columns, and add another
    column with an indicator variable for which rows were NA.

    Args:
        df (pd.DataFrame): The input DataFrame.
        replace_val_obj (str, optional): The value to put instead of nulls for object columns. Defaults to "_NA".
        replace_val_num (int, optional): The value to put instead of nulls for numeric columns. Defaults to 0.

    Raises:
        ValueError: Can't add NA indicator to a DataFrame which contains columns which start with '_is_na_'.
        ValueError: Can't add NA indicator to columns containing NAs and the value '{replace_val_obj}'.

    Returns:
        pd.DataFrame: New DataFrame with the additional indicator columns.
    """
    already_na_cols = [c for c in df.columns if c.startswith("_is_na_")]
    if len(already_na_cols) > 0:
        raise ValueError(
            "Can't add NA indicator to DataFrame which contains "
            f"columns which start with '_is_na_': {already_na_cols}"
        )

    na = df.isnull()
    na_cols = list(df.columns[na.any(axis="index")])
    na_indicators = na.loc[:, na_cols]
    na_indicators.columns = ("_is_na_" + c for c in na_indicators.columns)

    categorical_cols = list(df.columns[df.dtypes == "category"])
    non_numeric_cols = list(
        df.columns[(df.dtypes == "object") | (df.dtypes == "string")]
    )

    for c in list(na_cols):
        if replace_val_obj in set(df[c]):
            raise ValueError(
                f"Can't add NA indicator to columns containing NAs and the value '{replace_val_obj}', "
                f"i.e. column: {c}"
            )
        if c in categorical_cols:
            filled_col = (
                df[c].cat.add_categories(replace_val_obj).fillna(replace_val_obj)
            )
            df[c] = filled_col.infer_objects(copy=False)
        elif c in non_numeric_cols:
            df[c] = _safe_fillna_and_infer(df[c], replace_val_obj)
        else:
            df[c] = _safe_fillna_and_infer(df[c], replace_val_num)

    return pd.concat((df, na_indicators), axis=1)

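# Minimal usage sketch for add_na_indicator (illustrative only; the toy data is hypothetical):
def _example_add_na_indicator() -> pd.DataFrame:
    # A frame with missing values in a numeric column and an object column.
    toy = pd.DataFrame({"age": [25.0, np.nan, 31.0], "city": ["NYC", None, "LA"]})
    # NaNs become 0 / "_NA", and boolean "_is_na_age" / "_is_na_city" columns are appended.
    return add_na_indicator(toy)

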
def drop_na_rows(
    sample_df: pd.DataFrame, sample_weights: pd.Series, name: str = "sample object"
) -> Tuple[pd.DataFrame, pd.Series]:
    """Drop rows with missing values in sample_df and their corresponding weights.

    Args:
        sample_df (pd.DataFrame): a dataframe representing the sample or target
        sample_weights (pd.Series): design weights for sample or target
        name (str, optional): name of the object checked (used in warning messages). Defaults to "sample object".

    Raises:
        ValueError: Dropping rows led to empty {name}. Maybe try na_action='add_indicator'?

    Returns:
        Tuple[pd.DataFrame, pd.Series]: sample_df, sample_weights without NA rows
    """
    sample_n = sample_df.shape[0]
    sample_df = sample_df.dropna()
    sample_weights = sample_weights[sample_df.index]
    sample_n_after = sample_df.shape[0]

    _sample_rate = f"{sample_n - sample_n_after}/{sample_n}"
    logger.warning(f"Dropped {_sample_rate} rows of {name}")

    if sample_n_after == 0:
        raise ValueError(
            f"Dropping rows led to empty {name}. Maybe try na_action='add_indicator'?"
        )

    return (sample_df, sample_weights)

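# Minimal usage sketch for drop_na_rows (illustrative only; the toy data is hypothetical):
def _example_drop_na_rows() -> Tuple[pd.DataFrame, pd.Series]:
    toy = pd.DataFrame({"age": [25.0, np.nan, 31.0]})
    weights = pd.Series([1.0, 2.0, 3.0])
    # The middle row and its weight are dropped; a warning reports "Dropped 1/3 rows of toy sample".
    return drop_na_rows(toy, weights, name="toy sample")

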
def qcut(
    s: np.ndarray | pd.Series,
    q: int | float,
    duplicates: str = "drop",
    **kwargs: Any,
) -> np.ndarray | pd.Series:
    """Discretize a variable into equal-sized buckets based on quantiles.

    This is a wrapper around the pandas qcut function.

    Args:
        s (Union[np.ndarray, pd.Series]): 1d ndarray or Series.
        q (Union[int, float]): Number of quantiles.
        duplicates (str, optional): whether to drop non-unique bin edges or raise an error ("raise" or "drop"). Defaults to "drop".

    Returns:
        Series of type object with intervals.
    """
    if s.shape[0] < q:  # pyre-ignore[58]: Comparison is valid in practice
        logger.warning("Not quantizing, too few values")
        return s
    else:
        return pd.qcut(s, q, duplicates=duplicates, **kwargs).astype("O")

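# Minimal usage sketch for qcut (illustrative only):
def _example_qcut() -> pd.Series:
    # Ten values are cut into 4 quantile buckets and returned as an object Series of intervals.
    return qcut(pd.Series(range(1, 11)), 4)

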
def quantize(
    df: pd.DataFrame | pd.Series,
    q: int = 10,
    variables: List[str] | None = None,
) -> pd.DataFrame | np.ndarray | pd.Series:
    """Cut numeric variables of a DataFrame into quantile buckets.

    Args:
        df (Union[pd.DataFrame, pd.Series]): a DataFrame to transform
        q (int, optional): Number of buckets to create for each variable. Defaults to 10.
        variables (optional): variables to transform. If None, all numeric variables are transformed. Defaults to None.

    Returns:
        pd.DataFrame: DataFrame after quantization. numpy.nan values are kept as is.
    """
    if not (isinstance(df, pd.Series) or isinstance(df, pd.DataFrame)):
        # Necessary because pandas calls the function on the first item on its own
        # https://stackoverflow.com/questions/21635915/
        df = pd.Series(df)

    if isinstance(df, pd.Series):
        if not pd.api.types.is_numeric_dtype(df.dtype):
            raise TypeError("series must be numeric")
        return qcut(df, q, duplicates="drop")

    if not isinstance(df, pd.DataFrame):
        raise TypeError("df must be a pandas DataFrame")

    variables = choose_variables(df, variables=variables)
    numeric_columns = list(df.select_dtypes(include=[np.number]).columns)
    variables = [v for v in variables if v in numeric_columns]

    original_columns = list(df.columns)
    transformed_data = df.loc[:, variables].transform(
        lambda c: qcut(c, q, duplicates="drop")
    )
    untransformed_columns = df.columns.difference(variables)
    transformed_data = pd.concat(
        (df.loc[:, untransformed_columns], transformed_data), axis=1
    )
    return transformed_data.loc[:, original_columns]

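# Minimal usage sketch for quantize (illustrative only; the toy data is hypothetical):
def _example_quantize() -> pd.DataFrame:
    toy = pd.DataFrame({"score": np.arange(100), "group": ["a", "b"] * 50})
    # Only the numeric "score" column is bucketed into 5 quantiles; "group" is left untouched.
    return quantize(toy, q=5)

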
def row_pairwise_diffs(df: pd.DataFrame) -> pd.DataFrame:
    """Produce the differences between every pair of rows of df.

    Args:
        df (pd.DataFrame): DataFrame

    Returns:
        pd.DataFrame: DataFrame with differences between all combinations of rows
    """
    c = combinations(sorted(df.index), 2)
    diffs = []
    for j, i in c:
        d = df.loc[i] - df.loc[j]
        d = d.to_frame().transpose().assign(source=f"{i} - {j}").set_index("source")
        diffs.append(d)
    return pd.concat([df] + diffs)

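# Minimal usage sketch for row_pairwise_diffs (illustrative only):
def _example_row_pairwise_diffs() -> pd.DataFrame:
    toy = pd.DataFrame({"x": [1, 4, 9]}, index=[0, 1, 2])
    # Appends rows labeled "1 - 0", "2 - 0", "2 - 1" holding the pairwise differences.
    return row_pairwise_diffs(toy)

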
def auto_spread(
    data: pd.DataFrame, features: List[str] | None = None, id_: str = "id"
) -> pd.DataFrame:
    """Automatically transform a 'long' DataFrame into a 'wide' DataFrame
    by guessing which column should be used as a key, treating all other
    columns as values. At the moment, this will only find a single key column.

    Args:
        data (pd.DataFrame): a 'long' DataFrame to spread.
        features (Optional[list], optional): columns to consider; if None, all columns except id_ are used. Defaults to None.
        id_ (str, optional): name of the id column. Defaults to "id".

    Returns:
        pd.DataFrame
    """
    if features is None:
        features = [c for c in data.columns.values if c != id_]

    is_unique = {}
    for c in features:
        # Use include_groups=False to avoid FutureWarning about operating on grouping columns.
        # Fall back to the old behavior if the include_groups parameter is not supported.
        try:
            unique_userids = data.groupby(c, include_groups=False)[id_].apply(
                lambda x: len(set(x)) == len(x)
            )
        except TypeError:
            # Fallback for older pandas versions that don't support the include_groups parameter
            unique_userids = data.groupby(c)[id_].apply(lambda x: len(set(x)) == len(x))
        is_unique[c] = all(unique_userids.values)

    unique_groupings = [k for k, v in is_unique.items() if v]
    if len(unique_groupings) < 1:
        logger.warning(f"no unique groupings {is_unique}")
        return data
    elif len(unique_groupings) > 1:
        logger.warning(
            f"{len(unique_groupings)} possible groupings: {unique_groupings}"
        )

    # Always chooses the first unique grouping
    unique_grouping = unique_groupings[0]
    logger.warning(f"Grouping by {unique_grouping}")

    data = data.loc[:, features + [id_]].pivot(index=id_, columns=unique_grouping)
    data.columns = [
        "_".join(map(str, ((unique_grouping,) + c[-1:] + c[:-1])))
        for c in data.columns.values
    ]
    data = data.reset_index()

    return data

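# Minimal usage sketch for auto_spread (illustrative only; the toy data is hypothetical):
def _example_auto_spread() -> pd.DataFrame:
    long_df = pd.DataFrame(
        {
            "id": [1, 1, 2, 2],
            "key": ["a", "b", "a", "b"],
            "value": [10, 10, 30, 40],
        }
    )
    # "key" is detected as the key column; the result has one row per id
    # with columns such as "key_a_value" and "key_b_value".
    return auto_spread(long_df)

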
def auto_aggregate(
    data: pd.DataFrame,
    features: None = None,
    _id: str = "id",
    # NOTE: we use str as default since using a lambda function directly would make this argument mutable -
    # so if one function call would change it, another function call would get the revised aggfunc argument.
    # Thus, using str is important so as to keep our function idempotent.
    aggfunc: str | Any = "sum",
) -> pd.DataFrame:
    # The default aggregation function is a wrapper around sum(x), because as of
    # Pandas 0.22.0, Series.sum of an all-na Series is 0, not nan
    if features is not None:
        warnings.warn(
            "features argument is unused, it will be removed in the future",
            DeprecationWarning,
            stacklevel=2,
        )

    if isinstance(aggfunc, str):
        if aggfunc == "sum":

            def _f(x: Any) -> int:
                return sum(x)

            aggfunc = _f
        else:
            raise ValueError(
                f"unknown aggregate function name {aggfunc}, accepted values are ('sum',)."
            )

    try:
        data_without_id = data.drop(columns=[_id])
    except KeyError:
        raise ValueError(f"data must have a column named {_id}")

    all_columns = data_without_id.columns.to_list()
    numeric_columns = data_without_id.select_dtypes(
        include=[np.number]
    ).columns.to_list()

    if set(all_columns) != set(numeric_columns):
        raise ValueError(
            "Not all covariates are numeric. The function will not aggregate automatically."
        )

    return pd.pivot_table(data, index=_id, aggfunc=aggfunc).reset_index()

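# Minimal usage sketch for auto_aggregate (illustrative only; the toy data is hypothetical):
def _example_auto_aggregate() -> pd.DataFrame:
    toy = pd.DataFrame({"id": [1, 1, 2], "spend": [10.0, 5.0, 7.0]})
    # Rows sharing an id are summed, giving one row per id: spend of 15.0 for id 1 and 7.0 for id 2.
    return auto_aggregate(toy)

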
def fct_lump(s: pd.Series, prop: float = 0.05) -> pd.Series:
    """Lumps infrequent levels into '_lumped_other'.

    Note that all values with proportion less than prop output the same value '_lumped_other'.

    Args:
        s (pd.Series): pd.Series to lump, with dtype of integer, numeric, object, or category
            (category will be converted to object)
        prop (float, optional): the proportion of infrequent levels to lump. Defaults to 0.05.

    Returns:
        pd.Series: pd.Series (with category dtype converted to object, if applicable)
    """
    # Handle value_counts with object-dtype to maintain consistent behavior
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            message="The behavior of value_counts with object-dtype is deprecated.*",
            category=FutureWarning,
        )
        props = s.value_counts() / s.shape[0]
        # Ensure proper dtype inference on the index
        props.index = props.index.infer_objects(copy=False)

    small_categories = props[props < prop].index.tolist()
    remainder_category_name = "_lumped_other"
    while remainder_category_name in props.index:
        remainder_category_name = remainder_category_name * 2

    # Convert to object dtype
    s = s.astype("object")
    # Replace small categories with the remainder category name
    s.loc[s.apply(lambda x: x in small_categories)] = remainder_category_name

    return s

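# Minimal usage sketch for fct_lump (illustrative only):
def _example_fct_lump() -> pd.Series:
    colors = pd.Series(["blue"] * 50 + ["red"] * 45 + ["green"] * 3 + ["pink"] * 2)
    # "green" (3%) and "pink" (2%) each fall below prop=0.05 and are lumped into "_lumped_other".
    return fct_lump(colors, prop=0.05)

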
def fct_lump_by(s: pd.Series, by: pd.Series, prop: float = 0.05) -> pd.Series:
    """Lumps infrequent levels into '_lumped_other', but only does so per value of the grouping variable `by`.

    Useful, for example, for keeping the most important interactions in a model.

    Args:
        s (pd.Series): pd.Series to lump
        by (pd.Series): pd.Series according to which to group the data
        prop (float, optional): the proportion of infrequent levels to lump. Defaults to 0.05.

    Returns:
        pd.Series: pd.Series; we keep the index of s as the index of the result.
    """
    res = copy.deepcopy(s)
    pd.options.mode.copy_on_write = True
    # pandas groupby doesn't preserve order
    for subgroup in pd.unique(by):
        mask = by == subgroup
        grouped_res = fct_lump(res.loc[mask], prop=prop)
        # Ensure dtype compatibility before assignment
        res = res.astype("object")
        res.loc[mask] = grouped_res
    return res

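# Minimal usage sketch for fct_lump_by (illustrative only; the toy data is hypothetical):
def _example_fct_lump_by() -> pd.Series:
    values = pd.Series(["a"] * 98 + ["b"] * 2 + ["b"] * 10)
    groups = pd.Series(["g1"] * 100 + ["g2"] * 10)
    # Within g1, "b" is only 2% of rows and gets lumped; within g2, "b" is 100% and is kept.
    return fct_lump_by(values, groups, prop=0.05)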