Skip to content

LeRobot

LeRobot is HuggingFace's open-source robotics framework with 100+ datasets covering manipulation, locomotion, and navigation. Datasets use the LeRobot v3.0 format: parquet files for sensor telemetry + MP4 videos for camera streams.

Deeplake turns these datasets into queryable, GPU-streamable tables, so you can filter by episode, search by sensor state, and feed directly into PyTorch training loops without downloading everything locally.

Objective

Ingest a LeRobot manipulation dataset (ALOHA mobile shrimp) into Deeplake using the managed SDK, then query and stream it for training.

Prerequisites

  • Deeplake SDK: pip install deeplake
  • LeRobot dependencies: pip install huggingface-hub av pillow pandas pyarrow
  • A Deeplake API token.

Set credentials first

export DEEPLAKE_API_KEY="your-token-here"
export DEEPLAKE_WORKSPACE="your-workspace"  # optional, defaults to "default"

Complete Code

import os
import json
import time
import numpy as np
import pandas as pd
from glob import glob
from io import BytesIO
from PIL import Image
from deeplake import Client

# ── 1. Setup ──────────────────────────────────────────────────
# Client() picks up DEEPLAKE_API_KEY / DEEPLAKE_WORKSPACE from the
# environment (see "Set credentials first" above).
client = Client()

# ── 2. Download LeRobot dataset from HuggingFace ────────────
from huggingface_hub import snapshot_download

REPO_ID = "lerobot/aloha_mobile_shrimp"
local_dir = snapshot_download(REPO_ID, repo_type="dataset")

# Read dataset metadata (fps, feature shapes, episode counts, ...)
info_path = os.path.join(local_dir, "meta", "info.json")
with open(info_path) as fh:
    info = json.load(fh)

# Read task descriptions (format varies: .jsonl, .json, or .parquet)
# Builds {task_index: task string}; empty if no tasks file exists.
tasks = {}
for tasks_file in ["tasks.jsonl", "tasks.json"]:
    tasks_path = os.path.join(local_dir, "meta", tasks_file)
    if not os.path.exists(tasks_path):
        continue
    with open(tasks_path) as f:
        if tasks_file.endswith(".jsonl"):
            for line in f:
                # Skip blank/trailing lines so json.loads doesn't raise
                # on files ending with a newline or containing gaps.
                if not line.strip():
                    continue
                t = json.loads(line)
                tasks[t["task_index"]] = t["task"]
        else:
            for t in json.load(f):
                tasks[t["task_index"]] = t["task"]
    break

# Fallback: tasks.parquet (LeRobot v3.0 newer datasets)
if not tasks:
    tasks_pq = os.path.join(local_dir, "meta", "tasks.parquet")
    if os.path.exists(tasks_pq):
        tdf = pd.read_parquet(tasks_pq).reset_index()
        for _, row in tdf.iterrows():
            # After reset_index, column 0 is the former index —
            # presumably the task string; verify against each dataset.
            tasks[int(row["task_index"])] = str(row.iloc[0])

# ── 3. Parse parquet telemetry ──────────────────────────────
# Concatenate every telemetry shard, then order by (episode, frame).
shard_paths = sorted(glob(os.path.join(local_dir, "data", "**", "*.parquet"), recursive=True))
df = (
    pd.concat(map(pd.read_parquet, shard_paths), ignore_index=True)
    .sort_values(["episode_index", "frame_index"])
    .reset_index(drop=True)
)

EPISODE = 0
CAMERAS = ["cam_high", "cam_left_wrist", "cam_right_wrist"]

ep_df = df[df["episode_index"] == EPISODE].reset_index(drop=True)
n_frames = len(ep_df)
# Number of frames in all earlier episodes — used to seek into the
# concatenated per-camera video streams below.
frame_offset = int((df["episode_index"] < EPISODE).sum())

print(f"Episode {EPISODE}: {n_frames} frames, {len(CAMERAS)} cameras")
print(f"FPS: {info['fps']}, DOF: 14 (dual ALOHA arms)")

# ── 4. Decode video frames ──────────────────────────────────
import av

# Decode exactly n_frames RGB frames per camera for the selected episode
# and keep them as JPEG bytes for ingestion.
# NOTE(review): assumes the MP4 files under videos/<cam_key>/ form one
# continuous stream in glob-sorted order, aligned with the parquet row
# order — confirm against the dataset's actual video chunk layout.
cam_frames = {}
for cam in CAMERAS:
    cam_key = f"observation.images.{cam}"
    mp4s = sorted(glob(os.path.join(local_dir, "videos", cam_key, "**", "*.mp4"), recursive=True))

    frames = []
    total_count = 0  # frames seen so far across all files (for seeking)
    for mp4_path in mp4s:
        container = av.open(mp4_path)
        for frame in container.decode(container.streams.video[0]):
            if total_count < frame_offset:
                # Still inside an earlier episode: decode and discard.
                total_count += 1
                continue
            if len(frames) >= n_frames:
                break  # collected the whole episode from this camera
            rgb = frame.to_ndarray(format="rgb24")
            buf = BytesIO()
            Image.fromarray(rgb).save(buf, format="JPEG", quality=85)
            frames.append(buf.getvalue())
            total_count += 1
        container.close()
        if len(frames) >= n_frames:
            break
    cam_frames[cam] = frames

# ── 5. Ingest into Deeplake ────────────────────────────────
# Build columnar data: 1 row per camera per frame
data = {
    "image": [],
    "camera_name": [],
    "state": [],
    "action": [],
    "effort": [],
    "episode_index": [],
    "frame_index": [],
    "timestamp": [],
    "task": [],
}

for fi in range(n_frames):
    row = ep_df.iloc[fi]
    state = np.array(row["observation.state"], dtype=np.float32).tolist()
    action = np.array(row["action"], dtype=np.float32).tolist()
    # When the dataset has no effort channel, fall back to a zero vector
    # sized to the state's DOF (14 for ALOHA) rather than a hard-coded
    # length, so datasets with other DOF counts work unchanged.
    effort = (
        np.array(row["observation.effort"], dtype=np.float32).tolist()
        if "observation.effort" in ep_df.columns
        else [0.0] * len(state)
    )
    task_str = tasks.get(int(row.get("task_index", 0)), "")

    # Duplicate the per-frame telemetry once per camera view.
    for cam in CAMERAS:
        data["image"].append(cam_frames[cam][fi])
        data["camera_name"].append(cam)
        data["state"].append(state)
        data["action"].append(action)
        data["effort"].append(effort)
        data["episode_index"].append(int(row["episode_index"]))
        data["frame_index"].append(int(row["frame_index"]))
        data["timestamp"].append(float(row["timestamp"]))
        data["task"].append(task_str)

# Format class ensures IMAGE domain type in PostgreSQL
class LeRobotFormat:
    """Minimal ingestion format: wraps one pre-built columnar batch."""

    def __init__(self, data):
        # Columnar dict of parallel lists, built in step 5 above.
        self._batch = data

    def schema(self):
        """Schema hints: store the `image` column with the IMAGE domain type."""
        return {"image": "IMAGE"}

    def pg_schema(self):
        """PostgreSQL-side schema hints (same mapping as `schema`)."""
        return {"image": "IMAGE"}

    def normalize(self):
        """Yield the single pre-built batch."""
        yield self._batch

# Drop existing table if re-running
# NOTE(review): presumably a no-op when the table doesn't exist — confirm
# the SDK's drop_table semantics on a fresh workspace.
client.drop_table("aloha_shrimp")

# Ingest the batch built above; assumes the result exposes 'row_count'.
result = client.ingest("aloha_shrimp", format=LeRobotFormat(data))
print(f"Ingested {result['row_count']} rows")

# ── 6. Query the data ───────────────────────────────────────
# Count frames per camera
stats = client.query("""
    SELECT camera_name, COUNT(*) as frame_count
    FROM aloha_shrimp
    GROUP BY camera_name
""")
for s in stats:
    print(f"  {s['camera_name']}: {s['frame_count']} frames")

# Preview the first 10 overhead-camera rows in temporal order.
# NOTE(review): the stated intent was "frames where the robot is moving
# (action magnitude > threshold)", but no magnitude filter is applied —
# add a WHERE clause on the action array once the SQL dialect's array
# functions are confirmed.
moving = client.query("""
    SELECT frame_index, camera_name, action
    FROM aloha_shrimp
    WHERE camera_name = 'cam_high'
    ORDER BY frame_index
    LIMIT 10
""")

# ── 7. Stream to PyTorch training loop ──────────────────────
import torch
from torch.utils.data import DataLoader

# Opens a cloud-backed table handle; rows stream on demand.
ds = client.open_table("aloha_shrimp")

# Option A: PyTorch DataLoader (shuffling, multi-worker)
loader = DataLoader(ds.pytorch(), batch_size=32, num_workers=4)
for batch in loader:
    states = batch["state"]       # [batch, 14] tensor
    actions = batch["action"]     # [batch, 14] tensor
    print(f"DataLoader batch: {states.shape}")
    break

# Option B: Native prefetching dataloader (async I/O, no torch dependency)
for batch in ds.batches(batch_size=32):
    states = torch.from_numpy(batch["state"])    # [batch, 14]
    actions = torch.from_numpy(batch["action"])  # [batch, 14]
    images = batch["image"]                      # list of JPEG bytes
    # train_policy(images, states, actions)
    print(f"Prefetch batch: {states.shape}")
    break
# ── REST API equivalent (curl) ──────────────────────────────
# Requires: export DEEPLAKE_API_KEY="..." (see quickstart)
# Requires: export DEEPLAKE_ORG_ID="your-org-id"
# Also expects DEEPLAKE_WORKSPACE to be exported (see "Set credentials first").
API_URL="https://api.deeplake.ai"
TABLE="aloha_shrimp"

# 1. Create the robotics schema
# NOTE(review): this DDL omits the `image` column that the SDK path ingests —
# confirm whether binary/image columns can be created over this endpoint.
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "CREATE TABLE IF NOT EXISTS \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (id BIGSERIAL PRIMARY KEY, camera_name TEXT, state FLOAT4[], action FLOAT4[], effort FLOAT4[], episode_index INT4, frame_index INT4, timestamp FLOAT4, task TEXT) USING deeplake"
  }'

# 2. Insert a telemetry row ($1..$5 are bound from the "params" array;
#    arrays are passed in PostgreSQL '{...}' literal form)
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "INSERT INTO \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (camera_name, state, action, effort, episode_index, frame_index, timestamp, task) VALUES ($1, $2::float4[], $3::float4[], $4::float4[], 0, 0, 0.0, $5)",
    "params": ["cam_high", "{0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0,1.1,1.2,1.3,1.4}", "{0.01,0.02,0.03,0.04,0.05,0.06,0.07,0.08,0.09,0.1,0.11,0.12,0.13,0.14}", "{0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0}", "Pick up the shrimp"]
  }'

# 3. Query frames by camera
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "SELECT camera_name, COUNT(*) as cnt FROM \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" GROUP BY camera_name"
  }'

Step-by-Step Breakdown

1. LeRobot Dataset Format

LeRobot v3.0 datasets on HuggingFace use a three-part structure:

Component Format Contents
Telemetry Parquet shards Joint state, action, effort, timestamps, episode/frame indices
Video MP4 per camera Synchronized camera streams (one file per camera view)
Metadata JSON FPS, episode counts, task descriptions, sensor statistics

The ALOHA dataset has 14 degrees of freedom (7 per arm): waist, shoulder, elbow, forearm roll, wrist angle, wrist rotate, and gripper.

2. One Row Per Camera Per Frame

The ingestion schema stores one row per camera per frame. For an episode with 3,750 frames and 3 cameras, that's 11,250 rows. This flat layout makes it easy to:

  • Filter by camera: WHERE camera_name = 'cam_high'
  • Filter by episode: WHERE episode_index = 0
  • Join sensor data with visual data in a single query

3. Schema Design

Column Type Description
image IMAGE JPEG-encoded camera frame (480x640)
camera_name TEXT cam_high, cam_left_wrist, or cam_right_wrist
state EMBEDDING 14-DOF joint positions (normalized)
action EMBEDDING 14-DOF robot commands
effort EMBEDDING 14-DOF joint torques
episode_index INT64 Episode number
frame_index INT64 Frame number within episode
timestamp FLOAT64 Seconds since episode start
task TEXT Task description (e.g., "Pick up the shrimp")

State and action as embeddings

Storing state and action as float arrays lets you use Deeplake's vector similarity search (e.g. the `<#>` inner-product distance operator) to find episodes where the robot was in a similar joint configuration. This is useful for retrieval-augmented policy learning.

4. GPU Streaming

client.open_table() returns a deeplake.Dataset that streams from the cloud without downloading the full dataset. Two loading options:

Method Best for
DataLoader(ds.pytorch(), ...) Training loops that need shuffling and multi-worker parallelism
ds.batches(batch_size) Sequential iteration with native async prefetching (no torch dependency)

Multi-Episode Ingestion

To ingest all 18 episodes of the ALOHA shrimp dataset:

for episode in range(18):
    ep_df = df[df["episode_index"] == episode].reset_index(drop=True)
    # ... decode frames and build data dict as above ...
    # NOTE(review): `data` must be rebuilt here for each episode — reusing
    # the episode-0 dict from the main script would re-ingest the wrong rows.

    fmt = LeRobotFormat(data)  # reuse format class from above

    # Drop only once, before the first episode, so later iterations append.
    if episode == 0:
        client.drop_table("aloha_shrimp_full")
    client.ingest("aloha_shrimp_full", format=fmt)

Custom Format Class

For repeated ingestion of LeRobot datasets, create a reusable format class:

class LeRobotFormat:
    """Format class for LeRobot v3.0 datasets (parquet + MP4).

    Streams one columnar batch per episode from a locally downloaded
    LeRobot dataset directory (parquet telemetry shards under data/,
    per-camera MP4 video under videos/, task metadata under meta/).
    """

    def __init__(self, local_dir, episodes=None, cameras=None):
        # Root of the snapshot_download'ed dataset directory.
        self.local_dir = local_dir
        self.episodes = episodes  # None = all episodes
        self.cameras = cameras or ["cam_high", "cam_left_wrist", "cam_right_wrist"]

    def schema(self):
        """Schema hints: store the `image` column with the IMAGE domain type."""
        return {"image": "IMAGE"}

    def pg_schema(self):
        """PostgreSQL-side schema hints (same mapping as `schema`)."""
        return {"image": "IMAGE"}

    def _load_tasks(self):
        """Return {task_index: task string} from meta/; {} when absent.

        Tries tasks.jsonl, then tasks.json; falls back to tasks.parquet
        (used by newer LeRobot v3.0 datasets).
        """
        tasks = {}
        for name in ["tasks.jsonl", "tasks.json"]:
            p = os.path.join(self.local_dir, "meta", name)
            if os.path.exists(p):
                with open(p) as f:
                    if name.endswith(".jsonl"):
                        for line in f:
                            # Skip blank/trailing lines so json.loads
                            # doesn't raise on a terminating newline.
                            if not line.strip():
                                continue
                            t = json.loads(line)
                            tasks[t["task_index"]] = t["task"]
                    else:
                        for t in json.load(f):
                            tasks[t["task_index"]] = t["task"]
                break

        # Fallback: tasks.parquet (LeRobot v3.0 newer datasets)
        if not tasks:
            tasks_pq = os.path.join(self.local_dir, "meta", "tasks.parquet")
            if os.path.exists(tasks_pq):
                tdf = pd.read_parquet(tasks_pq).reset_index()
                for _, row in tdf.iterrows():
                    # After reset_index, column 0 is the former index —
                    # presumably the task string; verify per dataset.
                    tasks[int(row["task_index"])] = str(row.iloc[0])
        return tasks

    def _decode_camera(self, cam, frame_offset, n_frames):
        """Decode one episode's frames for `cam` as a list of JPEG bytes.

        Returns None when the camera has no MP4 files on disk. Frames
        before `frame_offset` are decoded and discarded to seek into the
        concatenated stream.
        NOTE(review): assumes glob-sorted MP4s form one continuous stream
        aligned with parquet row order — confirm per dataset layout.
        """
        import av

        cam_key = f"observation.images.{cam}"
        mp4s = sorted(glob(
            os.path.join(self.local_dir, "videos", cam_key, "**", "*.mp4"),
            recursive=True,
        ))
        if not mp4s:
            return None

        frames = []
        total_count = 0  # frames seen across all files (for seeking)
        for mp4_path in mp4s:
            container = av.open(mp4_path)
            for frame in container.decode(container.streams.video[0]):
                if total_count < frame_offset:
                    total_count += 1
                    continue
                if len(frames) >= n_frames:
                    break
                rgb = frame.to_ndarray(format="rgb24")
                buf = BytesIO()
                Image.fromarray(rgb).save(buf, format="JPEG", quality=85)
                frames.append(buf.getvalue())
                total_count += 1
            container.close()
            if len(frames) >= n_frames:
                break
        return frames

    def normalize(self):
        """Yield one columnar batch (dict of parallel lists) per episode."""
        import av  # noqa: F401 — fail fast here if PyAV is not installed

        tasks = self._load_tasks()

        # Load all parquet shards, ordered by (episode, frame).
        pq_files = sorted(glob(os.path.join(self.local_dir, "data", "**", "*.parquet"), recursive=True))
        df = pd.concat([pd.read_parquet(f) for f in pq_files], ignore_index=True)
        df = df.sort_values(["episode_index", "frame_index"]).reset_index(drop=True)

        all_episodes = sorted(df["episode_index"].unique())
        if self.episodes is not None:
            all_episodes = [e for e in all_episodes if e in self.episodes]

        for episode in all_episodes:
            ep_df = df[df["episode_index"] == episode].reset_index(drop=True)
            n_frames = len(ep_df)
            # Frames belonging to earlier episodes — the seek distance into
            # each camera's concatenated video stream.
            frame_offset = int(df[df["episode_index"] < episode].shape[0])

            # Decode video frames for this episode
            cam_frames = {}
            for cam in self.cameras:
                frames = self._decode_camera(cam, frame_offset, n_frames)
                if frames is not None:
                    cam_frames[cam] = frames

            # Yield one batch per episode (keeps memory bounded)
            batch = {
                "image": [], "camera_name": [], "state": [], "action": [],
                "effort": [], "episode_index": [], "frame_index": [],
                "timestamp": [], "task": [],
            }

            for fi in range(n_frames):
                row = ep_df.iloc[fi]
                state = np.array(row["observation.state"], dtype=np.float32).tolist()
                action = np.array(row["action"], dtype=np.float32).tolist()
                # Zero vector sized to the state's DOF (not hard-coded 14),
                # so non-ALOHA datasets work unchanged.
                effort = (
                    np.array(row["observation.effort"], dtype=np.float32).tolist()
                    if "observation.effort" in ep_df.columns
                    else [0.0] * len(state)
                )

                for cam in self.cameras:
                    if cam not in cam_frames:
                        continue  # camera had no MP4 files on disk
                    batch["image"].append(cam_frames[cam][fi])
                    batch["camera_name"].append(cam)
                    batch["state"].append(state)
                    batch["action"].append(action)
                    batch["effort"].append(effort)
                    batch["episode_index"].append(int(row["episode_index"]))
                    batch["frame_index"].append(int(row["frame_index"]))
                    batch["timestamp"].append(float(row["timestamp"]))
                    batch["task"].append(tasks.get(int(row.get("task_index", 0)), ""))

            if batch["image"]:
                yield batch

# Usage
from huggingface_hub import snapshot_download

# Download the dataset snapshot locally (parquet + MP4 + meta).
local_dir = snapshot_download("lerobot/aloha_mobile_shrimp", repo_type="dataset")

# Ingest a subset: first three episodes, overhead camera only.
client.ingest("aloha_full", format=LeRobotFormat(
    local_dir=local_dir,
    episodes=[0, 1, 2],          # Ingest 3 episodes
    cameras=["cam_high"],         # Only overhead camera
))

Supported LeRobot Datasets

This pattern works with any LeRobot v3.0 dataset on HuggingFace. Popular ones:

Dataset Robot Cameras DOF Episodes
lerobot/aloha_mobile_shrimp ALOHA Mobile 3 14 18
lerobot/aloha_sim_transfer_cube_human ALOHA Sim 3 14 50
lerobot/pusht PushT (2D) 1 2 206
lerobot/xarm_lift_medium xArm 1 4 100

Adjust CAMERAS and DOF count per dataset. The info.json metadata file lists available cameras and state dimensions.

Performance Tips

  • Batch by episode: Yielding one batch per episode in the format class keeps memory bounded. For datasets with very long episodes (>10k frames), split into sub-batches.
  • JPEG quality: quality=85 is a good tradeoff. Lower values (e.g. 70) roughly halve storage with minimal visual impact for policy training.
  • Overhead camera only: For many manipulation tasks, cam_high alone is sufficient. Ingesting 1 camera instead of 3 cuts storage and ingestion time to roughly a third.

What to try next