Hi @guillaumeeb. I wanted to preprocess the data first, so I did:
import dask.dataframe as dd
from dask_ml.preprocessing import StandardScaler  # Scale to zero mean and unit variance

train_path = ["/filename.train.snappy.parquet"]
data = dd.read_parquet(train_path, engine="pyarrow", compression="snappy")
X = data["X_jets"].astype("float32")
y = data["y"]

def xjets_preprocess(image_data):
    image_data[image_data < 1e-3] = 0.
    image_data[-1, ...] = 25. * image_data[-1, ...]
    image_data = image_data / 100.  # Standardize
    data_image = StandardScaler().fit_transform(image_data)
    return data_image
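For reference, this is how I understand dask_ml's StandardScaler behaves on its own; the toy array and its shape below are invented, only the call pattern matches what I do above:

import dask.array as da
from dask_ml.preprocessing import StandardScaler

# Toy lazy array standing in for the real image data
arr = da.random.random((100, 4), chunks=(25, 4))

scaled = StandardScaler().fit_transform(arr)  # statistics are computed during fit, the transform itself stays lazy
print(scaled)                                 # still a dask array
print(scaled.mean(axis=0).compute())          # per-column means, approximately zero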
I also recently learned that with Dask you can apply a transformation function to each partition. So, what I did next was:
X_images = data.map_partitions(xjets_preprocess, meta=X)
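As I understand it, map_partitions hands the function one plain pandas object per partition. To convince myself, I tried it on a tiny made-up frame (toy and inspect are invented names, only the pattern matches my code):

import pandas as pd
import dask.dataframe as dd

# Tiny made-up frame, just to see what the partition function receives
toy = dd.from_pandas(pd.DataFrame({"X_jets": [0.5, 2.0, 3.0, 4.0]}), npartitions=2)

def inspect(part):
    # Each call gets one partition as a plain pandas.DataFrame
    print(type(part), len(part))
    return part

# meta describes the output: the same single float column as the input
toy.map_partitions(inspect, meta=pd.DataFrame({"X_jets": pd.Series(dtype="f8")})).compute()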
Now, after preprocessing, calling compute on this gave me a pyarrow memory error. When I included split_row_groups = 10 to increase the number of partitions, the computation took forever. I also tried X_images.to_dask_array(lengths = True), and that ran forever as well. How can I convert it to a Dask array so that the data is loaded lazily and the computation runs on chunks after preprocessing? I hope the problem I am facing now makes more sense. I want to implement a ResNet model in TensorFlow and feed chunks of data into it. While TensorFlow does not directly support a Dask DataFrame as input, once I convert the Series I am working with to a Dask array, I believe I can do something like:
def dask_generator(X_images, y, batch_size):
    instances = X_images.shape[0]
    batches = instances // batch_size
    while True:
        for i in range(batches):
            X_batch = X_images[i * batch_size:(i + 1) * batch_size]
            y_batch = y[i * batch_size:(i + 1) * batch_size]
            yield X_batch, y_batch
batch_size = 32
train = dask_generator(X_images, y, batch_size)
...
resnet.fit(train, steps_per_epoch=X_images.shape[0] // batch_size, shuffle=True, epochs=10)
and have it work.
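To make that idea concrete, here is the rough shape of the batching I have in mind once the data is a Dask array; everything in it (the random stand-in arrays, the per-batch compute) is just a sketch, not something that runs on my real data yet:

import dask.array as da

# Random stand-ins for the preprocessed images and labels
X_darr = da.random.random((128, 8), chunks=(32, 8))
y_darr = da.random.randint(0, 2, size=(128,), chunks=(32,))

def batch_generator(X_darr, y_darr, batch_size):
    n = X_darr.shape[0]
    while True:
        for i in range(n // batch_size):
            sl = slice(i * batch_size, (i + 1) * batch_size)
            # compute() materialises only this slice as NumPy, so only one batch lives in memory
            yield X_darr[sl].compute(), y_darr[sl].compute()

gen = batch_generator(X_darr, y_darr, batch_size=32)
X_batch, y_batch = next(gen)
print(X_batch.shape, y_batch.shape)  # (32, 8) (32,)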
EDIT: Below is what I am trying to accomplish, in full code.
import numpy as np
import tensorflow as tf
from dask_ml.preprocessing import StandardScaler

def xjets_preprocess(xjets):
    xjets = np.float32(xjets.values)
    xjets = tf.constant(xjets)
    xjets[xjets < 1e-3] = 0.
    xjets[-1, ...] = 25. * xjets[-1, ...]
    xjets = xjets / 100.  # Standardize
    data_image = StandardScaler().fit_transform(xjets)
    return data_image

def target(target_data):
    target_data = tf.constant(target_data)
    return target_data
import dask.dataframe as dd

train_path = ["/filename.train.snappy.parquet"]
data = dd.read_parquet(train_path, engine="pyarrow", compression="snappy", columns=["X_jets", "y"],
                       split_row_groups=12)  # Create multiple partitions: each one maps to at most 12 parquet row groups
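As a sanity check on the data frame defined just above, I look at the partition layout like this (nothing specific to my file, just the generic Dask idiom):

print(data.npartitions)                    # how many partitions read_parquet produced
print(data.map_partitions(len).compute())  # number of rows that actually landed in each partition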
def preprocess_chunk(chunk):
    X_images = chunk["X_jets"].map_partitions(xjets_preprocess, meta=("X_jets", "f8"))
    y = chunk["y"].map_partitions(target, meta=("y", "f8"))
    return X_images, y
# Define some model
model = model

for i in range(0, num_samples, chunk_size):
    chunk = data.loc[i : i + chunk_size]
    X_images, y = preprocess_chunk(chunk)
    X_images = X_images.compute()
    y = y.compute()
    model.train_on_batch(X_images, y)  # train_on_batch runs a single update, so it takes no epochs argument
And then again I ran into an error: ValueError: setting an array element with a sequence.
Is my thought process here correct?
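For reference, my current guess at a minimal reproduction of that error, assuming each X_jets cell stores a whole image array (the toy shapes below are invented):

import numpy as np
import pandas as pd

# Guess: each X_jets cell holds an entire array, so .values is an object array of arrays
s = pd.Series([np.random.rand(3, 4), np.random.rand(3, 4)])
print(s.values.dtype, s.values.shape)  # object (2,) -- not (2, 3, 4)

try:
    np.float32(s.values)  # casting an object array whose cells are arrays
except ValueError as err:
    print(err)  # -> setting an array element with a sequence

# Stacking the cells explicitly gives a proper numeric array instead
stacked = np.stack(s.to_list()).astype("float32")
print(stacked.shape)  # (2, 3, 4)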