from collections import deque
from collections.abc import Hashable
import networkx as nx
import pandas as pd
from tqdm.auto import trange
from pgmpy import config
from pgmpy.base import DAG
from pgmpy.causal_discovery import ExpertKnowledge
from pgmpy.causal_discovery._base import _BaseCausalDiscovery, _ScoreMixin
from pgmpy.structure_score import BaseStructureScore, get_scoring_method
[docs]
class HillClimbSearch(_ScoreMixin, _BaseCausalDiscovery):
"""
Score-based causal discovery using hill climbing optimization.
This class implements the HillClimbSearch algorithm [1]_ for causal discovery.
Given a tabular dataset, the algorithm estimates the causal structure among
the variables in the data as a Directed Acyclic Graph (DAG). The algorithm
works by iteratively making local modifications to the graph structure
(adding, removing, or reversing edges) and keeping changes that improve
the score until a local maximum is reached.
The algorithm is a greedy local search method that:
1. Starts from an initial graph (empty by default or based on provided expert knowledge).
2. Evaluates all possible single-edge modifications (add, delete, reverse).
3. Applies the modification with the highest score improvement.
4. Repeats until no improvement can be made.
A tabu list is used to prevent the algorithm from immediately undoing recent
changes, which helps avoid getting stuck in local optima.
Parameters
----------
scoring_method : str or BaseStructureScore instance, default=None
The score to be optimized during structure estimation. Supported
structure scores:
- Discrete data: 'k2', 'bdeu', 'bds', 'bic-d', 'aic-d'
- Continuous data: 'll-g', 'aic-g', 'bic-g'
- Mixed data: 'll-cg', 'aic-cg', 'bic-cg'
If None, the appropriate scoring method is automatically selected based
on the data type. Also accepts a custom score instance that inherits
from `BaseStructureScore`.
start_dag : DAG instance, default=None
The starting point for the local search. By default, a completely
disconnected network (no edges) is used. If provided, the DAG must
contain exactly the same variables as in the data.
tabu_length : int, default=100
The number of recent graph modifications to store in the tabu list.
These modifications cannot be reversed during the search procedure.
This serves to enforce a wider exploration of the search space.
max_indegree : int or None, default=None
If provided, the procedure only searches among models where all nodes
have at most `max_indegree` parents. This can significantly reduce
the search space and computation time for large graphs.
expert_knowledge : ExpertKnowledge instance, default=None
Expert knowledge to be used with the algorithm. Expert knowledge
allows specification of:
- Required edges that must be present in the final graph
- Forbidden edges that cannot be present in the final graph
- Temporal ordering of nodes
return_type : str, default='pdag'
The type of graph to return. Options are:
- 'dag': Returns a directed acyclic graph (DAG).
- 'pdag': Returns a partially directed acyclic graph (PDAG) where edges that
could not be oriented are left undirected.
epsilon : float, default=1e-4
Defines the exit condition. If the improvement in score is less
than `epsilon`, the algorithm terminates and returns the learned model.
max_iter : int, default=1e6
The maximum number of iterations allowed. The algorithm terminates
and returns the learned model when the number of iterations exceeds
`max_iter`.
show_progress : bool, default=True
If True, shows a progress bar while learning the causal structure.
Attributes
----------
causal_graph_ : DAG
The learned causal graph as a DAG at a (local) score maximum.
adjacency_matrix_ : pd.DataFrame
Adjacency matrix representation of the learned causal graph.
n_features_in_ : int
The number of features in the data used to learn the causal graph.
feature_names_in_ : np.ndarray
The feature names in the data used to learn the causal graph.
Examples
--------
Simulate some data to use for causal discovery:
>>> from pgmpy.example_models import load_model
>>> model = load_model("bnlearn/alarm")
>>> df = model.simulate(n_samples=1000, seed=42)
Use the HillClimbSearch algorithm to learn the causal structure from data:
>>> from pgmpy.causal_discovery import HillClimbSearch
>>> hc = HillClimbSearch(scoring_method="bic-d")
>>> hc.fit(df)
>>> hc.causal_graph_.edges()
Use expert knowledge to constrain the search:
>>> from pgmpy.causal_discovery import ExpertKnowledge
>>> expert = ExpertKnowledge(forbidden_edges=[("HISTORY", "CVP")])
>>> hc = HillClimbSearch(scoring_method="bic-d", expert_knowledge=expert)
>>> hc.fit(df)
References
----------
.. [1] Koller & Friedman, Probabilistic Graphical Models - Principles and
Techniques, 2009, Section 18.4.3 (page 811ff)
"""
def __init__(
self,
scoring_method: str | BaseStructureScore | None = None,
start_dag: DAG | None = None,
tabu_length: int = 100,
max_indegree: int | None = None,
expert_knowledge: ExpertKnowledge | None = None,
return_type: str = "pdag",
epsilon: float = 1e-4,
max_iter: int = int(1e6),
show_progress: bool = True,
):
self.scoring_method = scoring_method
self.start_dag = start_dag
self.tabu_length = tabu_length
self.max_indegree = max_indegree
self.expert_knowledge = expert_knowledge
self.return_type = return_type
self.epsilon = epsilon
self.max_iter = max_iter
self.show_progress = show_progress
def _fit(self, X: pd.DataFrame):
"""
The fitting procedure for the HillClimbSearch algorithm.
Parameters
----------
X : pd.DataFrame or np.ndarray
The data to learn the causal structure from. If a numpy array is
passed, then the column names would be integers from 0 to n_features-1.
Returns
-------
self : pgmpy.causal_discovery.HillClimbSearch
Returns the instance with the fitted attributes.
"""
self.variables_ = list(X.columns)
# Step 1: Initial checks and setup for arguments
# Step 1.1: Check score
score = get_scoring_method(self.scoring_method, X)
# Step 1.2: Check the start_dag
if self.start_dag is None:
start_dag = DAG()
start_dag.add_nodes_from(self.variables_)
elif not isinstance(self.start_dag, DAG) or not set(self.start_dag.nodes()) == set(self.variables_):
raise ValueError("'start_dag' should be a DAG with the same variables as the data set, or 'None'.")
else:
start_dag = self.start_dag.copy()
# Step 1.3: Check if expert knowledge was specified
if self.expert_knowledge is None:
expert_knowledge = ExpertKnowledge()
else:
expert_knowledge = self.expert_knowledge
# Step 1.3.1: If search_space in expert_knowledge is not None, limit the search space
if expert_knowledge.search_space:
expert_knowledge.limit_search_space(X.columns)
# Step 1.4: Check if required edges cause a cycle
start_dag.add_edges_from(expert_knowledge.required_edges)
if not nx.is_directed_acyclic_graph(start_dag):
raise ValueError(
"required_edges create a cycle in start_dag. Please modify either required_edges or start_dag."
)
expert_knowledge._orient_temporal_forbidden_edges(start_dag, only_edges=False)
start_dag.remove_edges_from(expert_knowledge.forbidden_edges)
# Step 1.5: Initialize max_indegree, tabu_list, and progress bar
max_indegree = self.max_indegree
if max_indegree is None:
max_indegree = float("inf")
tabu_list: deque[tuple[str, tuple[Hashable, Hashable]]] = deque(maxlen=self.tabu_length)
current_model = start_dag
if self.show_progress and config.SHOW_PROGRESS:
iteration = trange(int(self.max_iter))
else:
iteration = range(int(self.max_iter))
# Step 2: For each iteration, find the best scoring operation and
# do that to the current model. If no legal operation is
# possible, sets best_operation=None.
for _ in iteration:
best_operation, best_score_delta = max(
self._legal_operations_dag(
model=current_model,
scoring_method=score,
tabu_list=tabu_list,
max_indegree=max_indegree,
forbidden_edges=expert_knowledge.forbidden_edges,
required_edges=expert_knowledge.required_edges,
),
key=lambda t: t[1],
default=(None, None),
)
if best_operation is None or best_score_delta < self.epsilon:
break
elif best_operation[0] == "+":
current_model.add_edge(*best_operation[1])
tabu_list.append(("-", best_operation[1]))
elif best_operation[0] == "-":
current_model.remove_edge(*best_operation[1])
tabu_list.append(("+", best_operation[1]))
elif best_operation[0] == "flip":
X_node, Y_node = best_operation[1]
current_model.remove_edge(X_node, Y_node)
current_model.add_edge(Y_node, X_node)
tabu_list.append(best_operation)
# Step 3: Store results
if self.return_type.lower() == "dag":
self.causal_graph_ = current_model
elif self.return_type.lower() == "pdag":
self.causal_graph_ = current_model.to_pdag()
else:
raise ValueError(f"return_type must be one of: dag, pdag, or cpdag. Got: {self.return_type}")
self.adjacency_matrix_ = nx.to_pandas_adjacency(self.causal_graph_, weight=1, dtype="int")
return self