from typing import Any
import pandas as pd
from sklearn.base import clone
from sklearn.linear_model import LinearRegression
from sklearn.utils.validation import check_is_fitted, validate_data
from pgmpy.prediction._base import _BaseCausalPrediction
[docs]
class NaiveIVRegressor(_BaseCausalPrediction):
"""
Implements Naive Instrumental Variable (IV) regressor (single exposure, multiple instruments).
This estimator implements a simple two-stage least squares style procedure
for the case of a single exposure and a single outcome with one or more
instrumental variables. The first stage fits `exposure ~ instrument`
using `stage1_estimator`. The second stage fits
`outcome ~ predicted_exposure (+ pretreatment covariates)` using `stage2_estimator`.
Parameters
----------
causal_graph : DAG, PDAG, ADMG, MAG, or PAG
Causal graph with defined variable roles
stage1_estimator : optional, sklearn regressor (default = LinearRegression())
Estimator for stage 1 regression of exposure on instrument(s)
stage2_estimator : optional, sklearn regressor (default = LinearRegression())
Estimator for stage 2 regression of outcome on predicted exposure and pretreatment covariates (if any).
Attributes
----------
exposure_var_ : str
Name of the exposure variable (single).
outcome_var_ : str
Name of the outcome variable (single).
instrument_vars_ : list of str
Names of instrument variables extracted from the causal graph
pretreatment_vars_ : list of str
Names of pretreatment covariates extracted from the causal graph.
feature_columns_fit_ : list of str
Names of features used during 'fit'
feature_columns_predict_ : list of str
Names of features used during `predict`.
stage1_est_ : estimator
Fitted first-stage estimator.
stage2_est_ : estimator
Fitted second-stage estimator.
coef_ : array-like
Coefficients from the fitted `stage2_estimator` (if available).
Examples
--------
>>> # Example 1: Basic usage with LinearRegression estimators
>>> import pandas as pd
>>> from pgmpy.base import DAG
>>> from sklearn.linear_model import LinearRegression
>>> from pgmpy.prediction import NaiveIVRegressor
>>>
>>> # Simulate data from a linear Gaussian Bayesian network
>>> lgbn = DAG.from_dagitty(
... "dag { Z1 -> X [beta=0.2] Z2 -> X [beta=0.2] X -> Y [beta=0.3] }"
... )
>>> data = lgbn.simulate(1000, seed=42) # returns a pandas DataFrame
>>> df = data.loc[:, ["X", "Z1", "Z2"]]
>>> df = (df - df.mean(axis=0)) / df.std(axis=0)
>>> y = data["Y"]
>>> G = DAG(
... lgbn.edges(),
... roles={"exposures": "X", "instrument": ("Z1", "Z2"), "outcomes": "Y"},
... )
>>>
>>> model = NaiveIVRegressor(
... causal_graph=G,
... stage1_estimator=LinearRegression(),
... stage2_estimator=LinearRegression(),
... )
>>> # Fit the model and make predictions
>>> _ = model.fit(df, y)
>>> preds = model.predict(df)
>>> preds.shape[0]
1000
>>> # Example 2: Usage with multiple instruments and pretreatment
>>> import pandas as pd
>>> from pgmpy.base import DAG
>>> from sklearn.linear_model import LinearRegression
>>> from pgmpy.prediction import NaiveIVRegressor
>>>
>>> # Simulate data from a linear Gaussian Bayesian Network
>>> lgbn = DAG.from_dagitty(
... "dag { U1 -> X [beta=0.3] U2 -> X [beta=0.2] U3 -> X [beta=0.1] "
... "U4 -> X [beta=0.2] X -> Y [beta=0.6] P -> Y [beta=0.2] }"
... )
>>> data = lgbn.simulate(300, seed=42)
>>> df = data.loc[:, ["X", "U1", "U2", "U3", "P"]]
>>>
>>> dag = DAG(
... ebunch=[
... ("U1", "X"),
... ("U2", "X"),
... ("U3", "X"),
... ("U4", "X"),
... ("X", "Y"),
... ("P", "Y"),
... ],
... roles={
... "exposures": "X",
... "instrument": ("U1", "U2", "U3"),
... "outcomes": "Y",
... "pretreatment": ["P"],
... },
... )
>>> model = NaiveIVRegressor(
... causal_graph=dag,
... )
>>>
>>> # Fit the model and make predictions
>>> _ = model.fit(df, data["Y"])
>>> preds = model.predict(df)
>>> preds.shape[0]
300
>>> # Example 3: Usage with custom estimators and numpy array inputs
>>> import pandas as pd
>>> import numpy as np
>>> from pgmpy.base import DAG
>>> from sklearn.linear_model import LinearRegression
>>> from sklearn.ensemble import RandomForestRegressor
>>> from pgmpy.prediction import NaiveIVRegressor
>>>
>>> dag = DAG(
... ebunch=[(1, 0), (0, 2)],
... roles={"exposures": [0], "outcomes": [2], "instrument": [1]},
... )
>>> model = NaiveIVRegressor(
... causal_graph=dag,
... stage1_estimator=RandomForestRegressor(),
... stage2_estimator=LinearRegression(),
... )
>>>
>>> # Simulate some random data
>>> n_samples = 50
>>> X_array = np.random.normal(0, 1, (n_samples, 2))
>>> y_array = np.random.normal(0, 1, n_samples)
>>>
>>> # Fit the model and make predictions
>>> _ = model.fit(X_array, y_array)
>>> preds = model.predict(X_array)
>>> preds.shape[0]
50
References
----------
.. [1] “Instrumental Variables Estimation.”
Wikipedia: https://en.wikipedia.org/wiki/Instrumental_variables_estimation
"""
def __init__(
self,
causal_graph,
stage1_estimator: Any | None = None,
stage2_estimator: Any | None = None,
):
self.causal_graph = causal_graph
self.stage1_estimator = stage1_estimator
self.stage2_estimator = stage2_estimator
[docs]
def fit(self, X, y, sample_weight: Any | None = None):
"""
This method performs two-stage least squares regression using the specified causal graph.
It first fits the stage 1 estimator to predict the exposure variable from the instrument,
then fits the stage 2 estimator to predict the outcome variable from the predicted exposure
and pretreatment variables.
Parameters
----------
X : pandas.DataFrame or numpy ndarray
Feature data containing exposure, instrument, and pretreatment variables.
y : pandas.Series, pandas.DataFrame, or numpy.ndarray
Outcome variable.
sample_weight : array-like, optional
Sample weights for fitting the estimators.
Returns
-------
self : object
Fitted estimator.
"""
# Step 0: validate Inputs
validate_data(
self,
X,
y,
accept_sparse=False,
ensure_2d=True,
ensure_min_features=2,
dtype="numeric",
)
# Step 1: Initialize data structures and read roles from DAG.
if self.stage1_estimator is None:
self.stage1_estimator = LinearRegression()
if self.stage2_estimator is None:
self.stage2_estimator = LinearRegression()
stage1_estimator = clone(self.stage1_estimator)
stage2_estimator = clone(self.stage2_estimator)
# Step 1.1: Get roles from the causal graph and assign to attributes.
exposure_vars = self.causal_graph.get_role("exposures")
outcome_vars = self.causal_graph.get_role("outcomes")
instrument_vars = self.causal_graph.get_role("instrument")
# Step 1.2: Validate that exactly one exposure, one outcome and atleast one instrument are specified.
if len(exposure_vars) != 1:
raise ValueError(f"The current implementation only works for a single exposure; got {len(exposure_vars)}")
if len(outcome_vars) != 1:
raise ValueError(f"The current implementation only works for a single outcome; got {len(outcome_vars)}")
if len(instrument_vars) < 1:
raise ValueError("NaiveIVRegressor requires at least one instrument.")
self.exposure_var_ = exposure_vars[0]
self.outcome_var_ = outcome_vars[0]
self.instrument_vars_ = instrument_vars
self.pretreatment_vars_ = self.causal_graph.get_role("pretreatment")
self.feature_columns_fit_ = [self.exposure_var_] + self.instrument_vars_ + self.pretreatment_vars_
# Step 1.2: Prepare feature dataframes and sample weights
df = self._prepare_feature_df(X, required_features=self.feature_columns_fit_)
self.feature_columns_predict_ = [self.exposure_var_] + self.pretreatment_vars_
exposure_df = df[self.exposure_var_]
instrument_df = df[self.instrument_vars_]
pretreatment_df = df[self.pretreatment_vars_]
# Step 2: fit stage1: E ~ Z
stage1_estimator.fit(instrument_df, exposure_df, sample_weight=sample_weight)
t_hat = stage1_estimator.predict(instrument_df)
# Step 2.1: fit stage2: Y ~ t_hat + X
t_hat_2d = pd.DataFrame(t_hat.reshape(-1, 1), columns=[self.exposure_var_])
covariates_df = pd.concat([t_hat_2d, pretreatment_df], axis=1)
stage2_estimator.fit(covariates_df, y, sample_weight=sample_weight)
# step 3: Store fitted estimators and coefficients
self.stage1_est_ = stage1_estimator
self.stage2_est_ = stage2_estimator
self.coef_ = self.stage2_est_.coef_
return self
[docs]
def predict(self, X):
# Step 0: Validate Inputs and check if fit has been called
check_is_fitted(self, "stage1_est_")
check_is_fitted(self, "stage2_est_")
validate_data(self, X, accept_sparse=False, ensure_2d=True, dtype="numeric", reset=False)
# Step 1: Prepare feature DataFrame for prediction
X_df = self._prepare_feature_df(X, required_features=self.feature_columns_predict_)
exposure = X_df[self.exposure_var_]
pre_treatment = X_df[self.pretreatment_vars_]
# Step 2: Predict using stage2 estimator
y_pred = self.stage2_est_.predict(pd.concat([exposure, pre_treatment], axis=1))
return y_pred