import itertools
from collections import namedtuple
import networkx as nx
import numpy as np
import pandas as pd
import torch
from joblib import Parallel, delayed
from tqdm.auto import tqdm
from pgmpy import config
from pgmpy.factors import factor_product
from pgmpy.models import BayesianNetwork, MarkovChain, MarkovNetwork
from pgmpy.sampling import BayesianModelInference, _return_samples
from pgmpy.utils.mathext import sample_discrete, sample_discrete_maps
State = namedtuple("State", ["var", "state"])
class BayesianModelSampling(BayesianModelInference):
"""
Class for sampling methods specific to Bayesian Models
Parameters
----------
model: instance of BayesianNetwork
model on which inference queries will be computed
"""
def __init__(self, model):
super(BayesianModelSampling, self).__init__(model)
def forward_sample(
self,
size=1,
include_latents=False,
seed=None,
show_progress=True,
partial_samples=None,
n_jobs=-1,
):
"""
        Generates sample(s) from the joint distribution of the Bayesian Network.
Parameters
----------
size: int
size of sample to be generated
include_latents: boolean
Whether to include the latent variable values in the generated samples.
seed: int (default: None)
If a value is provided, sets the seed for numpy.random.
show_progress: boolean
Whether to show a progress bar of samples getting generated.
partial_samples: pandas.DataFrame
A pandas dataframe specifying samples on some of the variables in the model. If
specified, the sampling procedure uses these sample values, instead of generating them.
n_jobs: int (default: -1)
The number of CPU cores to use. Default uses all cores.
Returns
-------
sampled: pandas.DataFrame
The generated samples
Examples
--------
>>> from pgmpy.models import BayesianNetwork
>>> from pgmpy.factors.discrete import TabularCPD
>>> from pgmpy.sampling import BayesianModelSampling
>>> student = BayesianNetwork([('diff', 'grade'), ('intel', 'grade')])
>>> cpd_d = TabularCPD('diff', 2, [[0.6], [0.4]])
>>> cpd_i = TabularCPD('intel', 2, [[0.7], [0.3]])
>>> cpd_g = TabularCPD('grade', 3, [[0.3, 0.05, 0.9, 0.5], [0.4, 0.25,
... 0.08, 0.3], [0.3, 0.7, 0.02, 0.2]],
... ['intel', 'diff'], [2, 2])
>>> student.add_cpds(cpd_d, cpd_i, cpd_g)
>>> inference = BayesianModelSampling(student)
>>> inference.forward_sample(size=2)
           diff  intel  grade
        0     0      0      1
        1     1      0      2
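        Using ``partial_samples`` to fix values for some of the variables (a
        sketch; the ``partial`` frame below is illustrative and the remaining
        values are random):
        >>> import pandas as pd
        >>> partial = pd.DataFrame({'diff': [0, 0]})
        >>> inference.forward_sample(size=2, partial_samples=partial, show_progress=False)  # doctest: +SKIP
           diff  intel  grade
        0     0      1      2
        1     0      0      1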
"""
sampled = pd.DataFrame(columns=list(self.model.nodes()))
if show_progress and config.SHOW_PROGRESS:
pbar = tqdm(self.topological_order)
else:
pbar = self.topological_order
if seed is not None:
np.random.seed(seed)
for node in pbar:
if show_progress and config.SHOW_PROGRESS:
pbar.set_description(f"Generating for node: {node}")
# If values specified in partial_samples, use them. Else generate the values.
if (partial_samples is not None) and (node in partial_samples.columns):
sampled[node] = partial_samples.loc[:, node].values
else:
cpd = self.model.get_cpds(node)
states = range(self.cardinality[node])
evidence = cpd.variables[1:]
if evidence:
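                    # Vectorized CPT lookup: find the unique parent-state
                    # combinations, map every row to its combination's weight
                    # vector, and sample all rows for this node in one call.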
evidence_values = np.vstack([sampled[i] for i in evidence])
unique, inverse = np.unique(
evidence_values.T, axis=0, return_inverse=True
)
unique = [tuple(u) for u in unique]
state_to_index, index_to_weight = self.pre_compute_reduce_maps(
variable=node, evidence=evidence, state_combinations=unique
)
if config.get_backend() == "numpy":
weight_index = np.array([state_to_index[u] for u in unique])[
inverse
]
else:
weight_index = torch.Tensor(
[state_to_index[u] for u in unique]
)[inverse]
sampled[node] = sample_discrete_maps(
states, weight_index, index_to_weight, size
)
else:
weights = cpd.values
sampled[node] = sample_discrete(states, weights, size)
samples_df = _return_samples(sampled, self.state_names_map)
if not include_latents:
samples_df.drop(self.model.latents, axis=1, inplace=True)
return samples_df
def rejection_sample(
self,
evidence=[],
size=1,
include_latents=False,
seed=None,
show_progress=True,
partial_samples=None,
):
"""
        Generates sample(s) from the joint distribution of the Bayesian Network,
given the evidence.
Parameters
----------
        evidence: list of `pgmpy.factor.State` namedtuples
            The observed evidence; an empty list (the default) means no evidence.
size: int
size of sample to be generated
include_latents: boolean
Whether to include the latent variable values in the generated samples.
seed: int (default: None)
If a value is provided, sets the seed for numpy.random.
show_progress: boolean
Whether to show a progress bar of samples getting generated.
partial_samples: pandas.DataFrame
A pandas dataframe specifying samples on some of the variables in the model. If
specified, the sampling procedure uses these sample values, instead of generating them.
Returns
-------
sampled: pandas.DataFrame
The generated samples
Examples
--------
>>> from pgmpy.models import BayesianNetwork
>>> from pgmpy.factors.discrete import TabularCPD
>>> from pgmpy.factors.discrete import State
>>> from pgmpy.sampling import BayesianModelSampling
>>> student = BayesianNetwork([('diff', 'grade'), ('intel', 'grade')])
>>> cpd_d = TabularCPD('diff', 2, [[0.6], [0.4]])
>>> cpd_i = TabularCPD('intel', 2, [[0.7], [0.3]])
>>> cpd_g = TabularCPD('grade', 3, [[0.3, 0.05, 0.9, 0.5], [0.4, 0.25,
... 0.08, 0.3], [0.3, 0.7, 0.02, 0.2]],
... ['intel', 'diff'], [2, 2])
>>> student.add_cpds(cpd_d, cpd_i, cpd_g)
>>> inference = BayesianModelSampling(student)
>>> evidence = [State(var='diff', state=0)]
        >>> inference.rejection_sample(evidence=evidence, size=2)
intel diff grade
0 0 0 1
1 0 0 1
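        Every retained row satisfies the evidence; a quick check (illustrative):
        >>> samples = inference.rejection_sample(evidence=evidence, size=100, show_progress=False)  # doctest: +SKIP
        >>> (samples['diff'] == 0).all()  # doctest: +SKIP
        True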
"""
if seed is not None:
np.random.seed(seed)
# If no evidence is given, it is equivalent to forward sampling.
if len(evidence) == 0:
return self.forward_sample(size=size, include_latents=include_latents)
# Setup array to be returned
sampled = pd.DataFrame()
prob = 1
i = 0
# Do the sampling by generating samples from forward sampling and rejecting the
# samples which do not match our evidence. Keep doing until we have enough
# samples.
if show_progress and config.SHOW_PROGRESS:
pbar = tqdm(total=size)
while i < size:
_size = int(((size - i) / prob) * 1.5)
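            # `prob` estimates the acceptance rate of the previous batch, so
            # oversampling by 1.5x should yield the remaining samples in one pass.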
            # If partial_samples is specified, at most partial_samples.shape[0] samples
            # can be generated at a time; for simplicity, generate exactly that many.
if partial_samples is not None:
_size = partial_samples.shape[0]
_sampled = self.forward_sample(
size=_size,
include_latents=True,
show_progress=False,
partial_samples=partial_samples,
)
for var, state in evidence:
_sampled = _sampled[_sampled[var] == state]
prob = max(len(_sampled) / _size, 0.01)
sampled = pd.concat([sampled, _sampled], axis=0, join="outer").iloc[
:size, :
]
i += _sampled.shape[0]
if show_progress and config.SHOW_PROGRESS:
                # Cap the update so the progress bar total never exceeds `size`
comp = _sampled.shape[0] if i < size else size - (i - _sampled.shape[0])
pbar.update(comp)
if show_progress and config.SHOW_PROGRESS:
pbar.close()
sampled = sampled.reset_index(drop=True)
if not include_latents:
sampled.drop(self.model.latents, axis=1, inplace=True)
return sampled
def likelihood_weighted_sample(
self,
evidence=[],
size=1,
include_latents=False,
seed=None,
show_progress=True,
n_jobs=-1,
):
"""
        Generates weighted sample(s) from the joint distribution of the Bayesian
        Network that comply with the given evidence, using likelihood weighting:
        'Probabilistic Graphical Models: Principles and Techniques', Koller and
        Friedman, Algorithm 12.2, p. 493.
Parameters
----------
        evidence: list of `pgmpy.factor.State` namedtuples
            The observed evidence; an empty list (the default) means no evidence.
size: int
size of sample to be generated
include_latents: boolean
Whether to include the latent variable values in the generated samples.
seed: int (default: None)
If a value is provided, sets the seed for numpy.random.
show_progress: boolean
Whether to show a progress bar of samples getting generated.
n_jobs: int (default: -1)
The number of CPU cores to use. Default uses all cores.
Returns
-------
sampled: A pandas.DataFrame
The generated samples with corresponding weights
Examples
--------
>>> from pgmpy.factors.discrete import State
>>> from pgmpy.models import BayesianNetwork
>>> from pgmpy.factors.discrete import TabularCPD
>>> from pgmpy.sampling import BayesianModelSampling
>>> student = BayesianNetwork([('diff', 'grade'), ('intel', 'grade')])
>>> cpd_d = TabularCPD('diff', 2, [[0.6], [0.4]])
>>> cpd_i = TabularCPD('intel', 2, [[0.7], [0.3]])
>>> cpd_g = TabularCPD('grade', 3, [[0.3, 0.05, 0.9, 0.5], [0.4, 0.25,
... 0.08, 0.3], [0.3, 0.7, 0.02, 0.2]],
... ['intel', 'diff'], [2, 2])
>>> student.add_cpds(cpd_d, cpd_i, cpd_g)
>>> inference = BayesianModelSampling(student)
>>> evidence = [State('diff', 0)]
        >>> inference.likelihood_weighted_sample(evidence=evidence, size=2)
           diff  intel  grade  _weight
        0     0      0      1      0.6
        1     0      1      2      0.6
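        Each row's ``_weight`` is the product of P(e | parents(e)) over the
        evidence variables e (Algorithm 12.2). A sketch of the usual weighted
        estimate, here of P(grade=0 | diff=0):
        >>> df = inference.likelihood_weighted_sample(evidence=evidence, size=1000)  # doctest: +SKIP
        >>> df[df['grade'] == 0]['_weight'].sum() / df['_weight'].sum()  # doctest: +SKIP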
"""
if seed is not None:
np.random.seed(seed)
# Convert evidence state names to number
evidence = [
(var, self.model.get_cpds(var).get_state_no(var, state))
for var, state in evidence
]
# Prepare the return dataframe
sampled = pd.DataFrame(columns=list(self.model.nodes()))
sampled["_weight"] = np.ones(size)
evidence_dict = dict(evidence)
if show_progress and config.SHOW_PROGRESS:
pbar = tqdm(self.topological_order)
else:
pbar = self.topological_order
# Do the sampling
for node in pbar:
if show_progress and config.SHOW_PROGRESS:
pbar.set_description(f"Generating for node: {node}")
cpd = self.model.get_cpds(node)
states = range(self.cardinality[node])
evidence = cpd.get_evidence()
if evidence:
evidence_values = np.vstack([sampled[i] for i in evidence])
unique, inverse = np.unique(
evidence_values.T, axis=0, return_inverse=True
)
unique = [tuple(u) for u in unique]
state_to_index, index_to_weight = self.pre_compute_reduce_maps(
variable=node, evidence=evidence, state_combinations=unique
)
                weight_index = np.array([state_to_index[u] for u in unique])[
                    inverse
                ]
if node in evidence_dict:
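                    # Observed node: clamp it to the evidence value and scale each
                    # sample's weight by P(evidence value | that row's parent states).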
evidence_value = evidence_dict[node]
sampled[node] = evidence_value
sampled.loc[:, "_weight"] *= np.array(
list(
map(
lambda i: index_to_weight[weight_index[i]][
evidence_value
],
range(size),
)
)
)
else:
sampled[node] = sample_discrete_maps(
states, weight_index, index_to_weight, size
)
else:
if node in evidence_dict:
sampled[node] = evidence_dict[node]
sampled.loc[:, "_weight"] *= np.array(
list(
map(lambda _: cpd.values[evidence_dict[node]], range(size))
)
)
else:
sampled[node] = sample_discrete(states, cpd.values, size)
# Postprocess the samples: Change state numbers to names, remove latents.
samples_df = _return_samples(sampled, self.state_names_map)
if not include_latents:
samples_df.drop(self.model.latents, axis=1, inplace=True)
return samples_df
class GibbsSampling(MarkovChain):
"""
Class for performing Gibbs sampling.
Parameters
----------
model: BayesianNetwork or MarkovNetwork
Model from which variables are inherited and transition probabilities computed.
Examples
--------
Initialization from a BayesianNetwork object:
>>> from pgmpy.factors.discrete import TabularCPD
>>> from pgmpy.models import BayesianNetwork
>>> intel_cpd = TabularCPD('intel', 2, [[0.7], [0.3]])
>>> sat_cpd = TabularCPD('sat', 2, [[0.95, 0.2], [0.05, 0.8]], evidence=['intel'], evidence_card=[2])
>>> student = BayesianNetwork()
>>> student.add_nodes_from(['intel', 'sat'])
>>> student.add_edge('intel', 'sat')
>>> student.add_cpds(intel_cpd, sat_cpd)
>>> from pgmpy.sampling import GibbsSampling
>>> gibbs_chain = GibbsSampling(student)
>>> gibbs_chain.sample(size=3)
intel sat
0 0 0
1 0 0
2 1 1
"""
def __init__(self, model=None):
super(GibbsSampling, self).__init__()
if isinstance(model, BayesianNetwork):
self._get_kernel_from_bayesian_model(model)
elif isinstance(model, MarkovNetwork):
self._get_kernel_from_markov_model(model)
def _get_kernel_from_bayesian_model(self, model):
"""
Computes the Gibbs transition models from a Bayesian Network.
        'Probabilistic Graphical Models: Principles and Techniques', Koller and
Friedman, Section 12.3.3 pp 512-513.
Parameters
----------
model: BayesianNetwork
The model from which probabilities will be computed.
"""
self.variables = np.array(model.nodes())
self.latents = model.latents
self.cardinalities = {
var: model.get_cpds(var).variable_card for var in self.variables
}
for var in self.variables:
other_vars = [v for v in self.variables if var != v]
other_cards = [self.cardinalities[v] for v in other_vars]
kernel = {}
factors = [cpd.to_factor() for cpd in model.cpds if var in cpd.scope()]
factor = factor_product(*factors)
scope = set(factor.scope())
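            # For each joint assignment to the remaining variables, reduce the
            # product of the factors containing `var` to that assignment and
            # renormalize over the states of `var`; this yields the Gibbs kernel
            # P(var | all other variables).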
for tup in itertools.product(*[range(card) for card in other_cards]):
states = [State(v, s) for v, s in zip(other_vars, tup) if v in scope]
reduced_factor = factor.reduce(states, inplace=False)
kernel[tup] = reduced_factor.values / sum(reduced_factor.values)
self.transition_models[var] = kernel
def _get_kernel_from_markov_model(self, model):
"""
Computes the Gibbs transition models from a Markov Network.
        'Probabilistic Graphical Models: Principles and Techniques', Koller and
Friedman, Section 12.3.3 pp 512-513.
Parameters
----------
model: MarkovNetwork
The model from which probabilities will be computed.
"""
self.variables = np.array(model.nodes())
self.latents = model.latents
factors_dict = {var: [] for var in self.variables}
for factor in model.get_factors():
for var in factor.scope():
factors_dict[var].append(factor)
# Take factor product
factors_dict = {
var: factor_product(*factors) if len(factors) > 1 else factors[0]
for var, factors in factors_dict.items()
}
self.cardinalities = {
var: factors_dict[var].get_cardinality([var])[var] for var in self.variables
}
for var in self.variables:
other_vars = [v for v in self.variables if var != v]
other_cards = [self.cardinalities[v] for v in other_vars]
kernel = {}
factor = factors_dict[var]
scope = set(factor.scope())
for tup in itertools.product(*[range(card) for card in other_cards]):
states = [
State(first_var, s)
for first_var, s in zip(other_vars, tup)
if first_var in scope
]
reduced_factor = factor.reduce(states, inplace=False)
kernel[tup] = reduced_factor.values / sum(reduced_factor.values)
self.transition_models[var] = kernel
def sample(self, start_state=None, size=1, seed=None, include_latents=False):
"""
Sample from the Markov Chain.
Parameters
----------
start_state: dict or array-like iterable
Representing the starting states of the variables. If None is passed, a random start_state is chosen.
size: int
Number of samples to be generated.
seed: int (default: None)
If a value is provided, sets the seed for numpy.random.
include_latents: boolean
Whether to include the latent variable values in the generated samples.
Returns
-------
sampled: pandas.DataFrame
The generated samples
Examples
--------
>>> from pgmpy.factors.discrete import DiscreteFactor
>>> from pgmpy.sampling import GibbsSampling
>>> from pgmpy.models import MarkovNetwork
>>> model = MarkovNetwork([('A', 'B'), ('C', 'B')])
>>> factor_ab = DiscreteFactor(['A', 'B'], [2, 2], [1, 2, 3, 4])
>>> factor_cb = DiscreteFactor(['C', 'B'], [2, 2], [5, 6, 7, 8])
>>> model.add_factors(factor_ab, factor_cb)
>>> gibbs = GibbsSampling(model)
        >>> gibbs.sample(size=4)
A B C
0 0 1 1
1 1 0 0
2 1 1 0
3 1 1 1
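        Gibbs samples are autocorrelated and the chain needs time to mix; a
        common pattern (the burn-in cutoff below is illustrative) is:
        >>> samples = gibbs.sample(size=10000)  # doctest: +SKIP
        >>> stationary = samples[1000:]  # discard burn-in  # doctest: +SKIP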
"""
if start_state is None and self.state is None:
self.state = self.random_state()
elif start_state is not None:
self.set_start_state(start_state)
if seed is not None:
np.random.seed(seed)
types = [(str(var_name), "int") for var_name in self.variables]
sampled = np.zeros(size, dtype=types).view(np.recarray)
sampled[0] = tuple(st for var, st in self.state)
for i in tqdm(range(size - 1)):
for j, (var, st) in enumerate(self.state):
other_st = tuple(st for v, st in self.state if var != v)
next_st = sample_discrete(
list(range(self.cardinalities[var])),
self.transition_models[var][other_st],
)[0]
self.state[j] = State(var, next_st)
sampled[i + 1] = tuple(st for var, st in self.state)
samples_df = _return_samples(sampled)
if not include_latents:
samples_df.drop(self.latents, axis=1, inplace=True)
return samples_df
def generate_sample(
self, start_state=None, size=1, include_latents=False, seed=None
):
"""
Generator version of self.sample
        Returns
        -------
        A generator yielding, at each step, a list of State namedtuples
        representing the assignment to all variables of the model.
Examples
--------
>>> from pgmpy.factors.discrete import DiscreteFactor
>>> from pgmpy.sampling import GibbsSampling
>>> from pgmpy.models import MarkovNetwork
>>> model = MarkovNetwork([('A', 'B'), ('C', 'B')])
>>> factor_ab = DiscreteFactor(['A', 'B'], [2, 2], [1, 2, 3, 4])
>>> factor_cb = DiscreteFactor(['C', 'B'], [2, 2], [5, 6, 7, 8])
>>> model.add_factors(factor_ab, factor_cb)
>>> gibbs = GibbsSampling(model)
>>> gen = gibbs.generate_sample(size=2)
>>> [sample for sample in gen]
[[State(var='C', state=1), State(var='B', state=1), State(var='A', state=0)],
[State(var='C', state=0), State(var='B', state=1), State(var='A', state=1)]]
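        Each yielded item is a list of State namedtuples, so a DataFrame can be
        built directly from the generator (a sketch):
        >>> import pandas as pd
        >>> df = pd.DataFrame([{s.var: s.state for s in sample} for sample in gibbs.generate_sample(size=100)])  # doctest: +SKIP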
"""
if seed is not None:
np.random.seed(seed)
if start_state is None and self.state is None:
self.state = self.random_state()
elif start_state is not None:
self.set_start_state(start_state)
for i in range(size):
for j, (var, st) in enumerate(self.state):
other_st = tuple(st for v, st in self.state if var != v)
next_st = sample_discrete(
list(range(self.cardinalities[var])),
self.transition_models[var][other_st],
)[0]
self.state[j] = State(var, next_st)
if include_latents:
yield self.state[:]
else:
                yield [st for st in self.state if st.var not in self.latents]