Dask on Databricks Clusters

I saw this question over on GitHub which made me curious - has anyone had any experience running Dask on a Databricks cluster?

I don’t know a whole lot about how Databricks clusters run under the hood, but presumably setup would be fairly simple? (I’m curious because I maintain both Spark code for some Databricks jobs and Dask code elsewhere; being able to deploy Dask code on Databricks clusters every now and again and make use of the resources would be really cool.)

I am also interested in this. I see a lot of folks run Dask on a single-node Databricks cluster, but it would be interesting to get multi-node working. We have dask-yarn, but it’s my understanding that Databricks doesn’t use YARN?


Most of Databricks is black magic to me, but it sounds from here like Databricks does use YARN; it’s more of an application running on YARN, rather than exposing YARN directly?

Searching around briefly, I haven’t come across anything other than this suggestion that it might (but definitely didn’t used to) work just using Dask’s threads scheduler (I wonder if this has changed at all now?), and this Medium blog post from 2019 which has some bash scripts to add into Databricks’ cluster init.sh, although it looks like it’s based on Databricks Conda, which I don’t think is supported anymore? (Presumably it wouldn’t be too crazy to adapt?)

This is what I understand too, which means you probably don’t have any access to the YARN API when inside Databricks.

Well, if I understand correctly, it just suggests using Dask in a single-node configuration, which is not what you want, as you won’t benefit from all the workers of your Databricks cluster.

I think this is the best lead: is it still possible to add a custom init script? You need to be able to start a dask-scheduler process on the driver node, and dask-worker processes on the worker nodes. You’ll probably also have to install Dask and your other dependencies on these nodes started inside YARN, using Conda or pip.
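As a rough sketch of that role split (the `is_driver`/`driver_ip` inputs are assumptions about what the init script can discover about its node; 8786 is just Dask's default scheduler port):

```python
def dask_launch_command(is_driver: bool, driver_ip: str, port: int = 8786):
    """Return the command a node should launch after installing Dask.

    `is_driver` and `driver_ip` are assumed to come from whatever the
    init script can work out about the node it is running on.
    """
    if is_driver:
        # The scheduler lives on the driver node.
        return ["dask-scheduler"]
    # Every other node runs a worker pointed at the driver's scheduler.
    return ["dask-worker", f"tcp://{driver_ip}:{port}"]

print(dask_launch_command(True, "10.0.0.4"))   # ['dask-scheduler']
print(dask_launch_command(False, "10.0.0.4"))  # ['dask-worker', 'tcp://10.0.0.4:8786']
```

Each node would then run its command in the background after a `pip install "dask[complete]"`.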

Doh! Yes, you’re 100% right. I was getting mixed up between threading and multiprocessing.

Databricks does still support custom init scripts, and they haven’t changed a whole lot in how they work since that article (other than Conda no longer being supported, I think).

Yeah, Conda support is gone now, but I’m sure we can manage this with pip.

Using the init scripts to side-load Dask is a really interesting idea; folks use them a bunch to install extra packages at runtime, but I hadn’t thought of using them to start background processes.

I will explore this further, probably not for a couple of weeks though.


@jacobtomlinson, did you get a chance to explore this?

I’d love to have a look at seeing if side-loading Dask might do the trick - does anyone have suggestions on whereabouts to start looking?

Not yet, but if you make any progress please let us know!


For info: Add a DatabricksRunner · Issue #2 · jacobtomlinson/dask-hpc-runner · GitHub


Ah nice! Just took a proper look through the init script in the article I linked before, and I can’t see why the overall idea (conditionally executing based on Databricks’ IS_DRIVER environment variable) shouldn’t work.

I’ll try and make some time to figure out a bit more and run some tests. If it does work as simply as that, it would be cool to see whether/how Dask running on the worker nodes affects performance for Spark running across the same network.

Hopefully I should have something a little more concrete to feed back soon!

Had an experiment with this just now but haven’t got it working yet; figured I’d share what I came across in case it helps anyone though.

Basically, since 2018 the environment variables Databricks/Spark uses have changed, so things like IS_DRIVER don’t seem to exist anymore.

There are SPARK_LOCAL_IP and DRIVER_IP variables though, and with some text manipulation, I think you can check that they’re the same.
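To illustrate the kind of text manipulation I mean (a sketch only - the exact format DRIVER_IP takes on a real cluster is an assumption here), strip any scheme or port off both values before comparing:

```python
def is_driver_node(driver_ip: str, local_ip: str) -> bool:
    """True if this node's IP matches the driver's.

    DRIVER_IP may carry extra decoration (a scheme like spark:// or a
    trailing port), so reduce both values to a bare host first.
    """
    def bare_host(value: str) -> str:
        # "spark://10.0.0.4:7077" -> "10.0.0.4"
        return value.strip().split("://")[-1].split(":")[0]

    return bare_host(driver_ip) == bare_host(local_ip)

print(is_driver_node("spark://10.0.0.4:7077", "10.0.0.4"))  # True
print(is_driver_node("10.0.0.4", "10.0.0.7"))               # False
```

The same normalise-then-compare idea is what the bash `[[ "$DRIVER_IP" = "$SPARK_LOCAL_IP" ]]` test below is trying to do.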

Something about this logic is wrong though: I can create a Dask distributed client, but any jobs hang in pending, I think because there are no workers available (or none it can communicate with).

Here’s what I tried running as the init script:


#!/bin/bash

# Ensure LOG_DIR exists
mkdir -p "$LOG_DIR"

# Get driver IP

# Install dask
pip install "dask[complete]"

# If driver node, start the dask scheduler
if [[ "$DRIVER_IP" = "$SPARK_LOCAL_IP" ]]; then
  dask-scheduler &
  echo $! > "$LOG_DIR/dask-scheduler.$HOSTNAME.pid"
# otherwise, start a dask worker pointed at the scheduler
else
  dask-worker "tcp://$DRIVER_IP:8786" &
  echo $! > "$LOG_DIR/dask-worker.$HOSTNAME.pid"
fi

and then this python in a notebook:

from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

def inc(x):
    return x + 1

x = client.submit(inc, 10)


x just stays a pending future forever though.
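One way to confirm the no-workers theory before submitting anything is to ask the scheduler how many workers have registered. This is a sketch using the distributed client API, demoed against a LocalCluster so it runs anywhere; on Databricks you would pass the client connected to tcp://$SPARK_LOCAL_IP:8786 instead:

```python
from dask.distributed import Client, LocalCluster

def registered_workers(client: Client) -> int:
    """How many workers the scheduler currently knows about.

    If this returns 0, submitted futures will sit in "pending"
    forever, which matches the behaviour described above.
    """
    return len(client.scheduler_info()["workers"])

if __name__ == "__main__":
    # Demo against an in-process local cluster; on Databricks, connect
    # the Client to the scheduler running on the driver instead.
    with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster:
        with Client(cluster) as client:
            print(registered_workers(client))  # 2
```

If this reports 0 on the Databricks cluster, the worker half of the init script never started (or the workers can't reach the scheduler's port), rather than anything being wrong with the notebook code.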