PySpark RDD in Code Repos

I’ve been trying to implement this transform in PySpark for a while and can’t figure out how to get it right. The code is below:

from transforms.api import transform, Input, Output
from transforms.external.systems import external_systems, Source
from pyspark.sql.types import StructType, StructField, StringType, DateType
from datetime import datetime
import uuid
import json


@external_systems(source=Source("rid"))
@transform(
    raw_output=Output("rid"),
    errors_output=Output("rid"),
    ids_input=Input("rid"),
)
def fetch_activity(ctx, source, ids_input, raw_output, errors_output):
    raw_schema = StructType([
        StructField("primary_key", StringType()),
        StructField("insertion_date", DateType()),
        StructField("gender", StringType()),
        StructField("event_type", StringType()),
        StructField("player_id", StringType()),
        StructField("data", StringType())
    ])

    errors_schema = StructType([
        StructField("primary_key", StringType()),
        StructField("insertion_date", DateType()),
        StructField("gender", StringType()),
        StructField("event_type", StringType()),
        StructField("player_id", StringType()),
        StructField("error", StringType())
    ])

    url = source.get_https_connection().url
    client = source.get_https_connection().get_client()
    current_date = datetime.today().date()

    ids_df = ids_input.dataframe()
    ids_rdd = ids_df.select("id").rdd.map(lambda row: row.id)

    def fetch_data(id):
        raw_data = []
        errors_data = []

        for event_type, suffix in [("singles", "sgl"), ("doubles", "dbl")]:
            url = f"{url}/en/-/www/activity/{suffix}/{id}"
            try:
                response = client.get(url)
                response_json = response.json()
                raw_data.append((
                    event_type,
                    id,
                    json.dumps(response_json),
                ))
            except Exception as e:
                activity_errors_data.append((
                    event_type,
                    id,
                    f"error fetching tournament data: {str(e)}",
                ))

        return raw_data, errors_data

    # Map RDD to fetch data in parallel
    result_rdd = ds_rdd.flatMap(fetch_data)

    # Separate raw data and error data
    raw_data_rdd = result_rdd.flatMap(lambda x: x[0])
    errors_data_rdd = result_rdd.flatMap(lambda x: x[1])

    # Convert RDDs to DataFrames
    raw_data_df = ctx.spark_session.createDataFrame(
        raw_data_rdd.map(lambda row: (
            str(uuid.uuid4()),
            current_date,
            "male",
            row[0],
            row[1],
            row[2]
        )),
        schema=activity_schema
    )

    errors_data_df = ctx.spark_session.createDataFrame(
        errors_data_rdd.map(lambda row: (
            str(uuid.uuid4()),
            current_date,
            "male",
            row[0],
            row[1],
            row[2],
        )),
        schema=errors_schema
    )

    raw_output.write_dataframe(raw_data_df)
    errors_output.write_dataframe(errors_data_df)

Currently, I receive the following error:

File "/scratch/standalone/01060370-d2f4-4561-bf11-93db36058388/code-assist/contents/transforms-python/src/project/fetch/activity.py", line 70, in <lambda>
    raw_data_rdd = result_rdd.flatMap(lambda x: x[0])
                                                ~^^^

IndexError: list index out of range

I’ve tried using the debugger, but I find it hard to inspect RDDs. Do you see anything wrong in my code, and could you point me in the right direction for learning PySpark? I’m trying to use it here in Code Repos because my initial implementation, which simply iterated through the ids and fetched the data, resulted in “driver out of memory” errors. If there is a better way to get around the driver out of memory error, please let me know (note that I have already tried modifying the Spark profile).

Hi,

The error you’re encountering might be because the fetch_data function is not returning a tuple with two lists as expected. Could you check manually that you have a valid output when running fetch_data for a few ids? Could there be a case where you get an empty list, meaning x[0] would be out of range?

Also, it looks like you have a typo: activity_errors_data should probably be errors_data in your fetch_data function.
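
For what it’s worth, the traceback is consistent with how flatMap treats the return value. flatMap flattens whatever fetch_data returns by one level, so the (raw_data, errors_data) tuple is split apart and each list becomes its own element of result_rdd. Whenever one of those lists is empty, x[0] raises IndexError. Below is a minimal plain-Python sketch of that behavior. It does not use Spark: flat_map is a hypothetical stand-in for RDD.flatMap, and this fetch_data returns canned values instead of calling the API.

```python
from itertools import chain


def fetch_data(id_):
    """Stand-in for the thread's fetch_data: returns (raw_data, errors_data)."""
    raw_data = [("singles", id_, "{}"), ("doubles", id_, "{}")]
    errors_data = []  # pretend both requests succeeded, so there are no errors
    return raw_data, errors_data


def flat_map(func, items):
    """Hypothetical emulation of RDD.flatMap: apply func, then flatten one level."""
    return list(chain.from_iterable(func(item) for item in items))


ids = ["a1", "b2"]

# flatMap flattens the returned tuple, so every element is ONE of the two lists:
# [raw list for a1, [], raw list for b2, []]
flattened = flat_map(fetch_data, ids)

# Indexing x[0] now hits an empty list and fails, as in the traceback.
try:
    flat_map(lambda x: x[0], flattened)
    error = None
except IndexError as exc:
    error = exc

# A plain map keeps each (raw_data, errors_data) tuple intact,
# so x[0] and x[1] select the two lists as intended.
mapped = [fetch_data(i) for i in ids]
raw_rows = flat_map(lambda x: x[0], mapped)
error_rows = flat_map(lambda x: x[1], mapped)
```

If that reading is right, changing the first call from flatMap to map, while keeping the two later flatMap calls, would keep each tuple intact.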

Hey there!

The typo was just me trying to obfuscate some of the variables and rids.

The fetch_data function should return two lists whether the API requests succeed or fail, because I initialize raw_data and errors_data to empty lists at the start of the function.

I am struggling to test this manually. I have tried preview and the debugger, but I don’t know what to look for.

There are several options here:

  • You can use external transforms (https://www.palantir.com/docs/foundry/data-integration/external-transforms/), which allow you to preview the API call. Otherwise the call will likely be blocked in preview, although that might not matter if you are only testing the PySpark logic.
  • Have you tried setting a breakpoint, for example just after result_rdd = ds_rdd.flatMap(fetch_data), and then using createDataFrame and .collect() to inspect what is in your datasets? That should help you understand what the RDD contains at that point, using a sample of the data. Again, if you’re not in external transforms, the API call will likely not work, but that should not be a problem for testing.

Generally speaking, you don’t want to use Spark (whether you’re using the RDD API or the DataFrame API) when you’re calling out to an external API, because programs that interact with external APIs are spending most of their time waiting on I/O rather than doing computation - and the core advantage of Spark is distributing computation! It’s very expensive to have all of those cores just sit around waiting for an API to return some data. Spark is also lazily evaluated, which can result in some non-intuitive behavior for programs like this (for example, I believe that your program is actually going to do each API call twice as an artifact of the way you split the results into raw_output and errors_output).
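
To make the “each API call twice” point concrete, here is a toy sketch in plain Python. LazyDataset is a hypothetical class written for this illustration and is not a Spark API; it only mimics the idea that every action recomputes the full lineage unless the intermediate result is cached. fake_api_call stands in for the real request.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (hypothetical, not Spark)."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument function that produces the rows
        self._use_cache = False
        self._cached = None

    def map(self, func):
        # Nothing runs here; the work is deferred until collect() is called.
        return LazyDataset(lambda: [func(x) for x in self.collect()])

    def cache(self):
        # Mark the dataset so the first collect() stores its rows for reuse.
        self._use_cache = True
        return self

    def collect(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        rows = self._compute()
        if self._use_cache:
            self._cached = rows
        return rows


calls = []


def fake_api_call(id_):
    calls.append(id_)                 # record every simulated request
    return ([("singles", id_)], [])   # (raw_data, errors_data)


ids = LazyDataset(lambda: ["a1", "b2"])

# Two downstream branches, two actions: the upstream map runs twice.
results = ids.map(fake_api_call)
results.map(lambda x: x[0]).collect()
results.map(lambda x: x[1]).collect()
calls_without_cache = len(calls)

# With caching, the second action reuses the stored rows.
calls.clear()
cached = ids.map(fake_api_call).cache()
cached.map(lambda x: x[0]).collect()
cached.map(lambda x: x[1]).collect()
calls_with_cache = len(calls)
```

In real Spark the closest analogue is calling cache() or persist() on the intermediate RDD or DataFrame before branching, with the caveat that cached partitions can be evicted and recomputed.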

If you want to do many network requests at once, using simple Python multithreading is very straightforward and a much better choice than Spark for this sort of use-case. If you don’t have enough memory on your driver to fit all of the data that you need to fetch, you can use one of the following approaches.

  1. Use DRIVER_MEMORY_LARGE, or even DRIVER_MEMORY_EXTRA_LARGE (be sure to also use KUBERNETES_NO_EXECUTORS if you’re not doing distributed Spark computation).
  2. Use lightweight transforms, which also expose functionality to control how much memory your container gets.
  3. Only process a certain amount of records on each build, using the transforms incremental APIs.
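
As a rough illustration of the multithreading suggestion, the sketch below uses ThreadPoolExecutor from the standard library. The names fetch_one, fetch_all, and get_json are hypothetical, and max_workers=16 is an arbitrary choice. get_json is assumed to be any callable that takes a URL and returns the response body as a string; here it is faked so the example runs without network access.

```python
from concurrent.futures import ThreadPoolExecutor

EVENT_TYPES = [("singles", "sgl"), ("doubles", "dbl")]


def fetch_one(get_json, base_url, player_id):
    """Fetch both event types for one id and return (raw_rows, error_rows)."""
    raw_rows, error_rows = [], []
    for event_type, suffix in EVENT_TYPES:
        request_url = f"{base_url}/en/-/www/activity/{suffix}/{player_id}"
        try:
            raw_rows.append((event_type, player_id, get_json(request_url)))
        except Exception as exc:
            # Record the failure instead of aborting the whole batch.
            error_rows.append(
                (event_type, player_id, f"error fetching tournament data: {exc}")
            )
    return raw_rows, error_rows


def fetch_all(get_json, base_url, player_ids, max_workers=16):
    """Run fetch_one for every id on a thread pool and merge the results."""
    raw_rows, error_rows = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as player_ids.
        results = pool.map(lambda pid: fetch_one(get_json, base_url, pid), player_ids)
        for raw, errors in results:
            raw_rows.extend(raw)
            error_rows.extend(errors)
    return raw_rows, error_rows


def fake_get_json(url):
    """Fake client for the demo: one URL fails, the others return a JSON string."""
    if url.endswith("/dbl/bad"):
        raise ValueError("boom")
    return '{"ok": true}'


raw_rows_demo, error_rows_demo = fetch_all(
    fake_get_json, "https://example.invalid", ["a1", "bad"]
)
```

Inside the transform from the question, get_json could plausibly be something like lambda u: json.dumps(client.get(u).json()), and the two merged lists could then be turned into DataFrames with createDataFrame on the driver.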

As one final note - while I don’t recommend using Spark here for the reasons above, if you were to hypothetically use Spark after all, you can just use the DataFrame API with a UDF that returns a struct. I suspect that that would result in a better debugging experience than the RDD API is giving you right now.
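
A minimal sketch of that DataFrame-plus-UDF idea follows, under several assumptions: fetch_result and add_fetch_columns are hypothetical names, get_json is assumed to be a callable that can be serialized to the executors and returns parsed JSON, and the input DataFrame is assumed to have an id column as in the question. The Spark imports sit inside the function only so that the pure-Python helper can be exercised without a Spark installation.

```python
import json


def fetch_result(get_json, request_url):
    """Return a (data, error) pair; exactly one of the two is None."""
    try:
        return (json.dumps(get_json(request_url)), None)
    except Exception as exc:
        return (None, f"error fetching tournament data: {exc}")


def add_fetch_columns(df, get_json, base_url, suffix):
    """Attach data and error columns to a DataFrame that has an `id` column."""
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType, StructField, StructType

    # The UDF returns one struct per row, so success and failure travel together.
    result_type = StructType([
        StructField("data", StringType()),
        StructField("error", StringType()),
    ])

    fetch_udf = F.udf(
        lambda player_id: fetch_result(
            get_json, f"{base_url}/en/-/www/activity/{suffix}/{player_id}"
        ),
        result_type,
    ).asNondeterministic()  # a network call is not deterministic; this discourages duplicated evaluation

    return (
        df.withColumn("result", fetch_udf(F.col("id")))
          .select("id", "result.data", "result.error")
    )


def _failing_get_json(url):
    raise ValueError("boom")


# Pure-Python check of the helper, no Spark required.
ok_result = fetch_result(lambda url: {"ok": True}, "https://example.invalid/a1")
failed_result = fetch_result(_failing_get_json, "https://example.invalid/a1")
```

Rows with a non-null error column could then be filtered into the errors output and the rest into the raw output, which avoids splitting tuples by position.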

Thank you, this was very helpful!

I ended up using the @configure decorator and specified the profile to be “DRIVER_MEMORY_LARGE”. I had previously just made that profile available, but didn’t know I had to reference it in my code.
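
For anyone landing here later, this is roughly what that looks like. It is a fragment rather than a complete transform: the "rid" values are the same placeholders used in the question, the function body is omitted, and it assumes the profile has already been made available to the repository.

```python
from transforms.api import configure, transform, Input, Output


# Reference the Spark profile by name; importing it into the repo is not enough.
@configure(profile=["DRIVER_MEMORY_LARGE"])
@transform(
    raw_output=Output("rid"),
    errors_output=Output("rid"),
    ids_input=Input("rid"),
)
def fetch_activity(ctx, ids_input, raw_output, errors_output):
    ...  # same body as before
```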