Protobuf parsing in a Spark transform

Has anyone used Spark's from_protobuf before? Curious how one could efficiently parse a dataset of protobuf files.

Figured it out:

To parse protobuf files with Spark, follow these steps:

1. Add the dependency

Add this to transforms-python/build.gradle (note: NOT the root build.gradle). You may have to toggle "show hidden files" to see it:

dependencies {
    // Pick the spark-protobuf version matching your environment's Spark and Scala versions
    condaJars "org.apache.spark:spark-protobuf_2.12:3.5.5"
}

2. Generate the descriptor file

Generate the .desc descriptor set using protoc (you can run this directly in VS Code in Foundry if you have protobuf installed via conda/pip):

protoc -o protoschema.desc your_schema.proto
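If your_schema.proto imports other .proto files, pass --include_imports so those dependencies get bundled into the descriptor set as well:

protoc --include_imports -o protoschema.desc your_schema.proto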

3. Import the descriptor in your code

from pkg_resources import resource_stream

# Read the compiled descriptor set that ships alongside this module
stream = resource_stream(__name__, "protoschema.desc")
protobuf_descriptor_binary: bytes = stream.read()
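Note that pkg_resources is deprecated in newer setuptools releases; on Python 3.9+ the same read works with the standard library's importlib.resources (a minimal sketch, assuming protoschema.desc lives in the same package as your transform module):

from importlib.resources import files

# Read the descriptor set bundled as package data next to this module
protobuf_descriptor_binary: bytes = (
    files(__package__).joinpath("protoschema.desc").read_bytes()
)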

4. Deserialize protobuf data

If your protobuf data is already in a DataFrame column named "binary":

from pyspark.sql.protobuf import functions as PBF
import pyspark.sql.functions as F

# "NameOfMessageObject" is the message type name from your .proto file
output_df = df.withColumn(
    "deser",
    PBF.from_protobuf(
        F.col("binary"),
        "NameOfMessageObject",
        binaryDescriptorSet=protobuf_descriptor_binary,
    ),
)
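from_protobuf returns a single struct column, so a common follow-up is to flatten the parsed fields into top-level columns:

# Expand the struct produced by from_protobuf into one column per field
flat_df = output_df.select("deser.*")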

5. If working with raw protobuf files instead of a binary column

import pyspark.sql.functions as F
import pyspark.sql.types as T

fs = pb_files_input.filesystem()

# UDF that reads one raw file's bytes; the filesystem handle is
# captured in the closure and used on the executors
@F.udf(T.BinaryType())
def read_pb_file(file_path: str) -> bytes:
    with fs.open(file_path, "rb") as f:
        return f.read()

# fs.files() returns a DataFrame with one row per input file, including a path column
df = fs.files().select(read_pb_file(F.col("path")).alias("binary"))
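
For reference, here's a minimal sketch of the pieces assembled into one Foundry transform; the dataset paths and the message name NameOfMessageObject are placeholders you'd replace with your own:

from pkg_resources import resource_stream

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.protobuf import functions as PBF
from transforms.api import Input, Output, transform

protobuf_descriptor_binary: bytes = resource_stream(
    __name__, "protoschema.desc"
).read()


@transform(
    out=Output("/path/to/parsed_output"),           # placeholder
    pb_files_input=Input("/path/to/raw_pb_files"),  # placeholder
)
def parse_protobuf(out, pb_files_input):
    fs = pb_files_input.filesystem()

    # Read each raw file's bytes into a binary column
    @F.udf(T.BinaryType())
    def read_pb_file(file_path: str) -> bytes:
        with fs.open(file_path, "rb") as f:
            return f.read()

    binary_df = fs.files().select(read_pb_file(F.col("path")).alias("binary"))

    # Decode the binary column and flatten the resulting struct
    out.write_dataframe(
        binary_df.select(
            PBF.from_protobuf(
                F.col("binary"),
                "NameOfMessageObject",
                binaryDescriptorSet=protobuf_descriptor_binary,
            ).alias("deser")
        ).select("deser.*")
    )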