diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index f79920ef9..29ee59889 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -6,10 +6,7 @@ use std::collections::HashSet; use std::fs::create_dir_all; use std::path::Path; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::{collections::HashMap, sync::Arc}; use lance::dataset::refs::Ref; use lance::dataset::{ReadParams, WriteMode, builder::DatasetBuilder}; @@ -260,8 +257,8 @@ pub struct ListingDatabase { // Session for object stores and caching session: Arc, - // Cached namespace-backed database for child namespace operations - cached_namespace_database: Mutex>>, + // Namespace-backed database for child namespace operations + namespace_database: Arc, } impl std::fmt::Display for ListingDatabase { @@ -288,6 +285,38 @@ const MIRRORED_STORE: &str = "mirroredStore"; /// A connection to LanceDB impl ListingDatabase { + fn namespace_client_properties( + uri: &str, + storage_options: &HashMap, + ) -> HashMap { + let mut properties = HashMap::new(); + properties.insert("root".to_string(), uri.to_string()); + for (key, value) in storage_options { + properties.insert(format!("storage.{}", key), value.clone()); + } + properties + } + + async fn connect_namespace_database( + uri: &str, + storage_options: HashMap, + read_consistency_interval: Option, + session: Arc, + ) -> Result> { + let ns_properties = Self::namespace_client_properties(uri, &storage_options); + Ok(Arc::new( + LanceNamespaceDatabase::connect( + "dir", + ns_properties, + storage_options, + read_consistency_interval, + Some(session), + HashSet::new(), + ) + .await?, + )) + } + /// Connect to a listing database /// /// The URI should be a path to a directory where the tables are stored. @@ -394,6 +423,14 @@ impl ListingDatabase { None => None, }; + let namespace_database = Self::connect_namespace_database( + &table_base_uri, + options.storage_options.clone(), + request.read_consistency_interval, + session.clone(), + ) + .await?; + Ok(Self { uri: table_base_uri, query_string, @@ -405,7 +442,7 @@ impl ListingDatabase { storage_options_provider: None, new_table_config: options.new_table_config, session, - cached_namespace_database: Mutex::new(None), + namespace_database, }) } Err(_) => { @@ -437,6 +474,14 @@ impl ListingDatabase { Self::try_create_dir(path).context(CreateDirSnafu { path })?; } + let namespace_database = Self::connect_namespace_database( + path, + HashMap::new(), + read_consistency_interval, + session.clone(), + ) + .await?; + Ok(Self { uri: path.to_string(), query_string: None, @@ -448,7 +493,7 @@ impl ListingDatabase { storage_options_provider: None, new_table_config, session, - cached_namespace_database: Mutex::new(None), + namespace_database, }) } @@ -506,38 +551,8 @@ impl ListingDatabase { Ok(uri) } - async fn namespace_database(&self) -> Result> { - if let Some(db) = self - .cached_namespace_database - .lock() - .unwrap_or_else(|e| e.into_inner()) - .clone() - { - return Ok(db); - } - - let (ns_impl, ns_properties) = self.namespace_client_config().await?; - let db = Arc::new( - LanceNamespaceDatabase::connect( - &ns_impl, - ns_properties, - self.storage_options.clone(), - self.read_consistency_interval, - Some(self.session.clone()), - HashSet::new(), - ) - .await?, - ); - - let mut cached = self - .cached_namespace_database - .lock() - .unwrap_or_else(|e| e.into_inner()); - if let Some(existing) = cached.as_ref() { - return Ok(existing.clone()); - } - *cached = Some(db.clone()); - Ok(db) + fn namespace_database(&self) -> Arc { + self.namespace_database.clone() } async fn drop_tables(&self, names: Vec) -> Result<()> { @@ -739,10 +754,7 @@ impl Database for ListingDatabase { &self, request: ListNamespacesRequest, ) -> Result { - self.namespace_database() - .await? - .list_namespaces(request) - .await + self.namespace_database().list_namespaces(request).await } fn uri(&self) -> &str { @@ -765,34 +777,24 @@ impl Database for ListingDatabase { &self, request: CreateNamespaceRequest, ) -> Result { - self.namespace_database() - .await? - .create_namespace(request) - .await + self.namespace_database().create_namespace(request).await } async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result { - self.namespace_database() - .await? - .drop_namespace(request) - .await + self.namespace_database().drop_namespace(request).await } async fn describe_namespace( &self, request: DescribeNamespaceRequest, ) -> Result { - self.namespace_database() - .await? - .describe_namespace(request) - .await + self.namespace_database().describe_namespace(request).await } async fn table_names(&self, request: TableNamesRequest) -> Result> { if !request.namespace_path.is_empty() { let response = self .namespace_database() - .await? .list_tables(ListTablesRequest { id: Some(request.namespace_path), page_token: request.start_after, @@ -833,7 +835,7 @@ impl Database for ListingDatabase { async fn list_tables(&self, request: ListTablesRequest) -> Result { if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) { - return self.namespace_database().await?.list_tables(request).await; + return self.namespace_database().list_tables(request).await; } let mut f = self .object_store @@ -882,7 +884,7 @@ impl Database for ListingDatabase { async fn create_table(&self, request: CreateTableRequest) -> Result> { if !request.namespace_path.is_empty() { - return self.namespace_database().await?.create_table(request).await; + return self.namespace_database().create_table(request).await; } // Use provided location if available, otherwise derive from table name let table_uri = request @@ -1000,7 +1002,7 @@ impl Database for ListingDatabase { async fn open_table(&self, mut request: OpenTableRequest) -> Result> { if !request.namespace_path.is_empty() { - return self.namespace_database().await?.open_table(request).await; + return self.namespace_database().open_table(request).await; } // Use provided location if available, otherwise derive from table name let table_uri = request @@ -1098,7 +1100,6 @@ impl Database for ListingDatabase { if !namespace_path.is_empty() { return self .namespace_database() - .await? .drop_table(name, namespace_path) .await; } @@ -1111,7 +1112,6 @@ impl Database for ListingDatabase { if !namespace_path.is_empty() { return self .namespace_database() - .await? .drop_all_tables(namespace_path) .await; } @@ -1124,30 +1124,11 @@ impl Database for ListingDatabase { } async fn namespace_client(&self) -> Result> { - // Create a DirectoryNamespace pointing to the same root with the same storage options - let mut builder = lance_namespace_impls::DirectoryNamespaceBuilder::new(&self.uri); - - // Add storage options - if !self.storage_options.is_empty() { - builder = builder.storage_options(self.storage_options.clone()); - } - - // Use the same session - builder = builder.session(self.session.clone()); - - let namespace = builder.build().await.map_err(|e| Error::Runtime { - message: format!("Failed to create namespace client: {}", e), - })?; - Ok(Arc::new(namespace) as Arc) + self.namespace_database.namespace_client().await } async fn namespace_client_config(&self) -> Result<(String, HashMap)> { - let mut properties = HashMap::new(); - properties.insert("root".to_string(), self.uri.clone()); - for (key, value) in &self.storage_options { - properties.insert(format!("storage.{}", key), value.clone()); - } - Ok(("dir".to_string(), properties)) + self.namespace_database.namespace_client_config().await } }