Processing a large number of compressed files

Hi, I’m new to Dask and am wondering if there is a better way to handle my use case.

I have a large number of compressed (gz) files that I need to download from an S3 bucket. Each file holds up to 5 GB of data in JSON lines format (one JSON-formatted object per line).

My current approach is (omitting parts of the code not involved in file processing):

import os

import dask
import dask.bag as db
from dask.distributed import Client

for i, file in enumerate(filenames):

    # download and extract the file
    unzipped_filename = dask.compute(download_file(file, path))[0]

    # process the file
    b = (
        db.read_text(unzipped_filename, blocksize=block_size)
        .map_partitions(processor.process_partition)
        .compute()
    )
    del b

    # remove local file
    os.remove(unzipped_filename)

The download_file method gets the current file from S3 and decompresses it into a text file. I have a class (processor) that contains a method (process_partition) that works on partitions of the current file. It performs some data cleanup, reformatting, and insertion into a database.
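For context, here is a simplified sketch of roughly what the download step does (illustrative only: the boto3 calls, bucket name, and path handling are stand-ins rather than my actual code):

import gzip
import os
import shutil

import boto3
import dask

@dask.delayed  # assumed wrapping, so dask.compute in the loop above runs this on a worker
def download_file(key, path):
    """Fetch one .gz object from S3 and decompress it to a local text file."""
    s3 = boto3.client("s3")
    gz_path = os.path.join(path, os.path.basename(key))
    s3.download_file("my-bucket", key, gz_path)  # placeholder bucket name

    unzipped_path = gz_path[:-3]  # drop the ".gz" suffix
    with gzip.open(gz_path, "rb") as src, open(unzipped_path, "wb") as dst:
        shutil.copyfileobj(src, dst)

    os.remove(gz_path)  # keep only the decompressed text file
    return unzipped_path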

Overall, this works, but I suspect I could be doing things more efficiently. One issue is that I get a lot of warnings about “full garbage collections” taking around 12-15% of CPU time. I found a few mentions of this issue in other posts and have tried changing the block_size, to no avail.

I would hugely appreciate any recommendations on how to make better use of distributed processing. Thank you!

Hi @argonaut76, welcome to Dask community!

If downloading a file takes much less time than processing it, and if you have enough partitions to distribute the work over all your resources, then I think this approach is fine.

About the garbage collection warnings, it’s hard to tell where the problem might be. What is your setup: are you using a LocalCluster? How many processes, how much memory? Dask should keep the memory footprint of the input data under control, but if the process_partition method itself uses a lot of memory, then you’ll run into heavier garbage collection phases.

At a higher level, I would consider downloading and processing all the files in parallel if you have enough disk space, creating one big bag from all the files, but this might complicate how you handle things without much benefit if processing a single file already takes long enough to keep all the workers busy.
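As a rough sketch of that idea, assuming download_file is wrapped in dask.delayed and returns the local path of the decompressed file:

import dask
import dask.bag as db

# download/decompress every file in parallel across the workers
local_paths = dask.compute(*[download_file(f, path) for f in filenames])

# one big bag over all the decompressed files, processed as a single graph
(
    db.read_text(list(local_paths), blocksize=block_size)
    .map_partitions(processor.process_partition)
    .compute()
)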

Hi @guillaumeeb, thank you for the warm welcome!

File download and decompression are definitely the bottleneck. Individual partitions of block_size = 20 MB process very quickly, and even a large file doesn’t take overly long to get through when spread across all workers. The download/decompression only happens on a single worker at a time, though.

Downloading and decompressing everything up front would require a huge amount of disk space. But I was thinking about doing this in batches, with the number of files per batch equal to the number of workers, then running a bag over each batch. That should take fuller advantage of the resources while keeping disk consumption in check. What do you think?
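Concretely, something along these lines (just a sketch, assuming download_file is wrapped in dask.delayed; the batch size and cleanup are approximate):

import os
import dask
import dask.bag as db

batch_size = 5  # assumption: one file per worker

for start in range(0, len(filenames), batch_size):
    batch = filenames[start:start + batch_size]

    # download/decompress the whole batch in parallel, one file per worker
    local_paths = dask.compute(*[download_file(f, path) for f in batch])

    # one bag over the batch, processed across all workers
    (
        db.read_text(list(local_paths), blocksize=block_size)
        .map_partitions(processor.process_partition)
        .compute()
    )

    # clean up local files before the next batch
    for p in local_paths:
        os.remove(p)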

I’m currently using a LocalCluster with 5 workers sharing around 30 GB of memory. Once I have the pipeline proven out, I would run it on a larger machine or a distributed cluster. Would that solve the garbage collection issue?
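For reference, the cluster setup is roughly equivalent to this (illustrative numbers; the exact call may differ):

from dask.distributed import Client, LocalCluster

# 5 workers, roughly 6 GB each, ~30 GB total
cluster = LocalCluster(n_workers=5, memory_limit="6GB")
client = Client(cluster)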

Batching the downloads over the number of workers sounds like a good compromise!

I guess more memory per worker could solve the garbage collection issue.

5 workers sharing 30 GB sounds a bit small, though: did you come up with that number for parallelization reasons or because of memory issues?

Hi @argonaut76,
have you tried using the Fine Performance Metrics and/or the gilknocker dashboard panel? These two tools combined should give you a good idea of where your bottlenecks are.
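For example, something along these lines should expose both panels on the dashboard (a sketch: GIL monitoring needs the gilknocker package installed, and the config key below is from memory, so double-check it against the docs for your distributed version):

import dask
from dask.distributed import Client, LocalCluster

# enable GIL contention monitoring (requires `pip install gilknocker`)
dask.config.set({"distributed.admin.system-monitor.gil.enabled": True})

client = Client(LocalCluster(n_workers=5))
print(client.dashboard_link)  # Fine Performance Metrics and the GIL panel live here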