How to disable the distributed scheduler

To put the problem simply: I am required to import a bunch of libraries built on dask/xarray, and somewhere in the import tree dask is being configured to use the distributed scheduler. For various reasons this is undesirable, and I would like to go back to the local scheduler, but I cannot seem to do so. Or, at least, I do not know how to verify that it has gone back. I have tried attaching a ProgressBar, which should only work on the local scheduler, right?

Hi @jay-hennen, welcome to this forum!

I think that can only happen if a Client object is created somewhere in the code, which sounds weird and should probably be corrected in your dependencies.

Edit (2023-02-09): what I said just below is wrong! ProgressBar only works with non-distributed schedulers, and in the documentation we refer to anything that is not distributed as a ‘local’ scheduler.
That’s not correct, ProgressBar will work with any Scheduler. And I’m not sure what you mean by local scheduler: a local scheduler can be either Threaded, Multiprocess or Distributed (LocalCluster class).

Anyway, you can normally choose the scheduler to use either directly when calling the compute method:

x.sum().compute(scheduler='processes')

Or by using the Dask config (and I think this is what you should be doing):

import dask
dask.config.set(scheduler='threads')  # overwrite default with threaded scheduler

There is a lot of information about this in the following documentation:
https://docs.dask.org/en/stable/scheduler-overview.html#configuring-the-schedulers

https://docs.dask.org/en/stable/scheduling.html
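
To check which scheduler will actually be used after changing the config, something like the sketch below may help. It assumes dask.base.get_scheduler resolves the scheduler the same way compute() does, which is my understanding but worth double-checking against your dask version; the array x is only a toy example.

```python
import dask
import dask.array as da
import dask.threaded
from dask.base import get_scheduler

# Toy array, only here to have something to compute
x = da.ones((1000, 1000), chunks=(250, 250))

# Used as a context manager, the setting is restored when the block exits
with dask.config.set(scheduler="threads"):
    configured = dask.config.get("scheduler", None)  # value stored in the config
    resolved = get_scheduler(collections=[x])        # function that would run the graph
    total = x.sum().compute()

print("configured scheduler:", configured)
print("resolves to threaded scheduler:", resolved is dask.threaded.get)
print("sum:", total)
```

If the second line prints True, the computation inside the block runs on the threaded scheduler even if a distributed Client exists elsewhere in the process.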

Hello!

Perhaps my terminology was inaccurate and for that I apologize. Looking at the second link I believe what is happening is that dask is running with a local, distributed scheduler. I have been led to believe that the ProgressBar I am trying to use does not work with dask in this mode:

https://docs.dask.org/en/latest/diagnostics-local.html#progress-bar

However, if it IS expected to work, then I have to figure out why I get no output, even when I redirect it to a debug output stream.

No problem at all, I was just trying to clarify, maybe my answer was a little rough, sorry if it sounded like it.

I would be curious to know what made you believe this in the link you provided. That could help improve the documentation.

This sounds like a different problem than the one you mentioned at first. Could you share some code so that we can help?

I tried again to use ProgressBar as a context manager, and as far as I can tell it is created and context-switched correctly, but nothing is being written out. Unfortunately, the actual operation I’m trying to measure is wrapped in so many layers of libraries that I don’t know if it’ll be useful to provide the code here.

Here is the dask.config.config as I see it when the operation is initiated. Note the massive change in config once xarray is imported. I can only assume there is some kind of interaction between the MANY libraries in play here (specifically dask, xarray, intake, intake-xarray, cf-xarray, extract_model, and more).

Python 3.8.12 | packaged by conda-forge | (default, Oct 12 2021, 21:22:46) [MSC v.1916 64 bit (AMD64)]
Type 'copyright', 'credits' or 'license' for more information
IPython 7.29.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import dask

In [2]: dask.config.config
Out[2]:
{'temporary-directory': None,
 'tokenize': {'ensure-deterministic': False},
 'dataframe': {'shuffle-compression': None,
  'parquet': {'metadata-task-size-local': 512,
   'metadata-task-size-remote': 16}},
 'array': {'svg': {'size': 120}, 'slicing': {'split-large-chunks': None}},
 'optimization': {'fuse': {'active': None,
   'ave-width': 1,
   'max-width': None,
   'max-height': inf,
   'max-depth-new-edges': None,
   'subgraphs': None,
   'rename-keys': True}}}

In [3]: import xarray
C:\Users\jay.hennen\Miniconda3\envs\gnome-libgoods\lib\site-packages\scipy\__init__.py:146: UserWarning: A NumPy version >=1.16.5 and <1.23.0 is required for this version of SciPy (detected version 1.23.2
  warnings.warn(f"A NumPy version >={np_minversion} and <{np_maxversion}"

In [4]: dask.config.config
Out[4]:
{'temporary-directory': None,
 'tokenize': {'ensure-deterministic': False},
 'dataframe': {'shuffle-compression': None,
  'parquet': {'metadata-task-size-local': 512,
   'metadata-task-size-remote': 16}},
 'array': {'svg': {'size': 120},
  'slicing': {'split-large-chunks': None},
  'chunk-size': '128MiB',
  'rechunk-threshold': 4},
 'optimization': {'fuse': {'active': None,
   'ave-width': 1,
   'max-width': None,
   'max-height': inf,
   'max-depth-new-edges': None,
   'subgraphs': None,
   'rename-keys': True}},
 'distributed': {'version': 2,
  'scheduler': {'allowed-failures': 3,
   'bandwidth': 100000000,
   'blocked-handlers': [],
   'default-data-size': '1kiB',
   'events-cleanup-delay': '1h',
   'idle-timeout': None,
   'transition-log-length': 100000,
   'events-log-length': 100000,
   'work-stealing': True,
   'work-stealing-interval': '100ms',
   'worker-ttl': None,
   'pickle': True,
   'preload': [],
   'preload-argv': [],
   'unknown-task-duration': '500ms',
   'default-task-durations': {'rechunk-split': '1us', 'split-shuffle': '1us'},
   'validate': False,
   'dashboard': {'status': {'task-stream-length': 1000},
    'tasks': {'task-stream-length': 100000},
    'tls': {'ca-file': None, 'key': None, 'cert': None},
    'bokeh-application': {'allow_websocket_origin': ['*'],
     'keep_alive_milliseconds': 500,
     'check_unused_sessions_milliseconds': 500}},
   'locks': {'lease-validation-interval': '10s', 'lease-timeout': '30s'},
   'http': {'routes': ['distributed.http.scheduler.prometheus',
     'distributed.http.scheduler.info',
     'distributed.http.scheduler.json',
     'distributed.http.health',
     'distributed.http.proxy',
     'distributed.http.statics']},
   'allowed-imports': ['dask', 'distributed'],
   'active-memory-manager': {'start': False,
    'interval': '2s',
    'policies': [{'class': 'distributed.active_memory_manager.ReduceReplicas'}]}},
  'worker': {'blocked-handlers': [],
   'multiprocessing-method': 'spawn',
   'use-file-locking': True,
   'connections': {'outgoing': 50, 'incoming': 10},
   'preload': [],
   'preload-argv': [],
   'daemon': True,
   'validate': False,
   'resources': {},
   'lifetime': {'duration': None, 'stagger': '0 seconds', 'restart': False},
   'profile': {'interval': '10ms', 'cycle': '1000ms', 'low-level': False},
   'memory': {'recent-to-old-time': '30s',
    'rebalance': {'measure': 'optimistic',
     'sender-min': 0.3,
     'recipient-max': 0.6,
     'sender-recipient-gap': 0.1},
    'target': 0.6,
    'spill': 0.7,
    'pause': 0.8,
    'terminate': 0.95,
    'max-spill': False,
    'monitor-interval': '100ms'},
   'http': {'routes': ['distributed.http.worker.prometheus',
     'distributed.http.health',
     'distributed.http.statics']}},
  'nanny': {'preload': [],
   'preload-argv': [],
   'environ': {'MALLOC_TRIM_THRESHOLD_': 65536,
    'OMP_NUM_THREADS': 1,
    'MKL_NUM_THREADS': 1}},
  'client': {'heartbeat': '5s',
   'scheduler-info-interval': '2s',
   'security-loader': None,
   'preload': [],
   'preload-argv': []},
  'deploy': {'lost-worker-timeout': '15s', 'cluster-repr-interval': '500ms'},
  'adaptive': {'interval': '1s',
   'target-duration': '5s',
   'minimum': 0,
   'maximum': inf,
   'wait-count': 3},
  'comm': {'retry': {'count': 0, 'delay': {'min': '1s', 'max': '20s'}},
   'compression': 'auto',
   'shard': '64MiB',
   'offload': '10MiB',
   'default-scheme': 'tcp',
   'socket-backlog': 2048,
   'recent-messages-log-length': 0,
   'ucx': {'cuda-copy': None,
    'tcp': None,
    'nvlink': None,
    'infiniband': None,
    'rdmacm': None,
    'create-cuda-context': None},
   'zstd': {'level': 3, 'threads': 0},
   'timeouts': {'connect': '30s', 'tcp': '30s'},
   'require-encryption': None,
   'tls': {'ciphers': None,
    'min-version': 1.2,
    'max-version': None,
    'ca-file': None,
    'scheduler': {'cert': None, 'key': None},
    'worker': {'key': None, 'cert': None},
    'client': {'key': None, 'cert': None}},
   'tcp': {'backend': 'tornado'},
   'websockets': {'shard': '8MiB'}},
  'diagnostics': {'nvml': True,
   'computations': {'max-history': 100,
    'ignore-modules': ['distributed',
     'dask',
     'xarray',
     'cudf',
     'cuml',
     'prefect',
     'xgboost']}},
  'dashboard': {'link': '{scheme}://{host}:{port}/status',
   'export-tool': False,
   'graph-max-items': 5000,
   'prometheus': {'namespace': 'dask'}},
  'admin': {'tick': {'interval': '20ms', 'limit': '3s'},
   'max-error-length': 10000,
   'log-length': 10000,
   'log-format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
   'pdb-on-err': False,
   'system-monitor': {'interval': '500ms'},
   'event-loop': 'tornado'},
  'rmm': {'pool-size': None}}}

Actually, this change in the config happens because Dask distributed is imported at some point. You can reproduce this config change with just these lines:

import dask
import distributed
dask.config.config

And really, this is not a problem.
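
To illustrate the difference between "distributed was imported" and "the distributed scheduler was selected", here is a small sketch. It only looks at the config; the variable names are mine, and it tolerates distributed not being installed.

```python
import dask

try:
    import distributed  # noqa: F401  (importing it registers its default settings)
    distributed_installed = True
except ImportError:
    distributed_installed = False

# The big 'distributed' section only holds default settings for that package
has_distributed_section = "distributed" in dask.config.config

# The scheduler choice lives under a separate key; None means nothing was set explicitly
scheduler_setting = dask.config.get("scheduler", None)

print("distributed installed:", distributed_installed)
print("'distributed' section present:", has_distributed_section)
print("explicit scheduler setting:", scheduler_setting)
```

The presence of the 'distributed' section on its own says nothing about which scheduler runs your computations.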

However, I was wrong in my answer above, and I apologize for that. You were totally right: ProgressBar doesn’t work with a distributed scheduler, as stated at the top of the documentation page you mentioned earlier:

dask.diagnostics provides functionality to aid in profiling and inspecting execution with the local task scheduler.

So if a distributed cluster is set up in some part of the code you can’t change, then ProgressBar won’t work. But this means that some part of the code is creating a Client object at some point.

Several things you can do from there:

  • Check whether a Client has really been created (get_client() raises a ValueError if there is none):
from distributed import get_client
get_client()
  • If there is one and you don’t want to use it:
import dask
dask.config.set(scheduler='threads')

Then the example code below should work:

import dask.array as da
from dask.diagnostics import ProgressBar
a = da.random.normal(size=(10000, 10000), chunks=(1000, 1000))
res = a.dot(a.T).mean(axis=0)
with ProgressBar():
    out = res.compute()
  • If there is one and you want to use it, just use the diagnostics described on this page:
from distributed import progress
import dask.array as da
a = da.random.normal(size=(10000, 10000), chunks=(1000, 1000))
res = a.dot(a.T).mean(axis=0)
res = res.persist()
progress(res)
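
Putting the first two options together, here is a minimal sketch. The helper find_client is a hypothetical name of mine, not part of dask. It assumes get_client() raises ValueError when no Client exists, and it uses the out argument of ProgressBar to send the bar to a stream you control, which makes it easy to check whether anything was written at all.

```python
import io

import dask.array as da
from dask.diagnostics import ProgressBar


def find_client():
    """Return the current distributed Client, or None if there is none."""
    try:
        from distributed import get_client
    except ImportError:
        return None  # distributed is not installed, so no Client can exist
    try:
        return get_client()
    except ValueError:
        return None  # no Client has been created in this process


client = find_client()
print("active distributed client:", client)

a = da.random.normal(size=(1000, 1000), chunks=(250, 250))
res = a.dot(a.T).mean(axis=0)

# Force the threaded scheduler for this call only, and capture the bar in a buffer
buffer = io.StringIO()
with ProgressBar(out=buffer):
    out = res.compute(scheduler="threads")

print("result shape:", out.shape)
print("progress bar wrote output:", buffer.getvalue() != "")
```

Passing scheduler='threads' to compute() only affects that one call, so it can be a less invasive alternative to changing the global config when the Client is created by code you don't control.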

Hi,

How do we change the dask config if we are using a dask distributed ECS/EC2 cluster?

Hi @hjain371, it is not clear what your question is about. Please start another discussion if you need help in a particular setup.