Hey, you guys!
I've been stuck on what looks like a simple problem, but I have no idea how to do it efficiently.
I have several thousand financial data files, each of which stores the data for a single day. Each file is a binary numpy array that can be read with np.memmap
(and converted to a DataFrame). The data is organized like this:
2022-07-20.dat
Stock ID, Time, Date, x_1, x_2, …
0000001, 1000, 2022-07-20, xxx, xxx
0000001, 2000, 2022-07-20, xxx, xxx
0000002, 1000, 2022-07-20, xxx, xxx
0000002, 2000, 2022-07-20, xxx, xxx
Within each file, the rows belonging to a stock are grouped together and sorted by Time, and the stock groups themselves are sorted by Stock ID.
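For reference, this is roughly how I read a single date file today. The field names and dtypes below are placeholders (the real schema has more columns), but the np.memmap-plus-DataFrame pattern is what I actually use:

```python
import numpy as np
import pandas as pd

# Placeholder record layout -- the real field names/dtypes differ.
record_dtype = np.dtype([
    ("stock_id", "S7"),
    ("time", "i4"),
    ("date", "S10"),
    ("x_1", "f8"),
    ("x_2", "f8"),
])

def read_date_file(path):
    """Memory-map one per-date file and wrap it in a DataFrame."""
    arr = np.memmap(path, dtype=record_dtype, mode="r")
    return pd.DataFrame.from_records(arr)

df = read_date_file("2022-07-20.dat")
```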
However, I now need to store the full data in files where each file holds the data of a single stock across all dates, instead of a single date across all stocks. For example:
0000001.dat
Stock ID, Time, Date, x_1, x_2, …
0000001, 1000, 2022-07-20, xxx, xxx
0000001, 2000, 2022-07-20, xxx, xxx
0000001, 1000, 2022-07-21, xxx, xxx
0000001, 2000, 2022-07-21, xxx, xxx
Within each output file, the rows are sorted by Date and then by Time. The number of dates is also several thousand.
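To make the target concrete: each output file should hold one stock's records ordered by Date and then Time. Assuming the same placeholder dtype as above, writing one such file would look roughly like this:

```python
import numpy as np

def write_stock_file(stock_records, out_path):
    """Sort one stock's records by (date, time) and dump them to a binary file."""
    ordered = np.sort(stock_records, order=["date", "time"])
    ordered.tofile(out_path)

# e.g. write_stock_file(records_for_0000001, "0000001.dat")
```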
This conversion pattern is just like the one illustrated in the picture from this blog post:
The full dataset is too large to fit in memory. I tried generating an index of where each stock's rows are located in every date file and manually writing a multiprocessing program that combines everything with np.memmap
and that index (a rough sketch of this approach is at the end of the post), but the performance was frustrating. After turning to Dask, I found that exactly this kind of problem is mentioned in the Dask documentation:
For example, if your data is arranged by customer ID but now you want to arrange it by time, all of your partitions will have to talk to each other to exchange shards of data.
which tells us to avoid full-data shuffling. However, a full shuffle seems unavoidable for this problem. So what are the best practices here? Or how can I avoid the shuffle? Thanks!
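For reference, here is a simplified sketch of the index-based multiprocessing attempt mentioned above. The dtype, the index format, and the worker count are placeholders rather than my exact code, but the structure is the same: one task per stock, each pulling that stock's slices out of every date file via np.memmap.

```python
import numpy as np
from multiprocessing import Pool

# Same placeholder record layout as in the first snippet.
record_dtype = np.dtype([
    ("stock_id", "S7"),
    ("time", "i4"),
    ("date", "S10"),
    ("x_1", "f8"),
    ("x_2", "f8"),
])

# Precomputed in a separate pass over all date files:
# stock_id -> list of (date_file_path, row_start, row_stop).
stock_index = {
    b"0000001": [("2022-07-20.dat", 0, 2), ("2022-07-21.dat", 0, 2)],
    # ... several thousand more stocks ...
}

def build_stock_file(item):
    stock_id, slices = item
    pieces = []
    for path, start, stop in slices:
        day = np.memmap(path, dtype=record_dtype, mode="r")
        # Copy this stock's contiguous block out of the memory-mapped day file.
        pieces.append(np.array(day[start:stop]))
    combined = np.sort(np.concatenate(pieces), order=["date", "time"])
    combined.tofile(f"{stock_id.decode()}.dat")

if __name__ == "__main__":
    with Pool(processes=8) as pool:
        pool.map(build_stock_file, list(stock_index.items()))
```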