Source code for causalis.data.generators

"""
Data generation utilities for causal inference tasks.
"""

from __future__ import annotations
import numpy as np
import pandas as pd
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional, Union, List, Tuple, Callable, Any
from scipy.special import erfinv, erf

from causalis.data.causaldata import CausalData



def _sigmoid(z):
    """
    Numerically stable sigmoid: 1 / (1 + exp(-z))
    Handles large positive/negative z without overflow warnings.
    Ensures outputs lie strictly within (0,1) and are strictly monotone in z
    even under floating-point saturation by using nextafter for exact 0/1 cases.

    Supports inputs of any shape; uses a flattened view to apply exact-0/1 nudges
    and reshapes back to preserve original structure.

    Parameters
    ----------
    z : array-like
        Input values of any shape

    Returns
    -------
    numpy.ndarray or float
        Sigmoid of the input with the same shape as the input; a Python float
        is returned for scalar (0-d) input.
    """
    z_arr = np.asarray(z, dtype=float)
    out = np.empty_like(z_arr, dtype=float)

    pos_mask = z_arr >= 0
    neg_mask = ~pos_mask
    out[pos_mask] = 1.0 / (1.0 + np.exp(-z_arr[pos_mask]))
    ez = np.exp(z_arr[neg_mask])
    out[neg_mask] = ez / (1.0 + ez)

    if out.ndim == 0:
        val = float(out)
        if not (0.0 < val < 1.0):
            return float(np.nextafter(0.0, 1.0)) if val <= 0.0 else float(np.nextafter(1.0, 0.0))
        return val

    # Order-preserving nudge off exact 0/1 on a flattened view, then reshape back
    flat = out.ravel()
    zflat = z_arr.ravel()

    zero_idx = np.flatnonzero(flat <= 0.0)
    if zero_idx.size:
        order = np.argsort(zflat[zero_idx])  # ascending z
        val = 0.0
        for j in order:
            val = np.nextafter(val, 1.0)
            flat[zero_idx[j]] = val

    one_idx = np.flatnonzero(flat >= 1.0)
    if one_idx.size:
        order = np.argsort(zflat[one_idx])  # ascending z
        k = one_idx.size
        val = 1.0
        steps = []
        for _ in range(k):
            val = np.nextafter(val, 0.0)
            steps.append(val)  # descending sequence < 1
        for rank, j in enumerate(order):
            flat[one_idx[j]] = steps[k - rank - 1]

    return flat.reshape(out.shape)
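
# Illustrative sketch (not part of the module API): _sigmoid stays strictly inside
# (0, 1) and remains monotone even for saturating inputs, e.g.
#   >>> _sigmoid(0.0)                        # 0.5
#   >>> _sigmoid(np.array([-800.0, 800.0]))  # tiny-but-positive, just-below-1.0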



def _logit(p: float) -> float:
    """Inverse of the sigmoid on (0,1); p is clipped away from 0 and 1 for numerical safety."""
    p = float(np.clip(p, 1e-12, 1 - 1e-12))
    return float(np.log(p / (1 - p)))


def _deterministic_ids(rng: np.random.Generator, n: int) -> List[str]:
    """Return deterministic uuid-like hex strings using the provided RNG."""
    return [rng.bytes(16).hex() for _ in range(n)]


def _sample_confounders_like_class(
    n: int,
    rng: np.random.Generator,
    *,
    confounder_specs: Optional[List[Dict[str, Any]]],
    k: int,
    x_sampler: Optional[Callable[[int, int, int], np.ndarray]],
    seed: Optional[int],
) -> Tuple[np.ndarray, List[str]]:
    """Replicates CausalDatasetGenerator._sample_X behavior (names & one-hot)."""
    # Custom sampler wins
    if x_sampler is not None:
        X = x_sampler(n, k, seed)
        names = [f"x{i+1}" for i in range(k)]
        return np.asarray(X, dtype=float), names

    # Specs provided
    if confounder_specs is not None:
        cols, names = [], []
        for spec in confounder_specs:
            name = spec.get("name") or f"x{len(names)+1}"
            dist = str(spec.get("dist", "normal")).lower()

            if dist == "normal":
                mu = float(spec.get("mu", 0.0)); sd = float(spec.get("sd", 1.0))
                col = rng.normal(mu, sd, size=n)
                cols.append(col.astype(float)); names.append(name)

            elif dist == "uniform":
                a = float(spec.get("a", 0.0)); b = float(spec.get("b", 1.0))
                col = rng.uniform(a, b, size=n)
                cols.append(col.astype(float)); names.append(name)

            elif dist == "bernoulli":
                p = float(spec.get("p", 0.5))
                col = rng.binomial(1, p, size=n).astype(float)
                cols.append(col); names.append(name)

            elif dist == "categorical":
                categories = list(spec.get("categories", [0, 1, 2]))
                probs = spec.get("probs", None)
                if probs is not None:
                    p = np.asarray(probs, dtype=float)
                    ps = p / p.sum()
                else:
                    ps = None
                col = rng.choice(categories, p=ps, size=n)
                # one-hot for all levels except the first
                rest = categories[1:]
                if len(rest) == 0:
                    # Add a zero column if only one category provided, with a collision-safe suffix
                    cols.append(np.zeros(n, dtype=float))
                    names.append(f"{name}__onlylevel")
                else:
                    for c in rest:
                        oh = (col == c).astype(float)
                        cols.append(oh)
                        names.append(f"{name}_{c}")
            else:
                raise ValueError(f"Unknown dist: {dist}")

        X = np.column_stack(cols) if cols else np.empty((n, 0))
        return X, names

    # Default: iid N(0,1) if k>0; else no X
    if k > 0:
        X = rng.normal(size=(n, k))
        names = [f"x{i+1}" for i in range(k)]
        return X, names
    else:
        return np.empty((n, 0)), []
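
# Example spec list accepted by _sample_confounders_like_class (illustrative only;
# the names and values are arbitrary, the "dist" keys match the branches above):
#   specs = [
#       {"name": "age",    "dist": "normal",      "mu": 50, "sd": 10},
#       {"name": "plan",   "dist": "categorical", "categories": ["free", "pro"], "probs": [0.7, 0.3]},
#       {"name": "smoker", "dist": "bernoulli",   "p": 0.3},
#   ]
#   X, names = _sample_confounders_like_class(
#       1_000, np.random.default_rng(0),
#       confounder_specs=specs, k=0, x_sampler=None, seed=0,
#   )  # names == ["age", "plan_pro", "smoker"]  (one-hot drops the first category)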


def _gaussian_copula(
    rng: np.random.Generator,
    n: int,
    specs: List[Dict[str, Any]],
    corr: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, List[str]]:
    """
    Generate mixed-type X with a Gaussian copula. 'specs' use the existing schema.
    For categorical, use quantile thresholds on a latent normal (via uniform U).
    """
    d = len(specs)
    if d == 0:
        return np.empty((n, 0)), []

    if corr is None:
        L = np.eye(d)
    else:
        C = np.asarray(corr, dtype=float)
        if C.shape != (d, d):
            raise ValueError(f"copula_corr must have shape {(d, d)}, got {C.shape}")
        # Ensure symmetry
        C = 0.5 * (C + C.T)
        I = np.eye(d)
        eps_list = [1e-12, 1e-10, 1e-8, 1e-6]
        L = None
        for eps in eps_list:
            try:
                L = np.linalg.cholesky(C + eps * I)
                break
            except np.linalg.LinAlgError:
                continue
        if L is None:
            # As a last resort, fall back to identity (independence)
            L = I
    # latent Z ~ N(0, corr)
    Z = rng.normal(size=(n, d)) @ L.T
    # Exact standard normal CDF via error function
    U = 0.5 * (1.0 + erf(Z / np.sqrt(2.0)))

    cols: List[np.ndarray] = []
    names: List[str] = []

    for j, spec in enumerate(specs):
        name = spec.get("name") or f"x{j+1}"
        dist = str(spec.get("dist", "normal")).lower()
        u = U[:, j]
        # keep u strictly inside (0,1) to avoid boundary issues in searchsorted
        eps = np.finfo(float).eps
        u = np.clip(u, eps, 1.0 - eps)
        if dist == "normal":
            mu = float(spec.get("mu", 0.0)); sd = float(spec.get("sd", 1.0))
            x = mu + sd * np.sqrt(2.0) * erfinv(2.0 * u - 1.0)
            cols.append(x.astype(float)); names.append(name)
        elif dist == "uniform":
            a = float(spec.get("a", 0.0)); b = float(spec.get("b", 1.0))
            x = a + (b - a) * u
            cols.append(x.astype(float)); names.append(name)
        elif dist == "bernoulli":
            p = float(spec.get("p", 0.5))
            x = (u < p).astype(float)
            cols.append(x); names.append(name)
        elif dist == "categorical":
            categories = list(spec.get("categories", [0, 1, 2]))
            probs = spec.get("probs", None)
            if probs is None:
                probs = [1.0 / max(len(categories), 1) for _ in categories]
            p = np.asarray(probs, dtype=float)
            if p.ndim != 1 or p.shape[0] != len(categories):
                raise ValueError("'probs' must be a 1D list matching 'categories' length")
            ps = p / p.sum()
            cum = np.cumsum(ps)
            idx = np.searchsorted(cum, u, side="right")
            # Guard boundary: if u==1 (after potential numerical saturation), clamp to last index
            if np.isscalar(idx):
                if idx >= len(categories):
                    idx = len(categories) - 1
            else:
                idx[idx == len(categories)] = len(categories) - 1
            cat_arr = np.array(categories, dtype=object)
            lab = cat_arr[idx]
            rest = categories[1:]
            if len(rest) == 0:
                cols.append(np.zeros(n, dtype=float))
                names.append(f"{name}__onlylevel")
            else:
                for c in rest:
                    cols.append((lab == c).astype(float))
                    names.append(f"{name}_{c}")
        else:
            raise ValueError(f"Unknown dist: {dist}")

    X = np.column_stack(cols) if cols else np.empty((n, 0))
    return X, names
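
# Illustrative sketch: the same spec schema can be drawn with dependence via the copula,
# e.g. a 2x2 correlation between a normal and a Bernoulli margin (values arbitrary):
#   corr = np.array([[1.0, 0.5],
#                    [0.5, 1.0]])
#   X, names = _gaussian_copula(
#       np.random.default_rng(0), 1_000,
#       [{"name": "income", "dist": "normal", "mu": 0.0, "sd": 1.0},
#        {"name": "smoker", "dist": "bernoulli", "p": 0.3}],
#       corr,
#   )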


def generate_rct(
    n: int = 20_000,
    split: float = 0.5,
    random_state: Optional[int] = 42,
    target_type: str = "binary",  # {"binary","normal","poisson"}; "nonnormal" -> "poisson"
    target_params: Optional[Dict] = None,
    confounder_specs: Optional[List[Dict[str, Any]]] = None,
    k: int = 0,
    x_sampler: Optional[Callable[[int, int, int], np.ndarray]] = None,
    add_ancillary: bool = True,
    deterministic_ids: bool = False,
) -> pd.DataFrame:
    """
    Generate an RCT dataset via CausalDatasetGenerator (thin wrapper),
    ensuring randomized treatment independent of X.

    Keeps y and d as float for ML compatibility.
    """
    # RNG for ancillary generation
    rng = np.random.default_rng(random_state)

    # Validate split
    split_f = float(split)
    if not (0.0 < split_f < 1.0):
        raise ValueError("split must be in (0,1).")

    # Normalize target_type
    ttype = target_type.lower()
    if ttype == "nonnormal":
        ttype = "poisson"
    if ttype not in {"binary", "normal", "poisson"}:
        raise ValueError("target_type must be 'binary', 'normal', or 'poisson' (or alias 'nonnormal').")

    # Default target_params
    if target_params is None:
        if ttype == "binary":
            target_params = {"p": {"A": 0.10, "B": 0.12}}
        elif ttype == "normal":
            target_params = {"mean": {"A": 0.00, "B": 0.20}, "std": 1.0}
        else:
            target_params = {"shape": 2.0, "scale": {"A": 1.0, "B": 1.1}}

    # Map to natural-scale means
    if ttype == "binary":
        pA = float(target_params["p"]["A"]); pB = float(target_params["p"]["B"])
        if not (0.0 < pA < 1.0 and 0.0 < pB < 1.0):
            raise ValueError("For binary outcomes, probabilities must be in (0,1).")
        mu0_nat, mu1_nat = pA, pB
    elif ttype == "normal":
        muA = float(target_params["mean"]["A"]); muB = float(target_params["mean"]["B"])
        sd = float(target_params.get("std", 1.0))
        if not (sd > 0):
            raise ValueError("For normal outcomes, std must be > 0.")
        mu0_nat, mu1_nat = muA, muB
    else:  # poisson
        shape = float(target_params.get("shape", 2.0))
        scaleA = float(target_params["scale"]["A"])
        scaleB = float(target_params["scale"]["B"])
        lamA = shape * scaleA; lamB = shape * scaleB
        if not (lamA > 0 and lamB > 0):
            raise ValueError("For Poisson outcomes, implied rates must be > 0.")
        mu0_nat, mu1_nat = lamA, lamB

    # Convert to class parameters
    if ttype == "binary":
        alpha_y = _logit(mu0_nat)
        theta = _logit(mu1_nat) - _logit(mu0_nat)
        outcome_type_cls = "binary"
        sigma_y = 1.0
    elif ttype == "normal":
        alpha_y = mu0_nat
        theta = mu1_nat - mu0_nat
        outcome_type_cls = "continuous"
        sigma_y = sd
    else:  # poisson
        alpha_y = float(np.log(mu0_nat))
        theta = float(np.log(mu1_nat / mu0_nat))
        outcome_type_cls = "poisson"
        sigma_y = 1.0

    # Instantiate the unified generator with randomized treatment
    gen = CausalDatasetGenerator(
        theta=theta,
        tau=None,
        beta_y=None,
        beta_d=None,
        g_y=None,
        g_d=None,
        alpha_y=alpha_y,
        alpha_d=_logit(split_f),
        sigma_y=sigma_y,
        outcome_type=outcome_type_cls,
        confounder_specs=confounder_specs,
        k=int(k),
        x_sampler=x_sampler,
        use_copula=False,
        target_d_rate=None,
        u_strength_d=0.0,
        u_strength_y=0.0,
        seed=random_state,
    )
    df = gen.generate(n)

    # Ancillary columns (optional)
    if add_ancillary:
        y = df["y"].to_numpy()

        # Age ~ N(35 + 4*y, 8), int clipped to [18,90]
        age = rng.normal(35 + 4 * y, 8, n).round().clip(18, 90).astype(int)

        # cnt_trans ~ Poisson(max(1.5 + 2*y, 1e-6))
        lam_tx = np.maximum(1.5 + 2 * y, 1e-6)
        cnt_trans = rng.poisson(lam_tx, n).astype(int)

        # Platform: P(Android) = sigmoid(-0.4 + 0.8*y)
        p_android = _sigmoid(-0.4 + 0.8 * y)
        platform_android = rng.binomial(1, np.clip(p_android, 0.0, 1.0), n).astype(int)
        platform_ios = (1 - platform_android).astype(int)

        # Invited friend: normalized y -> [0,1]
        y_min, y_max = float(np.min(y)), float(np.max(y))
        y_norm = (y - y_min) / ((y_max - y_min) + 1e-8)
        invited_friend = rng.binomial(1, np.clip(0.05 + 0.25 * y_norm, 0.0, 1.0), n).astype(int)

        # User IDs
        if deterministic_ids:
            user_ids = _deterministic_ids(rng, n)
        else:
            user_ids = [str(uuid.uuid4()) for _ in range(n)]

        df.insert(0, "user_id", user_ids)
        df["age"] = age
        df["cnt_trans"] = cnt_trans
        df["platform_Android"] = platform_android
        df["platform_iOS"] = platform_ios
        df["invited_friend"] = invited_friend

    return df

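# Minimal usage sketch for generate_rct (illustrative; parameter values are arbitrary):
#   df = generate_rct(
#       n=10_000, split=0.5, target_type="binary",
#       target_params={"p": {"A": 0.10, "B": 0.12}},
#   )
#   # df["d"].mean() ≈ 0.5 and df.loc[df["d"] == 1, "y"].mean() ≈ 0.12, up to sampling noise.
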
def generate_rct_data(
    n: int = 20_000,
    split: float = 0.5,
    random_state: Optional[int] = 42,
    target_type: str = "binary",
    target_params: Optional[Dict] = None,
    confounder_specs: Optional[List[Dict[str, Any]]] = None,
    k: int = 0,
    x_sampler: Optional[Callable[[int, int, int], np.ndarray]] = None,
    add_ancillary: bool = True,
    deterministic_ids: bool = False,
) -> pd.DataFrame:
    """
    Backward-compatible alias for generate_rct used in documentation.

    Parameters mirror generate_rct and are forwarded directly.
    """
    return generate_rct(
        n=n,
        split=split,
        random_state=random_state,
        target_type=target_type,
        target_params=target_params,
        confounder_specs=confounder_specs,
        k=k,
        x_sampler=x_sampler,
        add_ancillary=add_ancillary,
        deterministic_ids=deterministic_ids,
    )

@dataclass(slots=True)
class CausalDatasetGenerator:
    """
    Generate synthetic causal inference datasets with controllable confounding,
    treatment prevalence, noise, and (optionally) heterogeneous treatment effects.

    **Data model (high level)**

    - Confounders X ∈ R^k are drawn from user-specified distributions.
    - Binary treatment D is assigned by a logistic model:
          D ~ Bernoulli( sigmoid(alpha_d + f_d(X) + u_strength_d * U) ),
      where f_d(X) = X @ beta_d + g_d(X), and U ~ N(0,1) is an optional unobserved confounder.
    - Outcome Y depends on treatment and confounders with link determined by `outcome_type`:
          outcome_type = "continuous":
              Y = alpha_y + f_y(X) + u_strength_y * U + T * tau(X) + ε,  ε ~ N(0, sigma_y^2)
          outcome_type = "binary":
              logit P(Y=1|T,X) = alpha_y + f_y(X) + u_strength_y * U + T * tau(X)
          outcome_type = "poisson":
              log E[Y|T,X] = alpha_y + f_y(X) + u_strength_y * U + T * tau(X)
      where f_y(X) = X @ beta_y + g_y(X), and tau(X) is either constant `theta` or a user function.

    **Returned columns**

    - y: outcome
    - d: binary treatment (0/1)
    - x1..xk (or user-provided names)
    - m   : true propensity P(T=1 | X)
    - g0  : E[Y | X, T=0] on the natural outcome scale
    - g1  : E[Y | X, T=1] on the natural outcome scale
    - cate: g1 - g0 (conditional average treatment effect on the natural outcome scale)

    Notes on effect scale:
    - For "continuous", `theta` (or tau(X)) is an additive mean difference.
    - For "binary", tau acts on the *log-odds* scale (log-odds ratio).
    - For "poisson", tau acts on the *log-mean* scale (log incidence-rate ratio).

    Parameters
    ----------
    theta : float, default=1.0
        Constant treatment effect used if `tau` is None.
    tau : callable or None, default=None
        Function tau(X) -> array-like shape (n,) for heterogeneous effects. Ignored if None.
    beta_y : array-like of shape (k,), optional
        Linear coefficients of confounders in the outcome baseline f_y(X).
        Length must match the expanded X after one-hot encoding.
    beta_d : array-like of shape (k,), optional
        Linear coefficients of confounders in the treatment score f_d(X) (log-odds scale).
        Length must match the expanded X after one-hot encoding.
    g_y : callable, optional
        Nonlinear/additive function g_y(X) -> (n,) added to the outcome baseline.
    g_d : callable, optional
        Nonlinear/additive function g_d(X) -> (n,) added to the treatment score.
    alpha_y : float, default=0.0
        Outcome intercept (natural scale for continuous; log-odds for binary; log-mean for Poisson).
    alpha_d : float, default=0.0
        Treatment intercept (log-odds). If `target_d_rate` is set, `alpha_d` is auto-calibrated.
    sigma_y : float, default=1.0
        Std. dev. of the Gaussian noise for continuous outcomes.
    outcome_type : {"continuous","binary","poisson"}, default="continuous"
        Outcome family and link as defined above.
    confounder_specs : list of dict, optional
        Schema for generating confounders. Each spec is one of:
            {"name": str, "dist": "normal",      "mu": float, "sd": float}
            {"name": str, "dist": "uniform",     "a": float, "b": float}
            {"name": str, "dist": "bernoulli",   "p": float}
            {"name": str, "dist": "categorical", "categories": list, "probs": list}
        - For "categorical", one-hot encoding is produced for all levels except the first.
    k : int, default=5
        Number of confounders when `confounder_specs` is None. Defaults to independent N(0,1).
    x_sampler : callable, optional
        Custom sampler (n, k, seed) -> X ndarray of shape (n,k). Overrides `confounder_specs` and `k`.
    use_copula : bool, default=False
        If True and `confounder_specs` is provided, draw X with a Gaussian copula.
    copula_corr : ndarray, optional
        Correlation matrix for the copula, shape (d, d) with d = len(confounder_specs).
    target_d_rate : float in (0,1), optional
        If set, calibrates `alpha_d` via bisection so that mean propensity ≈ target_d_rate.
    u_strength_d : float, default=0.0
        Strength of the unobserved confounder U in treatment assignment.
    u_strength_y : float, default=0.0
        Strength of the unobserved confounder U in the outcome.
    propensity_sharpness : float, default=1.0
        Scales the X-driven treatment score to adjust positivity difficulty.
    seed : int, optional
        Random seed for reproducibility.

    Attributes
    ----------
    rng : numpy.random.Generator
        Internal RNG seeded from `seed`.

    Examples
    --------
    >>> gen = CausalDatasetGenerator(
    ...     theta=2.0,
    ...     beta_y=np.array([1.0, -0.5, 0.2]),
    ...     beta_d=np.array([0.8, 1.2, -0.3]),
    ...     target_d_rate=0.35,
    ...     outcome_type="continuous",
    ...     sigma_y=1.0,
    ...     seed=42,
    ...     confounder_specs=[
    ...         {"name":"age",    "dist":"normal",    "mu":50, "sd":10},
    ...         {"name":"smoker", "dist":"bernoulli", "p":0.3},
    ...         {"name":"bmi",    "dist":"normal",    "mu":27, "sd":4},
    ...     ])
    >>> df = gen.generate(10_000)
    >>> df.columns
    Index([... 'y','d','age','smoker','bmi','propensity','mu0','mu1','cate'], dtype='object')
    """
    # Core knobs
    theta: float = 1.0  # constant treatment effect (ATE) if tau is None
    tau: Optional[Callable[[np.ndarray], np.ndarray]] = None  # heterogeneous effect tau(X) if provided

    # Confounder -> outcome/treatment effects
    beta_y: Optional[np.ndarray] = None  # shape (k,)
    beta_d: Optional[np.ndarray] = None  # shape (k,)
    g_y: Optional[Callable[[np.ndarray], np.ndarray]] = None  # nonlinear baseline outcome f_y(X)
    g_d: Optional[Callable[[np.ndarray], np.ndarray]] = None  # nonlinear treatment score f_d(X)

    # Outcome/treatment intercepts and noise
    alpha_y: float = 0.0
    alpha_d: float = 0.0
    sigma_y: float = 1.0  # used when outcome_type="continuous"
    outcome_type: str = "continuous"  # "continuous" | "binary" | "poisson"

    # Confounder generation
    confounder_specs: Optional[List[Dict[str, Any]]] = None  # list of {"name","dist",...}
    k: int = 5  # used if confounder_specs is None
    x_sampler: Optional[Callable[[int, int, int], np.ndarray]] = None  # custom sampler (n, k, seed)->X
    use_copula: bool = False  # if True and confounder_specs provided, use Gaussian copula
    copula_corr: Optional[np.ndarray] = None  # correlation matrix for copula (shape dxd where d=len(specs))

    # Practical controls
    target_d_rate: Optional[float] = None  # e.g., 0.3 -> ~30% treated; solves for alpha_d
    u_strength_d: float = 0.0  # unobserved confounder effect on treatment
    u_strength_y: float = 0.0  # unobserved confounder effect on outcome
    propensity_sharpness: float = 1.0  # scales the X-driven treatment score to adjust positivity difficulty
    seed: Optional[int] = None

    # Internals (filled post-init)
    rng: np.random.Generator = field(init=False, repr=False)

    def __post_init__(self):
        self.rng = np.random.default_rng(self.seed)
        if self.confounder_specs is not None:
            self.k = len(self.confounder_specs)

    # ---------- Confounder sampling ----------
    def _sample_X(self, n: int) -> Tuple[np.ndarray, List[str]]:
        if self.x_sampler is not None:
            X = self.x_sampler(n, self.k, self.seed)
            names = [f"x{i+1}" for i in range(self.k)]
            return X, names

        if self.confounder_specs is None:
            # Default: independent standard normals
            X = self.rng.normal(size=(n, self.k))
            names = [f"x{i+1}" for i in range(self.k)]
            return X, names

        # If specs are provided and copula is requested, use Gaussian copula
        if getattr(self, "use_copula", False):
            X, names = _gaussian_copula(self.rng, n, self.confounder_specs, getattr(self, "copula_corr", None))
            self.k = X.shape[1]
            return X, names

        cols = []
        names = []
        for spec in self.confounder_specs:
            name = spec.get("name") or f"x{len(names)+1}"
            dist = spec.get("dist", "normal").lower()
            if dist == "normal":
                mu = spec.get("mu", 0.0); sd = spec.get("sd", 1.0)
                col = self.rng.normal(mu, sd, size=n)
            elif dist == "uniform":
                a = spec.get("a", 0.0); b = spec.get("b", 1.0)
                col = self.rng.uniform(a, b, size=n)
            elif dist == "bernoulli":
                p = spec.get("p", 0.5)
                col = self.rng.binomial(1, p, size=n).astype(float)
            elif dist == "categorical":
                categories = list(spec.get("categories", [0, 1, 2]))
                probs = spec.get("probs", None)
                if probs is not None:
                    p = np.asarray(probs, dtype=float)
                    ps = p / p.sum()
                else:
                    ps = None
                col = self.rng.choice(categories, p=ps, size=n)
                # one-hot encode (except first level)
                rest = categories[1:]
                if len(rest) == 0:
                    cols.append(np.zeros(n, dtype=float))
                    names.append(f"{name}__onlylevel")
                    continue
                for c in rest:
                    cols.append((col == c).astype(float))
                    names.append(f"{name}_{c}")
                continue
            else:
                raise ValueError(f"Unknown dist: {dist}")
            cols.append(col.astype(float))
            names.append(name)

        X = np.column_stack(cols) if cols else np.empty((n, 0))
        self.k = X.shape[1]
        return X, names

    # ---------- Helpers ----------
    def _treatment_score(self, X: np.ndarray, U: np.ndarray) -> np.ndarray:
        # Ensure numeric, finite arrays
        Xf = np.asarray(X, dtype=float)

        # X-driven part of the score
        score_x = np.zeros(Xf.shape[0], dtype=float)
        if self.beta_d is not None:
            bt = np.asarray(self.beta_d, dtype=float)
            if bt.ndim != 1:
                bt = bt.reshape(-1)
            if bt.shape[0] != Xf.shape[1]:
                raise ValueError(f"beta_d shape {bt.shape} is incompatible with X shape {Xf.shape}")
            score_x += np.sum(Xf * bt, axis=1)
        if self.g_d is not None:
            score_x += np.asarray(self.g_d(Xf), dtype=float)

        # Scale sharpness to control positivity difficulty
        s = float(getattr(self, "propensity_sharpness", 1.0))
        lin = s * score_x

        # Add unobserved confounder contribution (unscaled)
        if self.u_strength_d != 0:
            lin += self.u_strength_d * np.asarray(U, dtype=float)
        return lin

    def _outcome_location(self, X: np.ndarray, D: np.ndarray, U: np.ndarray, tau_x: np.ndarray) -> np.ndarray:
        # location on natural scale for continuous; on logit/log scale for binary/poisson
        Xf = np.asarray(X, dtype=float)
        Df = np.asarray(D, dtype=float)
        Uf = np.asarray(U, dtype=float)
        taux = np.asarray(tau_x, dtype=float)

        loc = np.full(Xf.shape[0], float(self.alpha_y), dtype=float)
        if self.beta_y is not None:
            by = np.asarray(self.beta_y, dtype=float)
            if by.ndim != 1:
                by = by.reshape(-1)
            if by.shape[0] != Xf.shape[1]:
                raise ValueError(f"beta_y shape {by.shape} is incompatible with X shape {Xf.shape}")
            loc += np.sum(Xf * by, axis=1)
        if self.g_y is not None:
            loc += np.asarray(self.g_y(Xf), dtype=float)
        if self.u_strength_y != 0:
            loc += self.u_strength_y * Uf
        loc += Df * taux
        return loc

    def _calibrate_alpha_d(self, X: np.ndarray, U: np.ndarray, target: float) -> float:
        """Calibrate alpha_d so mean propensity ~= target using robust bracketing and bisection."""
        lo, hi = -50.0, 50.0

        # Define function whose root we seek
        def f(a: float) -> float:
            return float(_sigmoid(a + self._treatment_score(X, U)).mean() - target)

        flo, fhi = f(lo), f(hi)
        # If the target is not bracketed, try expanding the bracket a few times
        if flo * fhi > 0:
            for scale in (2, 5, 10):
                lo2, hi2 = -scale * 50.0, scale * 50.0
                flo2, fhi2 = f(lo2), f(hi2)
                if flo2 * fhi2 <= 0:
                    lo, hi = lo2, hi2
                    flo, fhi = flo2, fhi2
                    break
            else:
                # Fall back to the closer endpoint
                return lo if abs(flo) < abs(fhi) else hi

        # Standard bisection with tighter tolerance and bounded iterations
        for _ in range(80):
            mid = 0.5 * (lo + hi)
            fm = f(mid)
            if abs(fm) < 1e-6:
                return mid
            if fm > 0:
                hi = mid
            else:
                lo = mid
        return 0.5 * (lo + hi)

    # ---------- Public API ----------
    def generate(self, n: int) -> pd.DataFrame:
        """
        Draw a synthetic dataset of size `n`.

        Parameters
        ----------
        n : int
            Number of observations to simulate.

        Returns
        -------
        pandas.DataFrame
            Columns:
            - y : float
            - d : {0.0, 1.0}
            - <confounder columns> : floats (and one-hot columns for categorical)
            - m   : float in (0,1), true P(T=1 | X)
            - g0  : expected outcome under control on the natural scale
            - g1  : expected outcome under treatment on the natural scale
            - cate: g1 - g0 (conditional treatment effect on the natural scale)
            (Deprecated aliases also emitted for one release: propensity, mu0, mu1)

        Notes
        -----
        - If `target_d_rate` is set, `alpha_d` is internally recalibrated (bisection) on the
          *current draw of X and U*, so repeated calls can yield slightly different alpha_d
          values even with the same seed unless X and U are fixed.
        - For binary and Poisson outcomes, `cate` is reported on the natural scale
          (probability or mean), even though the structural model is specified on the
          log-odds / log-mean scale.
        """
        X, names = self._sample_X(n)
        U = self.rng.normal(size=n)  # unobserved confounder

        # Treatment assignment
        if self.target_d_rate is not None:
            self.alpha_d = self._calibrate_alpha_d(X, U, self.target_d_rate)

        logits_t = self.alpha_d + self._treatment_score(X, U)
        m_obs = _sigmoid(logits_t)  # realized propensity including U

        # Marginal m(x) = E[D|X] (integrate out U if it affects treatment)
        if float(self.u_strength_d) != 0.0:
            gh_x, gh_w = np.polynomial.hermite.hermgauss(21)
            gh_w = gh_w / np.sqrt(np.pi)
            base = self.alpha_d + self._treatment_score(X, np.zeros(n))
            m = np.sum(_sigmoid(base[:, None] + self.u_strength_d * np.sqrt(2.0) * gh_x[None, :]) * gh_w[None, :], axis=1)
        else:
            # Closed form when U doesn't affect D
            base = self.alpha_d + self._treatment_score(X, np.zeros(n))
            m = _sigmoid(base)

        D = self.rng.binomial(1, m_obs).astype(float)

        # Treatment effect (constant or heterogeneous)
        tau_x = (self.tau(X) if self.tau is not None else np.full(n, self.theta)).astype(float)

        # Outcome generation
        loc = self._outcome_location(X, D, U, tau_x)
        if self.outcome_type == "continuous":
            Y = loc + self.rng.normal(0, self.sigma_y, size=n)
        elif self.outcome_type == "binary":
            # logit link: logit P(Y=1|D,X) = loc
            p = _sigmoid(loc)
            Y = self.rng.binomial(1, p).astype(float)
        elif self.outcome_type == "poisson":
            # log link: log E[Y|D,X] = loc; guard against overflow on link scale
            loc_c = np.clip(loc, -20, 20)
            lam = np.exp(loc_c)
            Y = self.rng.poisson(lam).astype(float)
        else:
            raise ValueError("outcome_type must be 'continuous', 'binary', or 'poisson'")

        # Compute oracle g0/g1 on the natural scale, excluding U for continuous, and
        # marginalizing over U for binary/poisson when u_strength_y != 0.
        if self.outcome_type == "continuous":
            # Oracle means exclude U (mean-zero unobserved)
            g0 = self._outcome_location(X, np.zeros(n), np.zeros(n), np.zeros(n))
            g1 = self._outcome_location(X, np.ones(n), np.zeros(n), tau_x)
        elif self.outcome_type == "binary":
            if float(self.u_strength_y) != 0.0:
                gh_x, gh_w = np.polynomial.hermite.hermgauss(21)
                gh_w = gh_w / np.sqrt(np.pi)
                base0 = self._outcome_location(X, np.zeros(n), np.zeros(n), np.zeros(n))
                base1 = self._outcome_location(X, np.ones(n), np.zeros(n), tau_x)
                Uq = np.sqrt(2.0) * gh_x
                g0 = np.sum(_sigmoid(base0[:, None] + self.u_strength_y * Uq[None, :]) * gh_w[None, :], axis=1)
                g1 = np.sum(_sigmoid(base1[:, None] + self.u_strength_y * Uq[None, :]) * gh_w[None, :], axis=1)
            else:
                g0 = _sigmoid(self._outcome_location(X, np.zeros(n), np.zeros(n), np.zeros(n)))
                g1 = _sigmoid(self._outcome_location(X, np.ones(n), np.zeros(n), tau_x))
        elif self.outcome_type == "poisson":
            if float(self.u_strength_y) != 0.0:
                gh_x, gh_w = np.polynomial.hermite.hermgauss(21)
                gh_w = gh_w / np.sqrt(np.pi)
                base0 = self._outcome_location(X, np.zeros(n), np.zeros(n), np.zeros(n))
                base1 = self._outcome_location(X, np.ones(n), np.zeros(n), tau_x)
                Uq = np.sqrt(2.0) * gh_x
                g0 = np.sum(np.exp(np.clip(base0[:, None] + self.u_strength_y * Uq[None, :], -20, 20)) * gh_w[None, :], axis=1)
                g1 = np.sum(np.exp(np.clip(base1[:, None] + self.u_strength_y * Uq[None, :], -20, 20)) * gh_w[None, :], axis=1)
            else:
                g0 = np.exp(np.clip(self._outcome_location(X, np.zeros(n), np.zeros(n), np.zeros(n)), -20, 20))
                g1 = np.exp(np.clip(self._outcome_location(X, np.ones(n), np.zeros(n), tau_x), -20, 20))
        else:
            raise ValueError("outcome_type must be 'continuous', 'binary', or 'poisson'")

        df = pd.DataFrame({"y": Y, "d": D})
        for j, name in enumerate(names):
            df[name] = X[:, j]

        # Useful ground-truth columns for evaluation (IRM naming)
        df["m"] = m          # marginal E[D|X]
        df["m_obs"] = m_obs  # realized sigmoid(alpha_d + score + u*U)
        df["g0"] = g0
        df["g1"] = g1
        df["cate"] = df["g1"] - df["g0"]

        # Legacy aliases retained for compatibility (no warnings)
        df["propensity"] = df["m_obs"]
        df["mu0"] = df["g0"]
        df["mu1"] = df["g1"]
        return df

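    # Illustrative sketch (not executed): a heterogeneous effect can be supplied via `tau`,
    # which receives the expanded confounder matrix X and must return an (n,)-shaped array:
    #   gen = CausalDatasetGenerator(
    #       tau=lambda X: 1.0 + 0.5 * X[:, 0],   # effect grows with the first confounder
    #       beta_y=np.array([1.0, -0.5]),
    #       beta_d=np.array([0.8, 0.3]),
    #       k=2, seed=0,
    #   )
    #   df = gen.generate(5_000)  # for continuous outcomes, df["cate"] == 1.0 + 0.5 * df["x1"]
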
    def to_causal_data(self, n: int, confounders: Optional[Union[str, List[str]]] = None) -> CausalData:
        """
        Generate a dataset and convert it to a CausalData object.

        Parameters
        ----------
        n : int
            Number of observations to simulate.
        confounders : Union[str, List[str]], optional
            Column name(s) to use as confounders. If None, all confounder columns are used.

        Returns
        -------
        CausalData
            A CausalData object containing the generated data.
        """
        df = self.generate(n)

        # Determine confounders to use
        exclude = {'y', 'd', 'm', 'm_obs', 'g0', 'g1', 'cate', 'propensity', 'mu0', 'mu1', 'user_id'}
        if confounders is None:
            # Keep original column order; exclude outcome/treatment/ground-truth columns and non-numeric
            confounder_cols = [
                c for c in df.columns
                if c not in exclude and pd.api.types.is_numeric_dtype(df[c])
            ]
        elif isinstance(confounders, str):
            confounder_cols = [confounders]
        else:
            confounder_cols = [c for c in confounders if c in df.columns]

        # Create and return CausalData object
        return CausalData(df=df, treatment='d', outcome='y', confounders=confounder_cols)

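    # Illustrative sketch: wrap a fresh draw directly as CausalData (column names as produced above):
    #   cd = gen.to_causal_data(10_000)                                  # all numeric confounder columns
    #   cd = gen.to_causal_data(10_000, confounders=["age", "smoker"])   # explicit subset
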
    def oracle_nuisance(self, num_quad: int = 21):
        """
        Return nuisance functions (m(x), g0(x), g1(x)) compatible with IRM.

        NOTE: Previously documented as (e, m0, m1). The semantics are unchanged;
        only the naming now matches IRM conventions.

        Behavior:
        - If both u_strength_d != 0 and u_strength_y != 0, DML is not identified; raise ValueError.
        - m(x): If u_strength_d == 0, closed form sigmoid(alpha_d + s * f_d(x)).
          If u_strength_d != 0, approximate E_U[sigmoid(alpha_d + s * f_d(x) + u_strength_d * U)]
          using Gauss–Hermite quadrature with `num_quad` nodes, where U ~ N(0,1).
        - g0/g1(x): mappings on the natural outcome scale; closed form with U omitted when
          u_strength_y == 0, otherwise U is marginalized with Gauss–Hermite quadrature.

        Parameters
        ----------
        num_quad : int, default=21
            Number of Gauss–Hermite nodes for marginalizing over U when the relevant
            u_strength_* is nonzero.
        """
        # Disallow unobserved confounding for DML when U affects both T and Y
        if (getattr(self, "u_strength_d", 0.0) != 0) and (getattr(self, "u_strength_y", 0.0) != 0):
            raise ValueError(
                "DML identification fails when U affects both T and Y. "
                "Use instruments (PLIV-DML) or set one of u_strength_* to 0."
            )

        # Precompute GH nodes/weights normalized for N(0,1)
        gh_x, gh_w = np.polynomial.hermite.hermgauss(int(num_quad))
        gh_w = gh_w / np.sqrt(np.pi)

        def m_of_x(x_row: np.ndarray) -> float:
            x = np.asarray(x_row, dtype=float).reshape(1, -1)
            # Base score without U contribution, matching _treatment_score(X, U=0)
            base = float(self.alpha_d)
            base += float(self._treatment_score(x, np.zeros(1, dtype=float))[0])
            ut = float(getattr(self, "u_strength_d", 0.0))
            if ut == 0.0:
                return float(_sigmoid(base))
            # Integrate over U ~ N(0,1) using Gauss–Hermite: U = sqrt(2) * gh_x
            z = base + ut * np.sqrt(2.0) * gh_x
            return float(np.sum(_sigmoid(z) * gh_w))

        def g_of_x_d(x_row: np.ndarray, d: int) -> float:
            x = np.asarray(x_row, dtype=float).reshape(1, -1)
            if self.tau is None:
                tau_val = float(self.theta)
            else:
                tau_val = float(np.asarray(self.tau(x), dtype=float).reshape(-1)[0])

            loc = float(self.alpha_y)
            if self.beta_y is not None:
                loc += float(np.sum(x * np.asarray(self.beta_y, dtype=float), axis=1))
            if self.g_y is not None:
                loc += float(np.asarray(self.g_y(x), dtype=float))
            loc += float(d) * tau_val

            if self.outcome_type == "continuous":
                return float(loc)
            uy = float(getattr(self, "u_strength_y", 0.0))
            if self.outcome_type == "binary":
                if uy == 0.0:
                    return float(_sigmoid(loc))
                z = loc + uy * np.sqrt(2.0) * gh_x
                return float(np.sum(_sigmoid(z) * gh_w))
            if self.outcome_type == "poisson":
                if uy == 0.0:
                    return float(np.exp(np.clip(loc, -20.0, 20.0)))
                z = np.clip(loc + uy * np.sqrt(2.0) * gh_x, -20.0, 20.0)
                return float(np.sum(np.exp(z) * gh_w))
            raise ValueError("outcome_type must be 'continuous','binary','poisson'.")

        return (lambda x: m_of_x(x), lambda x: g_of_x_d(x, 0), lambda x: g_of_x_d(x, 1))

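    # Illustrative sketch: the returned callables take one row of confounders (same column
    # order/expansion as generate()) and return scalars:
    #   m_fn, g0_fn, g1_fn = gen.oracle_nuisance()
    #   x_row = df[["x1", "x2"]].iloc[0].to_numpy()   # confounder columns of a generated df (names depend on specs)
    #   true_cate_row = g1_fn(x_row) - g0_fn(x_row)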