# Source code for pgmpy.ci_tests.gcm
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.base import clone
from sklearn.linear_model import LinearRegression
from ._base import _BaseCITest
class GCM(_BaseCITest):
    r"""
    Generalized Covariance Measure (GCM) [1] test for conditional independence.

    Regress :math:`X` on :math:`[1, Z]` and :math:`Y` on :math:`[1, Z]`, let
    :math:`r_X` and :math:`r_Y` denote the resulting residuals, and define
    :math:`U_i = r_{X, i} r_{Y, i}`. The resulting test statistic is

    .. math::
        T = \frac{1}{\sqrt{n}} \frac{\sum_{i=1}^n U_i}{\operatorname{std}(U_1, \ldots, U_n)},

    where :math:`n` is the sample size. Under the null hypothesis
    :math:`X \perp Y \mid Z`, this statistic is asymptotically standard normal.

    Parameters
    ----------
    data : pandas.DataFrame
        The dataset in which to test the independence condition.

    estimator : optional (default=None)
        Any regressor with fit and predict methods to compute residuals. If
        None, LinearRegression() is used as default.

    Attributes
    ----------
    statistic_ : float
        The GCM test statistic. Set after calling the test.

    p_value_ : float
        The p-value for the test. Set after calling the test.

    References
    ----------
    .. [1] Rajen D. Shah, and Jonas Peters. "The Hardness of Conditional
       Independence Testing and the Generalised Covariance Measure".
    """

    _tags = {
        "name": "gcm",
        "data_types": ("continuous",),
        "default_for": None,
        "requires_data": True,
    }

    def __init__(self, data: pd.DataFrame, estimator=None):
        self.data = data
        if estimator is None:
            self.estimator = LinearRegression()
        else:
            # The estimator must be scikit-learn compatible: fit/predict to
            # compute residuals, get_params/set_params so `sklearn.base.clone`
            # can produce fresh unfitted copies on every run_test call.
            required_methods = ["fit", "predict", "get_params", "set_params"]
            if not all(hasattr(estimator, method) for method in required_methods):
                # BUG FIX: the original passed two separate strings to
                # ValueError (stray comma), yielding a tuple-valued message.
                raise ValueError(
                    "`estimator` must be scikit-learn compatible. It must "
                    "have fit, predict methods and be clonable."
                )
            self.estimator = estimator
        super().__init__()

    def run_test(
        self,
        X: str,
        Y: str,
        Z: list,
    ):
        """
        Compute GCM statistic and p-value.

        Sets ``self.statistic_`` (t-statistic) and ``self.p_value_``.

        Parameters
        ----------
        X, Y : str
            Column names of the two variables to test.

        Z : list
            Column names of the conditioning set. May be empty; an intercept
            column is always appended so the regression is well-defined.

        Returns
        -------
        tuple (statistic, p_value)
            Also stored on ``self.statistic_`` and ``self.p_value_``.
        """
        # Step 1: Append intercept column to ensure the design matrix is
        # never empty, even when Z == [].
        Z_data = np.column_stack(
            [self.data.loc[:, list(Z)].to_numpy(), np.ones(self.data.shape[0])]
        )

        # Step 2: Compute residuals using fresh clones of the estimator so
        # repeated calls never reuse an already-fitted model.
        est_x = clone(self.estimator)
        est_y = clone(self.estimator)
        est_x.fit(Z_data, self.data.loc[:, X])
        est_y.fit(Z_data, self.data.loc[:, Y])
        res_x = self.data.loc[:, X].to_numpy() - est_x.predict(Z_data)
        res_y = self.data.loc[:, Y].to_numpy() - est_y.predict(Z_data)

        # Step 3: Compute the Generalised Covariance Measure. np.std uses the
        # population (ddof=0) normalisation, matching the statistic's
        # definition in the class docstring.
        U = res_x * res_y
        n = U.shape[0]
        t_stat = np.sum(U) / (np.sqrt(n) * np.std(U))

        # Step 4: Two-sided p-value from the asymptotic N(0, 1) null.
        p_value = 2 * stats.norm.sf(np.abs(t_stat))
        self.statistic_ = t_stat
        self.p_value_ = p_value
        return self.statistic_, self.p_value_