diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index f31274698..54f92622d 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -14,7 +14,6 @@ use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore}; use lance_datafusion::utils::StreamingWriteSource; use lance_encoding::version::LanceFileVersion; use lance_io::object_store::{StorageOptionsAccessor, StorageOptionsProvider}; -use lance_table::io::commit::commit_handler_from_url; use object_store::local::LocalFileSystem; use snafu::ResultExt; @@ -235,11 +234,9 @@ impl ListingDatabaseOptionsBuilder { /// We will have two tables named `table1` and `table2`. #[derive(Debug)] pub struct ListingDatabase { - object_store: Arc, query_string: Option, pub(crate) uri: String, - pub(crate) base_path: object_store::path::Path, // the object store wrapper to use on write path pub(crate) store_wrapper: Option>, @@ -258,8 +255,13 @@ pub struct ListingDatabase { // Session for object stores and caching session: Arc, - // Namespace-backed database for child namespace operations + // Namespace-backed database for child namespace operations (manifest mode). namespace_database: Arc, + + // V1 (manifest-disabled) directory namespace for root table lifecycle, so root + // drops are soft-deletes and purge/table_status are available. Shares the same root + // as `namespace_database` but in directory mode. + root_namespace_database: Arc, } impl std::fmt::Display for ListingDatabase { @@ -280,7 +282,6 @@ impl std::fmt::Display for ListingDatabase { } } -const LANCE_EXTENSION: &str = "lance"; const ENGINE: &str = "engine"; const MIRRORED_STORE: &str = "mirroredStore"; @@ -342,6 +343,39 @@ impl ListingDatabase { )) } + /// Build the V1 (manifest-disabled) directory namespace used for *root* table + /// lifecycle ops. + /// + /// Root tables in a listing database are flat `.lance` directories; soft-delete + /// (drop/purge/TTL) is a V1-only mechanism, so root ops go through this namespace. + /// Child namespaces are manifest-backed and handled by the separate + /// (manifest-enabled) `namespace_database`. + async fn connect_root_namespace_database( + uri: &str, + storage_options: HashMap, + namespace_client_properties: HashMap, + read_consistency_interval: Option, + session: Arc, + ) -> Result> { + let mut ns_properties = Self::build_namespace_client_properties( + uri, + &storage_options, + namespace_client_properties, + ); + ns_properties.insert("manifest_enabled".to_string(), "false".to_string()); + Ok(Arc::new( + LanceNamespaceDatabase::connect( + "dir", + ns_properties, + storage_options, + read_consistency_interval, + Some(session), + HashSet::new(), + ) + .await?, + )) + } + async fn prepare_namespace_root( uri: &str, storage_options: &HashMap, @@ -548,7 +582,7 @@ impl ListingDatabase { }, ..Default::default() }; - let (object_store, base_path) = ObjectStore::from_uri_and_params( + let (object_store, _base_path) = ObjectStore::from_uri_and_params( session.store_registry(), &plain_uri, &os_params, @@ -577,12 +611,18 @@ impl ListingDatabase { session.clone(), ) .await?; + let root_namespace_database = Self::connect_root_namespace_database( + &table_base_uri, + options.storage_options.clone(), + request.namespace_client_properties.clone(), + request.read_consistency_interval, + session.clone(), + ) + .await?; Ok(Self { uri: table_base_uri, query_string, - base_path, - object_store, store_wrapper: write_store_wrapper, read_consistency_interval: request.read_consistency_interval, storage_options: options.storage_options, @@ -590,6 +630,7 @@ impl ListingDatabase { new_table_config: options.new_table_config, session, namespace_database, + root_namespace_database, }) } Err(_) => { @@ -613,7 +654,7 @@ impl ListingDatabase { session: Option>, ) -> Result { let session = session.unwrap_or_else(|| Arc::new(lance::session::Session::default())); - let (object_store, base_path) = ObjectStore::from_uri_and_params( + let (object_store, _base_path) = ObjectStore::from_uri_and_params( session.store_registry(), path, &ObjectStoreParams::default(), @@ -624,6 +665,14 @@ impl ListingDatabase { } let namespace_database = Self::connect_namespace_database( + path, + HashMap::new(), + namespace_client_properties.clone(), + read_consistency_interval, + session.clone(), + ) + .await?; + let root_namespace_database = Self::connect_root_namespace_database( path, HashMap::new(), namespace_client_properties, @@ -635,8 +684,6 @@ impl ListingDatabase { Ok(Self { uri: path.to_string(), query_string: None, - base_path, - object_store, store_wrapper: None, read_consistency_interval, storage_options: HashMap::new(), @@ -644,6 +691,7 @@ impl ListingDatabase { new_table_config, session, namespace_database, + root_namespace_database, }) } @@ -705,42 +753,10 @@ impl ListingDatabase { self.namespace_database.clone() } - async fn drop_tables(&self, names: Vec) -> Result<()> { - let object_store_params = ObjectStoreParams { - storage_options_accessor: if self.storage_options.is_empty() { - None - } else { - Some(Arc::new(StorageOptionsAccessor::with_static_options( - self.storage_options.clone(), - ))) - }, - ..Default::default() - }; - let mut uri = self.uri.clone(); - if let Some(query_string) = &self.query_string { - uri.push_str(&format!("?{}", query_string)); - } - let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)).await?; - for name in names { - let dir_name = format!("{}.{}", name, LANCE_EXTENSION); - let full_path = self.base_path.clone().join(dir_name.clone()); - - commit_handler.delete(&full_path).await?; - - self.object_store - .remove_dir_all(full_path.clone()) - .await - .map_err(|err| match err { - // this error is not lance::Error::DatasetNotFound, as the method - // `remove_dir_all` may be used to remove something not be a dataset - lance::Error::NotFound { .. } => Error::TableNotFound { - name: name.clone(), - source: Box::new(err), - }, - _ => Error::from(err), - })?; - } - Ok(()) + /// The V1 directory namespace used for root table lifecycle (soft-delete drop, purge, + /// table_status, O(1) listing). + fn root_namespace_database(&self) -> Arc { + self.root_namespace_database.clone() } /// Inherit storage options from the connection into the target map @@ -946,88 +962,43 @@ impl Database for ListingDatabase { if !request.namespace_path.is_empty() { return self.namespace_database().table_names(request).await; } - let mut f = self - .object_store - .read_dir(self.base_path.clone()) - .await? - .iter() - .map(Path::new) - .filter(|path| { - let is_lance = path - .extension() - .and_then(|e| e.to_str()) - .map(|e| e == LANCE_EXTENSION); - is_lance.unwrap_or(false) - }) - .filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from))) - .collect::>(); - f.sort(); - if let Some(start_after) = request.start_after { - let index = f - .iter() - .position(|name| name.as_str() > start_after.as_str()) - .unwrap_or(f.len()); - f.drain(0..index); - } - if let Some(limit) = request.limit { - f.truncate(limit as usize); - } - Ok(f) + // Root tables: the V1 namespace lists them in a single read_dir (O(1) requests) + // and excludes soft-deleted tables, instead of a per-table probe here. + self.root_namespace_database().table_names(request).await } async fn list_tables(&self, request: ListTablesRequest) -> Result { if request.id.as_ref().map(|v| !v.is_empty()).unwrap_or(false) { return self.namespace_database().list_tables(request).await; } - let mut f = self - .object_store - .read_dir(self.base_path.clone()) - .await? - .iter() - .map(Path::new) - .filter(|path| { - let is_lance = path - .extension() - .and_then(|e| e.to_str()) - .map(|e| e == LANCE_EXTENSION); - is_lance.unwrap_or(false) - }) - .filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from))) - .collect::>(); - f.sort(); - - // Handle pagination with page_token - if let Some(ref page_token) = request.page_token { - let index = f - .iter() - .position(|name| name.as_str() > page_token.as_str()) - .unwrap_or(f.len()); - f.drain(0..index); - } - - // Determine if there's a next page - let next_page_token = if let Some(limit) = request.limit { - if f.len() > limit as usize { - let token = f[limit as usize].clone(); - f.truncate(limit as usize); - Some(token) - } else { - None - } - } else { - None - }; - - Ok(ListTablesResponse { - tables: f, - page_token: next_page_token, - }) + self.root_namespace_database().list_tables(request).await } async fn create_table(&self, request: CreateTableRequest) -> Result> { if !request.namespace_path.is_empty() { return self.namespace_database().create_table(request).await; } + let mut request = request; + // Re-creating a soft-deleted table is a revive: clear the delete marker (via the + // V1 root namespace, under its lifecycle lock so a concurrent purge can't race), + // making the table live again, then overwrite its data through the native create + // path below (preserving lineage as a new version). A plain native create would + // leave the marker in place, keeping the table hidden. + if matches!( + self.root_namespace_database() + .namespace_client() + .await? + .table_status(Some(vec![request.name.clone()])) + .await?, + lance_namespace::TableLifecycle::SoftDeleted { .. } + ) { + self.root_namespace_database() + .namespace_client() + .await? + .undelete_table(Some(vec![request.name.clone()])) + .await?; + request.mode = CreateTableMode::Overwrite; + } // Use provided location if available, otherwise derive from table name let table_uri = request .location @@ -1146,6 +1117,19 @@ impl Database for ListingDatabase { if !request.namespace_path.is_empty() { return self.namespace_database().open_table(request).await; } + // A soft-deleted (dropped-but-not-purged) table must read as absent even though + // its data still exists on disk. Consult the V1 root namespace (which owns the + // marker); if soft-deleted, route to it so the open surfaces TableNotFound. + if matches!( + self.root_namespace_database() + .namespace_client() + .await? + .table_status(Some(vec![request.name.clone()])) + .await?, + lance_namespace::TableLifecycle::SoftDeleted { .. } + ) { + return self.root_namespace_database().open_table(request).await; + } // Use provided location if available, otherwise derive from table name let table_uri = request .location @@ -1245,20 +1229,23 @@ impl Database for ListingDatabase { .drop_table(name, namespace_path) .await; } - self.drop_tables(vec![name.to_string()]).await + // Root table: route through the V1 namespace so the drop is a soft-delete (writes + // a marker, leaves data for later purge) rather than an immediate remove_dir_all. + self.root_namespace_database() + .drop_table(name, namespace_path) + .await } - #[allow(deprecated)] async fn drop_all_tables(&self, namespace_path: &[String]) -> Result<()> { - // Check if namespace parameter is provided if !namespace_path.is_empty() { return self .namespace_database() .drop_all_tables(namespace_path) .await; } - let tables = self.table_names(TableNamesRequest::default()).await?; - self.drop_tables(tables).await + self.root_namespace_database() + .drop_all_tables(namespace_path) + .await } fn as_any(&self) -> &dyn std::any::Any { @@ -1266,6 +1253,9 @@ impl Database for ListingDatabase { } async fn namespace_client(&self) -> Result> { + // Returns the manifest-backed namespace so callers can operate on child + // namespaces (multi-level table ids) through the client. Root-table soft-delete + // lifecycle (table_status/purge) is reached via the V1 root namespace internally. self.namespace_database.namespace_client().await } @@ -2615,4 +2605,154 @@ mod tests { .unwrap(); assert!(post_drop.tables.is_empty()); } + + /// Root-table drop is a soft-delete routed through the V1 namespace: the table is + /// hidden from listing/open but its data survives until purged, and re-creating it + /// revives it. Verifies the consolidation end-to-end at the ListingDatabase level. + #[tokio::test] + async fn test_root_table_soft_delete_lifecycle() { + let (_tempdir, db) = setup_database().await; + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let create = |name: &str| CreateTableRequest { + name: name.to_string(), + namespace_path: vec![], + data: Box::new(RecordBatch::new_empty(schema.clone())) as Box, + mode: CreateTableMode::Create, + write_options: Default::default(), + location: None, + namespace_client: None, + }; + let open = |name: &str| OpenTableRequest { + name: name.to_string(), + namespace_path: vec![], + index_cache_size: None, + lance_read_params: None, + location: None, + namespace_client: None, + managed_versioning: None, + }; + + db.create_table(create("t")).await.unwrap(); + db.drop_table("t", &[]).await.unwrap(); + + // Hidden from listing and not openable... + #[allow(deprecated)] + let names = db.table_names(TableNamesRequest::default()).await.unwrap(); + assert!(!names.contains(&"t".to_string())); + assert!(matches!( + db.open_table(open("t")).await, + Err(Error::TableNotFound { .. }) + )); + + // ...but data survives: it shows up as purgable via the V1 root namespace. + let root_ns = db + .root_namespace_database() + .namespace_client() + .await + .unwrap(); + let purgable = root_ns.list_purgable_tables(None).await.unwrap(); + assert_eq!(purgable.len(), 1); + assert_eq!(purgable[0].id, vec!["t".to_string()]); + + // Re-creating revives it. + db.create_table(create("t")).await.unwrap(); + db.open_table(open("t")).await.unwrap(); + #[allow(deprecated)] + let names = db.table_names(TableNamesRequest::default()).await.unwrap(); + assert!(names.contains(&"t".to_string())); + assert!(root_ns.list_purgable_tables(None).await.unwrap().is_empty()); + + // Drop then purge reclaims it for good. + db.drop_table("t", &[]).await.unwrap(); + let purged = root_ns.purge_tables(None).await.unwrap(); + assert_eq!(purged, vec![vec!["t".to_string()]]); + assert!(root_ns.list_purgable_tables(None).await.unwrap().is_empty()); + } + + /// Migration: a table written by the *old* path (a plain `/.lance` + /// dataset directory with no namespace markers — exactly what a pre-existing listing + /// database looks like on disk) is fully usable by the new embedded-namespace routing. + /// It lists, opens, and reads normally, and dropping it is now a soft-delete (data + /// survives on disk) rather than an immediate removal. + #[tokio::test] + async fn test_legacy_directory_layout_migrates() { + use arrow_array::RecordBatchIterator; + use lance::dataset::{Dataset, WriteParams}; + + let tempdir = tempdir().unwrap(); + let root = tempdir.path().to_str().unwrap().to_string(); + + // Write a dataset the way the pre-namespace path did: raw lance, no declare/reserve + // marker, no manifest — just `/legacy_table.lance`. + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + let legacy_uri = format!("{}/legacy_table.lance", root); + Dataset::write( + RecordBatchIterator::new(vec![Ok(batch)], schema.clone()), + &legacy_uri, + Some(WriteParams::default()), + ) + .await + .unwrap(); + + // Connect the new ListingDatabase over that pre-existing directory. + let request = ConnectRequest { + uri: root.clone(), + #[cfg(feature = "remote")] + client_config: Default::default(), + options: Default::default(), + namespace_client_properties: Default::default(), + manifest_enabled: false, + read_consistency_interval: None, + session: None, + }; + let db = ListingDatabase::connect_with_options(&request) + .await + .unwrap(); + + let open = |name: &str| OpenTableRequest { + name: name.to_string(), + namespace_path: vec![], + index_cache_size: None, + lance_read_params: None, + location: None, + namespace_client: None, + managed_versioning: None, + }; + + // The legacy table is listed, opens, and reads its rows. + #[allow(deprecated)] + let names = db.table_names(TableNamesRequest::default()).await.unwrap(); + assert!(names.contains(&"legacy_table".to_string())); + let table = db.open_table(open("legacy_table")).await.unwrap(); + assert_eq!(table.count_rows(None).await.unwrap(), 3); + + // Dropping it is now a soft-delete: hidden from listing / not openable, but the + // dataset directory still exists on disk (reclaimed only by a later purge). + db.drop_table("legacy_table", &[]).await.unwrap(); + #[allow(deprecated)] + let names = db.table_names(TableNamesRequest::default()).await.unwrap(); + assert!(!names.contains(&"legacy_table".to_string())); + assert!(db.open_table(open("legacy_table")).await.is_err()); + assert!( + PathBuf::from(&legacy_uri).exists(), + "soft-delete must leave the legacy dataset directory intact" + ); + + // And it can be revived by re-creating the name. + let root_ns = db + .root_namespace_database() + .namespace_client() + .await + .unwrap(); + assert_eq!( + root_ns.list_purgable_tables(None).await.unwrap().len(), + 1, + "the dropped legacy table is purgable" + ); + } } diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index d18c78682..140d931f8 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -583,9 +583,9 @@ impl Database for LanceNamespaceDatabase { self.namespace .drop_table(drop_request) .await - .map_err(|e| Error::Runtime { - message: format!("Failed to drop table: {}", e), - })?; + // Preserve TableNotFound (e.g. dropping a non-existent table) rather than + // flattening every failure to a generic Runtime error. + .map_err(|e| map_namespace_lance_error(e, name))?; Ok(()) }