Intermediate files on Foundry for high-memory processes

I am having trouble running HDBSCAN over my entire input dataset; I keep running out of memory. I have tested and found that I can process 25% of the input in 5% batches. I would like to run 4 separate chunks to cover the full dataset, without using a for loop over the 4. To minimize memory usage, I wanted to write intermediate files that hold each 25% of the data so that none of it is lost, and then union the 4 together. I am starting to realize that I may not know how to store the intermediates or union them correctly in Foundry.

My final output is not the DataFrame I expected; the build only produces a log file and a Spark parquet file. I do not know why it is not resulting in the unioned DataFrame I specified. Any help would be appreciated. Thank you.

  1. Import Libraries:
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import (
    ArrayType,
    StringType,
    StructField,
    StructType,
)
from sklearn.cluster import HDBSCAN
from transforms.api import Input, Output, configure, transform_df
  2. Configure Transform:
@configure(
    profile=[
        "DYNAMIC_ALLOCATION_ENABLED_32_64",
        "DRIVER_MEMORY_LARGE",
        "DRIVER_MEMORY_OVERHEAD_LARGE",
        "EXECUTOR_MEMORY_LARGE",
    ]
)
  3. Define Transform Function:
@transform_df(
    Output("path"),
    grant_appls=Input("path")
)
  4. Start Spark Session:
def compute(grant_appls, ctx):
    # start spark session
    spark = ctx.spark_session
  5. Define hdbscan Function:
    def hdbscan(agenda, ids, embeddings, eps):
        # cluster one agenda's embeddings with HDBSCAN
        model = HDBSCAN(
            min_cluster_size=2,
            min_samples=2,
            cluster_selection_epsilon=eps,
            max_cluster_size=500,
            metric="cosine",
            allow_single_cluster=False,
            leaf_size=90,
        )
        clusters = model.fit_predict(embeddings)

        # numpy array to list for spark df and grouping by clusters
        clusters = clusters.tolist()

        # assign new cluster values to noise points (-1)
        max_cluster_id = max(clusters)  # find the maximum cluster value
        new_cluster_id = max_cluster_id + 1  # increment by 1

        for i, cluster in enumerate(clusters):
            if cluster == -1:
                clusters[i] = new_cluster_id
                new_cluster_id += 1

        cluster_pk = [f"{agenda}_{str(cluster)}" for cluster in clusters]

        # dataframe for zip and grouping
        cluster_schema = StructType(
            [
                StructField("cluster_pk", StringType()),
                # each zipped row holds a single id; collect_list below builds the array
                StructField("ids", StringType()),
            ]
        )

        # create spark df from lists
        cluster_df = spark.createDataFrame(
            zip(cluster_pk, ids),
            schema=cluster_schema,
        )

        # groupby clusters for ids
        df = cluster_df.groupBy("cluster_pk").agg(
            F.collect_list("ids").alias("ids")
        )

        return df

  6. Define process_chunks Function:
    • Define a function to process data in chunks, performing clustering and writing results.
    def process_chunks(iteration_start, iteration_end, chunk_id):
        chunk_df = spark.createDataFrame([], schema=final_schema)
        cumulative_processed = 0

        for start in range(iteration_start, iteration_end, batch_size):
            end = min(start + batch_size, iteration_end)
            batch_agenda = agenda_list[start:end]

            # Process each batch
            for num in batch_agenda:
                num = str(num)  # cast to string
                df = grant_appls.where(F.col("agenda") == num)
                df_list = df.select("id", "abstract_text_embedding").collect()
                ids = [row.id for row in df_list]  # get ids as list
                embeddings = np.array(
                    [row.abstract_text_embedding for row in df_list]
                )  # get embeddings as numpy array

                if len(embeddings) > 1:
                    df = hdbscan(num, ids, embeddings, eps=0.15)
                    chunk_df = chunk_df.unionByName(df)
                else:
                    print(f"Skipping agenda {num} due to insufficient samples")

            cumulative_processed += len(batch_agenda)
            print(f"Processed {cumulative_processed} agendas")

            # Write intermediate results
            intermediate_path = ctx.intermediate_path(
                f"/path/chunk_{chunk_id}.parquet"
            )
            chunk_df.write.mode("overwrite").parquet(intermediate_path)

        return intermediate_path
  7. Set Up Schema and Data Preparation:
    • Define the schema for the final DataFrame.
    • Collect unique agendas for batching.
    # empty dataframe with the same schema as the output of hdbscan func
    final_schema = StructType(
        [
            StructField("cluster_pk", StringType()),
            StructField("ids", ArrayType(StringType())),
        ]
    )

    unique_agendas = grant_appls.select("agenda").distinct()
    agenda_list = [row.agenda for row in unique_agendas.collect()]
  8. Process Data in Batches:
    • Calculate batch size and process data in 5% chunks.
    num_agenda = len(agenda_list)
    batch_size = max(1, int(num_agenda * 0.05))  # 5% batch size
    max_batches = int(num_agenda * 0.25)

    final_df = spark.createDataFrame([], schema=final_schema)
  9. Execute Chunk Processing:
    • Call process_chunks for each batch and collect intermediate results.
    chunk1 = process_chunks(0, max_batches, 1)
    chunk2 = process_chunks(max_batches, 2 * max_batches, 2)
    chunk3 = process_chunks(2 * max_batches, 3 * max_batches, 3)
    chunk4 = process_chunks(3 * max_batches, 4 * max_batches, 4)

  10. Combine Intermediate Results:
    • Read each intermediate parquet file, union them, and form the final DataFrame.
    for chunk in [chunk1, chunk2, chunk3, chunk4]:
        chunk_df = spark.read.schema(final_schema).parquet(chunk)
        if chunk_df.count() > 0:
            final_df = final_df.unionByName(chunk_df)
        else:
            print(f"chunk {chunk} is empty")

  11. Return Final DataFrame:
    • Return the combined DataFrame as the output of the transform.
    if final_df.count() == 0:
        print("final dataframe is empty")
    else:
        print(f"final dataframe has {final_df.count()} rows")

    return final_df

Foundry doesn’t let you write a file out unless you are using the filesystem API of the Output() objects, so I think your process_chunks function is failing silently when it tries to write to file.
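If you do want to persist the four 25% chunks, the Foundry-native way is to give the transform one Output per chunk, call write_dataframe on each, and then union the resulting datasets in a downstream transform. A minimal sketch, assuming hypothetical dataset paths and only two chunks for brevity (the hash-based split and all names here are illustrative, not your actual resources):

from pyspark.sql import functions as F
from transforms.api import Input, Output, transform, transform_df


@transform(
    chunk_1_out=Output("/path/to/intermediate_chunk_1"),  # placeholder paths
    chunk_2_out=Output("/path/to/intermediate_chunk_2"),
    grant_appls=Input("/path/to/grant_appls"),
)
def compute_chunks(ctx, grant_appls, chunk_1_out, chunk_2_out):
    df = grant_appls.dataframe()
    # split the rows however suits your batching; a hash bucket is used purely for illustration
    bucket = F.abs(F.hash("agenda")) % 2
    chunk_1_out.write_dataframe(df.filter(bucket == 0))  # each Output is materialized as its own dataset
    chunk_2_out.write_dataframe(df.filter(bucket == 1))


A second transform can then take those intermediate datasets as Inputs and union them with unionByName, which replaces the spark.read.parquet calls in your current code:

@transform_df(
    Output("/path/to/clusters_unioned"),
    chunk_1=Input("/path/to/intermediate_chunk_1"),
    chunk_2=Input("/path/to/intermediate_chunk_2"),
)
def union_chunks(chunk_1, chunk_2):
    return chunk_1.unionByName(chunk_2)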

The way I have handled a need to chunk in the past is by creating a chunk group number as a new column, e.g. F.floor(F.row_number().over(some_window) / chunk_size) (row_number needs a Window spec). Then you can either use a pandas UDF via groupBy("chunk").applyInPandas(your_pandas_function, schema), or if you really want to you can loop over the chunks and union the results; if you do that, make sure you cache the original DataFrame and probably checkpoint on each loop iteration to truncate execution plan complexity. applyInPandas is probably the better way.
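For the applyInPandas route, here is a rough sketch of what the per-group function could look like. I am grouping directly by agenda (since each agenda was clustered separately in your original function) rather than by a coarser chunk column; the noise-point relabelling is omitted, the HDBSCAN parameters are trimmed to the essentials, and grant_appls is assumed to be the input DataFrame inside your transform:

import numpy as np
import pandas as pd
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
from sklearn.cluster import HDBSCAN

cluster_out_schema = StructType(
    [
        StructField("cluster_pk", StringType()),
        StructField("ids", ArrayType(StringType())),
    ]
)


def cluster_one_agenda(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds every row for one agenda and is processed entirely on one executor
    if len(pdf) < 2:
        return pd.DataFrame(columns=["cluster_pk", "ids"])
    agenda = str(pdf["agenda"].iloc[0])
    embeddings = np.vstack(pdf["abstract_text_embedding"].to_numpy())
    labels = HDBSCAN(min_cluster_size=2, min_samples=2, metric="cosine").fit_predict(embeddings)
    result = pd.DataFrame({"id": pdf["id"].to_numpy(), "cluster": labels})
    grouped = result.groupby("cluster")["id"].apply(list).reset_index()
    grouped["cluster_pk"] = agenda + "_" + grouped["cluster"].astype(str)
    return grouped.rename(columns={"id": "ids"})[["cluster_pk", "ids"]]


clustered = (
    grant_appls
    .select("agenda", "id", "abstract_text_embedding")
    .groupBy("agenda")
    .applyInPandas(cluster_one_agenda, schema=cluster_out_schema)
)

Because each group arrives as a plain pandas DataFrame on an executor, this avoids the driver-side collect() in your current loop, which is the most likely source of the memory pressure, and it also avoids building up a huge union plan on the driver.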