I’ve been trying to implement this transform in PySpark for a while and can’t figure out how to get it right. The code is below:
from transforms.api import transform, Input, Output
from transforms.external.systems import external_systems, Source
from pyspark.sql.types import StructType, StructField, StringType, DateType
from datetime import datetime
import uuid
import json
@external_systems(source=Source("rid"))
@transform(
    raw_output=Output("rid"),
    errors_output=Output("rid"),
    ids_input=Input("rid"),
)
def fetch_activity(ctx, source, ids_input, raw_output, errors_output):
    raw_schema = StructType([
        StructField("primary_key", StringType()),
        StructField("insertion_date", DateType()),
        StructField("gender", StringType()),
        StructField("event_type", StringType()),
        StructField("player_id", StringType()),
        StructField("data", StringType()),
    ])
    errors_schema = StructType([
        StructField("primary_key", StringType()),
        StructField("insertion_date", DateType()),
        StructField("gender", StringType()),
        StructField("event_type", StringType()),
        StructField("player_id", StringType()),
        StructField("error", StringType()),
    ])

    url = source.get_https_connection().url
    client = source.get_https_connection().get_client()
    current_date = datetime.today().date()

    ids_df = ids_input.dataframe()
    ids_rdd = ids_df.select("id").rdd.map(lambda row: row.id)
    # Fetch singles and doubles activity for a single player id
    def fetch_data(id):
        raw_data = []
        errors_data = []
        for event_type, suffix in [("singles", "sgl"), ("doubles", "dbl")]:
            request_url = f"{url}/en/-/www/activity/{suffix}/{id}"
            try:
                response = client.get(request_url)
                response_json = response.json()
                raw_data.append((
                    event_type,
                    id,
                    json.dumps(response_json),
                ))
            except Exception as e:
                errors_data.append((
                    event_type,
                    id,
                    f"error fetching tournament data: {str(e)}",
                ))
        return raw_data, errors_data
    # Map RDD to fetch data in parallel
    result_rdd = ids_rdd.flatMap(fetch_data)

    # Separate raw data and error data
    raw_data_rdd = result_rdd.flatMap(lambda x: x[0])
    errors_data_rdd = result_rdd.flatMap(lambda x: x[1])
    # Convert RDDs to DataFrames
    raw_data_df = ctx.spark_session.createDataFrame(
        raw_data_rdd.map(lambda row: (
            str(uuid.uuid4()),
            current_date,
            "male",
            row[0],
            row[1],
            row[2],
        )),
        schema=raw_schema,
    )
    errors_data_df = ctx.spark_session.createDataFrame(
        errors_data_rdd.map(lambda row: (
            str(uuid.uuid4()),
            current_date,
            "male",
            row[0],
            row[1],
            row[2],
        )),
        schema=errors_schema,
    )

    raw_output.write_dataframe(raw_data_df)
    errors_output.write_dataframe(errors_data_df)
Currently, I receive the following error:
File "/scratch/standalone/01060370-d2f4-4561-bf11-93db36058388/code-assist/contents/transforms-python/src/project/fetch/activity.py", line 70, in <lambda>
raw_data_rdd = result_rdd.flatMap(lambda x: x[0])
~^^^
IndexError: list index out of range
I’ve tried using the debugger, but I find it hard to inspect RDDs. Do you see anything wrong in my code, and can you point me in the right direction for learning how to use PySpark properly here? I’m using it in Code Repositories because my initial implementation, which just iterated through the ids on the driver and fetched the data one by one, resulted in “driver out of memory” errors (a rough sketch of that version is below). If there is a better way to get around the driver out-of-memory error, please let me know (note: I already tried modifying the Spark profile).
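For reference, the earlier driver-side version looked roughly like the sketch below. It is simplified from memory rather than the exact code, and it reuses the url, client, current_date, and raw_schema variables from the transform above:

# Rough sketch of the earlier driver-side approach (simplified, from memory)
ids = [row.id for row in ids_input.dataframe().select("id").collect()]

rows = []
for id in ids:
    for event_type, suffix in [("singles", "sgl"), ("doubles", "dbl")]:
        response = client.get(f"{url}/en/-/www/activity/{suffix}/{id}")
        rows.append((
            str(uuid.uuid4()),
            current_date,
            "male",
            event_type,
            id,
            json.dumps(response.json()),
        ))

# All responses accumulate on the driver before this point, which is where it ran out of memory
raw_data_df = ctx.spark_session.createDataFrame(rows, schema=raw_schema)
raw_output.write_dataframe(raw_data_df)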