Hey,
I have a large parquet file that I’m trying to distribute across workers. The file fits on my local machine, so I can check its properties there:
```python
import pandas as pd
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile(train_data_file)
print(f"Number of row groups: {parquet_file.num_row_groups}")

train_raw = pd.read_parquet(train_data_file, columns=cols, use_threads=True)
print(f"Memory usage: {train_raw.memory_usage().sum() / 1024**3} GB")
```

```
Number of row groups: 14
Memory usage: 12.07241478562355 GB
```
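Since the row-group layout seems relevant, here is a rough sketch of how the individual row groups can be inspected with pyarrow’s metadata API (same `parquet_file` object as above; the reported sizes are uncompressed):

```python
# Sketch: per-row-group row counts and uncompressed sizes,
# using the ParquetFile object created above.
meta = parquet_file.metadata
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    print(f"row group {i}: {rg.num_rows} rows, "
          f"{rg.total_byte_size / 1024**3:.2f} GB uncompressed")
```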
The whole file takes about 9.1G on disk:

```
$ ls -lh train.parquet
-rw-rw-r-- 1 blah1 blah2 9.1G Apr 5 13:35 /more/blah/train.parquet
```
I’m running a cluster with 8 workers, each with 40G of memory (!!). When I load the data into pandas first and then distribute it with `from_pandas`, everything works great:
```python
client.restart()

npartitions = 8
train_raw = pd.read_parquet(train_data_file, columns=cols, use_threads=True)
train_raw = train_raw.dropna(subset=[resp])

train_data = dd.from_pandas(train_raw[features + [resp, weight]], npartitions=npartitions)
train_data = train_data.persist()
wait(train_data)
```
Looking at the dashboard, memory seems to be nicely distributed across the workers. (I tried to include a screenshot of this as well as the one below, but this board doesn’t seem to let new users post more than one image.)
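For what it’s worth, the distribution can also be checked without the dashboard; a rough sketch, assuming `client` is the `distributed.Client` connected to this cluster:

```python
from collections import Counter

# who_has() with no arguments reports, for every key currently held in
# cluster memory, which worker(s) store it.
key_locations = client.who_has()
keys_per_worker = Counter(
    worker for workers in key_locations.values() for worker in workers
)
print(keys_per_worker)
```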
Of course this does not really scale, and in the future I want to be able to handle much larger dataframes (spread out across multiple parquet files). So I try to use `dask.dataframe.read_parquet`:
```python
client.restart()

train_raw = dd.read_parquet(
    train_data_file,
    columns=cols,
)
# Is this even necessary?
train_raw = train_raw.repartition(npartitions)
train_raw = train_raw.persist()
wait(train_raw)
```
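What I was (perhaps naively) expecting is one partition per row group, which, if I read the docs correctly, is what the `split_row_groups` argument to `read_parquet` is for. Something like the following sketch is what I had in mind, though I haven’t verified that it avoids the problem:

```python
# Sketch (untested): ask read_parquet to create one partition per row group
# instead of one partition per file, so the 14 row groups become 14 tasks.
train_raw = dd.read_parquet(
    train_data_file,
    columns=cols,
    split_row_groups=True,
)
print(train_raw.npartitions)  # hopefully 14 rather than 1
```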
When I do this and look at the dashboard, I see that the `read-parquet` job essentially lands on a single worker, whose memory usage then grows to about 24GB, at which point the worker dies and the job moves on to the next worker.
The jobs listed on the dashboard are

```
repartition    0 / 8
split          0 / 1
read-parquet   0 / 1
```
… where the last one is highlighted as running. The error from my Dask cluster:

```
distributed.scheduler - INFO - Task ('read-parquet-ec3cfb3f93478b36129139b787d98cd6', 0) marked as failed because 3 workers died while trying to run it
```

And in Python:

```
KilledWorker: ("('read-parquet-ec3cfb3f93478b36129139b787d98cd6', 0)", <WorkerState 'tcp://XXX.XXX.XXX.XXX:38785', name: XXX-pid-XXX, status: closed, memory: 0, processing: 1>)
```
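One thing I haven’t ruled out is that the ~24GB at which the worker dies is a configured limit rather than the physical 40G. A quick sketch of how that can be checked from the client, using `Client.scheduler_info()`, which reports each worker’s `memory_limit` in bytes:

```python
# Sketch: print the memory limit the scheduler has recorded for each worker.
# `client` is the distributed.Client from above.
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["memory_limit"] / 1024**3, "GB")
```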
I don’t really understand what is happening. My parquet file has 14 row groups. Even if it had only one, the whole dataframe should fit in a single worker’s memory almost four times over. What is going wrong?