Deeplake Dataloaders¶
Deeplake integrates natively with PyTorch and TensorFlow, letting you stream data directly from managed tables into your training loop. No local downloads, no custom data-loading code -- just open a table and start training.
Objective¶
Learn how to feed a Deeplake managed table into a PyTorch DataLoader or a TensorFlow tf.data pipeline, and how to use simple batch iteration for model training and data processing.
Prerequisites¶
- Deeplake SDK: pip install deeplake
- Deep learning framework: pip install torch torchvision (or pip install tensorflow)
- A Deeplake API token. Set your credentials before running the examples.
Complete Code¶
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from deeplake import Client
# 1. Connect to Deeplake
client = Client()
ds = client.open_table("training_data")
# 2. Create a PyTorch DataLoader
loader = DataLoader(
    ds.pytorch(),
    batch_size=32,
    shuffle=True,
    num_workers=4,
)
# 3. Train with optional transforms
tform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomRotation(20),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
for batch in loader:
    images = torch.stack([tform(img) for img in batch["image"]]).to(device)
    labels = batch["label"].to(device)
    # training_step(images, labels)
Step-by-Step Breakdown¶
1. PyTorch DataLoader¶
This is the most common pattern: call ds.pytorch() to get a PyTorch-compatible dataset, then wrap it in a standard DataLoader for batching, shuffling, and parallel prefetching.
from torch.utils.data import DataLoader
from deeplake import Client
client = Client()
ds = client.open_table("training_data")
loader = DataLoader(
    ds.pytorch(),
    batch_size=32,
    shuffle=True,
    num_workers=4,
)
for batch in loader:
    images = batch["image"]  # Tensor: (batch_size, H, W, C)
    labels = batch["label"]  # Tensor: (batch_size,)
    # training_step(images, labels)
2. Applying Transforms¶
Deeplake streams raw data. Apply torchvision transforms in your training loop to handle augmentation, normalization, and format conversion.
import torch
from torchvision import transforms

tform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomRotation(20),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
])

for batch in loader:
    images = torch.stack([tform(img) for img in batch["image"]])
    labels = batch["label"]
    # training_step(images, labels)
3. TensorFlow Dataset¶
Call ds.tensorflow() to get a tf.data.Dataset. Chain .batch() and .prefetch() for optimal GPU utilization.
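A minimal sketch of that chaining. So the snippet is self-contained, a small from_tensor_slices dataset stands in for ds.tensorflow(), and the image/label column names follow the PyTorch examples above; with a real table you would call ds.tensorflow() directly.

```python
import tensorflow as tf

# Stand-in for ds.tensorflow(); with a real table you would write:
#   tf_ds = client.open_table("training_data").tensorflow()
raw = tf.data.Dataset.from_tensor_slices({
    "image": tf.zeros([10, 8, 8, 3]),  # ten dummy 8x8 RGB images
    "label": tf.range(10),
})

# Batch, then prefetch so the next batch is prepared while the
# current one is being consumed by the training step.
tf_ds = raw.batch(4).prefetch(tf.data.AUTOTUNE)

num_samples = 0
for batch in tf_ds:
    images = batch["image"]  # shape: (batch_size, H, W, C)
    labels = batch["label"]  # shape: (batch_size,)
    num_samples += int(images.shape[0])
    # training_step(images, labels)
```

prefetch(tf.data.AUTOTUNE) lets the tf.data runtime tune the prefetch buffer, which is usually the simplest way to keep the GPU fed.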
You can also iterate manually:
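Manual iteration might look like the following (again a hedged sketch: a from_tensor_slices dataset stands in for ds.tensorflow()).

```python
import tensorflow as tf

# Stand-in for ds.tensorflow(); replace with the real table's dataset.
tf_ds = tf.data.Dataset.from_tensor_slices({
    "image": tf.zeros([10, 8, 8, 3]),
    "label": tf.range(10),
}).batch(4)

# Pull batches one at a time with an explicit iterator, e.g. to
# inspect a single batch or drive a custom training loop.
it = iter(tf_ds)
first = next(it)
```

This is handy for debugging: you can examine one batch's shapes and dtypes before committing to a full epoch.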
4. Simple Batch Iteration¶
For non-training workloads (preprocessing, analysis, inference), use ds.batches() for straightforward iteration without framework dependencies.
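As an illustration of the pattern (a hedged sketch: the chunking below emulates what a column-major batch iterator yields, using an in-memory list of rows in place of an open table; with a real table you would iterate ds.batches() directly, and its exact signature is an assumption here):

```python
# Stand-in rows; with a real table you would iterate ds.batches() directly.
rows = [{"image": f"img_{i}.png", "label": i % 3} for i in range(10)]

def batches(rows, batch_size):
    """Yield column-major batches: {"image": [...], "label": [...]}."""
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        yield {key: [row[key] for row in chunk] for key in chunk[0]}

seen = 0
for batch in batches(rows, batch_size=4):
    seen += len(batch["label"])
    # process(batch["image"], batch["label"])
# 10 rows arrive in batches of 4, 4, and 2
```

Because nothing here depends on torch or tensorflow, this style suits lightweight preprocessing and analysis jobs.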
5. Async Data Loading¶
For datasets with large tensors (images, video, embeddings), sequential row fetching becomes the bottleneck. Use get_async() to fetch many rows concurrently from an asyncio event loop running on a background thread.
import asyncio
import threading
from queue import Queue  # thread-safe queue: the worker is a thread, not a process
from torch.utils.data import IterableDataset, DataLoader
from deeplake import Client

class AsyncDeepLakeDataset(IterableDataset):
    def __init__(self, ds, concurrency=128):
        self.ds = ds
        self.concurrency = concurrency
        self.queue = Queue(maxsize=concurrency * 2)

    def _worker(self):
        loop = asyncio.new_event_loop()

        async def fetch_all():
            sem = asyncio.Semaphore(self.concurrency)

            async def fetch_one(idx):
                async with sem:
                    row = self.ds[idx]
                    image, label = await asyncio.gather(
                        row.get_async("image"),
                        row.get_async("label"),
                    )
                    self.queue.put((image, label))

            tasks = [fetch_one(i) for i in range(len(self.ds))]
            await asyncio.gather(*tasks)

        loop.run_until_complete(fetch_all())
        self.queue.put(None)  # Sentinel: signals end of data

    def __iter__(self):
        thread = threading.Thread(target=self._worker, daemon=True)
        thread.start()
        while True:
            item = self.queue.get()
            if item is None:
                break
            yield item

client = Client()
ds = client.open_table("training_data")
loader = DataLoader(AsyncDeepLakeDataset(ds), batch_size=32)

for images, labels in loader:
    # training_step(images, labels)
    pass
This pattern fetches up to 128 rows concurrently in a background thread, yielding them through a queue. On image-heavy datasets, this can be 2-3x faster than sequential loading.
6. Custom PyTorch Dataset Wrapper¶
For complex preprocessing or multi-table joins, wrap open_table in a custom torch.utils.data.Dataset.
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from deeplake import Client

class DeepLakeDataset(Dataset):
    def __init__(self, client, table_name, transform=None):
        self.ds = client.open_table(table_name)
        self.transform = transform
        self.length = len(self.ds)

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        row = self.ds[idx]
        image = row["image"]
        label = row["label"]
        if self.transform:
            image = self.transform(image)
        return image, label

# Usage
client = Client()
dataset = DeepLakeDataset(
    client,
    "training_data",
    transform=transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]),
)
loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

for images, labels in loader:
    # training_step(images, labels)
    pass
Why no REST API?¶
Streaming high-performance tensor data over standard REST endpoints introduces significant latency and CPU overhead due to HTTP headers and JSON serialization. For high-throughput training, the Python SDK is the only supported method as it uses optimized C++ streaming kernels.
Performance Tips¶
| Parameter | Recommendation |
|---|---|
| `num_workers` | Start with the number of CPU cores. For I/O-bound workloads (cloud streaming), use 2-4x CPU cores. |
| `prefetch_factor` | Default is 2. Increase to 4-8 if your GPU is starved for data. |
| `batch_size` | Maximize GPU memory usage. Larger batches reduce per-sample overhead. |
| `shuffle` | Use True for training. Disable for inference to preserve order. |
| `pin_memory` | Set True when training on GPU to speed up host-to-device transfer. |
Streaming vs. local: Deeplake streams data from cloud storage by default. For repeated epochs on smaller datasets, consider caching locally to avoid redundant network I/O.
Example with all optimizations:
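The sketch below combines the knobs from the table (a small in-memory TensorDataset stands in for ds.pytorch() so the snippet is self-contained; with a real table, pass ds.pytorch() as the dataset; persistent_workers is an extra knob beyond the table):

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Stand-in for ds.pytorch(); with a real table:
#   dataset = client.open_table("training_data").pytorch()
dataset = TensorDataset(torch.zeros(64, 3, 8, 8), torch.arange(64))

loader = DataLoader(
    dataset,
    batch_size=32,                          # as large as GPU memory allows
    shuffle=True,                           # training: randomize sample order
    num_workers=2,                          # scale with CPU cores / I/O load
    prefetch_factor=4,                      # batches prefetched per worker
    pin_memory=torch.cuda.is_available(),   # faster host-to-device copies
    persistent_workers=True,                # keep workers alive across epochs
)

total = sum(images.shape[0] for images, labels in loader)
```

Note that prefetch_factor and persistent_workers only take effect when num_workers > 0.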
What to try next¶
- GPU-Streaming Pipeline: direct-to-GPU streaming for training.
- Massive Ingestion: prepare large-scale datasets for training.
- Reference: Querying: details on open_table().