In short: you can use External Transforms on Foundry, but you need an application registered on the Microsoft side with the relevant permissions.
Let’s assume you have a mailbox or a shared mailbox on Exchange.
In Microsoft Entra, you will need to create an application. You can follow this tutorial: https://learn.microsoft.com/en-us/graph/auth-v2-service?tabs=http
As of July 2024, here are the steps I followed:
In Microsoft Entra, access “Identity”, then “App registrations” and perform a new registration, which will create a new application.
You can then create a new secret. Here I used a client secret but other options are available.
Note: client secrets have a limited lifespan, so you will need to refresh them manually on a regular basis. The maximum lifetime is 1-2 years, and the same applies to certificates.
You will now need to grant permissions to this application:
Here I’ve chosen application permissions and selected this option:
This configuration gives the application access to all mailboxes and requires admin consent to activate. You can limit which mailboxes the application is allowed to access by using an ApplicationAccessPolicy on the Microsoft side. I won’t cover the details of this part in this post.
See this documentation: https://learn.microsoft.com/en-us/graph/auth-limit-mailbox-access
With this application, on the “Overview” page, you will have access to:
- A client_id
- A client_secret
- A tenant_id
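Before wiring those values into Foundry, you can sanity-check the app registration with a plain client-credentials token request, for example from a local Python session. This snippet is only an illustration and is not part of the transform; the placeholders stand for the values from the “Overview” page and the secret created earlier.
import requests

tenant_id = "<YOUR_TENANT_ID>"
client_id = "<YOUR_CLIENT_ID>"
client_secret = "<YOUR_CLIENT_SECRET>"

# Client credentials flow against the Microsoft identity platform v2.0 token endpoint
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
payload = {
    "grant_type": "client_credentials",
    "client_id": client_id,
    "client_secret": client_secret,
    "scope": "https://graph.microsoft.com/.default",
}
response = requests.post(token_url, data=payload)
response.raise_for_status()
print("Token acquired, expires in", response.json()["expires_in"], "seconds")
If this succeeds, the same client_id/client_secret/tenant_id combination will work from the External Transform below.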
In Foundry, you can create an External Transform with egress policies to:
- login.microsoftonline.com on port 443
- graph.microsoft.com on port 443
Create a credential containing your client_id, client_secret, and optionally your tenant_id.
As an optimization, the code below uses the “KUBERNETES_NO_EXECUTORS” Spark profile, which you can import in the settings of the code repository, to avoid requesting Spark executors that we won’t use here.
The code is incremental: the transform records its start timestamp before querying for new emails, and uses it to filter the query on the next run.
from datetime import datetime
import json
import logging
import mimetypes
import shutil
import uuid
import msal
import pyspark.sql.functions as F
import pyspark.sql.types as T
import requests
import pytz
from transforms.api import Output, configure, incremental, transform
from transforms.external.systems import Credential, EgressPolicy, use_external_systems
@incremental(semantic_version=1)
@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@use_external_systems(
egress_login=EgressPolicy("ri.resource-policy-manager.global.network-egress-policy.x"),
    egress_graph=EgressPolicy("ri.resource-policy-manager.global.network-egress-policy.x"),
creds=Credential("ri.credential..credential.x")
)
@transform(
output_emails=Output("/path/df_emails_incremental"),
output_attachment=Output("/path/files_attachments_incremental"),
output_list_attachment=Output("/path/df_attachments_incremental")
)
def compute(ctx, egress_login, egress_graph, output_emails, output_attachment, output_list_attachment, creds):
# Your Azure credentials - Azure Service Principal
client_id = creds.get("client_id")
client_secret = creds.get("client_secret")
tenant_id = "<YOUR_TENANT_ID>"
# Configuration
AUTHORITY = f"https://login.microsoftonline.com/{tenant_id}"
SCOPE = ["https://graph.microsoft.com/.default"]
SHARED_MAILBOX = "<YOUR_SHARED_INBOX_EMAIL>"
# Incremental variables
# Prepend with "_" to consider it a hidden file and not show it in the dataset preview.
state_filename = "_state.json"
# Initialize the MSAL ConfidentialClientApplication
app = msal.ConfidentialClientApplication(client_id=client_id, client_credential=client_secret, authority=AUTHORITY)
def get_current_timestamp():
# Get the current time in UTC
current_time = datetime.now(pytz.utc)
        # Convert it to ISO 8601 format, drop microseconds, and use the 'Z' suffix for UTC
        iso_format_time = current_time.replace(microsecond=0).isoformat().replace("+00:00", "Z")
return iso_format_time
def get_token(app):
result = app.acquire_token_for_client(scopes=SCOPE)
if "access_token" in result:
access_token = result["access_token"]
logging.info("Access token acquired.")
return access_token
else:
            logging.error(f'Error acquiring token: {result.get("error")}, {result.get("error_description")}')
raise Exception("Failed to fetch the main token")
def get_emails_from_inbox(access_token, email_inbox, since_timestamp=None):
# Make an API call to get emails from the shared mailbox
mailbox_url = f"https://graph.microsoft.com/v1.0/users/{email_inbox}/messages"
headers = {"Authorization": f"Bearer {access_token}", "Accept": "application/json"}
query_params = {}
if since_timestamp is not None:
query_params["$filter"] = f"receivedDateTime ge {since_timestamp}"
emails = []
while mailbox_url:
response = requests.get(mailbox_url, headers=headers, params=query_params)
if response.status_code == 200:
response_data = response.json()
emails.extend(response_data.get("value", []))
mailbox_url = response_data.get("@odata.nextLink")
else:
logging.error(f"Error fetching emails: {response.status_code} - {response.text}")
raise Exception(f"Failed to fetch emails: {response.status_code} - {response.text}")
query_params = {} # Reset query params for subsequent requests
logging.info(f"Emails fetched successfully = {len(emails)}")
return emails
def get_attachments(access_token, email_inbox, email_id):
headers = {"Authorization": f"Bearer {access_token}", "Accept": "application/json"}
        attachments_url = f"https://graph.microsoft.com/v1.0/users/{email_inbox}/messages/{email_id}/attachments"
attachments_response = requests.get(attachments_url, headers=headers)
if attachments_response.status_code == 200:
attachments = attachments_response.json().get("value", [])
return attachments
else:
logging.error(
f"Error fetching attachments: {attachments_response.status_code} - {attachments_response.text}"
)
raise Exception("Failed to fetch attachments")
def generate_random_filename(content_type):
# Generate a new random filename while conserving the mimetype
# Extract the file extension from the MIME type
try:
file_extension = mimetypes.guess_extension(content_type)
except Exception as e:
logging.error(f"Error while finding extension of attachment: {e}")
file_extension = None
if file_extension is None:
# If the MIME type is unknown, default to a generic extension
file_extension = ".bin"
# Generate a random UUID
generated_uuid = uuid.uuid4()
# Create the file name by combining the UUID and the file extension
filename = f"{generated_uuid}{file_extension}"
return filename
    def save_attachment_to_dataset(access_token, attachment, email_id, output_dataset):
        headers = {"Authorization": f"Bearer {access_token}", "Accept": "application/json"}
        if "@odata.mediaContentType" in attachment:
            # Get the attachment id and download it
            attachment_id = attachment["id"]
            download_url = f"https://graph.microsoft.com/v1.0/users/{SHARED_MAILBOX}/messages/{email_id}/attachments/{attachment_id}/$value"
            download_response = requests.get(download_url, headers=headers)
            # If successfully downloaded, save it in the output dataset
            if download_response.status_code == 200:
                # Handle the attachment content
                attachment_content = download_response.content
                # Generate a random filename which respects the type received (best effort)
                attachment_mime_type = attachment["contentType"]
                file_name = generate_random_filename(attachment_mime_type)
                # Save the attachment using output_dataset.filesystem().open()
                with output_dataset.filesystem().open(file_name, "wb") as f:
                    f.write(attachment_content)
                logging.info(f"Attachment {attachment['name']} saved successfully.")
                return file_name
            else:
                logging.error(f"Error downloading attachment: {download_response.status_code} - {download_response.text}")
                raise Exception("Failed to download attachment")
def write_dict_as_dataframe(ctx, list_of_dicts_to_save, output_dataset):
# Convert dictionary to Spark DataFrame
# Infer schema with all fields as StringType
fields = [T.StructField(k, T.StringType(), True) for k in list_of_dicts_to_save[0].keys()]
schema = T.StructType(fields)
df = ctx.spark_session.createDataFrame(list_of_dicts_to_save, schema)
output_dataset.write_dataframe(df)
def get_incremental_state(output_dataset_for_state):
# Try to fetch the state from the output dataset:
try:
with output_dataset_for_state.filesystem().open(state_filename, mode="r") as state_file:
data = json.load(state_file)
# Validate the fetched state:
state = data
logging.info(f"state file found, continuing from : {data}")
return state["last_seen"]
except Exception as e:
logging.warn(f"state file not found, starting over from default state: {e}")
return None
def generate_state():
current_timestamp = get_current_timestamp()
state = {"last_seen": current_timestamp} # Some arbitrary starting state
return state
def save_new_state(output_dataset, state):
out_fs = output_dataset.filesystem()
# Save the new state on the output dataset:
with out_fs.open(state_filename, "w") as state_file:
json.dump(state, state_file)
logging.info("New state saved")
# Variables to store the future dataframes
attachments_flat = []
# Some variables to handle the incrementality, see https://www.palantir.com/docs/foundry/data-integration/external-transforms/
last_seen_timestamp = get_incremental_state(output_emails) # Will be None or the last timestamp
new_state = generate_state()
# Acquire token
access_token = get_token(app)
# Get the emails
emails = get_emails_from_inbox(access_token, SHARED_MAILBOX, last_seen_timestamp)
# Iterate over each email
for email in emails:
logging.info(f"Processing - Subject: {email['subject']}")
# Fetch attachments for the current email
email_id = email["id"]
attachments = get_attachments(access_token, SHARED_MAILBOX, email_id)
# Handle each attachment
for attachment in attachments:
logging.info(f"Attachment: {attachment['name']}")
            file_name = save_attachment_to_dataset(access_token, attachment, email_id, output_attachment)
# Add the new name so that we can pivot to it later
attachment["new_file_name"] = file_name
# Add a foreign keys from attachments to emails
for attachment in attachments:
attachment["email_id"] = email_id
# Append to the main list of attachments
attachments_flat.extend(attachments)
# Save to output datasets the flat table for emails and attachments list
save_new_state(output_emails, new_state)
write_dict_as_dataframe(ctx, attachments_flat, output_list_attachment)
write_dict_as_dataframe(ctx, emails, output_emails)
Note on improvements:
- In case the API changes and no longer returns some fields, column deletion would need to be handled, for example by consolidating the current output’s schema (my_output.dataframe().schema) with the schema inferred from the JSON responses in write_dict_as_dataframe; see the sketch after this list.
- If no emails are fetched, the output dataset’s schema won’t be inferred, which might lead to a failure at write time. It is good practice to cancel the build and abort() the transaction on the outputs when there is no data to write, with something like if future_output.count() == 0: future_output.abort() else: future_output.write_dataframe(). Because of the incremental context, all outputs need to be aborted, or none: aborting only some of them will generate failures.
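A rough sketch of the consolidation mentioned in the first point (an illustration only, not code from the transform above; consolidated_schema is a hypothetical helper): union the columns already present on the output with the keys seen in the new JSON payload, so that a field dropped by the API still exists as a nullable column.
import pyspark.sql.types as T

def consolidated_schema(existing_output, list_of_dicts):
    # Columns the output already knows about (may be empty or unavailable on the first run)
    try:
        existing_columns = set(existing_output.dataframe().schema.fieldNames())
    except Exception:
        existing_columns = set()
    # Columns present in the freshly fetched JSON payload
    new_columns = set()
    for row in list_of_dicts:
        new_columns.update(row.keys())
    # Keep everything as nullable strings, as in write_dict_as_dataframe
    all_columns = sorted(existing_columns | new_columns)
    return T.StructType([T.StructField(column, T.StringType(), True) for column in all_columns])
The resulting schema could then be passed to createDataFrame in write_dict_as_dataframe instead of the schema built from the first dictionary only.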
EDIT, bis: Feedback received
- It seems the MS API actually pages emails 10 by 10, so the code needed a tweak: fetch more per request, walk through the pages, or use the max timestamp of the latest ingested email instead of the current timestamp ==> this has been fixed in the above code, which now follows the @odata.nextLink paging to fetch all available emails.
- If emails have attachments that are themselves emails, it seems an error can be thrown; this would need special handling (and unpacking of the attached emails) ==> the filename-generation function should be more resilient. See the sketch below for one way to filter such attachments.
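For that last point, one possible direction (again a sketch, not tested against the transform above, with is_file_attachment as a hypothetical helper) is to look at the @odata.type of each attachment and only download plain file attachments, leaving attached emails (itemAttachment) for dedicated handling:
def is_file_attachment(attachment):
    # Graph attachments expose an @odata.type of fileAttachment, itemAttachment
    # (e.g. an attached email) or referenceAttachment.
    return attachment.get("@odata.type") == "#microsoft.graph.fileAttachment"

# Hypothetical variant of the inner attachment loop of the transform above:
for attachment in attachments:
    if not is_file_attachment(attachment):
        logging.info(f"Skipping non-file attachment: {attachment.get('name')}")
        continue
    file_name = save_attachment_to_dataset(access_token, attachment, email_id, output_attachment)
    attachment["new_file_name"] = file_name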