Here is a minimal example:
from transforms.api import transform, Input, Output


@transform(
    output=Output("/path/to/bucketed/flights/dataset"),
    source=Input("/path/to/flights/dataset"),
)
def compute(output, source):
    output.write_dataframe(
        # Repartition on the bucket column so each runway value lands in a
        # single partition before bucketing (see the note below).
        source.dataframe().repartition("dep_runway_actual"),
        bucket_cols=["dep_runway_actual"],
        bucket_count=200,
        sort_by=["dep_runway_actual"],
    )
In the above example, I include sort_by in addition to bucket_cols because, in principle, this allows downstream consumers to take advantage of both the bucketing and the sorting when performing a sort-merge join. However, this only makes a difference if the BUCKET_SORTED_SCAN_ENABLED Spark property is enabled on the downstream job (and, as noted at https://www.palantir.com/docs/foundry/optimizing-pipelines/projections-overview/#join-projected-and-non-projected-datasets, a bucket sorted scan does not necessarily improve performance).
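For illustration, here is a rough sketch of what a downstream consumer might look like, assuming BUCKET_SORTED_SCAN_ENABLED is exposed as a Spark profile in your environment and can be applied via the configure decorator; the second input path, the output path, and the join details are placeholders, not part of the original example.

from transforms.api import configure, transform, Input, Output


# Hypothetical downstream job: applies the bucket-sorted-scan profile and
# joins on the bucket/sort column so Spark can exploit the existing layout.
@configure(profile=["BUCKET_SORTED_SCAN_ENABLED"])
@transform(
    output=Output("/path/to/joined/output"),
    flights=Input("/path/to/bucketed/flights/dataset"),
    runways=Input("/path/to/runway/reference/dataset"),
)
def compute(output, flights, runways):
    joined = flights.dataframe().join(
        runways.dataframe(),
        on="dep_runway_actual",
        how="left",
    )
    output.write_dataframe(joined)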
Also note that in the above example, I call repartition on the dataframe in addition to specifying bucket columns in the write settings. This is recommended because Spark performs bucketing separately for each partition, so if the same value of the bucket column exists in multiple partitions, you can end up with an excessive number of output files.
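To make the file-count concern concrete, here is a rough contrast between the two write calls; the partition counts in the comments are hypothetical and the exact numbers depend on how your data is distributed.

# Without the repartition: if the incoming dataframe has, say, 1000 partitions
# and each partition contains rows for many runways, Spark buckets each
# partition independently, so the output can approach
# 1000 partitions x 200 buckets worth of small files.
output.write_dataframe(
    source.dataframe(),  # no repartition
    bucket_cols=["dep_runway_actual"],
    bucket_count=200,
)

# With the repartition: all rows for a given runway are first collected into
# the same partition, so each bucket is written from far fewer partitions and
# the total number of output files stays much smaller.
output.write_dataframe(
    source.dataframe().repartition("dep_runway_actual"),
    bucket_cols=["dep_runway_actual"],
    bucket_count=200,
)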
Please see my earlier answer for a discussion of the differences between bucketing and using a join-optimized projection (depending on the details of your use case, such as whether your pipeline is incremental, creating a join-optimized projection may be the correct way to achieve your objectives here).