feat(nodejs): add Scannable primitive for streaming ingestion (#3271)

## **Summary**

This PR adds a **Scannable primitive** to the Node.js bindings, bringing
parity with Python's `PyScannable`.

A `Scannable` wraps a schema, an optional row count hint, a rescannable
flag, and a batch-producing callback. On the Rust side it implements
`lancedb::data::scannable::Scannable`. The goal is to give consumers
such as `Table.add`, `createTable`, and `mergeInsert` a way to stream
data without materializing the full dataset in JS memory.

This PR introduces only the primitive. Migrating existing consumers to
use it will come in follow-up work.

---

## **Design**

### **Transport**

The transport uses the **Arrow IPC Stream format, one batch at a time**.

The JS side encodes each `RecordBatch` into a self-contained IPC Stream
message containing schema, batch, and end-of-stream marker. The message is
returned as a `Buffer` through a napi `ThreadsafeFunction`. The Rust
side decodes it using `arrow_ipc::reader::StreamReader`.

Only one batch is active at a time, so JS memory stays bounded by the
batch size. The Node `Buffer` size limit of about 4 GiB therefore does
not constrain the stream as a whole.

I initially evaluated the Arrow C Data Interface, which is the approach
used in Python. I dropped that path after confirming that the
`apache-arrow` npm package does not expose a C Data Interface export in
any supported version from 15 to 18. JavaScript is not listed in Arrow's
C Data Interface implementation table, and the upstream tracking issue
remains open with no scheduled work.

Third-party FFI shims would introduce additional dependency risk without
solving the core maintenance problem. Using IPC adds one encode and
decode step per batch, but the cost is predictable and typically
dominated by Lance's write path.

---

### **API**

```ts
class Scannable {
  readonly schema: Schema
  readonly numRows: number | null
  readonly rescannable: boolean

  static fromFactory(schema, factory, opts?)
  static fromTable(table, opts?)
  static fromIterable(schema, iter, opts?)
  static fromRecordBatchReader(reader, opts?)
}
```

The FFI boundary consists of a single callback:

`getNextBatch(isStart: boolean): Promise<Buffer | null>`

`isStart` is `true` on the first call of each new scan and `false` for
every call after it. The JS side uses it to drop any cached iterator and
re-invoke the factory at scan boundaries. This is what makes a
rescannable source restart at batch 0 on every `scan_as_stream` call,
even when a previous scan ended mid-stream, for example a retried write
after a network error. Without this signal, a retry would resume a stale
iterator and silently skip already-emitted batches.
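A hypothetical sketch of the JS-side pull loop makes the contract concrete (names here are illustrative, not the actual binding code; `factory` mints a fresh iterator of encoded batches per scan):

```ts
type BatchFactory = () => Iterator<Buffer>;

function makeGetNextBatch(factory: BatchFactory) {
  let current: Iterator<Buffer> | null = null;
  return async (isStart: boolean): Promise<Buffer | null> => {
    if (isStart || current === null) {
      // Scan boundary: drop any cached iterator so a retried scan
      // replays from batch 0 instead of resuming a stale one.
      current = factory();
    }
    const next = current.next();
    return next.done ? null : next.value;
  };
}
```

A second scan that opens with `isStart: true` therefore replays from the first batch even if the previous scan stopped partway through.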

In addition, a schema-only IPC buffer is transferred once during
construction.

---

## **Changes**

* `nodejs/src/scannable.rs`
  Adds `NapiScannable` and the `LanceScannable` implementation. Implements
  `schema()`, `num_rows()`, `rescannable()`, and `scan_as_stream()`.
  Includes per-batch schema validation against the declared schema, one-shot
  enforcement for non-rescannable sources, and a scan-boundary reset
  signal (`isStart`) so rescannable sources restart from batch 0 on every
  `scan_as_stream` call rather than resuming a stale iterator.

* `nodejs/src/lib.rs`
  Module registration.

* `nodejs/lancedb/scannable.ts`
  Defines the `Scannable` class and the four constructors listed above.
  Each constructor rejects option combinations it cannot honor, for
  example a `rescannable: true` request on a one-shot iterable or reader,
  or a `numRows` that disagrees with an in-memory table's row count.

* `nodejs/lancedb/index.ts`
  Exports the new primitive.

* `nodejs/__test__/scannable.test.ts`
  Test suite for the primitive.

---

## **Validation**

Before implementing the bridge, I ran an end-to-end harness with a JS
producer feeding a standalone Rust consumer built against the same
`arrow-ipc` version used in the bridge.

The harness covered the following scenarios:

* happy path
* empty stream
* 1,000 small batches
* 10 large batches
* mixed primitive types with nullables
* nested `List<Struct<>>`
* truncated stream error handling
* declared schema mismatch validation
* a 6 GB stress test through the pipe

All scenarios completed with bounded memory usage. The goal of this
harness was to confirm that the IPC Stream transport works correctly end
to end and that Node's `Buffer` size limit does not constrain the
overall stream.

Separately, the rescannable restart contract was verified with a focused
harness. A rescannable source is partially consumed, the scan is dropped
mid-stream, and the source is re-scanned. The re-scan replays from batch
0 rather than resuming the stale iterator. The same harness was run with
the `isStart` reset path disabled, and the mid-stream restart case failed
as expected, confirming the test exercises the real regression.

These harnesses are not meant to replace the full test suite, which is
described below.

---

## **Tests**

`__test__/scannable.test.ts` covers construction, metadata reflection,
per-constructor defaults and overrides, construction-time validation,
the native handle surface, and schema variety across empty tables,
nested types, `FixedSizeList`, and wide schemas.

Runtime scan behavior, including `scan_as_stream`, one-shot enforcement
on non-rescannable sources, schema mismatch detection, IPC decode
failures, and rescannable restart semantics, is not exercised here. There
is no in tree JS consumer of `NapiScannable` yet. This mirrors Python's
`PyScannable`, which has no dedicated test file and is covered
transitively through the consumers that accept a Scannable.

Runtime coverage will follow in the consumer migration work.

---

## **Status**

Ready for review.

Closes #3223

---
Commit df4ad9f851 (parent 9330a9b851), authored by Tanay on
2026-05-15 03:37:41 +05:30 and committed via GitHub.
9 changed files with 1183 additions and 0 deletions


@@ -0,0 +1,173 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / Scannable
# Class: Scannable
A data source that can be scanned as a stream of Arrow `RecordBatch`es.
`Scannable` wraps the schema + optional row count + rescannable flag and
a callback that yields batches one at a time. It is passed to consumers
(e.g. `Table.add`, `createTable`, `mergeInsert` — follow-up work) that
need to pull data without materializing the full dataset in JS memory.
Batches cross the JS↔Rust boundary as Arrow IPC Stream messages; a fresh
writer serializes each batch, and the Rust side decodes it with
`arrow_ipc::reader::StreamReader`. One batch is in flight at a time.
## Properties
### numRows
```ts
readonly numRows: null | number;
```
***
### rescannable
```ts
readonly rescannable: boolean;
```
***
### schema
```ts
readonly schema: Schema<any>;
```
## Methods
### fromFactory()
```ts
static fromFactory(
schema,
factory,
opts): Promise<Scannable>
```
Build a Scannable from an explicit schema and a factory that returns a
fresh batch iterator on each call.
The factory is invoked once per scan. Each iterator yields
`RecordBatch`es matching the declared schema. Use this when you need
direct control over the pull loop — for example, to wrap a streaming
source whose batches are produced lazily.
#### Parameters
* **schema**: `Schema`&lt;`any`&gt;
The Arrow schema of the produced batches.
* **factory**
Called at the start of each scan to produce a batch
iterator. Must be idempotent when `rescannable` is true.
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
Optional hints. `rescannable` defaults to `true`; set to
`false` if calling `factory()` twice would not reproduce the same data.
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;
***
### fromIterable()
```ts
static fromIterable(
schema,
iter,
opts): Promise<Scannable>
```
Build a Scannable from an iterable of `RecordBatch`es. `rescannable`
defaults to `false`. Pass an explicit schema so the consumer can
validate before any batch is pulled.
`opts.rescannable: true` is accepted for replayable iterables (Arrays,
Sets, or custom iterables whose `[Symbol.iterator]()` returns a fresh
iterator each call). It is rejected for one-shot iterables (generators,
async generators, or already-an-iterator inputs) because their
`[Symbol.iterator]()` returns the same exhausted object on the second
scan. For replayable sources outside this shape, use
`fromFactory(schema, () => createIter(), { rescannable: true })`.
Note: when `opts.rescannable` is `true`, the constructor calls
`[Symbol.iterator]()` once on the input to perform the structural check.
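The distinction can be sketched with a standalone check (illustrative only, not the shipped implementation): a one-shot iterable is its own iterator, so calling `[Symbol.iterator]()` hands back the same, eventually exhausted, object every time.

```ts
// Illustrative structural check: self-iterators (generators, plain
// iterators) cannot be replayed; containers mint a fresh iterator.
function isOneShot(iter: Iterable<unknown>): boolean {
  return Object.is(iter[Symbol.iterator](), iter);
}

function* oneShot() {
  yield 1;
}

isOneShot(oneShot()); // true  — a generator object is a self-iterator
isOneShot([1, 2, 3]); // false — arrays mint a fresh iterator each call
```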
#### Parameters
* **schema**: `Schema`&lt;`any`&gt;
* **iter**: `Iterable`&lt;`RecordBatch`&lt;`any`&gt;&gt; \| `AsyncIterable`&lt;`RecordBatch`&lt;`any`&gt;&gt;
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;
***
### fromRecordBatchReader()
```ts
static fromRecordBatchReader(reader, opts): Promise<Scannable>
```
Build a Scannable from an Arrow `RecordBatchReader`. A reader can only
be consumed once; `rescannable` defaults to `false`.
The reader must already be opened (via `.open()`) so its `.schema` is
populated. `RecordBatchReader.from(...)` returns an unopened reader.
`opts.rescannable: true` is rejected because `RecordBatchReader` is a
self-iterator (its `[Symbol.iterator]()` returns itself), and this
constructor does not call `reader.reset()` between scans, so a second
scan would always see an exhausted reader. For genuinely replayable
sources, use
`fromFactory(schema, () => openReader(), { rescannable: true })`,
which mints a fresh reader on each scan.
#### Parameters
* **reader**: `RecordBatchReader`&lt;`any`&gt;
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;
***
### fromTable()
```ts
static fromTable(table, opts): Promise<Scannable>
```
Build a Scannable from an in-memory Arrow `Table`. Always rescannable;
the table's batches are replayed on each scan.
The table's row count is authoritative: `opts.numRows` must either be
omitted or equal to `table.numRows`. `opts.rescannable` of `false` is
rejected because in-memory Tables are always rescannable.
#### Parameters
* **table**: `Table`&lt;`any`&gt;
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;


@@ -32,6 +32,7 @@
- [PhraseQuery](classes/PhraseQuery.md)
- [Query](classes/Query.md)
- [QueryBase](classes/QueryBase.md)
- [Scannable](classes/Scannable.md)
- [Session](classes/Session.md)
- [StaticHeaderProvider](classes/StaticHeaderProvider.md)
- [Table](classes/Table.md)
@@ -86,6 +87,7 @@
- [RemovalStats](interfaces/RemovalStats.md)
- [RestNamespaceConfig](interfaces/RestNamespaceConfig.md)
- [RetryConfig](interfaces/RetryConfig.md)
- [ScannableOptions](interfaces/ScannableOptions.md)
- [ShuffleOptions](interfaces/ShuffleOptions.md)
- [SplitCalculatedOptions](interfaces/SplitCalculatedOptions.md)
- [SplitHashOptions](interfaces/SplitHashOptions.md)


@@ -0,0 +1,29 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / ScannableOptions
# Interface: ScannableOptions
## Properties
### numRows?
```ts
optional numRows: number;
```
Hint about the number of rows. Not validated against the stream.
***
### rescannable?
```ts
optional rescannable: boolean;
```
Whether the source can be scanned more than once. Defaults to `true` for
`fromTable` / `fromFactory` and `false` for `fromIterable` /
`fromRecordBatchReader`.