Hi, I am not sure whether the following setup for using Dask is correct, or whether I should be using futures at all.
I build my Dask cluster as follows and load my data:
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

n_workers = 8
print(f"Creating cluster with {n_workers} workers")
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1)
client = Client(cluster)

d = dld.csv_load(filename)  # I have to use this custom csv_load function, which returns a pandas DataFrame
ddf = dd.from_pandas(d, npartitions=1)  # single partition, to test whether order is preserved when writing to parquet
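As a quick sanity check on this step, something like the line below is what I have in mind to rule out from_pandas itself reordering anything (it says nothing about the parquet write later on):

pd.testing.assert_frame_equal(ddf.compute(), d)  # should pass if from_pandas keeps rows and index as-is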
I then have a custom function (which calls a number of other custom modules) that does some computation row by row:
def process_row(row):
    new_df = pd.DataFrame(row).transpose().reset_index(drop=True)
    ...  # does some computations and creates a DataFrame for that row, which is probably ridiculous in this setup
    results_df = pd.DataFrame({
        **summary,
        'R': [q_dict['qz']],
        'Ch': [q_dict['chisq']]
    })
    return results_df
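For context, the call pattern on a single pandas row is just the following (row 0 picked arbitrarily, only to show how the function is meant to be used):

single_result = process_row(d.iloc[0])  # one-row DataFrame built from the summary plus 'R' and 'Ch'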
I then use futures like this:
futures = []
for _, row in ddf.iterrows():
    future = client.submit(process_row, row)
    futures.append(future)

ddfg = dd.from_delayed(futures)
ddfg.to_parquet(outname, engine='pyarrow', write_index=False, compute=True)
The computation is incredibly slow, and I am not sure whether row order is preserved when writing to parquet.
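The check I have in mind for the ordering is roughly the sketch below; it assumes the results keep some identifying column (called 'id' here purely as a placeholder) that also exists in the input, so the written parquet can be lined up against the original pandas frame:

check = pd.read_parquet(outname, engine='pyarrow')  # read back what Dask wrote
# 'id' is a stand-in for whatever key column the real summary carries through
print(check['id'].tolist() == d['id'].tolist())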
Any thoughts/help would be greatly appreciated, thanks.