Huge difference between Live inference & Build with a model

Hello,

We are using a ‘large’ PyTorch model as part of our pipeline, and we are seeing long build times (around 1 hour) just to run inference on 1 or 2 rows.
Meanwhile, a live deployment with the same architecture (1× T4 GPU) completes the same inference in a few dozen seconds.

Are there particular points to pay attention to when optimizing ‘build’ inference?

Cheers,

A gap of seconds vs. hours feels like you might be running inference on CPU rather than GPU… I’d check that the GPU is actually visible, with something like this:

import torch
import logging
from transforms.api import transform, Output, lightweight

@lightweight(gpu_type='NVIDIA_T4')
@transform(out=Output('/Project/folder/output'))
def compute(out):
    # Logs the name of the CUDA device visible to the transform;
    # raises if no GPU is available
    logging.info(torch.cuda.get_device_name(0))

or even add a failure condition if the device name is not the expected GPU.
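For example, a minimal sketch of such a check (the expected-name substring 'T4' is an assumption; adjust it to what your GPU actually reports):

import torch

def assert_gpu(expected='T4'):
    # Fail fast if no CUDA device is visible, or if it is not the GPU we requested
    if not torch.cuda.is_available():
        raise RuntimeError('No CUDA device visible; inference would fall back to CPU')
    name = torch.cuda.get_device_name(0)
    if expected not in name:
        raise RuntimeError(f'Unexpected GPU: {name}')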

Continuing this thread as I am still experiencing performance issues on CPU.

On a 350 MB DistilBERT Torch model:

Live deployment (0.5 CPU / 2 GB): 1,000 similar rows in 11 s once the deployment is ready (≈ 20 min for 100,000 rows).

Lightweight build (2 CPU / 8 GB): OOM on 1,000 rows.

Spark build with sidecar: 100,000 rows in 90 min (using an ONNX equivalent of the model).

Default Spark build: 100,000 rows in 110 min.

How can we get a build with performance similar to a live deployment?

Should we be looking at ONNX vs. Torch?

Cheers,

from transforms.api import transform, Input, Output, lightweight
from palantir_models.transforms import ModelInput
import logging

@lightweight(cpu_cores=2, memory_gb=8)
@transform(
    inference_input=Input("ri.foundry.main.dataset.anonymized-input-dataset-rid"),
    model=ModelInput("ri.models.main.model.anonymized-model-rid"),
    inference_output=Output("ri.foundry.main.dataset.anonymized-output-dataset-rid"),
)
def compute(inference_input, model, inference_output):
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Anonymized list of columns needed for the model
    columns_needed = [
        "feature_column_a_id",
        "text_description_1",
        "text_description_2",
        "category_type_id"
    ]

    # Load only the required columns and sample the first 1,000 rows for testing
    input_data = inference_input.pandas()[columns_needed].head(1000)

    # Model inference
    inference_results = model.transform(input_data)
    logger.info(f"Model inference complete. Output shape: {inference_results.output_data.shape}")

    # Write output
    inference_output.write_pandas(inference_results.output_data)
    logger.info("Output written successfully.")

Sidecar:

from palantir_models.transforms import ModelInput
from transforms.api import transform, Input, Output, configure
from pyspark.sql.types import StructType, StructField, StringType

@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@transform(
    # Anonymized RIDs for input/output datasets and the model
    output=Output("ri.foundry.main.dataset.anonymized-output-rid"),
    source_df=Input("ri.foundry.main.dataset.anonymized-input-rid"),
    model_input=ModelInput(
        "ri.models.main.model.anonymized-model-rid",
        use_sidecar=True,
        sidecar_resources={"cpus": 2, "memory_gb": 4}  # Resources allocated to the model sidecar
    ),
)
def compute(ctx, source_df, model_input, output):
    # Simplified output schema with generic names
    # Only a few fields are kept to show the structure, with all types simplified to StringType
    output_schema = StructType([
        StructField("feature_id", StringType(), True),
        StructField("input_text_1", StringType(), True),
        StructField("prediction_label", StringType(), True),
    ])

    # --- Data Loading and Pre-processing ---
    spark_df = source_df.dataframe()
    
    # Anonymized and simplified filtering logic
    filtered_spark_df = (
        spark_df
        # Simplified filter condition
        .filter(spark_df["required_column"].isNotNull())
        # Simplified selection of input columns
        .select("feature_id", "input_text_1")
        .limit(10000) # Keep limit for batch context
    )

    # Handle the edge case of an empty dataset
    if filtered_spark_df.count() == 0:
        empty_df = ctx.spark_session.createDataFrame([], output_schema)
        output.write_dataframe(empty_df)
        return

    # --- Model Inference ---
    filtered_pandas_df = filtered_spark_df.toPandas()
    print(f"Starting inference on {len(filtered_pandas_df)} records.")
    
    # The key step: calling the model's prediction method
    inference_pandas_df = model_input.predict(filtered_pandas_df)

    # --- Data Writing ---
    # Convert the resulting pandas DataFrame back to a Spark DataFrame using the defined schema
    final_spark_df = ctx.spark_session.createDataFrame(inference_pandas_df, schema=output_schema)
    
    # Write the final results to the output dataset
    output.write_dataframe(final_spark_df)

Hi,

For Spark with sidecar, I would recommend using the ARROW_ENABLED profile (see the Spark profile reference). I expect most of the slowness with Spark comes from converting the dataframe from Spark to pandas format, which is generally slow and memory-intensive but made much more efficient with Arrow. You could also leverage executors via the DistributedInferenceWrapper as described here; that natively uses Arrow via Spark’s mapInPandas method.
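To illustrate both suggestions with rough sketches (the profile combination below and the load_model() helper are assumptions for illustration, not the actual DistributedInferenceWrapper API): adding Arrow to the sidecar transform is just an extra profile,

@configure(profile=["KUBERNETES_NO_EXECUTORS", "ARROW_ENABLED"])  # Arrow speeds up the toPandas() conversion

and a hand-rolled equivalent of executor-side inference via mapInPandas looks roughly like:

from typing import Iterator

import pandas as pd

def predict_batches(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    model = load_model()  # hypothetical helper: load the model once per Spark task
    for batch in batches:
        batch["prediction_label"] = model.predict(batch)
        yield batch

result_df = filtered_spark_df.mapInPandas(
    predict_batches,
    schema="feature_id string, input_text_1 string, prediction_label string",
)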

We will soon support bringing a model as a sidecar in lightweight transforms as well, so that users can benefit both from the portability of models as containers and the efficient data loading of lightweight transforms.

Lightweight should show good performance as the data is loaded natively in Pandas, and no conversion is needed. I wonder if the out of memory error is coming from the fact that

 input_data = inference_input.pandas()[columns_needed].head(1000)

will actually load the entire dataset into memory and then take the first 1,000 rows, which could easily fill the available memory. In contrast, Spark is able to read only a subset of the data to satisfy the limit when you do:

    filtered_spark_df = (
        spark_df
        # Simplified filter condition
        .filter(spark_df["required_column"].isNotNull())
        # Simplified selection of input columns
        .select("feature_id", "input_text_1")
        .limit(10000) # Keep limit for batch context
    )

You might want to use the Polars lazy API instead, which supports filter pushdown and should stream the data rather than reading the entire dataset, avoiding the memory issue.


df = (
    input.polars(lazy=True)       # lazy frame: nothing is read yet
    .select(columns_needed)       # column pruning is pushed down to the reader
    .filter(your_filters_here)    # placeholder for your actual predicate
    .head(10000)
    .collect()                    # only now is the (pruned) data actually read
).to_pandas()

or use the filesystem API directly to read a subset of the dataset’s files.
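For instance, a rough sketch of the filesystem approach, reading just one of the dataset’s Parquet files (treat the exact calls as illustrative and check them against your transforms version):

import pandas as pd

fs = inference_input.filesystem()
# List the dataset's underlying Parquet files and read only the first one
first_file = next(iter(fs.ls(glob="*.parquet"))).path
with fs.open(first_file, "rb") as f:
    input_data = pd.read_parquet(f)[columns_needed].head(1000)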

Hope this helps - let me know if you’re still not getting the performance you’d like.

PS: As to your initial question, I am curious if you were able to find out why you were seeing such discrepancies between live and batch deployments. Happy to help investigate if not.

Thanks,

Julien

Hello,

So yes, we ended up using this kind of lightweight transform configuration:

A lightweight transform with lazy Polars + a quantized ONNX model (converted from the base PyTorch model) yielded great results.

For 100k rows: from multiple hours with a mid-sized Spark profile (with the distributed inference wrapper) down to 10 minutes with a minimal 1 CPU / 6 GB configuration. We can now infer millions of rows for a few dollars!
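For future readers, a rough sketch of what the export + dynamic quantization step can look like (illustrative model names and file paths, not our exact pipeline):

import torch
from onnxruntime.quantization import QuantType, quantize_dynamic
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Export the PyTorch model to ONNX; the dummy input defines the traced graph
model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased")
model.eval()
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
dummy = tokenizer("example input", return_tensors="pt")
torch.onnx.export(
    model,
    (dummy["input_ids"], dummy["attention_mask"]),
    "distilbert.onnx",
    input_names=["input_ids", "attention_mask"],
    output_names=["logits"],
    dynamic_axes={
        "input_ids": {0: "batch", 1: "seq"},
        "attention_mask": {0: "batch", 1: "seq"},
    },
)

# Dynamic quantization stores the weights as int8, shrinking the model
# and typically speeding up CPU inference substantially
quantize_dynamic("distilbert.onnx", "distilbert-quant.onnx", weight_type=QuantType.QInt8)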

Regarding the initial OOM, I have the same feeling as you: the input dataset (several dozen GB) was being loaded entirely into memory, causing the OOM before any filtering happened. With the Polars lazy API, the filtering is pushed down early enough to avoid this issue.

For future readers, I believe Julien’s answers will solve most of your issues.

Cheers,
