I have a Python External Transform that downloads Parquet files from an external bucket. The files are already structured and have a known schema (there's a good reason for not using Data Connection). My download function just writes the raw files, and I'm not sure how to apply the schema to the output dataset without reading it back and saving it via spark.createDataFrame or spark.read.schema(schema).parquet(raw_path), as that would be computationally expensive.
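For reference, this is roughly the re-read approach I'd like to avoid. It's only a sketch: I'm assuming spark and output are in scope (i.e. inside the compute function), that the raw files can be read back through the output's hadoop path, and that schema holds the known pyspark StructType.

# Sketch only: re-read the freshly downloaded Parquet files with an explicit
# schema and write them back as a dataframe. This is the extra full pass over
# the data that I want to avoid.
raw_path = output.filesystem().hadoop_path        # assumption: raw files are readable via the hadoop path
df = spark.read.schema(schema).parquet(raw_path)  # schema = the known pyspark StructType
output.write_dataframe(df)                        # second full read + write of the whole dataset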
After I run the transform below, I can infer the schema by clicking the “apply schema” button in the Dataset Preview, so I'm essentially looking for the Python equivalent of that action. I suppose I could make a POST request to the foundry-schema-inference/api/datasets/<dataset_rid> API endpoint from within the transform, but that feels very hacky.
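For completeness, the hacky option would look something like the sketch below. The endpoint path is the one mentioned above; the host, the source of the auth token, and the request body are pure assumptions on my part and would need to be verified.

# Sketch of the "hacky" option: ask the schema-inference service to infer and
# apply a schema once the raw files have been written. The host, the auth token
# source, and the (empty) request body are assumptions, not a documented API.
import requests

def request_schema_inference(foundry_host: str, dataset_rid: str, token: str) -> None:
    url = f"https://{foundry_host}/foundry-schema-inference/api/datasets/{dataset_rid}"
    response = requests.post(url, headers={"Authorization": f"Bearer {token}"}, json={})
    response.raise_for_status()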
Ideally the schema would be inferred, but I can also get the hardcoded schema as a Python variable if that makes applying it easier.
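For example, the hardcoded schema would just be a plain pyspark StructType held in a Python variable; the field names and types below are made up, the real ones come from the external Parquet files.

# Hypothetical example of the known schema as a Python variable.
from pyspark.sql import types as T

KNOWN_SCHEMA = T.StructType([
    T.StructField("event_id", T.StringType(), nullable=False),
    T.StructField("event_time", T.TimestampType()),
    T.StructField("amount", T.DoubleType()),
])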
Here’s my code:
def download_dataset(spark, credentials: 'Credential', dataset: 'TransformOutput', dataset_bucket_path: str, *,
                     proxy_creds: 'Credential' = None) -> None:
    """Download dataset from the GCS bucket into the given dataset."""
    service_account_info = json.loads(credentials.get('service_account_json_key'))
    service_account_info = frozenset(service_account_info.items())  # make hashable for cache
    if proxy_creds is not None:
        proxy_auth = (proxy_creds.get('proxy_username'), proxy_creds.get('proxy_password'))
    else:
        proxy_auth = None

    def _download_file(full_source_path: str) -> None:
        """Download a single file and save it to the dataset."""
        bucket = _storage.get_bucket(service_account_info, proxy_auth)
        blob = bucket.blob(full_source_path)
        local_path = full_source_path[len(source_prefix):]  # just the local path, i.e. bucket/dataset_name/<local path>
        with dataset.filesystem().open(local_path, 'wb') as fp:
            blob.download_to_file(fp)
        logger.info("File '%s' downloaded to dataset path '%s'.", full_source_path, local_path)

    bucket = _storage.get_bucket(service_account_info, proxy_auth)
    source_prefix = _get_source_prefix(dataset_bucket_path)
    full_source_paths = [blob.name for blob in bucket.list_blobs(prefix=source_prefix)]
    rdd = spark.sparkContext.parallelize(full_source_paths)
    rdd.foreach(_download_file)
    logger.info("Full dataset (%d files) downloaded from %s", len(full_source_paths), source_prefix)
which I can call from a compute function to download the files into the dataset, just without a schema:
@transform(output=...)
def compute(output, ...):
    download_dataset(output, ...)