Agent Skills Reference

Agent-friendly SDK for ingesting data into Deeplake managed tables. Use this skill when users want to store, ingest, or query data in Deeplake.


Prerequisites

Required services:

  • Deeplake API server running (default: https://api.deeplake.ai)
  • PostgreSQL with pg_deeplake extension installed

Optional dependencies (per file type):

  • Video ingestion: ffmpeg (sudo apt-get install ffmpeg)
  • PDF ingestion: pymupdf (pip install pymupdf)

Python import:

import os
from deeplake import Client
from deeplake.managed import ManagedServiceError, AuthError, IngestError, TableError, TokenError, WorkspaceError

Quick Reference

import os
from deeplake import Client

# Initialize -- token + workspace (org_id always extracted from token)
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")

# Ingest files (FILE schema)
client.ingest("videos", {"path": ["video1.mp4", "video2.mp4"]}, schema={"path": "FILE"})
client.ingest("images", {"path": ["img1.jpg", "img2.png"]}, schema={"path": "FILE"})
client.ingest("docs", {"path": ["document.pdf", "notes.txt"]}, schema={"path": "FILE"})

# Ingest data directly (dict = column data)
client.ingest("embeddings", {
    "text": ["doc1", "doc2", "doc3"],
    "embedding": [[0.1, 0.2, ...], [0.3, 0.4, ...], [0.5, 0.6, ...]],
    "label": ["A", "B", "C"],
})

# Ingest from HuggingFace
client.ingest("cifar", {"_huggingface": "cifar10"})

# Ingest COCO panoptic dataset (format object)
from deeplake.managed.formats import CocoPanoptic
client.ingest("panoptic", format=CocoPanoptic(
    images_dir="coco/train2017",
    masks_dir="coco/panoptic_train2017",
    annotations="coco/annotations/panoptic_train2017.json",
))

# Fluent query API
results = client.table("videos").select("id", "text").limit(10)()

# Fluent query with filters
results = (
    client.table("videos")
        .select("id", "text", "start_time")
        .where("file_id = $1", "abc")
        .where("start_time > 60")
        .order_by("start_time")
        .limit(10)
)()

# Raw SQL still works
results = client.query("SELECT * FROM videos LIMIT 10")

# Vector similarity search (raw SQL)
emb_pg = "{" + ",".join(str(x) for x in query_embedding) + "}"
results = client.query("""
    SELECT id, text, embedding <#> $1::float4[] AS similarity
    FROM embeddings
    ORDER BY similarity DESC
    LIMIT 10
""", (emb_pg,))
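The emb_pg string above is a Postgres array literal. A small helper (hypothetical, not part of the SDK) keeps the pattern in one place:

```python
def to_pg_float_array(values):
    """Format a list of floats as a Postgres array literal,
    e.g. [0.1, 0.2] -> "{0.1,0.2}", for use with a $1::float4[] placeholder."""
    return "{" + ",".join(str(float(x)) for x in values) + "}"

emb_pg = to_pg_float_array([0.1, 0.2, 0.3])
```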

Architecture

Client(token, workspace_id)
  ├─ .ingest(table, data)       → creates table, returns {table_name, row_count}
  ├─ .query(sql)                → list[dict]
  ├─ .table(table)...()         → fluent SQL builder → list[dict]
  ├─ .open_table(table)         → deeplake.Dataset (for ML training)
  ├─ .list_tables()             → list[str]
  └─ .drop_table(table)         → None

PostgreSQL + pg_deeplake
  - Vector similarity: embedding <#> query_vec
  - BM25 text search:  text <#> 'search query'
  - Hybrid search:     (embedding, text)::deeplake_hybrid_record

Client Initialization

from deeplake import Client

# client = Client(token, workspace_id, *, api_url=None)
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")

Parameters

Parameter Type Required Description
token str yes API token. org_id is extracted automatically from JWT claims.
workspace_id str yes Target workspace name.
api_url str no API base URL. Falls back to DEEPLAKE_API_URL env var, then https://api.deeplake.ai.
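The api_url fallback chain described above can be expressed as follows (a sketch of the resolution order, not the SDK's actual code):

```python
import os

def resolve_api_url(api_url=None):
    # Explicit argument wins, then the DEEPLAKE_API_URL env var,
    # then the public default.
    return api_url or os.environ.get("DEEPLAKE_API_URL") or "https://api.deeplake.ai"
```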

Examples

import os
from deeplake import Client

client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
client = Client(
    token=os.environ["DEEPLAKE_API_KEY"],
    workspace_id="my-workspace",
    api_url="http://custom:8080",
)
with Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace") as client:
    client.ingest("table", {"path": ["file.txt"]}, schema={"path": "FILE"})
# Connection automatically closed
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
client.ingest("table", {"path": ["file.txt"]}, schema={"path": "FILE"})

Note

org_id is always extracted from the token JWT claims. If the token doesn't contain an OrgID claim, the client falls back to the /me API endpoint. Dataset root path is fetched from the PostgreSQL GUC deeplake.root_path.


client.ingest()

# client.ingest(table_name, data=None, *, format=None, schema=None, on_progress=None, chunk_size=1000, chunk_overlap=200)
result = client.ingest(
    table_name,
    data={"text": ["hello"]},
    schema=None,
    on_progress=None,
    chunk_size=1000,
    chunk_overlap=200,
)

Warning

If table_name already exists, ingest() raises a TableError. Use client.drop_table(table_name) first, or choose a different name.

Parameters

Parameter Type Required Description
table_name str yes Table name to create. Must not already exist.
data dict yes* Data to ingest as a dictionary (see data content types below). *Not required when format= is set.
format object no Format object with normalize() method. When set, data is ignored and the format object drives ingestion.
schema dict[str, str] no Explicit schema mapping column names to type strings. Use "FILE" for file path columns. Overrides auto-inference.
on_progress Callable no Callback called with (rows_written, total).
chunk_size int no Character count per text chunk. Default 1000.
chunk_overlap int no Overlap between consecutive text chunks (chars). Default 200.

Data content types:

Content Interpretation Example
Column with schema={"col": "FILE"} File paths to ingest {"path": ["video.mp4"]}, schema={"path": "FILE"}
{"_huggingface": "name"} HuggingFace dataset {"_huggingface": "cifar10"}
{col: [values]} Direct column data {"text": ["a", "b"], "score": [1, 2]}

Return value

{"table_name": "videos", "row_count": 150, "dataset_path": "/data/org_id/workspace_id/videos"}

Examples

client.ingest("security_footage", {
    "path": [
        "camera1_2025-01-15.mp4",
        "camera2_2025-01-15.mp4",
    ]
}, schema={"path": "FILE"})
# Creates ~10-second segments with thumbnails
client.ingest("documents", {
    "path": [
        "report.txt",
        "notes.md",
        "data.json",
    ]
}, schema={"path": "FILE"})
# Text chunked into ~1000 char pieces with 200 char overlap
client.ingest("photos", {
    "path": ["image1.jpg", "image2.png"]
}, schema={"path": "FILE"})
# Each image stored as single row
client.ingest("manuals", {
    "path": ["manual.pdf"]
}, schema={"path": "FILE"})
# Each page rendered at 300 DPI as PNG
client.ingest("vectors", {
    "id": ["doc1", "doc2", "doc3"],
    "text": ["Hello world", "Goodbye world", "Another doc"],
    "embedding": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]],
})
# Schema inferred from data types
client.ingest("data", {
    "name": ["Alice", "Bob"],
    "age": [30, 25],
}, schema={
    "name": "TEXT",
    "age": "INT64",
})
client.ingest("mnist", {"_huggingface": "mnist"})
client.ingest("cifar", {"_huggingface": "cifar10"})
from deeplake.managed.formats import CocoPanoptic

client.ingest("panoptic", format=CocoPanoptic(
    images_dir="coco/train2017",
    masks_dir="coco/panoptic_train2017",
    annotations="coco/annotations/panoptic_train2017.json",
))
# Columns: image_id, image (IMAGE), mask (SEGMENT_MASK), width, height,
#          filename, segments_info (JSON), categories (JSON)
def progress(rows_written, total):
    print(f"Written {rows_written}/{total} rows...")

client.ingest("documents", {"path": pdf_files}, schema={"path": "FILE"},
              on_progress=progress)

Chunking Strategy

File Type Extensions Strategy Columns Created
Video .mp4, .mov, .avi, .mkv, .webm 10-second segments + thumbnails id, file_id, chunk_index, start_time, end_time, video_data, thumbnail, text
Image .jpg, .jpeg, .png, .gif, .bmp, .webp Single chunk id, file_id, image, filename, text
PDF .pdf Page-by-page at 300 DPI id, file_id, page_index, image, text
Text .txt, .md, .csv, .json, .xml, .html 1000-char chunks, 200 overlap id, file_id, chunk_index, text
Other * Single binary chunk id, file_id, data, filename
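The text strategy above (chunk_size characters per chunk, with chunk_overlap characters carried over between consecutive chunks) can be sketched as follows; this illustrates the behavior, not the SDK's internal implementation:

```python
def chunk_text(text, chunk_size=1000, chunk_overlap=200):
    """Split text into chunks of up to chunk_size characters, where each chunk
    begins chunk_overlap characters before the previous chunk ended."""
    if chunk_size <= chunk_overlap:
        raise ValueError("chunk_size must exceed chunk_overlap")
    step = chunk_size - chunk_overlap  # advance per chunk
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]
```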

Custom Format Classes

You can create custom format classes for any dataset layout with inter-file relationships. A format object needs a normalize() method that yields dict[str, list] batches, and can optionally provide a schema() method to declare column types.

Type inference from Python values:

Python type Inferred schema type
bool BOOL
int INT64
float FLOAT64
str TEXT
bytes BINARY
list[float] EMBEDDING

from pathlib import Path

class CsvWithImages:
    """Format for a CSV file where one column contains image file paths."""

    def __init__(self, csv_path, images_dir):
        self.csv_path = Path(csv_path)
        self.images_dir = Path(images_dir)

    def schema(self):
        """Optional: declare column types for pg_deeplake domain mapping."""
        return {"image": "IMAGE"}

    def normalize(self):
        import csv

        with open(self.csv_path) as f:
            rows = list(csv.DictReader(f))

        batch = []
        for row in rows:
            img_path = self.images_dir / row["image_file"]
            if not img_path.exists():
                continue
            batch.append({
                "image": img_path.read_bytes(),
                "label": row["label"],
                "confidence": float(row["confidence"]),
            })
            if len(batch) >= 100:
                yield {k: [d[k] for d in batch] for k in batch[0]}
                batch = []
        if batch:
            yield {k: [d[k] for d in batch] for k in batch[0]}

# Usage
client.ingest("labeled_images", format=CsvWithImages(
    csv_path="annotations.csv",
    images_dir="images/",
))
from pathlib import Path

class ImageFolderWithEmbeddings:
    """Images in subfolders (subfolder name = label) + pre-computed embeddings."""

    def __init__(self, root_dir, embeddings_path):
        self.root_dir = Path(root_dir)
        self.embeddings_path = Path(embeddings_path)

    def normalize(self):
        import json

        with open(self.embeddings_path) as f:
            embeddings = json.load(f)

        batch = []
        for label_dir in sorted(self.root_dir.iterdir()):
            if not label_dir.is_dir():
                continue
            label = label_dir.name
            for img_path in sorted(label_dir.glob("*.jpg")):
                emb = embeddings.get(img_path.name)
                if emb is None:
                    continue
                batch.append({
                    "image": img_path.read_bytes(),
                    "label": label,
                    "embedding": emb,
                    "filename": img_path.name,
                })
                if len(batch) >= 100:
                    yield {k: [d[k] for d in batch] for k in batch[0]}
                    batch = []
        if batch:
            yield {k: [d[k] for d in batch] for k in batch[0]}

client.ingest("classified_images", format=ImageFolderWithEmbeddings(
    root_dir="imagenet/train",
    embeddings_path="embeddings.json",
))

Key rules for normalize():

  1. Yield dict[str, list] -- each dict maps column names to equal-length lists (one entry per row)
  2. Use consistent column names across all yielded batches
  3. Use native Python types -- int, float, str, bytes, bool, list[float] for embeddings
  4. Batch for memory -- yield batches of ~100-1000 rows, don't load entire dataset into one dict
  5. Validate in normalize() -- raise FileNotFoundError or ValueError early for missing files
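Rules 1 and 4 above (columnar batches of equal-length lists, yielded in bounded sizes) are the boilerplate both examples repeat. A hypothetical helper that converts any iterable of row dicts into columnar batches:

```python
def columnar_batches(rows, batch_size=100):
    """Group row dicts into dict[str, list] batches, the shape normalize() must yield."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            # Transpose list-of-rows into a dict of equal-length column lists
            yield {k: [d[k] for d in batch] for k in batch[0]}
            batch = []
    if batch:
        yield {k: [d[k] for d in batch] for k in batch[0]}
```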

Optional schema() method:

  • Return dict[str, str] mapping column names to type names (e.g. {"image": "IMAGE", "mask": "SEGMENT_MASK"})
  • These hints override type inference for columns where Python types are ambiguous (e.g. bytes could be BINARY, IMAGE, SEGMENT_MASK, MESH, etc.)
  • User-supplied schema to ingest() takes precedence over format schema hints
  • Available domain types: IMAGE, SEGMENT_MASK, BINARY_MASK, BOUNDING_BOX, CLASS_LABEL, POLYGON, POINT, MESH, MEDICAL

client.open_table()

Open a managed table as a deeplake.Dataset for direct access. Bypasses PostgreSQL and returns the native dataset object with built-in ML framework integration.

ds = client.open_table(table_name)

Parameters

Parameter Type Required Description
table_name str yes Table name (created via ingest()).

Batch Iteration

ds = client.open_table("videos")
for batch in ds.batches(32):
    train(batch)

DataLoader

from torch.utils.data import DataLoader

ds = client.open_table("training_data")
loader = DataLoader(ds.pytorch(), batch_size=32, shuffle=True, num_workers=4)

for batch in loader:
    images, labels = batch["image"], batch["label"]
    loss = model(images, labels)
    loss.backward()
import tensorflow as tf

ds = client.open_table("training_data")
tf_ds = ds.tensorflow().batch(32).prefetch(tf.data.AUTOTUNE)

model.fit(tf_ds, epochs=10)

Async Prefetch

import asyncio

ds = client.open_table("videos")

async def load_row(row):
    image, text = await asyncio.gather(
        row.get_async("image"),
        row.get_async("text"),
    )
    return image, text

TQL Filtering

ds = client.open_table("embeddings")
view = ds.query("SELECT * WHERE label = 'cat'")
for batch in view.batches(16):
    process(batch)
ds = client.open_table("images")
cats_only = ds.query("SELECT * WHERE category IN ('cat', 'kitten')")
loader = DataLoader(cats_only.pytorch(), batch_size=32)
ds = client.open_table("training_data")
view = ds.query("""
    SELECT * WHERE split = 'train'
    AND confidence > 0.9
    ORDER BY created_at DESC
    LIMIT 10000
""")
from torch.utils.data import DataLoader

ds = client.open_table("training_data")
filtered = ds.query("SELECT * WHERE label = 'dog' AND split = 'train'")
loader = DataLoader(filtered.pytorch(), batch_size=64, shuffle=True, num_workers=4)

for batch in loader:
    train_step(batch)

Querying

Fluent Query API

client.table(table) returns a chainable QueryBuilder. Call () or .execute() to run.

results = (
    client.table("videos")
        .select("id", "text", "start_time")
        .where("file_id = $1", "abc123")
        .order_by("start_time ASC")
        .limit(10)
)()

QueryBuilder Methods

Method Description Example
.select(*cols) Set columns (default *) .select("id", "text")
.where(cond, *params) Add WHERE (multiple = AND) .where("id = $1", "abc")
.order_by(clause) Add ORDER BY .order_by("created_at DESC")
.limit(n) Set LIMIT .limit(10)
.offset(n) Set OFFSET .offset(20)
.execute() or () Run query, return list[dict] .execute()
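The builder composes a single SELECT from these pieces. The sketch below shows roughly how the chained calls map to SQL (a simplified illustration, not the SDK's QueryBuilder):

```python
class SqlSketch:
    """Minimal illustration of the fluent-to-SQL mapping."""

    def __init__(self, table):
        self.table, self.cols, self.wheres = table, ["*"], []
        self.order = self.lim = None
        self.params = []

    def select(self, *cols):
        self.cols = list(cols); return self

    def where(self, cond, *params):
        self.wheres.append(cond); self.params.extend(params); return self

    def order_by(self, clause):
        self.order = clause; return self

    def limit(self, n):
        self.lim = n; return self

    def sql(self):
        q = f"SELECT {', '.join(self.cols)} FROM {self.table}"
        if self.wheres:
            q += " WHERE " + " AND ".join(self.wheres)  # multiple .where() => AND
        if self.order:
            q += f" ORDER BY {self.order}"
        if self.lim is not None:
            q += f" LIMIT {self.lim}"
        return q
```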

Examples

results = client.table("documents").select("id", "text").limit(100)()
results = (
    client.table("videos")
        .select("id", "text", "start_time")
        .where("file_id = $1", "abc")
        .where("start_time > 60")
        .order_by("start_time")
        .limit(10)
)()
all_rows = client.table("small_table")()
results = (
    client.table("documents")
        .select("id", "text")
        .where("text LIKE '%machine learning%'")
        .limit(50)
)()
page_size = 20
page = 3
results = (
    client.table("documents")
        .select("id", "text")
        .order_by("id ASC")
        .limit(page_size)
        .offset(page_size * (page - 1))
)()

client.query()

Execute raw SQL and return all results as a list of dictionaries.

rows = client.query(sql, params=None)

Parameters

Parameter Type Required Description
sql str yes SQL query string. Use $1, $2, ... placeholders for parameterized values.
params tuple no Parameter values corresponding to $1, $2, ... placeholders.

Examples

rows = client.query("SELECT * FROM videos LIMIT 10")
for row in rows:
    print(row["id"], row["text"])
file_id = "abc123"
rows = client.query("SELECT * FROM documents WHERE file_id = $1", (file_id,))
result = client.query("SELECT COUNT(*) as count FROM videos")
print(f"Total: {result[0]['count']} rows")
query_emb = model.encode("search query").tolist()
emb_pg = "{" + ",".join(str(x) for x in query_emb) + "}"
results = client.query("""
    SELECT id, text, embedding <#> $1::float4[] AS score
    FROM embeddings
    ORDER BY score DESC LIMIT 10
""", (emb_pg,))
query_text = "neural networks"
query_emb = model.encode(query_text).tolist()
emb_pg = "{" + ",".join(str(x) for x in query_emb) + "}"
results = client.query("""
    SELECT id, text,
           (embedding, text)::deeplake_hybrid_record <#>
           deeplake_hybrid_record($1::float4[], $2, 0.7, 0.3) AS score
    FROM documents
    ORDER BY score DESC LIMIT 10
""", (emb_pg, query_text))
file_id = "old-file-123"
client.query("DELETE FROM documents WHERE file_id = $1", (file_id,))
client.query("""UPDATE documents SET metadata = '{"reviewed": true}'::jsonb WHERE id = 42""")
title = "New doc"
content = "Content here"
rows = client.query("INSERT INTO documents (title, content) VALUES ($1, $2) RETURNING id", (title, content))
new_id = rows[0]["id"]

pg_deeplake SQL Reference

Once data is ingested, use these SQL features via client.query().

The <#> operator

The primary search operator. Behavior depends on the column type:

Left side Right side Behavior Sort
FLOAT4[] column FLOAT4[] literal Vector cosine similarity DESC
FLOAT4[][] column FLOAT4[][] literal MaxSim (multi-vector) DESC
TEXT column TEXT literal BM25 keyword score DESC
(FLOAT4[], TEXT) tuple deeplake_hybrid_record(...) Hybrid BM25 + vector DESC
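For intuition, the single-vector case above scores by cosine similarity; a pure-Python sketch of what that comparison computes:

```python
import math

def cosine_similarity(a, b):
    """What `embedding <#> query::float4[]` scores in the vector case:
    dot(a, b) / (|a| * |b|) -- higher means more similar, hence ORDER BY ... DESC."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm
```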

Search examples

query_emb = model.encode("search query").tolist()
emb_pg = "{" + ",".join(str(x) for x in query_emb) + "}"
results = client.query("""
    SELECT id, text, embedding <#> $1::float4[] AS similarity
    FROM embeddings
    ORDER BY similarity DESC LIMIT 10
""", (emb_pg,))

search_text = "machine learning"
results = client.query("""
    SELECT id, text, text <#> $1 AS score
    FROM documents
    ORDER BY score DESC LIMIT 10
""", (search_text,))

SELECT * FROM documents
WHERE text LIKE '%important keyword%';
query_text = "neural networks"
query_emb = model.encode(query_text).tolist()
emb_pg = "{" + ",".join(str(x) for x in query_emb) + "}"
results = client.query("""
    SELECT id, text,
           (embedding, text)::deeplake_hybrid_record <#>
           deeplake_hybrid_record($1::float4[], $2, 0.7, 0.3) AS score
    FROM documents
    ORDER BY score DESC LIMIT 10
""", (emb_pg, query_text))

Index creation

CREATE INDEX ON documents USING deeplake_index (embedding);
CREATE INDEX ON documents USING deeplake_index (text)
WITH (index_type = 'bm25');
CREATE INDEX ON documents USING deeplake_index (category)
WITH (index_type = 'exact_text');

Table Management

client.list_tables()

tables = client.list_tables()  # -> list[str]

client.drop_table()

client.drop_table(table_name, if_exists=True)

Parameters

Parameter Type Required Description
table_name str yes Table to drop.
if_exists bool no If True, no error when table doesn't exist. Default True.

Examples

client.drop_table("old_data")
client.drop_table("old_data")
remaining = client.list_tables()
print(f"Remaining tables: {remaining}")

Error Handling

from deeplake.managed import (
    ManagedServiceError,  # Base class for all errors
    AuthError,            # Token invalid/expired
    CredentialsError,     # DB credentials fetch failed
    IngestError,          # File ingestion failed
    TableError,           # Table operation failed
    TokenError,           # Token parsing failed (subclass of AuthError)
    WorkspaceError,       # Workspace not found or inaccessible
)
import os
from deeplake import Client
from deeplake.managed import AuthError, IngestError, ManagedServiceError, TableError

try:
    client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
    client.ingest("videos", {"path": ["video.mp4"]}, schema={"path": "FILE"})
except AuthError as e:
    print(f"Authentication failed: {e}")
except IngestError as e:
    print(f"Ingestion failed: {e}")
except TableError as e:
    print(f"Table operation failed: {e}")
except ManagedServiceError as e:
    print(f"General error: {e}")

Common errors

Error Cause Solution
AuthError: Token required No token provided Pass token= to Client()
AuthError: Token does not contain org_id Token missing OrgID claim Ensure token has OrgID claim or API /me endpoint is accessible
ManagedServiceError: workspace_id required No workspace Pass workspace_id= to Client()
IngestError: File not found Invalid file path Check file exists at given path
IngestError: ffmpeg not found ffmpeg not installed sudo apt-get install ffmpeg
TableError: create_deeplake_table failed pg_deeplake issue Check PostgreSQL has extension installed
TableError: Table already exists Table name taken Use drop_table() first or choose a different name

Agent Decision Trees

How to Initialize Client

Need to create a Client
├─ Have API token?
│   └─ Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="ws")
└─ Need custom API URL?
    └─ Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="ws",
              api_url="http://custom:8080")

How to Ingest Data

User wants to ingest data
├─ Is it local files? → use FILE schema
│   ├─ Single file
│   │   └─ client.ingest("table", {"path": ["file.ext"]},
│   │          schema={"path": "FILE"})
│   └─ Multiple files
│       └─ client.ingest("table", {"path": ["f1.mp4", "f2.mp4"]},
│              schema={"path": "FILE"})
├─ Is it structured data (dict/lists)? → pass a dict directly
│   └─ client.ingest("table", {
│          "col1": [val1, val2, ...],
│          "col2": [val1, val2, ...],
│      })
├─ Is it a HuggingFace dataset? → use _huggingface key
│   └─ client.ingest("table", {"_huggingface": "dataset_name"})
├─ Is it COCO panoptic data? → use format object
│   └─ from deeplake.managed.formats import CocoPanoptic
│      client.ingest("table", format=CocoPanoptic(
│          images_dir="...", masks_dir="...",
│          annotations="..."
│      ))
├─ Need custom chunking for text?
│   └─ client.ingest("table", {"path": ["doc.txt"]},
│          schema={"path": "FILE"},
│          chunk_size=500, chunk_overlap=100)
└─ Need explicit schema?
    └─ client.ingest("table", {...}, schema={
           "name": "TEXT",
           "count": "INT64",
           "vector": "EMBEDDING",
       })

How to Query Data

User wants to query data
├─ Simple SELECT (small result)?
│   ├─ Fluent: client.table("table").select("id", "text").limit(100)()
│   └─ Raw:    client.query("SELECT * FROM table LIMIT 100")
├─ Large result set (streaming)?
│   └─ Use client.open_table("table") for direct dataset access
│      with batch iteration, PyTorch/TF DataLoaders, etc.
├─ Need semantic/vector search?
│   └─ emb_pg = "{" + ",".join(str(x) for x in query_embedding) + "}"
│      client.query("SELECT *, embedding <#> $1::float4[] AS score
│          FROM table ORDER BY score DESC LIMIT 10", (emb_pg,))
├─ Need text search?
│   └─ client.query("SELECT *, text <#> $1 AS score
│          FROM table ORDER BY score DESC LIMIT 10", (keyword,))
└─ Need hybrid search (vector + text)?
    └─ emb_pg = "{" + ",".join(str(x) for x in query_emb) + "}"
       client.query("SELECT *, (embedding, text)::deeplake_hybrid_record <#>
           deeplake_hybrid_record($1::float4[], $2, 0.7, 0.3) AS score
           FROM table ORDER BY score DESC LIMIT 10", (emb_pg, search_text))

User Wants to Train on Data

User wants to train / iterate over data
├─ Need PyTorch DataLoader?
│   └─ ds = client.open_table("table")
│      loader = DataLoader(ds.pytorch(), batch_size=32, shuffle=True)
├─ Need TensorFlow tf.data?
│   └─ ds = client.open_table("table")
│      tf_ds = ds.tensorflow().batch(32).prefetch(AUTOTUNE)
├─ Need simple batch iteration?
│   └─ ds = client.open_table("table")
│      for batch in ds.batches(32): ...
└─ Need async prefetch per row?
    └─ ds = client.open_table("table")
       await row.get_async("image")

Error Recovery

Operation failed with error
├─ AuthError?
│   ├─ "Token required" → Pass token= to Client()
│   ├─ "Token does not contain org_id" → Ensure token has OrgID claim
│   └─ "Token expired" → Get new token
├─ IngestError?
│   ├─ "data must be a dict" → Pass a dict, not list/str/int
│   ├─ "data must not be empty" → Dict must have at least one key
│   ├─ "File not found" → Check file path exists
│   └─ "ffmpeg not found" → Install ffmpeg for video processing
├─ TableError?
│   ├─ "create_deeplake_table failed" → Check pg_deeplake extension
│   └─ "Table already exists" → Use drop_table() first or different name
└─ ManagedServiceError?
    └─ Check API server is running at the configured api_url

Complete Workflow Examples

Workflow 1: Ingest and Query Video Files

import os
from deeplake import Client

client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")

# Ingest video files (FILE schema)
result = client.ingest("security_videos", {
    "path": ["/path/to/camera1.mp4", "/path/to/camera2.mp4"],
}, schema={"path": "FILE"})
print(f"Ingested {result['row_count']} video segments")

# Fluent query for segments
segments = (
    client.table("security_videos")
        .select("id", "file_id", "start_time", "end_time", "text")
        .where("start_time > 60")
        .limit(10)
)()

for seg in segments:
    print(f"Segment {seg['id']}: {seg['start_time']}s - {seg['end_time']}s")

Workflow 2: Build Semantic Search Index

import os
from deeplake import Client
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")

# Prepare documents with embeddings
documents = ["Doc about AI", "Doc about ML", "Doc about databases"]
embeddings = model.encode(documents).tolist()

# Ingest
client.ingest("search_index", {
    "text": documents,
    "embedding": embeddings,
})

# Search
query_emb = model.encode("artificial intelligence").tolist()
emb_pg = "{" + ",".join(str(x) for x in query_emb) + "}"
results = client.query("""
    SELECT text, embedding <#> $1::float4[] AS similarity
    FROM search_index
    ORDER BY similarity DESC
    LIMIT 5
""", (emb_pg,))

for r in results:
    print(f"{r['similarity']:.3f}: {r['text']}")

Workflow 3: Process PDF Documents

import os
from deeplake import Client

client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")

# Ingest PDFs (each page becomes a row)
result = client.ingest("manuals", {
    "path": ["/path/to/manual1.pdf", "/path/to/manual2.pdf"],
}, schema={"path": "FILE"})
print(f"Processed {result['row_count']} pages")

# Search within PDFs
pages = client.query("""
    SELECT file_id, page_index, text
    FROM manuals
    WHERE text LIKE '%installation%'
""")

for page in pages:
    print(f"Found in file {page['file_id']}, page {page['page_index']}")

Workflow 4: Train a PyTorch Model

import os
from torch.utils.data import DataLoader
from deeplake import Client

client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")

# Ingest from HuggingFace
client.ingest("cifar10", {"_huggingface": "cifar10"})

# Stream directly into PyTorch
ds = client.open_table("cifar10")
loader = DataLoader(ds.pytorch(), batch_size=64, shuffle=True, num_workers=4)

for batch in loader:
    images, labels = batch["img"], batch["label"]
    # training_step(images, labels)

Workflow 5: Ingest COCO Panoptic and Query

import os
import json
from deeplake import Client
from deeplake.managed.formats import CocoPanoptic

client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")

# Ingest panoptic dataset using format object
result = client.ingest("panoptic_train", format=CocoPanoptic(
    images_dir="/data/coco/train2017",
    masks_dir="/data/coco/panoptic_train2017",
    annotations="/data/coco/annotations/panoptic_train2017.json",
))
print(f"Ingested {result['row_count']} images")

# Query for images
rows = client.query("""
    SELECT image_id, filename, segments_info
    FROM panoptic_train
    LIMIT 10
""")

for row in rows:
    segments = json.loads(row["segments_info"])
    print(f"Image {row['filename']}: {len(segments)} segments")

Data Types

Schema Type Python Type Postgres Type Example
TEXT str text "hello"
INT32 int integer 42
INT64 int bigint 9999999999
FLOAT32 float real 3.14
FLOAT64 float double precision 3.14159265359
BOOL bool boolean True
BINARY bytes bytea b"\x00\x01"
IMAGE bytes IMAGE (bytea) Image binary data
VIDEO bytes bytea Video binary data
EMBEDDING list[float] float4[] [0.1, 0.2, 0.3]
SEGMENT_MASK bytes SEGMENT_MASK (bytea) Segmentation mask data
BINARY_MASK bytes BINARY_MASK (bytea) Binary mask data
BOUNDING_BOX list[float] BOUNDING_BOX (float4[]) [x, y, w, h]
CLASS_LABEL int CLASS_LABEL (int4) Label index
POLYGON bytes DEEPLAKE_POLYGON (bytea) Polygon coordinates
POINT list[float] DEEPLAKE_POINT (float4[]) [1.0, 2.0]
MESH bytes MESH (bytea) 3D mesh data (PLY, STL)
MEDICAL bytes MEDICAL (bytea) Medical imaging (DICOM)
FILE str (path) N/A (processed) "/path/to/file.mp4"

Schema inference

  • bool -> BOOL
  • int -> INT64
  • float -> FLOAT64
  • bytes -> BINARY
  • str -> TEXT
  • list[float] -> EMBEDDING (size auto-detected)
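The inference rules can be expressed as a lookup (a sketch; note that bool must be tested before int, since bool is a subclass of int in Python):

```python
def infer_schema_type(value):
    """Map a Python value to its inferred schema type, per the rules above."""
    if isinstance(value, bool):  # must precede the int check: bool subclasses int
        return "BOOL"
    if isinstance(value, int):
        return "INT64"
    if isinstance(value, float):
        return "FLOAT64"
    if isinstance(value, bytes):
        return "BINARY"
    if isinstance(value, str):
        return "TEXT"
    if isinstance(value, list) and value and all(isinstance(x, float) for x in value):
        return "EMBEDDING"
    raise TypeError(f"cannot infer schema type for {type(value).__name__}")
```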

Domain types

IMAGE, SEGMENT_MASK, BINARY_MASK, BOUNDING_BOX, CLASS_LABEL, POLYGON, POINT, MESH, and MEDICAL are PostgreSQL domain types defined by pg_deeplake. They behave like their base types but carry semantic meaning for visualization and type-aware processing.

FILE

FILE is a schema directive, not a storage type. Columns marked as FILE are treated as file paths during ingestion -- the files are processed (chunked, etc.) and the resulting data is stored in generated columns. The FILE column itself is not stored.


Performance Tuning

Buffered writes: Rows are accumulated in a memory buffer and flushed in larger batches, reducing Python-to-C++ FFI overhead.

Periodic commits: ds.commit() is called every 2000 rows to free memory and enable crash recovery.

Parallel file normalization: File processing (ffmpeg, PyMuPDF, file I/O) runs in a thread pool (up to 4 workers).

Storage concurrency: The SDK sets deeplake.storage.set_concurrency(32) during ingestion to parallelize S3/GCS chunk uploads.

Parameter Default Description
flush_every 200 Rows buffered before ds.append()
commit_every 2000 Rows between ds.commit() calls
Normalization workers 4 Max threads for file processing
Storage concurrency 32 Parallel storage I/O operations
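The flush/commit cadence from the table above can be sketched as follows (an illustration of the buffering behavior, assuming a dataset object with append() and commit() methods; not the SDK's internal code):

```python
class BufferedWriter:
    """Accumulate rows, append in batches of flush_every,
    and commit roughly every commit_every rows written."""

    def __init__(self, ds, flush_every=200, commit_every=2000):
        self.ds = ds
        self.flush_every = flush_every
        self.commit_every = commit_every
        self.buffer = []
        self.written = 0
        self.last_commit = 0

    def write(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= self.flush_every:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        self.ds.append(self.buffer)  # one append per batch, fewer FFI crossings
        self.written += len(self.buffer)
        self.buffer = []
        if self.written - self.last_commit >= self.commit_every:
            self.ds.commit()  # periodic commit frees memory, enables crash recovery
            self.last_commit = self.written

    def close(self):
        self.flush()
        self.ds.commit()
```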

Limits

Resource Limit
Video chunk duration 10 seconds
Text chunk size (default) 1000 characters
Text chunk overlap (default) 200 characters
PDF rendering resolution 300 DPI
Batch size (data ingest) 1000 rows
Batch iteration size (default) 100 rows
Write buffer (flush_every) 200 rows
Commit interval 2000 rows
File normalization workers 4 threads
Storage I/O concurrency 32 operations

Troubleshooting

Token does not contain org_id

Ensure your token contains an OrgID claim, or that the API /me endpoint is accessible as fallback.

ffmpeg not found (video processing)

sudo apt-get install ffmpeg

fitz not found (PDF processing)

pip install pymupdf

Connection refused to API

curl https://api.deeplake.ai/health