Source code for pycmtensor.functions

# functions.py
"""PyCMTensor functions module"""
from ctypes import util
from typing import Union

import aesara.tensor as aet
import aesara.tensor.nlinalg as nlinalg
import numpy as np
from aesara.tensor.sharedvar import TensorSharedVariable
from aesara.tensor.var import TensorVariable

from pycmtensor.expressions import Beta, Param

from .logger import error, log

__all__ = [
    "exp_mov_average",
    "logit",
    "loglikelihood",
    "rmse",
    "mae",
    "kl_divergence",
    "kl_multivar_norm",
    "errors",
    "hessians",
    "bhhh",
    "gnorm",
]


[docs]def exp_mov_average( batch_avg: TensorVariable, moving_avg: TensorVariable, alpha: float = 0.1 ): """Calculates the exponential moving average (EMA) of a new minibatch Args: batch_avg (TensorVariable): the new batch value of the mean moving_avg (TensorVariable): the moving value of the accumulated mean alpha (float): the moving average factor of the batch mean Returns: TensorVariable: the new moving average Note: The moving average will decay by the difference between the existing value and the new value multiplied by the moving average factor. A higher ``alpha`` value results in faster changing moving average. Formula: .. math:: x_{EMA} = \\alpha * x_t + x_{EMA} * (1-\\alpha) """ while moving_avg.ndim < batch_avg.ndim: moving_avg = aet.expand_dims(moving_avg, -1) ema = batch_avg * alpha + moving_avg * (1 - alpha) return ema
[docs]def logit( utility: Union[list, tuple, TensorVariable], avail: Union[list, tuple, TensorVariable] = None, ): """Computes the Logit function, with availability conditions. Args: utility (list, tuple, TensorVariable): list of :math:`M` utility equations avail (list, tuple, TensorVariable): list of :math:`M` availability conditions, if no availability conditions are provided, defaults to 1 for all availabilities. Returns: TensorVariable: A NxM matrix of probabilities. Note: The 0-th dimension is the numbering of alternatives. """ if isinstance(utility, (list, tuple)): if (avail != None) and (len(utility) != len(avail)): msg = f"{utility} must have the same length as {avail}" raise ValueError(msg) _utility = utility for n, u in enumerate(_utility): if not isinstance(u, (TensorVariable, TensorSharedVariable)): utility[n] = u() if utility[n].ndim == 0: utility[n] = aet.expand_dims(utility[n], -1) U = aet.stack(utility) elif isinstance(utility, TensorVariable): U = utility else: raise NotImplementedError( f"utility {utility} has to be a list, tuple or TensorVariable instance" ) prob = aet.special.softmax(U, axis=0) if avail != None: AV = aet.stack(avail) while AV.ndim < prob.ndim: AV = aet.expand_dims(AV, 1) prob = prob * AV prob = prob / aet.sum(prob, axis=0, keepdims=1) return prob
def log_likelihood(prob: TensorVariable, y: TensorVariable): """Symbolic representation of the log likelihood cost function. Args: prob (TensorVariable): matrix describing the choice probabilites y (TensorVariable): ``TensorVariable`` referencing the choice column Returns: TensorVariable: a symbolic representation of the log likelihood with ndim=0. Note: The 0-th dimension is the numbering of alternatives. """ return aet.sum(aet.log(prob)[y, ..., aet.arange(y.shape[0])])
[docs]def rmse(y_hat: TensorVariable, y: TensorVariable): """Computes the root mean squared error (RMSE) between pairs of observations Args: y_hat (TensorVariable): model estimated values y (TensorVariable): ground truth values Returns: TensorVariable: symbolic scalar representation of the rmse Note: Tensor is flattened to an N-vector if the input args are :math:`N\\times 1` matrices. Formula: .. math:: RMSE = \\sqrt{\\frac{1}{N}\\sum_{i=1}^N(\\hat{y}_i-y_i)^2} """ if y_hat.ndim != y.ndim: msg = f"y_hat should have the same dimensions as y. y_hat.ndim: {y_hat.ndim}, q.ndim: {y.ndim}" raise ValueError(msg) if y_hat.ndim < 1: y_hat = aet.flatten(y_hat) y = aet.flatten(y) return aet.sqrt(aet.mean(aet.sqr(y_hat - y)))
[docs]def mae(y_hat: TensorVariable, y: TensorVariable): """Computes the mean absolute error (MAE) between pairs of observations Args: y_hat (TensorVariable): model estimated values y (TensorVariable): ground truth values Returns: TensorVariable: symbolic scalar representation of the mae Note: Tensor is flattened to an N-vector if the input args are :math:`N\\times 1` matrices. Formula: .. math:: MAE = \\frac{\sum_{i=1}^N|\\hat{y}_i-y_i|}{N} """ if y_hat.ndim != y.ndim: msg = f"y_hat should have the same dimensions as y. y_hat.ndim: {y_hat.ndim}, q.ndim: {y.ndim}" raise ValueError(msg) if y_hat.ndim < 1: y_hat = aet.flatten(y_hat) y = aet.flatten(y) return aet.mean(aet.abs(y_hat - y))
[docs]def kl_divergence(p: TensorVariable, q: TensorVariable): """Computes the KL divergence loss between discrete distributions ``p`` and ``q``. Args: p (TensorVariable): model output probabilities q (TensorVariable): ground truth probabilities Returns: TensorVariable: a symbolic representation of the KL loss with Note: Formula: .. math:: L = \\begin{cases} \\sum_{i=1}^N (p_i * log(p_i/q_i)) & p>0\\\\ 0 & p<=0 \\end{cases} """ if p.ndim != q.ndim: msg = f"p should have the same shape as q. p.ndim: {p.ndim}, q.ndim: {q.ndim}" raise ValueError(msg) return aet.sum(aet.switch(aet.neq(p, 0), p * aet.log(p / q), 0))
[docs]def kl_multivar_norm(m0, v0, m1, v1, epsilon=1e-6): """Computes the KL divergence loss between two multivariate normal distributions. Args: m0: mean vector of the first Normal m.v. distribution :math:`N_0` v0: (co-)variance matrix of the first Normal m.v. distribution :math:`N_0` m1: mean vector of the second Normal m.v. distribution :math:`N_1` v1: (co-)variance of the second Normal m.v. distribution :math:`N_1` epsilon (float): small value to prevent divide-by-zero error Note: k = dimension of the distribution. Formula: .. math:: D_{KL}(N_0||N_1) = 0.5 * \\Big(\\ln\\big(\\frac{|v_1|}{|v_0|}\\big) + trace(v_1^{-1} v_0) + (m_1-m_0)^T v_1^{-1} (m_1-m_0) - k\\Big) In variational inference, the kl divergence is the relative entropy between a diagonal multivariate Normal and a standard Normal distribution, N(0, 1), therefore, for VI, ``m1=aet.constant(0)``, ``v1=aet.constant(1)`` For two univariate distributions, dimensions of m0,m1,v0,v1 = 0 """ if not ( (m0.ndim >= m1.ndim) and (v0.ndim >= v1.ndim) and (m0.ndim <= 1) and (v0.ndim <= 2) ): msg = f"Incorrect dimensions inputs: m0.ndim={m0.ndim}, v0.ndim={v0.ndim}, m1.ndim={m1.ndim}, v1.ndim={v1.ndim}" raise ValueError(msg) # VI KL divergence if (m1.ndim == v1.ndim == 0) and (m0.ndim == 1) and (v0.ndim == 2): v0 = aet.diag(v0) kld = 0.5 * aet.sum(v0 + aet.sqr(m0) - 1 - aet.log(v0)) # univariate KL divergence elif (m0.ndim == v0.ndim == 0) and (m1.ndim == v1.ndim == 0): kld = ( aet.log(aet.sqrt(v1 / (v0 + epsilon))) + 0.5 * (v0 + aet.sqr(m0 - m1)) / v1 - 0.5 ) # multivariate KL divergence else: k = m0.shape[0] v1_inv = nlinalg.inv(v1) det_term = aet.log(nlinalg.det(v1) / nlinalg.det(v0)) trace_term = nlinalg.trace(aet.dot(v1_inv, v0)) prod_term = aet.dot((m1 - m0).T, aet.dot(v1_inv, (m1 - m0))) kld = aet.sum(det_term + trace_term + prod_term - k) return kld
[docs]def errors(prob: TensorVariable, y: TensorVariable): """Symbolic representation of the discrete prediction as a percentage error. Args: prob (TensorVariable): matrix describing the choice probabilites y (TensorVariable): the ``TensorVariable`` referencing the choice column Returns: TensorVariable: the mean prediction error over the input ``y`` """ pred = aet.argmax(prob, axis=0) if y.dtype.startswith("int"): return aet.mean(aet.neq(pred, y)) else: raise NotImplementedError(f"y should be int32 or int64", ("y.dtype:", y.dtype))
[docs]def hessians(ll: TensorVariable, params: list[Beta]): """Symbolic representation of the Hessian matrix given the log likelihood. Args: ll (TensorVariable): the loglikelihood to compute the gradients over params (list): list of params to compute the gradients over Returns: TensorVariable: the Hessian matrix Note: Parameters with status=1 are ignored. """ if not isinstance(params, list): raise TypeError(f"params is not list instance. type(params)={type(params)}") params = [p() for p in params if (p.status != 1)] grads = aet.grad(ll, params, disconnected_inputs="ignore") mat = aet.as_tensor_variable(np.zeros((len(grads), len(grads)))) for i in range(len(grads)): mat = aet.set_subtensor( x=mat[i, :], y=aet.grad(grads[i], params, disconnected_inputs="ignore"), ) return mat
[docs]def bhhh(ll: TensorVariable, params: list[Beta]): """Symbolic representation of the Berndt-Hall-Hall-Hausman (BHHH) algorithm given the log likelihood. Args: ll (TensorVariable): the loglikelihood to compute the gradients over params (list): list of params to compute the gradients over Returns: TensorVariable: the outer product of the gradient with ndim=2 Note: Parameters with status=1 are ignored. """ if not isinstance(params, (dict, list)): raise TypeError( f"params is not list or dict instance. type(params)={type(params)}" ) if isinstance(params, dict): params = list(params.values()) params = [p() for p in params if (p.status != 1)] grads = aet.grad(ll, params, disconnected_inputs="ignore") mat = aet.outer(aet.as_tensor_variable(grads), aet.as_tensor_variable(grads).T) return mat
[docs]def gnorm(cost: TensorVariable, params: list[Beta]): """Symbolic representation of the gradient norm given the cost. Args: cost (TensorVariable): the cost to compute the gradients over params (list): list of params to compute the gradients over Returns: TensorVariable: the gradient norm value Note: Parameters with status=1 are ignored. """ if not isinstance(params, (dict, list)): raise TypeError( f"params is not list or dict instance. type(params)={type(params)}" ) if isinstance(params, dict): params = list(params.values()) params = [p() for p in params if (p.status != 1)] grads = aet.grad(cost, params, disconnected_inputs="ignore") return nlinalg.norm(grads, ord=None)