Compare commits

...

4 Commits

Author SHA1 Message Date
Lance Release
f31561c5bb Bump version: 0.30.0-beta.3 → 0.30.0-beta.4 2026-03-09 08:45:25 +00:00
Jack Ye
e0c5ceac03 fix: propagate managed versioning for namespace connection (#3111)
Without this fix, if user directly use the native table to do operations
like `add_columns`, even if it is configured to use namespace db
connection, it is not really propagated through.

The fix is to bring lancedb's python binding up to date and do a similar
implementation as https://github.com/lance-format/lance/pull/5968, and
make sure the namespace is fully propagated through all the related
calls.

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-09 01:44:31 -07:00
Prashanth Rao
e93bb3355a docs: add meth/func names to mkdocstrings (#3101)
LanceDB's SDK API docs do not currently show method names under any
given object, and this makes it harder to quickly understand and find
relevant method names for a given class. Geneva docs show the available
methods in the right navigation.

This PR standardizes the appearance of the LanceDB SDK API in the docs
to be more similar to Geneva's.
<img width="1386" height="792" alt="image"
src="https://github.com/user-attachments/assets/30816591-d6d5-495d-886d-e234beeb6059"
/>

<img width="897" height="540" alt="image"
src="https://github.com/user-attachments/assets/d5491b6b-c7bf-4d3b-8b15-1a1a7700e7c9"
/>
2026-03-06 08:54:45 -08:00
Will Jones
b75991eb07 fix: propagate cast errors in add() (#3075)
When we write data with `add()`, we can input data to the table's
schema. However, we were using "safe" mode, which propagates errors as
nulls. For example, if you pass `u64::max` into a field that is a `u32`,
it will just write null instead of giving overflow error. Now it
propagates the overflow. This is the same behavior as other systems like
DuckDB.

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 20:24:50 -08:00
22 changed files with 1233 additions and 167 deletions

4
Cargo.lock generated
View File

@@ -4855,16 +4855,20 @@ version = "0.30.0-beta.3"
dependencies = [
"arrow",
"async-trait",
"bytes",
"env_logger",
"futures",
"lance-core",
"lance-io",
"lance-namespace",
"lance-namespace-impls",
"lancedb",
"pin-project",
"pyo3",
"pyo3-async-runtimes",
"pyo3-build-config",
"serde",
"serde_json",
"snafu",
"tokio",
]

View File

@@ -52,14 +52,21 @@ plugins:
options:
docstring_style: numpy
heading_level: 3
show_source: true
show_symbol_type_in_heading: true
show_signature_annotations: true
show_root_heading: true
show_docstring_examples: true
show_docstring_attributes: false
show_docstring_other_parameters: true
show_symbol_type_heading: true
show_labels: false
show_if_no_docstring: true
show_source: false
members_order: source
docstring_section_style: list
signature_crossrefs: true
separate_signature: true
filters:
- "!^_"
import:
# for cross references
- https://arrow.apache.org/docs/objects.inv
@@ -113,7 +120,7 @@ markdown_extensions:
emoji_index: !!python/name:material.extensions.emoji.twemoji
emoji_generator: !!python/name:material.extensions.emoji.to_svg
- markdown.extensions.toc:
toc_depth: 3
toc_depth: 4
permalink: true
permalink_title: Anchor link to this section

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.30.0-beta.3"
current_version = "0.30.0-beta.4"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.30.0-beta.3"
version = "0.30.0-beta.4"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true
@@ -16,9 +16,11 @@ crate-type = ["cdylib"]
[dependencies]
arrow = { version = "57.2", features = ["pyarrow"] }
async-trait = "0.1"
bytes = "1"
lancedb = { path = "../rust/lancedb", default-features = false }
lance-core.workspace = true
lance-namespace.workspace = true
lance-namespace-impls.workspace = true
lance-io.workspace = true
env_logger.workspace = true
pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] }
@@ -28,6 +30,8 @@ pyo3-async-runtimes = { version = "0.26", features = [
] }
pin-project = "1.1.5"
futures.workspace = true
serde = "1"
serde_json = "1"
snafu.workspace = true
tokio = { version = "1.40", features = ["sync"] }

View File

@@ -45,7 +45,7 @@ repository = "https://github.com/lancedb/lancedb"
[project.optional-dependencies]
pylance = [
"pylance>=1.0.0b14",
"pylance>=4.0.0b7",
]
tests = [
"aiohttp",
@@ -59,9 +59,9 @@ tests = [
"polars>=0.19, <=1.3.0",
"tantivy",
"pyarrow-stubs",
"pylance>=1.0.0b14,<3.0.0",
"pylance>=4.0.0b7",
"requests",
"datafusion<52",
"datafusion>=52,<53",
]
dev = [
"ruff",

View File

@@ -8,7 +8,7 @@ from abc import abstractmethod
from datetime import timedelta
from pathlib import Path
import sys
from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Union
if sys.version_info >= (3, 12):
from typing import override
@@ -1541,6 +1541,8 @@ class AsyncConnection(object):
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
) -> AsyncTable:
"""Open a Lance Table in the database.
@@ -1573,6 +1575,9 @@ class AsyncConnection(object):
The explicit location (URI) of the table. If provided, the table will be
opened from this location instead of deriving it from the database URI
and table name.
managed_versioning: bool, optional
Whether managed versioning is enabled for this table. If provided,
avoids a redundant describe_table call when namespace_client is set.
Returns
-------
@@ -1587,6 +1592,8 @@ class AsyncConnection(object):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
return AsyncTable(table)

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
import asyncio
import sys
from typing import Dict, Iterable, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, Union
if sys.version_info >= (3, 12):
from typing import override
@@ -240,7 +240,7 @@ class LanceNamespaceDBConnection(DBConnection):
session : Optional[Session]
A session to use for this connection
"""
self._ns = namespace
self._namespace_client = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
@@ -269,7 +269,7 @@ class LanceNamespaceDBConnection(DBConnection):
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
response = self._namespace_client.list_tables(request)
return response.tables if response.tables else []
@override
@@ -309,7 +309,9 @@ class LanceNamespaceDBConnection(DBConnection):
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._ns.describe_table(describe_request)
describe_response = self._namespace_client.describe_table(
describe_request
)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
@@ -323,7 +325,7 @@ class LanceNamespaceDBConnection(DBConnection):
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._ns.declare_table(declare_request)
declare_response = self._namespace_client.declare_table(declare_request)
if not declare_response.location:
raise ValueError(
@@ -353,7 +355,7 @@ class LanceNamespaceDBConnection(DBConnection):
# Only create if namespace returned storage_options (not None)
if storage_options_provider is None and namespace_storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
namespace=self._namespace_client,
table_id=table_id,
)
@@ -371,6 +373,7 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
location=location,
namespace_client=self._namespace_client,
)
return tbl
@@ -389,7 +392,7 @@ class LanceNamespaceDBConnection(DBConnection):
namespace = []
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._ns.describe_table(request)
response = self._namespace_client.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
@@ -402,10 +405,14 @@ class LanceNamespaceDBConnection(DBConnection):
# Only create if namespace returned storage_options (not None)
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
namespace=self._namespace_client,
table_id=table_id,
)
# Pass managed_versioning to avoid redundant describe_table call in Rust.
# Convert None to False since we already have the answer from describe_table.
managed_versioning = response.managed_versioning is True
return self._lance_table_from_uri(
name,
response.location,
@@ -413,6 +420,8 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
namespace_client=self._namespace_client,
managed_versioning=managed_versioning,
)
@override
@@ -422,7 +431,7 @@ class LanceNamespaceDBConnection(DBConnection):
namespace = []
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
self._ns.drop_table(request)
self._namespace_client.drop_table(request)
@override
def rename_table(
@@ -484,7 +493,7 @@ class LanceNamespaceDBConnection(DBConnection):
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
response = self._namespace_client.list_namespaces(request)
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
@@ -520,7 +529,7 @@ class LanceNamespaceDBConnection(DBConnection):
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._ns.create_namespace(request)
response = self._namespace_client.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -555,7 +564,7 @@ class LanceNamespaceDBConnection(DBConnection):
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._ns.drop_namespace(request)
response = self._namespace_client.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
@@ -581,7 +590,7 @@ class LanceNamespaceDBConnection(DBConnection):
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._ns.describe_namespace(request)
response = self._namespace_client.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -615,7 +624,7 @@ class LanceNamespaceDBConnection(DBConnection):
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
response = self._namespace_client.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,
@@ -630,6 +639,8 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
index_cache_size: Optional[int] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
) -> LanceTable:
# Open a table directly from a URI using the location parameter
# Note: storage_options should already be merged by the caller
@@ -643,6 +654,8 @@ class LanceNamespaceDBConnection(DBConnection):
)
# Open the table using the temporary connection with the location parameter
# Pass namespace_client to enable managed versioning support
# Pass managed_versioning to avoid redundant describe_table call
return LanceTable.open(
temp_conn,
name,
@@ -651,6 +664,8 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=table_uri,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
@@ -685,7 +700,7 @@ class AsyncLanceNamespaceDBConnection:
session : Optional[Session]
A session to use for this connection
"""
self._ns = namespace
self._namespace_client = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
@@ -713,7 +728,7 @@ class AsyncLanceNamespaceDBConnection:
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
response = self._namespace_client.list_tables(request)
return response.tables if response.tables else []
async def create_table(
@@ -750,7 +765,9 @@ class AsyncLanceNamespaceDBConnection:
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._ns.describe_table(describe_request)
describe_response = self._namespace_client.describe_table(
describe_request
)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
@@ -764,7 +781,7 @@ class AsyncLanceNamespaceDBConnection:
location=None,
properties=self.storage_options if self.storage_options else None,
)
declare_response = self._ns.declare_table(declare_request)
declare_response = self._namespace_client.declare_table(declare_request)
if not declare_response.location:
raise ValueError(
@@ -797,7 +814,7 @@ class AsyncLanceNamespaceDBConnection:
and namespace_storage_options is not None
):
provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
namespace=self._namespace_client,
table_id=table_id,
)
else:
@@ -817,6 +834,7 @@ class AsyncLanceNamespaceDBConnection:
storage_options=merged_storage_options,
storage_options_provider=provider,
location=location,
namespace_client=self._namespace_client,
)
lance_table = await asyncio.to_thread(_create_table)
@@ -837,7 +855,7 @@ class AsyncLanceNamespaceDBConnection:
namespace = []
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._ns.describe_table(request)
response = self._namespace_client.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
@@ -849,10 +867,14 @@ class AsyncLanceNamespaceDBConnection:
# Create a storage options provider if not provided by user
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
namespace=self._namespace_client,
table_id=table_id,
)
# Capture managed_versioning from describe response.
# Convert None to False since we already have the answer from describe_table.
managed_versioning = response.managed_versioning is True
# Open table in a thread
def _open_table():
temp_conn = LanceDBConnection(
@@ -870,6 +892,8 @@ class AsyncLanceNamespaceDBConnection:
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=response.location,
namespace_client=self._namespace_client,
managed_versioning=managed_versioning,
)
lance_table = await asyncio.to_thread(_open_table)
@@ -881,7 +905,7 @@ class AsyncLanceNamespaceDBConnection:
namespace = []
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
self._ns.drop_table(request)
self._namespace_client.drop_table(request)
async def rename_table(
self,
@@ -943,7 +967,7 @@ class AsyncLanceNamespaceDBConnection:
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
response = self._namespace_client.list_namespaces(request)
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
@@ -978,7 +1002,7 @@ class AsyncLanceNamespaceDBConnection:
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._ns.create_namespace(request)
response = self._namespace_client.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -1012,7 +1036,7 @@ class AsyncLanceNamespaceDBConnection:
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._ns.drop_namespace(request)
response = self._namespace_client.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
@@ -1039,7 +1063,7 @@ class AsyncLanceNamespaceDBConnection:
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace)
response = self._ns.describe_namespace(request)
response = self._namespace_client.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@@ -1072,7 +1096,7 @@ class AsyncLanceNamespaceDBConnection:
if namespace is None:
namespace = []
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
response = self._namespace_client.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,

View File

@@ -1746,6 +1746,8 @@ class LanceTable(Table):
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
_async: AsyncTable = None,
):
if namespace is None:
@@ -1753,6 +1755,7 @@ class LanceTable(Table):
self._conn = connection
self._namespace = namespace
self._location = location # Store location for use in _dataset_path
self._namespace_client = namespace_client
if _async is not None:
self._table = _async
else:
@@ -1764,6 +1767,8 @@ class LanceTable(Table):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
)
@@ -1806,6 +1811,8 @@ class LanceTable(Table):
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
):
if namespace is None:
namespace = []
@@ -1817,6 +1824,8 @@ class LanceTable(Table):
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
# check the dataset exists
@@ -1848,6 +1857,16 @@ class LanceTable(Table):
"Please install with `pip install pylance`."
)
if self._namespace_client is not None:
table_id = self._namespace + [self.name]
return lance.dataset(
version=self.version,
storage_options=self._conn.storage_options,
namespace=self._namespace_client,
table_id=table_id,
**kwargs,
)
return lance.dataset(
self._dataset_path,
version=self.version,
@@ -2713,6 +2732,7 @@ class LanceTable(Table):
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
):
"""
Create a new table.
@@ -2773,6 +2793,7 @@ class LanceTable(Table):
self._conn = db
self._namespace = namespace
self._location = location
self._namespace_client = namespace_client
if data_storage_version is not None:
warnings.warn(

View File

@@ -326,6 +326,24 @@ def test_add_struct(mem_db: DBConnection):
table = mem_db.create_table("test2", schema=schema)
table.add(data)
struct_type = pa.struct(
[
("b", pa.int64()),
("a", pa.int64()),
]
)
expected = pa.table(
{
"s_list": [
[
pa.scalar({"b": 1, "a": 2}, type=struct_type),
pa.scalar({"b": 4, "a": None}, type=struct_type),
]
],
}
)
assert table.to_arrow() == expected
def test_add_subschema(mem_db: DBConnection):
schema = pa.schema(

View File

@@ -17,7 +17,8 @@ use pyo3::{
use pyo3_async_runtimes::tokio::future_into_py;
use crate::{
error::PythonErrorExt, storage_options::py_object_to_storage_options_provider, table::Table,
error::PythonErrorExt, namespace::extract_namespace_arc,
storage_options::py_object_to_storage_options_provider, table::Table,
};
#[pyclass]
@@ -182,7 +183,8 @@ impl Connection {
})
}
#[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None))]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None, namespace_client=None, managed_versioning=None))]
pub fn open_table(
self_: PyRef<'_, Self>,
name: String,
@@ -191,11 +193,13 @@ impl Connection {
storage_options_provider: Option<Py<PyAny>>,
index_cache_size: Option<u32>,
location: Option<String>,
namespace_client: Option<Py<PyAny>>,
managed_versioning: Option<bool>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let mut builder = inner.open_table(name);
builder = builder.namespace(namespace);
builder = builder.namespace(namespace.clone());
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
@@ -209,6 +213,20 @@ impl Connection {
if let Some(location) = location {
builder = builder.location(location);
}
// Extract namespace client from Python object if provided
let ns_client = if let Some(ns_obj) = namespace_client {
let py = self_.py();
Some(extract_namespace_arc(py, ns_obj)?)
} else {
None
};
if let Some(ns_client) = ns_client {
builder = builder.namespace_client(ns_client);
}
// Pass managed_versioning if provided to avoid redundant describe_table call
if let Some(enabled) = managed_versioning {
builder = builder.managed_versioning(enabled);
}
future_into_py(self_.py(), async move {
let table = builder.execute().await.infer_error()?;

View File

@@ -23,6 +23,7 @@ pub mod connection;
pub mod error;
pub mod header;
pub mod index;
pub mod namespace;
pub mod permutation;
pub mod query;
pub mod session;

746
python/src/namespace.rs Normal file
View File

@@ -0,0 +1,746 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Namespace utilities for Python bindings
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use lance_namespace::LanceNamespace as LanceNamespaceTrait;
use lance_namespace::models::*;
use pyo3::prelude::*;
use pyo3::types::PyDict;
/// Wrapper that allows any Python object implementing LanceNamespace protocol
/// to be used as a Rust LanceNamespace.
///
/// This is similar to PyLanceNamespace in lance's Python bindings - it wraps a Python
/// object and calls back into Python when namespace methods are invoked.
pub struct PyLanceNamespace {
py_namespace: Arc<Py<PyAny>>,
namespace_id: String,
}
impl PyLanceNamespace {
/// Create a new PyLanceNamespace wrapper around a Python namespace object.
pub fn new(_py: Python<'_>, py_namespace: &Bound<'_, PyAny>) -> PyResult<Self> {
let namespace_id = py_namespace
.call_method0("namespace_id")?
.extract::<String>()?;
Ok(Self {
py_namespace: Arc::new(py_namespace.clone().unbind()),
namespace_id,
})
}
/// Create an Arc<dyn LanceNamespace> from a Python namespace object.
pub fn create_arc(
py: Python<'_>,
py_namespace: &Bound<'_, PyAny>,
) -> PyResult<Arc<dyn LanceNamespaceTrait>> {
let wrapper = Self::new(py, py_namespace)?;
Ok(Arc::new(wrapper))
}
}
impl std::fmt::Debug for PyLanceNamespace {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PyLanceNamespace {{ id: {} }}", self.namespace_id)
}
}
/// Get or create the DictWithModelDump class in Python.
/// This class acts like a dict but also has model_dump() method.
/// This allows it to work with both:
/// - depythonize (which expects a dict/Mapping)
/// - Python code that calls .model_dump() (like DirectoryNamespace wrapper)
fn get_dict_with_model_dump_class(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
// Use a module-level cache via __builtins__
let builtins = py.import("builtins")?;
if builtins.hasattr("_DictWithModelDump")? {
return builtins.getattr("_DictWithModelDump");
}
// Create the class using exec
let locals = PyDict::new(py);
py.run(
c"class DictWithModelDump(dict):
def model_dump(self):
return dict(self)",
None,
Some(&locals),
)?;
let class = locals.get_item("DictWithModelDump")?.ok_or_else(|| {
pyo3::exceptions::PyRuntimeError::new_err("Failed to create DictWithModelDump class")
})?;
// Cache it
builtins.setattr("_DictWithModelDump", &class)?;
Ok(class)
}
/// Helper to call a Python namespace method with JSON serialization.
/// For methods that take a request and return a response.
/// Uses DictWithModelDump to pass a dict that also has model_dump() method,
/// making it compatible with both depythonize and Python wrappers.
async fn call_py_method<Req, Resp>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: serde::de::DeserializeOwned + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
let response_json = tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump so it works with both depythonize and .model_dump()
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
// Convert response to dict, then to JSON
// Pydantic models have model_dump() method
let result_dict = if result.bind(py).hasattr("model_dump")? {
result.call_method0(py, "model_dump")?
} else {
result
};
let response_json: String = json_module
.call_method1("dumps", (result_dict,))?
.extract()?;
Ok::<_, PyErr>(response_json)
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})?;
serde_json::from_str(&response_json).map_err(|e| {
lance_core::Error::io(
format!("Failed to deserialize response from {}: {}", method_name, e),
Default::default(),
)
})
}
/// Helper for methods that return () on success
async fn call_py_method_unit<Req>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<()>
where
Req: serde::Serialize + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
py_namespace.call_method1(py, method_name, (request_arg,))?;
Ok::<_, PyErr>(())
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})
}
/// Helper for methods that return a primitive type
async fn call_py_method_primitive<Req, Resp>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: for<'py> pyo3::FromPyObject<'py> + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
let value: Resp = result.extract(py)?;
Ok::<_, PyErr>(value)
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})
}
/// Helper for methods that return Bytes
async fn call_py_method_bytes<Req>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
) -> lance_core::Result<Bytes>
where
Req: serde::Serialize + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
let bytes_data: Vec<u8> = result.extract(py)?;
Ok::<_, PyErr>(Bytes::from(bytes_data))
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})
}
/// Helper for methods that take request + data and return a response
async fn call_py_method_with_data<Req, Resp>(
py_namespace: Arc<Py<PyAny>>,
method_name: &'static str,
request: Req,
data: Bytes,
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: serde::de::DeserializeOwned + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(
format!("Failed to serialize request for {}: {}", method_name, e),
Default::default(),
)
})?;
let response_json = tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let json_module = py.import("json")?;
let request_dict = json_module.call_method1("loads", (&request_json,))?;
// Wrap dict in DictWithModelDump
let dict_class = get_dict_with_model_dump_class(py)?;
let request_arg = dict_class.call1((request_dict,))?;
// Pass request and bytes to Python method
let py_bytes = pyo3::types::PyBytes::new(py, &data);
let result = py_namespace.call_method1(py, method_name, (request_arg, py_bytes))?;
// Convert response dict to JSON
let response_json: String = json_module.call_method1("dumps", (result,))?.extract()?;
Ok::<_, PyErr>(response_json)
})
})
.await
.map_err(|e| {
lance_core::Error::io(
format!("Task join error for {}: {}", method_name, e),
Default::default(),
)
})?
.map_err(|e: PyErr| {
lance_core::Error::io(
format!("Python error in {}: {}", method_name, e),
Default::default(),
)
})?;
serde_json::from_str(&response_json).map_err(|e| {
lance_core::Error::io(
format!("Failed to deserialize response from {}: {}", method_name, e),
Default::default(),
)
})
}
#[async_trait]
impl LanceNamespaceTrait for PyLanceNamespace {
fn namespace_id(&self) -> String {
self.namespace_id.clone()
}
async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> lance_core::Result<ListNamespacesResponse> {
call_py_method(self.py_namespace.clone(), "list_namespaces", request).await
}
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> lance_core::Result<DescribeNamespaceResponse> {
call_py_method(self.py_namespace.clone(), "describe_namespace", request).await
}
async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> lance_core::Result<CreateNamespaceResponse> {
call_py_method(self.py_namespace.clone(), "create_namespace", request).await
}
async fn drop_namespace(
&self,
request: DropNamespaceRequest,
) -> lance_core::Result<DropNamespaceResponse> {
call_py_method(self.py_namespace.clone(), "drop_namespace", request).await
}
async fn namespace_exists(&self, request: NamespaceExistsRequest) -> lance_core::Result<()> {
call_py_method_unit(self.py_namespace.clone(), "namespace_exists", request).await
}
async fn list_tables(
&self,
request: ListTablesRequest,
) -> lance_core::Result<ListTablesResponse> {
call_py_method(self.py_namespace.clone(), "list_tables", request).await
}
async fn describe_table(
&self,
request: DescribeTableRequest,
) -> lance_core::Result<DescribeTableResponse> {
call_py_method(self.py_namespace.clone(), "describe_table", request).await
}
async fn register_table(
&self,
request: RegisterTableRequest,
) -> lance_core::Result<RegisterTableResponse> {
call_py_method(self.py_namespace.clone(), "register_table", request).await
}
async fn table_exists(&self, request: TableExistsRequest) -> lance_core::Result<()> {
call_py_method_unit(self.py_namespace.clone(), "table_exists", request).await
}
async fn drop_table(&self, request: DropTableRequest) -> lance_core::Result<DropTableResponse> {
call_py_method(self.py_namespace.clone(), "drop_table", request).await
}
async fn deregister_table(
&self,
request: DeregisterTableRequest,
) -> lance_core::Result<DeregisterTableResponse> {
call_py_method(self.py_namespace.clone(), "deregister_table", request).await
}
async fn count_table_rows(&self, request: CountTableRowsRequest) -> lance_core::Result<i64> {
call_py_method_primitive(self.py_namespace.clone(), "count_table_rows", request).await
}
async fn create_table(
&self,
request: CreateTableRequest,
request_data: Bytes,
) -> lance_core::Result<CreateTableResponse> {
call_py_method_with_data(
self.py_namespace.clone(),
"create_table",
request,
request_data,
)
.await
}
async fn declare_table(
&self,
request: DeclareTableRequest,
) -> lance_core::Result<DeclareTableResponse> {
call_py_method(self.py_namespace.clone(), "declare_table", request).await
}
async fn insert_into_table(
&self,
request: InsertIntoTableRequest,
request_data: Bytes,
) -> lance_core::Result<InsertIntoTableResponse> {
call_py_method_with_data(
self.py_namespace.clone(),
"insert_into_table",
request,
request_data,
)
.await
}
async fn merge_insert_into_table(
&self,
request: MergeInsertIntoTableRequest,
request_data: Bytes,
) -> lance_core::Result<MergeInsertIntoTableResponse> {
call_py_method_with_data(
self.py_namespace.clone(),
"merge_insert_into_table",
request,
request_data,
)
.await
}
async fn update_table(
&self,
request: UpdateTableRequest,
) -> lance_core::Result<UpdateTableResponse> {
call_py_method(self.py_namespace.clone(), "update_table", request).await
}
async fn delete_from_table(
&self,
request: DeleteFromTableRequest,
) -> lance_core::Result<DeleteFromTableResponse> {
call_py_method(self.py_namespace.clone(), "delete_from_table", request).await
}
async fn query_table(&self, request: QueryTableRequest) -> lance_core::Result<Bytes> {
call_py_method_bytes(self.py_namespace.clone(), "query_table", request).await
}
async fn create_table_index(
&self,
request: CreateTableIndexRequest,
) -> lance_core::Result<CreateTableIndexResponse> {
call_py_method(self.py_namespace.clone(), "create_table_index", request).await
}
async fn list_table_indices(
&self,
request: ListTableIndicesRequest,
) -> lance_core::Result<ListTableIndicesResponse> {
call_py_method(self.py_namespace.clone(), "list_table_indices", request).await
}
async fn describe_table_index_stats(
&self,
request: DescribeTableIndexStatsRequest,
) -> lance_core::Result<DescribeTableIndexStatsResponse> {
call_py_method(
self.py_namespace.clone(),
"describe_table_index_stats",
request,
)
.await
}
async fn describe_transaction(
&self,
request: DescribeTransactionRequest,
) -> lance_core::Result<DescribeTransactionResponse> {
call_py_method(self.py_namespace.clone(), "describe_transaction", request).await
}
async fn alter_transaction(
&self,
request: AlterTransactionRequest,
) -> lance_core::Result<AlterTransactionResponse> {
call_py_method(self.py_namespace.clone(), "alter_transaction", request).await
}
async fn create_table_scalar_index(
&self,
request: CreateTableIndexRequest,
) -> lance_core::Result<CreateTableScalarIndexResponse> {
call_py_method(
self.py_namespace.clone(),
"create_table_scalar_index",
request,
)
.await
}
async fn drop_table_index(
&self,
request: DropTableIndexRequest,
) -> lance_core::Result<DropTableIndexResponse> {
call_py_method(self.py_namespace.clone(), "drop_table_index", request).await
}
async fn list_all_tables(
&self,
request: ListTablesRequest,
) -> lance_core::Result<ListTablesResponse> {
call_py_method(self.py_namespace.clone(), "list_all_tables", request).await
}
async fn restore_table(
&self,
request: RestoreTableRequest,
) -> lance_core::Result<RestoreTableResponse> {
call_py_method(self.py_namespace.clone(), "restore_table", request).await
}
async fn rename_table(
&self,
request: RenameTableRequest,
) -> lance_core::Result<RenameTableResponse> {
call_py_method(self.py_namespace.clone(), "rename_table", request).await
}
async fn list_table_versions(
&self,
request: ListTableVersionsRequest,
) -> lance_core::Result<ListTableVersionsResponse> {
call_py_method(self.py_namespace.clone(), "list_table_versions", request).await
}
async fn create_table_version(
&self,
request: CreateTableVersionRequest,
) -> lance_core::Result<CreateTableVersionResponse> {
call_py_method(self.py_namespace.clone(), "create_table_version", request).await
}
async fn describe_table_version(
&self,
request: DescribeTableVersionRequest,
) -> lance_core::Result<DescribeTableVersionResponse> {
call_py_method(self.py_namespace.clone(), "describe_table_version", request).await
}
async fn batch_delete_table_versions(
&self,
request: BatchDeleteTableVersionsRequest,
) -> lance_core::Result<BatchDeleteTableVersionsResponse> {
call_py_method(
self.py_namespace.clone(),
"batch_delete_table_versions",
request,
)
.await
}
async fn update_table_schema_metadata(
&self,
request: UpdateTableSchemaMetadataRequest,
) -> lance_core::Result<UpdateTableSchemaMetadataResponse> {
call_py_method(
self.py_namespace.clone(),
"update_table_schema_metadata",
request,
)
.await
}
async fn get_table_stats(
&self,
request: GetTableStatsRequest,
) -> lance_core::Result<GetTableStatsResponse> {
call_py_method(self.py_namespace.clone(), "get_table_stats", request).await
}
async fn explain_table_query_plan(
&self,
request: ExplainTableQueryPlanRequest,
) -> lance_core::Result<String> {
call_py_method_primitive(
self.py_namespace.clone(),
"explain_table_query_plan",
request,
)
.await
}
async fn analyze_table_query_plan(
&self,
request: AnalyzeTableQueryPlanRequest,
) -> lance_core::Result<String> {
call_py_method_primitive(
self.py_namespace.clone(),
"analyze_table_query_plan",
request,
)
.await
}
async fn alter_table_add_columns(
&self,
request: AlterTableAddColumnsRequest,
) -> lance_core::Result<AlterTableAddColumnsResponse> {
call_py_method(
self.py_namespace.clone(),
"alter_table_add_columns",
request,
)
.await
}
async fn alter_table_alter_columns(
&self,
request: AlterTableAlterColumnsRequest,
) -> lance_core::Result<AlterTableAlterColumnsResponse> {
call_py_method(
self.py_namespace.clone(),
"alter_table_alter_columns",
request,
)
.await
}
async fn alter_table_drop_columns(
&self,
request: AlterTableDropColumnsRequest,
) -> lance_core::Result<AlterTableDropColumnsResponse> {
call_py_method(
self.py_namespace.clone(),
"alter_table_drop_columns",
request,
)
.await
}
async fn list_table_tags(
&self,
request: ListTableTagsRequest,
) -> lance_core::Result<ListTableTagsResponse> {
call_py_method(self.py_namespace.clone(), "list_table_tags", request).await
}
async fn create_table_tag(
&self,
request: CreateTableTagRequest,
) -> lance_core::Result<CreateTableTagResponse> {
call_py_method(self.py_namespace.clone(), "create_table_tag", request).await
}
async fn delete_table_tag(
&self,
request: DeleteTableTagRequest,
) -> lance_core::Result<DeleteTableTagResponse> {
call_py_method(self.py_namespace.clone(), "delete_table_tag", request).await
}
async fn update_table_tag(
&self,
request: UpdateTableTagRequest,
) -> lance_core::Result<UpdateTableTagResponse> {
call_py_method(self.py_namespace.clone(), "update_table_tag", request).await
}
async fn get_table_tag_version(
&self,
request: GetTableTagVersionRequest,
) -> lance_core::Result<GetTableTagVersionResponse> {
call_py_method(self.py_namespace.clone(), "get_table_tag_version", request).await
}
}
/// Convert Python dict to HashMap<String, String>
#[allow(dead_code)]
fn dict_to_hashmap(dict: &Bound<'_, PyDict>) -> PyResult<HashMap<String, String>> {
let mut map = HashMap::new();
for (key, value) in dict.iter() {
let key_str: String = key.extract()?;
let value_str: String = value.extract()?;
map.insert(key_str, value_str);
}
Ok(map)
}
/// Extract an Arc<dyn LanceNamespace> from a Python namespace object.
///
/// This function wraps any Python namespace object with PyLanceNamespace.
/// The PyLanceNamespace wrapper uses DictWithModelDump to pass requests,
/// which works with both:
/// - Native namespaces (DirectoryNamespace, RestNamespace) that use depythonize (expects dict)
/// - Custom Python implementations that call .model_dump() on the request
pub fn extract_namespace_arc(
py: Python<'_>,
ns: Py<PyAny>,
) -> PyResult<Arc<dyn LanceNamespaceTrait>> {
let ns_ref = ns.bind(py);
PyLanceNamespace::create_arc(py, ns_ref)
}

4
python/uv.lock generated
View File

@@ -2006,7 +2006,7 @@ requires-dist = [
{ name = "botocore", marker = "extra == 'embeddings'", specifier = ">=1.31.57" },
{ name = "cohere", marker = "extra == 'embeddings'" },
{ name = "colpali-engine", marker = "extra == 'embeddings'", specifier = ">=0.3.10" },
{ name = "datafusion", marker = "extra == 'tests'" },
{ name = "datafusion", marker = "extra == 'tests'", specifier = "<52" },
{ name = "deprecation" },
{ name = "duckdb", marker = "extra == 'tests'" },
{ name = "google-generativeai", marker = "extra == 'embeddings'" },
@@ -2035,7 +2035,7 @@ requires-dist = [
{ name = "pyarrow-stubs", marker = "extra == 'tests'" },
{ name = "pydantic", specifier = ">=1.10" },
{ name = "pylance", marker = "extra == 'pylance'", specifier = ">=1.0.0b14" },
{ name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14" },
{ name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14,<3.0.0" },
{ name = "pyright", marker = "extra == 'dev'" },
{ name = "pytest", marker = "extra == 'tests'" },
{ name = "pytest-asyncio", marker = "extra == 'tests'" },

View File

@@ -136,6 +136,7 @@ impl OpenTableBuilder {
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
},
embedding_registry,
}
@@ -235,6 +236,29 @@ impl OpenTableBuilder {
self
}
/// Set a namespace client for managed versioning support.
///
/// When a namespace client is provided and the table has `managed_versioning` enabled,
/// the table will use the namespace's commit handler to notify the namespace of
/// version changes. This enables features like event emission for table modifications.
pub fn namespace_client(mut self, client: Arc<dyn lance_namespace::LanceNamespace>) -> Self {
self.request.namespace_client = Some(client);
self
}
/// Set whether managed versioning is enabled for this table.
///
/// When set to `Some(true)`, the table will use namespace-managed commits.
/// When set to `Some(false)`, the table will use local commits even if namespace_client is set.
/// When set to `None` (default), the value will be fetched from the namespace if namespace_client is set.
///
/// This is typically set when the caller has already queried the namespace and knows the
/// managed_versioning status, avoiding a redundant describe_table call.
pub fn managed_versioning(mut self, enabled: bool) -> Self {
self.request.managed_versioning = Some(enabled);
self
}
/// Open the table
pub async fn execute(self) -> Result<Table> {
let table = self.parent.open_table(self.request).await?;
@@ -294,6 +318,12 @@ impl CloneTableBuilder {
self
}
/// Set a namespace client for managed versioning support.
pub fn namespace_client(mut self, client: Arc<dyn lance_namespace::LanceNamespace>) -> Self {
self.request.namespace_client = Some(client);
self
}
/// Execute the clone operation
pub async fn execute(self) -> Result<Table> {
let parent = self.parent.clone();

View File

@@ -66,6 +66,10 @@ pub struct OpenTableRequest {
/// Optional namespace client for server-side query execution.
/// When set, queries will be executed on the namespace server instead of locally.
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
/// Whether managed versioning is enabled for this table.
/// When Some(true), the table will use namespace-managed commits instead of local commits.
/// When None and namespace_client is provided, the value will be fetched from the namespace.
pub managed_versioning: Option<bool>,
}
impl std::fmt::Debug for OpenTableRequest {
@@ -77,6 +81,7 @@ impl std::fmt::Debug for OpenTableRequest {
.field("lance_read_params", &self.lance_read_params)
.field("location", &self.location)
.field("namespace_client", &self.namespace_client)
.field("managed_versioning", &self.managed_versioning)
.finish()
}
}
@@ -161,6 +166,9 @@ pub struct CloneTableRequest {
/// Whether to perform a shallow clone (true) or deep clone (false). Defaults to true.
/// Currently only shallow clone is supported.
pub is_shallow: bool,
/// Optional namespace client for managed versioning support.
/// When set, enables the commit handler to track table versions through the namespace.
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
}
impl CloneTableRequest {
@@ -172,6 +180,7 @@ impl CloneTableRequest {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
}
}
}

View File

@@ -669,6 +669,7 @@ impl ListingDatabase {
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
};
let req = (callback)(req);
let table = self.open_table(req).await?;
@@ -869,6 +870,7 @@ impl Database for ListingDatabase {
Some(write_params),
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
)
.await
{
@@ -946,7 +948,9 @@ impl Database for ListingDatabase {
self.store_wrapper.clone(),
None,
self.read_consistency_interval,
None,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
None, // managed_versioning - will be queried if namespace_client is provided
)
.await?;
@@ -1022,6 +1026,8 @@ impl Database for ListingDatabase {
Some(read_params),
self.read_consistency_interval,
request.namespace_client,
false, // server_side_query_enabled - listing database doesn't support server-side queries
request.managed_versioning, // Pass through managed_versioning from request
)
.await?,
);
@@ -1162,6 +1168,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1222,6 +1229,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1281,6 +1289,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1317,6 +1326,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: false, // Request deep clone
namespace_client: None,
})
.await;
@@ -1357,6 +1367,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1397,6 +1408,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1416,6 +1428,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1452,6 +1465,7 @@ mod tests {
source_version: Some(1),
source_tag: Some("v1.0".to_string()),
is_shallow: true,
namespace_client: None,
})
.await;
@@ -1525,6 +1539,7 @@ mod tests {
source_version: Some(initial_version),
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1603,6 +1618,7 @@ mod tests {
source_version: None,
source_tag: Some("v1.0".to_string()),
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1654,6 +1670,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();
@@ -1746,6 +1763,7 @@ mod tests {
source_version: None,
source_tag: None,
is_shallow: true,
namespace_client: None,
})
.await
.unwrap();

View File

@@ -7,18 +7,20 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
use lance_namespace::{
LanceNamespace,
models::{
CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse,
DeclareTableRequest, DescribeNamespaceRequest, DescribeNamespaceResponse,
DescribeTableRequest, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
CreateNamespaceRequest, CreateNamespaceResponse, DeclareTableRequest,
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
},
};
use lance_namespace_impls::ConnectBuilder;
use log::warn;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::database::ReadConsistency;
use crate::error::{Error, Result};
@@ -206,83 +208,55 @@ impl Database for LanceNamespaceDatabase {
let mut table_id = request.namespace.clone();
table_id.push(request.name.clone());
// Try declare_table first, falling back to create_empty_table for backwards
// compatibility with older namespace clients that don't support declare_table
let declare_request = DeclareTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let (location, initial_storage_options) =
match self.namespace.declare_table(declare_request).await {
Ok(response) => {
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response"
.to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts)
}
Err(e) => {
// Check if the error is "not supported" and try create_empty_table as fallback
let err_str = e.to_string().to_lowercase();
if err_str.contains("not supported") || err_str.contains("not implemented") {
warn!(
"declare_table is not supported by the namespace client, \
falling back to deprecated create_empty_table. \
create_empty_table is deprecated and will be removed in Lance 3.0.0. \
Please upgrade your namespace client to support declare_table."
);
#[allow(deprecated)]
let create_empty_request = CreateEmptyTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let response = self
.namespace
.declare_table(declare_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to declare table: {}", e),
})?;
#[allow(deprecated)]
let create_response = self
.namespace
.create_empty_table(create_empty_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to create empty table: {}", e),
})?;
let location = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;
let loc = create_response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from create_empty_table response"
.to_string(),
})?;
// For deprecated path, use self.storage_options
let opts = if self.storage_options.is_empty() {
None
} else {
Some(self.storage_options.clone())
};
(loc, opts)
} else {
return Err(Error::Runtime {
message: format!("Failed to declare table: {}", e),
});
}
}
};
// Use storage options from response, fall back to self.storage_options
let initial_storage_options = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
let write_params = if let Some(storage_opts) = initial_storage_options {
let mut params = request.write_options.lance_write_params.unwrap_or_default();
let managed_versioning = response.managed_versioning;
// Build write params with storage options and commit handler
let mut params = request.write_options.lance_write_params.unwrap_or_default();
// Set up storage options if provided
if let Some(storage_opts) = initial_storage_options {
let store_params = params
.store_params
.get_or_insert_with(ObjectStoreParams::default);
store_params.storage_options_accessor = Some(Arc::new(
StorageOptionsAccessor::with_static_options(storage_opts),
));
Some(params)
} else {
request.write_options.lance_write_params
};
}
// Set up commit handler when managed_versioning is enabled
if managed_versioning == Some(true) {
let external_store =
LanceNamespaceExternalManifestStore::new(self.namespace.clone(), table_id.clone());
let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
external_manifest_store: Arc::new(external_store),
});
params.commit_handler = Some(commit_handler);
}
let write_params = Some(params);
let native_table = NativeTable::create_from_namespace(
self.namespace.clone(),

View File

@@ -464,6 +464,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
};
let req = (callback)(req);
self.open_table(req).await

View File

@@ -34,9 +34,13 @@ use lance_index::vector::sq::builder::SQBuildParams;
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
pub use query::AnyQuery;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
use lance_namespace::LanceNamespace;
use lance_namespace::models::DescribeTableRequest;
use lance_table::format::Manifest;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::ManifestNamingScheme;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::format;
@@ -1212,10 +1216,13 @@ pub struct NativeTable {
// This comes from the connection options. We store here so we can pass down
// to the dataset when we recreate it (for example, in checkout_latest).
read_consistency_interval: Option<std::time::Duration>,
// Optional namespace client for server-side query execution.
// When set, queries will be executed on the namespace server instead of locally.
// pub (crate) namespace_client so query.rs can access the fields
// Optional namespace client for namespace operations (e.g., managed versioning).
// pub(crate) so query.rs can access the field for server-side query execution.
pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
// Whether to enable server-side query execution via the namespace client.
// When true and namespace_client is set, queries will be executed on the
// namespace server instead of locally.
pub(crate) server_side_query_enabled: bool,
}
impl std::fmt::Debug for NativeTable {
@@ -1227,6 +1234,7 @@ impl std::fmt::Debug for NativeTable {
.field("uri", &self.uri)
.field("read_consistency_interval", &self.read_consistency_interval)
.field("namespace_client", &self.namespace_client)
.field("server_side_query_enabled", &self.server_side_query_enabled)
.finish()
}
}
@@ -1263,7 +1271,7 @@ impl NativeTable {
/// * A [NativeTable] object.
pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, vec![], None, None, None, None).await
Self::open_with_params(uri, &name, vec![], None, None, None, None, false, None).await
}
/// Opens an existing Table
@@ -1273,7 +1281,10 @@ impl NativeTable {
/// * `base_path` - The base path where the table is located
/// * `name` The Table name
/// * `params` The [ReadParams] to use when opening the table
/// * `namespace_client` - Optional namespace client for server-side query execution
/// * `namespace_client` - Optional namespace client for namespace operations
/// * `server_side_query_enabled` - Whether to enable server-side query execution
/// * `managed_versioning` - Whether managed versioning is enabled. If None and namespace_client
/// is provided, the value will be fetched via describe_table.
///
/// # Returns
///
@@ -1287,6 +1298,8 @@ impl NativeTable {
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
managed_versioning: Option<bool>,
) -> Result<Self> {
let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper
@@ -1295,17 +1308,54 @@ impl NativeTable {
None => params,
};
let dataset = DatasetBuilder::from_uri(uri)
.with_read_params(params)
.load()
.await
.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
source: Box::new(e),
},
e => e.into(),
})?;
// Build table_id from namespace + name
let mut table_id = namespace.clone();
table_id.push(name.to_string());
// Determine if managed_versioning is enabled
// Use the provided value if available, otherwise query the namespace
let managed_versioning = match managed_versioning {
Some(value) => value,
None if namespace_client.is_some() => {
let ns_client = namespace_client.as_ref().unwrap();
let describe_request = DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
let response = ns_client
.describe_table(describe_request)
.await
.map_err(|e| Error::Runtime {
message: format!(
"Failed to describe table via namespace client: {}. \
If you don't need managed versioning, don't pass namespace_client.",
e
),
})?;
response.managed_versioning == Some(true)
}
None => false,
};
let mut builder = DatasetBuilder::from_uri(uri).with_read_params(params);
// Set up commit handler when managed_versioning is enabled
if managed_versioning && let Some(ref ns_client) = namespace_client {
let external_store =
LanceNamespaceExternalManifestStore::new(ns_client.clone(), table_id.clone());
let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
external_manifest_store: Arc::new(external_store),
});
builder = builder.with_commit_handler(commit_handler);
}
let dataset = builder.load().await.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
source: Box::new(e),
},
e => e.into(),
})?;
let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
let id = Self::build_id(&namespace, name);
@@ -1318,6 +1368,7 @@ impl NativeTable {
dataset,
read_consistency_interval,
namespace_client,
server_side_query_enabled,
})
}
@@ -1421,6 +1472,7 @@ impl NativeTable {
dataset,
read_consistency_interval,
namespace_client: stored_namespace_client,
server_side_query_enabled,
})
}
@@ -1460,7 +1512,8 @@ impl NativeTable {
/// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
/// * `batches` RecordBatch to be saved in the database.
/// * `params` - Write parameters.
/// * `namespace_client` - Optional namespace client for server-side query execution
/// * `namespace_client` - Optional namespace client for namespace operations
/// * `server_side_query_enabled` - Whether to enable server-side query execution
///
/// # Returns
///
@@ -1475,6 +1528,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
) -> Result<Self> {
// Default params uses format v1.
let params = params.unwrap_or(WriteParams {
@@ -1507,6 +1561,7 @@ impl NativeTable {
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
read_consistency_interval,
namespace_client,
server_side_query_enabled,
})
}
@@ -1520,6 +1575,7 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
namespace_client: Option<Arc<dyn LanceNamespace>>,
server_side_query_enabled: bool,
) -> Result<Self> {
let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
Self::create(
@@ -1531,6 +1587,7 @@ impl NativeTable {
params,
read_consistency_interval,
namespace_client,
server_side_query_enabled,
)
.await
}
@@ -1634,6 +1691,7 @@ impl NativeTable {
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
read_consistency_interval,
namespace_client: stored_namespace_client,
server_side_query_enabled,
})
}
@@ -2625,7 +2683,7 @@ mod tests {
vec![Ok(batch.clone())],
batch.schema(),
));
let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None)
let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None, false)
.await
.unwrap();

View File

@@ -3,12 +3,13 @@
use std::sync::Arc;
use arrow_cast::can_cast_types;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};
use datafusion::functions::core::{get_field, named_struct};
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::{Literal, cast};
use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_plan::expressions::Column;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
@@ -25,12 +26,9 @@ pub fn cast_to_table_schema(
return Ok(input);
}
let exprs = build_field_exprs(
input_schema.fields(),
table_schema.fields(),
&|idx| Arc::new(Column::new(input_schema.field(idx).name(), idx)) as Arc<dyn PhysicalExpr>,
&input_schema,
)?;
let exprs = build_field_exprs(input_schema.fields(), table_schema.fields(), &|idx| {
Arc::new(Column::new(input_schema.field(idx).name(), idx)) as Arc<dyn PhysicalExpr>
})?;
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = exprs
.into_iter()
@@ -51,7 +49,6 @@ fn build_field_exprs(
input_fields: &Fields,
table_fields: &Fields,
get_input_expr: &dyn Fn(usize) -> Arc<dyn PhysicalExpr>,
input_schema: &Schema,
) -> Result<Vec<(Arc<dyn PhysicalExpr>, FieldRef)>> {
let config = Arc::new(ConfigOptions::default());
let mut result = Vec::new();
@@ -72,24 +69,19 @@ fn build_field_exprs(
(DataType::Struct(in_children), DataType::Struct(tbl_children))
if in_children != tbl_children =>
{
let sub_exprs = build_field_exprs(
in_children,
tbl_children,
&|child_idx| {
let child_name = in_children[child_idx].name();
Arc::new(ScalarFunctionExpr::new(
&format!("get_field({child_name})"),
get_field(),
vec![
input_expr.clone(),
Arc::new(Literal::new(ScalarValue::from(child_name.as_str()))),
],
Arc::new(in_children[child_idx].as_ref().clone()),
config.clone(),
)) as Arc<dyn PhysicalExpr>
},
input_schema,
)?;
let sub_exprs = build_field_exprs(in_children, tbl_children, &|child_idx| {
let child_name = in_children[child_idx].name();
Arc::new(ScalarFunctionExpr::new(
&format!("get_field({child_name})"),
get_field(),
vec![
input_expr.clone(),
Arc::new(Literal::new(ScalarValue::from(child_name.as_str()))),
],
Arc::new(in_children[child_idx].as_ref().clone()),
config.clone(),
)) as Arc<dyn PhysicalExpr>
})?;
let output_struct_fields: Fields = sub_exprs
.iter()
@@ -125,17 +117,21 @@ fn build_field_exprs(
// Types match: pass through.
(inp, tbl) if inp == tbl => input_expr,
// Types differ: cast.
_ => cast(input_expr, input_schema, table_field.data_type().clone()).map_err(|e| {
Error::InvalidInput {
// safe: false (the default) means overflow/truncation errors surface at execution time.
(_, _) if can_cast_types(input_field.data_type(), table_field.data_type()) => Arc::new(
CastExpr::new(input_expr, table_field.data_type().clone(), None),
)
as Arc<dyn PhysicalExpr>,
(inp, tbl) => {
return Err(Error::InvalidInput {
message: format!(
"cannot cast field '{}' from {} to {}: {}",
"cannot cast field '{}' from {} to {}",
table_field.name(),
input_field.data_type(),
table_field.data_type(),
e
inp,
tbl,
),
}
})?,
});
}
};
let output_field = Arc::new(Field::new(
@@ -153,10 +149,12 @@ fn build_field_exprs(
mod tests {
use std::sync::Arc;
use arrow::buffer::OffsetBuffer;
use arrow_array::{
Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
Array, Float32Array, Float64Array, Int32Array, Int64Array, ListArray, RecordBatch,
StringArray, StructArray, UInt32Array, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use arrow_schema::{DataType, Field, Fields, Schema};
use datafusion::prelude::SessionContext;
use datafusion_catalog::MemTable;
use futures::TryStreamExt;
@@ -495,4 +493,129 @@ mod tests {
assert_eq!(b.value(0), "hello");
assert_eq!(b.value(1), "world");
}
#[tokio::test]
async fn test_narrowing_numeric_cast_success() {
let input_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::UInt64, false)])),
vec![Arc::new(UInt64Array::from(vec![1u64, 2, 3]))],
)
.unwrap();
let table_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
let plan = plan_from_batch(input_batch).await;
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
let result = collect(casted).await;
assert_eq!(result.schema().field(0).data_type(), &DataType::UInt32);
let a: &UInt32Array = result.column(0).as_any().downcast_ref().unwrap();
assert_eq!(a.values(), &[1u32, 2, 3]);
}
#[tokio::test]
async fn test_narrowing_numeric_cast_overflow_errors() {
let overflow_val = u32::MAX as u64 + 1;
let input_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::UInt64, false)])),
vec![Arc::new(UInt64Array::from(vec![overflow_val]))],
)
.unwrap();
let table_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
let plan = plan_from_batch(input_batch).await;
// Planning succeeds — the overflow is only detected at execution time.
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
let ctx = SessionContext::new();
let stream = casted.execute(0, ctx.task_ctx()).unwrap();
let result: Result<Vec<RecordBatch>, _> = stream.try_collect().await;
assert!(result.is_err(), "expected overflow error at execution time");
}
#[tokio::test]
async fn test_list_struct_field_reorder() {
// list<struct<a: Int32, b: Int32>> → list<struct<b: Int64, a: Int64>>
// Tests both reordering (a,b → b,a) and element-type widening (Int32 → Int64).
let inner_fields: Fields = vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]
.into();
let struct_array = StructArray::from(vec![
(
Arc::new(inner_fields[0].as_ref().clone()),
Arc::new(Int32Array::from(vec![1, 3])) as _,
),
(
Arc::new(inner_fields[1].as_ref().clone()),
Arc::new(Int32Array::from(vec![2, 4])) as _,
),
]);
// Offsets: one list element containing two struct rows (0..2).
let offsets = OffsetBuffer::from_lengths(vec![2]);
let list_array = ListArray::try_new(
Arc::new(Field::new("item", DataType::Struct(inner_fields), true)),
offsets,
Arc::new(struct_array),
None,
)
.unwrap();
let input_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new(
"s_list",
list_array.data_type().clone(),
false,
)])),
vec![Arc::new(list_array)],
)
.unwrap();
let table_inner: Fields = vec![
Field::new("b", DataType::Int64, true),
Field::new("a", DataType::Int64, true),
]
.into();
let table_schema = Schema::new(vec![Field::new(
"s_list",
DataType::List(Arc::new(Field::new(
"item",
DataType::Struct(table_inner),
true,
))),
false,
)]);
let plan = plan_from_batch(input_batch).await;
let casted = cast_to_table_schema(plan, &table_schema).unwrap();
let result = collect(casted).await;
let list_col = result
.column(0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let struct_col = list_col
.values()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
assert_eq!(struct_col.num_columns(), 2);
let b: &Int64Array = struct_col
.column_by_name("b")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(b.values(), &[2, 4]);
let a: &Int64Array = struct_col
.column_by_name("a")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(a.values(), &[1, 3]);
}
}

View File

@@ -1,3 +1,4 @@
use futures::FutureExt;
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use serde::{Deserialize, Serialize};
@@ -23,7 +24,7 @@ pub struct DeleteResult {
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let delete_result = dataset.delete(predicate).await?;
let delete_result = dataset.delete(predicate).boxed().await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = dataset.version().version;
table.dataset.update(dataset);

View File

@@ -40,8 +40,10 @@ pub async fn execute_query(
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
// If namespace client is configured, use server-side query execution
if let Some(ref namespace_client) = table.namespace_client {
// If server-side query is enabled and namespace client is configured, use server-side query execution
if table.server_side_query_enabled
&& let Some(ref namespace_client) = table.namespace_client
{
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
}
execute_generic_query(table, query, options).await