Distributed client OOM issue on K8s

I’m trying to read a large number of parquet files (upwards of 12k, across 2 different tables) using a Dask cluster on Kubernetes. Reading the files works fine (and calling .head() also seems to work). However, when I try to join the two tables together and call .head() on the result, the client (a Jupyter notebook) runs out of memory, while the scheduler and workers seem unaffected (their memory usage did not spike). I’m curious why this happens: shouldn’t the client’s memory be unaffected, and if anything were going to OOM, wouldn’t it be the scheduler?
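
For reference, the workflow is roughly the sketch below; the scheduler address, bucket paths, column names and join key are placeholders, not the real ones.

```python
import dask.dataframe as dd
from dask.distributed import Client

# connect to the scheduler running in the Kubernetes cluster
client = Client("tcp://dask-scheduler:8786")  # placeholder address

# each table is backed by thousands of small parquet files
left = dd.read_parquet("s3://my-bucket/table_a/", engine="pyarrow")
right = dd.read_parquet("s3://my-bucket/table_b/", engine="pyarrow")

left.head()   # fine
right.head()  # fine

# joining the two tables and materialising a few rows is where the
# client (the notebook process) runs out of memory
joined = left.merge(right, on="join_key")  # placeholder join column
joined.head()
```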

Greatly appreciate any help

Sorry, I should’ve added some more context (this is about reading the files; I’m still not clear on why the client OOMs).

When I read the files, I do something like this:

df = dd.read_parquet(
    dataset_uri(BUCKET_NAME, LAYOUT_NAME, NAME_12K, SESSION_12K, "table_name"),
    chunksize="128Mb",
    engine="pyarrow-dataset",
    columns=COLUMNS,
    gather_statistics=True,
    aggregate_files="partition_key",
)
df.npartitions

df.npartitions still reports 12k partitions; shouldn’t I see a smaller number? I know that Dask’s read_parquet creates one dask DataFrame partition per input file, but can I somehow combine multiple files into a single DataFrame partition?
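
I know I could coalesce partitions after the read with repartition, along the lines of the sketch below (the path and the 128MB target are placeholders), but that still creates the 12k read tasks up front, so I was hoping read_parquet itself could group files for me.

```python
# one partition per input file at read time (~12k partitions)
df = dd.read_parquet(path, engine="pyarrow")

# coalesce into fewer, larger partitions after the read; the 12k read
# tasks still exist, this only merges their outputs
df = df.repartition(partition_size="128MB")
```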

Ahh, maybe my understanding above was incorrect. Is it accurate that read_parquet reads one parquet row group per DataFrame partition? Can I aggregate multiple row groups into a single partition?
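
Reading the docs, split_row_groups looks like it might control this, e.g. something along the lines of the sketch below, but I haven’t verified this behaviour on my Dask version (the path is a placeholder):

```python
df = dd.read_parquet(
    path,                # placeholder path
    engine="pyarrow",
    # split_row_groups=False -> one partition per file
    # split_row_groups=True  -> one partition per row group
    # split_row_groups=N     -> up to N row groups per output partition
    split_row_groups=8,
)
```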

There was something wrong with how Dask’s provided read_parquet behaved for my use case, so I implemented my own version instead, and it works better now.

@bakht - would it be possible to share more about how you re-implemented read_parquet to get things to work?

Hi @Gcav66, I think there is something in Dask’s implementation of read_parquet that OOMs the client. We have a lot of small files that we can’t compact beforehand, so I wanted to simulate a kind of read-time compaction by combining multiple parquet files into one dask DataFrame partition.

Originally, the small files were causing the scheduler to OOM, which makes sense: I assume a huge number of tasks was created and this overwhelmed the scheduler (the scheduler had almost 50 GB of memory, while the actual dataset is only about 3 GB in this case). Dask’s best practices guide suggests using chunking with read_parquet, and that stopped the scheduler from OOMing. Importantly, though, this still produced as many partitions in the dask DataFrame as we had parquet files, which is exactly what I wanted to avoid with read-time compaction.

Then, when it came time to join two of these DataFrames (each derived from reading a different set of many small parquet files, this time with chunking enabled), the client-side memory shot past 50 GB and the client OOMed, even though I’m only calling .head() on the resulting DataFrame. This did not make any sense to me, because I thought only the workers should be doing the heavy lifting, and possibly the scheduler if my task graph is huge (which we tried to mitigate by using chunking in read_parquet).

The solution I ended up with (which, admittedly, is not the most efficient, but at least produces results) was to use Dask’s delayed API to build my own read_parquet: I pre-list all the directories containing the relevant files, group multiple files into one dask DataFrame partition, read the parquet files with our own parquet reader library, and then concat the resulting pandas DataFrames from the individual files to produce a single dask DataFrame partition. This works, but it is not the most efficient approach, because it obviously isn’t smart enough to optimize simple .head() calls and instead just computes the entire DataFrame (and it probably misses other optimizations that Dask’s read_parquet had).
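
In rough outline, the custom reader looks something like the sketch below. This is simplified: I’ve substituted pandas.read_parquet for our internal reader, and the file listing and the files_per_partition value are just placeholders for our actual layout.

```python
import dask
import dask.dataframe as dd
import pandas as pd


@dask.delayed
def read_file_group(paths, columns):
    # read each small parquet file and concatenate them, so that one group
    # of files becomes one partition of the resulting dask DataFrame
    frames = [pd.read_parquet(p, columns=columns) for p in paths]
    return pd.concat(frames, ignore_index=True)


def read_parquet_compacted(all_paths, columns, files_per_partition=50):
    # pre-list the files, then group several files into each partition
    groups = [
        all_paths[i : i + files_per_partition]
        for i in range(0, len(all_paths), files_per_partition)
    ]
    parts = [read_file_group(group, columns) for group in groups]
    # without an explicit meta, dask infers the schema from the first chunk
    return dd.from_delayed(parts)
```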

Please let me know if you need more info from me

Ah, to be fair, I think most of my problems come from the fact that we are using an older version of Dask (2021.03).

@bakht I just wanted to check in, did upgrading Dask resolve all your issues, or are some things still unclear?

Hi @pavithraes, using newer versions of Dask does help solve the issue with dask DataFrame partition sizes. However, I still notice that it seems to hang when I provide the chunksize parameter so that many files can be combined into one DataFrame partition. I haven’t looked into this behavior much since then, because I have an acceptable workaround for now.
