refactor(node): async generator for RecordBatchIterator (#2744)

JS native Async Generator, more efficient asynchronous iteration, fewer
synthetic promises, and the ability to handle `catch` or `break` of
parent loop in `finally` block
This commit is contained in:
S.A.N
2025-10-30 22:36:24 +01:00
committed by GitHub
parent 45255be42c
commit 20bec61ecb
8 changed files with 44 additions and 81 deletions

View File

@@ -20,35 +20,25 @@ import {
} from "./native";
import { Reranker } from "./rerankers";
export class RecordBatchIterator implements AsyncIterator<RecordBatch> {
private promisedInner?: Promise<NativeBatchIterator>;
private inner?: NativeBatchIterator;
export async function* RecordBatchIterator(
promisedInner: Promise<NativeBatchIterator>,
) {
const inner = await promisedInner;
constructor(promise?: Promise<NativeBatchIterator>) {
// TODO: check promise reliably so we dont need to pass two arguments.
this.promisedInner = promise;
if (inner === undefined) {
throw new Error("Invalid iterator state");
}
// biome-ignore lint/suspicious/noExplicitAny: skip
async next(): Promise<IteratorResult<RecordBatch<any>>> {
if (this.inner === undefined) {
this.inner = await this.promisedInner;
}
if (this.inner === undefined) {
throw new Error("Invalid iterator state state");
}
const n = await this.inner.next();
if (n == null) {
return Promise.resolve({ done: true, value: null });
}
const tbl = tableFromIPC(n);
if (tbl.batches.length != 1) {
for (let buffer = await inner.next(); buffer; buffer = await inner.next()) {
const { batches } = tableFromIPC(buffer);
if (batches.length !== 1) {
throw new Error("Expected only one batch");
}
return Promise.resolve({ done: false, value: tbl.batches[0] });
yield batches[0];
}
}
/* eslint-enable */
class RecordBatchIterable<
NativeQueryType extends NativeQuery | NativeVectorQuery | NativeTakeQuery,
@@ -64,7 +54,7 @@ class RecordBatchIterable<
// biome-ignore lint/suspicious/noExplicitAny: skip
[Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>, any, undefined> {
return new RecordBatchIterator(
return RecordBatchIterator(
this.inner.execute(this.options?.maxBatchLength, this.options?.timeoutMs),
);
}
@@ -231,10 +221,8 @@ export class QueryBase<
* single query)
*
*/
protected execute(
options?: Partial<QueryExecutionOptions>,
): RecordBatchIterator {
return new RecordBatchIterator(this.nativeExecute(options));
protected execute(options?: Partial<QueryExecutionOptions>) {
return RecordBatchIterator(this.nativeExecute(options));
}
/**
@@ -242,8 +230,7 @@ export class QueryBase<
*/
// biome-ignore lint/suspicious/noExplicitAny: skip
[Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>> {
const promise = this.nativeExecute();
return new RecordBatchIterator(promise);
return RecordBatchIterator(this.nativeExecute());
}
/** Collect the results as an Arrow @see {@link ArrowTable}. */