Compare commits

...

4 Commits

Author SHA1 Message Date
Jack Ye
a3e3c1ad5c feat(rust): add namespace client properties 2026-04-15 22:58:05 -07:00
Jack Ye
c6e43b545d refactor(rust): eagerly init listing namespace db 2026-04-15 22:28:52 -07:00
Jack Ye
5807ad464b refactor(rust): cache listing namespace db 2026-04-15 17:23:50 -07:00
Jack Ye
513c2215c5 feat(rust): support nested namespace ops in listing db 2026-04-15 17:15:26 -07:00
3 changed files with 431 additions and 69 deletions

View File

@@ -582,6 +582,14 @@ pub struct ConnectRequest {
/// Database specific options /// Database specific options
pub options: HashMap<String, String>, pub options: HashMap<String, String>,
/// Extra properties for the equivalent namespace client.
///
/// For a local [`ListingDatabase`], these are merged into the backing
/// `DirectoryNamespace` properties. This is useful for namespace-specific
/// settings such as `table_version_tracking_enabled` that are distinct from
/// storage options.
pub namespace_client_properties: HashMap<String, String>,
/// The interval at which to check for updates from other processes. /// The interval at which to check for updates from other processes.
/// ///
/// If None, then consistency is not checked. For performance /// If None, then consistency is not checked. For performance
@@ -621,6 +629,7 @@ impl ConnectBuilder {
client_config: Default::default(), client_config: Default::default(),
read_consistency_interval: None, read_consistency_interval: None,
options: HashMap::new(), options: HashMap::new(),
namespace_client_properties: HashMap::new(),
session: None, session: None,
}, },
embedding_registry: None, embedding_registry: None,
@@ -757,6 +766,31 @@ impl ConnectBuilder {
self self
} }
/// Set an additional property for the equivalent namespace client.
pub fn namespace_client_property(
mut self,
key: impl Into<String>,
value: impl Into<String>,
) -> Self {
self.request
.namespace_client_properties
.insert(key.into(), value.into());
self
}
/// Set multiple additional properties for the equivalent namespace client.
pub fn namespace_client_properties(
mut self,
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
for (key, value) in pairs {
self.request
.namespace_client_properties
.insert(key.into(), value.into());
}
self
}
/// The interval at which to check for updates from other processes. This /// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS. /// only affects LanceDB OSS.
/// ///
@@ -893,6 +927,7 @@ pub struct ConnectNamespaceBuilder {
ns_impl: String, ns_impl: String,
properties: HashMap<String, String>, properties: HashMap<String, String>,
storage_options: HashMap<String, String>, storage_options: HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>, read_consistency_interval: Option<std::time::Duration>,
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>, embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
session: Option<Arc<lance::session::Session>>, session: Option<Arc<lance::session::Session>>,
@@ -905,6 +940,7 @@ impl ConnectNamespaceBuilder {
ns_impl: ns_impl.to_string(), ns_impl: ns_impl.to_string(),
properties, properties,
storage_options: HashMap::new(), storage_options: HashMap::new(),
namespace_client_properties: HashMap::new(),
read_consistency_interval: None, read_consistency_interval: None,
embedding_registry: None, embedding_registry: None,
session: None, session: None,
@@ -933,6 +969,29 @@ impl ConnectNamespaceBuilder {
self self
} }
/// Set an additional namespace client property.
pub fn namespace_client_property(
mut self,
key: impl Into<String>,
value: impl Into<String>,
) -> Self {
self.namespace_client_properties
.insert(key.into(), value.into());
self
}
/// Set multiple additional namespace client properties.
pub fn namespace_client_properties(
mut self,
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
for (key, value) in pairs {
self.namespace_client_properties
.insert(key.into(), value.into());
}
self
}
/// The interval at which to check for updates from other processes. /// The interval at which to check for updates from other processes.
/// ///
/// If left unset, consistency is not checked. For maximum read /// If left unset, consistency is not checked. For maximum read
@@ -994,10 +1053,13 @@ impl ConnectNamespaceBuilder {
pub async fn execute(self) -> Result<Connection> { pub async fn execute(self) -> Result<Connection> {
use crate::database::namespace::LanceNamespaceDatabase; use crate::database::namespace::LanceNamespaceDatabase;
let mut properties = self.properties;
properties.extend(self.namespace_client_properties);
let internal = Arc::new( let internal = Arc::new(
LanceNamespaceDatabase::connect( LanceNamespaceDatabase::connect(
&self.ns_impl, &self.ns_impl,
self.properties, properties,
self.storage_options, self.storage_options,
self.read_consistency_interval, self.read_consistency_interval,
self.session, self.session,
@@ -1117,6 +1179,31 @@ mod tests {
assert_eq!(db.uri(), relative_uri.to_str().unwrap().to_string()); assert_eq!(db.uri(), relative_uri.to_str().unwrap().to_string());
} }
#[tokio::test]
async fn test_connect_with_namespace_client_properties() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri)
.namespace_client_property("table_version_tracking_enabled", "true")
.namespace_client_property("manifest_enabled", "true")
.execute()
.await
.unwrap();
let (ns_impl, properties) = db.namespace_client_config().await.unwrap();
assert_eq!(ns_impl, "dir");
assert_eq!(properties.get("root"), Some(&uri.to_string()));
assert_eq!(
properties.get("table_version_tracking_enabled"),
Some(&"true".to_string())
);
assert_eq!(
properties.get("manifest_enabled"),
Some(&"true".to_string())
);
}
#[tokio::test] #[tokio::test]
async fn test_table_names() { async fn test_table_names() {
let tc = new_test_connection().await.unwrap(); let tc = new_test_connection().await.unwrap();

View File

@@ -20,6 +20,7 @@ use snafu::ResultExt;
use crate::connection::ConnectRequest; use crate::connection::ConnectRequest;
use crate::database::ReadConsistency; use crate::database::ReadConsistency;
use crate::database::namespace::LanceNamespaceDatabase;
use crate::error::{CreateDirSnafu, Error, Result}; use crate::error::{CreateDirSnafu, Error, Result};
use crate::io::object_store::MirroringObjectStoreWrapper; use crate::io::object_store::MirroringObjectStoreWrapper;
use crate::table::NativeTable; use crate::table::NativeTable;
@@ -255,6 +256,9 @@ pub struct ListingDatabase {
// Session for object stores and caching // Session for object stores and caching
session: Arc<lance::session::Session>, session: Arc<lance::session::Session>,
// Namespace-backed database for child namespace operations
namespace_database: Arc<LanceNamespaceDatabase>,
} }
impl std::fmt::Display for ListingDatabase { impl std::fmt::Display for ListingDatabase {
@@ -281,6 +285,44 @@ const MIRRORED_STORE: &str = "mirroredStore";
/// A connection to LanceDB /// A connection to LanceDB
impl ListingDatabase { impl ListingDatabase {
fn build_namespace_client_properties(
uri: &str,
storage_options: &HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
) -> HashMap<String, String> {
let mut properties = namespace_client_properties;
properties.insert("root".to_string(), uri.to_string());
for (key, value) in storage_options {
properties.insert(format!("storage.{}", key), value.clone());
}
properties
}
async fn connect_namespace_database(
uri: &str,
storage_options: HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Arc<lance::session::Session>,
) -> Result<Arc<LanceNamespaceDatabase>> {
let ns_properties = Self::build_namespace_client_properties(
uri,
&storage_options,
namespace_client_properties,
);
Ok(Arc::new(
LanceNamespaceDatabase::connect(
"dir",
ns_properties,
storage_options,
read_consistency_interval,
Some(session),
HashSet::new(),
)
.await?,
))
}
/// Connect to a listing database /// Connect to a listing database
/// ///
/// The URI should be a path to a directory where the tables are stored. /// The URI should be a path to a directory where the tables are stored.
@@ -300,6 +342,7 @@ impl ListingDatabase {
uri, uri,
request.read_consistency_interval, request.read_consistency_interval,
options.new_table_config, options.new_table_config,
request.namespace_client_properties.clone(),
request.session.clone(), request.session.clone(),
) )
.await .await
@@ -387,6 +430,15 @@ impl ListingDatabase {
None => None, None => None,
}; };
let namespace_database = Self::connect_namespace_database(
&table_base_uri,
options.storage_options.clone(),
request.namespace_client_properties.clone(),
request.read_consistency_interval,
session.clone(),
)
.await?;
Ok(Self { Ok(Self {
uri: table_base_uri, uri: table_base_uri,
query_string, query_string,
@@ -398,6 +450,7 @@ impl ListingDatabase {
storage_options_provider: None, storage_options_provider: None,
new_table_config: options.new_table_config, new_table_config: options.new_table_config,
session, session,
namespace_database,
}) })
} }
Err(_) => { Err(_) => {
@@ -405,6 +458,7 @@ impl ListingDatabase {
uri, uri,
request.read_consistency_interval, request.read_consistency_interval,
options.new_table_config, options.new_table_config,
request.namespace_client_properties.clone(),
request.session.clone(), request.session.clone(),
) )
.await .await
@@ -416,6 +470,7 @@ impl ListingDatabase {
path: &str, path: &str,
read_consistency_interval: Option<std::time::Duration>, read_consistency_interval: Option<std::time::Duration>,
new_table_config: NewTableConfig, new_table_config: NewTableConfig,
namespace_client_properties: HashMap<String, String>,
session: Option<Arc<lance::session::Session>>, session: Option<Arc<lance::session::Session>>,
) -> Result<Self> { ) -> Result<Self> {
let session = session.unwrap_or_else(|| Arc::new(lance::session::Session::default())); let session = session.unwrap_or_else(|| Arc::new(lance::session::Session::default()));
@@ -429,6 +484,15 @@ impl ListingDatabase {
Self::try_create_dir(path).context(CreateDirSnafu { path })?; Self::try_create_dir(path).context(CreateDirSnafu { path })?;
} }
let namespace_database = Self::connect_namespace_database(
path,
HashMap::new(),
namespace_client_properties,
read_consistency_interval,
session.clone(),
)
.await?;
Ok(Self { Ok(Self {
uri: path.to_string(), uri: path.to_string(),
query_string: None, query_string: None,
@@ -440,6 +504,7 @@ impl ListingDatabase {
storage_options_provider: None, storage_options_provider: None,
new_table_config, new_table_config,
session, session,
namespace_database,
}) })
} }
@@ -497,6 +562,10 @@ impl ListingDatabase {
Ok(uri) Ok(uri)
} }
fn namespace_database(&self) -> Arc<LanceNamespaceDatabase> {
self.namespace_database.clone()
}
async fn drop_tables(&self, names: Vec<String>) -> Result<()> { async fn drop_tables(&self, names: Vec<String>) -> Result<()> {
let object_store_params = ObjectStoreParams { let object_store_params = ObjectStoreParams {
storage_options_accessor: if self.storage_options.is_empty() { storage_options_accessor: if self.storage_options.is_empty() {
@@ -696,16 +765,7 @@ impl Database for ListingDatabase {
&self, &self,
request: ListNamespacesRequest, request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> { ) -> Result<ListNamespacesResponse> {
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) { self.namespace_database().list_namespaces(request).await
return Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
});
}
Ok(ListNamespacesResponse {
namespaces: Vec::new(),
page_token: None,
})
} }
fn uri(&self) -> &str { fn uri(&self) -> &str {
@@ -726,36 +786,26 @@ impl Database for ListingDatabase {
async fn create_namespace( async fn create_namespace(
&self, &self,
_request: CreateNamespaceRequest, request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> { ) -> Result<CreateNamespaceResponse> {
Err(Error::NotSupported { self.namespace_database().create_namespace(request).await
message: "Namespace operations are not supported for listing database".into(),
})
} }
async fn drop_namespace( async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
&self, self.namespace_database().drop_namespace(request).await
_request: DropNamespaceRequest,
) -> Result<DropNamespaceResponse> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
} }
async fn describe_namespace( async fn describe_namespace(
&self, &self,
_request: DescribeNamespaceRequest, request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> { ) -> Result<DescribeNamespaceResponse> {
Err(Error::NotSupported { self.namespace_database().describe_namespace(request).await
message: "Namespace operations are not supported for listing database".into(),
})
} }
#[allow(deprecated)]
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> { async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
if !request.namespace_path.is_empty() { if !request.namespace_path.is_empty() {
return Err(Error::NotSupported { return self.namespace_database().table_names(request).await;
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
} }
let mut f = self let mut f = self
.object_store .object_store
@@ -788,9 +838,7 @@ impl Database for ListingDatabase {
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> { async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) { if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
return Err(Error::NotSupported { return self.namespace_database().list_tables(request).await;
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
} }
let mut f = self let mut f = self
.object_store .object_store
@@ -838,11 +886,8 @@ impl Database for ListingDatabase {
} }
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> { async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
// When namespace is not empty, location must be provided if !request.namespace_path.is_empty() {
if !request.namespace_path.is_empty() && request.location.is_none() { return self.namespace_database().create_table(request).await;
return Err(Error::InvalidInput {
message: "Location must be provided when namespace is not empty".into(),
});
} }
// Use provided location if available, otherwise derive from table name // Use provided location if available, otherwise derive from table name
let table_uri = request let table_uri = request
@@ -959,11 +1004,8 @@ impl Database for ListingDatabase {
} }
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> { async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
// When namespace is not empty, location must be provided if !request.namespace_path.is_empty() {
if !request.namespace_path.is_empty() && request.location.is_none() { return self.namespace_database().open_table(request).await;
return Err(Error::InvalidInput {
message: "Location must be provided when namespace is not empty".into(),
});
} }
// Use provided location if available, otherwise derive from table name // Use provided location if available, otherwise derive from table name
let table_uri = request let table_uri = request
@@ -1059,9 +1101,10 @@ impl Database for ListingDatabase {
async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()> { async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()> {
if !namespace_path.is_empty() { if !namespace_path.is_empty() {
return Err(Error::NotSupported { return self
message: "Namespace parameter is not supported for listing database.".into(), .namespace_database()
}); .drop_table(name, namespace_path)
.await;
} }
self.drop_tables(vec![name.to_string()]).await self.drop_tables(vec![name.to_string()]).await
} }
@@ -1070,9 +1113,10 @@ impl Database for ListingDatabase {
async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> { async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> {
// Check if namespace parameter is provided // Check if namespace parameter is provided
if !namespace_path.is_empty() { if !namespace_path.is_empty() {
return Err(Error::NotSupported { return self
message: "Namespace parameter is not supported for listing database.".into(), .namespace_database()
}); .drop_all_tables(namespace_path)
.await;
} }
let tables = self.table_names(TableNamesRequest::default()).await?; let tables = self.table_names(TableNamesRequest::default()).await?;
self.drop_tables(tables).await self.drop_tables(tables).await
@@ -1083,30 +1127,11 @@ impl Database for ListingDatabase {
} }
async fn namespace_client(&self) -> Result<Arc<dyn lance_namespace::LanceNamespace>> { async fn namespace_client(&self) -> Result<Arc<dyn lance_namespace::LanceNamespace>> {
// Create a DirectoryNamespace pointing to the same root with the same storage options self.namespace_database.namespace_client().await
let mut builder = lance_namespace_impls::DirectoryNamespaceBuilder::new(&self.uri);
// Add storage options
if !self.storage_options.is_empty() {
builder = builder.storage_options(self.storage_options.clone());
}
// Use the same session
builder = builder.session(self.session.clone());
let namespace = builder.build().await.map_err(|e| Error::Runtime {
message: format!("Failed to create namespace client: {}", e),
})?;
Ok(Arc::new(namespace) as Arc<dyn lance_namespace::LanceNamespace>)
} }
async fn namespace_client_config(&self) -> Result<(String, HashMap<String, String>)> { async fn namespace_client_config(&self) -> Result<(String, HashMap<String, String>)> {
let mut properties = HashMap::new(); self.namespace_database.namespace_client_config().await
properties.insert("root".to_string(), self.uri.clone());
for (key, value) in &self.storage_options {
properties.insert(format!("storage.{}", key), value.clone());
}
Ok(("dir".to_string(), properties))
} }
} }
@@ -1132,6 +1157,7 @@ mod tests {
#[cfg(feature = "remote")] #[cfg(feature = "remote")]
client_config: Default::default(), client_config: Default::default(),
options: Default::default(), options: Default::default(),
namespace_client_properties: Default::default(),
read_consistency_interval: None, read_consistency_interval: None,
session: None, session: None,
}; };
@@ -1265,6 +1291,7 @@ mod tests {
#[cfg(feature = "remote")] #[cfg(feature = "remote")]
client_config: Default::default(), client_config: Default::default(),
options: options.clone(), options: options.clone(),
namespace_client_properties: Default::default(),
read_consistency_interval: None, read_consistency_interval: None,
session: None, session: None,
}; };
@@ -1799,6 +1826,7 @@ mod tests {
#[cfg(feature = "remote")] #[cfg(feature = "remote")]
client_config: Default::default(), client_config: Default::default(),
options, options,
namespace_client_properties: Default::default(),
read_consistency_interval: None, read_consistency_interval: None,
session: None, session: None,
}; };
@@ -1904,6 +1932,7 @@ mod tests {
#[cfg(feature = "remote")] #[cfg(feature = "remote")]
client_config: Default::default(), client_config: Default::default(),
options, options,
namespace_client_properties: Default::default(),
read_consistency_interval: None, read_consistency_interval: None,
session: None, session: None,
}; };
@@ -1975,6 +2004,7 @@ mod tests {
#[cfg(feature = "remote")] #[cfg(feature = "remote")]
client_config: Default::default(), client_config: Default::default(),
options, options,
namespace_client_properties: Default::default(),
read_consistency_interval: None, read_consistency_interval: None,
session: None, session: None,
}; };
@@ -2108,4 +2138,208 @@ mod tests {
assert!(tables.contains(&"table1".to_string())); assert!(tables.contains(&"table1".to_string()));
assert!(tables.contains(&"table2".to_string())); assert!(tables.contains(&"table2".to_string()));
} }
#[tokio::test]
async fn test_listing_database_namespace_operations() {
let (_tempdir, db) = setup_database().await;
db.create_namespace(CreateNamespaceRequest {
id: Some(vec!["parent".to_string()]),
..Default::default()
})
.await
.unwrap();
db.create_namespace(CreateNamespaceRequest {
id: Some(vec!["parent".to_string(), "child".to_string()]),
..Default::default()
})
.await
.unwrap();
let root_namespaces = db
.list_namespaces(ListNamespacesRequest {
id: Some(vec![]),
..Default::default()
})
.await
.unwrap();
assert!(root_namespaces.namespaces.contains(&"parent".to_string()));
let child_namespaces = db
.list_namespaces(ListNamespacesRequest {
id: Some(vec!["parent".to_string()]),
..Default::default()
})
.await
.unwrap();
assert!(child_namespaces.namespaces.contains(&"child".to_string()));
db.describe_namespace(DescribeNamespaceRequest {
id: Some(vec!["parent".to_string(), "child".to_string()]),
..Default::default()
})
.await
.unwrap();
}
#[tokio::test]
async fn test_listing_database_with_namespace_client_properties() {
let tempdir = tempdir().unwrap();
let uri = tempdir.path().to_str().unwrap();
let mut namespace_client_properties = HashMap::new();
namespace_client_properties.insert(
"table_version_tracking_enabled".to_string(),
"true".to_string(),
);
namespace_client_properties.insert("manifest_enabled".to_string(), "true".to_string());
let request = ConnectRequest {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
namespace_client_properties,
read_consistency_interval: None,
session: None,
};
let db = ListingDatabase::connect_with_options(&request)
.await
.unwrap();
let namespace_path = vec!["test_ns".to_string()];
db.create_namespace(CreateNamespaceRequest {
id: Some(namespace_path.clone()),
..Default::default()
})
.await
.unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
db.create_table(CreateTableRequest {
name: "managed_table".to_string(),
namespace_path: namespace_path.clone(),
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
let namespace_client = db.namespace_client().await.unwrap();
let describe = namespace_client
.describe_table(lance_namespace::models::DescribeTableRequest {
id: Some(vec!["test_ns".to_string(), "managed_table".to_string()]),
..Default::default()
})
.await
.unwrap();
assert_eq!(describe.managed_versioning, Some(true));
}
#[tokio::test]
async fn test_listing_database_nested_namespace_table_ops() {
let (_tempdir, db) = setup_database().await;
let namespace_path = vec!["parent".to_string(), "child".to_string()];
db.create_namespace(CreateNamespaceRequest {
id: Some(vec!["parent".to_string()]),
..Default::default()
})
.await
.unwrap();
db.create_namespace(CreateNamespaceRequest {
id: Some(namespace_path.clone()),
..Default::default()
})
.await
.unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
db.create_table(CreateTableRequest {
name: "nested_table".to_string(),
namespace_path: namespace_path.clone(),
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
let namespace_client = db.namespace_client().await.unwrap();
let describe = namespace_client
.describe_table(lance_namespace::models::DescribeTableRequest {
id: Some(vec![
"parent".to_string(),
"child".to_string(),
"nested_table".to_string(),
]),
..Default::default()
})
.await
.unwrap();
assert!(describe.location.is_some());
let table = db
.open_table(OpenTableRequest {
name: "nested_table".to_string(),
namespace_path: namespace_path.clone(),
index_cache_size: None,
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
})
.await
.unwrap();
assert_eq!(table.name(), "nested_table");
#[allow(deprecated)]
let table_names = db
.table_names(TableNamesRequest {
namespace_path: namespace_path.clone(),
start_after: None,
limit: None,
})
.await
.unwrap();
assert_eq!(table_names, vec!["nested_table".to_string()]);
let list_tables = db
.list_tables(ListTablesRequest {
id: Some(namespace_path.clone()),
..Default::default()
})
.await
.unwrap();
assert_eq!(list_tables.tables, vec!["nested_table".to_string()]);
db.drop_table("nested_table", &namespace_path)
.await
.unwrap();
let post_drop = db
.list_tables(ListTablesRequest {
id: Some(namespace_path),
..Default::default()
})
.await
.unwrap();
assert!(post_drop.tables.is_empty());
}
} }

View File

@@ -450,6 +450,47 @@ mod tests {
)); ));
} }
#[tokio::test]
async fn test_namespace_connection_with_namespace_client_properties() {
let tmp_dir = tempdir().unwrap();
let root_path = tmp_dir.path().to_str().unwrap().to_string();
let mut properties = HashMap::new();
properties.insert("root".to_string(), root_path);
let conn = connect_namespace("dir", properties)
.namespace_client_property("table_version_tracking_enabled", "true")
.namespace_client_property("manifest_enabled", "true")
.execute()
.await
.expect("Failed to connect to namespace");
conn.create_namespace(CreateNamespaceRequest {
id: Some(vec!["test_ns".into()]),
..Default::default()
})
.await
.expect("Failed to create namespace");
let test_data = create_test_data();
conn.create_table("test_table", test_data)
.namespace(vec!["test_ns".into()])
.execute()
.await
.expect("Failed to create table");
let namespace_client = conn.namespace_client().await.unwrap();
let describe = namespace_client
.describe_table(DescribeTableRequest {
id: Some(vec!["test_ns".into(), "test_table".into()]),
..Default::default()
})
.await
.expect("Failed to describe table");
assert_eq!(describe.managed_versioning, Some(true));
}
#[tokio::test] #[tokio::test]
async fn test_namespace_create_table_basic() { async fn test_namespace_create_table_basic() {
// Setup: Create a temporary directory for the namespace // Setup: Create a temporary directory for the namespace