Read from multiple REST API stream endpoints – Python

I have multiple REST API endpoints (URLs) which stream data. I wonder what the best approach would be to read from all of them in one or many processes.

Currently I'm reading the data from only one URL, doing something like:

    s = requests.Session()
    resp = s.get(url, headers=headers, stream=True)
    for line in resp.iter_lines():
        if line:
            print(line)

I would like to do the same with more URLs, and I wonder what the best approach here would be.

  • It depends on what you're trying to do… You could spawn more threads, each reading from a different URL, but I recommend starting with something simpler, e.g. concurrent.futures.ThreadPoolExecutor (see the sketch after these comments).

  • I wonder if using a with statement to open multiple connections could be a good approach

  • I’ve added an example with ThreadPoolExecutor

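For completeness, here is a minimal sketch of the "one thread per URL" idea from the first comment, assuming each thread gets its own requests.Session (sharing a single session across threads is usually avoided); the URLs are hypothetical placeholders:

    import threading

    import requests


    def stream_one(url):
        # Hypothetical worker: each thread opens its own session and
        # consumes one streaming endpoint line by line.
        with requests.Session() as session:
            resp = session.get(url, stream=True)
            for line in resp.iter_lines():
                if line:
                    print(url, line)


    urls = [
        "https://example.com/stream-a",  # placeholder
        "https://example.com/stream-b",  # placeholder
    ]

    threads = [threading.Thread(target=stream_one, args=(u,), daemon=True) for u in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()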

Here is an example of how you can read from multiple URLs using concurrent.futures.ThreadPoolExecutor. But this is just one approach; you could also use multiprocessing (sketched after the output below), asyncio/aiohttp, etc.

    from concurrent.futures import ThreadPoolExecutor

    import requests


    def get_from_api(tpl):
        session, url = tpl

        resp = session.get(url, stream=True)

        # just for example: count the lines streamed from this URL
        count_lines = 0
        for line in resp.iter_lines():
            count_lines += 1

        return url, count_lines


    def main():
        api_urls = [
            "https://google.com",
            "https://yahoo.com",
            "https://facebook.com",
            "https://instagram.com",
            # ...etc.
        ]

        with ThreadPoolExecutor(max_workers=2) as pool, requests.Session() as session:
            for url, count_lines in pool.map(
                get_from_api, ((session, url) for url in api_urls)
            ):
                print(url, count_lines)


    if __name__ == "__main__":
        main()

Prints:

    https://google.com 17
    https://yahoo.com 648
    https://facebook.com 26
    https://instagram.com 50
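
Since multiprocessing is mentioned above as another option, here is a rough sketch of that route, assuming a multiprocessing.Pool with each worker process creating its own requests.Session (a session cannot be shared across processes the way it is shared between threads above); the URLs are the same example placeholders:

    import multiprocessing

    import requests


    def count_lines_from_api(url):
        # Each worker process opens its own session and counts the
        # lines streamed from one URL.
        with requests.Session() as session:
            resp = session.get(url, stream=True)
            count_lines = sum(1 for _ in resp.iter_lines())
        return url, count_lines


    def main():
        api_urls = [
            "https://google.com",
            "https://yahoo.com",
            "https://facebook.com",
            "https://instagram.com",
        ]

        # Two worker processes; results are yielded as they complete
        with multiprocessing.Pool(processes=2) as pool:
            for url, count_lines in pool.imap_unordered(count_lines_from_api, api_urls):
                print(url, count_lines)


    if __name__ == "__main__":
        main()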

EDIT: Using asyncio/aiohttp:

    import asyncio

    # Streaming API:
    # https://docs.aiohttp.org/en/stable/streams.html#streaming-api
    import aiohttp


    async def fetch(session, url):
        while True:
            async with session.get(url) as response:
                reader = response.content

                cnt = 0
                async for line in reader:
                    cnt += 1

                print(f"{url}: {cnt} lines read")

            await asyncio.sleep(3)


    async def main():
        urls = [
            "https://google.com",  # Replace with actual URLs
            "https://facebook.com",
        ]

        async with aiohttp.ClientSession() as session:
            tasks = {asyncio.create_task(fetch(session, url)) for url in urls}

            # this loops indefinitely:
            await asyncio.gather(*tasks)


    if __name__ == "__main__":
        asyncio.run(main())

Prints:

    https://google.com: 17 lines read
    https://facebook.com: 26 lines read
    https://google.com: 655 lines read
    https://facebook.com: 26 lines read

    ...
