Can I read/write to my output with lightweight transforms?

I would like to read from and write to the output of a given lightweight transform. How can I do so?

Here are a few examples of reading from and writing to an output in a lightweight transform, whether the data is tabular or stored directly as files.

In the simplest example, you read the output as a tabular dataframe, operate on it, and write a new version (here I replace the whole output, but it could be an append if need be).

import polars as pl
from transforms.api import transform, lightweight, Output, Input, LightweightOutput, LightweightInput, incremental
import logging

@lightweight()
@incremental()
@transform(
    output_dataset=Output("/path/tabular_incremental_state")
)
def compute_1(ctx, output_dataset: LightweightOutput):
    """Keep a "state" counter in the output dataset's own table.

    On the first (non-incremental) build, a single-row table with ``state = 0``
    is written. On every later incremental build, the current output is read
    back as an eager polars DataFrame, every ``state`` value is bumped by one,
    and the result replaces the whole output.
    """
    if not ctx.is_incremental:
        # First build: there is no previous output to read, so seed the state.
        output_dataset.write_table(pl.DataFrame({"state": [0]}))
        return

    previous = output_dataset.polars(lazy=False)
    bumped = previous.with_columns((pl.col("state") + 1).alias("state"))
    # Overwrite the output instead of appending, so rows are not accumulated.
    output_dataset.set_mode("replace")
    output_dataset.write_table(bumped)

We can do the same at the file level: write a file to the output dataset and read it back from the output on a later run. If the file name starts with an underscore, it is a hidden file that does not participate in the actual view of the dataset.

import polars as pl
from transforms.api import transform, lightweight, Output, Input, LightweightOutput, LightweightInput, incremental
import logging
import json

@lightweight()
@incremental()
@transform(
    output_dataset=Output("/path/tabular_incremental_state_filebased")
)
def compute_1(ctx, output_dataset: LightweightOutput):
    """Keep a run counter in a hidden JSON file on the output dataset.

    On the first (non-incremental) build the counter starts at 0. On later
    incremental builds the previous state is read back from ``_state.json``,
    validated, and its ``"last_seen"`` value is incremented by one. The new
    state is then written back to the same file.

    Raises:
        ValueError: if the stored state is not a JSON object holding an
            integer ``"last_seen"`` entry.
    """
    # Prepend with "_" to consider it a hidden file and not show it in the dataset preview.
    state_filename = "_state.json"
    state = {"last_seen": 0}  # Some arbitrary starting state

    if ctx.is_incremental:
        with output_dataset.filesystem().open(state_filename, mode="r") as state_file:
            data = json.load(state_file)

        # Validate the fetched state before trusting it. Otherwise a malformed
        # file fails later with an opaque KeyError/TypeError on "last_seen".
        last_seen = data.get("last_seen") if isinstance(data, dict) else None
        if not isinstance(last_seen, int) or isinstance(last_seen, bool):
            raise ValueError(
                f"Invalid state in {state_filename!r}: expected a JSON object "
                f"with an integer 'last_seen', got {data!r}"
            )

        state = data
        logging.info("state file found, continuing from : %s", data)
        state["last_seen"] = 1 + state["last_seen"]

    # Save the new state on the output dataset:
    with output_dataset.filesystem().open(state_filename, "w") as state_file:
        json.dump(state, state_file)

Finally, we can combine both approaches: write tabular data to the output while also reading and writing a hidden file that stores state (for example, for the next run; this is particularly useful for external transforms).

import polars as pl
from transforms.api import transform, lightweight, Output, Input, LightweightOutput, LightweightInput, incremental
import logging
import json

@lightweight()
@incremental()
@transform(
    output_dataset=Output("/path/tabular_incremental_state_filebased_with_table")
)
def compute_1(ctx, output_dataset: LightweightOutput):
    """Write a small table and keep a run counter in a hidden JSON file.

    On the first (non-incremental) build the counter starts at 0. On later
    incremental builds the previous state is read back from ``_state.json``,
    validated, and its ``"last_seen"`` value is incremented by one. The new
    state is written back to that hidden file, and a two-row ``some_col``
    table is then written to the same output dataset.

    Raises:
        ValueError: if the stored state is not a JSON object holding an
            integer ``"last_seen"`` entry.
    """
    # Prepend with "_" to consider it a hidden file and not show it in the dataset preview.
    state_filename = "_state.json"
    state = {"last_seen": 0}  # Some arbitrary starting state

    if ctx.is_incremental:
        with output_dataset.filesystem().open(state_filename, mode="r") as state_file:
            data = json.load(state_file)

        # Validate the fetched state before trusting it. Otherwise a malformed
        # file fails later with an opaque KeyError/TypeError on "last_seen".
        last_seen = data.get("last_seen") if isinstance(data, dict) else None
        if not isinstance(last_seen, int) or isinstance(last_seen, bool):
            raise ValueError(
                f"Invalid state in {state_filename!r}: expected a JSON object "
                f"with an integer 'last_seen', got {data!r}"
            )

        state = data
        logging.info("state file found, continuing from : %s", data)
        state["last_seen"] = 1 + state["last_seen"]

    # Save the new state on the output dataset:
    with output_dataset.filesystem().open(state_filename, "w") as state_file:
        json.dump(state, state_file)

    # Tabular payload that lives next to the hidden state file in the same output.
    default_df = pl.DataFrame({"some_col": ["some_data", "look at the hidden file !"]})
    output_dataset.write_table(default_df)

These examples are the lightweight equivalents of the incremental-processing examples for external transforms: https://www.palantir.com/docs/foundry/data-integration/external-transforms#incremental-processing

1 Like

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.