Source code for pgmpy.models.MarkovChain

#!/usr/bin/env python3
from collections import defaultdict

import numpy as np
from pandas import DataFrame
from scipy.linalg import eig

from pgmpy.factors.discrete import State
from pgmpy.global_vars import logger
from pgmpy.utils import sample_discrete


[docs] class MarkovChain(object): """ Class to represent a Markov Chain with multiple kernels for factored state space, along with methods to simulate a run. Examples -------- Create an empty Markov Chain: >>> from pgmpy.models import MarkovChain as MC >>> model = MC() And then add variables to it >>> model.add_variables_from(['intel', 'diff'], [2, 3]) Or directly create a Markov Chain from a list of variables and their cardinalities >>> model = MC(['intel', 'diff'], [2, 3]) Add transition models >>> intel_tm = {0: {0: 0.25, 1: 0.75}, 1: {0: 0.5, 1: 0.5}} >>> model.add_transition_model('intel', intel_tm) >>> diff_tm = {0: {0: 0.1, 1: 0.5, 2: 0.4}, 1: {0: 0.2, 1: 0.2, 2: 0.6 }, 2: {0: 0.7, 1: 0.15, 2: 0.15}} >>> model.add_transition_model('diff', diff_tm) Set a start state >>> from pgmpy.factors.discrete import State >>> model.set_start_state([State('intel', 0), State('diff', 2)]) Sample from it >>> model.sample(size=5) intel diff 0 0 2 1 1 0 2 0 1 3 1 0 4 0 2 """ def __init__(self, variables=None, card=None, start_state=None): """ Parameters ---------- variables: array-like iterable object A list of variables of the model. card: array-like iterable object A list of cardinalities of the variables. start_state: array-like iterable object List of tuples representing the starting states of the variables. """ if variables is None: variables = [] if card is None: card = [] if not hasattr(variables, "__iter__") or isinstance(variables, str): raise ValueError("variables must be a non-string iterable.") if not hasattr(card, "__iter__") or isinstance(card, str): raise ValueError("card must be a non-string iterable.") self.variables = variables self.cardinalities = {v: c for v, c in zip(variables, card)} self.transition_models = {var: {} for var in variables} if start_state is None or self._check_state(start_state): self.state = start_state
[docs] def set_start_state(self, start_state): """ Set the start state of the Markov Chain. If the start_state is given as an array-like iterable, its contents are reordered in the internal representation. Parameters ---------- start_state: dict or array-like iterable object Dict (or list) of tuples representing the starting states of the variables. Examples -------- >>> from pgmpy.models import MarkovChain as MC >>> from pgmpy.factors.discrete import State >>> model = MC(['a', 'b'], [2, 2]) >>> model.set_start_state([State('a', 0), State('b', 1)]) """ if start_state is not None: if not hasattr(start_state, "__iter__") or isinstance(start_state, str): raise ValueError("start_state must be a non-string iterable.") # Must be an array-like iterable. Reorder according to self.variables. state_dict = {var: st for var, st in start_state} start_state = [State(var, state_dict[var]) for var in self.variables] if start_state is None or self._check_state(start_state): self.state = start_state
def _check_state(self, state): """ Checks if a list representing the state of the variables is valid. """ if not hasattr(state, "__iter__") or isinstance(state, str): raise ValueError("Start state must be a non-string iterable object.") state_vars = {s.var for s in state} if not state_vars == set(self.variables): raise ValueError( f"Start state must represent a complete assignment to all variables." f"Expected variables in state: {state_vars}, Got: {set(self.variables)}." ) for var, val in state: if val >= self.cardinalities[var]: raise ValueError(f"Assignment {val} to {var} invalid.") return True
[docs] def add_variable(self, variable, card=0): """ Add a variable to the model. Parameters ---------- variable: any hashable python object card: int Representing the cardinality of the variable to be added. Examples -------- >>> from pgmpy.models import MarkovChain as MC >>> model = MC() >>> model.add_variable('x', 4) """ if variable not in self.variables: self.variables.append(variable) else: logger.warning(f"Variable {variable} already exists.") self.cardinalities[variable] = card self.transition_models[variable] = {}
[docs] def add_variables_from(self, variables, cards): """ Add several variables to the model at once. Parameters ---------- variables: array-like iterable object List of variables to be added. cards: array-like iterable object List of cardinalities of the variables to be added. Examples -------- >>> from pgmpy.models import MarkovChain as MC >>> model = MC() >>> model.add_variables_from(['x', 'y'], [3, 4]) """ for var, card in zip(variables, cards): self.add_variable(var, card)
[docs] def add_transition_model(self, variable, transition_model): """ Adds a transition model for a particular variable. Parameters ---------- variable: any hashable python object must be an existing variable of the model. transition_model: dict or 2d array dict representing valid transition probabilities defined for every possible state of the variable. array represent a square matrix where every row sums to 1, array[i,j] indicates the transition probalities from State i to State j Examples -------- >>> from pgmpy.models import MarkovChain as MC >>> model = MC() >>> model.add_variable('grade', 3) >>> grade_tm = {0: {0: 0.1, 1: 0.5, 2: 0.4}, 1: {0: 0.2, 1: 0.2, 2: 0.6 }, 2: {0: 0.7, 1: 0.15, 2: 0.15}} >>> grade_tm_matrix = np.array([[0.1, 0.5, 0.4], [0.2, 0.2, 0.6], [0.7, 0.15, 0.15]]) >>> model.add_transition_model('grade', grade_tm) >>> model.add_transition_model('grade', grade_tm_matrix) """ if isinstance(transition_model, list): transition_model = np.array(transition_model, dtype=float) # check if the transition model is valid if not isinstance(transition_model, dict): if not isinstance(transition_model, np.ndarray): raise ValueError("Transition model must be a dict or numpy array") elif len(transition_model.shape) != 2: raise ValueError( f"Transition model must be 2d array.given {transition_model.shape}" ) elif transition_model.shape[0] != transition_model.shape[1]: raise ValueError( f"Dimension mismatch {transition_model.shape[0]}!={transition_model.shape[1]}" ) else: # convert the matrix to dict size = transition_model.shape[0] transition_model = dict( ( i, dict( (j, float(transition_model[i][j])) for j in range(0, size) ), ) for i in range(0, size) ) exp_states = set(range(self.cardinalities[variable])) tm_states = set(transition_model.keys()) if not exp_states == tm_states: raise ValueError( f"Transitions must be defined for all states of variable {variable}. Expected states: {exp_states}, Got: {tm_states}." ) for _, transition in transition_model.items(): if not isinstance(transition, dict): raise ValueError("Each transition must be a dict.") prob_sum = 0 for _, prob in transition.items(): if prob < 0 or prob > 1: raise ValueError( "Transitions must represent valid probability weights." ) prob_sum += prob if not np.allclose(prob_sum, 1): raise ValueError("Transition probabilities must sum to 1.") self.transition_models[variable] = transition_model
[docs] def sample(self, start_state=None, size=1, seed=None): """ Sample from the Markov Chain. Parameters ---------- start_state: dict or array-like iterable Representing the starting states of the variables. If None is passed, a random start_state is chosen. size: int Number of samples to be generated. Returns ------- pandas.DataFrame Examples -------- >>> from pgmpy.models import MarkovChain as MC >>> from pgmpy.factors.discrete import State >>> model = MC(['intel', 'diff'], [2, 3]) >>> model.set_start_state([State('intel', 0), State('diff', 2)]) >>> intel_tm = {0: {0: 0.25, 1: 0.75}, 1: {0: 0.5, 1: 0.5}} >>> model.add_transition_model('intel', intel_tm) >>> diff_tm = {0: {0: 0.1, 1: 0.5, 2: 0.4}, 1: {0: 0.2, 1: 0.2, 2: 0.6 }, 2: {0: 0.7, 1: 0.15, 2: 0.15}} >>> model.add_transition_model('diff', diff_tm) >>> model.sample(size=5) intel diff 0 0 2 1 1 0 2 0 1 3 1 0 4 0 2 """ if start_state is None: if self.state is None: self.state = self.random_state() # else use previously-set state else: self.set_start_state(start_state) sampled = DataFrame(index=range(size), columns=self.variables) sampled.loc[0] = [st for var, st in self.state] var_states = defaultdict(dict) var_values = defaultdict(dict) samples = defaultdict(dict) for var in self.transition_models.keys(): for st in self.transition_models[var]: var_states[var][st] = list(self.transition_models[var][st].keys()) var_values[var][st] = list(self.transition_models[var][st].values()) samples[var][st] = sample_discrete( var_states[var][st], var_values[var][st], size=size, seed=seed ) for i in range(size - 1): for j, (var, st) in enumerate(self.state): next_st = samples[var][st][i] self.state[j] = State(var, next_st) sampled.loc[i + 1] = [st for var, st in self.state] return sampled
[docs] def prob_from_sample(self, state, sample=None, window_size=None): """ Given an instantiation (partial or complete) of the variables of the model, compute the probability of observing it over multiple windows in a given sample. If 'sample' is not passed as an argument, generate the statistic by sampling from the Markov Chain, starting with a random initial state. Examples -------- >>> from pgmpy.models.MarkovChain import MarkovChain as MC >>> from pgmpy.factors.discrete import State >>> model = MC(['intel', 'diff'], [3, 2]) >>> intel_tm = {0: {0: 0.2, 1: 0.4, 2:0.4}, 1: {0: 0, 1: 0.5, 2: 0.5}, 2: {2: 0.5, 1:0.5}} >>> model.add_transition_model('intel', intel_tm) >>> diff_tm = {0: {0: 0.5, 1: 0.5}, 1: {0: 0.25, 1:0.75}} >>> model.add_transition_model('diff', diff_tm) >>> model.prob_from_sample([State('diff', 0)]) array([ 0.27, 0.4 , 0.18, 0.23, ..., 0.29]) """ if sample is None: # generate sample of size 10000 sample = self.sample(self.random_state(), size=10000) if window_size is None: window_size = len(sample) // 100 # default window size is 100 windows = len(sample) // window_size probabilities = np.zeros(windows) for i in range(windows): for j in range(window_size): ind = i * window_size + j state_eq = [sample.loc[ind, v] == s for v, s in state] if all(state_eq): probabilities[i] += 1 return probabilities / window_size
[docs] def generate_sample(self, start_state=None, size=1, seed=None): """ Generator version of self.sample Returns ------- List of State namedtuples, representing the assignment to all variables of the model. Examples -------- >>> from pgmpy.models.MarkovChain import MarkovChain >>> from pgmpy.factors.discrete import State >>> model = MarkovChain() >>> model.add_variables_from(['intel', 'diff'], [3, 2]) >>> intel_tm = {0: {0: 0.2, 1: 0.4, 2:0.4}, 1: {0: 0, 1: 0.5, 2: 0.5}, 2: {0: 0.3, 1: 0.3, 2: 0.4}} >>> model.add_transition_model('intel', intel_tm) >>> diff_tm = {0: {0: 0.5, 1: 0.5}, 1: {0: 0.25, 1:0.75}} >>> model.add_transition_model('diff', diff_tm) >>> gen = model.generate_sample([State('intel', 0), State('diff', 0)], 2) >>> [sample for sample in gen] [[State(var='intel', state=2), State(var='diff', state=1)], [State(var='intel', state=2), State(var='diff', state=0)]] """ if start_state is None: if self.state is None: self.state = self.random_state() # else use previously-set state else: self.set_start_state(start_state) # sampled.loc[0] = [self.state[var] for var in self.variables] for i in range(size): for j, (var, st) in enumerate(self.state): next_st = sample_discrete( list(self.transition_models[var][st].keys()), list(self.transition_models[var][st].values()), seed=seed, )[0] self.state[j] = State(var, next_st) yield self.state[:]
[docs] def is_stationarity(self, tolerance=0.2, sample=None): """ Checks if the given markov chain is stationary and checks the steady state probability values for the state are consistent. Parameters ---------- tolerance: float represents the diff between actual steady state value and the computed value sample: [State(i,j)] represents the list of state which the markov chain has sampled Returns ------- Boolean: True, if the markov chain converges to steady state distribution within the tolerance False, if the markov chain does not converge to steady state distribution within tolerance Examples -------- >>> from pgmpy.models.MarkovChain import MarkovChain >>> from pgmpy.factors.discrete import State >>> model = MarkovChain() >>> model.add_variables_from(['intel', 'diff'], [3, 2]) >>> intel_tm = {0: {0: 0.2, 1: 0.4, 2:0.4}, 1: {0: 0, 1: 0.5, 2: 0.5}, 2: {0: 0.3, 1: 0.3, 2: 0.4}} >>> model.add_transition_model('intel', intel_tm) >>> diff_tm = {0: {0: 0.5, 1: 0.5}, 1: {0: 0.25, 1:0.75}} >>> model.add_transition_model('diff', diff_tm) >>> model.is_stationarity() True """ keys = self.transition_models.keys() return_val = True for k in keys: # convert dict to numpy matrix transition_mat = np.array( [ np.array(list(self.transition_models[k][i].values())) for i in self.transition_models[k].keys() ], dtype=float, ) S, U = eig(transition_mat.T) stationary = np.array(U[:, np.where(np.abs(S - 1.0) < 1e-8)[0][0]].flat) stationary = (stationary / np.sum(stationary)).real probabilities = [] window_size = 10000 if sample is None else len(sample) for i in range(0, transition_mat.shape[0]): probabilities.extend( self.prob_from_sample([State(k, i)], window_size=window_size) ) if any( np.abs(i) > tolerance for i in np.subtract(probabilities, stationary) ): return_val = return_val and False else: return_val = return_val and True return return_val
[docs] def random_state(self): """ Generates a random state of the Markov Chain. Returns ------- List of namedtuples, representing a random assignment to all variables of the model. Examples -------- >>> from pgmpy.models import MarkovChain as MC >>> model = MC(['intel', 'diff'], [2, 3]) >>> model.random_state() [State(var='diff', state=2), State(var='intel', state=1)] """ return [ State(var, np.random.randint(self.cardinalities[var])) for var in self.variables ]
[docs] def copy(self): """ Returns a copy of Markov Chain Model. Returns ------- MarkovChain : Copy of MarkovChain. Examples -------- >>> from pgmpy.models import MarkovChain >>> from pgmpy.factors.discrete import State >>> model = MarkovChain() >>> model.add_variables_from(['intel', 'diff'], [3, 2]) >>> intel_tm = {0: {0: 0.2, 1: 0.4, 2:0.4}, 1: {0: 0, 1: 0.5, 2: 0.5}, 2: {0: 0.3, 1: 0.3, 2: 0.4}} >>> model.add_transition_model('intel', intel_tm) >>> diff_tm = {0: {0: 0.5, 1: 0.5}, 1: {0: 0.25, 1:0.75}} >>> model.add_transition_model('diff', diff_tm) >>> model.set_start_state([State('intel', 0), State('diff', 1)]) >>> model_copy = model.copy() >>> model_copy.transition_models >>> {'diff': {0: {0: 0.1, 1: 0.5, 2: 0.4}, 1: {0: 0.2, 1: 0.2, 2: 0.6}, 2: {0: 0.7, 1: 0.15, 2: 0.15}}, ... 'intel': {0: {0: 0.25, 1: 0.75}, 1: {0: 0.5, 1: 0.5}}} """ markovchain_copy = MarkovChain( variables=list(self.cardinalities.keys()), card=list(self.cardinalities.values()), start_state=self.state, ) if self.transition_models: markovchain_copy.transition_models = self.transition_models.copy() return markovchain_copy