Source code for weighted_sample_statistics.core

"""
Definition of weighted_sample_statistics class to calculate weighted weighted_sample_statistics
"""

import logging
import re
from typing import Iterable, Optional, Union

import numpy as np
from pandas import DataFrame

DataFrameType = Union[DataFrame, None]

logger = logging.getLogger(__name__)


[docs] def make_negation_name(column_name: str, suffix: str = "_x") -> str: """Make a new column name for complementary values. Returns ------- negation_name : str """ negation_name = re.sub(r"_\d\.\d$", "", column_name) + suffix if re.search(r"_\d\.\d$", column_name): negation_name += re.search(r"_\d\.\d$", column_name).group() return negation_name
[docs] class WeightedSampleStatistics: """ Calculate weighted_sample_statistics for summations Parameters ---------- group_keys: iterable The variables to use to group records_df_selection: DataFrame All the microdata including non-response weights_df: DataFrame The weights per unit all_records_df: DataFrame All the microdata including non-response column_list: iterable list of columns to calculate weighted_sample_statistics scaling_factor_key: str Name of the weight variable var_type: str Type of the data add_inverse: bool Add the negated value as well for booleans report_numbers: bool Do not calculate the average, but the sum Attributes ---------- records_sum: grouped The summation of the weighted values number_samples_sqrt: grouped The square root of the sample size n standard_error: grouped The standard error of the mean estimate: std / n_sqrt """ def __init__( self, group_keys: Iterable, records_df_selection: DataFrame, weights_df: DataFrame, column_list: Optional[Iterable] = None, var_type: Optional[str] = None, scaling_factor_key: Optional[str] = None, units_scaling_factor_key: Optional[str] = None, all_records_df: Optional[DataFrame] = None, var_weight_key: Optional[str] = None, variance_df_selection: Optional[DataFrame] = None, records_df_unfilled: Optional[DataFrame] = None, add_inverse: bool = False, report_numbers: bool = False, negation_suffix: Optional[str] = None, start: bool = False, ) -> None: """ Initialize the WeightedSampleStatistics object. This object is used to calculate weighted sample statistics for a given DataFrame. The object can be initialized with a set of records, weights, and a list of columns to calculate statistics for. The object can also be initialized with a set of all records, including non-response, and a variable weight. The object can also be initialized with a set of variance selection data and a set of unfilled records. Parameters ---------- group_keys : Iterable The variables to use for grouping. records_df_selection : DataFrame The DataFrame containing the selected records. weights_df : DataFrame The DataFrame containing the weights per unit. column_list : Iterable, optional List of columns to calculate statistics for. Default is None. var_type : str, optional The type of the data. Default is None. scaling_factor_key : str, optional The key for the scaling factor. Default is None. units_scaling_factor_key : str, optional The key for the unit scaling factor. Default is None. all_records_df : DataFrame, optional DataFrame containing all records, including non-response. Default is None. var_weight_key : str, optional The key for the variable weight. Default is None. variance_df_selection : DataFrame, optional DataFrame containing variance selection data. Default is None. records_df_unfilled : DataFrame, optional DataFrame containing unfilled records. Default is None. add_inverse : bool, optional Whether to add the negated value for booleans. Default is False. report_numbers : bool, optional Whether to report numbers instead of calculating the average. Default is False. negation_suffix : str, optional Suffix to use for negated values. Default is "_x". start : bool, optional Whether to start calculations immediately. Default is False. """ # Initialize instance variables self.group_keys = group_keys self.records_df_selection = records_df_selection self.weights_df = weights_df self.column_list = column_list or [] self.var_type = var_type self.scaling_factor_key = scaling_factor_key self.units_scaling_factor_key = units_scaling_factor_key self.all_records_df = all_records_df self.var_weight_key = var_weight_key self.variance_df_selection = variance_df_selection self.records_df_unfilled = records_df_unfilled self.add_inverse = add_inverse self.report_numbers = report_numbers self.negation_suffix = negation_suffix or "_x" self.group_keys = group_keys self.records_df_selection = records_df_selection self.weights_df = weights_df self.column_list = column_list or [] self.var_type = var_type self.scaling_factor_key = scaling_factor_key self.units_scaling_factor_key = units_scaling_factor_key self.all_records_df = all_records_df self.var_weight_key = var_weight_key self.variance_df_selection = variance_df_selection self.records_df_unfilled = records_df_unfilled self.add_inverse = add_inverse self.report_numbers = report_numbers self.negation_suffix = negation_suffix or "_x" self.weights_sel_sum_df = None self.scale_variabele_sel_grp = None self.scale_variabele_pop_grp = None self.var_weight_sel_grp = None self.var_weight_pop_grp = None self.unit_weights_pop_grp = None self.unit_weights_sel_grp = None self.weights_sel_grp = None self.weights_grp = None self.all_records_grp = None self.variance_sel_grp = None self.records_valid_grp = None self.records_sel_grp = None self.unit_weights_pop_grp = None self.unit_weights_sel_grp = None self.weights_sel_grp = None self.weights_grp = None self.all_records_grp = None self.variance_sel_grp = None self.records_valid_grp = None self.records_sel_grp = None self.weights = None self.records_df_valid = None self.var_weight_sel_df = None self.scale_variabele_sel_df = None self.var_weight_pop_df = None self.scale_variabele_pop_df = None self.unit_weights_sel_df = None self.unit_weights_pop_df = None self.weights_sel_df = None self.response_fraction = None self.weights = None self.records_df_valid = None self.var_weight_sel_df = None self.scale_variabele_sel_df = None self.var_weight_pop_df = None self.scale_variabele_pop_df = None self.unit_weights_sel_df = None self.unit_weights_pop_df = None self.weights_sel_df = None self.standard_error = None self.number_samples_sqrt = None self.n_sample = None self.records_std_df = None self.records_std_agg = None self.records_var_agg = None self.records_var_df = None self.proportion_pop_grp = None self.proportion_pop_mean_agg = None self.proportion_sel_grp = None self.proportion_sel_mean_agg = None self.proportion_weighted_pop_df = None self.number_ratio = None self.records_norm_sel_df = None self.proportion_weighted_sel_df = None self.proportion_pop_df = None self.records_norm_pop_df = None self.proportion_sel_df = None self.records_sum = None self.response_count = None self.records_weighted_conditional_mean_agg = None self.records_weighted_mean_agg = None self.sample_count_initial = None self.records_weighted_sel_mean_df = None self.records_weighted_pop_mean_df = None self.records_weighted_pop_grp = None self.records_weighted_pop_df = None self.records_weighted_sel_df = None self.weights_pop_normalized_df = None self.unit_weights_pop_sum_agg = None self.var_weights_pop_sum_agg = None self.var_weights_pop_sum_df = None self.weights_sel_normalized_df = None self.records_weighted_sel_grp = None self.var_weights_sel_sum_df = None self.unit_weights_pop_sum_df = None self.unit_weights_sel_sum_agg = None self.unit_weights_sel_sum_df = None self.var_weights_sel_sum_agg = None self.weights_pop_sum_agg = None self.weights_sel_sum_agg = None self.weights_pop_sum_df = None # If start is True, begin calculations immediately if start: self.calculate()
[docs] def calculate(self) -> None: """ Perform all calculations required for weighted sample statistics. This method orchestrates the sequence of calculations necessary to determine weighted means, proportions, and standard errors. It also calculates the response fraction if all records are provided. Returns ------- None """ # Set the mask for valid data entries self.set_mask_valid_df() # Scale the variables based on the provided scaling factors self.scale_variables() # Group the variables as per the specified grouping keys self.group_variables() # Calculate the weighted means for the records self.calculate_weighted_means() # If all records are available, calculate the response fraction if self.all_records_df is not None: self.calculate_response_fraction() # Calculate the proportions for the selected and population data self.calculate_proportions() # Calculate the standard errors for the estimates self.calculate_standard_errors()
[docs] def scale_variables(self): """Scale the variables with the scaling factor. This function scales the variables by the scaling factor. """ logger.debug(f"Scaling variables with {self.scaling_factor_key}") # The weights are the scaling factors self.weights = self.weights_df.loc[:, self.scaling_factor_key] # The unit weights are the scaling factors for the population self.unit_weights_pop_df = self.weights_df.loc[:, self.units_scaling_factor_key] # The fixed variables are the scaling variables fixed = set(list([self.var_weight_key, self.scaling_factor_key])) # Check if the variable to process (stored in the column list) is a scaling variable if set(self.column_list).intersection(fixed): # In case the variable to process (stored in the column list) is a scaling variable, # we do not scale it, so set the weights to 1 self.weights.values[:] = 1.0 # The weights for the selection are the same as the weights self.weights_sel_df = self.weights.reindex( self.records_df_selection.index ).astype(float) # The unit weights for the selection are the same as the unit weights self.unit_weights_sel_df = self.unit_weights_pop_df.reindex( self.records_df_selection.index ).astype(float) # The scale variable for the population is the variable to be scaled self.scale_variabele_pop_df = self.weights_df[self.var_weight_key].astype(float) # The scale variable for the selection is the same as the scale variable for the population self.scale_variabele_sel_df = self.scale_variabele_pop_df.reindex( self.records_df_selection.index ).astype(float) # Now we have the numerator, we also need to scale the denominator. # The variable to be scaled for the population is the scaling factor times the variable self.var_weight_pop_df = ( self.weights_df[self.scaling_factor_key] * self.scale_variabele_pop_df ) self.var_weight_pop_df = self.var_weight_pop_df.astype(float) # The variable to be scaled for the selection is the same as the variable to be scaled # for the population self.var_weight_sel_df = self.var_weight_pop_df.reindex( self.records_df_selection.index ) self.var_weight_sel_df = self.var_weight_sel_df.astype(float)
[docs] def set_mask_valid_df(self): """Set mask valid df This function sets the mask for the valid records for the selected variables. This mask is used to select the valid records from the dataframe of the population. Returns ------- None """ logger.debug("Setting mask for valid records") if self.records_df_unfilled is not None: logger.debug("Set mask for valid records to True if not NaN") self.records_df_valid = ~self.records_df_unfilled.isna() try: logger.debug("Select only columns in self.column_list") self.records_df_valid = self.records_df_valid[self.column_list] except KeyError: logger.debug("No valid columns found in records_df_unfilled") col = re.sub(r"_\d\.\d", "", self.column_list[0]) try: logger.debug("Select only column in self.column_list") self.records_df_valid = self.records_df_valid[col] except KeyError: logger.debug("No valid columns found in records_df_unfilled") self.records_df_valid = None
[docs] def group_variables(self): """Group the variables according to the group keys. This function groups the variables and the weights according to the specified group keys. The grouped variables are stored as attributes of the class for later use. Returns ------- None """ logger.debug(f"Grouping variables with {self.group_keys}") # Group the records according to the group keys self.records_sel_grp = self.records_df_selection.groupby(self.group_keys) # Group the variance (if it is present) if self.variance_df_selection is not None: self.variance_sel_grp = self.variance_df_selection.groupby(self.group_keys) # Group the valid records if self.records_df_valid is not None: self.records_valid_grp = self.records_df_valid.groupby(self.group_keys) # Group the all records (if it is present) if self.all_records_df is not None: # Add the variable weight to the all records if it is not present if self.var_weight_key not in self.all_records_df.columns: self.all_records_df[self.var_weight_key] = 1 # Group the all records self.all_records_grp = self.all_records_df[self.var_weight_key].groupby( self.group_keys ) # Group the weights self.weights_grp = self.weights.groupby(self.group_keys) self.weights_sel_grp = self.weights_sel_df.groupby(self.group_keys) # Group the unit weights self.unit_weights_sel_grp = self.unit_weights_sel_df.groupby(self.group_keys) self.unit_weights_pop_grp = self.unit_weights_pop_df.groupby(self.group_keys) # Group the variable weights self.var_weight_pop_grp = self.var_weight_pop_df.groupby(self.group_keys) self.var_weight_sel_grp = self.var_weight_sel_df.groupby(self.group_keys) # Group the scaled variables self.scale_variabele_pop_grp = self.scale_variabele_pop_df.groupby( self.group_keys ) self.scale_variabele_sel_grp = self.scale_variabele_sel_df.groupby( self.group_keys )
[docs] def calculate_weighted_means(self): """Calculate summed weighted statistics for the selected columns. This method calculates the weighted sums and means for the selected columns in the dataset. It normalizes weights, applies them to the records, and handles special cases such as empty selections and negation of values. Returns ------- None """ logger.debug( f"Start calculation summed weighted_sample_statistics for {self.column_list}" ) if "omzet_enq" in self.column_list: logger.debug("Stop hier") return # Calculate the sum of weights for selection and population self.weights_sel_sum_df = self.weights_sel_grp.transform("sum") self.weights_pop_sum_df = self.weights_grp.transform("sum") self.weights_sel_sum_agg = self.weights_sel_grp.sum() self.weights_pop_sum_agg = self.weights_grp.sum() # Calculate the sum of unit weights for selection and population self.unit_weights_sel_sum_df = self.unit_weights_sel_grp.transform("sum") self.unit_weights_pop_sum_df = self.unit_weights_pop_grp.transform("sum") # Calculate the sum of variable weights for selection and population self.var_weights_sel_sum_df = self.var_weight_sel_grp.transform("sum") self.var_weights_pop_sum_df = self.var_weight_pop_grp.transform("sum") self.var_weights_sel_sum_agg = self.var_weight_sel_grp.sum() self.var_weights_pop_sum_agg = self.var_weight_pop_grp.sum() # Aggregate sum of unit weights for selection and population self.unit_weights_sel_sum_agg = self.unit_weights_sel_grp.sum() self.unit_weights_pop_sum_agg = self.unit_weights_pop_grp.sum() # Normalize weights with selection sums logger.debug(f"Normalizing weights with sums in selection") self.weights_sel_normalized_df = self.weights.div( self.weights_sel_sum_df, axis="index" ) self.weights_sel_normalized_df = self.weights_sel_normalized_df.reindex( self.weights_sel_sum_df.index ) # Normalize weights with population sums logger.debug(f"Normalizing weights with sums in population") self.weights_pop_normalized_df = self.weights.div( self.weights_pop_sum_df, axis="index" ) self.weights_pop_normalized_df = self.weights_pop_normalized_df.reindex( self.weights_pop_normalized_df.index ) # Apply normalized weights to records logger.debug(f"Applying weights to records") self.records_weighted_sel_df = self.records_df_selection.mul( self.weights_sel_normalized_df, axis="index" ) self.records_weighted_pop_df = self.records_df_selection.mul( self.weights_pop_normalized_df, axis="index" ) # Group weighted records by selection and population self.records_weighted_sel_grp = self.records_weighted_sel_df.groupby( self.group_keys ) self.records_weighted_pop_grp = self.records_weighted_pop_df.groupby( self.group_keys ) # Transform to get summed weighted means self.records_weighted_sel_mean_df = self.records_weighted_sel_grp.transform( "sum" ) self.records_weighted_pop_mean_df = self.records_weighted_pop_grp.transform( "sum" ) # Aggregate weighted means logger.debug(f"Calculating weighted means") self.records_weighted_mean_agg = self.records_weighted_pop_grp.sum() self.records_weighted_conditional_mean_agg = self.records_weighted_sel_grp.sum() # Calculate the sum of conditional weighted means logger.debug(f"Calculating conditional weighted means") self.records_sum = self.records_weighted_conditional_mean_agg.mul( self.weights_sel_sum_agg, axis="index" ) # Handle NaN values by filling them with 0 logger.debug(f"Fill NaN's with 0") self.records_sum = self.records_sum.astype(float).fillna(0) # Add negated values if required if self.add_inverse: for col_name in self.records_sum: new_col = make_negation_name(col_name, self.negation_suffix) logger.debug(f"Creating new negated column {new_col}") filter_sum = self.var_weights_sel_sum_agg.reindex( self.records_sum.index ).fillna(0) self.records_sum[new_col] = filter_sum - self.records_sum[col_name] # Count the response for each group self.response_count = self.weights_grp.count() # Convert to percentage if variable type is boolean or dictionary if self.var_type in ("bool", "dict"): self.records_weighted_mean_agg *= 100 self.records_weighted_conditional_mean_agg *= 100
[docs] def calculate_response_fraction(self): """Calculate response fraction""" logger.debug("Calculating the response fractions") self.sample_count_initial = self.all_records_grp.count() if self.records_valid_grp is not None: valid_vals = self.records_valid_grp.sum() else: valid_vals = self.response_count response_series = 100 * valid_vals.div(self.sample_count_initial, axis="index") # turn the response series into a dataframe with the same number of columns and column # names as the mean for col_name in self.column_list: try: response_df: DataFrameType = response_series.to_frame(name=col_name) except AttributeError: logger.debug("Failed to transfer to frame. It is a frame already") response_df = response_series if self.response_fraction is None: self.response_fraction = response_df else: self.response_fraction = self.response_fraction.join(response_df)
[docs] def calculate_proportions(self): """Calculate proportions""" logger.debug("Calculating the proportions") # normaliseer de records variable met de schaalfactor self.records_norm_pop_df = self.records_df_selection.div( self.scale_variabele_pop_df, axis="index" ) self.records_norm_pop_df.clip(lower=0, upper=1, inplace=True) self.records_norm_sel_df = self.records_norm_pop_df.reindex( self.records_df_selection.index ) sum_unit_weight = self.unit_weights_sel_sum_df sum_weight = self.var_weights_sel_sum_df try: self.number_ratio = sum_unit_weight.div(sum_weight, axis="index") except AttributeError as err: logger.warning(f"{err}") return self.proportion_sel_df = 100 * self.records_norm_sel_df self.proportion_pop_df = 100 * self.records_norm_pop_df self.proportion_weighted_sel_df = self.proportion_sel_df.mul( self.weights_sel_normalized_df, axis="index" ) self.proportion_weighted_pop_df = self.proportion_pop_df.mul( self.weights_pop_normalized_df, axis="index" ) self.proportion_sel_grp = self.proportion_weighted_sel_df.groupby( self.group_keys ) self.proportion_pop_grp = self.proportion_weighted_pop_df.groupby( self.group_keys ) # You can show that this proportion (calculated from the average of the fractions between # 0 and 100 %) is mathematically different from the sum of the elements divided by the sum of the # total. # To keep the output consistent, we simply print the last one. # But we still use the fraction to calculate the standard error self.proportion_sel_mean_agg = 100 * self.records_sum.div( self.var_weights_sel_sum_agg, axis="index" ) self.proportion_pop_mean_agg = 100 * self.records_sum.div( self.var_weights_pop_sum_agg, axis="index" )
[docs] def calculate_standard_errors(self): """Calculate standard errors""" logger.debug("Calculating the standard errors") if self.variance_df_selection is None: # for the first round (on the smallest strata, calculate the standard deviation based # on the microdata sum_i (w_i * (x_i - x_mean)**2) # where w_i are the normalized weight for which by definition: sum_i w_i = 1 try: mean_proportion = self.proportion_pop_grp.transform("sum") except AttributeError as err: logger.warning(err) return proportion_minus_mean = self.proportion_pop_df - mean_proportion proportion_squared = np.square(proportion_minus_mean) proportion_squared_sel = proportion_squared.reindex( self.records_df_selection.index ) records_var = proportion_squared_sel.mul( self.weights_pop_normalized_df, axis="index" ) elif self.weights_sel_normalized_df is not None: # for the compound breakdowns, us the variances from the first round and multiply # with w_i**2 number_of_nans = self.weights_sel_normalized_df.isna().sum() if number_of_nans > 0: logger.info(f"Weights contain {number_of_nans} nans. Filling with 0") self.weights_sel_normalized_df = self.weights_sel_normalized_df.fillna( 0 ) weights_sel_normalized_df_squared = np.square( self.weights_sel_normalized_df ) records_square = self.variance_df_selection.astype(float) try: records_var = records_square.mul( weights_sel_normalized_df_squared, axis="index" ) except TypeError as err: logger.warning(err) return else: logger.info("We have variances but no weights. Use weight factor 1") records_var = self.variance_df_selection.astype(float) records_var_grp = records_var.groupby(self.group_keys) self.records_var_df = records_var_grp.transform("sum") self.records_var_agg = records_var_grp.sum() self.records_std_df = self.records_var_df.pow(0.5) self.records_std_agg = self.records_var_agg.pow(0.5) if self.variance_df_selection is None: # for the first round when we calculated the standard dev with the sum (x_i - xm)**2 # you have to divide by sqrt(n) and multiply with the fpc self.n_sample = self.records_sel_grp.count() self.number_samples_sqrt = np.sqrt(self.n_sample) ratio = self.n_sample.div(self.weights_sel_sum_agg, axis="index") ratio[ratio > 1] = 1 fpc = np.sqrt(1 - ratio) try: self.standard_error = self.records_std_agg.div( self.number_samples_sqrt, axis="index" ) except ZeroDivisionError as err: logger.warning(f"{err}") return self.standard_error = self.standard_error.mul(fpc, axis="index") else: # We got the standard error from the compound standard deviations. # No need to divide by sqrt(n), as we used the w_i**2 terms already self.standard_error = self.records_std_agg