I’m gradually building up a dask dataframe from pandas dataframes. Initially I just concatenated these together with dask, but unfortunately this doesn’t work as expected: I’m trying to prevent all of the data from sitting in the client process, which it still does after calling concat because of the lazy evaluation. Calling persist after every concat does move the data onto the dask cluster, but it also duplicates the data on every call.
Some toy code to explain the issue; imagine the data constructed at the beginning is external and changes on each “iteration”.
Initialize the cluster and build some data to use for this example:
import uuid

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
from dask.dataframe.multi import concat

c = Client()

# Build ~1M rows of sample data; in reality this comes from an external
# source and changes every iteration.
data = []
for i in range(0, 1024 * 1024):
    data.append({'a': i, 'b': uuid.uuid4().hex, 'c': "some column", "other": 123235345,
                 "data": 0.1123132123, "long": "wide", "some": "more data"})
I tried a few variants:
No persist: all the data just stays on the client.
df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf])
Persist at the end. This moves the data onto the workers, but it defeats the point: the client still has to have enough memory to hold all of the data at once. About 2.3 GB is stored on the cluster after the persist call.
df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf])
df = df.persist()
Persist every time. This doesn’t work either, as it duplicates the data massively: in the end 7+ GB is stored on the cluster.
df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        df = concat([df, ddf]).persist()
I also tried a method I found on Stack Overflow: call the following helper on the old (pre-concat) version of the dataframe after persisting the new one, so its futures get released. It doesn’t seem to work either.
from distributed.client import futures_of

# Release the futures backing a persisted collection so the workers
# can drop that copy of the data.
def release(collection):
    for future in futures_of(collection):
        future.release()
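Roughly, this is what I mean (a sketch of the intended usage: persist the new concatenation, release the futures of the previous dataframe, then swap):

df = None
for j in range(32):
    pdf = pd.DataFrame(data)
    pdf = pdf.set_index('a')
    ddf = dd.from_pandas(pdf, npartitions=1)
    if df is None:
        df = ddf
    else:
        new_df = concat([df, ddf]).persist()
        release(df)   # release the old, pre-concat version before replacing it
        df = new_df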