Hi!
So I have a pipeline that processes input files and creates output files. Currently it runs on h5 files: it starts a pipeline that iterates through the keys and a list of modules and runs them. The order is:
Key: → FileInput → Aggregating → Filter → Another Filter → Conversion of Data → Another Conversion → FileOutput
I implemented Dask with multiple partitions on the dataframes and it works, but to improve further I want to run multiple pipelines on multiple threads. I came across ThreadPool in Python and its reimplementation as Dask Futures.
Each h5 file contains around 667 keys. The machine I am working on has 48 cores, 96 threads and 1 TB of RAM. I want to create a thread pool, iterate through the keys and start a thread for each key. To "limit" the pool, I want to run X threads at a time, wait for one to finish and then submit the next one. When all keys are converted, the pipeline should finish and the main thread should exit too.
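Roughly the pattern I have in mind with a plain thread pool (untested sketch; process_key is just a placeholder name for "run the whole module chain for one key", and 8 is an arbitrary limit):

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_key(file, config, key):
    ...  # build and run the pipeline for this single key

with ThreadPoolExecutor(max_workers=8) as pool:  # at most 8 tasks run concurrently
    futures = [pool.submit(process_key, file, config, key) for key in keys]
    for fut in as_completed(futures):  # the pool queues the rest and starts them as slots free up
        fut.result()  # re-raises any exception from the worker thread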
My questions are:
- Since the last module is FileOutput and writes the processed key into an h5 file, is there anything I should be aware of to close or end the running thread cleanly?
- Do I need to handle any semaphores/locks when reading from or writing to a file, or does Dask take care of that? (See the lock sketch right after this list.)
- What would be the appropriate way to limit the number of threads running at the same time? I created a LocalCluster with processes=False and n_workers=2 for testing, but that doesn't seem to work…
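My current understanding for the second question (please correct me) is that writes to a shared HDF5 file are not coordinated for me, so I would guard the FileOutput step myself with something like this (sketch; write_output just stands in for whatever FileOutput does internally):

import threading

h5_write_lock = threading.Lock()  # one lock shared by all worker threads

def write_output(df, out_path, key):
    # pd.HDFStore / HDF5 is not safe for concurrent writes to the same file,
    # so only one thread is allowed to write at a time
    with h5_write_lock:
        df.to_hdf(out_path, key=key, mode="a")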
To provide some code, the current situation looks like this:
if file.endswith("h5"):
    with pd.HDFStore(file, "r") as h5store:
        for key in h5store.keys():  # iterate keys
            print("\ncurrently processed key: ", key, "\n")
            for module in config["modules"]:  # unimportant
                if "args" in module and "h5_key" in module["args"]:  # unimportant
                    module["args"]["h5_key"] = key  # unimportant
            file_config = (file, config)
            thread_handler(file_config)  # execute the method that should start a new thread?
            # process_file(file, config)  # previous implementation
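My current idea for the Dask Futures variant is to turn that loop into something like the following (untested sketch; I deep-copy config per key because the loop above mutates the shared dict, which I assume would race between threads):

import copy
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(processes=False, n_workers=1, threads_per_worker=8)
client = Client(cluster)

futures = []
if file.endswith("h5"):
    with pd.HDFStore(file, "r") as h5store:
        keys = list(h5store.keys())  # read the key list, then close the store again

    for key in keys:
        cfg = copy.deepcopy(config)  # each task gets its own config
        for module in cfg["modules"]:
            if "args" in module and "h5_key" in module["args"]:
                module["args"]["h5_key"] = key
        futures.append(client.submit(process_file, file, cfg))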
I created thread_handler while experimenting with multiprocessing, where the pool methods I tried only accept functions with a single argument. The thread_handler looks like this:
def thread_handler(file_config):
    print("starting new thread...")
    process_file(file_config[0], file_config[1])
    time.sleep(2)
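Side note: if I go with an executor or a Dask client, I probably don't need the tuple wrapper at all, since as far as I can tell submit passes extra positional arguments through:

future = pool.submit(process_file, file, config)    # concurrent.futures
future = client.submit(process_file, file, config)  # dask.distributed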
And process_file is the previous method that creates a pipeline object and starts the pipeline:
def process_file(file, config):
    print(f"Processing files: {file}, Output prefix: {config['output_prefix']}, Config: {config}")
    pipeline, metadata, detector = ConversionPipeline.pipeline_from_config_dict(config)
    pipeline.process(detector, metadata)  # this starts the actual pipeline
Inside that pipeline we iterate through the module list from the config and pass the DataFrame through each module until it is written to file.
What would be the best spot to implement the futures, and how do I resolve them at the end?
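For the resolving part, my guess is something along these lines, continuing from the futures list in the sketch above (client and cluster as created there):

from dask.distributed import as_completed

results = client.gather(futures)  # blocks until all keys are done, re-raises worker exceptions here

# ...or alternatively react to keys as they finish:
# for fut in as_completed(futures):
#     fut.result()

client.close()
cluster.close()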