I’m ingesting files that are PGP-encrypted (e.g. files that were stored encrypted on an FTP server).
How can I decrypt those files once they are ingested in Foundry?
I’m ingesting files that are PGP-encrypted (e.g. files that were stored encrypted on an FTP server).
How can I decrypt those files once they are ingested in Foundry?
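You have different options:
[1] Decrypt the files at ingestion time, using a file-based transformer in Data Connection.
[2] Ingest the files encrypted, then decrypt them in a transform.
[3] Write an external transform that connects to the FTP server itself.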
Explanation for [1]
You can use file-based transformers directly in Data Connection:
You will need to configure the PGP keys on the agent and provide the path to those keys (i.e. the location on the agent machine where the private (or secret) keys are stored).
You can run commands like:
gpg --list-keys
gpg --list-secret-keys
and then use the path to the keyring: /path/to/your/custom/keyring/directory/
(I am not sure whether the path should point to the keyring folder or directly to the keyring file.)
Code snippet for [2]
You can ingest the files in their encrypted form and decrypt them in a transform, with something like the code below.
This is inspired by transforms that process unstructured files, combined with a PGP library.
@transform(
    out=Output(""),
    source_df=Input("")
)
def compute(source_df, out):
    """Decrypt every PGP-encrypted file of the input dataset into the output dataset.

    Each input file is read in full, decrypted with the private key and
    written to the output dataset under its own name, minus a trailing
    ``.pgp`` / ``.gpg`` / ``.asc`` extension, so that several input files no
    longer overwrite one another.

    Args:
        source_df: transform input whose ``filesystem()`` lists the encrypted files.
        out: transform output whose ``filesystem()`` receives the decrypted files.
    """
    # NOTE: `pgp_key_string` (the ASCII-armored private key) is not defined in
    # this snippet; it has to be provided by the surrounding code.
    key, _ = pgpy.PGPKey.from_blob(pgp_key_string)
    encrypted_suffixes = (".pgp", ".gpg", ".asc")
    in_fs = source_df.filesystem()
    out_fs = out.filesystem()

    for f_name in in_fs.ls():
        # Derive a distinct output path per input file.
        out_path = f_name.path
        for suffix in encrypted_suffixes:
            if out_path.lower().endswith(suffix):
                # Keep the original name if stripping would leave it empty.
                out_path = out_path[:-len(suffix)] or out_path
                break

        with in_fs.open(f_name.path, 'rb') as f:
            enc_file_stream = pgpy.PGPMessage.from_blob(f.read())

        plaintext = key.decrypt(enc_file_stream).message
        # The decrypted payload may be text; the output handle is binary.
        if isinstance(plaintext, str):
            plaintext = plaintext.encode("utf-8")

        # Open the output only once decryption succeeded, so a failure does
        # not leave an empty file behind.
        with out_fs.open(out_path, 'wb') as g:
            g.write(plaintext)
Explanation for [3]
You will need to write an external transform and use a library that handles connections to FTP servers.
Hi @VincentF,
I tried to use the second option, but I am stuck on pgp_key_string: in my case, I have an .asc file.
You can store the key as a string in a secret in the Data Connection Source.
https://www.palantir.com/docs/foundry/data-connection/core-concepts#credentials
The relevant example:
@external_systems(
    poke_source=Source("ri.magritte..source.e301d738-b532-431a-8bda-fa211228bba6")
)
@transform_df(
    Output("/path/to/output/dataset"),
    pokemon_list=Input("/path/to/input/dataset")
)
def compute(poke_source, pokemon_list, ctx):
    """Show how to read connection details and a secret from a Source.

    The snippet only fetches three values from ``poke_source``: an HTTPS
    client, the connection URL and the secret stored under ``"my_key"``.
    """
    # Client object obtained from the source's HTTPS connection.
    http_client = poke_source.get_https_connection().get_client()
    # URL exposed by the source's HTTPS connection.
    base_url = poke_source.get_https_connection().url
    # Secret stored on the source under the name "my_key".
    secret_value = poke_source.get_secret("my_key")
It seems the pgpy-based code above doesn’t work well, because pgpy requires cryptography==3.3.2.
Here are a few more up-to-date examples.
In a code repository, in the middle of a pipeline:
import logging
import tempfile
import os
import gnupg
from pyspark.sql import DataFrame
# Install python-gnupg and gnupg
from pyspark.sql import functions as F
from transforms.api import Input, Output, transform
@transform(
    output_dataset=Output("ri.foundry.main.dataset.8e5dcc94-xxxx-4304-a2ea-4d710f0aeeb0"),
    files_dataset=Input("ri.foundry.main.dataset.88f00419-xxxx-4d7d-9674-0a61aff0a37a"),
    keys_dataset=Input("ri.foundry.main.dataset.2730292e-xxxx-4235-869c-889083dca058"),
)
def compute(files_dataset, keys_dataset, output_dataset):
    """Decrypt ``test.txt.gpg`` with a private key stored in a dataset.

    The private key is read from ``private_key.asc`` in ``keys_dataset`` and
    imported into a throw-away GnuPG home directory. ``test.txt.gpg`` from
    ``files_dataset`` is then decrypted and written to
    ``new_file_decrypted.txt`` in ``output_dataset``.

    Raises:
        Exception: if the key cannot be imported or the decryption fails.
    """
    # Demo value only: do not hard-code a real passphrase in the repository.
    passphrase = "testpass"  # Set to None if not needed

    # The temporary GPG home (and the imported key) is removed when the
    # `with` block exits.
    with tempfile.TemporaryDirectory() as gpg_home:
        # verbose=True makes python-gnupg log the gpg invocation details.
        gpg = gnupg.GPG(gnupghome=gpg_home, verbose=True)

        # Read and import the private key
        try:
            with keys_dataset.filesystem().open("private_key.asc", "rb") as f:
                private_key_string = f.read()

            # Log key information for debugging
            logging.info(f"Importing key of length: {len(private_key_string)} bytes")
            import_result = gpg.import_keys(private_key_string)
            logging.info(f"Import result: {import_result.results}")
            logging.info(f"Imported fingerprints: {import_result.fingerprints}")

            # Fail before touching trust settings if nothing was imported.
            if not import_result.fingerprints:
                raise Exception("Failed to import private key. Check key format and content.")

            # Trust the imported key explicitly
            for fingerprint in import_result.fingerprints:
                gpg.trust_keys(fingerprint, "TRUST_ULTIMATE")

            # List all keys for debugging
            secret_keys = gpg.list_keys(True)
            public_keys = gpg.list_keys(False)
            logging.info(f"Secret keys: {secret_keys}")
            logging.info(f"Public keys: {public_keys}")
        except Exception as e:
            logging.error(f"Error importing key: {str(e)}")
            raise

        # Read and decrypt the encrypted file
        try:
            with files_dataset.filesystem().open("test.txt.gpg", "rb") as f:
                encrypted_data = f.read()

            # Log encrypted data info for debugging
            logging.info(f"Attempting to decrypt data of size: {len(encrypted_data)} bytes")

            # Identify the key(s) the file was encrypted for. list_keys() takes
            # a boolean `secret` flag, not ciphertext, so it cannot do this.
            encrypted_info = gpg.get_recipients(encrypted_data)
            logging.info(f"Encrypted file info: {encrypted_info}")

            decrypted_data = gpg.decrypt(
                encrypted_data,
                passphrase=passphrase,
                always_trust=True,  # Skip trust validation of the key
            )
            if not decrypted_data.ok:
                logging.error(f"Decryption status: {decrypted_data.status}")
                logging.error(f"Decryption stderr: {decrypted_data.stderr}")
                raise Exception(f"Decryption failed: {decrypted_data.status}")

            # Write the decrypted data to the output dataset
            with output_dataset.filesystem().open("new_file_decrypted.txt", "wb") as g:
                g.write(decrypted_data.data)
            logging.info("Successfully decrypted and wrote file")
        except Exception as e:
            logging.error(f"Error in decryption process: {str(e)}")
            raise
Input dataset and the “keys” dataset
And the final decrypted file:
You can quickly test this by generating a PGP-encrypted file as follows:
import gnupg
import os
# Generates a demo RSA key pair and a sample file encrypted with it, using
# GnuPG via python-gnupg. Outputs, all in the current directory:
#   private_key.asc, public_key.asc, test.txt, test.txt.gpg

# Set up GnuPG home directory (for keyring isolation)
gpg_home = './gpg_home'
os.makedirs(gpg_home, exist_ok=True)
gpg = gnupg.GPG(gnupghome=gpg_home)

# Step 1: Generate a key pair
input_data = gpg.gen_key_input(
    name_email='your.email@example.com',
    name_real='Your Name',
    passphrase='testpass',  # For test/demo only!
    key_type='RSA',
    key_length=2048
)
key = gpg.gen_key(input_data)
# Stop here if no key was created: exporting and encrypting with an empty
# fingerprint would otherwise fail later with a less obvious error.
if not key.fingerprint:
    raise RuntimeError(f"PGP key generation failed: {key.stderr}")

# Step 2: Export and save the keys
private_keys = gpg.export_keys(key.fingerprint, True, passphrase='testpass')
public_keys = gpg.export_keys(key.fingerprint)
with open('private_key.asc', 'w') as f:
    f.write(private_keys)
with open('public_key.asc', 'w') as f:
    f.write(public_keys)
print("PGP key pair generated and saved to 'private_key.asc' and 'public_key.asc'")

# Step 3: Generate a sample file (created once; it is the file encrypted below)
file_to_encrypt = 'test.txt'
sample_content = """\
Hello, this is a test file.
This file will be encrypted using GnuPG via python-gnupg.
Generated for demonstration purposes.
"""
with open(file_to_encrypt, 'w') as f:
    f.write(sample_content)
print(f"Sample file '{file_to_encrypt}' created.")

# Step 4: Encrypt the file using the public key
with open(file_to_encrypt, 'rb') as f:
    status = gpg.encrypt_file(
        f,
        recipients=[key.fingerprint],
        output=file_to_encrypt + '.gpg'
    )
if status.ok:
    print(f"File '{file_to_encrypt}' encrypted and saved as '{file_to_encrypt}.gpg'")
else:
    print("Encryption failed:", status.status)
The alternative approach using a Source to store the credentials
from transforms.external.systems import external_systems, Source
@external_systems(
    example_rest_api_source_source=Source("ri.magritte..source.e0bb1168-xxxx-xxxx-a137-05bdb32c65f9")
)
@transform(
    output_dataset=Output("/path.../exampleOutputPGP_bis"),
    files_dataset=Input("ri.foundry.main.dataset.88f00419-xxxx-xxxx-9674-0a61aff0a37a")
)
def compute_external_transforms(example_rest_api_source_source, files_dataset, output_dataset):
    # Variant of the decryption transform in which the private key is read
    # from the Source secret "SecretPGPKey" instead of a file in a dataset.
    # The snippet is intentionally truncated after the key import.

    # Passphrase protecting the private key (demo value)
    passphrase = "testpass" # Set to None if not needed
    # Set up a temporary GPG home directory for key import
    with tempfile.TemporaryDirectory() as gpg_home:
        # Set permissions for GPG home directory
        # os.chmod(gpg_home, 0o700)
        # Initialize GPG with verbose output
        gpg = gnupg.GPG(gnupghome=gpg_home, verbose=True)
        # Read and import the private key
        try:
            # The key material comes from the secret named "SecretPGPKey" on the Source
            private_key_string = example_rest_api_source_source.get_secret("SecretPGPKey")
            # Log key information for debugging (only the length, never the key itself)
            logging.info(f"Importing key of length: {len(private_key_string)} bytes")
            import_result = gpg.import_keys(private_key_string)
            # The rest stays the same
The source configuration with the secret
This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.