Hello,
I am using merge_asof as a means of adding a column from another dataset by nearest-key matching (and thus using direction="nearest").
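For reference, this is the pure-pandas behavior I'm relying on (a minimal sketch with made-up values, not my real data): each left row picks up the `b` value from the right row whose key is closest.

```python
import pandas as pd

# illustrative values only; both key columns must be sorted for merge_asof
left = pd.DataFrame({"a": [1.0, 5.0, 10.0]})
right = pd.DataFrame({"a": [2.0, 6.0], "b": [20.0, 60.0]})

# direction="nearest" matches each left key to the closest right key,
# looking both backward and forward
out = pd.merge_asof(left, right, on="a", direction="nearest")
# a=1.0 -> nearest key 2.0 -> b=20.0
# a=5.0 -> nearest key 6.0 -> b=60.0
# a=10.0 -> nearest key 6.0 -> b=60.0
```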
I am getting an error when calling .tail() on a lazy (not-yet-computed) dataframe produced by merge_asof. I have put together a simple, reproducible snippet:
import numpy as np
import pandas as pd
import dask.dataframe as dd
# simple example dataframes
left_df = dd.from_pandas(pd.DataFrame(dict(a=np.linspace(0,1,10))), npartitions=3) # arbitrary npartitions greater than 1
right_df = dd.from_pandas(pd.DataFrame(dict(a=np.sort(np.random.random(20)), b=np.random.random(20))), npartitions=2) # arbitrary npartitions greater than 1
merged_df = dd.merge_asof(left_df, right_df, on='a', direction="nearest")
computed_df = merged_df.compute()
print(computed_df.tail()) # no error
print(merged_df.tail()) # throws error
Error:
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
Cell In[53], line 12
10 computed_df = merged_df.compute()
11 display(computed_df.tail()) # no error
---> 12 display(merged_df.tail()) # throws error
File ~/.local/lib/python3.11/site-packages/dask/dataframe/dask_expr/_collection.py:702, in FrameBase.tail(self, n, compute)
700 out = new_collection(expr.Tail(self, n=n))
701 if compute:
--> 702 out = out.compute()
703 return out
File ~/.local/lib/python3.11/site-packages/dask/base.py:373, in DaskMethodsMixin.compute(self, **kwargs)
349 def compute(self, **kwargs):
350 """Compute this dask collection
351
352 This turns a lazy Dask collection into its in-memory equivalent.
(...)
371 dask.compute
372 """
--> 373 (result,) = compute(self, traverse=False, **kwargs)
374 return result
File ~/.local/lib/python3.11/site-packages/dask/base.py:681, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
678 expr = expr.optimize()
679 keys = list(flatten(expr.__dask_keys__()))
--> 681 results = schedule(expr, keys, **kwargs)
683 return repack(results)
File ~/.local/lib/python3.11/site-packages/dask/_expr.py:581, in __dask_graph__()
578 continue
579 seen.add(expr._name)
--> 581 layers.append(expr._layer())
582 for operand in expr.dependencies():
583 stack.append(operand)
File ~/.local/lib/python3.11/site-packages/dask/_expr.py:294, in _layer()
263 def _layer(self) -> dict:
264 """The graph layer added by this expression.
265
266 Simple expressions that apply one task per partition can choose to only
(...)
291 Expr.__dask_graph__
292 """
--> 294 return {
295 (self._name, i): self._task((self._name, i), i)
296 for i in range(self.npartitions)
297 }
File ~/.local/lib/python3.11/site-packages/dask/_expr.py:295, in <dictcomp>()
263 def _layer(self) -> dict:
264 """The graph layer added by this expression.
265
266 Simple expressions that apply one task per partition can choose to only
(...)
291 Expr.__dask_graph__
292 """
294 return {
--> 295 (self._name, i): self._task((self._name, i), i)
296 for i in range(self.npartitions)
297 }
File ~/.local/lib/python3.11/site-packages/dask/dataframe/dask_expr/_expr.py:2661, in _task()
2660 def _task(self, name: Key, index: int) -> Task:
-> 2661 raise NotImplementedError()
NotImplementedError:
Versions:
Dask 2025.7.0
Pandas 2.2.3
Numpy 1.26.4
In this example, left_df and right_df are manually defined with arbitrary sizes; each npartitions value is also arbitrary, but no error is thrown when both are npartitions=1. In my actual use case the dask dataframe comes from read_parquet, so I don't believe the error is specific to how the dataframe was created.
Note also that .head() works and does not throw any error; only .tail() fails.
I understand what the words “Not Implemented” mean, but I’m unsure what they refer to here. Is there something I am doing wrong? Can this error be avoided?
Thank you very much for your time and help!