I need help diagnosing an asyncio.exceptions.CancelledError when attempting to write a data frame to parquet.
TBF I’m probably not using Dask as intended (a la stick with Pandas until you absolutely need Dask), but what I’m attempting to do is load parquet data with Dask, transform it with Pandas, and then write the transformed data to a new parquet with Dask. The intention here is that I can leverage Dask when loading up the transformed data as an intermediate file while I’m writing an ML task.
The code below works fine until we get to the ‘to_parquet’ part, and the export data
bit works with two other datasets aside from the one that is causing the issue. It also uses pyarrow
if that gives any clues.
Dask version is 2023.6.0
Python version is 3.10
Please let me know if I can give you any more info to help.
def get_index_labs(build:bool=False, save_parquet:bool=False):
"""Get the labs present at index culture"""
if build:
# Get and engineer raw data
ddf_ic = get_index_cultures()
ddf_labs = extract_index_features(ddf_ic, Paths.LABS_PARQUET_PATH)
df_labs = du.dask_wrapper(ddf_labs.compute)()
df_labs['SPECIMEN_TYPE'] = df_labs['SPECIMEN_TYPE'].fillna('MISSING SPECIMEN TYPE')
print("Classifiying Encounters")
df_adt = get_classified_encounters()
df_adt = df_adt.loc[:, [phi.patient_id, phi.encounter_id, 'ENCOUNTER_TYPE']]
df_labs = df_labs.merge(df_adt, how='left', on=[phi.patient_id, phi.encounter_id])
if save_parquet:
du.export_data(df_labs, Paths.INDEX_LABS_PARQUET_PATH)
else:
# Load prebuilt parquet
df_labs = du.import_data(Paths.INDEX_LABS_PARQUET_PATH, d_func='parquet', compute=True)
"""
print("Making Feature Labels")
df_labs = make_feature_labels(df_labs, ['SPECIMEN_TYPE', 'LAB_TEST'], label=FEATURE_LABEL)
"""
return df_labs
ENGINE = 'pyarrow'
def export_data(df:pd.DataFrame|dd.DataFrame, out_path:str, overwrite:bool=True, n_parts=250, **kwargs) -> None:
"""Exports the dataframe to a parquet folder"""
# Make sure we're using partitioned DataFrames
if isinstance(df, pd.DataFrame):
df = dask_wrapper(dd.from_pandas)(df, npartitions=n_parts)
print("Successfully converted to dask DataFrame")
# Get a file path for output
# TODO: Probably need to revist this
if overwrite and os.path.isdir(out_path):
import shutil
shutil.rmtree(out_path)
os.mkdir(out_path)
# Make output
dask_wrapper(df.to_parquet)(out_path, engine=ENGINE, **kwargs)
<Client: 'tcp://127.0.0.1:41775' processes=12 threads=12, memory=120.00 GiB>
<Client: 'tcp://127.0.0.1:40431' processes=12 threads=12, memory=120.00 GiB>
/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 41862 instead
warnings.warn(
<Client: 'tcp://127.0.0.1:46483' processes=12 threads=12, memory=120.00 GiB>
<Client: 'tcp://127.0.0.1:45640' processes=12 threads=12, memory=120.00 GiB>
Classifiying Encounters
<Client: 'tcp://127.0.0.1:45770' processes=12 threads=12, memory=120.00 GiB>
<Client: 'tcp://127.0.0.1:34560' processes=12 threads=12, memory=120.00 GiB>
Successfully converted to dask DataFrame
<Client: 'tcp://127.0.0.1:45814' processes=12 threads=12, memory=120.00 GiB>
/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py:3125: UserWarning: Sending large graph of size 20.28 GiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
warnings.warn(
2023-07-26 23:14:01,968 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
2023-07-26 23:14:01,969 - distributed.comm.utils - ERROR - bytes object is too large
Traceback (most recent call last):
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/utils.py", line 55, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
2023-07-26 23:14:01,970 - distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
value = future.result()
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/tcp.py", line 271, in write
frames = await to_frames(
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/utils.py", line 70, in to_frames
return await offload(_to_frames)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 1470, in run_in_executor_with_context
return await loop.run_in_executor(
File "/opt/conda/envs/bmx-study/lib/python3.10/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 1471, in <lambda>
executor, lambda: context.run(func, *args, **kwargs)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/utils.py", line 55, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
2023-07-26 23:14:02,253 - distributed.client - ERROR -
Traceback (most recent call last):
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 754, in wrapper
return await func(*args, **kwargs)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 1332, in _reconnect
await self._ensure_connected(timeout=timeout)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 1362, in _ensure_connected
comm = await connect(
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/comm/core.py", line 336, in connect
comm = await wait_for(
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 1878, in wait_for
return await asyncio.wait_for(fut, timeout)
File "/opt/conda/envs/bmx-study/lib/python3.10/asyncio/tasks.py", line 432, in wait_for
await waiter
asyncio.exceptions.CancelledError
Traceback (most recent call last):
File "/home/william.m/git/bmx-study/washu/labs.py", line 48, in <module>
get_index_labs(build=True, save_parquet=True)
File "/home/william.m/git/bmx-study/washu/labs.py", line 35, in get_index_labs
du.export_data(df_labs, Paths.INDEX_LABS_PARQUET_PATH)
File "/home/william.m/git/bmx-study/washu/../washu/data_utility.py", line 108, in export_data
dask_wrapper(df.to_parquet)(out_path, engine=ENGINE, **kwargs)
File "/home/william.m/git/bmx-study/washu/../washu/data_utility.py", line 45, in wrapper
result = func(*args, **kwargs)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/dask/dataframe/core.py", line 5555, in to_parquet
return to_parquet(self, path, *args, **kwargs)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/dask/dataframe/io/parquet/core.py", line 1060, in to_parquet
out = out.compute(**compute_kwargs)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/dask/base.py", line 310, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/dask/base.py", line 595, in compute
results = schedule(dsk, keys, **kwargs)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 3243, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 2368, in gather
return self.sync(
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 351, in sync
return sync(
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 418, in sync
raise exc.with_traceback(tb)
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/utils.py", line 391, in f
result = yield future
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
value = future.result()
File "/opt/conda/envs/bmx-study/lib/python3.10/site-packages/distributed/client.py", line 2232, in _gather
raise exc
concurrent.futures._base.CancelledError: ('store-to-parquet-bfbb14fe35f710a58a9872f8a3c064ca', 0)