From a56a00224f48bdd8101db2cb158f28b360040eae Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Mon, 12 Jan 2026 16:30:51 +0800 Subject: [PATCH] feat: impl vector index scan in storage (#7528) * feat: impl vector index scan in storage Signed-off-by: Dennis Zhuang * feat: fallback to read remote blob when blob not found Signed-off-by: Dennis Zhuang * chore: refactor encoding and decoding and apply suggestions Signed-off-by: Dennis Zhuang * fix: license Signed-off-by: Dennis Zhuang * test: add apply_with_k tests Signed-off-by: Dennis Zhuang * chore: apply suggestions Signed-off-by: Dennis Zhuang * fix: forgot to align nulls when the vector column is not in the batch Signed-off-by: Dennis Zhuang * test: add test for vector column is not in a batch while building Signed-off-by: Dennis Zhuang --------- Signed-off-by: Dennis Zhuang --- src/mito2/src/cache.rs | 30 + src/mito2/src/cache/index.rs | 2 + src/mito2/src/cache/index/vector_index.rs | 137 ++++ src/mito2/src/error.rs | 10 + src/mito2/src/read/scan_region.rs | 84 ++- src/mito2/src/read/scan_util.rs | 24 + src/mito2/src/sst/file.rs | 6 + .../src/sst/index/vector_index/applier.rs | 650 ++++++++++++++++++ .../src/sst/index/vector_index/creator.rs | 247 ++++--- .../src/sst/index/vector_index/format.rs | 324 +++++++++ src/mito2/src/sst/index/vector_index/mod.rs | 2 + src/mito2/src/sst/parquet/reader.rs | 176 +++++ tests-integration/Cargo.toml | 1 + tests-integration/tests/http.rs | 22 +- 14 files changed, 1617 insertions(+), 98 deletions(-) create mode 100644 src/mito2/src/cache/index/vector_index.rs create mode 100644 src/mito2/src/sst/index/vector_index/applier.rs create mode 100644 src/mito2/src/sst/index/vector_index/format.rs diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index f502fb51f5..2c82fb1f33 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -42,6 +42,8 @@ use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelect use 
crate::cache::cache_size::parquet_meta_size; use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef}; +#[cfg(feature = "vector_index")] +use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef}; use crate::cache::write_cache::WriteCacheRef; use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; use crate::read::Batch; @@ -247,6 +249,16 @@ impl CacheStrategy { } } + /// Calls [CacheManager::vector_index_cache()]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. + #[cfg(feature = "vector_index")] + pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> { + match self { + CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(), + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, + } + } + /// Calls [CacheManager::puffin_metadata_cache()]. /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> { @@ -303,6 +315,9 @@ pub struct CacheManager { inverted_index_cache: Option, /// Cache for bloom filter index. bloom_filter_index_cache: Option, + /// Cache for vector index. + #[cfg(feature = "vector_index")] + vector_index_cache: Option, /// Puffin metadata cache. puffin_metadata_cache: Option, /// Cache for time series selectors. 
@@ -434,6 +449,11 @@ impl CacheManager { cache.invalidate_file(file_id.file_id()); } + #[cfg(feature = "vector_index")] + if let Some(cache) = &self.vector_index_cache { + cache.invalidate_file(file_id.file_id()); + } + if let Some(cache) = &self.puffin_metadata_cache { cache.remove(&file_id.to_string()); } @@ -486,6 +506,11 @@ impl CacheManager { self.bloom_filter_index_cache.as_ref() } + #[cfg(feature = "vector_index")] + pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> { + self.vector_index_cache.as_ref() + } + pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> { self.puffin_metadata_cache.as_ref() } @@ -646,6 +671,9 @@ impl CacheManagerBuilder { self.index_content_size, self.index_content_page_size, ); + #[cfg(feature = "vector_index")] + let vector_index_cache = (self.index_content_size != 0) + .then(|| Arc::new(VectorIndexCache::new(self.index_content_size))); let index_result_cache = (self.index_result_cache_size != 0) .then(|| IndexResultCache::new(self.index_result_cache_size)); let puffin_metadata_cache = @@ -672,6 +700,8 @@ impl CacheManagerBuilder { write_cache: self.write_cache, inverted_index_cache: Some(Arc::new(inverted_index_cache)), bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)), + #[cfg(feature = "vector_index")] + vector_index_cache, puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)), selector_result_cache, index_result_cache, diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index 7393773a89..74f16fdb71 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -15,6 +15,8 @@ pub mod bloom_filter_index; pub mod inverted_index; pub mod result_cache; +#[cfg(feature = "vector_index")] +pub mod vector_index; use std::future::Future; use std::hash::Hash; diff --git a/src/mito2/src/cache/index/vector_index.rs b/src/mito2/src/cache/index/vector_index.rs new file mode 100644 index 0000000000..d0a6c3184f --- /dev/null +++ 
b/src/mito2/src/cache/index/vector_index.rs @@ -0,0 +1,137 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use moka::notification::RemovalCause; +use moka::sync::Cache; +use roaring::RoaringBitmap; +use store_api::storage::{ColumnId, FileId, IndexVersion, VectorDistanceMetric, VectorIndexEngine}; + +use crate::metrics::{CACHE_BYTES, CACHE_EVICTION}; + +const VECTOR_INDEX_CACHE_TYPE: &str = "vector_index"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct VectorIndexCacheKey { + file_id: FileId, + index_version: IndexVersion, + column_id: ColumnId, +} + +impl VectorIndexCacheKey { + pub fn new(file_id: FileId, index_version: IndexVersion, column_id: ColumnId) -> Self { + Self { + file_id, + index_version, + column_id, + } + } +} + +pub struct CachedVectorIndex { + pub engine: Box, + pub null_bitmap: RoaringBitmap, + pub size_bytes: usize, + pub dimensions: u32, + pub metric: VectorDistanceMetric, + pub total_rows: u64, + pub indexed_rows: u64, +} + +impl CachedVectorIndex { + pub fn new( + engine: Box, + null_bitmap: RoaringBitmap, + dimensions: u32, + metric: VectorDistanceMetric, + total_rows: u64, + indexed_rows: u64, + ) -> Self { + let size_bytes = + engine.memory_usage() + null_bitmap.serialized_size() + std::mem::size_of::(); + Self { + engine, + null_bitmap, + size_bytes, + dimensions, + metric, + total_rows, + indexed_rows, + } + } +} + +impl std::fmt::Debug for 
CachedVectorIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CachedVectorIndex") + .field("size_bytes", &self.size_bytes) + .field("dimensions", &self.dimensions) + .field("metric", &self.metric) + .field("total_rows", &self.total_rows) + .field("indexed_rows", &self.indexed_rows) + .field("null_bitmap_len", &self.null_bitmap.len()) + .finish() + } +} + +pub struct VectorIndexCache { + inner: Cache>, +} + +pub type VectorIndexCacheRef = Arc; + +impl VectorIndexCache { + pub fn new(capacity: u64) -> Self { + fn to_str(cause: RemovalCause) -> &'static str { + match cause { + RemovalCause::Expired => "expired", + RemovalCause::Explicit => "explicit", + RemovalCause::Replaced => "replaced", + RemovalCause::Size => "size", + } + } + + let inner = Cache::builder() + .max_capacity(capacity) + .weigher(|_k, v: &Arc| v.size_bytes as u32) + .eviction_listener(|_k, v, cause| { + CACHE_BYTES + .with_label_values(&[VECTOR_INDEX_CACHE_TYPE]) + .sub(v.size_bytes as i64); + CACHE_EVICTION + .with_label_values(&[VECTOR_INDEX_CACHE_TYPE, to_str(cause)]) + .inc(); + }) + .build(); + Self { inner } + } + + pub fn get(&self, key: &VectorIndexCacheKey) -> Option> { + self.inner.get(key) + } + + pub fn insert(&self, key: VectorIndexCacheKey, value: Arc) { + CACHE_BYTES + .with_label_values(&[VECTOR_INDEX_CACHE_TYPE]) + .add(value.size_bytes as i64); + self.inner.insert(key, value); + } + + pub fn invalidate_file(&self, file_id: FileId) { + let _ = self + .inner + .invalidate_entries_if(move |k, _| k.file_id == file_id); + } +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 85edddde20..3756ff6805 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -652,6 +652,14 @@ pub enum Error { location: Location, }, + #[cfg(feature = "vector_index")] + #[snafu(display("Failed to apply vector index: {}", reason))] + ApplyVectorIndex { + reason: String, + #[snafu(implicit)] + location: Location, + }, + 
#[snafu(display("Failed to push index value"))] PushIndexValue { source: index::inverted_index::error::Error, @@ -1324,6 +1332,8 @@ impl ErrorExt for Error { | PushIndexValue { source, .. } | ApplyInvertedIndex { source, .. } | IndexFinish { source, .. } => source.status_code(), + #[cfg(feature = "vector_index")] + ApplyVectorIndex { .. } => StatusCode::Internal, PuffinReadBlob { source, .. } | PuffinAddBlob { source, .. } | PuffinInitStager { source, .. } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index f6d39ee901..d36a57c8b6 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -70,11 +70,15 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder; use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; +#[cfg(feature = "vector_index")] +use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef}; use crate::sst::parquet::file_range::PreFilterMode; use crate::sst::parquet::reader::ReaderMetrics; /// Parallel scan channel size for flat format. const FLAT_SCAN_CHANNEL_SIZE: usize = 2; +#[cfg(feature = "vector_index")] +const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2; /// A scanner scans a region and returns a [SendableRecordBatchStream]. 
pub(crate) enum Scanner { @@ -498,6 +502,16 @@ impl ScanRegion { self.build_fulltext_index_applier(&non_field_filters), self.build_fulltext_index_applier(&field_filters), ]; + #[cfg(feature = "vector_index")] + let vector_index_applier = self.build_vector_index_applier(); + #[cfg(feature = "vector_index")] + let vector_index_k = self.request.vector_search.as_ref().map(|search| { + if self.request.filters.is_empty() { + search.k + } else { + search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER) + } + }); let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?; if flat_format { @@ -523,6 +537,10 @@ impl ScanRegion { .with_series_row_selector(self.request.series_row_selector) .with_distribution(self.request.distribution) .with_flat_format(flat_format); + #[cfg(feature = "vector_index")] + let input = input + .with_vector_index_applier(vector_index_applier) + .with_vector_index_k(vector_index_k); #[cfg(feature = "enterprise")] let input = if let Some(provider) = self.extension_range_provider { @@ -667,6 +685,31 @@ impl ScanRegion { .flatten() .map(Arc::new) } + + /// Build the vector index applier from vector search request. 
+ #[cfg(feature = "vector_index")] + fn build_vector_index_applier(&self) -> Option { + let vector_search = self.request.vector_search.as_ref()?; + + let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache()); + let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned(); + let vector_index_cache = self.cache_strategy.vector_index_cache().cloned(); + + let applier = VectorIndexApplier::new( + self.access_layer.table_dir().to_string(), + self.access_layer.path_type(), + self.access_layer.object_store().clone(), + self.access_layer.puffin_manager_factory().clone(), + vector_search.column_id, + vector_search.query_vector.clone(), + vector_search.metric, + ) + .with_file_cache(file_cache) + .with_puffin_metadata_cache(puffin_metadata_cache) + .with_vector_index_cache(vector_index_cache); + + Some(Arc::new(applier)) + } } /// Returns true if the time range of a SST `file` matches the `predicate`. @@ -708,6 +751,12 @@ pub struct ScanInput { inverted_index_appliers: [Option; 2], bloom_filter_index_appliers: [Option; 2], fulltext_index_appliers: [Option; 2], + /// Vector index applier for KNN search. + #[cfg(feature = "vector_index")] + pub(crate) vector_index_applier: Option, + /// Over-fetched k for vector index scan. + #[cfg(feature = "vector_index")] + pub(crate) vector_index_k: Option, /// Start time of the query. pub(crate) query_start: Option, /// The region is using append mode. @@ -747,6 +796,10 @@ impl ScanInput { inverted_index_appliers: [None, None], bloom_filter_index_appliers: [None, None], fulltext_index_appliers: [None, None], + #[cfg(feature = "vector_index")] + vector_index_applier: None, + #[cfg(feature = "vector_index")] + vector_index_k: None, query_start: None, append_mode: false, filter_deleted: true, @@ -853,6 +906,25 @@ impl ScanInput { self } + /// Sets vector index applier for KNN search. 
+ #[cfg(feature = "vector_index")] + #[must_use] + pub(crate) fn with_vector_index_applier( + mut self, + applier: Option, + ) -> Self { + self.vector_index_applier = applier; + self + } + + /// Sets over-fetched k for vector index scan. + #[cfg(feature = "vector_index")] + #[must_use] + pub(crate) fn with_vector_index_k(mut self, k: Option) -> Self { + self.vector_index_k = k; + self + } + /// Sets start time of the query. #[must_use] pub(crate) fn with_start_time(mut self, now: Option) -> Self { @@ -988,7 +1060,7 @@ impl ScanInput { let predicate = self.predicate_for_file(file); let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode); let decode_pk_values = !self.compaction && self.mapper.has_tags(); - let res = self + let reader = self .access_layer .read_sst(file.clone()) .predicate(predicate) @@ -996,7 +1068,15 @@ impl ScanInput { .cache(self.cache_strategy.clone()) .inverted_index_appliers(self.inverted_index_appliers.clone()) .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone()) - .fulltext_index_appliers(self.fulltext_index_appliers.clone()) + .fulltext_index_appliers(self.fulltext_index_appliers.clone()); + #[cfg(feature = "vector_index")] + let reader = { + let mut reader = reader; + reader = + reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k); + reader + }; + let res = reader .expected_metadata(Some(self.mapper.metadata().clone())) .flat_format(self.flat_format) .compaction(self.compaction) diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 0605c37096..721c5562df 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -151,6 +151,8 @@ pub(crate) struct ScanMetricsSet { rg_minmax_filtered: usize, /// Number of row groups filtered by bloom filter index. rg_bloom_filtered: usize, + /// Number of row groups filtered by vector index. + rg_vector_filtered: usize, /// Number of rows in row group before filtering. 
rows_before_filter: usize, /// Number of rows in row group filtered by fulltext index. @@ -159,6 +161,8 @@ pub(crate) struct ScanMetricsSet { rows_inverted_filtered: usize, /// Number of rows in row group filtered by bloom filter index. rows_bloom_filtered: usize, + /// Number of rows filtered by vector index. + rows_vector_filtered: usize, /// Number of rows filtered by precise filter. rows_precise_filtered: usize, /// Number of record batches read from SST. @@ -255,10 +259,12 @@ impl fmt::Debug for ScanMetricsSet { rg_inverted_filtered, rg_minmax_filtered, rg_bloom_filtered, + rg_vector_filtered, rows_before_filter, rows_fulltext_filtered, rows_inverted_filtered, rows_bloom_filtered, + rows_vector_filtered, rows_precise_filtered, num_sst_record_batches, num_sst_batches, @@ -320,6 +326,9 @@ impl fmt::Debug for ScanMetricsSet { if *rg_bloom_filtered > 0 { write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?; } + if *rg_vector_filtered > 0 { + write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?; + } if *rows_fulltext_filtered > 0 { write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?; } @@ -329,6 +338,9 @@ impl fmt::Debug for ScanMetricsSet { if *rows_bloom_filtered > 0 { write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?; } + if *rows_vector_filtered > 0 { + write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?; + } if *rows_precise_filtered > 0 { write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?; } @@ -500,10 +512,12 @@ impl ScanMetricsSet { rg_inverted_filtered, rg_minmax_filtered, rg_bloom_filtered, + rg_vector_filtered, rows_total, rows_fulltext_filtered, rows_inverted_filtered, rows_bloom_filtered, + rows_vector_filtered, rows_precise_filtered, inverted_index_apply_metrics, bloom_filter_apply_metrics, @@ -525,11 +539,13 @@ impl ScanMetricsSet { self.rg_inverted_filtered += *rg_inverted_filtered; self.rg_minmax_filtered += *rg_minmax_filtered; self.rg_bloom_filtered += *rg_bloom_filtered; + 
self.rg_vector_filtered += *rg_vector_filtered; self.rows_before_filter += *rows_total; self.rows_fulltext_filtered += *rows_fulltext_filtered; self.rows_inverted_filtered += *rows_inverted_filtered; self.rows_bloom_filtered += *rows_bloom_filtered; + self.rows_vector_filtered += *rows_vector_filtered; self.rows_precise_filtered += *rows_precise_filtered; self.num_sst_record_batches += *num_record_batches; @@ -631,6 +647,10 @@ impl ScanMetricsSet { READ_ROW_GROUPS_TOTAL .with_label_values(&["bloom_filter_index_filtered"]) .inc_by(self.rg_bloom_filtered as u64); + #[cfg(feature = "vector_index")] + READ_ROW_GROUPS_TOTAL + .with_label_values(&["vector_index_filtered"]) + .inc_by(self.rg_vector_filtered as u64); PRECISE_FILTER_ROWS_TOTAL .with_label_values(&["parquet"]) @@ -647,6 +667,10 @@ impl ScanMetricsSet { READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["bloom_filter_index_filtered"]) .inc_by(self.rows_bloom_filtered as u64); + #[cfg(feature = "vector_index")] + READ_ROWS_IN_ROW_GROUP_TOTAL + .with_label_values(&["vector_index_filtered"]) + .inc_by(self.rows_vector_filtered as u64); } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 7a64323649..814c38f8eb 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -344,6 +344,12 @@ impl FileMeta { .contains(&IndexType::BloomFilterIndex) } + /// Returns true if the file has a vector index. 
+ #[cfg(feature = "vector_index")] + pub fn vector_index_available(&self) -> bool { + self.available_indexes.contains(&IndexType::VectorIndex) + } + pub fn index_file_size(&self) -> u64 { self.index_file_size } diff --git a/src/mito2/src/sst/index/vector_index/applier.rs b/src/mito2/src/sst/index/vector_index/applier.rs new file mode 100644 index 0000000000..6dd6139336 --- /dev/null +++ b/src/mito2/src/sst/index/vector_index/applier.rs @@ -0,0 +1,650 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Vector index applier for KNN search. 
+ +use std::sync::Arc; + +use common_base::range_read::RangeReader; +use common_telemetry::warn; +use index::vector::distance_metric_to_usearch; +use puffin::puffin_manager::cache::PuffinMetadataCacheRef; +use puffin::puffin_manager::{PuffinManager, PuffinReader}; +use roaring::RoaringBitmap; +use snafu::ResultExt; +use store_api::storage::{ColumnId, VectorDistanceMetric}; + +use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider}; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; +use crate::cache::index::vector_index::{ + CachedVectorIndex, VectorIndexCacheKey, VectorIndexCacheRef, +}; +use crate::error::{ApplyVectorIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; +use crate::sst::file::RegionIndexId; +use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; +use crate::sst::index::trigger_index_background_download; +use crate::sst::index::vector_index::creator::VectorIndexConfig; +use crate::sst::index::vector_index::format::VectorIndexBlobHeader; +use crate::sst::index::vector_index::{INDEX_BLOB_TYPE, engine}; + +/// Result of applying vector index. +#[derive(Debug)] +pub struct VectorIndexApplyOutput { + /// Row offsets in the SST file. + pub row_offsets: Vec, +} + +/// Vector index applier for KNN search against SST blobs. 
+pub struct VectorIndexApplier { + table_dir: String, + path_type: store_api::region_request::PathType, + object_store: object_store::ObjectStore, + puffin_manager_factory: PuffinManagerFactory, + file_cache: Option, + puffin_metadata_cache: Option, + vector_index_cache: Option, + column_id: ColumnId, + query_vector: Vec, + metric: VectorDistanceMetric, +} + +pub type VectorIndexApplierRef = Arc; + +impl VectorIndexApplier { + pub fn new( + table_dir: String, + path_type: store_api::region_request::PathType, + object_store: object_store::ObjectStore, + puffin_manager_factory: PuffinManagerFactory, + column_id: ColumnId, + query_vector: Vec, + metric: VectorDistanceMetric, + ) -> Self { + Self { + table_dir, + path_type, + object_store, + puffin_manager_factory, + file_cache: None, + puffin_metadata_cache: None, + vector_index_cache: None, + column_id, + query_vector, + metric, + } + } + + pub fn with_file_cache(mut self, file_cache: Option) -> Self { + self.file_cache = file_cache; + self + } + + pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } + + pub fn with_vector_index_cache(mut self, cache: Option) -> Self { + self.vector_index_cache = cache; + self + } + + /// Applies vector index to the file and returns candidates. + /// + /// This method loads the vector index blob (from cache or remote), runs + /// a KNN search against the indexed vectors, and maps the HNSW keys back + /// to row offsets in the SST file. It returns only row offsets; callers + /// are responsible for any higher-level ordering or limit enforcement. 
+ pub async fn apply_with_k( + &self, + file_id: RegionIndexId, + file_size_hint: Option, + k: usize, + ) -> Result { + if k == 0 { + return Ok(VectorIndexApplyOutput { + row_offsets: Vec::new(), + }); + } + + let index = self.load_or_read_index(file_id, file_size_hint).await?; + let Some(index) = index else { + return Ok(VectorIndexApplyOutput { + row_offsets: Vec::new(), + }); + }; + + if self.query_vector.len() != index.dimensions as usize { + return ApplyVectorIndexSnafu { + reason: format!( + "Query vector dimension {} does not match index dimension {}", + self.query_vector.len(), + index.dimensions + ), + } + .fail(); + } + if self.metric != index.metric { + return ApplyVectorIndexSnafu { + reason: format!( + "Query metric {} does not match index metric {}", + self.metric, index.metric + ), + } + .fail(); + } + if index.indexed_rows == 0 { + return Ok(VectorIndexApplyOutput { + row_offsets: Vec::new(), + }); + } + + let matches = index + .engine + .search(&self.query_vector, k.min(index.indexed_rows as usize)) + .map_err(|e| { + ApplyVectorIndexSnafu { + reason: e.to_string(), + } + .build() + })?; + + let row_offsets = map_hnsw_keys_to_row_offsets( + &index.null_bitmap, + index.total_rows, + index.indexed_rows, + matches.keys, + )?; + + Ok(VectorIndexApplyOutput { row_offsets }) + } + + async fn load_or_read_index( + &self, + file_id: RegionIndexId, + file_size_hint: Option, + ) -> Result>> { + let cache_key = + VectorIndexCacheKey::new(file_id.file_id(), file_id.version, self.column_id); + if let Some(cache) = &self.vector_index_cache + && let Some(cached) = cache.get(&cache_key) + { + return Ok(Some(cached)); + } + + let reader = match self.cached_blob_reader(file_id, file_size_hint).await { + Ok(Some(reader)) => reader, + Ok(None) => self.remote_blob_reader(file_id, file_size_hint).await?, + Err(err) => { + if is_blob_not_found(&err) { + self.remote_blob_reader(file_id, file_size_hint).await? 
+ } else { + warn!(err; "Failed to read cached vector index blob, fallback to remote"); + self.remote_blob_reader(file_id, file_size_hint).await? + } + } + }; + + let blob_data = read_all_blob(reader, file_size_hint).await?; + if blob_data.is_empty() { + return Ok(None); + } + + let cached = Arc::new(parse_vector_index_blob(&blob_data)?); + if let Some(cache) = &self.vector_index_cache { + cache.insert(cache_key, cached.clone()); + } + + Ok(Some(cached)) + } + + async fn cached_blob_reader( + &self, + file_id: RegionIndexId, + file_size_hint: Option, + ) -> Result> { + let Some(file_cache) = &self.file_cache else { + return Ok(None); + }; + + let index_key = IndexKey::new( + file_id.region_id(), + file_id.file_id(), + FileType::Puffin(file_id.version), + ); + if file_cache.get(index_key).await.is_none() { + return Ok(None); + } + + let puffin_manager = self.puffin_manager_factory.build( + file_cache.local_store(), + WriteCachePathProvider::new(file_cache.clone()), + ); + let blob_name = column_blob_name(self.column_id); + + let reader = puffin_manager + .reader(&file_id) + .await + .context(PuffinBuildReaderSnafu)? + .with_file_size_hint(file_size_hint) + .blob(&blob_name) + .await + .context(PuffinReadBlobSnafu)? + .reader() + .await + .context(PuffinBuildReaderSnafu)?; + Ok(Some(reader)) + } + + async fn remote_blob_reader( + &self, + file_id: RegionIndexId, + file_size_hint: Option, + ) -> Result { + let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type); + + trigger_index_background_download( + self.file_cache.as_ref(), + &file_id, + file_size_hint, + &path_factory, + &self.object_store, + ); + + let puffin_manager = self + .puffin_manager_factory + .build(self.object_store.clone(), path_factory) + .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); + + let blob_name = column_blob_name(self.column_id); + + puffin_manager + .reader(&file_id) + .await + .context(PuffinBuildReaderSnafu)? 
+ .with_file_size_hint(file_size_hint) + .blob(&blob_name) + .await + .context(PuffinReadBlobSnafu)? + .reader() + .await + .context(PuffinBuildReaderSnafu) + } +} + +fn column_blob_name(column_id: ColumnId) -> String { + format!("{INDEX_BLOB_TYPE}-{}", column_id) +} + +fn is_blob_not_found(err: &crate::error::Error) -> bool { + matches!( + err, + crate::error::Error::PuffinReadBlob { + source: puffin::error::Error::BlobNotFound { .. }, + .. + } + ) +} + +async fn read_all_blob(reader: BlobReader, file_size_hint: Option) -> Result> { + let metadata = reader.metadata().await.map_err(|e| { + ApplyVectorIndexSnafu { + reason: format!("Failed to read vector index metadata: {}", e), + } + .build() + })?; + if let Some(limit) = file_size_hint + && metadata.content_length > limit + { + return ApplyVectorIndexSnafu { + reason: format!( + "Vector index blob size {} exceeds file size hint {}", + metadata.content_length, limit + ), + } + .fail(); + } + let bytes = reader.read(0..metadata.content_length).await.map_err(|e| { + ApplyVectorIndexSnafu { + reason: format!("Failed to read vector index data: {}", e), + } + .build() + })?; + Ok(bytes.to_vec()) +} + +fn parse_vector_index_blob(data: &[u8]) -> Result { + let (header, mut offset) = VectorIndexBlobHeader::decode(data).map_err(|e| { + ApplyVectorIndexSnafu { + reason: e.to_string(), + } + .build() + })?; + let null_bitmap_len = header.null_bitmap_len as usize; + + if data.len() < offset + null_bitmap_len { + return ApplyVectorIndexSnafu { + reason: "Vector index blob truncated while reading null bitmap".to_string(), + } + .fail(); + } + + let null_bitmap_bytes = &data[offset..offset + null_bitmap_len]; + offset += null_bitmap_len; + let null_bitmap = RoaringBitmap::deserialize_from(null_bitmap_bytes).map_err(|e| { + ApplyVectorIndexSnafu { + reason: format!("Failed to deserialize null bitmap: {}", e), + } + .build() + })?; + + let index_bytes = &data[offset..]; + let config = VectorIndexConfig { + engine: 
header.engine_type, + dim: header.dim as usize, + metric: distance_metric_to_usearch(header.metric), + distance_metric: header.metric, + connectivity: header.connectivity as usize, + expansion_add: header.expansion_add as usize, + expansion_search: header.expansion_search as usize, + }; + let engine = engine::load_engine(header.engine_type, &config, index_bytes).map_err(|e| { + ApplyVectorIndexSnafu { + reason: e.to_string(), + } + .build() + })?; + + Ok(CachedVectorIndex::new( + engine, + null_bitmap, + header.dim, + header.metric, + header.total_rows, + header.indexed_rows, + )) +} + +fn map_hnsw_keys_to_row_offsets( + null_bitmap: &RoaringBitmap, + total_rows: u64, + indexed_rows: u64, + keys: Vec, +) -> Result> { + if total_rows == 0 { + return Ok(Vec::new()); + } + let total_rows_u32 = u32::try_from(total_rows).map_err(|_| { + ApplyVectorIndexSnafu { + reason: format!("Total rows {} exceeds u32::MAX", total_rows), + } + .build() + })?; + + let mut row_offsets = Vec::with_capacity(keys.len()); + for key in keys { + let offset = hnsw_key_to_row_offset(null_bitmap, total_rows_u32, indexed_rows, key)?; + row_offsets.push(offset as u64); + } + Ok(row_offsets) +} + +fn hnsw_key_to_row_offset( + null_bitmap: &RoaringBitmap, + total_rows: u32, + indexed_rows: u64, + key: u64, +) -> Result { + if total_rows == 0 { + return ApplyVectorIndexSnafu { + reason: "Total rows is zero".to_string(), + } + .fail(); + } + if key >= indexed_rows { + return ApplyVectorIndexSnafu { + reason: format!("HNSW key {} exceeds indexed rows {}", key, indexed_rows), + } + .fail(); + } + + if null_bitmap.is_empty() { + return Ok(key as u32); + } + + let mut left: u32 = 0; + let mut right: u32 = total_rows - 1; + while left <= right { + let mid = left + (right - left) / 2; + let nulls_before = null_bitmap.rank(mid); + let non_nulls = (mid as u64 + 1).saturating_sub(nulls_before); + if non_nulls > key { + if mid == 0 { + break; + } + right = mid - 1; + } else { + left = mid + 1; + } + } + + if 
left >= total_rows { + return ApplyVectorIndexSnafu { + reason: "Failed to map HNSW key to row offset".to_string(), + } + .fail(); + } + + Ok(left) +} + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::TempDir; + use futures::io::Cursor; + use object_store::ObjectStore; + use object_store::services::Memory; + use puffin::puffin_manager::PuffinWriter; + use store_api::region_request::PathType; + use store_api::storage::{ColumnId, FileId, VectorDistanceMetric, VectorIndexEngineType}; + + use super::*; + use crate::access_layer::RegionFilePathFactory; + use crate::sst::file::RegionFileId; + use crate::sst::index::puffin_manager::PuffinManagerFactory; + use crate::sst::index::vector_index::creator::VectorIndexConfig; + + async fn build_applier_with_blob( + blob: Vec, + column_id: ColumnId, + query_vector: Vec, + metric: VectorDistanceMetric, + ) -> (TempDir, VectorIndexApplier, RegionIndexId, u64) { + let (dir, puffin_manager_factory) = + PuffinManagerFactory::new_for_test_async("test_vector_index_applier_").await; + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let file_id = RegionFileId::new(0.into(), FileId::random()); + let index_id = RegionIndexId::new(file_id, 0); + let table_dir = "table_dir".to_string(); + + let puffin_manager = puffin_manager_factory.build( + object_store.clone(), + RegionFilePathFactory::new(table_dir.clone(), PathType::Bare), + ); + let mut writer = puffin_manager.writer(&index_id).await.unwrap(); + let blob_name = column_blob_name(column_id); + let _bytes_written = writer + .put_blob( + blob_name.as_str(), + Cursor::new(blob), + Default::default(), + Default::default(), + ) + .await + .unwrap(); + let file_size = writer.finish().await.unwrap(); + + let applier = VectorIndexApplier::new( + table_dir, + PathType::Bare, + object_store, + puffin_manager_factory, + column_id, + query_vector, + metric, + ); + + (dir, applier, index_id, file_size) + } + + fn build_blob_with_vectors( + config: 
&VectorIndexConfig, + vectors: Vec<(u64, Vec)>, + null_bitmap: &RoaringBitmap, + total_rows: u64, + indexed_rows: u64, + ) -> Vec { + let mut engine = engine::create_engine(config.engine, config).unwrap(); + for (key, vector) in vectors { + engine.add(key, &vector).unwrap(); + } + let index_size = engine.serialized_length(); + let mut index_bytes = vec![0u8; index_size]; + engine.save_to_buffer(&mut index_bytes).unwrap(); + + let mut null_bitmap_bytes = Vec::new(); + null_bitmap.serialize_into(&mut null_bitmap_bytes).unwrap(); + + let header = VectorIndexBlobHeader::new( + config.engine, + config.dim as u32, + config.distance_metric, + config.connectivity as u16, + config.expansion_add as u16, + config.expansion_search as u16, + total_rows, + indexed_rows, + null_bitmap_bytes.len() as u32, + ) + .unwrap(); + let mut blob = Vec::new(); + header.encode_into(&mut blob); + blob.extend_from_slice(&null_bitmap_bytes); + blob.extend_from_slice(&index_bytes); + blob + } + + #[test] + fn test_hnsw_key_to_row_offset_with_nulls() { + let mut bitmap = RoaringBitmap::new(); + bitmap.insert(1); + bitmap.insert(3); + + assert_eq!(hnsw_key_to_row_offset(&bitmap, 6, 4, 0).unwrap(), 0); + assert_eq!(hnsw_key_to_row_offset(&bitmap, 6, 4, 1).unwrap(), 2); + assert_eq!(hnsw_key_to_row_offset(&bitmap, 6, 4, 2).unwrap(), 4); + } + + #[test] + fn test_hnsw_key_to_row_offset_without_nulls() { + let bitmap = RoaringBitmap::new(); + assert_eq!(hnsw_key_to_row_offset(&bitmap, 4, 4, 3).unwrap(), 3); + } + + #[test] + fn test_hnsw_key_to_row_offset_out_of_range() { + let bitmap = RoaringBitmap::new(); + assert!(hnsw_key_to_row_offset(&bitmap, 4, 4, 4).is_err()); + } + + #[test] + fn test_map_hnsw_keys_to_row_offsets_multiple_keys() { + let bitmap = RoaringBitmap::new(); + let offsets = map_hnsw_keys_to_row_offsets(&bitmap, 4, 4, vec![0, 2, 3]).unwrap(); + assert_eq!(offsets, vec![0, 2, 3]); + } + + #[tokio::test] + async fn test_apply_with_k_returns_offsets() { + let config = VectorIndexConfig 
{ + engine: VectorIndexEngineType::Usearch, + dim: 2, + metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq), + distance_metric: VectorDistanceMetric::L2sq, + connectivity: 16, + expansion_add: 128, + expansion_search: 64, + }; + let mut null_bitmap = RoaringBitmap::new(); + null_bitmap.insert(1); + let blob = build_blob_with_vectors( + &config, + vec![(0, vec![1.0, 0.0]), (1, vec![0.0, 1.0])], + &null_bitmap, + 3, + 2, + ); + let (_dir, applier, index_id, size_bytes) = + build_applier_with_blob(blob, 1, vec![1.0, 0.0], VectorDistanceMetric::L2sq).await; + let output = applier + .apply_with_k(index_id, Some(size_bytes), 2) + .await + .unwrap(); + assert_eq!(output.row_offsets, vec![0, 2]); + } + + #[tokio::test] + async fn test_apply_with_k_dimension_mismatch() { + let config = VectorIndexConfig { + engine: VectorIndexEngineType::Usearch, + dim: 2, + metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq), + distance_metric: VectorDistanceMetric::L2sq, + connectivity: 16, + expansion_add: 128, + expansion_search: 64, + }; + let null_bitmap = RoaringBitmap::new(); + let blob = build_blob_with_vectors(&config, vec![(0, vec![1.0, 0.0])], &null_bitmap, 1, 1); + let (_dir, applier, index_id, size_bytes) = + build_applier_with_blob(blob, 1, vec![1.0, 0.0, 0.0], VectorDistanceMetric::L2sq).await; + let res = applier.apply_with_k(index_id, Some(size_bytes), 1).await; + assert!(res.is_err()); + } + + #[tokio::test] + async fn test_apply_with_k_empty_blob() { + let config = VectorIndexConfig { + engine: VectorIndexEngineType::Usearch, + dim: 1, + metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq), + distance_metric: VectorDistanceMetric::L2sq, + connectivity: 16, + expansion_add: 128, + expansion_search: 64, + }; + let null_bitmap = RoaringBitmap::new(); + let blob = build_blob_with_vectors(&config, Vec::new(), &null_bitmap, 0, 0); + let (_dir, applier, index_id, size_bytes) = + build_applier_with_blob(blob, 1, vec![1.0], 
VectorDistanceMetric::L2sq).await; + let output = applier + .apply_with_k(index_id, Some(size_bytes), 1) + .await + .unwrap(); + assert!(output.row_offsets.is_empty()); + } +} diff --git a/src/mito2/src/sst/index/vector_index/creator.rs b/src/mito2/src/sst/index/vector_index/creator.rs index bd50283b64..fef54e62df 100644 --- a/src/mito2/src/sst/index/vector_index/creator.rs +++ b/src/mito2/src/sst/index/vector_index/creator.rs @@ -44,6 +44,9 @@ use crate::sst::index::intermediate::{ }; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; +use crate::sst::index::vector_index::format::{ + VECTOR_INDEX_BLOB_HEADER_SIZE, VectorIndexBlobHeader, +}; use crate::sst::index::vector_index::util::bytes_to_f32_slice; use crate::sst::index::vector_index::{INDEX_BLOB_TYPE, engine}; @@ -323,6 +326,7 @@ impl VectorIndexer { for (col_id, creator) in &mut self.creators { let Some(values) = batch.field_col_value(*col_id) else { + creator.add_nulls(n); continue; }; @@ -561,36 +565,12 @@ impl VectorIndexer { creator.save_to_buffer(&mut index_bytes)?; // Header size: version(1) + engine(1) + dim(4) + metric(1) + - // connectivity(2) + expansion_add(2) + expansion_search(2) + - // total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes - /// Size of the vector index blob header in bytes. - /// Header format: version(1) + engine(1) + dim(4) + metric(1) + - /// connectivity(2) + expansion_add(2) + expansion_search(2) + - /// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes - const VECTOR_INDEX_BLOB_HEADER_SIZE: usize = 33; + // connectivity(2) + expansion_add(2) + expansion_search(2) + + // total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes. 
let total_size = VECTOR_INDEX_BLOB_HEADER_SIZE + null_bitmap_bytes.len() + index_bytes.len(); let mut blob_data = Vec::with_capacity(total_size); - // Write version (1 byte) - blob_data.push(1u8); - // Write engine type (1 byte) - blob_data.push(creator.engine_type().as_u8()); - // Write dimension (4 bytes, little-endian) - blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes()); - // Write metric (1 byte) - blob_data.push(creator.metric().as_u8()); - // Write connectivity/M (2 bytes, little-endian) - blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes()); - // Write expansion_add/ef_construction (2 bytes, little-endian) - blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes()); - // Write expansion_search/ef_search (2 bytes, little-endian) - blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes()); - // Write total_rows (8 bytes, little-endian) - blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes()); - // Write indexed_rows (8 bytes, little-endian) - blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes()); - // Write NULL bitmap length (4 bytes, little-endian) let bitmap_len: u32 = null_bitmap_bytes.len().try_into().map_err(|_| { VectorIndexBuildSnafu { reason: format!( @@ -601,7 +581,24 @@ impl VectorIndexer { } .build() })?; - blob_data.extend_from_slice(&bitmap_len.to_le_bytes()); + let header = VectorIndexBlobHeader::new( + creator.engine_type(), + creator.config.dim as u32, + creator.metric(), + creator.config.connectivity as u16, + creator.config.expansion_add as u16, + creator.config.expansion_search as u16, + creator.current_row_offset, + creator.next_hnsw_key, + bitmap_len, + ) + .map_err(|e| { + VectorIndexFinishSnafu { + reason: e.to_string(), + } + .build() + })?; + header.encode_into(&mut blob_data); // Write NULL bitmap blob_data.extend_from_slice(&null_bitmap_bytes); // Write vector index @@ -686,7 +683,78 @@ impl VectorIndexer 
{ #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use api::v1::SemanticType; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use datatypes::value::ValueRef; + use datatypes::vectors::{TimestampMillisecondVector, UInt8Vector, UInt64Vector}; + use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; + use store_api::storage::{ColumnId, FileId, RegionId}; + use super::*; + use crate::read::BatchColumn; + + fn mock_region_metadata_with_vector() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("tag", ConcreteDataType::int64_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("vec", ConcreteDataType::vector_datatype(2), true), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_u64", + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 4, + }) + .primary_key(vec![2]); + + Arc::new(builder.build().unwrap()) + } + + fn new_batch_missing_vector_column(column_id: ColumnId, rows: usize) -> Batch { + let fields = vec![(0, SortField::new(ConcreteDataType::int64_datatype()))]; + let codec = DensePrimaryKeyCodec::with_fields(fields); + let primary_key = codec.encode([ValueRef::Int64(1)].into_iter()).unwrap(); + + let field = BatchColumn { + column_id, + data: Arc::new(UInt64Vector::from_iter_values(0..rows as u64)), 
+ }; + + Batch::new( + primary_key, + Arc::new(TimestampMillisecondVector::from_values( + (0..rows).map(|i| i as i64).collect::>(), + )), + Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(0, rows))), + Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(1, rows))), + vec![field], + ) + .unwrap() + } #[test] fn test_vector_index_creator() { @@ -837,23 +905,24 @@ mod tests { let mut index_bytes = vec![0u8; index_size]; creator.save_to_buffer(&mut index_bytes).unwrap(); - // Header: 33 bytes - let header_size = 33; - let total_size = header_size + null_bitmap_bytes.len() + index_bytes.len(); + let total_size = + VECTOR_INDEX_BLOB_HEADER_SIZE + null_bitmap_bytes.len() + index_bytes.len(); let mut blob_data = Vec::with_capacity(total_size); - // Write header fields - blob_data.push(1u8); // version - blob_data.push(creator.engine_type().as_u8()); // engine type - blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes()); // dimension - blob_data.push(creator.metric().as_u8()); // metric - blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes()); - blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes()); - blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes()); - blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes()); // total_rows - blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes()); // indexed_rows let bitmap_len: u32 = null_bitmap_bytes.len().try_into().unwrap(); - blob_data.extend_from_slice(&bitmap_len.to_le_bytes()); + let header = VectorIndexBlobHeader::new( + creator.engine_type(), + creator.config.dim as u32, + creator.metric(), + creator.config.connectivity as u16, + creator.config.expansion_add as u16, + creator.config.expansion_search as u16, + creator.current_row_offset, + creator.next_hnsw_key, + bitmap_len, + ) + .unwrap(); + header.encode_into(&mut blob_data); blob_data.extend_from_slice(&null_bitmap_bytes); 
blob_data.extend_from_slice(&index_bytes); @@ -861,60 +930,62 @@ mod tests { assert_eq!(blob_data.len(), total_size); // Parse header and verify values - assert_eq!(blob_data[0], 1); // version - assert_eq!(blob_data[1], VectorIndexEngineType::Usearch.as_u8()); // engine - - let dim = u32::from_le_bytes([blob_data[2], blob_data[3], blob_data[4], blob_data[5]]); - assert_eq!(dim, 4); - - let metric = blob_data[6]; + let (decoded, header_size) = VectorIndexBlobHeader::decode(&blob_data).unwrap(); + assert_eq!(header_size, VECTOR_INDEX_BLOB_HEADER_SIZE); + assert_eq!(decoded.engine_type, VectorIndexEngineType::Usearch); + assert_eq!(decoded.dim, 4); assert_eq!( - metric, - datatypes::schema::VectorDistanceMetric::L2sq.as_u8() + decoded.metric, + datatypes::schema::VectorDistanceMetric::L2sq ); - - let connectivity = u16::from_le_bytes([blob_data[7], blob_data[8]]); - assert_eq!(connectivity, 24); - - let expansion_add = u16::from_le_bytes([blob_data[9], blob_data[10]]); - assert_eq!(expansion_add, 200); - - let expansion_search = u16::from_le_bytes([blob_data[11], blob_data[12]]); - assert_eq!(expansion_search, 100); - - let total_rows = u64::from_le_bytes([ - blob_data[13], - blob_data[14], - blob_data[15], - blob_data[16], - blob_data[17], - blob_data[18], - blob_data[19], - blob_data[20], - ]); - assert_eq!(total_rows, 5); - - let indexed_rows = u64::from_le_bytes([ - blob_data[21], - blob_data[22], - blob_data[23], - blob_data[24], - blob_data[25], - blob_data[26], - blob_data[27], - blob_data[28], - ]); - assert_eq!(indexed_rows, 3); - - let null_bitmap_len = - u32::from_le_bytes([blob_data[29], blob_data[30], blob_data[31], blob_data[32]]); - assert_eq!(null_bitmap_len as usize, null_bitmap_bytes.len()); + assert_eq!(decoded.connectivity, 24); + assert_eq!(decoded.expansion_add, 200); + assert_eq!(decoded.expansion_search, 100); + assert_eq!(decoded.total_rows, 5); + assert_eq!(decoded.indexed_rows, 3); + assert_eq!(decoded.null_bitmap_len as usize, 
null_bitmap_bytes.len()); // Verify null bitmap can be deserialized - let null_bitmap_data = &blob_data[header_size..header_size + null_bitmap_len as usize]; + let null_bitmap_data = + &blob_data[header_size..header_size + decoded.null_bitmap_len as usize]; let restored_bitmap = RoaringBitmap::deserialize_from(null_bitmap_data).unwrap(); assert_eq!(restored_bitmap.len(), 2); // 2 nulls assert!(restored_bitmap.contains(1)); assert!(restored_bitmap.contains(3)); } + + #[tokio::test] + async fn test_vector_index_missing_column_as_nulls() { + let tempdir = common_test_util::temp_dir::create_temp_dir( + "test_vector_index_missing_column_as_nulls_", + ); + let intm_mgr = IntermediateManager::init_fs(tempdir.path().to_string_lossy()) + .await + .unwrap(); + let region_metadata = mock_region_metadata_with_vector(); + + let mut vector_index_options = HashMap::new(); + vector_index_options.insert(3, VectorIndexOptions::default()); + + let mut indexer = VectorIndexer::new( + FileId::random(), + &region_metadata, + intm_mgr, + None, + &vector_index_options, + ) + .unwrap() + .unwrap(); + + let mut batch = new_batch_missing_vector_column(4, 3); + indexer.update(&mut batch).await.unwrap(); + + let creator = indexer.creators.get(&3).unwrap(); + assert_eq!(creator.size(), 0); + assert_eq!(creator.current_row_offset, 3); + assert_eq!(creator.null_bitmap.len(), 3); + for idx in 0..3 { + assert!(creator.null_bitmap.contains(idx as u32)); + } + } } diff --git a/src/mito2/src/sst/index/vector_index/format.rs b/src/mito2/src/sst/index/vector_index/format.rs new file mode 100644 index 0000000000..ef0c81596e --- /dev/null +++ b/src/mito2/src/sst/index/vector_index/format.rs @@ -0,0 +1,324 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Vector index blob format helpers. + +use std::fmt; + +#[cfg(test)] +use datatypes::schema::VectorDistanceMetric as SchemaVectorDistanceMetric; +#[cfg(test)] +use index::vector::distance_metric_to_usearch; +use store_api::storage::{VectorDistanceMetric, VectorIndexEngineType}; + +pub(crate) const VECTOR_INDEX_BLOB_VERSION: u8 = 1; +pub(crate) const VECTOR_INDEX_BLOB_HEADER_SIZE: usize = 33; + +#[derive(Debug, Clone, Copy)] +pub(crate) struct VectorIndexBlobHeader { + pub engine_type: VectorIndexEngineType, + pub dim: u32, + pub metric: VectorDistanceMetric, + pub connectivity: u16, + pub expansion_add: u16, + pub expansion_search: u16, + pub total_rows: u64, + pub indexed_rows: u64, + pub null_bitmap_len: u32, +} + +impl VectorIndexBlobHeader { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + engine_type: VectorIndexEngineType, + dim: u32, + metric: VectorDistanceMetric, + connectivity: u16, + expansion_add: u16, + expansion_search: u16, + total_rows: u64, + indexed_rows: u64, + null_bitmap_len: u32, + ) -> Result { + if total_rows < indexed_rows { + return Err(VectorIndexBlobFormatError::InvalidRowCounts { + total: total_rows, + indexed: indexed_rows, + }); + } + if total_rows > u64::from(u32::MAX) || indexed_rows > u64::from(u32::MAX) { + return Err(VectorIndexBlobFormatError::RowsExceedU32 { + total: total_rows, + indexed: indexed_rows, + }); + } + Ok(Self { + engine_type, + dim, + metric, + connectivity, + expansion_add, + expansion_search, + total_rows, + indexed_rows, + null_bitmap_len, + }) + } + + pub(crate) fn 
encode_into(&self, buf: &mut Vec) { + buf.push(VECTOR_INDEX_BLOB_VERSION); + buf.push(self.engine_type.as_u8()); + buf.extend_from_slice(&self.dim.to_le_bytes()); + buf.push(self.metric.as_u8()); + buf.extend_from_slice(&self.connectivity.to_le_bytes()); + buf.extend_from_slice(&self.expansion_add.to_le_bytes()); + buf.extend_from_slice(&self.expansion_search.to_le_bytes()); + buf.extend_from_slice(&self.total_rows.to_le_bytes()); + buf.extend_from_slice(&self.indexed_rows.to_le_bytes()); + buf.extend_from_slice(&self.null_bitmap_len.to_le_bytes()); + } + + pub(crate) fn decode(data: &[u8]) -> Result<(Self, usize), VectorIndexBlobFormatError> { + if data.len() < VECTOR_INDEX_BLOB_HEADER_SIZE { + return Err(VectorIndexBlobFormatError::Truncated("header")); + } + + let mut offset = 0; + let version = read_u8(data, &mut offset)?; + if version != VECTOR_INDEX_BLOB_VERSION { + return Err(VectorIndexBlobFormatError::UnsupportedVersion(version)); + } + + let engine_type = VectorIndexEngineType::try_from_u8(read_u8(data, &mut offset)?) + .ok_or_else(|| VectorIndexBlobFormatError::UnknownEngine(data[offset - 1]))?; + let dim = read_u32(data, &mut offset)?; + let metric = VectorDistanceMetric::try_from_u8(read_u8(data, &mut offset)?) 
+ .ok_or_else(|| VectorIndexBlobFormatError::UnknownMetric(data[offset - 1]))?; + let connectivity = read_u16(data, &mut offset)?; + let expansion_add = read_u16(data, &mut offset)?; + let expansion_search = read_u16(data, &mut offset)?; + let total_rows = read_u64(data, &mut offset)?; + let indexed_rows = read_u64(data, &mut offset)?; + let null_bitmap_len = read_u32(data, &mut offset)?; + + let header = VectorIndexBlobHeader::new( + engine_type, + dim, + metric, + connectivity, + expansion_add, + expansion_search, + total_rows, + indexed_rows, + null_bitmap_len, + )?; + Ok((header, offset)) + } +} + +#[derive(Debug)] +pub(crate) enum VectorIndexBlobFormatError { + Truncated(&'static str), + UnsupportedVersion(u8), + UnknownEngine(u8), + UnknownMetric(u8), + InvalidRowCounts { total: u64, indexed: u64 }, + RowsExceedU32 { total: u64, indexed: u64 }, +} + +impl fmt::Display for VectorIndexBlobFormatError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Truncated(label) => { + write!(f, "Vector index blob truncated while reading {}", label) + } + Self::UnsupportedVersion(version) => { + write!(f, "Unsupported vector index version {}", version) + } + Self::UnknownEngine(value) => write!(f, "Unknown vector index engine type {}", value), + Self::UnknownMetric(value) => write!(f, "Unknown vector index metric {}", value), + Self::InvalidRowCounts { total, indexed } => { + write!( + f, + "Total rows {} is smaller than indexed rows {}", + total, indexed + ) + } + Self::RowsExceedU32 { total, indexed } => { + write!( + f, + "Vector index rows exceed u32::MAX (total={}, indexed={})", + total, indexed + ) + } + } + } +} + +fn read_exact( + data: &[u8], + offset: &mut usize, + label: &'static str, +) -> Result<[u8; N], VectorIndexBlobFormatError> { + if *offset + N > data.len() { + return Err(VectorIndexBlobFormatError::Truncated(label)); + } + let mut buf = [0u8; N]; + buf.copy_from_slice(&data[*offset..*offset + N]); + *offset += N; + 
Ok(buf) +} + +fn read_u8(data: &[u8], offset: &mut usize) -> Result { + Ok(read_exact::<1>(data, offset, "u8")?[0]) +} + +fn read_u16(data: &[u8], offset: &mut usize) -> Result { + Ok(u16::from_le_bytes(read_exact::<2>(data, offset, "u16")?)) +} + +fn read_u32(data: &[u8], offset: &mut usize) -> Result { + Ok(u32::from_le_bytes(read_exact::<4>(data, offset, "u32")?)) +} + +fn read_u64(data: &[u8], offset: &mut usize) -> Result { + Ok(u64::from_le_bytes(read_exact::<8>(data, offset, "u64")?)) +} + +#[cfg(test)] +mod tests { + use roaring::RoaringBitmap; + use store_api::storage::VectorIndexEngineType; + + use super::*; + use crate::sst::index::vector_index::creator::VectorIndexConfig; + use crate::sst::index::vector_index::engine; + + #[test] + fn test_vector_index_blob_header_roundtrip() { + let header = VectorIndexBlobHeader::new( + VectorIndexEngineType::Usearch, + 4, + VectorDistanceMetric::L2sq, + 24, + 200, + 100, + 5, + 3, + 16, + ) + .unwrap(); + let mut bytes = Vec::new(); + header.encode_into(&mut bytes); + + let (decoded, offset) = VectorIndexBlobHeader::decode(&bytes).unwrap(); + assert_eq!(offset, VECTOR_INDEX_BLOB_HEADER_SIZE); + assert_eq!(decoded.engine_type, header.engine_type); + assert_eq!(decoded.dim, header.dim); + assert_eq!(decoded.metric, header.metric); + assert_eq!(decoded.connectivity, header.connectivity); + assert_eq!(decoded.expansion_add, header.expansion_add); + assert_eq!(decoded.expansion_search, header.expansion_search); + assert_eq!(decoded.total_rows, header.total_rows); + assert_eq!(decoded.indexed_rows, header.indexed_rows); + assert_eq!(decoded.null_bitmap_len, header.null_bitmap_len); + } + + #[test] + fn test_vector_index_blob_header_invalid_version() { + let mut blob = vec![0u8; VECTOR_INDEX_BLOB_HEADER_SIZE]; + blob[0] = 2; + assert!(VectorIndexBlobHeader::decode(&blob).is_err()); + } + + #[test] + fn test_vector_index_blob_header_truncated() { + let blob = vec![0u8; VECTOR_INDEX_BLOB_HEADER_SIZE - 1]; + 
assert!(VectorIndexBlobHeader::decode(&blob).is_err()); + } + + #[test] + fn test_vector_index_blob_parse_roundtrip() { + let config = VectorIndexConfig { + engine: VectorIndexEngineType::Usearch, + dim: 2, + metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq), + distance_metric: VectorDistanceMetric::L2sq, + connectivity: 16, + expansion_add: 128, + expansion_search: 64, + }; + let mut engine = engine::create_engine(config.engine, &config).unwrap(); + engine.add(0, &[0.0, 1.0]).unwrap(); + let index_size = engine.serialized_length(); + let mut index_bytes = vec![0u8; index_size]; + engine.save_to_buffer(&mut index_bytes).unwrap(); + + let null_bitmap = RoaringBitmap::new(); + let mut null_bitmap_bytes = Vec::new(); + null_bitmap.serialize_into(&mut null_bitmap_bytes).unwrap(); + + let header = VectorIndexBlobHeader::new( + config.engine, + config.dim as u32, + VectorDistanceMetric::L2sq, + config.connectivity as u16, + config.expansion_add as u16, + config.expansion_search as u16, + 1, + 1, + null_bitmap_bytes.len() as u32, + ) + .unwrap(); + let mut blob = Vec::new(); + header.encode_into(&mut blob); + blob.extend_from_slice(&null_bitmap_bytes); + blob.extend_from_slice(&index_bytes); + + let (decoded, offset) = VectorIndexBlobHeader::decode(&blob).unwrap(); + let null_bitmap_len = decoded.null_bitmap_len as usize; + let null_bitmap_data = &blob[offset..offset + null_bitmap_len]; + let restored_bitmap = RoaringBitmap::deserialize_from(null_bitmap_data).unwrap(); + + assert_eq!(decoded.metric, VectorDistanceMetric::L2sq); + assert_eq!(decoded.total_rows, 1); + assert_eq!(decoded.indexed_rows, 1); + assert_eq!(restored_bitmap.len(), 0); + } + + #[test] + fn test_vector_index_blob_header_format_matches_creator() { + let header = VectorIndexBlobHeader::new( + VectorIndexEngineType::Usearch, + 4, + VectorDistanceMetric::L2sq, + 24, + 200, + 100, + 5, + 3, + 2, + ) + .unwrap(); + let mut bytes = Vec::new(); + header.encode_into(&mut bytes); + + let (decoded, 
header_size) = VectorIndexBlobHeader::decode(&bytes).unwrap(); + assert_eq!(header_size, VECTOR_INDEX_BLOB_HEADER_SIZE); + assert_eq!(decoded.metric, SchemaVectorDistanceMetric::L2sq); + assert_eq!(decoded.total_rows, 5); + assert_eq!(decoded.indexed_rows, 3); + assert_eq!(decoded.null_bitmap_len, 2); + } +} diff --git a/src/mito2/src/sst/index/vector_index/mod.rs b/src/mito2/src/sst/index/vector_index/mod.rs index 764e9a5c7d..7708591085 100644 --- a/src/mito2/src/sst/index/vector_index/mod.rs +++ b/src/mito2/src/sst/index/vector_index/mod.rs @@ -14,8 +14,10 @@ //! Vector index module for HNSW-based approximate nearest neighbor search. +pub(crate) mod applier; pub(crate) mod creator; pub(crate) mod engine; +pub(crate) mod format; pub(crate) mod util; /// The blob type identifier for vector index in puffin files. diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 0484892b21..86e001b777 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,6 +14,8 @@ //! Parquet reader. 
+#[cfg(feature = "vector_index")] +use std::collections::BTreeSet; use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -41,6 +43,8 @@ use table::predicate::Predicate; use crate::cache::CacheStrategy; use crate::cache::index::result_cache::PredicateKey; +#[cfg(feature = "vector_index")] +use crate::error::ApplyVectorIndexSnafu; use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, @@ -61,6 +65,8 @@ use crate::sst::index::fulltext_index::applier::{ use crate::sst::index::inverted_index::applier::{ InvertedIndexApplierRef, InvertedIndexApplyMetrics, }; +#[cfg(feature = "vector_index")] +use crate::sst::index::vector_index::applier::VectorIndexApplierRef; use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PreFilterMode, RangeBase, row_group_contains_delete, }; @@ -74,6 +80,7 @@ use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; const INDEX_TYPE_FULLTEXT: &str = "fulltext"; const INDEX_TYPE_INVERTED: &str = "inverted"; const INDEX_TYPE_BLOOM: &str = "bloom filter"; +const INDEX_TYPE_VECTOR: &str = "vector"; macro_rules! handle_index_error { ($err:expr, $file_handle:expr, $index_type:expr) => { @@ -117,6 +124,12 @@ pub struct ParquetReaderBuilder { inverted_index_appliers: [Option; 2], bloom_filter_index_appliers: [Option; 2], fulltext_index_appliers: [Option; 2], + /// Vector index applier for KNN search. + #[cfg(feature = "vector_index")] + vector_index_applier: Option, + /// Over-fetched k for vector index scan. + #[cfg(feature = "vector_index")] + vector_index_k: Option, /// Expected metadata of the region while reading the SST. /// This is usually the latest metadata of the region. The reader use /// it get the correct column id of a column by name. 
@@ -150,6 +163,10 @@ impl ParquetReaderBuilder { inverted_index_appliers: [None, None], bloom_filter_index_appliers: [None, None], fulltext_index_appliers: [None, None], + #[cfg(feature = "vector_index")] + vector_index_applier: None, + #[cfg(feature = "vector_index")] + vector_index_k: None, expected_metadata: None, flat_format: false, compaction: false, @@ -211,6 +228,19 @@ impl ParquetReaderBuilder { self } + /// Attaches the vector index applier to the builder. + #[cfg(feature = "vector_index")] + #[must_use] + pub(crate) fn vector_index_applier( + mut self, + applier: Option, + k: Option, + ) -> Self { + self.vector_index_applier = applier; + self.vector_index_k = k; + self + } + /// Attaches the expected metadata to the builder. #[must_use] pub fn expected_metadata(mut self, expected_metadata: Option) -> Self { @@ -572,6 +602,19 @@ impl ParquetReaderBuilder { ) .await; } + #[cfg(feature = "vector_index")] + { + self.prune_row_groups_by_vector_index( + row_group_size, + num_row_groups, + &mut output, + metrics, + ) + .await; + if output.is_empty() { + return output; + } + } output } @@ -799,6 +842,48 @@ impl ParquetReaderBuilder { pruned } + /// Prunes row groups by vector index results. 
+ #[cfg(feature = "vector_index")] + async fn prune_row_groups_by_vector_index( + &self, + row_group_size: usize, + num_row_groups: usize, + output: &mut RowGroupSelection, + metrics: &mut ReaderFilterMetrics, + ) { + let Some(applier) = &self.vector_index_applier else { + return; + }; + let Some(k) = self.vector_index_k else { + return; + }; + if !self.file_handle.meta_ref().vector_index_available() { + return; + } + + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let apply_res = applier + .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k) + .await; + let row_ids = match apply_res { + Ok(res) => res.row_offsets, + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR); + return; + } + }; + + let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups) + { + Ok(selection) => selection, + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR); + return; + } + }; + apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR); + } + async fn prune_row_groups_by_fulltext_bloom( &self, row_group_size: usize, @@ -983,6 +1068,29 @@ fn apply_selection_and_update_metrics( *output = intersection; } +#[cfg(feature = "vector_index")] +fn vector_selection_from_offsets( + row_offsets: Vec, + row_group_size: usize, + num_row_groups: usize, +) -> Result { + let mut row_ids = BTreeSet::new(); + for offset in row_offsets { + let row_id = u32::try_from(offset).map_err(|_| { + ApplyVectorIndexSnafu { + reason: format!("Row offset {} exceeds u32::MAX", offset), + } + .build() + })?; + row_ids.insert(row_id); + } + Ok(RowGroupSelection::from_row_ids( + row_ids, + row_group_size, + num_row_groups, + )) +} + fn all_required_row_groups_searched( required_row_groups: &RowGroupSelection, cached_row_groups: &RowGroupSelection, @@ -1008,6 +1116,8 @@ pub(crate) struct ReaderFilterMetrics { pub(crate) rg_minmax_filtered: usize, /// Number of row groups 
filtered by bloom filter index. pub(crate) rg_bloom_filtered: usize, + /// Number of row groups filtered by vector index. + pub(crate) rg_vector_filtered: usize, /// Number of rows in row group before filtering. pub(crate) rows_total: usize, @@ -1017,6 +1127,8 @@ pub(crate) struct ReaderFilterMetrics { pub(crate) rows_inverted_filtered: usize, /// Number of rows in row group filtered by bloom filter index. pub(crate) rows_bloom_filtered: usize, + /// Number of rows filtered by vector index. + pub(crate) rows_vector_filtered: usize, /// Number of rows filtered by precise filter. pub(crate) rows_precise_filtered: usize, @@ -1036,11 +1148,13 @@ impl ReaderFilterMetrics { self.rg_inverted_filtered += other.rg_inverted_filtered; self.rg_minmax_filtered += other.rg_minmax_filtered; self.rg_bloom_filtered += other.rg_bloom_filtered; + self.rg_vector_filtered += other.rg_vector_filtered; self.rows_total += other.rows_total; self.rows_fulltext_filtered += other.rows_fulltext_filtered; self.rows_inverted_filtered += other.rows_inverted_filtered; self.rows_bloom_filtered += other.rows_bloom_filtered; + self.rows_vector_filtered += other.rows_vector_filtered; self.rows_precise_filtered += other.rows_precise_filtered; // Merge optional applier metrics @@ -1078,6 +1192,9 @@ impl ReaderFilterMetrics { READ_ROW_GROUPS_TOTAL .with_label_values(&["bloom_filter_index_filtered"]) .inc_by(self.rg_bloom_filtered as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["vector_index_filtered"]) + .inc_by(self.rg_vector_filtered as u64); PRECISE_FILTER_ROWS_TOTAL .with_label_values(&["parquet"]) @@ -1094,6 +1211,9 @@ impl ReaderFilterMetrics { READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["bloom_filter_index_filtered"]) .inc_by(self.rows_bloom_filtered as u64); + READ_ROWS_IN_ROW_GROUP_TOTAL + .with_label_values(&["vector_index_filtered"]) + .inc_by(self.rows_vector_filtered as u64); } fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) { @@ 
-1110,11 +1230,67 @@ impl ReaderFilterMetrics { self.rg_bloom_filtered += row_group_count; self.rows_bloom_filtered += row_count; } + INDEX_TYPE_VECTOR => { + self.rg_vector_filtered += row_group_count; + self.rows_vector_filtered += row_count; + } _ => {} } } } +#[cfg(all(test, feature = "vector_index"))] +mod tests { + use super::*; + + #[test] + fn test_vector_selection_from_offsets() { + let row_group_size = 4; + let num_row_groups = 3; + let selection = + vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups) + .unwrap(); + + assert_eq!(selection.row_group_count(), 3); + assert_eq!(selection.row_count(), 4); + assert!(selection.contains_non_empty_row_group(0)); + assert!(selection.contains_non_empty_row_group(1)); + assert!(selection.contains_non_empty_row_group(2)); + } + + #[test] + fn test_vector_selection_from_offsets_out_of_range() { + let row_group_size = 4; + let num_row_groups = 2; + let selection = vector_selection_from_offsets( + vec![0, 7, u64::from(u32::MAX) + 1], + row_group_size, + num_row_groups, + ); + assert!(selection.is_err()); + } + + #[test] + fn test_vector_selection_updates_metrics() { + let row_group_size = 4; + let total_rows = 8; + let mut output = RowGroupSelection::new(row_group_size, total_rows); + let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap(); + let mut metrics = ReaderFilterMetrics::default(); + + apply_selection_and_update_metrics( + &mut output, + &selection, + &mut metrics, + INDEX_TYPE_VECTOR, + ); + + assert_eq!(metrics.rg_vector_filtered, 1); + assert_eq!(metrics.rows_vector_filtered, 7); + assert_eq!(output.row_count(), 1); + } +} + /// Metrics for parquet metadata cache operations. 
#[derive(Default, Clone, Copy)] pub(crate) struct MetadataCacheMetrics { diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 36210046f6..7bf31ff587 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [features] dashboard = [] +vector_index = [] [lints] workspace = true diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index d70fc324a3..200123f8d6 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1379,6 +1379,19 @@ providers = []"#, ) }; + let vector_index_config = if cfg!(feature = "vector_index") { + r#" +[region_engine.mito.vector_index] +create_on_flush = "auto" +create_on_compaction = "auto" +apply_on_query = "auto" +mem_threshold_on_create = "auto" + +"# + } else { + "\n" + }; + let expected_toml_str = format!( r#" enable_telemetry = true @@ -1545,14 +1558,7 @@ create_on_flush = "auto" create_on_compaction = "auto" apply_on_query = "auto" mem_threshold_on_create = "auto" - -[region_engine.mito.vector_index] -create_on_flush = "auto" -create_on_compaction = "auto" -apply_on_query = "auto" -mem_threshold_on_create = "auto" - -[region_engine.mito.memtable] +{vector_index_config}[region_engine.mito.memtable] type = "time_series" [region_engine.mito.gc]