Source code for pgmpy.estimators.base

#!/usr/bin/env python

from collections import defaultdict

import numpy as np

from pgmpy.factors import FactorDict
from pgmpy.factors.discrete import DiscreteFactor
from pgmpy.inference.ExactInference import BeliefPropagation
from pgmpy.utils import build_state_names, get_state_counts, preprocess_data


[docs] class BaseEstimator: """ Base class for estimators in pgmpy; `ParameterEstimator`, `StructureEstimator` and `StructureScore` derive from this class. Parameters ---------- data: pandas DataFrame object object where each column represents one variable. (If some values in the data are missing the data cells should be set to `numpy.nan`. Note that pandas converts each column containing `numpy.nan`s to dtype `float`.) state_names: dict (optional) A dict indicating, for each variable, the discrete set of states (or values) that the variable can take. If unspecified, the observed values in the data set are taken to be the only possible states. """ def __init__(self, data=None, state_names=None): if data is None: self.data = None self.dtypes = None else: self.data, self.dtypes = preprocess_data(data) # data can be None in the case when learning structure from # independence conditions. Look into PC.py. if self.data is not None: self.variables = list(self.data.columns.values) self.state_names = build_state_names(self.data, state_names=state_names)
[docs] def state_counts( self, variable, parents=[], weighted=False, reindex=True, ): """ Return counts how often each state of 'variable' occurred in the data. If a list of parents is provided, counting is done conditionally for each state configuration of the parents. Parameters ---------- variable: string Name of the variable for which the state count is to be done. parents: list Optional list of variable parents, if conditional counting is desired. Order of parents in list is reflected in the returned DataFrame weighted: bool If True, data must have a `_weight` column specifying the weight of the datapoint (row). If False, each datapoint has a weight of `1`. reindex: bool If True, returns a data frame with all possible parents state combinations as the columns. If False, drops the state combinations which are not present in the data. Returns ------- state_counts: pandas.DataFrame Table with state counts for 'variable' Examples -------- >>> import pandas as pd >>> from pgmpy.estimators import BaseEstimator >>> data = pd.DataFrame( ... data={ ... "A": ["a1", "a1", "a2"], ... "B": ["b1", "b2", "b1"], ... "C": ["c1", "c1", "c2"], ... } ... ) >>> estimator = BaseEstimator(data) >>> estimator.state_counts(variable="A").values array([[2], [1]]) >>> estimator.state_counts(variable="C", parents=["A", "B"]).values array([[1., 1., 0., 0.], [0., 0., 1., 0.]]) """ return get_state_counts( data=self.data, state_names=self.state_names, variable=variable, parents=parents, weighted=weighted, reindex=reindex, )
[docs] class ParameterEstimator(BaseEstimator): """ Base class for parameter estimators in pgmpy. Parameters ---------- model: pgmpy.models.DiscreteBayesianNetwork or pgmpy.models.DiscreteMarkovNetwork model for which parameter estimation is to be done. data: pandas DataFrame object dataframe object with column names identical to the variable names of the model. (If some values in the data are missing the data cells should be set to `numpy.nan`. Note that pandas converts each column containing `numpy.nan`s to dtype `float`.) state_names: dict (optional) A dict indicating, for each variable, the discrete set of states (or values) that the variable can take. If unspecified, the observed values in the data set are taken to be the only possible states. """ def __init__(self, model, data, **kwargs): """ Base class for parameter estimators in pgmpy. Parameters ---------- model: pgmpy.models.DiscreteBayesianNetwork or pgmpy.models.DiscreteMarkovNetwork model for which parameter estimation is to be done. data: pandas DataFrame object dataframe object with column names identical to the variable names of the model. (If some values in the data are missing the data cells should be set to `numpy.nan`. Note that pandas converts each column containing `numpy.nan`s to dtype `float`.) state_names: dict (optional) A dict indicating, for each variable, the discrete set of states (or values) that the variable can take. If unspecified, the observed values in the data set are taken to be the only possible states. complete_samples_only: bool (optional, default `True`) Specifies how to deal with missing data, if present. If set to `True` all rows that contain `np.Nan` somewhere are ignored. If `False` then, for each variable, every row where neither the variable nor its parents are `np.nan` is used. This sets the behavior of the `state_count`-method. """ self.model = model super().__init__(data, **kwargs)
[docs] def state_counts(self, variable, weighted=False, **kwargs): """ Return counts how often each state of 'variable' occurred in the data. If the variable has parents, counting is done conditionally for each state configuration of the parents. Parameters ---------- variable: string Name of the variable for which the state count is to be done. Returns ------- state_counts: pandas.DataFrame Table with state counts for 'variable' Examples -------- >>> import pandas as pd >>> from pgmpy.models import DiscreteBayesianNetwork >>> from pgmpy.estimators import ParameterEstimator >>> model = DiscreteBayesianNetwork([("A", "C"), ("B", "C")]) >>> data = pd.DataFrame( ... data={ ... "A": ["a1", "a1", "a2"], ... "B": ["b1", "b2", "b1"], ... "C": ["c1", "c1", "c2"], ... } ... ) >>> estimator = ParameterEstimator(model, data) >>> estimator.state_counts(variable="A").values array([[2], [1]]) >>> estimator.state_counts(variable="C").values array([[1., 1., 0., 0.], [0., 0., 1., 0.]]) """ parents = sorted(self.model.get_parents(variable)) return super().state_counts(variable, parents=parents, weighted=weighted, **kwargs)
[docs] class StructureEstimator(BaseEstimator): """ Base class for structure estimators in pgmpy. Parameters ---------- data: pandas DataFrame object dataframe object where each column represents one variable. (If some values in the data are missing the data cells should be set to `numpy.nan`. Note that pandas converts each column containing `numpy.nan`s to dtype `float`.) state_names: dict (optional) A dict indicating, for each variable, the discrete set of states (or values) that the variable can take. If unspecified, the observed values in the data set are taken to be the only possible states. """ def __init__(self, data=None, independencies=None, **kwargs): self.independencies = independencies if self.independencies is not None: self.variables = self.independencies.get_all_variables() super().__init__(data=data, **kwargs)
[docs] def estimate(self): pass
[docs] class MarginalEstimator(BaseEstimator): """ Base class for marginal estimators in pgmpy. Parameters ---------- model: DiscreteMarkovNetwork | FactorGraph | JunctionTree A model to optimize, using Belief Propagation and an estimation method. data: pandas DataFrame object dataframe object where each column represents one variable. (If some values in the data are missing the data cells should be set to `numpy.nan`. Note that pandas converts each column containing `numpy.nan`s to dtype `float`.) state_names: dict (optional) A dict indicating, for each variable, the discrete set of states (or values) that the variable can take. If unspecified, the observed values in the data set are taken to be the only possible states. """ def __init__(self, model, data, **kwargs): super().__init__(data, **kwargs) self.belief_propagation = BeliefPropagation(model=model) self.theta = None @staticmethod def _clique_to_marginal(marginals, clique_nodes): """ Construct a minimal mapping from cliques to marginals. Parameters ---------- marginals: FactorDict A mapping from cliques to factors. clique_nodes: List[Tuple[str, ...]] Cliques that exist within a different FactorDict. Returns ------- clique_to_marginal: A mapping from clique to a list of marginals such that each clique is a super set of the marginals it is associated with. """ clique_to_marginal = defaultdict(lambda: []) for marginal_clique, marginal in marginals.items(): for clique in clique_nodes: if set(marginal_clique) <= set(clique): clique_to_marginal[clique].append(marginal) break else: raise ValueError( "Could not find a corresponding clique for" + f" marginal: {marginal_clique}" + f" out of cliques: {clique_nodes}" ) return clique_to_marginal def _marginal_loss(self, marginals, clique_to_marginal, metric): """ Compute the loss and gradient for a given dictionary of clique beliefs. Parameters ---------- marginals: FactorDict A mapping from a clique to an observed marginal represented by a `DiscreteFactor`. clique_to_marginal: Dict[Tuple[str, ...], List[DiscreteFactor]] A mapping from a Junction Tree's clique to a list of corresponding marginals such that a clique is a superset of the marginal with the constraint that each marginal only appears once across all cliques. metric: str One of either 'L1' or 'L2'. Returns ------- Loss and gradient of the loss: Tuple[float, pgmpy.factors.FactorDict.FactorDict] Marginal loss and the gradients of the loss with respect to the estimated beliefs. """ loss = 0.0 gradient = FactorDict({}) for clique, mu in marginals.items(): # Initialize a gradient for this clique as zero. gradient[clique] = mu.identity_factor() * 0 # Iterate over all marginals involving this clique. for y in clique_to_marginal[clique]: # Step 1: Marginalize the clique to the size of `y`. projection_variables = list(set(mu.scope()) - set(y.scope())) mu2 = mu.marginalize( variables=projection_variables, inplace=False, ) if not isinstance(mu2, DiscreteFactor): raise TypeError(f"Expecting a DiscreteFactor but found {type(mu2)}") # Step 2: Compute the difference between the `mu2` and `y`. diff_factor = mu2 + (y * -1) if not diff_factor: raise ValueError("An error occured when calculating the gradient.") diff = diff_factor.values.flatten() # Step 3: Compute the loss and gradient based upon the metric. if metric == "L1": loss += abs(diff).sum() grad = diff.sign() if hasattr(diff, "sign") else np.sign(diff) elif metric == "L2": loss += 0.5 * (diff @ diff) grad = diff else: raise ValueError("Metric must be one of L1 or L2.") # Step 4: Update the gradient from this marginal. gradient[clique] += DiscreteFactor( variables=mu2.scope(), cardinality=mu2.cardinality, values=grad, state_names=mu2.state_names, ) return loss, gradient
[docs] def estimate(self): pass