Streaming Bad Records

We have a pipeline that creates an array of structs of length N, where N comes from another column, Column A, in the record. We ran into an issue with a bad record: Column A had a value of 300 (instead of the usual 0-10). As a result, our pipeline created a row that exceeded the maximum size for Kafka and threw a build error.

Is there any way to purge this record from our stream? What are the best practices for handling “bad records” in streaming?

You can use a filter rows transform to remove the row before it reaches the transform that creates the array of structs.

The filter can be defined as “remove rows where the value of Column A is greater than 10”.
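
To illustrate the ordering (filter first, then build the array), here is a minimal plain-Python sketch. It is not the pipeline tool's own API: the names `column_a`, `MAX_ARRAY_LENGTH`, `is_valid`, `filter_rows`, and `expand` are hypothetical, and the limit of 10 is assumed from the "usual 0-10" range in the question. Collecting rejected rows in a separate list is an optional extra so that they can be inspected later.

```python
from typing import Iterable, Iterator

# Assumed upper bound, taken from the "usual 0-10" range in the question.
MAX_ARRAY_LENGTH = 10


def is_valid(record: dict) -> bool:
    """Return True when column_a is an integer within the expected range."""
    n = record.get("column_a")
    return isinstance(n, int) and not isinstance(n, bool) and 0 <= n <= MAX_ARRAY_LENGTH


def filter_rows(records: Iterable[dict], rejected: list) -> Iterator[dict]:
    """Yield valid records; set invalid ones aside instead of passing them on."""
    for record in records:
        if is_valid(record):
            yield record
        else:
            rejected.append(record)


def expand(record: dict) -> dict:
    """Build the array of structs whose length is given by column_a."""
    return {**record, "items": [{"index": i} for i in range(record["column_a"])]}


records = [
    {"id": 1, "column_a": 3},
    {"id": 2, "column_a": 300},  # the bad record
    {"id": 3, "column_a": 0},
]
rejected: list = []

# The filter runs before the expansion, so the oversized row is never created.
output = [expand(r) for r in filter_rows(records, rejected)]
```

Because `filter_rows` sits upstream of `expand`, the record with a value of 300 never produces a row and ends up in `rejected`.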