I created a Python Function code repository and successfully published my function as a UDF for use in my Pipeline. My goal is to parameterize the function so that I can pass a dataset as an input file (I’d like to be able to use this function on several different datasets without needing to manually update the Python script). While I was able to add the UDF to the pipeline and apply a transform to the dataset, I have been unable to figure out how to pass the dataset as my input.
Here is my function:
from pyspark.sql.functions import monotonically_increasing_id, last
from pyspark.sql.window import Window
from functions.api import function
from functions.sources import get_source
@function
def UnorderToOrder(input_path: str, output_path: str) -> str:
    df = get_source(input_path).read()

    # Tag each row with a monotonically increasing id so the original
    # read order is preserved when forward-filling.
    df = df.withColumn("rowNumber", monotonically_increasing_id())
    window_spec = Window.orderBy("rowNumber").rowsBetween(Window.unboundedPreceding, 0)

    # Forward-fill each record id column with the last non-null value seen so far.
    for col in [
        "recordIdType1", "recordIdType2", "recordIdType4", "recordIdType5",
        "recordIdType6", "recordIdType7", "recordIdType8", "recordIdType9"
    ]:
        df = df.withColumn(col, last(col, ignorenulls=True).over(window_spec))

    df.write.mode("overwrite").parquet(output_path)
    return output_path
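
For clarity, this is roughly the shape I'm hoping to end up with: the same forward-fill logic factored into a plain DataFrame-in / DataFrame-out helper, so that whichever dataset the pipeline supplies can be passed in as the argument. This is just a sketch of the intent in plain PySpark; the function name and the column list constant are placeholders, and it doesn't show the Foundry-side wiring I'm asking about.

from pyspark.sql import DataFrame
from pyspark.sql.functions import monotonically_increasing_id, last
from pyspark.sql.window import Window

# Placeholder: the columns to forward-fill, kept in one place so the helper
# could be reused across datasets that share this schema.
RECORD_ID_COLS = [
    "recordIdType1", "recordIdType2", "recordIdType4", "recordIdType5",
    "recordIdType6", "recordIdType7", "recordIdType8", "recordIdType9",
]

def forward_fill_record_ids(df: DataFrame) -> DataFrame:
    # Preserve the original read order with a monotonically increasing id,
    # then forward-fill each record id column with the last non-null value.
    df = df.withColumn("rowNumber", monotonically_increasing_id())
    window_spec = Window.orderBy("rowNumber").rowsBetween(Window.unboundedPreceding, 0)
    for col_name in RECORD_ID_COLS:
        df = df.withColumn(col_name, last(col_name, ignorenulls=True).over(window_spec))
    return df

What I can't figure out is how to get the dataset from the pipeline into the function as that input, rather than hard-coding a path in the script.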