feat(rust): support nested namespace ops in listing db

This commit is contained in:
Jack Ye
2026-04-15 17:15:26 -07:00
parent 10879d99b8
commit 513c2215c5

View File

@@ -20,6 +20,7 @@ use snafu::ResultExt;
use crate::connection::ConnectRequest;
use crate::database::ReadConsistency;
use crate::database::namespace::LanceNamespaceDatabase;
use crate::error::{CreateDirSnafu, Error, Result};
use crate::io::object_store::MirroringObjectStoreWrapper;
use crate::table::NativeTable;
@@ -497,6 +498,19 @@ impl ListingDatabase {
Ok(uri)
}
async fn namespace_database(&self) -> Result<LanceNamespaceDatabase> {
let (ns_impl, ns_properties) = self.namespace_client_config().await?;
LanceNamespaceDatabase::connect(
&ns_impl,
ns_properties,
self.storage_options.clone(),
self.read_consistency_interval,
Some(self.session.clone()),
HashSet::new(),
)
.await
}
async fn drop_tables(&self, names: Vec<String>) -> Result<()> {
let object_store_params = ObjectStoreParams {
storage_options_accessor: if self.storage_options.is_empty() {
@@ -696,16 +710,10 @@ impl Database for ListingDatabase {
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
return Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
});
}
Ok(ListNamespacesResponse {
namespaces: Vec::new(),
page_token: None,
})
self.namespace_database()
.await?
.list_namespaces(request)
.await
}
fn uri(&self) -> &str {
@@ -726,36 +734,44 @@ impl Database for ListingDatabase {
async fn create_namespace(
&self,
_request: CreateNamespaceRequest,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
self.namespace_database()
.await?
.create_namespace(request)
.await
}
async fn drop_namespace(
&self,
_request: DropNamespaceRequest,
) -> Result<DropNamespaceResponse> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
self.namespace_database()
.await?
.drop_namespace(request)
.await
}
async fn describe_namespace(
&self,
_request: DescribeNamespaceRequest,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
self.namespace_database()
.await?
.describe_namespace(request)
.await
}
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
if !request.namespace_path.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
let response = self
.namespace_database()
.await?
.list_tables(ListTablesRequest {
id: Some(request.namespace_path),
page_token: request.start_after,
limit: request.limit.map(|l| l as i32),
..Default::default()
})
.await?;
return Ok(response.tables);
}
let mut f = self
.object_store
@@ -788,9 +804,7 @@ impl Database for ListingDatabase {
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
return self.namespace_database().await?.list_tables(request).await;
}
let mut f = self
.object_store
@@ -838,11 +852,8 @@ impl Database for ListingDatabase {
}
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
// When namespace is not empty, location must be provided
if !request.namespace_path.is_empty() && request.location.is_none() {
return Err(Error::InvalidInput {
message: "Location must be provided when namespace is not empty".into(),
});
if !request.namespace_path.is_empty() {
return self.namespace_database().await?.create_table(request).await;
}
// Use provided location if available, otherwise derive from table name
let table_uri = request
@@ -959,11 +970,8 @@ impl Database for ListingDatabase {
}
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
// When namespace is not empty, location must be provided
if !request.namespace_path.is_empty() && request.location.is_none() {
return Err(Error::InvalidInput {
message: "Location must be provided when namespace is not empty".into(),
});
if !request.namespace_path.is_empty() {
return self.namespace_database().await?.open_table(request).await;
}
// Use provided location if available, otherwise derive from table name
let table_uri = request
@@ -1059,9 +1067,11 @@ impl Database for ListingDatabase {
async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()> {
if !namespace_path.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
return self
.namespace_database()
.await?
.drop_table(name, namespace_path)
.await;
}
self.drop_tables(vec![name.to_string()]).await
}
@@ -1070,9 +1080,11 @@ impl Database for ListingDatabase {
async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> {
// Check if namespace parameter is provided
if !namespace_path.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
return self
.namespace_database()
.await?
.drop_all_tables(namespace_path)
.await;
}
let tables = self.table_names(TableNamesRequest::default()).await?;
self.drop_tables(tables).await
@@ -2108,4 +2120,145 @@ mod tests {
assert!(tables.contains(&"table1".to_string()));
assert!(tables.contains(&"table2".to_string()));
}
#[tokio::test]
async fn test_listing_database_namespace_operations() {
let (_tempdir, db) = setup_database().await;
db.create_namespace(CreateNamespaceRequest {
id: Some(vec!["parent".to_string()]),
..Default::default()
})
.await
.unwrap();
db.create_namespace(CreateNamespaceRequest {
id: Some(vec!["parent".to_string(), "child".to_string()]),
..Default::default()
})
.await
.unwrap();
let root_namespaces = db
.list_namespaces(ListNamespacesRequest {
id: Some(vec![]),
..Default::default()
})
.await
.unwrap();
assert!(root_namespaces.namespaces.contains(&"parent".to_string()));
let child_namespaces = db
.list_namespaces(ListNamespacesRequest {
id: Some(vec!["parent".to_string()]),
..Default::default()
})
.await
.unwrap();
assert!(child_namespaces.namespaces.contains(&"child".to_string()));
db.describe_namespace(DescribeNamespaceRequest {
id: Some(vec!["parent".to_string(), "child".to_string()]),
..Default::default()
})
.await
.unwrap();
}
#[tokio::test]
async fn test_listing_database_nested_namespace_table_ops() {
let (_tempdir, db) = setup_database().await;
let namespace_path = vec!["parent".to_string(), "child".to_string()];
db.create_namespace(CreateNamespaceRequest {
id: Some(vec!["parent".to_string()]),
..Default::default()
})
.await
.unwrap();
db.create_namespace(CreateNamespaceRequest {
id: Some(namespace_path.clone()),
..Default::default()
})
.await
.unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
db.create_table(CreateTableRequest {
name: "nested_table".to_string(),
namespace_path: namespace_path.clone(),
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
let namespace_client = db.namespace_client().await.unwrap();
let describe = namespace_client
.describe_table(lance_namespace::models::DescribeTableRequest {
id: Some(vec![
"parent".to_string(),
"child".to_string(),
"nested_table".to_string(),
]),
..Default::default()
})
.await
.unwrap();
assert!(describe.location.is_some());
let table = db
.open_table(OpenTableRequest {
name: "nested_table".to_string(),
namespace_path: namespace_path.clone(),
index_cache_size: None,
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
})
.await
.unwrap();
assert_eq!(table.name(), "nested_table");
#[allow(deprecated)]
let table_names = db
.table_names(TableNamesRequest {
namespace_path: namespace_path.clone(),
start_after: None,
limit: None,
})
.await
.unwrap();
assert_eq!(table_names, vec!["nested_table".to_string()]);
let list_tables = db
.list_tables(ListTablesRequest {
id: Some(namespace_path.clone()),
..Default::default()
})
.await
.unwrap();
assert_eq!(list_tables.tables, vec!["nested_table".to_string()]);
db.drop_table("nested_table", &namespace_path)
.await
.unwrap();
let post_drop = db
.list_tables(ListTablesRequest {
id: Some(namespace_path),
..Default::default()
})
.await
.unwrap();
assert!(post_drop.tables.is_empty());
}
}