Hi, I’m currently using the custom graph API to compute a large set of calculations. I’ve noticed that Dask’s built-in task result transfer protocol is not as fast as a hacked transfer protocol I’ve set up through Redis: with the standard transfer protocol the job takes ~7 minutes vs. ~5.5 minutes for the Redis approach (consistently reproducible). I’ve shared some pseudocode examples below.
It would take me some time to come up with an actual repro, as I cannot share the nature of the actual problem. However, my main question is not why this is the case, but rather: is there a way to customize the transfer protocol Dask uses during graph execution, so that I can decouple it from my task function?
Standard graph task function (this approach includes a custom registered serialization family that performs the same serialization I’m doing in the Redis version):
def task(dep_results):
result = process_dep_results(dep_results)
return result
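Roughly, the serialization family looks like this. This is a simplified, self-contained sketch rather than my actual code: the `family_dumps`/`family_loads` names are made up, and zlib stands in for lz4 so it runs without extra dependencies.

```python
import pickle
import zlib  # stand-in for lz4.frame so this sketch is self-contained

def family_dumps(x):
    # Dask serialization families produce a header dict plus a list of frames.
    header = {"serializer": "pickle-compressed"}
    frames = [zlib.compress(pickle.dumps(x))]
    return header, frames

def family_loads(header, frames):
    # Reverse the above: decompress the single frame and unpickle it.
    return pickle.loads(zlib.decompress(frames[0]))

# In the real version the pair is registered with dask.distributed, e.g.:
# from distributed.protocol.serialize import register_serialization_family
# register_serialization_family("pickle-compressed", family_dumps, family_loads)
```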
Hacked Redis transfer protocol:
import pickle
import lz4.frame
from uuid import uuid4

def task(dep_results_keys):
    # `redis` here is a connected redis.Redis client.
    # Fetch and deserialize each dependency's result from Redis.
    dep_results = []
    for k in dep_results_keys:
        dep_result = pickle.loads(lz4.frame.decompress(redis.get(k)))
        dep_results.append(dep_result)
    result = process_dep_results(dep_results)
    # Store this task's result under a fresh key; only the key flows
    # through Dask to downstream tasks.
    res_key = str(uuid4())
    redis.set(res_key, lz4.frame.compress(pickle.dumps(result)))
    return res_key
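To make the key-passing pattern concrete, here is a self-contained toy version with a plain dict standing in for Redis and zlib for lz4 (the `store`/`put`/`get` names are illustrative only, and `sum` stands in for `process_dep_results`):

```python
import pickle
import zlib
from uuid import uuid4

store = {}  # dict standing in for the Redis instance

def put(value):
    # Compress + pickle, store under a fresh key, return the key.
    key = str(uuid4())
    store[key] = zlib.compress(pickle.dumps(value))
    return key

def get(key):
    return pickle.loads(zlib.decompress(store[key]))

def task(dep_result_keys):
    # Only keys cross the task boundary; payloads live in the store.
    dep_results = [get(k) for k in dep_result_keys]
    result = sum(dep_results)  # stand-in for process_dep_results
    return put(result)

# Two upstream "results" flow into one downstream task by key.
k1, k2 = put(2), put(3)
out_key = task([k1, k2])
```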
Stats on my task result sizes below. (Note: Redis values can actually be larger than 512 MB with some configuration changes.)
count: 25,200
mean: 1 MB
std: 18.83 MB
p50: 0.025 MB
p75: 0.075 MB
p95: 1.6 MB
p99: 12 MB
p99.9: 183 MB
max: 1,025 MB
Thanks.