Source code for causalkit.data.generators

"""
Data generation utilities for causal inference tasks.
"""

from __future__ import annotations
import numpy as np
import pandas as pd
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional, Union, List, Tuple, Callable, Any

from causalkit.data.causaldata import CausalData


def _sigmoid(z):
    """
    Numerically stable sigmoid: 1 / (1 + exp(-z))
    Handles large positive/negative z without overflow warnings.
    
    Parameters
    ----------
    z : array-like
        Input values
        
    Returns
    -------
    array-like
        Sigmoid of input values
    """
    z = np.asarray(z, dtype=float)
    # Use a stable formulation to avoid overflow for large |z|
    # expit(z) = 0.5 * (1 + tanh(z/2)) is stable, but tanh can be slower.
    # Implement a piecewise stable computation directly.
    out = np.empty_like(z, dtype=float)
    pos_mask = z >= 0
    neg_mask = ~pos_mask
    # For positive z: 1 / (1 + exp(-z))
    out[pos_mask] = 1.0 / (1.0 + np.exp(-z[pos_mask]))
    # For negative z: exp(z) / (1 + exp(z)) to avoid exp(-z) overflow
    ez = np.exp(z[neg_mask])
    out[neg_mask] = ez / (1.0 + ez)
    return out
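
# Illustrative sketch (not part of the library API): the piecewise form above stays
# finite where a naive 1/(1 + np.exp(-z)) would emit overflow warnings for large |z|.
#
#     >>> _sigmoid(np.array([-1000.0, 0.0, 1000.0]))
#     array([0. , 0.5, 1. ])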


def generate_rct_data(
    n_users: int = 20_000,
    split: float = 0.5,
    random_state: Optional[int] = 42,
    target_type: str = "binary",           # {"binary", "normal", "nonnormal"}
    target_params: Optional[Dict] = None,  # distribution specifics (see docstring)
) -> pd.DataFrame:
    """
    Create synthetic RCT data using CausalDatasetGenerator as the core engine.

    - Treatment is randomized to approximately match `split` (independent of covariates).
    - Outcome distribution is controlled by `target_type` and `target_params`.
    - Returns a legacy-compatible schema with ancillary covariates derived from the outcome
      (age, cnt_trans, platform_Android, platform_iOS, invited_friend), plus a UUID user_id.

    Parameters
    ----------
    n_users : int
        Total number of users in the dataset.
    split : float
        Proportion of users in the treatment group (e.g., 0.5 => 50/50).
    random_state : int, optional
        Seed for reproducibility.
    target_type : {"binary", "normal", "nonnormal"}
        Outcome family. "nonnormal" is approximated via a Poisson mean process.
    target_params : dict, optional
        If None, defaults are used:
        - binary   : {"p": {"A": 0.10, "B": 0.12}}
        - normal   : {"mean": {"A": 0.00, "B": 0.20}, "std": 1.0}
        - nonnormal: {"shape": 2.0, "scale": {"A": 1.0, "B": 1.1}}

    Returns
    -------
    pd.DataFrame
        Columns: user_id, treatment, outcome, age, cnt_trans,
        platform_Android, platform_iOS, invited_friend.

    Raises
    ------
    ValueError
        If `target_type` is not one of {"binary", "normal", "nonnormal"}.
    """
    rng = np.random.default_rng(random_state)

    # Defaults for target parameters
    if target_params is None:
        if target_type == "binary":
            target_params = {"p": {"A": 0.10, "B": 0.12}}
        elif target_type == "normal":
            target_params = {"mean": {"A": 0.00, "B": 0.20}, "std": 1.0}
        elif target_type == "nonnormal":
            target_params = {"shape": 2.0, "scale": {"A": 1.0, "B": 1.1}}
        else:
            raise ValueError("target_type must be 'binary', 'normal', or 'nonnormal'.")

    # Configure the generator based on requested target_type/params
    outcome_type = None
    alpha_y = 0.0
    theta = 0.0
    sigma_y = 1.0

    if target_type == "binary":
        pA = float(target_params["p"]["A"])  # control prob
        pB = float(target_params["p"]["B"])  # treatment prob

        # Map to log-odds scale: logit(p) = log(p/(1-p))
        def logit(p):
            p = np.clip(p, 1e-6, 1 - 1e-6)
            return float(np.log(p / (1 - p)))

        alpha_y = logit(pA)
        theta = logit(pB) - logit(pA)
        outcome_type = "binary"
    elif target_type == "normal":
        muA = float(target_params["mean"]["A"])  # control mean
        muB = float(target_params["mean"]["B"])  # treatment mean
        sigma_y = float(target_params.get("std", 1.0))
        alpha_y = muA
        theta = muB - muA
        outcome_type = "continuous"
    else:  # "nonnormal" -> approximate with Poisson mean process
        shape = float(target_params.get("shape", 2.0))
        scaleA = float(target_params["scale"]["A"])  # control scale
        scaleB = float(target_params["scale"]["B"])  # treatment scale
        lamA = shape * scaleA
        lamB = shape * scaleB
        alpha_y = float(np.log(max(lamA, 1e-6)))
        theta = float(np.log(max(lamB, 1e-6)) - np.log(max(lamA, 1e-6)))
        outcome_type = "poisson"

    gen = CausalDatasetGenerator(
        theta=theta,
        alpha_y=alpha_y,
        sigma_y=sigma_y,
        outcome_type=outcome_type,
        # RCT: treatment independent of X
        beta_t=None,
        g_t=None,
        u_strength_t=0.0,
        # outcome baseline independent of X
        beta_y=None,
        g_y=None,
        u_strength_y=0.0,
        target_t_rate=split,
        seed=random_state,
        # No need to generate confounders for RCT construction
        confounder_specs=[],
        k=0,
    )

    df_core = gen.generate(n_users)

    # Extract core columns
    y = df_core["y"].to_numpy()
    t = df_core["t"].astype(int).to_numpy()

    # Build ancillary covariates based on outcome to preserve prior behavior
    # Age
    age = rng.normal(35 + 4 * y, 8, n_users).round().clip(18, 90).astype(int)

    # Transactions count (ensure non-negative rate)
    lam = np.maximum(1.5 + 2 * y, 1e-6)
    cnt_trans = rng.poisson(lam, n_users).astype(int)

    # Platform
    p_android = 1.0 / (1.0 + np.exp(-(-0.4 + 0.8 * y)))
    platform_android = rng.binomial(1, np.clip(p_android, 0.0, 1.0), n_users).astype(int)
    platform_ios = (1 - platform_android).astype(int)

    # Invited friend (normalize y to [0,1])
    y_min, y_max = float(np.min(y)), float(np.max(y))
    denom = (y_max - y_min) + 1e-8
    y_norm = (y - y_min) / denom
    invited_friend = rng.binomial(1, np.clip(0.05 + 0.25 * y_norm, 0.0, 1.0), n_users).astype(int)

    # UUIDs
    user_ids = [str(uuid.uuid4()) for _ in range(n_users)]

    # Assemble legacy schema
    df = pd.DataFrame({
        "user_id": user_ids,
        "treatment": t,
        "outcome": y,
        "age": age,
        "cnt_trans": cnt_trans,
        "platform_Android": platform_android,
        "platform_iOS": platform_ios,
        "invited_friend": invited_friend,
    })
    return df
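
# Usage sketch (illustrative, not executed at import): draw a 50/50 RCT with a binary
# outcome and check the realized treatment share and per-arm conversion rates.
#
#     >>> df = generate_rct_data(n_users=10_000, split=0.5, target_type="binary",
#     ...                        target_params={"p": {"A": 0.10, "B": 0.12}},
#     ...                        random_state=0)
#     >>> df["treatment"].mean()                       # ~0.5 by design
#     >>> df.groupby("treatment")["outcome"].mean()    # ~0.10 (control) vs ~0.12 (treatment)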
@dataclass
class CausalDatasetGenerator:
    """
    Generate synthetic causal inference datasets with controllable confounding,
    treatment prevalence, noise, and (optionally) heterogeneous treatment effects.

    **Data model (high level)**

    - Confounders X ∈ R^k are drawn from user-specified distributions.
    - Binary treatment T is assigned by a logistic model:
          T ~ Bernoulli( sigmoid(alpha_t + f_t(X) + u_strength_t * U) ),
      where f_t(X) = X @ beta_t + g_t(X), and U ~ N(0,1) is an optional unobserved confounder.
    - Outcome Y depends on treatment and confounders with link determined by `outcome_type`:
          outcome_type = "continuous":
              Y = alpha_y + f_y(X) + u_strength_y * U + T * tau(X) + ε,  ε ~ N(0, sigma_y^2)
          outcome_type = "binary":
              logit P(Y=1|T,X) = alpha_y + f_y(X) + u_strength_y * U + T * tau(X)
          outcome_type = "poisson":
              log E[Y|T,X] = alpha_y + f_y(X) + u_strength_y * U + T * tau(X)
      where f_y(X) = X @ beta_y + g_y(X), and tau(X) is either the constant `theta` or a user function.

    **Returned columns**

    - y: outcome
    - t: binary treatment (0/1)
    - x1..xk (or user-provided names)
    - propensity: P(T=1 | X) used to draw T (ground truth)
    - mu0: E[Y | T=0, X] on the natural outcome scale
    - mu1: E[Y | T=1, X] on the natural outcome scale
    - cate: mu1 - mu0 (conditional average treatment effect on the natural outcome scale)

    Notes on effect scale:
    - For "continuous", `theta` (or tau(X)) is an additive mean difference.
    - For "binary", tau acts on the *log-odds* scale (log-odds ratio).
    - For "poisson", tau acts on the *log-mean* scale (log incidence-rate ratio).

    Parameters
    ----------
    theta : float, default=1.0
        Constant treatment effect used if `tau` is None.
    tau : callable or None, default=None
        Function tau(X) -> array-like of shape (n,) for heterogeneous effects. Ignored if None.
    beta_y : array-like of shape (k,), optional
        Linear coefficients of confounders in the outcome baseline f_y(X).
    beta_t : array-like of shape (k,), optional
        Linear coefficients of confounders in the treatment score f_t(X) (log-odds scale).
    g_y : callable, optional
        Nonlinear/additive function g_y(X) -> (n,) added to the outcome baseline.
    g_t : callable, optional
        Nonlinear/additive function g_t(X) -> (n,) added to the treatment score.
    alpha_y : float, default=0.0
        Outcome intercept (natural scale for continuous; log-odds for binary; log-mean for Poisson).
    alpha_t : float, default=0.0
        Treatment intercept (log-odds). If `target_t_rate` is set, `alpha_t` is auto-calibrated.
    sigma_y : float, default=1.0
        Std. dev. of the Gaussian noise for continuous outcomes.
    outcome_type : {"continuous","binary","poisson"}, default="continuous"
        Outcome family and link as defined above.
    confounder_specs : list of dict, optional
        Schema for generating confounders. Each spec is one of:
            {"name": str, "dist": "normal", "mu": float, "sd": float}
            {"name": str, "dist": "uniform", "a": float, "b": float}
            {"name": str, "dist": "bernoulli", "p": float}
            {"name": str, "dist": "categorical", "categories": list, "probs": list}
        For "categorical", one-hot encoding is produced for all levels except the first.
    k : int, default=5
        Number of confounders when `confounder_specs` is None. Defaults to independent N(0,1).
    x_sampler : callable, optional
        Custom sampler (n, k, seed) -> X ndarray of shape (n, k). Overrides `confounder_specs` and `k`.
    target_t_rate : float in (0,1), optional
        If set, calibrates `alpha_t` via bisection so that the mean propensity ≈ target_t_rate.
    u_strength_t : float, default=0.0
        Strength of the unobserved confounder U in treatment assignment.
    u_strength_y : float, default=0.0
        Strength of the unobserved confounder U in the outcome.
    seed : int, optional
        Random seed for reproducibility.

    Attributes
    ----------
    rng : numpy.random.Generator
        Internal RNG seeded from `seed`.

    Examples
    --------
    >>> gen = CausalDatasetGenerator(
    ...     theta=2.0,
    ...     beta_y=np.array([1.0, -0.5, 0.2]),
    ...     beta_t=np.array([0.8, 1.2, -0.3]),
    ...     target_t_rate=0.35,
    ...     outcome_type="continuous",
    ...     sigma_y=1.0,
    ...     seed=42,
    ...     confounder_specs=[
    ...         {"name": "age", "dist": "normal", "mu": 50, "sd": 10},
    ...         {"name": "smoker", "dist": "bernoulli", "p": 0.3},
    ...         {"name": "bmi", "dist": "normal", "mu": 27, "sd": 4},
    ...     ])
    >>> df = gen.generate(10_000)
    >>> df.columns
    Index([... 'y','t','age','smoker','bmi','propensity','mu0','mu1','cate'], dtype='object')
    """

    # Core knobs
    theta: float = 1.0  # constant treatment effect (ATE) if tau is None
    tau: Optional[Callable[[np.ndarray], np.ndarray]] = None  # heterogeneous effect tau(X) if provided

    # Confounder -> outcome/treatment effects
    beta_y: Optional[np.ndarray] = None  # shape (k,)
    beta_t: Optional[np.ndarray] = None  # shape (k,)
    g_y: Optional[Callable[[np.ndarray], np.ndarray]] = None  # nonlinear baseline outcome f_y(X)
    g_t: Optional[Callable[[np.ndarray], np.ndarray]] = None  # nonlinear treatment score f_t(X)

    # Outcome/treatment intercepts and noise
    alpha_y: float = 0.0
    alpha_t: float = 0.0
    sigma_y: float = 1.0  # used when outcome_type="continuous"
    outcome_type: str = "continuous"  # "continuous" | "binary" | "poisson"

    # Confounder generation
    confounder_specs: Optional[List[Dict[str, Any]]] = None  # list of {"name","dist",...}
    k: int = 5  # used if confounder_specs is None
    x_sampler: Optional[Callable[[int, int, int], np.ndarray]] = None  # custom sampler (n, k, seed)->X

    # Practical controls
    target_t_rate: Optional[float] = None  # e.g., 0.3 -> ~30% treated; solves for alpha_t
    u_strength_t: float = 0.0  # unobserved confounder effect on treatment
    u_strength_y: float = 0.0  # unobserved confounder effect on outcome
    seed: Optional[int] = None

    # Internals (filled post-init)
    rng: np.random.Generator = field(init=False, repr=False)

    def __post_init__(self):
        self.rng = np.random.default_rng(self.seed)
        if self.confounder_specs is not None:
            self.k = len(self.confounder_specs)

    # ---------- Confounder sampling ----------
    def _sample_X(self, n: int) -> Tuple[np.ndarray, List[str]]:
        if self.x_sampler is not None:
            X = self.x_sampler(n, self.k, self.seed)
            names = [f"x{i+1}" for i in range(self.k)]
            return X, names

        if self.confounder_specs is None:
            # Default: independent standard normals
            X = self.rng.normal(size=(n, self.k))
            names = [f"x{i+1}" for i in range(self.k)]
            return X, names

        cols = []
        names = []
        for spec in self.confounder_specs:
            name = spec.get("name") or f"x{len(names)+1}"
            dist = spec.get("dist", "normal").lower()
            if dist == "normal":
                mu = spec.get("mu", 0.0); sd = spec.get("sd", 1.0)
                col = self.rng.normal(mu, sd, size=n)
            elif dist == "uniform":
                a = spec.get("a", 0.0); b = spec.get("b", 1.0)
                col = self.rng.uniform(a, b, size=n)
            elif dist == "bernoulli":
                p = spec.get("p", 0.5)
                col = self.rng.binomial(1, p, size=n).astype(float)
            elif dist == "categorical":
                categories = spec.get("categories", [0, 1, 2])
                probs = spec.get("probs", None)
                col = self.rng.choice(categories, p=probs, size=n)
                # one-hot encode (except first level)
                oh = [(col == c).astype(float) for c in categories[1:]]
                if not oh:
                    oh = [np.zeros(n)]
                for j, c in enumerate(categories[1:]):
                    cols.append(oh[j])
                    names.append(f"{name}_{c}")
                continue
            else:
                raise ValueError(f"Unknown dist: {dist}")
            cols.append(col.astype(float))
            names.append(name)

        X = np.column_stack(cols) if cols else np.empty((n, 0))
        self.k = X.shape[1]
        return X, names

    # ---------- Helpers ----------
    def _treatment_score(self, X: np.ndarray, U: np.ndarray) -> np.ndarray:
        # Ensure numeric, finite arrays
        Xf = np.asarray(X, dtype=float)
        lin = np.zeros(Xf.shape[0], dtype=float)
        if self.beta_t is not None:
            bt = np.asarray(self.beta_t, dtype=float)
            if bt.ndim != 1:
                bt = bt.reshape(-1)
            if bt.shape[0] != Xf.shape[1]:
                raise ValueError(f"beta_t shape {bt.shape} is incompatible with X shape {Xf.shape}")
            lin += np.sum(Xf * bt, axis=1)
        if self.g_t is not None:
            lin += np.asarray(self.g_t(Xf), dtype=float)
        if self.u_strength_t != 0:
            lin += self.u_strength_t * np.asarray(U, dtype=float)
        return lin

    def _outcome_location(self, X: np.ndarray, T: np.ndarray, U: np.ndarray, tau_x: np.ndarray) -> np.ndarray:
        # location on natural scale for continuous; on logit/log scale for binary/poisson
        Xf = np.asarray(X, dtype=float)
        Tf = np.asarray(T, dtype=float)
        Uf = np.asarray(U, dtype=float)
        taux = np.asarray(tau_x, dtype=float)
        loc = float(self.alpha_y)
        if self.beta_y is not None:
            by = np.asarray(self.beta_y, dtype=float)
            if by.ndim != 1:
                by = by.reshape(-1)
            if by.shape[0] != Xf.shape[1]:
                raise ValueError(f"beta_y shape {by.shape} is incompatible with X shape {Xf.shape}")
            loc += np.sum(Xf * by, axis=1)
        if self.g_y is not None:
            loc += np.asarray(self.g_y(Xf), dtype=float)
        if self.u_strength_y != 0:
            loc += self.u_strength_y * Uf
        loc += Tf * taux
        return loc

    def _calibrate_alpha_t(self, X: np.ndarray, U: np.ndarray, target: float) -> float:
        # Bisection on alpha_t so that mean propensity ~ target
        lo, hi = -50.0, 50.0  # Use a wide range to handle extreme cases
        for _ in range(100):  # Many iterations for better precision
            mid = 0.5 * (lo + hi)
            p = _sigmoid(mid + self._treatment_score(X, U))
            m = p.mean()
            if abs(m - target) < 0.001:
                break
            if m > target:
                hi = mid  # Current rate too high, decrease alpha_t
            else:
                lo = mid  # Current rate too low, increase alpha_t
        return 0.5 * (lo + hi)

    # ---------- Public API ----------
    def generate(self, n: int) -> pd.DataFrame:
        """
        Draw a synthetic dataset of size `n`.

        Parameters
        ----------
        n : int
            Number of observations to simulate.

        Returns
        -------
        pandas.DataFrame
            Columns:
            - y : float
            - t : {0.0, 1.0}
            - <confounder columns> : floats (and one-hot columns for categorical)
            - propensity : float in (0,1), true P(T=1 | X)
            - mu0 : expected outcome under control on the natural scale
            - mu1 : expected outcome under treatment on the natural scale
            - cate : mu1 - mu0 (conditional treatment effect on the natural scale)

        Notes
        -----
        - If `target_t_rate` is set, `alpha_t` is internally recalibrated (bisection) on the
          *current draw of X and U*, so repeated calls can yield slightly different alpha_t
          values even with the same seed unless X and U are fixed.
        - For binary and Poisson outcomes, `cate` is reported on the natural scale
          (probability or mean), even though the structural model is specified on the
          log-odds / log-mean scale.
        """
        X, names = self._sample_X(n)
        U = self.rng.normal(size=n)  # unobserved confounder

        # Treatment assignment
        if self.target_t_rate is not None:
            self.alpha_t = self._calibrate_alpha_t(X, U, self.target_t_rate)
        logits_t = self.alpha_t + self._treatment_score(X, U)
        propensity = _sigmoid(logits_t)
        T = self.rng.binomial(1, propensity).astype(float)

        # Treatment effect (constant or heterogeneous)
        tau_x = (self.tau(X) if self.tau is not None else np.full(n, self.theta)).astype(float)

        # Outcome generation
        loc = self._outcome_location(X, T, U, tau_x)
        if self.outcome_type == "continuous":
            Y = loc + self.rng.normal(0, self.sigma_y, size=n)
            mu0 = self._outcome_location(X, np.zeros(n), U, np.zeros(n))
            mu1 = self._outcome_location(X, np.ones(n), U, tau_x)
        elif self.outcome_type == "binary":
            # logit link: logit P(Y=1|T,X) = loc
            p = _sigmoid(loc)
            Y = self.rng.binomial(1, p).astype(float)
            mu0 = _sigmoid(self._outcome_location(X, np.zeros(n), U, np.zeros(n)))
            mu1 = _sigmoid(self._outcome_location(X, np.ones(n), U, tau_x))
        elif self.outcome_type == "poisson":
            # log link: log E[Y|T,X] = loc
            lam = np.exp(loc)
            Y = self.rng.poisson(lam).astype(float)
            mu0 = np.exp(self._outcome_location(X, np.zeros(n), U, np.zeros(n)))
            mu1 = np.exp(self._outcome_location(X, np.ones(n), U, tau_x))
        else:
            raise ValueError("outcome_type must be 'continuous', 'binary', or 'poisson'")

        df = pd.DataFrame({"y": Y, "t": T})
        for j, name in enumerate(names):
            df[name] = X[:, j]

        # Useful ground-truth columns for evaluation
        df["propensity"] = propensity
        df["mu0"] = mu0
        df["mu1"] = mu1
        df["cate"] = mu1 - mu0
        return df
    def to_causal_data(self, n: int, confounders: Optional[Union[str, List[str]]] = None) -> CausalData:
        """
        Generate a dataset and convert it to a CausalData object.

        Parameters
        ----------
        n : int
            Number of observations to simulate.
        confounders : Union[str, List[str]], optional
            Column name(s) to use as confounders. If None, all confounder columns are used.

        Returns
        -------
        CausalData
            A CausalData object containing the generated data.
        """
        df = self.generate(n)

        # Determine confounders to use
        if confounders is None:
            # Use all confounder columns (exclude y, t, propensity, mu0, mu1, cate)
            all_cols = set(df.columns)
            exclude_cols = {'y', 't', 'propensity', 'mu0', 'mu1', 'cate'}
            confounder_cols = list(all_cols - exclude_cols)
        else:
            confounder_cols = confounders

        # Create and return CausalData object
        return CausalData(df=df, treatment='t', outcome='y', confounders=confounder_cols)
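
# Usage sketch (illustrative, not executed at import): a generator with a heterogeneous
# effect tau(X) that grows with the first confounder, then wrapped into a CausalData object.
# The specific coefficients and sizes below are arbitrary choices for demonstration.
#
#     >>> gen = CausalDatasetGenerator(
#     ...     tau=lambda X: 0.5 + 0.3 * X[:, 0],   # effect depends on x1
#     ...     beta_y=np.array([1.0, -0.5]),
#     ...     beta_t=np.array([0.8, 0.2]),
#     ...     k=2, outcome_type="continuous", target_t_rate=0.4, seed=7,
#     ... )
#     >>> cd = gen.to_causal_data(5_000)           # CausalData with t, y, x1, x2
#     >>> gen.generate(5_000)["cate"].mean()       # ~0.5, since x1 ~ N(0, 1)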