Hi all, I am currently working with a synthetically generated dataset of 100,000,000 rows, in the following format:
```
                     size
time_info
2000-01-01 00:00:00     0
2000-01-01 00:00:03     1
2000-01-01 00:00:06     2
2000-01-01 00:00:09     3
2000-01-01 00:00:12     4
```
Data link: link
I use Dask's read_parquet to read this file, which gives 100 partitions. My Dask client has 4 workers with roughly 16 GB of memory available.
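For reference, my setup looks roughly like this (the file path is a placeholder):

```python
import dask.dataframe as dd
from dask.distributed import Client

client = Client(n_workers=4)                 # 4 workers, ~16 GB of memory available
ddf = dd.read_parquet('fake_data.parquet')   # placeholder path; this gives 100 partitions
```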
I use Dask's resample to compute the sum of the size column over a given time interval.
First example: 20 seconds. This works fine.
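(This call is reconstructed by analogy with the failing 17-second call in the traceback below.)

```python
ddf.resample('20S').sum().compute()
```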
Second example: 17 seconds:
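The failing call, copied from the traceback:

```python
ddf.resample('17S').sum().compute()
```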
It fails with the exception below:
```
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[203], line 1
----> 1 ddf.resample('17S').sum().compute()
File D:\Python3_10\lib\site-packages\dask\base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
    290 def compute(self, **kwargs):
    291     """Compute this dask collection
    292 
    293     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    312     dask.base.compute
    313     """
--> 314     (result,) = compute(self, traverse=False, **kwargs)
    315     return result
File D:\Python3_10\lib\site-packages\dask\base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File D:\Python3_10\lib\site-packages\distributed\client.py:3137, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3135         should_rejoin = False
   3136 try:
-> 3137     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3138 finally:
   3139     for f in futures.values():
File D:\Python3_10\lib\site-packages\distributed\client.py:2306, in Client.gather(self, futures, errors, direct, asynchronous)
   2304 else:
   2305     local_worker = None
-> 2306 return self.sync(
   2307     self._gather,
   2308     futures,
   2309     errors=errors,
   2310     direct=direct,
   2311     local_worker=local_worker,
   2312     asynchronous=asynchronous,
   2313 )
File D:\Python3_10\lib\site-packages\distributed\utils.py:338, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    336     return future
    337 else:
--> 338     return sync(
    339         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    340     )
File D:\Python3_10\lib\site-packages\distributed\utils.py:405, in sync(loop, func, callback_timeout, *args, **kwargs)
    403 if error:
    404     typ, exc, tb = error
--> 405     raise exc.with_traceback(tb)
    406 else:
    407     return result
File D:\Python3_10\lib\site-packages\distributed\utils.py:378, in sync.<locals>.f()
    376         future = asyncio.wait_for(future, callback_timeout)
    377     future = asyncio.ensure_future(future)
--> 378     result = yield future
    379 except Exception:
    380     error = sys.exc_info()
File D:\Python3_10\lib\site-packages\tornado\gen.py:769, in Runner.run(self)
    766 exc_info = None
    768 try:
--> 769     value = future.result()
    770 except Exception:
    771     exc_info = sys.exc_info()
File D:\Python3_10\lib\site-packages\distributed\client.py:2169, in Client._gather(self, futures, errors, direct, local_worker)
   2167         exc = CancelledError(key)
   2168     else:
-> 2169         raise exception.with_traceback(traceback)
   2170     raise exc
   2171 if errors == "skip":
File D:\Python3_10\lib\site-packages\dask\dataframe\tseries\resample.py:47, in _resample_series()
     38 new_index = pd.date_range(
     39     start.tz_localize(None),
     40     end.tz_localize(None),
   (...)
     43     name=out.index.name,
     44 ).tz_localize(start.tz, nonexistent="shift_forward")
     46 if not out.index.isin(new_index).all():
---> 47     raise ValueError(
     48         "Index is not contained within new index. This can often be "
     49         "resolved by using larger partitions, or unambiguous "
     50         "frequencies: 'Q', 'A'..."
     51     )
     53 return out.reindex(new_index, fill_value=fill_value)
ValueError: Index is not contained within new index. This can often be resolved by using larger partitions, or unambiguous frequencies: 'Q', 'A'...
```
I have three questions I'd like to clarify:
- What does “This can often be resolved by using larger partitions” mean?
- Does this mean that the resample function may fail regardless of which time offset is provided? Did my 20-second case work only by luck?
- Is there any workaround for this kind of error? (My current understanding of the “larger partitions” hint is sketched below.)
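I'm not sure whether this is what the “larger partitions” hint means, but one thing I'm considering is reducing the partition count before resampling (the target of 10 partitions is my own guess):

```python
# Guesswork: fewer (and therefore larger) partitions before resampling.
# 10 partitions of ~10M rows each may still fit in the workers' memory.
result = ddf.repartition(npartitions=10).resample('17S').sum().compute()
```

Would something like this be the recommended fix, or is there a better approach?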