Dataloaders, Buffers, and Replay
How to implement replay and data loading
Avalanche provides several components that help you to balance data loading and implement rehearsal strategies.
Dataloaders are used to provide balancing between groups (e.g. tasks/classes/experiences). This is especially useful when you have unbalanced data.
Buffers are used to store data from the previous experiences. They are dynamic datasets with a fixed maximum size, and they can be updated with new data continuously.
Finally, Replay strategies implement rehearsal on top of Avalanche's plugin system. Most rehearsal strategies combine a custom dataloader, which balances samples from the buffer with those of the current experience, and a buffer that is updated after each experience.
First, let's install Avalanche. You can skip this step if you have installed it already.
```python
!pip install avalanche-lib==0.1.0
```

Dataloaders

Avalanche dataloaders are simple iterators, located under avalanche.benchmarks.utils.data_loader. Their interface is equivalent to PyTorch's dataloaders. For example, GroupBalancedDataLoader takes a sequence of datasets and iterates over them by providing balanced mini-batches, where the number of samples is split equally among groups. Internally, it instantiates a DataLoader for each separate group. More specialized dataloaders exist, such as TaskBalancedDataLoader.
All the dataloaders accept keyword arguments (**kwargs) that are passed directly to the dataloaders for each group.
```python
from avalanche.benchmarks import SplitMNIST
from avalanche.benchmarks.utils.data_loader import GroupBalancedDataLoader

benchmark = SplitMNIST(5, return_task_id=True)

dl = GroupBalancedDataLoader([exp.dataset for exp in benchmark.train_stream],
                             batch_size=4)
for x, y, t in dl:
    print(t.tolist())
    break
```
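The same pattern applies to the more specialized loaders. As an illustration (our own sketch, not part of the original tutorial), TaskBalancedDataLoader can be given a single task-labeled dataset and splits it by task internally; we assume here that AvalancheConcatDataset can concatenate the experiences and that extra keyword arguments are forwarded to the per-task loaders:

```python
from avalanche.benchmarks.utils import AvalancheConcatDataset
from avalanche.benchmarks.utils.data_loader import TaskBalancedDataLoader

# concatenate all training experiences into a single task-labeled dataset
full_train = AvalancheConcatDataset(
    [exp.dataset for exp in benchmark.train_stream])

# keyword arguments such as batch_size and num_workers are forwarded to the
# underlying per-task DataLoaders (exact batching details may vary across
# Avalanche versions)
dl = TaskBalancedDataLoader(full_train, batch_size=10, num_workers=0)
for x, y, t in dl:
    print(t.tolist())  # each mini-batch mixes samples from every task
    break
```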

Memory Buffers

Memory buffers store data up to a maximum capacity, and they implement policies to select which data to store and which to remove when the buffer is full. They are available in the module avalanche.training.storage_policy. The base class is ExemplarsBuffer, which implements two methods:
  • update(strategy) - given the strategy's state, it updates the buffer (using the data in strategy.experience.dataset).
  • resize(strategy, new_size) - changes the maximum size and updates the buffer accordingly.
The data can be accessed through the buffer attribute.
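To get a feel for this interface, below is a rough sketch of a custom policy (our own illustration, not an Avalanche class) that simply keeps the first max_size samples it encounters. It assumes the AvalancheConcatDataset and AvalancheSubset utilities and a writable buffer attribute:

```python
from avalanche.benchmarks.utils import AvalancheConcatDataset, AvalancheSubset
from avalanche.training.storage_policy import ExemplarsBuffer


class FirstNBuffer(ExemplarsBuffer):
    """Hypothetical policy: keep the first `max_size` samples ever seen."""

    def update(self, strategy, **kwargs):
        # append the new experience's data, then truncate to max_size
        new_data = AvalancheConcatDataset(
            [self.buffer, strategy.experience.dataset])
        keep = list(range(min(len(new_data), self.max_size)))
        self.buffer = AvalancheSubset(new_data, indices=keep)

    def resize(self, strategy, new_size):
        # change the capacity and drop exemplars that no longer fit
        self.max_size = new_size
        keep = list(range(min(len(self.buffer), new_size)))
        self.buffer = AvalancheSubset(self.buffer, indices=keep)
```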
```python
from avalanche.training.storage_policy import ReservoirSamplingBuffer
from types import SimpleNamespace

benchmark = SplitMNIST(5, return_task_id=False)
storage_p = ReservoirSamplingBuffer(max_size=30)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
```
At first, the buffer is empty. We can update it with data from a new experience.
Notice that we use a SimpleNamespace because we want to use the buffer standalone, without instantiating an Avalanche strategy. Reservoir sampling requires only the experience from the strategy's state.
```python
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {storage_p.buffer.targets}\n")
```
Notice that after each update some samples are substituted with new data. Reservoir sampling selects these samples randomly.
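The resize method can also be called directly, for example to shrink the memory when the budget changes. A small sketch (we reuse the last strategy_state from the loop above, assuming reservoir sampling does not actually need the strategy's state to shrink):

```python
# shrink the buffer to 10 exemplars; reservoir sampling keeps a random subset
storage_p.resize(strategy_state, 10)
print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
```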
Avalanche offers many more storage policies. For example, ParametricBuffer is a buffer split into several groups according to the groupby parameter (None, 'class', 'task', or 'experience'), and to an optional ExemplarsSelectionStrategy (random selection is the default choice).
```python
from avalanche.training.storage_policy import ParametricBuffer, RandomExemplarsSelectionStrategy

storage_p = ParametricBuffer(
    max_size=30,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy()
)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {storage_p.buffer.targets}\n")
```
The advantage of using grouping buffers is that you get a balanced rehearsal buffer. You can even access the groups separately with the buffer_groups attribute. Combined with balanced dataloaders, you can ensure that the mini-batches stay balanced during training.
```python
for k, v in storage_p.buffer_groups.items():
    print(f"(group {k}) -> size {len(v.buffer)}")
```
```python
datas = [v.buffer for v in storage_p.buffer_groups.values()]
dl = GroupBalancedDataLoader(datas)

for x, y, t in dl:
    print(y.tolist())
    break
```

Replay Plugins

Avalanche's strategy plugins can be used to update the rehearsal buffer and set the dataloader. This makes it easy to implement replay strategies:
```python
from avalanche.benchmarks.utils.data_loader import ReplayDataLoader
from avalanche.training.plugins import StrategyPlugin


class CustomReplay(StrategyPlugin):
    def __init__(self, storage_policy):
        super().__init__()
        self.storage_policy = storage_policy

    def before_training_exp(self, strategy,
                            num_workers: int = 0, shuffle: bool = True,
                            **kwargs):
        """ Here we set the dataloader. """
        if len(self.storage_policy.buffer) == 0:
            # first experience. We don't use the buffer, no need to change
            # the dataloader.
            return

        # the replay dataloader samples mini-batches from the memory and the
        # current data separately and combines them together.
        print("Override the dataloader.")
        strategy.dataloader = ReplayDataLoader(
            strategy.adapted_dataset,
            self.storage_policy.buffer,
            oversample_small_tasks=True,
            num_workers=num_workers,
            batch_size=strategy.train_mb_size,
            shuffle=shuffle)

    def after_training_exp(self, strategy: "BaseStrategy", **kwargs):
        """ We update the buffer after each experience.
        You can use a different callback to update the buffer in a different place.
        """
        print("Buffer update.")
        self.storage_policy.update(strategy, **kwargs)
```
And of course, we can use the plugin to train our continual model:
```python
from torch.nn import CrossEntropyLoss
from avalanche.training import Naive
from avalanche.evaluation.metrics import accuracy_metrics
from avalanche.training.plugins import EvaluationPlugin
from avalanche.logging import InteractiveLogger
from avalanche.models import SimpleMLP
import torch

scenario = SplitMNIST(5)
model = SimpleMLP(num_classes=scenario.n_classes)
storage_p = ParametricBuffer(
    max_size=500,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy()
)

# choose some metrics and evaluation method
interactive_logger = InteractiveLogger()

eval_plugin = EvaluationPlugin(
    accuracy_metrics(experience=True, stream=True),
    loggers=[interactive_logger])

# CREATE THE STRATEGY INSTANCE (NAIVE)
cl_strategy = Naive(model, torch.optim.Adam(model.parameters(), lr=0.001),
                    CrossEntropyLoss(),
                    train_mb_size=100, train_epochs=1, eval_mb_size=100,
                    plugins=[CustomReplay(storage_p)],
                    evaluator=eval_plugin
                    )

# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in scenario.train_stream:
    print("Start of experience ", experience.current_experience)
    cl_strategy.train(experience)
    print('Training completed')

    print('Computing accuracy on the whole test set')
    results.append(cl_strategy.eval(scenario.test_stream))
```