mirror of
https://github.com/lancedb/lancedb.git
synced 2026-03-26 02:20:40 +00:00
Compare commits
4 Commits
codex/upda
...
jack/memta
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01c6b9dcb8 | ||
|
|
33a13f0738 | ||
|
|
cabc75f167 | ||
|
|
97ca9bb943 |
112
Cargo.lock
generated
112
Cargo.lock
generated
@@ -3088,8 +3088,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsst"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"rand 0.9.2",
|
||||
@@ -4260,8 +4260,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4315,7 +4315,7 @@ dependencies = [
|
||||
"semver",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
"tantivy",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -4327,8 +4327,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4338,6 +4338,7 @@ dependencies = [
|
||||
"arrow-schema",
|
||||
"arrow-select",
|
||||
"bytes",
|
||||
"futures",
|
||||
"getrandom 0.2.16",
|
||||
"half",
|
||||
"jsonb",
|
||||
@@ -4347,8 +4348,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-bitpacking"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"paste",
|
||||
@@ -4357,8 +4358,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-core"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4384,7 +4385,7 @@ dependencies = [
|
||||
"rand 0.9.2",
|
||||
"roaring",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -4395,8 +4396,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datafusion"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4419,15 +4420,15 @@ dependencies = [
|
||||
"pin-project",
|
||||
"prost",
|
||||
"prost-build",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-datagen"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4445,8 +4446,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-encoding"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4473,7 +4474,7 @@ dependencies = [
|
||||
"prost-build",
|
||||
"prost-types",
|
||||
"rand 0.9.2",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
"strum",
|
||||
"tokio",
|
||||
"tracing",
|
||||
@@ -4483,8 +4484,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-file"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4509,15 +4510,15 @@ dependencies = [
|
||||
"prost",
|
||||
"prost-build",
|
||||
"prost-types",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-index"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4569,7 +4570,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"smallvec",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
"tantivy",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
@@ -4580,8 +4581,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-io"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4613,7 +4614,7 @@ dependencies = [
|
||||
"prost",
|
||||
"rand 0.9.2",
|
||||
"serde",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tracing",
|
||||
@@ -4622,8 +4623,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-linalg"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4639,21 +4640,21 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"lance-core",
|
||||
"lance-namespace-reqwest-client",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-impls"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-ipc",
|
||||
@@ -4675,7 +4676,7 @@ dependencies = [
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-http 0.5.2",
|
||||
@@ -4697,8 +4698,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-table"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4728,7 +4729,7 @@ dependencies = [
|
||||
"semver",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
"snafu 0.9.0",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"url",
|
||||
@@ -4737,8 +4738,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-testing"
|
||||
version = "3.0.0-rc.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
|
||||
version = "3.0.0-rc.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -4819,7 +4820,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"snafu",
|
||||
"snafu 0.8.9",
|
||||
"tempfile",
|
||||
"test-log",
|
||||
"tokenizers",
|
||||
@@ -4865,7 +4866,7 @@ dependencies = [
|
||||
"pyo3",
|
||||
"pyo3-async-runtimes",
|
||||
"pyo3-build-config",
|
||||
"snafu",
|
||||
"snafu 0.8.9",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -7777,7 +7778,16 @@ version = "0.8.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2"
|
||||
dependencies = [
|
||||
"snafu-derive",
|
||||
"snafu-derive 0.8.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "snafu"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d1d4bced6a69f90b2056c03dcff2c4737f98d6fb9e0853493996e1d253ca29c6"
|
||||
dependencies = [
|
||||
"snafu-derive 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7792,6 +7802,18 @@ dependencies = [
|
||||
"syn 2.0.114",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "snafu-derive"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "54254b8531cafa275c5e096f62d48c81435d1015405a91198ddb11e967301d40"
|
||||
dependencies = [
|
||||
"heck 0.4.1",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.114",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.5.10"
|
||||
|
||||
28
Cargo.toml
28
Cargo.toml
@@ -15,20 +15,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "57.2", optional = false }
|
||||
|
||||
@@ -145,6 +145,7 @@ impl From<ClientConfig> for lancedb::remote::ClientConfig {
|
||||
id_delimiter: config.id_delimiter,
|
||||
tls_config: config.tls_config.map(Into::into),
|
||||
header_provider: None, // the header provider is set separately later
|
||||
mem_wal_enabled: None, // mem_wal is set per-operation in merge_insert
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ class LanceMergeInsertBuilder(object):
|
||||
self._when_not_matched_by_source_condition = None
|
||||
self._timeout = None
|
||||
self._use_index = True
|
||||
self._mem_wal = False
|
||||
|
||||
def when_matched_update_all(
|
||||
self, *, where: Optional[str] = None
|
||||
@@ -96,6 +97,47 @@ class LanceMergeInsertBuilder(object):
|
||||
self._use_index = use_index
|
||||
return self
|
||||
|
||||
def mem_wal(self, enabled: bool = True) -> LanceMergeInsertBuilder:
|
||||
"""
|
||||
Enable MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
|
||||
|
||||
When enabled, the merge insert will route data through a memory node service
|
||||
that buffers writes before flushing to storage. This is only supported for
|
||||
remote (LanceDB Cloud) tables.
|
||||
|
||||
**Important:** MemWAL only supports the upsert pattern. You must use:
|
||||
- `when_matched_update_all()` (without a filter condition)
|
||||
- `when_not_matched_insert_all()`
|
||||
|
||||
MemWAL does NOT support:
|
||||
- `when_matched_update_all(where=...)` with a filter condition
|
||||
- `when_not_matched_by_source_delete()`
|
||||
|
||||
Parameters
|
||||
----------
|
||||
enabled: bool
|
||||
Whether to enable MemWAL mode. Defaults to `True`.
|
||||
|
||||
Raises
|
||||
------
|
||||
NotImplementedError
|
||||
If used on a native (local) table, as MemWAL is only supported for
|
||||
remote tables.
|
||||
ValueError
|
||||
If the merge insert pattern is not supported by MemWAL.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> # Correct usage with MemWAL
|
||||
>>> table.merge_insert(["id"]) \\
|
||||
... .when_matched_update_all() \\
|
||||
... .when_not_matched_insert_all() \\
|
||||
... .mem_wal() \\
|
||||
... .execute(new_data)
|
||||
"""
|
||||
self._mem_wal = enabled
|
||||
return self
|
||||
|
||||
def execute(
|
||||
self,
|
||||
new_data: DATA,
|
||||
|
||||
@@ -4181,6 +4181,7 @@ class AsyncTable:
|
||||
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
|
||||
timeout=merge._timeout,
|
||||
use_index=merge._use_index,
|
||||
mem_wal=merge._mem_wal,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -506,6 +506,7 @@ pub struct PyClientConfig {
|
||||
id_delimiter: Option<String>,
|
||||
tls_config: Option<PyClientTlsConfig>,
|
||||
header_provider: Option<Py<PyAny>>,
|
||||
mem_wal_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(FromPyObject)]
|
||||
@@ -590,6 +591,7 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
|
||||
id_delimiter: value.id_delimiter,
|
||||
tls_config: value.tls_config.map(Into::into),
|
||||
header_provider,
|
||||
mem_wal_enabled: value.mem_wal_enabled,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,13 +66,10 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
|
||||
.inner
|
||||
.bind(py)
|
||||
.call_method0("fetch_storage_options")
|
||||
.map_err(|e| lance_core::Error::IO {
|
||||
source: Box::new(std::io::Error::other(format!(
|
||||
"Failed to call fetch_storage_options: {}",
|
||||
e
|
||||
))),
|
||||
location: snafu::location!(),
|
||||
})?;
|
||||
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
|
||||
"Failed to call fetch_storage_options: {}",
|
||||
e
|
||||
)))))?;
|
||||
|
||||
// If result is None, return None
|
||||
if result.is_none() {
|
||||
@@ -81,26 +78,19 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
|
||||
|
||||
// Extract the result dict - should be a flat Map<String, String>
|
||||
let result_dict = result.downcast::<PyDict>().map_err(|_| {
|
||||
lance_core::Error::InvalidInput {
|
||||
source: "fetch_storage_options() must return None or a dict of string key-value pairs".into(),
|
||||
location: snafu::location!(),
|
||||
}
|
||||
lance_core::Error::invalid_input(
|
||||
"fetch_storage_options() must return a dict of string key-value pairs or None",
|
||||
)
|
||||
})?;
|
||||
|
||||
// Convert all entries to HashMap<String, String>
|
||||
let mut storage_options = HashMap::new();
|
||||
for (key, value) in result_dict.iter() {
|
||||
let key_str: String = key.extract().map_err(|e| {
|
||||
lance_core::Error::InvalidInput {
|
||||
source: format!("Storage option key must be a string: {}", e).into(),
|
||||
location: snafu::location!(),
|
||||
}
|
||||
lance_core::Error::invalid_input(format!("Storage option key must be a string: {}", e))
|
||||
})?;
|
||||
let value_str: String = value.extract().map_err(|e| {
|
||||
lance_core::Error::InvalidInput {
|
||||
source: format!("Storage option value must be a string: {}", e).into(),
|
||||
location: snafu::location!(),
|
||||
}
|
||||
lance_core::Error::invalid_input(format!("Storage option value must be a string: {}", e))
|
||||
})?;
|
||||
storage_options.insert(key_str, value_str);
|
||||
}
|
||||
@@ -109,13 +99,10 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| lance_core::Error::IO {
|
||||
source: Box::new(std::io::Error::other(format!(
|
||||
"Task join error: {}",
|
||||
e
|
||||
))),
|
||||
location: snafu::location!(),
|
||||
})?
|
||||
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
|
||||
"Task join error: {}",
|
||||
e
|
||||
)))))?
|
||||
}
|
||||
|
||||
fn provider_id(&self) -> String {
|
||||
|
||||
@@ -710,6 +710,9 @@ impl Table {
|
||||
if let Some(use_index) = parameters.use_index {
|
||||
builder.use_index(use_index);
|
||||
}
|
||||
if let Some(mem_wal) = parameters.mem_wal {
|
||||
builder.mem_wal(mem_wal);
|
||||
}
|
||||
|
||||
future_into_py(self_.py(), async move {
|
||||
let res = builder.execute(Box::new(batches)).await.infer_error()?;
|
||||
@@ -870,6 +873,7 @@ pub struct MergeInsertParams {
|
||||
when_not_matched_by_source_condition: Option<String>,
|
||||
timeout: Option<std::time::Duration>,
|
||||
use_index: Option<bool>,
|
||||
mem_wal: Option<bool>,
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
|
||||
@@ -566,8 +566,11 @@ pub struct ConnectBuilder {
|
||||
}
|
||||
|
||||
#[cfg(feature = "remote")]
|
||||
const ENV_VARS_TO_STORAGE_OPTS: [(&str, &str); 1] =
|
||||
[("AZURE_STORAGE_ACCOUNT_NAME", "azure_storage_account_name")];
|
||||
const ENV_VARS_TO_STORAGE_OPTS: [(&str, &str); 3] = [
|
||||
("AZURE_STORAGE_ACCOUNT_NAME", "azure_storage_account_name"),
|
||||
("AZURE_CLIENT_ID", "azure_client_id"),
|
||||
("AZURE_TENANT_ID", "azure_tenant_id"),
|
||||
];
|
||||
|
||||
impl ConnectBuilder {
|
||||
/// Create a new [`ConnectOptions`] with the given database URI.
|
||||
@@ -781,13 +784,19 @@ impl ConnectBuilder {
|
||||
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
|
||||
})?;
|
||||
|
||||
// Propagate mem_wal_enabled from options to client_config
|
||||
let mut client_config = self.request.client_config;
|
||||
if options.mem_wal_enabled.is_some() {
|
||||
client_config.mem_wal_enabled = options.mem_wal_enabled;
|
||||
}
|
||||
|
||||
let storage_options = StorageOptions(options.storage_options.clone());
|
||||
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
|
||||
&self.request.uri,
|
||||
&api_key,
|
||||
®ion,
|
||||
options.host_override,
|
||||
self.request.client_config,
|
||||
client_config,
|
||||
storage_options.into(),
|
||||
)?);
|
||||
Ok(Connection {
|
||||
|
||||
@@ -11,14 +11,13 @@ use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
|
||||
use lance_namespace::{
|
||||
LanceNamespace,
|
||||
models::{
|
||||
CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse,
|
||||
DeclareTableRequest, DescribeNamespaceRequest, DescribeNamespaceResponse,
|
||||
DescribeTableRequest, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
|
||||
ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
|
||||
CreateNamespaceRequest, CreateNamespaceResponse, DeclareTableRequest,
|
||||
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
|
||||
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest,
|
||||
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
|
||||
},
|
||||
};
|
||||
use lance_namespace_impls::ConnectBuilder;
|
||||
use log::warn;
|
||||
|
||||
use crate::database::ReadConsistency;
|
||||
use crate::error::{Error, Result};
|
||||
@@ -213,63 +212,18 @@ impl Database for LanceNamespaceDatabase {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (location, initial_storage_options) =
|
||||
match self.namespace.declare_table(declare_request).await {
|
||||
Ok(response) => {
|
||||
let loc = response.location.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from declare_table response"
|
||||
.to_string(),
|
||||
})?;
|
||||
// Use storage options from response, fall back to self.storage_options
|
||||
let opts = response
|
||||
.storage_options
|
||||
.or_else(|| Some(self.storage_options.clone()))
|
||||
.filter(|o| !o.is_empty());
|
||||
(loc, opts)
|
||||
}
|
||||
Err(e) => {
|
||||
// Check if the error is "not supported" and try create_empty_table as fallback
|
||||
let err_str = e.to_string().to_lowercase();
|
||||
if err_str.contains("not supported") || err_str.contains("not implemented") {
|
||||
warn!(
|
||||
"declare_table is not supported by the namespace client, \
|
||||
falling back to deprecated create_empty_table. \
|
||||
create_empty_table is deprecated and will be removed in Lance 3.0.0. \
|
||||
Please upgrade your namespace client to support declare_table."
|
||||
);
|
||||
#[allow(deprecated)]
|
||||
let create_empty_request = CreateEmptyTableRequest {
|
||||
id: Some(table_id.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
#[allow(deprecated)]
|
||||
let create_response = self
|
||||
.namespace
|
||||
.create_empty_table(create_empty_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to create empty table: {}", e),
|
||||
})?;
|
||||
|
||||
let loc = create_response.location.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from create_empty_table response"
|
||||
.to_string(),
|
||||
})?;
|
||||
// For deprecated path, use self.storage_options
|
||||
let opts = if self.storage_options.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.storage_options.clone())
|
||||
};
|
||||
(loc, opts)
|
||||
} else {
|
||||
return Err(Error::Runtime {
|
||||
message: format!("Failed to declare table: {}", e),
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
let (location, initial_storage_options) = {
|
||||
let response = self.namespace.declare_table(declare_request).await?;
|
||||
let loc = response.location.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from declare_table response".to_string(),
|
||||
})?;
|
||||
// Use storage options from response, fall back to self.storage_options
|
||||
let opts = response
|
||||
.storage_options
|
||||
.or_else(|| Some(self.storage_options.clone()))
|
||||
.filter(|o| !o.is_empty());
|
||||
(loc, opts)
|
||||
};
|
||||
|
||||
let write_params = if let Some(storage_opts) = initial_storage_options {
|
||||
let mut params = request.write_options.lance_write_params.unwrap_or_default();
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::remote::db::RemoteOptions;
|
||||
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
|
||||
|
||||
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
|
||||
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
|
||||
|
||||
/// Configuration for TLS/mTLS settings.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
@@ -52,6 +53,10 @@ pub struct ClientConfig {
|
||||
pub tls_config: Option<TlsConfig>,
|
||||
/// Provider for custom headers to be added to each request
|
||||
pub header_provider: Option<Arc<dyn HeaderProvider>>,
|
||||
/// Enable MemWAL write path for streaming writes.
|
||||
/// When true, write operations will use the MemWAL architecture
|
||||
/// for high-performance streaming writes.
|
||||
pub mem_wal_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ClientConfig {
|
||||
@@ -67,6 +72,7 @@ impl std::fmt::Debug for ClientConfig {
|
||||
"header_provider",
|
||||
&self.header_provider.as_ref().map(|_| "Some(...)"),
|
||||
)
|
||||
.field("mem_wal_enabled", &self.mem_wal_enabled)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -81,6 +87,7 @@ impl Default for ClientConfig {
|
||||
id_delimiter: None,
|
||||
tls_config: None,
|
||||
header_provider: None,
|
||||
mem_wal_enabled: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -446,13 +453,23 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
|
||||
})?,
|
||||
);
|
||||
}
|
||||
if let Some(v) = options.0.get("azure_storage_account_name") {
|
||||
headers.insert(
|
||||
HeaderName::from_static("x-azure-storage-account-name"),
|
||||
HeaderValue::from_str(v).map_err(|_| Error::InvalidInput {
|
||||
message: format!("non-ascii storage account name '{}' provided", db_name),
|
||||
})?,
|
||||
);
|
||||
// Map azure storage options to x-azure-* headers.
|
||||
// The option key uses underscores (e.g. "azure_client_id") while the
|
||||
// header uses hyphens (e.g. "x-azure-client-id").
|
||||
let azure_opts: [(&str, &str); 3] = [
|
||||
("azure_storage_account_name", "x-azure-storage-account-name"),
|
||||
("azure_client_id", "x-azure-client-id"),
|
||||
("azure_tenant_id", "x-azure-tenant-id"),
|
||||
];
|
||||
for (opt_key, header_name) in azure_opts {
|
||||
if let Some(v) = options.0.get(opt_key) {
|
||||
headers.insert(
|
||||
HeaderName::from_static(header_name),
|
||||
HeaderValue::from_str(v).map_err(|_| Error::InvalidInput {
|
||||
message: format!("non-ascii value '{}' for option '{}'", v, opt_key),
|
||||
})?,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
for (key, value) in &config.extra_headers {
|
||||
@@ -467,6 +484,11 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
|
||||
);
|
||||
}
|
||||
|
||||
// Add MemWAL header if enabled
|
||||
if let Some(true) = config.mem_wal_enabled {
|
||||
headers.insert(MEM_WAL_ENABLED_HEADER, HeaderValue::from_static("true"));
|
||||
}
|
||||
|
||||
Ok(headers)
|
||||
}
|
||||
|
||||
@@ -1075,4 +1097,34 @@ mod tests {
|
||||
_ => panic!("Expected Runtime error"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_headers_azure_opts() {
|
||||
let mut opts = HashMap::new();
|
||||
opts.insert(
|
||||
"azure_storage_account_name".to_string(),
|
||||
"myaccount".to_string(),
|
||||
);
|
||||
opts.insert("azure_client_id".to_string(), "my-client-id".to_string());
|
||||
opts.insert("azure_tenant_id".to_string(), "my-tenant-id".to_string());
|
||||
let remote_opts = RemoteOptions::new(opts);
|
||||
|
||||
let headers = RestfulLanceDbClient::<Sender>::default_headers(
|
||||
"test-key",
|
||||
"us-east-1",
|
||||
"testdb",
|
||||
false,
|
||||
&remote_opts,
|
||||
None,
|
||||
&ClientConfig::default(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
headers.get("x-azure-storage-account-name").unwrap(),
|
||||
"myaccount"
|
||||
);
|
||||
assert_eq!(headers.get("x-azure-client-id").unwrap(), "my-client-id");
|
||||
assert_eq!(headers.get("x-azure-tenant-id").unwrap(), "my-tenant-id");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +78,7 @@ pub const OPT_REMOTE_PREFIX: &str = "remote_database_";
|
||||
pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key";
|
||||
pub const OPT_REMOTE_REGION: &str = "remote_database_region";
|
||||
pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override";
|
||||
pub const OPT_REMOTE_MEM_WAL_ENABLED: &str = "remote_database_mem_wal_enabled";
|
||||
// TODO: add support for configuring client config via key/value options
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
@@ -98,6 +99,12 @@ pub struct RemoteDatabaseOptions {
|
||||
/// These options are only used for LanceDB Enterprise and only a subset of options
|
||||
/// are supported.
|
||||
pub storage_options: HashMap<String, String>,
|
||||
/// Enable MemWAL write path for high-performance streaming writes.
|
||||
///
|
||||
/// When enabled, write operations (insert, merge_insert, etc.) will use
|
||||
/// the MemWAL architecture which buffers writes in memory and Write-Ahead Log
|
||||
/// before asynchronously merging to the base table.
|
||||
pub mem_wal_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
impl RemoteDatabaseOptions {
|
||||
@@ -109,6 +116,9 @@ impl RemoteDatabaseOptions {
|
||||
let api_key = map.get(OPT_REMOTE_API_KEY).cloned();
|
||||
let region = map.get(OPT_REMOTE_REGION).cloned();
|
||||
let host_override = map.get(OPT_REMOTE_HOST_OVERRIDE).cloned();
|
||||
let mem_wal_enabled = map
|
||||
.get(OPT_REMOTE_MEM_WAL_ENABLED)
|
||||
.map(|v| v.to_lowercase() == "true");
|
||||
let storage_options = map
|
||||
.iter()
|
||||
.filter(|(key, _)| !key.starts_with(OPT_REMOTE_PREFIX))
|
||||
@@ -119,6 +129,7 @@ impl RemoteDatabaseOptions {
|
||||
region,
|
||||
host_override,
|
||||
storage_options,
|
||||
mem_wal_enabled,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -137,6 +148,12 @@ impl DatabaseOptions for RemoteDatabaseOptions {
|
||||
if let Some(host_override) = &self.host_override {
|
||||
map.insert(OPT_REMOTE_HOST_OVERRIDE.to_string(), host_override.clone());
|
||||
}
|
||||
if let Some(mem_wal_enabled) = &self.mem_wal_enabled {
|
||||
map.insert(
|
||||
OPT_REMOTE_MEM_WAL_ENABLED.to_string(),
|
||||
mem_wal_enabled.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,6 +198,20 @@ impl RemoteDatabaseOptionsBuilder {
|
||||
self.options.host_override = Some(host_override);
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable MemWAL write path for high-performance streaming writes.
|
||||
///
|
||||
/// When enabled, write operations will use the MemWAL architecture
|
||||
/// which buffers writes in memory and Write-Ahead Log before
|
||||
/// asynchronously merging to the base table.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `enabled` - Whether to enable MemWAL writes
|
||||
pub fn mem_wal_enabled(mut self, enabled: bool) -> Self {
|
||||
self.options.mem_wal_enabled = Some(enabled);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -777,7 +808,12 @@ impl RemoteOptions {
|
||||
|
||||
impl From<StorageOptions> for RemoteOptions {
|
||||
fn from(options: StorageOptions) -> Self {
|
||||
let supported_opts = vec!["account_name", "azure_storage_account_name"];
|
||||
let supported_opts = vec![
|
||||
"account_name",
|
||||
"azure_storage_account_name",
|
||||
"azure_client_id",
|
||||
"azure_tenant_id",
|
||||
];
|
||||
let mut filtered = HashMap::new();
|
||||
for opt in supported_opts {
|
||||
if let Some(v) = options.0.get(opt) {
|
||||
|
||||
@@ -62,6 +62,7 @@ use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
|
||||
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
|
||||
const METRIC_TYPE_KEY: &str = "metric_type";
|
||||
const INDEX_TYPE_KEY: &str = "index_type";
|
||||
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
|
||||
@@ -1359,6 +1360,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
self.check_mutable().await?;
|
||||
|
||||
let timeout = params.timeout;
|
||||
let mem_wal = params.mem_wal;
|
||||
|
||||
let query = MergeInsertRequest::try_from(params)?;
|
||||
let mut request = self
|
||||
@@ -1374,6 +1376,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
}
|
||||
|
||||
if mem_wal {
|
||||
request = request.header(MEM_WAL_ENABLED_HEADER, "true");
|
||||
}
|
||||
|
||||
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
|
||||
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
@@ -55,6 +55,7 @@ pub struct MergeInsertBuilder {
|
||||
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
|
||||
pub(crate) timeout: Option<Duration>,
|
||||
pub(crate) use_index: bool,
|
||||
pub(crate) mem_wal: bool,
|
||||
}
|
||||
|
||||
impl MergeInsertBuilder {
|
||||
@@ -69,6 +70,7 @@ impl MergeInsertBuilder {
|
||||
when_not_matched_by_source_delete_filt: None,
|
||||
timeout: None,
|
||||
use_index: true,
|
||||
mem_wal: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,13 +150,65 @@ impl MergeInsertBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
|
||||
///
|
||||
/// When enabled, the merge insert will route data through a memory node service
|
||||
/// that buffers writes before flushing to storage. This is only supported for
|
||||
/// remote (LanceDB Cloud) tables.
|
||||
///
|
||||
/// If not set, defaults to `false`.
|
||||
pub fn mem_wal(&mut self, enabled: bool) -> &mut Self {
|
||||
self.mem_wal = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Executes the merge insert operation
|
||||
///
|
||||
/// Returns version and statistics about the merge operation including the number of rows
|
||||
/// inserted, updated, and deleted.
|
||||
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<MergeResult> {
|
||||
// Validate MemWAL constraints before execution
|
||||
if self.mem_wal {
|
||||
self.validate_mem_wal_pattern()?;
|
||||
}
|
||||
self.table.clone().merge_insert(self, new_data).await
|
||||
}
|
||||
|
||||
/// Validate that the merge insert pattern is supported by MemWAL.
|
||||
///
|
||||
/// MemWAL only supports the upsert pattern:
|
||||
/// - when_matched_update_all (without filter)
|
||||
/// - when_not_matched_insert_all
|
||||
/// - NO when_not_matched_by_source_delete
|
||||
fn validate_mem_wal_pattern(&self) -> Result<()> {
|
||||
// Must have when_matched_update_all without filter
|
||||
if !self.when_matched_update_all {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "MemWAL requires when_matched_update_all() to be set".to_string(),
|
||||
});
|
||||
}
|
||||
if self.when_matched_update_all_filt.is_some() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "MemWAL does not support conditional when_matched_update_all (no filter allowed)".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Must have when_not_matched_insert_all
|
||||
if !self.when_not_matched_insert_all {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "MemWAL requires when_not_matched_insert_all() to be set".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Must NOT have when_not_matched_by_source_delete
|
||||
if self.when_not_matched_by_source_delete {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "MemWAL does not support when_not_matched_by_source_delete()".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal implementation of the merge insert logic
|
||||
@@ -165,6 +219,14 @@ pub(crate) async fn execute_merge_insert(
|
||||
params: MergeInsertBuilder,
|
||||
new_data: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<MergeResult> {
|
||||
if params.mem_wal {
|
||||
return Err(Error::NotSupported {
|
||||
message: "MemWAL is not supported for native (local) tables. \
|
||||
MemWAL is only available for remote (LanceDB Cloud) tables."
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let dataset = table.dataset.get().await?;
|
||||
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
|
||||
match (
|
||||
@@ -324,4 +386,139 @@ mod tests {
|
||||
merge_insert_builder.execute(new_batches).await.unwrap();
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 25);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mem_wal_validation_valid_pattern() {
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let batches = merge_insert_test_batches(0, 0);
|
||||
let table = conn
|
||||
.create_table("mem_wal_test", batches)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Valid MemWAL pattern: when_matched_update_all + when_not_matched_insert_all
|
||||
let new_batches = merge_insert_test_batches(5, 1);
|
||||
let mut builder = table.merge_insert(&["i"]);
|
||||
builder.when_matched_update_all(None);
|
||||
builder.when_not_matched_insert_all();
|
||||
builder.mem_wal(true);
|
||||
|
||||
// Should fail because native tables don't support MemWAL, but validation passes
|
||||
let result = builder.execute(new_batches).await;
|
||||
assert!(result.is_err());
|
||||
let err = result.unwrap_err().to_string();
|
||||
assert!(
|
||||
err.contains("MemWAL is not supported for native"),
|
||||
"Expected native table error, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mem_wal_validation_missing_when_matched() {
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let batches = merge_insert_test_batches(0, 0);
|
||||
let table = conn
|
||||
.create_table("mem_wal_test2", batches)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Missing when_matched_update_all
|
||||
let new_batches = merge_insert_test_batches(5, 1);
|
||||
let mut builder = table.merge_insert(&["i"]);
|
||||
builder.when_not_matched_insert_all();
|
||||
builder.mem_wal(true);
|
||||
|
||||
let result = builder.execute(new_batches).await;
|
||||
assert!(result.is_err());
|
||||
let err = result.unwrap_err().to_string();
|
||||
assert!(
|
||||
err.contains("requires when_matched_update_all"),
|
||||
"Expected validation error, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mem_wal_validation_missing_when_not_matched() {
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let batches = merge_insert_test_batches(0, 0);
|
||||
let table = conn
|
||||
.create_table("mem_wal_test3", batches)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Missing when_not_matched_insert_all
|
||||
let new_batches = merge_insert_test_batches(5, 1);
|
||||
let mut builder = table.merge_insert(&["i"]);
|
||||
builder.when_matched_update_all(None);
|
||||
builder.mem_wal(true);
|
||||
|
||||
let result = builder.execute(new_batches).await;
|
||||
assert!(result.is_err());
|
||||
let err = result.unwrap_err().to_string();
|
||||
assert!(
|
||||
err.contains("requires when_not_matched_insert_all"),
|
||||
"Expected validation error, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mem_wal_validation_with_filter() {
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let batches = merge_insert_test_batches(0, 0);
|
||||
let table = conn
|
||||
.create_table("mem_wal_test4", batches)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// With conditional filter - not allowed
|
||||
let new_batches = merge_insert_test_batches(5, 1);
|
||||
let mut builder = table.merge_insert(&["i"]);
|
||||
builder.when_matched_update_all(Some("target.age > 0".to_string()));
|
||||
builder.when_not_matched_insert_all();
|
||||
builder.mem_wal(true);
|
||||
|
||||
let result = builder.execute(new_batches).await;
|
||||
assert!(result.is_err());
|
||||
let err = result.unwrap_err().to_string();
|
||||
assert!(
|
||||
err.contains("does not support conditional"),
|
||||
"Expected filter validation error, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mem_wal_validation_with_delete() {
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let batches = merge_insert_test_batches(0, 0);
|
||||
let table = conn
|
||||
.create_table("mem_wal_test5", batches)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// With when_not_matched_by_source_delete - not allowed
|
||||
let new_batches = merge_insert_test_batches(5, 1);
|
||||
let mut builder = table.merge_insert(&["i"]);
|
||||
builder.when_matched_update_all(None);
|
||||
builder.when_not_matched_insert_all();
|
||||
builder.when_not_matched_by_source_delete(None);
|
||||
builder.mem_wal(true);
|
||||
|
||||
let result = builder.execute(new_batches).await;
|
||||
assert!(result.is_err());
|
||||
let err = result.unwrap_err().to_string();
|
||||
assert!(
|
||||
err.contains("does not support when_not_matched_by_source_delete"),
|
||||
"Expected delete validation error, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user