Customize data transfer protocol

Hi, I’m currently using the custom graph API to compute a large set of calculations. I’ve noticed that Dask’s built-in task result transfer protocol is not as fast as a hacked transfer protocol I’ve set up through Redis: with the standard transfer approach the job takes ~7 minutes vs ~5.5 minutes for the Redis approach (consistently reproducible). I’ve shared some pseudocode examples below.

It would take me some time to come up with an actual reproducer, as I cannot share the nature of the actual problem. However, my main question is not why this is the case, but rather: is there a way to customize the transfer protocol Dask uses during graph execution, so that I can decouple it from my task function?

Standard graph task function (this approach includes a custom registered serialization family that performs the same serialization as the Redis version; a sketch of that family follows the function below):

def task(dep_results):
    result = process_dep_results(dep_results)
    return result
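
For reference, the registered family is roughly along these lines (a simplified sketch; the lz4_pickle name and the exact dumps/loads helpers are illustrative, not my production code):

import pickle
import lz4.frame
from distributed.protocol.serialize import register_serialization_family

def lz4_pickle_dumps(x):
    # One compressed frame per object; Dask records which family produced it
    header = {}
    frames = [lz4.frame.compress(pickle.dumps(x))]
    return header, frames

def lz4_pickle_loads(header, frames):
    return pickle.loads(lz4.frame.decompress(frames[0]))

register_serialization_family("lz4_pickle", lz4_pickle_dumps, lz4_pickle_loads)

# e.g. Client("scheduler-address:8786",
#             serializers=["lz4_pickle", "pickle"],
#             deserializers=["lz4_pickle", "pickle"])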

Hacked Redis transfer protocol:

import pickle
import lz4.frame
from uuid import uuid4
from redis import Redis

redis = Redis()  # pre-configured Redis client (connection details omitted)

def task(dep_results_keys):
    # Fetch and deserialize each dependency result from Redis
    dep_results = []
    for k in dep_results_keys:
        dep_result = pickle.loads(lz4.frame.decompress(redis.get(k)))
        dep_results.append(dep_result)

    result = process_dep_results(dep_results)

    # Store this task's result in Redis and hand back only the key
    res_key = str(uuid4())
    redis.set(res_key, lz4.frame.compress(pickle.dumps(result)))
    return res_key

Stats of my task result sizes are below. (Note: Redis value sizes can actually be larger than 512 MB with some configuration changes.)

count: 25,200
mean: 1 MB
std: 18.83 MB
p50: 0.025 MB
p75: 0.075 MB
p95: 1.6 MB
p99: 12 MB
p99.9: 183 MB
max: 1,025 MB

Thanks.

Hi @dask-user,

How are you gathering results using Dask? Through dask.threaded.get? Or do you use a Client by any chance?
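
For context, I mean the difference between something like the following (toy graph purely for illustration):

from operator import add
from dask.threaded import get as threaded_get
from distributed import Client

dsk = {"a": 1, "b": (add, "a", 10)}  # toy hand-built graph

# Local threaded scheduler: results never leave the process
result = threaded_get(dsk, "b")

# Distributed scheduler: results move between workers, scheduler, and client
client = Client()  # or Client("scheduler-address:8786")
result = client.get(dsk, "b")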

You can customize the communications layer, but I’m not sure how complex it might be.

cc @crusaderky who probably knows more here.

While it is possible to inject custom serialization protocols and network protocols into Dask, this functionality is aimed at maintainers of third-party extensions (read: NVLink support in dask-cuda, dask-cudf, etc.), and I would strongly advise end users not to tamper with it.

You’ll find that your transfer rate will go up a lot if you enable direct transfers (Client(direct_to_workers=True)), as you’ll skip the hop through the scheduler.
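
A minimal sketch, assuming a remote scheduler and using placeholder names for the task function and its inputs:

from distributed import Client

# direct_to_workers makes the client fetch results straight from the workers
client = Client("scheduler-address:8786", direct_to_workers=True)

futures = client.map(my_task, my_inputs)       # my_task / my_inputs are placeholders
results = client.gather(futures, direct=True)  # direct=True can also be set per call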