Source code for pgmpy.inference.dbn_inference

from collections import defaultdict
from itertools import chain, combinations, tee

from pgmpy.factors import factor_product
from pgmpy.factors.discrete import DiscreteFactor
from pgmpy.inference import BeliefPropagation, Inference


[docs] class DBNInference(Inference): """ Class for performing inference using Belief Propagation method for the input Dynamic Bayesian Network. For the exact inference implementation, the interface algorithm is used which is adapted from [1]. Parameters ---------- model: Dynamic Bayesian Network Model for which inference is to performed Examples -------- >>> from pgmpy.factors.discrete import TabularCPD >>> from pgmpy.models import DynamicBayesianNetwork as DBN >>> from pgmpy.inference import DBNInference >>> dbnet = DBN() >>> dbnet.add_edges_from([(('Z', 0), ('X', 0)), (('X', 0), ('Y', 0)), ... (('Z', 0), ('Z', 1))]) >>> z_start_cpd = TabularCPD(('Z', 0), 2, [[0.5], [0.5]]) >>> x_i_cpd = TabularCPD(('X', 0), 2, [[0.6, 0.9], ... [0.4, 0.1]], ... evidence=[('Z', 0)], ... evidence_card=[2]) >>> y_i_cpd = TabularCPD(('Y', 0), 2, [[0.2, 0.3], ... [0.8, 0.7]], ... evidence=[('X', 0)], ... evidence_card=[2]) >>> z_trans_cpd = TabularCPD(('Z', 1), 2, [[0.4, 0.7], ... [0.6, 0.3]], ... evidence=[('Z', 0)], ... evidence_card=[2]) >>> dbnet.add_cpds(z_start_cpd, z_trans_cpd, x_i_cpd, y_i_cpd) >>> dbnet.initialize_initial_state() >>> dbn_inf = DBNInference(dbnet) >>> dbn_inf.start_junction_tree.nodes() NodeView(((('X', 0), ('Y', 0)), (('X', 0), ('Z', 0)))) >>> dbn_inf.one_and_half_junction_tree.nodes() NodeView(((('Z', 1), ('Z', 0)), (('Y', 1), ('X', 1)), (('Z', 1), ('X', 1)))) References ---------- [1] Dynamic Bayesian Networks: Representation, Inference and Learning by Kevin Patrick Murphy http://www.cs.ubc.ca/~murphyk/Thesis/thesis.pdf """ def __init__(self, model): super(DBNInference, self).__init__(model) self._initialize_structures() self.interface_nodes_0 = model.get_interface_nodes(time_slice=0) self.interface_nodes_1 = model.get_interface_nodes(time_slice=1) start_markov_model = self.start_bayesian_model.to_markov_model() one_and_half_markov_model = self.one_and_half_model.to_markov_model() combinations_slice_0 = tee(combinations(set(self.interface_nodes_0), 2), 2) combinations_slice_1 = combinations(set(self.interface_nodes_1), 2) start_markov_model.add_edges_from(combinations_slice_0[0]) one_and_half_markov_model.add_edges_from( chain(combinations_slice_0[1], combinations_slice_1) ) self.one_and_half_junction_tree = one_and_half_markov_model.to_junction_tree() self.start_junction_tree = start_markov_model.to_junction_tree() self.start_interface_clique = self._get_clique( self.start_junction_tree, self.interface_nodes_0 ) self.in_clique = self._get_clique( self.one_and_half_junction_tree, self.interface_nodes_0 ) self.out_clique = self._get_clique( self.one_and_half_junction_tree, self.interface_nodes_1 ) def _shift_nodes(self, nodes, time_slice): """ Shifting the nodes to a certain required timeslice. Parameters ---------- nodes: list, array-like List of node names. nodes that are to be shifted to some other time slice. time_slice: int time slice where to shift the nodes. """ return [(node[0], time_slice) for node in nodes] def _get_clique(self, junction_tree, nodes): """ Extracting the cliques from the junction tree which are a subset of the given nodes. Parameters ---------- junction_tree: Junction tree from which the nodes are to be extracted. nodes: iterable container A container of nodes (list, dict, set, etc.). """ return [ clique for clique in junction_tree.nodes() if set(nodes).issubset(clique) ][0] def _get_evidence(self, evidence_dict, time_slice, shift): """ Getting the evidence belonging to a particular timeslice. Parameters ---------- evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence time: int the evidence corresponding to the time slice shift: int shifting the evidence corresponding to the given time slice. """ if evidence_dict: return { (node[0], shift): evidence_dict[node] for node in evidence_dict if node[1] == time_slice } def _marginalize_factor(self, nodes, factor): """ Marginalizing the factor selectively for a set of variables. Parameters ---------- nodes: list, array-like A container of nodes (list, dict, set, etc.). factor: factor factor which is to be marginalized. """ marginalizing_nodes = list(set(factor.scope()).difference(nodes)) return factor.marginalize(marginalizing_nodes, inplace=False) def _update_belief(self, belief_prop, clique, clique_potential, message=None): """ Method for updating the belief. Parameters ---------- belief_prop: Belief Propagation Belief Propagation which needs to be updated. in_clique: clique The factor which needs to be updated corresponding to the input clique. out_clique_potential: factor Multiplying factor which will be multiplied to the factor corresponding to the clique. """ old_factor = belief_prop.junction_tree.get_factors(clique) belief_prop.junction_tree.remove_factors(old_factor) if message: if message.scope() and clique_potential.scope(): new_factor = old_factor * message new_factor = new_factor / clique_potential else: new_factor = old_factor else: new_factor = old_factor * clique_potential belief_prop.junction_tree.add_factors(new_factor) belief_prop.calibrate() def _get_factor(self, belief_prop, evidence): """ Extracts the required factor from the junction tree. Parameters ---------- belief_prop: Belief Propagation Belief Propagation which needs to be updated. evidence: dict a dict key, value pair as {var: state_of_var_observed} """ final_factor = factor_product(*belief_prop.junction_tree.get_factors()) if evidence: for var in evidence: if var in final_factor.scope(): final_factor.reduce([(var, evidence[var])]) return final_factor def _shift_factor(self, factor, shift): """ Shifting the factor to a certain required time slice. Parameters ---------- factor: DiscreteFactor The factor which needs to be shifted. shift: int The new timeslice to which the factor should belong to. """ new_scope = self._shift_nodes(factor.scope(), shift) return DiscreteFactor(new_scope, factor.cardinality, factor.values)
[docs] def forward_inference(self, variables, evidence=None, args=None): """ Forward inference method using belief propagation. Parameters ---------- variables: list list of variables for which you want to compute the probability evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence Examples -------- >>> from pgmpy.factors.discrete import TabularCPD >>> from pgmpy.models import DynamicBayesianNetwork as DBN >>> from pgmpy.inference import DBNInference >>> dbnet = DBN() >>> dbnet.add_edges_from([(('Z', 0), ('X', 0)), (('X', 0), ('Y', 0)), ... (('Z', 0), ('Z', 1))]) >>> z_start_cpd = TabularCPD(('Z', 0), 2, [[0.5], [0.5]]) >>> x_i_cpd = TabularCPD(('X', 0), 2, [[0.6, 0.9], ... [0.4, 0.1]], ... evidence=[('Z', 0)], ... evidence_card=[2]) >>> y_i_cpd = TabularCPD(('Y', 0), 2, [[0.2, 0.3], ... [0.8, 0.7]], ... evidence=[('X', 0)], ... evidence_card=[2]) >>> z_trans_cpd = TabularCPD(('Z', 1), 2, [[0.4, 0.7], ... [0.6, 0.3]], ... evidence=[('Z', 0)], ... evidence_card=[2]) >>> dbnet.add_cpds(z_start_cpd, z_trans_cpd, x_i_cpd, y_i_cpd) >>> dbnet.initialize_initial_state() >>> dbn_inf = DBNInference(dbnet) >>> dbn_inf.forward_inference([('X', 2)], {('Y', 0):1, ('Y', 1):0, ('Y', 2):1})[('X', 2)].values array([0.76738736, 0.23261264]) """ variable_dict = defaultdict(list) for var in variables: variable_dict[var[1]].append(var) time_range = max(variable_dict) if evidence: evid_time_range = max([time_slice for var, time_slice in evidence.keys()]) time_range = max(time_range, evid_time_range) start_bp = BeliefPropagation(self.start_junction_tree) mid_bp = BeliefPropagation(self.one_and_half_junction_tree) evidence_0 = self._get_evidence(evidence, 0, 0) interface_nodes_dict = {} potential_dict = {} if evidence: interface_nodes_dict = { k: v for k, v in evidence_0.items() if k in self.interface_nodes_0 } initial_factor = self._get_factor(start_bp, evidence_0) marginalized_factor = self._marginalize_factor( self.interface_nodes_0, initial_factor ) potential_dict[0] = marginalized_factor self._update_belief(mid_bp, self.in_clique, marginalized_factor) if variable_dict[0]: factor_values = start_bp.query( variable_dict[0], evidence=evidence_0, joint=False ) else: factor_values = {} for time_slice in range(1, time_range + 1): evidence_time = self._get_evidence(evidence, time_slice, 1) if interface_nodes_dict: evidence_time.update(interface_nodes_dict) if variable_dict[time_slice]: variable_time = self._shift_nodes(variable_dict[time_slice], 1) new_values = mid_bp.query( variable_time, evidence=evidence_time, joint=False ) changed_values = {} for key in new_values.keys(): new_key = (key[0], time_slice) new_factor = DiscreteFactor( [new_key], new_values[key].cardinality, new_values[key].values ) changed_values[new_key] = new_factor factor_values.update(changed_values) clique_phi = self._get_factor(mid_bp, evidence_time) out_clique_phi = self._marginalize_factor( self.interface_nodes_1, clique_phi ) new_factor = self._shift_factor(out_clique_phi, 0) potential_dict[time_slice] = new_factor mid_bp = BeliefPropagation(self.one_and_half_junction_tree) self._update_belief(mid_bp, self.in_clique, new_factor) if evidence_time: interface_nodes_dict = { (k[0], 0): v for k, v in evidence_time.items() if k in self.interface_nodes_1 } else: interface_nodes_dict = {} if args == "potential": return potential_dict return factor_values
[docs] def backward_inference(self, variables, evidence=None): """ Backward inference method using belief propagation. Parameters ---------- variables: list list of variables for which you want to compute the probability evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence Examples -------- >>> from pgmpy.factors.discrete import TabularCPD >>> from pgmpy.models import DynamicBayesianNetwork as DBN >>> from pgmpy.inference import DBNInference >>> dbnet = DBN() >>> dbnet.add_edges_from([(('Z', 0), ('X', 0)), (('X', 0), ('Y', 0)), ... (('Z', 0), ('Z', 1))]) >>> z_start_cpd = TabularCPD(('Z', 0), 2, [[0.5], [0.5]]) >>> x_i_cpd = TabularCPD(('X', 0), 2, [[0.6, 0.9], ... [0.4, 0.1]], ... evidence=[('Z', 0)], ... evidence_card=[2]) >>> y_i_cpd = TabularCPD(('Y', 0), 2, [[0.2, 0.3], ... [0.8, 0.7]], ... evidence=[('X', 0)], ... evidence_card=[2]) >>> z_trans_cpd = TabularCPD(('Z', 1), 2, [[0.4, 0.7], ... [0.6, 0.3]], ... evidence=[('Z', 0)], ... evidence_card=[2]) >>> dbnet.add_cpds(z_start_cpd, z_trans_cpd, x_i_cpd, y_i_cpd) >>> dbnet.initialize_initial_state() >>> dbn_inf = DBNInference(dbnet) >>> dbn_inf.backward_inference([('X', 0)], {('Y', 0):0, ('Y', 1):1, ('Y', 2):1})[('X', 0)].values array([0.66594382, 0.33405618]) """ variable_dict = defaultdict(list) for var in variables: variable_dict[var[1]].append(var) time_range = max(variable_dict) interface_nodes_dict = {} if evidence: evid_time_range = max([time_slice for var, time_slice in evidence.keys()]) time_range = max(time_range, evid_time_range) end_bp = BeliefPropagation(self.start_junction_tree) potential_dict = self.forward_inference(variables, evidence, "potential") update_factor = self._shift_factor(potential_dict[time_range], 1) factor_values = {} for time_slice in range(time_range, 0, -1): evidence_time = self._get_evidence(evidence, time_slice, 1) evidence_prev_time = self._get_evidence(evidence, time_slice - 1, 0) if evidence_prev_time: interface_nodes_dict = { k: v for k, v in evidence_prev_time.items() if k in self.interface_nodes_0 } if evidence_time: evidence_time.update(interface_nodes_dict) mid_bp = BeliefPropagation(self.one_and_half_junction_tree) self._update_belief(mid_bp, self.in_clique, potential_dict[time_slice - 1]) forward_factor = self._shift_factor(potential_dict[time_slice], 1) self._update_belief(mid_bp, self.out_clique, forward_factor, update_factor) if variable_dict[time_slice]: variable_time = self._shift_nodes(variable_dict[time_slice], 1) new_values = mid_bp.query( variable_time, evidence=evidence_time, joint=False ) changed_values = {} for key in new_values.keys(): new_key = (key[0], time_slice) new_factor = DiscreteFactor( [new_key], new_values[key].cardinality, new_values[key].values ) changed_values[new_key] = new_factor factor_values.update(changed_values) clique_phi = self._get_factor(mid_bp, evidence_time) in_clique_phi = self._marginalize_factor(self.interface_nodes_0, clique_phi) update_factor = self._shift_factor(in_clique_phi, 1) out_clique_phi = self._shift_factor(update_factor, 0) self._update_belief( end_bp, self.start_interface_clique, potential_dict[0], out_clique_phi ) evidence_0 = self._get_evidence(evidence, 0, 0) if variable_dict[0]: factor_values.update( end_bp.query(variable_dict[0], evidence_0, joint=False) ) return factor_values
[docs] def query(self, variables, evidence=None, args="exact"): """ Query method for Dynamic Bayesian Network using Interface Algorithm. Parameters ---------- variables: list list of variables for which you want to compute the probability evidence: dict a dict key, value pair as {var: state_of_var_observed} None if no evidence Examples -------- >>> from pgmpy.factors.discrete import TabularCPD >>> from pgmpy.models import DynamicBayesianNetwork as DBN >>> from pgmpy.inference import DBNInference >>> dbnet = DBN() >>> dbnet.add_edges_from([(('Z', 0), ('X', 0)), (('X', 0), ('Y', 0)), ... (('Z', 0), ('Z', 1))]) >>> z_start_cpd = TabularCPD(('Z', 0), 2, [[0.5], [0.5]]) >>> x_i_cpd = TabularCPD(('X', 0), 2, [[0.6, 0.9], ... [0.4, 0.1]], ... evidence=[('Z', 0)], ... evidence_card=[2]) >>> y_i_cpd = TabularCPD(('Y', 0), 2, [[0.2, 0.3], ... [0.8, 0.7]], ... evidence=[('X', 0)], ... evidence_card=[2]) >>> z_trans_cpd = TabularCPD(('Z', 1), 2, [[0.4, 0.7], ... [0.6, 0.3]], ... evidence=[('Z', 0)], ... evidence_card=[2]) >>> dbnet.add_cpds(z_start_cpd, z_trans_cpd, x_i_cpd, y_i_cpd) >>> dbnet.initialize_initial_state() >>> dbn_inf = DBNInference(dbnet) >>> dbn_inf.query([('X', 0)], {('Y', 0):0, ('Y', 1):1, ('Y', 2):1})[('X', 0)].values array([0.66594382, 0.33405618]) """ if args == "exact": return self.backward_inference(variables, evidence)