Hi,
Could you give me examples of how to use LLMs in Code Repositories? (both for Spark transforms and lightweight transforms)
Baris
Here is one lightweight transform code snippet (from a while ago) that you might use:
from transforms.api import transform, Input, Output, lightweight
import polars as pl
from concurrent.futures import ThreadPoolExecutor, as_completed
from palantir_models.transforms import OpenAiGptChatLanguageModelInput
from palantir_models.models import OpenAiGptChatLanguageModel
from language_model_service_api.languagemodelservice_api_completion_v3 import GptChatCompletionRequest
from language_model_service_api.languagemodelservice_api import ChatMessage, ChatMessageRole
from ratelimit import limits, sleep_and_retry

# Rate-limiting constants: at most 500 calls per 60-second window
rate_limit_number_calls = 500
rate_limit_period = 60


@lightweight()
@transform(
    output=Output("/path/to/output_dataset"),
    source_df=Input("/path/to/input_dataset"),
    model=OpenAiGptChatLanguageModelInput("ri.language-model-service..language-model.gpt-4-o-mini"),
)
def compute_sentiment(ctx, source_df, model: OpenAiGptChatLanguageModel, output):
    # Decorator to rate-limit API calls: sleeps and retries once the window is exhausted
    @sleep_and_retry
    @limits(calls=rate_limit_number_calls, period=rate_limit_period)
    def rate_limited_get_completions(review_content: str) -> str:
        return get_completions(review_content)

    # Single chat-completion call against the model
    def get_completions(review_content: str) -> str:
        system_prompt = "Write a 2 sentence poem inspired by the given word."
        request = GptChatCompletionRequest(
            [ChatMessage(ChatMessageRole.SYSTEM, system_prompt), ChatMessage(ChatMessageRole.USER, review_content)]
        )
        resp = model.create_chat_completion(request)
        return resp.choices[0].message.content

    # Read the input as a lazy Polars frame and pull out the column to prompt on
    df = source_df.polars(lazy=True)
    llm_inputs = df.collect()["random_word"].to_list()

    # Fan the API calls out across a thread pool, keeping each result paired
    # with its input (as_completed yields futures out of submission order)
    def process_api_calls(llm_inputs):
        results = []
        with ThreadPoolExecutor(max_workers=10) as executor:
            # Submit all the API calls at once, and let the ThreadPoolExecutor manage the concurrency
            future_to_row = {executor.submit(rate_limited_get_completions, row): row for row in llm_inputs}
            for future in as_completed(future_to_row):
                row = future_to_row[future]
                try:
                    results.append((row, future.result()))
                except Exception as exc:
                    print(f"{row!r} generated an exception: {exc}")
        return results

    # Call the function to process API calls with rate limiting
    results = process_api_calls(llm_inputs)

    # Assemble the output table from the (input, completion) pairs
    sentiment_df = pl.DataFrame({
        "random_word": [row for row, _ in results],
        "sentiment": [completion for _, completion in results],
    })
    output.write_table(sentiment_df)
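For the Spark-transform half of your question, here is a minimal sketch along the same lines. It reuses the same request classes as above; the dataset paths, model RID, and the random_word column are placeholders, and it collects the prompt column to the driver before calling the model, which keeps things simple but only suits modest row counts:
from transforms.api import transform, Input, Output
from palantir_models.transforms import OpenAiGptChatLanguageModelInput
from palantir_models.models import OpenAiGptChatLanguageModel
from language_model_service_api.languagemodelservice_api_completion_v3 import GptChatCompletionRequest
from language_model_service_api.languagemodelservice_api import ChatMessage, ChatMessageRole


@transform(
    output=Output("/path/to/spark_output_dataset"),
    source_df=Input("/path/to/input_dataset"),
    model=OpenAiGptChatLanguageModelInput("ri.language-model-service..language-model.gpt-4-o-mini"),
)
def compute_poems(ctx, source_df, model: OpenAiGptChatLanguageModel, output):
    # Single chat-completion call against the model
    def get_completion(word: str) -> str:
        request = GptChatCompletionRequest(
            [
                ChatMessage(ChatMessageRole.SYSTEM, "Write a 2 sentence poem inspired by the given word."),
                ChatMessage(ChatMessageRole.USER, word),
            ]
        )
        return model.create_chat_completion(request).choices[0].message.content

    # Collect the prompt column to the driver, call the model row by row,
    # then rebuild a Spark DataFrame from the (input, completion) pairs
    words = [r["random_word"] for r in source_df.dataframe().select("random_word").collect()]
    poems = [(word, get_completion(word)) for word in words]
    result = ctx.spark_session.createDataFrame(poems, ["random_word", "poem"])
    output.write_dataframe(result)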
And you can generate some notional data (in Spark, here) with:
from pyspark.sql import types as T
from transforms.api import transform_df, Output
from faker import Faker
import pandas as pd


@transform_df(
    Output("/path/to/generate_notional_data"),
)
def compute(ctx):
    # Initialize Faker
    fake = Faker()

    # Generate fake data
    data = []
    for _ in range(1000):
        data.append({
            'name': fake.name(),
            'address': fake.address(),
            'email': fake.email(),
            'age': fake.random_int(min=18, max=80),
        })

    # Convert to a pandas DataFrame
    pdf = pd.DataFrame(data)

    # Define the schema for the Spark DataFrame
    schema = T.StructType([
        T.StructField('name', T.StringType(), True),
        T.StructField('address', T.StringType(), True),
        T.StructField('email', T.StringType(), True),
        T.StructField('age', T.IntegerType(), True)
    ])

    # Convert the pandas DataFrame to a Spark DataFrame
    return ctx.spark_session.createDataFrame(pdf, schema)
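One thing to check before these build: packages such as faker, ratelimit, and polars are not necessarily in your repository's environment by default, so you may need to add them as dependencies (e.g. via the repository's Libraries panel) first.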