"""
Sensitivity functions refactored into a dedicated module.
This module centralizes bias-aware sensitivity helpers and the public
entry points used by refutation utilities for unconfoundedness.
"""
from __future__ import annotations
from typing import Dict, Any, Optional, List, Tuple
import warnings
import numpy as np
import pandas as pd
__all__ = ["sensitivity_analysis", "sensitivity_benchmark", "get_sensitivity_summary"]
# ---------------- Internals ----------------
_ESSENTIALLY_ZERO = 1e-32
# ---------------- Core sensitivity primitives (public, legacy-compatible) ----------------
def _compute_sensitivity_bias_unified(
sigma2: np.ndarray | float,
nu2: np.ndarray | float,
psi_sigma2: np.ndarray,
psi_nu2: np.ndarray,
) -> tuple[float, np.ndarray]:
"""
max_bias = sqrt(max(sigma2 * nu2, 0)). Influence function via delta method.
Returns zero IF on the boundary and an IF shaped like psi_sigma2 otherwise.
"""
sigma2_f = float(np.asarray(sigma2).reshape(()))
nu2_f = float(np.asarray(nu2).reshape(()))
if not (sigma2_f > 0.0 and nu2_f > 0.0):
return 0.0, np.zeros_like(psi_sigma2, dtype=float)
max_bias = float(np.sqrt(sigma2_f * nu2_f))
denom = 2.0 * max_bias if max_bias > _ESSENTIALLY_ZERO else 1.0
psi_sigma2 = np.asarray(psi_sigma2, float)
psi_sigma2 = psi_sigma2 - float(np.mean(psi_sigma2))
psi_nu2 = np.asarray(psi_nu2, float)
psi_nu2 = psi_nu2 - float(np.mean(psi_nu2))
psi_max_bias = (sigma2_f * psi_nu2 + nu2_f * psi_sigma2) / denom
return max_bias, psi_max_bias
# Backward-compatible alias
def _compute_sensitivity_bias(
sigma2: np.ndarray | float,
nu2: np.ndarray | float,
psi_sigma2: np.ndarray,
psi_nu2: np.ndarray,
) -> tuple[float, np.ndarray]:
return _compute_sensitivity_bias_unified(sigma2, nu2, psi_sigma2, psi_nu2)
def _combine_nu2(m_alpha: np.ndarray, rr: np.ndarray, cf_y: float, cf_d: float, rho: float) -> tuple[float, np.ndarray]:
"""Combine sensitivity levers into nu2 via per-unit quadratic form.
nu2_i = (sqrt(2*m_alpha_i))^2 * cf_y + (|rr_i|)^2 * cf_d + 2*rho*sqrt(cf_y*cf_d)*|rr_i|*sqrt(2*m_alpha_i)
Returns (nu2, psi_nu2) with psi_nu2 centered.
Note: we use abs(rr) for a conservative worst-case cross-term; the quadratic
form is PSD for signed rr as well, but abs() avoids reductions when rr < 0.
"""
cf_y = float(cf_y)
cf_d = float(cf_d)
rho = float(np.clip(rho, -1.0, 1.0))
if cf_y < 0 or cf_d < 0:
raise ValueError("cf_y and cf_d must be >= 0.")
a = np.sqrt(2.0 * np.maximum(np.asarray(m_alpha, dtype=float), 0.0))
b = np.abs(np.asarray(rr, dtype=float))
base = (a * a) * cf_y + (b * b) * cf_d + 2.0 * rho * np.sqrt(cf_y * cf_d) * a * b
# numeric PSD clamp
base = np.maximum(base, 0.0)
nu2 = float(np.mean(base))
psi_nu2 = base - nu2
return nu2, psi_nu2
# ---------------- Bias-aware helpers (local variants + pullers) ----------------
def _combine_nu2_local(m_alpha: np.ndarray, rr: np.ndarray, cf_y: float, cf_d: float, rho: float, *, use_signed_rr: bool) -> tuple[float, np.ndarray]:
"""Nu^2 via per-unit quadratic form with optional sign-preserving rr."""
cf_y = float(cf_y); cf_d = float(cf_d); rho = float(np.clip(rho, -1.0, 1.0))
if cf_y < 0 or cf_d < 0:
raise ValueError("cf_y and cf_d must be >= 0.")
a = np.sqrt(2.0 * np.maximum(np.asarray(m_alpha, float), 0.0))
b = np.asarray(rr, float)
if not use_signed_rr:
b = np.abs(b) # worst-case sign
base = (a * a) * cf_y + (b * b) * cf_d + 2.0 * rho * np.sqrt(cf_y * cf_d) * a * b
base = np.maximum(base, 0.0)
nu2 = float(np.mean(base))
psi_nu2 = base - nu2
return nu2, psi_nu2
def _compute_sensitivity_bias_local(
sigma2: float,
nu2: float,
psi_sigma2: np.ndarray,
psi_nu2: np.ndarray,
) -> tuple[float, np.ndarray]:
"""Backward-compatible wrapper delegating to unified helper."""
return _compute_sensitivity_bias_unified(sigma2, nu2, psi_sigma2, psi_nu2)
def _pull_theta_se_ci(effect_estimation: Dict[str, Any], level: float) -> tuple[float, float, tuple[float, float]]:
"""Robustly extract θ, se, and sampling CI."""
from scipy.stats import norm as _norm
model = effect_estimation['model']
# theta
try:
theta = float(effect_estimation.get('coefficient', float(model.coef_[0])))
except Exception:
theta = float(model.coef_[0])
# se
try:
se = float(effect_estimation.get('std_error', float(model.se_[0])))
except Exception:
se = float(model.se_[0])
# sampling CI
ci = effect_estimation.get('confidence_interval', None)
if ci is None and hasattr(model, 'confint'):
try:
ci_df = model.confint(level=level)
if isinstance(ci_df, pd.DataFrame):
lower = None; upper = None
for col in ['ci_lower', f"{(1-level)/2*100:.1f} %", '2.5 %', '2.5%']:
if col in ci_df.columns:
lower = float(ci_df[col].iloc[0]); break
for col in ['ci_upper', f"{(0.5+level/2)*100:.1f} %", '97.5 %', '97.5%']:
if col in ci_df.columns:
upper = float(ci_df[col].iloc[0]); break
if lower is None or upper is None:
lower = float(ci_df.iloc[0, 0]); upper = float(ci_df.iloc[0, 1])
ci = (lower, upper)
except Exception:
pass
if ci is None:
z = _norm.ppf(0.5 + level/2.0)
ci = (theta - z*se, theta + z*se)
return float(theta), float(se), (float(ci[0]), float(ci[1]))
# ---------------- Public API: bias-aware CI and text summaries ----------------
def compute_bias_aware_ci(
effect_estimation: Dict[str, Any],
*,
cf_y: float,
cf_d: float,
rho: float = 1.0,
level: float = 0.95,
use_signed_rr: bool = False
) -> Dict[str, Any]:
"""
Returns a dict with:
- theta, se, level, z
- sampling_ci
- theta_bounds_confounding = [theta_lower, theta_upper] = theta ± max_bias
- bias_aware_ci = [theta - (max_bias + z*se), theta + (max_bias + z*se)]
- max_bias and components (sigma2, nu2)
"""
from scipy.stats import norm as _norm
if not isinstance(effect_estimation, dict) or 'model' not in effect_estimation:
raise TypeError("Pass the usual result dict with a fitted model under key 'model'.")
theta, se, sampling_ci = _pull_theta_se_ci(effect_estimation, level)
z = float(_norm.ppf(0.5 + level/2.0))
model = effect_estimation['model']
# Default: no confounding info → bias_aware = sampling CI
max_bias = 0.0
sigma2 = np.nan; nu2 = np.nan
if hasattr(model, "_sensitivity_element_est"):
elems = model._sensitivity_element_est()
sigma2 = float(elems["sigma2"])
psi_sigma2 = np.asarray(elems["psi_sigma2"], float)
psi_sigma2 = psi_sigma2 - float(np.mean(psi_sigma2))
m_alpha = np.asarray(elems["m_alpha"], float)
rr = np.asarray(elems["riesz_rep"], float)
nu2, psi_nu2 = _combine_nu2_local(m_alpha, rr, cf_y, cf_d, rho, use_signed_rr=use_signed_rr)
max_bias = float(np.sqrt(max(nu2, 0.0)) * se)
theta_lower = float(theta) - float(max_bias)
theta_upper = float(theta) + float(max_bias)
# Graceful fallback: if se is non-finite, report confounding bounds only
if not (np.isfinite(se) and se >= 0.0 and np.isfinite(z)):
bias_aware_ci = (float(theta) - float(max_bias), float(theta) + float(max_bias))
else:
bias_aware_ci = (
float(theta) - (float(max_bias) + z * float(se)),
float(theta) + (float(max_bias) + z * float(se)),
)
return dict(
theta=float(theta),
se=float(se),
level=float(level),
z=z,
sampling_ci=tuple(map(float, sampling_ci)),
theta_bounds_confounding=(float(theta_lower), float(theta_upper)),
bias_aware_ci=tuple(map(float, bias_aware_ci)),
max_bias=float(max_bias),
sigma2=float(sigma2),
nu2=float(nu2),
params=dict(cf_y=float(cf_y), cf_d=float(cf_d), rho=float(np.clip(rho, -1.0, 1.0)), use_signed_rr=bool(use_signed_rr)),
)
def format_bias_aware_summary(res: Dict[str, Any], label: str | None = None) -> str:
"""Pretty, one-row printout (aligned with new summary style)."""
lbl = (label or 'theta').rjust(6)
ci_l, ci_u = res['sampling_ci']
th_l, th_u = res['theta_bounds_confounding']
bci_l, bci_u = res['bias_aware_ci']
theta = res['theta']; se = res['se']
level = res['level']; z = res['z']
cf = res['params']
lines = []
lines.append("================== Bias-aware Interval ==================")
lines.append("")
lines.append("------------------ Scenario ------------------")
lines.append(f"Significance Level: level={level}")
lines.append(f"Sensitivity parameters: cf_y={cf['cf_y']}; cf_d={cf['cf_d']}, rho={cf['rho']}, use_signed_rr={cf['use_signed_rr']}")
lines.append("")
lines.append("------------------ Components ------------------")
lines.append(f"{'':>6} {'theta':>11} {'se':>11} {'z':>8} {'max_bias':>12} {'sigma2':>12} {'nu2':>12}")
lines.append(f"{lbl} {theta:11.6f} {se:11.6f} {z:8.4f} {res['max_bias']:12.6f} {res['sigma2']:12.6f} {res['nu2']:12.6f}")
lines.append("")
lines.append("------------------ Intervals ------------------")
lines.append(f"{'':>6} {'Sampling CI lower':>18} {'Conf. θ lower':>16} {'Bias-aware lower':>18} {'Bias-aware upper':>18} {'Conf. θ upper':>16} {'Sampling CI upper':>20}")
lines.append(f"{lbl} {ci_l:18.6f} {th_l:16.6f} {bci_l:18.6f} {bci_u:18.6f} {th_u:16.6f} {ci_u:20.6f}")
return "\n".join(lines)
# ---------------- Human-facing wrappers and legacy formatting ----------------
def _format_sensitivity_summary(
summary: pd.DataFrame,
cf_y: float,
cf_d: float,
rho: float,
level: float
) -> str:
"""
Format the sensitivity analysis summary into the expected output format.
Parameters
----------
summary : pd.DataFrame
The sensitivity summary DataFrame from DoubleML
cf_y : float
Sensitivity parameter for the outcome equation
cf_d : float
Sensitivity parameter for the treatment equation
rho : float
Correlation parameter
level : float
Confidence level
Returns
-------
str
Formatted sensitivity analysis report
"""
# Create the formatted output
output_lines = []
output_lines.append("================== Sensitivity Analysis ==================")
output_lines.append("")
output_lines.append("------------------ Scenario ------------------")
output_lines.append(f"Significance Level: level={level}")
output_lines.append(f"Sensitivity parameters: cf_y={cf_y}; cf_d={cf_d}, rho={rho}")
output_lines.append("")
# Bounds with CI section
output_lines.append("------------------ Bounds with CI ------------------")
# Create header for the table
header = f"{'':>6} {'CI lower':>11} {'theta lower':>12} {'theta':>15} {'theta upper':>12} {'CI upper':>13}"
output_lines.append(header)
# Extract values from summary DataFrame
# The summary should contain bounds and confidence intervals
lower_lbl = f"{(1 - level) / 2 * 100:.1f} %"
upper_lbl = f"{(0.5 + level / 2) * 100:.1f} %"
for idx, row in summary.iterrows():
# Format the row data - adjust column names based on actual DoubleML output
row_name = str(idx) if not isinstance(idx, str) else idx
try:
ci_lower = row.get('ci_lower', row.get(lower_lbl, row.get('2.5 %', row.get('2.5%', 0.0))))
theta_lower = row.get('theta_lower', row.get('theta lower', row.get('lower_bound', row.get('lower', 0.0))))
theta = row.get('theta', row.get('estimate', row.get('coef', 0.0)))
theta_upper = row.get('theta_upper', row.get('theta upper', row.get('upper_bound', row.get('upper', 0.0))))
ci_upper = row.get('ci_upper', row.get(upper_lbl, row.get('97.5 %', row.get('97.5%', 0.0))))
row_str = f"{row_name:>6} {ci_lower:11.6f} {theta_lower:12.6f} {theta:15.6f} {theta_upper:12.6f} {ci_upper:13.6f}"
output_lines.append(row_str)
except (KeyError, AttributeError):
# Fallback formatting if exact column names differ
row_values = [f"{val:11.6f}" if isinstance(val, (int, float)) else f"{val:>11}"
for val in list(row.values)[:5]]
row_str = f"{row_name:>6} " + " ".join(row_values)
output_lines.append(row_str)
output_lines.append("")
# Robustness SNR proxy section
output_lines.append("------------------ Robustness (risk proxy) -------------")
# Create header for robustness values
rob_header = f"{'':>6} {'H_0':>6} {'risk proxy (%)':>15} {'adj (%)':>8}"
output_lines.append(rob_header)
# Add robustness values if present, else placeholders
for idx, row in summary.iterrows():
row_name = str(idx) if not isinstance(idx, str) else idx
try:
h_0 = row.get('H_0', row.get('null_hypothesis', 0.0))
rv = row.get('RV', row.get('robustness_value', 0.0))
rva = row.get('RVa', row.get('robustness_value_adjusted', 0.0))
rob_row = f"{row_name:>6} {h_0:6.1f} {rv:15.6f} {rva:8.6f}"
output_lines.append(rob_row)
except (KeyError, AttributeError):
rob_row = f"{row_name:>6} {0.0:6.1f} {0.0:15.6f} {0.0:8.6f}"
output_lines.append(rob_row)
return "\n".join(output_lines)
[docs]
def get_sensitivity_summary(
effect_estimation: Dict[str, Any],
*,
label: Optional[str] = None,
) -> Optional[str]:
"""
Render a single, unified bias-aware summary string.
If bias-aware components are missing, shows a sampling-only variant with max_bias=0
and then formats via `format_bias_aware_summary` for consistency.
"""
if not isinstance(effect_estimation, dict) or 'model' not in effect_estimation:
return None
model = effect_estimation['model']
if label is None:
t = getattr(getattr(model, 'data', None), 'treatment', None)
label = getattr(t, 'name', None) or 'theta'
res = effect_estimation.get('bias_aware')
# Build a sampling-only placeholder if needed (level fixed at 0.95 here)
if not isinstance(res, dict):
theta, se, ci = _pull_theta_se_ci(effect_estimation, level=0.95)
from scipy.stats import norm
z = float(norm.ppf(0.5 + 0.95 / 2.0))
res = dict(
theta=float(theta),
se=float(se),
level=0.95,
z=z,
sampling_ci=(float(ci[0]), float(ci[1])),
theta_bounds_confounding=(float(theta), float(theta)), # max_bias = 0
bias_aware_ci=(float(theta - z * se), float(theta + z * se)),
max_bias=0.0,
sigma2=np.nan,
nu2=np.nan,
params=dict(cf_y=0.0, cf_d=0.0, rho=0.0, use_signed_rr=False),
)
# Single clean summary (reuse the one definitive formatter)
return format_bias_aware_summary(res, label=label)
# ---------------- Benchmarking sensitivity (short vs long model) ----------------
def sensitivity_benchmark(
effect_estimation: Dict[str, Any],
benchmarking_set: List[str],
fit_args: Optional[Dict[str, Any]] = None,
) -> pd.DataFrame:
"""
Computes a benchmark for a given set of features by refitting a short IRM model
(excluding the provided features) and contrasting it with the original (long) model.
Returns a DataFrame containing cf_y, cf_d, rho and the change in estimates.
Parameters
----------
effect_estimation : dict
A dictionary containing the fitted IRM model under the key 'model'.
benchmarking_set : list[str]
List of confounder names to be used for benchmarking (to be removed in the short model).
fit_args : dict, optional
Additional keyword arguments for the IRM.fit() method of the short model.
Returns
-------
pandas.DataFrame
A one-row DataFrame indexed by the treatment name with columns:
- cf_y, cf_d, rho: residual-based benchmarking strengths
- theta_long, theta_short, delta: effect estimates and their change (long - short)
"""
if not isinstance(effect_estimation, dict) or 'model' not in effect_estimation:
raise TypeError("effect_estimation must be a dict containing a fitted IRM under key 'model'.")
model = effect_estimation['model']
# Validate model type by attribute presence (duck-typing IRM)
required_attrs = ['data', 'coef_', 'se_', '_sensitivity_element_est']
for attr in required_attrs:
if not hasattr(model, attr):
raise NotImplementedError("Sensitivity benchmarking requires a fitted IRM model with sensitivity elements.")
# Extract current confounders
try:
x_list_long = list(getattr(model.data, 'confounders', []))
except Exception as e:
raise RuntimeError(f"Failed to access model data confounders: {e}")
# input checks
if not isinstance(benchmarking_set, list):
raise TypeError(
f"benchmarking_set must be a list. {str(benchmarking_set)} of type {type(benchmarking_set)} was passed."
)
if len(benchmarking_set) == 0:
raise ValueError("benchmarking_set must not be empty.")
if not set(benchmarking_set) <= set(x_list_long):
raise ValueError(
f"benchmarking_set must be a subset of features {str(x_list_long)}. "
f"{str(benchmarking_set)} was passed."
)
if fit_args is not None and not isinstance(fit_args, dict):
raise TypeError(f"fit_args must be a dict. {str(fit_args)} of type {type(fit_args)} was passed.")
# Build short data excluding benchmarking features
x_list_short = [x for x in x_list_long if x not in benchmarking_set]
if len(x_list_short) == 0:
raise ValueError("After removing benchmarking_set there are no confounders left to fit the short model.")
# Create a shallow copy of the underlying DataFrame and build a new CausalData
df_long = model.data.get_df()
treatment_name = model.data.treatment.name
target_name = model.data.target.name
# Prefer in-scope names; fallback to import to avoid fragile self-import patterns
try:
CausalData # type: ignore[name-defined]
IRM # type: ignore[name-defined]
except NameError:
from causalis.data.causaldata import CausalData
from causalis.inference.estimators.irm import IRM
data_short = CausalData(df=df_long, treatment=treatment_name, outcome=target_name, confounders=x_list_short)
# Clone/construct a short IRM with same hyperparameters/learners
irm_short = IRM(
data=data_short,
ml_g=model.ml_g,
ml_m=model.ml_m,
n_folds=getattr(model, 'n_folds', 4),
n_rep=getattr(model, 'n_rep', 1),
score=getattr(model, 'score', 'ATE'),
normalize_ipw=getattr(model, 'normalize_ipw', False),
trimming_rule=getattr(model, 'trimming_rule', 'truncate'),
trimming_threshold=getattr(model, 'trimming_threshold', 1e-2),
weights=getattr(model, 'weights', None),
random_state=getattr(model, 'random_state', None),
)
# Fit short model
if fit_args is None:
irm_short.fit()
else:
irm_short.fit(**fit_args)
# Long model stats
theta_long = float(model.coef_[0])
# Short model stats
theta_short = float(irm_short.coef_[0])
# Compute residual-based strengths on the long model
df = model.data.get_df()
y = df[target_name].to_numpy(dtype=float)
d = df[treatment_name].to_numpy(dtype=float)
m_hat = np.asarray(model.m_hat_, dtype=float)
g0 = np.asarray(model.g0_hat_, dtype=float)
g1 = np.asarray(model.g1_hat_, dtype=float)
r_y = y - (d * g1 + (1.0 - d) * g0)
r_d = d - m_hat
def _center(a: np.ndarray) -> np.ndarray:
return a - np.mean(a)
def _center_w(a: np.ndarray, w: np.ndarray) -> np.ndarray:
w = np.asarray(w, float)
a = np.asarray(a, float)
sw = float(np.sum(w))
mu = float(np.sum(w * a)) / (sw if sw > 1e-12 else 1.0)
return a - mu
def _ols_r2_and_fit(yv: np.ndarray, Z: np.ndarray, w: Optional[np.ndarray] = None) -> tuple[float, np.ndarray]:
"""Stable (weighted) OLS on centered & standardized vars for R^2 and fitted component."""
Z = np.asarray(Z, dtype=float)
if Z.ndim == 1:
Z = Z.reshape(-1, 1)
if w is None:
# Unweighted
yv_c = _center(yv)
Zc = Z - np.nanmean(Z, axis=0, keepdims=True)
col_std = np.nanstd(Zc, axis=0, ddof=0)
valid = np.isfinite(col_std) & (col_std > 1e-12)
if not np.any(valid):
return 0.0, np.zeros_like(yv_c)
Zcs = Zc[:, valid] / col_std[valid]
Zcs = np.nan_to_num(Zcs, nan=0.0, posinf=0.0, neginf=0.0)
yv_c = np.nan_to_num(np.asarray(yv_c, float), nan=0.0, posinf=0.0, neginf=0.0)
from numpy.linalg import lstsq
beta, *_ = lstsq(Zcs, yv_c, rcond=1e-12)
yhat = Zcs @ beta
denom = float(np.dot(yv_c, yv_c))
if not np.isfinite(denom) or denom <= 1e-12:
return 0.0, np.zeros_like(yv_c)
num = float(np.dot(yhat, yhat))
if not np.isfinite(num) or num < 0.0:
return 0.0, np.zeros_like(yv_c)
r2 = float(np.clip(num / denom, 0.0, 1.0))
return r2, yhat
else:
# Weighted
w = np.asarray(w, float)
# sanitize weights and features to avoid NaN/inf propagation
w = np.nan_to_num(w, nan=0.0, posinf=0.0, neginf=0.0)
Z = np.asarray(Z, float)
Z = np.nan_to_num(Z, nan=0.0, posinf=0.0, neginf=0.0)
sw = float(np.sum(w))
if not np.isfinite(sw) or sw <= 1e-12:
return 0.0, np.zeros_like(yv, dtype=float)
yv_c = _center_w(yv, w)
# Weighted column means
muZ = (w[:, None] * Z).sum(axis=0) / sw
Zc = Z - muZ
# Weighted std per column
var = (w[:, None] * (Zc * Zc)).sum(axis=0) / sw
std = np.sqrt(np.maximum(var, 0.0))
valid = np.isfinite(std) & (std > 1e-12)
if not np.any(valid):
return 0.0, np.zeros_like(yv_c)
Zcs = Zc[:, valid] / std[valid]
Zcs = np.nan_to_num(Zcs, nan=0.0, posinf=0.0, neginf=0.0)
yv_c = np.nan_to_num(np.asarray(yv_c, float), nan=0.0, posinf=0.0, neginf=0.0)
# Weighted least squares via sqrt(w)
swr = np.sqrt(np.clip(w, 0.0, np.inf))
Zsw = Zcs * swr[:, None]
ysw = yv_c * swr
from numpy.linalg import lstsq
beta, *_ = lstsq(Zsw, ysw, rcond=1e-12)
yhat = Zcs @ beta
denom = float(np.dot(ysw, ysw))
if not np.isfinite(denom) or denom <= 1e-12:
return 0.0, np.zeros_like(yv_c)
num = float(np.dot(swr * yhat, swr * yhat))
if not np.isfinite(num) or num < 0.0:
return 0.0, np.zeros_like(yv_c)
r2 = float(np.clip(num / denom, 0.0, 1.0))
return r2, yhat
Z = df[benchmarking_set].to_numpy(dtype=float)
# ATT weighting if applicable
p = float(np.mean(d)) if (np.isfinite(np.mean(d)) and np.mean(d) > 0.0) else 1.0
w_att = np.where(d > 0.5, 1.0 / max(p, 1e-12), 0.0)
is_att = str(getattr(model, 'score', '')).upper().startswith('ATT')
weights = w_att if is_att else None
R2y, yhat_u = _ols_r2_and_fit(r_y, Z, w=weights)
R2d, dhat_u = _ols_r2_and_fit(r_d, Z, w=weights)
cf_y = float(R2y / max(1e-12, 1.0 - R2y))
cf_d = float(R2d / max(1e-12, 1.0 - R2d))
def _safe_corr(u: np.ndarray, v: np.ndarray, w: Optional[np.ndarray] = None) -> float:
if w is None:
u = _center(u); v = _center(v)
su, sv = np.std(u), np.std(v)
if not (np.isfinite(su) and np.isfinite(sv)) or su <= 0 or sv <= 0:
return 0.0
val = float(np.corrcoef(u, v)[0, 1])
return float(np.clip(val, -1.0, 1.0))
u = _center_w(u, w); v = _center_w(v, w)
sw = float(np.sum(w))
su = np.sqrt(max(0.0, float(np.sum(w * u * u)) / (sw if sw > 1e-12 else 1.0)))
sv = np.sqrt(max(0.0, float(np.sum(w * v * v)) / (sw if sw > 1e-12 else 1.0)))
if su <= 0 or sv <= 0:
return 0.0
cov = float(np.sum(w * u * v)) / (sw if sw > 1e-12 else 1.0)
val = cov / (su * sv)
return float(np.clip(val, -1.0, 1.0))
rho = _safe_corr(yhat_u, dhat_u, w=weights)
delta = theta_long - theta_short
df_benchmark = pd.DataFrame(
{
"cf_y": [cf_y],
"cf_d": [cf_d],
"rho": [rho],
"theta_long": [theta_long],
"theta_short": [theta_short],
"delta": [delta],
},
index=[treatment_name],
)
return df_benchmark
# ---------------- Main entry for producing textual sensitivity summary ----------------
[docs]
def sensitivity_analysis(
effect_estimation: Dict[str, Any],
*,
cf_y: float,
cf_d: float,
rho: float = 1.0,
level: float = 0.95,
use_signed_rr: bool = False,
) -> Dict[str, Any]:
"""
Compute bias-aware components and cache them on `effect_estimation["bias_aware"]`.
Returns a dict with:
- theta, se, level, z
- sampling_ci
- theta_bounds_confounding = (theta - max_bias, theta + max_bias)
- bias_aware_ci = [theta - (max_bias + z*se), theta + (max_bias + z*se)]
- max_bias and components (sigma2, nu2)
- params (cf_y, cf_d, rho, use_signed_rr)
"""
if not isinstance(effect_estimation, dict) or 'model' not in effect_estimation:
raise TypeError("Pass a result dict with a fitted model under key 'model'.")
if not (0.0 < float(level) < 1.0):
raise ValueError("level must be in (0,1).")
if cf_y < 0 or cf_d < 0:
raise ValueError("cf_y and cf_d must be >= 0.")
from scipy.stats import norm as _norm
theta, se, sampling_ci = _pull_theta_se_ci(effect_estimation, level)
z = float(_norm.ppf(0.5 + level / 2.0))
model = effect_estimation['model']
max_bias = 0.0
sigma2 = np.nan
nu2 = np.nan
if hasattr(model, "_sensitivity_element_est"):
elems = model._sensitivity_element_est()
sigma2 = float(elems["sigma2"])
psi_sigma2 = np.asarray(elems["psi_sigma2"], float)
psi_sigma2 = psi_sigma2 - float(np.mean(psi_sigma2))
m_alpha = np.asarray(elems["m_alpha"], float)
rr = np.asarray(elems["riesz_rep"], float)
nu2, psi_nu2 = _combine_nu2_local(
m_alpha, rr, cf_y=cf_y, cf_d=cf_d, rho=rho, use_signed_rr=use_signed_rr
)
max_bias = float(np.sqrt(max(nu2, 0.0)) * se)
theta_lower = float(theta) - float(max_bias)
theta_upper = float(theta) + float(max_bias)
# Graceful fallback: if se is non-finite, report confounding bounds only
if not (np.isfinite(se) and se >= 0.0 and np.isfinite(z)):
bias_aware_ci = (float(theta) - float(max_bias), float(theta) + float(max_bias))
else:
bias_aware_ci = (
float(theta) - (float(max_bias) + z * float(se)),
float(theta) + (float(max_bias) + z * float(se)),
)
res = dict(
theta=float(theta),
se=float(se),
level=float(level),
z=z,
sampling_ci=tuple(map(float, sampling_ci)),
theta_bounds_confounding=(float(theta_lower), float(theta_upper)),
bias_aware_ci=tuple(map(float, bias_aware_ci)),
max_bias=float(max_bias),
sigma2=float(sigma2),
nu2=float(nu2),
params=dict(
cf_y=float(cf_y),
cf_d=float(cf_d),
rho=float(np.clip(rho, -1.0, 1.0)),
use_signed_rr=bool(use_signed_rr),
),
)
effect_estimation["bias_aware"] = res
return res