Access data in the global scope of a PySpark repository

Is it possible to access data from a materialized dataset in the global scope of a PySpark repository without using @transform_df or @transform decorators?

What do you mean by global scope?

from transforms.api import transform_df, Input, Output

global_variable = None  # Define global variable

@transform_df(
    Output("/path/to/output/dataset"),
    df=Input("/path/to/input/dataset"),
)
def my_transform(df):
    global global_variable  # Use global keyword

    # Store DataFrame in a global variable
    global_variable = df

    return df

I mean using the data like a global variable, but without decorators.

No, I don’t think that’s possible, because it’s the @transform decorator that wraps the function to make the link to the data on the platform.

Perhaps if you describe what you’re trying to achieve with having that data outside the transform, we could help with your use case?

I have a list of strings in an ontology object property. I want to pass it to a transform generator as an argument.


@f04ffed88a2f6b971467 I don’t think there’s any way of passing the inputs to the transform generator as an argument. The pipeline documentation makes clear that the inputs are set at CI time, and that’s what generates the JobSpec, which is then static.
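For context, a transform generator usually looks something like the sketch below, where the list of input paths is static in the repo (the paths here are placeholders, not anything from your project), so it's known when the checks run and the JobSpecs are generated:

from transforms.api import transform_df, Input, Output

# Placeholder paths - in practice this list lives in the repo itself
# (hard-coded or in a checked-in config file), so it's fixed at CI time.
STATIC_INPUT_PATHS = [
    "/path/to/input/dataset_a",
    "/path/to/input/dataset_b",
]


def generate_transforms(input_paths):
    transforms = []
    for path in input_paths:
        # Each iteration builds one transform with its inputs/outputs
        # baked in at decoration time.
        @transform_df(
            Output(f"/path/to/output/{path.split('/')[-1]}_copy"),
            df=Input(path),
        )
        def compute(df):
            return df
        transforms.append(compute)
    return transforms


TRANSFORMS = generate_transforms(STATIC_INPUT_PATHS)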

But…

There is a way of doing what I think you want, described below, but use with extreme caution, as it:

  • Requires your repo to be Project Scope exempted.
  • Lineage etc. won’t work (it requires a correct JobSpec), so neither you nor Foundry can see all the datasets that went into the output.
  • Can’t manage Markings, and there may be other permission quirks, e.g. any schedule will have to run under a user’s name.
  • Unpredictable build times, as you’re unioning a potentially uncontrolled number of datasets.

In this example we’ll union two simple datasets. Both are created in the first transform below, and their RIDs are stored in a third dataset that the same transform also writes (in your case the RIDs would come from an object materialisation, I believe).

from transforms.api import transform, Output


@transform(
    out_1=Output("ri.foundry.main.dataset.81481f02-caf6-4a60-afad-4eeed4288f68"),
    out_2=Output("ri.foundry.main.dataset.cb99ee09-c0be-42bb-a7a6-962f7003da4e"),
    out_3=Output("ri.foundry.main.dataset.66816452-19f1-4e02-b949-822e4f045ba8")
    )
def compute(ctx, out_1, out_2, out_3):

    data_1 = [(1, 2),
              (2, 5),
              (3, None),
              (4, 6)]

    data_2 = [(5, 2),
              (6, 5),
              (7, None),
              (8, 6)]

    schema = "id: string, data: int"

    df1 = ctx.spark_session.createDataFrame(data_1, schema)

    out_1.write_dataframe(df1)

    df2 = ctx.spark_session.createDataFrame(data_2, schema)

    out_2.write_dataframe(df2)

    # RIDs in dataset
    rids = [("ri.foundry.main.dataset.81481f02-caf6-4a60-afad-4eeed4288f68", ),
            ("ri.foundry.main.dataset.cb99ee09-c0be-42bb-a7a6-962f7003da4e", )]

    rids_df = ctx.spark_session.createDataFrame(rids, "rids: string")

    out_3.write_dataframe(rids_df)

    return None

Then you can use the Spark context and the Foundry connection to load the dataframes based on their RIDs. For this to work the repo has to be Project Scope exempted, and whether that’s allowed may be a decision for your organisation.

from pyspark.sql import DataFrame
from transforms.api import transform_df, Input, Output
from functools import reduce


@transform_df(
    Output("ri.foundry.main.dataset.859fea16-2496-4123-bda7-f1ee8786e305"),
    rids=Input("ri.foundry.main.dataset.66816452-19f1-4e02-b949-822e4f045ba8"),
)
def compute(ctx, rids):

    # Get the RIDs locally as a list
    rids_local = [_["rids"] for _ in rids.collect()]

    # load the dataframes into a list
    # using the spark context
    dataframes = [ctx._foundry.input(rid=rid,
                                     branch="master"
                                     ).dataframe() for rid in rids_local]

    # Union all the inputs, as a reduce function
    # as we don't know the number of inputs.
    unioned = reduce(DataFrame.unionByName, dataframes)

    return unioned

The result is then:

id   data
1    2
2    5
3    null
4    6
5    2
6    5
7    null
8    6