Receive ValueError: bytes object is too large, followed by CancelledError, when calling compute_chunk_sizes()

Hello,
I have a section of code that throws an exception. The code is supposed to return the length of the array. I do not know if it is a data issue or a data-transfer issue. In this post I have included the DataFrame shape, the code in question, the code that creates the array, the exception, and the performance report. Let me know what else I can add to help find a solution.

The array is created from a single column of a pandas DataFrame (dimensions are below).
The array is created with this command:

op = da.array(df['open'],dtype=float)

which then calls this procedure to get the size:

op_len = ar_computesize(op)

def ar_computesize(op_na):
    with performance_report(filename="ar_computesize.html"):
        dask.distributed.print("op_na type {0} ".format(type(op_na)))
        op_len = op_na.compute_chunk_sizes()
        dask.distributed.print("op_len {0} ".format(op_len))
    return op_len
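
For context, my understanding (which may be wrong) is that compute_chunk_sizes() resolves the unknown chunk sizes and returns the array itself, so the length would then come from its shape. A minimal sketch of that expectation, using a small in-memory array rather than my real data:

import numpy as np
import dask.array as da

x = da.from_array(np.arange(10), chunks=5)
x = x.compute_chunk_sizes()   # resolves any unknown chunk sizes; returns the array itself
length = x.shape[0]           # the value I am ultimately after
print(length)                 # 10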

Data Frame

Shape is   (30585600, 21) 
<class 'pandas.core.frame.DataFrame'>
 original shape : (Delayed('int-fd9a3be9-634c-41f7-a81a-44c45a698e89'), 21)
          exhts_book_rcv_ts   exhts_book_exch_ts            timestamp  \
172799  1691971199494808000  1691971199492000000  1691971199500000000   
172799  1683503999493518000  1683503999492000000  1683503999500000000   
172799  1688083199495387000  1688083199493000000  1688083199500000000   
172799  1679011199494253000  1679011199492000000  1679011199500000000   
172799  1692575999499254000  1692575999497000000  1692575999500000000   

                  exhts_eid     open     high      low    close  \
172799  1691971199492000000  1840.10  1840.10  1840.09  1840.09   
172799  1683503999492000000  1869.59  1869.82  1869.59  1869.82   
172799  1688083199493000000  1851.18  1851.19  1851.18  1851.19   
172799  1679011199492000000  1672.72  1672.72  1672.61  1672.61   
172799  1692575999497000000  1684.16  1684.16  1684.15  1684.15   

        exhts_tradeNum  exhts_buyNotional  ...  exhts_sellQty  \
172799               2         2999.36300  ...          0.006   
172799              83       103028.01748  ...         76.588   
172799               6         2993.37423  ...         12.250   
172799              21           18.39992  ...         20.190   
172799               7        13663.59008  ...         16.798   

        exhts_buyLiqQty  exhts_sellLiqQty  exhts_buyLiqNotional  \
172799              0.0               0.0                   0.0   
172799              0.0               0.0                   0.0   
172799              0.0               0.0                   0.0   
172799              0.0               0.0                   0.0   
172799              0.0               0.0                   0.0   

        exhts_sellLiqNotional   exhts_twap  exhts_bidQty_1  exhts_ask_1  \
172799                    0.0  1840.095000         109.194      1840.10   
172799                    0.0  1869.691325          26.621      1869.82   
172799                    0.0  1851.185000          84.492      1851.19   
172799                    0.0  1672.678095           0.095      1672.61   
172799                    0.0  1684.154286          91.567      1684.16   

        exhts_askQty_1   volume  
172799         558.642    1.636  
172799           9.064  131.694  
172799          42.136   13.867  
172799          94.546   20.201  
172799         128.879   24.911  

Trace generated:

 op_na type <class 'dask.array.core.Array'> 
 2023-09-27 16:34:53,794 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
---------------------------------------------------------------------------
CancelledError     Traceback (most recent call last) 
Cell In[11], line 128
    126 pandas_score = pd.DataFrame({'tmp':[0]})
    127 dask.distributed.print("calling ar_computesize  type={0}".format(type(op))) 
--> 128 op_len=ar_computesize(op)
    129 dask.distributed.print("op op_ln={0}".format(op_len)) 
    131 hi_len=ar_computesize(hi)

Cell In[10], line 27, in ar_computesize(op_na)
     25 with performance_report(filename="ar_computesize.html"):
     26  dask.distributed.print("op_na type {0} ".format(type(op_na))) 
---> 27  op_len=op_na.compute_chunk_sizes()
     28  dask.distributed.print("op_len {0} ".format(op_len))   
     29  return op_len

File ~/.local/lib/python3.10/site-packages/dask/array/core.py:1506, in Array.compute_chunk_sizes(self)
   1501     c.append(tuple(chunk_shapes[s]))
   1503 # `map_blocks` assigns numpy dtypes
   1504 # cast chunk dimensions back to python int before returning
   1505 x._chunks = tuple(
-> 1506     tuple(int(chunk) for chunk in chunks) for chunks in compute(tuple(c))[0]
   1507 )
   1508 return x

File ~/.local/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/.local/lib/python3.10/site-packages/distributed/client.py:2253, in Client._gather(self, futures, errors, direct, local_worker)
   2251     else:
   2252         raise exception.with_traceback(traceback)
-> 2253     raise exc
   2254 if errors == "skip":
   2255     bad_keys.add(key)

CancelledError: ('getitem-68b11402de669d2c6381635518f4d78f',)

I have also attached the performance report.

Thank you

Hi @larryverdils,

I edited your post to make it more readable; note that you can use code cells to make things clearer.

Unfortunately, it is really hard to identify the problem from the information you gave. What is the df object you use to build the Array? A pandas DataFrame?

Usually, you build a Dask Array using other methods, see: Create Dask Arrays — Dask documentation.
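
For example, if df is a pandas DataFrame, something along these lines is usually more robust than wrapping a single column with da.array (just a sketch, assuming the column is named 'open' as in your snippet):

import dask.dataframe as dd

# Wrap the pandas DataFrame in a Dask DataFrame first (4 partitions is only an example).
ddf = dd.from_pandas(df, npartitions=4)

# Convert one column to a Dask Array with known chunk sizes up front,
# so you do not need compute_chunk_sizes() afterwards.
op = ddf['open'].to_dask_array(lengths=True)
print(op.shape)  # the first dimension is now a concrete integer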

Thank you for cleaning up the entry.

My question is: when I attempt to compute the size of the array with

op_len = op_na.compute_chunk_sizes()

I get the error

ValueError: bytes object is too large

followed by

CancelledError: ('getitem-68b11402de669d2c6381635518f4d78f',)

So I do not know whether this is a data error or a communication error.

As said in my previous post, we lack information to help.

I suspect that the problem comes from how you create the array: what is your df object's type and shape?

But without some reproducible example, it will be hard to help.
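
Something along these lines would already help, with entirely synthetic data that only mirrors the shape you described (the cluster setup is a placeholder):

import numpy as np
import pandas as pd
import dask.array as da
from dask.distributed import Client

client = Client()  # replace with however you start your cluster

# Synthetic stand-in for the real DataFrame: one float column of the same length.
df = pd.DataFrame({'open': np.random.random(30_585_600)})

op = da.array(df['open'], dtype=float)
op = op.compute_chunk_sizes()
print(op.shape)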

Originally I was creating the array by reading individual files into a pandas DataFrame and then pushing the DataFrame into a Dask DataFrame. I have changed the structure after reading the documentation, and so far it's running clean.
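
Roughly, the new flow reads the files straight into a Dask DataFrame and takes the column from there; a simplified sketch of the pattern (paths and file format are placeholders, not my exact code):

import dask.dataframe as dd

# Read the individual files directly into a Dask DataFrame
# instead of going through pandas first.
ddf = dd.read_parquet('data/*.parquet')

# Pull the column out as a Dask Array with known chunk sizes.
op = ddf['open'].to_dask_array(lengths=True)
op_len = op.shape[0]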

Thank you for pointing me in the right direction.
