How do I open a dataset in Code repo from the RID stored in the dataset?

I would like to get the maximum and minimum values for a specific column of each dataset in code repo.
I have a dataset with the RID and PATH of the target dataset, and I would like to loop through it to get the data.
It stops at the realization of the following loop part.
How can this be implemented?

from pyspark.sql import functions as F
from transforms.api import Input, Output, transform

@transform(
    input_dataset=Input("{tareget_input_dataset}"),
    output_dataset=Output("{output_dataset}"),
)
def compute(ctx, input_dataset, output_dataset):
    df = input_dataset.dataframe()

    for row in df.collect():
        # Dataset contains RID and path of target data
        dataset_rid = row["RID"]

        # I want to open a dataset using path or rid 
        # and get the maximum and minimum values for a specific column

Hi, as far as I know, Code Repo does not provide a first-class way to dynamically define data transforms by reading about the dataset rids from input datasets. (The CI job pre-determines all relations between inputs and outputs)

However, you can achieve the behavior you described – looping through a list of datasets and apply logics to them – by first statically defining the input and output path in the code file like below

def generate_transform(data_name, input_dataset_path_or_rid, output_dataset_path_or_rid):
    """Generate transform base on pre-defined list of input paths

        data_name: optional human readable name for the data
        input_dataset_path_or_rid: path or rid of your input dataset
        output_dataset_path_or_rid: path or rid of your output dataset
    """
    @transform(
        input_dataset=Input(f"{input_dataset_path_or_rid}"),
        output_dataset=Output(f"{output_dataset_path_or_rid}"),
    )
    def compute(ctx, input_dataset, output_dataset):
        # do something
    
    # Totally optional but Code Repo Preview button will show a list of user friendly data name here
    compute.__name__ = f"compute_{data_name}"
    return compute


TRANSFORMS = [
    generate_transform(data_name, input_dataset_path_or_rid, output_dataset_path_or_rid)
    for data_name, input_dataset_path_or_rid, output_dataset_path_or_rid in [
        ("my notebook data", "input_dataset_path_1", "output_dataset_path_1"),
        ("my flight data", "input_dataset_path_2", "output_dataset_path_2"),
        ("furniture data", "input_dataset_path_3", "output_dataset_path_3"),
        ("animals data", "input_dataset_path_4", "output_dataset_path_4"),
        # ...
    ]
]

Also this documentation might be helpful!

Thank you for your advice.
It seems that all outputs need to be changed, but is it possible to consolidate them into one output?

@rkatsuk Oh, if you’re looking to process multiple inputs and merge them into a single output, you might be able to try something like below – the following is an example of taking a list of dataset paths or resource identifiers (RIDs), which you can define beforehand.
These are then fed into a compute function as a list of Foundry datasets (in this case *inputs). Inside the compute function, you can iterate over each dataset, apply any transformations you need, and then combine them in the way that best suits your project. Hope this helps!

from transforms.api import Input, Output, transform_df
from transforms.verbs.dataframes import union_many


def do_something(df):
    return df

inputs = [
    "input_dataset_path_or_rid1",
    "input_dataset_path_or_rid2",
    "input_dataset_path_or_rid3",
    "input_dataset_path_or_rid4",
    "input_dataset_path_or_rid6",
    "input_dataset_path_or_rid7",
    "input_dataset_path_or_rid8",
    # ... please define all the input rid / path here
]


@transform_df(
    Output("output_dataset_path_or_rid"),
    *(Input(input_name) for input_name in inputs)
)
def compute(*inputs):
    # do something with the input datasets
    results = []
    for input_dataset in inputs:
        results.append(do_something(input_dataset))
    return union_many(results)

Hi, @yushi

The code you provided gives the following error:

Internal Error: TypeError: transform_df() takes 1 positional argument but 8 were given

Is there any way to improve it?

@rkatsuk oh, right, it seems that transform_df is expecting keyword arguments, so we could pass in the dataset rids as a dict!

from transforms.api import Input, Output, transform_df
from transforms.verbs.dataframes import union_many


def do_something(df):
    return df


inputs = {
    "input1": "input_dataset_path_or_rid1",
    "input2": "input_dataset_path_or_rid2",
    "input3": "input_dataset_path_or_rid3",
    "input4": "input_dataset_path_or_rid4",
    "input5": "input_dataset_path_or_rid6",
    "input6": "input_dataset_path_or_rid7",
    "input7": "input_dataset_path_or_rid8",
    # ... please define all the input rid / path here
}


@transform_df(
    Output("output_dataset_path_or_rid"),
    **{input_name: Input(input_path) for input_name, input_path in inputs.items()}
)
def compute(**inputs):
    # do something with the input datasets
    results = []
    for input_name, input_dataset in inputs.items():
        results.append(do_something(input_dataset))
    return union_many(results)

I was able to achieve what I wanted by implementing the sample you provided, thank you very much.

1 Like