Error using merge_asof and .tail()

Hello,

I am using merge_asof as a means of adding a column from another dataset by nearest-value lookup, as a simple form of interpolation (hence direction="nearest").

I am getting an error calling .tail() on a non-computed dataframe that has used merge_asof. I have created a simple, reproducible snippet:

import numpy as np
import pandas as pd
import dask.dataframe as dd

# simple example dataframes
left_df = dd.from_pandas(pd.DataFrame(dict(a=np.linspace(0,1,10))), npartitions=3) # arbitrary npartitions greater than 1
right_df = dd.from_pandas(pd.DataFrame(dict(a=np.sort(np.random.random(20)), b=np.random.random(20))), npartitions=2) # arbitrary npartitions greater than 1
merged_df = dd.merge_asof(left_df, right_df, on='a', direction="nearest")
computed_df = merged_df.compute()

print(computed_df.tail()) # no error
print(merged_df.tail()) # throws error

Error:

---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
Cell In[53], line 12
     10 computed_df = merged_df.compute()
     11 display(computed_df.tail()) # no error
---> 12 display(merged_df.tail()) # throws error

File ~/.local/lib/python3.11/site-packages/dask/dataframe/dask_expr/_collection.py:702, in FrameBase.tail(self, n, compute)
    700 out = new_collection(expr.Tail(self, n=n))
    701 if compute:
--> 702     out = out.compute()
    703 return out

File ~/.local/lib/python3.11/site-packages/dask/base.py:373, in DaskMethodsMixin.compute(self, **kwargs)
    349 def compute(self, **kwargs):
    350     """Compute this dask collection
    351 
    352     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    371     dask.compute
    372     """
--> 373     (result,) = compute(self, traverse=False, **kwargs)
    374     return result

File ~/.local/lib/python3.11/site-packages/dask/base.py:681, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    678     expr = expr.optimize()
    679     keys = list(flatten(expr.__dask_keys__()))
--> 681     results = schedule(expr, keys, **kwargs)
    683 return repack(results)

File ~/.local/lib/python3.11/site-packages/dask/_expr.py:581, in __dask_graph__()
    578     continue
    579 seen.add(expr._name)
--> 581 layers.append(expr._layer())
    582 for operand in expr.dependencies():
    583     stack.append(operand)

File ~/.local/lib/python3.11/site-packages/dask/_expr.py:294, in _layer()
    263 def _layer(self) -> dict:
    264     """The graph layer added by this expression.
    265 
    266     Simple expressions that apply one task per partition can choose to only
   (...)
    291     Expr.__dask_graph__
    292     """
--> 294     return {
    295         (self._name, i): self._task((self._name, i), i)
    296         for i in range(self.npartitions)
    297     }

File ~/.local/lib/python3.11/site-packages/dask/_expr.py:295, in <dictcomp>()
    263 def _layer(self) -> dict:
    264     """The graph layer added by this expression.
    265 
    266     Simple expressions that apply one task per partition can choose to only
   (...)
    291     Expr.__dask_graph__
    292     """
    294     return {
--> 295         (self._name, i): self._task((self._name, i), i)
    296         for i in range(self.npartitions)
    297     }

File ~/.local/lib/python3.11/site-packages/dask/dataframe/dask_expr/_expr.py:2661, in _task()
   2660 def _task(self, name: Key, index: int) -> Task:
-> 2661     raise NotImplementedError()

NotImplementedError:

Versions:

Dask 2025.7.0
Pandas 2.2.3
Numpy 1.26.4

In this example, left_df and right_df are defined manually with arbitrary sizes. The npartitions values are also arbitrary, but no error is thrown when both dataframes have npartitions=1. In my actual use case, the Dask dataframe comes from read_parquet, so I don’t believe the error is specific to how the dataframe was created.

Note also that .head() works and does not throw any error.

I can understand the meaning of the words “Not Implemented” but I’m unsure what this is referring to. Is there something I am doing wrong? Can this error be avoided?

Thank you very much for your time and help!

Hi @zacharymartin, welcome to the Dask community!

Thanks for the detailed post and reproducer, this is really appreciated.

I’m not sure why this is erroring, but the code works with an older version of Dask (2024.12.1), so I would recommend opening an issue on the dask repository with your question and mentioning that it worked previously.

Hi Guillaume,
Thanks for the response. I’ve opened an issue on the dask repo now.
I cannot use previous versions because of other issues I ran into with merge_asof. Those are resolved in the latest version, so there is no need to discuss them here.
In the meantime, I do not strictly need .tail(), so I can still move forward. I just wanted to bring the bug to attention.
Thanks again!
