refactor: rework catalog/database options (#2213)

The `ConnectRequest` has a set of properties that only make sense for
listing databases / catalogs and a set of properties that only make
sense for remote databases.

This PR reduces all options to a single `HashMap<String, String>`. This
makes it easier to add new database / catalog implementations and makes
it clearer to users which options are applicable in which situations.

I don't believe there are any breaking changes here. The closest thing
is that I placed the `ConnectBuilder` methods `api_key`, `region`, and
`host_override` behind a `remote` feature gate. This is not strictly
needed, and I could remove the feature gate, but it seemed appropriate.
Since using these methods without the `remote` feature would have been
meaningless, I don't feel this counts as a breaking change.

We could look at removing these methods entirely from the
`ConnectBuilder` (and encouraging users to use `RemoteDatabaseOptions`
instead) but I'm not sure how I feel about that.

Another approach we could take is to move these methods into a
`RemoteConnectBuilderExt` trait (and there could be a similar
`ListingConnectBuilderExt` trait to add methods for the listing database
/ catalog).

For now, though, my main goal is to simplify `ConnectRequest` as much as
possible. I see it being part of the key public API for database /
catalog integrations, similar to the `BaseTable`, `Catalog`, and
`Database` traits, and I'd like it to be simple.
This commit is contained in:
Weston Pace
2025-03-18 10:13:59 -07:00
committed by GitHub
parent 46a6846d07
commit 4e03ee82bc
5 changed files with 355 additions and 75 deletions

View File

@@ -9,11 +9,12 @@ use std::path::Path;
use std::sync::Arc;
use super::{
Catalog, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest, OpenDatabaseRequest,
Catalog, CatalogOptions, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest,
OpenDatabaseRequest,
};
use crate::connection::ConnectRequest;
use crate::database::listing::ListingDatabase;
use crate::database::Database;
use crate::database::listing::{ListingDatabase, ListingDatabaseOptions};
use crate::database::{Database, DatabaseOptions};
use crate::error::{CreateDirSnafu, Error, Result};
use async_trait::async_trait;
use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
@@ -21,6 +22,58 @@ use lance_io::local::to_local_path;
use object_store::path::Path as ObjectStorePath;
use snafu::ResultExt;
/// Options for the listing catalog.
///
/// The catalog has no settings of its own; it wraps the options that are
/// applied to the databases it opens.
///
/// Note: the catalog will use the `storage_options` configured on
/// `db_options` to configure storage for listing / creating / deleting
/// databases.
#[derive(Clone, Debug, Default)]
pub struct ListingCatalogOptions {
/// The options to use for databases opened by this catalog.
///
/// This also contains the storage options used by the catalog itself.
pub db_options: ListingDatabaseOptions,
}
impl CatalogOptions for ListingCatalogOptions {
    /// Writes these catalog options into `map` as key/value pairs.
    ///
    /// The only state held by the catalog options is the wrapped database
    /// options, so serialization is forwarded to them.
    fn serialize_into_map(&self, map: &mut HashMap<String, String>) {
        let Self { db_options } = self;
        db_options.serialize_into_map(map);
    }
}
impl ListingCatalogOptions {
    /// Returns a builder for assembling a `ListingCatalogOptions`.
    pub fn builder() -> ListingCatalogOptionsBuilder {
        ListingCatalogOptionsBuilder::new()
    }

    /// Rebuilds the catalog options from a key/value option map.
    ///
    /// The map is parsed as `ListingDatabaseOptions`; any error reported by
    /// that parse is returned unchanged.
    pub(crate) fn parse_from_map(map: &HashMap<String, String>) -> Result<Self> {
        ListingDatabaseOptions::parse_from_map(map).map(|db_options| Self { db_options })
    }
}
/// Builder for [`ListingCatalogOptions`].
///
/// Holds the options being assembled; they start out as
/// `ListingCatalogOptions::default()`.
#[derive(Clone, Debug, Default)]
pub struct ListingCatalogOptionsBuilder {
// The options accumulated so far.
options: ListingCatalogOptions,
}
impl ListingCatalogOptionsBuilder {
    /// Creates a builder whose options all start at their defaults.
    pub fn new() -> Self {
        let options = ListingCatalogOptions::default();
        Self { options }
    }

    /// Sets the options applied to databases opened by the catalog.
    ///
    /// # Arguments
    ///
    /// * `db_options` - The database options; any previously set value is replaced
    pub fn db_options(self, db_options: ListingDatabaseOptions) -> Self {
        let mut builder = self;
        builder.options.db_options = db_options;
        builder
    }

    /// Consumes the builder and returns the assembled options.
    pub fn build(self) -> ListingCatalogOptions {
        let Self { options } = self;
        options
    }
}
/// A catalog implementation that works by listing subfolders in a directory
///
/// The listing catalog will be created with a base folder specified by the URI. Every subfolder
@@ -34,7 +87,7 @@ pub struct ListingCatalog {
base_path: ObjectStorePath,
storage_options: HashMap<String, String>,
options: ListingCatalogOptions,
}
impl ListingCatalog {
@@ -61,7 +114,7 @@ impl ListingCatalog {
uri: path.to_string(),
base_path,
object_store,
storage_options: HashMap::new(),
options: ListingCatalogOptions::default(),
})
}
@@ -69,13 +122,15 @@ impl ListingCatalog {
let uri = &request.uri;
let parse_res = url::Url::parse(uri);
let options = ListingCatalogOptions::parse_from_map(&request.options)?;
match parse_res {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await,
Ok(url) => {
let plain_uri = url.to_string();
let registry = Arc::new(ObjectStoreRegistry::default());
let storage_options = request.storage_options.clone();
let storage_options = options.db_options.storage_options.clone();
let os_params = ObjectStoreParams {
storage_options: Some(storage_options.clone()),
..Default::default()
@@ -90,7 +145,7 @@ impl ListingCatalog {
uri: String::from(url.clone()),
base_path,
object_store,
storage_options,
options,
})
}
Err(_) => Self::open_path(uri).await,
@@ -155,17 +210,19 @@ impl Catalog for ListingCatalog {
let db_uri = format!("/{}/{}", self.base_path, request.name);
let connect_request = ConnectRequest {
let mut connect_request = ConnectRequest {
uri: db_uri,
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
storage_options: self.storage_options.clone(),
options: Default::default(),
};
// Add the db options to the connect request
self.options
.db_options
.serialize_into_map(&mut connect_request.options);
Ok(Arc::new(
ListingDatabase::connect_with_options(&connect_request).await?,
))
@@ -180,17 +237,19 @@ impl Catalog for ListingCatalog {
return Err(Error::DatabaseNotFound { name: request.name });
}
let connect_request = ConnectRequest {
let mut connect_request = ConnectRequest {
uri: db_path.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
storage_options: self.storage_options.clone(),
options: Default::default(),
};
// Add the db options to the connect request
self.options
.db_options
.serialize_into_map(&mut connect_request.options);
Ok(Arc::new(
ListingDatabase::connect_with_options(&connect_request).await?,
))
@@ -249,12 +308,9 @@ mod tests {
let request = ConnectRequest {
uri: uri.clone(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
options: Default::default(),
read_consistency_interval: None,
};
@@ -513,12 +569,9 @@ mod tests {
let request = ConnectRequest {
uri: path.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
options: Default::default(),
read_consistency_interval: None,
};
@@ -535,12 +588,9 @@ mod tests {
let request = ConnectRequest {
uri: uri.clone(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
options: Default::default(),
read_consistency_interval: None,
};
@@ -554,12 +604,9 @@ mod tests {
let invalid_uri = "invalid:///path";
let request = ConnectRequest {
uri: invalid_uri.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
options: Default::default(),
read_consistency_interval: None,
};

View File

@@ -26,7 +26,10 @@ use crate::embeddings::{
};
use crate::error::{Error, Result};
#[cfg(feature = "remote")]
use crate::remote::client::ClientConfig;
use crate::remote::{
client::ClientConfig,
db::{OPT_REMOTE_API_KEY, OPT_REMOTE_HOST_OVERRIDE, OPT_REMOTE_REGION},
};
use crate::table::{TableDefinition, WriteOptions};
use crate::Table;
pub use lance_encoding::version::LanceFileVersion;
@@ -607,16 +610,11 @@ pub struct ConnectRequest {
/// - `db://dbname` - LanceDB Cloud
pub uri: String,
/// LanceDB Cloud API key, required if using Lance Cloud
pub api_key: Option<String>,
/// LanceDB Cloud region, required if using Lance Cloud
pub region: Option<String>,
/// LanceDB Cloud host override, only required if using an on-premises Lance Cloud instance
pub host_override: Option<String>,
#[cfg(feature = "remote")]
pub client_config: ClientConfig,
pub storage_options: HashMap<String, String>,
/// Database/Catalog specific options
pub options: HashMap<String, String>,
/// The interval at which to check for updates from other processes.
///
@@ -643,35 +641,73 @@ impl ConnectBuilder {
Self {
request: ConnectRequest {
uri: uri.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
storage_options: HashMap::new(),
options: HashMap::new(),
},
embedding_registry: None,
}
}
/// Set the LanceDB Cloud API key.
///
/// This option is only used when connecting to LanceDB Cloud (db:// URIs)
/// and will be ignored for other URIs.
///
/// # Arguments
///
/// * `api_key` - The API key to use for the connection
#[cfg(feature = "remote")]
pub fn api_key(mut self, api_key: &str) -> Self {
self.request.api_key = Some(api_key.to_string());
self.request
.options
.insert(OPT_REMOTE_API_KEY.to_string(), api_key.to_string());
self
}
/// Set the LanceDB Cloud region.
///
/// This option is only used when connecting to LanceDB Cloud (db:// URIs)
/// and will be ignored for other URIs.
///
/// # Arguments
///
/// * `region` - The region to use for the connection
#[cfg(feature = "remote")]
pub fn region(mut self, region: &str) -> Self {
self.request.region = Some(region.to_string());
self.request
.options
.insert(OPT_REMOTE_REGION.to_string(), region.to_string());
self
}
/// Set the LanceDB Cloud host override.
///
/// This option is only used when connecting to LanceDB Cloud (db:// URIs)
/// and will be ignored for other URIs.
///
/// # Arguments
///
/// * `host_override` - The host override to use for the connection
#[cfg(feature = "remote")]
pub fn host_override(mut self, host_override: &str) -> Self {
self.request.host_override = Some(host_override.to_string());
self.request.options.insert(
OPT_REMOTE_HOST_OVERRIDE.to_string(),
host_override.to_string(),
);
self
}
/// Set the database specific options
///
/// See [crate::database::listing::ListingDatabaseOptions] for the options available for
/// native LanceDB databases.
///
/// See [crate::remote::db::RemoteDatabaseOptions] for the options available for
/// LanceDB Cloud and LanceDB Enterprise.
pub fn database_options(mut self, database_options: &dyn DatabaseOptions) -> Self {
database_options.serialize_into_map(&mut self.request.storage_options);
database_options.serialize_into_map(&mut self.request.options);
self
}
@@ -709,14 +745,14 @@ impl ConnectBuilder {
#[deprecated(note = "Pass through storage_options instead")]
pub fn aws_creds(mut self, aws_creds: AwsCredential) -> Self {
self.request
.storage_options
.options
.insert("aws_access_key_id".into(), aws_creds.key_id.clone());
self.request
.storage_options
.options
.insert("aws_secret_access_key".into(), aws_creds.secret_key.clone());
if let Some(token) = &aws_creds.token {
self.request
.storage_options
.options
.insert("aws_session_token".into(), token.clone());
}
self
@@ -726,9 +762,7 @@ impl ConnectBuilder {
///
/// See available options at <https://lancedb.github.io/lancedb/guides/storage/>
pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.request
.storage_options
.insert(key.into(), value.into());
self.request.options.insert(key.into(), value.into());
self
}
@@ -740,9 +774,7 @@ impl ConnectBuilder {
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
for (key, value) in pairs {
self.request
.storage_options
.insert(key.into(), value.into());
self.request.options.insert(key.into(), value.into());
}
self
}
@@ -772,19 +804,23 @@ impl ConnectBuilder {
#[cfg(feature = "remote")]
fn execute_remote(self) -> Result<Connection> {
let region = self.request.region.ok_or_else(|| Error::InvalidInput {
use crate::remote::db::RemoteDatabaseOptions;
let options = RemoteDatabaseOptions::parse_from_map(&self.request.options)?;
let region = options.region.ok_or_else(|| Error::InvalidInput {
message: "A region is required when connecting to LanceDb Cloud".to_string(),
})?;
let api_key = self.request.api_key.ok_or_else(|| Error::InvalidInput {
let api_key = options.api_key.ok_or_else(|| Error::InvalidInput {
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
})?;
let storage_options = StorageOptions(self.request.storage_options.clone());
let storage_options = StorageOptions(options.storage_options.clone());
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
&self.request.uri,
&api_key,
&region,
self.request.host_override,
options.host_override,
self.request.client_config,
storage_options.into(),
)?);
@@ -844,19 +880,16 @@ impl CatalogConnectBuilder {
Self {
request: ConnectRequest {
uri: uri.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
storage_options: HashMap::new(),
options: HashMap::new(),
},
}
}
pub fn catalog_options(mut self, catalog_options: &dyn CatalogOptions) -> Self {
catalog_options.serialize_into_map(&mut self.request.storage_options);
catalog_options.serialize_into_map(&mut self.request.options);
self
}
@@ -1036,6 +1069,7 @@ mod tests {
data_storage_version: Some(LanceFileVersion::Legacy),
..Default::default()
},
..Default::default()
})
.execute()
.await
@@ -1068,6 +1102,7 @@ mod tests {
data_storage_version: Some(LanceFileVersion::Stable),
..Default::default()
},
..Default::default()
})
.execute()
.await

View File

@@ -51,10 +51,22 @@ pub struct NewTableConfig {
pub struct ListingDatabaseOptions {
/// Controls what kind of Lance tables will be created by this database
///
/// When parsed from an option map, these settings are read from the
/// `OPT_NEW_TABLE_STORAGE_VERSION` and `OPT_NEW_TABLE_V2_MANIFEST_PATHS` keys.
pub new_table_config: NewTableConfig,
/// Storage options configure the storage layer (e.g. S3, GCS, Azure, etc.)
///
/// These are used to create/list tables and they are inherited by all tables
/// opened by this database.
///
/// When parsed from an option map, every key that is not one of the new
/// table config keys is treated as a storage option.
///
/// See available options at <https://lancedb.github.io/lancedb/guides/storage/>
pub storage_options: HashMap<String, String>,
}
impl ListingDatabaseOptions {
fn parse_from_map(map: &HashMap<String, String>) -> Result<Self> {
/// Create a new builder for the listing database options
pub fn builder() -> ListingDatabaseOptionsBuilder {
ListingDatabaseOptionsBuilder::new()
}
pub(crate) fn parse_from_map(map: &HashMap<String, String>) -> Result<Self> {
let new_table_config = NewTableConfig {
data_storage_version: map
.get(OPT_NEW_TABLE_STORAGE_VERSION)
@@ -72,7 +84,19 @@ impl ListingDatabaseOptions {
})
.transpose()?,
};
Ok(Self { new_table_config })
// We just assume that any options that are not new table config options are storage options
let storage_options = map
.iter()
.filter(|(key, _)| {
key.as_str() != OPT_NEW_TABLE_STORAGE_VERSION
&& key.as_str() != OPT_NEW_TABLE_V2_MANIFEST_PATHS
})
.map(|(key, value)| (key.clone(), value.clone()))
.collect();
Ok(Self {
new_table_config,
storage_options,
})
}
}
@@ -93,6 +117,71 @@ impl DatabaseOptions for ListingDatabaseOptions {
}
}
/// Builder for [`ListingDatabaseOptions`].
///
/// Holds the options being assembled; they start out as
/// `ListingDatabaseOptions::default()`.
#[derive(Clone, Debug, Default)]
pub struct ListingDatabaseOptionsBuilder {
// The options accumulated so far.
options: ListingDatabaseOptions,
}
impl ListingDatabaseOptionsBuilder {
    /// Creates a builder whose options all start at their defaults.
    pub fn new() -> Self {
        let options = ListingDatabaseOptions::default();
        Self { options }
    }
}
impl ListingDatabaseOptionsBuilder {
    /// Chooses the storage version that newly created tables will use.
    ///
    /// # Arguments
    ///
    /// * `data_storage_version` - The storage version to use for new tables
    pub fn data_storage_version(mut self, data_storage_version: LanceFileVersion) -> Self {
        let table_config = &mut self.options.new_table_config;
        table_config.data_storage_version = Some(data_storage_version);
        self
    }

    /// Chooses whether newly created tables use V2 manifest paths.
    ///
    /// # Arguments
    ///
    /// * `enable_v2_manifest_paths` - Whether to enable V2 manifest paths for new tables
    pub fn enable_v2_manifest_paths(mut self, enable_v2_manifest_paths: bool) -> Self {
        let table_config = &mut self.options.new_table_config;
        table_config.enable_v2_manifest_paths = Some(enable_v2_manifest_paths);
        self
    }

    /// Adds a single storage-layer option, replacing any existing value for `key`.
    ///
    /// See available options at <https://lancedb.github.io/lancedb/guides/storage/>
    pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, setting) = (key.into(), value.into());
        self.options.storage_options.insert(name, setting);
        self
    }

    /// Adds several storage-layer options at once.
    ///
    /// Pairs are applied in iteration order, so a later pair overwrites an
    /// earlier one with the same key.
    ///
    /// See available options at <https://lancedb.github.io/lancedb/guides/storage/>
    pub fn storage_options(
        mut self,
        pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        let converted = pairs
            .into_iter()
            .map(|(name, setting)| (name.into(), setting.into()));
        self.options.storage_options.extend(converted);
        self
    }

    /// Consumes the builder and returns the assembled options.
    pub fn build(self) -> ListingDatabaseOptions {
        let Self { options } = self;
        options
    }
}
/// A database that stores tables in a flat directory structure
///
/// Tables are stored as directories in the base path of the object store.
@@ -164,7 +253,7 @@ impl ListingDatabase {
let uri = &request.uri;
let parse_res = url::Url::parse(uri);
let options = ListingDatabaseOptions::parse_from_map(&request.storage_options)?;
let options = ListingDatabaseOptions::parse_from_map(&request.options)?;
// TODO: pass params regardless of OS
match parse_res {
@@ -225,9 +314,8 @@ impl ListingDatabase {
let plain_uri = url.to_string();
let registry = Arc::new(ObjectStoreRegistry::default());
let storage_options = request.storage_options.clone();
let os_params = ObjectStoreParams {
storage_options: Some(storage_options.clone()),
storage_options: Some(options.storage_options.clone()),
..Default::default()
};
let (object_store, base_path) =
@@ -252,7 +340,7 @@ impl ListingDatabase {
object_store,
store_wrapper: write_store_wrapper,
read_consistency_interval: request.read_consistency_interval,
storage_options,
storage_options: options.storage_options,
new_table_config: options.new_table_config,
})
}

View File

@@ -18,3 +18,4 @@ const ARROW_FILE_CONTENT_TYPE: &str = "application/vnd.apache.arrow.file";
const JSON_CONTENT_TYPE: &str = "application/json";
pub use client::{ClientConfig, RetryConfig, TimeoutConfig};
pub use db::{RemoteDatabaseOptions, RemoteDatabaseOptionsBuilder};

View File

@@ -14,8 +14,8 @@ use serde::Deserialize;
use tokio::task::spawn_blocking;
use crate::database::{
CreateTableData, CreateTableMode, CreateTableRequest, Database, OpenTableRequest,
TableNamesRequest,
CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
OpenTableRequest, TableNamesRequest,
};
use crate::error::Result;
use crate::table::BaseTable;
@@ -54,6 +54,115 @@ impl ServerVersion {
}
}
/// Prefix shared by the remote database option keys.
///
/// `RemoteDatabaseOptions::parse_from_map` treats every key that does not
/// start with this prefix as a storage option.
pub const OPT_REMOTE_PREFIX: &str = "remote_database_";
/// Option key under which the LanceDB Cloud API key is stored.
pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key";
/// Option key under which the LanceDB Cloud region is stored.
pub const OPT_REMOTE_REGION: &str = "remote_database_region";
/// Option key under which the host override is stored.
pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override";
// TODO: add support for configuring client config via key/value options
/// Options for connecting to a remote database (LanceDB Cloud or LanceDB Enterprise).
///
/// These options round-trip through a key/value map: the dedicated fields are
/// stored under the `OPT_REMOTE_*` keys and the storage options are stored
/// under their own keys.
#[derive(Clone, Debug, Default)]
pub struct RemoteDatabaseOptions {
/// The LanceDB Cloud API key
pub api_key: Option<String>,
/// The LanceDB Cloud region
pub region: Option<String>,
/// The LanceDB Enterprise host override
///
/// This is required when connecting to LanceDB Enterprise and should be
/// provided if using an on-premises LanceDB Enterprise instance.
pub host_override: Option<String>,
/// Storage options configure the storage layer (e.g. S3, GCS, Azure, etc.)
///
/// See available options at <https://lancedb.github.io/lancedb/guides/storage/>
///
/// These options are only used for LanceDB Enterprise and only a subset of options
/// are supported.
pub storage_options: HashMap<String, String>,
}
impl RemoteDatabaseOptions {
    /// Returns a builder for assembling a `RemoteDatabaseOptions`.
    pub fn builder() -> RemoteDatabaseOptionsBuilder {
        RemoteDatabaseOptionsBuilder::new()
    }

    /// Rebuilds the remote options from a key/value option map.
    ///
    /// The API key, region and host override are read from their dedicated
    /// `OPT_REMOTE_*` keys and are `None` when the key is absent. Every entry
    /// whose key does not start with `OPT_REMOTE_PREFIX` is copied into
    /// `storage_options`.
    pub(crate) fn parse_from_map(map: &HashMap<String, String>) -> Result<Self> {
        let lookup = |key: &str| map.get(key).cloned();

        let storage_options = map
            .iter()
            .filter_map(|(name, setting)| {
                let is_remote_key = name.starts_with(OPT_REMOTE_PREFIX);
                (!is_remote_key).then(|| (name.clone(), setting.clone()))
            })
            .collect();

        Ok(Self {
            api_key: lookup(OPT_REMOTE_API_KEY),
            region: lookup(OPT_REMOTE_REGION),
            host_override: lookup(OPT_REMOTE_HOST_OVERRIDE),
            storage_options,
        })
    }
}
impl DatabaseOptions for RemoteDatabaseOptions {
    /// Writes these options into `map` as key/value pairs.
    ///
    /// Storage options are written first under their own keys. The API key,
    /// region and host override are written afterwards under their dedicated
    /// `OPT_REMOTE_*` keys, and only when they are set.
    fn serialize_into_map(&self, map: &mut HashMap<String, String>) {
        map.extend(
            self.storage_options
                .iter()
                .map(|(name, setting)| (name.clone(), setting.clone())),
        );

        let dedicated = [
            (OPT_REMOTE_API_KEY, &self.api_key),
            (OPT_REMOTE_REGION, &self.region),
            (OPT_REMOTE_HOST_OVERRIDE, &self.host_override),
        ];
        for (name, setting) in dedicated {
            if let Some(setting) = setting {
                map.insert(name.to_string(), setting.clone());
            }
        }
    }
}
/// Builder for [`RemoteDatabaseOptions`].
///
/// Holds the options being assembled; they start out as
/// `RemoteDatabaseOptions::default()`.
#[derive(Clone, Debug, Default)]
pub struct RemoteDatabaseOptionsBuilder {
// The options accumulated so far.
options: RemoteDatabaseOptions,
}
impl RemoteDatabaseOptionsBuilder {
pub fn new() -> Self {
Self {
options: RemoteDatabaseOptions::default(),
}
}
/// Set the LanceDB Cloud API key
///
/// # Arguments
///
/// * `api_key` - The LanceDB Cloud API key
pub fn api_key(mut self, api_key: String) -> Self {
self.options.api_key = Some(api_key);
self
}
/// Set the LanceDB Cloud region
///
/// # Arguments
///
/// * `region` - The LanceDB Cloud region
pub fn region(mut self, region: String) -> Self {
self.options.region = Some(region);
self
}
/// Set the LanceDB Enterprise host override
///
/// # Arguments
///
/// * `host_override` - The LanceDB Enterprise host override
pub fn host_override(mut self, host_override: String) -> Self {
self.options.host_override = Some(host_override);
self
}
}
#[derive(Deserialize)]
struct ListTablesResponse {
tables: Vec<String>,