Has anyone used Spark's from_protobuf in the past? Curious to see how one could efficiently parse a dataset of protobuf files.
Figured it out:
To parse protobuf files with Spark, follow these steps:
1. Add the dependency
Add this to transforms-python/build.gradle (note: NOT the root build.gradle). You might have to toggle "show hidden files" to see the transforms-python/build.gradle:
dependencies {
    condaJars "org.apache.spark:spark-protobuf_2.12:3.5.5"
}
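The spark-protobuf version should line up with the Spark version running in your environment; a quick sanity check, as a minimal sketch assuming pyspark is importable in the repo's Python environment:
import pyspark

# The 3.5.5 in the coordinate above should track the Spark version reported here
print(pyspark.__version__)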
2. Generate the descriptor file
Generate the .desc file using protoc (you can run this directly in VS Code in Foundry if you have protobuf installed via conda/pip):
protoc -o protoschema.desc your_schema.proto
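For concreteness, here is a minimal, hypothetical your_schema.proto that the command above would compile into protoschema.desc; the message name defined here is what gets passed to from_protobuf later. If your schema imports other .proto files, add protoc's --include_imports flag so the full descriptor set lands in one file:
syntax = "proto3";

package example;

// Hypothetical message, stand-in for your real schema
message NameOfMessageObject {
  string id = 1;
  int64 timestamp = 2;
  double value = 3;
}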
3. Import the descriptor in your code
from pkg_resources import resource_stream
# protoschema.desc should sit alongside this module so it ships with the Python package
stream = resource_stream(__name__, "protoschema.desc")
protobuf_descriptor_binary: bytes = stream.read()
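pkg_resources is deprecated in newer setuptools releases; an equivalent read with the standard-library importlib.resources, as a sketch assuming Python 3.9+ and that this module lives inside a package:
from importlib.resources import files

# Same descriptor load without pkg_resources
protobuf_descriptor_binary: bytes = (
    files(__package__).joinpath("protoschema.desc").read_bytes()
)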
4. Deserialize protobuf data
If your protobuf data is already in a dataframe column named binary:
from pyspark.sql.protobuf import functions as PBF
import pyspark.sql.functions as F
output_df = df.withColumn(
    "deser",
    PBF.from_protobuf(
        F.col("binary"),
        "NameOfMessageObject",
        binaryDescriptorSet=protobuf_descriptor_binary,
    ),
)
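from_protobuf returns a single struct column, so a common follow-up is to expand it into top-level columns (field names come from your schema). If the message sits inside a proto package, you may need the fully qualified name, e.g. "example.NameOfMessageObject" for the hypothetical schema above:
# Expand the deserialized struct into ordinary columns
flat_df = output_df.select("deser.*")
flat_df.printSchema()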
5. If working with protobuf files instead of columns
import pyspark.sql.types as T
import pyspark.sql.functions as F
fs = pb_files_input.filesystem()
@F.udf(T.BinaryType())
def read_pb_file(file_path: str) -> bytes:
    with fs.open(file_path, "rb") as f:
        return f.read()
df = fs.files().select(read_pb_file(F.col("path")).alias("binary"))
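Putting it all together, a rough end-to-end transform might look like the sketch below; the dataset paths, the pb_files_input name, and the message name are placeholders, and this assumes the standard transforms.api decorator:
from pkg_resources import resource_stream

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.protobuf import functions as PBF
from transforms.api import Input, Output, transform


@transform(
    out=Output("/path/to/parsed_output"),           # placeholder
    pb_files_input=Input("/path/to/raw_pb_files"),  # placeholder
)
def compute(out, pb_files_input):
    # Load the compiled descriptor that ships with this Python package
    descriptor = resource_stream(__name__, "protoschema.desc").read()

    fs = pb_files_input.filesystem()

    @F.udf(T.BinaryType())
    def read_pb_file(file_path: str) -> bytes:
        with fs.open(file_path, "rb") as f:
            return f.read()

    binary_df = fs.files().select(read_pb_file(F.col("path")).alias("binary"))

    parsed = binary_df.withColumn(
        "deser",
        PBF.from_protobuf(
            F.col("binary"),
            "NameOfMessageObject",  # placeholder message name
            binaryDescriptorSet=descriptor,
        ),
    ).select("deser.*")

    out.write_dataframe(parsed)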