mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-21 13:10:39 +00:00
Compare commits
4 Commits
python-v0.
...
rust-neste
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a3e3c1ad5c | ||
|
|
c6e43b545d | ||
|
|
5807ad464b | ||
|
|
513c2215c5 |
@@ -582,6 +582,14 @@ pub struct ConnectRequest {
|
|||||||
/// Database specific options
|
/// Database specific options
|
||||||
pub options: HashMap<String, String>,
|
pub options: HashMap<String, String>,
|
||||||
|
|
||||||
|
/// Extra properties for the equivalent namespace client.
|
||||||
|
///
|
||||||
|
/// For a local [`ListingDatabase`], these are merged into the backing
|
||||||
|
/// `DirectoryNamespace` properties. This is useful for namespace-specific
|
||||||
|
/// settings such as `table_version_tracking_enabled` that are distinct from
|
||||||
|
/// storage options.
|
||||||
|
pub namespace_client_properties: HashMap<String, String>,
|
||||||
|
|
||||||
/// The interval at which to check for updates from other processes.
|
/// The interval at which to check for updates from other processes.
|
||||||
///
|
///
|
||||||
/// If None, then consistency is not checked. For performance
|
/// If None, then consistency is not checked. For performance
|
||||||
@@ -621,6 +629,7 @@ impl ConnectBuilder {
|
|||||||
client_config: Default::default(),
|
client_config: Default::default(),
|
||||||
read_consistency_interval: None,
|
read_consistency_interval: None,
|
||||||
options: HashMap::new(),
|
options: HashMap::new(),
|
||||||
|
namespace_client_properties: HashMap::new(),
|
||||||
session: None,
|
session: None,
|
||||||
},
|
},
|
||||||
embedding_registry: None,
|
embedding_registry: None,
|
||||||
@@ -757,6 +766,31 @@ impl ConnectBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set an additional property for the equivalent namespace client.
|
||||||
|
pub fn namespace_client_property(
|
||||||
|
mut self,
|
||||||
|
key: impl Into<String>,
|
||||||
|
value: impl Into<String>,
|
||||||
|
) -> Self {
|
||||||
|
self.request
|
||||||
|
.namespace_client_properties
|
||||||
|
.insert(key.into(), value.into());
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set multiple additional properties for the equivalent namespace client.
|
||||||
|
pub fn namespace_client_properties(
|
||||||
|
mut self,
|
||||||
|
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
|
||||||
|
) -> Self {
|
||||||
|
for (key, value) in pairs {
|
||||||
|
self.request
|
||||||
|
.namespace_client_properties
|
||||||
|
.insert(key.into(), value.into());
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// The interval at which to check for updates from other processes. This
|
/// The interval at which to check for updates from other processes. This
|
||||||
/// only affects LanceDB OSS.
|
/// only affects LanceDB OSS.
|
||||||
///
|
///
|
||||||
@@ -893,6 +927,7 @@ pub struct ConnectNamespaceBuilder {
|
|||||||
ns_impl: String,
|
ns_impl: String,
|
||||||
properties: HashMap<String, String>,
|
properties: HashMap<String, String>,
|
||||||
storage_options: HashMap<String, String>,
|
storage_options: HashMap<String, String>,
|
||||||
|
namespace_client_properties: HashMap<String, String>,
|
||||||
read_consistency_interval: Option<std::time::Duration>,
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
|
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
|
||||||
session: Option<Arc<lance::session::Session>>,
|
session: Option<Arc<lance::session::Session>>,
|
||||||
@@ -905,6 +940,7 @@ impl ConnectNamespaceBuilder {
|
|||||||
ns_impl: ns_impl.to_string(),
|
ns_impl: ns_impl.to_string(),
|
||||||
properties,
|
properties,
|
||||||
storage_options: HashMap::new(),
|
storage_options: HashMap::new(),
|
||||||
|
namespace_client_properties: HashMap::new(),
|
||||||
read_consistency_interval: None,
|
read_consistency_interval: None,
|
||||||
embedding_registry: None,
|
embedding_registry: None,
|
||||||
session: None,
|
session: None,
|
||||||
@@ -933,6 +969,29 @@ impl ConnectNamespaceBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set an additional namespace client property.
|
||||||
|
pub fn namespace_client_property(
|
||||||
|
mut self,
|
||||||
|
key: impl Into<String>,
|
||||||
|
value: impl Into<String>,
|
||||||
|
) -> Self {
|
||||||
|
self.namespace_client_properties
|
||||||
|
.insert(key.into(), value.into());
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set multiple additional namespace client properties.
|
||||||
|
pub fn namespace_client_properties(
|
||||||
|
mut self,
|
||||||
|
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
|
||||||
|
) -> Self {
|
||||||
|
for (key, value) in pairs {
|
||||||
|
self.namespace_client_properties
|
||||||
|
.insert(key.into(), value.into());
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// The interval at which to check for updates from other processes.
|
/// The interval at which to check for updates from other processes.
|
||||||
///
|
///
|
||||||
/// If left unset, consistency is not checked. For maximum read
|
/// If left unset, consistency is not checked. For maximum read
|
||||||
@@ -994,10 +1053,13 @@ impl ConnectNamespaceBuilder {
|
|||||||
pub async fn execute(self) -> Result<Connection> {
|
pub async fn execute(self) -> Result<Connection> {
|
||||||
use crate::database::namespace::LanceNamespaceDatabase;
|
use crate::database::namespace::LanceNamespaceDatabase;
|
||||||
|
|
||||||
|
let mut properties = self.properties;
|
||||||
|
properties.extend(self.namespace_client_properties);
|
||||||
|
|
||||||
let internal = Arc::new(
|
let internal = Arc::new(
|
||||||
LanceNamespaceDatabase::connect(
|
LanceNamespaceDatabase::connect(
|
||||||
&self.ns_impl,
|
&self.ns_impl,
|
||||||
self.properties,
|
properties,
|
||||||
self.storage_options,
|
self.storage_options,
|
||||||
self.read_consistency_interval,
|
self.read_consistency_interval,
|
||||||
self.session,
|
self.session,
|
||||||
@@ -1117,6 +1179,31 @@ mod tests {
|
|||||||
assert_eq!(db.uri(), relative_uri.to_str().unwrap().to_string());
|
assert_eq!(db.uri(), relative_uri.to_str().unwrap().to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_connect_with_namespace_client_properties() {
|
||||||
|
let tmp_dir = tempdir().unwrap();
|
||||||
|
let uri = tmp_dir.path().to_str().unwrap();
|
||||||
|
|
||||||
|
let db = connect(uri)
|
||||||
|
.namespace_client_property("table_version_tracking_enabled", "true")
|
||||||
|
.namespace_client_property("manifest_enabled", "true")
|
||||||
|
.execute()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let (ns_impl, properties) = db.namespace_client_config().await.unwrap();
|
||||||
|
assert_eq!(ns_impl, "dir");
|
||||||
|
assert_eq!(properties.get("root"), Some(&uri.to_string()));
|
||||||
|
assert_eq!(
|
||||||
|
properties.get("table_version_tracking_enabled"),
|
||||||
|
Some(&"true".to_string())
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
properties.get("manifest_enabled"),
|
||||||
|
Some(&"true".to_string())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_table_names() {
|
async fn test_table_names() {
|
||||||
let tc = new_test_connection().await.unwrap();
|
let tc = new_test_connection().await.unwrap();
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ use snafu::ResultExt;
|
|||||||
|
|
||||||
use crate::connection::ConnectRequest;
|
use crate::connection::ConnectRequest;
|
||||||
use crate::database::ReadConsistency;
|
use crate::database::ReadConsistency;
|
||||||
|
use crate::database::namespace::LanceNamespaceDatabase;
|
||||||
use crate::error::{CreateDirSnafu, Error, Result};
|
use crate::error::{CreateDirSnafu, Error, Result};
|
||||||
use crate::io::object_store::MirroringObjectStoreWrapper;
|
use crate::io::object_store::MirroringObjectStoreWrapper;
|
||||||
use crate::table::NativeTable;
|
use crate::table::NativeTable;
|
||||||
@@ -255,6 +256,9 @@ pub struct ListingDatabase {
|
|||||||
|
|
||||||
// Session for object stores and caching
|
// Session for object stores and caching
|
||||||
session: Arc<lance::session::Session>,
|
session: Arc<lance::session::Session>,
|
||||||
|
|
||||||
|
// Namespace-backed database for child namespace operations
|
||||||
|
namespace_database: Arc<LanceNamespaceDatabase>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for ListingDatabase {
|
impl std::fmt::Display for ListingDatabase {
|
||||||
@@ -281,6 +285,44 @@ const MIRRORED_STORE: &str = "mirroredStore";
|
|||||||
|
|
||||||
/// A connection to LanceDB
|
/// A connection to LanceDB
|
||||||
impl ListingDatabase {
|
impl ListingDatabase {
|
||||||
|
fn build_namespace_client_properties(
|
||||||
|
uri: &str,
|
||||||
|
storage_options: &HashMap<String, String>,
|
||||||
|
namespace_client_properties: HashMap<String, String>,
|
||||||
|
) -> HashMap<String, String> {
|
||||||
|
let mut properties = namespace_client_properties;
|
||||||
|
properties.insert("root".to_string(), uri.to_string());
|
||||||
|
for (key, value) in storage_options {
|
||||||
|
properties.insert(format!("storage.{}", key), value.clone());
|
||||||
|
}
|
||||||
|
properties
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connect_namespace_database(
|
||||||
|
uri: &str,
|
||||||
|
storage_options: HashMap<String, String>,
|
||||||
|
namespace_client_properties: HashMap<String, String>,
|
||||||
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
|
session: Arc<lance::session::Session>,
|
||||||
|
) -> Result<Arc<LanceNamespaceDatabase>> {
|
||||||
|
let ns_properties = Self::build_namespace_client_properties(
|
||||||
|
uri,
|
||||||
|
&storage_options,
|
||||||
|
namespace_client_properties,
|
||||||
|
);
|
||||||
|
Ok(Arc::new(
|
||||||
|
LanceNamespaceDatabase::connect(
|
||||||
|
"dir",
|
||||||
|
ns_properties,
|
||||||
|
storage_options,
|
||||||
|
read_consistency_interval,
|
||||||
|
Some(session),
|
||||||
|
HashSet::new(),
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
/// Connect to a listing database
|
/// Connect to a listing database
|
||||||
///
|
///
|
||||||
/// The URI should be a path to a directory where the tables are stored.
|
/// The URI should be a path to a directory where the tables are stored.
|
||||||
@@ -300,6 +342,7 @@ impl ListingDatabase {
|
|||||||
uri,
|
uri,
|
||||||
request.read_consistency_interval,
|
request.read_consistency_interval,
|
||||||
options.new_table_config,
|
options.new_table_config,
|
||||||
|
request.namespace_client_properties.clone(),
|
||||||
request.session.clone(),
|
request.session.clone(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -387,6 +430,15 @@ impl ListingDatabase {
|
|||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let namespace_database = Self::connect_namespace_database(
|
||||||
|
&table_base_uri,
|
||||||
|
options.storage_options.clone(),
|
||||||
|
request.namespace_client_properties.clone(),
|
||||||
|
request.read_consistency_interval,
|
||||||
|
session.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
uri: table_base_uri,
|
uri: table_base_uri,
|
||||||
query_string,
|
query_string,
|
||||||
@@ -398,6 +450,7 @@ impl ListingDatabase {
|
|||||||
storage_options_provider: None,
|
storage_options_provider: None,
|
||||||
new_table_config: options.new_table_config,
|
new_table_config: options.new_table_config,
|
||||||
session,
|
session,
|
||||||
|
namespace_database,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
@@ -405,6 +458,7 @@ impl ListingDatabase {
|
|||||||
uri,
|
uri,
|
||||||
request.read_consistency_interval,
|
request.read_consistency_interval,
|
||||||
options.new_table_config,
|
options.new_table_config,
|
||||||
|
request.namespace_client_properties.clone(),
|
||||||
request.session.clone(),
|
request.session.clone(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -416,6 +470,7 @@ impl ListingDatabase {
|
|||||||
path: &str,
|
path: &str,
|
||||||
read_consistency_interval: Option<std::time::Duration>,
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
new_table_config: NewTableConfig,
|
new_table_config: NewTableConfig,
|
||||||
|
namespace_client_properties: HashMap<String, String>,
|
||||||
session: Option<Arc<lance::session::Session>>,
|
session: Option<Arc<lance::session::Session>>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let session = session.unwrap_or_else(|| Arc::new(lance::session::Session::default()));
|
let session = session.unwrap_or_else(|| Arc::new(lance::session::Session::default()));
|
||||||
@@ -429,6 +484,15 @@ impl ListingDatabase {
|
|||||||
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
|
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let namespace_database = Self::connect_namespace_database(
|
||||||
|
path,
|
||||||
|
HashMap::new(),
|
||||||
|
namespace_client_properties,
|
||||||
|
read_consistency_interval,
|
||||||
|
session.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
uri: path.to_string(),
|
uri: path.to_string(),
|
||||||
query_string: None,
|
query_string: None,
|
||||||
@@ -440,6 +504,7 @@ impl ListingDatabase {
|
|||||||
storage_options_provider: None,
|
storage_options_provider: None,
|
||||||
new_table_config,
|
new_table_config,
|
||||||
session,
|
session,
|
||||||
|
namespace_database,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -497,6 +562,10 @@ impl ListingDatabase {
|
|||||||
Ok(uri)
|
Ok(uri)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn namespace_database(&self) -> Arc<LanceNamespaceDatabase> {
|
||||||
|
self.namespace_database.clone()
|
||||||
|
}
|
||||||
|
|
||||||
async fn drop_tables(&self, names: Vec<String>) -> Result<()> {
|
async fn drop_tables(&self, names: Vec<String>) -> Result<()> {
|
||||||
let object_store_params = ObjectStoreParams {
|
let object_store_params = ObjectStoreParams {
|
||||||
storage_options_accessor: if self.storage_options.is_empty() {
|
storage_options_accessor: if self.storage_options.is_empty() {
|
||||||
@@ -696,16 +765,7 @@ impl Database for ListingDatabase {
|
|||||||
&self,
|
&self,
|
||||||
request: ListNamespacesRequest,
|
request: ListNamespacesRequest,
|
||||||
) -> Result<ListNamespacesResponse> {
|
) -> Result<ListNamespacesResponse> {
|
||||||
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
|
self.namespace_database().list_namespaces(request).await
|
||||||
return Err(Error::NotSupported {
|
|
||||||
message: "Namespace operations are not supported for listing database".into(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(ListNamespacesResponse {
|
|
||||||
namespaces: Vec::new(),
|
|
||||||
page_token: None,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn uri(&self) -> &str {
|
fn uri(&self) -> &str {
|
||||||
@@ -726,36 +786,26 @@ impl Database for ListingDatabase {
|
|||||||
|
|
||||||
async fn create_namespace(
|
async fn create_namespace(
|
||||||
&self,
|
&self,
|
||||||
_request: CreateNamespaceRequest,
|
request: CreateNamespaceRequest,
|
||||||
) -> Result<CreateNamespaceResponse> {
|
) -> Result<CreateNamespaceResponse> {
|
||||||
Err(Error::NotSupported {
|
self.namespace_database().create_namespace(request).await
|
||||||
message: "Namespace operations are not supported for listing database".into(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn drop_namespace(
|
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
|
||||||
&self,
|
self.namespace_database().drop_namespace(request).await
|
||||||
_request: DropNamespaceRequest,
|
|
||||||
) -> Result<DropNamespaceResponse> {
|
|
||||||
Err(Error::NotSupported {
|
|
||||||
message: "Namespace operations are not supported for listing database".into(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn describe_namespace(
|
async fn describe_namespace(
|
||||||
&self,
|
&self,
|
||||||
_request: DescribeNamespaceRequest,
|
request: DescribeNamespaceRequest,
|
||||||
) -> Result<DescribeNamespaceResponse> {
|
) -> Result<DescribeNamespaceResponse> {
|
||||||
Err(Error::NotSupported {
|
self.namespace_database().describe_namespace(request).await
|
||||||
message: "Namespace operations are not supported for listing database".into(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
|
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
|
||||||
if !request.namespace_path.is_empty() {
|
if !request.namespace_path.is_empty() {
|
||||||
return Err(Error::NotSupported {
|
return self.namespace_database().table_names(request).await;
|
||||||
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
let mut f = self
|
let mut f = self
|
||||||
.object_store
|
.object_store
|
||||||
@@ -788,9 +838,7 @@ impl Database for ListingDatabase {
|
|||||||
|
|
||||||
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
|
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
|
||||||
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
|
if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) {
|
||||||
return Err(Error::NotSupported {
|
return self.namespace_database().list_tables(request).await;
|
||||||
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
let mut f = self
|
let mut f = self
|
||||||
.object_store
|
.object_store
|
||||||
@@ -838,11 +886,8 @@ impl Database for ListingDatabase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
|
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||||
// When namespace is not empty, location must be provided
|
if !request.namespace_path.is_empty() {
|
||||||
if !request.namespace_path.is_empty() && request.location.is_none() {
|
return self.namespace_database().create_table(request).await;
|
||||||
return Err(Error::InvalidInput {
|
|
||||||
message: "Location must be provided when namespace is not empty".into(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
// Use provided location if available, otherwise derive from table name
|
// Use provided location if available, otherwise derive from table name
|
||||||
let table_uri = request
|
let table_uri = request
|
||||||
@@ -959,11 +1004,8 @@ impl Database for ListingDatabase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
|
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||||
// When namespace is not empty, location must be provided
|
if !request.namespace_path.is_empty() {
|
||||||
if !request.namespace_path.is_empty() && request.location.is_none() {
|
return self.namespace_database().open_table(request).await;
|
||||||
return Err(Error::InvalidInput {
|
|
||||||
message: "Location must be provided when namespace is not empty".into(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
// Use provided location if available, otherwise derive from table name
|
// Use provided location if available, otherwise derive from table name
|
||||||
let table_uri = request
|
let table_uri = request
|
||||||
@@ -1059,9 +1101,10 @@ impl Database for ListingDatabase {
|
|||||||
|
|
||||||
async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()> {
|
async fn drop_table(&self, name: &str, namespace_path: &[String]) -> Result<()> {
|
||||||
if !namespace_path.is_empty() {
|
if !namespace_path.is_empty() {
|
||||||
return Err(Error::NotSupported {
|
return self
|
||||||
message: "Namespace parameter is not supported for listing database.".into(),
|
.namespace_database()
|
||||||
});
|
.drop_table(name, namespace_path)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
self.drop_tables(vec![name.to_string()]).await
|
self.drop_tables(vec![name.to_string()]).await
|
||||||
}
|
}
|
||||||
@@ -1070,9 +1113,10 @@ impl Database for ListingDatabase {
|
|||||||
async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> {
|
async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> {
|
||||||
// Check if namespace parameter is provided
|
// Check if namespace parameter is provided
|
||||||
if !namespace_path.is_empty() {
|
if !namespace_path.is_empty() {
|
||||||
return Err(Error::NotSupported {
|
return self
|
||||||
message: "Namespace parameter is not supported for listing database.".into(),
|
.namespace_database()
|
||||||
});
|
.drop_all_tables(namespace_path)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
let tables = self.table_names(TableNamesRequest::default()).await?;
|
let tables = self.table_names(TableNamesRequest::default()).await?;
|
||||||
self.drop_tables(tables).await
|
self.drop_tables(tables).await
|
||||||
@@ -1083,30 +1127,11 @@ impl Database for ListingDatabase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn namespace_client(&self) -> Result<Arc<dyn lance_namespace::LanceNamespace>> {
|
async fn namespace_client(&self) -> Result<Arc<dyn lance_namespace::LanceNamespace>> {
|
||||||
// Create a DirectoryNamespace pointing to the same root with the same storage options
|
self.namespace_database.namespace_client().await
|
||||||
let mut builder = lance_namespace_impls::DirectoryNamespaceBuilder::new(&self.uri);
|
|
||||||
|
|
||||||
// Add storage options
|
|
||||||
if !self.storage_options.is_empty() {
|
|
||||||
builder = builder.storage_options(self.storage_options.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use the same session
|
|
||||||
builder = builder.session(self.session.clone());
|
|
||||||
|
|
||||||
let namespace = builder.build().await.map_err(|e| Error::Runtime {
|
|
||||||
message: format!("Failed to create namespace client: {}", e),
|
|
||||||
})?;
|
|
||||||
Ok(Arc::new(namespace) as Arc<dyn lance_namespace::LanceNamespace>)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn namespace_client_config(&self) -> Result<(String, HashMap<String, String>)> {
|
async fn namespace_client_config(&self) -> Result<(String, HashMap<String, String>)> {
|
||||||
let mut properties = HashMap::new();
|
self.namespace_database.namespace_client_config().await
|
||||||
properties.insert("root".to_string(), self.uri.clone());
|
|
||||||
for (key, value) in &self.storage_options {
|
|
||||||
properties.insert(format!("storage.{}", key), value.clone());
|
|
||||||
}
|
|
||||||
Ok(("dir".to_string(), properties))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1132,6 +1157,7 @@ mod tests {
|
|||||||
#[cfg(feature = "remote")]
|
#[cfg(feature = "remote")]
|
||||||
client_config: Default::default(),
|
client_config: Default::default(),
|
||||||
options: Default::default(),
|
options: Default::default(),
|
||||||
|
namespace_client_properties: Default::default(),
|
||||||
read_consistency_interval: None,
|
read_consistency_interval: None,
|
||||||
session: None,
|
session: None,
|
||||||
};
|
};
|
||||||
@@ -1265,6 +1291,7 @@ mod tests {
|
|||||||
#[cfg(feature = "remote")]
|
#[cfg(feature = "remote")]
|
||||||
client_config: Default::default(),
|
client_config: Default::default(),
|
||||||
options: options.clone(),
|
options: options.clone(),
|
||||||
|
namespace_client_properties: Default::default(),
|
||||||
read_consistency_interval: None,
|
read_consistency_interval: None,
|
||||||
session: None,
|
session: None,
|
||||||
};
|
};
|
||||||
@@ -1799,6 +1826,7 @@ mod tests {
|
|||||||
#[cfg(feature = "remote")]
|
#[cfg(feature = "remote")]
|
||||||
client_config: Default::default(),
|
client_config: Default::default(),
|
||||||
options,
|
options,
|
||||||
|
namespace_client_properties: Default::default(),
|
||||||
read_consistency_interval: None,
|
read_consistency_interval: None,
|
||||||
session: None,
|
session: None,
|
||||||
};
|
};
|
||||||
@@ -1904,6 +1932,7 @@ mod tests {
|
|||||||
#[cfg(feature = "remote")]
|
#[cfg(feature = "remote")]
|
||||||
client_config: Default::default(),
|
client_config: Default::default(),
|
||||||
options,
|
options,
|
||||||
|
namespace_client_properties: Default::default(),
|
||||||
read_consistency_interval: None,
|
read_consistency_interval: None,
|
||||||
session: None,
|
session: None,
|
||||||
};
|
};
|
||||||
@@ -1975,6 +2004,7 @@ mod tests {
|
|||||||
#[cfg(feature = "remote")]
|
#[cfg(feature = "remote")]
|
||||||
client_config: Default::default(),
|
client_config: Default::default(),
|
||||||
options,
|
options,
|
||||||
|
namespace_client_properties: Default::default(),
|
||||||
read_consistency_interval: None,
|
read_consistency_interval: None,
|
||||||
session: None,
|
session: None,
|
||||||
};
|
};
|
||||||
@@ -2108,4 +2138,208 @@ mod tests {
|
|||||||
assert!(tables.contains(&"table1".to_string()));
|
assert!(tables.contains(&"table1".to_string()));
|
||||||
assert!(tables.contains(&"table2".to_string()));
|
assert!(tables.contains(&"table2".to_string()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_listing_database_namespace_operations() {
|
||||||
|
let (_tempdir, db) = setup_database().await;
|
||||||
|
|
||||||
|
db.create_namespace(CreateNamespaceRequest {
|
||||||
|
id: Some(vec!["parent".to_string()]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
db.create_namespace(CreateNamespaceRequest {
|
||||||
|
id: Some(vec!["parent".to_string(), "child".to_string()]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let root_namespaces = db
|
||||||
|
.list_namespaces(ListNamespacesRequest {
|
||||||
|
id: Some(vec![]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(root_namespaces.namespaces.contains(&"parent".to_string()));
|
||||||
|
|
||||||
|
let child_namespaces = db
|
||||||
|
.list_namespaces(ListNamespacesRequest {
|
||||||
|
id: Some(vec!["parent".to_string()]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(child_namespaces.namespaces.contains(&"child".to_string()));
|
||||||
|
|
||||||
|
db.describe_namespace(DescribeNamespaceRequest {
|
||||||
|
id: Some(vec!["parent".to_string(), "child".to_string()]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_listing_database_with_namespace_client_properties() {
|
||||||
|
let tempdir = tempdir().unwrap();
|
||||||
|
let uri = tempdir.path().to_str().unwrap();
|
||||||
|
|
||||||
|
let mut namespace_client_properties = HashMap::new();
|
||||||
|
namespace_client_properties.insert(
|
||||||
|
"table_version_tracking_enabled".to_string(),
|
||||||
|
"true".to_string(),
|
||||||
|
);
|
||||||
|
namespace_client_properties.insert("manifest_enabled".to_string(), "true".to_string());
|
||||||
|
|
||||||
|
let request = ConnectRequest {
|
||||||
|
uri: uri.to_string(),
|
||||||
|
#[cfg(feature = "remote")]
|
||||||
|
client_config: Default::default(),
|
||||||
|
options: Default::default(),
|
||||||
|
namespace_client_properties,
|
||||||
|
read_consistency_interval: None,
|
||||||
|
session: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let db = ListingDatabase::connect_with_options(&request)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let namespace_path = vec!["test_ns".to_string()];
|
||||||
|
|
||||||
|
db.create_namespace(CreateNamespaceRequest {
|
||||||
|
id: Some(namespace_path.clone()),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let schema = Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int32, false),
|
||||||
|
Field::new("name", DataType::Utf8, false),
|
||||||
|
]));
|
||||||
|
|
||||||
|
db.create_table(CreateTableRequest {
|
||||||
|
name: "managed_table".to_string(),
|
||||||
|
namespace_path: namespace_path.clone(),
|
||||||
|
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
|
||||||
|
mode: CreateTableMode::Create,
|
||||||
|
write_options: Default::default(),
|
||||||
|
location: None,
|
||||||
|
namespace_client: None,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let namespace_client = db.namespace_client().await.unwrap();
|
||||||
|
let describe = namespace_client
|
||||||
|
.describe_table(lance_namespace::models::DescribeTableRequest {
|
||||||
|
id: Some(vec!["test_ns".to_string(), "managed_table".to_string()]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(describe.managed_versioning, Some(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_listing_database_nested_namespace_table_ops() {
|
||||||
|
let (_tempdir, db) = setup_database().await;
|
||||||
|
let namespace_path = vec!["parent".to_string(), "child".to_string()];
|
||||||
|
|
||||||
|
db.create_namespace(CreateNamespaceRequest {
|
||||||
|
id: Some(vec!["parent".to_string()]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
db.create_namespace(CreateNamespaceRequest {
|
||||||
|
id: Some(namespace_path.clone()),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let schema = Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int32, false),
|
||||||
|
Field::new("name", DataType::Utf8, false),
|
||||||
|
]));
|
||||||
|
|
||||||
|
db.create_table(CreateTableRequest {
|
||||||
|
name: "nested_table".to_string(),
|
||||||
|
namespace_path: namespace_path.clone(),
|
||||||
|
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
|
||||||
|
mode: CreateTableMode::Create,
|
||||||
|
write_options: Default::default(),
|
||||||
|
location: None,
|
||||||
|
namespace_client: None,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let namespace_client = db.namespace_client().await.unwrap();
|
||||||
|
let describe = namespace_client
|
||||||
|
.describe_table(lance_namespace::models::DescribeTableRequest {
|
||||||
|
id: Some(vec![
|
||||||
|
"parent".to_string(),
|
||||||
|
"child".to_string(),
|
||||||
|
"nested_table".to_string(),
|
||||||
|
]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(describe.location.is_some());
|
||||||
|
|
||||||
|
let table = db
|
||||||
|
.open_table(OpenTableRequest {
|
||||||
|
name: "nested_table".to_string(),
|
||||||
|
namespace_path: namespace_path.clone(),
|
||||||
|
index_cache_size: None,
|
||||||
|
lance_read_params: None,
|
||||||
|
location: None,
|
||||||
|
namespace_client: None,
|
||||||
|
managed_versioning: None,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(table.name(), "nested_table");
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
|
let table_names = db
|
||||||
|
.table_names(TableNamesRequest {
|
||||||
|
namespace_path: namespace_path.clone(),
|
||||||
|
start_after: None,
|
||||||
|
limit: None,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(table_names, vec!["nested_table".to_string()]);
|
||||||
|
|
||||||
|
let list_tables = db
|
||||||
|
.list_tables(ListTablesRequest {
|
||||||
|
id: Some(namespace_path.clone()),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(list_tables.tables, vec!["nested_table".to_string()]);
|
||||||
|
|
||||||
|
db.drop_table("nested_table", &namespace_path)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let post_drop = db
|
||||||
|
.list_tables(ListTablesRequest {
|
||||||
|
id: Some(namespace_path),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(post_drop.tables.is_empty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -450,6 +450,47 @@ mod tests {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_namespace_connection_with_namespace_client_properties() {
|
||||||
|
let tmp_dir = tempdir().unwrap();
|
||||||
|
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||||
|
|
||||||
|
let mut properties = HashMap::new();
|
||||||
|
properties.insert("root".to_string(), root_path);
|
||||||
|
|
||||||
|
let conn = connect_namespace("dir", properties)
|
||||||
|
.namespace_client_property("table_version_tracking_enabled", "true")
|
||||||
|
.namespace_client_property("manifest_enabled", "true")
|
||||||
|
.execute()
|
||||||
|
.await
|
||||||
|
.expect("Failed to connect to namespace");
|
||||||
|
|
||||||
|
conn.create_namespace(CreateNamespaceRequest {
|
||||||
|
id: Some(vec!["test_ns".into()]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.expect("Failed to create namespace");
|
||||||
|
|
||||||
|
let test_data = create_test_data();
|
||||||
|
conn.create_table("test_table", test_data)
|
||||||
|
.namespace(vec!["test_ns".into()])
|
||||||
|
.execute()
|
||||||
|
.await
|
||||||
|
.expect("Failed to create table");
|
||||||
|
|
||||||
|
let namespace_client = conn.namespace_client().await.unwrap();
|
||||||
|
let describe = namespace_client
|
||||||
|
.describe_table(DescribeTableRequest {
|
||||||
|
id: Some(vec!["test_ns".into(), "test_table".into()]),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.expect("Failed to describe table");
|
||||||
|
|
||||||
|
assert_eq!(describe.managed_versioning, Some(true));
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_namespace_create_table_basic() {
|
async fn test_namespace_create_table_basic() {
|
||||||
// Setup: Create a temporary directory for the namespace
|
// Setup: Create a temporary directory for the namespace
|
||||||
|
|||||||
Reference in New Issue
Block a user