Hi @guillaumeeb, sorry for the late response, and thank you very much for the explanation. It is really hard for me to post a code snippet of the whole flow, because it is data-science code, and the function submitted to Dask calls many nested functions, which makes it very hard to reduce. What I did find is that in each worker pod, the memory seems to increase more and more with each task it handles.
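(For reference, per-worker memory can be sampled from the client side; this is a minimal sketch, where worker_rss is just an illustrative helper, not part of our code:)

import psutil

def worker_rss():
    # resident set size of the worker process, in bytes
    return psutil.Process().memory_info().rss

# runs on every worker; returns {worker_address: rss_in_bytes}
print(self._client.run(worker_rss))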
When I change the return statement of the submitted function to return an empty list, the memory is fine and doesn't change. Also, when I change it to return the same input it got instead of the output it returns today, the memory doesn't change either.
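To make these experiments concrete, here is a simplified sketch (predict_batch and run_model are placeholders for our real, deeply nested pipeline):

def predict_batch(batch):
    output = run_model(batch)  # the real data-science pipeline
    return output              # variant 1: worker memory keeps growing

def predict_batch_empty(batch):
    run_model(batch)
    return []                  # variant 2: worker memory stays stable

def predict_batch_echo(batch):
    run_model(batch)
    return batch               # variant 3: worker memory stays stable too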
So I'm trying to understand whether we have a memory leak, or whether it's something to do with memory management in Dask and how results are stored in the distributed memory of the worker nodes.
If you can elaborate on how the results are stored, that would be great (I read the documentation, but maybe you can add more to it). Do you think it could be related? Or do you have any idea what I can do to better understand the issue?
This is how we set up the cluster -
from dask_kubernetes import KubeCluster, make_pod_spec

extra_cluster_config["pod_template"] = make_pod_spec(
    image=image,
    labels=labels,
    threads_per_worker=threads_per_worker,
    env=env,
    extra_container_config=extra_container_config,
    extra_pod_config=extra_pod_config,
    memory_limit=memory_limit,
    memory_request=memory_request,
    cpu_limit=cpu_limit,
    cpu_request=cpu_request,
)
cluster = KubeCluster(scheduler_service_wait_timeout=240, **extra_cluster_config)
cluster.adapt(minimum=minimum_workers, maximum=maximum_workers)
This is how we set up the client -
from dask.distributed import Client

self._client = Client(cluster, timeout=120)
This is how we submit tasks to Dask - we call submit in a loop, once per batch of items -
tasks = []
for batch_number, first_batch_index in enumerate(
    range(0, len(items_to_predict), NUM_OF_ITEMS_IN_BATCH), start=1
):
    # each task gets its own unique key; the future is kept for gathering below
    tasks.append(self._client.submit(func, *args, **kwargs, key=unique_task_name))
And this is how we collect the results (we also tried the gather API):
from dask.distributed import as_completed

n_tasks = len(tasks)
results = []
# as_completed yields each future as soon as it finishes
for task_idx, task in enumerate(as_completed(tasks), start=1):
    results.append(task.result())
    logger.debug(f"Gathered {task_idx} of {n_tasks} tasks")
return results
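One variant we are considering, in case it is the futures themselves that keep the results pinned in worker memory: release each future as soon as its result has been gathered. A sketch of that, using as_completed(..., with_results=True) from dask.distributed:

from dask.distributed import as_completed

results = []
# with_results=True transfers each result to the client as the task finishes
for task, result in as_completed(tasks, with_results=True):
    results.append(result)
    task.release()  # drop our reference so the worker can free its copy
tasks.clear()       # also drop the references still held by the original list
return results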
Thank you very much in advance,
Moran.