Dask on Databricks Clusters

I saw this question over on GitHub which made me curious - has anyone had any experience running Dask on a Databricks cluster?

I don’t know a whole lot about how Databricks clusters run under the hood, but presumably setup would be fairly simple? (I’m curious because I maintain both Spark code for some Databricks jobs and Dask code elsewhere - being able to deploy Dask code on Databricks clusters every now and again and make use of the resources would be really cool.)

I am also interested in this. I see a lot of folks run Dask on a single-node Databricks cluster, but it would be interesting to get multi-node working. We have dask-yarn, but it’s my understanding that Databricks doesn’t use YARN?


Most of Databricks is black magic to me, but it sounds from here like Databricks does use YARN - though more as an application running on YARN, rather than exposing YARN directly?

Searching around briefly, I haven’t come across much other than this suggesting it might now work - but definitely didn’t used to - just using Dask’s threaded scheduler (I wonder if this has changed at all now?), and this Medium blog post from 2019 which has some bash scripts to add into Databricks’ cluster init.sh - although it looks like it’s based on Databricks Conda, which I don’t think is supported anymore? (Presumably it wouldn’t be too crazy to adapt?)
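
(For reference, the threaded-scheduler route mentioned there is just plain single-node Dask in the driver’s Python process - a minimal sketch of what I mean, nothing Databricks-specific about it, which is exactly why it’s not enough here:)

import dask.array as da

# build a lazy computation
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

# run it with the threaded scheduler - everything stays in one process,
# so none of the cluster's worker nodes are involved
print(x.mean().compute(scheduler="threads"))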

This is what I understand too, which means you probably don’t have any access to the YARN API from inside Databricks.

Well, if I understand correctly, it just suggests using Dask in a single-node configuration, which is not what you want, as you won’t benefit from all the workers of your Databricks cluster.

I think this is the best lead: is it still possible to add a custom init script? You need to be able to start a dask-scheduler on the driver node and dask workers on the worker nodes. So you’ll probably also have to install Dask and your other dependencies on these nodes started inside YARN, using Conda or pip.

Doh! Yes, you’re 100% right. I was getting mixed up between threading and multiprocessing.

Databricks does still support cluster configs, and they haven’t changed a whole lot in how they work since that article (other than, I think, Conda no longer being supported).

Yeah conda support is gone now, but I’m sure we can manage this with pip.

Using the init scripts to side-load Dask is a really interesting idea - folks use them a bunch to install extra packages at runtime, but I hadn’t thought of using them to start background processes.

I will explore this further, probably not for a couple of weeks though.


@jacobtomlinson, did you get a chance to explore this?

I’d love to have a look at seeing if side-loading Dask might do the trick - does anyone have suggestions on where to start looking?

Not yet, but if you make any progress please let us know!


For info: Add a DatabricksRunner · Issue #2 · jacobtomlinson/dask-hpc-runner · GitHub


Ah nice! Just took a proper look through the init script in the article I linked before, and I can’t see why the overall idea (conditionally executing based on Databricks’ IS_DRIVER environment variable) shouldn’t work.

I’ll try and make some time to figure out a bit more and run some tests. If it does work as simply as that, it would be cool to see whether/how Dask running on the worker nodes affects performance for Spark running across the same network.

Hopefully I should have something a little more concrete to feed back soon!

Had an experiment with this just now but haven’t got it working yet - figured I’d share what I came across in case it helps anyone though.

Basically, the environment variables Databricks/Spark uses have changed since 2018, so things like IS_DRIVER don’t seem to exist anymore.

There is a SPARK_LOCAL_IP variable though, and the driver IP can be pulled out of the MASTER variable, so with some text manipulation I think you can check whether they’re the same.

Something about this logic is wrong though: I can create a Dask distributed client, but any jobs hang in pending, I think because there are no workers available (or none that it can communicate with).

Here’s what I tried running as the init script:

LOG_DIR=/dbfs/databricks/scripts/logs/$DB_CLUSTER_ID/dask/
HOSTNAME=`hostname`

# Ensure LOG_DIR exists
mkdir -p "$LOG_DIR"

# Get driver IP
DRIVER_IP=${MASTER#*://}
DRIVER_IP=${DRIVER_IP%%:*}

# Install dask
pip install "dask[complete]"

# if driver node, start dask scheduler
if [[ "$DRIVER_IP" = "$SPARK_LOCAL_IP" ]]; then
  dask-scheduler &
  echo $! > $LOG_DIR/dask-scheduler.$HOSTNAME.pid
# otherwise, start dask worker
else
  dask-worker tcp://$DRIVER_IP:8786 &
  echo $! > $LOG_DIR/dask-worker.$HOSTNAME.pid
fi

and then this Python in a notebook:

from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

def inc(x):
    return x + 1

x = client.submit(inc, 10)

print(x)

x is just an always pending future though.
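
One thing I might try next (just a sketch, I haven’t confirmed it on the cluster yet) is asking the scheduler how many workers have actually registered, which should show whether the worker branch of the init script ever ran:

from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

# an empty dict here would mean the scheduler is reachable but no
# dask workers ever connected - which would explain the pending futures
print(client.scheduler_info()["workers"])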

Not sure if anyone is still following this other than me :sweat_smile: but I’ve just had another quick look at this, and I think I’m almost somewhere:

Found this very helpful guide from Databricks on running Ray on Databricks.

Copying that info, it implies this should work fine as an init script to get Dask running:

set -e

# Install dask
pip install "dask[complete]"

# if driver node, start dask scheduler
if [[ $DB_IS_DRIVER = "TRUE" ]]; then
  dask scheduler &
# otherwise, wait for the scheduler to start, then start a dask worker
else
  sleep 40
  dask worker tcp://$DB_DRIVER_IP:8786 &
fi

Only problem is, it doesn’t look like that works:

from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

def inc(x):
    return x + 1

x = client.submit(inc, 10)

leads again to x always being a pending future :smiling_face_with_tear:

I’m sure there must just be one missing piece of the puzzle left, but I’m not really sure what it is yet.

Massive thanks to @jacobtomlinson - this is now working. Looks like the only issue was that 40 seconds potentially wasn’t a long enough wait. There’s probably room to experiment, but here’s what wound up working for me:

set -e

# Install dask
pip install "dask[complete]"

# if driver node, start dask scheduler (and give it time to come up)
if [[ $DB_IS_DRIVER = "TRUE" ]]; then
  dask scheduler &
  sleep 100
# otherwise, wait for the scheduler to start, then start a dask worker
else
  sleep 100
  dask worker tcp://$DB_DRIVER_IP:8786 &
fi
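
And then the same notebook snippet as before now behaves itself - roughly this, with an extra sanity check (my own addition, same scheduler_info call as above) that the workers actually registered:

from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

# quick sanity check - should list one entry per worker node
print(len(client.scheduler_info()["workers"]))

def inc(x):
    return x + 1

# this now resolves instead of sitting in "pending"
print(client.submit(inc, 10).result())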