import math
from itertools import combinations
import networkx as nx
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.metrics import f1_score
from tqdm import tqdm
from pgmpy import config
from pgmpy.base import DAG
from pgmpy.models import BayesianNetwork
[docs]
def correlation_score(
model,
data,
test="chi_square",
significance_level=0.05,
score=f1_score,
return_summary=False,
):
"""
Function to score how well the model structure represents the correlations
in the data. The model doesn't need to be parameterized for this score.
A Bayesian Network or DAG has d-connection property which can be used to
determine which variables are correlated according to the model. This
function uses this d-connection/d-separation property to compare the model
with variable correlations in a given dataset. For every pair of variables
in the dataset, a correlation test (specified by `test` argument) is done.
We say that any two variables are correlated if the test's p-value <
significance_level. The same pair of variables are then tested whether they
are d-connected in the network structure or not. Finally, a metric specified
by `score` is computed by using the correlation test as the true value and
d-connections as predicted values.
Absense of correlation/d-separation is considered as the positive class for
computing the metrics.
Parameters
----------
model: Instance of pgmpy.base.DAG or pgmpy.models.BayesianNetwork
The model which needs to be tested.
data: pandas.DataFrame instance
The dataset against which to test the model structure.
test: str or function
The statistical tests to use for determining whether the variables in data
are correlated or not. For discrete variables, the options are: 1) chi_square
2) g_sq 3) log_likelihood 4) freeman_tuckey 5) modified_log_likelihood 6) neyman
7) cressie_read. For continuous variables only one test is available: 1) pearsonr.
A function with the signature fun(X, Y, Z, data) can also be passed which
returns True for uncorrelated and False otherwise.
significance_level: float
A value between 0 and 1. If p_value < significance_level, the variables are
considered uncorrelated.
score: function (default: f1-score)
Any classification scoring metric from scikit-learn.
https://scikit-learn.org/stable/modules/classes.html#classification-metrics
return_summary: boolean (default: False)
If True, returns a dataframe with details for each of the conditions checked.
Returns
-------
The specified metric: float
The metric specified by the `score` argument. By defults returns the f1-score.
Examples
--------
>>> from pgmpy.utils import get_examples_model
>>> from pgmpy.metrics import correlation_score
>>> alarm = get_example_model("alarm")
>>> data = alarm.simulate(int(1e4))
>>> correlation_score(alarm, data, test="chi_square", significance_level=0.05)
0.911957950065703
"""
from pgmpy.estimators.CITests import (
chi_square,
g_sq,
log_likelihood,
modified_log_likelihood,
pearsonr,
pillai_trace,
)
# Step 1: Checks for input arguments.
supported_tests = {
"chi_square": chi_square,
"g_sq": g_sq,
"log_likelihood": log_likelihood,
"modified_log_likelihood": modified_log_likelihood,
"pearsonr": pearsonr,
"pillai": pillai_trace,
}
if not isinstance(model, (DAG, BayesianNetwork)):
raise ValueError(
f"model must be an instance of pgmpy.base.DAG or pgmpy.models.BayesianNetwork. Got {type(model)}"
)
elif not isinstance(data, pd.DataFrame):
raise ValueError(f"data must be a pandas.DataFrame instance. Got {type(data)}")
elif set(model.nodes()) != set(data.columns):
raise ValueError(
f"Missing columns in data. Can't find values for the following variables: { set(model.nodes()) - set(data.columns) }"
)
elif (test not in supported_tests.keys()) and (not callable(test)):
raise ValueError(f"test not supported and not a callable")
elif not callable(score):
raise ValueError(f"score should be scikit-learn classification metric.")
# Step 2: Create a dataframe of every 2 combination of variables
results = []
for i, j in combinations(model.nodes(), 2):
test_result = supported_tests[test](
X=i,
Y=j,
Z=[],
data=data,
boolean=True,
significance_level=significance_level,
)
d_connected = not model.is_dconnected(start=i, end=j)
results.append(
{"var1": i, "var2": j, "stat_test": test_result, "d_connected": d_connected}
)
results = pd.DataFrame(results)
metric = score(
y_true=results["stat_test"].values, y_pred=results["d_connected"].values
)
if return_summary:
return results
else:
return metric
[docs]
def log_likelihood_score(model, data):
"""
Computes the log-likelihood of a given dataset i.e. P(data | model).
The log-likelihood measure can be used to check how well the specified
model describes the data. This method requires the parameters of the model to be
specified as well. Direct interpretation of this score is difficult but can
be used to compare the fit of two or more models. A higher score means ab
better fit.
Parameters
----------
model: pgmpy.base.DAG or pgmpy.models.BayesianNetwork instance
The model whose score needs to be computed.
data: pd.DataFrame instance
The dataset against which to score the model.
Examples
--------
>>> from pgmpy.metrics import log_likelihood_score
>>> from pgmpy.utils import get_example_model
>>> model = get_example_model("alarm")
>>> data = model.simulate(int(1e4))
>>> log_likelihood_score(model, data)
-103818.57516969478
"""
# Step 1: Check the inputs
if not isinstance(model, BayesianNetwork):
raise ValueError(f"Only Bayesian Networks are supported. Got {type(model)}.")
elif not isinstance(data, pd.DataFrame):
raise ValueError(f"data must be a pandas.DataFrame instance. Got {type(data)}")
elif set(model.nodes()) != set(data.columns):
raise ValueError(
f"Missing columns in data. Can't find values for the following variables: { set(model.nodes()) - set(data.columns) }"
)
model.check_model()
# Step 2: Compute the log-likelihood
from pgmpy.metrics import BayesianModelProbability
return BayesianModelProbability(model).score(data)
[docs]
def structure_score(model, data, scoring_method="bic-g", **kwargs):
"""
Uses the standard model scoring methods to give a score for each structure.
The score doesn't have very straight forward interpretebility but can be
used to compare different models. A higher score represents a better fit.
This method only needs the model structure to compute the score and parameters
aren't required.
Parameters
----------
model: pgmpy.base.DAG or pgmpy.models.BayesianNetwork instance
The model whose score needs to be computed.
data: pd.DataFrame instance
The dataset against which to score the model.
scoring_method: str
Options are: k2, bdeu, bds, bic-d, aic-d, ll-g, aic-g, bic-g, ll-cg, aic-cg, bic-cg
kwargs: kwargs
Any additional parameters that needs to be passed to the
scoring method. Check pgmpy.estimators.StructureScore for details.
Returns
-------
Model score: float
A score value for the model.
Examples
--------
>>> from pgmpy.utils import get_example_model
>>> from pgmpy.metrics import structure_score
>>> model = get_example_model('alarm')
>>> data = model.simulate(int(1e4))
>>> structure_score(model, data, scoring_method="bic-g")
-106665.9383064447
"""
from pgmpy.estimators import (
AIC,
BIC,
K2,
AICCondGauss,
AICGauss,
BDeu,
BDs,
BICCondGauss,
BICGauss,
LogLikelihoodCondGauss,
LogLikelihoodGauss,
)
supported_methods = {
"k2": K2,
"bdeu": BDeu,
"bds": BDs,
"bic-d": BIC,
"aic-d": AIC,
"ll-g": LogLikelihoodGauss,
"aic-g": AICGauss,
"bic-g": BICGauss,
"ll-cg": LogLikelihoodCondGauss,
"aic-cg": AICCondGauss,
"bic-cg": BICCondGauss,
}
# Step 1: Test the inputs
if not isinstance(model, (DAG, BayesianNetwork)):
raise ValueError(
f"model must be an instance of pgmpy.base.DAG or pgmpy.models.BayesianNetwork. Got {type(model)}"
)
elif not isinstance(data, pd.DataFrame):
raise ValueError(f"data must be a pandas.DataFrame instance. Got {type(data)}")
elif set(model.nodes()) != set(data.columns):
raise ValueError(
f"Missing columns in data. Can't find values for the following variables: { set(model.nodes()) - set(data.columns) }"
)
elif (scoring_method not in supported_methods.keys()) and (
not callable(scoring_method)
):
raise ValueError(f"scoring method not supported and not a callable")
# Step 2: Compute the score and return
return supported_methods[scoring_method](data, **kwargs).score(model)
[docs]
def implied_cis(model, data, ci_test, show_progress=True):
"""
Tests the implied Conditional Independences (CI) of the DAG in the given data.
Each missing edge in a model structure implies a CI statement. If the
distribution of the data is faithful to the constraints of the model
structure, these CI statements should hold in the data as well. This
function runs statistical tests for each implied CI on the given data.
Parameters
==========
model: pgmpy.base.DAG or pgmpy.models.BayesianNetwork
The model whose structure need to be tested against the given data.
data: pd.DataFrame
Dataset to use for testing.
ci_test: function
The function for statistical test. Can be either any of the tests in
pgmpy.estimators.CITests or any custom function of the same form.
show_progress: bool (default: True)
Whether to show the progress of testing.
Returns
=======
pd.DataFrame: Returns a dataframe with each implied CI of the model and a p-value
corresponding to it from the statistical test. A low p-value (e.g. <0.05)
represents that the CI does not hold in the data.
Examples
========
>>> from pgmpy.utils import get_example_model
>>> from pgmpy.metrics import implied_cis
>>> from pgmpy.estimators.CITests import chi_square
>>> model = get_example_model('cancer')
>>> df = model.simulate(int(1e3))
>>> implied_cis(model=model, data=df, ci_test=chi_square, show_progress=False)
u v cond_vars p-value
0 Pollution Smoker [] 0.189851
1 Pollution Xray [Cancer] 0.404149
2 Pollution Dyspnoea [Cancer] 0.613370
3 Smoker Xray [Cancer] 0.352665
4 Smoker Dyspnoea [Cancer] 1.000000
5 Xray Dyspnoea [Cancer] 0.888619
"""
if not isinstance(model, (DAG, BayesianNetwork)):
raise ValueError(
f"model must be an instance of DAG or BayesianNetwork. Got {type(model)}"
)
cis = []
if show_progress and config.SHOW_PROGRESS:
comb_iter = tqdm(
combinations(model.nodes(), 2), total=math.comb(len(model.nodes()), 2)
)
else:
comb_iter = combinations(model.nodes(), 2)
for u, v in comb_iter:
if not ((u in model[v]) or (v in model[u])):
Z = list(model.minimal_dseparator(u, v))
test_results = ci_test(X=u, Y=v, Z=Z, data=data, boolean=False)
cis.append([u, v, Z, test_results[1]])
cis = pd.DataFrame(cis, columns=["u", "v", "cond_vars", "p-value"])
return cis
[docs]
def fisher_c(model, data, ci_test, show_progress=True):
"""
Returns a p-value for testing whether the given data is faithful to the
model structure's constraints.
Each missing edge in a model structure implies a CI statement. This test
uses constructs implied CIs such that they are independent of each other,
run statistical tests for each of them on the data, and finally combines
them using the Fisher's method.
Parameters
==========
model: pgmpy.base.DAG or pgmpy.models.BayesianNetwork
The model whose structure need to be tested against the given data.
data: pd.DataFrame
Dataset to use for testing.
ci_test: function
The function for statistical test. Can be either any of the tests in
pgmpy.estimators.CITests or any custom function of the same form.
show_progress: bool (default: True)
Whether to show the progress of testing.
Returns
=======
float: The p-value for the fit of the model structure to the data. A low
p-value (e.g. <0.05) represents that the model structure doesn't fit the
data well.
Examples
========
>>> from pgmpy.utils import get_example_model
>>> from pgmpy.metrics import implied_cis
>>> from pgmpy.estimators.CITests import chi_square
>>> model = get_example_model('cancer')
>>> df = model.simulate(int(1e3))
>>> fisher_c(model=model, data=df, ci_test=chi_square, show_progress=False)
0.7504
"""
if not isinstance(model, (DAG, BayesianNetwork)):
raise ValueError(
f"model must be an instance of DAG or BayesianNetwork. Got {type(model)}"
)
if len(model.latents) > 0:
raise ValueError(
f"This test can not be performed on models with latent variables."
)
cis = []
if show_progress and config.SHOW_PROGRESS:
comb_iter = tqdm(
combinations(model.nodes(), 2), total=math.comb(len(model.nodes()), 2)
)
else:
comb_iter = combinations(model.nodes(), 2)
for u, v in comb_iter:
if not ((u in model[v]) or (v in model[u])):
Z = set(model.predecessors(u)).union(model.predecessors(v))
test_results = ci_test(X=u, Y=v, Z=Z, data=data, boolean=False)
cis.append([u, v, Z, test_results[1]])
cis = pd.DataFrame(cis, columns=["u", "v", "cond_vars", "p_value"])
cis.loc[:, "p_value"] = cis.loc[:, "p_value"].clip(lower=1e-6)
C = -2 * np.log(cis.loc[:, "p_value"]).sum()
p_value = 1 - stats.chi2.cdf(C, df=2 * cis.shape[0])
return p_value
[docs]
def SHD(true_model, est_model):
"""
Computes the Structural Hamming Distance between `true_model` and `est_model`.
SHD is defined as total number of basic operations: adding edges, removing
edges, and reversing edges required to transform one graph to the other. It
is a symmetrical measure.
The code first accounts for edges that need to be deleted (from true_model),
added (to true_model) and finally edges that need to be reversed. All operations
count as 1.
Parameters
----------
true_model: pgmpy.base.DAG or pgmpy.base.CPDAG or pgmpy.models.BayesianNetwork
The first model to compare.
est_model: pgmpy.base.DAG or pgmpy.base.CPDAG or pgmpy.models.BayesianNetwork
The second model to compare.
Returns
-------
int:
If both true_model and est_model are DAGs or Bayesian Networks returns
an integer.
Examples
--------
>>> from pgmpy.metrics import SHD
>>> from pgmpy.models import BayesianNetwork
>>> dag1 = BayesianNetwork([(1, 2), (2, 3)])
>>> dag2 = BayesianNetwork([(2, 1), (2, 3)])
>>> SHD(dag1, dag2)
1
"""
if set(true_model.nodes()) != set(est_model.nodes()):
raise ValueError("The graphs must have the same nodes.")
nodes_list = true_model.nodes()
dag_true = nx.DiGraph(true_model.edges())
m1 = nx.adjacency_matrix(dag_true, nodelist=nodes_list).todense()
dag_est = nx.DiGraph(est_model.edges())
m2 = nx.adjacency_matrix(dag_est, nodelist=nodes_list).todense()
shd = 0
s1 = m1 + m1.T
s2 = m2 + m2.T
# Edges that are in m1 but not in m2 (deletions from m1)
ds = s1 - s2
ind = np.where(ds > 0)
m1[ind] = 0
shd = shd + (len(ind[0]) / 2)
# Edges that are in m2 but not in m1 (additions to m1)
ind = np.where(ds < 0)
m1[ind] = m2[ind]
shd = shd + (len(ind[0]) / 2)
# Edges that need to be simply reversed
d = np.abs(m1 - m2)
shd = shd + (np.sum((d + d.T) > 0) / 2)
return int(shd)