Agent Skills Reference¶
Agent-friendly SDK for ingesting data into Deeplake managed tables. Use this skill when users want to store, ingest, or query data in Deeplake.
Prerequisites¶
Required services:

- Deeplake API server running (default: https://api.deeplake.ai)
- PostgreSQL with the `pg_deeplake` extension installed

Optional dependencies (per file type):

- Video ingestion: `ffmpeg` (`sudo apt-get install ffmpeg`)
- PDF ingestion: `pymupdf` (`pip install pymupdf`)
Python import:
import os
from deeplake import Client
from deeplake.managed import ManagedServiceError, AuthError, IngestError, TableError, TokenError, WorkspaceError
Quick Reference¶
import os
from deeplake import Client
# Initialize -- token + workspace (org_id always extracted from token)
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
# Ingest files (FILE schema)
client.ingest("videos", {"path": ["video1.mp4", "video2.mp4"]}, schema={"path": "FILE"})
client.ingest("images", {"path": ["img1.jpg", "img2.png"]}, schema={"path": "FILE"})
client.ingest("docs", {"path": ["document.pdf", "notes.txt"]}, schema={"path": "FILE"})
# Ingest data directly (dict = column data)
client.ingest("embeddings", {
"text": ["doc1", "doc2", "doc3"],
"embedding": [[0.1, 0.2, ...], [0.3, 0.4, ...], [0.5, 0.6, ...]],
"label": ["A", "B", "C"],
})
# Ingest from HuggingFace
client.ingest("cifar", {"_huggingface": "cifar10"})
# Ingest COCO panoptic dataset (format object)
from deeplake.managed.formats import CocoPanoptic
client.ingest("panoptic", format=CocoPanoptic(
images_dir="coco/train2017",
masks_dir="coco/panoptic_train2017",
annotations="coco/annotations/panoptic_train2017.json",
))
# Fluent query API
results = client.table("videos").select("id", "text").limit(10)()
# Fluent query with filters
results = (
client.table("videos")
.select("id", "text", "start_time")
.where("file_id = $1", "abc")
.where("start_time > 60")
.order_by("start_time")
.limit(10)
)()
# Raw SQL still works
results = client.query("SELECT * FROM videos LIMIT 10")
# Vector similarity search (raw SQL)
emb_pg = "{" + ",".join(str(x) for x in query_embedding) + "}"
results = client.query("""
SELECT id, text, embedding <#> $1::float4[] AS similarity
FROM embeddings
ORDER BY similarity DESC
LIMIT 10
""", (emb_pg,))
Architecture¶
Client(token, workspace_id)
├─ .ingest(table, data) → creates table, returns {table_name, row_count}
├─ .query(sql) → list[dict]
├─ .table(table)...() → fluent SQL builder → list[dict]
├─ .open_table(table) → deeplake.Dataset (for ML training)
├─ .list_tables() → list[str]
└─ .drop_table(table) → None
│
▼
PostgreSQL + pg_deeplake
- Vector similarity: embedding <#> query_vec
- BM25 text search: text <#> 'search query'
- Hybrid search: (embedding, text)::deeplake_hybrid_record
Client Initialization¶
from deeplake import Client
# client = Client(token, workspace_id, *, api_url=None)
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
| `token` | `str` | yes | API token. org_id is extracted automatically from JWT claims. |
| `workspace_id` | `str` | yes | Target workspace name. |
| `api_url` | `str` | no | API base URL. Falls back to the `DEEPLAKE_API_URL` env var, then https://api.deeplake.ai. |
Examples¶
Note
org_id is always extracted from the token JWT claims. If the token doesn't contain an OrgID claim, the client falls back to the /me API endpoint. Dataset root path is fetched from the PostgreSQL GUC deeplake.root_path.
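The `api_url` fallback chain described above can be sketched in plain Python. `resolve_api_url` below is our own illustration of the documented behavior, not an SDK function; the client performs this resolution internally.

```python
import os

def resolve_api_url(explicit=None):
    """Mirror the documented fallback chain: explicit argument first,
    then the DEEPLAKE_API_URL env var, then the public default."""
    return explicit or os.environ.get("DEEPLAKE_API_URL") or "https://api.deeplake.ai"

# Typical initialization (requires a valid token):
# client = Client(
#     token=os.environ["DEEPLAKE_API_KEY"],
#     workspace_id="my-workspace",
#     api_url=resolve_api_url(),  # or pass a custom URL directly
# )
```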
client.ingest()¶
# client.ingest(table_name, data=None, *, format=None, schema=None, on_progress=None, chunk_size=1000, chunk_overlap=200)
result = client.ingest(
table_name,
data={"text": ["hello"]},
schema=None,
on_progress=None,
chunk_size=1000,
chunk_overlap=200,
)
Warning
If table_name already exists, ingest() raises a TableError. Use client.drop_table(table_name) first, or choose a different name.
Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
| `table_name` | `str` | yes | Table name to create. Must not already exist. |
| `data` | `dict` | yes* | Data to ingest as a dictionary (see data content types below). *Not required when `format=` is set. |
| `format` | object | no | Format object with a `normalize()` method. When set, `data` is ignored and the format object drives ingestion. |
| `schema` | `dict[str, str]` | no | Explicit schema mapping column names to type strings. Use `"FILE"` for file path columns. Overrides auto-inference. |
| `on_progress` | `Callable` | no | Callback called with `(rows_written, total)`. |
| `chunk_size` | `int` | no | Character count per text chunk. Default 1000. |
| `chunk_overlap` | `int` | no | Overlap between consecutive text chunks (chars). Default 200. |
Data content types:
| Content | Interpretation | Example |
|---|---|---|
| Column with `schema={"col": "FILE"}` | File paths to ingest | `{"path": ["video.mp4"]}, schema={"path": "FILE"}` |
| `{"_huggingface": "name"}` | HuggingFace dataset | `{"_huggingface": "cifar10"}` |
| `{col: [values]}` | Direct column data | `{"text": ["a", "b"], "score": [1, 2]}` |
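A sketch of the `on_progress` hook combined with custom chunking parameters. `show_progress` is our own helper (the SDK only requires a callable taking `(rows_written, total)`); returning the formatted message is just to make it easy to inspect.

```python
def show_progress(rows_written, total):
    """Progress callback: the SDK calls it with (rows_written, total)."""
    pct = 100 * rows_written / total if total else 0.0
    msg = f"ingested {rows_written}/{total} rows ({pct:.0f}%)"
    print(msg)
    return msg

# Usage (requires a live client):
# client.ingest(
#     "docs",
#     {"path": ["notes.txt"]},
#     schema={"path": "FILE"},
#     on_progress=show_progress,
#     chunk_size=500,     # smaller chunks than the 1000-char default
#     chunk_overlap=100,  # less overlap than the 200-char default
# )
```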
Return value¶
ingest() returns a dict with the created table name and the number of rows written: {"table_name": ..., "row_count": ...}.
Examples¶
from deeplake.managed.formats import CocoPanoptic
client.ingest("panoptic", format=CocoPanoptic(
images_dir="coco/train2017",
masks_dir="coco/panoptic_train2017",
annotations="coco/annotations/panoptic_train2017.json",
))
# Columns: image_id, image (IMAGE), mask (SEGMENT_MASK), width, height,
# filename, segments_info (JSON), categories (JSON)
Chunking Strategy¶
| File Type | Extensions | Strategy | Columns Created |
|---|---|---|---|
| Video | `.mp4`, `.mov`, `.avi`, `.mkv`, `.webm` | 10-second segments + thumbnails | `id`, `file_id`, `chunk_index`, `start_time`, `end_time`, `video_data`, `thumbnail`, `text` |
| Image | `.jpg`, `.jpeg`, `.png`, `.gif`, `.bmp`, `.webp` | Single chunk | `id`, `file_id`, `image`, `filename`, `text` |
| PDF | `.pdf` | Page-by-page at 300 DPI | `id`, `file_id`, `page_index`, `image`, `text` |
| Text | `.txt`, `.md`, `.csv`, `.json`, `.xml`, `.html` | 1000-char chunks, 200 overlap | `id`, `file_id`, `chunk_index`, `text` |
| Other | `*` | Single binary chunk | `id`, `file_id`, `data`, `filename` |
Custom Format Classes¶
You can create custom format classes for any dataset layout with inter-file relationships. A format object needs a normalize() method that yields dict[str, list] batches, and can optionally provide a schema() method to declare column types.
Type inference from Python values:
| Python type | Inferred schema type |
|---|---|
| `bool` | `BOOL` |
| `int` | `INT64` |
| `float` | `FLOAT64` |
| `str` | `TEXT` |
| `bytes` | `BINARY` |
| `list[float]` | `EMBEDDING` |
from pathlib import Path
class CsvWithImages:
"""Format for a CSV file where one column contains image file paths."""
def __init__(self, csv_path, images_dir):
self.csv_path = Path(csv_path)
self.images_dir = Path(images_dir)
def schema(self):
"""Optional: declare column types for pg_deeplake domain mapping."""
return {"image": "IMAGE"}
def normalize(self):
import csv
with open(self.csv_path) as f:
rows = list(csv.DictReader(f))
batch = []
for row in rows:
img_path = self.images_dir / row["image_file"]
if not img_path.exists():
continue
batch.append({
"image": img_path.read_bytes(),
"label": row["label"],
"confidence": float(row["confidence"]),
})
if len(batch) >= 100:
yield {k: [d[k] for d in batch] for k in batch[0]}
batch = []
if batch:
yield {k: [d[k] for d in batch] for k in batch[0]}
# Usage
client.ingest("labeled_images", format=CsvWithImages(
csv_path="annotations.csv",
images_dir="images/",
))
from pathlib import Path
class ImageFolderWithEmbeddings:
"""Images in subfolders (subfolder name = label) + pre-computed embeddings."""
def __init__(self, root_dir, embeddings_path):
self.root_dir = Path(root_dir)
self.embeddings_path = Path(embeddings_path)
def normalize(self):
import json
with open(self.embeddings_path) as f:
embeddings = json.load(f)
batch = []
for label_dir in sorted(self.root_dir.iterdir()):
if not label_dir.is_dir():
continue
label = label_dir.name
for img_path in sorted(label_dir.glob("*.jpg")):
emb = embeddings.get(img_path.name)
if emb is None:
continue
batch.append({
"image": img_path.read_bytes(),
"label": label,
"embedding": emb,
"filename": img_path.name,
})
if len(batch) >= 100:
yield {k: [d[k] for d in batch] for k in batch[0]}
batch = []
if batch:
yield {k: [d[k] for d in batch] for k in batch[0]}
client.ingest("classified_images", format=ImageFolderWithEmbeddings(
root_dir="imagenet/train",
embeddings_path="embeddings.json",
))
Key rules for `normalize()`:
- Yield `dict[str, list]` -- each dict maps column names to equal-length lists (one entry per row)
- Use consistent column names across all yielded batches
- Use native Python types -- `int`, `float`, `str`, `bytes`, `bool`, `list[float]` for embeddings
- Batch for memory -- yield batches of ~100-1000 rows, don't load the entire dataset into one dict
- Validate in `normalize()` -- raise `FileNotFoundError` or `ValueError` early for missing files

Optional `schema()` method:
- Return `dict[str, str]` mapping column names to type names (e.g. `{"image": "IMAGE", "mask": "SEGMENT_MASK"}`)
- These hints override type inference for columns where Python types are ambiguous (e.g. `bytes` could be `BINARY`, `IMAGE`, `SEGMENT_MASK`, `MESH`, etc.)
- User-supplied schema to `ingest()` takes precedence over format schema hints
- Available domain types: `IMAGE`, `SEGMENT_MASK`, `BINARY_MASK`, `BOUNDING_BOX`, `CLASS_LABEL`, `POLYGON`, `POINT`, `MESH`, `MEDICAL`
client.open_table()¶
Open a managed table as a deeplake.Dataset for direct access. Bypasses PostgreSQL and returns the native dataset object with built-in ML framework integration.
Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
table_name |
str |
yes | Table name (created via ingest()). |
Batch Iteration¶
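A minimal sketch of batch iteration over the native dataset. It assumes `ds.batches(n)` yields `dict[str, list]` batches (the same shape used in the decision trees later in this document); `iterate_batches` and the `text` column name are illustrative.

```python
def iterate_batches(ds, batch_size=100):
    """Yield the "text" column batch-by-batch.

    Assumes ds.batches(n) yields dicts mapping column names to
    equal-length lists -- an assumption based on usage elsewhere here.
    """
    for batch in ds.batches(batch_size):
        yield batch["text"]

# Usage (requires a live client):
# ds = client.open_table("videos")
# for texts in iterate_batches(ds):
#     handle(texts)
```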
DataLoader¶
Async Prefetch¶
import asyncio
ds = client.open_table("videos")
async def load_row(row):
image, text = await asyncio.gather(
row.get_async("image"),
row.get_async("text"),
)
return image, text
TQL Filtering¶
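Deeplake's native Dataset has historically supported TQL through a `query()` method; the exact method name, dialect, and availability depend on your installed deeplake version, so treat everything in this sketch as an assumption to verify.

```python
def tql_filter(ds, condition):
    """Filter the native dataset with a TQL WHERE clause.

    ASSUMPTION: the Dataset object exposes query() taking a TQL string;
    check your deeplake version's docs before relying on this.
    """
    return ds.query(f"SELECT * WHERE {condition}")

# Usage (hypothetical):
# ds = client.open_table("videos")
# long_segments = tql_filter(ds, "start_time > 60")
```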
Querying¶
Fluent Query API¶
client.table(table) returns a chainable QueryBuilder. Call () or .execute() to run.
results = (
client.table("videos")
.select("id", "text", "start_time")
.where("file_id = $1", "abc123")
.order_by("start_time ASC")
.limit(10)
)()
QueryBuilder Methods¶
| Method | Description | Example |
|---|---|---|
| `.select(*cols)` | Set columns (default `*`) | `.select("id", "text")` |
| `.where(cond, *params)` | Add WHERE (multiple = AND) | `.where("id = $1", "abc")` |
| `.order_by(clause)` | Add ORDER BY | `.order_by("created_at DESC")` |
| `.limit(n)` | Set LIMIT | `.limit(10)` |
| `.offset(n)` | Set OFFSET | `.offset(20)` |
| `.execute()` or `()` | Run query, return `list[dict]` | `.execute()` |
Examples¶
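A pagination sketch built only from the documented builder methods. `fetch_page` is a hypothetical helper, and the table and column names are illustrative.

```python
def fetch_page(client, table, page, page_size=50):
    """Fetch one page of rows using LIMIT/OFFSET on the fluent builder."""
    return (
        client.table(table)
        .select("id", "text")
        .order_by("id ASC")
        .limit(page_size)
        .offset(page * page_size)
    )()

# Usage (requires a live client):
# rows = fetch_page(client, "videos", page=2)
```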
client.query()¶
Execute raw SQL and return all results as a list of dictionaries.
Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
| `sql` | `str` | yes | SQL query string. Use `$1`, `$2`, ... placeholders for parameterized values. |
| `params` | `tuple` | no | Parameter values corresponding to the `$1`, `$2`, ... placeholders. |
Examples¶
results = client.query(
    "SELECT id, text FROM documents WHERE file_id = $1 LIMIT 20",
    ("abc123",),
)
pg_deeplake SQL Reference¶
Once data is ingested, use these SQL features via client.query().
The <#> operator¶
The primary search operator. Behavior depends on the column type:
| Left side | Right side | Behavior | Sort |
|---|---|---|---|
| `FLOAT4[]` column | `FLOAT4[]` literal | Vector cosine similarity | DESC |
| `FLOAT4[][]` column | `FLOAT4[][]` literal | MaxSim (multi-vector) | DESC |
| `TEXT` column | `TEXT` literal | BM25 keyword score | DESC |
| `(FLOAT4[], TEXT)` tuple | `deeplake_hybrid_record(...)` | Hybrid BM25 + vector | DESC |
Search examples¶
# BM25 keyword search over a TEXT column
search_text = "machine learning"
results = client.query("""
SELECT id, text, text <#> $1 AS score
FROM documents
ORDER BY score DESC LIMIT 10
""", (search_text,))
query_text = "neural networks"
query_emb = model.encode(query_text).tolist()
emb_pg = "{" + ",".join(str(x) for x in query_emb) + "}"
results = client.query("""
SELECT id, text,
(embedding, text)::deeplake_hybrid_record <#>
deeplake_hybrid_record($1::float4[], $2, 0.7, 0.3) AS score
FROM documents
ORDER BY score DESC LIMIT 10
""", (emb_pg, query_text))
Index creation¶
Table Management¶
client.list_tables()¶
client.drop_table()¶
Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
| `table_name` | `str` | yes | Table to drop. |
| `if_exists` | `bool` | no | If True, no error when the table doesn't exist. Default True. |
Examples¶
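A drop-and-recreate sketch combining `list_tables()`, `drop_table()`, and `ingest()`, useful because `ingest()` raises `TableError` when the name is taken. `reingest` is a hypothetical helper; with `if_exists=True` the `list_tables()` check is optional and shown only for illustration.

```python
def reingest(client, table, data, **kwargs):
    """Drop the table if present, then ingest fresh data under the same name."""
    if table in client.list_tables():
        client.drop_table(table, if_exists=True)
    return client.ingest(table, data, **kwargs)

# Usage (requires a live client):
# result = reingest(client, "docs", {"text": ["a", "b"]})
```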
Error Handling¶
from deeplake.managed import (
ManagedServiceError, # Base class for all errors
AuthError, # Token invalid/expired
CredentialsError, # DB credentials fetch failed
IngestError, # File ingestion failed
TableError, # Table operation failed
TokenError, # Token parsing failed (subclass of AuthError)
WorkspaceError, # Workspace not found or inaccessible
)
import os
from deeplake import Client
from deeplake.managed import AuthError, IngestError, ManagedServiceError, TableError
try:
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
client.ingest("videos", {"path": ["video.mp4"]}, schema={"path": "FILE"})
except AuthError as e:
print(f"Authentication failed: {e}")
except IngestError as e:
print(f"Ingestion failed: {e}")
except TableError as e:
print(f"Table operation failed: {e}")
except ManagedServiceError as e:
print(f"General error: {e}")
Common errors¶
| Error | Cause | Solution |
|---|---|---|
| `AuthError: Token required` | No token provided | Pass `token=` to `Client()` |
| `AuthError: Token does not contain org_id` | Token missing OrgID claim | Ensure the token has an OrgID claim or the API `/me` endpoint is accessible |
| `ManagedServiceError: workspace_id required` | No workspace | Pass `workspace_id=` to `Client()` |
| `IngestError: File not found` | Invalid file path | Check the file exists at the given path |
| `IngestError: ffmpeg not found` | ffmpeg not installed | `sudo apt-get install ffmpeg` |
| `TableError: create_deeplake_table failed` | pg_deeplake issue | Check PostgreSQL has the extension installed |
| `TableError: Table already exists` | Table name taken | Use `drop_table()` first or choose a different name |
Agent Decision Trees¶
How to Initialize Client¶
Need to create a Client
│
├─ Have API token?
│ └─ Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="ws")
│
└─ Need custom API URL?
└─ Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="ws",
api_url="http://custom:8080")
How to Ingest Data¶
User wants to ingest data
│
├─ Is it local files? → use FILE schema
│ ├─ Single file
│ │ └─ client.ingest("table", {"path": ["file.ext"]},
│ │ schema={"path": "FILE"})
│ └─ Multiple files
│ └─ client.ingest("table", {"path": ["f1.mp4", "f2.mp4"]},
│ schema={"path": "FILE"})
│
├─ Is it structured data (dict/lists)? → pass a dict directly
│ └─ client.ingest("table", {
│ "col1": [val1, val2, ...],
│ "col2": [val1, val2, ...],
│ })
│
├─ Is it a HuggingFace dataset? → use _huggingface key
│ └─ client.ingest("table", {"_huggingface": "dataset_name"})
│
├─ Is it COCO panoptic data? → use format object
│ └─ from deeplake.managed.formats import CocoPanoptic
│ client.ingest("table", format=CocoPanoptic(
│ images_dir="...", masks_dir="...",
│ annotations="..."
│ ))
│
├─ Need custom chunking for text?
│ └─ client.ingest("table", {"path": ["doc.txt"]},
│ schema={"path": "FILE"},
│ chunk_size=500, chunk_overlap=100)
│
└─ Need explicit schema?
└─ client.ingest("table", {...}, schema={
"name": "TEXT",
"count": "INT64",
"vector": "EMBEDDING",
})
How to Query Data¶
User wants to query data
│
├─ Simple SELECT (small result)?
│ ├─ Fluent: client.table("table").select("id", "text").limit(100)()
│ └─ Raw: client.query("SELECT * FROM table LIMIT 100")
│
├─ Large result set (streaming)?
│ └─ Use client.open_table("table") for direct dataset access
│ with batch iteration, PyTorch/TF DataLoaders, etc.
│
├─ Need semantic/vector search?
│ └─ emb_pg = "{" + ",".join(str(x) for x in query_embedding) + "}"
│ client.query("SELECT *, embedding <#> $1::float4[] AS score
│ FROM table ORDER BY score DESC LIMIT 10", (emb_pg,))
│
├─ Need text search?
│ └─ client.query("SELECT *, text <#> $1 AS score
│ FROM table ORDER BY score DESC LIMIT 10", (keyword,))
│
└─ Need hybrid search (vector + text)?
└─ emb_pg = "{" + ",".join(str(x) for x in query_emb) + "}"
client.query("SELECT *, (embedding, text)::deeplake_hybrid_record <#>
deeplake_hybrid_record($1::float4[], $2, 0.7, 0.3) AS score
FROM table ORDER BY score DESC LIMIT 10", (emb_pg, search_text))
User Wants to Train on Data¶
User wants to train / iterate over data
│
├─ Need PyTorch DataLoader?
│ └─ ds = client.open_table("table")
│ loader = DataLoader(ds.pytorch(), batch_size=32, shuffle=True)
│
├─ Need TensorFlow tf.data?
│ └─ ds = client.open_table("table")
│ tf_ds = ds.tensorflow().batch(32).prefetch(AUTOTUNE)
│
├─ Need simple batch iteration?
│ └─ ds = client.open_table("table")
│ for batch in ds.batches(32): ...
│
└─ Need async prefetch per row?
└─ ds = client.open_table("table")
await row.get_async("image")
Error Recovery¶
Operation failed with error
│
├─ AuthError?
│ ├─ "Token required" → Pass token= to Client()
│ ├─ "Token does not contain org_id" → Ensure token has OrgID claim
│ └─ "Token expired" → Get new token
│
├─ IngestError?
│ ├─ "data must be a dict" → Pass a dict, not list/str/int
│ ├─ "data must not be empty" → Dict must have at least one key
│ ├─ "File not found" → Check file path exists
│ └─ "ffmpeg not found" → Install ffmpeg for video processing
│
├─ TableError?
│ ├─ "create_deeplake_table failed" → Check pg_deeplake extension
│ └─ "Table already exists" → Use drop_table() first or different name
│
└─ ManagedServiceError?
└─ Check API server is running at the configured api_url
Complete Workflow Examples¶
Workflow 1: Ingest Videos and Search¶
import os
from deeplake import Client
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
# Ingest video files (FILE schema)
result = client.ingest("security_videos", {
"path": ["/path/to/camera1.mp4", "/path/to/camera2.mp4"],
}, schema={"path": "FILE"})
print(f"Ingested {result['row_count']} video segments")
# Fluent query for segments
segments = (
client.table("security_videos")
.select("id", "file_id", "start_time", "end_time", "text")
.where("start_time > 60")
.limit(10)
)()
for seg in segments:
print(f"Segment {seg['id']}: {seg['start_time']}s - {seg['end_time']}s")
Workflow 2: Build Semantic Search Index¶
import os
from deeplake import Client
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
# Prepare documents with embeddings
documents = ["Doc about AI", "Doc about ML", "Doc about databases"]
embeddings = model.encode(documents).tolist()
# Ingest
client.ingest("search_index", {
"text": documents,
"embedding": embeddings,
})
# Search
query_emb = model.encode("artificial intelligence").tolist()
emb_pg = "{" + ",".join(str(x) for x in query_emb) + "}"
results = client.query("""
SELECT text, embedding <#> $1::float4[] AS similarity
FROM search_index
ORDER BY similarity DESC
LIMIT 5
""", (emb_pg,))
for r in results:
print(f"{r['similarity']:.3f}: {r['text']}")
Workflow 3: Process PDF Documents¶
import os
from deeplake import Client
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
# Ingest PDFs (each page becomes a row)
result = client.ingest("manuals", {
"path": ["/path/to/manual1.pdf", "/path/to/manual2.pdf"],
}, schema={"path": "FILE"})
print(f"Processed {result['row_count']} pages")
# Search within PDFs
pages = client.query("""
SELECT file_id, page_index, text
FROM manuals
WHERE text LIKE '%installation%'
""")
for page in pages:
print(f"Found in file {page['file_id']}, page {page['page_index']}")
Workflow 4: Train a PyTorch Model¶
import os
from torch.utils.data import DataLoader
from deeplake import Client
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
# Ingest from HuggingFace
client.ingest("cifar10", {"_huggingface": "cifar10"})
# Stream directly into PyTorch
ds = client.open_table("cifar10")
loader = DataLoader(ds.pytorch(), batch_size=64, shuffle=True, num_workers=4)
for batch in loader:
images, labels = batch["img"], batch["label"]
# training_step(images, labels)
Workflow 5: Ingest COCO Panoptic and Query¶
import os
import json
from deeplake import Client
from deeplake.managed.formats import CocoPanoptic
client = Client(token=os.environ["DEEPLAKE_API_KEY"], workspace_id="my-workspace")
# Ingest panoptic dataset using format object
result = client.ingest("panoptic_train", format=CocoPanoptic(
images_dir="/data/coco/train2017",
masks_dir="/data/coco/panoptic_train2017",
annotations="/data/coco/annotations/panoptic_train2017.json",
))
print(f"Ingested {result['row_count']} images")
# Query for images
rows = client.query("""
SELECT image_id, filename, segments_info
FROM panoptic_train
LIMIT 10
""")
for row in rows:
segments = json.loads(row["segments_info"])
print(f"Image {row['filename']}: {len(segments)} segments")
Data Types¶
| Schema Type | Python Type | Postgres Type | Example |
|---|---|---|---|
| `TEXT` | `str` | `text` | `"hello"` |
| `INT32` | `int` | `integer` | `42` |
| `INT64` | `int` | `bigint` | `9999999999` |
| `FLOAT32` | `float` | `real` | `3.14` |
| `FLOAT64` | `float` | `double precision` | `3.14159265359` |
| `BOOL` | `bool` | `boolean` | `True` |
| `BINARY` | `bytes` | `bytea` | `b"\x00\x01"` |
| `IMAGE` | `bytes` | `IMAGE` (bytea) | Image binary data |
| `VIDEO` | `bytes` | `bytea` | Video binary data |
| `EMBEDDING` | `list[float]` | `float4[]` | `[0.1, 0.2, 0.3]` |
| `SEGMENT_MASK` | `bytes` | `SEGMENT_MASK` (bytea) | Segmentation mask data |
| `BINARY_MASK` | `bytes` | `BINARY_MASK` (bytea) | Binary mask data |
| `BOUNDING_BOX` | `list[float]` | `BOUNDING_BOX` (float4[]) | `[x, y, w, h]` |
| `CLASS_LABEL` | `int` | `CLASS_LABEL` (int4) | Label index |
| `POLYGON` | `bytes` | `DEEPLAKE_POLYGON` (bytea) | Polygon coordinates |
| `POINT` | `list[float]` | `DEEPLAKE_POINT` (float4[]) | `[1.0, 2.0]` |
| `MESH` | `bytes` | `MESH` (bytea) | 3D mesh data (PLY, STL) |
| `MEDICAL` | `bytes` | `MEDICAL` (bytea) | Medical imaging (DICOM) |
| `FILE` | `str` (path) | N/A (processed) | `"/path/to/file.mp4"` |
Schema inference
- `bool` -> `BOOL`
- `int` -> `INT64`
- `float` -> `FLOAT64`
- `bytes` -> `BINARY`
- `str` -> `TEXT`
- `list[float]` -> `EMBEDDING` (size auto-detected)
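The inference rules can be mirrored in plain Python. `infer_type` below is an illustrative reimplementation, not the SDK's own code; note that `bool` must be tested before `int`, since `bool` is a subclass of `int` in Python.

```python
def infer_type(value):
    """Map a Python value to its inferred schema type (illustrative only)."""
    if isinstance(value, bool):  # must precede the int check
        return "BOOL"
    if isinstance(value, int):
        return "INT64"
    if isinstance(value, float):
        return "FLOAT64"
    if isinstance(value, bytes):
        return "BINARY"
    if isinstance(value, str):
        return "TEXT"
    if isinstance(value, list) and value and all(isinstance(x, float) for x in value):
        return "EMBEDDING"
    raise ValueError(f"no inference rule for {type(value).__name__}")
```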
Domain types
IMAGE, SEGMENT_MASK, BINARY_MASK, BOUNDING_BOX, CLASS_LABEL, POLYGON, POINT, MESH, and MEDICAL are PostgreSQL domain types defined by pg_deeplake. They behave like their base types but carry semantic meaning for visualization and type-aware processing.
FILE
FILE is a schema directive, not a storage type. Columns marked as FILE are treated as file paths during ingestion -- the files are processed (chunked, etc.) and the resulting data is stored in generated columns. The FILE column itself is not stored.
Performance Tuning¶
Buffered writes: Rows are accumulated in a memory buffer and flushed in larger batches, reducing Python-to-C++ FFI overhead.
Periodic commits: ds.commit() is called every 2000 rows to free memory and enable crash recovery.
Parallel file normalization: File processing (ffmpeg, PyMuPDF, file I/O) runs in a thread pool (up to 4 workers).
Storage concurrency: The SDK sets deeplake.storage.set_concurrency(32) during ingestion to parallelize S3/GCS chunk uploads.
| Parameter | Default | Description |
|---|---|---|
| `flush_every` | 200 | Rows buffered before `ds.append()` |
| `commit_every` | 2000 | Rows between `ds.commit()` calls |
| Normalization workers | 4 | Max threads for file processing |
| Storage concurrency | 32 | Parallel storage I/O operations |
Limits¶
| Resource | Limit |
|---|---|
| Video chunk duration | 10 seconds |
| Text chunk size (default) | 1000 characters |
| Text chunk overlap (default) | 200 characters |
| PDF rendering resolution | 300 DPI |
| Batch size (data ingest) | 1000 rows |
| Batch iteration size (default) | 100 rows |
| Write buffer (flush_every) | 200 rows |
| Commit interval | 2000 rows |
| File normalization workers | 4 threads |
| Storage I/O concurrency | 32 operations |
Troubleshooting¶
Token does not contain org_id
Ensure your token contains an OrgID claim, or that the API /me endpoint is accessible as fallback.