I have a lot of JSON files uploaded into one dataset, with various schemas.
How can I parse those JSONs and perform operations on them without knowing their exact schema at implementation time?
One solution is to put a generic schema on the dataset that contains the raw JSON files.
Example schema:
{
  "fieldSchemaList": [
    {
      "type": "STRING",
      "name": "row",
      "nullable": null,
      "userDefinedTypeClass": null,
      "customMetadata": {},
      "arraySubtype": null,
      "precision": null,
      "scale": null,
      "mapKeyType": null,
      "mapValueType": null,
      "subSchemas": null
    }
  ],
  "primaryKey": null,
  "dataFrameReaderClass": "com.palantir.foundry.spark.input.DataSourceDataFrameReader",
  "customMetadata": {
    "format": "text",
    "options": {}
  }
}
With this schema, each raw JSON record lands in a single string column named row. You can then parse a given key in PySpark, for example:
from pyspark.sql import functions as F

# Extract a specific key (e.g. "key1") from the JSON string without a schema
df = df.withColumn("key1", F.get_json_object(F.col("row"), "$.key1"))
or
from pyspark.sql.types import StructType, StructField, StringType

# Define a schema for the JSON string
schema = StructType([
    StructField("key1", StringType(), True),
    StructField("key2", StringType(), True),
])

# Extract a specific key (e.g. "key1") from the JSON string
df = df.withColumn("key1", F.from_json(F.col("row"), schema)["key1"])
You can also parse out all of the columns at once with code such as:
from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F


@transform_df(
    Output("/path/example_output"),
    source_df=Input("/path/example_input"),
)
def compute(ctx, source_df):
    # Generate the schema by reading the raw JSON strings a first time
    # (use multiLine=True if each file contains a single multi-line JSON)
    schema = ctx.spark_session.read.json(source_df.rdd.map(lambda x: x["row"])).schema
    # Parse the JSON with the discovered schema
    results = source_df.withColumn("json_parsed", F.from_json("row", schema))
    # Explode the struct so every top-level key becomes its own column
    results = results.select("row", "json_parsed.*")
    return results
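Note that inferring the schema this way reads the whole dataset an extra time. If the dataset is large, one way to cheapen the inference pass is the standard samplingRatio option of DataFrameReader.json; a sketch, where the 0.1 fraction is a hypothetical value to tune:

# Infer the schema from a fraction of the rows instead of all of them
schema = ctx.spark_session.read.json(
    source_df.rdd.map(lambda x: x["row"]),
    samplingRatio=0.1,  # hypothetical sampling fraction; tune to your data
).schema

Keys that never appear in the sampled rows will be missing from the inferred schema, so only sample when the JSONs are reasonably homogeneous.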