Source code for causalkit.eda.eda

"""EDA utilities for causal analysis (propensity, overlap, balance, weights).

This module provides a lightweight CausalEDA class to quickly assess whether a
binary treatment problem is suitable for causal effect estimation. The outputs
focus on interpretability: treatment predictability, overlap/positivity,
covariate balance before/after weighting, and basic data health.

What the main outputs mean

- outcome_stats(): DataFrame with comprehensive statistics (count, mean, std, 
  percentiles, min/max) for outcome grouped by treatment.
- fit_propensity(): Numpy array of cross-validated propensity scores P(T=1|X).
- confounders_roc_auc(): Float ROC AUC of treatment vs. propensity score.
  Higher AUC implies treatment is predictable from confounders (more confounding risk).
- positivity_check(): Dict with bounds, share_below, share_above, and flag.
  It reports what share of units have PS outside [low, high]; a large share
  signals poor overlap (violated positivity).
- plot_ps_overlap(): Overlaid histograms of PS for treated vs control.
- confounders_means(): DataFrame with comprehensive balance assessment including
  means by treatment group, absolute differences, and standardized mean differences (SMD).

Note: The class accepts either the project’s CausalData object (duck-typed) or a
CausalDataLite with explicit fields.
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Tuple

import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, cross_val_predict, KFold
from sklearn.metrics import roc_auc_score, mean_squared_error, mean_absolute_error
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from catboost import CatBoostClassifier, CatBoostRegressor
import matplotlib.pyplot as plt


[docs] class PropensityModel: """A model for propensity scores and related diagnostics. This class encapsulates propensity scores and provides methods for: - Computing ROC AUC - Extracting SHAP values - Plotting propensity score overlap - Checking positivity/overlap The class is returned by CausalEDA.fit_propensity() and provides a cleaner interface for propensity score analysis. """
[docs] def __init__(self, propensity_scores: np.ndarray, treatment_values: np.ndarray, fitted_model: Any, feature_names: List[str], X_for_shap: Optional[np.ndarray] = None, cat_features_for_shap: Optional[List[int]] = None): """Initialize PropensityModel with fitted model artifacts. Parameters ---------- propensity_scores : np.ndarray Array of propensity scores P(T=1|X) treatment_values : np.ndarray Array of actual treatment assignments (0/1) fitted_model : Any The fitted propensity score model feature_names : List[str] Names of features used in the model X_for_shap : Optional[np.ndarray] Preprocessed feature matrix for SHAP computation cat_features_for_shap : Optional[List[int]] Categorical feature indices for SHAP computation """ self.propensity_scores = propensity_scores self.treatment_values = treatment_values self.fitted_model = fitted_model self.feature_names = feature_names self.X_for_shap = X_for_shap self.cat_features_for_shap = cat_features_for_shap
@property def roc_auc(self) -> float: """Compute ROC AUC of treatment assignment vs. propensity scores. Higher AUC means treatment is more predictable from confounders, indicating stronger systematic differences between groups (potential confounding). Values near 0.5 suggest random-like assignment. Returns ------- float ROC AUC score between 0 and 1 """ return float(roc_auc_score(self.treatment_values, self.propensity_scores)) @property def shap(self) -> pd.DataFrame: """Return SHAP values from the fitted propensity score model. SHAP values show the directional contribution of each feature to treatment assignment prediction, where positive values increase treatment probability and negative values decrease it. Returns ------- pd.DataFrame For CatBoost models: DataFrame with columns 'feature' and 'shap_mean', where 'shap_mean' represents the mean SHAP value across all samples. For sklearn models: DataFrame with columns 'feature' and 'importance' (absolute coefficient values, for backward compatibility). Raises ------ RuntimeError If the fitted model does not support SHAP values extraction. """ # Extract SHAP values or feature importance based on model type if isinstance(self.fitted_model, CatBoostClassifier): # Use CatBoost's SHAP values for directional feature contributions try: # Import Pool for SHAP computation from catboost import Pool # Check if we have the required data for SHAP computation if self.X_for_shap is None: raise RuntimeError("Preprocessed data for SHAP computation not available.") # Create Pool object for SHAP computation shap_pool = Pool(data=self.X_for_shap, cat_features=self.cat_features_for_shap) # Get SHAP values - returns array of shape (n_samples, n_features + 1) # where the last column is the bias term shap_values = self.fitted_model.get_feature_importance(type='ShapValues', data=shap_pool) # Remove bias term (last column) and compute mean SHAP values across samples # This gives us the average directional contribution of each feature shap_values_no_bias = shap_values[:, :-1] # Remove bias column importance_values = np.mean(shap_values_no_bias, axis=0) # Mean across samples feature_names = self.feature_names column_name = 'shap_mean' # Use different column name to indicate SHAP values except Exception as e: raise RuntimeError(f"Failed to extract SHAP values from CatBoost model: {e}") elif hasattr(self.fitted_model, 'named_steps') and hasattr(self.fitted_model.named_steps.get('clf'), 'coef_'): # Handle sklearn pipeline with logistic regression try: clf = self.fitted_model.named_steps['clf'] # For logistic regression, use absolute coefficients as importance importance_values = np.abs(clf.coef_[0]) # Need to map back to original feature names through preprocessing # For simplicity, if we have preprocessed features, we'll use the original feature names # and aggregate importance for one-hot encoded categorical features prep = self.fitted_model.named_steps.get('prep') if hasattr(prep, 'get_feature_names_out'): try: # Try to get feature names from preprocessor transformed_names = prep.get_feature_names_out() feature_names = [str(name) for name in transformed_names] except: # Fallback to original feature names if transformation fails feature_names = self.feature_names if len(importance_values) != len(feature_names): # If lengths don't match due to one-hot encoding, we can't map back easily # Just use indices as feature names feature_names = [f"feature_{i}" for i in range(len(importance_values))] else: feature_names = self.feature_names column_name = 'importance' # Keep backward compatibility for sklearn models except Exception as e: raise RuntimeError(f"Failed to extract feature importance from sklearn model: {e}") else: raise RuntimeError(f"Feature importance extraction not supported for model type: {type(self.fitted_model)}") # Ensure we have matching lengths if len(importance_values) != len(feature_names): # This can happen with preprocessing transformations # Create generic feature names if needed if len(importance_values) > len(feature_names): feature_names = feature_names + [f"transformed_feature_{i}" for i in range(len(feature_names), len(importance_values))] else: feature_names = feature_names[:len(importance_values)] # Create DataFrame with appropriate column name result_df = pd.DataFrame({ 'feature': feature_names, column_name: importance_values }) # Sort appropriately: by absolute value for SHAP values (to show most impactful features), # by value for regular importance (higher is better) if column_name == 'shap_mean': # For SHAP values, sort by absolute value to show most impactful features first result_df = result_df.reindex(result_df[column_name].abs().sort_values(ascending=False).index) else: # For regular importance, sort by value (descending) result_df = result_df.sort_values(column_name, ascending=False) result_df = result_df.reset_index(drop=True) return result_df
[docs] def ps_graph(self): """Plot overlaid histograms of propensity scores for treated vs control. Useful to visually assess group overlap. Does not return data; it draws on the current matplotlib figure. """ plt.figure() t = self.treatment_values ps = self.propensity_scores plt.hist(ps[t == 1], bins=30, alpha=0.5, density=True, label="treated") plt.hist(ps[t == 0], bins=30, alpha=0.5, density=True, label="control") plt.xlabel("Propensity score") plt.ylabel("Density") plt.legend() plt.title("PS overlap")
[docs] def positivity_check(self, bounds: Tuple[float, float] = (0.05, 0.95)) -> Dict[str, Any]: """Check overlap/positivity based on propensity score thresholds. Parameters ---------- bounds : Tuple[float, float], default (0.05, 0.95) Lower and upper thresholds for positivity check Returns ------- Dict[str, Any] Dictionary with: - bounds: (low, high) thresholds used - share_below: fraction with PS < low - share_above: fraction with PS > high - flag: heuristic boolean True if the tails collectively exceed ~2% """ low, high = bounds ps = self.propensity_scores share_low = float((ps < low).mean()) share_high = float((ps > high).mean()) flag = (share_low + share_high) > 0.02 # heuristic return {"bounds": bounds, "share_below": share_low, "share_above": share_high, "flag": bool(flag)}
[docs] class OutcomeModel: """A model for outcome prediction and related diagnostics. This class encapsulates outcome predictions and provides methods for: - Computing RMSE and MAE regression metrics - Extracting SHAP values for outcome prediction The class is returned by CausalEDA.outcome_fit() and provides a cleaner interface for outcome model analysis. """
[docs] def __init__(self, predicted_outcomes: np.ndarray, actual_outcomes: np.ndarray, fitted_model: Any, feature_names: List[str], X_for_shap: Optional[np.ndarray] = None, cat_features_for_shap: Optional[List[int]] = None): """Initialize OutcomeModel with fitted model artifacts. Parameters ---------- predicted_outcomes : np.ndarray Array of predicted outcome values actual_outcomes : np.ndarray Array of actual outcome values fitted_model : Any The fitted outcome prediction model feature_names : List[str] Names of features used in the model (confounders only) X_for_shap : Optional[np.ndarray] Preprocessed feature matrix for SHAP computation cat_features_for_shap : Optional[List[int]] Categorical feature indices for SHAP computation """ self.predicted_outcomes = predicted_outcomes self.actual_outcomes = actual_outcomes self.fitted_model = fitted_model self.feature_names = feature_names self.X_for_shap = X_for_shap self.cat_features_for_shap = cat_features_for_shap
@property def scores(self) -> Dict[str, float]: """Compute regression metrics (RMSE and MAE) for outcome predictions. Returns ------- Dict[str, float] Dictionary containing: - 'rmse': Root Mean Squared Error - 'mae': Mean Absolute Error """ mse = mean_squared_error(self.actual_outcomes, self.predicted_outcomes) rmse = np.sqrt(mse) mae = mean_absolute_error(self.actual_outcomes, self.predicted_outcomes) return { 'rmse': float(rmse), 'mae': float(mae) } @property def shap(self) -> pd.DataFrame: """Return SHAP values from the fitted outcome prediction model. SHAP values show the directional contribution of each feature to outcome prediction, where positive values increase the predicted outcome and negative values decrease it. Returns ------- pd.DataFrame For CatBoost models: DataFrame with columns 'feature' and 'shap_mean', where 'shap_mean' represents the mean SHAP value across all samples. For sklearn models: DataFrame with columns 'feature' and 'importance' (absolute coefficient values, for backward compatibility). Raises ------ RuntimeError If the fitted model does not support SHAP values extraction. """ # Extract SHAP values or feature importance based on model type if isinstance(self.fitted_model, CatBoostRegressor): # Use CatBoost's SHAP values for directional feature contributions try: # Import Pool for SHAP computation from catboost import Pool # Check if we have the required data for SHAP computation if self.X_for_shap is None: raise RuntimeError("Preprocessed data for SHAP computation not available.") # Create Pool object for SHAP computation shap_pool = Pool(data=self.X_for_shap, cat_features=self.cat_features_for_shap) # Get SHAP values - returns array of shape (n_samples, n_features + 1) # where the last column is the bias term shap_values = self.fitted_model.get_feature_importance(type='ShapValues', data=shap_pool) # Remove bias term (last column) and compute mean SHAP values across samples # This gives us the average directional contribution of each feature shap_values_no_bias = shap_values[:, :-1] # Remove bias column importance_values = np.mean(shap_values_no_bias, axis=0) # Mean across samples feature_names = self.feature_names column_name = 'shap_mean' # Use different column name to indicate SHAP values except Exception as e: raise RuntimeError(f"Failed to extract SHAP values from CatBoost model: {e}") elif hasattr(self.fitted_model, 'named_steps') and hasattr(self.fitted_model.named_steps.get('clf'), 'coef_'): # Handle sklearn pipeline with linear regression try: clf = self.fitted_model.named_steps['clf'] # For linear regression, use absolute coefficients as importance importance_values = np.abs(clf.coef_) # Need to map back to original feature names through preprocessing # For simplicity, if we have preprocessed features, we'll use the original feature names # and aggregate importance for one-hot encoded categorical features prep = self.fitted_model.named_steps.get('prep') if hasattr(prep, 'get_feature_names_out'): try: # Try to get feature names from preprocessor transformed_names = prep.get_feature_names_out() feature_names = [str(name) for name in transformed_names] except: # Fallback to original feature names if transformation fails feature_names = self.feature_names if len(importance_values) != len(feature_names): # If lengths don't match due to one-hot encoding, we can't map back easily # Just use indices as feature names feature_names = [f"feature_{i}" for i in range(len(importance_values))] else: feature_names = self.feature_names column_name = 'importance' # Keep backward compatibility for sklearn models except Exception as e: raise RuntimeError(f"Failed to extract feature importance from sklearn model: {e}") else: raise RuntimeError(f"Feature importance extraction not supported for model type: {type(self.fitted_model)}") # Ensure we have matching lengths if len(importance_values) != len(feature_names): # This can happen with preprocessing transformations # Create generic feature names if needed if len(importance_values) > len(feature_names): feature_names = feature_names + [f"transformed_feature_{i}" for i in range(len(feature_names), len(importance_values))] else: feature_names = feature_names[:len(importance_values)] # Create DataFrame with appropriate column name result_df = pd.DataFrame({ 'feature': feature_names, column_name: importance_values }) # Sort appropriately: by absolute value for SHAP values (to show most impactful features), # by value for regular importance (higher is better) if column_name == 'shap_mean': # For SHAP values, sort by absolute value to show most impactful features first result_df = result_df.reindex(result_df[column_name].abs().sort_values(ascending=False).index) else: # For regular importance, sort by value (descending) result_df = result_df.sort_values(column_name, ascending=False) result_df = result_df.reset_index(drop=True) return result_df
# Optional lightweight dataclass for standalone usage, but CausalEDA also # supports the existing causalkit.data.CausalData which uses `confounders`.
[docs] @dataclass class CausalDataLite: """A minimal container for dataset roles used by CausalEDA. Attributes - df: The full pandas DataFrame containing treatment, outcome and covariates. - treatment: Column name of the binary treatment indicator (0/1). - target: Column name of the outcome variable. - confounders: List of covariate column names used to model treatment. """ df: pd.DataFrame treatment: str target: str confounders: List[str]
def _extract_roles(data_obj: Any) -> Dict[str, Any]: """Extract dataset roles from various supported data containers. Accepts: - CausalDataLite - Project's CausalData (duck-typed: attributes df, treatment, outcome, and either confounders or confounders) - Any object exposing the same attributes/properties Returns a dict with keys: df, treatment, outcome, confounders. If both confounders/confounders are absent, it assumes all columns except treatment/outcome are confounders. """ # Direct dataclass-like attributes df = getattr(data_obj, "df") treatment_attr = getattr(data_obj, "treatment") # Support both 'outcome' (project's CausalData) and 'target' (CausalDataLite) target_attr = getattr(data_obj, "outcome", None) if target_attr is None: target_attr = getattr(data_obj, "target") # If these are Series (as in causalkit.data.CausalData properties), convert to column names if isinstance(treatment_attr, pd.Series): treatment = treatment_attr.name else: treatment = treatment_attr if isinstance(target_attr, pd.Series): target = target_attr.name else: target = target_attr if hasattr(data_obj, "confounders") and getattr(data_obj, "confounders") is not None: confs = list(getattr(data_obj, "confounders")) elif hasattr(data_obj, "confounders") and getattr(data_obj, "confounders") is not None: # causalkit.data.CausalData.confounders returns a DataFrame or None; if it's a # DataFrame, use its columns; if it's a list/iterable, cast to list. cofs = getattr(data_obj, "confounders") if isinstance(cofs, pd.DataFrame): confs = list(cofs.columns) else: confs = list(cofs) if cofs is not None else [] else: # Last resort: assume all columns except treatment/outcome are confounders confs = [c for c in df.columns if c not in {treatment, target}] return {"df": df, "treatment": treatment, "outcome": target, "confounders": confs}
[docs] class CausalEDA: """Exploratory diagnostics for causal designs with binary treatment. The class exposes methods to: - Summarize outcome by treatment and naive mean difference. - Estimate cross-validated propensity scores and assess treatment predictability (AUC) and positivity/overlap. - Inspect covariate balance via standardized mean differences (SMD) before/after IPTW weighting; visualize with a love plot. - Inspect weight distributions and effective sample size (ESS). """
[docs] def __init__(self, data: Any, ps_model: Optional[Any] = None, n_splits: int = 5, random_state: int = 42): roles = _extract_roles(data) self.d = CausalDataLite(df=roles["df"], treatment=roles["treatment"], target=roles["outcome"], confounders=roles["confounders"]) self.n_splits = n_splits self.random_state = random_state self.ps_model = ps_model or CatBoostClassifier( thread_count=-1, # Use all available threads random_seed=random_state, verbose=False # Suppress training output ) # Preprocessing for CatBoost - identify categorical features for native handling X = self.d.df[self.d.confounders] num = X.select_dtypes(include=[np.number]).columns.tolist() cat = [c for c in X.columns if c not in num] self.cat_features = [X.columns.get_loc(c) for c in cat] if cat else None # For CatBoost, we can use minimal preprocessing since it handles categoricals natively if isinstance(self.ps_model, CatBoostClassifier): # Only scale numeric features, keep categoricals as-is for CatBoost if num: num_transformer = Pipeline(steps=[ ("scaler", StandardScaler(with_mean=True, with_std=True)) ]) self.preproc = ColumnTransformer( transformers=[ ("num", num_transformer, num), ("cat", "passthrough", cat), ], remainder="drop", ) else: # All categorical, no preprocessing needed self.preproc = "passthrough" else: # Fallback preprocessing for other models (like LogisticRegression) num_transformer = Pipeline(steps=[ ("scaler", StandardScaler(with_mean=True, with_std=True)) ]) self.preproc = ColumnTransformer( transformers=[ ("num", num_transformer, num), ("cat", OneHotEncoder(handle_unknown="ignore", drop=None, sparse_output=False), cat), ], remainder="drop", ) self.ps_pipe = Pipeline([("prep", self.preproc), ("clf", self.ps_model)])
# ---------- basics ----------
[docs] def data_shape(self) -> Dict[str, int]: """Return the shape information of the causal dataset. Returns a dict with: - n_rows: number of rows (observations) in the dataset - n_columns: number of columns (features) in the dataset This provides a quick overview of the dataset dimensions for exploratory analysis and reporting purposes. Returns ------- Dict[str, int] Dictionary containing 'n_rows' and 'n_columns' keys with corresponding integer values representing the dataset dimensions. Examples -------- >>> eda = CausalEDA(causal_data) >>> shape_info = eda.data_shape() >>> print(f"Dataset has {shape_info['n_rows']} rows and {shape_info['n_columns']} columns") """ df = self.d.df n_rows, n_columns = df.shape return {"n_rows": n_rows, "n_columns": n_columns}
[docs] def outcome_stats(self) -> pd.DataFrame: """Comprehensive outcome statistics grouped by treatment. Returns a DataFrame with detailed outcome statistics for each treatment group, including count, mean, std, min, various percentiles, and max. This method provides comprehensive outcome analysis and returns data in a clean DataFrame format suitable for reporting. Returns ------- pd.DataFrame DataFrame with treatment groups as index and the following columns: - count: number of observations in each group - mean: average outcome value - std: standard deviation of outcome - min: minimum outcome value - p10: 10th percentile - p25: 25th percentile (Q1) - median: 50th percentile (median) - p75: 75th percentile (Q3) - p90: 90th percentile - max: maximum outcome value Examples -------- >>> eda = CausalEDA(causal_data) >>> stats = eda.outcome_stats() >>> print(stats) count mean std min p10 p25 median p75 p90 max treatment 0 3000 5.123456 2.345678 0.123456 2.345678 3.456789 5.123456 6.789012 7.890123 9.876543 1 2000 6.789012 2.456789 0.234567 3.456789 4.567890 6.789012 8.901234 9.012345 10.987654 """ df, t, y = self.d.df, self.d.treatment, self.d.target # Ensure treatment is numeric for grouping if not pd.api.types.is_numeric_dtype(df[t]): raise ValueError("Treatment must be numeric 0/1 for outcome_stats().") # Create grouped object for multiple operations grouped = df.groupby(t)[y] # Calculate basic statistics using built-in methods basic_stats = grouped.agg(['count', 'mean', 'std', 'min', 'median', 'max']) # Calculate percentiles separately to avoid pandas aggregation mixing issues p10 = grouped.quantile(0.10) p25 = grouped.quantile(0.25) p75 = grouped.quantile(0.75) p90 = grouped.quantile(0.90) # Combine all statistics into a single DataFrame stats_df = pd.DataFrame({ 'count': basic_stats['count'], 'mean': basic_stats['mean'], 'std': basic_stats['std'], 'min': basic_stats['min'], 'p10': p10, 'p25': p25, 'median': basic_stats['median'], 'p75': p75, 'p90': p90, 'max': basic_stats['max'] }) # Ensure the index is named appropriately stats_df.index.name = 'treatment' return stats_df
# ---------- propensity & overlap ----------
[docs] def fit_propensity(self) -> 'PropensityModel': """Estimate cross-validated propensity scores P(T=1|X). Uses a preprocessing+CatBoost classifier pipeline with stratified K-fold cross_val_predict to generate out-of-fold probabilities. CatBoost uses all available threads and handles categorical features natively. Returns a PropensityModel instance containing propensity scores and diagnostic methods. Returns ------- PropensityModel A PropensityModel instance with methods for: - roc_auc: ROC AUC score property - shap: SHAP values DataFrame property - ps_graph(): method to plot propensity score overlap - positivity_check(): method to check positivity/overlap """ df = self.d.df X = df[self.d.confounders] t = df[self.d.treatment].astype(int).values cv = StratifiedKFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state) # Special handling for CatBoost to properly pass categorical features if isinstance(self.ps_model, CatBoostClassifier): # For CatBoost, we need custom cross-validation to pass cat_features properly import warnings ps = np.zeros(len(X)) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) for train_idx, test_idx in cv.split(X, t): # Prepare data for this fold X_train, X_test = X.iloc[train_idx], X.iloc[test_idx] t_train = t[train_idx] # Apply preprocessing X_train_prep = self.preproc.fit_transform(X_train) X_test_prep = self.preproc.transform(X_test) # Create and train CatBoost model for this fold model = CatBoostClassifier( thread_count=-1, random_seed=self.random_state, verbose=False ) # Identify categorical features after preprocessing if self.cat_features is not None: # Map original categorical feature indices to preprocessed data num_features = X.select_dtypes(include=[np.number]).shape[1] cat_features_prep = list(range(num_features, X_train_prep.shape[1])) else: cat_features_prep = None model.fit(X_train_prep, t_train, cat_features=cat_features_prep) ps[test_idx] = model.predict_proba(X_test_prep)[:, 1] else: # Use standard sklearn pipeline for non-CatBoost models import warnings with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) ps = cross_val_predict(self.ps_pipe, X, t, cv=cv, method="predict_proba")[:, 1] # clip away from 0/1 for stability ps = np.clip(ps, 1e-6, 1 - 1e-6) self._ps = ps # Train a final model on the full dataset for feature importance # This provides consistent feature importance across the entire dataset if isinstance(self.ps_model, CatBoostClassifier): # Apply preprocessing to full dataset X_full_prep = self.preproc.fit_transform(X) # Create and train final model final_model = CatBoostClassifier( thread_count=-1, random_seed=self.random_state, verbose=False ) # Identify categorical features after preprocessing if self.cat_features is not None: # Map original categorical feature indices to preprocessed data num_features = X.select_dtypes(include=[np.number]).shape[1] cat_features_prep = list(range(num_features, X_full_prep.shape[1])) else: cat_features_prep = None with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) final_model.fit(X_full_prep, t, cat_features=cat_features_prep) # Store the trained model and data needed for SHAP computation self._fitted_model = final_model self._feature_names = X.columns.tolist() self._X_for_shap = X_full_prep # Store preprocessed data for SHAP self._cat_features_for_shap = cat_features_prep # Store categorical features info else: # For non-CatBoost models, fit the pipeline on full data with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) self.ps_pipe.fit(X, t) self._fitted_model = self.ps_pipe self._feature_names = X.columns.tolist() # Create and return PropensityModel instance return PropensityModel( propensity_scores=ps, treatment_values=t, fitted_model=self._fitted_model, feature_names=self._feature_names, X_for_shap=getattr(self, '_X_for_shap', None), cat_features_for_shap=getattr(self, '_cat_features_for_shap', None) )
[docs] def outcome_fit(self, outcome_model: Optional[Any] = None) -> 'OutcomeModel': """Fit a regression model to predict outcome from confounders only. Uses a preprocessing+CatBoost regressor pipeline with K-fold cross_val_predict to generate out-of-fold predictions. CatBoost uses all available threads and handles categorical features natively. Returns an OutcomeModel instance containing predicted outcomes and diagnostic methods. The outcome model predicts the baseline outcome from confounders only, excluding treatment. This is essential for proper causal analysis. Parameters ---------- outcome_model : Optional[Any] Custom regression model to use. If None, uses CatBoostRegressor. Returns ------- OutcomeModel An OutcomeModel instance with methods for: - scores: RMSE and MAE regression metrics - shap: SHAP values DataFrame property for outcome prediction """ df = self.d.df # Features: confounders only (treatment excluded for proper causal analysis) X = df[self.d.confounders] y = df[self.d.target].values cv = KFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state) # Default to CatBoostRegressor if no custom model provided if outcome_model is None: outcome_model = CatBoostRegressor( thread_count=-1, random_seed=self.random_state, verbose=False ) # Identify categorical features for native CatBoost handling num_features = X.select_dtypes(include=[np.number]).columns.tolist() cat_features = [c for c in X.columns if c not in num_features] cat_features_indices = [X.columns.get_loc(c) for c in cat_features] if cat_features else None # Setup preprocessing similar to propensity model if isinstance(outcome_model, CatBoostRegressor): # Only scale numeric features, keep categoricals as-is for CatBoost if num_features: num_transformer = Pipeline(steps=[ ("scaler", StandardScaler(with_mean=True, with_std=True)) ]) preproc = ColumnTransformer( transformers=[ ("num", num_transformer, num_features), ("cat", "passthrough", cat_features), ], remainder="drop", ) else: # All categorical, no preprocessing needed preproc = "passthrough" else: # Fallback preprocessing for other models num_transformer = Pipeline(steps=[ ("scaler", StandardScaler(with_mean=True, with_std=True)) ]) preproc = ColumnTransformer( transformers=[ ("num", num_transformer, num_features), ("cat", OneHotEncoder(handle_unknown="ignore", drop=None, sparse_output=False), cat_features), ], remainder="drop", ) # Special handling for CatBoost to properly pass categorical features if isinstance(outcome_model, CatBoostRegressor): import warnings predictions = np.zeros(len(X)) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) for train_idx, test_idx in cv.split(X, y): # Prepare data for this fold X_train, X_test = X.iloc[train_idx], X.iloc[test_idx] y_train = y[train_idx] # Apply preprocessing X_train_prep = preproc.fit_transform(X_train) X_test_prep = preproc.transform(X_test) # Create and train CatBoost model for this fold model = CatBoostRegressor( thread_count=-1, random_seed=self.random_state, verbose=False ) # Identify categorical features after preprocessing if cat_features_indices is not None: # Map original categorical feature indices to preprocessed data num_features_count = len(num_features) cat_features_prep = list(range(num_features_count, X_train_prep.shape[1])) else: cat_features_prep = None model.fit(X_train_prep, y_train, cat_features=cat_features_prep) predictions[test_idx] = model.predict(X_test_prep) else: # Use standard sklearn pipeline for non-CatBoost models pipeline = Pipeline([("prep", preproc), ("reg", outcome_model)]) import warnings with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) predictions = cross_val_predict(pipeline, X, y, cv=cv) self._outcome_predictions = predictions # Train a final model on the full dataset for SHAP computation if isinstance(outcome_model, CatBoostRegressor): # Apply preprocessing to full dataset X_full_prep = preproc.fit_transform(X) # Create and train final model final_model = CatBoostRegressor( thread_count=-1, random_seed=self.random_state, verbose=False ) # Identify categorical features after preprocessing if cat_features_indices is not None: # Map original categorical feature indices to preprocessed data num_features_count = len(num_features) cat_features_prep = list(range(num_features_count, X_full_prep.shape[1])) else: cat_features_prep = None with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) final_model.fit(X_full_prep, y, cat_features=cat_features_prep) # Store the trained model and data needed for SHAP computation self._outcome_fitted_model = final_model self._outcome_feature_names = X.columns.tolist() self._outcome_X_for_shap = X_full_prep # Store preprocessed data for SHAP self._outcome_cat_features_for_shap = cat_features_prep # Store categorical features info else: # For non-CatBoost models, fit the pipeline on full data pipeline = Pipeline([("prep", preproc), ("reg", outcome_model)]) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) pipeline.fit(X, y) self._outcome_fitted_model = pipeline self._outcome_feature_names = X.columns.tolist() # Create and return OutcomeModel instance return OutcomeModel( predicted_outcomes=predictions, actual_outcomes=y, fitted_model=self._outcome_fitted_model, feature_names=self._outcome_feature_names, X_for_shap=getattr(self, '_outcome_X_for_shap', None), cat_features_for_shap=getattr(self, '_outcome_cat_features_for_shap', None) )
[docs] def confounders_roc_auc(self, ps: Optional[np.ndarray] = None) -> float: """Compute ROC AUC of treatment assignment vs. estimated propensity score. Interpretation: Higher AUC means treatment is more predictable from confounders, indicating stronger systematic differences between groups (potential confounding). Values near 0.5 suggest random-like assignment. """ if ps is None: ps = getattr(self, "_ps", None) if ps is None: ps_model = self.fit_propensity() ps = ps_model.propensity_scores # AUC against actual treatment t = self.d.df[self.d.treatment].astype(int).values return float(roc_auc_score(t, ps))
[docs] def positivity_check(self, ps: Optional[np.ndarray] = None, bounds: Tuple[float, float] = (0.05, 0.95)) -> Dict[str, Any]: """Check overlap/positivity based on propensity score thresholds. Returns a dict with: - bounds: (low, high) thresholds used - share_below: fraction with PS < low - share_above: fraction with PS > high - flag: heuristic boolean True if the tails collectively exceed ~2% """ if ps is None: ps = getattr(self, "_ps", None) if ps is None: ps_model = self.fit_propensity() ps = ps_model.propensity_scores low, high = bounds share_low = float((ps < low).mean()) share_high = float((ps > high).mean()) flag = (share_low + share_high) > 0.02 # heuristic return {"bounds": bounds, "share_below": share_low, "share_above": share_high, "flag": bool(flag)}
[docs] def plot_ps_overlap(self, ps: Optional[np.ndarray] = None): """Plot overlaid histograms of propensity scores for treated vs control. Useful to visually assess group overlap. Does not return data; it draws on the current matplotlib figure. """ if ps is None: ps = getattr(self, "_ps", None) if ps is None: ps_model = self.fit_propensity() ps = ps_model.propensity_scores df = self.d.df t = df[self.d.treatment].astype(int).values plt.figure() plt.hist(ps[t == 1], bins=30, alpha=0.5, density=True, label="treated") plt.hist(ps[t == 0], bins=30, alpha=0.5, density=True, label="control") plt.xlabel("Propensity score") plt.ylabel("Density") plt.legend() plt.title("PS overlap")
# ---------- balance ----------
[docs] def confounders_means(self) -> pd.DataFrame: """Comprehensive confounders balance assessment with means by treatment group. Returns a DataFrame with detailed balance information including: - Mean values of each confounder for control group (treatment=0) - Mean values of each confounder for treated group (treatment=1) - Absolute difference between treatment groups - Standardized Mean Difference (SMD) for formal balance assessment This method provides a comprehensive view of confounder balance by showing the actual mean values alongside the standardized differences, making it easier to understand both the magnitude and direction of imbalances. Returns ------- pd.DataFrame DataFrame with confounders as index and the following columns: - mean_t_0: mean value for control group (treatment=0) - mean_t_1: mean value for treated group (treatment=1) - abs_diff: absolute difference abs(mean_t_1 - mean_t_0) - smd: standardized mean difference (Cohen's d) Notes ----- SMD values > 0.1 in absolute value typically indicate meaningful imbalance. Categorical variables are automatically converted to dummy variables. Examples -------- >>> eda = CausalEDA(causal_data) >>> balance = eda.confounders_means() >>> print(balance.head()) mean_t_0 mean_t_1 abs_diff smd confounders age 29.5 31.2 1.7 0.085 income 45000.0 47500.0 2500.0 0.125 education 0.25 0.35 0.1 0.215 """ df = self.d.df X = df[self.d.confounders] t = df[self.d.treatment].astype(int).values # Convert categorical variables to dummy variables for analysis X_num = pd.get_dummies(X, drop_first=False) rows = [] for col in X_num.columns: x = X_num[col].values.astype(float) # Calculate means for each treatment group mean_t_0 = x[t == 0].mean() mean_t_1 = x[t == 1].mean() # Calculate absolute difference abs_diff = abs(mean_t_1 - mean_t_0) # Calculate standardized mean difference (SMD) v_control = x[t == 0].var(ddof=1) if len(x[t == 0]) > 1 else 0.0 v_treated = x[t == 1].var(ddof=1) if len(x[t == 1]) > 1 else 0.0 pooled_std = np.sqrt((v_control + v_treated) / 2) smd = (mean_t_1 - mean_t_0) / pooled_std if pooled_std > 0 else 0.0 rows.append({ "confounders": col, "mean_t_0": mean_t_0, "mean_t_1": mean_t_1, "abs_diff": abs_diff, "smd": smd }) # Create DataFrame and set confounders as index balance_df = pd.DataFrame(rows) balance_df = balance_df.set_index("confounders") # Sort by absolute SMD value (most imbalanced first) balance_df = balance_df.reindex( balance_df['smd'].abs().sort_values(ascending=False).index ) return balance_df
[docs] def outcome_plots(self, treatment: Optional[str] = None, target: Optional[str] = None, bins: int = 30, density: bool = True, alpha: float = 0.5, figsize: Tuple[float, float] = (7, 4), sharex: bool = True) -> Tuple[plt.Figure, plt.Figure]: """ Plot the distribution of the outcome for every treatment on one plot, and also produce a boxplot by treatment to visualize outliers. Parameters ---------- treatment : Optional[str] Treatment column name. Defaults to the treatment stored in the CausalEDA data. target : Optional[str] Target/outcome column name. Defaults to the outcome stored in the CausalEDA data. bins : int Number of bins for histograms when the outcome is numeric. density : bool Whether to normalize histograms to form a density. alpha : float Transparency for overlaid histograms. figsize : tuple Figure size for the plots. sharex : bool If True and the outcome is numeric, use the same x-limits across treatments. Returns ------- Tuple[matplotlib.figure.Figure, matplotlib.figure.Figure] (fig_distribution, fig_boxplot) """ df = self.d.df t_col = treatment or self.d.treatment y_col = target or self.d.target if t_col not in df.columns or y_col not in df.columns: raise ValueError("Specified treatment/outcome columns not found in DataFrame.") # Determine unique treatments preserving natural sort treatments = pd.unique(df[t_col]) # Distribution plot (overlayed) fig1 = plt.figure(figsize=figsize) ax1 = fig1.gca() # Only support numeric outcome for histogram/boxplot in this minimal implementation if not pd.api.types.is_numeric_dtype(df[y_col]): # Fallback: draw normalized bars per treatment for categorical outcome # Compute frequency of outcome values per treatment and stack them side-by-side vals = pd.unique(df[y_col]) vals_sorted = sorted(vals, key=lambda v: (str(type(v)), v)) width = 0.8 / max(1, len(treatments)) x = np.arange(len(vals_sorted)) for i, tr in enumerate(treatments): sub = df[df[t_col] == tr][y_col] counts = pd.Series(sub).value_counts(normalize=True) heights = [counts.get(v, 0.0) for v in vals_sorted] ax1.bar(x + i * width, heights, width=width, alpha=alpha, label=str(tr)) ax1.set_xticks(x + (len(treatments) - 1) * width / 2) ax1.set_xticklabels([str(v) for v in vals_sorted]) ax1.set_ylabel("Proportion") ax1.set_xlabel(str(y_col)) ax1.set_title("Target distribution by treatment (categorical)") ax1.legend(title=str(t_col)) else: # Numeric outcome: overlay histograms/density # Determine common x-limits if sharex xmin, xmax = None, None if sharex: xmin = float(df[y_col].min()) xmax = float(df[y_col].max()) for tr in treatments: y_vals = df.loc[df[t_col] == tr, y_col].dropna().values if len(y_vals) == 0: continue ax1.hist(y_vals, bins=bins, density=density, alpha=alpha, label=str(tr), range=(xmin, xmax) if sharex else None) ax1.set_xlabel(str(y_col)) ax1.set_ylabel("Density" if density else "Count") ax1.set_title("Target distribution by treatment") ax1.legend(title=str(t_col)) # Boxplot by treatment fig2 = plt.figure(figsize=figsize) ax2 = fig2.gca() # Create data in order of treatments data = [df.loc[df[t_col] == tr, y_col].dropna().values for tr in treatments] ax2.boxplot(data, labels=[str(tr) for tr in treatments], showfliers=True) ax2.set_xlabel(str(t_col)) ax2.set_ylabel(str(y_col)) ax2.set_title("Target by treatment (boxplot)") return fig1, fig2
[docs] def treatment_features(self) -> pd.DataFrame: """Return SHAP values from the fitted propensity score model. This method extracts SHAP values from the propensity score model that was trained during fit_propensity(). SHAP values show the directional contribution of each feature to treatment assignment prediction, where positive values increase treatment probability and negative values decrease it. Returns ------- pd.DataFrame For CatBoost models: DataFrame with columns 'feature' and 'shap_mean', where 'shap_mean' represents the mean SHAP value across all samples. Positive values indicate features that increase treatment probability, negative values indicate features that decrease treatment probability. For sklearn models: DataFrame with columns 'feature' and 'importance' (absolute coefficient values, for backward compatibility). Raises ------ RuntimeError If fit_propensity() has not been called yet, or if the fitted model does not support SHAP values extraction. Examples -------- >>> eda = CausalEDA(data) >>> ps = eda.fit_propensity() # Must be called first >>> shap_df = eda.treatment_features() >>> print(shap_df.head()) feature shap_mean 0 age 0.45 # Positive: increases treatment prob 1 income -0.32 # Negative: decreases treatment prob 2 education 0.12 # Positive: increases treatment prob """ # Check if model has been fitted if not hasattr(self, '_fitted_model') or self._fitted_model is None: raise RuntimeError("No fitted propensity model found. Please call fit_propensity() first.") if not hasattr(self, '_feature_names') or self._feature_names is None: raise RuntimeError("Feature names not available. Please call fit_propensity() first.") # Extract SHAP values or feature importance based on model type if isinstance(self._fitted_model, CatBoostClassifier): # Use CatBoost's SHAP values for directional feature contributions try: # Import Pool for SHAP computation from catboost import Pool # Check if we have the required data for SHAP computation if not hasattr(self, '_X_for_shap') or self._X_for_shap is None: raise RuntimeError("Preprocessed data for SHAP computation not available. Please call fit_propensity() first.") # Create Pool object for SHAP computation shap_pool = Pool(data=self._X_for_shap, cat_features=self._cat_features_for_shap) # Get SHAP values - returns array of shape (n_samples, n_features + 1) # where the last column is the bias term shap_values = self._fitted_model.get_feature_importance(type='ShapValues', data=shap_pool) # Remove bias term (last column) and compute mean SHAP values across samples # This gives us the average directional contribution of each feature shap_values_no_bias = shap_values[:, :-1] # Remove bias column importance_values = np.mean(shap_values_no_bias, axis=0) # Mean across samples feature_names = self._feature_names column_name = 'shap_mean' # Use different column name to indicate SHAP values except Exception as e: raise RuntimeError(f"Failed to extract SHAP values from CatBoost model: {e}") elif hasattr(self._fitted_model, 'named_steps') and hasattr(self._fitted_model.named_steps.get('clf'), 'coef_'): # Handle sklearn pipeline with logistic regression try: clf = self._fitted_model.named_steps['clf'] # For logistic regression, use absolute coefficients as importance importance_values = np.abs(clf.coef_[0]) # Need to map back to original feature names through preprocessing # For simplicity, if we have preprocessed features, we'll use the original feature names # and aggregate importance for one-hot encoded categorical features prep = self._fitted_model.named_steps.get('prep') if hasattr(prep, 'get_feature_names_out'): try: # Try to get feature names from preprocessor transformed_names = prep.get_feature_names_out() feature_names = [str(name) for name in transformed_names] except: # Fallback to original feature names if transformation fails feature_names = self._feature_names if len(importance_values) != len(feature_names): # If lengths don't match due to one-hot encoding, we can't map back easily # Just use indices as feature names feature_names = [f"feature_{i}" for i in range(len(importance_values))] else: feature_names = self._feature_names column_name = 'importance' # Keep backward compatibility for sklearn models except Exception as e: raise RuntimeError(f"Failed to extract feature importance from sklearn model: {e}") else: raise RuntimeError(f"Feature importance extraction not supported for model type: {type(self._fitted_model)}") # Ensure we have matching lengths if len(importance_values) != len(feature_names): # This can happen with preprocessing transformations # Create generic feature names if needed if len(importance_values) > len(feature_names): feature_names = feature_names + [f"transformed_feature_{i}" for i in range(len(feature_names), len(importance_values))] else: feature_names = feature_names[:len(importance_values)] # Create DataFrame with appropriate column name result_df = pd.DataFrame({ 'feature': feature_names, column_name: importance_values }) # Sort appropriately: by absolute value for SHAP values (to show most impactful features), # by value for regular importance (higher is better) if column_name == 'shap_mean': # For SHAP values, sort by absolute value to show most impactful features first result_df = result_df.reindex(result_df[column_name].abs().sort_values(ascending=False).index) else: # For regular importance, sort by value (descending) result_df = result_df.sort_values(column_name, ascending=False) result_df = result_df.reset_index(drop=True) return result_df