How to query Audit logs v3 via API in a lightweight transform?

Audit logs v3 exposes endpoints to query those logs. I would like more flexibility over the frequency, parsing, and handling when loading those logs into Foundry. How can I query Audit v3 logs via API? Is there an example?

Here is an example of a lightweight, incremental transform that loads logs from the available API endpoints.

A few notes:

  • The secrets shouldn’t be hardcoded in the transform; they should most likely be stored in a source. They are hardcoded here for simplicity and readability. Since audit logs are sensitive, access to this transform should in any case be restricted to the few users who need it.
  • The state is stored in the “cursor” output dataset between runs, so each build continues where the previous log read left off.
  • The exact numbers (memory, DOWNLOAD_THREADS, CHUNK_SIZE, …) can and should be tuned to the audit log volume you wish to process and to how frequently you run the transform.
  • The transform can use a hardcoded token, or a client_id/secret created by a Developer Console app, which uses a service user in the background. This requires 1/ granting the service user the “audit read permissions” operation and 2/ granting it permission in Control Panel, potentially via a custom role.
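
To illustrate the cursor mechanism from the notes above, here is a minimal sketch of the resume decision. The helper itself is hypothetical, but the constant and column names match those used in the transform below:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

INITIAL_LOOKBACK_DAYS = 7  # same constant as in the transform below

def resolve_window(prev_cursor: Optional[dict], now: datetime) -> Tuple[str, str, Optional[str]]:
    """Return (start_date, end_date, resume_token) for this build.

    If the previous build saved a nextPageToken, keep the same date
    range and resume paging; otherwise open a fresh lookback window.
    """
    if prev_cursor and prev_cursor.get("next_page_token"):
        return prev_cursor["start_date"], prev_cursor["end_date"], prev_cursor["next_page_token"]
    start = (now - timedelta(days=INITIAL_LOOKBACK_DAYS)).strftime("%Y-%m-%d")
    return start, now.strftime("%Y-%m-%d"), None
```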

Developer Console restriction, where the Developer Console app’s service user is authorized to use audit endpoints

Custom role in Control Panel, using “Create datasets with audit logs for the organization”
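
As a standalone illustration of the parsing step before the full transform: each log file’s content is gzip-compressed NDJSON (one JSON object per line). The sample events below are fabricated, not real audit payloads:

```python
import gzip
import json

def parse_ndjson_gz(raw: bytes) -> list:
    """Decompress gzip content and parse one JSON object per line."""
    text = gzip.decompress(raw).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]

# Round-trip with fabricated sample events:
sample = b"\n".join(json.dumps({"eventId": str(i), "name": "demo"}).encode() for i in range(3))
rows = parse_ndjson_gz(gzip.compress(sample))
```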

"""
Audit V3 log export transform.

Fetches audit.3 logs from the Foundry audit API and writes them as structured
rows to an output dataset.  Designed to run as a lightweight + incremental
transform so that each build only fetches new log files since the last run.

The approach mirrors the built-in audit log export:
  1. List available log files via GET /api/v2/audit/organizations/{orgId}/logFiles
  2. Download each file's content via GET .../logFiles/{logFileId}/content
  3. Parse the gzip-compressed, newline-delimited JSON (NDJSON) content
  4. Write the parsed rows to a Foundry dataset

State management:
  A second output dataset ("cursor") is snapshot-overwritten on every build.
  It stores the startDate and nextPageToken from the listLogFiles API so that
  subsequent builds resume exactly where the previous one stopped, without
  re-listing or re-downloading files.

Prerequisites:
  - The token used to run this transform must have the `audit-export:view`
    operation on the target organization (granted via Control Panel).
  - Replace ORGANIZATION_RID below with the target organization RID.

Authentication priority (first match wins):
  1. CLIENT_ID + CLIENT_SECRET  → OAuth2 client-credentials grant → short-lived token
  2. HARDCODED_TOKEN            → static bearer token you paste in
  3. ctx.auth_header             → the transform's own runtime token (limited scope)
"""

import gzip
import json
import logging
import pathlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

import polars as pl
import requests
from transforms.api import (
    IncrementalLightweightInput,
    IncrementalLightweightOutput,
    LightweightContext,
    LightweightOutput,
    Output,
    StringParam,
    incremental,
    lightweight,
    transform,
)

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Replace with your organization RID
DEFAULT_ORG_RID = "ri.multipass..organization.25639231-1234-5678-9000-833910bc0727"

# Replace with your Foundry stack hostname (e.g. "mycompany.palantirfoundry.com")
FOUNDRY_STACK_DOMAIN = "REDACTED.com"

# ---------------------------------------------------------------------------
# Authentication – configure ONE of the following approaches:
#
# Option 1 (preferred): OAuth2 client-credentials via a third-party application
#   Create a third-party app in Developer Console with the `api:audit-read`
#   scope, then paste the client_id and client_secret here.  The transform will
#   exchange them for a short-lived access token at the start of every build.
CLIENT_ID: Optional[str] = "REDACTED"  # e.g. "0a1b2c3d4e5f..."
CLIENT_SECRET: Optional[str] = "REDACTED"  # e.g. "0123456..."
#
# Option 2: A static bearer token (e.g. a long-lived user/service-user token)
#   Generate a token from Account → Settings → Tokens, or from the service user.
HARDCODED_TOKEN: Optional[str] = None  # e.g. "eyJhbGciOi..."
#
# Option 3 (automatic fallback): The transform's own runtime token
#   This is ctx.auth_header.  It has limited scope and will only work if the
#   build-triggering user/service-user has audit-export:view on the org.
# ---------------------------------------------------------------------------

# How many days to look back on the very first (snapshot) run
INITIAL_LOOKBACK_DAYS = 7

# Max number of files to process per build
MAX_FILES_PER_JOB = 10_000

# Parallelism for downloading log file content
DOWNLOAD_THREADS = 32

# Page size when listing log files
PAGE_SIZE = 500


# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------
def _get_foundry_base_url() -> str:
    return f"https://{FOUNDRY_STACK_DOMAIN}"


def _obtain_token_via_client_credentials(base_url: str) -> str:
    """
    Exchange CLIENT_ID / CLIENT_SECRET for a short-lived access token using
    the OAuth2 client-credentials grant.

    POST /multipass/api/oauth2/token
    Content-Type: application/x-www-form-urlencoded
    """
    url = f"{base_url}/multipass/api/oauth2/token"
    resp = requests.post(
        url,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=30,
    )
    resp.raise_for_status()
    access_token = resp.json()["access_token"]
    logger.info("Obtained access token via client-credentials grant.")
    return access_token


def _resolve_auth_header(base_url: str, ctx: LightweightContext) -> str:
    """
    Resolve the Authorization header to use, with the following priority:
      1. CLIENT_ID + CLIENT_SECRET  → OAuth2 client-credentials token
      2. HARDCODED_TOKEN            → static bearer token
      3. ctx.auth_header             → transform runtime token (fallback)
    """
    if CLIENT_ID and CLIENT_SECRET:
        logger.info("Using OAuth2 client-credentials (client_id=%s).", CLIENT_ID)
        token = _obtain_token_via_client_credentials(base_url)
        return f"Bearer {token}"

    if HARDCODED_TOKEN:
        logger.info("Using hardcoded bearer token.")
        return f"Bearer {HARDCODED_TOKEN}"

    logger.info("Falling back to transform runtime auth header (ctx.auth_header).")
    return ctx.auth_header


# ---------------------------------------------------------------------------
# Cursor helpers
# ---------------------------------------------------------------------------
def _read_cursor(cursor_output: IncrementalLightweightOutput) -> Optional[Dict[str, str]]:
    """
    Read the cursor from the previous build.
    Returns a dict with keys: start_date, next_page_token, end_date
    or None if no cursor exists (first run).
    """
    try:
        df = cursor_output.polars(mode="previous")
        if df is not None and df.height > 0:
            row = df.row(0, named=True)
            logger.info("Read cursor from previous build: %s", row)
            return row
    except Exception:
        logger.info("No previous cursor found (first run or reset).")
    return None


def _write_cursor(
    cursor_output: LightweightOutput,
    start_date: str,
    end_date: str,
    next_page_token: Optional[str],
    last_run_utc: str,
) -> None:
    """
    Snapshot-write the cursor state for the next build.
    """
    cursor_df = pl.DataFrame(
        {
            "start_date": [start_date],
            "end_date": [end_date],
            "next_page_token": [next_page_token if next_page_token else ""],
            "last_run_utc": [last_run_utc],
        },
        schema=_cursor_schema(),
    )
    logger.info(
        "Writing cursor: start_date=%s, end_date=%s, has_next_page=%s",
        start_date,
        end_date,
        bool(next_page_token),
    )
    cursor_output.set_mode("replace")
    cursor_output.write_table(cursor_df)


def _cursor_schema() -> dict:
    return {
        "start_date": pl.Utf8,
        "end_date": pl.Utf8,
        "next_page_token": pl.Utf8,
        "last_run_utc": pl.Utf8,
    }


# ---------------------------------------------------------------------------
# Audit API helpers
# ---------------------------------------------------------------------------
def _list_log_files(
    base_url: str,
    auth_header: str,
    org_rid: str,
    start_date: str,
    end_date: Optional[str] = None,
    resume_page_token: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """
    Page through available log files for the given date range.
    Optionally resumes from a previous page token.

    Returns:
        (files, next_page_token) — the list of files fetched this run, and
        the page token to resume from on the *next* build (or None if exhausted).
    """
    url = f"{base_url}/api/v2/audit/organizations/{org_rid}/logFiles"
    all_files: List[Dict[str, Any]] = []

    page_token: Optional[str] = resume_page_token
    page_num = 0
    next_page_token_out: Optional[str] = None

    if resume_page_token:
        logger.info("Resuming file listing from saved page token.")
    logger.info("Listing log files from %s (startDate=%s, endDate=%s) ...", url, start_date, end_date)

    while True:
        params: Dict[str, Any] = {
            "startDate": start_date,
            "pageSize": PAGE_SIZE,
        }
        if end_date:
            params["endDate"] = end_date
        if page_token:
            params["pageToken"] = page_token

        page_num += 1
        logger.info("Fetching page %d (collected %d files so far) ...", page_num, len(all_files))

        resp = requests.get(
            url,
            headers={"Authorization": auth_header},
            params=params,
            timeout=120,
        )
        logger.info(
            "Page %d response: status=%d, content-length=%s",
            page_num,
            resp.status_code,
            resp.headers.get("content-length", "?"),
        )
        resp.raise_for_status()
        body = resp.json()

        files = body.get("data", [])
        all_files.extend(files)
        page_token = body.get("nextPageToken")
        logger.info("Page %d returned %d files (total now: %d).", page_num, len(files), len(all_files))

        # The API can return a nextPageToken even when the page has 0 files,
        # which would cause an infinite loop.  Stop as soon as we get an
        # empty page — there is no more data.
        if len(files) == 0:
            logger.info("Received empty page — no more data available. Stopping pagination.")
            next_page_token_out = None
            break

        if len(all_files) >= MAX_FILES_PER_JOB:
            logger.info("Reached max files per job (%d), stopping pagination.", MAX_FILES_PER_JOB)
            next_page_token_out = page_token
            break

        if not page_token:
            # Exhausted all pages — no token to save
            next_page_token_out = None
            break

    logger.info(
        "Listed %d audit log files in %d pages (more_pages=%s).",
        len(all_files),
        page_num,
        bool(next_page_token_out),
    )
    return all_files, next_page_token_out


def _get_log_file_content(
    base_url: str,
    auth_header: str,
    org_rid: str,
    log_file_id: str,
) -> bytes:
    """Download the raw content of a single log file."""
    url = f"{base_url}/api/v2/audit/organizations/{org_rid}/logFiles/{log_file_id}/content"
    resp = requests.get(
        url,
        headers={"Authorization": auth_header},
        timeout=300,
    )
    resp.raise_for_status()
    return resp.content


# ---------------------------------------------------------------------------
# Parsing
# ---------------------------------------------------------------------------
def _parse_log_content(raw_content: bytes) -> List[Dict[str, Any]]:
    """
    Parse raw log file content.

    The content is gzip-compressed NDJSON (one JSON object per line).
    If it is not gzip, fall back to plain text.
    """
    try:
        text = gzip.decompress(raw_content).decode("utf-8")
    except gzip.BadGzipFile:
        text = raw_content.decode("utf-8")

    rows: List[Dict[str, Any]] = []
    for line in text.strip().splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            rows.append(json.loads(line))
        except json.JSONDecodeError:
            logger.warning("Skipping malformed JSON line: %s...", line[:120])
    return rows


def _normalize_rows(raw_rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Flatten each raw audit.3 JSON object into a flat dict that maps to the
    audit.3 schema columns.  Complex fields (categories, entities, origins,
    users, requestFields, resultFields) are serialised as JSON strings so the
    output dataset is fully tabular.
    """
    normalised: List[Dict[str, Any]] = []
    for row in raw_rows:
        normalised.append(
            {
                "logEntryId": row.get("logEntryId"),
                "eventId": row.get("eventId"),
                "name": row.get("name", ""),
                "time": row.get("time"),
                "categories": json.dumps(sorted(row["categories"])) if row.get("categories") else "[]",
                "entities": json.dumps(row.get("entities", [])),
                "environment": row.get("environment"),
                "host": row.get("host", ""),
                "orgId": row.get("orgId"),
                "origin": row.get("origin"),
                "origins": json.dumps(row.get("origins", [])),
                "product": row.get("product", ""),
                "producerType": row.get("producerType", ""),
                "productVersion": row.get("productVersion", ""),
                "requestFields": json.dumps(row.get("requestFields", {})),
                "result": row.get("result", ""),
                "resultFields": json.dumps(row.get("resultFields", {})),
                "sequenceId": row.get("sequenceId"),
                "service": row.get("service"),
                "sid": row.get("sid"),
                "sourceOrigin": row.get("sourceOrigin"),
                "stack": row.get("stack"),
                "tokenId": row.get("tokenId"),
                "traceId": row.get("traceId"),
                "uid": row.get("uid"),
                "userAgent": row.get("userAgent"),
                "users": json.dumps(row.get("users", [])),
            }
        )
    return normalised


# ---------------------------------------------------------------------------
# Transform
# ---------------------------------------------------------------------------
@lightweight(cpu_cores=2, memory_gb=8)
@incremental(semantic_version=4, strict_append=True)
@transform(
    output=Output("/path/to/audit_v3_logs"),
    cursor=Output("/path/to/audit_v3_cursor"),
    organization_rid=StringParam(default=DEFAULT_ORG_RID),
)
def audit_v3_export(
    output: IncrementalLightweightOutput,
    cursor: IncrementalLightweightOutput,
    ctx: LightweightContext,
    organization_rid: str,
) -> None:
    base_url = _get_foundry_base_url()
    auth_header = _resolve_auth_header(base_url, ctx)
    # StringParam is injected as a ParamValueInput wrapper; unwrap with .value
    org_rid = organization_rid.value if hasattr(organization_rid, "value") else organization_rid

    now = datetime.now(timezone.utc)
    now_str = now.strftime("%Y-%m-%dT%H:%M:%SZ")

    # ------------------------------------------------------------------
    # Read cursor from the previous build to decide where to resume.
    # ------------------------------------------------------------------
    prev_cursor = _read_cursor(cursor) if ctx.is_incremental else None

    if prev_cursor and prev_cursor.get("next_page_token"):
        # Previous build hit MAX_FILES_PER_JOB — continue from where it
        # left off with the same date range.
        start_date = prev_cursor["start_date"]
        end_date = prev_cursor["end_date"]
        resume_token = prev_cursor["next_page_token"]
        logger.info(
            "Resuming from previous cursor: start_date=%s, end_date=%s, has_token=True",
            start_date,
            end_date,
        )
    else:
        # First run (snapshot) — look back INITIAL_LOOKBACK_DAYS.
        start_date = (now - timedelta(days=INITIAL_LOOKBACK_DAYS)).strftime("%Y-%m-%d")
        end_date = now.strftime("%Y-%m-%d")
        resume_token = None
        logger.info(
            "First run: fetching from %s to %s.",
            start_date,
            end_date,
        )

    # 1. List available log files (resuming if we have a page token)
    log_files, next_page_token = _list_log_files(
        base_url, auth_header, org_rid, start_date, end_date, resume_page_token=resume_token
    )

    if not log_files:
        logger.info("No log files returned.")
        if ctx.is_incremental:
            logger.info("No new data — aborting to keep previous transaction.")
            ctx.abort_job()
            return
        # First (snapshot) run with no data — write empty dataset + cursor
        _write_cursor(cursor, start_date, end_date, next_page_token=None, last_run_utc=now_str)
        write_dir = pathlib.Path(output.path_for_write_table)
        write_dir.mkdir(parents=True, exist_ok=True)
        pl.DataFrame(schema=_output_schema()).write_parquet(write_dir / "empty.parquet")
        output.write_table(write_dir)
        return

    # 2. Download, parse, and write in chunks to limit peak memory.
    #    Each chunk processes CHUNK_SIZE files, writes them out, then
    #    releases the memory before moving on to the next chunk.
    CHUNK_SIZE = 1000
    total_rows_written = 0
    total_files = len(log_files)

    def _download_and_parse(log_file: Dict[str, Any]) -> List[Dict[str, Any]]:
        file_id = log_file["id"]
        raw = _get_log_file_content(base_url, auth_header, org_rid, file_id)
        parsed = _parse_log_content(raw)
        del raw
        return parsed

    # Prepare a local directory for writing parquet chunks
    write_dir = pathlib.Path(output.path_for_write_table)
    write_dir.mkdir(parents=True, exist_ok=True)

    logger.info(
        "Downloading and parsing %d log files in chunks of %d with %d threads ...",
        total_files,
        CHUNK_SIZE,
        DOWNLOAD_THREADS,
    )

    for chunk_start in range(0, total_files, CHUNK_SIZE):
        chunk = log_files[chunk_start : chunk_start + CHUNK_SIZE]
        chunk_num = chunk_start // CHUNK_SIZE + 1
        chunk_rows: List[Dict[str, Any]] = []

        logger.info(
            "Chunk %d: processing files %d–%d of %d ...",
            chunk_num,
            chunk_start + 1,
            chunk_start + len(chunk),
            total_files,
        )

        with ThreadPoolExecutor(max_workers=DOWNLOAD_THREADS) as executor:
            futures = {executor.submit(_download_and_parse, lf): lf for lf in chunk}
            for future in as_completed(futures):
                lf = futures[future]
                try:
                    rows = future.result()
                    chunk_rows.extend(rows)
                    del rows
                except Exception:
                    logger.exception("Failed to download/parse log file %s", lf.get("id"))

        if chunk_rows:
            normalised = _normalize_rows(chunk_rows)
            del chunk_rows
            df = pl.DataFrame(normalised, schema=_output_schema())
            del normalised
            # Write each chunk as a separate parquet file in the write dir
            df.write_parquet(write_dir / f"chunk_{chunk_num:04d}.parquet")
            total_rows_written += df.height
            logger.info(
                "Chunk %d: wrote %d rows (running total: %d).",
                chunk_num,
                df.height,
                total_rows_written,
            )
            del df
        else:
            logger.info("Chunk %d: all files were empty.", chunk_num)
            del chunk_rows

    logger.info("Finished: wrote %d total rows from %d files.", total_rows_written, total_files)

    # 3. Commit all parquet chunks to the output dataset in one call.
    #    write_table(path) uploads all parquet files and infers the schema.
    if total_rows_written > 0:
        output.write_table(write_dir)
    else:
        logger.info("No rows written across all chunks.")
        if ctx.is_incremental:
            ctx.abort_job()
            return
        pl.DataFrame(schema=_output_schema()).write_parquet(write_dir / "empty.parquet")
        output.write_table(write_dir)

    # 4. Write cursor — must happen even if no rows were written so the
    #    next build doesn't re-list the same files.
    _write_cursor(cursor, start_date, end_date, next_page_token, last_run_utc=now_str)


def _output_schema() -> dict:
    """Return the Polars schema for the output dataset."""
    return {
        "logEntryId": pl.Utf8,
        "eventId": pl.Utf8,
        "name": pl.Utf8,
        "time": pl.Utf8,
        "categories": pl.Utf8,
        "entities": pl.Utf8,
        "environment": pl.Utf8,
        "host": pl.Utf8,
        "orgId": pl.Utf8,
        "origin": pl.Utf8,
        "origins": pl.Utf8,
        "product": pl.Utf8,
        "producerType": pl.Utf8,
        "productVersion": pl.Utf8,
        "requestFields": pl.Utf8,
        "result": pl.Utf8,
        "resultFields": pl.Utf8,
        "sequenceId": pl.Utf8,
        "service": pl.Utf8,
        "sid": pl.Utf8,
        "sourceOrigin": pl.Utf8,
        "stack": pl.Utf8,
        "tokenId": pl.Utf8,
        "traceId": pl.Utf8,
        "uid": pl.Utf8,
        "userAgent": pl.Utf8,
        "users": pl.Utf8,
    }

Hi @VincentF ,

this is a great solution.

A few questions / comments from a professional audit log wrangler :wink:

  • “Create datasets with audit logs for the organization” is different from “retrieving audit log files from API” → Is this one scope that manages both? If yes, could you ask the team to update the text in CP. If not, is “Create datasets with audit logs for the organization” really required?
  • I would recommend Hive-partitioning the parquet files directly by year/month/day to allow efficient time-based queries. At least on our stack, if we don’t do that, queries are more or less impossible: the logs are so big that every query takes extremely long and gets very, very expensive.
    • I don’t think there is a first-class way with write_table to write Hive-partitioned polars dataframes, so you would have to write to a local temp directory first (path_for_write_table) and then upload. As an alternative you could use duckdb to write the polars dataframe.
  • Did you by any chance try writing to an Iceberg table? Iceberg supports partition evolution as well as compaction maintenance, which might be a neat thing in the future.
    • At the moment the best-supported Iceberg writer is pyspark, which means it will not be as efficient as polars. For this simple use case, pyiceberg should also do it, though.

Cheers, Nicolas
