Source code for causalis.eda.eda

"""EDA utilities for causal analysis (propensity, overlap, balance, weights).

This module provides a lightweight CausalEDA class to quickly assess whether a
binary treatment problem is suitable for causal effect estimation. The outputs
focus on interpretability: treatment predictability, overlap/positivity,
covariate balance before/after weighting, and basic data health.

What the main outputs mean

- outcome_stats(): DataFrame with comprehensive statistics (count, mean, std, 
  percentiles, min/max) for outcome grouped by treatment.
- fit_propensity(): Numpy array of cross-validated propensity scores P(T=1|X).
- confounders_roc_auc(): Float ROC AUC of treatment vs. propensity score.
  Higher AUC implies treatment is predictable from confounders (more confounding risk).
- positivity_check(): Dict with bounds, share_below, share_above, and flag.
  It reports what share of units have PS outside [low, high]; a large share
  signals poor overlap (violated positivity).
- plot_ps_overlap(): Overlaid histograms of PS for treated vs control.
- confounders_means(): DataFrame with comprehensive balance assessment including
  means by treatment group, absolute differences, and standardized mean differences (SMD).

Note: The class accepts either the project’s CausalData object (duck-typed) or a
CausalDataLite with explicit fields.
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Tuple

import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, cross_val_predict, KFold
from sklearn.metrics import roc_auc_score, mean_squared_error, mean_absolute_error
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression, LinearRegression
from catboost import CatBoostClassifier, CatBoostRegressor
import matplotlib.pyplot as plt


import warnings
_CK_EDA_WARN_ONCE = set()

def _warn_once(name: str, to: str):
    key = (name, to)
    if key not in _CK_EDA_WARN_ONCE:
        warnings.warn(
            f"`{name}` is deprecated; use `{to}` instead. "
            "This alias will be removed in a future release.",
            DeprecationWarning,
            stacklevel=2,
        )
        _CK_EDA_WARN_ONCE.add(key)


[docs] class PropensityModel: """A model for m(x) = P(D=1|X) and related diagnostics. This class encapsulates propensity m and provides methods for: - Computing ROC AUC (AUC of D vs m) - Extracting SHAP values - Plotting m overlap - Checking positivity/overlap The class is returned by CausalEDA.fit_m() and provides a cleaner interface for propensity analysis. """
[docs] def __init__(self, m: Optional[np.ndarray] = None, d: Optional[np.ndarray] = None, fitted_model: Any = None, feature_names: List[str] = None, X_for_shap: Optional[np.ndarray] = None, cat_features_for_shap: Optional[List[int]] = None, propensity_scores: Optional[np.ndarray] = None, treatment_values: Optional[np.ndarray] = None): """Initialize PropensityModel with fitted model artifacts. Parameters ---------- m : np.ndarray Array of m(x) = P(D=1|X) d : np.ndarray Array of actual treatment assignments (0/1) fitted_model : Any The fitted propensity model feature_names : List[str] Names of features used in the model X_for_shap : Optional[np.ndarray] Preprocessed feature matrix for SHAP computation cat_features_for_shap : Optional[List[int]] Categorical feature indices for SHAP computation propensity_scores, treatment_values : Optional legacy aliases accepted for back-compat """ # Back-compat arg names if m is None and propensity_scores is not None: _warn_once("propensity_scores (init)", "m") m = propensity_scores if d is None and treatment_values is not None: d = treatment_values self.m = np.asarray(m) if m is not None else None self.d = np.asarray(d) if d is not None else None self.fitted_model = fitted_model self.feature_names = feature_names self.X_for_shap = X_for_shap self.cat_features_for_shap = cat_features_for_shap
[docs] @classmethod def from_kfold( cls, X: pd.DataFrame, t: np.ndarray, model: Optional[Any] = None, n_splits: int = 5, random_state: int = 42, preprocessor: Optional[ColumnTransformer] = None, ) -> "PropensityModel": """Estimate m(x) via K-fold and return a PropensityModel. Produces out-of-fold m estimates using StratifiedKFold. If no model is provided, a fast LogisticRegression is used. Categorical features are one-hot encoded and numeric features standardized by default using a ColumnTransformer. """ # Ensure inputs are aligned if isinstance(X, pd.DataFrame): feature_names = X.columns.tolist() else: raise ValueError("X must be a pandas DataFrame with named columns.") t = np.asarray(t).astype(int) # validate binary treatment if not np.isin(t, [0, 1]).all(): raise ValueError("Treatment must be binary {0,1}.") # Preprocessor default if preprocessor is None: num = X.select_dtypes(include=[np.number]).columns.tolist() cat = [c for c in X.columns if c not in num] num_transformer = Pipeline(steps=[("scaler", StandardScaler(with_mean=True, with_std=True))]) preprocessor = ColumnTransformer( transformers=[ ("num", num_transformer, num), ("cat", OneHotEncoder(handle_unknown="ignore", drop=None, sparse_output=False), cat), ], remainder="drop", ) # Model default if model is None: model = LogisticRegression(max_iter=1000) cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state) # Compute out-of-fold probabilities if isinstance(model, CatBoostClassifier): import warnings ps = np.zeros(len(X)) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) for tr_idx, te_idx in cv.split(X, t): X_tr, X_te = X.iloc[tr_idx], X.iloc[te_idx] t_tr = t[tr_idx] X_tr_p = preprocessor.fit_transform(X_tr) X_te_p = preprocessor.transform(X_te) fold_model = CatBoostClassifier(thread_count=-1, random_seed=random_state, verbose=False, allow_writing_files=False) fold_model.fit(X_tr_p, t_tr) ps[te_idx] = fold_model.predict_proba(X_te_p)[:, 1] # Fit final model for SHAP X_full_p = preprocessor.fit_transform(X) final_model = CatBoostClassifier(thread_count=-1, random_state=random_state, verbose=False, allow_writing_files=False) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): final_model.fit(X_full_p, t) X_for_shap = X_full_p fitted = final_model # use transformed feature names for SHAP alignment try: feature_names = preprocessor.get_feature_names_out().tolist() except Exception: feature_names = [f"feature_{i}" for i in range(X_full_p.shape[1])] else: pipe = Pipeline([("prep", preprocessor), ("clf", model)]) import warnings with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) ps = cross_val_predict(pipe, X, t, cv=cv, method="predict_proba")[:, 1] pipe.fit(X, t) fitted = pipe X_for_shap = None # clip for stability ps = np.clip(ps, 1e-6, 1 - 1e-6) return cls( propensity_scores=ps, treatment_values=t, fitted_model=fitted, feature_names=feature_names, X_for_shap=X_for_shap, )
@property def roc_auc(self) -> float: """Compute ROC AUC of treatment assignment vs. m(x). Higher AUC means treatment is more predictable from confounders, indicating stronger systematic differences between groups (potential confounding). Values near 0.5 suggest random-like assignment. Returns ------- float ROC AUC score between 0 and 1 """ return float(roc_auc_score(self.d, self.m)) @property def shap(self) -> pd.DataFrame: """Return feature attribution from the fitted propensity model. For CatBoost models: returns SHAP-based attributions with both signed mean (shap_mean) and magnitude (shap_mean_abs), sorted by shap_mean_abs. For sklearn linear models (LogisticRegression): returns absolute coefficients under column 'coef_abs' (not SHAP), sorted descending. """ # Extract SHAP values or feature importance based on model type if isinstance(self.fitted_model, CatBoostClassifier): try: from catboost import Pool if self.X_for_shap is None: raise RuntimeError("Preprocessed data for SHAP computation not available.") shap_pool = Pool(data=self.X_for_shap) shap_values = self.fitted_model.get_feature_importance(type='ShapValues', data=shap_pool) shap_values_no_bias = shap_values[:, :-1] shap_mean = np.mean(shap_values_no_bias, axis=0) shap_mean_abs = np.mean(np.abs(shap_values_no_bias), axis=0) feature_names = list(self.feature_names) if len(feature_names) != shap_values_no_bias.shape[1]: raise RuntimeError("Feature names length does not match SHAP values. Ensure transformed names are used.") result_df = pd.DataFrame({ 'feature': feature_names, 'shap_mean': shap_mean, 'shap_mean_abs': shap_mean_abs, }) # Sort by absolute shap_mean (test expects this order) result_df['abs_shap_mean'] = np.abs(result_df['shap_mean'].values) result_df = result_df.sort_values('abs_shap_mean', ascending=False).drop(columns=['abs_shap_mean']).reset_index(drop=True) # Augment with probability-scale metrics using baseline p0 = mean propensity score p0 = float(np.mean(self.propensity_scores)) p0 = float(np.clip(p0, 1e-9, 1 - 1e-9)) logit_p0 = float(np.log(p0 / (1.0 - p0))) # result_df['odds_mult_abs'] = np.exp(result_df['shap_mean_abs'].values) result_df['exact_pp_change_abs'] = 1.0 / (1.0 + np.exp(-(logit_p0 + result_df['shap_mean_abs'].values))) - p0 result_df['exact_pp_change_signed'] = 1.0 / (1.0 + np.exp(-(logit_p0 + result_df['shap_mean'].values))) - p0 return result_df except Exception as e: raise RuntimeError(f"Failed to extract SHAP values from CatBoost model: {e}") elif hasattr(self.fitted_model, 'named_steps') and hasattr(self.fitted_model.named_steps.get('clf'), 'coef_'): try: clf = self.fitted_model.named_steps['clf'] coef_abs = np.abs(clf.coef_[0]) prep = self.fitted_model.named_steps.get('prep') if hasattr(prep, 'get_feature_names_out'): try: feature_names = [str(n) for n in prep.get_feature_names_out()] except Exception: feature_names = [f"feature_{i}" for i in range(len(coef_abs))] else: feature_names = [f"feature_{i}" for i in range(len(coef_abs))] if len(coef_abs) != len(feature_names): raise RuntimeError("Feature names length does not match coefficients.") df = pd.DataFrame({'feature': feature_names, 'importance': coef_abs, 'coef_abs': coef_abs}).sort_values('importance', ascending=False).reset_index(drop=True) return df except Exception as e: raise RuntimeError(f"Failed to extract feature importance from sklearn model: {e}") else: raise RuntimeError(f"Feature importance extraction not supported for model type: {type(self.fitted_model)}") @property def propensity_scores(self): _warn_once("propensity_scores", "m") return self.m
[docs] def plot_m_overlap( self, clip=(0.01, 0.99), bins="fd", kde=True, shade_overlap=True, ax=None, figsize=(9, 5.5), dpi=220, font_scale=1.15, save=None, save_dpi=None, transparent=False, color_t=None, # None -> use Matplotlib defaults color_c=None, # None -> use Matplotlib defaults ): """ Overlap plot for m(x)=P(D=1|X) with high-res rendering. - x in [0,1] - Stable NumPy KDE w/ boundary reflection (no SciPy warnings) - Uses Matplotlib default colors unless color_t/color_c are provided """ import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt # ------- Helpers -------------------------------------------------------- def _silverman_bandwidth(x): x = np.asarray(x, float) n = x.size if n < 2: return 0.04 sd = np.std(x, ddof=1) iqr = np.subtract(*np.percentile(x, [75, 25])) s = sd if iqr <= 0 else min(sd, iqr / 1.34) h = 0.9 * s * n ** (-1 / 5) return float(max(h, 0.02)) def _kde_reflect(x, xs, h): x = np.asarray(x, float) if x.size == 0: return np.zeros_like(xs) if x.size < 2 or np.std(x) < 1e-8: mu = float(np.mean(x)) if x.size else 0.5 h0 = max(h, 0.02) z = (xs - mu) / h0 return np.exp(-0.5 * z ** 2) / (np.sqrt(2 * np.pi) * h0) xr = np.concatenate([x, -x, 2 - x]) # reflect at 0 and 1 diff = (xs[None, :] - xr[:, None]) / h kern = np.exp(-0.5 * diff ** 2) / (np.sqrt(2 * np.pi) * h) return kern.mean(axis=0) def _patch_color(patches, fallback): # Grab facecolor from the first bar; fallback to cycle color if needed for p in patches: fc = p.get_facecolor() if fc is not None: return fc # RGBA return fallback # ------- Data ----------------------------------------------------------- d = np.asarray(self.d).astype(int) m = np.asarray(self.m, dtype=float) mask = np.isfinite(m) & np.isfinite(d) d, m = d[mask], m[mask] mt = m[d == 1] mc = m[d == 0] if mt.size == 0 or mc.size == 0: raise ValueError("Both treated and control must have at least one observation after cleaning.") # Clamp to [0,1] to keep plot stable and inside bounds mtp = np.clip(mt, 0.0, 1.0) mcp = np.clip(mc, 0.0, 1.0) # ------- Figure/axes with high DPI & scaled fonts ---------------------- rc = { "font.size": 11 * font_scale, "axes.titlesize": 13 * font_scale, "axes.labelsize": 12 * font_scale, "legend.fontsize": 10 * font_scale, "xtick.labelsize": 10 * font_scale, "ytick.labelsize": 10 * font_scale, } with mpl.rc_context(rc): if ax is None: fig, ax = plt.subplots(figsize=figsize, dpi=dpi) else: fig = ax.figure try: fig.set_dpi(dpi) except Exception: pass # ------- Histograms (ALWAYS [0,1]) --------------------------------- ht = ax.hist(mtp, bins=bins, range=(0.0, 1.0), density=True, alpha=0.45, label=f"Treated (n={mt.size})", edgecolor="white", linewidth=0.6, color=color_t) # None -> default color hc = ax.hist(mcp, bins=bins, range=(0.0, 1.0), density=True, alpha=0.45, label=f"Control (n={mc.size})", edgecolor="white", linewidth=0.6, color=color_c) # None -> default color # Determine the actual colors used (so KDE/means match the bars) cycle = mpl.rcParams["axes.prop_cycle"].by_key().get("color", ["C0", "C1"]) used_t = color_t or _patch_color(ht[2], cycle[0]) used_c = color_c or _patch_color(hc[2], cycle[1]) # ------- KDE (stable NumPy implementation) ------------------------- if kde: if clip: lo, hi = np.quantile(np.clip(m, 0, 1), [clip[0], clip[1]]) lo, hi = float(max(0.0, lo)), float(min(1.0, hi)) if not (hi > lo): lo, hi = 0.0, 1.0 else: lo, hi = 0.0, 1.0 xs = np.linspace(0.0, 1.0, 800) h_t = _silverman_bandwidth(mtp) h_c = _silverman_bandwidth(mcp) yt = _kde_reflect(mtp, xs, h_t) yc = _kde_reflect(mcp, xs, h_c) ax.plot(xs, yt, linewidth=2.2, label="Treated (KDE)", color=used_t, antialiased=True) ax.plot(xs, yc, linewidth=2.2, linestyle="--", label="Control (KDE)", color=used_c, antialiased=True) if shade_overlap: y_min = np.minimum(yt, yc) ax.fill_between(xs, y_min, 0, alpha=0.12, color="grey", rasterized=False) # ------- Means ------------------------------------------------------ ax.axvline(float(mtp.mean()), linestyle=":", linewidth=1.8, color=used_t, alpha=0.95) ax.axvline(float(mcp.mean()), linestyle=":", linewidth=1.8, color=used_c, alpha=0.95) # ------- Cosmetics -------------------------------------------------- ax.set_xlabel(r"$m(x) = \mathbb{P}(D=1 \mid X)$") ax.set_ylabel("Density") ax.set_title("Propensity Overlap by Treatment Group") ax.set_xlim(0.0, 1.0) ax.grid(True, linewidth=0.5, alpha=0.45) for spine in ("top", "right"): ax.spines[spine].set_visible(False) ax.legend(frameon=False) fig.tight_layout() # ------- Optional save --------------------------------------------- if save is not None: ext = str(save).lower().split(".")[-1] _dpi = save_dpi or (300 if ext in {"png", "jpg", "jpeg", "tif", "tiff"} else dpi) fig.savefig( save, dpi=_dpi, bbox_inches="tight", pad_inches=0.1, transparent=transparent, facecolor="none" if transparent else "white" ) return fig
[docs] def positivity_check_m(self, bounds: Tuple[float, float] = (0.05, 0.95)) -> Dict[str, Any]: """Check overlap/positivity for m(x) based on thresholds.""" low, high = bounds m = self.m return { "bounds": bounds, "share_below": float((m < low).mean()), "share_above": float((m > high).mean()), "flag": bool(((m < low).mean() + (m > high).mean()) > 0.02), }
# Back-compat shims
[docs] def ps_graph(self, *args, **kwargs): _warn_once("ps_graph()", "plot_m_overlap()") return self.plot_m_overlap(*args, **kwargs)
[docs] def positivity_check(self, *args, **kwargs): _warn_once("positivity_check()", "positivity_check_m()") return self.positivity_check_m(*args, **kwargs)
[docs] class OutcomeModel: """A model for outcome prediction and related diagnostics. This class encapsulates outcome predictions and provides methods for: - Computing RMSE and MAE regression metrics - Extracting SHAP values for outcome prediction The class is returned by CausalEDA.outcome_fit() and provides a cleaner interface for outcome model analysis. """
[docs] def __init__(self, predicted_outcomes: np.ndarray, actual_outcomes: np.ndarray, fitted_model: Any, feature_names: List[str], X_for_shap: Optional[np.ndarray] = None, cat_features_for_shap: Optional[List[int]] = None): """Initialize OutcomeModel with fitted model artifacts. Parameters ---------- predicted_outcomes : np.ndarray Array of predicted outcome values actual_outcomes : np.ndarray Array of actual outcome values fitted_model : Any The fitted outcome prediction model feature_names : List[str] Names of features used in the model (confounders only) X_for_shap : Optional[np.ndarray] Preprocessed feature matrix for SHAP computation cat_features_for_shap : Optional[List[int]] Categorical feature indices for SHAP computation """ self.predicted_outcomes = predicted_outcomes self.actual_outcomes = actual_outcomes self.fitted_model = fitted_model self.feature_names = feature_names self.X_for_shap = X_for_shap self.cat_features_for_shap = cat_features_for_shap
[docs] @classmethod def from_kfold( cls, X: pd.DataFrame, y: np.ndarray, model: Optional[Any] = None, n_splits: int = 5, random_state: int = 42, preprocessor: Optional[ColumnTransformer] = None, ) -> "OutcomeModel": """Estimate an outcome model via K-fold and return an OutcomeModel. Produces out-of-fold predictions using KFold. If no model is provided, a fast LinearRegression is used. Categorical features are one-hot encoded and numeric features standardized by default. """ if not isinstance(X, pd.DataFrame): raise ValueError("X must be a pandas DataFrame with named columns.") feature_names = X.columns.tolist() y = np.asarray(y) # Preprocessor default if preprocessor is None: num = X.select_dtypes(include=[np.number]).columns.tolist() cat = [c for c in X.columns if c not in num] num_transformer = Pipeline(steps=[("scaler", StandardScaler(with_mean=True, with_std=True))]) preprocessor = ColumnTransformer( transformers=[ ("num", num_transformer, num), ("cat", OneHotEncoder(handle_unknown="ignore", drop=None, sparse_output=False), cat), ], remainder="drop", ) # Model default if model is None: model = LinearRegression() cv = KFold(n_splits=n_splits, shuffle=True, random_state=random_state) if isinstance(model, CatBoostRegressor): import warnings preds = np.zeros(len(X)) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) for tr_idx, te_idx in cv.split(X, y): X_tr, X_te = X.iloc[tr_idx], X.iloc[te_idx] y_tr = y[tr_idx] X_tr_p = preprocessor.fit_transform(X_tr) X_te_p = preprocessor.transform(X_te) fold_model = CatBoostRegressor(thread_count=-1, random_seed=random_state, verbose=False, allow_writing_files=False) fold_model.fit(X_tr_p, y_tr) preds[te_idx] = fold_model.predict(X_te_p) # Fit final model for SHAP X_full_p = preprocessor.fit_transform(X) final_model = CatBoostRegressor(thread_count=-1, random_seed=random_state, verbose=False, allow_writing_files=False) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): final_model.fit(X_full_p, y) fitted = final_model X_for_shap = X_full_p else: pipe = Pipeline([("prep", preprocessor), ("reg", model)]) import warnings with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) preds = cross_val_predict(pipe, X, y, cv=cv) pipe.fit(X, y) fitted = pipe X_for_shap = None return cls( predicted_outcomes=preds, actual_outcomes=y, fitted_model=fitted, feature_names=feature_names, X_for_shap=X_for_shap, )
@property def scores(self) -> Dict[str, float]: """Compute regression metrics (RMSE and MAE) for outcome predictions. Returns ------- Dict[str, float] Dictionary containing: - 'rmse': Root Mean Squared Error - 'mae': Mean Absolute Error """ mse = mean_squared_error(self.actual_outcomes, self.predicted_outcomes) rmse = np.sqrt(mse) mae = mean_absolute_error(self.actual_outcomes, self.predicted_outcomes) return { 'rmse': float(rmse), 'mae': float(mae) } @property def shap(self) -> pd.DataFrame: """Return SHAP values from the fitted outcome prediction model. SHAP values show the directional contribution of each feature to outcome prediction, where positive values increase the predicted outcome and negative values decrease it. Returns ------- pd.DataFrame For CatBoost models: DataFrame with columns 'feature' and 'shap_mean', where 'shap_mean' represents the mean SHAP value across all samples. For sklearn models: DataFrame with columns 'feature' and 'importance' (absolute coefficient values, for backward compatibility). Raises ------ RuntimeError If the fitted model does not support SHAP values extraction. """ # Extract SHAP values or feature importance based on model type if isinstance(self.fitted_model, CatBoostRegressor): # Use CatBoost's SHAP values for directional feature contributions try: # Import Pool for SHAP computation from catboost import Pool # Check if we have the required data for SHAP computation if self.X_for_shap is None: raise RuntimeError("Preprocessed data for SHAP computation not available.") # Create Pool object for SHAP computation (numeric-only after preprocessing) shap_pool = Pool(data=self.X_for_shap) # Get SHAP values - returns array of shape (n_samples, n_features + 1) # where the last column is the bias term shap_values = self.fitted_model.get_feature_importance(type='ShapValues', data=shap_pool) # Remove bias term (last column) and compute mean SHAP values across samples # This gives us the average directional contribution of each feature shap_values_no_bias = shap_values[:, :-1] # Remove bias column importance_values = np.mean(shap_values_no_bias, axis=0) # Mean across samples feature_names = self.feature_names column_name = 'shap_mean' # Use different column name to indicate SHAP values except Exception as e: raise RuntimeError(f"Failed to extract SHAP values from CatBoost model: {e}") elif hasattr(self.fitted_model, 'named_steps') and hasattr(self.fitted_model.named_steps.get('clf'), 'coef_'): # Handle sklearn pipeline with linear regression try: clf = self.fitted_model.named_steps['clf'] # For linear regression, use absolute coefficients as importance importance_values = np.abs(clf.coef_) # Need to map back to original feature names through preprocessing # For simplicity, if we have preprocessed features, we'll use the original feature names # and aggregate importance for one-hot encoded categorical features prep = self.fitted_model.named_steps.get('prep') if hasattr(prep, 'get_feature_names_out'): try: # Try to get feature names from preprocessor transformed_names = prep.get_feature_names_out() feature_names = [str(name) for name in transformed_names] except: # Fallback to original feature names if transformation fails feature_names = self.feature_names if len(importance_values) != len(feature_names): # If lengths don't match due to one-hot encoding, we can't map back easily # Just use indices as feature names feature_names = [f"feature_{i}" for i in range(len(importance_values))] else: feature_names = self.feature_names column_name = 'importance' # Keep backward compatibility for sklearn models except Exception as e: raise RuntimeError(f"Failed to extract feature importance from sklearn model: {e}") else: raise RuntimeError(f"Feature importance extraction not supported for model type: {type(self.fitted_model)}") # Ensure we have matching lengths if len(importance_values) != len(feature_names): # This can happen with preprocessing transformations # Create generic feature names if needed if len(importance_values) > len(feature_names): feature_names = feature_names + [f"transformed_feature_{i}" for i in range(len(feature_names), len(importance_values))] else: feature_names = feature_names[:len(importance_values)] # Create DataFrame with appropriate column name result_df = pd.DataFrame({ 'feature': feature_names, column_name: importance_values }) # Sort appropriately: by absolute value for SHAP values (to show most impactful features), # by value for regular importance (higher is better) if column_name == 'shap_mean': # For SHAP values, sort by absolute value to show most impactful features first result_df = result_df.reindex(result_df[column_name].abs().sort_values(ascending=False).index) else: # For regular importance, sort by value (descending) result_df = result_df.sort_values(column_name, ascending=False) result_df = result_df.reset_index(drop=True) return result_df
# Optional lightweight dataclass for standalone usage, but CausalEDA also # supports the existing causalis.data.CausalData which uses `confounders`.
[docs] @dataclass class CausalDataLite: """A minimal container for dataset roles used by CausalEDA. Attributes - df: The full pandas DataFrame containing treatment, outcome and covariates. - treatment: Column name of the binary treatment indicator (0/1). - target: Column name of the outcome variable. - confounders: List of covariate column names used to model treatment. """ df: pd.DataFrame treatment: str target: str confounders: List[str]
def _extract_roles(data_obj: Any) -> Dict[str, Any]: """Extract dataset roles from various supported data containers. Accepts: - CausalDataLite - Project's CausalData (duck-typed: attributes df, treatment, outcome, and either confounders or confounders) - Any object exposing the same attributes/properties Returns a dict with keys: df, treatment, outcome, confounders. If both confounders/confounders are absent, it assumes all columns except treatment/outcome are confounders. """ # Direct dataclass-like attributes df = getattr(data_obj, "df") treatment_attr = getattr(data_obj, "treatment") # Support both 'outcome' (project's CausalData) and 'target' (CausalDataLite) target_attr = getattr(data_obj, "outcome", None) if target_attr is None: target_attr = getattr(data_obj, "target") # If these are Series (as in causalis.data.CausalData properties), convert to column names if isinstance(treatment_attr, pd.Series): treatment = treatment_attr.name else: treatment = treatment_attr if isinstance(target_attr, pd.Series): target = target_attr.name else: target = target_attr if hasattr(data_obj, "confounders") and getattr(data_obj, "confounders") is not None: confs = list(getattr(data_obj, "confounders")) elif hasattr(data_obj, "confounders") and getattr(data_obj, "confounders") is not None: # causalis.data.CausalData.confounders returns a DataFrame or None; if it's a # DataFrame, use its columns; if it's a list/iterable, cast to list. cofs = getattr(data_obj, "confounders") if isinstance(cofs, pd.DataFrame): confs = list(cofs.columns) else: confs = list(cofs) if cofs is not None else [] else: # Last resort: assume all columns except treatment/outcome are confounders confs = [c for c in df.columns if c not in {treatment, target}] return {"df": df, "treatment": treatment, "outcome": target, "confounders": confs}
[docs] class CausalEDA: """Exploratory diagnostics for causal designs with binary treatment. The class exposes methods to: - Summarize outcome by treatment and naive mean difference. - Estimate cross-validated propensity scores and assess treatment predictability (AUC) and positivity/overlap. - Inspect covariate balance via standardized mean differences (SMD) before/after IPTW weighting; visualize with a love plot. - Inspect weight distributions and effective sample size (ESS). """
[docs] def __init__(self, data: Any, ps_model: Optional[Any] = None, n_splits: int = 5, random_state: int = 42): roles = _extract_roles(data) self.d = CausalDataLite(df=roles["df"], treatment=roles["treatment"], target=roles["outcome"], confounders=roles["confounders"]) self.n_splits = n_splits self.random_state = random_state self.ps_model = ps_model or CatBoostClassifier( thread_count=-1, # Use all available threads random_seed=random_state, verbose=False, # Suppress training output allow_writing_files=False ) # Preprocessing: always make features numeric via OneHotEncoder for categoricals X = self.d.df[self.d.confounders] num = X.select_dtypes(include=[np.number]).columns.tolist() cat = [c for c in X.columns if c not in num] num_transformer = Pipeline(steps=[ ("scaler", StandardScaler(with_mean=True, with_std=True)) ]) self.preproc = ColumnTransformer( transformers=[ ("num", num_transformer, num), ("cat", OneHotEncoder(handle_unknown="ignore", drop=None, sparse_output=False), cat), ], remainder="drop", ) self.ps_pipe = Pipeline([("prep", self.preproc), ("clf", self.ps_model)]) # Optional: clarify no native categorical indices are used post-OHE self.cat_features = None
# ---------- basics ----------
[docs] def data_shape(self) -> Dict[str, int]: """Return the shape information of the causal dataset. Returns a dict with: - n_rows: number of rows (observations) in the dataset - n_columns: number of columns (features) in the dataset This provides a quick overview of the dataset dimensions for exploratory analysis and reporting purposes. Returns ------- Dict[str, int] Dictionary containing 'n_rows' and 'n_columns' keys with corresponding integer values representing the dataset dimensions. Examples -------- >>> eda = CausalEDA(causal_data) >>> shape_info = eda.data_shape() >>> print(f"Dataset has {shape_info['n_rows']} rows and {shape_info['n_columns']} columns") """ df = self.d.df n_rows, n_columns = df.shape return {"n_rows": n_rows, "n_columns": n_columns}
[docs] def outcome_stats(self) -> pd.DataFrame: """Comprehensive outcome statistics grouped by treatment. Returns a DataFrame with detailed outcome statistics for each treatment group, including count, mean, std, min, various percentiles, and max. This method provides comprehensive outcome analysis and returns data in a clean DataFrame format suitable for reporting. Returns ------- pd.DataFrame DataFrame with treatment groups as index and the following columns: - count: number of observations in each group - mean: average outcome value - std: standard deviation of outcome - min: minimum outcome value - p10: 10th percentile - p25: 25th percentile (Q1) - median: 50th percentile (median) - p75: 75th percentile (Q3) - p90: 90th percentile - max: maximum outcome value Examples -------- >>> eda = CausalEDA(causal_data) >>> stats = eda.outcome_stats() >>> print(stats) count mean std min p10 p25 median p75 p90 max treatment 0 3000 5.123456 2.345678 0.123456 2.345678 3.456789 5.123456 6.789012 7.890123 9.876543 1 2000 6.789012 2.456789 0.234567 3.456789 4.567890 6.789012 8.901234 9.012345 10.987654 """ df, t, y = self.d.df, self.d.treatment, self.d.target # Ensure treatment is numeric for grouping if not pd.api.types.is_numeric_dtype(df[t]): raise ValueError("Treatment must be numeric 0/1 for outcome_stats().") # Create grouped object for multiple operations grouped = df.groupby(t)[y] # Calculate basic statistics using built-in methods basic_stats = grouped.agg(['count', 'mean', 'std', 'min', 'median', 'max']) # Calculate percentiles separately to avoid pandas aggregation mixing issues p10 = grouped.quantile(0.10) p25 = grouped.quantile(0.25) p75 = grouped.quantile(0.75) p90 = grouped.quantile(0.90) # Combine all statistics into a single DataFrame stats_df = pd.DataFrame({ 'count': basic_stats['count'], 'mean': basic_stats['mean'], 'std': basic_stats['std'], 'min': basic_stats['min'], 'p10': p10, 'p25': p25, 'median': basic_stats['median'], 'p75': p75, 'p90': p90, 'max': basic_stats['max'] }) # Ensure the index is named appropriately stats_df.index.name = 'treatment' return stats_df
# ---------- propensity & overlap ----------
[docs] def fit_m(self) -> 'PropensityModel': """Estimate cross-validated m(x) = P(D=1|X). Uses a preprocessing + classifier setup with stratified K-fold to generate out-of-fold probabilities. For CatBoost, data are one-hot encoded via the configured ColumnTransformer before fitting. Returns a PropensityModel. """ df = self.d.df X = df[self.d.confounders] t = df[self.d.treatment].astype(int).values if not np.isin(t, [0, 1]).all(): raise ValueError("Treatment must be binary {0,1}.") cv = StratifiedKFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state) if isinstance(self.ps_model, CatBoostClassifier): import warnings ps = np.zeros(len(X)) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) for train_idx, test_idx in cv.split(X, t): X_train, X_test = X.iloc[train_idx], X.iloc[test_idx] t_train = t[train_idx] X_train_prep = self.preproc.fit_transform(X_train) X_test_prep = self.preproc.transform(X_test) model = CatBoostClassifier(thread_count=-1, random_seed=self.random_state, verbose=False, allow_writing_files=False) model.fit(X_train_prep, t_train) ps[test_idx] = model.predict_proba(X_test_prep)[:, 1] else: import warnings with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) ps = cross_val_predict(self.ps_pipe, X, t, cv=cv, method="predict_proba")[:, 1] m = np.clip(ps, 1e-6, 1 - 1e-6) self._m = m if isinstance(self.ps_model, CatBoostClassifier): X_full_prep = self.preproc.fit_transform(X) final_model = CatBoostClassifier(thread_count=-1, random_seed=self.random_state, verbose=False) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) final_model.fit(X_full_prep, t) self._fitted_model = final_model try: self._feature_names = self.preproc.get_feature_names_out().tolist() except Exception: self._feature_names = [f"feature_{i}" for i in range(X_full_prep.shape[1])] self._X_for_shap = X_full_prep else: with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) self.ps_pipe.fit(X, t) self._fitted_model = self.ps_pipe self._feature_names = X.columns.tolist() return PropensityModel( m=m, d=t, fitted_model=self._fitted_model, feature_names=self._feature_names, X_for_shap=getattr(self, '_X_for_shap', None) )
# Back-compat shim
[docs] def fit_propensity(self) -> 'PropensityModel': _warn_once("fit_propensity()", "fit_m()") return self.fit_m()
[docs] def outcome_fit(self, outcome_model: Optional[Any] = None) -> 'OutcomeModel': """Fit a regression model to predict outcome from confounders only. Uses a preprocessing+CatBoost regressor pipeline with K-fold cross_val_predict to generate out-of-fold predictions. CatBoost uses all available threads and handles categorical features natively. Returns an OutcomeModel instance containing predicted outcomes and diagnostic methods. The outcome model predicts the baseline outcome from confounders only, excluding treatment. This is essential for proper causal analysis. Parameters ---------- outcome_model : Optional[Any] Custom regression model to use. If None, uses CatBoostRegressor. Returns ------- OutcomeModel An OutcomeModel instance with methods for: - scores: RMSE and MAE regression metrics - shap: SHAP values DataFrame property for outcome prediction """ df = self.d.df # Features: confounders only (treatment excluded for proper causal analysis) X = df[self.d.confounders] y = df[self.d.target].values cv = KFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state) # Default to CatBoostRegressor if no custom model provided if outcome_model is None: outcome_model = CatBoostRegressor( thread_count=-1, random_seed=self.random_state, verbose=False, allow_writing_files=False ) # Identify numeric and categorical features for preprocessing num_features = X.select_dtypes(include=[np.number]).columns.tolist() cat_features = [c for c in X.columns if c not in num_features] # Setup preprocessing: always OHE categoricals so model input is numeric num_transformer = Pipeline(steps=[ ("scaler", StandardScaler(with_mean=True, with_std=True)) ]) preproc = ColumnTransformer( transformers=[ ("num", num_transformer, num_features), ("cat", OneHotEncoder(handle_unknown="ignore", drop=None, sparse_output=False), cat_features), ], remainder="drop", ) # Special handling for CatBoost to properly pass categorical features if isinstance(outcome_model, CatBoostRegressor): import warnings predictions = np.zeros(len(X)) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) for train_idx, test_idx in cv.split(X, y): # Prepare data for this fold X_train, X_test = X.iloc[train_idx], X.iloc[test_idx] y_train = y[train_idx] # Apply preprocessing X_train_prep = preproc.fit_transform(X_train) X_test_prep = preproc.transform(X_test) # Create and train CatBoost model for this fold model = CatBoostRegressor( thread_count=-1, random_seed=self.random_state, verbose=False, allow_writing_files=False ) # All features are numeric after preprocessing; no cat_features needed model.fit(X_train_prep, y_train) predictions[test_idx] = model.predict(X_test_prep) else: # Use standard sklearn pipeline for non-CatBoost models pipeline = Pipeline([("prep", preproc), ("reg", outcome_model)]) import warnings with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) predictions = cross_val_predict(pipeline, X, y, cv=cv) self._outcome_predictions = predictions # Train a final model on the full dataset for SHAP computation if isinstance(outcome_model, CatBoostRegressor): # Apply preprocessing to full dataset X_full_prep = preproc.fit_transform(X) # Create and train final model final_model = CatBoostRegressor( thread_count=-1, random_seed=self.random_state, verbose=False ) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) final_model.fit(X_full_prep, y) # Store the trained model and data needed for SHAP computation self._outcome_fitted_model = final_model self._outcome_feature_names = X.columns.tolist() self._outcome_X_for_shap = X_full_prep # Store preprocessed data for SHAP else: # For non-CatBoost models, fit the pipeline on full data pipeline = Pipeline([("prep", preproc), ("reg", outcome_model)]) with np.errstate(divide='ignore', invalid='ignore', over='ignore'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) pipeline.fit(X, y) self._outcome_fitted_model = pipeline self._outcome_feature_names = X.columns.tolist() # Create and return OutcomeModel instance return OutcomeModel( predicted_outcomes=predictions, actual_outcomes=y, fitted_model=self._outcome_fitted_model, feature_names=self._outcome_feature_names, X_for_shap=getattr(self, '_outcome_X_for_shap', None) )
[docs] def auc_m(self, m: Optional[np.ndarray] = None) -> float: """Compute ROC AUC of treatment assignment vs. m(x).""" if m is None: m = getattr(self, "_m", None) if m is None: pm = self.fit_m() m = pm.m t = self.d.df[self.d.treatment].astype(int).values return float(roc_auc_score(t, m))
[docs] def positivity_check_m(self, m: Optional[np.ndarray] = None, bounds: Tuple[float, float] = (0.05, 0.95)) -> Dict[str, Any]: """Check overlap/positivity for m(x) based on thresholds.""" if m is None: m = getattr(self, "_m", None) if m is None: pm = self.fit_m() m = pm.m low, high = bounds return { "bounds": bounds, "share_below": float((m < low).mean()), "share_above": float((m > high).mean()), "flag": bool(((m < low).mean() + (m > high).mean()) > 0.02), }
[docs] def plot_m_overlap(self, m: Optional[np.ndarray] = None): """Plot overlaid histograms of m(x) for treated vs control.""" if m is None: m = getattr(self, "_m", None) if m is None: pm = self.fit_m() m = pm.m df = self.d.df t = df[self.d.treatment].astype(int).values fig, ax = plt.subplots() ax.hist(m[t == 1], bins=30, alpha=0.5, density=True, label="treated") ax.hist(m[t == 0], bins=30, alpha=0.5, density=True, label="control") ax.set_xlabel("m(x) = P(D=1|X)") ax.set_ylabel("Density") ax.legend() ax.set_title("Overlap of m(x)") return fig
# Back-compat shims for public API
[docs] def confounders_roc_auc(self, ps: Optional[np.ndarray] = None) -> float: _warn_once("confounders_roc_auc()", "auc_m()") return self.auc_m(m=ps)
[docs] def positivity_check(self, ps: Optional[np.ndarray] = None, bounds: Tuple[float, float] = (0.05, 0.95)) -> Dict[str, Any]: _warn_once("positivity_check()", "positivity_check_m()") return self.positivity_check_m(m=ps, bounds=bounds)
[docs] def plot_ps_overlap(self, ps: Optional[np.ndarray] = None): _warn_once("plot_ps_overlap()", "plot_m_overlap()") return self.plot_m_overlap(m=ps)
# ---------- balance ----------
[docs] def confounders_means(self) -> pd.DataFrame: """Comprehensive confounders balance assessment with means by treatment group. Returns a DataFrame with detailed balance information including: - Mean values of each confounder for control group (treatment=0) - Mean values of each confounder for treated group (treatment=1) - Absolute difference between treatment groups - Standardized Mean Difference (SMD) for formal balance assessment - Kolmogorov–Smirnov statistic (ks) and p-value (ks_pvalue) for distributional differences This method provides a comprehensive view of confounder balance by showing the actual mean values alongside the standardized differences, making it easier to understand both the magnitude and direction of imbalances. Returns ------- pd.DataFrame DataFrame with confounders as index and the following columns: - mean_t_0: mean value for control group (treatment=0) - mean_t_1: mean value for treated group (treatment=1) - abs_diff: absolute difference abs(mean_t_1 - mean_t_0) - smd: standardized mean difference (Cohen's d) - ks: Kolmogorov–Smirnov statistic - ks_pvalue: p-value of the KS test Notes ----- SMD values > 0.1 in absolute value typically indicate meaningful imbalance. Categorical variables are automatically converted to dummy variables. Examples -------- >>> eda = CausalEDA(causal_data) >>> balance = eda.confounders_means() >>> print(balance.head()) mean_t_0 mean_t_1 abs_diff smd confounders age 29.5 31.2 1.7 0.085 income 45000.0 47500.0 2500.0 0.125 education 0.25 0.35 0.1 0.215 """ # Delegate implementation to dedicated balance module for reuse and testing from .cofounders_balance import confounders_balance return confounders_balance( df=self.d.df, treatment=self.d.treatment, confounders=self.d.confounders, )
from typing import Optional, Tuple
[docs] def outcome_hist( self, treatment: Optional[str] = None, target: Optional[str] = None, bins="fd", # smarter default (still accepts int) density: bool = True, alpha: float = 0.45, sharex: bool = True, kde: bool = True, # overlay smooth density (SciPy-free) clip: Optional[Tuple[float, float]] = (0.01, 0.99), # trim tails for nicer view figsize: Tuple[float, float] = (9, 5.5), dpi: int = 220, font_scale: float = 1.15, save: Optional[str] = None, # "outcome.png" / ".svg" / ".pdf" save_dpi: Optional[int] = None, transparent: bool = False, ): """ Plot the distribution of the outcome for each treatment on a single, pretty plot. Features -------- - High-DPI canvas + scalable fonts - Default Matplotlib colors; KDE & mean lines match their histogram colors - Numeric outcomes: shared x-range (optional), optional KDE, quantile clipping - Categorical outcomes: normalized grouped bars by treatment - Optional hi-res export (PNG/SVG/PDF) """ import numpy as np import pandas as pd import matplotlib as mpl import matplotlib.pyplot as plt # ---------- Helpers ----------------------------------------------------- def _silverman_bandwidth(x: np.ndarray) -> float: x = np.asarray(x, float) n = x.size if n < 2: return 0.04 sd = np.std(x, ddof=1) iqr = np.subtract(*np.percentile(x, [75, 25])) s = sd if iqr <= 0 else min(sd, iqr / 1.34) h = 0.9 * s * n ** (-1 / 5) return float(max(h, 1e-6)) def _kde_unbounded(x: np.ndarray, xs: np.ndarray, h: float) -> np.ndarray: """ Gaussian KDE on R, implemented with NumPy (no SciPy). Handles degenerate cases by drawing a small bump at the mean. """ x = np.asarray(x, float) if x.size == 0: return np.zeros_like(xs) if x.size < 2 or np.std(x) < 1e-12: mu = float(np.mean(x)) if x.size else 0.0 h0 = max(h, 1e-3) z = (xs - mu) / h0 return np.exp(-0.5 * z ** 2) / (np.sqrt(2 * np.pi) * h0) diff = (xs[None, :] - x[:, None]) / h kern = np.exp(-0.5 * diff ** 2) / (np.sqrt(2 * np.pi) * h) return kern.mean(axis=0) def _first_patch_color(patches, fallback): for p in patches: fc = p.get_facecolor() if fc is not None: return fc return fallback # ---------- Data & columns --------------------------------------------- df = self.d.df t_col = treatment or self.d.treatment y_col = target or self.d.target if t_col not in df.columns or y_col not in df.columns: raise ValueError("Specified treatment/outcome columns not found in DataFrame.") treatments = pd.unique(df[t_col]) # preserves input order # Filter rows with valid outcome & treatment valid = df[[t_col, y_col]].dropna() if valid.empty: raise ValueError("No non-missing values for the selected treatment/outcome.") # ---------- Figure with high DPI & scaled fonts ------------------------ rc = { "font.size": 11 * font_scale, "axes.titlesize": 13 * font_scale, "axes.labelsize": 12 * font_scale, "legend.fontsize": 10 * font_scale, "xtick.labelsize": 10 * font_scale, "ytick.labelsize": 10 * font_scale, } with mpl.rc_context(rc): fig, ax = plt.subplots(figsize=figsize, dpi=dpi) # ---------- Branch: categorical vs numeric ------------------------- if not pd.api.types.is_numeric_dtype(valid[y_col]): # CATEGORICAL: normalized grouped bars vals = pd.unique(valid[y_col]) # Stable, readable category order vals_sorted = sorted(vals, key=lambda v: (str(type(v)), str(v))) width = 0.8 / max(1, len(treatments)) x = np.arange(len(vals_sorted)) # Color cycle cycle = mpl.rcParams["axes.prop_cycle"].by_key().get("color", ["C0", "C1", "C2", "C3"]) for i, tr in enumerate(treatments): sub = valid.loc[valid[t_col] == tr, y_col] counts = sub.value_counts(normalize=True) heights = [float(counts.get(v, 0.0)) for v in vals_sorted] ax.bar(x + i * width, heights, width=width, alpha=alpha, label=f"{tr} (n={sub.shape[0]})", color=cycle[i % len(cycle)], edgecolor="white", linewidth=0.6) ax.set_xticks(x + (len(treatments) - 1) * width / 2) ax.set_xticklabels([str(v) for v in vals_sorted]) ax.set_ylabel("Proportion") ax.set_xlabel(str(y_col)) ax.set_title("Outcome distribution by treatment (categorical)") ax.grid(True, axis="y", linewidth=0.5, alpha=0.45) for spine in ("top", "right"): ax.spines[spine].set_visible(False) ax.legend(title=str(t_col), frameon=False) fig.tight_layout() else: # NUMERIC: overlay histograms (+ optional KDE) per treatment y_all = valid[y_col].to_numpy() # Shared x-limits for all treatments if requested if sharex: if clip: lo, hi = np.quantile(y_all, [clip[0], clip[1]]) else: lo, hi = np.nanmin(y_all), np.nanmax(y_all) if not np.isfinite(lo) or not np.isfinite(hi) or hi <= lo: lo, hi = float(np.nanmin(y_all)), float(np.nanmax(y_all)) hist_range = (float(lo), float(hi)) ax.set_xlim(*hist_range) else: hist_range = None # each call to hist will expand limits as needed # Keep track of colors actually used so KDE/means match cycle = mpl.rcParams["axes.prop_cycle"].by_key().get("color", ["C0", "C1", "C2", "C3"]) used_colors = {} # Draw hists for i, tr in enumerate(treatments): y_vals = valid.loc[valid[t_col] == tr, y_col].to_numpy() y_vals = y_vals[np.isfinite(y_vals)] if y_vals.size == 0: continue h = ax.hist( y_vals, bins=bins, density=density, alpha=alpha, label=f"{tr} (n={y_vals.size})", range=hist_range, edgecolor="white", linewidth=0.6, color=None, # let Matplotlib choose ) color_this = _first_patch_color(h[2], cycle[i % len(cycle)]) used_colors[tr] = color_this # KDE overlays (SciPy-free, stable) if kde and len(used_colors) > 0: # Build a common grid for smooth lines if hist_range is None: # Determine from all numeric values (optionally clipped) if clip: lo, hi = np.quantile(y_all, [clip[0], clip[1]]) else: lo, hi = np.nanmin(y_all), np.nanmax(y_all) if not np.isfinite(lo) or not np.isfinite(hi) or hi <= lo: lo, hi = float(np.nanmin(y_all)), float(np.nanmax(y_all)) else: lo, hi = hist_range xs = np.linspace(float(lo), float(hi), 800) for tr in treatments: if tr not in used_colors: continue y_vals = valid.loc[valid[t_col] == tr, y_col].to_numpy() y_vals = y_vals[np.isfinite(y_vals)] if y_vals.size == 0: continue hbw = _silverman_bandwidth(y_vals) dens = _kde_unbounded(y_vals, xs, hbw) # If histogram is counts, scale KDE area roughly to counts for visual parity if not density: # approximate scaling by total count and bin width near midrange bw = (hi - lo) / (bins if isinstance(bins, int) else 30) dens = dens * y_vals.size * bw ax.plot(xs, dens, linewidth=2.2, color=used_colors[tr], label=f"{tr} (KDE)") # Mean lines for tr in treatments: y_vals = valid.loc[valid[t_col] == tr, y_col].to_numpy() y_vals = y_vals[np.isfinite(y_vals)] if y_vals.size == 0: continue mu = float(np.mean(y_vals)) # Clamp mean line to visible range if sharex & clipped if sharex and hist_range is not None: mu = float(np.clip(mu, hist_range[0], hist_range[1])) ax.axvline(mu, linestyle=":", linewidth=1.8, color=used_colors.get(tr, "k"), alpha=0.95) ax.set_xlabel(str(y_col)) ax.set_ylabel("Density" if density else "Count") ax.set_title("Outcome distribution by treatment") ax.grid(True, linewidth=0.5, alpha=0.45) for spine in ("top", "right"): ax.spines[spine].set_visible(False) ax.legend(title=str(t_col), frameon=False) fig.tight_layout() # -------- Optional high-res save ----------------------------------- if save is not None: ext = str(save).lower().split(".")[-1] _dpi = save_dpi or (300 if ext in {"png", "jpg", "jpeg", "tif", "tiff"} else dpi) fig.savefig( save, dpi=_dpi, bbox_inches="tight", pad_inches=0.1, transparent=transparent, facecolor="none" if transparent else "white", ) plt.close(fig) return fig
from typing import Optional, Tuple
[docs] def outcome_boxplot( self, treatment: Optional[str] = None, target: Optional[str] = None, figsize: Tuple[float, float] = (9, 5.5), dpi: int = 220, font_scale: float = 1.15, showfliers: bool = True, patch_artist: bool = True, save: Optional[str] = None, # "boxplot.png" / ".svg" / ".pdf" save_dpi: Optional[int] = None, transparent: bool = False, ): """ Prettified boxplot of the outcome by treatment. Features -------- - High-DPI figure, scalable fonts - Soft modern color styling (default Matplotlib palette) - Optional outliers, gentle transparency - Optional save to PNG/SVG/PDF """ import numpy as np import pandas as pd import matplotlib as mpl import matplotlib.pyplot as plt # --- Data setup -------------------------------------------------------- df = self.d.df t_col = treatment or self.d.treatment y_col = target or self.d.target if t_col not in df.columns or y_col not in df.columns: raise ValueError("Specified treatment/outcome columns not found in DataFrame.") # Drop rows with missing outcome df_valid = df[[t_col, y_col]].dropna() if df_valid.empty: raise ValueError("No valid rows with both treatment and outcome present.") treatments = pd.unique(df_valid[t_col]) data = [df_valid.loc[df_valid[t_col] == tr, y_col].values for tr in treatments] # --- Matplotlib rc settings -------------------------------------------- rc = { "font.size": 11 * font_scale, "axes.titlesize": 13 * font_scale, "axes.labelsize": 12 * font_scale, "legend.fontsize": 10 * font_scale, "xtick.labelsize": 10 * font_scale, "ytick.labelsize": 10 * font_scale, } with mpl.rc_context(rc): fig, ax = plt.subplots(figsize=figsize, dpi=dpi) # Use Matplotlib default color cycle cycle = mpl.rcParams["axes.prop_cycle"].by_key().get("color", ["C0", "C1", "C2", "C3"]) colors = [cycle[i % len(cycle)] for i in range(len(treatments))] # --- Draw boxplot --------------------------------------------------- bp = ax.boxplot( data, patch_artist=patch_artist, labels=[str(tr) for tr in treatments], showfliers=showfliers, boxprops=dict(linewidth=1.1, alpha=0.8), whiskerprops=dict(linewidth=1.0, alpha=0.8), capprops=dict(linewidth=1.0, alpha=0.8), medianprops=dict(linewidth=2.0, color="black"), flierprops=dict( marker="o", markersize=4, markerfacecolor="grey", alpha=0.6, markeredgewidth=0.3, ), ) # --- Colorize boxes with soft fill --------------------------------- if patch_artist: for patch, color in zip(bp["boxes"], colors): patch.set_facecolor(color) patch.set_alpha(0.35) patch.set_edgecolor(color) # --- Titles and cosmetics ------------------------------------------ ax.set_xlabel(str(t_col)) ax.set_ylabel(str(y_col)) ax.set_title("Outcome by treatment (boxplot)") ax.grid(True, axis="y", linewidth=0.5, alpha=0.45) for spine in ("top", "right"): ax.spines[spine].set_visible(False) fig.tight_layout() # --- Optional save ------------------------------------------------- if save is not None: ext = str(save).lower().split(".")[-1] _dpi = save_dpi or (300 if ext in {"png", "jpg", "jpeg", "tif", "tiff"} else dpi) fig.savefig( save, dpi=_dpi, bbox_inches="tight", pad_inches=0.1, transparent=transparent, facecolor="none" if transparent else "white", ) return fig
[docs] def outcome_plots(self, treatment: Optional[str] = None, target: Optional[str] = None, bins: int = 30, density: bool = True, alpha: float = 0.5, figsize: Tuple[float, float] = (7, 4), sharex: bool = True) -> Tuple[plt.Figure, plt.Figure]: """ Plot the distribution of the outcome for every treatment on one plot, and also produce a boxplot by treatment to visualize outliers. Parameters ---------- treatment : Optional[str] Treatment column name. Defaults to the treatment stored in the CausalEDA data. target : Optional[str] Target/outcome column name. Defaults to the outcome stored in the CausalEDA data. bins : int Number of bins for histograms when the outcome is numeric. density : bool Whether to normalize histograms to form a density. alpha : float Transparency for overlaid histograms. figsize : tuple Figure size for the plots. sharex : bool If True and the outcome is numeric, use the same x-limits across treatments. Returns ------- Tuple[matplotlib.figure.Figure, matplotlib.figure.Figure] (fig_distribution, fig_boxplot) """ fig_hist = self.outcome_hist( treatment=treatment, target=target, bins=bins, density=density, alpha=alpha, figsize=figsize, sharex=sharex, ) fig_box = self.outcome_boxplot( treatment=treatment, target=target, figsize=figsize, ) return fig_hist, fig_box
[docs] def m_features(self) -> pd.DataFrame: """Return feature attribution from the fitted m(x) model. - CatBoost path: SHAP attributions with columns 'shap_mean' and 'shap_mean_abs', sorted by 'shap_mean_abs'. Uses transformed feature names from the preprocessor. - Sklearn path (LogisticRegression): absolute coefficients reported as 'coef_abs'. """ # Check if model has been fitted if not hasattr(self, '_fitted_model') or self._fitted_model is None: raise RuntimeError("No fitted propensity model found. Please call fit_m() first.") if not hasattr(self, '_feature_names') or self._feature_names is None: raise RuntimeError("Feature names not available. Please call fit_m() first.") if isinstance(self._fitted_model, CatBoostClassifier): try: from catboost import Pool if not hasattr(self, '_X_for_shap') or self._X_for_shap is None: raise RuntimeError("Preprocessed data for SHAP computation not available. Please call fit_m() first.") shap_pool = Pool(data=self._X_for_shap) shap_values = self._fitted_model.get_feature_importance(type='ShapValues', data=shap_pool) shap_values_no_bias = shap_values[:, :-1] shap_mean = np.mean(shap_values_no_bias, axis=0) shap_mean_abs = np.mean(np.abs(shap_values_no_bias), axis=0) feature_names = list(self._feature_names) if len(feature_names) != shap_values_no_bias.shape[1]: raise RuntimeError("Feature names length does not match SHAP values. Ensure transformed names are used.") # Adjust shap_mean to have magnitude equal to shap_mean_abs while preserving sign shap_mean_signed = np.sign(shap_mean) * shap_mean_abs result_df = pd.DataFrame({ 'feature': feature_names, 'shap_mean': shap_mean_signed, 'shap_mean_abs': shap_mean_abs, }) # Sort by shap_mean_abs (magnitude) descending for determinism result_df = result_df.sort_values('shap_mean_abs', ascending=False).reset_index(drop=True) # Strip ColumnTransformer/pipeline prefixes like 'num__' or 'cat__' for readability try: result_df['feature'] = result_df['feature'].astype(str).str.replace(r'^[^_]+__', '', regex=True) except Exception: pass # Augment with probability-scale metrics using baseline p0 = mean m p0 = float(np.mean(getattr(self, '_m', None))) if hasattr(self, '_m') else np.nan if not np.isfinite(p0): raise RuntimeError("Baseline m estimates not available to compute probability-based columns. Fit m first.") p0 = float(np.clip(p0, 1e-9, 1 - 1e-9)) logit_p0 = float(np.log(p0 / (1.0 - p0))) result_df['odds_mult_abs'] = np.exp(result_df['shap_mean_abs'].values) result_df['exact_pp_change_abs'] = 1.0 / (1.0 + np.exp(-(logit_p0 + result_df['shap_mean_abs'].values))) - p0 result_df['exact_pp_change_signed'] = 1.0 / (1.0 + np.exp(-(logit_p0 + result_df['shap_mean'].values))) - p0 return result_df except Exception as e: raise RuntimeError(f"Failed to extract SHAP values from CatBoost model: {e}") elif hasattr(self._fitted_model, 'named_steps') and hasattr(self._fitted_model.named_steps.get('clf'), 'coef_'): try: clf = self._fitted_model.named_steps['clf'] coef_abs = np.abs(clf.coef_[0]) prep = self._fitted_model.named_steps.get('prep') if hasattr(prep, 'get_feature_names_out'): try: feature_names = [str(n) for n in prep.get_feature_names_out()] except Exception: feature_names = [f"feature_{i}" for i in range(len(coef_abs))] else: feature_names = [f"feature_{i}" for i in range(len(coef_abs))] if len(coef_abs) != len(feature_names): raise RuntimeError("Feature names length does not match coefficients.") df = pd.DataFrame({'feature': feature_names, 'importance': coef_abs, 'coef_abs': coef_abs}) # Strip ColumnTransformer/pipeline prefixes like 'num__' or 'cat__' for readability try: df['feature'] = df['feature'].astype(str).str.replace(r'^[^_]+__', '', regex=True) except Exception: pass df = df.sort_values('importance', ascending=False).reset_index(drop=True) return df except Exception as e: raise RuntimeError(f"Failed to extract feature importance from sklearn model: {e}") else: raise RuntimeError(f"Feature importance extraction not supported for model type: {type(self._fitted_model)}")
# Back-compat shim
[docs] def treatment_features(self) -> pd.DataFrame: _warn_once("treatment_features()", "m_features()") return self.m_features()