From 32c77879c9746a4f49da808141317ed7a4721381 Mon Sep 17 00:00:00 2001 From: Brendan Clement Date: Thu, 14 May 2026 11:39:56 -0700 Subject: [PATCH] feat(nodejs): surface skip_auto_cleanup on add and merge insert --- nodejs/__test__/table.test.ts | 6 +++ nodejs/lancedb/merge.ts | 17 +++++++ nodejs/lancedb/table.ts | 14 +++++- nodejs/src/merge.rs | 7 +++ nodejs/src/table.rs | 16 ++++++- rust/lancedb/src/database/listing.rs | 6 +++ rust/lancedb/src/database/namespace.rs | 4 ++ rust/lancedb/src/io/object_store.rs | 1 + rust/lancedb/src/table.rs | 18 ++++++- rust/lancedb/src/table/add_data.rs | 53 +++++++++++++++++++++ rust/lancedb/src/table/datafusion/insert.rs | 1 + rust/lancedb/src/table/dataset.rs | 2 + rust/lancedb/src/table/merge.rs | 16 +++++++ 13 files changed, 157 insertions(+), 4 deletions(-) diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 36c616438..1776e65fd 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -115,6 +115,12 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( await expect(table.countRows()).resolves.toBe(1); }); + it("should accept skipAutoCleanup on add()", async () => { + await table.add([{ id: 1 }], { skipAutoCleanup: true }); + await table.add([{ id: 2 }], { skipAutoCleanup: true }); + await expect(table.countRows()).resolves.toBe(2); + }); + it("should let me close the table", async () => { expect(table.isOpen()).toBe(true); table.close(); diff --git a/nodejs/lancedb/merge.ts b/nodejs/lancedb/merge.ts index dc9144fdf..dd586132c 100644 --- a/nodejs/lancedb/merge.ts +++ b/nodejs/lancedb/merge.ts @@ -87,6 +87,23 @@ export class MergeInsertBuilder { this.#schema, ); } + + /** + * Skip the automatic cleanup of old dataset versions that would otherwise + * run as part of this merge insert's commit. Forwards to + * `MergeInsertBuilder::skip_auto_cleanup` in lance-core. + * + * Useful for high-frequency writers that prefer to manage version cleanup + * themselves, or writers without delete permissions on the underlying storage. + * + * @param skip - If true, the auto-cleanup step is skipped at commit time. + */ + skipAutoCleanup(skip: boolean): MergeInsertBuilder { + return new MergeInsertBuilder( + this.#native.skipAutoCleanup(skip), + this.#schema, + ); + } /** * Executes the merge insert operation * diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 264b8897d..a3aeebc6e 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -56,6 +56,18 @@ export interface AddDataOptions { * If "overwrite" then the new data will replace the existing data in the table. */ mode: "append" | "overwrite"; + /** + * If true, skip the automatic cleanup of old dataset versions that would + * otherwise run as part of this write's commit. Forwards to + * `WriteParams.skip_auto_cleanup` in lance-core. + * + * Useful for high-frequency writers that prefer to manage version cleanup + * themselves (for example, via a separate periodic optimize job), or for + * writers that don't have delete permissions on the underlying storage. + * + * Defaults to false. + */ + skipAutoCleanup?: boolean; } export interface UpdateOptions { @@ -636,7 +648,7 @@ export class LocalTable extends Table { const schema = await this.schema(); const buffer = await fromDataToBuffer(data, undefined, schema); - return await this.inner.add(buffer, mode); + return await this.inner.add(buffer, mode, options?.skipAutoCleanup); } async update( diff --git a/nodejs/src/merge.rs b/nodejs/src/merge.rs index 98d637fb3..9c58fe75e 100644 --- a/nodejs/src/merge.rs +++ b/nodejs/src/merge.rs @@ -50,6 +50,13 @@ impl NativeMergeInsertBuilder { this } + #[napi] + pub fn skip_auto_cleanup(&self, skip: bool) -> Self { + let mut this = self.clone(); + this.inner.skip_auto_cleanup(skip); + this + } + #[napi(catch_unwind)] pub async fn execute(&self, buf: Buffer) -> napi::Result { let data = ipc_file_to_batches(buf.to_vec()) diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 6100d7bd7..b5b105d65 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema}; use lancedb::table::{ AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform, - OptimizeAction, OptimizeOptions, Table as LanceDbTable, + OptimizeAction, OptimizeOptions, Table as LanceDbTable, WriteOptions, }; use napi::bindgen_prelude::*; use napi_derive::napi; @@ -68,7 +68,12 @@ impl Table { } #[napi(catch_unwind)] - pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result { + pub async fn add( + &self, + buf: Buffer, + mode: String, + skip_auto_cleanup: Option, + ) -> napi::Result { let batches = ipc_file_to_batches(buf.to_vec()) .map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?; let batches = batches @@ -92,6 +97,13 @@ impl Table { return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode))); }; + if skip_auto_cleanup.unwrap_or(false) { + op = op.write_options(WriteOptions { + skip_auto_cleanup: true, + ..Default::default() + }); + } + let res = op.execute().await.default_error()?; Ok(res.into()) } diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index 7b7657bf3..c0eb48b2a 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -849,6 +849,10 @@ impl ListingDatabase { write_params.mode = WriteMode::Overwrite; } + if request.write_options.skip_auto_cleanup { + write_params.skip_auto_cleanup = true; + } + write_params.session = Some(self.session.clone()); write_params @@ -2034,6 +2038,7 @@ mod tests { }), ..Default::default() }), + ..Default::default() }; let table = db @@ -2107,6 +2112,7 @@ mod tests { }), ..Default::default() }), + ..Default::default() }; let table = db diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index de18f8db8..e6f0ad2ce 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -414,6 +414,10 @@ impl Database for LanceNamespaceDatabase { params.mode = WriteMode::Overwrite; } + if request.write_options.skip_auto_cleanup { + params.skip_auto_cleanup = true; + } + // Set up storage options if provided if let Some(storage_opts) = initial_storage_options { let store_params = params diff --git a/rust/lancedb/src/io/object_store.rs b/rust/lancedb/src/io/object_store.rs index d27357b82..2e9a00490 100644 --- a/rust/lancedb/src/io/object_store.rs +++ b/rust/lancedb/src/io/object_store.rs @@ -234,6 +234,7 @@ mod test { .create_table("test", data) .write_options(WriteOptions { lance_write_params: Some(param), + ..Default::default() }) .execute() .await; diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 29bcaea26..5da48f6d4 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -189,6 +189,18 @@ pub struct WriteOptions { // Coming soon: https://github.com/lancedb/lancedb/issues/992 // /// What behavior to take if the data contains invalid vectors // pub on_bad_vectors: BadVectorHandling, + /// If true, skip the automatic cleanup of old dataset versions that would + /// otherwise run during the commit. This forwards to + /// [`WriteParams::skip_auto_cleanup`] in lance-core. + /// + /// Useful for high-frequency writers that want to manage version cleanup + /// themselves (e.g. via a periodic optimize job), or for writers that + /// lack delete permissions on the underlying storage. + /// + /// If `lance_write_params` is also set with `skip_auto_cleanup = true`, + /// the cleanup is skipped. Setting this field to `true` forces the flag + /// on regardless of `lance_write_params`. + pub skip_auto_cleanup: bool, /// Advanced parameters that can be used to customize table creation /// /// Overlapping `OpenTableBuilder` options (e.g. [AddDataBuilder::mode]) will take @@ -2283,7 +2295,8 @@ impl BaseTable for NativeTable { let output = add.into_plan(&table_schema, &table_def)?; - let lance_params = output + let skip_auto_cleanup = output.write_options.skip_auto_cleanup; + let mut lance_params = output .write_options .lance_write_params .unwrap_or(WriteParams { @@ -2293,6 +2306,9 @@ impl BaseTable for NativeTable { }, ..Default::default() }); + if skip_auto_cleanup { + lance_params.skip_auto_cleanup = true; + } // Repartition for write parallelism if beneficial. let plan = if num_partitions > 1 { diff --git a/rust/lancedb/src/table/add_data.rs b/rust/lancedb/src/table/add_data.rs index 1c4b4bdf3..d6058e8bd 100644 --- a/rust/lancedb/src/table/add_data.rs +++ b/rust/lancedb/src/table/add_data.rs @@ -441,6 +441,7 @@ mod tests { .add(new_batch.clone()) .write_options(WriteOptions { lance_write_params: Some(param), + ..Default::default() }) .mode(AddDataMode::Append) .execute() @@ -761,4 +762,56 @@ mod tests { table2.add(struct_batch).execute().await.unwrap(); assert_eq!(table2.count_rows(None).await.unwrap(), 2); } + + #[tokio::test] + async fn test_add_skip_auto_cleanup() { + // Verifies WriteOptions::skip_auto_cleanup is forwarded to lance-core's + // WriteParams and actually suppresses the cleanup hook on commit. + let tmp_dir = tempfile::tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let conn = connect(uri).execute().await.unwrap(); + + let batch = record_batch!(("id", Int64, [1, 2, 3])).unwrap(); + let table = conn.create_table("t", batch).execute().await.unwrap(); + // Cleanup on every commit, with `older_than = 0s` so prior versions are + // immediately eligible. + table + .as_native() + .unwrap() + .update_config(vec![ + ("lance.auto_cleanup.interval".to_string(), "1".to_string()), + ( + "lance.auto_cleanup.older_than".to_string(), + "0s".to_string(), + ), + ]) + .await + .unwrap(); + + // Write several versions with skip_auto_cleanup; none should be removed. + for i in 0..3 { + let new_batch = record_batch!(("id", Int64, [10 + i])).unwrap(); + table + .add(new_batch) + .write_options(WriteOptions { + skip_auto_cleanup: true, + ..Default::default() + }) + .execute() + .await + .unwrap(); + } + let versions_before = table.list_versions().await.unwrap().len(); + + // Now write one more without the flag; cleanup should run and prune. + let new_batch = record_batch!(("id", Int64, [42])).unwrap(); + table.add(new_batch).execute().await.unwrap(); + let versions_after = table.list_versions().await.unwrap().len(); + + assert!( + versions_after < versions_before, + "auto-cleanup should have removed old versions once the skip flag was off \ + (before={versions_before}, after={versions_after})" + ); + } } diff --git a/rust/lancedb/src/table/datafusion/insert.rs b/rust/lancedb/src/table/datafusion/insert.rs index f2cf21f13..9203f7e2b 100644 --- a/rust/lancedb/src/table/datafusion/insert.rs +++ b/rust/lancedb/src/table/datafusion/insert.rs @@ -219,6 +219,7 @@ impl ExecutionPlan for InsertExec { && let Some(merged_txn) = merge_transactions(transactions) { let new_dataset = CommitBuilder::new(dataset.clone()) + .with_skip_auto_cleanup(write_params.skip_auto_cleanup) .execute(merged_txn) .await?; ds_wrapper.update(new_dataset); diff --git a/rust/lancedb/src/table/dataset.rs b/rust/lancedb/src/table/dataset.rs index 584d45a2f..7db998447 100644 --- a/rust/lancedb/src/table/dataset.rs +++ b/rust/lancedb/src/table/dataset.rs @@ -528,6 +528,7 @@ mod tests { }), ..Default::default() }), + ..Default::default() }) .execute() .await @@ -589,6 +590,7 @@ mod tests { }), ..Default::default() }), + ..Default::default() }) .execute() .await diff --git a/rust/lancedb/src/table/merge.rs b/rust/lancedb/src/table/merge.rs index d8805acb8..571da80a1 100644 --- a/rust/lancedb/src/table/merge.rs +++ b/rust/lancedb/src/table/merge.rs @@ -55,6 +55,7 @@ pub struct MergeInsertBuilder { pub(crate) when_not_matched_by_source_delete_filt: Option, pub(crate) timeout: Option, pub(crate) use_index: bool, + pub(crate) skip_auto_cleanup: bool, } impl MergeInsertBuilder { @@ -69,6 +70,7 @@ impl MergeInsertBuilder { when_not_matched_by_source_delete_filt: None, timeout: None, use_index: true, + skip_auto_cleanup: false, } } @@ -148,6 +150,17 @@ impl MergeInsertBuilder { self } + /// Skip the automatic cleanup of old dataset versions that would otherwise + /// run during the merge insert commit. + /// + /// This forwards to [`lance::dataset::MergeInsertBuilder::skip_auto_cleanup`] + /// in lance-core. Useful for high-frequency writers that want to manage + /// version cleanup themselves, or writers without delete permissions. + pub fn skip_auto_cleanup(&mut self, skip: bool) -> &mut Self { + self.skip_auto_cleanup = skip; + self + } + /// Executes the merge insert operation /// /// Returns version and statistics about the merge operation including the number of rows @@ -191,6 +204,9 @@ pub(crate) async fn execute_merge_insert( builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep); } builder.use_index(params.use_index); + if params.skip_auto_cleanup { + builder.skip_auto_cleanup(true); + } let future = if let Some(timeout) = params.timeout { let future = builder