I have a 2TB dataset to which I want to apply some basic operations (a rename and a regex). When I run this build with 64 executors and “executor memory large” I still run into OOM issues. Given that I am not using any joins/windows/aggregations, I know I could safely process the input dataset in multiple chunks.
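The transform is essentially just this (a sketch with placeholder names, since the real columns and regex are redacted):

```python
from pyspark.sql import functions as F

# Minimal sketch of the build: one rename plus one regexp_extract.
# "othercol", "col2", "col3" and "<pattern>" are placeholders, not the real
# names; the real pattern contains a capturing group.
out = (
    df
    .withColumnRenamed("othercol", "col2")
    .withColumn("col3", F.regexp_extract("col3", r"<pattern>", 1))
)
```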
What is the best way to go about this?
It sounds like your operation should not involve a shuffle. I’d recommend looking at the partition sizes. Do you have any particularly large partitions? Maybe you have partition size skew?
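A quick way to eyeball skew is to count rows per partition (row counts are only a proxy for in-memory size, but large outliers usually show up):

```python
from pyspark.sql import functions as F

# Rows per Spark partition, largest first. A handful of partitions with far
# more rows than the rest would point at skew.
(
    df.groupBy(F.spark_partition_id().alias("partition_id"))
      .count()
      .orderBy(F.desc("count"))
      .show(20)
)
```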
Looking at the build, there is zero shuffle write and no disk spill. The biggest file in the input (which I know doesn’t map exactly to partitions) is 19MB, so I wouldn’t expect there to be issues. Is there something I’m missing?
Hard to tell without seeing the build and the plan. The 19MB will be compressed, so there could potentially be a very large amount of data once uncompressed, but that would be quite a high compression ratio.
Assuming here that you are talking about an executor OOM: if you have 64 executors and “executor memory large”, you could reduce down to a single executor core, which would provide more memory per executor core. Alternatively, moving to “dynamic_allocation_max_128” would give you more resources, as would “executor memory extra large”. You could increase these and look at the stats from the successful build to understand how to improve.
Are you sure it is not a driver OOM, which would require increased driver memory?
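For reference, those settings are applied through Spark profiles on the transform, roughly like this (a sketch; the exact profile names available depend on what is enabled in your repository, and the dataset paths are placeholders):

```python
from transforms.api import configure, transform_df, Input, Output

# Example profile combination: more executor memory plus a higher dynamic
# allocation cap. Swapping in a 1-core executor profile instead would raise
# the memory available to each task. Profile names and paths here are
# illustrative assumptions, not taken from your build.
@configure(profile=["EXECUTOR_MEMORY_EXTRA_LARGE", "DYNAMIC_ALLOCATION_MAX_128"])
@transform_df(
    Output("/path/to/output_dataset"),
    df=Input("/path/to/input_dataset"),
)
def compute(df):
    return df
```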
I think the compression ratio is indeed really high, since it’s timeseries data with a lot of repeated values (e.g. each timeseries_id will have millions of repeats, and sometimes the signal is equally repeated).
The OOM is definitely on the executors, as it’s stated on the build failure.
This is the physical plan (with redactions):
+- WriteFiles
   +- CollectMetrics 0, [count(1) AS count(1)#41L, size_in_bytes(0, 0) AS size_in_bytes()#42L, count(col1#25) AS count(col1)#43L, count(col2#30L) AS count(col2)#44L, count(col3#20) AS count(col3)#45L, count(col4#10) AS count(col4)#46L]
      +- *(1) Project [col#7 AS col1#25, othercol#8L AS col2#30L, regexp_extract(col3#9, <pattern>, 1) AS col3#20, col4#10]
         +- *(1) Filter NOT isnan(col1#7)
            +- *(1) ColumnarToRow
               +- BatchScan parquet <path>/files/spark[col#7, othercol#8L, othercol2#9, othercol3#10] FoundryParquetScan DataFilters: [NOT isnan(col1#7)], Format: foundryparquet, Location: FoundryFileIndexV2(1 paths)[<path>..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<col1:double,col2:bigint,col3:string> RuntimeFilters: []
You said that reducing the number of executor cores would help, correct? So not the executor count?
Yes. If you have 10g of memory and 2 cores, you will be processing two tasks per executor at once. To simplify a bit: one task might need 6g after uncompressing, so if the second task needs more than 4g you’ll get an OOM.
By reducing the executor core count to 1 you ensure that this conflict doesn’t happen, and each task gets the full 10g without causing an OOM.
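In raw Spark terms, the knobs are the executor memory and cores settings (in Foundry you would normally set these via profiles rather than directly; the numbers below just mirror the example above):

```python
from pyspark.sql import SparkSession

# Sketch only: with 10g and 2 cores, two concurrent tasks share the heap
# (~5g each on average); with 1 core, a single task gets the full 10g.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "10g")
    .config("spark.executor.cores", "1")
    .getOrCreate()
)
```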