import itertools
import networkx as nx
import numpy as np
import pandas as pd
from opt_einsum import contract
from pgmpy.inference import Inference
from pgmpy.utils import _check_1d_array_object, _check_length_equal, compat_fns
[docs]
class BayesianModelInference(Inference):
"""
Class to calculate probability (pmf) values specific to Bayesian Models
Parameters
----------
model: Bayesian Model
model on which inference queries will be computed
"""
def __init__(self, model):
from pgmpy.models import DiscreteBayesianNetwork
if not isinstance(model, DiscreteBayesianNetwork):
raise TypeError(f"Model expected type: DiscreteBayesianNetwork, got type: {type(model)}")
super().__init__(model)
self._initialize_structures()
self.topological_order = list(nx.topological_sort(model))
[docs]
def pre_compute_reduce(self, variable):
"""
Get probability arrays for a node as function of conditional dependencies
Internal function used for Bayesian networks, eg. in BayesianModelSampling
and BayesianModelProbability.
Parameters
----------
variable: Bayesian Model Node
node of the Bayesian network
Returns
-------
dict: dictionary with probability array for node
as function of conditional dependency values
"""
variable_cpd = self.model.get_cpds(variable)
variable_evid = variable_cpd.variables[:0:-1]
cached_values = {}
for state_combination in itertools.product(*[range(self.cardinality[var]) for var in variable_evid]):
states = list(zip(variable_evid, state_combination))
cached_values[state_combination] = variable_cpd.reduce(states, inplace=False, show_warnings=False).values
return cached_values
@staticmethod
def _reduce_marg(variable_cpd, reduce_index, sc):
"""
Method to compute values of the `variable_cpd` when it it reduced on
`variable_evid` with states `sc_values`. Rest of the evidence variables
of `variable_cpd` are marginalized. This is a stripped down version
DiscreteFactor.reduce to only compute the values for faster runtime.
Parameters
----------
variable_cpd: Instance of pgmpy.factors.discrete.TabularCPD
The CPD that will be reduced.
sc: list
list of list of states indices to which to reduce the CPD. The i-th
element of sc corresponds to the (i+1)-th variable in
variable_cpd.variables, i.e., i-th evidence variable.
Returns
-------
list: List of np.array with each element representing the reduced
values correponding to the states in sc_values.
"""
slice_ = [slice(None) for i in range(len(variable_cpd.variables))]
for i, index in enumerate(reduce_index):
slice_[index] = sc[i]
reduced_values = variable_cpd.values[tuple(slice_)]
marg_values = contract(reduced_values, range(reduced_values.ndim), [0])
return marg_values / marg_values.sum()
[docs]
def pre_compute_reduce_maps(self, variable, evidence=None, state_combinations=None):
"""
Get probability array-maps for a node as function of conditional dependencies
Internal function used for Bayesian networks, eg. in BayesianModelSampling
and BayesianModelProbability.
Parameters
----------
variable: Bayesian Model Node
node of the Bayesian network
evidence: list
List of evidence variables to compute the reduced values for. Rest
of the parent varaibles of the node are marginalized.
state_combinations: list (default=None)
List of tuple of state combinations for which to compute the reductions maps.
Returns
-------
dict: dictionary with probability array-index for node as function of conditional dependency values,
dictionary with mapping of probability array-index to probability array.
"""
variable_cpd = self.model.get_cpds(variable)
if evidence is None:
evidence = [var for var in variable_cpd.variables[1:] if var not in self.model.latents]
if state_combinations is None:
state_combinations = [
tuple(sc) for sc in itertools.product(*[range(self.cardinality[var]) for var in evidence])
]
reduce_index = [variable_cpd.variables.index(var) for var in evidence]
weights_list = compat_fns.stack(
[BayesianModelInference._reduce_marg(variable_cpd, reduce_index, sc) for sc in state_combinations]
)
unique_weights, weights_indices = compat_fns.unique(weights_list, axis=0, return_inverse=True)
# convert weights to index; make mapping of state to index
state_to_index = dict(zip(state_combinations, weights_indices))
# make mapping of index to weights
index_to_weight = dict(enumerate(unique_weights))
# return mappings of state to index, and index to weight
return state_to_index, index_to_weight
class BaseGradLogPDF:
"""
Base class for evaluating gradient log of probability density function/ distribution
Classes inheriting this base class can be passed as an argument for
finding gradient log of probability distribution in inference algorithms
The class should initialize self.grad_log and self.log_pdf
Parameters
----------
variable_assignments : A 1d array like object (numpy.ndarray or list)
Vector representing values(assignments) of variables at which we want to find gradient and log
model : An instance of pgmpy.models
Examples
--------
>>> from pgmpy.factors import GaussianDistribution
>>> from pgmpy.inference.continuous import BaseGradLogPDF
>>> import numpy as np
>>> class GradLogGaussian(BaseGradLogPDF):
... def __init__(self, position, model):
... BaseGradLogPDF.__init__(self, position, model)
... self.grad_log, self.log_pdf = self._get_gradient_log_pdf()
...
... def _get_gradient_log_pdf(self):
... sub_vec = self.position - self.model.mean.flatten()
... grad = -np.dot(self.model.precision_matrix, sub_vec)
... log_pdf = 0.5 * float(np.dot(sub_vec, grad))
... return grad, log_pdf
...
>>> mean = np.array([1, 1])
>>> covariance = np.array([[1, 0.2], [0.2, 7]])
>>> model = GaussianDistribution(["x", "y"], mean, covariance)
>>> dist_param = np.array([0.1, 0.9])
>>> grad_logp, logp = GradLogGaussian(dist_param, model).get_gradient_log_pdf()
>>> logp
-0.4054597701149426
>>> grad_logp
array([ 0.90229885, -0.01149425])
"""
def __init__(self, variable_assignments, model):
self.variable_assignments = _check_1d_array_object(variable_assignments, "variable_assignments")
_check_length_equal(
variable_assignments,
model.variables,
"variable_assignments",
"model.variables",
)
self.model = model
# The gradient log of probability distribution at position
self.grad_log = None
# The gradient log of probability distribution at position
self.log_pdf = None
def get_gradient_log_pdf(self):
"""
Returns the gradient log and log of model at given position
Returns
-------
Returns a tuple of following types
numpy.array: Representing gradient log of model at given position
float: Representing log of model at given position
Example
--------
>>> # Using implementation of GradLogPDFGaussian
>>> from pgmpy.sampling.base import GradLogPDFGaussian
>>> from pgmpy.factors.continuous import GaussianDistribution
>>> import numpy as np
>>> mean = np.array([1, 1])
>>> covariance = np.array([[1, -5], [-5, 2]])
>>> model = GaussianDistribution(["x", "y"], mean, covariance)
>>> dist_param = np.array([0.6, 0.8])
>>> grad_logp, logp = GradLogPDFGaussian(
... dist_param, model
... ).get_gradient_log_pdf()
>>> logp
0.025217391304347823
>>> grad_logp
array([-0.07826087, -0.09565217])
"""
return self.grad_log, self.log_pdf
class GradLogPDFGaussian(BaseGradLogPDF):
"""
Class for finding gradient and gradient log of Joint Gaussian Distribution
Inherits pgmpy.inference.base_continuous.BaseGradLogPDF
Here model must be an instance of GaussianDistribution
Parameters
----------
variable_assignments : A 1d array like object (numpy.ndarray or list)
Vector representing values of variables at which we want to find gradient and log
model : An instance of pgmpy.models.GaussianDistribution
Example
-------
>>> from pgmpy.sampling import GradLogPDFGaussian
>>> from pgmpy.factors.continuous import GaussianDistribution
>>> import numpy as np
>>> mean = np.array([3, 4])
>>> covariance = np.array([[5, 4], [4, 5]])
>>> model = GaussianDistribution(["x", "y"], mean, covariance)
>>> dist_param = np.array([12, 21])
>>> grad_logp, logp = GradLogPDFGaussian(dist_param, model).get_gradient_log_pdf()
>>> logp
-34.777777777777771
>>> grad_logp
array([ 2.55555556, -5.44444444])
"""
def __init__(self, variable_assignments, model):
BaseGradLogPDF.__init__(self, variable_assignments, model)
self.grad_log, self.log_pdf = self._get_gradient_log_pdf()
def _get_gradient_log_pdf(self):
"""
Method that finds gradient and its log at position
"""
sub_vec = self.variable_assignments - self.model.mean.flatten()
grad = -np.dot(self.model.precision_matrix, sub_vec)
log_pdf = 0.5 * np.dot(sub_vec, grad)
return grad, log_pdf
class BaseSimulateHamiltonianDynamics:
"""
Base class for proposing new values of position and momentum by simulating Hamiltonian Dynamics.
Classes inheriting this base class can be passed as an argument for
simulate_dynamics in inference algorithms.
Parameters
----------
model : An instance of pgmpy.models
Model for which DiscretizeTime object is initialized
position : A 1d array like object (numpy.ndarray or list)
Vector representing values of parameter position( or X)
momentum: A 1d array like object (numpy.ndarray or list)
Vector representing the proposed value for momentum (velocity)
stepsize: Float
stepsize for the simulating dynamics
grad_log_pdf : A subclass of pgmpy.inference.continuous.BaseGradLogPDF
A class for finding gradient log and log of distribution
grad_log_position: A 1d array like object, defaults to None
Vector representing gradient log at given position
If None, then will be calculated
Examples
--------
>>> from pgmpy.sampling import BaseSimulateHamiltonianDynamics
>>> from pgmpy.factors.continuous import GaussianDistribution
>>> from pgmpy.sampling import GradLogPDFGaussian
>>> import numpy as np
>>> # Class should initialize self.new_position, self.new_momentum and self.new_grad_logp
>>> # self.new_grad_logp represents gradient log at new proposed value of position
>>> class ModifiedEuler(BaseSimulateHamiltonianDynamics):
... def __init__(
... self,
... model,
... position,
... momentum,
... stepsize,
... grad_log_pdf,
... grad_log_position=None,
... ):
... BaseSimulateHamiltonianDynamics.__init__(
... self,
... model,
... position,
... momentum,
... stepsize,
... grad_log_pdf,
... grad_log_position,
... )
... self.new_position, self.new_momentum, self.new_grad_logp = (
... self._get_proposed_values()
... )
...
... def _get_proposed_values(self):
... momentum_bar = self.momentum + self.stepsize * self.grad_log_position
... position_bar = self.position + self.stepsize * momentum_bar
... grad_log_position, _ = self.grad_log_pdf(
... position_bar, self.model
... ).get_gradient_log_pdf()
... return position_bar, momentum_bar, grad_log_position
...
>>> pos = np.array([1, 2])
>>> momentum = np.array([0, 0])
>>> mean = np.array([0, 0])
>>> covariance = np.eye(2)
>>> model = GaussianDistribution(["x", "y"], mean, covariance)
>>> new_pos, new_momentum, new_grad = ModifiedEuler(
... model, pos, momentum, 0.25, GradLogPDFGaussian
... ).get_proposed_values()
>>> new_pos
array([0.9375, 1.875])
>>> new_momentum
array([-0.25, -0.5])
>>> new_grad
array([-0.9375, -1.875])
"""
def __init__(self, model, position, momentum, stepsize, grad_log_pdf, grad_log_position=None):
position = _check_1d_array_object(position, "position")
momentum = _check_1d_array_object(momentum, "momentum")
if not issubclass(grad_log_pdf, BaseGradLogPDF):
raise TypeError("grad_log_pdf must be an instance" + " of pgmpy.inference.continuous.base.BaseGradLogPDF")
_check_length_equal(position, momentum, "position", "momentum")
_check_length_equal(position, model.variables, "position", "model.variables")
if grad_log_position is None:
grad_log_position, _ = grad_log_pdf(position, model).get_gradient_log_pdf()
else:
grad_log_position = _check_1d_array_object(grad_log_position, "grad_log_position")
_check_length_equal(grad_log_position, position, "grad_log_position", "position")
self.position = position
self.momentum = momentum
self.stepsize = stepsize
self.model = model
self.grad_log_pdf = grad_log_pdf
self.grad_log_position = grad_log_position
# new_position is the new proposed position, new_momentum is the new proposed momentum, new_grad_lop
# is the value of grad log at new_position
self.new_position = self.new_momentum = self.new_grad_logp = None
def get_proposed_values(self):
"""
Returns the new proposed values of position and momentum
Returns
-------
Returns a tuple of following type (in order)
numpy.array: A 1d numpy.array representing new proposed value of position
numpy.array: A 1d numpy.array representing new proposed value of momentum
numpy.array: A 1d numpy.array representing gradient of log distribution at new proposed value of position
Example
-------
>>> # Using implementation of ModifiedEuler
>>> from pgmpy.inference.continuous import (
... ModifiedEuler,
... GradLogPDFGaussian as GLPG,
... )
>>> from pgmpy.factors import GaussianDistribution
>>> import numpy as np
>>> pos = np.array([3, 4])
>>> momentum = np.array([1, 1])
>>> mean = np.array([-1, 1])
>>> covariance = 3 * np.eye(2)
>>> model = GaussianDistribution(["x", "y"], mean, covariance)
>>> new_pos, new_momentum, new_grad = ModifiedEuler(
... model, pos, momentum, 0.70, GLPG
... ).get_proposed_values()
>>> new_pos
array([ 3.04666667, 4.21 ])
>>> new_momentum
array([ 0.06666667, 0.3 ])
>>> new_grad
array([-1.34888889, -1.07 ])
"""
return self.new_position, self.new_momentum, self.new_grad_logp
class LeapFrog(BaseSimulateHamiltonianDynamics):
"""
Class for simulating hamiltonian dynamics using leapfrog method
Inherits pgmpy.inference.base_continuous.BaseSimulateHamiltonianDynamics
Parameters
----------
model : An instance of pgmpy.models
Model for which DiscretizeTime object is initialized
position : A 1d array like object (numpy.ndarray or list)
Vector representing values of parameter position( or X)
momentum: A 1d array like object (numpy.ndarray or list)
Vector representing the proposed value for momentum (velocity)
stepsize: Float
stepsize for the simulating dynamics
grad_log_pdf : A subclass of pgmpy.inference.continuous.BaseGradLogPDF, defaults to None
A class for evaluating gradient log and log of distribution for a given assignment of variables
If None, then model.get_gradient_log_pdf will be used
grad_log_position: A 1d array like object, defaults to None
Vector representing gradient log at given position
If None, then will be calculated
Example
--------
>>> from pgmpy.factors.continuous import GaussianDistribution
>>> from pgmpy.sampling import LeapFrog, GradLogPDFGaussian as GLPG
>>> import numpy as np
>>> pos = np.array([2, 1])
>>> momentum = np.array([7, 7])
>>> mean = np.array([-5, 5])
>>> covariance = np.array([[1, 2], [2, 1]])
>>> model = GaussianDistribution(["x", "y"], mean, covariance)
>>> new_pos, new_momentum, new_grad = LeapFrog(
... model, pos, momentum, 4.0, GLPG
... ).get_proposed_values()
>>> new_pos
array([ 70., -19.])
>>> new_momentum
array([ 99., -121.])
>>> new_grad
array([ 41., -58.])
"""
def __init__(self, model, position, momentum, stepsize, grad_log_pdf, grad_log_position=None):
BaseSimulateHamiltonianDynamics.__init__(
self, model, position, momentum, stepsize, grad_log_pdf, grad_log_position
)
(
self.new_position,
self.new_momentum,
self.new_grad_logp,
) = self._get_proposed_values()
def _get_proposed_values(self):
"""
Method to perform time splitting using leapfrog
"""
# Take half step in time for updating momentum
momentum_bar = self.momentum + 0.5 * self.stepsize * self.grad_log_position
# Take full step in time for updating position
position_bar = self.position + self.stepsize * momentum_bar
grad_log, _ = self.grad_log_pdf(position_bar, self.model).get_gradient_log_pdf()
# Take remaining half step in time for updating momentum
momentum_bar = momentum_bar + 0.5 * self.stepsize * grad_log
return position_bar, momentum_bar, grad_log
class ModifiedEuler(BaseSimulateHamiltonianDynamics):
"""
Class for simulating Hamiltonian Dynamics using Modified euler method
Inherits pgmpy.inference.base_continuous.BaseSimulateHamiltonianDynamics
Parameters
----------
model: An instance of pgmpy.models
Model for which DiscretizeTime object is initialized
position: A 1d array like object (numpy.ndarray or list)
Vector representing values of parameter position( or X)
momentum: A 1d array like object (numpy.ndarray or list)
Vector representing the proposed value for momentum (velocity)
stepsize: Float
stepsize for the simulating dynamics
grad_log_pdf: A subclass of pgmpy.inference.continuous.BaseGradLogPDF, defaults to None
A class for finding gradient log and log of distribution
If None, then will use model.get_gradient_log_pdf
grad_log_position: A 1d array like object, defaults to None
Vector representing gradient log at given position
If None, then will be calculated
Example
--------
>>> from pgmpy.factors.continuous import GaussianDistribution
>>> from pgmpy.sampling import GradLogPDFGaussian, ModifiedEuler
>>> import numpy as np
>>> pos = np.array([2, 1])
>>> momentum = np.array([1, 1])
>>> mean = np.array([0, 0])
>>> covariance = np.eye(2)
>>> model = GaussianDistribution(["x", "y"], mean, covariance)
>>> new_pos, new_momentum, new_grad = ModifiedEuler(
... model, pos, momentum, 0.25, GradLogPDFGaussian
... ).get_proposed_values()
>>> new_pos
array([2.125, 1.1875])
>>> new_momentum
array([0.5, 0.75])
>>> new_grad
array([-2.125, -1.1875])
"""
def __init__(self, model, position, momentum, stepsize, grad_log_pdf, grad_log_position=None):
BaseSimulateHamiltonianDynamics.__init__(
self, model, position, momentum, stepsize, grad_log_pdf, grad_log_position
)
(
self.new_position,
self.new_momentum,
self.new_grad_logp,
) = self._get_proposed_values()
def _get_proposed_values(self):
"""
Method to perform time splitting using Modified euler method
"""
# Take full step in time and update momentum
momentum_bar = self.momentum + self.stepsize * self.grad_log_position
# Take full step in time and update position
position_bar = self.position + self.stepsize * momentum_bar
grad_log, _ = self.grad_log_pdf(position_bar, self.model).get_gradient_log_pdf()
return position_bar, momentum_bar, grad_log
def _return_samples(samples, state_names_map=None, columns_with_state_names=[]):
"""
A utility function to return samples according to type
"""
if isinstance(samples, np.recarray):
samples = pd.DataFrame(samples)
if state_names_map is not None:
for var in samples.columns:
if (var != "_weight") and (var not in columns_with_state_names):
samples[var] = samples[var].map(state_names_map[var])
return samples