The general idea is that you can “manually” filter for the exact cut of files you want to process, and force the write mode on the outputs via set_mode("modify") on datasets and set_write_mode("modify") on mediasets. This is respected even when an incremental transform “wants” to snapshot for some reason.
If the incremental transform tries to snapshot (e.g. because you bumped the semantic version), you can still write your output as an append!
However, reading your output will yield an empty dataframe (e.g. my_output.filesystem().files() will be empty).
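A minimal sketch of that pattern (the RIDs here are placeholders; the two set-mode calls are the important part, and full working examples follow below):

from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


@incremental(v2_semantics=True, semantic_version=2)  # the bumped semantic_version would normally force a snapshot
@transform(
    my_output=Output("ri.foundry.main.dataset.<placeholder>"),
    my_output_mediaset=MediaSetOutput("ri.mio.main.media-set.<placeholder>"),
    my_input_mediaset=MediaSetInput("ri.mio.main.media-set.<placeholder>"),
)
def compute(ctx, my_input_mediaset, my_output, my_output_mediaset):
    # Force an append-style write even though the transform "wants" to snapshot
    my_output.set_mode("modify")
    my_output_mediaset.set_write_mode("modify")
    ...  # filter and process the exact cut of files you want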
Case 1 - with a mediaset as input
The exact code snippet to do so is:
list_media_filtered: DataFrame = input_mediaset.list_media_items(mode="current").filter(F.col("logicalOrdering") >= 1742579979575000)
This allows you to filter the items of the mediaset by timestamp, and thus to control granularly what gets included.
To obtain the timestamp, you can create a quick Pipeline Builder pipeline and use the “convert media sets to table rows” transform.
You need to append three zeroes to the obtained timestamp (Pipeline Builder produces a millisecond timestamp, while logicalOrdering is in microseconds)!
- Correct: 1742495355003950
- Incorrect: 1742546845951
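If you prefer to compute the cutoff in code rather than copy-pasting it, here is a small sketch (assuming, as the values above suggest, that Pipeline Builder gives a millisecond epoch timestamp while logicalOrdering is in microseconds):

from datetime import datetime, timezone

# Millisecond timestamp as obtained from Pipeline Builder ("convert media sets to table rows")
pipeline_builder_ts_ms = 1742546845951

# logicalOrdering is three orders of magnitude finer: append three zeroes, i.e. multiply by 1000
logical_ordering_cutoff = pipeline_builder_ts_ms * 1000  # 1742546845951000

# Alternatively, derive it from a human-readable date (example date, adjust as needed)
cutoff_dt = datetime(2025, 3, 21, tzinfo=timezone.utc)
logical_ordering_cutoff_from_date = int(cutoff_dt.timestamp() * 1_000_000)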
The transform below copies files from an input mediaset to an output mediaset, and logs each file in an additional output dataset.
import logging

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

from transforms.api import Input, Output, incremental, transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


def process_normally(list_of_medias, input_mediaset, output_mediaset, output_dataset):
    # See https://www.palantir.com/docs/foundry/transforms-python/media-sets
    def upload_pdfs(media_item_rid, path):
        with input_mediaset.get_media_item(media_item_rid) as curr_file:
            response = output_mediaset.put_media_item(curr_file, path)
            return response.media_item_rid

    upload_udf = F.udf(upload_pdfs, StringType())
    media_reference_template = output_mediaset.media_reference_template()

    uploaded_pdfs = (
        list_of_medias.withColumn("uploaded_media_item_rid", upload_udf(F.col("mediaItemRid"), F.col("path")))
        .select("path", "uploaded_media_item_rid")
        .withColumn("mediaReference", F.format_string(media_reference_template, "uploaded_media_item_rid"))
    )

    # Add current timestamp as a new column
    uploaded_pdfs = uploaded_pdfs.withColumn("current_ts", F.current_timestamp())

    column_typeclasses = {"mediaReference": [{"kind": "reference", "name": "media_reference"}]}
    output_dataset.write_dataframe(uploaded_pdfs, column_typeclasses=column_typeclasses)


def fix_pointers(input_mediaset, output_mediaset, output_dataset):
    list_media_filtered: DataFrame = input_mediaset.list_media_items(mode="current").filter(
        F.col("logicalOrdering") >= 1742579979575000
    )

    logging.warning(f"show : {list_media_filtered.collect()}")
    logging.warning(f"column : {list_media_filtered.columns}")

    # As the transform is in "incremental" mode, but will try to snapshot the output,
    # we enforce an "append" on the outputs
    output_dataset.set_mode("modify")
    output_mediaset.set_write_mode("modify")

    # do the code
    process_normally(list_media_filtered, input_mediaset, output_mediaset, output_dataset)


@incremental(v2_semantics=True, semantic_version=2)
@transform(
    output_dataset=Output("ri.foundry.main.dataset.11eff7fb-e9c6-4d13-9344-04736e6f46b6"),
    output_mediaset=MediaSetOutput("ri.mio.main.media-set.f60d8552-7d45-44b0-8752-d017e40526f1"),
    input_mediaset=MediaSetInput("ri.mio.main.media-set.bbcb55bc-eafc-4d34-80b9-3a010d324c0b"),
)
def compute(ctx, input_mediaset, output_dataset, output_mediaset):
    # Do the normal process like "everyday"
    # list_of_medias = input_mediaset.list_media_items_by_path(ctx)
    # process_normally(list_of_medias, input_mediaset, output_mediaset, output_dataset)

    # Alternative processing:
    fix_pointers(input_mediaset, output_mediaset, output_dataset)
Case 2 - with a dataset as input
The important line is input_dataset.filesystem().files().filter(F.col("modified") > 1742831163000), which filters the raw input files down to those modified “after” a specific timestamp. Of course, the transform needs to operate at the file level (the example below uses a UDF over filesystem().files(); a mapPartitions would work as well).
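To pick the cutoff value itself, a small sketch for deriving it from a date (the 13-digit value above suggests that modified is an epoch timestamp in milliseconds; the date below is only an example):

from datetime import datetime, timezone

# Example date: only process files modified after this point (adjust as needed)
cutoff_dt = datetime(2025, 3, 24, tzinfo=timezone.utc)

# The "modified" column of filesystem().files() is an epoch timestamp in milliseconds
modified_cutoff_ms = int(cutoff_dt.timestamp() * 1000)

# Plugs into: input_dataset.filesystem().files().filter(F.col("modified") > modified_cutoff_ms)

The full transform: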
import logging
import os

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import StringType

from transforms.api import Input, Output, incremental, transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


def process_normally(input_dataset, output_mediaset, output_dataset):
    # See https://www.palantir.com/docs/foundry/transforms-python/media-sets
    # all_files = list(input_dataset.filesystem().ls())
    # for current_file in all_files:
    #     with input_dataset.filesystem().open(current_file.path, 'rb') as f:
    #         filename = os.path.basename(current_file.path)
    #         response = output_mediaset.put_media_item(f, filename)
    #         # return response.media_item_rid

    def upload_pdf(current_file):
        with input_dataset.filesystem().open(current_file, "rb") as f:
            filename = os.path.basename(current_file)
            response = output_mediaset.put_media_item(f, filename)
            return response.media_item_rid

    upload_udf = F.udf(upload_pdf, T.StringType())
    media_reference_template = output_mediaset.media_reference_template()

    uploaded_pdfs = (
        input_dataset.filesystem().files()
        .filter(F.col("modified") > 1742831163000)
        .withColumn("uploaded_media_item_rid", upload_udf(F.col("path")))
        .select("path", "uploaded_media_item_rid")
        .withColumn("mediaReference", F.format_string(media_reference_template, "uploaded_media_item_rid"))
    )

    # Add current timestamp as a new column
    uploaded_pdfs = uploaded_pdfs.withColumn("current_ts", F.current_timestamp())

    column_typeclasses = {"mediaReference": [{"kind": "reference", "name": "media_reference"}]}
    output_dataset.write_dataframe(uploaded_pdfs, column_typeclasses=column_typeclasses)


def fix_pointers(input_dataset, output_mediaset, output_dataset):
    # Note: the filtering could happen here instead:
    # input_dataset.filesystem().files().filter(F.col("modified") > 1742831163000)

    # As the transform is in "incremental" mode, but will try to snapshot the output,
    # we enforce an "append" on the outputs
    output_dataset.set_mode("modify")
    output_mediaset.set_write_mode("modify")

    # do the code
    process_normally(input_dataset, output_mediaset, output_dataset)


@incremental(v2_semantics=True, semantic_version=3)
@transform(
    output_dataset=Output("ri.foundry.main.dataset.16d0fa2e-ff21-4341-bfed-6ce961984b8c"),
    output_mediaset=MediaSetOutput("ri.mio.main.media-set.39b7c986-1ca7-4256-ab55-0a61cbaa7143"),
    input_dataset=Input("ri.foundry.main.dataset.f20ec271-7b9b-4961-bda7-1b7f965b3a12"),
)
def compute(ctx, input_dataset, output_dataset, output_mediaset):
    # Do the normal process like "everyday"
    # process_normally(input_dataset, output_mediaset, output_dataset)

    # Alternative processing:
    fix_pointers(input_dataset, output_mediaset, output_dataset)