Source code for pgmpy.estimators.EM

from itertools import chain, product
from math import log

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from tqdm.auto import tqdm

from pgmpy import config
from pgmpy.estimators import MaximumLikelihoodEstimator, ParameterEstimator
from pgmpy.factors.discrete import TabularCPD
from pgmpy.models import BayesianNetwork


class ExpectationMaximization(ParameterEstimator):
    """
    Class used to compute parameters for a model using Expectation
    Maximization (EM).

    EM is an iterative algorithm commonly used for estimation when there are
    latent variables in the model. The algorithm iteratively improves the
    parameter estimates by maximizing the likelihood of the given data.

    Parameters
    ----------
    model: A pgmpy.models.BayesianNetwork instance

    data: pandas DataFrame object
        DataFrame object with column names identical to the variable names of
        the network. (If some values in the data are missing, the data cells
        should be set to `numpy.NaN`. Note that pandas converts each column
        containing `numpy.NaN`s to dtype `float`.)

    state_names: dict (optional)
        A dict indicating, for each variable, the discrete set of states that
        the variable can take. If unspecified, the observed values in the data
        set are taken to be the only possible states.

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from pgmpy.models import BayesianNetwork
    >>> from pgmpy.estimators import ExpectationMaximization
    >>> data = pd.DataFrame(np.random.randint(low=0, high=2, size=(1000, 5)),
    ...                     columns=['A', 'B', 'C', 'D', 'E'])
    >>> model = BayesianNetwork([('A', 'B'), ('C', 'B'), ('C', 'D'), ('B', 'E')])
    >>> estimator = ExpectationMaximization(model, data)
    """

    def __init__(self, model, data, **kwargs):
        if not isinstance(model, BayesianNetwork):
            raise NotImplementedError(
                "Expectation Maximization is only implemented for BayesianNetwork"
            )

        super(ExpectationMaximization, self).__init__(model, data, **kwargs)
        self.model_copy = self.model.copy()

    def _get_log_likelihood(self, datapoint):
        """
        Computes the log-likelihood of a given datapoint. Goes through each
        CPD matching the combination of states to get the value and sums
        their logs.
        """
        likelihood = 0
        for cpd in self.model_copy.cpds:
            scope = set(cpd.scope())
            likelihood += log(
                max(
                    cpd.get_value(
                        **{
                            key: value
                            for key, value in datapoint.items()
                            if key in scope
                        }
                    ),
                    1e-10,
                )
            )
        return likelihood

    def _parallel_compute_weights(
        self, data_unique, latent_card, n_counts, offset, batch_size
    ):
        cache = []
        for i in range(offset, min(offset + batch_size, data_unique.shape[0])):
            # Expand row i once per combination of latent variable states.
            v = list(product(*[range(card) for card in latent_card.values()]))
            latent_combinations = np.array(v, dtype=int)
            df = data_unique.iloc[[i] * latent_combinations.shape[0]].reset_index(
                drop=True
            )
            for index, latent_var in enumerate(latent_card.keys()):
                df[latent_var] = latent_combinations[:, index]

            # Normalize the per-row likelihoods and scale by the number of
            # occurrences of the original datapoint.
            weights = np.e ** (
                df.apply(lambda t: self._get_log_likelihood(dict(t)), axis=1)
            )
            df["_weight"] = (weights / weights.sum()) * n_counts[
                tuple(data_unique.iloc[i])
            ]
            cache.append(df)

        return pd.concat(cache, copy=False)

    def _compute_weights(self, n_jobs, latent_card, batch_size):
        """
        For each data point, creates extra data points for each possible
        combination of states of the latent variables and assigns weights
        to each of them.
        """
        data_unique = self.data.drop_duplicates()
        n_counts = self.data.groupby(list(self.data.columns)).size().to_dict()

        cache = Parallel(n_jobs=n_jobs)(
            delayed(self._parallel_compute_weights)(
                data_unique, latent_card, n_counts, i, batch_size
            )
            for i in range(0, data_unique.shape[0], batch_size)
        )

        return pd.concat(cache, copy=False)

    def _is_converged(self, new_cpds, atol=1e-08):
        """
        Checks if the values of `new_cpds` are within the tolerance limits of
        the current model CPDs.
        """
        for cpd in new_cpds:
            if not cpd.__eq__(self.model_copy.get_cpds(node=cpd.scope()[0]), atol=atol):
                return False
        return True
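
    # A worked sketch of the E-step weighting above (an added note, not part
    # of the pgmpy source): for a unique datapoint d and a single binary
    # latent variable L, `_compute_weights` expands d into the two rows
    # (d, L=0) and (d, L=1) and gives each the normalized weight
    #
    #     w(L=l) = P(d, L=l) / (P(d, L=0) + P(d, L=1)),
    #
    # scaled by the number of times d occurs in the full dataset. Each joint
    # term P(d, L=l) is evaluated in log-space by `_get_log_likelihood` as
    # the sum of the log CPD entries matching the completed datapoint, with
    # each entry clipped at 1e-10 to avoid log(0).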
    def get_parameters(
        self,
        latent_card=None,
        max_iter=100,
        atol=1e-08,
        n_jobs=1,
        batch_size=1000,
        seed=None,
        init_cpds={},
        show_progress=True,
    ):
        """
        Method to estimate all model parameters (CPDs) using Expectation
        Maximization.

        Parameters
        ----------
        latent_card: dict (default: None)
            A dictionary of the form {latent_var: cardinality} specifying the
            cardinality (number of states) of each latent variable. If None,
            assumes `2` states for each latent variable.

        max_iter: int (default: 100)
            The maximum number of iterations the algorithm is allowed to run
            for. If max_iter is reached, returns the last value of the
            parameters.

        atol: float (default: 1e-08)
            The absolute accepted tolerance for checking convergence. If the
            change in parameters between iterations is less than atol, the
            algorithm will exit.

        n_jobs: int (default: 1)
            Number of jobs to run in parallel. Using n_jobs > 1 for small
            models or datasets might be slower.

        batch_size: int (default: 1000)
            Number of data points used to compute the weights in a batch.

        seed: int
            The random seed to use for generating the initial values.

        init_cpds: dict
            A dictionary of the form {variable: instance of TabularCPD}
            specifying the initial CPD values for the EM optimizer to start
            with. If not specified, CPDs involving latent variables are
            initialized randomly, and CPDs involving only observed variables
            are initialized with their MLE estimates.

        show_progress: boolean (default: True)
            Whether to show a progress bar for iterations.

        Returns
        -------
        Estimated parameters (CPDs): list
            A list of estimated CPDs for the model.

        Examples
        --------
        >>> import numpy as np
        >>> import pandas as pd
        >>> from pgmpy.models import BayesianNetwork
        >>> from pgmpy.estimators import ExpectationMaximization as EM
        >>> data = pd.DataFrame(np.random.randint(low=0, high=2, size=(1000, 3)),
        ...                     columns=['A', 'C', 'D'])
        >>> model = BayesianNetwork([('A', 'B'), ('C', 'B'), ('C', 'D')], latents={'B'})
        >>> estimator = EM(model, data)
        >>> estimator.get_parameters(latent_card={'B': 3})
        [<TabularCPD representing P(C:2) at 0x7f7b534251d0>,
        <TabularCPD representing P(B:3 | C:2, A:2) at 0x7f7b4dfd4da0>,
        <TabularCPD representing P(A:2) at 0x7f7b4dfd4fd0>,
        <TabularCPD representing P(D:2 | C:2) at 0x7f7b4df822b0>]
        """
        # Step 1: Parameter checks.
        if latent_card is None:
            latent_card = {var: 2 for var in self.model_copy.latents}

        # Step 2: Create structures/variables to be used later.
        n_states_dict = {key: len(value) for key, value in self.state_names.items()}
        n_states_dict.update(latent_card)
        for var in self.model_copy.latents:
            self.state_names[var] = list(range(n_states_dict[var]))

        # Step 3: Initialize CPDs.
        # Step 3.1: Learn the CPDs of variables which don't involve latent
        #           variables using MLE if their init_cpd is not specified.
        fixed_cpds = []
        fixed_cpd_vars = (
            set(self.model.nodes())
            - self.model.latents
            - set(chain(*[self.model.get_children(var) for var in self.model.latents]))
            - set(init_cpds.keys())
        )
        mle = MaximumLikelihoodEstimator.__new__(MaximumLikelihoodEstimator)
        mle.model = self.model
        mle.data = self.data
        mle.state_names = self.state_names

        for var in fixed_cpd_vars:
            fixed_cpds.append(mle.estimate_cpd(var))
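
        # Worked illustration of the split above (an added note, using the
        # get_parameters docstring example [('A', 'B'), ('C', 'B'),
        # ('C', 'D')] with latents={'B'}): A, C, and D have no latent
        # variable in their CPD scopes, so all three land in fixed_cpd_vars,
        # are estimated once by MLE, and stay fixed across iterations, while
        # B is initialized randomly below and re-estimated in every EM
        # iteration.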
        # Step 3.2: Randomly initialize the CPDs involving latent variables
        #           if init_cpds is not specified.
        latent_cpds = []
        vars_with_latents = (
            set(self.model_copy.nodes()) - fixed_cpd_vars - set(init_cpds.keys())
        )
        for node in vars_with_latents:
            parents = list(self.model_copy.predecessors(node))
            latent_cpds.append(
                TabularCPD.get_random(
                    variable=node,
                    evidence=parents,
                    cardinality={
                        var: n_states_dict[var] for var in chain([node], parents)
                    },
                    state_names={
                        var: self.state_names[var] for var in chain([node], parents)
                    },
                    seed=seed,
                )
            )
        self.model_copy.add_cpds(
            *list(chain(fixed_cpds, latent_cpds, list(init_cpds.values())))
        )

        if show_progress and config.SHOW_PROGRESS:
            pbar = tqdm(total=max_iter)

        mle.model = self.model_copy

        # Step 4: Run the EM algorithm.
        for _ in range(max_iter):
            # Step 4.1: E-step: Expands the dataset and computes the
            #           likelihood of each possible state of the latent
            #           variables.
            weighted_data = self._compute_weights(n_jobs, latent_card, batch_size)

            # Step 4.2: M-step: Uses the weights of the dataset to do a
            #           weighted MLE.
            new_cpds = fixed_cpds.copy()
            mle.data = weighted_data
            for var in vars_with_latents.union(set(init_cpds.keys())):
                new_cpds.append(mle.estimate_cpd(var, weighted=True))

            # Step 4.3: Check for convergence and max_iter.
            if self._is_converged(new_cpds, atol=atol):
                if show_progress and config.SHOW_PROGRESS:
                    pbar.close()
                return new_cpds
            else:
                self.model_copy.cpds = new_cpds

            if show_progress and config.SHOW_PROGRESS:
                pbar.update(1)

        return new_cpds
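

# A minimal usage sketch (an illustrative addition mirroring the docstring
# example, not part of the pgmpy source): EM fits a model in which 'B' is
# latent from data containing only the observed columns A, C, and D. The
# __main__ guard keeps the demo from running on import.
if __name__ == "__main__":
    data = pd.DataFrame(
        np.random.randint(low=0, high=2, size=(1000, 3)), columns=["A", "C", "D"]
    )
    model = BayesianNetwork([("A", "B"), ("C", "B"), ("C", "D")], latents={"B"})
    estimator = ExpectationMaximization(model, data)

    # Request 3 states for the latent B; a fixed seed makes the random CPD
    # initialization reproducible.
    cpds = estimator.get_parameters(latent_card={"B": 3}, seed=42)
    for cpd in cpds:
        print(cpd)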