How to handle conflicting tasks

Hello Everyone:

I currently have a LocalCluster of 4 workers running, and there are 2 Clients which both submit a job to the cluster, where each job does a Dask DataFrame .to_parquet (each job contains about 2x tasks, which are distributed among the 4 workers).

The problem I am having is that the 2 clients’ jobs are interfering with each other because they are writing to the same Parquet partition. Would it be possible to isolate the jobs (e.g. run each job as a unit of work, in sequence)?

Any advice would be appreciated.

Hi @bill, welcome to this forum!

Would it be possible to have an MVCE, or at least a code snippet showing what you’re doing?

I’m not sure exactly how you manage to have 2 separate clients submitting jobs concurrently on a LocalCluster?

Moreover, is it possible to write into an already created partition in Parquet format? I thought the partition needed to be completely overwritten (but I’m probably not up to date here)?

Hi @guillaumeeb,

Thanks for your reply. Here are some snippets; assume they are run in sequential order.

When Client 2 runs, it returns the error "The process cannot access the file because it is being used by another process" on one of the partitions.

Dask Cluster:

LocalCluster(host='localhost', scheduler_port=1234, n_workers=4)

Client 1:

Client('localhost:1234')
df = dd.read_sql_table(<my table>, ...)
df.to_parquet(output_dir, engine='pyarrow', overwrite=True)  # takes about 30 min to run and contains ~100 tasks within Dask

Client 2:

Client('localhost:1234')
df = dd.read_sql_table(<my table>, ...)
df.to_parquet(output_dir, engine='pyarrow', overwrite=True)  # writes to the same location as Client 1 because the same request is called twice by the upstream

Thanks!

I’m probably still missing something, but why don’t you create only one Client?

In any case, if you want to overwrite some partitions of the Parquet file, you’ll have to wait for the first to_parquet call to finish, won’t you?
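For illustration, something like this minimal sketch could work with a single client (I’m making up the table name, connection string and output path, so adapt them to your case):

import dask.dataframe as dd
from dask.distributed import Client

# One client for both jobs: to_parquet blocks in the client process,
# so the second write only starts once the first has fully finished.
client = Client('localhost:1234')

df1 = dd.read_sql_table('my_table', 'sqlite:///example.db', index_col='id')  # placeholder arguments
df1.to_parquet('output_dir', engine='pyarrow', overwrite=True)  # blocks until done

df2 = dd.read_sql_table('my_table', 'sqlite:///example.db', index_col='id')
df2.to_parquet('output_dir', engine='pyarrow', overwrite=True)  # safe: the previous write has completed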

Client 1 and Client 2 are actually invoked by an upstream process over which I have little control. I can implement some checking to ensure only one client is running at any given point in time, but I was wondering if it’s something that can be done in Dask automatically.

I don’t see a good solution on the Dask side for this.

Currently, I imagine that your two clients are running in separate processes, or at least threads, so you have parallelism or concurrency between the two computations.

df.to_parquet is blocking, so in a serial context, you shouldn’t be able to submit two of these workflows at the same time.

In any case, if you don’t want two computations of that kind to be executed at the same time, you’ll have to wait for the first one to finish. I guess this can be done either in pure Python code, being careful to use some locks or another mechanism so that both writes to the same file are not submitted at the same time, or you can try to use Dask client methods to inspect the Scheduler state and wait for any pending task to finish…
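As a rough sketch of the lock idea, you could use Dask’s own distributed Lock so that whichever script starts second blocks until the other one has finished writing (the lock name, table name and connection string below are made-up placeholders):

import dask.dataframe as dd
from dask.distributed import Client, Lock

# Each upstream script wraps its write in the same named distributed Lock.
# Since to_parquet blocks in the client process, the lock is held until the
# write has completely finished, so the two writes cannot overlap.
client = Client('localhost:1234')
df = dd.read_sql_table('my_table', 'sqlite:///example.db', index_col='id')  # placeholder arguments

with Lock('parquet-output-dir-lock'):  # use the same lock name in both client scripts
    df.to_parquet('output_dir', engine='pyarrow', overwrite=True)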

Neither of these is really clean or satisfactory; it would be better to actually wait for one df.to_parquet call to finish before calling the next one if you want them to be sequential.

OK, that makes sense. Thanks!