Hi @VincentF ,
I feel like this question comes up a lot and Palantir PD is not presenting a good (first-class) solution to it.
Therefore, sharing with the community how this can be done. You’ll need requests
as dependency in your repository. PD will say this is using “internal” APIs, so use at your own risk.
The code is essentially looking up the open transaction rid of the output and then using a catalog API to rename already-written files by a pattern that you supply. If there are multiple files, it will append partX
, if it’s a single file it will only change the first part of the filename. The extension (e.g. .parquet
) will be left untouched.
"""Rename Files Code"""
import logging
from urllib.parse import quote_plus
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
LOGGER = logging.getLogger(__name__)
def _get_session(retries=3, backoff_factor=0.3):
    """Build a requests session that retries failed HTTP calls.

    Args:
        retries (int): total retry attempts per request
        backoff_factor (float): multiplier for the exponential back-off delay

    Returns:
        requests.Session: session with a retrying adapter mounted for
            both http and https
    """
    retry_policy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    configured_session = requests.Session()
    for scheme in ("http://", "https://"):
        configured_session.mount(scheme, retrying_adapter)
    return configured_session
# Shared module-level session so connection pooling/retries apply everywhere.
SESSION = _get_session()
def _is_preview(ctx) -> bool:
return ctx.__class__.__name__ in {
"PreviewTransformContext",
"IncrementalPreviewTransformContext",
}
def _get_last_dataset_transaction(
    auth_header: str,
    foundry_url: str,
    dataset_rid: str,
    branch: str,
    include_open_exclusive_transaction: bool = False,
) -> dict:
    """Fetch the most recent transaction of a dataset/branch combination.

    Args:
        auth_header (str): ctx.auth_header
        foundry_url (str): base URL of the Foundry stack
        dataset_rid (str): unique identifier of the dataset
        branch (str): branch name
        include_open_exclusive_transaction (bool): whether a still-open
            exclusive transaction counts as the latest one

    Returns:
        dict: transaction information as returned by the catalog API

    Raises:
        ValueError: if the dataset/branch has no transaction at all
    """
    endpoint = (
        f"{foundry_url}/foundry-catalog/api/catalog/datasets/"
        f"{dataset_rid}/reverse-transactions2/{quote_plus(branch)}"
    )
    payload = _request(
        method="GET",
        auth_header=auth_header,
        url=endpoint,
        params={
            "pageSize": 1,
            "includeOpenExclusiveTransaction": include_open_exclusive_transaction,
        },
        expected_status_code=200,
    ).json()
    transactions = payload.get("values") or []
    if transactions:
        return transactions[0]
    raise ValueError(
        f"Could not retrieve last transaction of dataset {dataset_rid} on branch {branch}"
    )
def _catalog_rename_files(
    auth_header: str,
    foundry_url: str,
    dataset_rid: str,
    transaction_rid: str,
    rename_file_request: dict,
):
    """Rename a single file inside an open transaction.

    Args:
        auth_header (str): ctx.auth_header
        foundry_url (str): base URL of the Foundry stack
        dataset_rid (str): unique identifier of the dataset
        transaction_rid (str): rid of the open transaction
        rename_file_request (dict): {'fromLogicalPath': '...', 'toLogicalPath': '...'}
    """
    rename_endpoint = (
        f"{foundry_url}/foundry-catalog/api/catalog/datasets/"
        f"{dataset_rid}/transactions/{transaction_rid}/files/rename2"
    )
    # The catalog answers 204 No Content on a successful rename.
    _request(
        method="POST",
        auth_header=auth_header,
        url=rename_endpoint,
        expected_status_code=204,
        json=rename_file_request,
    )
def _list_dataset_files(
    auth_header: str,
    foundry_url: str,
    dataset_rid: str,
    exclude_hidden_files: bool = True,
    view: str = "master",
    logical_path: str = None,
    detail: bool = False,
    *,
    include_open_exclusive_transaction: bool = False,
) -> list:
    # pylint: disable=too-many-arguments
    """List the internal filenames of a dataset.

    Args:
        auth_header (str): ctx.auth_header
        foundry_url (str): base URL of the Foundry stack
        dataset_rid (str): the dataset rid
        exclude_hidden_files (bool): if hidden files should be excluded (e.g. _log files)
        view (str): branch or transaction rid of the dataset
        logical_path (str): absent -> all files in the view; exact file match ->
            just that file; otherwise all files in the "directory" of
            logical_path (a slash is added to the end if necessary and a
            prefix-match is performed)
        detail (bool): True returns the complete catalog API records,
            otherwise only each file's logicalPath
        include_open_exclusive_transaction (bool): also return files added
            in a still-open transaction

    Returns:
        list: filenames (or full records when detail=True)
    """
    files_endpoint = (
        f"{foundry_url}/foundry-catalog/api/catalog/datasets/"
        f"{dataset_rid}/views2/{quote_plus(view)}/files"
    )

    def _fetch_page(page_start):
        # One page of the paginated files listing.
        return _request(
            "GET",
            auth_header=auth_header,
            url=files_endpoint,
            params={
                "pageSize": 10000000,
                "includeOpenExclusiveTransaction": include_open_exclusive_transaction,
                "logicalPath": logical_path,
                "excludeHiddenFiles": exclude_hidden_files,
                "pageStartLogicalPath": page_start,
            },
            expected_status_code=200,
        ).json()

    collected = []
    page_token = None
    while True:
        page = _fetch_page(page_token)
        collected.extend(page["values"])
        page_token = page["nextPageToken"]
        if page_token is None:
            break
    if detail:
        return collected
    return [entry["logicalPath"] for entry in collected]
def _request(
    method: str,
    auth_header: str,
    url: str,
    params: dict = None,
    expected_status_code: int = 200,
    json: dict = None,
) -> requests.Response:
    """Issue an HTTP request through the shared retrying session.

    Args:
        method (str): HTTP verb, e.g. "GET" or "POST"
        auth_header (str): value for the Authorization header (ctx.auth_header)
        url (str): fully-qualified endpoint URL
        params (dict): optional query parameters
        expected_status_code (int): the only status code treated as success
        json (dict): optional JSON request body

    Returns:
        requests.Response: the successful response

    Raises:
        ValueError: on an HTTP error status, or when the status code differs
            from expected_status_code; the message contains url, params,
            body and response details.
    """
    response = SESSION.request(
        method=method,
        url=url,
        headers={"Authorization": auth_header},
        json=json,
        params=params,
        timeout=30,
    )

    def _error_details() -> str:
        # Single source of truth for the diagnostic message; this block was
        # previously duplicated verbatim at both raise sites.
        return (
            f"Url: {url}\n"
            f"Fields: {params}\n"
            f"Body: {json}\n"
            f"Response Code: {response.status_code}\n"
            f"Expected Status Code: {expected_status_code}\n"
            f"Response Body: {response.text}"
        )

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        raise ValueError(_error_details()) from error
    if response.status_code != expected_status_code:
        raise ValueError(_error_details())
    return response
def _extract_suffix(path: str) -> str:
return ".".join(path.split(".")[1::1])
def _rename_files(
    ctx,
    transform_output,
    prefix: str,
    foundry_url: str,
):
    """Rename every file in the output's open transaction to prefix(_partX).

    Args:
        ctx (TransformContext): the context of the transform (provides auth_header)
        transform_output (TransformOutput): output whose written files get renamed
        prefix (str): new filename prefix
        foundry_url (str): base URL of the Foundry stack
    """
    # Find the open transaction the transform is currently writing into.
    open_transaction = _get_last_dataset_transaction(
        auth_header=ctx.auth_header,
        foundry_url=foundry_url,
        dataset_rid=transform_output.rid,
        branch=transform_output.branch,
        include_open_exclusive_transaction=True,
    )
    open_transaction_rid = open_transaction["rid"]
    LOGGER.info(f"{open_transaction_rid=}")
    # List the files written so far in that open transaction.
    files_before_renaming = _list_dataset_files(
        auth_header=ctx.auth_header,
        foundry_url=foundry_url,
        dataset_rid=transform_output.rid,
        view=open_transaction_rid,
        include_open_exclusive_transaction=True,
    )
    number_of_files = len(files_before_renaming)
    LOGGER.info(f"{number_of_files=} {files_before_renaming=}")
    for i, old_path in enumerate(files_before_renaming):
        suffix = _extract_suffix(path=old_path)
        # do not add part addition to path if there is only one file
        part_addition = "" if number_of_files == 1 else f"_part{i}"
        # Only append a dot when there actually is an extension; previously a
        # file without an extension was renamed to a dangling "prefix.".
        if suffix:
            new_path = f"{prefix}{part_addition}.{suffix}"
        else:
            new_path = f"{prefix}{part_addition}"
        _catalog_rename_files(
            auth_header=ctx.auth_header,
            foundry_url=foundry_url,
            dataset_rid=transform_output.rid,
            transaction_rid=open_transaction_rid,
            rename_file_request={
                "fromLogicalPath": old_path,
                "toLogicalPath": new_path,
            },
        )
        LOGGER.info(f"Renaming from {old_path=} {new_path=}")
def rename_files(
    ctx,
    transform_output,
    prefix: str,
    foundry_url: str = "https://stack.palantirfoundry.com",
):
    """Rename all files in the output with the given prefix; -partXY is added
    when the output holds multiple files.

    Args:
        ctx (TransformContext): The context of the transform
        transform_output (TransformOutput): the output of the transform
        prefix (str): prefix for all files, e.g. New_Filename
        foundry_url (str): Url to Foundry Stack

    Examples:
        Example transform:

        >>> from .utils import rename_files
        >>> from transforms.api import Input, Output, transform
        >>>
        >>>
        >>> @transform(
        >>>     output=Output(
        >>>         "output"
        >>>     ),
        >>>     my_input=Input(
        >>>         "input"
        >>>     ),
        >>> )
        >>> def example_transform(ctx, my_input, output):
        >>>     output.write_dataframe(my_input.dataframe())
        >>>     rename_files(ctx=ctx, transform_output=output, prefix="Combined_Sales_")
    """
    # Preview runs have no open transaction, so renaming is impossible there.
    if _is_preview(ctx=ctx):
        LOGGER.info(
            "Can't do renaming in Preview because there is no transaction open."
        )
        return
    _rename_files(
        ctx,
        transform_output=transform_output,
        prefix=prefix,
        foundry_url=foundry_url,
    )
You would invoke the code after you have written your dataframe to the output:
from .utils import rename_files
from transforms.api import Input, Output, transform
@transform(
output=Output(
"output"
),
my_input=Input(
"input"
),
)
def example_transform(ctx, my_input, output):
output.write_dataframe(my_input.dataframe())
rename_files(ctx=ctx, transform_output=output, prefix="New_FilePrefix_")
Hope that helps!