Multi-region clusters

I’m looking at using Dask for distributing computation on big (weather and climate) datasets between data centers. The goal is to automagically process data where it resides, but pass results between workers when required.

I’m finding that the scheduler will assign tasks to workers that do not have a chunk of data if all workers that do are busy. Is there any way to prevent this, perhaps by influencing the scheduler’s assessment of the cost of copying chunks?
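For reference, I can already pin individual tasks to particular workers with `workers=` and `allow_other_workers=` on `Client.submit` — here is a minimal sketch on a LocalCluster (my real clusters span data centers, so this is illustrative only):

```python
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

# Pick a concrete worker address from the running cluster
first_worker = list(client.scheduler_info()["workers"])[0]

# Place a chunk on that worker...
[chunk] = client.scatter([list(range(10))], workers=[first_worker])

# ...and restrict the dependent task to it; allow_other_workers=False
# (the default) forbids the scheduler from running it anywhere else
future = client.submit(sum, chunk, workers=[first_worker])
result = future.result()
print(result)  # 45

client.close()
cluster.close()
```

This works for hard placement, but it doesn't express a *cost* of copying — hence my question about influencing the scheduler's assessment instead of pinning outright.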

Ah, so I’ve tried dask.config.set({'scheduler.work-stealing': True}), but the call fails.

import dask

with dask.config.set({'scheduler.work-stealing': True}):
    pass

is fine, but

from dask.distributed import Client
client = Client()

with dask.config.set({'scheduler.work-stealing': True}):
    pass

fails with

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [3], in <module>
----> 1 with dask.config.set({'scheduler.work-stealing': True}):
      2     pass

File ~/miniconda3/envs/irisxarray/lib/python3.10/site-packages/dask/config.py:310, in set.__init__(self, arg, config, lock, **kwargs)
    308     for key, value in arg.items():
    309         key = check_deprecations(key)
--> 310         self._assign(key.split("."), value, config)
    311 if kwargs:
    312     for key, value in kwargs.items():

File ~/miniconda3/envs/irisxarray/lib/python3.10/site-packages/dask/config.py:370, in set._assign(self, keys, value, d, path, record)
    368     # No need to record subsequent operations after an insert
    369     record = False
--> 370 self._assign(keys[1:], value, d[key], path, record=record)

File ~/miniconda3/envs/irisxarray/lib/python3.10/site-packages/dask/config.py:362, in set._assign(self, keys, value, d, path, record)
    360         else:
    361             self._record.append(("insert", path, None))
--> 362     d[key] = value
    363 else:
    364     if key not in d:

TypeError: 'str' object does not support item assignment

This latter failure may not be a bug as such: https://github.com/dask/distributed/issues/5752


@dmcg Welcome to Discourse, and thanks for creating the issue!

Since this is related to Understanding Work Stealing, I think we can continue this discussion there.

To close the loop on this topic:

  • we need to use distributed.scheduler.work-stealing here, and
  • setting the config values using the config file or environment variables is recommended (because some config values are set only during scheduler start-up)
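For anyone landing here later, a minimal sketch of the corrected key name in Python (the config-file or environment-variable routes above remain the recommended way to affect scheduler start-up):

```python
import dask

# The full key path is "distributed.scheduler.work-stealing",
# not "scheduler.work-stealing"
with dask.config.set({"distributed.scheduler.work-stealing": False}):
    stealing = dask.config.get("distributed.scheduler.work-stealing")
print(stealing)  # False

# Equivalently, before starting the scheduler:
#   export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=False
# or in ~/.config/dask/distributed.yaml:
#   distributed:
#     scheduler:
#       work-stealing: false
```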