diff --git a/rust/lancedb/src/catalog.rs b/rust/lancedb/src/catalog.rs deleted file mode 100644 index a42269cf..00000000 --- a/rust/lancedb/src/catalog.rs +++ /dev/null @@ -1,86 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The LanceDB Authors - -//! Catalog implementation for managing databases - -pub mod listing; - -use std::collections::HashMap; -use std::sync::Arc; - -use crate::database::Database; -use crate::error::Result; -use async_trait::async_trait; - -pub trait CatalogOptions { - fn serialize_into_map(&self, map: &mut HashMap); -} - -/// Request parameters for listing databases -#[derive(Clone, Debug, Default)] -pub struct DatabaseNamesRequest { - /// Start listing after this name (exclusive) - pub start_after: Option, - /// Maximum number of names to return - pub limit: Option, -} - -/// Request to open an existing database -#[derive(Clone, Debug)] -pub struct OpenDatabaseRequest { - /// The name of the database to open - pub name: String, - /// A map of database-specific options - /// - /// Consult the catalog / database implementation to determine which options are available - pub database_options: HashMap, -} - -/// Database creation mode -/// -/// The default behavior is Create -pub enum CreateDatabaseMode { - /// Create new database, error if exists - Create, - /// Open existing database if present - ExistOk, - /// Overwrite existing database - Overwrite, -} - -impl Default for CreateDatabaseMode { - fn default() -> Self { - Self::Create - } -} - -/// Request to create a new database -pub struct CreateDatabaseRequest { - /// The name of the database to create - pub name: String, - /// The creation mode - pub mode: CreateDatabaseMode, - /// A map of catalog-specific options, consult your catalog implementation to determine what's available - pub options: HashMap, -} - -#[async_trait] -pub trait Catalog: Send + Sync + std::fmt::Debug + 'static { - /// List database names with pagination - async fn database_names(&self, request: DatabaseNamesRequest) -> Result>; - - /// Create a new database - async fn create_database(&self, request: CreateDatabaseRequest) -> Result>; - - /// Open existing database - async fn open_database(&self, request: OpenDatabaseRequest) -> Result>; - - /// Rename database - async fn rename_database(&self, old_name: &str, new_name: &str) -> Result<()>; - - /// Delete database - async fn drop_database(&self, name: &str) -> Result<()>; - - /// Delete all databases - async fn drop_all_databases(&self) -> Result<()>; -} diff --git a/rust/lancedb/src/catalog/listing.rs b/rust/lancedb/src/catalog/listing.rs deleted file mode 100644 index b9604d62..00000000 --- a/rust/lancedb/src/catalog/listing.rs +++ /dev/null @@ -1,624 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The LanceDB Authors - -//! Catalog implementation based on a local file system. - -use std::collections::HashMap; -use std::fs::create_dir_all; -use std::path::Path; -use std::sync::Arc; - -use super::{ - Catalog, CatalogOptions, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest, - OpenDatabaseRequest, -}; -use crate::connection::ConnectRequest; -use crate::database::listing::{ListingDatabase, ListingDatabaseOptions}; -use crate::database::{Database, DatabaseOptions}; -use crate::error::{CreateDirSnafu, Error, Result}; -use async_trait::async_trait; -use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; -use lance_io::local::to_local_path; -use object_store::path::Path as ObjectStorePath; -use snafu::ResultExt; - -/// Options for the listing catalog -/// -/// Note: the catalog will use the `storage_options` configured on -/// db_options to configure storage for listing / creating / deleting -/// databases. -#[derive(Clone, Debug, Default)] -pub struct ListingCatalogOptions { - /// The options to use for databases opened by this catalog - /// - /// This also contains the storage options used by the catalog - pub db_options: ListingDatabaseOptions, -} - -impl CatalogOptions for ListingCatalogOptions { - fn serialize_into_map(&self, map: &mut HashMap) { - self.db_options.serialize_into_map(map); - } -} - -impl ListingCatalogOptions { - pub fn builder() -> ListingCatalogOptionsBuilder { - ListingCatalogOptionsBuilder::new() - } - - pub(crate) fn parse_from_map(map: &HashMap) -> Result { - let db_options = ListingDatabaseOptions::parse_from_map(map)?; - Ok(Self { db_options }) - } -} - -#[derive(Clone, Debug, Default)] -pub struct ListingCatalogOptionsBuilder { - options: ListingCatalogOptions, -} - -impl ListingCatalogOptionsBuilder { - pub fn new() -> Self { - Self { - options: ListingCatalogOptions::default(), - } - } - - pub fn db_options(mut self, db_options: ListingDatabaseOptions) -> Self { - self.options.db_options = db_options; - self - } - - pub fn build(self) -> ListingCatalogOptions { - self.options - } -} - -/// A catalog implementation that works by listing subfolders in a directory -/// -/// The listing catalog will be created with a base folder specified by the URI. Every subfolder -/// in this base folder will be considered a database. These will be opened as a -/// [`crate::database::listing::ListingDatabase`] -#[derive(Debug)] -pub struct ListingCatalog { - object_store: Arc, - - uri: String, - - base_path: ObjectStorePath, - - options: ListingCatalogOptions, -} - -impl ListingCatalog { - /// Try to create a local directory to store the lancedb dataset - pub fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> { - let path = Path::new(path); - if !path.try_exists()? { - create_dir_all(path)?; - } - Ok(()) - } - - pub fn uri(&self) -> &str { - &self.uri - } - - async fn open_path(path: &str) -> Result { - let (object_store, base_path) = ObjectStore::from_uri(path).await?; - if object_store.is_local() { - Self::try_create_dir(path).context(CreateDirSnafu { path })?; - } - - Ok(Self { - uri: path.to_string(), - base_path, - object_store, - options: ListingCatalogOptions::default(), - }) - } - - pub async fn connect(request: &ConnectRequest) -> Result { - let uri = &request.uri; - let parse_res = url::Url::parse(uri); - - let options = ListingCatalogOptions::parse_from_map(&request.options)?; - - match parse_res { - Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await, - Ok(url) => { - let plain_uri = url.to_string(); - - let registry = Arc::new(ObjectStoreRegistry::default()); - let storage_options = options.db_options.storage_options.clone(); - let os_params = ObjectStoreParams { - storage_options: Some(storage_options.clone()), - ..Default::default() - }; - let (object_store, base_path) = - ObjectStore::from_uri_and_params(registry, &plain_uri, &os_params).await?; - if object_store.is_local() { - Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?; - } - - Ok(Self { - uri: String::from(url.clone()), - base_path, - object_store, - options, - }) - } - Err(_) => Self::open_path(uri).await, - } - } - - fn database_path(&self, name: &str) -> ObjectStorePath { - self.base_path.child(name.replace('\\', "/")) - } -} - -#[async_trait] -impl Catalog for ListingCatalog { - async fn database_names(&self, request: DatabaseNamesRequest) -> Result> { - let mut f = self - .object_store - .read_dir(self.base_path.clone()) - .await? - .iter() - .map(Path::new) - .filter_map(|p| p.file_name().and_then(|s| s.to_str().map(String::from))) - .collect::>(); - f.sort(); - - if let Some(start_after) = request.start_after { - let index = f - .iter() - .position(|name| name.as_str() > start_after.as_str()) - .unwrap_or(f.len()); - f.drain(0..index); - } - if let Some(limit) = request.limit { - f.truncate(limit as usize); - } - Ok(f) - } - - async fn create_database(&self, request: CreateDatabaseRequest) -> Result> { - let db_path = self.database_path(&request.name); - let db_path_str = to_local_path(&db_path); - let exists = Path::new(&db_path_str).exists(); - - match request.mode { - CreateDatabaseMode::Create if exists => { - return Err(Error::DatabaseAlreadyExists { name: request.name }) - } - CreateDatabaseMode::Create => { - create_dir_all(db_path.to_string()).unwrap(); - } - CreateDatabaseMode::ExistOk => { - if !exists { - create_dir_all(db_path.to_string()).unwrap(); - } - } - CreateDatabaseMode::Overwrite => { - if exists { - self.drop_database(&request.name).await?; - } - create_dir_all(db_path.to_string()).unwrap(); - } - } - - let db_uri = format!("/{}/{}", self.base_path, request.name); - - let mut connect_request = ConnectRequest { - uri: db_uri, - #[cfg(feature = "remote")] - client_config: Default::default(), - read_consistency_interval: None, - options: Default::default(), - session: None, - }; - - // Add the db options to the connect request - self.options - .db_options - .serialize_into_map(&mut connect_request.options); - - Ok(Arc::new( - ListingDatabase::connect_with_options(&connect_request).await?, - )) - } - - async fn open_database(&self, request: OpenDatabaseRequest) -> Result> { - let db_path = self.database_path(&request.name); - - let db_path_str = to_local_path(&db_path); - let exists = Path::new(&db_path_str).exists(); - if !exists { - return Err(Error::DatabaseNotFound { name: request.name }); - } - - let mut connect_request = ConnectRequest { - uri: db_path.to_string(), - #[cfg(feature = "remote")] - client_config: Default::default(), - read_consistency_interval: None, - options: Default::default(), - session: None, - }; - - // Add the db options to the connect request - self.options - .db_options - .serialize_into_map(&mut connect_request.options); - - Ok(Arc::new( - ListingDatabase::connect_with_options(&connect_request).await?, - )) - } - - async fn rename_database(&self, _old_name: &str, _new_name: &str) -> Result<()> { - Err(Error::NotSupported { - message: "rename_database is not supported in LanceDB OSS yet".to_string(), - }) - } - - async fn drop_database(&self, name: &str) -> Result<()> { - let db_path = self.database_path(name); - self.object_store - .remove_dir_all(db_path.clone()) - .await - .map_err(|err| match err { - lance::Error::NotFound { .. } => Error::DatabaseNotFound { - name: name.to_owned(), - }, - _ => Error::from(err), - })?; - - Ok(()) - } - - async fn drop_all_databases(&self) -> Result<()> { - self.object_store - .remove_dir_all(self.base_path.clone()) - .await?; - Ok(()) - } -} - -#[cfg(all(test, not(windows)))] -mod tests { - use super::*; - - /// file:/// URIs with drive letters do not work correctly on Windows - #[cfg(windows)] - fn path_to_uri(path: PathBuf) -> String { - path.to_str().unwrap().to_string() - } - - #[cfg(not(windows))] - fn path_to_uri(path: PathBuf) -> String { - Url::from_file_path(path).unwrap().to_string() - } - - async fn setup_catalog() -> (TempDir, ListingCatalog) { - let tempdir = tempfile::tempdir().unwrap(); - let catalog_path = tempdir.path().join("catalog"); - std::fs::create_dir_all(&catalog_path).unwrap(); - - let uri = path_to_uri(catalog_path); - - let request = ConnectRequest { - uri: uri.clone(), - #[cfg(feature = "remote")] - client_config: Default::default(), - options: Default::default(), - read_consistency_interval: None, - session: None, - }; - - let catalog = ListingCatalog::connect(&request).await.unwrap(); - - (tempdir, catalog) - } - - use crate::database::{CreateTableData, CreateTableRequest, TableNamesRequest}; - use crate::table::TableDefinition; - use arrow_schema::Field; - use std::path::PathBuf; - use std::sync::Arc; - use tempfile::{tempdir, TempDir}; - use url::Url; - - #[tokio::test] - async fn test_database_names() { - let (_tempdir, catalog) = setup_catalog().await; - - let names = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert!(names.is_empty()); - } - - #[tokio::test] - async fn test_create_database() { - let (_tempdir, catalog) = setup_catalog().await; - - catalog - .create_database(CreateDatabaseRequest { - name: "db1".into(), - mode: CreateDatabaseMode::Create, - options: HashMap::new(), - }) - .await - .unwrap(); - - let names = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert_eq!(names, vec!["db1"]); - } - - #[tokio::test] - async fn test_create_database_exist_ok() { - let (_tempdir, catalog) = setup_catalog().await; - - let db1 = catalog - .create_database(CreateDatabaseRequest { - name: "db_exist_ok".into(), - mode: CreateDatabaseMode::ExistOk, - options: HashMap::new(), - }) - .await - .unwrap(); - let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::::default())); - db1.create_table(CreateTableRequest { - name: "test_table".parse().unwrap(), - data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)), - mode: Default::default(), - write_options: Default::default(), - namespace: vec![], - }) - .await - .unwrap(); - - let db2 = catalog - .create_database(CreateDatabaseRequest { - name: "db_exist_ok".into(), - mode: CreateDatabaseMode::ExistOk, - options: HashMap::new(), - }) - .await - .unwrap(); - - let tables = db2.table_names(TableNamesRequest::default()).await.unwrap(); - assert_eq!(tables, vec!["test_table".to_string()]); - } - - #[tokio::test] - async fn test_create_database_overwrite() { - let (_tempdir, catalog) = setup_catalog().await; - - let db = catalog - .create_database(CreateDatabaseRequest { - name: "db_overwrite".into(), - mode: CreateDatabaseMode::Create, - options: HashMap::new(), - }) - .await - .unwrap(); - let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::::default())); - db.create_table(CreateTableRequest { - name: "old_table".parse().unwrap(), - data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)), - mode: Default::default(), - write_options: Default::default(), - namespace: vec![], - }) - .await - .unwrap(); - let tables = db.table_names(TableNamesRequest::default()).await.unwrap(); - assert!(!tables.is_empty()); - - let new_db = catalog - .create_database(CreateDatabaseRequest { - name: "db_overwrite".into(), - mode: CreateDatabaseMode::Overwrite, - options: HashMap::new(), - }) - .await - .unwrap(); - - let tables = new_db - .table_names(TableNamesRequest::default()) - .await - .unwrap(); - assert!(tables.is_empty()); - } - - #[tokio::test] - async fn test_create_database_overwrite_non_existing() { - let (_tempdir, catalog) = setup_catalog().await; - - catalog - .create_database(CreateDatabaseRequest { - name: "new_db".into(), - mode: CreateDatabaseMode::Overwrite, - options: HashMap::new(), - }) - .await - .unwrap(); - - let names = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert!(names.contains(&"new_db".to_string())); - } - - #[tokio::test] - async fn test_open_database() { - let (_tempdir, catalog) = setup_catalog().await; - - // Test open non-existent - let result = catalog - .open_database(OpenDatabaseRequest { - name: "missing".into(), - database_options: HashMap::new(), - }) - .await; - assert!(matches!( - result.unwrap_err(), - Error::DatabaseNotFound { name } if name == "missing" - )); - - // Create and open - catalog - .create_database(CreateDatabaseRequest { - name: "valid_db".into(), - mode: CreateDatabaseMode::Create, - options: HashMap::new(), - }) - .await - .unwrap(); - - let db = catalog - .open_database(OpenDatabaseRequest { - name: "valid_db".into(), - database_options: HashMap::new(), - }) - .await - .unwrap(); - assert_eq!( - db.table_names(TableNamesRequest::default()).await.unwrap(), - Vec::::new() - ); - } - - #[tokio::test] - async fn test_drop_database() { - let (_tempdir, catalog) = setup_catalog().await; - - // Create test database - catalog - .create_database(CreateDatabaseRequest { - name: "to_drop".into(), - mode: CreateDatabaseMode::Create, - options: HashMap::new(), - }) - .await - .unwrap(); - - let names = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert!(!names.is_empty()); - - // Drop database - catalog.drop_database("to_drop").await.unwrap(); - - let names = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert!(names.is_empty()); - } - - #[tokio::test] - async fn test_drop_all_databases() { - let (_tempdir, catalog) = setup_catalog().await; - - catalog - .create_database(CreateDatabaseRequest { - name: "db1".into(), - mode: CreateDatabaseMode::Create, - options: HashMap::new(), - }) - .await - .unwrap(); - catalog - .create_database(CreateDatabaseRequest { - name: "db2".into(), - mode: CreateDatabaseMode::Create, - options: HashMap::new(), - }) - .await - .unwrap(); - - catalog.drop_all_databases().await.unwrap(); - - let names = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert!(names.is_empty()); - } - - #[tokio::test] - async fn test_rename_database_unsupported() { - let (_tempdir, catalog) = setup_catalog().await; - let result = catalog.rename_database("old", "new").await; - assert!(matches!( - result.unwrap_err(), - Error::NotSupported { message } if message.contains("rename_database") - )); - } - - #[tokio::test] - async fn test_connect_local_path() { - let tmp_dir = tempdir().unwrap(); - let path = tmp_dir.path().to_str().unwrap(); - - let request = ConnectRequest { - uri: path.to_string(), - #[cfg(feature = "remote")] - client_config: Default::default(), - options: Default::default(), - read_consistency_interval: None, - session: None, - }; - - let catalog = ListingCatalog::connect(&request).await.unwrap(); - assert!(catalog.object_store.is_local()); - assert_eq!(catalog.uri, path); - } - - #[tokio::test] - async fn test_connect_file_scheme() { - let tmp_dir = tempdir().unwrap(); - let path = tmp_dir.path(); - let uri = path_to_uri(path.to_path_buf()); - - let request = ConnectRequest { - uri: uri.clone(), - #[cfg(feature = "remote")] - client_config: Default::default(), - options: Default::default(), - read_consistency_interval: None, - session: None, - }; - - let catalog = ListingCatalog::connect(&request).await.unwrap(); - assert!(catalog.object_store.is_local()); - assert_eq!(catalog.uri, uri); - } - - #[tokio::test] - async fn test_connect_invalid_uri_fallback() { - let invalid_uri = "invalid:///path"; - let request = ConnectRequest { - uri: invalid_uri.to_string(), - #[cfg(feature = "remote")] - client_config: Default::default(), - options: Default::default(), - read_consistency_interval: None, - session: None, - }; - - let result = ListingCatalog::connect(&request).await; - assert!(result.is_err()); - } -} diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index ca0d85a3..a4df666b 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -13,8 +13,6 @@ use lance::dataset::ReadParams; use object_store::aws::AwsCredential; use crate::arrow::{IntoArrow, IntoArrowStream, SendableRecordBatchStream}; -use crate::catalog::listing::ListingCatalog; -use crate::catalog::CatalogOptions; use crate::database::listing::{ ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS, }; @@ -660,7 +658,7 @@ pub struct ConnectRequest { #[cfg(feature = "remote")] pub client_config: ClientConfig, - /// Database/Catalog specific options + /// Database specific options pub options: HashMap, /// The interval at which to check for updates from other processes. @@ -937,50 +935,6 @@ pub fn connect(uri: &str) -> ConnectBuilder { ConnectBuilder::new(uri) } -/// A builder for configuring a connection to a LanceDB catalog -#[derive(Debug)] -pub struct CatalogConnectBuilder { - request: ConnectRequest, -} - -impl CatalogConnectBuilder { - /// Create a new [`CatalogConnectBuilder`] with the given catalog URI. - pub fn new(uri: &str) -> Self { - Self { - request: ConnectRequest { - uri: uri.to_string(), - #[cfg(feature = "remote")] - client_config: Default::default(), - read_consistency_interval: None, - options: HashMap::new(), - session: None, - }, - } - } - - pub fn catalog_options(mut self, catalog_options: &dyn CatalogOptions) -> Self { - catalog_options.serialize_into_map(&mut self.request.options); - self - } - - /// Establishes a connection to the catalog - pub async fn execute(self) -> Result> { - let catalog = ListingCatalog::connect(&self.request).await?; - Ok(Arc::new(catalog)) - } -} - -/// Connect to a LanceDB catalog. -/// -/// A catalog is a container for databases, which in turn are containers for tables. -/// -/// # Arguments -/// -/// * `uri` - URI where the catalog is located, can be a local directory or supported remote cloud storage. -pub fn connect_catalog(uri: &str) -> CatalogConnectBuilder { - CatalogConnectBuilder::new(uri) -} - #[cfg(all(test, feature = "remote"))] mod test_utils { use super::*; @@ -1022,7 +976,6 @@ mod test_utils { mod tests { use std::fs::create_dir_all; - use crate::catalog::{Catalog, DatabaseNamesRequest, OpenDatabaseRequest}; use crate::database::listing::{ListingDatabaseOptions, NewTableConfig}; use crate::query::QueryBase; use crate::query::{ExecutableQuery, QueryExecutionOptions}; @@ -1328,91 +1281,4 @@ mod tests { .unwrap(); assert_eq!(other_schema, overwritten.schema().await.unwrap()); } - - #[tokio::test] - async fn test_connect_catalog() { - let tmp_dir = tempdir().unwrap(); - let uri = tmp_dir.path().to_str().unwrap(); - let catalog = connect_catalog(uri).execute().await.unwrap(); - - // Verify that we can get the uri from the catalog - let catalog_uri = catalog.uri(); - assert_eq!(catalog_uri, uri); - - // Check that the catalog is initially empty - let dbs = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert_eq!(dbs.len(), 0); - } - - #[tokio::test] - #[cfg(not(windows))] - async fn test_catalog_create_database() { - let tmp_dir = tempdir().unwrap(); - let uri = tmp_dir.path().to_str().unwrap(); - let catalog = connect_catalog(uri).execute().await.unwrap(); - - let db_name = "test_db"; - catalog - .create_database(crate::catalog::CreateDatabaseRequest { - name: db_name.to_string(), - mode: Default::default(), - options: Default::default(), - }) - .await - .unwrap(); - - let dbs = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert_eq!(dbs.len(), 1); - assert_eq!(dbs[0], db_name); - - let db = catalog - .open_database(OpenDatabaseRequest { - name: db_name.to_string(), - database_options: HashMap::new(), - }) - .await - .unwrap(); - - let tables = db.table_names(Default::default()).await.unwrap(); - assert_eq!(tables.len(), 0); - } - - #[tokio::test] - #[cfg(not(windows))] - async fn test_catalog_drop_database() { - let tmp_dir = tempdir().unwrap(); - let uri = tmp_dir.path().to_str().unwrap(); - let catalog = connect_catalog(uri).execute().await.unwrap(); - - // Create and then drop a database - let db_name = "test_db_to_drop"; - catalog - .create_database(crate::catalog::CreateDatabaseRequest { - name: db_name.to_string(), - mode: Default::default(), - options: Default::default(), - }) - .await - .unwrap(); - - let dbs = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert_eq!(dbs.len(), 1); - - catalog.drop_database(db_name).await.unwrap(); - - let dbs_after = catalog - .database_names(DatabaseNamesRequest::default()) - .await - .unwrap(); - assert_eq!(dbs_after.len(), 0); - } } diff --git a/rust/lancedb/src/lib.rs b/rust/lancedb/src/lib.rs index e2dc1794..5f9f7ac6 100644 --- a/rust/lancedb/src/lib.rs +++ b/rust/lancedb/src/lib.rs @@ -191,7 +191,6 @@ //! ``` pub mod arrow; -pub mod catalog; pub mod connection; pub mod data; pub mod database;