From 20bec61ecb91234748bf2b390cf72184f929e0d1 Mon Sep 17 00:00:00 2001 From: "S.A.N" Date: Thu, 30 Oct 2025 22:36:24 +0100 Subject: [PATCH] refactor(node): async generator for RecordBatchIterator (#2744) JS native Async Generator, more efficient asynchronous iteration, fewer synthetic promises, and the ability to handle `catch` or `break` of parent loop in `finally` block --- docs/src/js/classes/Query.md | 4 +- docs/src/js/classes/QueryBase.md | 4 +- docs/src/js/classes/RecordBatchIterator.md | 43 ------------------- docs/src/js/classes/TakeQuery.md | 4 +- docs/src/js/classes/VectorQuery.md | 4 +- docs/src/js/functions/RecordBatchIterator.md | 19 +++++++++ docs/src/js/globals.md | 2 +- nodejs/lancedb/query.ts | 45 +++++++------------- 8 files changed, 44 insertions(+), 81 deletions(-) delete mode 100644 docs/src/js/classes/RecordBatchIterator.md create mode 100644 docs/src/js/functions/RecordBatchIterator.md diff --git a/docs/src/js/classes/Query.md b/docs/src/js/classes/Query.md index c10c2f28..b6906906 100644 --- a/docs/src/js/classes/Query.md +++ b/docs/src/js/classes/Query.md @@ -80,7 +80,7 @@ AnalyzeExec verbose=true, metrics=[] ### execute() ```ts -protected execute(options?): RecordBatchIterator +protected execute(options?): AsyncGenerator, void, unknown> ``` Execute the query and return the results as an @@ -91,7 +91,7 @@ Execute the query and return the results as an #### Returns -[`RecordBatchIterator`](RecordBatchIterator.md) +`AsyncGenerator`<`RecordBatch`<`any`>, `void`, `unknown`> #### See diff --git a/docs/src/js/classes/QueryBase.md b/docs/src/js/classes/QueryBase.md index 91aae97b..1dd81ff9 100644 --- a/docs/src/js/classes/QueryBase.md +++ b/docs/src/js/classes/QueryBase.md @@ -81,7 +81,7 @@ AnalyzeExec verbose=true, metrics=[] ### execute() ```ts -protected execute(options?): RecordBatchIterator +protected execute(options?): AsyncGenerator, void, unknown> ``` Execute the query and return the results as an @@ -92,7 +92,7 @@ Execute the query and return the results as an #### Returns -[`RecordBatchIterator`](RecordBatchIterator.md) +`AsyncGenerator`<`RecordBatch`<`any`>, `void`, `unknown`> #### See diff --git a/docs/src/js/classes/RecordBatchIterator.md b/docs/src/js/classes/RecordBatchIterator.md deleted file mode 100644 index f77e09dc..00000000 --- a/docs/src/js/classes/RecordBatchIterator.md +++ /dev/null @@ -1,43 +0,0 @@ -[**@lancedb/lancedb**](../README.md) • **Docs** - -*** - -[@lancedb/lancedb](../globals.md) / RecordBatchIterator - -# Class: RecordBatchIterator - -## Implements - -- `AsyncIterator`<`RecordBatch`> - -## Constructors - -### new RecordBatchIterator() - -```ts -new RecordBatchIterator(promise?): RecordBatchIterator -``` - -#### Parameters - -* **promise?**: `Promise`<`RecordBatchIterator`> - -#### Returns - -[`RecordBatchIterator`](RecordBatchIterator.md) - -## Methods - -### next() - -```ts -next(): Promise, any>> -``` - -#### Returns - -`Promise`<`IteratorResult`<`RecordBatch`<`any`>, `any`>> - -#### Implementation of - -`AsyncIterator.next` diff --git a/docs/src/js/classes/TakeQuery.md b/docs/src/js/classes/TakeQuery.md index 4b1d168d..1329b99d 100644 --- a/docs/src/js/classes/TakeQuery.md +++ b/docs/src/js/classes/TakeQuery.md @@ -76,7 +76,7 @@ AnalyzeExec verbose=true, metrics=[] ### execute() ```ts -protected execute(options?): RecordBatchIterator +protected execute(options?): AsyncGenerator, void, unknown> ``` Execute the query and return the results as an @@ -87,7 +87,7 @@ Execute the query and return the results as an #### Returns -[`RecordBatchIterator`](RecordBatchIterator.md) +`AsyncGenerator`<`RecordBatch`<`any`>, `void`, `unknown`> #### See diff --git a/docs/src/js/classes/VectorQuery.md b/docs/src/js/classes/VectorQuery.md index 05554f91..646c65cb 100644 --- a/docs/src/js/classes/VectorQuery.md +++ b/docs/src/js/classes/VectorQuery.md @@ -221,7 +221,7 @@ also increase the latency of your query. The default value is 1.5*limit. ### execute() ```ts -protected execute(options?): RecordBatchIterator +protected execute(options?): AsyncGenerator, void, unknown> ``` Execute the query and return the results as an @@ -232,7 +232,7 @@ Execute the query and return the results as an #### Returns -[`RecordBatchIterator`](RecordBatchIterator.md) +`AsyncGenerator`<`RecordBatch`<`any`>, `void`, `unknown`> #### See diff --git a/docs/src/js/functions/RecordBatchIterator.md b/docs/src/js/functions/RecordBatchIterator.md new file mode 100644 index 00000000..1ab5a8ef --- /dev/null +++ b/docs/src/js/functions/RecordBatchIterator.md @@ -0,0 +1,19 @@ +[**@lancedb/lancedb**](../README.md) • **Docs** + +*** + +[@lancedb/lancedb](../globals.md) / RecordBatchIterator + +# Function: RecordBatchIterator() + +```ts +function RecordBatchIterator(promisedInner): AsyncGenerator, void, unknown> +``` + +## Parameters + +* **promisedInner**: `Promise`<`RecordBatchIterator`> + +## Returns + +`AsyncGenerator`<`RecordBatch`<`any`>, `void`, `unknown`> diff --git a/docs/src/js/globals.md b/docs/src/js/globals.md index 462e6a99..b3d61023 100644 --- a/docs/src/js/globals.md +++ b/docs/src/js/globals.md @@ -32,7 +32,6 @@ - [PhraseQuery](classes/PhraseQuery.md) - [Query](classes/Query.md) - [QueryBase](classes/QueryBase.md) -- [RecordBatchIterator](classes/RecordBatchIterator.md) - [Session](classes/Session.md) - [StaticHeaderProvider](classes/StaticHeaderProvider.md) - [Table](classes/Table.md) @@ -105,6 +104,7 @@ ## Functions +- [RecordBatchIterator](functions/RecordBatchIterator.md) - [connect](functions/connect.md) - [makeArrowTable](functions/makeArrowTable.md) - [packBits](functions/packBits.md) diff --git a/nodejs/lancedb/query.ts b/nodejs/lancedb/query.ts index 19c87c70..46de6c3b 100644 --- a/nodejs/lancedb/query.ts +++ b/nodejs/lancedb/query.ts @@ -20,35 +20,25 @@ import { } from "./native"; import { Reranker } from "./rerankers"; -export class RecordBatchIterator implements AsyncIterator { - private promisedInner?: Promise; - private inner?: NativeBatchIterator; +export async function* RecordBatchIterator( + promisedInner: Promise, +) { + const inner = await promisedInner; - constructor(promise?: Promise) { - // TODO: check promise reliably so we dont need to pass two arguments. - this.promisedInner = promise; + if (inner === undefined) { + throw new Error("Invalid iterator state"); } - // biome-ignore lint/suspicious/noExplicitAny: skip - async next(): Promise>> { - if (this.inner === undefined) { - this.inner = await this.promisedInner; - } - if (this.inner === undefined) { - throw new Error("Invalid iterator state state"); - } - const n = await this.inner.next(); - if (n == null) { - return Promise.resolve({ done: true, value: null }); - } - const tbl = tableFromIPC(n); - if (tbl.batches.length != 1) { + for (let buffer = await inner.next(); buffer; buffer = await inner.next()) { + const { batches } = tableFromIPC(buffer); + + if (batches.length !== 1) { throw new Error("Expected only one batch"); } - return Promise.resolve({ done: false, value: tbl.batches[0] }); + + yield batches[0]; } } -/* eslint-enable */ class RecordBatchIterable< NativeQueryType extends NativeQuery | NativeVectorQuery | NativeTakeQuery, @@ -64,7 +54,7 @@ class RecordBatchIterable< // biome-ignore lint/suspicious/noExplicitAny: skip [Symbol.asyncIterator](): AsyncIterator, any, undefined> { - return new RecordBatchIterator( + return RecordBatchIterator( this.inner.execute(this.options?.maxBatchLength, this.options?.timeoutMs), ); } @@ -231,10 +221,8 @@ export class QueryBase< * single query) * */ - protected execute( - options?: Partial, - ): RecordBatchIterator { - return new RecordBatchIterator(this.nativeExecute(options)); + protected execute(options?: Partial) { + return RecordBatchIterator(this.nativeExecute(options)); } /** @@ -242,8 +230,7 @@ export class QueryBase< */ // biome-ignore lint/suspicious/noExplicitAny: skip [Symbol.asyncIterator](): AsyncIterator> { - const promise = this.nativeExecute(); - return new RecordBatchIterator(promise); + return RecordBatchIterator(this.nativeExecute()); } /** Collect the results as an Arrow @see {@link ArrowTable}. */