Source code for pycmtensor.optimizers

# optimizers.py
"""PyCMTensor optimizers module"""
import aesara
import aesara.tensor as aet
from aesara import shared
from aesara.tensor.sharedvar import TensorSharedVariable

from .data import FLOATX

__all__ = [
    "Adam",
    "Nadam",
    "Adamax",
    "Adadelta",
    "RMSProp",
    "Momentum",
    "NAG",
    "Adagrad",
    "SGD",
]


class Optimizer:
    def __init__(self, name, epsilon=1e-8, **kwargs):
        """Base optimizer class

        Args:
            name (str): name of the optimizer

        """
        self.name = name
        self._epsilon = epsilon

    @property
    def epsilon(self):
        return self._epsilon

    def __repr__(self):
        return f"{self.name}"


[docs]class Adam(Optimizer): def __init__(self, params: list, b1: float = 0.9, b2: float = 0.999, **kwargs): """An optimizer that implments the Adam algorithm [#]_ Args: params (list): a list of ``TensorSharedVariable`` b1 (float, optional): exponential decay rate for the 1st moment estimates. Defaults to ``0.9`` b2 (float, optional): exponential decay rate for the 2nd moment estimates. Defaults to ``0.999`` .. [#] Kingma et al., 2014. Adam: A Method for Stochastic Optimization. http://arxiv.org/abs/1412.6980 """ super().__init__(name="Adam") self.b1 = b1 self.b2 = b2 self._t = aesara.shared(1.0) self._m = [shared(aet.zeros_like(p()).eval()) for p in params if p.status != 1] self._v = [shared(aet.zeros_like(p()).eval()) for p in params if p.status != 1] @property
[docs] def t(self): return self._t
@property
[docs] def m_prev(self): return self._m
@property
[docs] def v_prev(self): return self._v
[docs] def update(self, cost, params: list, lr: float = 0.001): """Generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of ``TensorSharedVariable`` lr (float, optional): learning rate. Defaults to 0.001 Returns: list: a list of tuples of ``(p, p_t), (m, m_t), (v, v_t), (t, t_new)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] t_new = self.t + 1.0 for m, v, param, grad in zip(self.m_prev, self.v_prev, params, grads): m_t = self.b1 * m + (1.0 - self.b1) * grad m_t_hat = m_t / (1.0 - aet.pow(self.b1, self.t)) v_t = self.b2 * v + (1.0 - self.b2) * aet.sqr(grad) v_t_hat = v_t / (1.0 - aet.pow(self.b2, self.t)) g_t = lr * m_t_hat / (aet.sqrt(v_t_hat) + self.epsilon) p_t = param - g_t updates.append((m, m_t)) updates.append((v, v_t)) updates.append((param, p_t)) updates.append((self.t, t_new)) return updates
[docs]class Nadam(Adam): def __init__(self, params: list, b1: float = 0.99, b2: float = 0.999, **kwargs): """An optimizer that implements the Nesterov Adam algorithm [#]_ Args: params (list): a list of ``TensorSharedVariable`` b1 (float, optional): exponential decay rate for the 1st moment estimates. Defaults to ``0.9`` b2 (float, optional): exponential decay rate for the 2nd moment estimates. Defaults to ``0.999`` .. [#] Dozat, T., 2016. Incorporating nesterov momentum into adam.(2016). Dostupné z: http://cs229.stanford.edu/proj2015/054_report.pdf. """ super().__init__(params, b1, b2) self.name = "Nadam"
[docs] def update(self, cost, params: list, lr: float = 0.001): """Generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of ``TensorSharedVariable`` lr (float, optional): learning rate. Defaults to 0.001 Returns: list: a list of tuples of ``(p, p_t), (m, m_t), (v, v_t), (t, t_new)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] t_new = self.t + 1.0 b1 = self.b1 * (1 - 0.5 * aet.pow(0.96, self.t / 250)) b1_t = self.b1 * (1 - 0.5 * aet.pow(0.96, t_new / 250)) for m, v, param, grad in zip(self.m_prev, self.v_prev, params, grads): g_t = grad / (1.0 - aet.pow(self.b1, self.t)) m_t = self.b1 * m + (1.0 - self.b1) * grad m_t_hat = m_t / (1.0 - aet.pow(self.b1, t_new)) v_t = self.b2 * v + (1.0 - self.b2) * aet.sqr(grad) v_t_hat = v_t / (1.0 - aet.pow(self.b2, self.t)) m_t_hat = (1 - b1) * g_t + b1_t * m_t_hat g_t_hat = lr * m_t_hat / (aet.sqrt(v_t_hat) + self.epsilon) p_t = param - g_t_hat updates.append((m, m_t)) updates.append((v, v_t)) updates.append((param, p_t)) updates.append((self.t, t_new)) return updates
[docs]class Adamax(Adam): def __init__(self, params: list, b1: float = 0.9, b2: float = 0.999, **kwargs): """An optimizer that implements the Adamax algorithm [#]_. It is a variant of the Adam algorithm Args: params (list): a list of ``TensorSharedVariable`` b1 (float, optional): exponential decay rate for the 1st moment estimates. Defaults to ``0.9`` b2 (float, optional): exponential decay rate for the 2nd moment estimates. Defaults to ``0.999`` .. [#] Kingma et al., 2014. Adam: A Method for Stochastic Optimization. http://arxiv.org/abs/1412.6980 """ super().__init__(params, b1, b2) self.name = "Adamax"
[docs] def update(self, cost, params: list, lr: float = 0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of ``TensorSharedVariable`` lr (float, optional): learning rate. Defaults to ``0.001`` Returns: list: a list of tuples of ``(p, p_t), (m, m_t), (v, v_t), (t, t_new)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] t_new = self.t + 1.0 a_t = lr / (1.0 - aet.pow(self.b1, self.t)) for m, v, param, grad in zip(self.m_prev, self.v_prev, params, grads): m_t = self.b1 * m + (1.0 - self.b1) * grad v_t = aet.maximum(self.b2 * v, aet.abs(grad)) g_t = a_t * m_t / (v_t + self.epsilon) p_t = param - g_t updates.append((m, m_t)) updates.append((v, v_t)) updates.append((param, p_t)) updates.append((self.t, t_new)) return updates
[docs]class Adadelta(Optimizer): def __init__(self, params: list, rho: float = 0.95, **kwargs): """An optimizer that implements the Adadelta algorithm [#]_ Adadelta is a stochastic gradient descent method that is based on adaptive learning rate per dimension to address two drawbacks: - The continual decay of learning rates throughout training - The need for a manually selected global learning rate Args: params (list): a list of ``TensorSharedVariable`` rho (float, optional): the decay rate for learning rate. Defaults to ``0.95`` .. [#] Zeiler, 2012. ADADELTA: An Adaptive Learning Rate Method. http://arxiv.org/abs/1212.5701 """ super().__init__(name="Adadelta") self.rho = rho self._accu = [ shared(aet.zeros_like(p()).eval()) for p in params if p.status != 1 ] self._delta = [ shared(aet.zeros_like(p()).eval()) for p in params if p.status != 1 ] @property
[docs] def accumulator(self): return self._accu
@property
[docs] def delta(self): return self._delta
[docs] def update(self, cost, params: list, lr: float = 1.0): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of ``TensorSharedVariable`` lr (float, optional): learning rate. Defaults to ``1.0`` Returns: list: a list of tuples of ``(param, param_new), (a, a_t), (d, d_t)`` .. Note:: Since the Adadelta algorithm uses an adaptive learning rate, the learning rate is set to ``1.0`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] for accu, d, param, grad in zip(self.accumulator, self.delta, params, grads): # update accumulator accu_t = self.rho * accu + (1.0 - self.rho) * grad**2 # compute parameter update, using previous delta g_t = grad * aet.sqrt(d + self.epsilon) / aet.sqrt(accu_t + self.epsilon) p_t = param - lr * g_t d_t = self.rho * d + (1.0 - self.rho) * g_t**2 updates.append((param, p_t)) updates.append((accu, accu_t)) updates.append((d, d_t)) return updates
[docs]class RMSProp(Optimizer): def __init__(self, params: list, rho: float = 0.9, **kwargs): """An optimizer that implements the RMSprop algorithm [#]_ Args: params (list): a list of ``TensorSharedVariable`` rho (float, optional): discounting factor for the history/coming gradient. Defaults to ``0.9`` .. [#] Hinton, 2012. rmsprop: Divide the gradient by a running average of its recent magnitude. http://www.cs.toronto.edu/~tijmen/csc321/slides/lecture_slides_lec6.pdf """ super().__init__(name="RMSProp") self.rho = rho self._accu = [ shared(aet.zeros_like(p()).eval()) for p in params if p.status != 1 ] @property
[docs] def accumulator(self): return self._accu
[docs] def update(self, cost, params: list, lr: float = 0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of ``TensorSharedVariable`` lr (float, optional): learning rate. Defaults to ``0.001`` Returns: list: a list of tuples of ``(param, param_new), (a, a_t)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] for accu, param, grad in zip(self.accumulator, params, grads): accu_t = self.rho * accu + (1.0 - self.rho) * aet.sqr(grad) g_t = lr / aet.sqrt(accu_t + self.epsilon) * grad p_t = param - g_t updates.append((accu, accu_t)) updates.append((param, p_t)) return updates
[docs]class Momentum(Optimizer): def __init__(self, params: list, mu: float = 0.9, **kwargs): """An optimizer that implements the Momentum algorithm [#]_ Args: params (list): a list of ``TensorSharedVariable`` mu (float, optional): acceleration factor in the relevant direction and dampens oscillations. Defaults to ``0.9`` .. [#] Sutskever et al., 2013. On the importance of initialization and momentum in deep learning. http://jmlr.org/proceedings/papers/v28/sutskever13.pdf """ super().__init__(name="Momentum") self.mu = mu self._v = [shared(aet.zeros_like(p()).eval()) for p in params if p.status != 1] @property
[docs] def velocity(self): return self._v
[docs] def update(self, cost, params: list, lr: float = 0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of ``TensorSharedVariable`` lr (float, optional): the learning rate. Defaults to ``0.001`` Returns: list: a list of tuples of ``(param, param_new), (v, v_t)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] for v, param, grad in zip(self.velocity, params, grads): v_t = self.mu * v + grad p_t = param - lr * v_t updates.append((v, v_t)) updates.append((param, p_t)) return updates
[docs]class NAG(Momentum): def __init__(self, params: list, mu: float = 0.99, **kwargs): """An optimizer that implements the Nestrov Accelerated Gradient algorithm [#]_ Args: params (list): a list of ``TensorSharedVariable`` mu (float, optional): acceleration factor in the relevant direction and dampens oscillations. Defaults to ``0.9`` .. [#] Sutskever et al., 2013. On the importance of initialization and momentum in deep learning. http://jmlr.org/proceedings/papers/v28/sutskever13.pdf """ super().__init__(params, mu) self.name = "NAG" self._t = aesara.shared(0.0) @property
[docs] def t(self): return self._t
[docs] def update(self, cost, params: list, lr: float = 0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of ``TensorSharedVariable`` lr (float, optional): the learning rate. Defaults to ``0.001`` Returns: list: a list of tuples of ``(param, param_new), (v, v_t)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] t_new = self.t + 1.0 mu = self.mu * (1 - 0.5 * aet.pow(0.96, self.t / 250)) mu_t = self.mu * (1 - 0.5 * aet.pow(0.96, t_new / 250)) for v, param, grad in zip(self.velocity, params, grads): v_t = mu * v + grad v_t_hat = grad + mu_t * v_t p_t = param - lr * v_t_hat updates.append((v, v_t)) updates.append((param, p_t)) return updates
class AdaGrad(Optimizer): def __init__(self, params: list, **kwargs): """An optimizer that implements the Adagrad algorithm [#]_ Adagrad is an optimizer with parameter-specific learning rates, which are adapted relative to how frequently a parameter gets updated during training. The more updates a parameter receives, the smaller the updates. Args: params (list): a list of ``TensorSharedVariable`` .. [#] Duchi et al., 2011. Adaptive Subgradient Methods for Online Learning and Stochastic Optimization. https://www.jmlr.org/papers/volume12/duchi11a/duchi11a.pdf """ super().__init__(name="AdaGrad") self._accu = [ shared(aet.zeros_like(p()).eval()) for p in params if p.status != 1 ] @property def accumulator(self): return self._accu def update(self, cost, params: list, lr: float = 1.0): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of ``TensorSharedVariable`` lr (float, optional): the learning rate. Defaults to ``1.0`` Returns: list: a list of tuples of ``(param, param_new), (accu, accu_t)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] for param, grad, accu in zip(params, grads, self.accumulator): accu_t = accu + aet.sqr(grad) g_t = lr / aet.sqrt(accu_t + self.epsilon) * grad p_t = param - g_t updates.append((accu, accu_t)) updates.append((param, p_t)) return updates
[docs]class SGD(Optimizer): def __init__(self, params: list, **kwargs): """An optimizer that implements the stochastic gradient algorithm Args: params (list): a list of ``TensorSharedVariable`` """ super().__init__(name="SGD")
[docs] def update(self, cost, params: list, lr: float = 0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of ``TensorSharedVariable`` lr (float, optional): the learning rate. Defaults to ``0.001`` Returns: list: a list of ``(param, param_new)`` tuples """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] for param, grad in zip(params, grads): p_t = param - lr * grad updates.append((param, p_t)) return updates