# Source code for pgmpy.sampling.Sampling

import itertools
from collections import namedtuple

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from pgmpy import config
from pgmpy.factors import factor_product
from pgmpy.models import DiscreteBayesianNetwork, DiscreteMarkovNetwork, MarkovChain
from pgmpy.sampling import BayesianModelInference, _return_samples
from pgmpy.utils.mathext import sample_discrete, sample_discrete_maps

State = namedtuple("State", ["var", "state"])


class BayesianModelSampling(BayesianModelInference):
    """
    Class for sampling methods specific to Bayesian Models

    Parameters
    ----------
    model: instance of DiscreteBayesianNetwork
        model on which inference queries will be computed
    """

    def __init__(self, model):
        super(BayesianModelSampling, self).__init__(model)
    def forward_sample(
        self,
        size=1,
        include_latents=False,
        seed=None,
        show_progress=True,
        partial_samples=None,
        n_jobs=-1,
    ):
        """
        Generates sample(s) from the joint distribution of the Bayesian Network.

        Parameters
        ----------
        size: int
            Size of the sample to be generated.

        include_latents: boolean
            Whether to include the latent variable values in the generated samples.

        seed: int (default: None)
            If a value is provided, sets the seed for numpy.random.

        show_progress: boolean
            Whether to show a progress bar of samples getting generated.

        partial_samples: pandas.DataFrame
            A pandas dataframe specifying samples on some of the variables in the
            model. If specified, the sampling procedure uses these sample values,
            instead of generating them.

        n_jobs: int (default: -1)
            The number of CPU cores to use. Default uses all cores.

        Returns
        -------
        sampled: pandas.DataFrame
            The generated samples.

        Examples
        --------
        >>> from pgmpy.models import DiscreteBayesianNetwork
        >>> from pgmpy.factors.discrete import TabularCPD
        >>> from pgmpy.sampling import BayesianModelSampling
        >>> student = DiscreteBayesianNetwork([("diff", "grade"), ("intel", "grade")])
        >>> cpd_d = TabularCPD("diff", 2, [[0.6], [0.4]])
        >>> cpd_i = TabularCPD("intel", 2, [[0.7], [0.3]])
        >>> cpd_g = TabularCPD(
        ...     "grade",
        ...     3,
        ...     [[0.3, 0.05, 0.9, 0.5], [0.4, 0.25, 0.08, 0.3], [0.3, 0.7, 0.02, 0.2]],
        ...     ["intel", "diff"],
        ...     [2, 2],
        ... )
        >>> student.add_cpds(cpd_d, cpd_i, cpd_g)
        >>> inference = BayesianModelSampling(student)
        >>> inference.forward_sample(size=2)
           diff  intel  grade
        0     0      0      1
        1     1      0      2
        """
        sampled = pd.DataFrame(columns=list(self.model.nodes()))

        if show_progress and config.SHOW_PROGRESS:
            pbar = tqdm(self.topological_order)
        else:
            pbar = self.topological_order

        if seed is not None:
            np.random.seed(seed)

        for node in pbar:
            if show_progress and config.SHOW_PROGRESS:
                pbar.set_description(f"Generating for node: {node}")

            # If values are specified in partial_samples, use them. Else generate the values.
            if (partial_samples is not None) and (node in partial_samples.columns):
                sampled[node] = partial_samples.loc[:, node].values
            else:
                cpd = self.model.get_cpds(node)
                states = range(self.cardinality[node])
                evidence = cpd.variables[1:]
                if evidence:
                    evidence_values = np.vstack([sampled[i] for i in evidence])
                    unique, inverse = np.unique(
                        evidence_values.T, axis=0, return_inverse=True
                    )
                    unique = [tuple(u) for u in unique]
                    state_to_index, index_to_weight = self.pre_compute_reduce_maps(
                        variable=node, evidence=evidence, state_combinations=unique
                    )
                    if config.get_backend() == "numpy":
                        weight_index = np.array([state_to_index[u] for u in unique])[
                            inverse
                        ]
                    else:
                        import torch

                        weight_index = torch.Tensor(
                            [state_to_index[u] for u in unique]
                        )[inverse]
                    sampled[node] = sample_discrete_maps(
                        states, weight_index, index_to_weight, size
                    )
                else:
                    weights = cpd.values
                    sampled[node] = sample_discrete(states, weights, size)

        samples_df = _return_samples(
            sampled,
            self.state_names_map,
            partial_samples.columns.tolist() if partial_samples is not None else [],
        )
        if not include_latents and any(
            latent in samples_df.columns for latent in self.model.latents
        ):
            samples_df.drop(self.model.latents, axis=1, inplace=True)
        return samples_df
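
    # A hedged usage sketch (not part of the method's docstring): `partial_samples`
    # lets callers pin some variables to pre-drawn values while the remaining
    # variables are sampled from their CPDs. Assuming the `student` model and the
    # `inference` object from the docstring above; the pinned values are
    # illustrative only. `size` should match `partial_samples.shape[0]`, since the
    # given column is used as-is for every row:
    #
    # >>> import pandas as pd
    # >>> fixed = pd.DataFrame({"diff": [0, 0, 1]})
    # >>> inference.forward_sample(size=3, partial_samples=fixed, show_progress=False)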
    def rejection_sample(
        self,
        evidence=[],
        size=1,
        include_latents=False,
        seed=None,
        show_progress=True,
        partial_samples=None,
    ):
        """
        Generates sample(s) from the joint distribution of the Bayesian Network,
        given the evidence.

        Parameters
        ----------
        evidence: list of `pgmpy.factor.State` namedtuples
            None if no evidence.

        size: int
            Size of the sample to be generated.

        include_latents: boolean
            Whether to include the latent variable values in the generated samples.

        seed: int (default: None)
            If a value is provided, sets the seed for numpy.random.

        show_progress: boolean
            Whether to show a progress bar of samples getting generated.

        partial_samples: pandas.DataFrame
            A pandas dataframe specifying samples on some of the variables in the
            model. If specified, the sampling procedure uses these sample values,
            instead of generating them.

        Returns
        -------
        sampled: pandas.DataFrame
            The generated samples.

        Examples
        --------
        >>> from pgmpy.models import DiscreteBayesianNetwork
        >>> from pgmpy.factors.discrete import TabularCPD
        >>> from pgmpy.factors.discrete import State
        >>> from pgmpy.sampling import BayesianModelSampling
        >>> student = DiscreteBayesianNetwork([("diff", "grade"), ("intel", "grade")])
        >>> cpd_d = TabularCPD("diff", 2, [[0.6], [0.4]])
        >>> cpd_i = TabularCPD("intel", 2, [[0.7], [0.3]])
        >>> cpd_g = TabularCPD(
        ...     "grade",
        ...     3,
        ...     [[0.3, 0.05, 0.9, 0.5], [0.4, 0.25, 0.08, 0.3], [0.3, 0.7, 0.02, 0.2]],
        ...     ["intel", "diff"],
        ...     [2, 2],
        ... )
        >>> student.add_cpds(cpd_d, cpd_i, cpd_g)
        >>> inference = BayesianModelSampling(student)
        >>> evidence = [State(var="diff", state=0)]
        >>> inference.rejection_sample(evidence=evidence, size=2)
           intel  diff  grade
        0      0     0      1
        1      0     0      1
        """
        if seed is not None:
            np.random.seed(seed)

        # If no evidence is given, it is equivalent to forward sampling.
        if len(evidence) == 0:
            return self.forward_sample(size=size, include_latents=include_latents)

        # Setup the dataframe to be returned.
        sampled = pd.DataFrame()
        prob = 1
        i = 0

        # Do the sampling by generating samples from forward sampling and rejecting
        # the samples which do not match our evidence. Keep doing until we have
        # enough samples.
        if show_progress and config.SHOW_PROGRESS:
            pbar = tqdm(total=size)

        while i < size:
            _size = int(((size - i) / prob) * 1.5)
            # If partial_samples is specified, at most partial_samples.shape[0]
            # samples can be generated at a time. For simplicity, just generate
            # partial_samples.shape[0] samples.
            if partial_samples is not None:
                _size = partial_samples.shape[0]
            _sampled = self.forward_sample(
                size=_size,
                include_latents=True,
                show_progress=False,
                partial_samples=partial_samples,
            )

            for var, state in evidence:
                _sampled = _sampled[_sampled[var] == state]
            prob = max(len(_sampled) / _size, 0.01)
            sampled = pd.concat([sampled, _sampled], axis=0, join="outer").iloc[
                :size, :
            ]
            i += _sampled.shape[0]

            if show_progress and config.SHOW_PROGRESS:
                # Update the progress bar by at most `size` in total.
                comp = _sampled.shape[0] if i < size else size - (i - _sampled.shape[0])
                pbar.update(comp)

        if show_progress and config.SHOW_PROGRESS:
            pbar.close()

        sampled = sampled.reset_index(drop=True)
        if not include_latents:
            sampled.drop(self.model.latents, axis=1, inplace=True)
        return sampled
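
    # Worked example of the adaptive batch size above (illustrative numbers):
    # with size=100 accepted samples still needed and an estimated acceptance
    # probability prob=0.25 from the previous round, the next forward-sampling
    # batch is int((100 / 0.25) * 1.5) = 600 candidates, so that on average
    # enough of them survive the evidence filter in a single pass. The floor of
    # prob at 0.01 keeps the batch size bounded for very unlikely evidence.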
    def likelihood_weighted_sample(
        self,
        evidence=[],
        size=1,
        include_latents=False,
        seed=None,
        show_progress=True,
        n_jobs=-1,
    ):
        """
        Generates weighted sample(s) from the joint distribution of the Bayesian
        Network, that comply with the given evidence.
        'Probabilistic Graphical Model Principles and Techniques', Koller and
        Friedman, Algorithm 12.2 pp 493.

        Parameters
        ----------
        evidence: list of `pgmpy.factor.State` namedtuples
            None if no evidence.

        size: int
            Size of the sample to be generated.

        include_latents: boolean
            Whether to include the latent variable values in the generated samples.

        seed: int (default: None)
            If a value is provided, sets the seed for numpy.random.

        show_progress: boolean
            Whether to show a progress bar of samples getting generated.

        n_jobs: int (default: -1)
            The number of CPU cores to use. Default uses all cores.

        Returns
        -------
        sampled: pandas.DataFrame
            The generated samples with corresponding weights.

        Examples
        --------
        >>> from pgmpy.factors.discrete import State
        >>> from pgmpy.models import DiscreteBayesianNetwork
        >>> from pgmpy.factors.discrete import TabularCPD
        >>> from pgmpy.sampling import BayesianModelSampling
        >>> student = DiscreteBayesianNetwork([("diff", "grade"), ("intel", "grade")])
        >>> cpd_d = TabularCPD("diff", 2, [[0.6], [0.4]])
        >>> cpd_i = TabularCPD("intel", 2, [[0.7], [0.3]])
        >>> cpd_g = TabularCPD(
        ...     "grade",
        ...     3,
        ...     [[0.3, 0.05, 0.9, 0.5], [0.4, 0.25, 0.08, 0.3], [0.3, 0.7, 0.02, 0.2]],
        ...     ["intel", "diff"],
        ...     [2, 2],
        ... )
        >>> student.add_cpds(cpd_d, cpd_i, cpd_g)
        >>> inference = BayesianModelSampling(student)
        >>> evidence = [State("diff", 0)]
        >>> inference.likelihood_weighted_sample(evidence=evidence, size=2)
           diff  intel  grade  _weight
        0     0      0      1      0.6
        1     0      0      2      0.6
        """
        if seed is not None:
            np.random.seed(seed)

        # Convert evidence state names to numbers.
        evidence = [
            (var, self.model.get_cpds(var).get_state_no(var, state))
            for var, state in evidence
        ]

        # Prepare the return dataframe.
        sampled = pd.DataFrame(columns=list(self.model.nodes()))
        sampled["_weight"] = np.ones(size)
        evidence_dict = dict(evidence)

        if show_progress and config.SHOW_PROGRESS:
            pbar = tqdm(self.topological_order)
        else:
            pbar = self.topological_order

        # Do the sampling.
        for node in pbar:
            if show_progress and config.SHOW_PROGRESS:
                pbar.set_description(f"Generating for node: {node}")

            cpd = self.model.get_cpds(node)
            states = range(self.cardinality[node])
            evidence = cpd.get_evidence()

            if evidence:
                evidence_values = np.vstack([sampled[i] for i in evidence])
                unique, inverse = np.unique(
                    evidence_values.T, axis=0, return_inverse=True
                )
                unique = [tuple(u) for u in unique]
                state_to_index, index_to_weight = self.pre_compute_reduce_maps(
                    variable=node, evidence=evidence, state_combinations=unique
                )
                weight_index = np.array([state_to_index[tuple(u)] for u in unique])[
                    inverse
                ]
                if node in evidence_dict:
                    evidence_value = evidence_dict[node]
                    sampled[node] = evidence_value
                    sampled.loc[:, "_weight"] *= np.array(
                        list(
                            map(
                                lambda i: index_to_weight[weight_index[i]][
                                    evidence_value
                                ],
                                range(size),
                            )
                        )
                    )
                else:
                    sampled[node] = sample_discrete_maps(
                        states, weight_index, index_to_weight, size
                    )
            else:
                if node in evidence_dict:
                    sampled[node] = evidence_dict[node]
                    sampled.loc[:, "_weight"] *= np.array(
                        list(
                            map(lambda _: cpd.values[evidence_dict[node]], range(size))
                        )
                    )
                else:
                    sampled[node] = sample_discrete(states, cpd.values, size)

        # Postprocess the samples: Change state numbers to names, remove latents.
        samples_df = _return_samples(sampled, self.state_names_map)
        if not include_latents:
            samples_df.drop(self.model.latents, axis=1, inplace=True)
        return samples_df
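
# A minimal, self-contained sketch (the helper name `_demo_likelihood_weighting`
# is an illustrative assumption, not pgmpy API) showing how the `_weight` column
# returned by likelihood_weighted_sample can be turned into an estimate of a
# conditional probability: sum the weights of the samples matching the query
# state, then normalize by the total weight.
def _demo_likelihood_weighting():
    from pgmpy.factors.discrete import TabularCPD
    from pgmpy.models import DiscreteBayesianNetwork

    student = DiscreteBayesianNetwork([("diff", "grade"), ("intel", "grade")])
    student.add_cpds(
        TabularCPD("diff", 2, [[0.6], [0.4]]),
        TabularCPD("intel", 2, [[0.7], [0.3]]),
        TabularCPD(
            "grade",
            3,
            [[0.3, 0.05, 0.9, 0.5], [0.4, 0.25, 0.08, 0.3], [0.3, 0.7, 0.02, 0.2]],
            ["intel", "diff"],
            [2, 2],
        ),
    )
    inference = BayesianModelSampling(student)
    samples = inference.likelihood_weighted_sample(
        evidence=[State("diff", 0)], size=1000, show_progress=False
    )
    # Weighted estimate of P(grade=1 | diff=0).
    est = (
        samples.loc[samples["grade"] == 1, "_weight"].sum()
        / samples["_weight"].sum()
    )
    return est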
class GibbsSampling(MarkovChain):
    """
    Class for performing Gibbs sampling.

    Parameters
    ----------
    model: DiscreteBayesianNetwork or DiscreteMarkovNetwork
        Model from which variables are inherited and transition probabilities computed.

    Examples
    --------
    Initialization from a DiscreteBayesianNetwork object:

    >>> from pgmpy.factors.discrete import TabularCPD
    >>> from pgmpy.models import DiscreteBayesianNetwork
    >>> intel_cpd = TabularCPD("intel", 2, [[0.7], [0.3]])
    >>> sat_cpd = TabularCPD(
    ...     "sat", 2, [[0.95, 0.2], [0.05, 0.8]], evidence=["intel"], evidence_card=[2]
    ... )
    >>> student = DiscreteBayesianNetwork()
    >>> student.add_nodes_from(["intel", "sat"])
    >>> student.add_edge("intel", "sat")
    >>> student.add_cpds(intel_cpd, sat_cpd)
    >>> from pgmpy.sampling import GibbsSampling
    >>> gibbs_chain = GibbsSampling(student)
    >>> gibbs_chain.sample(size=3)
       intel  sat
    0      0    0
    1      0    0
    2      1    1
    """

    def __init__(self, model=None):
        super(GibbsSampling, self).__init__()
        if isinstance(model, DiscreteBayesianNetwork):
            self._get_kernel_from_bayesian_model(model)
        elif isinstance(model, DiscreteMarkovNetwork):
            self._get_kernel_from_markov_model(model)

    def _get_kernel_from_bayesian_model(self, model):
        """
        Computes the Gibbs transition models from a Bayesian Network.
        'Probabilistic Graphical Model Principles and Techniques', Koller and
        Friedman, Section 12.3.3 pp 512-513.

        Parameters
        ----------
        model: DiscreteBayesianNetwork
            The model from which probabilities will be computed.
        """
        self.variables = np.array(model.nodes())
        self.latents = model.latents
        self.cardinalities = {
            var: model.get_cpds(var).variable_card for var in self.variables
        }

        for var in self.variables:
            other_vars = [v for v in self.variables if var != v]
            other_cards = [self.cardinalities[v] for v in other_vars]
            kernel = {}
            factors = [cpd.to_factor() for cpd in model.cpds if var in cpd.scope()]
            factor = factor_product(*factors)
            scope = set(factor.scope())
            for tup in itertools.product(*[range(card) for card in other_cards]):
                states = [State(v, s) for v, s in zip(other_vars, tup) if v in scope]
                reduced_factor = factor.reduce(states, inplace=False)
                kernel[tup] = reduced_factor.values / sum(reduced_factor.values)
            self.transition_models[var] = kernel

    def _get_kernel_from_markov_model(self, model):
        """
        Computes the Gibbs transition models from a Markov Network.
        'Probabilistic Graphical Model Principles and Techniques', Koller and
        Friedman, Section 12.3.3 pp 512-513.

        Parameters
        ----------
        model: DiscreteMarkovNetwork
            The model from which probabilities will be computed.
        """
        self.variables = np.array(model.nodes())
        self.latents = model.latents
        factors_dict = {var: [] for var in self.variables}
        for factor in model.get_factors():
            for var in factor.scope():
                factors_dict[var].append(factor)

        # Take the factor product.
        factors_dict = {
            var: factor_product(*factors) if len(factors) > 1 else factors[0]
            for var, factors in factors_dict.items()
        }
        self.cardinalities = {
            var: factors_dict[var].get_cardinality([var])[var]
            for var in self.variables
        }

        for var in self.variables:
            other_vars = [v for v in self.variables if var != v]
            other_cards = [self.cardinalities[v] for v in other_vars]
            kernel = {}
            factor = factors_dict[var]
            scope = set(factor.scope())
            for tup in itertools.product(*[range(card) for card in other_cards]):
                states = [
                    State(first_var, s)
                    for first_var, s in zip(other_vars, tup)
                    if first_var in scope
                ]
                reduced_factor = factor.reduce(states, inplace=False)
                kernel[tup] = reduced_factor.values / sum(reduced_factor.values)
            self.transition_models[var] = kernel
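
    # Shape of the kernels built above (a hedged illustration, not extra API):
    # for each variable, `transition_models[var]` maps a tuple of the current
    # states of all *other* variables (in `self.variables` order) to the
    # normalized full conditional distribution of `var` given that assignment.
    # E.g. for the A-B-C Markov network used in the sample() docstring below,
    # transition_models["A"][(1, 0)] is the Gibbs full conditional
    # P(A | B=1, C=0), stored as a probability vector over A's states.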
    def sample(self, start_state=None, size=1, seed=None, include_latents=False):
        """
        Sample from the Markov Chain.

        Parameters
        ----------
        start_state: dict or array-like iterable
            Representing the starting states of the variables. If None is passed,
            a random start_state is chosen.

        size: int
            Number of samples to be generated.

        seed: int (default: None)
            If a value is provided, sets the seed for numpy.random.

        include_latents: boolean
            Whether to include the latent variable values in the generated samples.

        Returns
        -------
        sampled: pandas.DataFrame
            The generated samples.

        Examples
        --------
        >>> from pgmpy.factors.discrete import DiscreteFactor
        >>> from pgmpy.sampling import GibbsSampling
        >>> from pgmpy.models import DiscreteMarkovNetwork
        >>> model = DiscreteMarkovNetwork([("A", "B"), ("C", "B")])
        >>> factor_ab = DiscreteFactor(["A", "B"], [2, 2], [1, 2, 3, 4])
        >>> factor_cb = DiscreteFactor(["C", "B"], [2, 2], [5, 6, 7, 8])
        >>> model.add_factors(factor_ab, factor_cb)
        >>> gibbs = GibbsSampling(model)
        >>> gibbs.sample(size=4)
           A  B  C
        0  0  1  1
        1  1  0  0
        2  1  1  0
        3  1  1  1
        """
        if start_state is None and self.state is None:
            self.state = self.random_state()
        elif start_state is not None:
            self.set_start_state(start_state)
        if seed is not None:
            np.random.seed(seed)

        types = [(str(var_name), "int") for var_name in self.variables]
        sampled = np.zeros(size, dtype=types).view(np.recarray)
        sampled[0] = tuple(st for var, st in self.state)
        for i in tqdm(range(size - 1)):
            for j, (var, st) in enumerate(self.state):
                other_st = tuple(st for v, st in self.state if var != v)
                next_st = sample_discrete(
                    list(range(self.cardinalities[var])),
                    self.transition_models[var][other_st],
                )[0]
                self.state[j] = State(var, next_st)
            sampled[i + 1] = tuple(st for var, st in self.state)

        samples_df = _return_samples(sampled)
        if not include_latents:
            samples_df.drop(self.latents, axis=1, inplace=True)
        return samples_df
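
    # A common usage pattern (a sketch, not part of the class): since the chain
    # starts from an arbitrary state and successive draws are correlated, users
    # often discard an initial burn-in prefix and thin the remaining rows. The
    # burn-in length and thinning stride below are arbitrary choices:
    #
    # >>> chain = gibbs.sample(size=11000, seed=42)
    # >>> thinned = chain.iloc[1000::10].reset_index(drop=True)  # drop burn-in, keep every 10th row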
    def generate_sample(
        self, start_state=None, size=1, include_latents=False, seed=None
    ):
        """
        Generator version of self.sample

        Returns
        -------
        List of State namedtuples, representing the assignment to all variables
        of the model.

        Examples
        --------
        >>> from pgmpy.factors.discrete import DiscreteFactor
        >>> from pgmpy.sampling import GibbsSampling
        >>> from pgmpy.models import DiscreteMarkovNetwork
        >>> model = DiscreteMarkovNetwork([("A", "B"), ("C", "B")])
        >>> factor_ab = DiscreteFactor(["A", "B"], [2, 2], [1, 2, 3, 4])
        >>> factor_cb = DiscreteFactor(["C", "B"], [2, 2], [5, 6, 7, 8])
        >>> model.add_factors(factor_ab, factor_cb)
        >>> gibbs = GibbsSampling(model)
        >>> gen = gibbs.generate_sample(size=2)
        >>> [sample for sample in gen]
        [[State(var='C', state=1), State(var='B', state=1), State(var='A', state=0)],
         [State(var='C', state=0), State(var='B', state=1), State(var='A', state=1)]]
        """
        if seed is not None:
            np.random.seed(seed)

        if start_state is None and self.state is None:
            self.state = self.random_state()
        elif start_state is not None:
            self.set_start_state(start_state)

        for i in range(size):
            for j, (var, st) in enumerate(self.state):
                other_st = tuple(st for v, st in self.state if var != v)
                next_st = sample_discrete(
                    list(range(self.cardinalities[var])),
                    self.transition_models[var][other_st],
                )[0]
                self.state[j] = State(var, next_st)
            if include_latents:
                yield self.state[:]
            else:
                yield [s for s in self.state if s.var not in self.latents]
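
# A minimal sketch (the helper name `_demo_streaming_gibbs` is an assumption,
# not pgmpy API): generate_sample yields one joint assignment per step, so
# running statistics can be accumulated without ever materializing a full
# DataFrame of samples.
def _demo_streaming_gibbs():
    from pgmpy.factors.discrete import DiscreteFactor
    from pgmpy.models import DiscreteMarkovNetwork

    model = DiscreteMarkovNetwork([("A", "B"), ("C", "B")])
    model.add_factors(
        DiscreteFactor(["A", "B"], [2, 2], [1, 2, 3, 4]),
        DiscreteFactor(["C", "B"], [2, 2], [5, 6, 7, 8]),
    )
    gibbs = GibbsSampling(model)

    # Running count of B=1 over the stream of assignments.
    b_one = 0
    n = 1000
    for assignment in gibbs.generate_sample(size=n, seed=0):
        state_dict = {var: st for var, st in assignment}
        b_one += state_dict["B"] == 1
    return b_one / n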