In short: You can process chunk by chunk, with some custom code.
The answer I’ll give is not LLM-specific; this kind of need usually only appears once you have a very large volume of data (TBs). With LLMs, given that the per-row processing is fairly slow, you can indeed hit the same kind of issue much earlier.
If you use Code Repositories, you can “distill” an incremental pipeline into multiple builds.
The high-level principle is fairly simple. You will have 3 datasets:
- [A] The original dataset with all the data
- [B] The “distilling” dataset - which will have rows appended “by chunk”, e.g. 10k by 10k.
- [C] The output dataset, with the result of the computation, always appending
The idea is that you regularly trigger a build containing B and C: on each run, a batch of new rows is appended to B, and C processes just those rows. Repeat as many times as needed (a sketch of what C could look like is given at the end of this answer).
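Before the code itself, the core trick is worth isolating: B finds the rows it has not emitted yet by diffing the full input against its own previous output, which in Spark is a LEFT ANTI join (A - B). A standalone toy illustration in plain PySpark (made-up data, nothing Foundry-specific):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Everything we have vs. what was already appended in previous builds
all_rows = spark.createDataFrame(
    [("a.txt", "hello"), ("b.txt", "world"), ("c.txt", "again")],
    ["path", "text"],
)
already_appended = spark.createDataFrame(
    [("a.txt", "hello")],
    ["path", "text"],
)

# LEFT ANTI join: keep only the rows of all_rows whose "path" has no match in already_appended
remaining = all_rows.join(already_appended, on=["path"], how="left_anti")
remaining.show()  # only the b.txt and c.txt rows remain
```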
Example code that defines dataset B:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from transforms.api import Input, Output, incremental, transform


# We use @transform (rather than @transform_df) to have more control over inputs and outputs.
@incremental(snapshot_inputs=["input_dataset"], semantic_version=1)
@transform(
    output_dataset=Output(".../rows_by_chunk"),
    input_dataset=Input(".../all_rows"),
)
def compute_snapshot_to_incremental(ctx, input_dataset, output_dataset):
    # We enforce reading the input dataframe as a snapshot,
    # so that if it were to one day run as incremental, the logic here won't break.
    input_df_all_dataframe = input_dataset.dataframe(mode="current")

    if ctx.is_incremental:
        # We read the current output to see what we already processed in previous builds.
        # Note: we have to specify the schema for the first run.
        # You can get the schema below from the dataset preview > Schema > Copy > as Python StructTypes.
        # Here I leave the schema of a media set for reference.
        out_schema = T.StructType([
            T.StructField("mediaItemRid", T.StringType()),
            T.StructField("path", T.StringType()),
            T.StructField("mediaReference", T.StringType()),
            T.StructField("text", T.StringType()),
            T.StructField("first_line", T.StringType()),
            T.StructField("incremental_ts", T.TimestampType(), True),
        ])
        output_df_previous_dataframe = output_dataset.dataframe("current", out_schema)

        # We diff the input with the current output to find the "new rows".
        # We do this with a LEFT ANTI join: A - B <==> A LEFT ANTI B
        KEY = ["path"]  # the primary key of the rows
        new_rows_df = input_df_all_dataframe.join(output_df_previous_dataframe, how="left_anti", on=KEY)
    else:
        # On the first run, every input row is new
        new_rows_df = input_df_all_dataframe

    # We add a timestamp for easier tracking/debugging/understanding of the example
    new_rows_df = new_rows_df.withColumn("incremental_ts", F.current_timestamp())

    # We limit each batch to 1,000 rows
    new_rows_df = new_rows_df.limit(1000)
    # ==== End of example processing ====

    # This will append the rows to the output
    output_dataset.set_mode("modify")
    output_dataset.write_dataframe(new_rows_df)
1000 rows will be added on each run (configurable).
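For completeness, here is a minimal sketch of what dataset C could look like: an incremental transform downstream of B that, on each build, only sees the rows just appended to B. The output path and the processing step (a simple withColumn standing in for your actual LLM call) are hypothetical placeholders:

```python
from pyspark.sql import functions as F
from transforms.api import Input, Output, incremental, transform


@incremental(semantic_version=1)
@transform(
    output_dataset=Output(".../processed_rows"),  # dataset C (hypothetical path)
    input_dataset=Input(".../rows_by_chunk"),     # dataset B, defined above
)
def compute_chunk(input_dataset, output_dataset):
    # On incremental runs the default read mode is "added":
    # we only get the rows appended to B by the latest build.
    new_rows_df = input_dataset.dataframe()

    # ==== Your per-row processing goes here (e.g. an LLM call wrapped in a UDF) ====
    processed_df = new_rows_df.withColumn("processed_ts", F.current_timestamp())

    # Append the processed rows to C
    output_dataset.set_mode("modify")
    output_dataset.write_dataframe(processed_df)
```

Put B and C on the same schedule, so that each trigger appends one more chunk to B and processes it into C, until B has caught up with A.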