mirror of
https://github.com/lancedb/lancedb.git
synced 2025-12-22 21:09:58 +00:00
feat(rust): support namespace backed database (#2664)
This PR adds support for namespace-backed databases through lance-namespace integration, enabling centralized table management through namespace APIs. --------- Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
1466
Cargo.lock
generated
1466
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -23,6 +23,7 @@ lance-table = "=0.37.0"
|
||||
lance-testing = "=0.37.0"
|
||||
lance-datafusion = "=0.37.0"
|
||||
lance-encoding = "=0.37.0"
|
||||
lance-namespace = "0.0.15"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "55.1", optional = false }
|
||||
arrow-array = "55.1"
|
||||
|
||||
@@ -36,6 +36,7 @@ lance-table = { workspace = true }
|
||||
lance-linalg = { workspace = true }
|
||||
lance-testing = { workspace = true }
|
||||
lance-encoding = { workspace = true }
|
||||
lance-namespace = { workspace = true }
|
||||
moka = { workspace = true }
|
||||
pin-project = { workspace = true }
|
||||
tokio = { version = "1.23", features = ["rt-multi-thread"] }
|
||||
|
||||
@@ -1015,6 +1015,117 @@ pub fn connect(uri: &str) -> ConnectBuilder {
|
||||
ConnectBuilder::new(uri)
|
||||
}
|
||||
|
||||
pub struct ConnectNamespaceBuilder {
|
||||
ns_impl: String,
|
||||
properties: HashMap<String, String>,
|
||||
storage_options: HashMap<String, String>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
}
|
||||
|
||||
impl ConnectNamespaceBuilder {
|
||||
fn new(ns_impl: &str, properties: HashMap<String, String>) -> Self {
|
||||
Self {
|
||||
ns_impl: ns_impl.to_string(),
|
||||
properties,
|
||||
storage_options: HashMap::new(),
|
||||
read_consistency_interval: None,
|
||||
embedding_registry: None,
|
||||
session: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set an option for the storage layer.
|
||||
///
|
||||
/// See available options at <https://lancedb.github.io/lancedb/guides/storage/>
|
||||
pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.storage_options.insert(key.into(), value.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Set multiple options for the storage layer.
|
||||
///
|
||||
/// See available options at <https://lancedb.github.io/lancedb/guides/storage/>
|
||||
pub fn storage_options(
|
||||
mut self,
|
||||
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
|
||||
) -> Self {
|
||||
for (key, value) in pairs {
|
||||
self.storage_options.insert(key.into(), value.into());
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// The interval at which to check for updates from other processes.
|
||||
///
|
||||
/// If left unset, consistency is not checked. For maximum read
|
||||
/// performance, this is the default. For strong consistency, set this to
|
||||
/// zero seconds. Then every read will check for updates from other processes.
|
||||
/// As a compromise, set this to a non-zero duration for eventual consistency.
|
||||
pub fn read_consistency_interval(
|
||||
mut self,
|
||||
read_consistency_interval: std::time::Duration,
|
||||
) -> Self {
|
||||
self.read_consistency_interval = Some(read_consistency_interval);
|
||||
self
|
||||
}
|
||||
|
||||
/// Provide a custom [`EmbeddingRegistry`] to use for this connection.
|
||||
pub fn embedding_registry(mut self, registry: Arc<dyn EmbeddingRegistry>) -> Self {
|
||||
self.embedding_registry = Some(registry);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a custom session for object stores and caching.
|
||||
///
|
||||
/// By default, a new session with default configuration will be created.
|
||||
/// This method allows you to provide a custom session with your own
|
||||
/// configuration for object store registries, caching, etc.
|
||||
pub fn session(mut self, session: Arc<lance::session::Session>) -> Self {
|
||||
self.session = Some(session);
|
||||
self
|
||||
}
|
||||
|
||||
/// Execute the connection
|
||||
pub async fn execute(self) -> Result<Connection> {
|
||||
use crate::database::namespace::LanceNamespaceDatabase;
|
||||
|
||||
let internal = Arc::new(
|
||||
LanceNamespaceDatabase::connect(
|
||||
&self.ns_impl,
|
||||
self.properties,
|
||||
self.storage_options,
|
||||
self.read_consistency_interval,
|
||||
self.session,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
Ok(Connection {
|
||||
internal,
|
||||
uri: format!("namespace://{}", self.ns_impl),
|
||||
embedding_registry: self
|
||||
.embedding_registry
|
||||
.unwrap_or_else(|| Arc::new(MemoryRegistry::new())),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Connect to a LanceDB database through a namespace.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `ns_impl` - The namespace implementation to use (e.g., "dir" for directory-based, "rest" for REST API)
|
||||
/// * `properties` - Configuration properties for the namespace implementation
|
||||
/// ```
|
||||
pub fn connect_namespace(
|
||||
ns_impl: &str,
|
||||
properties: HashMap<String, String>,
|
||||
) -> ConnectNamespaceBuilder {
|
||||
ConnectNamespaceBuilder::new(ns_impl, properties)
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "remote"))]
|
||||
mod test_utils {
|
||||
use super::*;
|
||||
|
||||
@@ -29,6 +29,7 @@ use crate::error::Result;
|
||||
use crate::table::{BaseTable, TableDefinition, WriteOptions};
|
||||
|
||||
pub mod listing;
|
||||
pub mod namespace;
|
||||
|
||||
pub trait DatabaseOptions {
|
||||
fn serialize_into_map(&self, map: &mut HashMap<String, String>);
|
||||
|
||||
840
rust/lancedb/src/database/namespace.rs
Normal file
840
rust/lancedb/src/database/namespace.rs
Normal file
@@ -0,0 +1,840 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! Namespace-based database implementation that delegates table management to lance-namespace
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use lance_namespace::{
|
||||
connect as connect_namespace,
|
||||
models::{
|
||||
CreateEmptyTableRequest, CreateNamespaceRequest, DescribeTableRequest,
|
||||
DropNamespaceRequest, DropTableRequest, ListNamespacesRequest, ListTablesRequest,
|
||||
},
|
||||
LanceNamespace,
|
||||
};
|
||||
|
||||
use crate::connection::ConnectRequest;
|
||||
use crate::database::listing::ListingDatabase;
|
||||
use crate::error::{Error, Result};
|
||||
|
||||
use super::{
|
||||
BaseTable, CloneTableRequest, CreateNamespaceRequest as DbCreateNamespaceRequest,
|
||||
CreateTableMode, CreateTableRequest as DbCreateTableRequest, Database,
|
||||
DropNamespaceRequest as DbDropNamespaceRequest,
|
||||
ListNamespacesRequest as DbListNamespacesRequest, OpenTableRequest, TableNamesRequest,
|
||||
};
|
||||
|
||||
/// A database implementation that uses lance-namespace for table management
|
||||
pub struct LanceNamespaceDatabase {
|
||||
namespace: Arc<dyn LanceNamespace>,
|
||||
// Storage options to be inherited by tables
|
||||
storage_options: HashMap<String, String>,
|
||||
// Read consistency interval for tables
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
// Optional session for object stores and caching
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
}
|
||||
|
||||
impl LanceNamespaceDatabase {
|
||||
pub async fn connect(
|
||||
ns_impl: &str,
|
||||
ns_properties: HashMap<String, String>,
|
||||
storage_options: HashMap<String, String>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
) -> Result<Self> {
|
||||
let namespace = connect_namespace(ns_impl, ns_properties.clone())
|
||||
.await
|
||||
.map_err(|e| Error::InvalidInput {
|
||||
message: format!("Failed to connect to namespace: {:?}", e),
|
||||
})?;
|
||||
|
||||
Ok(Self {
|
||||
namespace,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper method to create a ListingDatabase from a table location
|
||||
///
|
||||
/// This method:
|
||||
/// 1. Validates that the location ends with <table_name>.lance
|
||||
/// 2. Extracts the parent directory from the location
|
||||
/// 3. Creates a ListingDatabase at that parent directory
|
||||
async fn create_listing_database(
|
||||
&self,
|
||||
table_name: &str,
|
||||
location: &str,
|
||||
additional_storage_options: Option<HashMap<String, String>>,
|
||||
) -> Result<Arc<ListingDatabase>> {
|
||||
let expected_suffix = format!("{}.lance", table_name);
|
||||
if !location.ends_with(&expected_suffix) {
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"Invalid table location '{}': expected to end with '{}'",
|
||||
location, expected_suffix
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let parent_dir = location
|
||||
.rsplit_once('/')
|
||||
.map(|(parent, _)| parent.to_string())
|
||||
.ok_or_else(|| Error::Runtime {
|
||||
message: format!("Invalid table location '{}': no parent directory", location),
|
||||
})?;
|
||||
|
||||
let mut merged_storage_options = self.storage_options.clone();
|
||||
if let Some(opts) = additional_storage_options {
|
||||
merged_storage_options.extend(opts);
|
||||
}
|
||||
|
||||
let connect_request = ConnectRequest {
|
||||
uri: parent_dir,
|
||||
options: merged_storage_options,
|
||||
read_consistency_interval: self.read_consistency_interval,
|
||||
session: self.session.clone(),
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
};
|
||||
|
||||
let listing_db = ListingDatabase::connect_with_options(&connect_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to create listing database: {}", e),
|
||||
})?;
|
||||
|
||||
Ok(Arc::new(listing_db))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for LanceNamespaceDatabase {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("LanceNamespaceDatabase")
|
||||
.field("storage_options", &self.storage_options)
|
||||
.field("read_consistency_interval", &self.read_consistency_interval)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for LanceNamespaceDatabase {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "LanceNamespaceDatabase")
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for LanceNamespaceDatabase {
|
||||
async fn list_namespaces(&self, request: DbListNamespacesRequest) -> Result<Vec<String>> {
|
||||
let ns_request = ListNamespacesRequest {
|
||||
id: if request.namespace.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(request.namespace)
|
||||
},
|
||||
page_token: request.page_token,
|
||||
limit: request.limit.map(|l| l as i32),
|
||||
};
|
||||
|
||||
let response = self
|
||||
.namespace
|
||||
.list_namespaces(ns_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to list namespaces: {}", e),
|
||||
})?;
|
||||
|
||||
Ok(response.namespaces)
|
||||
}
|
||||
|
||||
async fn create_namespace(&self, request: DbCreateNamespaceRequest) -> Result<()> {
|
||||
let ns_request = CreateNamespaceRequest {
|
||||
id: if request.namespace.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(request.namespace)
|
||||
},
|
||||
mode: None,
|
||||
properties: None,
|
||||
};
|
||||
|
||||
self.namespace
|
||||
.create_namespace(ns_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to create namespace: {}", e),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn drop_namespace(&self, request: DbDropNamespaceRequest) -> Result<()> {
|
||||
let ns_request = DropNamespaceRequest {
|
||||
id: if request.namespace.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(request.namespace)
|
||||
},
|
||||
mode: None,
|
||||
behavior: None,
|
||||
};
|
||||
|
||||
self.namespace
|
||||
.drop_namespace(ns_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to drop namespace: {}", e),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
|
||||
let ns_request = ListTablesRequest {
|
||||
id: if request.namespace.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(request.namespace)
|
||||
},
|
||||
page_token: request.start_after,
|
||||
limit: request.limit.map(|l| l as i32),
|
||||
};
|
||||
|
||||
let response =
|
||||
self.namespace
|
||||
.list_tables(ns_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to list tables: {}", e),
|
||||
})?;
|
||||
|
||||
Ok(response.tables)
|
||||
}
|
||||
|
||||
async fn create_table(&self, request: DbCreateTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||
let mut table_id = request.namespace.clone();
|
||||
table_id.push(request.name.clone());
|
||||
let describe_request = DescribeTableRequest {
|
||||
id: Some(table_id.clone()),
|
||||
version: None,
|
||||
};
|
||||
|
||||
let describe_result = self.namespace.describe_table(describe_request).await;
|
||||
|
||||
match request.mode {
|
||||
CreateTableMode::Create => {
|
||||
if describe_result.is_ok() {
|
||||
return Err(Error::TableAlreadyExists {
|
||||
name: request.name.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
CreateTableMode::Overwrite => {
|
||||
if describe_result.is_ok() {
|
||||
// Drop the existing table - must succeed
|
||||
let drop_request = DropTableRequest {
|
||||
id: Some(table_id.clone()),
|
||||
};
|
||||
self.namespace
|
||||
.drop_table(drop_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to drop existing table for overwrite: {}", e),
|
||||
})?;
|
||||
}
|
||||
}
|
||||
CreateTableMode::ExistOk(_) => {
|
||||
if let Ok(response) = describe_result {
|
||||
let location = response.location.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from namespace response".to_string(),
|
||||
})?;
|
||||
|
||||
let listing_db = self
|
||||
.create_listing_database(&request.name, &location, response.storage_options)
|
||||
.await?;
|
||||
|
||||
return listing_db
|
||||
.open_table(OpenTableRequest {
|
||||
name: request.name.clone(),
|
||||
namespace: request.namespace.clone(),
|
||||
index_cache_size: None,
|
||||
lance_read_params: None,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut table_id = request.namespace.clone();
|
||||
table_id.push(request.name.clone());
|
||||
|
||||
let create_empty_request = CreateEmptyTableRequest {
|
||||
id: Some(table_id),
|
||||
location: None,
|
||||
properties: if self.storage_options.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.storage_options.clone())
|
||||
},
|
||||
};
|
||||
|
||||
let create_empty_response = self
|
||||
.namespace
|
||||
.create_empty_table(create_empty_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to create empty table: {}", e),
|
||||
})?;
|
||||
|
||||
let location = create_empty_response
|
||||
.location
|
||||
.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from create_empty_table response".to_string(),
|
||||
})?;
|
||||
|
||||
let listing_db = self
|
||||
.create_listing_database(
|
||||
&request.name,
|
||||
&location,
|
||||
create_empty_response.storage_options,
|
||||
)
|
||||
.await?;
|
||||
|
||||
listing_db.create_table(request).await
|
||||
}
|
||||
|
||||
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||
let mut table_id = request.namespace.clone();
|
||||
table_id.push(request.name.clone());
|
||||
|
||||
let describe_request = DescribeTableRequest {
|
||||
id: Some(table_id),
|
||||
version: None,
|
||||
};
|
||||
let response = self
|
||||
.namespace
|
||||
.describe_table(describe_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to describe table: {}", e),
|
||||
})?;
|
||||
|
||||
let location = response.location.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from namespace response".to_string(),
|
||||
})?;
|
||||
|
||||
let listing_db = self
|
||||
.create_listing_database(&request.name, &location, response.storage_options)
|
||||
.await?;
|
||||
|
||||
listing_db.open_table(request).await
|
||||
}
|
||||
|
||||
async fn clone_table(&self, _request: CloneTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "clone_table is not supported for namespace connections".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn rename_table(
|
||||
&self,
|
||||
_cur_name: &str,
|
||||
_new_name: &str,
|
||||
_cur_namespace: &[String],
|
||||
_new_namespace: &[String],
|
||||
) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "rename_table is not supported for namespace connections".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()> {
|
||||
let mut table_id = namespace.to_vec();
|
||||
table_id.push(name.to_string());
|
||||
|
||||
let drop_request = DropTableRequest { id: Some(table_id) };
|
||||
self.namespace
|
||||
.drop_table(drop_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to drop table: {}", e),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
|
||||
let tables = self
|
||||
.table_names(TableNamesRequest {
|
||||
namespace: namespace.to_vec(),
|
||||
start_after: None,
|
||||
limit: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
for table in tables {
|
||||
self.drop_table(&table, namespace).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(not(windows))] // TODO: support windows for lance-namespace
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::connect_namespace;
|
||||
use crate::query::ExecutableQuery;
|
||||
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use futures::TryStreamExt;
|
||||
use tempfile::tempdir;
|
||||
|
||||
/// Helper function to create test data
|
||||
fn create_test_data() -> RecordBatchIterator<
|
||||
std::vec::IntoIter<std::result::Result<RecordBatch, arrow_schema::ArrowError>>,
|
||||
> {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("name", DataType::Utf8, false),
|
||||
]));
|
||||
|
||||
let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
|
||||
let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
|
||||
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(id_array), Arc::new(name_array)],
|
||||
)
|
||||
.unwrap();
|
||||
RecordBatchIterator::new(vec![std::result::Result::Ok(batch)].into_iter(), schema)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_connection_simple() {
|
||||
// Test that namespace connections work with simple connect_namespace(impl_type, properties)
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
// This should succeed with directory-based namespace
|
||||
let result = connect_namespace("dir", properties).execute().await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_connection_with_storage_options() {
|
||||
// Test namespace connections with storage options
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
// This should succeed with directory-based namespace and storage options
|
||||
let result = connect_namespace("dir", properties)
|
||||
.storage_option("timeout", "30s")
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_connection_with_all_options() {
|
||||
use crate::embeddings::MemoryRegistry;
|
||||
use std::time::Duration;
|
||||
|
||||
// Test namespace connections with all configuration options
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let embedding_registry = Arc::new(MemoryRegistry::new());
|
||||
let session = Arc::new(lance::session::Session::default());
|
||||
|
||||
// Test with all options set
|
||||
let result = connect_namespace("dir", properties)
|
||||
.storage_option("timeout", "30s")
|
||||
.storage_options([("cache_size", "1gb"), ("region", "us-east-1")])
|
||||
.read_consistency_interval(Duration::from_secs(5))
|
||||
.embedding_registry(embedding_registry.clone())
|
||||
.session(session.clone())
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
|
||||
let conn = result.unwrap();
|
||||
|
||||
// Verify embedding registry is set correctly
|
||||
assert!(std::ptr::eq(
|
||||
conn.embedding_registry() as *const _,
|
||||
embedding_registry.as_ref() as *const _
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_create_table_basic() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
// Connect to namespace using DirectoryNamespace
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
// Test: Create a table
|
||||
let test_data = create_test_data();
|
||||
let table = conn
|
||||
.create_table("test_table", test_data)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
|
||||
// Verify: Table was created and can be queried
|
||||
let results = table
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to query table")
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.expect("Failed to collect results");
|
||||
|
||||
assert_eq!(results.len(), 1);
|
||||
assert_eq!(results[0].num_rows(), 5);
|
||||
|
||||
// Verify: Table appears in table_names
|
||||
let table_names = conn
|
||||
.table_names()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to list tables");
|
||||
assert!(table_names.contains(&"test_table".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_describe_table() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
// Connect to namespace
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
// Create a table first
|
||||
let test_data = create_test_data();
|
||||
let _table = conn
|
||||
.create_table("describe_test", test_data)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
|
||||
// Test: Open the table (which internally uses describe_table)
|
||||
let opened_table = conn
|
||||
.open_table("describe_test")
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to open table");
|
||||
|
||||
// Verify: Can query the opened table
|
||||
let results = opened_table
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to query table")
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.expect("Failed to collect results");
|
||||
|
||||
assert_eq!(results.len(), 1);
|
||||
assert_eq!(results[0].num_rows(), 5);
|
||||
|
||||
// Verify schema matches
|
||||
let schema = opened_table.schema().await.expect("Failed to get schema");
|
||||
assert_eq!(schema.fields.len(), 2);
|
||||
assert_eq!(schema.field(0).name(), "id");
|
||||
assert_eq!(schema.field(1).name(), "name");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_create_table_overwrite_mode() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
// Create initial table with 5 rows
|
||||
let test_data1 = create_test_data();
|
||||
let _table1 = conn
|
||||
.create_table("overwrite_test", test_data1)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
|
||||
// Create new data with 3 rows
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("name", DataType::Utf8, false),
|
||||
]));
|
||||
let id_array = Int32Array::from(vec![10, 20, 30]);
|
||||
let name_array = StringArray::from(vec!["New1", "New2", "New3"]);
|
||||
let test_data2 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(id_array), Arc::new(name_array)],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Test: Overwrite the table
|
||||
let table2 = conn
|
||||
.create_table(
|
||||
"overwrite_test",
|
||||
RecordBatchIterator::new(
|
||||
vec![std::result::Result::Ok(test_data2)].into_iter(),
|
||||
schema,
|
||||
),
|
||||
)
|
||||
.mode(CreateTableMode::Overwrite)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to overwrite table");
|
||||
|
||||
// Verify: Table has new data (3 rows instead of 5)
|
||||
let results = table2
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to query table")
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.expect("Failed to collect results");
|
||||
|
||||
assert_eq!(results.len(), 1);
|
||||
assert_eq!(results[0].num_rows(), 3);
|
||||
|
||||
// Verify the data is actually the new data
|
||||
let id_col = results[0]
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<Int32Array>()
|
||||
.unwrap();
|
||||
assert_eq!(id_col.value(0), 10);
|
||||
assert_eq!(id_col.value(1), 20);
|
||||
assert_eq!(id_col.value(2), 30);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_create_table_exist_ok_mode() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
// Create initial table with test data
|
||||
let test_data1 = create_test_data();
|
||||
let _table1 = conn
|
||||
.create_table("exist_ok_test", test_data1)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
|
||||
// Try to create again with exist_ok mode
|
||||
let test_data2 = create_test_data();
|
||||
let table2 = conn
|
||||
.create_table("exist_ok_test", test_data2)
|
||||
.mode(CreateTableMode::exist_ok(|req| req))
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed with exist_ok mode");
|
||||
|
||||
// Verify: Table still has original data (5 rows)
|
||||
let results = table2
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to query table")
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.expect("Failed to collect results");
|
||||
|
||||
assert_eq!(results.len(), 1);
|
||||
assert_eq!(results[0].num_rows(), 5);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_create_multiple_tables() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
// Create first table
|
||||
let test_data1 = create_test_data();
|
||||
let _table1 = conn
|
||||
.create_table("table1", test_data1)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create first table");
|
||||
|
||||
// Create second table
|
||||
let test_data2 = create_test_data();
|
||||
let _table2 = conn
|
||||
.create_table("table2", test_data2)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create second table");
|
||||
|
||||
// Verify: Both tables appear in table list
|
||||
let table_names = conn
|
||||
.table_names()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to list tables");
|
||||
|
||||
assert!(table_names.contains(&"table1".to_string()));
|
||||
assert!(table_names.contains(&"table2".to_string()));
|
||||
|
||||
// Verify: Can open both tables
|
||||
let opened_table1 = conn
|
||||
.open_table("table1")
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to open table1");
|
||||
|
||||
let opened_table2 = conn
|
||||
.open_table("table2")
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to open table2");
|
||||
|
||||
// Verify both tables work
|
||||
let count1 = opened_table1
|
||||
.count_rows(None)
|
||||
.await
|
||||
.expect("Failed to count rows in table1");
|
||||
assert_eq!(count1, 5);
|
||||
|
||||
let count2 = opened_table2
|
||||
.count_rows(None)
|
||||
.await
|
||||
.expect("Failed to count rows in table2");
|
||||
assert_eq!(count2, 5);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_table_not_found() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
// Test: Try to open a non-existent table
|
||||
let result = conn.open_table("non_existent_table").execute().await;
|
||||
|
||||
// Verify: Should return an error
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_drop_table() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
// Create a table first
|
||||
let test_data = create_test_data();
|
||||
let _table = conn
|
||||
.create_table("drop_test", test_data)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
|
||||
// Verify table exists
|
||||
let table_names_before = conn
|
||||
.table_names()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to list tables");
|
||||
assert!(table_names_before.contains(&"drop_test".to_string()));
|
||||
|
||||
// Test: Drop the table
|
||||
conn.drop_table("drop_test", &[])
|
||||
.await
|
||||
.expect("Failed to drop table");
|
||||
|
||||
// Verify: Table no longer exists
|
||||
let table_names_after = conn
|
||||
.table_names()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to list tables");
|
||||
assert!(!table_names_after.contains(&"drop_test".to_string()));
|
||||
|
||||
// Verify: Cannot open dropped table
|
||||
let open_result = conn.open_table("drop_test").execute().await;
|
||||
assert!(open_result.is_err());
|
||||
}
|
||||
}
|
||||
@@ -212,7 +212,7 @@ use std::fmt::Display;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub use connection::Connection;
|
||||
pub use connection::{ConnectNamespaceBuilder, Connection};
|
||||
pub use error::{Error, Result};
|
||||
use lance_linalg::distance::DistanceType as LanceDistanceType;
|
||||
pub use table::Table;
|
||||
@@ -289,6 +289,8 @@ impl Display for DistanceType {
|
||||
|
||||
/// Connect to a database
|
||||
pub use connection::connect;
|
||||
/// Connect to a namespace-backed database
|
||||
pub use connection::connect_namespace;
|
||||
|
||||
/// Re-export Lance Session and ObjectStoreRegistry for custom session creation
|
||||
pub use lance::session::Session;
|
||||
|
||||
Reference in New Issue
Block a user