Receiving error: CancelledError: ['dict-c99565de-7b35-489e-9356-82504a139608']

Hello,
We are trying to use DaskLGBMRegressor and are receiving an error. The data structures are:

print(x_train.info() )
<class 'dask.dataframe.core.DataFrame'>
Columns: 448 entries, CDL2CROWS_1Min to CDLXSIDEGAP3METHODS_128Min
dtypes: float64(448)None
print(y_train.info() )
<class 'dask.dataframe.core.DataFrame'>
Columns: 1 entries, close_future_512 to close_future_512
dtypes: float64(1)None
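
For context, here is a minimal sketch of how frames with this shape are typically produced with Dask; the path, file format, and target column below are placeholders rather than the actual pipeline:

import dask.dataframe as dd

# Hypothetical source: the real data-loading code is not shown in this post.
df = dd.read_parquet("s3://my-bucket/features/*.parquet")

# 448 float64 feature columns plus one float64 target column.
feature_cols = [c for c in df.columns if c != "close_future_512"]
x_train = df[feature_cols]
y_train = df[["close_future_512"]]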

The code being executed is:

model = lgb.DaskLGBMRegressor()

now = datetime.datetime.now()
dask.distributed.print("STATUS - {0} Set params".format(now.strftime("%Y-%m-%d %H:%M:%S")))

model.set_params(n_estimators=200)
model.set_params(max_depth=7)
model.set_params(num_leaves=16)
model.set_params(boosting_type='goss')
model.set_params(min_child_samples=25)
model.set_params(learning_rate=0.02)
model.set_params(colsample_bytree=0.9)

now = datetime.datetime.now()
dask.distributed.print("begin initial training at {}".format(now.strftime("%Y-%m-%d %H:%M:%S")))
gbm = model.fit(X=x_train, y=y_train, eval_metric=usefill_metric)
now = datetime.datetime.now()
dask.distributed.print("finished initial training at {}".format(now.strftime("%Y-%m-%d %H:%M:%S")))
print('feature importance is: ')
print(gbm.feature_importances_)
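
(For reference, the same configuration can be written as a single constructor call with the Dask client attached explicitly. This is only a sketch of an equivalent setup, not the code that produced the error below; `cluster` refers to the EC2Cluster shown later in this thread.)

from dask.distributed import Client
import lightgbm as lgb

client = Client(cluster)  # attach to the running cluster

model = lgb.DaskLGBMRegressor(
    client=client,            # pass the client explicitly instead of relying on the default
    n_estimators=200,
    max_depth=7,
    num_leaves=16,
    boosting_type='goss',
    min_child_samples=25,
    learning_rate=0.02,
    colsample_bytree=0.9,
)
gbm = model.fit(X=x_train, y=y_train, eval_metric=usefill_metric)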

The error we get is:

CancelledError                            Traceback (most recent call last)
Cell In[7], line 285
    283 now = datetime.datetime.now()
    284 dask.distributed.print("begin initial training at {}".format(now.strftime("%Y-%m-%d %H:%M:%S")))
--> 285 gbm = model.fit(X=x_train,y=y_train, eval_metric= usefill_metric)
    286 now = datetime.datetime.now()
    287 dask.distributed.print("finished initial training at {}".format(now.strftime("%Y-%m-%d %H:%M:%S")))

File ~/.local/lib/python3.10/site-packages/lightgbm/dask.py:1406, in DaskLGBMRegressor.fit(self, X, y, sample_weight, init_score, eval_set, eval_names, eval_sample_weight, eval_init_score, eval_metric, **kwargs)
   1392 def fit(  # type: ignore[override]
   1393     self,
   1394     X: _DaskMatrixLike,
   (...)
   1403     **kwargs: Any
   1404 ) -> "DaskLGBMRegressor":
   1405     """Docstring is inherited from the lightgbm.LGBMRegressor.fit."""
-> 1406     self._lgb_dask_fit(
   1407         model_factory=LGBMRegressor,
   1408         X=X,
   1409         y=y,
   1410         sample_weight=sample_weight,
   1411         init_score=init_score,
   1412         eval_set=eval_set,
   1413         eval_names=eval_names,
   1414         eval_sample_weight=eval_sample_weight,
   1415         eval_init_score=eval_init_score,
   1416         eval_metric=eval_metric,
   1417         **kwargs
   1418     )
   1419     return self

File ~/.local/lib/python3.10/site-packages/lightgbm/dask.py:1082, in _DaskLGBMModel._lgb_dask_fit(self, model_factory, X, y, sample_weight, init_score, group, eval_set, eval_names, eval_sample_weight, eval_class_weight, eval_init_score, eval_group, eval_metric, eval_at, **kwargs)
   1079 params = self.get_params(True)  # type: ignore[attr-defined]
   1080 params.pop("client", None)
-> 1082 model = _train(
   1083     client=_get_dask_client(self.client),
   1084     data=X,
   1085     label=y,
   1086     params=params,
   1087     model_factory=model_factory,
   1088     sample_weight=sample_weight,
   1089     init_score=init_score,
   1090     group=group,
   1091     eval_set=eval_set,
   1092     eval_names=eval_names,
   1093     eval_sample_weight=eval_sample_weight,
   1094     eval_class_weight=eval_class_weight,
   1095     eval_init_score=eval_init_score,
   1096     eval_group=eval_group,
   1097     eval_metric=eval_metric,
   1098     eval_at=eval_at,
   1099     **kwargs
   1100 )
   1102 #self.set_params(**model.get_params())  # type: ignore[attr-defined]
   1103 #self._lgb_dask_copy_extra_params(model, self)  # type: ignore[attr-defined]
   1105 return self

File ~/.local/lib/python3.10/site-packages/lightgbm/dask.py:694, in _train(client, data, label, params, model_factory, sample_weight, init_score, group, eval_set, eval_names, eval_sample_weight, eval_class_weight, eval_init_score, eval_group, eval_metric, eval_at, **kwargs)
    692 parts = list(map(delayed, parts))
    693 parts = client.compute(parts)
--> 694 wait(parts)
    696 for part in parts:
    697     if part.status == 'error':  # type: ignore
    698         # trigger error locally

File ~/.local/lib/python3.10/site-packages/distributed/client.py:5267, in wait(fs, timeout, return_when)
   5265     timeout = parse_timedelta(timeout, default="s")
   5266 client = default_client()
-> 5267 result = client.sync(_wait, fs, timeout=timeout, return_when=return_when)
   5268 return result

File ~/.local/lib/python3.10/site-packages/distributed/utils.py:359, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    357     return future
    358 else:
--> 359     return sync(
    360         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    361     )

File ~/.local/lib/python3.10/site-packages/distributed/utils.py:426, in sync(loop, func, callback_timeout, *args, **kwargs)
    424 if error:
    425     typ, exc, tb = error
--> 426     raise exc.with_traceback(tb)
    427 else:
    428     return result

File ~/.local/lib/python3.10/site-packages/distributed/utils.py:399, in sync.<locals>.f()
    397         future = wait_for(future, callback_timeout)
    398     future = asyncio.ensure_future(future)
--> 399     result = yield future
    400 except Exception:
    401     error = sys.exc_info()

File ~/.local/lib/python3.10/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File ~/.local/lib/python3.10/site-packages/distributed/client.py:5243, in _wait(fs, timeout, return_when)
   5241 cancelled = [f.key for f in done if f.status == "cancelled"]
   5242 if cancelled:
-> 5243     raise CancelledError(cancelled)
   5245 return DoneAndNotDoneFutures(done, not_done)

CancelledError: ['dict-c99565de-7b35-489e-9356-82504a139608']

The cluster was created using EC2Cluster.

Hi @larryverdils,

I edited your post to make code and outputs more readable.

It is very hard to tell from the final stack trace alone, without a minimal reproducer. Did you track the computation through the Dask Dashboard? Do you have other logs (Scheduler or Worker logs) to share?

This might be a memory problem, but the cause could also be somewhere else entirely.
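
If it helps, a performance report can be captured around the fit call, and recent logs can be pulled straight from the client. This is just a sketch, assuming a client handle to your cluster is available:

from dask.distributed import Client, performance_report

client = Client(cluster)  # cluster: your existing EC2Cluster

# Record scheduler activity during training into a standalone HTML file.
with performance_report(filename="lgbm-fit-report.html"):
    gbm = model.fit(X=x_train, y=y_train, eval_metric=usefill_metric)

# Fetch recent log lines without logging into the instances.
scheduler_logs = client.get_scheduler_logs()
worker_logs = client.get_worker_logs()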

Thank you for the cleanup … what specific components would you need?
The cluster setup code is:

extra_bootstrap = [
    "sudo fallocate -l 200G /swapfile1",
    "sudo fallocate -l 200G /swapfile2",
    "sudo fallocate -l 200G /swapfile3",
    "sudo chmod 600 /swapfile1",
    "sudo chmod 600 /swapfile2",
    "sudo chmod 600 /swapfile3",
    "sudo mkswap /swapfile1",
    "sudo mkswap /swapfile2",
    "sudo mkswap /swapfile3",
    "sudo swapon /swapfile1",
    "sudo swapon /swapfile2",
    "sudo swapon /swapfile3",
    "sudo apt-get install software-properties-common",
    "sudo add-apt-repository ppa:deadsnakes/ppa -y",
    "sudo apt install ntpdate",
    "sudo ntpdate time.nist.gov",
    "sudo apt install build-essential",
    "sudo apt install gcc",
    "sudo apt-get update -y",
    "sudo apt-get install python3.10 -y",
    "python -m pip install awscli",
    "python -m pip install boto botocore",
    "python -m pip install jupyter-server-proxy",
    "export DASK_DISTRIBUTED__LOGGING=debug",
    "export DASK_WORKER_MEMORY_LIMIT=0",
    "export DASK_WORKER_MEMORY_TERMINATE=0.99"
]

req_dask_worker_cnt = 2
py_packages = {"EXTRA_PIP_PACKAGES": "jupyter-server-proxy s3fs asyncio lightgbm[dask] pandas scikit-learn scikit-learn-intelex seaborn scipy matplotlib graphviz joblib nbconvert dpcpp-cpp-rt aiobotocore boto3 aioboto3==11.2.0"}
req_docker_args = ""
worker_instance_type = 'r6id.8xlarge'
scheduler_instance_type = 'c6id.4xlarge'
region_tag = "us-east-2"
debug = req_debug_mode = True
docker_image_tag = "Package dask · GitHub"  # forum link preview; the original image URL is not shown

cluster = EC2Cluster(
    env_vars=py_packages,
    debug=req_debug_mode,
    filesystem_size=900,
    docker_image=docker_image_tag,
    security_groups=["sg-0952a59ff138d0edf"],
    worker_instance_type=worker_instance_type,
    scheduler_instance_type=scheduler_instance_type,
    iam_instance_profile={'Arn': 'arn:aws:iam::xxxxx:instance-profile/DaskProfile'},
    n_workers=req_dask_worker_cnt,
    security=False,
    key_name='awsthoranalytics',
    extra_bootstrap=extra_bootstrap,
    auto_shutdown=False,
    instance_tags={"application": "daskthor"},  # can be set to any name you want
    vpc="vpc-xxxxx",
    subnet_id='subnet-xxx',
    availability_zone="us-east-2a",
    silence_logs=False,
    region='us-east-2',
    ########################################
    enable_detailed_monitoring=True,
    # private / public is not relevant to the performance, only influences the security
    use_private_ip=False
)

I also found these entries in the worker logs:
2023-09-08 14:26:51,825 - distributed.nanny - INFO - Start Nanny at: 'tcp://11.0.0.52:33775'
2023-09-08 14:26:52,201 - distributed.worker - INFO - Start worker at: tcp://11.0.0.52:37333
2023-09-08 14:26:52,201 - distributed.worker - INFO - Listening to: tcp://11.0.0.52:37333
2023-09-08 14:26:52,201 - distributed.worker - INFO - Worker name: dask-9f0d0c3c-worker-dc3653fa
2023-09-08 14:26:52,201 - distributed.worker - INFO - dashboard at: 11.0.0.52:37809
2023-09-08 14:26:52,201 - distributed.worker - INFO - Waiting to connect to: tcp://18.189.185.38:8786
2023-09-08 14:26:52,201 - distributed.worker - INFO - -------------------------------------------------
2023-09-08 14:26:52,201 - distributed.worker - INFO - Threads: 32
2023-09-08 14:26:52,201 - distributed.worker - INFO - Memory: 247.74 GiB
2023-09-08 14:26:52,201 - distributed.worker - INFO - Local Directory: /tmp/dask-scratch-space/worker-vcmqt93t
2023-09-08 14:26:52,201 - distributed.worker - INFO - -------------------------------------------------
2023-09-08 14:26:54,054 - distributed.worker - INFO - Starting Worker plugin shuffle
2023-09-08 14:26:54,054 - distributed.worker - INFO - Registered to: tcp://18.189.185.38:8786
2023-09-08 14:26:54,054 - distributed.worker - INFO - -------------------------------------------------
2023-09-08 14:26:54,055 - distributed.core - INFO - Starting established connection to tcp://18.189.185.38:8786
2023-09-08 15:17:34,589 - distributed.utils_perf - INFO - full garbage collection released 802.75 MiB from 512 reference cycles (threshold: 9.54 MiB)
2023-09-08 15:22:52,876 - distributed.utils_perf - INFO - full garbage collection released 127.95 MiB from 224 reference cycles (threshold: 9.54 MiB)
2023-09-08 15:29:07,698 - distributed.utils_perf - INFO - full garbage collection released 48.96 MiB from 160 reference cycles (threshold: 9.54 MiB)
2023-09-08 16:16:33,472 - distributed.utils_perf - INFO - full garbage collection released 2.14 GiB from 189 reference cycles (threshold: 9.54 MiB)
2023-09-08 16:25:06,149 - distributed.utils_perf - INFO - full garbage collection released 696.83 MiB from 190 reference cycles (threshold: 9.54 MiB)
2023-09-08 16:29:32,805 - distributed.utils_perf - INFO - full garbage collection released 148.55 MiB from 377 reference cycles (threshold: 9.54 MiB)
2023-09-08 16:37:47,310 - distributed.utils_perf - INFO - full garbage collection released 1.46 GiB from 120 reference cycles (threshold: 9.54 MiB)
2023-09-08 18:02:59,686 - distributed.utils_perf - INFO - full garbage collection released 620.22 MiB from 80 reference cycles (threshold: 9.54 MiB)
2023-09-08 18:05:52,920 - distributed.utils_perf - INFO - full garbage collection released 801.39 MiB from 411 reference cycles (threshold: 9.54 MiB)
2023-09-08 18:10:01,492 - distributed.utils_perf - INFO - full garbage collection released 1.28 GiB from 150 reference cycles (threshold: 9.54 MiB)

I also found similar entries on the second worker:
2023-09-08 14:26:54,443 - distributed.worker - INFO - -------------------------------------------------
2023-09-08 14:26:54,443 - distributed.worker - INFO - Threads: 32
2023-09-08 14:26:54,443 - distributed.worker - INFO - Memory: 247.74 GiB
2023-09-08 14:26:54,443 - distributed.worker - INFO - Local Directory: /tmp/dask-scratch-space/worker-5z421bow
2023-09-08 14:26:54,444 - distributed.worker - INFO - -------------------------------------------------
2023-09-08 14:26:55,391 - distributed.worker - INFO - Starting Worker plugin shuffle
2023-09-08 14:26:55,391 - distributed.worker - INFO - Registered to: tcp://18.189.185.38:8786
2023-09-08 14:26:55,391 - distributed.worker - INFO - -------------------------------------------------
2023-09-08 14:26:55,392 - distributed.core - INFO - Starting established connection to tcp://18.189.185.38:8786
2023-09-08 15:13:08,044 - distributed.utils_perf - INFO - full garbage collection released 127.91 MiB from 330 reference cycles (threshold: 9.54 MiB)
2023-09-08 15:17:35,869 - distributed.utils_perf - INFO - full garbage collection released 59.59 MiB from 364 reference cycles (threshold: 9.54 MiB)
2023-09-08 16:25:17,438 - distributed.utils_perf - INFO - full garbage collection released 1.56 GiB from 86 reference cycles (threshold: 9.54 MiB)
2023-09-08 18:05:22,883 - distributed.utils_perf - INFO - full garbage collection released 800.22 MiB from 457 reference cycles (threshold: 9.54 MiB)
2023-09-08 18:10:01,298 - distributed.utils_perf - INFO - full garbage collection released 8.50 GiB from 202 reference cycles (threshold: 9.54 MiB)
2023-09-08 18:13:43,028 - distributed.utils_perf - INFO - full garbage collection released 2.35 GiB from 80 reference cycles (threshold: 9.54 MiB)
2023-09-08 18:20:50,457 - distributed.utils_perf - INFO - full garbage collection released 3.12 GiB from 214 reference cycles (threshold: 9.54 MiB)

I'm also attaching the worker instance configuration.

Thank you for pointing me toward the performance report. After setting one up, I was able to see that the scheduler was underpowered and getting overwhelmed, which was causing the issue.

I needed to upgrade the scheduler so it could handle the load from the 4 worker nodes, which it wasn't able to do before. This fixed the issue.
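
For anyone hitting the same issue, the change amounts to choosing a larger scheduler instance when building the cluster; the instance type below is only an illustrative choice, not necessarily the exact one used:

from dask_cloudprovider.aws import EC2Cluster

cluster = EC2Cluster(
    scheduler_instance_type='c6id.8xlarge',  # hypothetical: one size up from the original c6id.4xlarge
    worker_instance_type='r6id.8xlarge',
    n_workers=4,
    # ... remaining arguments unchanged from the setup shown above ...
)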
