Skip to content

Video Retrieval

Chunk long videos into clips, embed them with multi-vector models, and retrieve specific moments with MaxSim search.

What you'll build

  • Split a video into time-based chunks with ffmpeg
  • Upload each chunk to /files
  • Store metadata + multi-vector embeddings in a table
  • Retrieve clips by text query

Prerequisites

pip install requests

You also need ffmpeg in your PATH and a multi-vector encoder (e.g., ColQwen3-Video).

Code

Setup

import subprocess
import os
import requests

API_URL = "https://api.deeplake.ai"
TOKEN = "YOUR_TOKEN"
WORKSPACE = "YOUR_WORKSPACE"
TABLE = "video_chunks"

headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json",
}
auth_headers = {"Authorization": f"Bearer {TOKEN}"}

def query(sql):
    res = requests.post(
        f"{API_URL}/workspaces/{WORKSPACE}/tables/query",
        headers=headers,
        json={"query": sql},
    )
    return res.json()

Create the table

query(f"""
    CREATE TABLE IF NOT EXISTS "{WORKSPACE}"."{TABLE}" (
        id BIGSERIAL PRIMARY KEY,
        video_id TEXT,
        chunk_index INT,
        start_s FLOAT4,
        end_s FLOAT4,
        embedding FLOAT4[][],
        file_id UUID,
        metadata JSONB
    ) USING deeplake
""")

Chunk a video with ffmpeg

def chunk_video(video_path, chunk_duration_s=20, output_dir="/tmp/chunks"):
    os.makedirs(output_dir, exist_ok=True)
    cmd = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", video_path,
    ]
    duration = float(subprocess.check_output(cmd).strip())

    chunks = []
    start = 0.0
    idx = 0
    while start < duration:
        end = min(start + chunk_duration_s, duration)
        out_path = os.path.join(output_dir, f"chunk_{idx:04d}.mp4")
        subprocess.run([
            "ffmpeg", "-y", "-ss", str(start), "-to", str(end),
            "-i", video_path, "-c", "copy", out_path,
        ], capture_output=True)
        chunks.append({"path": out_path, "index": idx, "start": start, "end": end})
        start = end
        idx += 1

    return chunks

Upload and ingest

video_path = "warehouse_cam1.mp4"
chunks = chunk_video(video_path, chunk_duration_s=20)

for chunk in chunks:
    # Upload chunk to /files
    with open(chunk["path"], "rb") as f:
        res = requests.post(
            f"{API_URL}/workspaces/{WORKSPACE}/files",
            headers=auth_headers,
            files={"file": (os.path.basename(chunk["path"]), f, "video/mp4")},
            timeout=120,
        )
    file_id = res.json().get("id")
    if not file_id:
        continue

    # Compute multi-vector embedding (replace with your encoder)
    embedding = encode_video(chunk["path"])  # returns list[list[float]]

    # Format as SQL literal
    inner = ", ".join(
        "ARRAY[" + ",".join(str(v) for v in row) + "]"
        for row in embedding
    )
    emb_literal = f"ARRAY[{inner}]::float4[][]"

    # Insert
    query(f"""
        INSERT INTO "{WORKSPACE}"."{TABLE}"
        (video_id, chunk_index, start_s, end_s, embedding, file_id, metadata)
        VALUES (
            'warehouse_cam1', {chunk['index']}, {chunk['start']}, {chunk['end']},
            {emb_literal}, '{file_id}'::uuid,
            '{{"source": "warehouse", "camera": "cam1"}}'::jsonb
        )
    """)

Create indexes

# Vector index for similarity search
query(f"""
    CREATE INDEX IF NOT EXISTS idx_video_vec
    ON "{WORKSPACE}"."{TABLE}" USING deeplake_index (embedding DESC)
""")

Retrieve clips by text

search_text = "forklift moving pallets in aisle"
q_embedding = encode_text(search_text)  # returns list[list[float]]

inner = ", ".join(
    "ARRAY[" + ",".join(str(v) for v in row) + "]"
    for row in q_embedding
)
emb_literal = f"ARRAY[{inner}]::float4[][]"

result = query(f"""
    SELECT video_id, chunk_index, start_s, end_s, file_id,
           embedding <#> {emb_literal} AS score
    FROM "{WORKSPACE}"."{TABLE}"
    ORDER BY score DESC
    LIMIT 5
""")

for row in result.get("rows", []):
    print(f"chunk {row[1]}: {row[2]:.1f}s - {row[3]:.1f}s  score={row[5]:.4f}")

Download matched clips

for row in result.get("rows", []):
    file_id = row[4]
    res = requests.get(
        f"{API_URL}/workspaces/{WORKSPACE}/files/{file_id}/content",
        headers=auth_headers,
    )
    out_path = f"match_chunk_{row[1]}.mp4"
    with open(out_path, "wb") as f:
        f.write(res.content)

What to try next