Dealing with AvalancheDatasets
Understand the Avalanche Package Structure
Make it Custom, Make it Yours
!pip install avalanche-lib==0.3.1
How to Contribute Back to the Avalanche Community
Benchmarks and Datasets
Code Examples
Frequently Asked Questions
Avalanche
├── Benchmarks
│ ├── Classic
│ ├── Datasets
│ ├── Generators
│ ├── Scenarios
│ └── Utils
├── Evaluation
│ ├── Metrics
│ ├── Tensorboard
| └── Utils
├── Training
│ ├── Strategies
│ ├── Plugins
| └── Utils
├── Models
└── Loggers
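For orientation, the sub-packages above map onto imports like the following sketch (a non-exhaustive illustration; the exact module contents depend on the installed Avalanche version):
from avalanche.benchmarks.classic import SplitMNIST        # Benchmarks / Classic
from avalanche.benchmarks.generators import nc_benchmark   # Benchmarks / Generators
from avalanche.evaluation.metrics import accuracy_metrics  # Evaluation / Metrics
from avalanche.training.supervised import Naive            # Training / Strategies
from avalanche.training.plugins import EvaluationPlugin    # Training / Plugins
from avalanche.models import SimpleMLP                     # Models
from avalanche.logging import InteractiveLogger            # Loggers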
Examples for the Loggers module offered in Avalanche
Examples for the Models module offered in Avalanche
Help us Design Avalanche of the Future
Help us Find Bugs in Avalanche
To get Answers of Life, Ask Questions
We are all ears!
Protocols and Metrics Code Examples
import torch
from torch.nn import CrossEntropyLoss
from torch.optim import SGD
from avalanche.benchmarks.classic import PermutedMNIST
from avalanche.training.plugins import EvaluationPlugin
from avalanche.evaluation.metrics import accuracy_metrics
from avalanche.models import SimpleMLP
from avalanche.training.supervised import Naive
# Config
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# model
model = SimpleMLP(num_classes=10)
# CL Benchmark Creation
perm_mnist = PermutedMNIST(n_experiences=3)
train_stream = perm_mnist.train_stream
test_stream = perm_mnist.test_stream
# Prepare for training & testing
optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9)
criterion = CrossEntropyLoss()
eval_plugin = EvaluationPlugin(
accuracy_metrics(minibatch=True, epoch=True, epoch_running=True,
experience=True, stream=True))
# Continual learning strategy
cl_strategy = Naive(
model, optimizer, criterion, train_mb_size=32, train_epochs=2,
eval_mb_size=32, evaluator=eval_plugin, device=device)
# train and test loop
results = []
for train_task in train_stream:
cl_strategy.train(train_task, num_workers=4)
results.append(cl_strategy.eval(test_stream))
import torch
import torch.nn as nn
from torch.nn import CrossEntropyLoss
from torch.optim import SGD
from torchvision import transforms
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor, RandomCrop
from torch.utils.data import DataLoader
import numpy as np
from copy import copy
# Config
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# model
class SimpleMLP(nn.Module):
def __init__(self, num_classes=10, input_size=28*28):
super(SimpleMLP, self).__init__()
self.features = nn.Sequential(
nn.Linear(input_size, 512),
nn.ReLU(inplace=True),
nn.Dropout(),
)
self.classifier = nn.Linear(512, num_classes)
self._input_size = input_size
def forward(self, x):
x = x.contiguous()
x = x.view(x.size(0), self._input_size)
x = self.features(x)
x = self.classifier(x)
return x
model = SimpleMLP(num_classes=10)
# CL Benchmark Creation
list_train_dataset = []
list_test_dataset = []
rng_permute = np.random.RandomState(0)
train_transform = transforms.Compose([
RandomCrop(28, padding=4),
ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
test_transform = transforms.Compose([
ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
# permutation transformation
class PixelsPermutation(object):
def __init__(self, index_permutation):
self.permutation = index_permutation
def __call__(self, x):
return x.view(-1)[self.permutation].view(1, 28, 28)
def get_permutation():
return torch.from_numpy(rng_permute.permutation(784)).type(torch.int64)
# for every incremental step
permutations = []
for i in range(3):
# choose a random permutation of the pixels in the image
idx_permute = get_permutation()
current_perm = PixelsPermutation(idx_permute)
permutations.append(idx_permute)
# add the permutation to the default dataset transformation
train_transform_list = train_transform.transforms.copy()
train_transform_list.append(current_perm)
new_train_transform = transforms.Compose(train_transform_list)
test_transform_list = test_transform.transforms.copy()
test_transform_list.append(current_perm)
new_test_transform = transforms.Compose(test_transform_list)
# get the datasets with the constructed transformation
permuted_train = MNIST(root='./data/mnist',
download=True, transform=new_train_transform)
permuted_test = MNIST(root='./data/mnist',
train=False,
download=True, transform=new_test_transform)
list_train_dataset.append(permuted_train)
list_test_dataset.append(permuted_test)
# Train
model.to(device)  # move the model to the same device as the mini-batches
optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9)
criterion = CrossEntropyLoss()
for task_id, train_dataset in enumerate(list_train_dataset):
train_data_loader = DataLoader(
train_dataset, num_workers=4, batch_size=32)
for ep in range(2):
for iteration, (train_mb_x, train_mb_y) in enumerate(train_data_loader):
optimizer.zero_grad()
train_mb_x = train_mb_x.to(device)
train_mb_y = train_mb_y.to(device)
# Forward
logits = model(train_mb_x)
# Loss
loss = criterion(logits, train_mb_y)
# Backward
loss.backward()
# Update
optimizer.step()
# Test
acc_results = []
for task_id, test_dataset in enumerate(list_test_dataset):
test_data_loader = DataLoader(
test_dataset, num_workers=4, batch_size=32)
correct = 0
for iteration, (test_mb_x, test_mb_y) in enumerate(test_data_loader):
# Move mini-batch data to device
test_mb_x = test_mb_x.to(device)
test_mb_y = test_mb_y.to(device)
# Forward
test_logits = model(test_mb_x)
# Loss
test_loss = criterion(test_logits, test_mb_y)
# compute acc
correct += test_mb_y.eq(test_logits.argmax(dim=1)).sum().item()
acc_results.append(correct / len(test_dataset))
@InProceedings{lomonaco2021avalanche,
title={Avalanche: an End-to-End Library for Continual Learning},
author={Vincenzo Lomonaco and Lorenzo Pellegrini and Andrea Cossu and Antonio Carta and Gabriele Graffieti and Tyler L. Hayes and Matthias De Lange and Marc Masana and Jary Pomponi and Gido van de Ven and Martin Mundt and Qi She and Keiland Cooper and Jeremy Forest and Eden Belouadah and Simone Calderara and German I. Parisi and Fabio Cuzzolin and Andreas Tolias and Simone Scardapane and Luca Antiga and Subutai Ahmad and Adrian Popescu and Christopher Kanan and Joost van de Weijer and Tinne Tuytelaars and Davide Bacciu and Davide Maltoni},
booktitle={Proceedings of IEEE Conference on Computer Vision and Pattern Recognition},
series={2nd Continual Learning in Computer Vision Workshop},
year={2021}
}
pip install avalanche-lib
pip install avalanche-lib[all]
pip install avalanche-lib[extra] # support for specific functionalities (e.g. specific strategies)
pip install avalanche-lib[rl] # reinforcement learning support
pip install avalanche-lib[detection] # object detection support
pip install git+https://github.com/ContinualAI/avalanche.git
# choose your python version
python="3.8"
# Step 1
git clone https://github.com/ContinualAI/avalanche.git
cd avalanche
conda create -n avalanche-env python=$python -c conda-forge
conda activate avalanche-env
# Step 2
# Install PyTorch with Conda (instructions here: https://pytorch.org/)
# Step 3
conda env update --file environment.yml
# choose your python version
python="3.8"
# Step 1
git clone https://github.com/ContinualAI/avalanche.git
cd avalanche
conda create -n avalanche-dev-env python=$python -c conda-forge
conda activate avalanche-dev-env
# Step 2
# Install PyTorch with Conda (instructions here: https://pytorch.org/)
# Step 3
conda env update --file environment-dev.yml
pycodestyle avalanche tests examples
python -m unittest discover tests -v
USE_GPU=True python -m unittest discover tests -v
USE_GPU=False FAST_TEST=True python -m unittest discover tests -v
from torch.optim import SGD
from torch.nn import CrossEntropyLoss
from avalanche.benchmarks.classic import SplitMNIST
from avalanche.evaluation.metrics import forgetting_metrics, accuracy_metrics, \
loss_metrics, timing_metrics, cpu_usage_metrics, confusion_matrix_metrics, disk_usage_metrics
from avalanche.models import SimpleMLP
from avalanche.logging import InteractiveLogger, TextLogger, TensorboardLogger
from avalanche.training.plugins import EvaluationPlugin
from avalanche.training.supervised import Naive
scenario = SplitMNIST(n_experiences=5)
# MODEL CREATION
model = SimpleMLP(num_classes=scenario.n_classes)
# DEFINE THE EVALUATION PLUGIN and LOGGERS
# The evaluation plugin manages the metrics computation.
# It takes as argument a list of metrics, collects their results and returns
# them to the strategy it is attached to.
# log to Tensorboard
tb_logger = TensorboardLogger()
# log to text file
text_logger = TextLogger(open('log.txt', 'a'))
# print to stdout
interactive_logger = InteractiveLogger()
eval_plugin = EvaluationPlugin(
accuracy_metrics(minibatch=True, epoch=True, experience=True, stream=True),
loss_metrics(minibatch=True, epoch=True, experience=True, stream=True),
timing_metrics(epoch=True, epoch_running=True),
forgetting_metrics(experience=True, stream=True),
cpu_usage_metrics(experience=True),
confusion_matrix_metrics(num_classes=scenario.n_classes, save_image=False,
stream=True),
disk_usage_metrics(minibatch=True, epoch=True, experience=True, stream=True),
loggers=[interactive_logger, text_logger, tb_logger]
)
# CREATE THE STRATEGY INSTANCE (NAIVE)
cl_strategy = Naive(
model, SGD(model.parameters(), lr=0.001, momentum=0.9),
CrossEntropyLoss(), train_mb_size=500, train_epochs=1, eval_mb_size=100,
evaluator=eval_plugin)
# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in scenario.train_stream:
print("Start of experience: ", experience.current_experience)
print("Current Classes: ", experience.classes_in_this_experience)
# train returns a dictionary which contains all the metric values
res = cl_strategy.train(experience)
print('Training completed')
print('Computing accuracy on the whole test set')
# test also returns a dictionary which contains all the metric values
results.append(cl_strategy.eval(scenario.test_stream))
!pip install avalanche-lib==0.3.1

Converting PyTorch Datasets to Avalanche Dataset

Baselines and Strategies Code Examples
from torch.optim import SGD
from torch.nn import CrossEntropyLoss
from avalanche.benchmarks.classic import SplitMNIST
from avalanche.evaluation.metrics import forgetting_metrics, \
accuracy_metrics, loss_metrics, timing_metrics, cpu_usage_metrics, \
confusion_matrix_metrics, disk_usage_metrics
from avalanche.models import SimpleMLP
from avalanche.logging import InteractiveLogger, TextLogger, TensorboardLogger, WandBLogger
from avalanche.training.plugins import EvaluationPlugin
from avalanche.training import Naive
benchmark = SplitMNIST(n_experiences=5, return_task_id=False)
# MODEL CREATION
model = SimpleMLP(num_classes=benchmark.n_classes)
# DEFINE THE EVALUATION PLUGIN and LOGGERS
# The evaluation plugin manages the metrics computation.
# It takes as argument a list of metrics, collects their results and returns
# them to the strategy it is attached to.
loggers = []
# log to Tensorboard
loggers.append(TensorboardLogger())
# log to text file
loggers.append(TextLogger(open('log.txt', 'a')))
# print to stdout
loggers.append(InteractiveLogger())
# W&B logger - comment this if you don't have a W&B account
loggers.append(WandBLogger(project_name="avalanche", run_name="test"))
eval_plugin = EvaluationPlugin(
accuracy_metrics(minibatch=True, epoch=True, experience=True, stream=True),
loss_metrics(minibatch=True, epoch=True, experience=True, stream=True),
timing_metrics(epoch=True, epoch_running=True),
cpu_usage_metrics(experience=True),
forgetting_metrics(experience=True, stream=True),
confusion_matrix_metrics(num_classes=benchmark.n_classes, save_image=True,
stream=True),
disk_usage_metrics(minibatch=True, epoch=True, experience=True, stream=True),
loggers=loggers
)
# CREATE THE STRATEGY INSTANCE (NAIVE)
cl_strategy = Naive(
model, SGD(model.parameters(), lr=0.001, momentum=0.9),
CrossEntropyLoss(), train_mb_size=500, train_epochs=1, eval_mb_size=100,
evaluator=eval_plugin)
# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in benchmark.train_stream:
# train returns a dictionary which contains all the metric values
res = cl_strategy.train(experience)
print('Training completed')
print('Computing accuracy on the whole test set')
# test also returns a dictionary which contains all the metric values
results.append(cl_strategy.eval(benchmark.test_stream))
# need to manually call W&B run end since we are in a notebook
import wandb
wandb.finish()
%load_ext tensorboard
%tensorboard --logdir tb_data --port 6066
!pip install avalanche-lib==0.3.1
from avalanche.models import SimpleCNN
from avalanche.models import SimpleMLP
from avalanche.models import SimpleMLP_TinyImageNet
from avalanche.models import MobilenetV1
model = SimpleCNN()
print(model)
from avalanche.benchmarks import SplitMNIST
from avalanche.models import IncrementalClassifier
benchmark = SplitMNIST(5, shuffle=False, class_ids_from_zero_in_each_exp=False)
model = IncrementalClassifier(in_features=784)
print(model)
for exp in benchmark.train_stream:
model.adaptation(exp)
print(model)
from avalanche.benchmarks import SplitMNIST
from avalanche.models import MultiHeadClassifier
benchmark = SplitMNIST(5, shuffle=False, return_task_id=True, class_ids_from_zero_in_each_exp=True)
model = MultiHeadClassifier(in_features=784)
print(model)
for exp in benchmark.train_stream:
model.adaptation(exp)
print(model)
from avalanche.models import MultiTaskModule
class CustomMTModule(MultiTaskModule):
def __init__(self, in_features, initial_out_features=2):
super().__init__()
def adaptation(self, dataset):
super().adaptation(dataset)
# your adaptation goes here
def forward_single_task(self, x, task_label):
# your forward goes here.
# task_label is a single integer
# the mini-batch is split by task-id inside the forward method.
pass
from avalanche.models import as_multitask
model = SimpleCNN()
print(model)
mt_model = as_multitask(model, 'classifier')
print(mt_model)
!pip install avalanche-lib
import torch
from torch.utils.data.dataset import TensorDataset
from avalanche.benchmarks.utils import AvalancheDataset
# Create a dataset of 100 data points described by 22 features + 1 class label
x_data = torch.rand(100, 22)
y_data = torch.randint(0, 5, (100,))
# Create the Dataset
torch_data = TensorDataset(x_data, y_data)
avl_data = AvalancheDataset(torch_data)
print(torch_data[0])
print(avl_data[0])
from avalanche.benchmarks.utils import make_classification_dataset
# first, we add targets to the dataset. This will be used by the AvalancheDataset
# If possible, avalanche tries to extract the targets from the dataset.
# most datasets in torchvision already have a targets field so you don't need this step.
torch_data.targets = torch.randint(0, 5, (100,)).tolist()
tls = [0 for _ in range(100)] # one task label for each sample
sup_data = make_classification_dataset(torch_data, task_labels=tls)
from torch.utils.data.dataloader import DataLoader
my_dataloader = DataLoader(avl_data, batch_size=10, shuffle=True)
# Run one epoch
for x_minibatch, y_minibatch in my_dataloader:
print('Loaded minibatch of', len(x_minibatch), 'instances')
# Output: "Loaded minibatch of 10 instances" x10 times
cat_data = avl_data.concat(avl_data)
print(len(cat_data)) # 100 + 100 = 200
print(len(avl_data)) # 100, original data stays the same
sub_data = avl_data.subset(list(range(50)))
print(len(sub_data)) # 50
print(len(avl_data)) # 100, original data stays the same
tls = [0 for _ in range(100)] # one task label for each sample
sup_data = make_classification_dataset(torch_data, task_labels=tls)
print(sup_data.targets.name, len(sup_data.targets._data))
print(sup_data.targets_task_labels.name, len(sup_data.targets_task_labels._data))
# after subsampling
sub_data = sup_data.subset(range(10))
print(sub_data.targets.name, len(sub_data.targets._data))
print(sub_data.targets_task_labels.name, len(sub_data.targets_task_labels._data))
# after concat
cat_data = sup_data.concat(sup_data)
print(cat_data.targets.name, len(cat_data.targets._data))
print(cat_data.targets_task_labels.name, len(cat_data.targets_task_labels._data))
!pip install avalanche-lib
from avalanche.benchmarks import SplitMNIST
from avalanche.benchmarks.utils.data_loader import GroupBalancedDataLoader
benchmark = SplitMNIST(5, return_task_id=True)
dl = GroupBalancedDataLoader([exp.dataset for exp in benchmark.train_stream], batch_size=4)
for x, y, t in dl:
print(t.tolist())
break
from avalanche.training.storage_policy import ReservoirSamplingBuffer
from types import SimpleNamespace
benchmark = SplitMNIST(5, return_task_id=False)
storage_p = ReservoirSamplingBuffer(max_size=30)
print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")for i in range(5):
strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
storage_p.update(strategy_state)
print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
print(f"class targets: {storage_p.buffer.targets}\n")from avalanche.training.storage_policy import ParametricBuffer, RandomExemplarsSelectionStrategy
storage_p = ParametricBuffer(
max_size=30,
groupby='class',
selection_strategy=RandomExemplarsSelectionStrategy()
)
print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
for i in range(5):
strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
storage_p.update(strategy_state)
print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
print(f"class targets: {storage_p.buffer.targets}\n")for k, v in storage_p.buffer_groups.items():
print(f"(group {k}) -> size {len(v.buffer)}")datas = [v.buffer for v in storage_p.buffer_groups.values()]
dl = GroupBalancedDataLoader(datas)
for x, y, t in dl:
print(y.tolist())
break
from avalanche.benchmarks.utils.data_loader import ReplayDataLoader
from avalanche.training.plugins import StrategyPlugin
class CustomReplay(StrategyPlugin):
def __init__(self, storage_policy):
super().__init__()
self.storage_policy = storage_policy
def before_training_exp(self, strategy,
num_workers: int = 0, shuffle: bool = True,
**kwargs):
""" Here we set the dataloader. """
if len(self.storage_policy.buffer) == 0:
# first experience. We don't use the buffer, no need to change
# the dataloader.
return
# replay dataloader samples mini-batches from the memory and current
# data separately and combines them together.
print("Override the dataloader.")
strategy.dataloader = ReplayDataLoader(
strategy.adapted_dataset,
self.storage_policy.buffer,
oversample_small_tasks=True,
num_workers=num_workers,
batch_size=strategy.train_mb_size,
shuffle=shuffle)
def after_training_exp(self, strategy: "BaseStrategy", **kwargs):
""" We update the buffer after the experience.
You can use a different callback to update the buffer in a different place
"""
print("Buffer update.")
self.storage_policy.update(strategy, **kwargs)
from torch.nn import CrossEntropyLoss
from avalanche.training import Naive
from avalanche.evaluation.metrics import accuracy_metrics
from avalanche.training.plugins import EvaluationPlugin
from avalanche.logging import InteractiveLogger
from avalanche.models import SimpleMLP
import torch
scenario = SplitMNIST(5)
model = SimpleMLP(num_classes=scenario.n_classes)
storage_p = ParametricBuffer(
max_size=500,
groupby='class',
selection_strategy=RandomExemplarsSelectionStrategy()
)
# choose some metrics and evaluation method
interactive_logger = InteractiveLogger()
eval_plugin = EvaluationPlugin(
accuracy_metrics(experience=True, stream=True),
loggers=[interactive_logger])
# CREATE THE STRATEGY INSTANCE (NAIVE)
cl_strategy = Naive(model, torch.optim.Adam(model.parameters(), lr=0.001),
CrossEntropyLoss(),
train_mb_size=100, train_epochs=1, eval_mb_size=100,
plugins=[CustomReplay(storage_p)],
evaluator=eval_plugin
)
# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in scenario.train_stream:
print("Start of experience ", experience.current_experience)
cl_strategy.train(experience)
print('Training completed')
print('Computing accuracy on the whole test set')
results.append(cl_strategy.eval(scenario.test_stream))
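As a purely illustrative follow-up (a sketch reusing the storage_p object defined above), you can also inspect how the class-balanced buffer split its capacity across the classes seen so far:
for class_id, class_buffer in storage_p.buffer_groups.items():
    print(f"(group {class_id}) -> size {len(class_buffer.buffer)}")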
Continual Learning Algorithms Prototyping Made Easy
!pip install avalanche-lib==0.3.1
Happiness is only Real when Shared
from torch.optim import SGD
from torch.nn import CrossEntropyLoss
from avalanche.models import SimpleMLP
from avalanche.training.supervised import Naive, CWRStar, Replay, GDumb, Cumulative, LwF, GEM, AGEM, EWC # and many more!
model = SimpleMLP(num_classes=10)
optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9)
criterion = CrossEntropyLoss()
cl_strategy = Naive(
model, optimizer, criterion,
train_mb_size=100, train_epochs=4, eval_mb_size=100
)
from avalanche.benchmarks.classic import SplitMNIST
# scenario
benchmark = SplitMNIST(n_experiences=5, seed=1)
# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in benchmark.train_stream:
print("Start of experience: ", experience.current_experience)
print("Current Classes: ", experience.classes_in_this_experience)
cl_strategy.train(experience)
print('Training completed')
print('Computing accuracy on the whole test set')
results.append(cl_strategy.eval(benchmark.test_stream))
from avalanche.training.plugins import EarlyStoppingPlugin
strategy = Naive(
model, optimizer, criterion,
plugins=[EarlyStoppingPlugin(patience=10, val_stream_name='train')])
from avalanche.training.templates import SupervisedTemplate
from avalanche.training.plugins import ReplayPlugin, EWCPlugin
replay = ReplayPlugin(mem_size=100)
ewc = EWCPlugin(ewc_lambda=0.001)
strategy = SupervisedTemplate(
model, optimizer, criterion,
plugins=[replay, ewc])
train
before_training
before_train_dataset_adaptation
train_dataset_adaptation
after_train_dataset_adaptation
make_train_dataloader
model_adaptation
make_optimizer
before_training_exp # for each exp
before_training_epoch # for each epoch
before_training_iteration # for each iteration
before_forward
after_forward
before_backward
after_backward
after_training_iteration
before_update
after_update
after_training_epoch
after_training_exp
after_training
eval
before_eval
before_eval_dataset_adaptation
eval_dataset_adaptation
after_eval_dataset_adaptation
make_eval_dataloader
model_adaptation
before_eval_exp # for each exp
eval_epoch # we have a single epoch in evaluation mode
before_eval_iteration # for each iteration
before_eval_forward
after_eval_forward
after_eval_iteration
after_eval_exp
after_eval
from avalanche.benchmarks.utils.data_loader import ReplayDataLoader
from avalanche.core import SupervisedPlugin
from avalanche.training.storage_policy import ReservoirSamplingBuffer
class ReplayP(SupervisedPlugin):
def __init__(self, mem_size):
""" A simple replay plugin with reservoir sampling. """
super().__init__()
self.buffer = ReservoirSamplingBuffer(max_size=mem_size)
def before_training_exp(self, strategy: "SupervisedTemplate",
num_workers: int = 0, shuffle: bool = True,
**kwargs):
""" Use a custom dataloader to combine samples from the current data and memory buffer. """
if len(self.buffer.buffer) == 0:
# first experience. We don't use the buffer, no need to change
# the dataloader.
return
strategy.dataloader = ReplayDataLoader(
strategy.adapted_dataset,
self.buffer.buffer,
oversample_small_tasks=True,
num_workers=num_workers,
batch_size=strategy.train_mb_size,
shuffle=shuffle)
def after_training_exp(self, strategy: "SupervisedTemplate", **kwargs):
""" Update the buffer. """
self.buffer.update(strategy, **kwargs)
benchmark = SplitMNIST(n_experiences=5, seed=1)
model = SimpleMLP(num_classes=10)
optimizer = SGD(model.parameters(), lr=0.01, momentum=0.9)
criterion = CrossEntropyLoss()
strategy = Naive(model=model, optimizer=optimizer, criterion=criterion, train_mb_size=128,
plugins=[ReplayP(mem_size=2000)])
strategy.train(benchmark.train_stream)
strategy.eval(benchmark.test_stream)
from avalanche.benchmarks.utils import AvalancheConcatDataset
from avalanche.training.templates import SupervisedTemplate
class Cumulative(SupervisedTemplate):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.dataset = None # cumulative dataset
def train_dataset_adaptation(self, **kwargs):
super().train_dataset_adaptation(**kwargs)
curr_data = self.experience.dataset
if self.dataset is None:
self.dataset = curr_data
else:
self.dataset = AvalancheConcatDataset([self.dataset, curr_data])
self.adapted_dataset = self.dataset.train()
strategy = Cumulative(model=model, optimizer=optimizer, criterion=criterion, train_mb_size=128)
strategy.train(benchmark.train_stream)
!pip install avalanche-lib
import sys
sys.path.append('/home/lorenzo/Desktop/ProjectsVCS/avalanche/')
import os
from typing import Sequence
import torch
from torch.nn import CrossEntropyLoss
from torch.optim import SGD
from avalanche.benchmarks import CLExperience, SplitMNIST
from avalanche.evaluation.metrics import accuracy_metrics, loss_metrics, \
class_accuracy_metrics
from avalanche.logging import InteractiveLogger, TensorboardLogger, \
WandBLogger, TextLogger
from avalanche.models import SimpleMLP, as_multitask
from avalanche.training.determinism.rng_manager import RNGManager
from avalanche.training.plugins import EvaluationPlugin, ReplayPlugin
from avalanche.training.plugins.checkpoint import CheckpointPlugin, \
FileSystemCheckpointStorage
from avalanche.training.supervised import Naive
# Set a fixed seed: must be kept the same across save/resume operations
RNGManager.set_random_seeds(1234)
# Nothing new here...
device = torch.device(
f"cuda:0"
if torch.cuda.is_available()
else "cpu"
)
print('Using device', device)
# CL Benchmark Creation
n_experiences = 5
benchmark = SplitMNIST(n_experiences=n_experiences,
return_task_id=True)
input_size = 28*28*1
# Define the model (and load initial weights if necessary)
# Again, not checkpoint-related
model = SimpleMLP(input_size=input_size,
num_classes=benchmark.n_classes // n_experiences)
model = as_multitask(model, 'classifier')
# Prepare for training & testing: not checkpoint-related
optimizer = SGD(model.parameters(), lr=0.01, momentum=0.9)
criterion = CrossEntropyLoss()
checkpoint_plugin = CheckpointPlugin(
FileSystemCheckpointStorage(
directory='./checkpoints/task_incremental',
),
map_location=device
)
# Load checkpoint (if exists in the given storage)
# If it does not exist, strategy will be None and initial_exp will be 0
strategy, initial_exp = checkpoint_plugin.load_checkpoint_if_exists()
# Create the CL strategy (if not already loaded from checkpoint)
if strategy is None:
plugins = [
checkpoint_plugin, # Add the checkpoint plugin to the list!
ReplayPlugin(mem_size=500), # Other plugins you want to use
# ...
]
# Create loggers (as usual)
# Note that the checkpoint plugin will automatically correctly
# resume loggers!
os.makedirs(f'./logs/checkpointing_example',
exist_ok=True)
loggers = [
TextLogger(
open(f'./logs/checkpointing_example/log.txt', 'w')),
InteractiveLogger(),
TensorboardLogger(f'./logs/checkpointing_example')
]
# The W&B logger is correctly resumed without resulting in
# duplicated runs!
use_wandb = False
if use_wandb:
loggers.append(WandBLogger(
project_name='AvalancheCheckpointing',
run_name=f'checkpointing_example'
))
# Create the evaluation plugin (as usual)
evaluation_plugin = EvaluationPlugin(
accuracy_metrics(minibatch=False, epoch=True,
experience=True, stream=True),
loss_metrics(minibatch=False, epoch=True,
experience=True, stream=True),
class_accuracy_metrics(
stream=True
),
loggers=loggers
)
# Create the strategy (as usual)
strategy = Naive(
model=model,
optimizer=optimizer,
criterion=criterion,
train_mb_size=128,
train_epochs=2,
eval_mb_size=128,
device=device,
plugins=plugins,
evaluator=evaluation_plugin
)
exit_early = False
for train_task in benchmark.train_stream[initial_exp:]:
strategy.train(train_task)
strategy.eval(benchmark.test_stream)
if exit_early:
exit(0)
import torch
from avalanche.evaluation.metrics import Accuracy, TaskAwareAccuracy
# create an instance of the standalone Accuracy metric
# initial accuracy is 0
acc_metric = Accuracy()
print("Initial Accuracy: ", acc_metric.result()) # output 0.0
# two consecutive metric updates
real_y = torch.tensor([1, 2]).long()
predicted_y = torch.tensor([1, 0]).float()
acc_metric.update(real_y, predicted_y)
acc = acc_metric.result()
print("Average Accuracy: ", acc) # output 0.5
predicted_y = torch.tensor([1,2]).float()
acc_metric.update(real_y, predicted_y)
acc = acc_metric.result()
print("Average Accuracy: ", acc) # output 0.75
# reset accuracy
acc_metric.reset()
print("After reset: ", acc_metric.result()) # output 0.0# create an instance of the standalone TaskAwareAccuracy metric
# initial accuracy is 0 for each task
acc_metric = TaskAwareAccuracy()
print("Initial Accuracy: ", acc_metric.result()) # output {}
# metric updates for 2 different tasks
task_label = 0
real_y = torch.tensor([1, 2]).long()
predicted_y = torch.tensor([1, 0]).float()
acc_metric.update(real_y, predicted_y, task_label)
acc = acc_metric.result()
print("Average Accuracy: ", acc) # output 0.5 for task 0
task_label = 1
predicted_y = torch.tensor([1,2]).float()
acc_metric.update(real_y, predicted_y, task_label)
acc = acc_metric.result()
print("Average Accuracy: ", acc) # output 0.75 for task 0 and 1.0 for task 1
task_label = 0
predicted_y = torch.tensor([1,2]).float()
acc_metric.update(real_y, predicted_y, task_label)
acc = acc_metric.result()
print("Average Accuracy: ", acc) # output 0.75 for task 0 and 1.0 for task 1
# reset accuracy
acc_metric.reset()
print("After reset: ", acc_metric.result()) # output {}from avalanche.evaluation.metrics import accuracy_metrics, \
loss_metrics, forgetting_metrics, bwt_metrics,\
confusion_matrix_metrics, cpu_usage_metrics, \
disk_usage_metrics, gpu_usage_metrics, MAC_metrics, \
ram_usage_metrics, timing_metrics
# you may pass the result to the EvaluationPlugin
metrics = accuracy_metrics(epoch=True, experience=True)
from torch.nn import CrossEntropyLoss
from torch.optim import SGD
from avalanche.benchmarks.classic import SplitMNIST
from avalanche.evaluation.metrics import forgetting_metrics, \
accuracy_metrics, loss_metrics, timing_metrics, cpu_usage_metrics, \
confusion_matrix_metrics, disk_usage_metrics
from avalanche.models import SimpleMLP
from avalanche.logging import InteractiveLogger
from avalanche.training.plugins import EvaluationPlugin
from avalanche.training import Naive
benchmark = SplitMNIST(n_experiences=5)
# MODEL CREATION
model = SimpleMLP(num_classes=benchmark.n_classes)
# DEFINE THE EVALUATION PLUGIN
# The evaluation plugin manages the metrics computation.
# It takes as argument a list of metrics, collects their results and returns
# them to the strategy it is attached to.
eval_plugin = EvaluationPlugin(
accuracy_metrics(minibatch=True, epoch=True, experience=True, stream=True),
loss_metrics(minibatch=True, epoch=True, experience=True, stream=True),
timing_metrics(epoch=True),
forgetting_metrics(experience=True, stream=True),
cpu_usage_metrics(experience=True),
confusion_matrix_metrics(num_classes=benchmark.n_classes, save_image=False, stream=True),
disk_usage_metrics(minibatch=True, epoch=True, experience=True, stream=True),
loggers=[InteractiveLogger()],
strict_checks=False
)
# CREATE THE STRATEGY INSTANCE (NAIVE)
cl_strategy = Naive(
model, SGD(model.parameters(), lr=0.001, momentum=0.9),
CrossEntropyLoss(), train_mb_size=500, train_epochs=1, eval_mb_size=100,
evaluator=eval_plugin)
# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in benchmark.train_stream:
# train returns a dictionary which contains all the metric values
res = cl_strategy.train(experience)
print('Training completed')
print('Computing accuracy on the whole test set')
# test also returns a dictionary which contains all the metric values
results.append(cl_strategy.eval(benchmark.test_stream))
from avalanche.evaluation import Metric
# a standalone metric implementation
class MyStandaloneMetric(Metric[float]):
"""
This metric will return a `float` value
"""
def __init__(self):
"""
Initialize your metric here
"""
super().__init__()
pass
def update(self):
"""
Update metric value here
"""
pass
def result(self) -> float:
"""
Emit the metric result here
"""
return 0
def reset(self):
"""
Reset your metric here
"""
pass
from avalanche.evaluation import PluginMetric
from avalanche.evaluation.metrics import Accuracy
from avalanche.evaluation.metric_results import MetricValue
from avalanche.evaluation.metric_utils import get_metric_name
class MyPluginMetric(PluginMetric[float]):
"""
This metric will return a `float` value after
each training epoch
"""
def __init__(self):
"""
Initialize the metric
"""
super().__init__()
self._accuracy_metric = Accuracy()
def reset(self) -> None:
"""
Reset the metric
"""
self._accuracy_metric.reset()
def result(self) -> float:
"""
Emit the result
"""
return self._accuracy_metric.result()
def after_training_iteration(self, strategy: 'PluggableStrategy') -> None:
"""
Update the accuracy metric with the current
predictions and targets
"""
# task labels defined for each experience
task_labels = strategy.experience.task_labels
if len(task_labels) > 1:
# task labels defined for each pattern
task_labels = strategy.mb_task_id
else:
task_labels = task_labels[0]
self._accuracy_metric.update(strategy.mb_output, strategy.mb_y,
task_labels)
def before_training_epoch(self, strategy: 'PluggableStrategy') -> None:
"""
Reset the accuracy before the epoch begins
"""
self.reset()
def after_training_epoch(self, strategy: 'PluggableStrategy'):
"""
Emit the result
"""
return self._package_result(strategy)
def _package_result(self, strategy):
"""Taken from `GenericPluginMetric`, check that class out!"""
metric_value = self._accuracy_metric.result()
add_exp = False
plot_x_position = strategy.clock.train_iterations
if isinstance(metric_value, dict):
metrics = []
for k, v in metric_value.items():
metric_name = get_metric_name(
self, strategy, add_experience=add_exp, add_task=k)
metrics.append(MetricValue(self, metric_name, v,
plot_x_position))
return metrics
else:
metric_name = get_metric_name(self, strategy,
add_experience=add_exp,
add_task=True)
return [MetricValue(self, metric_name, metric_value,
plot_x_position)]
def __str__(self):
"""
Here you can specify the name of your metric
"""
return "Top1_Acc_Epoch"eval_plugin2 = EvaluationPlugin(
accuracy_metrics(minibatch=True, epoch=True, experience=True, stream=True),
loss_metrics(minibatch=True, epoch=True, experience=True, stream=True),
forgetting_metrics(experience=True, stream=True),
timing_metrics(epoch=True),
cpu_usage_metrics(experience=True),
confusion_matrix_metrics(num_classes=benchmark.n_classes, save_image=False, stream=True),
disk_usage_metrics(minibatch=True, epoch=True, experience=True, stream=True),
collect_all=True, # this is default value anyway
loggers=[InteractiveLogger()]
)
# since no training and evaluation has been performed, this will return an empty dict.
metric_dict = eval_plugin2.get_all_metrics()
print(metric_dict)
d = eval_plugin.get_all_metrics()
d['Top1_Acc_Epoch/train_phase/train_stream/Task000']
print(res)
print(results[-1])
!pip install avalanche-lib
from torchvision import transforms
from torchvision.datasets import MNIST
from avalanche.benchmarks.utils import make_classification_dataset
mnist_dataset = MNIST('mnist_data', download=True)
# Define the training transformation for X values
train_transformation = transforms.Compose([
transforms.RandomRotation(45),
transforms.ToTensor(),
])
# Define the training transformation for Y values (rarely used)
train_target_transformation = None
# Define the test transformation for X values
eval_transformation = transforms.ToTensor()
# Define the test transformation for Y values (rarely used)
eval_target_transformation = None
transform_groups = {
'train': (train_transformation, train_target_transformation),
'eval': (eval_transformation, eval_target_transformation)
}
avl_mnist_transform = make_classification_dataset(mnist_dataset, transform_groups=transform_groups)
# Not recommended: use transform_groups instead
avl_mnist_same_transforms = make_classification_dataset(mnist_dataset, transform=train_transformation)
# Obtain a view of the dataset in which eval transformations are enabled
avl_mnist_eval = avl_mnist_transform.eval()
# Obtain a view of the dataset in which we get back to train transforms
# Basically, avl_mnist_transform ~= avl_mnist_train
avl_mnist_train = avl_mnist_eval.train()
# we are looking inside the dataset to check the transformations.
# in real code, you never need to do this ;)
cgroup = avl_mnist_train._transform_groups.current_group
print("Original dataset transformations: (train group by default)")
# notice that the original transform are unchanged.
print(avl_mnist_train._transform_groups.transform_groups[cgroup])
print("\neval mode dataset transformations:")
cgroup = avl_mnist_eval._transform_groups.current_group
print(avl_mnist_eval._transform_groups.transform_groups[cgroup])
print("\ntrain mode dataset transformations:")
cgroup = avl_mnist_train._transform_groups.current_group
print(avl_mnist_train._transform_groups.transform_groups[cgroup])
from avalanche.benchmarks.utils import AvalancheDataset
replay_transform = transforms.Compose([
transforms.RandomCrop(28, padding=4),
transforms.ToTensor()
])
replay_target_transform = None
transform_groups_with_replay = {
'train': (None, None),
'eval': (None, None),
'replay': (replay_transform, replay_target_transform)
}
AvalancheDataset(mnist_dataset, transform_groups=transform_groups_with_replay)
avl_mnist_custom_transform_not_enabled = AvalancheDataset(
mnist_dataset,
transform_groups=transform_groups_with_replay)
avl_mnist_custom_transform_2 = avl_mnist_custom_transform_not_enabled.with_transforms('replay')
cgroup = avl_mnist_custom_transform_2._transform_groups.current_group
print(avl_mnist_custom_transform_2._transform_groups.transform_groups[cgroup])
# prints output:
# Compose(
# RandomCrop(size=(28, 28), padding=4)
# ToTensor()
# )
avl_mnist = make_classification_dataset(mnist_dataset, transform_groups=transform_groups)
new_transform = transforms.RandomCrop(size=(28, 28), padding=4)
# Replace the current transform group. Simple as:
transform = (new_transform, None)
avl_mnist_replaced_transform = avl_mnist.replace_current_transform_group(transform)
cgroup = avl_mnist_replaced_transform._transform_groups.current_group
print('With replaced transform:', avl_mnist_replaced_transform._transform_groups.transform_groups[cgroup])
# Prints: "With replaces transforms: RandomCrop(size=(28, 28), padding=4)"
# Check that the original dataset was not affected:
cgroup = avl_mnist._transform_groups.current_group
print('Original dataset:', avl_mnist._transform_groups.transform_groups[cgroup])
# Prints: "Original dataset: ToTensor()"from avalanche.benchmarks.classic.cmnist import PixelsPermutation
import numpy as np
import torch
# Instantiate MNIST train and test sets
mnist_train = MNIST('mnist_data', train=True, download=True)
mnist_test = MNIST('mnist_data', train=False, download=True)
# Define the transformation used to permute the pixels
rng_seed = 4321
rng_permute = np.random.RandomState(rng_seed)
idx_permute = torch.from_numpy(rng_permute.permutation(784)).type(torch.int64)
permutation_transform = PixelsPermutation(idx_permute)
# Define the transforms group
perm_group_transforms = dict(
train=(permutation_transform, None),
eval=(permutation_transform, None)
)
# Create the datasets and freeze transforms
# Note: one can call "freeze_transforms" on constructor result
# or you can do this in 2 steps. The result is the same (obviously).
# The next part shows both ways:
# Train set
permuted_train_set = AvalancheDataset(
mnist_train,
transform_groups=perm_group_transforms).freeze_transforms()
# Test set
permuted_test_set = AvalancheDataset(mnist_test, transform_groups=perm_group_transforms).eval()
permuted_test_set = permuted_test_set.freeze_transforms()
# First, show that the image pixels are permuted
print('Before replace_transforms:')
display(permuted_train_set[0][0].resize((192, 192), 0))
# Try to remove the permutation
with_removed_transforms = permuted_train_set.replace_current_transform_group((None, None))
print('After replace_transforms:')
display(permuted_train_set[0][0].resize((192, 192), 0))
display(with_removed_transforms[0][0].resize((192, 192), 0))
Evaluation: Training utils and Benchmarks are not enough alone to push continual learning research forward. Comprehensive and sound evaluation protocols and metrics need to be employed as well.
!pip install avalanche-lib[all]
!pip show avalanche-lib
Avalanche
├── Benchmarks
│ ├── Classic
│ ├── Datasets
│ ├── Generators
│ ├── Scenarios
│ └── Utils
├── Evaluation
│ ├── Metrics
| └── Utils
├── Training
│ ├── Strategies
│ ├── Plugins
| └── Utils
├── Models
└── Loggers
from avalanche.benchmarks.datasets import MNIST, FashionMNIST, KMNIST, EMNIST, \
QMNIST, FakeData, CocoCaptions, CocoDetection, LSUN, ImageNet, CIFAR10, \
CIFAR100, STL10, SVHN, PhotoTour, SBU, Flickr8k, Flickr30k, VOCDetection, \
VOCSegmentation, Cityscapes, SBDataset, USPS, HMDB51, UCF101, CelebA, \
CORe50Dataset, TinyImagenet, CUB200, OpenLORIS, MiniImageNetDataset, \
Stream51, CLEARDataset
from avalanche.benchmarks.classic import CORe50, SplitTinyImageNet, SplitCIFAR10, \
SplitCIFAR100, SplitCIFAR110, SplitMNIST, RotatedMNIST, PermutedMNIST, SplitCUB200
# creating the benchmark (scenario object)
perm_mnist = PermutedMNIST(
n_experiences=3,
seed=1234,
)
# recovering the train and test streams
train_stream = perm_mnist.train_stream
test_stream = perm_mnist.test_stream
# iterating over the train stream
for experience in train_stream:
print("Start of task ", experience.task_label)
print('Classes in this task:', experience.classes_in_this_experience)
# The current Pytorch training set can be easily recovered through the
# experience
current_training_set = experience.dataset
# ...as well as the task_label
print('Task {}'.format(experience.task_label))
print('This task contains', len(current_training_set), 'training examples')
# we can recover the corresponding test experience in the test stream
current_test_set = test_stream[experience.current_experience].dataset
print('This task contains', len(current_test_set), 'test examples')
from avalanche.benchmarks.generators import nc_benchmark, ni_benchmark
from torchvision.datasets import MNIST
mnist_train = MNIST('.', train=True, download=True)
mnist_test = MNIST('.', train=False)
benchmark = ni_benchmark(
mnist_train, mnist_test, n_experiences=10, shuffle=True, seed=1234,
balance_experiences=True
)
benchmark = nc_benchmark(
mnist_train, mnist_test, n_experiences=10, shuffle=True, seed=1234,
task_labels=False
)
from avalanche.benchmarks.generators import filelist_benchmark, dataset_benchmark, \
tensors_benchmark, paths_benchmark
from avalanche.models import SimpleMLP
from avalanche.training import Naive, CWRStar, Replay, GDumb, \
Cumulative, LwF, GEM, AGEM, EWC, AR1
from torch.optim import SGD
from torch.nn import CrossEntropyLoss
model = SimpleMLP(num_classes=10)
cl_strategy = Naive(
model, SGD(model.parameters(), lr=0.001, momentum=0.9),
CrossEntropyLoss(), train_mb_size=100, train_epochs=4, eval_mb_size=100
)
from torch.utils.data import DataLoader
class MyStrategy():
"""My Basic Strategy"""
def __init__(self, model, optimizer, criterion):
self.model = model
self.optimizer = optimizer
self.criterion = criterion
def train(self, experience):
# here you can implement your own training loop for each experience (i.e.
# batch or task).
train_dataset = experience.dataset
t = experience.task_label
train_data_loader = DataLoader(
train_dataset, num_workers=4, batch_size=128
)
for epoch in range(1):
for mb in train_data_loader:
# your magic here...
pass
def eval(self, experience):
# here you can implement your own eval loop for each experience (i.e.
# batch or task).
eval_dataset = experience.dataset
t = experience.task_label
eval_data_loader = DataLoader(
eval_dataset, num_workers=4, batch_size=128
)
# eval here
from avalanche.models import SimpleMLP
from avalanche.benchmarks import SplitMNIST
# Benchmark creation
benchmark = SplitMNIST(n_experiences=5)
# Model Creation
model = SimpleMLP(num_classes=benchmark.n_classes)
# Create the Strategy Instance (MyStrategy)
cl_strategy = MyStrategy(
model, SGD(model.parameters(), lr=0.001, momentum=0.9),
CrossEntropyLoss())
# Training Loop
print('Starting experiment...')
for exp_id, experience in enumerate(benchmark.train_stream):
print("Start of experience ", experience.current_experience)
cl_strategy.train(experience)
print('Training completed')
print('Computing accuracy on the current test set')
cl_strategy.eval(benchmark.test_stream[exp_id])
# utility functions to create plugin metrics
from avalanche.evaluation.metrics import accuracy_metrics, loss_metrics, forgetting_metrics
from avalanche.logging import InteractiveLogger, TensorboardLogger
from avalanche.training.plugins import EvaluationPlugin
eval_plugin = EvaluationPlugin(
# accuracy after each training epoch
# and after each evaluation experience
accuracy_metrics(epoch=True, experience=True),
# loss after each training minibatch and each
# evaluation stream
loss_metrics(minibatch=True, stream=True),
# catastrophic forgetting after each evaluation
# experience
forgetting_metrics(experience=True, stream=True),
# add as many metrics as you like
loggers=[InteractiveLogger(), TensorboardLogger()])
# pass the evaluation plugin instance to the strategy
# strategy = EWC(..., evaluator=eval_plugin)
# THAT'S IT!!
from avalanche.benchmarks.classic import SplitMNIST
from avalanche.evaluation.metrics import forgetting_metrics, accuracy_metrics,\
loss_metrics, timing_metrics, cpu_usage_metrics, StreamConfusionMatrix,\
disk_usage_metrics, gpu_usage_metrics
from avalanche.models import SimpleMLP
from avalanche.logging import InteractiveLogger, TextLogger, TensorboardLogger
from avalanche.training.plugins import EvaluationPlugin
from avalanche.training import Naive
from torch.optim import SGD
from torch.nn import CrossEntropyLoss
benchmark = SplitMNIST(n_experiences=5)
# MODEL CREATION
model = SimpleMLP(num_classes=benchmark.n_classes)
# DEFINE THE EVALUATION PLUGIN and LOGGERS
# The evaluation plugin manages the metrics computation.
# It takes as argument a list of metrics, collects their results and returns
# them to the strategy it is attached to.
# log to Tensorboard
tb_logger = TensorboardLogger()
# log to text file
text_logger = TextLogger(open('log.txt', 'a'))
# print to stdout
interactive_logger = InteractiveLogger()
eval_plugin = EvaluationPlugin(
accuracy_metrics(minibatch=True, epoch=True, experience=True, stream=True),
loss_metrics(minibatch=True, epoch=True, experience=True, stream=True),
timing_metrics(epoch=True),
cpu_usage_metrics(experience=True),
forgetting_metrics(experience=True, stream=True),
StreamConfusionMatrix(num_classes=benchmark.n_classes, save_image=False),
disk_usage_metrics(minibatch=True, epoch=True, experience=True, stream=True),
loggers=[interactive_logger, text_logger, tb_logger]
)
# CREATE THE STRATEGY INSTANCE (NAIVE)
cl_strategy = Naive(
model, SGD(model.parameters(), lr=0.001, momentum=0.9),
CrossEntropyLoss(), train_mb_size=500, train_epochs=1, eval_mb_size=100,
evaluator=eval_plugin)
# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in benchmark.train_stream:
print("Start of experience: ", experience.current_experience)
print("Current Classes: ", experience.classes_in_this_experience)
# train returns a dictionary which contains all the metric values
res = cl_strategy.train(experience, num_workers=4)
print('Training completed')
print('Computing accuracy on the whole test set')
# eval also returns a dictionary which contains all the metric values
results.append(cl_strategy.eval(benchmark.test_stream, num_workers=4))


Create your Continual Learning Benchmark and Start Prototyping
!pip install avalanche-lib==0.3.1
By "benchmark" we mean a well-defined and carefully thought-out combination of a scenario with one or multiple datasets that we can use to assess our continual learning algorithms.
import torch
import torchvision
from avalanche.benchmarks.datasets import MNIST, FashionMNIST, KMNIST, EMNIST, \
QMNIST, FakeData, CocoCaptions, CocoDetection, LSUN, ImageNet, CIFAR10, \
CIFAR100, STL10, SVHN, PhotoTour, SBU, Flickr8k, Flickr30k, VOCDetection, \
VOCSegmentation, Cityscapes, SBDataset, USPS, HMDB51, UCF101, \
CelebA, CORe50Dataset, TinyImagenet, CUB200, OpenLORIS
# As we would simply do with any Pytorch dataset we can create the train and
# test sets from it. We could use any of the above imported Datasets, but let's
# just try to use the standard MNIST.
train_MNIST = MNIST(
'./data/mnist', train=True, download=True, transform=torchvision.transforms.ToTensor()
)
test_MNIST = MNIST(
'./data/mnist', train=False, download=True, transform=torchvision.transforms.ToTensor()
)
# Given these two sets we can simply iterate them to get the examples one by one
for i, example in enumerate(train_MNIST):
pass
print("Num. examples processed: {}".format(i))
# or use a Pytorch DataLoader
train_loader = torch.utils.data.DataLoader(
train_MNIST, batch_size=32, shuffle=True
)
for i, (x, y) in enumerate(train_loader):
pass
print("Num. mini-batch processed: {}".format(i))from avalanche.benchmarks.utils import ImageFolder, DatasetFolder, FilelistDataset, AvalancheDatasetfrom avalanche.benchmarks.classic import SplitMNIST
split_mnist = SplitMNIST(n_experiences=5, seed=1)
# Original train/test sets
print('--- Original datasets:')
print(split_mnist.original_train_dataset)
print(split_mnist.original_test_dataset)
# A list describing which training patterns are assigned to each experience.
# Patterns are identified by their id w.r.t. the dataset found in the
# original_train_dataset field.
print('--- Train patterns assignment:')
print(split_mnist.train_exps_patterns_assignment)
# A list describing which test patterns are assigned to each experience.
# Patterns are identified by their id w.r.t. the dataset found in the
# original_test_dataset field
print('--- Test patterns assignment:')
print(split_mnist.test_exps_patterns_assignment)
# the task label of each experience.
print('--- Task labels:')
print(split_mnist.task_labels)
# train and test streams
print('--- Streams:')
print(split_mnist.train_stream)
print(split_mnist.test_stream)
# A list that, for each experience (identified by its index/ID),
# stores a set of the (optionally remapped) IDs of classes of patterns
# assigned to that experience.
print('--- Classes in each experience:')
print(split_mnist.original_classes_in_exp)
# each stream has a name: "train" or "test"
train_stream = split_mnist.train_stream
print(train_stream.name)
# we have access to the scenario from which the stream was taken
train_stream.benchmark
# we can slice and reorder the stream as we like!
substream = train_stream[0]
substream = train_stream[0:2]
substream = train_stream[0,2,1]
len(substream)
# we get the first experience
experience = train_stream[0]
# task label and dataset are the main attributes
t_label = experience.task_label
dataset = experience.dataset
# but you can recover additional info
experience.current_experience
experience.classes_in_this_experience
experience.classes_seen_so_far
experience.previous_classes
experience.future_classes
experience.origin_stream
experience.benchmark
# As always, we can iterate over it normally or with a pytorch
# data loader.
# For instance, we can use tqdm to add a progress bar.
from tqdm import tqdm
for i, data in enumerate(tqdm(dataset)):
pass
print("\nNumber of examples:", i + 1)
print("Task Label:", t_label)from avalanche.benchmarks.classic import CORe50, SplitTinyImageNet, \
SplitCIFAR10, SplitCIFAR100, SplitCIFAR110, SplitMNIST, RotatedMNIST, \
PermutedMNIST, SplitCUB200, SplitImageNet
# creating PermutedMNIST (Task-Incremental)
perm_mnist = PermutedMNIST(
n_experiences=2,
seed=1234,
)
# creating the benchmark instance (scenario object)
perm_mnist = PermutedMNIST(
n_experiences=3,
seed=1234,
)
# recovering the train and test streams
train_stream = perm_mnist.train_stream
test_stream = perm_mnist.test_stream
# iterating over the train stream
for experience in train_stream:
print("Start of task ", experience.task_label)
print('Classes in this task:', experience.classes_in_this_experience)
# The current Pytorch training set can be easily recovered through the
# experience
current_training_set = experience.dataset
# ...as well as the task_label
print('Task {}'.format(experience.task_label))
print('This task contains', len(current_training_set), 'training examples')
# we can recover the corresponding test experience in the test stream
current_test_set = test_stream[experience.current_experience].dataset
print('This task contains', len(current_test_set), 'test examples')
from avalanche.benchmarks.generators import nc_benchmark, ni_benchmark
from torchvision.transforms import Compose, ToTensor, Normalize, RandomCrop
train_transform = Compose([
RandomCrop(28, padding=4),
ToTensor(),
Normalize((0.1307,), (0.3081,))
])
test_transform = Compose([
ToTensor(),
Normalize((0.1307,), (0.3081,))
])
mnist_train = MNIST(
'./data/mnist', train=True, download=True, transform=train_transform
)
mnist_test = MNIST(
'./data/mnist', train=False, download=True, transform=test_transform
)
scenario = ni_benchmark(
mnist_train, mnist_test, n_experiences=10, shuffle=True, seed=1234,
balance_experiences=True
)
train_stream = scenario.train_stream
for experience in train_stream:
t = experience.task_label
exp_id = experience.current_experience
training_dataset = experience.dataset
print('Task {} batch {} -> train'.format(t, exp_id))
print('This batch contains', len(training_dataset), 'patterns')
scenario = nc_benchmark(
mnist_train, mnist_test, n_experiences=10, shuffle=True, seed=1234,
task_labels=False
)
train_stream = scenario.train_stream
for experience in train_stream:
t = experience.task_label
exp_id = experience.current_experience
training_dataset = experience.dataset
print('Task {} batch {} -> train'.format(t, exp_id))
print('This batch contains', len(training_dataset), 'patterns')
from avalanche.benchmarks.generators import filelist_benchmark, dataset_benchmark, \
tensors_benchmark, paths_benchmark
!wget -N --no-check-certificate \
https://storage.googleapis.com/mledu-datasets/cats_and_dogs_filtered.zip
!unzip -q -o cats_and_dogs_filtered.zip
import os
# let's create the filelists since we don't have it
dirpath = "cats_and_dogs_filtered/train"
for filelist, rel_dir, t_label in zip(
["train_filelist_00.txt", "train_filelist_01.txt"],
["cats", "dogs"],
[0, 1]):
# First, obtain the list of files
filenames_list = os.listdir(os.path.join(dirpath, rel_dir))
# Create the text file containing the filelist
# Filelists must be in Caffe-style, which means
# that they must define path in the format:
#
# relative_path_img1 class_label_first_img
# relative_path_img2 class_label_second_img
# ...
#
# For instance:
# cat/cat_0.png 1
# dog/dog_54.png 0
# cat/cat_3.png 1
# ...
#
# Paths are relative to a root path
# (specified when calling filelist_benchmark)
with open(filelist, "w") as wf:
for name in filenames_list:
wf.write(
"{} {}\n".format(os.path.join(rel_dir, name), t_label)
)
# Here we create a GenericCLScenario ready to be iterated
generic_scenario = filelist_benchmark(
dirpath,
["train_filelist_00.txt", "train_filelist_01.txt"],
["train_filelist_00.txt"],
task_labels=[0, 0],
complete_test_set_only=True,
train_transform=ToTensor(),
eval_transform=ToTensor()
)
train_experiences = []
for rel_dir, label in zip(
["cats", "dogs"],
[0, 1]):
# First, obtain the list of files
filenames_list = os.listdir(os.path.join(dirpath, rel_dir))
# Don't create a file list: instead, we create a list of
# paths + class labels
experience_paths = []
for name in filenames_list:
instance_tuple = (os.path.join(dirpath, rel_dir, name), label)
experience_paths.append(instance_tuple)
train_experiences.append(experience_paths)
# Here we create a GenericCLScenario ready to be iterated
generic_scenario = paths_benchmark(
train_experiences,
[train_experiences[0]], # Single test set
task_labels=[0, 0],
complete_test_set_only=True,
train_transform=ToTensor(),
eval_transform=ToTensor()
)
train_cifar10 = CIFAR10(
'./data/cifar10', train=True, download=True
)
test_cifar10 = CIFAR10(
'./data/cifar10', train=False, download=True
)
generic_scenario = dataset_benchmark(
[train_MNIST, train_cifar10],
[test_MNIST, test_cifar10]
)
# Alternatively, task labels can also be a list (or tensor)
# containing the task label of each pattern
from avalanche.benchmarks.utils import make_classification_dataset
train_MNIST_task0 = make_classification_dataset(train_MNIST, task_labels=0)
test_MNIST_task0 = make_classification_dataset(test_MNIST, task_labels=0)
train_cifar10_task1 = make_classification_dataset(train_cifar10, task_labels=1)
test_cifar10_task1 = make_classification_dataset(test_cifar10, task_labels=1)
scenario_custom_task_labels = dataset_benchmark(
[train_MNIST_task0, train_cifar10_task1],
[test_MNIST_task0, test_cifar10_task1]
)
print('Without custom task labels:',
generic_scenario.train_stream[1].task_label)
print('With custom task labels:',
scenario_custom_task_labels.train_stream[1].task_label)
pattern_shape = (3, 32, 32)
# Definition of training experiences
# Experience 1
experience_1_x = torch.zeros(100, *pattern_shape)
experience_1_y = torch.zeros(100, dtype=torch.long)
# Experience 2
experience_2_x = torch.zeros(80, *pattern_shape)
experience_2_y = torch.ones(80, dtype=torch.long)
# Test experience
# For this example we define a single test experience,
# but "tensors_benchmark" allows you to define even more than one!
test_x = torch.zeros(50, *pattern_shape)
test_y = torch.zeros(50, dtype=torch.long)
generic_scenario = tensors_benchmark(
train_tensors=[(experience_1_x, experience_1_y), (experience_2_x, experience_2_y)],
test_tensors=[(test_x, test_y)],
task_labels=[0, 0], # Task label of each train exp
complete_test_set_only=True
)
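As with the generators shown earlier, the returned object exposes the usual train and test streams; a brief usage sketch iterating the generic_scenario created above:
for experience in generic_scenario.train_stream:
    # each experience carries its own dataset and (optional) task label
    print('Experience', experience.current_experience,
          'contains', len(experience.dataset), 'patterns')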