# Source code for pgmpy.prediction.DoubleMLRegressor

from typing import Any

import numpy as np
import pandas as pd
from sklearn.base import clone
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import KFold
from sklearn.utils.validation import check_is_fitted, validate_data

from pgmpy.prediction._base import _BaseCausalPrediction


class DoubleMLRegressor(_BaseCausalPrediction):
    """
    Implements the Double Machine Learning Regressor[1] (DML2) with cross-fitting.

    This estimator implements the DoubleML algorithm with cross-fitting in a
    scikit-learn compatible estimator API. It uses user-specified causal
    graphs to extract exposure, outcome, and adjustment variables and uses
    that to fit/predict a Double ML regressor.

    The model is defined as follows: Given data D: (Y, T, X), where:

        Y : outcome variable
        T : treatment (exposure) variable
        X : adjustment (confounder + pretreatment) variables

    The DoubleML fitting procedure consists of four main steps:

    1. Sample splitting into `n_folds` folds for cross-fitting.
    2. Fitting two nuisance estimators on each fold:
        - Outcome Models (`outcome_est_`): Predict Y using X.
        - Treatment Models (`treatment_est_`): Predict T from X.
    3. Computing residuals using nuisance estimators on each fold.
        - Outcome residuals: `Y - outcome_est_.predict(X)`
        - Treatment residuals: `T - treatment_est_.predict(X)`
    4. Stacking the residuals from the folds together and fitting the effect
       estimator (`effect_est_`) to predict the outcome residuals from the
       treatment residuals.

    Using the fitted models, predictions on new data `(X_new, T_new)` are
    computed as:

        `res_T_new = T_new - treatment_est_.predict(X_new)`
        `Y_pred = effect_est_(res_T_new) + outcome_est_.predict(X_new)`

    Parameters
    ----------
    causal_graph : DAG, PDAG, ADMG, MAG, or PAG
        Causal graph with defined variable roles. The causal graph must have
        the following roles: `exposures`, `outcomes`, and `adjustment`.
        Additionally, `pretreatment` can be specified.

    nuisance_estimators : an estimator or a tuple of estimators of size 2 (default=LinearRegression)
        If a single estimator is provided, it is used for both outcome and
        treatment nuisance models. If a tuple of two estimators is provided,
        the first one is used for the treatment model and the second for the
        outcome model. If None, defaults to LinearRegression for both models.

    effect_estimator : estimator-like (default=LinearRegression)
        Estimator for the final effect estimation step. Must have a `fit`
        method and a `predict` method. If None, defaults to LinearRegression.

    n_folds : int, default=5
        Number of folds to use for cross-fitting. If 1, doesn't perform
        cross-fitting and computes in-sample residuals.

    seed : int or None
        Random seed for cross-fitting splits.

    Attributes
    ----------
    n_folds_ : int
        Number of folds used in cross-fitting.

    n_features_in_ : int
        Number of features seen during fit.

    n_samples_ : int
        Number of samples seen during fit.

    exposure_var_ : str
        Name of the exposure (treatment) variable.

    outcome_var_ : str
        Name of the outcome variable.

    adjustment_vars_ : list of str
        Names of adjustment (confounder) variables.

    pretreatment_vars_ : list of str
        Names of pretreatment variables.

    feature_columns_fit_ : list of str
        Names of features used in the model.

    outcome_est_ : estimator-like or list of estimator-like
        Fitted outcome nuisance model(s).

    treatment_est_ : estimator-like or list of estimator-like
        Fitted treatment nuisance model(s).

    effect_est_ : estimator-like
        Fitted final effect estimator.

    Examples
    --------
    >>> # Example 1: With adjustments and cross-fitting
    >>> import numpy as np
    >>> import pandas as pd
    >>> from sklearn.linear_model import LinearRegression
    >>> from pgmpy.base.DAG import DAG
    >>> from pgmpy.prediction import DoubleMLRegressor
    >>> # Simulate data from a linear Gaussian BN that we use to estimate the causal effect from.
    >>> lgbn = DAG.from_dagitty(
    ...     "dag { X -> T [beta=0.2] X -> Y [beta=0.3] T -> Y [beta=0.4] }"
    ... )
    >>> data = lgbn.simulate(n_samples=1000, seed=42)
    >>> X = data.loc[:, ["X", "T"]]
    >>> y = data["Y"]
    >>> # construct a DAG (roles must match DataFrame column names)
    >>> dag = DAG(
    ...     lgbn.edges(), roles={"exposures": "T", "adjustment": "X", "outcomes": "Y"}
    ... )
    >>> dml = DoubleMLRegressor(
    ...     causal_graph=dag,
    ...     nuisance_estimators=LinearRegression(),
    ...     effect_estimator=LinearRegression(),
    ...     n_folds=3,
    ... )
    >>> dml = dml.fit(X, y)
    >>> dml.effect_est_
    LinearRegression()
    >>> dml.effect_est_.coef_.round(1)
    array([0.4])
    >>> preds = dml.predict(X.iloc[:5])
    >>> preds.shape
    (5,)
    >>> dml.n_folds_
    3
    >>> dml.n_samples_
    1000

    Notes
    -----
    While the implementations allows the effect estimator to be any sklearn
    compatible estimator, the theoretical guarantees for DoubleML hold when
    the effect estimator is a linear model (such as LinearRegression). Using
    non-linear effect estimators may lead to biased estimates.

    References
    ----------
    .. [1] Chernozhukov, V., Chetverikov, D., Demirer, M., Duflo, E., Hansen,
       C., Newey, W., & Robins, J. (2018). Double/debiased machine learning
       for treatment and structural parameters. The Econometrics Journal,
       21(1), C1-C68.
    """

    def __init__(
        self,
        causal_graph,
        nuisance_estimators=None,
        effect_estimator=None,
        n_folds: int = 5,
        seed: int | None = None,
    ):
        # Per the sklearn estimator contract, __init__ only stores the
        # hyperparameters unchanged; all validation/cloning happens in `fit`.
        self.causal_graph = causal_graph
        self.nuisance_estimators = nuisance_estimators
        self.effect_estimator = effect_estimator
        self.n_folds = n_folds
        self.seed = seed
[docs] def fit(self, X, y, sample_weight: Any | None = None): """ Fit the DoubleML model using the provided data. Parameters ---------- X : pandas.DataFrame or numpy.ndarray Feature data containing exposure and adjustment variables. If a numpy array is provided, it is converted to a dataframe with column names starting from 0. y : pandas.Series, pandas.DataFrame, or numpy.ndarray Outcome variable. If a DataFrame is provided, it must have a single column. sample_weight : array-like of shape (n_samples,), optional Sample weights to be used in fitting the nuisance and effect estimators. Returns ------- self : object Fitted estimator. """ # Step 0: Validate inputs # Step 0.1: Check `nuisance_estimators`, `effect_estimator`, and assign variables. if self.nuisance_estimators is None: treatment_est = LinearRegression() outcome_est = LinearRegression() elif isinstance(self.nuisance_estimators, tuple): if len(self.nuisance_estimators) != 2: raise ValueError("If nuisance_estimators is a tuple, it must have exactly two elements.") treatment_est = clone(self.nuisance_estimators[0]) outcome_est = clone(self.nuisance_estimators[1]) else: treatment_est = clone(self.nuisance_estimators) outcome_est = clone(self.nuisance_estimators) if self.effect_estimator is None: effect_est = LinearRegression() else: effect_est = clone(self.effect_estimator) # Step 0.2: Validate `n_folds` if (not isinstance(self.n_folds, int)) or (self.n_folds < 1): raise ValueError("n_folds must be an integer >= 1 ") self.n_folds_ = self.n_folds # Step 0.3: Validate `X` and `y` validate_data(self, X, y, accept_sparse=False, ensure_2d=True, dtype="numeric") # Step 0.4: Validate single exposure and outcome. exposure_vars = self.causal_graph.get_role("exposures") outcome_vars = self.causal_graph.get_role("outcomes") if len(exposure_vars) != 1: raise ValueError(f"DoubleMLRegressor only supports a single exposure variable. 
Got: {len(exposure_vars)}") if len(outcome_vars) != 1: raise ValueError(f"DoubleMLRegressor only supports a single outcome variable. Got: {len(outcome_vars)}") # Step 0.5: Check if n_folds is greater than n_samples. if self.n_folds_ > np.asarray(X).shape[0]: raise ValueError("The number of folds specified is greater than the number of samples.") # Step 1: Initialize data structures and read roles from DAG. # Step 1.1: Get roles from the causal graph and assign to attributes. self.exposure_var_ = exposure_vars[0] self.outcome_var_ = outcome_vars[0] self.adjustment_vars_ = self.causal_graph.get_role("adjustment") self.pretreatment_vars_ = self.causal_graph.get_role("pretreatment") self.feature_columns_fit_ = [self.exposure_var_] + self.adjustment_vars_ + self.pretreatment_vars_ # Step 1.2: Prepare feature dataframe and sample weights. df = self._prepare_feature_df(X, required_features=self.feature_columns_fit_) df.insert(0, self.outcome_var_, np.asarray(y)) exposure_df = df[self.exposure_var_] outcome_df = df[self.outcome_var_] self.n_samples_ = df.shape[0] if sample_weight is None: sample_weight = np.ones(self.n_samples_) # Step 2: Prepare covariate dataframe. If no adjustment or pretreatment # variables, use intercept only. if len(self.adjustment_vars_ + self.pretreatment_vars_) == 0: covariates_df = pd.DataFrame({"_intercept": np.ones(self.n_samples_)}, index=df.index) else: covariates_df = df[self.adjustment_vars_ + self.pretreatment_vars_] # Step 3: Fit nuisance models # Step 3.1: If n_folds = 1, fit nuisance models on full data and # compute in-sample predictions. 
self.outcome_est_ = [] self.treatment_est_ = [] if int(self.n_folds) == 1: outcome_est.fit(covariates_df, outcome_df, sample_weight=sample_weight) outcome_pred = outcome_est.predict(covariates_df) treatment_est.fit(covariates_df, exposure_df, sample_weight=sample_weight) treatment_pred = treatment_est.predict(covariates_df) self.outcome_est_.append(outcome_est) self.treatment_est_.append(treatment_est) # Step 3.2: If n_folds > 1, perform cross-fitting and compute out-of-sample predictions. else: splitter = KFold(n_splits=self.n_folds, shuffle=True, random_state=self.seed) outcome_pred = pd.Series(0.0, index=df.index) treatment_pred = pd.Series(0.0, index=df.index) for train_idx, test_idx in splitter.split(covariates_df, exposure_df): outcome_est_kfold = clone(outcome_est) outcome_est_kfold.fit( covariates_df.iloc[train_idx], outcome_df.iloc[train_idx], sample_weight=sample_weight[train_idx], ) outcome_pred.iloc[test_idx] = outcome_est_kfold.predict(covariates_df.iloc[test_idx]) self.outcome_est_.append(outcome_est_kfold) treatment_est_kfold = clone(treatment_est) treatment_est_kfold.fit( covariates_df.iloc[train_idx], exposure_df.iloc[train_idx], sample_weight=sample_weight[train_idx], ) treatment_pred.iloc[test_idx] = treatment_est_kfold.predict(covariates_df.iloc[test_idx]) self.treatment_est_.append(treatment_est_kfold) # Step 4: Compute the residuals. outcome_res = outcome_df - outcome_pred treatment_res = exposure_df - treatment_pred # Step 5: Fit the final effect estimator on the residuals. effect_est.fit(treatment_res.to_frame(), outcome_res, sample_weight=sample_weight) self.effect_est_ = effect_est return self
[docs] def predict(self, X): """ Makes conditional interventional (CATE) predictions using the fitted DoubleML model. Parameters ---------- X : pandas.DataFrame Feature data containing data for exposure and adjustment variables for which to make predictions. Returns ------- outcome_pred : numpy.ndarray Predicted outcome values. """ # Step 0: Validate inputs and check if fitted check_is_fitted(self, "effect_est_") check_is_fitted(self, "outcome_est_") check_is_fitted(self, "treatment_est_") validate_data(self, X, accept_sparse=False, ensure_2d=True, dtype="numeric", reset=False) # Step 1: Prepare feature DataFrame X_df = self._prepare_feature_df(X, required_features=self.feature_columns_fit_) X_intervention = X_df.loc[:, [self.exposure_var_]] if len(self.adjustment_vars_ + self.pretreatment_vars_) == 0: X_new_covariates = pd.DataFrame({"_intercept": np.ones(X_df.shape[0])}, index=X_df.index) else: X_new_covariates = X_df[self.adjustment_vars_ + self.pretreatment_vars_] # Step 2: Compute and return predictions. Average the predictions from # each fold's nuisance model to compute the outcome prediction. outcome_preds = np.column_stack([est.predict(X_new_covariates) for est in self.outcome_est_]) outcome_pred_mean = np.mean(outcome_preds, axis=1) outcome_pred = self.effect_est_.predict(X_intervention) + outcome_pred_mean return outcome_pred