asyncio.exceptions.CancelledError when writing to parquet

I need help diagnosing an asyncio.exceptions.CancelledError when attempting to write a data frame to parquet.

To be fair, I’m probably not using Dask as intended (i.e., stick with Pandas until you absolutely need Dask), but what I’m attempting to do is load parquet data with Dask, transform it with Pandas, and then write the transformed data to a new parquet with Dask. The intention is to leverage Dask when loading the transformed data, which serves as an intermediate file while I’m writing an ML task.

The code below works fine until it reaches the ‘to_parquet’ call. The export_data function works with two other datasets; only this one causes the issue. It also uses pyarrow, if that gives any clues.

Dask version is 2023.6.0
Python version is 3.10

Please let me know if I can give you any more info to help.

def get_index_labs(build:bool=False, save_parquet:bool=False):
    """Get the labs present at index culture"""
    if build:
        # Get and engineer raw data
        ddf_ic = get_index_cultures()
        ddf_labs = extract_index_features(ddf_ic, Paths.LABS_PARQUET_PATH)
    
        df_labs = du.dask_wrapper(ddf_labs.compute)()

        df_labs['SPECIMEN_TYPE'] = df_labs['SPECIMEN_TYPE'].fillna('MISSING SPECIMEN TYPE')
        
        print("Classifiying Encounters")
        df_adt = get_classified_encounters()
        df_adt = df_adt.loc[:, [phi.patient_id, phi.encounter_id, 'ENCOUNTER_TYPE']]
        df_labs = df_labs.merge(df_adt, how='left', on=[phi.patient_id, phi.encounter_id])

        if save_parquet:
            du.export_data(df_labs, Paths.INDEX_LABS_PARQUET_PATH)
    else:
        # Load prebuilt parquet
        df_labs = du.import_data(Paths.INDEX_LABS_PARQUET_PATH, d_func='parquet', compute=True)

    """
    print("Making Feature Labels")
    df_labs = make_feature_labels(df_labs, ['SPECIMEN_TYPE', 'LAB_TEST'], label=FEATURE_LABEL)
    """
    
    return df_labs

ENGINE = 'pyarrow'

def export_data(df:pd.DataFrame|dd.DataFrame, out_path:str, overwrite:bool=True, n_parts=250, **kwargs) -> None:
	"""Exports the dataframe to a parquet folder"""

	# Make sure we're using partitioned DataFrames
	if isinstance(df, pd.DataFrame):
		df = dask_wrapper(dd.from_pandas)(df, npartitions=n_parts)
		print("Successfully converted to dask DataFrame")

	# Get a file path for output
	# TODO: Probably need to revisit this
	if overwrite and os.path.isdir(out_path):
		import shutil
		shutil.rmtree(out_path)
	os.mkdir(out_path)

	# Make output
	dask_wrapper(df.to_parquet)(out_path, engine=ENGINE, **kwargs)

<Client: 'tcp://127.0.0.1:41775' processes=12 threads=12, memory=120.00 GiB>
<Client: 'tcp://127.0.0.1:40431' processes=12 threads=12, memory=120.00 GiB>
/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 41862 instead
  warnings.warn(
<Client: 'tcp://127.0.0.1:46483' processes=12 threads=12, memory=120.00 GiB>
<Client: 'tcp://127.0.0.1:45640' processes=12 threads=12, memory=120.00 GiB>
Classifiying Encounters
<Client: 'tcp://127.0.0.1:45770' processes=12 threads=12, memory=120.00 GiB>
<Client: 'tcp://127.0.0.1:34560' processes=12 threads=12, memory=120.00 GiB>
Successfully converted to dask DataFrame
<Client: 'tcp://127.0.0.1:45814' processes=12 threads=12, memory=120.00 GiB>
/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py:3125: UserWarning: Sending large graph of size 20.28 GiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
2023-07-26 23:14:01,968 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
2023-07-26 23:14:01,969 - distributed.comm.utils - ERROR - bytes object is too large
Traceback (most recent call last):
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/utils.py", line 55, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
2023-07-26 23:14:01,970 - distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/tcp.py", line 271, in write
    frames = await to_frames(
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/utils.py", line 70, in to_frames
    return await offload(_to_frames)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 1470, in run_in_executor_with_context
    return await loop.run_in_executor(
  File "/opt/conda/envs/bmx-study/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 1471, in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/utils.py", line 55, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
2023-07-26 23:14:02,253 - distributed.client - ERROR - 
Traceback (most recent call last):
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 754, in wrapper
    return await func(*args, **kwargs)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 1332, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 1362, in _ensure_connected
    comm = await connect(
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/core.py", line 336, in connect
    comm = await wait_for(
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 1878, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "/opt/conda/envs/bmx-study/lib/python3.10/asyncio/tasks.py", line 432, in wait_for
    await waiter
asyncio.exceptions.CancelledError
Traceback (most recent call last):
  File "/home/william.m/git/bmx-study/washu/labs.py", line 48, in <module>
    get_index_labs(build=True, save_parquet=True)
  File "/home/william.m/git/bmx-study/washu/labs.py", line 35, in get_index_labs
    du.export_data(df_labs, Paths.INDEX_LABS_PARQUET_PATH)
  File "/home/william.m/git/bmx-study/washu/../washu/data_utility.py", line 108, in export_data
    dask_wrapper(df.to_parquet)(out_path, engine=ENGINE, **kwargs)
  File "/home/william.m/git/bmx-study/washu/../washu/data_utility.py", line 45, in wrapper
    result = func(*args, **kwargs)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/dask/dataframe/core.py", line 5555, in to_parquet
    return to_parquet(self, path, *args, **kwargs)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/dask/dataframe/io/parquet/core.py", line 1060, in to_parquet
    out = out.compute(**compute_kwargs)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/dask/base.py", line 310, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/dask/base.py", line 595, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 3243, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 2368, in gather
    return self.sync(
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 351, in sync
    return sync(
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 418, in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 391, in f
    result = yield future
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 2232, in _gather
    raise exc
concurrent.futures._base.CancelledError: ('store-to-parquet-bfbb14fe35f710a58a9872f8a3c064ca', 0)

I’d also be open to ideas as to how to do this properly if that ends up being easier.

The “Sending large graph of size 20.28 GiB” warning is really not good. It looks like you are embedding all of your input data in the task graph. This is probably the root cause of the first error (distributed.protocol.core - CRITICAL - Failed to Serialize), and thus of the CancelledError.
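A rough way to check for this before submitting anything: dd.from_pandas slices a frame that lives in the client's memory, so those slices have to travel to the scheduler together with the graph, and the in-memory size of the pandas frame gives a ballpark for that payload. The sketch below is only an illustration; `estimate_payload_gib` is a hypothetical helper and the toy frame stands in for df_labs. As far as I know, msgpack cannot pack a single bytes object of 4 GiB or more, which would fit the "bytes object is too large" message.

```python
import numpy as np
import pandas as pd


def estimate_payload_gib(df: pd.DataFrame) -> float:
    """Approximate in-memory size of a pandas DataFrame, in GiB.

    deep=True also counts the Python objects held in object columns
    (e.g. strings), which the default shallow estimate ignores.
    """
    return df.memory_usage(deep=True).sum() / 2**30


# Toy stand-in for df_labs: one million int64 values (about 8 MB).
df = pd.DataFrame({"a": np.arange(1_000_000, dtype="int64")})
print(f"{estimate_payload_gib(df):.4f} GiB")
```

If the number is in the multi-GiB range, converting with from_pandas and sending the result to a distributed cluster is likely to hit the same serialization problem.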

I don’t know what you are doing inside the dask_wrapper function, or how you are reading the data, but something is wrong.

def dask_wrapper(func:object, compute:bool=False, n_workers:int=N_WORKERS, threads_per_worker:int=N_THREADS):
	"""Define a wrapper that we can use everywhere"""
	def wrapper(*args, **kwargs):
		with LocalCluster(DASK_DASHBOARD_ADDRESS,
						n_workers=n_workers,
						threads_per_worker=threads_per_worker) as cluster:
			with Client(cluster) as client:
				print(client)
				result = func(*args, **kwargs)
				result = result.compute() if compute else result
		return result
	return wrapper

The wrapper is just a convenient way to launch a cluster. The data is just read in from a parquet using read_parquet and is then computed to do some feature engineering. I then try to convert it back to Dask to leverage multiprocessing on the write out.

I don’t really understand how I would avoid embedding the input data in the tasks graph when using from_pandas.

Okay, I’m not sure why you want to use such a wrapper instead of just starting a LocalCluster/Client once at the beginning and using it throughout, but I think this is the problem, or at least part of it. I’m not sure what the result of this wrapping is, since you are shutting down the LocalCluster after each call. It also makes the code hard to read and makes it hard to understand how the task graph will be built.

I think you should try to rewrite the code without the wrapping, and then we would have a better view of what other problems there might be.

Also, I’m not sure why you want to go from Dask to Pandas and back to Dask?