Slow transfer of large results from Dask workers

Recently, while using Dask, I ran into an issue with slow data transfer from workers.

I created 32 workers, and after each worker completes its task, it transfers around 1 GB of data back.

However, each worker's transfer rate is very low, only a few hundred KB per second, so the transfer takes a long time.

Often all the workers finish their tasks in 2-3 minutes, while the transfer takes 7-8 minutes.

Now, I want to increase the transmission speed to improve performance.

So how can I solve this problem? Thanks a lot.

Hi @yangtong93, welcome to Dask discourse forum!

Would you be able to share some reproducer or at least some code snippet of what you are doing?

Also, what is your setup, are you on a LocalCluster?

Worker transfer rates should be as fast as network allows it, but it might really depend on what you’re sending/gathering back and how.

Hi @guillaumeeb, thank you for your reply!

I am having these workers read the images, perform some image processing, and then transfer them to the main program.

My code snippet is roughly like this:

client = Client('')
futures =, parameters)
merged_image = {}
for future in futures:
    index, image = future.result()
    merged_image[index] = image

def fuse(self, parameters):
    images = self.read_tile(parameters)
    images = apply_profile_correct(images, parameters)
    img = self.fuse_image_slice(images, self._shifts)
    return index, img

All tasks run locally. These are the commands I use to start the scheduler and workers:

dask scheduler
dask worker --nworkers auto

So I would like to know the correct way to transfer the results.

In addition, I tried transferring 12 GB of data with a single worker, and the transfer completed within 4 seconds.

So I suspect there is contention between multiple workers when transferring data, which reduces the overall speed.

What happens if you try to use the as_completed function to get the results as soon as they end?

Hi @guillaumeeb, thank you for your reply!

I tried the method you suggested, but the result is the same as before.

Is there anything else I can try?

I see no reason why the transfer rate would be lower with multiple workers than with a single one. Did you use the same code?

In the code, you are retrieving results one by one in a loop, so there is no competition.

Could you build a complete reproducer by generating random data?

Hi @guillaumeeb, I used some test code to simulate my usage scenario.

import time

import numpy as np
from dask.distributed import Client, as_completed
from pympler import asizeof

def fuse(index):
    n2 = time.time()
    array_size = 1024 * 1024 * 1024  # 1 GiB of data per task

    random_array = np.random.randint(0, 65536, size=array_size // 2, dtype=np.uint16)

    img = random_array.reshape(512, -1)
    n3 = time.time()
    return index, img, n2, n3

client = Client('')
task_count = 80
merged_image = {}
parameters = list(range(task_count))
futures =, parameters)
n0 = time.time()
for future in as_completed(futures):
    n1 = time.time()
    index, image, n2, n3 = future.result()
    n4 = time.time()
    merged_image[index] = image
    n5 = time.time()
    print('index[{}] n1-n0[{:.1f}]s n3-n2[{:.1f}]s n4-n3[{:.1f}]s n5-n4[{:.1f}]s'.format(
        index, n1 - n0, n3 - n2, n4 - n3, n5 - n4))

end = time.time()
print('time spent: [{:.1f}]s'.format(end - n0))

memory_usage_gb = asizeof.asizeof(merged_image) / (1024**3)
print(memory_usage_gb, 'GB')

The running result of the above code is as follows:

From the test results, you can see that the time it takes for workers to transfer their result data back to the main program grows longer and longer.

In addition, I used a single worker to transfer 20 GB of data, and the following are the test results:

Hi @yangtong93, thanks for working on this!

Based on your example and results, I don’t think the transfer time itself is increasing. What is increasing is the time between the end of a task and the gathering of its result, and that sounds perfectly normal. Let me explain: since you have 32 workers, 32 tasks end at almost the same time. But results are gathered sequentially, and it takes about 2 seconds for each result to reach the client, so the 32nd result will show an n4-n3 time of about 32 * 2 s. This is consistent with what you see with a single worker: 1 GB takes about 2 seconds to transfer.

So in this example, it all comes down to the fact that results are transferred sequentially to the client, and that processing 32 tasks in parallel takes much less time than gathering 32 GB of results.
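The arithmetic behind this can be sketched in a few lines (the 2 s per 1 GB figure is taken from the timings above; the numbers are illustrative, not measured here):

```python
n_tasks = 32                 # tasks finishing at roughly the same time
per_result_transfer_s = 2.0  # observed: ~2 s to gather one 1 GB result

# Results are gathered one at a time, so the i-th result waits behind
# the (i - 1) transfers before it; the last one waits the longest.
wait_for_last = n_tasks * per_result_transfer_s
print(wait_for_last)  # 64.0 -> about a minute just to drain the queue
```

This matches the pattern in the output: n4-n3 grows roughly linearly with the position of the result in the gather order.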

There is one odd thing though: individual task execution gets slower toward the end, and I’m not sure why.

So the question is: do you really need to get the results back to the main process? Could you write the results to disk in each task? Or could you continue working on the result in a distributed way?
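As a rough sketch of the write-to-disk idea: `fuse_and_save`, the tile shape, and the output path below are hypothetical stand-ins for the real pipeline, and it assumes the workers write to a filesystem the client can also read.

```python
import numpy as np

def fuse_and_save(index, shape=(512, 1024)):
    # hypothetical stand-in for the real fuse(): build the image on the
    # worker, persist it there, and return only a tiny (index, path) tuple
    img = np.random.randint(0, 65536, size=shape, dtype=np.uint16)
    path = f"tile_{index}.npy"  # assumes a shared/visible filesystem
    np.save(path, img)
    return index, path

# On the client side, only a few bytes per task travel back, e.g.:
# futures =, parameters)
# for index, path in client.gather(futures):
#     image = np.load(path, mmap_mode="r")  # load lazily when needed
```

The key point is that the gather step now moves tuples of a few bytes instead of gigabyte-sized arrays.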

Hi @guillaumeeb, I roughly understood what you said.

I think I can adjust the process to ensure that calculations and transfers occur simultaneously, rather than completing all calculations before transferring.

For example, I can use submit instead of map and let whichever worker finishes first transfer its result first. I’m going to give it a try.

Finally, thank you for your guidance and for resolving my doubts.

I was probably unclear: the first transfer starts right after the first task ends, and other calculations are started on the same worker during the transfer. But almost all the 32 tasks will end at the same time on each worker, and the transfers will be done sequentially for each of those. You will encounter the same behavior using submit.

Hi @yangtong93 ,

By default, when you call <collection>.compute() or client.gather(), everything transits from the workers to the scheduler and then from the scheduler to the client. If you are sending many GBs of data, this can easily K.O. the host the scheduler runs on and cause networking on the scheduler to become a bottleneck.

As @guillaumeeb suggested above, the best practice in these cases is to have the workers write to disk directly, without returning the data to the client.

If you can’t do that, you should enable direct transfers with Client.gather(..., direct=True), which bypasses the scheduler. Note that your client will need to be able to contact the workers directly, which your network topology/firewall may or may not allow.


Hi @guillaumeeb, @crusaderky,

By setting direct_to_workers=True, I was able to reduce the transfer time. Network transfer is no longer limited by the scheduler.

Thank you for your reply, it has helped me a lot.
