Hello,
I am trying to write Iceberg tables (https://iceberg.apache.org/) to AWS S3 from within a Python transforms repository. The documentation says that Apache Iceberg tables are supported within Foundry, but I get an error whenever I try to create my own Spark session or modify the Foundry-provided one.
Here is an example of what I'm trying to do:
import logging

from pyspark.sql import SparkSession
from transforms.api import transform, Input, Output
from transforms.external.systems import external_systems, Source

logger = logging.getLogger(__name__)


@external_systems(s3=Source("source here"))
@transform(
    docs=Input("table I'm trying to write to an Iceberg table"),
    output=Output("empty dataset, because otherwise I get an error when I run a Python transform"),
)
def compute(s3, ctx, docs, output):
    refreshable_credentials = s3.get_aws_credentials()
    credentials = refreshable_credentials.get()
    spark = ctx.spark_session
    env = "dev"
    warehouse_path = f"s3://palantir/{env}/iceberg"
    catalog_name = f"iceberg_catalog_{env}"
    namespace = f"{catalog_name}.db"  # placeholder namespace
    docs_df = docs.dataframe()

    # Stop the Foundry-provided session so I can build my own with the Iceberg configs.
    logger.info("about to stop spark session")
    spark.stop()
    logger.info("spark session stopped")

    new_spark = (
        SparkSession.builder.appName("Table Ingestion")
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )
        .config(
            f"spark.sql.catalog.{catalog_name}",
            "org.apache.iceberg.spark.SparkCatalog",
        )
        .config(f"spark.sql.catalog.{catalog_name}.type", "hadoop")
        .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path)
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.access.key", credentials.access_key_id)
        .config("spark.hadoop.fs.s3a.secret.key", credentials.secret_access_key)
        .getOrCreate()
    )

    new_spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {namespace}")
    table_exists = new_spark.catalog.tableExists("insert table_name here")
    docs_df.writeTo("insert table_name here").using("iceberg").createOrReplace()
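In case it matters, I also considered an alternative that avoids stopping the Foundry session entirely: Spark resolves spark.sql.catalog.* settings lazily on first use, so it may be possible to register the Iceberg catalog on the existing session at runtime. This is only a sketch I haven't been able to verify inside Foundry; it assumes the Iceberg Spark runtime jar is on the driver classpath, the namespace/table names are made up, and spark.sql.extensions cannot be changed after session creation, so Iceberg's SQL extensions would still be unavailable:

    # Untested sketch: configure the Iceberg catalog on Foundry's existing
    # session instead of building a new one. Catalog configs are read lazily,
    # so setting them before first use may be enough for DataFrame writes.
    spark = ctx.spark_session
    spark.conf.set(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set(f"spark.sql.catalog.{catalog_name}.type", "hadoop")
    spark.conf.set(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path)

    # S3A credentials have to go into the live Hadoop configuration, since
    # spark.hadoop.* properties are only applied when a session is created.
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.access.key", credentials.access_key_id)
    hadoop_conf.set("fs.s3a.secret.key", credentials.secret_access_key)

    # DataFrameWriterV2 write into a hypothetical namespace/table.
    docs_df.writeTo(f"{catalog_name}.db.docs").using("iceberg").createOrReplace()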
Can anyone advise whether this is possible and, if so, how to do it?
Thanks,
Jenny