I saw this question over on GitHub which made me curious - has anyone had any experience running Dask on a Databricks cluster?
I don’t know a whole lot about how Databricks clusters run under the hood, but presumably setup would be fairly simple? (I’m curious because I maintain both Spark code for some Databricks jobs and Dask code elsewhere - being able to deploy Dask code on Databricks clusters every now and again and make use of the resources would be really cool.)
I am also interested in this. I see a lot of folks run Dask on a single-node Databricks cluster, but it would be interesting to get multi-node working. We have dask-yarn, but it’s my understanding that Databricks doesn’t use YARN?
Most of Databricks is black magic to me, but it sounds from here like Databricks does use YARN, though more as an application running on top of YARN rather than something that exposes YARN directly?
Searching around briefly, I haven’t come across anything other than this, which suggests it might (though it definitely didn’t use to) work just using Dask’s threads scheduler (I wonder if this has changed at all now?), and this Medium blog post from 2019, which has some bash scripts to add into Databricks’ cluster init.sh - although it looks like it’s based on Databricks Conda, which I don’t think is supported anymore? (Presumably it wouldn’t be too crazy to adapt.)
This is what I understand too, which means you probably don’t have any access to the YARN API from inside Databricks.
Well, if I understand correctly, it just suggests using Dask in a single-node configuration, which is not what you want, as you won’t benefit from all the workers of your Databricks cluster.
I think this is the best lead: is it still possible to add a custom init script? You need to be able to start a dask-scheduler on the driver node and dask workers on the worker nodes. So you’ll probably also have to install Dask and your other dependencies on these nodes started inside YARN, using Conda or pip.
Doh! Yes, you’re 100% right. I was getting mixed up between threading and multiprocessing.
Databricks does still support cluster init scripts, and they haven’t changed a whole lot in how they work since that article (other than Conda no longer being supported, I think).
Yeah, Conda support is gone now, but I’m sure we can manage this with pip.
Using the init scripts to side-load Dask is a really interesting idea; folks use them a bunch to install extra packages at runtime, but I hadn’t thought of using them to start background processes.
I will explore this further, probably not for a couple of weeks though.
Ah nice! Just took a proper look through the init script in the article I linked before, and I can’t see why the overall idea (conditionally executing based on Databricks’ IS_DRIVER environment variable) shouldn’t work.
I’ll try and make some time to figure out a bit more and run some tests. If it really is as simple as that, it would be cool to see whether/how Dask running on the worker nodes affects performance for Spark running across the same network.
Hopefully I should have something a little more concrete to feed back soon!
Had an experiment with this just now but haven’t got it working yet; figured I’d share what I came across in case it helps anyone though.
Basically, since 2018 the environment variables Databricks/Spark uses are different, so things like IS_DRIVER don’t seem to exist anymore.
There are SPARK_LOCAL_IP and MASTER variables though, and with some text manipulation, I think you can check whether they point at the same node.
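To double-check what’s actually available, a throwaway notebook cell along these lines helps (just a sketch, and it only shows the driver’s environment, not the workers’):
import os

# List the Spark/Databricks-related environment variables visible on the driver,
# to see what an init script can actually rely on.
for name in sorted(os.environ):
    if name.startswith(("SPARK_", "DB_", "DATABRICKS_")):
        print(name, "=", os.environ[name])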
Something about this logic is wrong though, since I can create a Dask distributed client, but any jobs I submit hang in pending, I think because there are no workers available (or none that it can communicate with).
Here’s what I tried running as the init script:
LOG_DIR=/dbfs/databricks/scripts/logs/$DB_CLUSTER_ID/dask/
HOSTNAME=$(hostname)

# Ensure LOG_DIR exists
mkdir -p "$LOG_DIR"

# Get driver IP
DRIVER_IP=${MASTER#*://}
DRIVER_IP=${DRIVER_IP%%:*}

# Install dask
pip install "dask[complete]"

# if driver node, start dask scheduler
if [[ "$DRIVER_IP" = "$SPARK_LOCAL_IP" ]]; then
  dask-scheduler &
  echo $! > "$LOG_DIR/dask-scheduler.$HOSTNAME.pid"
# otherwise, start dask worker
else
  dask-worker tcp://$DRIVER_IP:8786 &
  echo $! > "$LOG_DIR/dask-worker.$HOSTNAME.pid"
fi
and then this python in a notebook:
from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

def inc(x):
    return x + 1

x = client.submit(inc, 10)
print(x)
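As a sanity check on the “no workers” theory, something like this shows whether anything has actually registered with the scheduler (just a sketch, assuming the scheduler really is listening on port 8786); an empty dict would explain why every future stays pending:
from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

# Ask the scheduler which workers have registered with it.
workers = client.scheduler_info()["workers"]
print(len(workers), "worker(s) connected:", list(workers))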
Going by that info, it looks like this should work fine as an init script to get Dask running:
set -e

# Install dask
pip install "dask[complete]"

# if driver node, start the dask scheduler
if [[ $DB_IS_DRIVER = "TRUE" ]]; then
  dask scheduler &
# otherwise, wait for the scheduler to come up, then start a dask worker
else
  sleep 40
  dask worker tcp://$DB_DRIVER_IP:8786 &
fi
Only problem is, it doesn’t look like that works:
from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

def inc(x):
    return x + 1

x = client.submit(inc, 10)
leads again to x always being a pending future
I’m sure there must just be one missing piece of the puzzle, but I’m not really sure what it is yet.
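One thing that might make this kind of hang easier to debug (a sketch, not part of the scripts above) is asking the client to wait for workers with a timeout, so the notebook errors out quickly instead of leaving futures pending forever:
from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

# Fail fast if no worker registers within 60 seconds,
# rather than letting submitted futures sit in "pending" indefinitely.
client.wait_for_workers(n_workers=1, timeout=60)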
Massive thanks to @jacobtomlinson since this is now working. Looks like the only issue was that 40 seconds potentially wasn’t enough. There’s probably room to experiment, but here’s what wound up working for me:
set -e

# Install dask
pip install "dask[complete]"

# if driver node, start the dask scheduler and give it time to come up
if [[ $DB_IS_DRIVER = "TRUE" ]]; then
  dask scheduler &
  sleep 100
# otherwise, wait for the scheduler, then start a dask worker
else
  sleep 100
  dask worker tcp://$DB_DRIVER_IP:8786 &
fi
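With that in place, the same notebook snippet as before completes instead of hanging; roughly (same assumption as earlier that the scheduler is on the driver at port 8786):
from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

def inc(x):
    return x + 1

# With workers connected, the future now resolves instead of staying pending.
x = client.submit(inc, 10)
print(x.result())  # should print 11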