Source code for pgmpy.estimators.BayesianEstimator

# -*- coding: utf-8 -*-

import numbers
from itertools import chain
from warnings import warn

import numpy as np
from joblib import Parallel, delayed

from pgmpy.estimators import ParameterEstimator
from pgmpy.factors.discrete import TabularCPD
from pgmpy.models import BayesianNetwork


[docs]class BayesianEstimator(ParameterEstimator): def __init__(self, model, data, **kwargs): """ Class used to compute parameters for a model using Bayesian Parameter Estimation. See `MaximumLikelihoodEstimator` for constructor parameters. """ if not isinstance(model, BayesianNetwork): raise NotImplementedError( "Bayesian Parameter Estimation is only implemented for BayesianNetwork" ) elif len(model.latents) != 0: raise ValueError( f"Bayesian Parameter Estimation works only on models with all observed variables. Found latent variables: {model.latents}" ) super(BayesianEstimator, self).__init__(model, data, **kwargs)
[docs] def get_parameters( self, prior_type="BDeu", equivalent_sample_size=5, pseudo_counts=None, n_jobs=-1, weighted=False, ): """ Method to estimate the model parameters (CPDs). Parameters ---------- prior_type: 'dirichlet', 'BDeu', or 'K2' string indicting which type of prior to use for the model parameters. - If 'prior_type' is 'dirichlet', the following must be provided: 'pseudo_counts' = dirichlet hyperparameters; a single number or a dict containing, for each variable, a 2-D array of the shape (node_card, product of parents_card) with a "virtual" count for each variable state in the CPD, that is added to the state counts. (lexicographic ordering of states assumed) - If 'prior_type' is 'BDeu', then an 'equivalent_sample_size' must be specified instead of 'pseudo_counts'. This is equivalent to 'prior_type=dirichlet' and using uniform 'pseudo_counts' of `equivalent_sample_size/(node_cardinality*np.prod(parents_cardinalities))` for each node. 'equivalent_sample_size' can either be a numerical value or a dict that specifies the size for each variable separately. - A prior_type of 'K2' is a shorthand for 'dirichlet' + setting every pseudo_count to 1, regardless of the cardinality of the variable. weighted: bool If weighted=True, the data must contain a `_weight` column specifying the weight of each datapoint (row). If False, assigns an equal weight to each datapoint. Returns ------- parameters: list List of TabularCPDs, one for each variable of the model Examples -------- >>> import numpy as np >>> import pandas as pd >>> from pgmpy.models import BayesianNetwork >>> from pgmpy.estimators import BayesianEstimator >>> values = pd.DataFrame(np.random.randint(low=0, high=2, size=(1000, 4)), ... columns=['A', 'B', 'C', 'D']) >>> model = BayesianNetwork([('A', 'B'), ('C', 'B'), ('C', 'D')]) >>> estimator = BayesianEstimator(model, values) >>> estimator.get_parameters(prior_type='BDeu', equivalent_sample_size=5) [<TabularCPD representing P(C:2) at 0x7f7b534251d0>, <TabularCPD representing P(B:2 | C:2, A:2) at 0x7f7b4dfd4da0>, <TabularCPD representing P(A:2) at 0x7f7b4dfd4fd0>, <TabularCPD representing P(D:2 | C:2) at 0x7f7b4df822b0>] """ def _get_node_param(node): _equivalent_sample_size = ( equivalent_sample_size[node] if isinstance(equivalent_sample_size, dict) else equivalent_sample_size ) if isinstance(pseudo_counts, numbers.Real): _pseudo_counts = pseudo_counts else: _pseudo_counts = pseudo_counts[node] if pseudo_counts else None cpd = self.estimate_cpd( node, prior_type=prior_type, equivalent_sample_size=_equivalent_sample_size, pseudo_counts=_pseudo_counts, weighted=weighted, ) return cpd parameters = Parallel(n_jobs=n_jobs, prefer="threads")( delayed(_get_node_param)(node) for node in self.model.nodes() ) return parameters
[docs] def estimate_cpd( self, node, prior_type="BDeu", pseudo_counts=[], equivalent_sample_size=5, weighted=False, ): """ Method to estimate the CPD for a given variable. Parameters ---------- node: int, string (any hashable python object) The name of the variable for which the CPD is to be estimated. prior_type: 'dirichlet', 'BDeu', 'K2', string indicting which type of prior to use for the model parameters. - If 'prior_type' is 'dirichlet', the following must be provided: 'pseudo_counts' = dirichlet hyperparameters; a single number or 2-D array of shape (node_card, product of parents_card) with a "virtual" count for each variable state in the CPD. The virtual counts are added to the actual state counts found in the data. (if a list is provided, a lexicographic ordering of states is assumed) - If 'prior_type' is 'BDeu', then an 'equivalent_sample_size' must be specified instead of 'pseudo_counts'. This is equivalent to 'prior_type=dirichlet' and using uniform 'pseudo_counts' of `equivalent_sample_size/(node_cardinality*np.prod(parents_cardinalities))`. - A prior_type of 'K2' is a shorthand for 'dirichlet' + setting every pseudo_count to 1, regardless of the cardinality of the variable. weighted: bool If weighted=True, the data must contain a `_weight` column specifying the weight of each datapoint (row). If False, assigns an equal weight to each datapoint. Returns ------- CPD: TabularCPD The estimated CPD for `node`. Examples -------- >>> import pandas as pd >>> from pgmpy.models import BayesianNetwork >>> from pgmpy.estimators import BayesianEstimator >>> data = pd.DataFrame(data={'A': [0, 0, 1], 'B': [0, 1, 0], 'C': [1, 1, 0]}) >>> model = BayesianNetwork([('A', 'C'), ('B', 'C')]) >>> estimator = BayesianEstimator(model, data) >>> cpd_C = estimator.estimate_cpd('C', prior_type="dirichlet", ... pseudo_counts=[[1, 1, 1, 1], ... [2, 2, 2, 2]]) >>> print(cpd_C) ╒══════╤══════╤══════╤══════╤════════════════════╕ │ A │ A(0) │ A(0) │ A(1) │ A(1) │ ├──────┼──────┼──────┼──────┼────────────────────┤ │ B │ B(0) │ B(1) │ B(0) │ B(1) │ ├──────┼──────┼──────┼──────┼────────────────────┤ │ C(0) │ 0.25 │ 0.25 │ 0.5 │ 0.3333333333333333 │ ├──────┼──────┼──────┼──────┼────────────────────┤ │ C(1) │ 0.75 │ 0.75 │ 0.5 │ 0.6666666666666666 │ ╘══════╧══════╧══════╧══════╧════════════════════╛ """ node_cardinality = len(self.state_names[node]) parents = sorted(self.model.get_parents(node)) parents_cardinalities = [len(self.state_names[parent]) for parent in parents] cpd_shape = (node_cardinality, np.prod(parents_cardinalities, dtype=int)) prior_type = prior_type.lower() # Throw a warning if pseudo_count is specified without prior_type=dirichlet # cast to np.array first to use the array.size attribute, which returns 0 also for [[],[]] # (where len([[],[]]) evaluates to 2) if ( pseudo_counts is not None and np.array(pseudo_counts).size > 0 and (prior_type != "dirichlet") ): warn( f"pseudo count specified with {prior_type} prior. It will be ignored, use dirichlet prior for specifying pseudo_counts" ) if prior_type == "k2": pseudo_counts = np.ones(cpd_shape, dtype=int) elif prior_type == "bdeu": alpha = float(equivalent_sample_size) / ( node_cardinality * np.prod(parents_cardinalities) ) pseudo_counts = np.ones(cpd_shape, dtype=float) * alpha elif prior_type == "dirichlet": if isinstance(pseudo_counts, numbers.Real): pseudo_counts = np.ones(cpd_shape, dtype=int) * pseudo_counts else: pseudo_counts = np.array(pseudo_counts) if pseudo_counts.shape != cpd_shape: raise ValueError( f"The shape of pseudo_counts for the node: {node} must be of shape: {str(cpd_shape)}" ) else: raise ValueError("'prior_type' not specified") state_counts = self.state_counts(node, weighted=weighted) bayesian_counts = state_counts + pseudo_counts cpd = TabularCPD( node, node_cardinality, np.array(bayesian_counts), evidence=parents, evidence_card=parents_cardinalities, state_names={var: self.state_names[var] for var in chain([node], parents)}, ) cpd.normalize() return cpd