Source code for pgmpy.readwrite.UAI

from itertools import combinations

import numpy as np

try:
    from pyparsing import Combine, Literal, Optional, Regex, Word, alphas, nums
except ImportError:
    raise ImportError(
        e.message()
        + ". pyparsing is required for using read/write methods. Please install using: pip install pyparsing."
    )

from pgmpy.factors.discrete import DiscreteFactor, TabularCPD
from pgmpy.models import BayesianNetwork, MarkovNetwork
from pgmpy.utils import compat_fns


[docs] class UAIReader(object): """ Initialize an instance of UAI reader class Parameters ---------- path : file or str Path of the file containing UAI information. string : str String containing UAI information. Examples -------- >>> from pgmpy.readwrite import UAIReader >>> reader = UAIReader('TestUai.uai') >>> model = reader.get_model() Reference --------- [1] https://uaicompetition.github.io/uci-2022/file-formats/model-format/ [2] https://forgemia.inra.fr/thomas.schiex/toulbar2/-/blob/master/doc/UAI08Format.txt """ def __init__(self, path=None, string=None): if path: with open(path, "r") as f: self.network = f.read() elif string: self.network = string else: raise ValueError("Must specify either path or string.") if "#" in self.network: self.network = ( Regex("#.*").suppress().transformString(self.network) ) # removing comments from the file self.grammar = self.get_grammar() self.network_type = self.get_network_type() self.variables = self.get_variables() self.domain = self.get_domain() self.edges = self.get_edges() self.tables = self.get_tables()
[docs] def get_grammar(self): """ Returns the grammar of the UAI file. """ network_name = Word(alphas).setResultsName("network_name") no_variables = Word(nums).setResultsName("no_variables") grammar = network_name + no_variables self.no_variables = int(grammar.parseString(self.network)["no_variables"]) domain_variables = (Word(nums) * self.no_variables).setResultsName( "domain_variables" ) grammar += domain_variables no_functions = Word(nums).setResultsName("no_functions") grammar += no_functions self.no_functions = int(grammar.parseString(self.network)["no_functions"]) integer = Word(nums).setParseAction(lambda t: int(t[0])) for function in range(0, self.no_functions): scope_grammar = Word(nums).setResultsName("fun_scope_" + str(function)) grammar += scope_grammar function_scope = grammar.parseString(self.network)[ "fun_scope_" + str(function) ] function_grammar = ((integer) * int(function_scope)).setResultsName( "fun_" + str(function) ) grammar += function_grammar floatnumber = Combine( Word(nums) + Optional(Literal(".") + Optional(Word(nums))) ) for function in range(0, self.no_functions): no_values_grammar = Word(nums).setResultsName( "fun_no_values_" + str(function) ) grammar += no_values_grammar no_values = grammar.parseString(self.network)[ "fun_no_values_" + str(function) ] values_grammar = ((floatnumber) * int(no_values)).setResultsName( "fun_values_" + str(function) ) grammar += values_grammar return grammar
[docs] def get_network_type(self): """ Returns the type of network defined by the file. Returns ------- string : str String containing network type. Examples -------- >>> from pgmpy.readwrite import UAIReader >>> reader = UAIReader('TestUAI.uai') >>> reader.get_network_type() 'MARKOV' """ network_type = self.grammar.parseString(self.network) return network_type["network_name"]
[docs] def get_variables(self): """ Returns a list of variables. Each variable is represented by an index of list. For example if the no of variables are 4 then the list will be [var_0, var_1, var_2, var_3] Returns ------- list: list of variables Examples -------- >>> from pgmpy.readwrite import UAIReader >>> reader = UAIReader('TestUAI.uai') >>> reader.get_variables() ['var_0', 'var_1', 'var_2'] """ variables = [] for var in range(0, self.no_variables): var_name = "var_" + str(var) variables.append(var_name) return variables
[docs] def get_domain(self): """ Returns the dictionary of variables with keys as variable name and values as domain of the variables. Returns ------- dict: dictionary containing variables and their domains Examples -------- >>> from pgmpy.readwrite import UAIReader >>> reader = UAIReader('TestUAI.uai') >>> reader.get_domain() {'var_0': '2', 'var_1': '2', 'var_2': '3'} """ domain = {} var_domain = self.grammar.parseString(self.network)["domain_variables"] for var in range(0, len(var_domain)): domain["var_" + str(var)] = var_domain[var] return domain
[docs] def get_edges(self): """ Returns the edges of the network. Returns ------- set: set containing the edges of the network Examples -------- >>> from pgmpy.readwrite import UAIReader >>> reader = UAIReader('TestUAI.uai') >>> reader.get_edges() {('var_0', 'var_1'), ('var_0', 'var_2'), ('var_1', 'var_2')} """ edges = [] for function in range(0, self.no_functions): function_variables = self.grammar.parseString(self.network)[ "fun_" + str(function) ] if isinstance(function_variables, int): function_variables = [function_variables] if self.network_type == "BAYES": child_var = "var_" + str(function_variables[-1]) function_variables = function_variables[:-1] for var in function_variables: edges.append(("var_" + str(var), child_var)) elif self.network_type == "MARKOV": function_variables = ["var_" + str(var) for var in function_variables] edges.extend(list(combinations(function_variables, 2))) return set(edges)
[docs] def get_tables(self): """ Returns list of tuple of child variable and CPD in case of Bayesian and list of tuple of scope of variables and values in case of Markov. Returns ------- list : list of tuples of child variable and values in Bayesian list of tuples of scope of variables and values in case of Markov. Examples -------- >>> from pgmpy.readwrite import UAIReader >>> reader = UAIReader('TestUAI.uai') >>> reader.get_tables() [(['var_0', 'var_1'], ['4.000', '2.400', '1.000', '0.000']), (['var_0', 'var_1', 'var_2'], ['2.2500', '3.2500', '3.7500', '0.0000', '0.0000', '10.0000', '1.8750', '4.0000', '3.3330', '2.0000', '2.0000', '3.4000'])] """ tables = [] for function in range(0, self.no_functions): function_variables = self.grammar.parseString(self.network)[ "fun_" + str(function) ] if isinstance(function_variables, int): function_variables = [function_variables] if self.network_type == "BAYES": child_var = "var_" + str(function_variables[-1]) values = self.grammar.parseString(self.network)[ "fun_values_" + str(function) ] tables.append((child_var, list(values))) elif self.network_type == "MARKOV": function_variables = ["var_" + str(var) for var in function_variables] values = self.grammar.parseString(self.network)[ "fun_values_" + str(function) ] tables.append((function_variables, list(values))) return tables
[docs] def get_model(self): """ Returns an instance of Bayesian Model or Markov Model. Variables are in the pattern var_0, var_1, var_2 where var_0 is 0th index variable, var_1 is 1st index variable. Return ------ model: an instance of Bayesian or Markov Model. Examples -------- >>> from pgmpy.readwrite import UAIReader >>> reader = UAIReader('TestUAI.uai') >>> reader.get_model() """ if self.network_type == "BAYES": model = BayesianNetwork() model.add_nodes_from(self.variables) model.add_edges_from(self.edges) tabular_cpds = [] for child_var, values in self.tables: states = int(self.domain[child_var]) values = np.fromiter(values, dtype=float) values = values.reshape(states, values.size // states) parents = list(model.predecessors(child_var)) if len(parents) == 0: tabular_cpds.append(TabularCPD(child_var, states, values)) else: tabular_cpds.append( TabularCPD( child_var, states, values, evidence=parents, evidence_card=[int(self.domain[var]) for var in parents], ) ) model.add_cpds(*tabular_cpds) return model elif self.network_type == "MARKOV": model = MarkovNetwork(self.edges) factors = [] for table in self.tables: variables = table[0] cardinality = [int(self.domain[var]) for var in variables] value = list(map(float, table[1])) factor = DiscreteFactor( variables=variables, cardinality=cardinality, values=value ) factors.append(factor) model.add_factors(*factors) return model
[docs] class UAIWriter(object): """ Initialize an instance of UAI writer class Parameters ---------- model: A Bayesian or Markov model The model to write round_values: int (default: None) The number to decimals to which to round the probability values. If None, keeps all decimals points. Examples -------- >>> from pgmpy.readwrite import UAIWriter >>> from pgmpy.utils import get_example_model >>> model = get_example_model('asia') >>> writer = UAIWriter(asia) >>> writer.write_uai('asia.uai') """ def __init__(self, model, round_values=None): if isinstance(model, BayesianNetwork): self.network = "BAYES\n" elif isinstance(model, MarkovNetwork): self.network = "MARKOV\n" else: raise TypeError("Model must be an instance of Bayesian or Markov model.") self.model = model self.round_values = round_values self.no_nodes = self.get_nodes() self.domain = self.get_domain() self.functions = self.get_functions() self.tables = self.get_tables() def __str__(self): """ Returns the UAI file as a string. """ self.network += self.no_nodes + "\n" domain = sorted(self.domain.items(), key=lambda x: (x[1], x[0])) self.network += " ".join([var[1] for var in domain]) + "\n" self.network += str(len(self.functions)) + "\n" for fun in self.functions: self.network += str(len(fun)) + " " self.network += " ".join(fun) + "\n" self.network += "\n" for table in self.tables: self.network += str(len(table)) + "\n" self.network += " ".join(table) + "\n" return self.network[:-1]
[docs] def get_nodes(self): """ Adds variables to the network. Examples -------- >>> from pgmpy.readwrite import UAIWriter >>> writer = UAIWriter(model) >>> writer.get_nodes() """ no_nodes = len(self.model.nodes()) return str(no_nodes)
[docs] def get_domain(self): """ Adds domain of each variable to the network. Examples -------- >>> from pgmpy.readwrite import UAIWriter >>> writer = UAIWriter(model) >>> writer.get_domain() """ if isinstance(self.model, BayesianNetwork): cpds = self.model.get_cpds() cpds.sort(key=lambda x: x.variable) domain = {} for cpd in cpds: domain[cpd.variable] = str(cpd.variable_card) return domain elif isinstance(self.model, MarkovNetwork): factors = self.model.get_factors() domain = {} for factor in factors: variables = factor.variables for var in variables: if var not in domain: domain[var] = str(factor.get_cardinality([var])[var]) return domain else: raise TypeError("Model must be an instance of Markov or Bayesian model.")
[docs] def get_functions(self): """ Adds functions to the network. Examples -------_ >>> from pgmpy.readwrite import UAIWriter >>> writer = UAIWriter(model) >>> writer.get_functions() """ if isinstance(self.model, BayesianNetwork): cpds = self.model.get_cpds() cpds.sort(key=lambda x: x.variable) variables = sorted(self.domain.items(), key=lambda x: (x[1], x[0])) functions = [] for cpd in cpds: child_var = cpd.variable evidence = cpd.variables[:0:-1] function = [ str(variables.index((var, self.domain[var]))) for var in evidence ] function.append( str(variables.index((child_var, self.domain[child_var]))) ) functions.append(function) return functions elif isinstance(self.model, MarkovNetwork): factors = self.model.get_factors() functions = [] variables = sorted(self.domain.items(), key=lambda x: (x[1], x[0])) for factor in factors: scope = factor.scope() function = [ str(variables.index((var, self.domain[var]))) for var in scope ] functions.append(function) return functions else: raise TypeError("Model must be an instance of Markov or Bayesian model.")
[docs] def get_tables(self): """ Adds tables to the network. Examples -------- >>> from pgmpy.readwrite import UAIWriter >>> writer = UAIWriter(model) >>> writer.get_tables() """ if isinstance(self.model, BayesianNetwork): cpds = self.model.get_cpds() cpds.sort(key=lambda x: x.variable) tables = [] for cpd in cpds: values = list( map( str, compat_fns.to_numpy( cpd.values.ravel(), decimals=self.round_values ), ) ) tables.append(values) return tables elif isinstance(self.model, MarkovNetwork): factors = self.model.get_factors() tables = [] for factor in factors: values = list( map( str, compat_fns.to_numpy( factor.values.ravel(), decimals=self.round_values ), ) ) tables.append(values) return tables else: raise TypeError("Model must be an instance of Markov or Bayesian model.")
[docs] def write_uai(self, filename): """ Write the xml data into the file. Parameters ---------- filename: Name of the file. Examples -------- >>> from pgmpy.readwrite import UAIWriter >>> from pgmpy.utils import get_example_model >>> model = get_example_model('asia') >>> writer = UAIWriter(asia) >>> writer.write_uai('asia.uai') """ writer = self.__str__() with open(filename, "w") as fout: fout.write(writer)