diff --git a/docs/src/js/classes/Scannable.md b/docs/src/js/classes/Scannable.md new file mode 100644 index 000000000..5005db6ab --- /dev/null +++ b/docs/src/js/classes/Scannable.md @@ -0,0 +1,173 @@ +[**@lancedb/lancedb**](../README.md) • **Docs** + +*** + +[@lancedb/lancedb](../globals.md) / Scannable + +# Class: Scannable + +A data source that can be scanned as a stream of Arrow `RecordBatch`es. + +`Scannable` wraps the schema + optional row count + rescannable flag and +a callback that yields batches one at a time. It is passed to consumers +(e.g. `Table.add`, `createTable`, `mergeInsert` — follow-up work) that +need to pull data without materializing the full dataset in JS memory. + +Batches cross the JS↔Rust boundary as Arrow IPC Stream messages; a fresh +writer serializes each batch, and the Rust side decodes it with +`arrow_ipc::reader::StreamReader`. One batch is in flight at a time. + +## Properties + +### numRows + +```ts +readonly numRows: null | number; +``` + +*** + +### rescannable + +```ts +readonly rescannable: boolean; +``` + +*** + +### schema + +```ts +readonly schema: Schema; +``` + +## Methods + +### fromFactory() + +```ts +static fromFactory( + schema, + factory, + opts): Promise +``` + +Build a Scannable from an explicit schema and a factory that returns a +fresh batch iterator on each call. + +The factory is invoked once per scan. Each iterator yields +`RecordBatch`es matching the declared schema. Use this when you need +direct control over the pull loop — for example, to wrap a streaming +source whose batches are produced lazily. + +#### Parameters + +* **schema**: `Schema`<`any`> + The Arrow schema of the produced batches. + +* **factory** + Called at the start of each scan to produce a batch + iterator. Must be idempotent when `rescannable` is true. + +* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}` + Optional hints. 
`rescannable` defaults to `true`; set to + `false` if calling `factory()` twice would not reproduce the same data. + +#### Returns + +`Promise`<[`Scannable`](Scannable.md)> + +*** + +### fromIterable() + +```ts +static fromIterable( + schema, + iter, + opts): Promise +``` + +Build a Scannable from an iterable of `RecordBatch`es. `rescannable` +defaults to `false`. Pass an explicit schema so the consumer can +validate before any batch is pulled. + +`opts.rescannable: true` is honest for replayable iterables (Arrays, +Sets, or custom iterables whose `[Symbol.iterator]()` returns a fresh +iterator each call). It is rejected for one-shot iterables (generators, +async generators, or already-an-iterator inputs) because their +`[Symbol.iterator]()` returns the same exhausted object on the second +scan. For replayable sources outside this shape, use +`fromFactory(schema, () => createIter(), { rescannable: true })`. + +Note: when `opts.rescannable` is `true`, the constructor calls +`[Symbol.iterator]()` once on the input to perform the structural check. + +#### Parameters + +* **schema**: `Schema`<`any`> + +* **iter**: `Iterable`<`RecordBatch`<`any`>> \| `AsyncIterable`<`RecordBatch`<`any`>> + +* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}` + +#### Returns + +`Promise`<[`Scannable`](Scannable.md)> + +*** + +### fromRecordBatchReader() + +```ts +static fromRecordBatchReader(reader, opts): Promise +``` + +Build a Scannable from an Arrow `RecordBatchReader`. A reader can only +be consumed once; `rescannable` defaults to `false`. + +The reader must already be opened (via `.open()`) so its `.schema` is +populated. `RecordBatchReader.from(...)` returns an unopened reader. + +`opts.rescannable: true` is rejected because `RecordBatchReader` is a +self-iterator (its `[Symbol.iterator]()` returns itself), and this +constructor does not call `reader.reset()` between scans, so a second +scan would always see an exhausted reader. 
For genuinely replayable +sources, use +`fromFactory(schema, () => openReader(), { rescannable: true })`, +which mints a fresh reader on each scan. + +#### Parameters + +* **reader**: `RecordBatchReader`<`any`> + +* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}` + +#### Returns + +`Promise`<[`Scannable`](Scannable.md)> + +*** + +### fromTable() + +```ts +static fromTable(table, opts): Promise +``` + +Build a Scannable from an in-memory Arrow `Table`. Always rescannable; +the table's batches are replayed on each scan. + +The table's row count is authoritative: `opts.numRows` must either be +omitted or equal to `table.numRows`. `opts.rescannable` of `false` is +rejected because in-memory Tables are always rescannable. + +#### Parameters + +* **table**: `Table`<`any`> + +* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}` + +#### Returns + +`Promise`<[`Scannable`](Scannable.md)> diff --git a/docs/src/js/globals.md b/docs/src/js/globals.md index fca33544f..5786afb88 100644 --- a/docs/src/js/globals.md +++ b/docs/src/js/globals.md @@ -32,6 +32,7 @@ - [PhraseQuery](classes/PhraseQuery.md) - [Query](classes/Query.md) - [QueryBase](classes/QueryBase.md) +- [Scannable](classes/Scannable.md) - [Session](classes/Session.md) - [StaticHeaderProvider](classes/StaticHeaderProvider.md) - [Table](classes/Table.md) @@ -86,6 +87,7 @@ - [RemovalStats](interfaces/RemovalStats.md) - [RestNamespaceConfig](interfaces/RestNamespaceConfig.md) - [RetryConfig](interfaces/RetryConfig.md) +- [ScannableOptions](interfaces/ScannableOptions.md) - [ShuffleOptions](interfaces/ShuffleOptions.md) - [SplitCalculatedOptions](interfaces/SplitCalculatedOptions.md) - [SplitHashOptions](interfaces/SplitHashOptions.md) diff --git a/docs/src/js/interfaces/ScannableOptions.md b/docs/src/js/interfaces/ScannableOptions.md new file mode 100644 index 000000000..d15dc76a1 --- /dev/null +++ b/docs/src/js/interfaces/ScannableOptions.md @@ -0,0 +1,29 @@ 
+[**@lancedb/lancedb**](../README.md) • **Docs** + +*** + +[@lancedb/lancedb](../globals.md) / ScannableOptions + +# Interface: ScannableOptions + +## Properties + +### numRows? + +```ts +optional numRows: number; +``` + +Hint about the number of rows. Not validated against the stream. + +*** + +### rescannable? + +```ts +optional rescannable: boolean; +``` + +Whether the source can be scanned more than once. Defaults to `true` for +`fromTable` / `fromFactory` and `false` for `fromIterable` / +`fromRecordBatchReader`. diff --git a/nodejs/__test__/scannable.test.ts b/nodejs/__test__/scannable.test.ts new file mode 100644 index 000000000..47f1c0548 --- /dev/null +++ b/nodejs/__test__/scannable.test.ts @@ -0,0 +1,438 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +import { + Field, + Float16, + Int32, + type RecordBatch, + RecordBatchReader, + Schema, + tableToIPC, +} from "apache-arrow"; +import { makeArrowTable, makeEmptyTable } from "../lancedb/arrow"; +import { Scannable } from "../lancedb/scannable"; + +function makeTable() { + return makeArrowTable( + [ + { id: 1, name: "a" }, + { id: 2, name: "b" }, + { id: 3, name: "c" }, + ], + { vectorColumns: {} }, + ); +} + +async function makeReader(): Promise<RecordBatchReader> { + // `RecordBatchReader.from()` returns an unopened reader; `.schema` is only + // populated after `.open()`. Opening sync readers is synchronous.
+ const reader = RecordBatchReader.from(tableToIPC(makeTable())); + return reader.open() as RecordBatchReader; +} + +describe("Scannable", () => { + describe("fromTable", () => { + test("reflects schema, numRows, and defaults rescannable=true", async () => { + const table = makeTable(); + const scannable = await Scannable.fromTable(table); + + expect(scannable.schema).toBe(table.schema); + expect(scannable.numRows).toBe(table.numRows); + expect(scannable.rescannable).toBe(true); + }); + + test("throws when opts.numRows does not match table.numRows", async () => { + await expect( + Scannable.fromTable(makeTable(), { numRows: 42 }), + ).rejects.toThrow(/does not match table\.numRows/); + }); + + test("throws when opts.rescannable is false", async () => { + await expect( + Scannable.fromTable(makeTable(), { rescannable: false }), + ).rejects.toThrow(/always rescannable/); + }); + }); + + describe("fromRecordBatchReader", () => { + test("reflects schema and defaults numRows=null, rescannable=false", async () => { + const reader = await makeReader(); + const scannable = await Scannable.fromRecordBatchReader(reader); + + expect(scannable.schema).toBe(reader.schema); + expect(scannable.numRows).toBeNull(); + expect(scannable.rescannable).toBe(false); + }); + + test("honors numRows override", async () => { + const scannable = await Scannable.fromRecordBatchReader( + await makeReader(), + { numRows: 3 }, + ); + + expect(scannable.numRows).toBe(3); + expect(scannable.rescannable).toBe(false); + }); + + test("rescannable: false explicit does not throw", async () => { + const reader = await makeReader(); + const scannable = await Scannable.fromRecordBatchReader(reader, { + rescannable: false, + }); + expect(scannable.rescannable).toBe(false); + }); + + test("throws when opts.rescannable is true", async () => { + const reader = await makeReader(); + await expect( + Scannable.fromRecordBatchReader(reader, { rescannable: true }), + ).rejects.toThrow(/does not accept 
rescannable/); + }); + + test("throws when opts.rescannable is true even alongside numRows", async () => { + const reader = await makeReader(); + await expect( + Scannable.fromRecordBatchReader(reader, { + numRows: 3, + rescannable: true, + }), + ).rejects.toThrow(/does not accept rescannable/); + }); + }); + + describe("fromIterable", () => { + test("accepts a sync iterable of batches", async () => { + const table = makeTable(); + const scannable = await Scannable.fromIterable( + table.schema, + table.batches, + ); + + expect(scannable.schema).toBe(table.schema); + expect(scannable.numRows).toBeNull(); + expect(scannable.rescannable).toBe(false); + }); + + test("accepts an async iterable of batches", async () => { + const table = makeTable(); + async function* generator(): AsyncGenerator { + for (const batch of table.batches) { + yield batch; + } + } + + const scannable = await Scannable.fromIterable(table.schema, generator()); + expect(scannable.schema).toBe(table.schema); + expect(scannable.rescannable).toBe(false); + }); + + describe("rescannable: true detection", () => { + // Replayable inputs: [Symbol.iterator]() / [Symbol.asyncIterator]() + // returns a fresh iterator each call. Must NOT throw. 
+ + test("Array passes (fresh ArrayIterator each call)", async () => { + const table = makeTable(); + const scannable = await Scannable.fromIterable( + table.schema, + table.batches, + { rescannable: true }, + ); + expect(scannable.rescannable).toBe(true); + }); + + test("Set passes (fresh SetIterator each call)", async () => { + const table = makeTable(); + const set = new Set(table.batches); + const scannable = await Scannable.fromIterable(table.schema, set, { + rescannable: true, + }); + expect(scannable.rescannable).toBe(true); + }); + + test("custom Iterable returning a fresh iterator passes", async () => { + const table = makeTable(); + const replayable: Iterable<RecordBatch> = { + [Symbol.iterator]() { + return table.batches[Symbol.iterator](); + }, + }; + const scannable = await Scannable.fromIterable( + table.schema, + replayable, + { rescannable: true }, + ); + expect(scannable.rescannable).toBe(true); + }); + + test("object with generator method passes (fresh generator each call)", async () => { + const table = makeTable(); + const replayable: Iterable<RecordBatch> = { + *[Symbol.iterator]() { + for (const batch of table.batches) yield batch; + }, + }; + const scannable = await Scannable.fromIterable( + table.schema, + replayable, + { rescannable: true }, + ); + expect(scannable.rescannable).toBe(true); + }); + + test("empty Array passes (replayable degenerate case)", async () => { + const schema = makeTable().schema; + const scannable = await Scannable.fromIterable( + schema, + [] as RecordBatch[], + { rescannable: true }, + ); + expect(scannable.rescannable).toBe(true); + }); + + // One-shot inputs: [Symbol.iterator]() / [Symbol.asyncIterator]() + // returns the same object, or the input is already-an-iterator. + // Must throw with a /one-shot/ message. + + test("sync generator throws", async () => { + const table = makeTable(); + function* generator(): Generator<RecordBatch> { + for (const batch of table.batches) yield batch; + } + await expect( + Scannable.fromIterable(table.schema, generator(), { + rescannable: true, + }), + ).rejects.toThrow(/one-shot/); + }); + + test("async generator throws", async () => { + const table = makeTable(); + async function* generator(): AsyncGenerator<RecordBatch> { + for (const batch of table.batches) yield batch; + } + await expect( + Scannable.fromIterable(table.schema, generator(), { + rescannable: true, + }), + ).rejects.toThrow(/one-shot/); + }); + + test("empty generator throws (one-shot degenerate case)", async () => { + const schema = makeTable().schema; + function* generator(): Generator<RecordBatch> { + // intentionally empty; yields nothing. + } + await expect( + Scannable.fromIterable(schema, generator(), { rescannable: true }), + ).rejects.toThrow(/one-shot/); + }); + + test("custom self-iterator throws", async () => { + const table = makeTable(); + const batches = table.batches; + let i = 0; + const oneShot: Iterable<RecordBatch> & Iterator<RecordBatch> = { + [Symbol.iterator]() { + return this; + }, + next() { + if (i >= batches.length) { + return { done: true, value: undefined }; + } + return { done: false, value: batches[i++] }; + }, + }; + await expect( + Scannable.fromIterable(table.schema, oneShot, { rescannable: true }), + ).rejects.toThrow(/one-shot/); + }); + + test("Array.values() (IterableIterator) throws", async () => { + const table = makeTable(); + const iter = table.batches.values(); + await expect( + Scannable.fromIterable(table.schema, iter, { rescannable: true }), + ).rejects.toThrow(/one-shot/); + }); + + test("raw iterator (only `.next`) throws", async () => { + const table = makeTable(); + const batches = table.batches; + let i = 0; + const rawIter = { + next(): IteratorResult<RecordBatch> { + if (i >= batches.length) { + return { done: true, value: undefined }; + } + return { done: false, value: batches[i++] }; + }, + }; + await expect( + Scannable.fromIterable( + table.schema, + rawIter as unknown as Iterable<RecordBatch>, + { rescannable: true }, + ), + ).rejects.toThrow(/one-shot/); + }); + + // Edge: null/undefined must not crash the detection helper. The + // null check belongs to `normalizeIterator` and only fires when a + // scan starts. + + test("null input does not crash detection at construction", async () => { + const schema = makeTable().schema; + await expect( + Scannable.fromIterable( + schema, + null as unknown as Iterable<RecordBatch>, + { + rescannable: true, + }, + ), + ).resolves.toBeDefined(); + }); + + test("undefined input does not crash detection at construction", async () => { + const schema = makeTable().schema; + await expect( + Scannable.fromIterable( + schema, + undefined as unknown as Iterable<RecordBatch>, + { rescannable: true }, + ), + ).resolves.toBeDefined(); + }); + + // Default (rescannable omitted) skips the check entirely, so even + // pathological inputs construct without throwing here. + + test("rescannable omitted skips detection entirely (generator passes)", async () => { + const table = makeTable(); + function* generator(): Generator<RecordBatch> { + for (const batch of table.batches) yield batch; + } + const scannable = await Scannable.fromIterable( + table.schema, + generator(), + ); + expect(scannable.rescannable).toBe(false); + }); + + test("rescannable: false explicit skips detection entirely (generator passes)", async () => { + const table = makeTable(); + function* generator(): Generator<RecordBatch> { + for (const batch of table.batches) yield batch; + } + const scannable = await Scannable.fromIterable( + table.schema, + generator(), + { rescannable: false }, + ); + expect(scannable.rescannable).toBe(false); + }); + }); + }); + + describe("fromFactory", () => { + test("defaults rescannable=true and does not invoke the factory eagerly", async () => { + const table = makeTable(); + const factory = jest.fn(() => table.batches); + + const scannable = await Scannable.fromFactory(table.schema,
factory); + + expect(scannable.schema).toBe(table.schema); + expect(scannable.rescannable).toBe(true); + expect(factory).not.toHaveBeenCalled(); + }); + + test("honors rescannable and numRows overrides", async () => { + const table = makeTable(); + const scannable = await Scannable.fromFactory( + table.schema, + () => table.batches, + { numRows: 7, rescannable: false }, + ); + + expect(scannable.numRows).toBe(7); + expect(scannable.rescannable).toBe(false); + }); + }); + + describe("validation", () => { + test("throws when numRows is negative", async () => { + await expect( + Scannable.fromFactory(makeTable().schema, () => [], { numRows: -1 }), + ).rejects.toThrow(/non-negative/); + }); + + test("throws when numRows is not an integer", async () => { + await expect( + Scannable.fromFactory(makeTable().schema, () => [], { numRows: 3.5 }), + ).rejects.toThrow(/integer/); + }); + }); + + describe("native handle", () => { + test("exposes a native handle via inner", async () => { + const scannable = await Scannable.fromTable(makeTable()); + expect(scannable.inner).toBeDefined(); + expect(typeof scannable.inner).toBe("object"); + expect(scannable.inner).not.toBeNull(); + }); + }); + + // Schema-variety construction tests. Each asserts that construction + // succeeds against a richer Arrow schema, which transitively exercises + // schema serialization and the Rust-side `ipc_file_to_schema` for types + // beyond flat primitives. 
+ describe("schema variety", () => { + test("accepts an empty table", async () => { + const schema = new Schema([new Field("id", new Int32(), true)]); + const table = makeEmptyTable(schema); + const scannable = await Scannable.fromTable(table); + + expect(scannable.numRows).toBe(0); + expect(scannable.schema).toBe(table.schema); + }); + + test("accepts nested struct and list columns", async () => { + const table = makeArrowTable( + [ + { id: 1, point: { x: 0, y: 0 }, tags: ["a", "b"] }, + { id: 2, point: { x: 1, y: 2 }, tags: ["c"] }, + ], + { vectorColumns: {} }, + ); + const scannable = await Scannable.fromTable(table); + + expect(scannable.schema).toBe(table.schema); + expect(scannable.numRows).toBe(2); + }); + + test("accepts a FixedSizeList (vector) column", async () => { + const table = makeArrowTable( + [ + { id: 1, vec: [1, 2, 3] }, + { id: 2, vec: [4, 5, 6] }, + ], + { vectorColumns: { vec: { type: new Float16() } } }, + ); + const scannable = await Scannable.fromTable(table); + + expect(scannable.schema).toBe(table.schema); + expect(scannable.numRows).toBe(2); + }); + + test("accepts a table with many columns", async () => { + const row: Record<string, number> = {}; + for (let i = 0; i < 50; i++) row[`c${i}`] = i; + const table = makeArrowTable([row, row], { vectorColumns: {} }); + const scannable = await Scannable.fromTable(table); + + expect(scannable.schema.fields.length).toBe(50); + expect(scannable.numRows).toBe(2); + }); + }); +}); diff --git a/nodejs/lancedb/arrow.ts b/nodejs/lancedb/arrow.ts index 84f5ddf7b..587d30b19 100644 --- a/nodejs/lancedb/arrow.ts +++ b/nodejs/lancedb/arrow.ts @@ -1291,6 +1291,18 @@ export async function fromRecordBatchToBuffer( return Buffer.from(await writer.toUint8Array()); } +/** + * Create a buffer containing a single record batch using the Arrow IPC Stream + * serialization. Each call produces a self-contained Stream message (schema + + * batch + EOS) suitable for incremental decode by `arrow_ipc::reader::StreamReader`. + */ +export async function fromRecordBatchToStreamBuffer( + batch: RecordBatch, +): Promise<Buffer> { + const writer = RecordBatchStreamWriter.writeAll([batch]); + return Buffer.from(await writer.toUint8Array()); +} + +/** + * Serialize an Arrow Table into a buffer using the Arrow IPC Stream serialization + * diff --git a/nodejs/lancedb/index.ts b/nodejs/lancedb/index.ts index 8952cf043..e1c08b7b5 100644 --- a/nodejs/lancedb/index.ts +++ b/nodejs/lancedb/index.ts @@ -126,6 +126,7 @@ export { MergeInsertBuilder, WriteExecutionOptions } from "./merge"; export * as embedding from "./embedding"; export { permutationBuilder, PermutationBuilder } from "./permutation"; +export { Scannable, ScannableOptions } from "./scannable"; export * as rerankers from "./rerankers"; export { SchemaLike, diff --git a/nodejs/lancedb/scannable.ts b/nodejs/lancedb/scannable.ts new file mode 100644 index 000000000..9286e7998 --- /dev/null +++ b/nodejs/lancedb/scannable.ts @@ -0,0 +1,274 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +import { + Table as ArrowTable, + RecordBatch, + RecordBatchReader, + Schema, +} from "apache-arrow"; +import { + fromRecordBatchToStreamBuffer, + fromTableToBuffer, + makeEmptyTable, +} from "./arrow"; +import { NapiScannable } from "./native.js"; + +export interface ScannableOptions { + /** Hint about the number of rows. Not validated against the stream. */ + numRows?: number; + /** + * Whether the source can be scanned more than once. Defaults to `true` for + * `fromTable` / `fromFactory` and `false` for `fromIterable` / + * `fromRecordBatchReader`. + */ + rescannable?: boolean; +} + +/** + * A data source that can be scanned as a stream of Arrow `RecordBatch`es. + * + * `Scannable` wraps the schema + optional row count + rescannable flag and + * a callback that yields batches one at a time. It is passed to consumers + * (e.g.
`Table.add`, `createTable`, `mergeInsert` — follow-up work) that + * need to pull data without materializing the full dataset in JS memory. + * + * Batches cross the JS↔Rust boundary as Arrow IPC Stream messages; a fresh + * writer serializes each batch, and the Rust side decodes it with + * `arrow_ipc::reader::StreamReader`. One batch is in flight at a time. + */ +export class Scannable { + readonly schema: Schema; + readonly numRows: number | null; + readonly rescannable: boolean; + + /** @hidden */ + private readonly native: NapiScannable; + + private constructor( + native: NapiScannable, + schema: Schema, + numRows: number | null, + rescannable: boolean, + ) { + this.native = native; + this.schema = schema; + this.numRows = numRows; + this.rescannable = rescannable; + } + + /** @hidden Access the native handle for passing through to Rust consumers. */ + get inner(): NapiScannable { + return this.native; + } + + /** + * Build a Scannable from an explicit schema and a factory that returns a + * fresh batch iterator on each call. + * + * The factory is invoked once per scan. Each iterator yields + * `RecordBatch`es matching the declared schema. Use this when you need + * direct control over the pull loop — for example, to wrap a streaming + * source whose batches are produced lazily. + * + * @param schema - The Arrow schema of the produced batches. + * @param factory - Called at the start of each scan to produce a batch + * iterator. Must be idempotent when `rescannable` is true. + * @param opts - Optional hints. `rescannable` defaults to `true`; set to + * `false` if calling `factory()` twice would not reproduce the same data. + */ + static async fromFactory( + schema: Schema, + factory: () => + | AsyncIterable<RecordBatch> + | Iterable<RecordBatch> + | AsyncIterator<RecordBatch> + | Iterator<RecordBatch>, + opts: ScannableOptions = {}, + ): Promise<Scannable> { + const numRows = opts.numRows ??
null; + if (numRows != null && !Number.isInteger(numRows)) { + throw new TypeError("numRows must be an integer"); + } + const rescannable = opts.rescannable ?? true; + + let iter: AsyncIterator<RecordBatch> | Iterator<RecordBatch> | null = null; + const getNextBatch = async (isStart: boolean): Promise<Buffer | null> => { + // `isStart` is true on the first pull of every new scan_as_stream. + // Drop any cached iterator so factory() is re-invoked for the next scan. + if (isStart) { + iter = null; + } + if (iter === null) { + iter = normalizeIterator(factory()); + } + const result = await iter.next(); + if (result.done) { + iter = null; + return null; + } + return fromRecordBatchToStreamBuffer(result.value); + }; + + const schemaBuf = await fromTableToBuffer(makeEmptyTable(schema)); + const native = new NapiScannable( + schemaBuf, + numRows, + rescannable, + getNextBatch, + ); + return new Scannable(native, schema, numRows, rescannable); + } + + /** + * Build a Scannable from an in-memory Arrow `Table`. Always rescannable; + * the table's batches are replayed on each scan. + * + * The table's row count is authoritative: `opts.numRows` must either be + * omitted or equal to `table.numRows`. `opts.rescannable` of `false` is + * rejected because in-memory Tables are always rescannable. + */ + static async fromTable( + table: ArrowTable, + opts: ScannableOptions = {}, + ): Promise<Scannable> { + if (opts.numRows != null && opts.numRows !== table.numRows) { + throw new TypeError( + `opts.numRows (${opts.numRows}) does not match table.numRows (${table.numRows}). ` + + `The table's row count is authoritative; omit numRows or pass the matching value.`, + ); + } + if (opts.rescannable === false) { + throw new TypeError( + `fromTable does not accept rescannable: false. ` + + `In-memory Arrow Tables are always rescannable; omit the option or pass true.`, + ); + } + return Scannable.fromFactory(table.schema, () => table.batches, { + numRows: table.numRows, + rescannable: true, + }); + } + + /** + * Build a Scannable from an iterable of `RecordBatch`es. `rescannable` + * defaults to `false`. Pass an explicit schema so the consumer can + * validate before any batch is pulled. + * + * `opts.rescannable: true` is honest for replayable iterables (Arrays, + * Sets, or custom iterables whose `[Symbol.iterator]()` returns a fresh + * iterator each call). It is rejected for one-shot iterables (generators, + * async generators, or already-an-iterator inputs) because their + * `[Symbol.iterator]()` returns the same exhausted object on the second + * scan. For replayable sources outside this shape, use + * `fromFactory(schema, () => createIter(), { rescannable: true })`. + * + * Note: when `opts.rescannable` is `true`, the constructor calls + * `[Symbol.iterator]()` once on the input to perform the structural check. + */ + static async fromIterable( + schema: Schema, + iter: AsyncIterable<RecordBatch> | Iterable<RecordBatch>, + opts: ScannableOptions = {}, + ): Promise<Scannable> { + if (opts.rescannable === true && isOneShotIterable(iter)) { + throw new TypeError( + `fromIterable: rescannable: true is not honest for one-shot iterables ` + + `(generators, async generators, or iterators where [Symbol.iterator]() ` + + `returns the same object). The source would be exhausted after the first scan. ` + + `Use fromFactory(schema, () => createIter(), { rescannable: true }) for sources ` + + `where each call mints a fresh iterator.`, + ); + } + return Scannable.fromFactory(schema, () => iter, { + numRows: opts.numRows, + rescannable: opts.rescannable ?? false, + }); + } + + /** + * Build a Scannable from an Arrow `RecordBatchReader`. A reader can only + * be consumed once; `rescannable` defaults to `false`.
+ * + * The reader must already be opened (via `.open()`) so its `.schema` is + * populated. `RecordBatchReader.from(...)` returns an unopened reader. + * + * `opts.rescannable: true` is rejected because `RecordBatchReader` is a + * self-iterator (its `[Symbol.iterator]()` returns itself), and this + * constructor does not call `reader.reset()` between scans, so a second + * scan would always see an exhausted reader. For genuinely replayable + * sources, use + * `fromFactory(schema, () => openReader(), { rescannable: true })`, + * which mints a fresh reader on each scan. + */ + static async fromRecordBatchReader( + reader: RecordBatchReader, + opts: ScannableOptions = {}, + ): Promise<Scannable> { + if (opts.rescannable === true) { + throw new TypeError( + `fromRecordBatchReader does not accept rescannable: true. ` + + `RecordBatchReader is a self-iterator (its [Symbol.iterator]() ` + + `returns itself) and would be exhausted after the first scan. ` + + `Use fromFactory(schema, () => openReader(), { rescannable: true }) ` + + `for sources where each call mints a fresh reader.`, + ); + } + return Scannable.fromFactory(reader.schema, () => reader, { + numRows: opts.numRows, + rescannable: false, + }); + } +} + +function normalizeIterator( + source: AsyncIterable<RecordBatch> | Iterable<RecordBatch> | AsyncIterator<RecordBatch> | Iterator<RecordBatch>, +): AsyncIterator<RecordBatch> | Iterator<RecordBatch> { + if (source == null) { + throw new TypeError("Scannable factory returned null/undefined"); + } + if ( + typeof (source as AsyncIterable<RecordBatch>)[Symbol.asyncIterator] === "function" + ) { + return (source as AsyncIterable<RecordBatch>)[Symbol.asyncIterator](); + } + if (typeof (source as Iterable<RecordBatch>)[Symbol.iterator] === "function") { + return (source as Iterable<RecordBatch>)[Symbol.iterator](); + } + // Already an iterator (has `.next`). + if (typeof (source as Iterator<RecordBatch>).next === "function") { + return source as Iterator<RecordBatch>; + } + throw new TypeError("Scannable factory returned a non-iterable value"); +} + +// A "self-iterator" returns the same object from `[Symbol.iterator]()` / +// `[Symbol.asyncIterator]()`. Generators behave this way, so they exhaust +// after one pass. Replayable iterables (Array, Set, custom) return a fresh +// iterator each call. Detection mirrors `normalizeIterator`'s ordering so +// classification matches scan-time behavior. +function isOneShotIterable( + source: AsyncIterable<RecordBatch> | Iterable<RecordBatch>, +): boolean { + // null/undefined are not one-shot in any meaningful sense; let + // `normalizeIterator` raise the actual error at scan time. + if (source == null) return false; + const ref = source as unknown; + if ( + typeof (source as AsyncIterable<RecordBatch>)[Symbol.asyncIterator] === + "function" + ) { + const it = (source as AsyncIterable<RecordBatch>)[ + Symbol.asyncIterator + ]() as unknown; + return it === ref; + } + if (typeof (source as Iterable<RecordBatch>)[Symbol.iterator] === "function") { + const it = (source as Iterable<RecordBatch>)[Symbol.iterator]() as unknown; + return it === ref; + } + // Already-an-iterator (has `.next` but no `Symbol.iterator`) is by + // definition one-shot. + if (typeof (source as { next?: unknown }).next === "function") return true; + return false; +} diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index dab6bad67..f241fb81f 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -16,6 +16,7 @@ pub mod permutation; mod query; pub mod remote; mod rerankers; +mod scannable; mod session; mod table; mod util; diff --git a/nodejs/src/scannable.rs b/nodejs/src/scannable.rs new file mode 100644 index 000000000..fab013716 --- /dev/null +++ b/nodejs/src/scannable.rs @@ -0,0 +1,253 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +//! NodeJS binding for the [`lancedb::data::scannable::Scannable`] trait. +//! +//!
The JS side supplies a `getNextBatch(isStart)` callback that returns the +//! next Arrow `RecordBatch` encoded as a self-contained Arrow IPC Stream +//! message (schema message + record batch message + EOS marker) wrapped in a +//! `Buffer`, or `null` when the stream is exhausted. The Rust side parses +//! each buffer with `arrow_ipc::reader::StreamReader`, validates every +//! standalone batch stream against the declared schema, and yields decoded +//! `RecordBatch`es as a [`SendableRecordBatchStream`]. +//! +//! `isStart` is `true` on the first `getNextBatch` call of each new +//! `scan_as_stream` and `false` thereafter. JS uses it to drop any cached +//! iterator and re-invoke its factory at scan boundaries, so retries +//! triggered by mid-stream failures restart at batch 0. + +use std::io::Cursor; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_ipc::reader::StreamReader; +use arrow_schema::SchemaRef; +use futures::stream::once; +use lancedb::arrow::{SendableRecordBatchStream, SimpleRecordBatchStream}; +use lancedb::data::scannable::Scannable as LanceScannable; +use lancedb::ipc::ipc_file_to_schema; +use lancedb::{Error, Result as LanceResult}; +use napi::bindgen_prelude::*; +use napi::threadsafe_function::ThreadsafeFunction; +use napi_derive::napi; + +/// Threadsafe handle to the JS `getNextBatch` callback. The callback takes a +/// single boolean `isStart` (`true` on the first call of each new scan) and +/// returns a Promise that resolves to a `Buffer` containing one IPC Stream +/// message, or `null` at end-of-stream. +type GetNextBatchFn = ThreadsafeFunction>, bool, Status, false>; + +/// A Rust-side view of a JS-constructed `Scannable`. +/// +/// Held in JS as the return value of the `Scannable` class constructor. When +/// passed to a consumer that accepts `impl lancedb::data::scannable::Scannable`, +/// the consumer invokes `scan_as_stream()` to pull batches through the JS +/// callback. 
+#[napi]
+pub struct NapiScannable {
+    schema: SchemaRef,
+    num_rows: Option<usize>,
+    rescannable: bool,
+    // `ThreadsafeFunction` is not `Clone`; wrap in `Arc` so the stream
+    // returned by `scan_as_stream` can own a handle independent of `self`.
+    get_next_batch: Arc<GetNextBatchFn>,
+    // Tracks whether a scan has already started; used to enforce one-shot
+    // semantics on non-rescannable sources.
+    scanned: bool,
+}
+
+#[napi]
+impl NapiScannable {
+    /// Construct a new `NapiScannable`.
+    ///
+    /// - `schema_buf` — Arrow IPC File buffer carrying only the schema (no batches).
+    /// - `num_rows` — optional row count hint; not validated against the stream.
+    /// - `rescannable` — whether `get_next_batch` may be re-driven after the
+    ///   scan completes.
+    /// - `get_next_batch` — JS callback that yields the next batch as an Arrow
+    ///   IPC Stream message wrapped in a `Buffer`, or `null` at EOF. The
+    ///   `isStart` argument is `true` on the first call of each new scan;
+    ///   JS uses it to discard any cached iterator before pulling.
+    #[napi(constructor)]
+    pub fn new(
+        schema_buf: Buffer,
+        num_rows: Option<i64>,
+        rescannable: bool,
+        get_next_batch: Function<bool, Promise<Option<Buffer>>>,
+    ) -> napi::Result<Self> {
+        let schema = ipc_file_to_schema(schema_buf.to_vec())
+            .map_err(|e| napi::Error::from_reason(format!("Invalid schema buffer: {}", e)))?;
+        let num_rows = num_rows
+            .map(|n| {
+                usize::try_from(n)
+                    .map_err(|_| napi::Error::from_reason("num_rows must be non-negative"))
+            })
+            .transpose()?;
+        let get_next_batch = Arc::new(get_next_batch.build_threadsafe_function().build()?);
+        Ok(Self {
+            schema,
+            num_rows,
+            rescannable,
+            get_next_batch,
+            scanned: false,
+        })
+    }
+}
+
+impl std::fmt::Debug for NapiScannable {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("NapiScannable")
+            .field("schema", &self.schema)
+            .field("num_rows", &self.num_rows)
+            .field("rescannable", &self.rescannable)
+            .finish()
+    }
+}
+
+impl LanceScannable for NapiScannable {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn scan_as_stream(&mut self) -> SendableRecordBatchStream {
+        let schema = self.schema.clone();
+
+        // One-shot enforcement for non-rescannable sources: return a stream
+        // whose first item is an error.
+        if self.scanned && !self.rescannable {
+            let err_stream = once(async {
+                Err(Error::InvalidInput {
+                    message: "Scannable has already been consumed (non-rescannable source)"
+                        .to_string(),
+                })
+            });
+            return Box::pin(SimpleRecordBatchStream::new(err_stream, schema));
+        }
+        self.scanned = true;
+
+        let tsfn = Arc::clone(&self.get_next_batch);
+        let declared_schema = schema.clone();
+
+        // State threaded through the unfold. `is_first_pull` starts true so
+        // the first call into JS signals a new-scan boundary; JS uses it to
+        // reset any cached iterator before factory()-ing a fresh one.
+        let initial = State {
+            tsfn,
+            batch_index: 0,
+            declared_schema,
+            errored: false,
+            is_first_pull: true,
+        };
+
+        let stream = futures::stream::unfold(initial, |mut state| async move {
+            if state.errored {
+                return None;
+            }
+
+            // Pull the next IPC Stream buffer from JS. `is_first_pull` is
+            // consumed here and cleared so subsequent pulls continue the
+            // same scan rather than restarting it.
+            let is_start = state.is_first_pull;
+            state.is_first_pull = false;
+            let buf = match pull_next(&state.tsfn, is_start).await {
+                Ok(Some(buf)) => buf,
+                Ok(None) => return None,
+                Err(e) => {
+                    state.errored = true;
+                    return Some((Err(e), state));
+                }
+            };
+
+            match decode_one_batch(buf.as_ref(), &state.declared_schema) {
+                Ok(batch) => {
+                    state.batch_index += 1;
+                    Some((Ok(batch), state))
+                }
+                Err(e) => {
+                    let tagged = Error::Runtime {
+                        message: format!(
+                            "[scannable/rust-bridge] failure at batch index {}: {}",
+                            state.batch_index, e
+                        ),
+                    };
+                    state.errored = true;
+                    Some((Err(tagged), state))
+                }
+            }
+        });
+
+        Box::pin(SimpleRecordBatchStream::new(stream, schema))
+    }
+
+    fn num_rows(&self) -> Option<usize> {
+        self.num_rows
+    }
+
+    fn rescannable(&self) -> bool {
+        self.rescannable
+    }
+}
+
+struct State {
+    tsfn: Arc<GetNextBatchFn>,
+    batch_index: usize,
+    declared_schema: SchemaRef,
+    errored: bool,
+    /// True for the very first pull of a new scan. Forwarded to JS so the
+    /// callback can drop any cached iterator and call its factory fresh,
+    /// which makes rescannable sources restart at batch 0 even when the
+    /// previous scan ended mid-stream.
+    is_first_pull: bool,
+}
+
+/// Invoke the JS callback and await its Promise. `is_start` is forwarded to
+/// the JS side as the `isStart` argument so it can reset its iterator at the
+/// scan boundary. Errors on the JS side surface here as rejected promises
+/// and are tunneled back as `lancedb::Error::Runtime`.
+async fn pull_next(tsfn: &GetNextBatchFn, is_start: bool) -> LanceResult<Option<Buffer>> {
+    let promise = tsfn
+        .call_async(is_start)
+        .await
+        .map_err(|e| Error::Runtime {
+            message: format!(
+                "[scannable/js-factory] napi error status={}, reason={}",
+                e.status, e.reason
+            ),
+        })?;
+    promise.await.map_err(|e| Error::Runtime {
+        message: format!(
+            "[scannable/js-iterator] napi error status={}, reason={}",
+            e.status, e.reason
+        ),
+    })
+}
+
+/// Decode one IPC Stream buffer (schema + batch + EOS) into a `RecordBatch`.
+/// Each buffer is a standalone IPC stream, so every decoded stream schema must
+/// match the one declared at construction.
+fn decode_one_batch(buf: &[u8], declared: &SchemaRef) -> LanceResult<RecordBatch> {
+    let reader = StreamReader::try_new(Cursor::new(buf), None).map_err(|e| Error::Runtime {
+        message: format!("failed to open IPC stream reader: {}", e),
+    })?;
+
+    let actual = reader.schema();
+    if actual.as_ref() != declared.as_ref() {
+        return Err(Error::InvalidInput {
+            message: format!(
+                "declared schema does not match stream schema: declared={:?} actual={:?}",
+                declared, actual
+            ),
+        });
+    }
+
+    let mut iter = reader;
+    let batch = iter
+        .next()
+        .ok_or_else(|| Error::Runtime {
+            message: "IPC stream contained schema but no record batch".to_string(),
+        })?
+        .map_err(|e| Error::Runtime {
+            message: format!("failed to decode record batch: {}", e),
+        })?;
+    Ok(batch)
+}
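
The one-shot check in `isOneShotIterable` rests on a structural fact of the JS iteration protocol: a generator's `[Symbol.iterator]()` returns the generator itself, while an Array hands out a fresh iterator on every call. A minimal standalone sketch of that check (the names `isSelfIterator` and `gen` are illustrative and not part of this patch):

```typescript
// A source is "one-shot" when `[Symbol.iterator]()` returns the source
// object itself: re-iterating it just resumes the same exhausted iterator.
function isSelfIterator(source: object): boolean {
  const iter = (source as Iterable<unknown>)[Symbol.iterator]?.();
  return iter === source;
}

function* gen(): Generator<number> {
  yield 1;
}

// Generators exhaust after one pass; Arrays are replayable.
console.log(isSelfIterator(gen())); // true
console.log(isSelfIterator([1, 2])); // false
```

The same property explains why `fromRecordBatchReader` rejects `rescannable: true`: Arrow's `RecordBatchReader` is also a self-iterator, so a second scan would resume the already-exhausted reader.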