Optimizing PDF Transformation Workflow: How to Process Only the Latest Uploaded Document in a Media Set?

Hello,

I am developing a workshop where users can upload PDF documents, which are then processed and transformed in the backend. Currently, when a PDF is uploaded, it is placed into a media set, and the entire media set undergoes transformation. This approach is both time-consuming and resource-intensive.

I am seeking advice on how to modify this workflow so that only the most recently uploaded PDF is transformed, rather than reprocessing the entire media set. Given that media sets do not support incremental pipeline builds, I am looking for suggestions on how to achieve more efficient, incremental processing.

Hey @etuhabonye the media set team will be working on adding incremental media set support in the coming weeks.

In the meantime potentially you could upload the new sets of documentations separately and union those together. I’ll also let the media set team comment on the above and see if there is another workaround that’s more preferred

Hi @helenq,

Following up on this, is there a way to incrementally build a media set? I just gave it a try and received this error, but I’m not sure if there’s another method I should try:

Full code:

from transforms.api import transform, Input, incremental  # configure, ComputeBackend
from transforms.mediasets import MediaSetOutput


# @configure(profile=["EXECUTOR_MEMORY_MEDIUM", "EXECUTOR_MEMORY_OFFHEAP_FRACTION_HIGH"], backend=ComputeBackend.VELOX)
@incremental()
@transform(
    pdfs_dataset=Input("ri.foundry.main.dataset.acffb422-8df5-43a4-b26e-541ba8965715"),
    pdfs_media_set=MediaSetOutput(
        "ri.mio.main.media-set.6c60b58f-1186-4e54-bbdd-8fe57c6b569f"
    ),
)
def upload_to_media_set(pdfs_dataset, pdfs_media_set):
    pdfs_media_set.put_dataset_files(
        pdfs_dataset, ignore_items_not_matching_schema=False
    )

Thanks,
Joel

Hey Joel, this should be possible in Transforms Python using the incremental decorator with v2_semantics=true

Progress! v2_semantics=True successfully built my first incremental media set. Code below:

from transforms.api import transform, Input, incremental  # configure, ComputeBackend
from transforms.mediasets import MediaSetOutput


# @configure(profile=["EXECUTOR_MEMORY_MEDIUM", "EXECUTOR_MEMORY_OFFHEAP_FRACTION_HIGH"], backend=ComputeBackend.VELOX)
@incremental(v2_semantics=True)
@transform(
    pdfs_dataset=Input("{test dataset}"),
    pdfs_media_set=MediaSetOutput(
        "{transactional media set}"
    ),
)
def upload_to_media_set(pdfs_dataset, pdfs_media_set):
    pdfs_media_set.put_dataset_files(
        pdfs_dataset, ignore_items_not_matching_schema=False
    )

That worked on a small, test input dataset. Now I need to find a solution for my larger dataset. (I’m testing next on 20K PDFs; the full dataset is ~4M PDFs.) The code needs to be updated because the larger input is too big for the 10K limit per transaction:

How do I build incrementally with a transactionless media set output? Just changing the output in the code above gives this error:

Adding should_snapshot=True (below) gives the below error:

from transforms.api import transform, Input, incremental  # configure, ComputeBackend
from transforms.mediasets import MediaSetOutput


# @configure(profile=["EXECUTOR_MEMORY_MEDIUM", "EXECUTOR_MEMORY_OFFHEAP_FRACTION_HIGH"], backend=ComputeBackend.VELOX)
@incremental(v2_semantics=True)
@transform(
    pdfs_dataset=Input("ri.foundry.main.dataset.acffb422-8df5-43a4-b26e-541ba8965715"),
    pdfs_media_set=MediaSetOutput(
        "ri.mio.main.media-set.6c60b58f-1186-4e54-bbdd-8fe57c6b569f",
        should_snapshot=True
    ),
)
def upload_to_media_set(pdfs_dataset, pdfs_media_set):
    pdfs_media_set.put_dataset_files(
        pdfs_dataset, ignore_items_not_matching_schema=False
    )

When I try should_snapshot=False instead, I get this error:

1 Like

Update: more progress. After initializing a new transactionless media set, the incremental code was consistently failing with the “Media set output should be snapshotted” error. The fix was running the below, non-incremental code once, and then running the incremental code. I am new to incremental builds, and I don’t know why I had to run it non-incrementally first.

Up next is I’ll try to incrementally add only 100K of the input’s 4M PDFs. I’ll try an approach similar to this. Please comment if there’s any ideas here.

Non-incremental code:

from transforms.api import transform, Input  # , incremental  # configure, ComputeBackend
from transforms.mediasets import MediaSetOutput


# @configure(profile=["EXECUTOR_MEMORY_MEDIUM", "EXECUTOR_MEMORY_OFFHEAP_FRACTION_HIGH"], backend=ComputeBackend.VELOX)
# @incremental(v2_semantics=True)
@transform(
    pdfs_dataset=Input("{Test dataset with 1 PDF}"),
    pdfs_media_set=MediaSetOutput(
        "ri.mio.main.media-set.543a1b44-5041-4fa1-af79-9bcd26e20110"
        # , should_snapshot=False
    )
)
def upload_to_media_set(pdfs_dataset, pdfs_media_set):
    pdfs_media_set.put_dataset_files(
        pdfs_dataset, ignore_items_not_matching_schema=False
    )

Incremental code:

from transforms.api import transform, Input , incremental  # configure, ComputeBackend
from transforms.mediasets import MediaSetOutput


# @configure(profile=["EXECUTOR_MEMORY_MEDIUM", "EXECUTOR_MEMORY_OFFHEAP_FRACTION_HIGH"], backend=ComputeBackend.VELOX)
@incremental(v2_semantics=True)
@transform(
    pdfs_dataset=Input("{Test dataset with 10 PDFs}"),
    pdfs_media_set=MediaSetOutput(
        "ri.mio.main.media-set.543a1b44-5041-4fa1-af79-9bcd26e20110"
        , should_snapshot=False
    )
)
def upload_to_media_set(pdfs_dataset, pdfs_media_set):
    pdfs_media_set.put_dataset_files(
        pdfs_dataset, ignore_items_not_matching_schema=False
    )