diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 23f65da4e..6d54906e2 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -115,6 +115,31 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( await expect(table.countRows()).resolves.toBe(1); }); + it("should invoke the progress callback", async () => { + const events: import("../lancedb").WriteProgress[] = []; + await table.add([{ id: 1 }, { id: 2 }, { id: 3 }], { + progress: (p) => events.push(p), + }); + + expect(events.length).toBeGreaterThan(0); + const last = events[events.length - 1]; + expect(last.done).toBe(true); + // Earlier callbacks must have done=false. + for (const ev of events.slice(0, -1)) { + expect(ev.done).toBe(false); + } + // outputRows reflects the rows added in this call, not table size. + expect(last.outputRows).toBe(3); + // The input source (an array) reports a row count, so totalRows is set. + expect(last.totalRows).toBe(3); + // outputRows is monotonic. + for (let i = 1; i < events.length; i++) { + expect(events[i].outputRows).toBeGreaterThanOrEqual( + events[i - 1].outputRows, + ); + } + }); + it("should let me close the table", async () => { expect(table.isOpen()).toBe(true); table.close(); diff --git a/nodejs/lancedb/index.ts b/nodejs/lancedb/index.ts index fc6d7777a..1110e427e 100644 --- a/nodejs/lancedb/index.ts +++ b/nodejs/lancedb/index.ts @@ -113,6 +113,7 @@ export { UpdateOptions, OptimizeOptions, Version, + WriteProgress, LsmWriteSpec, ColumnAlteration, } from "./table"; diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 59816a413..1503f7553 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -46,6 +46,33 @@ import { sanitizeType } from "./sanitize"; import { IntoSql, toSQL } from "./util"; export { IndexConfig } from "./native"; +/** + * Progress snapshot for a write operation, delivered to the `progress` + * callback passed to {@link Table.add}. + */ +export interface WriteProgress { + /** Number of rows written so far. */ + outputRows: number; + /** Number of bytes written so far. */ + outputBytes: number; + /** + * Total rows expected, when the input source reports it. + * + * Always set on the final callback (the one with `done: true`), falling + * back to the actual number of rows written when the source could not + * report a row count up front. + */ + totalRows?: number; + /** Wall-clock seconds since the write started. */ + elapsedSeconds: number; + /** Number of parallel write tasks currently in flight. */ + activeTasks: number; + /** Total number of parallel write tasks (the write parallelism). */ + totalTasks: number; + /** `true` for the final callback; `false` otherwise. */ + done: boolean; +} + /** * Options for adding data to a table. */ @@ -56,6 +83,25 @@ export interface AddDataOptions { * If "overwrite" then the new data will replace the existing data in the table. */ mode: "append" | "overwrite"; + + /** + * Optional callback invoked periodically with write progress. + * + * The callback is fired once per batch written and once more with + * `done: true` when the write completes. Calls are non-blocking — if the + * callback is slow, intermediate updates may be dropped to avoid stalling + * the write. + * + * @example + * ```ts + * await table.add(data, { + * progress: (p) => { + * console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`); + * }, + * }); + * ``` + */ + progress: (progress: WriteProgress) => void; } export interface UpdateOptions { @@ -705,7 +751,7 @@ export class LocalTable extends Table { const schema = await this.schema(); const buffer = await fromDataToBuffer(data, undefined, schema); - return await this.inner.add(buffer, mode); + return await this.inner.add(buffer, mode, options?.progress); } async update( diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 29bf7bba4..6f526e9fd 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -9,6 +9,7 @@ use lancedb::table::{ OptimizeAction, OptimizeOptions, Table as LanceDbTable, }; use napi::bindgen_prelude::*; +use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi_derive::napi; use crate::error::NapiErrorExt; @@ -67,8 +68,16 @@ impl Table { schema_to_buffer(&schema) } - #[napi(catch_unwind)] - pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result { + #[napi( + catch_unwind, + ts_args_type = "buf: Buffer, mode: string, progressCallback?: (progress: WriteProgressInfo) => void" + )] + pub async fn add( + &self, + buf: Buffer, + mode: String, + progress_callback: Option, + ) -> napi::Result { let batches = ipc_file_to_batches(buf.to_vec()) .map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?; let batches = batches @@ -92,6 +101,17 @@ impl Table { return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode))); }; + if let Some(tsfn) = progress_callback { + op = op.progress(move |p| { + // Non-blocking: drop progress events rather than back-pressuring + // the write if the JS callback can't keep up. + tsfn.call( + WriteProgressInfo::from(p), + ThreadsafeFunctionCallMode::NonBlocking, + ); + }); + } + let res = op.execute().await.default_error()?; Ok(res.into()) } @@ -654,6 +674,44 @@ pub struct OptimizeStats { pub prune: RemovalStats, } +/// Progress snapshot for a write operation, delivered to the JS callback +/// passed to `Table.add`. +#[napi(object)] +#[derive(Clone, Debug)] +pub struct WriteProgressInfo { + /// Number of rows written so far. + pub output_rows: i64, + /// Number of bytes written so far. + pub output_bytes: i64, + /// Total rows expected, if the input source reports it. + /// Always set on the final callback (where `done` is `true`). + pub total_rows: Option, + /// Wall-clock seconds since monitoring started. + pub elapsed_seconds: f64, + /// Number of parallel write tasks currently in flight. + pub active_tasks: i64, + /// Total number of parallel write tasks (the write parallelism). + pub total_tasks: i64, + /// `true` for the final callback; `false` otherwise. + pub done: bool, +} + +impl From<&lancedb::table::write_progress::WriteProgress> for WriteProgressInfo { + fn from(p: &lancedb::table::write_progress::WriteProgress) -> Self { + Self { + output_rows: p.output_rows() as i64, + output_bytes: p.output_bytes() as i64, + total_rows: p.total_rows().map(|n| n as i64), + elapsed_seconds: p.elapsed().as_secs_f64(), + active_tasks: p.active_tasks() as i64, + total_tasks: p.total_tasks() as i64, + done: p.done(), + } + } +} + +type ProgressFn = ThreadsafeFunction; + /// A definition of a column alteration. The alteration changes the column at /// `path` to have the new name `name`, to be nullable if `nullable` is true, /// and to have the data type `data_type`. At least one of `rename` or `nullable`