Source code for pgmpy.readwrite.BIF

import re
import collections
from string import Template
from itertools import product

import numpy as np
from joblib import Parallel, delayed
from pyparsing import (
    Word,
    alphanums,
    Suppress,
    Optional,
    CharsNotIn,
    Group,
    nums,
    ZeroOrMore,
    OneOrMore,
    cppStyleComment,
    printables,
)

from pgmpy.models import BayesianModel
from pgmpy.factors.discrete import TabularCPD


[docs]class BIFReader(object): """ Base class for reading network file in bif format """ def __init__(self, path=None, string=None, include_properties=False, n_jobs=-1): """ Initializes a BIFReader object. Parameters ---------- path : file or str File of bif data string : str String of bif data include_properties: boolean If True, gets the properties tag from the file and stores in graph properties. n_jobs: int (default: -1) Number of jobs to run in parallel. `-1` means use all processors. Examples -------- # dog-problem.bif file is present at # http://www.cs.cmu.edu/~javabayes/Examples/DogProblem/dog-problem.bif >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") <pgmpy.readwrite.BIF.BIFReader object at 0x7f2375621cf8> >>> model = reader.get_model() """ if path: with open(path, "r") as network: self.network = network.read() elif string: self.network = string else: raise ValueError("Must specify either path or string") self.n_jobs = n_jobs self.include_properties = include_properties if '"' in self.network: # Replacing quotes by spaces to remove case sensitivity like: # "Dog-Problem" and Dog-problem # or "true""false" and "true" "false" and true false self.network = self.network.replace('"', " ") if "/*" in self.network or "//" in self.network: self.network = cppStyleComment.suppress().transformString( self.network ) # removing comments from the file ( self.name_expr, self.state_expr, self.property_expr, ) = self.get_variable_grammar() self.probability_expr, self.cpd_expr = self.get_probability_grammar() self.network_name = self.get_network_name() self.variable_names = self.get_variables() self.variable_states = self.get_states() if self.include_properties: self.variable_properties = self.get_property() self.variable_parents = self.get_parents() self.variable_cpds = self.get_values() self.variable_edges = self.get_edges()
[docs] def get_variable_grammar(self): """ A method that returns variable grammar """ # Defining a expression for valid word word_expr = Word(alphanums + "_" + "-") word_expr2 = Word(initChars=printables, excludeChars=["{", "}", ",", " "]) name_expr = Suppress("variable") + word_expr + Suppress("{") state_expr = ZeroOrMore(word_expr2 + Optional(Suppress(","))) # Defining a variable state expression variable_state_expr = ( Suppress("type") + Suppress(word_expr) + Suppress("[") + Suppress(Word(nums)) + Suppress("]") + Suppress("{") + Group(state_expr) + Suppress("}") + Suppress(";") ) # variable states is of the form type description [args] { val1, val2 }; (comma may or may not be present) property_expr = ( Suppress("property") + CharsNotIn(";") + Suppress(";") ) # Creating a expr to find property return name_expr, variable_state_expr, property_expr
[docs] def get_probability_grammar(self): """ A method that returns probability grammar """ # Creating valid word expression for probability, it is of the format # wor1 | var2 , var3 or var1 var2 var3 or simply var word_expr = ( Word(alphanums + "-" + "_") + Suppress(Optional("|")) + Suppress(Optional(",")) ) word_expr2 = Word( initChars=printables, excludeChars=[",", ")", " ", "("] ) + Suppress(Optional(",")) # creating an expression for valid numbers, of the format # 1.00 or 1 or 1.00. 0.00 or 9.8e-5 etc num_expr = Word(nums + "-" + "+" + "e" + "E" + ".") + Suppress(Optional(",")) probability_expr = ( Suppress("probability") + Suppress("(") + OneOrMore(word_expr) + Suppress(")") ) optional_expr = Suppress("(") + OneOrMore(word_expr2) + Suppress(")") probab_attributes = optional_expr | Suppress("table") cpd_expr = probab_attributes + OneOrMore(num_expr) return probability_expr, cpd_expr
def variable_block(self): start = re.finditer("variable", self.network) for index in start: end = self.network.find("}\n", index.start()) yield self.network[index.start() : end] def probability_block(self): start = re.finditer("probability", self.network) for index in start: end = self.network.find("}\n", index.start()) yield self.network[index.start() : end]
[docs] def get_network_name(self): """ Returns the name of the network Example --------------- >>> from pgmpy.readwrite import BIFReader >>> reader = BIF.BifReader("bif_test.bif") >>> reader.network_name() 'Dog-Problem' """ start = self.network.find("network") end = self.network.find("}\n", start) # Creating a network attribute network_attribute = Suppress("network") + Word(alphanums + "_" + "-") + "{" network_name = network_attribute.searchString(self.network[start:end])[0][0] return network_name
[docs] def get_variables(self): """ Returns list of variables of the network Example ------------- >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") >>> reader.get_variables() ['light-on','bowel_problem','dog-out','hear-bark','family-out'] """ variable_names = [] for block in self.variable_block(): name = self.name_expr.searchString(block)[0][0] variable_names.append(name) return variable_names
[docs] def get_states(self): """ Returns the states of variables present in the network Example ----------- >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") >>> reader.get_states() {'bowel-problem': ['true','false'], 'dog-out': ['true','false'], 'family-out': ['true','false'], 'hear-bark': ['true','false'], 'light-on': ['true','false']} """ variable_states = {} for block in self.variable_block(): name = self.name_expr.searchString(block)[0][0] variable_states[name] = list(self.state_expr.searchString(block)[0][0]) return variable_states
[docs] def get_property(self): """ Returns the property of the variable Example ------------- >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") >>> reader.get_property() {'bowel-problem': ['position = (335, 99)'], 'dog-out': ['position = (300, 195)'], 'family-out': ['position = (257, 99)'], 'hear-bark': ['position = (296, 268)'], 'light-on': ['position = (218, 195)']} """ variable_properties = {} for block in self.variable_block(): name = self.name_expr.searchString(block)[0][0] properties = self.property_expr.searchString(block) variable_properties[name] = [y.strip() for x in properties for y in x] return variable_properties
[docs] def get_parents(self): """ Returns the parents of the variables present in the network Example -------- >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") >>> reader.get_parents() {'bowel-problem': [], 'dog-out': ['family-out', 'bowel-problem'], 'family-out': [], 'hear-bark': ['dog-out'], 'light-on': ['family-out']} """ variable_parents = {} for block in self.probability_block(): names = self.probability_expr.searchString(block.split("\n")[0])[0] variable_parents[names[0]] = names[1:] return variable_parents
def _get_values_from_block(self, block): names = self.probability_expr.searchString(block) var_name, parents = names[0][0], names[0][1:] cpds = self.cpd_expr.searchString(block) # Check if the block is a table. if bool(re.search(".*\\n[ ]*table .*\n.*", block)): arr = np.array([float(j) for i in cpds for j in i]) arr = arr.reshape( ( len(self.variable_states[var_name]), arr.size // len(self.variable_states[var_name]), ) ) else: arr_length = np.prod([len(self.variable_states[var]) for var in parents]) arr = np.zeros((len(self.variable_states[var_name]), arr_length)) values_dict = {} for prob_line in cpds: states = prob_line[: len(parents)] vals = [float(i) for i in prob_line[len(parents) :]] values_dict[tuple(states)] = vals for index, combination in enumerate( product(*[self.variable_states[var] for var in parents]) ): arr[:, index] = values_dict[combination] return var_name, arr
[docs] def get_values(self): """ Returns the CPD of the variables present in the network Example -------- >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") >>> reader.get_values() {'bowel-problem': np.array([[0.01], [0.99]]), 'dog-out': np.array([[0.99, 0.97, 0.9, 0.3], [0.01, 0.03, 0.1, 0.7]]), 'family-out': np.array([[0.15], [0.85]]), 'hear-bark': np.array([[0.7, 0.01], [0.3, 0.99]]), 'light-on': np.array([[0.6, 0.05], [0.4, 0.95]])} """ cpd_values = Parallel(n_jobs=self.n_jobs)( delayed(self._get_values_from_block)(block) for block in self.probability_block() ) variable_cpds = {} for var_name, arr in cpd_values: variable_cpds[var_name] = arr return variable_cpds
[docs] def get_edges(self): """ Returns the edges of the network Example -------- >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") >>> reader.get_edges() [['family-out', 'light-on'], ['family-out', 'dog-out'], ['bowel-problem', 'dog-out'], ['dog-out', 'hear-bark']] """ edges = [ [value, key] for key in self.variable_parents.keys() for value in self.variable_parents[key] ] return edges
[docs] def get_model(self, state_name_type=str): """ Returns the Bayesian Model read from the file/str. Parameters ---------- state_name_type: int, str or bool (default: str) The data type to which to convert the state names of the variables. Example ---------- >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") >>> reader.get_model() <pgmpy.models.BayesianModel.BayesianModel object at 0x7f20af154320> """ try: model = BayesianModel() model.add_nodes_from(self.variable_names) model.add_edges_from(self.variable_edges) model.name = self.network_name tabular_cpds = [] for var in sorted(self.variable_cpds.keys()): values = self.variable_cpds[var] sn = { p_var: list(map(state_name_type, self.variable_states[p_var])) for p_var in self.variable_parents[var] } sn[var] = list(map(state_name_type, self.variable_states[var])) cpd = TabularCPD( var, len(self.variable_states[var]), values, evidence=self.variable_parents[var], evidence_card=[ len(self.variable_states[evidence_var]) for evidence_var in self.variable_parents[var] ], state_names=sn, ) tabular_cpds.append(cpd) model.add_cpds(*tabular_cpds) if self.include_properties: for node, properties in self.variable_properties.items(): for prop in properties: prop_name, prop_value = map( lambda t: t.strip(), prop.split("=") ) model.nodes[node][prop_name] = prop_value return model except AttributeError: raise AttributeError( "First get states of variables, edges, parents and network name" )
[docs]class BIFWriter(object): """ Base class for writing BIF network file format """ def __init__(self, model): """ Initialise a BIFWriter Object Parameters ---------- model: BayesianModel Instance Examples --------- >>> from pgmpy.readwrite import BIFWriter >>> from pgmpy.utils import get_example_model >>> asia = get_example_model('asia') >>> writer = BIFWriter(asia) >>> writer <writer_BIF.BIFWriter at 0x7f05e5ea27b8> >>> writer.write_bif('asia.bif') """ if not isinstance(model, BayesianModel): raise TypeError("model must be an instance of BayesianModel") self.model = model if not self.model.name: self.network_name = "unknown" else: self.network_name = self.model.name self.variable_states = self.get_states() self.property_tag = self.get_properties() self.variable_parents = self.get_parents() self.tables = self.get_cpds()
[docs] def BIF_templates(self): """ Create template for writing in BIF format """ network_template = Template("network $name {\n}\n") # property tag may or may not be present in model,and since no of properties # can be more than one , will replace them accoriding to format otherwise null variable_template = Template( """variable $name { type discrete [ $no_of_states ] { $states }; $properties}\n""" ) property_template = Template(" property $prop ;\n") # $variable_ here is name of variable, used underscore for clarity probability_template = Template( """probability ( $variable_$seprator_$parents ) { table $values ; }\n""" ) return ( network_template, variable_template, property_template, probability_template, )
def __str__(self): """ Returns the BIF format as string """ ( network_template, variable_template, property_template, probability_template, ) = self.BIF_templates() network = "" network += network_template.substitute(name=self.network_name) variables = self.model.nodes() for var in sorted(variables): no_of_states = str(len(self.variable_states[var])) states = ", ".join(self.variable_states[var]) if not self.property_tag[var]: properties = "" else: properties = "" for prop_val in self.property_tag[var]: properties += property_template.substitute(prop=prop_val) network += variable_template.substitute( name=var, no_of_states=no_of_states, states=states, properties=properties, ) for var in sorted(variables): if not self.variable_parents[var]: parents = "" seprator = "" else: parents = ", ".join(self.variable_parents[var]) seprator = " | " cpd = ", ".join(map(str, self.tables[var])) network += probability_template.substitute( variable_=var, seprator_=seprator, parents=parents, values=cpd ) return network
[docs] def get_variables(self): """ Add variables to BIF Returns ------- list: a list containing names of variable Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader('dog-problem.bif').get_model() >>> writer = BIFWriter(model) >>> writer.get_variables() ['bowel-problem', 'family-out', 'hear-bark', 'light-on', 'dog-out'] """ variables = self.model.nodes() return variables
[docs] def get_states(self): """ Add states to variable of BIF Returns ------- dict: dict of type {variable: a list of states} Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader('dog-problem.bif').get_model() >>> writer = BIFWriter(model) >>> writer.get_states() {'bowel-problem': ['bowel-problem_0', 'bowel-problem_1'], 'dog-out': ['dog-out_0', 'dog-out_1'], 'family-out': ['family-out_0', 'family-out_1'], 'hear-bark': ['hear-bark_0', 'hear-bark_1'], 'light-on': ['light-on_0', 'light-on_1']} """ variable_states = {} cpds = self.model.get_cpds() for cpd in cpds: variable = cpd.variable variable_states[variable] = [] for state in cpd.state_names[variable]: variable_states[variable].append(str(state)) return variable_states
[docs] def get_properties(self): """ Add property to variables in BIF Returns ------- dict: dict of type {variable: list of properties } Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader('dog-problem.bif').get_model() >>> writer = BIFWriter(model) >>> writer.get_properties() {'bowel-problem': ['position = (335, 99)'], 'dog-out': ['position = (300, 195)'], 'family-out': ['position = (257, 99)'], 'hear-bark': ['position = (296, 268)'], 'light-on': ['position = (218, 195)']} """ variables = self.model.nodes() property_tag = {} for variable in sorted(variables): properties = self.model.nodes[variable] properties = collections.OrderedDict(sorted(properties.items())) property_tag[variable] = [] for prop, val in properties.items(): property_tag[variable].append(str(prop) + " = " + str(val)) return property_tag
[docs] def get_parents(self): """ Add the parents to BIF Returns ------- dict: dict of type {variable: a list of parents} Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader('dog-problem.bif').get_model() >>> writer = BIFWriter(model) >>> writer.get_parents() {'bowel-problem': [], 'dog-out': ['bowel-problem', 'family-out'], 'family-out': [], 'hear-bark': ['dog-out'], 'light-on': ['family-out']} """ cpds = self.model.get_cpds() variable_parents = {} for cpd in cpds: variable_parents[cpd.variable] = cpd.variables[1:] return variable_parents
[docs] def get_cpds(self): """ Adds tables to BIF Returns ------- dict: dict of type {variable: array} Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader('dog-problem.bif').get_model() >>> writer = BIFWriter(model) >>> writer.get_cpds() {'bowel-problem': array([ 0.01, 0.99]), 'dog-out': array([ 0.99, 0.97, 0.9 , 0.3 , 0.01, 0.03, 0.1 , 0.7 ]), 'family-out': array([ 0.15, 0.85]), 'hear-bark': array([ 0.7 , 0.01, 0.3 , 0.99]), 'light-on': array([ 0.6 , 0.05, 0.4 , 0.95])} """ cpds = self.model.get_cpds() tables = {} for cpd in cpds: tables[cpd.variable] = cpd.values.ravel() return tables
[docs] def write_bif(self, filename): """ Writes the BIF data into a file Parameters ---------- filename : Name of the file Example ------- >>> from pgmpy.utils import get_example_model >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> asia = get_example_model('asia') >>> writer = BIFWriter(asia) >>> writer.write_bif(filename='asia.bif') """ writer = self.__str__() with open(filename, "w") as fout: fout.write(writer)