Source code for pgmpy.readwrite.XMLBIF

#!/usr/bin/env python

import re
import xml.etree.ElementTree as etree
from io import BytesIO
from itertools import chain

import numpy as np
import pyparsing as pp

from pgmpy.factors.discrete import State, TabularCPD
from pgmpy.models import BayesianNetwork
from pgmpy.utils import compat_fns


class XMLBIFReader(object):
    """
    Reader for Bayesian networks stored in the XMLBIF format.

    The XML is parsed once in the constructor and the extracted pieces
    (variables, parents, edges, states, CPD tables and properties) are
    cached as attributes; `get_model` assembles them into a model.

    Parameters
    ----------
    path : file or str
        File of XMLBIF data

    string : str
        String of XMLBIF data

    Raises
    ------
    ValueError
        If neither `path` nor `string` is given.

    Examples
    --------
    # xmlbif_test.xml is the file present in
    # http://www.cs.cmu.edu/~fgcozman/Research/InterchangeFormat/
    >>> from pgmpy.readwrite import XMLBIFReader
    >>> reader = XMLBIFReader("xmlbif_test.xml")
    >>> model = reader.get_model()

    Reference
    ---------
    [1] https://www.cs.cmu.edu/afs/cs/user/fgcozman/www/Research/InterchangeFormat/
    """

    def __init__(self, path=None, string=None):
        if path:
            self.network = etree.ElementTree(file=path).getroot().find("NETWORK")
        elif string:
            self.network = etree.fromstring(string.encode("utf-8")).find("NETWORK")
        else:
            raise ValueError("Must specify either path or string")

        self.network_name = self.network.find("NAME").text
        self.variables = self.get_variables()
        self.variable_parents = self.get_parents()
        # Edges are derived from the parents mapping, so it must exist first.
        self.edge_list = self.get_edges()
        # CPD tables are reshaped using the state counts, so states come first.
        self.variable_states = self.get_states()
        self.variable_CPD = self.get_values()
        self.variable_property = self.get_property()
        self.state_names = self.get_states()

    def get_variables(self):
        """
        Returns list of variables of the network

        Examples
        --------
        >>> reader = XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_variables()
        ['light-on', 'bowel-problem', 'dog-out', 'hear-bark', 'family-out']
        """
        return [
            variable.find("NAME").text
            for variable in self.network.findall("VARIABLE")
        ]

    def get_edges(self):
        """
        Returns the edges of the network

        Each edge is a ``[parent, child]`` pair built from
        ``self.variable_parents``.

        Examples
        --------
        >>> reader = XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_edges()
        [['family-out', 'light-on'],
         ['family-out', 'dog-out'],
         ['bowel-problem', 'dog-out'],
         ['dog-out', 'hear-bark']]
        """
        return [
            [parent, child]
            for child, parents in self.variable_parents.items()
            for parent in parents
        ]

    def get_states(self):
        """
        Returns the states of variables present in the network

        Examples
        --------
        >>> reader = XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_states()
        {'bowel-problem': ['true', 'false'],
         'dog-out': ['true', 'false'],
         'family-out': ['true', 'false'],
         'hear-bark': ['true', 'false'],
         'light-on': ['true', 'false']}
        """
        return {
            variable.find("NAME").text: [
                outcome.text for outcome in variable.findall("OUTCOME")
            ]
            for variable in self.network.findall("VARIABLE")
        }

    def get_parents(self):
        """
        Returns the parents of the variables present in the network

        Examples
        --------
        >>> reader = XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_parents()
        {'bowel-problem': [],
         'dog-out': ['family-out', 'bowel-problem'],
         'family-out': [],
         'hear-bark': ['dog-out'],
         'light-on': ['family-out']}
        """
        return {
            definition.find("FOR").text: [
                given.text for given in definition.findall("GIVEN")
            ]
            for definition in self.network.findall("DEFINITION")
        }

    def get_values(self):
        """
        Returns the CPD of the variables present in the network

        Each TABLE is parsed as whitespace separated floats and reshaped to
        ``(number of states of the variable, remaining size)`` in
        column-major (``order="F"``) order.

        Examples
        --------
        >>> reader = XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_values()
        {'bowel-problem': array([[ 0.01],
                                 [ 0.99]]),
         'dog-out': array([[ 0.99,  0.01,  0.97,  0.03],
                           [ 0.9 ,  0.1 ,  0.3 ,  0.7 ]]),
         'family-out': array([[ 0.15],
                              [ 0.85]]),
         'hear-bark': array([[ 0.7 ,  0.3 ],
                             [ 0.01,  0.99]]),
         'light-on': array([[ 0.6 ,  0.4 ],
                            [ 0.05,  0.95]])}
        """
        variable_CPD = {}
        for definition in self.network.findall("DEFINITION"):
            variable = definition.find("FOR").text
            # If a DEFINITION holds several TABLEs the last one wins.
            for table in definition.findall("TABLE"):
                flat = np.array([float(value) for value in table.text.split()])
                n_states = len(self.variable_states[variable])
                variable_CPD[variable] = flat.reshape(
                    (n_states, flat.size // n_states), order="F"
                )
        return variable_CPD

    def get_property(self):
        """
        Returns the property of the variable

        Examples
        --------
        >>> reader = XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_property()
        {'bowel-problem': ['position = (190, 69)'],
         'dog-out': ['position = (155, 165)'],
         'family-out': ['position = (112, 69)'],
         'hear-bark': ['position = (154, 241)'],
         'light-on': ['position = (73, 165)']}
        """
        return {
            variable.find("NAME").text: [
                prop.text for prop in variable.findall("PROPERTY")
            ]
            for variable in self.network.findall("VARIABLE")
        }

    @staticmethod
    def _parse_property(prop):
        """
        Split a ``"name = value"`` property string into ``(name, value)``.

        Only the first ``=`` separates name and value, so values that
        themselves contain ``=`` are kept intact. Both parts are stripped of
        surrounding whitespace.

        Raises
        ------
        ValueError
            If `prop` contains no ``=``.
        """
        name, separator, value = prop.partition("=")
        if not separator:
            raise ValueError(
                "Malformed PROPERTY, expected 'name = value': {!r}".format(prop)
            )
        return name.strip(), value.strip()

    def get_model(self, state_name_type=str):
        """
        Returns a Bayesian Network instance from the file/string.

        Parameters
        ----------
        state_name_type: int, str, or bool (default: str)
            The data type to which to convert the state names of the variables.

        Returns
        -------
        BayesianNetwork instance: The read model.

        Raises
        ------
        ValueError
            If a non-empty PROPERTY entry is not of the form ``name = value``.

        Examples
        --------
        >>> from pgmpy.readwrite import XMLBIFReader
        >>> reader = XMLBIFReader("xmlbif_test.xml")
        >>> model = reader.get_model()
        """
        model = BayesianNetwork()
        model.add_nodes_from(self.variables)
        model.add_edges_from(self.edge_list)
        model.name = self.network_name

        tabular_cpds = []
        for var, values in self.variable_CPD.items():
            parents = self.variable_parents[var]
            evidence_card = [len(self.variable_states[parent]) for parent in parents]
            # State names are needed for the variable itself and every parent.
            state_names = {
                name: list(map(state_name_type, self.state_names[name]))
                for name in chain([var], parents)
            }
            tabular_cpds.append(
                TabularCPD(
                    var,
                    len(self.variable_states[var]),
                    values,
                    evidence=parents,
                    evidence_card=evidence_card,
                    state_names=state_names,
                )
            )
        model.add_cpds(*tabular_cpds)

        for node, properties in self.variable_property.items():
            for prop in properties:
                # Empty PROPERTY elements have no text and carry no data.
                if prop is not None:
                    prop_name, prop_value = self._parse_property(prop)
                    model.nodes[node][prop_name] = prop_value

        return model
class XMLBIFWriter(object):
    """
    Writer that serialises a Bayesian network to the XMLBIF format.

    The XML tree is built in the constructor; use ``str(writer)`` to get the
    document as a string or `write_xmlbif` to save it to a file.

    Parameters
    ----------
    model: BayesianNetwork Instance
        Model to write

    encoding: str (optional)
        Encoding for text data

    prettyprint: Bool(optional)
        Indentation in output XML if true

    Raises
    ------
    TypeError
        If `model` is not a BayesianNetwork instance.

    Examples
    --------
    >>> from pgmpy.readwrite import XMLBIFWriter
    >>> from pgmpy.utils import get_example_model
    >>> model = get_example_model('asia')
    >>> writer = XMLBIFWriter(model)
    >>> writer.write_xmlbif('asia.xml')

    Reference
    ---------
    [1] https://www.cs.cmu.edu/afs/cs/user/fgcozman/www/Research/InterchangeFormat/
    """

    # Runs of characters other than ASCII letters, digits and underscore.
    _INVALID_STATE_CHARS = re.compile(r"[^A-Za-z0-9_]+")

    def __init__(self, model, encoding="utf-8", prettyprint=True):
        if not isinstance(model, BayesianNetwork):
            raise TypeError("model must be an instance of BayesianNetwork")
        self.model = model

        self.encoding = encoding
        self.prettyprint = prettyprint

        self.xml = etree.Element("BIF", attrib={"VERSION": "0.3"})
        self.network = etree.SubElement(self.xml, "NETWORK")
        network_name = self.model.name if self.model.name else "UNTITLED"
        etree.SubElement(self.network, "NAME").text = network_name

        # Order matters: states and properties attach to the VARIABLE tags,
        # and the tables attach to the DEFINITION tags.
        self.variables = self.get_variables()
        self.states = self.get_states()
        self.properties = self.get_properties()
        self.definition = self.get_definition()
        self.tables = self.get_values()

    def __str__(self):
        """
        Return the XML as string.
        """
        if self.prettyprint:
            self.indent(self.xml)
        buffer = BytesIO()
        tree = etree.ElementTree(self.xml)
        tree.write(buffer, encoding=self.encoding, xml_declaration=True)
        return buffer.getvalue().decode(self.encoding)

    def indent(self, elem, level=0):
        """
        Inplace prettyprint formatter.

        Recursively fills empty or whitespace-only ``text``/``tail`` fields
        of `elem` and its descendants with a newline plus one space per
        nesting `level`.
        """
        i = "\n" + level * " "
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = i + " "
            if not elem.tail or not elem.tail.strip():
                elem.tail = i
            for child in elem:
                self.indent(child, level + 1)
            # `child` is now the last child: its tail precedes the parent's
            # closing tag, so it gets the parent's indentation.
            if not child.tail or not child.tail.strip():
                child.tail = i
        elif level and (not elem.tail or not elem.tail.strip()):
            elem.tail = i

    def get_variables(self):
        """
        Add variables to XMLBIF

        Return
        ------
        dict: dict of type {variable: variable tags}

        Examples
        --------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_variables()
        {'bowel-problem': <Element VARIABLE at 0x7fe28607dd88>,
         'family-out': <Element VARIABLE at 0x7fe28607de08>,
         'hear-bark': <Element VARIABLE at 0x7fe28607de48>,
         'dog-out': <Element VARIABLE at 0x7fe28607ddc8>,
         'light-on': <Element VARIABLE at 0x7fe28607de88>}
        """
        variable_tag = {}
        for var in sorted(self.model.nodes()):
            tag = etree.SubElement(
                self.network, "VARIABLE", attrib={"TYPE": "nature"}
            )
            etree.SubElement(tag, "NAME").text = var
            variable_tag[var] = tag
        return variable_tag

    def get_states(self):
        """
        Add outcome to variables of XMLBIF

        State names are taken from each CPD when available, otherwise the
        integers ``0 .. cardinality - 1`` are used. Every name is passed
        through `_make_valid_state_name` before being written.

        Return
        ------
        dict: dict of type {variable: outcome tags}

        Examples
        --------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_states()
        {'dog-out': [<Element OUTCOME at 0x7ffbabfcdec8>, <Element OUTCOME at 0x7ffbabfcdf08>],
         'family-out': [<Element OUTCOME at 0x7ffbabfd4108>, <Element OUTCOME at 0x7ffbabfd4148>],
         'bowel-problem': [<Element OUTCOME at 0x7ffbabfd4088>, <Element OUTCOME at 0x7ffbabfd40c8>],
         'hear-bark': [<Element OUTCOME at 0x7ffbabfcdf48>, <Element OUTCOME at 0x7ffbabfcdf88>],
         'light-on': [<Element OUTCOME at 0x7ffbabfcdfc8>, <Element OUTCOME at 0x7ffbabfd4048>]}
        """
        outcome_tag = {}
        for cpd in self.model.get_cpds():
            var = cpd.variable
            if cpd.state_names is None or cpd.state_names.get(var) is None:
                states = range(cpd.get_cardinality([var])[var])
            else:
                states = cpd.state_names[var]

            outcome_tag[var] = []
            for state in states:
                state_tag = etree.SubElement(self.variables[var], "OUTCOME")
                state_tag.text = self._make_valid_state_name(state)
                outcome_tag[var].append(state_tag)
        return outcome_tag

    def _make_valid_state_name(self, state_name):
        """Transform the input state_name into a valid state in XMLBIF.

        XMLBIF states must start with a letter and only contain letters,
        numbers and underscores. Each run of other characters is replaced by
        a single underscore, and names that do not start with a letter
        (including empty names) are prefixed with ``"state"``.
        """
        # TODO: Throw a warning that the state names are going to be modified instead of silently modifying it.
        s_fixed = self._INVALID_STATE_CHARS.sub("_", str(state_name))
        # Slicing instead of indexing keeps an empty name from raising.
        if not s_fixed[:1].isalpha():
            s_fixed = "state" + s_fixed
        return s_fixed

    def get_properties(self):
        """
        Add property to variables in XMLBIF

        One PROPERTY element of the form ``name = value`` is written per node
        attribute. A node without attributes gets a single empty PROPERTY
        element.

        Return
        ------
        dict: dict of type {variable: property tag}
            For a variable with several properties the tag of the last one
            written is returned.

        Examples
        --------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_properties()
        {'light-on': <Element PROPERTY at 0x7f7a2ffac1c8>,
         'family-out': <Element PROPERTY at 0x7f7a2ffac148>,
         'hear-bark': <Element PROPERTY at 0x7f7a2ffac188>,
         'bowel-problem': <Element PROPERTY at 0x7f7a2ffac0c8>,
         'dog-out': <Element PROPERTY at 0x7f7a2ffac108>}
        """
        property_tag = {}
        for var in sorted(self.model.nodes()):
            properties = self.model.nodes[var]
            if not properties:
                property_tag[var] = etree.SubElement(self.variables[var], "PROPERTY")
                continue
            # A fresh element per property; reusing one element would keep
            # only the last property in the output.
            for prop, val in properties.items():
                tag = etree.SubElement(self.variables[var], "PROPERTY")
                tag.text = str(prop) + " = " + str(val)
                property_tag[var] = tag
        return property_tag

    def get_definition(self):
        """
        Add Definition to XMLBIF

        Return
        ------
        dict: dict of type {variable: definition tag}

        Examples
        --------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_definition()
        {'hear-bark': <Element DEFINITION at 0x7f1d48977408>,
         'family-out': <Element DEFINITION at 0x7f1d489773c8>,
         'dog-out': <Element DEFINITION at 0x7f1d48977388>,
         'bowel-problem': <Element DEFINITION at 0x7f1d48977348>,
         'light-on': <Element DEFINITION at 0x7f1d48977448>}
        """
        # sorted() rather than list.sort(), so the list handed back by the
        # model is not reordered in place.
        cpds = sorted(self.model.get_cpds(), key=lambda cpd: cpd.variable)
        definition_tag = {}
        for cpd in cpds:
            tag = etree.SubElement(self.network, "DEFINITION")
            etree.SubElement(tag, "FOR").text = cpd.variable
            # cpd.variables[0] is the variable itself; the rest are written
            # as GIVEN entries.
            for parent in cpd.variables[1:]:
                etree.SubElement(tag, "GIVEN").text = parent
            definition_tag[cpd.variable] = tag
        return definition_tag

    def get_values(self):
        """
        Add Table to XMLBIF.

        The CPD values are flattened with ``compat_fns.ravel_f`` and written
        as space separated numbers.

        Return
        ---------------
        dict: dict of type {variable: table tag}

        Examples
        -------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_values()
        {'dog-out': <Element TABLE at 0x7f240726f3c8>,
         'light-on': <Element TABLE at 0x7f240726f488>,
         'bowel-problem': <Element TABLE at 0x7f240726f388>,
         'family-out': <Element TABLE at 0x7f240726f408>,
         'hear-bark': <Element TABLE at 0x7f240726f448>}
        """
        table_tag = {}
        for cpd in self.model.get_cpds():
            tag = etree.SubElement(self.definition[cpd.variable], "TABLE")
            # Every value is followed by a single space (trailing included).
            tag.text = "".join(
                str(val) + " " for val in compat_fns.ravel_f(cpd.get_values())
            )
            table_tag[cpd.variable] = tag
        return table_tag

    def write_xmlbif(self, filename):
        """
        Write the xml data into the file.

        The file is written with ``self.encoding`` so that its bytes agree
        with the encoding named in the XML declaration.

        Parameters
        ----------
        filename: Name of the file.

        Examples
        --------
        >>> from pgmpy.readwrite import XMLBIFWriter
        >>> from pgmpy.utils import get_example_model
        >>> model = get_example_model('asia')
        >>> writer = XMLBIFWriter(model)
        >>> writer.write_xmlbif('asia.xml')
        """
        with open(filename, "w", encoding=self.encoding) as fout:
            fout.write(str(self))