The problem of slow big data transmission in dask workers

While using Dask recently, I ran into slow data transfer from the workers.

I created 32 workers; after completing its task, each worker transfers about 1GB of data back.

However, the transfer rate of each worker is very low, around a few hundred KB per second, resulting in a long transmission time.

Often, it takes 2-3 minutes for all workers to complete the task, while the transfer takes 7-8 minutes.

Now, I want to increase the transmission speed to improve performance.

So how can I solve this problem? Thanks a lot.

Hi @yangtong93, welcome to Dask discourse forum!

Would you be able to share some reproducer or at least some code snippet of what you are doing?

Also, what is your setup, are you on a LocalCluster?

Worker transfer rates should be as fast as the network allows, but it might really depend on what you’re sending/gathering back and how.

Hi @guillaumeeb, thank you for your reply!

I am having these workers read the images, perform some image processing, and then transfer them to the main program.

My code snippet is roughly like this:

client = Client('127.0.0.1:8786')
futures = client.map(self.fuse, parameters)
for future in futures:
    index, image = future.result()
    marged_image[index] = image

def fuse(self, parameters):
    images = self.read_tile(parameters)
    images = apply_profile_correct(images, parameters)
    img = self.fuse_image_slice(images, self._shifts)
    return index, img

All tasks are completed locally. These are the commands I use to start the scheduler and the workers:

dask scheduler
dask worker 127.0.0.1:8786 --nworkers auto

So, I want to know the correct transmission method.

In addition, I attempted to transfer 12GB of data using a single worker and the transfer was completed within 4 seconds.

So, I suspect that there is competition between multiple workers when transmitting data, which has led to a decrease in overall speed.

What happens if you try to use the as_completed function to get the results as soon as they end?

Hi @guillaumeeb, thank you for your reply!

I tried the method you said, and the result is still the same as before.

Is there any other way I can try it out?

I see no reason why the transfer rate would be lower with multiple Workers than with a single one. Did you use the same code?

In the code, you are retrieving results one by one in a loop, so there is no competition.

Could you build a complete reproducer by generating random data?

Hi @guillaumeeb, I used some test code to simulate my usage scenario.

from dask.distributed import Client, as_completed
import numpy as np
from pympler import asizeof
import time


def fuse(index):
    n2 = time.time()
    array_size = 1024 * 1024 * 1024  

    random_array = np.random.randint(0, 65536, size=array_size // 2, dtype=np.uint16)

    img = random_array.reshape(512, -1)
    memory_usage_gb = img.nbytes / (1024**3)
    print(index,memory_usage_gb,'GB')
    n3 = time.time()
    return index,img,n2,n3

client = Client('127.0.0.1:8786')
task_count = 80
marged_image = {}
parameters = list(range(task_count))
futures  = client.map(fuse, parameters)
n0 = time.time()
for future in as_completed(futures):
     n1 = time.time()
     index,image,n2,n3 = future.result()
     n4 = time.time()
     marged_image[index] = image
     n5 = time.time()
     print('index[{}] n1-n0[{:.1f}]s  n3-n2[{:.1f}]s n4-n3[{:.1f}]s n5-n4[{:.1f}]s'.format(index,n1-n0,n3-n2,n4-n3,n5-n4))

end = time.time()
print('time spend: [{:.1f}]s'.format(end-n0))

memory_usage_gb = asizeof.asizeof(marged_image)/ (1024**3)
print(memory_usage_gb, 'GB')

The running result of the above code is as follows:

From the test results, it can be seen that the time it takes for the worker to transfer the result data to the main program is becoming longer and longer.

In addition, I used a single worker to transfer 20GB of data, and the following are the test results:

Hi @yangtong93, thanks for working on this!

Based on your example and results, I don’t think the transfer time itself is increasing. What is increasing is the time between the end of a task and the gathering of its result, and that sounds perfectly normal. Let me explain: since you have 32 workers, 32 tasks end at almost the same time. But since you are gathering results sequentially, and it takes about 2 seconds for each result to be gathered back to the client, you’ll have an n4-n3 time of about 32*2s for the 32nd result. The per-result rate is about the same as what you see with a single Worker: 1GB takes about 2 seconds to transfer.

So in this example, it all comes down to two facts: results are being transferred sequentially to the client, and the task processing time for 32 tasks in parallel is much lower than the time it takes to gather 32GB of results.
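
To make the arithmetic concrete, here is a minimal sketch of that reasoning. It assumes, as a simplification, that all tasks finish at the same instant and that each result takes a fixed 2 seconds to reach the client; the function name gather_wait_times is only illustrative.

```python
def gather_wait_times(n_tasks, seconds_per_transfer):
    """Delay between task end and arrival at the client, in gather order.

    Simplifying assumptions: every task ends at t=0 and results are
    transferred strictly one after another at a constant rate.
    """
    return [(k + 1) * seconds_per_transfer for k in range(n_tasks)]


waits = gather_wait_times(n_tasks=32, seconds_per_transfer=2.0)
print(waits[0], waits[-1])  # 2.0 64.0
```

The first result waits about 2 seconds and the last about 64 seconds, even though every individual transfer runs at the same speed.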

There is one weird thing though, the single execution of a Task is slower at the end, not sure why.

So the question is: do you really need to get the results back to the main process? Could you write the results to disk in each task? Or could you continue working on the result in a distributed way?
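
As a rough sketch of the "write to disk in each task" idea: the task saves its array and returns only a small path string, so almost nothing has to travel back to the client. The names fuse_to_disk and out_dir, the tile shape, and the .npy format are assumptions made for illustration, and the output directory must be reachable from both the workers and the client (true here since everything runs locally).

```python
import os
import tempfile

import numpy as np


def fuse_to_disk(index, out_dir):
    """Build the array on the worker, save it, and return only its path."""
    # Small stand-in for the real fused image (hypothetical shape).
    img = np.random.randint(0, 65536, size=(512, 1024), dtype=np.uint16)
    path = os.path.join(out_dir, f"tile_{index}.npy")
    np.save(path, img)
    return index, path


# Local check without a cluster: call the task function directly.
out_dir = tempfile.mkdtemp()
index, path = fuse_to_disk(0, out_dir)
print(index, np.load(path).shape)

# On the cluster the call could look like this (not executed here):
#   futures = client.map(fuse_to_disk, range(80), out_dir=out_dir)
#   paths = dict(client.gather(futures))
```

The client then loads only the tiles it actually needs, when it needs them.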

Hi @guillaumeeb, I roughly understood what you said.

I think I can adjust the process to ensure that calculations and transfers occur simultaneously, rather than completing all calculations before transferring.

For example, I can use submit to replace the map and let the worker who completes the task first transfer it. I’m going to give it a try.

Finally, thank you for your guidance and for resolving my doubts.

I was probably unclear: the first transfer starts right after the first task ends, and other calculations are started on the same worker during the transfer. But almost all the 32 tasks will end at the same time on each worker, and the transfers will be done sequentially for each of those. You will encounter the same behavior using submit.

Hi @yangtong93 ,

By default, when you call <collection>.compute() or client.gather(), everything transits from the workers to the scheduler and then from the scheduler to the client. If you are sending many GBs of data, this can easily K.O. the host the scheduler runs on, and cause networking on the scheduler to become a bottleneck.

As @guillaumeeb suggested above, the best practice in these cases is to have the workers write to disk directly, without returning the data to the client.

If you can’t do that, you should enable direct transfers with Client.gather(..., direct=True), which bypasses the scheduler. Note that your client will need to be able to contact the workers directly, which your network topology/firewall may or may not allow.
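
A minimal sketch of what a direct gather could look like. The helper name gather_bypassing_scheduler is hypothetical; it only wraps the Client.gather(..., direct=True) call mentioned above, and the commented usage assumes the scheduler address and the fuse function from earlier in this thread.

```python
def gather_bypassing_scheduler(client, futures):
    """Fetch results straight from the workers that hold them.

    `client` is expected to be a dask.distributed.Client; direct=True asks
    it to contact the workers instead of routing data via the scheduler.
    """
    return client.gather(futures, direct=True)


# Usage against the cluster from this thread (not executed here):
#   from dask.distributed import Client
#   client = Client('127.0.0.1:8786')
#   futures = client.map(fuse, range(80))
#   results = gather_bypassing_scheduler(client, futures)
#
# The same preference can also be set once when creating the client:
#   client = Client('127.0.0.1:8786', direct_to_workers=True)
```

Gathering everything in one call still pulls all results into client memory, so this only helps when the client really needs the data.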

Hi @guillaumeeb @crusaderky,
Setting direct_to_workers=True reduced the transfer time. The transfer is no longer limited by the scheduler.

Thank you for your reply, it has helped me a lot.
