Published Function has no access to Dataset

I have created a custom user-defined function that reads parameters from different Ontologies and creates a PDF file.

I am able to upload the PDF file to a Dataset when I run the code in Local Preview, but once I publish the function it throws the error below:

:cross_mark: File upload failed: { "errorCode": "PERMISSION_DENIED", "errorInstanceId": "209fe377-269b-4abb-8ae2-06b584823d8a", "errorName": "ApiUsageDenied", "parameters": { "missingScope": "api:usage:datasets-write" } } :light_bulb: Dataset RID: ri.foundry.main.dataset.21273352-6f8f-4ef7-9242-0d2df31ebb90 :magnifying_glass_tilted_left: Check dataset permissions and API access

This in turn prevents me from calling the function from Workshop to trigger an event that writes the data into a PDF and stores it in a dataset.

Thanks in advance

Can you share the code of your function?

Hi @nicornk

Here is the custom function I am using. I am also attaching a screenshot showing that the PDF is uploaded to the dataset when run locally, but not when the function is published.

Code

from functions.api import function, String, Date, Float

import os

import datetime

import json

# Conditional import of the generated ontology SDK.
# When the SDK cannot be imported, placeholders are bound so that the module
# still loads and the function signatures below remain valid; the functions
# check ONTOLOGY_SDK_AVAILABLE and return an error string in that case.
try:
    from invoiceontology_sdk.ontology.objects import TimesheetProd, Consultant
    from invoiceontology_sdk import FoundryClient
    ONTOLOGY_SDK_AVAILABLE = True
except ImportError:
    # Fallback when SDK is not available - use String type for annotations
    TimesheetProd = String
    # Bind every name the try-branch would have provided so later references
    # never hit a NameError at module level.
    Consultant = None
    FoundryClient = None
    ONTOLOGY_SDK_AVAILABLE = False

from reportlab.lib.pagesizes import A4

from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle

from reportlab.lib.units import inch

from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle

from reportlab.lib import colors

from reportlab.lib.enums import TA_CENTER, TA_RIGHT, TA_LEFT

from io import BytesIO

import base64

def _invoice_as_datetime(value):
    """Return ``value`` as a ``datetime``.

    ``datetime`` instances are returned unchanged, ``date`` instances are
    combined with midnight, and anything else is stringified and its first
    ten characters are parsed as ``YYYY-MM-DD``.

    Raises:
        ValueError: If a non-date value does not start with a YYYY-MM-DD date.
    """
    from datetime import datetime as dt, date, time

    if isinstance(value, dt):
        return value
    if isinstance(value, date):
        return dt.combine(value, time())
    return dt.strptime(str(value)[:10], '%Y-%m-%d')


def _invoice_count_weekdays(start_dt, end_dt):
    """Count Monday-Friday days in the inclusive range ``start_dt``..``end_dt``.

    Returns 0 when ``start_dt`` is after ``end_dt``.
    """
    from datetime import timedelta

    working_days = 0
    current_date = start_dt
    while current_date <= end_dt:
        if current_date.weekday() < 5:  # Monday to Friday
            working_days += 1
        current_date += timedelta(days=1)
    return working_days


@function
def generate_invoice_using_ontology(
    timesheet_input: TimesheetProd,
    starting_date: Date,
    end_date: Date
) -> String:
    """
    Generate a timesheet invoice PDF from a TimesheetProd object.

    The consultant's name (and uid, when present) is read from the input
    object, TimesheetProd and Consultant objects are queried through the
    ontology SDK to fill in the remaining fields, and a one-page invoice is
    rendered with reportlab. Missing fields fall back to defaults
    (150.0/hr rate, 8.0 hours/day, 13% HST is always applied).

    Args:
        timesheet_input: TimesheetProd object from Palantir ontology
        starting_date: Start date (Date type from Palantir functions API)
        end_date: End date (Date type from Palantir functions API)

    Returns:
        Base64 encoded PDF string on success. On failure, a message that
        starts with "Error:" is returned instead of raising, so callers must
        check the prefix before decoding.
    """
    from datetime import datetime as dt

    # Check if ontology SDK is available
    if not ONTOLOGY_SDK_AVAILABLE:
        return "Error: Ontology SDK not available. Please ensure ontology SDK is installed."

    # Extract initial data from the provided TimesheetProd object
    full_name = getattr(timesheet_input, 'full_name', None)
    consultant_uid = getattr(timesheet_input, 'consultant_uid', None)

    if not full_name:
        return "Error: full_name is required in TimesheetProd object"

    # Initialize every value the lookup may fill in, so that no code path
    # below can reference an unbound local.
    assigned_company = None
    email = None
    work_item = None
    work_hours_per_day = None
    rfx_number = None
    submitting_person = None
    approving_person = None
    hourly_rate = 150.0  # Default
    consultant_records_list = []

    print(f"Processing timesheet for consultant: {full_name}")

    # Now query the ontology to get complete consultant data
    try:
        # Initialize Foundry client to query ontology
        client = FoundryClient()

        # NOTE(review): both searches below iterate the object set and filter
        # in Python. The Consultant query further down filters with .where();
        # the same approach would presumably work here - confirm the
        # generated property names before switching.
        if consultant_uid:
            # We have consultant_uid from the input object: collect the
            # TimesheetProd records that share it.
            print(f"Using consultant_uid from input: {consultant_uid}")

            try:
                all_consultant_timesheets = client.ontology.objects.TimesheetProd.limit(500)
            except Exception:
                # .limit() failed; fall back to the object set itself.
                all_consultant_timesheets = client.ontology.objects.TimesheetProd

            for record in all_consultant_timesheets:
                record_uid = getattr(record, 'consultant_uid', None)
                if record_uid and record_uid == consultant_uid:
                    consultant_records_list.append(record)
                    # Limit to reasonable number of records
                    if len(consultant_records_list) >= 50:
                        break

            # Use the first matching record, or the input object itself when
            # nothing matched.
            if consultant_records_list:
                latest_record = consultant_records_list[0]
            else:
                latest_record = timesheet_input
        else:
            # If no consultant_uid, search by full_name
            print(f"No consultant_uid in input, searching by full_name: {full_name}")

            try:
                all_timesheets = client.ontology.objects.TimesheetProd.limit(1000)
            except Exception as query_error:
                print(f"Error querying TimesheetProd: {query_error}")
                all_timesheets = client.ontology.objects.TimesheetProd

            matching_records = []
            records_checked = 0
            wanted_name = full_name.strip().lower()

            for timesheet in all_timesheets:
                records_checked += 1
                record_full_name = getattr(timesheet, 'full_name', None)

                # Case-insensitive comparison; stop at the first match
                if record_full_name and record_full_name.strip().lower() == wanted_name:
                    matching_records.append(timesheet)
                    print(f"Found matching record with full_name: {record_full_name}")
                    break

                if records_checked >= 1000:
                    print(f"Checked {records_checked} records, stopping search")
                    break

            if not matching_records:
                return f"Error: No consultant found with name '{full_name}' in ontology"

            latest_record = matching_records[0]
            consultant_uid = getattr(latest_record, 'consultant_uid', None)
            print(f"Found consultant_uid: {consultant_uid}")

        # Extract all consultant data from the found record
        email = getattr(latest_record, 'email', None)
        work_item = getattr(latest_record, 'work_item', None)
        work_hours_per_day = getattr(latest_record, 'work_hour', None)
        rfx_number = getattr(latest_record, 'rfx_number', None)
        submitting_person = getattr(latest_record, 'submitting_person', None)
        approving_person = getattr(latest_record, 'approving_person', None)
        assigned_company = getattr(latest_record, 'assigned_company', None)

        # If assigned_company still not found and we have email, look through
        # the other records collected for this consultant. The list is empty
        # on the name-search path, so this is simply skipped there.
        if not assigned_company and email and consultant_records_list:
            print(f"Searching for assigned_company using email: {email}")
            wanted_email = email.strip().lower()
            for record in consultant_records_list:
                record_email = getattr(record, 'email', None)
                if record_email and record_email.strip().lower() == wanted_email:
                    temp_company = getattr(record, 'assigned_company', None)
                    if temp_company:
                        assigned_company = temp_company
                        print(f"Found assigned_company: {assigned_company}")
                        break

        # Query Consultant object to get hourly_rate
        if email:
            print(f"Querying Consultant object for hourly_rate using email: {email}")
            try:
                matching_consultants = client.ontology.objects.Consultant.where(Consultant.email == email)
                consultant = next(iter(matching_consultants), None)

                if consultant:
                    consultant_hourly_rate = getattr(consultant, 'hourly_rate', None)
                    if consultant_hourly_rate:
                        hourly_rate = float(consultant_hourly_rate)
                        print(f"Found hourly_rate from Consultant: {hourly_rate}")
                    else:
                        print(f"Consultant found but hourly_rate missing, using default: {hourly_rate}")
                else:
                    print(f"No Consultant found with email '{email}', using default hourly_rate")
            except Exception as consultant_error:
                # Best effort: a failed rate lookup keeps the default rate.
                print(f"Error querying Consultant: {consultant_error}, using default hourly_rate")

        # Convert work_hours to float if it exists
        if work_hours_per_day:
            work_hours_per_day = float(work_hours_per_day)

        print(f"Successfully retrieved all data for consultant: {full_name}")
        print(f" - Consultant UID: {consultant_uid}")
        print(f" - Assigned Company: {assigned_company}")
        print(f" - Email: {email}")
        print(f" - Work Item: {work_item}")
        print(f" - Work Hours/Day: {work_hours_per_day}")
        print(f" - RFX Number: {rfx_number}")
        print(f" - Hourly Rate: {hourly_rate}")

    except Exception as e:
        print(f"Error querying ontology: {str(e)}")
        return f"Error: Failed to query consultant data from ontology. {str(e)}"

    # Set defaults for any missing fields
    if assigned_company is None:
        assigned_company = "Company Not Found"
    if email is None:
        email = f"{full_name.lower().replace(' ', '.')}@consultant.com"
    if work_item is None:
        work_item = "Consulting Services"
    if work_hours_per_day is None:
        work_hours_per_day = 8.0
    if rfx_number is None:
        rfx_number = "N/A"
    if submitting_person is None:
        submitting_person = full_name
    if approving_person is None:
        approving_person = ""

    # Parse dates and calculate working days
    end_dt = None  # stays None when end_date cannot be converted
    try:
        start_dt = _invoice_as_datetime(starting_date)
        end_dt = _invoice_as_datetime(end_date)

        # Format period string
        period_str = f"{start_dt.strftime('%B %d, %Y')} to {end_dt.strftime('%B %d, %Y')}"
        invoice_date = end_dt.strftime('%Y-%m-%d')

        # Calculate working days (excluding weekends) and total hours
        working_days = _invoice_count_weekdays(start_dt, end_dt)
        total_hours = working_days * work_hours_per_day

    except Exception as e:
        # Best effort: fall back to a nominal 20-day period instead of failing.
        print(f"Warning: Date parsing error: {str(e)}")
        period_str = f"{starting_date} to {end_date}"
        invoice_date = str(end_date) if end_date else dt.now().strftime('%Y-%m-%d')
        working_days = 20
        total_hours = working_days * work_hours_per_day

    # Calculate financial totals
    hourly_rate_value = float(hourly_rate) if hourly_rate else 150.0
    subtotal = total_hours * hourly_rate_value
    hst_rate = 0.13
    hst_amount = subtotal * hst_rate
    total_amount = subtotal + hst_amount

    # Generate invoice number based on full_name and end date. When the end
    # date could not be parsed, today's date is used so the number can still
    # be built.
    safe_name = full_name.replace(' ', '_').replace('/', '_').replace('.', '_')
    invoice_stamp = (end_dt if end_dt is not None else dt.now()).strftime('%Y%m%d')
    invoice_number = f"{safe_name}-{invoice_stamp}"

    # Create PDF
    buffer = BytesIO()
    doc = SimpleDocTemplate(buffer, pagesize=A4,
                            rightMargin=72, leftMargin=72,
                            topMargin=72, bottomMargin=18)

    styles = getSampleStyleSheet()

    # Custom styles
    title_style = ParagraphStyle(
        'CustomTitle',
        parent=styles['Heading1'],
        fontSize=24,
        spaceAfter=30,
        alignment=TA_CENTER,
        textColor=colors.darkblue
    )

    story = []

    # Title
    story.append(Paragraph("TIMESHEET INVOICE", title_style))
    story.append(Spacer(1, 20))

    # Header information
    from_style = ParagraphStyle(
        'FromStyle',
        parent=styles['Normal'],
        fontSize=10,
        leading=12,
        alignment=TA_LEFT
    )

    to_style = ParagraphStyle(
        'ToStyle',
        parent=styles['Normal'],
        fontSize=10,
        leading=12,
        alignment=TA_RIGHT
    )

    # Consultant and client info
    from_header = Paragraph('<b>Consultant:</b>', from_style)
    to_header = Paragraph('<b>Client:</b>', to_style)

    consultant_info = f"{full_name}<br/>{email}"
    if consultant_uid:
        consultant_info += f"<br/>ID: {consultant_uid}"

    from_consultant = Paragraph(consultant_info, from_style)
    to_client = Paragraph(f"{assigned_company}<br/>RFX: {rfx_number}", to_style)

    info_data = [
        [from_header, to_header],
        [from_consultant, to_client],
        ['', ''],
        [Paragraph(f'<b>Invoice #:</b> {invoice_number}', from_style),
         Paragraph(f'<b>Invoice Date:</b> {invoice_date}', to_style)],
        [Paragraph(f'<b>Period:</b> {period_str}', from_style), ''],
    ]

    # The submitted/approved row is only shown when an approver is known
    if approving_person:
        info_data.append([
            Paragraph(f'<b>Submitted by:</b> {submitting_person}', from_style),
            Paragraph(f'<b>Approved by:</b> {approving_person}', to_style)
        ])

    info_table = Table(info_data, colWidths=[3.2*inch, 2.8*inch])
    info_table.setStyle(TableStyle([
        ('VALIGN', (0, 0), (-1, -1), 'TOP'),
        ('ALIGN', (0, 0), (0, -1), 'LEFT'),
        ('ALIGN', (1, 0), (1, -1), 'RIGHT'),
        ('TOPPADDING', (0, 0), (-1, -1), 6),
        ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
    ]))

    story.append(info_table)
    story.append(Spacer(1, 30))

    # Work details table
    items_data = [['Work Description', 'Details', 'Amount']]

    item_style = ParagraphStyle(
        'ItemStyle',
        parent=styles['Normal'],
        fontSize=10,
        leading=12
    )

    # Create detailed breakdown
    work_description = Paragraph(f"<b>{work_item}</b><br/>Period: {period_str}", item_style)
    work_details = Paragraph(
        f"Working Days: {working_days}<br/>"
        f"Hours/Day: {work_hours_per_day:.1f}<br/>"
        f"Total Hours: {total_hours:.1f}<br/>"
        f"Rate: ${hourly_rate_value:.2f}/hr",
        item_style
    )

    items_data.append([
        work_description,
        work_details,
        f"${subtotal:,.2f}"
    ])

    items_table = Table(items_data, colWidths=[2.5*inch, 2*inch, 1.5*inch])
    items_table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
        ('ALIGN', (2, 0), (2, -1), 'RIGHT'),
        ('VALIGN', (0, 0), (-1, -1), 'TOP'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, 0), 11),
        ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
        ('TOPPADDING', (0, 1), (-1, -1), 8),
        ('BOTTOMPADDING', (0, 1), (-1, -1), 8),
        ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
        ('GRID', (0, 0), (-1, -1), 1, colors.black),
    ]))

    story.append(items_table)
    story.append(Spacer(1, 20))

    # Financial totals
    totals_data = [
        ['Subtotal:', f"${subtotal:,.2f}"],
        ['HST (13%):', f"${hst_amount:,.2f}"],
        ['Total Amount:', f"${total_amount:,.2f}"]
    ]

    totals_table = Table(totals_data, colWidths=[4*inch, 2*inch])
    totals_table.setStyle(TableStyle([
        ('ALIGN', (0, 0), (-1, -1), 'RIGHT'),
        ('FONTNAME', (0, -1), (-1, -1), 'Helvetica-Bold'),
        ('FONTSIZE', (0, -1), (-1, -1), 14),
        ('BACKGROUND', (0, -1), (-1, -1), colors.lightgrey),
        ('LINEBELOW', (0, 0), (-1, 0), 1, colors.black),
        ('LINEBELOW', (0, -2), (-1, -2), 1, colors.black),
    ]))

    story.append(totals_table)

    # Build PDF
    doc.build(story)

    # Get PDF data and encode to base64
    pdf_data = buffer.getvalue()
    buffer.close()

    return base64.b64encode(pdf_data).decode('utf-8')

@function
def save_pdf_to_dataset(
    timesheet_obj: TimesheetProd,
    starting_date: Date,
    end_date: Date,
    dataset_rid: String
) -> String:
    """
    Generate timesheet PDF and save to Foundry Dataset for persistent storage.

    This function generates a timesheet PDF using generate_invoice_using_ontology
    and uploads it to the given Foundry dataset under ``timesheets/``.
    The filename is automatically generated using the
    email-invoice_number-currenttime format.

    Args:
        timesheet_obj: TimesheetProd object from Palantir ontology
        starting_date: Start date (Date type from Palantir functions API)
        end_date: End date (Date type from Palantir functions API)
        dataset_rid: RID of the target Foundry dataset where PDFs will be stored

    Returns:
        A human-readable status message. Failures are reported in the returned
        text (prefixed with ":cross_mark:" or "Error:") rather than raised.
    """
    from datetime import datetime as dt, date, time

    # Check if ontology SDK is available
    if not ONTOLOGY_SDK_AVAILABLE:
        return "Error: Ontology SDK not available. Please ensure ontology SDK is generated and installed."

    # Extract data from the TimesheetProd object for file naming.
    # `or 'Unknown'` also covers an attribute that exists but is None/empty,
    # which would otherwise break the .replace() calls below.
    full_name = getattr(timesheet_obj, 'full_name', None) or 'Unknown'
    email = getattr(timesheet_obj, 'email', None)

    # Generate invoice number (same logic as in the PDF generation)
    # Convert dates for invoice number generation
    try:
        if isinstance(end_date, dt):
            end_dt = end_date
        elif isinstance(end_date, date):
            end_dt = dt.combine(end_date, time())
        else:
            end_dt = dt.strptime(str(end_date)[:10], '%Y-%m-%d')
    except ValueError as date_error:
        # An unparseable end date only affects the file name; fall back to
        # today's date instead of raising out of the function.
        print(f"Warning: Date parsing error: {str(date_error)}")
        end_dt = dt.now()

    safe_name = full_name.replace(' ', '_').replace('/', '_').replace('.', '_')
    invoice_number = f"{safe_name}-{end_dt.strftime('%Y%m%d')}"

    # Create safe email for filename
    if email:
        safe_email = email.replace('@', '_at_').replace('.', '_')
    else:
        safe_email = f"{safe_name}_email"

    # Get current timestamp
    current_time = dt.now().strftime('%Y%m%d_%H%M%S')

    try:
        # Generate PDF data using the timesheet ontology function
        base64_data = generate_invoice_using_ontology(timesheet_obj, starting_date, end_date)

        # Check if the function returned an error message instead of base64 data
        if base64_data.startswith("Error:"):
            return f":cross_mark: Failed to generate timesheet: {base64_data}"

        # Decode base64 to PDF binary data
        pdf_data = base64.b64decode(base64_data)

        # Create filename using email-invoice_number-currenttime format
        pdf_filename = f"{safe_email}-{invoice_number}-{current_time}.pdf"

        # Connect to Foundry and reach the platform SDK's file client
        client = FoundryClient()
        foundry_client = client.foundry_sdk
        dataset_client = foundry_client.datasets.Dataset
        file_client = dataset_client.File

        try:
            # NOTE(review): the reported failure of the published function is
            # PERMISSION_DENIED / ApiUsageDenied with missingScope
            # "api:usage:datasets-write". That suggests the credentials the
            # published function runs with lack dataset-write API usage,
            # which would have to be granted in platform configuration
            # rather than in this code - confirm with the platform docs.
            file_client.upload(
                dataset_rid,  # First positional argument
                f"timesheets/{pdf_filename}",  # File path as second argument
                body=pdf_data  # File content as 'body' parameter
            )

            return (
                f":white_check_mark: Timesheet PDF saved to dataset successfully!\n"
                f":file_folder: Dataset: {dataset_rid}\n"
                f":page_facing_up: File: timesheets/{pdf_filename}\n"
                f":bar_chart: Size: {len(pdf_data)} bytes\n"
                f":bust_in_silhouette: Consultant: {full_name}\n"
                f":date: Period: {starting_date} to {end_date}"
            )

        except Exception as upload_error:
            return (
                f":cross_mark: File upload failed: {str(upload_error)}\n"
                f":light_bulb: Dataset RID: {dataset_rid}\n"
                f":magnifying_glass_tilted_left: Check dataset permissions and API access"
            )

    except ImportError as e:
        return f":cross_mark: Error: Missing required SDK - {str(e)}\nPlease ensure ontology SDK is properly installed."
    except AttributeError as e:
        return f":cross_mark: Error: Dataset access issue - {str(e)}\nPlease verify dataset RID and permissions."
    except Exception as e:
        return f":cross_mark: Error saving timesheet PDF to dataset: {str(e)}\nDataset RID: {dataset_rid}\nConsultant: {full_name}"
**Local Preview Code Output**

Thanks in advance !

Please format the code properly using a code block; otherwise it's impossible to read.

Let me ask you a question: where should the function get the credentials (authorization) it needs to upload to your dataset?

You are not passing them in in any way.