From e4b5ef275fc72f51f61f424c12dbd62534530587 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Tue, 30 Dec 2025 11:38:51 +0800 Subject: [PATCH] feat: impl vector index building (#7468) * feat: impl vector index building Signed-off-by: Dennis Zhuang * feat: supports flat format Signed-off-by: Dennis Zhuang * ci: add vector_index feature to test Signed-off-by: Dennis Zhuang * chore: apply suggestions Signed-off-by: Dennis Zhuang * chore: apply suggestions from copilot Signed-off-by: Dennis Zhuang --------- Signed-off-by: Dennis Zhuang --- .github/workflows/develop.yml | 4 +- Cargo.lock | 3 +- src/cmd/Cargo.toml | 1 + src/cmd/src/datanode/objbench.rs | 2 + src/mito2/Cargo.toml | 7 +- src/mito2/src/access_layer.rs | 4 + src/mito2/src/cache/write_cache.rs | 8 + src/mito2/src/compaction/compactor.rs | 4 + src/mito2/src/config.rs | 50 + src/mito2/src/error.rs | 19 + src/mito2/src/flush.rs | 2 + src/mito2/src/sst/file.rs | 3 + src/mito2/src/sst/index.rs | 257 ++++- src/mito2/src/sst/index/indexer/abort.rs | 24 + src/mito2/src/sst/index/indexer/finish.rs | 68 ++ src/mito2/src/sst/index/indexer/update.rs | 60 ++ .../src/sst/index/vector_index/creator.rs | 920 ++++++++++++++++++ .../src/sst/index/vector_index/engine/mod.rs | 45 + .../index/vector_index/engine/usearch_impl.rs | 231 +++++ src/mito2/src/sst/index/vector_index/mod.rs | 22 + src/mito2/src/sst/index/vector_index/util.rs | 108 ++ src/mito2/src/sst/parquet.rs | 6 + src/mito2/src/worker/handle_rebuild_index.rs | 2 + src/store-api/src/metadata.rs | 18 +- tests-integration/tests/http.rs | 6 + 25 files changed, 1864 insertions(+), 10 deletions(-) create mode 100644 src/mito2/src/sst/index/vector_index/creator.rs create mode 100644 src/mito2/src/sst/index/vector_index/engine/mod.rs create mode 100644 src/mito2/src/sst/index/vector_index/engine/usearch_impl.rs create mode 100644 src/mito2/src/sst/index/vector_index/mod.rs create mode 100644 src/mito2/src/sst/index/vector_index/util.rs diff --git 
a/.github/workflows/develop.yml b/.github/workflows/develop.yml index af5ddc5368..cf5434a998 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -755,7 +755,7 @@ jobs: run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait - name: Run nextest cases - run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend + run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend -F vector_index env: CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold" RUST_BACKTRACE: 1 @@ -813,7 +813,7 @@ jobs: run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait - name: Run nextest cases - run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend + run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend -F vector_index env: CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold" RUST_BACKTRACE: 1 diff --git a/Cargo.lock b/Cargo.lock index 6a21760ffb..c8f8faedbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7779,7 +7779,6 @@ dependencies = [ "either", "futures", "greptime-proto", - "humantime", "humantime-serde", "index", "itertools 0.14.0", @@ -7798,6 +7797,7 @@ dependencies = [ "rand 0.9.1", "rayon", "regex", + "roaring", "rskafka", "rstest", "rstest_reuse", @@ -7816,6 +7816,7 @@ dependencies = [ "tokio-util", "toml 0.8.23", "tracing", + "usearch", "uuid", ] diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index d279ddb7f0..a70f164997 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -18,6 +18,7 @@ default = [ ] enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"] tokio-console = ["common-telemetry/tokio-console"] +vector_index = ["mito2/vector_index"] [lints] workspace = true diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index 0a3f27b77e..1b06bf449a 100644 --- 
a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -233,6 +233,8 @@ impl ObjbenchCommand { inverted_index_config: MitoConfig::default().inverted_index, fulltext_index_config, bloom_filter_index_config: MitoConfig::default().bloom_filter_index, + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), }; // Write SST diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index c453534317..5590c17b05 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true default = [] test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"] enterprise = [] +vector_index = ["dep:usearch", "dep:roaring", "index/vector_index"] [lints] workspace = true @@ -28,9 +29,10 @@ common-datasource.workspace = true common-decimal.workspace = true common-error.workspace = true common-grpc.workspace = true +common-function.workspace = true common-macro.workspace = true -common-meta.workspace = true common-memory-manager.workspace = true +common-meta.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true @@ -49,7 +51,6 @@ dotenv.workspace = true either.workspace = true futures.workspace = true humantime-serde.workspace = true -humantime.workspace = true index.workspace = true itertools.workspace = true greptime-proto.workspace = true @@ -67,6 +68,7 @@ partition.workspace = true puffin.workspace = true rand.workspace = true rayon = "1.10" +roaring = { version = "0.10", optional = true } regex.workspace = true rskafka = { workspace = true, optional = true } rstest = { workspace = true, optional = true } @@ -84,6 +86,7 @@ tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true tracing.workspace = true +usearch = { version = "2.21", default-features = false, features = ["fp16lib"], optional = true } uuid.workspace = true [dev-dependencies] diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 
8888ade815..049f8e1180 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -313,6 +313,8 @@ impl AccessLayer { inverted_index_config: request.inverted_index_config, fulltext_index_config: request.fulltext_index_config, bloom_filter_index_config: request.bloom_filter_index_config, + #[cfg(feature = "vector_index")] + vector_index_config: request.vector_index_config, }; // We disable write cache on file system but we still use atomic write. // TODO(yingwen): If we support other non-fs stores without the write cache, then @@ -467,6 +469,8 @@ pub struct SstWriteRequest { pub inverted_index_config: InvertedIndexConfig, pub fulltext_index_config: FulltextIndexConfig, pub bloom_filter_index_config: BloomFilterConfig, + #[cfg(feature = "vector_index")] + pub vector_index_config: crate::config::VectorIndexConfig, } /// Cleaner to remove temp files on the atomic write dir. diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 25d0d7b060..e074b7de7b 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -227,6 +227,8 @@ impl WriteCache { inverted_index_config: write_request.inverted_index_config, fulltext_index_config: write_request.fulltext_index_config, bloom_filter_index_config: write_request.bloom_filter_index_config, + #[cfg(feature = "vector_index")] + vector_index_config: write_request.vector_index_config, }; let cleaner = TempFileCleaner::new(region_id, store.clone()); @@ -520,6 +522,8 @@ mod tests { inverted_index_config: Default::default(), fulltext_index_config: Default::default(), bloom_filter_index_config: Default::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), }; let upload_request = SstUploadRequest { @@ -620,6 +624,8 @@ mod tests { inverted_index_config: Default::default(), fulltext_index_config: Default::default(), bloom_filter_index_config: Default::default(), + #[cfg(feature = "vector_index")] + 
vector_index_config: Default::default(), }; let write_opts = WriteOptions { row_group_size: 512, @@ -701,6 +707,8 @@ mod tests { inverted_index_config: Default::default(), fulltext_index_config: Default::default(), bloom_filter_index_config: Default::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), }; let write_opts = WriteOptions { row_group_size: 512, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 4857f79ba5..1876972b0d 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -332,6 +332,8 @@ impl DefaultCompactor { let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone(); + #[cfg(feature = "vector_index")] + let vector_index_config = compaction_region.engine_config.vector_index.clone(); let input_file_names = output .inputs @@ -378,6 +380,8 @@ impl DefaultCompactor { inverted_index_config, fulltext_index_config, bloom_filter_index_config, + #[cfg(feature = "vector_index")] + vector_index_config, }, &write_opts, &mut metrics, diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 767eb2c81b..602f5508ba 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -158,6 +158,9 @@ pub struct MitoConfig { pub fulltext_index: FulltextIndexConfig, /// Bloom filter index configs. pub bloom_filter_index: BloomFilterConfig, + /// Vector index configs (HNSW). 
+ #[cfg(feature = "vector_index")] + pub vector_index: VectorIndexConfig, /// Memtable config pub memtable: MemtableConfig, @@ -214,6 +217,8 @@ impl Default for MitoConfig { inverted_index: InvertedIndexConfig::default(), fulltext_index: FulltextIndexConfig::default(), bloom_filter_index: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index: VectorIndexConfig::default(), memtable: MemtableConfig::default(), min_compaction_interval: Duration::from_secs(0), default_experimental_flat_format: false, @@ -643,6 +648,51 @@ impl BloomFilterConfig { } } +/// Configuration options for the vector index (HNSW). +#[cfg(feature = "vector_index")] +#[serde_as] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +pub struct VectorIndexConfig { + /// Whether to create the index on flush: automatically or never. + pub create_on_flush: Mode, + /// Whether to create the index on compaction: automatically or never. + pub create_on_compaction: Mode, + /// Whether to apply the index on query: automatically or never. + pub apply_on_query: Mode, + /// Memory threshold for creating the index. 
+ pub mem_threshold_on_create: MemoryThreshold, +} + +#[cfg(feature = "vector_index")] +impl Default for VectorIndexConfig { + fn default() -> Self { + Self { + create_on_flush: Mode::Auto, + create_on_compaction: Mode::Auto, + apply_on_query: Mode::Auto, + mem_threshold_on_create: MemoryThreshold::Auto, + } + } +} + +#[cfg(feature = "vector_index")] +impl VectorIndexConfig { + pub fn mem_threshold_on_create(&self) -> Option { + match self.mem_threshold_on_create { + MemoryThreshold::Auto => { + if let Some(sys_memory) = get_total_memory_readable() { + Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize) + } else { + Some(ReadableSize::mb(64).as_bytes() as usize) + } + } + MemoryThreshold::Unlimited => None, + MemoryThreshold::Size(size) => Some(size.as_bytes() as usize), + } + } +} + /// Divide cpu num by a non-zero `divisor` and returns at least 1. fn divide_num_cpus(divisor: usize) -> usize { debug_assert!(divisor > 0); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index ae7b02be7c..85edddde20 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1039,6 +1039,22 @@ pub enum Error { location: Location, }, + #[cfg(feature = "vector_index")] + #[snafu(display("Failed to build vector index: {}", reason))] + VectorIndexBuild { + reason: String, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "vector_index")] + #[snafu(display("Failed to finish vector index: {}", reason))] + VectorIndexFinish { + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Manual compaction is override by following operations."))] ManualCompactionOverride {}, @@ -1345,6 +1361,9 @@ impl ErrorExt for Error { source.status_code() } + #[cfg(feature = "vector_index")] + VectorIndexBuild { .. } | VectorIndexFinish { .. } => StatusCode::Internal, + ManualCompactionOverride {} => StatusCode::Cancelled, CompactionMemoryExhausted { source, .. 
} => source.status_code(), diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 5da5c006ed..c769e569aa 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -669,6 +669,8 @@ impl RegionFlushTask { inverted_index_config: self.engine_config.inverted_index.clone(), fulltext_index_config: self.engine_config.fulltext_index.clone(), bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(), + #[cfg(feature = "vector_index")] + vector_index_config: self.engine_config.vector_index.clone(), } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index ccd6e931e2..7a64323649 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -287,6 +287,9 @@ pub enum IndexType { FulltextIndex, /// Bloom Filter index BloomFilterIndex, + /// Vector index (HNSW). + #[cfg(feature = "vector_index")] + VectorIndex, } /// Metadata of indexes created for a specific column in an SST file. diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 9b624062bf..4c4bcfeb6d 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -20,6 +20,8 @@ pub(crate) mod inverted_index; pub mod puffin_manager; mod statistics; pub(crate) mod store; +#[cfg(feature = "vector_index")] +pub(crate) mod vector_index; use std::cmp::Ordering; use std::collections::{BinaryHeap, HashMap, HashSet}; @@ -41,10 +43,14 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, FileId, RegionId}; use strum::IntoStaticStr; use tokio::sync::mpsc::Sender; +#[cfg(feature = "vector_index")] +use vector_index::creator::VectorIndexer; use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory}; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::write_cache::{UploadTracker, WriteCacheRef}; +#[cfg(feature = "vector_index")] +use crate::config::VectorIndexConfig; use crate::config::{BloomFilterConfig, FulltextIndexConfig, 
InvertedIndexConfig}; use crate::error::{ BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu, @@ -76,6 +82,8 @@ use crate::worker::WorkerListener; pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index"; pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index"; pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index"; +#[cfg(feature = "vector_index")] +pub(crate) const TYPE_VECTOR_INDEX: &str = "vector_index"; /// Triggers background download of an index file to the local cache. pub(crate) fn trigger_index_background_download( @@ -114,6 +122,9 @@ pub struct IndexOutput { pub fulltext_index: FulltextIndexOutput, /// Bloom filter output. pub bloom_filter: BloomFilterOutput, + /// Vector index output. + #[cfg(feature = "vector_index")] + pub vector_index: VectorIndexOutput, } impl IndexOutput { @@ -128,6 +139,10 @@ impl IndexOutput { if self.bloom_filter.is_available() { indexes.push(IndexType::BloomFilterIndex); } + #[cfg(feature = "vector_index")] + if self.vector_index.is_available() { + indexes.push(IndexType::VectorIndex); + } indexes } @@ -151,6 +166,12 @@ impl IndexOutput { .push(IndexType::BloomFilterIndex); } } + #[cfg(feature = "vector_index")] + if self.vector_index.is_available() { + for &col in &self.vector_index.columns { + map.entry(col).or_default().push(IndexType::VectorIndex); + } + } map.into_iter() .map(|(column_id, created_indexes)| ColumnIndexMetadata { @@ -184,6 +205,9 @@ pub type InvertedIndexOutput = IndexBaseOutput; pub type FulltextIndexOutput = IndexBaseOutput; /// Output of the bloom filter creation. pub type BloomFilterOutput = IndexBaseOutput; +/// Output of the vector index creation. +#[cfg(feature = "vector_index")] +pub type VectorIndexOutput = IndexBaseOutput; /// The index creator that hides the error handling details. 
#[derive(Default)] @@ -199,6 +223,10 @@ pub struct Indexer { last_mem_fulltext_index: usize, bloom_filter_indexer: Option, last_mem_bloom_filter: usize, + #[cfg(feature = "vector_index")] + vector_indexer: Option, + #[cfg(feature = "vector_index")] + last_mem_vector_index: usize, intermediate_manager: Option, } @@ -259,6 +287,18 @@ impl Indexer { .with_label_values(&[TYPE_BLOOM_FILTER_INDEX]) .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64); self.last_mem_bloom_filter = bloom_filter_mem; + + #[cfg(feature = "vector_index")] + { + let vector_mem = self + .vector_indexer + .as_ref() + .map_or(0, |creator| creator.memory_usage()); + INDEX_CREATE_MEMORY_USAGE + .with_label_values(&[TYPE_VECTOR_INDEX]) + .add(vector_mem as i64 - self.last_mem_vector_index as i64); + self.last_mem_vector_index = vector_mem; + } } } @@ -279,6 +319,8 @@ pub(crate) struct IndexerBuilderImpl { pub(crate) inverted_index_config: InvertedIndexConfig, pub(crate) fulltext_index_config: FulltextIndexConfig, pub(crate) bloom_filter_index_config: BloomFilterConfig, + #[cfg(feature = "vector_index")] + pub(crate) vector_index_config: VectorIndexConfig, } #[async_trait::async_trait] @@ -296,11 +338,23 @@ impl IndexerBuilder for IndexerBuilderImpl { indexer.inverted_indexer = self.build_inverted_indexer(file_id); indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await; indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id); - indexer.intermediate_manager = Some(self.intermediate_manager.clone()); - if indexer.inverted_indexer.is_none() - && indexer.fulltext_indexer.is_none() - && indexer.bloom_filter_indexer.is_none() + #[cfg(feature = "vector_index")] { + indexer.vector_indexer = self.build_vector_indexer(file_id); + } + indexer.intermediate_manager = Some(self.intermediate_manager.clone()); + + #[cfg(feature = "vector_index")] + let has_any_indexer = indexer.inverted_indexer.is_some() + || indexer.fulltext_indexer.is_some() + || 
indexer.bloom_filter_indexer.is_some() + || indexer.vector_indexer.is_some(); + #[cfg(not(feature = "vector_index"))] + let has_any_indexer = indexer.inverted_indexer.is_some() + || indexer.fulltext_indexer.is_some() + || indexer.bloom_filter_indexer.is_some(); + + if !has_any_indexer { indexer.abort().await; return Indexer::default(); } @@ -476,6 +530,69 @@ impl IndexerBuilderImpl { None } + + #[cfg(feature = "vector_index")] + fn build_vector_indexer(&self, file_id: FileId) -> Option { + let create = match self.build_type { + IndexBuildType::Flush => self.vector_index_config.create_on_flush.auto(), + IndexBuildType::Compact => self.vector_index_config.create_on_compaction.auto(), + _ => true, + }; + + if !create { + debug!( + "Skip creating vector index due to config, region_id: {}, file_id: {}", + self.metadata.region_id, file_id, + ); + return None; + } + + // Get vector index column IDs and options from metadata + let vector_index_options = self.metadata.vector_indexed_column_ids(); + if vector_index_options.is_empty() { + debug!( + "No vector columns to index, skip creating vector index, region_id: {}, file_id: {}", + self.metadata.region_id, file_id, + ); + return None; + } + + let mem_limit = self.vector_index_config.mem_threshold_on_create(); + let indexer = VectorIndexer::new( + file_id, + &self.metadata, + self.intermediate_manager.clone(), + mem_limit, + &vector_index_options, + ); + + let err = match indexer { + Ok(indexer) => { + if indexer.is_none() { + debug!( + "Skip creating vector index due to no columns require indexing, region_id: {}, file_id: {}", + self.metadata.region_id, file_id, + ); + } + return indexer; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to create vector index, region_id: {}, file_id: {}, err: {:?}", + self.metadata.region_id, file_id, err + ); + } else { + warn!( + err; "Failed to create vector index, region_id: {}, file_id: {}", + self.metadata.region_id, file_id, + ); + } + + 
None + } } /// Type of an index build task. @@ -1115,6 +1232,8 @@ mod tests { with_inverted: bool, with_fulltext: bool, with_skipping_bloom: bool, + #[cfg(feature = "vector_index")] + with_vector: bool, } fn mock_region_metadata( @@ -1122,6 +1241,8 @@ mod tests { with_inverted, with_fulltext, with_skipping_bloom, + #[cfg(feature = "vector_index")] + with_vector, }: MetaConfig, ) -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); @@ -1187,6 +1308,24 @@ mod tests { builder.push_column_metadata(column); } + #[cfg(feature = "vector_index")] + if with_vector { + use index::vector::VectorIndexOptions; + + let options = VectorIndexOptions::default(); + let column_schema = + ColumnSchema::new("vec", ConcreteDataType::vector_datatype(4), true) + .with_vector_index_options(&options) + .unwrap(); + let column = ColumnMetadata { + column_schema, + semantic_type: SemanticType::Field, + column_id: 6, + }; + + builder.push_column_metadata(column); + } + Arc::new(builder.build().unwrap()) } @@ -1237,6 +1376,8 @@ mod tests { inverted_index_config: Default::default(), fulltext_index_config: Default::default(), bloom_filter_index_config: Default::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), }; let mut metrics = Metrics::new(WriteType::Flush); env.access_layer @@ -1287,6 +1428,8 @@ mod tests { inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), }) } @@ -1300,6 +1443,8 @@ mod tests { with_inverted: true, with_fulltext: true, with_skipping_bloom: true, + #[cfg(feature = "vector_index")] + with_vector: false, }); let indexer = IndexerBuilderImpl { build_type: IndexBuildType::Flush, @@ -1312,6 +1457,8 @@ mod tests { inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: 
FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), } .build(FileId::random(), 0) .await; @@ -1331,6 +1478,8 @@ mod tests { with_inverted: true, with_fulltext: true, with_skipping_bloom: true, + #[cfg(feature = "vector_index")] + with_vector: false, }); let indexer = IndexerBuilderImpl { build_type: IndexBuildType::Flush, @@ -1346,6 +1495,8 @@ mod tests { }, fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), } .build(FileId::random(), 0) .await; @@ -1368,6 +1519,8 @@ mod tests { ..Default::default() }, bloom_filter_index_config: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), } .build(FileId::random(), 0) .await; @@ -1390,6 +1543,8 @@ mod tests { create_on_compaction: Mode::Disable, ..Default::default() }, + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), } .build(FileId::random(), 0) .await; @@ -1409,6 +1564,8 @@ mod tests { with_inverted: false, with_fulltext: true, with_skipping_bloom: true, + #[cfg(feature = "vector_index")] + with_vector: false, }); let indexer = IndexerBuilderImpl { build_type: IndexBuildType::Flush, @@ -1421,6 +1578,8 @@ mod tests { inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), } .build(FileId::random(), 0) .await; @@ -1433,6 +1592,8 @@ mod tests { with_inverted: true, with_fulltext: false, with_skipping_bloom: true, + #[cfg(feature = "vector_index")] + with_vector: false, }); let indexer = IndexerBuilderImpl { build_type: IndexBuildType::Flush, @@ -1445,6 +1606,8 @@ mod tests { inverted_index_config: 
InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), } .build(FileId::random(), 0) .await; @@ -1457,6 +1620,8 @@ mod tests { with_inverted: true, with_fulltext: true, with_skipping_bloom: false, + #[cfg(feature = "vector_index")] + with_vector: false, }); let indexer = IndexerBuilderImpl { build_type: IndexBuildType::Flush, @@ -1469,6 +1634,8 @@ mod tests { inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), } .build(FileId::random(), 0) .await; @@ -1488,6 +1655,8 @@ mod tests { with_inverted: true, with_fulltext: true, with_skipping_bloom: true, + #[cfg(feature = "vector_index")] + with_vector: false, }); let indexer = IndexerBuilderImpl { build_type: IndexBuildType::Flush, @@ -1500,6 +1669,8 @@ mod tests { inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), } .build(FileId::random(), 0) .await; @@ -1507,6 +1678,82 @@ mod tests { assert!(indexer.inverted_indexer.is_none()); } + #[cfg(feature = "vector_index")] + #[tokio::test] + async fn test_update_flat_builds_vector_index() { + use datatypes::arrow::array::BinaryBuilder; + use datatypes::arrow::datatypes::{DataType, Field, Schema}; + + struct TestPathProvider; + + impl FilePathProvider for TestPathProvider { + fn build_index_file_path(&self, file_id: RegionFileId) -> String { + format!("index/{}.puffin", file_id) + } + + fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String { + format!("index/{}.puffin", index_id) + } + + fn 
build_sst_file_path(&self, file_id: RegionFileId) -> String { + format!("sst/{}.parquet", file_id) + } + } + + fn f32s_to_bytes(values: &[f32]) -> Vec { + let mut bytes = Vec::with_capacity(values.len() * 4); + for v in values { + bytes.extend_from_slice(&v.to_le_bytes()); + } + bytes + } + + let (dir, factory) = + PuffinManagerFactory::new_for_test_async("test_update_flat_builds_vector_index_").await; + let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await; + + let metadata = mock_region_metadata(MetaConfig { + with_inverted: false, + with_fulltext: false, + with_skipping_bloom: false, + with_vector: true, + }); + + let mut indexer = IndexerBuilderImpl { + build_type: IndexBuildType::Flush, + metadata, + row_group_size: 1024, + puffin_manager: factory.build(mock_object_store(), TestPathProvider), + write_cache_enabled: false, + intermediate_manager: intm_manager, + index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), + vector_index_config: Default::default(), + } + .build(FileId::random(), 0) + .await; + + assert!(indexer.vector_indexer.is_some()); + + let vec1 = f32s_to_bytes(&[1.0, 0.0, 0.0, 0.0]); + let vec2 = f32s_to_bytes(&[0.0, 1.0, 0.0, 0.0]); + + let mut builder = BinaryBuilder::with_capacity(2, vec1.len() + vec2.len()); + builder.append_value(&vec1); + builder.append_value(&vec2); + + let schema = Arc::new(Schema::new(vec![Field::new("vec", DataType::Binary, true)])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(builder.finish())]).unwrap(); + + indexer.update_flat(&batch).await; + let output = indexer.finish().await; + + assert!(output.vector_index.is_available()); + assert!(output.vector_index.columns.contains(&6)); + } + #[tokio::test] async fn test_index_build_task_sst_not_exist() { let env = SchedulerEnv::new().await; @@ -1839,6 +2086,8 @@ mod tests { 
inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), }); let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await; diff --git a/src/mito2/src/sst/index/indexer/abort.rs b/src/mito2/src/sst/index/indexer/abort.rs index 9a95554f22..ae1ec78d74 100644 --- a/src/mito2/src/sst/index/indexer/abort.rs +++ b/src/mito2/src/sst/index/indexer/abort.rs @@ -23,6 +23,8 @@ impl Indexer { self.do_abort_inverted_index().await; self.do_abort_fulltext_index().await; self.do_abort_bloom_filter().await; + #[cfg(feature = "vector_index")] + self.do_abort_vector_index().await; self.do_prune_intm_sst_dir().await; if self.write_cache_enabled { self.do_abort_clean_fs_temp_dir().await; @@ -106,4 +108,26 @@ impl Indexer { .to_string(); TempFileCleaner::clean_atomic_dir_files(fs_accessor.store().store(), &[&fs_handle]).await; } + + #[cfg(feature = "vector_index")] + async fn do_abort_vector_index(&mut self) { + let Some(mut indexer) = self.vector_indexer.take() else { + return; + }; + let Err(err) = indexer.abort().await else { + return; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to abort vector index, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to abort vector index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + } } diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index 4f620dfe42..9dc23fcf5d 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -17,6 +17,8 @@ use puffin::puffin_manager::{PuffinManager, PuffinWriter}; use store_api::storage::ColumnId; use crate::sst::file::{RegionFileId, RegionIndexId}; +#[cfg(feature = "vector_index")] +use 
crate::sst::index::VectorIndexOutput; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount}; use crate::sst::index::{ @@ -54,6 +56,15 @@ impl Indexer { return IndexOutput::default(); } + #[cfg(feature = "vector_index")] + { + let success = self.do_finish_vector_index(&mut writer, &mut output).await; + if !success { + self.do_abort().await; + return IndexOutput::default(); + } + } + self.do_prune_intm_sst_dir().await; output.file_size = self.do_finish_puffin_writer(writer).await; output.version = self.index_version; @@ -276,6 +287,63 @@ impl Indexer { output.columns = column_ids; } + #[cfg(feature = "vector_index")] + async fn do_finish_vector_index( + &mut self, + puffin_writer: &mut SstPuffinWriter, + index_output: &mut IndexOutput, + ) -> bool { + let Some(mut indexer) = self.vector_indexer.take() else { + return true; + }; + + let column_ids = indexer.column_ids().collect(); + let err = match indexer.finish(puffin_writer).await { + Ok((row_count, byte_count)) => { + self.fill_vector_index_output( + &mut index_output.vector_index, + row_count, + byte_count, + column_ids, + ); + return true; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to finish vector index, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to finish vector index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } + + #[cfg(feature = "vector_index")] + fn fill_vector_index_output( + &mut self, + output: &mut VectorIndexOutput, + row_count: RowCount, + byte_count: ByteCount, + column_ids: Vec, + ) { + debug!( + "Vector index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}", + self.region_id, self.file_id, byte_count, row_count, column_ids + ); + + output.index_size = byte_count; + output.row_count = row_count; + output.columns = column_ids; + } + pub(crate) 
async fn do_prune_intm_sst_dir(&mut self) { if let Some(manager) = self.intermediate_manager.take() && let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await diff --git a/src/mito2/src/sst/index/indexer/update.rs b/src/mito2/src/sst/index/indexer/update.rs index b6808ea412..095da337f4 100644 --- a/src/mito2/src/sst/index/indexer/update.rs +++ b/src/mito2/src/sst/index/indexer/update.rs @@ -33,6 +33,10 @@ impl Indexer { if !self.do_update_bloom_filter(batch).await { self.do_abort().await; } + #[cfg(feature = "vector_index")] + if !self.do_update_vector_index(batch).await { + self.do_abort().await; + } } /// Returns false if the update failed. @@ -110,6 +114,32 @@ impl Indexer { false } + /// Returns false if the update failed. + #[cfg(feature = "vector_index")] + async fn do_update_vector_index(&mut self, batch: &mut Batch) -> bool { + let Some(creator) = self.vector_indexer.as_mut() else { + return true; + }; + + let Err(err) = creator.update(batch).await else { + return true; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to update vector index, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to update vector index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } + pub(crate) async fn do_update_flat(&mut self, batch: &RecordBatch) { if batch.num_rows() == 0 { return; @@ -124,6 +154,10 @@ impl Indexer { if !self.do_update_flat_bloom_filter(batch).await { self.do_abort().await; } + #[cfg(feature = "vector_index")] + if !self.do_update_flat_vector_index(batch).await { + self.do_abort().await; + } } /// Returns false if the update failed. @@ -200,4 +234,30 @@ impl Indexer { false } + + /// Returns false if the update failed. 
+ #[cfg(feature = "vector_index")] + async fn do_update_flat_vector_index(&mut self, batch: &RecordBatch) -> bool { + let Some(creator) = self.vector_indexer.as_mut() else { + return true; + }; + + let Err(err) = creator.update_flat(batch).await else { + return true; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to update vector index with flat format, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to update vector index with flat format, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } } diff --git a/src/mito2/src/sst/index/vector_index/creator.rs b/src/mito2/src/sst/index/vector_index/creator.rs new file mode 100644 index 0000000000..bd50283b64 --- /dev/null +++ b/src/mito2/src/sst/index/vector_index/creator.rs @@ -0,0 +1,920 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Vector index creator using pluggable vector index engines. 
+ +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; + +use common_telemetry::warn; +use datatypes::arrow::array::{Array, BinaryArray}; +use datatypes::arrow::record_batch::RecordBatch; +use datatypes::data_type::ConcreteDataType; +use datatypes::prelude::ValueRef; +use index::vector::{VectorDistanceMetric, VectorIndexOptions, distance_metric_to_usearch}; +use puffin::puffin_manager::{PuffinWriter, PutOptions}; +use roaring::RoaringBitmap; +use snafu::{ResultExt, ensure}; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::{ColumnId, FileId, VectorIndexEngine, VectorIndexEngineType}; +use tokio_util::compat::TokioAsyncReadCompatExt; +use usearch::MetricKind; + +use crate::error::{ + BiErrorsSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, Result, VectorIndexBuildSnafu, + VectorIndexFinishSnafu, +}; +use crate::metrics::{INDEX_CREATE_BYTES_TOTAL, INDEX_CREATE_ROWS_TOTAL}; +use crate::read::Batch; +use crate::sst::index::TYPE_VECTOR_INDEX; +use crate::sst::index::intermediate::{ + IntermediateLocation, IntermediateManager, TempFileProvider, +}; +use crate::sst::index::puffin_manager::SstPuffinWriter; +use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; +use crate::sst::index::vector_index::util::bytes_to_f32_slice; +use crate::sst::index::vector_index::{INDEX_BLOB_TYPE, engine}; + +/// The buffer size for the pipe used to send index data to the puffin blob. +const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; + +/// Configuration for a single column's vector index. +#[derive(Debug, Clone)] +pub struct VectorIndexConfig { + /// The vector index engine type. + pub engine: VectorIndexEngineType, + /// The dimension of vectors in this column. + pub dim: usize, + /// The distance metric to use (e.g., L2, Cosine, IP) - usearch format. + pub metric: MetricKind, + /// The original distance metric (for serialization). 
+ pub distance_metric: VectorDistanceMetric, + /// HNSW connectivity parameter (M in the paper). + /// Higher values give better recall but use more memory. + pub connectivity: usize, + /// Expansion factor during index construction (ef_construction). + pub expansion_add: usize, + /// Expansion factor during search (ef_search). + pub expansion_search: usize, +} + +impl VectorIndexConfig { + /// Creates a new vector index config from VectorIndexOptions. + pub fn new(dim: usize, options: &VectorIndexOptions) -> Self { + Self { + engine: options.engine, + dim, + metric: distance_metric_to_usearch(options.metric), + distance_metric: options.metric, + connectivity: options.connectivity as usize, + expansion_add: options.expansion_add as usize, + expansion_search: options.expansion_search as usize, + } + } +} + +/// Creator for a single column's vector index. +struct VectorIndexCreator { + /// The vector index engine (e.g., USearch HNSW). + engine: Box, + /// Configuration for this index. + config: VectorIndexConfig, + /// Bitmap tracking which row offsets have NULL vectors. + /// HNSW keys are sequential (0, 1, 2...) but row offsets may have gaps due to NULLs. + null_bitmap: RoaringBitmap, + /// Current row offset (including NULLs). + current_row_offset: u64, + /// Next HNSW key to assign (only for non-NULL vectors). + next_hnsw_key: u64, + /// Memory usage estimation. + memory_usage: usize, +} + +impl VectorIndexCreator { + /// Creates a new vector index creator. + fn new(config: VectorIndexConfig) -> Result { + let engine_instance = engine::create_engine(config.engine, &config)?; + + Ok(Self { + engine: engine_instance, + config, + null_bitmap: RoaringBitmap::new(), + current_row_offset: 0, + next_hnsw_key: 0, + memory_usage: 0, + }) + } + + /// Reserves capacity for the expected number of vectors. 
+ #[allow(dead_code)] + fn reserve(&mut self, capacity: usize) -> Result<()> { + self.engine.reserve(capacity).map_err(|e| { + VectorIndexBuildSnafu { + reason: format!("Failed to reserve capacity: {}", e), + } + .build() + }) + } + + /// Adds a vector to the index. + /// Returns the HNSW key assigned to this vector. + fn add_vector(&mut self, vector: &[f32]) -> Result { + let key = self.next_hnsw_key; + self.engine.add(key, vector).map_err(|e| { + VectorIndexBuildSnafu { + reason: e.to_string(), + } + .build() + })?; + self.next_hnsw_key += 1; + self.current_row_offset += 1; + self.memory_usage = self.engine.memory_usage(); + Ok(key) + } + + /// Records a NULL vector at the current row offset. + fn add_null(&mut self) { + self.null_bitmap.insert(self.current_row_offset as u32); + self.current_row_offset += 1; + } + + /// Records multiple NULL vectors starting at the current row offset. + fn add_nulls(&mut self, n: usize) { + let start = self.current_row_offset as u32; + let end = start + n as u32; + self.null_bitmap.insert_range(start..end); + self.current_row_offset += n as u64; + } + + /// Returns the serialized size of the index. + fn serialized_length(&self) -> usize { + self.engine.serialized_length() + } + + /// Serializes the index to a buffer. + fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<()> { + self.engine.save_to_buffer(buffer).map_err(|e| { + VectorIndexFinishSnafu { + reason: format!("Failed to serialize index: {}", e), + } + .build() + }) + } + + /// Returns the memory usage of this creator. + fn memory_usage(&self) -> usize { + self.memory_usage + self.null_bitmap.serialized_size() + } + + /// Returns the number of vectors in the index (excluding NULLs). + fn size(&self) -> usize { + self.engine.size() + } + + /// Returns the engine type. + fn engine_type(&self) -> VectorIndexEngineType { + self.config.engine + } + + /// Returns the distance metric. 
+ fn metric(&self) -> VectorDistanceMetric { + self.config.distance_metric + } +} + +/// The indexer for vector indexes across multiple columns. +pub struct VectorIndexer { + /// Per-column vector index creators. + creators: HashMap, + /// Provider for intermediate files. + temp_file_provider: Arc, + /// Whether the indexing process has been aborted. + aborted: bool, + /// Statistics for this indexer. + stats: Statistics, + /// Global memory usage tracker. + #[allow(dead_code)] + global_memory_usage: Arc, + /// Region metadata for column lookups. + #[allow(dead_code)] + metadata: RegionMetadataRef, + /// Memory usage threshold. + memory_usage_threshold: Option, +} + +impl VectorIndexer { + /// Creates a new vector indexer. + /// + /// Returns `None` if there are no vector columns that need indexing. + pub fn new( + sst_file_id: FileId, + metadata: &RegionMetadataRef, + intermediate_manager: IntermediateManager, + memory_usage_threshold: Option, + vector_index_options: &HashMap, + ) -> Result> { + let mut creators = HashMap::new(); + + let temp_file_provider = Arc::new(TempFileProvider::new( + IntermediateLocation::new(&metadata.region_id, &sst_file_id), + intermediate_manager, + )); + let global_memory_usage = Arc::new(AtomicUsize::new(0)); + + // Find all vector columns that have vector index enabled + for column in &metadata.column_metadatas { + // Check if this column has vector index options configured + let Some(options) = vector_index_options.get(&column.column_id) else { + continue; + }; + + // Verify the column is a vector type + let ConcreteDataType::Vector(vector_type) = &column.column_schema.data_type else { + continue; + }; + + let config = VectorIndexConfig::new(vector_type.dim as usize, options); + let creator = VectorIndexCreator::new(config)?; + creators.insert(column.column_id, creator); + } + + if creators.is_empty() { + return Ok(None); + } + + let indexer = Self { + creators, + temp_file_provider, + aborted: false, + stats: 
Statistics::new(TYPE_VECTOR_INDEX), + global_memory_usage, + metadata: metadata.clone(), + memory_usage_threshold, + }; + + Ok(Some(indexer)) + } + + /// Updates index with a batch of rows. + /// Garbage will be cleaned up if failed to update. + pub async fn update(&mut self, batch: &mut Batch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.creators.is_empty() { + return Ok(()); + } + + if let Err(update_err) = self.do_update(batch).await { + // Clean up garbage if failed to update + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up vector index creator, err: {err:?}"); + } else { + warn!(err; "Failed to clean up vector index creator"); + } + } + return Err(update_err); + } + + Ok(()) + } + + /// Updates index with a flat format `RecordBatch`. + /// Garbage will be cleaned up if failed to update. + pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.creators.is_empty() || batch.num_rows() == 0 { + return Ok(()); + } + + if let Err(update_err) = self.do_update_flat(batch).await { + // Clean up garbage if failed to update + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up vector index creator, err: {err:?}"); + } else { + warn!(err; "Failed to clean up vector index creator"); + } + } + return Err(update_err); + } + + Ok(()) + } + + /// Internal update implementation. 
+ async fn do_update(&mut self, batch: &mut Batch) -> Result<()> { + let mut guard = self.stats.record_update(); + let n = batch.num_rows(); + guard.inc_row_count(n); + + for (col_id, creator) in &mut self.creators { + let Some(values) = batch.field_col_value(*col_id) else { + continue; + }; + + // Process each row in the batch + for i in 0..n { + let value = values.data.get_ref(i); + if value.is_null() { + creator.add_null(); + } else { + // Extract the vector bytes and convert to f32 slice + if let ValueRef::Binary(bytes) = value { + let floats = bytes_to_f32_slice(bytes); + if floats.len() != creator.config.dim { + return VectorIndexBuildSnafu { + reason: format!( + "Vector dimension mismatch: expected {}, got {}", + creator.config.dim, + floats.len() + ), + } + .fail(); + } + creator.add_vector(&floats)?; + } else { + creator.add_null(); + } + } + } + + // Check memory limit - abort index creation if exceeded + if let Some(threshold) = self.memory_usage_threshold { + let current_usage = creator.memory_usage(); + if current_usage > threshold { + warn!( + "Vector index memory usage {} exceeds threshold {}, aborting index creation, region_id: {}", + current_usage, threshold, self.metadata.region_id + ); + return VectorIndexBuildSnafu { + reason: format!( + "Memory usage {} exceeds threshold {}", + current_usage, threshold + ), + } + .fail(); + } + } + } + + Ok(()) + } + + /// Internal flat update implementation. 
+ async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> { + let mut guard = self.stats.record_update(); + let n = batch.num_rows(); + guard.inc_row_count(n); + + for (col_id, creator) in &mut self.creators { + // This should never happen: creator exists but column not in metadata + let column_meta = self.metadata.column_by_id(*col_id).ok_or_else(|| { + VectorIndexBuildSnafu { + reason: format!( + "Column {} not found in region metadata, this is a bug", + col_id + ), + } + .build() + })?; + + let column_name = &column_meta.column_schema.name; + // Column not in batch is normal for flat format - treat as NULLs + let Some(column_array) = batch.column_by_name(column_name) else { + creator.add_nulls(n); + continue; + }; + + // Vector type must be stored as binary array + let binary_array = column_array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + VectorIndexBuildSnafu { + reason: format!( + "Column {} is not a binary array, got {:?}", + column_name, + column_array.data_type() + ), + } + .build() + })?; + + for i in 0..n { + if !binary_array.is_valid(i) { + creator.add_null(); + } else { + let bytes = binary_array.value(i); + let floats = bytes_to_f32_slice(bytes); + if floats.len() != creator.config.dim { + return VectorIndexBuildSnafu { + reason: format!( + "Vector dimension mismatch: expected {}, got {}", + creator.config.dim, + floats.len() + ), + } + .fail(); + } + creator.add_vector(&floats)?; + } + } + + if let Some(threshold) = self.memory_usage_threshold { + let current_usage = creator.memory_usage(); + if current_usage > threshold { + warn!( + "Vector index memory usage {} exceeds threshold {}, aborting index creation, region_id: {}", + current_usage, threshold, self.metadata.region_id + ); + return VectorIndexBuildSnafu { + reason: format!( + "Memory usage {} exceeds threshold {}", + current_usage, threshold + ), + } + .fail(); + } + } + } + + Ok(()) + } + + /// Finishes index creation and writes to puffin. 
+ /// Returns the number of rows and bytes written. + pub async fn finish( + &mut self, + puffin_writer: &mut SstPuffinWriter, + ) -> Result<(RowCount, ByteCount)> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.stats.row_count() == 0 { + // No IO is performed, no garbage to clean up + return Ok((0, 0)); + } + + let finish_res = self.do_finish(puffin_writer).await; + // Clean up garbage no matter finish successfully or not + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up vector index creator, err: {err:?}"); + } else { + warn!(err; "Failed to clean up vector index creator"); + } + } + + // Report metrics on successful finish + if finish_res.is_ok() { + INDEX_CREATE_ROWS_TOTAL + .with_label_values(&[TYPE_VECTOR_INDEX]) + .inc_by(self.stats.row_count() as u64); + INDEX_CREATE_BYTES_TOTAL + .with_label_values(&[TYPE_VECTOR_INDEX]) + .inc_by(self.stats.byte_count()); + } + + finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count())) + } + + /// Internal finish implementation. + async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> { + let mut guard = self.stats.record_finish(); + + for (id, creator) in &mut self.creators { + if creator.size() == 0 { + // No vectors to index + continue; + } + + let written_bytes = Self::do_finish_single_creator(*id, creator, puffin_writer).await?; + guard.inc_byte_count(written_bytes); + } + + Ok(()) + } + + /// Finishes a single column's vector index. 
+ /// + /// The blob format v1 (header = 33 bytes): + /// ```text + /// +------------------+ + /// | Version | 1 byte (u8, = 1) + /// +------------------+ + /// | Engine type | 1 byte (u8, engine identifier) + /// +------------------+ + /// | Dimension | 4 bytes (u32, little-endian) + /// +------------------+ + /// | Metric | 1 byte (u8, distance metric) + /// +------------------+ + /// | Connectivity | 2 bytes (u16, little-endian, HNSW M parameter) + /// +------------------+ + /// | Expansion add | 2 bytes (u16, little-endian, ef_construction) + /// +------------------+ + /// | Expansion search | 2 bytes (u16, little-endian, ef_search) + /// +------------------+ + /// | Total rows | 8 bytes (u64, little-endian, total rows in SST) + /// +------------------+ + /// | Indexed rows | 8 bytes (u64, little-endian, non-NULL rows indexed) + /// +------------------+ + /// | NULL bitmap len | 4 bytes (u32, little-endian) + /// +------------------+ + /// | NULL bitmap | variable length (serialized RoaringBitmap) + /// +------------------+ + /// | Vector index | variable length (engine-specific serialized format) + /// +------------------+ + /// ``` + async fn do_finish_single_creator( + col_id: ColumnId, + creator: &mut VectorIndexCreator, + puffin_writer: &mut SstPuffinWriter, + ) -> Result { + // Serialize the NULL bitmap + let mut null_bitmap_bytes = Vec::new(); + creator + .null_bitmap + .serialize_into(&mut null_bitmap_bytes) + .map_err(|e| { + VectorIndexFinishSnafu { + reason: format!("Failed to serialize NULL bitmap: {}", e), + } + .build() + })?; + + // Serialize the vector index + let index_size = creator.serialized_length(); + let mut index_bytes = vec![0u8; index_size]; + creator.save_to_buffer(&mut index_bytes)?; + + // Header size: version(1) + engine(1) + dim(4) + metric(1) + + // connectivity(2) + expansion_add(2) + expansion_search(2) + + // total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes + /// Size of the vector index blob header in bytes. 
+ /// Header format: version(1) + engine(1) + dim(4) + metric(1) + + /// connectivity(2) + expansion_add(2) + expansion_search(2) + + /// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes + const VECTOR_INDEX_BLOB_HEADER_SIZE: usize = 33; + let total_size = + VECTOR_INDEX_BLOB_HEADER_SIZE + null_bitmap_bytes.len() + index_bytes.len(); + let mut blob_data = Vec::with_capacity(total_size); + + // Write version (1 byte) + blob_data.push(1u8); + // Write engine type (1 byte) + blob_data.push(creator.engine_type().as_u8()); + // Write dimension (4 bytes, little-endian) + blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes()); + // Write metric (1 byte) + blob_data.push(creator.metric().as_u8()); + // Write connectivity/M (2 bytes, little-endian) + blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes()); + // Write expansion_add/ef_construction (2 bytes, little-endian) + blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes()); + // Write expansion_search/ef_search (2 bytes, little-endian) + blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes()); + // Write total_rows (8 bytes, little-endian) + blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes()); + // Write indexed_rows (8 bytes, little-endian) + blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes()); + // Write NULL bitmap length (4 bytes, little-endian) + let bitmap_len: u32 = null_bitmap_bytes.len().try_into().map_err(|_| { + VectorIndexBuildSnafu { + reason: format!( + "NULL bitmap size {} exceeds maximum allowed size {}", + null_bitmap_bytes.len(), + u32::MAX + ), + } + .build() + })?; + blob_data.extend_from_slice(&bitmap_len.to_le_bytes()); + // Write NULL bitmap + blob_data.extend_from_slice(&null_bitmap_bytes); + // Write vector index + blob_data.extend_from_slice(&index_bytes); + + // Create blob name following the same pattern as bloom filter + let blob_name = format!("{}-{}", 
INDEX_BLOB_TYPE, col_id); + + // Write to puffin using a pipe + let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); + + // Writer task writes the blob data to the pipe + let write_index = async move { + use tokio::io::AsyncWriteExt; + let mut writer = tx; + writer.write_all(&blob_data).await?; + writer.shutdown().await?; + Ok::<(), std::io::Error>(()) + }; + + let (index_write_result, puffin_add_blob) = futures::join!( + write_index, + puffin_writer.put_blob( + &blob_name, + rx.compat(), + PutOptions::default(), + Default::default() + ) + ); + + match ( + puffin_add_blob.context(PuffinAddBlobSnafu), + index_write_result.map_err(|e| { + VectorIndexFinishSnafu { + reason: format!("Failed to write blob data: {}", e), + } + .build() + }), + ) { + (Err(e1), Err(e2)) => BiErrorsSnafu { + first: Box::new(e1), + second: Box::new(e2), + } + .fail()?, + + (Ok(_), e @ Err(_)) => e?, + (e @ Err(_), Ok(_)) => e.map(|_| ())?, + (Ok(written_bytes), Ok(_)) => { + return Ok(written_bytes); + } + } + + Ok(0) + } + + /// Aborts index creation and cleans up garbage. + pub async fn abort(&mut self) -> Result<()> { + if self.aborted { + return Ok(()); + } + self.aborted = true; + self.do_cleanup().await + } + + /// Cleans up temporary files. + async fn do_cleanup(&mut self) -> Result<()> { + let mut _guard = self.stats.record_cleanup(); + self.creators.clear(); + self.temp_file_provider.cleanup().await + } + + /// Returns the memory usage of the indexer. + pub fn memory_usage(&self) -> usize { + self.creators.values().map(|c| c.memory_usage()).sum() + } + + /// Returns the column IDs being indexed. 
+ pub fn column_ids(&self) -> impl Iterator + '_ { + self.creators.keys().copied() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_vector_index_creator() { + let options = VectorIndexOptions::default(); + let config = VectorIndexConfig::new(4, &options); + let mut creator = VectorIndexCreator::new(config).unwrap(); + + creator.reserve(10).unwrap(); + + // Add some vectors + let v1 = vec![1.0f32, 0.0, 0.0, 0.0]; + let v2 = vec![0.0f32, 1.0, 0.0, 0.0]; + + creator.add_vector(&v1).unwrap(); + creator.add_null(); + creator.add_vector(&v2).unwrap(); + + assert_eq!(creator.size(), 2); // 2 vectors (excluding NULL) + assert_eq!(creator.current_row_offset, 3); // 3 rows total + assert!(creator.null_bitmap.contains(1)); // Row 1 is NULL + } + + #[test] + fn test_vector_index_creator_serialization() { + let options = VectorIndexOptions::default(); + let config = VectorIndexConfig::new(4, &options); + let mut creator = VectorIndexCreator::new(config).unwrap(); + + creator.reserve(10).unwrap(); + + // Add vectors + let vectors = vec![ + vec![1.0f32, 0.0, 0.0, 0.0], + vec![0.0f32, 1.0, 0.0, 0.0], + vec![0.0f32, 0.0, 1.0, 0.0], + ]; + + for v in &vectors { + creator.add_vector(v).unwrap(); + } + + // Test serialization + let size = creator.serialized_length(); + assert!(size > 0); + + let mut buffer = vec![0u8; size]; + creator.save_to_buffer(&mut buffer).unwrap(); + + // Verify buffer is not empty and starts with some data + assert!(!buffer.iter().all(|&b| b == 0)); + } + + #[test] + fn test_vector_index_creator_null_bitmap_serialization() { + let options = VectorIndexOptions::default(); + let config = VectorIndexConfig::new(4, &options); + let mut creator = VectorIndexCreator::new(config).unwrap(); + + creator.reserve(10).unwrap(); + + // Add pattern: vector, null, vector, null, null, vector + creator.add_vector(&[1.0, 0.0, 0.0, 0.0]).unwrap(); + creator.add_null(); + creator.add_vector(&[0.0, 1.0, 0.0, 0.0]).unwrap(); + creator.add_nulls(2); + 
creator.add_vector(&[0.0, 0.0, 1.0, 0.0]).unwrap(); + + assert_eq!(creator.size(), 3); // 3 vectors + assert_eq!(creator.current_row_offset, 6); // 6 rows total + assert!(!creator.null_bitmap.contains(0)); + assert!(creator.null_bitmap.contains(1)); + assert!(!creator.null_bitmap.contains(2)); + assert!(creator.null_bitmap.contains(3)); + assert!(creator.null_bitmap.contains(4)); + assert!(!creator.null_bitmap.contains(5)); + + // Test NULL bitmap serialization + let mut bitmap_bytes = Vec::new(); + creator + .null_bitmap + .serialize_into(&mut bitmap_bytes) + .unwrap(); + + // Deserialize and verify + let restored = RoaringBitmap::deserialize_from(&bitmap_bytes[..]).unwrap(); + assert_eq!(restored.len(), 3); // 3 NULLs + assert!(restored.contains(1)); + assert!(restored.contains(3)); + assert!(restored.contains(4)); + } + + #[test] + fn test_vector_index_config() { + use index::vector::VectorDistanceMetric; + + let options = VectorIndexOptions { + engine: VectorIndexEngineType::default(), + metric: VectorDistanceMetric::Cosine, + connectivity: 32, + expansion_add: 256, + expansion_search: 128, + }; + let config = VectorIndexConfig::new(128, &options); + assert_eq!(config.engine, VectorIndexEngineType::Usearch); + assert_eq!(config.dim, 128); + assert_eq!(config.metric, MetricKind::Cos); + assert_eq!(config.connectivity, 32); + assert_eq!(config.expansion_add, 256); + assert_eq!(config.expansion_search, 128); + } + + #[test] + fn test_vector_index_header_format() { + use index::vector::VectorDistanceMetric; + + // Create config with specific HNSW parameters + let options = VectorIndexOptions { + engine: VectorIndexEngineType::Usearch, + metric: VectorDistanceMetric::L2sq, + connectivity: 24, + expansion_add: 200, + expansion_search: 100, + }; + let config = VectorIndexConfig::new(4, &options); + let mut creator = VectorIndexCreator::new(config).unwrap(); + + creator.reserve(10).unwrap(); + + // Add pattern: vector, null, vector, null, vector + 
creator.add_vector(&[1.0, 0.0, 0.0, 0.0]).unwrap(); + creator.add_null(); + creator.add_vector(&[0.0, 1.0, 0.0, 0.0]).unwrap(); + creator.add_null(); + creator.add_vector(&[0.0, 0.0, 1.0, 0.0]).unwrap(); + + // Verify counts + assert_eq!(creator.current_row_offset, 5); // total_rows + assert_eq!(creator.next_hnsw_key, 3); // indexed_rows + + // Build blob data manually (simulating write_to_puffin header writing) + let mut null_bitmap_bytes = Vec::new(); + creator + .null_bitmap + .serialize_into(&mut null_bitmap_bytes) + .unwrap(); + + let index_size = creator.serialized_length(); + let mut index_bytes = vec![0u8; index_size]; + creator.save_to_buffer(&mut index_bytes).unwrap(); + + // Header: 33 bytes + let header_size = 33; + let total_size = header_size + null_bitmap_bytes.len() + index_bytes.len(); + let mut blob_data = Vec::with_capacity(total_size); + + // Write header fields + blob_data.push(1u8); // version + blob_data.push(creator.engine_type().as_u8()); // engine type + blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes()); // dimension + blob_data.push(creator.metric().as_u8()); // metric + blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes()); + blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes()); + blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes()); + blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes()); // total_rows + blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes()); // indexed_rows + let bitmap_len: u32 = null_bitmap_bytes.len().try_into().unwrap(); + blob_data.extend_from_slice(&bitmap_len.to_le_bytes()); + blob_data.extend_from_slice(&null_bitmap_bytes); + blob_data.extend_from_slice(&index_bytes); + + // Verify header size + assert_eq!(blob_data.len(), total_size); + + // Parse header and verify values + assert_eq!(blob_data[0], 1); // version + assert_eq!(blob_data[1], 
VectorIndexEngineType::Usearch.as_u8()); // engine + + let dim = u32::from_le_bytes([blob_data[2], blob_data[3], blob_data[4], blob_data[5]]); + assert_eq!(dim, 4); + + let metric = blob_data[6]; + assert_eq!( + metric, + datatypes::schema::VectorDistanceMetric::L2sq.as_u8() + ); + + let connectivity = u16::from_le_bytes([blob_data[7], blob_data[8]]); + assert_eq!(connectivity, 24); + + let expansion_add = u16::from_le_bytes([blob_data[9], blob_data[10]]); + assert_eq!(expansion_add, 200); + + let expansion_search = u16::from_le_bytes([blob_data[11], blob_data[12]]); + assert_eq!(expansion_search, 100); + + let total_rows = u64::from_le_bytes([ + blob_data[13], + blob_data[14], + blob_data[15], + blob_data[16], + blob_data[17], + blob_data[18], + blob_data[19], + blob_data[20], + ]); + assert_eq!(total_rows, 5); + + let indexed_rows = u64::from_le_bytes([ + blob_data[21], + blob_data[22], + blob_data[23], + blob_data[24], + blob_data[25], + blob_data[26], + blob_data[27], + blob_data[28], + ]); + assert_eq!(indexed_rows, 3); + + let null_bitmap_len = + u32::from_le_bytes([blob_data[29], blob_data[30], blob_data[31], blob_data[32]]); + assert_eq!(null_bitmap_len as usize, null_bitmap_bytes.len()); + + // Verify null bitmap can be deserialized + let null_bitmap_data = &blob_data[header_size..header_size + null_bitmap_len as usize]; + let restored_bitmap = RoaringBitmap::deserialize_from(null_bitmap_data).unwrap(); + assert_eq!(restored_bitmap.len(), 2); // 2 nulls + assert!(restored_bitmap.contains(1)); + assert!(restored_bitmap.contains(3)); + } +} diff --git a/src/mito2/src/sst/index/vector_index/engine/mod.rs b/src/mito2/src/sst/index/vector_index/engine/mod.rs new file mode 100644 index 0000000000..b808418ef5 --- /dev/null +++ b/src/mito2/src/sst/index/vector_index/engine/mod.rs @@ -0,0 +1,45 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the 
License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Pluggable vector index engine implementations. + +mod usearch_impl; + +use store_api::storage::{VectorIndexEngine, VectorIndexEngineType}; +pub use usearch_impl::UsearchEngine; + +use crate::error::Result; +use crate::sst::index::vector_index::creator::VectorIndexConfig; + +/// Creates a new vector index engine based on the engine type. +pub fn create_engine( + engine_type: VectorIndexEngineType, + config: &VectorIndexConfig, +) -> Result> { + match engine_type { + VectorIndexEngineType::Usearch => Ok(Box::new(UsearchEngine::create(config)?)), + } +} + +/// Loads a vector index engine from serialized data. +#[allow(unused)] +pub fn load_engine( + engine_type: VectorIndexEngineType, + config: &VectorIndexConfig, + data: &[u8], +) -> Result> { + match engine_type { + VectorIndexEngineType::Usearch => Ok(Box::new(UsearchEngine::load(config, data)?)), + } +} diff --git a/src/mito2/src/sst/index/vector_index/engine/usearch_impl.rs b/src/mito2/src/sst/index/vector_index/engine/usearch_impl.rs new file mode 100644 index 0000000000..3810710d16 --- /dev/null +++ b/src/mito2/src/sst/index/vector_index/engine/usearch_impl.rs @@ -0,0 +1,231 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! USearch HNSW implementation of VectorIndexEngine. + +use common_error::ext::BoxedError; +use store_api::storage::{VectorIndexEngine, VectorSearchMatches}; +use usearch::{Index, IndexOptions, ScalarKind}; + +use crate::error::{Result, VectorIndexBuildSnafu}; +use crate::sst::index::vector_index::creator::VectorIndexConfig; + +/// USearch-based vector index engine using HNSW algorithm. +pub struct UsearchEngine { + index: Index, +} + +impl UsearchEngine { + /// Creates a new USearch engine with the given configuration. + pub fn create(config: &VectorIndexConfig) -> Result { + let options = IndexOptions { + dimensions: config.dim, + metric: config.metric, + quantization: ScalarKind::F32, + connectivity: config.connectivity, + expansion_add: config.expansion_add, + expansion_search: config.expansion_search, + multi: false, + }; + + let index = Index::new(&options).map_err(|e| { + VectorIndexBuildSnafu { + reason: format!("Failed to create USearch index: {}", e), + } + .build() + })?; + + Ok(Self { index }) + } + + /// Loads a USearch engine from serialized data. 
+ #[allow(unused)] + pub fn load(config: &VectorIndexConfig, data: &[u8]) -> Result { + let options = IndexOptions { + dimensions: config.dim, + metric: config.metric, + quantization: ScalarKind::F32, + // These will be loaded from serialized data + connectivity: 0, + expansion_add: 0, + expansion_search: 0, + multi: false, + }; + + let index = Index::new(&options).map_err(|e| { + VectorIndexBuildSnafu { + reason: format!("Failed to create USearch index for loading: {}", e), + } + .build() + })?; + + index.load_from_buffer(data).map_err(|e| { + VectorIndexBuildSnafu { + reason: format!("Failed to load USearch index from buffer: {}", e), + } + .build() + })?; + + Ok(Self { index }) + } +} + +impl VectorIndexEngine for UsearchEngine { + fn add(&mut self, key: u64, vector: &[f32]) -> Result<(), BoxedError> { + // Reserve capacity if needed + if self.index.size() >= self.index.capacity() { + let new_capacity = std::cmp::max(1, self.index.capacity() * 2); + self.index.reserve(new_capacity).map_err(|e| { + BoxedError::new( + VectorIndexBuildSnafu { + reason: format!("Failed to reserve capacity: {}", e), + } + .build(), + ) + })?; + } + + self.index.add(key, vector).map_err(|e| { + BoxedError::new( + VectorIndexBuildSnafu { + reason: format!("Failed to add vector: {}", e), + } + .build(), + ) + }) + } + + fn search(&self, query: &[f32], k: usize) -> Result { + let matches = self.index.search(query, k).map_err(|e| { + BoxedError::new( + VectorIndexBuildSnafu { + reason: format!("Failed to search: {}", e), + } + .build(), + ) + })?; + + Ok(VectorSearchMatches { + keys: matches.keys, + distances: matches.distances, + }) + } + + fn serialized_length(&self) -> usize { + self.index.serialized_length() + } + + fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<(), BoxedError> { + self.index.save_to_buffer(buffer).map_err(|e| { + BoxedError::new( + VectorIndexBuildSnafu { + reason: format!("Failed to save to buffer: {}", e), + } + .build(), + ) + }) + } + + fn reserve(&mut 
self, capacity: usize) -> Result<(), BoxedError> { + self.index.reserve(capacity).map_err(|e| { + BoxedError::new( + VectorIndexBuildSnafu { + reason: format!("Failed to reserve: {}", e), + } + .build(), + ) + }) + } + + fn size(&self) -> usize { + self.index.size() + } + + fn capacity(&self) -> usize { + self.index.capacity() + } + + fn memory_usage(&self) -> usize { + self.index.memory_usage() + } +} + +#[cfg(test)] +mod tests { + use index::vector::VectorDistanceMetric; + use store_api::storage::VectorIndexEngineType; + use usearch::MetricKind; + + use super::*; + + fn test_config() -> VectorIndexConfig { + VectorIndexConfig { + engine: VectorIndexEngineType::Usearch, + dim: 4, + metric: MetricKind::L2sq, + distance_metric: VectorDistanceMetric::L2sq, + connectivity: 16, + expansion_add: 128, + expansion_search: 64, + } + } + + #[test] + fn test_usearch_engine_create() { + let config = test_config(); + let engine = UsearchEngine::create(&config).unwrap(); + assert_eq!(engine.size(), 0); + } + + #[test] + fn test_usearch_engine_add_and_search() { + let config = test_config(); + let mut engine = UsearchEngine::create(&config).unwrap(); + + // Add some vectors + engine.add(0, &[1.0, 0.0, 0.0, 0.0]).unwrap(); + engine.add(1, &[0.0, 1.0, 0.0, 0.0]).unwrap(); + engine.add(2, &[0.0, 0.0, 1.0, 0.0]).unwrap(); + + assert_eq!(engine.size(), 3); + + // Search + let matches = engine.search(&[1.0, 0.0, 0.0, 0.0], 2).unwrap(); + assert_eq!(matches.keys.len(), 2); + // First result should be the exact match (key 0) + assert_eq!(matches.keys[0], 0); + } + + #[test] + fn test_usearch_engine_serialization() { + let config = test_config(); + let mut engine = UsearchEngine::create(&config).unwrap(); + + engine.add(0, &[1.0, 0.0, 0.0, 0.0]).unwrap(); + engine.add(1, &[0.0, 1.0, 0.0, 0.0]).unwrap(); + + // Serialize + let len = engine.serialized_length(); + let mut buffer = vec![0u8; len]; + engine.save_to_buffer(&mut buffer).unwrap(); + + // Load + let loaded = 
UsearchEngine::load(&config, &buffer).unwrap(); + assert_eq!(loaded.size(), 2); + + // Verify search works on loaded index + let matches = loaded.search(&[1.0, 0.0, 0.0, 0.0], 1).unwrap(); + assert_eq!(matches.keys[0], 0); + } +} diff --git a/src/mito2/src/sst/index/vector_index/mod.rs b/src/mito2/src/sst/index/vector_index/mod.rs new file mode 100644 index 0000000000..764e9a5c7d --- /dev/null +++ b/src/mito2/src/sst/index/vector_index/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Vector index module for HNSW-based approximate nearest neighbor search. + +pub(crate) mod creator; +pub(crate) mod engine; +pub(crate) mod util; + +/// The blob type identifier for vector index in puffin files. +pub(crate) const INDEX_BLOB_TYPE: &str = "greptime-vector-index-v1"; diff --git a/src/mito2/src/sst/index/vector_index/util.rs b/src/mito2/src/sst/index/vector_index/util.rs new file mode 100644 index 0000000000..7610a07e7c --- /dev/null +++ b/src/mito2/src/sst/index/vector_index/util.rs @@ -0,0 +1,108 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
//! Utility functions for vector index operations.

use std::borrow::Cow;

/// Converts a byte slice (little-endian format) to an f32 slice, handling
/// unaligned data gracefully.
///
/// Returns `Cow::Borrowed` (zero-copy) when the data is f32-aligned on a
/// little-endian target, and `Cow::Owned` otherwise.
///
/// # Panics
///
/// Panics if the byte slice length is not a multiple of 4.
pub fn bytes_to_f32_slice(bytes: &[u8]) -> Cow<'_, [f32]> {
    let len = bytes.len();
    assert!(
        len % 4 == 0,
        "Vector bytes length {len} is not a multiple of 4"
    );

    if bytes.is_empty() {
        return Cow::Borrowed(&[]);
    }

    // Fast path: borrow in place. Only valid on little-endian targets (the
    // stored bytes are little-endian) and when the pointer is f32-aligned.
    #[cfg(target_endian = "little")]
    {
        let ptr = bytes.as_ptr();
        if (ptr as usize) % std::mem::align_of::<f32>() == 0 {
            // SAFETY: the length is a multiple of 4, the pointer is
            // f32-aligned, and on little-endian targets the stored bytes
            // already match the in-memory layout of f32 values.
            let floats = unsafe { std::slice::from_raw_parts(ptr.cast::<f32>(), len / 4) };
            return Cow::Borrowed(floats);
        }
    }

    // Slow path: decode chunk by chunk (unaligned data or big-endian target).
    Cow::Owned(
        bytes
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
            .collect(),
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bytes_to_f32_slice() {
        let floats = [1.0f32, 2.0, 3.0, 4.0];
        let bytes: Vec<u8> = floats.iter().flat_map(|f| f.to_le_bytes()).collect();

        let result = bytes_to_f32_slice(&bytes);
        assert_eq!(result.as_ref(), &floats[..]);
    }

    #[test]
    fn test_bytes_to_f32_slice_unaligned() {
        // Prepend one padding byte so the payload starts at an odd offset.
        let floats = [1.0f32, 2.0, 3.0, 4.0];
        let mut bytes = vec![0u8];
        bytes.extend(floats.iter().flat_map(|f| f.to_le_bytes()));

        let unaligned_bytes = &bytes[1..];
        let is_aligned =
            (unaligned_bytes.as_ptr() as usize) % std::mem::align_of::<f32>() == 0;

        // Decoding must succeed regardless of alignment.
        let result = bytes_to_f32_slice(unaligned_bytes);
        assert_eq!(result.as_ref(), &floats[..]);

        // Misaligned input must take the copying path.
        if !is_aligned {
            assert!(matches!(result, Cow::Owned(_)));
        }
    }

    #[test]
    fn test_bytes_to_f32_slice_empty() {
        assert!(bytes_to_f32_slice(&[]).is_empty());
    }
}
vector_index_config: Default::default(), } } @@ -1271,6 +1275,8 @@ mod tests { inverted_index_config: Default::default(), fulltext_index_config: Default::default(), bloom_filter_index_config: Default::default(), + #[cfg(feature = "vector_index")] + vector_index_config: Default::default(), }; let mut metrics = Metrics::new(WriteType::Flush); diff --git a/src/mito2/src/worker/handle_rebuild_index.rs b/src/mito2/src/worker/handle_rebuild_index.rs index ed2390d853..79601ec52c 100644 --- a/src/mito2/src/worker/handle_rebuild_index.rs +++ b/src/mito2/src/worker/handle_rebuild_index.rs @@ -64,6 +64,8 @@ impl RegionWorkerLoop { inverted_index_config: self.config.inverted_index.clone(), fulltext_index_config: self.config.fulltext_index.clone(), bloom_filter_index_config: self.config.bloom_filter_index.clone(), + #[cfg(feature = "vector_index")] + vector_index_config: self.config.vector_index.clone(), index_options: version.options.index_options.clone(), row_group_size: WriteOptions::default().row_group_size, intermediate_manager, diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 5cb00004ef..d26f650a5a 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -29,7 +29,7 @@ use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datatypes::arrow; use datatypes::arrow::datatypes::FieldRef; -use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef}; +use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, VectorIndexOptions}; use datatypes::types::TimestampType; use itertools::Itertools; use serde::de::Error; @@ -384,6 +384,22 @@ impl RegionMetadata { inverted_index } + /// Gets the column IDs that have vector indexes along with their options. + /// Returns a map from column ID to the vector index options. 
+ pub fn vector_indexed_column_ids(&self) -> HashMap { + self.column_metadatas + .iter() + .filter_map(|column| { + column + .column_schema + .vector_index_options() + .ok() + .flatten() + .map(|options| (column.column_id, options)) + }) + .collect() + } + /// Checks whether the metadata is valid. fn validate(&self) -> Result<()> { // Id to name. diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index bf3a9e8a96..d70fc324a3 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1546,6 +1546,12 @@ create_on_compaction = "auto" apply_on_query = "auto" mem_threshold_on_create = "auto" +[region_engine.mito.vector_index] +create_on_flush = "auto" +create_on_compaction = "auto" +apply_on_query = "auto" +mem_threshold_on_create = "auto" + [region_engine.mito.memtable] type = "time_series"