Need help with parallelization when using filesystem()

I have a working PySpark file that writes a subset of files from a dataset to a transactional media set. It uses the second example from the docs here; however, the for loop is running on a single executor, and at this rate it would take 1-2 months to convert all the PDFs.

In the code below, I try to omit the for loop by using put_dataset_files() instead, but I get an error that “A DataFrame object does not have an attribute filesystem”.

Is it possible to either (1) convert input.filesystem().files() back to an input format (or other format compatible with put_dataset_files) or (2) replace a for loop with something that runs in parallel?

    # Attempt to avoid the per-file loop by handing the file listing to put_dataset_files().
    fs = pdfs_input.filesystem()
    df = fs.files()  # Get files as a dataframe of metadata
    df = df.limit(10)  # Keep at most 10 rows of the listing
    # **df= df.toInputOrToOutputOrToSomeOtherMagicHere**

    # Output PDFs to a media set
    # NOTE(review): the put_dataset_files signature quoted later in this thread declares its first
    # parameter as a TransformInput (i.e. something like pdfs_input), not a DataFrame.
    media_set_output.put_dataset_files(
        df,  # ERROR HERE: A "DataFrame" object does not have an attribute "filesystem"
        ignore_items_not_matching_schema=True,
        ignore_items_failing_to_convert=True
    )

You should be able to replace the for-loop on the driver with the Spark foreach method, as below:

def put_file(f):
    """Upload the file described by one row of the file listing to the media set.

    Intended to be passed to ``DataFrame.foreach`` so that it is invoked once per row.

    Args:
        f: A row of the ``fs.files()`` listing; only its ``path`` attribute is read.
    """
    # Derive the name from the row before opening the file: the original rebound ``f`` to the
    # file handle, so ``f.path`` inside the ``with`` block no longer referred to the row.
    filename = os.path.basename(f.path)
    with fs.open(f.path, 'rb') as pdf_stream:
        # ``media_set_output`` is the name used for the media set output elsewhere in this thread.
        media_set_output.put_media_item(pdf_stream, filename)

df.foreach(put_file)

This will run in parallel on the executors.

Hi @sandpiper, I tried your suggestion, and I think it still ran the foreach on one executor. Please see below. Is there any information I can give you that would help troubleshoot this issue?

Full code:

# Modified from CodeStrap's post at https://community.palantir.com/t/pipeline-builder-incremental-mediasets-pdf-extraction/1279/5
from transforms.api import transform, Input, Output, incremental, configure
from transforms.mediasets import MediaSetOutput
from pyspark.sql import functions as F
import os
import logging
from conjure_python_client._http.requests_client import ConjureHTTPError
from urllib3.exceptions import UnrewindableBodyError


logger = logging.getLogger(__name__)


# Spark profile names requested for this build (memory, cores and dynamic-allocation settings).
@configure(profile=[
    "EXECUTOR_MEMORY_MEDIUM",
    "DYNAMIC_ALLOCATION_ENABLED",
    "DYNAMIC_ALLOCATION_MIN_16",
    "DRIVER_MEMORY_OVERHEAD_LARGE",
    "EXECUTOR_MEMORY_OVERHEAD_MEDIUM",
    "EXECUTOR_CORES_SMALL",
    "DRIVER_MEMORY_MEDIUM",
    "DRIVER_CORES_MEDIUM"])
# pdfs_input is declared as a snapshot input of this incremental transform.
@incremental(snapshot_inputs=["pdfs_input"], v2_semantics=True)
@transform(
    metadata_output=Output("ri.foundry.main.dataset.aa1e8a25-3252-4e48-8b88-db441bd065bc"),
    media_set_output=MediaSetOutput("ri.mio.main.media-set.e8d1c25d-1d94-468e-981d-62f53aa19f17"),
    pdfs_input=Input("ri.foundry.main.dataset.052024ea-9eb0-44b5-9033-afbc7bc1a99d")
)
def compute(ctx, pdfs_input, metadata_output, media_set_output, limit_rows=True, limit=10000):
    """Record metadata for a batch of input files and put those files into the media set.

    Lists the files of ``pdfs_input``, removes paths already present in the previous
    ``metadata_output`` when the build is incremental, optionally keeps only the ``limit``
    most recently modified files, writes the resulting listing to ``metadata_output`` and
    then uploads each listed file to ``media_set_output``.

    Args:
        ctx: Transform context; read for ``is_incremental`` and used to abort the job.
        pdfs_input: Input dataset whose files are listed and opened.
        metadata_output: Output dataset that receives the file listing.
        media_set_output: Media set output that receives the file contents.
        limit_rows: When true, cap the batch at ``limit`` files.
        limit: Maximum number of files kept when ``limit_rows`` is true.
    """
    fs = pdfs_input.filesystem()
    df = fs.files()  # Get files as a dataframe of metadata
    df = df.withColumn(
        'modified',
        F.from_unixtime(F.col('modified') / 1000).cast("timestamp")
    )  # Convert last modified time (UTC) from long to timestamp
    df = df.withColumn('processed_at', F.current_timestamp())  # Add a timestamp

    # Only consult ctx.is_incremental when the context carries the '_is_incremental' attribute.
    if hasattr(ctx, '_is_incremental'):
        df = df.withColumn('is_incremental', F.lit(ctx.is_incremental))  # Check if build is running incrementally
        if ctx.is_incremental:
            # Drop every path that the previous metadata output already lists.
            output_df_previous_dataframe = metadata_output.dataframe('previous')
            df = df.join(output_df_previous_dataframe, how="left_anti", on="path")
            if df.count() == 0:
                # NOTE(review): nothing here returns after abort_job(); unless abort_job() itself
                # stops execution, the statements below still run on the empty batch — confirm.
                ctx.abort_job()  # This stops the build schedule's infinite loop after processing all input pdfs
    else:
        df = df.withColumn('is_incremental', F.lit(False))

    # Conditional row limiting based on the parameter
    if limit_rows:
        # Newest files first; path is the secondary sort key.
        df = df.orderBy(F.col("modified").desc(), F.col("path").desc()).limit(limit)

    metadata_output.write_dataframe(df)  # Write output dataset

    # Define media set converter function
    def convert_file(current_file):
        """Put the file at ``current_file.path`` into the media set under its base name.

        ConjureHTTPError and UnrewindableBodyError are logged as warnings and not re-raised.
        """
        # NOTE(review): a file that fails here has already had its path written to
        # metadata_output above, so the anti-join presumably excludes it from later runs — confirm.
        try:
            with fs.open(current_file.path, 'rb') as f:
                filename = os.path.basename(current_file.path)
                media_set_output.put_media_item(f, filename)
        except ConjureHTTPError:
            logger.warning("ConjureHTTPError: " + current_file.path)
        except UnrewindableBodyError:
            logger.warning("UnrewindableBodyError: " + current_file.path)

    # Convert to media set
    # NOTE(review): a later reply in this thread suggests the DataFrame may have only one
    # partition here (possibly because of limit()), in which case this runs on one executor.
    df.foreach(convert_file)

@Joel
When you call put_dataset_files, you should pass the dataset input directly instead of df.
If your pdfs_input is a dataset of files, you can try:
# Hand the transform input itself (not a DataFrame of its files) to the media set output.
media_set_output.put_dataset_files(
    pdfs_input,
    ignore_items_not_matching_schema=True,
    ignore_items_failing_to_convert=True,
)

put_dataset_files can handle things in parallel.

@sarahwang, my pdfs_input is too large (4M+ PDFs) and errored out when I tried put_dataset_files previously. In my full code, I use the below logic to reduce the size of the input. Is there a way to then bring the filtered df back to a format that put_dataset_files can accept? If not, would you write the filtered files to a new dataset, and then pass that to put_dataset_files?

# files() is a method: it must be called to obtain the DataFrame of file metadata.
df = pdfs_input.filesystem().files()
# Keep only the paths that have not been processed yet.
df = df.join(paths_already_processed_df, how="left_anti", on="path")
# Cap the batch at 10,000 files.
df = df.limit(10000)

How about specifying a file size limit via the file_size_limit_bytes parameter of put_dataset_files?
def put_dataset_files(
    self,
    dataset_input: TransformInput,
    ignore_items_not_matching_schema: bool = False,
    ignore_items_failing_to_convert: bool = False,
    file_name_pattern: str = None,
    file_size_limit_bytes: int = None,
):
    """
    Copies the files in the input dataset into this media set.

    Additional options:
        ignore_items_not_matching_schema: Unless set to true, will error if the file does not match
            the expected schema of the media set
        file_name_pattern: A UNIX-style file name pattern may be specified to filter the items from
            input dataset
        file_size_limit_bytes: Files whose size in bytes exceeds this limit will be ignored
    """

Your dataframe may only have one partition (possibly as a consequence of the limit operation). Try this:

df.repartition(32).foreach(convert_file)

It is conceivable (though I have no concrete evidence for it, and I'm fairly sure it isn't the case) that the DataFrame is also automatically coalesced to a single partition during the df → rdd conversion that foreach performs. To be extra safe, you can repartition the RDD directly:

df.rdd.repartition(32).foreach(convert_file)

Thank you so much @sandpiper! df.repartition(32).foreach(convert_file) solved this issue. This reduced the foreach stage of the build from 1h 45m to 7m! This brings the total ETA from 45 days to 10-15 days due to the other stages, which I’m working on troubleshooting now. Will post updates, but also pasting latest code for reference.

# Modified from CodeStrap's post at https://community.palantir.com/t/pipeline-builder-incremental-mediasets-pdf-extraction/1279/5
from transforms.api import transform, Input, Output, incremental, configure
from transforms.mediasets import MediaSetOutput
from pyspark.sql import functions as F
import os
import logging
from conjure_python_client._http.requests_client import ConjureHTTPError
from urllib3.exceptions import UnrewindableBodyError


logger = logging.getLogger(__name__)


# Spark profile names requested for this build (memory, cores and dynamic-allocation settings).
@configure(profile=[
    "EXECUTOR_MEMORY_MEDIUM",
    "DYNAMIC_ALLOCATION_ENABLED",
    "DYNAMIC_ALLOCATION_MIN_16",
    "DRIVER_MEMORY_OVERHEAD_LARGE",
    "EXECUTOR_MEMORY_OVERHEAD_MEDIUM",
    "EXECUTOR_CORES_SMALL",
    "DRIVER_MEMORY_MEDIUM",
    "DRIVER_CORES_MEDIUM"])
@incremental(semantic_version=2, snapshot_inputs=["pdfs_input"], v2_semantics=True)
@transform(
    metadata_output=Output("..."),
    media_set_output=MediaSetOutput("{**Transactional** media set}"),
    # Redacted placeholder, quoted so the listing parses; substitute the real dataset reference.
    pdfs_input=Input("{**4M PDFs** dataset}")
)
def compute(ctx, pdfs_input, metadata_output, media_set_output, limit_rows=True, limit=10000):
    """Record metadata for a batch of input files and upload those files to the media set.

    Lists the files of ``pdfs_input``, removes paths already present in the previous
    ``metadata_output`` when the build is incremental, optionally keeps only the ``limit``
    most recently modified files, writes the resulting listing to ``metadata_output`` and
    then uploads each listed file to ``media_set_output`` from repartitioned Spark tasks.

    Args:
        ctx: Transform context; read for ``is_incremental`` and used to abort the job.
        pdfs_input: Input dataset whose files are listed and opened.
        metadata_output: Output dataset that receives the file listing.
        media_set_output: Media set output that receives the file contents.
        limit_rows: When true, cap the batch at ``limit`` files.
        limit: Maximum number of files kept when ``limit_rows`` is true.
    """
    fs = pdfs_input.filesystem()
    df = fs.files()  # Get files as a dataframe of metadata
    df = df.withColumn(
        'modified',
        F.from_unixtime(F.col('modified') / 1000).cast("timestamp")
    )  # Convert last modified time (UTC) from long to timestamp
    df = df.withColumn('processed_at', F.current_timestamp())  # Add a timestamp

    # Only consult ctx.is_incremental when the context carries the '_is_incremental' attribute.
    if hasattr(ctx, '_is_incremental'):
        df = df.withColumn('is_incremental', F.lit(ctx.is_incremental))  # Check if build is running incrementally
        if ctx.is_incremental:
            # Drop every path that the previous metadata output already lists.
            previously_processed = metadata_output.dataframe('previous')
            df = df.join(previously_processed, how="left_anti", on="path")
            # head(1) answers "is there anything left?" without counting every remaining row.
            if not df.head(1):
                ctx.abort_job()  # This stops the build schedule's infinite loop after processing all input pdfs
                # Nothing left to process: skip the sort, the write and the upload stage.
                return
    else:
        df = df.withColumn('is_incremental', F.lit(False))

    # Conditional row limiting based on the parameter
    if limit_rows:
        # Newest files first; path is the secondary sort key so the cut-off is deterministic.
        df = df.orderBy(F.col("modified").desc(), F.col("path").desc()).limit(limit)

    metadata_output.write_dataframe(df)  # Write output dataset

    # Define media set converter function
    def convert_file(current_file):
        """Put the file at ``current_file.path`` into the media set under its base name.

        ConjureHTTPError and UnrewindableBodyError are logged as warnings and not re-raised,
        so one failing file does not fail the whole task.
        """
        # NOTE(review): a file that fails here has already had its path written to
        # metadata_output above, so the anti-join presumably excludes it from later runs — confirm.
        path = current_file.path
        filename = os.path.basename(path)
        try:
            with fs.open(path, 'rb') as f:
                media_set_output.put_media_item(f, filename)
        except ConjureHTTPError:
            logger.warning("ConjureHTTPError: %s", path)
        except UnrewindableBodyError:
            logger.warning("UnrewindableBodyError: %s", path)

    # Convert to media set.
    # The upload only reads `path`, so project to that column before shuffling, and repartition
    # so the per-file uploads are spread over many tasks instead of a single partition.
    upload_partitions = 64
    df.select("path").repartition(upload_partitions).foreach(convert_file)

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.