Mirror of https://github.com/lancedb/lancedb.git, synced 2026-06-02 20:00:46 +00:00
## Summary

When an `LsmWriteSpec` is installed on a table (#3396), `merge_insert` upsert calls are dispatched through Lance's MemWAL `ShardWriter` (LSM-style append) instead of the standard merge path.

- **`use_lsm_write`**: a `merge_insert` builder option, default `true`; set it to `false` to use the standard path for a call even when a spec is set.
- **`assume_pre_sharded`**: a `merge_insert` builder option, default `false`; skips the per-row shard check and routes by the first row only.
- **`close_lsm_writers`**: drains and closes the table's cached MemWAL shard writers.
- The `merge_insert` **`on`** columns default to, and are validated against, the table's unenforced primary key.
- Shard writers are cached alongside the dataset (in `DatasetConsistencyWrapper`) and reused for the session.
- `MergeResult` gains **`num_rows`**: on the LSM path the insert/update breakdown is unknown until compaction, so only the total is reported.

Routing covers all three sharding strategies: bucket (murmur3, Iceberg-compatible), identity, and unsharded. Each `merge_insert` call targets a single shard; the whole input is collected and validated before a single atomic `ShardWriter::put`, so a validation failure leaves the MemWAL untouched.

Bindings: Python (`merge_insert(...).use_lsm_write(...)` / `.assume_pre_sharded(...)`, `Table.close_lsm_writers`) and TypeScript (`mergeInsert(...).useLsmWrite(...)` / `.assumePreSharded(...)`, `Table.closeLsmWriters`).

## Context

Reconstructed from the original #3354 branch onto current `main`: the branch predated the #3394 (unenforced primary key) / #3396 (`LsmWriteSpec`) split and has been rebuilt on that merged foundation. Depends on Lance `v7.0.0-beta.13`. The MemWAL read path (reading un-flushed shard data back into queries) and remote (LanceDB Cloud) LSM support are left as follow-ups.

---------

Co-authored-by: Jack Ye <yezhaoqin@gmail.com>
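The summary says the `merge_insert` `on` columns default to, and are validated against, the table's unenforced primary key. One way such a check could look is sketched below. `resolve_on_columns` is a hypothetical helper, not LanceDB's implementation; treating the key as an unordered set of column names, and requiring a primary key to be present, are assumptions of this sketch.

```rust
/// Hypothetical helper (not the LanceDB API): choose the `on` columns for an LSM merge_insert.
/// Empty `on` falls back to the primary key; otherwise `on` must name exactly the
/// primary-key columns. Comparing them as unordered sets is an assumption.
fn resolve_on_columns(on: &[&str], primary_key: &[&str]) -> Result<Vec<String>, String> {
    // This sketch assumes the table has a primary key to default to and validate against.
    if primary_key.is_empty() {
        return Err("table has no unenforced primary key".to_string());
    }
    if on.is_empty() {
        // Default: join on the primary-key columns.
        return Ok(primary_key.iter().map(|c| c.to_string()).collect());
    }
    // Validate: same column names, ignoring order.
    let mut requested: Vec<&str> = on.to_vec();
    let mut expected: Vec<&str> = primary_key.to_vec();
    requested.sort_unstable();
    expected.sort_unstable();
    if requested == expected {
        Ok(on.iter().map(|c| c.to_string()).collect())
    } else {
        Err(format!("on columns {on:?} do not match primary key {primary_key:?}"))
    }
}
```

Rejecting a mismatched `on` list, rather than silently replacing it with the key, keeps a caller's typo from turning into an upsert on the wrong columns.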
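Shard writers are cached alongside the dataset and reused for the session, and `close_lsm_writers` drains and closes them. The std-only sketch below illustrates that lifecycle. `WriterCache` and `BufferedWriter` are hypothetical stand-ins (not Lance's `ShardWriter` or LanceDB's `DatasetConsistencyWrapper`), and moving rows into a `durable` vector only stands in for flushing to storage.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a shard writer: it only buffers rows in memory.
struct BufferedWriter {
    pending: Vec<String>,
}

impl BufferedWriter {
    fn put(&mut self, row: &str) {
        self.pending.push(row.to_string());
    }
}

/// Hypothetical per-session cache holding at most one writer per shard id.
#[derive(Default)]
struct WriterCache {
    writers: HashMap<u32, BufferedWriter>,
    /// Rows drained out of closed writers (stands in for data flushed to storage).
    durable: Vec<String>,
    /// Number of writers created so far, to show that writers are reused.
    created: usize,
}

impl WriterCache {
    /// Return the cached writer for `shard`, creating it on first use.
    fn writer(&mut self, shard: u32) -> &mut BufferedWriter {
        let created = &mut self.created;
        self.writers.entry(shard).or_insert_with(|| {
            *created += 1;
            BufferedWriter { pending: Vec::new() }
        })
    }

    /// Analogue of `close_lsm_writers`: drain every cached writer, drop it from the
    /// cache, and return how many buffered rows were drained.
    fn close_all(&mut self) -> usize {
        let mut drained = 0;
        for (_shard, mut writer) in self.writers.drain() {
            drained += writer.pending.len();
            self.durable.append(&mut writer.pending);
        }
        drained
    }
}
```

After `close_all`, the next write to a shard creates a fresh writer, which is why an explicit close is useful at the end of a session.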
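The bucket strategy is described as murmur3-based and Iceberg-compatible. Assuming that means Apache Iceberg's bucket transform, `(murmur3_x86_32(bytes) & Integer.MAX_VALUE) % N` with `int`/`long` values hashed as 8-byte little-endian longs and strings as UTF-8 bytes, a self-contained sketch follows. It is written from the Iceberg spec, not taken from Lance, and only covers integer and string keys.

```rust
/// MurmurHash3, x86 32-bit variant (the hash named by the Iceberg bucket transform).
fn murmur3_x86_32(data: &[u8], seed: u32) -> u32 {
    const C1: u32 = 0xcc9e_2d51;
    const C2: u32 = 0x1b87_3593;
    let mut h = seed;

    // Body: mix each 4-byte little-endian block into the state.
    let mut blocks = data.chunks_exact(4);
    for block in &mut blocks {
        let k = u32::from_le_bytes([block[0], block[1], block[2], block[3]]);
        h ^= k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
        h = h.rotate_left(13).wrapping_mul(5).wrapping_add(0xe654_6b64);
    }

    // Tail: up to 3 leftover bytes, little-endian, mixed without the rotate/add step.
    let tail = blocks.remainder();
    if !tail.is_empty() {
        let mut k = 0u32;
        for (i, &b) in tail.iter().enumerate() {
            k |= u32::from(b) << (8 * i);
        }
        h ^= k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
    }

    // Finalization: fold in the length, then apply the fmix32 avalanche.
    h ^= data.len() as u32;
    h ^= h >> 16;
    h = h.wrapping_mul(0x85eb_ca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2_ae35);
    h ^= h >> 16;
    h
}

/// Iceberg-style bucket: clear the sign bit of the 32-bit hash, then take it mod `num_buckets`.
fn bucket_from_hash(hash: u32, num_buckets: u32) -> u32 {
    (hash & 0x7fff_ffff) % num_buckets
}

/// `int` and `long` keys are hashed as 8-byte little-endian longs.
fn iceberg_bucket_long(value: i64, num_buckets: u32) -> u32 {
    bucket_from_hash(murmur3_x86_32(&value.to_le_bytes(), 0), num_buckets)
}

/// String keys are hashed over their UTF-8 bytes.
fn iceberg_bucket_str(value: &str, num_buckets: u32) -> u32 {
    bucket_from_hash(murmur3_x86_32(value.as_bytes(), 0), num_buckets)
}
```

The Iceberg spec publishes hash test values for each key type (for example, `34` hashes to `2017239379`), which are a convenient way to check that an implementation like this one agrees with other Iceberg-compatible writers.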
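The summary describes each LSM `merge_insert` call as targeting one shard: the whole input is validated before a single `ShardWriter::put`, and `assume_pre_sharded` skips the per-row check and routes by the first row. A std-only sketch of that validate-then-put flow follows; `lsm_upsert`, `FakeShardWriter`, and the `shard_of` closure are hypothetical stand-ins, and keys are simplified to `i64` values.

```rust
use std::collections::HashMap;

/// Hypothetical shard id type.
type ShardId = u32;

/// Hypothetical stand-in for a MemWAL shard writer; it only records each `put`.
#[derive(Default, Debug)]
struct FakeShardWriter {
    puts: Vec<Vec<i64>>,
}

impl FakeShardWriter {
    fn put(&mut self, rows: Vec<i64>) {
        self.puts.push(rows);
    }
}

/// Hypothetical LSM upsert: route the batch by its first key, optionally check that
/// every key maps to the same shard, then write the whole batch with one `put`.
fn lsm_upsert(
    keys: &[i64],
    assume_pre_sharded: bool,
    shard_of: impl Fn(i64) -> ShardId,
    writers: &mut HashMap<ShardId, FakeShardWriter>,
) -> Result<ShardId, String> {
    let first = *keys.first().ok_or("empty input")?;
    let target = shard_of(first);
    if !assume_pre_sharded {
        // Check every row before touching any writer, so a rejected batch writes nothing.
        if let Some(&bad) = keys.iter().find(|&&k| shard_of(k) != target) {
            return Err(format!("key {bad} maps to shard {}, expected shard {target}", shard_of(bad)));
        }
    }
    // One put for the whole input, on the (possibly newly created) cached writer.
    writers.entry(target).or_default().put(keys.to_vec());
    Ok(target)
}
```

Because every row is checked before a writer is even looked up, a rejected batch never reaches `put`, matching the "validation failure leaves the MemWAL untouched" property. With `assume_pre_sharded` set, a mixed batch is accepted and lands wholly on the first row's shard, so that flag shifts responsibility for correct sharding to the caller.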
92 lines
2.5 KiB
Rust
```rust
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors

use std::time::Duration;

use lancedb::{arrow::IntoArrow, ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
use napi::bindgen_prelude::*;
use napi_derive::napi;

use crate::{error::convert_error, table::MergeResult};

#[napi]
#[derive(Clone)]
/// A builder used to create and run a merge insert operation
pub struct NativeMergeInsertBuilder {
    pub(crate) inner: MergeInsertBuilder,
}

#[napi]
impl NativeMergeInsertBuilder {
    // Most builder methods clone `self`, configure the copy, and return it,
    // leaving the builder they were called on unchanged.

    /// Update target rows that match a source row, optionally only where `condition` holds.
    #[napi]
    pub fn when_matched_update_all(&self, condition: Option<String>) -> Self {
        let mut this = self.clone();
        this.inner.when_matched_update_all(condition);
        this
    }

    /// Insert source rows that have no match in the target table.
    #[napi]
    pub fn when_not_matched_insert_all(&self) -> Self {
        let mut this = self.clone();
        this.inner.when_not_matched_insert_all();
        this
    }

    /// Delete target rows that have no match in the source, optionally limited by `filter`.
    #[napi]
    pub fn when_not_matched_by_source_delete(&self, filter: Option<String>) -> Self {
        let mut this = self.clone();
        this.inner.when_not_matched_by_source_delete(filter);
        this
    }

    /// Set the operation timeout in milliseconds (mutates this builder in place).
    #[napi]
    pub fn set_timeout(&mut self, timeout: u32) {
        self.inner.timeout(Duration::from_millis(timeout as u64));
    }

    /// Whether the merge may use an existing index instead of a full table scan.
    #[napi]
    pub fn use_index(&self, use_index: bool) -> Self {
        let mut this = self.clone();
        this.inner.use_index(use_index);
        this
    }

    /// Route the upsert through the MemWAL LSM path when the table has an
    /// `LsmWriteSpec` (default `true`); `false` forces the standard merge path.
    #[napi]
    pub fn use_lsm_write(&self, use_lsm_write: bool) -> Self {
        let mut this = self.clone();
        this.inner.use_lsm_write(use_lsm_write);
        this
    }

    /// Enable or disable single-shard validation of the input on the LSM write path.
    #[napi]
    pub fn validate_single_shard(&self, validate_single_shard: bool) -> Self {
        let mut this = self.clone();
        this.inner.validate_single_shard(validate_single_shard);
        this
    }

    /// Decode the Arrow IPC file in `buf` and run the merge insert.
    #[napi(catch_unwind)]
    pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
        let data = ipc_file_to_batches(buf.to_vec())
            .and_then(IntoArrow::into_arrow)
            .map_err(|e| {
                napi::Error::from_reason(format!("Failed to read IPC file: {}", convert_error(&e)))
            })?;

        // `execute` consumes the builder, so run it on a copy.
        let this = self.clone();

        let res = this.inner.execute(data).await.map_err(|e| {
            napi::Error::from_reason(format!(
                "Failed to execute merge insert: {}",
                convert_error(&e)
            ))
        })?;
        Ok(res.into())
    }
}

impl From<MergeInsertBuilder> for NativeMergeInsertBuilder {
    fn from(inner: MergeInsertBuilder) -> Self {
        Self { inner }
    }
}
```
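Every configuring method on `NativeMergeInsertBuilder` except `set_timeout` takes `&self`, clones the builder, configures the copy, and returns it. The plain-Rust sketch below reproduces that shape with a hypothetical `Builder`/`MergeOptions` pair (not LanceDB types) to show that an earlier builder stays reusable after a later call branches from it. Only the `use_lsm_write` default of `true` comes from the commit summary; the other fields are illustrative.

```rust
/// Hypothetical option set; only `use_lsm_write`'s default comes from the commit summary.
#[derive(Clone, Debug, PartialEq)]
struct MergeOptions {
    update_condition: Option<String>,
    insert_unmatched: bool,
    use_lsm_write: bool,
}

/// Hypothetical builder using the same clone-configure-return shape as the napi methods.
#[derive(Clone, Debug)]
struct Builder {
    opts: MergeOptions,
}

impl Builder {
    fn new() -> Self {
        Self {
            opts: MergeOptions { update_condition: None, insert_unmatched: false, use_lsm_write: true },
        }
    }

    fn when_matched_update_all(&self, condition: Option<String>) -> Self {
        let mut this = self.clone(); // copy first; `self` is left untouched
        this.opts.update_condition = condition;
        this
    }

    fn when_not_matched_insert_all(&self) -> Self {
        let mut this = self.clone();
        this.opts.insert_unmatched = true;
        this
    }

    fn use_lsm_write(&self, use_lsm_write: bool) -> Self {
        let mut this = self.clone();
        this.opts.use_lsm_write = use_lsm_write;
        this
    }
}
```

A plausible reason for this shape, though the file does not state it, is that napi-rs class methods receive the native object by reference from JavaScript, so a builder method cannot consume `self` the way a pure-Rust builder might. `set_timeout` is the one exception: it takes `&mut self` and mutates in place.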