Need help with parallelization when using filesystem()

Thank you so much @sandpiper! `df.repartition(32).foreach(convert_file)` solved this issue. It reduced the foreach stage of the build from 1h 45m to 7m! That brings the total ETA down from 45 days to 10–15 days; the remaining time comes from the other stages, which I'm troubleshooting now. Will post updates — pasting the latest code below for reference.

# Modified from CodeStrap's post at https://community.palantir.com/t/pipeline-builder-incremental-mediasets-pdf-extraction/1279/5
from transforms.api import transform, Input, Output, incremental, configure
from transforms.mediasets import MediaSetOutput
from pyspark.sql import functions as F
import os
import logging
from conjure_python_client._http.requests_client import ConjureHTTPError
from urllib3.exceptions import UnrewindableBodyError


logger = logging.getLogger(__name__)  # module-level logger; convert_file's closure also uses it on executors


@configure(profile=[
    "EXECUTOR_MEMORY_MEDIUM",
    "DYNAMIC_ALLOCATION_ENABLED",
    "DYNAMIC_ALLOCATION_MIN_16",
    "DRIVER_MEMORY_OVERHEAD_LARGE",
    "EXECUTOR_MEMORY_OVERHEAD_MEDIUM",
    "EXECUTOR_CORES_SMALL",
    "DRIVER_MEMORY_MEDIUM",
    "DRIVER_CORES_MEDIUM"])
@incremental(semantic_version=2, snapshot_inputs=["pdfs_input"], v2_semantics=True)
@transform(
    metadata_output=Output("..."),
    media_set_output=MediaSetOutput("{**Transactional** media set}"),
    pdfs_input=Input({**4M PDFs** dataset})
)
def compute(ctx, pdfs_input, metadata_output, media_set_output, limit_rows=True, limit=10000):
    fs = pdfs_input.filesystem()
    df = fs.files()  # Get files as a dataframe of metadata
    df = df.withColumn(
        'modified',
        F.from_unixtime(F.col('modified') / 1000).cast("timestamp")
    )  # Convert last modified time (UTC) from long to timestamp
    df = df.withColumn('processed_at', F.current_timestamp())  # Add a timestamp

    if hasattr(ctx, '_is_incremental'):
        df = df.withColumn('is_incremental', F.lit(ctx.is_incremental))  # Check if build is running incrementally
        if ctx.is_incremental:
            output_df_previous_dataframe = metadata_output.dataframe('previous')
            df = df.join(output_df_previous_dataframe, how="left_anti", on="path")
            if df.count() == 0:
                ctx.abort_job()  # This stops the build schedule's infinite loop after processing all input pdfs
    else:
        df = df.withColumn('is_incremental', F.lit(False))

    # Conditional row limiting based on the parameter
    if limit_rows:
        df = df.orderBy(F.col("modified").desc(), F.col("path").desc()).limit(limit)

    metadata_output.write_dataframe(df)  # Write output dataset

    # Define media set converter function
    def convert_file(current_file):
        try:
            with fs.open(current_file.path, 'rb') as f:
                filename = os.path.basename(current_file.path)
                media_set_output.put_media_item(f, filename)
        except ConjureHTTPError:
            logger.warning("ConjureHTTPError: " + current_file.path)
        except UnrewindableBodyError:
            logger.warning("UnrewindableBodyError: " + current_file.path)

    # Convert to media set
    df.repartition(64).foreach(convert_file)