Does anyone have an example of an external transform that uses asyncio to sync data concurrently? I'm trying to set one up on my end, but I can't get it to loop through the pages of the API I'm hitting, even though the same logic works fine when I use multithreading instead.
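For reference, the multithreaded version that does work follows roughly this shape (a trimmed sketch, not the real transform; fetch_page and PAGE_LIMIT are stand-in names, and the transform/state handling is omitted):

import requests
from concurrent.futures import ThreadPoolExecutor

PAGE_LIMIT = 5  # how many pages to pull per run

def fetch_page(url):
    # Plain blocking GET; one call per worker thread
    return requests.get(url).json()

def fetch_pages(start_url):
    # Walk the "next" links to collect a batch of page URLs
    urls = [start_url]
    while len(urls) < PAGE_LIMIT:
        page = fetch_page(urls[-1])
        if page["next"] is None:
            break
        urls.append(page["next"])
    # Fetch the collected pages concurrently on a thread pool
    with ThreadPoolExecutor(max_workers=PAGE_LIMIT) as pool:
        return urls, list(pool.map(fetch_page, urls))

urls, responses = fetch_pages("https://pokeapi.co/api/v2/pokemon?limit=100&offset=0")
print(sum(len(r["results"]) for r in responses), "rows across", len(urls), "pages")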
The asyncio version:
from transforms.api import transform, Output, incremental, lightweight
from transforms.external.systems import external_systems, Source
import pandas as pd
import yaml
import logging
import asyncio
import aiohttp
log = logging.getLogger(__name__)
@lightweight()
@incremental(require_incremental=True)
@external_systems(
    pokeSource=Source("ri.magritte..source.72b1293a-2cd3-467a-af37-79e83888a565")
)
@transform(
    out=Output("ri.foundry.main.dataset.0ca50951-c5a8-4722-a168-9ad3bce02fd5"),
)
async def compute(pokeSource, out):
    out_fs = out.filesystem()
    state_filename = "_state.yaml"
    state = {"start_url": "https://pokeapi.co/api/v2/pokemon?limit=100&offset=0"}
    try:
        with out_fs.open(state_filename, mode='r') as state_file:
            state = yaml.safe_load(state_file)
        log.info(f"state file found, continuing from: {state}")
    except Exception:
        log.warning("state file not found, starting over from default state")

    async def fetch_pokemon_data(session, url):
        async with session.get(url) as response:
            return await response.json()

    async with aiohttp.ClientSession() as session:
        # Walk the "next" links to collect a batch of page URLs, starting with
        # the current page so its results are not dropped
        urls = [state["start_url"]]
        while len(urls) < 5:  # adjust the batch size as needed
            resp = await fetch_pokemon_data(session, urls[-1])
            if resp["next"] is None:
                break
            urls.append(resp["next"])
        # Fetch the collected pages concurrently
        tasks = [fetch_pokemon_data(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)

    new_data = []
    for url, responseJson in zip(urls, responses):
        for pokemon in responseJson["results"]:
            new_data.append({"name": pokemon["name"], "url": url})
    new_df = pd.DataFrame(new_data)

    # Write the DataFrame directly using the lightweight API
    out.write_table(new_df)

    # Resume the next incremental run from the page after the last one fetched
    state["start_url"] = responses[-1]["next"] or state["start_url"]
    with out_fs.open(state_filename, "w") as state_file:
        yaml.dump(state, state_file)
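For comparison, here is the same aiohttp pattern as a standalone script, driven explicitly with asyncio.run rather than whatever the transform runner does with the async compute function (again a minimal sketch, against the public PokeAPI):

import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    start_url = "https://pokeapi.co/api/v2/pokemon?limit=100&offset=0"
    async with aiohttp.ClientSession() as session:
        # Walk the "next" links to collect a batch of page URLs
        urls = [start_url]
        while len(urls) < 5:
            page = await fetch_json(session, urls[-1])
            if page["next"] is None:
                break
            urls.append(page["next"])
        # Fetch the collected pages concurrently
        responses = await asyncio.gather(*(fetch_json(session, u) for u in urls))
    for url, page in zip(urls, responses):
        print(url, "->", len(page["results"]), "results")

asyncio.run(main())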