Creating a new dask df using columns from 2 dataframes and keeping the index of the first

Hello! I have 2 Dask DataFrames with the same number of rows and partitions. I want to create another one with one column from the first and some columns from the second, and I need to keep the index values of the first Dask DataFrame, as in the code below. It apparently works. Then I merge the result with another Dask DataFrame and build a pivot table from the merge.
When I try to save the merged Dask DataFrame to Parquet, it fails with "ValueError: Length of values (1769167) does not match length of index (1784153)". It looks to me like the creation of the pca_comps_df_index index is not OK. It doesn't raise an error, but it seems it doesn't have a proper index.
Can anyone help me on that?
I also tried to save the Dask array of the pivot table of the merge and I got the same error as above.

pca_comps_df_index = df_toPCA[['orig_index']].copy()
n_compon = 3
comps_sel = ['c'+str(x) for x in range(1, n_compon+1)]
for c in comps_sel:
    pca_comps_df_index[c] = pca_components_df2[c].values

pca_components_df_merge = df_toPCA_coords.merge(pca_comps_df_index, how='left', shuffle="tasks")

img_comp_df[c] = pca_components_df_merge.pivot_table(index='coords_0', columns='coords_1', values=c)

#img_comp_dic[c] = img_comp_df.to_dask_array(lengths=True)
img_comp_dic[c] = img_comp_df.values

pca_components_df_merge.to_parquet(save_path+f_name+'df_parquet', engine='pyarrow')
img_comp_dic[c].to_zarr(save_path+f_name)

Thanks in advance for any help.

Hi @flaviaas,

It’s really hard to help, as the provided code is not well formatted and hard to read. Could you please try to rework it? Ideally, a minimal reproducible example would be best.

I don’t see a lot of Dask-related code, and also some in-place assignment, which is really not recommended with Dask.
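For example, instead of copying a frame and mutating it column by column in a loop, you could build the new columns in one go with assign. This is only a rough sketch reusing the names from your snippet; it assumes the 'c1'..'c3' columns exist in pca_components_df2 and that both frames share compatible partitioning and index, which I can't verify from here:

# Rough sketch, not tested against your data: build the new frame in one call
# instead of in-place column assignments inside a loop.
comps_sel = ['c' + str(x) for x in range(1, 4)]
pca_comps_df_index = df_toPCA[['orig_index']].assign(
    **{c: pca_components_df2[c] for c in comps_sel}
)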

Hello @guillaumeeb ,

I am going to try to format the code better. Just to provide more information: I have 2 Dask DataFrames with the same total number of rows (111456171) and the same number of partitions (63). I want to add one column (orig_index) from the first into the second. After that I want to merge the result with df_toPCA_coords, which has the same number of partitions but more rows (111513600).
Code:

#apply the same repartition of df_toPCA to pca_components_df2
npart=df_toPCA.npartitions
pca_components_df2 = pca_components_df2.repartition(npartitions=npart)

#create a new dask df: pca_comp_df_index

orig_index = df_toPCA['orig_index'].to_dask_array(lengths=True)
pca_comps_df_index=pca_components_df2.copy()
pca_comps_df_index['orig_index'] = orig_index 

#merge 
pca_components_df_merge= df_toPCA_coords.merge(pca_comps_df_index, how='left', on='orig_index', shuffle="tasks") 

pca_components_df_merge.repartition(npartitions=npart)

#save the merge df to parquet files
pca_components_df_merge.to_parquet('/merge_files', engine='pyarrow')

The code shows an error when I try to save to Parquet files:
ValueError: Length of values (1784153) does not match length of index (1769167)

I checked that pca_comps_df_index also shows this error when I tried some commands. For instance, I tried to verify the number of rows by running:
print(pca_comps_df_index.index.pipe(len))
and it showed the same error:
ValueError: Length of values (1770994) does not match length of index (1769167)

I think there is a problem in pca_comps_df_index. I tried to create it in different ways and I always have the same error.

Thanks in advance for your help.

Okay, so you have a problem when trying to build pca_comps_df_index, not in the following part.

Why are you doing that, if your DataFrames have the same number of rows and partitions? How are you sure they are partitioned the same, with the same number of rows per partition?
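For instance, you could compare the per-partition row counts of both frames with something like:

# Per-partition row counts; they must line up for a positional column assignment to work.
print(df_toPCA.map_partitions(len).compute())
print(pca_components_df2.map_partitions(len).compute())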

Hello @guillaumeeb, I am doing that to be sure they have the same number of partitions. They have the same number of rows, but I don’t know how to make sure they have the same number of rows per partition. I checked the first 3 partitions of each DataFrame, and they don’t match.

subset_df = df_toPCA.partitions[[0,1,2]]
subset_df_lengths = subset_df.map_partitions(len).compute()
[########################################] | 100% Completed | 10.39 s
print (subset_df_lengths)
0 1784153
1 1784014
2 1784413

subset_pca_df = pca_components_df2.partitions[[0,1,2]]
subset_pca_df_lengths = subset_pca_df.map_partitions(len).compute()
[########################################] | 100% Completed | 8.55 s
print (subset_pca_df_lengths)
0 1769167
1 1769020
2 1769411

If you have any idea how I should do that, please, let me know.
Thanks in advance.

How do you build these two Dataframes?

I built df_toPCA from the 6 bands of a 10560x10560-pixel image on 2 different days. Each column holds the band values of the image on one of the days.
I read the TIFF files to create a DataFrame and then calculated the number of partitions as below, which gave 111514.
df_toPCA.shape[1]/1000

How is the other DataFrame or Array built?

I am sorry for the delayed answer, but I am having some other issues with Dask; I think there is a problem when it is executed in interactive mode in the VS Code terminal.

I build df_toPCA_coords from df_toPCA:

cols_coords= df_toPCA.columns[0:3]

df_toPCA_coords = df_toPCA[cols_coords].copy()

I do this before dropping NaNs from df_toPCA.

BR

Then your two arrays won’t have the same number of values?

Yes, df_toPCA_coords doesn’t have the same number of rows as df_toPCA and pca_components_df2.

In the merge, I want the rows that exist in df_toPCA_coords but don’t exist in pca_components_df2 to be filled in with NaN.

The problem is probably not in the merge, but when doing this:

pca_comps_df_index['orig_index'] = orig_index
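orig_index is a Dask array that keeps the chunk sizes of df_toPCA, while pca_comps_df_index has the partition sizes of pca_components_df2, so each per-partition assignment receives a column of the wrong length. A possible alternative, assuming both DataFrames still carry the same index values and have known divisions (which I can't verify from here), is to align them first and assign a Series instead of an array:

# Sketch only: align pca_components_df2 on df_toPCA's divisions, then let Dask
# align the Series on the index instead of assigning a raw array.
pca_components_df2 = pca_components_df2.repartition(divisions=df_toPCA.divisions)
pca_comps_df_index = pca_components_df2.copy()
pca_comps_df_index['orig_index'] = df_toPCA['orig_index']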

Yes, I agree! But when I do it with small Dask DataFrames, it works.

Could you provide a reproducer with small dfs?

I am going to try, but the problems only happen with large dfs.

Doesn’t fully matter, it is more for understanding your use case.
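Something along these lines would already be enough. This is just a sketch of the pattern I would expect to reproduce the error, with made-up data and hypothetical names; the exact behaviour may depend on your Dask version:

import pandas as pd
import dask.dataframe as dd

# Two frames with the same number of partitions but different per-partition
# lengths, mimicking df_toPCA (more rows) and pca_components_df2 (fewer rows).
big = dd.from_pandas(pd.DataFrame({'orig_index': range(10)}), npartitions=3)
small = dd.from_pandas(pd.DataFrame({'c1': range(8)}), npartitions=3)

# The failing pattern: a Dask array whose chunks follow big's partition sizes,
# assigned onto a frame partitioned like small.
orig_index = big['orig_index'].to_dask_array(lengths=True)
small['orig_index'] = orig_index

# Expected to fail at compute time with a
# "Length of values (...) does not match length of index (...)" ValueError.
small.compute()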