How can I efficiently shard a dataset, send the shards to n workers, and persist them on those workers so they can be reused for ML model training?
I am doing this:
import zipfile

import numpy as np
import dask
import dask.array as da

# `dataset` and `validation` are base path prefixes defined elsewhere
train_all_paths = [dataset + 'train_' + str(i) + '_compressed.npz' for i in range(8)]
val_all_paths = [validation + 'valid_' + str(i) + '_compressed.npz' for i in range(8)]

def load_file(file_name):
    return np.load(file_name)

def npz_headers(npz):
    """Takes a path to an .npz file, which is a Zip archive of .npy files.
    Generates a sequence of (name, shape, np.dtype).
    """
    with zipfile.ZipFile(npz) as archive:
        for name in archive.namelist():
            if not name.endswith('.npy'):
                continue
            npy = archive.open(name)
            version = np.lib.format.read_magic(npy)
            shape, fortran, dtype = np.lib.format._read_array_header(npy, version)
            yield name[:-4], shape, dtype

def read_npz_file(npz_file):
    npz_ptr = np.load(npz_file)
    return npz_ptr['dataset_mat']

npz_read = dask.delayed(read_npz_file)

lazy_train_nps = [[npz_read(path), list(npz_headers(path))[0][1], list(npz_headers(path))[0][2]]
                  for path in train_all_paths]
lazy_val_nps = [[npz_read(path), list(npz_headers(path))[0][1], list(npz_headers(path))[0][2]]
                for path in val_all_paths]  # Lazily evaluate read_npz_file on each path

train_dataset = [da.from_delayed(lazy_da_val[0],       # Construct a small Dask array
                                 dtype=lazy_da_val[2],  # for every lazy value
                                 shape=lazy_da_val[1])
                 for lazy_da_val in lazy_train_nps]
val_arrays = [da.from_delayed(lazy_da_val[0],       # Construct a small Dask array
                              dtype=lazy_da_val[2],  # for every lazy value
                              shape=lazy_da_val[1])
              for lazy_da_val in lazy_val_nps]

send_data(train_dataset)
For each shard index d, send_data calls persist so that the shard is pinned to a specific worker:

self.data_mapping["data_w{0}".format(d)] = self.client.persist(dataset[d], workers=self.worker_id_ip_dict[d])
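For context, a minimal sketch of what the full send_data method looks like (client, worker_id_ip_dict, and data_mapping are attributes set up elsewhere in my code):

def send_data(self, dataset):
    """Pin each shard (a Dask array) to one worker and keep it in that worker's memory."""
    for d in range(len(dataset)):
        # persist() runs the shard's task graph on the cluster and keeps the
        # resulting chunks in memory; workers= restricts where they are placed.
        self.data_mapping["data_w{0}".format(d)] = self.client.persist(
            dataset[d], workers=self.worker_id_ip_dict[d]
        )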
On each worker (the persisted shards are Dask arrays), I extract the features and labels and train the model on that local data.
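A simplified sketch of the worker-side step; the feature/label split and train_model are placeholders for my actual training code:

def train_on_worker(shard):
    # shard is the Dask array persisted on this worker;
    # compute() materializes it as a local NumPy array.
    local = shard.compute()
    features, labels = local[:, :-1], local[:, -1]  # placeholder split
    train_model(features, labels)                   # placeholder training call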
But I get OSError: [Errno 24] Too many open files on all of my workers, and the training never runs. Am I doing something wrong?
Also, when I call df.compute() on a worker, the worker's memory usage blows up considerably, so I am not sure how to keep the shard persisted in memory without exhausting worker memory.