Read_parquet results in imbalanced load across workers

I read in a parquet dataset by doing:

        import dask.dataframe as dd

        data = dd.read_parquet(
            data_dir_abs,
            columns=columns_to_select,
            filesystem="arrow",
            ignore_metadata_file=True,
        )

I have 40 parquet files that were written by a Spark process; I checked, and every file is approximately 1.6 GB with almost perfectly sized 128 MB row groups. I run distributed LightGBM training using Dask, but in the very first step, when the data is read in, I notice right away that there is a worker imbalance. I’m running a small cluster with 20 workers (one instance runs both a worker process and the scheduler process).
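To give a bit more detail, this is roughly how I look at what `read_parquet` produced (a sketch only, using the `data` variable from the snippet above):

    # How many partitions Dask created from the 40 files
    print(data.npartitions)

    # Approximate in-memory bytes per partition, one value per partition
    sizes = data.map_partitions(lambda df: df.memory_usage(deep=True).sum()).compute()
    print(sizes.describe())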

Am I missing something obvious as to why some workers read more data than others?
I’ve read the docs here: dask.dataframe.read_parquet — Dask documentation, and I’ve also tried repartitioning and rebalancing, but no matter what I do I end up with the scenario described above. Any advice here? I’m running on SageMaker, so it is not trivial to get access to the Dask dashboard, but I’m working on that to help me debug.
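For completeness, this is roughly what the repartitioning and rebalancing attempts looked like (a sketch only; `client` is the distributed Client from my setup, and the 256MB target is just an example value):

    from dask.distributed import wait

    # Illustrative repartition/rebalance attempt
    data = data.repartition(partition_size="256MB").persist()
    wait(data)           # make sure all partitions are in worker memory
    client.rebalance()   # spread the in-memory partitions across workers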

The skew occurs right at the beginning, when I load the data. I can see this clearly from the system metrics: since I persist and wait for the dataset, I can pinpoint exactly when the read is finished.
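This is the kind of check I run right after that persist/wait step (a sketch; `client` is the distributed Client):

    # Number of in-memory partitions held by each worker right after loading
    per_worker = {addr: len(keys) for addr, keys in client.has_what().items()}
    print(per_worker)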

From your description, I really don’t see why there would be such an imbalance. What information do you have on the first DataFrame that is created (the data variable)? How many partitions does it typically have?

Except for work stealing when some worker is slow, there is no reason some workers would read more partitions than others if the chunking is correct. You shouldn’t have to rebalance or repartition; things should be fine from the start (and it is much more efficient that way).
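If you want to rule work stealing out while debugging, it can be disabled through the Dask configuration, set where the scheduler runs and before the cluster starts, for example:

    import dask

    # Disable work stealing so tasks stay on the worker they were scheduled to
    dask.config.set({"distributed.scheduler.work-stealing": False})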

Do you have at least a screenshot of this? It is a bit hard to really understand with just that description.