Applying a schema to an already structured dataset of Parquet files?

I have a Python External Transform that downloads Parquet files from an external bucket. The files are already structured and have a known schema (there’s a good reason for not using Data Connection). My download function just writes the files, and I’m not sure how to apply the schema without reading the dataset again and saving it via spark.createDataFrame or spark.read.schema(schema).parquet(raw_path), which would be computationally expensive.

After I run the transform below, I can infer the schema by clicking the “apply schema” button in the Dataset Preview, so I am looking for the equivalent action in Python. I guess I could make a POST request to the foundry-schema-inference/api/datasets/<dataset_rid> API endpoint from within the transform, but that feels very hacky.

Finally, inferring the schema would be nice, but I can also provide the hardcoded schema as a Python variable if that makes applying it easier.

Here’s my code:

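import json
import logging

logger = logging.getLogger(__name__)

# _storage and _get_source_prefix are project-specific helpers (not shown here).


def download_dataset(spark, credentials: 'Credential', dataset: 'TransformOutput', dataset_bucket_path: str, *,
                     proxy_creds: 'Credential' = None) -> None: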
    """ Download dataset from the GCS Bucket into the given dataset """
    service_account_info = json.loads(credentials.get('service_account_json_key'))
    service_account_info = frozenset(service_account_info.items())  # make hashable for cache
    if proxy_creds is not None:
        proxy_auth = (proxy_creds.get('proxy_username'), proxy_creds.get('proxy_password'))
    else:
        proxy_auth = None

    def _download_file(full_source_path: str) -> None:
        """ Download a single file and save to the dataset """
        bucket = _storage.get_bucket(service_account_info, proxy_auth)
        blob = bucket.blob(full_source_path)
        local_path = full_source_path[len(source_prefix):]  # get just the local path, i.e. bucket/dataset_name/<local path>
        with dataset.filesystem().open(local_path, 'wb') as fp:
            blob.download_to_file(fp)
            logger.info("File '%s' downloaded to dataset path '%s'.", full_source_path, local_path)

    bucket = _storage.get_bucket(service_account_info, proxy_auth)
    source_prefix = _get_source_prefix(dataset_bucket_path)
    full_source_paths = [blob.name for blob in bucket.list_blobs(prefix=source_prefix)]
    rdd = spark.sparkContext.parallelize(full_source_paths)
    rdd.foreach(_download_file)
    logger.info("Full dataset (%d files) downloaded from %s", len(full_source_paths), source_prefix)

I can call this from a compute function and it downloads the files into the dataset, but without the schema:

@transform(output=...)
def compute(output, ...):
    download_dataset(output, ...)

Instead of downloading the files from GCS, have you tried using spark.read.parquet to read the files from GCS directly into a dataframe? You might have to do some configuration of the spark session you use to read the files to properly authenticate with GCS, but you should be able to then do something like:

df = spark.read.parquet('gs://your-bucket-name/path/to/parquet/files')
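
The "configuration of the spark session" part could look roughly like the sketch below. It assumes the Hadoop GCS connector is on the classpath and uses its 2.x-style property names; newer connector versions name the auth properties differently, so check the version you have. The helper names and `keyfile_path` are hypothetical, the key file would have to be readable by the driver and executors, and none of this has been verified inside a Foundry build.

```python
def gcs_hadoop_options(keyfile_path):
    """Hadoop properties for the GCS connector (2.x-style names; an assumption)."""
    return {
        "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "google.cloud.auth.service.account.enable": "true",
        "google.cloud.auth.service.account.json.keyfile": keyfile_path,
    }


def read_parquet_from_gcs(spark, gcs_path, keyfile_path):
    """Apply the GCS options to the session's Hadoop config, then read Parquet."""
    # _jsc is a private attribute, but it is the usual way to reach the
    # Hadoop configuration of an already-running session from PySpark.
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    for key, value in gcs_hadoop_options(keyfile_path).items():
        hadoop_conf.set(key, value)
    return spark.read.parquet(gcs_path)
```

Writing the resulting dataframe to the output would capture the schema, at the cost of Spark re-encoding the data rather than copying the files as they are.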

I’ll also offer https://www.palantir.com/docs/foundry/data-integration/virtual-tables/ as an option here. This supports Parquet and GCS. This will handle all the connectivity and data loading for you.

Nothing beats copying raw bytes in terms of performance and consequently cost of your transform.
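
Part of why a raw copy loses nothing: every Parquet file carries its own schema in a footer at the end of the file, followed by a 4-byte little-endian footer length and the magic bytes PAR1. The sketch below only locates that footer in a file-like object; it does not decode the Thrift-encoded metadata, and the function name is made up for illustration.

```python
import io
import struct

PARQUET_MAGIC = b"PAR1"


def parquet_footer_span(fp):
    """Return (offset, length) of the footer metadata in a seekable binary file."""
    fp.seek(0, io.SEEK_END)
    size = fp.tell()
    # Smallest possible file: 4-byte header magic + 4-byte length + 4-byte trailer magic.
    if size < 12:
        raise ValueError("too small to be a Parquet file")
    fp.seek(size - 8)
    tail = fp.read(8)
    if tail[4:] != PARQUET_MAGIC:
        raise ValueError("missing PAR1 trailer")
    (footer_len,) = struct.unpack("<I", tail[:4])
    offset = size - 8 - footer_len
    if offset < 4:
        raise ValueError("footer length exceeds file size")
    return offset, footer_len
```

Only the last few kilobytes of one file are needed to learn the schema, which is why reading a single file (or a single row) is cheap compared with re-reading the whole dataset.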

I am not aware of any way to call the schema inference service on an uncommitted transaction. I am also not sure whether the scoped build token would be accepted by the schema inference service.

You could try something like this:

  • Read one row of a Parquet file into a Spark dataframe (e.g. using @cdesouza's proposal, or by reading the Parquet file from the output filesystem).
  • Write that one-row dataframe to the output (I assume this write call will trigger the schema capture).
  • List the files in the output to capture the filename of the one-row dataframe.
  • Make an API call to get the open transaction ID of the output.
  • Delete the file from above.
  • Now run your regular code to copy the Parquet bytes in parallel.

I think this should work with the scoped token from the ctx.
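
A rough, untested sketch of the first steps of that workaround. It assumes that output.filesystem().ls() shows files written in the still-open transaction and that write_dataframe can be followed by raw file writes in the same build; neither is confirmed here. The delete_file callable is hypothetical and stands in for whatever API call (using the open transaction ID) removes a file from the transaction.

```python
def read_one_row(spark, parquet_path):
    """Read a single row; Spark takes the schema from the Parquet footer."""
    return spark.read.parquet(parquet_path).limit(1)


def write_schema_placeholder(output, sample_df, delete_file):
    """Write a one-row dataframe to capture the schema, then remove its files.

    delete_file is a hypothetical callable taking a dataset-relative path.
    Returns the paths that were written by the placeholder write.
    """
    before = {status.path for status in output.filesystem().ls()}
    output.write_dataframe(sample_df)  # assumed to trigger the schema capture
    after = {status.path for status in output.filesystem().ls()}
    placeholder_paths = sorted(after - before)
    for path in placeholder_paths:
        delete_file(path)
    return placeholder_paths
```

After this returns, the existing download_dataset call would copy the raw Parquet bytes as before.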