From 97a4b38f198a5afc20d3a01967dc5e274179ab86 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Thu, 16 Apr 2026 10:12:28 -0700 Subject: [PATCH] feat(rust): support nested namespace ops in listing db (#3279) ## Summary - delegate child-namespace `ListingDatabase` operations through an eagerly initialized `LanceNamespaceDatabase` - support nested namespace create/open/list/drop flows without requiring callers to inject explicit locations - add `namespace_client_properties` plumbing for local and namespace connections so directory namespace settings like `table_version_tracking_enabled` can be configured - add regression tests for nested namespace ops and namespace client property propagation --- python/python/tests/test_table.py | 9 +- rust/lancedb/src/connection.rs | 89 +++++- rust/lancedb/src/database/listing.rs | 371 ++++++++++++++++++++----- rust/lancedb/src/database/namespace.rs | 41 +++ 4 files changed, 440 insertions(+), 70 deletions(-) diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index 639afe903..7337c7e9a 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -3,6 +3,7 @@ import os +import sys from datetime import date, datetime, timedelta from time import sleep from typing import List @@ -2040,6 +2041,13 @@ def test_hybrid_search_metric_type(tmp_db: DBConnection): @pytest.mark.parametrize( "consistency_interval", [None, timedelta(seconds=0), timedelta(seconds=0.1)] ) +@pytest.mark.skipif( + sys.platform == "win32", + reason=( + "TODO: directory namespace is not supported on Windows yet; " + "re-enable after that is fixed." + ), +) def test_consistency(tmp_path, consistency_interval): db = lancedb.connect(tmp_path) table = db.create_table("my_table", data=[{"id": 0}]) @@ -2060,7 +2068,6 @@ def test_consistency(tmp_path, consistency_interval): elif consistency_interval == timedelta(seconds=0): assert table2.version == table.version else: - # (consistency_interval == timedelta(seconds=0.1) assert table2.version == table.version - 1 sleep(0.1) assert table2.version == table.version diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index bcb0932f6..35863aa60 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -582,6 +582,14 @@ pub struct ConnectRequest { /// Database specific options pub options: HashMap, + /// Extra properties for the equivalent namespace client. + /// + /// For a local [`ListingDatabase`], these are merged into the backing + /// `DirectoryNamespace` properties. This is useful for namespace-specific + /// settings such as `table_version_tracking_enabled` that are distinct from + /// storage options. + pub namespace_client_properties: HashMap, + /// The interval at which to check for updates from other processes. /// /// If None, then consistency is not checked. For performance @@ -621,6 +629,7 @@ impl ConnectBuilder { client_config: Default::default(), read_consistency_interval: None, options: HashMap::new(), + namespace_client_properties: HashMap::new(), session: None, }, embedding_registry: None, @@ -757,6 +766,31 @@ impl ConnectBuilder { self } + /// Set an additional property for the equivalent namespace client. + pub fn namespace_client_property( + mut self, + key: impl Into, + value: impl Into, + ) -> Self { + self.request + .namespace_client_properties + .insert(key.into(), value.into()); + self + } + + /// Set multiple additional properties for the equivalent namespace client. + pub fn namespace_client_properties( + mut self, + pairs: impl IntoIterator, impl Into)>, + ) -> Self { + for (key, value) in pairs { + self.request + .namespace_client_properties + .insert(key.into(), value.into()); + } + self + } + /// The interval at which to check for updates from other processes. This /// only affects LanceDB OSS. /// @@ -893,6 +927,7 @@ pub struct ConnectNamespaceBuilder { ns_impl: String, properties: HashMap, storage_options: HashMap, + namespace_client_properties: HashMap, read_consistency_interval: Option, embedding_registry: Option>, session: Option>, @@ -905,6 +940,7 @@ impl ConnectNamespaceBuilder { ns_impl: ns_impl.to_string(), properties, storage_options: HashMap::new(), + namespace_client_properties: HashMap::new(), read_consistency_interval: None, embedding_registry: None, session: None, @@ -933,6 +969,29 @@ impl ConnectNamespaceBuilder { self } + /// Set an additional namespace client property. + pub fn namespace_client_property( + mut self, + key: impl Into, + value: impl Into, + ) -> Self { + self.namespace_client_properties + .insert(key.into(), value.into()); + self + } + + /// Set multiple additional namespace client properties. + pub fn namespace_client_properties( + mut self, + pairs: impl IntoIterator, impl Into)>, + ) -> Self { + for (key, value) in pairs { + self.namespace_client_properties + .insert(key.into(), value.into()); + } + self + } + /// The interval at which to check for updates from other processes. /// /// If left unset, consistency is not checked. For maximum read @@ -994,10 +1053,13 @@ impl ConnectNamespaceBuilder { pub async fn execute(self) -> Result { use crate::database::namespace::LanceNamespaceDatabase; + let mut properties = self.properties; + properties.extend(self.namespace_client_properties); + let internal = Arc::new( LanceNamespaceDatabase::connect( &self.ns_impl, - self.properties, + properties, self.storage_options, self.read_consistency_interval, self.session, @@ -1117,6 +1179,31 @@ mod tests { assert_eq!(db.uri(), relative_uri.to_str().unwrap().to_string()); } + #[tokio::test] + async fn test_connect_with_namespace_client_properties() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let db = connect(uri) + .namespace_client_property("table_version_tracking_enabled", "true") + .namespace_client_property("manifest_enabled", "true") + .execute() + .await + .unwrap(); + + let (ns_impl, properties) = db.namespace_client_config().await.unwrap(); + assert_eq!(ns_impl, "dir"); + assert_eq!(properties.get("root"), Some(&uri.to_string())); + assert_eq!( + properties.get("table_version_tracking_enabled"), + Some(&"true".to_string()) + ); + assert_eq!( + properties.get("manifest_enabled"), + Some(&"true".to_string()) + ); + } + #[tokio::test] async fn test_table_names() { let tc = new_test_connection().await.unwrap(); diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index a1f2dc58a..02884bb63 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -20,6 +20,7 @@ use snafu::ResultExt; use crate::connection::ConnectRequest; use crate::database::ReadConsistency; +use crate::database::namespace::LanceNamespaceDatabase; use crate::error::{CreateDirSnafu, Error, Result}; use crate::io::object_store::MirroringObjectStoreWrapper; use crate::table::NativeTable; @@ -255,6 +256,9 @@ pub struct ListingDatabase { // Session for object stores and caching session: Arc, + + // Namespace-backed database for child namespace operations + namespace_database: Arc, } impl std::fmt::Display for ListingDatabase { @@ -281,6 +285,44 @@ const MIRRORED_STORE: &str = "mirroredStore"; /// A connection to LanceDB impl ListingDatabase { + fn build_namespace_client_properties( + uri: &str, + storage_options: &HashMap, + namespace_client_properties: HashMap, + ) -> HashMap { + let mut properties = namespace_client_properties; + properties.insert("root".to_string(), uri.to_string()); + for (key, value) in storage_options { + properties.insert(format!("storage.{}", key), value.clone()); + } + properties + } + + async fn connect_namespace_database( + uri: &str, + storage_options: HashMap, + namespace_client_properties: HashMap, + read_consistency_interval: Option, + session: Arc, + ) -> Result> { + let ns_properties = Self::build_namespace_client_properties( + uri, + &storage_options, + namespace_client_properties, + ); + Ok(Arc::new( + LanceNamespaceDatabase::connect( + "dir", + ns_properties, + storage_options, + read_consistency_interval, + Some(session), + HashSet::new(), + ) + .await?, + )) + } + /// Connect to a listing database /// /// The URI should be a path to a directory where the tables are stored. @@ -300,6 +342,7 @@ impl ListingDatabase { uri, request.read_consistency_interval, options.new_table_config, + request.namespace_client_properties.clone(), request.session.clone(), ) .await @@ -387,6 +430,15 @@ impl ListingDatabase { None => None, }; + let namespace_database = Self::connect_namespace_database( + &table_base_uri, + options.storage_options.clone(), + request.namespace_client_properties.clone(), + request.read_consistency_interval, + session.clone(), + ) + .await?; + Ok(Self { uri: table_base_uri, query_string, @@ -398,6 +450,7 @@ impl ListingDatabase { storage_options_provider: None, new_table_config: options.new_table_config, session, + namespace_database, }) } Err(_) => { @@ -405,6 +458,7 @@ impl ListingDatabase { uri, request.read_consistency_interval, options.new_table_config, + request.namespace_client_properties.clone(), request.session.clone(), ) .await @@ -416,6 +470,7 @@ impl ListingDatabase { path: &str, read_consistency_interval: Option, new_table_config: NewTableConfig, + namespace_client_properties: HashMap, session: Option>, ) -> Result { let session = session.unwrap_or_else(|| Arc::new(lance::session::Session::default())); @@ -429,6 +484,15 @@ impl ListingDatabase { Self::try_create_dir(path).context(CreateDirSnafu { path })?; } + let namespace_database = Self::connect_namespace_database( + path, + HashMap::new(), + namespace_client_properties, + read_consistency_interval, + session.clone(), + ) + .await?; + Ok(Self { uri: path.to_string(), query_string: None, @@ -440,6 +504,7 @@ impl ListingDatabase { storage_options_provider: None, new_table_config, session, + namespace_database, }) } @@ -497,6 +562,10 @@ impl ListingDatabase { Ok(uri) } + fn namespace_database(&self) -> Arc { + self.namespace_database.clone() + } + async fn drop_tables(&self, names: Vec) -> Result<()> { let object_store_params = ObjectStoreParams { storage_options_accessor: if self.storage_options.is_empty() { @@ -696,16 +765,7 @@ impl Database for ListingDatabase { &self, request: ListNamespacesRequest, ) -> Result { - if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) { - return Err(Error::NotSupported { - message: "Namespace operations are not supported for listing database".into(), - }); - } - - Ok(ListNamespacesResponse { - namespaces: Vec::new(), - page_token: None, - }) + self.namespace_database().list_namespaces(request).await } fn uri(&self) -> &str { @@ -726,36 +786,26 @@ impl Database for ListingDatabase { async fn create_namespace( &self, - _request: CreateNamespaceRequest, + request: CreateNamespaceRequest, ) -> Result { - Err(Error::NotSupported { - message: "Namespace operations are not supported for listing database".into(), - }) + self.namespace_database().create_namespace(request).await } - async fn drop_namespace( - &self, - _request: DropNamespaceRequest, - ) -> Result { - Err(Error::NotSupported { - message: "Namespace operations are not supported for listing database".into(), - }) + async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result { + self.namespace_database().drop_namespace(request).await } async fn describe_namespace( &self, - _request: DescribeNamespaceRequest, + request: DescribeNamespaceRequest, ) -> Result { - Err(Error::NotSupported { - message: "Namespace operations are not supported for listing database".into(), - }) + self.namespace_database().describe_namespace(request).await } + #[allow(deprecated)] async fn table_names(&self, request: TableNamesRequest) -> Result> { if !request.namespace_path.is_empty() { - return Err(Error::NotSupported { - message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(), - }); + return self.namespace_database().table_names(request).await; } let mut f = self .object_store @@ -788,9 +838,7 @@ impl Database for ListingDatabase { async fn list_tables(&self, request: ListTablesRequest) -> Result { if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) { - return Err(Error::NotSupported { - message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(), - }); + return self.namespace_database().list_tables(request).await; } let mut f = self .object_store @@ -838,11 +886,8 @@ impl Database for ListingDatabase { } async fn create_table(&self, request: CreateTableRequest) -> Result> { - // When namespace is not empty, location must be provided - if !request.namespace_path.is_empty() && request.location.is_none() { - return Err(Error::InvalidInput { - message: "Location must be provided when namespace is not empty".into(), - }); + if !request.namespace_path.is_empty() { + return self.namespace_database().create_table(request).await; } // Use provided location if available, otherwise derive from table name let table_uri = request @@ -959,11 +1004,8 @@ impl Database for ListingDatabase { } async fn open_table(&self, mut request: OpenTableRequest) -> Result> { - // When namespace is not empty, location must be provided - if !request.namespace_path.is_empty() && request.location.is_none() { - return Err(Error::InvalidInput { - message: "Location must be provided when namespace is not empty".into(), - }); + if !request.namespace_path.is_empty() { + return self.namespace_database().open_table(request).await; } // Use provided location if available, otherwise derive from table name let table_uri = request @@ -1059,9 +1101,10 @@ impl Database for ListingDatabase { async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()> { if !namespace_path.is_empty() { - return Err(Error::NotSupported { - message: "Namespace parameter is not supported for listing database.".into(), - }); + return self + .namespace_database() + .drop_table(name, namespace_path) + .await; } self.drop_tables(vec![name.to_string()]).await } @@ -1070,9 +1113,10 @@ impl Database for ListingDatabase { async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> { // Check if namespace parameter is provided if !namespace_path.is_empty() { - return Err(Error::NotSupported { - message: "Namespace parameter is not supported for listing database.".into(), - }); + return self + .namespace_database() + .drop_all_tables(namespace_path) + .await; } let tables = self.table_names(TableNamesRequest::default()).await?; self.drop_tables(tables).await @@ -1083,30 +1127,11 @@ impl Database for ListingDatabase { } async fn namespace_client(&self) -> Result> { - // Create a DirectoryNamespace pointing to the same root with the same storage options - let mut builder = lance_namespace_impls::DirectoryNamespaceBuilder::new(&self.uri); - - // Add storage options - if !self.storage_options.is_empty() { - builder = builder.storage_options(self.storage_options.clone()); - } - - // Use the same session - builder = builder.session(self.session.clone()); - - let namespace = builder.build().await.map_err(|e| Error::Runtime { - message: format!("Failed to create namespace client: {}", e), - })?; - Ok(Arc::new(namespace) as Arc) + self.namespace_database.namespace_client().await } async fn namespace_client_config(&self) -> Result<(String, HashMap)> { - let mut properties = HashMap::new(); - properties.insert("root".to_string(), self.uri.clone()); - for (key, value) in &self.storage_options { - properties.insert(format!("storage.{}", key), value.clone()); - } - Ok(("dir".to_string(), properties)) + self.namespace_database.namespace_client_config().await } } @@ -1132,6 +1157,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options: Default::default(), + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -1265,6 +1291,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options: options.clone(), + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -1799,6 +1826,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options, + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -1904,6 +1932,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options, + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -1975,6 +2004,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options, + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -2108,4 +2138,209 @@ mod tests { assert!(tables.contains(&"table1".to_string())); assert!(tables.contains(&"table2".to_string())); } + + #[tokio::test] + async fn test_listing_database_namespace_operations() { + let (_tempdir, db) = setup_database().await; + + db.create_namespace(CreateNamespaceRequest { + id: Some(vec!["parent".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + + db.create_namespace(CreateNamespaceRequest { + id: Some(vec!["parent".to_string(), "child".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + + let root_namespaces = db + .list_namespaces(ListNamespacesRequest { + id: Some(vec![]), + ..Default::default() + }) + .await + .unwrap(); + assert!(root_namespaces.namespaces.contains(&"parent".to_string())); + + let child_namespaces = db + .list_namespaces(ListNamespacesRequest { + id: Some(vec!["parent".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + assert!(child_namespaces.namespaces.contains(&"child".to_string())); + + db.describe_namespace(DescribeNamespaceRequest { + id: Some(vec!["parent".to_string(), "child".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + } + + #[tokio::test] + #[cfg(not(windows))] // TODO: support Windows once directory namespace-backed listing DB tests are supported. + async fn test_listing_database_with_namespace_client_properties() { + let tempdir = tempdir().unwrap(); + let uri = tempdir.path().to_str().unwrap(); + + let mut namespace_client_properties = HashMap::new(); + namespace_client_properties.insert( + "table_version_tracking_enabled".to_string(), + "true".to_string(), + ); + namespace_client_properties.insert("manifest_enabled".to_string(), "true".to_string()); + + let request = ConnectRequest { + uri: uri.to_string(), + #[cfg(feature = "remote")] + client_config: Default::default(), + options: Default::default(), + namespace_client_properties, + read_consistency_interval: None, + session: None, + }; + + let db = ListingDatabase::connect_with_options(&request) + .await + .unwrap(); + let namespace_path = vec!["test_ns".to_string()]; + + db.create_namespace(CreateNamespaceRequest { + id: Some(namespace_path.clone()), + ..Default::default() + }) + .await + .unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + db.create_table(CreateTableRequest { + name: "managed_table".to_string(), + namespace_path: namespace_path.clone(), + data: Box::new(RecordBatch::new_empty(schema)) as Box, + mode: CreateTableMode::Create, + write_options: Default::default(), + location: None, + namespace_client: None, + }) + .await + .unwrap(); + + let namespace_client = db.namespace_client().await.unwrap(); + let describe = namespace_client + .describe_table(lance_namespace::models::DescribeTableRequest { + id: Some(vec!["test_ns".to_string(), "managed_table".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(describe.managed_versioning, Some(true)); + } + + #[tokio::test] + async fn test_listing_database_nested_namespace_table_ops() { + let (_tempdir, db) = setup_database().await; + let namespace_path = vec!["parent".to_string(), "child".to_string()]; + + db.create_namespace(CreateNamespaceRequest { + id: Some(vec!["parent".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + db.create_namespace(CreateNamespaceRequest { + id: Some(namespace_path.clone()), + ..Default::default() + }) + .await + .unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + db.create_table(CreateTableRequest { + name: "nested_table".to_string(), + namespace_path: namespace_path.clone(), + data: Box::new(RecordBatch::new_empty(schema)) as Box, + mode: CreateTableMode::Create, + write_options: Default::default(), + location: None, + namespace_client: None, + }) + .await + .unwrap(); + + let namespace_client = db.namespace_client().await.unwrap(); + let describe = namespace_client + .describe_table(lance_namespace::models::DescribeTableRequest { + id: Some(vec![ + "parent".to_string(), + "child".to_string(), + "nested_table".to_string(), + ]), + ..Default::default() + }) + .await + .unwrap(); + assert!(describe.location.is_some()); + + let table = db + .open_table(OpenTableRequest { + name: "nested_table".to_string(), + namespace_path: namespace_path.clone(), + index_cache_size: None, + lance_read_params: None, + location: None, + namespace_client: None, + managed_versioning: None, + }) + .await + .unwrap(); + assert_eq!(table.name(), "nested_table"); + + #[allow(deprecated)] + let table_names = db + .table_names(TableNamesRequest { + namespace_path: namespace_path.clone(), + start_after: None, + limit: None, + }) + .await + .unwrap(); + assert_eq!(table_names, vec!["nested_table".to_string()]); + + let list_tables = db + .list_tables(ListTablesRequest { + id: Some(namespace_path.clone()), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(list_tables.tables, vec!["nested_table".to_string()]); + + db.drop_table("nested_table", &namespace_path) + .await + .unwrap(); + + let post_drop = db + .list_tables(ListTablesRequest { + id: Some(namespace_path), + ..Default::default() + }) + .await + .unwrap(); + assert!(post_drop.tables.is_empty()); + } } diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index 03bc434e6..6b0d19054 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -450,6 +450,47 @@ mod tests { )); } + #[tokio::test] + async fn test_namespace_connection_with_namespace_client_properties() { + let tmp_dir = tempdir().unwrap(); + let root_path = tmp_dir.path().to_str().unwrap().to_string(); + + let mut properties = HashMap::new(); + properties.insert("root".to_string(), root_path); + + let conn = connect_namespace("dir", properties) + .namespace_client_property("table_version_tracking_enabled", "true") + .namespace_client_property("manifest_enabled", "true") + .execute() + .await + .expect("Failed to connect to namespace"); + + conn.create_namespace(CreateNamespaceRequest { + id: Some(vec!["test_ns".into()]), + ..Default::default() + }) + .await + .expect("Failed to create namespace"); + + let test_data = create_test_data(); + conn.create_table("test_table", test_data) + .namespace(vec!["test_ns".into()]) + .execute() + .await + .expect("Failed to create table"); + + let namespace_client = conn.namespace_client().await.unwrap(); + let describe = namespace_client + .describe_table(DescribeTableRequest { + id: Some(vec!["test_ns".into(), "test_table".into()]), + ..Default::default() + }) + .await + .expect("Failed to describe table"); + + assert_eq!(describe.managed_versioning, Some(true)); + } + #[tokio::test] async fn test_namespace_create_table_basic() { // Setup: Create a temporary directory for the namespace