Source code for pgmpy.metrics.fisher_c
import math
from itertools import combinations
import numpy as np
import pandas as pd
from scipy import stats
from tqdm import tqdm
from pgmpy.base import DAG
from pgmpy.ci_tests import get_ci_test
from pgmpy.global_vars import config
from pgmpy.metrics import _BaseUnsupervisedMetric
[docs]
class FisherC(_BaseUnsupervisedMetric):
"""
Returns a p-value for testing whether the given data is faithful to the
model structure's constraints.
Each missing edge in a model structure implies a CI statement. This test
uses constructs implied CIs such that they are independent of each other,
run statistical tests for each of them on the data, and finally combines
them using the Fisher's method.
Parameters
----------
ci_test: str or callable
The CI test to use for statistical testing. Can be a string name of any test
in :mod:`pgmpy.ci_tests` (e.g. ``"chi_square"``, ``"pearsonr"``) or a callable.
compute_rmsea: bool (default: False)
While calculating Fisher C statistic if RMSEA value required should be
included in method call as True. Returns a tuple of (p-value, rmsea) if
True otherwise only the p-value.
show_progress: bool (default: True)
Whether to show the progress of testing.
Returns
-------
float (default): The p-value for the fit of the model structure to the data. A low
p-value (e.g. <0.05) represents that the model structure doesn't fit the
data well. This is returned if the compute_rmsea parameter is False.
tuple: A (float, float) tuple packing p-value and rmsea value is returned if RMSEA
computation is necessary, i.e., compute_rmsea is True in the method call
Examples
--------
>>> from pgmpy.example_models import load_model
>>> model = load_model("bnlearn/cancer")
>>> df = model.simulate(int(1e3))
>>> fisher_c = FisherC(ci_test="chi_square", compute_rmsea=False)
>>> fisher_c(X=df, causal_graph=model)
0.7504
"""
_tags = {
"name": "fisher_c",
"requires_true_graph": False,
"requires_data": True,
"lower_is_better": False,
"supported_graph_types": (DAG,),
"is_default": False,
}
def __init__(self, ci_test=None, compute_rmsea=False, show_progress=True):
self.ci_test = ci_test
self.compute_rmsea = compute_rmsea
self.show_progress = show_progress
def _evaluate(self, X, causal_graph):
if len(causal_graph.latents) > 0:
raise ValueError("This test can not be performed on models with latent variables.")
cis = []
ci_test = get_ci_test(test=self.ci_test, data=X)
if self.show_progress and config.SHOW_PROGRESS:
comb_iter = tqdm(
combinations(causal_graph.nodes(), 2),
total=math.comb(len(causal_graph.nodes()), 2),
)
else:
comb_iter = combinations(causal_graph.nodes(), 2)
for u, v in comb_iter:
if not ((u in causal_graph[v]) or (v in causal_graph[u])):
Z = set(causal_graph.predecessors(u)).union(causal_graph.predecessors(v))
ci_test.is_independent(X=u, Y=v, Z=list(Z))
cis.append([u, v, Z, ci_test.p_value_])
cis = pd.DataFrame(cis, columns=["u", "v", "cond_vars", "p_value"])
cis.loc[:, "p_value"] = cis.loc[:, "p_value"].clip(lower=1e-6)
C = -2 * np.log(cis.loc[:, "p_value"]).sum()
p_value = 1 - stats.chi2.cdf(C, df=2 * cis.shape[0])
rmsea = np.nan
if self.compute_rmsea:
if len(X) != 1 and len(cis) != 0:
rmsea = np.sqrt(max((C - 2 * len(cis)) / (2 * len(cis) * (len(X) - 1)), 0))
return (p_value, rmsea)
else:
return p_value