Source code for pgmpy.structure_score.log_likelihood_cond_gauss

import numpy as np
import pandas as pd
from scipy.stats import multivariate_normal

from pgmpy.structure_score._base import BaseStructureScore


[docs] class LogLikelihoodCondGauss(BaseStructureScore): r""" Log-likelihood score for Bayesian networks with mixed discrete and continuous variables. This score is based on conditional Gaussian distributions [1]_ and supports local families with both discrete and continuous variables. For a continuous target :math:`C_1` with continuous parents :math:`C_2` and discrete parents :math:`D`, it computes .. math:: \ell(C_1 \mid C_2, D) = \sum_{t=1}^{n} \log \frac{p(c_{1t}, c_{2t} \mid d_t)}{p(c_{2t} \mid d_t)}. For a discrete target :math:`D_1` with continuous parents :math:`C` and discrete parents :math:`D_2`, it computes .. math:: \ell(D_1 \mid C, D_2) = \sum_{t=1}^{n} \log \frac{p(c_t \mid d_{1t}, d_{2t}) p(d_{1t}, d_{2t})} {p(c_t \mid d_{2t}) p(d_{2t})}. The Gaussian densities are estimated from the corresponding grouped samples. Parameters ---------- data : pandas.DataFrame DataFrame where columns may be discrete or continuous variables. state_names : dict, optional Dictionary mapping discrete variable names to their possible states. Examples -------- >>> import numpy as np >>> import pandas as pd >>> from pgmpy.structure_score import LogLikelihoodCondGauss >>> rng = np.random.default_rng(0) >>> data = pd.DataFrame( ... { ... "A": rng.normal(size=100), ... "B": rng.integers(0, 2, size=100), ... "C": rng.normal(size=100), ... } ... ) >>> score = LogLikelihoodCondGauss(data) >>> round(score.local_score("A", ("B", "C")), 3) np.float64(-137.319) Raises ------ ValueError If the data or variable types are not suitable for conditional Gaussian modeling. References ---------- .. [1] Andrews, B., Ramsey, J., & Cooper, G. F. (2018). Scoring Bayesian Networks of Mixed Variables. International Journal of Data Science and Analytics, 6(1), 3-18. https://doi.org/10.1007/s41060-017-0085-7 """ _tags = { "name": "ll-cg", "supported_datatype": "mixed", "default_for": None, "is_parameteric": False, } def __init__(self, data, state_names=None): super().__init__(data, state_names=state_names) @staticmethod def _adjusted_cov(df: pd.DataFrame) -> pd.DataFrame: if (df.shape[0] == 1) or (df.shape[0] < len(df.columns)): return pd.DataFrame(np.eye(len(df.columns)), index=df.columns, columns=df.columns) df_cov = df.cov() if np.any(np.isclose(np.linalg.eig(df_cov)[0], 0)): df_cov = df_cov + 1e-6 return df_cov def _cat_parents_product(self, parents: tuple[str, ...]) -> int: k = 1 for pa in parents: if self.dtypes[pa] != "N": n_states = self.data[pa].nunique() if n_states > 1: k *= self.data[pa].nunique() return k def _get_num_parameters(self, variable: str, parents: tuple[str, ...]) -> int: parent_dtypes = [self.dtypes[pa] for pa in parents] n_cont_parents = parent_dtypes.count("N") if self.dtypes[variable] == "N": k = self._cat_parents_product(parents=parents) * (n_cont_parents + 2) else: if n_cont_parents == 0: k = self._cat_parents_product(parents=parents) * (self.data[variable].nunique() - 1) else: k = ( self._cat_parents_product(parents=parents) * (self.data[variable].nunique() - 1) * (n_cont_parents + 2) ) return k def _log_likelihood(self, variable: str, parents: tuple[str, ...]) -> float: parent_list = list(parents) df = self.data.loc[:, [variable] + parent_list] if self.dtypes[variable] == "N": c1 = variable c2 = [var for var in parents if self.dtypes[var] == "N"] d = list(set(parents) - set(c2)) if len(d) == 0: if len(c2) == 0: p_c1c2_d = multivariate_normal.pdf( x=df, mean=df.mean(axis=0), cov=LogLikelihoodCondGauss._adjusted_cov(df), allow_singular=True, ) return np.sum(np.log(p_c1c2_d)) else: p_c1c2_d = multivariate_normal.pdf( x=df, mean=df.mean(axis=0), cov=LogLikelihoodCondGauss._adjusted_cov(df), allow_singular=True, ) df_c2 = df.loc[:, c2] p_c2_d = np.maximum( 1e-8, multivariate_normal.pdf( x=df_c2, mean=df_c2.mean(axis=0), cov=LogLikelihoodCondGauss._adjusted_cov(df_c2), allow_singular=True, ), ) return np.sum(np.log(p_c1c2_d / p_c2_d)) else: log_like = 0 for d_states, df_d in df.groupby(d, observed=True): p_c1c2_d = multivariate_normal.pdf( x=df_d.loc[:, [c1] + c2], mean=df_d.loc[:, [c1] + c2].mean(axis=0), cov=LogLikelihoodCondGauss._adjusted_cov(df_d.loc[:, [c1] + c2]), allow_singular=True, ) if len(c2) == 0: p_c2_d = 1 else: p_c2_d = np.maximum( 1e-8, multivariate_normal.pdf( x=df_d.loc[:, c2], mean=df_d.loc[:, c2].mean(axis=0), cov=LogLikelihoodCondGauss._adjusted_cov(df_d.loc[:, c2]), allow_singular=True, ), ) log_like += np.sum(np.log(p_c1c2_d / p_c2_d)) return log_like else: d1 = variable c = [var for var in parents if self.dtypes[var] == "N"] d2 = list(set(parents) - set(c)) log_like = 0 for d_states, df_d1d2 in df.groupby([d1] + d2, observed=True): if len(c) == 0: p_c_d1d2 = 1 else: p_c_d1d2 = multivariate_normal.pdf( x=df_d1d2.loc[:, c], mean=df_d1d2.loc[:, c].mean(axis=0), cov=LogLikelihoodCondGauss._adjusted_cov(df_d1d2.loc[:, c]), allow_singular=True, ) p_d1d2 = np.repeat(df_d1d2.shape[0] / df.shape[0], df_d1d2.shape[0]) if len(d2) == 0: if len(c) == 0: p_c_d2 = 1 else: p_c_d2 = np.maximum( 1e-8, multivariate_normal.pdf( x=df_d1d2.loc[:, c], mean=df.loc[:, c].mean(axis=0), cov=LogLikelihoodCondGauss._adjusted_cov(df.loc[:, c]), allow_singular=True, ), ) log_like += np.sum(np.log(p_c_d1d2 * p_d1d2 / p_c_d2)) else: if len(c) == 0: p_c_d2 = 1 else: df_d2 = df for var, state in zip(d2, d_states[1:]): df_d2 = df_d2.loc[df_d2[var] == state] p_c_d2 = np.maximum( 1e-8, multivariate_normal.pdf( x=df_d1d2.loc[:, c], mean=df_d2.loc[:, c].mean(axis=0), cov=LogLikelihoodCondGauss._adjusted_cov(df_d2.loc[:, c]), allow_singular=True, ), ) p_d2 = df.groupby(d2, observed=True).count() / df.shape[0] for var, value in zip(d2, d_states[1:]): p_d2 = p_d2.loc[p_d2.index.get_level_values(var) == value] log_like += np.sum(np.log((p_c_d1d2 * p_d1d2) / (p_c_d2 * p_d2.values.ravel()[0]))) return log_like def _local_score(self, variable: str, parents: tuple[str, ...]) -> float: ll = self._log_likelihood(variable=variable, parents=parents) return ll