I need help setting up a Gmail connection in Palantir

Hello,

I’ll try to give you a tutorial for the whole thing. The exact error you encounter depends on how you’ve set things up on the Google and Foundry side, hence it might be more efficient to give you a tutorial and see where it diverges from there.

Tutorial

You want to load emails from a Google Mail inbox.
You will need access to this Google Mail inbox, and a GCP administrator account with sufficient permissions to access it programmatically. Namely, you will need an educational or corporate GCP account with admin access; a personal @gmail.com account will not work (see Google Ref: https://support.google.com/a/answer/182076?sjid=5770074006582230181-EU).

Set up the Google side

We will authenticate with Google/GCP via the OAuth2 protocol, described here: https://developers.google.com/identity/protocols/oauth2
We will follow the “Service accounts” case.

Steps

  1. Create a service account by following the documentation here: https://cloud.google.com/iam/docs/service-accounts-create
    a. Create a new project or select an existing one.
    b. Navigate to IAM & Admin > Service Accounts and click “Create Service Account”.
    c. Fill in the necessary details and click Create.
    d. Enable the Gmail API for the project (APIs & Services > Library). Access to the mailboxes themselves is granted through the domain-wide delegation in step 2, not through an IAM role.
    e. Create a new key for the service account in JSON format and download it.

  2. Grant sufficient permissions to the service account by delegating domain-wide authority to it
    :warning: Note: This is not available for a personal GCP account; you need a company or educational account. :warning:
    a. Go to IAM & Admin > Service Accounts.
    b. Click on your service account.
    c. Click Edit and then Show Domain-Wide Delegation.
    d. Enable Google Workspace (formerly G Suite) Domain-wide Delegation and save.
    e. Go to your Google Workspace (formerly G Suite) admin console and navigate to Security > API Controls > Domain-wide Delegation.
    f. Add a new API client and enter the Client ID of your service account along with the required scopes (e.g., https://www.googleapis.com/auth/gmail.readonly).

You now have a service account that you can use to fetch emails from the inboxes it is allowed to access.
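
Before moving to Foundry, it can help to sanity-check the JSON key you downloaded in step 1.e and to read the Client ID needed in step 2.f from it. The snippet below is only a minimal sketch: the function name and the list of checked fields are my own choices, not something mandated by Google.

```python
import json

# Fields that a service account key file is expected to contain (assumed minimal set).
REQUIRED_KEY_FIELDS = ("type", "client_email", "client_id", "private_key", "token_uri")


def summarize_service_account_key(raw_json: str) -> dict:
    """Validate a service account key and return its non-secret identifiers."""
    key = json.loads(raw_json)
    missing = [field for field in REQUIRED_KEY_FIELDS if field not in key]
    if missing:
        raise ValueError(f"Key file is missing fields: {missing}")
    if key["type"] != "service_account":
        raise ValueError(f"Unexpected key type: {key['type']!r}")
    # client_id is the value to enter in the Domain-wide Delegation screen (step 2.f).
    return {"client_email": key["client_email"], "client_id": key["client_id"]}
```

You would call it with the content of the downloaded file, for example summarize_service_account_key(open(path).read()). It never returns the private key, so the result is safe to print.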

Set up the Foundry side

You will need to create a Code Repository to host an External Transform, i.e. some logic that will connect to the Google APIs and perform API calls.

  1. In Foundry, create a Code Repository for the External Transform.
  2. Create a “source” in the left sidebar.
  3. Create and approve egresses (i.e. authorizations for outbound traffic) to:
    a. oauth2.googleapis.com on port 443
    b. accounts.google.com on port 443
    c. gmail.googleapis.com on port 443
  4. Add the following libraries by importing them in the left side panel:
    a. google-auth
    b. google-api-python-client
  5. Create a credential with the content of the JSON file you downloaded earlier, which contains the required secrets.

You are now ready to use the egresses, credentials and libraries to perform API calls to the Google APIs.
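
If the transform later fails with a network error, a quick way to narrow it down is to test each egress from inside the transform. Here is a minimal sketch using only the standard library. It assumes the egresses are direct connections (not routed through a proxy), and check_egress is a hypothetical helper name of mine.

```python
import socket

# Hosts for which egress was approved in step 3 above.
GOOGLE_HOSTS = ["oauth2.googleapis.com", "accounts.google.com", "gmail.googleapis.com"]


def check_egress(hosts, port=443, timeout=5.0, connect=socket.create_connection):
    """Return {host: None if reachable, else the error message}."""
    results = {}
    for host in hosts:
        try:
            # Open and immediately close a TCP connection to the host
            connection = connect((host, port), timeout=timeout)
            connection.close()
            results[host] = None
        except OSError as error:
            results[host] = str(error)
    return results
```

Calling check_egress(GOOGLE_HOSTS) at the top of the transform and printing the result should show which host, if any, is not reachable.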

Write the transform/logic

We will now write the body of the transform, which will perform the actual API calls to the Google APIs.

Notes:

  • As an optimization, the code below uses the “KUBERNETES_NO_EXECUTORS” Spark profile, which you can import in the settings of the code repository, to avoid requesting Spark executors that we won’t use here.
  • The code is meant to be incremental: the transform records the timestamp at which it started querying for new emails, and the next run uses it to filter its query. The simplified example below does not include this bookkeeping.
  • Google’s libraries are typically given the path of a JSON file that contains the secrets to use. Since we stored the secret in the “credentials” store of Foundry, there is no such file path to pass. The solution here is to write the secret (read from the credentials store) to a file at runtime and pass that file path to the library. That is the role of the “write_secrets_as_json_file” function below.
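
As a possible alternative to writing the secret to disk, google-auth also exposes service_account.Credentials.from_service_account_info, which accepts the key as a dict. The sketch below is written under assumptions: it supposes the secret was stored either as plain JSON or as a JSON-encoded string (quoted and escaped), and parse_secret_json and build_credentials are hypothetical helper names.

```python
import json


def parse_secret_json(raw_secret: str) -> dict:
    """Parse a JSON secret stored either plain or as a quoted, escaped string."""
    parsed = json.loads(raw_secret)
    # A double-encoded secret decodes to a str first; decode once more to get the dict.
    if isinstance(parsed, str):
        parsed = json.loads(parsed)
    if not isinstance(parsed, dict):
        raise ValueError("The secret does not contain a JSON object")
    return parsed


def build_credentials(raw_secret: str, scopes, subject: str):
    """Build delegated credentials in memory, without writing a key file."""
    # Imported here so that parse_secret_json stays usable without google-auth installed.
    from google.oauth2 import service_account

    info = parse_secret_json(raw_secret)
    credentials = service_account.Credentials.from_service_account_info(info, scopes=scopes)
    # subject is the email of the mailbox owner to impersonate
    return credentials.with_subject(subject)
```

Whether you prefer this or the file-based approach is mostly a matter of taste; the in-memory variant simply avoids leaving a key file in the working directory.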

Here is an example of the transform:

import json

from google.oauth2 import service_account
from googleapiclient.discovery import build
from pyspark.sql import types as T
from transforms.api import Output, transform
from transforms.external.systems import external_systems, Source

@external_systems(
    google_connection_source=Source("ri.magritte..source.2623a48b-xxxx-4207-b6a4-27b2269abcdd")
)
@transform(
    output_dataset=Output(
        "/PATH/Google Mail Connection/example_dataset"
    )
)
def compute(ctx, output_dataset, google_connection_source):
    
    def write_secrets_as_json_file():
        '''
        Google’s libraries are given the path of a JSON file that contains the secrets to use.
        Given we stored the secret in the “credentials” store of Foundry, there is no such file path to pass.
        The solution here is to write the secret (from the credentials store) to a file at runtime and
        pass this file path to the library.
        '''
        # This assumes that the content of the json was put into a secret named "JsonSecret"
        secretJson = google_connection_source.get_secret("JsonSecret")

        # Decode the string to remove the escape characters
        decoded_string = secretJson.encode().decode('unicode_escape').strip('"')

        # Load the JSON data from the decoded string
        json_data = json.loads(decoded_string)

        # Write the JSON data to a file with proper formatting
        with open('service-account-file.json', 'w') as json_file:
            json.dump(json_data, json_file, indent=4)

    write_secrets_as_json_file()

    # Path of the service account key file, created at runtime by write_secrets_as_json_file()
    SERVICE_ACCOUNT_FILE = 'service-account-file.json'
    # Email of the Google Workspace user whose inbox is read (placeholder, replace with yours).
    # This must be a user of the domain, not the service account's own email.
    USER_TO_IMPERSONATE = 'user@your-domain.com'

    # Define the required scope (must match the scope granted in the Domain-wide Delegation screen)
    SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

    # Authenticate as the service account
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE, scopes=SCOPES)

    # Use domain-wide delegation to act on behalf of the mailbox owner
    credentials = credentials.with_subject(USER_TO_IMPERSONATE)

    # Build the Gmail service
    service = build('gmail', 'v1', credentials=credentials)

    # Function to list and get emails.
    # Note: messages().list() only returns one page of results (id and threadId of each message).
    def list_and_get_emails(service, user_id='me'):
        emails = []

        # List the messages of the impersonated user ('me').
        # Errors are not caught here, so that a failed API call fails the build.
        response = service.users().messages().list(userId=user_id).execute()
        messages = response.get('messages', [])

        if not messages:
            print('No messages found.')
            return emails

        for message in messages:
            # Fetch the full message to access its snippet
            msg = service.users().messages().get(userId=user_id, id=message['id']).execute()
            # Keep only flat string fields, so that every column can be typed as a string
            emails.append({
                'id': msg['id'],
                'threadId': msg['threadId'],
                'snippet': msg.get('snippet', ''),
            })

        return emails
    # List and get emails
    emails = list_and_get_emails(service)
    
    def write_dict_as_dataframe(ctx, list_of_dicts_to_save, output_dataset):
        '''
        Function that takes a list of flat dicts and saves it as a dataframe.
        The column names are taken from the keys of the first dict; every column is typed as a string.
        '''
        # Nothing to write: abort the transaction instead of failing on an empty list
        if not list_of_dicts_to_save:
            output_dataset.abort()
            return

        # Build the schema with all fields as StringType
        fields = [T.StructField(k, T.StringType(), True) for k in list_of_dicts_to_save[0].keys()]
        schema = T.StructType(fields)
        # Convert the list of dicts to a Spark DataFrame and write it
        df = ctx.spark_session.createDataFrame(list_of_dicts_to_save, schema)
        output_dataset.write_dataframe(df)

    write_dict_as_dataframe(ctx, emails, output_dataset)
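
Two things the example above leaves out are pagination and the incremental filter. The sketch below shows one way to do both, under assumptions: it relies on the nextPageToken field returned by messages().list() and on the Gmail search operator after:, which to my knowledge accepts a timestamp in epoch seconds. How the timestamp of the previous run is stored is not shown, and both function names are hypothetical.

```python
def build_query(last_run_epoch_seconds=None):
    """Return a Gmail search query limited to messages received after the last run."""
    if last_run_epoch_seconds is None:
        return ""  # first run: no filter
    return f"after:{int(last_run_epoch_seconds)}"


def list_all_message_ids(service, user_id="me", query=""):
    """Collect message ids across all result pages by following nextPageToken."""
    message_ids = []
    page_token = None
    while True:
        response = (
            service.users()
            .messages()
            # An empty query is passed as None so that no filter is sent
            .list(userId=user_id, q=query or None, pageToken=page_token)
            .execute()
        )
        message_ids.extend(m["id"] for m in response.get("messages", []))
        page_token = response.get("nextPageToken")
        if not page_token:
            return message_ids
```

With the service object built in the transform, list_all_message_ids(service, query=build_query(last_run)) would return the ids of every message received since last_run, which can then be fetched one by one as in list_and_get_emails.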

Note on improvements:

  • In case the API changes and stops returning some fields, column deletion would need to be handled, for example by consolidating the current output’s schema with the schema inferred from the JSON responses, i.e. my_output.dataframe().schema consolidated with what write_dict_as_dataframe infers.
  • If no emails are fetched, the output dataset’s schema cannot be inferred, which might lead to a failure at write time. It is good practice to abort() the transaction on the outputs if there is no data to write. This can be implemented with something like: if future_output.count() == 0: future_output.abort() else: future_output.write_dataframe(). Because of the incremental context, either all outputs need to be aborted or none of them; aborting only some of the outputs will generate failures.
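
To illustrate the first point, here is a minimal sketch of the consolidation in plain Python, before anything is handed to Spark. The helper names are mine, and existing_columns stands for the column names you would read from the current output’s schema.

```python
def consolidate_columns(existing_columns, rows):
    """Keep existing columns first, then append any new keys seen in the rows."""
    columns = list(existing_columns)
    for row in rows:
        for key in row:
            if key not in columns:
                columns.append(key)
    return columns


def normalize_rows(rows, columns):
    """Project each dict onto a fixed list of columns, as strings (None when missing)."""
    normalized = []
    for row in rows:
        normalized.append(
            {col: (None if row.get(col) is None else str(row[col])) for col in columns}
        )
    return normalized
```

The normalized rows all share the same keys, so a schema built from the consolidated column list stays stable even when the API drops or adds a field between runs.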

Hope that helps,
Thanks,
