refactor(rust): cache listing namespace db

This commit is contained in:
Jack Ye
2026-04-15 17:23:50 -07:00
parent 513c2215c5
commit 5807ad464b

View File

@@ -6,7 +6,10 @@
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::path::Path;
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use lance::dataset::refs::Ref;
use lance::dataset::{ReadParams, WriteMode, builder::DatasetBuilder};
@@ -256,6 +259,9 @@ pub struct ListingDatabase {
// Session for object stores and caching
session: Arc<lance::session::Session>,
// Cached namespace-backed database for child namespace operations
cached_namespace_database: Mutex<Option<Arc<LanceNamespaceDatabase>>>,
}
impl std::fmt::Display for ListingDatabase {
@@ -399,6 +405,7 @@ impl ListingDatabase {
storage_options_provider: None,
new_table_config: options.new_table_config,
session,
cached_namespace_database: Mutex::new(None),
})
}
Err(_) => {
@@ -441,6 +448,7 @@ impl ListingDatabase {
storage_options_provider: None,
new_table_config,
session,
cached_namespace_database: Mutex::new(None),
})
}
@@ -498,17 +506,38 @@ impl ListingDatabase {
Ok(uri)
}
async fn namespace_database(&self) -> Result<LanceNamespaceDatabase> {
async fn namespace_database(&self) -> Result<Arc<LanceNamespaceDatabase>> {
if let Some(db) = self
.cached_namespace_database
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone()
{
return Ok(db);
}
let (ns_impl, ns_properties) = self.namespace_client_config().await?;
LanceNamespaceDatabase::connect(
&ns_impl,
ns_properties,
self.storage_options.clone(),
self.read_consistency_interval,
Some(self.session.clone()),
HashSet::new(),
)
.await
let db = Arc::new(
LanceNamespaceDatabase::connect(
&ns_impl,
ns_properties,
self.storage_options.clone(),
self.read_consistency_interval,
Some(self.session.clone()),
HashSet::new(),
)
.await?,
);
let mut cached = self
.cached_namespace_database
.lock()
.unwrap_or_else(|e| e.into_inner());
if let Some(existing) = cached.as_ref() {
return Ok(existing.clone());
}
*cached = Some(db.clone());
Ok(db)
}
async fn drop_tables(&self, names: Vec<String>) -> Result<()> {