LeRobot¶
LeRobot is HuggingFace's open-source robotics framework with 100+ datasets covering manipulation, locomotion, and navigation. Datasets use the LeRobot v3.0 format: parquet files for sensor telemetry + MP4 videos for camera streams.
Deeplake turns these datasets into queryable, GPU-streamable tables, so you can filter by episode, search by sensor state, and feed directly into PyTorch training loops without downloading everything locally.
Objective¶
Ingest a LeRobot manipulation dataset (ALOHA mobile shrimp) into Deeplake using the managed SDK, then query and stream it for training.
Prerequisites¶
- Deeplake SDK: `pip install deeplake`
- LeRobot dependencies: `pip install huggingface-hub av pillow pandas pyarrow`
- A Deeplake API token.
Set credentials first
Complete Code¶
import os
import json
import time
import numpy as np
import pandas as pd
from glob import glob
from io import BytesIO
from PIL import Image
from deeplake import Client
# ── 1. Setup ──────────────────────────────────────────────────
# Client() presumably picks up credentials (DEEPLAKE_API_KEY etc.) from
# the environment — see the "Set credentials first" note above.
client = Client()
# ── 2. Download LeRobot dataset from HuggingFace ────────────
from huggingface_hub import snapshot_download
REPO_ID = "lerobot/aloha_mobile_shrimp"
# Downloads (or reuses the local HF cache of) the full dataset snapshot:
# parquet telemetry shards, per-camera MP4 videos, and JSON metadata.
local_dir = snapshot_download(REPO_ID, repo_type="dataset")
# Read dataset metadata (fps, counts, feature info — used for printing below)
with open(os.path.join(local_dir, "meta", "info.json")) as f:
    info = json.load(f)
# Read task descriptions (format varies: .jsonl, .json, or .parquet)
# Builds tasks: {task_index -> task description string}.
tasks = {}
for tasks_file in ["tasks.jsonl", "tasks.json"]:
    tasks_path = os.path.join(local_dir, "meta", tasks_file)
    if not os.path.exists(tasks_path):
        continue
    with open(tasks_path) as f:
        if tasks_file.endswith(".jsonl"):
            # one JSON object per line: {"task_index": ..., "task": ...}
            for line in f:
                t = json.loads(line)
                tasks[t["task_index"]] = t["task"]
        else:
            # plain JSON: a list of the same objects
            for t in json.load(f):
                tasks[t["task_index"]] = t["task"]
    break  # first file found wins
# Fallback: tasks.parquet (LeRobot v3.0 newer datasets)
if not tasks:
    tasks_pq = os.path.join(local_dir, "meta", "tasks.parquet")
    if os.path.exists(tasks_pq):
        tdf = pd.read_parquet(tasks_pq).reset_index()
        for _, row in tdf.iterrows():
            # assumes the first column holds the task text and a
            # "task_index" column exists after reset_index — TODO confirm
            tasks[int(row["task_index"])] = str(row.iloc[0])
# ── 3. Parse parquet telemetry ──────────────────────────────
# Telemetry is sharded across parquet files under data/; load all shards
# into one DataFrame, ordered by (episode, frame) so row order matches
# the decode order of the concatenated per-camera videos below.
pq_files = sorted(glob(os.path.join(local_dir, "data", "**", "*.parquet"), recursive=True))
df = pd.concat([pd.read_parquet(f) for f in pq_files], ignore_index=True)
df = df.sort_values(["episode_index", "frame_index"]).reset_index(drop=True)
EPISODE = 0
CAMERAS = ["cam_high", "cam_left_wrist", "cam_right_wrist"]
ep_df = df[df["episode_index"] == EPISODE].reset_index(drop=True)
n_frames = len(ep_df)
# Total frame count of all earlier episodes — used below to seek past
# them in the video streams.
frame_offset = int(df[df["episode_index"] < EPISODE].shape[0])
print(f"Episode {EPISODE}: {n_frames} frames, {len(CAMERAS)} cameras")
print(f"FPS: {info['fps']}, DOF: 14 (dual ALOHA arms)")
# ── 4. Decode video frames ──────────────────────────────────
# For each camera: decode the episode's frames from the (possibly
# chunked) MP4 files and re-encode each frame as JPEG bytes.
import av
cam_frames = {}
for cam in CAMERAS:
    cam_key = f"observation.images.{cam}"
    # Sorted so concatenated decode order matches the global
    # (episode_index, frame_index) row order of the telemetry.
    mp4s = sorted(glob(os.path.join(local_dir, "videos", cam_key, "**", "*.mp4"), recursive=True))
    frames = []
    total_count = 0  # frames decoded so far across all files for this camera
    for mp4_path in mp4s:
        container = av.open(mp4_path)
        try:
            for frame in container.decode(container.streams.video[0]):
                # Skip frames that belong to earlier episodes.
                if total_count < frame_offset:
                    total_count += 1
                    continue
                if len(frames) >= n_frames:
                    break
                rgb = frame.to_ndarray(format="rgb24")
                buf = BytesIO()
                # quality=85 is a good size/fidelity tradeoff for training.
                Image.fromarray(rgb).save(buf, format="JPEG", quality=85)
                frames.append(buf.getvalue())
                total_count += 1
        finally:
            # Close the container even if decoding raises (the original
            # leaked the file handle on error).
            container.close()
        if len(frames) >= n_frames:
            break  # episode fully decoded; skip remaining files
    # NOTE(review): if the video has fewer frames than the telemetry,
    # frames can be shorter than n_frames and indexing below will fail —
    # confirm against the dataset.
    cam_frames[cam] = frames
# ── 5. Ingest into Deeplake ────────────────────────────────
# Build columnar data: 1 row per camera per frame
data = {
    "image": [],          # JPEG-encoded frame bytes
    "camera_name": [],    # which camera the image came from
    "state": [],          # 14-DOF joint positions
    "action": [],         # 14-DOF commanded actions
    "effort": [],         # 14-DOF joint torques (zeros if absent)
    "episode_index": [],
    "frame_index": [],
    "timestamp": [],
    "task": [],           # natural-language task description
}
for fi in range(n_frames):
    row = ep_df.iloc[fi]
    state = np.array(row["observation.state"], dtype=np.float32).tolist()
    action = np.array(row["action"], dtype=np.float32).tolist()
    # Some datasets omit effort; pad with zeros (14 = ALOHA dual-arm DOF).
    effort = (
        np.array(row["observation.effort"], dtype=np.float32).tolist()
        if "observation.effort" in ep_df.columns
        else [0.0] * 14
    )
    # Missing task_index maps to task 0; unknown indices fall back to "".
    task_str = tasks.get(int(row.get("task_index", 0)), "")
    # The sensor columns are replicated once per camera view.
    for cam in CAMERAS:
        data["image"].append(cam_frames[cam][fi])
        data["camera_name"].append(cam)
        data["state"].append(state)
        data["action"].append(action)
        data["effort"].append(effort)
        data["episode_index"].append(int(row["episode_index"]))
        data["frame_index"].append(int(row["frame_index"]))
        data["timestamp"].append(float(row["timestamp"]))
        data["task"].append(task_str)
# Format class ensures IMAGE domain type in PostgreSQL
class LeRobotFormat:
    """Minimal ingest format wrapping one pre-built columnar batch.

    Declaring the ``image`` column as IMAGE makes Deeplake store it
    with the IMAGE domain type rather than plain bytes.
    """

    # The single column that needs an explicit domain type.
    _IMAGE_SCHEMA = {"image": "IMAGE"}

    def __init__(self, data):
        self._data = data

    def schema(self):
        """Column type overrides for ingestion."""
        return dict(self._IMAGE_SCHEMA)

    def pg_schema(self):
        """Column type overrides for the PostgreSQL side."""
        return dict(self._IMAGE_SCHEMA)

    def normalize(self):
        """Yield the wrapped batch as the single ingest chunk."""
        yield self._data
# Drop existing table if re-running
# NOTE(review): assumes drop_table is a no-op (not an error) when the
# table does not exist yet — confirm against the SDK docs.
client.drop_table("aloha_shrimp")
result = client.ingest("aloha_shrimp", format=LeRobotFormat(data))
print(f"Ingested {result['row_count']} rows")
# ── 6. Query the data ───────────────────────────────────────
# Count frames per camera
stats = client.query("""
    SELECT camera_name, COUNT(*) as frame_count
    FROM aloha_shrimp
    GROUP BY camera_name
""")
for s in stats:
    print(f" {s['camera_name']}: {s['frame_count']} frames")
# First 10 overhead-camera frames in playback order, with their action
# vectors. (To find frames where the robot is actually moving, add a
# WHERE clause on action magnitude.)
moving = client.query("""
    SELECT frame_index, camera_name, action
    FROM aloha_shrimp
    WHERE camera_name = 'cam_high'
    ORDER BY frame_index
    LIMIT 10
""")
# ── 7. Stream to PyTorch training loop ──────────────────────
import torch
from torch.utils.data import DataLoader

# open_table() streams from the cloud; nothing is downloaded up front.
ds = client.open_table("aloha_shrimp")
# Option A: PyTorch DataLoader (shuffling, multi-worker)
loader = DataLoader(ds.pytorch(), batch_size=32, num_workers=4)
for batch in loader:
    states = batch["state"]    # [batch, 14] tensor
    actions = batch["action"]  # [batch, 14] tensor
    print(f"DataLoader batch: {states.shape}")
    break  # demo: inspect a single batch only
# Option B: Native prefetching dataloader (async I/O, no torch dependency)
for batch in ds.batches(batch_size=32):
    states = torch.from_numpy(batch["state"])    # [batch, 14]
    actions = torch.from_numpy(batch["action"])  # [batch, 14]
    images = batch["image"]  # list of JPEG bytes
    # train_policy(images, states, actions)
    print(f"Prefetch batch: {states.shape}")
    break  # demo: inspect a single batch only
# Requires: export DEEPLAKE_API_KEY="..." (see quickstart)
# Requires: export DEEPLAKE_ORG_ID="your-org-id"
# Requires: export DEEPLAKE_WORKSPACE="your-workspace"  # referenced in every URL below
API_URL="https://api.deeplake.ai"
TABLE="aloha_shrimp"

# 1. Create the robotics schema
# FLOAT4[] arrays hold the 14-DOF vectors; USING deeplake selects the
# Deeplake storage engine for the table.
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "CREATE TABLE IF NOT EXISTS \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (id BIGSERIAL PRIMARY KEY, camera_name TEXT, state FLOAT4[], action FLOAT4[], effort FLOAT4[], episode_index INT4, frame_index INT4, timestamp FLOAT4, task TEXT) USING deeplake"
  }'

# 2. Insert a telemetry row
# $1..$5 are bound from "params"; the arrays use PostgreSQL array
# literal syntax ("{...}") and are cast to float4[] in the statement.
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "INSERT INTO \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (camera_name, state, action, effort, episode_index, frame_index, timestamp, task) VALUES ($1, $2::float4[], $3::float4[], $4::float4[], 0, 0, 0.0, $5)",
    "params": ["cam_high", "{0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0,1.1,1.2,1.3,1.4}", "{0.01,0.02,0.03,0.04,0.05,0.06,0.07,0.08,0.09,0.1,0.11,0.12,0.13,0.14}", "{0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0}", "Pick up the shrimp"]
  }'

# 3. Query frames by camera
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "SELECT camera_name, COUNT(*) as cnt FROM \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" GROUP BY camera_name"
  }'
Step-by-Step Breakdown¶
1. LeRobot Dataset Format¶
LeRobot v3.0 datasets on HuggingFace use a three-part structure:
| Component | Format | Contents |
|---|---|---|
| Telemetry | Parquet shards | Joint state, action, effort, timestamps, episode/frame indices |
| Video | MP4 per camera | Synchronized camera streams (one file per camera view) |
| Metadata | JSON | FPS, episode counts, task descriptions, sensor statistics |
The ALOHA dataset has 14 degrees of freedom (7 per arm): waist, shoulder, elbow, forearm roll, wrist angle, wrist rotate, and gripper.
2. One Row Per Camera Per Frame¶
The ingestion schema stores one row per camera per frame. For an episode with 3,750 frames and 3 cameras, that's 11,250 rows. This flat layout makes it easy to:
- Filter by camera: `WHERE camera_name = 'cam_high'`
- Filter by episode: `WHERE episode_index = 0`
- Join sensor data with visual data in a single query
3. Schema Design¶
| Column | Type | Description |
|---|---|---|
image |
IMAGE | JPEG-encoded camera frame (480x640) |
camera_name |
TEXT | cam_high, cam_left_wrist, or cam_right_wrist |
state |
EMBEDDING | 14-DOF joint positions (normalized) |
action |
EMBEDDING | 14-DOF robot commands |
effort |
EMBEDDING | 14-DOF joint torques |
episode_index |
INT64 | Episode number |
frame_index |
INT64 | Frame number within episode |
timestamp |
FLOAT64 | Seconds since episode start |
task |
TEXT | Task description (e.g., "Pick up the shrimp") |
State and action as embeddings
Storing state and action as float arrays lets you use Deeplake's vector similarity search (<#>) to find episodes where the robot was in a similar configuration. This is useful for retrieval-augmented policy learning.
4. GPU Streaming¶
client.open_table() returns a deeplake.Dataset that streams from the cloud without downloading the full dataset. Two loading options:
| Method | Best for |
|---|---|
DataLoader(ds.pytorch(), ...) |
Training loops that need shuffling and multi-worker parallelism |
ds.batches(batch_size) |
Sequential iteration with native async prefetching (no torch dependency) |
Multi-Episode Ingestion¶
To ingest all 18 episodes of the ALOHA shrimp dataset:
for episode in range(18):
    ep_df = df[df["episode_index"] == episode].reset_index(drop=True)
    # ... decode frames and build data dict as above ...
    fmt = LeRobotFormat(data)  # reuse format class from above
    if episode == 0:
        # Start from a clean table on the first pass only.
        client.drop_table("aloha_shrimp_full")
    # NOTE(review): assumes client.ingest appends when the table already
    # exists, so episodes 1..17 accumulate — confirm with the SDK docs.
    client.ingest("aloha_shrimp_full", format=fmt)
Custom Format Class¶
For repeated ingestion of LeRobot datasets, create a reusable format class:
class LeRobotFormat:
    """Format class for LeRobot v3.0 datasets (parquet + MP4).

    Streams a local LeRobot dataset snapshot into Deeplake, yielding one
    columnar batch per episode (one row per camera per frame).
    """

    def __init__(self, local_dir, episodes=None, cameras=None):
        # local_dir: root of a downloaded LeRobot snapshot
        #            (meta/, data/, videos/ subdirectories).
        self.local_dir = local_dir
        self.episodes = episodes  # None = all episodes
        self.cameras = cameras or ["cam_high", "cam_left_wrist", "cam_right_wrist"]

    def schema(self):
        """Column type overrides: store 'image' with the IMAGE domain type."""
        return {"image": "IMAGE"}

    def pg_schema(self):
        """PostgreSQL-side column type overrides (same as schema())."""
        return {"image": "IMAGE"}

    def normalize(self):
        """Yield one {column -> list} batch per selected episode."""
        import av  # imported lazily so the class can be defined without av
        # Load metadata
        # NOTE(review): info is loaded but not used below — kept for parity
        # with the inline script; confirm whether it can be dropped.
        with open(os.path.join(self.local_dir, "meta", "info.json")) as f:
            info = json.load(f)
        # Build tasks: {task_index -> task description string}.
        tasks = {}
        for name in ["tasks.jsonl", "tasks.json"]:
            p = os.path.join(self.local_dir, "meta", name)
            if os.path.exists(p):
                with open(p) as f:
                    if name.endswith(".jsonl"):
                        # one JSON object per line
                        for line in f:
                            t = json.loads(line)
                            tasks[t["task_index"]] = t["task"]
                    else:
                        # plain JSON: a list of the same objects
                        for t in json.load(f):
                            tasks[t["task_index"]] = t["task"]
                break  # first file found wins
        # Fallback: tasks.parquet (LeRobot v3.0 newer datasets)
        if not tasks:
            tasks_pq = os.path.join(self.local_dir, "meta", "tasks.parquet")
            if os.path.exists(tasks_pq):
                tdf = pd.read_parquet(tasks_pq).reset_index()
                for _, row in tdf.iterrows():
                    # assumes first column is the task text — TODO confirm
                    tasks[int(row["task_index"])] = str(row.iloc[0])
        # Load all parquet shards
        pq_files = sorted(glob(os.path.join(self.local_dir, "data", "**", "*.parquet"), recursive=True))
        df = pd.concat([pd.read_parquet(f) for f in pq_files], ignore_index=True)
        df = df.sort_values(["episode_index", "frame_index"]).reset_index(drop=True)
        all_episodes = sorted(df["episode_index"].unique())
        if self.episodes is not None:
            all_episodes = [e for e in all_episodes if e in self.episodes]
        for episode in all_episodes:
            ep_df = df[df["episode_index"] == episode].reset_index(drop=True)
            n_frames = len(ep_df)
            # Frames of all earlier episodes — used to seek into the
            # concatenated per-camera video streams.
            frame_offset = int(df[df["episode_index"] < episode].shape[0])
            # Decode video frames for this episode
            cam_frames = {}
            for cam in self.cameras:
                cam_key = f"observation.images.{cam}"
                mp4s = sorted(glob(
                    os.path.join(self.local_dir, "videos", cam_key, "**", "*.mp4"),
                    recursive=True,
                ))
                if not mp4s:
                    continue  # camera absent in this dataset; skipped below too
                frames = []
                total_count = 0  # frames decoded so far for this camera
                for mp4_path in mp4s:
                    container = av.open(mp4_path)
                    for frame in container.decode(container.streams.video[0]):
                        # Skip frames belonging to earlier episodes.
                        if total_count < frame_offset:
                            total_count += 1
                            continue
                        if len(frames) >= n_frames:
                            break
                        rgb = frame.to_ndarray(format="rgb24")
                        buf = BytesIO()
                        # quality=85: good size/fidelity tradeoff for training
                        Image.fromarray(rgb).save(buf, format="JPEG", quality=85)
                        frames.append(buf.getvalue())
                        total_count += 1
                    container.close()
                    if len(frames) >= n_frames:
                        break  # episode fully decoded; skip remaining files
                cam_frames[cam] = frames
            # Yield one batch per episode
            batch = {
                "image": [], "camera_name": [], "state": [], "action": [],
                "effort": [], "episode_index": [], "frame_index": [],
                "timestamp": [], "task": [],
            }
            for fi in range(n_frames):
                row = ep_df.iloc[fi]
                state = np.array(row["observation.state"], dtype=np.float32).tolist()
                action = np.array(row["action"], dtype=np.float32).tolist()
                # Some datasets omit effort; pad with zeros (14-DOF ALOHA).
                effort = (
                    np.array(row["observation.effort"], dtype=np.float32).tolist()
                    if "observation.effort" in ep_df.columns
                    else [0.0] * 14
                )
                # Sensor columns replicated once per decoded camera view.
                for cam in self.cameras:
                    if cam not in cam_frames:
                        continue
                    batch["image"].append(cam_frames[cam][fi])
                    batch["camera_name"].append(cam)
                    batch["state"].append(state)
                    batch["action"].append(action)
                    batch["effort"].append(effort)
                    batch["episode_index"].append(int(row["episode_index"]))
                    batch["frame_index"].append(int(row["frame_index"]))
                    batch["timestamp"].append(float(row["timestamp"]))
                    batch["task"].append(tasks.get(int(row.get("task_index", 0)), ""))
            if batch["image"]:
                yield batch
# Usage
from huggingface_hub import snapshot_download
local_dir = snapshot_download("lerobot/aloha_mobile_shrimp", repo_type="dataset")
# Episodes are decoded lazily and ingested one batch per episode.
client.ingest("aloha_full", format=LeRobotFormat(
    local_dir=local_dir,
    episodes=[0, 1, 2],    # Ingest 3 episodes
    cameras=["cam_high"],  # Only overhead camera
))
Supported LeRobot Datasets¶
This pattern works with any LeRobot v3.0 dataset on HuggingFace. Popular ones:
| Dataset | Robot | Cameras | DOF | Episodes |
|---|---|---|---|---|
lerobot/aloha_mobile_shrimp |
ALOHA Mobile | 3 | 14 | 18 |
lerobot/aloha_sim_transfer_cube_human |
ALOHA Sim | 3 | 14 | 50 |
lerobot/pusht |
PushT (2D) | 1 | 2 | 206 |
lerobot/xarm_lift_medium |
xArm | 1 | 4 | 100 |
Adjust CAMERAS and DOF count per dataset. The info.json metadata file lists available cameras and state dimensions.
Performance Tips¶
- Batch by episode: Yielding one batch per episode in the format class keeps memory bounded. For datasets with very long episodes (>10k frames), split into sub-batches.
- JPEG quality:
quality=85is a good tradeoff. Lower values (70) reduce storage 2x with minimal visual impact for policy training. - Overhead camera only: For many manipulation tasks,
cam_highalone is sufficient. Ingesting 1 camera instead of 3 cuts storage and ingestion time by 3x.
What to try next¶
- Physical AI & Robotics: telemetry ingestion with visual embeddings and similarity search.
- GPU-Streaming Guide: deep dive into feeding ML models.
- Video Retrieval: search inside robot camera feeds.