Avalanche provides several components that help you to balance data loading and implement rehearsal strategies.
Dataloaders are used to provide balancing between groups (e.g. tasks/classes/experiences). This is especially useful when you have unbalanced data.
Buffers are used to store data from the previous experiences. They are dynamic datasets with a fixed maximum size, and they can be updated with new data continuously.
Finally, Replay strategies implement rehearsal by using Avalanche's plugin system. Most rehearsal strategies combine a custom dataloader, which balances the buffer's data with the current experience, and a buffer that is updated after each experience.
First, let's install Avalanche. You can skip this step if you have installed it already.
!pip install avalanche-lib
Dataloaders
Avalanche dataloaders are simple iterators, located under avalanche.benchmarks.utils.data_loader. Their interface is equivalent to PyTorch's dataloaders. For example, GroupBalancedDataLoader takes a sequence of datasets and iterates over them by providing balanced mini-batches, where the number of samples is split equally among groups. Internally, it instantiates a DataLoader for each separate group. More specialized dataloaders exist, such as TaskBalancedDataLoader (sketched after the example below).
All the dataloaders accept keyword arguments (**kwargs) that are passed directly to the underlying DataLoader created for each group.
from avalanche.benchmarks import SplitMNIST
from avalanche.benchmarks.utils.data_loader import GroupBalancedDataLoader

benchmark = SplitMNIST(5, return_task_id=True)

dl = GroupBalancedDataLoader([exp.dataset for exp in benchmark.train_stream], batch_size=4)
for x, y, t in dl:
    print(t.tolist())
    break
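As a sketch of the task-balanced variant mentioned above: TaskBalancedDataLoader builds the groups for you from the task labels of a single dataset. The snippet assumes that AvalancheConcatDataset (from avalanche.benchmarks.utils) is available to merge the experiences and that the loader accepts a single task-labeled dataset; check the API of your installed version if these names differ.

from avalanche.benchmarks.utils import AvalancheConcatDataset
from avalanche.benchmarks.utils.data_loader import TaskBalancedDataLoader

# merge all experiences into one dataset that keeps its task labels
whole_train = AvalancheConcatDataset([exp.dataset for exp in benchmark.train_stream])

dl = TaskBalancedDataLoader(whole_train, batch_size=4)
for x, y, t in dl:
    print(t.tolist())  # each mini-batch mixes samples from every task
    break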
Memory Buffers
Memory buffers store data up to a maximum capacity, and they implement policies to select which data to store and which to remove when the buffer is full. They are available in the module avalanche.training.storage_policy. The base class is ExemplarsBuffer, which implements two methods:
update(strategy) - given the strategy's state it updates the buffer (using the data in strategy.experience.dataset).
resize(strategy, new_size) - updates the maximum size and updates the buffer accordingly.
The data can be accessed using the attribute buffer.
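To make this contract concrete, here is a tiny standalone mock (plain Python, not an Avalanche class, just an illustration) that keeps the most recent samples and exposes the same update/resize/buffer interface described above; the real policies store an AvalancheDataset rather than a list.

class ToyFifoBuffer:
    """Illustration of the ExemplarsBuffer contract, backed by a plain list."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = []  # real policies hold an AvalancheDataset here

    def update(self, strategy):
        # add the new experience's data, keeping only the most recent max_size samples
        new_data = list(strategy.experience.dataset)
        self.buffer = (self.buffer + new_data)[-self.max_size:]

    def resize(self, strategy, new_size):
        # change the maximum capacity and drop the excess samples
        self.max_size = new_size
        self.buffer = self.buffer[-new_size:]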
from avalanche.training.storage_policy import ReservoirSamplingBuffer
from types import SimpleNamespace

benchmark = SplitMNIST(5, return_task_id=False)
storage_p = ReservoirSamplingBuffer(max_size=30)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
At first, the buffer is empty. We can update it with data from a new experience.
Notice that we use a SimpleNamespace because we want to use the buffer standalone, without instantiating an Avalanche strategy. Reservoir sampling requires only the experience from the strategy's state.
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {storage_p.buffer.targets}\n")
Notice that after each update some samples are substituted with new data. Reservoir sampling selects these samples randomly.
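For intuition, this is the classic reservoir sampling algorithm that the name refers to: it maintains a uniform random sample of a stream whose length is unknown in advance, so every sample seen so far has the same probability of being kept. A standalone sketch, independent of Avalanche's actual implementation:

import random

def reservoir_sample(stream, k):
    """Keep a uniform random sample of size k from a stream of unknown length."""
    reservoir = []
    for n, item in enumerate(stream):
        if len(reservoir) < k:
            # fill the reservoir with the first k items
            reservoir.append(item)
        else:
            # keep the new item with probability k / (n + 1)
            j = random.randint(0, n)
            if j < k:
                reservoir[j] = item
    return reservoir

print(reservoir_sample(range(1000), k=10))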
Avalanche offers many more storage policies. For example, ParametricBuffer is a buffer split into several groups according to the groupby parameter (None, 'class', 'task', 'experience'), and according to an optional ExemplarsSelectionStrategy (random selection is the default choice).
from avalanche.training.storage_policy import ParametricBuffer, RandomExemplarsSelectionStrategy

storage_p = ParametricBuffer(
    max_size=30,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy()
)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {storage_p.buffer.targets}\n")
The advantage of using grouping buffers is that you get a balanced rehearsal buffer. You can even access the groups separately with the buffer_groups attribute. Combined with balanced dataloaders, you can ensure that the mini-batches stay balanced during training.
for k, v in storage_p.buffer_groups.items():
    print(f"(group {k}) -> size {len(v.buffer)}")
datas = [v.buffer for v in storage_p.buffer_groups.values()]
dl = GroupBalancedDataLoader(datas)

for x, y, t in dl:
    print(y.tolist())
    break
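The resize method described earlier works on any storage policy. Reusing the last strategy_state from the loop above, we can shrink the ParametricBuffer; as described for the base class, the stored data is trimmed to the new capacity.

# shrink the buffer; the stored groups are trimmed to fit the new capacity
storage_p.resize(strategy_state, 10)
print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
for k, v in storage_p.buffer_groups.items():
    print(f"(group {k}) -> size {len(v.buffer)}")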
Replay Plugins
Avalanche's strategy plugins can be used to update the rehearsal buffer and set the dataloader. This makes it easy to implement replay strategies:
from avalanche.benchmarks.utils.data_loader import ReplayDataLoader
from avalanche.training.plugins import StrategyPlugin


class CustomReplay(StrategyPlugin):
    def __init__(self, storage_policy):
        super().__init__()
        self.storage_policy = storage_policy

    def before_training_exp(self, strategy,
                            num_workers: int = 0, shuffle: bool = True,
                            **kwargs):
        """ Here we set the dataloader. """
        if len(self.storage_policy.buffer) == 0:
            # first experience. We don't use the buffer, no need to change
            # the dataloader.
            return

        # replay dataloader samples mini-batches from the memory and current
        # data separately and combines them together.
        print("Override the dataloader.")
        strategy.dataloader = ReplayDataLoader(
            strategy.adapted_dataset,
            self.storage_policy.buffer,
            oversample_small_tasks=True,
            num_workers=num_workers,
            batch_size=strategy.train_mb_size,
            shuffle=shuffle)

    def after_training_exp(self, strategy: "BaseStrategy", **kwargs):
        """ We update the buffer after the experience.
            You can use a different callback to update the buffer in a different place.
        """
        print("Buffer update.")
        self.storage_policy.update(strategy, **kwargs)
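Writing the plugin yourself is only needed when you want custom behavior. Avalanche also ships a ready-made ReplayPlugin in avalanche.training.plugins; the exact constructor arguments depend on your version, so the snippet below is a sketch assuming it takes a mem_size argument.

from avalanche.training.plugins import ReplayPlugin

# built-in alternative to CustomReplay: rehearsal with a memory of 500 exemplars
replay = ReplayPlugin(mem_size=500)

It is passed to the strategy's plugins list exactly like the custom plugin is below.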
And of course, we can use the plugin to train our continual model:
from torch.nn import CrossEntropyLoss
from avalanche.training import Naive
from avalanche.evaluation.metrics import accuracy_metrics
from avalanche.training.plugins import EvaluationPlugin
from avalanche.logging import InteractiveLogger
from avalanche.models import SimpleMLP
import torch

scenario = SplitMNIST(5)
model = SimpleMLP(num_classes=scenario.n_classes)
storage_p = ParametricBuffer(
    max_size=500,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy()
)

# choose some metrics and evaluation method
interactive_logger = InteractiveLogger()
eval_plugin = EvaluationPlugin(
    accuracy_metrics(experience=True, stream=True),
    loggers=[interactive_logger])

# CREATE THE STRATEGY INSTANCE (NAIVE)
cl_strategy = Naive(model, torch.optim.Adam(model.parameters(), lr=0.001),
                    CrossEntropyLoss(),
                    train_mb_size=100, train_epochs=1, eval_mb_size=100,
                    plugins=[CustomReplay(storage_p)],
                    evaluator=eval_plugin
                    )

# TRAINING LOOP
print('Starting experiment...')
results = []
for experience in scenario.train_stream:
    print("Start of experience ", experience.current_experience)
    cl_strategy.train(experience)
    print('Training completed')

    print('Computing accuracy on the whole test set')
    results.append(cl_strategy.eval(scenario.test_stream))