We have a use case where we use Dask with `map_partitions` to apply various compute-intensive transformations to JSON-based data (~200 GB per run). Most of this is done in pandas using vectorized operations where possible, with a lot of pre-processing via `pandas.json_normalize`.
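For context, this is roughly the shape of that step; the file paths, the `value` column and the transform logic are placeholders, not our actual code:

```python
import json

import dask.dataframe as dd
import pandas as pd
from dask import delayed


@delayed
def load_and_flatten(path):
    # Read one JSON-lines file and flatten the nested records with
    # pandas.json_normalize, as in our pre-processing step.
    with open(path) as f:
        records = [json.loads(line) for line in f]
    return pd.json_normalize(records)


def transform(df):
    # Stand-in for the compute-intensive, vectorized pandas transformations.
    return df.assign(value=df["value"].astype("float64") ** 2)


paths = ["part-0001.json", "part-0002.json"]                 # placeholder input files
meta = pd.DataFrame({"value": pd.Series(dtype="float64")})   # placeholder schema

ddf = dd.from_delayed([load_and_flatten(p) for p in paths], meta=meta)
ddf = ddf.map_partitions(transform)
```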
The usual workflow is:
- Process
- Repartition
- Store to parquet (using `snappy` compression), as sketched below
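A minimal sketch of the repartition-and-store step, again with placeholder data, paths and partition counts:

```python
import dask.dataframe as dd
import pandas as pd

# Toy frame standing in for the ~12,000-partition processed DataFrame.
pdf = pd.DataFrame({"id": range(1_000), "value": range(1_000)})
processed = dd.from_pandas(pdf, npartitions=100)

# Collapse into fewer, larger partitions before writing, so the parquet
# files come out a sensible size.
repartitioned = processed.repartition(npartitions=10)

repartitioned.to_parquet(
    "output/",                # placeholder; in production this is an S3 path
    compression="snappy",
    write_index=False,
)
```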
The task graph for this looks roughly how I would (with my limited Dask knowledge) expect it to:
i.e. lots of computation at the beginning, followed by data transfers during repartitioning.
There are roughly 12,000-13,000 partitions per job, and we start around 48 t3.large instances (2 vCPUs, 8 GB memory each).
I have noticed that whenever the compute-intensive task runs, the Tasks Processing view in the Status Dashboard shows something like the following (note that this was a run using 24 instances, but the behaviour still holds):
The workers also seem to have quite a lot of unmanaged memory, and it does happen that a worker dies due to the memory limit being reached.
Besides adjusting `memory_limit` (a sketch of where we would set this is included below), is the fact that tasks are being, for lack of a better technical term, “jostled around” between workers so much indicative of a deeper, systemic problem with our approach?
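For reference, this is the kind of place where `memory_limit` and the worker memory thresholds would be adjusted; the fractions and per-worker limit below are purely illustrative, not a recommendation:

```python
import dask
from dask.distributed import Client, LocalCluster

# Example values only -- not what we currently run with.
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling to disk
    "distributed.worker.memory.spill": 0.70,      # spill more aggressively
    "distributed.worker.memory.pause": 0.80,      # pause accepting new tasks
    "distributed.worker.memory.terminate": 0.95,  # restart the worker
})

cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    memory_limit="7GB",  # per-worker cap, roughly matching a t3.large
)
client = Client(cluster)
```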