Example external transform to Snowflake that uses PySpark and, preferably, a private key for authentication? I got it to work in Pandas.

I’m having some trouble getting the connection to work in PySpark, but I was successful using Pandas:

    import base64

    import pandas as pd
    import snowflake.connector
    from transforms.api import transform, Output, lightweight
    from transforms.external.systems import external_systems, Source, ResolvedSource


    @lightweight(memory_gb=64)
    @external_systems(
        source=Source("source")
    )
    @transform(
        tables=Output("output"),
    )
    def compute(tables, source: ResolvedSource) -> pd.DataFrame:
        private_key_raw = source.get_secret("secret")

        if private_key_raw.startswith("-----BEGIN PRIVATE KEY-----"):
            private_key_pem = private_key_raw.encode("utf-8")
            private_key_stripped = "".join(
                [line.strip() for line in private_key_pem.decode("utf-8").splitlines() if "BEGIN" not in line and "END" not in line]
            )
            private_key = base64.b64decode(private_key_stripped)  # Decode base64 to get DER format
        else:
            private_key = private_key_raw.encode("utf-8")

        # Establish database connection
        conn = snowflake.connector.connect(
            user="user",
            private_key=private_key,
            account="",
            warehouse="",
            database="",
            role="",
        )

        try:
            cursor = conn.cursor()

            query = """
                SELECT *
                FROM Table
                WHERE "type" = 'type'
            """
            cursor.execute(query)
            results = cursor.fetchall()
            column_names = [col[0] for col in cursor.description]

            query_df = pd.DataFrame(results, columns=column_names)

            # Write the DataFrame to the output table
            tables.write_pandas(query_df)
        finally:
            # Close cursor and connection
            cursor.close()
            conn.close()


It would be helpful to see what kind of errors you are getting when trying to run this in PySpark - it would give a starting point for investigating what’s going on here :slight_smile:

This seems unnecessary. Why don’t you leverage the standard Snowflake Source to hold credentials and query the data through a virtual table?

Foundry will push down WHERE clauses to Snowflake when you query Snowflake virtual tables.
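For illustration, a rough sketch of what that could look like (the dataset paths and the column name are placeholders, and it assumes the Snowflake virtual table has already been registered so it can be used as an ordinary transform Input):

    from transforms.api import transform_df, Input, Output
    from pyspark.sql import functions as F


    @transform_df(
        Output("/path/to/output"),  # placeholder output path
        source_table=Input("/path/to/snowflake_virtual_table"),  # placeholder virtual-table-backed input
    )
    def compute(source_table):
        # When the input is a virtual table, a filter like this can be pushed
        # down to Snowflake so only the matching rows are transferred.
        return source_table.filter(F.col("type") == "type")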


Unfortunately, that cannot come close to handling the scale I need. It seems my only option is to write a transform generator that produces many external transforms and breaks the job into parts. For example, the sync currently fails with 1/200th of the data but succeeds with 1/300th, and I don’t want to manually create 300 syncs.
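A hedged sketch of that transform-generator pattern (the output paths, the chunking column, the chunk count, and the hashing predicate are all placeholders; it assumes the generated list of transforms then gets registered with the project's pipeline):

    from transforms.api import transform, Output, lightweight
    from transforms.external.systems import external_systems, Source, ResolvedSource


    def make_chunk_transform(chunk_idx: int, num_chunks: int):
        @lightweight(memory_gb=64)
        @external_systems(source=Source("source"))
        @transform(out=Output(f"/path/to/output_chunk_{chunk_idx}"))  # placeholder output path
        def compute(out, source: ResolvedSource):
            # Placeholder chunking predicate: hash a key column server-side so
            # each generated transform pulls roughly 1/num_chunks of the table.
            query = f"""
                SELECT *
                FROM Table
                WHERE MOD(ABS(HASH("id")), {num_chunks}) = {chunk_idx}
            """
            # ... then reuse the private-key connection / fetch / write logic
            # from the Pandas example above, with `query` swapped in.
        return compute


    TRANSFORMS = [make_chunk_transform(i, 300) for i in range(300)]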

All the errors have been about the private key being in the wrong format; I’m passing it the same way I passed it in the Pandas version:

    if private_key_raw.startswith("-----BEGIN PRIVATE KEY-----"):
        private_key_pem = private_key_raw.encode("utf-8")
        private_key_stripped = "".join(
            [line.strip() for line in private_key_pem.decode("utf-8").splitlines() if "BEGIN" not in line and "END" not in line]
        )
        private_key = base64.b64decode(private_key_stripped)  # Decode base64 for DER format

The error is:

The Py4JJavaError suggests that the Spark connector failed when executing the .load() operation, likely due to incorrect Snowflake connection options or misconfiguration. Specifically, the private key used for pem_private_key might not be properly parsed or encoded for Snowflake’s authentication method.
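For reference, a hedged sketch of one way the key is often passed to the Spark connector: unlike the Python connector, which takes DER bytes, the connector's pem_private_key option is usually given the base64 key body as text (BEGIN/END lines and newlines stripped, but not base64-decoded). The account URL, names, output path, and query below are placeholders, and it assumes the Spark Snowflake connector is available to the transform's Spark session:

    from transforms.api import transform, Output
    from transforms.external.systems import external_systems, Source, ResolvedSource


    @external_systems(source=Source("source"))
    @transform(out=Output("/path/to/output"))  # placeholder output path
    def compute(ctx, out, source: ResolvedSource):
        private_key_raw = source.get_secret("secret")

        # Pass the base64 key body as a string: strip the BEGIN/END lines and
        # newlines, but do NOT base64-decode it for the Spark connector.
        pem_private_key = "".join(
            line.strip()
            for line in private_key_raw.splitlines()
            if "BEGIN" not in line and "END" not in line
        )

        sf_options = {
            "sfURL": "<account>.snowflakecomputing.com",  # placeholder
            "sfUser": "user",                             # placeholder
            "sfDatabase": "database",                     # placeholder
            "sfSchema": "schema",                         # placeholder
            "sfWarehouse": "warehouse",                   # placeholder
            "sfRole": "role",                             # placeholder
            "pem_private_key": pem_private_key,
        }

        df = (
            ctx.spark_session.read.format("net.snowflake.spark.snowflake")
            .options(**sf_options)
            .option("query", """SELECT * FROM Table WHERE "type" = 'type'""")
            .load()
        )
        out.write_dataframe(df)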

I am not suggesting that you use syncs.

Create a virtual table on top of your view/table in Snowflake and use it as an Input to a PySpark transform. Use a 3XL warehouse in the sync config and 64 or more executors (dynamic allocation) on the Foundry side. This will scale to very big tables.

Having said that, my suggestion from the other thread still holds. The most performant solution is to use COPY INTO (with a large warehouse to accelerate it) to unload your query into a Snowflake internal stage as Parquet, download the Parquet files to the executor's disk using Snowpark, and upload them to the Foundry filesystem.
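A rough sketch of that COPY INTO / download / upload flow, for illustration only (the stage name, account details, query, and paths are placeholders, and it assumes the Snowpark package is available in the lightweight transform's environment):

    import base64
    import os
    import tempfile

    from snowflake.snowpark import Session
    from transforms.api import transform, Output, lightweight
    from transforms.external.systems import external_systems, Source, ResolvedSource


    @lightweight(memory_gb=64)
    @external_systems(source=Source("source"))
    @transform(out=Output("/path/to/output"))  # placeholder output path
    def compute(out, source: ResolvedSource):
        # Build the DER key bytes the same way as in the Pandas example above.
        raw = source.get_secret("secret")
        body = "".join(l.strip() for l in raw.splitlines() if "BEGIN" not in l and "END" not in l)
        private_key = base64.b64decode(body)

        session = Session.builder.configs({
            "account": "<account>",      # placeholder
            "user": "user",              # placeholder
            "private_key": private_key,
            "warehouse": "<warehouse>",  # placeholder
            "database": "<database>",    # placeholder
            "role": "<role>",            # placeholder
        }).create()

        # 1) Unload the query result to an internal stage as Parquet
        #    (a large warehouse speeds this step up).
        session.sql("""
            COPY INTO @my_internal_stage/export/
            FROM (SELECT * FROM Table WHERE "type" = 'type')
            FILE_FORMAT = (TYPE = PARQUET)
            HEADER = TRUE
        """).collect()

        # 2) Download the Parquet files to local disk.
        local_dir = tempfile.mkdtemp()
        session.file.get("@my_internal_stage/export/", local_dir)

        # 3) Upload the raw files into the Foundry output dataset's filesystem.
        fs = out.filesystem()
        for name in os.listdir(local_dir):
            with open(os.path.join(local_dir, name), "rb") as src, fs.open(name, "wb") as dst:
                dst.write(src.read())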


I tried virtual syncs and was consistently getting timeouts with very small portions of the data. Can you explain the other suggestion a bit more? I’m not sure I understand what you are suggesting.

Did you add the internal stage bucket URL to the egress policy and the bucket policy, as the documentation suggests? Timeouts are usually a sign that this hasn’t been done.

Yes, those are all present in the egress policy.

Could you post the code and the error messages you get when you try to pull in the virtual table? Also, please share the warehouse size, Spark config, and table size.

    net.snowflake.client.jdbc.SnowflakeSQLLoggedException: JDBC driver internal error: Max retry reached for the download of chunk#0 (Total chunks: 4) retry: 7, error: net.snowflake.client.jdbc.SnowflakeSQLException: JDBC driver encountered communication error. Message: Exception encountered for HTTP request: REDACTED.blob.core.REDACTED.net.
        at net.snowflake.client.jdbc.RestRequest.executeWithRetries(RestRequest.java:743)
        at net.snowflake.client.jdbc.RestRequest.executeWithRetries(RestRequest.java:568)
        at net.snowflake.client.jdbc.DefaultResultStreamProvider.getResultChunk(DefaultResultStreamProvider.java:135)
        at net.snowflake.client.jdbc.DefaultResultStreamProvider.getInputStream(DefaultResultStreamProvider.java:48)
        at net.snowflake.client.jdbc.SnowflakeChunkDownloader$2.call(SnowflakeChunkDownloader.java:1021)
        at net.snowflake.client.jdbc.SnowflakeChunkDownloader$2.call(SnowflakeChunkDownloader.java:935)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
    Caused by: java.net.UnknownHostException: REDACTED.blob.core.REDACTED.net
        at java.base/java.net.InetAddress$CachedLookup.get(InetAddress.java:988)
        at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1818)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1688)
        at n..

It will fail regardless of Spark profile. I don’t know the warehouse size; the table size might be around 30 TB.

It’s the exact error that I told you about. The driver is not able to download the chunks from the internal stage. The URL is in your error log: REDACTED.blob.core.REDACTED.net

You need to know the warehouse size and be able to change it to make this ingestion work.

I do not control the warehouse; it belongs to our customer, so I don’t think I can just get it raised.