How to get acceleration using multiprocessing/process_map etc. with heavy calculations in Python (e.g., NumPy)?

I’ve got a function written with NumPy where each call costs about 2 seconds, and I have to run it tens of thousands of times. Obviously I should use multi-threading or multi-processing, and the easiest way seemed to be process_map or thread_map from tqdm.contrib.concurrent. But I found that they do not work as expected.

Here is an example that I’ve tested:

import time
import numpy as np
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map, thread_map


def mydataset(size, length):
    for ii in range(length):
        yield np.random.rand(*size)


def calc(mat):
    # simulate some heavy calculation
    for ii in range(1000):
        avg = np.mean(mat)
        std = np.std(mat)
    return avg, std

def main():
    ds = list(mydataset((500,500), 100))

    t0 = time.time()
    res1 = []
    for mat in tqdm(ds):
        res1.append(calc(mat))
    print(f'for loop: {time.time() - t0}s')

    t0 = time.time()
    res2 = list(map(calc, tqdm(ds)))
    print(f'native map: {time.time() - t0}s')

    t0 = time.time()
    res3 = process_map(calc, ds)
    print(f'process map: {time.time() - t0}s')

    t0 = time.time()
    res4 = thread_map(calc, ds)
    print(f'thread map: {time.time() - t0}s')

    pass


if __name__ == '__main__':
    main()

And this is the output:

100%|████████████████████████████████████████████| 100/100 [00:51<00:00,  1.93it/s]
for loop: 51.884708642959595s
100%|████████████████████████████████████████████| 100/100 [00:52<00:00,  1.91it/s]
native map: 52.48910164833069s
100%|████████████████████████████████████████████| 100/100 [01:10<00:00,  1.41it/s]
process map: 71.0565574169159s
100%|████████████████████████████████████████████| 100/100 [00:41<00:00,  2.39it/s]
thread map: 42.04276633262634s

process_map is much slower than the for-loop, and thread_map also takes too long, considering my PC has 28 cores. And indeed, CPU usage is not as high as expected: only 1 to 2 cores are at 100%.

And it seems that when my calc does something simpler, process_map DOES give a speedup.

My test environment is openSUSE Tumbleweed with Linux 6.63, python 3.10.10 | packaged by conda-forge | (main, Mar 24 2023, 20:08:06) [GCC 11.3.0], tqdm 4.66.1.

I also tried the same code under Windows 11 x64 with Python 3.8.10 (tags/v3.8.10:3d8993a, May 3 2021, 11:48:03) [MSC v.1928 64 bit (AMD64)], tqdm 4.62.3. The results were similar: CPU usage reached 100% (all cores) in Task Manager, but process_map (64.32s) and thread_map (66.13s) were only slightly faster than the for-loop (78.41s) and map (79.79s).

So my question is: what is the correct way to use multi-processing/multi-threading to accelerate heavy calculations? How can I speed up those calculations without manually splitting the dataset and running several for-loops concurrently?

  • I’m able to get a significant speedup from process_map on your example by using a smaller dtype: put ds = [mat.astype('float32') for mat in ds] after the line which creates the dataset. That suggests that this benchmark is mostly held back by inter-process communication costs. I don’t know if that solution is applicable to your other problem. (A sketch of this follows right after this comment.)

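For reference, here is a minimal sketch of that comment’s suggestion applied to the benchmark above; the max_workers and chunksize values are illustrative assumptions, not part of the original suggestion:

import numpy as np
from tqdm.contrib.concurrent import process_map


def calc(mat):
    # same simulated heavy calculation as in the question
    for ii in range(1000):
        avg = np.mean(mat)
        std = np.std(mat)
    return avg, std


if __name__ == '__main__':
    ds = [np.random.rand(500, 500) for ii in range(100)]
    ds = [mat.astype('float32') for mat in ds]  # half the bytes to pickle per task
    res = process_map(calc, ds, max_workers=8, chunksize=4)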

I’ve tested your code and got a time of 64 seconds running it as it is (4 cores). Afterwards I increased the core count to 6 and then 8, but only got a very small gain. So it is not limited by the calculation itself, but by the copying of the matrices.

So I changed your code to use a Manager to hold your data. That copies the matrices only once, at the beginning of the calculation. I ran the loop and obtained a time of 2 seconds, which means that avoiding copying the data makes it much faster. The only things that are copied for each task are the reference to the manager and some integer indices.

Here is my code:

import time
import numpy as np
from multiprocessing import Pool, Manager


def mydataset(size, length):
    for ii in range(length):
        yield np.random.rand(*size)


def calc(idx, mat_list):
    # simulate some heavy calculation
    for ii in range(1000):
        avg = np.mean(mat_list[idx])
        std = np.std(mat_list[idx])
    return avg, std


def main():
    ds = list(mydataset((500, 500), 100))

    mypool = Pool(4)
    manager = Manager()
    mylist = manager.list(ds)

    t0 = time.time()
    res1 = mypool.starmap(calc, zip(range(len(ds)), mylist))
    print(f"map manager: {time.time() - t0}s")

if __name__ == "__main__":
    main()

Output:

map manager: 1.935054063796997s

So, try to avoid extensive copy operations during multiprocessing, as they usually bottleneck the whole operation. If you have lots of data, use a sharing concept where the data is copied only once and later read from that source.
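As a complementary illustration (my own sketch, not the code from the answer above; the names init_worker, calc_idx and _ds are made up for this example), the same share-once idea can be implemented with a Pool initializer: the dataset is handed to each worker exactly once, and only integer indices travel per task:

import time
import numpy as np
from multiprocessing import Pool

_ds = None  # per-worker global, filled once by the initializer


def init_worker(ds):
    # runs once in every worker process; the dataset is transferred a single time
    global _ds
    _ds = ds


def calc_idx(idx):
    mat = _ds[idx]  # plain local lookup, no inter-process traffic per call
    for ii in range(1000):
        avg = np.mean(mat)
        std = np.std(mat)
    return avg, std


def main():
    ds = [np.random.rand(500, 500) for ii in range(100)]

    t0 = time.time()
    with Pool(4, initializer=init_worker, initargs=(ds,)) as pool:
        res = pool.map(calc_idx, range(len(ds)))
    print(f"pool with initializer: {time.time() - t0}s")


if __name__ == "__main__":
    main()

On Linux the default fork start method lets the children inherit the dataset almost for free; with spawn (e.g. on Windows) it is pickled once per worker, which is still far cheaper than once per task.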
