How to monitor the results of expectation checks at the row level?

Hey all,

I was wondering whether there is an automatic way to get more information about the rows that fail the expectation checks defined inside a code repository transformation.

For example, either by generating a companion dataset for each combination of output and expectation check, or by adding one boolean column per check to the output dataset, indicating whether each row passes or fails that check.

Is it possible to get something like this done by building on top of Palantir Foundry’s check and expectations classes?

Thank you.

It is currently not possible to get row-level results for data expectation checks out of the box. You can hack your way around it, but the workaround may well break in the future.

Consider a list of checks that should be satisfied, e.g.:

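from transforms.api import Check
from transforms import expectations as E

# One Check per rule; the check names are reused as column names further down.
checks = [
    Check(
        E.col("col_A").is_in("foo", "bar"),
        "colA_is_valid",
        on_error="WARN",
        description="col_A has expected values",
    ),
    Check(
        E.col("col_B").non_null(),
        "col_B_is_valid",
        on_error="FAIL",
        description="col_B has non null values",
    ),
]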

You could use their definitions to derive new columns in the same or a separate dataset, something like:

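from pyspark.sql import functions as F

# Relies on the internal implementation of the expectations library (e.g. `_col`).
df = df.select("*", *[
    check.expectation.predicate(
        F.col(check.expectation._col),
        # Ad hoc class whose only attribute is `df`
        type('DfWrapper', (object,), {'df': df})
    ).alias(check.name) for check in checks
])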

Here, this returns a dataframe containing the original columns, plus the boolean columns colA_is_valid and col_B_is_valid corresponding to the defined checks.
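Since each check name becomes a column name in that select, it may be worth verifying up front that the names are unique and do not shadow an existing column. Below is a minimal sketch in plain Python; `find_name_conflicts` is a hypothetical helper written for this answer, not part of Foundry or PySpark.

```python
from collections import Counter
from typing import Dict, Iterable, List


def find_name_conflicts(
    check_names: Iterable[str], existing_columns: Iterable[str]
) -> Dict[str, List[str]]:
    """Report check names that are repeated or that shadow an existing column."""
    counts = Counter(check_names)
    existing = set(existing_columns)
    return {
        # Names used by more than one check would yield ambiguous columns.
        "duplicated": sorted(name for name, n in counts.items() if n > 1),
        # Names already present in the dataframe would clash with real data.
        "clashing": sorted(name for name in counts if name in existing),
    }
```

Inside the transform you could call it as `find_name_conflicts([check.name for check in checks], df.columns)` and raise if either list is non-empty.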

Thank you very much. I will try it out, I think it’s exactly what I wanted.

One extra question: what do you mean when you say that things could break in the future? Would you suggest implementing this in production pipelines, or do we run the risk of breaking them?

I mean that it’s not ideal to rely on the internal implementation of the expectations library (i.e. using attributes like _col). The product team could modify these internals in the future without warning, since this is not how the library is meant to be used.

A more reliable approach could be to hardcode the logic that produces the PySpark column corresponding to each check type, e.g.:

from typing import List, Optional

from pyspark.sql import functions as F
from transforms import expectations as E
from transforms.api import Check


class CustomCheck:
    """
    `check` is an attribute, but `column` is a method: the checks must be initialised
    outside the transform (so they can be provided to the output), but PySpark columns
    can't be created outside the transform body, so they are provided via a method
    to be called within the transform itself.
    """

    def __init__(self, col: str, check_type: str, name: str, on_error: str, description: str, values: Optional[List[str]] = None):
        # Built eagerly so that the Check can be passed to the Output.
        expectation = self._create_expectation(col, check_type, values)
        self.col = col
        self.check_type = check_type
        self.name = name
        self.values = values
        self.check = Check(expectation, name, on_error, description)

    def _create_expectation(self, col: str, check_type: str, values: Optional[List[str]] = None):
        if check_type == "is_in":
            if not values:
                raise ValueError("check_type 'is_in' requires a non-empty list of values")
            return E.col(col).is_in(*values)
        elif check_type == "non_null":
            return E.col(col).non_null()
        else:
            raise ValueError(f"Unsupported check type: {check_type}")

    def column(self):
        # Built lazily: call this only inside the transform body.
        if self.check_type == "is_in":
            return F.col(self.col).isin(*self.values)
        elif self.check_type == "non_null":
            return F.col(self.col).isNotNull()
        else:
            raise ValueError(f"Unsupported check type: {self.check_type}")

You would then use it like this to generate both the checks and the corresponding boolean columns:

from transforms.api import transform_df, Input, Output

custom_checks = [
    CustomCheck(
        col="col_A",
        check_type="is_in",
        name="colA_is_valid",
        on_error="WARN",
        description="col_A has expected values",
        values=["foo", "bar"]
    ),
    CustomCheck(
        col="col_B",
        check_type="non_null",
        name="col_B_is_valid",
        on_error="FAIL",
        description="col_B has non null values"
    )
]

@transform_df(
    Output(
        "ri.foundry.main.dataset.xxx",
        checks=[custom_check.check for custom_check in custom_checks],
    ),
    df=Input("ri.foundry.main.dataset.yyy"),
)
def compute(df):
    # Append one boolean column per check, named after the check.
    flags = [custom_check.column().alias(custom_check.name) for custom_check in custom_checks]
    return df.select("*", *flags)
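If you would rather have a companion dataset of failing rows than extra columns, the flag columns can be used to filter. The sketch below is only an illustration: `failing_rows` is a hypothetical helper, and it assumes that a NULL flag counts as a failure (in Spark, `isin` evaluates to NULL when the input value is NULL). How the expectation itself treats NULLs should be checked separately.

```python
from functools import reduce
from operator import or_
from typing import Sequence


def failing_rows(df, check_names: Sequence[str]):
    """Return the rows of `df` where at least one check column is not True.

    `df` is expected to be a PySpark DataFrame that already contains one
    boolean column per check name. Treating a NULL flag as a failure is an
    assumption of this sketch.
    """
    if not check_names:
        raise ValueError("check_names must not be empty")
    # For each flag column: "is NULL or is False".
    not_passed = [df[name].isNull() | ~df[name] for name in check_names]
    # Keep a row as soon as any single check did not pass.
    return df.filter(reduce(or_, not_passed))
```

For the example above this would be `failing_rows(df, [c.name for c in custom_checks])`, and the result could then be written to a separate output dataset.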