Hi,
In previous versions of Dask (pre dask-expr integration), it was possible to submit “multi-threaded” jobs in the following way:
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=num_threads) as ex:
    futures = []
    results = []
    # Submit one dask_fn call per parameter set (params is a list of kwargs dicts)
    for param in params:
        future = ex.submit(dask_fn, **param)
        futures += [future]
    # Collect results as the individual calls finish
    for future in concurrent.futures.as_completed(futures):
        results += [future.result()]
Now, in the latest version of Dask, running this code locks up the cluster: no error is raised, it just sits waiting for the future results to be returned.
I know this may not necessarily be the correct way to get multi-threading onto the workers, but it did previously provide roughly a 2x performance increase.
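For reference, dask_fn, params and num_threads in the snippet above are along the following lines; this is a simplified, made-up stand-in rather than my real code, but the shape is the same (each call builds a dask graph and blocks on .compute()):

import dask.array as da

# Simplified stand-in for the real dask_fn: it builds a dask graph and
# blocks on .compute(), so each client thread waits on the scheduler
def dask_fn(size, power):
    arr = da.random.random(size, chunks=size // 10)
    return (arr ** power).sum().compute()

# One kwargs dict per call submitted to the thread pool
params = [{"size": 1_000_000, "power": p} for p in range(1, 5)]
num_threads = 4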
I have also tried the following, but it results in a similar issue.
def process_partition(df):
    import time
    from concurrent.futures import ThreadPoolExecutor

    def worker(value):
        time.sleep(0.1)  # Simulate a computation-heavy task
        return value.cumprod()

    # Fan the column's values out across threads within the partition
    with ThreadPoolExecutor() as executor:
        df['result'] = list(executor.map(worker, df['column']))
    return df
# Apply the function to each partition
result_ddf = ddf.map_partitions(process_partition, meta=ddf)
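For completeness, a small made-up example of how this gets driven end to end (the data and sizes are purely illustrative; I have widened meta here to include the new 'result' column, with object dtype to be conservative about what worker returns per element):

import pandas as pd
import dask.dataframe as dd

# Made-up frame purely to exercise process_partition
pdf = pd.DataFrame({"column": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]})
ddf = dd.from_pandas(pdf, npartitions=2)

result_ddf = ddf.map_partitions(
    process_partition,
    meta={"column": "f8", "result": "object"},
)
print(result_ddf.compute())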
If anyone can suggest an alternative approach, it would be much appreciated!