
Building Multimodal Agent Workflows With Deep Lake PG

Modern agent systems hit a “data disconnect”: metadata lives in Postgres, embeddings live in a vector database, and files live somewhere else. Security and governance fragment across systems, and pipelines become brittle.

Deep Lake PG unifies that stack: Postgres tables for low-latency structured state plus lake-scale tensor/file storage. Everything remains SQL-addressable from a single control plane.

This guide is a production-minded walkthrough for multimodal ingestion and retrieval:

  • Upload files via /files (images/video) and visualize them in the web app frontend.
  • Store structured metadata (and optional embeddings) in Postgres tables.
  • Build Deeplake indexes for fast retrieval.

ColQwen is used as an example embedding model in some sections, but it is not a requirement. Any encoder that produces compatible embeddings works.

What You Will Build

  • A COCO table with image metadata, bounding boxes, and multi-vector embeddings.
  • A files store containing the raw image bytes, referenced by UUID.
  • A Deeplake index on a FLOAT4[][] column to enable fast multi-vector search.
  • A search query that ranks results by embedding similarity using <#>.

Use Cases

Use cases are not separate products. They are different workflows on the same primitives.

  • A useful way to think about it: store bytes in /files, store structure + vectors in tables, index what you query, and use SQL to turn retrieval into training.

  • Agentic Loops: Memory that keeps up with your agents. Hybrid BM25 + vector search across millions of conversations. Sub-8ms retrieval. Deterministic replay for debugging. Direct training streams with no ETL in between. Fewer moving parts. Faster iteration.

  • Physical AI: One database for the entire robotics stack. Video, LiDAR, IMU, tensors, embeddings in one place. Temporal and tensor-aware primitives built in. Query training data the same way logs are queried, without stitching together multiple systems.
  • Generative Media: Your library, indexed for creation. Unified multimodal storage across images, video, audio, and text. Semantic search that understands content. Find the right asset in milliseconds. “Shot like this” becomes a query.
  • Deep Analytics: Real-time analytics on data streams. Monitor performance, track usage, and optimize workflows.

Core Concepts

Multimodal Data (Text + Images + Video + Audio)

Real systems combine structured rows (identifiers, timestamps, metadata) with unstructured or high-dimensional content (files, tensors, embeddings).

Deep Lake PG keeps files and tables connected by stable identifiers, keeping governance and query semantics consistent.

Vector Search (Semantic Similarity)

Vector retrieval is useful when the query and the data do not share keywords. It matches meaning, not spelling.

Examples:

  • “same bug, different error message”
  • “scenes like this clip”
  • “documents that imply this requirement”

BM25 (Keyword Precision)

BM25 is useful when exact words matter. It is great for identifiers and phrases.

Examples:

  • stack traces and exception names
  • function names and endpoints
  • ticket IDs, file names, and product codes

Hybrid Search (BM25 + Vector)

Hybrid search is useful when you want reliability: it reduces both the “semantic drift” of pure vector search and the “keyword brittleness” of pure BM25.

In practice, hybrid is the best default. Use pure BM25 when you know the exact phrase. Use pure vector when the query is conceptual.

MaxSim / Late Interaction (Multi-Vector Embeddings)

Single-vector search compresses an item into one point. That is fast, but it can miss fine details.

MaxSim-style retrieval uses a “bag of embeddings”. It compares query tokens to the best-matching parts of the item. This is useful for structured visuals and long content.

Examples:

  • a specific action inside a video clip
  • a small object inside an image
  • a key sentence inside a long trace or document
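
For intuition, here is a minimal MaxSim sketch over two token-embedding matrices (PyTorch; the shapes match the (tokens, dim) embeddings used later in this guide). Each query token is matched against its best-scoring item token, and the per-token maxima are summed:

import torch

def maxsim(query_tokens: torch.Tensor, item_tokens: torch.Tensor) -> float:
    """Late-interaction score for (q_tokens, dim) x (i_tokens, dim) embeddings."""
    sims = query_tokens @ item_tokens.T          # (q_tokens, i_tokens) similarity matrix
    return sims.max(dim=1).values.sum().item()   # best match per query token, then sum

# Example: a 3-token query against a 5-token item, both 128-dimensional
score = maxsim(torch.randn(3, 128), torch.randn(5, 128))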

Turning Retrieval Into Training

The highest leverage workflow is: query → top‑k → train. You start with a natural-language intent. You retrieve only the relevant examples. You train on a small, high-signal slice.

This shortens iteration loops. It also makes training sets reproducible.

Install Dependencies

# Core Python deps used in the code examples below
pip install \
  pillow requests python-dotenv \
  pydantic pydantic-ai \
  deeplake "psycopg[binary]" psycopg-pool \
  tqdm huggingface_hub datasets \
  torch==2.8.0 torchvision==0.23.0 transformers==4.57.3 accelerate 

Video ingestion also requires an ffmpeg binary in your PATH. If you don’t have it, install it via your OS package manager.

Deep Lake PG In One Mental Model (Files, Tables, Indexes)

You will interact with three primitives.

  • Files: Binary blobs (images, PDFs, audio). You upload them once. You get a UUID.
  • Tables: Postgres tables with standard SQL columns and Deep Lake tensor columns.
  • Indexes: Deeplake indexes that speed up similarity search on tensor columns.

This split is deliberate. It keeps tables fast and queryable. It keeps files cheap and scalable.

Setup

Execution Model

  • Tables: created and queried via Postgres (Deep Lake PG / DLPG) using deeplake.db.
  • Files: uploaded via the HTTP /files API and referenced from tables using a file_id UUID.
  • Frontend Visualization: /files objects can be rendered in the web app frontend using file_id.

Environment Variables

Create a .env file:

BASE_URL=https://api-beta.deeplake.ai

# Org-scoped API token (used for /files upload/download).
# For the World Intelligence org, generate it here:
# https://beta.deeplake.ai/worldintelligence/apitoken
TEST_PG_MANAGED_TOKEN=...

# Workspace identifiers
ACTIVELOOP_ORG_ID=...
ACTIVELOOP_WORKSPACE_ID=...

# Direct Postgres / DLPG connection settings (tables, inserts, queries)
SHARED_DLPG_HOST=...
SHARED_DLPG_PORT=5432
SHARED_DLPG_USER=postgres
SHARED_DLPG_PASSWORD=...

# Optional toggles used by the examples
ENABLE_EMBEDDINGS=false
VIDEO_CHUNK_DURATION_S=30
INSERT_BATCH_SIZE=1
SKIP_EMBEDDINGS=false

A Minimal File Upload (And Frontend Preview)

This is the smallest end-to-end primitive: upload a PNG to /files, store the returned UUID, and render it in the frontend using that file_id.

import io
import os
import requests
from PIL import Image
from dotenv import load_dotenv

load_dotenv()

BASE_URL = os.getenv("BASE_URL", "https://api-beta.deeplake.ai")
TOKEN = os.getenv("TEST_PG_MANAGED_TOKEN")
ORG_ID = os.getenv("ACTIVELOOP_ORG_ID")
WORKSPACE_ID = os.getenv("ACTIVELOOP_WORKSPACE_ID")

img = Image.new("RGB", (256, 256), color=(40, 80, 200))
buf = io.BytesIO()
img.save(buf, format="PNG")

res = requests.post(
    f"{BASE_URL}/workspaces/{WORKSPACE_ID}/files",
    headers={"Authorization": f"Bearer {TOKEN}", "X-Activeloop-Org-Id": ORG_ID},
    files={"file": ("example.png", buf.getvalue(), "image/png")},
    timeout=60,
)
res.raise_for_status()  # surface upload errors early
file_id = res.json().get("id")
print("file_id:", file_id)

Use Case: COCO Image Ingestion And Multi-Vector Retrieval (Postgres + /files)

This section is runnable as-is and mirrors the ingestion shape in A_deeplake_api_coco_v2_PG.py:

  • Stream COCO from Hugging Face.
  • Upload images to /files.
  • Insert rows into a Postgres table (optionally with embeddings).
  • Build a Deeplake index and run a multi-vector <#> query.

import io
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional

import requests
from dotenv import load_dotenv
from PIL import Image

from deeplake.db import connect, ConnectionManager
from datasets import load_dataset

load_dotenv()

ORG_ID = os.getenv("ACTIVELOOP_ORG_ID")
WORKSPACE_ID = os.getenv("ACTIVELOOP_WORKSPACE_ID")
BASE_URL = os.getenv("BASE_URL", "https://api-beta.deeplake.ai")
TOKEN = os.getenv("TEST_PG_MANAGED_TOKEN")

DLPG_HOST = os.getenv("SHARED_DLPG_HOST")
DLPG_PORT = int(os.getenv("SHARED_DLPG_PORT", "5432"))
DLPG_USER = os.getenv("SHARED_DLPG_USER", "postgres")
DLPG_PASSWORD = os.getenv("SHARED_DLPG_PASSWORD")

ENABLE_EMBEDDINGS = os.getenv("ENABLE_EMBEDDINGS", "false").lower() in ("true", "1", "yes")

def get_connection() -> ConnectionManager:
    return connect(
        org_id=ORG_ID,
        workspace_id=WORKSPACE_ID,
        host=DLPG_HOST,
        port=DLPG_PORT,
        user=DLPG_USER,
        password=DLPG_PASSWORD,
    )

def run_query(conn: ConnectionManager, sql: str):
    with conn.get_connection() as pg_conn:
        with pg_conn.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall() if cur.description else []
        pg_conn.commit()
    return rows

def create_table(conn: ConnectionManager, table_name: str, schema: dict[str, str]) -> None:
    cols = ", ".join(f"{k} {v}" for k, v in schema.items())
    sql = f'CREATE TABLE IF NOT EXISTS "{WORKSPACE_ID}"."{table_name}" ({cols}) USING deeplake'
    with conn.get_connection() as pg_conn:
        with pg_conn.cursor() as cur:
            cur.execute(sql)
        pg_conn.commit()

def upload_image(pil_img: Image.Image, filename: str, max_retries: int = 3) -> Optional[str]:
    url = f"{BASE_URL}/workspaces/{WORKSPACE_ID}/files"
    buf = io.BytesIO()
    pil_img.save(buf, format="PNG")
    img_bytes = buf.getvalue()
    headers = {"Authorization": f"Bearer {TOKEN}", "X-Activeloop-Org-Id": ORG_ID}
    for attempt in range(max_retries):
        try:
            res = requests.post(url, headers=headers, files={"file": (filename, img_bytes, "image/png")}, timeout=60)
            if res.status_code in (200, 201):
                return res.json().get("id")
            if res.status_code in (429, 502, 503, 504):
                time.sleep((2 ** attempt) + 0.5)
                continue
            return None
        except requests.RequestException:
            time.sleep((2 ** attempt) + 0.5)
    return None

def upload_files_parallel(images: list[Image.Image], image_ids: list[int], max_workers: int = 8) -> list[Optional[str]]:
    file_ids: list[Optional[str]] = [None] * len(images)
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futs = {
            ex.submit(upload_image, images[i], f"coco_{image_ids[i]}.png"): i
            for i in range(len(images))
        }
        for fut in as_completed(futs):
            i = futs[fut]
            file_ids[i] = fut.result()
    return file_ids

def convert_bbox_to_coco(bbox_xyxy: list[float]) -> list[float]:
    xmin, ymin, xmax, ymax = bbox_xyxy
    return [xmin, ymin, xmax - xmin, ymax - ymin]

def main():
    table_name = "coco_dataset_v3_pg"
    conn = get_connection()

    schema: dict[str, str] = {
        "id": "BIGSERIAL PRIMARY KEY",
        "image_id": "BIGINT",
        "width": "INT",
        "height": "INT",
        "bboxes": "JSONB",
        "category_ids": "JSONB",
        "file_id": "UUID",
        "metadata": "JSONB",
    }
    if ENABLE_EMBEDDINGS:
        schema["colqwen_embedding"] = "FLOAT4[][]"

    create_table(conn, table_name, schema)

    qwen_encoder = None
    if ENABLE_EMBEDDINGS:
        import colqwen_encoder as qwen_encoder  # requires a local module

    ds = load_dataset("detection-datasets/coco", split="train", streaming=True)

    BATCH_SIZE = 12
    MAX_ROWS = 500  # increase for real ingestion

    batch_imgs: list[Image.Image] = []
    batch_meta: list[dict] = []
    processed = 0

    for item in ds:
        if MAX_ROWS is not None and processed >= MAX_ROWS:
            break
        img = item["image"].convert("RGB")

        raw = item.get("objects", {})
        bboxes = [convert_bbox_to_coco(b) for b in raw.get("bbox", [])]
        category_ids = list(raw.get("category", []))

        meta = {
            "image_id": int(item["image_id"]),
            "width": int(item["width"]),
            "height": int(item["height"]),
            "bboxes": bboxes,
            "category_ids": category_ids,
            "metadata": {"dataset": "coco", "original_id": int(item["image_id"])},
        }

        batch_imgs.append(img)
        batch_meta.append(meta)

        # Note: this minimal loop does not flush a final partial batch (< BATCH_SIZE).
        if len(batch_imgs) < BATCH_SIZE:
            continue

        image_ids = [m["image_id"] for m in batch_meta]
        file_ids = upload_files_parallel(batch_imgs, image_ids)

        embeddings = None
        if ENABLE_EMBEDDINGS and qwen_encoder is not None:
            embeddings = qwen_encoder.encode_images(batch_imgs)

        rows = []
        for i in range(len(batch_imgs)):
            if file_ids[i] is None:
                continue
            if ENABLE_EMBEDDINGS and embeddings is not None:
                rows.append((
                    batch_meta[i]["image_id"],
                    batch_meta[i]["width"],
                    batch_meta[i]["height"],
                    json.dumps(batch_meta[i]["bboxes"]),
                    json.dumps(batch_meta[i]["category_ids"]),
                    embeddings[i].tolist(),
                    file_ids[i],
                    json.dumps(batch_meta[i]["metadata"]),
                ))
            else:
                rows.append((
                    batch_meta[i]["image_id"],
                    batch_meta[i]["width"],
                    batch_meta[i]["height"],
                    json.dumps(batch_meta[i]["bboxes"]),
                    json.dumps(batch_meta[i]["category_ids"]),
                    file_ids[i],
                    json.dumps(batch_meta[i]["metadata"]),
                ))

        if ENABLE_EMBEDDINGS and embeddings is not None:
            sql = f'''
                INSERT INTO "{WORKSPACE_ID}"."{table_name}"
                (image_id, width, height, bboxes, category_ids, colqwen_embedding, file_id, metadata)
                VALUES (%s, %s, %s, %s::jsonb, %s::jsonb, %s::float4[][], %s::uuid, %s::jsonb)
            '''
        else:
            sql = f'''
                INSERT INTO "{WORKSPACE_ID}"."{table_name}"
                (image_id, width, height, bboxes, category_ids, file_id, metadata)
                VALUES (%s, %s, %s, %s::jsonb, %s::jsonb, %s::uuid, %s::jsonb)
            '''

        with conn.get_connection() as pg_conn:
            with pg_conn.cursor() as cur:
                cur.executemany(sql, rows)
            pg_conn.commit()

        processed += len(rows)
        batch_imgs, batch_meta = [], []

    if ENABLE_EMBEDDINGS:
        run_query(conn, f'CREATE INDEX IF NOT EXISTS idx_coco_colqwen_vec ON "{WORKSPACE_ID}"."{table_name}" USING deeplake_index (colqwen_embedding DESC)')

        q_text = "a photo of a city skyline"
        q_emb = qwen_encoder.encode_texts([q_text])[0].tolist()
        sql = f'''
            SELECT image_id, file_id,
                   colqwen_embedding <#> ARRAY{q_emb}::float4[][] AS score
            FROM "{WORKSPACE_ID}"."{table_name}"
            ORDER BY score DESC
            LIMIT 5
        '''
        print(run_query(conn, sql))

    conn.close()

if __name__ == "__main__":
    main()

Query Helpers (Single Query And Batch Query)

Use a single-query endpoint for DDL and ad-hoc queries. Use batch for inserts.

# JSON headers for the /tables query endpoints (reuses TOKEN / ORG_ID from above)
headers_json = {
    "Authorization": f"Bearer {TOKEN}",
    "X-Activeloop-Org-Id": ORG_ID,
    "Content-Type": "application/json",
}

def run_query(query: str, params: list | None = None) -> dict:
    url = f"{BASE_URL}/workspaces/{WORKSPACE_ID}/tables/query"
    payload = {"query": query}
    if params is not None:
        payload["params"] = params
    res = requests.post(url, headers=headers_json, json=payload)
    return res.json() if res.status_code == 200 else {}

def batch_insert(query: str, params_batch: list[list], timeout_s: int = 120) -> dict:
    url = f"{BASE_URL}/workspaces/{WORKSPACE_ID}/tables/query/batch"
    payload = {"query": query, "params_batch": params_batch}
    res = requests.post(url, headers=headers_json, json=payload, timeout=timeout_s)
    return res.json() if res.status_code == 200 else {}

Load COCO In Streaming Mode (No Local Dataset Explosion)

Streaming keeps memory stable. It also matches how you ingest lake-scale data.

from datasets import load_dataset

dataset = load_dataset("detection-datasets/coco", split="train", streaming=True)

You will still do batching. You control throughput. You control cost.

Compute Embeddings (Multi-Vector / Late Interaction)

ColQwen returns embeddings shaped like (batch, tokens, dim). This is a “bag of vectors”. It enables MaxSim-style scoring. It improves retrieval for structured visuals.

You need an embedding model that outputs multi-vector embeddings. Any compatible encoder works.

emb_img = encode_images([pil_image])[0]  # (tokens, dim)
emb_txt = encode_texts(["a city skyline"])[0]  # (tokens, dim)

Keep embeddings as float32 before sending them. It is predictable. It is portable.

Upload Images (Files API) And Store UUIDs

Files are uploaded with multipart form-data. Do not send JSON Content-Type.

import io
import requests
from PIL import Image

# Multipart headers: auth + org only; requests sets the multipart Content-Type itself.
headers_upload = {"Authorization": f"Bearer {TOKEN}", "X-Activeloop-Org-Id": ORG_ID}

def upload_image(pil_img: Image.Image, filename: str) -> str | None:
    url = f"{BASE_URL}/workspaces/{WORKSPACE_ID}/files"
    buf = io.BytesIO()
    pil_img.save(buf, format="PNG")
    files = {"file": (filename, buf.getvalue(), "image/png")}

    res = requests.post(url, headers=headers_upload, files=files, timeout=60)
    if res.status_code in (200, 201):
        out = res.json()
        return out.get("id")
    return None

This UUID is your stable join key between “lake files” and “SQL rows”.

Ingestion Strategy (Batching, Parallelism, And Backpressure)

You have three expensive steps:

  • Embedding compute: GPU-bound.
  • File upload: network-bound.
  • SQL insert: service-bound.

Treat them as a pipeline. Measure each stage. Tune each stage.

One mental model helps:

  • Your dataset is a table.
  • Your storage is /files.
  • Your training set is a query.

Good defaults (for a laptop + one GPU):

  • Batch size: 8–32 images for embedding compute.
  • Upload workers: 4–8 threads.
  • Insert batch: 8–64 rows per batch call.

Keep the code simple. Keep loops simple. Prefer correctness over peak throughput.

If a runnable starting point is needed, the example below includes:

  • Retry + exponential backoff for transient API errors.
  • Parallel file uploads with a bounded thread pool.
  • A stable schema for COCO + multi-vector embeddings.
  • A working <#> query for multi-vector search.

A Minimal Ingestion Loop (Readable, Not Fastest)

Install dependencies for this block:

pip install torch==2.8.0 torchvision==0.23.0 transformers==4.57.3 accelerate pillow

# Optional: faster attention for some GPU setups
# pip install flash-attn
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import torch
from transformers import AutoModel, AutoProcessor

# ColQwen3 is a strong default for multi-vector embeddings.
# It returns (tokens, dim) embeddings for both text and images.
MODEL_ID = "TomoroAI/tomoro-colqwen3-embed-4b"
DTYPE = torch.bfloat16
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

_processor = None
_model = None

def _get_model_and_processor():
    global _processor, _model
    if _model is None:
        _processor = AutoProcessor.from_pretrained(
            MODEL_ID,
            trust_remote_code=True,
            max_num_visual_tokens=1280,
        )
        _model = AutoModel.from_pretrained(
            MODEL_ID,
            torch_dtype=DTYPE,
            trust_remote_code=True,
            # flash_attention_2 requires the optional flash-attn package; "sdpa" otherwise
            attn_implementation="flash_attention_2" if DEVICE == "cuda" else "sdpa",
            device_map=DEVICE,
        ).eval()
    return _model, _processor

def encode_images(images, batch_size: int = 4):
    """Return list of embeddings shaped (tokens, dim)."""
    model, processor = _get_model_and_processor()
    outputs = []
    for start in range(0, len(images), batch_size):
        batch_imgs = images[start : start + batch_size]
        feats = processor.process_images(images=batch_imgs)
        feats = {k: v.to(DEVICE) if isinstance(v, torch.Tensor) else v for k, v in feats.items()}
        with torch.inference_mode():
            out = model(**feats)
            vecs = out.embeddings.to(torch.float32).cpu()
        outputs.extend(vecs)
    return outputs

def encode_texts(texts, batch_size: int = 16):
    """Return list of embeddings shaped (tokens, dim)."""
    model, processor = _get_model_and_processor()
    outputs = []
    for start in range(0, len(texts), batch_size):
        batch = processor.process_texts(texts=texts[start : start + batch_size])
        batch = {k: v.to(DEVICE) for k, v in batch.items()}
        with torch.inference_mode():
            out = model(**batch)
            vecs = out.embeddings.to(torch.float32).cpu()
        outputs.extend(vecs)
    return outputs

BATCH_SIZE = 16
UPLOAD_WORKERS = 8

def process_one_batch(batch_items: list[dict]):
    # 1) Prepare images and metadata.
    images = []
    meta = []
    for item in batch_items:
        img = item["image"].convert("RGB")
        images.append(img)

        raw = item.get("objects", {})
        bboxes = [convert_bbox_to_coco(b) for b in raw.get("bbox", [])]
        category_ids = list(raw.get("category", []))

        meta.append({
            "image_id": item["image_id"],
            "width": item["width"],
            "height": item["height"],
            "bboxes_json": json.dumps(bboxes),
            "category_ids_json": json.dumps(category_ids),
            "metadata_json": json.dumps({"dataset": "coco", "original_id": item["image_id"]}),
        })

    # 2) Compute embeddings (GPU-bound).
    emb = encode_images(images)  # list[torch.Tensor(tokens, dim)]

    # 3) Upload files (network-bound).
    file_ids: list[str | None] = [None] * len(images)
    with ThreadPoolExecutor(max_workers=UPLOAD_WORKERS) as ex:
        futures = {
            ex.submit(upload_image, images[i], f'coco_{meta[i]["image_id"]}.png'): i
            for i in range(len(images))
        }
        for fut in as_completed(futures):
            i = futures[fut]
            file_ids[i] = fut.result()

    # 4) Insert rows (service-bound).
    for i in range(len(images)):
        emb_literal = format_2d_array(emb[i].tolist())
        sql = insert_sql.replace("{EMB}", emb_literal)
        params = [[
            meta[i]["image_id"],
            meta[i]["width"],
            meta[i]["height"],
            meta[i]["bboxes_json"],
            meta[i]["category_ids_json"],
            file_ids[i],
            meta[i]["metadata_json"],
        ]]
        batch_insert(sql, params)

        # A small delay prevents bursty overload.
        time.sleep(0.05)

Convert Bbox To COCO Style

COCO boxes are [x, y, width, height].

def convert_bbox_to_coco(bbox_xyxy: list[float]) -> list[float]:
    xmin, ymin, xmax, ymax = bbox_xyxy
    return [xmin, ymin, xmax - xmin, ymax - ymin]
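
A quick sanity check of the conversion (xyxy in, xywh out):

# [xmin, ymin, xmax, ymax] -> [x, y, width, height]
assert convert_bbox_to_coco([10.0, 20.0, 110.0, 220.0]) == [10.0, 20.0, 100.0, 200.0]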

Insert Rows (And Format Multi-Vector Arrays Safely)

Most APIs accept tensors as JSON lists. SQL still needs typing.

Two safe options exist:

  • Pass the embedding as a JSON list and cast server-side.
  • Inline the embedding as an ARRAY[...]::float4[][] literal.

The end-to-end example uses the literal approach. Here is a clean formatter.

def format_2d_array(arr_2d: list[list[float]]) -> str:
    inner = ", ".join(
        "ARRAY[" + ", ".join(str(v) for v in row) + "]" for row in arr_2d
    )
    return f"ARRAY[{inner}]::float4[][]"

Then insert. Keep bboxes and category_ids as JSON strings. Cast to jsonb.

insert_sql = (
    f'INSERT INTO "{WORKSPACE_ID}"."{table_name}" '
    f"(image_id, width, height, bboxes, category_ids, colqwen_embedding, file_id, metadata) "
    f"VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, {{EMB}}, $6::uuid, $7::jsonb)"
)

You will substitute {EMB} per row because it is a literal. That is okay for small batches. For very large ingestion, consider a server-side cast approach.

Verify Ingestion Quickly

Run a count query. Do it early. It catches most mistakes.

count = run_query(f'SELECT COUNT(*) FROM "{WORKSPACE_ID}"."{table_name}"')
print(count.get("rows", [[0]])[0][0])

Build The Deeplake Index (Index-On-The-Lake)

Deep Lake PG emphasizes indexed queries that can run directly over object storage. This is presented as “Index-On-The-Lake” in the announcement blog post.

Create an index on the multi-vector column.

run_query(
    f'CREATE INDEX idx_coco_colqwen_vec '
    f'ON "{WORKSPACE_ID}"."{table_name}" '
    f"USING deeplake_index (colqwen_embedding DESC);"
)

Index creation is a one-time cost. It pays back at query time.

Search With MaxSim-Style Scoring (<#> On Multi-Vector)

Multi-vector search needs a different scoring rule than cosine-on-one-vector.

The operator <#> is used as the “embedding distance / similarity” primitive. With multi-vector tensors, it behaves like a late-interaction scorer (MaxSim-like behavior).

Encode the query.

query_text = "a photo of a city skyline"
query_emb = encode_texts([query_text])[0].tolist()  # (tokens, dim) as nested lists

Run the search.

maxsim_sql = f"""
SELECT image_id, width, height, bboxes, category_ids,
       colqwen_embedding <#> ARRAY{query_emb}::float4[][] AS score
FROM "{WORKSPACE_ID}"."{table_name}"
ORDER BY score DESC
LIMIT 10;
"""

res = run_query(maxsim_sql)
for row in res.get("rows", []):
    image_id = row[0]
    score = row[-1]
    print(image_id, score)

Then pull the files by file_id if your frontend needs images.
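
A minimal sketch of that step (assumes the run_query helper, headers_upload, and table_name from earlier in this guide):

for row in res.get("rows", []):
    image_id = row[0]
    lookup = run_query(
        f'SELECT file_id FROM "{WORKSPACE_ID}"."{table_name}" WHERE image_id = {int(image_id)} LIMIT 1'
    )
    rows = lookup.get("rows") or [[None]]
    file_id = rows[0][0]
    if not file_id:
        continue
    dl = requests.get(
        f"{BASE_URL}/workspaces/{WORKSPACE_ID}/files/{file_id}/download",
        headers=headers_upload,
        timeout=60,
    )
    if dl.status_code == 200:
        with open(f"coco_{image_id}.png", "wb") as f:
            f.write(dl.content)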

Practical Filtering Patterns (Why JSONB Is Useful)

You can filter before ranking. You can reduce compute. You can reduce cost.

Examples:

  • Filter by image shape.
  • Filter by category id.
  • Filter by metadata flags.

SELECT image_id,
       colqwen_embedding <#> ARRAY[...]::float4[][] AS score
FROM "workspace"."coco_multi_modal_search"
WHERE width >= 640 AND height >= 480
ORDER BY score DESC
LIMIT 10;

This is a key “SQL-first retrieval” advantage.

Reliability Checklist (Things That Break First)

  • Wrong headers for uploads: Do not send Content-Type: application/json with multipart.
  • Missing org header: Always send X-Activeloop-Org-Id.
  • Embedding dtype drift: Normalize to float32 before serialization.
  • Array literal bugs: Ensure nested arrays are consistent. Avoid None values.
  • Rate limits: Use retry with exponential backoff for 429/502/503/504.

The end-to-end example already includes pragmatic retry logic. Reuse it.

Use Case: Video Ingestion And Retrieval (Physical AI)

This section demonstrates a video workflow with chunking, embedding, indexing, retrieval, and training set selection.

This is the workflow:

  • Download videos from Hugging Face.
  • Chunk videos into time windows.
  • Embed each chunk with ColQwen3-Video.
  • Upload each chunk to /files.
  • Insert metadata + embeddings into a table.
  • Create indexes.
  • Retrieve only the few chunks relevant to a training objective.

This turns “collect logs” into “query training data”.

Why Chunk Videos

Long videos are hard to index and label. Chunking provides time-aware retrieval, making the unit of supervision a single clip.

The ingestion process uses ffmpeg with stream copy (-c copy) to avoid re-encoding.

Control the process with environment variables:

VIDEO_CHUNK_DURATION_S=20
INSERT_BATCH_SIZE=1
SKIP_EMBEDDINGS=false

Set VIDEO_CHUNK_DURATION_S=0 to disable chunking.
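
A minimal sketch of the chunking step (assumes ffmpeg is on PATH; with -c copy, segment boundaries snap to keyframes, so chunk durations are approximate):

import subprocess
from pathlib import Path

def chunk_video(src: Path, out_dir: Path, chunk_s: int = 30) -> list[Path]:
    """Split a video into ~chunk_s second segments without re-encoding."""
    out_dir.mkdir(parents=True, exist_ok=True)
    pattern = str(out_dir / f"{src.stem}_chunk_%03d.mp4")
    subprocess.run(
        [
            "ffmpeg", "-y", "-i", str(src),
            "-c", "copy",                      # stream copy: no re-encoding
            "-f", "segment",
            "-segment_time", str(chunk_s),
            "-reset_timestamps", "1",
            pattern,
        ],
        check=True,
    )
    return sorted(out_dir.glob(f"{src.stem}_chunk_*.mp4"))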

Postgres-First Tables

Metadata and embeddings are stored in Postgres (Deep Lake PG) using deeplake.db. Binary assets (MP4 chunks) are stored in the HTTP /files API and referenced via file_id UUIDs.

Download Videos From Hugging Face (Smart Spaces)

The example dataset is nvidia/PhysicalAI-SmartSpaces. Chunks are downloaded directly from the repository.

from huggingface_hub import HfApi, hf_hub_download
from pathlib import Path

DATASET_REPO = "nvidia/PhysicalAI-SmartSpaces"
DATASET_BASE = "MTMC_Tracking_2024/train"

api = HfApi()
scenes = [
    item
    for item in api.list_repo_tree(
        DATASET_REPO, repo_type="dataset", path_in_repo=DATASET_BASE
    )
    if hasattr(item, "path") and "scene_" in item.path
]

video_paths: list[Path] = []
metadata_items: list[dict] = []

max_videos = 1
count = 0

for scene in scenes:
    if count >= max_videos:
        break
    scene_name = scene.path.split("/")[-1]
    cameras = list(api.list_repo_tree(DATASET_REPO, repo_type="dataset", path_in_repo=scene.path))
    for cam in cameras:
        if count >= max_videos:
            break
        cam_name = cam.path.split("/")[-1]
        if not cam_name.startswith("camera_"):
            continue

        video_repo_path = f"{cam.path}/video.mp4"
        local_path = hf_hub_download(
            repo_id=DATASET_REPO,
            repo_type="dataset",
            filename=video_repo_path,
        )
        video_paths.append(Path(local_path))
        metadata_items.append(
            {"video_id": f"vid_{count}", "scene_name": scene_name, "camera_id": cam_name}
        )
        count += 1

Embed Videos With ColQwen3-Video

This step is optional. If an embedding encoder is not used, set SKIP_EMBEDDINGS=true.

from pathlib import Path
import colqwen_video_encoder as ve  # requires a local module (analogous to colqwen_encoder above)

emb = ve.encode_videos([Path("chunk.mp4")])[0]  # torch.Tensor(tokens, dim)
q = ve.encode_texts(["robot arm picking up an object"])[0].tolist()  # nested lists

Create A Table For Chunk-Level Storage

The schema is designed for time-aware retrieval and training.

table_name = "api_test_videos_colqwen_v2_pg"
schema = {
    "id": "BIGSERIAL PRIMARY KEY",
    "video_id": "TEXT",
    "scene_name": "TEXT",
    "chunk_index": "INT",
    "start_s": "FLOAT4",
    "end_s": "FLOAT4",
    "colqwen_embedding": "FLOAT4[][]",
    "file_id": "UUID",
    "metadata": "JSONB",
}

Minimal Postgres Helpers (DLPG Via deeplake.db)

Table operations use Postgres connectivity.

import os
from dotenv import load_dotenv
from deeplake.db import connect, ConnectionManager

load_dotenv()

ORG_ID = os.getenv("ACTIVELOOP_ORG_ID")
WORKSPACE_ID = os.getenv("ACTIVELOOP_WORKSPACE_ID")
DLPG_HOST = os.getenv("SHARED_DLPG_HOST")
DLPG_PORT = int(os.getenv("SHARED_DLPG_PORT", "5432"))
DLPG_USER = os.getenv("SHARED_DLPG_USER", "postgres")
DLPG_PASSWORD = os.getenv("SHARED_DLPG_PASSWORD")

def get_connection() -> ConnectionManager:
    return connect(
        org_id=ORG_ID,
        workspace_id=WORKSPACE_ID,
        host=DLPG_HOST,
        port=DLPG_PORT,
        user=DLPG_USER,
        password=DLPG_PASSWORD,
    )

def create_table(conn: ConnectionManager, table_name: str, schema: dict[str, str]) -> None:
    cols = ", ".join(f"{k} {v}" for k, v in schema.items())
    sql = f'CREATE TABLE IF NOT EXISTS "{WORKSPACE_ID}"."{table_name}" ({cols}) USING deeplake'
    with conn.get_connection() as pg_conn:
        with pg_conn.cursor() as cur:
            cur.execute(sql)
        pg_conn.commit()

Upload Chunks To /files (HTTP) And Store UUIDs In Postgres

Binary MP4 bytes are uploaded via HTTP multipart requests.

import io
import os
import requests
from typing import Optional

BASE_URL = os.getenv("BASE_URL", "https://api-beta.deeplake.ai")
TOKEN = os.getenv("TEST_PG_MANAGED_TOKEN")

def upload_video_file(video_bytes: bytes, filename: str) -> Optional[str]:
    url = f"{BASE_URL}/workspaces/{WORKSPACE_ID}/files"
    headers = {"Authorization": f"Bearer {TOKEN}", "X-Activeloop-Org-Id": ORG_ID}
    buffer = io.BytesIO(video_bytes)
    files = {"file": (filename, buffer, "video/mp4")}
    res = requests.post(url, headers=headers, files=files, timeout=120)
    return res.json().get("id") if res.status_code in (200, 201) else None
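
A minimal sketch of the matching row insert (assumes the get_connection() helper above and, when embeddings are enabled, the format_2d_array() formatter from the COCO section):

import json

def insert_chunk_row(conn, table_name: str, chunk: dict, file_id: str, emb=None) -> None:
    """Store one chunk's metadata (and optional multi-vector embedding) in Postgres."""
    emb_literal = format_2d_array(emb.tolist()) if emb is not None else "NULL"
    sql = f'''
        INSERT INTO "{WORKSPACE_ID}"."{table_name}"
        (video_id, scene_name, chunk_index, start_s, end_s, colqwen_embedding, file_id, metadata)
        VALUES (%s, %s, %s, %s, %s, {emb_literal}, %s::uuid, %s::jsonb)
    '''
    with conn.get_connection() as pg_conn:
        with pg_conn.cursor() as cur:
            cur.execute(sql, (
                chunk["video_id"], chunk["scene_name"], chunk["chunk_index"],
                chunk["start_s"], chunk["end_s"], file_id, json.dumps(chunk.get("metadata", {})),
            ))
        pg_conn.commit()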

Indexing: Vector Retrieval + Exact Text Filtering

Deeplake indexes provide fast similarity search and text filtering.

with get_connection().get_connection() as pg_conn:
    with pg_conn.cursor() as cur:
        cur.execute(
            f'CREATE INDEX IF NOT EXISTS idx_video_vec ON "{WORKSPACE_ID}"."{table_name}" '
            f"USING deeplake_index (colqwen_embedding DESC)"
        )
        cur.execute(
            f'CREATE INDEX IF NOT EXISTS idx_video_scene ON "{WORKSPACE_ID}"."{table_name}" '
            "USING deeplake_index (scene_name) WITH (index_type = 'exact_text')"
        )
    pg_conn.commit()

Retrieval For Fine-Tuning

Training sets are selected by retrieving top-k chunks matching a learning objective.

search_queries = [
    "robot arm picking up an object from a table",
    "autonomous vehicle navigating through obstacles",
    "robotic gripper manipulating small components",
    "human-robot collaboration in assembly task",
    "drone footage of indoor warehouse navigation",
]

retrieved_chunks = []
conn = get_connection()
for q_text in search_queries:
    q_emb = ve.encode_texts([q_text])[0].tolist()  # (tokens, dim) nested lists
    sql = f"""
    SELECT video_id, scene_name, chunk_index, start_s, end_s, file_id,
           colqwen_embedding <#> ARRAY{q_emb}::float4[][] AS score
    FROM "{WORKSPACE_ID}"."{table_name}"
    ORDER BY score DESC
    LIMIT 3;
    """
    with conn.get_connection() as pg_conn:
        with pg_conn.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            pg_conn.commit()
    for (video_id, scene_name, chunk_idx, start_s, end_s, file_id, score) in rows:
        retrieved_chunks.append({
            "query": q_text,
            "video_id": video_id,
            "scene_name": scene_name,
            "chunk_index": chunk_idx,
            "start_s": start_s,
            "end_s": end_s,
            "file_id": file_id,
            "score": score,
        })

Download Retrieved Clips

Clips are retrieved directly from /files for inclusion in a training set.

from pathlib import Path

training_dir = Path("/tmp/training_chunks")
training_dir.mkdir(parents=True, exist_ok=True)

for item in retrieved_chunks:
    file_id = item["file_id"]
    if not file_id:
        continue
    url = f"{BASE_URL}/workspaces/{WORKSPACE_ID}/files/{file_id}/download"
    res = requests.get(url, headers={"Authorization": f"Bearer {TOKEN}", "X-Activeloop-Org-Id": ORG_ID})
    if res.status_code == 200:
        out = training_dir / f'{item["video_id"]}_chunk{item["chunk_index"]}.mp4'
        out.write_bytes(res.content)

Notes For Generative Media

The same retrieval loop applies to creative libraries. Semantic search facilitates finding assets based on descriptive intent (e.g., "shot like this, but at night").

Use Case: Agentic Loops (Traces, Memory, And Retrieval)

This section demonstrates traces upload and multi-agent memory workflows.

This use case covers three concrete building blocks:

  • Traces ingestion into a durable Postgres table (JSONB conversations).
  • A “memory table” with embedding + BM25 indexes for retrieval.
  • Multi-agent tools that query memory before responding.

Upload Agent Trajectories (SWE Traces)

The SWE traces dataset comes from Hugging Face. Each row contains a full conversation trace stored as JSONB.

The reference schema:

schema = {
    "id": "BIGSERIAL PRIMARY KEY",
    "agent": "TEXT",
    "model": "TEXT",
    "model_provider": "TEXT",
    "date": "TEXT",
    "task": "TEXT",
    "episode": "TEXT",
    "run_id": "TEXT",
    "trial_name": "TEXT",
    "conversations": "JSONB",
}

The ingestion loop is designed to be operationally stable:

  • It resumes based on COUNT(*).
  • It uses batch inserts.
  • It adapts batch size on transient failures.

This is the core loop shape:

from datasets import load_dataset
import json

table_name = "swe_agent_trajectories2"
ds = load_dataset(
    "DCAgent/neulab-nebius-swe-agent-trajectories-sandboxes-traces-terminus-2",
    split="train",
)

insert_query = (
    f'INSERT INTO "{table_name}" '
    f"(agent, model, model_provider, date, task, episode, run_id, trial_name, conversations) "
    f"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb)"
)
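
A minimal sketch of the rest of the loop (it resumes from COUNT(*) and batches inserts; the adaptive batch sizing from the reference is omitted, the field names assume the dataset columns match the schema above, and run_query / batch_insert are the HTTP helpers shown earlier):

existing = run_query(f'SELECT COUNT(*) FROM "{table_name}"')
start = int(existing.get("rows", [[0]])[0][0])  # resume point

BATCH = 32
params_batch: list[list] = []
for i, row in enumerate(ds):
    if i < start:
        continue  # already ingested on a previous run
    params_batch.append([
        row.get("agent"), row.get("model"), row.get("model_provider"), row.get("date"),
        row.get("task"), row.get("episode"), row.get("run_id"), row.get("trial_name"),
        json.dumps(row.get("conversations")),
    ])
    if len(params_batch) >= BATCH:
        batch_insert(insert_query, params_batch)
        params_batch = []

if params_batch:
    batch_insert(insert_query, params_batch)  # flush the final partial batch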

This provides a durable source table treated as append-only.

Create A Memory Table (Copy + Embedding Column)

The source table is rich but not indexed for semantic retrieval. A “memory table” is created for this purpose.

The reference pattern:

  • CREATE TABLE AS SELECT * FROM source LIMIT ...
  • ALTER TABLE ADD COLUMN embedding FLOAT4[]
  • Create vector + BM25 indexes

CREATE TABLE IF NOT EXISTS "swe_agent_memory_full" AS
SELECT * FROM "swe_agent_trajectories2" LIMIT 1000;

ALTER TABLE "swe_agent_memory_full"
ADD COLUMN IF NOT EXISTS embedding FLOAT4[];

Indexing For Retrieval (Vector + BM25)

Vector index:

CREATE INDEX IF NOT EXISTS idx_memory_full_vec
ON "workspace"."swe_agent_memory_full"
USING deeplake_index (embedding DESC);

BM25 index on the field for keyword search (e.g., task):

CREATE INDEX IF NOT EXISTS idx_memory_full_bm25
ON "workspace"."swe_agent_memory_full"
USING deeplake_index (task) WITH (index_type = 'bm25');

This provides two “fast paths,” which Hybrid search combines.

Generate Embeddings (OpenRouter) And Update Rows

OpenRouter embeddings are generated for the task text and written into the embedding column.

Environment variables:

# Deep Lake PG org token (used for HTTP APIs like /files, /tables/query)
ACTIVELOOP_ORG_ID=...
ACTIVELOOP_WORKSPACE_ID=...
BASE_URL=https://api-beta.deeplake.ai

# Direct Postgres connectivity (used for memory table, indexes, retrieval)
SHARED_DLPG_HOST=...
SHARED_DLPG_PORT=5432
SHARED_DLPG_USER=postgres
SHARED_DLPG_PASSWORD=...
SHARED_DLPG_DATABASE=postgres

# Embeddings + optional agent runtime
OPENROUTER_API_KEY=...      # Required for the embedding calls below
OPENROUTER_EMBED_MODEL=mistralai/codestral-embed-2505
OPENAI_API_KEY=...          # Optional (only if using Pydantic AI agents)

Python implementation for embedding tasks and updating the table:

import os
import requests
import psycopg2
from dotenv import load_dotenv

load_dotenv()

OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
OPENROUTER_MODEL = os.getenv("OPENROUTER_EMBED_MODEL", "mistralai/codestral-embed-2505")

def embed_batch(texts: list[str]) -> list[list[float]]:
    res = requests.post(
        "https://openrouter.ai/api/v1/embeddings",
        headers={"Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json"},
        json={"model": OPENROUTER_MODEL, "input": texts},
        timeout=60,
    )
    res.raise_for_status()
    data = res.json()
    return [item["embedding"] for item in data["data"]]

conn = psycopg2.connect(
    host=os.getenv("SHARED_DLPG_HOST"),
    port=os.getenv("SHARED_DLPG_PORT", "5432"),
    user=os.getenv("SHARED_DLPG_USER", "postgres"),
    password=os.getenv("SHARED_DLPG_PASSWORD"),
    database=os.getenv("SHARED_DLPG_DATABASE", "postgres"),
    sslmode="require",
)

TABLE = "swe_agent_memory_full"

with conn.cursor() as cur:
    cur.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake")
    conn.commit()

# Pull rows without embeddings
with conn.cursor() as cur:
    cur.execute(f'SELECT id, task FROM "{TABLE}" WHERE embedding IS NULL LIMIT 200')
    rows = cur.fetchall()

ids = [r[0] for r in rows]
tasks = [r[1] or "" for r in rows]

if tasks:
    embs = embed_batch(tasks)
    with conn.cursor() as cur:
        for _id, emb in zip(ids, embs):
            cur.execute(f'UPDATE "{TABLE}" SET embedding = %s WHERE id = %s', (emb, _id))
        conn.commit()

conn.close()

Retrieval Modes (BM25, Vector, Hybrid)

Retrieval modes:

  • BM25: exact strings, identifiers, error codes.
  • Vector: conceptual similarity and paraphrases.
  • Hybrid: combined lexical + semantic signal (default).

BM25 Search (Keyword Precision)

BM25 is used for exact matches on text columns like task.

SELECT task, episode, model,
       task <#> 'fix authentication bug' AS bm25_score
FROM "swe_agent_memory_full"
ORDER BY bm25_score ASC
LIMIT 10;

Python implementation:

import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()

conn = psycopg2.connect(
    host=os.getenv("SHARED_DLPG_HOST"),
    port=os.getenv("SHARED_DLPG_PORT", "5432"),
    user=os.getenv("SHARED_DLPG_USER", "postgres"),
    password=os.getenv("SHARED_DLPG_PASSWORD"),
    database=os.getenv("SHARED_DLPG_DATABASE", "postgres"),
    sslmode="require",
)

query_text = "fix authentication bug"

with conn.cursor() as cur:
    # Note: BM25 <#> operator requires string literal for the query
    cur.execute(f"""
        SELECT task, episode, model,
               task <#> '{query_text.replace("'", "''")}' AS bm25_score
        FROM "swe_agent_memory_full"
        ORDER BY bm25_score ASC
        LIMIT 10
    """)
    results = cur.fetchall()
    for row in results:
        print(row)

conn.close()

Vector Search (Semantic Similarity)

Vector search finds conceptual matches using embeddings.

SELECT task, episode, model,
       embedding <#> ARRAY[...]::float4[] AS similarity
FROM "swe_agent_memory_full"
ORDER BY similarity ASC
LIMIT 10;
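
A Python sketch mirroring the BM25 example above (embed_batch() comes from the OpenRouter section):

import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()

conn = psycopg2.connect(
    host=os.getenv("SHARED_DLPG_HOST"),
    port=os.getenv("SHARED_DLPG_PORT", "5432"),
    user=os.getenv("SHARED_DLPG_USER", "postgres"),
    password=os.getenv("SHARED_DLPG_PASSWORD"),
    database=os.getenv("SHARED_DLPG_DATABASE", "postgres"),
    sslmode="require",
)

query_emb = embed_batch(["fix authentication bug"])[0]
emb_str = "ARRAY[" + ",".join(map(str, query_emb)) + "]::float4[]"

with conn.cursor() as cur:
    cur.execute(f"""
        SELECT task, episode, model,
               embedding <#> {emb_str} AS similarity
        FROM "swe_agent_memory_full"
        WHERE embedding IS NOT NULL
        ORDER BY similarity ASC
        LIMIT 10
    """)
    for row in cur.fetchall():
        print(row)

conn.close()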

Hybrid Search (Combined Signal)

Hybrid search mixes semantic and lexical signals using deeplake_hybrid_record.

SELECT task, episode, model,
    (embedding, task) <#> deeplake_hybrid_record(
        ARRAY[...]::float4[],
        'fix authentication bug',
        0.5, 0.5
    ) AS score
FROM "swe_agent_memory_full"
ORDER BY score ASC
LIMIT 10;

Python implementation for Hybrid Search:

import os
import requests
import psycopg2
from dotenv import load_dotenv

load_dotenv()

OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
OPENROUTER_MODEL = os.getenv("OPENROUTER_EMBED_MODEL", "mistralai/codestral-embed-2505")

def embed_query(text: str) -> list[float]:
    res = requests.post(
        "https://openrouter.ai/api/v1/embeddings",
        headers={"Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json"},
        json={"model": OPENROUTER_MODEL, "input": [text]},
        timeout=60,
    )
    res.raise_for_status()
    return res.json()["data"][0]["embedding"]

conn = psycopg2.connect(
    host=os.getenv("SHARED_DLPG_HOST"),
    port=os.getenv("SHARED_DLPG_PORT", "5432"),
    user=os.getenv("SHARED_DLPG_USER", "postgres"),
    password=os.getenv("SHARED_DLPG_PASSWORD"),
    database=os.getenv("SHARED_DLPG_DATABASE", "postgres"),
    sslmode="require",
)

query_text = "fix authentication bug"
query_emb = embed_query(query_text)

with conn.cursor() as cur:
    # Build the hybrid record literal
    emb_str = "ARRAY[" + ",".join(map(str, query_emb)) + "]::float4[]"
    safe_query = query_text.replace("'", "''")

    cur.execute(f"""
        SELECT task, episode, model,
               (embedding, task) <#> deeplake_hybrid_record(
                   {emb_str},
                   '{safe_query}',
                   0.5, 0.5
               ) AS score
        FROM "swe_agent_memory_full"
        WHERE embedding IS NOT NULL
        ORDER BY score ASC
        LIMIT 10
    """)
    results = cur.fetchall()
    for row in results:
        print(row)

conn.close()

Multi-Agent Workflows (Memory-Backed Tools)

The reference builds a multi-agent system using Pydantic AI. Agents call tools that query memory.

It includes:

  • Code Assistant: searches memory before answering.
  • Debugging Oracle: searches “error + description”.
  • Strategy Learner: summarizes patterns across similar tasks.

The important product idea: memory is a shared substrate; agents differ only in how they query it.
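
A minimal sketch of one such memory-backed tool with Pydantic AI (illustrative, not the reference implementation; it assumes an OpenAI model via OPENAI_API_KEY, the embed_query() helper from the hybrid example, and an open psycopg2 connection conn created as in the examples above):

from pydantic_ai import Agent

code_assistant = Agent(
    "openai:gpt-4o",
    system_prompt="Search prior agent traces for similar tasks before answering.",
)

@code_assistant.tool_plain
def search_memory(query: str, k: int = 5) -> list[str]:
    """Return the k most similar past tasks from the memory table (vector search)."""
    emb = embed_query(query)
    emb_str = "ARRAY[" + ",".join(map(str, emb)) + "]::float4[]"
    with conn.cursor() as cur:
        cur.execute(f"""
            SELECT task
            FROM "swe_agent_memory_full"
            WHERE embedding IS NOT NULL
            ORDER BY embedding <#> {emb_str} ASC
            LIMIT {int(k)}
        """)
        return [row[0] for row in cur.fetchall()]

result = code_assistant.run_sync("How did past agents fix authentication bugs?")
print(result.output)  # use result.data on older pydantic-ai releases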

Deterministic Replay For Debugging

Every trace row contains conversations as JSON. That enables replay.

run_id is stored as text in each trace row. List a few run_id values, then fetch the full trace with Python:

import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()

conn = psycopg2.connect(
    host=os.getenv("SHARED_DLPG_HOST"),
    port=os.getenv("SHARED_DLPG_PORT", "5432"),
    user=os.getenv("SHARED_DLPG_USER", "postgres"),
    password=os.getenv("SHARED_DLPG_PASSWORD"),
    database=os.getenv("SHARED_DLPG_DATABASE", "postgres"),
    sslmode="require",
)

with conn.cursor() as cur:
    cur.execute("""
        SELECT run_id, agent, model, task
        FROM "swe_agent_trajectories2"
        WHERE run_id IS NOT NULL AND run_id <> ''
        LIMIT 10
    """)
    sample = cur.fetchall()
    print("Sample run_id values:")
    for run_id, agent, model, task in sample:
        print(run_id, agent, model, (task or "")[:80])

run_id = sample[0][0] if sample else None
if run_id:
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT agent, model, task, episode, conversations
            FROM "swe_agent_trajectories2"
            WHERE run_id = %s
            LIMIT 1
            """,
            (run_id,),
        )
        row = cur.fetchone()
        print("\nFull trace:")
        print(row)

conn.close()

This makes debugging concrete: same inputs, same context, repeatable investigations.

“Train On A Few” (No ETL In Between)

This is the target loop:

  • Write the training intent as text.
  • Retrieve top-k traces or chunks.
  • Train only on those.

Because everything is SQL-addressable, your training set is a query. That is the “no ETL” claim in practice.
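
A minimal sketch of that loop (assumes embed_batch() and a psycopg2 connection conn opened as in the examples above; the JSONL shape is illustrative, not a specific trainer's format):

import json

intent = "fix authentication bug"
emb = embed_batch([intent])[0]
emb_str = "ARRAY[" + ",".join(map(str, emb)) + "]::float4[]"

with conn.cursor() as cur:
    cur.execute(f"""
        SELECT task, conversations
        FROM "swe_agent_memory_full"
        WHERE embedding IS NOT NULL
        ORDER BY embedding <#> {emb_str} ASC
        LIMIT 50
    """)
    rows = cur.fetchall()

# Top-k traces become the training set: one JSON object per line.
with open("training_set.jsonl", "w") as f:
    for task, conversations in rows:
        f.write(json.dumps({"task": task, "conversations": conversations}) + "\n")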

Related docs and references: