Source code for pycmtensor.optimizers

# optimizers.py
"""PyCMTensor optimizers module"""
import aesara
import aesara.tensor as aet

floatX = aesara.config.floatX


[docs]class Optimizer: def __init__(self, params, name, b1=0.0, b2=0.0, m=0.0, rho=0.0, epsilon=1e-8): """Base optimizer class Args: params (list): a list of :class:`expressions.TensorVariable` type objects. Used for constructing optimizer parameters. """ self.name = name self._m = [] self._v = [] self._accu = [] self._delta = [] self._velocity = [] for param in params: if param.status == 1: continue p = param() if (b1 > 0.0) and (b2 > 0.0): self._m.append(aesara.shared(aet.zeros_like(p).eval())) self._v.append(aesara.shared(aet.zeros_like(p).eval())) if rho > 0.0: self._accu.append(aesara.shared(aet.zeros_like(p).eval())) self._delta.append(aesara.shared(aet.zeros_like(p).eval())) if m > 0.0: self._velocity.append(aesara.shared(aet.zeros_like(p).eval())) self.epsilon = epsilon self._t = aesara.shared(0.0) self.b1 = b1 self.b2 = b2 self.rho = rho self.m = m def __repr__(self): return f"{self.name}"
[docs]class Adam(Optimizer): def __init__(self, params: list, b1=0.9, b2=0.999, **kwargs): """An optimizer that implments the Adam algorithm [#]_ Args: params (list): a list of :class:`Betas` and/or :class:`Weights` b1 (float, optional): exponential decay rate for the 1st moment estimates. Defaults to ``0.9`` b2 (float, optional): exponential decay rate for the 2nd moment estimates. Defaults to ``0.999`` .. [#] Kingma et al., 2014. Adam: A Method for Stochastic Optimization. http://arxiv.org/abs/1412.6980 """ super().__init__(params, name="Adam", b1=b1, b2=b2)
[docs] def update(self, cost, params: list, lr=0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of :class:`Betas` and/or :class:`Weights` lr (float, optional): learning rate. Defaults to 0.001 Returns: list: a list of tuples of ``(p, p_t), (m, m_t), (v, v_t), (t, t_new)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] t = self._t m_prev = self._m v_prev = self._v t_new = t + 1.0 a_t = aet.sqrt(1.0 - self.b2**t_new) / (1.0 - self.b1**t_new) for m, v, param, grad in zip(m_prev, v_prev, params, grads): m_t = self.b1 * m + (1.0 - self.b1) * grad v_t = self.b2 * v + (1.0 - self.b2) * grad**2 g_t = lr * a_t * m_t / (aet.sqrt(v_t) + self.epsilon) p_t = param - g_t updates.append((m, m_t)) updates.append((v, v_t)) updates.append((param, p_t)) updates.append((t, t_new)) return updates
[docs]class Adamax(Optimizer): def __init__(self, params: list, b1=0.9, b2=0.999, **kwargs): """An optimizer that implements the Adamax algorithm [#]_. It is a variant of the Adam algorithm Args: params (list): a list of :class:`Betas` and/or :class:`Weights` b1 (float, optional): exponential decay rate for the 1st moment estimates. Defaults to ``0.9`` b2 (float, optional): exponential decay rate for the 2nd moment estimates. Defaults to ``0.999`` .. [#] Kingma et al., 2014. Adam: A Method for Stochastic Optimization. http://arxiv.org/abs/1412.6980 """ super().__init__(params, name="Adamax", b1=b1, b2=b2)
[docs] def update(self, cost, params: list, lr=0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of :class:`Betas` and/or :class:`Weights` lr (float, optional): learning rate. Defaults to ``0.001`` Returns: list: a list of tuples of ``(p, p_t), (m, m_t), (v, v_t), (t, t_new)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] t = self._t m_prev = self._m v_prev = self._v t_new = t + 1.0 a_t = lr / (1.0 - self.b1**t) for m, v, param, grad in zip(m_prev, v_prev, params, grads): m_t = self.b1 * m + (1.0 - self.b1) * grad v_t = aet.maximum(self.b2 * v, abs(grad)) g_t = a_t * m_t / (v_t + self.epsilon) p_t = param - g_t updates.append((m, m_t)) updates.append((v, v_t)) updates.append((param, p_t)) updates.append((t, t_new)) return updates
[docs]class Adadelta(Optimizer): def __init__(self, params: list, rho=0.95, **kwargs): """An optimizer that implements the Adadelta algorithm [#]_ Adadelta is a stochastic gradient descent method that is based on adaptive learning rate per dimension to address two drawbacks: - The continual decay of learning rates throughout training - The need for a manually selected global learning rate Args: params (list): a list of :class:`Betas` and/or :class:`Weights` rho (float, optional): the decay rate for learning rate. Defaults to ``0.95`` .. [#] Zeiler, 2012. ADADELTA: An Adaptive Learning Rate Method. http://arxiv.org/abs/1212.5701 """ super().__init__(params, name="Adadelta", rho=rho)
[docs] def update(self, cost, params: list, lr=1.0): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of :class:`Betas` and/or :class:`Weights` lr (float, optional): learning rate. Defaults to ``1.0`` Returns: list: a list of tuples of ``(param, param_new), (a, a_t), (d, d_t)`` .. Note:: Since the Adadelta algorithm uses an adaptive learning rate, the learning rate is set to ``1.0`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] accumulator = self._accu delta = self._delta for a, d, param, grad in zip(accumulator, delta, params, grads): # update accumulator a_t = self.rho * a + (1.0 - self.rho) * grad**2 # compute parameter update, using previous delta g_t = grad * aet.sqrt(d + self.epsilon) / aet.sqrt(a_t + self.epsilon) p_t = param - lr * g_t d_t = self.rho * d + (1.0 - self.rho) * g_t**2 updates.append((param, p_t)) updates.append((a, a_t)) updates.append((d, d_t)) return updates
[docs]class RMSProp(Optimizer): def __init__(self, params: list, rho=0.9, **kwargs): """An optimizer that implements the RMSprop algorithm [#]_ Args: params (list): a list of :class:`Betas` and/or :class:`Weights` rho (float, optional): discounting factor for the history/coming gradient. Defaults to ``0.9`` .. [#] Hinton, 2012. rmsprop: Divide the gradient by a running average of its recent magnitude. http://www.cs.toronto.edu/~tijmen/csc321/slides/lecture_slides_lec6.pdf """ super().__init__(params, name="RMSProp", rho=rho)
[docs] def update(self, cost, params: list, lr=0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of :class:`Betas` and/or :class:`Weights` lr (float, optional): learning rate. Defaults to ``0.001`` Returns: list: a list of tuples of ``(param, param_new), (a, a_t)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] accumulator = self._accu for a, param, grad in zip(accumulator, params, grads): a_t = self.rho * a + (1.0 - self.rho) * grad**2 g_t = lr / aet.sqrt(a_t + self.epsilon) * grad p_t = param - g_t updates.append((a, a_t)) updates.append((param, p_t)) return updates
[docs]class Momentum(Optimizer): def __init__(self, params: list, momentum=0.9, nesterov=True, **kwargs): """An optimizer that implements the Momentum algorithm [#]_ Args: params (list): a list of :class:`Betas` and/or :class:`Weights` momentum (float, optional): acceleration factor in the relevant direction and dampens oscillations. Defaults to ``0.9`` nesterov (bool, optional): whether to apply Nesterov momentum. Defaults to ``False`` .. [#] Sutskever et al., 2013. On the importance of initialization and momentum in deep learning. http://jmlr.org/proceedings/papers/v28/sutskever13.pdf """ super().__init__(params, name="Momentum", m=momentum) self.nesterov = nesterov if self.nesterov: self.name = "NAG"
[docs] def update(self, cost, params: list, lr=0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of :class:`Betas` and/or :class:`Weights` lr (float, optional): the learning rate. Defaults to ``0.001`` Returns: list: a list of tuples of ``(param, param_new), (v, v_t)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] velocity = self._velocity for v, param, grad in zip(velocity, params, grads): v_t = self.m * v - lr * grad if self.nesterov: g_t = self.m * v_t - lr * grad else: g_t = v_t p_t = param + g_t updates.append((v, v_t)) updates.append((param, p_t)) return updates
[docs]class AdaGrad(Optimizer): def __init__(self, params: list, **kwargs): """An optimizer that implements the Adagrad algorithm [#]_ Adagrad is an optimizer with parameter-specific learning rates, which are adapted relative to how frequently a parameter gets updated during training. The more updates a parameter receives, the smaller the updates. Args: params (list): a list of :class:`Betas` and/or :class:`Weights` .. [#] Duchi et al., 2011. Adaptive Subgradient Methods for Online Learning and Stochastic Optimization. https://www.jmlr.org/papers/volume12/duchi11a/duchi11a.pdf """ super().__init__(params, name="AdaGrad", rho=0.1)
[docs] def update(self, cost, params: list, lr=1.0): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of :class:`Betas` and/or :class:`Weights` lr (float, optional): the learning rate. Defaults to ``1.0`` Returns: list: a list of tuples of ``(param, param_new), (a, a_t)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] accumulator = self._accu for param, grad, a in zip(params, grads, accumulator): a_t = a + grad**2 g_t = lr / aet.sqrt(a_t + self.epsilon) * grad p_t = param - g_t updates.append((a, a_t)) updates.append((param, p_t)) return updates
[docs]class SGD(Optimizer): def __init__(self, params: list, **kwargs): """An optimizer that implements the stochastic gradient algorithm Args: params (list): a list of :class:`Betas` and/or :class:`Weights` """ super().__init__(params, name="SGD")
[docs] def update(self, cost, params: list, lr=0.001): """Caller to the optimizer class to generate a list of updates Args: cost (TensorVariable): a scalar element for the expression of the cost function where the derivatives are calculated params (list): a list of :class:`Betas` and/or :class:`Weights` lr (float, optional): the learning rate. Defaults to ``0.001`` Returns: list: a list of tuples of ``(param, param_new)`` """ params = [p() for p in params if p.status != 1] grads = aet.grad(cost, params, disconnected_inputs="ignore") updates = [] for param, grad in zip(params, grads): p_t = param - lr * grad updates.append((param, p_t)) return updates