Source code for pgmpy.readwrite.BIF

import re
import warnings
from itertools import product
from string import Template

import numpy as np
import pandas as pd
import pyparsing as pp

try:
    from pyparsing import (
        CharsNotIn,
        Group,
        OneOrMore,
        Optional,
        Suppress,
        Word,
        ZeroOrMore,
        nums,
    )
except ImportError as e:
    raise ImportError(
        f"{e}. pyparsing is required for using read/write methods. Please install using: pip install pyparsing."
    ) from None

from pgmpy import logger
from pgmpy.factors.discrete import TabularCPD
from pgmpy.models import DiscreteBayesianNetwork
from pgmpy.utils import compat_fns


[docs] class BIFReader: """ Initializes a BIFReader object. Parameters ---------- path : file or str File of bif data string : str String of bif data include_properties: boolean If True, gets the properties tag from the file and stores in graph properties. Examples -------- >>> # dog-problem.bif file is present at >>> # http://www.cs.cmu.edu/~javabayes/Examples/DogProblem/dog-problem.bif >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") <pgmpy.readwrite.BIF.BIFReader object at 0x7f2375621cf8> >>> model = reader.get_model() Reference --------- [1] Geoff Hulten and Pedro Domingos. The interchange format for bayesian networks. http://www.cs.washington.edu/dm/vfml/appendixes/bif.htm, 2003. """ def __init__(self, path=None, string=None, include_properties=False): if path: with open(path) as network: self.network = network.read() elif string: self.network = string else: raise ValueError("Must specify either path or string") self.include_properties = include_properties if "/*" in self.network or "//" in self.network: # removing comments from the file pattern = r'("[^"\\]*(?:\\.[^"\\]*)*")|(/\*.*?\*/|//[^\n]*)' regex = re.compile(pattern, re.DOTALL) self.network = regex.sub(lambda m: m.group(1) if m.group(1) else "", self.network) if '"' in self.network: # Replacing quotes by spaces to remove case sensitivity like: # "Dog-Problem" and Dog-problem # or "true""false" and "true" "false" and true false self.network = self.network.replace('"', " ") ( name_expr, state_expr, property_expr, ) = self.get_variable_grammar() probability_expr, cpd_expr = self.get_probability_grammar() # self.get_network_name() match = re.search(r"network\s+([\w-]+)\s*\{", self.network) self.network_name = match.group(1) if match else None block_pattern = re.compile(r"(variable|probability).*?\}\n", re.DOTALL) # Regex for parsing probability headers: handles spaces and dots in names prob_header_re = re.compile(r"probability\s*\(\s*(.+?)(?:\s*\|\s*(.+?))?\s*\)") # Regex for detecting table/default keywords (not inside state names) table_keyword_re = re.compile(r"(?:^|\{)\s*(table|default)\s", re.MULTILINE) # Regex for extracting state names from type declarations state_decl_re = re.compile(r"type\s+\w+\s*\[\s*\d+\s*\]\s*\{([^}]+)\}\s*;") self.variable_states = {} self.variable_names = [] if self.include_properties: self.variable_properties = {} self.variable_parents = {} self.variable_edges = [] probability_blocks = [] for match in block_pattern.finditer(self.network): block_content = match.group(0) # self.get_variables(), self.get_states(), self.get_property() if block_content.startswith("variable"): name = name_expr.search_string(block_content)[0][0] self.variable_names.append(name) state_match = state_decl_re.search(block_content) raw_states = state_match.group(1) if "," in raw_states: states = [s.strip() for s in raw_states.split(",") if s.strip()] else: states = raw_states.split() self.variable_states[name] = states if self.include_properties: properties = property_expr.search_string(block_content) self.variable_properties[name] = [y.strip() for x in properties for y in x] # self.get_parents(), self.get_edges() elif block_content.startswith("probability"): header_line = block_content.split("\n")[0] prob_match = prob_header_re.search(header_line) var_name = prob_match.group(1).strip() if prob_match.group(2): # Has | separator: supports multi-word names parents = [p.strip() for p in prob_match.group(2).split(",")] else: # No | separator: fall back to space-separated word splitting # (old BIF format: first word is variable, rest are parents) words = var_name.split() if len(words) > 1: var_name = words[0] parents = list(words[1:]) else: parents = [] self.variable_parents[var_name] = parents self.variable_edges.extend([[p, var_name] for p in parents]) probability_blocks.append((block_content, var_name, parents)) # Normalize variable names in probability references to match declarations # (handles case mismatches like "neuroticism" vs "NEUROTICISM") name_map = {name.lower(): name for name in self.variable_names} normalized_parents = {} normalized_edges = [] for var_name, parents in self.variable_parents.items(): norm_var = name_map.get(var_name.lower(), var_name) norm_parents = [name_map.get(p.lower(), p) for p in parents] normalized_parents[norm_var] = norm_parents normalized_edges.extend([[p, norm_var] for p in norm_parents]) self.variable_parents = normalized_parents self.variable_edges = normalized_edges probability_blocks = [ (bc, name_map.get(vn.lower(), vn), [name_map.get(p.lower(), p) for p in ps]) for bc, vn, ps in probability_blocks ] # self.get_values() self.variable_cpds = {} state_maps = {var: {state: i for i, state in enumerate(states)} for var, states in self.variable_states.items()} for block_content, var_name, parents in probability_blocks: cpds_list = cpd_expr.search_string(block_content) n_rows = len(self.variable_states[var_name]) if table_keyword_re.search(block_content): arr = [float(j) for i in cpds_list for j in i] arr = np.array(arr).reshape(n_rows, -1) self.variable_cpds[var_name] = arr else: parent_cards = [len(self.variable_states[p]) for p in parents] arr_length = int(np.prod(parent_cards)) arr = np.zeros((n_rows, arr_length)) len_parents = len(parents) if len(cpds_list) > 0: df = pd.DataFrame(cpds_list) state_df = df.iloc[:, :len_parents].copy() values_df = df.iloc[:, len_parents:] for idx, parent in enumerate(parents): col = state_df.columns[idx] state_df = state_df.astype({col: "object"}) state_df.iloc[:, idx] = state_df.iloc[:, idx].map(state_maps[parent]) strides = np.cumprod([1] + parent_cards[::-1])[:-1][::-1] col_indices = state_df.dot(strides).astype(int) arr[:, col_indices] = values_df.astype(float).T self.variable_cpds[var_name] = arr
[docs] def get_variable_grammar(self): """ A method that returns variable grammar """ # Variable name: everything between "variable" and "{", allowing spaces name_expr = Suppress("variable") + pp.Regex(r"[^{]+").set_parse_action(lambda t: t[0].strip()) + Suppress("{") # State names: comma-separated values that may contain spaces state_value = pp.Regex(r"[^,};]+").set_parse_action(lambda t: t[0].strip()) # Defining a variable state expression variable_state_expr = ( Suppress("type") + Suppress(Word(pp.unicode.alphanums + "_" + "-" + ".")) + Suppress("[") + Suppress(Word(nums)) + Suppress("]") + Suppress("{") + Group(state_value + ZeroOrMore(Suppress(",") + state_value)) + Suppress("}") + Suppress(";") ) # variable states is of the form type description [args] { val1, val2 }; (comma may or may not be present) property_expr = Suppress("property") + CharsNotIn(";") + Suppress(";") # Creating an expr to find property return name_expr, variable_state_expr, property_expr
[docs] def get_probability_grammar(self): """ A method that returns probability grammar """ # Creating valid word expression for probability, it is of the format # wor1 | var2 , var3 or var1 var2 var3 or simply var word_expr = Word(pp.unicode.alphanums + "-" + "_" + ".") + Suppress(Optional("|")) + Suppress(Optional(",")) # creating an expression for valid numbers, of the format # 1.00 or 1 or 1.00. 0.00 or 9.8e-5 etc num_expr = Word(nums + "-" + "+" + "e" + "E" + ".") + Suppress(Optional(",")) probability_expr = Suppress("probability") + Suppress("(") + OneOrMore(word_expr) + Suppress(")") # State values in CPD rows: comma-separated values that may contain spaces state_value = pp.Regex(r"[^,)]+").set_parse_action(lambda t: t[0].strip()) optional_expr = Suppress("(") + state_value + ZeroOrMore(Suppress(",") + state_value) + Suppress(")") probab_attributes = optional_expr | Suppress("table") | Suppress("default") cpd_expr = probab_attributes + OneOrMore(num_expr) return probability_expr, cpd_expr
[docs] def get_model(self, state_name_type=str): """ Returns the Bayesian Model read from the file/str. Parameters ---------- state_name_type: int, str or bool (default: str) The data type to which to convert the state names of the variables. Example ---------- >>> from pgmpy.readwrite import BIFReader >>> reader = BIFReader("bif_test.bif") >>> reader.get_model() <pgmpy.models.DiscreteBayesianNetwork.DiscreteBayesianNetwork object at 0x7f20af154320> """ model = DiscreteBayesianNetwork() model.add_nodes_from(self.variable_names) model.add_edges_from(self.variable_edges) model.name = self.network_name tabular_cpds = [] for var in sorted(self.variable_cpds.keys()): values = self.variable_cpds[var] sn = { p_var: list(map(state_name_type, self.variable_states[p_var])) for p_var in self.variable_parents[var] } sn[var] = list(map(state_name_type, self.variable_states[var])) cpd = TabularCPD( var, len(self.variable_states[var]), values, evidence=self.variable_parents[var], evidence_card=[len(self.variable_states[evidence_var]) for evidence_var in self.variable_parents[var]], state_names=sn, ) tabular_cpds.append(cpd) model.add_cpds(*tabular_cpds) if self.include_properties: for node, properties in self.variable_properties.items(): for prop in properties: prop_name, prop_value = map(lambda t: t.strip(), prop.split("=")) model.nodes[node][prop_name] = prop_value return model
[docs] class BIFWriter: """ Initialise a BIFWriter Object Parameters ---------- model: DiscreteBayesianNetwork Instance round_values: int (default: None) Round the probability values to `round_values` decimals. If None, keeps all decimal points. Examples --------- >>> from pgmpy.readwrite import BIFWriter >>> from pgmpy.example_models import load_model >>> asia = load_model("bnlearn/asia") >>> writer = BIFWriter(asia) >>> writer <writer_BIF.BIFWriter at 0x7f05e5ea27b8> >>> writer.write("asia.bif") """ def __init__(self, model, round_values=None): if not isinstance(model, DiscreteBayesianNetwork): raise TypeError("model must be an instance of DiscreteBayesianNetwork") self.model = model self.round_values = round_values if not self.model.name: self.network_name = "unknown" else: self.network_name = self.model.name self.variable_states = self.get_states() self.property_tag = self.get_properties() self.variable_parents = self.get_parents() self.tables = self.get_cpds()
[docs] def BIF_templates(self): """ Create template for writing in BIF format """ network_template = Template("network $name {\n}\n") # property tag may or may not be present in model,and since no of properties # can be more than one, will replace them according to format otherwise null variable_template = Template( """variable $name { type discrete [ $no_of_states ] { $states }; $properties}\n""" ) property_template = Template(" property $prop ;\n") # $variable_ here is name of variable, used underscore for clarity probability_template = Template( """probability ( $variable_$separator_$parents ) { table $values ; }\n""" ) conditional_probability_template_total = Template( """probability ( $variable_$separator_$parents ) { $values }\n""" ) conditional_probability_template = Template(""" ( $state ) $values;\n""") return ( network_template, variable_template, property_template, probability_template, conditional_probability_template_total, conditional_probability_template, )
def __str__(self): """ Returns the BIF format as string """ ( network_template, variable_template, property_template, probability_template, conditional_probability_template_total, conditional_probability_template, ) = self.BIF_templates() network = "" network += network_template.substitute(name=self.network_name) variables = self.model.nodes() sorted_variables = sorted(variables) for var in sorted_variables: no_of_states = str(len(self.variable_states[var])) states = ", ".join(self.variable_states[var]) if not self.property_tag[var]: properties = "" else: properties = "" for prop_val in self.property_tag[var]: properties += property_template.substitute(prop=prop_val) network += variable_template.substitute( name=var, no_of_states=no_of_states, states=states, properties=properties, ) for var in sorted_variables: if not self.variable_parents[var]: parents = "" separator = "" cpd = ", ".join(map(str, self.tables[var])) network += probability_template.substitute( variable_=var, separator_=separator, parents=parents, values=cpd ) else: parents_str = ", ".join(self.variable_parents[var]) separator = " | " cpd = self.model.get_cpds(var) cpd_values_transpose = cpd.get_values().T # Get the sanitized state names for parents from self.variable_states parent_states = product(*[self.variable_states[var] for var in cpd.variables[1:]]) all_cpd = "" for index, state in enumerate(parent_states): all_cpd += conditional_probability_template.substitute( state=", ".join(map(str, state)), values=", ".join( map( str, compat_fns.to_numpy( cpd_values_transpose[index, :], decimals=self.round_values, ), ) ), ) network += conditional_probability_template_total.substitute( variable_=var, separator_=separator, parents=parents_str, values=all_cpd, ) return network
[docs] def get_variables(self): """ Add variables to BIF Returns ------- list: a list containing names of variable Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader("dog-problem.bif").get_model() >>> writer = BIFWriter(model) >>> writer.get_variables() ['bowel-problem', 'family-out', 'hear-bark', 'light-on', 'dog-out'] """ variables = self.model.nodes() return variables
[docs] def get_states(self): """ Add states to variable of BIF, handling commas in state names by replacing them with underscores. Returns ------- dict: dict of type {variable: a list of states} Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader("dog-problem.bif").get_model() >>> writer = BIFWriter(model) >>> writer.get_states() {'bowel-problem': ['bowel-problem_0', 'bowel-problem_1'], 'dog-out': ['dog-out_0', 'dog-out_1'], 'family-out': ['family-out_0', 'family-out_1'], 'hear-bark': ['hear-bark_0', 'hear-bark_1'], 'light-on': ['light-on_0', 'light-on_1']} """ variable_states = {} cpds = self.model.get_cpds() for cpd in cpds: variable = cpd.variable variable_states[variable] = [] for state in cpd.state_names[variable]: state_str = str(state) # Warn users if any commas in state names if "," in state_str: logger.warning( f"State name '{state_str}' for variable '{variable}' contains commas. " "This may cause issues when loading the file. Consider removing any special characters." ) variable_states[variable].append(state_str) return variable_states
[docs] def get_properties(self): """ Add property to variables in BIF Returns ------- dict: dict of type {variable: list of properties } Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader("dog-problem.bif").get_model() >>> writer = BIFWriter(model) >>> writer.get_properties() {'bowel-problem': ['position = (335, 99)'], 'dog-out': ['position = (300, 195)'], 'family-out': ['position = (257, 99)'], 'hear-bark': ['position = (296, 268)'], 'light-on': ['position = (218, 195)']} """ variables = self.model.nodes() property_tag = {} for variable in sorted(variables): properties = self.model.nodes[variable] property_tag[variable] = [f"{prop} = {val}" for prop, val in sorted(properties.items())] return property_tag
[docs] def get_parents(self): """ Add the parents to BIF Returns ------- dict: dict of type {variable: a list of parents} Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader("dog-problem.bif").get_model() >>> writer = BIFWriter(model) >>> writer.get_parents() {'bowel-problem': [], 'dog-out': ['bowel-problem', 'family-out'], 'family-out': [], 'hear-bark': ['dog-out'], 'light-on': ['family-out']} """ cpds = self.model.get_cpds() variable_parents = {} for cpd in cpds: variable_parents[cpd.variable] = cpd.variables[1:] return variable_parents
[docs] def get_cpds(self): """ Adds tables to BIF Returns ------- dict: dict of type {variable: array} Example ------- >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> model = BIFReader("dog-problem.bif").get_model() >>> writer = BIFWriter(model) >>> writer.get_cpds() {'bowel-problem': array([ 0.01, 0.99]), 'dog-out': array([ 0.99, 0.97, 0.9 , 0.3 , 0.01, 0.03, 0.1 , 0.7 ]), 'family-out': array([ 0.15, 0.85]), 'hear-bark': array([ 0.7 , 0.01, 0.3 , 0.99]), 'light-on': array([ 0.6 , 0.05, 0.4 , 0.95])} """ cpds = self.model.get_cpds() tables = {} for cpd in cpds: tables[cpd.variable] = compat_fns.to_numpy(cpd.values.ravel(), decimals=self.round_values) return tables
[docs] def write(self, filename): """ Writes the BIF data into a file Parameters ---------- filename : Name of the file Example ------- >>> from pgmpy.example_models import load_model >>> from pgmpy.readwrite import BIFReader, BIFWriter >>> asia = load_model("bnlearn/asia") >>> writer = BIFWriter(asia) >>> writer.write(filename="asia.bif") """ writer = self.__str__() with open(filename, "w") as fout: fout.write(writer)
[docs] def write_bif(self, filename): warnings.warn( "`BIFWriter.write_bif` is deprecated. Please use `BIFWriter.write` instead.", FutureWarning, stacklevel=2 ) self.write(filename)