Using LLMs in Code Repository

Hi,

Could you give me some examples of how to use LLMs in Code Repositories (both for Spark transforms and lightweight transforms)?

Baris

Tutorial: Supervised machine learning • Train a model in Code Repositories • Palantir

Here is a lightweight transform code snippet (from a while ago) that you might use:

from transforms.api import transform, Input, Output, lightweight
import polars as pl
from concurrent.futures import ThreadPoolExecutor, as_completed
from palantir_models.transforms import OpenAiGptChatLanguageModelInput
from palantir_models.models import OpenAiGptChatLanguageModel
from language_model_service_api.languagemodelservice_api_completion_v3 import GptChatCompletionRequest
from language_model_service_api.languagemodelservice_api import ChatMessage, ChatMessageRole
from ratelimit import limits, sleep_and_retry

# Rate-limiting constants
rate_limit_number_calls = 500
rate_limit_period = 60

@lightweight()
@transform(
    output=Output("/path/to/output_dataset"),
    source_df=Input("/path/to/input_dataset"),
    model=OpenAiGptChatLanguageModelInput("ri.language-model-service..language-model.gpt-4-o-mini"),
)
def compute_sentiment(ctx, source_df, model: OpenAiGptChatLanguageModel, output):
    # Decorator to rate limit API calls
    @sleep_and_retry
    @limits(calls=rate_limit_number_calls, period=rate_limit_period)
    def rate_limited_get_completions(review_content: str) -> str:
        return get_completions(review_content)

    # Call the model to get a completion for a single input value
    def get_completions(review_content: str) -> str:
        system_prompt = "Write a 2 sentence poem inspired by the provided word."
        request = GptChatCompletionRequest(
            [ChatMessage(ChatMessageRole.SYSTEM, system_prompt), ChatMessage(ChatMessageRole.USER, review_content)]
        )
        resp = model.create_chat_completion(request)
        return resp.choices[0].message.content
        
    # Read the input as a (lazy) polars DataFrame and collect the column to send to the LLM
    df = source_df.polars(lazy=True)
    llm_input_cols = df.collect()['random_word'].to_list()
    print(llm_input_cols)

    # Function to manage API calls with threading, preserving input order
    def process_api_calls(llm_input_cols):
        results = [None] * len(llm_input_cols)
        with ThreadPoolExecutor(max_workers=10) as executor:
            # Submit all the API calls at once and let the ThreadPoolExecutor manage the concurrency;
            # track each input's index so the results stay aligned with the input rows
            future_to_index = {
                executor.submit(rate_limited_get_completions, row): i
                for i, row in enumerate(llm_input_cols)
            }

            for future in as_completed(future_to_index):
                i = future_to_index[future]
                try:
                    results[i] = future.result()
                except Exception as exc:
                    print(f'Row {i} generated an exception: {exc}')
        return results

    # Call the function to process API calls with rate limiting
    results = process_api_calls(llm_input_cols)

    # Assemble inputs and model responses into an output DataFrame
    sentiment_df = pl.DataFrame({
        'random_word': llm_input_cols,
        'sentiment': results
    })

    return output.write_table(sentiment_df)
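
If you need the same call pattern in a regular Spark transform rather than a lightweight one, the model input works the same way. Below is a minimal sketch under the same assumptions as above (the model RID, dataset paths, and the 'poem' column name are placeholders); it collects the input column to the driver and calls the model once per row, and the rate-limiting / threading ideas from the lightweight example can be layered on top in the same way:

from pyspark.sql import types as T
from transforms.api import transform, Input, Output
from palantir_models.transforms import OpenAiGptChatLanguageModelInput
from palantir_models.models import OpenAiGptChatLanguageModel
from language_model_service_api.languagemodelservice_api_completion_v3 import GptChatCompletionRequest
from language_model_service_api.languagemodelservice_api import ChatMessage, ChatMessageRole

@transform(
    output=Output("/path/to/output_dataset_spark"),
    source_df=Input("/path/to/input_dataset"),
    model=OpenAiGptChatLanguageModelInput("ri.language-model-service..language-model.gpt-4-o-mini"),
)
def compute_poems_spark(ctx, source_df, model: OpenAiGptChatLanguageModel, output):
    def get_completion(word: str) -> str:
        request = GptChatCompletionRequest(
            [
                ChatMessage(ChatMessageRole.SYSTEM, "Write a 2 sentence poem inspired by the provided word."),
                ChatMessage(ChatMessageRole.USER, word),
            ]
        )
        return model.create_chat_completion(request).choices[0].message.content

    # Collect the input column to the driver and call the model once per row
    words = [row["random_word"] for row in source_df.dataframe().select("random_word").collect()]
    poems = [get_completion(w) for w in words]

    # Return the inputs alongside the model responses as a Spark DataFrame
    schema = T.StructType([
        T.StructField("random_word", T.StringType(), True),
        T.StructField("poem", T.StringType(), True),
    ])
    output.write_dataframe(ctx.spark_session.createDataFrame(list(zip(words, poems)), schema))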

And you can generate some notional data (using Spark here) with:

from pyspark.sql import types as T
from transforms.api import transform_df, Output
from faker import Faker
import pandas as pd

@transform_df(
    Output("/path/to/generate_notional_data"),
)
def compute(ctx):
    # Initialize Faker
    fake = Faker()

    # Generate fake data, including a 'random_word' column to feed the LLM transform above
    data = []
    for _ in range(1000):
        data.append({
            'name': fake.name(),
            'address': fake.address(),
            'email': fake.email(),
            'age': fake.random_int(min=18, max=80),
            'random_word': fake.word()
        })

    # Convert to Pandas DataFrame
    pdf = pd.DataFrame(data)

    # Define schema for Spark DataFrame
    schema = T.StructType([
        T.StructField('name', T.StringType(), True),
        T.StructField('address', T.StringType(), True),
        T.StructField('email', T.StringType(), True),
        T.StructField('age', T.IntegerType(), True),
        T.StructField('random_word', T.StringType(), True)
    ])

    # Convert Pandas DataFrame to Spark DataFrame
    return ctx.spark_session.createDataFrame(pdf, schema)
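
If you want to stay entirely on the lightweight side, the same notional data can be generated without a Spark session. Here is a minimal sketch (the output path is a placeholder), reusing the write_table pattern from the first example:

from transforms.api import transform, Output, lightweight
from faker import Faker
import polars as pl

@lightweight()
@transform(
    output=Output("/path/to/generate_notional_data_lightweight"),
)
def compute(output):
    fake = Faker()

    # Generate the same notional rows as the Spark version, as a polars DataFrame
    data = [
        {
            'name': fake.name(),
            'address': fake.address(),
            'email': fake.email(),
            'age': fake.random_int(min=18, max=80),
            'random_word': fake.word(),
        }
        for _ in range(1000)
    ]
    output.write_table(pl.DataFrame(data))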
