Transforms generator providing additional parameter for every transformation

Here is an example where you can pass parameters.

configuration = [
    {
        "datasets": {
            "dataset-1": Input("/folder/myfolder/dataset_1")
            "dataset-2": Input("/folder/myfolder/dataset_2")
        },
        "snapshot_inputs": ["dataset-1"],
        "output": "/folder/myfolder/output_1"
    },
    {
        "datasets": {
            "dataset-1": Input("/folder/myOtherfolder/dataset_1")
            "dataset-2": Input("/folder/myOtherfolder/dataset_2")
        },
        "snapshot_inputs": ["dataset-1"],
        "output": "/folder/myOtherfolder/output_1"
    }
    # , ...
]

# Loop over the configuration to generate the transforms
def create_transforms():
    transforms = []
    for args_dict in configuration:
        transforms.append(generate_transform(args_dict))
    return transforms

# Generate one transform
def generate_transform(args_dict):
   # Access the configurations
    spark_profile = args_dict["spark_profile"] if "spark_profile" in args_dict else None
    semantic_version = args_dict["semantic_version"] if "semantic_version" in args_dict else 1
    snapshot_inputs = args_dict["snapshot_inputs"] if "snapshot_inputs" in args_dict else []

    # Fetch required inputs (data, metadata)
    inputs = args_dict["datasets"]
    output = args_dict["output"]

    @configure(profile=spark_profile)
    @incremental(semantic_version=semantic_version, snapshot_inputs=snapshot_inputs)
    @transform(
        output_dataset=Output(output["output"]),
        ** inputs
    )
    def my_transform(ctx, output_dataset, **others_datasets):
        logging.info(f"My Transform is running as Incremental : {ctx.is_incremental}")

        # Convert to dataframes
        inputs_dataframes = {key: curr_dataset.dataframe() for key, curr_dataset in others_datasets.items()}

        # Process those N datasets, e.g. union them all together
        all_unioned = union_inputs(inputs_dataframes)

        # [...] rest of the processing

    return my_transform

In your case, given you want to pass a parameter which is going to be use in the transform, I believe there are 2 options:

  • [NOT WORKING] Or you can directly use it (you don’t need to pass it via parameter, as the context of the function will hold it) but I don’t think this will work properly, as on each iteration the variable might change and so not behave as you expect
  • [WORKING] Or you can pass it as a default argument - Given the @transform decorator will fill in the other parameters, it will be able to ignore the additional my_other_param, and so you will be able to pass it without error (like my_other_param is not a dataframe or my_other_param not found)
    my_other_param = 42

    @transform(
        output_dataset=Output(output["output"]),
        ** inputs
    )
    def my_transform(ctx, output_dataset, **others_datasets, my_other_param=my_other_param):
        logging.info(f"My other parameter : { my_other_param}")

So in your case, this might work:

        @transform(
            out=Output(rid_out),
            inp=Input(rid_in)
        )
        def compute(out, inp, col=col):
            out.write_dataframe(inp.dataframe().selectExpr(f"1 {col}"))
1 Like