# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
from __future__ import annotations
import collections
import logging
import re
from typing import List, Literal
import numpy as np
import numpy.typing as npt
import pandas as pd
from balance.stats_and_plots.weighted_stats import (
descriptive_stats,
weighted_mean,
weighted_var,
)
from balance.stats_and_plots.weights_stats import _check_weights_are_valid
from balance.util import _safe_groupby_apply, _safe_replace_and_infer
from pandas import CategoricalDtype
from scipy.integrate import quad
from scipy.stats import gaussian_kde
logger: logging.Logger = logging.getLogger(__package__)
# TODO: add?
# from scipy.stats import wasserstein_distance
##########################################
# Weighted comparisons - functions to compare one or two data sources with one or two sources of weights
##########################################
# TODO: fix the r_indicator function. The current implementation is broken since it
# seems to wrongly estimate N.
# This seems to attempt to reproduce equation 2.2.2 on page 5 of
# "Indicators for the representativeness of survey response"
# by Jelke Bethlehem, Fannie Cobben, and Barry Schouten
# See pdf: https://www150.statcan.gc.ca/n1/en/pub/11-522-x/2008000/article/10976-eng.pdf?st=Zi4d4zld
# From: https://www150.statcan.gc.ca/n1/pub/11-522-x/2008000/article/10976-eng.pdf
# def r_indicator(sample_p: np.float64, target_p: np.float64) -> np.float64:
# p = np.concatenate((sample_p, target_p))
# N = len(sample_p) + len(target_p)
# return 1 - 2 * np.sqrt(1 / (N - 1) * np.sum((p - np.mean(p)) ** 2))
def _weights_per_covars_names(covar_names: List[str]) -> pd.DataFrame:
# TODO (p2): consider whether to give weights that are proportional to the proportion of this covar in the population.
# E.g.: if merging various platforms, maybe if something like windows has very few users, its impact on the ASMD
# should be smaller (similar to how kld works).
# The current function structure won't support it, and this would require some extra input (i.e.: a baseline target population proportion)
"""
Figure out how much weight to give to each column name for ASMD averaging.
This is meant for post-processing a df produced by model_matrix that includes
one-hot encoded categorical variables. This function helps resolve the weight to assign
to each column after the columns are broken down by the one-hot encoding used in model_matrix.
Each of these will count for 1/num_levels.
It's OK to assume that variables with '[]' in the name are one-hot
categoricals because model_matrix enforces it.
Args:
covar_names (List): A list with the names of the covariates.
Returns:
pd.DataFrame: with two columns, one for weights and another for main_covar_names,
with rows for each of the columns from 'covar_names'
Examples:
::
asmd_df = pd.DataFrame(
{
'age': 0.5,
'education[T.high_school]': 1,
'education[T.bachelor]': 1,
'education[T.masters]': 1,
'education[T.phd]': 1,
}, index = ('self', ))
input = asmd_df.columns.values.tolist()
# input
# ['age',
#  'education[T.high_school]',
#  'education[T.bachelor]',
#  'education[T.masters]',
#  'education[T.phd]']
_weights_per_covars_names(input).to_dict()
# Output:
# {'weight': {'age': 1.0,
# 'education[T.high_school]': 0.25,
# 'education[T.bachelor]': 0.25,
# 'education[T.masters]': 0.25,
# 'education[T.phd]': 0.25},
# 'main_covar_names': {'age': 'age',
# 'education[T.high_school]': 'education',
# 'education[T.bachelor]': 'education',
# 'education[T.masters]': 'education',
# 'education[T.phd]': 'education'}}
"""
columns_to_original_variable = {v: re.sub(r"\[.*\]$", "", v) for v in covar_names}
counts = collections.Counter(columns_to_original_variable.values())
weights = pd.DataFrame(
{k: 1 / counts[v] for k, v in columns_to_original_variable.items()},
index=["weight"],
)
_check_weights_are_valid(weights) # verify nothing odd has occurred.
main_covar_names = pd.DataFrame.from_dict(
columns_to_original_variable,
orient="index",
columns=[
"main_covar_names",
],
)
return pd.concat([weights.transpose(), main_covar_names], axis=1)
def _kl_divergence_discrete(p: np.ndarray, q: np.ndarray, eps: float = 1e-12) -> float:
"""
Compute the KL divergence between two discrete probability mass functions.
Args:
p: 1D array-like, PMF of the sample distribution (must sum to 1).
q: 1D array-like, PMF of the target distribution (must sum to 1).
eps: Small constant to avoid ``log(0)`` or division by zero.
Returns:
The KL divergence ``sum_i p[i] * log(p[i] / q[i])``.
Raises:
ValueError: If inputs are not 1D arrays, are empty, have different
lengths, contain invalid values, or sum to zero.
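Examples:
::
# A minimal illustrative sketch (hand-computed from the definition above;
# not an output copied from the library's docs):
import numpy as np
from balance.stats_and_plots.weighted_comparisons_stats import _kl_divergence_discrete
p = np.array([0.5, 0.25, 0.25])
q = np.array([0.25, 0.5, 0.25])
_kl_divergence_discrete(p, q)
# 0.5*log(2) + 0.25*log(1/2) + 0.25*log(1) = 0.25*log(2) ≈ 0.1733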
"""
p = np.asarray(p, dtype=np.float64)
q = np.asarray(q, dtype=np.float64)
if p.ndim != 1 or q.ndim != 1:
raise ValueError("Both p and q must be 1D arrays.")
if p.size == 0 or q.size == 0:
raise ValueError("Input PMF arrays must not be empty.")
if p.size != q.size:
raise ValueError("Input PMF arrays must have the same length.")
if np.any(p < 0) or np.any(q < 0):
raise ValueError("PMF arrays must not contain negative values.")
if not np.isfinite(p).all() or not np.isfinite(q).all():
raise ValueError("PMF arrays must not contain NaN or infinite values.")
p_sum = np.sum(p)
q_sum = np.sum(q)
if p_sum == 0 or q_sum == 0:
raise ValueError("PMF arrays must not sum to zero.")
p = p / p_sum
q = q / q_sum
p = np.clip(p, eps, 1)
q = np.clip(q, eps, 1)
kl = np.sum(p * np.log(p / q))
return float(kl)
def _kl_divergence_continuous_quad(
p_samples: np.ndarray,
q_samples: np.ndarray,
p_weights: np.ndarray | None = None,
q_weights: np.ndarray | None = None,
eps: float = 1e-12,
) -> float:
"""
Estimate the KL divergence between two continuous distributions using KDEs
integrated with adaptive quadrature.
Args:
p_samples: 1D array-like samples from the sample distribution.
q_samples: 1D array-like samples from the target distribution.
p_weights: Optional weights for ``p_samples``.
q_weights: Optional weights for ``q_samples``.
eps: Small constant to avoid evaluating ``log(0)`` or dividing by zero.
Returns:
Estimated KL divergence ``∫ p(x) log(p(x) / q(x)) dx``.
Raises:
ValueError: If inputs are not valid 1D arrays, empty, or have
insufficient samples.
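Examples:
::
# An illustrative sketch (assumed inputs; the KDE/quadrature estimate is
# approximate and will vary with the random draw):
import numpy as np
from balance.stats_and_plots.weighted_comparisons_stats import _kl_divergence_continuous_quad
rng = np.random.default_rng(0)
p_samples = rng.normal(0, 1, size=500)
q_samples = rng.normal(1, 1, size=500)
_kl_divergence_continuous_quad(p_samples, q_samples)
# roughly 0.5 (the analytic KL divergence between N(0, 1) and N(1, 1))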
"""
def _validate_samples(
samples: np.ndarray, weights: np.ndarray | None, label: str
) -> tuple[np.ndarray, np.ndarray | None]:
samples = np.asarray(samples)
if samples.ndim != 1:
raise ValueError(f"{label} must be a 1D array.")
if samples.size == 0:
raise ValueError(f"{label} must not be empty.")
weights_arr = None if weights is None else np.asarray(weights, dtype=float)
if weights_arr is not None and weights_arr.shape[0] != samples.shape[0]:
raise ValueError(f"{label} weights must match the number of samples.")
if weights_arr is not None and weights_arr.sum() <= 0:
raise ValueError(f"{label} weights must sum to a positive value.")
if samples.size < 2:
raise ValueError(f"{label} must contain at least two samples for KDE.")
if weights_arr is not None and np.any(weights_arr < 0):
raise ValueError(f"{label} weights must be non-negative.")
return samples, weights_arr
p_samples, p_weights = _validate_samples(p_samples, p_weights, "p_samples")
q_samples, q_weights = _validate_samples(q_samples, q_weights, "q_samples")
kde_p: gaussian_kde = gaussian_kde(p_samples, weights=p_weights)
kde_q: gaussian_kde = gaussian_kde(q_samples, weights=q_weights)
min_support = min(p_samples.min(), q_samples.min())
max_support = max(p_samples.max(), q_samples.max())
def integrand(x: float) -> float:
p_x = float(np.clip(kde_p(x), eps, None)[0])
q_x = float(np.clip(kde_q(x), eps, None)[0])
return float(p_x * np.log(p_x / q_x))
kl, _ = quad(integrand, float(min_support), float(max_support), limit=100)
return float(max(kl, 0.0))
# TODO: add memoization
def asmd(
sample_df: pd.DataFrame,
target_df: pd.DataFrame,
sample_weights: list[float] | pd.Series | npt.NDArray | None = None,
target_weights: list[float] | pd.Series | npt.NDArray | None = None,
std_type: Literal["target", "sample", "pooled"] = "target",
aggregate_by_main_covar: bool = False,
) -> pd.Series:
"""
Calculate the Absolute Standardized Mean Deviation (ASMD) between the columns of two DataFrames (or BalanceDFs).
It uses weighted average and std for the calculations.
This is the same as taking the absolute value of Cohen's d statistic,
with a specific choice of the standard deviation (std).
https://en.wikipedia.org/wiki/Effect_size#Cohen's_d
As opposed to Cohen's d, the current asmd implementation has several options for how the std is calculated,
see the arguments section for details.
Unlike the R package {cobalt}, in the current implementation of asmd:
- the absolute value is taken
- un-represented levels of categorical variables are treated as missing,
not as 0.
- differences for categorical variables are also weighted by default
The function omits columns whose names start with "_is_na_".
If the column names of sample_df and target_df differ, asmd will only be calculated for
the overlapping columns. The rest will be np.nan.
The mean(asmd) will be calculated while treating the nan values as 0s.
Args:
sample_df (pd.DataFrame): source group of the asmd comparison
target_df (pd.DataFrame): target group of the asmd comparison.
The column names should be the same as the ones from sample_df.
sample_weights (Union[ List, pd.Series, np.ndarray, ], optional): weights to use.
The default is None.
If no weights are passed (None), it will use an array of 1s.
target_weights (Union[ List, pd.Series, np.ndarray, ], optional): weights to use.
The default is None.
If no weights are passed (None), it will use an array of 1s.
std_type (Literal["target", "sample", "pooled"], optional):
How the standard deviation should be calculated.
The options are: "target", "sample" and "pooled". Defaults to "target".
"target" means we use the std from the target population.
"sample" means we use the std from the sample population.
"pooled" means we use the simple arithmetic average of
the variance from the sample and target population.
We then take the square root of that value to get the pooled std.
Notice that this is done while giving the same weight to both sources.
I.e.: there is NO over/under weighting sample or target based on their
respective sample sizes (in contrast to how pooled sd is calculated in
Cohen's d statistic).
aggregate_by_main_covar (bool):
Whether to use :func:`_aggregate_statistic_by_main_covar` to aggregate (average) the asmd based on the main covariate name.
Default is False.
Returns:
pd.Series: a Series indexed on the names of the columns in the input DataFrames.
The values (of type np.float64) are the results of the ASMD calculation.
The last element is 'mean(asmd)', which is the weighted average of the calculated ASMD values
(levels of one-hot encoded categorical covariates each get a weight of 1/num_levels).
Examples:
::
import numpy as np
import pandas as pd
from balance.stats_and_plots import weighted_comparisons_stats
a1 = pd.Series((1, 2))
b1 = pd.Series((-1, 1))
a2 = pd.Series((3, 4))
b2 = pd.Series((-2, 2))
w1 = pd.Series((1, 1))
w2 = w1
r = weighted_comparisons_stats.asmd(
pd.DataFrame({"a": a1, "b": b1}),
pd.DataFrame({"a": a2, "b": b2}),
w1,
w2,
)
exp_a = np.abs(a1.mean() - a2.mean()) / a2.std()
exp_b = np.abs(b1.mean() - b2.mean()) / b2.std()
print(r.to_dict())
print(exp_a)
print(exp_b)
# output:
# {'a': 2.82842712474619, 'b': 0.0, 'mean(asmd)': 1.414213562373095}
# 2.82842712474619
# 0.0
a1 = pd.Series((1, 2))
b1_A = pd.Series((1, 3))
b1_B = pd.Series((-1, -3))
a2 = pd.Series((3, 4))
b2_A = pd.Series((2, 3))
b2_B = pd.Series((-2, -3))
w1 = pd.Series((1, 1))
w2 = w1
r = weighted_comparisons_stats.asmd(
pd.DataFrame({"a": a1, "b[A]": b1_A, "b[B]": b1_B}),
pd.DataFrame({"a": a2, "b[A]": b2_A, "b[B]": b2_B}),
w1,
w2,
).to_dict()
print(r)
# {'a': 2.82842712474619, 'b[A]': 0.7071067811865475, 'b[B]': 0.7071067811865475, 'mean(asmd)': 1.7677669529663689}
# Check that using aggregate_by_main_covar works
r = weighted_comparisons_stats.asmd(
pd.DataFrame({"a": a1, "b[A]": b1_A, "b[B]": b1_B}),
pd.DataFrame({"a": a2, "b[A]": b2_A, "b[B]": b2_B}),
w1,
w2,
"target",
True,
).to_dict()
print(r)
# {'a': 2.82842712474619, 'b': 0.7071067811865475, 'mean(asmd)': 1.7677669529663689}
"""
if not isinstance(sample_df, pd.DataFrame):
raise ValueError(f"sample_df must be pd.DataFrame, is {type(sample_df)}")
if not isinstance(target_df, pd.DataFrame):
raise ValueError(f"target_df must be pd.DataFrame, is {type(target_df)}")
possible_std_type = ("sample", "target", "pooled")
if std_type not in possible_std_type:
raise ValueError(f"std_type must be in {possible_std_type}, is {std_type}")
if sample_df.columns.values.tolist() != target_df.columns.values.tolist():
logger.warning(
f"""
sample_df and target_df must have the same column names.
sample_df column names: {sample_df.columns.values.tolist()}
target_df column names: {target_df.columns.values.tolist()}"""
)
sample_mean = descriptive_stats(sample_df, sample_weights, "mean")
target_mean = descriptive_stats(target_df, target_weights, "mean")
if std_type == "sample":
std = descriptive_stats(sample_df, sample_weights, "std")
elif std_type == "target":
std = descriptive_stats(target_df, target_weights, "std")
elif std_type == "pooled":
target_std = descriptive_stats(target_df, target_weights, "std")
sample_std = descriptive_stats(sample_df, sample_weights, "std")
std = np.sqrt(((sample_std**2) + (target_std**2)) / 2)
out = abs(sample_mean - target_mean) / std # pyre-ignore[61]: std is always defined
# Remove na indicator columns; it's OK to assume that these columns are
# indicators because add_na_indicator enforces it
out = out.loc[:, (c for c in out.columns.values if not c.startswith("_is_na_"))]
out = _safe_replace_and_infer(out)
# TODO (p2): verify that df column names are unique (otherwise throw an exception).
# This should probably be done upstream, during the Sample creation process.
weights = _weights_per_covars_names(out.columns.values.tolist())[
["weight"]
].transpose()
out = pd.concat((out, weights))
mean = weighted_mean(out.iloc[0], out.loc["weight"])
out["mean(asmd)"] = mean
out = out.iloc[0]
out.name = None
if aggregate_by_main_covar:
out = _aggregate_statistic_by_main_covar(out)
return out
def kld(
sample_df: pd.DataFrame,
target_df: pd.DataFrame,
sample_weights: list[float] | pd.Series | npt.NDArray | None = None,
target_weights: list[float] | pd.Series | npt.NDArray | None = None,
aggregate_by_main_covar: bool = False,
) -> pd.Series:
"""
Calculate the Kullback-Leibler divergence (KLD) between the columns of two DataFrames.
This function supports both discrete (categorical, binary, boolean) and continuous (numeric) data.
Numeric columns are compared using weighted kernel density estimates integrated
via adaptive quadrature. Categorical (including binary/one-hot encoded)
columns are treated as general discrete distributions based on their weighted
empirical probability mass functions. Each column's contribution is aggregated
into ``mean(kld)`` using the same weighting scheme as ASMD.
The function omits columns that start with the name "_is_na_".
If the column names of sample_df and target_df differ, the two DataFrames are aligned
using an outer join (missing columns are filled with 0s) and KLD is calculated on the aligned columns.
The mean(kld) will be calculated using weighted averaging based on the covariate structure.
Args:
sample_df (pd.DataFrame): source group of the KLD comparison.
target_df (pd.DataFrame): target group of the KLD comparison.
The column names should be the same as the ones from sample_df.
sample_weights (Union[List, pd.Series, np.ndarray], optional): weights to use.
The default is None. If no weights are passed (None), it will use an array
of 1s.
target_weights (Union[List, pd.Series, np.ndarray], optional): weights to use.
The default is None. If no weights are passed (None), it will use an array
of 1s.
aggregate_by_main_covar (bool):
Whether to use :func:`_aggregate_statistic_by_main_covar` to aggregate (average) the KLD based on the main covariate name.
Default is False.
Returns:
pd.Series: a Series indexed on the names of the columns in the input DataFrames.
The values (of type np.float64) are the results of the KLD calculation.
The last element is 'mean(kld)', which is the weighted average of the calculated KLD values.
Examples:
::
import pandas as pd
from balance.stats_and_plots import weighted_comparisons_stats
# Example with one-hot encoded categorical data (e.g., as produced by
# model_matrix) showing clear distributional differences
sample_df = pd.DataFrame({
'category[A]': [1, 1, 0, 0],
'category[B]': [0, 0, 1, 0],
'category[C]': [0, 0, 0, 1],
})
target_df = pd.DataFrame({
'category[A]': [1, 0, 0, 0],
'category[B]': [0, 1, 1, 0],
'category[C]': [0, 0, 0, 1],
})
# Calculate KLD between the two categorical distributions
# Sample has: 50% A, 25% B, 25% C
# Target has: 25% A, 50% B, 25% C
kld_result = weighted_comparisons_stats.kld(sample_df, target_df)
print(kld_result.round(6))
# category[A]    0.143841
# category[B]    0.130812
# category[C]    0.000000
# mean(kld)      0.091551
# dtype: float64
# Example with aggregation by main covariate name
# This aggregates all category levels into a single KLD value
kld_aggregated = weighted_comparisons_stats.kld(
sample_df,
target_df,
aggregate_by_main_covar=True
)
print(kld_aggregated.round(6))
# category 0.091551
# mean(kld) 0.091551
# dtype: float64
"""
if not isinstance(sample_df, pd.DataFrame):
raise ValueError(f"sample_df must be pd.DataFrame, is {type(sample_df)}")
if not isinstance(target_df, pd.DataFrame):
raise ValueError(f"target_df must be pd.DataFrame, is {type(target_df)}")
if sample_df.columns.values.tolist() != target_df.columns.values.tolist():
logger.warning(
f"""
sample_df and target_df must have the same column names.
sample_df column names: {sample_df.columns.values.tolist()}
target_df column names: {target_df.columns.values.tolist()}"""
)
sample_df, target_df = sample_df.align(
target_df, join="outer", axis=1, fill_value=0
)
sample_weights_arr = (
np.asarray(sample_weights, dtype=float)
if sample_weights is not None
else np.ones(sample_df.shape[0], dtype=float)
)
target_weights_arr = (
np.asarray(target_weights, dtype=float)
if target_weights is not None
else np.ones(target_df.shape[0], dtype=float)
)
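# Nested helpers: classify columns as discrete vs. continuous, drop missing
# values together with their weights, and build weighted empirical PMFs.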
def _is_binary(series: pd.Series) -> bool:
uniques = pd.unique(series.dropna())
return len(uniques) <= 2 and set(uniques).issubset({0, 1})
def _extract_series_and_weights(
series: pd.Series, weights: np.ndarray
) -> tuple[pd.Series, np.ndarray]:
if weights.shape[0] != series.shape[0]:
raise ValueError("Weights must match the number of observations.")
mask = series.notna().to_numpy()
return series[mask], weights[mask]
def _weighted_pmf(series: pd.Series, weights: np.ndarray) -> pd.Series:
df = pd.DataFrame({"value": series, "weight": weights})
df = df[df["value"].notna()]
grouped = df.groupby("value", sort=False, observed=False)["weight"].sum()
total = grouped.sum()
if total <= 0:
raise ValueError("PMF weights must sum to a positive value.")
return grouped / total
out: dict[str, float] | pd.DataFrame | pd.Series = {}
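# Per-column dispatch: columns starting with "_is_na_" are skipped; discrete
# columns (binary / object / categorical / bool) are compared via their weighted
# PMFs; numeric columns via weighted KDEs integrated with adaptive quadrature.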
for col in sample_df.columns:
if col.startswith("_is_na_"):
continue
sample_series, sample_w = _extract_series_and_weights(
sample_df[col], sample_weights_arr
)
target_series, target_w = _extract_series_and_weights(
target_df[col], target_weights_arr
)
is_discrete = _is_binary(sample_series) and _is_binary(target_series)
is_discrete = is_discrete or pd.api.types.is_object_dtype(sample_series)
is_discrete = is_discrete or isinstance(sample_series.dtype, CategoricalDtype)
is_discrete = is_discrete or pd.api.types.is_bool_dtype(sample_series)
if is_discrete:
sample_pmf = _weighted_pmf(sample_series, sample_w)
target_pmf = _weighted_pmf(target_series, target_w)
pmfs = pd.concat(
[sample_pmf.rename("sample"), target_pmf.rename("target")], axis=1
).fillna(0.0)
out[col] = _kl_divergence_discrete(
pmfs["sample"].to_numpy(), pmfs["target"].to_numpy()
)
else:
sample_vals = pd.to_numeric(sample_series, errors="coerce").dropna()
target_vals = pd.to_numeric(target_series, errors="coerce").dropna()
sample_w_numeric = sample_w[sample_series.index.isin(sample_vals.index)]
target_w_numeric = target_w[target_series.index.isin(target_vals.index)]
out[col] = _kl_divergence_continuous_quad(
sample_vals.to_numpy(),
target_vals.to_numpy(),
sample_w_numeric if sample_w_numeric.size else None,
target_w_numeric if target_w_numeric.size else None,
)
out = _safe_replace_and_infer(pd.DataFrame([out]))
weights = (
_weights_per_covars_names(out.columns.values.tolist())["weight"]
.to_frame()
.transpose()
)
out = pd.concat((out, weights))
mean = weighted_mean(out.iloc[0], out.loc["weight"])
out["mean(kld)"] = mean
out = out.iloc[0]
out.name = None
if aggregate_by_main_covar:
out = _aggregate_statistic_by_main_covar(out)
return out
def _aggregate_statistic_by_main_covar(statistic_series: pd.Series) -> pd.Series:
"""
This function helps to aggregate statistics (e.g., ASMD, KLD), which are broken down per level of a categorical variable,
into one value per covariate.
This is useful since it allows us to get a high-level view of features such as country, locale, etc.
It also allows us to filter out variables (such as is_today) before the overall averaging of the statistic.
Args:
statistic_series (pd.Series): a value for each covariate. Covariate names are in the index; the values are the statistic values.
Returns:
pd.Series: If statistic_series had several items broken by one-hot encoding,
then they would be averaged (with equal weight to each).
Examples:
::
from balance.stats_and_plots.weighted_comparisons_stats import _aggregate_statistic_by_main_covar
statistic_series = pd.Series(
{
'age': 0.5,
'education[T.high_school]': 1,
'education[T.bachelor]': 2,
'education[T.masters]': 3,
'education[T.phd]': 4,
})
_aggregate_statistic_by_main_covar(statistic_series).to_dict()
# output:
# {'age': 0.5, 'education': 2.5}
"""
weights = _weights_per_covars_names(statistic_series.index.values.tolist())
# turn things into DataFrame to make it easy to aggregate.
out = pd.concat((statistic_series, weights), axis=1)
# Define the apply function for calculating weighted means
def calculate_weighted_mean(group_data: pd.DataFrame) -> float:
values = group_data.iloc[:, 0]  # First column contains the statistic values
weights = group_data["weight"]
return ((values * weights) / weights.sum()).sum()
# Use _safe_groupby_apply to handle pandas compatibility
out = _safe_groupby_apply(out, "main_covar_names", calculate_weighted_mean)
out.name = None
out.index.name = None
return out
# TODO: (p2) sample_before and sample_after are redundant once the weights
# before and after are supplied directly.
# In the future, we can either omit sample_after, or change the names to
# reflect support for comparing two panels to some target population.
def asmd_improvement(
sample_before: pd.DataFrame,
sample_after: pd.DataFrame,
target: pd.DataFrame,
sample_before_weights: list[float] | pd.Series | npt.NDArray | None = None,
sample_after_weights: list[float] | pd.Series | npt.NDArray | None = None,
target_weights: list[float] | pd.Series | npt.NDArray | None = None,
) -> np.float64:
"""Calculates the improvement in mean(asmd) from before to after applying some weight adjustment.
Args:
sample_before (pd.DataFrame): DataFrame of the sample before adjustments.
sample_after (pd.DataFrame): DataFrame of the sample after adjustments
(should typically be identical to sample_before, but could be used to compare two populations).
target (pd.DataFrame): DataFrame of the target population.
sample_before_weights (Union[ List, pd.Series, np.ndarray, ], optional): Weights before adjustments (i.e.: design weights). Defaults to None.
sample_after_weights (Union[ List, pd.Series, np.ndarray, ], optional): Weights after some adjustment. Defaults to None.
target_weights (Union[ List, pd.Series, np.ndarray, ], optional): Design weights of the target population. Defaults to None.
Returns:
np.float64: The improvement, calculated as (before_mean_asmd - after_mean_asmd) / before_mean_asmd.
The asmd is calculated using :func:`asmd`.
Returns 0.0 when asmd_mean_before is zero or very close to zero (< 1e-10).
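Examples:
::
# A small sketch (assumed inputs). With the default std_type="target" the
# target std cancels in the ratio, so the result follows directly from the means:
import pandas as pd
from balance.stats_and_plots import weighted_comparisons_stats
sample = pd.DataFrame({"a": [1, 2]})
target = pd.DataFrame({"a": [2, 3]})
w_before = pd.Series([1, 1])
w_after = pd.Series([1, 3])  # up-weighting 2 moves the sample mean towards the target
weighted_comparisons_stats.asmd_improvement(sample, sample, target, w_before, w_after)
# ≈ 0.25 (mean(asmd) drops from 1.0/std to 0.75/std, i.e. a 25% improvement)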
"""
asmd_mean_before = asmd(
sample_before, target, sample_before_weights, target_weights
).loc["mean(asmd)"]
asmd_mean_after = asmd(
sample_after, target, sample_after_weights, target_weights
).loc["mean(asmd)"]
# Avoid division by zero when asmd_mean_before is zero or very close to zero
if np.abs(asmd_mean_before) < 1e-10:
return np.float64(0.0)
return (asmd_mean_before - asmd_mean_after) / asmd_mean_before
def outcome_variance_ratio(
df_numerator: pd.DataFrame,
df_denominator: pd.DataFrame,
w_numerator: list[float] | pd.Series | npt.NDArray | None = None,
w_denominator: list[float] | pd.Series | npt.NDArray | None = None,
) -> pd.Series:
"""Calculate ratio of weighted variances of two DataFrames
Directly calculating the empirical ratio of variance of the outcomes before and after weighting.
Notice that this is different than design effect.
The Deff estimates the ratio of variances of the weighted means, while this function calculates the ratio
of empirical weighted variance of the data.
Args:
df_numerator (pd.DataFrame): DataFrame used for the numerator of the variance ratio.
df_denominator (pd.DataFrame): DataFrame used for the denominator of the variance ratio.
w_numerator (Union[ List, pd.Series, np.ndarray, None, ], optional): Weights for df_numerator. Defaults to None.
w_denominator (Union[ List, pd.Series, np.ndarray, None, ], optional): Weights for df_denominator. Defaults to None.
Returns:
pd.Series: (np.float64) A series of calculated ratio of variances for each outcome.
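Examples:
::
# A minimal sketch (assumed inputs): identical DataFrames with default (unit)
# weights give a ratio of exactly 1 for each outcome column.
import pandas as pd
from balance.stats_and_plots import weighted_comparisons_stats
df = pd.DataFrame({"y": [1.0, 2.0, 3.0, 4.0]})
weighted_comparisons_stats.outcome_variance_ratio(df, df)
# y    1.0
# dtype: float64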
"""
numerator_w_var = weighted_var(df_numerator, w_numerator)
denominator_w_var = weighted_var(df_denominator, w_denominator)
return numerator_w_var / denominator_w_var