Compare commits

..

4 Commits

Author SHA1 Message Date
Lance Release
f31561c5bb Bump version: 0.30.0-beta.3 → 0.30.0-beta.4 2026-03-09 08:45:25 +00:00
Jack Ye
e0c5ceac03 fix: propagate managed versioning for namespace connection (#3111)
Without this fix, if user directly use the native table to do operations
like `add_columns`, even if it is configured to use namespace db
connection, it is not really propagated through.

The fix is to bring lancedb's python binding up to date and do a similar
implementation as https://github.com/lance-format/lance/pull/5968, and
make sure the namespace is fully propagated through all the related
calls.

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-09 01:44:31 -07:00
Prashanth Rao
e93bb3355a docs: add meth/func names to mkdocstrings (#3101)
LanceDB's SDK API docs do not currently show method names under any
given object, and this makes it harder to quickly understand and find
relevant method names for a given class. Geneva docs show the available
methods in the right navigation.

This PR standardizes the appearance of the LanceDB SDK API in the docs
to be more similar to Geneva's.
<img width="1386" height="792" alt="image"
src="https://github.com/user-attachments/assets/30816591-d6d5-495d-886d-e234beeb6059"
/>

<img width="897" height="540" alt="image"
src="https://github.com/user-attachments/assets/d5491b6b-c7bf-4d3b-8b15-1a1a7700e7c9"
/>
2026-03-06 08:54:45 -08:00
Will Jones
b75991eb07 fix: propagate cast errors in add() (#3075)
When we write data with `add()`, we can input data to the table's
schema. However, we were using "safe" mode, which propagates errors as
nulls. For example, if you pass `u64::max` into a field that is a `u32`,
it will just write null instead of giving overflow error. Now it
propagates the overflow. This is the same behavior as other systems like
DuckDB.

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 20:24:50 -08:00
30 changed files with 1316 additions and 515 deletions

116
Cargo.lock generated
View File

@@ -3088,8 +3088,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow-array",
"rand 0.9.2",
@@ -4260,8 +4260,8 @@ dependencies = [
[[package]]
name = "lance"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow",
"arrow-arith",
@@ -4315,7 +4315,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"snafu 0.9.0",
"snafu",
"tantivy",
"tokio",
"tokio-stream",
@@ -4327,8 +4327,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4338,7 +4338,6 @@ dependencies = [
"arrow-schema",
"arrow-select",
"bytes",
"futures",
"getrandom 0.2.16",
"half",
"jsonb",
@@ -4348,8 +4347,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrayref",
"paste",
@@ -4358,8 +4357,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4385,7 +4384,7 @@ dependencies = [
"rand 0.9.2",
"roaring",
"serde_json",
"snafu 0.9.0",
"snafu",
"tempfile",
"tokio",
"tokio-stream",
@@ -4396,8 +4395,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow",
"arrow-array",
@@ -4420,15 +4419,15 @@ dependencies = [
"pin-project",
"prost",
"prost-build",
"snafu 0.9.0",
"snafu",
"tokio",
"tracing",
]
[[package]]
name = "lance-datagen"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow",
"arrow-array",
@@ -4446,8 +4445,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4474,7 +4473,7 @@ dependencies = [
"prost-build",
"prost-types",
"rand 0.9.2",
"snafu 0.9.0",
"snafu",
"strum",
"tokio",
"tracing",
@@ -4484,8 +4483,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4510,15 +4509,15 @@ dependencies = [
"prost",
"prost-build",
"prost-types",
"snafu 0.9.0",
"snafu",
"tokio",
"tracing",
]
[[package]]
name = "lance-index"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow",
"arrow-arith",
@@ -4570,7 +4569,7 @@ dependencies = [
"serde",
"serde_json",
"smallvec",
"snafu 0.9.0",
"snafu",
"tantivy",
"tempfile",
"tokio",
@@ -4581,8 +4580,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow",
"arrow-arith",
@@ -4614,7 +4613,7 @@ dependencies = [
"prost",
"rand 0.9.2",
"serde",
"snafu 0.9.0",
"snafu",
"tempfile",
"tokio",
"tracing",
@@ -4623,8 +4622,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4640,21 +4639,21 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow",
"async-trait",
"bytes",
"lance-core",
"lance-namespace-reqwest-client",
"snafu 0.9.0",
"snafu",
]
[[package]]
name = "lance-namespace-impls"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow",
"arrow-ipc",
@@ -4676,7 +4675,7 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"snafu 0.9.0",
"snafu",
"tokio",
"tower",
"tower-http 0.5.2",
@@ -4698,8 +4697,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow",
"arrow-array",
@@ -4729,7 +4728,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"snafu 0.9.0",
"snafu",
"tokio",
"tracing",
"url",
@@ -4738,8 +4737,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "3.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.3#de393a26a068dd297929ca7d798e43dc31c57337"
version = "3.0.0-rc.2"
source = "git+https://github.com/lance-format/lance.git?tag=v3.0.0-rc.2#3fb3e705b8a25ab1bb0fc9e1e0158e8a13356181"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4820,7 +4819,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"snafu 0.8.9",
"snafu",
"tempfile",
"test-log",
"tokenizers",
@@ -4856,17 +4855,21 @@ version = "0.30.0-beta.3"
dependencies = [
"arrow",
"async-trait",
"bytes",
"env_logger",
"futures",
"lance-core",
"lance-io",
"lance-namespace",
"lance-namespace-impls",
"lancedb",
"pin-project",
"pyo3",
"pyo3-async-runtimes",
"pyo3-build-config",
"snafu 0.8.9",
"serde",
"serde_json",
"snafu",
"tokio",
]
@@ -7778,16 +7781,7 @@ version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2"
dependencies = [
"snafu-derive 0.8.9",
]
[[package]]
name = "snafu"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1d4bced6a69f90b2056c03dcff2c4737f98d6fb9e0853493996e1d253ca29c6"
dependencies = [
"snafu-derive 0.9.0",
"snafu-derive",
]
[[package]]
@@ -7802,18 +7796,6 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "snafu-derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54254b8531cafa275c5e096f62d48c81435d1015405a91198ddb11e967301d40"
dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "socket2"
version = "0.5.10"

View File

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.0.0-rc.2", default-features = false, "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.0.0-rc.2", "tag" = "v3.0.0-rc.2", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }

View File

@@ -52,14 +52,21 @@ plugins:
options:
docstring_style: numpy
heading_level: 3
show_source: true
show_symbol_type_in_heading: true
show_signature_annotations: true
show_root_heading: true
show_docstring_examples: true
show_docstring_attributes: false
show_docstring_other_parameters: true
show_symbol_type_heading: true
show_labels: false
show_if_no_docstring: true
show_source: false
members_order: source
docstring_section_style: list
signature_crossrefs: true
separate_signature: true
filters:
- "!^_"
import:
# for cross references
- https://arrow.apache.org/docs/objects.inv
@@ -113,7 +120,7 @@ markdown_extensions:
emoji_index: !!python/name:material.extensions.emoji.twemoji
emoji_generator: !!python/name:material.extensions.emoji.to_svg
- markdown.extensions.toc:
toc_depth: 3
toc_depth: 4
permalink: true
permalink_title: Anchor link to this section

View File

@@ -145,7 +145,6 @@ impl From<ClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: config.id_delimiter,
tls_config: config.tls_config.map(Into::into),
header_provider: None, // the header provider is set separately later
mem_wal_enabled: None, // mem_wal is set per-operation in merge_insert
}
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.30.0-beta.3"
current_version = "0.30.0-beta.4"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.30.0-beta.3"
version = "0.30.0-beta.4"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true
@@ -16,9 +16,11 @@ crate-type = ["cdylib"]
[dependencies]
arrow = { version = "57.2", features = ["pyarrow"] }
async-trait = "0.1"
bytes = "1"
lancedb = { path = "../rust/lancedb", default-features = false }
lance-core.workspace = true
lance-namespace.workspace = true
lance-namespace-impls.workspace = true
lance-io.workspace = true
env_logger.workspace = true
pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] }
@@ -28,6 +30,8 @@ pyo3-async-runtimes = { version = "0.26", features = [
] }
pin-project = "1.1.5"
futures.workspace = true
serde = "1"
serde_json = "1"
snafu.workspace = true
tokio = { version = "1.40", features = ["sync"] }

View File

@@ -45,7 +45,7 @@ repository = "https://github.com/lancedb/lancedb"
[project.optional-dependencies]
pylance = [
"pylance>=1.0.0b14",
"pylance>=4.0.0b7",
]
tests = [
"aiohttp",
@@ -59,9 +59,9 @@ tests = [
"polars>=0.19, <=1.3.0",
"tantivy",
"pyarrow-stubs",
"pylance>=1.0.0b14,<3.0.0",
"pylance>=4.0.0b7",
"requests",
"datafusion<52",
"datafusion>=52,<53",
]
dev = [
"ruff",

View File

@@ -8,7 +8,7 @@ from abc import abstractmethod
from datetime import timedelta
from pathlib import Path
import sys
from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Union
if sys.version_info >= (3, 12):
from typing import override
@@ -1541,6 +1541,8 @@ class AsyncConnection(object):
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
) -> AsyncTable:
"""Open a Lance Table in the database.
@@ -1573,6 +1575,9 @@ class AsyncConnection(object):
The explicit location (URI) of the table. If provided, the table will be
opened from this location instead of deriving it from the database URI
and table name.
managed_versioning: bool, optional
Whether managed versioning is enabled for this table. If provided,
avoids a redundant describe_table call when namespace_client is set.
Returns
-------
@@ -1587,6 +1592,8 @@ class AsyncConnection(object):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
return AsyncTable(table)

View File

@@ -34,7 +34,6 @@ class LanceMergeInsertBuilder(object):
self._when_not_matched_by_source_condition = None
self._timeout = None
self._use_index = True
self._mem_wal = False
def when_matched_update_all(
self, *, where: Optional[str] = None
@@ -97,47 +96,6 @@ class LanceMergeInsertBuilder(object):
self._use_index = use_index
return self
def mem_wal(self, enabled: bool = True) -> LanceMergeInsertBuilder:
"""
Enable MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
When enabled, the merge insert will route data through a memory node service
that buffers writes before flushing to storage. This is only supported for
remote (LanceDB Cloud) tables.
**Important:** MemWAL only supports the upsert pattern. You must use:
- `when_matched_update_all()` (without a filter condition)
- `when_not_matched_insert_all()`
MemWAL does NOT support:
- `when_matched_update_all(where=...)` with a filter condition
- `when_not_matched_by_source_delete()`
Parameters
----------
enabled: bool
Whether to enable MemWAL mode. Defaults to `True`.
Raises
------
NotImplementedError
If used on a native (local) table, as MemWAL is only supported for
remote tables.
ValueError
If the merge insert pattern is not supported by MemWAL.
Examples
--------
>>> # Correct usage with MemWAL
>>> table.merge_insert(["id"]) \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .mem_wal() \\
... .execute(new_data)
"""
self._mem_wal = enabled
return self
def execute(
self,
new_data: DATA,

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
import asyncio
import sys
from typing import Dict, Iterable, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, Union
if sys.version_info >= (3, 12):
from typing import override
@@ -240,7 +240,7 @@ class LanceNamespaceDBConnection(DBConnection):
session : Optional[Session]
A session to use for this connection
"""
self._ns = namespace
self._namespace_client = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
@@ -269,7 +269,7 @@ class LanceNamespaceDBConnection(DBConnection):
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
response = self._namespace_client.list_tables(request)
return response.tables if response.tables else []
@override
@@ -309,7 +309,9 @@ class LanceNamespaceDBConnection(DBConnection):
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._ns.describe_table(describe_request)
describe_response = self._namespace_client.describe_table(
describe_request
)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
@@ -323,7 +325,7 @@ class LanceNamespaceDBConnection(DBConnection):
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._ns.declare_table(declare_request)
declare_response = self._namespace_client.declare_table(declare_request)
if not declare_response.location:
raise ValueError(
@@ -353,7 +355,7 @@ class LanceNamespaceDBConnection(DBConnection):
# Only create if namespace returned storage_options (not None)
if storage_options_provider is None and namespace_storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
namespace=self._namespace_client,
table_id=table_id,
)
@@ -371,6 +373,7 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
location=location,
namespace_client=self._namespace_client,
)
return tbl
@@ -389,7 +392,7 @@ class LanceNamespaceDBConnection(DBConnection):
namespace = []
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._ns.describe_table(request)
response = self._namespace_client.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
@@ -402,10 +405,14 @@ class LanceNamespaceDBConnection(DBConnection):
# Only create if namespace returned storage_options (not None)
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
namespace=self._namespace_client,
table_id=table_id,
)
# Pass managed_versioning to avoid redundant describe_table call in Rust.
# Convert None to False since we already have the answer from describe_table.
managed_versioning = response.managed_versioning is True
return self._lance_table_from_uri(
name,
response.location,
@@ -413,6 +420,8 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
namespace_client=self._namespace_client,
managed_versioning=managed_versioning,
)
@override
@@ -422,7 +431,7 @@ class LanceNamespaceDBConnection(DBConnection):
namespace = []
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
self._ns.drop_table(request)
self._namespace_client.drop_table(request)
@override
def rename_table(
@@ -484,7 +493,7 @@ class LanceNamespaceDBConnection(DBConnection):
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
response = self._namespace_client.list_namespaces(request)
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
@@ -520,7 +529,7 @@ class LanceNamespaceDBConnection(DBConnection):
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._ns.create_namespace(request)
response = self._namespace_client.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -555,7 +564,7 @@ class LanceNamespaceDBConnection(DBConnection):
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._ns.drop_namespace(request)
response = self._namespace_client.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
@@ -581,7 +590,7 @@ class LanceNamespaceDBConnection(DBConnection):
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._ns.describe_namespace(request)
response = self._namespace_client.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -615,7 +624,7 @@ class LanceNamespaceDBConnection(DBConnection):
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
response = self._namespace_client.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,
@@ -630,6 +639,8 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
index_cache_size: Optional[int] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
) -> LanceTable:
# Open a table directly from a URI using the location parameter
# Note: storage_options should already be merged by the caller
@@ -643,6 +654,8 @@ class LanceNamespaceDBConnection(DBConnection):
)
# Open the table using the temporary connection with the location parameter
# Pass namespace_client to enable managed versioning support
# Pass managed_versioning to avoid redundant describe_table call
return LanceTable.open(
temp_conn,
name,
@@ -651,6 +664,8 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=table_uri,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
@@ -685,7 +700,7 @@ class AsyncLanceNamespaceDBConnection:
session : Optional[Session]
A session to use for this connection
"""
self._ns = namespace
self._namespace_client = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
@@ -713,7 +728,7 @@ class AsyncLanceNamespaceDBConnection:
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
response = self._namespace_client.list_tables(request)
return response.tables if response.tables else []
async def create_table(
@@ -750,7 +765,9 @@ class AsyncLanceNamespaceDBConnection:
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._ns.describe_table(describe_request)
describe_response = self._namespace_client.describe_table(
describe_request
)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
@@ -764,7 +781,7 @@ class AsyncLanceNamespaceDBConnection:
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._ns.declare_table(declare_request)
declare_response = self._namespace_client.declare_table(declare_request)
if not declare_response.location:
raise ValueError(
@@ -797,7 +814,7 @@ class AsyncLanceNamespaceDBConnection:
and namespace_storage_options is not None
):
provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
namespace=self._namespace_client,
table_id=table_id,
)
else:
@@ -817,6 +834,7 @@ class AsyncLanceNamespaceDBConnection:
storage_options=merged_storage_options,
storage_options_provider=provider,
location=location,
namespace_client=self._namespace_client,
)
lance_table = await asyncio.to_thread(_create_table)
@@ -837,7 +855,7 @@ class AsyncLanceNamespaceDBConnection:
namespace = []
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._ns.describe_table(request)
response = self._namespace_client.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
@@ -849,10 +867,14 @@ class AsyncLanceNamespaceDBConnection:
# Create a storage options provider if not provided by user
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
namespace=self._namespace_client,
table_id=table_id,
)
# Capture managed_versioning from describe response.
# Convert None to False since we already have the answer from describe_table.
managed_versioning = response.managed_versioning is True
# Open table in a thread
def _open_table():
temp_conn = LanceDBConnection(
@@ -870,6 +892,8 @@ class AsyncLanceNamespaceDBConnection:
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=response.location,
namespace_client=self._namespace_client,
managed_versioning=managed_versioning,
)
lance_table = await asyncio.to_thread(_open_table)
@@ -881,7 +905,7 @@ class AsyncLanceNamespaceDBConnection:
namespace = []
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
self._ns.drop_table(request)
self._namespace_client.drop_table(request)
async def rename_table(
self,
@@ -943,7 +967,7 @@ class AsyncLanceNamespaceDBConnection:
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
response = self._namespace_client.list_namespaces(request)
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
@@ -978,7 +1002,7 @@ class AsyncLanceNamespaceDBConnection:
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._ns.create_namespace(request)
response = self._namespace_client.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -1012,7 +1036,7 @@ class AsyncLanceNamespaceDBConnection:
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._ns.drop_namespace(request)
response = self._namespace_client.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
@@ -1039,7 +1063,7 @@ class AsyncLanceNamespaceDBConnection:
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._ns.describe_namespace(request)
response = self._namespace_client.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -1072,7 +1096,7 @@ class AsyncLanceNamespaceDBConnection:
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
response = self._namespace_client.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,

View File

@@ -1746,6 +1746,8 @@ class LanceTable(Table):
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
_async: AsyncTable = None,
):
if namespace is None:
@@ -1753,6 +1755,7 @@ class LanceTable(Table):
self._conn = connection
self._namespace = namespace
self._location = location # Store location for use in _dataset_path
self._namespace_client = namespace_client
if _async is not None:
self._table = _async
else:
@@ -1764,6 +1767,8 @@ class LanceTable(Table):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
)
@@ -1806,6 +1811,8 @@ class LanceTable(Table):
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
):
if namespace is None:
namespace = []
@@ -1817,6 +1824,8 @@ class LanceTable(Table):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
# check the dataset exists
@@ -1848,6 +1857,16 @@ class LanceTable(Table):
"Please install with `pip install pylance`."
)
if self._namespace_client is not None:
table_id = self._namespace + [self.name]
return lance.dataset(
version=self.version,
storage_options=self._conn.storage_options,
namespace=self._namespace_client,
table_id=table_id,
**kwargs,
)
return lance.dataset(
self._dataset_path,
version=self.version,
@@ -2713,6 +2732,7 @@ class LanceTable(Table):
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
):
"""
Create a new table.
@@ -2773,6 +2793,7 @@ class LanceTable(Table):
self._conn = db
self._namespace = namespace
self._location = location
self._namespace_client = namespace_client
if data_storage_version is not None:
warnings.warn(
@@ -4181,7 +4202,6 @@ class AsyncTable:
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
use_index=merge._use_index,
mem_wal=merge._mem_wal,
),
)

View File

@@ -326,6 +326,24 @@ def test_add_struct(mem_db: DBConnection):
table = mem_db.create_table("test2", schema=schema)
table.add(data)
struct_type = pa.struct(
[
("b", pa.int64()),
("a", pa.int64()),
]
)
expected = pa.table(
{
"s_list": [
[
pa.scalar({"b": 1, "a": 2}, type=struct_type),
pa.scalar({"b": 4, "a": None}, type=struct_type),
]
],
}
)
assert table.to_arrow() == expected
def test_add_subschema(mem_db: DBConnection):
schema = pa.schema(

View File

@@ -17,7 +17,8 @@ use pyo3::{
use pyo3_async_runtimes::tokio::future_into_py;
use crate::{
error::PythonErrorExt, storage_options::py_object_to_storage_options_provider, table::Table,
error::PythonErrorExt, namespace::extract_namespace_arc,
storage_options::py_object_to_storage_options_provider, table::Table,
};
#[pyclass]
@@ -182,7 +183,8 @@ impl Connection {
})
}
#[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None))]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None, namespace_client=None, managed_versioning=None))]
pub fn open_table(
self_: PyRef<'_, Self>,
name: String,
@@ -191,11 +193,13 @@ impl Connection {
storage_options_provider: Option<Py<PyAny>>,
index_cache_size: Option<u32>,
location: Option<String>,
namespace_client: Option<Py<PyAny>>,
managed_versioning: Option<bool>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let mut builder = inner.open_table(name);
builder = builder.namespace(namespace);
builder = builder.namespace(namespace.clone());
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
@@ -209,6 +213,20 @@ impl Connection {
if let Some(location) = location {
builder = builder.location(location);
}
// Extract namespace client from Python object if provided
let ns_client = if let Some(ns_obj) = namespace_client {
let py = self_.py();
Some(extract_namespace_arc(py, ns_obj)?)
} else {
None
};
if let Some(ns_client) = ns_client {
builder = builder.namespace_client(ns_client);
}
// Pass managed_versioning if provided to avoid redundant describe_table call
if let Some(enabled) = managed_versioning {
builder = builder.managed_versioning(enabled);
}
future_into_py(self_.py(), async move {
let table = builder.execute().await.infer_error()?;
@@ -506,7 +524,6 @@ pub struct PyClientConfig {
id_delimiter: Option<String>,
tls_config: Option<PyClientTlsConfig>,
header_provider: Option<Py<PyAny>>,
mem_wal_enabled: Option<bool>,
}
#[derive(FromPyObject)]
@@ -591,7 +608,6 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: value.id_delimiter,
tls_config: value.tls_config.map(Into::into),
header_provider,
mem_wal_enabled: value.mem_wal_enabled,
}
}
}

View File

@@ -23,6 +23,7 @@ pub mod connection;
pub mod error;
pub mod header;
pub mod index;
pub mod namespace;
pub mod permutation;
pub mod query;
pub mod session;

746
python/src/namespace.rs Normal file
View File

@@ -0,0 +1,746 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Namespace utilities for Python bindings
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use lance_namespace::LanceNamespace as LanceNamespaceTrait;
use lance_namespace::models::*;
use pyo3::prelude::*;
use pyo3::types::PyDict;
/// Wrapper that allows any Python object implementing LanceNamespace protocol
/// to be used as a Rust LanceNamespace.
///
/// This is similar to PyLanceNamespace in lance's Python bindings - it wraps a Python
/// object and calls back into Python when namespace methods are invoked.
pub struct PyLanceNamespace {
py_namespace: Arc<Py<PyAny>>,
namespace_id: String,
}
impl PyLanceNamespace {
/// Create a new PyLanceNamespace wrapper around a Python namespace object.
pub fn new(_py: Python<'_>, py_namespace: &Bound<'_, PyAny>) -> PyResult<Self> {
let namespace_id = py_namespace
.call_method0("namespace_id")?
.extract::<String>()?;
Ok(Self {
py_namespace: Arc::new(py_namespace.clone().unbind()),
namespace_id,
})
}
/// Create an Arc<dyn LanceNamespace> from a Python namespace object.
pub fn create_arc(
py: Python<'_>,
py_namespace: &Bound<'_, PyAny>,
) -> PyResult<Arc<dyn LanceNamespaceTrait>> {
let wrapper = Self::new(py, py_namespace)?;
Ok(Arc::new(wrapper))
}
}
impl std::fmt::Debug for PyLanceNamespace {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PyLanceNamespace {{ id: {} }}", self.namespace_id)
}
}
/// Get or create the DictWithModelDump class in Python.
/// This class acts like a dict but also has model_dump() method.
/// This allows it to work with both:
/// - depythonize (which expects a dict/Mapping)
/// - Python code that calls .model_dump() (like DirectoryNamespace wrapper)
fn get_dict_with_model_dump_class(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
// Use a module-level cache via __builtins__
let builtins = py.import("builtins")?;
if builtins.hasattr("_DictWithModelDump")? {
return builtins.getattr("_DictWithModelDump");
}
// Create the class using exec
let locals = PyDict::new(py);
py.run(
c"class DictWithModelDump(dict):
def model_dump(self):
return dict(self)",
None,
Some(&locals),
)?;
let class = locals.get_item("DictWithModelDump")?.ok_or_else(|| {
pyo3::exceptions::PyRuntimeError::new_err("Failed to create DictWithModelDump class")
})?;
// Cache it
builtins.setattr("_DictWithModelDump", &class)?;
Ok(class)
}
/// Helper to call a Python namespace method with JSON serialization.
/// For methods that take a request and return a response.
/// Uses DictWithModelDump to pass a dict that also has model_dump() method,
/// making it compatible with both depythonize and Python wrappers.
async fn call_py_method<Req, Resp>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: serde::de::DeserializeOwned + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
let response_json = tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump so it works with both depythonize and .model_dump()
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
// Convert response to dict, then to JSON
// Pydantic models have model_dump() method
let result_dict = if result.bind(py).hasattr("model_dump")? {
result.call_method0(py, "model_dump")?
} else {
result
};
let response_json: String = json_module
.call_method1("dumps", (result_dict,))?
.extract()?;
Ok::<_, PyErr>(response_json)
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})?;
serde_json::from_str(&response_json).map_err(|e| {
lance_core::Error::io(
format!("Failed to deserialize response from {}: {}", method_name, e),
Default::default(),
)
})
}
/// Helper for methods that return () on success
async fn call_py_method_unit<Req>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<()>
where
Req: serde::Serialize + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
py_namespace.call_method1(py, method_name, (request_arg,))?;
Ok::<_, PyErr>(())
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})
}
/// Helper for methods that return a primitive type
async fn call_py_method_primitive<Req, Resp>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: for<'py> pyo3::FromPyObject<'py> + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
let value: Resp = result.extract(py)?;
Ok::<_, PyErr>(value)
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})
}
/// Helper for methods that return Bytes
async fn call_py_method_bytes<Req>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<Bytes>
where
Req: serde::Serialize + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
let bytes_data: Vec<u8> = result.extract(py)?;
Ok::<_, PyErr>(Bytes::from(bytes_data))
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})
}
/// Helper for methods that take request + data and return a response
async fn call_py_method_with_data<Req, Resp>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
data: Bytes,
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: serde::de::DeserializeOwned + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
let response_json = tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Pass request and bytes to Python method
let py_bytes = pyo3::types::PyBytes::new(py, &data);
let result = py_namespace.call_method1(py, method_name, (request_arg, py_bytes))?;
// Convert response dict to JSON
let response_json: String = json_module.call_method1("dumps", (result,))?.extract()?;
Ok::<_, PyErr>(response_json)
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})?;
serde_json::from_str(&response_json).map_err(|e| {
lance_core::Error::io(
format!("Failed to deserialize response from {}: {}", method_name, e),
Default::default(),
)
})
}
#[async_trait]
impl LanceNamespaceTrait for PyLanceNamespace {
fn namespace_id(&self) -> String {
self.namespace_id.clone()
}
async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> lance_core::Result<ListNamespacesResponse> {
call_py_method(self.py_namespace.clone(), "list_namespaces", request).await
}
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> lance_core::Result<DescribeNamespaceResponse> {
call_py_method(self.py_namespace.clone(), "describe_namespace", request).await
}
async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> lance_core::Result<CreateNamespaceResponse> {
call_py_method(self.py_namespace.clone(), "create_namespace", request).await
}
async fn drop_namespace(
&self,
request: DropNamespaceRequest,
) -> lance_core::Result<DropNamespaceResponse> {
call_py_method(self.py_namespace.clone(), "drop_namespace", request).await
}
async fn namespace_exists(&self, request: NamespaceExistsRequest) -> lance_core::Result<()> {
call_py_method_unit(self.py_namespace.clone(), "namespace_exists", request).await
}
async fn list_tables(
&self,
request: ListTablesRequest,
) -> lance_core::Result<ListTablesResponse> {
call_py_method(self.py_namespace.clone(), "list_tables", request).await
}
async fn describe_table(
&self,
request: DescribeTableRequest,
) -> lance_core::Result<DescribeTableResponse> {
call_py_method(self.py_namespace.clone(), "describe_table", request).await
}
async fn register_table(
&self,
request: RegisterTableRequest,
) -> lance_core::Result<RegisterTableResponse> {
call_py_method(self.py_namespace.clone(), "register_table", request).await
}
async fn table_exists(&self, request: TableExistsRequest) -> lance_core::Result<()> {
call_py_method_unit(self.py_namespace.clone(), "table_exists", request).await
}
async fn drop_table(&self, request: DropTableRequest) -> lance_core::Result<DropTableResponse> {
call_py_method(self.py_namespace.clone(), "drop_table", request).await
}
async fn deregister_table(
&self,
request: DeregisterTableRequest,
) -> lance_core::Result<DeregisterTableResponse> {
call_py_method(self.py_namespace.clone(), "deregister_table", request).await
}
async fn count_table_rows(&self, request: CountTableRowsRequest) -> lance_core::Result<i64> {
call_py_method_primitive(self.py_namespace.clone(), "count_table_rows", request).await
}
async fn create_table(
&self,
request: CreateTableRequest,
request_data: Bytes,
) -> lance_core::Result<CreateTableResponse> {
call_py_method_with_data(
self.py_namespace.clone(),
"create_table",
request,
request_data,
)
.await
}
async fn declare_table(
&self,
request: DeclareTableRequest,
) -> lance_core::Result<DeclareTableResponse> {
call_py_method(self.py_namespace.clone(), "declare_table", request).await
}
async fn insert_into_table(
&self,
request: InsertIntoTableRequest,
request_data: Bytes,
) -> lance_core::Result<InsertIntoTableResponse> {
call_py_method_with_data(
self.py_namespace.clone(),
"insert_into_table",
request,
request_data,
)
.await
}
async fn merge_insert_into_table(
&self,
request: MergeInsertIntoTableRequest,
request_data: Bytes,
) -> lance_core::Result<MergeInsertIntoTableResponse> {
call_py_method_with_data(
self.py_namespace.clone(),
"merge_insert_into_table",
request,
request_data,
)
.await
}
async fn update_table(
&self,
request: UpdateTableRequest,
) -> lance_core::Result<UpdateTableResponse> {
call_py_method(self.py_namespace.clone(), "update_table", request).await
}
async fn delete_from_table(
&self,
request: DeleteFromTableRequest,
) -> lance_core::Result<DeleteFromTableResponse> {
call_py_method(self.py_namespace.clone(), "delete_from_table", request).await
}
async fn query_table(&self, request: QueryTableRequest) -> lance_core::Result<Bytes> {
call_py_method_bytes(self.py_namespace.clone(), "query_table", request).await
}
async fn create_table_index(
&self,
request: CreateTableIndexRequest,
) -> lance_core::Result<CreateTableIndexResponse> {
call_py_method(self.py_namespace.clone(), "create_table_index", request).await
}
async fn list_table_indices(
&self,
request: ListTableIndicesRequest,
) -> lance_core::Result<ListTableIndicesResponse> {
call_py_method(self.py_namespace.clone(), "list_table_indices", request).await
}
async fn describe_table_index_stats(
&self,
request: DescribeTableIndexStatsRequest,
) -> lance_core::Result<DescribeTableIndexStatsResponse> {
call_py_method(
self.py_namespace.clone(),
"describe_table_index_stats",
request,
)
.await
}
async fn describe_transaction(
&self,
request: DescribeTransactionRequest,
) -> lance_core::Result<DescribeTransactionResponse> {
call_py_method(self.py_namespace.clone(), "describe_transaction", request).await
}
async fn alter_transaction(
&self,
request: AlterTransactionRequest,
) -> lance_core::Result<AlterTransactionResponse> {
call_py_method(self.py_namespace.clone(), "alter_transaction", request).await
}
async fn create_table_scalar_index(
&self,
request: CreateTableIndexRequest,
) -> lance_core::Result<CreateTableScalarIndexResponse> {
call_py_method(
self.py_namespace.clone(),
"create_table_scalar_index",
request,
)
.await
}
async fn drop_table_index(
&self,
request: DropTableIndexRequest,
) -> lance_core::Result<DropTableIndexResponse> {
call_py_method(self.py_namespace.clone(), "drop_table_index", request).await
}
async fn list_all_tables(
&self,
request: ListTablesRequest,
) -> lance_core::Result<ListTablesResponse> {
call_py_method(self.py_namespace.clone(), "list_all_tables", request).await
}
async fn restore_table(
&self,
request: RestoreTableRequest,
) -> lance_core::Result<RestoreTableResponse> {
call_py_method(self.py_namespace.clone(), "restore_table", request).await
}
async fn rename_table(
&self,
request: RenameTableRequest,
) -> lance_core::Result<RenameTableResponse> {
call_py_method(self.py_namespace.clone(), "rename_table", request).await
}
async fn list_table_versions(
&self,
request: ListTableVersionsRequest,
) -> lance_core::Result<ListTableVersionsResponse> {
call_py_method(self.py_namespace.clone(), "list_table_versions", request).await
}
async fn create_table_version(
&self,
request: CreateTableVersionRequest,
) -> lance_core::Result<CreateTableVersionResponse> {
call_py_method(self.py_namespace.clone(), "create_table_version", request).await
}
async fn describe_table_version(
&self,
request: DescribeTableVersionRequest,
) -> lance_core::Result<DescribeTableVersionResponse> {
call_py_method(self.py_namespace.clone(), "describe_table_version", request).await
}
async fn batch_delete_table_versions(
&self,
request: BatchDeleteTableVersionsRequest,
) -> lance_core::Result<BatchDeleteTableVersionsResponse> {
call_py_method(
self.py_namespace.clone(),
"batch_delete_table_versions",
request,
)
.await
}
async fn update_table_schema_metadata(
&self,
request: UpdateTableSchemaMetadataRequest,
) -> lance_core::Result<UpdateTableSchemaMetadataResponse> {
call_py_method(
self.py_namespace.clone(),
"update_table_schema_metadata",
request,
)
.await
}
async fn get_table_stats(
&self,
request: GetTableStatsRequest,
) -> lance_core::Result<GetTableStatsResponse> {
call_py_method(self.py_namespace.clone(), "get_table_stats", request).await
}
async fn explain_table_query_plan(
&self,
request: ExplainTableQueryPlanRequest,
) -> lance_core::Result<String> {
call_py_method_primitive(
self.py_namespace.clone(),
"explain_table_query_plan",
request,
)
.await
}
async fn analyze_table_query_plan(
&self,
request: AnalyzeTableQueryPlanRequest,
) -> lance_core::Result<String> {
call_py_method_primitive(
self.py_namespace.clone(),
"analyze_table_query_plan",
request,
)
.await
}
async fn alter_table_add_columns(
&self,
request: AlterTableAddColumnsRequest,
) -> lance_core::Result<AlterTableAddColumnsResponse> {
call_py_method(
self.py_namespace.clone(),
"alter_table_add_columns",
request,
)
.await
}
async fn alter_table_alter_columns(
&self,
request: AlterTableAlterColumnsRequest,
) -> lance_core::Result<AlterTableAlterColumnsResponse> {
call_py_method(
self.py_namespace.clone(),
"alter_table_alter_columns",
request,
)
.await
}
async fn alter_table_drop_columns(
&self,
request: AlterTableDropColumnsRequest,
) -> lance_core::Result<AlterTableDropColumnsResponse> {
call_py_method(
self.py_namespace.clone(),
"alter_table_drop_columns",
request,
)
.await
}
async fn list_table_tags(
&self,
request: ListTableTagsRequest,
) -> lance_core::Result<ListTableTagsResponse> {
call_py_method(self.py_namespace.clone(), "list_table_tags", request).await
}
async fn create_table_tag(
&self,
request: CreateTableTagRequest,
) -> lance_core::Result<CreateTableTagResponse> {
call_py_method(self.py_namespace.clone(), "create_table_tag", request).await
}
async fn delete_table_tag(
&self,
request: DeleteTableTagRequest,
) -> lance_core::Result<DeleteTableTagResponse> {
call_py_method(self.py_namespace.clone(), "delete_table_tag", request).await
}
async fn update_table_tag(
&self,
request: UpdateTableTagRequest,
) -> lance_core::Result<UpdateTableTagResponse> {
call_py_method(self.py_namespace.clone(), "update_table_tag", request).await
}
async fn get_table_tag_version(
&self,
request: GetTableTagVersionRequest,
) -> lance_core::Result<GetTableTagVersionResponse> {
call_py_method(self.py_namespace.clone(), "get_table_tag_version", request).await
}
}
/// Convert Python dict to HashMap<String, String>
#[allow(dead_code)]
fn dict_to_hashmap(dict: &Bound<'_, PyDict>) -> PyResult<HashMap<String, String>> {
let mut map = HashMap::new();
for (key, value) in dict.iter() {
let key_str: String = key.extract()?;
let value_str: String = value.extract()?;
map.insert(key_str, value_str);
}
Ok(map)
}
/// Extract an Arc<dyn LanceNamespace> from a Python namespace object.
///
/// This function wraps any Python namespace object with PyLanceNamespace.
/// The PyLanceNamespace wrapper uses DictWithModelDump to pass requests,
/// which works with both:
/// - Native namespaces (DirectoryNamespace, RestNamespace) that use depythonize (expects dict)
/// - Custom Python implementations that call .model_dump() on the request
pub fn extract_namespace_arc(
py: Python<'_>,
ns: Py<PyAny>,
) -> PyResult<Arc<dyn LanceNamespaceTrait>> {
let ns_ref = ns.bind(py);
PyLanceNamespace::create_arc(py, ns_ref)
}

View File

@@ -66,10 +66,13 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
.inner
.bind(py)
.call_method0("fetch_storage_options")
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to call fetch_storage_options: {}",
e
)))))?;
.map_err(|e| lance_core::Error::IO {
source: Box::new(std::io::Error::other(format!(
"Failed to call fetch_storage_options: {}",
e
))),
location: snafu::location!(),
})?;
// If result is None, return None
if result.is_none() {
@@ -78,19 +81,26 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
// Extract the result dict - should be a flat Map<String, String>
let result_dict = result.downcast::<PyDict>().map_err(|_| {
lance_core::Error::invalid_input(
"fetch_storage_options() must return a dict of string key-value pairs or None",
)
lance_core::Error::InvalidInput {
source: "fetch_storage_options() must return None or a dict of string key-value pairs".into(),
location: snafu::location!(),
}
})?;
// Convert all entries to HashMap<String, String>
let mut storage_options = HashMap::new();
for (key, value) in result_dict.iter() {
let key_str: String = key.extract().map_err(|e| {
lance_core::Error::invalid_input(format!("Storage option key must be a string: {}", e))
lance_core::Error::InvalidInput {
source: format!("Storage option key must be a string: {}", e).into(),
location: snafu::location!(),
}
})?;
let value_str: String = value.extract().map_err(|e| {
lance_core::Error::invalid_input(format!("Storage option value must be a string: {}", e))
lance_core::Error::InvalidInput {
source: format!("Storage option value must be a string: {}", e).into(),
location: snafu::location!(),
}
})?;
storage_options.insert(key_str, value_str);
}
@@ -99,10 +109,13 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
})
})
.await
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
"Task join error: {}",
e
)))))?
.map_err(|e| lance_core::Error::IO {
source: Box::new(std::io::Error::other(format!(
"Task join error: {}",
e
))),
location: snafu::location!(),
})?
}
fn provider_id(&self) -> String {

View File

@@ -710,9 +710,6 @@ impl Table {
if let Some(use_index) = parameters.use_index {
builder.use_index(use_index);
}
if let Some(mem_wal) = parameters.mem_wal {
builder.mem_wal(mem_wal);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
@@ -873,7 +870,6 @@ pub struct MergeInsertParams {
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
use_index: Option<bool>,
mem_wal: Option<bool>,
}
#[pyclass]

4
python/uv.lock generated
View File

@@ -2006,7 +2006,7 @@ requires-dist = [
{ name = "botocore", marker = "extra == 'embeddings'", specifier = ">=1.31.57" },
{ name = "cohere", marker = "extra == 'embeddings'" },
{ name = "colpali-engine", marker = "extra == 'embeddings'", specifier = ">=0.3.10" },
{ name = "datafusion", marker = "extra == 'tests'" },
{ name = "datafusion", marker = "extra == 'tests'", specifier = "<52" },
{ name = "deprecation" },
{ name = "duckdb", marker = "extra == 'tests'" },
{ name = "google-generativeai", marker = "extra == 'embeddings'" },
@@ -2035,7 +2035,7 @@ requires-dist = [
{ name = "pyarrow-stubs", marker = "extra == 'tests'" },
{ name = "pydantic", specifier = ">=1.10" },
{ name = "pylance", marker = "extra == 'pylance'", specifier = ">=1.0.0b14" },
{ name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14" },
{ name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14,<3.0.0" },
{ name = "pyright", marker = "extra == 'dev'" },
{ name = "pytest", marker = "extra == 'tests'" },
{ name = "pytest-asyncio", marker = "extra == 'tests'" },

View File

@@ -136,6 +136,7 @@ impl OpenTableBuilder {
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
},
embedding_registry,
}
@@ -235,6 +236,29 @@ impl OpenTableBuilder {
self
}
/// Set a namespace client for managed versioning support.
///
/// When a namespace client is provided and the table has `managed_versioning` enabled,
/// the table will use the namespace's commit handler to notify the namespace of
/// version changes. This enables features like event emission for table modifications.
pub fn namespace_client(mut self, client: Arc<dyn lance_namespace::LanceNamespace>) -> Self {
self.request.namespace_client = Some(client);
self
}
/// Set whether managed versioning is enabled for this table.
///
/// When set to `Some(true)`, the table will use namespace-managed commits.
/// When set to `Some(false)`, the table will use local commits even if namespace_client is set.
/// When set to `None` (default), the value will be fetched from the namespace if namespace_client is set.
///
/// This is typically set when the caller has already queried the namespace and knows the
/// managed_versioning status, avoiding a redundant describe_table call.
pub fn managed_versioning(mut self, enabled: bool) -> Self {
self.request.managed_versioning = Some(enabled);
self
}
/// Open the table
pub async fn execute(self) -> Result<Table> {
let table = self.parent.open_table(self.request).await?;
@@ -294,6 +318,12 @@ impl CloneTableBuilder {
self
}
/// Set a namespace client for managed versioning support.
pub fn namespace_client(mut self, client: Arc<dyn lance_namespace::LanceNamespace>) -> Self {
self.request.namespace_client = Some(client);
self
}
/// Execute the clone operation
pub async fn execute(self) -> Result<Table> {
let parent = self.parent.clone();
@@ -784,19 +814,13 @@ impl ConnectBuilder {
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
})?;
// Propagate mem_wal_enabled from options to client_config
let mut client_config = self.request.client_config;
if options.mem_wal_enabled.is_some() {
client_config.mem_wal_enabled = options.mem_wal_enabled;
}
let storage_options = StorageOptions(options.storage_options.clone());
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
&self.request.uri,
&api_key,
&region,
options.host_override,
client_config,
self.request.client_config,
storage_options.into(),
)?);
Ok(Connection {

View File

@@ -66,6 +66,10 @@ pub struct OpenTableRequest {
/// Optional namespace client for server-side query execution.
/// When set, queries will be executed on the namespace server instead of locally.
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
/// Whether managed versioning is enabled for this table.
/// When Some(true), the table will use namespace-managed commits instead of local commits.
/// When None and namespace_client is provided, the value will be fetched from the namespace.
pub managed_versioning: Option<bool>,
}
impl std::fmt::Debug for OpenTableRequest {
@@ -77,6 +81,7 @@ impl std::fmt::Debug for OpenTableRequest {
.field("lance_read_params", &self.lance_read_params)
.field("location", &self.location)
.field("namespace_client", &self.namespace_client)
.field("managed_versioning", &self.managed_versioning)
.finish()
}
}
@@ -161,6 +166,9 @@ pub struct CloneTableRequest {
/// Whether to perform a shallow clone (true) or deep clone (false). Defaults to true.
/// Currently only shallow clone is supported.
pub is_shallow: bool,
/// Optional namespace client for managed versioning support.
/// When set, enables the commit handler to track table versions through the namespace.
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
}
impl CloneTableRequest {
@@ -172,6 +180,7 @@ impl CloneTableRequest {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
}
}
}

View File

@@ -669,6 +669,7 @@ impl ListingDatabase {
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
};
let req = (callback)(req);
let table = self.open_table(req).await?;
@@ -869,6 +870,7 @@ impl Database for ListingDatabase {
Some(write_params),
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
)
.await
{
@@ -946,7 +948,9 @@ impl Database for ListingDatabase {
self.store_wrapper.clone(),
None,
self.read_consistency_interval,
None,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
None, // managed_versioning - will be queried if namespace_client is provided
)
.await?;
@@ -1022,6 +1026,8 @@ impl Database for ListingDatabase {
Some(read_params),
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
request.managed_versioning, // Pass through managed_versioning from request
)
.await?,
);
@@ -1162,6 +1168,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1222,6 +1229,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1281,6 +1289,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1317,6 +1326,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: false, // Request deep clone
namespace_client: None,
})
.await;
@@ -1357,6 +1367,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1397,6 +1408,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1416,6 +1428,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1452,6 +1465,7 @@ mod tests {
source_version: Some(1),
source_tag: Some("v1.0".to_string()),
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1525,6 +1539,7 @@ mod tests {
source_version: Some(initial_version),
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1603,6 +1618,7 @@ mod tests {
source_version: None,
source_tag: Some("v1.0".to_string()),
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1654,6 +1670,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1746,6 +1763,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();

View File

@@ -7,6 +7,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
use lance_namespace::{
LanceNamespace,
@@ -18,6 +19,8 @@ use lance_namespace::{
},
};
use lance_namespace_impls::ConnectBuilder;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::database::ReadConsistency;
use crate::error::{Error, Result};
@@ -205,38 +208,55 @@ impl Database for LanceNamespaceDatabase {
let mut table_id = request.namespace.clone();
table_id.push(request.name.clone());
// Try declare_table first, falling back to create_empty_table for backwards
// compatibility with older namespace clients that don't support declare_table
let declare_request = DeclareTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let (location, initial_storage_options) = {
let response = self.namespace.declare_table(declare_request).await?;
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
let response = self
.namespace
.declare_table(declare_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to declare table: {}", e),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts)
};
let write_params = if let Some(storage_opts) = initial_storage_options {
let mut params = request.write_options.lance_write_params.unwrap_or_default();
let location = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let initial_storage_options = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
let managed_versioning = response.managed_versioning;
// Build write params with storage options and commit handler
let mut params = request.write_options.lance_write_params.unwrap_or_default();
// Set up storage options if provided
if let Some(storage_opts) = initial_storage_options {
let store_params = params
.store_params
.get_or_insert_with(ObjectStoreParams::default);
store_params.storage_options_accessor = Some(Arc::new(
StorageOptionsAccessor::with_static_options(storage_opts),
));
Some(params)
} else {
request.write_options.lance_write_params
};
}
// Set up commit handler when managed_versioning is enabled
if managed_versioning == Some(true) {
let external_store =
LanceNamespaceExternalManifestStore::new(self.namespace.clone(), table_id.clone());
let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
external_manifest_store: Arc::new(external_store),
});
params.commit_handler = Some(commit_handler);
}
let write_params = Some(params);
let native_table = NativeTable::create_from_namespace(
self.namespace.clone(),

View File

@@ -14,7 +14,6 @@ use crate::remote::db::RemoteOptions;
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
/// Configuration for TLS/mTLS settings.
#[derive(Clone, Debug, Default)]
@@ -53,10 +52,6 @@ pub struct ClientConfig {
pub tls_config: Option<TlsConfig>,
/// Provider for custom headers to be added to each request
pub header_provider: Option<Arc<dyn HeaderProvider>>,
/// Enable MemWAL write path for streaming writes.
/// When true, write operations will use the MemWAL architecture
/// for high-performance streaming writes.
pub mem_wal_enabled: Option<bool>,
}
impl std::fmt::Debug for ClientConfig {
@@ -72,7 +67,6 @@ impl std::fmt::Debug for ClientConfig {
"header_provider",
&self.header_provider.as_ref().map(|_| "Some(...)"),
)
.field("mem_wal_enabled", &self.mem_wal_enabled)
.finish()
}
}
@@ -87,7 +81,6 @@ impl Default for ClientConfig {
id_delimiter: None,
tls_config: None,
header_provider: None,
mem_wal_enabled: None,
}
}
}
@@ -484,11 +477,6 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
);
}
// Add MemWAL header if enabled
if let Some(true) = config.mem_wal_enabled {
headers.insert(MEM_WAL_ENABLED_HEADER, HeaderValue::from_static("true"));
}
Ok(headers)
}

View File

@@ -78,7 +78,6 @@ pub const OPT_REMOTE_PREFIX: &str = "remote_database_";
pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key";
pub const OPT_REMOTE_REGION: &str = "remote_database_region";
pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override";
pub const OPT_REMOTE_MEM_WAL_ENABLED: &str = "remote_database_mem_wal_enabled";
// TODO: add support for configuring client config via key/value options
#[derive(Clone, Debug, Default)]
@@ -99,12 +98,6 @@ pub struct RemoteDatabaseOptions {
/// These options are only used for LanceDB Enterprise and only a subset of options
/// are supported.
pub storage_options: HashMap<String, String>,
/// Enable MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations (insert, merge_insert, etc.) will use
/// the MemWAL architecture which buffers writes in memory and Write-Ahead Log
/// before asynchronously merging to the base table.
pub mem_wal_enabled: Option<bool>,
}
impl RemoteDatabaseOptions {
@@ -116,9 +109,6 @@ impl RemoteDatabaseOptions {
let api_key = map.get(OPT_REMOTE_API_KEY).cloned();
let region = map.get(OPT_REMOTE_REGION).cloned();
let host_override = map.get(OPT_REMOTE_HOST_OVERRIDE).cloned();
let mem_wal_enabled = map
.get(OPT_REMOTE_MEM_WAL_ENABLED)
.map(|v| v.to_lowercase() == "true");
let storage_options = map
.iter()
.filter(|(key, _)| !key.starts_with(OPT_REMOTE_PREFIX))
@@ -129,7 +119,6 @@ impl RemoteDatabaseOptions {
region,
host_override,
storage_options,
mem_wal_enabled,
})
}
}
@@ -148,12 +137,6 @@ impl DatabaseOptions for RemoteDatabaseOptions {
if let Some(host_override) = &self.host_override {
map.insert(OPT_REMOTE_HOST_OVERRIDE.to_string(), host_override.clone());
}
if let Some(mem_wal_enabled) = &self.mem_wal_enabled {
map.insert(
OPT_REMOTE_MEM_WAL_ENABLED.to_string(),
mem_wal_enabled.to_string(),
);
}
}
}
@@ -198,20 +181,6 @@ impl RemoteDatabaseOptionsBuilder {
self.options.host_override = Some(host_override);
self
}
/// Enable MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations will use the MemWAL architecture
/// which buffers writes in memory and Write-Ahead Log before
/// asynchronously merging to the base table.
///
/// # Arguments
///
/// * `enabled` - Whether to enable MemWAL writes
pub fn mem_wal_enabled(mut self, enabled: bool) -> Self {
self.options.mem_wal_enabled = Some(enabled);
self
}
}
#[derive(Debug)]
@@ -495,6 +464,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
};
let req = (callback)(req);
self.open_table(req).await

View File

@@ -62,7 +62,6 @@ use std::time::Duration;
use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
@@ -1360,7 +1359,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let timeout = params.timeout;
let mem_wal = params.mem_wal;
let query = MergeInsertRequest::try_from(params)?;
let mut request = self
@@ -1376,10 +1374,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
}
if mem_wal {
request = request.header(MEM_WAL_ENABLED_HEADER, "true");
}
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
let response = self.check_table_response(&request_id, response).await?;

View File

@@ -34,9 +34,13 @@ use lance_index::vector::sq::builder::SQBuildParams;
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
pub use query::AnyQuery;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
use lance_namespace::LanceNamespace;
use lance_namespace::models::DescribeTableRequest;
use lance_table::format::Manifest;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::ManifestNamingScheme;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::format;
@@ -1212,10 +1216,13 @@ pub struct NativeTable {
// This comes from the connection options. We store here so we can pass down
// to the dataset when we recreate it (for example, in checkout_latest).
read_consistency_interval: Option<std::time::Duration>,
// Optional namespace client for server-side query execution.
// When set, queries will be executed on the namespace server instead of locally.
// pub (crate) namespace_client so query.rs can access the fields
// Optional namespace client for namespace operations (e.g., managed versioning).
// pub(crate) so query.rs can access the field for server-side query execution.
pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
// Whether to enable server-side query execution via the namespace client.
// When true and namespace_client is set, queries will be executed on the
// namespace server instead of locally.
pub(crate) server_side_query_enabled: bool,
}
impl std::fmt::Debug for NativeTable {
@@ -1227,6 +1234,7 @@ impl std::fmt::Debug for NativeTable {
.field("uri", &self.uri)
.field("read_consistency_interval", &self.read_consistency_interval)
.field("namespace_client", &self.namespace_client)
.field("server_side_query_enabled", &self.server_side_query_enabled)
.finish()
}
}
@@ -1263,7 +1271,7 @@ impl NativeTable {
/// * A [NativeTable] object.
pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, vec![], None, None, None, None).await
Self::open_with_params(uri, &name, vec![], None, None, None, None, false, None).await
}
/// Opens an existing Table
@@ -1273,7 +1281,10 @@ impl NativeTable {
/// * `base_path` - The base path where the table is located
/// * `name` The Table name
/// * `params` The [ReadParams] to use when opening the table
/// * `namespace_client` - Optional namespace client for server-side query execution
/// * `namespace_client` - Optional namespace client for namespace operations
/// * `server_side_query_enabled` - Whether to enable server-side query execution
/// * `managed_versioning` - Whether managed versioning is enabled. If None and namespace_client
/// is provided, the value will be fetched via describe_table.
///
/// # Returns
///
@@ -1287,6 +1298,8 @@ impl NativeTable {
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
managed_versioning: Option<bool>,
) -> Result<Self> {
let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper
@@ -1295,17 +1308,54 @@ impl NativeTable {
None => params,
};
let dataset = DatasetBuilder::from_uri(uri)
.with_read_params(params)
.load()
.await
.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
source: Box::new(e),
},
e => e.into(),
})?;
// Build table_id from namespace + name
let mut table_id = namespace.clone();
table_id.push(name.to_string());
// Determine if managed_versioning is enabled
// Use the provided value if available, otherwise query the namespace
let managed_versioning = match managed_versioning {
Some(value) => value,
None if namespace_client.is_some() => {
let ns_client = namespace_client.as_ref().unwrap();
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let response = ns_client
.describe_table(describe_request)
.await
.map_err(|e| Error::Runtime {
message: format!(
"Failed to describe table via namespace client: {}. \
If you don't need managed versioning, don't pass namespace_client.",
e
),
})?;
response.managed_versioning == Some(true)
}
None => false,
};
let mut builder = DatasetBuilder::from_uri(uri).with_read_params(params);
// Set up commit handler when managed_versioning is enabled
if managed_versioning && let Some(ref ns_client) = namespace_client {
let external_store =
LanceNamespaceExternalManifestStore::new(ns_client.clone(), table_id.clone());
let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
external_manifest_store: Arc::new(external_store),
});
builder = builder.with_commit_handler(commit_handler);
}
let dataset = builder.load().await.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
source: Box::new(e),
},
e => e.into(),
})?;
let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
let id = Self::build_id(&namespace, name);
@@ -1318,6 +1368,7 @@ impl NativeTable {
dataset,
read_consistency_interval,
namespace_client,
server_side_query_enabled,
})
}
@@ -1421,6 +1472,7 @@ impl NativeTable {
dataset,
read_consistency_interval,
namespace_client: stored_namespace_client,
server_side_query_enabled,
})
}
@@ -1460,7 +1512,8 @@ impl NativeTable {
/// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
/// * `batches` RecordBatch to be saved in the database.
/// * `params` - Write parameters.
/// * `namespace_client` - Optional namespace client for server-side query execution
/// * `namespace_client` - Optional namespace client for namespace operations
/// * `server_side_query_enabled` - Whether to enable server-side query execution
///
/// # Returns
///
@@ -1475,6 +1528,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
) -> Result<Self> {
// Default params uses format v1.
let params = params.unwrap_or(WriteParams {
@@ -1507,6 +1561,7 @@ impl NativeTable {
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
read_consistency_interval,
namespace_client,
server_side_query_enabled,
})
}
@@ -1520,6 +1575,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
) -> Result<Self> {
let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
Self::create(
@@ -1531,6 +1587,7 @@ impl NativeTable {
params,
read_consistency_interval,
namespace_client,
server_side_query_enabled,
)
.await
}
@@ -1634,6 +1691,7 @@ impl NativeTable {
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
read_consistency_interval,
namespace_client: stored_namespace_client,
server_side_query_enabled,
})
}
@@ -2625,7 +2683,7 @@ mod tests {
vec![Ok(batch.clone())],
batch.schema(),
));
let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None)
let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None, false)
.await
.unwrap();

View File

@@ -3,12 +3,13 @@
use std::sync::Arc;
use arrow_cast::can_cast_types;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};
use datafusion::functions::core::{get_field, named_struct};
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::{Literal, cast};
use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_plan::expressions::Column;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
@@ -25,12 +26,9 @@ pub fn cast_to_table_schema(
return Ok(input);
}
let exprs = build_field_exprs(
input_schema.fields(),
table_schema.fields(),
&|idx| Arc::new(Column::new(input_schema.field(idx).name(), idx)) as Arc<dyn PhysicalExpr>,
&input_schema,
)?;
let exprs = build_field_exprs(input_schema.fields(), table_schema.fields(), &|idx| {
Arc::new(Column::new(input_schema.field(idx).name(), idx)) as Arc<dyn PhysicalExpr>
})?;
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = exprs
.into_iter()
@@ -51,7 +49,6 @@ fn build_field_exprs(
input_fields: &Fields,
table_fields: &Fields,
get_input_expr: &dyn Fn(usize) -> Arc<dyn PhysicalExpr>,
input_schema: &Schema,
) -> Result<Vec<(Arc<dyn PhysicalExpr>, FieldRef)>> {
let config = Arc::new(ConfigOptions::default());
let mut result = Vec::new();
@@ -72,24 +69,19 @@ fn build_field_exprs(
(DataType::Struct(in_children), DataType::Struct(tbl_children))
if in_children != tbl_children =>
{
let sub_exprs = build_field_exprs(
in_children,
tbl_children,
&|child_idx| {
let child_name = in_children[child_idx].name();
Arc::new(ScalarFunctionExpr::new(
&format!("get_field({child_name})"),
get_field(),
vec![
input_expr.clone(),
Arc::new(Literal::new(ScalarValue::from(child_name.as_str()))),
],
Arc::new(in_children[child_idx].as_ref().clone()),
config.clone(),
)) as Arc<dyn PhysicalExpr>
},
input_schema,
)?;
let sub_exprs = build_field_exprs(in_children, tbl_children, &|child_idx| {
let child_name = in_children[child_idx].name();
Arc::new(ScalarFunctionExpr::new(
&format!("get_field({child_name})"),
get_field(),
vec![
input_expr.clone(),
Arc::new(Literal::new(ScalarValue::from(child_name.as_str()))),
],
Arc::new(in_children[child_idx].as_ref().clone()),
config.clone(),
)) as Arc<dyn PhysicalExpr>
})?;
let output_struct_fields: Fields = sub_exprs
.iter()
@@ -125,17 +117,21 @@ fn build_field_exprs(
// Types match: pass through.
(inp, tbl) if inp == tbl => input_expr,
// Types differ: cast.
_ => cast(input_expr, input_schema, table_field.data_type().clone()).map_err(|e| {
Error::InvalidInput {
// safe: false (the default) means overflow/truncation errors surface at execution time.
(_, _) if can_cast_types(input_field.data_type(), table_field.data_type()) => Arc::new(
CastExpr::new(input_expr, table_field.data_type().clone(), None),
)
as Arc<dyn PhysicalExpr>,
(inp, tbl) => {
return Err(Error::InvalidInput {
message: format!(
"cannot cast field '{}' from {} to {}: {}",
"cannot cast field '{}' from {} to {}",
table_field.name(),
input_field.data_type(),
table_field.data_type(),
e
inp,
tbl,
),
}
})?,
});
}
};
let output_field = Arc::new(Field::new(
@@ -153,10 +149,12 @@ fn build_field_exprs(
mod tests {
use std::sync::Arc;
use arrow::buffer::OffsetBuffer;
use arrow_array::{
Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
Array, Float32Array, Float64Array, Int32Array, Int64Array, ListArray, RecordBatch,
StringArray, StructArray, UInt32Array, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use arrow_schema::{DataType, Field, Fields, Schema};
use datafusion::prelude::SessionContext;
use datafusion_catalog::MemTable;
use futures::TryStreamExt;
@@ -495,4 +493,129 @@ mod tests {
assert_eq!(b.value(0), "hello");
assert_eq!(b.value(1), "world");
}
#[tokio::test]
async fn test_narrowing_numeric_cast_success() {
let input_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::UInt64, false)])),
vec![Arc::new(UInt64Array::from(vec![1u64, 2, 3]))],
)
.unwrap();
let table_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
let plan = plan_from_batch(input_batch).await;
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
let result = collect(casted).await;
assert_eq!(result.schema().field(0).data_type(), &DataType::UInt32);
let a: &UInt32Array = result.column(0).as_any().downcast_ref().unwrap();
assert_eq!(a.values(), &[1u32, 2, 3]);
}
#[tokio::test]
async fn test_narrowing_numeric_cast_overflow_errors() {
let overflow_val = u32::MAX as u64 + 1;
let input_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::UInt64, false)])),
vec![Arc::new(UInt64Array::from(vec![overflow_val]))],
)
.unwrap();
let table_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
let plan = plan_from_batch(input_batch).await;
// Planning succeeds — the overflow is only detected at execution time.
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
let ctx = SessionContext::new();
let stream = casted.execute(0, ctx.task_ctx()).unwrap();
let result: Result<Vec<RecordBatch>, _> = stream.try_collect().await;
assert!(result.is_err(), "expected overflow error at execution time");
}
#[tokio::test]
async fn test_list_struct_field_reorder() {
// list<struct<a: Int32, b: Int32>> → list<struct<b: Int64, a: Int64>>
// Tests both reordering (a,b → b,a) and element-type widening (Int32 → Int64).
let inner_fields: Fields = vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]
.into();
let struct_array = StructArray::from(vec![
(
Arc::new(inner_fields[0].as_ref().clone()),
Arc::new(Int32Array::from(vec![1, 3])) as _,
),
(
Arc::new(inner_fields[1].as_ref().clone()),
Arc::new(Int32Array::from(vec![2, 4])) as _,
),
]);
// Offsets: one list element containing two struct rows (0..2).
let offsets = OffsetBuffer::from_lengths(vec![2]);
let list_array = ListArray::try_new(
Arc::new(Field::new("item", DataType::Struct(inner_fields), true)),
offsets,
Arc::new(struct_array),
None,
)
.unwrap();
let input_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new(
"s_list",
list_array.data_type().clone(),
false,
)])),
vec![Arc::new(list_array)],
)
.unwrap();
let table_inner: Fields = vec![
Field::new("b", DataType::Int64, true),
Field::new("a", DataType::Int64, true),
]
.into();
let table_schema = Schema::new(vec![Field::new(
"s_list",
DataType::List(Arc::new(Field::new(
"item",
DataType::Struct(table_inner),
true,
))),
false,
)]);
let plan = plan_from_batch(input_batch).await;
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
let result = collect(casted).await;
let list_col = result
.column(0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let struct_col = list_col
.values()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
assert_eq!(struct_col.num_columns(), 2);
let b: &Int64Array = struct_col
.column_by_name("b")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(b.values(), &[2, 4]);
let a: &Int64Array = struct_col
.column_by_name("a")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(a.values(), &[1, 3]);
}
}

View File

@@ -1,3 +1,4 @@
use futures::FutureExt;
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use serde::{Deserialize, Serialize};
@@ -23,7 +24,7 @@ pub struct DeleteResult {
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let delete_result = dataset.delete(predicate).await?;
let delete_result = dataset.delete(predicate).boxed().await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = dataset.version().version;
table.dataset.update(dataset);

View File

@@ -55,7 +55,6 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) mem_wal: bool,
}
impl MergeInsertBuilder {
@@ -70,7 +69,6 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
mem_wal: false,
}
}
@@ -150,65 +148,13 @@ impl MergeInsertBuilder {
self
}
/// Enables MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
///
/// When enabled, the merge insert will route data through a memory node service
/// that buffers writes before flushing to storage. This is only supported for
/// remote (LanceDB Cloud) tables.
///
/// If not set, defaults to `false`.
pub fn mem_wal(&mut self, enabled: bool) -> &mut Self {
self.mem_wal = enabled;
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows
/// inserted, updated, and deleted.
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<MergeResult> {
// Validate MemWAL constraints before execution
if self.mem_wal {
self.validate_mem_wal_pattern()?;
}
self.table.clone().merge_insert(self, new_data).await
}
/// Validate that the merge insert pattern is supported by MemWAL.
///
/// MemWAL only supports the upsert pattern:
/// - when_matched_update_all (without filter)
/// - when_not_matched_insert_all
/// - NO when_not_matched_by_source_delete
fn validate_mem_wal_pattern(&self) -> Result<()> {
// Must have when_matched_update_all without filter
if !self.when_matched_update_all {
return Err(Error::InvalidInput {
message: "MemWAL requires when_matched_update_all() to be set".to_string(),
});
}
if self.when_matched_update_all_filt.is_some() {
return Err(Error::InvalidInput {
message: "MemWAL does not support conditional when_matched_update_all (no filter allowed)".to_string(),
});
}
// Must have when_not_matched_insert_all
if !self.when_not_matched_insert_all {
return Err(Error::InvalidInput {
message: "MemWAL requires when_not_matched_insert_all() to be set".to_string(),
});
}
// Must NOT have when_not_matched_by_source_delete
if self.when_not_matched_by_source_delete {
return Err(Error::InvalidInput {
message: "MemWAL does not support when_not_matched_by_source_delete()".to_string(),
});
}
Ok(())
}
}
/// Internal implementation of the merge insert logic
@@ -219,14 +165,6 @@ pub(crate) async fn execute_merge_insert(
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
if params.mem_wal {
return Err(Error::NotSupported {
message: "MemWAL is not supported for native (local) tables. \
MemWAL is only available for remote (LanceDB Cloud) tables."
.to_string(),
});
}
let dataset = table.dataset.get().await?;
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
@@ -386,139 +324,4 @@ mod tests {
merge_insert_builder.execute(new_batches).await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 25);
}
#[tokio::test]
async fn test_mem_wal_validation_valid_pattern() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test", batches)
.execute()
.await
.unwrap();
// Valid MemWAL pattern: when_matched_update_all + when_not_matched_insert_all
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(None);
builder.when_not_matched_insert_all();
builder.mem_wal(true);
// Should fail because native tables don't support MemWAL, but validation passes
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("MemWAL is not supported for native"),
"Expected native table error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_missing_when_matched() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test2", batches)
.execute()
.await
.unwrap();
// Missing when_matched_update_all
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_not_matched_insert_all();
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("requires when_matched_update_all"),
"Expected validation error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_missing_when_not_matched() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test3", batches)
.execute()
.await
.unwrap();
// Missing when_not_matched_insert_all
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(None);
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("requires when_not_matched_insert_all"),
"Expected validation error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_with_filter() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test4", batches)
.execute()
.await
.unwrap();
// With conditional filter - not allowed
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(Some("target.age > 0".to_string()));
builder.when_not_matched_insert_all();
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("does not support conditional"),
"Expected filter validation error, got: {}",
err
);
}
#[tokio::test]
async fn test_mem_wal_validation_with_delete() {
let conn = connect("memory://").execute().await.unwrap();
let batches = merge_insert_test_batches(0, 0);
let table = conn
.create_table("mem_wal_test5", batches)
.execute()
.await
.unwrap();
// With when_not_matched_by_source_delete - not allowed
let new_batches = merge_insert_test_batches(5, 1);
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all(None);
builder.when_not_matched_insert_all();
builder.when_not_matched_by_source_delete(None);
builder.mem_wal(true);
let result = builder.execute(new_batches).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("does not support when_not_matched_by_source_delete"),
"Expected delete validation error, got: {}",
err
);
}
}

View File

@@ -40,8 +40,10 @@ pub async fn execute_query(
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
// If namespace client is configured, use server-side query execution
if let Some(ref namespace_client) = table.namespace_client {
// If server-side query is enabled and namespace client is configured, use server-side query execution
if table.server_side_query_enabled
&& let Some(ref namespace_client) = table.namespace_client
{
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
}
execute_generic_query(table, query, options).await