Source code for pgmpy.inference.ExactInference

#!/usr/bin/env python3
import copy
import itertools
from functools import reduce

import networkx as nx
import numpy as np
from opt_einsum import contract
from tqdm.auto import tqdm

from pgmpy import config
from pgmpy.factors import factor_product
from pgmpy.factors.discrete import DiscreteFactor
from pgmpy.inference import Inference
from pgmpy.inference.EliminationOrder import (
    MinFill,
    MinNeighbors,
    MinWeight,
    WeightedMinFill,
)
from pgmpy.models import (
    BayesianNetwork,
    DynamicBayesianNetwork,
    FactorGraph,
    JunctionTree,
)
from pgmpy.utils import compat_fns


[docs] class VariableElimination(Inference): def _get_working_factors(self, evidence): """ Uses the evidence given to the query methods to modify the factors before running the variable elimination algorithm. Parameters ---------- evidence: dict Dict of the form {variable: state} Returns ------- dict: Modified working factors. """ working_factors = { node: {(factor, None) for factor in self.factors[node]} for node in self.factors } # Dealing with evidence. Reducing factors over it before VE is run. if evidence: for evidence_var in evidence: for factor, origin in working_factors[evidence_var]: factor_reduced = factor.reduce( [(evidence_var, evidence[evidence_var])], inplace=False ) for var in factor_reduced.scope(): working_factors[var].remove((factor, origin)) working_factors[var].add((factor_reduced, evidence_var)) del working_factors[evidence_var] return working_factors def _get_elimination_order( self, variables, evidence, elimination_order, show_progress=True ): """ Deals with all elimination order parameters given to _variable_elimination method and returns a list of variables that are to be eliminated Parameters ---------- elimination_order: str or list Returns ------- list: A list of variables names in the order they need to be eliminated. """ to_eliminate = ( set(self.variables) - set(variables) - set(evidence.keys() if evidence else []) ) # Step 1: If elimination_order is a list, verify it's correct and return. # Step 1.1: Check that not of the `variables` and `evidence` is in the elimination_order. if hasattr(elimination_order, "__iter__") and ( not isinstance(elimination_order, str) ): if any( var in elimination_order for var in set(variables).union( set(evidence.keys() if evidence else []) ) ): raise ValueError( "Elimination order contains variables which are in" " variables or evidence args" ) # Step 1.2: Check if elimination_order has variables which are not in the model. elif any(var not in self.model.nodes() for var in elimination_order): elimination_order = list( filter(lambda t: t in self.model.nodes(), elimination_order) ) # Step 1.3: Check if the elimination_order has all the variables that need to be eliminated. elif to_eliminate != set(elimination_order): raise ValueError( f"Elimination order doesn't contain all the variables" f"which need to be eliminated. The variables which need to" f"be eliminated are {to_eliminate}" ) return elimination_order # Step 2: If elimination order is None or a Markov model, return a random order. elif (elimination_order is None) or ( not isinstance(self.model, BayesianNetwork) ): return to_eliminate # Step 3: If elimination order is a str, compute the order using the specified heuristic. elif isinstance(elimination_order, str) and isinstance( self.model, BayesianNetwork ): heuristic_dict = { "weightedminfill": WeightedMinFill, "minneighbors": MinNeighbors, "minweight": MinWeight, "minfill": MinFill, } elimination_order = heuristic_dict[elimination_order.lower()]( self.model ).get_elimination_order(nodes=to_eliminate, show_progress=show_progress) return elimination_order def _variable_elimination( self, variables, operation, evidence=None, elimination_order="MinFill", joint=True, show_progress=True, ): """ Implementation of a generalized variable elimination. Parameters ---------- variables: list, array-like variables that are not to be eliminated. operation: str ('marginalize' | 'maximize') The operation to do for eliminating the variable. evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence elimination_order: str or list (array-like) If str: Heuristic to use to find the elimination order. If array-like: The elimination order to use. If None: A random elimination order is used. """ # Step 1: Deal with the input arguments. if isinstance(variables, str): raise TypeError("variables must be a list of strings") if isinstance(evidence, str): raise TypeError("evidence must be a list of strings") # Dealing with the case when variables are not provided. if not variables: all_factors = [] for factor_li in self.factors.values(): all_factors.extend(factor_li) if joint: return factor_product(*set(all_factors)) else: return set(all_factors) # Step 2: Prepare data structures to run the algorithm. eliminated_variables = set() # Get working factors and elimination order working_factors = self._get_working_factors(evidence) elimination_order = self._get_elimination_order( variables, evidence, elimination_order, show_progress=show_progress ) # Step 3: Run variable elimination if show_progress and config.SHOW_PROGRESS: pbar = tqdm(elimination_order) else: pbar = elimination_order for var in pbar: if show_progress and config.SHOW_PROGRESS: pbar.set_description(f"Eliminating: {var}") # Removing all the factors containing the variables which are # eliminated (as all the factors should be considered only once) factors = [ factor for factor, _ in working_factors[var] if not set(factor.variables).intersection(eliminated_variables) ] phi = factor_product(*factors) phi = getattr(phi, operation)([var], inplace=False) del working_factors[var] for variable in phi.variables: working_factors[variable].add((phi, var)) eliminated_variables.add(var) # Step 4: Prepare variables to be returned. final_distribution = set() for node in working_factors: for factor, origin in working_factors[node]: if not set(factor.variables).intersection(eliminated_variables): final_distribution.add((factor, origin)) final_distribution = [factor for factor, _ in final_distribution] if joint: if isinstance(self.model, BayesianNetwork): return factor_product(*final_distribution).normalize(inplace=False) else: return factor_product(*final_distribution) else: query_var_factor = {} if isinstance(self.model, BayesianNetwork): for query_var in variables: phi = factor_product(*final_distribution) query_var_factor[query_var] = phi.marginalize( list(set(variables) - set([query_var])), inplace=False ).normalize(inplace=False) else: for query_var in variables: phi = factor_product(*final_distribution) query_var_factor[query_var] = phi.marginalize( list(set(variables) - set([query_var])), inplace=False ) return query_var_factor
[docs] def query( self, variables, evidence=None, virtual_evidence=None, elimination_order="greedy", joint=True, show_progress=True, ): """ Parameters ---------- variables: list list of variables for which you want to compute the probability evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence virtual_evidence: list (default:None) A list of pgmpy.factors.discrete.TabularCPD representing the virtual evidences. elimination_order: str or list (default='greedy') Order in which to eliminate the variables in the algorithm. If list is provided, should contain all variables in the model except the ones in `variables`. str options are: `greedy`, `WeightedMinFill`, `MinNeighbors`, `MinWeight`, `MinFill`. Please refer https://pgmpy.org/exact_infer/ve.html#module-pgmpy.inference.EliminationOrder for details. joint: boolean (default: True) If True, returns a Joint Distribution over `variables`. If False, returns a dict of distributions over each of the `variables`. show_progress: boolean If True, shows a progress bar. Examples -------- >>> from pgmpy.inference import VariableElimination >>> from pgmpy.models import BayesianNetwork >>> import numpy as np >>> import pandas as pd >>> values = pd.DataFrame(np.random.randint(low=0, high=2, size=(1000, 5)), ... columns=['A', 'B', 'C', 'D', 'E']) >>> model = BayesianNetwork([('A', 'B'), ('C', 'B'), ('C', 'D'), ('B', 'E')]) >>> model.fit(values) >>> inference = VariableElimination(model) >>> phi_query = inference.query(['A', 'B']) """ evidence = evidence if evidence is not None else dict() # Step 1: Parameter Checks common_vars = set(evidence if evidence is not None else []).intersection( set(variables) ) if common_vars: raise ValueError( f"Can't have the same variables in both `variables` and `evidence`. Found in both: {common_vars}" ) # Step 2: If virtual_evidence is provided, modify the network. if isinstance(self.model, BayesianNetwork) and (virtual_evidence is not None): self._virtual_evidence(virtual_evidence) virt_evidence = {"__" + cpd.variables[0]: 0 for cpd in virtual_evidence} return self.query( variables=variables, evidence={**evidence, **virt_evidence}, virtual_evidence=None, elimination_order=elimination_order, joint=joint, show_progress=show_progress, ) # Step 3: Prune the network based on variables and evidence. if isinstance(self.model, BayesianNetwork): model_reduced, evidence = self._prune_bayesian_model(variables, evidence) factors = model_reduced.cpds else: model_reduced = self.model factors = self.model.factors # Step 4: If elimination_order is greedy, do a tensor contraction approach # else do the classic Variable Elimination. if elimination_order == "greedy": # Step 5.1: Compute the values array for factors after reducing them to provided # evidence. evidence_vars = set(evidence) reduce_indexes = [] reshape_indexes = [] for phi in factors: indexes_to_reduce = [ phi.variables.index(var) for var in set(phi.variables).intersection(evidence_vars) ] indexer = [slice(None)] * len(phi.variables) for index in indexes_to_reduce: indexer[index] = phi.get_state_no( phi.variables[index], evidence[phi.variables[index]] ) reduce_indexes.append(tuple(indexer)) # Step 5.2: Prepare values and index arrays to do use in einsum if isinstance(self.model, JunctionTree): var_int_map = { var: i for i, var in enumerate( set(itertools.chain(*model_reduced.nodes())) ) } else: var_int_map = {var: i for i, var in enumerate(model_reduced.nodes())} evidence_var_set = set(evidence.keys()) einsum_expr = [] if isinstance(self.model, BayesianNetwork): for index, phi in enumerate(factors): if len(set(phi.variables) - evidence_var_set) > 0: # if phi.variable not in evidence_var_set: einsum_expr.append((phi.values[reduce_indexes[index]])) einsum_expr.append( [ var_int_map[var] for var in phi.variables if var not in evidence.keys() ] ) else: for index, phi in enumerate(factors): einsum_expr.append((phi.values[reduce_indexes[index]])) einsum_expr.append( [ var_int_map[var] for var in phi.variables if var not in evidence.keys() ] ) result_values = contract( *einsum_expr, [var_int_map[var] for var in variables], optimize="greedy" ) # Step 5.3: Prepare return values. result = DiscreteFactor( variables, result_values.shape, result_values, state_names={var: model_reduced.states[var] for var in variables}, ) if joint: if isinstance( self.model, (BayesianNetwork, JunctionTree, DynamicBayesianNetwork) ): return result.normalize(inplace=False) else: return result else: result_dict = {} all_vars = set(variables) if isinstance( self.model, (BayesianNetwork, JunctionTree, DynamicBayesianNetwork) ): for var in variables: result_dict[var] = result.marginalize( all_vars - {var}, inplace=False ).normalize(inplace=False) else: for var in variables: result_dict[var] = result.marginalize( all_vars - {var}, inplace=False ) return result_dict else: # Step 5.1: Initialize data structures for the reduced bn. reduced_ve = VariableElimination(model_reduced) reduced_ve._initialize_structures() # Step 5.2: Do the actual variable elimination result = reduced_ve._variable_elimination( variables=variables, operation="marginalize", evidence=evidence, elimination_order=elimination_order, joint=joint, show_progress=show_progress, ) return result
[docs] def max_marginal( self, variables=None, evidence=None, elimination_order="MinFill", show_progress=True, ): """ Computes the max-marginal over the variables given the evidence. Parameters ---------- variables: list list of variables over which we want to compute the max-marginal. evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence elimination_order: list order of variable eliminations (if nothing is provided) order is computed automatically Examples -------- >>> import numpy as np >>> import pandas as pd >>> from pgmpy.models import BayesianNetwork >>> from pgmpy.inference import VariableElimination >>> values = pd.DataFrame(np.random.randint(low=0, high=2, size=(1000, 5)), ... columns=['A', 'B', 'C', 'D', 'E']) >>> model = BayesianNetwork([('A', 'B'), ('C', 'B'), ('C', 'D'), ('B', 'E')]) >>> model.fit(values) >>> inference = VariableElimination(model) >>> phi_query = inference.max_marginal(['A', 'B']) """ if not variables: variables = [] common_vars = set(evidence if evidence is not None else []).intersection( set(variables if variables is not None else []) ) if common_vars: raise ValueError( f"Can't have the same variables in both `variables` and `evidence`. Found in both: {common_vars}" ) if isinstance(self.model, BayesianNetwork): model_reduced, evidence = self._prune_bayesian_model(variables, evidence) else: model_reduced = self.model reduced_ve = VariableElimination(model_reduced) reduced_ve._initialize_structures() final_distribution = reduced_ve._variable_elimination( variables=variables, operation="maximize", evidence=evidence, elimination_order=elimination_order, show_progress=show_progress, ) return compat_fns.max(final_distribution.values)
[docs] def map_query( self, variables=None, evidence=None, virtual_evidence=None, elimination_order="MinFill", show_progress=True, ): """ Computes the MAP Query over the variables given the evidence. Returns the highest probable state in the joint distribution of `variables`. Parameters ---------- variables: list list of variables over which we want to compute the max-marginal. evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence virtual_evidence: list (default:None) A list of pgmpy.factors.discrete.TabularCPD representing the virtual evidences. elimination_order: list order of variable eliminations (if nothing is provided) order is computed automatically show_progress: boolean If True, shows a progress bar. Examples -------- >>> from pgmpy.inference import VariableElimination >>> from pgmpy.models import BayesianNetwork >>> import numpy as np >>> import pandas as pd >>> values = pd.DataFrame(np.random.randint(low=0, high=2, size=(1000, 5)), ... columns=['A', 'B', 'C', 'D', 'E']) >>> model = BayesianNetwork([('A', 'B'), ('C', 'B'), ('C', 'D'), ('B', 'E')]) >>> model.fit(values) >>> inference = VariableElimination(model) >>> phi_query = inference.map_query(['A', 'B']) """ variables = [] if variables is None else variables evidence = evidence if evidence is not None else dict() common_vars = set(evidence if evidence is not None else []).intersection( variables ) if common_vars: raise ValueError( f"Can't have the same variables in both `variables` and `evidence`. Found in both: {common_vars}" ) if isinstance(self.model, BayesianNetwork) and (virtual_evidence is not None): self._virtual_evidence(virtual_evidence) virt_evidence = {"__" + cpd.variables[0]: 0 for cpd in virtual_evidence} return self.map_query( variables=variables, evidence={**evidence, **virt_evidence}, virtual_evidence=None, elimination_order=elimination_order, show_progress=show_progress, ) if isinstance(self.model, BayesianNetwork): model_reduced, evidence = self._prune_bayesian_model(variables, evidence) else: model_reduced = self.model reduced_ve = VariableElimination(model_reduced) reduced_ve._initialize_structures() final_distribution = reduced_ve._variable_elimination( variables=variables, operation="marginalize", evidence=evidence, elimination_order=elimination_order, joint=True, show_progress=show_progress, ) argmax = compat_fns.argmax(final_distribution.values) assignment = final_distribution.assignment([argmax])[0] map_query_results = {} for var_assignment in assignment: var, value = var_assignment map_query_results[var] = value return map_query_results
[docs] def induced_graph(self, elimination_order): """ Returns the induced graph formed by running Variable Elimination on the network. Parameters ---------- elimination_order: list, array like List of variables in the order in which they are to be eliminated. Examples -------- >>> import numpy as np >>> import pandas as pd >>> from pgmpy.models import BayesianNetwork >>> from pgmpy.inference import VariableElimination >>> values = pd.DataFrame(np.random.randint(low=0, high=2, size=(1000, 5)), ... columns=['A', 'B', 'C', 'D', 'E']) >>> model = BayesianNetwork([('A', 'B'), ('C', 'B'), ('C', 'D'), ('B', 'E')]) >>> model.fit(values) >>> inference = VariableElimination(model) >>> inference.induced_graph(['C', 'D', 'A', 'B', 'E']) """ self._initialize_structures() # If the elimination order does not contain the same variables as the model if set(elimination_order) != set(self.variables): raise ValueError( "Set of variables in elimination order" " different from variables in model" ) eliminated_variables = set() working_factors = { node: [factor.scope() for factor in self.factors[node]] for node in self.factors } # The set of cliques that should be in the induced graph cliques = set() for factors in working_factors.values(): for factor in factors: cliques.add(tuple(factor)) # Removing all the factors containing the variables which are # eliminated (as all the factors should be considered only once) for var in elimination_order: factors = [ factor for factor in working_factors[var] if not set(factor).intersection(eliminated_variables) ] phi = set(itertools.chain(*factors)).difference({var}) cliques.add(tuple(phi)) del working_factors[var] for variable in phi: working_factors[variable].append(list(phi)) eliminated_variables.add(var) edges_comb = [ itertools.combinations(c, 2) for c in filter(lambda x: len(x) > 1, cliques) ] return nx.Graph(itertools.chain(*edges_comb))
[docs] def induced_width(self, elimination_order): """ Returns the width (integer) of the induced graph formed by running Variable Elimination on the network. The width is the defined as the number of nodes in the largest clique in the graph minus 1. Parameters ---------- elimination_order: list, array like List of variables in the order in which they are to be eliminated. Examples -------- >>> import numpy as np >>> import pandas as pd >>> from pgmpy.models import BayesianNetwork >>> from pgmpy.inference import VariableElimination >>> values = pd.DataFrame(np.random.randint(low=0, high=2, size=(1000, 5)), ... columns=['A', 'B', 'C', 'D', 'E']) >>> model = BayesianNetwork([('A', 'B'), ('C', 'B'), ('C', 'D'), ('B', 'E')]) >>> model.fit(values) >>> inference = VariableElimination(model) >>> inference.induced_width(['C', 'D', 'A', 'B', 'E']) 3 """ induced_graph = self.induced_graph(elimination_order) return max((len(clique) for clique in nx.find_cliques(induced_graph))) - 1
[docs] class BeliefPropagation(Inference): """ Class for performing inference using Belief Propagation method. Creates a Junction Tree or Clique Tree (JunctionTree class) for the input probabilistic graphical model and performs calibration of the junction tree so formed using belief propagation. Parameters ---------- model: BayesianNetwork, MarkovNetwork, FactorGraph, JunctionTree model for which inference is to performed """ def __init__(self, model): super(BeliefPropagation, self).__init__(model) if not isinstance(model, JunctionTree): self.junction_tree = model.to_junction_tree() else: self.junction_tree = copy.deepcopy(model) self.clique_beliefs = {} self.sepset_beliefs = {}
[docs] def get_cliques(self): """ Returns cliques used for belief propagation. """ return self.junction_tree.nodes()
[docs] def get_clique_beliefs(self): """ Returns clique beliefs. Should be called after the clique tree (or junction tree) is calibrated. """ return self.clique_beliefs
[docs] def get_sepset_beliefs(self): """ Returns sepset beliefs. Should be called after clique tree (or junction tree) is calibrated. """ return self.sepset_beliefs
def _update_beliefs(self, sending_clique, receiving_clique, operation): """ This is belief-update method. Parameters ---------- sending_clique: node (as the operation is on junction tree, node should be a tuple) Node sending the message receiving_clique: node (as the operation is on junction tree, node should be a tuple) Node receiving the message operation: str ('marginalize' | 'maximize') The operation to do for passing messages between nodes. Takes belief of one clique and uses it to update the belief of the neighboring ones. """ sepset = frozenset(sending_clique).intersection(frozenset(receiving_clique)) sepset_key = frozenset((sending_clique, receiving_clique)) # \sigma_{i \rightarrow j} = \sum_{C_i - S_{i, j}} \beta_i # marginalize the clique over the sepset sigma = getattr(self.clique_beliefs[sending_clique], operation)( list(frozenset(sending_clique) - sepset), inplace=False ) # \beta_j = \beta_j * \frac{\sigma_{i \rightarrow j}}{\mu_{i, j}} self.clique_beliefs[receiving_clique] *= ( sigma / self.sepset_beliefs[sepset_key] if self.sepset_beliefs[sepset_key] else sigma ) # \mu_{i, j} = \sigma_{i \rightarrow j} self.sepset_beliefs[sepset_key] = sigma def _is_converged(self, operation): """ Checks whether the calibration has converged or not. At convergence the sepset belief would be precisely the sepset marginal. Parameters ---------- operation: str ('marginalize' | 'maximize') The operation to do for passing messages between nodes. if operation == marginalize, it checks whether the junction tree is calibrated or not else if operation == maximize, it checks whether the junction tree is max calibrated or not Formally, at convergence or at calibration this condition would be satisfied for .. math:: \sum_{C_i - S_{i, j}} \beta_i = \sum_{C_j - S_{i, j}} \beta_j = \mu_{i, j} and at max calibration this condition would be satisfied .. math:: \max_{C_i - S_{i, j}} \beta_i = \max_{C_j - S_{i, j}} \beta_j = \mu_{i, j} """ # If no clique belief, then the clique tree is not calibrated if not self.clique_beliefs: return False for edge in self.junction_tree.edges(): sepset = frozenset(edge[0]).intersection(frozenset(edge[1])) sepset_key = frozenset(edge) if ( edge[0] not in self.clique_beliefs or edge[1] not in self.clique_beliefs or sepset_key not in self.sepset_beliefs ): return False marginal_1 = getattr(self.clique_beliefs[edge[0]], operation)( list(frozenset(edge[0]) - sepset), inplace=False ) marginal_2 = getattr(self.clique_beliefs[edge[1]], operation)( list(frozenset(edge[1]) - sepset), inplace=False ) if ( marginal_1 != marginal_2 or marginal_1 != self.sepset_beliefs[sepset_key] ): return False return True def _calibrate_junction_tree(self, operation): """ Generalized calibration of junction tree or clique using belief propagation. This method can be used for both calibrating as well as max-calibrating. Uses Lauritzen-Spiegelhalter algorithm or belief-update message passing. Parameters ---------- operation: str ('marginalize' | 'maximize') The operation to do for passing messages between nodes. Reference --------- Algorithm 10.3 Calibration using belief propagation in clique tree Probabilistic Graphical Models: Principles and Techniques Daphne Koller and Nir Friedman. """ # Initialize clique beliefs as well as sepset beliefs self.clique_beliefs = { clique: self.junction_tree.get_factors(clique) for clique in self.junction_tree.nodes() } self.sepset_beliefs = { frozenset(edge): None for edge in self.junction_tree.edges() } for clique in self.junction_tree.nodes(): if not self._is_converged(operation=operation): neighbors = self.junction_tree.neighbors(clique) # update root's belief using neighbor clique's beliefs # upward pass for neighbor_clique in neighbors: self._update_beliefs(neighbor_clique, clique, operation=operation) bfs_edges = nx.algorithms.breadth_first_search.bfs_edges( self.junction_tree, clique ) # update the beliefs of all the nodes starting from the root to leaves using root's belief # downward pass for edge in bfs_edges: self._update_beliefs(edge[0], edge[1], operation=operation) else: break
[docs] def calibrate(self): """ Calibration using belief propagation in junction tree or clique tree. Examples -------- >>> from pgmpy.models import BayesianNetwork >>> from pgmpy.factors.discrete import TabularCPD >>> from pgmpy.inference import BeliefPropagation >>> G = BayesianNetwork([('diff', 'grade'), ('intel', 'grade'), ... ('intel', 'SAT'), ('grade', 'letter')]) >>> diff_cpd = TabularCPD('diff', 2, [[0.2], [0.8]]) >>> intel_cpd = TabularCPD('intel', 3, [[0.5], [0.3], [0.2]]) >>> grade_cpd = TabularCPD('grade', 3, ... [[0.1, 0.1, 0.1, 0.1, 0.1, 0.1], ... [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], ... [0.8, 0.8, 0.8, 0.8, 0.8, 0.8]], ... evidence=['diff', 'intel'], ... evidence_card=[2, 3]) >>> sat_cpd = TabularCPD('SAT', 2, ... [[0.1, 0.2, 0.7], ... [0.9, 0.8, 0.3]], ... evidence=['intel'], evidence_card=[3]) >>> letter_cpd = TabularCPD('letter', 2, ... [[0.1, 0.4, 0.8], ... [0.9, 0.6, 0.2]], ... evidence=['grade'], evidence_card=[3]) >>> G.add_cpds(diff_cpd, intel_cpd, grade_cpd, sat_cpd, letter_cpd) >>> bp = BeliefPropagation(G) >>> bp.calibrate() """ self._calibrate_junction_tree(operation="marginalize")
[docs] def max_calibrate(self): """ Max-calibration of the junction tree using belief propagation. Examples -------- >>> from pgmpy.models import BayesianNetwork >>> from pgmpy.factors.discrete import TabularCPD >>> from pgmpy.inference import BeliefPropagation >>> G = BayesianNetwork([('diff', 'grade'), ('intel', 'grade'), ... ('intel', 'SAT'), ('grade', 'letter')]) >>> diff_cpd = TabularCPD('diff', 2, [[0.2], [0.8]]) >>> intel_cpd = TabularCPD('intel', 3, [[0.5], [0.3], [0.2]]) >>> grade_cpd = TabularCPD('grade', 3, ... [[0.1, 0.1, 0.1, 0.1, 0.1, 0.1], ... [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], ... [0.8, 0.8, 0.8, 0.8, 0.8, 0.8]], ... evidence=['diff', 'intel'], ... evidence_card=[2, 3]) >>> sat_cpd = TabularCPD('SAT', 2, ... [[0.1, 0.2, 0.7], ... [0.9, 0.8, 0.3]], ... evidence=['intel'], evidence_card=[3]) >>> letter_cpd = TabularCPD('letter', 2, ... [[0.1, 0.4, 0.8], ... [0.9, 0.6, 0.2]], ... evidence=['grade'], evidence_card=[3]) >>> G.add_cpds(diff_cpd, intel_cpd, grade_cpd, sat_cpd, letter_cpd) >>> bp = BeliefPropagation(G) >>> bp.max_calibrate() """ self._calibrate_junction_tree(operation="maximize")
def _query( self, variables, operation, evidence=None, joint=True, show_progress=True ): """ This is a generalized query method that can be used for both query and map query. Parameters ---------- variables: list list of variables for which you want to compute the probability operation: str ('marginalize' | 'maximize') The operation to do for passing messages between nodes. evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence Examples -------- >>> from pgmpy.inference import BeliefPropagation >>> from pgmpy.models import BayesianNetwork >>> import numpy as np >>> import pandas as pd >>> values = pd.DataFrame(np.random.randint(low=0, high=2, size=(1000, 5)), ... columns=['A', 'B', 'C', 'D', 'E']) >>> model = BayesianNetwork([('A', 'B'), ('C', 'B'), ('C', 'D'), ('B', 'E')]) >>> model.fit(values) >>> inference = BeliefPropagation(model) >>> phi_query = inference.query(['A', 'B']) References ---------- Algorithm 10.4 Out-of-clique inference in clique tree Probabilistic Graphical Models: Principles and Techniques Daphne Koller and Nir Friedman. """ is_calibrated = self._is_converged(operation=operation) # Calibrate the junction tree if not calibrated if not is_calibrated: self.calibrate() if not isinstance(variables, (list, tuple, set)): query_variables = [variables] else: query_variables = list(variables) query_variables.extend(evidence.keys() if evidence else []) # Find a tree T' such that query_variables are a subset of scope(T') nodes_with_query_variables = set() for var in query_variables: nodes_with_query_variables.update( filter(lambda x: var in x, self.junction_tree.nodes()) ) subtree_nodes = nodes_with_query_variables # Conversion of set to tuple just for indexing nodes_with_query_variables = tuple(nodes_with_query_variables) # As junction tree is a tree, that means that there would be only path between any two nodes in the tree # thus we can just take the path between any two nodes; no matter there order is for i in range(len(nodes_with_query_variables) - 1): subtree_nodes.update( nx.shortest_path( self.junction_tree, nodes_with_query_variables[i], nodes_with_query_variables[i + 1], ) ) subtree_undirected_graph = self.junction_tree.subgraph(subtree_nodes) # Converting subtree into a junction tree if len(subtree_nodes) == 1: subtree = JunctionTree() subtree.add_node(subtree_nodes.pop()) else: subtree = JunctionTree(subtree_undirected_graph.edges()) # Selecting a node is root node. Root node would be having only one neighbor if len(subtree.nodes()) == 1: root_node = list(subtree.nodes())[0] else: root_node = tuple( filter(lambda x: len(list(subtree.neighbors(x))) == 1, subtree.nodes()) )[0] clique_potential_list = [self.clique_beliefs[root_node]] # For other nodes in the subtree compute the clique potentials as follows # As all the nodes are nothing but tuples so simple set(root_node) won't work at it would update the set with # all the elements of the tuple; instead use set([root_node]) as it would include only the tuple not the # internal elements within it. parent_nodes = set([root_node]) nodes_traversed = set() while parent_nodes: parent_node = parent_nodes.pop() for child_node in set(subtree.neighbors(parent_node)) - nodes_traversed: clique_potential_list.append( self.clique_beliefs[child_node] / self.sepset_beliefs[frozenset([parent_node, child_node])] ) parent_nodes.update([child_node]) nodes_traversed.update([parent_node]) # Add factors to the corresponding junction tree subtree.add_factors(*clique_potential_list) # Sum product variable elimination on the subtree variable_elimination = VariableElimination(subtree) if operation == "marginalize": return variable_elimination.query( variables=variables, evidence=evidence, joint=joint, show_progress=show_progress, ) elif operation == "maximize": return variable_elimination.map_query( variables=variables, evidence=evidence, show_progress=show_progress )
[docs] def query( self, variables, evidence=None, virtual_evidence=None, joint=True, show_progress=True, ): """ Query method using belief propagation. Parameters ---------- variables: list list of variables for which you want to compute the probability evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence virtual_evidence: list (default:None) A list of pgmpy.factors.discrete.TabularCPD representing the virtual evidences. joint: boolean If True, returns a Joint Distribution over `variables`. If False, returns a dict of distributions over each of the `variables`. show_progress: boolean If True shows a progress bar. Examples -------- >>> from pgmpy.factors.discrete import TabularCPD >>> from pgmpy.models import BayesianNetwork >>> from pgmpy.inference import BeliefPropagation >>> bayesian_model = BayesianNetwork([('A', 'J'), ('R', 'J'), ('J', 'Q'), ... ('J', 'L'), ('G', 'L')]) >>> cpd_a = TabularCPD('A', 2, [[0.2], [0.8]]) >>> cpd_r = TabularCPD('R', 2, [[0.4], [0.6]]) >>> cpd_j = TabularCPD('J', 2, ... [[0.9, 0.6, 0.7, 0.1], ... [0.1, 0.4, 0.3, 0.9]], ... ['R', 'A'], [2, 2]) >>> cpd_q = TabularCPD('Q', 2, ... [[0.9, 0.2], ... [0.1, 0.8]], ... ['J'], [2]) >>> cpd_l = TabularCPD('L', 2, ... [[0.9, 0.45, 0.8, 0.1], ... [0.1, 0.55, 0.2, 0.9]], ... ['G', 'J'], [2, 2]) >>> cpd_g = TabularCPD('G', 2, [[0.6], [0.4]]) >>> bayesian_model.add_cpds(cpd_a, cpd_r, cpd_j, cpd_q, cpd_l, cpd_g) >>> belief_propagation = BeliefPropagation(bayesian_model) >>> belief_propagation.query(variables=['J', 'Q'], ... evidence={'A': 0, 'R': 0, 'G': 0, 'L': 1}) """ evidence = evidence if evidence is not None else dict() orig_model = self.model.copy() # Step 1: Parameter Checks common_vars = set(evidence if evidence is not None else []).intersection( set(variables) ) if common_vars: raise ValueError( f"Can't have the same variables in both `variables` and `evidence`. Found in both: {common_vars}" ) # Step 2: If virtual_evidence is provided, modify model and evidence. if isinstance(self.model, BayesianNetwork) and (virtual_evidence is not None): self._virtual_evidence(virtual_evidence) virt_evidence = {"__" + cpd.variables[0]: 0 for cpd in virtual_evidence} return self.query( variables=variables, evidence={**evidence, **virt_evidence}, virtual_evidence=None, joint=joint, show_progress=show_progress, ) # Step 3: Do network pruning. if isinstance(self.model, BayesianNetwork): self.model, evidence = self._prune_bayesian_model(variables, evidence) self._initialize_structures() # Step 4: Run inference. result = self._query( variables=variables, operation="marginalize", evidence=evidence, joint=joint, show_progress=show_progress, ) self.__init__(orig_model) if joint: return result.normalize(inplace=False) else: return result
[docs] def map_query( self, variables=None, evidence=None, virtual_evidence=None, show_progress=True ): """ MAP Query method using belief propagation. Returns the highest probable state in the joint distributon of `variables`. Parameters ---------- variables: list list of variables for which you want to compute the probability virtual_evidence: list (default:None) A list of pgmpy.factors.discrete.TabularCPD representing the virtual evidences. evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence show_progress: boolean If True, shows a progress bar. Examples -------- >>> from pgmpy.factors.discrete import TabularCPD >>> from pgmpy.models import BayesianNetwork >>> from pgmpy.inference import BeliefPropagation >>> bayesian_model = BayesianNetwork([('A', 'J'), ('R', 'J'), ('J', 'Q'), ... ('J', 'L'), ('G', 'L')]) >>> cpd_a = TabularCPD('A', 2, [[0.2], [0.8]]) >>> cpd_r = TabularCPD('R', 2, [[0.4], [0.6]]) >>> cpd_j = TabularCPD('J', 2, ... [[0.9, 0.6, 0.7, 0.1], ... [0.1, 0.4, 0.3, 0.9]], ... ['R', 'A'], [2, 2]) >>> cpd_q = TabularCPD('Q', 2, ... [[0.9, 0.2], ... [0.1, 0.8]], ... ['J'], [2]) >>> cpd_l = TabularCPD('L', 2, ... [[0.9, 0.45, 0.8, 0.1], ... [0.1, 0.55, 0.2, 0.9]], ... ['G', 'J'], [2, 2]) >>> cpd_g = TabularCPD('G', 2, [[0.6], [0.4]]) >>> bayesian_model.add_cpds(cpd_a, cpd_r, cpd_j, cpd_q, cpd_l, cpd_g) >>> belief_propagation = BeliefPropagation(bayesian_model) >>> belief_propagation.map_query(variables=['J', 'Q'], ... evidence={'A': 0, 'R': 0, 'G': 0, 'L': 1}) """ variables = [] if variables is None else variables evidence = evidence if evidence is not None else dict() common_vars = set(evidence if evidence is not None else []).intersection( variables ) if common_vars: raise ValueError( f"Can't have the same variables in both `variables` and `evidence`. Found in both: {common_vars}" ) # TODO:Check the note in docstring. Change that behavior to return the joint MAP if not variables: variables = list(self.model.nodes()) # Make a copy of the original model and then replace self.model with it later. orig_model = self.model.copy() if isinstance(self.model, BayesianNetwork) and (virtual_evidence is not None): self._virtual_evidence(virtual_evidence) virt_evidence = {"__" + cpd.variables[0]: 0 for cpd in virtual_evidence} return self.map_query( variables=variables, evidence={**evidence, **virt_evidence}, virtual_evidence=None, show_progress=show_progress, ) if isinstance(self.model, BayesianNetwork): self.model, evidence = self._prune_bayesian_model(variables, evidence) self._initialize_structures() final_distribution = self._query( variables=variables, operation="marginalize", evidence=evidence, joint=True, show_progress=show_progress, ) self.__init__(orig_model) # To handle the case when no argument is passed then # _variable_elimination returns a dict. argmax = compat_fns.argmax(final_distribution.values) assignment = final_distribution.assignment([argmax])[0] map_query_results = {} for var_assignment in assignment: var, value = var_assignment map_query_results[var] = value return map_query_results
[docs] class BeliefPropagationWithMessagePassing(Inference): """ Class for performing efficient inference using Belief Propagation method on factor graphs with no loops. The message-passing algorithm recursively parses the factor graph to propagate the model's beliefs to infer the posterior distribution of the queried variable. The recursion stops when reaching an observed variable or a unobserved root/leaf variable. It does not work for loopy graphs. Parameters ---------- model: FactorGraph Model on which to run the inference. References ---------- Algorithm 2.1 in https://www.mbmlbook.com/LearningSkills_Testing_out_the_model.html by J Winn (Microsoft Research). """ def __init__(self, model: FactorGraph, check_model=True): assert isinstance( model, FactorGraph ), "Model must be an instance of FactorGraph" if check_model: model.check_model() self.model = model class _RecursiveMessageSchedulingQuery(object): """ Private class used in `BeliefPropagationWithMessagePassing.query()` to efficiently manage the message scheduling across the different queried variables, in a recursive way. Parameters ---------- Same as in the query method. """ def __init__( self, belief_propagation, variables, evidence, virtual_evidence, get_messages, ): self.bp = belief_propagation self.variables = variables self.evidence = evidence self.virtual_evidence = virtual_evidence self.all_messages = {} if get_messages else None def run(self): agg_res = {} for variable in self.variables: res = self.schedule_variable_node_messages( variable, from_factor=None, ) agg_res[variable] = DiscreteFactor([variable], [len(res)], res) if self.all_messages is None: return agg_res else: return agg_res, self.all_messages def schedule_variable_node_messages( self, variable, from_factor, ): """ Returns the message sent by the variable to the factor requesting it. For that, the variable requests the messages coming from its neighbouring factors, except the one making the request. Parameters ---------- variable: str The variable node from which to compute the outgoing message from_factor: pgmpy.factors.discrete.DiscreteFactor or None. The factor requesting the message, as part of the recursion. None for the first time this function is called. """ if self.evidence is not None and variable in self.evidence.keys(): # Is an observed variable return self.bp.model.get_point_mass_message( variable, self.evidence[variable] ) virtual_messages = [] if ( self.virtual_evidence is not None and variable in self.bp._get_virtual_evidence_var_list(self.virtual_evidence) ): virtual_messages = [ cpd.values for cpd in self.virtual_evidence if cpd.variables[0] == variable ] incoming_factors = [ factor for factor in list(self.bp.model.neighbors(variable)) if factor != from_factor ] if len(incoming_factors) == 0: # Is an unobserved leaf variable return self.bp.calc_variable_node_message( variable, [] + virtual_messages ) else: # Else, get the incoming messages from all incoming factors incoming_messages = [] for factor in incoming_factors: incoming_message = self.schedule_factor_node_messages( factor, variable ) if self.all_messages is not None: # Store the message if it's not already stored factor_node_key = f"{factor.variables} -> {variable}" if factor_node_key not in self.all_messages.keys(): self.all_messages[factor_node_key] = incoming_message incoming_messages.append(incoming_message) return self.bp.calc_variable_node_message( variable, incoming_messages + virtual_messages ) def schedule_factor_node_messages(self, factor, from_variable): """ Returns the message sent from the factor to the variable requesting it. For that, the factor requests the messages coming from its neighbouring variables, except the one making the request. Parameters ---------- factor: pgmpy.factors.discrete.DiscreteFactor The factor from which we want to compute the outgoing message. from_variable: str The variable requesting the message, as part of the recursion. """ assert from_variable is not None, "from_var must be specified" incoming_vars = [var for var in factor.variables if var != from_variable] if len(incoming_vars) == 0: # from_var is a root variable. The factor is its prior return self.bp.calc_factor_node_message(factor, [], from_variable) else: # Else, get the incoming messages from all incoming variables incoming_messages = [] for var in incoming_vars: incoming_messages.append( self.schedule_variable_node_messages(var, factor) ) return self.bp.calc_factor_node_message( factor, incoming_messages, from_variable )
[docs] def query( self, variables, evidence=None, virtual_evidence=None, get_messages=False ): """ Computes the posterior distributions for each of the queried variable, given the `evidence`, and the `virtual_evidence`. Optionally also returns the computed messages. Parameters ---------- variables: list List of variables for which you want to compute the posterior. evidence: dict or None (default: None) A dict key, value pair as {var: state_of_var_observed}. None if no evidence. virtual_evidence: list or None (default: None) A list of pgmpy.factors.discrete.TabularCPD representing the virtual evidences. Each virtual evidence becomes a virtual message that gets added to the list of computed messages incoming to the variable node. None if no virtual evidence. Returns ------- If `get_messages` is False, returns a dict of the variables, posterior distributions pairs: {variable: pgmpy.factors.discrete.DiscreteFactor}. If `get_messages` is True, returns: 1. A dict of the variables, posterior distributions pairs: {variable: pgmpy.factors.discrete.DiscreteFactor} 2. A dict of all messages sent from a factor to a node: {"{pgmpy.factors.discrete.DiscreteFactor.variables} -> variable": np.array}. Examples -------- >>> from pgmpy.factors.discrete import DiscreteFactor >>> from pgmpy.models import FactorGraph >>> from pgmpy.inference import BeliefPropagation >>> factor_graph = FactorGraph() >>> factor_graph.add_nodes_from(["A", "B", "C", "D"]) >>> phi1 = DiscreteFactor(["A"], [2], [0.4, 0.6]) >>> phi2 = DiscreteFactor( ... ["B", "A"], [3, 2], [[0.2, 0.05], [0.3, 0.15], [0.5, 0.8]] ... ) >>> phi3 = DiscreteFactor(["C", "B"], [2, 3], [[0.4, 0.5, 0.1], [0.6, 0.5, 0.9]]) >>> phi4 = DiscreteFactor( ... ["D", "B"], [3, 3], [[0.1, 0.1, 0.2], [0.3, 0.2, 0.1], [0.6, 0.7, 0.7]] ... ) >>> factor_graph.add_factors(phi1, phi2, phi3, phi4) >>> factor_graph.add_edges_from( ... [ ... (phi1, "A"), ... ("A", phi2), ... (phi2, "B"), ... ("B", phi3), ... (phi3, "C"), ... ("B", phi4), ... (phi4, "D"), ... ] ... ) >>> belief_propagation = BeliefPropagation(factor_graph) >>> belief_propagation.query(variables=['B', 'C'], ... evidence={'D': 0}, ... virtual_evidence=[TabularCPD(['A'], 2, [[0.3], [0.7]])]) """ common_vars = set(evidence if evidence is not None else []).intersection( set(variables) ) if common_vars: raise ValueError( f"Can't have the same variables in both `variables` and `evidence`. Found in both: {common_vars}" ) # Can't have the same variables in both `evidence` and `virtual_evidence` if evidence is not None and virtual_evidence is not None: self._check_virtual_evidence(virtual_evidence) ve_names = self._get_virtual_evidence_var_list(virtual_evidence) common_vars = set(evidence).intersection(set(ve_names)) if common_vars: raise ValueError( f"Can't have the same variables in both `evidence` and `virtual_evidence`. Found in both: {common_vars}" ) query = self._RecursiveMessageSchedulingQuery( self, variables, evidence, virtual_evidence, get_messages ) return query.run()
[docs] def calc_variable_node_message(self, variable, incoming_messages): """ The outgoing message is the element wise product of all incoming messages If there are no incoming messages, returns a uniform message If there is only one incoming message, returns that message Otherwise, returns the product of all incoming messages Parameters ---------- variable: str the variable node from which to compute the outgoing message incoming_messages: list list of messages coming to this variable node """ if len(incoming_messages) == 0: return self.model.get_uniform_message(variable) elif len(incoming_messages) == 1: return incoming_messages[0] else: outgoing_message = reduce(np.multiply, incoming_messages) return outgoing_message / np.sum(outgoing_message)
[docs] @staticmethod def calc_factor_node_message(factor, incoming_messages, target_var): """ Returns the outgoing message for a factor node, which is the multiplication of the incoming messages with the factor function (CPT). The variables' order in the incoming messages list must match the variable's order in the CPT's dimensions Parameters ---------- factor: str the factor node from which to compute the outgoing message incoming_messages: list list of messages coming to this factor node target_var: str the variable node to which the outgoing message is being sent to """ cpt = factor.values assert ( len(incoming_messages) == cpt.ndim - 1 ), f"Error computing factor node message for {target_var}. The number of incoming messages must equal the card(CPT) - 1" if len(incoming_messages) == 0: return cpt # Ensure that the target var is on the CPT's 0th axis target_var_idx = factor.variables.index(target_var) if target_var_idx != 0: # Move target var to the 0th axis to allow the reduction cpt = np.moveaxis(cpt, target_var_idx, 0) # Invert incoming_messages, so that the first message corresponds to the last # dimension of the CPT incoming_messages = list(reversed(incoming_messages)) # Reduce the CPT with the inverted list of incoming messages outgoing_message = reduce( lambda cpt_reduced, m: np.matmul(cpt_reduced, m), incoming_messages, cpt ) # Normalise return outgoing_message / sum(outgoing_message)