diff --git a/Cargo.lock b/Cargo.lock index b6a394d09..68bf8953b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4855,16 +4855,20 @@ version = "0.30.0-beta.3" dependencies = [ "arrow", "async-trait", + "bytes", "env_logger", "futures", "lance-core", "lance-io", "lance-namespace", + "lance-namespace-impls", "lancedb", "pin-project", "pyo3", "pyo3-async-runtimes", "pyo3-build-config", + "serde", + "serde_json", "snafu", "tokio", ] diff --git a/python/Cargo.toml b/python/Cargo.toml index cddd4d1eb..29bfb76d8 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -16,9 +16,11 @@ crate-type = ["cdylib"] [dependencies] arrow = { version = "57.2", features = ["pyarrow"] } async-trait = "0.1" +bytes = "1" lancedb = { path = "../rust/lancedb", default-features = false } lance-core.workspace = true lance-namespace.workspace = true +lance-namespace-impls.workspace = true lance-io.workspace = true env_logger.workspace = true pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] } @@ -28,6 +30,8 @@ pyo3-async-runtimes = { version = "0.26", features = [ ] } pin-project = "1.1.5" futures.workspace = true +serde = "1" +serde_json = "1" snafu.workspace = true tokio = { version = "1.40", features = ["sync"] } diff --git a/python/pyproject.toml b/python/pyproject.toml index 4a6b6142f..8bdcc9295 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -45,7 +45,7 @@ repository = "https://github.com/lancedb/lancedb" [project.optional-dependencies] pylance = [ - "pylance>=1.0.0b14", + "pylance>=4.0.0b7", ] tests = [ "aiohttp", @@ -59,9 +59,9 @@ tests = [ "polars>=0.19, <=1.3.0", "tantivy", "pyarrow-stubs", - "pylance>=1.0.0b14,<3.0.0", + "pylance>=4.0.0b7", "requests", - "datafusion<52", + "datafusion>=52,<53", ] dev = [ "ruff", diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index be2fbdfc2..f568bccc5 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -8,7 +8,7 @@ from abc import abstractmethod from datetime import timedelta from pathlib import Path import sys -from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Union if sys.version_info >= (3, 12): from typing import override @@ -1541,6 +1541,8 @@ class AsyncConnection(object): storage_options_provider: Optional["StorageOptionsProvider"] = None, index_cache_size: Optional[int] = None, location: Optional[str] = None, + namespace_client: Optional[Any] = None, + managed_versioning: Optional[bool] = None, ) -> AsyncTable: """Open a Lance Table in the database. @@ -1573,6 +1575,9 @@ class AsyncConnection(object): The explicit location (URI) of the table. If provided, the table will be opened from this location instead of deriving it from the database URI and table name. + managed_versioning: bool, optional + Whether managed versioning is enabled for this table. If provided, + avoids a redundant describe_table call when namespace_client is set. Returns ------- @@ -1587,6 +1592,8 @@ class AsyncConnection(object): storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, location=location, + namespace_client=namespace_client, + managed_versioning=managed_versioning, ) return AsyncTable(table) diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index eade54c5e..dfe532469 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -12,7 +12,7 @@ from __future__ import annotations import asyncio import sys -from typing import Dict, Iterable, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Union if sys.version_info >= (3, 12): from typing import override @@ -240,7 +240,7 @@ class LanceNamespaceDBConnection(DBConnection): session : Optional[Session] A session to use for this connection """ - self._ns = namespace + self._namespace_client = namespace self.read_consistency_interval = read_consistency_interval self.storage_options = storage_options or {} self.session = session @@ -269,7 +269,7 @@ class LanceNamespaceDBConnection(DBConnection): if namespace is None: namespace = [] request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) - response = self._ns.list_tables(request) + response = self._namespace_client.list_tables(request) return response.tables if response.tables else [] @override @@ -309,7 +309,9 @@ class LanceNamespaceDBConnection(DBConnection): # Try to describe the table first to see if it exists try: describe_request = DescribeTableRequest(id=table_id) - describe_response = self._ns.describe_table(describe_request) + describe_response = self._namespace_client.describe_table( + describe_request + ) location = describe_response.location namespace_storage_options = describe_response.storage_options except Exception: @@ -323,7 +325,7 @@ class LanceNamespaceDBConnection(DBConnection): location=None, properties=self.storage_options if self.storage_options else None, ) - declare_response = self._ns.declare_table(declare_request) + declare_response = self._namespace_client.declare_table(declare_request) if not declare_response.location: raise ValueError( @@ -353,7 +355,7 @@ class LanceNamespaceDBConnection(DBConnection): # Only create if namespace returned storage_options (not None) if storage_options_provider is None and namespace_storage_options is not None: storage_options_provider = LanceNamespaceStorageOptionsProvider( - namespace=self._ns, + namespace=self._namespace_client, table_id=table_id, ) @@ -371,6 +373,7 @@ class LanceNamespaceDBConnection(DBConnection): storage_options=merged_storage_options, storage_options_provider=storage_options_provider, location=location, + namespace_client=self._namespace_client, ) return tbl @@ -389,7 +392,7 @@ class LanceNamespaceDBConnection(DBConnection): namespace = [] table_id = namespace + [name] request = DescribeTableRequest(id=table_id) - response = self._ns.describe_table(request) + response = self._namespace_client.describe_table(request) # Merge storage options: self.storage_options < user options < namespace options merged_storage_options = dict(self.storage_options) @@ -402,10 +405,14 @@ class LanceNamespaceDBConnection(DBConnection): # Only create if namespace returned storage_options (not None) if storage_options_provider is None and response.storage_options is not None: storage_options_provider = LanceNamespaceStorageOptionsProvider( - namespace=self._ns, + namespace=self._namespace_client, table_id=table_id, ) + # Pass managed_versioning to avoid redundant describe_table call in Rust. + # Convert None to False since we already have the answer from describe_table. + managed_versioning = response.managed_versioning is True + return self._lance_table_from_uri( name, response.location, @@ -413,6 +420,8 @@ class LanceNamespaceDBConnection(DBConnection): storage_options=merged_storage_options, storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, + namespace_client=self._namespace_client, + managed_versioning=managed_versioning, ) @override @@ -422,7 +431,7 @@ class LanceNamespaceDBConnection(DBConnection): namespace = [] table_id = namespace + [name] request = DropTableRequest(id=table_id) - self._ns.drop_table(request) + self._namespace_client.drop_table(request) @override def rename_table( @@ -484,7 +493,7 @@ class LanceNamespaceDBConnection(DBConnection): request = ListNamespacesRequest( id=namespace, page_token=page_token, limit=limit ) - response = self._ns.list_namespaces(request) + response = self._namespace_client.list_namespaces(request) return ListNamespacesResponse( namespaces=response.namespaces if response.namespaces else [], page_token=response.page_token, @@ -520,7 +529,7 @@ class LanceNamespaceDBConnection(DBConnection): mode=_normalize_create_namespace_mode(mode), properties=properties, ) - response = self._ns.create_namespace(request) + response = self._namespace_client.create_namespace(request) return CreateNamespaceResponse( properties=response.properties if hasattr(response, "properties") else None ) @@ -555,7 +564,7 @@ class LanceNamespaceDBConnection(DBConnection): mode=_normalize_drop_namespace_mode(mode), behavior=_normalize_drop_namespace_behavior(behavior), ) - response = self._ns.drop_namespace(request) + response = self._namespace_client.drop_namespace(request) return DropNamespaceResponse( properties=( response.properties if hasattr(response, "properties") else None @@ -581,7 +590,7 @@ class LanceNamespaceDBConnection(DBConnection): Response containing the namespace properties. """ request = DescribeNamespaceRequest(id=namespace) - response = self._ns.describe_namespace(request) + response = self._namespace_client.describe_namespace(request) return DescribeNamespaceResponse( properties=response.properties if hasattr(response, "properties") else None ) @@ -615,7 +624,7 @@ class LanceNamespaceDBConnection(DBConnection): if namespace is None: namespace = [] request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) - response = self._ns.list_tables(request) + response = self._namespace_client.list_tables(request) return ListTablesResponse( tables=response.tables if response.tables else [], page_token=response.page_token, @@ -630,6 +639,8 @@ class LanceNamespaceDBConnection(DBConnection): storage_options: Optional[Dict[str, str]] = None, storage_options_provider: Optional[StorageOptionsProvider] = None, index_cache_size: Optional[int] = None, + namespace_client: Optional[Any] = None, + managed_versioning: Optional[bool] = None, ) -> LanceTable: # Open a table directly from a URI using the location parameter # Note: storage_options should already be merged by the caller @@ -643,6 +654,8 @@ class LanceNamespaceDBConnection(DBConnection): ) # Open the table using the temporary connection with the location parameter + # Pass namespace_client to enable managed versioning support + # Pass managed_versioning to avoid redundant describe_table call return LanceTable.open( temp_conn, name, @@ -651,6 +664,8 @@ class LanceNamespaceDBConnection(DBConnection): storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, location=table_uri, + namespace_client=namespace_client, + managed_versioning=managed_versioning, ) @@ -685,7 +700,7 @@ class AsyncLanceNamespaceDBConnection: session : Optional[Session] A session to use for this connection """ - self._ns = namespace + self._namespace_client = namespace self.read_consistency_interval = read_consistency_interval self.storage_options = storage_options or {} self.session = session @@ -713,7 +728,7 @@ class AsyncLanceNamespaceDBConnection: if namespace is None: namespace = [] request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) - response = self._ns.list_tables(request) + response = self._namespace_client.list_tables(request) return response.tables if response.tables else [] async def create_table( @@ -750,7 +765,9 @@ class AsyncLanceNamespaceDBConnection: # Try to describe the table first to see if it exists try: describe_request = DescribeTableRequest(id=table_id) - describe_response = self._ns.describe_table(describe_request) + describe_response = self._namespace_client.describe_table( + describe_request + ) location = describe_response.location namespace_storage_options = describe_response.storage_options except Exception: @@ -764,7 +781,7 @@ class AsyncLanceNamespaceDBConnection: location=None, properties=self.storage_options if self.storage_options else None, ) - declare_response = self._ns.declare_table(declare_request) + declare_response = self._namespace_client.declare_table(declare_request) if not declare_response.location: raise ValueError( @@ -797,7 +814,7 @@ class AsyncLanceNamespaceDBConnection: and namespace_storage_options is not None ): provider = LanceNamespaceStorageOptionsProvider( - namespace=self._ns, + namespace=self._namespace_client, table_id=table_id, ) else: @@ -817,6 +834,7 @@ class AsyncLanceNamespaceDBConnection: storage_options=merged_storage_options, storage_options_provider=provider, location=location, + namespace_client=self._namespace_client, ) lance_table = await asyncio.to_thread(_create_table) @@ -837,7 +855,7 @@ class AsyncLanceNamespaceDBConnection: namespace = [] table_id = namespace + [name] request = DescribeTableRequest(id=table_id) - response = self._ns.describe_table(request) + response = self._namespace_client.describe_table(request) # Merge storage options: self.storage_options < user options < namespace options merged_storage_options = dict(self.storage_options) @@ -849,10 +867,14 @@ class AsyncLanceNamespaceDBConnection: # Create a storage options provider if not provided by user if storage_options_provider is None and response.storage_options is not None: storage_options_provider = LanceNamespaceStorageOptionsProvider( - namespace=self._ns, + namespace=self._namespace_client, table_id=table_id, ) + # Capture managed_versioning from describe response. + # Convert None to False since we already have the answer from describe_table. + managed_versioning = response.managed_versioning is True + # Open table in a thread def _open_table(): temp_conn = LanceDBConnection( @@ -870,6 +892,8 @@ class AsyncLanceNamespaceDBConnection: storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, location=response.location, + namespace_client=self._namespace_client, + managed_versioning=managed_versioning, ) lance_table = await asyncio.to_thread(_open_table) @@ -881,7 +905,7 @@ class AsyncLanceNamespaceDBConnection: namespace = [] table_id = namespace + [name] request = DropTableRequest(id=table_id) - self._ns.drop_table(request) + self._namespace_client.drop_table(request) async def rename_table( self, @@ -943,7 +967,7 @@ class AsyncLanceNamespaceDBConnection: request = ListNamespacesRequest( id=namespace, page_token=page_token, limit=limit ) - response = self._ns.list_namespaces(request) + response = self._namespace_client.list_namespaces(request) return ListNamespacesResponse( namespaces=response.namespaces if response.namespaces else [], page_token=response.page_token, @@ -978,7 +1002,7 @@ class AsyncLanceNamespaceDBConnection: mode=_normalize_create_namespace_mode(mode), properties=properties, ) - response = self._ns.create_namespace(request) + response = self._namespace_client.create_namespace(request) return CreateNamespaceResponse( properties=response.properties if hasattr(response, "properties") else None ) @@ -1012,7 +1036,7 @@ class AsyncLanceNamespaceDBConnection: mode=_normalize_drop_namespace_mode(mode), behavior=_normalize_drop_namespace_behavior(behavior), ) - response = self._ns.drop_namespace(request) + response = self._namespace_client.drop_namespace(request) return DropNamespaceResponse( properties=( response.properties if hasattr(response, "properties") else None @@ -1039,7 +1063,7 @@ class AsyncLanceNamespaceDBConnection: Response containing the namespace properties. """ request = DescribeNamespaceRequest(id=namespace) - response = self._ns.describe_namespace(request) + response = self._namespace_client.describe_namespace(request) return DescribeNamespaceResponse( properties=response.properties if hasattr(response, "properties") else None ) @@ -1072,7 +1096,7 @@ class AsyncLanceNamespaceDBConnection: if namespace is None: namespace = [] request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) - response = self._ns.list_tables(request) + response = self._namespace_client.list_tables(request) return ListTablesResponse( tables=response.tables if response.tables else [], page_token=response.page_token, diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index e19449cc8..4d3fd3a8a 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -1746,6 +1746,8 @@ class LanceTable(Table): storage_options_provider: Optional["StorageOptionsProvider"] = None, index_cache_size: Optional[int] = None, location: Optional[str] = None, + namespace_client: Optional[Any] = None, + managed_versioning: Optional[bool] = None, _async: AsyncTable = None, ): if namespace is None: @@ -1753,6 +1755,7 @@ class LanceTable(Table): self._conn = connection self._namespace = namespace self._location = location # Store location for use in _dataset_path + self._namespace_client = namespace_client if _async is not None: self._table = _async else: @@ -1764,6 +1767,8 @@ class LanceTable(Table): storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, location=location, + namespace_client=namespace_client, + managed_versioning=managed_versioning, ) ) @@ -1806,6 +1811,8 @@ class LanceTable(Table): storage_options_provider: Optional["StorageOptionsProvider"] = None, index_cache_size: Optional[int] = None, location: Optional[str] = None, + namespace_client: Optional[Any] = None, + managed_versioning: Optional[bool] = None, ): if namespace is None: namespace = [] @@ -1817,6 +1824,8 @@ class LanceTable(Table): storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, location=location, + namespace_client=namespace_client, + managed_versioning=managed_versioning, ) # check the dataset exists @@ -1848,6 +1857,16 @@ class LanceTable(Table): "Please install with `pip install pylance`." ) + if self._namespace_client is not None: + table_id = self._namespace + [self.name] + return lance.dataset( + version=self.version, + storage_options=self._conn.storage_options, + namespace=self._namespace_client, + table_id=table_id, + **kwargs, + ) + return lance.dataset( self._dataset_path, version=self.version, @@ -2713,6 +2732,7 @@ class LanceTable(Table): data_storage_version: Optional[str] = None, enable_v2_manifest_paths: Optional[bool] = None, location: Optional[str] = None, + namespace_client: Optional[Any] = None, ): """ Create a new table. @@ -2773,6 +2793,7 @@ class LanceTable(Table): self._conn = db self._namespace = namespace self._location = location + self._namespace_client = namespace_client if data_storage_version is not None: warnings.warn( diff --git a/python/src/connection.rs b/python/src/connection.rs index a8b218a8e..ecd2a5e2f 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -17,7 +17,8 @@ use pyo3::{ use pyo3_async_runtimes::tokio::future_into_py; use crate::{ - error::PythonErrorExt, storage_options::py_object_to_storage_options_provider, table::Table, + error::PythonErrorExt, namespace::extract_namespace_arc, + storage_options::py_object_to_storage_options_provider, table::Table, }; #[pyclass] @@ -182,7 +183,8 @@ impl Connection { }) } - #[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None))] + #[allow(clippy::too_many_arguments)] + #[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None, namespace_client=None, managed_versioning=None))] pub fn open_table( self_: PyRef<'_, Self>, name: String, @@ -191,11 +193,13 @@ impl Connection { storage_options_provider: Option>, index_cache_size: Option, location: Option, + namespace_client: Option>, + managed_versioning: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); let mut builder = inner.open_table(name); - builder = builder.namespace(namespace); + builder = builder.namespace(namespace.clone()); if let Some(storage_options) = storage_options { builder = builder.storage_options(storage_options); } @@ -209,6 +213,20 @@ impl Connection { if let Some(location) = location { builder = builder.location(location); } + // Extract namespace client from Python object if provided + let ns_client = if let Some(ns_obj) = namespace_client { + let py = self_.py(); + Some(extract_namespace_arc(py, ns_obj)?) + } else { + None + }; + if let Some(ns_client) = ns_client { + builder = builder.namespace_client(ns_client); + } + // Pass managed_versioning if provided to avoid redundant describe_table call + if let Some(enabled) = managed_versioning { + builder = builder.managed_versioning(enabled); + } future_into_py(self_.py(), async move { let table = builder.execute().await.infer_error()?; diff --git a/python/src/lib.rs b/python/src/lib.rs index 4788b3e93..a6ef13f12 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -23,6 +23,7 @@ pub mod connection; pub mod error; pub mod header; pub mod index; +pub mod namespace; pub mod permutation; pub mod query; pub mod session; diff --git a/python/src/namespace.rs b/python/src/namespace.rs new file mode 100644 index 000000000..c94a19a40 --- /dev/null +++ b/python/src/namespace.rs @@ -0,0 +1,746 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +//! Namespace utilities for Python bindings + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use lance_namespace::LanceNamespace as LanceNamespaceTrait; +use lance_namespace::models::*; +use pyo3::prelude::*; +use pyo3::types::PyDict; + +/// Wrapper that allows any Python object implementing LanceNamespace protocol +/// to be used as a Rust LanceNamespace. +/// +/// This is similar to PyLanceNamespace in lance's Python bindings - it wraps a Python +/// object and calls back into Python when namespace methods are invoked. +pub struct PyLanceNamespace { + py_namespace: Arc>, + namespace_id: String, +} + +impl PyLanceNamespace { + /// Create a new PyLanceNamespace wrapper around a Python namespace object. + pub fn new(_py: Python<'_>, py_namespace: &Bound<'_, PyAny>) -> PyResult { + let namespace_id = py_namespace + .call_method0("namespace_id")? + .extract::()?; + + Ok(Self { + py_namespace: Arc::new(py_namespace.clone().unbind()), + namespace_id, + }) + } + + /// Create an Arc from a Python namespace object. + pub fn create_arc( + py: Python<'_>, + py_namespace: &Bound<'_, PyAny>, + ) -> PyResult> { + let wrapper = Self::new(py, py_namespace)?; + Ok(Arc::new(wrapper)) + } +} + +impl std::fmt::Debug for PyLanceNamespace { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "PyLanceNamespace {{ id: {} }}", self.namespace_id) + } +} + +/// Get or create the DictWithModelDump class in Python. +/// This class acts like a dict but also has model_dump() method. +/// This allows it to work with both: +/// - depythonize (which expects a dict/Mapping) +/// - Python code that calls .model_dump() (like DirectoryNamespace wrapper) +fn get_dict_with_model_dump_class(py: Python<'_>) -> PyResult> { + // Use a module-level cache via __builtins__ + let builtins = py.import("builtins")?; + if builtins.hasattr("_DictWithModelDump")? { + return builtins.getattr("_DictWithModelDump"); + } + + // Create the class using exec + let locals = PyDict::new(py); + py.run( + c"class DictWithModelDump(dict): + def model_dump(self): + return dict(self)", + None, + Some(&locals), + )?; + let class = locals.get_item("DictWithModelDump")?.ok_or_else(|| { + pyo3::exceptions::PyRuntimeError::new_err("Failed to create DictWithModelDump class") + })?; + + // Cache it + builtins.setattr("_DictWithModelDump", &class)?; + Ok(class) +} + +/// Helper to call a Python namespace method with JSON serialization. +/// For methods that take a request and return a response. +/// Uses DictWithModelDump to pass a dict that also has model_dump() method, +/// making it compatible with both depythonize and Python wrappers. +async fn call_py_method( + py_namespace: Arc>, + method_name: &'static str, + request: Req, +) -> lance_core::Result +where + Req: serde::Serialize + Send + 'static, + Resp: serde::de::DeserializeOwned + Send + 'static, +{ + let request_json = serde_json::to_string(&request).map_err(|e| { + lance_core::Error::io( + format!("Failed to serialize request for {}: {}", method_name, e), + Default::default(), + ) + })?; + + let response_json = tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let json_module = py.import("json")?; + let request_dict = json_module.call_method1("loads", (&request_json,))?; + + // Wrap dict in DictWithModelDump so it works with both depythonize and .model_dump() + let dict_class = get_dict_with_model_dump_class(py)?; + let request_arg = dict_class.call1((request_dict,))?; + + // Call the Python method + let result = py_namespace.call_method1(py, method_name, (request_arg,))?; + + // Convert response to dict, then to JSON + // Pydantic models have model_dump() method + let result_dict = if result.bind(py).hasattr("model_dump")? { + result.call_method0(py, "model_dump")? + } else { + result + }; + let response_json: String = json_module + .call_method1("dumps", (result_dict,))? + .extract()?; + Ok::<_, PyErr>(response_json) + }) + }) + .await + .map_err(|e| { + lance_core::Error::io( + format!("Task join error for {}: {}", method_name, e), + Default::default(), + ) + })? + .map_err(|e: PyErr| { + lance_core::Error::io( + format!("Python error in {}: {}", method_name, e), + Default::default(), + ) + })?; + + serde_json::from_str(&response_json).map_err(|e| { + lance_core::Error::io( + format!("Failed to deserialize response from {}: {}", method_name, e), + Default::default(), + ) + }) +} + +/// Helper for methods that return () on success +async fn call_py_method_unit( + py_namespace: Arc>, + method_name: &'static str, + request: Req, +) -> lance_core::Result<()> +where + Req: serde::Serialize + Send + 'static, +{ + let request_json = serde_json::to_string(&request).map_err(|e| { + lance_core::Error::io( + format!("Failed to serialize request for {}: {}", method_name, e), + Default::default(), + ) + })?; + + tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let json_module = py.import("json")?; + let request_dict = json_module.call_method1("loads", (&request_json,))?; + + // Wrap dict in DictWithModelDump + let dict_class = get_dict_with_model_dump_class(py)?; + let request_arg = dict_class.call1((request_dict,))?; + + // Call the Python method + py_namespace.call_method1(py, method_name, (request_arg,))?; + Ok::<_, PyErr>(()) + }) + }) + .await + .map_err(|e| { + lance_core::Error::io( + format!("Task join error for {}: {}", method_name, e), + Default::default(), + ) + })? + .map_err(|e: PyErr| { + lance_core::Error::io( + format!("Python error in {}: {}", method_name, e), + Default::default(), + ) + }) +} + +/// Helper for methods that return a primitive type +async fn call_py_method_primitive( + py_namespace: Arc>, + method_name: &'static str, + request: Req, +) -> lance_core::Result +where + Req: serde::Serialize + Send + 'static, + Resp: for<'py> pyo3::FromPyObject<'py> + Send + 'static, +{ + let request_json = serde_json::to_string(&request).map_err(|e| { + lance_core::Error::io( + format!("Failed to serialize request for {}: {}", method_name, e), + Default::default(), + ) + })?; + + tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let json_module = py.import("json")?; + let request_dict = json_module.call_method1("loads", (&request_json,))?; + + // Wrap dict in DictWithModelDump + let dict_class = get_dict_with_model_dump_class(py)?; + let request_arg = dict_class.call1((request_dict,))?; + + // Call the Python method + let result = py_namespace.call_method1(py, method_name, (request_arg,))?; + let value: Resp = result.extract(py)?; + Ok::<_, PyErr>(value) + }) + }) + .await + .map_err(|e| { + lance_core::Error::io( + format!("Task join error for {}: {}", method_name, e), + Default::default(), + ) + })? + .map_err(|e: PyErr| { + lance_core::Error::io( + format!("Python error in {}: {}", method_name, e), + Default::default(), + ) + }) +} + +/// Helper for methods that return Bytes +async fn call_py_method_bytes( + py_namespace: Arc>, + method_name: &'static str, + request: Req, +) -> lance_core::Result +where + Req: serde::Serialize + Send + 'static, +{ + let request_json = serde_json::to_string(&request).map_err(|e| { + lance_core::Error::io( + format!("Failed to serialize request for {}: {}", method_name, e), + Default::default(), + ) + })?; + + tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let json_module = py.import("json")?; + let request_dict = json_module.call_method1("loads", (&request_json,))?; + + // Wrap dict in DictWithModelDump + let dict_class = get_dict_with_model_dump_class(py)?; + let request_arg = dict_class.call1((request_dict,))?; + + // Call the Python method + let result = py_namespace.call_method1(py, method_name, (request_arg,))?; + let bytes_data: Vec = result.extract(py)?; + Ok::<_, PyErr>(Bytes::from(bytes_data)) + }) + }) + .await + .map_err(|e| { + lance_core::Error::io( + format!("Task join error for {}: {}", method_name, e), + Default::default(), + ) + })? + .map_err(|e: PyErr| { + lance_core::Error::io( + format!("Python error in {}: {}", method_name, e), + Default::default(), + ) + }) +} + +/// Helper for methods that take request + data and return a response +async fn call_py_method_with_data( + py_namespace: Arc>, + method_name: &'static str, + request: Req, + data: Bytes, +) -> lance_core::Result +where + Req: serde::Serialize + Send + 'static, + Resp: serde::de::DeserializeOwned + Send + 'static, +{ + let request_json = serde_json::to_string(&request).map_err(|e| { + lance_core::Error::io( + format!("Failed to serialize request for {}: {}", method_name, e), + Default::default(), + ) + })?; + + let response_json = tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let json_module = py.import("json")?; + let request_dict = json_module.call_method1("loads", (&request_json,))?; + + // Wrap dict in DictWithModelDump + let dict_class = get_dict_with_model_dump_class(py)?; + let request_arg = dict_class.call1((request_dict,))?; + + // Pass request and bytes to Python method + let py_bytes = pyo3::types::PyBytes::new(py, &data); + let result = py_namespace.call_method1(py, method_name, (request_arg, py_bytes))?; + + // Convert response dict to JSON + let response_json: String = json_module.call_method1("dumps", (result,))?.extract()?; + Ok::<_, PyErr>(response_json) + }) + }) + .await + .map_err(|e| { + lance_core::Error::io( + format!("Task join error for {}: {}", method_name, e), + Default::default(), + ) + })? + .map_err(|e: PyErr| { + lance_core::Error::io( + format!("Python error in {}: {}", method_name, e), + Default::default(), + ) + })?; + + serde_json::from_str(&response_json).map_err(|e| { + lance_core::Error::io( + format!("Failed to deserialize response from {}: {}", method_name, e), + Default::default(), + ) + }) +} + +#[async_trait] +impl LanceNamespaceTrait for PyLanceNamespace { + fn namespace_id(&self) -> String { + self.namespace_id.clone() + } + + async fn list_namespaces( + &self, + request: ListNamespacesRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "list_namespaces", request).await + } + + async fn describe_namespace( + &self, + request: DescribeNamespaceRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "describe_namespace", request).await + } + + async fn create_namespace( + &self, + request: CreateNamespaceRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "create_namespace", request).await + } + + async fn drop_namespace( + &self, + request: DropNamespaceRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "drop_namespace", request).await + } + + async fn namespace_exists(&self, request: NamespaceExistsRequest) -> lance_core::Result<()> { + call_py_method_unit(self.py_namespace.clone(), "namespace_exists", request).await + } + + async fn list_tables( + &self, + request: ListTablesRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "list_tables", request).await + } + + async fn describe_table( + &self, + request: DescribeTableRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "describe_table", request).await + } + + async fn register_table( + &self, + request: RegisterTableRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "register_table", request).await + } + + async fn table_exists(&self, request: TableExistsRequest) -> lance_core::Result<()> { + call_py_method_unit(self.py_namespace.clone(), "table_exists", request).await + } + + async fn drop_table(&self, request: DropTableRequest) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "drop_table", request).await + } + + async fn deregister_table( + &self, + request: DeregisterTableRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "deregister_table", request).await + } + + async fn count_table_rows(&self, request: CountTableRowsRequest) -> lance_core::Result { + call_py_method_primitive(self.py_namespace.clone(), "count_table_rows", request).await + } + + async fn create_table( + &self, + request: CreateTableRequest, + request_data: Bytes, + ) -> lance_core::Result { + call_py_method_with_data( + self.py_namespace.clone(), + "create_table", + request, + request_data, + ) + .await + } + + async fn declare_table( + &self, + request: DeclareTableRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "declare_table", request).await + } + + async fn insert_into_table( + &self, + request: InsertIntoTableRequest, + request_data: Bytes, + ) -> lance_core::Result { + call_py_method_with_data( + self.py_namespace.clone(), + "insert_into_table", + request, + request_data, + ) + .await + } + + async fn merge_insert_into_table( + &self, + request: MergeInsertIntoTableRequest, + request_data: Bytes, + ) -> lance_core::Result { + call_py_method_with_data( + self.py_namespace.clone(), + "merge_insert_into_table", + request, + request_data, + ) + .await + } + + async fn update_table( + &self, + request: UpdateTableRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "update_table", request).await + } + + async fn delete_from_table( + &self, + request: DeleteFromTableRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "delete_from_table", request).await + } + + async fn query_table(&self, request: QueryTableRequest) -> lance_core::Result { + call_py_method_bytes(self.py_namespace.clone(), "query_table", request).await + } + + async fn create_table_index( + &self, + request: CreateTableIndexRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "create_table_index", request).await + } + + async fn list_table_indices( + &self, + request: ListTableIndicesRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "list_table_indices", request).await + } + + async fn describe_table_index_stats( + &self, + request: DescribeTableIndexStatsRequest, + ) -> lance_core::Result { + call_py_method( + self.py_namespace.clone(), + "describe_table_index_stats", + request, + ) + .await + } + + async fn describe_transaction( + &self, + request: DescribeTransactionRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "describe_transaction", request).await + } + + async fn alter_transaction( + &self, + request: AlterTransactionRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "alter_transaction", request).await + } + + async fn create_table_scalar_index( + &self, + request: CreateTableIndexRequest, + ) -> lance_core::Result { + call_py_method( + self.py_namespace.clone(), + "create_table_scalar_index", + request, + ) + .await + } + + async fn drop_table_index( + &self, + request: DropTableIndexRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "drop_table_index", request).await + } + + async fn list_all_tables( + &self, + request: ListTablesRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "list_all_tables", request).await + } + + async fn restore_table( + &self, + request: RestoreTableRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "restore_table", request).await + } + + async fn rename_table( + &self, + request: RenameTableRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "rename_table", request).await + } + + async fn list_table_versions( + &self, + request: ListTableVersionsRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "list_table_versions", request).await + } + + async fn create_table_version( + &self, + request: CreateTableVersionRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "create_table_version", request).await + } + + async fn describe_table_version( + &self, + request: DescribeTableVersionRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "describe_table_version", request).await + } + + async fn batch_delete_table_versions( + &self, + request: BatchDeleteTableVersionsRequest, + ) -> lance_core::Result { + call_py_method( + self.py_namespace.clone(), + "batch_delete_table_versions", + request, + ) + .await + } + + async fn update_table_schema_metadata( + &self, + request: UpdateTableSchemaMetadataRequest, + ) -> lance_core::Result { + call_py_method( + self.py_namespace.clone(), + "update_table_schema_metadata", + request, + ) + .await + } + + async fn get_table_stats( + &self, + request: GetTableStatsRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "get_table_stats", request).await + } + + async fn explain_table_query_plan( + &self, + request: ExplainTableQueryPlanRequest, + ) -> lance_core::Result { + call_py_method_primitive( + self.py_namespace.clone(), + "explain_table_query_plan", + request, + ) + .await + } + + async fn analyze_table_query_plan( + &self, + request: AnalyzeTableQueryPlanRequest, + ) -> lance_core::Result { + call_py_method_primitive( + self.py_namespace.clone(), + "analyze_table_query_plan", + request, + ) + .await + } + + async fn alter_table_add_columns( + &self, + request: AlterTableAddColumnsRequest, + ) -> lance_core::Result { + call_py_method( + self.py_namespace.clone(), + "alter_table_add_columns", + request, + ) + .await + } + + async fn alter_table_alter_columns( + &self, + request: AlterTableAlterColumnsRequest, + ) -> lance_core::Result { + call_py_method( + self.py_namespace.clone(), + "alter_table_alter_columns", + request, + ) + .await + } + + async fn alter_table_drop_columns( + &self, + request: AlterTableDropColumnsRequest, + ) -> lance_core::Result { + call_py_method( + self.py_namespace.clone(), + "alter_table_drop_columns", + request, + ) + .await + } + + async fn list_table_tags( + &self, + request: ListTableTagsRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "list_table_tags", request).await + } + + async fn create_table_tag( + &self, + request: CreateTableTagRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "create_table_tag", request).await + } + + async fn delete_table_tag( + &self, + request: DeleteTableTagRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "delete_table_tag", request).await + } + + async fn update_table_tag( + &self, + request: UpdateTableTagRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "update_table_tag", request).await + } + + async fn get_table_tag_version( + &self, + request: GetTableTagVersionRequest, + ) -> lance_core::Result { + call_py_method(self.py_namespace.clone(), "get_table_tag_version", request).await + } +} + +/// Convert Python dict to HashMap +#[allow(dead_code)] +fn dict_to_hashmap(dict: &Bound<'_, PyDict>) -> PyResult> { + let mut map = HashMap::new(); + for (key, value) in dict.iter() { + let key_str: String = key.extract()?; + let value_str: String = value.extract()?; + map.insert(key_str, value_str); + } + Ok(map) +} + +/// Extract an Arc from a Python namespace object. +/// +/// This function wraps any Python namespace object with PyLanceNamespace. +/// The PyLanceNamespace wrapper uses DictWithModelDump to pass requests, +/// which works with both: +/// - Native namespaces (DirectoryNamespace, RestNamespace) that use depythonize (expects dict) +/// - Custom Python implementations that call .model_dump() on the request +pub fn extract_namespace_arc( + py: Python<'_>, + ns: Py, +) -> PyResult> { + let ns_ref = ns.bind(py); + PyLanceNamespace::create_arc(py, ns_ref) +} diff --git a/python/uv.lock b/python/uv.lock index c4f2103ba..8f5534eed 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -2006,7 +2006,7 @@ requires-dist = [ { name = "botocore", marker = "extra == 'embeddings'", specifier = ">=1.31.57" }, { name = "cohere", marker = "extra == 'embeddings'" }, { name = "colpali-engine", marker = "extra == 'embeddings'", specifier = ">=0.3.10" }, - { name = "datafusion", marker = "extra == 'tests'" }, + { name = "datafusion", marker = "extra == 'tests'", specifier = "<52" }, { name = "deprecation" }, { name = "duckdb", marker = "extra == 'tests'" }, { name = "google-generativeai", marker = "extra == 'embeddings'" }, @@ -2035,7 +2035,7 @@ requires-dist = [ { name = "pyarrow-stubs", marker = "extra == 'tests'" }, { name = "pydantic", specifier = ">=1.10" }, { name = "pylance", marker = "extra == 'pylance'", specifier = ">=1.0.0b14" }, - { name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14" }, + { name = "pylance", marker = "extra == 'tests'", specifier = ">=1.0.0b14,<3.0.0" }, { name = "pyright", marker = "extra == 'dev'" }, { name = "pytest", marker = "extra == 'tests'" }, { name = "pytest-asyncio", marker = "extra == 'tests'" }, diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index e745a921b..95d985aac 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -136,6 +136,7 @@ impl OpenTableBuilder { lance_read_params: None, location: None, namespace_client: None, + managed_versioning: None, }, embedding_registry, } @@ -235,6 +236,29 @@ impl OpenTableBuilder { self } + /// Set a namespace client for managed versioning support. + /// + /// When a namespace client is provided and the table has `managed_versioning` enabled, + /// the table will use the namespace's commit handler to notify the namespace of + /// version changes. This enables features like event emission for table modifications. + pub fn namespace_client(mut self, client: Arc) -> Self { + self.request.namespace_client = Some(client); + self + } + + /// Set whether managed versioning is enabled for this table. + /// + /// When set to `Some(true)`, the table will use namespace-managed commits. + /// When set to `Some(false)`, the table will use local commits even if namespace_client is set. + /// When set to `None` (default), the value will be fetched from the namespace if namespace_client is set. + /// + /// This is typically set when the caller has already queried the namespace and knows the + /// managed_versioning status, avoiding a redundant describe_table call. + pub fn managed_versioning(mut self, enabled: bool) -> Self { + self.request.managed_versioning = Some(enabled); + self + } + /// Open the table pub async fn execute(self) -> Result { let table = self.parent.open_table(self.request).await?; @@ -294,6 +318,12 @@ impl CloneTableBuilder { self } + /// Set a namespace client for managed versioning support. + pub fn namespace_client(mut self, client: Arc) -> Self { + self.request.namespace_client = Some(client); + self + } + /// Execute the clone operation pub async fn execute(self) -> Result
{ let parent = self.parent.clone(); diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index 95f477dc1..957a1c635 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -66,6 +66,10 @@ pub struct OpenTableRequest { /// Optional namespace client for server-side query execution. /// When set, queries will be executed on the namespace server instead of locally. pub namespace_client: Option>, + /// Whether managed versioning is enabled for this table. + /// When Some(true), the table will use namespace-managed commits instead of local commits. + /// When None and namespace_client is provided, the value will be fetched from the namespace. + pub managed_versioning: Option, } impl std::fmt::Debug for OpenTableRequest { @@ -77,6 +81,7 @@ impl std::fmt::Debug for OpenTableRequest { .field("lance_read_params", &self.lance_read_params) .field("location", &self.location) .field("namespace_client", &self.namespace_client) + .field("managed_versioning", &self.managed_versioning) .finish() } } @@ -161,6 +166,9 @@ pub struct CloneTableRequest { /// Whether to perform a shallow clone (true) or deep clone (false). Defaults to true. /// Currently only shallow clone is supported. pub is_shallow: bool, + /// Optional namespace client for managed versioning support. + /// When set, enables the commit handler to track table versions through the namespace. + pub namespace_client: Option>, } impl CloneTableRequest { @@ -172,6 +180,7 @@ impl CloneTableRequest { source_version: None, source_tag: None, is_shallow: true, + namespace_client: None, } } } diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index 3f6a3de55..5134e6f1c 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -669,6 +669,7 @@ impl ListingDatabase { lance_read_params: None, location: None, namespace_client: None, + managed_versioning: None, }; let req = (callback)(req); let table = self.open_table(req).await?; @@ -869,6 +870,7 @@ impl Database for ListingDatabase { Some(write_params), self.read_consistency_interval, request.namespace_client, + false, // server_side_query_enabled - listing database doesn't support server-side queries ) .await { @@ -946,7 +948,9 @@ impl Database for ListingDatabase { self.store_wrapper.clone(), None, self.read_consistency_interval, - None, + request.namespace_client, + false, // server_side_query_enabled - listing database doesn't support server-side queries + None, // managed_versioning - will be queried if namespace_client is provided ) .await?; @@ -1022,6 +1026,8 @@ impl Database for ListingDatabase { Some(read_params), self.read_consistency_interval, request.namespace_client, + false, // server_side_query_enabled - listing database doesn't support server-side queries + request.managed_versioning, // Pass through managed_versioning from request ) .await?, ); @@ -1162,6 +1168,7 @@ mod tests { source_version: None, source_tag: None, is_shallow: true, + namespace_client: None, }) .await .unwrap(); @@ -1222,6 +1229,7 @@ mod tests { source_version: None, source_tag: None, is_shallow: true, + namespace_client: None, }) .await .unwrap(); @@ -1281,6 +1289,7 @@ mod tests { source_version: None, source_tag: None, is_shallow: true, + namespace_client: None, }) .await; @@ -1317,6 +1326,7 @@ mod tests { source_version: None, source_tag: None, is_shallow: false, // Request deep clone + namespace_client: None, }) .await; @@ -1357,6 +1367,7 @@ mod tests { source_version: None, source_tag: None, is_shallow: true, + namespace_client: None, }) .await; @@ -1397,6 +1408,7 @@ mod tests { source_version: None, source_tag: None, is_shallow: true, + namespace_client: None, }) .await; @@ -1416,6 +1428,7 @@ mod tests { source_version: None, source_tag: None, is_shallow: true, + namespace_client: None, }) .await; @@ -1452,6 +1465,7 @@ mod tests { source_version: Some(1), source_tag: Some("v1.0".to_string()), is_shallow: true, + namespace_client: None, }) .await; @@ -1525,6 +1539,7 @@ mod tests { source_version: Some(initial_version), source_tag: None, is_shallow: true, + namespace_client: None, }) .await .unwrap(); @@ -1603,6 +1618,7 @@ mod tests { source_version: None, source_tag: Some("v1.0".to_string()), is_shallow: true, + namespace_client: None, }) .await .unwrap(); @@ -1654,6 +1670,7 @@ mod tests { source_version: None, source_tag: None, is_shallow: true, + namespace_client: None, }) .await .unwrap(); @@ -1746,6 +1763,7 @@ mod tests { source_version: None, source_tag: None, is_shallow: true, + namespace_client: None, }) .await .unwrap(); diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index 90b5c19bd..198b8c186 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -7,18 +7,20 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore; use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor}; use lance_namespace::{ LanceNamespace, models::{ - CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse, - DeclareTableRequest, DescribeNamespaceRequest, DescribeNamespaceResponse, - DescribeTableRequest, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, - ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest, ListTablesResponse, + CreateNamespaceRequest, CreateNamespaceResponse, DeclareTableRequest, + DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest, + DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest, + ListNamespacesResponse, ListTablesRequest, ListTablesResponse, }, }; use lance_namespace_impls::ConnectBuilder; -use log::warn; +use lance_table::io::commit::CommitHandler; +use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler; use crate::database::ReadConsistency; use crate::error::{Error, Result}; @@ -206,83 +208,55 @@ impl Database for LanceNamespaceDatabase { let mut table_id = request.namespace.clone(); table_id.push(request.name.clone()); - // Try declare_table first, falling back to create_empty_table for backwards - // compatibility with older namespace clients that don't support declare_table let declare_request = DeclareTableRequest { id: Some(table_id.clone()), ..Default::default() }; - let (location, initial_storage_options) = - match self.namespace.declare_table(declare_request).await { - Ok(response) => { - let loc = response.location.ok_or_else(|| Error::Runtime { - message: "Table location is missing from declare_table response" - .to_string(), - })?; - // Use storage options from response, fall back to self.storage_options - let opts = response - .storage_options - .or_else(|| Some(self.storage_options.clone())) - .filter(|o| !o.is_empty()); - (loc, opts) - } - Err(e) => { - // Check if the error is "not supported" and try create_empty_table as fallback - let err_str = e.to_string().to_lowercase(); - if err_str.contains("not supported") || err_str.contains("not implemented") { - warn!( - "declare_table is not supported by the namespace client, \ - falling back to deprecated create_empty_table. \ - create_empty_table is deprecated and will be removed in Lance 3.0.0. \ - Please upgrade your namespace client to support declare_table." - ); - #[allow(deprecated)] - let create_empty_request = CreateEmptyTableRequest { - id: Some(table_id.clone()), - ..Default::default() - }; + let response = self + .namespace + .declare_table(declare_request) + .await + .map_err(|e| Error::Runtime { + message: format!("Failed to declare table: {}", e), + })?; - #[allow(deprecated)] - let create_response = self - .namespace - .create_empty_table(create_empty_request) - .await - .map_err(|e| Error::Runtime { - message: format!("Failed to create empty table: {}", e), - })?; + let location = response.location.ok_or_else(|| Error::Runtime { + message: "Table location is missing from declare_table response".to_string(), + })?; - let loc = create_response.location.ok_or_else(|| Error::Runtime { - message: "Table location is missing from create_empty_table response" - .to_string(), - })?; - // For deprecated path, use self.storage_options - let opts = if self.storage_options.is_empty() { - None - } else { - Some(self.storage_options.clone()) - }; - (loc, opts) - } else { - return Err(Error::Runtime { - message: format!("Failed to declare table: {}", e), - }); - } - } - }; + // Use storage options from response, fall back to self.storage_options + let initial_storage_options = response + .storage_options + .or_else(|| Some(self.storage_options.clone())) + .filter(|o| !o.is_empty()); - let write_params = if let Some(storage_opts) = initial_storage_options { - let mut params = request.write_options.lance_write_params.unwrap_or_default(); + let managed_versioning = response.managed_versioning; + + // Build write params with storage options and commit handler + let mut params = request.write_options.lance_write_params.unwrap_or_default(); + + // Set up storage options if provided + if let Some(storage_opts) = initial_storage_options { let store_params = params .store_params .get_or_insert_with(ObjectStoreParams::default); store_params.storage_options_accessor = Some(Arc::new( StorageOptionsAccessor::with_static_options(storage_opts), )); - Some(params) - } else { - request.write_options.lance_write_params - }; + } + + // Set up commit handler when managed_versioning is enabled + if managed_versioning == Some(true) { + let external_store = + LanceNamespaceExternalManifestStore::new(self.namespace.clone(), table_id.clone()); + let commit_handler: Arc = Arc::new(ExternalManifestCommitHandler { + external_manifest_store: Arc::new(external_store), + }); + params.commit_handler = Some(commit_handler); + } + + let write_params = Some(params); let native_table = NativeTable::create_from_namespace( self.namespace.clone(), diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index b80c1cea1..909de4312 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -464,6 +464,7 @@ impl Database for RemoteDatabase { lance_read_params: None, location: None, namespace_client: None, + managed_versioning: None, }; let req = (callback)(req); self.open_table(req).await diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 611ec1f26..d51a80369 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -34,9 +34,13 @@ use lance_index::vector::sq::builder::SQBuildParams; use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor}; pub use query::AnyQuery; +use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore; use lance_namespace::LanceNamespace; +use lance_namespace::models::DescribeTableRequest; use lance_table::format::Manifest; +use lance_table::io::commit::CommitHandler; use lance_table::io::commit::ManifestNamingScheme; +use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::format; @@ -1212,10 +1216,13 @@ pub struct NativeTable { // This comes from the connection options. We store here so we can pass down // to the dataset when we recreate it (for example, in checkout_latest). read_consistency_interval: Option, - // Optional namespace client for server-side query execution. - // When set, queries will be executed on the namespace server instead of locally. - // pub (crate) namespace_client so query.rs can access the fields + // Optional namespace client for namespace operations (e.g., managed versioning). + // pub(crate) so query.rs can access the field for server-side query execution. pub(crate) namespace_client: Option>, + // Whether to enable server-side query execution via the namespace client. + // When true and namespace_client is set, queries will be executed on the + // namespace server instead of locally. + pub(crate) server_side_query_enabled: bool, } impl std::fmt::Debug for NativeTable { @@ -1227,6 +1234,7 @@ impl std::fmt::Debug for NativeTable { .field("uri", &self.uri) .field("read_consistency_interval", &self.read_consistency_interval) .field("namespace_client", &self.namespace_client) + .field("server_side_query_enabled", &self.server_side_query_enabled) .finish() } } @@ -1263,7 +1271,7 @@ impl NativeTable { /// * A [NativeTable] object. pub async fn open(uri: &str) -> Result { let name = Self::get_table_name(uri)?; - Self::open_with_params(uri, &name, vec![], None, None, None, None).await + Self::open_with_params(uri, &name, vec![], None, None, None, None, false, None).await } /// Opens an existing Table @@ -1273,7 +1281,10 @@ impl NativeTable { /// * `base_path` - The base path where the table is located /// * `name` The Table name /// * `params` The [ReadParams] to use when opening the table - /// * `namespace_client` - Optional namespace client for server-side query execution + /// * `namespace_client` - Optional namespace client for namespace operations + /// * `server_side_query_enabled` - Whether to enable server-side query execution + /// * `managed_versioning` - Whether managed versioning is enabled. If None and namespace_client + /// is provided, the value will be fetched via describe_table. /// /// # Returns /// @@ -1287,6 +1298,8 @@ impl NativeTable { params: Option, read_consistency_interval: Option, namespace_client: Option>, + server_side_query_enabled: bool, + managed_versioning: Option, ) -> Result { let params = params.unwrap_or_default(); // patch the params if we have a write store wrapper @@ -1295,17 +1308,54 @@ impl NativeTable { None => params, }; - let dataset = DatasetBuilder::from_uri(uri) - .with_read_params(params) - .load() - .await - .map_err(|e| match e { - lance::Error::DatasetNotFound { .. } => Error::TableNotFound { - name: name.to_string(), - source: Box::new(e), - }, - e => e.into(), - })?; + // Build table_id from namespace + name + let mut table_id = namespace.clone(); + table_id.push(name.to_string()); + + // Determine if managed_versioning is enabled + // Use the provided value if available, otherwise query the namespace + let managed_versioning = match managed_versioning { + Some(value) => value, + None if namespace_client.is_some() => { + let ns_client = namespace_client.as_ref().unwrap(); + let describe_request = DescribeTableRequest { + id: Some(table_id.clone()), + ..Default::default() + }; + let response = ns_client + .describe_table(describe_request) + .await + .map_err(|e| Error::Runtime { + message: format!( + "Failed to describe table via namespace client: {}. \ + If you don't need managed versioning, don't pass namespace_client.", + e + ), + })?; + response.managed_versioning == Some(true) + } + None => false, + }; + + let mut builder = DatasetBuilder::from_uri(uri).with_read_params(params); + + // Set up commit handler when managed_versioning is enabled + if managed_versioning && let Some(ref ns_client) = namespace_client { + let external_store = + LanceNamespaceExternalManifestStore::new(ns_client.clone(), table_id.clone()); + let commit_handler: Arc = Arc::new(ExternalManifestCommitHandler { + external_manifest_store: Arc::new(external_store), + }); + builder = builder.with_commit_handler(commit_handler); + } + + let dataset = builder.load().await.map_err(|e| match e { + lance::Error::DatasetNotFound { .. } => Error::TableNotFound { + name: name.to_string(), + source: Box::new(e), + }, + e => e.into(), + })?; let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval); let id = Self::build_id(&namespace, name); @@ -1318,6 +1368,7 @@ impl NativeTable { dataset, read_consistency_interval, namespace_client, + server_side_query_enabled, }) } @@ -1421,6 +1472,7 @@ impl NativeTable { dataset, read_consistency_interval, namespace_client: stored_namespace_client, + server_side_query_enabled, }) } @@ -1460,7 +1512,8 @@ impl NativeTable { /// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided. /// * `batches` RecordBatch to be saved in the database. /// * `params` - Write parameters. - /// * `namespace_client` - Optional namespace client for server-side query execution + /// * `namespace_client` - Optional namespace client for namespace operations + /// * `server_side_query_enabled` - Whether to enable server-side query execution /// /// # Returns /// @@ -1475,6 +1528,7 @@ impl NativeTable { params: Option, read_consistency_interval: Option, namespace_client: Option>, + server_side_query_enabled: bool, ) -> Result { // Default params uses format v1. let params = params.unwrap_or(WriteParams { @@ -1507,6 +1561,7 @@ impl NativeTable { dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval), read_consistency_interval, namespace_client, + server_side_query_enabled, }) } @@ -1520,6 +1575,7 @@ impl NativeTable { params: Option, read_consistency_interval: Option, namespace_client: Option>, + server_side_query_enabled: bool, ) -> Result { let data: Box = Box::new(RecordBatch::new_empty(schema)); Self::create( @@ -1531,6 +1587,7 @@ impl NativeTable { params, read_consistency_interval, namespace_client, + server_side_query_enabled, ) .await } @@ -1634,6 +1691,7 @@ impl NativeTable { dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval), read_consistency_interval, namespace_client: stored_namespace_client, + server_side_query_enabled, }) } @@ -2625,7 +2683,7 @@ mod tests { vec![Ok(batch.clone())], batch.schema(), )); - let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None) + let table = NativeTable::create(uri, "test", vec![], reader, None, None, None, None, false) .await .unwrap(); diff --git a/rust/lancedb/src/table/query.rs b/rust/lancedb/src/table/query.rs index e168f7a8f..529551269 100644 --- a/rust/lancedb/src/table/query.rs +++ b/rust/lancedb/src/table/query.rs @@ -40,8 +40,10 @@ pub async fn execute_query( query: &AnyQuery, options: QueryExecutionOptions, ) -> Result { - // If namespace client is configured, use server-side query execution - if let Some(ref namespace_client) = table.namespace_client { + // If server-side query is enabled and namespace client is configured, use server-side query execution + if table.server_side_query_enabled + && let Some(ref namespace_client) = table.namespace_client + { return execute_namespace_query(table, namespace_client.clone(), query, options).await; } execute_generic_query(table, query, options).await