From a3e3c1ad5c9ba84c443a1dd55764988e2d54b1bd Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 15 Apr 2026 22:58:05 -0700 Subject: [PATCH] feat(rust): add namespace client properties --- rust/lancedb/src/connection.rs | 89 ++++++++++++++++++++++- rust/lancedb/src/database/listing.rs | 97 ++++++++++++++++++++++---- rust/lancedb/src/database/namespace.rs | 41 +++++++++++ 3 files changed, 213 insertions(+), 14 deletions(-) diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index bcb0932f6..35863aa60 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -582,6 +582,14 @@ pub struct ConnectRequest { /// Database specific options pub options: HashMap, + /// Extra properties for the equivalent namespace client. + /// + /// For a local [`ListingDatabase`], these are merged into the backing + /// `DirectoryNamespace` properties. This is useful for namespace-specific + /// settings such as `table_version_tracking_enabled` that are distinct from + /// storage options. + pub namespace_client_properties: HashMap, + /// The interval at which to check for updates from other processes. /// /// If None, then consistency is not checked. For performance @@ -621,6 +629,7 @@ impl ConnectBuilder { client_config: Default::default(), read_consistency_interval: None, options: HashMap::new(), + namespace_client_properties: HashMap::new(), session: None, }, embedding_registry: None, @@ -757,6 +766,31 @@ impl ConnectBuilder { self } + /// Set an additional property for the equivalent namespace client. + pub fn namespace_client_property( + mut self, + key: impl Into, + value: impl Into, + ) -> Self { + self.request + .namespace_client_properties + .insert(key.into(), value.into()); + self + } + + /// Set multiple additional properties for the equivalent namespace client. + pub fn namespace_client_properties( + mut self, + pairs: impl IntoIterator, impl Into)>, + ) -> Self { + for (key, value) in pairs { + self.request + .namespace_client_properties + .insert(key.into(), value.into()); + } + self + } + /// The interval at which to check for updates from other processes. This /// only affects LanceDB OSS. /// @@ -893,6 +927,7 @@ pub struct ConnectNamespaceBuilder { ns_impl: String, properties: HashMap, storage_options: HashMap, + namespace_client_properties: HashMap, read_consistency_interval: Option, embedding_registry: Option>, session: Option>, @@ -905,6 +940,7 @@ impl ConnectNamespaceBuilder { ns_impl: ns_impl.to_string(), properties, storage_options: HashMap::new(), + namespace_client_properties: HashMap::new(), read_consistency_interval: None, embedding_registry: None, session: None, @@ -933,6 +969,29 @@ impl ConnectNamespaceBuilder { self } + /// Set an additional namespace client property. + pub fn namespace_client_property( + mut self, + key: impl Into, + value: impl Into, + ) -> Self { + self.namespace_client_properties + .insert(key.into(), value.into()); + self + } + + /// Set multiple additional namespace client properties. + pub fn namespace_client_properties( + mut self, + pairs: impl IntoIterator, impl Into)>, + ) -> Self { + for (key, value) in pairs { + self.namespace_client_properties + .insert(key.into(), value.into()); + } + self + } + /// The interval at which to check for updates from other processes. /// /// If left unset, consistency is not checked. For maximum read @@ -994,10 +1053,13 @@ impl ConnectNamespaceBuilder { pub async fn execute(self) -> Result { use crate::database::namespace::LanceNamespaceDatabase; + let mut properties = self.properties; + properties.extend(self.namespace_client_properties); + let internal = Arc::new( LanceNamespaceDatabase::connect( &self.ns_impl, - self.properties, + properties, self.storage_options, self.read_consistency_interval, self.session, @@ -1117,6 +1179,31 @@ mod tests { assert_eq!(db.uri(), relative_uri.to_str().unwrap().to_string()); } + #[tokio::test] + async fn test_connect_with_namespace_client_properties() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let db = connect(uri) + .namespace_client_property("table_version_tracking_enabled", "true") + .namespace_client_property("manifest_enabled", "true") + .execute() + .await + .unwrap(); + + let (ns_impl, properties) = db.namespace_client_config().await.unwrap(); + assert_eq!(ns_impl, "dir"); + assert_eq!(properties.get("root"), Some(&uri.to_string())); + assert_eq!( + properties.get("table_version_tracking_enabled"), + Some(&"true".to_string()) + ); + assert_eq!( + properties.get("manifest_enabled"), + Some(&"true".to_string()) + ); + } + #[tokio::test] async fn test_table_names() { let tc = new_test_connection().await.unwrap(); diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index 29ee59889..53cfce9dd 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -285,11 +285,12 @@ const MIRRORED_STORE: &str = "mirroredStore"; /// A connection to LanceDB impl ListingDatabase { - fn namespace_client_properties( + fn build_namespace_client_properties( uri: &str, storage_options: &HashMap, + namespace_client_properties: HashMap, ) -> HashMap { - let mut properties = HashMap::new(); + let mut properties = namespace_client_properties; properties.insert("root".to_string(), uri.to_string()); for (key, value) in storage_options { properties.insert(format!("storage.{}", key), value.clone()); @@ -300,10 +301,15 @@ impl ListingDatabase { async fn connect_namespace_database( uri: &str, storage_options: HashMap, + namespace_client_properties: HashMap, read_consistency_interval: Option, session: Arc, ) -> Result> { - let ns_properties = Self::namespace_client_properties(uri, &storage_options); + let ns_properties = Self::build_namespace_client_properties( + uri, + &storage_options, + namespace_client_properties, + ); Ok(Arc::new( LanceNamespaceDatabase::connect( "dir", @@ -336,6 +342,7 @@ impl ListingDatabase { uri, request.read_consistency_interval, options.new_table_config, + request.namespace_client_properties.clone(), request.session.clone(), ) .await @@ -426,6 +433,7 @@ impl ListingDatabase { let namespace_database = Self::connect_namespace_database( &table_base_uri, options.storage_options.clone(), + request.namespace_client_properties.clone(), request.read_consistency_interval, session.clone(), ) @@ -450,6 +458,7 @@ impl ListingDatabase { uri, request.read_consistency_interval, options.new_table_config, + request.namespace_client_properties.clone(), request.session.clone(), ) .await @@ -461,6 +470,7 @@ impl ListingDatabase { path: &str, read_consistency_interval: Option, new_table_config: NewTableConfig, + namespace_client_properties: HashMap, session: Option>, ) -> Result { let session = session.unwrap_or_else(|| Arc::new(lance::session::Session::default())); @@ -477,6 +487,7 @@ impl ListingDatabase { let namespace_database = Self::connect_namespace_database( path, HashMap::new(), + namespace_client_properties, read_consistency_interval, session.clone(), ) @@ -791,18 +802,10 @@ impl Database for ListingDatabase { self.namespace_database().describe_namespace(request).await } + #[allow(deprecated)] async fn table_names(&self, request: TableNamesRequest) -> Result> { if !request.namespace_path.is_empty() { - let response = self - .namespace_database() - .list_tables(ListTablesRequest { - id: Some(request.namespace_path), - page_token: request.start_after, - limit: request.limit.map(|l| l as i32), - ..Default::default() - }) - .await?; - return Ok(response.tables); + return self.namespace_database().table_names(request).await; } let mut f = self .object_store @@ -1154,6 +1157,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options: Default::default(), + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -1287,6 +1291,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options: options.clone(), + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -1821,6 +1826,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options, + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -1926,6 +1932,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options, + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -1997,6 +2004,7 @@ mod tests { #[cfg(feature = "remote")] client_config: Default::default(), options, + namespace_client_properties: Default::default(), read_consistency_interval: None, session: None, }; @@ -2175,6 +2183,69 @@ mod tests { .unwrap(); } + #[tokio::test] + async fn test_listing_database_with_namespace_client_properties() { + let tempdir = tempdir().unwrap(); + let uri = tempdir.path().to_str().unwrap(); + + let mut namespace_client_properties = HashMap::new(); + namespace_client_properties.insert( + "table_version_tracking_enabled".to_string(), + "true".to_string(), + ); + namespace_client_properties.insert("manifest_enabled".to_string(), "true".to_string()); + + let request = ConnectRequest { + uri: uri.to_string(), + #[cfg(feature = "remote")] + client_config: Default::default(), + options: Default::default(), + namespace_client_properties, + read_consistency_interval: None, + session: None, + }; + + let db = ListingDatabase::connect_with_options(&request) + .await + .unwrap(); + let namespace_path = vec!["test_ns".to_string()]; + + db.create_namespace(CreateNamespaceRequest { + id: Some(namespace_path.clone()), + ..Default::default() + }) + .await + .unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + db.create_table(CreateTableRequest { + name: "managed_table".to_string(), + namespace_path: namespace_path.clone(), + data: Box::new(RecordBatch::new_empty(schema)) as Box, + mode: CreateTableMode::Create, + write_options: Default::default(), + location: None, + namespace_client: None, + }) + .await + .unwrap(); + + let namespace_client = db.namespace_client().await.unwrap(); + let describe = namespace_client + .describe_table(lance_namespace::models::DescribeTableRequest { + id: Some(vec!["test_ns".to_string(), "managed_table".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(describe.managed_versioning, Some(true)); + } + #[tokio::test] async fn test_listing_database_nested_namespace_table_ops() { let (_tempdir, db) = setup_database().await; diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index 03bc434e6..6b0d19054 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -450,6 +450,47 @@ mod tests { )); } + #[tokio::test] + async fn test_namespace_connection_with_namespace_client_properties() { + let tmp_dir = tempdir().unwrap(); + let root_path = tmp_dir.path().to_str().unwrap().to_string(); + + let mut properties = HashMap::new(); + properties.insert("root".to_string(), root_path); + + let conn = connect_namespace("dir", properties) + .namespace_client_property("table_version_tracking_enabled", "true") + .namespace_client_property("manifest_enabled", "true") + .execute() + .await + .expect("Failed to connect to namespace"); + + conn.create_namespace(CreateNamespaceRequest { + id: Some(vec!["test_ns".into()]), + ..Default::default() + }) + .await + .expect("Failed to create namespace"); + + let test_data = create_test_data(); + conn.create_table("test_table", test_data) + .namespace(vec!["test_ns".into()]) + .execute() + .await + .expect("Failed to create table"); + + let namespace_client = conn.namespace_client().await.unwrap(); + let describe = namespace_client + .describe_table(DescribeTableRequest { + id: Some(vec!["test_ns".into(), "test_table".into()]), + ..Default::default() + }) + .await + .expect("Failed to describe table"); + + assert_eq!(describe.managed_versioning, Some(true)); + } + #[tokio::test] async fn test_namespace_create_table_basic() { // Setup: Create a temporary directory for the namespace