feat(table): route merge_insert through the MemWAL LSM write path (#3354)

## Summary

When an `LsmWriteSpec` is installed on a table (#3396), `merge_insert`
upsert
calls are dispatched through Lance's MemWAL `ShardWriter` (LSM-style
append)
instead of the standard merge path.

- **`use_lsm_write`** — a `merge_insert` builder option, default `true`;
set it
  `false` to use the standard path for a call even when a spec is set.
- **`assume_pre_sharded`** — a `merge_insert` builder option, default
`false`;
  skips the per-row shard check and routes by the first row only.
- **`close_lsm_writers`** — drains and closes the table's cached MemWAL
shard
  writers.
- The `merge_insert` **`on`** columns default to, and are validated
against,
  the table's unenforced primary key.
- Shard writers are cached alongside the dataset (in
  `DatasetConsistencyWrapper`) and reused for the session.
- `MergeResult` gains **`num_rows`** — on the LSM path the insert/update
  breakdown is unknown until compaction, so only the total is reported.

Routing covers all three sharding strategies — bucket (murmur3,
Iceberg-compatible), identity, and unsharded. Each `merge_insert` call
targets
a single shard; the whole input is collected and validated before a
single
atomic `ShardWriter::put`, so a validation failure leaves the MemWAL
untouched.

Bindings: Python (`merge_insert(...).use_lsm_write(...)` /
`.assume_pre_sharded(...)`, `Table.close_lsm_writers`) and TypeScript
(`mergeInsert(...).useLsmWrite(...)` / `.assumePreSharded(...)`,
`Table.closeLsmWriters`).

## Context

Reconstructed from the original #3354 branch onto current `main`: the
branch
predated the #3394 (unenforced primary key) / #3396 (`LsmWriteSpec`)
split and
has been rebuilt on that merged foundation. Depends on Lance
`v7.0.0-beta.13`.

The MemWAL read path (reading un-flushed shard data back into queries)
and
remote (LanceDB Cloud) LSM support are follow-ups.

---------

Co-authored-by: Jack Ye <yezhaoqin@gmail.com>
This commit is contained in:
Heng Ge
2026-05-29 08:48:11 -07:00
committed by GitHub
parent 458dcabbd2
commit 048f52c2aa
24 changed files with 2020 additions and 20 deletions

View File

@@ -75,7 +75,7 @@ reqwest = { version = "0.12.0", default-features = false, features = [
"stream",
], optional = true }
http = { version = "1", optional = true } # Matching what is in reqwest
uuid = { version = "1.7.0", features = ["v4"] }
uuid = { version = "1.7.0", features = ["v4", "v5"] }
polars-arrow = { version = ">=0.37,<0.40.0", optional = true }
polars = { version = ">=0.37,<0.40.0", optional = true }
hf-hub = { version = "0.4.1", optional = true, default-features = false, features = [

View File

@@ -908,6 +908,15 @@ mod tests {
use serial_test::serial;
use std::time::Duration;
// Serializes the env-var-mutating tests below: cargo test runs tests in
// parallel, but several of these tests read and write the same process-
// global env vars (`LANCEDB_USER_ID*`), so they would race without this.
static ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
fn lock_env() -> std::sync::MutexGuard<'static, ()> {
ENV_MUTEX.lock().unwrap_or_else(|e| e.into_inner())
}
#[test]
fn test_timeout_config_default() {
let config = TimeoutConfig::default();
@@ -1166,6 +1175,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_none() {
let _guard = lock_env();
let config = ClientConfig::default();
// Clear env vars that might be set from other tests
// SAFETY: This is only called in tests
@@ -1179,6 +1189,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_from_env() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::set_var("LANCEDB_USER_ID", "env-user-id");
@@ -1194,6 +1205,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_from_env_key() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::remove_var("LANCEDB_USER_ID");
@@ -1215,6 +1227,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_direct_takes_precedence() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::set_var("LANCEDB_USER_ID", "env-user-id");
@@ -1233,6 +1246,7 @@ mod tests {
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_empty_env_ignored() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::set_var("LANCEDB_USER_ID", "");

View File

@@ -1805,6 +1805,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
num_inserted_rows: 0,
num_updated_rows: 0,
num_attempts: 0,
num_rows: 0,
});
}

View File

@@ -366,6 +366,14 @@ impl LsmWriteSpec {
/// Construct an identity-sharding spec (shard by the raw value of
/// `column`) with no maintained indexes.
///
/// `column` must be a deterministic function of the unenforced primary
/// key: every row with a given primary key must always produce the same
/// `column` value. MemWAL dedups upserts by primary key but tracks
/// generations per shard, so if the same key is written with two
/// different `column` values its versions land in different shards and a
/// stale value can win. Typically `column` is the primary key itself, or
/// a stable attribute of it (e.g. a tenant id).
pub fn identity(column: impl Into<String>) -> Self {
Self::Identity {
column: column.into(),
@@ -580,6 +588,13 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
message: "unset_lsm_write_spec is not supported on this table type".into(),
})
}
/// Drain and close any cached MemWAL shard writers for this table.
///
/// The default implementation is a no-op; table types that maintain
/// MemWAL shard writers override it.
async fn close_lsm_writers(&self) -> Result<()> {
Ok(())
}
/// Gets the table tag manager.
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
/// Optimize the dataset.
@@ -1386,6 +1401,16 @@ impl Table {
self.inner.unset_lsm_write_spec().await
}
/// Drain and close any cached MemWAL shard writers held for this table.
///
/// When an [`LsmWriteSpec`] is installed, `merge_insert` opens MemWAL shard
/// writers and caches them for reuse across calls. This closes them,
/// flushing pending data; writers reopen lazily on the next `merge_insert`.
/// It is a no-op when no writers are cached.
pub async fn close_lsm_writers(&self) -> Result<()> {
self.inner.close_lsm_writers().await
}
/// Retrieve the version of the table
///
/// LanceDb supports versioning. Every operation that modifies the table increases
@@ -2829,6 +2854,10 @@ impl BaseTable for NativeTable {
merge::lsm::unset_lsm_write_spec(self).await
}
async fn close_lsm_writers(&self) -> Result<()> {
merge::lsm::close_lsm_writers(self).await
}
/// Delete rows from the table
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
delete::execute_delete(self, predicate).await

View File

@@ -8,6 +8,7 @@ use std::{
use lance::{Dataset, dataset::refs};
use crate::table::merge::lsm::ShardWriterCache;
use crate::{Error, error::Result, utils::background_cache::BackgroundCache};
/// A wrapper around a [Dataset] that provides consistency checks.
@@ -18,6 +19,10 @@ use crate::{Error, error::Result, utils::background_cache::BackgroundCache};
pub struct DatasetConsistencyWrapper {
state: Arc<Mutex<DatasetState>>,
consistency: ConsistencyMode,
/// The single MemWAL `ShardWriter` for this dataset, co-located so it is
/// cached for the session and shares the dataset's lifecycle. A dataset
/// writes to one shard at a time. Shared by `Arc` across clones.
shard_writer: Arc<ShardWriterCache>,
}
/// The current dataset and whether it is pinned to a specific version.
@@ -67,9 +72,15 @@ impl DatasetConsistencyWrapper {
pinned_version: None,
})),
consistency,
shard_writer: Arc::new(ShardWriterCache::default()),
}
}
/// The MemWAL `ShardWriter` cache co-located with this dataset.
pub(crate) fn shard_writer(&self) -> &Arc<ShardWriterCache> {
&self.shard_writer
}
/// Get the current dataset.
///
/// Behavior depends on the consistency mode:

View File

@@ -41,6 +41,16 @@ pub struct MergeResult {
/// A value of 1 means the operation succeeded on the first try.
#[serde(default)]
pub num_attempts: u32,
/// Total number of rows written.
///
/// On the standard `merge_insert` path this equals
/// `num_inserted_rows + num_updated_rows`. On the MemWAL LSM write path the
/// insert/update breakdown is not known until compaction; in that mode
/// `num_inserted_rows`, `num_updated_rows`, `num_deleted_rows`, `version`
/// and `num_attempts` are all `0` and this field holds the total number of
/// rows written through the shard writer.
#[serde(default)]
pub num_rows: u64,
}
/// A builder used to create and run a merge insert operation
@@ -57,6 +67,8 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) use_lsm_write: Option<bool>,
pub(crate) validate_single_shard: bool,
}
impl MergeInsertBuilder {
@@ -71,6 +83,8 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
use_lsm_write: None,
validate_single_shard: true,
}
}
@@ -150,6 +164,34 @@ impl MergeInsertBuilder {
self
}
/// Controls whether `merge_insert` uses the MemWAL LSM write path.
///
/// By default (unset), a `merge_insert` on a table with an
/// [`LsmWriteSpec`](super::LsmWriteSpec) installed is routed through
/// Lance's MemWAL shard writer, and a table without one uses the standard
/// path. Calling this with `false` forces the standard path even when a
/// spec is set. Calling it with `true` requires a spec — `merge_insert`
/// errors if none is installed.
pub fn use_lsm_write(&mut self, use_lsm_write: bool) -> &mut Self {
self.use_lsm_write = Some(use_lsm_write);
self
}
/// Controls how an LSM `merge_insert` checks that its input targets a
/// single shard.
///
/// When a table has an LSM write spec, every row in a `merge_insert` call
/// must route to the same shard. When `true` (the default), every row is
/// inspected to verify this. When `false`, only the first row is inspected
/// and the shard it routes to is used for the whole input — a faster path
/// for callers that have already pre-sharded their input.
///
/// Has no effect on tables without an LSM write spec.
pub fn validate_single_shard(&mut self, validate_single_shard: bool) -> &mut Self {
self.validate_single_shard = validate_single_shard;
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows
@@ -167,6 +209,23 @@ pub(crate) async fn execute_merge_insert(
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
match lsm::lsm_dispatch_decision(table, &params).await? {
lsm::LsmDispatch::Lsm(plan) => {
let future =
lsm::execute_lsm_merge_insert(table, plan, params.validate_single_shard, new_data);
return match params.timeout {
Some(timeout) => match tokio::time::timeout(timeout, future).await {
Ok(result) => result,
Err(_) => Err(Error::Runtime {
message: "merge insert timed out".to_string(),
}),
},
None => future.await,
};
}
lsm::LsmDispatch::Standard => {}
}
let dataset = table.dataset.get().await?;
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
@@ -219,6 +278,7 @@ pub(crate) async fn execute_merge_insert(
num_inserted_rows: stats.num_inserted_rows,
num_deleted_rows: stats.num_deleted_rows,
num_attempts: stats.num_attempts,
num_rows: stats.num_inserted_rows + stats.num_updated_rows,
})
}
@@ -327,3 +387,366 @@ mod tests {
assert_eq!(table.count_rows(None).await.unwrap(), 25);
}
}
#[cfg(test)]
mod lsm_tests {
use std::sync::Arc;
use arrow_array::{
Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
};
use arrow_schema::{DataType, Field, Schema};
use tempfile::{TempDir, tempdir};
use crate::connect;
use crate::error::Error;
use crate::table::{LsmWriteSpec, Table};
/// A reader of `[id: Int64, value: Int64]` rows; `value` is `0..n`.
fn id_value_reader(ids: Vec<i64>) -> Box<dyn RecordBatchReader + Send> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("value", DataType::Int64, false),
]));
let n = ids.len() as i64;
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(ids)),
Arc::new(Int64Array::from_iter_values(0..n)),
],
)
.unwrap();
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))
}
/// A reader of `[id: Int64, region: Utf8]` rows.
fn id_region_reader(rows: Vec<(i64, &str)>) -> Box<dyn RecordBatchReader + Send> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("region", DataType::Utf8, false),
]));
let ids: Vec<i64> = rows.iter().map(|(id, _)| *id).collect();
let regions: Vec<&str> = rows.iter().map(|(_, region)| *region).collect();
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(ids)),
Arc::new(StringArray::from(regions)),
],
)
.unwrap();
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))
}
/// A multi-batch reader of `[id: Int64, region: Utf8]` rows.
fn id_region_multi_reader(batches: Vec<Vec<(i64, &str)>>) -> Box<dyn RecordBatchReader + Send> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("region", DataType::Utf8, false),
]));
let records: Vec<_> = batches
.into_iter()
.map(|rows| {
let ids: Vec<i64> = rows.iter().map(|(id, _)| *id).collect();
let regions: Vec<&str> = rows.iter().map(|(_, region)| *region).collect();
Ok(RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(ids)),
Arc::new(StringArray::from(regions)),
],
)
.unwrap())
})
.collect();
Box::new(RecordBatchIterator::new(records, schema))
}
/// Create an `[id, value]` table with `id` as the unenforced primary key.
async fn id_value_table(dir: &TempDir) -> Table {
let conn = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let table = conn
.create_table("t", id_value_reader(vec![1, 2, 3]))
.execute()
.await
.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
}
#[tokio::test]
async fn lsm_merge_insert_bucket() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
// num_buckets = 1: every row routes to the single bucket.
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
// Empty `on` defaults to the primary key.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder
.execute(id_value_reader(vec![3, 4, 5]))
.await
.unwrap();
// LSM path: rows go to the MemWAL, the breakdown is unknown until
// compaction, so only `num_rows` is populated.
assert_eq!(result.num_rows, 3);
assert_eq!(result.version, 0);
assert_eq!(result.num_inserted_rows, 0);
assert_eq!(result.num_updated_rows, 0);
}
#[tokio::test]
async fn lsm_merge_insert_unsharded() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::unsharded())
.await
.unwrap();
let mut builder = table.merge_insert(&["id"]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder
.execute(id_value_reader(vec![10, 11, 12, 13]))
.await
.unwrap();
assert_eq!(result.num_rows, 4);
}
#[tokio::test]
async fn lsm_merge_insert_identity() {
let dir = tempdir().unwrap();
let conn = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let table = conn
.create_table("t", id_region_reader(vec![(1, "us"), (2, "us")]))
.execute()
.await
.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::identity("region"))
.await
.unwrap();
// All rows share one identity value, so they route to one shard.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder
.execute(id_region_reader(vec![(3, "us"), (4, "us")]))
.await
.unwrap();
assert_eq!(result.num_rows, 2);
}
#[tokio::test]
async fn lsm_merge_insert_use_lsm_write_false_falls_back() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
// use_lsm_write(false) opts out: the standard path runs and commits.
let mut builder = table.merge_insert(&["id"]);
builder.when_not_matched_insert_all().use_lsm_write(false);
let result = builder
.execute(id_value_reader(vec![3, 4, 5]))
.await
.unwrap();
assert_eq!(result.num_inserted_rows, 2);
assert_eq!(table.count_rows(None).await.unwrap(), 5);
}
#[tokio::test]
async fn lsm_merge_insert_rejects_on_not_primary_key() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
let mut builder = table.merge_insert(&["value"]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let err = builder.execute(id_value_reader(vec![1])).await.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_merge_insert_rejects_non_upsert() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
// Insert-only (no when_matched_update_all) is not the upsert shape.
let mut builder = table.merge_insert(&[]);
builder.when_not_matched_insert_all();
let err = builder.execute(id_value_reader(vec![4])).await.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_close_writers_then_reopen() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
builder.execute(id_value_reader(vec![7, 8])).await.unwrap();
table.close_lsm_writers().await.unwrap();
// The writer reopens lazily on the next merge_insert.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder.execute(id_value_reader(vec![9])).await.unwrap();
assert_eq!(result.num_rows, 1);
}
#[tokio::test]
async fn lsm_merge_insert_multi_batch() {
let dir = tempdir().unwrap();
let conn = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let table = conn
.create_table("t", id_region_reader(vec![(1, "us")]))
.execute()
.await
.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::identity("region"))
.await
.unwrap();
// Multiple batches that all route to one shard are written together.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder
.execute(id_region_multi_reader(vec![
vec![(2, "us"), (3, "us")],
vec![(4, "us")],
]))
.await
.unwrap();
assert_eq!(result.num_rows, 3);
// Batches that route to different shards are rejected; the validation
// runs before any write, so no partial write is left behind.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let err = builder
.execute(id_region_multi_reader(vec![
vec![(5, "us")],
vec![(6, "eu")],
]))
.await
.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_merge_insert_use_lsm_write_true_requires_spec() {
let dir = tempdir().unwrap();
// id_value_table sets a primary key but no LSM write spec.
let table = id_value_table(&dir).await;
let mut builder = table.merge_insert(&["id"]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all()
.use_lsm_write(true);
let err = builder.execute(id_value_reader(vec![4])).await.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_merge_insert_rejects_second_shard() {
let dir = tempdir().unwrap();
let conn = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let table = conn
.create_table("t", id_region_reader(vec![(1, "us")]))
.execute()
.await
.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::identity("region"))
.await
.unwrap();
// The first merge_insert opens the single writer for shard "us".
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
builder
.execute(id_region_reader(vec![(2, "us")]))
.await
.unwrap();
// A merge_insert routing to a different shard is rejected.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let err = builder
.execute(id_region_reader(vec![(3, "eu")]))
.await
.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
// After closing the writer, a different shard can be written.
table.close_lsm_writers().await.unwrap();
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
builder
.execute(id_region_reader(vec![(4, "eu")]))
.await
.unwrap();
}
}

File diff suppressed because it is too large Load Diff