How to execute multiple coroutines concurrently and do something as soon as some of them finish?

Context

I have three recipe classes that share a common ingredient.


class BrewedNoodle:
    # Ingredient types this recipe needs before it can be cooked.
    ingredients: list[type] = [Noodle]

    async def cook(self, noodle: Noodle):
        pass

class PorkNoodle:
    ingredients: list[type] = [Noodle, Pork]

    async def cook(self, noodle: Noodle, pork: Pork):
        pass


class BeefNoodle:
    ingredients: list[type] = [Noodle, Beef]

    async def cook(self, noodle: Noodle, beef: Beef):
        pass

Please consider each ingredient as an API (fetching it requires a network call).

What I want to do

I want to use asyncio.gather() to fetch (prepare) all ingredients concurrently.
As soon as all the ingredients a specific recipe needs are prepared, I want to cook it immediately, without waiting for the other ingredients.

For example:

  1. As soon as Noodle is prepared, I cook BrewedNoodle.
  2. When Pork is prepared (so Noodle and Pork are both ready), I cook PorkNoodle.
  3. When Beef is prepared, I cook BeefNoodle.

What I actually did

Because I await asyncio.gather(), I have to wait until all ingredients are ready. (I cannot cook BrewedNoodle early, even if Noodle is already prepared.)

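# prepare ingredients (gather returns only after ALL of them are ready)
noodle, pork, beef = await asyncio.gather(Noodle(), Pork(), Beef())


# cook (cook() is a coroutine, so it must be awaited)
await BrewedNoodle().cook(noodle)
await PorkNoodle().cook(noodle, pork)
await BeefNoodle().cook(noodle, beef)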

Question

How can I cook a recipe as soon as its necessary ingredients are ready, without waiting for the other ingredients?

If I understand your example correctly, there needs to be some kind of explicit mapping from ingredients to the dishes that can be created from them. The special case of Noodle being required for the other dishes also needs to be addressed separately. The core of the algorithm is asyncio.as_completed():

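import asyncio

async def cook_when_ready():
    # Maps each ingredient type to the dish it unlocks.
    # Assumes awaiting Noodle() etc. yields an instance of that type.
    dishes = {
        Noodle: BrewedNoodle,
        Pork: PorkNoodle,
        Beef: BeefNoodle,
    }
    ingredients = [Noodle(), Pork(), Beef()]
    noodle = None
    waiting = []  # Pork or Beef that arrived before the noodle
    for coro in asyncio.as_completed(ingredients):
        ingredient = await coro
        if isinstance(ingredient, Noodle):
            noodle = ingredient
            await dishes[Noodle]().cook(noodle)
        else:
            waiting.append(ingredient)
        if noodle is not None:
            # Cook every dish whose extra ingredient is already here.
            for extra in waiting:
                await dishes[type(extra)]().cook(noodle, extra)
            waiting.clear()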
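
An alternative that avoids special-casing Noodle is to start each ingredient once as a Task and let every recipe await only the tasks it needs. The following is a minimal, self-contained sketch of that idea, not the code from the question: fetch(), cook(), the string results, and the delays are hypothetical stand-ins for the real API calls and recipe classes.

```python
import asyncio


# Hypothetical stand-in for a network call; the delay is made up.
async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


# Hypothetical stand-in for Recipe.cook(): waits only for its own ingredients.
async def cook(dish: str, *ingredient_tasks: asyncio.Task) -> str:
    ingredients = await asyncio.gather(*ingredient_tasks)
    return f"{dish}({', '.join(ingredients)})"


async def main() -> list[str]:
    # Start every fetch exactly once; a Task can be awaited by several recipes.
    noodle = asyncio.create_task(fetch("noodle", 0.01))
    pork = asyncio.create_task(fetch("pork", 0.09))
    beef = asyncio.create_task(fetch("beef", 0.05))

    recipes = [
        cook("BrewedNoodle", noodle),
        cook("PorkNoodle", noodle, pork),
        cook("BeefNoodle", noodle, beef),
    ]

    finished = []
    # as_completed yields each recipe as soon as it is done cooking.
    for next_dish in asyncio.as_completed(recipes):
        finished.append(await next_dish)
    return finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With these made-up delays, BrewedNoodle should finish first, then BeefNoodle, then PorkNoodle. Because each recipe awaits its own set of tasks, the order in which the ingredients arrive does not need to be handled explicitly.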
