#!/usr/bin/env python3
import itertools
import networkx as nx
import numpy as np
import pandas as pd
from pgmpy.base import UndirectedGraph
from pgmpy.global_vars import logger
from pgmpy.independencies import Independencies
[docs]
class DAG(nx.DiGraph):
"""
Base class for all Directed Graphical Models.
Each node in the graph can represent either a random variable, `Factor`,
or a cluster of random variables. Edges in the graph represent the
dependencies between these.
Parameters
----------
data: input graph
Data to initialize graph. If data=None (default) an empty graph is
created. The data can be an edge list or any Networkx graph object.
Examples
--------
Create an empty DAG with no nodes and no edges
>>> from pgmpy.base import DAG
>>> G = DAG()
G can be grown in several ways:
**Nodes:**
Add one node at a time:
>>> G.add_node(node='a')
Add the nodes from any container (a list, set or tuple or the nodes
from another graph).
>>> G.add_nodes_from(nodes=['a', 'b'])
**Edges:**
G can also be grown by adding edges.
Add one edge,
>>> G.add_edge(u='a', v='b')
a list of edges,
>>> G.add_edges_from(ebunch=[('a', 'b'), ('b', 'c')])
If some edges connect nodes not yet in the model, the nodes
are added automatically. There are no errors when adding
nodes or edges that already exist.
**Shortcuts:**
Many common graph features allow python syntax for speed reporting.
>>> 'a' in G # check if node in graph
True
>>> len(G) # number of nodes in graph
3
"""
def __init__(self, ebunch=None, latents=set()):
super(DAG, self).__init__(ebunch)
self.latents = set(latents)
cycles = []
try:
cycles = list(nx.find_cycle(self))
except nx.NetworkXNoCycle:
pass
else:
out_str = "Cycles are not allowed in a DAG."
out_str += "\nEdges indicating the path taken for a loop: "
out_str += "".join([f"({u},{v}) " for (u, v) in cycles])
raise ValueError(out_str)
[docs]
def add_node(self, node, weight=None, latent=False):
"""
Adds a single node to the Graph.
Parameters
----------
node: str, int, or any hashable python object.
The node to add to the graph.
weight: int, float
The weight of the node.
latent: boolean (default: False)
Specifies whether the variable is latent or not.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_node(node='A')
>>> sorted(G.nodes())
['A']
Adding a node with some weight.
>>> G.add_node(node='B', weight=0.3)
The weight of these nodes can be accessed as:
>>> G.nodes['B']
{'weight': 0.3}
>>> G.nodes['A']
{'weight': None}
"""
# Check for networkx 2.0 syntax
if isinstance(node, tuple) and len(node) == 2 and isinstance(node[1], dict):
node, attrs = node
if attrs.get("weight", None) is not None:
attrs["weight"] = weight
else:
attrs = {"weight": weight}
if latent:
self.latents.add(node)
super(DAG, self).add_node(node, weight=weight)
[docs]
def add_nodes_from(self, nodes, weights=None, latent=False):
"""
Add multiple nodes to the Graph.
**The behviour of adding weights is different than in networkx.
Parameters
----------
nodes: iterable container
A container of nodes (list, dict, set, or any hashable python
object).
weights: list, tuple (default=None)
A container of weights (int, float). The weight value at index i
is associated with the variable at index i.
latent: list, tuple (default=False)
A container of boolean. The value at index i tells whether the
node at index i is latent or not.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_nodes_from(nodes=['A', 'B', 'C'])
>>> G.nodes()
NodeView(('A', 'B', 'C'))
Adding nodes with weights:
>>> G.add_nodes_from(nodes=['D', 'E'], weights=[0.3, 0.6])
>>> G.nodes['D']
{'weight': 0.3}
>>> G.nodes['E']
{'weight': 0.6}
>>> G.nodes['A']
{'weight': None}
"""
nodes = list(nodes)
if isinstance(latent, bool):
latent = [latent] * len(nodes)
if weights:
if len(nodes) != len(weights):
raise ValueError(
"The number of elements in nodes and weights" "should be equal."
)
for index in range(len(nodes)):
self.add_node(
node=nodes[index], weight=weights[index], latent=latent[index]
)
else:
for index in range(len(nodes)):
self.add_node(node=nodes[index], latent=latent[index])
[docs]
def add_edge(self, u, v, weight=None):
"""
Add an edge between u and v.
The nodes u and v will be automatically added if they are
not already in the graph.
Parameters
----------
u, v : nodes
Nodes can be any hashable Python object.
weight: int, float (default=None)
The weight of the edge
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_nodes_from(nodes=['Alice', 'Bob', 'Charles'])
>>> G.add_edge(u='Alice', v='Bob')
>>> G.nodes()
NodeView(('Alice', 'Bob', 'Charles'))
>>> G.edges()
OutEdgeView([('Alice', 'Bob')])
When the node is not already present in the graph:
>>> G.add_edge(u='Alice', v='Ankur')
>>> G.nodes()
NodeView(('Alice', 'Ankur', 'Bob', 'Charles'))
>>> G.edges()
OutEdgeView([('Alice', 'Bob'), ('Alice', 'Ankur')])
Adding edges with weight:
>>> G.add_edge('Ankur', 'Maria', weight=0.1)
>>> G.edge['Ankur']['Maria']
{'weight': 0.1}
"""
super(DAG, self).add_edge(u, v, weight=weight)
[docs]
def add_edges_from(self, ebunch, weights=None):
"""
Add all the edges in ebunch.
If nodes referred in the ebunch are not already present, they
will be automatically added. Node names can be any hashable python
object.
**The behavior of adding weights is different than networkx.
Parameters
----------
ebunch : container of edges
Each edge given in the container will be added to the graph.
The edges must be given as 2-tuples (u, v).
weights: list, tuple (default=None)
A container of weights (int, float). The weight value at index i
is associated with the edge at index i.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_nodes_from(nodes=['Alice', 'Bob', 'Charles'])
>>> G.add_edges_from(ebunch=[('Alice', 'Bob'), ('Bob', 'Charles')])
>>> G.nodes()
NodeView(('Alice', 'Bob', 'Charles'))
>>> G.edges()
OutEdgeView([('Alice', 'Bob'), ('Bob', 'Charles')])
When the node is not already in the model:
>>> G.add_edges_from(ebunch=[('Alice', 'Ankur')])
>>> G.nodes()
NodeView(('Alice', 'Bob', 'Charles', 'Ankur'))
>>> G.edges()
OutEdgeView([('Alice', 'Bob'), ('Bob', 'Charles'), ('Alice', 'Ankur')])
Adding edges with weights:
>>> G.add_edges_from([('Ankur', 'Maria'), ('Maria', 'Mason')],
... weights=[0.3, 0.5])
>>> G.edge['Ankur']['Maria']
{'weight': 0.3}
>>> G.edge['Maria']['Mason']
{'weight': 0.5}
"""
ebunch = list(ebunch)
if weights:
if len(ebunch) != len(weights):
raise ValueError(
"The number of elements in ebunch and weights" "should be equal"
)
for index in range(len(ebunch)):
self.add_edge(ebunch[index][0], ebunch[index][1], weight=weights[index])
else:
for edge in ebunch:
self.add_edge(edge[0], edge[1])
[docs]
def get_parents(self, node):
"""
Returns a list of parents of node.
Throws an error if the node is not present in the graph.
Parameters
----------
node: string, int or any hashable python object.
The node whose parents would be returned.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG(ebunch=[('diff', 'grade'), ('intel', 'grade')])
>>> G.get_parents(node='grade')
['diff', 'intel']
"""
return list(self.predecessors(node))
[docs]
def moralize(self):
"""
Removes all the immoralities in the DAG and creates a moral
graph (UndirectedGraph).
A v-structure X->Z<-Y is an immorality if there is no directed edge
between X and Y.
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG(ebunch=[('diff', 'grade'), ('intel', 'grade')])
>>> moral_graph = G.moralize()
>>> moral_graph.edges()
EdgeView([('intel', 'grade'), ('intel', 'diff'), ('grade', 'diff')])
"""
moral_graph = UndirectedGraph()
moral_graph.add_nodes_from(self.nodes())
moral_graph.add_edges_from(self.to_undirected().edges())
for node in self.nodes():
moral_graph.add_edges_from(
itertools.combinations(self.get_parents(node), 2)
)
return moral_graph
[docs]
def get_leaves(self):
"""
Returns a list of leaves of the graph.
Examples
--------
>>> from pgmpy.base import DAG
>>> graph = DAG([('A', 'B'), ('B', 'C'), ('B', 'D')])
>>> graph.get_leaves()
['C', 'D']
"""
return [node for node, out_degree in self.out_degree_iter() if out_degree == 0]
[docs]
def out_degree_iter(self, nbunch=None, weight=None):
if nx.__version__.startswith("1"):
return super(DAG, self).out_degree_iter(nbunch, weight)
else:
return iter(self.out_degree(nbunch, weight))
[docs]
def in_degree_iter(self, nbunch=None, weight=None):
if nx.__version__.startswith("1"):
return super(DAG, self).in_degree_iter(nbunch, weight)
else:
return iter(self.in_degree(nbunch, weight))
[docs]
def get_roots(self):
"""
Returns a list of roots of the graph.
Examples
--------
>>> from pgmpy.base import DAG
>>> graph = DAG([('A', 'B'), ('B', 'C'), ('B', 'D'), ('E', 'B')])
>>> graph.get_roots()
['A', 'E']
"""
return [
node for node, in_degree in dict(self.in_degree()).items() if in_degree == 0
]
[docs]
def get_children(self, node):
"""
Returns a list of children of node.
Throws an error if the node is not present in the graph.
Parameters
----------
node: string, int or any hashable python object.
The node whose children would be returned.
Examples
--------
>>> from pgmpy.base import DAG
>>> g = DAG(ebunch=[('A', 'B'), ('C', 'B'), ('B', 'D'),
('B', 'E'), ('B', 'F'), ('E', 'G')])
>>> g.get_children(node='B')
['D', 'E', 'F']
"""
return list(self.successors(node))
[docs]
def get_independencies(self, latex=False, include_latents=False):
"""
Computes independencies in the DAG, by checking minimal d-seperation.
Parameters
----------
latex: boolean
If latex=True then latex string of the independence assertion
would be created.
include_latents: boolean
If True, includes latent variables in the independencies. Otherwise,
only generates independencies on observed variables.
Examples
--------
>>> from pgmpy.base import DAG
>>> chain = DAG([('X', 'Y'), ('Y', 'Z')])
>>> chain.get_independencies()
(X \u27C2 Z | Y)
"""
nodes = set(self.nodes())
if not include_latents:
nodes -= self.latents
independencies = Independencies()
for x, y in itertools.combinations(nodes, 2):
if not self.has_edge(x, y) and not self.has_edge(y, x):
minimal_separator = self.minimal_dseparator(
start=x, end=y, include_latents=include_latents
)
if minimal_separator is not None:
independencies.add_assertions([x, y, minimal_separator])
independencies = independencies.reduce()
if not latex:
return independencies
else:
return independencies.latex_string()
[docs]
def local_independencies(self, variables):
"""
Returns an instance of Independencies containing the local independencies
of each of the variables.
Parameters
----------
variables: str or array like
variables whose local independencies are to be found.
Examples
--------
>>> from pgmpy.base import DAG
>>> student = DAG()
>>> student.add_edges_from([('diff', 'grade'), ('intel', 'grade'),
>>> ('grade', 'letter'), ('intel', 'SAT')])
>>> ind = student.local_independencies('grade')
>>> ind
(grade \u27C2 SAT | diff, intel)
"""
independencies = Independencies()
for variable in (
variables if isinstance(variables, (list, tuple)) else [variables]
):
non_descendents = (
set(self.nodes())
- {variable}
- set(nx.dfs_preorder_nodes(self, variable))
)
parents = set(self.get_parents(variable))
if non_descendents - parents:
independencies.add_assertions(
[variable, non_descendents - parents, parents]
)
return independencies
[docs]
def is_iequivalent(self, model):
"""
Checks whether the given model is I-equivalent
Two graphs G1 and G2 are said to be I-equivalent if they have same skeleton
and have same set of immoralities.
Parameters
----------
model : A DAG object, for which you want to check I-equivalence
Returns
--------
I-equivalence: boolean
True if both are I-equivalent, False otherwise
Examples
--------
>>> from pgmpy.base import DAG
>>> G = DAG()
>>> G.add_edges_from([('V', 'W'), ('W', 'X'),
... ('X', 'Y'), ('Z', 'Y')])
>>> G1 = DAG()
>>> G1.add_edges_from([('W', 'V'), ('X', 'W'),
... ('X', 'Y'), ('Z', 'Y')])
>>> G.is_iequivalent(G1)
True
"""
if not isinstance(model, DAG):
raise TypeError(
f"Model must be an instance of DAG. Got type: {type(model)}"
)
if (self.to_undirected().edges() == model.to_undirected().edges()) and (
self.get_immoralities() == model.get_immoralities()
):
return True
return False
[docs]
def get_immoralities(self):
"""
Finds all the immoralities in the model
A v-structure X -> Z <- Y is an immorality if there is no direct edge between X and Y .
Returns
-------
Immoralities: set
A set of all the immoralities in the model
Examples
---------
>>> from pgmpy.base import DAG
>>> student = DAG()
>>> student.add_edges_from([('diff', 'grade'), ('intel', 'grade'),
... ('intel', 'SAT'), ('grade', 'letter')])
>>> student.get_immoralities()
{('diff', 'intel')}
"""
immoralities = dict()
for node in self.nodes():
parent_pairs = []
for parents in itertools.combinations(self.predecessors(node), 2):
if not self.has_edge(parents[0], parents[1]) and not self.has_edge(
parents[1], parents[0]
):
parent_pairs.append(tuple(sorted(parents)))
immoralities[node] = parent_pairs
return immoralities
[docs]
def is_dconnected(self, start, end, observed=None, include_latents=False):
"""
Returns True if there is an active trail (i.e. d-connection) between
`start` and `end` node given that `observed` is observed.
Parameters
----------
start, end : int, str, any hashable python object.
The nodes in the DAG between which to check the d-connection/active trail.
observed : list, array-like (optional)
If given the active trail would be computed assuming these nodes to
be observed.
include_latents: boolean (default: False)
If true, latent variables are return as part of the active trail.
Examples
--------
>>> from pgmpy.base import DAG
>>> student = DAG()
>>> student.add_nodes_from(['diff', 'intel', 'grades', 'letter', 'sat'])
>>> student.add_edges_from([('diff', 'grades'), ('intel', 'grades'), ('grades', 'letter'),
... ('intel', 'sat')])
>>> student.is_dconnected('diff', 'intel')
False
>>> student.is_dconnected('grades', 'sat')
True
"""
if (
end
in self.active_trail_nodes(
variables=start, observed=observed, include_latents=include_latents
)[start]
):
return True
else:
return False
[docs]
def minimal_dseparator(self, start, end, include_latents=False):
"""
Finds the minimal d-separating set for `start` and `end`.
Parameters
----------
start: node
The first node.
end: node
The second node.
include_latents: boolean (default: False)
If true, latent variables are consider for minimal d-seperator.
Examples
--------
>>> dag = DAG([('A', 'B'), ('B', 'C')])
>>> dag.minimal_dseparator(start='A', end='C')
{'B'}
References
----------
[1] Algorithm 4, Page 10: Tian, Jin, Azaria Paz, and Judea Pearl. Finding minimal d-separators. Computer Science Department, University of California, 1998.
"""
if (end in self.neighbors(start)) or (start in self.neighbors(end)):
raise ValueError(
"No possible separators because start and end are adjacent"
)
an_graph = self.get_ancestral_graph([start, end])
separator = set(
itertools.chain(self.predecessors(start), self.predecessors(end))
)
if not include_latents:
# If any of the parents were latents, take the latent's parent
while len(separator.intersection(self.latents)) != 0:
separator_copy = separator.copy()
for u in separator:
if u in self.latents:
separator_copy.remove(u)
separator_copy.update(set(self.predecessors(u)))
separator = separator_copy
# Remove the start and end nodes in case it reaches there while removing latents.
separator.difference_update({start, end})
# If the initial set is not able to d-separate, no d-separator is possible.
if an_graph.is_dconnected(start, end, observed=separator):
return None
# Go through the separator set, remove one element and check if it remains
# a dseparating set.
minimal_separator = separator.copy()
for u in separator:
if not an_graph.is_dconnected(start, end, observed=minimal_separator - {u}):
minimal_separator.remove(u)
return minimal_separator
[docs]
def get_markov_blanket(self, node):
"""
Returns a markov blanket for a random variable. In the case
of Bayesian Networks, the markov blanket is the set of
node's parents, its children and its children's other parents.
Returns
-------
Markov Blanket: list
List of nodes in the markov blanket of `node`.
Parameters
----------
node: string, int or any hashable python object.
The node whose markov blanket would be returned.
Examples
--------
>>> from pgmpy.base import DAG
>>> from pgmpy.factors.discrete import TabularCPD
>>> G = DAG([('x', 'y'), ('z', 'y'), ('y', 'w'), ('y', 'v'), ('u', 'w'),
('s', 'v'), ('w', 't'), ('w', 'm'), ('v', 'n'), ('v', 'q')])
>>> G.get_markov_blanket('y')
['s', 'w', 'x', 'u', 'z', 'v']
"""
children = self.get_children(node)
parents = self.get_parents(node)
blanket_nodes = children + parents
for child_node in children:
blanket_nodes.extend(self.get_parents(child_node))
blanket_nodes = set(blanket_nodes)
blanket_nodes.discard(node)
return list(blanket_nodes)
[docs]
def active_trail_nodes(self, variables, observed=None, include_latents=False):
"""
Returns a dictionary with the given variables as keys and all the nodes reachable
from that respective variable as values.
Parameters
----------
variables: str or array like
variables whose active trails are to be found.
observed : List of nodes (optional)
If given the active trails would be computed assuming these nodes to be
observed.
include_latents: boolean (default: False)
Whether to include the latent variables in the returned active trail nodes.
Examples
--------
>>> from pgmpy.base import DAG
>>> student = DAG()
>>> student.add_nodes_from(['diff', 'intel', 'grades'])
>>> student.add_edges_from([('diff', 'grades'), ('intel', 'grades')])
>>> student.active_trail_nodes('diff')
{'diff': {'diff', 'grades'}}
>>> student.active_trail_nodes(['diff', 'intel'], observed='grades')
{'diff': {'diff', 'intel'}, 'intel': {'diff', 'intel'}}
References
----------
Details of the algorithm can be found in 'Probabilistic Graphical Model
Principles and Techniques' - Koller and Friedman
Page 75 Algorithm 3.1
"""
if observed:
if isinstance(observed, set):
observed = list(observed)
observed_list = (
observed if isinstance(observed, (list, tuple)) else [observed]
)
else:
observed_list = []
ancestors_list = self._get_ancestors_of(observed_list)
# Direction of flow of information
# up -> from parent to child
# down -> from child to parent
active_trails = {}
for start in variables if isinstance(variables, list) else [variables]:
visit_list = set()
visit_list.add((start, "up"))
traversed_list = set()
active_nodes = set()
while visit_list:
node, direction = visit_list.pop()
if (node, direction) not in traversed_list:
if node not in observed_list:
active_nodes.add(node)
traversed_list.add((node, direction))
if direction == "up" and node not in observed_list:
for parent in self.predecessors(node):
visit_list.add((parent, "up"))
for child in self.successors(node):
visit_list.add((child, "down"))
elif direction == "down":
if node not in observed_list:
for child in self.successors(node):
visit_list.add((child, "down"))
if node in ancestors_list:
for parent in self.predecessors(node):
visit_list.add((parent, "up"))
if include_latents:
active_trails[start] = active_nodes
else:
active_trails[start] = active_nodes - self.latents
return active_trails
def _get_ancestors_of(self, nodes):
"""
Returns a dictionary of all ancestors of all the observed nodes including the
node itself.
Parameters
----------
nodes: string, list-type
name of all the observed nodes
Examples
--------
>>> from pgmpy.base import DAG
>>> model = DAG([('D', 'G'), ('I', 'G'), ('G', 'L'),
... ('I', 'L')])
>>> model._get_ancestors_of('G')
{'D', 'G', 'I'}
>>> model._get_ancestors_of(['G', 'I'])
{'D', 'G', 'I'}
"""
if not isinstance(nodes, (list, tuple)):
nodes = [nodes]
for node in nodes:
if node not in self.nodes():
raise ValueError(f"Node {node} not in graph")
ancestors_list = set()
nodes_list = set(nodes)
while nodes_list:
node = nodes_list.pop()
if node not in ancestors_list:
nodes_list.update(self.predecessors(node))
ancestors_list.add(node)
return ancestors_list
# TODO: Commented out till the method is implemented.
# def to_pdag(self):
# """
# Returns the PDAG (the equivalence class of DAG; also known as CPDAG) of the DAG.
#
# Returns
# -------
# Partially oriented DAG: pgmpy.base.PDAG
# An instance of pgmpy.base.PDAG.
#
# Examples
# --------
#
# """
# pass
[docs]
def do(self, nodes, inplace=False):
"""
Applies the do operator to the graph and returns a new DAG with the
transformed graph.
The do-operator, do(X = x) has the effect of removing all edges from
the parents of X and setting X to the given value x.
Parameters
----------
nodes : list, array-like
The names of the nodes to apply the do-operator for.
inplace: boolean (default: False)
If inplace=True, makes the changes to the current object,
otherwise returns a new instance.
Returns
-------
Modified DAG: pgmpy.base.DAG
A new instance of DAG modified by the do-operator
Examples
--------
Initialize a DAG
>>> graph = DAG()
>>> graph.add_edges_from([('X', 'A'),
... ('A', 'Y'),
... ('A', 'B')])
>>> # Applying the do-operator will return a new DAG with the desired structure.
>>> graph_do_A = graph.do('A')
>>> # Which we can verify is missing the edges we would expect.
>>> graph_do_A.edges
OutEdgeView([('A', 'B'), ('A', 'Y')])
References
----------
Causality: Models, Reasoning, and Inference, Judea Pearl (2000). p.70.
"""
dag = self if inplace else self.copy()
if isinstance(nodes, (str, int)):
nodes = [nodes]
else:
nodes = list(nodes)
if not set(nodes).issubset(set(self.nodes())):
raise ValueError(
f"Nodes not found in the model: {set(nodes) - set(self.nodes)}"
)
for node in nodes:
parents = list(dag.predecessors(node))
for parent in parents:
dag.remove_edge(parent, node)
return dag
[docs]
def get_ancestral_graph(self, nodes):
"""
Returns the ancestral graph of the given `nodes`. The ancestral graph only
contains the nodes which are ancestors of atleast one of the variables in
node.
Parameters
----------
node: iterable
List of nodes whose ancestral graph needs to be computed.
Returns
-------
Ancestral Graph: pgmpy.base.DAG
Examples
--------
>>> from pgmpy.base import DAG
>>> dag = DAG([('A', 'C'), ('B', 'C'), ('D', 'A'), ('D', 'B')])
>>> anc_dag = dag.get_ancestral_graph(nodes=['A', 'B'])
>>> anc_dag.edges()
OutEdgeView([('D', 'A'), ('D', 'B')])
"""
return self.subgraph(nodes=self._get_ancestors_of(nodes=nodes))
[docs]
def to_daft(
self,
node_pos="circular",
latex=True,
pgm_params={},
edge_params={},
node_params={},
):
"""
Returns a daft (https://docs.daft-pgm.org/en/latest/) object which can be rendered for
publication quality plots. The returned object's render method can be called to see the plots.
Parameters
----------
node_pos: str or dict (default: circular)
If str: Must be one of the following: circular, kamada_kawai, planar, random, shell, sprint,
spectral, spiral. Please refer: https://networkx.org/documentation/stable//reference/drawing.html#module-networkx.drawing.layout for details on these layouts.
If dict should be of the form {node: (x coordinate, y coordinate)} describing the x and y coordinate of each
node.
If no argument is provided uses circular layout.
latex: boolean
Whether to use latex for rendering the node names.
pgm_params: dict (optional)
Any additional parameters that need to be passed to `daft.PGM` initializer.
Should be of the form: {param_name: param_value}
edge_params: dict (optional)
Any additional edge parameters that need to be passed to `daft.add_edge` method.
Should be of the form: {(u1, v1): {param_name: param_value}, (u2, v2): {...} }
node_params: dict (optional)
Any additional node parameters that need to be passed to `daft.add_node` method.
Should be of the form: {node1: {param_name: param_value}, node2: {...} }
Returns
-------
Daft object: daft.PGM object
Daft object for plotting the DAG.
Examples
--------
>>> from pgmpy.base import DAG
>>> dag = DAG([('a', 'b'), ('b', 'c'), ('d', 'c')])
>>> dag.to_daft(node_pos={'a': (0, 0), 'b': (1, 0), 'c': (2, 0), 'd': (1, 1)})
<daft.PGM at 0x7fc756e936d0>
>>> dag.to_daft(node_pos="circular")
<daft.PGM at 0x7f9bb48c5eb0>
>>> dag.to_daft(node_pos="circular", pgm_params={'observed_style': 'inner'})
<daft.PGM at 0x7f9bb48b0bb0>
>>> dag.to_daft(node_pos="circular",
... edge_params={('a', 'b'): {'label': 2}},
... node_params={'a': {'shape': 'rectangle'}})
<daft.PGM at 0x7f9bb48b0bb0>
"""
try:
from daft import PGM
except ImportError as e:
raise ImportError(
"Package daft required. Please visit: https://docs.daft-pgm.org/en/latest/ for installation instructions."
)
if isinstance(node_pos, str):
supported_layouts = {
"circular": nx.circular_layout,
"kamada_kawai": nx.kamada_kawai_layout,
"planar": nx.planar_layout,
"random": nx.random_layout,
"shell": nx.shell_layout,
"spring": nx.spring_layout,
"spectral": nx.spectral_layout,
"spiral": nx.spiral_layout,
}
if node_pos not in supported_layouts.keys():
raise ValueError(
"Unknown node_pos argument. Please refer docstring for accepted values"
)
else:
node_pos = supported_layouts[node_pos](self)
elif isinstance(node_pos, dict):
for node in self.nodes():
if node not in node_pos.keys():
raise ValueError(f"No position specified for {node}.")
else:
raise ValueError(
"Argument node_pos not valid. Please refer to the docstring."
)
daft_pgm = PGM(**pgm_params)
for node in self.nodes():
try:
extra_params = node_params[node]
except KeyError:
extra_params = dict()
if latex:
daft_pgm.add_node(
node,
rf"${node}$",
node_pos[node][0],
node_pos[node][1],
observed=True,
**extra_params,
)
else:
daft_pgm.add_node(
node,
f"{node}",
node_pos[node][0],
node_pos[node][1],
observed=True,
**extra_params,
)
for u, v in self.edges():
try:
extra_params = edge_params[(u, v)]
except KeyError:
extra_params = dict()
daft_pgm.add_edge(u, v, **extra_params)
return daft_pgm
[docs]
@staticmethod
def get_random(n_nodes=5, edge_prob=0.5, node_names=None, latents=False, seed=None):
"""
Returns a randomly generated DAG with `n_nodes` number of nodes with
edge probability being `edge_prob`.
Parameters
----------
n_nodes: int
The number of nodes in the randomly generated DAG.
edge_prob: float
The probability of edge between any two nodes in the topologically
sorted DAG.
node_names: list (default: None)
A list of variables names to use in the random graph.
If None, the node names are integer values starting from 0.
latents: bool (default: False)
If True, includes latent variables in the generated DAG.
seed: int (default: None)
The seed for the random number generator.
Returns
-------
Random DAG: pgmpy.base.DAG
The randomly generated DAG.
Examples
--------
>>> from pgmpy.base import DAG
>>> random_dag = DAG.get_random(n_nodes=10, edge_prob=0.3)
>>> random_dag.nodes()
NodeView((0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
>>> random_dag.edges()
OutEdgeView([(0, 6), (1, 6), (1, 7), (7, 9), (2, 5), (2, 7), (2, 8), (5, 9), (3, 7)])
"""
# Step 1: Generate a matrix of 0 and 1. Prob of choosing 1 = edge_prob
gen = np.random.default_rng(seed=seed)
adj_mat = gen.choice(
[0, 1], size=(n_nodes, n_nodes), p=[1 - edge_prob, edge_prob]
)
# Step 2: Use the upper triangular part of the matrix as adjacency.
if node_names is None:
node_names = list(range(n_nodes))
adj_pd = pd.DataFrame(
np.triu(adj_mat, k=1), columns=node_names, index=node_names
)
nx_dag = nx.from_pandas_adjacency(adj_pd, create_using=nx.DiGraph)
dag = DAG(nx_dag)
dag.add_nodes_from(node_names)
if latents:
dag.latents = set(
gen.choice(dag.nodes(), gen.integers(low=0, high=len(dag.nodes())))
)
return dag
[docs]
def to_graphviz(self):
"""
Retuns a pygraphviz object for the DAG. pygraphviz is useful for
visualizing the network structure.
Examples
--------
>>> from pgmpy.utils import get_example_model
>>> model = get_example_model('alarm')
>>> model.to_graphviz()
<AGraph <Swig Object of type 'Agraph_t *' at 0x7fdea4cde040>>
>>> model.draw('model.png', prog='neato')
"""
return nx.nx_agraph.to_agraph(self)
[docs]
def fit(self, data, estimator=None, state_names=[], n_jobs=1, **kwargs):
"""
Estimates the CPD for each variable based on a given data set.
Parameters
----------
data: pandas DataFrame object
DataFrame object with column names identical to the variable names of the network.
(If some values in the data are missing the data cells should be set to `numpy.nan`.
Note that pandas converts each column containing `numpy.nan`s to dtype `float`.)
estimator: Estimator class
One of:
- MaximumLikelihoodEstimator (default)
- BayesianEstimator: In this case, pass 'prior_type' and either 'pseudo_counts'
or 'equivalent_sample_size' as additional keyword arguments.
See `BayesianEstimator.get_parameters()` for usage.
- ExpectationMaximization
state_names: dict (optional)
A dict indicating, for each variable, the discrete set of states
that the variable can take. If unspecified, the observed values
in the data set are taken to be the only possible states.
n_jobs: int (default: 1)
Number of threads/processes to use for estimation. Using n_jobs > 1
for small models or datasets might be slower.
Returns
-------
Fitted Model: BayesianNetwork
Returns a BayesianNetwork object with learned CPDs.
The DAG structure is preserved, and parameters (CPDs) are added.
This allows the DAG to represent both the structure and the parameters of a Bayesian Network.
Examples
--------
>>> import pandas as pd
>>> from pgmpy.models import BayesianNetwork
>>> from pgmpy.base import DAG
>>> data = pd.DataFrame(data={'A': [0, 0, 1], 'B': [0, 1, 0], 'C': [1, 1, 0]})
>>> model = DAG([('A', 'C'), ('B', 'C')])
>>> fitted_model = model.fit(data)
>>> fitted_model.get_cpds()
[<TabularCPD representing P(A:2) at 0x17945372c30>,
<TabularCPD representing P(B:2) at 0x17945a19760>,
<TabularCPD representing P(C:2 | A:2, B:2) at 0x17944f42690>]
"""
from pgmpy.estimators import BaseEstimator, MaximumLikelihoodEstimator
from pgmpy.models import BayesianNetwork
if isinstance(self, BayesianNetwork):
bn = self
else:
bn = BayesianNetwork(self.edges())
if estimator is None:
estimator = MaximumLikelihoodEstimator
else:
if not issubclass(estimator, BaseEstimator):
raise TypeError("Estimator object should be a valid pgmpy estimator.")
_estimator = estimator(
bn,
data,
state_names=state_names,
)
cpds_list = _estimator.get_parameters(n_jobs=n_jobs, **kwargs)
bn.add_cpds(*cpds_list)
return bn
[docs]
class PDAG(nx.DiGraph):
"""
Class for representing PDAGs (also known as CPDAG). PDAGs are the equivalence classes of
DAGs and contain both directed and undirected edges.
Note: In this class, undirected edges are represented using two edges in both direction i.e.
an undirected edge between X - Y is represented using X -> Y and X <- Y.
"""
def __init__(self, directed_ebunch=[], undirected_ebunch=[], latents=[]):
"""
Initializes a PDAG class.
Parameters
----------
directed_ebunch: list, array-like of 2-tuples
List of directed edges in the PDAG.
undirected_ebunch: list, array-like of 2-tuples
List of undirected edges in the PDAG.
latents: list, array-like
List of nodes which are latent variables.
Returns
-------
An instance of the PDAG object.
Examples
--------
"""
super(PDAG, self).__init__(
directed_ebunch
+ undirected_ebunch
+ [(Y, X) for (X, Y) in undirected_ebunch]
)
self.latents = set(latents)
self.directed_edges = set(directed_ebunch)
self.undirected_edges = set(undirected_ebunch)
# TODO: Fix the cycle issue
# import pdb; pdb.set_trace()
# try:
# # Filter out undirected edges as they also form a cycle in
# # themself when represented using directed edges.
# cycles = filter(lambda t: len(t) > 2, nx.simple_cycles(self))
# if cycles:
# out_str = "Cycles are not allowed in a PDAG. "
# out_str += "The following path forms a loop: "
# out_str += "".join(["({u},{v}) ".format(u=u, v=v) for (u, v) in cycles])
# raise ValueError(out_str)
# except nx.NetworkXNoCycle:
# pass
[docs]
def copy(self):
"""
Returns a copy of the object instance.
Returns
-------
Copy of PDAG: pgmpy.dag.PDAG
Returns a copy of self.
"""
return PDAG(
directed_ebunch=list(self.directed_edges.copy()),
undirected_ebunch=list(self.undirected_edges.copy()),
latents=self.latents,
)
[docs]
def to_dag(self, required_edges=[]):
"""
Returns one possible DAG which is represented using the PDAG.
Parameters
----------
required_edges: list, array-like of 2-tuples
The list of edges that should be included in the DAG.
Returns
-------
Returns an instance of DAG.
Examples
--------
"""
# Add required edges if it doesn't form a new v-structure or an opposite edge
# is already present in the network.
dag = DAG()
# Add all the nodes and the directed edges
dag.add_nodes_from(self.nodes())
dag.add_edges_from(self.directed_edges)
dag.latents = self.latents
pdag = self.copy()
while pdag.number_of_nodes() > 0:
# find node with (1) no directed outgoing edges and
# (2) the set of undirected neighbors is either empty or
# undirected neighbors + parents of X are a clique
found = False
for X in pdag.nodes():
directed_outgoing_edges = set(pdag.successors(X)) - set(
pdag.predecessors(X)
)
undirected_neighbors = set(pdag.successors(X)) & set(
pdag.predecessors(X)
)
neighbors_are_clique = all(
(
pdag.has_edge(Y, Z)
for Z in pdag.predecessors(X)
for Y in undirected_neighbors
if not Y == Z
)
)
if not directed_outgoing_edges and (
not undirected_neighbors or neighbors_are_clique
):
found = True
# add all edges of X as outgoing edges to dag
for Y in pdag.predecessors(X):
dag.add_edge(Y, X)
pdag.remove_node(X)
break
if not found:
logger.warning(
"PDAG has no faithful extension (= no oriented DAG with the "
+ "same v-structures as PDAG). Remaining undirected PDAG edges "
+ "oriented arbitrarily."
)
for X, Y in pdag.edges():
if not dag.has_edge(Y, X):
try:
dag.add_edge(X, Y)
except ValueError:
pass
break
return dag
[docs]
def to_graphviz(self):
"""
Retuns a pygraphviz object for the DAG. pygraphviz is useful for
visualizing the network structure.
Examples
--------
>>> from pgmpy.utils import get_example_model
>>> model = get_example_model('alarm')
>>> model.to_graphviz()
<AGraph <Swig Object of type 'Agraph_t *' at 0x7fdea4cde040>>
"""
return nx.nx_agraph.to_agraph(self)