Dask on Ray: .persist() does not work with Dask DataFrames

Hi,

We use Ray for our framework and are trying to use the Dask-on-Ray functionality introduced in this video:

https://www.youtube.com/watch?v=sehuP7Hzxtw and, on Ray's side, the "Using Dask on Ray" page of the Ray 2.9.1 documentation.

Both the documentation and the video state that calling .persist() should persist the object on the Ray cluster. The following example is given:

import dask
import dask.array as da
from ray.util.dask import enable_dask_on_ray

# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()

d_arr = da.ones(100)
print(dask.base.collections_to_dsk([d_arr]))
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
#   0): (functools.partial(<function _broadcast_trick_inner at 0x7f27f1a71f80>,
#   dtype=dtype('float64')), (5,))}

# This submits all underlying Ray tasks to the cluster and returns
# a Dask array with the Ray futures inlined.
d_arr_p = d_arr.persist()

# Notice that the Ray ObjectRef is inlined. The dask.ones() task has
# been submitted to and is running on the Ray cluster.
dask.base.collections_to_dsk([d_arr_p])
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
#   0): ObjectRef(8b4e50dc1ddac855ffffffffffffffffffffffff0100000001000000)}

As you can see, after persisting, the graph value is an ObjectRef, as expected.

Now, if we do the same with a random Dask DataFrame:

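import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
from ray.util.dask import enable_dask_on_ray

enable_dask_on_ray()

# Build a small two-partition Dask DataFrame from random integers.
df = dd.from_pandas(
    pd.DataFrame(np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"]),
    npartitions=2,
)

df = df.persist()
print(dask.base.collections_to_dsk([df]))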

We do not obtain an ObjectRef but a HighLevelGraph object (so not a reference into Ray's object store). This suggests that persist does not work as expected here.
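Since printing a HighLevelGraph only shows its summary and not the values stored in it, one way to double-check this is to materialize the graph into a plain dict and count its values by type. The helper below is only a minimal sketch: the name summarize_graph_values is hypothetical, and it assumes that the object returned by collections_to_dsk behaves as a standard Mapping.

```python
from collections import Counter
from collections.abc import Mapping


def summarize_graph_values(dsk: Mapping) -> Counter:
    """Count the values of a task graph by type name (e.g. 'ObjectRef', 'tuple')."""
    # dict(...) turns any Mapping into plain key -> value pairs,
    # so the individual graph values can be inspected directly.
    return Counter(type(value).__name__ for value in dict(dsk).values())


# Hypothetical usage with the DataFrame above (requires dask and ray):
#   dsk = dask.base.collections_to_dsk([df])
#   print(summarize_graph_values(dsk))
```

If the persisted partitions were inlined as Ray futures, the counts should be dominated by ObjectRef entries; if the values are still task tuples or other task objects, the work was not replaced by references.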

So this statement:
“calling dask.persist() with a Dask-on-Ray scheduler will submit the tasks to the Ray cluster and return Ray futures inlined in the Dask collection.”

does not seem to hold for Dask DataFrames.

Is it expected to work? Do we need to take extra steps? We would really appreciate some help.

Thank you!

Hi @dylanprins, welcome to the Dask Discourse forum,

I won't be able to help much, as I really don't know Dask on Ray, or Ray itself.

Did the Array example work on your setup?

According to the documentation, it should work with both.

Did you try opening an issue on the Ray side?

Hi! Thanks for the message.

Yes, I opened an issue here and on Ray's forum.

The array example does indeed work; it just does not work for a Dask DataFrame. In the video and in Ray's post it seems like both should work, but the DataFrame case does not!

So maybe the implementation is incomplete.
