Dataset stale after API upload (50% Failure after 15+ second delay)

Hi all,

I’m running into an issue where, after uploading a Parquet file to a dataset and applying the schema (the only way I could figure out how to make sure the upload turns into a real dataset), the new data is not reliably visible when I read the dataset back, even after waiting 15 seconds. In my tests, about 50% of the time the new data isn’t there after a successful upload, schema application, and a 15-second sleep. I’m wondering if I’m doing something wrong or if this is a known limitation.

How I’m uploading:

  1. I upload a Parquet file to the dataset (using the /api/v2/datasets/{datasetRid}/files/data.parquet/upload endpoint with transactionType=SNAPSHOT).

  2. I then call the schema inference endpoint (/foundry-schema-inference/api/datasets/{datasetRid}/branches/master/schema) and apply the inferred schema using the metadata endpoint.

  3. After a configurable delay (currently 15 seconds), I read the dataset back using the /api/v2/datasets/{datasetRid}/readTable endpoint.
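
A fixed delay makes the result depend on timing, so one way to measure how long visibility actually takes is to poll instead of sleeping once. The following is only a minimal sketch of that idea: `wait_until_visible` and its `read_ids` argument are hypothetical names, and `read_ids` is assumed to be any callable that wraps the readTable request from step 3 and returns the `id` values currently visible. It does not address why a read might be stale in the first place.

```python
import time
from typing import Callable, Collection, Optional


def wait_until_visible(
    read_ids: Callable[[], Collection[str]],
    expected_id: str,
    timeout_seconds: float = 60.0,
    poll_interval_seconds: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[float]:
    """Poll read_ids() until expected_id appears.

    Returns the elapsed seconds on success, or None if the timeout is reached.
    read_ids is a hypothetical wrapper around the dataset read call.
    """
    start = time.monotonic()
    while True:
        # Re-read on every iteration so a stale first read does not decide the outcome.
        if expected_id in read_ids():
            return time.monotonic() - start
        if time.monotonic() - start >= timeout_seconds:
            return None
        sleep(poll_interval_seconds)
```

With the test script, this could be called as `wait_until_visible(lambda: set(get_dataframe_from_foundry(rid)["id"]), upload_uuid)` in place of the single sleep and read, which reports the observed delay rather than a pass/fail at 15 seconds.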

Test script:

I’ve attached a minimal Python script that reproduces the issue. You can configure the delay and see the success/failure rate for yourself.

Questions:

  • Is this delay in data visibility expected?

  • Is my approach to schema application correct, or is there a better/more reliable way to upload and make new data visible?

  • Are there any additional steps or API calls I should be making to ensure data is available for reading immediately after upload?

  • Is it just my stack?

The use case: I am trying to use Foundry as a source of truth, and read and write access is needed through an external application. The Ontology cannot work due to a couple of limitations.

Any thoughts would be appreciated.

Thanks,
Jonah


Repro script: create a new dataset on any stack, insert its RID below, and try running it a couple of times.

import os
import uuid
import tempfile
import time
from datetime import datetime, timezone
from urllib.parse import quote

import pandas as pd
import pyarrow as pa
import requests

# --- Configuration ---

# This script reads your Foundry token and hostname from environment variables.
# Please ensure `FOUNDRY_TOKEN` and `FOUNDRY_HOSTNAME` are set in your shell.
# Example:
# export FOUNDRY_TOKEN="your-token-here"
# export FOUNDRY_HOSTNAME="https://your-foundry-instance.com"
FOUNDRY_TOKEN = os.getenv("FOUNDRY_TOKEN")
FOUNDRY_HOSTNAME = os.getenv("FOUNDRY_HOSTNAME")
FOUNDRY_DATASET_RID = "ri.foundry.main.dataset.4bfb99e5-646c-4658-bed9-20b426707ff4"
# Optional: Add a delay in seconds after uploading before reading the data back.
DELAY_AFTER_UPLOAD_SECONDS = 15


def convert_arrow_table_to_dataframe(arrow_table: pa.Table) -> pd.DataFrame:
    """Converts a PyArrow Table to a pandas DataFrame with proper type handling."""
    df = pd.DataFrame()
    for col_name in arrow_table.column_names:
        col_data = arrow_table[col_name]
        if pa.types.is_timestamp(col_data.type):
            temp_series = col_data.to_pandas(timestamp_as_object=True)
            df[col_name] = pd.to_datetime(temp_series, utc=True)
        else:
            df[col_name] = col_data.to_pandas()
    return df


def get_dataframe_from_foundry(dataset_rid):
    """Downloads a dataset from Foundry and returns it as a pandas DataFrame."""
    print("Downloading data from branch 'master'...")
    headers = {"authorization": f"Bearer {FOUNDRY_TOKEN}"}
    url = f"{FOUNDRY_HOSTNAME}/api/v2/datasets/{dataset_rid}/readTable"
    params = {"branchName": "master", "format": "ARROW"}
    response = requests.get(url, headers=headers, params=params, timeout=60.0)
    response.raise_for_status()
    arrow_table = pa.ipc.open_stream(response.content).read_all()
    return convert_arrow_table_to_dataframe(arrow_table)


def apply_schema_on_branch(dataset_rid):
    """Infers and applies a schema for the data on the 'master' branch."""
    print("Applying schema on branch 'master'...")
    headers = {
        "authorization": f"Bearer {FOUNDRY_TOKEN}",
        "Content-Type": "application/json",
    }
    encoded_branch = quote("master", safe="")

    # 1. Infer schema from the uploaded parquet file
    inference_url = f"{FOUNDRY_HOSTNAME}/foundry-schema-inference/api/datasets/{dataset_rid}/branches/{encoded_branch}/schema"
    inference_res = requests.post(inference_url, headers=headers, json={}, timeout=60.0)
    inference_res.raise_for_status()
    field_schema_list = inference_res.json()["data"]["foundrySchema"]["fieldSchemaList"]

    # 2. Apply the inferred schema
    apply_url = f"{FOUNDRY_HOSTNAME}/foundry-metadata/api/schemas/datasets/{dataset_rid}/branches/{encoded_branch}"
    payload = {
        "fieldSchemaList": field_schema_list,
        "dataFrameReaderClass": "com.palantir.foundry.spark.input.ParquetDataFrameReader",
        "customMetadata": {"format": "parquet"},
    }
    apply_res = requests.post(apply_url, headers=headers, json=payload, timeout=60.0)
    apply_res.raise_for_status()


def upload_dataframe_to_foundry(dataset_rid, df):
    """Uploads a pandas DataFrame to a Foundry dataset as a SNAPSHOT on 'master'."""
    print("Uploading DataFrame to branch 'master' as a SNAPSHOT...")
    # Use a temporary file to store the parquet representation of the DataFrame
    with tempfile.NamedTemporaryFile(suffix=".parquet", delete=True) as temp_file:
        df.to_parquet(temp_file.name, index=False)

        # Upload the file content
        headers = {
            "authorization": f"Bearer {FOUNDRY_TOKEN}",
            "Content-Type": "application/octet-stream",
        }
        url = f"{FOUNDRY_HOSTNAME}/api/v2/datasets/{dataset_rid}/files/data.parquet/upload"
        params = {"branchName": "master", "transactionType": "SNAPSHOT"}
        with open(temp_file.name, "rb") as f:
            upload_res = requests.post(
                url, headers=headers, params=params, data=f.read(), timeout=120.0
            )
            upload_res.raise_for_status()

    # After uploading, a schema must be applied for the data to be queryable
    apply_schema_on_branch(dataset_rid)


def main():
    """Main script to run the data visibility test."""
    if not FOUNDRY_TOKEN or not FOUNDRY_HOSTNAME:
        print(
            "FATAL: Please set FOUNDRY_TOKEN and FOUNDRY_HOSTNAME environment variables."
        )
        return

    print(f"--- Starting test on dataset '{FOUNDRY_DATASET_RID}' ---")

    # 1. Generate new, unique data to upload.
    upload_uuid = str(uuid.uuid4())
    df_to_upload = pd.DataFrame(
        {
            "id": [upload_uuid],
            "timestamp": [datetime.now(timezone.utc).isoformat()],
            "description": ["Test data for visibility delay"],
        }
    )
    print(f"Generated new data with UUID: {upload_uuid}")
    print(f"DataFrame to upload:\n{df_to_upload}")

    # 2. Upload this new data.
    upload_dataframe_to_foundry(FOUNDRY_DATASET_RID, df_to_upload)
    print("Upload and schema application complete.")

    # Pause after uploading
    if DELAY_AFTER_UPLOAD_SECONDS > 0:
        print(
            f"Sleeping for {DELAY_AFTER_UPLOAD_SECONDS} seconds before reading data back..."
        )
        time.sleep(DELAY_AFTER_UPLOAD_SECONDS)

    # 3. Download the data to check if the new content is visible.
    print("Fetching data to verify visibility...")
    df_after = get_dataframe_from_foundry(FOUNDRY_DATASET_RID)
    print(f"Downloaded data has {len(df_after)} rows.")
    if not df_after.empty:
        print(f"Downloaded DataFrame head:\n{df_after.head()}")

    # 4. Verify the result.
    print("--- Verifying results ---")
    if "id" in df_after.columns and upload_uuid in df_after["id"].values:
        print(
            f"✅ SUCCESS: The newly uploaded data was retrieved after {DELAY_AFTER_UPLOAD_SECONDS} seconds."
        )
    else:
        print("❌ FAILURE: The newly uploaded data was NOT found.")
        print(
            f"The data read after {DELAY_AFTER_UPLOAD_SECONDS} seconds appears to be stale."
        )

    print("--- Test finished ---")


if __name__ == "__main__":
    main()

Hi @jadler,

One nice property of the S3-compatible API for datasets is that it tries to provide read-after-write semantics. You can leverage this to upload files, and when the first read request arrives at the API it will automatically commit the transaction.

I have rewritten your script a bit and leveraged foundry-dev-tools, which offers nice integrations with pandas through fsspec. I also added a SQL query at the end to show that it is consistent.

To run this script you’ll need to install foundry-dev-tools with pip install "foundry-dev-tools[s3]".

Anyway, here is the code:

import os
import time
import uuid
from datetime import datetime, timezone

import pandas as pd

from foundry_dev_tools import FoundryContext, JWTTokenProvider

# --- Configuration ---

FOUNDRY_TOKEN = os.getenv("FOUNDRY_TOKEN")
FOUNDRY_HOSTNAME = os.getenv("FOUNDRY_HOSTNAME")  # e.g. stack.palantirfoundry.com
FOUNDRY_DATASET_RID = "ri.foundry.main.dataset.a5d71d93-8b61-4027-acdd-2c7d97121507"
# Optional: Add a delay in seconds after uploading before reading the data back.
DELAY_AFTER_UPLOAD_SECONDS = 0.01

ctx = FoundryContext(
    token_provider=JWTTokenProvider(
        host=FOUNDRY_HOSTNAME,
        jwt=FOUNDRY_TOKEN,
    )
)


def get_dataframe_from_foundry(dataset_rid) -> pd.DataFrame:
    """Downloads a dataset from Foundry and returns it as a pandas DataFrame."""
    print("Downloading data from branch 'master'...")
    return pd.read_parquet(f"s3://{dataset_rid}", storage_options=ctx.s3.get_s3fs_storage_options())


def get_dataframe_from_foundry_sql(dataset_rid) -> pd.DataFrame:
    """Queries a dataset through Foundry SQL and returns the result as a pandas DataFrame."""
    print("Querying data from branch 'master' using SQL query...")
    return ctx.foundry_sql_server.query_foundry_sql(f"SELECT * FROM `{dataset_rid}`")


def apply_schema_on_branch(dataset_rid):
    """Infers and applies a schema for the data on the 'master' branch."""

    print("Applying schema on branch 'master'...")

    # 1. Infer schema from the uploaded parquet file
    field_schema_list = ctx.schema_inference.infer_dataset_schema(dataset_rid=dataset_rid, branch="master")

    # 2. get last transaction rid
    last_transaction_rid = ctx.foundry_rest_client.get_dataset_last_transaction_rid(
        dataset_rid=dataset_rid, branch="master"
    )

    # 3. Apply the inferred schema
    # Use the dataset_rid argument rather than the module-level constant
    ctx.metadata.api_upload_dataset_schema(
        dataset_rid, transaction_rid=last_transaction_rid, schema=field_schema_list, branch="master"
    )

    print("Applying schema on branch 'master'... COMPLETED")


def upload_dataframe_to_foundry(dataset_rid, df):
    """Uploads a pandas DataFrame to a Foundry dataset as a SNAPSHOT on 'master'."""
    print("Uploading DataFrame to branch 'master' as a SNAPSHOT...")
    # Write the DataFrame as parquet straight to the dataset through the S3-compatible API

    df.to_parquet(f"s3://{dataset_rid}/output.parquet", storage_options=ctx.s3.get_s3fs_storage_options())


def main():
    """Main script to run the data visibility test."""
    print(f"--- Starting test on dataset '{FOUNDRY_DATASET_RID}' ---")

    # 1. Generate new, unique data to upload.
    upload_uuid = str(uuid.uuid4())
    df_to_upload = pd.DataFrame(
        {
            "id": [upload_uuid],
            "timestamp": [datetime.now(timezone.utc).isoformat()],
            "description": ["Test data for visibility delay"],
        }
    )
    print(f"Generated new data with UUID: {upload_uuid}")
    print(f"DataFrame to upload:\n{df_to_upload}")

    # 2. Upload this new data using s3 compatible API.
    upload_dataframe_to_foundry(FOUNDRY_DATASET_RID, df_to_upload)
    print("Upload and schema application complete.")

    # Pause after uploading
    if DELAY_AFTER_UPLOAD_SECONDS > 0:
        print(f"Sleeping for {DELAY_AFTER_UPLOAD_SECONDS} seconds before reading data back...")
        time.sleep(DELAY_AFTER_UPLOAD_SECONDS)

    # 4. Download the data to force the transaction to commit.
    print("Fetching data to verify visibility...")
    df_after = get_dataframe_from_foundry(FOUNDRY_DATASET_RID)
    print(f"Downloaded data has {len(df_after)} rows.")
    if not df_after.empty:
        print(f"Downloaded DataFrame head:\n{df_after.head()}")

        # 5. Verify the result.
        print("--- Verifying results ---")
        if "id" in df_after.columns and upload_uuid in df_after["id"].values:
            print(f"✅ SUCCESS: The newly uploaded data was retrieved after {DELAY_AFTER_UPLOAD_SECONDS} seconds.")
        else:
            print("❌ FAILURE: The newly uploaded data was NOT found.")
            print(f"The data read after {DELAY_AFTER_UPLOAD_SECONDS} seconds appears to be stale.")

    # 6. After uploading, a schema must be applied for the data to be queryable
    apply_schema_on_branch(FOUNDRY_DATASET_RID)

    # 7. Query using Foundry SQL Server
    df_after_sql = get_dataframe_from_foundry_sql(FOUNDRY_DATASET_RID)
    print(f"Downloaded data has {len(df_after_sql)} rows.")
    if "id" in df_after_sql.columns and upload_uuid in df_after_sql["id"].values:
        print(
            f"✅ SUCCESS: The newly uploaded data was retrieved using SQL after {DELAY_AFTER_UPLOAD_SECONDS} seconds."
        )

    print("--- Test finished ---")


if __name__ == "__main__":
    main()


Here is the script output:

--- Starting test on dataset 'ri.foundry.main.dataset.a5d71d93-8b61-4027-acdd-2c7d97121507' ---
Generated new data with UUID: 69004b44-f43c-4efe-9d71-0fe07899032f
DataFrame to upload:
                                     id  ...                     description
0  69004b44-f43c-4efe-9d71-0fe07899032f  ...  Test data for visibility delay

[1 rows x 3 columns]
Uploading DataFrame to branch 'master' as a SNAPSHOT...
Upload and schema application complete.
Sleeping for 0.01 seconds before reading data back...
Fetching data to verify visibility...
Downloading data from branch 'master'...
Downloaded data has 1 rows.
Downloaded DataFrame head:
                                     id  ...                     description
0  69004b44-f43c-4efe-9d71-0fe07899032f  ...  Test data for visibility delay

[1 rows x 3 columns]
--- Verifying results ---
✅ SUCCESS: The newly uploaded data was retrieved after 0.01 seconds.
Applying schema on branch 'master'...
Applying schema on branch 'master'... COMPLETED
Querying data from branch 'master' using SQL query...
Downloaded data has 1 rows.
✅ SUCCESS: The newly uploaded data was retrieved using SQL after 0.01 seconds.
--- Test finished ---

And if you want to look at the code, it’s all on GitHub:

https://github.com/emdgroup/foundry-dev-tools
