Efficiently shard a Dask array and send it to workers

How can I efficiently shard a dataset, send the shards to n workers, and persist them on those workers so they can be reused for ML model training?

I am doing this:

        import zipfile

        import numpy as np
        import dask
        import dask.array as da

        train_all_paths = [dataset + 'train_' + str(i) + '_compressed.npz' for i in range(8)]
        val_all_paths = [validation + 'valid_' + str(i) + '_compressed.npz' for i in range(8)]

        def load_file(file_name):
            return np.load(file_name)


        def npz_headers(npz):
            """Takes a path to an .npz file, which is a Zip archive of .npy files.
            Generates a sequence of (name, shape, np.dtype).
            """
            with zipfile.ZipFile(npz) as archive:
                for name in archive.namelist():
                    if not name.endswith('.npy'):
                        continue

                    npy = archive.open(name)
                    version = np.lib.format.read_magic(npy)
                    shape, fortran, dtype = np.lib.format._read_array_header(npy, version)
                    yield name[:-4], shape, dtype

        def read_npz_file(npz_file):
            npz_ptr = np.load(npz_file)
            return npz_ptr['dataset_mat']

        npz_read = dask.delayed(read_npz_file)

        # Lazily read each .npz file; npz_headers supplies the shape and dtype
        # needed to build a Dask array without loading the data itself
        lazy_train_nps = [[npz_read(path), list(npz_headers(path))[0][1], list(npz_headers(path))[0][2]]
                          for path in train_all_paths]
        lazy_val_nps = [[npz_read(path), list(npz_headers(path))[0][1], list(npz_headers(path))[0][2]]
                        for path in val_all_paths]

        # Construct a small Dask array for every lazy value
        train_dataset = [da.from_delayed(delayed_arr, shape=shape, dtype=dtype)
                         for delayed_arr, shape, dtype in lazy_train_nps]
        val_arrays = [da.from_delayed(delayed_arr, shape=shape, dtype=dtype)
                      for delayed_arr, shape, dtype in lazy_val_nps]

        send_data(train_dataset)

send_data does the following for each shard index d, i.e. it calls persist targeted at a specific worker:

    self.data_mapping["data_w{0}".format(d)] = self.client.persist(
        dataset[d], workers=self.worker_id_ip_dict[d]
    )
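For context, here is a minimal, self-contained sketch of that pattern outside my class. The scheduler and worker addresses are placeholders, and random arrays stand in for the real shards:

    from dask.distributed import Client
    import dask.array as da

    client = Client("tcp://scheduler-address:8786")   # placeholder scheduler address

    # Placeholder mapping: shard index -> worker address
    worker_id_ip_dict = {
        0: "tcp://10.0.0.1:40001",
        1: "tcp://10.0.0.2:40001",
    }

    # Random arrays standing in for the per-worker shards of the dataset
    shards = [da.random.random((10_000, 128), chunks=(10_000, 128))
              for _ in worker_id_ip_dict]

    # Pin each shard to one worker and keep it in that worker's memory
    data_mapping = {
        "data_w{0}".format(d): client.persist(shards[d], workers=worker_id_ip_dict[d])
        for d in worker_id_ip_dict
    }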

On each worker (these are Dask arrays), I extract the features and labels and train the model on the local data.

But I get OSError: [Errno 24] Too many open files on all my workers, and the training doesn’t run. Am I doing anything wrong?

When I call df.compute() on a worker, that worker’s memory blows up. So I am not sure how to persist the data in memory without the memory blowing up.

I’m a bit confused about why you want the datasets persisted (held in memory). That’s a pretty unusual thing to do, since if all of the datasets fit within memory at once then you probably wouldn’t need to use Dask at all.

It’s more common to see persist used once you get to a smaller intermediate result that needs to be accessed multiple times for the remainder of the computation (see Dask Best Practices — Dask documentation).
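For example, here is a toy sketch of that more common pattern, with random data standing in for a real computation:

    import dask.array as da

    # Large lazy array; nothing is loaded into memory yet
    x = da.random.random((100_000, 1_000), chunks=(10_000, 1_000))

    # Smaller intermediate result that later steps reuse several times
    means = x.mean(axis=0).persist()   # only ~1,000 values are kept in memory

    # Subsequent computations reuse the persisted intermediate
    centred_total = (x - means).sum().compute()
    scaled_spread = (x / means).std().compute()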

I do have one suggestion, although it is unlikely to solve all of your problems.

I think you should try to use zipfile’s open as a context manager. I see archive.open in your code, but no corresponding close; if you used the context manager, this would be handled neatly for you. This might not be the full reason for the “Too many open files” errors, but it is likely contributing to the problem.

from zipfile import ZipFile

with ZipFile('spam.zip') as myzip:
    with myzip.open('eggs.txt') as myfile:
        print(myfile.read())

https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile.open
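Applied to your npz_headers, that would look roughly like this; the logic is the same as your version (including the private np.lib.format._read_array_header helper), just with both file handles managed by with blocks:

    import zipfile
    import numpy as np

    def npz_headers(npz):
        """Takes a path to an .npz file, which is a Zip archive of .npy files.
        Generates a sequence of (name, shape, np.dtype).
        """
        with zipfile.ZipFile(npz) as archive:
            for name in archive.namelist():
                if not name.endswith('.npy'):
                    continue
                # The inner context manager closes each member's file handle
                with archive.open(name) as npy:
                    version = np.lib.format.read_magic(npy)
                    shape, fortran, dtype = np.lib.format._read_array_header(npy, version)
                yield name[:-4], shape, dtype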


So I am sharding the partitions of the Dask array across, let’s say, 8 workers. I want to persist each partition in memory because I will be training an ML model on it for multiple epochs, and I don’t want to reload the data into memory every epoch (hence the need for persist).

Yes, I will try this. The “too many open files” error occurs because TensorFlow calls a getitem method to fetch a particular record for training, and that method is called many times, which causes the Dask worker to crash.

@vigneshn1997 did this help answer your question? Feel free to follow up if not!

Sorry for my delayed response.
I am still facing the issue. What would be an efficient way to shard my data across dask workers?

Let’s say I have a dataset folder with 8 files. I want to read the dataset in parallel, with each worker loading roughly the same amount of data. I want a mapping of which worker holds which data partition (or a pointer to each worker’s local data), and I want to persist the data on the workers. Then, when training an ML model locally on each worker, I do not want the worker to reload its data in every iteration; I want the worker to cache its data. Can you help me with how this can be done efficiently using Dask?

The complete dataset cannot fit in a single worker’s memory, so the data has to be sharded across workers, but I want to keep a reference to which worker holds which data.