Low CPU utilisation and only unmanaged memory

I am not completely new to dask, but this is the first large-scale dask project where I have had to think about memory management, and where some effects appeared that I had never seen before.

I wrote a Python script that reads a file from a network drive, analyses it (numpy, skimage) and writes the result back to the drive (pandas). It is similar to skimage's regionprops (and also uses them), but looks at some additional properties. The script itself does not use dask at all. Initially it did, but (A) the graph became too complicated when processing hundreds of files in parallel, and many of the tasks were really small, which caused a lot of overhead, and (B) dask started moving the large image data from worker to worker, which cost way too much time and didn't make any sense.

Putting all of that native Python in a delayed function (referred to below as the delayed function, since there is just this one) and processing many files in parallel, by building a list of calls to that function for different file paths and then computing that list, did not go as expected:

  1. CPU utilisation is at 100% at the beginning of the first batch, but later drops to 20-30% for most of the time. The task stream and graph show, as expected, only the delayed function. Looking at Dask's runtime profile widget, there is no single subfunction / part of the analysis that takes especially long, and there are no waiting functions; every part of the analysis is simply slow while the CPU is nowhere near its limit. I tried many combinations of n_workers and threads_per_worker, and also tried computing batchwise, but saw no noticeable difference. The only difference was when I set n_workers = 16 (the number of cores) with only one thread per worker: then the CPU stayed at 100% and the run also finished faster (only 60-80% of the previous runtime; see the cluster sketch after this list). Interestingly, for this configuration the Dask profile shows that all the small functions used for the analysis make up only a small fraction of the overall delayed function, with the remaining big fraction of the delayed function's runtime being unlabelled. What does that mean?

  2. The memory-per-worker widget tells me that apparently 0% of the memory is managed; it is all "unmanaged".

  3. For large images I have to lower the worker and thread numbers, or go batch by batch, as otherwise dask tries to load too many images at once and then kills its workers. With the batch size and/or n_workers*threads_per_worker equal to 2x the number of cores, everything works just fine, so it seems dask is too overconfident about how many tasks it can handle at once.
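
For reference, here is roughly how the cluster is created for the configuration in point 1 (just a minimal sketch, not my full script; 16 is simply the number of cores on my machine):

  from dask.distributed import Client

  # 16 single-threaded worker processes, i.e. one per physical core;
  # this is the setup that kept CPU utilisation at 100%.
  client = Client(n_workers=16, threads_per_worker=1)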

Any suggestions as to what could be causing these problems and how to solve them?

Could you post a simple code example that shows exactly how you are writing and calling your delayed functions? Don’t worry about including the details, but my initial thought is that the functions are too lightweight and you are spending a lot of time in scheduler overhead.

Hi Jacob,
The execution time of the delayed function is 5-20 min per image, so I doubt overhead is a problem here. Here is the code:

  import dask
  import pandas as pd
  from tqdm import tqdm

  @dask.delayed()
  def layer_analysis_and_save(z, path_dict, name):
      # analyze_layer is the plain numpy/skimage analysis, defined elsewhere
      results = analyze_layer(path_dict, z)
      pd.DataFrame(results).to_csv(f"_temp/{name}_z{z:04d}.csv", index=False)
      return None

  # z_max, path_dict and name come from the surrounding script
  batch_size = 32

  queue = []
  for z in range(z_max):
      queue.append(layer_analysis_and_save(z, path_dict, name))

  if batch_size > 1:
      for z in tqdm(range(len(queue))[::batch_size]):    # batch process
          dask.compute(queue[z:z + batch_size])
  else:
      dask.compute(queue)                                # process all at once

Hi @Paul_S, welcome to Dask Discourse forum!

A few thoughts:

So with this configuration, there is no performance problem? I think this means that you are bound by the GIL and cannot run layer_analysis_and_save function calls concurrently in multiple threads of the same worker process.

If you use Delayed and no Dask collection, I don’t think Dask is aware of the memory used by objects. So I would say this is normal.
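
"Managed" memory on the dashboard only counts the results of tasks that the workers hold in memory (measured with dask.sizeof); everything else a worker process allocates is reported as "unmanaged". Since your function returns None, there is nothing for Dask to track. A small sketch to illustrate the difference (the array sizes are arbitrary):

  import numpy as np
  import dask
  from dask.distributed import Client

  client = Client()

  @dask.delayed
  def returns_result():
      return np.ones((2000, 2000))   # result is held by a worker -> counted as managed

  @dask.delayed
  def returns_nothing():
      buf = np.ones((2000, 2000))    # only lives inside the task -> shows up as unmanaged
      return None

  kept = returns_result().persist()  # once computed, the dashboard reports ~32 MB of managed memory
  dask.compute(returns_nothing())    # same allocation, but only visible as unmanaged process memory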

I’m not sure I understand, especially the second part. Why would you set n_workers*threads_per_worker equal to 2 times the number of cores? And is this enough to prevent memory problems?

In the Delayed case, I see no reason Dask would start more than n_workers*threads_per_worker tasks…

Hi @guillaumeeb, many thanks for your input.

I think so. Since I don’t have any reference for what the optimal runtime should be, I just looked out for every resource being used to its full potential, and with this configuration the CPU utilisation stays at 100% and the runtimes improved as described. The only doubt I have is caused by the unlabelled runtime in the Dask profile (i.e. the difference between the runtime of the whole delayed function and the sum of all its components).

What causes this? This is the first time I have had this kind of problem, and I have been using the same standard libraries (numpy, scipy, skimage, pandas) for many other (although smaller) applications.

Is that a problem? Does it matter for performance or for the automatic optimisation of memory usage?

You are correct: if I provide these arguments, dask only starts n_workers*threads_per_worker tasks, and this was one strategy for limiting the amount of data loaded at once so that dask does not run out of memory. The idea behind n_workers*threads_per_worker = 2x #cores was that, if memory is not the limiting factor, I would expect this to be the maximum number of tasks the CPU can process (2x because of hyperthreading), so starting any more tasks in parallel would not be faster. I also noticed that if I start dask without providing these arguments (I suspect it chooses them automatically based on the workload and machine), it starts many more tasks (which in theory can’t be processed faster anyway), to the point where it has to kill workers because it keeps running over the memory limit on all of its workers.
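
To make the numbers concrete, the kind of configuration I mean looks like this (again just a sketch; 16 is my core count, so n_workers*threads_per_worker = 32 = 2x #cores):

  from dask.distributed import Client

  # At most n_workers * threads_per_worker = 32 tasks run at once
  # (2x the 16 physical cores, to account for hyperthreading),
  # which also bounds how many images are loaded simultaneously.
  client = Client(n_workers=16, threads_per_worker=2)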

More generally:
Is the general concept flawed, and should I go back to doing many of the small steps with dask? But then how can I tell dask not to move data between workers all the time, and how do I minimize the overhead of a graph with potentially hundreds of thousands of function calls?

Well, it really depends on your Python code and what you are doing. Not an expert here.

I don’t think this is a problem.

Dask has some rules for choosing the right number of processes/threads by default, but the resulting n_workers*threads_per_worker should be equal to the number of cores on your machine, so I’m a little surprised by what you are saying.
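
You can check what Dask picked on your machine with something like this (quick sketch):

  from dask.distributed import Client

  client = Client()                        # no arguments: Dask chooses the defaults
  print(client.nthreads())                 # threads per worker, keyed by worker address
  print(sum(client.nthreads().values()))   # total task slots; should equal your core count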

No, I think this is the right approach! If high level parallelization is enough, stick with it!