# Source code for pgmpy.inference.base

#!/usr/bin/env python3

from collections import defaultdict
from itertools import chain

from pgmpy.factors.discrete import DiscreteFactor, TabularCPD
from pgmpy.models import (
    DiscreteBayesianNetwork,
    DiscreteMarkovNetwork,
    DynamicBayesianNetwork,
    FactorGraph,
    JunctionTree,
)
from pgmpy.utils import compat_fns


class Inference:
    """
    Base class for all inference algorithms.

    Converts DiscreteBayesianNetwork and DiscreteMarkovNetwork to a uniform
    representation so that inference algorithms can be applied. Also, it checks
    if all the associated CPDs / Factors are consistent with the model.

    Initialize inference for a model.

    Parameters
    ----------
    model: pgmpy.models.DiscreteBayesianNetwork or pgmpy.models.DiscreteMarkovNetwork
        model for which to initialize the inference object.

    Examples
    --------
    >>> from pgmpy.inference import Inference
    >>> from pgmpy.models import DiscreteBayesianNetwork
    >>> from pgmpy.factors.discrete import TabularCPD
    >>> student = DiscreteBayesianNetwork([("diff", "grade"), ("intel", "grade")])
    >>> diff_cpd = TabularCPD("diff", 2, [[0.2], [0.8]])
    >>> intel_cpd = TabularCPD("intel", 2, [[0.3], [0.7]])
    >>> grade_cpd = TabularCPD(
    ...     "grade",
    ...     3,
    ...     [[0.1, 0.1, 0.1, 0.1], [0.1, 0.1, 0.1, 0.1], [0.8, 0.8, 0.8, 0.8]],
    ...     evidence=["diff", "intel"],
    ...     evidence_card=[2, 2],
    ... )
    >>> student.add_cpds(diff_cpd, intel_cpd, grade_cpd)
    >>> model = Inference(student)

    >>> from pgmpy.models import DiscreteMarkovNetwork
    >>> from pgmpy.factors.discrete import DiscreteFactor
    >>> import numpy as np
    >>> student = DiscreteMarkovNetwork(
    ...     [
    ...         ("Alice", "Bob"),
    ...         ("Bob", "Charles"),
    ...         ("Charles", "Debbie"),
    ...         ("Debbie", "Alice"),
    ...     ]
    ... )
    >>> factor_a_b = DiscreteFactor(
    ...     ["Alice", "Bob"], cardinality=[2, 2], values=np.random.rand(4)
    ... )
    >>> factor_b_c = DiscreteFactor(
    ...     ["Bob", "Charles"], cardinality=[2, 2], values=np.random.rand(4)
    ... )
    >>> factor_c_d = DiscreteFactor(
    ...     ["Charles", "Debbie"], cardinality=[2, 2], values=np.random.rand(4)
    ... )
    >>> factor_d_a = DiscreteFactor(
    ...     ["Debbie", "Alice"], cardinality=[2, 2], values=np.random.rand(4)
    ... )
    >>> student.add_factors(factor_a_b, factor_b_c, factor_c_d, factor_d_a)
    >>> model = Inference(student)
    """

    def __init__(self, model):
        self.model = model
        # Validate the model (CPDs/factors consistent with the structure)
        # before any inference structures are built.
        model.check_model()

        if isinstance(self.model, JunctionTree):
            # JunctionTree nodes are cliques (tuples of variables); flatten
            # them to get the set of underlying model variables.
            self.variables = set(chain(*self.model.nodes()))
        else:
            self.variables = self.model.nodes()

    def _initialize_structures(self):
        """
        Initializes all the data structures which will later be used
        by the inference algorithms.

        Populates:
        - self.variables: the variables of the model (clique members
          flattened for JunctionTree).
        - self.cardinality: dict mapping variable -> number of states.
        - self.factors: dict mapping variable -> list of factors that
          contain it in their scope.
        - self.state_names_map (BN / DBN only): mapping from state
          numbers to state names, aggregated over all CPDs.
        - For DynamicBayesianNetwork additionally builds
          self.start_bayesian_model (time slice 0) and
          self.one_and_half_model (inter-slice + slice-1 network) with
          self.interface_nodes.
        """
        if isinstance(self.model, JunctionTree):
            # Same flattening as in __init__: JunctionTree nodes are cliques.
            self.variables = set(chain(*self.model.nodes()))
        else:
            self.variables = self.model.nodes()

        self.cardinality = {}
        self.factors = defaultdict(list)

        if isinstance(self.model, DiscreteBayesianNetwork):
            self.state_names_map = {}
            for node in self.model.nodes():
                cpd = self.model.get_cpds(node)
                if isinstance(cpd, TabularCPD):
                    self.cardinality[node] = cpd.variable_card
                    # Rebind: work with the factor form of the CPD below.
                    cpd = cpd.to_factor()
                # Index the factor under every variable in its scope so
                # factor lookup by variable is O(1) later.
                for var in cpd.scope():
                    self.factors[var].append(cpd)
                self.state_names_map.update(cpd.no_to_name)

        elif isinstance(self.model, (DiscreteMarkovNetwork, FactorGraph, JunctionTree)):
            self.cardinality = self.model.get_cardinality()

            for factor in self.model.get_factors():
                for var in factor.variables:
                    self.factors[var].append(factor)

        elif isinstance(self.model, DynamicBayesianNetwork):
            # Initialize main inference properties for DBN
            self.state_names_map = {}
            for node in self.model.nodes():
                cpd = self.model.get_cpds(node)
                if isinstance(cpd, TabularCPD):
                    self.cardinality[node] = cpd.variable_card
                    cpd_factor = cpd.to_factor()
                    for var in cpd_factor.scope():
                        self.factors[var].append(cpd_factor)
                    self.state_names_map.update(cpd_factor.no_to_name)

            # Create start_bayesian_model: the static BN over time slice 0.
            intra_edges_0 = self.model.get_intra_edges(0)
            self.start_bayesian_model = DiscreteBayesianNetwork(intra_edges_0)
            # Add all nodes from time slice 0 even if there are no intra-edges
            time_slice_0_nodes = self.model.get_slice_nodes(time_slice=0)
            for node in time_slice_0_nodes:
                if node not in self.start_bayesian_model.nodes():
                    self.start_bayesian_model.add_node(node)
            self.start_bayesian_model.add_cpds(*self.model.get_cpds(time_slice=0))

            # CPDs of the slice-1 interface nodes, needed for the
            # "1.5-slice" network used in DBN inference.
            cpd_inter = [
                self.model.get_cpds(node)
                for node in self.model.get_interface_nodes(1)
            ]
            self.interface_nodes = self.model.get_interface_nodes(0)
            self.one_and_half_model = DiscreteBayesianNetwork(
                self.model.get_inter_edges() + self.model.get_intra_edges(1)
            )
            self.one_and_half_model.add_cpds(
                *(self.model.get_cpds(time_slice=1) + cpd_inter)
            )

    def _prune_bayesian_model(self, variables, evidence):
        """
        Prunes unnecessary nodes from the model to optimize the computation.

        Parameters
        ----------
        variables: list
            The variables on which the query is done i.e. the variables whose
            values we are interested in. If empty, all the model's nodes are
            treated as query variables.

        evidence: dict (default: None)
            The variables whose values we know. The values can be specified as
            {variable: state}.

        Returns
        -------
        Pruned model: pgmpy.models.DiscreteBayesianNetwork
            The pruned model, together with the evidence dict restricted to
            variables that survived pruning.

        References
        ----------
        [1] Baker, M., & Boult, T. E. (2013). Pruning Bayesian networks for
            efficient computation. arXiv preprint arXiv:1304.1112.
        """
        evidence = {} if evidence is None else evidence
        variables = (
            list(self.model.nodes()) if len(variables) == 0 else list(variables)
        )

        # Step 1: Remove all the variables that are d-separated from `variables` when conditioned
        #         on `evidence`
        d_connected = self.model.active_trail_nodes(
            variables=variables,
            observed=list(evidence.keys()),
            include_latents=True,
        )
        # Evidence variables themselves are always kept.
        d_connected = set.union(*d_connected.values()).union(evidence.keys())
        bn = self.model.subgraph(d_connected)
        # Drop evidence on variables that were pruned away.
        evidence = {var: state for var, state in evidence.items() if var in d_connected}

        # Step 2: Reduce the model to ancestral graph of [`variables` + `evidence`]
        bn = bn.get_ancestral_graph(list(variables) + list(evidence.keys()))

        # Step 3: Since all the CPDs are lost, add them back. Also marginalize them if some
        #         of the variables in scope aren't in the network anymore.
        cpds = []
        for var in bn.nodes():
            cpd = self.model.get_cpds(var)
            scope_diff = set(cpd.scope()) - set(bn.nodes())
            if len(scope_diff) == 0:
                cpds.append(cpd)
            else:
                cpds.append(cpd.marginalize(scope_diff, inplace=False))
        bn.cpds = cpds

        return bn, evidence

    def _check_virtual_evidence(self, virtual_evidence):
        """
        Checks the virtual evidence's format is correct. Each evidence must:
            - Be a TabularCPD instance or a DiscreteFactor on a single variable.
            - Be targeted to a single variable
            - Be defined on a variable which is in the model
            - Have the same cardinality as its corresponding variable in the model

        Parameters
        ----------
        virtual_evidence: list
            A list of TabularCPD instances specifying the virtual evidence for
            each of the evidence variables.

        Raises
        ------
        ValueError
            If any entry violates one of the constraints above.
        """
        for cpd in virtual_evidence:
            if not isinstance(cpd, (TabularCPD, DiscreteFactor)):
                raise ValueError(
                    f"Virtual evidence should be an instance of TabularCPD or DiscreteFactor. Got: {type(cpd)}"
                )
            if isinstance(cpd, DiscreteFactor):
                if len(cpd.variables) > 1:
                    raise ValueError(
                        f"If cpd is an instance of DiscreteFactor,"
                        f" it should be defined on a single variable. Got: {cpd}"
                    )
            # The variable the evidence is about is the first in scope.
            var = cpd.variables[0]
            if var not in self.model.nodes():
                raise ValueError(
                    "Evidence provided for variable which is not in the model"
                )
            elif len(cpd.variables) > 1:
                # A TabularCPD with evidence variables has len(variables) > 1.
                raise ValueError(
                    "Virtual evidence should be defined on individual variables."
                    " Maybe you are looking for soft evidence."
                )
            elif self.model.get_cardinality(var) != cpd.get_cardinality([var])[var]:
                raise ValueError(
                    "The number of states/cardinality for the evidence should"
                    " be same as the number of states/cardinality of the variable in the model"
                )

    def _virtual_evidence(self, virtual_evidence):
        """
        Modifies the model to incorporate virtual evidence. For each virtual
        evidence variable a binary variable is added as the child of the
        evidence variable to the model. The state 0 probabilities of the child
        is the evidence.

        Parameters
        ----------
        virtual_evidence: list
            A list of TabularCPD instances specifying the virtual evidence for
            each of the evidence variables.

        Returns
        -------
        None
            NOTE: this method re-initializes the inference object in place
            (via ``self.__init__``) on a modified copy of the model.

        References
        ----------
        [1] Mrad, Ali Ben, et al. "Uncertain evidence in Bayesian networks:
            Presentation and comparison on a simple example." International
            Conference on Information Processing and Management of Uncertainty
            in Knowledge-Based Systems. Springer, Berlin, Heidelberg, 2012.
        """
        self._check_virtual_evidence(virtual_evidence)

        # Work on a copy so the caller's model object is left untouched.
        bn = self.model.copy()
        for cpd in virtual_evidence:
            var = cpd.variables[0]
            new_var = "__" + var  # synthetic child node encoding the evidence
            bn.add_edge(var, new_var)
            # Row 0 of the new CPD holds the evidence likelihoods; row 1 is
            # the complement so columns sum to 1.
            values = compat_fns.get_compute_backend().vstack(
                (cpd.values, 1 - cpd.values)
            )
            new_cpd = TabularCPD(
                variable=new_var,
                variable_card=2,
                values=values,
                evidence=[var],
                evidence_card=[self.model.get_cardinality(var)],
                state_names={new_var: [0, 1], var: cpd.state_names[var]},
            )
            bn.add_cpds(new_cpd)

        # Rebuild this inference object on the augmented network.
        self.__init__(bn)

    @staticmethod
    def _get_virtual_evidence_var_list(virtual_evidence):
        """
        Returns the list of variables that have a virtual evidence.

        Parameters
        ----------
        virtual_evidence: list
            A list of TabularCPD instances specifying the virtual evidence for
            each of the evidence variables.
        """
        return [cpd.variables[0] for cpd in virtual_evidence]