How can I optimize the speed of reading JSON Lines file(s) into a Dask dataframe?

Hello All,

First of all, I am just starting with Dask, so please bear with me. :slight_smile:

At this point, I don’t want to do too much, but here is the main idea:

  • I have one (or many) JSON Lines file(s) stored locally on a laptop with 32 GB of RAM, so quite decent.
  • The file(s) have to remain local, so the cloud is not an option
  • Altogether, the JSON files can hold up to 250 GB of raw, uncompressed data
  • The file(s) can have up to hundreds of millions of rows and hundreds of columns
  • My first goal is to reduce the file size by keeping only the 8 columns that are most relevant for the use case
  • For now, what I am trying to do is rather simple:
    • Read the file(s) as fast as possible, considering that everything has to be done locally
    • Keep only the needed columns (if this can be done at read time, even better, I guess)
    • Save the result as Parquet
  • After this, reading the Parquet file back for further processing should be faster (see the sketch right after this list). Of course, one course of action might be to do some processing as soon as the DataFrame is first created, since it is already available.
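Just to make that last point concrete, here is roughly how I imagine the later read-back step (the path and column names below are placeholders, not my real ones):

import dask.dataframe as dd

# Placeholder path and column names, only to illustrate the idea.
PARQUET_DIR = "reduced_dataset.parquet"
KEEP_COLUMNS = ["timestamp", "user_id"]  # in practice, the 8 relevant columns

# read_parquet can load just the requested columns,
# which should be much faster than re-reading the raw JSON Lines file.
df = dd.read_parquet(PARQUET_DIR, columns=KEEP_COLUMNS)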

Using a Notebook within JupyterLab, here is what I did so far with a file which has ~5GB and ~40 million rows.

import dask.dataframe as dd
# ORIGINAL_JSONL_FILE holds the path to the ~5 GB JSON Lines file
df = dd.read_json(ORIGINAL_JSONL_FILE, lines=True)

This will read the file in 9 minutes.

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(address=cluster)  # I also tried asynchronous=True

In this case it failed:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
...
KilledWorker: ('read_json_file-012185e8-7a30-4a9c-a6c9-014a94682642', <WorkerState 'tcp://127.0.0.1:61069', name: 0, status: closed, memory: 0, processing: 1>)

I would like to know what I can try to optimize the reading time.

Thank you!

Hi,

LocalCluster(n_workers=4, threads_per_worker=2)

This automatically sets memory_limit to 32 GB / 4 = 8 GB per worker. You are reading a single JSON file, which by default is loaded by a single task on a single CPU on a single worker. Once that worker reaches 32 / 4 * 0.95 = 7.6 GB of RAM usage, it is killed off, and 7.6 GB is unsurprisingly not enough to crunch through a 5 GB file once it has been parsed into memory.
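As a side note, the per-worker budget can also be set explicitly when building the cluster, roughly like this (the "7GiB" value is only an illustration, not a recommendation):

from dask.distributed import Client, LocalCluster

# memory_limit applies per worker, so 4 workers x 7 GiB here;
# by default it is total RAM divided by n_workers.
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="7GiB")
client = Client(cluster)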

If you pass blocksize="128 MiB" to read_json, the file will be split into much more manageable segments and you will also benefit from all of your available CPUs, drastically cutting down the end-to-end runtime (though your hard drive throughput may become a bottleneck).
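To connect this back to your original goal, a rough sketch of the whole read → keep 8 columns → write Parquet step could look like this (the column names and output path are placeholders, not anything Dask requires):

import dask.dataframe as dd

# Split the single file into ~128 MiB partitions so every worker gets tasks.
df = dd.read_json(ORIGINAL_JSONL_FILE, lines=True, blocksize="128 MiB")

# Keep only the relevant columns (placeholder names, use your real 8).
df = df[["timestamp", "user_id", "event_type"]]

# Write the reduced dataset as Parquet for much faster subsequent reads.
df.to_parquet("reduced_dataset.parquet", write_index=False)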


Thank you!
Using the blocksize parameter didn’t kill the worker. Although I have many other follow-up questions, I will close this post to keep things separate.