Mirror of https://github.com/lancedb/lancedb.git, synced 2026-05-20 05:20:40 +00:00.
## **Summary**
This PR adds a **Scannable primitive** to the Node.js bindings, bringing
parity with Python's `PyScannable`.

A `Scannable` wraps a schema, an optional row-count hint, a rescannable
flag, and a batch-producing callback. On the Rust side it implements
`lancedb::data::scannable::Scannable`. The goal is to give consumers
such as `Table.add`, `createTable`, and `mergeInsert` a way to stream
data without materializing the full dataset in JS memory.

This PR introduces only the primitive. Migrating existing consumers to
use it will come in follow-up work.

---
## **Design**
### **Transport**
The transport uses the **Arrow IPC Stream format, one batch at a time**.
The JS side encodes each `RecordBatch` into a self-contained IPC Stream
message containing the schema, the batch, and an end-of-stream marker.
The message is returned as a `Buffer` through a napi `ThreadsafeFunction`,
and the Rust side decodes it using `arrow_ipc::reader::StreamReader`.

Only one batch is in flight at a time, so JS memory stays bounded by the
batch size. The Node `Buffer` size limit of about 4 GiB therefore applies
to each individual batch, not to the stream as a whole.
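While debugging the encoder, the per-batch framing can be sanity-checked on the JS side before a buffer crosses the boundary. The sketch below relies only on the Arrow IPC streaming format:

- each encapsulated message is prefixed with the continuation token `0xFFFFFFFF`;
- a stream can end with an explicit end-of-stream marker, which is `0xFFFFFFFF` followed by a 4-byte zero length.

The helper names are hypothetical and not part of this PR. The check also assumes the JS encoder writes the explicit EOS marker rather than signaling end of stream by closing the stream. It does not replace decoding on the Rust side; it only helps pin a malformed buffer on the JS encoder.

```typescript
// Arrow IPC streaming format: every encapsulated message starts with a
// 4-byte continuation token, and the explicit end-of-stream marker is that
// token followed by a 4-byte zero metadata length.
const CONTINUATION = [0xff, 0xff, 0xff, 0xff];
const EOS_MARKER = [...CONTINUATION, 0x00, 0x00, 0x00, 0x00];

// Hypothetical helper: true if `bytes` contains `pattern` starting at `offset`.
function matchesAt(bytes: Uint8Array, pattern: number[], offset: number): boolean {
  if (offset < 0 || offset + pattern.length > bytes.length) return false;
  return pattern.every((b, i) => bytes[offset + i] === b);
}

/**
 * Hypothetical sanity check for one per-batch IPC Stream message: it must
 * open with a continuation token (the schema message) and close with the
 * explicit EOS marker, so a fresh StreamReader can decode it without any
 * state carried over from earlier batches.
 */
function looksSelfContained(message: Uint8Array): boolean {
  return (
    message.length > EOS_MARKER.length &&
    matchesAt(message, CONTINUATION, 0) &&
    matchesAt(message, EOS_MARKER, message.length - EOS_MARKER.length)
  );
}

// Usage (hypothetical): call looksSelfContained(buf) on each encoded batch
// before handing it back from getNextBatch while debugging the encoder.
```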
I initially evaluated the Arrow C Data Interface, which is the approach
used in Python. I dropped that path after confirming that the
`apache-arrow` npm package does not expose a C Data Interface export in
any supported version from 15 to 18. JavaScript is not listed in Arrow's
C Data Interface implementation table, and the upstream tracking issue
remains open with no scheduled work.

Third-party FFI shims would introduce additional dependency risk without
solving the core maintenance problem. Using IPC adds one encode and one
decode step per batch, but the cost is predictable, and end-to-end time
is typically dominated by Lance's write path.

---
### **API**
```ts
class Scannable {
  readonly schema: Schema
  readonly numRows: number | null
  readonly rescannable: boolean
  static fromFactory(schema, factory, opts?)
  static fromTable(table, opts?)
  static fromIterable(schema, iter, opts?)
  static fromRecordBatchReader(reader, opts?)
}
```
The FFI boundary consists of a single callback:
`getNextBatch(isStart: boolean): Promise<Buffer | null>`

`isStart` is `true` on the first call of each new scan and `false` on
every call after it. The JS side uses it to drop any cached iterator and
re-invoke the factory at scan boundaries. This is what makes a
rescannable source restart at batch 0 on every `scan_as_stream` call,
even when a previous scan ended mid-stream, for example a write retried
after a network error. Without this signal, a retry would resume a stale
iterator and silently skip the batches it had already emitted.
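The consumer side of the `isStart` contract can be modeled in a few lines. The model also covers the one-shot enforcement for non-rescannable sources that is listed under Changes. It is a simplified, synchronous sketch:

- the real callback is async and yields IPC-encoded `Buffer`s;
- the real enforcement lives in Rust, in `scan_as_stream`;
- treating `null` as end of scan is how this model reads the `Promise<Buffer | null>` signature;
- `makeScanner` and `GetNextBatch` are hypothetical names.

In this sketch, the one-shot check runs when a scan is requested rather than on the first pull. A second scan of a one-shot source therefore fails before any batch is consumed.

```typescript
// Hypothetical, synchronous stand-in for the FFI callback. The real one is
// `getNextBatch(isStart: boolean): Promise<Buffer | null>`.
type GetNextBatch<T> = (isStart: boolean) => T | null;

/**
 * Hypothetical model of the consumer side of a scan:
 * - the first pull of every scan passes `isStart = true`,
 * - every later pull in the same scan passes `false`,
 * - a `null` result ends the scan,
 * - a non-rescannable source may be scanned at most once.
 */
function makeScanner<T>(
  getNextBatch: GetNextBatch<T>,
  rescannable: boolean,
): () => Generator<T> {
  let scanned = false;
  return () => {
    // One-shot enforcement happens when a scan is requested,
    // before any batch is pulled.
    if (scanned && !rescannable) {
      throw new Error("source is not rescannable and has already been scanned");
    }
    scanned = true;
    return (function* (): Generator<T> {
      let batch = getNextBatch(true);
      while (batch !== null) {
        yield batch;
        batch = getNextBatch(false);
      }
    })();
  };
}
```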
In addition, a schema-only IPC buffer is transferred once during
construction.

---
## **Changes**
* `nodejs/src/scannable.rs`: Adds `NapiScannable` and the `LanceScannable`
  implementation. Implements `schema()`, `num_rows()`, `rescannable()`,
  and `scan_as_stream()`. Includes per-batch schema validation against the
  declared schema, one-shot enforcement for non-rescannable sources, and a
  scan-boundary reset signal (`isStart`) so rescannable sources restart
  from batch 0 on every `scan_as_stream` call rather than resuming a stale
  iterator.
* `nodejs/src/lib.rs`: Module registration.
* `nodejs/lancedb/scannable.ts`: Defines the `Scannable` class and the
  four constructors listed above. Each constructor rejects option
  combinations it cannot honor, for example a `rescannable: true` request
  on a one-shot iterable or reader, or a `numRows` that disagrees with an
  in-memory table's row count.
* `nodejs/lancedb/index.ts`: Exports the new primitive.
* `nodejs/__test__/scannable.test.ts`: Test suite for the primitive.
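The construction-time checks in `scannable.ts` can be pictured as a small validation function. This is a sketch under assumptions:

- the `ScannableOptions` shape only mirrors the `numRows` and `rescannable` metadata on `Scannable`;
- `SourceKind` and `validateOptions` are hypothetical names;
- the only rules encoded are the two examples given in the Changes list;
- the `fromFactory` case is omitted because its restrictions are not described here.

```typescript
// Hypothetical options shape: it only mirrors the `numRows` and
// `rescannable` metadata exposed on `Scannable`.
interface ScannableOptions {
  numRows?: number | null;
  rescannable?: boolean;
}

// Hypothetical description of the source each constructor wraps, reduced
// to what the checks need. `fromFactory` is left out on purpose.
type SourceKind =
  | { kind: "iterable" } // fromIterable: one-shot
  | { kind: "reader" } // fromRecordBatchReader: one-shot
  | { kind: "table"; rowCount: number }; // fromTable: in-memory

/** Throws if `opts` asks for something the source cannot honor. */
function validateOptions(source: SourceKind, opts: ScannableOptions = {}): void {
  const oneShot = source.kind === "iterable" || source.kind === "reader";
  if (oneShot && opts.rescannable === true) {
    throw new Error(`rescannable: true cannot be honored by a one-shot ${source.kind}`);
  }
  if (source.kind === "table" && opts.numRows != null && opts.numRows !== source.rowCount) {
    throw new Error(
      `numRows ${opts.numRows} disagrees with the table's row count ${source.rowCount}`,
    );
  }
}

// Usage:
// validateOptions({ kind: "reader" }, { rescannable: true });     // throws
// validateOptions({ kind: "table", rowCount: 3 }, { numRows: 3 }); // ok
```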
---
## **Validation**
Before implementing the bridge, I ran an end-to-end harness with a JS
producer feeding a standalone Rust consumer built against the same
`arrow-ipc` version used in the bridge.

The harness covered the following scenarios:

* happy path
* empty stream
* 1,000 small batches
* 10 large batches
* mixed primitive types with nullables
* nested `List<Struct<>>`
* truncated stream error handling
* declared schema mismatch validation
* a 6 GB stress test through the pipe

All scenarios completed with bounded memory usage. The goal of this
harness was to confirm that the IPC Stream transport works correctly end
to end and that Node's `Buffer` size limit does not constrain the
overall stream.
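The "declared schema mismatch validation" scenario exercises the per-batch check in `scannable.rs`. A minimal model of such a check follows. It assumes an exact, order-sensitive match on field name, type, and nullability. It also uses a hypothetical, flattened `FieldSpec` in place of real Arrow fields. The bridge itself compares Arrow schemas in Rust and may be stricter or looser, for example about field metadata.

```typescript
// Hypothetical, flattened description of one Arrow field. Real Arrow fields
// also carry metadata and child fields; here nested types are compared only
// through their `type` string.
interface FieldSpec {
  name: string;
  type: string; // e.g. "Int64", "Utf8", "List<Struct<...>>"
  nullable: boolean;
}

function describeField(f: FieldSpec): string {
  return `${f.name}: ${f.type}${f.nullable ? " (nullable)" : ""}`;
}

/**
 * Returns a description of the first difference between the declared schema
 * and a batch's schema, or null when they match exactly (same field count,
 * order, names, types, and nullability). This strictness is an assumption.
 */
function schemaMismatch(declared: FieldSpec[], batch: FieldSpec[]): string | null {
  if (declared.length !== batch.length) {
    return `expected ${declared.length} fields, got ${batch.length}`;
  }
  for (let i = 0; i < declared.length; i++) {
    const want = declared[i];
    const got = batch[i];
    if (want.name !== got.name || want.type !== got.type || want.nullable !== got.nullable) {
      return `field ${i}: expected ${describeField(want)}, got ${describeField(got)}`;
    }
  }
  return null;
}
```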
Separately, the rescannable restart contract was verified with a focused
harness: a rescannable source is consumed partially, the scan is dropped
mid-stream, and the source is then re-scanned. The re-scan replays from
batch 0 rather than resuming the stale iterator. The same harness was run
with the `isStart` reset path disabled, and the mid-stream restart case
failed as expected, confirming that the test exercises the real regression.
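The restart harness reduces to a simple property: after a scan is abandoned mid-stream, the next scan's first batch must be batch 0. The sketch below states that property as a function. It then runs the function against two hypothetical producers: one that honors `isStart`, and one with the reset path disabled. Two simplifications apply: batches are represented by their indices, and the callback is synchronous rather than the real `Promise<Buffer | null>`.

```typescript
// Hypothetical, synchronous producer shape (indices instead of IPC buffers).
type GetNextBatch = (isStart: boolean) => number | null;

// Correct producer: `isStart` drops the cached iterator and re-invokes the factory.
function withReset(factory: () => Iterator<number>): GetNextBatch {
  let it: Iterator<number> | null = null;
  return (isStart) => {
    if (isStart || it === null) it = factory();
    const r = it.next();
    return r.done ? null : r.value;
  };
}

// Regression: the reset path is disabled, so a new scan resumes the stale iterator.
function withoutReset(factory: () => Iterator<number>): GetNextBatch {
  let it: Iterator<number> | null = null;
  return (_isStart) => {
    if (it === null) it = factory();
    const r = it.next();
    return r.done ? null : r.value;
  };
}

/** Pull `partial` batches, abandon the scan, and check that a new scan starts at batch 0. */
function restartReplaysFromZero(getNextBatch: GetNextBatch, partial: number): boolean {
  getNextBatch(true);
  for (let i = 1; i < partial; i++) getNextBatch(false);
  return getNextBatch(true) === 0;
}

const batches = () => [0, 1, 2, 3][Symbol.iterator]();
restartReplaysFromZero(withReset(batches), 2); // true
restartReplaysFromZero(withoutReset(batches), 2); // false: the re-scan starts at batch 2
```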
These harnesses are not meant to replace the full test suite, which is
described below.

---
## **Tests**
`__test__/scannable.test.ts` covers construction, metadata reflection,
per-constructor defaults and overrides, construction-time validation,
the native handle surface, and schema variety across empty tables,
nested types, `FixedSizeList`, and wide schemas.

Runtime scan behavior, including `scan_as_stream`, one-shot enforcement
on non-rescannable sources, schema mismatch detection, IPC decode
failures, and rescannable restart semantics, is not exercised here because
there is no in-tree JS consumer of `NapiScannable` yet. This mirrors
Python's `PyScannable`, which has no dedicated test file and is covered
transitively through the consumers that accept a Scannable.

Runtime coverage will follow in the consumer migration work.

---
## **Status**
Ready for review.

Closes #3223

---
```rust
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors

use std::collections::HashMap;

use env_logger::Env;
use napi_derive::*;

mod connection;
mod error;
mod header;
mod index;
mod iterator;
pub mod merge;
pub mod permutation;
mod query;
pub mod remote;
mod rerankers;
mod scannable;
mod session;
mod table;
mod util;

#[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions {
    /// (For LanceDB OSS only): The interval, in seconds, at which to check for
    /// updates to the table from other processes. If None, then consistency is not
    /// checked. For performance reasons, this is the default. For strong
    /// consistency, set this to zero seconds. Then every read will check for
    /// updates from other processes. As a compromise, you can set this to a
    /// non-zero value for eventual consistency. If more than that interval
    /// has passed since the last check, then the table will be checked for updates.
    /// Note: this consistency only applies to read operations. Write operations are
    /// always consistent.
    pub read_consistency_interval: Option<f64>,
    /// (For LanceDB OSS only): configuration for object storage.
    ///
    /// The available options are described at https://docs.lancedb.com/storage/
    pub storage_options: Option<HashMap<String, String>>,
    /// (For LanceDB OSS only): use directory namespace manifests as the source
    /// of truth for table metadata. Existing directory-listed root tables are
    /// migrated into the manifest on access.
    pub manifest_enabled: Option<bool>,
    /// (For LanceDB OSS only): extra properties for the backing namespace
    /// client used by manifest-enabled native connections.
    pub namespace_client_properties: Option<HashMap<String, String>>,
    /// (For LanceDB OSS only): the session to use for this connection. Holds
    /// shared caches and other session-specific state.
    pub session: Option<session::Session>,

    /// (For LanceDB cloud only): configuration for the remote HTTP client.
    pub client_config: Option<remote::ClientConfig>,
    /// (For LanceDB cloud only): the API key to use with LanceDB Cloud.
    ///
    /// Can also be set via the environment variable `LANCEDB_API_KEY`.
    pub api_key: Option<String>,
    /// (For LanceDB cloud only): the region to use for LanceDB cloud.
    /// Defaults to 'us-east-1'.
    pub region: Option<String>,
    /// (For LanceDB cloud only): the host to use for LanceDB cloud. Used
    /// for testing purposes.
    pub host_override: Option<String>,
}

#[napi(object)]
pub struct OpenTableOptions {
    pub storage_options: Option<HashMap<String, String>>,
}

#[napi(object)]
#[derive(Debug)]
pub struct ConnectNamespaceOptions {
    /// The interval, in seconds, at which to check for updates to the table
    /// from other processes. If None, then consistency is not checked. For
    /// performance reasons, this is the default. For strong consistency, set
    /// this to zero seconds. Then every read will check for updates from other
    /// processes. As a compromise, you can set this to a non-zero value for
    /// eventual consistency.
    pub read_consistency_interval: Option<f64>,
    /// Configuration for object storage. The available options are described
    /// at https://docs.lancedb.com/storage/
    pub storage_options: Option<HashMap<String, String>>,
    /// Extra properties for the backing namespace client.
    pub namespace_client_properties: Option<HashMap<String, String>>,
    /// The session to use for this connection. Holds shared caches and other
    /// session-specific state.
    pub session: Option<session::Session>,
}

#[napi_derive::module_init]
fn init() {
    let env = Env::new()
        .filter_or("LANCEDB_LOG", "warn")
        .write_style("LANCEDB_LOG_STYLE");
    env_logger::init_from_env(env);
}
```