Python Logging from ThreadPoolExecutor in Code Repository Logs

Description: I am facing an issue while working in a Python Functions repository: logging output from worker threads spawned by a ThreadPoolExecutor does not appear in the Foundry logs, even though logging from the main thread works as intended.

Additional Context: I am using this pattern to parallelize queries to the ontology via the foundry_sdk as part of a retrieval batching strategy. Each batch request executes in its own worker thread, which calls a helper function that handles some minor transformation logic. The main-thread computation works correctly, but any logging statements from inside the invoked helper function are not reflected in the logs when I try to debug the overall process for efficiency.

What I have Tried:

  • Using a root logger with logging.getLogger
  • Ensuring basicConfig is called before any threading

Questions:

  1. Is this a known limitation of Foundry’s log capture system for computation in threads other than the main thread?
  2. Are there recommended patterns for debugging threaded code in Repos?

Expected Behavior: All logging from both the main thread and the worker threads appears in the returned logging output.

As a mock-up example, running the following in a local IDE shows the desired behavior.

from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import time
import random


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] %(levelname)s: %(message)s",
    handlers=[
        logging.StreamHandler()          
    ]
)

logger = logging.getLogger(__name__)


def helper_function(task_id):
    """Simulate a task that logs progress."""
    logger.info(f"Starting task {task_id}")
    try:
        time.sleep(random.uniform(0.5, 2.0))

        # Random chance of warning
        if random.random() < 0.2:
            logger.warning(f"Task {task_id} encountered a minor issue.")

        # Simulate random result
        result = random.randint(100, 999)
        logger.info(f"Completed task {task_id} with result={result}")
        return result

    except Exception as e:
        logger.exception(f"Error in task {task_id}: {e}")
        raise


def run_tasks():
    tasks = list(range(1, 9))
    results = []
    logger.info("Submitting tasks to ThreadPoolExecutor...")

    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit all tasks at once
        futures = {executor.submit(helper_function, t): t for t in tasks}

        # Collect results as they complete
        for future in as_completed(futures):
            task_id = futures[future]
            try:
                result = future.result()
                logger.info(f"Task {task_id} finished successfully with result={result}")
                results.append((task_id, result))

            except Exception as e:
                logger.error(f"Task {task_id} failed with exception: {e}")

    logger.info(f"All tasks done. Final results: {results}")


if __name__ == "__main__":
    run_tasks()

Encountered Behavior: Only logging statements made in the main thread appear.

2025-10-15 13:35:14,439 [MainThread] INFO: Submitting tasks to ThreadPoolExecutor...
2025-10-15 13:35:15,502 [MainThread] INFO: Task 2 finished successfully with result=276
2025-10-15 13:35:15,640 [MainThread] INFO: Task 1 finished successfully with result=456
2025-10-15 13:35:16,125 [MainThread] INFO: Task 3 finished successfully with result=388
2025-10-15 13:35:16,937 [MainThread] INFO: Task 5 finished successfully with result=391
2025-10-15 13:35:16,983 [MainThread] INFO: Task 4 finished successfully with result=495
2025-10-15 13:35:17,673 [MainThread] INFO: Task 6 finished successfully with result=955
2025-10-15 13:35:17,727 [MainThread] INFO: Task 8 finished successfully with result=352
2025-10-15 13:35:18,500 [MainThread] INFO: Task 7 finished successfully with result=444
2025-10-15 13:35:18,500 [MainThread] INFO: All tasks done. Final results: [(2, 276), (1, 456), (3, 388), (5, 391), (4, 495), (6, 955), (8, 352), (7, 444)]

In a Foundry-managed Code Repository, logging follows the platform’s context-aware execution model, which uses Python’s contextvars to track metadata such as function invocation details and user context. Standard Python logging is thread-safe and works across threads without extra setup; Foundry’s logging system additionally relies on these context variables to attribute every log message to the correct execution context.
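To make the idea of context-dependent logging concrete, here is a minimal sketch of a handler filter that reads a context variable. It is not Foundry's implementation: the variable invocation_id, the class InvocationFilter, and the logger name context_demo are all hypothetical, chosen only to show how a record can be dropped when no context is set.

```python
import contextvars
import io
import logging

# Hypothetical context variable; the real names Foundry uses are not given in this thread.
invocation_id = contextvars.ContextVar("invocation_id", default=None)


class InvocationFilter(logging.Filter):
    """Attach the current invocation id to each record; drop records that have none."""

    def filter(self, record):
        record.invocation_id = invocation_id.get()
        return record.invocation_id is not None


def build_logger(stream):
    """Return a logger whose handler only emits records carrying an invocation id."""
    handler = logging.StreamHandler(stream)
    handler.addFilter(InvocationFilter())
    handler.setFormatter(logging.Formatter("[%(invocation_id)s] %(message)s"))
    demo_logger = logging.getLogger("context_demo")
    demo_logger.handlers.clear()
    demo_logger.addHandler(handler)
    demo_logger.setLevel(logging.INFO)
    demo_logger.propagate = False
    return demo_logger


def demo():
    """Log once without context and once with it, then return what was emitted."""
    stream = io.StringIO()
    demo_logger = build_logger(stream)
    demo_logger.info("dropped: no invocation id is set yet")
    token = invocation_id.set("fn-42")
    try:
        demo_logger.info("kept: attributed to the current invocation")
    finally:
        invocation_id.reset(token)  # restore the previous (unset) state
    return stream.getvalue()


if __name__ == "__main__":
    print(demo(), end="")
```

In this toy model only the second message reaches the stream, because the filter finds no invocation id for the first one. A platform that attributes logs this way would behave similarly for any code that runs without the expected context.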

However, context variables are scoped to an execution context rather than shared across the whole process. By default each new thread starts with its own context, so when you spawn threads (for example, using ThreadPoolExecutor), values set in the main thread are not automatically visible in the worker threads. Unless you propagate the context explicitly, by taking a snapshot with contextvars.copy_context() and running the thread’s target function inside it, log messages from those threads may lack critical context information or may not be logged at all.
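The behavior described above can be reproduced outside Foundry with a plain context variable. The sketch below assumes a standard CPython build with default settings; request_id, read_request_id, and compare_propagation are hypothetical names used only for illustration.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the per-invocation metadata a platform might track.
request_id = contextvars.ContextVar("request_id", default="<unset>")


def read_request_id():
    """Return the value of request_id as seen by the calling thread."""
    return request_id.get()


def compare_propagation():
    """Read the variable from a worker thread with and without a copied context."""
    request_id.set("invocation-123")  # set in the submitting thread only
    with ThreadPoolExecutor(max_workers=1) as executor:
        # Plain submit: the worker thread runs in its own context.
        plain = executor.submit(read_request_id).result()
        # Copied context: the worker runs inside a snapshot of the caller's context.
        ctx = contextvars.copy_context()
        propagated = executor.submit(ctx.run, read_request_id).result()
    return plain, propagated


if __name__ == "__main__":
    print(compare_propagation())
```

The first read falls back to the variable's default because the worker never saw the value set by the submitting thread; the second read sees it because ctx.run executes the function inside the copied context.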

As a hacky workaround, use the following script as a baseline:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
import logging
import contextvars

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] %(levelname)s: %(message)s",
    handlers=[
        logging.StreamHandler()          
    ]
)

logger = logging.getLogger(__name__)


def helper_function(task_id):
    logger.info(f"Starting task {task_id}")
    time.sleep(random.uniform(0.5, 2.0))

    if random.random() < 0.2:
        logger.warning(f"Task {task_id} encountered a minor issue.")

    result = random.randint(100, 999)
    logger.info(f"Completed task {task_id} with result={result}")
    return result


def test_threadpool_logging(n: int = 4):
    logger.info(f"[MainThread] Starting with n={n}")
    results = []

    with ThreadPoolExecutor(max_workers=n) as executor:
        futures = {}

        for task_id in range(n):
            ctx = contextvars.copy_context()
            fut = executor.submit(ctx.run, helper_function, task_id)
            futures[fut] = task_id

        for fut in as_completed(futures):
            task_id = futures[fut]
            try:
                result = fut.result()
                logger.info(f"[MainThread] Got result from {task_id}: {result}")
                results.append(result)
            except Exception:
                logger.exception(f"[MainThread] Task {task_id} failed")

    logger.info("[MainThread] Finished")

    return results
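The per-task copy_context() call in the script above can be factored into a small helper so that call sites stay close to a normal executor.submit. This is only a sketch: submit_with_context, tag, tagged_square, and run_batch are hypothetical names, and whether this is sufficient for Foundry's log capture in every case is an assumption based on the explanation above.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical context variable used only to verify that propagation works.
tag = contextvars.ContextVar("tag", default=None)


def submit_with_context(executor, fn, *args, **kwargs):
    """Submit fn so that it runs inside a snapshot of the caller's current context.

    A fresh copy is taken for every task because a single Context object
    cannot be entered by more than one thread at the same time.
    """
    ctx = contextvars.copy_context()
    return executor.submit(ctx.run, fn, *args, **kwargs)


def tagged_square(x):
    """Return the tag visible in the worker together with x squared."""
    return (tag.get(), x * x)


def run_batch(values):
    """Set a tag in the calling thread and fan the work out to worker threads."""
    tag.set("batch-A")
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [submit_with_context(executor, tagged_square, v) for v in values]
        # Collect in submission order so results line up with the inputs.
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_batch([1, 2, 3]))
```

Taking one snapshot per task, rather than sharing one Context across tasks, matters here: calling run on a Context that another thread has already entered raises RuntimeError.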