Creating your own algorithms
Recpack’s goal is to make it easy to set up an offline recommendation experiment to evaluate the merits of a recommendation algorithm. That is why we’ve made it super easy to create your own algorithm from scratch, or adapt from one of the many algorithm implementations included.
We have created a number of base classes that provide the necessary plumbing for your algorithm to fit within the recpack evaluation framework so that you can focus on what really matters: the algorithm itself!
We explain when and how to use these base classes by means of four example algorithms.
After reading through these examples you should:

- Know which base class to pick for a new algorithm
- Know which functions to override to implement the algorithm
Randomized Softmax Popularity Algorithm
Description
The Softmax Popularity Algorithm samples recommended items relative to a “temperature weighted item popularity”:

\[P(i) = \frac{e^{q(i) / \tau}}{\sum_{j} e^{q(j) / \tau}}\]

where \(q(i)\) is the popularity score of item i, computed as the natural logarithm of the number of times the item was visited, and \(\tau\) is the temperature parameter.
With a temperature of 1, we end up with the standard softmax function. For a temperature of 0 we always choose the most popular item, whereas high values of temperature make the sampling resemble uniformly random sampling. For more information on the softmax function, check out Wikipedia.
We limit our recommendations to the K most popular items, to avoid ever recommending a very unpopular item.
At prediction time, recommendations are sampled according to their probability (as defined by the softmax).
The algorithm can thus be considered randomized or stochastic: subsequent calls of the predict method will not necessarily yield the same results.
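To get a feel for the effect of \(\tau\), here is a small standalone numpy sketch (not part of recpack; the popularity counts are made up):

import numpy as np

def softmax(q, tau):
    """Temperature-weighted softmax over popularity scores q."""
    e = np.exp((q - q.max()) / tau)  # subtract max for numerical stability
    return e / e.sum()

q = np.log(np.array([100, 50, 10]))  # popularity scores of three items

print(softmax(q, tau=1.0))   # moderate preference for popular items
print(softmax(q, tau=0.1))   # almost always picks the most popular item
print(softmax(q, tau=10.0))  # close to uniformly random sampling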
Implementation
We start by selecting a base class as parent.
We have no need for PyTorch, so recpack.algorithms.base.TorchMLAlgorithm is easily discarded. Likewise, this is neither a factorization nor an item-similarity algorithm, so we use the base class of base classes: recpack.algorithms.base.Algorithm.
Even this base class already implements quite a bit:

- fit(X) provides a wrapper around the _fit function we need to implement; it handles type management and some additional checks.
- predict(X) provides a wrapper around the _predict function we need to implement.
- _transform_predict_input(X) and _transform_fit_input(X) are used by fit and predict to convert their input matrices (X) into the required types. By default, this base class transforms the data into a csr_matrix, which suits our purpose perfectly as we have no need of timestamps.
- _check_fit_complete() is called at the end of the fit method to make sure fitting was successful.
- _check_prediction(X_pred, X) is called at the end of predict to make sure the predicted output is as expected. If not, it logs a warning.
All we really need to do is thus implement __init__ to set the hyperparameters, _fit and _predict.
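In skeleton form, that looks as follows (a sketch; the class, parameter and attribute names are ours, purely illustrative):

from scipy.sparse import csr_matrix
from recpack.algorithms.base import Algorithm

class MySkeletonAlgorithm(Algorithm):
    """Minimal skeleton of an Algorithm subclass."""

    def __init__(self, my_hyperparameter):
        # Store hyperparameters under the exact argument name
        self.my_hyperparameter = my_hyperparameter

    def _fit(self, X: csr_matrix):
        # Learn model state and store it in attributes
        # with a trailing underscore (sklearn convention)
        self.some_learned_state_ = X.sum(axis=0)

    def _predict(self, X: csr_matrix) -> csr_matrix:
        # Return a score matrix with the same shape as X
        return csr_matrix(X.shape)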
__init__
Let’s get started: we define our class, add a docstring for reference and implement the __init__ function.
Our algorithm has two hyperparameters:

- K: The number of items that can be considered for recommendations.
- tau: The temperature parameter.
import numpy as np
from scipy.sparse import csr_matrix

from recpack.algorithms.base import Algorithm


class RandomizedSoftmaxPopularity(Algorithm):
    """Recommend items using softmax on the natural logarithm of item counts.

    Recommendations are sampled from the probability distribution
    created by taking the softmax of the natural logarithm of item counts.

    Items are scored such that the distance between the item in first place
    and the item in second place is the same as between all other items.

    :param K: Only the K most frequent items are considered for recommendation
    :param tau: Temperature in the softmax computation
    """

    def __init__(self, K, tau):
        self.K = K
        self.tau = tau
_fit
Next we implement the _fit function. Our input is the matrix of interactions considered for training. We compute the natural logarithm of the number of times users interacted with each item, then take the softmax of the K most popular items.
def _fit(self, X: csr_matrix):
    # Compute pop by taking the logarithm of the raw counts.
    # A1 puts it into a 1d array, making all subsequent operations easy.
    pop = np.log(np.sum(X, axis=0)).A1
    max_pop = np.max(pop)

    # Cut to top K
    self.top_k_pop_items_ = np.argsort(pop)[-self.K:]
    top_k_pop = pop[self.top_k_pop_items_]

    # To make softmax numerically stable, we compute exp((pop - max(pop))/self.tau)
    # instead of exp(pop/self.tau):
    #
    # softmax for item i can then be computed as
    # e^((pop[i] - max(pop))/tau) / sum([e^((pop[j] - max(pop))/tau) for j in topK])
    top_k_pop_minus_max = (top_k_pop - max_pop) / self.tau
    top_k_exp = np.exp(top_k_pop_minus_max)
    top_k_pop_sum = np.sum(top_k_exp)

    self.softmax_scores_ = top_k_exp / top_k_pop_sum
After fitting, the model is ready for prediction.
_predict
Finally we implement _predict. Here we sample recommendations for each user with at least one interaction in the matrix of interactions. Remember that the sampling probabilities were stored in softmax_scores_ during fitting.
def _predict(self, X: csr_matrix):
    # Randomly sample items, with weights decided by the softmax scores
    users = X.nonzero()[0]

    # The resulting score = (K - ix)/K:
    # the first sampled item gets score 1, and the last sampled item score 1/K
    score_list = [
        (u, i, (self.K - ix) / self.K)
        for u in set(users)
        for ix, i in enumerate(
            np.random.choice(
                self.top_k_pop_items_,
                size=self.K,
                replace=False,
                p=self.softmax_scores_,
            )
        )
    ]
    user_idxs, item_idxs, scores = list(zip(*score_list))
    score_matrix = csr_matrix((scores, (user_idxs, item_idxs)), shape=X.shape)

    return score_matrix
This algorithm can now be used in evaluation pipelines just like any other algorithm already available in recpack!
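As a quick sanity check outside of a pipeline, a minimal usage sketch (with a made-up toy interaction matrix) could look like this:

import numpy as np
from scipy.sparse import csr_matrix

# Toy interaction matrix: 3 users x 5 items
X = csr_matrix(np.array([
    [1, 0, 1, 1, 0],
    [0, 1, 1, 0, 1],
    [1, 1, 0, 1, 0],
]))

algo = RandomizedSoftmaxPopularity(K=3, tau=1.0)
algo.fit(X)

# Because the algorithm is stochastic, two predict calls
# will not necessarily return the same score matrix
scores_a = algo.predict(X)
scores_b = algo.predict(X)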
Recency
Description
Next we create an algorithm that recommends the items that have been interacted with most recently. This algorithm can be considered a baseline, as it is not personalized.
Implementation
Again, we start from recpack.algorithms.base.Algorithm.
This new algorithm is different from the Randomized Softmax Popularity Algorithm in that it needs the time of interaction to be able to make recommendations. Thankfully, the recpack data format recpack.data.matrix.InteractionMatrix has a timestamps attribute that stores the time of interaction.

Our algorithm has no hyperparameters, so we have no use for an __init__ method.
_transform_fit_input
To make sure we receive a recpack.data.matrix.InteractionMatrix at fitting time, we update _transform_fit_input.
import numpy as np
from scipy.sparse import csr_matrix, lil_matrix

from recpack.algorithms.base import Algorithm
# get_users is used in _predict below; we assume it lives in recpack.algorithms.util
from recpack.algorithms.util import get_users
from recpack.data.matrix import InteractionMatrix


class Recency(Algorithm):
    def _transform_fit_input(self, X):
        # X needs to be an InteractionMatrix for us to have access to
        # the time of interaction at fitting time
        self._assert_is_interaction_matrix(X)
        # X needs to have timestamps available
        self._assert_has_timestamps(X)

        # No transformation needed
        return X
_fit
Now that we have asserted that _fit receives an object of type recpack.data.matrix.InteractionMatrix, we fit our algorithm by extracting, for each item, its most recent time of interaction. We then scale these values to the interval [0, 1] using min-max normalisation.
def _fit(self, X: InteractionMatrix):
    # X.timestamps gives a pandas Series with a (user, item) MultiIndex.
    # We drop the index, group by just the item index,
    # then select the maximal timestamp from each group.
    max_ts_per_item = X.timestamps.reset_index().groupby('iid')['ts'].max()

    # Apply min-max normalisation
    recency = np.zeros(X.shape[1])
    recency[max_ts_per_item.index] = max_ts_per_item.values
    most_recent = np.max(recency)
    least_recent = np.min(recency)
    recency = (recency - least_recent) / (most_recent - least_recent)

    self.recency_ = recency.copy()
At fitting time, the base class’ fit method calls both _transform_fit_input and _fit. The model is then ready for use, with the attribute self.recency_ containing the recommendation scores per item.
_predict
Prediction is now easy: for each nonzero user in the input matrix we set the items’ scores equal to the recency scores we computed in _fit.
def _predict(self, X: csr_matrix):
    results = lil_matrix(X.shape)
    users = get_users(X)

    results[users] = self.recency_

    return results.tocsr()
Here we go, another algorithm ready for use in evaluation!
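To try it out, we can build a small InteractionMatrix from a pandas DataFrame. A sketch, assuming the InteractionMatrix constructor accepts the item, user and timestamp column names as keyword arguments (the column names and data here are ours):

import pandas as pd
from recpack.data.matrix import InteractionMatrix

df = pd.DataFrame({
    "user": [0, 0, 1, 1, 2],
    "item": [0, 1, 1, 2, 0],
    "timestamp": [10, 20, 12, 25, 18],
})

# Keyword arguments avoid relying on parameter order
im = InteractionMatrix(df, item_ix="item", user_ix="user", timestamp_ix="timestamp")

algo = Recency()
algo.fit(im)

# At prediction time the default _transform_predict_input
# converts the InteractionMatrix to a csr_matrix
scores = algo.predict(im)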
Singular Value Decomposition
Description
Let’s now implement SVD, a well-known matrix factorization algorithm.
Singular Value Decomposition decomposes a matrix of interactions into three matrices which, when multiplied together, approximately reconstruct the original matrix: \(X = U \times \Sigma \times V\). If matrix \(X\) is of shape (|users| x |items|), then \(U\) is of shape (|users| x num_components), \(\Sigma\) is a (num_components x num_components) matrix, and finally \(V\) is a (num_components x |items|) matrix.
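These shapes are easy to verify with a small standalone sketch around sklearn’s TruncatedSVD (random data, purely illustrative):

import numpy as np
from sklearn.decomposition import TruncatedSVD

num_users, num_items, num_components = 20, 30, 5
X = np.random.rand(num_users, num_items)

model = TruncatedSVD(n_components=num_components)
user_features = model.fit_transform(X)   # shape (20, 5): (|users| x num_components)
V = model.components_                    # shape (5, 30): (num_components x |items|)
sigma = np.diag(model.singular_values_)  # shape (5, 5): (num_components x num_components)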
Implementation
Rather than implement the SVD computation ourselves, we adapt the optimised TruncatedSVD implementation in sklearn so that it matches the recpack interface.
As the name suggests, it makes sense to use recpack.algorithms.base.FactorizationAlgorithm as base class in this example. In addition to the methods implemented in recpack.algorithms.base.Algorithm, which we highlighted in the Randomized Softmax Popularity Algorithm example, this class provides:

- _predict generates recommendations by multiplying the user embeddings of nonzero users with all item embeddings.
- _check_fit_complete performs an additional check on the dimensions of the embeddings.
All that remains for us to implement is __init__ to set the hyperparameters and _fit to compute the embeddings.
__init__
For simplicity we use only one hyperparameter: num_components, which defines the dimension of the embedding. We also add a parameter random_state, itself a parameter of TruncatedSVD, to ensure reproducibility.
Warning
The random_state parameter should not be considered a hyperparameter, i.e. we should not perform a parameter search to determine its optimal value.
import numpy as np
from scipy.sparse import csr_matrix, lil_matrix, diags
from sklearn.decomposition import TruncatedSVD

from recpack.algorithms.base import FactorizationAlgorithm


class SVD(FactorizationAlgorithm):
    """Singular Value Decomposition as dimension reduction recommendation algorithm.

    SVD computed using the TruncatedSVD implementation from sklearn:
    U x Sigma x V = X
    U are the user features, and the item features are computed as Sigma x V.

    :param num_components: The size of the latent dimension
    :type num_components: int
    :param random_state: The seed for the random state to allow for comparison
    :type random_state: int
    """

    def __init__(self, num_components=100, random_state=42):
        super().__init__(num_components=num_components)

        self.random_state = random_state
_fit
In _fit we initialize an object of type TruncatedSVD. For simplicity’s sake we expose only num_components in our algorithm. All other hyperparameters are left at their default values.
SVD decomposes the matrix into three matrices, while the recpack.algorithms.base.FactorizationAlgorithm class expects only two: a user and an item embedding. Therefore we take the item embedding to be the product of \(\Sigma\) and \(V\). Since \(\Sigma\) is a square matrix, this does not change the matrix dimensions: \(\Sigma \times V\) is still a (num_components x |items|) matrix.
def _fit(self, X: csr_matrix):
    model = TruncatedSVD(
        n_components=self.num_components, n_iter=7, random_state=self.random_state
    )
    # Factorization computes U x Sigma x V;
    # U are the user features,
    # Sigma x V are the item features.
    self.user_embedding_ = model.fit_transform(X)

    V = model.components_
    sigma = diags(model.singular_values_)
    self.item_embedding_ = sigma @ V

    return self
This concludes the modification of the TruncatedSVD algorithm for use in recpack!
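Once fitted, the SVD class behaves like any other recpack algorithm. A small sketch on random data (the matrix is made up):

import numpy as np
from scipy.sparse import csr_matrix

rng = np.random.default_rng(42)
# Random binary interaction matrix: 50 users x 40 items
X = csr_matrix((rng.random((50, 40)) < 0.2).astype(np.int32))

svd = SVD(num_components=10, random_state=42)
svd.fit(X)

# The inherited _predict multiplies user embeddings with item embeddings
scores = svd.predict(X)
print(scores.shape)  # (50, 40)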
SillyMF (Gradient Descent Algorithm)
Description
In this example we implement a very silly, iterative matrix factorization algorithm in PyTorch. It is by no means sophisticated or even guaranteed to converge, but serves well for our illustration purposes.
The model learns the weights of a matrix factorization of the initial matrix \(X\) as \(X = U \times V^T\).
Implementation
Because we are now dealing with an algorithm optimised by means of gradient descent, it makes sense to use recpack.algorithms.base.TorchMLAlgorithm as base class in this example.
This base class comes with quite a bit more plumbing than the others:

- _predict generates recommendations by calling _batch_predict for batches of users (to keep the memory footprint low).
- _check_fit_complete performs an additional check of the dimensions of the embeddings.
- _check_prediction makes sure predictions were made for all nonzero users.
- fit(X, validation_data) performs a number of training epochs, each followed by an evaluation step on the validation data. Unlike in the other base classes, it takes an additional validation_data argument to perform this evaluation step.
- save saves the current PyTorch model to disk.
- load loads a PyTorch model from file.
- filename generates a unique filename for the current best model.
- _transform_predict_input transforms the input matrix to a csr_matrix by default.
- _transform_fit_input transforms the input matrices to a csr_matrix by default.
- _evaluate performs one evaluation step, which consists of making predictions for the validation data and subsequently updating the stopping criterion.
- _load_best loads the best model encountered during training as the final model used to make predictions.
- _save_best saves the best model encountered during training to a temporary file.
This leaves __init__, _init_model, _train_epoch, my_loss and _batch_predict to be implemented, as well as the nn.Module that defines the actual PyTorch model.
MFModule
First we create a PyTorch model that encodes this factorization. The forward method is also used to make recommendations at prediction time.
from typing import List

import numpy as np
from scipy.sparse import csr_matrix, lil_matrix
import torch
import torch.optim as optim
import torch.nn as nn

from recpack.algorithms.base import TorchMLAlgorithm
from recpack.algorithms.stopping_criterion import StoppingCriterion
# Helpers for batching and sparse/dense conversion used further down;
# we assume they live in recpack.algorithms.util
from recpack.algorithms.util import get_batches, get_users, naive_sparse2tensor


class MFModule(nn.Module):
    """MF torch module, encodes the embeddings and the forward functionality.

    :param num_users: the number of users
    :type num_users: int
    :param num_items: the number of items
    :type num_items: int
    :param num_components: The size of the embedding per user and item, defaults to 100
    :type num_components: int, optional
    """

    def __init__(self, num_users, num_items, num_components=100):
        super().__init__()

        self.num_components = num_components
        self.num_users = num_users
        self.num_items = num_items

        self.user_embedding = nn.Embedding(num_users, num_components)  # User embedding
        self.item_embedding = nn.Embedding(num_items, num_components)  # Item embedding

        self.std = 1 / num_components ** 0.5
        # Initialise embeddings to a random start
        nn.init.normal_(self.user_embedding.weight, std=self.std)
        nn.init.normal_(self.item_embedding.weight, std=self.std)

    def forward(
        self, user_tensor: torch.Tensor, item_tensor: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute dot-product of user embedding (w_u) and item embedding (h_i)
        for every user and item pair in user_tensor and item_tensor.

        :param user_tensor: 1D tensor of user ids to compute scores for
        :type user_tensor: torch.Tensor
        :param item_tensor: 1D tensor of item ids to compute scores for
        :type item_tensor: torch.Tensor
        """
        w_u = self.user_embedding(user_tensor)
        h_i = self.item_embedding(item_tensor)

        return w_u.matmul(h_i.T)
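A quick standalone check of the module (the sizes are arbitrary): forward returns one score per (user, item) pair, so a batch of two users scored against all ten items yields a (2 x 10) tensor.

import torch

module = MFModule(num_users=5, num_items=10, num_components=4)

user_tensor = torch.LongTensor([0, 3])  # a batch of two users
item_tensor = torch.arange(10)          # all items

scores = module(user_tensor, item_tensor)
print(scores.shape)  # torch.Size([2, 10])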
__init__
Next, we create the actual recpack.algorithms.base.TorchMLAlgorithm. The __init__ of any TorchMLAlgorithm expects at least the following default hyperparameters to be defined:

- batch_size, which dictates how many users make up a training batch.
- max_epochs, which defines the maximum number of training epochs to run.
- learning_rate, which determines how much to change the model with every update.
- stopping_criterion, which defines how to evaluate the model’s performance, and if and when to stop early.

Other optional hyperparameters of the base class can be found in the documentation.
We define one additional hyperparameter: num_components, which is the dimension of our embeddings for both users and items.
For the sake of the example we use a fixed random seed, set to guarantee reproducibility of results.
As recpack.algorithms.stopping_criterion.StoppingCriterion we use Recall, by default computed on the top 50 recommendations. By default, early stopping is disabled.
class SillyMF(TorchMLAlgorithm):
    def __init__(self, batch_size, max_epochs, learning_rate, num_components=100):
        super().__init__(
            batch_size=batch_size,
            max_epochs=max_epochs,
            learning_rate=learning_rate,
            stopping_criterion="recall",
            seed=42,
        )

        self.num_components = num_components
_init_model
Next we implement _init_model. We cannot initialize MFModule as part of SillyMF’s __init__, because at that stage we are unaware of the dimensions of the interaction matrix.
We then define the optimizer. Here we use simple SGD, but any PyTorch optimizer can be used.
def _init_model(self, X: csr_matrix):
    num_users, num_items = X.shape

    self.model_ = MFModule(
        num_users, num_items, num_components=self.num_components
    ).to(self.device)

    # We'll use a basic SGD optimiser
    self.optimizer = optim.SGD(self.model_.parameters(), lr=self.learning_rate)
    self.steps = 0
_train_epoch
Next we implement training. First, we need to define a loss function that indicates how well our current embeddings perform at the task we set. As mentioned in the description, this task is to reconstruct the matrix \(X\). Our loss function computes, per user, the total absolute error between \(U \times V^T\) and the original matrix \(X\), and averages this over all users.
Note
For an overview of commonly used loss functions, check out PyTorch loss functions.
def my_loss(true_sim, predicted_sim):
    """Computes the total absolute error from predicted compared to true,
    and averages over all users.
    """
    return torch.mean(torch.sum(torch.abs(true_sim - predicted_sim), axis=1))
Now we can continue with _train_epoch. Every epoch loops through the entire dataset (most often in batches). For every batch, the loss and resulting gradients are computed and the embeddings are updated.
def _train_epoch(self, X):
    losses = []
    item_tensor = torch.arange(X.shape[1]).to(self.device)

    for users in get_batches(get_users(X), batch_size=self.batch_size):
        self.optimizer.zero_grad()

        user_tensor = torch.LongTensor(users).to(self.device)
        scores = self.model_.forward(user_tensor, item_tensor)
        expected_scores = naive_sparse2tensor(X[users])

        loss = my_loss(expected_scores, scores)

        # Backwards propagation of the loss
        loss.backward()
        losses.append(loss.item())

        # Update the weights according to the gradients.
        # All automated thanks to torch.
        self.optimizer.step()
        self.steps += 1

    return losses
_batch_predict
Now we can move on to the final step of our implementation: prediction. Predicting items is the same here as in Singular Value Decomposition: the user embeddings of nonzero users are multiplied with item embeddings.
As mentioned in the definition of MFModule, MFModule.forward is used to make predictions. This method takes a PyTorch Tensor of user ids and a Tensor of item ids as inputs. It then computes the matrix multiplication of their embeddings.
def _batch_predict(self, X: csr_matrix, users: List[int] = None) -> csr_matrix:
    """Predict scores for matrix X, given the selected users.

    If there are no selected users, you can assume X is a full matrix,
    and users can be retrieved as the nonzero indices in the X matrix.

    :param X: Matrix of user item interactions
    :type X: csr_matrix
    :param users: users selected for recommendation
    :type users: List[int]
    :return: sparse matrix of scores per user item pair.
    :rtype: csr_matrix
    """
    X_pred = lil_matrix(X.shape)
    if users is None:
        users = get_users(X)

    # Turn the np arrays and lists into torch tensors
    user_tensor = torch.LongTensor(users).to(self.device)
    item_tensor = torch.arange(X.shape[1]).to(self.device)

    X_pred[users] = self.model_(user_tensor, item_tensor).detach().cpu().numpy()

    return X_pred.tocsr()
And that’s all for implementing SillyMF!
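To try SillyMF outside of a pipeline, fit needs validation data in addition to the training interactions. A sketch with random data, assuming validation_data is a (validation_in, validation_out) tuple as used by the fit(X, validation_data) method described above:

import numpy as np
from scipy.sparse import csr_matrix

rng = np.random.default_rng(42)
X_train = csr_matrix((rng.random((100, 50)) < 0.2).astype(np.int32))
# Validation input and held-out expected output for the same users
X_val_in = csr_matrix((rng.random((100, 50)) < 0.1).astype(np.int32))
X_val_out = csr_matrix((rng.random((100, 50)) < 0.1).astype(np.int32))

algo = SillyMF(batch_size=32, max_epochs=3, learning_rate=0.05, num_components=10)
algo.fit(X_train, (X_val_in, X_val_out))

scores = algo.predict(X_val_in)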
Compare your algorithm to the state of the art
Now that you have learned how to create your own algorithm, you obviously want to know how well it performs compared to state-of-the-art recommendation algorithms. Recpack provides pipeline functionality, which simplifies running experiments as well as making them reproducible.
Because we want you to use your own algorithms with the recpack pipelines, we have made it easy to set up a pipeline with your own algorithm.
The first (and only) step to using a new algorithm is to make sure it is registered in the recpack.pipelines.ALGORITHM_REGISTRY. Registering a new algorithm is done using the register function. This function takes two arguments: the name of the algorithm to register and the class.
from recpack.pipelines import ALGORITHM_REGISTRY
ALGORITHM_REGISTRY.register(SillyMF.__name__, SillyMF)
Once the algorithm is registered, you can use it when constructing a pipeline. As an example, we compare the SillyMF algorithm to the ItemKNN and EASE algorithms.
import pandas as pd

from recpack.data.datasets import MovieLens25M
from recpack.pipelines import ALGORITHM_REGISTRY, PipelineBuilder
from recpack.splitters.scenarios import StrongGeneralization

ALGORITHM_REGISTRY.register(SillyMF.__name__, SillyMF)

# Get data to test on
dataset = MovieLens25M("data/ml25.csv", use_default_filters=False)
# This will apply default preprocessing
im = dataset.load_interaction_matrix()

# Data splitting scenario
scenario = StrongGeneralization(frac_users_train=0.7, frac_interactions_in=0.8, validation=True)
# Divide the data into training, validation and test sets
scenario.split(im)

# Construct our pipeline object
pipeline_builder = PipelineBuilder()
pipeline_builder.set_train_data(scenario.training_data)
pipeline_builder.set_test_data(scenario.test_data)
pipeline_builder.set_validation_data(scenario.validation_data)

# Add the baseline algorithms.
# Grid parameters will be optimised using grid search before final evaluation.
pipeline_builder.add_algorithm('ItemKNN', grid={'K': [100, 200, 400, 800]})
pipeline_builder.add_algorithm('EASE', grid={'l2': [10, 100, 1000], 'alpha': [0, 0.1, 0.5]})

# Add our new algorithm,
# optimising learning_rate and num_components,
# with fixed values for max_epochs and batch_size
pipeline_builder.add_algorithm(
    'SillyMF',
    grid={
        'learning_rate': [0.1, 0.01, 0.3],
        'num_components': [100, 200, 400]
    },
    params={
        'max_epochs': 5,
        'batch_size': 1024
    }
)

# Add NDCG and Recall to be evaluated at 10, 20, 50 and 100
pipeline_builder.add_metric('NDCGK', [10, 20, 50, 100])
pipeline_builder.add_metric('RecallK', [10, 20, 50, 100])

# Set the optimisation metric; it is used to select the best values
# from the grid for each algorithm
pipeline_builder.set_optimisation_metric('RecallK', 20)

# Construct pipeline
pipeline = pipeline_builder.build()

# Run pipeline; this will first optimise, then evaluate
pipeline.run()

# Get the metric results.
# This will be a dict with the results of the run.
# Turning it into a dataframe makes reading easier.
pd.DataFrame.from_dict(pipeline.get_metrics()).T
And there you have it, hopefully the new algorithm is better than the baseline algorithms!
For more information on how to use pipelines, see recpack.pipelines.