Dask dataframe scheduling policy by partition order

I am looking for a way to prioritize tasks by partition/index order. My computation graph contains many backward-looking operations (i.e. ffill, shift, etc.), so partition n+1 can never finish before partition n. Prioritizing by partition order would therefore give me faster partial results and lower memory use.

Here is an unsatisfactory pseudo-code attempt:

# Start async computation in partition/index order: earlier partitions
# get a higher scheduler priority so they are worked on first
persisted = []
n = dd.npartitions
for idx in range(n):
    part = dd.get_partition(idx)
    persisted.append(client.persist(part, priority=n - idx))

# Wait for and display each partial result in order
for part in persisted:
    print(part.compute())

Unfortunately:

  • it creates a new graph per partition, which is much slower than a single dd.persist()
  • it breaks the Dask dashboard “Groups” view, showing many items instead of a single task group.