Compare commits

...

3 Commits

Author SHA1 Message Date
Jack Ye
01c6b9dcb8 feat: add mem_wal flag to merge insert for MemWAL write path
Add support for enabling MemWAL (Memory Write-Ahead Log) mode on merge insert
operations. This allows streaming writes to route through memory nodes for
high-performance buffered writes.

Changes:
- Add `mem_wal` field to MergeInsertBuilder with validation
- Add `x-lancedb-mem-wal-enabled` header for remote requests
- Add Python `mem_wal()` method to LanceMergeInsertBuilder
- Add validation to ensure only upsert pattern is supported:
  - when_matched_update_all() without filter
  - when_not_matched_insert_all()
- Throw NotSupported error for native tables
- Add mem_wal_enabled to ClientConfig for Python/Node bindings

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-03-16 01:04:04 -07:00
Will Jones
33a13f0738 fixes for breaking changes 2026-03-04 15:27:32 -08:00
Will Jones
cabc75f167 feat: upgrade Lance to 3.0.0-rc.3 2026-03-04 14:57:17 -08:00
14 changed files with 413 additions and 148 deletions

112
Cargo.lock generated
View File

@@ -3088,8 +3088,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"rand 0.9.2",
@@ -4260,8 +4260,8 @@ dependencies = [
[[package]]
name = "lance"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-arith",
@@ -4315,7 +4315,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"snafu",
"snafu 0.9.0",
"tantivy",
"tokio",
"tokio-stream",
@@ -4327,8 +4327,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4338,6 +4338,7 @@ dependencies = [
"arrow-schema",
"arrow-select",
"bytes",
"futures",
"getrandom 0.2.16",
"half",
"jsonb",
@@ -4347,8 +4348,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrayref",
"paste",
@@ -4357,8 +4358,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4384,7 +4385,7 @@ dependencies = [
"rand 0.9.2",
"roaring",
"serde_json",
"snafu",
"snafu 0.9.0",
"tempfile",
"tokio",
"tokio-stream",
@@ -4395,8 +4396,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-array",
@@ -4419,15 +4420,15 @@ dependencies = [
"pin-project",
"prost",
"prost-build",
"snafu",
"snafu 0.9.0",
"tokio",
"tracing",
]
[[package]]
name = "lance-datagen"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-array",
@@ -4445,8 +4446,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4473,7 +4474,7 @@ dependencies = [
"prost-build",
"prost-types",
"rand 0.9.2",
"snafu",
"snafu 0.9.0",
"strum",
"tokio",
"tracing",
@@ -4483,8 +4484,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4509,15 +4510,15 @@ dependencies = [
"prost",
"prost-build",
"prost-types",
"snafu",
"snafu 0.9.0",
"tokio",
"tracing",
]
[[package]]
name = "lance-index"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-arith",
@@ -4569,7 +4570,7 @@ dependencies = [
"serde",
"serde_json",
"smallvec",
"snafu",
"snafu 0.9.0",
"tantivy",
"tempfile",
"tokio",
@@ -4580,8 +4581,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-arith",
@@ -4613,7 +4614,7 @@ dependencies = [
"prost",
"rand 0.9.2",
"serde",
"snafu",
"snafu 0.9.0",
"tempfile",
"tokio",
"tracing",
@@ -4622,8 +4623,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4639,21 +4640,21 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"async-trait",
"bytes",
"lance-core",
"lance-namespace-reqwest-client",
"snafu",
"snafu 0.9.0",
]
[[package]]
name = "lance-namespace-impls"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-ipc",
@@ -4675,7 +4676,7 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"snafu",
"snafu 0.9.0",
"tokio",
"tower",
"tower-http 0.5.2",
@@ -4697,8 +4698,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-array",
@@ -4728,7 +4729,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"snafu",
"snafu 0.9.0",
"tokio",
"tracing",
"url",
@@ -4737,8 +4738,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4819,7 +4820,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"snafu",
"snafu 0.8.9",
"tempfile",
"test-log",
"tokenizers",
@@ -4865,7 +4866,7 @@ dependencies = [
"pyo3",
"pyo3-async-runtimes",
"pyo3-build-config",
"snafu",
"snafu 0.8.9",
"tokio",
]
@@ -7777,7 +7778,16 @@ version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2"
dependencies = [
"snafu-derive",
"snafu-derive 0.8.9",
]
[[package]]
name = "snafu"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1d4bced6a69f90b2056c03dcff2c4737f98d6fb9e0853493996e1d253ca29c6"
dependencies = [
"snafu-derive 0.9.0",
]
[[package]]
@@ -7792,6 +7802,18 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "snafu-derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54254b8531cafa275c5e096f62d48c81435d1015405a91198ddb11e967301d40"
dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "socket2"
version = "0.5.10"

View File

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }

View File

@@ -145,6 +145,7 @@ impl From<ClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: config.id_delimiter,
tls_config: config.tls_config.map(Into::into),
header_provider: None, // the header provider is set separately later
mem_wal_enabled: None, // mem_wal is set per-operation in merge_insert
}
}
}

View File

@@ -34,6 +34,7 @@ class LanceMergeInsertBuilder(object):
self._when_not_matched_by_source_condition = None
self._timeout = None
self._use_index = True
self._mem_wal = False
def when_matched_update_all(
self, *, where: Optional[str] = None
@@ -96,6 +97,47 @@ class LanceMergeInsertBuilder(object):
self._use_index = use_index
return self
def mem_wal(self, enabled: bool = True) -> LanceMergeInsertBuilder:
"""
Enable MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
When enabled, the merge insert will route data through a memory node service
that buffers writes before flushing to storage. This is only supported for
remote (LanceDB Cloud) tables.
**Important:** MemWAL only supports the upsert pattern. You must use:
- `when_matched_update_all()` (without a filter condition)
- `when_not_matched_insert_all()`
MemWAL does NOT support:
- `when_matched_update_all(where=...)` with a filter condition
- `when_not_matched_by_source_delete()`
Parameters
----------
enabled: bool
Whether to enable MemWAL mode. Defaults to `True`.
Raises
------
NotImplementedError
If used on a native (local) table, as MemWAL is only supported for
remote tables.
ValueError
If the merge insert pattern is not supported by MemWAL.
Examples
--------
>>> # Correct usage with MemWAL
>>> table.merge_insert(["id"]) \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .mem_wal() \\
... .execute(new_data)
"""
self._mem_wal = enabled
return self
def execute(
self,
new_data: DATA,

View File

@@ -4181,6 +4181,7 @@ class AsyncTable:
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
use_index=merge._use_index,
mem_wal=merge._mem_wal,
),
)

View File

@@ -506,6 +506,7 @@ pub struct PyClientConfig {
id_delimiter: Option<String>,
tls_config: Option<PyClientTlsConfig>,
header_provider: Option<Py<PyAny>>,
mem_wal_enabled: Option<bool>,
}
#[derive(FromPyObject)]
@@ -590,6 +591,7 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: value.id_delimiter,
tls_config: value.tls_config.map(Into::into),
header_provider,
mem_wal_enabled: value.mem_wal_enabled,
}
}
}

View File

@@ -66,13 +66,10 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
.inner
.bind(py)
.call_method0("fetch_storage_options")
.map_err(|e| lance_core::Error::IO {
source: Box::new(std::io::Error::other(format!(
"Failed to call fetch_storage_options: {}",
e
))),
location: snafu::location!(),
})?;
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to call fetch_storage_options: {}",
e
)))))?;
// If result is None, return None
if result.is_none() {
@@ -81,26 +78,19 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
// Extract the result dict - should be a flat Map<String, String>
let result_dict = result.downcast::<PyDict>().map_err(|_| {
lance_core::Error::InvalidInput {
source: "fetch_storage_options() must return None or a dict of string key-value pairs".into(),
location: snafu::location!(),
}
lance_core::Error::invalid_input(
"fetch_storage_options() must return a dict of string key-value pairs or None",
)
})?;
// Convert all entries to HashMap<String, String>
let mut storage_options = HashMap::new();
for (key, value) in result_dict.iter() {
let key_str: String = key.extract().map_err(|e| {
lance_core::Error::InvalidInput {
source: format!("Storage option key must be a string: {}", e).into(),
location: snafu::location!(),
}
lance_core::Error::invalid_input(format!("Storage option key must be a string: {}", e))
})?;
let value_str: String = value.extract().map_err(|e| {
lance_core::Error::InvalidInput {
source: format!("Storage option value must be a string: {}", e).into(),
location: snafu::location!(),
}
lance_core::Error::invalid_input(format!("Storage option value must be a string: {}", e))
})?;
storage_options.insert(key_str, value_str);
}
@@ -109,13 +99,10 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
})
})
.await
.map_err(|e| lance_core::Error::IO {
source: Box::new(std::io::Error::other(format!(
"Task join error: {}",
e
))),
location: snafu::location!(),
})?
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
"Task join error: {}",
e
)))))?
}
fn provider_id(&self) -> String {

View File

@@ -710,6 +710,9 @@ impl Table {
if let Some(use_index) = parameters.use_index {
builder.use_index(use_index);
}
if let Some(mem_wal) = parameters.mem_wal {
builder.mem_wal(mem_wal);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
@@ -870,6 +873,7 @@ pub struct MergeInsertParams {
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
use_index: Option<bool>,
mem_wal: Option<bool>,
}
#[pyclass]

View File

@@ -784,13 +784,19 @@ impl ConnectBuilder {
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
})?;
// Propagate mem_wal_enabled from options to client_config
let mut client_config = self.request.client_config;
if options.mem_wal_enabled.is_some() {
client_config.mem_wal_enabled = options.mem_wal_enabled;
}
let storage_options = StorageOptions(options.storage_options.clone());
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
&self.request.uri,
&api_key,
&region,
options.host_override,
self.request.client_config,
client_config,
storage_options.into(),
)?);
Ok(Connection {

View File

@@ -11,14 +11,13 @@ use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
use lance_namespace::{
LanceNamespace,
models::{
CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse,
DeclareTableRequest, DescribeNamespaceRequest, DescribeNamespaceResponse,
DescribeTableRequest, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
CreateNamespaceRequest, CreateNamespaceResponse, DeclareTableRequest,
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
},
};
use lance_namespace_impls::ConnectBuilder;
use log::warn;
use crate::database::ReadConsistency;
use crate::error::{Error, Result};
@@ -213,63 +212,18 @@ impl Database for LanceNamespaceDatabase {
..Default::default()
};
let (location, initial_storage_options) =
match self.namespace.declare_table(declare_request).await {
Ok(response) => {
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response"
.to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts)
}
Err(e) => {
// Check if the error is "not supported" and try create_empty_table as fallback
let err_str = e.to_string().to_lowercase();
if err_str.contains("not supported") || err_str.contains("not implemented") {
warn!(
"declare_table is not supported by the namespace client, \
falling back to deprecated create_empty_table. \
create_empty_table is deprecated and will be removed in Lance 3.0.0. \
Please upgrade your namespace client to support declare_table."
);
#[allow(deprecated)]
let create_empty_request = CreateEmptyTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
#[allow(deprecated)]
let create_response = self
.namespace
.create_empty_table(create_empty_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to create empty table: {}", e),
})?;
let loc = create_response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from create_empty_table response"
.to_string(),
})?;
// For deprecated path, use self.storage_options
let opts = if self.storage_options.is_empty() {
None
} else {
Some(self.storage_options.clone())
};
(loc, opts)
} else {
return Err(Error::Runtime {
message: format!("Failed to declare table: {}", e),
});
}
}
};
let (location, initial_storage_options) = {
let response = self.namespace.declare_table(declare_request).await?;
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts)
};
let write_params = if let Some(storage_opts) = initial_storage_options {
let mut params = request.write_options.lance_write_params.unwrap_or_default();

View File

@@ -14,6 +14,7 @@ use crate::remote::db::RemoteOptions;
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
/// Configuration for TLS/mTLS settings.
#[derive(Clone, Debug, Default)]
@@ -52,6 +53,10 @@ pub struct ClientConfig {
pub tls_config: Option<TlsConfig>,
/// Provider for custom headers to be added to each request
pub header_provider: Option<Arc<dyn HeaderProvider>>,
/// Enable MemWAL write path for streaming writes.
/// When true, write operations will use the MemWAL architecture
/// for high-performance streaming writes.
pub mem_wal_enabled: Option<bool>,
}
impl std::fmt::Debug for ClientConfig {
@@ -67,6 +72,7 @@ impl std::fmt::Debug for ClientConfig {
"header_provider",
&self.header_provider.as_ref().map(|_| "Some(...)"),
)
.field("mem_wal_enabled", &self.mem_wal_enabled)
.finish()
}
}
@@ -81,6 +87,7 @@ impl Default for ClientConfig {
id_delimiter: None,
tls_config: None,
header_provider: None,
mem_wal_enabled: None,
}
}
}
@@ -477,6 +484,11 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
);
}
// Add MemWAL header if enabled
if let Some(true) = config.mem_wal_enabled {
headers.insert(MEM_WAL_ENABLED_HEADER, HeaderValue::from_static("true"));
}
Ok(headers)
}

View File

@@ -78,6 +78,7 @@ pub const OPT_REMOTE_PREFIX: &str = "remote_database_";
pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key";
pub const OPT_REMOTE_REGION: &str = "remote_database_region";
pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override";
pub const OPT_REMOTE_MEM_WAL_ENABLED: &str = "remote_database_mem_wal_enabled";
// TODO: add support for configuring client config via key/value options
#[derive(Clone, Debug, Default)]
@@ -98,6 +99,12 @@ pub struct RemoteDatabaseOptions {
/// These options are only used for LanceDB Enterprise and only a subset of options
/// are supported.
pub storage_options: HashMap<String, String>,
/// Enable MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations (insert, merge_insert, etc.) will use
/// the MemWAL architecture which buffers writes in memory and Write-Ahead Log
/// before asynchronously merging to the base table.
pub mem_wal_enabled: Option<bool>,
}
impl RemoteDatabaseOptions {
@@ -109,6 +116,9 @@ impl RemoteDatabaseOptions {
let api_key = map.get(OPT_REMOTE_API_KEY).cloned();
let region = map.get(OPT_REMOTE_REGION).cloned();
let host_override = map.get(OPT_REMOTE_HOST_OVERRIDE).cloned();
let mem_wal_enabled = map
.get(OPT_REMOTE_MEM_WAL_ENABLED)
.map(|v| v.to_lowercase() == "true");
let storage_options = map
.iter()
.filter(|(key, _)| !key.starts_with(OPT_REMOTE_PREFIX))
@@ -119,6 +129,7 @@ impl RemoteDatabaseOptions {
region,
host_override,
storage_options,
mem_wal_enabled,
})
}
}
@@ -137,6 +148,12 @@ impl DatabaseOptions for RemoteDatabaseOptions {
if let Some(host_override) = &self.host_override {
map.insert(OPT_REMOTE_HOST_OVERRIDE.to_string(), host_override.clone());
}
if let Some(mem_wal_enabled) = &self.mem_wal_enabled {
map.insert(
OPT_REMOTE_MEM_WAL_ENABLED.to_string(),
mem_wal_enabled.to_string(),
);
}
}
}
@@ -181,6 +198,20 @@ impl RemoteDatabaseOptionsBuilder {
self.options.host_override = Some(host_override);
self
}
/// Enable MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations will use the MemWAL architecture
/// which buffers writes in memory and Write-Ahead Log before
/// asynchronously merging to the base table.
///
/// # Arguments
///
/// * `enabled` - Whether to enable MemWAL writes
pub fn mem_wal_enabled(mut self, enabled: bool) -> Self {
self.options.mem_wal_enabled = Some(enabled);
self
}
}
#[derive(Debug)]

View File

@@ -62,6 +62,7 @@ use std::time::Duration;
use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
@@ -1359,6 +1360,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let timeout = params.timeout;
let mem_wal = params.mem_wal;
let query = MergeInsertRequest::try_from(params)?;
let mut request = self
@@ -1374,6 +1376,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
}
if mem_wal {
request = request.header(MEM_WAL_ENABLED_HEADER, "true");
}
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
let response = self.check_table_response(&request_id, response).await?;

View File

@@ -55,6 +55,7 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) mem_wal: bool,
}
impl MergeInsertBuilder {
@@ -69,6 +70,7 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
mem_wal: false,
}
}
@@ -148,13 +150,65 @@ impl MergeInsertBuilder {
self
}
/// Enables MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
///
/// When enabled, the merge insert will route data through a memory node service
/// that buffers writes before flushing to storage. This is only supported for
/// remote (LanceDB Cloud) tables.
///
/// If not set, defaults to `false`.
pub fn mem_wal(&mut self, enabled: bool) -> &mut Self {
self.mem_wal = enabled;
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows
/// inserted, updated, and deleted.
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<MergeResult> {
// Validate MemWAL constraints before execution
if self.mem_wal {
self.validate_mem_wal_pattern()?;
}
self.table.clone().merge_insert(self, new_data).await
}
/// Validate that the merge insert pattern is supported by MemWAL.
///
/// MemWAL only supports the upsert pattern:
/// - when_matched_update_all (without filter)
/// - when_not_matched_insert_all
/// - NO when_not_matched_by_source_delete
fn validate_mem_wal_pattern(&self) -> Result<()> {
// Must have when_matched_update_all without filter
if !self.when_matched_update_all {
return Err(Error::InvalidInput {
message: "MemWAL requires when_matched_update_all() to be set".to_string(),
});
}
if self.when_matched_update_all_filt.is_some() {
return Err(Error::InvalidInput {
message: "MemWAL does not support conditional when_matched_update_all (no filter allowed)".to_string(),
});
}
// Must have when_not_matched_insert_all
if !self.when_not_matched_insert_all {
return Err(Error::InvalidInput {
message: "MemWAL requires when_not_matched_insert_all() to be set".to_string(),
});
}
// Must NOT have when_not_matched_by_source_delete
if self.when_not_matched_by_source_delete {
return Err(Error::InvalidInput {
message: "MemWAL does not support when_not_matched_by_source_delete()".to_string(),
});
}
Ok(())
}
}
/// Internal implementation of the merge insert logic
@@ -165,6 +219,14 @@ pub(crate) async fn execute_merge_insert(
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
if params.mem_wal {
return Err(Error::NotSupported {
message: "MemWAL is not supported for native (local) tables. \
MemWAL is only available for remote (LanceDB Cloud) tables."
.to_string(),
});
}
let dataset = table.dataset.get().await?;
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
@@ -324,4 +386,139 @@ mod tests {
merge_insert_builder.execute(new_batches).await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 25);
}
#[tokio::test]
async fn test_mem_wal_validation_valid_pattern() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test", batches)
.execute()
.await
.unwrap();
// Valid MemWAL pattern: when_matched_update_all + when_not_matched_insert_all
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(None);
builder.when_not_matched_insert_all();
builder.mem_wal(true);
// Should fail because native tables don't support MemWAL, but validation passes
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("MemWAL is not supported for native"),
"Expected native table error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_missing_when_matched() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test2", batches)
.execute()
.await
.unwrap();
// Missing when_matched_update_all
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_not_matched_insert_all();
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("requires when_matched_update_all"),
"Expected validation error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_missing_when_not_matched() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test3", batches)
.execute()
.await
.unwrap();
// Missing when_not_matched_insert_all
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(None);
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("requires when_not_matched_insert_all"),
"Expected validation error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_with_filter() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test4", batches)
.execute()
.await
.unwrap();
// With conditional filter - not allowed
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(Some("target.age > 0".to_string()));
builder.when_not_matched_insert_all();
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("does not support conditional"),
"Expected filter validation error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_with_delete() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test5", batches)
.execute()
.await
.unwrap();
// With when_not_matched_by_source_delete - not allowed
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(None);
builder.when_not_matched_insert_all();
builder.when_not_matched_by_source_delete(None);
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("does not support when_not_matched_by_source_delete"),
"Expected delete validation error, got: {}",
err
);
}
}