How can I PGP decrypt ingested files in Foundry?

I’m ingesting files that are PGP encrypted (e.g. files that were stored encrypted on an FTP server).

How can I decrypt those files once they are ingested in Foundry?

1 Like

You have different options:

  1. You set up a source with a PGP decryption transformer. The files received in Foundry will already be decrypted and ready to consume. This is the lowest-complexity option of the three.
  2. You ingest the files like any other ingest (no decryption) and run a transform downstream to decrypt the files.
  3. You write an external transform in which you load the files and decrypt them before writing them to the final output dataset. The files will never be stored encrypted in Foundry, but you have to handle all the complexity yourself (connection, PGP, …).

Explanation for [1]
You can use file-based transformers directly in data-connection:
You will need to configure the PGP keys on the agent, and provide the path to those PGP keys (the specific place on the agent machine where the private (or secret) keys are stored)

You can run commands like:

gpg --list-keys
gpg --list-secret-keys

and use the path to the keyring: /path/to/your/custom/keyring/directory/ (not sure if folder or the file directly)

Code snippet for [2]
You can ingest the files encrypted and you can decrypt your files in a transform, by doing something like below.
This is inspired from transforms of unstructured files, and using PGP libs.

@transform(
    out=Output(""),
    source_df=Input("")
)
def compute(source_df, out):
    """Decrypt every PGP-encrypted file of the input dataset into the output dataset.

    Each input file is decrypted with the private key held in
    ``pgp_key_string`` and written to its own file in the output dataset,
    so that decrypting several files does not overwrite a single output.

    Args:
        source_df: Transform input whose ``filesystem()`` exposes the
            encrypted files.
        out: Transform output whose ``filesystem()`` receives the decrypted
            files.

    NOTE(review): ``pgp_key_string`` is not defined in this snippet; it must
    hold the ASCII-armored private key (e.g. the content of the ``.asc``
    file). A passphrase-protected key presumably has to be unlocked with
    ``key.unlock(...)`` before decrypting -- confirm against the PGPy docs.
    """
    key, _ = pgpy.PGPKey.from_blob(pgp_key_string)
    in_fs = source_df.filesystem()
    out_fs = out.filesystem()

    for f_name in in_fs.ls():
        # Derive one output path per input file; drop the usual encrypted
        # extension so the decrypted file keeps its original name.
        out_path = f_name.path
        for suffix in ('.pgp', '.gpg'):
            if out_path.lower().endswith(suffix) and len(out_path) > len(suffix):
                out_path = out_path[:-len(suffix)]
                break

        with in_fs.open(f_name.path, 'rb') as f, out_fs.open(out_path, 'wb') as g:
            enc_file_stream = pgpy.PGPMessage.from_blob(f.read())
            plaintext = key.decrypt(enc_file_stream).message
            # The payload can come back as text instead of bytes; the output
            # handle is binary, so encode before writing.
            # NOTE(review): UTF-8 is assumed for text payloads -- confirm.
            if isinstance(plaintext, str):
                plaintext = plaintext.encode('utf-8')
            g.write(plaintext)

Explanation for [3]

You will need to write an external transform and use a library that handles connection to FTPs.

2 Likes

Hi @VincentF,

I tried to use the second option, but I am stuck on pgp_key_string: in my case I have an .asc file.

You can store the key as a string in a secret in the Data Connection Source.
https://www.palantir.com/docs/foundry/data-connection/core-concepts#credentials

The relevant example:

@external_systems(
    poke_source=Source("ri.magritte..source.e301d738-b532-431a-8bda-fa211228bba6")
)
@transform_df(
    Output("/path/to/output/dataset"),
    pokemon_list=Input("/path/to/input/dataset")
)
def compute(poke_source, pokemon_list, ctx):
    """Excerpt showing how a transform reads a secret from a Data Connection source.

    Only the lines relevant to credentials are shown; the excerpt stops
    before any value is returned.

    Args:
        poke_source: The source declared in ``@external_systems``.
        pokemon_list: The input dataset declared in ``@transform_df``.
        ctx: Not used in the visible lines.
    """
    # Client and base URL obtained from the source's HTTPS connection
    # (neither is used in the visible part of the excerpt).
    poke = poke_source.get_https_connection().get_client()
    poke_url = poke_source.get_https_connection().url

    # Look up the secret named "my_key" on the source. For the PGP use case,
    # this is where the private key stored on the source would be read.
    api_secret = poke_source.get_secret("my_key")

It seems the above code doesn’t work well, given cryptography==3.3.2 is required to work with pgpy.

Here are a few more up-to-date examples.

In a code repository, in the middle of a pipeline:

import logging
import tempfile
import os
import gnupg
from pyspark.sql import DataFrame

# Install python-gnupg and gnupg
from pyspark.sql import functions as F
from transforms.api import Input, Output, transform


@transform(
    output_dataset=Output("ri.foundry.main.dataset.8e5dcc94-xxxx-4304-a2ea-4d710f0aeeb0"),
    files_dataset=Input("ri.foundry.main.dataset.88f00419-xxxx-4d7d-9674-0a61aff0a37a"),
    keys_dataset=Input("ri.foundry.main.dataset.2730292e-xxxx-4235-869c-889083dca058"),
)
def compute(files_dataset, keys_dataset, output_dataset):
    """Decrypt ``test.txt.gpg`` with the private key stored in ``keys_dataset``.

    The private key (``private_key.asc``) is imported into a throw-away GnuPG
    home directory, the encrypted file is decrypted in memory and the result
    is written to ``new_file_decrypted.txt`` in the output dataset.

    Args:
        files_dataset: Input dataset containing ``test.txt.gpg``.
        keys_dataset: Input dataset containing ``private_key.asc``.
        output_dataset: Output dataset receiving the decrypted file.

    Raises:
        Exception: If the key cannot be imported or the decryption fails.
            Errors are logged before being re-raised.
    """
    # Demo only: a real pipeline should not hard-code the passphrase.
    passphrase = "testpass"  # Set to None if not needed

    # The temporary directory isolates the keyring from any other GnuPG
    # configuration and is removed (imported key included) when the block
    # exits. tempfile creates it accessible to the current user only, so no
    # explicit chmod is required.
    with tempfile.TemporaryDirectory() as gpg_home:
        # Initialize GPG with verbose output
        gpg = gnupg.GPG(gnupghome=gpg_home, verbose=True)

        # Read and import the private key
        try:
            with keys_dataset.filesystem().open("private_key.asc", "rb") as f:
                private_key_string = f.read()

            # Log key information for debugging (length only, never the key)
            logging.info(f"Importing key of length: {len(private_key_string)} bytes")

            import_result = gpg.import_keys(private_key_string)

            # Trust the imported key explicitly
            if import_result.fingerprints:
                for fingerprint in import_result.fingerprints:
                    gpg.trust_keys(fingerprint, "TRUST_ULTIMATE")

            logging.info(f"Import result: {import_result.results}")
            logging.info(f"Imported fingerprints: {import_result.fingerprints}")

            if not import_result.fingerprints:
                raise Exception("Failed to import private key. Check key format and content.")

            # List all keys for debugging
            secret_keys = gpg.list_keys(True)
            public_keys = gpg.list_keys(False)
            logging.info(f"Secret keys: {secret_keys}")
            logging.info(f"Public keys: {public_keys}")

        except Exception as e:
            logging.error(f"Error importing key: {str(e)}")
            raise

        # Read and decrypt the encrypted file
        try:
            with files_dataset.filesystem().open("test.txt.gpg", "rb") as f:
                encrypted_data = f.read()

            # Log encrypted data info for debugging
            logging.info(f"Attempting to decrypt data of size: {len(encrypted_data)} bytes")

            # Identify the key(s) the file was encrypted for. list_keys()
            # would treat the ciphertext as its boolean `secret` flag and
            # merely list the keyring again; get_recipients() inspects the
            # message itself.
            encrypted_info = gpg.get_recipients(encrypted_data)
            logging.info(f"Encrypted file info: {encrypted_info}")

            # Attempt decryption with additional options
            decrypted_data = gpg.decrypt(
                encrypted_data,
                passphrase=passphrase,
                always_trust=True,  # Try with always_trust option
            )

            if not decrypted_data.ok:
                logging.error(f"Decryption status: {decrypted_data.status}")
                logging.error(f"Decryption stderr: {decrypted_data.stderr}")
                raise Exception(f"Decryption failed: {decrypted_data.status}")

            # Write the decrypted data to the output dataset
            with output_dataset.filesystem().open("new_file_decrypted.txt", "wb") as g:
                g.write(decrypted_data.data)

            logging.info("Successfully decrypted and wrote file")

        except Exception as e:
            logging.error(f"Error in decryption process: {str(e)}")
            raise

Input dataset and the “keys” dataset


And the final file decrypted:

You can quickly test it by generating a PGP file, this way:

import gnupg
import os

# Generates a throw-away RSA key pair and a PGP-encrypted sample file
# ('test.txt.gpg') that can be uploaded to Foundry to try the decryption
# transform. For test/demo only: the passphrase is hard-coded and the private
# key is written unencrypted-at-rest to the working directory.
PASSPHRASE = 'testpass'  # For test/demo only!
file_to_encrypt = 'test.txt'  # Name of the sample file to create

# Set up GnuPG home directory (for keyring isolation). GnuPG warns about
# home directories that other users can access, hence the restrictive mode
# (only applied when the directory is created here).
gpg_home = './gpg_home'
os.makedirs(gpg_home, mode=0o700, exist_ok=True)
gpg = gnupg.GPG(gnupghome=gpg_home)

# Step 1: Generate a key pair
input_data = gpg.gen_key_input(
    name_email='your.email@example.com',
    name_real='Your Name',
    passphrase=PASSPHRASE,
    key_type='RSA',
    key_length=2048
)
key = gpg.gen_key(input_data)
if not key.fingerprint:
    # Without a fingerprint the exports below would silently write empty files.
    raise SystemExit("Key generation failed; check the GnuPG installation.")

# Step 2: Export and save the keys
private_keys = gpg.export_keys(key.fingerprint, True, passphrase=PASSPHRASE)
public_keys = gpg.export_keys(key.fingerprint)
if not private_keys or not public_keys:
    raise SystemExit("Key export failed; check the passphrase and GnuPG version.")

with open('private_key.asc', 'w') as f:
    f.write(private_keys)
with open('public_key.asc', 'w') as f:
    f.write(public_keys)

print("PGP key pair generated and saved to 'private_key.asc' and 'public_key.asc'")

# Step 3: Generate a sample file (written once; the content names the
# library that is actually used)
sample_content = """\
Hello, this is a test file.
This file will be encrypted using GnuPG via python-gnupg.
Generated for demonstration purposes.
"""
with open(file_to_encrypt, 'w') as f:
    f.write(sample_content)
print(f"Sample file '{file_to_encrypt}' created.")

# Step 4: Encrypt the file using the public key
with open(file_to_encrypt, 'rb') as f:
    status = gpg.encrypt_file(
        f,
        recipients=[key.fingerprint],
        output=file_to_encrypt + '.gpg'
    )

if status.ok:
    print(f"File '{file_to_encrypt}' encrypted and saved as '{file_to_encrypt}.gpg'")
else:
    print("Encryption failed:", status.status)

The alternative approach using a Source to store the credentials

from transforms.external.systems import external_systems, Source

# Variant of the decryption transform that reads the private key from a
# secret stored on a Data Connection source instead of from a "keys" dataset.
# The snippet is intentionally truncated: after the key import it continues
# exactly like the dataset-based example (including the `except` handler
# that closes the `try` below).
@external_systems(
    example_rest_api_source_source=Source("ri.magritte..source.e0bb1168-xxxx-xxxx-a137-05bdb32c65f9")
)
@transform(
    output_dataset=Output("/path.../exampleOutputPGP_bis"),
    files_dataset=Input("ri.foundry.main.dataset.88f00419-xxxx-xxxx-9674-0a61aff0a37a")
)
def compute_external_transforms(example_rest_api_source_source, files_dataset, output_dataset):
    # Example: your private key and passphrase as strings
    # NOTE(review): demo value; the passphrase could presumably be stored as a
    # secret on the source as well -- confirm.
    passphrase = "testpass"  # Set to None if not needed

    # Set up a temporary GPG home directory for key import
    with tempfile.TemporaryDirectory() as gpg_home:
        # Set permissions for GPG home directory
        # os.chmod(gpg_home, 0o700)

        # Initialize GPG with verbose output
        gpg = gnupg.GPG(gnupghome=gpg_home, verbose=True)

        # Read and import the private key
        try:
            # The key is read from the source secret named "SecretPGPKey"
            # (presumably the full content of the .asc file -- confirm).
            private_key_string = example_rest_api_source_source.get_secret("SecretPGPKey")

            # Log key information for debugging
            logging.info(f"Importing key of length: {len(private_key_string)} bytes")

            import_result = gpg.import_keys(private_key_string)


            # The rest stays the same as in the dataset-based example
            # (trust the key, decrypt the file, write the output).

The source configuration with the secret

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.