Source code for pycmtensor.pycmtensor

# pycmtensor.py
"""PyCMTensor main module"""
from collections import OrderedDict
from time import perf_counter
from typing import Union

import aesara.tensor as aet
import dill as pickle
import numpy as np
from aesara import function
from aesara.tensor.var import TensorVariable

from pycmtensor import config, rng

from .expressions import Beta, ExpressionParser, Weight
from .functions import bhhh, errors, gnorm, hessians
from .logger import debug, info, warning
from .optimizers import Adam, Optimizer
from .results import Results
from .utils import time_format


[docs]class PyCMTensorModel: def __init__(self, db, **kwargs): """Base model class object""" self.name = "PyCMTensorModel" self.config = config self.params = [] # keep track of all the params self.betas = [] # keep track of the Betas self.weights = [] # keep track of the Weights self.updates = [] # keep track of the updates self.inputs = db.all self.learning_rate = aet.scalar("learning_rate") self.optimizer = Adam self.results = Results() for key, value in kwargs.items(): if key == "optimizer" or key == "opt": if not isinstance(value, Optimizer): raise TypeError(f"invalid optimizer {value}") self.optimizer = value debug(f"Building model...")
[docs] def add_params(self, params: Union[dict, list]): """Method to load locally defined variables Args: params (Union[dict, list]): a dict or list of ``TensorSharedVariable`` """ if not isinstance(params, (dict, list)): raise TypeError(f"params must be of Type dict or list") # create a dict of str(param): param, if params is given as a list if isinstance(params, list): params = {str(p): p for p in params} # iterate through the dict of params: if not hasattr(self, "cost"): raise ValueError(f"No valid cost function defined.") symbols = ExpressionParser().parse(getattr(self, "cost")) seen = set() for _, p in params.items(): if not isinstance(p, (Beta, Weight)): continue if p.name in seen: continue # raise NameError(f"Duplicate param names defined: {p.name}.") if (p.name not in symbols) and (p.status == 0): warning(f"Unused Beta {p.name} removed from computational graph") continue self.params.append(p) seen.add(p.name) if isinstance(p, (Beta)): self.betas.append(p) else: self.weights.append(p)
[docs] def add_regularizers(self, l_reg: TensorVariable): """Adds regularizer to model cost function Args: l_reg (TensorVariable): symbolic variable defining the regularizer term """ if not hasattr(self, "cost"): raise ValueError("No valid cost function defined.") self.cost += l_reg
@property
[docs] def n_params(self): """Get the total number of parameters""" return self.n_betas + sum(self.n_weights)
@property
[docs] def n_betas(self): """Get the count of Beta parameters""" return len(self.betas)
@property
[docs] def n_weights(self): """Get the total number of elements of each weight matrix""" return [w.size for w in self.get_weights()]
[docs] def get_betas(self) -> dict: """Returns the Beta (key, value) pairs as a dict""" return {beta.name: beta.get_value() for beta in self.betas}
[docs] def get_weights(self) -> list[np.ndarray]: """Returns the Weights as a list of matrices""" return [w.get_value() for w in self.weights]
[docs] def reset_values(self): """Resets param values to their initial values""" for p in self.params: p.reset_value()
[docs] def model_loglikelihood(self): """Loads the function to ``self.loglikelihood()`` to output the loglikelihood value of the model given inputs""" self.loglikelihood = function( name="loglikelihood", inputs=self.inputs, outputs=self.ll,
)
[docs] def model_choice_probabilities(self): """Loads the function to ``self.choice_probabilities()`` to output discrete choice probabilities. Axes of outputs are swapped""" self.choice_probabilities = function( name="choice probabilities", inputs=self.inputs, outputs=self.p_y_given_x.swapaxes(0, 1),
)
[docs] def model_choice_predictions(self): """Loads the function to ``self.choice_predictions()`` to output discrete choice predictions""" self.choice_predictions = function( name="choice predictions", inputs=self.inputs, outputs=self.pred,
)
[docs] def model_prediction_error(self): """Loads the function to ``self.prediction_error()`` to output the model error wrt inputs""" self.prediction_error = function( name="prediction error", inputs=self.inputs, outputs=errors(self.p_y_given_x, self.y),
)
[docs] def model_H(self): """Loads the function to ``self.H()`` to calculate the Hessian matrix or the 2nd-order partial derivatives of the model. """ self.H = function( name="Hessian matrix", inputs=self.inputs, outputs=hessians(self.ll, self.betas),
)
[docs] def model_BHHH(self): """Loads the function to ``self.BHHH()`` to calculate the Berndt-Hall-Hall- Hausman (BHHH) approximation. """ self.BHHH = function( name="BHHH matrix", inputs=self.inputs, outputs=bhhh(self.ll, self.betas),
)
[docs] def model_gnorm(self): """Loads the function to ``self.gradient_norm()`` to calculate the gradient norm of the model cost function. """ self.gradient_norm = function( name="Gradient norm", inputs=self.inputs, outputs=gnorm(self.cost, self.betas),
)
[docs] def predict(self, db, return_choices: bool = True): """Returns the predicted choice or choice probabilites Args: db (pycmtensor.Data): database for prediction return_choices (bool): if `True` then returns discrete choices instead of probabilities Returns: numpy.ndarray: the output vector """ if not return_choices: return self.choice_probabilities(*db.valid_data) else: return self.choice_predictions(*db.valid_data)
[docs] def train(self, db, **kwargs): """Function to train the model Args: db (pycmtensor.Data): database used to train the model **kwargs: keyword arguments for adjusting training configuration. Possible values are `max_steps:int`, `patience:int`, `lr_scheduler:scheduler.Scheduler`, `batch_size:int`. For more information and other possible options, see :py:data:`hyperparameters <pycmtensor.config.Config.hyperparameters>` """ self.config.set_hyperparameter(**kwargs) # [train-start] lr_scheduler = self.config["lr_scheduler"] batch_size = self.config["batch_size"] max_steps = self.config["max_steps"] patience = self.config["patience"] patience_increase = self.config["patience_increase"] validation_threshold = self.config["validation_threshold"] db.n_train_batches = db.n_train_samples // batch_size validation_frequency = db.n_train_batches max_iterations = max_steps * db.n_train_batches # initalize results array self.results.performance_graph = OrderedDict() # compute the inital results of the model self.results.init_loglikelihood = self.loglikelihood(*db.train_data) self.results.best_loglikelihood = self.results.init_loglikelihood self.results.best_valid_error = self.prediction_error(*db.valid_data) # loop parameters done_looping = False step = 0 iteration = 0 shift = 0 # set learning rate learning_rate = lr_scheduler(step) # main loop start_time = perf_counter() info(f"Start (n={db.n_train_samples})") while (step < max_steps) and (not done_looping): # loop over batch learning_rate = lr_scheduler(step) for index in range(db.n_train_batches): if patience <= iteration: done_looping = True debug(f"Early stopping... (step={step})") break # increment iteration iteration += 1 # set index and shift slices if self.config["batch_shuffle"]: index = rng.integers(0, db.n_train_batches) shift = rng.integers(0, batch_size) # get data slice from dataset batch_data = db.get_train_data(self.inputs, index, batch_size, shift) # model update step self.update_wrt_cost(*batch_data, learning_rate) # model validate step if iteration % validation_frequency != 0: continue train_ll = self.loglikelihood(*db.train_data) valid_error = self.prediction_error(*db.valid_data) self.results.performance_graph[step] = ( np.round(train_ll, 2), np.round(valid_error, 4), ) if valid_error >= self.results.best_valid_error: continue msg = f"Best validation error = {valid_error*100:.3f}%, (s={step}, i={iteration}, p={patience}, ll={train_ll:.2f})" debug(msg) self.results.best_step = step self.results.best_iteration = iteration self.results.best_loglikelihood = train_ll self.results.best_valid_error = valid_error self.results.betas = self.betas self.results.weights = self.weights # increase patience if past validation threshold if train_ll > (self.results.best_loglikelihood / validation_threshold): continue patience = min( max(patience, iteration * patience_increase), max_iterations ) # increment step step += 1 train_time = round(perf_counter() - start_time, 3) self.results.train_time = time_format(train_time) self.results.iterations_per_sec = round(iteration / train_time, 2) msg = f"End (t={self.results.train_time}, VE={self.results.best_valid_error*100:.2f}%, LL={self.results.best_loglikelihood:.2f}, Step={self.results.best_step})" info(msg) self.betas = self.results.betas self.weights = self.results.weights self.results.n_train_samples = db.n_train_samples self.results.n_valid_samples = db.n_valid_samples self.results.n_params = self.n_params self.results.seed = self.config["seed"] self.results.lr_history_graph = self.config["lr_scheduler"].history # statistical analysis step self.results.gnorm = self.gradient_norm(*db.train_data) self.results.hessian_matrix = self.H(*db.train_data) self.results.bhhh_matrix = self.BHHH(*db.train_data)
[docs] def export_to_pickle(self, f): """to be removed in 1.4.0""" model = self pickle.dump(model, f)
def __str__(self): return f"{self.name}"