import collections
import re
from copy import copy
from itertools import product
from string import Template
import numpy as np
import pyparsing as pp
from joblib import Parallel, delayed
from pyparsing import (
CharsNotIn,
Group,
OneOrMore,
Optional,
Suppress,
Word,
ZeroOrMore,
alphanums,
cppStyleComment,
nums,
printables,
)
from pgmpy.factors.discrete import TabularCPD
from pgmpy.models import BayesianNetwork
from pgmpy.utils import compat_fns
class BIFReader(object):
    """
    Parse a Bayesian network written in the BIF interchange format.

    The whole network is parsed eagerly in the constructor; the results are
    stored on the instance (``network_name``, ``variable_names``,
    ``variable_states``, ``variable_parents``, ``variable_cpds``,
    ``variable_edges`` and, optionally, ``variable_properties``).

    Parameters
    ----------
    path : file or str
        File of bif data
    string : str
        String of bif data
    include_properties: boolean
        If True, gets the properties tag from the file and stores in graph
        properties.
    n_jobs: int (default: -1)
        Number of jobs to run in parallel. `-1` means use all processors.

    Examples
    --------
    # dog-problem.bif file is present at
    # http://www.cs.cmu.edu/~javabayes/Examples/DogProblem/dog-problem.bif
    >>> from pgmpy.readwrite import BIFReader
    >>> reader = BIFReader("bif_test.bif")
    >>> reader
    <pgmpy.readwrite.BIF.BIFReader object at 0x7f2375621cf8>
    >>> model = reader.get_model()

    Reference
    ---------
    [1] Geoff Hulten and Pedro Domingos. The interchange format for bayesian networks.
    http://www.cs.washington.edu/dm/vfml/appendixes/bif.htm, 2003.
    """

    # The block keywords must stand alone: a plain substring search would also
    # fire inside identifiers such as ``variable_a`` or ``my-probability`` and
    # produce bogus extra blocks.
    _VARIABLE_KEYWORD = re.compile(r"(?<![\w-])variable(?![\w-])")
    _PROBABILITY_KEYWORD = re.compile(r"(?<![\w-])probability(?![\w-])")

    def __init__(self, path=None, string=None, include_properties=False, n_jobs=-1):
        """
        Load the BIF text from `path` or `string` and parse it.

        Raises
        ------
        ValueError
            If neither `path` nor `string` is given (or both are falsy).
        """
        if path:
            with open(path, "r") as network:
                self.network = network.read()
        elif string:
            self.network = string
        else:
            raise ValueError("Must specify either path or string")

        self.n_jobs = n_jobs
        self.include_properties = include_properties

        if '"' in self.network:
            # Replacing quotes by spaces to remove case sensitivity like:
            # "Dog-Problem" and Dog-problem
            # or "true""false" and "true" "false" and true false
            self.network = self.network.replace('"', " ")

        if "/*" in self.network or "//" in self.network:
            # removing comments from the file
            self.network = cppStyleComment.suppress().transformString(self.network)

        (
            self.name_expr,
            self.state_expr,
            self.property_expr,
        ) = self.get_variable_grammar()
        self.probability_expr, self.cpd_expr = self.get_probability_grammar()

        # Order matters: the CPD values need the states, the edges need the
        # parents.
        self.network_name = self.get_network_name()
        self.variable_names = self.get_variables()
        self.variable_states = self.get_states()
        if self.include_properties:
            self.variable_properties = self.get_property()
        self.variable_parents = self.get_parents()
        self.variable_cpds = self.get_values()
        self.variable_edges = self.get_edges()

    def get_variable_grammar(self):
        """
        A method that returns variable grammar

        Returns
        -------
        tuple: (name_expr, variable_state_expr, property_expr) pyparsing
            expressions for the variable name, its list of states and its
            ``property`` entries.
        """
        # Defining an expression for valid word
        word_expr = Word(pp.unicode.alphanums + "_" + "-")
        # State names may use any printable character except the delimiters.
        word_expr2 = Word(initChars=printables, excludeChars=["{", "}", ",", " "])
        name_expr = Suppress("variable") + word_expr + Suppress("{")
        state_expr = ZeroOrMore(word_expr2 + Optional(Suppress(",")))

        # Defining a variable state expression.
        # variable states is of the form
        # type description [args] { val1, val2 }; (comma may or may not be present)
        variable_state_expr = (
            Suppress("type")
            + Suppress(word_expr)
            + Suppress("[")
            + Suppress(Word(nums))
            + Suppress("]")
            + Suppress("{")
            + Group(state_expr)
            + Suppress("}")
            + Suppress(";")
        )

        # Creating an expr to find property: everything between the keyword
        # and the terminating semicolon.
        property_expr = Suppress("property") + CharsNotIn(";") + Suppress(";")

        return name_expr, variable_state_expr, property_expr

    def get_probability_grammar(self):
        """
        A method that returns probability grammar

        Returns
        -------
        tuple: (probability_expr, cpd_expr) pyparsing expressions for the
            ``probability ( ... )`` header and for one line of values.
        """
        # Creating valid word expression for probability, it is of the format
        # var1 | var2 , var3 or var1 var2 var3 or simply var
        word_expr = (
            Word(pp.unicode.alphanums + "-" + "_")
            + Suppress(Optional("|"))
            + Suppress(Optional(","))
        )
        # A parent state inside "( state1, state2 )".
        word_expr2 = Word(
            initChars=printables, excludeChars=[",", ")", " ", "("]
        ) + Suppress(Optional(","))

        # creating an expression for valid numbers, of the format
        # 1.00 or 1 or 1.00. 0.00 or 9.8e-5 etc
        num_expr = Word(nums + "-" + "+" + "e" + "E" + ".") + Suppress(Optional(","))

        probability_expr = (
            Suppress("probability")
            + Suppress("(")
            + OneOrMore(word_expr)
            + Suppress(")")
        )

        # A value line starts with "( parent states )", "table" or "default".
        optional_expr = Suppress("(") + OneOrMore(word_expr2) + Suppress(")")
        probab_attributes = optional_expr | Suppress("table") | Suppress("default")
        cpd_expr = probab_attributes + OneOrMore(num_expr)

        return probability_expr, cpd_expr

    def _keyword_blocks(self, keyword_re):
        """
        Yield the text from each standalone keyword match up to the next "}\\n".

        If no "}\\n" follows a match, the block runs to the end of the text
        (``str.find`` returns -1 there, which must not be used as a slice end).
        """
        text = self.network
        for match in keyword_re.finditer(text):
            begin = match.start()
            end = text.find("}\n", begin)
            if end == -1:
                end = len(text)
            yield text[begin:end]

    def variable_block(self):
        """
        Yield the text of every ``variable`` block of the network.
        """
        return self._keyword_blocks(self._VARIABLE_KEYWORD)

    def probability_block(self):
        """
        Yield the text of every ``probability`` block of the network.
        """
        return self._keyword_blocks(self._PROBABILITY_KEYWORD)

    def get_network_name(self):
        """
        Returns the name of the network

        Example
        ---------------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_network_name()
        'Dog-Problem'
        """
        start = self.network.find("network")
        end = self.network.find("}\n", start)
        # Creating a network attribute
        network_attribute = (
            Suppress("network") + Word(pp.unicode.alphanums + "_" + "-") + "{"
        )
        network_name = network_attribute.searchString(self.network[start:end])[0][0]
        return network_name

    def get_variables(self):
        """
        Returns list of variables of the network

        Example
        -------------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_variables()
        ['light-on','bowel_problem','dog-out','hear-bark','family-out']
        """
        return [
            self.name_expr.searchString(block)[0][0]
            for block in self.variable_block()
        ]

    def get_states(self):
        """
        Returns the states of variables present in the network

        Example
        -----------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_states()
        {'bowel-problem': ['true','false'],
        'dog-out': ['true','false'],
        'family-out': ['true','false'],
        'hear-bark': ['true','false'],
        'light-on': ['true','false']}
        """
        variable_states = {}
        for block in self.variable_block():
            name = self.name_expr.searchString(block)[0][0]
            variable_states[name] = list(self.state_expr.searchString(block)[0][0])
        return variable_states

    def get_property(self):
        """
        Returns the property of the variable

        Example
        -------------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_property()
        {'bowel-problem': ['position = (335, 99)'],
        'dog-out': ['position = (300, 195)'],
        'family-out': ['position = (257, 99)'],
        'hear-bark': ['position = (296, 268)'],
        'light-on': ['position = (218, 195)']}
        """
        variable_properties = {}
        for block in self.variable_block():
            name = self.name_expr.searchString(block)[0][0]
            matches = self.property_expr.searchString(block)
            # Each match holds the raw text between "property" and ";".
            variable_properties[name] = [
                token.strip() for match in matches for token in match
            ]
        return variable_properties

    def get_parents(self):
        """
        Returns the parents of the variables present in the network

        Example
        --------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_parents()
        {'bowel-problem': [],
        'dog-out': ['family-out', 'bowel-problem'],
        'family-out': [],
        'hear-bark': ['dog-out'],
        'light-on': ['family-out']}
        """
        variable_parents = {}
        for block in self.probability_block():
            # Only the first line of the block is searched for the header.
            header = block.split("\n")[0]
            names = self.probability_expr.searchString(header)[0]
            # First name is the variable itself, the rest are its parents.
            variable_parents[names[0]] = names[1:]
        return variable_parents

    def _get_values_from_block(self, block):
        """
        Parse one ``probability`` block into ``(variable name, 2-D array)``.

        The array has one row per state of the variable. For ``table`` /
        ``default`` blocks the flat list of numbers is reshaped; otherwise
        each "( parent states ) values" line fills the column whose position
        is given by the cartesian product of the parents' states.
        """
        names = self.probability_expr.searchString(block)
        var_name, parents = names[0][0], names[0][1:]
        cpds = self.cpd_expr.searchString(block)
        n_states = len(self.variable_states[var_name])

        # Check if the block is a table.
        if re.search(".*\n[ ]*(table|default) .*\n.*", block):
            arr = np.array([float(j) for i in cpds for j in i])
            arr = arr.reshape((n_states, arr.size // n_states))
        else:
            arr_length = np.prod([len(self.variable_states[var]) for var in parents])
            arr = np.zeros((n_states, arr_length))

            # Leading tokens of each line are the parent states, the rest are
            # the probabilities of the variable's states.
            values_dict = {}
            for prob_line in cpds:
                states = prob_line[: len(parents)]
                vals = [float(i) for i in prob_line[len(parents) :]]
                values_dict[tuple(states)] = vals

            parent_state_combinations = product(
                *[self.variable_states[var] for var in parents]
            )
            for index, combination in enumerate(parent_state_combinations):
                arr[:, index] = values_dict[combination]

        return var_name, arr

    def get_values(self):
        """
        Returns the CPD of the variables present in the network

        The probability blocks are parsed through joblib with
        ``n_jobs=self.n_jobs``.

        Example
        --------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_values()
        {'bowel-problem': np.array([[0.01],
                                    [0.99]]),
        'dog-out': np.array([[0.99, 0.97, 0.9, 0.3],
                             [0.01, 0.03, 0.1, 0.7]]),
        'family-out': np.array([[0.15],
                                [0.85]]),
        'hear-bark': np.array([[0.7, 0.01],
                               [0.3, 0.99]]),
        'light-on': np.array([[0.6, 0.05],
                              [0.4, 0.95]])}
        """
        cpd_values = Parallel(n_jobs=self.n_jobs)(
            delayed(self._get_values_from_block)(block)
            for block in self.probability_block()
        )
        return {var_name: arr for var_name, arr in cpd_values}

    def get_edges(self):
        """
        Returns the edges of the network

        Each edge is a ``[parent, child]`` pair built from
        ``self.variable_parents``.

        Example
        --------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_edges()
        [['family-out', 'light-on'],
        ['family-out', 'dog-out'],
        ['bowel-problem', 'dog-out'],
        ['dog-out', 'hear-bark']]
        """
        return [
            [parent, child]
            for child, parents in self.variable_parents.items()
            for parent in parents
        ]

    def get_model(self, state_name_type=str):
        """
        Returns the Bayesian Model read from the file/str.

        Parameters
        ----------
        state_name_type: int, str or bool (default: str)
            The data type to which to convert the state names of the variables.
            It is applied to every state string as read from the file.
            NOTE: `bool` applied to any non-empty string is True.

        Raises
        ------
        AttributeError
            If an attribute is missing while the model is built; the original
            error is attached as the cause.
        ValueError
            If `include_properties` is set and a property has no "=".

        Example
        ----------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_model()
        <pgmpy.models.BayesianNetwork.BayesianNetwork object at 0x7f20af154320>
        """
        try:
            model = BayesianNetwork()
            model.add_nodes_from(self.variable_names)
            model.add_edges_from(self.variable_edges)
            model.name = self.network_name

            tabular_cpds = []
            for var in sorted(self.variable_cpds.keys()):
                values = self.variable_cpds[var]
                parents = self.variable_parents[var]
                sn = {
                    p_var: list(map(state_name_type, self.variable_states[p_var]))
                    for p_var in parents
                }
                sn[var] = list(map(state_name_type, self.variable_states[var]))
                cpd = TabularCPD(
                    var,
                    len(self.variable_states[var]),
                    values,
                    evidence=parents,
                    evidence_card=[
                        len(self.variable_states[evidence_var])
                        for evidence_var in parents
                    ],
                    state_names=sn,
                )
                tabular_cpds.append(cpd)

            model.add_cpds(*tabular_cpds)

            if self.include_properties:
                for node, properties in self.variable_properties.items():
                    for prop in properties:
                        # Split on the first "=" only, so a value that itself
                        # contains "=" is kept whole.
                        prop_name, prop_value = (
                            part.strip() for part in prop.split("=", 1)
                        )
                        model.nodes[node][prop_name] = prop_value

            return model

        except AttributeError as err:
            raise AttributeError(
                "First get states of variables, edges, parents and network name"
            ) from err
class BIFWriter(object):
    """
    Serialise a BayesianNetwork to the BIF format.

    The states, properties, parents and CPD tables are extracted from the
    model once, in the constructor; ``str(writer)`` renders them.

    Parameters
    ----------
    model: BayesianNetwork Instance
    round_values: int (default: None)
        Round the probability values to `round_values` decimals. If None, keeps
        all decimal points.

    Examples
    ---------
    >>> from pgmpy.readwrite import BIFWriter
    >>> from pgmpy.utils import get_example_model
    >>> asia = get_example_model('asia')
    >>> writer = BIFWriter(asia)
    >>> writer
    <writer_BIF.BIFWriter at 0x7f05e5ea27b8>
    >>> writer.write_bif('asia.bif')
    """

    def __init__(self, model, round_values=None):
        """
        Raises
        ------
        TypeError
            If `model` is not a BayesianNetwork instance.
        """
        if not isinstance(model, BayesianNetwork):
            raise TypeError("model must be an instance of BayesianNetwork")
        self.model = model
        self.round_values = round_values
        # A model without a (truthy) name is written as "unknown".
        self.network_name = self.model.name if self.model.name else "unknown"
        self.variable_states = self.get_states()
        self.property_tag = self.get_properties()
        self.variable_parents = self.get_parents()
        self.tables = self.get_cpds()

    def BIF_templates(self):
        """
        Create template for writing in BIF format

        Returns
        -------
        tuple: six ``string.Template`` objects, in order: network, variable,
            property, unconditional probability, conditional probability
            (whole block) and conditional probability (single line).
        """
        network_template = Template("network $name {\n}\n")
        # property tag may or may not be present in model, and since no of
        # properties can be more than one, will replace them according to
        # format otherwise null
        variable_template = Template(
            "variable $name {\n"
            "type discrete [ $no_of_states ] { $states };\n"
            "$properties}\n"
        )
        property_template = Template(" property $prop ;\n")
        # $variable_ here is name of variable, used underscore for clarity
        probability_template = Template(
            "probability ( $variable_$separator_$parents ) {\n"
            "table $values ;\n"
            "}\n"
        )
        conditional_probability_template_total = Template(
            "probability ( $variable_$separator_$parents ) {\n"
            "$values\n"
            "}\n"
        )
        conditional_probability_template = Template(" ( $state ) $values;\n")

        return (
            network_template,
            variable_template,
            property_template,
            probability_template,
            conditional_probability_template_total,
            conditional_probability_template,
        )

    def __str__(self):
        """
        Returns the BIF format as string

        The output is the network header, then one ``variable`` block per
        node, then one ``probability`` block per node; nodes are visited in
        sorted order.
        """
        (
            network_template,
            variable_template,
            property_template,
            probability_template,
            conditional_probability_template_total,
            conditional_probability_template,
        ) = self.BIF_templates()

        variables = sorted(self.model.nodes())
        parts = [network_template.substitute(name=self.network_name)]

        for var in variables:
            states = self.variable_states[var]
            # Empty when the node carries no properties.
            properties = "".join(
                property_template.substitute(prop=prop_val)
                for prop_val in self.property_tag[var]
            )
            parts.append(
                variable_template.substitute(
                    name=var,
                    no_of_states=str(len(states)),
                    states=", ".join(states),
                    properties=properties,
                )
            )

        for var in variables:
            parents = self.variable_parents[var]

            if not parents:
                # No parents: a single "table" line with the flat CPD.
                parts.append(
                    probability_template.substitute(
                        variable_=var,
                        separator_="",
                        parents="",
                        values=", ".join(map(str, self.tables[var])),
                    )
                )
                continue

            cpd = self.model.get_cpds(var)
            cpd_values_transpose = cpd.get_values().T
            # One output line per combination of parent states; row `index`
            # of the transposed values belongs to the index-th combination.
            parent_states = product(
                *[cpd.state_names[parent] for parent in cpd.variables[1:]]
            )
            all_cpd = "".join(
                conditional_probability_template.substitute(
                    state=", ".join(map(str, state)),
                    values=", ".join(
                        map(
                            str,
                            compat_fns.to_numpy(
                                cpd_values_transpose[index, :],
                                decimals=self.round_values,
                            ),
                        )
                    ),
                )
                for index, state in enumerate(parent_states)
            )
            parts.append(
                conditional_probability_template_total.substitute(
                    variable_=var,
                    separator_=" | ",
                    # str() so non-string node names do not break the join.
                    parents=", ".join(map(str, parents)),
                    values=all_cpd,
                )
            )

        return "".join(parts)

    def get_variables(self):
        """
        Add variables to BIF

        Returns
        -------
        The nodes of the model, as returned by ``self.model.nodes()``.

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader('dog-problem.bif').get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_variables()
        ['bowel-problem', 'family-out', 'hear-bark', 'light-on', 'dog-out']
        """
        return self.model.nodes()

    def get_states(self):
        """
        Add states to variable of BIF

        Returns
        -------
        dict: dict of type {variable: a list of states}; every state name of
            the variable's CPD is converted with ``str``.

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader('dog-problem.bif').get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_states()
        {'bowel-problem': ['true', 'false'],
        'dog-out': ['true', 'false'],
        'family-out': ['true', 'false'],
        'hear-bark': ['true', 'false'],
        'light-on': ['true', 'false']}
        """
        return {
            cpd.variable: [str(state) for state in cpd.state_names[cpd.variable]]
            for cpd in self.model.get_cpds()
        }

    def get_properties(self):
        """
        Add property to variables in BIF

        Returns
        -------
        dict: dict of type {variable: list of properties }; each property is
            rendered as "<name> = <value>", sorted by name.

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader('dog-problem.bif').get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_properties()
        {'bowel-problem': ['position = (335, 99)'],
        'dog-out': ['position = (300, 195)'],
        'family-out': ['position = (257, 99)'],
        'hear-bark': ['position = (296, 268)'],
        'light-on': ['position = (218, 195)']}
        """
        property_tag = {}
        for variable in sorted(self.model.nodes()):
            node_attributes = self.model.nodes[variable]
            property_tag[variable] = [
                str(prop) + " = " + str(val)
                for prop, val in sorted(node_attributes.items())
            ]
        return property_tag

    def get_parents(self):
        """
        Add the parents to BIF

        Returns
        -------
        dict: dict of type {variable: a list of parents}; the parents are
            every entry of the CPD's ``variables`` after the first.

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader('dog-problem.bif').get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_parents()
        {'bowel-problem': [],
        'dog-out': ['bowel-problem', 'family-out'],
        'family-out': [],
        'hear-bark': ['dog-out'],
        'light-on': ['family-out']}
        """
        return {cpd.variable: cpd.variables[1:] for cpd in self.model.get_cpds()}

    def get_cpds(self):
        """
        Adds tables to BIF

        Returns
        -------
        dict: dict of type {variable: array}; each array is the flattened CPD
            passed through ``compat_fns.to_numpy`` with
            ``decimals=self.round_values``.

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader('dog-problem.bif').get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_cpds()
        {'bowel-problem': array([ 0.01, 0.99]),
        'dog-out': array([ 0.99, 0.97, 0.9 , 0.3 , 0.01, 0.03, 0.1 , 0.7 ]),
        'family-out': array([ 0.15, 0.85]),
        'hear-bark': array([ 0.7 , 0.01, 0.3 , 0.99]),
        'light-on': array([ 0.6 , 0.05, 0.4 , 0.95])}
        """
        return {
            cpd.variable: compat_fns.to_numpy(
                cpd.values.ravel(), decimals=self.round_values
            )
            for cpd in self.model.get_cpds()
        }

    def write_bif(self, filename):
        """
        Writes the BIF data into a file

        Parameters
        ----------
        filename : Name of the file

        Example
        -------
        >>> from pgmpy.utils import get_example_model
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> asia = get_example_model('asia')
        >>> writer = BIFWriter(asia)
        >>> writer.write_bif(filename='asia.bif')
        """
        with open(filename, "w") as fout:
            fout.write(str(self))