from collections.abc import Callable, Hashable
from itertools import permutations
import networkx as nx
import pandas as pd
from pgmpy.base import PDAG, UndirectedGraph
from pgmpy.causal_discovery import ExpertKnowledge
from pgmpy.causal_discovery._base import _BaseCausalDiscovery, _ConstraintMixin
from pgmpy.ci_tests import get_ci_test
[docs]
class PC(_ConstraintMixin, _BaseCausalDiscovery):
"""
The PC algorithm for causal discovery / structure learning.
This class implements the PC algorithm [1]_ for causal discovery. Given a
tabular dataset, the PC algorithm estimates the causal structure among the
variables in the data in a Directed Acyclic Graph (DAG) or Partially
Directed Acyclic Graph (PDAG). The algorithm works by identifying
(conditional) dependencies in data set using statistical independence tests
and estimates a DAG pattern that satisfies the identified dependencies.
When used with expert knowledge, the following flowchart can help you figure
out the expected results based on different choices of parameters and the
structure learned from the data.
┌──────────────────┐ No ┌─────────────┐
│ Expert Knowledge ├──────────► │ Normal PC │
│ specified? │ │ run │
└────────┬─────────┘ └─────────────┘
│
Yes │
│
▼
┌──────────────────┐
│ Enforce expert │
│ knowledge? │
└────────┬─────────┘
│
│
Yes │ No
┌─────────────────────────┴───────────────────────┐
│ │
▼ ▼
┌──────────────────────────────┐ ┌─────────────────────────┐
│ │ │ │
│ 1) Forbidden edges are │ │ Conflicts with learned │
│ removed from the skeleton │ │ structure (opposite │
│ │ │ edge orientations)? │
│ 2) Required edges will be │ │ │
│ present in the final │ └───────────┬─────────────┘
│ model (but direction is │ │
│ not guaranteed) │ ┌────────────────┴──────────────────┐
│ │ Yes │ │ No
└──────────────────────────────┘ │ │
▼ ▼
┌───────────────────┐ ┌──────────────────┐
│ Conflicting edges │ │ Expert knowledge │
│ are ignored │ │ applied fully │
└───────────────────┘ └──────────────────┘
Parameters
----------
variant: str, default="parallel"
The variant of PC algorithm to run.
- "orig": The original PC algorithm. Might not give the same results in different runs but does less
independence tests compared to stable.
- "stable": Gives the same result in every run but does needs to do more statistical independence tests.
- "parallel": Parallel version of PC Stable. Can run on multiple cores with the same result on each run. The
parallel version would be faster only on datasets with large number of variables or samples. For smaller
datasets, it might be slower due to the overhead of managing multiple processes.
ci_test : str or callable, default=None
The conditional independence (CI) test to use for finding (conditional) independences in the data. This can be
any of the CI test implemented in :mod:`pgmpy.ci_tests` or a custom function that follows the
signature of the built-in CI tests.
If None, the appropriate CI test will be chosen based on the data type.
return_type : str, default="pdag"
The type of structure to return. Can be one of: `pdag`, `cpdag`, `dag`.
- If `return_type=pdag` or `return_type=cpdag`: a partially directed structure is returned.
- If `return_type=dag`, a fully directed structure is returned. This DAG is one of the possible orientations of
the PDAG learned by the PC algorithm.
significance_level : float, default=0.01
The p-value threshold to use for the statistical independence tests. If the p-value of a test is greater than
`significance_level`, then the variables are considered independent.
max_cond_vars : int, default=5
The maximum number conditional variables to consider while performing conditional independence tests.
expert_knowledge : :class:`pgmpy.estimators.ExpertKnowledge`, optional
Expert knowledge to be used in the causal graph construction. This needs to be an instance of
:class:`pgmpy.estimators.ExpertKnowledge`. Users can specify knowledge in the form of required/forbidden edges,
temporal information, or restrict the search space.
enforce_expert_knowledge : bool, default=False
If True, the expert knowledge will be strictly enforced. This implies the following:
- For every edge (u, v) specified in `forbidden_edges`, there will be no edge between u and v.
- For every edge (u, v) specified in `required_edges`, one of the following would be present in the final model:
u -> v, u <- v, or u - v (if CPDAG is returned).
If False, the algorithm attempts to make the edge orientations as specified by expert knowledge after learning
the skeleton. This implies the following:
- For every edge (u, v) specified in `forbidden_edges`, the final graph would have either v <- u or no edge
except if u -> v is part of a collider structure in the learned skeleton.
- For every edge (u, v) specified in `required_edges`, the final graph would either have u -> v or no edge
except if v <- u is part of a collider structure in the learned skeleton.
n_jobs : int, default=-1
The number of jobs to run in parallel. This is only used when `variant="parallel"`.
show_progress : bool, default=True
If True, shows a progress bar while learning the causal structure.
Attributes
----------
causal_graph_ : :class:`~pgmpy.base.DAG` or :class: `~pgmpy.base.PDAG`
The learned causal graph.
- If `return_type="pdag"`, this will be a PDAG instance.
- If `return_type="dag"`, this will be a DAG instance.
adjacency_matrix_ : pd.DataFrame
Adjacency matrix representation of the learned causal graph, i.e. `causal_graph_`.
skeleton_ : :class:`~pgmpy.base.UndirectedGraph`
An estimate for the undirected graph skeleton of the DAG underlying the data.
separating_sets_ : dict
A dict containing for each pair of not directly connected nodes a
separating set ("witnessing set") of variables that makes them
conditionally independent. (needed for edge orientation procedures)
n_features_in_ : int
The number of features in the data used to learn the causal graph.
feature_names_in_ : np.ndarray
The feature names in the data used to learn the causal graph.
Examples
--------
Simulate some data to use for causal discovery:
>>> from pgmpy.example_models import load_model
>>> model = load_model("bnlearn/alarm")
>>> df = model.simulate(n_samples=1000, seed=42)
Use the PC algorithm to learn the causal structure from data:
>>> from pgmpy.causal_discovery import PC
>>> pc = PC(variant="parallel", ci_test="chi_square", significance_level=0.01)
>>> pc.fit(df)
PC(ci_test='chi_square')
>>> pc.causal_graph_ # doctest: +ELLIPSIS
<pgmpy.base.PDAG.PDAG object at 0x...>
>>> pc.n_features_in_
37
Specify expert knowledge:
References
----------
.. [1] Spirtes, P., Glymour, C., & Scheines, R. (2001). Causation, prediction, and search.
doi:10.7551/mitpress/1754.001.0001
.. [2] Neapolitan, Learning Bayesian Networks, Section 10.1.2 for the PC algorithm (page 550),
http://www.cs.technion.ac.il/~dang/books/Learning%20Bayesian%20Networks(Neapolitan,%20Richard).pdf
.. [3] Original PC: P. Spirtes, C. Glymour, and R. Scheines, Causation, Prediction, and Search, 2nd ed.
Cambridge, MA: MIT Press, 2000.
.. [4] Stable PC: D. Colombo and M. H. Maathuis, “A modification of the PC algorithm yielding order-independent
skeletons,” ArXiv e-prints, Nov. 2012.
.. [5] Parallel PC: Le, Thuc, et al. "A fast PC algorithm for high dimensional causal discovery with multi-core
PCs." IEEE/ACM transactions on computational biology and bioinformatics (2016).
.. [6] Expert Knowledge: Meek, Christopher. "Causal inference and causal explanation with background knowledge."
arXiv preprint arXiv:1302.4972 (2013).
"""
def __init__(
self,
variant: str = "parallel",
ci_test: str | Callable | None = None,
return_type: str = "pdag",
significance_level: float = 0.01,
max_cond_vars: int = 5,
expert_knowledge: ExpertKnowledge | None = None,
enforce_expert_knowledge: bool = False,
n_jobs: int = -1,
show_progress: bool = True,
):
self.variant = variant
self.ci_test = ci_test
self.return_type = return_type
self.significance_level = significance_level
self.max_cond_vars = max_cond_vars
self.expert_knowledge = expert_knowledge
self.enforce_expert_knowledge = enforce_expert_knowledge
self.n_jobs = n_jobs
self.show_progress = show_progress
def _fit(self, X: pd.DataFrame, independencies=None):
"""
The fitting procedure for the PC algorithm.
Parameters
----------
X : pd.DataFrame or np.ndarray
The data to learn the causal structure from. If a numpy array is
passed, then the column names would be integers from 0 to n_features-1.
Returns
-------
self : pgmpy.causal_discovery.PC
Returns the instance with the fitted attributes.
"""
# CI test
ci_test = get_ci_test(test=self.ci_test, data=X)
if self.expert_knowledge is None:
expert_knowledge = ExpertKnowledge()
else:
expert_knowledge = self.expert_knowledge
if expert_knowledge.search_space:
expert_knowledge.limit_search_space(X.columns)
# Step 1: Build the skeleton
self.skeleton_, self.separating_sets_ = self._build_skeleton(
data=X,
independencies=independencies,
variant=self.variant,
ci_test=ci_test,
significance_level=self.significance_level,
max_cond_vars=self.max_cond_vars,
expert_knowledge=expert_knowledge,
enforce_expert_knowledge=self.enforce_expert_knowledge,
n_jobs=self.n_jobs,
show_progress=self.show_progress,
)
# Step 2: Use separating sets to orient colliders
pdag = self._orient_colliders(self.skeleton_, self.separating_sets_, expert_knowledge.temporal_ordering)
# Step 3: apply orientation rules and expert knowledge
if expert_knowledge.temporal_order != [[]]:
pdag = expert_knowledge.apply_expert_knowledge(pdag)
pdag = pdag.apply_meeks_rules(apply_r4=True)
elif not self.enforce_expert_knowledge:
pdag = pdag.apply_meeks_rules(apply_r4=False)
pdag = expert_knowledge.apply_expert_knowledge(pdag)
pdag = pdag.apply_meeks_rules(apply_r4=True)
else:
pdag = pdag.apply_meeks_rules(apply_r4=False)
pdag.add_nodes_from(set(X.columns) - set(pdag.nodes()))
if self.return_type in ("pdag", "cpdag"):
self.causal_graph_ = pdag
elif self.return_type == "dag":
self.causal_graph_ = pdag.to_dag()
else:
raise ValueError(f"return_type must be one of: dag, pdag, or cpdag. Got: {self.return_type}")
self.adjacency_matrix_ = nx.to_pandas_adjacency(self.causal_graph_, weight=1, dtype="int")
return self
@staticmethod
def _orient_colliders(
skeleton: UndirectedGraph,
separating_sets: dict[frozenset, set],
temporal_ordering: dict[Hashable, int] = dict(),
) -> PDAG:
"""
Orients the edges that form v-structures in a graph skeleton based on
the `separating_sets` to form a PDAG. For each pair of non adjacent
nodes `u`, `v` , if a common neighbor `z` is not in the separating set of `u` and `v`;
then the v-structure is oriented as `u`->`z` , `v`->`z`.
Parameters
----------
skeleton: nx.Graph
An undirected graph skeleton as e.g. produced by the
estimate_skeleton method.
separating_sets: dict
A dict containing for each pair of not directly connected nodes a
separating set ("witnessing set") of variables that makes them
conditionally independent.
Returns
-------
Model after edge orientation: pgmpy.base.PDAG
An estimate for the DAG pattern of the BN underlying the data. The
graph might contain some nodes with both-way edges (X->Y and Y->X).
Any completion by (removing one of the both-way edges for each such
pair) results in a I-equivalent Bayesian network DAG.
References
----------
[1] Neapolitan, Learning Bayesian Networks, Section 10.1.2, Algorithm
10.2 (page 550)
[2] http://www.cs.technion.ac.il/~dang/books/Learning%20Bayesian%20Networks(Neapolitan,%20Richard).pdf
Examples
--------
>>> import pandas as pd
>>> import numpy as np
>>> from pgmpy.causal_discovery import PC
>>> from pgmpy.example_models import load_model
>>> df = load_model("bnlearn/cancer").simulate(int(1e3), seed=42)
>>> est = PC(ci_test='chi_square').fit(df)
>>> pdag = est._orient_colliders(est.skeleton_, est.separating_sets_)
>>> sorted(pdag.edges())
[('Pollution', 'Cancer'), ('Xray', 'Cancer')]
"""
pdag = skeleton.to_directed()
# 1) for each X-Z-Y, if Z not in the separating set of X,Y, then orient edges
# as X->Z<-Y (Algorithm 3.4 in Koller & Friedman PGM, page 86)
for X, Y in permutations(sorted(pdag.nodes()), 2):
if not skeleton.has_edge(X, Y):
for Z in set(skeleton.neighbors(X)) & set(skeleton.neighbors(Y)):
if Z not in separating_sets[frozenset((X, Y))]:
if (temporal_ordering == dict()) or (
(temporal_ordering[Z] >= temporal_ordering[X])
and (temporal_ordering[Z] >= temporal_ordering[Y])
):
pdag.remove_edges_from([(Z, X), (Z, Y)])
edges = set(pdag.edges())
undirected_edges = set()
directed_edges = set()
for u, v in edges:
if (v, u) in edges:
undirected_edges.add(tuple(sorted((u, v))))
else:
directed_edges.add((u, v))
pdag_oriented = PDAG(directed_ebunch=directed_edges, undirected_ebunch=undirected_edges)
pdag_oriented.add_nodes_from(pdag.nodes())
return pdag_oriented