Parallelizing pipeline across threads

Hi!

So, I have a pipeline that processes input files and creates output files. Currently it runs on h5 files: it iterates through the keys and, for each key, runs a list of modules. The order is:

Key: → FileInput → Aggregating → Filter → Another Filter → Conversion of Data → Another Conversion → FileOutput

I implemented Dask with multiple partitions on the dataframes and it works, but to improve further I want to start multiple pipelines on multiple threads. I came across ThreadPool in Python and its reimplementation as Dask Futures.

For the h5 files, a file contains around 667 keys. The machine I am working on has 48 cores, 96 threads and 1 TB of RAM. I want to create a thread pool, iterate through the keys and start a thread for each key. To "limit" the pool, I want to run X threads at a time, wait for one to finish and then submit the next one. When all keys are converted, the pipeline should finish and the main thread should exit too.
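
Roughly, what I have in mind is the bounded-pool pattern below (just a sketch with the standard library; process_key is a placeholder wrapping my existing process_file):

import copy
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd

def process_key(file, config, key):
    # each key gets its own config copy so threads don't overwrite each other
    cfg = copy.deepcopy(config)
    for module in cfg["modules"]:
        if "args" in module and "h5_key" in module["args"]:
            module["args"]["h5_key"] = key
    process_file(file, cfg)

with pd.HDFStore(file, "r") as h5store:
    keys = h5store.keys()

# max_workers bounds how many keys run at once; the executor queues the rest
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(process_key, file, config, key) for key in keys]
    for fut in as_completed(futures):
        fut.result()  # re-raise any exception from a worker thread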

My questions are:

  • Since the last module is FileOutput and writes the processed key into an h5 file, is there anything I should be aware of to close or end the running thread?
  • Do I need to handle any semaphores/locks when reading/writing to a file, or does Dask take care of it?
  • What would be the appropriate way to limit the number of threads running at the same time? I created a LocalCluster and set processes=False and n_workers=2 for testing, but that doesn't seem to work…

To provide some code, the current situation looks like this:

if file.endswith("h5"):
    with pd.HDFStore(file, "r") as h5store:
        for key in h5store.keys():  # iterate keys
            print("\ncurrently processed key: ", key, "\n")
            for module in config["modules"]:  # unimportant
                if "args" in module and "h5_key" in module["args"]:  # unimportant
                    module["args"]["h5_key"] = key  # unimportant
            file_config = (file, config)
            thread_handler(file_config)  # execute the method that should start a new thread?
            # process_file(file, config)  # previous implementation

I created the thread_handler while experimenting with multiprocessing pools, which only passed a single argument to the target method. The thread_handler looks like this:

def thread_handler(file_config):
    print("starting new thread...")
    process_file(file_config[0], file_config[1])
    time.sleep(2)

And process_file was the previous method to create a pipeline object and start a pipeline that looks like this:

def process_file(file, config):
    print(f"Processing files: {file}, Output prefix: {config['output_prefix']}, Config: {config}")
    pipeline, metadata, detector = ConversionPipeline.pipeline_from_config_dict(config)
    pipeline.process(detector, metadata) # this starts the actual pipeline

Inside that pipeline we iterate through the module list of the config and send the DF through each module for processing until it is written to file.

What would be the best spot to implement Futures, and how do I resolve them at the end?

Welcome back @MarcoPasta,

So, I’m not sure I’m following. Do you want to run tasks in parallel that are themselves running things in parallel using Dask DataFrame? If so, this is a really complex setup, and in my opinion a bad idea. In most cases, you should choose between parallelization at a low level (inside your modules) and parallelization at a higher level (on your keys).

But I’m not sure I understand it all, because if you already split by key, do you really need to use Dask DataFrame for each key? Usually, parallelizing things at a high level is enough.

Anyway, doing this with Dask at a high level should be really simple (as long as you don’t use Dask DataFrame inside each task, see above):

import dask
from dask import delayed

if file.endswith("h5"):
    with pd.HDFStore(file, "r") as h5store:
        delayed_results = []
        for key in h5store.keys():  # iterate keys
            for module in config["modules"]:  # unimportant
                if "args" in module and "h5_key" in module["args"]:  # unimportant
                    module["args"]["h5_key"] = key  # unimportant
            file_config = (file, config)
            delayed_results.append(delayed(thread_handler)(file_config))
        dask.compute(*delayed_results)  # computation actually starts here

With this kind of code, Dask will run one task per available thread, and launch another task as soon as one finishes, until there are none left. See
https://examples.dask.org/applications/embarrassingly-parallel.html

You can actually do this with Futures too, but I find it cleaner to use Delayed here.

One caveat: I don’t know if you can write to a single HDF5 file from multiple threads, that might be a problem. Whether you write to the file using Pandas or directly through h5py, I don’t know what will happen.

processes=False and n_workers=2 are somewhat contradictory, the second one meaning you want two separate processes. If you want only threads, you can use LocalCluster(n_workers=1, threads_per_worker=96). But actually, if you just create a LocalCluster without any arguments, Dask will fit it to your machine’s available cores (with some workers and some threads per worker, such that n_workers * threads_per_worker equals the number of cores on your server).
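
For instance, a minimal sketch of the threads-only setup (adjust the thread count to what your pipeline actually needs):

from dask.distributed import Client, LocalCluster

# A single worker process running many threads; all tasks share memory
cluster = LocalCluster(n_workers=1, threads_per_worker=96)
client = Client(cluster)

# Or just Client() / LocalCluster() with no arguments to let Dask pick a
# processes/threads mix that matches the machine.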

Hi @guillaumeeb and thanks for answering!

Yes, I’m saddened to say that the requirements kind of switched around… Dask was implemented into each module to make streaming possible and to handle memory better (which worked and was a success). But now I’ve been asked whether I can parallelize processing multiple keys at the same time - this is why I came up with this.

So for instance, I run the FileInput module and read a .h5 file with Dask read_hdf, split into chunks of 10,000 rows. Then, when sent through each module, it is processed as anywhere from 2 to 15 partitions. But still, only one key is loaded at a time.

If I understand correctly, if I want to process the keys in parallel, I have to remove Dask from every module and work with pandas again?

Also, the next question that arises: at the moment I don’t load a complete file but rather load one key of the file into the pipeline, process it and then write that key into a new file.

def _load_data_h5(self, input_file: str):
    df = dd.read_hdf(input_file, key=self.h5_key, mode="r", chunksize=10_000)
    return df[self.columns]

If I want to process multiple keys at once, could I potentially keep loading the file/key at the moment a “new” thread is started, or would it be better to read the full file first, then pick a key and process it? In the second case, I don’t see a way to pick a key from the file, put the processed key back into the same structure and write the full file all at once at the end. :confused:

Edit: testing around a little in a notebook reminded me that if I load a full file with all keys, it has 667 partitions (which is exactly the number of keys the file has). I also created a client with 1 worker and 48 threads. I ran map_partitions over the 667 partitions, just putting a 0 in every row of every int column, and looking at the dashboard it seemed that multiple partitions were processed in parallel. :thinking: Couldn’t this potentially solve my “problem”?

And while we’re at it: how exactly do delayed and map_partitions differ?

Question is: do you really need to split your input data for only one HDF5 key?

It would be a lot simpler to do so. Actually, you could use two levels of parallelisation with Dask, but this is a tricky thing to do.

Yes you can, and it’s the correct thing to do. Read part of a file inside your tasks, do not read the whole file first.

Yes, you can do that too! This would probably be the cleanest way, or at least the most logical. Use map_partitions on the complete input dataset, and apply your modules to each partition.

map_partitions is preferred if you have a DataFrame-like input. This means you use a high-level Dask API, DataFrame, to process your dataset. It’s also referred to as the collection API.

Using Delayed on each partition (reading your input file by key, one Delayed per key) is basically the same as calling map_partitions on a DataFrame. You would choose one or the other based on how you have to read the input data, and what you want to achieve next.
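
To make the comparison concrete, a rough sketch of both options (run_modules stands in for your per-key processing and is hypothetical):

import dask
import dask.dataframe as dd
import pandas as pd

# Option A: one Delayed per key, reading the data inside the task
@dask.delayed
def process_key(file, key):
    pdf = pd.read_hdf(file, key=key)  # read only this key inside the task
    return run_modules(pdf)           # hypothetical per-key pipeline

# results = dask.compute(*[process_key(file, k) for k in keys])

# Option B: one partition per key, processed through the collection API
# ddf = dd.read_hdf(file, key="/*")
# out = ddf.map_partitions(run_modules)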

Hi @guillaumeeb and thanks for answering!

map_partitions is preferred if you have a DataFrame-like input. This means you use a high-level Dask API, DataFrame, to process your dataset. It’s also referred to as the collection API.

Using Delayed on each partition (reading your input file by key, one Delayed per key) is basically the same as calling map_partitions on a DataFrame. You would choose one or the other based on how you have to read the input data, and what you want to achieve next.

Yes, the input file is an h5 collection of dataframes, so I guess going directly for map_partitions will solve some of my problems. I have one question here since it’s still super confusing to me:

map_partitions() takes a meta keyword that’s supposed to describe the output, right? So if I map over multiple DF partitions, add some columns, change some values and return the partition, the output is still a DF, right? So would df = df.map_partitions(func, meta=df) do the trick? Or do I have to provide some kind of hint about what the output is going to look like? I had the same thing before, where I had to create a list of all the columns the DF would have after processing and pass that to the meta keyword.

The next question: since I’m using map_partitions, I want to create a Client or a LocalCluster to control the number of workers/threads. I’ve set up a LocalCluster with 5 threads to keep an overview, and I want to watch them on the dashboard just like I can when working in a Jupyter notebook. How can I start the client when running the software from the terminal? And would it be better practice to use them with a with context?

Edit: I think I managed to build something that works; I was able to process many keys in parallel, but then it errored out:

2023-02-16 16:40:51,233 - distributed.worker - WARNING - Compute Failed
Key:       ('to-hdf-b5ba9246ae0f11ed90f5fbcbaa91114d', 0)
Function:  _pd_to_hdf
args:      (<function NDFrame.to_hdf at 0x7f09a2ce74c0>, <SerializableLock: 57a313c4-5fbd-425e-81d8-14e06546b03e>, [None, '/home/user/Development/testdata/h5/1673963564_e95ed118-44c4-4a14-9379-de31dd8570a4_head_5000Primaries_0.0deg.hits_out.h5', '/000'], {'format': 'table', 'mode': 'a', 'append': False})
kwargs:    {}
Exception: "TypeError('cannot create a storer if the object is not existing nor a value are passed')"

Could this mean a key failed during processing, or did it try to store every key before every key existed?

The code behind looks like this:

for file in args.input_files:
    config = base_config.copy()
    config["input_files"] = [file]
    config["output_prefix"] = os.path.splitext(file)[0] + "_out"
    filename = config["output_prefix"] + ".h5"

    if file.endswith("h5"):
        df = load_file(file)
        df = df.map_partitions(process_file, file, config, meta=df)
        # df.compute()
        df.to_hdf(filename, key="/*")
        print("Conversion has ended! :)")

No, if you change the columns, you’ll have to provide correct metadata. See the documentation:

An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output
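
For example, one way to build such a meta (just a sketch; the column names and dtypes below are placeholders for whatever your modules actually output):

import pandas as pd

# Empty DataFrame whose columns and dtypes match what process_file returns
meta_df = pd.DataFrame({
    "pixelX": pd.Series(dtype="int64"),
    "pixelY": pd.Series(dtype="int64"),
    "edep": pd.Series(dtype="float64"),
})

# df = df.map_partitions(process_file, file, config, meta=meta_df)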

Well, just the same as in the notebook I guess: cluster = LocalCluster(...) and then client = Client(cluster). And yes, it’s better practice to use a with context, but it’s really up to you here. Do you have trouble doing this?
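
For instance, a minimal sketch of doing it from a script (main() is a hypothetical entry point; the dashboard stays reachable at the given address while the script runs):

from dask.distributed import Client, LocalCluster

with LocalCluster(n_workers=1, threads_per_worker=5,
                  dashboard_address="127.0.0.1:8787") as cluster, \
        Client(cluster) as client:
    main()  # build the graph and call compute()/to_hdf() inside this context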

Ok, so here I’m not sure. Maybe it’s a metadata problem, or maybe one of your partitions is empty. Can you check that?

Ok, so here I’m not sure. Maybe it’s a metadata problem, or maybe one of your partitions is empty. Can you check that?

The partitions shouldn’t be empty, as this file has been worked with for quite a while now…

One thing I saw is that it tried to write /spot_000, which doesn’t exist. If I fill key with just key='*', it tries to fill it with 000. I don’t know if this is the error, but it looks like it tried to write a key that doesn’t exist.

So I thought “maybe if I send the result directly to a write function…” and ended up with this error:

2023-02-16 21:02:06,238 - distributed.worker - WARNING - Compute Failed
Key:       ('process_file-eb30905dc8010698da4e25dec8f04691', 8)
Function:  subgraph_callable-5ff7720a-56c9-410b-9da9-2469aeac
args:      ({'number': 8, 'division': 0}, ('/home/user/Development/testdata/h5/out/testfile.h5', '/spot_16', {'start': 0, 'stop': 1000000}))
kwargs:    {}
Exception: "HDF5ExtError('Problems appending the records.')"

Looking at args, it looks like spot_16 became partition 8 and was thus written into the new file as spot_8, which is not correct… It is important that the spots get written into the new file under the same names.

But since the key is known in the args here, is there any way to access it from inside my code? division was always 0. With pandas I could just call df.keys() to get the key names and select by them. Any chance of something similar here?
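
One thing I might try: to_hdf has a name_function argument that maps the partition index to the text substituted for '*', so something like the sketch below might keep the original spot names (assuming the partition order matches the key order from the store, which I’d still have to verify):

with pd.HDFStore(file, "r") as store:
    spot_names = [k.lstrip("/") for k in store.keys()]  # e.g. 'spot_16'

# name_function turns the partition index into the text that replaces '*'
df.to_hdf(filename, key="/*", name_function=lambda i: spot_names[i])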

Edit: omg yeah, that was definitely a me problem, I found the issue! But I also ran directly into the next issue. It was indeed a metadata error. After fixing it, I was able to process smaller files in parallel and write them back. But when trying the really big files, I get this error message:

  raise exceptions.TimeoutError()
  ... few async.io...
  File "H5F.c", line 620, in H5Fopen
    unable to open file
  File "H5VLcallback.c", line 3501, in H5VL_file_open
    failed to iterate over available VOL connector plugins
  File "H5PLpath.c", line 578, in H5PL__path_table_iterate
    can't iterate over plugins in plugin path '(null)'
  File "H5PLpath.c", line 620, in H5PL__path_table_iterate_process_path
    can't open directory: /usr/local/hdf5/lib/plugin
  File "H5VLcallback.c", line 3351, in H5VL__file_open
    open failed
  File "H5VLnative_file.c", line 97, in H5VL__native_file_open
    unable to open file
  File "H5Fint.c", line 1898, in H5F_open
    unable to lock the file
  File "H5FD.c", line 1625, in H5FD_lock
    driver lock request failed
  File "H5FDsec2.c", line 1002, in H5FD__sec2_lock
    unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'

End of HDF5 error back trace

Unable to open/create file '/home/user/Development/testdata/h5/testfile_out.h5'
2023-02-16 22:17:08,682 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 1902610 exit status was already read will report exitcode 255
2023-02-16 22:17:10,018 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 1902626 exit status was already read will report exitcode 255
2023-02-16 22:17:10,182 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 1902646 exit status was already read will report exitcode 255
2023-02-16 22:17:10,583 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 1902619 exit status was already read will report exitcode 255

The code to this:

def load_file(file):
    df = dd.read_hdf(file, key="/*", mode="r" , sorted_index=True)
    return df

def verification(df, spot):
    print(f"Verification of Partition: {spot}\n{df[:10]}")

def process_file(df, file, config, partition_info=True):
    partition = partition_info['number']
    print("Partition: ", partition)
    pipeline, metadata, detector = ConversionPipeline.pipeline_from_config_dict(config)
    df = pipeline.process(df, detector, metadata)
    verification(df, partition)
    return df

main: 
df = load_file(file)
print("Filename: ", filename)
df = df.map_partitions(process_file, file, config, meta=meta_df)
df.to_hdf(filename, key='/spot_*', lock=True) # lock is set but doesn't seem to work properly? 

Technically it looks like it tries to open the same file multiple times, or at least tries to write to it multiple times (the dashboard showed exactly one write task running for a long time before crashing)?

Hi @MarcoPasta, so I’m going to answer directly your last edit.

This is actually a known problem, see the corresponding GitHub issue.

I’m able to reproduce it with the MRE from the github issue.

As @martindurant says in the issue, there are a lot of problems with HDF5 when trying to write a file from multiple processes. This just doesn’t work. You can bypass this by creating a LocalCluster in threading mode only, LocalCluster(processes=False), but even then only one partition can be written at a time, so you can’t write results concurrently, which is a terrible waste of time.

The question is: do you really need to use a single HDF5 file with separate keys in it? Can’t you just write to separate files, or even better, to a more performant file format like Parquet?
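
For reference, a rough sketch of the Parquet route, reusing the variables from your snippet (requires pyarrow or fastparquet); each partition becomes its own part file, so partitions can be written concurrently:

ddf = dd.read_hdf(file, key="/spot_*", mode="r")
ddf = ddf.map_partitions(process_file, file, config, meta=meta_df)

# One part file per partition inside the output directory
ddf.to_parquet(config["output_prefix"] + "_parquet", write_index=False)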

Hi @guillaumeeb thanks for replying!

Sadly, I have no control over what format is being used. I will ask the group to take a look at Parquet, but I don’t think I’ll have enough time to move to Parquet now…

I was reading through all these GitHub issues and came across Parallel HDF5, which is said to support parallel writing (but in the context of h5py). Do you think it would make a difference? I would need to request the data for this.

I will spend my evening looking at Parquet and seeing what I can make of it; maybe it’s worth a shot. Does it support parallel writing? Otherwise I will just try to write every key into a different file, which should be possible with multiprocessing (I guess).

Also: when setting LocalCluster(processes=False) I get a segmentation fault. I don’t know why, since it happens pretty early - but what works is limiting the number of workers and threads per worker to 1:

with LocalCluster(
        # processes=False,
        n_workers=1, 
        threads_per_worker=1,
        dashboard_address="127.0.0.1:8787"
        ) as cluster, Client(cluster) as client: 

This configuration has been running for some hours now and looks pretty promising; I hope the result will be fine…

Yes, this works, I’ve tried it. You should definitely do this if you can!

I don’t think Dask can use this.

Yes, this works, I’ve tried it. You should definitely do this if you can!

So I tried this; I can write a bunch of files, but in almost every case I now end up with:

Exception: 'HDF5ExtError("Can\'t get type info on attribute values_cols in node spot_219.")'

Does this mean the input files have problems, or could this be an error inside Dask? With the old pandas method every file was readable with no problem, and when running single-threaded no problems appear either…

Can you give the complete stack trace? Is it on reading or writing?

Does this always happen on the same spot_xxx value?

Yes, I can try to give some more; I might have to sed my username out of it.

The code I have written to create a directory and store every key as a separate file is:

for file in args.input_files:
    config = base_config.copy()
    config["input_files"] = [file]
    config["output_prefix"] = os.path.splitext(file)[0] + "_out"
    dirname = config["output_prefix"]
    print(dirname)
    if not os.path.exists(dirname):
        os.mkdir(dirname, mode=0o775)
        print("Directory created")
    else:
        print("directory exists already")
    filename = dirname + "/" + dirname.split("/")[-1] + "_*" + ".h5"
    print(filename)

    if file.endswith("h5"):
        df = dd.read_hdf(file, key="/spot_*", mode="r", lock=True)
        df = df.map_partitions(process_file, file, config, meta=meta_df)
        # df.compute()
        df.to_hdf(filename, key='/spot', lock=True)
        print("Conversion has ended! :)")

This works, and the errors that come from it are either that the description of a key is not available during processing, or a file read error - that’s why I tried with a lock.

I will just post some of the errors that come up when I run it on the same or multiple files:
File 4

2023-02-18 15:13:37,849 - distributed.worker - WARNING - Compute Failed
Key:       ('process_file-916bb242b51036a0bb8d5f7de12a1016', 22)
Function:  subgraph_callable-83082622-75d7-4e01-89ca-cb15b359
args:      ({'number': 22, 'division': None},         PDGEncoding  trackID  parentID   eventID        posX       posY        posZ  ...  volumeID[6]  volumeID[7]  volumeID[8]  volumeID[9]  spotX  spotY  spotAngle
0              2212        1         0  13930000  -69.668015 -69.873405  225.354004  ...           -1           -1           -1           -1  -63.0  -56.0        4.0
1              2212        1         0  13930000  -70.072998 -70.521439  283.153992  ...           -1           -1           -1           -1  -63.0  -56.0        4.0
2              2212        1         0  13930000  -70.421776 -70.895531  326.093994  ...           -1           -1           -1           -1  -63.0  -56.0        4.0
3              2212        1         0  13930000  -70.451416 -70.970680  331.593994  ...           -1           -1           -1           -1  -63.0  -56.0        4.0
4              2212        1         0  13930000  -70.470627 -71.098946  337.093994  ...           -1           -1           -1        
kwargs:    {}
Exception: 'HDF5ExtError("Can\'t get type info on attribute VERSION in node spot_119.")'
####################################
2023-02-18 15:20:37,979 - distributed.worker - WARNING - Compute Failed
Key:       ('process_file-916bb242b51036a0bb8d5f7de12a1016', 231)
Function:  subgraph_callable-ca4eeeec-5bd4-4ae5-8b68-f57158bd
args:      ({'number': 231, 'division': None},         PDGEncoding  trackID  parentID   eventID       posX       posY        posZ      edep  ...  volumeID[5]  volumeID[6]  volumeID[7]  volumeID[8]  volumeID[9]  spotX  spotY  spotAngle
0              2212        1         0  14870000 -14.048635 -28.422003  225.354004  0.021541  ...           -1           -1           -1           -1           -1   -7.0  -28.0        4.0
1              2212        1         0  14870000 -15.563166 -27.757998  283.153992  0.039218  ...           -1           -1           -1           -1           -1   -7.0  -28.0        4.0
2              2212        1         0  14870000 -16.733650 -27.231239  326.093994  0.017914  ...           -1           -1           -1           -1           -1   -7.0  -28.0        4.0
3              2212        1         0  14870000 -16.934532 -27.147579  331.593994  0.015610  ...           -1           -1           -1           -1           -1   -7.0  -28.0        4.0
4              2212     
kwargs:    {}
Exception: 'HDF5ExtError("Can\'t get type info on attribute encoding in node spot_188.")'
###################################
2023-02-18 15:33:09,023 - distributed.worker - WARNING - Compute Failed
Key:       ('process_file-916bb242b51036a0bb8d5f7de12a1016', 22)
Function:  subgraph_callable-2ffa6e60-313c-44cb-9082-0a574c63
args:      ({'number': 22, 'division': None},         PDGEncoding  trackID  parentID   eventID        posX       posY        posZ  ...  volumeID[6]  volumeID[7]  volumeID[8]  volumeID[9]  spotX  spotY  spotAngle
0              2212        1         0  13930000  -69.668015 -69.873405  225.354004  ...           -1           -1           -1           -1  -63.0  -56.0        4.0
1              2212        1         0  13930000  -70.072998 -70.521439  283.153992  ...           -1           -1           -1           -1  -63.0  -56.0        4.0
2              2212        1         0  13930000  -70.421776 -70.895531  326.093994  ...           -1           -1           -1           -1  -63.0  -56.0        4.0
3              2212        1         0  13930000  -70.451416 -70.970680  331.593994  ...           -1           -1           -1           -1  -63.0  -56.0        4.0
4              2212        1         0  13930000  -70.470627 -71.098946  337.093994  ...           -1           -1           -1        
kwargs:    {}
Exception: "SystemError('Negative size passed to PyUnicode_New')"

File 0

Module FileInputModule finished in 0.011126518249511719 seconds.
2023-02-18 16:01:25,705 - distributed.worker - WARNING - Compute Failed
Key:       ('to-hdf-964d578eaf9c11ed90f5fbcbaa91114d', 130)
Function:  _pd_to_hdf
args:      (<function NDFrame.to_hdf at 0x7fc914c884c0>, <SerializableLock: 6c2d72ef-8b40-49f5-9192-1c2aecd12b65>, [        PDGEncoding  trackID      edep       posX       posY        posZ  eventID  spotX  spotY  spotAngle  frameID  layer  stave  chip  pixelX  pixelY
0              2212        1  0.012576 -29.742273 -32.724815  225.354004  1075000  -35.0  -21.0        0.0    10750      0      8     5     524     165
1              2212        1  0.012576 -29.742273 -32.724815  225.354004  1075000  -35.0  -21.0        0.0    10750      0      8     5     523     165
2              2212        1  0.012576 -29.742273 -32.724815  225.354004  1075000  -35.0  -21.0        0.0    10750      0      8     5     524     164
3              2212        1  0.020221 -29.186249 -35.309849  283.153992  1075000  -35.0  -21.0        0.0    10750      1      8     5     544     262
4              2212        1  0.020221 -29.186249 -35.309849  283.153992  1075000  -35.0  -21.0        0.0    10750      1      8     5
kwargs:    {}
Exception: 'HDF5ExtError("Unable to open/create file \'/home/user/Development/testdata/h5/1673963564_e95ed118-44c4-4a14-9379-de31dd8570a4_head_5000Primaries_0.0deg.hits_out/1673963564_e95ed118-44c4-4a14-9379-de31dd8570a4_head_5000Primaries_0.0deg.hits_out_130.h5\'")'
######################################
2023-02-18 16:11:42,331 - distributed.worker - WARNING - Compute Failed
Key:       ('to-hdf-049bdd54af9e11ed90f5fbcbaa91114d', 130)
Function:  _pd_to_hdf
args:      (<function NDFrame.to_hdf at 0x7f44ebdf74c0>, <SerializableLock: 02e2b37e-d5d8-46ef-8ce4-4b436e397bc1>, [        PDGEncoding  trackID      edep       posX       posY        posZ  eventID  spotX  spotY  spotAngle  frameID  layer  stave  chip  pixelX  pixelY
0              2212        1  0.012576 -29.742273 -32.724815  225.354004  1075000  -35.0  -21.0        0.0    10750      0      8     5     525     165
1              2212        1  0.012576 -29.742273 -32.724815  225.354004  1075000  -35.0  -21.0        0.0    10750      0      8     5     524     165
2              2212        1  0.012576 -29.742273 -32.724815  225.354004  1075000  -35.0  -21.0        0.0    10750      0      8     5     524     164
3              2212        1  0.020221 -29.186249 -35.309849  283.153992  1075000  -35.0  -21.0        0.0    10750      1      8     5     544     262
4              2212        1  0.020221 -29.186249 -35.309849  283.153992  1075000  -35.0  -21.0        0.0    10750      1      8     5
kwargs:    {}
Exception: 'HDF5ExtError("Unable to open/create file \'/home/user/Development/testdata/h5/1673963564_e95ed118-44c4-4a14-9379-de31dd8570a4_head_5000Primaries_0.0deg.hits_out/1673963564_e95ed118-44c4-4a14-9379-de31dd8570a4_head_5000Primaries_0.0deg.hits_out_130.h5\'")'
####################### here i received the same error 2 times
######################################################
2023-02-18 16:17:53,604 - distributed.worker - WARNING - Compute Failed
Key:       ('process_file-74ace8235f76b9079d75f8339893a9a1', 218)
Function:  subgraph_callable-5b1d492b-d33c-448a-b6ba-d6219441
args:      ({'number': 218, 'division': None},         PDGEncoding  trackID  parentID  eventID       posX       posY        posZ      edep  ...  volumeID[5]  volumeID[6]  volumeID[7]  volumeID[8]  volumeID[9]  spotX  spotY  spotAngle
0              2212        1         0  1475000 -11.079943  72.374397  225.354004  0.032425  ...           -1           -1           -1           -1           -1  -14.0   56.0        0.0
1              2212        1         0  1475000 -10.962158  75.002434  283.153992  0.019318  ...           -1           -1           -1           -1           -1  -14.0   56.0        0.0
2              2212        1         0  1475000 -10.956473  76.957382  326.093994  0.015567  ...           -1           -1           -1           -1           -1  -14.0   56.0        0.0
3              2212        1         0  1475000 -10.977034  77.221481  331.593994  0.040080  ...           -1           -1           -1           -1           -1  -14.0   56.0        0.0
4              2212        1 
kwargs:    {}
Exception: 'HDF5ExtError("Can\'t get type info on attribute info in node spot_260.")'
############## this occurred after i deleted the whole directory

If I understand correctly, this works for some of the files in args.input_files, but fails with others, such as files 0 or 4?

If you write in multiple files, I don’t think you need a lock.

For the error on file 4, this seems to come from some error or edge case not correctly handled by your process_file function, but it’s hard to be sure. Does this work on this file using Pandas? Do all the HDF5 keys have the same attributes?

For file 0, does it always fail with this error? This seems like something coming from the file system.

If I understand correctly, this works for some of the files in args.input_files, but fails with others, such as files 0 or 4?

If I run the pipeline in single-threaded mode, it runs on 3 keys perfectly fine with no errors. In multithreading mode I get a seg fault, and in multiprocessing mode I get the above-mentioned errors on every file I give it. It’s either an attribute error, a failed read, or a failed write of the new file…

I am just going to run on a few other files and post the results here:
File 1:

2023-02-19 19:32:17,972 - distributed.worker - WARNING - Compute Failed
Key:       ('read-hdf-595d2274e68fdd163c75f3ea0efc49b4', 235)
Function:  subgraph_callable-55c7d301-d7a5-4d8d-8316-bc4714ff
args:      (('/home/user/Development/testdata/h5/1673963564_e95ed118-44c4-4a14-9379-de31dd8570a4_head_5000Primaries_1.0deg.hits.h5', '/spot_31', {'start': 0, 'stop': 1000000}))
kwargs:    {}
Exception: "HDF5ExtError('Problems reading records.')"

Partition:  229
Module HitBinningModule finished in 6.719513177871704 seconds.
Module ColumnFilterModule finished in 0.004641294479370117 seconds.
Module EqualEventsFramingModule finished in 0.03695988655090332 seconds.
Module ClusterLookupDiffusionModule finished in 5.807046890258789 seconds.
Validation of Partition: 107
   PDGEncoding  trackID      edep       posX      posY        posZ  eventID  spotX  spotY  spotAngle  frameID  layer  stave  chip  pixelX  pixelY
0         2212        1  0.013265 -48.237778 -4.229612  225.354004  4310000  -42.0    0.0        1.0    43100      0      6     6     921     125
1         2212        1  0.013265 -48.237778 -4.229612  225.354004  4310000  -42.0    0.0        1.0    43100      0      6     6     922     124
2         2212        1  0.013265 -48.237778 -4.229612  225.354004  4310000  -42.0    0.0        1.0    43100      0      6     6     921     124
3         2212        1  0.020734 -49.709217 -5.243570  283.153992  4310000  -42.0    0.0        1.0    43100      1      6     6     871     163
4         2212        1  0.020734 -49.709217 -5.243570  283.153992  4310000  -42.0    0.0        1.0    43100      1      6     6     870     163
5         2212        1  0.020734 -49.709217 -5.243570  283.153992  4310000  -42.0    0.0        1.0    43100      1      6     6     871     162
6         2212        1  0.020734 -49.709217 -5.243570  283.153992  4310000  -42.0    0.0        1.0    43100      1      6     6     870     162
7         2212        1  0.262019 -50.549171 -6.098475  326.093994  4310000  -42.0    0.0        1.0    43100      2      6     6     842     196
8         2212        1  0.262019 -50.549171 -6.098475  326.093994  4310000  -42.0    0.0        1.0    43100      2      6     6     841     196
9         2212        1  0.262019 -50.549171 -6.098475  326.093994  4310000  -42.0    0.0        1.0    43100      2      6     6     843     195
Module ClusterLookupDiffusionModule finished in 3.9661855697631836 seconds.
Validation of Partition: 111
   PDGEncoding  trackID      edep       posX       posY        posZ  eventID  spotX  spotY  spotAngle  frameID  layer  stave  chip  pixelX  pixelY
0         2212        1  0.034800 -49.073887  34.947762  225.354004  4330000  -42.0   28.0        1.0    43300      0      3     6     893     249
1         2212        1  0.034800 -49.073887  34.947762  225.354004  4330000  -42.0   28.0        1.0    43300      0      3     6     892     249
2         2212        1  0.034800 -49.073887  34.947762  225.354004  4330000  -42.0   28.0        1.0    43300      0      3     6     893     248
3         2212        1  0.034800 -49.073887  34.947762  225.354004  4330000  -42.0   28.0        1.0    43300      0      3     6     892     248
4         2212        1  0.034800 -49.073887  34.947762  225.354004  4330000  -42.0   28.0        1.0    43300      0      3     6     892     247
5         2212        1  0.020472 -49.861042  35.898552  283.153992  4330000  -42.0   28.0        1.0    43300      1      3     6     866     284
6         2212        1  0.020472 -49.861042  35.898552  283.153992  4330000  -42.0   28.0        1.0    43300      1      3     6     865     284
7         2212        1  0.020472 -49.861042  35.898552  283.153992  4330000  -42.0   28.0        1.0    43300      1      3     6     866     283
8         2212        1  0.020472 -49.861042  35.898552  283.153992  4330000  -42.0   28.0        1.0    43300      1      3     6     865     283
9         2212        1  0.017056 -50.461452  36.701717  326.093994  4330000  -42.0   28.0        1.0    43300      2      3     6     845     313
free(): invalid pointer
Module FileInputModule finished in 0.007690906524658203 seconds.
Module ClusterLookupDiffusionModule finished in 8.865557432174683 seconds.
2023-02-19 19:32:18,770 - distributed.worker - ERROR - Exception during execution of task ('process_file-6c91066084de4209527949aaeb70e464', 491).
Traceback (most recent call last):
  File "/home/user/.conda/envs/pctenv/lib/python3.8/site-packages/distributed/worker.py", line 2367, in _prepare_args_for_execution
    data[k] = self.data[k]
  File "/home/user/.conda/envs/pctenv/lib/python3.8/site-packages/distributed/spill.py", line 245, in __getitem__
    return super().__getitem__(key)
  File "/home/user/.conda/envs/pctenv/lib/python3.8/site-packages/zict/buffer.py", line 108, in __getitem__
    raise KeyError(key)
KeyError: "('read-hdf-595d2274e68fdd163c75f3ea0efc49b4', 491)"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/.conda/envs/pctenv/lib/python3.8/site-packages/distributed/worker.py", line 2248, in execute
    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)
  File "/home/user/.conda/envs/pctenv/lib/python3.8/site-packages/distributed/worker.py", line 2371, in _prepare_args_for_execution
    data[k] = Actor(type(self.state.actors[k]), self.address, k, self)
KeyError: "('read-hdf-595d2274e68fdd163c75f3ea0efc49b4', 491)"
#######################################################
2023-02-19 19:50:38,416 - distributed.worker - WARNING - Compute Failed
Key:       ('process_file-6c91066084de4209527949aaeb70e464', 243)
Function:  subgraph_callable-3adf5c1b-296b-40b8-9bfd-c86ce270
args:      ({'number': 243, 'division': None},         PDGEncoding  trackID  parentID  eventID      posX       posY        posZ      edep  ...  volumeID[5]  volumeID[6]  volumeID[7]  volumeID[8]  volumeID[9]  spotX  spotY  spotAngle
0              2212        1         0  4920000 -4.954599  66.264595  225.354004  0.021983  ...           -1           -1           -1           -1           -1   -7.0   49.0        1.0
1              2212        1         0  4920000 -5.475255  68.772438  283.153992  0.097121  ...           -1           -1           -1           -1           -1   -7.0   49.0        1.0
2              2212        1         0  4920000 -5.766844  70.767242  326.093994  0.027968  ...           -1           -1           -1           -1           -1   -7.0   49.0        1.0
3              2212        1         0  4920000 -5.814410  71.029160  331.593994  0.020837  ...           -1           -1           -1           -1           -1   -7.0   49.0        1.0
4              2212        1      
kwargs:    {}
Exception: 'HDF5ExtError("Can\'t get type info on attribute pandas_version in node spot_23.")'

File 2:

2023-02-19 19:43:26,256 - distributed.worker - WARNING - Compute Failed
Key:       ('read-hdf-00365c902083a9475e908d7785e0d6ca', 283)
Function:  subgraph_callable-e30ccb30-359b-469d-84af-4e6c948c
args:      (('/home/user/Development/testdata/h5/1673963564_e95ed118-44c4-4a14-9379-de31dd8570a4_head_5000Primaries_2.0deg.hits.h5', '/spot_353', {'start': 0, 'stop': 1000000}))
kwargs:    {}
Exception: "HDF5ExtError('Problems reading records.')"

File 3

2023-02-19 20:02:43,989 - distributed.worker - WARNING - Compute Failed
Key:       ('to-hdf-52a75bc4b08711ed90f5fbcbaa91114d', 123)
Function:  _pd_to_hdf
args:      (<function NDFrame.to_hdf at 0x7f68db6aa4c0>, <SerializableLock: 7f247b30-7d50-4e09-9969-e167add3dc22>, [        PDGEncoding  trackID      edep       posX       posY        posZ   eventID  spotX  spotY  spotAngle  frameID  layer  stave  chip  pixelX  pixelY
0              2212        1  0.036161 -40.947140 -77.479813  225.354004  11050000  -35.0  -63.0        3.0   110500      0     11     5     142     210
1              2212        1  0.036161 -40.947140 -77.479813  225.354004  11050000  -35.0  -63.0        3.0   110500      0     11     5     141     210
2              2212        1  0.036161 -40.947140 -77.479813  225.354004  11050000  -35.0  -63.0        3.0   110500      0     11     5     142     209
3              2212        1  0.036161 -40.947140 -77.479813  225.354004  11050000  -35.0  -63.0        3.0   110500      0     11     5     141     209
4              2212        1  0.036161 -40.947140 -77.479813  225.354004  11050000  -35.0  -63.0        3.0   110500      0     11
kwargs:    {}
Exception: 'AttributeError("\'UnImplemented\' object has no attribute \'description\'")'