# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# pyre-unsafe
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
from typing import List, Literal, Optional, Tuple, Union
import numpy as np
import numpy.typing as npt
import pandas as pd
from balance.stats_and_plots.weights_stats import _check_weights_are_valid
from balance.util import model_matrix, rm_mutual_nas
from scipy.stats import norm
from statsmodels.stats.weightstats import DescrStatsW
logger: logging.Logger = logging.getLogger(__package__)
##########################################
# Weighted statistics
##########################################
def _prepare_weighted_stat_args(
v: Union[
List,
pd.Series,
pd.DataFrame,
npt.NDArray,
np.matrix,
],
w: Union[
List,
pd.Series,
npt.NDArray,
None,
] = None,
inf_rm: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Checks that the values (v) and weights (w) are of the supported types and of the same length. Can also replace np.Inf to np.nan.
Args:
v (Union[List, pd.Series, pd.DataFrame, np.ndarray, np.matrix,]): a series of values. Notice that pd.DataFrame and np.matrix should store the Series/np.ndarry as columns (not rows).
w (Union[List, pd.Series, np.ndarray, None]): a series of weights to be used with v (could also be none). Defaults to None.
inf_rm (bool, optional): should np.inf values be replaced by np.nan? Defaults to False.
Returns:
Tuple[pd.DataFrame, pd.Series]: the original v and w after they have been validated, and turned to pd.DataFrame and pd.Series (if needed).
The values might be int64 or float64, depending on the input.
"""
supported_types = (
list,
pd.Series,
np.ndarray,
)
tmp_supported_types = supported_types + (
pd.DataFrame,
np.matrix,
)
if type(v) not in tmp_supported_types:
raise TypeError(f"argument must be {tmp_supported_types}, is {type(v)}")
tmp_supported_types = supported_types + (type(None),)
if type(w) not in supported_types + (type(None),):
raise TypeError(f"argument must be {tmp_supported_types}, is {type(w)}")
# NOTE: np.matrix is an instance of np.ndarray, so we must turn it to pd.Dataframe before moving forward.
if isinstance(v, np.matrix):
v = pd.DataFrame(v)
if isinstance(v, np.ndarray) or isinstance(v, list):
v = pd.Series(v)
if isinstance(v, pd.Series):
v = v.to_frame()
if isinstance(w, np.ndarray) or isinstance(w, list):
w = pd.Series(w)
if w is None:
w = pd.Series(np.ones(len(v)), index=v.index)
if v.shape[0] != w.shape[0]:
raise ValueError(
f"weights (w) and data (v) must have same number of rows, ({v.shape[0]}, {w.shape[0]})"
)
dtypes = v.dtypes if hasattr(v.dtypes, "__iter__") else [v.dtypes]
if not all(np.issubdtype(x, np.number) for x in dtypes):
raise TypeError("all columns must be numeric")
if inf_rm:
v = v.replace([np.inf, -np.inf], np.nan)
w = w.replace([np.inf, -np.inf], np.nan)
v = v.reset_index(drop=True)
w = w.reset_index(drop=True)
_check_weights_are_valid(w)
return v, w
[docs]
def weighted_mean(
v: Union[
List,
pd.Series,
pd.DataFrame,
np.matrix,
],
w: Union[
List,
pd.Series,
npt.NDArray,
None,
] = None,
inf_rm: bool = False,
) -> pd.Series:
"""
Computes the weighted average of a pandas Series or DataFrame.
If no weights are supplied, it just computes the simple arithmetic mean.
See:
https://en.wikipedia.org/wiki/Weighted_arithmetic_mean
Uses :func:`_prepare_weighted_stat_args`.
Args:
v (Union[ List, pd.Series, pd.DataFrame, np.matrix, ]): values to average. None (or np.nan) values are treated like 0.
If v is a DataFrame than the average of the values from each column will be returned, all using the same set of weights from w.
w (Union[ List, pd.Series], optional): weights. Defaults to None.
If there is None value in the weights, that value will be ignored from the calculation.
inf_rm (bool, optional): whether to remove infinite (from weights or values) from the computation. Defaults to False.
Returns:
pd.Series(dtype=np.float64): The weighted mean.
If v is a DataFrame with several columns than the pd.Series will have a value for the weighted mean of each of the columns.
If inf_rm=False then:
If v has Inf then the output will be Inf.
If w has Inf then the output will be np.nan.
Examples:
::
from balance.stats_and_plots.weighted_stats import weighted_mean
weighted_mean(pd.Series((1, 2, 3, 4)))
# 0 2.5
# dtype: float64
weighted_mean(pd.Series((1, 2, 3, 4)), pd.Series((1, 2, 3, 4)))
# 0 3.0
# dtype: float64
df = pd.DataFrame(
{"a": [1,2,3,4], "b": [1,1,1,1]}
)
w = pd.Series((1, 2, 3, 4))
weighted_mean(df, w)
# a 3.0
# b 1.0
# dtype: float64
"""
v, w = _prepare_weighted_stat_args(v, w, inf_rm)
return v.multiply(w, axis="index").sum() / w.sum()
[docs]
def var_of_weighted_mean(
v: Union[List, pd.Series, pd.DataFrame, np.matrix],
w: Optional[Union[List, pd.Series, npt.NDArray]] = None,
inf_rm: bool = False,
) -> pd.Series:
"""
Computes the variance of the weighted average (pi estimator for ratio-mean) of a list of values and their corresponding weights.
If no weights are supplied, it assumes that all values have equal weights of 1.0.
See: https://en.wikipedia.org/wiki/Weighted_arithmetic_mean#Variance_of_the_weighted_mean_(%CF%80-estimator_for_ratio-mean)
Uses :func:`_prepare_weighted_stat_args`.
v (Union[List, pd.Series, pd.DataFrame, np.matrix]): A series of values. If v is a DataFrame, the weighted variance will be calculated for each column using the same set of weights from `w`.
w (Optional[Union[List, pd.Series, np.ndarray]]): A series of weights to be used with `v`. If None, all values will be weighted equally.
inf_rm (bool, optional): Whether to remove infinite (from weights or values) from the computation. Defaults to False.
Returns:
pd.Series: The variance of the weighted mean. If `v` is a DataFrame with several columns, the pd.Series will have a value for the weighted variance of each column. The values are of data type `np.float64`.
If `inf_rm` is False:
If `v` has infinite values, the output will be `Inf`.
If `w` has infinite values, the output will be `np.nan`.
Examples:
::
from balance.stats_and_plots.weighted_stats import var_of_weighted_mean
# In R: sum((1:4 - mean(1:4))^2 / 4) / (4)
# [1] 0.3125
var_of_weighted_mean(pd.Series((1, 2, 3, 4)))
# pd.Series(0.3125)
# For a reproducible R example, see: https://gist.github.com/talgalili/b92cd8cdcbfc287e331a8f27db265c00
var_of_weighted_mean(pd.Series((1, 2, 3, 4)), pd.Series((1, 2, 3, 4)))
# pd.Series(0.24)
df = pd.DataFrame(
{"a": [1,2,3,4], "b": [1,1,1,1]}
)
w = pd.Series((1, 2, 3, 4))
var_of_weighted_mean(df, w)
# a 0.24
# b 0.00
# dtype: float64
"""
v, w = _prepare_weighted_stat_args(v, w, inf_rm)
weighed_mean_of_v = weighted_mean(v, w, inf_rm) # This is a pd.Series
# NOTE: the multiply needs to be called from the pd.Series. Unfortunately this makes the code less readable.
sum_of_squared_weighted_diffs = (
((v - weighed_mean_of_v).multiply(w, axis="index")).pow(2).sum()
)
squared_sum_of_w = w.sum() ** 2 # This is a pd.series
return sum_of_squared_weighted_diffs / squared_sum_of_w
[docs]
def ci_of_weighted_mean(
v: Union[List[float], pd.Series, pd.DataFrame, npt.NDArray],
w: Optional[Union[List[float], pd.Series, npt.NDArray]] = None,
conf_level: float = 0.95,
round_ndigits: Optional[int] = None,
inf_rm: bool = False,
) -> pd.Series:
"""
Computes the confidence interval of the weighted mean of a list of values and their corresponding weights.
If no weights are supplied, it assumes that all values have equal weights of 1.0.
v (Union[List[float], pd.Series, pd.DataFrame, np.ndarray]): A series of values. If v is a DataFrame, the weighted mean and its confidence interval will be calculated for each column using the same set of weights from `w`.
w (Optional[Union[List[float], pd.Series, np.ndarray]]): A series of weights to be used with `v`. If None, all values will be weighted equally.
conf_level (float, optional): Confidence level for the interval, between 0 and 1. Defaults to 0.95.
round_ndigits (Optional[int], optional): Number of decimal places to round the confidence interval. If None, the values will not be rounded. Defaults to None.
inf_rm (bool, optional): Whether to remove infinite (from weights or values) from the computation. Defaults to False.
Returns:
pd.Series: The confidence interval of the weighted mean. If `v` is a DataFrame with several columns, the pd.Series will have a value for the confidence interval of each column. The values are of data type Tuple[np.float64, np.float64].
If `inf_rm` is False:
If `v` has infinite values, the output will be `Inf`.
If `w` has infinite values, the output will be `np.nan`.
Examples:
::
from balance.stats_and_plots.weighted_stats import ci_of_weighted_mean
ci_of_weighted_mean(pd.Series((1, 2, 3, 4)))
# 0 (1.404346824279273, 3.5956531757207273)
# dtype: object
ci_of_weighted_mean(pd.Series((1, 2, 3, 4)), round_ndigits = 3).to_list()
# [(1.404, 3.596)]
ci_of_weighted_mean(pd.Series((1, 2, 3, 4)), pd.Series((1, 2, 3, 4)))
# 0 (2.039817664728938, 3.960182335271062)
# dtype: object
ci_of_weighted_mean(pd.Series((1, 2, 3, 4)), pd.Series((1, 2, 3, 4)), round_ndigits = 3).to_list()
# [(2.04, 3.96)]
df = pd.DataFrame(
{"a": [1,2,3,4], "b": [1,1,1,1]}
)
w = pd.Series((1, 2, 3, 4))
ci_of_weighted_mean(df, w, conf_level = 0.99, round_ndigits = 3)
# a (1.738, 4.262)
# b (1.0, 1.0)
# dtype: object
ci_of_weighted_mean(df, w, conf_level = 0.99, round_ndigits = 3).to_dict()
# {'a': (1.738, 4.262), 'b': (1.0, 1.0)}
"""
v, w = _prepare_weighted_stat_args(v, w, inf_rm)
weighed_mean_of_v = weighted_mean(v, w, inf_rm)
var_weighed_mean_of_v = var_of_weighted_mean(v, w, inf_rm)
z_value = norm.ppf((1 + conf_level) / 2)
if isinstance(v, pd.Series):
ci_index = v.index
elif isinstance(v, pd.DataFrame):
ci_index = v.columns
else:
ci_index = None
ci = pd.Series(
[
(
weighed_mean_of_v[i] - z_value * np.sqrt(var_weighed_mean_of_v[i]),
weighed_mean_of_v[i] + z_value * np.sqrt(var_weighed_mean_of_v[i]),
)
for i in range(len(weighed_mean_of_v))
],
index=ci_index,
)
if round_ndigits is not None:
# Apply a lambda function to round a pd.Series of tuples to x decimal places
ci = ci.apply(lambda t: tuple(round(x, round_ndigits) for x in t))
# pyre-ignore
return ci
[docs]
def weighted_var(
v: Union[
List,
pd.Series,
pd.DataFrame,
npt.NDArray,
np.matrix,
],
w: Union[
List,
pd.Series,
npt.NDArray,
None,
] = None,
inf_rm: bool = False,
) -> pd.Series:
"""
Calculate the sample weighted variance (a.k.a 'reliability weights').
This is described here:
https://en.wikipedia.org/wiki/Weighted_arithmetic_mean#Reliability_weights_2
And also used in SDMTools, see:
https://www.gnu.org/software/gsl/doc/html/statistics.html#weighted-samples
Uses :func:`weighted_mean` and :func:`_prepare_weighted_stat_args`.
Args:
v (Union[ List, pd.Series, pd.DataFrame, np.matrix, ]): values to get the weighted variance for. None values are treated like 0.
If v is a DataFrame than the average of the values from each column will be returned, all using the same set of weights from w.
w (Union[ List, pd.Series], optional): weights. Defaults to None.
If there is None value in the weights, that value will be ignored from the calculation.
inf_rm (bool, optional): whether to remove infinite (from weights or values) from the computation. Defaults to False.
Returns:
pd.Series[np.float64]: The weighted variance.
If v is a DataFrame with several columns than the pd.Series will have a value for the weighted mean of each of the columns.
If inf_rm=False then:
If v has Inf then the output will be Inf.
If w has Inf then the output will be np.nan.
"""
v, w = _prepare_weighted_stat_args(v, w, inf_rm)
means = weighted_mean(v, w)
v1 = w.sum()
v2 = (w**2).sum()
return (v1 / (v1**2 - v2)) * ((v - means) ** 2).multiply(w, axis="index").sum()
[docs]
def weighted_sd(
v: Union[
List,
pd.Series,
pd.DataFrame,
np.matrix,
],
w: Union[
List,
pd.Series,
npt.NDArray,
None,
] = None,
inf_rm: bool = False,
) -> pd.Series:
"""Calculate the sample weighted standard deviation
See :func:`weighted_var` for details.
Args:
v (Union[ List, pd.Series, pd.DataFrame, np.matrix, ]): Values.
w (Union[ List, pd.Series, np.ndarray, None, ], optional): Weights. Defaults to None.
inf_rm (bool, optional): Remove inf. Defaults to False.
Returns:
pd.Series: np.sqrt of :func:`weighted_var` (np.float64)
"""
return pd.Series(np.sqrt(weighted_var(v, w, inf_rm)))
[docs]
def weighted_quantile(
v: Union[
List,
pd.Series,
pd.DataFrame,
npt.NDArray,
np.matrix,
],
quantiles: Union[
List,
pd.Series,
npt.NDArray,
],
w: Union[
List,
pd.Series,
npt.NDArray,
None,
] = None,
inf_rm: bool = False,
) -> pd.DataFrame:
"""
Calculates the weighted quantiles (q) of values (v) based on weights (w).
See :func:`_prepare_weighted_stat_args` for the pre-processing done to v and w.
Based on :func:`statsmodels.stats.weightstats.DescrStatsW`.
Args:
v (Union[ List, pd.Series, pd.DataFrame, np.array, np.matrix, ]): values to get the weighted quantiles for.
quantiles (Union[ List, pd.Series, ]): the quantiles to calculate.
w (Union[ List, pd.Series, np.array, ] optional): weights. Defaults to None.
inf_rm (bool, optional): whether to remove infinite (from weights or values) from the computation. Defaults to False.
Returns:
pd.DataFrame: The index (names p) has the values from quantiles. The columns are based on v:
If it's a pd.Series it's one column, if it's a pd.DataFrame with several columns, than each column
in the output corrosponds to the column in v.
"""
v, w = _prepare_weighted_stat_args(v, w, inf_rm)
v = np.array(v)
w = np.array(w)
quantiles = np.array(quantiles)
d = DescrStatsW(v.astype(float), weights=w)
return d.quantile(quantiles)
[docs]
def descriptive_stats(
df: pd.DataFrame,
weights: Union[
List,
pd.Series,
npt.NDArray,
None,
] = None,
stat: Literal["mean", "std", "var_of_mean", "ci_of_mean", "..."] = "mean",
# relevant only if stat is None
weighted: bool = True,
# relevant only if we have non-numeric columns and we want to use model_matrix on them
numeric_only: bool = False,
add_na: bool = True,
**kwargs,
) -> pd.DataFrame:
"""Computes weighted statistics (e.g.: mean, std) on a DataFrame
This function gets a DataFrame + weights and apply some weighted aggregation function (mean, std, or DescrStatsW).
The main benefit of the function is that if the DataFrame includes non-numeric columns, then descriptive_stats will first
run :func:`model_matrix` to create some numeric dummary variable that will then be processed.
Args:
df (pd.DataFrame): Some DataFrame to get stats (mean, std, etc.) for.
weights (Union[ List, pd.Series, np.ndarray, ], optional): Weights to apply for the computation. Defaults to None.
stat (Literal["mean", "std", "var_of_mean", ...], optional): Which statistic to calculate on the data.
If mean - uses :func:`weighted_mean` (with inf_rm=True)
If std - uses :func:`weighted_sd` (with inf_rm=True)
If var_of_mean - uses :func:`var_of_weighted_mean` (with inf_rm=True)
If ci_of_mean - uses :func:`ci_of_weighted_mean` (with inf_rm=True)
If something else - tries to use :func:`statsmodels.stats.weightstats.DescrStatsW`.
This supports stat such as: std_mean, sum_weights, nobs, etc. See function documentation to see more.
(while removing mutual nan using :func:`rm_mutual_nas`)
Defaults to "mean".
weighted (bool, optional): If stat is not "mean" or "std", if to use the weights with the DescrStatsW function.
Defaults to True.
numeric_only (bool, optional): Should the statistics be computed only on numeric columns?
If True - then non-numeric columns will be omitted.
If False - then :func:`model_matrix` (with no formula argument) will be used to transfer non-numeric columns to dummy variables.
Defaults to False.
add_na (bool, optional): Passed to :func:`model_matrix`.
Relevant only if numeric_only == False and df has non-numeric columns.
Defaults to True.
**kwargs: extra args to be passed to functions (e.g.: ci_of_weighted_mean)
Returns:
pd.DataFrame: Returns pd.DataFrame of the output (based on stat argument), for each of the columns in df.
Examples:
::
import numpy as np
import pandas as pd
from balance.stats_and_plots.weighted_stats import descriptive_stats, weighted_mean, weighted_sd
# Without weights
x = [1, 2, 3, 4]
print(descriptive_stats(pd.DataFrame(x), stat="mean"))
print(np.mean(x))
print(weighted_mean(x))
# 0
# 0 2.5
# 2.5
# 0 2.5
# dtype: float64
print(descriptive_stats(pd.DataFrame(x), stat="var_of_mean"))
# 0
# 0 0.3125
print(descriptive_stats(pd.DataFrame(x), stat="std"))
print(weighted_sd(x))
x2 = pd.Series(x)
print(np.sqrt( np.sum((x2 - x2.mean())**2) / (len(x)-1) ))
# 0
# 0 1.290994
# 0 1.290994
# dtype: float64
# 1.2909944487358056
# Notice that it is different from
# print(np.std(x))
# which gives: 1.118033988749895
# Which is the MLE (i.e.: biased, dividing by n and not n-1) estimator for std:
# (np.sqrt( np.sum((x2 - x2.mean())**2) / (len(x)) ))
x2 = pd.Series(x)
tmp_sd = np.sqrt(np.sum((x2 - x2.mean()) ** 2) / (len(x) - 1))
tmp_se = tmp_sd / np.sqrt(len(x))
print(descriptive_stats(pd.DataFrame(x), stat="std_mean").iloc[0, 0])
print(tmp_se)
# 0.6454972243679029
# 0.6454972243679028
# Weighted results
x, w = [1, 2, 3, 4], [1, 2, 3, 4]
print(descriptive_stats(pd.DataFrame(x), w, stat="mean"))
# 0
# 0 3.0
print(descriptive_stats(pd.DataFrame(x), w, stat="std"))
# 0
# 0 1.195229
print(descriptive_stats(pd.DataFrame(x), w, stat="std_mean"))
# 0
# 0 0.333333
print(descriptive_stats(pd.DataFrame(x), w, stat="var_of_mean"))
# 0
# 0 0.24
print(descriptive_stats(pd.DataFrame(x), w, stat="ci_of_mean", conf_level = 0.99, round_ndigits=3))
# 0
# 0 (1.738, 4.262)
"""
if len(df.select_dtypes(np.number).columns) != len(df.columns):
# If we have non-numeric columns, and want faster results,
# then we can set numeric_only == True.
# This will skip the model_matrix computation for non-numeric variables.
if numeric_only:
# TODO: (p2) does this check takes a long time?
# if it does - then maybe add an option of numeric_only = None
# to just use df as is.
df = df.select_dtypes(include=[np.number])
else:
# TODO: add the ability to pass formula argument to model_matrix
df = model_matrix( # pyre-ignore[9]: this uses the DataFrame onlyÍ
df, add_na=add_na, return_type="one"
)["model_matrix"]
if stat == "mean":
return weighted_mean(df, weights, inf_rm=True).to_frame().transpose()
elif stat == "std":
return weighted_sd(df, weights, inf_rm=True).to_frame().transpose()
elif stat == "var_of_mean":
return var_of_weighted_mean(df, weights, inf_rm=True).to_frame().transpose()
elif stat == "ci_of_mean":
conf_level = kwargs.get("conf_level", 0.95)
round_ndigits = kwargs.get("round_ndigits", None)
return (
ci_of_weighted_mean(
df,
weights,
inf_rm=True,
conf_level=conf_level,
round_ndigits=round_ndigits,
)
.to_frame()
.transpose()
)
# TODO: (p2) check which input DescrStatsW takes, not sure we need to run the next two lines.
if weights is not None:
weights = pd.Series(weights)
# TODO: (p2) consider to remove the "weighted" argument, and just use
# whatever is passed to "weights" (if None, than make sure it's replaced with 1s)
# Fallback to statsmodels function
_check_weights_are_valid(weights)
wdf = {}
for c in df.columns.values:
df_c, w = rm_mutual_nas(df.loc[:, c], weights)
# pyre-fixme[16]: `DescrStatsW` has no attribute `...`.
wdf[c] = [getattr(DescrStatsW(df_c, w if weighted else None), stat)]
return pd.DataFrame(wdf)
[docs]
def relative_frequency_table(
df: Union[pd.DataFrame, pd.Series],
column: Optional[str] = None,
w: Optional[pd.Series] = None,
) -> pd.DataFrame:
"""Creates a relative frequency table by aggregating over a categorical column (`column`) - optionally weighted by `w`.
I.e.: produce the proportion (or weighted proportion) of rows that appear in each category, relative to the total number of rows (or sum of weights).
See: https://en.wikipedia.org/wiki/Frequency_(statistics)#Types.
Used in plotting functions.
Args:
df (pd.DataFrame): A DataFrame with categorical columns, or a pd.Series of the grouping column.
column (Optional[str]): The name of the column to be aggregated.
If None (default), then it takes the first column of df (if pd.DataFrame), or just uses as is (if pd.Series)
w (Optional[pd.Series], optional): Optional weights to use when aggregating the relative proportions.
If None than assumes weights is 1 for all rows. The defaults is None.
Returns:
pd.DataFrame: a pd.DataFrame with columns:
- `column`, the aggregation variable, and,
- 'prop', the aggregated (weighted) proportion of rows from each group in 'column'.
Examples:
::
from balance.stats_and_plots.weighted_stats import relative_frequency_table
import pandas as pd
df = pd.DataFrame({
'group': ('a', 'b', 'c', 'c'),
'v1': (1, 2, 3, 4),
})
print(relative_frequency_table(df, 'group'))
# group prop
# 0 a 0.25
# 1 b 0.25
# 2 c 0.50
print(relative_frequency_table(df, 'group', pd.Series((2, 1, 1, 1),)))
# group prop
# 0 a 0.4
# 1 b 0.2
# 2 c 0.4
# Using a pd.Series:
a_series = df['group']
print(relative_frequency_table(a_series))
# group prop
# 0 a 0.25
# 1 b 0.25
# 2 c 0.50
"""
_check_weights_are_valid(w)
if not (w is None or isinstance(w, pd.Series)):
raise TypeError("argument `w` must be a pandas Series or None")
if w is None:
w = pd.Series(np.ones(df.shape[0]))
w = w.rename("Freq")
if column is None:
if isinstance(df, pd.DataFrame):
column = df.columns.values[0]
elif isinstance(df, pd.Series):
if df.name is None:
df.name = "group"
column = df.name
else:
raise TypeError("argument `df` must be a pandas DataFrame or Series")
relative_frequency_table_data = pd.concat((df, w), axis=1)
relative_frequency_table_data = relative_frequency_table_data.groupby(
column, as_index=False
).sum()
relative_frequency_table_data["prop"] = relative_frequency_table_data["Freq"] / sum(
relative_frequency_table_data["Freq"]
)
return relative_frequency_table_data.loc[:, (column, "prop")]