How can I set the reasoningEffort on GPT-5 via function?

I want to set “reasoningEffort” for GPT-5 when calling it from my Functions, in a code repository in Foundry. How can I do so?

Here is some example code:

import { Function } from "@foundry/functions-api"
// NOTE: Model Imports must be manually added through "Resource Imports" in the left-hand panel
import { GPT_5 } from "@foundry/models-api/language-models"

export class MyFunctions {
    /**
     * Sends a chat completion request to the model based on user input
     * @param {string} userInput - Text input to send to model
     */
    @Function()
    public async createChatCompletion(userInput: string): Promise<string | undefined> {
        const response = await GPT_5.createChatCompletion({
            params: {
                "temperature": 1.0,
                "maxTokens": 1000,
                // Reasoning effort is passed alongside the other model params
                "reasoningEffort": "Minimal"
            },
            messages: [
                { role: "SYSTEM", contents: [{ text: "This is system prompt for completion" }] },
                { role: "USER", contents: [{ text: userInput }] },
            ],
        });
        return response.choices[0].message.content;
    }
}

This code is mostly generated by the Model Catalog, which you can find directly in Foundry: https://www.palantir.com/docs/foundry/model-catalog/overview

Note that this parameter is available in AIP Logic too!


I have a few questions about using GPT-5 and its new params with CompletionOrchestrator in PySpark transforms. Here they are:

(they were collected a couple of months ago, so they may be obsolete!)

  1. Is there any source of documentation on CompletionOrchestrator? All I have seen is the docstrings on the class and nothing else, I wonder if there’s any other source of info that could help us out.
  2. When running the orchestrator with verbose=True, the output lists supported models, but I do not see GPT-4.1 Mini/Nano or GPT-5 in that list. We are running GPT-4.1 Mini (seemingly without any issues), but if we want to use GPT-5 I don’t think the orchestrator would handle the new params. Is there any ongoing work to add that capability?
  3. We are seeing a high number of “echo” responses from the orchestrator, but these are not accompanied by any error messages in the output (e.g., in the _completion_error column). Is it possible that the orchestrator is not properly surfacing rate limit errors or other API errors?

Given this is in the context of pipelines, this would be worth its own topic.

  1. It depends on what specifically you are looking for, but orchestrators have some documentation here: https://www.palantir.com/docs/foundry/transforms-python-spark/aip-orchestrators
  2. Given the warning at the top of that page, it is possible. I don’t know whether it supports it.
  3. For “echo” responses, I would be curious whether you still face that today. Especially in Spark transforms, it might be due to some executor-related interference, so using the KUBERNETES_NO_EXECUTORS configuration parameter should help. Reducing the number of concurrent workers (e.g. lowering driver and core memory) might help too, as would batching into smaller runs.
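
As a rough illustration of the "batching into smaller runs" suggestion, here is a minimal sketch that splits an already collected list of rows into fixed-size batches. The helper name `split_into_batches` and the batch size are hypothetical and not part of any Foundry API; how each batch is then sent to the model is left out.

```python
def split_into_batches(rows, batch_size):
    """Yield consecutive slices of rows, each with at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Hypothetical input: seven row ids split into batches of three
batches = list(split_into_batches(list(range(7)), batch_size=3))
```

Each batch can then be processed in its own run, so a failure or a rate limit only affects a small slice of the data.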

That warning at the top of the page is probably more than enough reason to start working on a better solution than orchestrators :smiley: Thanks Vincent!

If you want some code to bootstrap your work, you can take inspiration from the code below.
This is fairly old code, but it runs a large number of API calls quickly while still giving you control over the parallelism of the calls, which is important to avoid saturating LLM APIs in “one batch”.

from transforms.api import transform, Input, Output, TransformInput, TransformOutput, lightweight
import asyncio
import logging
import time
import pandas as pd
import myproject.datasets.utils as utils
# Example shape of the coroutine expected in utils (replace the body with your own call):
# async def process_task_async_your_function(task_id):
#     await my_processing_function_is_here()
#     return task_id


def get_rows(input_dataset):
    # Read the input dataset (lazy execution is not relevant given it's immediately collected)
    input_dataset = input_dataset.polars()

    # Convert the dataset to a list of dictionaries for more efficient row iteration
    rows = input_dataset.to_dicts()
    return rows


@lightweight(cpu_cores=8)
@transform(
    example_input=Input("/path/to/my_input"),
    example_output=Output("/path/to/lightweight_asyncio_8-cpu_with-semaphore"),
)
def compute(example_input: TransformInput, example_output: TransformOutput):
    # Get the input rows
    rows = get_rows(example_input)

    async def processing():
        semaphore_limit = 10  # Limit of concurrent tasks

        logging.warning("Running with async event loop")
        start_time = time.time()

        # Create a semaphore to limit the number of concurrent tasks
        semaphore = asyncio.Semaphore(semaphore_limit)

        async def sem_task(task_id):
            async with semaphore:
                return await utils.process_task_async_your_function(task_id)

        # Create tasks with the semaphore
        tasks = [asyncio.create_task(sem_task(row["id"])) for row in rows]
        done, pending = await asyncio.wait(tasks)

        results = []
        for task in done:
            try:
                result = task.result()
                results.append(result)
            except Exception as e:
                logging.error(f"Task failed with exception: {e}")

        end_time = time.time()
        logging.warning(f"async event loop completed in {end_time - start_time:.2f} seconds")

        # Write the collected results to the output dataset
        df_threadpool = pd.DataFrame(results, columns=['ThreadPoolResult'])
        example_output.write_pandas(df_threadpool)

    asyncio.run(processing())
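
To try the semaphore pattern above outside of Foundry, here is a minimal, self-contained sketch. `fake_llm_call` is a hypothetical stand-in for the real API call (it only sleeps), and the `tracker` dictionary exists purely to observe how many calls run at once. It uses `asyncio.gather` instead of `asyncio.wait`, which is a swapped-in choice so that results come back in input order.

```python
import asyncio


async def fake_llm_call(task_id, tracker):
    # Hypothetical stand-in for a real API call: track concurrency, then sleep
    tracker["running"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["running"])
    await asyncio.sleep(0.01)
    tracker["running"] -= 1
    return task_id * 2


async def run_bounded(task_ids, limit):
    # The semaphore caps how many calls are in flight at the same time
    semaphore = asyncio.Semaphore(limit)
    tracker = {"running": 0, "peak": 0}

    async def sem_task(task_id):
        async with semaphore:
            return await fake_llm_call(task_id, tracker)

    # gather returns results in the same order as the inputs;
    # return_exceptions=True keeps one failure from cancelling the rest
    results = await asyncio.gather(
        *(sem_task(task_id) for task_id in task_ids),
        return_exceptions=True,
    )
    return results, tracker["peak"]


results, peak = asyncio.run(run_bounded(range(25), limit=10))
```

With `return_exceptions=True`, failed calls show up as exception objects in `results`, so they can be filtered or logged per row instead of being lost.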