I am having trouble running HDBSCAN on my entire input dataset; I keep running out of memory. Through testing I found that I can process 25% of the input at a time in 5% batches, so I would like to run 4 separate chunks to cover the full dataset, without wrapping those 4 calls in a for loop. To keep memory usage low and make sure no data is lost, I want to write each 25% chunk to an intermediate file and then union the 4 results. I am starting to realize that I may not know how to store the intermediates or union them correctly in Foundry.
My final dataset is not a DataFrame; the output is just a log file and a Spark parquet file. I do not understand why the transform is not producing the unioned DataFrame I specified. Any help would be appreciated. Thank you.
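For context, here is a toy sketch, separate from the transform below, of what I expect the final unioned output to look like (hypothetical cluster keys and ids, not my real data): one row per cluster, with the member ids collected into an array.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# two chunk results with the same (cluster_pk, ids) schema, then a union
chunk_a = spark.createDataFrame(
    [("A12_0", ["id1", "id2"])], "cluster_pk string, ids array<string>"
)
chunk_b = spark.createDataFrame(
    [("B07_0", ["id3", "id4"])], "cluster_pk string, ids array<string>"
)
expected = chunk_a.unionByName(chunk_b)
expected.show(truncate=False)  # 2 rows, one per cluster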
- Import Libraries:
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import (
ArrayType,
StringType,
StructField,
StructType,
)
from sklearn.cluster import HDBSCAN
from transforms.api import Input, Output, configure, transform_df
- Configure Transform:
@configure(
profile=[
"DYNAMIC_ALLOCATION_ENABLED_32_64",
"DRIVER_MEMORY_LARGE",
"DRIVER_MEMORY_OVERHEAD_LARGE",
"EXECUTOR_MEMORY_LARGE",
]
)
- Define Transform Function:
@transform_df(
Output("path"),
grant_appls=Input("path")
)
- Start Spark Session:
def compute(grant_appls, ctx):
# start spark session
spark = ctx.spark_session
- Define hdbscan Function:
def hdbscan(agenda, ids, embeddings, eps):
model = HDBSCAN(
min_cluster_size=2,
min_samples=2,
cluster_selection_epsilon=eps,  # use the eps argument rather than a hardcoded value
max_cluster_size=500,
metric="cosine",
allow_single_cluster=False,
leaf_size=90,
)
clusters = model.fit_predict(embeddings)
# numpy array to list for spark df and grouping by clusters
clusters = clusters.tolist()
# assign new cluster values to noise points (-1)
max_cluster_id = max(clusters) # find the maximum cluster value
new_cluster_id = max_cluster_id + 1 # increment by 1
for i, cluster in enumerate(clusters):
if cluster == -1:
clusters[i] = new_cluster_id
new_cluster_id += 1
cluster_pk = [f"{agenda}_{str(cluster)}" for cluster in clusters]
# per-row (cluster_pk, id) pairs for zipping and grouping;
# ids is a plain StringType here because each row holds a single id,
# and collect_list below gathers them into an array
cluster_schema = StructType(
[
StructField("cluster_pk", StringType()),
StructField("ids", StringType()),
]
)
# create spark df from the zipped lists
cluster_df = spark.createDataFrame(
list(zip(cluster_pk, ids)),
schema=cluster_schema,
)
# groupby clusters for ids
df = cluster_df.groupBy("cluster_pk").agg(
F.collect_list("ids").alias("ids")
)
return df
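To show what I intend the noise handling above to do, here is a toy walk-through with made-up labels (not my real clusters): every -1 noise point gets its own new, unique cluster id.
labels = [0, -1, 1, -1, 0]
new_id = max(labels) + 1  # 2
relabelled = []
for lab in labels:
    if lab == -1:
        relabelled.append(new_id)  # give each noise point its own cluster
        new_id += 1
    else:
        relabelled.append(lab)
print(relabelled)  # [0, 2, 1, 3, 0]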
- Define process_chunks Function: a function that processes the data in chunks, performing clustering and writing intermediate results.
def process_chunks(iteration_start, iteration_end, chunk_id):
chunk_df = spark.createDataFrame([], schema=final_schema)
cumulative_processed = 0
for start in range(iteration_start, iteration_end, batch_size):
end = min(start + batch_size, iteration_end)
batch_agenda = agenda_list[start:end]
# Process each batch
for num in batch_agenda:
num = str(num) # cast to string
df = grant_appls.where(F.col("agenda") == num)
df_list = df.select("id", "abstract_text_embedding").collect()
ids = [row.id for row in df_list] # get ids as list
embeddings = np.array(
[row.abstract_text_embedding for row in df_list]
) # get embeddings as numpy array (matches the selected column name)
if len(embeddings) > 1:
df = hdbscan(num, ids, embeddings, eps=0.15)
chunk_df = chunk_df.unionByName(df)
else:
print(f"Skipping agenda {num} due to insufficient samples")
cumulative_processed += len(batch_agenda)
print(f"Processed {cumulative_processed} agendas")
# Write intermediate results
intermediate_path = ctx.intermediate_path(
f"/path/chunk_{chunk_id}.parquet"
)
chunk_df.write.mode("overwrite").parquet(intermediate_path)
return intermediate_path
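For reference, this is roughly how I picture a single small call to this helper being checked (hypothetical chunk id 0; batch_size and final_schema are defined in the setup further down).
# hypothetical single-batch check, assuming batch_size and final_schema
# are already defined as in the setup below
path_0 = process_chunks(0, batch_size, 0)  # cluster only the first 5% batch
check_df = spark.read.schema(final_schema).parquet(path_0)
check_df.show(5, truncate=False)  # expect (cluster_pk, ids) rows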
- Set Up Schema and Data Preparation:
- Define the schema for the final DataFrame.
- Collect unique agendas for batching.
# empty dataframe with the same schema as the output of hdbscan func
final_schema = StructType(
[
StructField("cluster_pk", StringType()),
StructField("ids", ArrayType(StringType())),
]
)
unique_agendas = grant_appls.select("agenda").distinct()
agenda_list = [row.agenda for row in unique_agendas.collect()]
- Process Data in Batches:
- Calculate the 5% batch size and the 25% chunk size; a worked example follows the code below.
num_agenda = len(agenda_list)
batch_size = max(1, int(num_agenda * 0.05)) # 5% batch size
max_batches = int(num_agenda * 0.25) # number of agendas in each 25% chunk
final_df = spark.createDataFrame([], schema=final_schema)
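As a worked example of the batching arithmetic, assuming a hypothetical 200 distinct agendas (not my real count):
# hypothetical worked example (example counts, not my real data)
num_agenda_example = 200
batch_size_example = max(1, int(num_agenda_example * 0.05))   # 10 agendas per 5% batch
max_batches_example = int(num_agenda_example * 0.25)          # 50 agendas per 25% chunk
chunk_ranges = [
    (i * max_batches_example, (i + 1) * max_batches_example) for i in range(4)
]
print(chunk_ranges)  # [(0, 50), (50, 100), (100, 150), (150, 200)]
# if num_agenda were not a multiple of 4, the last range would stop short of num_agenda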
- Execute Chunk Processing:
- Call process_chunks for each of the 4 chunks and collect the intermediate paths.
chunk1 = process_chunks(0, max_batches, 1)
chunk2 = process_chunks(max_batches, 2 * max_batches, 2)
chunk3 = process_chunks(2 * max_batches, 3 * max_batches, 3)
chunk4 = process_chunks(3 * max_batches, 4 * max_batches, 4)
- Combine Intermediate Results:
- Read each intermediate parquet file, union them, and form the final DataFrame.
for chunk in [chunk1, chunk2, chunk3, chunk4]:
chunk_df = spark.read.schema(final_schema).parquet(chunk)
if chunk_df.count() > 0:
final_df = final_df.unionByName(chunk_df)
else:
print(f"chunk {chunk} is empty")
- Return Final DataFrame:
- Return the combined DataFrame as the output of the transform.
if final_df.count() == 0:
print("final dataframe is empty")
else:
print(f"final dataframe has {final_df.count()} rows")
return final_df