We’ve tried creating a code repo to take a snapshot of data using an output from Pipeline Builder, which is placed in a particular folder and given a dynamic filename: "snapshot" concatenated with the date in yyyymmdd format. The initial build was successful and the snapshot is present in the folder as expected. Please see the code below (I’ve anonymised certain bits using XXXX)…
```python
import polars as pl
from datetime import datetime

from transforms.api import transform, lightweight, Input, LightweightInput, Output, LightweightOutput


@lightweight
@transform(
    equipment_catalogue=Input("ri.foundry.main.dataset.XXXX"),
    assets_snapshot=Output(f"/some/path/snapshot_{datetime.today().strftime('%Y%m%d')}"),
)
def compute(equipment_catalogue: LightweightInput, assets_snapshot: LightweightOutput):
    # Load the input dataset as a Polars DataFrame
    df = equipment_catalogue.polars()

    # Write the DataFrame to the dynamically constructed output dataset
    assets_snapshot.write_table(df)
```
I then created a schedule against the newly created snapshot in Data Lineage so that it would build regularly on the last day of every month at 11 PM. It failed on Sunday evening; please see the AIP log explanation below (XXXX has been used to replace IDs etc.)…
"The error indicates a KeyError when trying to retrieve a specific container transform from the self._container_transforms_compute_functions dictionary. The key 'myproject.datasets.lightweight-transform:compute:XXXX' was not found in the dictionary. This likely occurs because the container transform referenced was not properly registered in the pipeline or the key being used is incorrect."
Could someone please help me understand what’s gone wrong here?
Hi, you can’t dynamically create a dataset at run time in Foundry. The output URL in your example is resolved at CI/CD time, so it will not generate a new dataset on every build. For your pattern you might consider an alternative such as:
- Keep a changelog of your dataset rather than distinct copies (primary key, updated at, deleted, etc.)
- Store a dataset of zipped snapshots of your dataset
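To illustrate the changelog idea, here is a minimal sketch in plain Python (the function name, row shape, and column names are my own assumptions for illustration, not Foundry APIs): it diffs two snapshots keyed by primary key and emits only the rows that were added, changed, or deleted, stamped with an updated_at date and a deleted flag.

```python
from datetime import date


def changelog_rows(previous, current, updated_at):
    """Diff two {primary_key: row} snapshots into changelog rows.

    Emits one row per key that was added, changed, or deleted,
    carrying the primary key, an updated_at stamp, and a deleted flag.
    """
    rows = []
    for key, row in current.items():
        if key not in previous or previous[key] != row:
            # New or changed record since the last snapshot
            rows.append({"pk": key, **row, "updated_at": updated_at, "deleted": False})
    for key, row in previous.items():
        if key not in current:
            # Record removed since the last snapshot
            rows.append({"pk": key, **row, "updated_at": updated_at, "deleted": True})
    return rows


previous = {1: {"name": "pump"}, 2: {"name": "valve"}}
current = {1: {"name": "pump mk2"}, 3: {"name": "gauge"}}
log = changelog_rows(previous, current, date(2025, 3, 31))
# One changed row (pk 1), one added row (pk 3), one deleted row (pk 2)
```

Unchanged rows are never re-emitted, so the changelog stays small while still letting you reconstruct the table's state at any past month.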
Many thanks for your prompt, clear reply. I also appreciate the suggested alternatives.
To give you some more context, the purpose of these snapshot datasets is to capture the state of records in a table at the end of every month so that certain metrics can be calculated for KPIs etc. One example is the number of items in a given inventory: the variation in inventory size is tracked across the year on a monthly basis, and these snapshots allow us to do this. The intent was to let these datasets accumulate (as we might need to use them retrospectively, or combine a few to produce quarterly figures) and delete them once older ones are no longer required.
This original method was suggested to us by a Foundry expert, and in their initial code the dynamic filename didn’t include the day element (it was just yyyy/mm). From what you’ve said, it sounds like that would still have failed anyway?
Now that I’ve added a little more context, could you please suggest which method you would recommend as the best fit for our purpose?
- Incrementally append to the initial monthly snapshot of the dataset you created in /snapshots. This way, you’d have a single dataset that grows once per month, and you could reference a row across different snapshots with your snapshotted_at yyyy/mm column.
- Materialise the downstream object set at the end of every month, producing a standalone dataset. You could set up an automation that does this.
Hope it leads to something! I once had a use case where I incrementally appended the materialised dataset to a dataset backing another object type. I then used that object type to let users compare equipment stock levels in the current month to stock levels in any previous month. The same could be done for a forecast of stock levels that takes planned purchases into account.
We’ve decided to use the option below as it works for our needs…
Incrementally append to the initial monthly snapshot of the dataset you created in /snapshots. This way, you’d have a single dataset that grows once per month and you could reference a row across different snapshots with your snapshotted_at yyyy/mm column.
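For anyone landing on this thread later, the append-only pattern can be sketched in plain Python (the helper names and the snapshotted_at column are assumptions for illustration; in Foundry you would implement this as an incremental transform writing to a single, statically named output):

```python
def stamp_snapshot(rows, snapshot_month):
    """Tag every row of the current table with the month it was captured."""
    return [{**row, "snapshotted_at": snapshot_month} for row in rows]


def append_snapshot(accumulated, rows, snapshot_month):
    """Append this month's stamped rows to the single accumulating dataset.

    Skips the append if the month is already present, so a re-run of the
    monthly schedule does not duplicate a snapshot.
    """
    if any(r["snapshotted_at"] == snapshot_month for r in accumulated):
        return accumulated
    return accumulated + stamp_snapshot(rows, snapshot_month)


dataset = []
dataset = append_snapshot(dataset, [{"item": "pump"}], "2025/01")
dataset = append_snapshot(dataset, [{"item": "pump"}, {"item": "valve"}], "2025/02")
dataset = append_snapshot(dataset, [{"item": "pump"}], "2025/02")  # idempotent re-run
# dataset now holds 3 rows: one for 2025/01 and two for 2025/02
```

Filtering on snapshotted_at then recovers any single month, and grouping by it gives the month-over-month inventory counts described above.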