Scheduler stuck, unique keys run slowly over time

I'm running multiple DAGs where every task receives pd.DataFrames (around 10k rows and 20 columns each).
After a while, the workers stop processing tasks.
I see a task called data (not one of mine, I guess it's Dask's) being executed for a very long time.
Even after adding more workers, tasks stay waiting and the workers aren't running anything.
The scheduler isn't using much RAM or CPU.

It stays like this until I restart the scheduler.

@ericman93 Welcome to Discourse!

Based on the “tasks processing” plot, it looks like only one worker is actively computing tasks? Would you be able to share a minimal, reproducible example? It'll help us diagnose what's going on.

I couldn't reproduce the hang with a minimal example, no matter how hard I tried.
I did manage to reproduce the GIL warning I see in production (but with a much shorter unresponsive time; in my production environment the unresponsive time is more than 3 hours):

distributed.core - INFO - Event loop was unresponsive in Scheduler for 164.49s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

Could the GIL / unresponsive event loop from that log be the reason for the hanging?

Locally I only got this log when my computer went to sleep (after disabling sleep I don't see it anymore), but in production I run both the scheduler and the workers in pods that shouldn't be going to sleep, so I wonder what the cause is.

@ericman93 Thanks for sharing. I'm not sure what's going on here, but it seems to be related to your specific tasks/workflow. Even if it's not fully reproducible, can you share a minimal example and the output of dump_cluster_state?
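In case it helps, here's a rough sketch of how you could produce that dump from a connected client (the filename and format are just placeholders):

from dask.distributed import Client

client = Client("scheduler-address:8786")  # replace with your scheduler address
# writes the current scheduler and worker state to a file you can attach here
client.dump_cluster_state("dask-cluster-dump", format="msgpack")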

I'll keep looking into this, though.
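If the full dump turns out to be hard to share, the recent scheduler logs (which should include those "Event loop was unresponsive" messages) can also be pulled from a connected client; a small sketch, assuming the same client as above:

# fetch the most recent log entries recorded by the scheduler
logs = client.get_scheduler_logs(n=100)
print(logs)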

I'll take a dump from production; in the meantime, this is the gist of my real code:

import asyncio
from random import randint
import pandas as pd
from uuid import uuid4

import time

from dask.distributed import Client

def fetch_data(url: str):
    # in my actual example,
    # I do polling here and wait for the data to be ready
    return pd.read_csv(url)


def add_columns(df: pd.DataFrame, value: int):
    time.sleep(2)  # mimic a long-running task
    df[f'{value}_column'] = value
    return df


def merge_dfs(*dfs):
    # TODO: merge correctly
    # print(f'get {len(dfs)} dfs to merge')
    return dfs[0]


async def do_once():
    url = "https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2020-financial-year-provisional/Download-data/annual-enterprise-survey-2020-financial-year-provisional-size-bands-csv.csv"

    suffix = uuid4().hex
    async with Client(address='0.0.0.0:8786', asynchronous=True) as client:
        # each iteration builds a graph with brand-new unique keys
        dsk = {
            f'getdata-{suffix}': (fetch_data, url),
            f'f1-{suffix}': (add_columns, f'getdata-{suffix}', randint(0, 200)),
            f'f2-{suffix}': (add_columns, f'getdata-{suffix}', randint(0, 200)),
            f'f3-{suffix}': (add_columns, f'f2-{suffix}', randint(0, 200)),
            f'f4-{suffix}': (add_columns, f'f1-{suffix}', randint(0, 200)),
            f'f5-{suffix}': (add_columns, f'getdata-{suffix}', randint(0, 200)),
            f'merge-{suffix}': (merge_dfs, f'f4-{suffix}', f'f3-{suffix}', f'f5-{suffix}')
        }

        futures = client.get(dsk, keys=[f'merge-{suffix}'], sync=False)
        res = await futures[0]


async def main():
    while True:
        print('running iteration')
        await do_once()
        await asyncio.sleep(5)  # avoid blocking the event loop between iterations


asyncio.run(main())

Could it be because I'm using pd.DataFrame and not dd.DataFrame?
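To be concrete, something like this is what I mean by switching to dd.DataFrame (just a sketch, not what I actually run in production):

import dask.dataframe as dd

url = "https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2020-financial-year-provisional/Download-data/annual-enterprise-survey-2020-financial-year-provisional-size-bands-csv.csv"

# lazy, partitioned read instead of an eager pd.read_csv
ddf = dd.read_csv(url)

# same kind of transformation, but this only builds a task graph
ddf['42_column'] = 42

# materializes the result (on the cluster if a Client is connected)
result = ddf.compute()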

I see you've also opened: Unique keys runs slow with time · Issue #5965 · dask/distributed · GitHub. I think we can continue the discussion there. :smile:

Moved the conversation to the GitHub issue :slight_smile:
