Why does Dask resample fail in some cases?

Hi all, I am currently working with fake generated data that has 100,000,000 rows, in the following format:

                     size
time_info
2000-01-01 00:00:00     0
2000-01-01 00:00:03     1
2000-01-01 00:00:06     2
2000-01-01 00:00:09     3
2000-01-01 00:00:12     4

Data link: link

I use Dask’s read_parquet to read this file in, and it has 100 partitions. My Dask client has 4 workers with nearly 16 GB of usable memory.
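For reference, the setup is roughly this (the file path is just a placeholder for the generated parquet file linked above):

```python
import dask.dataframe as dd
from dask.distributed import Client

client = Client(n_workers=4)  # 4 workers, ~16 GB of usable memory

# "fake_data.parquet" is a placeholder for the actual generated file
ddf = dd.read_parquet("fake_data.parquet")
print(ddf.npartitions)  # 100
```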


I use Dask’s resample to compute the sum of the size column over some time duration.

First example: 20 seconds:
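This is essentially the same call as the failing one in the traceback below, just with a 20-second rule:

```python
ddf.resample('20S').sum().compute()
```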


It works fine.

Second example: 17 seconds:
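The only change is the rule:

```python
ddf.resample('17S').sum().compute()
```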


It fails, and the exception log is listed below:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[203], line 1
----> 1 ddf.resample('17S').sum().compute()

File D:\Python3_10\lib\site-packages\dask\base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
    290 def compute(self, **kwargs):
    291     """Compute this dask collection
    292 
    293     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    312     dask.base.compute
    313     """
--> 314     (result,) = compute(self, traverse=False, **kwargs)
    315     return result

File D:\Python3_10\lib\site-packages\dask\base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File D:\Python3_10\lib\site-packages\distributed\client.py:3137, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3135         should_rejoin = False
   3136 try:
-> 3137     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3138 finally:
   3139     for f in futures.values():

File D:\Python3_10\lib\site-packages\distributed\client.py:2306, in Client.gather(self, futures, errors, direct, asynchronous)
   2304 else:
   2305     local_worker = None
-> 2306 return self.sync(
   2307     self._gather,
   2308     futures,
   2309     errors=errors,
   2310     direct=direct,
   2311     local_worker=local_worker,
   2312     asynchronous=asynchronous,
   2313 )

File D:\Python3_10\lib\site-packages\distributed\utils.py:338, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    336     return future
    337 else:
--> 338     return sync(
    339         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    340     )

File D:\Python3_10\lib\site-packages\distributed\utils.py:405, in sync(loop, func, callback_timeout, *args, **kwargs)
    403 if error:
    404     typ, exc, tb = error
--> 405     raise exc.with_traceback(tb)
    406 else:
    407     return result

File D:\Python3_10\lib\site-packages\distributed\utils.py:378, in sync.<locals>.f()
    376         future = asyncio.wait_for(future, callback_timeout)
    377     future = asyncio.ensure_future(future)
--> 378     result = yield future
    379 except Exception:
    380     error = sys.exc_info()

File D:\Python3_10\lib\site-packages\tornado\gen.py:769, in Runner.run(self)
    766 exc_info = None
    768 try:
--> 769     value = future.result()
    770 except Exception:
    771     exc_info = sys.exc_info()

File D:\Python3_10\lib\site-packages\distributed\client.py:2169, in Client._gather(self, futures, errors, direct, local_worker)
   2167         exc = CancelledError(key)
   2168     else:
-> 2169         raise exception.with_traceback(traceback)
   2170     raise exc
   2171 if errors == "skip":

File D:\Python3_10\lib\site-packages\dask\dataframe\tseries\resample.py:47, in _resample_series()
     38 new_index = pd.date_range(
     39     start.tz_localize(None),
     40     end.tz_localize(None),
   (...)
     43     name=out.index.name,
     44 ).tz_localize(start.tz, nonexistent="shift_forward")
     46 if not out.index.isin(new_index).all():
---> 47     raise ValueError(
     48         "Index is not contained within new index. This can often be "
     49         "resolved by using larger partitions, or unambiguous "
     50         "frequencies: 'Q', 'A'..."
     51     )
     53 return out.reindex(new_index, fill_value=fill_value)

ValueError: Index is not contained within new index. This can often be resolved by using larger partitions, or unambiguous frequencies: 'Q', 'A'...

I have three questions I would like to clarify:

  1. What does “This can often be resolved by using larger partitions” actually mean?
  2. Does this mean that the resample function may fail no matter what kind of time offset is provided?
    Did my case with 20 seconds only work by luck?
  3. Is there any workaround for this kind of error?

Hi @vava24680,

Okay, I’m not an expert here, so take it with care.

I think what happens is that Dask would need data from several partitions to do the resampling with this 17 s rule, and it somehow doesn’t play well with the divisions of the input dataset. This is only a guess; I didn’t look at the internals here.
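One way to check this (a rough sketch of my own, not necessarily how Dask decides internally) is to see whether the partition boundaries fall on multiples of the resample rule. If your 100 partitions are equal in size, the step between consecutive boundaries is 1,000,000 rows × 3 s = 3,000,000 s, which is divisible by 20 but not by 17, and that would be consistent with what you see:

```python
import pandas as pd

# Rough check: do the partition boundaries (divisions) fall on exact
# multiples of the resample rule, measured from the first division?
# Assumes ddf.divisions are known (not None) after read_parquet.
start = ddf.divisions[0]
for rule in ("20S", "17S"):
    step = pd.Timedelta(rule)
    aligned = all((d - start) % step == pd.Timedelta(0) for d in ddf.divisions)
    print(rule, "-> boundaries aligned:", aligned)
```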

The workaround would be to choose a rule that is compatible with your DataFrame’s divisions, or to use larger partitions as the error message suggests; a rule-agnostic alternative is sketched below.
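If no rule/partition combination works for you, one alternative (an untested sketch that avoids resample entirely) is to bin the timestamps yourself with dt.floor and aggregate with a groupby, which does not depend on partition alignment:

```python
# Sketch of a resample-free alternative: floor each timestamp to the
# 17-second bin it belongs to, then group and sum.
df2 = ddf.reset_index()
df2["bin"] = df2["time_info"].dt.floor("17S")
result = df2.groupby("bin")["size"].sum().compute()

# Note: unlike resample, bins with no rows will simply be absent from
# the result instead of appearing with a 0 sum.
```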

I know this is not a great answer; I’ll see if I can find a better explanation somewhere.