Skip to content

Async Data Loader

Overview

This document describes the implementation of a custom DataLoader for handling data retrieval using deeplake.Dataset with PyTorch. The DataLoader supports both sequential and asynchronous data fetching, with the asynchronous approach being optimized for performance and speed.

Dataset Structure

The dataset comprises pairs of images and their respective masks. Each image is a high-resolution file, while each mask is a binary image indicating the regions of interest within the corresponding image.

Sequential data fetching

This ImageDataset class is a custom implementation of a PyTorch dataset that uses deeplake.Dataset as a datasource.

class ImageDataset(torch.utils.data.Dataset):
    def __init__(self, deeplake_ds: deeplake.Dataset, transform: Callable = None):
        self.ds = deeplake_ds
        self.transform = transform

    def __len__(self):
        return len(self.ds)

    def __getitem__(self, item):
        image =  self.ds[item]["images"]
        mask = self.ds[item]["masks"]

        if self.transform is not None:
            image, mask = self.transform((image, mask))

        return image, mask

In the sequential fetching approach, data is loaded one item at a time in a synchronous manner. While this method is straightforward, it can become a bottleneck when working with large datasets with multiple tensors.

Asynchronous Data Fetching

The asynchronous fetching method utilizes asyncio and threading to load data in parallel. This significantly improves loading times, especially for large datasets with multiple tensors.

import deeplake

import asyncio
from threading import Thread, Lock
from multiprocessing import Queue

lock = Lock()
index = -1
def generate_data(ds: deeplake.Dataset):
    total_count = len(ds)
    global index
    while True:
        idx = 0
        with lock:
            index = (index + 1) % (total_count - 1)
            idx = index
        yield ds[idx]

class AsyncImageDataset(torch.utils.data.IterableDataset):
    def __init__(self, deeplake_ds: deeplake.Dataset, transform: Callable = None, max_queue_size: int = 1024):
        self.ds = deeplake_ds
        self.transform = transform
        self.worked_started = False
        self.data_generator = generate_data(self.ds)
        self.q = Queue(maxsize=max_queue_size)

    async def run_async(self):
        for item in self.data_generator:
            data = await asyncio.gather(
                item.get_async("images"),
                item.get_async("masks")
            )
            self.q.put(data)

    def start_worker(self):
        loop = asyncio.new_event_loop()

        for _ in range(128):
            loop.create_task(self.run_async())

        def loop_in_thread(loop):
            asyncio.set_event_loop(loop)
            loop.run_forever()

        self.loop_thread = Thread(target=loop_in_thread, args=(loop,), daemon=True)
        self.loop_thread.start()

        self.worked_started = True

    def __iter__(self):
        while True:
            if not self.worked_started:
                self.start_worker()

            # wait until some data is filled
            while self.q.empty():
                pass

            image, mask = self.q.get()
            if self.transform is not None:
                image, mask = self.transform((image, mask))

            yield image, mask

The AsyncImageDataset utilizes Python’s asyncio library to fetch images and masks concurrently from deeplake.Dataset, minimizing data loading times. The class implements a separate thread to run an event loop, allowing multiple data retrieval tasks to operate simultaneously. A multiprocessing Queue is used to store retrieved items, enabling thread-safe communication between data loading threads and the main processing loop.

Benchmark results

Method Average Loading Time (seconds per batch)
Sequential 6.2
Asynchronous 2.15

While the sequential method is simpler to implement, the asynchronous approach offers substantial performance benefits, making it the preferred choice for handling larger datasets in machine learning workflows. This flexibility allows users to choose the best method suited to their specific use case, ensuring efficient data handling and model training.