import torch
import random
import numpy as np
import torch.nn as nn
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.data.sampler import SubsetRandomSampler
# Torchvision
import torchvision.transforms as T
from torchvision.datasets import CIFAR10
Why Active Learning?
- Experimental results from semi-supervised learning suggest that a higher proportion of annotated data leads to better performance.
- But annotation is expensive and time-consuming, which limits the labeling budget.
- Active learning addresses this problem by selecting the most informative data points and requesting annotations for those points only.
What is Active Learning?
- The model actively selects the data points it is most uncertain about; only these points are sent for annotation, and the model is then retrained on them.
- E.g., in a classification task, the model selects data points that lie close to the decision boundary and is retrained once they are labeled.
- Let's demonstrate this with a CIFAR-10 example; a high-level sketch of the loop follows before the code.
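Before diving into the CIFAR-10 code, here is a minimal sketch of the generic pool-based active-learning loop that the rest of this notebook implements. The names `train_model`, `acquisition_score`, and `budget` are illustrative placeholders, not functions defined in this notebook:

import random

def active_learning_sketch(pool, train_model, acquisition_score, budget, cycles, seed_size=1000):
    # Start with a small randomly labeled seed set; the rest is the unlabeled pool.
    indices = list(range(len(pool)))
    random.shuffle(indices)
    labeled, unlabeled = indices[:seed_size], indices[seed_size:]
    for _ in range(cycles):
        model = train_model(labeled)                                      # retrain on the current labels
        scores = {i: acquisition_score(model, pool[i]) for i in unlabeled}
        query = sorted(unlabeled, key=scores.get, reverse=True)[:budget]  # most informative points
        labeled += query                                                  # the oracle annotates these points
        unlabeled = [i for i in unlabeled if i not in query]
    return labeled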
import os
os.environ["TF_XLA_FLAGS"] = "--tf_xla_enable_xla_devices"  ## setting the environment
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "1"  ## using GPU core 1
Model Architecture
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out1 = self.layer1(out)
        out2 = self.layer2(out1)
        out3 = self.layer3(out2)
        out4 = self.layer4(out3)
        out = F.avg_pool2d(out4, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out, [out1, out2, out3, out4]

def ResNet18(num_classes=10):
    return ResNet(BasicBlock, [2,2,2,2], num_classes)
Data Transformation (CIFAR-10)
train_transform = T.Compose([
    T.RandomHorizontalFlip(),
    T.RandomCrop(size=32, padding=4),
    T.ToTensor(),
    T.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])
])

test_transform = T.Compose([
    T.ToTensor(),
    T.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])
])

cifar10_train = CIFAR10('../cifar10', train=True, download=True, transform=train_transform)
cifar10_unlabeled = CIFAR10('../cifar10', train=True, download=True, transform=test_transform)
cifar10_test = CIFAR10('../cifar10', train=False, download=True, transform=test_transform)
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
class SubsetSequentialSampler(torch.utils.data.Sampler):
    """Samples the given indices sequentially, so the uncertainty scores computed
    later line up one-to-one with the order of the unlabeled subset."""

    def __init__(self, indices):
        self.indices = indices

    def __iter__(self):
        return (self.indices[i] for i in range(len(self.indices)))

    def __len__(self):
        return len(self.indices)
Visualising the dataset
# plot some sample images from the dataset
fig, axes = plt.subplots(nrows=3, ncols=5, figsize=(10, 6))
for i, ax in enumerate(axes.flat):
    img, label = cifar10_train[i]
    img = np.clip(img.permute(1, 2, 0), 0, 1)  # clip the input image to the valid range for imshow
    ax.imshow(img)
    ax.set_title(f"Label: {label}")
    ax.axis('off')

plt.tight_layout()
plt.show()
# cifar_10_labels = {0: 'airplane', 1: 'automobile', 2: 'bird', 3: 'cat', 4: 'deer',
# 5: 'dog', 6: 'frog', 7: 'horse', 8: 'ship', 9: 'truck'}
Training & Testing function
def train(model, criterion, optimizer, dataloaders, num_epochs, scheduler):
    model.train()
    train_accuracy = []
    for epoch in range(num_epochs):
        scheduler.step()

        train_total = 0
        train_correct = 0

        for data in tqdm(dataloaders['train'], leave=False, total=len(dataloaders['train'])):
            inputs = data[0].cuda()
            labels = data[1].cuda()

            optimizer.zero_grad()
            scores, features = model(inputs)
            target_loss = criterion(scores, labels)
            _, preds = torch.max(scores.data, 1)
            train_total += labels.size(0)
            train_correct += (preds == labels).sum().item()

            loss = torch.sum(target_loss) / target_loss.size(0)

            loss.backward()
            optimizer.step()

        train_acc = 100 * train_correct / train_total
        train_accuracy.append(train_acc)

    # Evaluating test accuracy
    total = 0
    correct = 0
    model.eval()
    for data in dataloaders['test']:
        inputs = data[0].cuda()
        labels = data[1].cuda()

        scores, features = model(inputs)
        _, preds = torch.max(scores.data, 1)
        total += labels.size(0)
        correct += (preds == labels).sum().item()

    test_acc = 100 * correct / total

    return train_accuracy, test_acc
Using Entropy as the acquisition function for active learning
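For a classifier with softmax probabilities \(p(y=c \mid x)\) over \(C\) classes, the acquisition score used here is the predictive entropy

\[H(y \mid x) = -\sum_{c=1}^{C} p(y=c \mid x)\, \log p(y=c \mid x)\]

which is largest when the predicted class distribution is close to uniform, i.e. when the model is most uncertain. The function below computes it directly from the logits.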
def entropy(model, unlabeled_loader):
    model.eval()
    entropies = []
    with torch.no_grad():
        for data in unlabeled_loader:
            inputs = data[0].cuda()
            scores, _ = model(inputs)
            entropy = -torch.sum(F.softmax(scores, dim=1) * F.log_softmax(scores, dim=1), dim=1)
            entropies.append(entropy.cpu().numpy())
    return np.concatenate(entropies)
## CONFIG
NUM_TRAIN = 50000   # total number of CIFAR-10 training images
BATCH = 128         # batch size
ADDENDUM = 1000     # number of points labeled per cycle
SUBSET = 10000      # size of the random unlabeled subset scored each cycle
MOMENTUM = 0.9
WDECAY = 5e-4
LR = 0.01
CYCLES = 5          # number of active learning cycles
MILESTONES = [160]
EPOCH = 40          # training epochs per cycle
Active Learning Loop
indices = list(range(NUM_TRAIN))
random.shuffle(indices)
labeled_set = indices[:ADDENDUM]
unlabeled_set = indices[ADDENDUM:]

test_accuracy_list = []
train_accuracy_list = []

train_loader = DataLoader(cifar10_train, batch_size=BATCH, sampler=SubsetRandomSampler(labeled_set), pin_memory=True)
test_loader = DataLoader(cifar10_test, batch_size=BATCH)
dataloaders = {'train': train_loader, 'test': test_loader}

# Model
resnet18 = ResNet18(num_classes=10).cuda()
torch.backends.cudnn.benchmark = False

# Active learning cycles
for cycle in range(CYCLES):
    criterion = nn.CrossEntropyLoss(reduction='none')
    optimizer = optim.SGD(resnet18.parameters(), lr=LR, momentum=MOMENTUM, weight_decay=WDECAY)
    scheduler = lr_scheduler.MultiStepLR(optimizer, milestones=MILESTONES)

    # Training and test
    train_acc, test_acc = train(resnet18, criterion, optimizer, dataloaders, EPOCH, scheduler)

    test_accuracy_list.append(test_acc)
    train_accuracy_list.append(train_acc)
    print('Cycle {}/{} || Label set size {}: Test Accuracy {}'.format(cycle+1, CYCLES, len(labeled_set), test_acc))

    # Randomly sample SUBSET points from the unlabeled pool
    random.shuffle(unlabeled_set)
    subset = unlabeled_set[:SUBSET]

    # Create unlabeled dataloader for the unlabeled subset
    unlabeled_loader = DataLoader(cifar10_unlabeled, batch_size=BATCH, sampler=SubsetSequentialSampler(subset), pin_memory=True)

    # Measure uncertainty of each data point in the subset
    uncertainty = entropy(resnet18, unlabeled_loader)

    # Index in ascending order
    arg = np.argsort(uncertainty)

    # Update the labeled dataset and the unlabeled dataset, respectively
    labeled_set += list(torch.tensor(subset)[arg][-ADDENDUM:].numpy())
    unlabeled_set = list(torch.tensor(subset)[arg][:-ADDENDUM].numpy()) + unlabeled_set[SUBSET:]

    # Create a new dataloader for the updated labeled dataset
    dataloaders['train'] = DataLoader(cifar10_train, batch_size=BATCH, sampler=SubsetRandomSampler(labeled_set),
                                      pin_memory=True)
/home/project_3/anaconda3/envs/gan/lib/python3.10/site-packages/torch/optim/lr_scheduler.py:139: UserWarning: Detected call of `lr_scheduler.step()` before `optimizer.step()`. In PyTorch 1.1.0 and later, you should call them in the opposite order: `optimizer.step()` before `lr_scheduler.step()`. Failure to do this will result in PyTorch skipping the first value of the learning rate schedule. See more details at https://pytorch.org/docs/stable/optim.html#how-to-adjust-learning-rate
warnings.warn("Detected call of `lr_scheduler.step()` before `optimizer.step()`. "
Cycle 1/5 || Label set size 1000: Test Accuracy 39.01
Cycle 2/5 || Label set size 2000: Test Accuracy 51.99
Cycle 3/5 || Label set size 3000: Test Accuracy 57.39
Cycle 4/5 || Label set size 4000: Test Accuracy 62.73
Cycle 5/5 || Label set size 5000: Test Accuracy 65.35
active_learning_loop = [1, 2, 3, 4, 5]  # Active learning loop numbers

plt.plot(active_learning_loop, test_accuracy_list, marker='o', linestyle='-')
plt.xlabel('Active Learning Loop')
plt.ylabel('Test Accuracy')
plt.title('Test Accuracy Over Active Learning Loop')
plt.show()
Learning Loss for Active Learning
- In this approach, a loss prediction module is attached to the target network and trained to predict the target network's loss.
- The module is then used to suggest the data points on which the target model is likely to produce a wrong prediction.
Loss for the loss prediction network
\[L_{\text{loss}}\left(\hat{l}^{p}, l^{p}\right) = \max\left(0, -\mathbb{1}\left(l_i, l_j\right) \cdot \left(\hat{l}_i - \hat{l}_j\right) + \xi\right) \quad \text{s.t.} \quad \mathbb{1}\left(l_i, l_j\right) = \begin{cases} +1, & \text{if } l_i > l_j \\ -1, & \text{otherwise} \end{cases}\]

We learn the loss prediction module from the difference between a pair of loss predictions, which makes the module discard the overall scale of the loss: only the relative ordering of the losses within a pair matters.
def LossPredLoss(input, target, margin=1.0):
    assert len(input) % 2 == 0, 'the batch size is not even.'
    assert input.shape == input.flip(0).shape

    input = (input - input.flip(0))[:len(input)//2]  # [l_1 - l_2B, l_2 - l_2B-1, ..., l_B - l_B+1], where batch_size = 2B
    target = (target - target.flip(0))[:len(target)//2]
    target = target.detach()

    one = 2 * torch.sign(torch.clamp(target, min=0)) - 1  # the indicator function defined by the authors
    loss = torch.sum(torch.clamp(margin - one * input, min=0))
    loss = loss / input.size(0)  # Note that the size of input is already halved

    return loss
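To see what the pairing does, here is a small sanity check with made-up numbers (not part of the original notebook). Predictions and targets are paired by flipping the batch, and a pair only contributes to the loss when its predicted ordering is wrong or the margin is not met:

# Illustrative values only: hypothetical predicted / true losses for a batch of 4.
pred = torch.tensor([2.0, 0.5, 1.0, 0.1])   # predicted losses
true = torch.tensor([1.2, 0.3, 0.8, 0.05])  # true losses (would come from the criterion)

# Pairs are (0, 3) and (1, 2). Pair (0, 3) is ordered correctly by more than the margin,
# so it contributes 0; pair (1, 2) is ordered correctly but only by 0.5 < margin,
# so it contributes 1.0 - 0.5 = 0.5. Averaged over the 2 pairs: 0.25.
print(LossPredLoss(pred, true, margin=1.0))  # tensor(0.2500)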
LossNet
LossNet takes the outputs of the intermediate layers of the target network as input and predicts the target loss.
class LossNet(nn.Module):
    def __init__(self, feature_sizes=[32, 16, 8, 4], num_channels=[64, 128, 256, 512], interm_dim=128):
        super(LossNet, self).__init__()

        self.GAP1 = nn.AvgPool2d(feature_sizes[0])
        self.GAP2 = nn.AvgPool2d(feature_sizes[1])
        self.GAP3 = nn.AvgPool2d(feature_sizes[2])
        self.GAP4 = nn.AvgPool2d(feature_sizes[3])

        self.FC1 = nn.Linear(num_channels[0], interm_dim)
        self.FC2 = nn.Linear(num_channels[1], interm_dim)
        self.FC3 = nn.Linear(num_channels[2], interm_dim)
        self.FC4 = nn.Linear(num_channels[3], interm_dim)

        self.linear = nn.Linear(4 * interm_dim, 1)

    def forward(self, features):
        out1 = self.GAP1(features[0])
        out1 = out1.view(out1.size(0), -1)
        out1 = F.relu(self.FC1(out1))

        out2 = self.GAP2(features[1])
        out2 = out2.view(out2.size(0), -1)
        out2 = F.relu(self.FC2(out2))

        out3 = self.GAP3(features[2])
        out3 = out3.view(out3.size(0), -1)
        out3 = F.relu(self.FC3(out3))

        out4 = self.GAP4(features[3])
        out4 = out4.view(out4.size(0), -1)
        out4 = F.relu(self.FC4(out4))

        out = self.linear(torch.cat((out1, out2, out3, out4), 1))
        return out
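As a quick sanity check (a sketch with dummy inputs, not part of the training code), the ResNet-18 defined earlier produces four intermediate feature maps whose spatial sizes and channel counts match LossNet's defaults, and LossNet maps them to one scalar loss prediction per image:

# Shape check on a fake CIFAR-10 batch (illustrative; runs on CPU).
backbone = ResNet18(num_classes=10)
loss_net = LossNet()
x = torch.randn(2, 3, 32, 32)            # two dummy 32x32 RGB images
scores, feats = backbone(x)
print([tuple(f.shape) for f in feats])   # (2, 64, 32, 32), (2, 128, 16, 16), (2, 256, 8, 8), (2, 512, 4, 4)
print(loss_net(feats).shape)             # torch.Size([2, 1])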
# Function for evaluating the test accuracy
def test(models, dataloaders, mode='val'):
    assert mode == 'val' or mode == 'test'
    models['backbone'].eval()
    models['module'].eval()

    total = 0
    correct = 0
    with torch.no_grad():
        for (inputs, labels) in dataloaders[mode]:
            inputs = inputs.cuda()
            labels = labels.cuda()

            scores, _ = models['backbone'](inputs)
            _, preds = torch.max(scores.data, 1)
            total += labels.size(0)
            correct += (preds == labels).sum().item()

    return 100 * correct / total
# Training function for both the target model and the loss prediction module
def train(models, criterion, optimizers, schedulers, dataloaders, num_epochs, epoch_loss):
    for epoch in range(num_epochs):
        schedulers['backbone'].step()
        schedulers['module'].step()

        models['backbone'].train()
        models['module'].train()

        for data in tqdm(dataloaders['train'], leave=False, total=len(dataloaders['train'])):
            inputs = data[0].cuda()
            labels = data[1].cuda()

            optimizers['backbone'].zero_grad()
            optimizers['module'].zero_grad()

            scores, features = models['backbone'](inputs)
            target_loss = criterion(scores, labels)

            # After epoch_loss (120) epochs, stop the gradient from the loss prediction module from propagating to the target model.
            if epoch > epoch_loss:
                features[0] = features[0].detach()
                features[1] = features[1].detach()
                features[2] = features[2].detach()
                features[3] = features[3].detach()
            pred_loss = models['module'](features)
            pred_loss = pred_loss.view(pred_loss.size(0))

            # Combined loss function
            m_backbone_loss = torch.sum(target_loss) / target_loss.size(0)
            m_module_loss = LossPredLoss(pred_loss, target_loss, margin=MARGIN)
            loss = m_backbone_loss + WEIGHT * m_module_loss

            loss.backward()
            optimizers['backbone'].step()
            optimizers['module'].step()
# Selecting the acquisition points based on the predictions of the loss prediction module
def get_uncertainty(models, unlabeled_loader):
    models['backbone'].eval()
    models['module'].eval()
    uncertainty = torch.tensor([]).cuda()

    with torch.no_grad():
        for (inputs, labels) in unlabeled_loader:
            inputs = inputs.cuda()

            scores, features = models['backbone'](inputs)
            pred_loss = models['module'](features)
            pred_loss = pred_loss.view(pred_loss.size(0))

            uncertainty = torch.cat((uncertainty, pred_loss), 0)

    return uncertainty.cpu()
EPOCHL = 120  # After EPOCHL epochs, stop the gradient from the loss prediction module from propagating to the target model.
MARGIN = 1.0
WEIGHT = 1.0
indices = list(range(NUM_TRAIN))
random.shuffle(indices)
labeled_set = indices[:ADDENDUM]
unlabeled_set = indices[ADDENDUM:]

test_accuracy_lal = []

train_loader = DataLoader(cifar10_train, batch_size=BATCH, sampler=SubsetRandomSampler(labeled_set), pin_memory=True)
test_loader = DataLoader(cifar10_test, batch_size=BATCH)
dataloaders = {'train': train_loader, 'test': test_loader}

# Model
resnet18 = ResNet18(num_classes=10).cuda()
loss_module = LossNet().cuda()
models = {'backbone': resnet18, 'module': loss_module}
torch.backends.cudnn.benchmark = False

# Active learning cycles
for cycle in range(CYCLES):
    # Loss, criterion and scheduler (re)initialization
    criterion = nn.CrossEntropyLoss(reduction='none')
    optim_backbone = optim.SGD(models['backbone'].parameters(), lr=LR,
                               momentum=MOMENTUM, weight_decay=WDECAY)
    optim_module = optim.SGD(models['module'].parameters(), lr=LR,
                             momentum=MOMENTUM, weight_decay=WDECAY)
    sched_backbone = lr_scheduler.MultiStepLR(optim_backbone, milestones=MILESTONES)
    sched_module = lr_scheduler.MultiStepLR(optim_module, milestones=MILESTONES)

    optimizers = {'backbone': optim_backbone, 'module': optim_module}
    schedulers = {'backbone': sched_backbone, 'module': sched_module}

    # Training and test
    train(models, criterion, optimizers, schedulers, dataloaders, EPOCH, EPOCHL)
    acc = test(models, dataloaders, mode='test')

    test_accuracy_lal.append(acc)
    print('Cycle {}/{} || Label set size {}: Test acc {}'.format(cycle+1, CYCLES, len(labeled_set), acc))

    # Randomly sample 10000 unlabeled data points
    random.shuffle(unlabeled_set)
    subset = unlabeled_set[:SUBSET]

    # Create unlabeled dataloader for the unlabeled subset
    unlabeled_loader = DataLoader(cifar10_unlabeled, batch_size=BATCH, sampler=SubsetSequentialSampler(subset), pin_memory=True)

    # Measure uncertainty of each data point in the subset
    uncertainty = get_uncertainty(models, unlabeled_loader)

    # Index in ascending order
    arg = np.argsort(uncertainty)

    # Update the labeled dataset and the unlabeled dataset, respectively
    labeled_set += list(torch.tensor(subset)[arg][-ADDENDUM:].numpy())
    unlabeled_set = list(torch.tensor(subset)[arg][:-ADDENDUM].numpy()) + unlabeled_set[SUBSET:]

    # Create a new dataloader for the updated labeled dataset
    dataloaders['train'] = DataLoader(cifar10_train, batch_size=BATCH, sampler=SubsetRandomSampler(labeled_set), pin_memory=True)
Cycle 1/5 || Label set size 1000: Test acc 43.6
Cycle 2/5 || Label set size 2000: Test acc 50.23
Cycle 3/5 || Label set size 3000: Test acc 57.39
Cycle 4/5 || Label set size 4000: Test acc 63.17
Cycle 5/5 || Label set size 5000: Test acc 69.3
active_learning_loop = [1, 2, 3, 4, 5]  # Active learning loop numbers

plt.plot(active_learning_loop, test_accuracy_lal, marker='o', linestyle='-')
plt.xlabel('Active Learning Loop')
plt.ylabel('Test Accuracy')
plt.title('Test Accuracy Over Active Learning Loop')
plt.show()
References:
- https://arxiv.org/pdf/1905.03677.pdf
- https://github.com/Mephisto405/Learning-Loss-for-Active-Learning