#!/usr/bin/env python3
"""Contains the different formats of CPDs used in PGM"""
import csv
import numbers
from itertools import chain, product
from shutil import get_terminal_size
from warnings import warn
import numpy as np
import torch
from pgmpy import config
from pgmpy.extern import tabulate
from pgmpy.factors.discrete import DiscreteFactor
from pgmpy.utils import compat_fns
[docs]
class TabularCPD(DiscreteFactor):
"""
Defines the conditional probability distribution table (CPD table)
Parameters
----------
variable: int, string (any hashable python object)
The variable whose CPD is defined.
variable_card: integer
Cardinality/no. of states of `variable`
values: 2D array, 2D list or 2D tuple
Values for the CPD table. Please refer the example for the
exact format needed.
evidence: array-like
List of variables in evidences(if any) w.r.t. which CPD is defined.
evidence_card: array-like
cardinality/no. of states of variables in `evidence`(if any)
Examples
--------
For a distribution of P(grade|diff, intel)
+---------+-------------------------+------------------------+
|diff: | easy | hard |
+---------+------+--------+---------+------+--------+--------+
|aptitude:| low | medium | high | low | medium | high |
+---------+------+--------+---------+------+--------+--------+
|gradeA | 0.1 | 0.1 | 0.1 | 0.1 | 0.1 | 0.1 |
+---------+------+--------+---------+------+--------+--------+
|gradeB | 0.1 | 0.1 | 0.1 | 0.1 | 0.1 | 0.1 |
+---------+------+--------+---------+------+--------+--------+
|gradeC | 0.8 | 0.8 | 0.8 | 0.8 | 0.8 | 0.8 |
+---------+------+--------+---------+------+--------+--------+
values should be
[[0.1,0.1,0.1,0.1,0.1,0.1],
[0.1,0.1,0.1,0.1,0.1,0.1],
[0.8,0.8,0.8,0.8,0.8,0.8]]
>>> cpd = TabularCPD('grade',3,[[0.1,0.1,0.1,0.1,0.1,0.1],
... [0.1,0.1,0.1,0.1,0.1,0.1],
... [0.8,0.8,0.8,0.8,0.8,0.8]],
... evidence=['diff', 'intel'], evidence_card=[2,3])
>>> print(cpd)
+---------+---------+---------+---------+---------+---------+---------+
| diff | diff_0 | diff_0 | diff_0 | diff_1 | diff_1 | diff_1 |
+---------+---------+---------+---------+---------+---------+---------+
| intel | intel_0 | intel_1 | intel_2 | intel_0 | intel_1 | intel_2 |
+---------+---------+---------+---------+---------+---------+---------+
| grade_0 | 0.1 | 0.1 | 0.1 | 0.1 | 0.1 | 0.1 |
+---------+---------+---------+---------+---------+---------+---------+
| grade_1 | 0.1 | 0.1 | 0.1 | 0.1 | 0.1 | 0.1 |
+---------+---------+---------+---------+---------+---------+---------+
| grade_2 | 0.8 | 0.8 | 0.8 | 0.8 | 0.8 | 0.8 |
+---------+---------+---------+---------+---------+---------+---------+
>>> cpd.values
array([[[ 0.1, 0.1, 0.1],
[ 0.1, 0.1, 0.1]],
[[ 0.1, 0.1, 0.1],
[ 0.1, 0.1, 0.1]],
[[ 0.8, 0.8, 0.8],
[ 0.8, 0.8, 0.8]]])
>>> cpd.variables
['grade', 'diff', 'intel']
>>> cpd.cardinality
array([3, 2, 3])
>>> cpd.variable
'grade'
>>> cpd.variable_card
3
"""
def __init__(
self,
variable,
variable_card,
values,
evidence=None,
evidence_card=None,
state_names={},
):
self.variable = variable
self.variable_card = None
variables = [variable]
if not isinstance(variable_card, numbers.Integral):
raise TypeError("Event cardinality must be an integer")
self.variable_card = variable_card
cardinality = [variable_card]
if evidence_card is not None:
if isinstance(evidence_card, numbers.Real):
raise TypeError("Evidence card must be a list of numbers")
cardinality.extend(evidence_card)
if evidence is not None:
if isinstance(evidence, str):
raise TypeError("Evidence must be list, tuple or array of strings.")
variables.extend(evidence)
if not len(evidence_card) == len(evidence):
raise ValueError(
"Length of evidence_card doesn't match length of evidence"
)
if config.BACKEND == "numpy":
values = np.array(values, dtype=config.get_dtype())
else:
values = (
torch.Tensor(values).type(config.get_dtype()).to(config.get_device())
)
if values.ndim != 2:
raise TypeError("Values must be a 2D list/array")
if evidence is None:
expected_cpd_shape = (variable_card, 1)
else:
expected_cpd_shape = (variable_card, np.prod(evidence_card))
if values.shape != expected_cpd_shape:
raise ValueError(
f"values must be of shape {expected_cpd_shape}. Got shape: {values.shape}"
)
if not isinstance(state_names, dict):
raise ValueError(
f"state_names must be of type dict. Got {type(state_names)}"
)
super(TabularCPD, self).__init__(
variables, cardinality, values.flatten(), state_names=state_names
)
def __repr__(self):
var_str = f"<TabularCPD representing P({self.variable}:{self.variable_card}"
evidence = self.variables[1:]
evidence_card = self.cardinality[1:]
if evidence:
evidence_str = " | " + ", ".join(
[f"{var}:{card}" for var, card in zip(evidence, evidence_card)]
)
else:
evidence_str = ""
return var_str + evidence_str + f") at {hex(id(self))}>"
[docs]
def get_values(self):
"""
Returns the values of the CPD as a 2-D array. The order of the
parents is the same as provided in evidence.
Examples
--------
>>> from pgmpy.factors.discrete import TabularCPD
>>> cpd = TabularCPD('grade', 3, [[0.1, 0.1],
... [0.1, 0.1],
... [0.8, 0.8]],
... evidence='evi1', evidence_card=2)
>>> cpd.get_values()
array([[ 0.1, 0.1],
[ 0.1, 0.1],
[ 0.8, 0.8]])
"""
if self.variable in self.variables:
return self.values.reshape(
tuple([self.cardinality[0], np.prod(self.cardinality[1:])])
)
else:
return self.values.reshape(tuple([np.prod(self.cardinality), 1]))
def __str__(self):
return self._make_table_str(tablefmt="grid")
def _str(self, phi_or_p="p", tablefmt="fancy_grid"):
return super(self, TabularCPD)._str(phi_or_p, tablefmt)
def _make_table_str(
self, tablefmt="fancy_grid", print_state_names=True, return_list=False
):
headers_list = []
# Build column headers
evidence = self.variables[1:]
evidence_card = self.cardinality[1:]
if evidence:
col_indexes = np.array(list(product(*[range(i) for i in evidence_card])))
if self.state_names and print_state_names:
for i in range(len(evidence_card)):
column_header = [str(evidence[i])] + [
"{var}({state})".format(
var=evidence[i], state=self.state_names[evidence[i]][d]
)
for d in col_indexes.T[i]
]
headers_list.append(column_header)
else:
for i in range(len(evidence_card)):
column_header = [str(evidence[i])] + [
f"{evidence[i]}_{d}" for d in col_indexes.T[i]
]
headers_list.append(column_header)
# Build row headers
if self.state_names and print_state_names:
variable_array = [
[
"{var}({state})".format(
var=self.variable, state=self.state_names[self.variable][i]
)
for i in range(self.variable_card)
]
]
else:
variable_array = [
[f"{self.variable}_{i}" for i in range(self.variable_card)]
]
# Stack with data
labeled_rows = np.hstack(
(np.array(variable_array).T, compat_fns.to_numpy(self.get_values()))
).tolist()
if return_list:
return headers_list + labeled_rows
# No support for multi-headers in tabulate
cdf_str = tabulate(headers_list + labeled_rows, tablefmt=tablefmt)
cdf_str = self._truncate_strtable(cdf_str)
return cdf_str
def _truncate_strtable(self, cdf_str):
terminal_width, terminal_height = get_terminal_size()
list_rows_str = cdf_str.split("\n")
table_width, table_height = len(list_rows_str[0]), len(list_rows_str)
colstr_i = np.array(
[pos for pos, char in enumerate(list_rows_str[0]) if char == "+"]
)
if table_width > terminal_width:
half_width = terminal_width // 2 - 3
left_i = colstr_i[colstr_i < half_width][-1]
right_i = colstr_i[(table_width - colstr_i) < half_width][0]
new_cdf_str = []
for temp_row_str in list_rows_str:
left = temp_row_str[: left_i + 1]
right = temp_row_str[right_i:]
if temp_row_str[left_i] == "+":
joiner = "-----"
else:
joiner = " ... "
new_cdf_str.append(left + joiner + right)
cdf_str = "\n".join(new_cdf_str)
# TODO: vertical limiter
# if table_height > terminal_height:
# half_height = terminal_height // 2
return cdf_str
[docs]
def to_csv(self, filename):
"""
Exports the CPD to a CSV file.
Examples
--------
>>> from pgmpy.utils import get_example_model
>>> model = get_example_model("alarm")
>>> cpd = model.get_cpds("SAO2")
>>> cpd.to_csv(filename="sao2.cs")
"""
with open(filename, "w") as f:
writer = csv.writer(f)
writer.writerows(self._make_table_str(tablefmt="grid", return_list=True))
[docs]
def copy(self):
"""
Returns a copy of the `TabularCPD` object.
Examples
--------
>>> from pgmpy.factors.discrete import TabularCPD
>>> cpd = TabularCPD('grade', 2,
... [[0.7, 0.6, 0.6, 0.2],[0.3, 0.4, 0.4, 0.8]],
... ['intel', 'diff'], [2, 2])
>>> copy = cpd.copy()
>>> copy.variable
'grade'
>>> copy.variable_card
2
>>> copy.evidence
['intel', 'diff']
>>> copy.values
array([[[ 0.7, 0.6],
[ 0.6, 0.2]],
[[ 0.3, 0.4],
[ 0.4, 0.8]]])
"""
evidence = self.variables[1:] if len(self.variables) > 1 else None
evidence_card = self.cardinality[1:] if len(self.variables) > 1 else None
return TabularCPD(
self.variable,
self.variable_card,
compat_fns.copy(self.get_values()),
evidence,
evidence_card,
state_names=self.state_names.copy(),
)
[docs]
def normalize(self, inplace=True):
"""
Normalizes the cpd table. The method modifies each column of values such
that it sums to 1 without changing the proportion between states.
Parameters
----------
inplace: boolean
If inplace=True it will modify the CPD itself, else would return
a new CPD
Examples
--------
>>> from pgmpy.factors.discrete import TabularCPD
>>> cpd_table = TabularCPD('grade', 2,
... [[0.7, 0.2, 0.6, 0.2],[0.4, 0.4, 0.4, 0.8]],
... ['intel', 'diff'], [2, 2])
>>> cpd_table.normalize()
>>> cpd_table.get_values()
array([[ 0.63636364, 0.33333333, 0.6 , 0.2 ],
[ 0.36363636, 0.66666667, 0.4 , 0.8 ]])
"""
tabular_cpd = self if inplace else self.copy()
cpd = tabular_cpd.get_values()
tabular_cpd.values = (cpd / cpd.sum(axis=0)).reshape(
tuple(tabular_cpd.cardinality)
)
if not inplace:
return tabular_cpd
[docs]
def marginalize(self, variables, inplace=True):
"""
Modifies the CPD table with marginalized values. Marginalization refers to
summing out variables, hence that variable would no longer appear in the
CPD.
Parameters
----------
variables: list, array-like
list of variable to be marginalized
inplace: boolean
If inplace=True it will modify the CPD itself, else would return
a new CPD
Examples
--------
>>> from pgmpy.factors.discrete import TabularCPD
>>> cpd_table = TabularCPD('grade', 2,
... [[0.7, 0.6, 0.6, 0.2],[0.3, 0.4, 0.4, 0.8]],
... ['intel', 'diff'], [2, 2])
>>> cpd_table.marginalize(['diff'])
>>> cpd_table.get_values()
array([[ 0.65, 0.4 ],
[ 0.35, 0.6 ]])
"""
if self.variable in variables:
raise ValueError(
"Marginalization not allowed on the variable on which CPD is defined"
)
tabular_cpd = self if inplace else self.copy()
super(TabularCPD, tabular_cpd).marginalize(variables)
tabular_cpd.normalize()
if not inplace:
return tabular_cpd
[docs]
def reduce(self, values, inplace=True, show_warnings=True):
"""
Reduces the cpd table to the context of given variable values. Reduce fixes the
state of given variable to specified value. The reduced variables will no longer
appear in the CPD.
Parameters
----------
values: list, array-like
A list of tuples of the form (variable_name, variable_state).
inplace: boolean
If inplace=True it will modify the factor itself, else would return
a new factor.
Examples
--------
>>> from pgmpy.factors.discrete import TabularCPD
>>> cpd_table = TabularCPD('grade', 2,
... [[0.7, 0.6, 0.6, 0.2],[0.3, 0.4, 0.4, 0.8]],
... ['intel', 'diff'], [2, 2])
>>> cpd_table.reduce([('diff', 0)])
>>> cpd_table.get_values()
array([[ 0.7, 0.6],
[ 0.3, 0.4]])
"""
if self.variable in (value[0] for value in values):
raise ValueError(
"Reduce not allowed on the variable on which CPD is defined"
)
tabular_cpd = self if inplace else self.copy()
super(TabularCPD, tabular_cpd).reduce(values, show_warnings=show_warnings)
tabular_cpd.normalize()
if not inplace:
return tabular_cpd
[docs]
def to_factor(self):
"""
Returns an equivalent factor with the same variables, cardinality, values as that of the CPD.
Since factor doesn't distinguish between conditional and non-conditional distributions,
evidence information will be lost.
Examples
--------
>>> from pgmpy.factors.discrete import TabularCPD
>>> cpd = TabularCPD('grade', 3, [[0.1, 0.1],
... [0.1, 0.1],
... [0.8, 0.8]],
... evidence='evi1', evidence_card=2)
>>> factor = cpd.to_factor()
>>> factor
<DiscreteFactor representing phi(grade:3, evi1:2) at 0x7f847a4f2d68>
"""
factor = DiscreteFactor.__new__(DiscreteFactor)
factor.variables = self.variables.copy()
factor.cardinality = self.cardinality.copy()
factor.values = compat_fns.copy(self.values)
factor.state_names = self.state_names.copy()
factor.name_to_no = self.name_to_no.copy()
factor.no_to_name = self.no_to_name.copy()
return factor
[docs]
def reorder_parents(self, new_order, inplace=True):
"""
Returns a new cpd table according to provided parent/evidence order.
Parameters
----------
new_order: list
list of new ordering of variables
inplace: boolean
If inplace == True it will modify the CPD itself
otherwise new value will be returned without affecting old values
Examples
--------
Consider a CPD P(grade| diff, intel)
>>> cpd = TabularCPD('grade',3,[[0.1,0.1,0.0,0.4,0.2,0.1],
... [0.3,0.2,0.1,0.4,0.3,0.2],
... [0.6,0.7,0.9,0.2,0.5,0.7]],
... evidence=['diff', 'intel'], evidence_card=[2,3])
>>> print(cpd)
+----------+----------+----------+----------+----------+----------+----------+
| diff | diff(0) | diff(0) | diff(0) | diff(1) | diff(1) | diff(1) |
+----------+----------+----------+----------+----------+----------+----------+
| intel | intel(0) | intel(1) | intel(2) | intel(0) | intel(1) | intel(2) |
+----------+----------+----------+----------+----------+----------+----------+
| grade(0) | 0.1 | 0.1 | 0.0 | 0.4 | 0.2 | 0.1 |
+----------+----------+----------+----------+----------+----------+----------+
| grade(1) | 0.3 | 0.2 | 0.1 | 0.4 | 0.3 | 0.2 |
+----------+----------+----------+----------+----------+----------+----------+
| grade(2) | 0.6 | 0.7 | 0.9 | 0.2 | 0.5 | 0.7 |
+----------+----------+----------+----------+----------+----------+----------+
>>> cpd.values
array([[[ 0.1, 0.1, 0. ],
[ 0.4, 0.2, 0.1]],
[[ 0.3, 0.2, 0.1],
[ 0.4, 0.3, 0.2]],
[[ 0.6, 0.7, 0.9],
[ 0.2, 0.5, 0.7]]])
>>> cpd.variables
['grade', 'diff', 'intel']
>>> cpd.cardinality
array([3, 2, 3])
>>> cpd.variable
'grade'
>>> cpd.variable_card
3
>>> cpd.reorder_parents(['intel', 'diff'])
array([[0.1, 0.4, 0.1, 0.2, 0. , 0.1],
[0.3, 0.4, 0.2, 0.3, 0.1, 0.2],
[0.6, 0.2, 0.7, 0.5, 0.9, 0.7]])
>>> print(cpd)
+----------+----------+----------+----------+----------+----------+----------+
| intel | intel(0) | intel(0) | intel(1) | intel(1) | intel(2) | intel(2) |
+----------+----------+----------+----------+----------+----------+----------+
| diff | diff(0) | diff(1) | diff(0) | diff(1) | diff(0) | diff(1) |
+----------+----------+----------+----------+----------+----------+----------+
| grade(0) | 0.1 | 0.4 | 0.1 | 0.2 | 0.0 | 0.1 |
+----------+----------+----------+----------+----------+----------+----------+
| grade(1) | 0.3 | 0.4 | 0.2 | 0.3 | 0.1 | 0.2 |
+----------+----------+----------+----------+----------+----------+----------+
| grade(2) | 0.6 | 0.2 | 0.7 | 0.5 | 0.9 | 0.7 |
+----------+----------+----------+----------+----------+----------+----------+
>>> cpd.values
array([[[0.1, 0.4],
[0.1, 0.2],
[0. , 0.1]],
[[0.3, 0.4],
[0.2, 0.3],
[0.1, 0.2]],
[[0.6, 0.2],
[0.7, 0.5],
[0.9, 0.7]]])
>>> cpd.variables
['grade', 'intel', 'diff']
>>> cpd.cardinality
array([3, 3, 2])
>>> cpd.variable
'grade'
>>> cpd.variable_card
3
"""
if (
len(self.variables) <= 1
or (set(new_order) - set(self.variables))
or (set(self.variables[1:]) - set(new_order))
):
raise ValueError("New order either has missing or extra arguments")
else:
if new_order != self.variables[1:]:
evidence = self.variables[1:]
evidence_card = self.cardinality[1:]
card_map = dict(zip(evidence, evidence_card))
old_pos_map = dict(zip(evidence, range(len(evidence))))
trans_ord = [0] + [(old_pos_map[letter] + 1) for letter in new_order]
new_values = compat_fns.transpose(self.values, tuple(trans_ord))
if inplace:
variables = [self.variables[0]] + new_order
cardinality = [self.variable_card] + [
card_map[var] for var in new_order
]
super(TabularCPD, self).__init__(
variables, cardinality, new_values.flatten()
)
return self.get_values()
else:
return new_values.reshape(
tuple(
[
self.cardinality[0],
np.prod([card_map[var] for var in new_order]),
]
)
)
else:
warn("Same ordering provided as current")
return self.get_values()
[docs]
def get_evidence(self):
"""
Returns the evidence variables of the CPD.
"""
return self.variables[:0:-1]
[docs]
@staticmethod
def get_random(variable, evidence=None, cardinality=None, state_names={}, seed=42):
"""
Generates a TabularCPD instance with random values on `variable` with
parents/evidence `evidence` with cardinality/number of states as given
in `cardinality`.
Parameters
----------
variable: str, int or any hashable python object.
The variable on which to define the TabularCPD.
evidence: list, array-like
A list of variable names which are the parents/evidence of `variable`.
cardinality: dict (default: None)
A dict of the form {var_name: card} specifying the number of states/
cardinality of each of the variables. If None, assigns each variable
2 states.
state_names: dict (default: {})
A dict of the form {var_name: list of states} to specify the state names
for the variables in the CPD. If state_names=None, integral state names
starting from 0 is assigned.
Returns
-------
Random CPD: pgmpy.factors.discrete.TabularCPD
A TabularCPD object on `variable` with `evidence` as evidence with random values.
Examples
--------
>>> from pgmpy.factors.discrete import TabularCPD
>>> TabularCPD(variable='A', evidence=['C', 'D'],
... cardinality={'A': 3, 'B': 2, 'C': 4})
<TabularCPD representing P(A:3 | C:4, B:2) at 0x7f95e22b8040>
>>> TabularCPD(variable='A', evidence=['C', 'D'],
... cardinality={'A': 2, 'B': 2, 'C': 2},
... state_names={'A': ['a1', 'a2'],
... 'B': ['b1', 'b2'],
... 'C': ['c1', 'c2']})
"""
generator = np.random.default_rng(seed=seed)
if evidence is None:
evidence = []
if cardinality is None:
cardinality = {var: 2 for var in chain([variable], evidence)}
else:
for var in chain([variable], evidence):
if var not in cardinality.keys():
raise ValueError(f"Cardinality for variable: {var} not specified.")
if len(evidence) == 0:
values = generator.random((cardinality[variable], 1))
values = values / np.sum(values, axis=0)
node_cpd = TabularCPD(
variable=variable,
variable_card=cardinality[variable],
values=values,
state_names=state_names,
)
else:
parent_card = [cardinality[var] for var in evidence]
values = generator.random((cardinality[variable], np.prod(parent_card)))
values = values / np.sum(values, axis=0)
node_cpd = TabularCPD(
variable=variable,
variable_card=cardinality[variable],
values=values,
evidence=evidence,
evidence_card=parent_card,
state_names=state_names,
)
return node_cpd