From 0528cd858adbc88c7408107d211b101328cfae57 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Fri, 30 May 2025 14:43:12 -0700 Subject: [PATCH] fix: avoid failing list_indices for any unknown index (#2413) Closes #2412 ## Summary by CodeRabbit - **Bug Fixes** - Improved the reliability of listing indices by logging warnings for errors and skipping problematic entries, ensuring successful results are returned. - Internal indices used for optimization are now excluded from the visible list of indices. --- rust/lancedb/src/table.rs | 124 ++++++++++++++++++++++++++++++++------ 1 file changed, 107 insertions(+), 17 deletions(-) diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index eff000a9..a534be4e 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -14,7 +14,7 @@ use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::ExecutionPlan; -use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, StreamExt, TryFutureExt}; use lance::dataset::builder::DatasetBuilder; use lance::dataset::cleanup::RemovalStats; use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions}; @@ -85,6 +85,7 @@ pub use lance::dataset::optimize::CompactionOptions; pub use lance::dataset::refs::{TagContents, Tags as LanceTags}; pub use lance::dataset::scanner::DatasetRecordBatchStream; use lance::dataset::statistics::DatasetStatisticsExt; +use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME; pub use lance_index::optimize::OptimizeOptions; use serde_with::skip_serializing_none; @@ -2601,28 +2602,56 @@ impl BaseTable for NativeTable { async fn list_indices(&self) -> Result<Vec<IndexConfig>> { let dataset = self.dataset.get().await?; let indices = dataset.load_indices().await?; - futures::stream::iter(indices.as_slice()).then(|idx| async { - let stats = 
dataset.index_statistics(idx.name.as_str()).await?; - let stats: serde_json::Value = serde_json::from_str(&stats).map_err(|e| Error::Runtime { - message: format!("error deserializing index statistics: {}", e), - })?; - let index_type = stats.get("index_type").and_then(|v| v.as_str()) - .ok_or_else(|| Error::Runtime { - message: "index statistics was missing index type".to_string(), - })?; - let index_type: crate::index::IndexType = index_type.parse().map_err(|e| Error::Runtime { - message: format!("error parsing index type: {}", e), - })?; + let results = futures::stream::iter(indices.as_slice()).then(|idx| async { + + // skip Lance internal indexes + if idx.name == FRAG_REUSE_INDEX_NAME { + return None; + } + + let stats = match dataset.index_statistics(idx.name.as_str()).await { + Ok(stats) => stats, + Err(e) => { + log::warn!("Failed to get statistics for index {} ({}): {}", idx.name, idx.uuid, e); + return None; + } + }; + + let stats: serde_json::Value = match serde_json::from_str(&stats) { + Ok(stats) => stats, + Err(e) => { + log::warn!("Failed to deserialize index statistics for index {} ({}): {}", idx.name, idx.uuid, e); + return None; + } + }; + + let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else { + log::warn!("Index statistics was missing 'index_type' field for index {} ({})", idx.name, idx.uuid); + return None; + }; + + let index_type: crate::index::IndexType = match index_type.parse() { + Ok(index_type) => index_type, + Err(e) => { + log::warn!("Failed to parse index type for index {} ({}): {}", idx.name, idx.uuid, e); + return None; + } + }; let mut columns = Vec::with_capacity(idx.fields.len()); for field_id in &idx.fields { - let field = dataset.schema().field_by_id(*field_id).ok_or_else(|| Error::Runtime { message: format!("The index with name {} and uuid {} referenced a field with id {} which does not exist in the schema", idx.name, idx.uuid, field_id) })?; + let Some(field) = dataset.schema().field_by_id(*field_id) 
else { + log::warn!("The index {} ({}) referenced a field with id {} which does not exist in the schema", idx.name, idx.uuid, field_id); + return None; + }; columns.push(field.name.clone()); } let name = idx.name.clone(); - Ok(IndexConfig { index_type, columns, name }) - }).try_collect::<Vec<_>>().await + Some(IndexConfig { index_type, columns, name }) + }).collect::<Vec<_>>().await; + + Ok(results.into_iter().flatten().collect()) } fn dataset_uri(&self) -> &str { @@ -2815,7 +2844,7 @@ mod tests { use super::*; use crate::connect; use crate::connection::ConnectBuilder; - use crate::index::scalar::BTreeIndexBuilder; + use crate::index::scalar::{BTreeIndexBuilder, BitmapIndexBuilder}; use crate::query::{ExecutableQuery, QueryBase}; #[tokio::test] @@ -4267,4 +4296,65 @@ mod tests { } ) } + + #[tokio::test] + pub async fn test_list_indices_skip_frag_reuse() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let conn = ConnectBuilder::new(uri).execute().await.unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("foo", DataType::Int32, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..100)), + Arc::new(Int32Array::from_iter_values(0..100)), + ], + ) + .unwrap(); + + let table = conn + .create_table( + "test_list_indices_skip_frag_reuse", + RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()), + ) + .execute() + .await + .unwrap(); + + table + .add(RecordBatchIterator::new( + vec![Ok(batch.clone())], + batch.schema(), + )) + .execute() + .await + .unwrap(); + + table + .create_index(&["id"], Index::Bitmap(BitmapIndexBuilder {})) + .execute() + .await + .unwrap(); + + table + .optimize(OptimizeAction::Compact { + options: CompactionOptions { + target_rows_per_fragment: 2_000, + defer_index_remap: true, + ..Default::default() + }, + remap_options: None, + }) + .await + .unwrap(); + + let result = 
table.list_indices().await.unwrap(); + assert_eq!(result.len(), 1); + assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap); + } }