Run dask in parallel doesn't work as expected, in distributed kubernetes pods

Hi everyone,

We’re trying to run a 1gb data in parallel using dask, on 64 pods (in kubernetes), and by looking at the report, it looks like there is a lot of idle time (blank spots) in which the workers don’t do any job.
We have a 1 million records, and we submit batch of 1000 records for each worker. We tried to use a smaller batch of 500 records, and got the same results.
When we’re trying to increase to 128 pods, it even start to fail with timeout -

File “/usr/local/lib/python3.9/dist-packages/distributed/comm/”, line 317, in connect
raise OSError(
OSError: Timed out trying to connect to tcp://dask-anaplan-170cf546-7.pai-non-prod-mercury-perf:8786 after 120 s

Do you know what could be the problem for the workers not to be fully utilized?
And also why does it fail on 128 pods?

Thanks in advance.

Hello. Would be necessary to have some more information about the workload.
What is the amount of resources for each worker pod ? Probably would be better to have fewer pods
with more resources.
Another thing that could be useful is to inspect the dask worker profiles

Hi @moranRe, welcome here!

As @u_ser said, we lack some information to better help here. Could you produce some code snippet of your workload?

1GB data and 64 pod, so 64 workers, sounds a bit too much workers for the task. Also 1000 records batch sounds small: how large is each batch of data?

Thanks both @u_ser and @guillaumeeb for the answer. Each worker pod has 4 cpu and 15Gb memory. Each batch of data is about 1mb, even less. We aim to process data of few gb (~5gb), but currently we are testing it on overall data of 1gb. We wanted to see that adding more pods (more resources) will give us the ability to scale, so we’ll be able to support more data in the future. The problem is that when we increase the number of pods, and accept it to be faster, by adding more parallelism, we get about the same runtime when we run with 32 pods, 64 pods and also 128 pods. If I’ll run it with fewer pods, how will I gain more parallelism? Even when running with worker pods that has more resources, shouldn’t I need to define it to work on multiprocess so it will parallel the tasks in a single pod? And if so, how do I combine 2 different worker types - pods and processes? Or am I missing something here?
I also tried to run it with a 2000 records in each batch, and got pretty much the same results.
This is the function that we submit to the workers -

Thanks in advance.

This means that with 32 pods, you’ve got 128 tasks that could run in parallel, this is already a lot for only 1GB of data!

These are really small batches. from what type of storage are you reading the data? Are you not IO bound? How much time does take a single batch processing?

Which cluster manager are you using? dask-kubernetes?

I think that one pod can either run one threaded worker, or multiple worker processes, it depends on what you are using to launch pods.

Thanks @guillaumeeb for the information.
That’s why it seems very strange to me that it takes a lot of time, and we don’t get full parallelism.
We use S3 storage, which we read the data from, before sending it to dask. Inside the task that we send to dask, we also have a step at the end that writes statistics to s3, so I thought at the begining that we’re IO bound, but I saw that most of the time the task is spending is not in this step, but in the training step.
It’s our data science code in which they train the data, and use sklearn for that. So it looks like most of the time is spending there. A task of a single batch takes a few minutes (can get to even 7min).
But even if it takes quite a while, I still expect it not to be 2 and a half hours with 64 pods, and even 128 pods. WDYT?

We use dask-kubernetes to manage the cluster.
Can you please elaborate how do I define the pod to run with multiple worker processes? Maybe it can help.

Thank you very much.

If you are using KubeCluster — Dask Kubernetes 2021.03.0+159.g6d54d27 documentation, then you should be able to achieve this by specifying a custom worker_command kwarg with the --nworkers option into it. I’m not sure it will play well with auto-scaling, but it should work, or am I mistaken @jacobtomlinson?

Anyway, I’m not sure this really is your problem. In order to really help, we would need a Minimum Reproducible Example, or at least some code snippet of how you are reading the data and submitting work to Dask. Which Dask API are you using and how? If each task is taking a few minutes, there’s no reason you see idle workers if you have enough tasks planned in your task graph.

Hi @guillaumeeb , sorry for the late response. Thank you very much for the explanation. It is really hard for me to put a snippet code of the all flow, because it’s a code of data science, and the function that is submitted to dask contains a lot of functions in it, which makes it very hard to debug. But what i did find is that in each worker pod it seems that the memory increases more and more with each task it handles.
When I change the return statement in this submitted function to return an empty list, I see that the the memory is ok and doesn’t change. Also when I change the return instead of returning the output it returns today, to return the same input it got, the memory doesn’t change as well.
So I’m trying to understand if we have a memory leak, or maybe it’s something to do with the memory management in dask, and how the results are stored in the distributed memory of the worker nodes.
If you can elaborate on how the results are stored, it will be great (I read the documentation, but maybe you can add more to that). Do you think it could be related? Or do you have any idea what can I do to better understand the issue?

This is how we set the cluster -

extra_cluster_config["pod_template"] = make_pod_spec(
cluster = KubeCluster(scheduler_service_wait_timeout=240, **extra_cluster_config)
cluster.adapt(minimum=minimum_workers, maximum=maximum_workers)

This is how we set the client -

self._client = Client(cluster, timeout=120)

This is how we submit task to dask - We call the submit from a loop, for each batch of items -

for batch_number, first_batch_index in enumerate(
                range(0, len(items_to_predict), NUM_OF_ITEMS_IN_BATCH), start=1
    self._client.submit(func, *args, **kwargs, key=unique_task_name)

And this is how we collect the results: (We also used the gather api)

        n_tasks = len(tasks)
        results = []
        task_idx = 1
        for task in as_completed(tasks):
            result = task.result()
            logger.debug(f"Gathered {task_idx} task out of {n_tasks}")
            task_idx = task_idx + 1
        return results

Thank you very much in advance,

Sorry for the delay in responding. I don’t think the gaps in your scheduling are related to your use of dask-kubernetes.

I took the liberty to edit your post for more clarity using code cells.

Well, if you generate new data, it will be kept into Worker memory until you gather it back and release the Future somehow. But I don’t think this is related to your original problem.

There is something I don’t sure I understand here, looking at the Dashboard screenshot, your tasks seem more in the order of a few seconds each, so what’s the reality?

Here also, it does not seem to correspond to the screenshot: in the screenshot, you’ve got different colors for each tasks, which means there’s different steps in the graph, not just a submission of tasks within a for loop. And what do you mean by

for each batch of items


I don’t either, I think this is more related to the workflow somehow.

In the end, we would need a Minimum Reproducible Example: some simple code that generates these gaps on your Cluster. What do you observe when just submitting a bunch of sleeping tasks?

Hi @guillaumeeb. Thank you very much for your answer.
After further investigation, I see that in the beginning of the run of each worker pod, the tasks indeed takes a few seconds, but towards the end of the running, it becomes a few minutes.

What I mean by “for each batch of items” is that we have a dataframe of 1M items, and we split it into batches of 1000 items in each batch. And this data, along with other parameters, we send to a function called “new_instance_batch_seasonality_and_fit_predict” which we submit to dask.

And what do you mean by different steps in the graph? If you can elaborate about the report, it will be great, because it’s not so clear to us as well. What are the colors mean, and why do I see some lines that starts later on?

What I did found, when looking at some of the worker pods, is that in the middle of the run, the memory suddenly increases within a few minutes much more that it was. For example, it can start with ~3gb for half an hour, and then all of a sudden, it will increase to 9gb or even 14gb. And it happen for those workers at the same time. When I look at what happened during that time, I see that the scheduler retired the workers, and it looks like it happened because of the active memory manager.
Up until that moment, it seems that the workers were working in a good rate, but then they started to work slower.
Could you maybe explain a little bit about the active memory manager?

I think it will be hard for us to send a Minimum Reproducible Example, because I see it happened only when we return this output from the tasks. But I’'ll try to see if I can, and also submit some sleeping tasks and see what happens.

Thank you very much for your help,

Is that expected?

How do you do that, how do you read the input data and dispatch it to Workers? Couldn’t you use Dask DataFrame? Reading the data on Client side and then shipping it to Dask is a bad practice and could be one reason of what you’re seeing. Data should be read on worker side.

Your graph is showing rectangles with different colors, one colors usually correspond to one source function, so I was assuming that you had several steps into your computations that explaines those colors. But I’m under the impression that you just submit a bunch of task with the same function to be applied on different pieces of Data is that it? If you hoover with your mouse over the rectangles, what name does Dask shows?

Either time for data to be shipped to workers, either time to start these workers, especially with adaptive mode.

Well, by looking at your code, this does not seem to come from Dask or your input data, so it is probably related to what you do inside the function you are executing on the Dask Cluster. Scheduler is retiring workers before they go out of memory, you must try to understand why this is happening. Why at some point your tasks begin to eat up all the memory? What are your returning as output, what size, type, etc… ?