Huge difference between Live inference & Build with a model

Hello,

We are using a ‘large’ PyTorch model as part of our pipeline, and we are seeing long build times (around 1 hour) just to infer 1 or 2 rows.
Meanwhile, a live deployment with the same architecture (1× T4 GPU) completes the inference in a few dozen seconds.

Are there particular points to pay attention to in order to optimize ‘build inferences’?

Cheers,

~seconds vs. hours feels like you might be running inference on CPU rather than GPU… I’d check that the GPU is actually being used, with something like this:

import torch
import logging
from transforms.api import transform, Output, lightweight

@lightweight(gpu_type='NVIDIA_T4')
@transform(out=Output('/Project/folder/output'))
def compute(out):
    logging.info(torch.cuda.get_device_name(0))

or even add a failure condition asserting that the device name is the expected GPU.
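For instance, a minimal sketch of such a check (the substring match on the device name is just illustrative; compare against whatever device you expect):

import torch
from transforms.api import transform, Output, lightweight

@lightweight(gpu_type='NVIDIA_T4')
@transform(out=Output('/Project/folder/output'))
def compute(out):
    # Fail fast instead of silently falling back to CPU inference
    if not torch.cuda.is_available():
        raise RuntimeError("CUDA is not available in this build")
    device_name = torch.cuda.get_device_name(0)
    if "T4" not in device_name:
        raise RuntimeError(f"Unexpected GPU: {device_name}")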

Continuing this thread as I am still experiencing performance issues on CPU.

On a 350 MB DistilBERT Torch model:

Live deployment (0.5 CPU, 2 GB): 1,000 similar rows in 11 s once the deployment is ready (≈ 20 min for 100,000 rows)

Lightweight build (2 CPU, 8 GB): 1,000 rows, OOM (a chunked variant is sketched after the Lightweight code below)

Spark build with sidecar: 100,000 rows in 90 min (using an ONNX equivalent of the model here)

Spark build with defaults: 100,000 rows in 110 min

How can I get a build with performance similar to a live deployment?

Should ONNX vs. Torch be looked at?

Cheers,

from transforms.api import transform, Input, Output, lightweight
from palantir_models.transforms import ModelInput
import logging

@lightweight(cpu_cores=2, memory_gb=8)
@transform(
    inference_input=Input("ri.foundry.main.dataset.anonymized-input-dataset-rid"),
    model=ModelInput("ri.models.main.model.anonymized-model-rid"),
    inference_output=Output("ri.foundry.main.dataset.anonymized-output-dataset-rid"),
)
def compute(inference_input, model, inference_output):
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Anonymized list of columns needed for the model
    columns_needed = [
        "feature_column_a_id",
        "text_description_1",
        "text_description_2",
        "category_type_id"
    ]

    # Load only the required columns and a 1,000-row sample for testing
    input_data = inference_input.pandas()[columns_needed].head(1000)

    # Model inference
    inference_results = model.transform(input_data)
    logger.info(f"Model inference complete. Output shape: {inference_results.output_data.shape}")

    # Write output
    inference_output.write_pandas(inference_results.output_data)
    logger.info("Output written successfully.")

Sidecar:

from palantir_models.transforms import ModelInput
from transforms.api import transform, Input, Output, configure
from pyspark.sql.types import StructType, StructField, StringType  # Simplified imports

@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@transform(
    # Anonymized RIDs for input/output datasets and the model
    output=Output("ri.foundry.main.dataset.anonymized-output-rid"),
    source_df=Input("ri.foundry.main.dataset.anonymized-input-rid"),
    model_input=ModelInput(
        "ri.models.main.model.anonymized-model-rid",
        use_sidecar=True,
        sidecar_resources={"cpus": 2, "memory_gb": 4} # Model sidecar configuration kept for context
    ),
)
def compute(ctx, source_df, model_input, output):
    # Simplified output schema with generic names
    # Only a few fields are kept to show the structure, with all types simplified to StringType
    output_schema = StructType([
        StructField("feature_id", StringType(), True),
        StructField("input_text_1", StringType(), True),
        StructField("prediction_label", StringType(), True),
    ])

    # --- Data Loading and Pre-processing ---
    spark_df = source_df.dataframe()
    
    # Anonymized and simplified filtering logic
    filtered_spark_df = (
        spark_df
        # Simplified filter condition
        .filter(spark_df["required_column"].isNotNull())
        # Simplified selection of input columns
        .select("feature_id", "input_text_1")
        .limit(10000) # Keep limit for batch context
    )

    # Handle the edge case of an empty dataset (note: count() triggers a Spark job of its own)
    if filtered_spark_df.count() == 0:
        empty_df = ctx.spark_session.createDataFrame([], output_schema)
        output.write_dataframe(empty_df)
        return

    # --- Model Inference ---
    filtered_pandas_df = filtered_spark_df.toPandas()
    print(f"Starting inference on {len(filtered_pandas_df)} records.")
    
    # The key step: calling the model's prediction method
    inference_pandas_df = model_input.predict(filtered_pandas_df)

    # --- Data Writing ---
    # Convert the resulting pandas DataFrame back to a Spark DataFrame using the defined schema
    final_spark_df = ctx.spark_session.createDataFrame(inference_pandas_df, schema=output_schema)
    
    # Write the final results to the output dataset
    output.write_dataframe(final_spark_df)
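
On the ONNX vs. Torch question: before comparing backends, it is worth pinning each runtime's thread pool to the CPUs the container is actually allocated. Both Torch and ONNX Runtime size their intra-op pools from the visible core count by default, which can cause thrashing on a small CPU slice. A rough micro-benchmark sketch; the model paths, input names, and shapes below are placeholders, not taken from the pipeline above:

import time
import numpy as np
import torch
import onnxruntime as ort

N_CPUS = 2  # match the transform's cpu_cores allocation

# Torch: cap the intra-op thread pool to the container's CPU slice
torch.set_num_threads(N_CPUS)
torch_model = torch.jit.load("distilbert_scripted.pt")  # placeholder path
torch_model.eval()

# ONNX Runtime: apply the same cap via SessionOptions
opts = ort.SessionOptions()
opts.intra_op_num_threads = N_CPUS
ort_session = ort.InferenceSession("distilbert.onnx", sess_options=opts)  # placeholder path

# Dummy DistilBERT-shaped batch; the input names are assumptions
ids = np.random.randint(0, 30000, size=(32, 128), dtype=np.int64)
mask = np.ones_like(ids)

with torch.no_grad():
    start = time.time()
    torch_model(torch.from_numpy(ids), torch.from_numpy(mask))
    print(f"torch: {time.time() - start:.2f}s")

start = time.time()
ort_session.run(None, {"input_ids": ids, "attention_mask": mask})
print(f"onnxruntime: {time.time() - start:.2f}s")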