Efficient txt to parquet with column transformation

I’m working with large txt files that I have to process in order to store them efficiently. The files are about 100 GB and, after some initial rows, they follow a repeating pattern: a timestamp, 16 rows of data (tab-separated, with a comma as decimal separator, about 10k columns), and a blank row. The 16 rows are the readings from 16 channels at the timestamp above them. The first column is therefore mixed, containing both timestamps and data.
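To illustrate the structure, here’s a made-up sketch of one repeating block (values and timestamps are invented; the real files have about 10k tab-separated columns per data row):

    17.05.2023 12.30.01.123456
    0,0123    0,0456    0,0789    ...      (channel 1)
    0,0124    0,0457    0,0790    ...      (channel 2)
    ...                                    (channels 3-16)

    17.05.2023 12.30.02.123456
    ...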
The purpose of the script is to convert this file into one or more Parquet files with 2 additional columns, timestamp and channel, so that both are present in every row and the data can easily be filtered afterwards. I’ve tried to exploit Dask’s parallel capabilities to make this process as fast as possible, while keeping the number of copies down to keep RAM usage under control.
I’ve come up with this unfinished solution, which uses process_partition_with_buffer to mitigate the processing gaps between different file chunks and works on them with pandas, but this approach misses some rows (probably the ones that get overwritten at the beginning). I’d really like some suggestions about the best way to proceed before continuing with this method; I’ve also looked at the rolling.apply() function, Futures, and Delayed, and I’m having a hard time figuring out the best approach.
As I’m a beginner in Python and Dask, any advice is greatly appreciated.

import numpy as np
import pandas as pd
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client

# Constants such as REGEX_TIMESTAMP, CHANNEL_RANGE, ROWS_PER_TIMESTAMP, SKIP_ROWS
# and the wavelength parameters are defined earlier in the script (omitted here).

def process_partition_with_buffer(df, partition_info=None, wl_cols=None) -> pd.DataFrame:

    df = df.reset_index(drop=True)
    first_col = df[wl_cols[0]]

    # Identify rows that contain timestamps
    timestamp_mask = first_col.str.contains(REGEX_TIMESTAMP, regex=True)

    # Get the index of the first and last timestamp
    first_timestamp_index = timestamp_mask.idxmax()
    last_timestamp_index = timestamp_mask[::-1].idxmax()

    # Get the timestamps
    timestamps = first_col.loc[timestamp_mask]

    # Simple way to tell whether the last block is complete: try to access the 17th row after the last timestamp
    try:
        if pd.isna(df.iloc[last_timestamp_index + 17, 0]):
            print("End of file reached.")
            ch_filler = np.tile(CHANNEL_RANGE, len(timestamps))
            last_timestamp_index += 17
            timestamp_filler = pd.to_datetime(np.repeat(timestamps.values, ROWS_PER_TIMESTAMP + 1), format='%d.%m.%Y %H.%M.%S.%f')
    except IndexError:
        # The partition ends mid-block: leave the last (incomplete) timestamp out
        ch_filler = np.tile(CHANNEL_RANGE, len(timestamps) - 1)
        timestamp_filler = pd.to_datetime(np.repeat(timestamps[:-1].values, ROWS_PER_TIMESTAMP + 1), format='%d.%m.%Y %H.%M.%S.%f')

    df['timestamp'] = None
    df['channel'] = None

    # Assign the timestamps and channels
    df.iloc[first_timestamp_index:last_timestamp_index, -2] = timestamp_filler
    df.iloc[first_timestamp_index:last_timestamp_index, -1] = ch_filler
    
    # Delete the timestamp rows
    df = df.drop(df[timestamp_mask].index)
    df[wl_cols[0]] = first_col.str.replace(',', '.')

    # Convert the first wavelength column to float
    df[wl_cols[0]] = df[wl_cols[0]].astype('float32')

    return df

def process_file(input_file, output_file):

    client = Client()
    print(client)
    
    # Generate wavelength column names
    wavelengths = np.arange(wavelength_start, 
                            wavelength_start + num_points * wavelength_delta, 
                            wavelength_delta)
    wl_cols = [f"{wl:.3f}" for wl in wavelengths]

    # Initialize the schema with the first column as string
    schema = {wl_cols[0]: str}  
    schema.update({col: np.float32 for col in wl_cols[1:]})  # Rest as Float32

    # Read the file with Dask
    ddf = dd.read_csv(
        input_file, 
        sep='\t', 
        skiprows=SKIP_ROWS, 
        header=None, 
        names=wl_cols,
        index_col=False,
        skip_blank_lines=True,
        compression=None,
        dtype=schema,
        decimal=',',
        blocksize='512MB',
        engine='c',
        dtype_backend='pyarrow'
    )

    # Generate meta_df information for map_overlap
    meta = {
        col: np.float32(0.0) for col in wl_cols
    }
    meta.update({
        'timestamp': pd.Timestamp('2000-01-01 00:00:00.000'),
        'channel': np.uint8(1),
    })
    meta_df = pd.DataFrame([meta])

    # Process the data
    ddf = ddf.map_overlap(process_partition_with_buffer, before=17, after=0, wl_cols=wl_cols, meta=meta_df)

    # Write the data to Parquet
    ddf.to_parquet(
        output_file,
        engine='pyarrow'
    )

    # Close the client
    client.close()

Hi @tiarap00, welcome to Dask community!

As your data is stored in a particular format that doesn’t look like CSV at all, I would suggest trying another approach. I think the best choice would be either Delayed or Bag at first, and then converting this to a Dask DataFrame using the appropriate methods.

You should take a look at read_text; I think you could configure it with the correct line separator to have your records grouped.
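For example, something along these lines (just an untested sketch, assuming the blocks are separated by a blank line so that '\n\n' can serve as the record delimiter):

    import dask.bag as db

    # each element of the bag should then be one whole block:
    # the timestamp line followed by its 16 tab-separated data rows
    blocks = db.read_text("your_file.txt", linedelimiter="\n\n", blocksize="128MB")

From there you could map a parsing function over each block and call to_dataframe on the result.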

Maybe we could be of further help with a real example of your data, including the header rows.

Hi, thank you for the warm welcome!
I used read_csv as it allows easy value separation; I still need to grasp the concept of “bags”. I’ll try to take a better look at the function you kindly suggested.

Here’s an example of my data: the columns have no header, and the first 109 rows of the file are unformatted data (which I read separately).

Could you print that data as text?

I can’t upload the text file, but here you can find a smaller one with the same structure.

Here’s an updated link for the file structure.

I’ve tried to implement apply to filter the timestamp rows as follows. Is there any improvement I could make to reduce memory usage and/or improve speed? Currently I need the .compute() call to properly chunk my array; is there any way to get the row count of a chunk on the fly and avoid .compute()? That would also help because I could memory-map the file, extract all the timestamps with a regex, and then rechunk that array too, potentially avoiding apply and ffill.

    ddf = dd.read_csv(
        input_file, 
        sep='\t', 
        skiprows=SKIP_ROWS, 
        header=None, 
        names=wl_cols,
        index_col=False,
        skip_blank_lines=True,
        compression=None,
        dtype=schema,
        decimal=',',
        blocksize='512MB',
        engine='c',
        dtype_backend='pyarrow'
    )
    
    # Extract the timestamps from the mixed first column and forward-fill them onto the data rows
    ddf['Timestamp'] = ddf[wl_cols[0]].apply(
        lambda x: pd.to_datetime(x, format='%d.%m.%Y %H.%M.%S.%f')
        if REGEX_TIMESTAMP.match(x)
        else None,
        meta=('Timestamp', 'datetime64[ns]')
        ).ffill()

    # Blank out the timestamp rows in the first data column and convert it to float32
    ddf[wl_cols[0]] = ddf[wl_cols[0]].apply(
        lambda x: np.nan if REGEX_TIMESTAMP.match(x)
        else x.replace(',', '.'),
        meta=(wl_cols[0], 'str')
        ).astype('float32')

    # Compute the partition sizes so the channel array can be chunked to match the dataframe
    df_partition_sizes = ddf.map_partitions(len).compute()
    channel_array = np.resize(CHANNEL_RANGE, df_partition_sizes.sum())
    channel_array = da.from_array(channel_array, chunks=tuple(df_partition_sizes.values), asarray=False)
    ddf = ddf.assign(Channel=channel_array)

    ddf.to_parquet(
        output_file,
        engine='pyarrow'
    )

Sorry, the transfer has expired.

Again, I would not recommend using read_csv here.
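To expand on the Bag idea, here is a rough, untested sketch of the whole pipeline. parse_block, the file paths and the generated wl_cols names are placeholders of mine to illustrate the structure; you would still need to skip the 109 unformatted header rows and plug in your real wavelength column names:

    import dask.bag as db
    import pandas as pd

    input_file = "your_file.txt"                  # placeholder paths
    output_file = "your_output_parquet"
    wl_cols = [f"wl_{i}" for i in range(10000)]   # placeholder for your ~10k wavelength names

    def parse_block(block):
        # one block = timestamp line + 16 tab-separated data rows (comma as decimal separator)
        lines = [line for line in block.split("\n") if line.strip()]
        if not lines:
            return []
        timestamp = pd.to_datetime(lines[0], format="%d.%m.%Y %H.%M.%S.%f")
        records = []
        for channel, line in enumerate(lines[1:], start=1):
            values = [float(v.replace(",", ".")) for v in line.split("\t")]
            record = dict(zip(wl_cols, values))
            record["timestamp"] = timestamp
            record["channel"] = channel
            records.append(record)
        return records

    # one bag element per blank-line-delimited block, parsed into flat records
    blocks = db.read_text(input_file, linedelimiter="\n\n", blocksize="128MB")
    records = blocks.map(parse_block).flatten()

    # convert to a Dask DataFrame and write to Parquet
    # (passing an explicit meta to to_dataframe would avoid dtype inference on the first partition)
    ddf = records.to_dataframe()
    ddf.to_parquet(output_file, engine="pyarrow")

This avoids the mixed-type first column entirely, since each record already carries its timestamp and channel.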