mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-22 22:40:41 +00:00
feat(nodejs): surface skip_auto_cleanup on add and merge insert
This commit is contained in:
@@ -115,6 +115,12 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
|
||||
await expect(table.countRows()).resolves.toBe(1);
|
||||
});
|
||||
|
||||
it("should accept skipAutoCleanup on add()", async () => {
  // Two separate appends, each passing the option, must both be accepted
  // and both land in the table.
  for (const id of [1, 2]) {
    await table.add([{ id }], { skipAutoCleanup: true });
  }
  await expect(table.countRows()).resolves.toBe(2);
});
|
||||
|
||||
it("should let me close the table", async () => {
|
||||
expect(table.isOpen()).toBe(true);
|
||||
table.close();
|
||||
|
||||
@@ -87,6 +87,23 @@ export class MergeInsertBuilder {
|
||||
this.#schema,
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Control whether the automatic cleanup of old dataset versions runs as
 * part of this merge insert's commit. Forwards to
 * `MergeInsertBuilder::skip_auto_cleanup` in lance-core.
 *
 * Intended for high-frequency writers that manage version cleanup on their
 * own, and for writers that lack delete permissions on the underlying
 * storage.
 *
 * @param skip - If true, the auto-cleanup step is skipped at commit time.
 * @returns A new `MergeInsertBuilder` wrapping the native builder returned
 *   by the underlying call, with the same schema as this one.
 */
skipAutoCleanup(skip: boolean): MergeInsertBuilder {
  const native = this.#native.skipAutoCleanup(skip);
  return new MergeInsertBuilder(native, this.#schema);
}
|
||||
/**
|
||||
* Executes the merge insert operation
|
||||
*
|
||||
|
||||
@@ -56,6 +56,18 @@ export interface AddDataOptions {
|
||||
* If "overwrite" then the new data will replace the existing data in the table.
|
||||
*/
|
||||
mode: "append" | "overwrite";
|
||||
/**
|
||||
* If true, skip the automatic cleanup of old dataset versions that would
|
||||
* otherwise run as part of this write's commit. Forwards to
|
||||
* `WriteParams.skip_auto_cleanup` in lance-core.
|
||||
*
|
||||
* Useful for high-frequency writers that prefer to manage version cleanup
|
||||
* themselves (for example, via a separate periodic optimize job), or for
|
||||
* writers that don't have delete permissions on the underlying storage.
|
||||
*
|
||||
* Defaults to false.
|
||||
*/
|
||||
skipAutoCleanup?: boolean;
|
||||
}
|
||||
|
||||
export interface UpdateOptions {
|
||||
@@ -636,7 +648,7 @@ export class LocalTable extends Table {
|
||||
const schema = await this.schema();
|
||||
|
||||
const buffer = await fromDataToBuffer(data, undefined, schema);
|
||||
return await this.inner.add(buffer, mode);
|
||||
return await this.inner.add(buffer, mode, options?.skipAutoCleanup);
|
||||
}
|
||||
|
||||
async update(
|
||||
|
||||
@@ -50,6 +50,13 @@ impl NativeMergeInsertBuilder {
|
||||
this
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn skip_auto_cleanup(&self, skip: bool) -> Self {
|
||||
let mut this = self.clone();
|
||||
this.inner.skip_auto_cleanup(skip);
|
||||
this
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
|
||||
let data = ipc_file_to_batches(buf.to_vec())
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::collections::HashMap;
|
||||
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
|
||||
use lancedb::table::{
|
||||
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable, WriteOptions,
|
||||
};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi_derive::napi;
|
||||
@@ -68,7 +68,12 @@ impl Table {
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result<AddResult> {
|
||||
pub async fn add(
|
||||
&self,
|
||||
buf: Buffer,
|
||||
mode: String,
|
||||
skip_auto_cleanup: Option<bool>,
|
||||
) -> napi::Result<AddResult> {
|
||||
let batches = ipc_file_to_batches(buf.to_vec())
|
||||
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
|
||||
let batches = batches
|
||||
@@ -92,6 +97,13 @@ impl Table {
|
||||
return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode)));
|
||||
};
|
||||
|
||||
if skip_auto_cleanup.unwrap_or(false) {
|
||||
op = op.write_options(WriteOptions {
|
||||
skip_auto_cleanup: true,
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
|
||||
let res = op.execute().await.default_error()?;
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
@@ -849,6 +849,10 @@ impl ListingDatabase {
|
||||
write_params.mode = WriteMode::Overwrite;
|
||||
}
|
||||
|
||||
if request.write_options.skip_auto_cleanup {
|
||||
write_params.skip_auto_cleanup = true;
|
||||
}
|
||||
|
||||
write_params.session = Some(self.session.clone());
|
||||
|
||||
write_params
|
||||
@@ -2034,6 +2038,7 @@ mod tests {
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let table = db
|
||||
@@ -2107,6 +2112,7 @@ mod tests {
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let table = db
|
||||
|
||||
@@ -414,6 +414,10 @@ impl Database for LanceNamespaceDatabase {
|
||||
params.mode = WriteMode::Overwrite;
|
||||
}
|
||||
|
||||
if request.write_options.skip_auto_cleanup {
|
||||
params.skip_auto_cleanup = true;
|
||||
}
|
||||
|
||||
// Set up storage options if provided
|
||||
if let Some(storage_opts) = initial_storage_options {
|
||||
let store_params = params
|
||||
|
||||
@@ -234,6 +234,7 @@ mod test {
|
||||
.create_table("test", data)
|
||||
.write_options(WriteOptions {
|
||||
lance_write_params: Some(param),
|
||||
..Default::default()
|
||||
})
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
@@ -189,6 +189,18 @@ pub struct WriteOptions {
|
||||
// Coming soon: https://github.com/lancedb/lancedb/issues/992
|
||||
// /// What behavior to take if the data contains invalid vectors
|
||||
// pub on_bad_vectors: BadVectorHandling,
|
||||
/// If true, skip the automatic cleanup of old dataset versions that would
|
||||
/// otherwise run during the commit. This forwards to
|
||||
/// [`WriteParams::skip_auto_cleanup`] in lance-core.
|
||||
///
|
||||
/// Useful for high-frequency writers that want to manage version cleanup
|
||||
/// themselves (e.g. via a periodic optimize job), or for writers that
|
||||
/// lack delete permissions on the underlying storage.
|
||||
///
|
||||
/// If `lance_write_params` is also set with `skip_auto_cleanup = true`,
|
||||
/// the cleanup is skipped. Setting this field to `true` forces the flag
|
||||
/// on regardless of `lance_write_params`.
|
||||
pub skip_auto_cleanup: bool,
|
||||
/// Advanced parameters that can be used to customize table creation
|
||||
///
|
||||
/// Overlapping `OpenTableBuilder` options (e.g. [AddDataBuilder::mode]) will take
|
||||
@@ -2283,7 +2295,8 @@ impl BaseTable for NativeTable {
|
||||
|
||||
let output = add.into_plan(&table_schema, &table_def)?;
|
||||
|
||||
let lance_params = output
|
||||
let skip_auto_cleanup = output.write_options.skip_auto_cleanup;
|
||||
let mut lance_params = output
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.unwrap_or(WriteParams {
|
||||
@@ -2293,6 +2306,9 @@ impl BaseTable for NativeTable {
|
||||
},
|
||||
..Default::default()
|
||||
});
|
||||
if skip_auto_cleanup {
|
||||
lance_params.skip_auto_cleanup = true;
|
||||
}
|
||||
|
||||
// Repartition for write parallelism if beneficial.
|
||||
let plan = if num_partitions > 1 {
|
||||
|
||||
@@ -441,6 +441,7 @@ mod tests {
|
||||
.add(new_batch.clone())
|
||||
.write_options(WriteOptions {
|
||||
lance_write_params: Some(param),
|
||||
..Default::default()
|
||||
})
|
||||
.mode(AddDataMode::Append)
|
||||
.execute()
|
||||
@@ -761,4 +762,56 @@ mod tests {
|
||||
table2.add(struct_batch).execute().await.unwrap();
|
||||
assert_eq!(table2.count_rows(None).await.unwrap(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_skip_auto_cleanup() {
|
||||
// Verifies WriteOptions::skip_auto_cleanup is forwarded to lance-core's
|
||||
// WriteParams and actually suppresses the cleanup hook on commit.
|
||||
let tmp_dir = tempfile::tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let conn = connect(uri).execute().await.unwrap();
|
||||
|
||||
let batch = record_batch!(("id", Int64, [1, 2, 3])).unwrap();
|
||||
let table = conn.create_table("t", batch).execute().await.unwrap();
|
||||
// Cleanup on every commit, with `older_than = 0s` so prior versions are
|
||||
// immediately eligible.
|
||||
table
|
||||
.as_native()
|
||||
.unwrap()
|
||||
.update_config(vec![
|
||||
("lance.auto_cleanup.interval".to_string(), "1".to_string()),
|
||||
(
|
||||
"lance.auto_cleanup.older_than".to_string(),
|
||||
"0s".to_string(),
|
||||
),
|
||||
])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Write several versions with skip_auto_cleanup; none should be removed.
|
||||
for i in 0..3 {
|
||||
let new_batch = record_batch!(("id", Int64, [10 + i])).unwrap();
|
||||
table
|
||||
.add(new_batch)
|
||||
.write_options(WriteOptions {
|
||||
skip_auto_cleanup: true,
|
||||
..Default::default()
|
||||
})
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let versions_before = table.list_versions().await.unwrap().len();
|
||||
|
||||
// Now write one more without the flag; cleanup should run and prune.
|
||||
let new_batch = record_batch!(("id", Int64, [42])).unwrap();
|
||||
table.add(new_batch).execute().await.unwrap();
|
||||
let versions_after = table.list_versions().await.unwrap().len();
|
||||
|
||||
assert!(
|
||||
versions_after < versions_before,
|
||||
"auto-cleanup should have removed old versions once the skip flag was off \
|
||||
(before={versions_before}, after={versions_after})"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,6 +219,7 @@ impl ExecutionPlan for InsertExec {
|
||||
&& let Some(merged_txn) = merge_transactions(transactions)
|
||||
{
|
||||
let new_dataset = CommitBuilder::new(dataset.clone())
|
||||
.with_skip_auto_cleanup(write_params.skip_auto_cleanup)
|
||||
.execute(merged_txn)
|
||||
.await?;
|
||||
ds_wrapper.update(new_dataset);
|
||||
|
||||
@@ -528,6 +528,7 @@ mod tests {
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
})
|
||||
.execute()
|
||||
.await
|
||||
@@ -589,6 +590,7 @@ mod tests {
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
})
|
||||
.execute()
|
||||
.await
|
||||
|
||||
@@ -55,6 +55,7 @@ pub struct MergeInsertBuilder {
|
||||
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
|
||||
pub(crate) timeout: Option<Duration>,
|
||||
pub(crate) use_index: bool,
|
||||
pub(crate) skip_auto_cleanup: bool,
|
||||
}
|
||||
|
||||
impl MergeInsertBuilder {
|
||||
@@ -69,6 +70,7 @@ impl MergeInsertBuilder {
|
||||
when_not_matched_by_source_delete_filt: None,
|
||||
timeout: None,
|
||||
use_index: true,
|
||||
skip_auto_cleanup: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,6 +150,17 @@ impl MergeInsertBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Skip the automatic cleanup of old dataset versions that would otherwise
|
||||
/// run during the merge insert commit.
|
||||
///
|
||||
/// This forwards to [`lance::dataset::MergeInsertBuilder::skip_auto_cleanup`]
|
||||
/// in lance-core. Useful for high-frequency writers that want to manage
|
||||
/// version cleanup themselves, or writers without delete permissions.
|
||||
pub fn skip_auto_cleanup(&mut self, skip: bool) -> &mut Self {
|
||||
self.skip_auto_cleanup = skip;
|
||||
self
|
||||
}
|
||||
|
||||
/// Executes the merge insert operation
|
||||
///
|
||||
/// Returns version and statistics about the merge operation including the number of rows
|
||||
@@ -191,6 +204,9 @@ pub(crate) async fn execute_merge_insert(
|
||||
builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
|
||||
}
|
||||
builder.use_index(params.use_index);
|
||||
if params.skip_auto_cleanup {
|
||||
builder.skip_auto_cleanup(true);
|
||||
}
|
||||
|
||||
let future = if let Some(timeout) = params.timeout {
|
||||
let future = builder
|
||||
|
||||
Reference in New Issue
Block a user