Source code for pgmpy.estimators.GES

from itertools import combinations

import networkx as nx
import numpy as np

from pgmpy import config
from pgmpy.base import DAG
from pgmpy.estimators import (
    AICScore,
    AICScoreGauss,
    BDeuScore,
    BDsScore,
    BicScore,
    BicScoreGauss,
    K2Score,
    ScoreCache,
    StructureEstimator,
    StructureScore,
)
from pgmpy.global_vars import logger


[docs] class GES(StructureEstimator): """ Implementation of Greedy Equivalence Search (GES) causal discovery / structure learning algorithm. GES is a score-based casual discovery / structure learning algorithm that works in three phases: 1. Forward phase: New edges are added such that the model score improves. 2. Backward phase: Edges are removed from the model such that the model score improves. 3. Edge flipping phase: Edge orientations are flipped such that model score improves. Parameters ---------- data: pandas DataFrame object dataframe object where each column represents one variable. (If some values in the data are missing the data cells should be set to `numpy.nan`. Note that pandas converts each column containing `numpy.nan`s to dtype `float`.) use_caching: boolean If True, uses caching of score for faster computation. Note: Caching only works for scoring methods which are decomposable. Can give wrong results in case of custom scoring methods. References ---------- Chickering, David Maxwell. "Optimal structure identification with greedy search." Journal of machine learning research 3.Nov (2002): 507-554. """ def __init__(self, data, use_cache=True, **kwargs): self.use_cache = use_cache super(GES, self).__init__(data=data, **kwargs) def _legal_edge_additions(self, current_model): """ Returns a list of all edges that can be added to the graph such that it remains a DAG. """ edges = [] for u, v in combinations(current_model.nodes(), 2): if not (current_model.has_edge(u, v) or current_model.has_edge(v, u)): if not nx.has_path(current_model, v, u): edges.append((u, v)) if not nx.has_path(current_model, u, v): edges.append((v, u)) return edges def _legal_edge_flips(self, current_model): """ Returns a list of all the edges in the `current_model` that can be flipped such that the model remains a DAG. """ potential_flips = [] edges = list(current_model.edges()) for u, v in edges: current_model.remove_edge(u, v) if not nx.has_path(current_model, u, v): potential_flips.append((v, u)) # Restore the edge to get to the original model current_model.add_edge(u, v) return potential_flips
[docs] def estimate(self, scoring_method="bic", debug=False): """ Estimates the DAG from the data. Parameters ---------- scoring_method: str or StructureScore instance The score to be optimized during structure estimation. Supported structure scores: k2, bdeu, bds, bic, aic, bic-g, aic-g. Also accepts a custom score, but it should be an instance of `StructureScore`. Returns ------- Estimated model: pgmpy.base.DAG A `DAG` at a (local) score maximum. Examples -------- >>> # Simulate some sample data from a known model to learn the model structure from >>> from pgmpy.utils import get_example_model >>> model = get_example_model('alarm') >>> df = model.simulate(int(1e3)) >>> # Learn the model structure using GES algorithm from `df` >>> from pgmpy.estimators import GES >>> est = GES(data) >>> dag = est.estimate(scoring_method='bic') >>> len(dag.nodes()) 37 >>> len(dag.edges()) 45 """ # Step 0: Initial checks and setup for arguments supported_methods = { "k2": K2Score, "bdeu": BDeuScore, "bds": BDsScore, "bic": BicScore, "aic": AICScore, "aic-g": AICScoreGauss, "bic-g": BicScoreGauss, } if isinstance(scoring_method, str): if scoring_method.lower() in [ "k2score", "bdeuscore", "bdsscore", "bicscore", "aicscore", ]: raise ValueError( f"The scoring method names have been changed. Please refer the documentation." ) elif scoring_method.lower() not in list(supported_methods.keys()): raise ValueError( f"Unknown scoring method. Please refer documentation for a list of supported score metrics." ) elif not isinstance(scoring_method, StructureScore): raise ValueError( "scoring_method should either be one of k2score, bdeuscore, bicscore, bdsscore, aicscore, or an instance of StructureScore" ) if isinstance(scoring_method, str): score = supported_methods[scoring_method.lower()](data=self.data) else: score = scoring_method if self.use_cache: score_fn = ScoreCache(score, self.data).local_score else: score_fn = score.local_score # Step 1: Initialize an empty model. current_model = DAG() current_model.add_nodes_from(list(self.data.columns)) # Step 2: Forward step: Iteratively add edges till score stops improving. while True: potential_edges = self._legal_edge_additions(current_model) score_deltas = np.zeros(len(potential_edges)) for index, (u, v) in enumerate(potential_edges): current_parents = current_model.get_parents(v) score_delta = score_fn(v, current_parents + [u]) - score_fn( v, current_parents ) score_deltas[index] = score_delta if (len(potential_edges) == 0) or (np.all(score_deltas <= 0)): break edge_to_add = potential_edges[np.argmax(score_deltas)] current_model.add_edge(edge_to_add[0], edge_to_add[1]) if debug: logger.info( f"Adding edge {edge_to_add[0]} -> {edge_to_add[1]}. Improves score by: {score_deltas.max()}" ) # Step 3: Backward Step: Iteratively remove edges till score stops improving. while True: potential_removals = list(current_model.edges()) score_deltas = np.zeros(len(potential_removals)) for index, (u, v) in enumerate(potential_removals): current_parents = current_model.get_parents(v) score_deltas[index] = score_fn( v, [node for node in current_parents if node != u] ) - score_fn(v, current_parents) if (len(potential_removals) == 0) or (np.all(score_deltas <= 0)): break edge_to_remove = potential_removals[np.argmax(score_deltas)] current_model.remove_edge(edge_to_remove[0], edge_to_remove[1]) if debug: logger.info( f"Removing edge {edge_to_remove[0]} -> {edge_to_remove[1]}. Improves score by: {score_deltas.max()}" ) # Step 4: Flip Edges: Iteratively try to flip edges till score stops improving. while True: potential_flips = self._legal_edge_flips(current_model) score_deltas = np.zeros(len(potential_flips)) for index, (u, v) in enumerate(potential_flips): v_parents = current_model.get_parents(v) u_parents = current_model.get_parents(u) score_deltas[index] = ( score_fn(v, v_parents + [u]) - score_fn(v, v_parents) ) + ( score_fn(u, [node for node in u_parents if node != v]) - score_fn(u, u_parents) ) if (len(potential_flips) == 0) or (np.all(score_deltas <= 0)): break edge_to_flip = potential_flips[np.argmax(score_deltas)] current_model.remove_edge(edge_to_flip[1], edge_to_flip[0]) current_model.add_edge(edge_to_flip[0], edge_to_flip[1]) if debug: logger.info( f"Fliping edge {edge_to_flip[1]} -> {edge_to_flip[0]}. Improves score by: {score_deltas.max()}" ) # Step 5: Return the model. return current_model