How to append to a Dask Dataframe

I am trying to find the number of distinct values of each combination of two columns of a Dask DataFrame and store the results in a Dask DataFrame with the following columns:
number of distincts, column name 1, column name 2.

The following code works fine using Pandas.

number_of_disincts = ['number_of_disincts']
columns_names = [f'column name{i+1}' for i in range(2)]
distincts = np.array([], dtype=int)
results = pd.DataFrame(columns=number_of_disincts + columns_names)
combinations = list(itertools.combinations(ddf.columns, 2))
for combination in combinations:
    distinct_tuples = ddf.drop_duplicates(subset=combination).shape[0].compute()
    distincts = np.append(distincts, [distinct_tuples])
results[number_of_disincts[0]] = distincts
results[columns_names] = combinations

When I try to use a Dask DataFrame to store the results with the following code:

number_of_disincts = ['number_of_disincts']
columns_names = [f'column name{i+1}' for i in range(2)]
distincts = da.from_array(np.array([], dtype=int))
results = dd.from_pandas(pd.DataFrame(columns=number_of_disincts + columns_names), npartitions=2)
combinations = list(itertools.combinations(ddf.columns, 2))
for combination in combinations:
    distinct_tuples = ddf.drop_duplicates(subset=combination).shape[0].compute()
    distincts = da.append(distincts, [distinct_tuples])
results[number_of_disincts[0]] = distincts
results[columns_names] = combinations

the line results[number_of_disincts[0]] = distincts leads to the following error:

ValueError: Number of partitions do not match

and the line results[columns_names] = combinations leads to the following error:

NotImplementedError: Item assignment with <class 'list'> not supported

Is there any workaround to have the same functionality as Pandas?

Hi @KonGian, welcome to this forum!

A first thought:

Do you really need a Dask Dataframe to store the result? Will you have a lot of values?

Then, in its current state, the code you provided is not reproducible. Could you build a minimal reproducible example with some fake generated data?

Another thought: finding the number of distinct values in a distributed way is a hard problem. For example, with the loop where you compute each count and append it with da.append, I'm not sure it will work with Dask, and even if it does, it won't be parallelized and probably won't be really efficient. So you are computing all the distinct values in a single process's memory, and then want to create a Dask DataFrame from there? If this is really what you want to do, you should create the Dask DataFrame from there, and not try to append values to an empty one.

Hello @guillaumeeb and thank you for your reply.

Do you really need a Dask Dataframe to store the result? Will you have a lot of values?

My results DataFrame may exceed local memory; that's why I want to store the results in a Dask DataFrame.

you should create the Dask DataFrame from there, and not try to append values to an empty one.

Could you please provide an example of this?

In order to do this, please first give us a minimal reproducible example including some data (preferably generated by your code).

Here is an example:

The following code leads to the errors I described in my first post.

import itertools

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
from faker import Faker

fake = Faker()

def make_workers(num):
    
    status_list = ['Full Time', 'Part Time', 'Per Diem']
    team_list = [fake.color_name() for x in range(4)]
    
    fake_workers = [{'Worker ID':x+1000,
                  'Worker Name':fake.name(),
                  'Hire Date':fake.date_between(start_date='-30y', end_date='today'),
                  'Worker Status':np.random.choice(status_list, p=[0.50, 0.30, 0.20]),
                  'Team':np.random.choice(team_list)} for x in range(num)]
        
    return fake_workers

worker_df = pd.DataFrame(make_workers(num=10000))
ddf = dd.from_pandas(worker_df, npartitions=2)

number_of_disincts = ['number_of_disincts']
columns_names = [f'column name{i+1}' for i in range(2)]
distincts = da.from_array(np.array([], dtype=int))
results = dd.from_pandas(pd.DataFrame(columns=number_of_disincts + columns_names), npartitions=2)
combinations = list(itertools.combinations(ddf.columns, 2))
for combination in combinations:
    distinct_tuples = ddf.drop_duplicates(subset=combination).shape[0].compute()
    distincts = da.append(distincts, [distinct_tuples])
results[number_of_disincts[0]] = distincts
results[columns_names] = combinations

The following code works fine:

import itertools

import dask.dataframe as dd
import numpy as np
import pandas as pd
from faker import Faker

fake = Faker()

def make_workers(num):
    
    status_list = ['Full Time', 'Part Time', 'Per Diem']
    team_list = [fake.color_name() for x in range(4)]
    

    fake_workers = [{'Worker ID':x+1000,
                  'Worker Name':fake.name(),
                  'Hire Date':fake.date_between(start_date='-30y', end_date='today'),
                  'Worker Status':np.random.choice(status_list, p=[0.50, 0.30, 0.20]),
                  'Team':np.random.choice(team_list)} for x in range(num)]
        
    return fake_workers

worker_df = pd.DataFrame(make_workers(num=10000))
ddf = dd.from_pandas(worker_df, npartitions=2)

number_of_disincts = ['number_of_disincts']
columns_names = [f'column name{i+1}' for i in range(2)]
distincts = np.array([], dtype=int)
results = pd.DataFrame(columns=number_of_disincts + columns_names)
combinations = list(itertools.combinations(ddf.columns, 2))
for combination in combinations:
    distinct_tuples = ddf.drop_duplicates(subset=combination).shape[0].compute()
    distincts = np.append(distincts, [distinct_tuples])
results[number_of_disincts[0]] = distincts
results[columns_names] = combinations

print(results)

Hope this will help :slightly_smiling_face:

Thanks,

OK, first a disclaimer (I will come back to this at the end): I think there are several things to modify in your code if you want it to be truly distributed.

Let's start with an answer to your question. If you want to build a Dask DataFrame because you've got too much data to use Pandas, then you should do it by building independent partitions that fit in memory, and build the Dask DataFrame from these partitions. You've got several possibilities to do this; here is an example with Delayed:

from toolz import partition_all

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask import delayed

def create_df_partition(distincts, sublist):
    # Build one in-memory pandas partition from the counts and their column
    # combinations (same construction as your results DataFrame above)
    part = pd.DataFrame(columns=number_of_disincts + columns_names)
    part[number_of_disincts[0]] = distincts
    part[columns_names] = list(sublist)
    return part

delayed_partitions = []
# First, partition your combinations into sublists
for sublist in partition_all(4, combinations):
    distincts = np.array([], dtype=int)
    for combination in sublist:
        distinct_tuples = ddf.drop_duplicates(subset=combination).shape[0].compute()
        distincts = np.append(distincts, [distinct_tuples])
    # delay a call that builds a DataFrame partition from these sub-results
    delayed_partitions.append(delayed(create_df_partition)(distincts, sublist))

# Build the Dask DataFrame from the delayed partitions
results_ddf = dd.from_delayed(delayed_partitions)

This builds a DataFrame in a lazy way.
However, even though I tried to stay close to your code, there is a big limitation here: the values to be put inside each partition are still computed in the main thread (the .compute() call inside the loop), which is not good if you've got a lot of data. If you really want to go distributed and avoid memory pressure on the main thread/process, you would need to compute the values inside the delayed calls too.

And the disclaimer I put at the beginning of the message holds true for other parts of the code. You're saying you'll need a Dask DataFrame because you'll have too many combinations to store. But that means that the line where you build the combinations list with list(itertools.combinations(...)) probably won't be able to hold all the combinations in memory either. Computing a list of unique combinations in a distributed way is hard, but here the problem is a bit simpler: you can build this combinations list part by part, and this is what you should do if you have too many of them.
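For instance, here is a rough sketch of the idea (the chunk size of 100,000 is just an arbitrary example): itertools.combinations is a lazy iterator, so you can consume it chunk by chunk instead of materializing the full list:

import itertools

from toolz import partition_all

# combinations() is lazy: nothing is materialized until you iterate over it
combo_iter = itertools.combinations(ddf.columns, 2)

# consume, say, 100,000 combinations at a time, then move on to the next chunk
for chunk in partition_all(100_000, combo_iter):
    ...  # compute the distinct counts for this chunk and write them to disk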

If you think the combinations list will fit in memory, then adding just one count per combination won't add much, and you probably don't need Dask for the results.

Also, how will you build your input DataFrame? Can you read your input data in chunks using Dask?

Thank you for your help.

My intention is for my code to be fully distributed. I am new to Dask and distributed computing, so, as you mentioned, I may have plenty of mistakes in my code.

You are right about that. Could you please provide code that is fully distributed and addresses the problems you mentioned? I am not sure how I can compute the values inside the delayed calls.

I have a CSV as input, so I use dd.read_csv.

Thanks again.

I’ve got two questions first:

  • How long will ddf.columns be? Do you really have that many columns in your input DataFrame?
  • Is your input data also too large to fit in memory?

My ddf may have up to 300 columns. If I want to find the number of distinct values of each combination of two columns, there will be around 45,000 combinations, so the output DataFrame will have around 45,000 rows and I can use Pandas. But my code should also work if I want to find the number of distinct values of each combination of four columns. In that case there will be around 330 million combinations and the output will have around 330 million rows, so I believe I need to use Dask.
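For reference, a quick check of those counts (standard library only, nothing Dask-specific):

import math

math.comb(300, 2)  # 44850 two-column combinations
math.comb(300, 4)  # 330791175 four-column combinations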

Yes, my input data may be too large to fit in memory also.

Okay, let's not worry about how to write the result at first. I think your main problem is that you have two potential levels of parallelization: first, you need to chunk your input data if it doesn't fit into memory; then, you need to apply the same computation to a lot of different combinations.

Really, the key point for this first answer is the need to use Dask to read your input dataset. This is a big constraint. Moreover, due to the computation you're doing on this input DataFrame, Dask might not be performant, and you might still get memory issues. Let me explain a bit: in order to do a distributed drop_duplicates operation, Dask will first do the drop_duplicates on each of the input partitions, and then at some point it will aggregate all the remaining values in a single worker's memory in order to really find the duplicates. This means that if the per-partition drop_duplicates doesn't drop enough values, all the values will end up in one worker's memory at some point, and this might make the computation fail due to memory limits.
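Conceptually (a simplified picture, not Dask's exact internals), the distributed dedup behaves roughly like this, which is why the second step can end up holding a lot of data in one worker's memory:

# toy illustration using the worker_df from your example, split into 4 fake "partitions"
import pandas as pd

chunks = [worker_df.iloc[i::4] for i in range(4)]
cols = ['Worker Status', 'Team']  # any combination of columns

# step 1: drop duplicates within each partition (Dask does this part in parallel)
partial = [chunk.drop_duplicates(subset=cols) for chunk in chunks]

# step 2: concatenate what remains and deduplicate again in a single memory space;
# if step 1 did not drop much, this step holds almost everything in one place
n_distinct = len(pd.concat(partial).drop_duplicates(subset=cols))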

So it would be much better if the input Dataframe could fit into memory, so you can directly use Pandas for the drop_duplicates part.

Nevertheless, here is some code that might allow you to perform your whole computation, assuming the point raised above is not a problem.

ddf = ddf.persist()  # keep the input DataFrame in distributed cluster memory; remove this line if it does not fit there

# iterate over the combinations in chunks: one "outer" column at a time,
# combined with every pair from the remaining columns
for column in ddf.columns:
    sublist = list(ddf.columns)
    sublist.remove(column)
    subcombinations = list(itertools.combinations(sublist, 2))
    columns_names = [f'column name{i+1}' for i in range(3)]
    results = pd.DataFrame(columns=number_of_disincts + columns_names)
    distincts = np.array([], dtype=int)
    complete_combinations = []
    for combination in subcombinations:
        complete_combination = combination + (column,)
        complete_combinations.append(complete_combination)
        distinct_tuples = ddf.drop_duplicates(subset=complete_combination).shape[0].compute()
        distincts = np.append(distincts, [distinct_tuples])
    results[number_of_disincts[0]] = distincts
    results[columns_names] = complete_combinations
    # write subresult ...
    print(results)

The idea is to iterate over the combinations in a way that doesn't blow up memory, and then just sequentially compute your results, periodically saving them to disk with Pandas. The ddf = ddf.persist() is key to optimize computations, but it assumes that your distributed cluster can store the DataFrame in distributed memory; otherwise you need to remove it. In the end, this will probably be painfully slow, but right now I don't see a better solution if your input dataset doesn't fit into memory (but I'm not a DataFrame expert, maybe there is a better function than drop_duplicates to achieve your goal).
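For the # write subresult ... placeholder, a simple option (with hypothetical file names) is to write each sub-result to its own file inside the loop:

# inside the loop, right after building `results` for the current column
results.to_csv(f'distinct_counts_{column}.csv', index=False)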

Thank you very much for your help.
Assuming that the point you mentioned about the duplicates is not a problem, and that the results can fit in a pandas DataFrame but the input DataFrame has to be a Dask DataFrame, is there anything I could do to make the computation of the duplicates faster?

In the previous code snippet, combinations and distincts can fit in memory, and ddf is a Dask DataFrame.
The loop runs very slowly in a Dask distributed environment. Is there any way to make this for loop run in parallel using Dask?

As said above, if your input DataFrame doesn't fit into memory, I don't see what you can do except using persist before the for loop, provided the DataFrame fits in the distributed cluster's memory. This would avoid reading the data back for every drop_duplicates call!
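Roughly like this (with a hypothetical input path, since you mentioned reading a CSV with dd.read_csv):

import itertools

import dask.dataframe as dd

# read once, then keep the partitions in the workers' memory so that every
# drop_duplicates call reuses them instead of re-reading the CSV
ddf = dd.read_csv('input.csv')  # hypothetical path
ddf = ddf.persist()

for combination in itertools.combinations(ddf.columns, 2):
    n_distinct = ddf.drop_duplicates(subset=list(combination)).shape[0].compute()
    # ... store n_distinct as before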

The thing is, drop_duplicates is already running in parallel, so it would be a bit difficult and dangerous to also try to run the for loop in parallel.