I am getting the error below while trying to build a Python transform:

"A TransformContext object does not have an attribute inputs. Please check the spelling and/or the datatype of the object."
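As far as I can tell, the only place I reference anything called inputs is this lookup inside the transform body (full code below), so I assume these are the lines the error is pointing at:

ds_1_name = ctx.inputs["ds_1"].name
ds_2_name = ctx.inputs["ds_2"].name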
And below is my code:
import logging
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType
from transforms.api import Input, Output, TransformContext, transform_df

# Set up logging configuration
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Define schema at the top level for consistency
RESULT_SCHEMA = StructType([
    StructField("tech_src_sys", StringType(), True),
    StructField("tech_gold_copy_src_sys", StringType(), True),
    StructField("Name", StringType(), True),
])

def process_dataset(df: DataFrame, dataset_name: str) -> DataFrame:
    try:
        required_columns = ["tech_src_sys", "tech_gold_copy_src_sys"]
        if all(col in df.columns for col in required_columns):
            return df.select(*required_columns).withColumn("Name", F.lit(dataset_name))
        else:
            missing_columns = [col for col in required_columns if col not in df.columns]
            logger.warning(f"Dataset {dataset_name} is missing columns: {missing_columns}")
            return None
    except Exception as e:
        logger.error(f"Error processing dataset {dataset_name}: {str(e)}")
        return None

@transform_df(
    Output("..."),
    ds_1=Input("..."),
    ds_2=Input("..."),
)
def compute(ctx: TransformContext, ds_1: DataFrame, ds_2: DataFrame) -> DataFrame:
    logger.info(f"TransformContext attributes: {dir(ctx)}")

    ds_1_name = ctx.inputs["ds_1"].name
    ds_2_name = ctx.inputs["ds_2"].name
    logger.info(f"Dataset names: {ds_1_name}, {ds_2_name}")

    # Process both input datasets
    processed_df1 = process_dataset(ds_1, ds_1_name)
    processed_df2 = process_dataset(ds_2, ds_2_name)

    # Filter out None results and create a list of processed dataframes
    processed_dfs = [df for df in [processed_df1, processed_df2] if df is not None]

    # Initialize result_df
    result_df = None

    if len(processed_dfs) > 1:
        result_df = processed_dfs[0]
        for df in processed_dfs[1:]:
            result_df = result_df.unionByName(df, allowMissingColumns=True)
    elif len(processed_dfs) == 1:
        result_df = processed_dfs[0]

    # Log summary statistics
    total_datasets = 2
    processed_datasets = len(processed_dfs)
    logger.info(f"Processed {processed_datasets} out of {total_datasets} datasets")

    # If no datasets were processed, create an empty DataFrame with the correct schema
    if not result_df:
        spark = SparkSession.builder.getOrCreate()
        result_df = spark.createDataFrame([], schema=RESULT_SCHEMA)
        logger.warning("No datasets were successfully processed. Returning an empty DataFrame.")

    # Ensure schema consistency
    return result_df.select(RESULT_SCHEMA.fieldNames())
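In case it helps, the only alternative I have thought of is dropping down to the plain @transform decorator, where (if I am reading the docs right) the inputs arrive as TransformInput objects and the dataset identifier can be read from each input directly rather than from the context. A rough sketch of what I mean, with placeholder dataset paths; the .path attribute is my assumption from the docs rather than something I have verified:

from transforms.api import Input, Output, transform

@transform(
    out=Output("..."),
    ds_1=Input("..."),
    ds_2=Input("..."),
)
def compute(out, ds_1, ds_2):
    # Assumption: with @transform (rather than @transform_df) the arguments are
    # TransformInput / TransformOutput objects, so the dataset name would come
    # from the input itself instead of from a TransformContext.
    ds_1_name = ds_1.path  # assumed attribute, not verified
    ds_2_name = ds_2.path
    # ...then the same process_dataset / union logic as above, ending with:
    # out.write_dataframe(result_df)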
Can you help me understand where I might be going wrong?