feat(nodejs): add Table.add progress callback

This commit is contained in:
Brendan Clement
2026-05-18 09:08:58 -07:00
parent 8df2fff75f
commit aed71c3bc0
4 changed files with 133 additions and 3 deletions

View File

@@ -115,6 +115,31 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
await expect(table.countRows()).resolves.toBe(1);
});
it("should invoke the progress callback", async () => {
const events: import("../lancedb").WriteProgress[] = [];
await table.add([{ id: 1 }, { id: 2 }, { id: 3 }], {
progress: (p) => events.push(p),
});
expect(events.length).toBeGreaterThan(0);
const last = events[events.length - 1];
expect(last.done).toBe(true);
// Earlier callbacks must have done=false.
for (const ev of events.slice(0, -1)) {
expect(ev.done).toBe(false);
}
// outputRows reflects the rows added in this call, not table size.
expect(last.outputRows).toBe(3);
// The input source (an array) reports a row count, so totalRows is set.
expect(last.totalRows).toBe(3);
// outputRows is monotonic.
for (let i = 1; i < events.length; i++) {
expect(events[i].outputRows).toBeGreaterThanOrEqual(
events[i - 1].outputRows,
);
}
});
it("should let me close the table", async () => {
expect(table.isOpen()).toBe(true);
table.close();

View File

@@ -113,6 +113,7 @@ export {
UpdateOptions,
OptimizeOptions,
Version,
WriteProgress,
LsmWriteSpec,
ColumnAlteration,
} from "./table";

View File

@@ -46,6 +46,33 @@ import { sanitizeType } from "./sanitize";
import { IntoSql, toSQL } from "./util";
export { IndexConfig } from "./native";
/**
* Progress snapshot for a write operation, delivered to the `progress`
* callback passed to {@link Table.add}.
*/
export interface WriteProgress {
/** Number of rows written so far. */
outputRows: number;
/** Number of bytes written so far. */
outputBytes: number;
/**
* Total rows expected, when the input source reports it.
*
* Always set on the final callback (the one with `done: true`), falling
* back to the actual number of rows written when the source could not
* report a row count up front.
*/
totalRows?: number;
/** Wall-clock seconds since the write started. */
elapsedSeconds: number;
/** Number of parallel write tasks currently in flight. */
activeTasks: number;
/** Total number of parallel write tasks (the write parallelism). */
totalTasks: number;
/** `true` for the final callback; `false` otherwise. */
done: boolean;
}
/**
* Options for adding data to a table.
*/
@@ -56,6 +83,25 @@ export interface AddDataOptions {
* If "overwrite" then the new data will replace the existing data in the table.
*/
mode: "append" | "overwrite";
/**
* Optional callback invoked periodically with write progress.
*
* The callback is fired once per batch written and once more with
* `done: true` when the write completes. Calls are non-blocking — if the
* callback is slow, intermediate updates may be dropped to avoid stalling
* the write.
*
* @example
* ```ts
* await table.add(data, {
* progress: (p) => {
* console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`);
* },
* });
* ```
*/
progress: (progress: WriteProgress) => void;
}
export interface UpdateOptions {
@@ -705,7 +751,7 @@ export class LocalTable extends Table {
const schema = await this.schema();
const buffer = await fromDataToBuffer(data, undefined, schema);
return await this.inner.add(buffer, mode);
return await this.inner.add(buffer, mode, options?.progress);
}
async update(

View File

@@ -9,6 +9,7 @@ use lancedb::table::{
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
};
use napi::bindgen_prelude::*;
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi_derive::napi;
use crate::error::NapiErrorExt;
@@ -67,8 +68,16 @@ impl Table {
schema_to_buffer(&schema)
}
#[napi(catch_unwind)]
pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result<AddResult> {
#[napi(
catch_unwind,
ts_args_type = "buf: Buffer, mode: string, progressCallback?: (progress: WriteProgressInfo) => void"
)]
pub async fn add(
&self,
buf: Buffer,
mode: String,
progress_callback: Option<ProgressFn>,
) -> napi::Result<AddResult> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
let batches = batches
@@ -92,6 +101,17 @@ impl Table {
return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode)));
};
if let Some(tsfn) = progress_callback {
op = op.progress(move |p| {
// Non-blocking: drop progress events rather than back-pressuring
// the write if the JS callback can't keep up.
tsfn.call(
WriteProgressInfo::from(p),
ThreadsafeFunctionCallMode::NonBlocking,
);
});
}
let res = op.execute().await.default_error()?;
Ok(res.into())
}
@@ -654,6 +674,44 @@ pub struct OptimizeStats {
pub prune: RemovalStats,
}
/// Progress snapshot for a write operation, delivered to the JS callback
/// passed to `Table.add`.
#[napi(object)]
#[derive(Clone, Debug)]
pub struct WriteProgressInfo {
/// Number of rows written so far.
pub output_rows: i64,
/// Number of bytes written so far.
pub output_bytes: i64,
/// Total rows expected, if the input source reports it.
/// Always set on the final callback (where `done` is `true`).
pub total_rows: Option<i64>,
/// Wall-clock seconds since monitoring started.
pub elapsed_seconds: f64,
/// Number of parallel write tasks currently in flight.
pub active_tasks: i64,
/// Total number of parallel write tasks (the write parallelism).
pub total_tasks: i64,
/// `true` for the final callback; `false` otherwise.
pub done: bool,
}
impl From<&lancedb::table::write_progress::WriteProgress> for WriteProgressInfo {
fn from(p: &lancedb::table::write_progress::WriteProgress) -> Self {
Self {
output_rows: p.output_rows() as i64,
output_bytes: p.output_bytes() as i64,
total_rows: p.total_rows().map(|n| n as i64),
elapsed_seconds: p.elapsed().as_secs_f64(),
active_tasks: p.active_tasks() as i64,
total_tasks: p.total_tasks() as i64,
done: p.done(),
}
}
}
type ProgressFn = ThreadsafeFunction<WriteProgressInfo, (), WriteProgressInfo, Status, false>;
/// A definition of a column alteration. The alteration changes the column at
/// `path` to have the new name `name`, to be nullable if `nullable` is true,
/// and to have the data type `data_type`. At least one of `rename` or `nullable`