I’m working with large txt files that I have to process in order to store them efficiently. The files are about 100 GB and, after some initial rows, follow a repeating pattern: a timestamp, 16 rows of data (tab-separated, with a comma as the decimal separator, about 10k columns), and a blank row. The 16 rows are the readings from the 16 channels at the timestamp above them, so the first column is mixed, containing both timestamps and data.
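To give an idea of the layout, each block looks roughly like this (the values are made up and only a few of the ~10k tab-separated columns are shown):

12.03.2024 10.15.30.123456
0,0012	0,0034	0,0056	...
0,0023	0,0045	0,0067	...
(14 more channel rows)
(blank row)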
The purpose of the script is to convert this file into one or more Parquet files with two additional columns, timestamp and channel, so that both are present in every row and the data can be easily filtered afterwards. I’ve tried to exploit Dask’s parallel capabilities to make the process as fast as possible, while keeping the number of copies down to keep RAM usage under control.
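The output I’m aiming for would look something like this (again with made-up values; wl_1, wl_2, ... stand for the ~10k wavelength columns):

timestamp                    channel  wl_1    wl_2    ...
2024-03-12 10:15:30.123456   1        0.0012  0.0034  ...
2024-03-12 10:15:30.123456   2        0.0023  0.0045  ...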
I’ve come up with the unfinished solution below, which uses process_partition_with_buffer to mitigate the gaps between different file chunks and processes each partition with pandas, but this approach misses some rows (probably the ones that get overwritten at the beginning of each partition). I’d really appreciate suggestions about the best way to proceed before continuing with this method; I’ve also looked at rolling.apply(), Futures and Delayed, and I’m having a hard time figuring out the best approach.
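Just to check my understanding of the before= buffer, this is the kind of toy test I ran with map_overlap on a small made-up DataFrame (not the real data):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': range(10)})
ddf = dd.from_pandas(pdf, npartitions=2)

def peek(part):
    # The second partition should arrive with the last 3 rows of the
    # first partition prepended, so it sees 5 + 3 = 8 rows here.
    print(len(part))
    return part

ddf.map_overlap(peek, before=3, after=0, meta=pdf.head(0)).compute()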
As a beginner in Python and Dask, I’d greatly appreciate any advice.
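For completeness, these are the imports the code needs and placeholder values for the module-level constants it refers to (the real regex, skip count and wavelength parameters depend on the instrument, so treat them as examples):

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

# Placeholder values; the real ones depend on the instrument / file header
REGEX_TIMESTAMP = r'\d{2}\.\d{2}\.\d{4} \d{2}\.\d{2}\.\d{2}\.\d+'  # e.g. 12.03.2024 10.15.30.123456
ROWS_PER_TIMESTAMP = 16       # 16 channel rows follow each timestamp
# One channel id per row of a block (the timestamp row gets a dummy 0 and is dropped later)
CHANNEL_RANGE = np.arange(0, ROWS_PER_TIMESTAMP + 1, dtype=np.uint8)
SKIP_ROWS = 10                # non-repeating header rows at the top of the file
wavelength_start = 1500.0     # first wavelength column
wavelength_delta = 0.1        # spacing between wavelength columns
num_points = 10_000           # roughly 10k columns per row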
def process_partition_with_buffer(df, partition_info=None, wl_cols=None) -> pd.DataFrame:
    df = df.reset_index(drop=True)
    first_col = df[wl_cols[0]]
    # Identify rows that contain timestamps
    timestamp_mask = first_col.str.contains(REGEX_TIMESTAMP, regex=True)
    # Get the index of the first and last timestamp
    first_timestamp_index = timestamp_mask.idxmax()
    last_timestamp_index = timestamp_mask[::-1].idxmax()
    # Get the timestamps
    timestamps = first_col.loc[timestamp_mask]
    # Simple way to tell if there are 17 rows after the last timestamp:
    # try to access the 17th row after the last timestamp
    try:
        if pd.isna(df.iloc[last_timestamp_index + 17, 0]):
            print("End of file reached.")
        # The last block is complete, so include it
        ch_filler = np.tile(CHANNEL_RANGE, len(timestamps))
        last_timestamp_index += 17
        timestamp_filler = pd.to_datetime(
            np.repeat(timestamps.values, ROWS_PER_TIMESTAMP + 1),
            format='%d.%m.%Y %H.%M.%S.%f')
    except IndexError:
        # The last block is incomplete: fill only up to the last complete one
        ch_filler = np.tile(CHANNEL_RANGE, len(timestamps) - 1)
        timestamp_filler = pd.to_datetime(
            np.repeat(timestamps[:-1].values, ROWS_PER_TIMESTAMP + 1),
            format='%d.%m.%Y %H.%M.%S.%f')
    df['timestamp'] = None
    df['channel'] = None
    # Assign the timestamps and channels
    df.iloc[first_timestamp_index:last_timestamp_index, -2] = timestamp_filler
    df.iloc[first_timestamp_index:last_timestamp_index, -1] = ch_filler
    # Delete the timestamp rows
    df = df.drop(df[timestamp_mask].index)
    # Replace the decimal comma in the first column and convert it to float
    df[wl_cols[0]] = first_col.str.replace(',', '.')
    df[wl_cols[0]] = df[wl_cols[0]].astype('float32')
    return df
def process_file(input_file, output_file):
    client = Client()
    print(client)
    # Generate wavelength column names
    wavelengths = np.arange(wavelength_start,
                            wavelength_start + num_points * wavelength_delta,
                            wavelength_delta)
    wl_cols = [f"{wl:.3f}" for wl in wavelengths]
    # Initialize the schema with the first column as string
    schema = {wl_cols[0]: str}
    schema.update({col: np.float32 for col in wl_cols[1:]})  # Rest as float32
    # Read the file with Dask
    ddf = dd.read_csv(
        input_file,
        sep='\t',
        skiprows=SKIP_ROWS,
        header=None,
        names=wl_cols,
        index_col=False,
        skip_blank_lines=True,
        compression=None,
        dtype=schema,
        decimal=',',
        blocksize='512MB',
        engine='c',
        dtype_backend='pyarrow'
    )
    # Generate the meta_df information for map_overlap
    meta = {col: np.float32(0.0) for col in wl_cols}
    meta.update({
        'timestamp': pd.Timestamp('2000-01-01 00:00:00.000'),
        'channel': np.uint8(1),
    })
    meta_df = pd.DataFrame([meta])
    # Process the data
    ddf = ddf.map_overlap(process_partition_with_buffer, before=17, after=0,
                          wl_cols=wl_cols, meta=meta_df)
    # Write the data to Parquet
    ddf.to_parquet(
        output_file,
        engine='pyarrow'
    )
    # Close the client
    client.close()
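In case it matters, I call it like this (the paths are placeholders); the __main__ guard seems to be necessary because Client() starts worker processes:

if __name__ == '__main__':
    process_file('readings.txt', 'readings_parquet')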