@f04ffed88a2f6b971467 I don’t think there’s any way of passing the inputs to the transform generator as an argument. The pipeline documentation makes clear that the inputs are set at CI time, and that’s what generates the JobSpec, which is then static.
But…
There is a way of doing what I think you want, described below, but use with extreme caution, as it:
- Requires your repo to be Project Scope exempted.
- Breaks lineage and similar features (they need a correct JobSpec), so neither you nor Foundry can see all the datasets that went into the output.
- Can’t manage Markings, and there might be other weirdness with permissions, e.g. any schedule will have to run in a user’s name.
- Can lead to unexpected build times, as you’re unioning a potentially uncontrolled number of datasets.
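On that last point, one way to keep things bounded is to sanity-check the list of RIDs before loading anything. This is only a rough sketch: `check_rids`, the cap of 50 and the RID pattern are my own assumptions (the pattern is inferred from the dataset RIDs used further down), not anything Foundry provides.

```python
import re

# Assumed RID shape, inferred from the dataset RIDs used in this post.
DATASET_RID = re.compile(
    r"ri\.foundry\.main\.dataset\."
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
)


def check_rids(rids, max_inputs=50):
    """Return de-duplicated RIDs, or raise if the list looks unsafe to union.

    Hypothetical helper: the name and the default cap are illustrative only.
    """
    # Drop duplicates while keeping first-seen order.
    unique = list(dict.fromkeys(rids))

    # Reject nulls and anything that does not look like a dataset RID.
    bad = [
        rid for rid in unique
        if not isinstance(rid, str) or not DATASET_RID.fullmatch(rid)
    ]
    if bad:
        raise ValueError(f"Not dataset RIDs: {bad}")

    # Refuse to union more datasets than the caller explicitly allows.
    if len(unique) > max_inputs:
        raise ValueError(
            f"{len(unique)} inputs exceeds the cap of {max_inputs}"
        )
    return unique
```

You would call this on the collected RID list before loading any dataframes, so a bad or runaway list fails the build early instead of producing a huge union.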
In this example we’ll union two simple datasets. The first transform below creates both of them, plus a third dataset that stores their RIDs (yours are coming from an object materialisation, I believe).
```python
from transforms.api import transform, Output


@transform(
    out_1=Output("ri.foundry.main.dataset.81481f02-caf6-4a60-afad-4eeed4288f68"),
    out_2=Output("ri.foundry.main.dataset.cb99ee09-c0be-42bb-a7a6-962f7003da4e"),
    out_3=Output("ri.foundry.main.dataset.66816452-19f1-4e02-b949-822e4f045ba8"),
)
def compute(ctx, out_1, out_2, out_3):
    # Two small example datasets with the same schema
    data_1 = [(1, 2),
              (2, 5),
              (3, None),
              (4, 6)]
    data_2 = [(5, 2),
              (6, 5),
              (7, None),
              (8, 6)]
    schema = "id: string, data: int"

    df1 = ctx.spark_session.createDataFrame(data_1, schema)
    out_1.write_dataframe(df1)

    df2 = ctx.spark_session.createDataFrame(data_2, schema)
    out_2.write_dataframe(df2)

    # Third dataset: the RIDs of the two datasets above, one per row
    rids = [("ri.foundry.main.dataset.81481f02-caf6-4a60-afad-4eeed4288f68", ),
            ("ri.foundry.main.dataset.cb99ee09-c0be-42bb-a7a6-962f7003da4e", )]
    rids_df = ctx.spark_session.createDataFrame(rids, "rids: string")
    out_3.write_dataframe(rids_df)
```
Then you can use the transform context and its Foundry connection to load the dataframes by their RIDs. For this to work the repo has to be Project Scope exempted, and whether that’s allowed may be a decision for your organisation.
```python
from functools import reduce

from pyspark.sql import DataFrame
from transforms.api import transform_df, Input, Output


@transform_df(
    Output("ri.foundry.main.dataset.859fea16-2496-4123-bda7-f1ee8786e305"),
    rids=Input("ri.foundry.main.dataset.66816452-19f1-4e02-b949-822e4f045ba8"),
)
def compute(ctx, rids):
    # Get the RIDs locally as a list
    rids_local = [row["rids"] for row in rids.collect()]

    # Load each dataset as a dataframe via the Foundry connection
    # on the context (underscore-prefixed, so treat it as internal)
    dataframes = [
        ctx._foundry.input(rid=rid, branch="master").dataframe()
        for rid in rids_local
    ]

    # Union all the inputs with reduce,
    # as we don't know the number of inputs.
    unioned = reduce(DataFrame.unionByName, dataframes)
    return unioned
```
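One edge case worth guarding: if the RIDs dataset is ever empty, `reduce` without an initial value fails with a fairly unhelpful `TypeError`. A small wrapper makes that failure explicit. This is a sketch only; `union_all` is a hypothetical helper name, and it takes the union function as an argument so it doesn’t depend on Spark itself.

```python
from functools import reduce


def union_all(frames, union_fn):
    """Fold `frames` into one with `union_fn`, failing clearly when empty.

    Hypothetical helper: `frames` is any iterable of dataframes and
    `union_fn` is a two-argument function such as DataFrame.unionByName.
    """
    frames = list(frames)
    if not frames:
        # reduce() on an empty list with no initial value raises TypeError,
        # so raise something more descriptive instead.
        raise ValueError("No input datasets to union")
    return reduce(union_fn, frames)
```

In the transform above that would be `unioned = union_all(dataframes, DataFrame.unionByName)`. If the inputs might not all share the same columns, then on Spark 3.1 or later you could pass `lambda a, b: a.unionByName(b, allowMissingColumns=True)` instead, which fills missing columns with nulls.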
The result is then:
| id | data |
|---|---|
| 1 | 2 |
| 2 | 5 |
| 3 | null |
| 4 | 6 |
| 5 | 2 |
| 6 | 5 |
| 7 | null |
| 8 | 6 |