#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import xml.etree.ElementTree as etree
from collections import defaultdict
# "[docs]" link marker left over from the HTML source listing (not code)
class PomdpXReader(object):
    """
    Reader for POMDP models stored in the PomdpX XML format.

    Parameters
    ----------
    path : file or str
        Path of the file containing PomdpX information.
    string : str
        String containing PomdpX information.  Only consulted when ``path``
        is not given.

    Raises
    ------
    ValueError
        If neither ``path`` nor ``string`` is supplied.

    Example
    -------
    reader = PomdpXReader('TestPomdpX.xml')

    Reference
    ---------
    http://bigbird.comp.nus.edu.sg/pmwiki/farm/appl/index.php?n=Main.PomdpXDocumentation
    """

    def __init__(self, path=None, string=None):
        if path:
            self.network = etree.ElementTree(file=path).getroot()
        elif string:
            self.network = etree.fromstring(string)
        else:
            raise ValueError("Must specify either path or string")

    def get_description(self):
        """
        Return the problem description (text of the ``Description`` tag).

        Examples
        --------
        >>> reader = PomdpXReader('Test_PomdpX.xml')
        >>> reader.get_description()
        'RockSample problem for map size 1 x 3.
        Rock is at 0, Rover’s initial position is at 1.
        Exit is at 2.'
        """
        return self.network.find("Description").text

    def get_discount(self):
        """
        Return the discount factor of the problem as a float.

        Example
        --------
        >>> reader = PomdpXReader('Test_PomdpX.xml')
        >>> reader.get_discount()
        0.95
        """
        return float(self.network.find("Discount").text)

    @staticmethod
    def _parse_fully_obs(raw):
        """Interpret the ``fullyObs`` attribute text as a boolean."""
        if raw is None:
            return False
        # The attribute is text, so "false" must be recognised explicitly;
        # a plain truthiness test would report it as True.
        return raw.strip().lower() not in ("", "false", "0")

    @staticmethod
    def _read_value_enum(var, prefix):
        """Return the value names of ``var`` from ValueEnum or NumValues."""
        enum_tag = var.find("ValueEnum")
        if enum_tag is not None:
            # An explicit enumeration takes precedence over NumValues.
            return (enum_tag.text or "").split()
        num_tag = var.find("NumValues")
        if num_tag is not None:
            # NumValues only gives a count; names are generated as
            # <prefix>0 .. <prefix>(n-1).
            return [prefix + str(i) for i in range(int(num_tag.text))]
        return []

    def get_variables(self):
        """
        Return the variables of the network grouped by kind.

        The result is also stored on ``self.variables``.  Entries from
        several ``Variable`` tags are merged rather than overwritten.

        Returns
        -------
        defaultdict(list)
            Keys ``'StateVar'``, ``'ObsVar'``, ``'ActionVar'`` and
            ``'RewardVar'`` (only those present in the file), each mapping
            to a list of per-variable dicts.

        Example
        -------
        >>> reader = PomdpXReader("pomdpx.xml")
        >>> reader.get_variables()
        {'StateVar': [
                        {'vnamePrev': 'rover_0',
                         'vnameCurr': 'rover_1',
                         'ValueEnum': ['s0', 's1', 's2'],
                         'fullyObs': True},
                        {'vnamePrev': 'rock_0',
                         'vnameCurr': 'rock_1',
                         'fullyObs': False,
                         'ValueEnum': ['good', 'bad']}],
         'ObsVar': [{'vname': 'obs_sensor',
                     'ValueEnum': ['ogood', 'obad']}],
         'RewardVar': [{'vname': 'reward_rover'}],
         'ActionVar': [{'vname': 'action_rover',
                        'ValueEnum': ['amw', 'ame',
                                      'ac', 'as']}]
        }
        """
        self.variables = defaultdict(list)
        for variable in self.network.findall("Variable"):
            for var in variable.findall("StateVar"):
                state = defaultdict(list)
                state["vnamePrev"] = var.get("vnamePrev")
                state["vnameCurr"] = var.get("vnameCurr")
                state["fullyObs"] = self._parse_fully_obs(var.get("fullyObs"))
                state["ValueEnum"] = self._read_value_enum(var, "s")
                self.variables["StateVar"].append(state)
            # Observation and action variables share one layout; only the
            # prefix of generated value names differs.
            # NOTE(review): the "o"/"a" prefixes follow the PomdpX
            # documentation's NumValues convention - confirm against it.
            for kind, prefix in (("ObsVar", "o"), ("ActionVar", "a")):
                for var in variable.findall(kind):
                    entry = defaultdict(list)
                    entry["vname"] = var.get("vname")
                    entry["ValueEnum"] = self._read_value_enum(var, prefix)
                    self.variables[kind].append(entry)
            for var in variable.findall("RewardVar"):
                reward = defaultdict(list)
                reward["vname"] = var.get("vname")
                self.variables["RewardVar"].append(reward)
        return self.variables

    def _read_function_section(self, section_tag, entry_tag):
        """Collect every ``entry_tag`` element found under ``section_tag``."""
        entries = []
        for section in self.network.findall(section_tag):
            for var in section.findall(entry_tag):
                entry = defaultdict(list)
                entry["Var"] = var.find("Var").text
                entry["Parent"] = var.find("Parent").text.split()
                # A missing or empty type attribute means a table parameter.
                entry["Type"] = var.find("Parameter").get("type") or "TBL"
                entry["Parameter"] = self.get_parameter(var)
                entries.append(entry)
        return entries

    def get_initial_beliefs(self):
        """
        Return the ``CondProb`` entries of the ``InitialStateBelief`` tag.

        Each entry is a dict with ``Var``, ``Parent``, ``Type`` and
        ``Parameter``; ``Parameter`` is a list of dicts for table type
        parameters and a nested structure for decision diagrams.

        Examples
        --------
        >>> reader = PomdpXReader('Test_PomdpX.xml')
        >>> reader.get_initial_beliefs()
        [{'Var': 'rover_0',
          'Parent': ['null'],
          'Type': 'TBL',
          'Parameter': [{'Instance': ['-'],
                         'ProbTable': ['0.0', '1.0', '0.0']}]
         },
         ...
        ]
        """
        return self._read_function_section("InitialStateBelief", "CondProb")

    def get_state_transition_function(self):
        """
        Return the ``CondProb`` entries of the ``StateTransitionFunction``
        tag, in the same layout as :meth:`get_initial_beliefs`.

        Example
        --------
        >>> reader = PomdpXReader('Test_PomdpX.xml')
        >>> reader.get_state_transition_function()
        [{'Var': 'rover_1',
          'Parent': ['action_rover', 'rover_0'],
          'Type': 'TBL',
          'Parameter': [{'Instance': ['amw', 's0', 's2'],
                         'ProbTable': ['1.0']},
                        {'Instance': ['amw', 's1', 's0'],
                         'ProbTable': ['1.0']},
                        ...
                       ]
        }]
        """
        return self._read_function_section("StateTransitionFunction", "CondProb")

    def get_obs_function(self):
        """
        Return the ``CondProb`` entries of the ``ObsFunction`` tag, in the
        same layout as :meth:`get_initial_beliefs`.

        Example
        --------
        >>> reader = PomdpXReader('Test_PomdpX.xml')
        >>> reader.get_obs_function()
        [{'Var': 'obs_sensor',
          'Parent': ['action_rover', 'rover_1', 'rock_1'],
          'Type': 'TBL',
          'Parameter': [{'Instance': ['amw', '*', '*', '-'],
                         'ProbTable': ['1.0', '0.0']},
                        ...
                       ]
        }]
        """
        return self._read_function_section("ObsFunction", "CondProb")

    def get_reward_function(self):
        """
        Return the ``Func`` entries of the ``RewardFunction`` tag.  Table
        entries carry a ``ValueTable`` instead of a ``ProbTable``.

        Example
        --------
        >>> reader = PomdpXReader('Test_PomdpX.xml')
        >>> reader.get_reward_function()
        [{'Var': 'reward_rover',
          'Parent': ['action_rover', 'rover_0', 'rock_0'],
          'Type': 'TBL',
          'Parameter': [{'Instance': ['ame', 's1', '*'],
                         'ValueTable': ['10']},
                        ...
                       ]
        }]
        """
        return self._read_function_section("RewardFunction", "Func")

    def get_parameter(self, var):
        """
        Return the values held by the ``Parameter`` tag(s) of ``var``.

        Table parameters (type ``TBL`` or no type) yield a list of dicts,
        decision diagrams (type ``DD``) a nested dict.  When several
        ``Parameter`` tags exist the last recognised one wins; an
        unrecognised type leaves the result as an empty list.
        """
        parameter = []
        for parameter_tag in var.findall("Parameter"):
            parameter_type = parameter_tag.get("type") or "TBL"
            if parameter_type == "TBL":
                parameter = self.get_parameter_tbl(parameter_tag)
            elif parameter_type == "DD":
                parameter = self.get_parameter_dd(parameter_tag)
        return parameter

    def get_parameter_tbl(self, parameter):
        """
        Return a table type parameter as a list of dicts.

        Every ``Entry`` produces a dict with ``Instance`` plus either
        ``ProbTable`` or, when no ``ProbTable`` tag exists, ``ValueTable``.
        All values are whitespace-split lists of strings.
        """
        par = []
        for entry in parameter.findall("Entry"):
            instance = defaultdict(list)
            instance["Instance"] = entry.find("Instance").text.split()
            prob_table = entry.find("ProbTable")
            if prob_table is None:
                instance["ValueTable"] = entry.find("ValueTable").text.split()
            else:
                instance["ProbTable"] = prob_table.text.split()
            par.append(instance)
        return par

    def get_parameter_dd(self, parameter):
        """
        Return a decision diagram parameter as nested dicts.

        The result maps the root node's ``var`` to its edges.  If a
        ``SubDAGTemplate`` tag is present, the keys ``'SubDAGTemplate'``
        (its nested dict) and ``'id'`` are inserted before the root key.
        """
        dag = defaultdict(list)
        node = parameter.find("DAG").find("Node")
        root = node.get("var")

        def get_param(node):
            # Map each edge value to a terminal string, a nested node dict
            # or a dict describing a SubDAG reference.
            edges = defaultdict(list)
            for edge in node.findall("Edge"):
                terminal = edge.find("Terminal")
                child = edge.find("Node")
                subdag = edge.find("SubDAG")
                if terminal is not None:
                    edges[edge.get("val")] = terminal.text
                elif child is not None:
                    node_cpd = defaultdict(list)
                    node_cpd[child.get("var")] = get_param(child)
                    edges[edge.get("val")] = node_cpd
                elif subdag is not None:
                    subdag_attribute = defaultdict(list)
                    subdag_attribute["type"] = subdag.get("type")
                    if subdag_attribute["type"] == "template":
                        subdag_attribute["idref"] = subdag.get("idref")
                    if subdag.get("var"):
                        subdag_attribute["var"] = subdag.get("var")
                    if subdag.get("val"):
                        subdag_attribute["val"] = subdag.get("val")
                    edges[edge.get("val")] = subdag_attribute
            return edges

        template = parameter.find("SubDAGTemplate")
        if template is not None:
            subdag_root = template.find("Node")
            subdag_dict = defaultdict(list)
            subdag_dict[subdag_root.get("var")] = get_param(subdag_root)
            dag["SubDAGTemplate"] = subdag_dict
            dag["id"] = template.get("id")
        dag[root] = get_param(node)
        return dag
# "[docs]" link marker left over from the HTML source listing (not code)
class PomdpXWriter(object):
    """
    Writer that serialises model data into PomdpX XML.

    Parameters
    ----------
    model_data: dict
        The model to write.  The methods of this class read the keys
        ``"variables"``, ``"initial_state_belief"``,
        ``"state_transition_function"``, ``"obs_function"`` and
        ``"reward_function"``.
    encoding: String(optional)
        Encoding for text data
    prettyprint: Bool(optional)
        Indentation in output XML if true

    Notes
    -----
    The ``Description`` and ``Discount`` elements are created empty and are
    not filled in by any method of this class.
    """

    def __init__(self, model_data, encoding="utf-8", prettyprint=True):
        self.model = model_data
        self.encoding = encoding
        self.prettyprint = prettyprint
        self.xml = etree.Element("pomdpx", attrib={"version": "1.0"})
        self.description = etree.SubElement(self.xml, "Description")
        self.discount = etree.SubElement(self.xml, "Discount")
        self.variable = etree.SubElement(self.xml, "Variable")
        self.initial_belief = etree.SubElement(self.xml, "InitialStateBelief")
        self.transition_function = etree.SubElement(self.xml, "StateTransitionFunction")
        self.observation_function = etree.SubElement(self.xml, "ObsFunction")
        self.reward_function = etree.SubElement(self.xml, "RewardFunction")

    def __str__(self, xml=None):
        """
        Return the XML as string.

        Parameters
        ----------
        xml: etree Element (optional)
            Element to serialise.  When given, the raw result of
            ``etree.tostring`` in ``self.encoding`` is returned.  When
            omitted (as with ``str(writer)``) the whole document is
            serialised and decoded to text.
        """
        if xml is None:
            data = self.__str__(self.xml)
            # str() must return text, so decode the encoded bytes.
            return data.decode(self.encoding) if isinstance(data, bytes) else data
        if self.prettyprint:
            self.indent(xml)
        return etree.tostring(xml, encoding=self.encoding)

    def indent(self, elem, level=0):
        """
        Inplace prettyprint formatter.

        Rewrites whitespace-only ``text``/``tail`` of ``elem`` and its
        descendants so that every nesting level starts on a new line,
        indented by ``level`` spaces.
        """
        i = "\n" + level * " "
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = i + " "
            if not elem.tail or not elem.tail.strip():
                elem.tail = i
            child = None
            for child in elem:
                self.indent(child, level + 1)
            # The last child closes this level, so its tail drops back to
            # the parent's indentation.
            if not child.tail or not child.tail.strip():
                child.tail = i
        elif level and (not elem.tail or not elem.tail.strip()):
            elem.tail = i

    def _section_to_string(self, element):
        """Serialise one section element without its trailing tail text."""
        if self.prettyprint:
            self.indent(element)
        # etree.tostring also emits the element's tail.  Detach it while
        # serialising instead of slicing the output, which would remove the
        # closing ">" whenever no tail was present.
        tail = element.tail
        element.tail = None
        try:
            return etree.tostring(element, encoding=self.encoding)
        finally:
            element.tail = tail

    def _add_value_enum(self, var, tag):
        """
        supports adding variables to the xml

        Writes a ``NumValues`` tag when ``tag`` is a ``StateVar`` whose
        values are exactly ``s0 .. s(n-1)``, otherwise a ``ValueEnum`` tag
        listing the values separated by single spaces.

        Parameters
        ---------------
        var: The SubElement variable
        tag: The SubElement tag to which enum value is to be added

        Return
        ---------------
        None
        """
        values = list(var["ValueEnum"])
        generated = ["s" + str(i) for i in range(len(values))]
        if values and tag.tag == "StateVar" and values == generated:
            numvalues_tag = etree.SubElement(tag, "NumValues")
            # Use the real count; the last character of the last name is
            # wrong from s10 onwards.
            numvalues_tag.text = str(len(values))
        else:
            valueenum_tag = etree.SubElement(tag, "ValueEnum")
            valueenum_tag.text = " ".join(values)

    def get_variables(self):
        """
        Add variables to PomdpX

        Every call appends the variables to the ``Variable`` element again.

        Return
        ---------------
        xml containing variables tag
        """
        variables = self.model["variables"]
        for var in variables["StateVar"]:
            state_var_tag = etree.SubElement(
                self.variable,
                "StateVar",
                attrib={
                    "vnamePrev": var["vnamePrev"],
                    "vnameCurr": var["vnameCurr"],
                    "fullyObs": "true" if var["fullyObs"] else "false",
                },
            )
            self._add_value_enum(var, state_var_tag)
        for kind in ("ObsVar", "ActionVar"):
            for var in variables[kind]:
                var_tag = etree.SubElement(
                    self.variable, kind, attrib={"vname": var["vname"]}
                )
                self._add_value_enum(var, var_tag)
        for var in variables["RewardVar"]:
            etree.SubElement(self.variable, "RewardVar", attrib={"vname": var["vname"]})
        return self._section_to_string(self.variable)

    def add_parameter_dd(self, dag_tag, node_dict):
        """
        helper function for adding parameters in condition

        Only the first key/value pair of ``node_dict`` is written; anything
        that is not a dict is ignored.

        Parameters
        ---------------
        dag_tag: etree SubElement
                 the DAG tag is contained in this subelement
        node_dict: dictionary
                   the decision diagram dictionary

        Return
        ---------------
        None
        """
        if not isinstance(node_dict, dict):
            return
        node_tag = etree.SubElement(
            dag_tag, "Node", attrib={"var": next(iter(node_dict.keys()))}
        )
        edge_dict = next(iter(node_dict.values()))
        for edge in sorted(edge_dict.keys(), key=tuple):
            edge_tag = etree.SubElement(node_tag, "Edge", attrib={"val": edge})
            value = edge_dict.get(edge)
            if isinstance(value, str):
                terminal_tag = etree.SubElement(edge_tag, "Terminal")
                terminal_tag.text = value
            elif "type" in value:
                # A dict with a "type" key describes a SubDAG reference.
                if "val" in value:
                    attrib = {
                        "type": value["type"],
                        "var": value["var"],
                        "val": value["val"],
                    }
                elif "idref" in value:
                    attrib = {"type": value["type"], "idref": value["idref"]}
                else:
                    attrib = {"type": value["type"], "var": value["var"]}
                etree.SubElement(edge_tag, "SubDAG", attrib=attrib)
            else:
                self.add_parameter_dd(edge_tag, value)

    @staticmethod
    def _format_instance(instance):
        """Build the text of an ``Instance`` tag from its list of tokens."""
        text = "".join(token + " " for token in instance)
        # The trailing space is kept when the last token is one character.
        if instance and len(instance[-1]) > 1:
            text = text[:-1]
        # A single-token instance additionally gets a leading space.
        if len(instance) == 1:
            text = " " + text
        return text

    def add_conditions(self, condition, condprob):
        """
        helper function for adding probability conditions for model

        The ``condition`` dict is only read, never modified.

        Parameters
        ---------------
        condition: dictionary
                   contains an element of the conditions list
        condprob: etree SubElement
                  the tag to which condition is added

        Return
        ---------------
        None
        """
        var_tag = etree.SubElement(condprob, "Var")
        var_tag.text = condition["Var"]
        parent_tag = etree.SubElement(condprob, "Parent")
        parent_tag.text = " ".join(condition["Parent"])
        parameter_tag = etree.SubElement(
            condprob,
            "Parameter",
            attrib={
                "type": condition["Type"] if condition["Type"] is not None else "TBL"
            },
        )
        if condition["Type"] == "DD":
            dag_tag = etree.SubElement(parameter_tag, "DAG")
            parameter_dict = condition["Parameter"]
            if "SubDAGTemplate" in parameter_dict:
                subdag_tag = etree.SubElement(
                    parameter_tag, "SubDAGTemplate", attrib={"id": parameter_dict["id"]}
                )
                self.add_parameter_dd(subdag_tag, parameter_dict["SubDAGTemplate"])
                # Work on a filtered copy so the caller's model keeps its
                # template entries.
                parameter_dict = {
                    key: value
                    for key, value in parameter_dict.items()
                    if key not in ("SubDAGTemplate", "id")
                }
            self.add_parameter_dd(dag_tag, parameter_dict)
        else:
            # Func tags (reward functions) carry values, the rest
            # probabilities.
            table = "ValueTable" if condprob.tag == "Func" else "ProbTable"
            for parameter in condition["Parameter"]:
                entry = etree.SubElement(parameter_tag, "Entry")
                instance = etree.SubElement(entry, "Instance")
                instance.text = self._format_instance(parameter["Instance"])
                table_tag = etree.SubElement(entry, table)
                table_tag.text = " ".join(parameter[table])

    def _add_section(self, conditions, section, entry_name):
        """Append one ``entry_name`` tag per condition and serialise."""
        for condition in conditions:
            entry = etree.SubElement(section, entry_name)
            self.add_conditions(condition, entry)
        return self._section_to_string(section)

    def add_initial_belief(self):
        """
        add initial belief tag to pomdpx model

        Return
        ---------------
        string containing the xml for initial belief tag
        """
        return self._add_section(
            self.model["initial_state_belief"], self.initial_belief, "CondProb"
        )

    def add_state_transition_function(self):
        """
        add state transition function tag to pomdpx model

        Return
        ---------------
        string containing the xml for state transition tag
        """
        return self._add_section(
            self.model["state_transition_function"],
            self.transition_function,
            "CondProb",
        )

    def add_obs_function(self):
        """
        add observation function tag to pomdpx model

        Return
        ---------------
        string containing the xml for observation function tag
        """
        return self._add_section(
            self.model["obs_function"], self.observation_function, "CondProb"
        )

    def add_reward_function(self):
        """
        add reward function tag to pomdpx model

        Return
        ---------------
        string containing the xml for reward function tag
        """
        return self._add_section(
            self.model["reward_function"], self.reward_function, "Func"
        )