I have some existing SQL statements living in a Jupyter notebook and I want to migrate them to Foundry.
I want to use a Code Repository to host this code.
I want to use SQL in my PySpark transforms in the Code Repository. How can I do this?
You need to create a temporary view of your dataset with createOrReplaceTempView, fetch the Spark session from the context via ctx.spark_session, and execute the SQL statement against it with .sql(...).
Note: you should use createOrReplaceTempView and not createTempView, as the Preview will otherwise fail with something along the lines of [TEMP_TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create the temporary view ... because it already exists
Here is some example code:
from pyspark.sql.types import StructField, StructType, StringType
from transforms.api import transform, Input, Output, incremental

# Schema of the students dataset; incremental transforms need an explicit
# schema when reading the previous output (see the sketch further below)
SCHEMA = StructType([
    StructField('id', StringType(), True),
    StructField('hair_color', StringType(), True),
    StructField('eye_color', StringType(), True),
    StructField('gender', StringType(), True),
])


@incremental()
@transform(
    incremental_input=Input("/path/to/input_dataset"),   # replace with your dataset path
    processed=Output("/path/to/output_dataset"),         # replace with your dataset path
)
def process_new_transforms(ctx, incremental_input, processed):
    new_students_df = incremental_input.dataframe()
    # Register the dataframe as a temporary view so it can be queried with SQL
    new_students_df.createOrReplaceTempView("new_students_df")
    # Apply the SQL transform on the Spark dataframe
    sql_query = "SELECT * FROM new_students_df WHERE hair_color = 'BLACK'"
    processed_transform = ctx.spark_session.sql(sql_query)
    processed.write_dataframe(processed_transform)
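If the SQL in your notebook joins several tables, the same pattern extends: register each input as its own temp view and run the join as-is. A minimal sketch, assuming hypothetical dataset paths and a second input holding scores:

from transforms.api import transform, Input, Output


@transform(
    students=Input("/path/to/students"),       # hypothetical path
    scores=Input("/path/to/student_scores"),   # hypothetical path
    joined=Output("/path/to/joined_output"),   # hypothetical path
)
def join_with_sql(ctx, students, scores, joined):
    # Register each input dataframe under its own view name
    students.dataframe().createOrReplaceTempView("students")
    scores.dataframe().createOrReplaceTempView("scores")
    # Run the join exactly as it was written in the notebook
    joined_df = ctx.spark_session.sql(
        "SELECT s.id, s.hair_color, sc.score "
        "FROM students s JOIN scores sc ON s.id = sc.id"
    )
    joined.write_dataframe(joined_df)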
One advantage among others: you benefit from the whole incremental syntax and flexibility while still being able to use SQL statements.
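For instance, on an incremental run incremental_input.dataframe() only contains the rows added since the last build, so the SQL statement automatically processes just the new data. You can also expose the previous output as a second temp view when the query needs history. A minimal sketch of that idea, reusing SCHEMA and the imports from the example above (the dataset paths are hypothetical, and the 'previous' read is what the explicit SCHEMA is for):

@incremental()
@transform(
    incremental_input=Input("/path/to/students"),   # hypothetical path
    processed=Output("/path/to/processed"),         # hypothetical path
)
def append_only_new_ids(ctx, incremental_input, processed):
    # New rows since the last build (the full dataset on the first run)
    incremental_input.dataframe().createOrReplaceTempView("new_students")
    # Previously written output; empty, with the given schema, on the first run
    processed.dataframe('previous', SCHEMA).createOrReplaceTempView("previous_students")
    # Keep only ids that were never written before, expressed in plain SQL
    deduplicated = ctx.spark_session.sql(
        "SELECT n.* FROM new_students n "
        "LEFT ANTI JOIN previous_students p ON n.id = p.id"
    )
    processed.write_dataframe(deduplicated)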