Transforms generator providing additional parameter for every transformation

I was trying to use the Param class to provide an additional parameter for a transformation, but I ran into a seemingly incorrect error message:

TypeError: transform_generator.<locals>.compute() missing 1 required positional argument: 'col'

How to correctly provide a parameter for every transformation when using transforms generator?

The current code:

from transforms.api import transform, Input, Output, Param


def transform_generator(rids):
    """Build one transform per (input RID, output RID, column) triple.

    NOTE(review): this is the version from the question. Registering ``col``
    through ``Param`` is what is reported to lead to the ``TypeError`` quoted
    above ("missing 1 required positional argument: 'col'").

    Args:
        rids: Iterable of ``(rid_in, rid_out, col)`` tuples.

    Returns:
        List of the decorated ``compute`` functions, one per tuple.
    """
    transforms = []
    for rid_in, rid_out, col in rids:
        @transform(
            out=Output(rid_out),
            inp=Input(rid_in),
            col=Param(col))
        def compute(out, inp, col):
            # Writes the result of selecting the expression "1 <col>" from the input.
            out.write_dataframe(inp.dataframe().selectExpr(f"1 {col}"))
        transforms.append(compute)
    return transforms


# Module-level list of generated transforms; each tuple below is unpacked by
# transform_generator as (rid_in, rid_out, col).
TRANSFORMS = transform_generator([
    # input, output, col
    # dataset_1
    ('ri.foundry.main.dataset.4bc24c69-63fc-4dfb-8e0c-394c816af76b', 'ri.foundry.main.dataset.77ae201f-fe25-4d71-b92f-ac701581d411', 'col_a'),
    # dataset_2
    ('ri.foundry.main.dataset.180dfc91-9b7a-4b15-ae1d-4292692faafd', 'ri.foundry.main.dataset.c7700301-84c9-4963-9eea-b5c47dfc2ef6', 'col_c'),
])

Here is an example where you can pass parameters.

# One entry per transform to generate:
#   "datasets"        - maps a parameter name to the Input it reads
#   "snapshot_inputs" - keys of "datasets" passed on to @incremental as snapshot inputs
#   "output"          - path of the output dataset (a plain string)
configuration = [
    {
        "datasets": {
            "dataset-1": Input("/folder/myfolder/dataset_1"),
            "dataset-2": Input("/folder/myfolder/dataset_2"),
        },
        "snapshot_inputs": ["dataset-1"],
        "output": "/folder/myfolder/output_1"
    },
    {
        "datasets": {
            "dataset-1": Input("/folder/myOtherfolder/dataset_1"),
            "dataset-2": Input("/folder/myOtherfolder/dataset_2"),
        },
        "snapshot_inputs": ["dataset-1"],
        "output": "/folder/myOtherfolder/output_1"
    }
    # , ...
]

# Loop over the configuration to generate the transforms
def create_transforms():
    """Return one generated transform per entry of ``configuration``, in order."""
    return [generate_transform(entry) for entry in configuration]

# Generate one transform
def generate_transform(args_dict):
    """Build a single incremental transform from one configuration entry.

    Args:
        args_dict: One configuration entry. Required keys are ``"datasets"``
            (mapping of parameter name to input) and ``"output"`` (path of the
            output dataset). Optional keys are ``"spark_profile"`` (default
            ``None``), ``"semantic_version"`` (default ``1``) and
            ``"snapshot_inputs"`` (default ``[]``).

    Returns:
        The decorated ``my_transform`` function.

    Raises:
        KeyError: If ``"datasets"`` or ``"output"`` is missing from the entry.
    """
    # Access the configurations; optional settings fall back to their defaults
    spark_profile = args_dict.get("spark_profile")
    semantic_version = args_dict.get("semantic_version", 1)
    snapshot_inputs = args_dict.get("snapshot_inputs", [])

    # Fetch required inputs (data, metadata)
    inputs = args_dict["datasets"]
    output = args_dict["output"]

    # "output" already holds the dataset path string, so it is passed to
    # Output() directly; indexing it again with ["output"] would raise TypeError.
    @configure(profile=spark_profile)
    @incremental(semantic_version=semantic_version, snapshot_inputs=snapshot_inputs)
    @transform(
        output_dataset=Output(output),
        **inputs
    )
    def my_transform(ctx, output_dataset, **others_datasets):
        logging.info(f"My Transform is running as Incremental : {ctx.is_incremental}")

        # Convert to dataframes
        inputs_dataframes = {key: curr_dataset.dataframe() for key, curr_dataset in others_datasets.items()}

        # Process those N datasets, e.g. union them all together
        all_unioned = union_inputs(inputs_dataframes)

        # [...] rest of the processing

    return my_transform

In your case, given that you want to pass a parameter which is going to be used in the transform, I believe there are 2 options:

  • [NOT WORKING] You can use the variable directly inside the function (you don’t need to pass it as a parameter, since the enclosing scope of the function holds it). However, I don’t think this will work properly: the variable changes on each iteration, so the transform might not see the value you expect.
  • [WORKING] You can pass it as a default argument. Given that the @transform decorator fills in the other parameters, it will ignore the additional my_other_param, so you can pass it without an error (such as "my_other_param is not a dataframe" or "my_other_param not found").
    my_other_param = 42

    # The extra value is bound as a default argument. It has to be declared
    # before **others_datasets, because Python does not allow any parameter
    # after the ** catch-all. `output` is the dataset path string, so it is
    # passed to Output() as-is.
    @transform(
        output_dataset=Output(output),
        **inputs
    )
    def my_transform(ctx, output_dataset, my_other_param=my_other_param, **others_datasets):
        logging.info(f"My other parameter : { my_other_param}")

So in your case, this might work:

        # Meant to sit inside the `for rid_in, rid_out, col in rids` loop:
        # `col=col` stores the current loop value as the default argument when
        # the function is defined, so each generated transform keeps its own column.
        @transform(
            out=Output(rid_out),
            inp=Input(rid_in)
        )
        def compute(out, inp, col=col):
            out.write_dataframe(inp.dataframe().selectExpr(f"1 {col}"))
1 Like

hey @ZygD I see you’re trying to pass a custom parameter to your transforms.

The Param class you’re using is not meant for that purpose; you should use the StringParam class instead. Here’s an example that uses several of these custom parameters:

from transforms.api import transform, Output, StringParam, BooleanParam, IntegerParam, FloatParam

from myproject.datasets import utils


# One keyword per custom parameter, each using the Param class that matches its type.
# NOTE(review): the first positional argument looks like the parameter's value/default,
# and `description` / `allowed_values` look optional - confirm against the transforms.api docs.
@transform(
    dummy_data=Output('...'),
    name=StringParam('A', description="A string parameter", allowed_values={"A", "BC"}),
    isValid=BooleanParam(True, description="a boolean parameter"),
    count=IntegerParam(5),
    Rate=FloatParam(2.5, description="A float parameter")
)
# (the decorated function definition is not included in this example)
1 Like

Thank you @VincentF for the answer and a broader algorithm for generating transforms.
BTW, is there a version of the script which would have multiple inputs and multiple outputs?

Only the last suggestion works (providing a default argument).

As you suspected, without providing default argument, only the last element of the loop is sent into every transform, which is not expected.

The example above is quite generic, but it’s not specific to inputs and outputs.
The approach remains the same:

        "datasets": {
            "input-dataset-1": Input("/folder/myfolder/dataset_1"),
            "output-dataset-2": Output("/folder/myfolder/dataset_2")
        },

And then somehow in your transform, you need to list which are inputs and which are outputs:

    # Note: all_inputs_and_outputs and inputs_and_outputs don't need to have the same name, because the @transform decorator is passing all the inputs "again"
    @transform(
        **inputs_and_outputs
    )
    def my_transform(ctx, **all_inputs_and_outputs):
        """Split the received datasets into inputs and outputs by key name.

        Keys containing 'input' are treated as inputs and keys containing
        'output' as outputs; only the inputs are converted to dataframes.
        """
        # Split the inputs and the outputs
        inputs_datasets = {k: v for k, v in all_inputs_and_outputs.items() if 'input' in k}  # This will match with "input-dataset-1"
        outputs_datasets = {k: v for k, v in all_inputs_and_outputs.items() if 'output' in k}  # This will match "output-dataset-2"

        # Convert to dataframes (inputs only; the outputs are written to, not read)
        inputs_dfs = {key: curr_dataset.dataframe() for key, curr_dataset in inputs_datasets.items()}

        # [...] rest of the processing.
        # You can access an input dataframe by inputs_dfs["input-dataset-1"]
        # You can write to an output by outputs_datasets["output-dataset-2"].write_dataframe(...)

    return my_transform

The following versions work.

  1. Providing a default parameter inside compute function

    from transforms.api import transform, Input, Output
    
    
    def transform_logic(out, inp, col):
        """Select the expression ``1 <col>`` from ``inp`` and write it to ``out``."""
        projected = inp.dataframe().selectExpr(f"1 {col}")
        out.write_dataframe(projected)
    
    
    def transform_generator(rids):
        """Create one transform per ``(input RID, output RID, column)`` triple.

        Returns the decorated ``compute`` functions as a list, in the order of ``rids``.
        """
        generated = []
        for source_rid, target_rid, col in rids:
            # Build the decorator first; it is applied by hand once compute exists.
            bind_datasets = transform(
                out=Output(target_rid),
                inp=Input(source_rid))

            # ``col=col`` stores this iteration's column name as the default value.
            def compute(out, inp, col=col):
                transform_logic(out, inp, col)

            generated.append(bind_datasets(compute))
        return generated
    
    
    # Each triple is (input, output, col)
    _RID_TRIPLES = [
        # dataset_1
        ('ri.foundry.main.dataset.4bc24c69-63fc-4dfb-8e0c-394c816af76b', 'ri.foundry.main.dataset.77ae201f-fe25-4d71-b92f-ac701581d411', 'col_a'),
        # dataset_2
        ('ri.foundry.main.dataset.180dfc91-9b7a-4b15-ae1d-4292692faafd', 'ri.foundry.main.dataset.c7700301-84c9-4963-9eea-b5c47dfc2ef6', 'col_c'),
    ]

    TRANSFORMS = transform_generator(_RID_TRIPLES)
    
  2. Using StringParam

    from transforms.api import transform, Input, Output, StringParam
    
    
    def transform_logic(out, inp, col):
        """Select the expression ``1 <col>`` from ``inp`` and write it to ``out``."""
        projected = inp.dataframe().selectExpr(f"1 {col}")
        out.write_dataframe(projected)
    
    
    def transform_generator(rids):
        """Create one transform per ``(input RID, output RID, column)`` triple.

        The column name is registered with the transform through ``StringParam``.
        Returns the decorated ``compute`` functions as a list, in the order of ``rids``.
        """
        generated = []
        for source_rid, target_rid, column_name in rids:
            # Build the decorator first; it is applied by hand once compute exists.
            bind_all = transform(
                out=Output(target_rid),
                inp=Input(source_rid),
                col=StringParam(column_name))

            def compute(out, inp, col):
                # The column name is read from the parameter's ``.value``.
                transform_logic(out, inp, col.value)

            generated.append(bind_all(compute))
        return generated
    
    
    # Each triple is (input, output, col)
    _RID_TRIPLES = [
        # dataset_1
        ('ri.foundry.main.dataset.4bc24c69-63fc-4dfb-8e0c-394c816af76b', 'ri.foundry.main.dataset.77ae201f-fe25-4d71-b92f-ac701581d411', 'col_a'),
        # dataset_2
        ('ri.foundry.main.dataset.180dfc91-9b7a-4b15-ae1d-4292692faafd', 'ri.foundry.main.dataset.c7700301-84c9-4963-9eea-b5c47dfc2ef6', 'col_c'),
    ]

    TRANSFORMS = transform_generator(_RID_TRIPLES)