Generating representative JSON samples for schema autogeneration

When working with JSON data, Pipeline Builder makes it really easy to convert your JSON files using the “Extract rows from a JSON file” transform. This transform lets you paste in a sample of your JSON and simply click “Generate schema” to have all the fields created for you.

This is great if you know the JSON schema well and have enough samples to cover all the possible fields. But what happens if the JSON sample you paste in doesn’t contain all the possible fields in your data? Unfortunately, any additional fields will be silently dropped by your pipeline. 🙁

I faced this issue while working with the public Cyber Czech dataset that contains a large quantity of network traffic flows and event logs, provided in structured JSON. While the dataset comes with schema.jsch JSON schema files, you can’t paste these into the schema autogenerator as it would give you a schema that matches the JSON of the schema file, not the corresponding data file!

Initially, I tried copying and pasting a few lines of JSON from the data.json files in the dataset. That worked for the SyslogEntry and WinlogEntry data, but the IPFlowEntry data had some fields that only showed up occasionally, so my manual sampling missed them.

To solve the problem, I used Code Workbook to take a JSON dataset as input and merge all the JSON objects within it into a single “union” dictionary. It does this by recursively walking each JSON object and adding any keys it has not seen before to the union dictionary, keeping a non-null example value for each key where one exists. At the end it prints the union dictionary to the log as JSON so you can copy and paste it into Pipeline Builder.

import json

def json_union(json_dataset):

    # Initialise an empty dictionary to store the union of all keys
    union_dict = {}

    # Recursively merge the keys of current_dict into union_dict
    def walk_tree(current_dict, union_dict):
        for key, value in current_dict.items():
            if isinstance(value, dict):
                # Nested object: make sure the union holds a dictionary here
                # (an earlier record may have stored null or a scalar), then recurse
                if not isinstance(union_dict.get(key), dict):
                    union_dict[key] = {}
                walk_tree(value, union_dict[key])
            elif key not in union_dict or union_dict[key] is None:
                # New key, or only null seen so far: record this value
                union_dict[key] = value
            elif value is not None and not isinstance(union_dict[key], dict):
                # Keep the latest non-null value, but never overwrite a nested object
                union_dict[key] = value

    # Access the filesystem
    fs = json_dataset.filesystem()

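    # Read the file line by line; each line is expected to hold one JSON object
    with fs.open('data.json', 'r') as file:
        for line in file:
            # Skip blank lines, such as a trailing newline at the end of the file
            if not line.strip():
                continue
            # Parse the line as a JSON object and merge its keys into the union
            json_object = json.loads(line)
            walk_tree(json_object, union_dict)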

    # Pretty-print the final union_dict to the log
    print(json.dumps(union_dict, indent=2))

The function reads the file one line at a time and only keeps the union dictionary in memory, so memory use depends on the number of distinct fields rather than the size of the file. It assumes the data file has one JSON object per line. For example, the WinlogEntry data.json from this dataset is 7.3GB and it works fine even in my Spark environment with only 3GB of RAM!
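
If you want to try the merge logic outside Foundry first, here is a minimal standalone sketch. It is not the Code Workbook function itself: the three records are made up for illustration, the helper names merge_into and field_paths are hypothetical, and this variant keeps the first non-null value it sees for each key rather than the latest. It also lists which fields a sample made from only the first record would have missed.

```python
import json

def merge_into(union, record):
    """Recursively merge the keys of one parsed JSON object into union."""
    for key, value in record.items():
        if isinstance(value, dict):
            # Make sure the union holds a dict at this key before recursing
            if not isinstance(union.get(key), dict):
                union[key] = {}
            merge_into(union[key], value)
        elif union.get(key) is None:
            # New key, or only null seen so far: record this value
            union[key] = value
    return union

def field_paths(obj, prefix=""):
    """Return the set of dotted paths to every leaf field in obj."""
    paths = set()
    for key, value in obj.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict) and value:
            paths |= field_paths(value, path + ".")
        else:
            paths.add(path)
    return paths

# Hypothetical newline-delimited JSON records (invented for this example)
lines = [
    '{"src": "10.0.0.1", "bytes": 120, "tcp": {"flags": "S"}}',
    '{"src": "10.0.0.2", "bytes": 64, "icmp": {"type": 8}}',
    '{"src": "10.0.0.3", "bytes": null, "tcp": {"flags": "A", "window": 512}}',
]

union = {}
for line in lines:
    merge_into(union, json.loads(line))

# Fields that a sample containing only the first record would not cover
sample = json.loads(lines[0])
missing = sorted(field_paths(union) - field_paths(sample))

print(json.dumps(union, indent=2))
print("Missing from first-record sample:", missing)
```

The same field_paths comparison could be run against whatever sample you pasted into Pipeline Builder to check its coverage before you rely on the generated schema.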

Hope someone finds this helpful! Please comment below if you’ve experienced this problem too and how you’ve solved it, or if you can answer any of these questions:

  • Have you used a different method to generate representative JSON in order to autogenerate the schema?
  • Is there a way to have Pipeline Builder warn you, when building your pipeline, if you’ve missed any fields from the JSON?
  • Have you used any tools that will generate JSON samples from a JSON schema? (e.g. Oxygen XML Editor has a JSON Schema Converter.)