import collections
import warnings
from math import prod
from string import Template
import numpy as np
from pgmpy import logger
try:
from pyparsing import (
CharsNotIn,
Group,
OneOrMore,
Optional,
Suppress,
Word,
ZeroOrMore,
alphanums,
alphas,
cppStyleComment,
nums,
printables,
)
except ImportError as e:
raise ImportError(
f"{e}. pyparsing is required for using read/write methods. Please install using: pip install pyparsing."
) from None
from pgmpy.factors.discrete.CPD import TabularCPD
from pgmpy.models import DiscreteBayesianNetwork
from pgmpy.utils import compat_fns
[docs]
class NETWriter:
"""
Base class for writing network file in net format
Parameters
----------
model: DiscreteBayesianNetwork Instance
Examples
----------
>>> from pgmpy.readwrite import NETWriter
>>> from pgmpy.example_models import load_model
>>> asia = load_model("bnlearn/asia")
>>> writer = NETWriter(asia)
>>> writer
<pgmpy.readwrite.NET.NETWriter at 0x7feac652c2b0>
>>> writer.write("asia.net")
Reference
---------
[1] HUGIN EXPERT A/S . The HUGIN file format. http://www.hugin.com, 2011.
"""
def __init__(self, model):
if not isinstance(model, DiscreteBayesianNetwork):
raise TypeError("model must be an instance of DiscreteBayesianNetwork")
self.model = model
if not self.model.name:
self.network_name = "unknown"
else:
self.network_name = self.model.name
self.variables = self.get_variables()
self.variable_states = self.get_states()
self.property_tag = self.get_properties()
self.variable_parents = self.get_parents()
self.tables = self.get_cpds()
[docs]
def NET_templates(self):
"""
Create template for writing in NET format
"""
network_template = Template("net {\n}\n")
node_template = Template("node $name{\n states = ($states);\n$properties}\n")
potential_template = Template("potential ($variable_$separator_$parents){\n data = $values;\n}\n")
property_template = Template(" $prop;\n")
return (network_template, node_template, potential_template, property_template)
def __str__(self):
"""Return the NET"""
(
network_template,
node_template,
potential_template,
property_template,
) = self.NET_templates()
network = ""
network += network_template.substitute()
variables = self.variables
for var in sorted(variables):
quoted_states = ['"' + state + '"' for state in self.variable_states[var]]
states = " ".join(quoted_states)
if not self.property_tag[var]:
properties = ""
else:
properties = ""
for prop_val in self.property_tag[var]:
properties += property_template.substitute(prop=prop_val)
network += node_template.substitute(name=var, states=states, properties=properties)
for var in sorted(variables):
if not self.variable_parents[var]:
parents = ""
separator = " |"
else:
parents = " ".join(self.variable_parents[var])
separator = " | "
potentials = self.net_cpd(var)
network += potential_template.substitute(
variable_=var,
separator_=separator,
parents=parents,
values=potentials,
)
return network
[docs]
def net_cpd(self, var_name):
"""
Util function for turning pgmpy CPT values into CPT format of .net files
Inputs
-------
var_name: string, name of the variable
Returns
-------
string: CPT format of .net files
"""
cpt = self.tables[var_name]
cpt_array = np.moveaxis(compat_fns.to_numpy(cpt, decimals=8), 0, -1)
# avoid truncated output when serializing to str
cpt_string = np.array2string(cpt_array, threshold=np.inf, max_line_width=np.inf)
net_cpt_string = cpt_string.replace("[", "(").replace("]", ")").replace(". ", ".0 ").replace(".)", ".0)")
# Genie does not read potentials such as 1. therefore last line adds .0 to those
return net_cpt_string
[docs]
def get_variables(self):
"""
Add variables to NET
Returns
-------
list: a list containing names of variable
Example
-------
>>> from pgmpy.example_models import load_model
>>> from pgmpy.readwrite import NETWriter
>>> asia = load_model("bnlearn/asia")
>>> writer = NETWriter(asia)
>>> writer.get_variables()
['asia', 'tub', 'smoke', 'lung', 'bronc', 'either', 'xray', 'dysp']
"""
variables = list(self.model.nodes())
return variables
[docs]
def get_cpds(self):
"""
Adds tables to NET
Returns
-------
dict: dict of type {variable: array}
Example
-------
>>> from pgmpy.example_models import load_model
>>> from pgmpy.readwrite import NETWriter
>>> asia = load_model("bnlearn/asia")
>>> writer = NETWriter(asia)
>>> writer.get_cpds()
{'asia': array([0.01, 0.99]),
'bronc': array([[0.6, 0.3],
[0.4, 0.7]]),
'dysp': array([[[0.9, 0.8],
[0.7, 0.1]],
[[0.1, 0.2],
[0.3, 0.9]]]),
'either': array([[[1., 1.],
[1., 0.]],
[[0., 0.],
[0., 1.]]]),
'lung': array([[0.1 , 0.01],
[0.9 , 0.99]]),
'smoke': array([0.5, 0.5]),
'tub': array([[0.05, 0.01],
[0.95, 0.99]]),
'xray': array([[0.98, 0.05],
[0.02, 0.95]])}
"""
cpds = self.model.get_cpds()
tables = {}
for cpd in cpds:
tables[cpd.variable] = cpd.values
return tables
[docs]
def get_properties(self):
"""
Add property to variables in NET
Returns
-------
dict: dict of type {variable: list of properties }
Example
-------
>>> from pgmpy.example_models import load_model
>>> from pgmpy.readwrite import NETWriter
>>> asia = load_model("bnlearn/asia")
>>> writer = NETWriter(asia)
>>> writer.get_properties()
"""
variables = self.model.nodes()
property_tag = {}
for variable in sorted(variables):
properties = self.model.nodes[variable]
properties = collections.OrderedDict(sorted(properties.items()))
property_tag[variable] = []
for prop, val in properties.items():
property_tag[variable].append(str(prop) + " = " + str(val))
return property_tag
[docs]
def get_states(self):
"""
Add states to variable of NET
Returns
-------
dict: dict of type {variable: a list of states}
Example
-------
>>> from pgmpy.example_models import load_model
>>> from pgmpy.readwrite import NETWriter
>>> asia = load_model("bnlearn/asia")
>>> writer = NETWriter(asia)
>>> writer.get_states()
{'asia': ['yes', 'no'],
'bronc': ['yes', 'no'],
'dysp': ['yes', 'no'],
'either': ['yes', 'no'],
'lung': ['yes', 'no'],
'smoke': ['yes', 'no'],
'tub': ['yes', 'no'],
'xray': ['yes', 'no']}
"""
variable_states = {}
cpds = self.model.get_cpds()
for cpd in cpds:
variable = cpd.variable
variable_states[variable] = []
for state in cpd.state_names[variable]:
state_str = str(state)
if "," in state_str:
logger.warning(
f"State name '{state_str}' for variable '{variable}' contains commas. "
"This may cause issues when loading the file. Consider removing any special characters."
)
variable_states[variable].append(state_str)
return variable_states
[docs]
def get_parents(self):
"""
Add the parents to NET
Returns
-------
dict: dict of type {variable: a list of parents}
Example
-------
>>> from pgmpy.example_models import load_model
>>> from pgmpy.readwrite import NETWriter
>>> asia = load_model("bnlearn/asia")
>>> writer = NETWriter(asia)
>>> writer.get_parents()
{'asia': [],
'bronc': ['smoke'],
'dysp': ['bronc', 'either'],
'either': ['lung', 'tub'],
'lung': ['smoke'],
'smoke': [],
'tub': ['asia'],
'xray': ['either']}
"""
cpds = self.model.get_cpds()
variable_parents = {}
for cpd in cpds:
variable_parents[cpd.variable] = cpd.variables[1:]
return variable_parents
[docs]
def write(self, filename):
"""
Writes the NET data into a file
Parameters
----------
filename : Name of the file
Example
-------
>>> from pgmpy.example_models import load_model
>>> from pgmpy.readwrite import NETWriter
>>> asia = load_model("bnlearn/asia")
>>> writer = NETWriter(asia)
>>> writer.write(filename="asia.net")
"""
writer = self.__str__()
with open(filename, "w") as fout:
fout.write(writer)
[docs]
def write_net(self, filename):
warnings.warn(
"`NETWriter.write_net` is deprecated. Please use `NETWriter.write` instead.", FutureWarning, stacklevel=2
)
self.write(filename)
[docs]
class NETReader:
"""
Initializes a NETReader object.
Parameters
----------
path : file or str
File of net data
string : str
String of net data
include_properties: boolean
If True, gets the properties tag from the file and stores in graph properties.
defaultname: int (default: "bn_model")
Default name for the network if a network name is not available in the net file.
Examples
--------
# asia.net file is present at
# https://www.bnlearn.com/bnrepository/discrete-small.html#asia
>>> from pgmpy.readwrite import NETReader
>>> reader = NETReader("asia.net")
>>> reader
<pgmpy.readwrite.NET.NETReader at 0x7feac645c640>
>>> model = reader.get_model()
"""
def __init__(self, path=None, string=None, include_properties=False, defaultName="bn_model"):
if path:
with open(path) as network:
self.network = network.read()
elif string:
self.network = string
else:
raise ValueError("Must specify either path or string")
self.include_properties = include_properties
if "/*" in self.network or "//" in self.network:
self.network = cppStyleComment.suppress().transform_string(self.network) # removing comments from the file
(
self.name_expr,
self.state_expr,
self.property_expr,
) = self.get_variable_grammar()
self.potential_expr, self.cpd_expr = self.get_probability_grammar()
if not self.get_network_name():
self.network_name = defaultName
else:
self.network_name = self.get_network_name()
self.variable_names = self.get_variables()
self.variable_states = self.get_states()
if self.include_properties:
self.variable_properties = self.get_property()
self.variable_parents = self.get_parents()
self.variable_cpds = self.get_values()
self.edges = self.get_edges()
[docs]
def get_variable_grammar(self):
"""
A method that returns variable grammar
"""
# Defining an expression for valid word
word_expr = Word(alphanums + "_" + "-")("nodename")
name_expr = Suppress("node ") + word_expr + Optional(Suppress("{"))
word_expr2 = Word(init_chars=printables, exclude_chars=["(", ")", ",", " "])
state_expr = ZeroOrMore(word_expr2 + Optional(Suppress(",")))
# Defining a variable state expression
variable_state_expr = (
Suppress("states")
+ Suppress("=")
+ Suppress("(")
+ Group(state_expr)("statenames")
+ Suppress(")")
+ Suppress(";")
)
# variable states is of the form type description [args] { val1, val2 }; (comma may or may not be present)
pexpr = Word(alphas.lower()) + Suppress("=") + CharsNotIn(";") + Suppress(";")
property_expr = ZeroOrMore(pexpr) # Creating an expr to find property
variable_property_expr = (
Suppress("node ")
+ Word(alphanums + "_" + "-")("varname")
+ Suppress("{")
+ Group(property_expr)("properties")
+ Suppress("}")
)
return name_expr, variable_state_expr, variable_property_expr
[docs]
def get_probability_grammar(self):
"""
A method that returns probability grammar
"""
word_expr = Word(alphanums + "-" + "_") + Suppress(Optional("|"))
potential_expr = Suppress("potential") + Suppress("(") + OneOrMore(word_expr) + Suppress(")")
num_expr = Suppress(ZeroOrMore("(")) + Word(nums + "-" + "+" + "e" + "E" + ".") + Suppress(ZeroOrMore(")"))
cpd_expr = Suppress("data") + Suppress("=") + OneOrMore(num_expr)
return potential_expr, cpd_expr
[docs]
def get_network_name(self):
"""
Returns the name of the network. Returns false if no network name is available
Example
---------------
# asia.net file is present at
# https://www.bnlearn.com/bnrepository/discrete-small.html#asia
>>> from pgmpy.readwrite import NETReader
>>> reader = NETReader("asia.net")
>>> reader.get_network_name()
False
"""
start = self.network.find("net")
end = self.network.find("}\n", start)
# Creating a network attribute
network_attribute = (
Suppress("name")
+ Suppress("=")
+ Suppress('"')
+ Word(alphanums + "_" + "-")
+ Suppress('"')
+ Suppress(";")
)
network_name = network_attribute.search_string(self.network[start:end])
if not network_name:
return False
return network_name[0][0]
[docs]
def get_variables(self):
"""
Returns list of variables of the network
Example
---------------
# asia.net file is present at
# https://www.bnlearn.com/bnrepository/discrete-small.html#asia
>>> from pgmpy.readwrite import NETReader
>>> reader = NETReader("asia.net")
>>> reader.get_variables()
['asia', 'tub', 'smoke', 'lung', 'bronc', 'either', 'xray', 'dysp']
"""
variable_names = []
for match in self.name_expr.scan_string(self.network):
result = match[0]
name = result.nodename
variable_names.append(name)
return variable_names
[docs]
def get_states(self):
"""
Returns the states of each variable in the network
Example
---------------
# asia.net file is present at
# https://www.bnlearn.com/bnrepository/discrete-small.html#asia
>>> from pgmpy.readwrite import NETReader
>>> reader = NETReader("asia.net")
>>> reader.get_states()
{'asia': ['yes', 'no'],
'tub': ['yes', 'no'],
'smoke': ['yes', 'no'],
'lung': ['yes', 'no'],
'bronc': ['yes', 'no'],
'either': ['yes', 'no'],
'xray': ['yes', 'no'],
'dysp': ['yes', 'no']}
"""
variable_states = {}
for index, match in enumerate(self.name_expr.scan_string(self.network)):
result = match[0]
name = result.nodename
allstates = list(self.state_expr.scan_string(self.network))
states_unedited = list(
allstates[index][0].statenames
) # includes double quotation like ['"state1"', '"state2"']
states_edited = [state.replace('"', "") for state in states_unedited]
variable_states[name] = states_edited
return variable_states
[docs]
def get_property(self):
"""
Returns the property of the variable
Example
-------------
# asia.net file is present at
# https://www.bnlearn.com/bnrepository/discrete-small.html#asia
>>> from pgmpy.readwrite import NETReader
>>> reader = NETReader("asia.net")
>>> reader.get_property()
{'asia': {},
'tub': {},
'smoke': {},
'lung': {},
'bronc': {},
'either': {},
'xray': {},
'dysp': {}}
"""
variable_properties = {}
for match in self.property_expr.scan_string(self.network):
var_name = match[0].varname
prop_list = match[0].properties
num_props = len(prop_list)
props = {}
for index in range(0, num_props, 2):
props[prop_list[index].strip()] = prop_list[index + 1].strip()
# Remove states from props
props.pop("states", None)
variable_properties[var_name] = props
return variable_properties
[docs]
def get_parents(self):
"""
Returns the parents of the variables present in the network
Example
-------------
# asia.net file is present at
# https://www.bnlearn.com/bnrepository/discrete-small.html#asia
>>> from pgmpy.readwrite import NETReader
>>> reader = NETReader("asia.net")
>>> reader.get_parents()
{'asia': [],
'tub': ['asia'],
'smoke': [],
'lung': ['smoke'],
'bronc': ['smoke'],
'either': ['lung', 'tub'],
'xray': ['either'],
'dysp': ['bronc', 'either']}
"""
variable_parents = {}
for match in self.potential_expr.scan_string(self.network):
vars_in_potential = match[0]
variable_parents[vars_in_potential[0]] = vars_in_potential[1:]
return variable_parents
[docs]
def get_values(self):
"""
Returns the CPD of the variables present in the network
Example
-------------
# asia.net file is present at
# https://www.bnlearn.com/bnrepository/discrete-small.html#asia
>>> from pgmpy.readwrite import NETReader
>>> reader = NETReader("asia.net")
>>> reader.get_values()
{'asia': array([[0.01],
[0.99]]),
'tub': array([[0.05, 0.01],
[0.95, 0.99]]),
'smoke': array([[0.5],
[0.5]]),
'lung': array([[0.1 , 0.01],
[0.9 , 0.99]]),
'bronc': array([[0.6, 0.3],
[0.4, 0.7]]),
'either': array([[1., 1., 1., 0.],
[0., 0., 0., 1.]]),
'xray': array([[0.98, 0.05],
[0.02, 0.95]]),
'dysp': array([[0.9, 0.8, 0.7, 0.1],
[0.1, 0.2, 0.3, 0.9]])}
"""
variable_cpds = {}
parents = self.variable_parents
variables = list(parents.keys())
states = self.variable_states
cpds = self.cpd_expr.scan_string(self.network)
for index, match in enumerate(cpds):
var = variables[index]
pars = parents[var]
var_state_num = len(states[var])
par_states_prod = prod([len(states[par]) for par in pars])
cpd_flat = np.array(match[0], dtype="float64")
cpd_2d = cpd_flat.reshape(par_states_prod, var_state_num).T
variable_cpds[var] = cpd_2d
return variable_cpds
[docs]
def get_edges(self):
"""
Returns the edges of the network
Example
-------------
# asia.net file is present at
# https://www.bnlearn.com/bnrepository/discrete-small.html#asia
>>> from pgmpy.readwrite import NETReader
>>> reader = NETReader("asia.net")
>>> reader.get_edges()
[['asia', 'tub'],
['smoke', 'lung'],
['smoke', 'bronc'],
['lung', 'either'],
['tub', 'either'],
['either', 'xray'],
['bronc', 'dysp'],
['either', 'dysp']]
"""
edges = [[value, key] for key in self.variable_parents.keys() for value in self.variable_parents[key]]
return edges
[docs]
def get_model(self, state_name_type=str):
"""
Returns the Bayesian Model read from the file/str.
Parameters
----------
state_name_type: int, str or bool (default: str)
The data type to which to convert the state names of the variables.
Example
----------
# asia.net file is present at
# https://www.bnlearn.com/bnrepository/discrete-small.html#asia
>>> from pgmpy.readwrite import NETReader
>>> reader = NETReader("asia.net")
>>> reader.get_model()
<pgmpy.models.DiscreteBayesianNetwork.DiscreteBayesianNetwork at 0x7febc059b430>
"""
try:
model = DiscreteBayesianNetwork()
model.add_nodes_from(self.variable_names)
model.add_edges_from(self.edges)
model.name = self.network_name
tabular_cpds = []
for var in sorted(self.variable_cpds.keys()):
values = self.variable_cpds[var]
states = self.variable_states[var]
states_num = len(states)
parents = self.variable_parents[var]
parent_states_num = [len(self.variable_states[par]) for par in parents]
state_names = {
par_var: list(map(state_name_type, self.variable_states[par_var])) for par_var in parents
}
state_names[var] = list(map(state_name_type, states))
cpd = TabularCPD(
var,
states_num,
values,
evidence=parents,
evidence_card=parent_states_num,
state_names=state_names,
)
tabular_cpds.append(cpd)
model.add_cpds(*tabular_cpds)
if self.include_properties:
for node, properties in self.variable_properties.items():
for prop_name, prop_value in properties.items():
model.nodes[node][prop_name] = prop_value
return model
except AttributeError:
raise AttributeError("First get states of variables, edges, parents and network name")