Tasks slowing down significantly after 10-12 batches

I am using Dask with an AWS ECSCluster, and the setup works fine. I have around 100K tasks, which I divide into batches of 1,000. However, I have noticed that tasks that initially take ~1 second start taking 10-20 seconds after 10-12 batches. When I kill the job and restart from the last batch, it works fine again. For example, if I run batches 1-10 and it slows down at the 11th batch, I kill the job, start again from batch 11, and it runs at the original speed.
I am using the default scheduler size, and the worker size is 1 CPU and 2 GB of memory. Any idea what could be happening here?

The following is the code snippet:

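import logging

import dask
import dask.dataframe as dd
from dask.distributed import Client
from dask_cloudprovider.aws import ECSCluster

ecs_params = {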
    "cluster_name_template":"dask-datascience",
    "fargate_scheduler":True,
    "fargate_workers":True,
    "image":f"{AWS_ACCOUNT_ID}.dkr.ecr.{AWS_DEFAULT_REGION}.amazonaws.com/datascience/dask:latest",
    "vpc":VPC_ID,
    "subnets":[SUBNET_ID],
    "security_groups":[SECURITY_GROUP],
    "execution_role_arn":f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/DaskECSClusterRole",
    "task_role_arn":f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/DaskECSTaskRole",
    "worker_cpu":1024,
    "worker_mem":2048,
}
cluster = ECSCluster(**ecs_params)
cluster.adapt(minimum=MIN_WORKERS, maximum=MAX_WORKERS)

logging.info("Initializing client ...")
client = Client(cluster)
logging.info("="*30)
logging.info(f"Client dashboard url: {client.dashboard_link}")
logging.info("="*30)

try:
    all_jobs_data = get_all_jobs_data()
    all_client_data = get_all_client_data()

    client_datasets = client.list_datasets()
    if 'all_jobs_data' not in client_datasets:
        client.persist(dd.from_pandas(all_jobs_data, npartitions=10))
        client.publish_dataset(all_jobs_data=all_jobs_data)
    
    if 'all_client_data' not in client_datasets:
        client.persist(dd.from_pandas(all_client_data, npartitions=10))
        client.publish_dataset(all_client_data=all_client_data)

    batches = [client_refs[i:i + BATCH_SIZE] for i in range(0, len(client_refs), BATCH_SIZE)]
    for index, batch in enumerate(batches):
        logging.info(f"Executing batch {index+1} of {len(batches)} ...")
        logging.info(f"Client ref from {batch[0]} to {batch[-1]} ...")
        
        batch_rec_tasks = [store_recommendation(client_ref) for client_ref in batch]
        batch_result = dask.compute(*batch_rec_tasks)
except Exception as e:
    logging.exception("Exception occurred during dask compute")
finally:
    logging.info("Closing cluster ...")
    client.close()
    cluster.close()
             

Are you monitoring the memory? Are you using up memory that is perhaps never released back? That could contribute to slower speeds after some batches. Once you “kill the job” you are essentially clearing the memory and starting afresh, which could explain why it suddenly runs fast again.
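A minimal sketch of one way to check this without relying on the dashboard: report the peak resident memory of the current process. The helper name `peak_rss_mb` is made up for illustration, and the standard-library `resource` module it relies on is only available on Unix-like systems.

```python
import resource
import sys


def peak_rss_mb():
    """Return the peak resident set size of this process, in megabytes."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # Linux reports ru_maxrss in kilobytes, macOS reports it in bytes.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


# Local check; prints a positive number that depends on the machine.
print(f"peak RSS of this process: {peak_rss_mb():.1f} MB")

# With a connected distributed Client (not created here), the same function
# can be run on every worker, which returns a dict keyed by worker address:
#   client.run(peak_rss_mb)
```

Calling this after each batch and comparing the numbers would show whether worker memory keeps climbing. Because it reports the peak rather than the current usage, it can reveal growth but not memory being released.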

I don’t see any unusual spikes in the workers’ memory. Could this be related to the scheduler? Is there a way to release the memory programmatically? I tried adding client.restart(wait_for_workers=False) after each batch but that didn’t fix the issue.

Hi @afsangujarati93, welcome to the Dask community!

Do you have some code snippet on how you are using Dask? Which API/collection?
Also, how many Workers are you using?

And what exactly do you mean by the time it takes tasks to start? How do you measure this?

I have shared the code snippet in my initial post. I have between 20 and 25 workers, but the issue occurs even with fewer workers.

I am looking at the dask dashboard to monitor the progress of the tasks and there I can see the duration each task takes. I do see that initially, it takes roughly 1.5-2 seconds for each task to finish. However, by the time I reach 10-12 batches (i.e. 10K-12K tasks), the time to complete tasks increases up to 25 seconds. I don’t see any errors or anything, so the tasks do run but they slow down.

What is interesting is that when I kill the cluster and restart (from the last slow-running batch), it works fine. As jurgencuschieri mentioned, it is most likely a memory issue, but I don’t see anything related to that on the Dask dashboard. Is there some way of releasing the memory manually?

OK, so it is really the task durations that are increasing, not the scheduling time.

In the dashboard, do you see the memory used by the workers increasing? Are there any log messages saying that workers are using too much memory?

I tried watching the log trail to see if any specific logs are printed when the tasks slow down, but I didn’t see any.

Please find attached the screenshots from the dashboard.
This shows that the task is taking ~4 seconds

However, it suddenly jumps to 40+ seconds

The workers don’t show any issues:

The Scheduler system utilization just before the drop

So if I understand correctly, the task slow-down is sudden, going from 4 s to ten times that.

From your dashboard screenshots, I can’t see why this would happen; memory usage doesn’t seem very high.

Killing the cluster and restarting it (on a different AWS node, I imagine?) fixes the problem.

I’m wondering if this could be related to a component other than Dask. What do you do in the store_recommendation function? Maybe the service you’re writing results to is slowing down for some reason (too many requests from a given node)? Could you try to measure the write times, if there are any, in this function?
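A minimal sketch of how such a measurement could look: time the compute step and the write step separately and return the durations with the task result. The wrapper `store_recommendation_timed`, the `write_result` callable, and the two stand-in functions are hypothetical; the real body of store_recommendation is not shown in this thread.

```python
import time


def timed_call(func, *args, **kwargs):
    """Run func and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def store_recommendation_timed(client_ref, get_recommendation, write_result):
    """Time the compute step and the write step of one task separately."""
    rec, compute_s = timed_call(get_recommendation, client_ref)
    _, write_s = timed_call(write_result, client_ref, rec)
    return {"client_ref": client_ref, "compute_s": compute_s, "write_s": write_s}


# Stand-ins so the sketch runs on its own; replace them with the real functions.
def fake_get_recommendation(client_ref):
    return [client_ref, "job-1"]


def fake_write_result(client_ref, rec):
    time.sleep(0.01)  # pretend to write to external storage


report = store_recommendation_timed("c-42", fake_get_recommendation, fake_write_result)
print(report)
```

Returning the timings instead of only logging them means they come back to the client through dask.compute, so slow batches can be compared with fast ones without digging through worker logs.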


The issue is now resolved.

This is what I was doing. Before Dask started processing the tasks, I was publishing two datasets (all_jobs_data and all_client_data) to the scheduler.
In my store_recommendation function, I was calling another function, get_recommendation. In get_recommendation, I was downloading both datasets to the worker for each task (using get_dataset). So every time a new task ran, it went to the scheduler and fetched those two datasets. This was causing the bandwidth on the scheduler to be very high (around 1.5GB/s), which I assume was the culprit. To solve it, I now download each dataset only the first time, if it doesn’t already exist on the worker; otherwise the task uses the copy that is already present on the worker. For anyone wondering how I did that, here’s the code snippet:

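from dask.distributed import get_client, get_worker

worker = get_worker()  # the Worker object running the current task
client = get_client()  # a client connected to the same scheduler
if hasattr(worker, 'all_jobs_data') and client:
    # Already fetched by an earlier task on this worker: reuse the cached copy
    all_jobs_data = worker.all_jobs_data
elif client:
    # First task on this worker: fetch from the scheduler once and cache it
    all_jobs_data = client.get_dataset('all_jobs_data')
    worker.all_jobs_data = all_jobs_data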
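
The same pattern can be wrapped in a small helper so that it covers both datasets. This is only a sketch: `get_cached_dataset` and the `_cached_` attribute prefix are made-up names, and a plain object stands in for the Dask worker so the example runs without a cluster.

```python
from types import SimpleNamespace


def get_cached_dataset(worker, name, fetch):
    """Return the dataset cached on `worker`, fetching it only on first use."""
    attr = "_cached_" + name  # prefix avoids clashing with the worker's own attributes
    if not hasattr(worker, attr):
        setattr(worker, attr, fetch(name))
    return getattr(worker, attr)


# Inside a Dask task this would be called roughly as (not run here):
#   get_cached_dataset(get_worker(), "all_jobs_data", get_client().get_dataset)

# Stand-ins so the sketch runs without a cluster.
fetch_log = []


def fake_fetch(name):
    fetch_log.append(name)
    return {"name": name}


fake_worker = SimpleNamespace()
for _ in range(1000):  # 1000 tasks landing on the same worker
    for name in ("all_jobs_data", "all_client_data"):
        get_cached_dataset(fake_worker, name, fake_fetch)

print(fetch_log)  # ['all_jobs_data', 'all_client_data']
```

With this caching, the number of downloads from the scheduler scales with the number of workers rather than the number of tasks, which is why the scheduler bandwidth drops.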