import random
import xml.dom.minidom as md
import xml.etree.ElementTree as etree
from itertools import chain
import networkx as nx
from pgmpy.factors.discrete import TabularCPD
from pgmpy.global_vars import logger
from pgmpy.models import DiscreteBayesianNetwork
from pgmpy.utils import compat_fns
class XDSLReader(object):
    """
    Initializes the reader object for XDSL file formats[1] created through GeNIe[2].

    Note that XDSLReader only supports cpt blocks from the XDSL file format; elements like
    'deterministic' need to be appropriately converted into 'cpt' elements before usage.

    Parameters
    ----------
    path : file or str
        Path to the XDSL file.

    string : str
        A string containing the XDSL file content. Only used when `path` is
        not given.

    Raises
    ------
    ValueError
        If neither `path` nor `string` is given, or if a node id contains a
        space character.

    Examples
    --------
    >>> # AsiaDiagnosis.xdsl is an example file downloadable from
    >>> # https://repo.bayesfusion.com/bayesbox.html
    >>> # The file has been modified slightly to adhere to XDSLReader requirements
    >>> from pgmpy.readwrite import XDSLReader
    >>> reader = XDSLReader("AsiaDiagnosis.xdsl")
    >>> model = reader.get_model()

    Reference
    ---------
    [1] https://support.bayesfusion.com/docs/GeNIe/saving_xdslfileformat.html
    [2] https://www.bayesfusion.com/genie/
    """

    def __init__(self, path=None, string=None):
        if path:
            self.network = etree.ElementTree(file=path).getroot()
        elif string:
            self.network = etree.fromstring(string)
        else:
            raise ValueError("Must specify either path or string")

        self.network_name = self.network.attrib["id"]
        # Only <cpt> children of <nodes> are read; other node kinds are ignored.
        self.cpt_elements = self.network.find("nodes").findall("cpt")

        # Order matters: get_edges() reads self.variable_parents.
        self.variables = self.get_variables()
        self.variable_parents = self.get_parents()
        self.edge_list = self.get_edges()
        self.variable_states = self.get_states()
        self.variable_CPD = self.get_values()

    def get_variables(self):
        """
        Returns list of variables of the network

        Raises
        ------
        ValueError
            If any node id contains a space character.

        Examples
        --------
        >>> reader = XDSLReader("AsiaDiagnosis.xdsl")
        >>> reader.get_variables()
        ['asia', 'tub', 'smoke', 'lung', 'either', 'xray', 'bronc', 'dysp']
        """
        variables = [variable.attrib["id"] for variable in self.cpt_elements]
        for var in variables:
            # <parents> is a space-separated list of ids, so an id containing
            # a space could not be told apart from two ids.
            if isinstance(var, str) and (" " in var):
                raise ValueError(
                    f"XDSLReader does not support models with node names that contain whitespaces. Failed to process node: {var}"
                )
        return variables

    def get_parents(self):
        """
        Returns the parents of the variables present in the network

        Returns
        -------
        dict: dict of type {variable: list of parent variables}. Variables
            without a <parents> element (or with an empty one) map to [].

        Examples
        --------
        >>> reader = XDSLReader("AsiaDiagnosis.xdsl")
        >>> reader.get_parents()
        {'asia': [],
         'tub': ['asia'],
         'smoke': [],
         'lung': ['smoke'],
         'either': ['tub', 'lung'],
         'xray': ['either'],
         'bronc': ['smoke'],
         'dysp': ['either', 'bronc']
        }
        """
        variable_parents = {}
        for node in self.cpt_elements:
            parents = node.find("parents")
            text = parents.text if parents is not None else None
            # split() without an argument tolerates newlines, tabs and
            # repeated spaces (e.g. pretty-printed files); `or ""` covers an
            # empty <parents/> element whose text is None.
            variable_parents[node.attrib["id"]] = (text or "").split()
        return variable_parents

    def get_edges(self):
        """
        Returns the edges of the network

        Returns
        -------
        list: list of [parent, child] pairs built from `self.variable_parents`.

        Examples
        --------
        >>> reader = XDSLReader("AsiaDiagnosis.xdsl")
        >>> reader.get_edges()
        [['asia', 'tub'],
         ['smoke', 'lung'],
         ['tub', 'either'],
         ['lung', 'either'],
         ['either', 'xray'],
         ['smoke', 'bronc'],
         ['either', 'dysp'],
         ['bronc', 'dysp']]
        """
        return [
            [parent, child]
            for child, parents in self.variable_parents.items()
            for parent in parents
        ]

    def get_states(self):
        """
        Returns the states of variables present in the network

        Returns
        -------
        dict: dict of type {variable: list of state ids}, in file order.

        Examples
        --------
        >>> reader = XDSLReader("AsiaDiagnosis.xdsl")
        >>> reader.get_states()
        {'asia': ['no', 'yes'],
         'tub': ['no', 'yes'],
         'smoke': ['no', 'yes'],
         'lung': ['no', 'yes'],
         'either': ['Nothing', 'CancerORTuberculosis'],
         'xray': ['Normal', 'Abnormal'],
         'bronc': ['Absent', 'Present'],
         'dysp': ['Absent', 'Present']
        }
        """
        return {
            cpt.attrib["id"]: [state.attrib["id"] for state in cpt.findall("state")]
            for cpt in self.cpt_elements
        }

    def get_values(self):
        """
        Returns the CPD of the variables present in the network

        Returns
        -------
        dict: dict of type {variable: 2-D list}, with one row per state of the
            variable and one column per group of `num_states` consecutive
            values in the <probabilities> text.

        Examples
        --------
        >>> reader = XDSLReader("AsiaDiagnosis.xdsl")
        >>> reader.get_values()
        {'asia': [[0.99], [0.01]],
         'tub': [[0.99, 0.95], [0.01, 0.05]],
         'smoke': [[0.5], [0.5]],
         'lung': [[0.99, 0.9], [0.01, 0.1]],
         'either': [[1.0, 1.0, 1.0, 0.0], [0.0, 0.0, 0.0, 1.0]],
         'xray': [[0.95, 0.02], [0.05, 0.98]],
         'bronc': [[0.7, 0.4], [0.3, 0.6]],
         'dysp': [[0.9, 0.2, 0.3, 0.1], [0.1, 0.8, 0.7, 0.9]]
        }
        """
        variable_CPD = {}
        for cpt in self.cpt_elements:
            num_states = len(cpt.findall("state"))
            # split() without an argument avoids empty tokens (and hence
            # float('') errors) when values are separated by newlines or
            # several spaces.
            prob_values = cpt.find("probabilities").text.split()
            # The flat list holds the variable's states contiguously, so
            # state j owns every num_states-th value starting at offset j.
            variable_CPD[cpt.attrib["id"]] = [
                [float(prob) for prob in prob_values[state_idx::num_states]]
                for state_idx in range(num_states)
            ]
        return variable_CPD

    def get_model(self, state_name_type=str):
        """
        Returns a Bayesian Network instance from the file/string.

        Parameters
        ----------
        state_name_type: int, str, or bool (default: str)
            The data type to which to convert the state names of the variables.

        Returns
        -------
        DiscreteBayesianNetwork instance: The read model.

        Examples
        --------
        >>> from pgmpy.readwrite import XDSLReader
        >>> reader = XDSLReader("AsiaDiagnosis.xdsl")
        >>> model = reader.get_model()
        """
        model = DiscreteBayesianNetwork()
        model.add_nodes_from(self.variables)
        model.add_edges_from(self.edge_list)
        model.name = self.network_name

        tabular_cpds = []
        for var, values in self.variable_CPD.items():
            parents = self.variable_parents[var]
            evidence_card = [len(self.variable_states[parent]) for parent in parents]
            # State names are supplied for the variable itself and for each
            # of its parents, converted with `state_name_type`.
            state_names = {
                name: list(map(state_name_type, self.variable_states[name]))
                for name in chain([var], parents)
            }
            cpd = TabularCPD(
                var,
                len(self.variable_states[var]),
                values,
                evidence=parents,
                evidence_card=evidence_card,
                state_names=state_names,
            )
            tabular_cpds.append(cpd)

        model.add_cpds(*tabular_cpds)
        return model
class XDSLWriter(object):
    """
    Initialise a XDSL writer object to export pgmpy models to XDSL file format[1] used by GeNIe[2].

    Parameters
    ----------
    model: pgmpy.models.DiscreteBayesianNetwork instance.
        The model to write to the file.

    network_id: str (default: "MyNetwork")
        Name/id of the network

    num_samples: int or str (default: "0")
        Number of samples used for continuous variables. Written to the
        `numsamples` attribute of the root element.

    disc_samples: int or str (default: "0")
        Number of samples used for discrete variables. Written to the
        `discsamples` attribute of the root element.

    encoding: str (optional, default='utf-8')
        Encoding for text data

    Raises
    ------
    TypeError
        If `model` is not a DiscreteBayesianNetwork.

    Examples
    ---------
    >>> from pgmpy.readwrite import XDSLWriter
    >>> from pgmpy.utils import get_example_model
    >>> asia = get_example_model('asia')
    >>> writer = XDSLWriter(asia)
    >>> writer.write_xdsl('asia.xdsl')

    Reference
    ---------
    [1] https://support.bayesfusion.com/docs/GeNIe/saving_xdslfileformat.html
    [2] https://www.bayesfusion.com/genie/
    """

    def __init__(
        self,
        model,
        network_id="MyNetwork",
        num_samples="0",
        disc_samples="0",
        encoding="utf-8",
    ):
        if not isinstance(model, DiscreteBayesianNetwork):
            raise TypeError("model must an instance of DiscreteBayesianNetwork")
        self.model = model

        self.encoding = encoding
        self.network_id = network_id
        self.root = etree.Element(
            "smile",
            {
                "version": "1.0",
                "id": network_id,
                # ElementTree can only serialise string attribute values, so
                # integer sample counts are converted here.
                "numsamples": str(num_samples),
                "discsamples": str(disc_samples),
            },
        )
        # Order matters: get_cpds() fills the <cpt> elements created by
        # get_variables().
        self.variables = self.get_variables()
        self.cpds = self.get_cpds()
        self._create_extensions()

    def get_variables(self):
        """
        Add variables and their XML elements/representation to XDSL

        The <nodes> element is created on the first call only; later calls
        return the mapping built then, so the document is not given a second
        <nodes> element.

        Return
        ------
        dict: dict of type {variable: variable tags}

        Examples
        --------
        >>> writer = XDSLWriter(model)
        >>> writer.get_variables()
        {'asia': <Element 'cpt' at 0x000001DC6BFA1350>,
         'tub': <Element 'cpt' at 0x000001DC6BFA35B0>,
         'smoke': <Element 'cpt' at 0x000001DC6BFA3560>,
         'lung': <Element 'cpt' at 0x000001DC6BFA12B0>,
         'bronc': <Element 'cpt' at 0x000001DC6BFA1260>,
         'either': <Element 'cpt' at 0x000001DC6BFA3510>,
         'xray': <Element 'cpt' at 0x000001DC6BFA34C0>,
         'dysp': <Element 'cpt' at 0x000001DC6BFA1210>}
        """
        existing = getattr(self, "variables", None)
        if existing is not None:
            return existing

        variable_tag = {}
        nodes_elem = etree.SubElement(self.root, "nodes")
        for var in self.model.nodes:
            if isinstance(var, str) and " " in var:
                logger.warning(
                    f" Node '{var}' contains whitespaces. This could cause issues, especially when using pgmpy.readwrite.XDSLReader"
                )
            variable_tag[var] = etree.SubElement(nodes_elem, "cpt", {"id": var})
        return variable_tag

    def get_cpds(self):
        """
        Add the complete CPT element (with states and probabilities) to XDSL.

        The <cpt> elements are filled on the first call only; later calls
        return the mapping built then, so states and probabilities are not
        appended twice.

        Return
        ---------------
        dict: dict of type {variable: TabularCPD of that variable}

        Raises
        ------
        ValueError
            If a node of the model has no CPD.

        Examples
        -------
        >>> writer = XDSLWriter(model)
        >>> writer.get_cpds()
        {'asia': <TabularCPD representing P(asia:2) at 0x1885817c830>,
         'tub': <TabularCPD representing P(tub:2 | asia:2) at 0x1885a7e57c0>,
         'smoke': <TabularCPD representing P(smoke:2) at 0x18858327950>,
         'lung': <TabularCPD representing P(lung:2 | smoke:2) at 0x188583278f0>,
         'bronc': <TabularCPD representing P(bronc:2 | smoke:2) at 0x18855e05610>,
         'either': <TabularCPD representing P(either:2 | lung:2, tub:2) at 0x188582792e0>,
         'xray': <TabularCPD representing P(xray:2 | either:2) at 0x1885a7e5910>,
         'dysp': <TabularCPD representing P(dysp:2 | bronc:2, either:2) at 0x18858278b90>}
        """
        existing = getattr(self, "cpds", None)
        if existing is not None:
            return existing

        # One-pass index instead of a list.index() scan per node; setdefault
        # keeps the first CPD seen for a variable, as list.index() did.
        cpd_by_var = {}
        for cpd in self.model.get_cpds():
            cpd_by_var.setdefault(cpd.variable, cpd)

        outcome_tag = {}
        for var in self.model.nodes:
            if var not in cpd_by_var:
                raise ValueError(f"No CPD is defined for node '{var}'")
            cpd = cpd_by_var[var]
            cpt_elem = self.variables[var]

            for state in cpd.state_names[cpd.variable]:
                etree.SubElement(cpt_elem, "state", {"id": str(state)})

            # cpd.variables[0] is treated as the variable itself; the
            # remaining entries are written as its parents.
            parents = cpd.variables[1:]
            if parents:
                parents_elem = etree.SubElement(cpt_elem, "parents")
                parents_elem.text = " ".join(parents)

            # Add the <probabilities> element.
            probs_elem = etree.SubElement(cpt_elem, "probabilities")
            # Flatten in column-major order so that for each parent
            # configuration the probabilities for all states are listed.
            flat_values = compat_fns.ravel_f(cpd.get_values())
            probs_elem.text = " ".join("{:.16f}".format(float(x)) for x in flat_values)

            outcome_tag[var] = cpd
        return outcome_tag

    def _create_extensions(self):
        """
        Create the <extensions> block with a minimal <genie> element for layout information.

        Every node gets fixed appearance settings and a random position, so
        the generated layout differs between runs.
        """
        extensions_elem = etree.SubElement(self.root, "extensions")
        genie_elem = etree.SubElement(
            extensions_elem,
            "genie",
            {
                "version": "1.0",
                "app": "GeNIe 5.0.4830.0 ACADEMIC",
                "name": self.network_id,
            },
        )

        for node in nx.topological_sort(self.model):
            node_elem = etree.SubElement(genie_elem, "node", {"id": node})
            name_elem = etree.SubElement(node_elem, "name")
            name_elem.text = node

            # Appearance details (colors, font).
            etree.SubElement(node_elem, "interior", {"color": "e5f6f7"})
            etree.SubElement(node_elem, "outline", {"color": "000080"})
            etree.SubElement(
                node_elem, "font", {"color": "000000", "name": "Arial", "size": "8"}
            )

            # Set node position (x1, y1, x2, y2).
            # Provide random position to each node.
            pos_x, pos_y = random.randint(0, 100), random.randint(0, 100)
            pos_elem = etree.SubElement(node_elem, "position")
            pos_elem.text = f"{pos_x} {pos_y} {pos_x + 72} {pos_y + 48}"

            etree.SubElement(
                node_elem,
                "barchart",
                {"active": "true", "width": "128", "height": "128"},
            )

    def write_xdsl(self, filename=None):
        """
        Write the xdsl data into the file.

        Parameters
        ----------
        filename: Name (path) of the file. If None, nothing is written and
            the serialized document is returned instead.

        Returns
        -------
        bytes or None: The pretty-printed XDSL document, encoded with
            `self.encoding`, when `filename` is None; otherwise None.

        Examples
        --------
        >>> from pgmpy.readwrite import XDSLWriter
        >>> from pgmpy.utils import get_example_model
        >>> model = get_example_model('asia')
        >>> writer = XDSLWriter(model)
        >>> writer.write_xdsl('asia.xdsl')
        """
        xml_str = etree.tostring(self.root, encoding=self.encoding)
        # Round-trip through minidom only to obtain indented output.
        pretty_xml_str = md.parseString(xml_str).toprettyxml(
            indent=" ", encoding=self.encoding
        )

        if filename is None:
            return pretty_xml_str

        with open(filename, "wb") as f:
            f.write(pretty_xml_str)
        return None