description: How to implement replay and data loading
Dataloading, Memory Buffers, and Replay
Avalanche provides several components that help you balance data loading and implement rehearsal strategies.
Dataloaders are used to provide balancing between groups (e.g. tasks/classes/experiences). This is especially useful when you have unbalanced data.
Buffers are used to store data from the previous experiences. They are dynamic datasets with a fixed maximum size, and they can be updated with new data continuously.
Finally, Replay strategies implement rehearsal via Avalanche's plugin system. Most rehearsal strategies combine a custom dataloader, which balances the buffer with the current experience's data, and a buffer that is updated after each experience.
First, let's install Avalanche. You can skip this step if you have installed it already.
!pip install avalanche-lib
Dataloaders
Avalanche dataloaders are simple iterators, located under avalanche.benchmarks.utils.data_loader. Their interface is equivalent to PyTorch's dataloaders. For example, GroupBalancedDataLoader takes a sequence of datasets and iterates over them by providing balanced mini-batches, where the number of samples is split equally among groups. Internally, it instantiates a DataLoader for each separate group. More specialized dataloaders exist, such as TaskBalancedDataLoader.
All the dataloaders accept keyword arguments (**kwargs) that are passed directly to the dataloaders for each group.
from avalanche.benchmarks import SplitMNIST
from avalanche.benchmarks.utils.data_loader import GroupBalancedDataLoader

benchmark = SplitMNIST(5, return_task_id=True)

dl = GroupBalancedDataLoader([exp.dataset for exp in benchmark.train_stream],
                             batch_size=5)
for x, y, t in dl:
    print(t.tolist())
    break
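The more specialized TaskBalancedDataLoader mentioned above works on a single task-labeled dataset and balances mini-batches across task labels. The following is a minimal sketch, not part of the original example: it assumes that AvalancheDataset exposes a concat method (available in recent Avalanche versions) to merge the experience datasets before passing them to the loader.
from avalanche.benchmarks.utils.data_loader import TaskBalancedDataLoader

# Merge all experiences into one task-labeled dataset.
# NOTE: `.concat` is assumed here (recent Avalanche versions);
# older releases used a dedicated concat dataset class instead.
whole_train = benchmark.train_stream[0].dataset
for exp in benchmark.train_stream[1:]:
    whole_train = whole_train.concat(exp.dataset)

# Mini-batches are balanced across the task labels found in the dataset.
dl = TaskBalancedDataLoader(whole_train, batch_size=10)
for x, y, t in dl:
    print(t.tolist())
    break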
Memory Buffers
Memory buffers store data up to a maximum capacity, and they implement policies to select which data to store and which to remove when the buffer is full. They are available in the module avalanche.training.storage_policy. The base class is ExemplarsBuffer, which implements two methods:
update(strategy) - given the strategy's state, it updates the buffer (using the data in strategy.experience.dataset).
resize(strategy, new_size) - changes the maximum size and updates the buffer accordingly.
The data can be accessed using the attribute buffer.
from avalanche.training.storage_policy import ReservoirSamplingBuffer
from types import SimpleNamespace

benchmark = SplitMNIST(5, return_task_id=False)
storage_p = ReservoirSamplingBuffer(max_size=30)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
At first, the buffer is empty. We can update it with data from a new experience.
Notice that we use a SimpleNamespace because we want to use the buffer standalone, without instantiating an Avalanche strategy. Reservoir sampling requires only the experience from the strategy's state.
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {storage_p.buffer.targets.uniques}\n")
Notice that after each update some samples are replaced with new data. Reservoir sampling selects these samples randomly.
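The resize method from the interface above works in the same way: given the strategy's state and a new size, it adjusts max_size and drops the exceeding samples. A minimal sketch, continuing the example (which samples are kept depends on the policy):
# Shrink the reservoir buffer from 30 to 10 samples.
storage_p.resize(strategy_state, 10)
print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")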
Avalanche offers many more storage policies. For example, ParametricBuffer is a buffer split into several groups according to the groupby parameter (None, 'class', 'task', 'experience'), and according to an optional ExemplarsSelectionStrategy (random selection is the default choice).
from avalanche.training.storage_policy import ParametricBuffer, RandomExemplarsSelectionStrategy

storage_p = ParametricBuffer(
    max_size=30,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy())

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {storage_p.buffer.targets.uniques}\n")
The advantage of using grouping buffers is that you get a balanced rehearsal buffer. You can even access the groups separately with the buffer_groups attribute. Combined with balanced dataloaders, you can ensure that the mini-batches stay balanced during training.
for k, v in storage_p.buffer_groups.items():
    print(f"(group {k}) -> size {len(v.buffer)}")
datas = [v.buffer for v in storage_p.buffer_groups.values()]
dl = GroupBalancedDataLoader(datas)

for x, y, t in dl:
    print(y.tolist())
    break
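You can also plug your own selection logic into ParametricBuffer. The sketch below is a hypothetical example, assuming the make_sorted_indices interface implemented by RandomExemplarsSelectionStrategy: the buffer keeps the first elements of the returned index list, so earlier indices mean higher priority.
from typing import List
from avalanche.training.storage_policy import ExemplarsSelectionStrategy

class FirstComeSelectionStrategy(ExemplarsSelectionStrategy):
    """Hypothetical strategy: keep the first samples seen in each group."""

    def make_sorted_indices(self, strategy, data) -> List[int]:
        # Earlier samples are considered more important, so they are
        # the last ones to be dropped when a group shrinks.
        return list(range(len(data)))

custom_storage_p = ParametricBuffer(
    max_size=30,
    groupby='class',
    selection_strategy=FirstComeSelectionStrategy())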
Replay Plugins
Avalanche's strategy plugins can be used to update the rehearsal buffer and set the dataloader. This makes it easy to implement replay strategies:
from avalanche.benchmarks.utils.data_loader import ReplayDataLoader
from avalanche.training.plugins import SupervisedPlugin

class CustomReplay(SupervisedPlugin):
    def __init__(self, storage_policy):
        super().__init__()
        self.storage_policy = storage_policy

    def before_training_exp(self, strategy,
                            num_workers: int = 0, shuffle: bool = True,
                            **kwargs):
        """ Here we set the dataloader. """
        if len(self.storage_policy.buffer) == 0:
            # first experience. We don't use the buffer, no need to change
            # the dataloader.
            return

        # replay dataloader samples mini-batches from the memory and current
        # data separately and combines them together.
        print("Override the dataloader.")
        strategy.dataloader = ReplayDataLoader(
            strategy.adapted_dataset,
            self.storage_policy.buffer,
            oversample_small_tasks=True,
            num_workers=num_workers,
            batch_size=strategy.train_mb_size,
            shuffle=shuffle)

    def after_training_exp(self, strategy: "BaseStrategy", **kwargs):
        """ We update the buffer after the experience.
            You can use a different callback to update the buffer in a different place
        """
        print("Buffer update.")
        self.storage_policy.update(strategy, **kwargs)
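Writing the plugin by hand is useful to understand the mechanics, but for standard replay Avalanche already ships a ReplayPlugin that follows the same pattern (override the dataloader before training, update the buffer afterwards). A minimal sketch; the storage_policy argument and the requirement that its max_size match mem_size are assumptions based on recent versions:
from avalanche.training.plugins import ReplayPlugin

# Default behaviour: an experience-balanced buffer of `mem_size` samples.
replay = ReplayPlugin(mem_size=200)

# (Assumption, recent versions) a custom storage policy can be passed in,
# keeping its max_size consistent with mem_size.
replay_class_balanced = ReplayPlugin(
    mem_size=200,
    storage_policy=ParametricBuffer(
        max_size=200,
        groupby='class',
        selection_strategy=RandomExemplarsSelectionStrategy()))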
And of course, we can use the plugin to train our continual model:
from torch.nn import CrossEntropyLoss
from avalanche.training import Naive
from avalanche.evaluation.metrics import accuracy_metrics
from avalanche.training.plugins import EvaluationPlugin
from avalanche.logging import InteractiveLogger
from avalanche.models import SimpleMLP
import torch

scenario = SplitMNIST(5)
model = SimpleMLP(num_classes=scenario.n_classes)
storage_p = ParametricBuffer(
    max_size=500,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy())

# choose some metrics and evaluation method
interactive_logger = InteractiveLogger()

eval_plugin = EvaluationPlugin(
    accuracy_metrics(experience=True, stream=True),
    loggers=[interactive_logger])

# CREATE THE STRATEGY INSTANCE (NAIVE)
cl_strategy = Naive(model, torch.optim.Adam(model.parameters(), lr=0.001),
                    CrossEntropyLoss(),
                    train_mb_size=100, train_epochs=1, eval_mb_size=100,
                    plugins=[CustomReplay(storage_p)],
                    evaluator=eval_plugin)

# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in scenario.train_stream:
    print("Start of experience ", experience.current_experience)
    cl_strategy.train(experience)
    print('Training completed')

    print('Computing accuracy on the whole test set')
    results.append(cl_strategy.eval(scenario.test_stream))
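Each call to eval returns a dictionary mapping metric names to their values, and we collected those dictionaries in results. Below is a quick way to inspect the stream accuracy of the last evaluation; the "Top1_Acc_Stream" name pattern is an assumption, since exact metric names depend on the Avalanche version.
# Each entry of `results` is a dict of metric name -> value for one eval run.
last_eval = results[-1]
for name, value in last_eval.items():
    # e.g. "Top1_Acc_Stream/eval_phase/test_stream/Task000" (version-dependent)
    if "Top1_Acc_Stream" in name:
        print(f"{name}: {value:.4f}")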