Ingestion¶
The client.ingest() method loads files, structured data, and HuggingFace datasets into DeepLake managed tables in a single call.
The TypeScript SDK uses the WASM engine to write data directly to storage. The flow is: create a table via REST, open the dataset with a direct storage path, append data, and commit.
ingest()¶
const { ManagedClient } = require("deeplake");
const {
  initializeWasm, deeplakeOpen, deeplakeAppend,
  deeplakeCommit, deeplakeRelease,
} = require("deeplake/wasm");
const { apiRequest } = require("deeplake/api");

// 1. Initialize WASM and the client
await initializeWasm();
const client = new ManagedClient({
  token: process.env.DEEPLAKE_API_KEY,
  workspaceId: process.env.DEEPLAKE_WORKSPACE,
});
await client.applyStorageCreds("readwrite");

// 2. Create the table via the REST API
await apiRequest(client.apiUrl, client.token, client.orgId, {
  method: "POST",
  path: `/workspaces/${client.workspaceId}/tables`,
  body: { table_name: "my_table", table_schema: { title: "TEXT", content: "TEXT" } },
  timeoutMs: 30000,
});

// 3. Open the dataset, append, commit
const dsPath = await client.getDatasetPath("my_table");
const ds = await deeplakeOpen(dsPath, "", client.token);
await deeplakeAppend(ds, {
  title: ["Row 1", "Row 2"],
  content: ["First content", "Second content"],
});
await deeplakeCommit(ds);
deeplakeRelease(ds);
Warning
If table_name already exists, ingest() raises a TableError (Python). In TypeScript, the REST POST /tables call returns a 409 conflict. Use drop_table() / dropTable() first, or choose a different name.
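One way to handle the 409 conflict in application code is a drop-and-retry wrapper. The sketch below is a hypothetical helper, not part of the SDK: the `createTable` and `dropTable` callbacks are injected (in practice they would wrap the apiRequest and dropTable calls shown on this page), and it assumes the thrown error carries a numeric `status` field.

```typescript
// Hypothetical helper: create a table; on a 409 name conflict, drop the
// existing table and retry once. Anything other than a 409 is rethrown.
async function createTableOrReplace(
  createTable: () => Promise<void>,
  dropTable: () => Promise<void>,
): Promise<"created" | "replaced"> {
  try {
    await createTable();
    return "created";
  } catch (err: any) {
    // Only a name conflict is recoverable; propagate other failures.
    if (err?.status !== 409) throw err;
    await dropTable();
    await createTable();
    return "replaced";
  }
}
```

Prefer this pattern only when overwriting is genuinely intended; dropping a table on conflict is destructive.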
Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
| table_name | str | yes | Table name to create. Must not already exist |
| data | dict | yes | Data to ingest as a dictionary (see below) |
| schema | dict[str, str] | no | Explicit schema mapping column names to type strings. Use "FILE" for file path columns. Overrides auto-inference |
| on_progress | Callable | no | Callback called with (rows_written, total) |
| chunk_size | int | no | Character count per text chunk. Default: 1000 |
| chunk_overlap | int | no | Overlap between consecutive text chunks (chars). Default: 200 |
apiRequest() - Create the table:
| Parameter | Type | Required | Description |
|---|---|---|---|
| body.table_name | string | yes | Table name to create. Must not already exist |
| body.table_schema | Record<string, string> | yes | Schema mapping column names to Postgres types (TEXT, BIGINT, BYTEA, etc.) |
deeplakeAppend() - Write data:
| Parameter | Type | Required | Description |
|---|---|---|---|
| ds | dataset handle | yes | Handle returned by deeplakeOpen() |
| data | Record<string, unknown[]> | yes | Column-oriented data. Keys are column names, values are arrays of equal length |
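Application data is often row-oriented, while deeplakeAppend() expects columns. The helper below is an illustrative sketch (not an SDK function) that pivots an array of records into the Record<string, unknown[]> shape, using the first row's keys as the column set and rejecting rows with missing columns so the resulting arrays stay equal in length.

```typescript
// Sketch: pivot row-oriented records into the column-oriented shape that
// deeplakeAppend() expects. Throws if any row lacks a column seen in row 0.
function toColumns(rows: Array<Record<string, unknown>>): Record<string, unknown[]> {
  if (rows.length === 0) return {};
  const columns: Record<string, unknown[]> = {};
  for (const key of Object.keys(rows[0])) columns[key] = [];
  for (const row of rows) {
    for (const key of Object.keys(columns)) {
      if (!(key in row)) throw new Error(`row is missing column "${key}"`);
      columns[key].push(row[key]);
    }
  }
  return columns;
}
```

For example, `toColumns([{ title: "a", score: 1 }, { title: "b", score: 2 }])` yields `{ title: ["a", "b"], score: [1, 2] }`, ready to pass to deeplakeAppend().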
Data content types (Python):
| Content | Interpretation | Example |
|---|---|---|
| Column with schema={"col": "FILE"} | File paths to ingest | {"path": ["video.mp4"]}, schema={"path": "FILE"} |
| {"_huggingface": "name"} | HuggingFace dataset | {"_huggingface": "cifar10"} |
| {col: [values]} | Direct column data | {"text": ["a", "b"], "score": [1, 2]} |
Return value¶
| Key | Type | Description |
|---|---|---|
| table_name | str | Name of the created table |
| row_count | int | Total rows inserted |
| dataset_path | str | Storage path of the underlying dataset |
Ingest Multimodal Data¶
TypeScript: file ingestion
File-based ingestion (video, image, PDF) with automatic chunking and normalization is available in the Python SDK. In TypeScript, use the WASM append flow to write pre-processed data directly, or use the Python SDK for file ingestion.
Ingest Text Data¶
// Assumes client and WASM are already initialized (see setup above)
await apiRequest(client.apiUrl, client.token, client.orgId, {
  method: "POST",
  path: `/workspaces/${client.workspaceId}/tables`,
  body: {
    table_name: "vectors",
    table_schema: { id: "TEXT", text: "TEXT", embedding: "float4[]" },
  },
  timeoutMs: 30000,
});
const dsPath = await client.getDatasetPath("vectors");
const ds = await deeplakeOpen(dsPath, "", client.token);
await deeplakeAppend(ds, {
  id: ["doc1", "doc2", "doc3"],
  text: ["Hello world", "Goodbye world", "Another doc"],
  embedding: [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]],
});
await deeplakeCommit(ds);
deeplakeRelease(ds);
Ingest Structured Data¶
await apiRequest(client.apiUrl, client.token, client.orgId, {
  method: "POST",
  path: `/workspaces/${client.workspaceId}/tables`,
  body: {
    table_name: "data",
    table_schema: { name: "TEXT", age: "BIGINT" },
  },
  timeoutMs: 30000,
});
const dsPath = await client.getDatasetPath("data");
const ds = await deeplakeOpen(dsPath, "", client.token);
await deeplakeAppend(ds, {
  name: ["Alice", "Bob"],
  age: [30, 25],
});
await deeplakeCommit(ds);
deeplakeRelease(ds);
Production Patterns¶
import glob

files = glob.glob("data/videos/*.mp4")
print(f"Ingesting {len(files)} files...")

def progress(rows_written, total):
    pct = (rows_written / total * 100) if total else 0
    print(f"  {rows_written}/{total} rows ({pct:.0f}%)")

result = client.ingest(
    "video_archive",
    {"path": files},
    schema={"path": "FILE"},
    on_progress=progress,
)
print(f"Done: {result['row_count']} rows created")
// Append in batches for large datasets
// (assumes the "large_table" table was already created as shown above)
const BATCH_SIZE = 1000;
const dsPath = await client.getDatasetPath("large_table");
const ds = await deeplakeOpen(dsPath, "", client.token);
for (let i = 0; i < allData.length; i += BATCH_SIZE) {
  const batch = allData.slice(i, i + BATCH_SIZE);
  await deeplakeAppend(ds, {
    text: batch.map(r => r.text),
    score: batch.map(r => r.score),
  });
  console.log(`Appended ${Math.min(i + BATCH_SIZE, allData.length)}/${allData.length}`);
}
await deeplakeCommit(ds);
deeplakeRelease(ds);
// Drop the existing table first
await client.dropTable("documents");

// Recreate the table and append new data
await apiRequest(client.apiUrl, client.token, client.orgId, {
  method: "POST",
  path: `/workspaces/${client.workspaceId}/tables`,
  body: { table_name: "documents", table_schema: { title: "TEXT", content: "TEXT" } },
  timeoutMs: 30000,
});
const dsPath = await client.getDatasetPath("documents");
const ds = await deeplakeOpen(dsPath, "", client.token);
await deeplakeAppend(ds, {
  title: ["Updated report", "New notes"],
  content: ["Report v2 content...", "Fresh notes..."],
});
await deeplakeCommit(ds);
deeplakeRelease(ds);
Chunking Strategy¶
| File Type | Extensions | Strategy | Columns Created |
|---|---|---|---|
| Video | .mp4, .mov, .avi, .mkv, .webm | 10-second segments + thumbnails | id, file_id, chunk_index, start_time, end_time, video_data, thumbnail, text |
| Image | .jpg, .jpeg, .png, .gif, .bmp, .webp | Single chunk | id, file_id, image, filename, text |
| PDF | .pdf | Page-by-page at 300 DPI | id, file_id, page_index, image, text |
| Text | .txt, .md, .csv, .json, .xml, .html | 1000-char chunks, 200 overlap | id, file_id, chunk_index, text |
| Other | * | Single binary chunk | id, file_id, data, filename |
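The video row above can be made concrete with a small sketch of the segmentation arithmetic. This is illustrative only (the `videoSegments` helper is not an SDK function, and the real ingester also extracts thumbnails with ffmpeg): given a duration in seconds, it computes the chunk_index, start_time, and end_time values that would populate those columns.

```typescript
// Sketch: 10-second segmentation of a video, producing the chunk_index /
// start_time / end_time values from the chunking table. The final segment
// is clamped to the video's duration.
function videoSegments(durationSec: number, segmentSec = 10) {
  const segments: { chunk_index: number; start_time: number; end_time: number }[] = [];
  for (let start = 0, i = 0; start < durationSec; start += segmentSec, i++) {
    segments.push({
      chunk_index: i,
      start_time: start,
      end_time: Math.min(start + segmentSec, durationSec),
    });
  }
  return segments;
}
```

A 25-second clip, for example, yields three segments: 0-10, 10-20, and 20-25.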
Performance Tuning¶
| Parameter | Default | Description |
|---|---|---|
| flush_every | 200 | Rows buffered before ds.append() |
| commit_every | 2000 | Rows between ds.commit() calls |
| Normalization workers | 4 | Max threads for file processing |
| Storage concurrency | 32 | Parallel storage I/O operations |
Buffered writes. Rows are accumulated in a memory buffer and flushed in larger batches, reducing Python-to-C++ FFI overhead.
Periodic commits. ds.commit() is called every 2000 rows to free C++ memory buffers and enable crash recovery.
Parallel file normalization. ffmpeg for video, PyMuPDF for PDFs, file I/O for images/text. Runs in a thread pool (up to 4 workers).
Storage concurrency. The SDK sets deeplake.storage.set_concurrency(32) during ingestion to parallelize S3/GCS chunk uploads.
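The buffered-write and periodic-commit behavior described above can be sketched as a small writer class. This is a simplified model, not the SDK's implementation: the flush and commit callbacks are injected (in practice they would wrap deeplakeAppend and deeplakeCommit), and the thresholds mirror the flush_every / commit_every defaults.

```typescript
// Sketch of the buffered-write pattern: rows accumulate in memory, `flush`
// fires every `flushEvery` rows, and `commit` fires once `commitEvery` rows
// have been flushed since the last commit. close() drains the buffer and
// issues a final commit.
class BufferedWriter {
  private buffer: Record<string, unknown>[] = [];
  private sinceCommit = 0;

  constructor(
    private flush: (rows: Record<string, unknown>[]) => Promise<void>,
    private commit: () => Promise<void>,
    private flushEvery = 200,
    private commitEvery = 2000,
  ) {}

  async write(row: Record<string, unknown>): Promise<void> {
    this.buffer.push(row);
    if (this.buffer.length >= this.flushEvery) await this.doFlush();
  }

  async close(): Promise<void> {
    if (this.buffer.length > 0) await this.doFlush();
    await this.commit(); // final commit so no flushed rows are left uncommitted
  }

  private async doFlush(): Promise<void> {
    await this.flush(this.buffer);
    this.sinceCommit += this.buffer.length;
    this.buffer = [];
    if (this.sinceCommit >= this.commitEvery) {
      await this.commit();
      this.sinceCommit = 0;
    }
  }
}
```

The design choice is the same one the SDK makes: batching amortizes per-call overhead, while periodic commits bound memory growth and limit how much work a crash can lose.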
For the full list of data types and schema inference rules, see Data Types.