import logging

from transforms.api import configure, incremental, transform, Input, Output

configuration = [
    {
        "datasets": {
            "dataset-1": Input("/folder/myfolder/dataset_1"),
            "dataset-2": Input("/folder/myfolder/dataset_2"),
        },
        "snapshot_inputs": ["dataset-1"],
        "output": "/folder/myfolder/output_1",
    },
    {
        "datasets": {
            "dataset-1": Input("/folder/myOtherfolder/dataset_1"),
            "dataset-2": Input("/folder/myOtherfolder/dataset_2"),
        },
        "snapshot_inputs": ["dataset-1"],
        "output": "/folder/myOtherfolder/output_1",
    },
    # , ...
]
# Loop over the configuration to generate the transforms
def create_transforms():
    transforms = []
    for args_dict in configuration:
        transforms.append(generate_transform(args_dict))
    return transforms
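To make the generated transforms discoverable, one common pattern is to register the returned list in the repository's pipeline.py. A sketch, assuming the module above is importable as generated_transforms (the module name is hypothetical):

from transforms.api import Pipeline

from myproject import generated_transforms  # hypothetical module holding create_transforms()

my_pipeline = Pipeline()
my_pipeline.add_transforms(*generated_transforms.create_transforms())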
# Generate one transform
def generate_transform(args_dict):
    # Access the configuration, with sensible defaults for the optional keys
    spark_profile = args_dict.get("spark_profile")
    semantic_version = args_dict.get("semantic_version", 1)
    snapshot_inputs = args_dict.get("snapshot_inputs", [])

    # Fetch the required inputs and the output path
    inputs = args_dict["datasets"]
    output = args_dict["output"]

    @configure(profile=spark_profile)
    @incremental(semantic_version=semantic_version, snapshot_inputs=snapshot_inputs)
    @transform(
        output_dataset=Output(output),
        **inputs
    )
    def my_transform(ctx, output_dataset, **other_datasets):
        logging.info(f"My Transform is running as incremental: {ctx.is_incremental}")
        # Convert the inputs to dataframes
        inputs_dataframes = {key: curr_dataset.dataframe() for key, curr_dataset in other_datasets.items()}
        # Process those N datasets, e.g. union them all together
        all_unioned = union_inputs(inputs_dataframes)
        # [...] rest of the processing

    return my_transform
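union_inputs is not defined in the snippet above. A minimal sketch, assuming all the inputs share the same schema (the helper name and the unionByName strategy are illustrative, not part of the original answer):

from functools import reduce

from pyspark.sql import DataFrame

def union_inputs(inputs_dataframes):
    # Union the N input dataframes into a single dataframe (assumes identical schemas)
    return reduce(DataFrame.unionByName, inputs_dataframes.values())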
In your case, given you want to pass a parameter which is going to be used in the transform, I believe there are 2 options:
[NOT WORKING] You can use it directly (you don't need to pass it as a parameter, since the closure of the function will hold it), but I don't think this will work properly: on each iteration of the loop the variable changes, so every transform ends up seeing the last value instead of its own.
[WORKING] You can pass it as a default argument. Since the @transform decorator only fills in the dataset parameters, it will ignore the additional my_other_param, so you can pass it without errors (like "my_other_param is not a dataframe" or "my_other_param not found"). See the sketch below.
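A minimal sketch of the [WORKING] option (the args_dict keys "input" and "my_other_param" are placeholders for this example, not part of the configuration above):

from transforms.api import transform, Input, Output

def generate_transform(args_dict):
    my_other_param = args_dict["my_other_param"]

    @transform(
        output_dataset=Output(args_dict["output"]),
        source_dataset=Input(args_dict["input"]),
    )
    def my_transform(ctx, output_dataset, source_dataset, my_other_param=my_other_param):
        # my_other_param was frozen at definition time via the default value,
        # so each generated transform keeps its own copy
        df = source_dataset.dataframe()
        # [...] use my_other_param in the processing
        output_dataset.write_dataframe(df)

    return my_transform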
Hey @ZygD, I see you're trying to pass a custom parameter to your transforms.
The Param class you're using is not meant for that; you should use the StringParam class instead. Here's an example of using several of these custom parameters:
Thank you @VincentF for the answer and a broader algorithm for generating transforms.
BTW, is there a version of the script which would have multiple inputs and multiple outputs?
Only the last suggestion works (providing a default argument).
As you suspected, without providing a default argument, only the last element of the loop is sent into every transform, which is not what you expect.
And then, inside your transform, you need to work out which arguments are inputs and which are outputs:
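The same late-binding behaviour can be reproduced in plain Python, which is why only the default-argument option works:

def make_fns():
    fns = []
    for i in range(3):
        fns.append(lambda: i)        # late binding: all closures share the same "i"
    return fns

print([f() for f in make_fns()])     # [2, 2, 2] -- every closure sees the last value

def make_fns_fixed():
    fns = []
    for i in range(3):
        fns.append(lambda i=i: i)    # default argument freezes the current value
    return fns

print([f() for f in make_fns_fixed()])  # [0, 1, 2]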
# Note: all_inputs_and_outputs and inputs_and_outputs don't need to have the same name,
# because the @transform decorator is passing all the inputs "again"
@transform(
    **inputs_and_outputs
)
def my_transform(ctx, **all_inputs_and_outputs):
    # Split the inputs and the outputs by their key naming convention
    inputs_datasets = {k: v for k, v in all_inputs_and_outputs.items() if "input" in k}    # Matches e.g. "input-dataset-1"
    outputs_datasets = {k: v for k, v in all_inputs_and_outputs.items() if "output" in k}  # Matches e.g. "output-dataset-2"
    # Convert the inputs to dataframes
    inputs_dfs = {key: curr_dataset.dataframe() for key, curr_dataset in inputs_datasets.items()}
    # [...] rest of the processing.
    # You can access an input dataframe with inputs_dfs["input-dataset-1"]
    # You can write to an output with outputs_datasets["output-dataset-2"].write_dataframe(...)

return my_transform
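For context, here is how the inputs_and_outputs dict could be built; the paths are illustrative, and the keys only need to contain "input" or "output" for the split above to work:

from transforms.api import Input, Output

inputs_and_outputs = {
    "input-dataset-1": Input("/folder/myfolder/dataset_1"),
    "input-dataset-2": Input("/folder/myfolder/dataset_2"),
    "output-dataset-2": Output("/folder/myfolder/output_2"),
}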