mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-21 05:50:41 +00:00
feat(nodejs): add progress to Table.add (#3398)
### Summary
- Add an optional `progress` callback to `Table.add(data, { progress
})`. Callback fires once per batch written and once more with `done:
true` when the write completes.
- Errors thrown from the user's callback are logged with `console.warn`
and swallowed
### Testing
- npm test
- ran smoke test script to verify functionality
This commit is contained in:
@@ -9,6 +9,7 @@ use lancedb::table::{
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
|
||||
};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
|
||||
use napi_derive::napi;
|
||||
|
||||
use crate::error::NapiErrorExt;
|
||||
@@ -67,8 +68,16 @@ impl Table {
|
||||
schema_to_buffer(&schema)
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result<AddResult> {
|
||||
#[napi(
|
||||
catch_unwind,
|
||||
ts_args_type = "buf: Buffer, mode: string, progressCallback?: (progress: WriteProgressInfo) => void"
|
||||
)]
|
||||
pub async fn add(
|
||||
&self,
|
||||
buf: Buffer,
|
||||
mode: String,
|
||||
progress_callback: Option<ProgressFn>,
|
||||
) -> napi::Result<AddResult> {
|
||||
let batches = ipc_file_to_batches(buf.to_vec())
|
||||
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
|
||||
let batches = batches
|
||||
@@ -92,6 +101,19 @@ impl Table {
|
||||
return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode)));
|
||||
};
|
||||
|
||||
if let Some(tsfn) = progress_callback {
|
||||
op = op.progress(move |p| {
|
||||
// NonBlocking: dispatch onto the JS event loop without
|
||||
// blocking the writer thread. With napi-rs's default
|
||||
// unbounded queue, events are not dropped — a slow JS
|
||||
// callback will just queue them.
|
||||
tsfn.call(
|
||||
WriteProgressInfo::from(p),
|
||||
ThreadsafeFunctionCallMode::NonBlocking,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
let res = op.execute().await.default_error()?;
|
||||
Ok(res.into())
|
||||
}
|
||||
@@ -654,6 +676,44 @@ pub struct OptimizeStats {
|
||||
pub prune: RemovalStats,
|
||||
}
|
||||
|
||||
/// Progress snapshot for a write operation, delivered to the JS callback
|
||||
/// passed to `Table.add`.
|
||||
#[napi(object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct WriteProgressInfo {
|
||||
/// Number of rows written so far.
|
||||
pub output_rows: i64,
|
||||
/// Number of bytes written so far.
|
||||
pub output_bytes: i64,
|
||||
/// Total rows expected, if the input source reports it.
|
||||
/// Always set on the final callback (where `done` is `true`).
|
||||
pub total_rows: Option<i64>,
|
||||
/// Wall-clock seconds since monitoring started.
|
||||
pub elapsed_seconds: f64,
|
||||
/// Number of parallel write tasks currently in flight.
|
||||
pub active_tasks: i64,
|
||||
/// Total number of parallel write tasks (the write parallelism).
|
||||
pub total_tasks: i64,
|
||||
/// `true` for the final callback; `false` otherwise.
|
||||
pub done: bool,
|
||||
}
|
||||
|
||||
impl From<&lancedb::table::write_progress::WriteProgress> for WriteProgressInfo {
|
||||
fn from(p: &lancedb::table::write_progress::WriteProgress) -> Self {
|
||||
Self {
|
||||
output_rows: p.output_rows() as i64,
|
||||
output_bytes: p.output_bytes() as i64,
|
||||
total_rows: p.total_rows().map(|n| n as i64),
|
||||
elapsed_seconds: p.elapsed().as_secs_f64(),
|
||||
active_tasks: p.active_tasks() as i64,
|
||||
total_tasks: p.total_tasks() as i64,
|
||||
done: p.done(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type ProgressFn = ThreadsafeFunction<WriteProgressInfo, (), WriteProgressInfo, Status, false>;
|
||||
|
||||
/// A definition of a column alteration. The alteration changes the column at
|
||||
/// `path` to have the new name `name`, to be nullable if `nullable` is true,
|
||||
/// and to have the data type `data_type`. At least one of `rename` or `nullable`
|
||||
|
||||
Reference in New Issue
Block a user