I’m having some trouble getting the connection to work in PySpark, but I was successful using Pandas:
import base64

import pandas as pd
import snowflake.connector
from transforms.api import transform, Output, lightweight
from transforms.external.systems import external_systems, ResolvedSource, Source
@lightweight(memory_gb=64)
@external_systems(
    source=Source("source")
)
@transform(
    tables=Output("output"),
)
def compute(tables, source: ResolvedSource):
    private_key_raw = source.get_secret("secret")
    if private_key_raw.startswith("-----BEGIN PRIVATE KEY-----"):
        private_key_pem = private_key_raw.encode("utf-8")
        private_key_stripped = "".join(
            [line.strip() for line in private_key_pem.decode("utf-8").splitlines() if "BEGIN" not in line and "END" not in line]
        )
        private_key = base64.b64decode(private_key_stripped)  # Decode base64 to get DER format
    else:
        private_key = private_key_raw.encode("utf-8")

    # Establish database connection
    conn = snowflake.connector.connect(
        user="user",
        private_key=private_key,
        account="",
        warehouse="",
        database="",
        role="",
    )
    try:
        cursor = conn.cursor()
        query = """
            SELECT *
            FROM Table
            WHERE "type" = 'type'
        """
        cursor.execute(query)
        results = cursor.fetchall()
        column_names = [col[0] for col in cursor.description]
        query_df = pd.DataFrame(results, columns=column_names)
        # Write the DataFrame to the output table
        tables.write_pandas(query_df)
    finally:
        # Close cursor and connection
        cursor.close()
        conn.close()
It would be helpful to see what kind of errors you are getting when trying to run this in PySpark - that would give us a starting point for investigating what’s going on here.
Unfortunately it cannot come close to handling the scale I need; it seems my only option is to make a transform generator of external transforms to break it up into many parts. For example, the sync currently fails with 1/200th of the data but succeeds with 1/300th, and I don’t want to manually create 300 syncs.
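A transform generator along those lines is essentially a function that builds one external transform per chunk in a loop and registers them all on the pipeline. A minimal sketch, assuming hypothetical output paths, a hypothetical partitioning column to split the query, and the standard Pipeline.add_transforms registration:

from transforms.api import transform, Output, lightweight, Pipeline
from transforms.external.systems import external_systems, ResolvedSource, Source

def generate_transforms(num_parts):
    transforms = []
    for part in range(num_parts):
        @lightweight(memory_gb=64)
        @external_systems(source=Source("source"))
        @transform(tables=Output(f"/path/to/output_part_{part}"))  # hypothetical output path
        def compute(tables, source: ResolvedSource, part=part):  # bind the loop variable via a default
            # Same connection logic as the Pandas version above, but restrict the
            # query to this part's slice, e.g.
            # WHERE MOD(some_partition_column, num_parts) = part
            # (the partition column is hypothetical).
            ...
        transforms.append(compute)
    return transforms

TRANSFORMS = generate_transforms(300)

# In pipeline.py, register the generated transforms:
# my_pipeline = Pipeline()
# my_pipeline.add_transforms(*TRANSFORMS)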
All the errors have been about the private key being in the wrong format; I’m passing it the same way I did in the Pandas version:
if private_key_raw.startswith("-----BEGIN PRIVATE KEY-----"):
    private_key_pem = private_key_raw.encode("utf-8")
    private_key_stripped = "".join(
        [line.strip() for line in private_key_pem.decode("utf-8").splitlines() if "BEGIN" not in line and "END" not in line]
    )
    private_key = base64.b64decode(private_key_stripped)  # Decode base64 for DER format
The error is a Py4JJavaError indicating that the Spark connector failed when executing the .load() operation, likely due to incorrect Snowflake connection options or misconfiguration. Specifically, the private key passed as pem_private_key may not be properly parsed or encoded for Snowflake’s authentication method.
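One thing worth checking: snowflake.connector’s private_key parameter takes the decoded DER bytes, while the Spark connector’s pem_private_key option is usually given the base64 key body as a string (header, footer and newlines stripped, with no b64decode). A hedged sketch of the Spark read, assuming the Snowflake Spark connector is available on the classpath and using placeholder option values; spark here would be the transform’s Spark session (e.g. ctx.spark_session):

private_key_raw = source.get_secret("secret")

# Strip the PEM header/footer and newlines, but keep the body base64-encoded:
# pem_private_key generally expects the encoded string, not the DER bytes
# used by snowflake.connector.
pem_private_key = "".join(
    line.strip()
    for line in private_key_raw.splitlines()
    if "BEGIN" not in line and "END" not in line
)

sf_options = {
    "sfURL": "<account>.snowflakecomputing.com",  # placeholder
    "sfUser": "user",
    "sfDatabase": "",
    "sfSchema": "",
    "sfWarehouse": "",
    "sfRole": "",
    "pem_private_key": pem_private_key,
}

df = (
    spark.read.format("net.snowflake.spark.snowflake")
    .options(**sf_options)
    .option("query", """SELECT * FROM Table WHERE "type" = 'type'""")
    .load()
)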
Create a Virtual Table on top of your view/table in Snowflake and use that as an Input in a PySpark transform. Use a 3XL warehouse in the sync config and 64 or more executors (dynamic allocation) on the Foundry side. This will scale to very big tables.
Having said that, my suggestion from the other thread still holds. The most performant solution is to COPY INTO your query results into a Snowflake internal stage as Parquet (use a large warehouse to accelerate this), download the Parquet files to the executor’s disk using Snowpark, and then upload them to the Foundry filesystem.
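Roughly, that approach could look like the sketch below if run inside the same compute function as the Pandas version above. The stage name, local path, and COPY options are placeholders, Snowpark is assumed to accept the same key pair credentials as the Python connector, and the Output is assumed to expose filesystem() as in the standard transforms API:

from snowflake.snowpark import Session
import glob
import os
import shutil

# Placeholder connection parameters; private_key is the DER-decoded key from earlier.
session = Session.builder.configs({
    "account": "",
    "user": "user",
    "private_key": private_key,
    "warehouse": "",   # use a large warehouse to speed up the COPY INTO
    "database": "",
    "role": "",
}).create()

# 1. Export the query result to an internal stage as Parquet (stage name is a placeholder).
session.sql("""
    COPY INTO @my_internal_stage/export/
    FROM (SELECT * FROM Table WHERE "type" = 'type')
    FILE_FORMAT = (TYPE = PARQUET)
    HEADER = TRUE
""").collect()

# 2. Download the Parquet files from the stage to local disk.
local_dir = "/tmp/snowflake_export"
os.makedirs(local_dir, exist_ok=True)
session.file.get("@my_internal_stage/export/", local_dir)

# 3. Upload the raw files into the output dataset's filesystem.
fs = tables.filesystem()
for path in glob.glob(os.path.join(local_dir, "*.parquet")):
    with open(path, "rb") as src, fs.open(os.path.basename(path), "wb") as dst:
        shutil.copyfileobj(src, dst)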
I tried virtual syncs and was consistently getting timeouts with very small portions of the data. Can you explain the other suggestion a bit more? I’m not sure I understand what you are suggesting.
Did you add the internal stage bucket URL as an egress policy and bucket policy, as the documentation suggests? Timeouts are usually a sign that this has not been done.
Could you post the code and the error messages you get when you try to pull in the virtual table? Also please share the warehouse size, Spark config, and table size.
net.snowflake.client.jdbc.SnowflakeSQLLoggedException: JDBC driver internal error: Max retry reached for the download of chunk#0 (Total chunks: 4) retry: 7, error: net.snowflake.client.jdbc.SnowflakeSQLException: JDBC driver encountered communication error. Message: Exception encountered for HTTP request: REDACTED.blob.core.REDACTED.net.
    at net.snowflake.client.jdbc.RestRequest.executeWithRetries(RestRequest.java:743)
    at net.snowflake.client.jdbc.RestRequest.executeWithRetries(RestRequest.java:568)
    at net.snowflake.client.jdbc.DefaultResultStreamProvider.getResultChunk(DefaultResultStreamProvider.java:135)
    at net.snowflake.client.jdbc.DefaultResultStreamProvider.getInputStream(DefaultResultStreamProvider.java:48)
    at net.snowflake.client.jdbc.SnowflakeChunkDownloader$2.call(SnowflakeChunkDownloader.java:1021)
    at net.snowflake.client.jdbc.SnowflakeChunkDownloader$2.call(SnowflakeChunkDownloader.java:935)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.UnknownHostException: REDACTED.blob.core.REDACTED.net
    at java.base/java.net.InetAddress$CachedLookup.get(InetAddress.java:988)
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1818)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1688)
    at n..
It will fail regardless of Spark profile. I don’t know the warehouse size; the table size might be around 30 TB.
It’s the exact error I told you about. The driver is not able to download the chunks from the internal stage. The URL is in your error log: REDACTED.blob.core.REDACTED.net
You need to know the warehouse size and be able to change it to make this ingestion work.