Missing transform attribute

I am getting the error below while trying to build a Python transform:

```
A TransformContext object does not have an attribute inputs. Please check the spelling and/or the datatype of the object.
```

And below is my code:

```python
import logging

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType
from transforms.api import Input, Output, TransformContext, transform_df

# Set up logging configuration
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Define schema at the top level for consistency
RESULT_SCHEMA = StructType([
    StructField("tech_src_sys", StringType(), True),
    StructField("tech_gold_copy_src_sys", StringType(), True),
    StructField("Name", StringType(), True),
])


def process_dataset(df: DataFrame, dataset_name: str) -> DataFrame:
    try:
        required_columns = ["tech_src_sys", "tech_gold_copy_src_sys"]

        if all(col in df.columns for col in required_columns):
            return df.select(*required_columns).withColumn("Name", F.lit(dataset_name))
        else:
            missing_columns = [col for col in required_columns if col not in df.columns]
            logger.warning(f"Dataset {dataset_name} is missing columns: {missing_columns}")
            return None
    except Exception as e:
        logger.error(f"Error processing dataset {dataset_name}: {str(e)}")
        return None


@transform_df(
    Output("…"),
    ds_1=Input("…"),
    ds_2=Input("…"),
)
def compute(ctx: TransformContext, ds_1: DataFrame, ds_2: DataFrame) -> DataFrame:
    logger.info(f"TransformContext attributes: {dir(ctx)}")

    ds_1_name = ctx.inputs["ds_1"].name
    ds_2_name = ctx.inputs["ds_2"].name
    logger.info(f"Dataset names: {ds_1_name}, {ds_2_name}")

    # Process both input datasets
    processed_df1 = process_dataset(ds_1, ds_1_name)
    processed_df2 = process_dataset(ds_2, ds_2_name)

    # Filter out None results and create a list of processed dataframes
    processed_dfs = [df for df in [processed_df1, processed_df2] if df is not None]

    # Initialize result_df
    result_df = None

    if len(processed_dfs) > 1:
        result_df = processed_dfs[0]
        for df in processed_dfs[1:]:
            result_df = result_df.unionByName(df, allowMissingColumns=True)
    elif len(processed_dfs) == 1:
        result_df = processed_dfs[0]

    # Log summary statistics
    total_datasets = 2
    processed_datasets = len(processed_dfs)
    logger.info(f"Processed {processed_datasets} out of {total_datasets} datasets")

    # If no datasets were processed, create an empty DataFrame with the correct schema
    if result_df is None:
        spark = SparkSession.builder.getOrCreate()
        result_df = spark.createDataFrame([], schema=RESULT_SCHEMA)
        logger.warning("No datasets were successfully processed. Returning an empty DataFrame.")

    # Ensure schema consistency
    return result_df.select(RESULT_SCHEMA.fieldNames())
```

Can you help me understand where I might be going wrong?

Hello,

It's saying that the following doesn't work because ctx has no attribute called .inputs.

I'm not sure what "name" you want to print in the logs. RIDs are really how datasets are referred to. There is a path and a dataset name visible in Compass (the file browser), but those are easily changeable, and the transform doesn't care about them.

You could see the RIDs of the inputs by logging or printing ctx._parameters; that will give you a dict containing the names of the inputs, their RIDs, and the output RID.
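
For example, something like this (a rough sketch reusing the decorator arguments from your post; `_parameters` is a private attribute, so treat its exact contents as an implementation detail):

```python
import logging

from pyspark.sql import DataFrame
from transforms.api import Input, Output, TransformContext, transform_df

logger = logging.getLogger(__name__)


@transform_df(
    Output("…"),
    ds_1=Input("…"),
    ds_2=Input("…"),
)
def compute(ctx: TransformContext, ds_1: DataFrame, ds_2: DataFrame) -> DataFrame:
    # See what the context actually exposes instead of assuming ctx.inputs exists
    logger.info("TransformContext attributes: %s", dir(ctx))

    # _parameters is private, but it holds a dict with the input parameter names,
    # their dataset RIDs, and the output RID; check the build logs for its value
    logger.info("Transform parameters: %s", ctx._parameters)

    return ds_1  # placeholder; keep your real logic here
```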

(P.S. The formatting of your question makes it incredibly hard to read; perhaps you could wrap the code in Markdown code blocks?)