API error when trying to dynamically define transform profiles

Hello Community team,

We have a cost-optimization use case: we want to choose the profiles used when building our transform dynamically, based on the number of partitions in a dataset.

To test this, we defined two simple functions before the transform:

getDatasetFilesStats(), which calculates the number of partitions in the table via an API call (see code below)

choose_profile(), which picks the profile to use based on the number of partitions, and whose result we pass as the configure parameter (see code below)

Everything works fine in preview mode. However, when we launch the build, we get a host connection error: "Failed to create connection to host, This could be from the device being busy or an error in name resolution. Please retry"

Has anyone come across a similar use case or issue? My only guess is that it happens because the API call is made before the transform (moving the same API call inside the transform makes the build work) and that something host-related is missing at that point, but we don’t know what.

Any help solving this issue, or other ideas for achieving this use case, would be greatly appreciated.

Best,
Wilfried

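import json

import requests
from transforms.api import configure, transform, Input, Output


def getDatasetFilesStats():
    # token_value and datasetRid are assumed to be defined elsewhere
    token = token_value

    response = requests.get(
        url="host/api/v1/datasets/{}/files".format(datasetRid),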
        headers={
            'content-type': 'application/json',
            'Authorization': 'Bearer ' + token
        },
        data=json.dumps({
            "datasetRid": datasetRid,
            "branch": "master"
        })
    )

    return len(response.json()["data"])


def choose_profile():

    number_partition = getDatasetFilesStats()
    print("number_partition", number_partition)
    if number_partition > 50:
        profile = ["DRIVER_MEMORY_SMALL"]
    else:
        profile = ["DRIVER_MEMORY_MEDIUM", "NUM_EXECUTORS_4"]

    print("profile chosen", profile)
    return profile


@configure(choose_profile())
@transform(
    output=Output("output_rid"),
    input_df=Input("input_rid"),
)
def compute(ctx, input_df, output):
    output.write_dataframe(input_df.dataframe())

Yeah, this type of thing isn’t possible because of the order of operations in a transform.

I would suggest using a dynamic allocation profile and letting Spark scale up or down depending on demand.

Even if you solve the networking issue, your desired setup can’t work with your suggested logic.

You’re expecting the code placed before the transform to be evaluated at runtime with each build, but that will not be the case!

Foundry datasets always have a jobSpec that contains all the information necessary for a build, e.g. the resources that should be attached to it, and the jobSpec is only updated when a commit is made to your transformation logic in a code repo :frowning:
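To illustrate the order of operations, here is a minimal plain-Python sketch. The `configure` below is a hypothetical stand-in written only for this example, not the Foundry decorator. It shows that an expression passed to a decorator is evaluated once, when the module is loaded, and not each time the decorated function runs.

```python
# Records every call to choose_profile() so we can count them.
profile_calls = []


def configure(profile):
    """Hypothetical stand-in for a decorator factory (NOT the Foundry API)."""
    def decorator(func):
        # The profile is attached once, when the decorator is applied.
        func.profile = profile
        return func
    return decorator


def choose_profile():
    profile_calls.append("called")
    return ["DRIVER_MEMORY_SMALL"]


# choose_profile() runs right here, while the module is being loaded.
@configure(choose_profile())
def compute():
    return "computed"


calls_after_definition = len(profile_calls)

# Running the function later does not call choose_profile() again.
compute()
compute()
calls_after_two_runs = len(profile_calls)
```

In this sketch both counters end up equal, which mirrors the behaviour described above: the value is fixed when the code is loaded, so later runs reuse it.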

You can pull dynamic information at runtime, but only from within the transform.

There is a way around it, though. It’s a somewhat ugly solution, but it works: I applied it successfully on a different project with different requirements, so it still makes sense to share.

You need a way to automatically commit and merge code to the repo. I used Logic flows with an API call that creates a file in a folder whenever I need to update the jobSpec.

A potential scenario: let’s say you need more resources every Monday because you want to retrain an ML model.

An API call from a different process creates a file → the logic flow creates a PR that is automatically merged → your jobSpec is updated, because the function that sets the parameters is re-evaluated.
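As a rough sketch of the "function that sets the parameters" in that flow, the profile could be read from a small JSON file that the automated commit creates or updates. The file name `profile_override.json`, its `{"profile": [...]}` layout, and the `load_profile` helper are all assumptions made for illustration; the original setup is not described in that much detail.

```python
import json
from pathlib import Path

# Fallback used when no override file has been committed (hypothetical default).
DEFAULT_PROFILE = ["DRIVER_MEMORY_MEDIUM", "NUM_EXECUTORS_4"]


def load_profile(path, default=DEFAULT_PROFILE):
    """Return the profile list stored in a JSON file, or a copy of the default.

    Expected (assumed) file content: {"profile": ["PROFILE_A", "PROFILE_B"]}
    """
    try:
        data = json.loads(Path(path).read_text())
    except (OSError, ValueError):
        # Missing file or invalid JSON: fall back to the default profile.
        return list(default)

    profile = data.get("profile") if isinstance(data, dict) else None
    if isinstance(profile, list) and all(isinstance(p, str) for p in profile):
        return profile
    return list(default)
```

The result would then be passed to the decorator, e.g. `@configure(profile=load_profile(...))`, with the path resolved relative to the transform file. Because the file is read when the code is loaded, the profile only changes after a new commit, which is exactly why the automated merge is needed.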

Keep in mind this will only work with very loose merge policies on your main branch, as you want the merge to happen automatically. So again, it’s technically possible, but it only makes sense for specific use cases and isn’t the generic solution you’re probably looking for. Maybe future automations will be more useful for your case, and hopefully at some point we will be able to define the automations ourselves :crossed_fingers: :pray:

In your case I would go with @arochat’s suggestion and use dynamic allocation rather than over-engineering it :see_no_evil:

Hello,

@arochat @md5, I have a question about your suggestion to use dynamic allocation.
We are using dynamic allocation, but while the job runs it takes up the maximum number of vCPUs on our queue.
We have several jobs running in parallel. Normally those jobs run incrementally, but sometimes, for specific needs (like GDPR or reprocessing historical data), we need to run a snapshot and use bigger profiles.
Because of the high number of parallel jobs, they get queued, as we have a limited number of vCPUs in our queue, and the daily execution takes longer.
Do you know how dynamic allocation works and how vCPUs are allocated to each job?

@ali2901 I would guess that Foundry internally reserves the vCPUs defined in your profile for the duration of the build, and Spark then scales up or down at runtime based on need. In general I would consider your setup an anti-pattern and would not recommend always having the maximum in your profile. Could you not have a separate job for when a historical run is needed? In your case the issue is your assigned limits on the queue, but it’s also relevant for compute costs: Spark can request more resources and then downscale aggressively, and you’ll pay for the extra resources that were requested even if they are used for a very short time, e.g. reading files in parallel and then filtering things out.
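A back-of-the-envelope sketch of why this leads to queueing, assuming (as guessed above, not confirmed) that each job reserves the full vCPU count of its profile for the whole build. The function names and all numbers are hypothetical and only illustrate the arithmetic.

```python
def max_concurrent_jobs(queue_vcpus, vcpus_reserved_per_job):
    """Jobs that fit in the queue at once if each reserves its full profile."""
    if vcpus_reserved_per_job <= 0:
        raise ValueError("vcpus_reserved_per_job must be positive")
    return queue_vcpus // vcpus_reserved_per_job


def waves_needed(job_count, concurrent_jobs):
    """Sequential 'waves' needed to run all jobs, given the concurrency."""
    if concurrent_jobs <= 0:
        raise ValueError("concurrent_jobs must be positive")
    return -(-job_count // concurrent_jobs)  # ceiling division


# Hypothetical figures: a 200-vCPU queue and 20 daily jobs.
big = max_concurrent_jobs(200, 50)    # every job uses the maximum profile
small = max_concurrent_jobs(200, 10)  # jobs use a smaller incremental profile

waves_big = waves_needed(20, big)
waves_small = waves_needed(20, small)
```

With these made-up figures, only 4 jobs fit at once on the large profile, so the 20 jobs run in 5 waves, while the smaller profile lets all 20 run in a single wave.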

@md5 Thank you for your reply. It’s very helpful.

Actually, we can’t have separate jobs, as we can’t write to the same output from different jobs.
We work with templates: our Dev repos are templatized and pushed into the production project. Because we freeze the code in production, we don’t want to open a PR on the production repository every time we have a specific need. That’s why we are trying to set those profiles dynamically. I agree that always having the maximum in our profiles is not best practice, but it’s the only way we have found to handle those particular cases without running out of memory.
As you said, maybe future automations will make this possible, but today we are running into a Foundry limitation on scaling jobs automatically.

@ali2901 you don’t necessarily need to write into the same output. Depending on your setup, your downstream job can take two inputs that represent the same data (incremental and snapshot) and use whichever one it needs. If you run a snapshot once in a while, the downstream job would then use the fully recomputed dataset; otherwise only the incremental one is used.

# downstream_job.py
from transforms.api import configure, incremental, transform, Input, Output

@incremental()
@configure(profile=["KUBERNETES_NO_EXECUTORS_SMALL"])
@transform(
    input_inc_dataset=Input("rid.1"),
    snap_dataset=Input("rid.2"),
    output_dataset=Output("rid.3")
)
def compute(input_inc_dataset, snap_dataset, output_dataset, ctx):
    if ctx.is_incremental:
        df = input_inc_dataset.dataframe()
    else:
        df = snap_dataset.dataframe()

    # my complex logic, which works because snap and inc share the same schema

    output_dataset.write_dataframe(df)

This works because if one of the input datasets has a snapshot transaction, the downstream job also needs to run as a snapshot; when the run is incremental, the snapshot dataset is ignored by Spark, as it’s not used.
