mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: impl vector index building (#7468)
* feat: impl vector index building Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * feat: supports flat format Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * ci: add vector_index feature to test Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * chore: apply suggestions Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * chore: apply suggestions from copilot Signed-off-by: Dennis Zhuang <killme2008@gmail.com> --------- Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
This commit is contained in:
4
.github/workflows/develop.yml
vendored
4
.github/workflows/develop.yml
vendored
@@ -755,7 +755,7 @@ jobs:
|
||||
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait
|
||||
|
||||
- name: Run nextest cases
|
||||
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend
|
||||
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend -F vector_index
|
||||
env:
|
||||
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -813,7 +813,7 @@ jobs:
|
||||
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait
|
||||
|
||||
- name: Run nextest cases
|
||||
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend
|
||||
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend -F vector_index
|
||||
env:
|
||||
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
|
||||
RUST_BACKTRACE: 1
|
||||
|
||||
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -7779,7 +7779,6 @@ dependencies = [
|
||||
"either",
|
||||
"futures",
|
||||
"greptime-proto",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"index",
|
||||
"itertools 0.14.0",
|
||||
@@ -7798,6 +7797,7 @@ dependencies = [
|
||||
"rand 0.9.1",
|
||||
"rayon",
|
||||
"regex",
|
||||
"roaring",
|
||||
"rskafka",
|
||||
"rstest",
|
||||
"rstest_reuse",
|
||||
@@ -7816,6 +7816,7 @@ dependencies = [
|
||||
"tokio-util",
|
||||
"toml 0.8.23",
|
||||
"tracing",
|
||||
"usearch",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ default = [
|
||||
]
|
||||
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"]
|
||||
tokio-console = ["common-telemetry/tokio-console"]
|
||||
vector_index = ["mito2/vector_index"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -233,6 +233,8 @@ impl ObjbenchCommand {
|
||||
inverted_index_config: MitoConfig::default().inverted_index,
|
||||
fulltext_index_config,
|
||||
bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
};
|
||||
|
||||
// Write SST
|
||||
|
||||
@@ -8,6 +8,7 @@ license.workspace = true
|
||||
default = []
|
||||
test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"]
|
||||
enterprise = []
|
||||
vector_index = ["dep:usearch", "dep:roaring", "index/vector_index"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
@@ -28,9 +29,10 @@ common-datasource.workspace = true
|
||||
common-decimal.workspace = true
|
||||
common-error.workspace = true
|
||||
common-grpc.workspace = true
|
||||
common-function.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-meta.workspace = true
|
||||
common-memory-manager.workspace = true
|
||||
common-meta.workspace = true
|
||||
common-query.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
@@ -49,7 +51,6 @@ dotenv.workspace = true
|
||||
either.workspace = true
|
||||
futures.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
humantime.workspace = true
|
||||
index.workspace = true
|
||||
itertools.workspace = true
|
||||
greptime-proto.workspace = true
|
||||
@@ -67,6 +68,7 @@ partition.workspace = true
|
||||
puffin.workspace = true
|
||||
rand.workspace = true
|
||||
rayon = "1.10"
|
||||
roaring = { version = "0.10", optional = true }
|
||||
regex.workspace = true
|
||||
rskafka = { workspace = true, optional = true }
|
||||
rstest = { workspace = true, optional = true }
|
||||
@@ -84,6 +86,7 @@ tokio.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tracing.workspace = true
|
||||
usearch = { version = "2.21", default-features = false, features = ["fp16lib"], optional = true }
|
||||
uuid.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -313,6 +313,8 @@ impl AccessLayer {
|
||||
inverted_index_config: request.inverted_index_config,
|
||||
fulltext_index_config: request.fulltext_index_config,
|
||||
bloom_filter_index_config: request.bloom_filter_index_config,
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: request.vector_index_config,
|
||||
};
|
||||
// We disable write cache on file system but we still use atomic write.
|
||||
// TODO(yingwen): If we support other non-fs stores without the write cache, then
|
||||
@@ -467,6 +469,8 @@ pub struct SstWriteRequest {
|
||||
pub inverted_index_config: InvertedIndexConfig,
|
||||
pub fulltext_index_config: FulltextIndexConfig,
|
||||
pub bloom_filter_index_config: BloomFilterConfig,
|
||||
#[cfg(feature = "vector_index")]
|
||||
pub vector_index_config: crate::config::VectorIndexConfig,
|
||||
}
|
||||
|
||||
/// Cleaner to remove temp files on the atomic write dir.
|
||||
|
||||
8
src/mito2/src/cache/write_cache.rs
vendored
8
src/mito2/src/cache/write_cache.rs
vendored
@@ -227,6 +227,8 @@ impl WriteCache {
|
||||
inverted_index_config: write_request.inverted_index_config,
|
||||
fulltext_index_config: write_request.fulltext_index_config,
|
||||
bloom_filter_index_config: write_request.bloom_filter_index_config,
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: write_request.vector_index_config,
|
||||
};
|
||||
|
||||
let cleaner = TempFileCleaner::new(region_id, store.clone());
|
||||
@@ -520,6 +522,8 @@ mod tests {
|
||||
inverted_index_config: Default::default(),
|
||||
fulltext_index_config: Default::default(),
|
||||
bloom_filter_index_config: Default::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
};
|
||||
|
||||
let upload_request = SstUploadRequest {
|
||||
@@ -620,6 +624,8 @@ mod tests {
|
||||
inverted_index_config: Default::default(),
|
||||
fulltext_index_config: Default::default(),
|
||||
bloom_filter_index_config: Default::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
};
|
||||
let write_opts = WriteOptions {
|
||||
row_group_size: 512,
|
||||
@@ -701,6 +707,8 @@ mod tests {
|
||||
inverted_index_config: Default::default(),
|
||||
fulltext_index_config: Default::default(),
|
||||
bloom_filter_index_config: Default::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
};
|
||||
let write_opts = WriteOptions {
|
||||
row_group_size: 512,
|
||||
|
||||
@@ -332,6 +332,8 @@ impl DefaultCompactor {
|
||||
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
|
||||
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
|
||||
let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
|
||||
#[cfg(feature = "vector_index")]
|
||||
let vector_index_config = compaction_region.engine_config.vector_index.clone();
|
||||
|
||||
let input_file_names = output
|
||||
.inputs
|
||||
@@ -378,6 +380,8 @@ impl DefaultCompactor {
|
||||
inverted_index_config,
|
||||
fulltext_index_config,
|
||||
bloom_filter_index_config,
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config,
|
||||
},
|
||||
&write_opts,
|
||||
&mut metrics,
|
||||
|
||||
@@ -158,6 +158,9 @@ pub struct MitoConfig {
|
||||
pub fulltext_index: FulltextIndexConfig,
|
||||
/// Bloom filter index configs.
|
||||
pub bloom_filter_index: BloomFilterConfig,
|
||||
/// Vector index configs (HNSW).
|
||||
#[cfg(feature = "vector_index")]
|
||||
pub vector_index: VectorIndexConfig,
|
||||
|
||||
/// Memtable config
|
||||
pub memtable: MemtableConfig,
|
||||
@@ -214,6 +217,8 @@ impl Default for MitoConfig {
|
||||
inverted_index: InvertedIndexConfig::default(),
|
||||
fulltext_index: FulltextIndexConfig::default(),
|
||||
bloom_filter_index: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index: VectorIndexConfig::default(),
|
||||
memtable: MemtableConfig::default(),
|
||||
min_compaction_interval: Duration::from_secs(0),
|
||||
default_experimental_flat_format: false,
|
||||
@@ -643,6 +648,51 @@ impl BloomFilterConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration options for the vector index (HNSW).
|
||||
#[cfg(feature = "vector_index")]
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct VectorIndexConfig {
|
||||
/// Whether to create the index on flush: automatically or never.
|
||||
pub create_on_flush: Mode,
|
||||
/// Whether to create the index on compaction: automatically or never.
|
||||
pub create_on_compaction: Mode,
|
||||
/// Whether to apply the index on query: automatically or never.
|
||||
pub apply_on_query: Mode,
|
||||
/// Memory threshold for creating the index.
|
||||
pub mem_threshold_on_create: MemoryThreshold,
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
impl Default for VectorIndexConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
create_on_flush: Mode::Auto,
|
||||
create_on_compaction: Mode::Auto,
|
||||
apply_on_query: Mode::Auto,
|
||||
mem_threshold_on_create: MemoryThreshold::Auto,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
impl VectorIndexConfig {
|
||||
pub fn mem_threshold_on_create(&self) -> Option<usize> {
|
||||
match self.mem_threshold_on_create {
|
||||
MemoryThreshold::Auto => {
|
||||
if let Some(sys_memory) = get_total_memory_readable() {
|
||||
Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
|
||||
} else {
|
||||
Some(ReadableSize::mb(64).as_bytes() as usize)
|
||||
}
|
||||
}
|
||||
MemoryThreshold::Unlimited => None,
|
||||
MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Divide cpu num by a non-zero `divisor` and returns at least 1.
|
||||
fn divide_num_cpus(divisor: usize) -> usize {
|
||||
debug_assert!(divisor > 0);
|
||||
|
||||
@@ -1039,6 +1039,22 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
#[snafu(display("Failed to build vector index: {}", reason))]
|
||||
VectorIndexBuild {
|
||||
reason: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
#[snafu(display("Failed to finish vector index: {}", reason))]
|
||||
VectorIndexFinish {
|
||||
reason: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Manual compaction is override by following operations."))]
|
||||
ManualCompactionOverride {},
|
||||
|
||||
@@ -1345,6 +1361,9 @@ impl ErrorExt for Error {
|
||||
source.status_code()
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
VectorIndexBuild { .. } | VectorIndexFinish { .. } => StatusCode::Internal,
|
||||
|
||||
ManualCompactionOverride {} => StatusCode::Cancelled,
|
||||
|
||||
CompactionMemoryExhausted { source, .. } => source.status_code(),
|
||||
|
||||
@@ -669,6 +669,8 @@ impl RegionFlushTask {
|
||||
inverted_index_config: self.engine_config.inverted_index.clone(),
|
||||
fulltext_index_config: self.engine_config.fulltext_index.clone(),
|
||||
bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: self.engine_config.vector_index.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -287,6 +287,9 @@ pub enum IndexType {
|
||||
FulltextIndex,
|
||||
/// Bloom Filter index
|
||||
BloomFilterIndex,
|
||||
/// Vector index (HNSW).
|
||||
#[cfg(feature = "vector_index")]
|
||||
VectorIndex,
|
||||
}
|
||||
|
||||
/// Metadata of indexes created for a specific column in an SST file.
|
||||
|
||||
@@ -20,6 +20,8 @@ pub(crate) mod inverted_index;
|
||||
pub mod puffin_manager;
|
||||
mod statistics;
|
||||
pub(crate) mod store;
|
||||
#[cfg(feature = "vector_index")]
|
||||
pub(crate) mod vector_index;
|
||||
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
@@ -41,10 +43,14 @@ use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, FileId, RegionId};
|
||||
use strum::IntoStaticStr;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
#[cfg(feature = "vector_index")]
|
||||
use vector_index::creator::VectorIndexer;
|
||||
|
||||
use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
|
||||
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
|
||||
use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
|
||||
#[cfg(feature = "vector_index")]
|
||||
use crate::config::VectorIndexConfig;
|
||||
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
|
||||
use crate::error::{
|
||||
BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
|
||||
@@ -76,6 +82,8 @@ use crate::worker::WorkerListener;
|
||||
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
|
||||
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
|
||||
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
|
||||
#[cfg(feature = "vector_index")]
|
||||
pub(crate) const TYPE_VECTOR_INDEX: &str = "vector_index";
|
||||
|
||||
/// Triggers background download of an index file to the local cache.
|
||||
pub(crate) fn trigger_index_background_download(
|
||||
@@ -114,6 +122,9 @@ pub struct IndexOutput {
|
||||
pub fulltext_index: FulltextIndexOutput,
|
||||
/// Bloom filter output.
|
||||
pub bloom_filter: BloomFilterOutput,
|
||||
/// Vector index output.
|
||||
#[cfg(feature = "vector_index")]
|
||||
pub vector_index: VectorIndexOutput,
|
||||
}
|
||||
|
||||
impl IndexOutput {
|
||||
@@ -128,6 +139,10 @@ impl IndexOutput {
|
||||
if self.bloom_filter.is_available() {
|
||||
indexes.push(IndexType::BloomFilterIndex);
|
||||
}
|
||||
#[cfg(feature = "vector_index")]
|
||||
if self.vector_index.is_available() {
|
||||
indexes.push(IndexType::VectorIndex);
|
||||
}
|
||||
indexes
|
||||
}
|
||||
|
||||
@@ -151,6 +166,12 @@ impl IndexOutput {
|
||||
.push(IndexType::BloomFilterIndex);
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "vector_index")]
|
||||
if self.vector_index.is_available() {
|
||||
for &col in &self.vector_index.columns {
|
||||
map.entry(col).or_default().push(IndexType::VectorIndex);
|
||||
}
|
||||
}
|
||||
|
||||
map.into_iter()
|
||||
.map(|(column_id, created_indexes)| ColumnIndexMetadata {
|
||||
@@ -184,6 +205,9 @@ pub type InvertedIndexOutput = IndexBaseOutput;
|
||||
pub type FulltextIndexOutput = IndexBaseOutput;
|
||||
/// Output of the bloom filter creation.
|
||||
pub type BloomFilterOutput = IndexBaseOutput;
|
||||
/// Output of the vector index creation.
|
||||
#[cfg(feature = "vector_index")]
|
||||
pub type VectorIndexOutput = IndexBaseOutput;
|
||||
|
||||
/// The index creator that hides the error handling details.
|
||||
#[derive(Default)]
|
||||
@@ -199,6 +223,10 @@ pub struct Indexer {
|
||||
last_mem_fulltext_index: usize,
|
||||
bloom_filter_indexer: Option<BloomFilterIndexer>,
|
||||
last_mem_bloom_filter: usize,
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_indexer: Option<VectorIndexer>,
|
||||
#[cfg(feature = "vector_index")]
|
||||
last_mem_vector_index: usize,
|
||||
intermediate_manager: Option<IntermediateManager>,
|
||||
}
|
||||
|
||||
@@ -259,6 +287,18 @@ impl Indexer {
|
||||
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
|
||||
.add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
|
||||
self.last_mem_bloom_filter = bloom_filter_mem;
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
{
|
||||
let vector_mem = self
|
||||
.vector_indexer
|
||||
.as_ref()
|
||||
.map_or(0, |creator| creator.memory_usage());
|
||||
INDEX_CREATE_MEMORY_USAGE
|
||||
.with_label_values(&[TYPE_VECTOR_INDEX])
|
||||
.add(vector_mem as i64 - self.last_mem_vector_index as i64);
|
||||
self.last_mem_vector_index = vector_mem;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,6 +319,8 @@ pub(crate) struct IndexerBuilderImpl {
|
||||
pub(crate) inverted_index_config: InvertedIndexConfig,
|
||||
pub(crate) fulltext_index_config: FulltextIndexConfig,
|
||||
pub(crate) bloom_filter_index_config: BloomFilterConfig,
|
||||
#[cfg(feature = "vector_index")]
|
||||
pub(crate) vector_index_config: VectorIndexConfig,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -296,11 +338,23 @@ impl IndexerBuilder for IndexerBuilderImpl {
|
||||
indexer.inverted_indexer = self.build_inverted_indexer(file_id);
|
||||
indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
|
||||
indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
|
||||
indexer.intermediate_manager = Some(self.intermediate_manager.clone());
|
||||
if indexer.inverted_indexer.is_none()
|
||||
&& indexer.fulltext_indexer.is_none()
|
||||
&& indexer.bloom_filter_indexer.is_none()
|
||||
#[cfg(feature = "vector_index")]
|
||||
{
|
||||
indexer.vector_indexer = self.build_vector_indexer(file_id);
|
||||
}
|
||||
indexer.intermediate_manager = Some(self.intermediate_manager.clone());
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
let has_any_indexer = indexer.inverted_indexer.is_some()
|
||||
|| indexer.fulltext_indexer.is_some()
|
||||
|| indexer.bloom_filter_indexer.is_some()
|
||||
|| indexer.vector_indexer.is_some();
|
||||
#[cfg(not(feature = "vector_index"))]
|
||||
let has_any_indexer = indexer.inverted_indexer.is_some()
|
||||
|| indexer.fulltext_indexer.is_some()
|
||||
|| indexer.bloom_filter_indexer.is_some();
|
||||
|
||||
if !has_any_indexer {
|
||||
indexer.abort().await;
|
||||
return Indexer::default();
|
||||
}
|
||||
@@ -476,6 +530,69 @@ impl IndexerBuilderImpl {
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
fn build_vector_indexer(&self, file_id: FileId) -> Option<VectorIndexer> {
|
||||
let create = match self.build_type {
|
||||
IndexBuildType::Flush => self.vector_index_config.create_on_flush.auto(),
|
||||
IndexBuildType::Compact => self.vector_index_config.create_on_compaction.auto(),
|
||||
_ => true,
|
||||
};
|
||||
|
||||
if !create {
|
||||
debug!(
|
||||
"Skip creating vector index due to config, region_id: {}, file_id: {}",
|
||||
self.metadata.region_id, file_id,
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
// Get vector index column IDs and options from metadata
|
||||
let vector_index_options = self.metadata.vector_indexed_column_ids();
|
||||
if vector_index_options.is_empty() {
|
||||
debug!(
|
||||
"No vector columns to index, skip creating vector index, region_id: {}, file_id: {}",
|
||||
self.metadata.region_id, file_id,
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
let mem_limit = self.vector_index_config.mem_threshold_on_create();
|
||||
let indexer = VectorIndexer::new(
|
||||
file_id,
|
||||
&self.metadata,
|
||||
self.intermediate_manager.clone(),
|
||||
mem_limit,
|
||||
&vector_index_options,
|
||||
);
|
||||
|
||||
let err = match indexer {
|
||||
Ok(indexer) => {
|
||||
if indexer.is_none() {
|
||||
debug!(
|
||||
"Skip creating vector index due to no columns require indexing, region_id: {}, file_id: {}",
|
||||
self.metadata.region_id, file_id,
|
||||
);
|
||||
}
|
||||
return indexer;
|
||||
}
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to create vector index, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.metadata.region_id, file_id, err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to create vector index, region_id: {}, file_id: {}",
|
||||
self.metadata.region_id, file_id,
|
||||
);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Type of an index build task.
|
||||
@@ -1115,6 +1232,8 @@ mod tests {
|
||||
with_inverted: bool,
|
||||
with_fulltext: bool,
|
||||
with_skipping_bloom: bool,
|
||||
#[cfg(feature = "vector_index")]
|
||||
with_vector: bool,
|
||||
}
|
||||
|
||||
fn mock_region_metadata(
|
||||
@@ -1122,6 +1241,8 @@ mod tests {
|
||||
with_inverted,
|
||||
with_fulltext,
|
||||
with_skipping_bloom,
|
||||
#[cfg(feature = "vector_index")]
|
||||
with_vector,
|
||||
}: MetaConfig,
|
||||
) -> RegionMetadataRef {
|
||||
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
|
||||
@@ -1187,6 +1308,24 @@ mod tests {
|
||||
builder.push_column_metadata(column);
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
if with_vector {
|
||||
use index::vector::VectorIndexOptions;
|
||||
|
||||
let options = VectorIndexOptions::default();
|
||||
let column_schema =
|
||||
ColumnSchema::new("vec", ConcreteDataType::vector_datatype(4), true)
|
||||
.with_vector_index_options(&options)
|
||||
.unwrap();
|
||||
let column = ColumnMetadata {
|
||||
column_schema,
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 6,
|
||||
};
|
||||
|
||||
builder.push_column_metadata(column);
|
||||
}
|
||||
|
||||
Arc::new(builder.build().unwrap())
|
||||
}
|
||||
|
||||
@@ -1237,6 +1376,8 @@ mod tests {
|
||||
inverted_index_config: Default::default(),
|
||||
fulltext_index_config: Default::default(),
|
||||
bloom_filter_index_config: Default::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
};
|
||||
let mut metrics = Metrics::new(WriteType::Flush);
|
||||
env.access_layer
|
||||
@@ -1287,6 +1428,8 @@ mod tests {
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1300,6 +1443,8 @@ mod tests {
|
||||
with_inverted: true,
|
||||
with_fulltext: true,
|
||||
with_skipping_bloom: true,
|
||||
#[cfg(feature = "vector_index")]
|
||||
with_vector: false,
|
||||
});
|
||||
let indexer = IndexerBuilderImpl {
|
||||
build_type: IndexBuildType::Flush,
|
||||
@@ -1312,6 +1457,8 @@ mod tests {
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
@@ -1331,6 +1478,8 @@ mod tests {
|
||||
with_inverted: true,
|
||||
with_fulltext: true,
|
||||
with_skipping_bloom: true,
|
||||
#[cfg(feature = "vector_index")]
|
||||
with_vector: false,
|
||||
});
|
||||
let indexer = IndexerBuilderImpl {
|
||||
build_type: IndexBuildType::Flush,
|
||||
@@ -1346,6 +1495,8 @@ mod tests {
|
||||
},
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
@@ -1368,6 +1519,8 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
@@ -1390,6 +1543,8 @@ mod tests {
|
||||
create_on_compaction: Mode::Disable,
|
||||
..Default::default()
|
||||
},
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
@@ -1409,6 +1564,8 @@ mod tests {
|
||||
with_inverted: false,
|
||||
with_fulltext: true,
|
||||
with_skipping_bloom: true,
|
||||
#[cfg(feature = "vector_index")]
|
||||
with_vector: false,
|
||||
});
|
||||
let indexer = IndexerBuilderImpl {
|
||||
build_type: IndexBuildType::Flush,
|
||||
@@ -1421,6 +1578,8 @@ mod tests {
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
@@ -1433,6 +1592,8 @@ mod tests {
|
||||
with_inverted: true,
|
||||
with_fulltext: false,
|
||||
with_skipping_bloom: true,
|
||||
#[cfg(feature = "vector_index")]
|
||||
with_vector: false,
|
||||
});
|
||||
let indexer = IndexerBuilderImpl {
|
||||
build_type: IndexBuildType::Flush,
|
||||
@@ -1445,6 +1606,8 @@ mod tests {
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
@@ -1457,6 +1620,8 @@ mod tests {
|
||||
with_inverted: true,
|
||||
with_fulltext: true,
|
||||
with_skipping_bloom: false,
|
||||
#[cfg(feature = "vector_index")]
|
||||
with_vector: false,
|
||||
});
|
||||
let indexer = IndexerBuilderImpl {
|
||||
build_type: IndexBuildType::Flush,
|
||||
@@ -1469,6 +1634,8 @@ mod tests {
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
@@ -1488,6 +1655,8 @@ mod tests {
|
||||
with_inverted: true,
|
||||
with_fulltext: true,
|
||||
with_skipping_bloom: true,
|
||||
#[cfg(feature = "vector_index")]
|
||||
with_vector: false,
|
||||
});
|
||||
let indexer = IndexerBuilderImpl {
|
||||
build_type: IndexBuildType::Flush,
|
||||
@@ -1500,6 +1669,8 @@ mod tests {
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
@@ -1507,6 +1678,82 @@ mod tests {
|
||||
assert!(indexer.inverted_indexer.is_none());
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
#[tokio::test]
|
||||
async fn test_update_flat_builds_vector_index() {
|
||||
use datatypes::arrow::array::BinaryBuilder;
|
||||
use datatypes::arrow::datatypes::{DataType, Field, Schema};
|
||||
|
||||
struct TestPathProvider;
|
||||
|
||||
impl FilePathProvider for TestPathProvider {
|
||||
fn build_index_file_path(&self, file_id: RegionFileId) -> String {
|
||||
format!("index/{}.puffin", file_id)
|
||||
}
|
||||
|
||||
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
|
||||
format!("index/{}.puffin", index_id)
|
||||
}
|
||||
|
||||
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
|
||||
format!("sst/{}.parquet", file_id)
|
||||
}
|
||||
}
|
||||
|
||||
fn f32s_to_bytes(values: &[f32]) -> Vec<u8> {
|
||||
let mut bytes = Vec::with_capacity(values.len() * 4);
|
||||
for v in values {
|
||||
bytes.extend_from_slice(&v.to_le_bytes());
|
||||
}
|
||||
bytes
|
||||
}
|
||||
|
||||
let (dir, factory) =
|
||||
PuffinManagerFactory::new_for_test_async("test_update_flat_builds_vector_index_").await;
|
||||
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
|
||||
|
||||
let metadata = mock_region_metadata(MetaConfig {
|
||||
with_inverted: false,
|
||||
with_fulltext: false,
|
||||
with_skipping_bloom: false,
|
||||
with_vector: true,
|
||||
});
|
||||
|
||||
let mut indexer = IndexerBuilderImpl {
|
||||
build_type: IndexBuildType::Flush,
|
||||
metadata,
|
||||
row_group_size: 1024,
|
||||
puffin_manager: factory.build(mock_object_store(), TestPathProvider),
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager: intm_manager,
|
||||
index_options: IndexOptions::default(),
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
.build(FileId::random(), 0)
|
||||
.await;
|
||||
|
||||
assert!(indexer.vector_indexer.is_some());
|
||||
|
||||
let vec1 = f32s_to_bytes(&[1.0, 0.0, 0.0, 0.0]);
|
||||
let vec2 = f32s_to_bytes(&[0.0, 1.0, 0.0, 0.0]);
|
||||
|
||||
let mut builder = BinaryBuilder::with_capacity(2, vec1.len() + vec2.len());
|
||||
builder.append_value(&vec1);
|
||||
builder.append_value(&vec2);
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("vec", DataType::Binary, true)]));
|
||||
let batch = RecordBatch::try_new(schema, vec![Arc::new(builder.finish())]).unwrap();
|
||||
|
||||
indexer.update_flat(&batch).await;
|
||||
let output = indexer.finish().await;
|
||||
|
||||
assert!(output.vector_index.is_available());
|
||||
assert!(output.vector_index.columns.contains(&6));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_index_build_task_sst_not_exist() {
|
||||
let env = SchedulerEnv::new().await;
|
||||
@@ -1839,6 +2086,8 @@ mod tests {
|
||||
inverted_index_config: InvertedIndexConfig::default(),
|
||||
fulltext_index_config: FulltextIndexConfig::default(),
|
||||
bloom_filter_index_config: BloomFilterConfig::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
});
|
||||
|
||||
let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
|
||||
|
||||
@@ -23,6 +23,8 @@ impl Indexer {
|
||||
self.do_abort_inverted_index().await;
|
||||
self.do_abort_fulltext_index().await;
|
||||
self.do_abort_bloom_filter().await;
|
||||
#[cfg(feature = "vector_index")]
|
||||
self.do_abort_vector_index().await;
|
||||
self.do_prune_intm_sst_dir().await;
|
||||
if self.write_cache_enabled {
|
||||
self.do_abort_clean_fs_temp_dir().await;
|
||||
@@ -106,4 +108,26 @@ impl Indexer {
|
||||
.to_string();
|
||||
TempFileCleaner::clean_atomic_dir_files(fs_accessor.store().store(), &[&fs_handle]).await;
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
async fn do_abort_vector_index(&mut self) {
|
||||
let Some(mut indexer) = self.vector_indexer.take() else {
|
||||
return;
|
||||
};
|
||||
let Err(err) = indexer.abort().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to abort vector index, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.region_id, self.file_id, err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to abort vector index, region_id: {}, file_id: {}",
|
||||
self.region_id, self.file_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@ use puffin::puffin_manager::{PuffinManager, PuffinWriter};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
#[cfg(feature = "vector_index")]
|
||||
use crate::sst::index::VectorIndexOutput;
|
||||
use crate::sst::index::puffin_manager::SstPuffinWriter;
|
||||
use crate::sst::index::statistics::{ByteCount, RowCount};
|
||||
use crate::sst::index::{
|
||||
@@ -54,6 +56,15 @@ impl Indexer {
|
||||
return IndexOutput::default();
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
{
|
||||
let success = self.do_finish_vector_index(&mut writer, &mut output).await;
|
||||
if !success {
|
||||
self.do_abort().await;
|
||||
return IndexOutput::default();
|
||||
}
|
||||
}
|
||||
|
||||
self.do_prune_intm_sst_dir().await;
|
||||
output.file_size = self.do_finish_puffin_writer(writer).await;
|
||||
output.version = self.index_version;
|
||||
@@ -276,6 +287,63 @@ impl Indexer {
|
||||
output.columns = column_ids;
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
async fn do_finish_vector_index(
|
||||
&mut self,
|
||||
puffin_writer: &mut SstPuffinWriter,
|
||||
index_output: &mut IndexOutput,
|
||||
) -> bool {
|
||||
let Some(mut indexer) = self.vector_indexer.take() else {
|
||||
return true;
|
||||
};
|
||||
|
||||
let column_ids = indexer.column_ids().collect();
|
||||
let err = match indexer.finish(puffin_writer).await {
|
||||
Ok((row_count, byte_count)) => {
|
||||
self.fill_vector_index_output(
|
||||
&mut index_output.vector_index,
|
||||
row_count,
|
||||
byte_count,
|
||||
column_ids,
|
||||
);
|
||||
return true;
|
||||
}
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to finish vector index, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.region_id, self.file_id, err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to finish vector index, region_id: {}, file_id: {}",
|
||||
self.region_id, self.file_id,
|
||||
);
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
fn fill_vector_index_output(
|
||||
&mut self,
|
||||
output: &mut VectorIndexOutput,
|
||||
row_count: RowCount,
|
||||
byte_count: ByteCount,
|
||||
column_ids: Vec<ColumnId>,
|
||||
) {
|
||||
debug!(
|
||||
"Vector index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
|
||||
self.region_id, self.file_id, byte_count, row_count, column_ids
|
||||
);
|
||||
|
||||
output.index_size = byte_count;
|
||||
output.row_count = row_count;
|
||||
output.columns = column_ids;
|
||||
}
|
||||
|
||||
pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
|
||||
if let Some(manager) = self.intermediate_manager.take()
|
||||
&& let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await
|
||||
|
||||
@@ -33,6 +33,10 @@ impl Indexer {
|
||||
if !self.do_update_bloom_filter(batch).await {
|
||||
self.do_abort().await;
|
||||
}
|
||||
#[cfg(feature = "vector_index")]
|
||||
if !self.do_update_vector_index(batch).await {
|
||||
self.do_abort().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns false if the update failed.
|
||||
@@ -110,6 +114,32 @@ impl Indexer {
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns false if the update failed.
|
||||
#[cfg(feature = "vector_index")]
|
||||
async fn do_update_vector_index(&mut self, batch: &mut Batch) -> bool {
|
||||
let Some(creator) = self.vector_indexer.as_mut() else {
|
||||
return true;
|
||||
};
|
||||
|
||||
let Err(err) = creator.update(batch).await else {
|
||||
return true;
|
||||
};
|
||||
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to update vector index, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.region_id, self.file_id, err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to update vector index, region_id: {}, file_id: {}",
|
||||
self.region_id, self.file_id,
|
||||
);
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
pub(crate) async fn do_update_flat(&mut self, batch: &RecordBatch) {
|
||||
if batch.num_rows() == 0 {
|
||||
return;
|
||||
@@ -124,6 +154,10 @@ impl Indexer {
|
||||
if !self.do_update_flat_bloom_filter(batch).await {
|
||||
self.do_abort().await;
|
||||
}
|
||||
#[cfg(feature = "vector_index")]
|
||||
if !self.do_update_flat_vector_index(batch).await {
|
||||
self.do_abort().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns false if the update failed.
|
||||
@@ -200,4 +234,30 @@ impl Indexer {
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns false if the update failed.
|
||||
#[cfg(feature = "vector_index")]
|
||||
async fn do_update_flat_vector_index(&mut self, batch: &RecordBatch) -> bool {
|
||||
let Some(creator) = self.vector_indexer.as_mut() else {
|
||||
return true;
|
||||
};
|
||||
|
||||
let Err(err) = creator.update_flat(batch).await else {
|
||||
return true;
|
||||
};
|
||||
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to update vector index with flat format, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.region_id, self.file_id, err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to update vector index with flat format, region_id: {}, file_id: {}",
|
||||
self.region_id, self.file_id,
|
||||
);
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
920
src/mito2/src/sst/index/vector_index/creator.rs
Normal file
920
src/mito2/src/sst/index/vector_index/creator.rs
Normal file
@@ -0,0 +1,920 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Vector index creator using pluggable vector index engines.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
|
||||
use common_telemetry::warn;
|
||||
use datatypes::arrow::array::{Array, BinaryArray};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::ValueRef;
|
||||
use index::vector::{VectorDistanceMetric, VectorIndexOptions, distance_metric_to_usearch};
|
||||
use puffin::puffin_manager::{PuffinWriter, PutOptions};
|
||||
use roaring::RoaringBitmap;
|
||||
use snafu::{ResultExt, ensure};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, FileId, VectorIndexEngine, VectorIndexEngineType};
|
||||
use tokio_util::compat::TokioAsyncReadCompatExt;
|
||||
use usearch::MetricKind;
|
||||
|
||||
use crate::error::{
|
||||
BiErrorsSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, Result, VectorIndexBuildSnafu,
|
||||
VectorIndexFinishSnafu,
|
||||
};
|
||||
use crate::metrics::{INDEX_CREATE_BYTES_TOTAL, INDEX_CREATE_ROWS_TOTAL};
|
||||
use crate::read::Batch;
|
||||
use crate::sst::index::TYPE_VECTOR_INDEX;
|
||||
use crate::sst::index::intermediate::{
|
||||
IntermediateLocation, IntermediateManager, TempFileProvider,
|
||||
};
|
||||
use crate::sst::index::puffin_manager::SstPuffinWriter;
|
||||
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
|
||||
use crate::sst::index::vector_index::util::bytes_to_f32_slice;
|
||||
use crate::sst::index::vector_index::{INDEX_BLOB_TYPE, engine};
|
||||
|
||||
/// The buffer size for the pipe used to send index data to the puffin blob.
|
||||
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
|
||||
|
||||
/// Configuration for a single column's vector index.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct VectorIndexConfig {
|
||||
/// The vector index engine type.
|
||||
pub engine: VectorIndexEngineType,
|
||||
/// The dimension of vectors in this column.
|
||||
pub dim: usize,
|
||||
/// The distance metric to use (e.g., L2, Cosine, IP) - usearch format.
|
||||
pub metric: MetricKind,
|
||||
/// The original distance metric (for serialization).
|
||||
pub distance_metric: VectorDistanceMetric,
|
||||
/// HNSW connectivity parameter (M in the paper).
|
||||
/// Higher values give better recall but use more memory.
|
||||
pub connectivity: usize,
|
||||
/// Expansion factor during index construction (ef_construction).
|
||||
pub expansion_add: usize,
|
||||
/// Expansion factor during search (ef_search).
|
||||
pub expansion_search: usize,
|
||||
}
|
||||
|
||||
impl VectorIndexConfig {
|
||||
/// Creates a new vector index config from VectorIndexOptions.
|
||||
pub fn new(dim: usize, options: &VectorIndexOptions) -> Self {
|
||||
Self {
|
||||
engine: options.engine,
|
||||
dim,
|
||||
metric: distance_metric_to_usearch(options.metric),
|
||||
distance_metric: options.metric,
|
||||
connectivity: options.connectivity as usize,
|
||||
expansion_add: options.expansion_add as usize,
|
||||
expansion_search: options.expansion_search as usize,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creator for a single column's vector index.
|
||||
struct VectorIndexCreator {
|
||||
/// The vector index engine (e.g., USearch HNSW).
|
||||
engine: Box<dyn VectorIndexEngine>,
|
||||
/// Configuration for this index.
|
||||
config: VectorIndexConfig,
|
||||
/// Bitmap tracking which row offsets have NULL vectors.
|
||||
/// HNSW keys are sequential (0, 1, 2...) but row offsets may have gaps due to NULLs.
|
||||
null_bitmap: RoaringBitmap,
|
||||
/// Current row offset (including NULLs).
|
||||
current_row_offset: u64,
|
||||
/// Next HNSW key to assign (only for non-NULL vectors).
|
||||
next_hnsw_key: u64,
|
||||
/// Memory usage estimation.
|
||||
memory_usage: usize,
|
||||
}
|
||||
|
||||
impl VectorIndexCreator {
|
||||
/// Creates a new vector index creator.
|
||||
fn new(config: VectorIndexConfig) -> Result<Self> {
|
||||
let engine_instance = engine::create_engine(config.engine, &config)?;
|
||||
|
||||
Ok(Self {
|
||||
engine: engine_instance,
|
||||
config,
|
||||
null_bitmap: RoaringBitmap::new(),
|
||||
current_row_offset: 0,
|
||||
next_hnsw_key: 0,
|
||||
memory_usage: 0,
|
||||
})
|
||||
}
|
||||
|
||||
/// Reserves capacity for the expected number of vectors.
|
||||
#[allow(dead_code)]
|
||||
fn reserve(&mut self, capacity: usize) -> Result<()> {
|
||||
self.engine.reserve(capacity).map_err(|e| {
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!("Failed to reserve capacity: {}", e),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
}
|
||||
|
||||
/// Adds a vector to the index.
|
||||
/// Returns the HNSW key assigned to this vector.
|
||||
fn add_vector(&mut self, vector: &[f32]) -> Result<u64> {
|
||||
let key = self.next_hnsw_key;
|
||||
self.engine.add(key, vector).map_err(|e| {
|
||||
VectorIndexBuildSnafu {
|
||||
reason: e.to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
self.next_hnsw_key += 1;
|
||||
self.current_row_offset += 1;
|
||||
self.memory_usage = self.engine.memory_usage();
|
||||
Ok(key)
|
||||
}
|
||||
|
||||
/// Records a NULL vector at the current row offset.
|
||||
fn add_null(&mut self) {
|
||||
self.null_bitmap.insert(self.current_row_offset as u32);
|
||||
self.current_row_offset += 1;
|
||||
}
|
||||
|
||||
/// Records multiple NULL vectors starting at the current row offset.
|
||||
fn add_nulls(&mut self, n: usize) {
|
||||
let start = self.current_row_offset as u32;
|
||||
let end = start + n as u32;
|
||||
self.null_bitmap.insert_range(start..end);
|
||||
self.current_row_offset += n as u64;
|
||||
}
|
||||
|
||||
/// Returns the serialized size of the index.
|
||||
fn serialized_length(&self) -> usize {
|
||||
self.engine.serialized_length()
|
||||
}
|
||||
|
||||
/// Serializes the index to a buffer.
|
||||
fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<()> {
|
||||
self.engine.save_to_buffer(buffer).map_err(|e| {
|
||||
VectorIndexFinishSnafu {
|
||||
reason: format!("Failed to serialize index: {}", e),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the memory usage of this creator.
|
||||
fn memory_usage(&self) -> usize {
|
||||
self.memory_usage + self.null_bitmap.serialized_size()
|
||||
}
|
||||
|
||||
/// Returns the number of vectors in the index (excluding NULLs).
|
||||
fn size(&self) -> usize {
|
||||
self.engine.size()
|
||||
}
|
||||
|
||||
/// Returns the engine type.
|
||||
fn engine_type(&self) -> VectorIndexEngineType {
|
||||
self.config.engine
|
||||
}
|
||||
|
||||
/// Returns the distance metric.
|
||||
fn metric(&self) -> VectorDistanceMetric {
|
||||
self.config.distance_metric
|
||||
}
|
||||
}
|
||||
|
||||
/// The indexer for vector indexes across multiple columns.
|
||||
pub struct VectorIndexer {
|
||||
/// Per-column vector index creators.
|
||||
creators: HashMap<ColumnId, VectorIndexCreator>,
|
||||
/// Provider for intermediate files.
|
||||
temp_file_provider: Arc<TempFileProvider>,
|
||||
/// Whether the indexing process has been aborted.
|
||||
aborted: bool,
|
||||
/// Statistics for this indexer.
|
||||
stats: Statistics,
|
||||
/// Global memory usage tracker.
|
||||
#[allow(dead_code)]
|
||||
global_memory_usage: Arc<AtomicUsize>,
|
||||
/// Region metadata for column lookups.
|
||||
#[allow(dead_code)]
|
||||
metadata: RegionMetadataRef,
|
||||
/// Memory usage threshold.
|
||||
memory_usage_threshold: Option<usize>,
|
||||
}
|
||||
|
||||
impl VectorIndexer {
|
||||
/// Creates a new vector indexer.
|
||||
///
|
||||
/// Returns `None` if there are no vector columns that need indexing.
|
||||
pub fn new(
|
||||
sst_file_id: FileId,
|
||||
metadata: &RegionMetadataRef,
|
||||
intermediate_manager: IntermediateManager,
|
||||
memory_usage_threshold: Option<usize>,
|
||||
vector_index_options: &HashMap<ColumnId, VectorIndexOptions>,
|
||||
) -> Result<Option<Self>> {
|
||||
let mut creators = HashMap::new();
|
||||
|
||||
let temp_file_provider = Arc::new(TempFileProvider::new(
|
||||
IntermediateLocation::new(&metadata.region_id, &sst_file_id),
|
||||
intermediate_manager,
|
||||
));
|
||||
let global_memory_usage = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
// Find all vector columns that have vector index enabled
|
||||
for column in &metadata.column_metadatas {
|
||||
// Check if this column has vector index options configured
|
||||
let Some(options) = vector_index_options.get(&column.column_id) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// Verify the column is a vector type
|
||||
let ConcreteDataType::Vector(vector_type) = &column.column_schema.data_type else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let config = VectorIndexConfig::new(vector_type.dim as usize, options);
|
||||
let creator = VectorIndexCreator::new(config)?;
|
||||
creators.insert(column.column_id, creator);
|
||||
}
|
||||
|
||||
if creators.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let indexer = Self {
|
||||
creators,
|
||||
temp_file_provider,
|
||||
aborted: false,
|
||||
stats: Statistics::new(TYPE_VECTOR_INDEX),
|
||||
global_memory_usage,
|
||||
metadata: metadata.clone(),
|
||||
memory_usage_threshold,
|
||||
};
|
||||
|
||||
Ok(Some(indexer))
|
||||
}
|
||||
|
||||
/// Updates index with a batch of rows.
|
||||
/// Garbage will be cleaned up if failed to update.
|
||||
pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
|
||||
ensure!(!self.aborted, OperateAbortedIndexSnafu);
|
||||
|
||||
if self.creators.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Err(update_err) = self.do_update(batch).await {
|
||||
// Clean up garbage if failed to update
|
||||
if let Err(err) = self.do_cleanup().await {
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!("Failed to clean up vector index creator, err: {err:?}");
|
||||
} else {
|
||||
warn!(err; "Failed to clean up vector index creator");
|
||||
}
|
||||
}
|
||||
return Err(update_err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Updates index with a flat format `RecordBatch`.
|
||||
/// Garbage will be cleaned up if failed to update.
|
||||
pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
|
||||
ensure!(!self.aborted, OperateAbortedIndexSnafu);
|
||||
|
||||
if self.creators.is_empty() || batch.num_rows() == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Err(update_err) = self.do_update_flat(batch).await {
|
||||
// Clean up garbage if failed to update
|
||||
if let Err(err) = self.do_cleanup().await {
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!("Failed to clean up vector index creator, err: {err:?}");
|
||||
} else {
|
||||
warn!(err; "Failed to clean up vector index creator");
|
||||
}
|
||||
}
|
||||
return Err(update_err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Internal update implementation.
|
||||
async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
|
||||
let mut guard = self.stats.record_update();
|
||||
let n = batch.num_rows();
|
||||
guard.inc_row_count(n);
|
||||
|
||||
for (col_id, creator) in &mut self.creators {
|
||||
let Some(values) = batch.field_col_value(*col_id) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// Process each row in the batch
|
||||
for i in 0..n {
|
||||
let value = values.data.get_ref(i);
|
||||
if value.is_null() {
|
||||
creator.add_null();
|
||||
} else {
|
||||
// Extract the vector bytes and convert to f32 slice
|
||||
if let ValueRef::Binary(bytes) = value {
|
||||
let floats = bytes_to_f32_slice(bytes);
|
||||
if floats.len() != creator.config.dim {
|
||||
return VectorIndexBuildSnafu {
|
||||
reason: format!(
|
||||
"Vector dimension mismatch: expected {}, got {}",
|
||||
creator.config.dim,
|
||||
floats.len()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
creator.add_vector(&floats)?;
|
||||
} else {
|
||||
creator.add_null();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check memory limit - abort index creation if exceeded
|
||||
if let Some(threshold) = self.memory_usage_threshold {
|
||||
let current_usage = creator.memory_usage();
|
||||
if current_usage > threshold {
|
||||
warn!(
|
||||
"Vector index memory usage {} exceeds threshold {}, aborting index creation, region_id: {}",
|
||||
current_usage, threshold, self.metadata.region_id
|
||||
);
|
||||
return VectorIndexBuildSnafu {
|
||||
reason: format!(
|
||||
"Memory usage {} exceeds threshold {}",
|
||||
current_usage, threshold
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Internal flat update implementation.
|
||||
async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
|
||||
let mut guard = self.stats.record_update();
|
||||
let n = batch.num_rows();
|
||||
guard.inc_row_count(n);
|
||||
|
||||
for (col_id, creator) in &mut self.creators {
|
||||
// This should never happen: creator exists but column not in metadata
|
||||
let column_meta = self.metadata.column_by_id(*col_id).ok_or_else(|| {
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!(
|
||||
"Column {} not found in region metadata, this is a bug",
|
||||
col_id
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
let column_name = &column_meta.column_schema.name;
|
||||
// Column not in batch is normal for flat format - treat as NULLs
|
||||
let Some(column_array) = batch.column_by_name(column_name) else {
|
||||
creator.add_nulls(n);
|
||||
continue;
|
||||
};
|
||||
|
||||
// Vector type must be stored as binary array
|
||||
let binary_array = column_array
|
||||
.as_any()
|
||||
.downcast_ref::<BinaryArray>()
|
||||
.ok_or_else(|| {
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!(
|
||||
"Column {} is not a binary array, got {:?}",
|
||||
column_name,
|
||||
column_array.data_type()
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
for i in 0..n {
|
||||
if !binary_array.is_valid(i) {
|
||||
creator.add_null();
|
||||
} else {
|
||||
let bytes = binary_array.value(i);
|
||||
let floats = bytes_to_f32_slice(bytes);
|
||||
if floats.len() != creator.config.dim {
|
||||
return VectorIndexBuildSnafu {
|
||||
reason: format!(
|
||||
"Vector dimension mismatch: expected {}, got {}",
|
||||
creator.config.dim,
|
||||
floats.len()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
creator.add_vector(&floats)?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(threshold) = self.memory_usage_threshold {
|
||||
let current_usage = creator.memory_usage();
|
||||
if current_usage > threshold {
|
||||
warn!(
|
||||
"Vector index memory usage {} exceeds threshold {}, aborting index creation, region_id: {}",
|
||||
current_usage, threshold, self.metadata.region_id
|
||||
);
|
||||
return VectorIndexBuildSnafu {
|
||||
reason: format!(
|
||||
"Memory usage {} exceeds threshold {}",
|
||||
current_usage, threshold
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finishes index creation and writes to puffin.
|
||||
/// Returns the number of rows and bytes written.
|
||||
pub async fn finish(
|
||||
&mut self,
|
||||
puffin_writer: &mut SstPuffinWriter,
|
||||
) -> Result<(RowCount, ByteCount)> {
|
||||
ensure!(!self.aborted, OperateAbortedIndexSnafu);
|
||||
|
||||
if self.stats.row_count() == 0 {
|
||||
// No IO is performed, no garbage to clean up
|
||||
return Ok((0, 0));
|
||||
}
|
||||
|
||||
let finish_res = self.do_finish(puffin_writer).await;
|
||||
// Clean up garbage no matter finish successfully or not
|
||||
if let Err(err) = self.do_cleanup().await {
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!("Failed to clean up vector index creator, err: {err:?}");
|
||||
} else {
|
||||
warn!(err; "Failed to clean up vector index creator");
|
||||
}
|
||||
}
|
||||
|
||||
// Report metrics on successful finish
|
||||
if finish_res.is_ok() {
|
||||
INDEX_CREATE_ROWS_TOTAL
|
||||
.with_label_values(&[TYPE_VECTOR_INDEX])
|
||||
.inc_by(self.stats.row_count() as u64);
|
||||
INDEX_CREATE_BYTES_TOTAL
|
||||
.with_label_values(&[TYPE_VECTOR_INDEX])
|
||||
.inc_by(self.stats.byte_count());
|
||||
}
|
||||
|
||||
finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
|
||||
}
|
||||
|
||||
/// Internal finish implementation.
|
||||
async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
|
||||
let mut guard = self.stats.record_finish();
|
||||
|
||||
for (id, creator) in &mut self.creators {
|
||||
if creator.size() == 0 {
|
||||
// No vectors to index
|
||||
continue;
|
||||
}
|
||||
|
||||
let written_bytes = Self::do_finish_single_creator(*id, creator, puffin_writer).await?;
|
||||
guard.inc_byte_count(written_bytes);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finishes a single column's vector index.
|
||||
///
|
||||
/// The blob format v1 (header = 33 bytes):
|
||||
/// ```text
|
||||
/// +------------------+
|
||||
/// | Version | 1 byte (u8, = 1)
|
||||
/// +------------------+
|
||||
/// | Engine type | 1 byte (u8, engine identifier)
|
||||
/// +------------------+
|
||||
/// | Dimension | 4 bytes (u32, little-endian)
|
||||
/// +------------------+
|
||||
/// | Metric | 1 byte (u8, distance metric)
|
||||
/// +------------------+
|
||||
/// | Connectivity | 2 bytes (u16, little-endian, HNSW M parameter)
|
||||
/// +------------------+
|
||||
/// | Expansion add | 2 bytes (u16, little-endian, ef_construction)
|
||||
/// +------------------+
|
||||
/// | Expansion search | 2 bytes (u16, little-endian, ef_search)
|
||||
/// +------------------+
|
||||
/// | Total rows | 8 bytes (u64, little-endian, total rows in SST)
|
||||
/// +------------------+
|
||||
/// | Indexed rows | 8 bytes (u64, little-endian, non-NULL rows indexed)
|
||||
/// +------------------+
|
||||
/// | NULL bitmap len | 4 bytes (u32, little-endian)
|
||||
/// +------------------+
|
||||
/// | NULL bitmap | variable length (serialized RoaringBitmap)
|
||||
/// +------------------+
|
||||
/// | Vector index | variable length (engine-specific serialized format)
|
||||
/// +------------------+
|
||||
/// ```
|
||||
async fn do_finish_single_creator(
|
||||
col_id: ColumnId,
|
||||
creator: &mut VectorIndexCreator,
|
||||
puffin_writer: &mut SstPuffinWriter,
|
||||
) -> Result<ByteCount> {
|
||||
// Serialize the NULL bitmap
|
||||
let mut null_bitmap_bytes = Vec::new();
|
||||
creator
|
||||
.null_bitmap
|
||||
.serialize_into(&mut null_bitmap_bytes)
|
||||
.map_err(|e| {
|
||||
VectorIndexFinishSnafu {
|
||||
reason: format!("Failed to serialize NULL bitmap: {}", e),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
// Serialize the vector index
|
||||
let index_size = creator.serialized_length();
|
||||
let mut index_bytes = vec![0u8; index_size];
|
||||
creator.save_to_buffer(&mut index_bytes)?;
|
||||
|
||||
// Header size: version(1) + engine(1) + dim(4) + metric(1) +
|
||||
// connectivity(2) + expansion_add(2) + expansion_search(2) +
|
||||
// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes
|
||||
/// Size of the vector index blob header in bytes.
|
||||
/// Header format: version(1) + engine(1) + dim(4) + metric(1) +
|
||||
/// connectivity(2) + expansion_add(2) + expansion_search(2) +
|
||||
/// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes
|
||||
const VECTOR_INDEX_BLOB_HEADER_SIZE: usize = 33;
|
||||
let total_size =
|
||||
VECTOR_INDEX_BLOB_HEADER_SIZE + null_bitmap_bytes.len() + index_bytes.len();
|
||||
let mut blob_data = Vec::with_capacity(total_size);
|
||||
|
||||
// Write version (1 byte)
|
||||
blob_data.push(1u8);
|
||||
// Write engine type (1 byte)
|
||||
blob_data.push(creator.engine_type().as_u8());
|
||||
// Write dimension (4 bytes, little-endian)
|
||||
blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes());
|
||||
// Write metric (1 byte)
|
||||
blob_data.push(creator.metric().as_u8());
|
||||
// Write connectivity/M (2 bytes, little-endian)
|
||||
blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes());
|
||||
// Write expansion_add/ef_construction (2 bytes, little-endian)
|
||||
blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes());
|
||||
// Write expansion_search/ef_search (2 bytes, little-endian)
|
||||
blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes());
|
||||
// Write total_rows (8 bytes, little-endian)
|
||||
blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes());
|
||||
// Write indexed_rows (8 bytes, little-endian)
|
||||
blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes());
|
||||
// Write NULL bitmap length (4 bytes, little-endian)
|
||||
let bitmap_len: u32 = null_bitmap_bytes.len().try_into().map_err(|_| {
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!(
|
||||
"NULL bitmap size {} exceeds maximum allowed size {}",
|
||||
null_bitmap_bytes.len(),
|
||||
u32::MAX
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
blob_data.extend_from_slice(&bitmap_len.to_le_bytes());
|
||||
// Write NULL bitmap
|
||||
blob_data.extend_from_slice(&null_bitmap_bytes);
|
||||
// Write vector index
|
||||
blob_data.extend_from_slice(&index_bytes);
|
||||
|
||||
// Create blob name following the same pattern as bloom filter
|
||||
let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id);
|
||||
|
||||
// Write to puffin using a pipe
|
||||
let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
|
||||
|
||||
// Writer task writes the blob data to the pipe
|
||||
let write_index = async move {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
let mut writer = tx;
|
||||
writer.write_all(&blob_data).await?;
|
||||
writer.shutdown().await?;
|
||||
Ok::<(), std::io::Error>(())
|
||||
};
|
||||
|
||||
let (index_write_result, puffin_add_blob) = futures::join!(
|
||||
write_index,
|
||||
puffin_writer.put_blob(
|
||||
&blob_name,
|
||||
rx.compat(),
|
||||
PutOptions::default(),
|
||||
Default::default()
|
||||
)
|
||||
);
|
||||
|
||||
match (
|
||||
puffin_add_blob.context(PuffinAddBlobSnafu),
|
||||
index_write_result.map_err(|e| {
|
||||
VectorIndexFinishSnafu {
|
||||
reason: format!("Failed to write blob data: {}", e),
|
||||
}
|
||||
.build()
|
||||
}),
|
||||
) {
|
||||
(Err(e1), Err(e2)) => BiErrorsSnafu {
|
||||
first: Box::new(e1),
|
||||
second: Box::new(e2),
|
||||
}
|
||||
.fail()?,
|
||||
|
||||
(Ok(_), e @ Err(_)) => e?,
|
||||
(e @ Err(_), Ok(_)) => e.map(|_| ())?,
|
||||
(Ok(written_bytes), Ok(_)) => {
|
||||
return Ok(written_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
/// Aborts index creation and cleans up garbage.
|
||||
pub async fn abort(&mut self) -> Result<()> {
|
||||
if self.aborted {
|
||||
return Ok(());
|
||||
}
|
||||
self.aborted = true;
|
||||
self.do_cleanup().await
|
||||
}
|
||||
|
||||
/// Cleans up temporary files.
|
||||
async fn do_cleanup(&mut self) -> Result<()> {
|
||||
let mut _guard = self.stats.record_cleanup();
|
||||
self.creators.clear();
|
||||
self.temp_file_provider.cleanup().await
|
||||
}
|
||||
|
||||
/// Returns the memory usage of the indexer.
|
||||
pub fn memory_usage(&self) -> usize {
|
||||
self.creators.values().map(|c| c.memory_usage()).sum()
|
||||
}
|
||||
|
||||
/// Returns the column IDs being indexed.
|
||||
pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
|
||||
self.creators.keys().copied()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_vector_index_creator() {
|
||||
let options = VectorIndexOptions::default();
|
||||
let config = VectorIndexConfig::new(4, &options);
|
||||
let mut creator = VectorIndexCreator::new(config).unwrap();
|
||||
|
||||
creator.reserve(10).unwrap();
|
||||
|
||||
// Add some vectors
|
||||
let v1 = vec![1.0f32, 0.0, 0.0, 0.0];
|
||||
let v2 = vec![0.0f32, 1.0, 0.0, 0.0];
|
||||
|
||||
creator.add_vector(&v1).unwrap();
|
||||
creator.add_null();
|
||||
creator.add_vector(&v2).unwrap();
|
||||
|
||||
assert_eq!(creator.size(), 2); // 2 vectors (excluding NULL)
|
||||
assert_eq!(creator.current_row_offset, 3); // 3 rows total
|
||||
assert!(creator.null_bitmap.contains(1)); // Row 1 is NULL
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_vector_index_creator_serialization() {
|
||||
let options = VectorIndexOptions::default();
|
||||
let config = VectorIndexConfig::new(4, &options);
|
||||
let mut creator = VectorIndexCreator::new(config).unwrap();
|
||||
|
||||
creator.reserve(10).unwrap();
|
||||
|
||||
// Add vectors
|
||||
let vectors = vec![
|
||||
vec![1.0f32, 0.0, 0.0, 0.0],
|
||||
vec![0.0f32, 1.0, 0.0, 0.0],
|
||||
vec![0.0f32, 0.0, 1.0, 0.0],
|
||||
];
|
||||
|
||||
for v in &vectors {
|
||||
creator.add_vector(v).unwrap();
|
||||
}
|
||||
|
||||
// Test serialization
|
||||
let size = creator.serialized_length();
|
||||
assert!(size > 0);
|
||||
|
||||
let mut buffer = vec![0u8; size];
|
||||
creator.save_to_buffer(&mut buffer).unwrap();
|
||||
|
||||
// Verify buffer is not empty and starts with some data
|
||||
assert!(!buffer.iter().all(|&b| b == 0));
|
||||
}
|
||||
|
||||
#[test]
fn test_vector_index_creator_null_bitmap_serialization() {
    let opts = VectorIndexOptions::default();
    let mut builder = VectorIndexCreator::new(VectorIndexConfig::new(4, &opts)).unwrap();
    builder.reserve(10).unwrap();

    // Interleave vectors and NULLs so rows 1, 3 and 4 end up NULL.
    builder.add_vector(&[1.0, 0.0, 0.0, 0.0]).unwrap();
    builder.add_null();
    builder.add_vector(&[0.0, 1.0, 0.0, 0.0]).unwrap();
    builder.add_nulls(2);
    builder.add_vector(&[0.0, 0.0, 1.0, 0.0]).unwrap();

    // 3 vectors indexed across 6 logical rows.
    assert_eq!(builder.size(), 3);
    assert_eq!(builder.current_row_offset, 6);
    for row in 0..6u32 {
        let expect_null = matches!(row, 1 | 3 | 4);
        assert_eq!(builder.null_bitmap.contains(row), expect_null);
    }

    // Round-trip the NULL bitmap through its serialized form.
    let mut encoded = Vec::new();
    builder.null_bitmap.serialize_into(&mut encoded).unwrap();

    let decoded = RoaringBitmap::deserialize_from(encoded.as_slice()).unwrap();
    assert_eq!(decoded.len(), 3); // 3 NULLs
    for row in [1u32, 3, 4] {
        assert!(decoded.contains(row));
    }
}
|
||||
|
||||
#[test]
fn test_vector_index_config() {
    use index::vector::VectorDistanceMetric;

    // Every HNSW tuning knob must be carried over from the options verbatim,
    // and the distance metric must map onto the usearch metric kind.
    let options = VectorIndexOptions {
        engine: VectorIndexEngineType::default(),
        metric: VectorDistanceMetric::Cosine,
        connectivity: 32,
        expansion_add: 256,
        expansion_search: 128,
    };

    let config = VectorIndexConfig::new(128, &options);

    assert_eq!(config.engine, VectorIndexEngineType::Usearch);
    assert_eq!(config.metric, MetricKind::Cos);
    assert_eq!(
        (config.dim, config.connectivity, config.expansion_add, config.expansion_search),
        (128, 32, 256, 128)
    );
}
|
||||
|
||||
#[test]
fn test_vector_index_header_format() {
    use index::vector::VectorDistanceMetric;

    // Create config with specific HNSW parameters.
    let options = VectorIndexOptions {
        engine: VectorIndexEngineType::Usearch,
        metric: VectorDistanceMetric::L2sq,
        connectivity: 24,
        expansion_add: 200,
        expansion_search: 100,
    };
    let config = VectorIndexConfig::new(4, &options);
    let mut creator = VectorIndexCreator::new(config).unwrap();

    creator.reserve(10).unwrap();

    // Add pattern: vector, null, vector, null, vector.
    creator.add_vector(&[1.0, 0.0, 0.0, 0.0]).unwrap();
    creator.add_null();
    creator.add_vector(&[0.0, 1.0, 0.0, 0.0]).unwrap();
    creator.add_null();
    creator.add_vector(&[0.0, 0.0, 1.0, 0.0]).unwrap();

    // Verify counts.
    assert_eq!(creator.current_row_offset, 5); // total_rows
    assert_eq!(creator.next_hnsw_key, 3); // indexed_rows

    // Build blob data manually (simulating write_to_puffin header writing).
    let mut null_bitmap_bytes = Vec::new();
    creator
        .null_bitmap
        .serialize_into(&mut null_bitmap_bytes)
        .unwrap();

    let index_size = creator.serialized_length();
    let mut index_bytes = vec![0u8; index_size];
    creator.save_to_buffer(&mut index_bytes).unwrap();

    // Header layout (33 bytes, all integers little-endian):
    // [0]       version          u8
    // [1]       engine type      u8
    // [2..6]    dimension        u32
    // [6]       metric           u8
    // [7..9]    connectivity     u16
    // [9..11]   expansion_add    u16
    // [11..13]  expansion_search u16
    // [13..21]  total_rows       u64
    // [21..29]  indexed_rows     u64
    // [29..33]  null bitmap len  u32
    let header_size = 33;
    let total_size = header_size + null_bitmap_bytes.len() + index_bytes.len();
    let mut blob_data = Vec::with_capacity(total_size);

    blob_data.push(1u8); // version
    blob_data.push(creator.engine_type().as_u8()); // engine type
    blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes()); // dimension
    blob_data.push(creator.metric().as_u8()); // metric
    blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes());
    blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes());
    blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes());
    blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes()); // total_rows
    blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes()); // indexed_rows
    let bitmap_len: u32 = null_bitmap_bytes.len().try_into().unwrap();
    blob_data.extend_from_slice(&bitmap_len.to_le_bytes());
    blob_data.extend_from_slice(&null_bitmap_bytes);
    blob_data.extend_from_slice(&index_bytes);

    // Verify total blob size.
    assert_eq!(blob_data.len(), total_size);

    // Parse the header back field by field. Range slicing + `try_into`
    // avoids the error-prone manual enumeration of individual byte indices.
    assert_eq!(blob_data[0], 1); // version
    assert_eq!(blob_data[1], VectorIndexEngineType::Usearch.as_u8()); // engine

    let dim = u32::from_le_bytes(blob_data[2..6].try_into().unwrap());
    assert_eq!(dim, 4);

    let metric = blob_data[6];
    assert_eq!(
        metric,
        datatypes::schema::VectorDistanceMetric::L2sq.as_u8()
    );

    let connectivity = u16::from_le_bytes(blob_data[7..9].try_into().unwrap());
    assert_eq!(connectivity, 24);

    let expansion_add = u16::from_le_bytes(blob_data[9..11].try_into().unwrap());
    assert_eq!(expansion_add, 200);

    let expansion_search = u16::from_le_bytes(blob_data[11..13].try_into().unwrap());
    assert_eq!(expansion_search, 100);

    let total_rows = u64::from_le_bytes(blob_data[13..21].try_into().unwrap());
    assert_eq!(total_rows, 5);

    let indexed_rows = u64::from_le_bytes(blob_data[21..29].try_into().unwrap());
    assert_eq!(indexed_rows, 3);

    let null_bitmap_len = u32::from_le_bytes(blob_data[29..33].try_into().unwrap());
    assert_eq!(null_bitmap_len as usize, null_bitmap_bytes.len());

    // Verify null bitmap can be deserialized.
    let null_bitmap_data = &blob_data[header_size..header_size + null_bitmap_len as usize];
    let restored_bitmap = RoaringBitmap::deserialize_from(null_bitmap_data).unwrap();
    assert_eq!(restored_bitmap.len(), 2); // 2 nulls
    assert!(restored_bitmap.contains(1));
    assert!(restored_bitmap.contains(3));
}
|
||||
}
|
||||
45
src/mito2/src/sst/index/vector_index/engine/mod.rs
Normal file
45
src/mito2/src/sst/index/vector_index/engine/mod.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Pluggable vector index engine implementations.
|
||||
|
||||
mod usearch_impl;
|
||||
|
||||
use store_api::storage::{VectorIndexEngine, VectorIndexEngineType};
|
||||
pub use usearch_impl::UsearchEngine;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::sst::index::vector_index::creator::VectorIndexConfig;
|
||||
|
||||
/// Creates a new vector index engine based on the engine type.
|
||||
pub fn create_engine(
|
||||
engine_type: VectorIndexEngineType,
|
||||
config: &VectorIndexConfig,
|
||||
) -> Result<Box<dyn VectorIndexEngine>> {
|
||||
match engine_type {
|
||||
VectorIndexEngineType::Usearch => Ok(Box::new(UsearchEngine::create(config)?)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads a vector index engine from serialized data.
|
||||
#[allow(unused)]
|
||||
pub fn load_engine(
|
||||
engine_type: VectorIndexEngineType,
|
||||
config: &VectorIndexConfig,
|
||||
data: &[u8],
|
||||
) -> Result<Box<dyn VectorIndexEngine>> {
|
||||
match engine_type {
|
||||
VectorIndexEngineType::Usearch => Ok(Box::new(UsearchEngine::load(config, data)?)),
|
||||
}
|
||||
}
|
||||
231
src/mito2/src/sst/index/vector_index/engine/usearch_impl.rs
Normal file
231
src/mito2/src/sst/index/vector_index/engine/usearch_impl.rs
Normal file
@@ -0,0 +1,231 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! USearch HNSW implementation of VectorIndexEngine.
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use store_api::storage::{VectorIndexEngine, VectorSearchMatches};
|
||||
use usearch::{Index, IndexOptions, ScalarKind};
|
||||
|
||||
use crate::error::{Result, VectorIndexBuildSnafu};
|
||||
use crate::sst::index::vector_index::creator::VectorIndexConfig;
|
||||
|
||||
/// USearch-based vector index engine using HNSW algorithm.
|
||||
pub struct UsearchEngine {
|
||||
index: Index,
|
||||
}
|
||||
|
||||
impl UsearchEngine {
|
||||
/// Creates a new USearch engine with the given configuration.
|
||||
pub fn create(config: &VectorIndexConfig) -> Result<Self> {
|
||||
let options = IndexOptions {
|
||||
dimensions: config.dim,
|
||||
metric: config.metric,
|
||||
quantization: ScalarKind::F32,
|
||||
connectivity: config.connectivity,
|
||||
expansion_add: config.expansion_add,
|
||||
expansion_search: config.expansion_search,
|
||||
multi: false,
|
||||
};
|
||||
|
||||
let index = Index::new(&options).map_err(|e| {
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!("Failed to create USearch index: {}", e),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
Ok(Self { index })
|
||||
}
|
||||
|
||||
/// Loads a USearch engine from serialized data.
|
||||
#[allow(unused)]
|
||||
pub fn load(config: &VectorIndexConfig, data: &[u8]) -> Result<Self> {
|
||||
let options = IndexOptions {
|
||||
dimensions: config.dim,
|
||||
metric: config.metric,
|
||||
quantization: ScalarKind::F32,
|
||||
// These will be loaded from serialized data
|
||||
connectivity: 0,
|
||||
expansion_add: 0,
|
||||
expansion_search: 0,
|
||||
multi: false,
|
||||
};
|
||||
|
||||
let index = Index::new(&options).map_err(|e| {
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!("Failed to create USearch index for loading: {}", e),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
index.load_from_buffer(data).map_err(|e| {
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!("Failed to load USearch index from buffer: {}", e),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
Ok(Self { index })
|
||||
}
|
||||
}
|
||||
|
||||
impl VectorIndexEngine for UsearchEngine {
|
||||
fn add(&mut self, key: u64, vector: &[f32]) -> Result<(), BoxedError> {
|
||||
// Reserve capacity if needed
|
||||
if self.index.size() >= self.index.capacity() {
|
||||
let new_capacity = std::cmp::max(1, self.index.capacity() * 2);
|
||||
self.index.reserve(new_capacity).map_err(|e| {
|
||||
BoxedError::new(
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!("Failed to reserve capacity: {}", e),
|
||||
}
|
||||
.build(),
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
self.index.add(key, vector).map_err(|e| {
|
||||
BoxedError::new(
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!("Failed to add vector: {}", e),
|
||||
}
|
||||
.build(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn search(&self, query: &[f32], k: usize) -> Result<VectorSearchMatches, BoxedError> {
|
||||
let matches = self.index.search(query, k).map_err(|e| {
|
||||
BoxedError::new(
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!("Failed to search: {}", e),
|
||||
}
|
||||
.build(),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(VectorSearchMatches {
|
||||
keys: matches.keys,
|
||||
distances: matches.distances,
|
||||
})
|
||||
}
|
||||
|
||||
fn serialized_length(&self) -> usize {
|
||||
self.index.serialized_length()
|
||||
}
|
||||
|
||||
fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<(), BoxedError> {
|
||||
self.index.save_to_buffer(buffer).map_err(|e| {
|
||||
BoxedError::new(
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!("Failed to save to buffer: {}", e),
|
||||
}
|
||||
.build(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn reserve(&mut self, capacity: usize) -> Result<(), BoxedError> {
|
||||
self.index.reserve(capacity).map_err(|e| {
|
||||
BoxedError::new(
|
||||
VectorIndexBuildSnafu {
|
||||
reason: format!("Failed to reserve: {}", e),
|
||||
}
|
||||
.build(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn size(&self) -> usize {
|
||||
self.index.size()
|
||||
}
|
||||
|
||||
fn capacity(&self) -> usize {
|
||||
self.index.capacity()
|
||||
}
|
||||
|
||||
fn memory_usage(&self) -> usize {
|
||||
self.index.memory_usage()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use index::vector::VectorDistanceMetric;
    use store_api::storage::VectorIndexEngineType;
    use usearch::MetricKind;

    use super::*;

    /// A small 4-dimensional L2 configuration shared by the tests below.
    fn test_config() -> VectorIndexConfig {
        VectorIndexConfig {
            engine: VectorIndexEngineType::Usearch,
            dim: 4,
            metric: MetricKind::L2sq,
            distance_metric: VectorDistanceMetric::L2sq,
            connectivity: 16,
            expansion_add: 128,
            expansion_search: 64,
        }
    }

    #[test]
    fn test_usearch_engine_create() {
        let engine = UsearchEngine::create(&test_config()).unwrap();
        assert_eq!(engine.size(), 0);
    }

    #[test]
    fn test_usearch_engine_add_and_search() {
        let mut engine = UsearchEngine::create(&test_config()).unwrap();

        // Insert three orthogonal basis vectors keyed 0..3.
        let basis = [
            [1.0f32, 0.0, 0.0, 0.0],
            [0.0, 1.0, 0.0, 0.0],
            [0.0, 0.0, 1.0, 0.0],
        ];
        for (key, vector) in basis.iter().enumerate() {
            engine.add(key as u64, vector).unwrap();
        }
        assert_eq!(engine.size(), 3);

        // The nearest neighbor of the first basis vector is itself.
        let matches = engine.search(&basis[0], 2).unwrap();
        assert_eq!(matches.keys.len(), 2);
        assert_eq!(matches.keys[0], 0);
    }

    #[test]
    fn test_usearch_engine_serialization() {
        let config = test_config();
        let mut engine = UsearchEngine::create(&config).unwrap();

        engine.add(0, &[1.0, 0.0, 0.0, 0.0]).unwrap();
        engine.add(1, &[0.0, 1.0, 0.0, 0.0]).unwrap();

        // Round-trip through the serialized representation.
        let mut buffer = vec![0u8; engine.serialized_length()];
        engine.save_to_buffer(&mut buffer).unwrap();

        let loaded = UsearchEngine::load(&config, &buffer).unwrap();
        assert_eq!(loaded.size(), 2);

        // The reloaded index must answer queries identically.
        let matches = loaded.search(&[1.0, 0.0, 0.0, 0.0], 1).unwrap();
        assert_eq!(matches.keys[0], 0);
    }
}
|
||||
22
src/mito2/src/sst/index/vector_index/mod.rs
Normal file
22
src/mito2/src/sst/index/vector_index/mod.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Vector index module for HNSW-based approximate nearest neighbor search.
|
||||
|
||||
pub(crate) mod creator;
|
||||
pub(crate) mod engine;
|
||||
pub(crate) mod util;
|
||||
|
||||
/// The blob type identifier for vector index in puffin files.
|
||||
pub(crate) const INDEX_BLOB_TYPE: &str = "greptime-vector-index-v1";
|
||||
108
src/mito2/src/sst/index/vector_index/util.rs
Normal file
108
src/mito2/src/sst/index/vector_index/util.rs
Normal file
@@ -0,0 +1,108 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Utility functions for vector index operations.
|
||||
|
||||
use std::borrow::Cow;
|
||||
|
||||
/// Converts a little-endian byte slice into `f32` values.
///
/// On little-endian targets, 4-byte-aligned input is reinterpreted in place
/// and returned as `Cow::Borrowed` (zero-copy). Unaligned input, or any
/// input on big-endian targets, is decoded into a fresh `Vec` and returned
/// as `Cow::Owned`.
///
/// # Panics
///
/// Panics if the byte slice length is not a multiple of 4.
pub fn bytes_to_f32_slice(bytes: &[u8]) -> Cow<'_, [f32]> {
    assert!(
        bytes.len() % 4 == 0,
        "Vector bytes length {} is not a multiple of 4",
        bytes.len()
    );

    if bytes.is_empty() {
        return Cow::Borrowed(&[]);
    }

    // Fast path: zero-copy reinterpretation is sound only when the data is
    // f32-aligned AND the target is little-endian, because the stored
    // representation is little-endian.
    #[cfg(target_endian = "little")]
    {
        let ptr = bytes.as_ptr();
        if ptr as usize % std::mem::align_of::<f32>() == 0 {
            // SAFETY: length is a multiple of 4 and the pointer is
            // f32-aligned (both checked above); on little-endian targets the
            // stored bytes match the in-memory f32 layout.
            let floats =
                unsafe { std::slice::from_raw_parts(ptr as *const f32, bytes.len() / 4) };
            return Cow::Borrowed(floats);
        }
    }

    // Slow path: unaligned input or a big-endian target; decode chunk by chunk.
    Cow::Owned(
        bytes
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
            .collect(),
    )
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `values` as a contiguous little-endian byte buffer.
    fn encode_le(values: &[f32]) -> Vec<u8> {
        values.iter().flat_map(|f| f.to_le_bytes()).collect()
    }

    #[test]
    fn test_bytes_to_f32_slice() {
        let bytes = encode_le(&[1.0, 2.0, 3.0, 4.0]);
        let result = bytes_to_f32_slice(&bytes);
        assert_eq!(&*result, &[1.0f32, 2.0, 3.0, 4.0]);
    }

    #[test]
    fn test_bytes_to_f32_slice_unaligned() {
        // Prepend one padding byte so the f32 payload starts misaligned,
        // then slice past it.
        let mut buf = vec![0u8];
        buf.extend(encode_le(&[1.0, 2.0, 3.0, 4.0]));
        let unaligned = &buf[1..];

        let was_aligned =
            (unaligned.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>());

        // Decoding must succeed regardless of alignment.
        let result = bytes_to_f32_slice(unaligned);
        assert_eq!(&*result, &[1.0f32, 2.0, 3.0, 4.0]);

        // Misaligned input cannot be borrowed; it must have been copied.
        if !was_aligned {
            assert!(matches!(result, Cow::Owned(_)));
        }
    }

    #[test]
    fn test_bytes_to_f32_slice_empty() {
        assert!(bytes_to_f32_slice(&[]).is_empty());
    }
}
|
||||
@@ -742,6 +742,8 @@ mod tests {
|
||||
inverted_index_config: Default::default(),
|
||||
fulltext_index_config: Default::default(),
|
||||
bloom_filter_index_config: Default::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
};
|
||||
|
||||
let mut metrics = Metrics::new(WriteType::Flush);
|
||||
@@ -1152,6 +1154,8 @@ mod tests {
|
||||
inverted_index_config: Default::default(),
|
||||
fulltext_index_config: Default::default(),
|
||||
bloom_filter_index_config: Default::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1271,6 +1275,8 @@ mod tests {
|
||||
inverted_index_config: Default::default(),
|
||||
fulltext_index_config: Default::default(),
|
||||
bloom_filter_index_config: Default::default(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: Default::default(),
|
||||
};
|
||||
|
||||
let mut metrics = Metrics::new(WriteType::Flush);
|
||||
|
||||
@@ -64,6 +64,8 @@ impl<S> RegionWorkerLoop<S> {
|
||||
inverted_index_config: self.config.inverted_index.clone(),
|
||||
fulltext_index_config: self.config.fulltext_index.clone(),
|
||||
bloom_filter_index_config: self.config.bloom_filter_index.clone(),
|
||||
#[cfg(feature = "vector_index")]
|
||||
vector_index_config: self.config.vector_index.clone(),
|
||||
index_options: version.options.index_options.clone(),
|
||||
row_group_size: WriteOptions::default().row_group_size,
|
||||
intermediate_manager,
|
||||
|
||||
@@ -29,7 +29,7 @@ use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::datatypes::FieldRef;
|
||||
use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef};
|
||||
use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, VectorIndexOptions};
|
||||
use datatypes::types::TimestampType;
|
||||
use itertools::Itertools;
|
||||
use serde::de::Error;
|
||||
@@ -384,6 +384,22 @@ impl RegionMetadata {
|
||||
inverted_index
|
||||
}
|
||||
|
||||
/// Gets the column IDs that have vector indexes along with their options.
|
||||
/// Returns a map from column ID to the vector index options.
|
||||
pub fn vector_indexed_column_ids(&self) -> HashMap<ColumnId, VectorIndexOptions> {
|
||||
self.column_metadatas
|
||||
.iter()
|
||||
.filter_map(|column| {
|
||||
column
|
||||
.column_schema
|
||||
.vector_index_options()
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|options| (column.column_id, options))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Checks whether the metadata is valid.
|
||||
fn validate(&self) -> Result<()> {
|
||||
// Id to name.
|
||||
|
||||
@@ -1546,6 +1546,12 @@ create_on_compaction = "auto"
|
||||
apply_on_query = "auto"
|
||||
mem_threshold_on_create = "auto"
|
||||
|
||||
[region_engine.mito.vector_index]
|
||||
create_on_flush = "auto"
|
||||
create_on_compaction = "auto"
|
||||
apply_on_query = "auto"
|
||||
mem_threshold_on_create = "auto"
|
||||
|
||||
[region_engine.mito.memtable]
|
||||
type = "time_series"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user