Using futures and iterrows - optimal?

Hi, I am not sure whether the following setup for using Dask is correct, or whether I should be using futures.

I build my Dask cluster like so and import my data:

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import pandas as pd

n_workers = 8
print(f"Creating cluster with {n_workers} workers")
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1)
client = Client(cluster)
d = dld.csv_load(filename)  # I have to use this custom csv_load function, which creates a pandas df
ddf = dd.from_pandas(d, npartitions=1)  # npartitions=1 to test whether order is preserved when writing to parquet

I then have a custom function (which calls a whole bunch of other custom modules) that does some computation row by row:

def process_row(row):
    new_df = pd.DataFrame(row).transpose().reset_index(drop=True)
    ...  # does some computations and creates a dataframe for that row, which is probably ridiculous in this setup
    results_df = pd.DataFrame({
        **summary,
        'R': [q_dict['qz']],
        'Ch': [q_dict['chisq']]
    })
    return results_df

I then use futures as follows:

futures = []
for _, row in ddf.iterrows():
    future = client.submit(process_row, row)
    futures.append(future)

ddfg = dd.from_delayed(futures)
ddfg.to_parquet(outname, engine='pyarrow', write_index=False, compute=True)

The computation is incredibly slow and I am not sure if the order is preserved when writing to parquet.
Any thoughts/help would be greatly appreciated, thanks.

Hi @astrhug, welcome to Dask Discourse forum!

Building a Dask DataFrame and iterating over its rows on the Client side is useless in your example. But that does not explain the slowness: if processing each row takes a sufficient amount of time (e.g. at least about 1s), you should still see some speed-up.

The main parallelization can be done with two approaches:

  • Either a Dask DataFrame and something like apply or map_partitions (see the sketch after this list),
  • Or looping over a Pandas DataFrame and submitting a Future for each row, as you are doing.
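
To make the first option concrete, here is a minimal sketch, only reusing the names from your snippets (d, n_workers, process_row, outname); process_partition is a hypothetical helper that applies your process_row to every row of a pandas partition:

import pandas as pd
import dask.dataframe as dd

def process_partition(pdf):
    # Apply process_row to each row of this pandas partition and
    # concatenate the one-row result frames it returns.
    return pd.concat([process_row(row) for _, row in pdf.iterrows()],
                     ignore_index=True)

meta = process_partition(d.head(1))                  # small sample to infer the output schema
ddf = dd.from_pandas(d, npartitions=n_workers * 4)   # several partitions per worker
result = ddf.map_partitions(process_partition, meta=meta)
result.to_parquet(outname, engine='pyarrow', write_index=False)

to_parquet triggers the computation, and since from_pandas splits the data in order and each partition is processed row by row, the original row order should be preserved.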

Some questions:

  • How many rows do you have in the input DataFrame?
  • How much time does the processing of one row take?
  • Why do you say it is incredibly slow? Do you have a Pandas way of doing things that is faster?

Hi @guillaumeeb , I appreciate the quick reply!

Building a Dask DataFrame and iterating over its rows on the Client side is useless in your example.

Are you saying to avoid this line?

ddf = dd.from_pandas(d, npartitions=1)

Rows: It varies, but I have been testing with < 5k, and eventually it will be in the millions.
Time: I am not sure exactly, but it takes 40 seconds per row in the non-parallelised way.
Pandas: I have inherited somebody else’s code to parallelise/speed up.

Thank you

Yes, in your current code it does not make sense.

So it should be perfectly fine to submit a future per row.
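
For example, something like this rough sketch, reusing the client, d, process_row and outname from your snippets (not tested code, just the idea):

futures = [client.submit(process_row, row) for _, row in d.iterrows()]
results = pd.concat(client.gather(futures), ignore_index=True)  # gather returns results in submission order
results.to_parquet(outname, engine='pyarrow', index=False)

Since client.gather returns results in the same order as the futures were submitted, the row order is preserved when writing to Parquet.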

However, if the length goes above a million rows, it will result in a lot of burden on the Scheduler side.
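
One way to limit the number of tasks would be to submit one future per batch of rows instead of one per row. A rough sketch, again with the names from your post (process_batch and batch_size are hypothetical):

def process_batch(pdf):
    # Process every row of this chunk and concatenate the per-row result frames.
    return pd.concat([process_row(row) for _, row in pdf.iterrows()],
                     ignore_index=True)

batch_size = 5_000  # hypothetical: a few hundred tasks instead of millions
batches = [d.iloc[i:i + batch_size] for i in range(0, len(d), batch_size)]
futures = client.map(process_batch, batches)
results = pd.concat(client.gather(futures), ignore_index=True)
results.to_parquet(outname, engine='pyarrow', index=False)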

So is the Pandas way faster? How does the code look using Pandas?