I have multiple REST API endpoints (URLs) that stream data. I wonder what the best approach would be to read from all of them in one or many processes.
Currently I'm reading the data from only one URL, doing something like:

```python
s = requests.Session()
resp = s.get(url, headers=headers, stream=True)
for line in resp.iter_lines():
    if line:
        print(line)
```
I would like to do the same with more URLs, and I wonder what the best approach would be here.
Here is an example of how you can read from multiple URLs using `concurrent.futures.ThreadPoolExecutor`. But this is just one approach; you could also use `multiprocessing`, `asyncio`/`aiohttp`, etc.
```python
from concurrent.futures import ThreadPoolExecutor

import requests


def get_from_api(tpl):
    session, url = tpl
    resp = session.get(url, stream=True)

    # just for example: count the lines in the response
    count_lines = 0
    for line in resp.iter_lines():
        count_lines += 1

    return url, count_lines


def main():
    api_urls = [
        "https://google.com",
        "https://yahoo.com",
        "https://facebook.com",
        "https://instagram.com",
        # ...etc.
    ]

    with ThreadPoolExecutor(max_workers=2) as pool, requests.Session() as session:
        for url, count_lines in pool.map(
            get_from_api, ((session, url) for url in api_urls)
        ):
            print(url, count_lines)


if __name__ == "__main__":
    main()
```
Prints:

```
https://google.com 17
https://yahoo.com 648
https://facebook.com 26
https://instagram.com 50
```
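For completeness, the `multiprocessing` route mentioned above can be sketched the same way with `ProcessPoolExecutor`. This is only a sketch under one assumption worth noting: each worker creates its own `Session`, because sessions hold sockets and can't be pickled across process boundaries (the URLs are placeholders, as above):

```python
from concurrent.futures import ProcessPoolExecutor

import requests


def get_from_api(url):
    # each process opens its own Session; a Session can't be
    # shared between processes the way it can between threads
    with requests.Session() as session:
        resp = session.get(url, stream=True)
        count_lines = sum(1 for _ in resp.iter_lines())
    return url, count_lines


def main():
    api_urls = [
        "https://google.com",  # placeholder URLs
        "https://facebook.com",
    ]
    with ProcessPoolExecutor(max_workers=2) as pool:
        for url, count_lines in pool.map(get_from_api, api_urls):
            print(url, count_lines)


# when using processes, call main() behind a guard so child
# processes can re-import this module safely:
#   if __name__ == "__main__":
#       main()
```

Processes mostly pay off when the per-line processing is CPU-bound; for network-bound streaming, threads or `asyncio` are usually the lighter choice.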
EDIT: Using `asyncio`/`aiohttp`:
```python
import asyncio

# Streaming API:
# https://docs.aiohttp.org/en/stable/streams.html#streaming-api
import aiohttp


async def fetch(session, url):
    while True:
        async with session.get(url) as response:
            reader = response.content
            cnt = 0
            async for line in reader:
                cnt += 1
            print(f"{url}: {cnt} lines read")
        await asyncio.sleep(3)


async def main():
    urls = [
        "https://google.com",  # Replace with actual URLs
        "https://facebook.com",
    ]

    async with aiohttp.ClientSession() as session:
        tasks = {asyncio.create_task(fetch(session, url)) for url in urls}
        # this loops indefinitely:
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
```
Prints:

```
https://google.com: 17 lines read
https://facebook.com: 26 lines read
https://google.com: 655 lines read
https://facebook.com: 26 lines read
...
```
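Since `fetch()` loops forever, the `asyncio.gather(...)` call never returns. If you want to bound it (e.g. for testing), one option is `asyncio.wait_for`; a minimal sketch, with the timeout value chosen arbitrarily:

```python
import asyncio


async def forever():
    # stands in for an endless fetch() loop
    while True:
        await asyncio.sleep(0.1)


async def main():
    try:
        # cancel the endless task after 0.5 seconds
        await asyncio.wait_for(forever(), timeout=0.5)
    except asyncio.TimeoutError:
        print("stopped after timeout")


asyncio.run(main())
# prints: stopped after timeout
```

`wait_for` cancels the inner task on timeout, so the loop stops cleanly instead of being killed mid-iteration.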
It depends on what you're trying to do… You can spawn more threads, with each thread reading from a different URL. But I recommend starting with something easier, e.g. `concurrent.futures.ThreadPoolExecutor`.
I wonder if using `with` to open multiple connections can be a good approach.

I've added an example with `ThreadPoolExecutor`.
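On the `with` question: a single `with` block can manage any number of connections via `contextlib.ExitStack`. A sketch (the URLs and the `read_all` helper are illustrative, not from the answer above); note this only handles opening and closing cleanly, and reading one response with `iter_lines()` would still block the others, so you'd still combine it with threads or `asyncio`:

```python
from contextlib import ExitStack

import requests


def read_all(urls):
    with ExitStack() as stack:
        session = stack.enter_context(requests.Session())
        # open every streaming response under one with-block;
        # ExitStack closes all of them on exit, even on errors
        responses = [
            stack.enter_context(session.get(url, stream=True)) for url in urls
        ]
        for url, resp in zip(urls, responses):
            print(url, resp.status_code)


# read_all(["https://google.com", "https://facebook.com"])  # placeholder URLs
```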