Source code for pgmpy.readwrite.BIF

import collections
import re
from itertools import product
from string import Template

import numpy as np
import pyparsing as pp
from joblib import Parallel, delayed

try:
    from pyparsing import (
        CharsNotIn,
        Group,
        OneOrMore,
        Optional,
        Suppress,
        Word,
        ZeroOrMore,
        cppStyleComment,
        nums,
        printables,
    )
except ImportError as e:
    raise ImportError(
        f"{e}. pyparsing is required for using read/write methods. Please install using: pip install pyparsing."
    ) from None

from pgmpy.factors.discrete import TabularCPD
from pgmpy.global_vars import logger
from pgmpy.models import DiscreteBayesianNetwork
from pgmpy.utils import compat_fns


class BIFReader(object):
    """
    Reader for Bayesian networks stored in the Bayesian Interchange Format (BIF).

    The whole network is parsed eagerly in ``__init__``; the results are stored
    on the instance (``network_name``, ``variable_names``, ``variable_states``,
    ``variable_parents``, ``variable_cpds``, ``variable_edges`` and, when
    requested, ``variable_properties``) and can be turned into a model with
    :meth:`get_model`.

    Parameters
    ----------
    path : file or str
        File of bif data

    string : str
        String of bif data

    include_properties: boolean
        If True, gets the properties tag from the file and stores in graph
        properties.

    n_jobs: int (default: 1)
        Number of jobs to run in parallel. `-1` means use all processors.

    Raises
    ------
    ValueError
        If neither `path` nor `string` is given (or both are empty).

    Examples
    --------
    >>> # dog-problem.bif file is present at
    >>> # http://www.cs.cmu.edu/~javabayes/Examples/DogProblem/dog-problem.bif
    >>> from pgmpy.readwrite import BIFReader
    >>> reader = BIFReader("bif_test.bif")
    >>> reader
    <pgmpy.readwrite.BIF.BIFReader object at 0x7f2375621cf8>
    >>> model = reader.get_model()

    Reference
    ---------
    [1] Geoff Hulten and Pedro Domingos. The interchange format for bayesian networks.
        http://www.cs.washington.edu/dm/vfml/appendixes/bif.htm, 2003.
    """

    # Block headers. The look-behind keeps the keyword from matching inside an
    # identifier (e.g. ``my_variable_x`` or ``probability_y``), and the trailing
    # part requires the syntax that actually opens a block.
    _VARIABLE_HEADER = re.compile(r"(?<![\w.\-])variable\s+[^\s{]+\s*\{")
    _PROBABILITY_HEADER = re.compile(r"(?<![\w.\-])probability\s*\(")
    # A block is closed by a "}" that is the last non-blank character on its
    # line (LF or CRLF) or the last non-blank character of the input.
    _BLOCK_END = re.compile(r"\}[ \t]*(?:\r?\n|\Z)")

    def __init__(self, path=None, string=None, include_properties=False, n_jobs=1):
        if path:
            with open(path, "r") as network:
                self.network = network.read()
        elif string:
            self.network = string
        else:
            raise ValueError("Must specify either path or string")

        self.n_jobs = n_jobs
        self.include_properties = include_properties

        if '"' in self.network:
            # Quotes are replaced by spaces so that quoted and unquoted tokens
            # parse identically, e.g. "Dog-Problem" and Dog-Problem, or
            # "true""false", "true" "false" and true false.
            self.network = self.network.replace('"', " ")

        if "/*" in self.network or "//" in self.network:
            # Strip C/C++ style comments before any parsing takes place.
            self.network = cppStyleComment.suppress().transformString(self.network)

        (
            self.name_expr,
            self.state_expr,
            self.property_expr,
        ) = self.get_variable_grammar()
        self.probability_expr, self.cpd_expr = self.get_probability_grammar()
        self.network_name = self.get_network_name()
        self.variable_names = self.get_variables()
        self.variable_states = self.get_states()
        if self.include_properties:
            self.variable_properties = self.get_property()
        self.variable_parents = self.get_parents()
        self.variable_cpds = self.get_values()
        self.variable_edges = self.get_edges()

    def get_variable_grammar(self):
        """
        Build the pyparsing expressions used on ``variable`` blocks.

        Returns
        -------
        tuple: ``(name_expr, variable_state_expr, property_expr)`` matching the
            variable name, its grouped list of states and a single
            ``property ...;`` entry respectively.
        """
        # Defining an expression for valid word
        word_expr = Word(pp.unicode.alphanums + "_" + "-" + ".")
        word_expr2 = Word(initChars=printables, excludeChars=["{", "}", ",", " "])
        name_expr = Suppress("variable") + word_expr + Suppress("{")
        state_expr = ZeroOrMore(word_expr2 + Optional(Suppress(",")))

        # Variable states are of the form
        #   type description [args] { val1, val2 };
        # (the commas between the values may or may not be present).
        variable_state_expr = (
            Suppress("type")
            + Suppress(word_expr)
            + Suppress("[")
            + Suppress(Word(nums))
            + Suppress("]")
            + Suppress("{")
            + Group(state_expr)
            + Suppress("}")
            + Suppress(";")
        )

        # Expression to find a property: everything between "property" and ";".
        property_expr = Suppress("property") + CharsNotIn(";") + Suppress(";")

        return name_expr, variable_state_expr, property_expr

    def get_probability_grammar(self):
        """
        Build the pyparsing expressions used on ``probability`` blocks.

        Returns
        -------
        tuple: ``(probability_expr, cpd_expr)`` matching the block header
            (variable followed by its parents) and one line of CPD values
            respectively.
        """
        # Valid word expression for the header, which is of the format
        # var1 | var2 , var3 or var1 var2 var3 or simply var
        word_expr = (
            Word(pp.unicode.alphanums + "-" + "_")
            + Suppress(Optional("|"))
            + Suppress(Optional(","))
        )
        # State names inside "( ... )" at the start of a conditional row.
        word_expr2 = Word(
            initChars=printables, excludeChars=[",", ")", " ", "("]
        ) + Suppress(Optional(","))
        # Valid numbers, of the format 1.00 or 1 or 1.00. 0.00 or 9.8e-5 etc
        num_expr = Word(nums + "-" + "+" + "e" + "E" + ".") + Suppress(Optional(","))
        probability_expr = (
            Suppress("probability")
            + Suppress("(")
            + OneOrMore(word_expr)
            + Suppress(")")
        )
        optional_expr = Suppress("(") + OneOrMore(word_expr2) + Suppress(")")
        probab_attributes = optional_expr | Suppress("table") | Suppress("default")
        cpd_expr = probab_attributes + OneOrMore(num_expr)

        return probability_expr, cpd_expr

    def _find_block_end(self, start):
        """
        Return the index of the "}" closing the block that begins at `start`.

        Falls back to the end of the input when no closing brace is found, so
        that a missing terminator never produces a negative slice bound.
        """
        match = self._BLOCK_END.search(self.network, start)
        return match.start() if match else len(self.network)

    def _iter_blocks(self, header):
        """
        Yield the text of every block whose header matches the compiled
        pattern `header`, from the keyword up to (excluding) the closing "}".
        """
        for match in header.finditer(self.network):
            yield self.network[match.start() : self._find_block_end(match.start())]

    def variable_block(self):
        """
        Yield the text of each ``variable`` block of the network.
        """
        return self._iter_blocks(self._VARIABLE_HEADER)

    def probability_block(self):
        """
        Yield the text of each ``probability`` block of the network.
        """
        return self._iter_blocks(self._PROBABILITY_HEADER)

    def get_network_name(self):
        """
        Returns the name of the network

        Example
        ---------------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_network_name()
        'Dog-Problem'
        """
        start = self.network.find("network")
        end = self._find_block_end(max(start, 0))
        # Creating a network attribute
        network_attribute = (
            Suppress("network") + Word(pp.unicode.alphanums + "_" + "-") + "{"
        )
        network_name = network_attribute.searchString(self.network[start:end])[0][0]

        return network_name

    def get_variables(self):
        """
        Returns list of variables of the network

        Example
        -------------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_variables()
        ['light-on','bowel_problem','dog-out','hear-bark','family-out']
        """
        return [
            self.name_expr.searchString(block)[0][0]
            for block in self.variable_block()
        ]

    def get_states(self):
        """
        Returns the states of variables present in the network

        Example
        -----------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_states()
        {'bowel-problem': ['true','false'],
        'dog-out': ['true','false'],
        'family-out': ['true','false'],
        'hear-bark': ['true','false'],
        'light-on': ['true','false']}
        """
        variable_states = {}
        for block in self.variable_block():
            name = self.name_expr.searchString(block)[0][0]
            variable_states[name] = list(self.state_expr.searchString(block)[0][0])

        return variable_states

    def get_property(self):
        """
        Returns the property of the variable

        Example
        -------------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_property()
        {'bowel-problem': ['position = (335, 99)'],
        'dog-out': ['position = (300, 195)'],
        'family-out': ['position = (257, 99)'],
        'hear-bark': ['position = (296, 268)'],
        'light-on': ['position = (218, 195)']}
        """
        variable_properties = {}
        for block in self.variable_block():
            name = self.name_expr.searchString(block)[0][0]
            properties = self.property_expr.searchString(block)
            variable_properties[name] = [y.strip() for x in properties for y in x]
        return variable_properties

    def get_parents(self):
        """
        Returns the parents of the variables present in the network

        Example
        --------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_parents()
        {'bowel-problem': [],
        'dog-out': ['family-out', 'bowel-problem'],
        'family-out': [],
        'hear-bark': ['dog-out'],
        'light-on': ['family-out']}
        """
        variable_parents = {}
        for block in self.probability_block():
            # Only the header line names the variable and its parents.
            names = self.probability_expr.searchString(block.split("\n")[0])[0]
            variable_parents[names[0]] = names[1:]
        return variable_parents

    def _get_values_from_block(self, block):
        """
        Parse one ``probability`` block into ``(variable_name, cpd_array)``.

        The returned array has one row per state of the variable and one
        column per combination of parent states.
        """
        names = self.probability_expr.searchString(block)
        var_name, parents = names[0][0], names[0][1:]
        cpds = self.cpd_expr.searchString(block)
        n_states = len(self.variable_states[var_name])

        # Check if the block is a table.
        if re.search(r".*\n[ ]*(table|default) .*\n.*", block):
            arr = np.array([float(j) for i in cpds for j in i])
            arr = arr.reshape((n_states, arr.size // n_states))
        else:
            # np.prod returns a float for an empty list; array dimensions
            # must be integers.
            arr_length = int(
                np.prod([len(self.variable_states[var]) for var in parents])
            )
            arr = np.zeros((n_states, arr_length))
            # Each row is "( parent states ) values"; index the values by the
            # tuple of parent states so rows may appear in any order.
            values_dict = {}
            for prob_line in cpds:
                states = prob_line[: len(parents)]
                vals = [float(i) for i in prob_line[len(parents) :]]
                values_dict[tuple(states)] = vals
            for index, combination in enumerate(
                product(*[self.variable_states[var] for var in parents])
            ):
                arr[:, index] = values_dict[combination]
        return var_name, arr

    def get_values(self):
        """
        Returns the CPD of the variables present in the network

        Example
        --------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_values()
        {'bowel-problem': np.array([[0.01],
                                    [0.99]]),
        'dog-out': np.array([[0.99, 0.97, 0.9, 0.3],
                            [0.01, 0.03, 0.1, 0.7]]),
        'family-out': np.array([[0.15],
                                [0.85]]),
        'hear-bark': np.array([[0.7, 0.01],
                                [0.3, 0.99]]),
        'light-on': np.array([[0.6, 0.05],
                            [0.4, 0.95]])}
        """
        cpd_values = Parallel(n_jobs=self.n_jobs)(
            delayed(self._get_values_from_block)(block)
            for block in self.probability_block()
        )

        variable_cpds = {}
        for var_name, arr in cpd_values:
            variable_cpds[var_name] = arr

        return variable_cpds

    def get_edges(self):
        """
        Returns the edges of the network

        Example
        --------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_edges()
        [['family-out', 'light-on'],
        ['family-out', 'dog-out'],
        ['bowel-problem', 'dog-out'],
        ['dog-out', 'hear-bark']]
        """
        return [
            [parent, child]
            for child, parents in self.variable_parents.items()
            for parent in parents
        ]

    def get_model(self, state_name_type=str):
        """
        Returns the Bayesian Model read from the file/str.

        Parameters
        ----------
        state_name_type: int, str or bool (default: str)
            The data type to which to convert the state names of the variables.

        Raises
        ------
        AttributeError
            If an attribute lookup fails while building the model (for
            instance because the parsed attributes are missing). The original
            error is attached as ``__cause__``.

        Example
        ----------
        >>> from pgmpy.readwrite import BIFReader
        >>> reader = BIFReader("bif_test.bif")
        >>> reader.get_model()
        <pgmpy.models.DiscreteBayesianNetwork.DiscreteBayesianNetwork object at 0x7f20af154320>
        """
        try:
            model = DiscreteBayesianNetwork()
            model.add_nodes_from(self.variable_names)
            model.add_edges_from(self.variable_edges)
            model.name = self.network_name

            tabular_cpds = []
            for var in sorted(self.variable_cpds.keys()):
                values = self.variable_cpds[var]
                parents = self.variable_parents[var]
                sn = {
                    p_var: list(map(state_name_type, self.variable_states[p_var]))
                    for p_var in parents
                }
                sn[var] = list(map(state_name_type, self.variable_states[var]))
                cpd = TabularCPD(
                    var,
                    len(self.variable_states[var]),
                    values,
                    evidence=parents,
                    evidence_card=[
                        len(self.variable_states[evidence_var])
                        for evidence_var in parents
                    ],
                    state_names=sn,
                )
                tabular_cpds.append(cpd)

            model.add_cpds(*tabular_cpds)

            if self.include_properties:
                for node, properties in self.variable_properties.items():
                    for prop in properties:
                        # Split on the first "=" only so that values which
                        # themselves contain "=" are kept intact.
                        prop_name, prop_value = (
                            part.strip() for part in prop.split("=", 1)
                        )
                        model.nodes[node][prop_name] = prop_value

            return model

        except AttributeError as err:
            # Chain the original error so the real cause stays visible.
            raise AttributeError(
                "First get states of variables, edges, parents and network name"
            ) from err
class BIFWriter(object):
    """
    Writer that serialises a DiscreteBayesianNetwork in BIF format.

    The states, properties, parents and CPD tables of the model are collected
    once in ``__init__``; ``str(writer)`` renders them and
    :meth:`write_bif` stores the rendered text in a file.

    Parameters
    ----------
    model: DiscreteBayesianNetwork Instance

    round_values: int (default: None)
        Round the probability values to `round_values` decimals. If None, keeps
        all decimal points.

    Raises
    ------
    TypeError
        If `model` is not a DiscreteBayesianNetwork.

    Examples
    ---------
    >>> from pgmpy.readwrite import BIFWriter
    >>> from pgmpy.utils import get_example_model
    >>> asia = get_example_model("asia")
    >>> writer = BIFWriter(asia)
    >>> writer
    <writer_BIF.BIFWriter at 0x7f05e5ea27b8>
    >>> writer.write_bif("asia.bif")
    """

    def __init__(self, model, round_values=None):
        if not isinstance(model, DiscreteBayesianNetwork):
            raise TypeError("model must be an instance of DiscreteBayesianNetwork")
        self.model = model
        self.round_values = round_values
        # Models without a (truthy) name are written as "unknown".
        if not self.model.name:
            self.network_name = "unknown"
        else:
            self.network_name = self.model.name
        self.variable_states = self.get_states()
        self.property_tag = self.get_properties()
        self.variable_parents = self.get_parents()
        self.tables = self.get_cpds()

    def BIF_templates(self):
        """
        Create template for writing in BIF format

        Returns
        -------
        tuple: ``(network_template, variable_template, property_template,
            probability_template, conditional_probability_template_total,
            conditional_probability_template)`` of ``string.Template`` objects.
        """
        network_template = Template("network $name {\n}\n")
        # The property tag may or may not be present in the model, and there
        # can be more than one property, so the (possibly empty) rendered
        # properties are substituted in as a single pre-formatted chunk.
        variable_template = Template(
            "variable $name {\n"
            "    type discrete [ $no_of_states ] { $states };\n"
            "$properties}\n"
        )
        property_template = Template("    property $prop ;\n")
        # $variable_ here is name of variable, used underscore for clarity
        probability_template = Template(
            "probability ( $variable_$separator_$parents ) {\n"
            "    table $values ;\n"
            "}\n"
        )
        conditional_probability_template_total = Template(
            "probability ( $variable_$separator_$parents ) {\n"
            "$values\n"
            "}\n"
        )
        conditional_probability_template = Template("    ( $state ) $values;\n")

        return (
            network_template,
            variable_template,
            property_template,
            probability_template,
            conditional_probability_template_total,
            conditional_probability_template,
        )

    def __str__(self):
        """
        Returns the BIF format as string
        """
        (
            network_template,
            variable_template,
            property_template,
            probability_template,
            conditional_probability_template_total,
            conditional_probability_template,
        ) = self.BIF_templates()

        variables = sorted(self.model.nodes())
        # Collect the pieces and join once instead of repeated "+=".
        parts = [network_template.substitute(name=self.network_name)]

        for var in variables:
            states = self.variable_states[var]
            properties = "".join(
                property_template.substitute(prop=prop_val)
                for prop_val in self.property_tag[var]
            )
            parts.append(
                variable_template.substitute(
                    name=var,
                    no_of_states=str(len(states)),
                    states=", ".join(states),
                    properties=properties,
                )
            )

        for var in variables:
            parents = self.variable_parents[var]
            if not parents:
                # Root variable: a single "table" row with the flat CPD.
                parts.append(
                    probability_template.substitute(
                        variable_=var,
                        separator_="",
                        parents="",
                        values=", ".join(map(str, self.tables[var])),
                    )
                )
                continue

            cpd = self.model.get_cpds(var)
            # After transposing, row i holds the distribution of `var` for the
            # i-th combination of parent states.
            cpd_values_transpose = cpd.get_values().T
            # Get the state names for parents from self.variable_states
            parent_states = product(
                *[self.variable_states[parent] for parent in cpd.variables[1:]]
            )
            all_cpd = "".join(
                conditional_probability_template.substitute(
                    state=", ".join(map(str, state)),
                    values=", ".join(
                        map(
                            str,
                            compat_fns.to_numpy(
                                cpd_values_transpose[index, :],
                                decimals=self.round_values,
                            ),
                        )
                    ),
                )
                for index, state in enumerate(parent_states)
            )
            parts.append(
                conditional_probability_template_total.substitute(
                    variable_=var,
                    separator_=" | ",
                    # str() so that non-string node names can be joined.
                    parents=", ".join(map(str, parents)),
                    values=all_cpd,
                )
            )

        return "".join(parts)

    def get_variables(self):
        """
        Add variables to BIF

        Returns
        -------
        list: a list containing names of variable

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader("dog-problem.bif").get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_variables()
        ['bowel-problem', 'family-out', 'hear-bark', 'light-on', 'dog-out']
        """
        variables = self.model.nodes()
        return variables

    def get_states(self):
        """
        Add states to variable of BIF. State names are converted with ``str``
        and kept unchanged; a warning is logged for every state name that
        contains a comma.

        Returns
        -------
        dict: dict of type {variable: a list of states}

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader("dog-problem.bif").get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_states()
        {'bowel-problem': ['true', 'false'],
        'dog-out': ['true', 'false'],
        'family-out': ['true', 'false'],
        'hear-bark': ['true', 'false'],
        'light-on': ['true', 'false']}
        """
        variable_states = {}
        cpds = self.model.get_cpds()
        for cpd in cpds:
            variable = cpd.variable
            variable_states[variable] = []
            for state in cpd.state_names[variable]:
                state_str = str(state)
                # Warn users if any commas in state names
                if "," in state_str:
                    logger.warning(
                        f"State name '{state_str}' for variable '{variable}' contains commas. "
                        "This may cause issues when loading the file. Consider removing any special characters."
                    )
                variable_states[variable].append(state_str)
        return variable_states

    def get_properties(self):
        """
        Add property to variables in BIF

        Returns
        -------
        dict: dict of type {variable: list of properties }

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader("dog-problem.bif").get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_properties()
        {'bowel-problem': ['position = (335, 99)'],
        'dog-out': ['position = (300, 195)'],
        'family-out': ['position = (257, 99)'],
        'hear-bark': ['position = (296, 268)'],
        'light-on': ['position = (218, 195)']}
        """
        variables = self.model.nodes()
        property_tag = {}
        for variable in sorted(variables):
            properties = self.model.nodes[variable]
            # Properties are emitted sorted by name so the output is stable.
            property_tag[variable] = [
                str(prop) + " = " + str(val)
                for prop, val in sorted(properties.items())
            ]
        return property_tag

    def get_parents(self):
        """
        Add the parents to BIF

        Returns
        -------
        dict: dict of type {variable: a list of parents}

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader("dog-problem.bif").get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_parents()
        {'bowel-problem': [],
        'dog-out': ['bowel-problem', 'family-out'],
        'family-out': [],
        'hear-bark': ['dog-out'],
        'light-on': ['family-out']}
        """
        cpds = self.model.get_cpds()
        # cpd.variables lists the variable itself first, then its parents.
        return {cpd.variable: cpd.variables[1:] for cpd in cpds}

    def get_cpds(self):
        """
        Adds tables to BIF

        Returns
        -------
        dict: dict of type {variable: array}

        Example
        -------
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> model = BIFReader("dog-problem.bif").get_model()
        >>> writer = BIFWriter(model)
        >>> writer.get_cpds()
        {'bowel-problem': array([ 0.01,  0.99]),
        'dog-out': array([ 0.99,  0.97,  0.9 ,  0.3 ,  0.01,  0.03,  0.1 ,  0.7 ]),
        'family-out': array([ 0.15,  0.85]),
        'hear-bark': array([ 0.7 ,  0.01,  0.3 ,  0.99]),
        'light-on': array([ 0.6 ,  0.05,  0.4 ,  0.95])}
        """
        cpds = self.model.get_cpds()
        tables = {}
        for cpd in cpds:
            tables[cpd.variable] = compat_fns.to_numpy(
                cpd.values.ravel(), decimals=self.round_values
            )
        return tables

    def write_bif(self, filename):
        """
        Writes the BIF data into a file

        Parameters
        ----------
        filename : Name of the file

        Example
        -------
        >>> from pgmpy.utils import get_example_model
        >>> from pgmpy.readwrite import BIFReader, BIFWriter
        >>> asia = get_example_model("asia")
        >>> writer = BIFWriter(asia)
        >>> writer.write_bif(filename="asia.bif")
        """
        writer = self.__str__()
        with open(filename, "w") as fout:
            fout.write(writer)