Multiprocessing does not seem to work on df.apply()

I have a pandas DataFrame to which I want to apply row-wise operations for several columns. The function that returns the pandas Series I need looks like this:

import pandas as pd

def get_info_previous_flight(row, original_column_name, new_column_name):
    # The first flight of the day has no previous flight
    if row['n_flight_of_day'] == 1:
        return 0
    else:
        flight_date = row['dep_sched_date']
        ac_registration = row['ac_registration']
        previous_flight_number = row['n_flight_of_day'] - 1
        # Look up the previous flight of the same aircraft on the same day
        # in the global flight_info_df (a full scan of the dataframe per row)
        value_previous_flight = flight_info_df.loc[
            (flight_info_df['ac_registration'] == ac_registration) &
            (flight_info_df['dep_sched_date'] == flight_date) &
            (flight_info_df['n_flight_of_day'] == previous_flight_number)
        ][['leg_no', original_column_name]]
        output = pd.Series({'leg_no': value_previous_flight['leg_no'],
                            new_column_name: value_previous_flight[original_column_name]})
        return output

The function that I use to perform this task for multiple columns is as follows:

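def perform_task(parameters):
    # parameters is a (new_column_name, original_column_name, dataframe) tuple,
    # matching the (key, value, flight_info_df) tuples built below
    new_column_name, original_column_name, df = parameters
    return df.apply(
        lambda row: get_info_previous_flight(row, original_column_name, new_column_name),
        axis=1,
    )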

If I create a pool like this, it does not seem to work:

import multiprocessing as mp

# new column name -> original column name
previous_flight_info = {'change_reason_code_previous_flight': 'change_reason_code',
                        'dep_delay_previous_flight': 'dep_delay',
                        'act_trans_time_previous_flight': 'trans_time',
                        'sched_trans_time_previous_flight': 'sched_trans_time',
                        'act_groundtime_previous_flight': 'Act Groundtime',
                        'sched_groundtime_previous_flight': 'Sched Groundtime'}

# One task per (new column, original column) pair
tasks = [(key, value, flight_info_df) for key, value in previous_flight_info.items()]

pool = mp.Pool()
results = pool.map(perform_task, tasks)
pool.close()
pool.join()

print(results)

The thing is, if I run this .apply() sequentially for each key/value pair in previous_flight_info, it takes about 3 minutes. With the pool version above, I don’t see any increase in CPU usage, and my kernel eventually times out.

  • Yep, multiprocessing won’t work because each process will have its own copy of the dataframe, and changes won’t propagate into the parent process.

  • If I understand your intent correctly, you want to fill in the data of a previous flight for each row; you should probably ask a question about how to efficiently do that (and I think groupby will be the answer there).

  • And now that I look at it, you’re also doing a whole lot of extra work, since you repeat the previous-flight lookup separately for each column to be computed.

  • You are correct. I’m not sure how ‘groupby’ will help me here. Can you explain how you would do that? @AKX

Multiprocessing won’t work because each process will have its own copy of the dataframe, and changes won’t propagate into the parent process.
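A minimal way to see the copying: Pool.map pickles each task before handing it to a worker process, so the worker operates on a deserialized copy. The sketch below simulates that round trip with pickle in a single process rather than starting a real pool; the toy dataframe and the column name doubled are made up for illustration.

```python
import pickle

import pandas as pd

parent_df = pd.DataFrame({"x": [1, 2, 3]})

# Simulate what Pool.map does with a task argument: pickle it in the
# parent and unpickle it in the worker, producing an independent copy.
worker_df = pickle.loads(pickle.dumps(parent_df))

# The "worker" adds a column to its own copy...
worker_df["doubled"] = worker_df["x"] * 2

# ...but the parent's dataframe is unchanged.
print("doubled" in worker_df.columns)  # True
print("doubled" in parent_df.columns)  # False
```

Only what a worker returns (the results of pool.map) travels back to the parent, and it is pickled again on the way. In the question, every task tuple also carries flight_info_df, so the whole dataframe is serialized and shipped to the workers along with the tasks.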

Instead of doing all that work row by row with apply, you can use groupby.

The idea is to sort the original dataframe so that flights are in order, group it by the keys that identify an aircraft’s flights on a given day (the (dep_sched_date, ac_registration) pairs), and then use shift(1) within each group to take each value from the previous flight:

import pandas as pd

pd.options.display.width = 0

flight_info_df = pd.DataFrame(
    {
        "dep_sched_date": ["2020-01-01", "2020-01-01", "2020-01-02", "2020-01-02", "2020-01-02", "2020-01-02", "2020-01-02"],
        "ac_registration": ["B-1234", "B-1234", "B-1234", "B-1234", "B-2345", "B-2345", "B-2345"],
        "n_flight_of_day": [1, 2, 1, 2, 1, 2, 3],
        "dep_delay": [7, 3, 5, 2, 0, 0.6, -8],
        "meals": ["beef", "chicken", "vegetarian", "just cheese", "soup", "tart", "beef"],
    }
)

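# Put each aircraft's flights of the day in order
flight_info_df.sort_values(
    ["dep_sched_date", "ac_registration", "n_flight_of_day"], inplace=True
)
# One group per (date, aircraft) pair
grouped = flight_info_df.groupby(["dep_sched_date", "ac_registration"])

# shift(1) moves values down one row within each group, so every flight gets
# the previous flight's value (NaN for the first flight of the day)
flight_info_df["prev_dep_delay"] = grouped["dep_delay"].shift(1)
flight_info_df["prev_meals"] = grouped["meals"].shift(1)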

print(flight_info_df)

This prints out

  dep_sched_date ac_registration  n_flight_of_day  dep_delay        meals  prev_dep_delay  prev_meals
0     2020-01-01          B-1234                1        7.0         beef             NaN         NaN
1     2020-01-01          B-1234                2        3.0      chicken             7.0        beef
2     2020-01-02          B-1234                1        5.0   vegetarian             NaN         NaN
3     2020-01-02          B-1234                2        2.0  just cheese             5.0  vegetarian
4     2020-01-02          B-2345                1        0.0         soup             NaN         NaN
5     2020-01-02          B-2345                2        0.6         tart             0.0        soup
6     2020-01-02          B-2345                3       -8.0         beef             0.6        tart
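Because shift works on several columns at once, the same pattern can fill in every column from the question’s previous_flight_info mapping in one pass, instead of repeating the previous-flight lookup per column. This is a sketch, assuming the mapping’s values are existing column names; the two-entry mapping, the leg_no values, and the name leg_no_previous_flight are hypothetical and only there for the toy data. Note that shift(1) takes the previous row within each sorted group, which matches the question’s n_flight_of_day - 1 lookup only if the flight numbers within each group have no gaps.

```python
import pandas as pd

flight_info_df = pd.DataFrame(
    {
        "dep_sched_date": ["2020-01-01", "2020-01-01", "2020-01-02", "2020-01-02", "2020-01-02"],
        "ac_registration": ["B-1234", "B-1234", "B-2345", "B-2345", "B-2345"],
        "n_flight_of_day": [1, 2, 1, 2, 3],
        "leg_no": [101, 102, 201, 202, 203],  # hypothetical leg numbers
        "dep_delay": [7, 3, 0, 0.6, -8],
        "meals": ["beef", "chicken", "soup", "tart", "beef"],
    }
)

# Hypothetical subset of the question's mapping: new column name -> original column
previous_flight_info = {
    "dep_delay_previous_flight": "dep_delay",
    "meals_previous_flight": "meals",
}

flight_info_df = flight_info_df.sort_values(
    ["dep_sched_date", "ac_registration", "n_flight_of_day"]
)
grouped = flight_info_df.groupby(["dep_sched_date", "ac_registration"])

# Shift all needed columns (plus leg_no) in a single call; the result keeps
# the original index, so it can be joined back row by row.
source_columns = ["leg_no"] + list(previous_flight_info.values())
shifted = grouped[source_columns].shift(1)
shifted = shifted.rename(
    columns={
        "leg_no": "leg_no_previous_flight",  # hypothetical output name
        **{orig: new for new, orig in previous_flight_info.items()},
    }
)

flight_info_df = flight_info_df.join(shifted)
print(flight_info_df)
```

If you want 0 instead of NaN for the first flight of the day, as the question’s function returns, you could call shifted.fillna(0) before the join.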
