Continuing this thread as I am still experiencing performance issues on CPU.
On a 350 MB DistilBERT Torch model:
- Live deployment (0.5 CPU, 2 GB): 1,000 similar rows in 11 s for inference once the deployment is ready (≈20 min for 100,000 rows)
- Lightweight build (2 CPU, 8 GB): 1,000 rows: OOM
- Spark build with sidecar: 100,000 rows in 90 min (this one uses an ONNX equivalent of the model)
- Default Spark build: 100,000 rows in 110 min
How can I get a build with performance similar to a live deployment?
Is ONNX vs. Torch something I should look into?
Cheers,
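For context on the ONNX vs. Torch question, here is a minimal sketch of exporting a DistilBERT checkpoint to ONNX and running batched CPU inference with onnxruntime. The checkpoint name, file name, and batch size are placeholders, not the actual model:

import numpy as np
import onnxruntime as ort
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

MODEL_NAME = "distilbert-base-uncased-finetuned-sst-2-english"  # placeholder checkpoint

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
model.eval()

# One-time export of the Torch model to ONNX, with dynamic batch/sequence axes
dummy = tokenizer("example", return_tensors="pt")
torch.onnx.export(
    model,
    (dummy["input_ids"], dummy["attention_mask"]),
    "distilbert.onnx",  # placeholder output path
    input_names=["input_ids", "attention_mask"],
    output_names=["logits"],
    dynamic_axes={
        "input_ids": {0: "batch", 1: "seq"},
        "attention_mask": {0: "batch", 1: "seq"},
        "logits": {0: "batch"},
    },
    opset_version=17,
)

# Batched CPU inference with onnxruntime
session = ort.InferenceSession("distilbert.onnx", providers=["CPUExecutionProvider"])

def predict(texts, batch_size=64):  # batch size is an arbitrary choice
    preds = []
    for i in range(0, len(texts), batch_size):
        enc = tokenizer(texts[i:i + batch_size], padding=True, truncation=True, return_tensors="np")
        logits = session.run(["logits"], {"input_ids": enc["input_ids"], "attention_mask": enc["attention_mask"]})[0]
        preds.extend(np.argmax(logits, axis=1).tolist())
    return preds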
Lightweight:
from transforms.api import transform, Input, Output, lightweight
from palantir_models.transforms import ModelInput
import logging


@lightweight(cpu_cores=2, memory_gb=8)
@transform(
    inference_input=Input("ri.foundry.main.dataset.anonymized-input-dataset-rid"),
    model=ModelInput("ri.models.main.model.anonymized-model-rid"),
    inference_output=Output("ri.foundry.main.dataset.anonymized-output-dataset-rid"),
)
def compute(inference_input, model, inference_output):
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Columns required by the model (anonymized)
    columns_needed = [
        "feature_column_a_id",
        "text_description_1",
        "text_description_2",
        "category_type_id",
    ]

    # Load only the required columns, limited to a 1,000-row sample for testing
    input_data = inference_input.pandas()[columns_needed].head(1000)

    # Model inference
    inference_results = model.transform(input_data)
    logger.info(f"Model inference complete. Output shape: {inference_results.output_data.shape}")

    # Write output
    inference_output.write_pandas(inference_results.output_data)
    logger.info("Output written successfully.")
Sidecar:
from palantir_models.transforms import ModelInput
from transforms.api import transform, Input, Output, configure
from pyspark.sql.types import StructField, StringType, StructType


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@transform(
    # Anonymized RIDs for the input/output datasets and the model
    output=Output("ri.foundry.main.dataset.anonymized-output-rid"),
    source_df=Input("ri.foundry.main.dataset.anonymized-input-rid"),
    model_input=ModelInput(
        "ri.models.main.model.anonymized-model-rid",
        use_sidecar=True,
        sidecar_resources={"cpus": 2, "memory_gb": 4},  # sidecar configuration kept for context
    ),
)
def compute(ctx, source_df, model_input, output):
    # Simplified output schema with generic column names (all types reduced to StringType)
    output_schema = StructType([
        StructField("feature_id", StringType(), True),
        StructField("input_text_1", StringType(), True),
        StructField("prediction_label", StringType(), True),
    ])

    # --- Data loading and pre-processing ---
    spark_df = source_df.dataframe()

    # Anonymized and simplified filtering logic
    filtered_spark_df = (
        spark_df
        .filter(spark_df["required_column"].isNotNull())
        .select("feature_id", "input_text_1")
        .limit(10000)  # keep the limit for batch context
    )

    # Handle the edge case of an empty dataset
    if filtered_spark_df.count() == 0:
        empty_df = ctx.spark_session.createDataFrame([], output_schema)
        output.write_dataframe(empty_df)
        return

    # --- Model inference ---
    filtered_pandas_df = filtered_spark_df.toPandas()
    print(f"Starting inference on {len(filtered_pandas_df)} records.")
    inference_pandas_df = model_input.predict(filtered_pandas_df)

    # --- Data writing ---
    # Convert the pandas results back to a Spark DataFrame with the defined schema
    final_spark_df = ctx.spark_session.createDataFrame(inference_pandas_df, schema=output_schema)
    output.write_dataframe(final_spark_df)
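For the default Spark build, an alternative to collecting everything to the driver with toPandas() is to distribute inference across executors with mapInPandas. This is only a sketch reusing filtered_spark_df and model_input from the transform above; it assumes the model object is usable on executors (which may not hold for sidecar-backed inputs) and would require dropping the KUBERNETES_NO_EXECUTORS profile:

from pyspark.sql.types import StructField, StringType, StructType

result_schema = StructType([
    StructField("feature_id", StringType(), True),
    StructField("input_text_1", StringType(), True),
    StructField("prediction_label", StringType(), True),
])

def run_inference(pdf_iter):
    # Called once per partition with an iterator of pandas DataFrames,
    # so each executor scores its own slice of the data
    for pdf in pdf_iter:
        yield model_input.predict(pdf)

scored = (
    filtered_spark_df
    .repartition(8)  # arbitrary parallelism; tune to the executor count
    .mapInPandas(run_inference, schema=result_schema)
)
output.write_dataframe(scored)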