import dask.dataframe as dd

# Lazily build the Dask DataFrame from the Spark-written Parquet dataset
data = dd.read_parquet(
    data_dir_abs,
    columns=columns_to_select,
    filesystem="arrow",
    ignore_metadata_file=True,
)
I have 40 parquet files that were written by a Spark process. I checked, and every file is approximately 1.6 GB with almost perfectly sized 128 MB row groups. I run LightGBM distributed training using Dask, but in the very first step, when we read the data in, I notice right away that there is a worker imbalance. I'm running a mini cluster with 20 workers (one instance has both a worker process and a scheduler process running).
Am I missing something obvious as to why some workers read more data than others?
I've read the docs here: dask.dataframe.read_parquet — Dask documentation, and I've also tried repartitioning and rebalancing, but no matter what I do I end up with the scenario described above. Any advice here? I'm running on SageMaker, so it is not trivial to get access to the Dask UI dashboard, but I'm working on getting that to help me debug.
The skew occurs right at the beginning, when I load the data. I can see this clearly from the system metrics: since I persist and wait for the dataset to be read in, I can pinpoint exactly when I'm done reading the data.
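Roughly, the persist / wait / rebalance sequence after the read looks like this (a simplified sketch; client is the distributed Client connected to the cluster):

import dask.distributed

# Materialize the partitions on the workers and block until reading is done,
# so the imbalance can be pinned down to the load step itself.
data = data.persist()
dask.distributed.wait(data)

# Rebalancing (and repartitioning) afterwards does not remove the skew.
client.rebalance()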
From your description I really don't see why there would be such an imbalance. What information do you have on the first DataFrame you create (the data variable)? How many partitions does it typically have?
Aside from work stealing when some worker is slow, there is no reason for some workers to read more partitions than others if the chunking is correct. You shouldn't have to rebalance or repartition; things should be fine from the start (and it is much more efficient that way).
Do you have at least a screenshot of this? It is a bit hard to really understand with just that description.
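For example, a quick way to look at the partition count and per-partition sizes (a minimal sketch, with data being the DataFrame returned by read_parquet):

# How many partitions did Dask create from the Parquet files?
print(data.npartitions)

# Rows per partition, to spot unevenly sized partitions
rows_per_partition = data.map_partitions(len).compute()
print(rows_per_partition.describe())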
I have fewer instances: 40 instances with 192 GB of memory each. The total dataset size (compressed) is 80 GB, and I have 80 files all just over 1 GB, so that I have equally sized row groups of around 128 MB.
I forced evaluation after reading in the data, and the metrics indicate that the worker with the most data gets roughly 6 GB more memory usage. All columns have the exact same data type.
I can pull more detailed numbers / analysis. I will also try to send a copy of the Dask dashboard; since I'm running on SageMaker, I need to do some SSH magic to get that for you.
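In the meantime, this is roughly how I count persisted partitions per worker to see the skew (a sketch; data is the persisted DataFrame and client the connected Client):

from collections import Counter
from distributed import futures_of

# Map each persisted partition (key) to the worker holding it,
# then count partitions per worker to quantify the skew.
who_has = client.who_has(futures_of(data))
partitions_per_worker = Counter(
    holders[0] for holders in who_has.values() if holders
)
for worker, n_partitions in sorted(partitions_per_worker.items()):
    print(worker, n_partitions)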
Do you have information on the other workers? 6 GB seems quite normal with an input of 2 GB of compressed data.
You could check if things are balanced with something like:
import dask.array as da
from distributed import Client

# Create a small local cluster for the example
client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GiB')

# Persist a random array so that data actually lives on the workers
my_arr = client.persist(da.random.random(size=(10000, 10000), chunks=(1000, 1000)))

# Inspect the scheduler's per-worker state
workers = client.cluster.scheduler.workers.values()
for ws in workers:
    print(ws.metrics['task_counts'])
    print(ws.metrics['memory'] / (1024 * 1024))
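Note that client.cluster.scheduler is only reachable when the scheduler runs in the same process as the client (as in the LocalCluster example above). Against a remote scheduler, like your SageMaker setup, you can pull similar per-worker numbers through the client instead, for example (a sketch; the exact metric keys can vary between distributed versions):

# Ask the scheduler for its view of the workers; works with a remote scheduler too.
info = client.scheduler_info()
for address, worker in info["workers"].items():
    metrics = worker.get("metrics", {})
    print(address, metrics.get("task_counts"), metrics.get("memory", 0) / (1024 * 1024))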