I am trying to speed up my training computations. Since the network is trained with an Evolution Strategies optimizer instead of backpropagation, it takes quite long for the ANN to learn.
I wanted to implement multicore parallel processing, but the code now takes roughly 10x longer than the version without parallel processing (although it does not raise any errors). I have read that this may be overhead from the extra processes needed for parallel processing, but I do not know whether that is the case here; a minimal timing sketch I would use to check this is included after the full listing. For now, this is the code that I have:
import time
import torch
import torchvision
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.multiprocessing as mp
import os
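# note: torch.multiprocessing is a drop-in wrapper around Python's multiprocessing
# module that moves tensors passed between processes into shared memory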
n_epochs = 15
batch_size_train = 256
batch_size_test = 1000
log_interval = 10
cores = os.cpu_count()
n_pop = 200
sigma = 0.1
alpha = 0.005
LAMBDA = 0.5
random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)
train_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST('mnist/', train=True, download=True,
                               transform=torchvision.transforms.Compose([
                                   torchvision.transforms.ToTensor(),
                                   torchvision.transforms.Normalize(
                                       (0.1307,), (0.3081,))
                               ])),
    batch_size=batch_size_train, shuffle=True)
test_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST('mnist/', train=False, download=True,
                               transform=torchvision.transforms.Compose([
                                   torchvision.transforms.ToTensor(),
                                   torchvision.transforms.Normalize(
                                       (0.1307,), (0.3081,))
                               ])),
    batch_size=batch_size_test, shuffle=True)
# initializing student model: 28x28 MNIST input -> 20 channels of 4x4 after the conv/pool stages = 320 flattened features
student_model = nn.Sequential(nn.Conv2d(1, 10, kernel_size=5),
                              nn.MaxPool2d(2),
                              nn.ReLU(),
                              nn.Conv2d(10, 20, kernel_size=5),
                              nn.MaxPool2d(2),
                              nn.ReLU(),
                              nn.Flatten(),
                              nn.Linear(320, 50),
                              nn.ReLU(),
                              nn.Linear(50, 10),
                              nn.LogSoftmax(dim=1)
                              )
model_parameters = list(student_model.parameters())
# initializing teacher model and loading the state_dict
teacher_model = nn.Sequential(nn.Conv2d(1, 10, kernel_size=5),
                              nn.MaxPool2d(2),
                              nn.ReLU(),
                              nn.Conv2d(10, 20, kernel_size=5),
                              nn.MaxPool2d(2),
                              nn.ReLU(),
                              nn.Flatten(),
                              nn.Linear(320, 100),
                              nn.ReLU(),
                              nn.Linear(100, 10),
                              nn.LogSoftmax(dim=1)
                              )
teacher_model.load_state_dict(torch.load("t-model.pth"))
# FROM OpenAI - Evolution Strategies
def compute_ranks(x):
    """
    Returns ranks in [0, len(x)).
    Note: This is different from scipy.stats.rankdata, which returns ranks in [1, len(x)].
    """
    assert x.ndim == 1
    ranks = np.empty(len(x), dtype=int)
    ranks[x.argsort()] = np.arange(len(x))
    return ranks

def compute_centered_ranks(x):
    y = compute_ranks(x.ravel()).reshape(x.shape).astype(np.float32)
    y /= (x.size - 1)
    y -= .5
    return y
# END from OpenAI - Evolution Strategies
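# example (illustrative): for x = np.array([3.0, 1.0, 2.0]),
# compute_ranks(x) -> [2, 0, 1] and compute_centered_ranks(x) -> [0.5, -0.5, 0.0]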
def train(rank, student_model, teacher_model, train_loader, data_queue, n_epochs, batch_size):
    # each worker rebuilds its own DataLoader over the same dataset
    train_loader = torch.utils.data.DataLoader(train_loader.dataset, batch_size=batch_size, shuffle=True)
    for epoch in range(1, n_epochs + 1):
        # network.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            ###
            st = time.time()
            if batch_idx % os.cpu_count() != rank:
                continue
            data, target = data_queue.get()  # get a batch from the shared queue
            # model_parameters is the module-level list tied to student_model's parameters
            base_weights = nn.utils.parameters_to_vector(model_parameters).cpu().detach()
            # creating the original and negatively correlated samples for antithetic sampling
            perturbations_pos = np.random.randn(n_pop, base_weights.shape[0]).astype(np.float32)
            perturbations_neg = -perturbations_pos
            fitness = np.zeros(n_pop).astype(np.float32)
            _loss = np.zeros(n_pop)
            # TODO: best to implement this in parallel for speed
            # two loops: one for the original samples, one for the antithetic samples
            for p in range(n_pop):
                w_try = base_weights + sigma * perturbations_pos[p, :]
                nn.utils.vector_to_parameters(w_try, model_parameters)
                # knowledge distillation
                with torch.no_grad():
                    teacher_outputs = teacher_model(data)
                    student_outputs = student_model(data)
                    kd_loss = criterion_kd(F.log_softmax(student_outputs, dim=1), F.softmax(teacher_outputs, dim=1))
                    response_loss = criterion_response(student_outputs, target)
                    total_loss = kd_loss + LAMBDA * response_loss
                _loss[p] = total_loss
                fitness[p] = total_loss
            for p in range(n_pop):
                w_try = base_weights + sigma * perturbations_neg[p, :]
                nn.utils.vector_to_parameters(w_try, model_parameters)
                # knowledge distillation
                with torch.no_grad():
                    teacher_outputs = teacher_model(data)
                    student_outputs = student_model(data)
                    kd_loss = criterion_kd(F.log_softmax(student_outputs, dim=1), F.softmax(teacher_outputs, dim=1))
                    response_loss = criterion_response(student_outputs, target)
                    total_loss = kd_loss + LAMBDA * response_loss
                _loss[p] = total_loss
                fitness[p] -= total_loss  # antithetic sampling: fitness = loss(+eps) - loss(-eps)
            # ES update: step along the fitness-weighted sum of the perturbations
            scaled_fitness = (fitness - np.mean(fitness)) / (np.std(fitness) + 1e-5)
            # scaled_fitness = compute_centered_ranks(fitness)
            new_weights = base_weights - alpha / (n_pop * sigma) * np.dot(perturbations_pos.T, scaled_fitness)
            nn.utils.vector_to_parameters(new_weights, model_parameters)
            ###
            en = time.time()
            if batch_idx % log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), np.mean(_loss)))
                print(en - st)
def test():
    student_model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = student_model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()
    test_loss /= len(test_loader.dataset)
    print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
# knowledge distillation loss and response generation loss
# criterion_kd = nn.MSELoss()
criterion_kd = nn.KLDivLoss()
criterion_response = nn.CrossEntropyLoss()
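# note: nn.KLDivLoss expects log-probabilities as input and probabilities as target
# (matching the F.log_softmax / F.softmax calls in train()); its default reduction is the
# per-element 'mean', whereas reduction='batchmean' corresponds to the usual KL definition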
if __name__ == "__main__":
    # share the models' memory so their tensors can be accessed by multiple processes
    student_model.share_memory()
    teacher_model.share_memory()
    # create a multiprocessing queue to share data among processes
    data_queue = mp.Queue()
    # fill the queue with data batches
    for data, target in train_loader:
        data_queue.put((data, target))
    # create a list of processes and start each process with the train function
    processes = []
    for rank in range(cores):
        p = mp.Process(target=train,
                       args=(rank, student_model, teacher_model, train_loader, data_queue, n_epochs, batch_size_train),
                       name=f"Process-{rank}",
                       )
        p.start()
        processes.append(p)
        print(f"Started {p.name}")
    # wait for all processes to finish
    for p in processes:
        p.join()
        print(f"Finished {p.name}")
    # save the trained student and evaluate it on the test set
    torch.save(student_model.state_dict(), 's-model.pth')
    test()
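To sanity-check the overhead hypothesis mentioned above, this is the kind of minimal, self-contained timing script I had in mind (purely illustrative and separate from the training code; do_work is just a made-up stand-in for one worker's share of the per-batch ES evaluations):

import os
import time
import torch
import torch.multiprocessing as mp

def do_work(n_iters=200):
    # stand-in for one worker's share of the per-batch ES work
    torch.set_num_threads(1)  # keep each unit of work single-threaded for a fair comparison
    x = torch.randn(256, 256)
    s = 0.0
    for _ in range(n_iters):
        s += (x @ x).sum().item()
    return s

if __name__ == "__main__":
    n_procs = os.cpu_count()

    # serial baseline: the same total amount of work, all in the main process
    t0 = time.time()
    for _ in range(n_procs):
        do_work()
    serial = time.time() - t0

    # parallel version: one process per core, each doing one unit of work;
    # the gap between this and (serial / n_procs) is roughly process startup/teardown overhead
    t0 = time.time()
    procs = [mp.Process(target=do_work) for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    parallel = time.time() - t0

    print(f"serial:   {serial:.3f} s")
    print(f"parallel: {parallel:.3f} s")

If the parallel time were dominated by starting and joining the processes rather than by do_work itself, that would point at process overhead rather than the ES loop.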
What seems to be the problem? I am new to PyTorch, so if you see any glaring malpractices, please let me know!