diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index 6f34529e..6f05a057 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -7,7 +7,6 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; -use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider}; use lance_namespace::{ models::{ CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse, @@ -19,13 +18,13 @@ use lance_namespace::{ }; use lance_namespace_impls::ConnectBuilder; -use crate::connection::ConnectRequest; use crate::database::ReadConsistency; use crate::error::{Error, Result}; +use crate::table::NativeTable; use super::{ - listing::ListingDatabase, BaseTable, CloneTableRequest, CreateTableMode, - CreateTableRequest as DbCreateTableRequest, Database, OpenTableRequest, TableNamesRequest, + BaseTable, CloneTableRequest, CreateTableMode, CreateTableRequest as DbCreateTableRequest, + Database, OpenTableRequest, TableNamesRequest, }; /// A database implementation that uses lance-namespace for table management @@ -90,51 +89,6 @@ impl std::fmt::Display for LanceNamespaceDatabase { } } -impl LanceNamespaceDatabase { - /// Create a temporary listing database for the given location - /// - /// Merges storage options with priority: connection < user < namespace - async fn create_listing_database( - &self, - location: &str, - table_id: Vec, - user_storage_options: Option<&HashMap>, - response_storage_options: Option<&HashMap>, - ) -> Result { - // Merge storage options: connection < user < namespace - let mut merged_storage_options = self.storage_options.clone(); - if let Some(opts) = user_storage_options { - merged_storage_options.extend(opts.clone()); - } - if let Some(opts) = response_storage_options { - merged_storage_options.extend(opts.clone()); - } - - let request = ConnectRequest { - uri: location.to_string(), - #[cfg(feature = "remote")] - 
client_config: Default::default(), - options: merged_storage_options, - read_consistency_interval: self.read_consistency_interval, - session: self.session.clone(), - }; - - let mut listing_db = ListingDatabase::connect_with_options(&request).await?; - - // Create storage options provider only if namespace returned storage options - // (not just user-provided options) - if response_storage_options.is_some() { - let provider = Arc::new(LanceNamespaceStorageOptionsProvider::new( - self.namespace.clone(), - table_id, - )) as Arc; - listing_db.storage_options_provider = Some(provider); - } - - Ok(listing_db) - } -} - #[async_trait] impl Database for LanceNamespaceDatabase { fn uri(&self) -> &str { @@ -195,14 +149,6 @@ impl Database for LanceNamespaceDatabase { } async fn create_table(&self, request: DbCreateTableRequest) -> Result> { - // Extract user-provided storage options from request - let user_storage_options = request - .write_options - .lance_write_params - .as_ref() - .and_then(|lwp| lwp.store_params.as_ref()) - .and_then(|sp| sp.storage_options.as_ref()); - let mut table_id = request.namespace.clone(); table_id.push(request.name.clone()); let describe_request = DescribeTableRequest { @@ -235,34 +181,20 @@ impl Database for LanceNamespaceDatabase { } } CreateTableMode::ExistOk(_) => { - if let Ok(response) = describe_result { - let location = response.location.ok_or_else(|| Error::Runtime { - message: "Table location is missing from namespace response".to_string(), - })?; + if describe_result.is_ok() { + let native_table = NativeTable::open_from_namespace( + self.namespace.clone(), + &request.name, + request.namespace.clone(), + None, + None, + self.read_consistency_interval, + self.server_side_query_enabled, + self.session.clone(), + ) + .await?; - let listing_db = self - .create_listing_database( - &location, - table_id.clone(), - user_storage_options, - response.storage_options.as_ref(), - ) - .await?; - - let namespace_client = self - 
.server_side_query_enabled - .then(|| self.namespace.clone()); - - return listing_db - .open_table(OpenTableRequest { - name: request.name.clone(), - namespace: request.namespace.clone(), - index_cache_size: None, - lance_read_params: None, - location: Some(location), - namespace_client, - }) - .await; + return Ok(Arc::new(native_table)); } } } @@ -294,82 +226,37 @@ impl Database for LanceNamespaceDatabase { message: "Table location is missing from create_empty_table response".to_string(), })?; - let listing_db = self - .create_listing_database( - &location, - table_id.clone(), - user_storage_options, - create_empty_response.storage_options.as_ref(), - ) - .await?; + let native_table = NativeTable::create_from_namespace( + self.namespace.clone(), + &location, + &request.name, + request.namespace.clone(), + request.data, + None, // write_store_wrapper not used for namespace connections + request.write_options.lance_write_params, + self.read_consistency_interval, + self.server_side_query_enabled, + self.session.clone(), + ) + .await?; - let namespace_client = self - .server_side_query_enabled - .then(|| self.namespace.clone()); - - let create_request = DbCreateTableRequest { - name: request.name, - namespace: request.namespace, - data: request.data, - mode: request.mode, - write_options: request.write_options, - location: Some(location), - namespace_client, - }; - - listing_db.create_table(create_request).await + Ok(Arc::new(native_table)) } async fn open_table(&self, request: OpenTableRequest) -> Result> { - // Extract user-provided storage options from request - let user_storage_options = request - .lance_read_params - .as_ref() - .and_then(|lrp| lrp.store_options.as_ref()) - .and_then(|so| so.storage_options.as_ref()); + let native_table = NativeTable::open_from_namespace( + self.namespace.clone(), + &request.name, + request.namespace.clone(), + None, // write_store_wrapper not used for namespace connections + request.lance_read_params, + 
self.read_consistency_interval, + self.server_side_query_enabled, + self.session.clone(), + ) + .await?; - let mut table_id = request.namespace.clone(); - table_id.push(request.name.clone()); - - let describe_request = DescribeTableRequest { - id: Some(table_id.clone()), - version: None, - }; - let response = self - .namespace - .describe_table(describe_request) - .await - .map_err(|e| Error::Runtime { - message: format!("Failed to describe table: {}", e), - })?; - - let location = response.location.ok_or_else(|| Error::Runtime { - message: "Table location is missing from namespace response".to_string(), - })?; - - let listing_db = self - .create_listing_database( - &location, - table_id.clone(), - user_storage_options, - response.storage_options.as_ref(), - ) - .await?; - - let namespace_client = self - .server_side_query_enabled - .then(|| self.namespace.clone()); - - let open_request = OpenTableRequest { - name: request.name.clone(), - namespace: request.namespace.clone(), - index_cache_size: request.index_cache_size, - lance_read_params: request.lance_read_params, - location: Some(location), - namespace_client, - }; - - listing_db.open_table(open_request).await + Ok(Arc::new(native_table)) } async fn clone_table(&self, _request: CloneTableRequest) -> Result> { diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 14e04714..9f8d16b5 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -29,7 +29,7 @@ use lance::dataset::{ use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource}; use lance::index::vector::utils::infer_vector_dim; use lance::index::vector::VectorIndexParams; -use lance::io::WrappingObjectStore; +use lance::io::{ObjectStoreParams, WrappingObjectStore}; use lance_datafusion::exec::{analyze_plan as lance_analyze_plan, execute_plan}; use lance_datafusion::utils::StreamingWriteSource; use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; @@ -40,6 +40,7 @@ use 
lance_index::vector::pq::PQBuildParams; use lance_index::vector::sq::builder::SQBuildParams; use lance_index::DatasetIndexExt; use lance_index::IndexType; +use lance_io::object_store::LanceNamespaceStorageOptionsProvider; use lance_namespace::models::{ QueryTableRequest as NsQueryTableRequest, QueryTableRequestFullTextQuery, QueryTableRequestVector, StringFtsQuery, @@ -1611,6 +1612,105 @@ impl NativeTable { self } + /// Opens an existing Table using a namespace client. + /// + /// This method uses `DatasetBuilder::from_namespace` to open the table, which + /// automatically fetches the table location and storage options from the namespace. + /// This eliminates the need to pre-fetch and merge storage options before opening. + /// + /// # Arguments + /// + /// * `namespace_client` - The namespace client to use for fetching table metadata + /// * `name` - The table name + /// * `namespace` - The namespace path (e.g., vec!["parent", "child"]) + /// * `write_store_wrapper` - Optional wrapper for the object store on write path + /// * `params` - Optional read parameters + /// * `read_consistency_interval` - Optional interval for read consistency + /// * `server_side_query_enabled` - Whether to enable server-side query execution. + /// When true, the namespace_client will be stored and queries will be executed + /// on the namespace server. When false, the namespace is only used for opening + /// the table, and queries are executed locally. + /// * `session` - Optional session for object stores and caching + /// + /// # Returns + /// + /// * A [NativeTable] object. 
+ #[allow(clippy::too_many_arguments)] + pub async fn open_from_namespace( + namespace_client: Arc<dyn LanceNamespace>, + name: &str, + namespace: Vec<String>, + write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>, + params: Option<ReadParams>, + read_consistency_interval: Option<std::time::Duration>, + server_side_query_enabled: bool, + session: Option<Arc<lance::session::Session>>, + ) -> Result<Self> { + let mut params = params.unwrap_or_default(); + + // Set the session in read params + if let Some(sess) = session { + params.session(sess); + } + + // patch the params if we have a write store wrapper + let params = match write_store_wrapper.clone() { + Some(wrapper) => params.patch_with_store_wrapper(wrapper)?, + None => params, + }; + + // Build table_id from namespace + name + let mut table_id = namespace.clone(); + table_id.push(name.to_string()); + + // Use DatasetBuilder::from_namespace which automatically fetches location + // and storage options from the namespace + let builder = DatasetBuilder::from_namespace( + namespace_client.clone(), + table_id, + false, // Don't ignore namespace storage options + ) + .await + .map_err(|e| match e { + lance::Error::Namespace { source, .. } => Error::Runtime { + message: format!("Failed to get table info from namespace: {:?}", source), + }, + source => Error::Lance { source }, + })?; + + let dataset = builder + .with_read_params(params) + .load() + .await + .map_err(|e| match e { + lance::Error::DatasetNotFound { ..
} => Error::TableNotFound { + name: name.to_string(), + source: Box::new(e), + }, + source => Error::Lance { source }, + })?; + + let uri = dataset.uri().to_string(); + let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval); + let id = Self::build_id(&namespace, name); + + let stored_namespace_client = if server_side_query_enabled { + Some(namespace_client) + } else { + None + }; + + Ok(Self { + name: name.to_string(), + namespace, + id, + uri, + dataset, + read_consistency_interval, + namespace_client: stored_namespace_client, + }) + } + fn get_table_name(uri: &str) -> Result { let path = Path::new(uri); let name = path @@ -1722,6 +1822,102 @@ impl NativeTable { .await } + /// Creates a new Table using a namespace client for storage options. + /// + /// This method sets up a `StorageOptionsProvider` from the namespace client, + /// enabling automatic credential refresh for cloud storage. The namespace + /// is used for: + /// 1. Setting up storage options provider for credential vending + /// 2. Optionally enabling server-side query execution + /// + /// # Arguments + /// + /// * `namespace_client` - The namespace client to use for storage options + /// * `uri` - The URI to the table (obtained from create_empty_table response) + /// * `name` - The table name + /// * `namespace` - The namespace path (e.g., vec!["parent", "child"]) + /// * `batches` - RecordBatch to be saved in the database + /// * `write_store_wrapper` - Optional wrapper for the object store on write path + /// * `params` - Optional write parameters + /// * `read_consistency_interval` - Optional interval for read consistency + /// * `server_side_query_enabled` - Whether to enable server-side query execution + /// + /// # Returns + /// + /// * A [NativeTable] object. 
+ #[allow(clippy::too_many_arguments)] + pub async fn create_from_namespace( + namespace_client: Arc<dyn LanceNamespace>, + uri: &str, + name: &str, + namespace: Vec<String>, + batches: impl StreamingWriteSource, + write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>, + params: Option<WriteParams>, + read_consistency_interval: Option<std::time::Duration>, + server_side_query_enabled: bool, + session: Option<Arc<lance::session::Session>>, + ) -> Result<Self> { + // Build table_id from namespace + name for the storage options provider + let mut table_id = namespace.clone(); + table_id.push(name.to_string()); + + // Set up storage options provider from namespace + let storage_options_provider = Arc::new(LanceNamespaceStorageOptionsProvider::new( + namespace_client.clone(), + table_id, + )); + + // Start with provided params or defaults + let mut params = params.unwrap_or_default(); + + // Set the session in write params + if let Some(sess) = session { + params.session = Some(sess); + } + + // Ensure store_params exists and set the storage options provider + let store_params = params + .store_params + .get_or_insert_with(ObjectStoreParams::default); + store_params.storage_options_provider = Some(storage_options_provider); + + // Patch the params if we have a write store wrapper + let params = match write_store_wrapper.clone() { + Some(wrapper) => params.patch_with_store_wrapper(wrapper)?, + None => params, + }; + + let insert_builder = InsertBuilder::new(uri).with_params(&params); + let dataset = insert_builder + .execute_stream(batches) + .await + .map_err(|e| match e { + lance::Error::DatasetAlreadyExists { ..
} => Error::TableAlreadyExists { + name: name.to_string(), + }, + source => Error::Lance { source }, + })?; + + let id = Self::build_id(&namespace, name); + + let stored_namespace_client = if server_side_query_enabled { + Some(namespace_client) + } else { + None + }; + + Ok(Self { + name: name.to_string(), + namespace, + id, + uri: uri.to_string(), + dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval), + read_consistency_interval, + namespace_client: stored_namespace_client, + }) + } + async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> { info!("LanceDB: optimizing indices: {:?}", options); self.dataset