Hi,
We use Ray in our framework and are trying to make use of the Dask-on-Ray functionality introduced in this video:
https://www.youtube.com/watch?v=sehuP7Hzxtw and in Ray's documentation: Using Dask on Ray — Ray 2.9.1
The documentation and video state that calling .persist() should persist the object on the Ray cluster. The following example is given:
import dask
import dask.array as da
from ray.util.dask import enable_dask_on_ray

# Use the Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()

d_arr = da.ones(100)
print(dask.base.collections_to_dsk([d_arr]))
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
#   0): (functools.partial(<function _broadcast_trick_inner at 0x7f27f1a71f80>,
#   dtype=dtype('float64')), (5,))}

# This submits all underlying Ray tasks to the cluster and returns
# a Dask array with the Ray futures inlined.
d_arr_p = d_arr.persist()

# Notice that the Ray ObjectRef is inlined. The dask.ones() task has
# been submitted to and is running on the Ray cluster.
dask.base.collections_to_dsk([d_arr_p])
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
#   0): ObjectRef(8b4e50dc1ddac855ffffffffffffffffffffffff0100000001000000)}
As you can see, the graph value is an ObjectRef, as expected after persisting.
Now, if we do the same with a random Dask DataFrame:
import numpy as np
import pandas as pd
import dask.dataframe as dd

enable_dask_on_ray()

df = dd.from_pandas(
    pd.DataFrame(np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"]),
    npartitions=2,
)
df = df.persist()
print(dask.base.collections_to_dsk([df]))
We do not obtain ObjectRefs but a HighLevelGraph object (i.e., no Ray object store references), which suggests that persist() does not work as expected.
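To verify whether the futures were actually inlined, one option is to materialize the graph and count values that look like Ray ObjectRefs. Below is a minimal sketch; count_inlined_refs is a hypothetical helper, duck-typed on the class name so it runs even without Ray installed, and it relies on HighLevelGraph implementing the Mapping protocol so dict() materializes it into its low-level {key: task} form:

```python
def count_inlined_refs(graph):
    # Materialize the graph: a HighLevelGraph is a Mapping, so dict()
    # flattens it into its low-level {key: task} dictionary.
    low_level = dict(graph)
    # Duck-type on the class name instead of importing ray.ObjectRef,
    # so this check works regardless of whether Ray is importable here.
    return sum(1 for v in low_level.values()
               if type(v).__name__ == "ObjectRef")

# Intended usage with the DataFrame snippet above (not run here):
# count_inlined_refs(dask.base.collections_to_dsk([df]))
# A result of 0 would mean no Ray futures were inlined after persist().
```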
So this statement:
"calling dask.persist() with a Dask-on-Ray scheduler will submit the tasks to the Ray cluster and return Ray futures inlined in the Dask collection."
does not seem to hold for Dask DataFrames.
Is this expected to work for DataFrames? Do we need to take extra steps? We would really appreciate some help.
Thank you!