Unpacking .snappy.parquet File

I am trying to read a snappy.parquet file using Dask and later convert it to a Dask array that I will use to train my machine learning model. One of the columns is of object type (it is supposed to be an image that needs to be unpacked) and all the rest are float64. Here is the Dask DataFrame I am referring to.

import dask.dataframe as dd

train_path = ["/somefile.snappy.parquet"]
data = dd.read_parquet(train_path, engine="fastparquet", compression = "snappy")

I am having trouble converting this object column into a Dask array. When I executed data.compute(), the floats showed up but the objects in the X_jets column did not; the column even came back as None. I have been stuck on this problem for a day already. Any help would be appreciated. Thanks!

There is a PyTorch implementation that preprocesses the data with a self.transform call inside a class, but I don't want to just copy and paste it; I want to learn something new. There, the Parquet file is opened with pyarrow.parquet, as in this snippet:

import pyarrow.parquet as pq

data1 = pq.ParquetFile("/somefile.snappy.parquet")
df = data1.read_row_group(0).to_pydict()

This only reads row group 0. I understand that I need a for loop to read all the entries, but I wanted to know how dask.dataframe.read_parquet can help with this.

Hi @rsohaljr_14, welcome to the Dask community!

By any chance, would you be able to share one of your files so we can test the behavior?

Two things I would try:

  • Is there any reason why you are using fastparquet engine instead of pyarrow?
  • Can you read correctly part of the file using just Pandas?

Hi @guillaumeeb! Thank you for the warm welcome and response. I tried the pyarrow engine before, and when it came to unpacking I ran into an OOM (out of memory) error. The snippet that I used was

# Process the data in chunks
for chunk in data.to_delayed():
    chunk.compute()

The file is quite large, so I have attached a Google Drive link for it (QCDToGGQQ_IMGjet_RH1all_jet0_run1_n47540.test.snappy.parquet on Google Drive). In the PyTorch implementation, colleagues simply used pyarrow.parquet to read the X_jets column (which is supposed to be a 125x125 image stored as an array), as can be seen in the code below.

class ParquetDataset(Dataset):
    def __init__(self, filename, transform=None):
        self.parquet = pq.ParquetFile(filename)
        self.cols = None 
        self.transform = transform
    def __getitem__(self, index):
        data = self.parquet.read_row_group(index, columns=self.cols).to_pydict()
        # Preprocessing
        start = time.time()
        data['X_jets'] = np.float32(data['X_jets'])
        data['X_jets'] = torch.Tensor(data['X_jets'])
        data['y'] = torch.Tensor(data['y'])
        
        #to preprocess
        data['X_jets'][data['X_jets'] < 1.e-3] = 0. # Zero-Suppression
        data['X_jets'][-1,...] = 25.*data['X_jets'][-1,...] # For HCAL: to match pixel intensity distn of other layers
        data['X_jets'] = data['X_jets']/100. # To standardize

I just wanted to see whether I can use Dask here as well to lazily load the dataset and avoid creating data loaders. Thank you so much for your help.

Edit: I believe X_jets contains three sets of data per row, each an image of size 125x125. I think the names are Tracks, ECal, and HCal, in that order. The scaling by 25 in the code above is meant for HCal. The schema looks like:
X_jets: list<item: list<item: list<item: double>>>
  child 0, item: list<item: list<item: double>>
      child 0, item: list<item: double>
          child 0, item: double

Could you give us a more complete code snippet of what you are trying to achieve? The code above will just load all the chunks in main process memory one by one, and so probably overwhelm your machine if the file is huge. Do you really want to load all the data in memory at the same time?

Sure. I want to load only some chunks into memory at a time, not all the data in one go. After unpacking, I expect to arrive at a tensor of shape [3,125,125], where the 3 means that I have three NumPy arrays, for instance Tracks, ECal, and HCal. What I gave above is what I have done so far. Here is the full PyTorch implementation that I got from the GitHub repository:

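import time

import numpy as np
import pyarrow.parquet as pq
import torch
from torch.utils.data import Dataset

class ParquetDataset(Dataset):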
    def __init__(self, filename, transform=None):
        self.parquet = pq.ParquetFile(filename)
        self.cols = None 
        self.transform = transform
    def __getitem__(self, index):
        data = self.parquet.read_row_group(index, columns=self.cols).to_pydict()
        # Preprocessing
        start = time.time()
        data['X_jets'] = np.float32(data['X_jets'])
        data['X_jets'] = torch.Tensor(data['X_jets'])
        data['y'] = torch.Tensor(data['y'])
        
        #to preprocess
        data['X_jets'][data['X_jets'] < 1.e-3] = 0. # Zero-Suppression
        data['X_jets'][-1,...] = 25.*data['X_jets'][-1,...] # For HCAL: to match pixel intensity distn of other layers
        data['X_jets'] = data['X_jets']/100. # To standardize
        
        if self.transform:
            data['X_jets']= self.transform(data['X_jets'][0]) 
        end = time.time()
        return dict(data)
    def __len__(self):
        return self.parquet.num_row_groups


import torchvision.transforms as transforms
from torch.utils.data import ConcatDataset
from torch.utils.data.sampler import SubsetRandomSampler
def get_data_loader(datasets, batch_size, cut, random_sampler=False):
    transform = torch.nn.Sequential(transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]))
  
    dset = ConcatDataset([ParquetDataset(dataset, transform=transform) for dataset in datasets])
    idxs = np.random.permutation(len(dset))
    if random_sampler: 
        random_sampler = SubsetRandomSampler(idxs[:cut])
    else: 
        random_sampler = None 
    validation_split = .2
    shuffle_dataset = True
    random_seed= 42

    # Creating data indices for training and validation splits:
    dataset_size = 55494
    indices = list(range(dataset_size))
    split = int(np.floor(validation_split * dataset_size))
    print(split)
    if shuffle_dataset :
        np.random.seed(random_seed)
        np.random.shuffle(indices)
    train_indices, val_indices = indices[split:], indices[:split]

    # Creating PT data samplers and loaders:
    train_sampler = SubsetRandomSampler(train_indices)
    valid_sampler = SubsetRandomSampler(val_indices)
    print("Train indices ",train_sampler)
    print("Test indices ", valid_sampler)
    train_loader = torch.utils.data.DataLoader(dset, batch_size=batch_size, 
                                           sampler=train_sampler)
    test_loader = torch.utils.data.DataLoader(dset, batch_size=batch_size,
                                                sampler=valid_sampler)
    
    return train_loader, test_loader

dataset_train = ["/filename.snappy.parquet"]
train_loader, test_loader= get_data_loader(dataset_train, 32, cut = None, random_sampler = True)
for i, batch in enumerate(train_loader):
    X_jets, pt, m0, y = batch["X_jets"], batch["pt"], batch["m0"], batch["y"]

For this specific case, I just wanted to load the dataset that I linked above.

What you want to achieve is still unclear to me. Do you want to iterate over the rows of the Parquet file to train your model? Does PyTorch or your machine learning library accept Dask Arrays as input? Or do you want to preprocess the data? What do you want to do after you’ve loaded the data?

dd.read_parquet gives you a lazy Dask DataFrame collection. You can then apply computations to it in a distributed or streaming way using the DataFrame API, or with plain Pandas through map_partitions. You can also convert part of it to a Dask Array, which is also a lazy collection, and apply a computation to it chunk by chunk.

Hi @guillaumeeb. I wanted to preprocess the data first, so I did

from dask_ml.preprocessing import StandardScaler #Scale to zero mean and unit variance

train_path = ["/filename.train.snappy.parquet"]
data = dd.read_parquet(train_path, engine="pyarrow", compression = "snappy")
X = data["X_jets"].astype("float32")
y = data["y"]
def xjets_preprocess(image_data):
    image_data[image_data < 1e-3] = 0.
    image_data[-1,...] = 25.*image_data[-1,...]
    image_data = image_data/100. #Standardize
    data_image = StandardScaler().fit_transform(image_data)
    return data_image

I also learned recently that Dask lets you apply a transformation function to each partition. So, what I did next is

X_images = data.map_partitions(xjets_preprocess, meta=X)

Now, after preprocessing, when I call compute on this, I get a pyarrow memory error. When I added split_row_groups = 10 to increase the number of partitions, the computation took forever. I also tried X_images.to_dask_array(lengths = True), which also took forever. How can I convert it to a Dask array so that the data is loaded lazily and computations are applied chunk by chunk after preprocessing? I hope my problem makes more sense now.

I want to implement a ResNet model in TensorFlow and feed chunks of data into it. While TensorFlow does not directly support a Dask DataFrame as input, once I convert the Series I am working on to a Dask array, I believe I can do something like

def dask_generator(X_images, y, batch_size):
    instances = X_images.shape[0]
    batches = instances // batch_size 
    while True:
        for i in range(batches):
            X_batch = X_images[i*batch_size:(i+1)*batch_size]
            y_batch = y[i*batch_size:(i+1)*batch_size]
            yield X_batch, y_batch
            
batch_size = 32
train = dask_generator(X_images, y, batch_size)
...
resnet.fit(train, steps_per_epoch = X_images.shape[0]//batch_size, shuffle = True, epochs = 10)

and it should work.

EDIT: Below is the full code for what I am trying to accomplish.

def xjets_preprocess(xjets):
    xjets = np.float32(xjets.values)
    xjets = tf.constant(xjets)
    xjets[xjets < 1e-3] = 0.
    xjets[-1, ...] = 25. * xjets[-1, ...]
    xjets = xjets / 100.  # Standardize
    data_image = StandardScaler().fit_transform(xjets)
    return data_image

def target(target_data):
    target_data = tf.constant(target_data)
    return target_data

import dask.dataframe as dd
train_path = ["/filename.train.snappy.parquet"]
data = dd.read_parquet(train_path, engine="pyarrow", compression = "snappy", columns = ["X_jets", "y"], 
                       split_row_groups = 12) #Create multiple partitions so that it contains 12 rows per partition
def preprocess_chunk(chunk):
    X_images = chunk["X_jets"].map_partitions(xjets_preprocess, meta=(data["X_jets"], "f8"))
    y = chunk["y"].map_partitions(target, meta=(data["y"], "f8"))
    return X_images, y

#Define some model
model = model
for i in range(0, num_samples, chunk_size):
    chunk = data.loc[i : i + chunk_size]
    X_images, y = preprocess_chunk(chunk)
    X_images = X_images.compute()
    y = y.compute()
    model.train_on_batch(X_images, y, epochs = 1)

And then again, I got an error: ValueError: setting an array element with a sequence. I believe the thought process here is correct, right?

If you call compute on the whole data set, then it will try to convert the Dask DataFrame into a Pandas one on the main process memory. So you probably don’t want to do that.

I’m not sure you can easily make a generator from a Dask collection, it’s not really meant for that.

But you might be interested in how the dask-ml Incremental Learning is implemented.

About your example, I would probably try to do something like:

def xjets_preprocess(xjets):
    "returns a numpy array from a Series"
    ...

def target(target_data):
    "returns a numpy array from a Series"
    ...

data = dd.read_parquet(...)
# Makes a Dask Array from a Dask Series
X = data["X_jets"].map_partitions(xjets_preprocess, meta=(data["X_jets"], "f8"))
# Makes a Dask Array from a Dask Series
y = data["y"].map_partitions(target, meta=(data["y"], "f8"))

model = model
for i in range(0, num_samples, chunk_size):
    X_chunk = X[i : i + chunk_size].compute()
    y_chunk = y[i : i + chunk_size].compute()
    model.train_on_batch(X_chunk, y_chunk)
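
The two helper bodies above are left as `...`. As a minimal sketch of what `xjets_preprocess` could do, assuming each row of X_jets is a nested sequence of shape (3, H, W) and that the last channel is HCal (the order Tracks, ECal, HCal is taken from the question, not verified), the function below densifies the nested rows and applies the three preprocessing steps from the PyTorch snippet. The helper name `to_dense` is hypothetical. Nested object rows like these are also one plausible source of the ValueError reported earlier, since NumPy cannot cast an object array of sequences to float32 in one step.

```python
import numpy as np


def to_dense(nested):
    """Recursively turn nested lists/arrays into one float32 ndarray."""
    if len(nested) > 0 and isinstance(nested[0], (list, tuple, np.ndarray)):
        return np.stack([to_dense(item) for item in nested])
    return np.asarray(nested, dtype=np.float32)


def xjets_preprocess(xjets):
    """Return an (n, channels, H, W) float32 array from a sequence of nested rows."""
    images = np.stack([to_dense(row) for row in xjets])
    images[images < 1e-3] = 0.0   # zero-suppression
    images[:, -1, ...] *= 25.0    # assumed HCal channel (the last one)
    return images / 100.0         # overall scaling used in the thread


# Two tiny 3x2x2 stand-ins for the real 3x125x125 images.
rows = [
    [[[0.0005, 1.0], [2.0, 3.0]], [[4.0, 5.0], [6.0, 7.0]], [[1.0, 0.0], [0.0, 2.0]]],
    [[[1.0, 1.0], [1.0, 1.0]], [[2.0, 2.0], [2.0, 2.0]], [[4.0, 4.0], [4.0, 4.0]]],
]
batch = xjets_preprocess(rows)
print(batch.shape, batch.dtype)
```

Because the function only iterates over its input, it should accept a Pandas Series of nested rows as well as a plain list, which is what a partition-wise call would hand it.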

For the conversion of a Series to a NumPy array, I can use .to_dask_array(), right? I think this is allowed because, in the pseudocode you gave, it eventually executes .compute() in the loop. My question remains: will the memory be cleared after each batch is computed? In the worst case, the allocations pile up over time and end up equivalent to calling .compute() on the whole dataset. I am thinking of using .head() instead.

Yes, even with a DataFrame.

Well, I might have said something unclear earlier. If you don’t keep the object anywhere in your code, Python should be able to garbage collect it once it needs memory. If I understand your use case correctly, you have to convert Dask collections to non-Dask ones like Pandas or NumPy.
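
A small, Dask-free sketch of that point: as long as each computed chunk is bound only to a loop variable, rebinding the variable on the next iteration drops the last reference to the previous chunk, and the interpreter can free it. `Chunk` and `compute_chunk` are hypothetical stand-ins for the result of `.compute()`, and the immediate release shown here relies on CPython reference counting.

```python
import gc
import weakref


class Chunk:
    """Hypothetical stand-in for one computed chunk (e.g. a NumPy array)."""

    def __init__(self, index):
        self.index = index


def compute_chunk(index):
    """Stand-in for `X[i : i + chunk_size].compute()`."""
    return Chunk(index)


refs = []
for i in range(3):
    chunk = compute_chunk(i)          # rebinding releases the previous chunk
    refs.append(weakref.ref(chunk))   # weak references do not keep chunks alive

gc.collect()
alive = [r() is not None for r in refs]
print(alive)
```

Only the last chunk is still alive, because `chunk` still points to it. Appending each chunk to a list instead would keep every chunk in memory, which is the accumulation the question worries about.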

Hi @guillaumeeb. I fixed the issue with converting it to a NumPy array, although it took a while. After that, I ran into another problem during training. I have the following modified code:

X = data["X_jets"].map_partitions(transform_partition, meta = np.array([])).to_delayed()
y = data["y"].map_partitions(target_partition, meta = np.array([])).to_delayed()

#Split the dataset into training and validation set using train_test_split from dask-ml
X_train, X_val, y_train, y_val = train_test_split(X, y, train_size = 3, test_size = 1, 
                                                  random_state = 42, shuffle = True)

def dask_generator(X_iter, y_iter):
    for delayed_X, delayed_y in zip(X_iter, y_iter):
        X = delayed_X.compute()
        y = delayed_y.compute()
        yield X, y

train_loader = dask_generator(X_train, y_train)
val_loader = dask_generator(X_val, y_val)

model.fit(train_loader, steps_per_epoch = len(X_train),  validation_data = val_loader, validation_steps = len(X_val),
          verbose=1, epochs=25, callbacks = callbacks)

And the error is:

Epoch 1/25
3/3 [==============================] - 12s 4s/step - loss: 0.7539 - accuracy: 0.4792 - val_loss: 0.7354 - val_accuracy: 0.4375 - lr: 0.0010
Epoch 2/25
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 75 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 1 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Learning rate reduction is conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
WARNING:tensorflow:Early stopping conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
3/3 [==============================] - 1s 226ms/step - loss: 0.7539 - accuracy: 0.4792 - lr: 0.0010

I am just using a fraction of the whole dataset to test whether everything works. Here, I want all the data fed in epoch 1 to be fed again in epoch 2. However, TensorFlow reports that the generator has run out of data, so it seems it cannot repeatedly produce the dataset for each epoch. I also cannot use len(X_train)//batch_size here, because it would defeat the purpose of feeding the whole dataset into each epoch; that is, it would work, but only until epoch 3. Is there a fix for this?
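
One possible fix, sketched under assumptions: a Python generator is exhausted after a single pass, so Keras runs out of data at epoch 2. Wrapping the pass in `while True` lets the generator restart over the partitions for every epoch, which is what the warning asks for. `FakeDelayed` below is a hypothetical stand-in for a Dask Delayed object (anything with a `.compute()` method), so the sketch runs without Dask or TensorFlow.

```python
from itertools import islice


class FakeDelayed:
    """Hypothetical stand-in for a Dask Delayed partition."""

    def __init__(self, value):
        self.value = value

    def compute(self):
        return self.value


def repeating_generator(X_parts, y_parts):
    """Yield (X, y) per partition, restarting after each full pass."""
    while True:  # never exhausts; steps_per_epoch bounds each epoch
        for delayed_X, delayed_y in zip(X_parts, y_parts):
            yield delayed_X.compute(), delayed_y.compute()


X_parts = [FakeDelayed(f"X{i}") for i in range(3)]
y_parts = [FakeDelayed(f"y{i}") for i in range(3)]

# Draw 7 batches from 3 partitions: the generator wraps around.
batches = list(islice(repeating_generator(X_parts, y_parts), 7))
print(batches)
```

With real partitions, the same function would presumably be passed to model.fit with steps_per_epoch = len(X_train) and validation_steps = len(X_val), as in the call above. Each epoch then recomputes the partitions rather than caching them, which keeps memory low at the cost of reading the data again.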