#!/usr/bin/env python3
import itertools
import networkx as nx
import numpy as np
import pandas as pd
from pgmpy.base import UndirectedGraph
from pgmpy.global_vars import logger
from pgmpy.independencies import Independencies
[docs]
class DAG(nx.DiGraph):
"""
Base class for all Directed Graphical Models.
Each node in the graph can represent either a random variable, `Factor`,
or a cluster of random variables. Edges in the graph represent the
dependencies between these.
Parameters
----------
data: input graph
Data to initialize graph. If data=None (default) an empty graph is
created. The data can be an edge list or any Networkx graph object.
Examples
--------
Create an empty DAG with no nodes and no edges
>>> from pgmpy.base import DAG
>>> G = DAG()
G can be grown in several ways:
**Nodes:**
Add one node at a time:
>>> G.add_node(node='a')
Add the nodes from any container (a list, set or tuple or the nodes
from another graph).
>>> G.add_nodes_from(nodes=['a', 'b'])
**Edges:**
G can also be grown by adding edges.
Add one edge,
>>> G.add_edge(u='a', v='b')
a list of edges,
>>> G.add_edges_from(ebunch=[('a', 'b'), ('b', 'c')])
If some edges connect nodes not yet in the model, the nodes
are added automatically. There are no errors when adding
nodes or edges that already exist.
**Shortcuts:**
Many common graph features allow python syntax for speed reporting.
>>> 'a' in G # check if node in graph
True
>>> len(G) # number of nodes in graph
3
"""
def __init__(self, ebunch=None, latents=set()):
super(DAG, self).__init__(ebunch)
self.latents = set(latents)
cycles = []
try:
cycles = list(nx.find_cycle(self))
except nx.NetworkXNoCycle:
pass
else:
out_str = "Cycles are not allowed in a DAG."
out_str += "\nEdges indicating the path taken for a loop: "
out_str += "".join([f"({u},{v}) " for (u, v) in cycles])
raise ValueError(out_str)
[docs]
def add_node(self, node, weight=None, latent=False):
"""
Adds a single node to the Graph.
Parameters
----------
node: str, int, or any hashable python object.
The node to add to the graph.
weight: int, float
The weight of the node.
latent: boolean (default: False)
Specifies whether the variable is latent or not.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_node(node='A')
>>> sorted(G.nodes())
['A']
Adding a node with some weight.
>>> G.add_node(node='B', weight=0.3)
The weight of these nodes can be accessed as:
>>> G.nodes['B']
{'weight': 0.3}
>>> G.nodes['A']
{'weight': None}
"""
# Check for networkx 2.0 syntax
if isinstance(node, tuple) and len(node) == 2 and isinstance(node[1], dict):
node, attrs = node
if attrs.get("weight", None) is not None:
attrs["weight"] = weight
else:
attrs = {"weight": weight}
if latent:
self.latents.add(node)
super(DAG, self).add_node(node, weight=weight)
[docs]
def add_nodes_from(self, nodes, weights=None, latent=False):
"""
Add multiple nodes to the Graph.
**The behviour of adding weights is different than in networkx.
Parameters
----------
nodes: iterable container
A container of nodes (list, dict, set, or any hashable python
object).
weights: list, tuple (default=None)
A container of weights (int, float). The weight value at index i
is associated with the variable at index i.
latent: list, tuple (default=False)
A container of boolean. The value at index i tells whether the
node at index i is latent or not.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_nodes_from(nodes=['A', 'B', 'C'])
>>> G.nodes()
NodeView(('A', 'B', 'C'))
Adding nodes with weights:
>>> G.add_nodes_from(nodes=['D', 'E'], weights=[0.3, 0.6])
>>> G.nodes['D']
{'weight': 0.3}
>>> G.nodes['E']
{'weight': 0.6}
>>> G.nodes['A']
{'weight': None}
"""
nodes = list(nodes)
if isinstance(latent, bool):
latent = [latent] * len(nodes)
if weights:
if len(nodes) != len(weights):
raise ValueError(
"The number of elements in nodes and weights" "should be equal."
)
for index in range(len(nodes)):
self.add_node(
node=nodes[index], weight=weights[index], latent=latent[index]
)
else:
for index in range(len(nodes)):
self.add_node(node=nodes[index], latent=latent[index])
[docs]
def add_edge(self, u, v, weight=None):
"""
Add an edge between u and v.
The nodes u and v will be automatically added if they are
not already in the graph.
Parameters
----------
u, v : nodes
Nodes can be any hashable Python object.
weight: int, float (default=None)
The weight of the edge
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_nodes_from(nodes=['Alice', 'Bob', 'Charles'])
>>> G.add_edge(u='Alice', v='Bob')
>>> G.nodes()
NodeView(('Alice', 'Bob', 'Charles'))
>>> G.edges()
OutEdgeView([('Alice', 'Bob')])
When the node is not already present in the graph:
>>> G.add_edge(u='Alice', v='Ankur')
>>> G.nodes()
NodeView(('Alice', 'Ankur', 'Bob', 'Charles'))
>>> G.edges()
OutEdgeView([('Alice', 'Bob'), ('Alice', 'Ankur')])
Adding edges with weight:
>>> G.add_edge('Ankur', 'Maria', weight=0.1)
>>> G.edge['Ankur']['Maria']
{'weight': 0.1}
"""
super(DAG, self).add_edge(u, v, weight=weight)
[docs]
def add_edges_from(self, ebunch, weights=None):
"""
Add all the edges in ebunch.
If nodes referred in the ebunch are not already present, they
will be automatically added. Node names can be any hashable python
object.
**The behavior of adding weights is different than networkx.
Parameters
----------
ebunch : container of edges
Each edge given in the container will be added to the graph.
The edges must be given as 2-tuples (u, v).
weights: list, tuple (default=None)
A container of weights (int, float). The weight value at index i
is associated with the edge at index i.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_nodes_from(nodes=['Alice', 'Bob', 'Charles'])
>>> G.add_edges_from(ebunch=[('Alice', 'Bob'), ('Bob', 'Charles')])
>>> G.nodes()
NodeView(('Alice', 'Bob', 'Charles'))
>>> G.edges()
OutEdgeView([('Alice', 'Bob'), ('Bob', 'Charles')])
When the node is not already in the model:
>>> G.add_edges_from(ebunch=[('Alice', 'Ankur')])
>>> G.nodes()
NodeView(('Alice', 'Bob', 'Charles', 'Ankur'))
>>> G.edges()
OutEdgeView([('Alice', 'Bob'), ('Bob', 'Charles'), ('Alice', 'Ankur')])
Adding edges with weights:
>>> G.add_edges_from([('Ankur', 'Maria'), ('Maria', 'Mason')],
... weights=[0.3, 0.5])
>>> G.edge['Ankur']['Maria']
{'weight': 0.3}
>>> G.edge['Maria']['Mason']
{'weight': 0.5}
"""
ebunch = list(ebunch)
if weights:
if len(ebunch) != len(weights):
raise ValueError(
"The number of elements in ebunch and weights" "should be equal"
)
for index in range(len(ebunch)):
self.add_edge(ebunch[index][0], ebunch[index][1], weight=weights[index])
else:
for edge in ebunch:
self.add_edge(edge[0], edge[1])
[docs]
def get_parents(self, node):
"""
Returns a list of parents of node.
Throws an error if the node is not present in the graph.
Parameters
----------
node: string, int or any hashable python object.
The node whose parents would be returned.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG(ebunch=[('diff', 'grade'), ('intel', 'grade')])
>>> G.get_parents(node='grade')
['diff', 'intel']
"""
return list(self.predecessors(node))
[docs]
def moralize(self):
"""
Removes all the immoralities in the DAG and creates a moral
graph (UndirectedGraph).
A v-structure X->Z<-Y is an immorality if there is no directed edge
between X and Y.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG(ebunch=[('diff', 'grade'), ('intel', 'grade')])
>>> moral_graph = G.moralize()
>>> moral_graph.edges()
EdgeView([('intel', 'grade'), ('intel', 'diff'), ('grade', 'diff')])
"""
moral_graph = UndirectedGraph()
moral_graph.add_nodes_from(self.nodes())
moral_graph.add_edges_from(self.to_undirected().edges())
for node in self.nodes():
moral_graph.add_edges_from(
itertools.combinations(self.get_parents(node), 2)
)
return moral_graph
[docs]
def get_leaves(self):
"""
Returns a list of leaves of the graph.
Examples
--------
>>> from pgmpy.base import DAG
>>> graph = DAG([('A', 'B'), ('B', 'C'), ('B', 'D')])
>>> graph.get_leaves()
['C', 'D']
"""
return [node for node, out_degree in self.out_degree_iter() if out_degree == 0]
[docs]
def out_degree_iter(self, nbunch=None, weight=None):
if nx.__version__.startswith("1"):
return super(DAG, self).out_degree_iter(nbunch, weight)
else:
return iter(self.out_degree(nbunch, weight))
[docs]
def in_degree_iter(self, nbunch=None, weight=None):
if nx.__version__.startswith("1"):
return super(DAG, self).in_degree_iter(nbunch, weight)
else:
return iter(self.in_degree(nbunch, weight))
[docs]
def get_roots(self):
"""
Returns a list of roots of the graph.
Examples
--------
>>> from pgmpy.base import DAG
>>> graph = DAG([('A', 'B'), ('B', 'C'), ('B', 'D'), ('E', 'B')])
>>> graph.get_roots()
['A', 'E']
"""
return [
node for node, in_degree in dict(self.in_degree()).items() if in_degree == 0
]
[docs]
def get_children(self, node):
"""
Returns a list of children of node.
Throws an error if the node is not present in the graph.
Parameters
----------
node: string, int or any hashable python object.
The node whose children would be returned.
Examples
--------
>>> from pgmpy.base import DAG
>>> g = DAG(ebunch=[('A', 'B'), ('C', 'B'), ('B', 'D'),
('B', 'E'), ('B', 'F'), ('E', 'G')])
>>> g.get_children(node='B')
['D', 'E', 'F']
"""
return list(self.successors(node))
[docs]
def get_independencies(self, latex=False, include_latents=False):
"""
Computes independencies in the DAG, by checking d-seperation.
Parameters
----------
latex: boolean
If latex=True then latex string of the independence assertion
would be created.
include_latents: boolean
If True, includes latent variables in the independencies. Otherwise,
only generates independencies on observed variables.
Examples
--------
>>> from pgmpy.base import DAG
>>> chain = DAG([('X', 'Y'), ('Y', 'Z')])
>>> chain.get_independencies()
(X \u27C2 Z | Y)
(Z \u27C2 X | Y)
"""
nodes = set(self.nodes())
if not include_latents:
nodes = set(self.nodes()) - self.latents
independencies = Independencies()
for start in nodes:
if not include_latents:
rest = set(self.nodes()) - {start} - self.latents
else:
rest = set(self.nodes()) - {start}
for r in range(len(rest)):
for observed in itertools.combinations(rest, r):
d_seperated_variables = (
rest
- set(observed)
- set(
self.active_trail_nodes(
start,
observed=observed,
include_latents=include_latents,
)[start]
)
)
if d_seperated_variables:
independencies.add_assertions(
[start, d_seperated_variables, observed]
)
independencies.reduce()
if not latex:
return independencies
else:
return independencies.latex_string()
[docs]
def local_independencies(self, variables):
"""
Returns an instance of Independencies containing the local independencies
of each of the variables.
Parameters
----------
variables: str or array like
variables whose local independencies are to be found.
Examples
--------
>>> from pgmpy.base import DAG
>>> student = DAG()
>>> student.add_edges_from([('diff', 'grade'), ('intel', 'grade'),
>>> ('grade', 'letter'), ('intel', 'SAT')])
>>> ind = student.local_independencies('grade')
>>> ind
(grade \u27C2 SAT | diff, intel)
"""
independencies = Independencies()
for variable in (
variables if isinstance(variables, (list, tuple)) else [variables]
):
non_descendents = (
set(self.nodes())
- {variable}
- set(nx.dfs_preorder_nodes(self, variable))
)
parents = set(self.get_parents(variable))
if non_descendents - parents:
independencies.add_assertions(
[variable, non_descendents - parents, parents]
)
return independencies
[docs]
def is_iequivalent(self, model):
"""
Checks whether the given model is I-equivalent
Two graphs G1 and G2 are said to be I-equivalent if they have same skeleton
and have same set of immoralities.
Parameters
----------
model : A DAG object, for which you want to check I-equivalence
Returns
--------
I-equivalence: boolean
True if both are I-equivalent, False otherwise
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_edges_from([('V', 'W'), ('W', 'X'),
... ('X', 'Y'), ('Z', 'Y')])
>>> G1 = DAG()
>>> G1.add_edges_from([('W', 'V'), ('X', 'W'),
... ('X', 'Y'), ('Z', 'Y')])
>>> G.is_iequivalent(G1)
True
"""
if not isinstance(model, DAG):
raise TypeError(
f"Model must be an instance of DAG. Got type: {type(model)}"
)
if (self.to_undirected().edges() == model.to_undirected().edges()) and (
self.get_immoralities() == model.get_immoralities()
):
return True
return False
[docs]
def get_immoralities(self):
"""
Finds all the immoralities in the model
A v-structure X -> Z <- Y is an immorality if there is no direct edge between X and Y .
Returns
-------
Immoralities: set
A set of all the immoralities in the model
Examples
---------
>>> from pgmpy.base import DAG
>>> student = DAG()
>>> student.add_edges_from([('diff', 'grade'), ('intel', 'grade'),
... ('intel', 'SAT'), ('grade', 'letter')])
>>> student.get_immoralities()
{('diff', 'intel')}
"""
immoralities = set()
for node in self.nodes():
for parents in itertools.combinations(self.predecessors(node), 2):
if not self.has_edge(parents[0], parents[1]) and not self.has_edge(
parents[1], parents[0]
):
immoralities.add(tuple(sorted(parents)))
return immoralities
[docs]
def is_dconnected(self, start, end, observed=None):
"""
Returns True if there is an active trail (i.e. d-connection) between
`start` and `end` node given that `observed` is observed.
Parameters
----------
start, end : int, str, any hashable python object.
The nodes in the DAG between which to check the d-connection/active trail.
observed : list, array-like (optional)
If given the active trail would be computed assuming these nodes to
be observed.
Examples
--------
>>> from pgmpy.base import DAG
>>> student = DAG()
>>> student.add_nodes_from(['diff', 'intel', 'grades', 'letter', 'sat'])
>>> student.add_edges_from([('diff', 'grades'), ('intel', 'grades'), ('grades', 'letter'),
... ('intel', 'sat')])
>>> student.is_dconnected('diff', 'intel')
False
>>> student.is_dconnected('grades', 'sat')
True
"""
if end in self.active_trail_nodes(start, observed)[start]:
return True
else:
return False
[docs]
def minimal_dseparator(self, start, end):
"""
Finds the minimal d-separating set for `start` and `end`.
Parameters
----------
start: node
The first node.
end: node
The second node.
Examples
--------
>>> dag = DAG([('A', 'B'), ('B', 'C')])
>>> dag.minimal_dseparator(start='A', end='C')
{'B'}
References
----------
[1] Algorithm 4, Page 10: Tian, Jin, Azaria Paz, and Judea Pearl. Finding minimal d-separators. Computer Science Department, University of California, 1998.
"""
if (end in self.neighbors(start)) or (start in self.neighbors(end)):
raise ValueError(
"No possible separators because start and end are adjacent"
)
an_graph = self.get_ancestral_graph([start, end])
separator = set(
itertools.chain(self.predecessors(start), self.predecessors(end))
)
# If any of the parents were latents, take the latent's parent
while len(separator.intersection(self.latents)) != 0:
separator_copy = separator.copy()
for u in separator:
if u in self.latents:
separator_copy.remove(u)
separator_copy.update(set(self.predecessors(u)))
separator = separator_copy
# Remove the start and end nodes in case it reaches there while removing latents.
separator.difference_update({start, end})
# If the initial set is not able to d-separate, no d-separator is possible.
if an_graph.is_dconnected(start, end, observed=separator):
return None
# Go through the separator set, remove one element and check if it remains
# a dseparating set.
minimal_separator = separator.copy()
for u in separator:
if not an_graph.is_dconnected(start, end, observed=minimal_separator - {u}):
minimal_separator.remove(u)
return minimal_separator
[docs]
def get_markov_blanket(self, node):
"""
Returns a markov blanket for a random variable. In the case
of Bayesian Networks, the markov blanket is the set of
node's parents, its children and its children's other parents.
Returns
-------
Markov Blanket: list
List of nodes in the markov blanket of `node`.
Parameters
----------
node: string, int or any hashable python object.
The node whose markov blanket would be returned.
Examples
--------
>>> from pgmpy.base import DAG
>>> from pgmpy.factors.discrete import TabularCPD
>>> G = DAG([('x', 'y'), ('z', 'y'), ('y', 'w'), ('y', 'v'), ('u', 'w'),
('s', 'v'), ('w', 't'), ('w', 'm'), ('v', 'n'), ('v', 'q')])
>>> G.get_markov_blanket('y')
['s', 'w', 'x', 'u', 'z', 'v']
"""
children = self.get_children(node)
parents = self.get_parents(node)
blanket_nodes = children + parents
for child_node in children:
blanket_nodes.extend(self.get_parents(child_node))
blanket_nodes = set(blanket_nodes)
blanket_nodes.discard(node)
return list(blanket_nodes)
[docs]
def active_trail_nodes(self, variables, observed=None, include_latents=False):
"""
Returns a dictionary with the given variables as keys and all the nodes reachable
from that respective variable as values.
Parameters
----------
variables: str or array like
variables whose active trails are to be found.
observed : List of nodes (optional)
If given the active trails would be computed assuming these nodes to be
observed.
include_latents: boolean (default: False)
Whether to include the latent variables in the returned active trail nodes.
Examples
--------
>>> from pgmpy.base import DAG
>>> student = DAG()
>>> student.add_nodes_from(['diff', 'intel', 'grades'])
>>> student.add_edges_from([('diff', 'grades'), ('intel', 'grades')])
>>> student.active_trail_nodes('diff')
{'diff': {'diff', 'grades'}}
>>> student.active_trail_nodes(['diff', 'intel'], observed='grades')
{'diff': {'diff', 'intel'}, 'intel': {'diff', 'intel'}}
References
----------
Details of the algorithm can be found in 'Probabilistic Graphical Model
Principles and Techniques' - Koller and Friedman
Page 75 Algorithm 3.1
"""
if observed:
if isinstance(observed, set):
observed = list(observed)
observed_list = (
observed if isinstance(observed, (list, tuple)) else [observed]
)
else:
observed_list = []
ancestors_list = self._get_ancestors_of(observed_list)
# Direction of flow of information
# up -> from parent to child
# down -> from child to parent
active_trails = {}
for start in variables if isinstance(variables, list) else [variables]:
visit_list = set()
visit_list.add((start, "up"))
traversed_list = set()
active_nodes = set()
while visit_list:
node, direction = visit_list.pop()
if (node, direction) not in traversed_list:
if node not in observed_list:
active_nodes.add(node)
traversed_list.add((node, direction))
if direction == "up" and node not in observed_list:
for parent in self.predecessors(node):
visit_list.add((parent, "up"))
for child in self.successors(node):
visit_list.add((child, "down"))
elif direction == "down":
if node not in observed_list:
for child in self.successors(node):
visit_list.add((child, "down"))
if node in ancestors_list:
for parent in self.predecessors(node):
visit_list.add((parent, "up"))
if include_latents:
active_trails[start] = active_nodes
else:
active_trails[start] = active_nodes - self.latents
return active_trails
def _get_ancestors_of(self, nodes):
"""
Returns a dictionary of all ancestors of all the observed nodes including the
node itself.
Parameters
----------
nodes: string, list-type
name of all the observed nodes
Examples
--------
>>> from pgmpy.base import DAG
>>> model = DAG([('D', 'G'), ('I', 'G'), ('G', 'L'),
... ('I', 'L')])
>>> model._get_ancestors_of('G')
{'D', 'G', 'I'}
>>> model._get_ancestors_of(['G', 'I'])
{'D', 'G', 'I'}
"""
if not isinstance(nodes, (list, tuple)):
nodes = [nodes]
for node in nodes:
if node not in self.nodes():
raise ValueError(f"Node {node} not in not in graph")
ancestors_list = set()
nodes_list = set(nodes)
while nodes_list:
node = nodes_list.pop()
if node not in ancestors_list:
nodes_list.update(self.predecessors(node))
ancestors_list.add(node)
return ancestors_list
# TODO: Commented out till the method is implemented.
# def to_pdag(self):
# """
# Returns the PDAG (the equivalence class of DAG; also known as CPDAG) of the DAG.
#
# Returns
# -------
# Partially oriented DAG: pgmpy.base.PDAG
# An instance of pgmpy.base.PDAG.
#
# Examples
# --------
#
# """
# pass
[docs]
def do(self, nodes, inplace=False):
"""
Applies the do operator to the graph and returns a new DAG with the
transformed graph.
The do-operator, do(X = x) has the effect of removing all edges from
the parents of X and setting X to the given value x.
Parameters
----------
nodes : list, array-like
The names of the nodes to apply the do-operator for.
inplace: boolean (default: False)
If inplace=True, makes the changes to the current object,
otherwise returns a new instance.
Returns
-------
Modified DAG: pgmpy.base.DAG
A new instance of DAG modified by the do-operator
Examples
--------
Initialize a DAG
>>> graph = DAG()
>>> graph.add_edges_from([('X', 'A'),
... ('A', 'Y'),
... ('A', 'B')])
>>> # Applying the do-operator will return a new DAG with the desired structure.
>>> graph_do_A = graph.do('A')
>>> # Which we can verify is missing the edges we would expect.
>>> graph_do_A.edges
OutEdgeView([('A', 'B'), ('A', 'Y')])
References
----------
Causality: Models, Reasoning, and Inference, Judea Pearl (2000). p.70.
"""
dag = self if inplace else self.copy()
if isinstance(nodes, (str, int)):
nodes = [nodes]
else:
nodes = list(nodes)
if not set(nodes).issubset(set(self.nodes())):
raise ValueError(
f"Nodes not found in the model: {set(nodes) - set(self.nodes)}"
)
for node in nodes:
parents = list(dag.predecessors(node))
for parent in parents:
dag.remove_edge(parent, node)
return dag
[docs]
def get_ancestral_graph(self, nodes):
"""
Returns the ancestral graph of the given `nodes`. The ancestral graph only
contains the nodes which are ancestors of atleast one of the variables in
node.
Parameters
----------
node: iterable
List of nodes whose ancestral graph needs to be computed.
Returns
-------
Ancestral Graph: pgmpy.base.DAG
Examples
--------
>>> from pgmpy.base import DAG
>>> dag = DAG([('A', 'C'), ('B', 'C'), ('D', 'A'), ('D', 'B')])
>>> anc_dag = dag.get_ancestral_graph(nodes=['A', 'B'])
>>> anc_dag.edges()
OutEdgeView([('D', 'A'), ('D', 'B')])
"""
return self.subgraph(nodes=self._get_ancestors_of(nodes=nodes))
[docs]
def to_daft(
self,
node_pos="circular",
latex=True,
pgm_params={},
edge_params={},
node_params={},
):
"""
Returns a daft (https://docs.daft-pgm.org/en/latest/) object which can be rendered for
publication quality plots. The returned object's render method can be called to see the plots.
Parameters
----------
node_pos: str or dict (default: circular)
If str: Must be one of the following: circular, kamada_kawai, planar, random, shell, sprint,
spectral, spiral. Please refer: https://networkx.org/documentation/stable//reference/drawing.html#module-networkx.drawing.layout for details on these layouts.
If dict should be of the form {node: (x coordinate, y coordinate)} describing the x and y coordinate of each
node.
If no argument is provided uses circular layout.
latex: boolean
Whether to use latex for rendering the node names.
pgm_params: dict (optional)
Any additional parameters that need to be passed to `daft.PGM` initializer.
Should be of the form: {param_name: param_value}
edge_params: dict (optional)
Any additional edge parameters that need to be passed to `daft.add_edge` method.
Should be of the form: {(u1, v1): {param_name: param_value}, (u2, v2): {...} }
node_params: dict (optional)
Any additional node parameters that need to be passed to `daft.add_node` method.
Should be of the form: {node1: {param_name: param_value}, node2: {...} }
Returns
-------
Daft object: daft.PGM object
Daft object for plotting the DAG.
Examples
--------
>>> from pgmpy.base import DAG
>>> dag = DAG([('a', 'b'), ('b', 'c'), ('d', 'c')])
>>> dag.to_daft(node_pos={'a': (0, 0), 'b': (1, 0), 'c': (2, 0), 'd': (1, 1)})
<daft.PGM at 0x7fc756e936d0>
>>> dag.to_daft(node_pos="circular")
<daft.PGM at 0x7f9bb48c5eb0>
>>> dag.to_daft(node_pos="circular", pgm_params={'observed_style': 'inner'})
<daft.PGM at 0x7f9bb48b0bb0>
>>> dag.to_daft(node_pos="circular",
... edge_params={('a', 'b'): {'label': 2}},
... node_params={'a': {'shape': 'rectangle'}})
<daft.PGM at 0x7f9bb48b0bb0>
"""
try:
from daft import PGM
except ImportError as e:
raise ImportError(
"Package daft required. Please visit: https://docs.daft-pgm.org/en/latest/ for installation instructions."
)
if isinstance(node_pos, str):
supported_layouts = {
"circular": nx.circular_layout,
"kamada_kawai": nx.kamada_kawai_layout,
"planar": nx.planar_layout,
"random": nx.random_layout,
"shell": nx.shell_layout,
"spring": nx.spring_layout,
"spectral": nx.spectral_layout,
"spiral": nx.spiral_layout,
}
if node_pos not in supported_layouts.keys():
raise ValueError(
"Unknown node_pos argument. Please refer docstring for accepted values"
)
else:
node_pos = supported_layouts[node_pos](self)
elif isinstance(node_pos, dict):
for node in self.nodes():
if node not in node_pos.keys():
raise ValueError(f"No position specified for {node}.")
else:
raise ValueError(
"Argument node_pos not valid. Please refer to the docstring."
)
daft_pgm = PGM(**pgm_params)
for node in self.nodes():
try:
extra_params = node_params[node]
except KeyError:
extra_params = dict()
if latex:
daft_pgm.add_node(
node,
rf"${node}$",
node_pos[node][0],
node_pos[node][1],
observed=True,
**extra_params,
)
else:
daft_pgm.add_node(
node,
f"{node}",
node_pos[node][0],
node_pos[node][1],
observed=True,
**extra_params,
)
for u, v in self.edges():
try:
extra_params = edge_params[(u, v)]
except KeyError:
extra_params = dict()
daft_pgm.add_edge(u, v, **extra_params)
return daft_pgm
[docs]
@staticmethod
def get_random(n_nodes=5, edge_prob=0.5, node_names=None, latents=False):
"""
Returns a randomly generated DAG with `n_nodes` number of nodes with
edge probability being `edge_prob`.
Parameters
----------
n_nodes: int
The number of nodes in the randomly generated DAG.
edge_prob: float
The probability of edge between any two nodes in the topologically
sorted DAG.
node_names: list (default: None)
A list of variables names to use in the random graph.
If None, the node names are integer values starting from 0.
latents: bool (default: False)
If True, includes latent variables in the generated DAG.
Returns
-------
Random DAG: pgmpy.base.DAG
The randomly generated DAG.
Examples
--------
>>> from pgmpy.base import DAG
>>> random_dag = DAG.get_random(n_nodes=10, edge_prob=0.3)
>>> random_dag.nodes()
NodeView((0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
>>> random_dag.edges()
OutEdgeView([(0, 6), (1, 6), (1, 7), (7, 9), (2, 5), (2, 7), (2, 8), (5, 9), (3, 7)])
"""
# Step 1: Generate a matrix of 0 and 1. Prob of choosing 1 = edge_prob
adj_mat = np.random.choice(
[0, 1], size=(n_nodes, n_nodes), p=[1 - edge_prob, edge_prob]
)
# Step 2: Use the upper triangular part of the matrix as adjacency.
if node_names is None:
node_names = list(range(n_nodes))
adj_pd = pd.DataFrame(
np.triu(adj_mat, k=1), columns=node_names, index=node_names
)
nx_dag = nx.from_pandas_adjacency(adj_pd, create_using=nx.DiGraph)
dag = DAG(nx_dag)
dag.add_nodes_from(node_names)
if latents:
dag.latents = set(
np.random.choice(
dag.nodes(), np.random.randint(low=0, high=len(dag.nodes()))
)
)
return dag
[docs]
def to_graphviz(self):
"""
Retuns a pygraphviz object for the DAG. pygraphviz is useful for
visualizing the network structure.
Examples
--------
>>> from pgmpy.utils import get_example_model
>>> model = get_example_model('alarm')
>>> model.to_graphviz()
<AGraph <Swig Object of type 'Agraph_t *' at 0x7fdea4cde040>>
>>> model.draw('model.png', prog='neato')
"""
return nx.nx_agraph.to_agraph(self)
[docs]
class PDAG(nx.DiGraph):
"""
Class for representing PDAGs (also known as CPDAG). PDAGs are the equivalence classes of
DAGs and contain both directed and undirected edges.
Note: In this class, undirected edges are represented using two edges in both direction i.e.
an undirected edge between X - Y is represented using X -> Y and X <- Y.
"""
def __init__(self, directed_ebunch=[], undirected_ebunch=[], latents=[]):
"""
Initializes a PDAG class.
Parameters
----------
directed_ebunch: list, array-like of 2-tuples
List of directed edges in the PDAG.
undirected_ebunch: list, array-like of 2-tuples
List of undirected edges in the PDAG.
latents: list, array-like
List of nodes which are latent variables.
Returns
-------
An instance of the PDAG object.
Examples
--------
"""
super(PDAG, self).__init__(
directed_ebunch
+ undirected_ebunch
+ [(Y, X) for (X, Y) in undirected_ebunch]
)
self.latents = set(latents)
self.directed_edges = set(directed_ebunch)
self.undirected_edges = set(undirected_ebunch)
# TODO: Fix the cycle issue
# import pdb; pdb.set_trace()
# try:
# # Filter out undirected edges as they also form a cycle in
# # themself when represented using directed edges.
# cycles = filter(lambda t: len(t) > 2, nx.simple_cycles(self))
# if cycles:
# out_str = "Cycles are not allowed in a PDAG. "
# out_str += "The following path forms a loop: "
# out_str += "".join(["({u},{v}) ".format(u=u, v=v) for (u, v) in cycles])
# raise ValueError(out_str)
# except nx.NetworkXNoCycle:
# pass
[docs]
def copy(self):
"""
Returns a copy of the object instance.
Returns
-------
Copy of PDAG: pgmpy.dag.PDAG
Returns a copy of self.
"""
return PDAG(
directed_ebunch=list(self.directed_edges.copy()),
undirected_ebunch=list(self.undirected_edges.copy()),
latents=self.latents,
)
[docs]
def to_dag(self, required_edges=[]):
"""
Returns one possible DAG which is represented using the PDAG.
Parameters
----------
required_edges: list, array-like of 2-tuples
The list of edges that should be included in the DAG.
Returns
-------
Returns an instance of DAG.
Examples
--------
"""
# Add required edges if it doesn't form a new v-structure or an opposite edge
# is already present in the network.
dag = DAG()
# Add all the nodes and the directed edges
dag.add_nodes_from(self.nodes())
dag.add_edges_from(self.directed_edges)
dag.latents = self.latents
pdag = self.copy()
while pdag.number_of_nodes() > 0:
# find node with (1) no directed outgoing edges and
# (2) the set of undirected neighbors is either empty or
# undirected neighbors + parents of X are a clique
found = False
for X in pdag.nodes():
directed_outgoing_edges = set(pdag.successors(X)) - set(
pdag.predecessors(X)
)
undirected_neighbors = set(pdag.successors(X)) & set(
pdag.predecessors(X)
)
neighbors_are_clique = all(
(
pdag.has_edge(Y, Z)
for Z in pdag.predecessors(X)
for Y in undirected_neighbors
if not Y == Z
)
)
if not directed_outgoing_edges and (
not undirected_neighbors or neighbors_are_clique
):
found = True
# add all edges of X as outgoing edges to dag
for Y in pdag.predecessors(X):
dag.add_edge(Y, X)
pdag.remove_node(X)
break
if not found:
logger.warning(
"PDAG has no faithful extension (= no oriented DAG with the "
+ "same v-structures as PDAG). Remaining undirected PDAG edges "
+ "oriented arbitrarily."
)
for X, Y in pdag.edges():
if not dag.has_edge(Y, X):
try:
dag.add_edge(X, Y)
except ValueError:
pass
break
return dag
[docs]
def to_graphviz(self):
"""
Retuns a pygraphviz object for the DAG. pygraphviz is useful for
visualizing the network structure.
Examples
--------
>>> from pgmpy.utils import get_example_model
>>> model = get_example_model('alarm')
>>> model.to_graphviz()
<AGraph <Swig Object of type 'Agraph_t *' at 0x7fdea4cde040>>
"""
return nx.nx_agraph.to_agraph(self)