Dilemma: Schedule IO-Bound / CPU-Bound tasks in cascaded clients

First of all, thank you for this excellent project - from idea to documentation.

BG: I’m coding a CLI application about Voice-AI datasets (different flavors, versions, languages) where a user can specify what to do on what, and individual datasets are also of course different wrt. sizes, so I need an adaptive approach (I already use cluster.adapt()).

Process summary (LocalCluster):

  • IO-BOUND: Read one dataset file in chunks (tarfile generator) using threads (may process 2-3 files - data is on spinning HDD’s)
  • POSSIBLE CPU-BOUND: If requested, transcode/analyze audio using sub-chunks
  • IO-BOUND: Write out the results (extracted raw audio or transcoded one, possibly to parquet if requested) + metadata on audio into another drive (so can be more parallel to reading)

I want separate IO / CPU bound threads/processes to max out CPU/RAM/HDD usage. I was thinking of having outer cluster as 2-3 threads which do the tar.gz chunked reading, and inner cluster using processes (limiting it to unused cores) to do the CPU bound work.

But for that, I need to buffer the data (e.g. read a chunk of 10k recordings, re-chunk them to 100 for transcoding) and pass them to client.submit(), and (obviously) I hit the warning: Consider loading the data with Dask directly or using futures or delayed objects to embed the data into the graph without repetition., as the graph becomes large (already in best-practices here). And eventually the schedular overloads and crashes occur.

These are suggested:

  • loading the data with Dask directly (this is not I want, I want to read sequential data and pass)
  • using futures (I do?)
  • delayed objects to embed the data into the graph without repetition (this I don’t understand)

Can someone please guide me for a solution/example/possible better method?

Why won’t you want that?

Have you taken a look at resources to specialize worker and and affect tasks to them?

Thank you for answering.

My whole point was controlling IO-bound (threads), CPU-bound (processes) and mixed jobs (processes, but might be mixed). That was what I’ve been controlling pre-dask. The data is I deal (files) is un-balanced, 100MB to 100GB. And according to my estimations one process will take 13 days on a 6c/12t machine, so I need to dig.

The whole point is to max CPU, RAM usage and disk r/w bandwidth usage in addition to concurrency & parallelism. And the whole point in moving to dask is to use a LAN cluster at the top level with data on shared drives but process a single file in a single machine. I also check the file sizes and pre-organize the data to maximize parallelism and drop the total amount of time (e.g. starting with largest data).

Reading data within tasks would make everything mixed. I understand that this is due distributed nature and not to pass big data over NW. I was in the impression that when I use LocalCluster that would not be needed (I’ve been using shared data structures with futures),

I also tried with “delayed” at the second level, delaying only the function was not enough, now also trying to delay reading, which also causes problems and it will be the same as reading data within process I think. I also have problems with nout here, as I don’t know the amount until I process them. I wrapped the data into dataclasses for now…

I’m still not thinking dask-ish I think…

I think I read everything and stuff is floating in my head, I had the impression that concept was for multiple machines.

More learning and experimentation is required on my end, and probably I need to fall back to reading data within a task for now :frowning:

I think my main problem lies in the fact that I don’t know the amount of data which will be incoming (without scanning the tarfile twice).

This is not exactly a dask problem, but the large graph issue is limiting it.

  • If I read the data inside futures, I can only create one task, so it becomes serial.
  • If I read the chunks outside and pass, it passes large amount of data and overloads the scheduler.
  • If I use parallelism on the upper level (processing many tarfiles in parallel) seek times get large, also the outputs will be irregular and cause huge disk defragmentation.

Given these, is it possible to get the schedular overhead and just make it use the provided data?

def chunk_reader(...):

def transcode_audio(...):

def process_tarfile(...):
    for chunk in chunk_reader(...):
        futures.append(client.submit(transcode_audio, params))

    for future in as_completed(futures)
        results.append(future.result())

   ...

Or are there any other possibilities for this? Bag? Delayed? As far as I can see they also do not solve this.

It took a while (and asking ChatGPT) for me to understand what you meant. One of my problems was: If I create a cluster with all my cores and try to use that cluster for both files and transcode sub-processes, the cores were filling with file processes.

Now I defined this:

cluster = LocalCluster(
    ...
   resources={"io_bound": 2, "cpu_bound": 10}
    ...
)

and used client.submit(... resources={"io_bound": 1}) in upper level (files), and with dask.annotate(resources={"cpu_bound": 1}): in transcode sub-processes. That did the trick. Thank you!

For the main problem (where to read data) I still do not have a solution, I keep getting warnings. To create sub-processes from large chunk reads, I used dask.bag, re-chunked it to smaller and used delayed.

Although it is far from perfect, it works better now. This one is from a 3 file dataset (train, dev, test) totaling 3607 records, which took 231 secs to transform, or 13 recs/sec.

With this speed, it will take 22-23 days to pre-process my data. I need to play more to optimize…

1 Like

Replaced back everything with two level futures with better optimized chunk sizes, that removed all overhead related to bag/delayed. Still no luck with reading inside sub-futures thou.

But I think this cannot be better:

1 Like

Not sure what you mean by that?

Sorry, bad “terminology”… I think my scenario is similar to the Launch Tasks from Tasks page.

Pseudo algorithm:

  • Get a list of tar.gz files, create a cluster/client, also with the support of resources option as explained above. So that I use 2-3 processes (futures) for handling individual tar.gz files, and the remaining cores for sub-tasks (processes for transcoding the audio in my case).
  • Each file process repeatedly reads in a chunk (say 1000) members from the tar.gz file, filters irrelevant ones out, re-chunks them (say to 100 records), passes them as new processes (futures to do the transcoding) to the same cluster.

I think I tried every possible combination, but could not find a way to read the data at the leaf processes (those for transcoding) lazily. I need to read/write in larger chunks to overcome IO overhead, but that causes large graphs as mentioned in warnings (which I had to silence). The data size it complains is actually in 20-30 MiBs for each sub-chunk (as I pass audio as bytes 100 records take much).

At the end, I need to re-collect them in a list of records and write out them as parquet files of size 500MB-1GB (which uses RAM), which I asked here and here

I’ll try this without sub-chunking (i.e. just reading 100 records and process them directly) and compare the results and see if I can read them lazily.

That worked in an acceptable speed, bars are full, no HDD bandwidth issues for now (increased chunk size to 200 and added some memory management/garbage-collection). I think trying to re-chunk it moved it away from being lazy. No more memory/graph-size related warnings.

In short, my initial way of thinking was not a dask way of doing it.

1 Like

Nice, glad you’ve found a solution!!

By the way @bozden, your use case looks original among Dask users, would you be willing to write a blog post in https://blog.dask.org/? It’s mainly opening a PR and writing a bit of markdown here: GitHub - dask/dask-blog: Dask development blog.

cc @scharlottej13 who could help you in this journey!

2 Likes

Thank you for the opportunity. I’ll be happy to do it after the current project finishes (reached ~60%) and I’ve solved all dask related issues I’ve been “dumping” in this Discourse :frowning:

Yes, the problem is unique, that’s why I’ve been struggling to find a near optimal solution. In most cases the data flow is known and horizontal (same), but here I encounter different patterns.

1 Like