
I'm currently working on a model similar to DeepLOB, using high-frequency tick-level financial data. Due to the massive volume and the need to structure the data into time series format, it's impossible to load the entire dataset into memory at once.

To handle this, I implemented a custom data loader that reads from .npy files in batches. Here's my loader module, DataLoader_NP_Cache.py:

import numpy as np
import os
import gc

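class Np_DataLoader_Cache:
    """Serve row-range batches from a list of .npy files, keeping one file memory-mapped at a time."""

    def __init__(self, data_dir, file_list, batch_size, train_ratio=0.8):
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.file_list = file_list
        self.train_ratio = train_ratio

        # Cache for the single file that is currently memory-mapped.
        self.current_file_id = -1
        self.current_data = None

        # Read only each file's header to get its row count without loading the data.
        self.shapes = []
        for file in file_list:
            with open(os.path.join(data_dir, file), 'rb') as f:
                version = np.lib.format.read_magic(f)
                # Format 1.0 stores the header length in 2 bytes, later formats in 4 bytes.
                if version == (1, 0):
                    shape, fortran_order, dtype = np.lib.format.read_array_header_1_0(f)
                else:
                    shape, fortran_order, dtype = np.lib.format.read_array_header_2_0(f)
                self.shapes.append(shape[0])

        # prefix_sums[i] is the global index of the first row of file i.
        self.prefix_sums = [0]
        for s in self.shapes:
            self.prefix_sums.append(self.prefix_sums[-1] + s)

        # Rows [0, train_end_idx) are training data, the rest is validation data.
        self.total_samples = self.prefix_sums[-1]
        self.train_end_idx = int(self.total_samples * self.train_ratio)
        self.valid_start_idx = self.train_end_idx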

    def _load_file_if_needed(self, file_id):
        """Memory-map file `file_id`, releasing the previously mapped file first."""
        if self.current_file_id != file_id:
            if self.current_data is not None:
                del self.current_data
                gc.collect()
            filename = self.file_list[file_id]
            self.current_data = np.load(os.path.join(self.data_dir, filename), mmap_mode='r')
            self.current_file_id = file_id

    def _get_batch(self, start_row_global, end_row_global):
        """Return rows [start_row_global, end_row_global) as one array, spanning files if needed."""
        files_needed = []
        current_row = start_row_global
        file_id = 0

        while current_row < end_row_global:
            # Advance to the file that contains current_row.
            while file_id < len(self.prefix_sums) - 1 and self.prefix_sums[file_id + 1] <= current_row:
                file_id += 1

            self._load_file_if_needed(file_id)

            # Translate the global row range into indices local to this file.
            file_start_in_global = self.prefix_sums[file_id]
            file_start_in_file = current_row - file_start_in_global
            file_end_in_global = min(end_row_global, self.prefix_sums[file_id + 1])
            file_end_in_file = file_end_in_global - file_start_in_global

            slice_data = self.current_data[file_start_in_file:file_end_in_file]
            files_needed.append(slice_data)
            current_row += slice_data.shape[0]

        # Concatenating copies the memory-mapped slices into a new in-memory array.
        return np.concatenate(files_needed, axis=0)

    def get_train_batch(self, batch_index):
        start_row = batch_index * self.batch_size
        end_row = min((batch_index + 1) * self.batch_size, self.train_end_idx)
        if start_row >= end_row:
            return None
        return self._get_batch(start_row, end_row)

    def get_valid_batch(self, batch_index):
        start_row = self.valid_start_idx + batch_index * self.batch_size
        end_row = min(self.valid_start_idx + (batch_index + 1) * self.batch_size, self.total_samples)
        if start_row >= end_row:
            return None
        return self._get_batch(start_row, end_row)

    def close(self):
        if self.current_data is not None:
            del self.current_data
            self.current_data = None
            self.current_file_id = -1
            gc.collect()
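
For reference, the row-to-file lookup that `_get_batch` does with a linear scan can also be expressed with `bisect` over the prefix sums. The following is only a minimal sketch: `split_range` is a hypothetical helper that is not part of the class above, the row counts are made up, and it assumes `end_row` never exceeds the total row count (the class guarantees this by clamping in `get_train_batch` and `get_valid_batch`).

```python
from bisect import bisect_right


def split_range(prefix_sums, start_row, end_row):
    """Map a global row range [start_row, end_row) to per-file local ranges.

    prefix_sums[i] is the global index of the first row of file i and
    prefix_sums[-1] is the total row count (same layout as self.prefix_sums).
    Assumes end_row <= prefix_sums[-1].
    Returns a list of (file_id, local_start, local_end) tuples.
    """
    pieces = []
    row = start_row
    while row < end_row:
        # Last file whose first global row is <= row.
        file_id = bisect_right(prefix_sums, row) - 1
        file_start = prefix_sums[file_id]
        file_end = min(end_row, prefix_sums[file_id + 1])
        pieces.append((file_id, row - file_start, file_end - file_start))
        row = file_end
    return pieces


# Hypothetical row counts: three files with 10, 15 and 15 rows.
prefix_sums = [0, 10, 25, 40]
pieces = split_range(prefix_sums, 8, 27)
print(pieces)  # [(0, 8, 10), (1, 0, 15), (2, 0, 2)]
```

With only 20 files the linear scan is unlikely to be the bottleneck, so this mainly helps rule the lookup out as a cause.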

However, during training I found that data loading is extremely slow. The bottleneck is not the model computation but the time it takes to iterate over the dataset. To investigate, I ran some experiments to measure the batch loading speed:

import os
import time

from DataLoader_NP_Cache import Np_DataLoader_Cache

data_dir = './LOB_OFI_sortcode_NoResample/'
filelist = sorted(f for f in os.listdir(data_dir) if f.endswith('.npy'))[:20]
demo2 = Np_DataLoader_Cache(data_dir, filelist, batch_size=4096, train_ratio=0.8)
batch_nums = demo2.train_end_idx // demo2.batch_size + 1

begin_t = time.perf_counter()
very_first = begin_t
for batch_index in range(batch_nums):
    mini_batch = demo2.get_train_batch(batch_index)

    # Report the wall-clock time since the previous report (one block of 5000 batches).
    if batch_index % 5000 == 0:
        end_t = time.perf_counter()
        print(f"Batch {batch_index} done. Process ratio is {batch_index / batch_nums}")
        elapsed_time = end_t - begin_t
        print(f"These 5000 batches took: {elapsed_time:.2f} s")
        begin_t = time.perf_counter()

end_t = time.perf_counter()
elapsed_time = end_t - very_first
print(f"Total: {elapsed_time:.2f} s")
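
A finer-grained version of this measurement would time every batch individually, so that slow batches can be matched against file boundaries. The sketch below shows the idea; `time_each_batch`, `slowest` and `fake_get_batch` are hypothetical names, and the fake loader only stands in for the real one.

```python
import time


def time_each_batch(get_batch, n_batches):
    """Return a list with the wall-clock seconds spent in each get_batch(i) call."""
    durations = []
    for i in range(n_batches):
        t0 = time.perf_counter()
        get_batch(i)
        durations.append(time.perf_counter() - t0)
    return durations


def slowest(durations, k=5):
    """Return the k slowest (batch_index, seconds) pairs, slowest first."""
    ranked = sorted(enumerate(durations), key=lambda p: p[1], reverse=True)
    return ranked[:k]


# Stand-in loader for illustration only: batch 3 is artificially slow.
def fake_get_batch(i):
    if i == 3:
        time.sleep(0.05)
    return i


durations = time_each_batch(fake_get_batch, 6)
top = slowest(durations, k=1)
print(top)
```

With the real loader the call would be `time_each_batch(demo2.get_train_batch, batch_nums)`. Comparing the slow batch indices with `demo2.prefix_sums` divided by the batch size should show whether the spikes coincide with switching to a new file.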

[Image: output of the timing loop above]

I noticed that:

Some blocks load at a reasonable speed (about 15 seconds per 5000 batches).

But occasionally a block of 5000 batches suddenly takes 100+ seconds, and the speed fluctuates heavily.

These slowdowns appear random: sometimes the speed recovers, sometimes it does not. Sometimes the slowdown appears quite early and sometimes later, as in the picture I posted, but most often it shows up in the third or fourth block of 5000 batches.

I'm wondering if there's something wrong with how I've implemented the data loader or how numpy is loading data (I'm using np.load(..., mmap_mode='r') internally).
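
For context on the `mmap_mode='r'` part: `np.load` then returns a `np.memmap`, slicing it gives another memory-mapped view, and the rows are only copied into ordinary memory when something copies them, which in my loader is the `np.concatenate` call. A minimal sketch of that behaviour, using a throwaway temporary file with a made-up shape instead of my real data:

```python
import os
import tempfile

import numpy as np

# Throwaway file standing in for one of the real .npy files (shape is made up).
tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, "demo.npy")
np.save(path, np.arange(1000 * 8, dtype=np.float32).reshape(1000, 8))

mm = np.load(path, mmap_mode="r")  # maps the file; the data is not read in bulk here
view = mm[100:200]                 # still a memory-mapped view of the file
batch = np.array(view)             # copies the 100 rows into ordinary memory

print(type(mm).__name__, type(view).__name__, type(batch).__name__)
print(batch.shape)
```

So the cost of touching the disk is paid inside the batch call, not when the file is opened, and it depends on whether the operating system already has those pages cached.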

Any suggestions or advice would be greatly appreciated. Thank you!

  • Have you ever thought to give DuckDB a try? Commented Jul 11 at 11:31
  • Can you make sure that the issue is not that one of your input files is corrupted in some way? (I once had a similar issue; it was caused by one of my files containing a concatenation of all the other files due to a typo in a command, which led to a very long loading time.) Commented Jul 11 at 13:55
  • Another idea: if I'm not mistaken, your demo2.file_list is a list of 4096*880k filenames, stored twice since it is also in the filenames variable? That takes at least 4G of RAM by itself, and since you probably also keep part of your model and the data of your batch in RAM, a saturated RAM could be what freezes your Python for a couple of minutes until it figures everything out and returns the expected result, or dies. I'd make sure the RAM is not running out by watching the Resource manager stats while the code is running. Commented Jul 11 at 14:07
  • Do you have a link to an example input file so that we can run the code and see if we can reproduce the error? Commented Jul 11 at 14:10
  • Final advice, sorry for the many posts: also make sure that the batches you return have the expected size, as faulty logic somewhere in your dataloader could also cause this kind of issue. Commented Jul 11 at 14:12
