I’ve got a function written in NumPy where each call takes about 2 seconds, but I have to run it tens of thousands of times. It’s obvious that I should use multi-threading or multi-processing, and the easiest way seemed to be process_map or thread_map from tqdm.contrib.concurrent. But I found that they don’t work as expected.
Here is an example that I’ve tested:
import time
import numpy as np
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map, thread_map

def mydataset(size, length):
    for ii in range(length):
        yield np.random.rand(*size)

def calc(mat):
    # simulate some heavy calculation
    for ii in range(1000):
        avg = np.mean(mat)
        std = np.std(mat)
    return avg, std

def main():
    ds = list(mydataset((500, 500), 100))

    t0 = time.time()
    res1 = []
    for mat in tqdm(ds):
        res1.append(calc(mat))
    print(f'for loop: {time.time() - t0}s')

    t0 = time.time()
    res2 = list(map(calc, tqdm(ds)))
    print(f'native map: {time.time() - t0}s')

    t0 = time.time()
    res3 = process_map(calc, ds)
    print(f'process map: {time.time() - t0}s')

    t0 = time.time()
    res4 = thread_map(calc, ds)
    print(f'thread map: {time.time() - t0}s')

if __name__ == '__main__':
    main()
And the output was:
100%|████████████████████████████████████████████| 100/100 [00:51<00:00, 1.93it/s]
for loop: 51.884708642959595s
100%|████████████████████████████████████████████| 100/100 [00:52<00:00, 1.91it/s]
native map: 52.48910164833069s
100%|████████████████████████████████████████████| 100/100 [01:10<00:00, 1.41it/s]
process map: 71.0565574169159s
100%|████████████████████████████████████████████| 100/100 [00:41<00:00, 2.39it/s]
thread map: 42.04276633262634s
process_map is much slower than the for loop, and thread_map also takes too much time, considering my PC has 28 cores. And yes, CPU usage is not as high as expected: only 1 to 2 cores are at 100%.
And it seems that when my calc does something simpler, process_map DOES give a speedup.
My test environment is openSUSE Tumbleweed with Linux 6.63, Python 3.10.10 | packaged by conda-forge | (main, Mar 24 2023, 20:08:06) [GCC 11.3.0], and tqdm 4.66.1.
I also tried the previous code under Windows 11 x64 with Python 3.8.10 (tags/v3.8.10:3d8993a, May 3 2021, 11:48:03) [MSC v.1928 64 bit (AMD64)] and tqdm 4.62.3. The results are similar: CPU usage reached 100% (all cores) in Task Manager, but process_map (64.32s) and thread_map (66.13s) are only slightly faster than the for loop (78.41s) and map (79.79s).
So my question is: what is the correct way to do multi-processing/multi-threading acceleration when there are heavy calculations? How can I speed up those calculations without manually splitting the dataset and running several for loops concurrently?
I’ve tested your code and got a time of 64 seconds when using it as is (4 cores). Afterwards I increased the core count to 6 and then 8, but only got a very small gain. So it’s not limited by the calculation itself, but by the copying of the matrices.
So I changed your code to use a manager to hold your data. That copies the matrices only once, at the beginning of the calculation. I ran the loop and got a time of 2 seconds, which means avoiding the copies makes it much faster. The only things that are copied are the reference to the manager and some integers for the tasks.
Here is my code:
import time
import numpy as np
from multiprocessing import Pool, Manager

def mydataset(size, length):
    for ii in range(length):
        yield np.random.rand(*size)

def calc(idx, mat_list):
    # simulate some heavy calculation
    for ii in range(1000):
        avg = np.mean(mat_list[idx])
        std = np.std(mat_list[idx])
    return avg, std

def main():
    ds = list(mydataset((500, 500), 100))
    mypool = Pool(4)
    manager = Manager()
    mylist = manager.list(ds)

    t0 = time.time()
    res1 = mypool.starmap(calc, zip(range(len(ds)), mylist))
    print(f"map manager: {time.time() - t0}s")

if __name__ == "__main__":
    main()
Output:
map manager: 1.935054063796997s
So, try to avoid extensive copy operations during your multiprocessing, as they usually bottleneck the whole operation. If you have lots of data, use a sharing concept where the data is copied only once and later read from that source.
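To make that sharing concept concrete without going through a Manager, here is a minimal sketch using the standard library’s multiprocessing.shared_memory (Python 3.8+). This is not the code from the answer above: the whole dataset is written into one shared block up front, each worker attaches to it in a pool initializer, and only an integer index is sent per task. Names like init_worker, SHAPE, and the module-level _data are illustrative.
import numpy as np
from multiprocessing import Pool, shared_memory

SHAPE = (100, 500, 500)   # 100 matrices of 500x500, as in the question
DTYPE = np.float64

def init_worker(shm_name):
    # Attach to the shared block once per worker and build a NumPy view on it.
    global _shm, _data
    _shm = shared_memory.SharedMemory(name=shm_name)
    _data = np.ndarray(SHAPE, dtype=DTYPE, buffer=_shm.buf)

def calc(idx):
    # Only the integer index crosses the process boundary; the matrix itself
    # is read from shared memory.
    mat = _data[idx]
    for _ in range(1000):
        avg = np.mean(mat)
        std = np.std(mat)
    return avg, std

def main():
    ds = np.random.rand(*SHAPE)
    shm = shared_memory.SharedMemory(create=True, size=ds.nbytes)
    np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)[:] = ds   # copy the data in once
    try:
        with Pool(4, initializer=init_worker, initargs=(shm.name,)) as pool:
            res = pool.map(calc, range(SHAPE[0]))
        print(res[0])
    finally:
        shm.close()
        shm.unlink()

if __name__ == "__main__":
    main()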
I’m able to get a significant speedup in process_map on your example by using a smaller dtype, by putting
ds = [mat.astype('float32') for mat in ds]
after the line which creates the dataset. That suggests that this benchmark is mostly held back by inter-process communication costs. I don’t know if that solution is applicable to your other problem.
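For what it’s worth, the dtype change can be combined with process_map’s max_workers and chunksize keyword arguments (recent tqdm versions forward both to the underlying executor’s map), which batches several items per pickling round trip. Batching only amortizes per-task overhead, it does not shrink the total amount of data sent. Below is a minimal sketch of the poster’s main() with both tweaks applied; the values 8 and 4 are illustrative, not tuned, and mydataset and calc are the functions from the question.
import time
from tqdm.contrib.concurrent import process_map

def main():
    ds = list(mydataset((500, 500), 100))
    ds = [mat.astype('float32') for mat in ds]   # halves the bytes pickled per matrix

    t0 = time.time()
    res = process_map(calc, ds, max_workers=8, chunksize=4)
    print(f'process map (float32, chunked): {time.time() - t0}s')

if __name__ == '__main__':
    main()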