Hi all,
I’m running into an issue where, after uploading a Parquet file to a dataset and applying a schema (the only way I could figure out to make sure the upload turns into a real, queryable dataset), the new data is not reliably visible when I read the dataset back, even after waiting 15 seconds. In my tests, roughly 50% of the time the new data isn’t there after a successful upload, schema application, and a 15-second sleep. I’m wondering if I’m doing something wrong or if this is a known limitation.
How I’m uploading:
- I upload a Parquet file to the dataset (using the /api/v2/datasets/{datasetRid}/files/data.parquet/upload endpoint with transactionType=SNAPSHOT).
- I then call the schema inference endpoint (/foundry-schema-inference/api/datasets/{datasetRid}/branches/master/schema) and apply the inferred schema using the foundry-metadata endpoint.
- After a configurable delay (currently 15 seconds), I read the dataset back using the /api/v2/datasets/{datasetRid}/readTable endpoint.
Test script:
I’ve attached a minimal Python script that reproduces the issue. You can configure the delay and see the success/failure rate for yourself.
Questions:
- Is this delay in data visibility expected?
- Is my approach to schema application correct, or is there a better/more reliable way to upload and make new data visible? (I sketch one alternative I’ve been considering in the addendum after the script.)
- Are there any additional steps or API calls I should be making to ensure data is available for reading immediately after upload? (My current fallback is the polling workaround sketched right after these questions.)
- Or is it just my stack?
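For reference, here’s the polling fallback I mentioned in the third question. It’s a minimal sketch that reuses get_dataframe_from_foundry from the script below and simply retries the read until the new row shows up or a deadline passes; the deadline and interval values are arbitrary:

import time

def wait_for_visibility(dataset_rid, expected_uuid, deadline_s=60.0, interval_s=2.0):
    """Polls the dataset until the uploaded row is visible or the deadline passes."""
    start = time.monotonic()
    while time.monotonic() - start < deadline_s:
        df = get_dataframe_from_foundry(dataset_rid)  # helper from the script below
        if "id" in df.columns and expected_uuid in df["id"].values:
            return True  # new data is visible
        time.sleep(interval_s)  # back off before re-reading
    return False  # still stale after deadline_s seconds

This obviously just papers over the problem, which is why I’d like to know whether there’s a call that makes visibility deterministic.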
The use case is that I’m trying to use Foundry as a source of truth, with read and write access needed from an external application. The Ontology can’t work for this due to a couple of limitations.
Any thoughts would be appreciated.
Thanks,
Jonah
Repro script – create a new dataset on any stack, insert its RID below, and try running it a couple of times:
import os
import uuid
import tempfile
import time
from datetime import datetime, timezone
from urllib.parse import quote
import pandas as pd
import pyarrow as pa
import requests
# --- Configuration ---
# This script reads your Foundry token and hostname from environment variables.
# Please ensure `FOUNDRY_TOKEN` and `FOUNDRY_HOSTNAME` are set in your shell.
# Example:
# export FOUNDRY_TOKEN="your-token-here"
# export FOUNDRY_HOSTNAME="https://your-foundry-instance.com"
FOUNDRY_TOKEN = os.getenv("FOUNDRY_TOKEN")
FOUNDRY_HOSTNAME = os.getenv("FOUNDRY_HOSTNAME")
FOUNDRY_DATASET_RID = "ri.foundry.main.dataset.4bfb99e5-646c-4658-bed9-20b426707ff4"
# Optional: Add a delay in seconds after uploading before reading the data back.
DELAY_AFTER_UPLOAD_SECONDS = 15
def convert_arrow_table_to_dataframe(arrow_table: pa.Table) -> pd.DataFrame:
    """Converts a PyArrow Table to a pandas DataFrame with proper type handling."""
    df = pd.DataFrame()
    for col_name in arrow_table.column_names:
        col_data = arrow_table[col_name]
        if pa.types.is_timestamp(col_data.type):
            temp_series = col_data.to_pandas(timestamp_as_object=True)
            df[col_name] = pd.to_datetime(temp_series, utc=True)
        else:
            df[col_name] = col_data.to_pandas()
    return df


def get_dataframe_from_foundry(dataset_rid):
    """Downloads a dataset from Foundry and returns it as a pandas DataFrame."""
    print("Downloading data from branch 'master'...")
    headers = {"authorization": f"Bearer {FOUNDRY_TOKEN}"}
    url = f"{FOUNDRY_HOSTNAME}/api/v2/datasets/{dataset_rid}/readTable"
    params = {"branchName": "master", "format": "ARROW"}
    response = requests.get(url, headers=headers, params=params, timeout=60.0)
    response.raise_for_status()
    arrow_table = pa.ipc.open_stream(response.content).read_all()
    return convert_arrow_table_to_dataframe(arrow_table)


def apply_schema_on_branch(dataset_rid):
    """Infers and applies a schema for the data on the 'master' branch."""
    print("Applying schema on branch 'master'...")
    headers = {
        "authorization": f"Bearer {FOUNDRY_TOKEN}",
        "Content-Type": "application/json",
    }
    encoded_branch = quote("master", safe="")
    # 1. Infer schema from the uploaded parquet file
    inference_url = f"{FOUNDRY_HOSTNAME}/foundry-schema-inference/api/datasets/{dataset_rid}/branches/{encoded_branch}/schema"
    inference_res = requests.post(inference_url, headers=headers, json={}, timeout=60.0)
    inference_res.raise_for_status()
    field_schema_list = inference_res.json()["data"]["foundrySchema"]["fieldSchemaList"]
    # 2. Apply the inferred schema
    apply_url = f"{FOUNDRY_HOSTNAME}/foundry-metadata/api/schemas/datasets/{dataset_rid}/branches/{encoded_branch}"
    payload = {
        "fieldSchemaList": field_schema_list,
        "dataFrameReaderClass": "com.palantir.foundry.spark.input.ParquetDataFrameReader",
        "customMetadata": {"format": "parquet"},
    }
    apply_res = requests.post(apply_url, headers=headers, json=payload, timeout=60.0)
    apply_res.raise_for_status()


def upload_dataframe_to_foundry(dataset_rid, df):
    """Uploads a pandas DataFrame to a Foundry dataset as a SNAPSHOT on 'master'."""
    print("Uploading DataFrame to branch 'master' as a SNAPSHOT...")
    # Use a temporary file to store the parquet representation of the DataFrame
    with tempfile.NamedTemporaryFile(suffix=".parquet", delete=True) as temp_file:
        df.to_parquet(temp_file.name, index=False)
        # Upload the file content
        headers = {
            "authorization": f"Bearer {FOUNDRY_TOKEN}",
            "Content-Type": "application/octet-stream",
        }
        url = f"{FOUNDRY_HOSTNAME}/api/v2/datasets/{dataset_rid}/files/data.parquet/upload"
        params = {"branchName": "master", "transactionType": "SNAPSHOT"}
        with open(temp_file.name, "rb") as f:
            upload_res = requests.post(
                url, headers=headers, params=params, data=f.read(), timeout=120.0
            )
        upload_res.raise_for_status()
    # After uploading, a schema must be applied for the data to be queryable
    apply_schema_on_branch(dataset_rid)


def main():
    """Main script to run the data visibility test."""
    if not FOUNDRY_TOKEN or not FOUNDRY_HOSTNAME:
        print(
            "FATAL: Please set FOUNDRY_TOKEN and FOUNDRY_HOSTNAME environment variables."
        )
        return
    print(f"--- Starting test on dataset '{FOUNDRY_DATASET_RID}' ---")
    # 1. Generate new, unique data to upload.
    upload_uuid = str(uuid.uuid4())
    df_to_upload = pd.DataFrame(
        {
            "id": [upload_uuid],
            "timestamp": [datetime.now(timezone.utc).isoformat()],
            "description": ["Test data for visibility delay"],
        }
    )
    print(f"Generated new data with UUID: {upload_uuid}")
    print(f"DataFrame to upload:\n{df_to_upload}")
    # 2. Upload this new data.
    upload_dataframe_to_foundry(FOUNDRY_DATASET_RID, df_to_upload)
    print("Upload and schema application complete.")
    # Pause after uploading
    if DELAY_AFTER_UPLOAD_SECONDS > 0:
        print(
            f"Sleeping for {DELAY_AFTER_UPLOAD_SECONDS} seconds before reading data back..."
        )
        time.sleep(DELAY_AFTER_UPLOAD_SECONDS)
    # 3. Download the data to check if the new content is visible.
    print("Fetching data to verify visibility...")
    df_after = get_dataframe_from_foundry(FOUNDRY_DATASET_RID)
    print(f"Downloaded data has {len(df_after)} rows.")
    if not df_after.empty:
        print(f"Downloaded DataFrame head:\n{df_after.head()}")
    # 4. Verify the result.
    print("--- Verifying results ---")
    if "id" in df_after.columns and upload_uuid in df_after["id"].values:
        print(
            f"✅ SUCCESS: The newly uploaded data was retrieved after {DELAY_AFTER_UPLOAD_SECONDS} seconds."
        )
    else:
        print("❌ FAILURE: The newly uploaded data was NOT found.")
        print(
            f"The data read after {DELAY_AFTER_UPLOAD_SECONDS} seconds appears to be stale."
        )
    print("--- Test finished ---")


if __name__ == "__main__":
    main()
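Addendum, re: my second question. One alternative I’ve been considering is managing the transaction explicitly instead of letting the upload endpoint open and commit one implicitly: open a SNAPSHOT transaction, upload the file into it, then commit. A rough sketch of what I mean, assuming I’m reading the v2 transaction endpoints correctly (I haven’t yet verified whether this changes the visibility behavior at all):

def upload_with_explicit_transaction(dataset_rid, parquet_bytes):
    """Opens a SNAPSHOT transaction on 'master', uploads into it, then commits."""
    json_headers = {
        "authorization": f"Bearer {FOUNDRY_TOKEN}",
        "Content-Type": "application/json",
    }
    # 1. Open a SNAPSHOT transaction on master
    txn_url = f"{FOUNDRY_HOSTNAME}/api/v2/datasets/{dataset_rid}/transactions"
    txn_res = requests.post(
        txn_url,
        headers=json_headers,
        params={"branchName": "master"},
        json={"transactionType": "SNAPSHOT"},
        timeout=60.0,
    )
    txn_res.raise_for_status()
    txn_rid = txn_res.json()["rid"]
    # 2. Upload the file into the open transaction
    upload_url = f"{FOUNDRY_HOSTNAME}/api/v2/datasets/{dataset_rid}/files/data.parquet/upload"
    upload_res = requests.post(
        upload_url,
        headers={
            "authorization": f"Bearer {FOUNDRY_TOKEN}",
            "Content-Type": "application/octet-stream",
        },
        params={"transactionRid": txn_rid},
        data=parquet_bytes,
        timeout=120.0,
    )
    upload_res.raise_for_status()
    # 3. Commit the transaction so it becomes the dataset's latest view
    commit_url = f"{FOUNDRY_HOSTNAME}/api/v2/datasets/{dataset_rid}/transactions/{txn_rid}/commit"
    requests.post(commit_url, headers=json_headers, timeout=60.0).raise_for_status()

(The schema would presumably still need to be applied afterwards, as in apply_schema_on_branch above.)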