Dataloaders, Buffers, and Replay
Avalanche provides several components that help you balance data loading and implement rehearsal strategies.
Dataloaders are used to balance mini-batches across groups (e.g. tasks/classes/experiences). This is especially useful when your data is unbalanced.
Buffers are used to store data from previous experiences. They are dynamic datasets with a fixed maximum size, and they can be updated continuously with new data.
Finally, Replay strategies implement rehearsal via Avalanche's plugin system. Most rehearsal strategies combine a custom dataloader, which balances the buffer against the current experience, with a buffer that is updated after each experience.

Dataloaders

Avalanche dataloaders are simple iterators, located under avalanche.benchmarks.utils.data_loader. Their interface is equivalent to PyTorch's dataloaders. For example, GroupBalancedDataLoader takes a sequence of datasets and iterates over them by providing balanced mini-batches, where the number of samples is split equally among groups. Internally, it instantiates a DataLoader for each separate group. More specialized dataloaders also exist, such as TaskBalancedDataLoader.
All the dataloaders accept keyword arguments (**kwargs) that are passed directly to the dataloaders for each group.
```python
from avalanche.benchmarks import SplitMNIST
from avalanche.benchmarks.utils.data_loader import GroupBalancedDataLoader

benchmark = SplitMNIST(5, return_task_id=True)

dl = GroupBalancedDataLoader([exp.dataset for exp in benchmark.train_stream], batch_size=4)
for x, y, t in dl:
    print(t.tolist())
    break
```
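TaskBalancedDataLoader works in a similar way, but instead of taking a list of groups it splits a single dataset by its task labels. The snippet below is only a rough sketch, not taken from the official examples: it assumes that AvalancheConcatDataset (from avalanche.benchmarks.utils) can merge the stream into one dataset and that keyword arguments such as batch_size and oversample_small_tasks are accepted by your Avalanche version.

```python
# Hedged sketch: balance mini-batches by task label with TaskBalancedDataLoader.
# Assumes AvalancheConcatDataset can merge the experience datasets and that
# batch_size / oversample_small_tasks are supported by this Avalanche version.
from avalanche.benchmarks.utils import AvalancheConcatDataset
from avalanche.benchmarks.utils.data_loader import TaskBalancedDataLoader

whole_train = AvalancheConcatDataset([exp.dataset for exp in benchmark.train_stream])
dl = TaskBalancedDataLoader(whole_train, oversample_small_tasks=True, batch_size=8)
for x, y, t in dl:
    print(t.tolist())  # each mini-batch mixes samples from all tasks
    break
```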

Memory Buffers

Memory buffers store data up to a maximum capacity, and they implement policies to select which data to store and which to remove when the buffer is full. They are available in the module avalanche.training.storage_policy. The base class is ExemplarsBuffer, which implements two methods:
  • update(strategy) - given the strategy's state it updates the buffer (using the data in strategy.experience.dataset).
  • resize(strategy, new_size) - updates the maximum size and updates the buffer accordingly.
The data can be accessed via the buffer attribute.
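To show how the two methods fit together, here is a minimal sketch of a custom policy that simply keeps the most recent samples. It is illustrative only: it assumes that ExemplarsBuffer's constructor takes max_size and starts with an empty, writable buffer attribute, and that AvalancheConcatDataset and AvalancheSubset are available in avalanche.benchmarks.utils, as in the Avalanche version used in this tutorial. The FIFOBuffer name is made up for this example.

```python
# Hedged sketch of a custom storage policy: a FIFO buffer that keeps only
# the newest `max_size` samples. Class and variable names are illustrative.
from avalanche.benchmarks.utils import AvalancheConcatDataset, AvalancheSubset
from avalanche.training.storage_policy import ExemplarsBuffer

class FIFOBuffer(ExemplarsBuffer):
    def update(self, strategy, **kwargs):
        # append the new experience's data, then trim back to the maximum size
        new_data = strategy.experience.dataset
        self.buffer = AvalancheConcatDataset([self.buffer, new_data])
        self.resize(strategy, self.max_size)

    def resize(self, strategy, new_size):
        self.max_size = new_size
        if len(self.buffer) <= new_size:
            return
        # drop the oldest samples and keep the most recent `new_size` ones
        keep = list(range(len(self.buffer) - new_size, len(self.buffer)))
        self.buffer = AvalancheSubset(self.buffer, indices=keep)
```

The built-in policies follow the same interface. In the rest of this section we use them directly, starting with ReservoirSamplingBuffer: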
```python
from avalanche.training.storage_policy import ReservoirSamplingBuffer
from types import SimpleNamespace

benchmark = SplitMNIST(5, return_task_id=False)
storage_p = ReservoirSamplingBuffer(max_size=30)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
```
At first, the buffer is empty. We can update it with data from a new experience.
Notice that we use a SimpleNamespace because we want to use the buffer standalone, without instantiating an Avalanche strategy. Reservoir sampling requires only the experience from the strategy's state.
```python
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {storage_p.buffer.targets}\n")
```
Notice that after each update some samples are replaced with new data. Reservoir sampling selects these samples at random.
Avalanche offers many more storage policies. For example, ParametricBuffer is a buffer split into several groups according to the groupby parameter (None, 'class', 'task', 'experience'), with samples in each group selected by an optional ExemplarsSelectionStrategy (random selection is the default).
```python
from avalanche.training.storage_policy import ParametricBuffer, RandomExemplarsSelectionStrategy

storage_p = ParametricBuffer(
    max_size=30,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy()
)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {storage_p.buffer.targets}\n")
```
The advantage of using grouping buffers is that you get a balanced rehearsal buffer. You can even access the groups separately with the buffer_groups attribute. Combined with balanced dataloaders, you can ensure that the mini-batches stay balanced during training.
```python
for k, v in storage_p.buffer_groups.items():
    print(f"(group {k}) -> size {len(v.buffer)}")
```
```python
datas = [v.buffer for v in storage_p.buffer_groups.values()]
dl = GroupBalancedDataLoader(datas)

for x, y, t in dl:
    print(y.tolist())
    break
```
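The selection behavior of ParametricBuffer can also be customized. Below is a rough sketch, assuming that ExemplarsSelectionStrategy exposes a single make_sorted_indices(strategy, data) method returning indices ordered from "keep first" to "drop first" (as RandomExemplarsSelectionStrategy does); the OldestFirstSelectionStrategy name is invented for illustration.

```python
# Hedged sketch of a custom exemplar selection strategy: keep the samples
# that were inserted first (oldest-first) instead of a random subset.
from avalanche.training.storage_policy import ExemplarsSelectionStrategy

class OldestFirstSelectionStrategy(ExemplarsSelectionStrategy):
    def make_sorted_indices(self, strategy, data):
        # indices at the front of the list are kept when the group is trimmed
        return list(range(len(data)))

storage_p = ParametricBuffer(
    max_size=30,
    groupby='class',
    selection_strategy=OldestFirstSelectionStrategy()
)
```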

Replay Plugins

Avalanche's strategy plugins can be used to update the rehearsal buffer and set the dataloader. This makes it easy to implement replay strategies:
```python
from avalanche.benchmarks.utils.data_loader import ReplayDataLoader
from avalanche.training.plugins import StrategyPlugin

class CustomReplay(StrategyPlugin):
    def __init__(self, storage_policy):
        super().__init__()
        self.storage_policy = storage_policy

    def before_training_exp(self, strategy,
                            num_workers: int = 0, shuffle: bool = True,
                            **kwargs):
        """ Here we set the dataloader. """
        if len(self.storage_policy.buffer) == 0:
            # first experience. We don't use the buffer, no need to change
            # the dataloader.
            return

        # replay dataloader samples mini-batches from the memory and current
        # data separately and combines them together.
        print("Override the dataloader.")
        strategy.dataloader = ReplayDataLoader(
            strategy.adapted_dataset,
            self.storage_policy.buffer,
            oversample_small_tasks=True,
            num_workers=num_workers,
            batch_size=strategy.train_mb_size,
            shuffle=shuffle)

    def after_training_exp(self, strategy: "BaseStrategy", **kwargs):
        """ We update the buffer after the experience.
            You can use a different callback to update the buffer in a different place.
        """
        print("Buffer update.")
        self.storage_policy.update(strategy, **kwargs)
```
And of course, we can use the plugin to train our continual model:
```python
from torch.nn import CrossEntropyLoss
from avalanche.training import Naive
from avalanche.evaluation.metrics import accuracy_metrics
from avalanche.training.plugins import EvaluationPlugin
from avalanche.logging import InteractiveLogger
from avalanche.models import SimpleMLP
import torch

scenario = SplitMNIST(5)
model = SimpleMLP(num_classes=scenario.n_classes)
storage_p = ParametricBuffer(
    max_size=500,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy()
)

# choose some metrics and evaluation method
interactive_logger = InteractiveLogger()

eval_plugin = EvaluationPlugin(
    accuracy_metrics(experience=True, stream=True),
    loggers=[interactive_logger])

# CREATE THE STRATEGY INSTANCE (NAIVE)
cl_strategy = Naive(model, torch.optim.Adam(model.parameters(), lr=0.001),
                    CrossEntropyLoss(),
                    train_mb_size=100, train_epochs=1, eval_mb_size=100,
                    plugins=[CustomReplay(storage_p)],
                    evaluator=eval_plugin
                    )

# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in scenario.train_stream:
    print("Start of experience ", experience.current_experience)
    cl_strategy.train(experience)
    print('Training completed')

    print('Computing accuracy on the whole test set')
    results.append(cl_strategy.eval(scenario.test_stream))
```
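If you only need standard experience replay, you do not have to write the plugin yourself: Avalanche ships a ready-made ReplayPlugin in avalanche.training.plugins. As a rough sketch (constructor arguments may vary between versions; only mem_size is assumed here), it can be swapped in for CustomReplay in the strategy above:

```python
# Hedged sketch: use the built-in ReplayPlugin instead of CustomReplay.
# Only the mem_size argument is assumed; check your Avalanche version for
# additional options (e.g. a custom storage policy).
from avalanche.training.plugins import ReplayPlugin

cl_strategy = Naive(model, torch.optim.Adam(model.parameters(), lr=0.001),
                    CrossEntropyLoss(),
                    train_mb_size=100, train_epochs=1, eval_mb_size=100,
                    plugins=[ReplayPlugin(mem_size=500)],
                    evaluator=eval_plugin)
```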