refactor(rust): eagerly init listing namespace db

This commit is contained in:
Jack Ye
2026-04-15 22:28:52 -07:00
parent 5807ad464b
commit c6e43b545d

View File

@@ -6,10 +6,7 @@
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::path::Path;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use std::{collections::HashMap, sync::Arc};
use lance::dataset::refs::Ref;
use lance::dataset::{ReadParams, WriteMode, builder::DatasetBuilder};
@@ -260,8 +257,8 @@ pub struct ListingDatabase {
// Session for object stores and caching
session: Arc<lance::session::Session>,
// Cached namespace-backed database for child namespace operations
cached_namespace_database: Mutex<Option<Arc<LanceNamespaceDatabase>>>,
// Namespace-backed database for child namespace operations
namespace_database: Arc<LanceNamespaceDatabase>,
}
impl std::fmt::Display for ListingDatabase {
@@ -288,6 +285,38 @@ const MIRRORED_STORE: &str = "mirroredStore";
/// A connection to LanceDB
impl ListingDatabase {
fn namespace_client_properties(
uri: &str,
storage_options: &HashMap<String, String>,
) -> HashMap<String, String> {
let mut properties = HashMap::new();
properties.insert("root".to_string(), uri.to_string());
for (key, value) in storage_options {
properties.insert(format!("storage.{}", key), value.clone());
}
properties
}
async fn connect_namespace_database(
uri: &str,
storage_options: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Arc<lance::session::Session>,
) -> Result<Arc<LanceNamespaceDatabase>> {
let ns_properties = Self::namespace_client_properties(uri, &storage_options);
Ok(Arc::new(
LanceNamespaceDatabase::connect(
"dir",
ns_properties,
storage_options,
read_consistency_interval,
Some(session),
HashSet::new(),
)
.await?,
))
}
/// Connect to a listing database
///
/// The URI should be a path to a directory where the tables are stored.
@@ -394,6 +423,14 @@ impl ListingDatabase {
None => None,
};
let namespace_database = Self::connect_namespace_database(
&table_base_uri,
options.storage_options.clone(),
request.read_consistency_interval,
session.clone(),
)
.await?;
Ok(Self {
uri: table_base_uri,
query_string,
@@ -405,7 +442,7 @@ impl ListingDatabase {
storage_options_provider: None,
new_table_config: options.new_table_config,
session,
cached_namespace_database: Mutex::new(None),
namespace_database,
})
}
Err(_) => {
@@ -437,6 +474,14 @@ impl ListingDatabase {
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
}
let namespace_database = Self::connect_namespace_database(
path,
HashMap::new(),
read_consistency_interval,
session.clone(),
)
.await?;
Ok(Self {
uri: path.to_string(),
query_string: None,
@@ -448,7 +493,7 @@ impl ListingDatabase {
storage_options_provider: None,
new_table_config,
session,
cached_namespace_database: Mutex::new(None),
namespace_database,
})
}
@@ -506,38 +551,8 @@ impl ListingDatabase {
Ok(uri)
}
async fn namespace_database(&self) -> Result<Arc<LanceNamespaceDatabase>> {
if let Some(db) = self
.cached_namespace_database
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone()
{
return Ok(db);
}
let (ns_impl, ns_properties) = self.namespace_client_config().await?;
let db = Arc::new(
LanceNamespaceDatabase::connect(
&ns_impl,
ns_properties,
self.storage_options.clone(),
self.read_consistency_interval,
Some(self.session.clone()),
HashSet::new(),
)
.await?,
);
let mut cached = self
.cached_namespace_database
.lock()
.unwrap_or_else(|e| e.into_inner());
if let Some(existing) = cached.as_ref() {
return Ok(existing.clone());
}
*cached = Some(db.clone());
Ok(db)
fn namespace_database(&self) -> Arc<LanceNamespaceDatabase> {
self.namespace_database.clone()
}
async fn drop_tables(&self, names: Vec<String>) -> Result<()> {
@@ -739,10 +754,7 @@ impl Database for ListingDatabase {
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
self.namespace_database()
.await?
.list_namespaces(request)
.await
self.namespace_database().list_namespaces(request).await
}
fn uri(&self) -> &str {
@@ -765,34 +777,24 @@ impl Database for ListingDatabase {
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
self.namespace_database()
.await?
.create_namespace(request)
.await
self.namespace_database().create_namespace(request).await
}
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
self.namespace_database()
.await?
.drop_namespace(request)
.await
self.namespace_database().drop_namespace(request).await
}
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
self.namespace_database()
.await?
.describe_namespace(request)
.await
self.namespace_database().describe_namespace(request).await
}
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
if !request.namespace_path.is_empty() {
let response = self
.namespace_database()
.await?
.list_tables(ListTablesRequest {
id: Some(request.namespace_path),
page_token: request.start_after,
@@ -833,7 +835,7 @@ impl Database for ListingDatabase {
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
return self.namespace_database().await?.list_tables(request).await;
return self.namespace_database().list_tables(request).await;
}
let mut f = self
.object_store
@@ -882,7 +884,7 @@ impl Database for ListingDatabase {
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
if !request.namespace_path.is_empty() {
return self.namespace_database().await?.create_table(request).await;
return self.namespace_database().create_table(request).await;
}
// Use provided location if available, otherwise derive from table name
let table_uri = request
@@ -1000,7 +1002,7 @@ impl Database for ListingDatabase {
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
if !request.namespace_path.is_empty() {
return self.namespace_database().await?.open_table(request).await;
return self.namespace_database().open_table(request).await;
}
// Use provided location if available, otherwise derive from table name
let table_uri = request
@@ -1098,7 +1100,6 @@ impl Database for ListingDatabase {
if !namespace_path.is_empty() {
return self
.namespace_database()
.await?
.drop_table(name, namespace_path)
.await;
}
@@ -1111,7 +1112,6 @@ impl Database for ListingDatabase {
if !namespace_path.is_empty() {
return self
.namespace_database()
.await?
.drop_all_tables(namespace_path)
.await;
}
@@ -1124,30 +1124,11 @@ impl Database for ListingDatabase {
}
async fn namespace_client(&self) -> Result<Arc<dyn lance_namespace::LanceNamespace>> {
// Create a DirectoryNamespace pointing to the same root with the same storage options
let mut builder = lance_namespace_impls::DirectoryNamespaceBuilder::new(&self.uri);
// Add storage options
if !self.storage_options.is_empty() {
builder = builder.storage_options(self.storage_options.clone());
}
// Use the same session
builder = builder.session(self.session.clone());
let namespace = builder.build().await.map_err(|e| Error::Runtime {
message: format!("Failed to create namespace client: {}", e),
})?;
Ok(Arc::new(namespace) as Arc<dyn lance_namespace::LanceNamespace>)
self.namespace_database.namespace_client().await
}
async fn namespace_client_config(&self) -> Result<(String, HashMap<String, String>)> {
let mut properties = HashMap::new();
properties.insert("root".to_string(), self.uri.clone());
for (key, value) in &self.storage_options {
properties.insert(format!("storage.{}", key), value.clone());
}
Ok(("dir".to_string(), properties))
self.namespace_database.namespace_client_config().await
}
}