Client.map() over large iterables is slow …

Gentlefolk,
I am writing a simple system to handle large numbers of embarrassingly parallel tasks. For example, a new student is reasonably asking to start upwards of 1.2M, or even 5M, new tasks. My fancy cluster sits idle while client.map() processes the load. Here is the basic compute-and-write-to-the-DB loop:

    import logging
    import time

    from distributed import as_completed

    tick = time.perf_counter()
    # delayed_instance = delayed(instance)
    # futures = client.compute([delayed_instance(**p) for p in parameters])
    futures = client.map(lambda p: instance(**p), parameters)  # The lambda keeps client.map() from unpacking the keyword dicts itself.
    i = base_index
    for batch in as_completed(futures, with_results=True).batches():
        for future, result in batch:
            i += 1
            if not (i % 10):  # Log results every tenth output
                tock = time.perf_counter() - tick
                count = i - base_index
                s_i = tock / count
                logging.info(f"Count: {count}; Time: {round(tock)}; Seconds/Instance: {s_i:0.4f}; Remaining: {round((instance_count - count) * s_i)}")
                logging.info(result)
            db.batch_result(result)
            future.release()  # As these are Embarrassingly Parallel tasks, clean up memory.
        db.push_batch()
    db.final_push()
    client.shutdown()
    total_time = time.perf_counter() - tick

The line that takes the time is:

    futures = client.map(lambda p: instance(**p), parameters)

I could easily batch this up into smaller submissions and update the as_completed() iterator (much as you see me using its .batches() capability). Or I could fully embrace the asynchronous nature of the task. My question: will client.map() naturally start submitting jobs to the cluster as it goes, so I can keep my code simple?
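For reference, the manual-chunking option I have in mind would look something like this (a minimal sketch with a toy task and sizes; it relies on as_completed.update() to feed new futures into an existing iterator):

```python
from distributed import Client, as_completed

def square(x):  # stand-in for the real task
    return x * x

client = Client(processes=False)  # in-process cluster, just for the demo
parameters = list(range(100))
CHUNK = 25

# Submit the first chunk, then feed later chunks into the same iterator
# while results from earlier chunks are already streaming back.
chunks = [parameters[i:i + CHUNK] for i in range(0, len(parameters), CHUNK)]
ac = as_completed(client.map(square, chunks[0]))
for chunk in chunks[1:]:
    ac.update(client.map(square, chunk))

results = sorted(f.result() for f in ac)
client.close()
```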

IIUC, you want the batch_size keyword to client.map: API — Dask.distributed 2023.8.1 documentation.
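Roughly like this (a sketch with toy names, not tested against your setup) — with batch_size set, client.map sends futures to the scheduler in batches, so workers start on the first batch while the client is still submitting the rest:

```python
from distributed import Client, as_completed

def instance(x):  # stand-in for the real task
    return x + 1

client = Client(processes=False)  # in-process cluster, just for the demo
parameters = range(1_000)

# Submit in batches of 100 instead of building all futures up front.
futures = client.map(instance, parameters, batch_size=100)

total = sum(result for _, result in as_completed(futures, with_results=True))
client.close()
```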

Thank you for sharing your insight. I will implement the addition and report back success.

The batch_size= parameter has solved the delayed-start-of-computation problem. Now we have the much smaller problem of task starvation during loading. More thinking is needed to reach the right balance.
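One idea I may try (a heuristic assumption on my part, not official guidance): size the batches relative to the cluster's total thread count, so each batch holds several tasks per thread and workers are less likely to idle between batches:

```python
from distributed import Client

def instance(x):  # stand-in for the real task
    return x

client = Client(processes=False)  # in-process cluster, just for the demo
parameters = range(10)

# Heuristic: a batch several times larger than the total thread count,
# with a floor so tiny clusters still get reasonably sized batches.
nthreads = sum(client.nthreads().values())
batch_size = max(100, 10 * nthreads)

futures = client.map(instance, parameters, batch_size=batch_size)
results = client.gather(futures)
client.close()
```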

Thank you again for your advice.

Anon,
Andrew