Thank you so much @sandpiper! df.repartition(32).foreach(convert_file) solved this issue. This reduced the foreach stage of the build from 1h 45m to 7m! This brings the total ETA from 45 days to 10-15 days due to the other stages, which I’m working on troubleshooting now. Will post updates, but also pasting latest code for reference.
# Modified from CodeStrap's post at https://community.palantir.com/t/pipeline-builder-incremental-mediasets-pdf-extraction/1279/5
from transforms.api import transform, Input, Output, incremental, configure
from transforms.mediasets import MediaSetOutput
from pyspark.sql import functions as F
import os
import logging
from conjure_python_client._http.requests_client import ConjureHTTPError
from urllib3.exceptions import UnrewindableBodyError
logger = logging.getLogger(__name__)
# Incremental transform: copies PDFs from a (snapshot) input dataset into a
# transactional media set in bounded batches, writing per-file metadata so
# files already processed in earlier runs are skipped.
@configure(profile=[
    "EXECUTOR_MEMORY_MEDIUM",
    "DYNAMIC_ALLOCATION_ENABLED",
    "DYNAMIC_ALLOCATION_MIN_16",
    "DRIVER_MEMORY_OVERHEAD_LARGE",
    "EXECUTOR_MEMORY_OVERHEAD_MEDIUM",
    "EXECUTOR_CORES_SMALL",
    "DRIVER_MEMORY_MEDIUM",
    "DRIVER_CORES_MEDIUM"])
@incremental(semantic_version=2, snapshot_inputs=["pdfs_input"], v2_semantics=True)
@transform(
    metadata_output=Output("..."),
    # NOTE(review): the two values below were markdown placeholders in the
    # original paste; fill in the real RIDs/paths before running.
    media_set_output=MediaSetOutput("<transactional-media-set-rid>"),  # TODO: real media set
    pdfs_input=Input("<4M-pdfs-dataset-rid>"),  # TODO: real input dataset
)
def compute(ctx, pdfs_input, metadata_output, media_set_output, limit_rows=True, limit=10000):
    """Process up to `limit` not-yet-seen PDFs per run.

    Each run: lists the input files, anti-joins against the metadata already
    written (when running incrementally) to find new files, writes the batch's
    metadata, then uploads each file to the media set in parallel on the
    executors. When no unprocessed files remain, the job is aborted to break
    the schedule's rebuild loop.

    Args:
        ctx: transform context; `ctx.is_incremental` drives the anti-join.
        pdfs_input: input dataset containing the raw PDF files.
        metadata_output: dataset tracking which paths have been processed.
        media_set_output: transactional media set receiving the PDFs.
        limit_rows: when True, cap each run to `limit` files.
        limit: maximum number of files to process per run.
    """
    fs = pdfs_input.filesystem()
    df = fs.files()  # one metadata row per input file (includes 'path', 'modified')

    # 'modified' arrives as epoch milliseconds; convert to a real timestamp (UTC).
    df = df.withColumn(
        'modified',
        F.from_unixtime(F.col('modified') / 1000).cast("timestamp")
    )
    df = df.withColumn('processed_at', F.current_timestamp())  # audit stamp for this run

    if hasattr(ctx, '_is_incremental'):
        df = df.withColumn('is_incremental', F.lit(ctx.is_incremental))
        if ctx.is_incremental:
            # Keep only files whose path has never been written to the
            # metadata output, so each PDF is converted exactly once.
            previous_df = metadata_output.dataframe('previous')
            df = df.join(previous_df, how="left_anti", on="path")
            if df.count() == 0:
                # All inputs processed: stop the schedule's infinite build loop.
                ctx.abort_job()
    else:
        # Context has no incremental support; record that explicitly.
        df = df.withColumn('is_incremental', F.lit(False))

    # Bound the batch size so a single run stays manageable; newest files first.
    if limit_rows:
        df = df.orderBy(F.col("modified").desc(), F.col("path").desc()).limit(limit)

    metadata_output.write_dataframe(df)  # record this batch before uploading

    def convert_file(current_file):
        # Upload one PDF to the media set. Best-effort: the two transient
        # upload errors seen in practice are logged and skipped so a single
        # bad file cannot fail the whole batch.
        try:
            with fs.open(current_file.path, 'rb') as f:
                filename = os.path.basename(current_file.path)
                media_set_output.put_media_item(f, filename)
        except ConjureHTTPError:
            logger.warning("ConjureHTTPError: %s", current_file.path)
        except UnrewindableBodyError:
            logger.warning("UnrewindableBodyError: %s", current_file.path)

    # Fan uploads out across executors; 64 partitions spreads the I/O without
    # overwhelming the media set endpoint.
    df.repartition(64).foreach(convert_file)