Compare commits

...

4 Commits

Author SHA1 Message Date
Jack Ye
01c6b9dcb8 feat: add mem_wal flag to merge insert for MemWAL write path
Add support for enabling MemWAL (Memory Write-Ahead Log) mode on merge insert
operations. This allows streaming writes to route through memory nodes for
high-performance buffered writes.

Changes:
- Add `mem_wal` field to MergeInsertBuilder with validation
- Add `x-lancedb-mem-wal-enabled` header for remote requests
- Add Python `mem_wal()` method to LanceMergeInsertBuilder
- Add validation to ensure only upsert pattern is supported:
  - when_matched_update_all() without filter
  - when_not_matched_insert_all()
- Throw NotSupported error for native tables
- Add mem_wal_enabled to ClientConfig for Python/Node bindings

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-03-16 01:04:04 -07:00
Will Jones
33a13f0738 fixes for breaking changes 2026-03-04 15:27:32 -08:00
Will Jones
cabc75f167 feat: upgrade Lance to 3.0.0-rc.3 2026-03-04 14:57:17 -08:00
Wyatt Alt
97ca9bb943 feat: allow passing azure client/tenant ID through remote SDK (#3102)
Prior to this commit we supported passing the azure storage account name
to the lancedb remote SDK through headers. This adds support for client
ID and tenant ID as well.
2026-03-04 11:11:36 -08:00
14 changed files with 471 additions and 158 deletions

112
Cargo.lock generated
View File

@@ -3088,8 +3088,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"rand 0.9.2",
@@ -4260,8 +4260,8 @@ dependencies = [
[[package]]
name = "lance"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-arith",
@@ -4315,7 +4315,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"snafu",
"snafu 0.9.0",
"tantivy",
"tokio",
"tokio-stream",
@@ -4327,8 +4327,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4338,6 +4338,7 @@ dependencies = [
"arrow-schema",
"arrow-select",
"bytes",
"futures",
"getrandom 0.2.16",
"half",
"jsonb",
@@ -4347,8 +4348,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrayref",
"paste",
@@ -4357,8 +4358,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4384,7 +4385,7 @@ dependencies = [
"rand 0.9.2",
"roaring",
"serde_json",
"snafu",
"snafu 0.9.0",
"tempfile",
"tokio",
"tokio-stream",
@@ -4395,8 +4396,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-array",
@@ -4419,15 +4420,15 @@ dependencies = [
"pin-project",
"prost",
"prost-build",
"snafu",
"snafu 0.9.0",
"tokio",
"tracing",
]
[[package]]
name = "lance-datagen"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-array",
@@ -4445,8 +4446,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4473,7 +4474,7 @@ dependencies = [
"prost-build",
"prost-types",
"rand 0.9.2",
"snafu",
"snafu 0.9.0",
"strum",
"tokio",
"tracing",
@@ -4483,8 +4484,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4509,15 +4510,15 @@ dependencies = [
"prost",
"prost-build",
"prost-types",
"snafu",
"snafu 0.9.0",
"tokio",
"tracing",
]
[[package]]
name = "lance-index"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-arith",
@@ -4569,7 +4570,7 @@ dependencies = [
"serde",
"serde_json",
"smallvec",
"snafu",
"snafu 0.9.0",
"tantivy",
"tempfile",
"tokio",
@@ -4580,8 +4581,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-arith",
@@ -4613,7 +4614,7 @@ dependencies = [
"prost",
"rand 0.9.2",
"serde",
"snafu",
"snafu 0.9.0",
"tempfile",
"tokio",
"tracing",
@@ -4622,8 +4623,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4639,21 +4640,21 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"async-trait",
"bytes",
"lance-core",
"lance-namespace-reqwest-client",
"snafu",
"snafu 0.9.0",
]
[[package]]
name = "lance-namespace-impls"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-ipc",
@@ -4675,7 +4676,7 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"snafu",
"snafu 0.9.0",
"tokio",
"tower",
"tower-http 0.5.2",
@@ -4697,8 +4698,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow",
"arrow-array",
@@ -4728,7 +4729,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"snafu",
"snafu 0.9.0",
"tokio",
"tracing",
"url",
@@ -4737,8 +4738,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4819,7 +4820,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"snafu",
"snafu 0.8.9",
"tempfile",
"test-log",
"tokenizers",
@@ -4865,7 +4866,7 @@ dependencies = [
"pyo3",
"pyo3-async-runtimes",
"pyo3-build-config",
"snafu",
"snafu 0.8.9",
"tokio",
]
@@ -7777,7 +7778,16 @@ version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2"
dependencies = [
"snafu-derive",
"snafu-derive 0.8.9",
]
[[package]]
name = "snafu"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1d4bced6a69f90b2056c03dcff2c4737f98d6fb9e0853493996e1d253ca29c6"
dependencies = [
"snafu-derive 0.9.0",
]
[[package]]
@@ -7792,6 +7802,18 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "snafu-derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54254b8531cafa275c5e096f62d48c81435d1015405a91198ddb11e967301d40"
dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "socket2"
version = "0.5.10"

View File

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }

View File

@@ -145,6 +145,7 @@ impl From<ClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: config.id_delimiter,
tls_config: config.tls_config.map(Into::into),
header_provider: None, // the header provider is set separately later
mem_wal_enabled: None, // mem_wal is set per-operation in merge_insert
}
}
}

View File

@@ -34,6 +34,7 @@ class LanceMergeInsertBuilder(object):
self._when_not_matched_by_source_condition = None
self._timeout = None
self._use_index = True
self._mem_wal = False
def when_matched_update_all(
self, *, where: Optional[str] = None
@@ -96,6 +97,47 @@ class LanceMergeInsertBuilder(object):
self._use_index = use_index
return self
def mem_wal(self, enabled: bool = True) -> LanceMergeInsertBuilder:
"""
Enable MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
When enabled, the merge insert will route data through a memory node service
that buffers writes before flushing to storage. This is only supported for
remote (LanceDB Cloud) tables.
**Important:** MemWAL only supports the upsert pattern. You must use:
- `when_matched_update_all()` (without a filter condition)
- `when_not_matched_insert_all()`
MemWAL does NOT support:
- `when_matched_update_all(where=...)` with a filter condition
- `when_not_matched_by_source_delete()`
Parameters
----------
enabled: bool
Whether to enable MemWAL mode. Defaults to `True`.
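Raises
------
NotImplementedError
If used on a native (local) table, as MemWAL is only supported for
remote tables. Raised when `execute()` is called, not by this method,
which only records the flag.
ValueError
If the merge insert pattern is not supported by MemWAL. Raised when
`execute()` is called, not by this method.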
Examples
--------
>>> # Correct usage with MemWAL
>>> table.merge_insert(["id"]) \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .mem_wal() \\
... .execute(new_data)
"""
self._mem_wal = enabled
return self
def execute(
self,
new_data: DATA,

View File

@@ -4181,6 +4181,7 @@ class AsyncTable:
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
use_index=merge._use_index,
mem_wal=merge._mem_wal,
),
)

View File

@@ -506,6 +506,7 @@ pub struct PyClientConfig {
id_delimiter: Option<String>,
tls_config: Option<PyClientTlsConfig>,
header_provider: Option<Py<PyAny>>,
mem_wal_enabled: Option<bool>,
}
#[derive(FromPyObject)]
@@ -590,6 +591,7 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: value.id_delimiter,
tls_config: value.tls_config.map(Into::into),
header_provider,
mem_wal_enabled: value.mem_wal_enabled,
}
}
}

View File

@@ -66,13 +66,10 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
.inner
.bind(py)
.call_method0("fetch_storage_options")
.map_err(|e| lance_core::Error::IO {
source: Box::new(std::io::Error::other(format!(
"Failed to call fetch_storage_options: {}",
e
))),
location: snafu::location!(),
})?;
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to call fetch_storage_options: {}",
e
)))))?;
// If result is None, return None
if result.is_none() {
@@ -81,26 +78,19 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
// Extract the result dict - should be a flat Map<String, String>
let result_dict = result.downcast::<PyDict>().map_err(|_| {
lance_core::Error::InvalidInput {
source: "fetch_storage_options() must return None or a dict of string key-value pairs".into(),
location: snafu::location!(),
}
lance_core::Error::invalid_input(
"fetch_storage_options() must return a dict of string key-value pairs or None",
)
})?;
// Convert all entries to HashMap<String, String>
let mut storage_options = HashMap::new();
for (key, value) in result_dict.iter() {
let key_str: String = key.extract().map_err(|e| {
lance_core::Error::InvalidInput {
source: format!("Storage option key must be a string: {}", e).into(),
location: snafu::location!(),
}
lance_core::Error::invalid_input(format!("Storage option key must be a string: {}", e))
})?;
let value_str: String = value.extract().map_err(|e| {
lance_core::Error::InvalidInput {
source: format!("Storage option value must be a string: {}", e).into(),
location: snafu::location!(),
}
lance_core::Error::invalid_input(format!("Storage option value must be a string: {}", e))
})?;
storage_options.insert(key_str, value_str);
}
@@ -109,13 +99,10 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
})
})
.await
.map_err(|e| lance_core::Error::IO {
source: Box::new(std::io::Error::other(format!(
"Task join error: {}",
e
))),
location: snafu::location!(),
})?
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
"Task join error: {}",
e
)))))?
}
fn provider_id(&self) -> String {

View File

@@ -710,6 +710,9 @@ impl Table {
if let Some(use_index) = parameters.use_index {
builder.use_index(use_index);
}
if let Some(mem_wal) = parameters.mem_wal {
builder.mem_wal(mem_wal);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
@@ -870,6 +873,7 @@ pub struct MergeInsertParams {
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
use_index: Option<bool>,
mem_wal: Option<bool>,
}
#[pyclass]

View File

@@ -566,8 +566,11 @@ pub struct ConnectBuilder {
}
#[cfg(feature = "remote")]
const ENV_VARS_TO_STORAGE_OPTS: [(&str, &str); 1] =
[("AZURE_STORAGE_ACCOUNT_NAME", "azure_storage_account_name")];
const ENV_VARS_TO_STORAGE_OPTS: [(&str, &str); 3] = [
("AZURE_STORAGE_ACCOUNT_NAME", "azure_storage_account_name"),
("AZURE_CLIENT_ID", "azure_client_id"),
("AZURE_TENANT_ID", "azure_tenant_id"),
];
impl ConnectBuilder {
/// Create a new [`ConnectOptions`] with the given database URI.
@@ -781,13 +784,19 @@ impl ConnectBuilder {
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
})?;
// Propagate mem_wal_enabled from options to client_config
let mut client_config = self.request.client_config;
if options.mem_wal_enabled.is_some() {
client_config.mem_wal_enabled = options.mem_wal_enabled;
}
let storage_options = StorageOptions(options.storage_options.clone());
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
&self.request.uri,
&api_key,
&region,
options.host_override,
self.request.client_config,
client_config,
storage_options.into(),
)?);
Ok(Connection {

View File

@@ -11,14 +11,13 @@ use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
use lance_namespace::{
LanceNamespace,
models::{
CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse,
DeclareTableRequest, DescribeNamespaceRequest, DescribeNamespaceResponse,
DescribeTableRequest, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
CreateNamespaceRequest, CreateNamespaceResponse, DeclareTableRequest,
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
},
};
use lance_namespace_impls::ConnectBuilder;
use log::warn;
use crate::database::ReadConsistency;
use crate::error::{Error, Result};
@@ -213,63 +212,18 @@ impl Database for LanceNamespaceDatabase {
..Default::default()
};
let (location, initial_storage_options) =
match self.namespace.declare_table(declare_request).await {
Ok(response) => {
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response"
.to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts)
}
Err(e) => {
// Check if the error is "not supported" and try create_empty_table as fallback
let err_str = e.to_string().to_lowercase();
if err_str.contains("not supported") || err_str.contains("not implemented") {
warn!(
"declare_table is not supported by the namespace client, \
falling back to deprecated create_empty_table. \
create_empty_table is deprecated and will be removed in Lance 3.0.0. \
Please upgrade your namespace client to support declare_table."
);
#[allow(deprecated)]
let create_empty_request = CreateEmptyTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
#[allow(deprecated)]
let create_response = self
.namespace
.create_empty_table(create_empty_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to create empty table: {}", e),
})?;
let loc = create_response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from create_empty_table response"
.to_string(),
})?;
// For deprecated path, use self.storage_options
let opts = if self.storage_options.is_empty() {
None
} else {
Some(self.storage_options.clone())
};
(loc, opts)
} else {
return Err(Error::Runtime {
message: format!("Failed to declare table: {}", e),
});
}
}
};
let (location, initial_storage_options) = {
let response = self.namespace.declare_table(declare_request).await?;
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts)
};
let write_params = if let Some(storage_opts) = initial_storage_options {
let mut params = request.write_options.lance_write_params.unwrap_or_default();

View File

@@ -14,6 +14,7 @@ use crate::remote::db::RemoteOptions;
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
/// Configuration for TLS/mTLS settings.
#[derive(Clone, Debug, Default)]
@@ -52,6 +53,10 @@ pub struct ClientConfig {
pub tls_config: Option<TlsConfig>,
/// Provider for custom headers to be added to each request
pub header_provider: Option<Arc<dyn HeaderProvider>>,
/// Enable MemWAL write path for streaming writes.
/// When true, write operations will use the MemWAL architecture
/// for high-performance streaming writes.
pub mem_wal_enabled: Option<bool>,
}
impl std::fmt::Debug for ClientConfig {
@@ -67,6 +72,7 @@ impl std::fmt::Debug for ClientConfig {
"header_provider",
&self.header_provider.as_ref().map(|_| "Some(...)"),
)
.field("mem_wal_enabled", &self.mem_wal_enabled)
.finish()
}
}
@@ -81,6 +87,7 @@ impl Default for ClientConfig {
id_delimiter: None,
tls_config: None,
header_provider: None,
mem_wal_enabled: None,
}
}
}
@@ -446,13 +453,23 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
})?,
);
}
if let Some(v) = options.0.get("azure_storage_account_name") {
headers.insert(
HeaderName::from_static("x-azure-storage-account-name"),
HeaderValue::from_str(v).map_err(|_| Error::InvalidInput {
message: format!("non-ascii storage account name '{}' provided", db_name),
})?,
);
// Map azure storage options to x-azure-* headers.
// The option key uses underscores (e.g. "azure_client_id") while the
// header uses hyphens (e.g. "x-azure-client-id").
let azure_opts: [(&str, &str); 3] = [
("azure_storage_account_name", "x-azure-storage-account-name"),
("azure_client_id", "x-azure-client-id"),
("azure_tenant_id", "x-azure-tenant-id"),
];
for (opt_key, header_name) in azure_opts {
if let Some(v) = options.0.get(opt_key) {
headers.insert(
HeaderName::from_static(header_name),
HeaderValue::from_str(v).map_err(|_| Error::InvalidInput {
message: format!("non-ascii value '{}' for option '{}'", v, opt_key),
})?,
);
}
}
for (key, value) in &config.extra_headers {
@@ -467,6 +484,11 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
);
}
// Add MemWAL header if enabled
if let Some(true) = config.mem_wal_enabled {
headers.insert(MEM_WAL_ENABLED_HEADER, HeaderValue::from_static("true"));
}
Ok(headers)
}
@@ -1075,4 +1097,34 @@ mod tests {
_ => panic!("Expected Runtime error"),
}
}
/// Each supported Azure storage option must surface as its `x-azure-*`
/// default header, carrying the value unchanged.
#[test]
fn test_default_headers_azure_opts() {
    // (storage option key, header expected to carry it, value)
    let cases = [
        (
            "azure_storage_account_name",
            "x-azure-storage-account-name",
            "myaccount",
        ),
        ("azure_client_id", "x-azure-client-id", "my-client-id"),
        ("azure_tenant_id", "x-azure-tenant-id", "my-tenant-id"),
    ];

    let opts: HashMap<String, String> = cases
        .iter()
        .map(|(key, _, value)| (key.to_string(), value.to_string()))
        .collect();
    let remote_opts = RemoteOptions::new(opts);
    let default_config = ClientConfig::default();

    let headers = RestfulLanceDbClient::<Sender>::default_headers(
        "test-key",
        "us-east-1",
        "testdb",
        false,
        &remote_opts,
        None,
        &default_config,
    )
    .unwrap();

    for (_, header, expected) in cases {
        assert_eq!(headers.get(header).unwrap(), expected);
    }
}
}

View File

@@ -78,6 +78,7 @@ pub const OPT_REMOTE_PREFIX: &str = "remote_database_";
pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key";
pub const OPT_REMOTE_REGION: &str = "remote_database_region";
pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override";
pub const OPT_REMOTE_MEM_WAL_ENABLED: &str = "remote_database_mem_wal_enabled";
// TODO: add support for configuring client config via key/value options
#[derive(Clone, Debug, Default)]
@@ -98,6 +99,12 @@ pub struct RemoteDatabaseOptions {
/// These options are only used for LanceDB Enterprise and only a subset of options
/// are supported.
pub storage_options: HashMap<String, String>,
/// Enable MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations (insert, merge_insert, etc.) use the
/// MemWAL architecture, which buffers writes in memory and in a
/// write-ahead log before asynchronously merging them into the base table.
pub mem_wal_enabled: Option<bool>,
}
impl RemoteDatabaseOptions {
@@ -109,6 +116,9 @@ impl RemoteDatabaseOptions {
let api_key = map.get(OPT_REMOTE_API_KEY).cloned();
let region = map.get(OPT_REMOTE_REGION).cloned();
let host_override = map.get(OPT_REMOTE_HOST_OVERRIDE).cloned();
let mem_wal_enabled = map
.get(OPT_REMOTE_MEM_WAL_ENABLED)
.map(|v| v.to_lowercase() == "true");
let storage_options = map
.iter()
.filter(|(key, _)| !key.starts_with(OPT_REMOTE_PREFIX))
@@ -119,6 +129,7 @@ impl RemoteDatabaseOptions {
region,
host_override,
storage_options,
mem_wal_enabled,
})
}
}
@@ -137,6 +148,12 @@ impl DatabaseOptions for RemoteDatabaseOptions {
if let Some(host_override) = &self.host_override {
map.insert(OPT_REMOTE_HOST_OVERRIDE.to_string(), host_override.clone());
}
if let Some(mem_wal_enabled) = &self.mem_wal_enabled {
map.insert(
OPT_REMOTE_MEM_WAL_ENABLED.to_string(),
mem_wal_enabled.to_string(),
);
}
}
}
@@ -181,6 +198,20 @@ impl RemoteDatabaseOptionsBuilder {
self.options.host_override = Some(host_override);
self
}
/// Enables or disables the MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations use the MemWAL architecture, which
/// buffers writes in memory and in a write-ahead log before
/// asynchronously merging them into the base table.
///
/// The choice is stored as `Some(enabled)`, so an explicit `false` stays
/// distinct from never calling this method.
///
/// # Arguments
///
/// * `enabled` - Whether to enable MemWAL writes.
pub fn mem_wal_enabled(mut self, enabled: bool) -> Self {
self.options.mem_wal_enabled = Some(enabled);
self
}
}
#[derive(Debug)]
@@ -777,7 +808,12 @@ impl RemoteOptions {
impl From<StorageOptions> for RemoteOptions {
fn from(options: StorageOptions) -> Self {
let supported_opts = vec!["account_name", "azure_storage_account_name"];
let supported_opts = vec![
"account_name",
"azure_storage_account_name",
"azure_client_id",
"azure_tenant_id",
];
let mut filtered = HashMap::new();
for opt in supported_opts {
if let Some(v) = options.0.get(opt) {

View File

@@ -62,6 +62,7 @@ use std::time::Duration;
use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
@@ -1359,6 +1360,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let timeout = params.timeout;
let mem_wal = params.mem_wal;
let query = MergeInsertRequest::try_from(params)?;
let mut request = self
@@ -1374,6 +1376,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
}
if mem_wal {
request = request.header(MEM_WAL_ENABLED_HEADER, "true");
}
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
let response = self.check_table_response(&request_id, response).await?;

View File

@@ -55,6 +55,7 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) mem_wal: bool,
}
impl MergeInsertBuilder {
@@ -69,6 +70,7 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
mem_wal: false,
}
}
@@ -148,13 +150,65 @@ impl MergeInsertBuilder {
self
}
/// Enables MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
///
/// When enabled, the merge insert will route data through a memory node service
/// that buffers writes before flushing to storage. This is only supported for
/// remote (LanceDB Cloud) tables.
///
/// Calling this only records the flag on the builder; the check that the
/// configured merge pattern is supported runs later, when the builder is
/// executed.
///
/// If not set, defaults to `false`.
pub fn mem_wal(&mut self, enabled: bool) -> &mut Self {
self.mem_wal = enabled;
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows
/// inserted, updated, and deleted.
///
/// # Errors
///
/// When MemWAL mode is enabled, the builder is validated first and a
/// validation failure is returned without calling the table's
/// `merge_insert`. Any error from the table's `merge_insert` is passed
/// through unchanged.
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<MergeResult> {
// Validate MemWAL constraints before execution, so an unsupported
// pattern is rejected before the builder is handed to the table.
if self.mem_wal {
self.validate_mem_wal_pattern()?;
}
self.table.clone().merge_insert(self, new_data).await
}
/// Validate that the merge insert pattern is supported by MemWAL.
///
/// MemWAL only supports the upsert pattern:
/// - when_matched_update_all (without filter)
/// - when_not_matched_insert_all
/// - NO when_not_matched_by_source_delete
fn validate_mem_wal_pattern(&self) -> Result<()> {
// Must have when_matched_update_all without filter
if !self.when_matched_update_all {
return Err(Error::InvalidInput {
message: "MemWAL requires when_matched_update_all() to be set".to_string(),
});
}
if self.when_matched_update_all_filt.is_some() {
return Err(Error::InvalidInput {
message: "MemWAL does not support conditional when_matched_update_all (no filter allowed)".to_string(),
});
}
// Must have when_not_matched_insert_all
if !self.when_not_matched_insert_all {
return Err(Error::InvalidInput {
message: "MemWAL requires when_not_matched_insert_all() to be set".to_string(),
});
}
// Must NOT have when_not_matched_by_source_delete
if self.when_not_matched_by_source_delete {
return Err(Error::InvalidInput {
message: "MemWAL does not support when_not_matched_by_source_delete()".to_string(),
});
}
Ok(())
}
}
/// Internal implementation of the merge insert logic
@@ -165,6 +219,14 @@ pub(crate) async fn execute_merge_insert(
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
if params.mem_wal {
return Err(Error::NotSupported {
message: "MemWAL is not supported for native (local) tables. \
MemWAL is only available for remote (LanceDB Cloud) tables."
.to_string(),
});
}
let dataset = table.dataset.get().await?;
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
@@ -324,4 +386,139 @@ mod tests {
merge_insert_builder.execute(new_batches).await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 25);
}
/// A well-formed MemWAL upsert passes builder validation and is then rejected
/// by the native table implementation.
#[tokio::test]
async fn test_mem_wal_validation_valid_pattern() {
    let db = connect("memory://").execute().await.unwrap();
    let initial = merge_insert_test_batches(0, 0);
    let table = db
        .create_table("mem_wal_test", initial)
        .execute()
        .await
        .unwrap();

    // The supported MemWAL pattern: unconditional update-all plus insert-all.
    let incoming = merge_insert_test_batches(5, 1);
    let mut merge = table.merge_insert(&["i"]);
    merge
        .when_matched_update_all(None)
        .when_not_matched_insert_all()
        .mem_wal(true);

    // Validation succeeds, so the failure must come from the native table,
    // which does not support MemWAL.
    let outcome = merge.execute(incoming).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("MemWAL is not supported for native"),
        "Expected native table error, got: {}",
        message
    );
}
/// MemWAL validation rejects a builder that never enabled `when_matched_update_all`.
#[tokio::test]
async fn test_mem_wal_validation_missing_when_matched() {
    let db = connect("memory://").execute().await.unwrap();
    let initial = merge_insert_test_batches(0, 0);
    let table = db
        .create_table("mem_wal_test2", initial)
        .execute()
        .await
        .unwrap();

    // Only the insert clause is configured; the update-all clause is absent.
    let incoming = merge_insert_test_batches(5, 1);
    let mut merge = table.merge_insert(&["i"]);
    merge.when_not_matched_insert_all().mem_wal(true);

    let outcome = merge.execute(incoming).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("requires when_matched_update_all"),
        "Expected validation error, got: {}",
        message
    );
}
/// MemWAL validation rejects a builder that never enabled `when_not_matched_insert_all`.
#[tokio::test]
async fn test_mem_wal_validation_missing_when_not_matched() {
    let db = connect("memory://").execute().await.unwrap();
    let initial = merge_insert_test_batches(0, 0);
    let table = db
        .create_table("mem_wal_test3", initial)
        .execute()
        .await
        .unwrap();

    // Only the update-all clause is configured; the insert clause is absent.
    let incoming = merge_insert_test_batches(5, 1);
    let mut merge = table.merge_insert(&["i"]);
    merge.when_matched_update_all(None).mem_wal(true);

    let outcome = merge.execute(incoming).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("requires when_not_matched_insert_all"),
        "Expected validation error, got: {}",
        message
    );
}
/// MemWAL validation rejects a conditional (filtered) `when_matched_update_all`.
#[tokio::test]
async fn test_mem_wal_validation_with_filter() {
    let db = connect("memory://").execute().await.unwrap();
    let initial = merge_insert_test_batches(0, 0);
    let table = db
        .create_table("mem_wal_test4", initial)
        .execute()
        .await
        .unwrap();

    // Both required clauses are present, but the update carries a filter,
    // which MemWAL does not allow.
    let incoming = merge_insert_test_batches(5, 1);
    let mut merge = table.merge_insert(&["i"]);
    merge
        .when_matched_update_all(Some("target.age > 0".to_string()))
        .when_not_matched_insert_all()
        .mem_wal(true);

    let outcome = merge.execute(incoming).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("does not support conditional"),
        "Expected filter validation error, got: {}",
        message
    );
}
/// MemWAL validation rejects a builder that enables `when_not_matched_by_source_delete`.
#[tokio::test]
async fn test_mem_wal_validation_with_delete() {
    let db = connect("memory://").execute().await.unwrap();
    let initial = merge_insert_test_batches(0, 0);
    let table = db
        .create_table("mem_wal_test5", initial)
        .execute()
        .await
        .unwrap();

    // A valid upsert with a delete-by-source clause added on top, which
    // MemWAL does not allow.
    let incoming = merge_insert_test_batches(5, 1);
    let mut merge = table.merge_insert(&["i"]);
    merge
        .when_matched_update_all(None)
        .when_not_matched_insert_all()
        .when_not_matched_by_source_delete(None)
        .mem_wal(true);

    let outcome = merge.execute(incoming).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("does not support when_not_matched_by_source_delete"),
        "Expected delete validation error, got: {}",
        message
    );
}
}