Physical AI & Robotics¶
Physical AI agents (robots, autonomous vehicles, drones) generate massive streams of multimodal data: high-FPS video, LiDAR point clouds, and sensor telemetry. Deeplake's GPU-native engine lets you ingest this data at scale and stream it directly into training loops without bottlenecking on the CPU.
Objective¶
Demonstrate a robotics data pipeline: ingest raw sensor data + video frames, store them with high-precision timestamps, and enable direct streaming to a GPU for training.
Prerequisites¶
- Deeplake SDK: pip install deeplake
- Robotics AI stack: pip install torch transformers pillow accelerate
- A Deeplake API token. Set credentials first.
Complete Code¶
import time
import torch
from PIL import Image
from deeplake import Client
from transformers import AutoModel, AutoProcessor
# 1. Setup GPU-Native Robotics Data Lake
client = Client()
# 2. Setup Multimodal Encoder (ColQwen3 for visual state)
MODEL_ID = "TomoroAI/tomoro-colqwen3-embed-4b"
device = "cuda" if torch.cuda.is_available() else "cpu"
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModel.from_pretrained(MODEL_ID, trust_remote_code=True, torch_dtype=torch.bfloat16).to(device)
def get_visual_embedding(image_path):
    """Generates a visual embedding for the given image."""
    img = Image.open(image_path).convert("RGB")
    inputs = processor.process_images(images=[img])
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.inference_mode():
        return model(**inputs).embeddings[0].cpu().float().numpy().tolist()
# Log a batch of robot telemetry steps
image_paths = ["frame_001.png", "frame_002.png"]
joint_data = [[0.1, 0.2, 0.3], [0.15, 0.25, 0.35]]
force_data = [[1.2, 0.5], [1.3, 0.6]]
visual_embs = [get_visual_embedding(p) for p in image_paths]
client.ingest("robot_telemetry", {
    "timestamp": [time.time(), time.time() + 0.1],
    "camera_rgb": image_paths,
    "joint_angles": joint_data,
    "forces": force_data,
    "visual_embedding": visual_embs,
    "is_anomaly": [False, False],
})
# 3. Retrieve Similar States (for Decision Planning)
# Search for episodes where the robot was in a similar visual state
def get_similar_episodes(query_image_path):
    # Embed the current visual state with the same encoder used at ingest time
    current_emb = get_visual_embedding(query_image_path)
    emb_pg = "{" + ",".join(str(x) for x in current_emb) + "}"
    # <#> is the negative-inner-product operator, so the most similar
    # states sort first in the default ascending order
    return client.query("""
        SELECT timestamp, camera_rgb, joint_angles
        FROM robot_telemetry
        ORDER BY visual_embedding <#> $1::float4[]
        LIMIT 3
    """, (emb_pg,))
# 4. Direct Streaming to GPU (Training Loop)
from torch.utils.data import DataLoader
ds = client.open_table("robot_telemetry")
dataloader = DataLoader(ds.pytorch(), batch_size=32, num_workers=4)
for batch in dataloader:
    # Tensors are streamed directly toward GPU memory
    # train_policy(batch["camera_rgb"], batch["joint_angles"])
    print(f"Batch loaded: {len(batch['timestamp'])} steps")
    break
# Requires: export DEEPLAKE_API_KEY="..." (see quickstart)
# Requires: export DEEPLAKE_ORG_ID="your-org-id"
# Requires: export DEEPLAKE_WORKSPACE="your-workspace"
API_URL="https://api.deeplake.ai"
TABLE="robot_telemetry"
# 1. Create the multimodal schema for a robotics data lake
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $DEEPLAKE_API_KEY" \
-H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
-d '{
"query": "CREATE TABLE IF NOT EXISTS \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (id BIGSERIAL PRIMARY KEY, timestamp FLOAT8, camera_rgb_id UUID, joint_angles FLOAT4[], forces FLOAT4[], is_anomaly BOOLEAN) USING deeplake"
}'
# 2. Insert robot sensor metadata
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $DEEPLAKE_API_KEY" \
-H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
-d '{
"query": "INSERT INTO \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (timestamp, camera_rgb_id, joint_angles, forces, is_anomaly) VALUES (1700000001.0, $1::uuid, $2::float4[], $3::float4[], false)",
"params": ["550e8400-e29b-41d4-a716-446655440000", "{0.1,0.2,0.3}", "{1.2,0.5}"]
}'
# 3. Query recent anomalies
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $DEEPLAKE_API_KEY" \
-H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
-d '{
"query": "SELECT timestamp, joint_angles, forces FROM \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" WHERE is_anomaly = true ORDER BY timestamp DESC LIMIT 10"
}'
Step-by-Step Breakdown¶
1. GPU-Native Data Flow¶
Traditionally, robotics data is stored in slow file systems and moved to GPUs during training, creating a massive bottleneck. Deeplake stores data in a format that GPUs can read directly, enabling the high-throughput training required for modern foundation models in robotics.
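As a minimal sketch of this streaming pattern, the snippet below uses a synthetic TensorDataset in place of the table's ds.pytorch() loader; pinned host memory plus non_blocking copies let host-to-GPU transfers overlap with compute:

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

frames = torch.randn(128, 3, 64, 64)       # stand-in for camera_rgb frames
joints = torch.randn(128, 3)               # stand-in for joint_angles

loader = DataLoader(
    TensorDataset(frames, joints),
    batch_size=32,
    pin_memory=torch.cuda.is_available(),  # page-locked host buffers
)

device = "cuda" if torch.cuda.is_available() else "cpu"
for cam, ang in loader:
    # non_blocking=True overlaps the copy with GPU compute when pinned
    cam = cam.to(device, non_blocking=True)
    ang = ang.to(device, non_blocking=True)
    break
```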
2. Temporal Precision¶
By using FLOAT8 for timestamps and indexed metadata columns, you can perform range queries (e.g., WHERE timestamp BETWEEN X and Y) to retrieve specific "episodes" of robot movement for fine-tuning.
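For example, an episode can be retrieved with a timestamp range query; episode_window below is a hypothetical helper added for illustration, and the commented query follows the same Postgres-style SQL used elsewhere in this guide:

```python
def episode_window(start_ts: float, duration_s: float) -> tuple:
    """Return the (start, end) timestamp bounds of an episode."""
    return (start_ts, start_ts + duration_s)

lo, hi = episode_window(1700000001.0, 2.5)

# Range query over the FLOAT8 timestamp column:
# rows = client.query(
#     "SELECT timestamp, camera_rgb, joint_angles "
#     "FROM robot_telemetry WHERE timestamp BETWEEN $1 AND $2 "
#     "ORDER BY timestamp",
#     (lo, hi),
# )
```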
3. Native Multimodal Support¶
Deeplake managed tables can store raw binary data (images/video) alongside structured sensor arrays (FLOAT4[]). This ensures that your video frames are perfectly synchronized with your telemetry data, a requirement for imitation learning.
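One simple way to guard this synchronization at ingest time is to check that every modality column carries the same number of steps before calling client.ingest(); check_aligned is a hypothetical helper, not part of the SDK:

```python
def check_aligned(batch: dict) -> int:
    """Verify all modality columns have the same number of steps,
    so frames stay synchronized with telemetry. Returns the count."""
    lengths = {k: len(v) for k, v in batch.items()}
    if len(set(lengths.values())) != 1:
        raise ValueError(f"modality length mismatch: {lengths}")
    return next(iter(lengths.values()))

n_steps = check_aligned({
    "timestamp": [1700000001.0, 1700000001.1],
    "camera_rgb": ["frame_001.png", "frame_002.png"],
    "joint_angles": [[0.1, 0.2, 0.3], [0.15, 0.25, 0.35]],
})
```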
Performance Tips¶
- Normalization Workers: When ingesting thousands of sensor files, use normalization_workers=8 in client.ingest() to parallelize file processing.
- Buffering: Use commit_every=5000 for high-frequency streams to balance peak performance against crash recovery.
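A sketch of the buffering tip, assuming the keyword arguments behave as described above; iter_commits and to_columns are hypothetical helpers used only for illustration:

```python
def iter_commits(steps, commit_every=5000):
    """Yield fixed-size chunks so each ingest call commits at most
    `commit_every` telemetry steps."""
    for i in range(0, len(steps), commit_every):
        yield steps[i:i + commit_every]

# for chunk in iter_commits(all_steps, commit_every=5000):
#     client.ingest("robot_telemetry", to_columns(chunk),
#                   normalization_workers=8)

chunk_sizes = [len(c) for c in iter_commits(list(range(12)), commit_every=5)]
```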
What to try next¶
- GPU-Streaming Guide: deep dive into feeding ML models.
- Massive Ingestion: how to handle petabyte-scale robotics data.
- Video Retrieval: search inside robot camera feeds.