Hey,
So I currently have a Code Repository set up, and within it I have imported a REST API as a source (called “Gdelt2Retrieve”). Within this REST API, I also have a webhook (called “Gdelt2”) configured. I can run this in the Data Connection tab and successfully see the correct results from the webhook/API. I also have the egress policies set up.
I am trying to configure this webhook in a code repository, with the end goal of having the function imported into Pipeline Builder (it would be the input for the rest of the pipeline). I also have the transform-external-systems setting enabled, per the documentation. From my understanding, I have to call the webhook and also write the result out as a dataset (see the transforms sketch after my attempt below).
Could somebody tell me if I am on the right track with this code? I tried to mimic the documentation:
from palantir.datasets.core import Dataset
from palantir.datasets.webhooks import WebhookClient
from functions.api import function  # the @function decorator comes from functions.api, not pyspark.sql

@function(sources=["Gdelt2Retrieve"])
def call_webhook() -> str:
    # Create a WebhookClient instance
    webhook_client = WebhookClient()
    # Execute the webhook
    try:
        response = webhook_client.execute("Gdelt2")
    except Exception as e:
        return f"Error: Webhook call failed due to an exception: {e}"
    # Check whether the webhook execution was successful
    if response.status_code != 200:
        return f"Error: Webhook call failed with status code {response.status_code}, response: {response.text}"
    # Process the response data
    try:
        data = response.json()  # Extract JSON data
    except ValueError:
        return "Error: response body was not valid JSON"
    return str(data)
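For what it's worth, my reading of the external transforms docs is that the dataset-output half would look something like the sketch below. The output path is a placeholder, and I'm not confident I have the exact decorator and connection method names right, so please treat this as a guess rather than working code:

from pyspark.sql import Row
from transforms.api import transform, Output
from transforms.external.systems import use_external_systems, Source

@use_external_systems(source=Source("Gdelt2Retrieve"))
@transform(output=Output("/My/Project/gdelt2_raw"))  # placeholder output path
def compute(ctx, source, output):
    # My understanding from the docs: the imported source exposes a
    # pre-configured HTTPS connection whose base URL and requests session
    # respect the egress policies (method names here are my guess)
    connection = source.get_https_connection()
    response = connection.session.get(connection.url)  # endpoint path still TODO
    response.raise_for_status()
    # Land the raw JSON payload as a one-row dataset for Pipeline Builder
    df = ctx.spark_session.createDataFrame([Row(raw=response.text)])
    output.write_dataframe(df)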
Also, after importing my API (“Gdelt2Retrieve”), the Resources side tab gives me this starter code:
import requests

@function(sources=["Gdelt2Retrieve"])
def my_function() -> String:
    # TODO: specify endpoint url
    response = requests.get(...)
    if response.status_code != 200:
        # Handle error
    data = response.json()
    # Use response data
I don’t necessarily need to use a function, but I figured this would be the easiest way to get a webhook integrated into Pipeline Builder.
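In case it helps clarify what I mean, here is how I'd imagine that starter filled in, if I just hit the API directly instead of going through the webhook. The GDELT 2.0 DOC endpoint and query parameters below are just an example I picked, not necessarily what my source is configured for:

import requests
from functions.api import function, String

@function(sources=["Gdelt2Retrieve"])
def my_function() -> String:
    # Example endpoint only (the public GDELT 2.0 DOC API); substitute
    # whatever endpoint the Gdelt2Retrieve source actually points at
    url = "https://api.gdeltproject.org/api/v2/doc/doc"
    response = requests.get(url, params={"query": "example", "format": "json"})
    if response.status_code != 200:
        return f"Error: request failed with status code {response.status_code}"
    data = response.json()  # parse the JSON payload
    return str(data)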