Unable to Pass Dataset as Input to Parameterized Python Function UDF in Pipeline

I created a Python Function code repository and successfully published my function as a UDF for use in my Pipeline. My goal is to parameterize the function so that I can pass a dataset as an input file (I’d like to be able to use this function on several different datasets without needing to manually update the Python script). While I was able to add the UDF to the pipeline and apply a transform to the dataset, I have been unable to figure out how to pass the dataset as my input.

Here is my function:

from pyspark.sql.functions import monotonically_increasing_id, last
from pyspark.sql.window import Window
from functions.api import function
from functions.sources import get_source


@function
def UnorderToOrder(input_path: str, output_path: str) -> str:
    df = get_source(input_path).read()
    df = df.withColumn("rowNumber", monotonically_increasing_id())
    window_spec = Window.orderBy("rowNumber").rowsBetween(Window.unboundedPreceding, 0)
    for col in [
        "recordIdType1", "recordIdType2", "recordIdType4", "recordIdType5",
        "recordIdType6", "recordIdType7", "recordIdType8", "recordIdType9"
    ]:
        df = df.withColumn(col, last(col, ignorenulls=True).over(window_spec))
    df.write.mode("overwrite").parquet(output_path)
    return output_path


For this kind of reusable workflow across datasets you need to use a transforms generator in a Code Repository.

The approach you envisioned will not work: you can’t pass a complete dataset into a UDF, because a UDF typically receives single rows of a dataset as input.

With a transforms generator you can parameterize code that acts on a complete dataset, just like the code you have shared in your UDF.
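The first step is to pull the dataset-level logic out of the UDF signature and express it as a plain DataFrame-in, DataFrame-out helper that a generator can call for any dataset. Here is a minimal sketch based on the code you shared (the column list is copied from your snippet):

from pyspark.sql import DataFrame
from pyspark.sql.functions import monotonically_increasing_id, last
from pyspark.sql.window import Window

FILL_COLUMNS = [
    "recordIdType1", "recordIdType2", "recordIdType4", "recordIdType5",
    "recordIdType6", "recordIdType7", "recordIdType8", "recordIdType9",
]


def unordered_to_ordered(df: DataFrame) -> DataFrame:
    # Forward-fill the record id columns in row order, as in your UDF.
    df = df.withColumn("rowNumber", monotonically_increasing_id())
    window_spec = Window.orderBy("rowNumber").rowsBetween(Window.unboundedPreceding, 0)
    for column in FILL_COLUMNS:
        df = df.withColumn(column, last(column, ignorenulls=True).over(window_spec))
    return df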

Thanks for the information on UDFs and how they are used for single rows. I’ve updated my script to use Transforms within a Python Transforms Code Repository.

However, I am struggling to figure out how to use this script from my pipeline. Unlike the UDF, I don’t see a publish option. In the past when I have used Transforms, I’ve built individual datasets where I specify the input (via RID or path) and the output. Since I am not specifying an input here (because I am trying to pass that as a parameter via my Pipeline), I am not sure how to proceed.
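For reference, my individual transforms have looked roughly like this (the dataset paths are placeholders):

from transforms.api import transform_df, Input, Output


@transform_df(
    Output("/My/Project/datasets/ordered_output"),
    source_df=Input("/My/Project/datasets/unordered_input"),
)
def compute(source_df):
    # ...ordering / forward-fill logic here...
    return source_df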

You won’t be able to call this from Pipeline Builder. Did you look for the transform generator pattern in the docs or by leveraging AIP Assist?

You essentially pass a list of input and output paths to your generator, and during checks all of those output datasets will be created automatically.
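A minimal sketch of that pattern, assuming the dataset paths below are placeholders for your own and that the forward-fill helper from my earlier reply lives somewhere in the same repository (the module path here is hypothetical):

from transforms.api import transform_df, Input, Output

# Hypothetical module path -- the DataFrame-in, DataFrame-out helper from the earlier reply.
from myproject.ordering import unordered_to_ordered

# Placeholder (input, output) dataset paths -- replace with your own.
DATASET_PAIRS = [
    ("/My/Project/raw/dataset_a", "/My/Project/ordered/dataset_a"),
    ("/My/Project/raw/dataset_b", "/My/Project/ordered/dataset_b"),
]


def generate_transforms():
    transforms = []
    for input_path, output_path in DATASET_PAIRS:

        @transform_df(
            Output(output_path),
            source_df=Input(input_path),
        )
        def compute(source_df):
            return unordered_to_ordered(source_df)

        transforms.append(compute)
    return transforms


TRANSFORMS = generate_transforms()

You then register the generated transforms in your repository’s pipeline definition, e.g. with something like my_pipeline.add_transforms(*TRANSFORMS) in pipeline.py (check the transform generators documentation for the exact registration step in your repository template). Adding a new dataset then only means adding another pair of paths to the list.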