Here is an example of how you can pass parameters.
# Each entry describes one transform to generate: its named input datasets,
# which of those inputs are read as full snapshots, and the output path.
# NOTE: the original was missing commas between the "dataset-1"/"dataset-2"
# entries, which is a SyntaxError — fixed here.
configuration = [
    {
        "datasets": {
            "dataset-1": Input("/folder/myfolder/dataset_1"),
            "dataset-2": Input("/folder/myfolder/dataset_2"),
        },
        "snapshot_inputs": ["dataset-1"],
        "output": "/folder/myfolder/output_1",
    },
    {
        "datasets": {
            "dataset-1": Input("/folder/myOtherfolder/dataset_1"),
            "dataset-2": Input("/folder/myOtherfolder/dataset_2"),
        },
        "snapshot_inputs": ["dataset-1"],
        "output": "/folder/myOtherfolder/output_1",
    },
    # , ...
]
# Loop over the configuration to generate the transforms
def create_transforms():
    """Return one generated transform per entry of the module-level configuration."""
    return [generate_transform(cfg) for cfg in configuration]
# Generate one transform
def generate_transform(args_dict):
    """Build a single incremental transform from one configuration entry.

    Expected keys in args_dict:
      - "datasets" (required): dict mapping input name -> Input
      - "output" (required): output dataset path (a string)
      - "spark_profile" (optional): Spark profile name, default None
      - "semantic_version" (optional): int, default 1
      - "snapshot_inputs" (optional): list of input names, default []
    """
    # Access the configurations — dict.get replaces the verbose
    # `x if k in d else default` ternaries from the original.
    spark_profile = args_dict.get("spark_profile")
    semantic_version = args_dict.get("semantic_version", 1)
    snapshot_inputs = args_dict.get("snapshot_inputs", [])
    # Fetch required inputs (data, metadata)
    inputs = args_dict["datasets"]
    output = args_dict["output"]

    @configure(profile=spark_profile)
    @incremental(semantic_version=semantic_version, snapshot_inputs=snapshot_inputs)
    @transform(
        # BUG FIX: `output` is already the path string; the original
        # `Output(output["output"])` would raise TypeError (string indexing).
        output_dataset=Output(output),
        **inputs,
    )
    def my_transform(ctx, output_dataset, **others_datasets):
        logging.info(f"My Transform is running as Incremental : {ctx.is_incremental}")
        # Convert to dataframes
        inputs_dataframes = {key: curr_dataset.dataframe() for key, curr_dataset in others_datasets.items()}
        # Process those N datasets, e.g. union them all together
        all_unioned = union_inputs(inputs_dataframes)
        # [...] rest of the processing

    return my_transform
In your case, given you want to pass a parameter which is going to be used inside the transform, I believe there are 2 options:
- [NOT WORKING] You can use the variable directly from the enclosing scope (no need to pass it as a parameter, since the function's closure will hold it). However, this will not behave as you expect: closures bind the variable late, so on each loop iteration the variable changes and every generated transform would end up seeing the last value.
- [WORKING] You can pass it as a default argument. Given the `@transform` decorator will only fill in the dataset parameters, it will ignore the additional `my_other_param`, and so you will be able to pass it without errors (such as `my_other_param is not a dataframe` or `my_other_param not found`).
my_other_param = 42

@transform(
    # BUG FIX: `output` is the path string itself, not a dict — the original
    # `Output(output["output"])` would raise TypeError.
    output_dataset=Output(output),
    **inputs,
)
# BUG FIX: a keyword parameter placed AFTER **kwargs is a SyntaxError, so
# `my_other_param=my_other_param` must come before `**others_datasets`.
# Binding it as a default captures the *current* value of the loop variable,
# avoiding the late-binding closure pitfall described above.
def my_transform(ctx, output_dataset, my_other_param=my_other_param, **others_datasets):
    logging.info(f"My other parameter : { my_other_param}")
So in your case, this should work:
@transform(
    out=Output(rid_out),
    inp=Input(rid_in)
)
# `col=col` binds the current value of the loop variable `col` at function
# definition time, sidestepping Python's late-binding closure behavior.
def compute(out, inp, col=col):
    # Projects the literal 1 under the name in `col` — Spark SQL allows the
    # implicit alias form "SELECT 1 colname". NOTE(review): assumes `col` is a
    # valid unquoted identifier; confirm, or use f"1 as `{col}`" if it may
    # contain special characters.
    out.write_dataframe(inp.dataframe().selectExpr(f"1 {col}"))