MS Exchange Online to Foundry

Is it possible to ingest from MS Exchange Online into Foundry via a standard connector?

In short: you can use External Transforms on Foundry, but you need an application registered on the Microsoft side with the relevant access.


Let’s assume you have a mailbox or a shared mailbox on Exchange.

In Microsoft Entra, you will need to create an application. You can follow this tutorial: https://learn.microsoft.com/en-us/graph/auth-v2-service?tabs=http

As of July 2024, here are the steps I followed:
In Microsoft Entra, go to “Identity”, then “App registrations”, and perform a new registration, which will create a new application.

You can then create a new secret. Here I used a client secret but other options are available.

Note: client secrets have a limited lifespan, so you will need to refresh them manually on a regular basis. The maximum lifespan is 1-2 years; the same applies to certificates.

You will now need to grant permissions to this application:

Here I’ve chosen application permissions and selected this option:

Such a configuration will give the application access to all mailboxes and will require admin consent to be activated. You can limit the scope of the mailboxes the application can access by using an ApplicationAccessPolicy on the Microsoft side; I won’t cover the details of this part in this post.
See this documentation: https://learn.microsoft.com/en-us/graph/auth-limit-mailbox-access

You will now have the three values the transform needs (the client_id and tenant_id are shown on the application’s “Overview” page; the client_secret is the value you saved when creating the secret):

  • A client_id
  • A client_secret
  • A tenant_id
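
Before going further, you can sanity-check these three values locally with a small script (a minimal sketch run outside Foundry, with the placeholders filled in; a 403 on a mailbox outside your ApplicationAccessPolicy scope would be expected):

import msal
import requests

TENANT_ID = "<YOUR_TENANT_ID>"
CLIENT_ID = "<YOUR_CLIENT_ID>"
CLIENT_SECRET = "<YOUR_CLIENT_SECRET>"
MAILBOX = "<YOUR_SHARED_INBOX_EMAIL>"

# Acquire an application token with the client credentials flow
app = msal.ConfidentialClientApplication(
    client_id=CLIENT_ID,
    client_credential=CLIENT_SECRET,
    authority=f"https://login.microsoftonline.com/{TENANT_ID}",
)
result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
assert "access_token" in result, result.get("error_description")

# Fetch a single message to verify the application permissions on the target mailbox
response = requests.get(
    f"https://graph.microsoft.com/v1.0/users/{MAILBOX}/messages",
    headers={"Authorization": f"Bearer {result['access_token']}"},
    params={"$top": 1},
)
print(response.status_code)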

In Foundry, you can create an External Transform and add egress policies for:

  • login.microsoftonline.com on port 443
  • graph.microsoft.com on port 443

Create a credential with your client_id, your client_secret and, optionally, your tenant_id.

As an optimization, the code below uses the “KUBERNETES_NO_EXECUTORS” Spark profile, which you can import in the settings of the code repository, to avoid requesting Spark executors that we won’t use here.

The code is incremental: the transform records the start timestamp before querying for new emails and stores it in a state file on the output dataset, so that the next run can filter the query to only fetch emails received since then.
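
For illustration, the incremental round trip looks like this (values are examples):

# 1. At the end of a run, the transform writes a hidden state file on the emails output:
#      _state.json -> {"last_seen": "2024-07-10T09:30:00Z"}
# 2. At the start of the next run, this timestamp is read back and turned into the Graph query:
#      GET /v1.0/users/{mailbox}/messages?$filter=receivedDateTime ge 2024-07-10T09:30:00Z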

from datetime import datetime
import json
import logging
import mimetypes
import uuid

import msal
import pyspark.sql.types as T
import pytz
import requests
from transforms.api import Output, configure, incremental, transform
from transforms.external.systems import Credential, EgressPolicy, use_external_systems


@incremental(semantic_version=1)
@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@use_external_systems(
    egress_login=EgressPolicy("ri.resource-policy-manager.global.network-egress-policy.x"),
    egress_graph=EgressPolicy("ri.resource-policy-manager.global.network-egress-policy.x"),
    creds=Credential("ri.credential..credential.x")
)
@transform(
    output_emails=Output("/path/df_emails_incremental"),
    output_attachment=Output("/path/files_attachments_incremental"),
    output_list_attachment=Output("/path/df_attachments_incremental")
)
def compute(ctx, egress_login, egress_graph, output_emails, output_attachment, output_list_attachment, creds):
    # Your Azure credentials -  Azure Service Principal
    client_id = creds.get("client_id")
    client_secret = creds.get("client_secret")
    tenant_id = "<YOUR_TENANT_ID>"

    # Configuration
    AUTHORITY = f"https://login.microsoftonline.com/{tenant_id}"
    SCOPE = ["https://graph.microsoft.com/.default"]
    SHARED_MAILBOX = "<YOUR_SHARED_INBOX_EMAIL>"

    # Incremental variables
    # Prepend with "_" to consider it a hidden file and not show it in the dataset preview.
    state_filename = "_state.json"

    # Initialize the MSAL ConfidentialClientApplication
    app = msal.ConfidentialClientApplication(client_id=client_id, client_credential=client_secret, authority=AUTHORITY)

    def get_current_timestamp():
        # Get the current time in UTC
        current_time = datetime.now(pytz.utc)
        # Convert it to ISO 8601 format, remove microseconds, and use "Z" as the UTC designator
        return current_time.replace(microsecond=0).isoformat().replace("+00:00", "Z")

    def get_token(app):
        result = app.acquire_token_for_client(scopes=SCOPE)
        if "access_token" in result:
            access_token = result["access_token"]
            logging.info("Access token acquired.")
            return access_token
        else:
            logging.info(f'Error acquiring token: {result.get("error")}, {result.get("error_description")}')
            raise Exception("Failed to fetch the main token")

    def get_emails_from_inbox(access_token, email_inbox, since_timestamp=None):
        # Make an API call to get emails from the shared mailbox
        mailbox_url = f"https://graph.microsoft.com/v1.0/users/{email_inbox}/messages"
        headers = {"Authorization": f"Bearer {access_token}", "Accept": "application/json"}
        query_params = {}

        if since_timestamp is not None:
            query_params["$filter"] = f"receivedDateTime ge {since_timestamp}"

        emails = []
        while mailbox_url:
            response = requests.get(mailbox_url, headers=headers, params=query_params)
            if response.status_code == 200:
                response_data = response.json()
                emails.extend(response_data.get("value", []))
                mailbox_url = response_data.get("@odata.nextLink")
            else:
                logging.error(f"Error fetching emails: {response.status_code} - {response.text}")
                raise Exception(f"Failed to fetch emails: {response.status_code} - {response.text}")

            query_params = {}  # Reset query params for subsequent requests

        logging.info(f"Emails fetched successfully = {len(emails)}")
        return emails

    def get_attachments(access_token, email_inbox, email_id):
        headers = {"Authorization": f"Bearer {access_token}", "Accept": "application/json"}
        attachments_url = f"https://graph.microsoft.com/v1.0/users/{email_inbox}/messages/{email_id}/attachments"
        attachments_response = requests.get(attachments_url, headers=headers)
        if attachments_response.status_code == 200:
            attachments = attachments_response.json().get("value", [])
            return attachments
        else:
            logging.error(
                f"Error fetching attachments: {attachments_response.status_code} - {attachments_response.text}"
            )
            raise Exception("Failed to fetch attachments")

    def generate_random_filename(content_type):
        # Generate a new random filename while conserving the mimetype
        # Extract the file extension from the MIME type
        try:
            file_extension = mimetypes.guess_extension(content_type)
        except Exception as e:
            logging.error(f"Error while finding extension of attachment: {e}")
            file_extension = None

        if file_extension is None:
            # If the MIME type is unknown, default to a generic extension
            file_extension = ".bin"

        # Generate a random UUID
        generated_uuid = uuid.uuid4()

        # Create the file name by combining the UUID and the file extension
        filename = f"{generated_uuid}{file_extension}"

        return filename

    def save_attachment_to_dataset(access_token, email_id, attachment, output_dataset):
        headers = {"Authorization": f"Bearer {access_token}", "Accept": "application/json"}
        if "@odata.mediaContentType" not in attachment:
            # Not a file attachment (e.g. an attached email or calendar item): skip it
            logging.info(f"Skipping non-file attachment: {attachment.get('name')}")
            return None

        # Get the attachment id and download its raw content
        attachment_id = attachment["id"]
        download_url = f"https://graph.microsoft.com/v1.0/users/{SHARED_MAILBOX}/messages/{email_id}/attachments/{attachment_id}/$value"
        download_response = requests.get(download_url, headers=headers)

        if download_response.status_code != 200:
            logging.error(f"Error downloading attachment: {download_response.status_code} - {download_response.text}")
            raise Exception("Failed to download attachment")

        # Generate a random filename which respects the type received (best effort)
        attachment_mime_type = attachment["contentType"]
        file_name = generate_random_filename(attachment_mime_type)

        # Save the attachment content in the output dataset
        with output_dataset.filesystem().open(file_name, "wb") as f:
            f.write(download_response.content)
        logging.info(f"Attachment {attachment['name']} saved successfully as {file_name}.")
        return file_name

    def write_dict_as_dataframe(ctx, list_of_dicts_to_save, output_dataset):
        # Convert the list of dictionaries to a Spark DataFrame with all fields as StringType.
        # Nested objects returned by the Graph API (e.g. "from", "body", "toRecipients")
        # are serialized to JSON strings so that they fit the string schema.
        rows = [
            {k: (v if isinstance(v, str) or v is None else json.dumps(v)) for k, v in d.items()}
            for d in list_of_dicts_to_save
        ]
        fields = [T.StructField(k, T.StringType(), True) for k in rows[0].keys()]
        schema = T.StructType(fields)
        df = ctx.spark_session.createDataFrame(rows, schema)
        output_dataset.write_dataframe(df)

    def get_incremental_state(output_dataset_for_state):
        # Try to fetch the state from the output dataset:
        try:
            with output_dataset_for_state.filesystem().open(state_filename, mode="r") as state_file:
                data = json.load(state_file)
                # Validate the fetched state:
                state = data
                logging.info(f"state file found, continuing from : {data}")
                return state["last_seen"]
        except Exception as e:
            logging.warning(f"state file not found, starting over from default state: {e}")
            return None

    def generate_state():
        current_timestamp = get_current_timestamp()
        state = {"last_seen": current_timestamp}  # Some arbitrary starting state
        return state

    def save_new_state(output_dataset, state):
        out_fs = output_dataset.filesystem()
        # Save the new state on the output dataset:
        with out_fs.open(state_filename, "w") as state_file:
            json.dump(state, state_file)
            logging.info("New state saved")

    # Variables to store the future dataframes
    attachments_flat = []
    # Some variables to handle the incrementality, see https://www.palantir.com/docs/foundry/data-integration/external-transforms/
    last_seen_timestamp = get_incremental_state(output_emails)  # Will be None or the last timestamp
    new_state = generate_state()

    # Acquire token
    access_token = get_token(app)

    # Get the emails
    emails = get_emails_from_inbox(access_token, SHARED_MAILBOX, last_seen_timestamp)

    # Iterate over each email
    for email in emails:
        logging.info(f"Processing - Subject: {email['subject']}")

        # Fetch attachments for the current email
        email_id = email["id"]
        attachments = get_attachments(access_token, SHARED_MAILBOX, email_id)

        # Handle each attachment
        for attachment in attachments:
            logging.info(f"Attachment: {attachment['name']}")
            file_name = save_attachment_to_dataset(access_token, email_id, attachment, output_attachment)
            # Add the new name so that we can pivot to it later
            attachment["new_file_name"] = file_name

        # Add a foreign key from attachments to emails
        for attachment in attachments:
            attachment["email_id"] = email_id

        # Append to the main list of attachments
        attachments_flat.extend(attachments)

    # Save to output datasets the flat table for emails and attachments list
    save_new_state(output_emails, new_state)
    write_dict_as_dataframe(ctx, attachments_flat, output_list_attachment)
    write_dict_as_dataframe(ctx, emails, output_emails)

Note on improvements:

  • In case the API changes and stops returning some fields, column deletion would need to be handled, for example by consolidating the current output’s schema with the schema inferred from the JSON responses (i.e. my_output.dataframe().schema merged with what is inferred in write_dict_as_dataframe).
  • If no emails are fetched, the output dataset’s schema won’t be inferred, which might lead to a failure at write time. It is good practice to cancel the build and abort() the transactions on the outputs if no data is to be written, with something like if future_output.count() == 0: future_output.abort() else: future_output.write_dataframe(...). Because of the incremental context, all outputs need to be aborted, or none; aborting only some of the outputs will generate failures. See the sketch after this list.
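
As a minimal sketch of that guard for this transform (assuming abort() behaves as described in the bullet above; an empty attachments_flat with non-empty emails would still need a predefined schema or its own handling):

    # At the end of compute(), replace the three final calls with:
    if len(emails) == 0:
        # Nothing new: abort every output so the incremental transaction stays consistent
        output_emails.abort()
        output_attachment.abort()
        output_list_attachment.abort()
    else:
        save_new_state(output_emails, new_state)
        write_dict_as_dataframe(ctx, attachments_flat, output_list_attachment)
        write_dict_as_dataframe(ctx, emails, output_emails)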

EDIT 2: Feedback received

  • It seems the MS API actually pages emails 10 by 10, so the code needed a tweak: either fetch more per page, go through the pages, or use the max timestamp of the latest email ingested instead of the current timestamp ==> this has been fixed in the above code, where paging is now used to fetch all available emails.
  • If emails have attachments that are themselves emails, an error can be thrown; this needs special handling (and unpacking of the email attachments, see the sketch below) ==> the generate_random_filename function should be more resilient.
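
For the attached-email case, a possible sketch (not part of the code above): Graph exposes attached emails as itemAttachment resources, and the nested message can be retrieved with $expand as described in the Microsoft Graph documentation; verify the exact query against your tenant before relying on it.

def fetch_attached_email(access_token, mailbox, email_id, attachment):
    # File attachments carry raw bytes; attached emails are "itemAttachment" resources
    # and need to be expanded to read the nested message instead of downloading $value
    if attachment.get("@odata.type") != "#microsoft.graph.itemAttachment":
        return None
    url = (
        f"https://graph.microsoft.com/v1.0/users/{mailbox}/messages/{email_id}"
        f"/attachments/{attachment['id']}?$expand=microsoft.graph.itemattachment/item"
    )
    response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
    response.raise_for_status()
    # The nested message (subject, body, its own attachments, ...) sits under "item"
    return response.json().get("item")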