Here’s a good snippet to mock incremental behavior:
Building this transform repeatedly will keep “appending” fake data, which lets you test schedules, downstream transforms, etc.
from transforms.api import transform, Output, configure, incremental
from pyspark.sql import types as T
import logging

log = logging.getLogger(__name__)
@incremental(semantic_version=1)
@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@transform(
    out=Output("<rid>"),
)
def compute(ctx, out):
    # TODO: API calls, custom processing; generate a dataframe and save it, or save files directly to the output dataset
    out.write_dataframe(get_dataframe(ctx))
def get_dataframe(ctx):
    # Define the schema of the dataframe
    schema = T.StructType([
        T.StructField("name", T.StringType(), True),
        T.StructField("age", T.IntegerType(), True),
        T.StructField("city", T.StringType(), True)
    ])

    # Create a list of rows
    data = [("Alice", 25, "New York"),
            ("Bob", 30, "San Francisco"),
            ("Charlie", 35, "London")]

    # Create a PySpark dataframe
    df = ctx.spark_session.createDataFrame(data, schema=schema)
    return df
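
If you want to tell the batches apart downstream (e.g. to check that each scheduled build really appended new rows), one simple tweak is to stamp each batch with the build time. This is just a sketch, not part of the original snippet; the ingested_at column name is an arbitrary choice, and it relies on the standard pyspark F.current_timestamp() function:

from pyspark.sql import functions as F

def get_dataframe(ctx):
    # Same fake rows as above, plus a timestamp column so each incremental
    # append can be distinguished downstream (column name is illustrative).
    schema = T.StructType([
        T.StructField("name", T.StringType(), True),
        T.StructField("age", T.IntegerType(), True),
        T.StructField("city", T.StringType(), True)
    ])
    data = [("Alice", 25, "New York"),
            ("Bob", 30, "San Francisco"),
            ("Charlie", 35, "London")]
    return (ctx.spark_session
            .createDataFrame(data, schema=schema)
            .withColumn("ingested_at", F.current_timestamp()))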