diff --git a/docs/src/js/globals.md b/docs/src/js/globals.md index 0f582b10f..09afacec5 100644 --- a/docs/src/js/globals.md +++ b/docs/src/js/globals.md @@ -104,6 +104,7 @@ - [UpdateResult](interfaces/UpdateResult.md) - [Version](interfaces/Version.md) - [WriteExecutionOptions](interfaces/WriteExecutionOptions.md) +- [WriteProgress](interfaces/WriteProgress.md) ## Type Aliases diff --git a/docs/src/js/interfaces/AddDataOptions.md b/docs/src/js/interfaces/AddDataOptions.md index 187721c00..6976787f0 100644 --- a/docs/src/js/interfaces/AddDataOptions.md +++ b/docs/src/js/interfaces/AddDataOptions.md @@ -19,3 +19,39 @@ mode: "append" | "overwrite"; If "append" (the default) then the new data will be added to the table If "overwrite" then the new data will replace the existing data in the table. + +*** + +### progress() + +```ts +progress: (progress) => void; +``` + +Optional callback invoked periodically with write progress. + +The callback is fired once per batch written and once more with +`done: true` when the write completes. Calls are dispatched +asynchronously to the JS event loop and never block the write — a slow +callback will queue events rather than back-pressure the writer. + +Errors thrown from the callback are logged with `console.warn` and +swallowed — they do not abort the write. + +#### Parameters + +* **progress**: [`WriteProgress`](WriteProgress.md) + +#### Returns + +`void` + +#### Example + +```ts +await table.add(data, { + progress: (p) => { + console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`); + }, +}); +``` diff --git a/docs/src/js/interfaces/WriteProgress.md b/docs/src/js/interfaces/WriteProgress.md new file mode 100644 index 000000000..e547b91a1 --- /dev/null +++ b/docs/src/js/interfaces/WriteProgress.md @@ -0,0 +1,84 @@ +[**@lancedb/lancedb**](../README.md) • **Docs** + +*** + +[@lancedb/lancedb](../globals.md) / WriteProgress + +# Interface: WriteProgress + +Progress snapshot for a write operation, delivered to the `progress` +callback passed to [Table.add](../classes/Table.md#add). + +## Properties + +### activeTasks + +```ts +activeTasks: number; +``` + +Number of parallel write tasks currently in flight. + +*** + +### done + +```ts +done: boolean; +``` + +`true` for the final callback; `false` otherwise. + +*** + +### elapsedSeconds + +```ts +elapsedSeconds: number; +``` + +Wall-clock seconds since the write started. + +*** + +### outputBytes + +```ts +outputBytes: number; +``` + +Number of bytes written so far. + +*** + +### outputRows + +```ts +outputRows: number; +``` + +Number of rows written so far. + +*** + +### totalRows? + +```ts +optional totalRows: number; +``` + +Total rows expected, when the input source reports it. + +Always set on the final callback (the one with `done: true`), falling +back to the actual number of rows written when the source could not +report a row count up front. + +*** + +### totalTasks + +```ts +totalTasks: number; +``` + +Total number of parallel write tasks (the write parallelism). diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 23f65da4e..fda877a45 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -115,6 +115,48 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( await expect(table.countRows()).resolves.toBe(1); }); + it("should invoke the progress callback", async () => { + const events: import("../lancedb").WriteProgress[] = []; + await table.add([{ id: 1 }, { id: 2 }, { id: 3 }], { + progress: (p) => events.push(p), + }); + + expect(events.length).toBeGreaterThan(0); + const last = events[events.length - 1]; + expect(last.done).toBe(true); + // Earlier callbacks must have done=false. + for (const ev of events.slice(0, -1)) { + expect(ev.done).toBe(false); + } + // outputRows reflects the rows added in this call, not table size. + expect(last.outputRows).toBe(3); + // The input source (an array) reports a row count, so totalRows is set. + expect(last.totalRows).toBe(3); + // outputRows is monotonic. + for (let i = 1; i < events.length; i++) { + expect(events[i].outputRows).toBeGreaterThanOrEqual( + events[i - 1].outputRows, + ); + } + }); + + it("should swallow errors thrown from the progress callback", async () => { + const warn = jest + .spyOn(console, "warn") + .mockImplementation(() => undefined); + try { + const res = await table.add([{ id: 1 }, { id: 2 }], { + progress: () => { + throw new Error("callback bomb"); + }, + }); + expect(res.version).toBeGreaterThan(0); + expect(warn).toHaveBeenCalled(); + } finally { + warn.mockRestore(); + } + }); + it("should let me close the table", async () => { expect(table.isOpen()).toBe(true); table.close(); diff --git a/nodejs/lancedb/index.ts b/nodejs/lancedb/index.ts index fc6d7777a..1110e427e 100644 --- a/nodejs/lancedb/index.ts +++ b/nodejs/lancedb/index.ts @@ -113,6 +113,7 @@ export { UpdateOptions, OptimizeOptions, Version, + WriteProgress, LsmWriteSpec, ColumnAlteration, } from "./table"; diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 59816a413..fe495392a 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -46,6 +46,33 @@ import { sanitizeType } from "./sanitize"; import { IntoSql, toSQL } from "./util"; export { IndexConfig } from "./native"; +/** + * Progress snapshot for a write operation, delivered to the `progress` + * callback passed to {@link Table.add}. + */ +export interface WriteProgress { + /** Number of rows written so far. */ + outputRows: number; + /** Number of bytes written so far. */ + outputBytes: number; + /** + * Total rows expected, when the input source reports it. + * + * Always set on the final callback (the one with `done: true`), falling + * back to the actual number of rows written when the source could not + * report a row count up front. + */ + totalRows?: number; + /** Wall-clock seconds since the write started. */ + elapsedSeconds: number; + /** Number of parallel write tasks currently in flight. */ + activeTasks: number; + /** Total number of parallel write tasks (the write parallelism). */ + totalTasks: number; + /** `true` for the final callback; `false` otherwise. */ + done: boolean; +} + /** * Options for adding data to a table. */ @@ -56,6 +83,28 @@ export interface AddDataOptions { * If "overwrite" then the new data will replace the existing data in the table. */ mode: "append" | "overwrite"; + + /** + * Optional callback invoked periodically with write progress. + * + * The callback is fired once per batch written and once more with + * `done: true` when the write completes. Calls are dispatched + * asynchronously to the JS event loop and never block the write — a slow + * callback will queue events rather than back-pressure the writer. + * + * Errors thrown from the callback are logged with `console.warn` and + * swallowed — they do not abort the write. + * + * @example + * ```ts + * await table.add(data, { + * progress: (p) => { + * console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`); + * }, + * }); + * ``` + */ + progress: (progress: WriteProgress) => void; } export interface UpdateOptions { @@ -705,7 +754,20 @@ export class LocalTable extends Table { const schema = await this.schema(); const buffer = await fromDataToBuffer(data, undefined, schema); - return await this.inner.add(buffer, mode); + // Wrap the user callback so a thrown error doesn't surface as an + // unhandled exception (the callback fires from a napi threadsafe + // function — exceptions there crash the process). + const userProgress = options?.progress; + const progress = userProgress + ? (p: WriteProgress) => { + try { + userProgress(p); + } catch (e) { + console.warn("Table.add progress callback threw:", e); + } + } + : undefined; + return await this.inner.add(buffer, mode, progress); } async update( diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 29bf7bba4..4c5424bc9 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -9,6 +9,7 @@ use lancedb::table::{ OptimizeAction, OptimizeOptions, Table as LanceDbTable, }; use napi::bindgen_prelude::*; +use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi_derive::napi; use crate::error::NapiErrorExt; @@ -67,8 +68,16 @@ impl Table { schema_to_buffer(&schema) } - #[napi(catch_unwind)] - pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result { + #[napi( + catch_unwind, + ts_args_type = "buf: Buffer, mode: string, progressCallback?: (progress: WriteProgressInfo) => void" + )] + pub async fn add( + &self, + buf: Buffer, + mode: String, + progress_callback: Option, + ) -> napi::Result { let batches = ipc_file_to_batches(buf.to_vec()) .map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?; let batches = batches @@ -92,6 +101,19 @@ impl Table { return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode))); }; + if let Some(tsfn) = progress_callback { + op = op.progress(move |p| { + // NonBlocking: dispatch onto the JS event loop without + // blocking the writer thread. With napi-rs's default + // unbounded queue, events are not dropped — a slow JS + // callback will just queue them. + tsfn.call( + WriteProgressInfo::from(p), + ThreadsafeFunctionCallMode::NonBlocking, + ); + }); + } + let res = op.execute().await.default_error()?; Ok(res.into()) } @@ -654,6 +676,44 @@ pub struct OptimizeStats { pub prune: RemovalStats, } +/// Progress snapshot for a write operation, delivered to the JS callback +/// passed to `Table.add`. +#[napi(object)] +#[derive(Clone, Debug)] +pub struct WriteProgressInfo { + /// Number of rows written so far. + pub output_rows: i64, + /// Number of bytes written so far. + pub output_bytes: i64, + /// Total rows expected, if the input source reports it. + /// Always set on the final callback (where `done` is `true`). + pub total_rows: Option, + /// Wall-clock seconds since monitoring started. + pub elapsed_seconds: f64, + /// Number of parallel write tasks currently in flight. + pub active_tasks: i64, + /// Total number of parallel write tasks (the write parallelism). + pub total_tasks: i64, + /// `true` for the final callback; `false` otherwise. + pub done: bool, +} + +impl From<&lancedb::table::write_progress::WriteProgress> for WriteProgressInfo { + fn from(p: &lancedb::table::write_progress::WriteProgress) -> Self { + Self { + output_rows: p.output_rows() as i64, + output_bytes: p.output_bytes() as i64, + total_rows: p.total_rows().map(|n| n as i64), + elapsed_seconds: p.elapsed().as_secs_f64(), + active_tasks: p.active_tasks() as i64, + total_tasks: p.total_tasks() as i64, + done: p.done(), + } + } +} + +type ProgressFn = ThreadsafeFunction; + /// A definition of a column alteration. The alteration changes the column at /// `path` to have the new name `name`, to be nullable if `nullable` is true, /// and to have the data type `data_type`. At least one of `rename` or `nullable`