Dask not distributing reading of parquet file?

Hey,
I have a large parquet file that I’m trying to distribute across workers. The file fits in memory on my local machine, so I can check its properties:

import pandas as pd
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile(train_data_file)
print(f"Number of row groups: {parquet_file.num_row_groups}")

train_raw = pd.read_parquet(train_data_file, columns=cols, use_threads=True)
print(f"Memory usage: {train_raw.memory_usage().sum() / 1024**3} GB")

Number of row groups: 14
Memory usage: 12.07241478562355 GB

The whole file on disk:

$ ls -lh train.parquet
-rw-rw-r-- 1 blah1 blah2 9.1G Apr  5 13:35 /more/blah/train.parquet
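For completeness, the individual row groups can also be inspected (just a sketch, reusing the parquet_file object from above):

for i in range(parquet_file.num_row_groups):
    rg = parquet_file.metadata.row_group(i)
    print(f"Row group {i}: {rg.num_rows} rows, "
          f"{rg.total_byte_size / 1024**3:.2f} GB uncompressed")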

I’m running a cluster with 8 workers, each with 40G of memory (!!). When I load the data into pandas first and then distribute it with from_pandas, everything works great:

client.restart()
npartitions = 8

train_raw = pd.read_parquet(train_data_file, columns=cols, use_threads=True)
train_raw = train_raw.dropna(subset=[resp])

train_data = dd.from_pandas(train_raw[features + [resp, weight]], npartitions=npartitions)
train_data = train_data.persist()
wait(train_data)

Looking at the dashboard, it seems like memory is nicely distributed across workers. (I tried to include a picture like below, but this board doesn’t let new users post more than one image at once?)

Of course this does not really scale, and in the future I want to be able to handle much larger dataframes (spread across multiple parquet files). So I tried `dask.dataframe.read_parquet`:

client.restart()

train_raw = dd.read_parquet(
    train_data_file,
    columns=cols,
)

# Is this even necessary?
train_raw = train_raw.repartition(npartitions=npartitions)

train_raw = train_raw.persist()
wait(train_raw)

When I do this and look at the dashboard, I see that the read-parquet task is essentially given to a single worker, whose memory usage grows to ~24GB, at which point the worker dies and the task moves to the next worker:

[dashboard screenshot]

The tasks listed on the dashboard are

repartition         0 / 8
split               0 / 1
read-parquet        0 / 1

… where the last one is highlighted as running. The error from my Dask cluster:

distributed.scheduler - INFO - Task ('read-parquet-ec3cfb3f93478b36129139b787d98cd6', 0) marked as failed because 3 workers died while trying to run it

And in python:

KilledWorker: ("('read-parquet-ec3cfb3f93478b36129139b787d98cd6', 0)", <WorkerState 'tcp://XXX.XXX.XXX.XXX:38785', name: XXX-pid-XXX, status: closed, memory: 0, processing: 1>)

I don’t really understand what is happening. My parquet file has 14 row groups. Even if it didn’t, the dataframe should fit on a single worker almost four times over. What is going wrong?

Hi @adfea9c0, welcome to this forum!

I see two problems here.

First, Dask is not splitting your input file, so it reads all the data into a single partition, which is why you see only one worker actually doing anything.
When reading parquet you should not need to repartition; the data should be read in parallel from the start. Your parquet input is a bit unusual, though, in that it is not split into several files. I’m not sure why Dask is not inferring how to read the file correctly by default, maybe a metadata problem. However, I think that passing split_row_groups=True as a kwarg to read_parquet should fix the problem, as in the sketch below.
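Something along these lines (a sketch, reusing train_data_file and cols from your snippets):

import dask.dataframe as dd

train_raw = dd.read_parquet(
    train_data_file,
    columns=cols,
    split_row_groups=True,  # one partition per row group instead of one per file
)
# With 14 row groups this should report 14 partitions rather than 1,
# and no repartition() should be needed before persist().
print(train_raw.npartitions)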

Second, the memory usage, which appears to be higher than with pandas. I’m somewhat at a loss here. Either the numbers reported by pandas are misleading, or there is a different setting somewhere within Dask, such as the default engine. Dask uses pandas under the hood for each partition, so memory usage should be about the same.
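If you want to dig into that, a couple of checks I would try (again just a sketch, reusing the names from your snippets):

# pandas' default memory_usage() does not fully count object/string columns;
# deep=True reports the real in-memory footprint.
print(train_raw.memory_usage(deep=True).sum() / 1024**3)  # your pandas DataFrame

# The equivalent check on the Dask side; memory_usage also accepts deep=True there.
ddf = dd.read_parquet(train_data_file, columns=cols, split_row_groups=True)
print(ddf.memory_usage(deep=True).sum().compute() / 1024**3)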