chore: explain why USearch was chosen and how distributed queries work

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Dennis Zhuang
2025-12-04 22:05:31 -08:00
parent 97fbfbea98
commit 4708cb64d0


@@ -36,6 +36,151 @@ To support these use cases at scale, GreptimeDB needs an efficient vector index
3. Distributed vector index across datanodes
4. Real-time index updates (follows SST lifecycle)
## Why USearch
We evaluated several vector index libraries before selecting [USearch](https://github.com/unum-cloud/usearch). This section explains our rationale.
### Evaluation Criteria
| Criterion | Weight | Description |
|-----------|--------|-------------|
| Rust Support | High | Native Rust API or high-quality bindings |
| Performance | High | Competitive indexing and search speed |
| Memory Efficiency | High | Reasonable memory footprint for large indexes |
| Persistence | High | Ability to serialize/deserialize indexes |
| Maintenance | Medium | Active development and community |
| Build Complexity | Medium | Ease of integration into our build system |
| License | High | Permissive license compatible with Apache 2.0 |
### Libraries Evaluated
#### 1. USearch ✓ (Selected)
[USearch](https://github.com/unum-cloud/usearch) is a single-file vector search engine developed by Unum.
**Strengths:**
- **First-class Rust support**: Official `usearch` crate with safe Rust API via cxx bindings
- **Performance**: Consistently ranks among the fastest in [ANN benchmarks](http://ann-benchmarks.com/)
- **Compact implementation**: Single-header C++ core, minimal dependencies
- **Flexible serialization**: `save_to_buffer`/`load_from_buffer` for in-memory serialization, ideal for Puffin blob storage
- **Memory-mapped support**: `view_from_file` for zero-copy index loading
- **SIMD optimization**: Leverages SimSIMD for hardware-accelerated distance calculations (AVX-512, NEON)
- **Multiple metrics**: Cosine, L2, Inner Product, Hamming, Jaccard, and custom metrics
- **Quantization options**: f32, f64, f16, i8 for memory/accuracy tradeoffs
- **Active maintenance**: Regular releases, responsive maintainers
- **Apache 2.0 license**: Fully compatible with GreptimeDB
**Weaknesses:**
- C++ dependency (via cxx), though build is straightforward
- Less index variety compared to FAISS (HNSW only)
**Benchmark Performance** (from USearch documentation):
| Operation | Performance |
|-----------|-------------|
| Index construction | ~1M vectors/sec (f32, 128-dim) |
| Search throughput | ~100K QPS (single-threaded) |
| Memory overhead | ~1.1x raw vector size |
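For reference, a minimal sketch of how the official `usearch` crate is used (method and option names follow the crate's README; exact signatures should be treated as an assumption, not a verified API):
```rust
use usearch::{new_index, IndexOptions, MetricKind, ScalarKind};

fn demo() -> Result<(), Box<dyn std::error::Error>> {
    // HNSW index over 3-dimensional f32 vectors using cosine distance.
    let options = IndexOptions {
        dimensions: 3,
        metric: MetricKind::Cos,
        quantization: ScalarKind::F32,
        ..Default::default()
    };
    let index = new_index(&options)?;
    index.reserve(1_000)?;

    // Keys are u64 identifiers (e.g. row ids); vectors are plain f32 slices.
    index.add(1, &[0.1, 0.2, 0.3])?;
    index.add(2, &[0.9, 0.1, 0.0])?;

    // Approximate top-k search returns matching keys and their distances.
    let matches = index.search(&[0.1, 0.2, 0.3], 2)?;
    for (key, distance) in matches.keys.iter().zip(matches.distances.iter()) {
        println!("key={key} distance={distance}");
    }

    // `save_to_buffer` / `load_from_buffer` (see strengths above) serialize the
    // index to an in-memory blob suitable for storing as a Puffin blob.
    Ok(())
}
```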
#### 2. FAISS (Meta)
[FAISS](https://github.com/facebookresearch/faiss) is Meta's comprehensive similarity search library.
**Strengths:**
- Extensive index types: IVF, PQ, HNSW, LSH, and combinations
- GPU acceleration (CUDA)
- Production-proven at massive scale
**Weaknesses:**
- **No official Rust bindings**: Third-party bindings exist but are incomplete or unmaintained
- **Complex C++ build**: Requires careful CMake configuration, optional GPU dependencies
- **Heavy dependency**: Brings in OpenMP, BLAS/LAPACK, potentially MKL
- **Overkill for our needs**: Most advanced features (GPU, IVF+PQ) are not immediately needed
**Verdict**: Too heavyweight, with poor Rust integration.
#### 3. Hnswlib
[Hnswlib](https://github.com/nmslib/hnswlib) is the reference HNSW implementation.
**Strengths:**
- Reference implementation of HNSW algorithm
- Simple API
**Weaknesses:**
- **No official Rust bindings**: Community bindings are outdated
- **Maintenance concerns**: Less active development in recent years
- **Limited features**: No built-in quantization, fewer distance metrics
**Verdict**: USearch provides better Rust support and is more actively maintained.
#### 4. Annoy (Spotify)
[Annoy](https://github.com/spotify/annoy) uses random projection trees.
**Strengths:**
- Memory-mapped by design, excellent for read-heavy workloads
- Simple API
**Weaknesses:**
- **Immutable indexes**: Cannot add vectors after build (requires full rebuild)
- **Slower search**: Random projection trees are generally slower than HNSW
- **Limited metrics**: Only Euclidean, Manhattan, Angular, Hamming
**Verdict**: Immutability is acceptable for the SST use case, but search performance is inferior to HNSW.
#### 5. Milvus Knowhere
[Knowhere](https://github.com/milvus-io/knowhere) is Milvus's vector search engine.
**Strengths:**
- Multiple algorithm support
- Designed for production use
**Weaknesses:**
- **No standalone Rust crate**: Tightly coupled with Milvus
- **Complex build**: Many dependencies
- **Less suitable for embedding**: Designed as a service component
**Verdict**: Not designed for library usage.
#### 6. Custom Implementation
Building our own HNSW implementation in pure Rust.
**Strengths:**
- Full control over implementation
- No external dependencies
- Can be tailored to our exact needs
**Weaknesses:**
- **Significant engineering effort**: HNSW is complex, especially with good SIMD optimization
- **Performance risk**: Unlikely to match years of optimization in USearch/FAISS
- **Maintenance burden**: Bug fixes, performance tuning, new features
**Verdict**: Not justified given high-quality existing libraries.
### Decision Matrix
| Library | Rust Support | Performance | Build | Maintenance | License | Score |
|---------|--------------|-------------|-------|-------------|---------|-------|
| **USearch** | ★★★★★ | ★★★★★ | ★★★★☆ | ★★★★★ | Apache 2.0 | **24/25** |
| FAISS | ★★☆☆☆ | ★★★★★ | ★★☆☆☆ | ★★★★★ | MIT | 18/25 |
| Hnswlib | ★★☆☆☆ | ★★★★☆ | ★★★☆☆ | ★★★☆☆ | Apache 2.0 | 15/25 |
| Annoy | ★★★☆☆ | ★★★☆☆ | ★★★★☆ | ★★★☆☆ | Apache 2.0 | 16/25 |
| Custom | ★★★★★ | ★★★☆☆ | ★★★★★ | ★★☆☆☆ | N/A | 17/25 |
### Conclusion
USearch provides the best combination of:
1. **Native Rust support** with a well-maintained crate
2. **Top-tier performance** in ANN benchmarks
3. **Simple integration** with minimal build complexity
4. **Flexible persistence** that fits our Puffin blob storage model
5. **Active development** with responsive maintainers
6. **Permissive licensing** compatible with Apache 2.0
The only trade-off is the C++ dependency via cxx, which is acceptable given GreptimeDB already uses cxx for other components.
## Design Overview
### Architecture
@@ -641,33 +786,522 @@ The distance functions serve dual purposes:
| `src/sql/src/parsers/` | Parse VECTOR INDEX DDL |
| `src/common/function/src/scalars/vector/` | No changes (fallback preserved) |
## Distributed Vector Index
This section describes how vector index searching works in distributed mode, where data is partitioned across multiple regions on different datanodes.
### Architecture Overview
```
┌─────────────────────────────────────────────────────────────────────────┐
│ Frontend │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ VectorSearchOptimizer │ │
│ │ Detects: ORDER BY vec_distance(col, query) LIMIT k │ │
│ │ Rewrites to: DistributedVectorAnnPlan │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ MergeScanExec │ │
│ │ Distributes VectorAnnScan to regions │ │
│ │ Merges top-k results from all datanodes │ │
│ └───────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Datanode 1 │ │ Datanode 2 │ │ Datanode 3 │
│ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │
│ │Region 1│ │ │ │Region 2│ │ │ │Region 3│ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ HNSW │ │ │ │ HNSW │ │ │ │ HNSW │ │
│ │ Index │ │ │ │ Index │ │ │ │ Index │ │
│ └────────┘ │ │ └────────┘ │ │ └────────┘ │
│ │ │ │ │ │ │ │ │
│ ▼ │ │ ▼ │ │ ▼ │
│ Top-k local │ │ Top-k local │ │ Top-k local │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
└────────────────────┼────────────────────┘
┌─────────────────┐
│ Global Top-k │
│ Merge at │
│ Frontend │
└─────────────────┘
```
### Distributed Query Flow
#### Step 1: Query Detection and Planning
The `VectorSearchOptimizer` at the frontend detects vector search patterns and creates a distributed plan:
```rust
impl PhysicalOptimizerRule for VectorSearchOptimizer {
fn optimize(&self, plan: Arc<dyn ExecutionPlan>, ...) -> Result<Arc<dyn ExecutionPlan>> {
// Detect: ORDER BY vec_*_distance(col, query) LIMIT k
let (column, query_vector, metric, k) = extract_vector_search_pattern(&plan)?;
// Check if column has vector index
let index_meta = self.get_vector_index_meta(&column)?;
// Create distributed vector search plan
// This will be wrapped in MergeScanExec for distribution
Ok(Arc::new(DistributedVectorAnnPlan::new(
column,
query_vector,
metric,
k,
index_meta,
)))
}
}
```
#### Step 2: Plan Distribution via MergeScan
The distributed plan integrates with GreptimeDB's existing `MergeScanExec`:
```rust
/// Distributed vector ANN scan that executes on each region
pub struct VectorAnnRegionPlan {
/// Column containing vectors
column: Column,
/// Query vector for similarity search
query_vector: Vec<f32>,
/// Distance metric
metric: VectorMetric,
/// Number of results to return per region
/// Over-fetch to improve global recall
local_k: usize,
/// Index configuration
index_config: VectorIndexConfig,
}
impl VectorAnnRegionPlan {
/// Calculate local_k based on global k and region count
/// Over-fetching improves recall when merging results
pub fn calculate_local_k(global_k: usize, region_count: usize) -> usize {
// Heuristic: fetch more from each region to ensure good global recall
// For small k: fetch k * multiplier
// For large k: fetch k + buffer
let multiplier = match region_count {
1 => 1,
2..=4 => 2,
5..=10 => 3,
_ => 4,
};
(global_k * multiplier).max(global_k + 10)
}
}
```
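Worked values for the over-fetch heuristic above (a usage sketch, assuming the function compiles as written):
```rust
// 3 regions,  global k = 10 -> multiplier 2 -> max(10 * 2, 10 + 10) = 20
assert_eq!(VectorAnnRegionPlan::calculate_local_k(10, 3), 20);
// 8 regions,  global k = 5  -> multiplier 3 -> max(5 * 3, 5 + 10)   = 15
assert_eq!(VectorAnnRegionPlan::calculate_local_k(5, 8), 15);
// 12 regions, global k = 50 -> multiplier 4 -> max(50 * 4, 50 + 10) = 200
assert_eq!(VectorAnnRegionPlan::calculate_local_k(50, 12), 200);
```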
#### Step 3: Datanode Local Execution
Each datanode executes the vector search on its local regions:
```rust
impl RegionServer {
async fn handle_vector_ann_query(
&self,
request: VectorAnnRequest,
) -> Result<VectorAnnResponse> {
let region = self.get_region(request.region_id)?;
let mut all_candidates = Vec::new();
// Search each SST's vector index in this region
for sst in region.sst_files() {
if let Some(applier) = sst.vector_index_applier(&request.column)? {
applier.load()?;
// Local ANN search
let matches = applier.search(&request.query_vector, request.local_k)?;
for (row_id, distance) in matches {
all_candidates.push(VectorMatch {
row_id,
distance,
sst_id: sst.id(),
});
}
} else {
// Fallback: brute-force for SSTs without index
let matches = self.brute_force_search(sst, &request)?;
all_candidates.extend(matches);
}
}
// Sort and return top local_k
all_candidates.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
all_candidates.truncate(request.local_k);
// Fetch actual row data for candidates
let rows = self.fetch_rows(&all_candidates)?;
Ok(VectorAnnResponse { rows, distances: all_candidates })
}
}
```
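One caveat in the sketch above: sorting with `partial_cmp(...).unwrap()` panics if any distance is NaN. `f32::total_cmp` provides a total order and avoids the panic; the same applies to the merge and filtering sketches below. A drop-in helper (hypothetical, not part of the design):
```rust
use std::cmp::Ordering;

/// NaN-safe comparison of candidate distances.
fn cmp_distance(a: f32, b: f32) -> Ordering {
    a.total_cmp(&b)
}

// Usage: all_candidates.sort_by(|x, y| cmp_distance(x.distance, y.distance));
```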
#### Step 4: Global Merge at Frontend
The frontend merges results from all regions:
```rust
pub struct VectorAnnMergeExec {
/// Results from all regions
region_streams: Vec<SendableRecordBatchStream>,
/// Global k (final result count)
k: usize,
/// Distance column index for sorting
distance_col_idx: usize,
}
impl ExecutionPlan for VectorAnnMergeExec {
fn execute(&self, partition: usize, context: Arc<TaskContext>)
-> Result<SendableRecordBatchStream>
{
// Simplified blocking sketch: the real operator would return a stream that
// polls the region result streams asynchronously.
// Collect all results from regions
let mut all_results: Vec<(RecordBatch, f32)> = Vec::new();
for stream in &self.region_streams {
while let Some(batch) = stream.next().await? {
let distances = batch.column(self.distance_col_idx)
.as_any().downcast_ref::<Float32Array>()?;
for (i, distance) in distances.iter().enumerate() {
if let Some(d) = distance {
all_results.push((batch.slice(i, 1), d));
}
}
}
}
// Global sort by distance
all_results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
// Take global top-k
all_results.truncate(self.k);
// Combine into final batch
let final_batch = concat_batches(&all_results)?;
Ok(Box::pin(MemoryStream::new(vec![final_batch])))
}
}
```
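The merge sketch above materializes every candidate before sorting. Since only the global top-k is needed, a bounded max-heap keeps memory at O(k) while region results stream in. A self-contained sketch of that alternative (the `HeapEntry` type is hypothetical and stands in for `VectorMatch`):
```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

struct HeapEntry {
    distance: f32,
    row_id: u64,
}

impl PartialEq for HeapEntry {
    fn eq(&self, other: &Self) -> bool {
        self.distance.total_cmp(&other.distance) == Ordering::Equal
    }
}
impl Eq for HeapEntry {}
impl PartialOrd for HeapEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for HeapEntry {
    // Max-heap on distance: the current worst candidate sits on top.
    fn cmp(&self, other: &Self) -> Ordering {
        self.distance.total_cmp(&other.distance)
    }
}

/// Keep only the k nearest candidates while streaming region results.
fn merge_top_k(candidates: impl Iterator<Item = HeapEntry>, k: usize) -> Vec<HeapEntry> {
    let mut heap = BinaryHeap::with_capacity(k + 1);
    for candidate in candidates {
        heap.push(candidate);
        if heap.len() > k {
            heap.pop(); // evict the current worst (largest distance)
        }
    }
    let mut result = heap.into_vec();
    result.sort_by(|a, b| a.distance.total_cmp(&b.distance));
    result
}
```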
### Recall Optimization Strategies
Distributed ANN search faces a recall challenge: each region runs an approximate search and returns only its local top-k, so a vector that belongs in the global top-k can be dropped whenever a region's approximate search ranks it below its local cutoff. The strategies below trade extra per-region work and network traffic for better global recall.
#### Strategy 1: Over-fetching
Fetch more candidates from each region than the final k:
```rust
pub struct OverfetchConfig {
/// Multiplier for local k (local_k = global_k * multiplier)
pub multiplier: f32,
/// Minimum over-fetch count
pub min_overfetch: usize,
/// Maximum over-fetch count (to limit network traffic)
pub max_overfetch: usize,
}
impl Default for OverfetchConfig {
fn default() -> Self {
Self {
multiplier: 2.0,
min_overfetch: 10,
max_overfetch: 1000,
}
}
}
fn calculate_local_k(global_k: usize, region_count: usize, config: &OverfetchConfig) -> usize {
let base = (global_k as f32 * config.multiplier) as usize;
base.clamp(global_k + config.min_overfetch, config.max_overfetch)
}
```
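Worked values with the defaults above (usage sketch; note that the `region_count` argument is not used by this particular heuristic):
```rust
let cfg = OverfetchConfig::default();
// global_k = 10:  base = 10 * 2.0 = 20,  clamped to [20, 1000]  -> 20
assert_eq!(calculate_local_k(10, 4, &cfg), 20);
// global_k = 400: base = 800,            clamped to [410, 1000] -> 800
assert_eq!(calculate_local_k(400, 4, &cfg), 800);
// global_k = 600: base = 1200,           clamped to [610, 1000] -> 1000 (network cap)
assert_eq!(calculate_local_k(600, 4, &cfg), 1000);
```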
#### Strategy 2: Adaptive Expansion
If initial results have poor distance distribution, expand search:
```rust
pub struct AdaptiveVectorSearch {
/// Initial local k
initial_k: usize,
/// Maximum expansion rounds
max_rounds: usize,
/// Distance threshold for expansion trigger
expansion_threshold: f32,
}
impl AdaptiveVectorSearch {
async fn search(&self, regions: &[RegionId], query: &[f32], global_k: usize)
-> Result<Vec<VectorMatch>>
{
let mut local_k = self.initial_k;
let mut results = Vec::new();
for round in 0..self.max_rounds {
results = self.search_regions(regions, query, local_k).await?;
if results.len() >= global_k {
// Check distance distribution
let distances: Vec<f32> = results.iter().map(|r| r.distance).collect();
let variance = calculate_variance(&distances[..global_k]);
// If distances are tightly clustered, likely good recall
if variance < self.expansion_threshold {
break;
}
}
// Expand search
local_k *= 2;
}
results.truncate(global_k);
Ok(results)
}
}
```
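The `calculate_variance` helper referenced above is not defined in this document; one reasonable definition is the population variance of the top-k distances, sketched below:
```rust
/// Population variance of a slice of distances (hypothetical helper used by
/// the adaptive expansion sketch above).
fn calculate_variance(values: &[f32]) -> f32 {
    if values.is_empty() {
        return 0.0;
    }
    let mean = values.iter().sum::<f32>() / values.len() as f32;
    values.iter().map(|v| (v - mean).powi(2)).sum::<f32>() / values.len() as f32
}
```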
#### Strategy 3: Distance-based Filtering
Use the k-th best distance from the first round as a filter for subsequent fetches:
```rust
pub struct DistanceFilteredSearch {
global_k: usize,
}
impl DistanceFilteredSearch {
async fn search(&self, regions: &[RegionId], query: &[f32]) -> Result<Vec<VectorMatch>> {
// Round 1: Get initial candidates with small k
let initial_k = self.global_k * 2;
let mut results = self.search_all_regions(regions, query, initial_k).await?;
results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
if results.len() < self.global_k {
return Ok(results);
}
// Get distance threshold (k-th best distance)
let threshold = results[self.global_k - 1].distance;
// Round 2: Fetch all results within threshold from each region
// This catches candidates that were ranked > initial_k locally
// but are within the global top-k distance
let additional = self.search_with_threshold(regions, query, threshold).await?;
results.extend(additional);
results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
results.dedup_by(|a, b| a.row_id == b.row_id);
results.truncate(self.global_k);
Ok(results)
}
}
```
### Region Pruning for Vector Search
Leverage partition metadata to skip irrelevant regions:
```rust
pub struct VectorSearchRegionPruner {
partition_manager: PartitionRuleManagerRef,
}
impl VectorSearchRegionPruner {
/// Prune regions based on query predicates
/// Vector search often has time-based or category-based filters
async fn prune_regions(
&self,
table_id: TableId,
predicates: &[Expr],
all_regions: &[RegionId],
) -> Result<Vec<RegionId>> {
// Extract partition-relevant predicates
let partition_exprs = PredicateExtractor::extract_partition_expressions(
predicates,
&self.get_partition_columns(table_id).await?,
)?;
if partition_exprs.is_empty() {
// No pruning possible, search all regions
return Ok(all_regions.to_vec());
}
// Use existing constraint pruner
let partitions = self.partition_manager
.find_table_partitions(table_id)
.await?;
ConstraintPruner::prune_regions(&partition_exprs, &partitions)
}
}
```
### Combining with Pre-filters
Vector search often includes scalar predicates (e.g., `WHERE category = 'A'`):
```rust
/// Execution strategy for filtered vector search
pub enum FilteredVectorStrategy {
/// Filter first, then vector search on filtered rows
/// Best when filter is highly selective
PreFilter,
/// Vector search first, then apply filter
/// Best when filter has low selectivity
PostFilter,
/// Hybrid: use filter to prune, then vector search
/// Best for moderate selectivity
Hybrid { selectivity_threshold: f32 },
}
impl FilteredVectorSearch {
fn choose_strategy(&self, filter_selectivity: f32) -> FilteredVectorStrategy {
match filter_selectivity {
s if s < 0.01 => FilteredVectorStrategy::PreFilter,
s if s > 0.5 => FilteredVectorStrategy::PostFilter,
_ => FilteredVectorStrategy::Hybrid { selectivity_threshold: 0.1 },
}
}
async fn execute_prefilter(
&self,
region: &Region,
filter: &Expr,
query: &[f32],
k: usize,
) -> Result<Vec<VectorMatch>> {
// 1. Apply scalar filter to get candidate row IDs
let filtered_row_ids = region.evaluate_filter(filter).await?;
// 2. If few candidates, use brute-force
if filtered_row_ids.len() < k * 10 {
return self.brute_force_on_rows(region, &filtered_row_ids, query, k).await;
}
// 3. Otherwise, use index with row ID filter
let applier = region.vector_index_applier()?;
applier.search_with_filter(query, k, &filtered_row_ids)
}
}
```
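Illustrative strategy selection for a few selectivity values (usage sketch; `searcher` is a hypothetical `FilteredVectorSearch` instance and the percentages are not benchmarked):
```rust
// 0.5% of rows match the predicate -> filter first, then search the survivors.
let s1 = searcher.choose_strategy(0.005); // FilteredVectorStrategy::PreFilter
// 80% of rows match -> ANN first, then drop the few non-matching rows.
let s2 = searcher.choose_strategy(0.8);   // FilteredVectorStrategy::PostFilter
// 10% of rows match -> hybrid pruning.
let s3 = searcher.choose_strategy(0.1);   // FilteredVectorStrategy::Hybrid { .. }
```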
### Protocol Definition
```protobuf
// Vector ANN request sent to datanodes
message VectorAnnRequest {
RegionRequestHeader header = 1;
uint64 region_id = 2;
string column_name = 3;
repeated float query_vector = 4;
VectorMetric metric = 5;
uint32 local_k = 6;
// Optional: scalar predicates to apply
bytes filter_expr = 7;
}
message VectorAnnResponse {
repeated VectorMatch matches = 1;
}
message VectorMatch {
uint64 row_id = 1;
float distance = 2;
// Row data (all columns)
bytes row_data = 3;
}
enum VectorMetric {
COSINE = 0;
L2_SQUARED = 1;
INNER_PRODUCT = 2;
}
```
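A hypothetical sketch of building a per-region request from the prost-generated types for the messages above (field names mirror the proto definition; header wiring is omitted):
```rust
fn build_region_request(region_id: u64, query: &[f32], local_k: usize) -> VectorAnnRequest {
    VectorAnnRequest {
        header: None, // filled in by the region client with auth/tracing context
        region_id,
        column_name: "embedding".to_string(),
        query_vector: query.to_vec(),
        metric: VectorMetric::Cosine as i32,
        local_k: local_k as u32,
        filter_expr: Vec::new(), // no scalar predicate in this example
    }
}
```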
### Configuration
```rust
/// Distributed vector search configuration
pub struct DistributedVectorConfig {
/// Over-fetch multiplier for local k calculation
pub overfetch_multiplier: f32,
/// Maximum candidates to fetch from each region
pub max_local_k: usize,
/// Enable adaptive expansion
pub adaptive_expansion: bool,
/// Maximum expansion rounds
pub max_expansion_rounds: usize,
/// Timeout for individual region queries
pub region_timeout: Duration,
/// Whether to use parallel region queries
pub parallel_regions: bool,
}
impl Default for DistributedVectorConfig {
fn default() -> Self {
Self {
overfetch_multiplier: 2.0,
max_local_k: 1000,
adaptive_expansion: false,
max_expansion_rounds: 3,
region_timeout: Duration::from_secs(30),
parallel_regions: true,
}
}
}
```
### Implementation Plan (Additional Phases)
#### Phase 5: Distributed Vector Search
- [ ] Add `VectorAnnRequest`/`VectorAnnResponse` to region protocol
- [ ] Implement `VectorAnnRegionHandler` in datanode
- [ ] Implement `VectorAnnMergeExec` for frontend result merging
- [ ] Add distributed vector search to `MergeScanExec` integration
- [ ] Implement over-fetching strategy
#### Phase 6: Advanced Distributed Features
- [ ] Implement adaptive expansion for recall improvement
- [ ] Add region pruning for vector search with predicates
- [ ] Implement pre-filter/post-filter strategy selection
- [ ] Add distributed vector search metrics and monitoring
### Files to Modify (Additional)
| Path | Change |
|------|--------|
| `src/api/src/region.proto` | Add VectorAnnRequest/Response messages |
| `src/datanode/src/region_server.rs` | Handle vector ANN requests |
| `src/query/src/dist_plan/merge_scan.rs` | Integrate vector search distribution |
| `src/query/src/dist_plan/vector_merge.rs` | New: VectorAnnMergeExec |
| `src/frontend/src/instance/region_query.rs` | Route vector ANN requests |
## Future Extensions
1. **Quantization**: Support int8/binary quantization for reduced memory
2. **Filtering**: Pre-filtering with predicates before ANN search
3. **Hybrid Search**: Combine vector similarity with full-text search
4. **Index Advisor**: Automatic index recommendation based on query patterns
5. **Cross-Region Index**: Global HNSW index spanning multiple regions (research)
## References