Gradually build up a DataFrame

I’m gradually building up a dataframe from pandas dataframes. Initially I just concatted these together with dask, but unfortunately that doesn’t work as expected: I’m trying to prevent all the data from sitting in the client process, which it still does after calling concat, due to its lazy nature. Calling persist after every concat moves the data onto the Dask cluster, but it also duplicates the data on every call.

Some toy code to explain the issue; imagine that the data constructed at the start is external and changes on each “iteration”.

Initialize the cluster and build some data to use for this example:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
from dask.dataframe.multi import concat

c = Client()

import uuid
data = []
for i in range(0, 1024*1024):
    data.append({'a': i, 'b': uuid.uuid4().hex, 'c': "some column", "other": 123235345, "data": 0.1123132123, "long": "wide", "some": "more data"})

I tried a few variants:

No persist: all the data just resides on the client.

df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf])

Persist at the end: this moves the data onto the workers, but it defeats the point, since the client still needs enough memory to hold all the data at once. About 2.3 GB is stored on the cluster after the persist call.

df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf])
df = df.persist()

Persist every time: this doesn’t work either, as it duplicates the data massively. In the end 7+ GB is stored on the cluster.

df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf]).persist()

I also tried a method I found on Stack Overflow, but it doesn’t seem to work: call the following helper on the version of the dataframe from before the concat.

from distributed.client import futures_of

def release(collection):
    # Explicitly release the futures backing a persisted collection,
    # so the workers can free the memory they hold.
    for future in futures_of(collection):
        future.release()
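
Roughly how I tried to call it inside the loop, using the names from the snippets above (the exact placement of the release call is my interpretation of that answer):

old = df                              # version of the dataframe before concatting
df = concat([df, ddf]).persist()      # build and persist the new version
release(old)                          # drop the futures backing the old version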

I might have it working now, by calling persist on each from_pandas dataframe before concatting it.
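
A minimal sketch of that approach, reusing the data list built above: each chunk is persisted (moved to the workers) as soon as it is wrapped, and the concat stays lazy, so it only references the futures already held by the cluster.

import pandas as pd
import dask.dataframe as dd
from dask.dataframe.multi import concat

df = None
for j in range(32):
    pdf = pd.DataFrame(data)                 # new external chunk each "iteration"
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    ddf = ddf.persist()                      # push this chunk to the workers right away
    del pdf                                  # drop the client-side pandas copy
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf])               # lazy; reuses the already-persisted pieces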