Ingestion

The client.ingest() method loads files, structured data, and HuggingFace datasets into DeepLake managed tables in a single call.

The TypeScript SDK uses the WASM engine to write data directly to storage. The flow is: create a table via REST, open the dataset with a direct storage path, append data, and commit.


ingest()

# Signature: ingest(table_name, data, *, schema=None, on_progress=None, chunk_size=1000, chunk_overlap=200)
result = client.ingest(table_name, data, schema=None, on_progress=None, chunk_size=1000, chunk_overlap=200)
const { ManagedClient } = require("deeplake");
const {
    initializeWasm, deeplakeOpen, deeplakeAppend,
    deeplakeCommit, deeplakeRelease,
} = require("deeplake/wasm");
const { apiRequest } = require("deeplake/api");

// 1. Initialize WASM and client
await initializeWasm();
const client = new ManagedClient({
    token: process.env.DEEPLAKE_API_KEY,
    workspaceId: process.env.DEEPLAKE_WORKSPACE,
});
await client.applyStorageCreds("readwrite");

// 2. Create table via REST API
await apiRequest(client.apiUrl, client.token, client.orgId, {
    method: "POST",
    path: `/workspaces/${client.workspaceId}/tables`,
    body: { table_name: "my_table", table_schema: { title: "TEXT", content: "TEXT" } },
    timeoutMs: 30000,
});

// 3. Open dataset, append, commit
const dsPath = await client.getDatasetPath("my_table");
const ds = await deeplakeOpen(dsPath, "", client.token);
await deeplakeAppend(ds, {
    title: ["Row 1", "Row 2"],
    content: ["First content", "Second content"],
});
await deeplakeCommit(ds);
deeplakeRelease(ds);

Warning

If table_name already exists, ingest() raises a TableError (Python). In TypeScript, the REST POST /tables call returns a 409 Conflict. Use drop_table() / dropTable() first, or choose a different name.
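A common way to handle this is a small drop-then-ingest wrapper. The sketch below is a hypothetical helper (`ingest_fresh` is not part of the SDK), and it assumes that swallowing the error from dropping a missing table is acceptable in your workflow:

```python
# Hedged sketch: re-ingest safely when the table may already exist.
# `ingest_fresh` is a hypothetical helper, not an SDK method.
def ingest_fresh(client, table_name, data, **kwargs):
    try:
        client.drop_table(table_name)
    except Exception:
        pass  # table did not exist; nothing to drop
    return client.ingest(table_name, data, **kwargs)
```

Catching a broad `Exception` keeps the sketch SDK-agnostic; in practice you would catch the SDK's specific "table not found" error.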

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| table_name | str | yes | Table name to create; must not already exist |
| data | dict | yes | Data to ingest as a dictionary (see below) |
| schema | dict[str, str] | no | Explicit schema mapping column names to type strings; use "FILE" for file path columns; overrides auto-inference |
| on_progress | Callable | no | Callback called with (rows_written, total) |
| chunk_size | int | no | Character count per text chunk. Default: 1000 |
| chunk_overlap | int | no | Overlap between consecutive text chunks (chars). Default: 200 |

apiRequest() - Create the table:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| body.table_name | string | yes | Table name to create; must not already exist |
| body.table_schema | Record<string, string> | yes | Schema mapping column names to Postgres types (TEXT, BIGINT, BYTEA, etc.) |

deeplakeAppend() - Write data:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| ds | dataset handle | yes | Handle returned by deeplakeOpen() |
| data | Record<string, unknown[]> | yes | Column-oriented data; keys are column names, values are arrays of equal length |

Data content types (Python):

| Content | Interpretation | Example |
| --- | --- | --- |
| Column with schema={"col": "FILE"} | File paths to ingest | {"path": ["video.mp4"]}, schema={"path": "FILE"} |
| {"_huggingface": "name"} | HuggingFace dataset | {"_huggingface": "cifar10"} |
| {col: [values]} | Direct column data | {"text": ["a", "b"], "score": [1, 2]} |

Return value

{"table_name": "videos", "row_count": 150, "dataset_path": "/data/org_id/workspace_id/videos"}
| Key | Type | Description |
| --- | --- | --- |
| table_name | str | Name of the created table |
| row_count | int | Total rows inserted |
| dataset_path | str | Storage path of the underlying dataset |

The TypeScript WASM flow does not return a result object. Use client.query() to verify the data after commit:

const rows = await client.query(`SELECT COUNT(*) as total FROM my_table`);
console.log(`Inserted ${rows[0].total} rows`);

Ingest Multimodal Data

# Video: creates ~10-second segments with thumbnails
client.ingest("security_footage", {
    "path": [
        "camera1_2025-01-15.mp4",
        "camera2_2025-01-15.mp4",
    ]
}, schema={"path": "FILE"})

# Images: each image stored as a single row
client.ingest("photos", {
    "path": ["image1.jpg", "image2.png"]
}, schema={"path": "FILE"})

# PDFs: each page rendered at 300 DPI as PNG
client.ingest("manuals", {
    "path": ["manual.pdf"]
}, schema={"path": "FILE"})

TypeScript: file ingestion

File-based ingestion (video, image, PDF) with automatic chunking and normalization is available in the Python SDK. In TypeScript, use the WASM append flow to write pre-processed data directly, or use the Python SDK for file ingestion.

Ingest Text Data

client.ingest("documents", {
    "path": [
        "report.txt",
        "notes.md",
        "data.json",
    ]
}, schema={"path": "FILE"})
# Text chunked into ~1000-char pieces with 200-char overlap

# Smaller chunks for precise retrieval
client.ingest("faq", {"path": ["faq.txt"]}, schema={"path": "FILE"},
              chunk_size=500, chunk_overlap=100)

# Larger chunks for context-heavy documents
client.ingest("legal", {"path": ["contract.pdf"]}, schema={"path": "FILE"},
              chunk_size=2000, chunk_overlap=400)
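The effect of chunk_size and chunk_overlap can be illustrated with a minimal splitter. This is a sketch of the overlapping-window idea only; the SDK's actual chunker (boundary handling, whitespace trimming) may differ in detail:

```python
# Hedged sketch: overlapping fixed-size text chunks, mirroring the
# documented defaults (1000-char chunks, 200-char overlap).
def chunk_text(text, chunk_size=1000, chunk_overlap=200):
    step = chunk_size - chunk_overlap  # each chunk starts `step` chars after the last
    chunks = []
    for start in range(0, max(len(text), 1), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks
```

With the defaults, a 1500-character document yields two chunks whose last/first 200 characters coincide, so a query matching text near a chunk boundary can still be retrieved from either chunk.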

Ingest Structured Data

client.ingest("vectors", {
    "id": ["doc1", "doc2", "doc3"],
    "text": ["Hello world", "Goodbye world", "Another doc"],
    "embedding": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]],
})
# Schema inferred from data types

client.ingest("data", {
    "name": ["Alice", "Bob"],
    "age": [30, 25],
}, schema={
    "name": "TEXT",
    "age": "INT64",
})

# Ingest a HuggingFace dataset by name
client.ingest("mnist", {"_huggingface": "mnist"})
client.ingest("cifar", {"_huggingface": "cifar10"})
// Assumes client and WASM are already initialized (see setup above)
await apiRequest(client.apiUrl, client.token, client.orgId, {
    method: "POST",
    path: `/workspaces/${client.workspaceId}/tables`,
    body: {
        table_name: "vectors",
        table_schema: { id: "TEXT", text: "TEXT", embedding: "float4[]" },
    },
    timeoutMs: 30000,
});

// Fetch the dataset path after the table exists
const dsPath = await client.getDatasetPath("vectors");
const ds = await deeplakeOpen(dsPath, "", client.token);
await deeplakeAppend(ds, {
    id: ["doc1", "doc2", "doc3"],
    text: ["Hello world", "Goodbye world", "Another doc"],
    embedding: [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]],
});
await deeplakeCommit(ds);
deeplakeRelease(ds);
// Separate example: explicit schema ("data" table)
await apiRequest(client.apiUrl, client.token, client.orgId, {
    method: "POST",
    path: `/workspaces/${client.workspaceId}/tables`,
    body: {
        table_name: "data",
        table_schema: { name: "TEXT", age: "BIGINT" },
    },
    timeoutMs: 30000,
});

const dsPath = await client.getDatasetPath("data");
const ds = await deeplakeOpen(dsPath, "", client.token);
await deeplakeAppend(ds, {
    name: ["Alice", "Bob"],
    age: [30, 25],
});
await deeplakeCommit(ds);
deeplakeRelease(ds);

Production Patterns

def progress(rows_written, total):
    print(f"Written {rows_written} rows...")

client.ingest("documents", {"path": pdf_files}, schema={"path": "FILE"},
              on_progress=progress)

# Batch-ingest a directory of videos with progress reporting
import glob

files = glob.glob("data/videos/*.mp4")
print(f"Ingesting {len(files)} files...")

def progress(rows_written, total):
    pct = (rows_written / total * 100) if total else 0
    print(f"  {rows_written}/{total} rows ({pct:.0f}%)")

result = client.ingest("video_archive", {"path": files}, schema={"path": "FILE"},
                      on_progress=progress)
print(f"Done: {result['row_count']} rows created")

# Drop existing table first
client.drop_table("documents")

# Re-ingest with updated files
client.ingest("documents", {
    "path": [
        "report_v2.pdf",
        "new_notes.txt",
    ]
}, schema={"path": "FILE"})

// Append in batches for large datasets
const BATCH_SIZE = 1000;
const dsPath = await client.getDatasetPath("large_table");
const ds = await deeplakeOpen(dsPath, "", client.token);

for (let i = 0; i < allData.length; i += BATCH_SIZE) {
    const batch = allData.slice(i, i + BATCH_SIZE);
    await deeplakeAppend(ds, {
        text: batch.map(r => r.text),
        score: batch.map(r => r.score),
    });
    console.log(`Appended ${Math.min(i + BATCH_SIZE, allData.length)}/${allData.length}`);
}

await deeplakeCommit(ds);
deeplakeRelease(ds);

// Drop existing table first
await client.dropTable("documents");

// Recreate table and append new data
await apiRequest(client.apiUrl, client.token, client.orgId, {
    method: "POST",
    path: `/workspaces/${client.workspaceId}/tables`,
    body: { table_name: "documents", table_schema: { title: "TEXT", content: "TEXT" } },
    timeoutMs: 30000,
});

const dsPath = await client.getDatasetPath("documents");
const ds = await deeplakeOpen(dsPath, "", client.token);
await deeplakeAppend(ds, {
    title: ["Updated report", "New notes"],
    content: ["Report v2 content...", "Fresh notes..."],
});
await deeplakeCommit(ds);
deeplakeRelease(ds);

Chunking Strategy

| File Type | Extensions | Strategy | Columns Created |
| --- | --- | --- | --- |
| Video | .mp4, .mov, .avi, .mkv, .webm | 10-second segments + thumbnails | id, file_id, chunk_index, start_time, end_time, video_data, thumbnail, text |
| Image | .jpg, .jpeg, .png, .gif, .bmp, .webp | Single chunk | id, file_id, image, filename, text |
| PDF | .pdf | Page-by-page at 300 DPI | id, file_id, page_index, image, text |
| Text | .txt, .md, .csv, .json, .xml, .html | 1000-char chunks, 200 overlap | id, file_id, chunk_index, text |
| Other | * | Single binary chunk | id, file_id, data, filename |
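The extension-to-strategy dispatch in the table above can be sketched as a simple lookup. This illustrates the mapping only; the SDK's internal dispatcher and strategy names are not public API, so treat the return values as labels for this sketch:

```python
# Hedged sketch of the table's extension-to-strategy mapping;
# strategy names here are illustrative labels, not SDK identifiers.
from pathlib import Path

VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm"}
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"}
TEXT_EXTS = {".txt", ".md", ".csv", ".json", ".xml", ".html"}

def chunking_strategy(path: str) -> str:
    ext = Path(path).suffix.lower()
    if ext in VIDEO_EXTS:
        return "video-segments"   # 10-second segments + thumbnails
    if ext in IMAGE_EXTS:
        return "single-image"     # one row per image
    if ext == ".pdf":
        return "pdf-pages"        # one row per page at 300 DPI
    if ext in TEXT_EXTS:
        return "text-chunks"      # 1000-char chunks, 200 overlap
    return "binary"               # single binary chunk
```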

Performance Tuning

| Parameter | Default | Description |
| --- | --- | --- |
| flush_every | 200 | Rows buffered before ds.append() |
| commit_every | 2000 | Rows between ds.commit() calls |
| Normalization workers | 4 | Max threads for file processing |
| Storage concurrency | 32 | Parallel storage I/O operations |

Buffered writes. Rows are accumulated in a memory buffer and flushed in larger batches, reducing Python-to-C++ FFI overhead.

Periodic commits. ds.commit() is called every 2000 rows to free C++ memory buffers and enable crash recovery.

Parallel file normalization. ffmpeg for video, PyMuPDF for PDFs, file I/O for images/text. Runs in a thread pool (up to 4 workers).

Storage concurrency. The SDK sets deeplake.storage.set_concurrency(32) during ingestion to parallelize S3/GCS chunk uploads.
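The buffered-write and periodic-commit behavior described above can be sketched as a simple loop. Here `ds` is a stand-in object with `append()` and `commit()` methods rather than the real dataset handle, and `write_buffered` is a hypothetical illustration of the pattern, not SDK code:

```python
# Hedged sketch of the buffered-write loop: flush_every and commit_every
# match the documented defaults; `ds` is a stand-in, not the real dataset API.
def write_buffered(ds, rows, flush_every=200, commit_every=2000):
    buffer = []
    written = 0
    for row in rows:
        buffer.append(row)
        if len(buffer) >= flush_every:
            ds.append(buffer)            # one append call per 200 rows, cutting FFI overhead
            written += len(buffer)
            buffer.clear()
            if written % commit_every == 0:
                ds.commit()              # free buffers, enable crash recovery
    if buffer:                           # flush the tail
        ds.append(buffer)
        written += len(buffer)
    ds.commit()                          # final commit makes all rows durable
    return written
```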


For the full list of data types and schema inference rules, see Data Types.