Unable to create Dask dataframe at scale

Context

The goal of my code is to:

  1. read many (~30k) files (>10 TB in total, each 3 MB - 3 GB in size); these are in a non-standard format with no Dask readers,
  2. extract features from them (these features are small, typically only 1 - 10% of the original file),
  3. save the features to a dataframe on local disk where I can use them later for dataframe-style analysis (groupbys, correlations, conditional means, etc.).

I’m using Dask 2022.02.1 on a Debian 10 cluster. I’m using the futures API because part of the feature extraction involves reference data, which I scatter to each worker.

Problem

My code works fine on a small scale when I write the dataframe to HDF format. However, when I run the code on all the data, it crashes with the following exception:

distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:39109
Traceback (most recent call last):
  File "/home/rob/miniconda3/envs/bax/lib/python3.9/site-packages/distributed/comm/tcp.py", line 211, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

Code


    import dask.dataframe as dd
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=8,
                           threads_per_worker=1,
                           dashboard_address='localhost:8888',
                           memory_limit='10GiB')
    client = Client(cluster)
    client.amm.start()

    reference = load_reference_data(path_to_reference_data)
    ref_future = client.scatter(reference, broadcast=True)

    pdf_futures = []  # pdf = 'pandas dataframe' feature df created by `load`
    for path in paths:  # list of paths to files, len(paths) ~ 30k
        pdf_futures.append(client.submit(load, str(path), ref_future))

    ddf_future = dd.from_delayed(pdf_futures, meta=dict_of_col_types)

    dd.to_hdf(ddf_future, path='output.h5', key='output', scheduler=client, compute=True)
    client.shutdown()

Things I have tried

I’ve tried writing out the individual dataframes instead, but I can’t get this to work either. Here are some attempts:

Attempt 1:

results = []
for path in sample_paths: 
    pdf_future = client.submit(load, *(str(path), ref_future))
    ddf = dd.from_delayed([pdf_future], meta=dict_of_col_type)
    write = ddf.to_hdf(out_dir.joinpath(Path(path.with_suffix('.hdf').name)), 
                            key='features', 
                            compute=False, scheduler=client)

    results.append(write)

client.compute(*results)

I get this error:

TypeError: Truth of Delayed objects is not supported

Attempt 2:

results = []
for path in sample_paths: 
    pdf_future = client.submit(load, *(str(path), ref_future))

    ddf_future = client.submit(dd.from_delayed, pdf_future, meta=dict_of_col_type)

    write = client.submit(dd.to_hdf, ddf_future, 
                          path=out_dir.joinpath(Path(path.with_suffix('.hdf').name)), 
                          key='features', 
                          compute=False)
    results.append(write)

client.gather(results)

I get this error:

---> 32 client.gather(results)

File ~/miniconda3/envs/bax/lib/python3.9/site-packages/distributed/client.py:2162, in Client.gather(self, futures, errors, direct, asynchronous)
   2160 else:
   2161     local_worker = None
-> 2162 return self.sync(
   2163     self._gather,
   2164     futures,
   2165     errors=errors,
   2166     direct=direct,
   2167     local_worker=local_worker,
   2168     asynchronous=asynchronous,
   2169 )

File ~/miniconda3/envs/bax/lib/python3.9/site-packages/distributed/utils.py:311, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
...
--> 601         raise TypeError("Expected Delayed object, got %s" % type(df).__name__)
    603 if meta is None:
    604     meta = delayed(make_meta)(dfs[0]).compute()

TypeError: Expected Delayed object, got str

This is from the from_delayed step (after some investigation).

Question
I am not particularly tied to any of my methods for doing this - I just really want to know: how can I achieve my goals?

The only extra thing I’m concerned about is that sometimes my features may be huge (many columns), but they are not in this case.

Thanks in advance!

It is often a bad idea to mix the collections API (dataframe) with the direct low-level APIs of delayed/futures. If you are happy to write many files, the following will do it:

import pandas as pd

pdf_futures = []  # pdf = 'pandas dataframe' feature df created by `load`
for path in paths:  # list of paths to files, len(paths) ~ 30k
    part_future = client.submit(load, str(path), ref_future)
    pdf_futures.append(client.submit(pd.DataFrame.to_hdf, part_future, str(path) + ".hdf", key="features"))  # to_hdf requires a key

Furthermore, you may consider creating and computing your futures in batches, so that the whole thing is not in flight at once.


Thanks very much!

When you say do it in batches, do you mean something like this:

for batch in batched_paths: 
    pdf_futures = []
    for path in batch:
        part_future = client.submit(load, str(path), ref_future)
        pdf_futures.append(client.submit(pd.DataFrame.to_hdf, part_future, str(path) + ".hdf", key="features"))
    client.gather(pdf_futures)

or do you mean:

for batch_of_paths in batched_paths: 
    pdf_futures = []
    part_future = client.submit(load, batch_of_paths, ref_future)
    pdf_futures.append(client.submit(pd.DataFrame.to_hdf, part_future, path + ".hdf", key="features"))
    client.gather(pdf_futures)

?

A couple of follow-up questions, if you have time:

  1. Doing it in batches - is this because the graph may get too big if I don’t do it in batches?
  2. Is there any reason you can think of why my original code wouldn’t work? I presume your ‘don’t mix and match APIs’ point is directed at my later attempts. I would rather write a single file, if I’m honest.

Thanks again!

I think I meant the second code snippet.

Writing to a single file would require either gathering all the data to one node for the write, or some complex write locking. I’m not sure what the HDF backend does, but this is the reason that other file writers (CSV, Parquet) normally write to separate files.
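For comparison, here is a minimal sketch of that separate-files pattern with Parquet (not the OP's code; the dataframe and paths below are placeholders I made up). Dask writes one file per partition, so no locking or single-node gather is needed, and the directory reads back later as one logical dataframe:

import pandas as pd
import dask.dataframe as dd

# stand-in for the real feature dataframe (placeholder data)
pdf = pd.DataFrame({"file": ["a", "b"], "feature1": [0.1, 0.2]})
ddf = dd.from_pandas(pdf, npartitions=2)

# one file is written per partition
ddf.to_parquet("features_parquet/")

# the whole directory reads back as a single logical dataframe
features = dd.read_parquet("features_parquet/")
print(features.compute())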

is this because the graph may get too big if I don’t do it in batches

The number of tasks being managed by the scheduler does contribute to its load; but it is also possible to swamp the comms simply by submitting so many tasks in one go, causing lags between workers and the scheduler, and heartbeat timeouts. There are config options for these, but your workload feels very batch-like, so batching seems a sensible approach.
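As a rough sketch of what that batching might look like (the batch size and loop shape here are my own; `client`, `load`, `ref_future` and `paths` are reused from the original post):

import pandas as pd

batch_size = 500  # hypothetical; tune so the scheduler stays responsive

for start in range(0, len(paths), batch_size):
    batch = paths[start:start + batch_size]
    futures = [
        client.submit(pd.DataFrame.to_hdf,
                      client.submit(load, str(p), ref_future),
                      str(p) + ".hdf", key="features")
        for p in batch
    ]
    client.gather(futures)  # wait for this batch to finish before submitting the next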

I see, thanks for the clarification!

I usually do it this way:

import glob2
import pandas as pd
import dask.dataframe as dd
from dask import delayed

def fn_process(param1, param2, param3):
    # create a unique identifier. Could be the file name (if not duplicated), file name plus timestamp, etc.
    file_text = <create here>
    try:
        # do whatever you want to do for each partition (processing etc.), producing a pandas dataframe `data`
        data.to_parquet("Output_Path\\" + file_text + ".gzip", compression='gzip')  # each process writes to its own file, hence true parallelism without blocking
        data_return = pd.DataFrame([["SSOLOW", file_text, 'COMPLETE']], columns=['FILEGROUP', 'FILENAME', 'STATUS'])  # create a status row for each input
    except Exception:
        data_return = pd.DataFrame([["SSVLOW", file_text, 'FAIL']], columns=['FILEGROUP', 'FILENAME', 'STATUS'])  # also create a status row on failure
    return data_return

List = glob2.glob("Input folder" + "\\" + "*" + ".xlsx")  # example: parallel processing of xlsx files
List = [x for x in List if "~" not in x]  # filter out hidden files, usually caused by MS Office caching on Windows. Skip if not needed
# List is the input list, each item independent of the others
Output_ = [delayed(fn_process)(file, param2, param3) for file in List]  # create delayed objects
Output_dd = dd.from_delayed(Output_)  # convert to a dask dataframe; here it holds the status rows. The process output itself is in the parquet files
Output_pd = Output_dd.compute()  # run the entire process

# below reads back a dask dataframe representing the output. One purpose is to repartition for staging, so you can delete the original output. But if that is of no concern, you can use this for queries
dd.read_parquet("Output_Path\\" + "*" + ".gzip")
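Once the features are in Parquet like this, the read-back dataframe supports the kind of analysis the original post asked about. A small illustrative example (the column names are placeholders, not from the thread):

features = dd.read_parquet("Output_Path\\" + "*" + ".gzip")

# groupbys, correlations and conditional means on the extracted features
group_means = features.groupby("category_col")["value_col"].mean().compute()
correlations = features[["value_col", "other_col"]].corr().compute()
conditional_mean = features[features["value_col"] > 0]["other_col"].mean().compute()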

Tested up to ~100 TB and ~100-200k files on an old Precision workstation (an old 12-core Xeon with 128 GB RAM, if I’m not mistaken), but it could run on a much slower PC, it would just take longer.

Be advised though: since this does parallel writes, it will hurt performance a lot if you are using HDDs. But it will run.

Also, I usually only use this approach when I need to do a transformation on a format not natively supported by Dask (e.g. Excel input). For Parquet, CSV, etc. I’ll just use a parallel map_partitions, as sketched below.
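For completeness, a minimal sketch of that map_partitions route for a natively supported format (the file pattern, function and column names are placeholders, not from the thread):

import dask.dataframe as dd

ddf = dd.read_csv("input_dir/*.csv")  # natively supported format, read in parallel

def extract_features(partition):
    # placeholder per-partition feature extraction
    return partition.assign(feature=partition["value_col"] * 2)

features = ddf.map_partitions(extract_features)
features.to_parquet("features_parquet/")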


Hi - thanks for your input. I think what you’ve done is similar in structure, the difference being that you keep the load/process/write in a single function. That seems a bit more logical, actually - there’s no sense making an extra node in the compute graph for the write step. I’d be interested in how that impacts performance.

Your point about HDDs is well taken, and I’ve already done a lot of profiling - our cluster has many HDDs mapped to a single logical drive, which makes parallel reads/writes possible.

I REALLY like your idea of returning a status dataframe for each file. I will be stealing this! Thanks!