Hello
I have a section of code that throws an exception. The code is supposed to return the length of a Dask array, and I do not know whether this is a data issue or a data-transfer issue. In this post I have included the DataFrame shape, the code in question, the code that creates the array, the exception, and the performance report. Let me know what else I can add to help find a solution.
The array is created from a single column of a pandas DataFrame (dimensions are below) by this command:
op = da.array(df['open'], dtype=float)
The size is then computed by calling:
op_len = ar_computesize(op)
import dask.distributed
from dask.distributed import performance_report

def ar_computesize(op_na):
    with performance_report(filename="ar_computesize.html"):
        dask.distributed.print("op_na type {0} ".format(type(op_na)))
        # compute_chunk_sizes() triggers a computation to resolve the
        # array's unknown (NaN) chunk sizes -- this is the line that fails
        op_len = op_na.compute_chunk_sizes()
        dask.distributed.print("op_len {0} ".format(op_len))
    return op_len
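For completeness, here is a minimal self-contained sketch of the whole flow. The Client setup and the Parquet path are placeholders for illustration (my real loading code is more involved); I am fairly sure the frame was loaded lazily, hence the Delayed in the shape further down, so the sketch uses dask.dataframe:

import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client

client = Client()                        # local scheduler/workers (placeholder)
ddf = dd.read_parquet("ohlcv.parquet")   # hypothetical source file
op = da.array(ddf["open"], dtype=float)  # chunk sizes come back unknown (NaN)
print(op.chunks)                         # ((nan, nan, ...),)
op_len = ar_computesize(op)              # raises the exception shown below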
Data Frame
Shape is (30585600, 21)
<class 'pandas.core.frame.DataFrame'>
original shape: (Delayed('int-fd9a3be9-634c-41f7-a81a-44c45a698e89'), 21)
(On the Delayed row count in the original shape, see my note after the sample below.)
exhts_book_rcv_ts exhts_book_exch_ts timestamp \
172799 1691971199494808000 1691971199492000000 1691971199500000000
172799 1683503999493518000 1683503999492000000 1683503999500000000
172799 1688083199495387000 1688083199493000000 1688083199500000000
172799 1679011199494253000 1679011199492000000 1679011199500000000
172799 1692575999499254000 1692575999497000000 1692575999500000000
exhts_eid open high low close \
172799 1691971199492000000 1840.10 1840.10 1840.09 1840.09
172799 1683503999492000000 1869.59 1869.82 1869.59 1869.82
172799 1688083199493000000 1851.18 1851.19 1851.18 1851.19
172799 1679011199492000000 1672.72 1672.72 1672.61 1672.61
172799 1692575999497000000 1684.16 1684.16 1684.15 1684.15
exhts_tradeNum exhts_buyNotional ... exhts_sellQty \
172799 2 2999.36300 ... 0.006
172799 83 103028.01748 ... 76.588
172799 6 2993.37423 ... 12.250
172799 21 18.39992 ... 20.190
172799 7 13663.59008 ... 16.798
exhts_buyLiqQty exhts_sellLiqQty exhts_buyLiqNotional \
172799 0.0 0.0 0.0
172799 0.0 0.0 0.0
172799 0.0 0.0 0.0
172799 0.0 0.0 0.0
172799 0.0 0.0 0.0
exhts_sellLiqNotional exhts_twap exhts_bidQty_1 exhts_ask_1 \
172799 0.0 1840.095000 109.194 1840.10
172799 0.0 1869.691325 26.621 1869.82
172799 0.0 1851.185000 84.492 1851.19
172799 0.0 1672.678095 0.095 1672.61
172799 0.0 1684.154286 91.567 1684.16
exhts_askQty_1 volume
172799 558.642 1.636
172799 9.064 131.694
172799 42.136 13.867
172799 94.546 20.201
172799 128.879 24.911
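On the "original shape" line above: I believe the Delayed row count is expected for a lazily loaded frame, since Dask cannot know the number of rows without computing. A minimal illustration of what I mean (the read path is hypothetical):

import dask.dataframe as dd

ddf = dd.read_parquet("ohlcv.parquet")   # hypothetical source
print(ddf.shape)               # (Delayed('int-...'), 21) -- row count is lazy
print(ddf.shape[0].compute())  # 30585600 in my case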
Trace generated:
op_na type <class 'dask.array.core.Array'>
2023-09-27 16:34:53,794 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/ubuntu/.local/lib/python3.10/site-packages/msgpack/__init__.py", line 38, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 202, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
Cell In[11], line 128
126 pandas_score = pd.DataFrame({'tmp':[0]})
127 dask.distributed.print("calling ar_computesize type={0}".format(type(op)))
--> 128 op_len=ar_computesize(op)
129 dask.distributed.print("op op_ln={0}".format(op_len))
131 hi_len=ar_computesize(hi)
Cell In[10], line 27, in ar_computesize(op_na)
25 with performance_report(filename="ar_computesize.html"):
26 dask.distributed.print("op_na type {0} ".format(type(op_na)))
---> 27 op_len=op_na.compute_chunk_sizes()
28 dask.distributed.print("op_len {0} ".format(op_len))
29 return op_len
File ~/.local/lib/python3.10/site-packages/dask/array/core.py:1506, in Array.compute_chunk_sizes(self)
1501 c.append(tuple(chunk_shapes[s]))
1503 # `map_blocks` assigns numpy dtypes
1504 # cast chunk dimensions back to python int before returning
1505 x._chunks = tuple(
-> 1506 tuple(int(chunk) for chunk in chunks) for chunks in compute(tuple(c))[0]
1507 )
1508 return x
File ~/.local/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
625 postcomputes.append(x.__dask_postcompute__())
627 with shorten_traceback():
--> 628 results = schedule(dsk, keys, **kwargs)
630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/.local/lib/python3.10/site-packages/distributed/client.py:2253, in Client._gather(self, futures, errors, direct, local_worker)
2251 else:
2252 raise exception.with_traceback(traceback)
-> 2253 raise exc
2254 if errors == "skip":
2255 bad_keys.add(key)
CancelledError: ('getitem-68b11402de669d2c6381635518f4d78f',)
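If it helps narrow things down, my reading of the first error is that a single serialized frame exceeded msgpack's limit for a bytes object (the bin format stores a 32-bit length, so at most 2**32 - 1 bytes). A toy demonstration that reproduces the same ValueError on its own; it allocates ~4 GiB, so it is only an illustration:

import msgpack

payload = b"x" * (2**32)   # one byte past msgpack's 2**32 - 1 bin limit
msgpack.packb(payload)     # ValueError: bytes object is too large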
I have also attached the performance report.
Thank you