Hi all, I am currently working with fake generated data that has 100,000,000 rows; the format is as follows:
                     size
time_info
2000-01-01 00:00:00     0
2000-01-01 00:00:03     1
2000-01-01 00:00:06     2
2000-01-01 00:00:09     3
2000-01-01 00:00:12     4
Data link: link
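For reference, data of the same shape can be generated with pandas (a small sketch, not the original generator; the row count here is scaled down for illustration):

```python
import pandas as pd

# Build a small sample with the same shape as the real data:
# a timestamp every 3 seconds starting at 2000-01-01, and a
# "size" column equal to the row number.
n_rows = 1_000  # the real data has 100,000,000 rows
index = pd.date_range("2000-01-01", periods=n_rows, freq="3S", name="time_info")
df = pd.DataFrame({"size": range(n_rows)}, index=index)

print(df.head())
```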
I use Dask's read_parquet to read this file in, and it has 100 partitions. My Dask client has 4 workers with nearly 16 GB of memory available.
I use Dask's resample to compute the sum of the size column over a given time interval.
First example, a 20-second interval: it works fine.
Second example, a 17-second interval: it fails with the following exception:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Cell In[203], line 1
----> 1 ddf.resample('17S').sum().compute()
File D:\Python3_10\lib\site-packages\dask\base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
290 def compute(self, **kwargs):
291 """Compute this dask collection
292
293 This turns a lazy Dask collection into its in-memory equivalent.
(...)
312 dask.base.compute
313 """
--> 314 (result,) = compute(self, traverse=False, **kwargs)
315 return result
File D:\Python3_10\lib\site-packages\dask\base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
596 keys.append(x.__dask_keys__())
597 postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File D:\Python3_10\lib\site-packages\distributed\client.py:3137, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3135 should_rejoin = False
3136 try:
-> 3137 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3138 finally:
3139 for f in futures.values():
File D:\Python3_10\lib\site-packages\distributed\client.py:2306, in Client.gather(self, futures, errors, direct, asynchronous)
2304 else:
2305 local_worker = None
-> 2306 return self.sync(
2307 self._gather,
2308 futures,
2309 errors=errors,
2310 direct=direct,
2311 local_worker=local_worker,
2312 asynchronous=asynchronous,
2313 )
File D:\Python3_10\lib\site-packages\distributed\utils.py:338, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
336 return future
337 else:
--> 338 return sync(
339 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
340 )
File D:\Python3_10\lib\site-packages\distributed\utils.py:405, in sync(loop, func, callback_timeout, *args, **kwargs)
403 if error:
404 typ, exc, tb = error
--> 405 raise exc.with_traceback(tb)
406 else:
407 return result
File D:\Python3_10\lib\site-packages\distributed\utils.py:378, in sync.<locals>.f()
376 future = asyncio.wait_for(future, callback_timeout)
377 future = asyncio.ensure_future(future)
--> 378 result = yield future
379 except Exception:
380 error = sys.exc_info()
File D:\Python3_10\lib\site-packages\tornado\gen.py:769, in Runner.run(self)
766 exc_info = None
768 try:
--> 769 value = future.result()
770 except Exception:
771 exc_info = sys.exc_info()
File D:\Python3_10\lib\site-packages\distributed\client.py:2169, in Client._gather(self, futures, errors, direct, local_worker)
2167 exc = CancelledError(key)
2168 else:
-> 2169 raise exception.with_traceback(traceback)
2170 raise exc
2171 if errors == "skip":
File D:\Python3_10\lib\site-packages\dask\dataframe\tseries\resample.py:47, in _resample_series()
38 new_index = pd.date_range(
39 start.tz_localize(None),
40 end.tz_localize(None),
(...)
43 name=out.index.name,
44 ).tz_localize(start.tz, nonexistent="shift_forward")
46 if not out.index.isin(new_index).all():
---> 47 raise ValueError(
48 "Index is not contained within new index. This can often be "
49 "resolved by using larger partitions, or unambiguous "
50 "frequencies: 'Q', 'A'..."
51 )
53 return out.reindex(new_index, fill_value=fill_value)
ValueError: Index is not contained within new index. This can often be resolved by using larger partitions, or unambiguous frequencies: 'Q', 'A'...
I have three questions I would like to clarify:
- What does "This can often be resolved by using larger partitions" mean?
- Does this mean that resample may fail no matter what time offset is provided? Did my 20-second case work only by luck?
- Is there any workaround for this kind of error?