Physical AI & Robotics

Physical AI agents (robots, autonomous vehicles, drones) generate massive streams of multimodal data: high-frame-rate video, LiDAR point clouds, and sensor telemetry. Deeplake's GPU-native engine lets you ingest this data at scale and stream it directly into training loops without bottlenecking on the CPU.

Objective

Demonstrate a robotics data pipeline: ingest raw sensor data + video frames, store them with high-precision timestamps, and enable direct streaming to a GPU for training.

Prerequisites

  • Deeplake SDK: pip install deeplake
  • Robotics AI stack: pip install torch transformers pillow accelerate
  • A Deeplake API token.

Set credentials first

export DEEPLAKE_API_KEY="your-token-here"
export DEEPLAKE_WORKSPACE="your-workspace"  # optional, defaults to "default"

Complete Code

import time
import torch
from PIL import Image
from deeplake import Client
from transformers import AutoModel, AutoProcessor

# 1. Setup GPU-Native Robotics Data Lake
client = Client()

# 2. Setup Multimodal Encoder (ColQwen3 for visual state)
MODEL_ID = "TomoroAI/tomoro-colqwen3-embed-4b"
device = "cuda" if torch.cuda.is_available() else "cpu"
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModel.from_pretrained(MODEL_ID, trust_remote_code=True, torch_dtype=torch.bfloat16).to(device)

def get_visual_embedding(image_path):
    """Generates a visual embedding for the given image."""
    img = Image.open(image_path).convert("RGB")
    inputs = processor.process_images(images=[img])
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.inference_mode():
        return model(**inputs).embeddings[0].cpu().float().numpy().tolist()

# Log a batch of robot telemetry steps
image_paths = ["frame_001.png", "frame_002.png"]
joint_data = [[0.1, 0.2, 0.3], [0.15, 0.25, 0.35]]
force_data = [[1.2, 0.5], [1.3, 0.6]]

visual_embs = [get_visual_embedding(p) for p in image_paths]

client.ingest("robot_telemetry", {
    "timestamp": [time.time(), time.time() + 0.1],
    "camera_rgb": image_paths,
    "joint_angles": joint_data,
    "forces": force_data,
    "visual_embedding": visual_embs,
    "is_anomaly": [False, False]
})

# 3. Retrieve Similar States (for Decision Planning)
# Search for episodes where the robot was in a similar visual state
def get_similar_episodes(query_image_path):
    # Generate embedding for the current visual state
    current_emb = get_visual_embedding(query_image_path)

    emb_pg = "{" + ",".join(str(x) for x in current_emb) + "}"
    # <#> returns the negated inner product, so the closest states sort first (ASC)
    return client.query("""
        SELECT timestamp, camera_rgb, joint_angles
        FROM robot_telemetry
        ORDER BY visual_embedding <#> $1::float4[] ASC LIMIT 3
    """, (emb_pg,))

# 4. Direct Streaming to GPU (Training Loop)
from torch.utils.data import DataLoader

ds = client.open_table("robot_telemetry")
dataloader = DataLoader(ds.pytorch(), batch_size=32, num_workers=4)

for batch in dataloader:
    # Batches stream straight from the data lake; move them to the device for training
    # train_policy(batch["camera_rgb"], batch["joint_angles"])
    print(f"Batch loaded: {len(batch['timestamp'])} steps")
    break
REST API (cURL)

# Requires: export DEEPLAKE_API_KEY="..." (see quickstart)
# Requires: export DEEPLAKE_ORG_ID="your-org-id"
# Requires: export DEEPLAKE_WORKSPACE="your-workspace"
API_URL="https://api.deeplake.ai"
TABLE="robot_telemetry"

# 1. Create the multimodal schema for a robotics data lake
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "CREATE TABLE IF NOT EXISTS \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (id BIGSERIAL PRIMARY KEY, timestamp FLOAT8, camera_rgb_id UUID, joint_angles FLOAT4[], forces FLOAT4[], is_anomaly BOOLEAN) USING deeplake"
  }'

# 2. Insert robot sensor metadata
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "INSERT INTO \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (timestamp, camera_rgb_id, joint_angles, forces, is_anomaly) VALUES (1700000001.0, $1::uuid, $2::float4[], $3::float4[], false)",
    "params": ["550e8400-e29b-41d4-a716-446655440000", "{0.1,0.2,0.3}", "{1.2,0.5}"]
  }'

# 3. Query recent anomalies
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "SELECT timestamp, joint_angles, forces FROM \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" WHERE is_anomaly = true ORDER BY timestamp DESC LIMIT 10"
  }'
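The same anomaly query can be issued from Python with only the standard library. A minimal sketch that builds the request pieces without making a network call (the endpoint shape and headers follow the cURL examples above):

```python
import json
import os

API_URL = "https://api.deeplake.ai"

def anomaly_query_request(workspace: str, table: str, limit: int = 10):
    """Build URL, headers, and JSON body for the anomaly query (no network I/O)."""
    url = f"{API_URL}/workspaces/{workspace}/tables/query"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ.get('DEEPLAKE_API_KEY', '')}",
        "X-Activeloop-Org-Id": os.environ.get("DEEPLAKE_ORG_ID", ""),
    }
    body = json.dumps({
        "query": (
            f'SELECT timestamp, joint_angles, forces FROM "{workspace}"."{table}" '
            f"WHERE is_anomaly = true ORDER BY timestamp DESC LIMIT {limit}"
        )
    })
    return url, headers, body

url, headers, body = anomaly_query_request("default", "robot_telemetry")
# Send with urllib.request or any HTTP client of your choice.
```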

Step-by-Step Breakdown

1. GPU-Native Data Flow

Traditionally, robotics data is stored in slow file systems and moved to GPUs during training, creating a massive bottleneck. Deeplake stores data in a format that GPUs can read directly, enabling the high-throughput training required for modern foundation models in robotics.
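The consumer side of this pattern can be sketched with plain PyTorch. Synthetic tensors stand in for the Deeplake-streamed columns here; `pin_memory` and `non_blocking` are standard PyTorch options for overlapping host-to-device copies with compute:

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Synthetic stand-ins for joint_angles (3 joints) and forces (2 axes)
data = TensorDataset(torch.randn(128, 3), torch.randn(128, 2))
loader = DataLoader(data, batch_size=32, pin_memory=torch.cuda.is_available())

device = "cuda" if torch.cuda.is_available() else "cpu"
for joints, forces in loader:
    # non_blocking copies overlap transfer with compute when host memory is pinned
    joints = joints.to(device, non_blocking=True)
    forces = forces.to(device, non_blocking=True)
    break
```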

2. Temporal Precision

By storing timestamps as FLOAT8 in indexed metadata columns, you can run range queries (e.g., WHERE timestamp BETWEEN x AND y) to retrieve specific "episodes" of robot movement for fine-tuning.
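A small helper makes the episode window explicit. This is a sketch: the table and column names follow the example above, and the final `client.query` call is shown commented since it needs live credentials:

```python
def episode_query(start_ts: float, duration_s: float):
    """Build a range query over one episode window [start_ts, start_ts + duration_s]."""
    sql = (
        "SELECT timestamp, camera_rgb, joint_angles, forces "
        "FROM robot_telemetry "
        "WHERE timestamp BETWEEN $1 AND $2 "
        "ORDER BY timestamp"
    )
    return sql, (start_ts, start_ts + duration_s)

sql, params = episode_query(1700000001.0, 2.0)
# rows = client.query(sql, params)
```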

3. Native Multimodal Support

Deeplake managed tables can store raw binary data (images/video) alongside structured sensor arrays (FLOAT4[]). This ensures that your video frames are perfectly synchronized with your telemetry data, a requirement for imitation learning.
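For example, frames and sensor arrays can be kept on a shared timestamp axis before ingest. A sketch (the `hz` rate and helper name are illustrative; `client.ingest` takes the same column-to-list mapping used earlier):

```python
import time

def telemetry_batch(frames, joints, forces, hz=10.0):
    """One row per control step: frame path, sensor arrays, shared timestamps."""
    assert len(frames) == len(joints) == len(forces), "streams must be aligned"
    t0 = time.time()
    return {
        "timestamp": [t0 + i / hz for i in range(len(frames))],
        "camera_rgb": list(frames),
        "joint_angles": list(joints),
        "forces": list(forces),
        "is_anomaly": [False] * len(frames),
    }

batch = telemetry_batch(
    ["frame_001.png", "frame_002.png"],
    [[0.1, 0.2, 0.3], [0.15, 0.25, 0.35]],
    [[1.2, 0.5], [1.3, 0.6]],
)
# client.ingest("robot_telemetry", batch)
```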

Performance Tips

  • Normalization Workers: When ingesting thousands of sensor files, use normalization_workers=8 in client.ingest() to parallelize file processing.
  • Buffering: Use commit_every=5000 for high-frequency streams to balance between peak performance and crash recovery.
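The buffering tip amounts to this pattern. A pure-Python sketch, where the `flush` callback stands in for `client.ingest` with your column mapping:

```python
def buffered_ingest(rows, commit_every=5000, flush=lambda chunk: None):
    """Accumulate rows and commit in chunks of `commit_every` for crash recovery."""
    buf, committed = [], 0
    for row in rows:
        buf.append(row)
        if len(buf) >= commit_every:
            flush(buf)          # e.g. client.ingest("robot_telemetry", to_columns(buf))
            committed += len(buf)
            buf = []
    if buf:                     # commit the final partial chunk
        flush(buf)
        committed += len(buf)
    return committed

chunks = []
n = buffered_ingest(range(12_000), commit_every=5000,
                    flush=lambda c: chunks.append(len(c)))
```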

What to try next