Rename my output files in place in Transforms

I have a transform that outputs parquet files.
I would like to rename those parquet files because I will export them via Data Connection Exports.
I don’t want to duplicate my data or add complexity to my pipeline by creating yet another transform just to “rename” the files as I want.

How can I write my dataframe to my output dataset, and then rename the parquet files in place?

I imagine I want to do something along the lines of:

import logging

from transforms.api import Input, Output, transform


@transform(
    output_dataset=Output("/PATH/example_output_dataset"),
    input_dataset=Input("/PATH/example_input_dataset"),
)
def compute(input_dataset, output_dataset):
    # Repartition the input dataset and write it to the output
    repartitioned_df = input_dataset.dataframe().repartition(1)
    output_dataset.write_dataframe(repartitioned_df)

    # Rename the files once written
    # Get a filesystem handle on the output
    fs_out = output_dataset.filesystem()
    logging.warning(list(fs_out.ls()))

    # Iterate over the parquet files just written
    for curr_file in fs_out.ls():
        # Assuming we have some logic to generate file names, folders, etc. Hardcoding only one file name here.
        target_file_path = "1.parquet"
        logging.warning(f"curr_file {curr_file}, target_file_path {target_file_path}")

        with fs_out.open(curr_file.path, "rb") as f_in:
            # Rename here?
            # Move and delete the old files?
            pass

Hi @VincentF,

I feel like this question comes up a lot, and Palantir PD is not offering a good (first-class) solution to it.

So I’m sharing with the community how this can be done. You’ll need requests as a dependency in your repository. PD will say this is using “internal” APIs, so use at your own risk.

The code essentially looks up the open transaction rid of the output and then uses a catalog API to rename the already-written files according to a pattern you supply. If there are multiple files, it appends _partX; if it’s a single file, it only changes the first part of the filename. The extension (e.g. .parquet) is left untouched.
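
For example (illustrative paths; the hashed part-file names Spark actually generates will differ), with prefix="daily_export" and two files in the open transaction, the renames would be:

spark/part-00000-<hash>.c000.snappy.parquet  ->  daily_export_part0.c000.snappy.parquet
spark/part-00001-<hash>.c000.snappy.parquet  ->  daily_export_part1.c000.snappy.parquet

Note that the new logical paths no longer include the spark/ folder, so the renamed files land at the root of the dataset.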

"""Rename Files Code"""
import logging
from urllib.parse import quote_plus

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

LOGGER = logging.getLogger(__name__)


def _get_session(retries=3, backoff_factor=0.3):
    session = requests.Session()
    retry = Retry(
        total=retries,
        backoff_factor=backoff_factor,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


SESSION = _get_session()


def _is_preview(ctx) -> bool:
    return ctx.__class__.__name__ in {
        "PreviewTransformContext",
        "IncrementalPreviewTransformContext",
    }


def _get_last_dataset_transaction(
    auth_header: str,
    foundry_url: str,
    dataset_rid: str,
    branch: str,
    include_open_exclusive_transaction: bool = False,
) -> dict:
    """Returns the last transaction of a dataset / branch combination.


    Args:
        auth_header (str): ctx.auth_header
        dataset_rid (str): Unique identifier of the dataset
        branch (str): Branch
        include_open_exclusive_transaction (bool): include_open_exclusive_transaction

    Returns:
        dict:
            of transaction information.

    """
    response = _request(
        method="GET",
        auth_header=auth_header,
        url=f"{foundry_url}/foundry-catalog/api/catalog/datasets/"
        f"{dataset_rid}/reverse-transactions2/{quote_plus(branch)}",
        params={
            "pageSize": 1,
            "includeOpenExclusiveTransaction": include_open_exclusive_transaction,
        },
        expected_status_code=200,
    )
    as_json = response.json()
    if "values" in as_json and len(as_json["values"]) > 0:
        return as_json["values"][0]
    raise ValueError(
        f"Could not retrieve last transaction of dataset {dataset_rid} on branch {branch}"
    )


def _catalog_rename_files(
    auth_header: str,
    foundry_url: str,
    dataset_rid: str,
    transaction_rid: str,
    rename_file_request: dict,
):
    """Renames file in an open transaction


    Args:
        auth_header (str): ctx.auth_header
        dataset_rid (str): Unique identifier of the dataset
        transaction_rid (str): Transaction Rid
        rename_file_request (dict): {'fromLogicalPath': '...', 'toLogicalPath': '...'}

    Returns:
        dict:
            of transaction information.

    """
    _request(
        method="POST",
        auth_header=auth_header,
        url=f"{foundry_url}/foundry-catalog/api/catalog/datasets/"
        f"{dataset_rid}/transactions/{transaction_rid}/files/rename2",
        expected_status_code=204,
        json=rename_file_request,
    )


def _list_dataset_files(
    auth_header: str,
    foundry_url: str,
    dataset_rid: str,
    exclude_hidden_files: bool = True,
    view: str = "master",
    logical_path: str = None,
    detail: bool = False,
    *,
    include_open_exclusive_transaction: bool = False,
) -> list:
    # pylint: disable=too-many-arguments
    """Returns list of internal filenames of a dataset.

    Args:
        dataset_rid (str): the dataset rid
        exclude_hidden_files (bool): if hidden files should be excluded (e.g. _log files)
        view (str): branch or transaction rid of the dataset
        logical_path (str): If logical_path is absent, returns all files in the view.
            If logical_path matches a file exactly, returns just that file.
            Otherwise, returns all files in the "directory" of logical_path:
            (a slash is added to the end of logicalPath if necessary and a prefix-match is performed)
        detail (bool): if passed as True, returns complete response from catalog API, otherwise only
                        returns logicalPath
        include_open_exclusive_transaction (bool): if files added in open transaction should be returned
                                                    as well in the response

    Returns:
        list:
            filenames

    Raises:
        DatasetNotFound: if dataset was not found
    """

    def _inner_get(next_page_token=None):
        response = _request(
            "GET",
            auth_header=auth_header,
            url=f"{foundry_url}/foundry-catalog/api/catalog/datasets/{dataset_rid}/views2/{quote_plus(view)}/files",
            params={
                "pageSize": 10000000,
                "includeOpenExclusiveTransaction": include_open_exclusive_transaction,
                "logicalPath": logical_path,
                "excludeHiddenFiles": exclude_hidden_files,
                "pageStartLogicalPath": next_page_token,
            },
            expected_status_code=200,
        )
        return response.json()

    result = []
    batch_result = _inner_get(next_page_token=None)
    result.extend(batch_result["values"])
    while batch_result["nextPageToken"] is not None:
        batch_result = _inner_get(next_page_token=batch_result["nextPageToken"])
        result.extend(batch_result["values"])
    if detail:
        return result
    return [file["logicalPath"] for file in result]


def _request(
    method: str,
    auth_header: str,
    url: str,
    params: dict = None,
    expected_status_code: int = 200,
    json: dict = None,
) -> requests.Response:
    response = SESSION.request(
        method=method,
        url=url,
        headers={"Authorization": auth_header},
        json=json,
        params=params,
        timeout=30,
    )
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        raise ValueError(
            f"Url: {url}\n"
            + f"Fields: {params}\n"
            + f"Body: {json}\n"
            + f"Response Code: {response.status_code}\n"
            + f"Expected Status Code: {expected_status_code}\n"
            + f"Response Body: {response.text}"
        ) from error
    if response.status_code != expected_status_code:
        raise ValueError(
            f"Url: {url}\n"
            + f"Fields: {params}\n"
            + f"Body: {json}\n"
            + f"Response Code: {response.status_code}\n"
            + f"Expected Status Code: {expected_status_code}\n"
            + f"Response Body: {response.text}"
        )
    return response


def _extract_suffix(path: str) -> str:
    # Everything after the first "." in the path, e.g. "c000.snappy.parquet"
    return ".".join(path.split(".")[1:])


def _rename_files(
    ctx,
    transform_output,
    prefix: str,
    foundry_url: str,
):
    # find my open transaction
    open_transaction = _get_last_dataset_transaction(
        auth_header=ctx.auth_header,
        foundry_url=foundry_url,
        dataset_rid=transform_output.rid,
        branch=transform_output.branch,
        include_open_exclusive_transaction=True,
    )
    open_transaction_rid = open_transaction["rid"]
    LOGGER.info(f"{open_transaction_rid=}")

    # get list of files
    files_before_renaming = _list_dataset_files(
        auth_header=ctx.auth_header,
        foundry_url=foundry_url,
        dataset_rid=transform_output.rid,
        view=open_transaction_rid,
        include_open_exclusive_transaction=True,
    )
    number_of_files = len(files_before_renaming)
    LOGGER.info(f"{number_of_files=} {files_before_renaming=}")

    for i, old_path in enumerate(files_before_renaming):
        suffix = _extract_suffix(path=old_path)
        # do not add part addition to path if there is only one file
        part_addition = "" if number_of_files == 1 else f"_part{i}"
        new_path = f"{prefix}{part_addition}.{suffix}"
        _catalog_rename_files(
            auth_header=ctx.auth_header,
            foundry_url=foundry_url,
            dataset_rid=transform_output.rid,
            transaction_rid=open_transaction_rid,
            rename_file_request={
                "fromLogicalPath": old_path,
                "toLogicalPath": new_path,
            },
        )
        LOGGER.info(f"Renaming from {old_path=} {new_path=}")


def rename_files(
    ctx,
    transform_output,
    prefix: str,
    foundry_url: str = "https://stack.palantirfoundry.com",
):
    """Renames all files in the output with given prefix, -partXY will be added in case of multiple files in the output.

    Args:
        ctx (TransformContext): The context of the transform
        transform_output (TransformOutput): the output of the transform
        prefix (str): prefix for all files, e.g. New_Filename
        foundry_url (str): Url to Foundry Stack
    Examples:
        Example transform:

        >>> from .utils import rename_files
        >>> from transforms.api import Input, Output, transform
        >>>
        >>>
        >>> @transform(
        ...     output=Output(
        ...         "output"
        ...     ),
        ...     my_input=Input(
        ...         "input"
        ...     ),
        ... )
        ... def example_transform(ctx, my_input, output):
        ...     output.write_dataframe(my_input.dataframe())
        ...     rename_files(ctx=ctx, transform_output=output, prefix="Combined_Sales_")

    """
    if _is_preview(ctx=ctx):
        LOGGER.info(
            "Can't do renaming in Preview because there is no transaction open."
        )
    else:
        _rename_files(
            ctx,
            transform_output=transform_output,
            prefix=prefix,
            foundry_url=foundry_url,
        )

You would invoke the code after you have written your dataframe to the output:

from .utils import rename_files
from transforms.api import Input, Output, transform


@transform(
    output=Output(
        "output"
    ),
    my_input=Input(
        "input"
    ),
)
def example_transform(ctx, my_input, output):
    output.write_dataframe(my_input.dataframe())
    rename_files(ctx=ctx, transform_output=output, prefix="New_FilePrefix_")
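
Since the rename runs against the still-open transaction created by write_dataframe, the new names are what get committed when the build finishes, and they are what a downstream Data Connection export will pick up.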

Hope that helps!
