Memory usage of the scheduler when computing a dataframe with many partitions and many columns

Hi everyone.

We are currently seeing very high memory usage from the scheduler when trying to compute things on a large dataframe with around 5000 partitions and 5000 columns.

I tried to build the example below to replicate the behavior:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.base import tokenize


class BuildParquetFromNumberColumns:

    def __init__(
        self,
        nb_columns,
    ):
        self.nb_columns = nb_columns

    @property
    def meta(self):
        return pd.DataFrame([], columns=[f'this_is_a_big_column_name{i}' for i in range(self.nb_columns)], dtype=np.int64)

    @property
    def columns(self):
        return self.meta.columns

    def __call__(self, path):
        # Build a deterministic 10-row frame for this "path"
        # (stands in for reading one parquet file)
        frame_size = 10 * self.nb_columns
        data = np.arange(path * frame_size, (path + 1) * frame_size).reshape((10, self.nb_columns))
        return pd.DataFrame(data, columns=self.columns).astype(self.meta.dtypes)


def build_df(nb_paths, nb_columns):
    token = tokenize(nb_paths, nb_columns)
    read_func = BuildParquetFromNumberColumns(nb_columns)
    df = dd.from_map(
            read_func,
            range(nb_paths),
            meta=read_func.meta,
            label="read-parquet-file",
            token=token,
        )
    return df
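
For reference, this is roughly how we computed the frames and read the scheduler's memory. The psutil-based measurement below is just an illustration of one way to do it, not necessarily the exact setup we used:

import psutil
from dask.distributed import Client

def scheduler_rss_mib(client):
    # Run a small function inside the scheduler process and report its RSS in MiB
    def _rss():
        return psutil.Process().memory_info().rss
    return client.run_on_scheduler(_rss) / 2**20

client = Client(n_workers=4)
df = build_df(nb_paths=1_000, nb_columns=1_000)
df.sum().compute()                      # force the whole graph through the scheduler
print(scheduler_rss_mib(client))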

Trying a few values for nb_paths and nb_columns, we observed the following memory usage:

| Number of partitions | Number of columns | Scheduler memory usage (MiB) |
|---|---|---|
| 10 | 10 | 212 |
| 100 | 10 | 214 |
| 1000 | 10 | 232 |
| 10000 | 10 | 398 |
| 10 | 100 | 214 |
| 10 | 1000 | 215 |
| 10 | 10000 | 245 |
| 100 | 100 | 214 |
| 1000 | 1000 | 232 |
| 10000 | 10000 | 3500 |

It looks like memory usage grows multiplicatively with n_partitions * n_columns. Is it possible that we are keeping one meta per partition in the scheduler’s memory, even though they are all the same?
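
For what it’s worth, a rough back-of-the-envelope on the last row of the table, taking the ~232 MiB of the small runs as a baseline, gives about 34 bytes per (partition, column) pair, which would be consistent with one copy of the column names per partition (though that is just a guess):

extra_bytes = (3500 - 232) * 2**20            # growth over the small-run baseline
per_pair = extra_bytes / (10_000 * 10_000)    # per (partition, column) pair
print(per_pair)                               # ~34 bytes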

Thanks

Milton

Hi @miltava,

I haven’t tried the code or looked into it in detail yet.

But yes, the scheduler tracks the tasks that need to be computed or are being computed, and their number is usually proportional to the number of partitions.

See https://distributed.dask.org/en/latest/scheduling-state.html:

The scheduler keeps internal state about several kinds of entities:

  • Individual tasks known to the scheduler
  • Workers connected to the scheduler
  • Clients connected to the scheduler
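
As a quick way to see this (just a sketch, assuming you have a distributed Client connected and a Dask DataFrame df), you can ask the scheduler how many tasks it currently knows about:

def count_tasks(dask_scheduler):
    # run_on_scheduler injects the live Scheduler object when the
    # function takes a `dask_scheduler` argument
    return len(dask_scheduler.tasks)

futures = client.compute(df)          # submit the graph for df
print(client.run_on_scheduler(count_tasks))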

You’ve also got this answer by @mrocklin:

Hi again, Guillaume.

Thank you for your answer.

I understand that the scheduler needs to keep the tasks it knows about in memory.

But if all these tasks have results with the same meta, are they all using the same object in memory to store this information? Or does each one of them have its own copy of the meta object?

In the first case, we would need only one object in memory for all tasks, while in the second we would need n_partitions copies.

Does that make sense to you?
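
To put a rough number on the second case, if every task carried its own serialized copy of the meta, a back-of-the-envelope estimate (just a sketch) would be:

import pickle

meta = BuildParquetFromNumberColumns(10_000).meta
per_copy = len(pickle.dumps(meta))                  # serialized size of one meta copy
print(per_copy / 2**10, "KiB per copy")
print(per_copy * 10_000 / 2**20, "MiB if copied for each of 10_000 partitions")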

Hi Guillaume.

@mrocklin’s response was really helpful, and I’m now looking a bit at the Dask code.

It seems that if I set enforce_metadata to False, the scheduler’s memory usage drops considerably.

So changing the original code to:

def build_df(nb_paths, nb_columns):
    token = tokenize(nb_paths, nb_columns)
    read_func = BuildParquetFromNumberColumns(nb_columns)
    df = dd.from_map(
            read_func,
            range(nb_paths),
            meta=read_func.meta,
            label="read-parquet-file",
            token=token,
            enforce_metadata=False,  # only change from the version above
        )
    return df

The memory usage of processing 10k partitions with 10k columns drops from ~3.5 GiB to ~380 MiB.

I still haven’t gotten to the part of the code where the actual graph is built. It may be the case that:

  • to enforce the meta, the workers need the metadata
  • the meta object was not scattered (and hence is kept in the scheduler’s memory, according to the answer you sent)
  • the meta object is copied into each task that is sent to the scheduler

This would fill up the scheduler’s memory, right? A quick check of the last point is sketched below.
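
One rough way to check that last point (just a sketch; where exactly the meta ends up depends on the Dask version, and build_df_enforced / build_df_plain are only illustrative names for the two versions of build_df above) is to materialize the low-level graph locally and compare how big the tasks are when pickled one by one, with and without enforce_metadata:

import pickle

def tasks_size_mib(df):
    # Flatten the high-level graph and pickle each task separately,
    # roughly approximating what ends up being sent to the scheduler
    dsk = dict(df.__dask_graph__())
    return sum(len(pickle.dumps(task)) for task in dsk.values()) / 2**20

# Hypothetical names for the two build_df versions shown earlier
print(tasks_size_mib(build_df_enforced(1_000, 1_000)))   # enforce_metadata=True (default)
print(tasks_size_mib(build_df_plain(1_000, 1_000)))      # enforce_metadata=False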

Thanks again

Milton