Video Retrieval¶
Chunk long videos into clips, embed them with multi-vector models, and retrieve specific moments with MaxSim search.
What you'll build¶
- Split a video into time-based chunks with ffmpeg
- Upload each chunk to /files
- Store metadata + multi-vector embeddings in a table
- Retrieve clips by text query
Prerequisites¶
You need an API token and workspace, plus ffmpeg in your PATH and a multi-vector video encoder (e.g., ColQwen3-Video).
Code¶
Setup¶
import os
import subprocess

import requests

API_URL = "https://api.deeplake.ai"
TOKEN = "YOUR_TOKEN"
WORKSPACE = "YOUR_WORKSPACE"
TABLE = "video_chunks"

# JSON headers for query requests; bare auth headers for file endpoints,
# where requests sets the Content-Type itself (multipart or binary)
headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json",
}
auth_headers = {"Authorization": f"Bearer {TOKEN}"}

def query(sql):
    res = requests.post(
        f"{API_URL}/workspaces/{WORKSPACE}/tables/query",
        headers=headers,
        json={"query": sql},
    )
    res.raise_for_status()  # surface HTTP errors instead of parsing an error body
    return res.json()
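The steps below call encode_video and encode_text, which this guide leaves to your encoder. Here is a minimal sketch of the expected interface; the random stubs only demonstrate the shapes and should be swapped for a real model (DIM and n_vectors are placeholder assumptions, not part of this API):

import random

# Both encoders must return a list of per-token vectors (list[list[float]]),
# with every inner list the same width. Replace with your multi-vector model
# (e.g., ColQwen3-Video); these stubs exist only so the snippets run end to end.
DIM = 128  # assumed embedding width; match your model

def encode_video(path: str, n_vectors: int = 64) -> list[list[float]]:
    # Real implementation: sample frames, run the vision encoder,
    # and return one vector per visual token/patch
    return [[random.uniform(-1, 1) for _ in range(DIM)] for _ in range(n_vectors)]

def encode_text(text: str) -> list[list[float]]:
    # Real implementation: tokenize and return one vector per text token
    return [[random.uniform(-1, 1) for _ in range(DIM)] for _ in text.split()]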
Create the table¶
query(f"""
CREATE TABLE IF NOT EXISTS "{WORKSPACE}"."{TABLE}" (
id BIGSERIAL PRIMARY KEY,
video_id TEXT,
chunk_index INT,
start_s FLOAT4,
end_s FLOAT4,
embedding FLOAT4[][],
file_id UUID,
metadata JSONB
) USING deeplake
""")
Chunk a video with ffmpeg¶
def chunk_video(video_path, chunk_duration_s=20, output_dir="/tmp/chunks"):
    os.makedirs(output_dir, exist_ok=True)
    # Probe the total duration in seconds
    cmd = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", video_path,
    ]
    duration = float(subprocess.check_output(cmd).strip())
    chunks = []
    start = 0.0
    idx = 0
    while start < duration:
        end = min(start + chunk_duration_s, duration)
        out_path = os.path.join(output_dir, f"chunk_{idx:04d}.mp4")
        # -c copy avoids re-encoding but cuts on keyframes, so chunk
        # boundaries are approximate; drop it for frame-accurate cuts
        subprocess.run([
            "ffmpeg", "-y", "-ss", str(start), "-to", str(end),
            "-i", video_path, "-c", "copy", out_path,
        ], check=True, capture_output=True)
        chunks.append({"path": out_path, "index": idx, "start": start, "end": end})
        start = end
        idx += 1
    return chunks
Upload and ingest¶
video_path = "warehouse_cam1.mp4"
chunks = chunk_video(video_path, chunk_duration_s=20)

for chunk in chunks:
    # Upload chunk to /files
    with open(chunk["path"], "rb") as f:
        res = requests.post(
            f"{API_URL}/workspaces/{WORKSPACE}/files",
            headers=auth_headers,
            files={"file": (os.path.basename(chunk["path"]), f, "video/mp4")},
            timeout=120,
        )
    file_id = res.json().get("id")
    if not file_id:
        continue  # skip chunks whose upload failed

    # Compute multi-vector embedding (replace with your encoder)
    embedding = encode_video(chunk["path"])  # returns list[list[float]]

    # Format as a SQL array-of-arrays literal
    inner = ", ".join(
        "ARRAY[" + ",".join(str(v) for v in row) + "]"
        for row in embedding
    )
    emb_literal = f"ARRAY[{inner}]::float4[][]"

    # Insert one row per chunk
    query(f"""
    INSERT INTO "{WORKSPACE}"."{TABLE}"
        (video_id, chunk_index, start_s, end_s, embedding, file_id, metadata)
    VALUES (
        'warehouse_cam1', {chunk['index']}, {chunk['start']}, {chunk['end']},
        {emb_literal}, '{file_id}'::uuid,
        '{{"source": "warehouse", "camera": "cam1"}}'::jsonb
    )
    """)
Create indexes¶
# Vector index for similarity search
query(f"""
CREATE INDEX IF NOT EXISTS idx_video_vec
ON "{WORKSPACE}"."{TABLE}" USING deeplake_index (embedding DESC)
""")
Retrieve clips by text¶
search_text = "forklift moving pallets in aisle"
q_embedding = encode_text(search_text)  # returns list[list[float]]

# Format the query embedding as a SQL array-of-arrays literal
inner = ", ".join(
    "ARRAY[" + ",".join(str(v) for v in row) + "]"
    for row in q_embedding
)
emb_literal = f"ARRAY[{inner}]::float4[][]"

result = query(f"""
SELECT video_id, chunk_index, start_s, end_s, file_id,
       embedding <#> {emb_literal} AS score
FROM "{WORKSPACE}"."{TABLE}"
ORDER BY score DESC
LIMIT 5
""")

for row in result.get("rows", []):
    print(f"chunk {row[1]}: {row[2]:.1f}s - {row[3]:.1f}s score={row[5]:.4f}")
Download matched clips¶
for row in result.get("rows", []):
    file_id = row[4]
    res = requests.get(
        f"{API_URL}/workspaces/{WORKSPACE}/files/{file_id}/content",
        headers=auth_headers,
        stream=True,  # stream to disk instead of holding the whole clip in memory
    )
    res.raise_for_status()
    out_path = f"match_chunk_{row[1]}.mp4"
    with open(out_path, "wb") as f:
        for part in res.iter_content(chunk_size=1 << 20):
            f.write(part)
What to try next¶
- Retrieval to training — use retrieved clips as a training set
- Multimodal library — mix video with images and text
- Search fundamentals — all four search modes explained