fix: avoid failing list_indices for any unknown index (#2413)

Closes #2412 

## Summary by CodeRabbit

- **Bug Fixes**
  - Listing indices is now more reliable: an error on any individual index is logged as a warning and that entry is skipped, so the remaining valid indices are still returned (see the sketch below).
  - Internal indices used for optimization are now excluded from the visible list of indices.
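The heart of the change is a warn-and-skip pattern: each per-index failure inside `list_indices` is downgraded from a hard error to a `log::warn!` plus a skipped entry. A minimal standalone sketch of that pattern follows; `IndexMeta`, `load_stats`, and `list_names` are hypothetical stand-ins for the actual Lance types and calls in the diff below, not real lancedb API:

```rust
use futures::StreamExt;

// Hypothetical stand-in for a Lance index descriptor; the real code
// iterates the result of `dataset.load_indices()`.
struct IndexMeta {
    name: String,
}

// Hypothetical stand-in for the fallible statistics lookup
// (`dataset.index_statistics` in the actual diff).
async fn load_stats(_name: &str) -> Result<String, std::io::Error> {
    Ok("{}".to_string())
}

// Warn-and-skip: an entry that fails a fallible step is logged and
// mapped to None instead of aborting the whole listing.
async fn list_names(indices: Vec<IndexMeta>) -> Vec<String> {
    let results = futures::stream::iter(indices)
        .then(|idx| async move {
            match load_stats(&idx.name).await {
                Ok(_stats) => Some(idx.name),
                Err(e) => {
                    log::warn!("Failed to get statistics for index {}: {}", idx.name, e);
                    None
                }
            }
        })
        .collect::<Vec<_>>()
        .await;
    // flatten() drops the None (skipped) entries, keeping only successes.
    results.into_iter().flatten().collect()
}
```

Collecting `Option`s and flattening at the end keeps one bad index from turning the whole listing into an `Err`, which is the failure mode #2412 describes.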
Commit 0528cd858a by Jack Ye (committed by GitHub), 2025-05-30 14:43:12 -07:00
Parent: 6582f43422


@@ -14,7 +14,7 @@ use datafusion_physical_plan::projection::ProjectionExec;
 use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::ExecutionPlan;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{FutureExt, StreamExt, TryFutureExt};
 use lance::dataset::builder::DatasetBuilder;
 use lance::dataset::cleanup::RemovalStats;
 use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions};
@@ -85,6 +85,7 @@ pub use lance::dataset::optimize::CompactionOptions;
 pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
 pub use lance::dataset::scanner::DatasetRecordBatchStream;
 use lance::dataset::statistics::DatasetStatisticsExt;
+use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
 pub use lance_index::optimize::OptimizeOptions;
 use serde_with::skip_serializing_none;
@@ -2601,28 +2602,56 @@ impl BaseTable for NativeTable {
     async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
         let dataset = self.dataset.get().await?;
         let indices = dataset.load_indices().await?;
-        futures::stream::iter(indices.as_slice()).then(|idx| async {
-            let stats = dataset.index_statistics(idx.name.as_str()).await?;
-            let stats: serde_json::Value = serde_json::from_str(&stats).map_err(|e| Error::Runtime {
-                message: format!("error deserializing index statistics: {}", e),
-            })?;
-            let index_type = stats.get("index_type").and_then(|v| v.as_str())
-                .ok_or_else(|| Error::Runtime {
-                    message: "index statistics was missing index type".to_string(),
-                })?;
-            let index_type: crate::index::IndexType = index_type.parse().map_err(|e| Error::Runtime {
-                message: format!("error parsing index type: {}", e),
-            })?;
+        let results = futures::stream::iter(indices.as_slice()).then(|idx| async {
+            // skip Lance internal indexes
+            if idx.name == FRAG_REUSE_INDEX_NAME {
+                return None;
+            }
+            let stats = match dataset.index_statistics(idx.name.as_str()).await {
+                Ok(stats) => stats,
+                Err(e) => {
+                    log::warn!("Failed to get statistics for index {} ({}): {}", idx.name, idx.uuid, e);
+                    return None;
+                }
+            };
+            let stats: serde_json::Value = match serde_json::from_str(&stats) {
+                Ok(stats) => stats,
+                Err(e) => {
+                    log::warn!("Failed to deserialize index statistics for index {} ({}): {}", idx.name, idx.uuid, e);
+                    return None;
+                }
+            };
+            let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
+                log::warn!("Index statistics was missing 'index_type' field for index {} ({})", idx.name, idx.uuid);
+                return None;
+            };
+            let index_type: crate::index::IndexType = match index_type.parse() {
+                Ok(index_type) => index_type,
+                Err(e) => {
+                    log::warn!("Failed to parse index type for index {} ({}): {}", idx.name, idx.uuid, e);
+                    return None;
+                }
+            };
             let mut columns = Vec::with_capacity(idx.fields.len());
             for field_id in &idx.fields {
-                let field = dataset.schema().field_by_id(*field_id).ok_or_else(|| Error::Runtime { message: format!("The index with name {} and uuid {} referenced a field with id {} which does not exist in the schema", idx.name, idx.uuid, field_id) })?;
+                let Some(field) = dataset.schema().field_by_id(*field_id) else {
+                    log::warn!("The index {} ({}) referenced a field with id {} which does not exist in the schema", idx.name, idx.uuid, field_id);
+                    return None;
+                };
                 columns.push(field.name.clone());
             }
             let name = idx.name.clone();
-            Ok(IndexConfig { index_type, columns, name })
-        }).try_collect::<Vec<_>>().await
+            Some(IndexConfig { index_type, columns, name })
+        }).collect::<Vec<_>>().await;
+        Ok(results.into_iter().flatten().collect())
     }
 
     fn dataset_uri(&self) -> &str {
@@ -2815,7 +2844,7 @@ mod tests {
     use super::*;
     use crate::connect;
     use crate::connection::ConnectBuilder;
-    use crate::index::scalar::BTreeIndexBuilder;
+    use crate::index::scalar::{BTreeIndexBuilder, BitmapIndexBuilder};
     use crate::query::{ExecutableQuery, QueryBase};
 
     #[tokio::test]
@@ -4267,4 +4296,65 @@
         }
         )
     }
+
+    #[tokio::test]
+    pub async fn test_list_indices_skip_frag_reuse() {
+        let tmp_dir = tempdir().unwrap();
+        let uri = tmp_dir.path().to_str().unwrap();
+        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("foo", DataType::Int32, true),
+        ]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from_iter_values(0..100)),
+                Arc::new(Int32Array::from_iter_values(0..100)),
+            ],
+        )
+        .unwrap();
+
+        let table = conn
+            .create_table(
+                "test_list_indices_skip_frag_reuse",
+                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
+            )
+            .execute()
+            .await
+            .unwrap();
+        table
+            .add(RecordBatchIterator::new(
+                vec![Ok(batch.clone())],
+                batch.schema(),
+            ))
+            .execute()
+            .await
+            .unwrap();
+
+        table
+            .create_index(&["id"], Index::Bitmap(BitmapIndexBuilder {}))
+            .execute()
+            .await
+            .unwrap();
+
+        table
+            .optimize(OptimizeAction::Compact {
+                options: CompactionOptions {
+                    target_rows_per_fragment: 2_000,
+                    defer_index_remap: true,
+                    ..Default::default()
+                },
+                remap_options: None,
+            })
+            .await
+            .unwrap();
+
+        let result = table.list_indices().await.unwrap();
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap);
+    }
 }
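For completeness, a caller-side sketch of the behavior the new test pins down. This is a sketch only: it assumes the public `lancedb::Table` handle and crate-level `Result` alias (paths are approximate), and the helper name is hypothetical:

```rust
use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;

// Hypothetical helper: after the fix, the internal frag-reuse index created
// by `defer_index_remap` compaction is filtered out of the listing, and an
// index with unreadable statistics is logged and skipped rather than
// failing the whole call.
async fn visible_index_names(table: &lancedb::Table) -> lancedb::Result<Vec<String>> {
    let indices = table.list_indices().await?;
    debug_assert!(indices.iter().all(|idx| idx.name != FRAG_REUSE_INDEX_NAME));
    Ok(indices.into_iter().map(|idx| idx.name).collect())
}
```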