Gradually build up a Dataframe

I’m gradually building up a dataframe from pandas dataframes. Initially I just concatenated these together with Dask, but this unfortunately doesn’t work as expected. I’m trying to prevent all the data from sitting in the client process, which it still does after calling concat, due to its lazy nature. Calling persist after every concat moves the data into the Dask cluster, but also duplicates the data on every call.

Some toy code to explain the issue; imagine the data constructed at the beginning is external and will change each “iteration”.

Initialize the cluster and build some data to use for this example:

import uuid

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
from dask.dataframe.multi import concat

c = Client()

# build roughly a million rows of example data on the client
data = []
for i in range(0, 1024*1024):
    data.append({'a': i, 'b': uuid.uuid4().hex, 'c': "some column", "other": 123235345,
                 "data": 0.1123132123, "long": "wide", "some": "more data"})

I tried a few variants:

No persist, all data will just reside on the client

df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf])

Persist at the end. This will move the data into the workers, but it defeats the point: the client still has to have enough memory to hold all the data at once. About 2.3 GB is stored on the cluster after the persist call.

df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf])
df = df.persist()  # keep a reference to the persisted collection

Persist every time. This doesn’t work either, as it duplicates data massively: in the end, 7+ GB is stored on the cluster.

df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf]).persist()

I tried a method I found on Stack Overflow, but it doesn’t seem to work: call the following function on the previous version of the dataframe before concatenating.

from distributed.client import futures_of

def release(collection):
    # release the client's reference to every future backing the collection
    for future in futures_of(collection):
        future.release()
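
For context, this is roughly how that helper would be slotted into the concat loop above (a sketch only, reusing the names from the earlier snippets; as noted, it didn’t seem to help here):

df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf.persist()
    else:
        new_df = concat([df, ddf]).persist()
        release(df)  # drop the client's futures for the previous version
        df = new_df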

I might have it working now, by calling persist on each from_pandas dataframe before concatenating.
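
For reference, a minimal sketch of that approach, using the same names as the snippets above: each small dataframe is persisted as soon as it is wrapped, so the pandas copy on the client can be dropped, and the already-persisted pieces are concatenated once at the end.

parts = []
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    # persist each piece immediately so its data lives on the workers
    parts.append(dd.from_pandas(pdf, npartitions=1).persist())

df = concat(parts)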

@paultjuh Apologies for the delay in response!

I’m gradually building up a dataframe from pandas dataframes.

I’m curious about why you would need to do this?

Persist at the end. This will move the data into the workers.

Indeed, persist triggers execution, which is why your data is being sent to the workers.

Looking at your code, specifically the for loop where you read data and concat: this is usually not good practice when using Dask. In Dask, we try to read data within tasks, so that it can be read in parallel. Two options you can try:

  • Using the new from_map function
  • Using pandas operations with Dask’s delayed API (a sketch follows the from_map example below)

Something like this, using from_map:

import uuid
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

data = []
for i in range(0, 32*32):
    data.append({'a': i, 'b': uuid.uuid4().hex, 'c': "some column", "other": 123235345,
                 "data": 0.1123132123, "long": "wide", "some": "more data"})
data_list = [data, data]  # duplicating data just for demonstration

def my_func(x):
    # build one pandas partition from one element of data_list
    pdf = pd.DataFrame(x)
    pdf = pdf.set_index('a')
    return pdf

ddf = dd.from_map(my_func, data_list)

ddf.visualize()

(Image: task graph produced by ddf.visualize())
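
The second option from the list above, Dask’s delayed API, would look roughly like this (a sketch only, reusing data_list from the example above):

import dask

@dask.delayed
def make_partition(chunk):
    # build one pandas partition inside a task
    pdf = pd.DataFrame(chunk)
    return pdf.set_index('a')

parts = [make_partition(chunk) for chunk in data_list]
ddf = dd.from_delayed(parts)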

A general note on both of the above options: Dask will be most helpful if each partition of your Dask DataFrame can be built by its task/function independently of the others.
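
For instance, if each “iteration” of external data can be fetched from its own source, each partition can be built entirely inside a task. A hypothetical sketch, with made-up file names:

def load_partition(path):
    # hypothetical per-chunk reader; any pandas reader would work here
    pdf = pd.read_json(path, lines=True)
    return pdf.set_index('a')

paths = [f"chunk-{i}.jsonl" for i in range(32)]  # placeholder file names
ddf = dd.from_map(load_partition, paths)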