Source code for pgmpy.factors.continuous.CanonicalFactor

```# -*- coding: utf-8 -*-

from __future__ import division

import numpy as np

from pgmpy.factors.continuous import ContinuousFactor
from pgmpy.factors.continuous import JointGaussianDistribution

[docs]class CanonicalFactor(ContinuousFactor):
u"""
The intermediate factors in a Gaussian network can be described
compactly using a simple parametric representation called the
canonical form. This representation is closed under the basic
operations used in inference: factor product, factor division,
factor reduction, and marginalization. Thus, we define this
CanonicalFactor class that allows the inference process to be
performed on joint Gaussian networks.

A canonical form C (X; K,h,g) is defined as

C (X; K,h,g) = exp( ((-1/2) * X.T * K * X) + (h.T * X) + g)

Reference
---------
Probabilistic Graphical Models, Principles and Techniques,
Daphne Koller and Nir Friedman, Section 14.2, Chapter 14.

"""
def __init__(self, variables, K, h, g):
"""
Parameters
----------
variables: list or array-like
The variables for wich the distribution is defined.

K: n x n, 2-d array-like

h : n x 1, array-like

g : int, float

pdf: function
The probability density function of the distribution.

The terms K, h and g are defined parameters for canonical
factors representation.

Examples
--------
>>> from pgmpy.factors.continuous import CanonicalFactor
>>> phi = CanonicalFactor(['X', 'Y'], np.array([[1, -1], [-1, 1]]),
np.array([[1], [-1]]), -3)
>>> phi.variables
['X', 'Y']

>>> phi.K
array([[1, -1],
[-1, 1]])

>>> phi.h
array([[1],
[-1]])

>>> phi.g
-3

"""
no_of_var = len(variables)

if len(h) != no_of_var:
raise ValueError("Length of h parameter vector must be equal to the"
"number of variables.")

self.h = np.asarray(np.reshape(h, (no_of_var, 1)), dtype=float)
self.g = g
self.K = np.asarray(K, dtype=float)

if self.K.shape != (no_of_var, no_of_var):
raise ValueError("The K matrix should be a square matrix with order equal to"
"the number of variables. Got: {got_shape}, Expected: {exp_shape}".format
(got_shape=self.K.shape, exp_shape=(no_of_var, no_of_var)))

super(CanonicalFactor, self).__init__(variables, None)

@property
def pdf(self):
def fun(*args):
x = np.array(args)
return np.exp(self.g + np.dot(x, self.h)[0] - 0.5 * np.dot(x.T, np.dot(self.K, x)))
return fun

[docs]    def copy(self):
"""
Makes a copy of the factor.

Returns
-------
CanonicalFactor object: Copy of the factor

Examples
--------
>>> from pgmpy.factors.continuous import CanonicalFactor
>>> phi = CanonicalFactor(['X', 'Y'], np.array([[1, -1], [-1, 1]]),
np.array([[1], [-1]]), -3)
>>> phi.variables
['X', 'Y']

>>> phi.K
array([[1, -1],
[-1, 1]])

>>> phi.h
array([[1],
[-1]])

>>> phi.g
-3

>>> phi2 = phi.copy()

>>> phi2.variables
['X', 'Y']

>>> phi2.K
array([[1, -1],
[-1, 1]])

>>> phi2.h
array([[1],
[-1]])

>>> phi2.g
-3

"""
copy_factor = CanonicalFactor(self.scope(), self.K.copy(),
self.h.copy(), self.g)

return copy_factor

[docs]    def to_joint_gaussian(self):
"""
Return an equivalent Joint Gaussian Distribution.

Examples
--------

>>> import numpy as np
>>> from pgmpy.factors.continuous import CanonicalFactor
>>> phi = CanonicalFactor(['x1', 'x2'], np.array([[3, -2], [-2, 4]]),
np.array([[5], [-1]]), 1)
>>> jgd = phi.to_joint_gaussian()
>>> jgd.variables
['x1', 'x2']
>>> jgd.covariance
array([[ 0.5  ,  0.25 ],
[ 0.25 ,  0.375]])
>>> jgd.mean
array([[ 2.25 ],
[ 0.875]])

"""
covariance = np.linalg.inv(self.K)
mean = np.dot(covariance, self.h)

return JointGaussianDistribution(self.scope(), mean, covariance)

[docs]    def reduce(self, values, inplace=True):
"""
Reduces the distribution to the context of the given variable values.

Let C(X,Y ; K, h, g) be some canonical form over X,Y where,

k = [[K_XX, K_XY],        ;       h = [[h_X],
[K_YX, K_YY]]                     [h_Y]]

The formula for the obtained conditional distribution for setting
Y = y is given by,

.. math:: K' = K_{XX}
.. math:: h' = h_X - K_{XY} * y
.. math:: g' = g + {h^T}_Y * y - 0.5 * y^T * K_{YY} * y

Parameters
----------
values: list, array-like
A list of tuples of the form (variable name, variable value).

inplace: boolean
If inplace=True it will modify the factor itself, else would return
a new CaninicalFactor object.

Returns
-------
CanonicalFactor or None:
if inplace=True (default) returns None
if inplace=False returns a new CanonicalFactor instance.

Examples
--------
>>> import numpy as np
>>> from pgmpy.factors.continuous import CanonicalFactor
>>> phi = CanonicalFactor(['X1', 'X2', 'X3'],
...                       np.array([[1, -1, 0], [-1, 4, -2], [0, -2, 4]]),
...                       np.array([[1], [4], [-1]]), -2)
>>> phi.variables
['X1', 'X2', 'X3']

>>> phi.K
array([[ 1., -1.],
[-1.,  3.]])

>>> phi.h
array([[ 1. ],
[ 3.5]])

>>> phi.g
-2

>>> phi.reduce([('X3', 0.25)])

>>> phi.variables
['X1', 'X2']

>>> phi.K
array([[ 1, -1],
[-1,  4]])

>>> phi.h
array([[ 1. ],
[ 4.5]])

>>> phi.g
-2.375
"""
if not isinstance(values, (list, tuple, np.ndarray)):
raise TypeError("variables: Expected type list or array-like, "
"got type {var_type}".format(var_type=type(values)))

if not all([var in self.variables for var, value in values]):
raise ValueError("Variable not in scope.")

phi = self if inplace else self.copy()

var_to_reduce = [var for var, value in values]

# index_to_keep -> j vector
index_to_keep = [self.variables.index(var) for var in self.variables
if var not in var_to_reduce]
# index_to_reduce -> i vector
index_to_reduce = [self.variables.index(var) for var in var_to_reduce]

K_i_i = self.K[np.ix_(index_to_keep, index_to_keep)]
K_i_j = self.K[np.ix_(index_to_keep, index_to_reduce)]
K_j_j = self.K[np.ix_(index_to_reduce, index_to_reduce)]
h_i = self.h[index_to_keep]
h_j = self.h[index_to_reduce]

# The values for the reduced variables.
y = np.array([value for index,
value in sorted([(self.variables.index(var), value) for var,
value in values])]).reshape(len(index_to_reduce), 1)

phi.variables = [self.variables[index] for index in index_to_keep]
phi.K = K_i_i
phi.h = h_i - np.dot(K_i_j, y)
phi.g = self.g + (np.dot(h_j.T, y) - (0.5 * np.dot(np.dot(y.T, K_j_j), y)))[0][0]

if not inplace:
return phi

[docs]    def marginalize(self, variables, inplace=True):
u"""
Modifies the factor with marginalized values.

Let C(X,Y ; K, h, g) be some canonical form over X,Y where,

k = [[K_XX, K_XY],        ;       h = [[h_X],
[K_YX, K_YY]]                     [h_Y]]

In this case, the result of the integration operation is a canonical
from C (K', h', g') given by,

.. math:: K' = K_{XX} - K_{XY} * {K^{-1}}_{YY} * K_YX
.. math:: h' = h_X - K_{XY} * {K^{-1}}_{YY} * h_Y
.. math:: g' = g + 0.5 * (|Y| * log(2*pi) - log(|K_{YY}|) + {h^T}_Y * K_{YY} * h_Y)

Parameters
----------

variables: list or array-like
List of variables over which to marginalize.

inplace: boolean
If inplace=True it will modify the distribution itself,
else would return a new distribution.

Returns
-------
CanonicalFactor or None :
if inplace=True (default) returns None
if inplace=False return a new CanonicalFactor instance

Examples
--------
>>> import numpy as np
>>> from pgmpy.factors.continuous import CanonicalFactor
>>> phi = CanonicalFactor(['X1', 'X2', 'X3'],
...                       np.array([[1, -1, 0], [-1, 4, -2], [0, -2, 4]]),
...                       np.array([[1], [4], [-1]]), -2)
>>> phi.K
array([[ 1, -1,  0],
[-1,  4, -2],
[ 0, -2,  4]])

>>> phi.h
array([[ 1],
[ 4],
[-1]])

>>> phi.g
-2

>>> phi.marginalize(['X3'])

>>> phi.K
array([[ 1., -1.],
[-1.,  3.]])

>>> phi.h
array([[ 1. ],
[ 3.5]])

>>> phi.g
0.22579135
"""
if not isinstance(variables, (list, tuple, np.ndarray)):
raise TypeError("variables: Expected type list or array-like, "
"got type {var_type}".format(var_type=type(variables)))

if not all([var in self.variables for var in variables]):
raise ValueError("Variable not in scope.")

phi = self if inplace else self.copy()

# index_to_keep -> i vector
index_to_keep = [self.variables.index(var) for var in self.variables
if var not in variables]
# index_to_marginalize -> j vector
index_to_marginalize = [self.variables.index(var) for var in variables]

K_i_i = self.K[np.ix_(index_to_keep, index_to_keep)]
K_i_j = self.K[np.ix_(index_to_keep, index_to_marginalize)]
K_j_i = self.K[np.ix_(index_to_marginalize, index_to_keep)]
K_j_j = self.K[np.ix_(index_to_marginalize, index_to_marginalize)]
K_j_j_inv = np.linalg.inv(K_j_j)
h_i = self.h[index_to_keep]
h_j = self.h[index_to_marginalize]

phi.variables = [self.variables[index] for index in index_to_keep]

phi.K = K_i_i - np.dot(np.dot(K_i_j, K_j_j_inv), K_j_i)
phi.h = h_i - np.dot(np.dot(K_i_j, K_j_j_inv), h_j)
phi.g = self.g + 0.5 * (len(variables) * np.log(2 * np.pi) -
np.log(abs(np.linalg.det(K_j_j))) + np.dot(np.dot(h_j.T, K_j_j), h_j))[0][0]

if not inplace:
return phi

def _operate(self, other, operation, inplace=True):
"""
Gives the CanonicalFactor operation (product or divide) with
the other factor.

The product of two canonical factors over the same scope
X is simply:

C(K1, h1, g1) * C(K2, h2, g2) = C(K1+K2, h1+h2, g1+g2)

The division of canonical forms is defined analogously:

C(K1, h1, g1) / C(K2, h2, g2) = C(K1-K2, h1-h2, g1- g2)

When we have two canonical factors over different scopes X and Y,
we simply extend the scope of both to make their scopes match and
then perform the operation of the above equation. The extension of
the scope is performed by simply adding zero entries to both the K
matrices and the h vectors.

Parameters
----------
other: CanonicalFactor
The CanonicalFactor to be multiplied.

operation: String
'product' for multiplication operation and
'divide' for division operation.

Returns
-------
CanonicalFactor or None:
if inplace=True (default) returns None
if inplace=False returns a new CanonicalFactor instance.

Example
-------
>>> import numpy as np
>>> from pgmpy.factors.continuous import CanonicalFactor
>>> phi1 = CanonicalFactor(['x1', 'x2', 'x3'],
np.array([[1, -1, 0], [-1, 4, -2], [0, -2, 4]]),
np.array([[1], [4], [-1]]), -2)
>>> phi2 = CanonicalFactor(['x1', 'x2'], np.array([[3, -2], [-2, 4]]),
np.array([[5], [-1]]), 1)

>>> phi3 = phi1 * phi2
>>> phi3.K
array([[ 4., -3.,  0.],
[-3.,  8., -2.],
[ 0., -2.,  4.]])
>>> phi3.h
array([ 6.,  3., -1.])
>>> phi3.g
-1

>>> phi4 = phi1 / phi2
>>> phi4.K
array([[-2.,  1.,  0.],
[ 1.,  0., -2.],
[ 0., -2.,  4.]])
>>> phi4.h
array([-4.,  5., -1.])
>>> phi4.g
-3

"""
if not isinstance(other, CanonicalFactor):
raise TypeError("CanonicalFactor object can only be multiplied or divided with "
"an another CanonicalFactor object. Got {other_type}, expected "
"CanonicalFactor.".format(other_type=type(other)))

phi = self if inplace else self.copy()

all_vars = self.variables + [var for var in other.variables if var not in self.variables]
no_of_var = len(all_vars)

self_var_index = [all_vars.index(var) for var in self.variables]
other_var_index = [all_vars.index(var) for var in other.variables]

def _extend_K_scope(K, index):
ext_K = np.zeros([no_of_var, no_of_var])
ext_K[np.ix_(index, index)] = K
return ext_K

def _extend_h_scope(h, index):
ext_h = np.zeros(no_of_var).reshape(no_of_var, 1)
ext_h[index] = h
return ext_h

phi.variables = all_vars

if operation == 'product':
phi.K = _extend_K_scope(self.K, self_var_index) + _extend_K_scope(other.K, other_var_index)
phi.h = _extend_h_scope(self.h, self_var_index) + _extend_h_scope(other.h, other_var_index)
phi.g = self.g + other.g

else:
phi.K = _extend_K_scope(self.K, self_var_index) - _extend_K_scope(other.K, other_var_index)
phi.h = _extend_h_scope(self.h, self_var_index) - _extend_h_scope(other.h, other_var_index)
phi.g = self.g - other.g

if not inplace:
return phi
```