Hi @nicornk
Here is the custom function I am using. I am also attaching a snapshot showing that the PDF is uploaded to the dataset when I run the function locally, but not when the function is published.
Code
from functions.api import function, String, Date, Float
import os
import datetime
import json
# Conditional import for the ontology SDK. When the generated SDK package is
# not installed, every SDK name is still bound so that the module imports and
# the function signatures below can be evaluated.
try:
    from invoiceontology_sdk.ontology.objects import TimesheetProd, Consultant
    from invoiceontology_sdk import FoundryClient
    ONTOLOGY_SDK_AVAILABLE = True
except ImportError:
    # Fallback when the SDK is not available: annotate with String instead of
    # the ontology object type, and leave the remaining SDK names as None.
    # Both functions return an error string before touching these names when
    # ONTOLOGY_SDK_AVAILABLE is False.
    TimesheetProd = String
    Consultant = None
    FoundryClient = None
    ONTOLOGY_SDK_AVAILABLE = False
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER, TA_RIGHT, TA_LEFT
from io import BytesIO
import base64
# Defaults used when the ontology does not provide a value.
_DEFAULT_HOURLY_RATE = 150.0
_DEFAULT_HOURS_PER_DAY = 8.0
_FALLBACK_WORKING_DAYS = 20
_HST_RATE = 0.13


def _to_datetime(value):
    """Coerce a datetime, a date, or a 'YYYY-MM-DD...' string-like value to a datetime."""
    from datetime import date, datetime as dt, time

    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(value, dt):
        return value
    if isinstance(value, date):
        return dt.combine(value, time())
    return dt.strptime(str(value)[:10], '%Y-%m-%d')


def _count_weekdays(start_dt, end_dt):
    """Count Monday-Friday days from start_dt to end_dt, both ends inclusive."""
    from datetime import timedelta

    weekdays = 0
    current = start_dt
    while current <= end_dt:
        if current.weekday() < 5:  # Monday to Friday
            weekdays += 1
        current += timedelta(days=1)
    return weekdays


@function
def generate_invoice_using_ontology(
    timesheet_input: TimesheetProd,
    starting_date: Date,
    end_date: Date
) -> String:
    """
    Generate a timesheet invoice PDF for the consultant of a TimesheetProd object.

    The consultant is identified from ``timesheet_input`` (by ``consultant_uid``
    when present, otherwise by a case-insensitive ``full_name`` match), the
    remaining invoice fields are read from the matching TimesheetProd record,
    and the hourly rate is read from the Consultant object with the same
    email. Missing fields fall back to defaults. Billable hours are the number
    of Monday-Friday days in the period multiplied by the hours per day.

    Args:
        timesheet_input: TimesheetProd object from the Palantir ontology.
        starting_date: First day of the invoiced period (inclusive).
        end_date: Last day of the invoiced period (inclusive).

    Returns:
        The PDF as a base64-encoded string, or a message starting with
        "Error:" when the SDK is unavailable, ``full_name`` is missing, no
        matching consultant is found, or the ontology query fails.
    """
    from datetime import datetime as dt

    if not ONTOLOGY_SDK_AVAILABLE:
        return "Error: Ontology SDK not available. Please ensure ontology SDK is installed."

    # Extract initial data from the provided TimesheetProd object
    full_name = getattr(timesheet_input, 'full_name', None)
    consultant_uid = getattr(timesheet_input, 'consultant_uid', None)

    if not full_name:
        return "Error: full_name is required in TimesheetProd object"

    print(f"Processing timesheet for consultant: {full_name}")

    try:
        client = FoundryClient()

        # Bound on both lookup paths: the assigned_company fallback below reads
        # it even when the consultant was found by name.
        consultant_records_list = []

        if consultant_uid:
            print(f"Using consultant_uid from input: {consultant_uid}")
            try:
                all_consultant_timesheets = client.ontology.objects.TimesheetProd.limit(500)
            except Exception as query_error:
                print(f"Error querying TimesheetProd: {query_error}")
                all_consultant_timesheets = client.ontology.objects.TimesheetProd

            for record in all_consultant_timesheets:
                record_uid = getattr(record, 'consultant_uid', None)
                if record_uid and record_uid == consultant_uid:
                    consultant_records_list.append(record)
                    # Limit to a reasonable number of records
                    if len(consultant_records_list) >= 50:
                        break

            # Use the first matching record, or the input object when none match
            if consultant_records_list:
                latest_record = consultant_records_list[0]
            else:
                latest_record = timesheet_input
        else:
            print(f"No consultant_uid in input, searching by full_name: {full_name}")
            try:
                all_timesheets = client.ontology.objects.TimesheetProd.limit(1000)
            except Exception as query_error:
                print(f"Error querying TimesheetProd: {query_error}")
                all_timesheets = client.ontology.objects.TimesheetProd

            wanted_name = full_name.strip().lower()
            latest_record = None
            for records_checked, timesheet in enumerate(all_timesheets, start=1):
                record_full_name = getattr(timesheet, 'full_name', None)
                # Case-insensitive comparison
                if record_full_name and record_full_name.strip().lower() == wanted_name:
                    print(f"Found matching record with full_name: {record_full_name}")
                    latest_record = timesheet
                    break
                if records_checked >= 1000:
                    print(f"Checked {records_checked} records, stopping search")
                    break

            if latest_record is None:
                return f"Error: No consultant found with name '{full_name}' in ontology"
            consultant_uid = getattr(latest_record, 'consultant_uid', None)
            print(f"Found consultant_uid: {consultant_uid}")

        # Extract all consultant data from the found record
        email = getattr(latest_record, 'email', None)
        work_item = getattr(latest_record, 'work_item', None)
        work_hours_per_day = getattr(latest_record, 'work_hour', None)
        rfx_number = getattr(latest_record, 'rfx_number', None)
        submitting_person = getattr(latest_record, 'submitting_person', None)
        approving_person = getattr(latest_record, 'approving_person', None)
        assigned_company = getattr(latest_record, 'assigned_company', None)

        # If assigned_company is still missing, look for it on the consultant's
        # other records that carry the same email.
        if not assigned_company and email and consultant_records_list:
            print(f"Searching for assigned_company using email: {email}")
            wanted_email = email.strip().lower()
            for record in consultant_records_list:
                record_email = getattr(record, 'email', None)
                if record_email and record_email.strip().lower() == wanted_email:
                    temp_company = getattr(record, 'assigned_company', None)
                    if temp_company:
                        assigned_company = temp_company
                        print(f"Found assigned_company: {assigned_company}")
                        break

        # Query Consultant object to get hourly_rate; a failed lookup is not
        # fatal, the default rate is used instead.
        hourly_rate = _DEFAULT_HOURLY_RATE
        if email:
            print(f"Querying Consultant object for hourly_rate using email: {email}")
            try:
                matching_consultants = client.ontology.objects.Consultant.where(Consultant.email == email)
                consultant = next(iter(matching_consultants), None)
                if consultant:
                    consultant_hourly_rate = getattr(consultant, 'hourly_rate', None)
                    if consultant_hourly_rate:
                        hourly_rate = float(consultant_hourly_rate)
                        print(f"Found hourly_rate from Consultant: {hourly_rate}")
                    else:
                        print(f"Consultant found but hourly_rate missing, using default: {hourly_rate}")
                else:
                    print(f"No Consultant found with email '{email}', using default hourly_rate")
            except Exception as consultant_error:
                print(f"Error querying Consultant: {consultant_error}, using default hourly_rate")

        # Convert work_hours to float if it exists
        if work_hours_per_day:
            work_hours_per_day = float(work_hours_per_day)

        print(f"Successfully retrieved all data for consultant: {full_name}")
        print(f" - Consultant UID: {consultant_uid}")
        print(f" - Assigned Company: {assigned_company}")
        print(f" - Email: {email}")
        print(f" - Work Item: {work_item}")
        print(f" - Work Hours/Day: {work_hours_per_day}")
        print(f" - RFX Number: {rfx_number}")
        print(f" - Hourly Rate: {hourly_rate}")
    except Exception as e:
        print(f"Error querying ontology: {str(e)}")
        return f"Error: Failed to query consultant data from ontology. {str(e)}"

    # Set defaults for any missing fields
    if assigned_company is None:
        assigned_company = "Company Not Found"
    if email is None:
        email = f"{full_name.lower().replace(' ', '.')}@consultant.com"
    if work_item is None:
        work_item = "Consulting Services"
    if work_hours_per_day is None:
        work_hours_per_day = _DEFAULT_HOURS_PER_DAY
    if rfx_number is None:
        rfx_number = "N/A"
    if submitting_person is None:
        submitting_person = full_name
    if approving_person is None:
        approving_person = ""

    # Parse dates and calculate working days. This is deliberately best-effort:
    # an unparseable date must not prevent the invoice from being produced.
    try:
        start_dt = _to_datetime(starting_date)
        end_dt = _to_datetime(end_date)
        period_str = f"{start_dt.strftime('%B %d, %Y')} to {end_dt.strftime('%B %d, %Y')}"
        invoice_date = end_dt.strftime('%Y-%m-%d')
        invoice_date_tag = end_dt.strftime('%Y%m%d')
        working_days = _count_weekdays(start_dt, end_dt)
    except Exception as e:
        print(f"Warning: Date parsing error: {str(e)}")
        period_str = f"{starting_date} to {end_date}"
        invoice_date = str(end_date) if end_date else dt.now().strftime('%Y-%m-%d')
        # end_dt is not available on this path, so derive the invoice-number
        # suffix from the digits of the fallback date text instead.
        invoice_date_tag = (
            ''.join(ch for ch in invoice_date if ch.isdigit())[:8]
            or dt.now().strftime('%Y%m%d')
        )
        working_days = _FALLBACK_WORKING_DAYS
    total_hours = working_days * work_hours_per_day

    # Calculate financial totals
    hourly_rate_value = float(hourly_rate) if hourly_rate else _DEFAULT_HOURLY_RATE
    subtotal = total_hours * hourly_rate_value
    hst_amount = subtotal * _HST_RATE
    total_amount = subtotal + hst_amount

    # Generate invoice number based on full_name and end date
    safe_name = full_name.replace(' ', '_').replace('/', '_').replace('.', '_')
    invoice_number = f"{safe_name}-{invoice_date_tag}"

    styles = getSampleStyleSheet()

    # Custom styles
    title_style = ParagraphStyle(
        'CustomTitle',
        parent=styles['Heading1'],
        fontSize=24,
        spaceAfter=30,
        alignment=TA_CENTER,
        textColor=colors.darkblue
    )
    from_style = ParagraphStyle(
        'FromStyle',
        parent=styles['Normal'],
        fontSize=10,
        leading=12,
        alignment=TA_LEFT
    )
    to_style = ParagraphStyle(
        'ToStyle',
        parent=styles['Normal'],
        fontSize=10,
        leading=12,
        alignment=TA_RIGHT
    )
    item_style = ParagraphStyle(
        'ItemStyle',
        parent=styles['Normal'],
        fontSize=10,
        leading=12
    )

    story = []

    # Title
    story.append(Paragraph("TIMESHEET INVOICE", title_style))
    story.append(Spacer(1, 20))

    # Consultant and client info
    from_header = Paragraph('<b>Consultant:</b>', from_style)
    to_header = Paragraph('<b>Client:</b>', to_style)
    consultant_info = f"{full_name}<br/>{email}"
    if consultant_uid:
        consultant_info += f"<br/>ID: {consultant_uid}"
    from_consultant = Paragraph(consultant_info, from_style)
    to_client = Paragraph(f"{assigned_company}<br/>RFX: {rfx_number}", to_style)

    info_data = [
        [from_header, to_header],
        [from_consultant, to_client],
        ['', ''],
        [Paragraph(f'<b>Invoice #:</b> {invoice_number}', from_style),
         Paragraph(f'<b>Invoice Date:</b> {invoice_date}', to_style)],
        [Paragraph(f'<b>Period:</b> {period_str}', from_style), ''],
    ]
    # The sign-off row is only shown when an approver is known.
    if approving_person:
        info_data.append([
            Paragraph(f'<b>Submitted by:</b> {submitting_person}', from_style),
            Paragraph(f'<b>Approved by:</b> {approving_person}', to_style)
        ])

    info_table = Table(info_data, colWidths=[3.2 * inch, 2.8 * inch])
    info_table.setStyle(TableStyle([
        ('VALIGN', (0, 0), (-1, -1), 'TOP'),
        ('ALIGN', (0, 0), (0, -1), 'LEFT'),
        ('ALIGN', (1, 0), (1, -1), 'RIGHT'),
        ('TOPPADDING', (0, 0), (-1, -1), 6),
        ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
    ]))
    story.append(info_table)
    story.append(Spacer(1, 30))

    # Work details table
    items_data = [['Work Description', 'Details', 'Amount']]
    work_description = Paragraph(f"<b>{work_item}</b><br/>Period: {period_str}", item_style)
    work_details = Paragraph(
        f"Working Days: {working_days}<br/>"
        f"Hours/Day: {work_hours_per_day:.1f}<br/>"
        f"Total Hours: {total_hours:.1f}<br/>"
        f"Rate: ${hourly_rate_value:.2f}/hr",
        item_style
    )
    items_data.append([
        work_description,
        work_details,
        f"${subtotal:,.2f}"
    ])

    items_table = Table(items_data, colWidths=[2.5 * inch, 2 * inch, 1.5 * inch])
    items_table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
        ('ALIGN', (2, 0), (2, -1), 'RIGHT'),
        ('VALIGN', (0, 0), (-1, -1), 'TOP'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, 0), 11),
        ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
        ('TOPPADDING', (0, 1), (-1, -1), 8),
        ('BOTTOMPADDING', (0, 1), (-1, -1), 8),
        ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
        ('GRID', (0, 0), (-1, -1), 1, colors.black),
    ]))
    story.append(items_table)
    story.append(Spacer(1, 20))

    # Financial totals
    totals_data = [
        ['Subtotal:', f"${subtotal:,.2f}"],
        [f"HST ({_HST_RATE:.0%}):", f"${hst_amount:,.2f}"],
        ['Total Amount:', f"${total_amount:,.2f}"]
    ]
    totals_table = Table(totals_data, colWidths=[4 * inch, 2 * inch])
    totals_table.setStyle(TableStyle([
        ('ALIGN', (0, 0), (-1, -1), 'RIGHT'),
        ('FONTNAME', (0, -1), (-1, -1), 'Helvetica-Bold'),
        ('FONTSIZE', (0, -1), (-1, -1), 14),
        ('BACKGROUND', (0, -1), (-1, -1), colors.lightgrey),
        ('LINEBELOW', (0, 0), (-1, 0), 1, colors.black),
        ('LINEBELOW', (0, -2), (-1, -2), 1, colors.black),
    ]))
    story.append(totals_table)

    # Build the PDF into an in-memory buffer; the context manager closes the
    # buffer even if the build raises.
    with BytesIO() as buffer:
        doc = SimpleDocTemplate(buffer, pagesize=A4,
                                rightMargin=72, leftMargin=72,
                                topMargin=72, bottomMargin=18)
        doc.build(story)
        pdf_data = buffer.getvalue()

    return base64.b64encode(pdf_data).decode('utf-8')
@function
def save_pdf_to_dataset(
    timesheet_obj: TimesheetProd,
    starting_date: Date,
    end_date: Date,
    dataset_rid: String
) -> String:
    """
    Generate a timesheet PDF and upload it to a Foundry dataset.

    The PDF is produced by ``generate_invoice_using_ontology`` and uploaded
    under ``timesheets/`` in the target dataset. The file name has the form
    ``<email>-<invoice_number>-<current time>.pdf``.

    Args:
        timesheet_obj: TimesheetProd object from the Palantir ontology.
        starting_date: First day of the invoiced period.
        end_date: Last day of the invoiced period.
        dataset_rid: RID of the Foundry dataset that receives the PDF.

    Returns:
        A success message with the dataset RID, file path and size, or a
        message describing the failure. This function reports failures in its
        return value rather than raising.
    """
    from datetime import datetime as dt, date, time

    if not ONTOLOGY_SDK_AVAILABLE:
        return "Error: Ontology SDK not available. Please ensure ontology SDK is generated and installed."

    # `or` also covers an attribute that exists but holds None; a getattr
    # default alone would not, and the .replace() calls below would fail.
    full_name = getattr(timesheet_obj, 'full_name', None) or 'Unknown'
    email = getattr(timesheet_obj, 'email', None)

    # Date part of the invoice number. An unparseable end date falls back to
    # the digits of its text form (or today's date) instead of raising.
    try:
        if isinstance(end_date, dt):
            end_dt = end_date
        elif isinstance(end_date, date):
            end_dt = dt.combine(end_date, time())
        else:
            end_dt = dt.strptime(str(end_date)[:10], '%Y-%m-%d')
        invoice_date_tag = end_dt.strftime('%Y%m%d')
    except (ValueError, TypeError):
        invoice_date_tag = (
            ''.join(ch for ch in str(end_date) if ch.isdigit())[:8]
            or dt.now().strftime('%Y%m%d')
        )

    safe_name = full_name.replace(' ', '_').replace('/', '_').replace('.', '_')
    invoice_number = f"{safe_name}-{invoice_date_tag}"

    # Create safe email for filename
    if email:
        safe_email = email.replace('@', '_at_').replace('.', '_')
    else:
        safe_email = f"{safe_name}_email"

    current_time = dt.now().strftime('%Y%m%d_%H%M%S')

    try:
        base64_data = generate_invoice_using_ontology(timesheet_obj, starting_date, end_date)

        # The generator reports failures as an "Error:" string, not as an exception.
        if base64_data.startswith("Error:"):
            return f"Failed to generate timesheet: {base64_data}"

        pdf_data = base64.b64decode(base64_data)
        pdf_filename = f"{safe_email}-{invoice_number}-{current_time}.pdf"

        # NOTE(review): the upload goes through client.foundry_sdk; whether that
        # attribute and the required dataset permissions are available to a
        # published function cannot be seen from this file - confirm.
        client = FoundryClient()
        foundry_client = client.foundry_sdk
        file_client = foundry_client.datasets.Dataset.File

        try:
            file_client.upload(
                dataset_rid,
                f"timesheets/{pdf_filename}",
                body=pdf_data
            )
        except Exception as upload_error:
            return (
                f"File upload failed: {str(upload_error)}\n"
                f":light_bulb: Dataset RID: {dataset_rid}\n"
                ":magnifying_glass_tilted_left: Check dataset permissions and API access"
            )

        return (
            "Timesheet PDF saved to dataset successfully!\n"
            f":file_folder: Dataset: {dataset_rid}\n"
            f":page_facing_up: File: timesheets/{pdf_filename}\n"
            f":bar_chart: Size: {len(pdf_data)} bytes\n"
            f":bust_in_silhouette: Consultant: {full_name}\n"
            f":date: Period: {starting_date} to {end_date}"
        )
    except ImportError as e:
        return f"Error: Missing required SDK - {str(e)}\nPlease ensure ontology SDK is properly installed."
    except AttributeError as e:
        return f"Error: Dataset access issue - {str(e)}\nPlease verify dataset RID and permissions."
    except Exception as e:
        return f"Error saving timesheet PDF to dataset: {str(e)}\nDataset RID: {dataset_rid}\nConsultant: {full_name}"
**Local Preview Code Output**
Thanks in advance!