Source code for pgmpy.readwrite.XMLBIF

#!/usr/bin/env python

import re
import xml.etree.ElementTree as etree
from io import BytesIO
from itertools import chain

import numpy as np

# pyparsing is an optional third-party dependency; if it is missing, re-raise
# the ImportError with an installation hint appended to the original message.
try:
    import pyparsing as pp
except ImportError as e:
    # The exception must be bound with ``as e`` for the message to be built;
    # ``str(e)`` yields its text (ImportError has no ``message()`` method).
    raise ImportError(
        str(e)
        + ". pyparsing is required for using read/write methods. Please install using: pip install pyparsing."
    ) from e

from pgmpy.factors.discrete import State, TabularCPD
from pgmpy.models import BayesianNetwork
from pgmpy.utils import compat_fns


class XMLBIFReader(object):
    """
    Reader that parses a network stored in XMLBIF format.

    The document is parsed once in the constructor; the ``get_*`` methods
    extract the individual pieces from the ``NETWORK`` element and
    ``get_model`` assembles them into a ``BayesianNetwork``.

    Parameters
    ----------
    path : file or str
        File of XMLBIF data

    string : str
        String of XMLBIF data

    Raises
    ------
    ValueError
        If neither ``path`` nor ``string`` is given.

    Examples
    --------
    # xmlbif_test.xml is the file present in
    # http://www.cs.cmu.edu/~fgcozman/Research/InterchangeFormat/
    >>> from pgmpy.readwrite import XMLBIFReader
    >>> reader = XMLBIFReader("xmlbif_test.xml")
    >>> model = reader.get_model()

    Reference
    ---------
    [1] https://www.cs.cmu.edu/afs/cs/user/fgcozman/www/Research/InterchangeFormat/
    """

    def __init__(self, path=None, string=None):
        if path:
            self.network = etree.ElementTree(file=path).getroot().find("NETWORK")
        elif string:
            self.network = etree.fromstring(string.encode("utf-8")).find("NETWORK")
        else:
            raise ValueError("Must specify either path or string")

        self.network_name = self.network.find("NAME").text
        self.variables = self.get_variables()
        # Parents must be read before edges: get_edges() derives the edge
        # list from self.variable_parents.
        self.variable_parents = self.get_parents()
        self.edge_list = self.get_edges()
        # States must be read before values: get_values() uses the number of
        # states of each variable to reshape its table.
        self.variable_states = self.get_states()
        self.variable_CPD = self.get_values()
        self.variable_property = self.get_property()
        self.state_names = self.get_states()

    def get_variables(self):
        """
        Returns list of variables of the network

        Examples
        --------
        >>> reader = XMLBIF.XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_variables()
        ['light-on', 'bowel-problem', 'dog-out', 'hear-bark', 'family-out']
        """
        return [
            variable.find("NAME").text
            for variable in self.network.findall("VARIABLE")
        ]

    def get_edges(self):
        """
        Returns the edges of the network as ``[parent, child]`` pairs,
        derived from ``self.variable_parents``.

        Examples
        --------
        >>> reader = XMLBIF.XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_edges()
        [['family-out', 'light-on'],
         ['family-out', 'dog-out'],
         ['bowel-problem', 'dog-out'],
         ['dog-out', 'hear-bark']]
        """
        return [
            [parent, child]
            for child, parents in self.variable_parents.items()
            for parent in parents
        ]

    def get_states(self):
        """
        Returns the states of variables present in the network

        Examples
        --------
        >>> reader = XMLBIF.XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_states()
        {'bowel-problem': ['true', 'false'],
         'dog-out': ['true', 'false'],
         'family-out': ['true', 'false'],
         'hear-bark': ['true', 'false'],
         'light-on': ['true', 'false']}
        """
        return {
            variable.find("NAME").text: [
                outcome.text for outcome in variable.findall("OUTCOME")
            ]
            for variable in self.network.findall("VARIABLE")
        }

    def get_parents(self):
        """
        Returns the parents of the variables present in the network

        Examples
        --------
        >>> reader = XMLBIF.XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_parents()
        {'bowel-problem': [],
         'dog-out': ['family-out', 'bowel-problem'],
         'family-out': [],
         'hear-bark': ['dog-out'],
         'light-on': ['family-out']}
        """
        return {
            definition.find("FOR").text: [
                given.text for given in definition.findall("GIVEN")
            ]
            for definition in self.network.findall("DEFINITION")
        }

    def get_values(self):
        """
        Returns the CPD of the variables present in the network

        The whitespace-separated numbers of each ``TABLE`` element are
        converted to floats and reshaped in column-major order (``order="F"``)
        into an array with one row per state of the variable.

        Examples
        --------
        >>> reader = XMLBIF.XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_values()
        {'bowel-problem': array([[ 0.01],
                                 [ 0.99]]),
         'dog-out': array([[ 0.99,  0.01,  0.97,  0.03],
                           [ 0.9 ,  0.1 ,  0.3 ,  0.7 ]]),
         'family-out': array([[ 0.15],
                              [ 0.85]]),
         'hear-bark': array([[ 0.7 ,  0.3 ],
                             [ 0.01,  0.99]]),
         'light-on': array([[ 0.6 ,  0.4 ],
                            [ 0.05,  0.95]])}
        """
        variable_CPD = {
            definition.find("FOR").text: list(map(float, table.text.split()))
            for definition in self.network.findall("DEFINITION")
            for table in definition.findall("TABLE")
        }
        for variable, flat_values in variable_CPD.items():
            arr = np.array(flat_values)
            n_states = len(self.variable_states[variable])
            # Only values of existing keys are replaced, so updating the dict
            # while iterating over it is safe.
            variable_CPD[variable] = arr.reshape(
                (n_states, arr.size // n_states), order="F"
            )
        return variable_CPD

    def get_property(self):
        """
        Returns the property of the variable

        Examples
        --------
        >>> reader = XMLBIF.XMLBIFReader("xmlbif_test.xml")
        >>> reader.get_property()
        {'bowel-problem': ['position = (190, 69)'],
         'dog-out': ['position = (155, 165)'],
         'family-out': ['position = (112, 69)'],
         'hear-bark': ['position = (154, 241)'],
         'light-on': ['position = (73, 165)']}
        """
        return {
            variable.find("NAME").text: [
                prop.text for prop in variable.findall("PROPERTY")
            ]
            for variable in self.network.findall("VARIABLE")
        }

    @staticmethod
    def _split_property(prop):
        """
        Split a ``"name = value"`` property string into ``(name, value)``.

        Only the first ``=`` separates name from value, so values that
        themselves contain ``=`` are kept intact. Both parts are stripped of
        surrounding whitespace.

        Raises
        ------
        ValueError
            If ``prop`` contains no ``=`` at all.
        """
        name, value = (part.strip() for part in prop.split("=", 1))
        return name, value

    def get_model(self, state_name_type=str):
        """
        Returns a Bayesian Network instance from the file/string.

        Parameters
        ----------
        state_name_type: int, str, or bool (default: str)
            The data type to which to convert the state names of the variables.

        Returns
        -------
        BayesianNetwork instance: The read model.

        Examples
        --------
        >>> from pgmpy.readwrite import XMLBIFReader
        >>> reader = XMLBIFReader("xmlbif_test.xml")
        >>> model = reader.get_model()
        """
        model = BayesianNetwork()
        model.add_nodes_from(self.variables)
        model.add_edges_from(self.edge_list)
        model.name = self.network_name

        tabular_cpds = []
        for var, values in self.variable_CPD.items():
            parents = self.variable_parents[var]
            evidence_card = [len(self.variable_states[parent]) for parent in parents]
            # State names are supplied for the variable itself and for each of
            # its parents, converted to the requested type.
            state_names = {
                name: list(map(state_name_type, self.state_names[name]))
                for name in chain([var], parents)
            }
            cpd = TabularCPD(
                var,
                len(self.variable_states[var]),
                values,
                evidence=parents,
                evidence_card=evidence_card,
                state_names=state_names,
            )
            tabular_cpds.append(cpd)

        model.add_cpds(*tabular_cpds)

        for node, properties in self.variable_property.items():
            for prop in properties:
                # An empty PROPERTY element has no text (None); skip it.
                if prop is not None:
                    prop_name, prop_value = self._split_property(prop)
                    model.nodes[node][prop_name] = prop_value

        return model
class XMLBIFWriter(object):
    """
    Writer that serialises a BayesianNetwork into XMLBIF format.

    The whole XML tree is built in the constructor; ``str(writer)`` renders
    it and ``write_xmlbif`` stores it in a file.

    Parameters
    ----------
    model: BayesianNetwork Instance
        Model to write
    encoding: str (optional)
        Encoding for text data
    prettyprint: Bool(optional)
        Indentation in output XML if true

    Raises
    ------
    TypeError
        If ``model`` is not a ``BayesianNetwork``.

    Examples
    --------
    >>> from pgmpy.readwrite import XMLBIFWriter
    >>> from pgmpy.utils import get_example_model
    >>> model = get_example_model('asia')
    >>> writer = XMLBIFWriter(model)
    >>> writer.write_xmlbif('asia.xml')

    Reference
    ---------
    [1] https://www.cs.cmu.edu/afs/cs/user/fgcozman/www/Research/InterchangeFormat/
    """

    # A run of characters outside [A-Za-z0-9_] is replaced by one underscore.
    _INVALID_STATE_CHARS = re.compile(r"[^A-Za-z0-9_]+")

    def __init__(self, model, encoding="utf-8", prettyprint=True):
        if not isinstance(model, BayesianNetwork):
            raise TypeError("model must an instance of BayesianNetwork")
        self.model = model

        self.encoding = encoding
        self.prettyprint = prettyprint

        self.xml = etree.Element("BIF", attrib={"VERSION": "0.3"})
        self.network = etree.SubElement(self.xml, "NETWORK")
        name_tag = etree.SubElement(self.network, "NAME")
        name_tag.text = self.model.name if self.model.name else "UNTITLED"

        # Order matters: states and properties attach to the VARIABLE tags,
        # and tables attach to the DEFINITION tags.
        self.variables = self.get_variables()
        self.states = self.get_states()
        self.properties = self.get_properties()
        self.definition = self.get_definition()
        self.tables = self.get_values()

    def __str__(self):
        """
        Return the XML as string.
        """
        if self.prettyprint:
            self.indent(self.xml)
        buffer = BytesIO()
        tree = etree.ElementTree(self.xml)
        tree.write(buffer, encoding=self.encoding, xml_declaration=True)
        return buffer.getvalue().decode(self.encoding)

    def indent(self, elem, level=0):
        """
        Inplace prettyprint formatter.

        Rewrites whitespace-only ``text``/``tail`` of ``elem`` and its
        descendants so that nested elements appear on their own lines.
        """
        pad = "\n" + level * " "
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = pad + " "
            if not elem.tail or not elem.tail.strip():
                elem.tail = pad
            child = None
            for child in elem:
                self.indent(child, level + 1)
            # The tail of the last child precedes the parent's closing tag,
            # so it is moved back to the parent's indentation level.
            if not child.tail or not child.tail.strip():
                child.tail = pad
        else:
            if level and (not elem.tail or not elem.tail.strip()):
                elem.tail = pad

    def get_variables(self):
        """
        Add variables to XMLBIF

        Return
        ------
        dict: dict of type {variable: variable tags}

        Examples
        --------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_variables()
        {'bowel-problem': <Element VARIABLE at 0x7fe28607dd88>,
         'family-out': <Element VARIABLE at 0x7fe28607de08>,
         'hear-bark': <Element VARIABLE at 0x7fe28607de48>,
         'dog-out': <Element VARIABLE at 0x7fe28607ddc8>,
         'light-on': <Element VARIABLE at 0x7fe28607de88>}
        """
        variable_tag = {}
        for var in sorted(self.model.nodes()):
            tag = etree.SubElement(self.network, "VARIABLE", attrib={"TYPE": "nature"})
            etree.SubElement(tag, "NAME").text = var
            variable_tag[var] = tag
        return variable_tag

    def get_states(self):
        """
        Add outcome to variables of XMLBIF

        Return
        ------
        dict: dict of type {variable: outcome tags}

        Examples
        --------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_states()
        {'dog-out': [<Element OUTCOME at 0x7ffbabfcdec8>, <Element OUTCOME at 0x7ffbabfcdf08>],
         'family-out': [<Element OUTCOME at 0x7ffbabfd4108>, <Element OUTCOME at 0x7ffbabfd4148>],
         'bowel-problem': [<Element OUTCOME at 0x7ffbabfd4088>, <Element OUTCOME at 0x7ffbabfd40c8>],
         'hear-bark': [<Element OUTCOME at 0x7ffbabfcdf48>, <Element OUTCOME at 0x7ffbabfcdf88>],
         'light-on': [<Element OUTCOME at 0x7ffbabfcdfc8>, <Element OUTCOME at 0x7ffbabfd4048>]}
        """
        outcome_tag = {}
        for cpd in self.model.get_cpds():
            var = cpd.variable
            # Without state names for the variable, fall back to 0..card-1.
            if cpd.state_names is None or cpd.state_names.get(var) is None:
                states = range(cpd.get_cardinality([var])[var])
            else:
                states = cpd.state_names[var]

            outcome_tag[var] = []
            for state in states:
                state_tag = etree.SubElement(self.variables[var], "OUTCOME")
                state_tag.text = self._make_valid_state_name(state)
                outcome_tag[var].append(state_tag)
        return outcome_tag

    def _make_valid_state_name(self, state_name):
        """Transform the input state_name into a valid state in XMLBIF.

        XMLBIF states must start with a letter and only contain letters,
        numbers and underscores. Each run of other characters is replaced by
        a single underscore, and a result that is empty or does not start
        with a letter is prefixed with ``"state"``.
        """
        # TODO: Throw a warning that the state names are going to be modified instead of silently modifying it.
        s_fixed = self._INVALID_STATE_CHARS.sub("_", str(state_name))
        if not s_fixed or not s_fixed[0].isalpha():
            s_fixed = "state" + s_fixed
        return s_fixed

    def get_properties(self):
        """
        Add property to variables in XMLBIF

        Return
        ------
        dict: dict of type {variable: property tag}

        Examples
        --------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_properties()
        {'light-on': <Element PROPERTY at 0x7f7a2ffac1c8>,
         'family-out': <Element PROPERTY at 0x7f7a2ffac148>,
         'hear-bark': <Element PROPERTY at 0x7f7a2ffac188>,
         'bowel-problem': <Element PROPERTY at 0x7f7a2ffac0c8>,
         'dog-out': <Element PROPERTY at 0x7f7a2ffac108>}
        """
        property_tag = {}
        for var in sorted(self.model.nodes()):
            tag = etree.SubElement(self.variables[var], "PROPERTY")
            # NOTE: one PROPERTY tag is created per variable and its text is
            # overwritten for every attribute, so only the last one is kept.
            for prop, val in self.model.nodes[var].items():
                tag.text = str(prop) + " = " + str(val)
            property_tag[var] = tag
        return property_tag

    def get_definition(self):
        """
        Add Definition to XMLBIF

        Return
        ------
        dict: dict of type {variable: definition tag}

        Examples
        --------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_definition()
        {'hear-bark': <Element DEFINITION at 0x7f1d48977408>,
         'family-out': <Element DEFINITION at 0x7f1d489773c8>,
         'dog-out': <Element DEFINITION at 0x7f1d48977388>,
         'bowel-problem': <Element DEFINITION at 0x7f1d48977348>,
         'light-on': <Element DEFINITION at 0x7f1d48977448>}
        """
        # sorted() builds a new list, so the list returned by get_cpds() is
        # not reordered as a side effect of writing.
        cpds = sorted(self.model.get_cpds(), key=lambda cpd: cpd.variable)
        definition_tag = {}
        for cpd in cpds:
            tag = etree.SubElement(self.network, "DEFINITION")
            etree.SubElement(tag, "FOR").text = cpd.variable
            # cpd.variables[0] is written as FOR; the remaining entries are
            # written as GIVEN.
            for parent in cpd.variables[1:]:
                etree.SubElement(tag, "GIVEN").text = parent
            definition_tag[cpd.variable] = tag
        return definition_tag

    def get_values(self):
        """
        Add Table to XMLBIF.

        Return
        ---------------
        dict: dict of type {variable: table tag}

        Examples
        -------
        >>> writer = XMLBIFWriter(model)
        >>> writer.get_values()
        {'dog-out': <Element TABLE at 0x7f240726f3c8>,
         'light-on': <Element TABLE at 0x7f240726f488>,
         'bowel-problem': <Element TABLE at 0x7f240726f388>,
         'family-out': <Element TABLE at 0x7f240726f408>,
         'hear-bark': <Element TABLE at 0x7f240726f448>}
        """
        table_tag = {}
        for cpd in self.model.get_cpds():
            tag = etree.SubElement(self.definition[cpd.variable], "TABLE")
            # Every value is followed by a single space (including the last).
            tag.text = "".join(
                str(val) + " " for val in compat_fns.ravel_f(cpd.get_values())
            )
            table_tag[cpd.variable] = tag
        return table_tag

    def write_xmlbif(self, filename):
        """
        Write the xml data into the file.

        The file is opened with ``self.encoding`` so that its bytes match the
        encoding named in the XML declaration.

        Parameters
        ----------
        filename: Name of the file.

        Examples
        --------
        >>> from pgmpy.readwrite import XMLBIFWriter
        >>> from pgmpy.utils import get_example_model
        >>> model = get_example_model('asia')
        >>> writer = XMLBIFWriter(model)
        >>> writer.write_xmlbif('asia.xml')
        """
        with open(filename, "w", encoding=self.encoding) as fout:
            fout.write(str(self))