Source code for pgmpy.models.FunctionalBayesianNetwork

import networkx as nx
import numpy as np
import pandas as pd
import pyro
import torch

from pgmpy import config
from pgmpy.factors.hybrid import FunctionalCPD
from pgmpy.global_vars import logger
from pgmpy.models import DiscreteBayesianNetwork


[docs] class FunctionalBayesianNetwork(DiscreteBayesianNetwork): """ Class for representing Functional Bayesian Network. Functional Bayesian Networks allow for representation of any probability distribution using CPDs in functional form (Functional CPD). Functional CPDs return a pyro.distribution object allowing for flexible representation of any distribution. """ def __init__(self, ebunch=None, latents=set(), lavaan_str=None, dagitty_str=None): """ Initializes a FunctionalBayesianNetwork. Parameters ---------- ebunch: list List of edges to build the Bayesian Network. Each edge should be a tuple (u, v) where u, v are nodes representing the edge u -> v. Examples -------- >>> from pgmpy.models import FunctionalBayesianNetwork >>> model = FunctionalBayesianNetwork([("x1", "x2"), ("x2", "x3")]) """ if config.get_backend() == "numpy": logger.info("Functional BN requires pytorch backend. Switching.") config.set_backend("torch") super(FunctionalBayesianNetwork, self).__init__( ebunch=ebunch, latents=latents, lavaan_str=lavaan_str, dagitty_str=dagitty_str, )
[docs] def add_cpds(self, *cpds): """ Adds FunctionalCPDs to the Bayesian Network. Parameters ---------- cpds: instances of FunctionalCPD List of FunctionalCPDs which will be associated with the model Examples -------- >>> from pgmpy.factors.hybrid import FunctionalCPD >>> from pgmpy.models import FunctionalBayesianNetwork >>> import pyro.distributions as dist >>> import numpy as np >>> model = FunctionalBayesianNetwork([("x1", "x2"), ("x2", "x3")]) >>> cpd1 = FunctionalCPD("x1", lambda _: dist.Normal(0, 1)) >>> cpd2 = FunctionalCPD("x2", lambda parent: dist.Normal(parent["x1"] + 2.0, 1), parents=["x1"]) >>> cpd3 = FunctionalCPD("x3", lambda parent: dist.Normal(parent["x2"] + 0.3, 2), parents=["x2"]) >>> model.add_cpds(cpd1, cpd2, cpd3) """ for cpd in cpds: if not isinstance(cpd, FunctionalCPD): raise ValueError( "Only FunctionalCPD can be added to Functional Bayesian Network." ) if set(cpd.variables) - set(cpd.variables).intersection(set(self.nodes())): raise ValueError(f"CPD defined on variable not in the model: {cpd}") for prev_cpd_index in range(len(self.cpds)): if self.cpds[prev_cpd_index].variable == cpd.variable: logger.warning(f"Replacing existing CPD for {cpd.variable}") self.cpds[prev_cpd_index] = cpd break else: self.cpds.append(cpd)
[docs] def get_cpds(self, node=None): """ Returns the cpd of the node. If node is not specified returns all the CPDs that have been added till now to the graph Parameter --------- node: any hashable python object (optional) The node whose CPD we want. If node not specified returns all the CPDs added to the model. Returns ------- A list of Functional CPDs. Examples -------- >>> from pgmpy.factors.hybrid import FunctionalCPD >>> from pgmpy.models import FunctionalBayesianNetwork >>> import numpy as np >>> import pyro.distributions as dist >>> model = FunctionalBayesianNetwork([("x1", "x2"), ("x2", "x3")]) >>> cpd1 = FunctionalCPD("x1", lambda _: dist.Normal(0, 1)) >>> cpd2 = FunctionalCPD("x2", lambda parent: dist.Normal(parent["x1"] + 2.0, 1), parents=["x1"]) >>> cpd3 = FunctionalCPD("x3", lambda parent: dist.Normal(parent["x2"] + 0.3, 2), parents=["x2"]) >>> model.add_cpds(cpd1, cpd2, cpd3) >>> model.get_cpds() """ return super(FunctionalBayesianNetwork, self).get_cpds(node)
[docs] def remove_cpds(self, *cpds): """ Removes the given `cpds` from the model. Parameters ---------- *cpds: FunctionalCPD objects A list of FunctionalCPD objects that need to be removed from the model. Examples -------- >>> from pgmpy.factors.hybrid import FunctionalCPD >>> from pgmpy.models import FunctionalBayesianNetwork >>> import numpy as np >>> import pyro.distributions as dist >>> model = FunctionalBayesianNetwork([("x1", "x2"), ("x2", "x3")]) >>> cpd1 = FunctionalCPD("x1", lambda _: dist.Normal(0, 1)) >>> cpd2 = FunctionalCPD("x2", lambda parent: dist.Normal(parent["x1"] + 2.0, 1), parents=["x1"]) >>> cpd3 = FunctionalCPD("x3", lambda parent: dist.Normal(parent["x2"] + 0.3, 2), parents=["x2"]) >>> model.add_cpds(cpd1, cpd2, cpd3) >>> for cpd in model.get_cpds(): ... print(cpd) >>> model.remove_cpds(cpd2, cpd3) >>> for cpd in model.get_cpds(): ... print(cpd) """ return super(FunctionalBayesianNetwork, self).remove_cpds(*cpds)
[docs] def check_model(self): """ Checks the model for various errors. This method checks for the following error - * Checks if the CPDs associated with nodes are consistent with their parents. Returns ------- check: boolean True if all the checks pass. """ for node in self.nodes(): cpd = self.get_cpds(node=node) if isinstance(cpd, FunctionalCPD): if set(cpd.parents) != set(self.get_parents(node)): raise ValueError( f"CPD associated with {node} doesn't have proper parents associated with it." ) return True
[docs] def simulate(self, n_samples=1000, seed=None): """ Simulate samples from the model. Parameters ---------- n_samples : int, optional (default: 1000) Number of samples to generate seed : int, optional The seed value for the random number generator. Returns ------- pandas.DataFrame Simulated samples with columns corresponding to network variables Examples -------- >>> from pgmpy.factors.hybrid import FunctionalCPD >>> from pgmpy.models import FunctionalBayesianNetwork >>> import numpy as np >>> import pyro.distributions as dist >>> model = FunctionalBayesianNetwork([("x1", "x2"), ("x2", "x3")]) >>> cpd1 = FunctionalCPD("x1", lambda _: dist.Normal(0, 1)) >>> cpd2 = FunctionalCPD("x2", lambda parent: dist.Normal(parent["x1"] + 2.0, 1), parents=["x1"]) >>> cpd3 = FunctionalCPD("x3", lambda parent: dist.Normal(parent["x2"] + 0.3, 2), parents=["x2"]) >>> model.add_cpds(cpd1, cpd2, cpd3) >>> model.simulate(n_samples=1000) """ if seed is not None: pyro.set_rng_seed(seed) nodes = list(nx.topological_sort(self)) samples = pd.DataFrame(index=range(n_samples)) for node in nodes: cpd = self.get_cpds(node) parent_samples = samples[cpd.parents] if cpd.parents else None samples[node] = cpd.sample( n_samples=n_samples, parent_sample=parent_samples ) return samples
[docs] def fit( self, data, method="SVI", optimizer=pyro.optim.Adam({"lr": 1e-2}), prior_fn=None, num_steps=1000, seed=None, nuts_kwargs=None, mcmc_kwargs=None, ): """ Fit the Bayesian network to data using Pyro's stochastic variational inference. Parameters ---------- data: pandas.DataFrame DataFrame with observations of variables. method: str (default: "SVI") Fitting method to use. Currently supports "SVI" and "MCMC". optimizer: Instance of pyro optimizer (default: pyro.optim.Adam({"lr": 1e-2})) Only used if method is "SVI". The optimizer to use for optimization. prior_fn: function Only used if method is "MCMC". A function that returns a dictionary of pyro distributions for each parameter in the model. num_steps: int (default: 100) Number of optimization steps. For SVI it is the `num_steps` argument for pyro.infer.SVI. For MCMC, it is the `num_samples` argument for pyro.infer.MCMC. seed: int (default: None) Seed value for random number generator. nuts_kwargs: dict (default: None) Only used if method is "MCMC". Additional arguments to pass to pyro.infer.NUTS. mcmc_kwargs: dict (default: None) Only used if method is "MCMC". Additional arguments to pass to pyro.infer.MCMC. Returns ------- dict: If method is "SVI", returns a dictionary of parameter values. If method is "MCMC", returns a dictionary of posterior samples for each parameter. Examples -------- >>> from pgmpy.factors.hybrid import FunctionalCPD >>> from pgmpy.models import FunctionalBayesianNetwork >>> import numpy as np >>> import pyro.distributions as dist >>> model = FunctionalBayesianNetwork([("x1", "x2")]) >>> x1 = np.random.normal(0.2, 0.8, size=10000) >>> x2 = np.random.normal(0.6 + x1, 1) >>> data = pd.DataFrame({"x1": x1, "x2": x2}) >>> def x1_fn(parents): ... mu = pyro.param("x1_mu", torch.tensor(1.0)) ... sigma = pyro.param("x1_sigma", torch.tensor(1.0), constraint=constraints.positive) ... return dist.Normal(mu, sigma) >>> def x2_fn(parents): ... intercept = pyro.param("x2_inter", torch.tensor(1.0)) ... sigma = pyro.param("x2_sigma", torch.tensor(1.0), constraint=constraints.positive) ... return dist.Normal(intercept + parents['x1'], sigma) >>> cpd1 = FunctionalCPD("x1", fn=x1_prior) >>> cpd2 = FunctionalCPD('x2', fn=x2_prior, parents=['x1']) >>> model.add_cpds(cpd1, cpd2) >>> params = model.fit(data, method="SVI", num_steps=100) >>> print(params) >>> def prior_fn(): ... return {"x1_mu": dist.Uniform(0, 1), "x1_sigma": dist.HalfNormal(5), ... "x2_inter": dist.Normal(1.0), "x2_sigma": dist.HalfNormal(1)} >>> def x1_fn(priors, parents): ... return dist.Normal(priors["x1_mu"], priors["x1_sigma"]) >>> def x2_fn(priors, parents): ... return dist.Normal(priors["x2_inter"] + parent['x1'], priors["x2_sigma"]) >>> cpd1 = FunctionalCPD("x1", fn=x1_fn) >>> cpd2 = FunctionalCPD('x2', fn=x2_fn, parents=['x1']) >>> model.add_cpds(cpd1, cpd2) >>> params = model.fit(data, method="MCMC", prior_fn=prior_fn, num_steps=100) >>> print(params["x1_mu"].mean(), params["x1_std"].mean()) """ # Step 0: Checks for specified arguments. if not isinstance(data, pd.DataFrame): raise ValueError( f"data should be a pandas.DataFrame object. Got: {type(data)}." ) if not isinstance(num_steps, int): raise ValueError(f"num_steps should be an integer. Got: {type(num_steps)}.") if method.lower() not in ["svi", "mcmc"]: raise ValueError( "Currently only SVI and MCMC methods are supported. method argument needs to be either 'SVI' or 'MCMC'." ) # Step 1: Preprocess the data and initialize data structures. if seed is not None: pyro.set_rng_seed(seed) sort_nodes = list(nx.topological_sort(self)) tensor_data = {} for node in sort_nodes: if node not in data.columns: raise ValueError(f"data doesn't contain column for the node: {node}.") else: tensor_data[node] = torch.tensor( data[node].values, dtype=config.get_dtype(), device=config.get_device(), ) nuts_kwargs = nuts_kwargs or {} mcmc_kwargs = mcmc_kwargs or {} cpds_dict = {node: self.get_cpds(node) for node in sort_nodes} # Step 2: Fit the model using the specified method. if method.lower() == "svi": def guide(tensor_data): pass # Step 2.1: Define the combined model for SVI. def combined_model_svi(tensor_data): with pyro.plate("data", data.shape[0]): for node in sort_nodes: pyro.sample( f"{node}", cpds_dict[node].fn( {p: tensor_data[p] for p in cpds_dict[node].parents} ), obs=tensor_data[node], ) # Step 2.2: Fit the model using SVI. svi = pyro.infer.SVI( model=combined_model_svi, guide=guide, optim=optimizer, loss=pyro.infer.Trace_ELBO(), ) for step in range(num_steps): loss = svi.step(tensor_data) if step % 50 == 0: logger.info(f"Step {step} | Loss: {loss:.4f}") # Step 3: Fit the model using specified method elif method.lower() == "mcmc": # Step 3.1: Define the combined model for MCMC. def combined_model_mcmc(tensor_data): priors = prior_fn() with pyro.plate("data", data.shape[0]): for node in sort_nodes: pyro.sample( f"{node}", cpds_dict[node].fn( priors, {p: tensor_data[p] for p in cpds_dict[node].parents}, ), obs=tensor_data[node], ) # Step 3.2: Fit the model using MCMC. nuts_kernel = pyro.infer.NUTS(combined_model_mcmc, **nuts_kwargs) mcmc = pyro.infer.MCMC(nuts_kernel, num_samples=num_steps, **mcmc_kwargs) mcmc.run(tensor_data) # Step 4: Return the fitted parameter values. if method.lower() == "svi": return dict(pyro.get_param_store().items()) else: return mcmc.get_samples()