I mean that it’s not ideal to rely on the internal implementation of the expectations library (i.e. using private attributes like _col), because the product team could modify these internals in the future without warning — that’s not how the library is meant to be used.
Something more reliable would be to hardcode the logic that produces the pyspark column corresponding to each check type, e.g.:
"""
check is an attribute, but column a method: we need to initialise the checks outside the transform (to provide them to the output),
but we can't create pyspark columns outside the transform body, so we provide them via a method to be called within the transform itself.
"""
class CustomCheck:
    """Pair a Foundry ``Check`` with the pyspark ``Column`` that implements it.

    The ``Check`` is built eagerly in ``__init__`` (it must exist when the
    ``@transform_df`` decorator is evaluated), while the pyspark column is
    exposed through the :meth:`column` method because ``Column`` objects can
    only be created inside the transform body.

    Supported check types: ``"is_in"`` (requires ``values``) and ``"non_null"``.
    """

    def __init__(self, col: str, check_type: str, name: str, on_error: str,
                 description: str, values: List[str] = None):
        """Build both the expectation-based Check and the metadata for column().

        :param col: name of the dataframe column the check applies to.
        :param check_type: one of ``"is_in"`` / ``"non_null"``.
        :param name: name of the Check, also used as the alias of the
            boolean result column added by the transform.
        :param on_error: Foundry behaviour on failure, e.g. ``"WARN"`` or ``"FAIL"``.
        :param description: human-readable description shown in Foundry.
        :param values: allowed values; required when ``check_type == "is_in"``.
        :raises ValueError: for an unsupported ``check_type``, or for
            ``"is_in"`` without ``values``.
        """
        # Fail fast with a clear message rather than the opaque TypeError
        # that "*None" would raise deeper in _create_expectation.
        if check_type == "is_in" and values is None:
            raise ValueError("'is_in' checks require a 'values' list")
        expectation = self._create_expectation(col, check_type, values)
        self.col = col
        self.check_type = check_type
        self.name = name
        self.values = values
        self.check = Check(expectation, name, on_error, description)

    def _create_expectation(self, col: str, check_type: str, values: List[str] = None):
        """Translate a check type into an expectations-library expression."""
        if check_type == "is_in":
            return E.col(col).is_in(*values)
        elif check_type == "non_null":
            return E.col(col).non_null()
        raise ValueError(f"Unsupported check type: {check_type}")

    def column(self):
        """Return the pyspark Column equivalent of the expectation.

        Must be called inside the transform body, where pyspark columns
        may legally be created.
        """
        if self.check_type == "is_in":
            return F.col(self.col).isin(*self.values)
        elif self.check_type == "non_null":
            return F.col(self.col).isNotNull()
        raise ValueError(f"Unsupported check type: {self.check_type}")
which you would use this way to generate the checks:
# Declared at module level so the checks are visible both to the Output
# definition (via .check) and to the transform body (via .column()).
custom_checks = [
    CustomCheck(
        col="col_A",
        check_type="is_in",
        name="colA_is_valid",
        on_error="WARN",
        description="col_A has expected values",
        values=["foo", "bar"],
    ),
    CustomCheck(
        col="col_B",
        check_type="non_null",
        name="col_B_is_valid",
        on_error="FAIL",
        description="col_B has non null values",
    ),
]
@transform_df(
    Output(
        "ri.foundry.main.dataset.xxx",
        checks=[custom_check.check for custom_check in custom_checks],
    ),
    df=Input("ri.foundry.main.dataset.yyy"),
)
def compute(df):
    """Append one boolean column per custom check, aliased with its check name."""
    check_columns = [check.column().alias(check.name) for check in custom_checks]
    return df.select("*", *check_columns)