Multi-Threading on workers in Dask Distributed (>2024.3.0)

Hi,

In previous versions of Dask (pre dask-expr integration), it was possible to submit “multi-threaded” jobs in the following way:

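import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=num_threads) as ex:
    futures = []
    results = []
    # params is assumed to be an iterable of keyword-argument dicts,
    # one per dask_fn call (range(params) would yield ints, which
    # cannot be unpacked with **)
    for param in params:
        future = ex.submit(dask_fn, **param)
        futures.append(future)

    # Collect each result as soon as its call finishes
    for future in concurrent.futures.as_completed(futures):
        results.append(future.result())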

Now, in the latest version of Dask, running this locks the cluster: there is no error, it just hangs waiting for the future's result to be returned.

I know that this may not be the correct way to get multi-threading onto the worker, but it previously gave about a 2x performance increase.

I have also tried the following, but it results in a similar issue.

def process_partition(df):
    import time
    from concurrent.futures import ThreadPoolExecutor
    def worker(df):
        time.sleep(0.1)  # Simulate a computation-heavy task
        return df.cumprod()

    with ThreadPoolExecutor() as executor:
        df['result'] = list(executor.map(worker, df['column']))
    return df

# Apply the function to each partition
result_ddf = ddf.map_partitions(process_partition, meta=ddf)

If anyone can suggest an alternative approach, it would be much appreciated!

Hi @Lucas, welcome to the Dask community!

If I understand correctly, your problem arises when you try to run multithreaded execution with a ThreadPoolExecutor on a single partition of a Dask DataFrame inside a map_partitions call?

And to be more precise, you want to do that because there might be an inner level of parallelisation on the columns of this DataFrame?

To help us help you, could you build an MVCE of this problem? That would make it much easier to help, or to determine whether this is a bug. It is true that you usually don’t mix Dask with other parallelisation methods.
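
As a possible starting point for such an MVCE, here is a minimal sketch of the first pattern with everything defined. Note that `dask_fn`, `params`, and `run_all` are hypothetical stand-ins, since the original post does not show them; the body of `dask_fn` here is plain Python and would need to be replaced with the actual Dask call that hangs.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor


def dask_fn(x, y):
    # Hypothetical stand-in: replace this body with the real Dask call
    # (for example, something ending in .compute()) to reproduce the hang.
    return x * y


def run_all(params, num_threads=4):
    """Submit one dask_fn call per kwargs dict and gather the results."""
    with ThreadPoolExecutor(max_workers=num_threads) as ex:
        futures = [ex.submit(dask_fn, **param) for param in params]
        # Results arrive in completion order, not submission order.
        return [f.result() for f in concurrent.futures.as_completed(futures)]


if __name__ == "__main__":
    # Assumed shape of params: one dict of keyword arguments per call.
    params = [{"x": i, "y": 2} for i in range(5)]
    print(sorted(run_all(params)))
```

If this skeleton runs fine with the placeholder body but hangs once the real Dask call is dropped in, that narrows the problem down to the interaction between the thread pool and the cluster.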