From 4e03ee82bc7e0df0b902a3ee3fca47d91441e765 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 18 Mar 2025 10:13:59 -0700 Subject: [PATCH] refactor: rework catalog/database options (#2213) The `ConnectRequest` has a set of properties that only make sense for listing databases / catalogs and a set of properties that only make sense for remote databases. This PR reduces all options to a single `HashMap`. This makes it easier to add new database / catalog implementations and makes it clearer to users which options are applicable in which situations. I don't believe there are any breaking changes here. The closest thing is that I placed the `ConnectBuilder` methods `api_key`, `region`, and `host_override` behind a `remote` feature gate. This is not strictly needed and I could remove the feature gate but it seemed appropriate. Since using these methods without the remote feature would have been meaningless I don't feel this counts as a breaking change. We could look at removing these methods entirely from the `ConnectBuilder` (and encouraging users to use `RemoteDatabaseOptions` instead) but I'm not sure how I feel about that. Another approach we could take is to move these methods into a `RemoteConnectBuilderExt` trait (and there could be a similar `ListingConnectBuilderExt` trait to add methods for the listing database / catalog). For now though my main goal is to simplify `ConnectRequest` as much as possible (I see this being part of the key public API for database / catalog integrations, similar to the `BaseTable`, `Catalog`, and `Database` traits and I'd like it to be simple). 
--- rust/lancedb/src/catalog/listing.rs | 113 +++++++++++++++++++-------- rust/lancedb/src/connection.rs | 103 ++++++++++++++++-------- rust/lancedb/src/database/listing.rs | 100 ++++++++++++++++++++++-- rust/lancedb/src/remote.rs | 1 + rust/lancedb/src/remote/db.rs | 113 ++++++++++++++++++++++++++- 5 files changed, 355 insertions(+), 75 deletions(-) diff --git a/rust/lancedb/src/catalog/listing.rs b/rust/lancedb/src/catalog/listing.rs index 38e0aa07..06cf63d1 100644 --- a/rust/lancedb/src/catalog/listing.rs +++ b/rust/lancedb/src/catalog/listing.rs @@ -9,11 +9,12 @@ use std::path::Path; use std::sync::Arc; use super::{ - Catalog, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest, OpenDatabaseRequest, + Catalog, CatalogOptions, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest, + OpenDatabaseRequest, }; use crate::connection::ConnectRequest; -use crate::database::listing::ListingDatabase; -use crate::database::Database; +use crate::database::listing::{ListingDatabase, ListingDatabaseOptions}; +use crate::database::{Database, DatabaseOptions}; use crate::error::{CreateDirSnafu, Error, Result}; use async_trait::async_trait; use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; @@ -21,6 +22,58 @@ use lance_io::local::to_local_path; use object_store::path::Path as ObjectStorePath; use snafu::ResultExt; +/// Options for the listing catalog +/// +/// Note: the catalog will use the `storage_options` configured on +/// db_options to configure storage for listing / creating / deleting +/// databases. 
+#[derive(Clone, Debug, Default)] +pub struct ListingCatalogOptions { + /// The options to use for databases opened by this catalog + /// + /// This also contains the storage options used by the catalog + pub db_options: ListingDatabaseOptions, +} + +impl CatalogOptions for ListingCatalogOptions { + fn serialize_into_map(&self, map: &mut HashMap) { + self.db_options.serialize_into_map(map); + } +} + +impl ListingCatalogOptions { + pub fn builder() -> ListingCatalogOptionsBuilder { + ListingCatalogOptionsBuilder::new() + } + + pub(crate) fn parse_from_map(map: &HashMap) -> Result { + let db_options = ListingDatabaseOptions::parse_from_map(map)?; + Ok(Self { db_options }) + } +} + +#[derive(Clone, Debug, Default)] +pub struct ListingCatalogOptionsBuilder { + options: ListingCatalogOptions, +} + +impl ListingCatalogOptionsBuilder { + pub fn new() -> Self { + Self { + options: ListingCatalogOptions::default(), + } + } + + pub fn db_options(mut self, db_options: ListingDatabaseOptions) -> Self { + self.options.db_options = db_options; + self + } + + pub fn build(self) -> ListingCatalogOptions { + self.options + } +} + /// A catalog implementation that works by listing subfolders in a directory /// /// The listing catalog will be created with a base folder specified by the URI. 
Every subfolder @@ -34,7 +87,7 @@ pub struct ListingCatalog { base_path: ObjectStorePath, - storage_options: HashMap, + options: ListingCatalogOptions, } impl ListingCatalog { @@ -61,7 +114,7 @@ impl ListingCatalog { uri: path.to_string(), base_path, object_store, - storage_options: HashMap::new(), + options: ListingCatalogOptions::default(), }) } @@ -69,13 +122,15 @@ impl ListingCatalog { let uri = &request.uri; let parse_res = url::Url::parse(uri); + let options = ListingCatalogOptions::parse_from_map(&request.options)?; + match parse_res { Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await, Ok(url) => { let plain_uri = url.to_string(); let registry = Arc::new(ObjectStoreRegistry::default()); - let storage_options = request.storage_options.clone(); + let storage_options = options.db_options.storage_options.clone(); let os_params = ObjectStoreParams { storage_options: Some(storage_options.clone()), ..Default::default() @@ -90,7 +145,7 @@ impl ListingCatalog { uri: String::from(url.clone()), base_path, object_store, - storage_options, + options, }) } Err(_) => Self::open_path(uri).await, @@ -155,17 +210,19 @@ impl Catalog for ListingCatalog { let db_uri = format!("/{}/{}", self.base_path, request.name); - let connect_request = ConnectRequest { + let mut connect_request = ConnectRequest { uri: db_uri, - api_key: None, - region: None, - host_override: None, #[cfg(feature = "remote")] client_config: Default::default(), read_consistency_interval: None, - storage_options: self.storage_options.clone(), + options: Default::default(), }; + // Add the db options to the connect request + self.options + .db_options + .serialize_into_map(&mut connect_request.options); + Ok(Arc::new( ListingDatabase::connect_with_options(&connect_request).await?, )) @@ -180,17 +237,19 @@ impl Catalog for ListingCatalog { return Err(Error::DatabaseNotFound { name: request.name }); } - let connect_request = ConnectRequest { + let mut connect_request = ConnectRequest 
{ uri: db_path.to_string(), - api_key: None, - region: None, - host_override: None, #[cfg(feature = "remote")] client_config: Default::default(), read_consistency_interval: None, - storage_options: self.storage_options.clone(), + options: Default::default(), }; + // Add the db options to the connect request + self.options + .db_options + .serialize_into_map(&mut connect_request.options); + Ok(Arc::new( ListingDatabase::connect_with_options(&connect_request).await?, )) @@ -249,12 +308,9 @@ mod tests { let request = ConnectRequest { uri: uri.clone(), - api_key: None, - region: None, - host_override: None, #[cfg(feature = "remote")] client_config: Default::default(), - storage_options: HashMap::new(), + options: Default::default(), read_consistency_interval: None, }; @@ -513,12 +569,9 @@ mod tests { let request = ConnectRequest { uri: path.to_string(), - api_key: None, - region: None, - host_override: None, #[cfg(feature = "remote")] client_config: Default::default(), - storage_options: HashMap::new(), + options: Default::default(), read_consistency_interval: None, }; @@ -535,12 +588,9 @@ mod tests { let request = ConnectRequest { uri: uri.clone(), - api_key: None, - region: None, - host_override: None, #[cfg(feature = "remote")] client_config: Default::default(), - storage_options: HashMap::new(), + options: Default::default(), read_consistency_interval: None, }; @@ -554,12 +604,9 @@ mod tests { let invalid_uri = "invalid:///path"; let request = ConnectRequest { uri: invalid_uri.to_string(), - api_key: None, - region: None, - host_override: None, #[cfg(feature = "remote")] client_config: Default::default(), - storage_options: HashMap::new(), + options: Default::default(), read_consistency_interval: None, }; diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index a413ce0d..4668ae9f 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -26,7 +26,10 @@ use crate::embeddings::{ }; use crate::error::{Error, Result}; 
#[cfg(feature = "remote")] -use crate::remote::client::ClientConfig; +use crate::remote::{ + client::ClientConfig, + db::{OPT_REMOTE_API_KEY, OPT_REMOTE_HOST_OVERRIDE, OPT_REMOTE_REGION}, +}; use crate::table::{TableDefinition, WriteOptions}; use crate::Table; pub use lance_encoding::version::LanceFileVersion; @@ -607,16 +610,11 @@ pub struct ConnectRequest { /// - `db://dbname` - LanceDB Cloud pub uri: String, - /// LanceDB Cloud API key, required if using Lance Cloud - pub api_key: Option, - /// LanceDB Cloud region, required if using Lance Cloud - pub region: Option, - /// LanceDB Cloud host override, only required if using an on-premises Lance Cloud instance - pub host_override: Option, #[cfg(feature = "remote")] pub client_config: ClientConfig, - pub storage_options: HashMap, + /// Database/Catalog specific options + pub options: HashMap, /// The interval at which to check for updates from other processes. /// @@ -643,35 +641,73 @@ impl ConnectBuilder { Self { request: ConnectRequest { uri: uri.to_string(), - api_key: None, - region: None, - host_override: None, #[cfg(feature = "remote")] client_config: Default::default(), read_consistency_interval: None, - storage_options: HashMap::new(), + options: HashMap::new(), }, embedding_registry: None, } } + /// Set the LanceDB Cloud API key. + /// + /// This option is only used when connecting to LanceDB Cloud (db:// URIs) + /// and will be ignored for other URIs. + /// + /// # Arguments + /// + /// * `api_key` - The API key to use for the connection + #[cfg(feature = "remote")] pub fn api_key(mut self, api_key: &str) -> Self { - self.request.api_key = Some(api_key.to_string()); + self.request + .options + .insert(OPT_REMOTE_API_KEY.to_string(), api_key.to_string()); self } + /// Set the LanceDB Cloud region. + /// + /// This option is only used when connecting to LanceDB Cloud (db:// URIs) + /// and will be ignored for other URIs. 
+ /// + /// # Arguments + /// + /// * `region` - The region to use for the connection + #[cfg(feature = "remote")] pub fn region(mut self, region: &str) -> Self { - self.request.region = Some(region.to_string()); + self.request + .options + .insert(OPT_REMOTE_REGION.to_string(), region.to_string()); self } + /// Set the LanceDB Cloud host override. + /// + /// This option is only used when connecting to LanceDB Cloud (db:// URIs) + /// and will be ignored for other URIs. + /// + /// # Arguments + /// + /// * `host_override` - The host override to use for the connection + #[cfg(feature = "remote")] pub fn host_override(mut self, host_override: &str) -> Self { - self.request.host_override = Some(host_override.to_string()); + self.request.options.insert( + OPT_REMOTE_HOST_OVERRIDE.to_string(), + host_override.to_string(), + ); self } + /// Set the database specific options + /// + /// See [crate::database::listing::ListingDatabaseOptions] for the options available for + /// native LanceDB databases. + /// + /// See [crate::remote::db::RemoteDatabaseOptions] for the options available for + /// LanceDB Cloud and LanceDB Enterprise. 
pub fn database_options(mut self, database_options: &dyn DatabaseOptions) -> Self { - database_options.serialize_into_map(&mut self.request.storage_options); + database_options.serialize_into_map(&mut self.request.options); self } @@ -709,14 +745,14 @@ impl ConnectBuilder { #[deprecated(note = "Pass through storage_options instead")] pub fn aws_creds(mut self, aws_creds: AwsCredential) -> Self { self.request - .storage_options + .options .insert("aws_access_key_id".into(), aws_creds.key_id.clone()); self.request - .storage_options + .options .insert("aws_secret_access_key".into(), aws_creds.secret_key.clone()); if let Some(token) = &aws_creds.token { self.request - .storage_options + .options .insert("aws_session_token".into(), token.clone()); } self @@ -726,9 +762,7 @@ impl ConnectBuilder { /// /// See available options at pub fn storage_option(mut self, key: impl Into, value: impl Into) -> Self { - self.request - .storage_options - .insert(key.into(), value.into()); + self.request.options.insert(key.into(), value.into()); self } @@ -740,9 +774,7 @@ impl ConnectBuilder { pairs: impl IntoIterator, impl Into)>, ) -> Self { for (key, value) in pairs { - self.request - .storage_options - .insert(key.into(), value.into()); + self.request.options.insert(key.into(), value.into()); } self } @@ -772,19 +804,23 @@ impl ConnectBuilder { #[cfg(feature = "remote")] fn execute_remote(self) -> Result { - let region = self.request.region.ok_or_else(|| Error::InvalidInput { + use crate::remote::db::RemoteDatabaseOptions; + + let options = RemoteDatabaseOptions::parse_from_map(&self.request.options)?; + + let region = options.region.ok_or_else(|| Error::InvalidInput { message: "A region is required when connecting to LanceDb Cloud".to_string(), })?; - let api_key = self.request.api_key.ok_or_else(|| Error::InvalidInput { + let api_key = options.api_key.ok_or_else(|| Error::InvalidInput { message: "An api_key is required when connecting to LanceDb Cloud".to_string(), })?; - let 
storage_options = StorageOptions(self.request.storage_options.clone()); + let storage_options = StorageOptions(options.storage_options.clone()); let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new( &self.request.uri, &api_key, &region, - self.request.host_override, + options.host_override, self.request.client_config, storage_options.into(), )?); @@ -844,19 +880,16 @@ impl CatalogConnectBuilder { Self { request: ConnectRequest { uri: uri.to_string(), - api_key: None, - region: None, - host_override: None, #[cfg(feature = "remote")] client_config: Default::default(), read_consistency_interval: None, - storage_options: HashMap::new(), + options: HashMap::new(), }, } } pub fn catalog_options(mut self, catalog_options: &dyn CatalogOptions) -> Self { - catalog_options.serialize_into_map(&mut self.request.storage_options); + catalog_options.serialize_into_map(&mut self.request.options); self } @@ -1036,6 +1069,7 @@ mod tests { data_storage_version: Some(LanceFileVersion::Legacy), ..Default::default() }, + ..Default::default() }) .execute() .await @@ -1068,6 +1102,7 @@ mod tests { data_storage_version: Some(LanceFileVersion::Stable), ..Default::default() }, + ..Default::default() }) .execute() .await diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index c47fd72a..10d8beab 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -51,10 +51,22 @@ pub struct NewTableConfig { pub struct ListingDatabaseOptions { /// Controls what kind of Lance tables will be created by this database pub new_table_config: NewTableConfig, + /// Storage options configure the storage layer (e.g. S3, GCS, Azure, etc.) + /// + /// These are used to create/list tables and they are inherited by all tables + /// opened by this database.
+ /// + /// See available options at + pub storage_options: HashMap, } impl ListingDatabaseOptions { - fn parse_from_map(map: &HashMap) -> Result { + /// Create a new builder for the listing database options + pub fn builder() -> ListingDatabaseOptionsBuilder { + ListingDatabaseOptionsBuilder::new() + } + + pub(crate) fn parse_from_map(map: &HashMap) -> Result { let new_table_config = NewTableConfig { data_storage_version: map .get(OPT_NEW_TABLE_STORAGE_VERSION) @@ -72,7 +84,19 @@ impl ListingDatabaseOptions { }) .transpose()?, }; - Ok(Self { new_table_config }) + // We just assume that any options that are not new table config options are storage options + let storage_options = map + .iter() + .filter(|(key, _)| { + key.as_str() != OPT_NEW_TABLE_STORAGE_VERSION + && key.as_str() != OPT_NEW_TABLE_V2_MANIFEST_PATHS + }) + .map(|(key, value)| (key.clone(), value.clone())) + .collect(); + Ok(Self { + new_table_config, + storage_options, + }) } } @@ -93,6 +117,71 @@ impl DatabaseOptions for ListingDatabaseOptions { } } +#[derive(Clone, Debug, Default)] +pub struct ListingDatabaseOptionsBuilder { + options: ListingDatabaseOptions, +} + +impl ListingDatabaseOptionsBuilder { + pub fn new() -> Self { + Self { + options: ListingDatabaseOptions::default(), + } + } +} + +impl ListingDatabaseOptionsBuilder { + /// Set the storage version to use for new tables + /// + /// # Arguments + /// + /// * `data_storage_version` - The storage version to use for new tables + pub fn data_storage_version(mut self, data_storage_version: LanceFileVersion) -> Self { + self.options.new_table_config.data_storage_version = Some(data_storage_version); + self + } + + /// Enable V2 manifest paths for new tables + /// + /// # Arguments + /// + /// * `enable_v2_manifest_paths` - Whether to enable V2 manifest paths for new tables + pub fn enable_v2_manifest_paths(mut self, enable_v2_manifest_paths: bool) -> Self { + self.options.new_table_config.enable_v2_manifest_paths = 
Some(enable_v2_manifest_paths); + self + } + + /// Set an option for the storage layer. + /// + /// See available options at + pub fn storage_option(mut self, key: impl Into, value: impl Into) -> Self { + self.options + .storage_options + .insert(key.into(), value.into()); + self + } + + /// Set multiple options for the storage layer. + /// + /// See available options at + pub fn storage_options( + mut self, + pairs: impl IntoIterator, impl Into)>, + ) -> Self { + for (key, value) in pairs { + self.options + .storage_options + .insert(key.into(), value.into()); + } + self + } + + /// Build the options + pub fn build(self) -> ListingDatabaseOptions { + self.options + } +} + /// A database that stores tables in a flat directory structure /// /// Tables are stored as directories in the base path of the object store. @@ -164,7 +253,7 @@ impl ListingDatabase { let uri = &request.uri; let parse_res = url::Url::parse(uri); - let options = ListingDatabaseOptions::parse_from_map(&request.storage_options)?; + let options = ListingDatabaseOptions::parse_from_map(&request.options)?; // TODO: pass params regardless of OS match parse_res { @@ -225,9 +314,8 @@ impl ListingDatabase { let plain_uri = url.to_string(); let registry = Arc::new(ObjectStoreRegistry::default()); - let storage_options = request.storage_options.clone(); let os_params = ObjectStoreParams { - storage_options: Some(storage_options.clone()), + storage_options: Some(options.storage_options.clone()), ..Default::default() }; let (object_store, base_path) = @@ -252,7 +340,7 @@ impl ListingDatabase { object_store, store_wrapper: write_store_wrapper, read_consistency_interval: request.read_consistency_interval, - storage_options, + storage_options: options.storage_options, new_table_config: options.new_table_config, }) } diff --git a/rust/lancedb/src/remote.rs b/rust/lancedb/src/remote.rs index 22da6de5..a154a662 100644 --- a/rust/lancedb/src/remote.rs +++ b/rust/lancedb/src/remote.rs @@ -18,3 +18,4 @@ const 
ARROW_FILE_CONTENT_TYPE: &str = "application/vnd.apache.arrow.file"; const JSON_CONTENT_TYPE: &str = "application/json"; pub use client::{ClientConfig, RetryConfig, TimeoutConfig}; +pub use db::{RemoteDatabaseOptions, RemoteDatabaseOptionsBuilder}; diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index 951c91be..2d1f4ced 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -14,8 +14,8 @@ use serde::Deserialize; use tokio::task::spawn_blocking; use crate::database::{ - CreateTableData, CreateTableMode, CreateTableRequest, Database, OpenTableRequest, - TableNamesRequest, + CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, + OpenTableRequest, TableNamesRequest, }; use crate::error::Result; use crate::table::BaseTable; @@ -54,6 +54,115 @@ impl ServerVersion { } } +pub const OPT_REMOTE_PREFIX: &str = "remote_database_"; +pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key"; +pub const OPT_REMOTE_REGION: &str = "remote_database_region"; +pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override"; +// TODO: add support for configuring client config via key/value options + +#[derive(Clone, Debug, Default)] +pub struct RemoteDatabaseOptions { + /// The LanceDB Cloud API key + pub api_key: Option, + /// The LanceDB Cloud region + pub region: Option, + /// The LanceDB Enterprise host override + /// + /// This is required when connecting to LanceDB Enterprise and should be + /// provided if using an on-premises LanceDB Enterprise instance. + pub host_override: Option, + /// Storage options configure the storage layer (e.g. S3, GCS, Azure, etc.) + /// + /// See available options at + /// + /// These options are only used for LanceDB Enterprise and only a subset of options + /// are supported. 
+ pub storage_options: HashMap, +} + +impl RemoteDatabaseOptions { + pub fn builder() -> RemoteDatabaseOptionsBuilder { + RemoteDatabaseOptionsBuilder::new() + } + + pub(crate) fn parse_from_map(map: &HashMap) -> Result { + let api_key = map.get(OPT_REMOTE_API_KEY).cloned(); + let region = map.get(OPT_REMOTE_REGION).cloned(); + let host_override = map.get(OPT_REMOTE_HOST_OVERRIDE).cloned(); + let storage_options = map + .iter() + .filter(|(key, _)| !key.starts_with(OPT_REMOTE_PREFIX)) + .map(|(key, value)| (key.clone(), value.clone())) + .collect(); + Ok(Self { + api_key, + region, + host_override, + storage_options, + }) + } +} + +impl DatabaseOptions for RemoteDatabaseOptions { + fn serialize_into_map(&self, map: &mut HashMap) { + for (key, value) in &self.storage_options { + map.insert(key.clone(), value.clone()); + } + if let Some(api_key) = &self.api_key { + map.insert(OPT_REMOTE_API_KEY.to_string(), api_key.clone()); + } + if let Some(region) = &self.region { + map.insert(OPT_REMOTE_REGION.to_string(), region.clone()); + } + if let Some(host_override) = &self.host_override { + map.insert(OPT_REMOTE_HOST_OVERRIDE.to_string(), host_override.clone()); + } + } +} + +#[derive(Clone, Debug, Default)] +pub struct RemoteDatabaseOptionsBuilder { + options: RemoteDatabaseOptions, +} + +impl RemoteDatabaseOptionsBuilder { + pub fn new() -> Self { + Self { + options: RemoteDatabaseOptions::default(), + } + } + + /// Set the LanceDB Cloud API key + /// + /// # Arguments + /// + /// * `api_key` - The LanceDB Cloud API key + pub fn api_key(mut self, api_key: String) -> Self { + self.options.api_key = Some(api_key); + self + } + + /// Set the LanceDB Cloud region + /// + /// # Arguments + /// + /// * `region` - The LanceDB Cloud region + pub fn region(mut self, region: String) -> Self { + self.options.region = Some(region); + self + } + + /// Set the LanceDB Enterprise host override + /// + /// # Arguments + /// + /// * `host_override` - The LanceDB Enterprise host override 
+ pub fn host_override(mut self, host_override: String) -> Self { + self.options.host_override = Some(host_override); + self + } +} + #[derive(Deserialize)] struct ListTablesResponse { tables: Vec,