Implementation of CPU multicore parallel processing makes the code slower – PyTorch ANN

I am trying to speed up the training computations. Since the training uses an Evolution Strategies optimizer, it takes a long time for the ANN to learn.

I wanted to try implementing multicore parallel processing, but the code now takes roughly 10x longer than without it (although it does not raise any errors). I have read that this may be overhead from the extra processes necessary for parallel processing, but I do not know if that is the case. For now, this is the code that I have:

import time
import torch
import torchvision
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.multiprocessing as mp
import os

n_epochs = 15
batch_size_train = 256
batch_size_test = 1000
log_interval = 10
cores = os.cpu_count()

n_pop = 200
sigma = 0.1
alpha = 0.005
LAMBDA = 0.5

random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)


train_loader = torch.utils.data.DataLoader(
  torchvision.datasets.MNIST('mnist/', train=True, download=True,
                             transform=torchvision.transforms.Compose([
                               torchvision.transforms.ToTensor(),
                               torchvision.transforms.Normalize(
                                 (0.1307,), (0.3081,))
                             ])),
  batch_size=batch_size_train, shuffle=True)

test_loader = torch.utils.data.DataLoader(
  torchvision.datasets.MNIST('mnist/', train=False, download=True,
                             transform=torchvision.transforms.Compose([
                               torchvision.transforms.ToTensor(),
                               torchvision.transforms.Normalize(
                                 (0.1307,), (0.3081,))
                             ])),
  batch_size=batch_size_test, shuffle=True)



student_model = nn.Sequential(nn.Conv2d(1, 10, kernel_size=5),
                        nn.MaxPool2d(2),
                        nn.ReLU(),
                        nn.Conv2d(10, 20, kernel_size=5),
                        nn.MaxPool2d(2),
                        nn.ReLU(),
                        nn.Flatten(),
                        nn.Linear(320, 50),
                        nn.ReLU(),
                        nn.Linear(50, 10),
                        nn.LogSoftmax(dim=1)
                       )

model_parameters = list(student_model.parameters())

# initializing teacher model and loading the state_dict
teacher_model = nn.Sequential(nn.Conv2d(1, 10, kernel_size=5),
                        nn.MaxPool2d(2),
                        nn.ReLU(),
                        nn.Conv2d(10, 20, kernel_size=5),
                        nn.MaxPool2d(2),
                        nn.ReLU(),
                        nn.Flatten(),
                        nn.Linear(320, 100),
                        nn.ReLU(),
                        nn.Linear(100, 10),
                        nn.LogSoftmax(dim=1)
                       )

teacher_model.load_state_dict(torch.load("t-model.pth"))



# FROM OpenAI - Evolution Strategies
def compute_ranks(x):
    """
    Returns ranks in [0, len(x))
    Note: This is different from scipy.stats.rankdata, which returns ranks in [1, len(x)].
    """
    assert x.ndim == 1
    ranks = np.empty(len(x), dtype=int)
    ranks[x.argsort()] = np.arange(len(x))
    return ranks

def compute_centered_ranks(x):
    y = compute_ranks(x.ravel()).reshape(x.shape).astype(np.float32)
    y /= (x.size - 1)
    y -= .5
    return y
# END from OpenAI - Evolution Strategies
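# Quick sanity check of the rank transform above (my own note, not part of the
# OpenAI snippet): compute_centered_ranks(np.array([3.0, 1.0, 2.0])) first maps
# the values to their ranks [2, 0, 1], then divides by (size - 1) and recenters,
# giving [0.5, -0.5, 0.0]; i.e. raw fitness values become centered ranks in [-0.5, 0.5].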

def train(rank, student_model, teacher_model, train_loader, data_queue, n_epochs, batch_size):

  train_loader = torch.utils.data.DataLoader(train_loader.dataset, batch_size=batch_size, shuffle=True)
  
  for epoch in range(1, n_epochs + 1):
    #network.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        ###
        st = time.time()

        if batch_idx % os.cpu_count() != rank:
                continue
        
        data, target = data_queue.get()  # Get a batch from the queue
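        # (note: the (data, target) pair yielded by the DataLoader in the enumerate
        # above is overwritten here, so that loop is effectively only used for its
        # batch indices)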
        
        base_weights = nn.utils.parameters_to_vector(model_parameters).cpu().detach()
        
        # creating the original and negatively correlated samples for antithetic sampling
        perturbations_pos = np.random.randn(n_pop, base_weights.shape[0]).astype(np.float32)
        perturbations_neg = -perturbations_pos

        fitness = np.zeros(n_pop).astype(np.float32)
        _loss = np.zeros(n_pop)

        # TODO: best to implement this in parallel for speed
        # creating 2 loops, one for original and one for antithetic samples.
        for p in range(n_pop):
            w_try = base_weights + sigma * perturbations_pos[p, :]
            nn.utils.vector_to_parameters(w_try, model_parameters)

            # knowledge distillation
            with torch.no_grad():
              teacher_outputs = teacher_model(data)
            student_outputs = student_model(data)

            kd_loss = criterion_kd(F.log_softmax(student_outputs, dim=1), F.softmax(teacher_outputs, dim=1))
            response_loss = criterion_response(student_outputs, target)
            total_loss = kd_loss + LAMBDA * response_loss

            _loss[p] = total_loss
            fitness[p] = total_loss

        for p in range(n_pop):
            w_try = base_weights + sigma * perturbations_neg[p, :]
            nn.utils.vector_to_parameters(w_try, model_parameters)

            # knowledge distillation
            with torch.no_grad():
              teacher_outputs = teacher_model(data)
            student_outputs = student_model(data)

            kd_loss = criterion_kd(F.log_softmax(student_outputs, dim=1), F.softmax(teacher_outputs, dim=1))
            response_loss = criterion_response(student_outputs, target)
            total_loss = kd_loss + LAMBDA * response_loss

            _loss[p] = total_loss
            fitness[p] -= total_loss # Use antithetic sampling to adjust fitness

        scaled_fitness = (fitness - np.mean(fitness)) / (np.std(fitness)+1e-5)
        #scaled_fitness = compute_centered_ranks(fitness)

        new_weights = base_weights - alpha/(n_pop*sigma) * np.dot(perturbations_pos.T, scaled_fitness)
        nn.utils.vector_to_parameters(new_weights, model_parameters)

        ###
        en = time.time()

        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
              epoch, batch_idx * len(data), len(train_loader.dataset),
              100. * batch_idx / len(train_loader), np.mean(_loss)))
            print(en-st)

def test():
  student_model.eval()
  test_loss = 0
  correct = 0
  with torch.no_grad():
    for data, target in test_loader:
      output = student_model(data)
      test_loss += F.nll_loss(output, target, reduction='sum').item()
      pred = output.data.max(1, keepdim=True)[1]
      correct += pred.eq(target.data.view_as(pred)).sum()
  test_loss /= len(test_loader.dataset)
  print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
    test_loss, correct, len(test_loader.dataset),
    100. * correct / len(test_loader.dataset)))

# knowledge distillation loss and response generation loss
# criterion_kd = nn.MSELoss() 
criterion_kd = nn.KLDivLoss()
criterion_response = nn.CrossEntropyLoss()  

if __name__ == "__main__":
    # Share the model's memory to allow it to be accessed by multiple processes
    student_model.share_memory()
    teacher_model.share_memory()

    # Create a multiprocessing queue to share data among processes
    data_queue = mp.Queue()

    # Fill the queue with data batches
    for data, target in train_loader:
        data_queue.put((data, target))
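    # (each batch is enqueued exactly once here, so the queue holds a single epoch's
    # worth of batches, shared by all the worker processes)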
  
    # Create a list of processes and start each process with the train function
    processes = []
    for rank in range(cores):
        p = mp.Process(target=train,
            args=(rank, student_model, teacher_model, train_loader, data_queue, n_epochs, batch_size_train),
            name=f"Process-{rank}",
        )
        p.start()
        processes.append(p)
        print(f"Started {p.name}")
  
    # Wait for all processes to finish
    for p in processes:
        p.join()
        print(f"Finished {p.name}")
  
    # Print the model's prediction on the test input after training
    torch.save(student_model.state_dict(), 's-model.pth')

    test()



What seems to be the problem? I am new to PyTorch, so if you see any glaring malpractices, please let me know!
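In case it is relevant: one guess I had (purely an assumption on my part, I have not verified it) is that every worker process starts PyTorch's full intra-op thread pool, so launching one process per core oversubscribes the CPU. Below is the kind of standalone sketch I was thinking of using to test that; worker is just a placeholder name, not my actual training function:

import time
import torch
import torch.multiprocessing as mp

def worker(rank):
    # limit each process to a single intra-op thread so that several worker
    # processes do not compete for the same physical cores (my assumption, untested)
    torch.set_num_threads(1)
    start = time.time()
    # a CPU-heavy dummy workload to compare timings with and without the line above
    x = torch.randn(1000, 1000)
    for _ in range(50):
        x = x @ x.T / 1000.0
    print(f"worker {rank}: {torch.get_num_threads()} thread(s), {time.time() - start:.2f}s")

if __name__ == "__main__":
    processes = [mp.Process(target=worker, args=(r,)) for r in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

If removing the set_num_threads(1) line makes each worker noticeably slower, that would at least tell me the slowdown comes from thread oversubscription rather than from the queue/process setup itself.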
