Creating your own algorithms

Recpack’s goal is to make it easy to set up an offline recommendation experiment to evaluate the merits of a recommendation algorithm. That is why we’ve made it super easy to create your own algorithm from scratch, or adapt from one of the many algorithm implementations included.

We have created a number of base classes that provide the necessary plumbing for your algorithm to fit within the recpack evaluation framework so that you can focus on what really matters: the algorithm itself!

We explain when and how to use these base classes by means of four example algorithms.

After reading through these examples you should know:

  • Which base class to pick for a new algorithm

  • Which functions to overwrite to implement the algorithm

Randomized Softmax Popularity Algorithm

Description

The Softmax Popularity Algorithm samples recommended items relative to a “temperature weighted item popularity”.

\[P(i) = \frac{\exp(q(i)/\tau)}{\sum\limits_{j \in I} \exp(q(j)/\tau)}\]

where \(q(i)\) is the popularity score of item i, computed as the natural logarithm of the number of times the item was interacted with, and \(\tau\) is the temperature parameter.

With a temperature of 1, we end up with the standard softmax function. As the temperature approaches 0, we almost always choose the most popular item, whereas high temperatures approach uniformly random sampling. For more information on the softmax function, check out Wikipedia.

We limit our recommendations to the K most popular items, to avoid ever recommending a very unpopular item.

At prediction time, recommendations are sampled according to their probability (as defined by the softmax). The algorithm can thus be considered randomized or stochastic: subsequent calls of the predict method will not necessarily yield the same results.
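
To get a feel for the temperature parameter and the sampling step, here is a small standalone numpy sketch (independent of recpack; the popularity counts are made up for illustration):

import numpy as np

def softmax_with_temperature(q, tau):
    # Subtracting the max before exponentiating keeps the computation numerically stable
    z = (q - np.max(q)) / tau
    e = np.exp(z)
    return e / e.sum()

q = np.log(np.array([100, 50, 10, 5]))    # popularity scores: log of item counts
print(softmax_with_temperature(q, 1.0))   # standard softmax
print(softmax_with_temperature(q, 0.1))   # almost all probability mass on the most popular item
print(softmax_with_temperature(q, 10.0))  # close to uniformly random

# Sampling recommendations proportional to these probabilities
p = softmax_with_temperature(q, 1.0)
print(np.random.choice(np.arange(len(q)), size=3, replace=False, p=p))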

Implementation

We start by selecting a base class as parent. We have no need for PyTorch, so recpack.algorithms.base.TorchMLAlgorithm is easily discarded. Likewise, this is neither a factorization nor item-similarity algorithm, so we use the base class of base classes: recpack.algorithms.base.Algorithm.

Even this base class already implements quite a bit:

  • fit(X) provides a wrapper around the _fit function we need to implement and handles type management and some additional checks.

  • predict(X) provides a wrapper around the _predict function we need to implement.

  • _transform_predict_input(X) and _transform_fit_input(X) are used by predict and fit, respectively, to convert their input matrices (X) into the required types. By default, this base class transforms the data into a csr_matrix, which suits our purpose perfectly as we have no need of timestamps.

  • _check_fit_complete() is called at the end of the fit method to make sure fitting was successful.

  • _check_prediction(X_pred, X) is called at the end of predict, to make sure the predicted output is as expected. If not, it logs a warning.

All we need to do is thus implement __init__ to set the hyperparameters, as well as _fit and _predict.
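
In other words, a new algorithm built on this base class typically has the following skeleton (a sketch only; the class and hyperparameter names are placeholders):

from scipy.sparse import csr_matrix

from recpack.algorithms.base import Algorithm

class MyAlgorithm(Algorithm):
    def __init__(self, some_hyperparameter):
        # Store hyperparameters on self, so they can be inspected and optimised later
        self.some_hyperparameter = some_hyperparameter

    def _fit(self, X: csr_matrix):
        # Learn the model state from the interactions and store it in
        # attributes with a trailing underscore (e.g. self.scores_)
        ...

    def _predict(self, X: csr_matrix) -> csr_matrix:
        # Return a |users| x |items| csr_matrix of recommendation scores
        ...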

__init__

Let’s get started, and define our class, add a docstring for reference and implement the __init__ function.

Our algorithm has two hyperparameters:

  • K: The number of items that can be considered for recommendations.

  • tau: The temperature parameter.

import numpy as np
from scipy.sparse import csr_matrix

from recpack.algorithms.base import Algorithm

class RandomizedSoftmaxPopularity(Algorithm):
    """Recommend items using softmax on the natural logarithm of item counts.

    Recommendations are sampled from the probability distribution
    created by taking the softmax of the natural logarithm of item counts.
    Items are scored such that the distance between the item in first place
    and the item in second place is the same as between all other items.

    :param K: Only the K most frequent items are considered for recommendation
    :param tau: Temperature in the softmax computation
    """
    def __init__(self, K, tau):
        self.K = K
        self.tau = tau

_fit

Next we implement the _fit function. Our input is the matrix of interactions considered for training. We compute the natural logarithm of the number of times users interacted with each item, then take the softmax over the K most popular items.

def _fit(self, X: csr_matrix):
    # compute pop by taking logarithm of the raw counts
    # A1 puts it into a 1d array, making all subsequent operations easy
    pop = np.log(np.sum(X, axis=0)).A1

    max_pop = np.max(pop)

    # Cut to top K
    self.top_k_pop_items_ = np.argsort(pop)[-self.K:]
    top_k_pop = pop[self.top_k_pop_items_]

    # To make softmax numerically stable, we compute exp((pop - max(pop))/self.tau)
    # instead of exp(pop/self.tau):
    #
    # softmax for item i can then be computed as
    # e^((pop[i] - max(pop))/tau) / sum([e^((pop[j] - max(pop))/tau) for j in topK])
    top_k_pop_minus_max = (top_k_pop - max_pop)/self.tau

    top_k_exp = np.exp(top_k_pop_minus_max)

    top_k_pop_sum = np.sum(top_k_exp)

    self.softmax_scores_ = top_k_exp / top_k_pop_sum

After fitting, the model is ready for prediction.

_predict

Finally we implement _predict. Here we sample recommendations for each user with at least one interaction in the matrix of interactions. Remember that sampling probabilities were stored in softmax_scores_ during fitting.

def _predict(self, X:csr_matrix):
    # Randomly sample items, with weights decided by the softmax scores
    users = X.nonzero()[0]

    # The resulting score = (K - ix)/K
    # The first sampled item gets score 1, and the last sampled item score 1/K
    score_list = [
        (u, i, (self.K-ix)/self.K)
        for u in set(users)
        for ix, i in enumerate(
            np.random.choice(
                self.top_k_pop_items_,
                size=self.K,
                replace=False,
                p=self.softmax_scores_
            )
        )
    ]
    user_idxs, item_idxs, scores = list(zip(*score_list))
    score_matrix = csr_matrix((scores, (user_idxs, item_idxs)), shape=X.shape)

    return score_matrix
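
To try the algorithm out, here is a minimal usage sketch on a toy interaction matrix (the values are made up; because predictions are sampled, repeated calls give different scores):

import numpy as np
from scipy.sparse import csr_matrix

# Toy interaction matrix: 3 users x 5 items
X = csr_matrix(np.array([
    [1, 1, 0, 0, 1],
    [0, 1, 1, 0, 0],
    [1, 0, 1, 1, 0],
]))

algo = RandomizedSoftmaxPopularity(K=3, tau=0.5)
algo.fit(X)
predictions = algo.predict(X)  # csr_matrix of sampled scores for each active user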

This algorithm can now be used in evaluation pipelines just like any other algorithm already available in recpack!

Recency

Description

Next we create an algorithm that recommends the items that have been interacted with most recently. This algorithm can be considered a baseline, as it is not personalized.

Implementation

Again, we start from recpack.algorithms.base.Algorithm. This new algorithm is different from Randomized Softmax Popularity Algorithm in that it needs the time of interaction to be able to make recommendations. Thankfully, the recpack data format recpack.data.matrix.InteractionMatrix has a timestamps attribute that stores the time of interaction.

Our algorithm has no hyperparameters, so we have no use for an __init__ method.

_transform_fit_input

To make sure we receive a recpack.data.matrix.InteractionMatrix at fitting time, we update _transform_fit_input.

import numpy as np
from scipy.sparse import csr_matrix, lil_matrix

from recpack.algorithms.base import Algorithm
# get_users is a recpack helper that returns the users with at least one interaction
from recpack.algorithms.util import get_users
from recpack.data.matrix import InteractionMatrix

class Recency(Algorithm):
    def _transform_fit_input(self, X):
        # X needs to be an InteractionMatrix for us to have access to
        # the time of interaction at fitting time
        self._assert_is_interaction_matrix(X)
        # X needs to have timestamps available
        self._assert_has_timestamps(X)
        # No transformation needed
        return X

_fit

Now that we have asserted that _fit receives an object of type recpack.data.matrix.InteractionMatrix, we fit our algorithm by extracting, for each item, its most recent time of interaction. We then scale these values to the interval [0, 1] using min-max normalisation.

def _fit(self, X:InteractionMatrix):
    # X.timestamps gives a pandas Series of timestamps, indexed by user (uid) and item (iid).
    # We drop the index into columns, group by the item index,
    # and select the maximal timestamp per item.
    max_ts_per_item = X.timestamps.reset_index().groupby('iid')['ts'].max()

    # apply min_max normalisation
    recency = np.zeros(X.shape[1])
    recency[max_ts_per_item.index] = max_ts_per_item.values

    most_recent = np.max(recency)
    least_recent = np.min(recency)

    recency = (recency - least_recent) / (most_recent - least_recent)
    self.recency_ = recency.copy()
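
The groupby above is plain pandas. A standalone sketch on a toy frame (mimicking X.timestamps.reset_index(), with one row per (user, item) interaction) shows what it computes:

import pandas as pd

df = pd.DataFrame({
    "uid": [0, 0, 1, 2],
    "iid": [0, 1, 1, 2],
    "ts":  [10, 25, 30, 5],
})
max_ts_per_item = df.groupby("iid")["ts"].max()
print(max_ts_per_item.values)  # [10 30  5]: the most recent timestamp per item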

At fitting time, the base class’ fit method calls both _transform_fit_input and _fit. The model is then ready for use, with attribute self.recency_ which contains the recommendation scores per item.

_predict

Prediction is now easy: for every user with at least one interaction in the input matrix, we set each item’s score to the recency score computed in _fit.

def _predict(self, X: csr_matrix):
    results = lil_matrix(X.shape)

    users = get_users(X)

    results[users] = self.recency_

    return results.tocsr()
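
To try the algorithm out end to end, here is a sketch on a toy interaction log (assuming an InteractionMatrix can be built from a pandas DataFrame by naming the item, user and timestamp columns; check your recpack version for the exact constructor signature):

import pandas as pd
from recpack.data.matrix import InteractionMatrix

# Hypothetical toy log; the column names are chosen for this sketch
df = pd.DataFrame({
    "user_id": [0, 0, 1, 2, 2],
    "item_id": [0, 1, 1, 2, 0],
    "timestamp": [10, 20, 30, 40, 50],
})
im = InteractionMatrix(df, "item_id", "user_id", timestamp_ix="timestamp")

algo = Recency()
algo.fit(im)                    # uses the timestamps to compute self.recency_
predictions = algo.predict(im)  # every active user gets the same recency scores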

Here we go, another algorithm ready for use in evaluation!

Singular Value Decomposition

Description

Let’s now implement SVD, a well-known matrix factorization algorithm. Singular Value Decomposition decomposes a matrix of interactions into three matrices which, when multiplied together, approximately reconstruct the original matrix: \(X \approx U \times \Sigma \times V\). If matrix \(X\) is of shape (|users| x |items|), then \(U\) is of shape (|users| x num_components), \(\Sigma\) is a (num_components x num_components) matrix, and finally \(V\) is a (num_components x |items|) matrix.
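
As a quick, standalone illustration of these shapes (independent of recpack, using sklearn directly):

import numpy as np
from scipy.sparse import csr_matrix
from sklearn.decomposition import TruncatedSVD

# Toy binary interaction matrix: 20 users x 30 items
X = csr_matrix(np.random.default_rng(42).integers(0, 2, size=(20, 30)))

svd = TruncatedSVD(n_components=5, random_state=42)
U = svd.fit_transform(X)  # (20, 5): the users projected onto the 5 latent components
V = svd.components_       # (5, 30): the item factors
print(U.shape, V.shape, svd.singular_values_.shape)  # (20, 5) (5, 30) (5,)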

Implementation

Rather than implement the SVD computation ourselves, we adapt the optimised TruncatedSVD implementation in sklearn so that it matches the recpack interface.

As the name suggests, it makes sense to use recpack.algorithms.base.FactorizationAlgorithm as base class in this example. In addition to the methods implemented in recpack.algorithms.base.Algorithm which we have highlighted in Randomized Softmax Popularity Algorithm, this class provides:

  • _predict generates recommendations by multiplying the user embeddings of nonzero users with all item embeddings.

  • _check_fit_complete performs an additional check on the dimensions of the embeddings

All that remains for us to implement is __init__ to set hyperparameters and _fit to compute the embeddings.

__init__

For simplicity we use only one hyperparameter: num_components, which defines the dimension of the embeddings. We also add a random_state parameter, passed on to TruncatedSVD, to ensure reproducibility.

Warning

The random_state parameter should not be considered a hyperparameter, i.e. we should not perform a parameter search to determine its optimal value.

import numpy as np
from scipy.sparse import csr_matrix, lil_matrix, diags
from sklearn.decomposition import TruncatedSVD

from recpack.algorithms.base import FactorizationAlgorithm

class SVD(FactorizationAlgorithm):
    """Singular Value Decomposition as dimension reduction recommendation algorithm.

    SVD computed using the TruncatedSVD implementation from sklearn.
    U x Sigma x V = X
    U are the user features, and the item features are computed as Sigma x V.

    :param num_components: The size of the latent dimension
    :type num_components: int

    :param random_state: The seed for the random state to allow for comparison
    :type random_state: int
    """

    def __init__(self, num_components=100, random_state=42):
        super().__init__(num_components=num_components)

        self.random_state = random_state

_fit

In _fit we initialize an object of type TruncatedSVD. For simplicity’s sake we expose only num_components in our algorithm. All other hyperparameters are left at their default values.

SVD decomposes the matrix into three matrices, while the recpack.algorithms.base.FactorizationAlgorithm class expects only two: a user and item embedding. Therefore we take the item embedding to be the product of \(\Sigma\) and \(V\). Since \(\Sigma\) is a square matrix this does not change the matrix dimension: \(\Sigma \times V\) is still a (num_components x |items|) matrix.

def _fit(self, X: csr_matrix):
    model = TruncatedSVD(
        n_components=self.num_components, n_iter=7, random_state=self.random_state
    )
    # Factorization computes U x Sigma x V
    # U are the user features,
    # Sigma x V are the item features.
    self.user_embedding_ = model.fit_transform(X)

    V = model.components_
    sigma = diags(model.singular_values_)
    self.item_embedding_ = sigma @ V

    return self
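
A minimal usage sketch (predict is inherited from FactorizationAlgorithm, which multiplies the user embeddings of active users with the item embeddings):

import numpy as np
from scipy.sparse import csr_matrix

X = csr_matrix(np.random.default_rng(0).integers(0, 2, size=(50, 40)))

algo = SVD(num_components=10, random_state=42)
algo.fit(X)
print(algo.user_embedding_.shape, algo.item_embedding_.shape)  # (50, 10) (10, 40)

predictions = algo.predict(X)  # matrix of scores of shape (50, 40)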

This concludes the modification of the TruncatedSVD algorithm for use in recpack!

SillyMF (Gradient Descent Algorithm)

Description

In this example we implement a very silly, iterative matrix factorization algorithm in PyTorch. It is by no means sophisticated or even guaranteed to converge, but serves well for our illustration purposes.

The model learns the weights of a matrix factorization of the initial matrix \(X\) as \(X = U \times V^T\).

Implementation

Because we are now dealing with an algorithm optimised by means of gradient descent, it makes sense to use recpack.algorithms.base.TorchMLAlgorithm as base class in this example. This base class comes with quite a bit more plumbing than the others:

  • _predict generates recommendations by calling _batch_predict for batches of users (to keep the memory footprint low).

  • _check_fit_complete performs an additional check of the dimensions of the embeddings.

  • _check_prediction makes sure predictions were made for all nonzero users.

  • fit(X, validation_data) performs a number of training epochs, each followed by an evaluation step on the validation data. Unlike the other base classes, fit takes an additional validation_data argument to perform this evaluation step.

  • save saves the current PyTorch model to disk.

  • load loads a PyTorch model from file.

  • filename generates a unique filename for the current best model.

  • _transform_predict_input transforms the input matrix to a csr_matrix by default.

  • _transform_fit_input transforms the input matrices to a csr_matrix by default.

  • _evaluate performs one evaluation step, which consists of making predictions for the validation data and subsequently updating the stopping criterion.

  • _load_best loads the best model encountered during training as the final model used to make predictions.

  • _save_best saves the best model encountered during training to a temporary file.

This leaves __init__, _init_model, _train_epoch, my_loss and _batch_predict to be implemented, as well as the nn.Module that defines the actual PyTorch model.

MFModule

First we create a PyTorch model that encodes this factorization. The forward method is also used to make recommendations at prediction time.

from typing import List

import numpy as np
from scipy.sparse import csr_matrix, lil_matrix
import torch
import torch.optim as optim
import torch.nn as nn

from recpack.algorithms.base import TorchMLAlgorithm
from recpack.algorithms.stopping_criterion import StoppingCriterion
# Helpers used in _train_epoch and _batch_predict below
from recpack.algorithms.util import get_batches, get_users, naive_sparse2tensor


class MFModule(nn.Module):
    """MF torch module, encodes the embeddings and the forward functionality.

    :param num_users: the number of users
    :type num_users: int
    :param num_items: the number of items
    :type num_items: int
    :param num_components: The size of the embedding per user and item, defaults to 100
    :type num_components: int, optional
    """

    def __init__(self, num_users, num_items, num_components=100):
        super().__init__()

        self.num_components = num_components
        self.num_users = num_users
        self.num_items = num_items

        self.user_embedding = nn.Embedding(num_users, num_components)  # User embedding
        self.item_embedding = nn.Embedding(num_items, num_components)  # Item embedding

        self.std = 1 / num_components ** 0.5
        # Initialise embeddings to a random start
        nn.init.normal_(self.user_embedding.weight, std=self.std)
        nn.init.normal_(self.item_embedding.weight, std=self.std)

    def forward(
        self, user_tensor: torch.Tensor, item_tensor: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute dot-product of user embedding (w_u) and item embedding (h_i)
        for every user and item pair in user_tensor and item_tensor.

        :param user_tensor: [description]
        :type user_tensor: [type]
        :param item_tensor: [description]
        :type item_tensor: [type]
        """
        w_u = self.user_embedding(user_tensor)
        h_i = self.item_embedding(item_tensor)

        return w_u.matmul(h_i.T)
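
A quick standalone shape check of the module (not needed for the algorithm itself, just to see what forward returns):

module = MFModule(num_users=5, num_items=10, num_components=4)
user_tensor = torch.LongTensor([0, 1, 2])
item_tensor = torch.arange(10)
print(module(user_tensor, item_tensor).shape)  # torch.Size([3, 10]): one score per (user, item) pair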

__init__

Next, we create the actual recpack.algorithms.base.TorchMLAlgorithm. The __init__ of any TorchMLAlgorithm expects at least the following default hyperparameters to be defined:

  • batch_size which dictates how many users make up a training batch.

  • max_epochs which defines the maximum number of training epochs to run.

  • learning_rate which determines how much to change the model with every update.

  • stopping_criterion to define how to evaluate the model’s performance, and if and when to stop early.

Other optional hyperparameters in the baseclass can be found in the documentation.

We define one additional hyperparameter:

  • num_components which is the dimension of our embeddings for both users and items.

For the sake of the example we use a fixed random seed, to guarantee reproducibility of results.

As recpack.algorithms.stopping_criterion.StoppingCriterion we use Recall, by default computed on the top 50 recommendations. By default, early stopping is disabled.

class SillyMF(TorchMLAlgorithm):
    def __init__(self, batch_size, max_epochs, learning_rate, num_components=100):
        super().__init__(
            batch_size=batch_size,
            max_epochs=max_epochs,
            learning_rate=learning_rate,
            stopping_criterion="recall",
            seed=42
        )
        self.num_components = num_components

_init_model

Next we implement _init_model. We cannot initialize MFModule as part of SillyMF’s __init__, because at that point we do not yet know the dimensions of the interaction matrix.

We then define the optimizer. Here we use simple SGD, but any PyTorch optimizer can be used.

def _init_model(self, X:csr_matrix):
    num_users, num_items = X.shape
    self.model_ = MFModule(
        num_users, num_items, num_components=self.num_components
    ).to(self.device)

    # We'll use a basic SGD optimiser
    self.optimizer = optim.SGD(self.model_.parameters(), lr=self.learning_rate)
    self.steps = 0

_train_epoch

Next we implement training. First, we need to define a loss function to indicate how well our current embeddings perform at the task we set. As mentioned in the description, this task is to reconstruct the matrix \(X\). Our loss function computes, for each user, the total absolute error between \(U \times V^T\) and the original matrix \(X\), and then averages over all users.

Note

For an overview of commonly used loss functions, check out PyTorch loss functions.

def my_loss(true_sim, predicted_sim):
    """Computes the total absolute error from predicted compared to true,
    and averages over all users
    """
    return torch.mean(torch.sum(torch.abs(true_sim - predicted_sim), axis=1))
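
A tiny sanity check of this loss on made-up tensors (two users, three items):

true = torch.tensor([[1.0, 0.0, 1.0],
                     [0.0, 1.0, 0.0]])
pred = torch.tensor([[0.5, 0.0, 1.0],
                     [0.0, 0.0, 0.0]])
print(my_loss(true, pred))  # tensor(0.7500): per-user absolute errors are 0.5 and 1.0, averaged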

Now we can continue with _train_epoch. Every epoch loops through the entire dataset (most often in batches). For every batch, the loss and resulting gradients are computed and the embeddings are updated.

def _train_epoch(self, X):
    losses = []
    item_tensor = torch.arange(X.shape[1]).to(self.device)
    for users in get_batches(get_users(X), batch_size=self.batch_size):
        self.optimizer.zero_grad()
        user_tensor = torch.LongTensor(users).to(self.device)
        scores = self.model_.forward(user_tensor, item_tensor)
        expected_scores = naive_sparse2tensor(X[users])
        loss = my_loss(expected_scores, scores)

        # Backwards propagation of the loss
        loss.backward()
        losses.append(loss.item())
        # Update the weight according to the gradients.
        # All automated thanks to torch.
        self.optimizer.step()
        self.steps += 1
    return losses

_batch_predict

Now we can move on to the final step of our implementation: prediction. Predicting items is the same here as in Singular Value Decomposition: the user embeddings of nonzero users are multiplied with item embeddings.

As mentioned in the definition of MFModule, MFModule.forward is used to make predictions. This method takes a PyTorch Tensor of user ids and a Tensor of item ids as inputs, and computes the matrix product of the corresponding user and item embeddings.

def _batch_predict(self, X: csr_matrix, users: List[int] = None) -> np.ndarray:
    """Predict scores for matrix X, given the selected users.

    If there are no selected users, you can assume X is a full matrix,
    and users can be retrieved as the nonzero indices in the X matrix.

    :param X: Matrix of user item interactions
    :type X: csr_matrix
    :param users: users selected for recommendation
    :type users: List[int]
    :return: dense matrix of scores per user item pair.
    :rtype: np.ndarray
    """
    X_pred = lil_matrix(X.shape)
    if users is None:
        users = get_users(X)

    # Turn the np arrays and lists to torch tensors
    user_tensor = torch.LongTensor(users).to(self.device)
    item_tensor = torch.arange(X.shape[1]).to(self.device)

    X_pred[users] = self.model_(user_tensor, item_tensor).detach().cpu().numpy()
    return X_pred.tocsr()

And that’s all for implementing SillyMF!

Compare your algorithm to the state of the art

Now that you have learned how to create your own algorithm, you obviously want to know how well it performs compared to state-of-the-art recommendation algorithms. Recpack provides pipeline functionality, which simplifies running experiments and makes them reproducible.

Because we want you to use your own algorithms with the recpack pipelines, we have made it easy to set up a pipeline with your own algorithm.

The first (and only) step to using a new algorithm is to make sure it is registered in the recpack.pipelines.ALGORITHM_REGISTRY. Registering a new algorithm is done using the register function. This function takes two arguments: the name of the algorithm to register and the class.

from recpack.pipelines import ALGORITHM_REGISTRY

ALGORITHM_REGISTRY.register(SillyMF.__name__, SillyMF)

Once the algorithm is registered, you can use it when constructing a pipeline. As an example we will compare the SillyMF algorithm to the ItemKNN and EASE algorithms.

import pandas as pd

from recpack.data.datasets import MovieLens25M
from recpack.pipelines import PipelineBuilder
from recpack.splitters.scenarios import StrongGeneralization
from recpack.pipelines import ALGORITHM_REGISTRY

ALGORITHM_REGISTRY.register(SillyMF.__name__, SillyMF)

# Get data to test on
dataset = MovieLens25M("data/ml25.csv", use_default_filters=False)
# This will apply default preprocessing
im = dataset.load_interaction_matrix()

# Data splitting scenario: split the interactions into train, validation and test data
scenario = StrongGeneralization(frac_users_train=0.7, frac_interactions_in=0.8, validation=True)
scenario.split(im)

# Construct our pipeline object
pipeline_builder = PipelineBuilder()

pipeline_builder.set_train_data(scenario.training_data)
pipeline_builder.set_test_data(scenario.test_data)
pipeline_builder.set_validation_data(scenario.validation_data)

# Add the baseline algorithms
# Grid parameters will be optimised using grid search before final evaluation
pipeline_builder.add_algorithm('ItemKNN', grid={'K': [100, 200, 400, 800]})
pipeline_builder.add_algorithm('EASE', grid={'l2': [10, 100, 1000], 'alpha': [0, 0.1, 0.5]})

# Add our new algorithm
# Optimising learning rate and num_components
# setting fixed values for max_epochs and batch_size
pipeline_builder.add_algorithm(
    'SillyMF',
    grid={
        'learning_rate': [0.1, 0.01, 0.3],
        'num_components': [100, 200, 400]
    },
    params={
        'max_epochs': 5,
        'batch_size': 1024
    }
)

# Add NDCG and Recall to be evaluated at 10, 20, 50 and 100
pipeline_builder.add_metric('NDCGK', [10, 20, 50, 100])
pipeline_builder.add_metric('RecallK', [10, 20, 50, 100])

# Set the optimisation metric, this metric will be used to select the best values from grid for each algorithm
pipeline_builder.set_optimisation_metric('RecallK', 20)

# Construct pipeline
pipeline = pipeline_builder.build()

# Run pipeline, will first do optimisation, and then evaluation
pipeline.run()

# Get the metric results.
# This will be a dict with the results of the run.
# Turning it into a dataframe makes reading easier
pd.DataFrame.from_dict(pipeline.get_metrics()).T

And there you have it, hopefully the new algorithm is better than the baseline algorithms! For more information on how to use pipelines, see recpack.pipelines.