Compare commits

...

3 Commits

Author SHA1 Message Date
dennis zhuang
a56a00224f feat: impl vector index scan in storage (#7528)
* feat: impl vector index scan in storage

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: fallback to read remote blob when blob not found

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: refactor encoding and decoding and apply suggestions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: license

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add apply_with_k tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: apply suggestions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: forgot to align nulls when the vector column is not in the batch

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add test for vector column is not in a batch while buiilding

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2026-01-12 08:30:51 +00:00
discord9
6487f14f70 feat: gc schd update repart mapping (#7517)
* feat(gc): batch gc now alos handle routing

Signed-off-by: discord9 <discord9@163.com>

typo

Signed-off-by: discord9 <discord9@163.com>

s

Signed-off-by: discord9 <discord9@163.com>

feat: use batch gc procedure

Signed-off-by: discord9 <discord9@163.com>

feat: cross region refs

Signed-off-by: discord9 <discord9@163.com>

feat: clean up repartition

Signed-off-by: discord9 <discord9@163.com>

chore: cleanup

Signed-off-by: discord9 <discord9@163.com>

per review

Signed-off-by: discord9 <discord9@163.com>

test: update mock test

Signed-off-by: discord9 <discord9@163.com>

refactor: rm unused

Signed-off-by: discord9 <discord9@163.com>

refactor: invert related_regions

Signed-off-by: discord9 <discord9@163.com>

clippy

Signed-off-by: discord9 <discord9@163.com>

pcr

Signed-off-by: discord9 <discord9@163.com>

chore: remove unused

Signed-off-by: discord9 <discord9@163.com>

fix: after invert fix

Signed-off-by: discord9 <discord9@163.com>

chore: rm unused

Signed-off-by: discord9 <discord9@163.com>

refactor: eff

Signed-off-by: discord9 <discord9@163.com>

docs: chore

Signed-off-by: discord9 <discord9@163.com>

* after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* fix: mssing region

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-01-12 08:28:34 +00:00
Ruihang Xia
45b4067721 feat: always canonicalize partition expr (#7553)
* feat: always canonicalize partition expr

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix ut assertion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-12 07:24:29 +00:00
33 changed files with 2077 additions and 833 deletions

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::time::Duration;
@@ -432,11 +432,11 @@ where
pub struct GetFileRefs {
/// List of region IDs to get file references from active FileHandles (in-memory).
pub query_regions: Vec<RegionId>,
/// Mapping from the source region ID (where to read the manifest) to
/// the target region IDs (whose file references to look for).
/// Key: The region ID of the manifest.
/// Value: The list of region IDs to find references for in that manifest.
pub related_regions: HashMap<RegionId, Vec<RegionId>>,
/// Mapping from the src region IDs (whose file references to look for) to
/// the dst region IDs (where to read the manifests).
/// Key: The source region IDs (where files originally came from).
/// Value: The set of destination region IDs (whose manifests need to be read).
pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
}
impl Display for GetFileRefs {

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(discord9): remove this once gc scheduler is fully merged
#![allow(unused)]
use std::collections::{HashMap, HashSet};
use common_meta::peer::Peer;
@@ -29,6 +26,7 @@ mod options;
mod procedure;
mod scheduler;
mod tracker;
mod util;
pub use options::GcSchedulerOptions;
pub use procedure::BatchGcProcedure;

View File

@@ -12,29 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::datanode::RegionStat;
use common_meta::instruction::{
GcRegions, GetFileRefs, GetFileRefsReply, Instruction, InstructionReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
use common_telemetry::{debug, error, warn};
use common_telemetry::debug;
use snafu::{OptionExt as _, ResultExt as _};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use store_api::storage::{GcReport, RegionId};
use table::metadata::TableId;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
use crate::gc::Region2Peers;
use crate::gc::procedure::{BatchGcProcedure, GcRegionProcedure};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
use crate::error::{self, Result, TableMetadataManagerSnafu};
use crate::gc::procedure::BatchGcProcedure;
use crate::service::mailbox::MailboxRef;
#[async_trait::async_trait]
pub(crate) trait SchedulerCtx: Send + Sync {
@@ -45,19 +38,9 @@ pub(crate) trait SchedulerCtx: Send + Sync {
table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)>;
async fn get_file_references(
&self,
query_regions: &[RegionId],
related_regions: HashMap<RegionId, Vec<RegionId>>,
region_routes: &Region2Peers,
timeout: Duration,
) -> Result<FileRefsManifest>;
async fn gc_regions(
&self,
peer: Peer,
region_ids: &[RegionId],
file_refs_manifest: &FileRefsManifest,
full_file_listing: bool,
timeout: Duration,
) -> Result<GcReport>;
@@ -100,7 +83,7 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
for (_dn_id, stats) in dn_stats {
let mut stats = stats.stats;
let stats = stats.stats;
let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned() else {
continue;
@@ -129,142 +112,34 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
async fn gc_regions(
&self,
peer: Peer,
region_ids: &[RegionId],
file_refs_manifest: &FileRefsManifest,
full_file_listing: bool,
timeout: Duration,
) -> Result<GcReport> {
self.gc_regions_inner(
peer,
region_ids,
file_refs_manifest,
full_file_listing,
timeout,
)
.await
}
async fn get_file_references(
&self,
query_regions: &[RegionId],
related_regions: HashMap<RegionId, Vec<RegionId>>,
region_routes: &Region2Peers,
timeout: Duration,
) -> Result<FileRefsManifest> {
debug!(
"Getting file references for {} regions",
query_regions.len()
);
// Group regions by datanode to minimize RPC calls
let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
for region_id in query_regions {
if let Some((leader, followers)) = region_routes.get(region_id) {
datanode2query_regions
.entry(leader.clone())
.or_default()
.push(*region_id);
// also need to send for follower regions for file refs in case query is running on follower
for follower in followers {
datanode2query_regions
.entry(follower.clone())
.or_default()
.push(*region_id);
}
} else {
return error::UnexpectedSnafu {
violated: format!(
"region_routes: {region_routes:?} does not contain region_id: {region_id}",
),
}
.fail();
}
}
let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
HashMap::new();
for (related_region, queries) in related_regions {
if let Some((leader, followers)) = region_routes.get(&related_region) {
datanode2related_regions
.entry(leader.clone())
.or_default()
.insert(related_region, queries.clone());
} // since read from manifest, no need to send to followers
}
// Send GetFileRefs instructions to each datanode
let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
let mut all_manifest_versions = HashMap::new();
for (peer, regions) in datanode2query_regions {
let related_regions = datanode2related_regions.remove(&peer).unwrap_or_default();
match self
.send_get_file_refs_instruction(&peer, &regions, related_regions, timeout)
.await
{
Ok(manifest) => {
// TODO(discord9): if other regions provide file refs for one region on other datanode, and no version,
// is it correct to merge manifest_version directly?
// FIXME: follower region how to merge version???
for (region_id, file_refs) in manifest.file_refs {
all_file_refs
.entry(region_id)
.or_default()
.extend(file_refs);
}
// region manifest version should be the smallest one among all peers, so outdated region can be detected
for (region_id, version) in manifest.manifest_version {
let entry = all_manifest_versions.entry(region_id).or_insert(version);
*entry = (*entry).min(version);
}
}
Err(e) => {
warn!(
"Failed to get file refs from datanode {}: {}. Skipping regions on this datanode.",
peer, e
);
// Continue processing other datanodes instead of failing the entire operation
continue;
}
}
}
Ok(FileRefsManifest {
file_refs: all_file_refs,
manifest_version: all_manifest_versions,
})
self.gc_regions_inner(region_ids, full_file_listing, timeout)
.await
}
}
impl DefaultGcSchedulerCtx {
async fn gc_regions_inner(
&self,
peer: Peer,
region_ids: &[RegionId],
file_refs_manifest: &FileRefsManifest,
full_file_listing: bool,
timeout: Duration,
) -> Result<GcReport> {
debug!(
"Sending GC instruction to datanode {} for {} regions (full_file_listing: {})",
peer,
"Sending GC instruction for {} regions (full_file_listing: {})",
region_ids.len(),
full_file_listing
);
let gc_regions = GcRegions {
regions: region_ids.to_vec(),
file_refs_manifest: file_refs_manifest.clone(),
full_file_listing,
};
let procedure = GcRegionProcedure::new(
let procedure = BatchGcProcedure::new(
self.mailbox.clone(),
self.table_metadata_manager.clone(),
self.server_addr.clone(),
peer,
gc_regions,
format!("GC for {} regions", region_ids.len()),
region_ids.to_vec(),
full_file_listing,
timeout,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -285,96 +160,8 @@ impl DefaultGcSchedulerCtx {
),
})?;
let gc_report = GcRegionProcedure::cast_result(res)?;
let gc_report = BatchGcProcedure::cast_result(res)?;
Ok(gc_report)
}
/// TODO(discord9): add support to read manifest of related regions for file refs too
/// (now it's only reading active FileHandles)
async fn send_get_file_refs_instruction(
&self,
peer: &Peer,
query_regions: &[RegionId],
related_regions: HashMap<RegionId, Vec<RegionId>>,
timeout: Duration,
) -> Result<FileRefsManifest> {
debug!(
"Sending GetFileRefs instruction to datanode {} for {} regions",
peer,
query_regions.len()
);
let instruction = Instruction::GetFileRefs(GetFileRefs {
query_regions: query_regions.to_vec(),
related_regions,
});
let reply = self
.send_instruction(peer, instruction, "Get file references", timeout)
.await?;
let InstructionReply::GetFileRefs(GetFileRefsReply {
file_refs_manifest,
success,
error,
}) = reply
else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: format!("{:?}", reply),
reason: "Unexpected reply of the GetFileRefs instruction",
}
.fail();
};
if !success {
return error::UnexpectedSnafu {
violated: format!(
"Failed to get file references from datanode {}: {:?}",
peer, error
),
}
.fail();
}
Ok(file_refs_manifest)
}
async fn send_instruction(
&self,
peer: &Peer,
instruction: Instruction,
description: &str,
timeout: Duration,
) -> Result<InstructionReply> {
let msg = MailboxMessage::json_message(
&format!("{}: {}", description, instruction),
&format!("Metasrv@{}", self.server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let mailbox_rx = self
.mailbox
.send(&Channel::Datanode(peer.id), msg, timeout)
.await?;
match mailbox_rx.await {
Ok(reply_msg) => {
let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
Ok(reply)
}
Err(e) => {
error!(
"Failed to receive reply from datanode {} for {}: {}",
peer, description, e
);
Err(e)
}
}
}
}

View File

@@ -15,24 +15,17 @@
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use itertools::Itertools;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use store_api::storage::{GcReport, RegionId};
use table::metadata::TableId;
use tokio::time::sleep;
use crate::error::Result;
use crate::gc::candidate::GcCandidate;
use crate::gc::scheduler::{GcJobReport, GcScheduler};
use crate::gc::tracker::RegionGcInfo;
use crate::region;
pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;
impl GcScheduler {
/// Iterate through all region stats, find region that might need gc, and send gc instruction to
@@ -61,6 +54,8 @@ impl GcScheduler {
.aggregate_candidates_by_datanode(per_table_candidates)
.await?;
// TODO(discord9): add deleted regions from repartition mapping
if datanode_to_candidates.is_empty() {
info!("No valid datanode candidates found, skipping GC cycle");
return Ok(Default::default());
@@ -83,17 +78,6 @@ impl GcScheduler {
Ok(report)
}
/// Find related regions that might share files with the candidate regions.
/// Currently returns the same regions since repartition is not implemented yet.
/// TODO(discord9): When repartition is implemented, this should also find src/dst regions
/// that might share files with the candidate regions.
pub(crate) async fn find_related_regions(
&self,
candidate_region_ids: &[RegionId],
) -> Result<HashMap<RegionId, Vec<RegionId>>> {
Ok(candidate_region_ids.iter().map(|&r| (r, vec![r])).collect())
}
/// Aggregate GC candidates by their corresponding datanode peer.
pub(crate) async fn aggregate_candidates_by_datanode(
&self,
@@ -210,28 +194,11 @@ impl GcScheduler {
let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();
let all_related_regions = self.find_related_regions(&all_region_ids).await?;
let (region_to_peer, _) = self
.discover_datanodes_for_regions(&all_related_regions.keys().cloned().collect_vec())
.await?;
// Step 1: Get file references for all regions on this datanode
let file_refs_manifest = self
.ctx
.get_file_references(
&all_region_ids,
all_related_regions,
&region_to_peer,
self.config.mailbox_timeout,
)
.await?;
// Step 2: Create a single GcRegionProcedure for all regions on this datanode
// Step 2: Run GC for all regions on this datanode in a single batch
let (gc_report, fully_listed_regions) = {
// Partition regions into full listing and fast listing in a single pass
let mut batch_full_listing_decisions =
let batch_full_listing_decisions =
self.batch_should_use_full_listing(&all_region_ids).await;
let need_full_list_regions = batch_full_listing_decisions
@@ -242,7 +209,7 @@ impl GcScheduler {
},
)
.collect_vec();
let mut fast_list_regions = batch_full_listing_decisions
let fast_list_regions = batch_full_listing_decisions
.iter()
.filter_map(
|(&region_id, &need_full)| {
@@ -257,13 +224,7 @@ impl GcScheduler {
if !fast_list_regions.is_empty() {
match self
.ctx
.gc_regions(
peer.clone(),
&fast_list_regions,
&file_refs_manifest,
false,
self.config.mailbox_timeout,
)
.gc_regions(&fast_list_regions, false, self.config.mailbox_timeout)
.await
{
Ok(report) => combined_report.merge(report),
@@ -284,13 +245,7 @@ impl GcScheduler {
if !need_full_list_regions.is_empty() {
match self
.ctx
.gc_regions(
peer.clone(),
&need_full_list_regions,
&file_refs_manifest,
true,
self.config.mailbox_timeout,
)
.gc_regions(&need_full_list_regions, true, self.config.mailbox_timeout)
.await
{
Ok(report) => combined_report.merge(report),
@@ -330,98 +285,6 @@ impl GcScheduler {
Ok(gc_report)
}
/// Discover datanodes for the given regions(and it's related regions) by fetching table routes in batches.
/// Returns mappings from region to peer(leader, Vec<followers>) and peer to regions.
async fn discover_datanodes_for_regions(
&self,
regions: &[RegionId],
) -> Result<(Region2Peers, Peer2Regions)> {
let all_related_regions = self
.find_related_regions(regions)
.await?
.into_iter()
.flat_map(|(k, mut v)| {
v.push(k);
v
})
.collect_vec();
let mut region_to_peer = HashMap::new();
let mut peer_to_regions = HashMap::new();
// Group regions by table ID for batch processing
let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
for region_id in all_related_regions {
let table_id = region_id.table_id();
table_to_regions
.entry(table_id)
.or_default()
.push(region_id);
}
// Process each table's regions together for efficiency
for (table_id, table_regions) in table_to_regions {
match self.ctx.get_table_route(table_id).await {
Ok((_phy_table_id, table_route)) => {
self.get_table_regions_peer(
&table_route,
&table_regions,
&mut region_to_peer,
&mut peer_to_regions,
);
}
Err(e) => {
// Continue with other tables instead of failing completely
// TODO(discord9): consider failing here instead
warn!(
"Failed to get table route for table {}: {}, skipping its regions",
table_id, e
);
continue;
}
}
}
Ok((region_to_peer, peer_to_regions))
}
/// Process regions for a single table to find their current leader peers.
fn get_table_regions_peer(
&self,
table_route: &PhysicalTableRouteValue,
table_regions: &[RegionId],
region_to_peer: &mut Region2Peers,
peer_to_regions: &mut Peer2Regions,
) {
for &region_id in table_regions {
let mut found = false;
// Find the region in the table route
for region_route in &table_route.region_routes {
if region_route.region.id == region_id
&& let Some(leader_peer) = &region_route.leader_peer
{
region_to_peer.insert(
region_id,
(leader_peer.clone(), region_route.follower_peers.clone()),
);
peer_to_regions
.entry(leader_peer.clone())
.or_default()
.insert(region_id);
found = true;
break;
}
}
if !found {
warn!(
"Failed to find region {} in table route or no leader peer found",
region_id,
);
}
}
}
async fn batch_should_use_full_listing(
&self,
region_ids: &[RegionId],

View File

@@ -36,10 +36,9 @@ use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;
use tokio::sync::mpsc::Sender;
use crate::error::{Result, UnexpectedSnafu};
use crate::error::Result;
use crate::gc::candidate::GcCandidate;
use crate::gc::ctx::SchedulerCtx;
use crate::gc::handler::Region2Peers;
use crate::gc::options::GcSchedulerOptions;
use crate::gc::scheduler::{Event, GcScheduler};
@@ -67,12 +66,10 @@ pub struct MockSchedulerCtx {
pub gc_reports: Arc<Mutex<HashMap<RegionId, GcReport>>>,
pub candidates: Arc<Mutex<Option<HashMap<TableId, Vec<GcCandidate>>>>>,
pub get_table_to_region_stats_calls: Arc<Mutex<usize>>,
pub get_file_references_calls: Arc<Mutex<usize>>,
pub gc_regions_calls: Arc<Mutex<usize>>,
// Error injection fields for testing
pub get_table_to_region_stats_error: Arc<Mutex<Option<crate::error::Error>>>,
pub get_table_route_error: Arc<Mutex<Option<crate::error::Error>>>,
pub get_file_references_error: Arc<Mutex<Option<crate::error::Error>>>,
pub gc_regions_error: Arc<Mutex<Option<crate::error::Error>>>,
// Retry testing fields
pub gc_regions_retry_count: Arc<Mutex<HashMap<RegionId, usize>>>,
@@ -119,57 +116,12 @@ impl MockSchedulerCtx {
*self.get_table_route_error.lock().unwrap() = Some(error);
}
/// Set an error to be returned by `get_file_references`
#[allow(dead_code)]
pub fn with_get_file_references_error(self, error: crate::error::Error) -> Self {
*self.get_file_references_error.lock().unwrap() = Some(error);
self
}
/// Set an error to be returned by `gc_regions`
pub fn with_gc_regions_error(self, error: crate::error::Error) -> Self {
*self.gc_regions_error.lock().unwrap() = Some(error);
self
}
/// Set a sequence of errors to be returned by `gc_regions` for retry testing
pub fn set_gc_regions_error_sequence(&self, errors: Vec<crate::error::Error>) {
*self.gc_regions_error_sequence.lock().unwrap() = errors;
}
/// Set success after a specific number of retries for a region
pub fn set_gc_regions_success_after_retries(&self, region_id: RegionId, retries: usize) {
self.gc_regions_success_after_retries
.lock()
.unwrap()
.insert(region_id, retries);
}
/// Get the retry count for a specific region
pub fn get_retry_count(&self, region_id: RegionId) -> usize {
self.gc_regions_retry_count
.lock()
.unwrap()
.get(&region_id)
.copied()
.unwrap_or(0)
}
/// Reset all retry tracking
pub fn reset_retry_tracking(&self) {
*self.gc_regions_retry_count.lock().unwrap() = HashMap::new();
*self.gc_regions_error_sequence.lock().unwrap() = Vec::new();
*self.gc_regions_success_after_retries.lock().unwrap() = HashMap::new();
}
/// Set an error to be returned for a specific region
pub fn set_gc_regions_error_for_region(&self, region_id: RegionId, error: crate::error::Error) {
self.gc_regions_per_region_errors
.lock()
.unwrap()
.insert(region_id, error);
}
/// Clear per-region errors
#[allow(unused)]
pub fn clear_gc_regions_per_region_errors(&self) {
@@ -213,39 +165,9 @@ impl SchedulerCtx for MockSchedulerCtx {
.unwrap_or_else(|| (table_id, PhysicalTableRouteValue::default())))
}
async fn get_file_references(
&self,
query_regions: &[RegionId],
_related_regions: HashMap<RegionId, Vec<RegionId>>,
region_to_peer: &Region2Peers,
_timeout: Duration,
) -> Result<FileRefsManifest> {
*self.get_file_references_calls.lock().unwrap() += 1;
// Check if we should return an injected error
if let Some(error) = self.get_file_references_error.lock().unwrap().take() {
return Err(error);
}
if query_regions
.iter()
.any(|region_id| !region_to_peer.contains_key(region_id))
{
UnexpectedSnafu {
violated: format!(
"region_to_peer{region_to_peer:?} does not contain all region_ids requested: {:?}",
query_regions
),
}.fail()?;
}
Ok(self.file_refs.lock().unwrap().clone().unwrap_or_default())
}
async fn gc_regions(
&self,
_peer: Peer,
region_ids: &[RegionId],
_file_refs_manifest: &FileRefsManifest,
_full_file_listing: bool,
_timeout: Duration,
) -> Result<GcReport> {

View File

@@ -152,7 +152,6 @@ async fn test_handle_tick() {
);
assert_eq!(*ctx.get_table_to_region_stats_calls.lock().unwrap(), 1);
assert_eq!(*ctx.get_file_references_calls.lock().unwrap(), 1);
assert_eq!(*ctx.gc_regions_calls.lock().unwrap(), 1);
let tracker = scheduler.region_gc_tracker.lock().await;

View File

@@ -64,6 +64,7 @@ async fn test_gc_regions_failure_handling() {
region_id,
HashSet::from([FileRef::new(region_id, FileId::random(), None)]),
)]),
cross_region_refs: HashMap::new(),
};
let ctx = Arc::new(
@@ -121,10 +122,6 @@ async fn test_gc_regions_failure_handling() {
1,
"Expected 1 call to get_table_to_region_stats"
);
assert!(
*ctx.get_file_references_calls.lock().unwrap() >= 1,
"Expected at least 1 call to get_file_references"
);
assert!(
*ctx.gc_regions_calls.lock().unwrap() >= 1,
"Expected at least 1 call to gc_regions"
@@ -206,13 +203,6 @@ async fn test_get_file_references_failure() {
datanode_report.deleted_files[&region_id].is_empty(),
"Should have empty deleted files due to file refs failure"
);
// Should still attempt to get file references (may be called multiple times due to retry logic)
assert!(
*ctx.get_file_references_calls.lock().unwrap() >= 1,
"Expected at least 1 call to get_file_references, got {}",
*ctx.get_file_references_calls.lock().unwrap()
);
}
#[tokio::test]
@@ -255,42 +245,22 @@ async fn test_get_table_route_failure() {
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
// Get candidates first
let stats = &ctx
.table_to_region_stats
.lock()
.unwrap()
.clone()
.unwrap_or_default();
let candidates = scheduler.select_gc_candidates(stats).await.unwrap();
// Test the full workflow to trigger table route failure during aggregation
// The table route failure should cause the entire GC cycle to fail
let result = scheduler.handle_tick().await;
// Convert table-based candidates to datanode-based candidates
let datanode_to_candidates = HashMap::from([(
Peer::new(1, ""),
candidates
.into_iter()
.flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c)))
.collect(),
)]);
// This should handle table route failure gracefully
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.await;
// Should process the datanode but handle route error gracefully
assert_eq!(
report.per_datanode_reports.len(),
0,
"Expected 0 datanode report"
);
assert_eq!(
report.failed_datanodes.len(),
1,
"Expected 1 failed datanodes (route error handled gracefully)"
);
// The table route failure should be propagated as an error
assert!(
report.failed_datanodes.contains_key(&1),
"Failed datanodes should contain the datanode with route error"
result.is_err(),
"Expected table route failure to propagate as error"
);
// Verify the error message contains our simulated failure
let error = result.unwrap_err();
let error_msg = format!("{}", error);
assert!(
error_msg.contains("Simulated table route failure for testing"),
"Error message should contain our simulated failure: {}",
error_msg
);
}

View File

@@ -123,11 +123,6 @@ async fn test_full_gc_workflow() {
1,
"Expected 1 call to get_table_to_region_stats"
);
assert_eq!(
*ctx.get_file_references_calls.lock().unwrap(),
1,
"Expected 1 call to get_file_references"
);
assert_eq!(
*ctx.gc_regions_calls.lock().unwrap(),
1,

View File

@@ -19,6 +19,8 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::lock_key::RegionLock;
use common_meta::peer::Peer;
use common_procedure::error::ToJsonSnafu;
@@ -26,14 +28,16 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{error, info, warn};
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;
use crate::error::{self, Result, SerializeToJsonSnafu};
use crate::gc::Region2Peers;
use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
use crate::gc::util::table_route_to_region;
use crate::gc::{Peer2Regions, Region2Peers};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
@@ -146,56 +150,73 @@ async fn send_gc_regions(
}
}
/// TODO(discord9): another procedure which do both get file refs and gc regions.
pub struct GcRegionProcedure {
/// Procedure to perform get file refs then batch GC for multiple regions,
/// it holds locks for all regions during the whole procedure.
pub struct BatchGcProcedure {
mailbox: MailboxRef,
data: GcRegionData,
table_metadata_manager: TableMetadataManagerRef,
data: BatchGcData,
}
#[derive(Serialize, Deserialize)]
pub struct GcRegionData {
pub struct BatchGcData {
state: State,
/// Meta server address
server_addr: String,
peer: Peer,
gc_regions: GcRegions,
description: String,
/// The regions to be GC-ed
regions: Vec<RegionId>,
full_file_listing: bool,
region_routes: Region2Peers,
/// Related regions (e.g., for shared files after repartition).
/// The source regions (where those files originally came from) are used as the key, and the destination regions (where files are currently stored) are used as the value.
related_regions: HashMap<RegionId, HashSet<RegionId>>,
/// Acquired file references (Populated in Acquiring state)
file_refs: FileRefsManifest,
/// mailbox timeout duration
timeout: Duration,
gc_report: Option<GcReport>,
}
impl GcRegionProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::GcRegionProcedure";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
/// Initial state
Start,
/// Fetching file references from datanodes
Acquiring,
/// Sending GC instruction to the target datanode
Gcing,
/// Updating region repartition info in kvbackend after GC based on the GC result
UpdateRepartition,
}
impl BatchGcProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
pub fn new(
mailbox: MailboxRef,
table_metadata_manager: TableMetadataManagerRef,
server_addr: String,
peer: Peer,
gc_regions: GcRegions,
description: String,
regions: Vec<RegionId>,
full_file_listing: bool,
timeout: Duration,
) -> Self {
Self {
mailbox,
data: GcRegionData {
peer,
table_metadata_manager,
data: BatchGcData {
state: State::Start,
server_addr,
gc_regions,
description,
regions,
full_file_listing,
timeout,
region_routes: HashMap::new(),
related_regions: HashMap::new(),
file_refs: FileRefsManifest::default(),
gc_report: None,
},
}
}
async fn send_gc_instr(&self) -> Result<GcReport> {
send_gc_regions(
&self.mailbox,
&self.data.peer,
self.data.gc_regions.clone(),
&self.data.server_addr,
self.data.timeout,
&self.data.description,
)
.await
}
pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
error::UnexpectedSnafu {
@@ -207,111 +228,129 @@ impl GcRegionProcedure {
.build()
})
}
}
#[async_trait::async_trait]
impl Procedure for GcRegionProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
// Send GC instruction to the datanode. This procedure only handle lock&send, results or other kind of
// errors will be reported back via the oneshot channel.
let reply = self
.send_gc_instr()
async fn get_table_route(
&self,
table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)> {
self.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await
.map_err(ProcedureError::external)?;
Ok(Status::done_with_output(reply))
.context(TableMetadataManagerSnafu)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
/// Read lock all regions involved in this GC procedure.
/// So i.e. region migration won't happen during GC and cause race conditions.
///
/// only read lock the regions not catatlog/schema because it can run concurrently with other procedures(i.e. drop database/table)
/// TODO:(discord9): integration test to verify this
fn lock_key(&self) -> LockKey {
let lock_key: Vec<_> = self
.data
.gc_regions
.regions
.iter()
.sorted() // sort to have a deterministic lock order
.map(|id| RegionLock::Read(*id).into())
.collect();
LockKey::new(lock_key)
}
}
/// Procedure to perform get file refs then batch GC for multiple regions, should only be used by admin function
/// for triggering manual gc, as it holds locks for too long and for all regions during the procedure.
pub struct BatchGcProcedure {
mailbox: MailboxRef,
data: BatchGcData,
}
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
state: State,
server_addr: String,
/// The regions to be GC-ed
regions: Vec<RegionId>,
full_file_listing: bool,
region_routes: Region2Peers,
/// Related regions (e.g., for shared files). Map: RegionId -> List of related RegionIds.
related_regions: HashMap<RegionId, Vec<RegionId>>,
/// Acquired file references (Populated in Acquiring state)
file_refs: FileRefsManifest,
/// mailbox timeout duration
timeout: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
/// Initial state
Start,
/// Fetching file references from datanodes
Acquiring,
/// Sending GC instruction to the target datanode
Gcing,
}
impl BatchGcProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
pub fn new(
mailbox: MailboxRef,
server_addr: String,
regions: Vec<RegionId>,
full_file_listing: bool,
region_routes: Region2Peers,
related_regions: HashMap<RegionId, Vec<RegionId>>,
timeout: Duration,
) -> Self {
Self {
mailbox,
data: BatchGcData {
state: State::Start,
server_addr,
regions,
full_file_listing,
region_routes,
related_regions,
file_refs: FileRefsManifest::default(),
timeout,
},
/// Return related regions for the given regions.
/// The returned map uses the source regions (where those files originally came from) as the key,
/// and the destination regions (where files are currently stored) as the value.
/// If a region is not found in the repartition manager, the returned map still have this region as key,
/// just empty value
async fn find_related_regions(
&self,
regions: &[RegionId],
) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
let repart_mgr = self.table_metadata_manager.table_repart_manager();
let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
for src_region in regions {
// TODO(discord9): batch get
if let Some(dst_regions) = repart_mgr
.get_dst_regions(*src_region)
.await
.context(KvBackendSnafu)?
{
related_regions.insert(*src_region, dst_regions.into_iter().collect());
} else {
related_regions.insert(*src_region, Default::default());
}
}
Ok(related_regions)
}
/// Clean up region repartition info in kvbackend after GC
/// according to cross reference in `FileRefsManifest`.
async fn cleanup_region_repartition(&self) -> Result<()> {
for (src_region, dst_regions) in self.data.file_refs.cross_region_refs.iter() {
// TODO(discord9): batch update
self.table_metadata_manager
.table_repart_manager()
.update_mappings(*src_region, &dst_regions.iter().cloned().collect_vec())
.await
.context(KvBackendSnafu)?;
}
Ok(())
}
/// Discover region routes for the given regions.
async fn discover_route_for_regions(
&self,
regions: &[RegionId],
) -> Result<(Region2Peers, Peer2Regions)> {
let mut region_to_peer = HashMap::new();
let mut peer_to_regions = HashMap::new();
// Group regions by table ID for batch processing
let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
for region_id in regions {
let table_id = region_id.table_id();
table_to_regions
.entry(table_id)
.or_default()
.push(*region_id);
}
// Process each table's regions together for efficiency
for (table_id, table_regions) in table_to_regions {
match self.get_table_route(table_id).await {
Ok((_phy_table_id, table_route)) => {
table_route_to_region(
&table_route,
&table_regions,
&mut region_to_peer,
&mut peer_to_regions,
);
}
Err(e) => {
// Continue with other tables instead of failing completely
// TODO(discord9): consider failing here instead
warn!(
"Failed to get table route for table {}: {}, skipping its regions",
table_id, e
);
continue;
}
}
}
Ok((region_to_peer, peer_to_regions))
}
/// Set region routes and related regions for GC procedure
async fn set_routes_and_related_regions(&mut self) -> Result<()> {
let related_regions = self.find_related_regions(&self.data.regions).await?;
self.data.related_regions = related_regions.clone();
// Discover routes for all regions involved in GC, including both the
// primary GC regions and their related regions.
let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
regions_set.extend(related_regions.keys().cloned());
regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
let regions_to_discover = regions_set.into_iter().collect_vec();
let (region_to_peer, _) = self
.discover_route_for_regions(&regions_to_discover)
.await?;
self.data.region_routes = region_to_peer;
Ok(())
}
/// Get file references from all datanodes that host the regions
async fn get_file_references(&self) -> Result<FileRefsManifest> {
use std::collections::{HashMap, HashSet};
async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
self.set_routes_and_related_regions().await?;
let query_regions = &self.data.regions;
let related_regions = &self.data.related_regions;
@@ -344,20 +383,25 @@ impl BatchGcProcedure {
}
}
let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
HashMap::new();
for (related_region, queries) in related_regions {
if let Some((leader, _followers)) = region_routes.get(related_region) {
datanode2related_regions
.entry(leader.clone())
.or_default()
.insert(*related_region, queries.clone());
} // since read from manifest, no need to send to followers
for (src_region, dst_regions) in related_regions {
for dst_region in dst_regions {
if let Some((leader, _followers)) = region_routes.get(dst_region) {
datanode2related_regions
.entry(leader.clone())
.or_default()
.entry(*src_region)
.or_default()
.insert(*dst_region);
} // since read from manifest, no need to send to followers
}
}
// Send GetFileRefs instructions to each datanode
let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
let mut all_manifest_versions = HashMap::new();
let mut all_cross_region_refs = HashMap::new();
for (peer, regions) in datanode2query_regions {
let related_regions_for_peer =
@@ -400,17 +444,25 @@ impl BatchGcProcedure {
let entry = all_manifest_versions.entry(region_id).or_insert(version);
*entry = (*entry).min(version);
}
for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
let entry = all_cross_region_refs
.entry(region_id)
.or_insert_with(HashSet::new);
entry.extend(related_region_ids);
}
}
Ok(FileRefsManifest {
file_refs: all_file_refs,
manifest_version: all_manifest_versions,
cross_region_refs: all_cross_region_refs,
})
}
/// Send GC instruction to all datanodes that host the regions,
/// returns regions that need retry.
async fn send_gc_instructions(&self) -> Result<Vec<RegionId>> {
async fn send_gc_instructions(&self) -> Result<GcReport> {
let regions = &self.data.regions;
let region_routes = &self.data.region_routes;
let file_refs = &self.data.file_refs;
@@ -418,6 +470,7 @@ impl BatchGcProcedure {
// Group regions by datanode
let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
let mut all_report = GcReport::default();
for region_id in regions {
if let Some((leader, _followers)) = region_routes.get(region_id) {
@@ -469,10 +522,15 @@ impl BatchGcProcedure {
peer, success, need_retry
);
}
all_need_retry.extend(report.need_retry_regions);
all_need_retry.extend(report.need_retry_regions.clone());
all_report.merge(report);
}
Ok(all_need_retry.into_iter().collect())
if !all_need_retry.is_empty() {
warn!("Regions need retry after batch GC: {:?}", all_need_retry);
}
Ok(all_report)
}
}
@@ -507,12 +565,10 @@ impl Procedure for BatchGcProcedure {
// Send GC instructions to all datanodes
// TODO(discord9): handle need-retry regions
match self.send_gc_instructions().await {
Ok(_) => {
info!(
"Batch GC completed successfully for regions {:?}",
self.data.regions
);
Ok(Status::done())
Ok(report) => {
self.data.state = State::UpdateRepartition;
self.data.gc_report = Some(report);
Ok(Status::executing(false))
}
Err(e) => {
error!("Failed to send GC instructions: {}", e);
@@ -520,6 +576,29 @@ impl Procedure for BatchGcProcedure {
}
}
}
State::UpdateRepartition => match self.cleanup_region_repartition().await {
Ok(()) => {
info!(
"Cleanup region repartition info completed successfully for regions {:?}",
self.data.regions
);
info!(
"Batch GC completed successfully for regions {:?}",
self.data.regions
);
let Some(report) = self.data.gc_report.take() else {
return common_procedure::error::UnexpectedSnafu {
err_msg: "GC report should be present after GC completion".to_string(),
}
.fail();
};
Ok(Status::done_with_output(report))
}
Err(e) => {
error!("Failed to cleanup region repartition info: {}", e);
Err(ProcedureError::external(e))
}
},
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
@@ -38,29 +38,6 @@ pub struct GcJobReport {
pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
}
impl GcJobReport {
pub fn merge(&mut self, mut other: GcJobReport) {
// merge per_datanode_reports&failed_datanodes
for (dn_id, report) in other.per_datanode_reports {
let self_report = self.per_datanode_reports.entry(dn_id).or_default();
self_report.merge(report);
}
let all_failed_dn_ids = self
.failed_datanodes
.keys()
.cloned()
.chain(other.failed_datanodes.keys().cloned())
.collect::<HashSet<_>>();
for dn_id in all_failed_dn_ids {
let entry = self.failed_datanodes.entry(dn_id).or_default();
if let Some(other_errors) = other.failed_datanodes.remove(&dn_id) {
entry.extend(other_errors);
}
}
self.failed_datanodes
.retain(|dn_id, _| !self.per_datanode_reports.contains_key(dn_id));
}
}
/// [`Event`] represents various types of events that can be processed by the gc ticker.
///

View File

@@ -30,15 +30,6 @@ pub(crate) struct RegionGcInfo {
pub(crate) last_full_listing_time: Option<Instant>,
}
impl RegionGcInfo {
pub(crate) fn new(last_gc_time: Instant) -> Self {
Self {
last_gc_time,
last_full_listing_time: None,
}
}
}
/// Tracks the last GC time for regions to implement cooldown.
pub(crate) type RegionGcTracker = HashMap<RegionId, RegionGcInfo>;
@@ -46,7 +37,7 @@ impl GcScheduler {
/// Clean up stale entries from the region GC tracker if enough time has passed.
/// This removes entries for regions that no longer exist in the current table routes.
pub(crate) async fn cleanup_tracker_if_needed(&self) -> Result<()> {
let mut last_cleanup = *self.last_tracker_cleanup.lock().await;
let last_cleanup = *self.last_tracker_cleanup.lock().await;
let now = Instant::now();
// Check if enough time has passed since last cleanup
@@ -85,25 +76,6 @@ impl GcScheduler {
Ok(())
}
/// Determine if full file listing should be used for a region based on the last full listing time.
pub(crate) async fn should_use_full_listing(&self, region_id: RegionId) -> bool {
let gc_tracker = self.region_gc_tracker.lock().await;
let now = Instant::now();
if let Some(gc_info) = gc_tracker.get(&region_id) {
if let Some(last_full_listing) = gc_info.last_full_listing_time {
let elapsed = now.saturating_duration_since(last_full_listing);
elapsed >= self.config.full_file_listing_interval
} else {
// Never did full listing for this region, do it now
true
}
} else {
// First time GC for this region, do full listing
true
}
}
pub(crate) async fn update_full_listing_time(
&self,
region_id: RegionId,

View File

@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_telemetry::warn;
use store_api::storage::RegionId;
use crate::gc::{Peer2Regions, Region2Peers};
pub fn table_route_to_region(
table_route: &PhysicalTableRouteValue,
table_regions: &[RegionId],
region_to_peer: &mut Region2Peers,
peer_to_regions: &mut Peer2Regions,
) {
for &region_id in table_regions {
let mut found = false;
// Find the region in the table route
for region_route in &table_route.region_routes {
if region_route.region.id == region_id
&& let Some(leader_peer) = &region_route.leader_peer
{
region_to_peer.insert(
region_id,
(leader_peer.clone(), region_route.follower_peers.clone()),
);
peer_to_regions
.entry(leader_peer.clone())
.or_default()
.insert(region_id);
found = true;
break;
}
}
if !found {
warn!(
"Failed to find region {} in table route or no leader peer found",
region_id,
);
}
}
}

View File

@@ -42,6 +42,8 @@ use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelect
use crate::cache::cache_size::parquet_meta_size;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
#[cfg(feature = "vector_index")]
use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
@@ -247,6 +249,16 @@ impl CacheStrategy {
}
}
/// Calls [CacheManager::vector_index_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
#[cfg(feature = "vector_index")]
pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::puffin_metadata_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
@@ -303,6 +315,9 @@ pub struct CacheManager {
inverted_index_cache: Option<InvertedIndexCacheRef>,
/// Cache for bloom filter index.
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
/// Cache for vector index.
#[cfg(feature = "vector_index")]
vector_index_cache: Option<VectorIndexCacheRef>,
/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Cache for time series selectors.
@@ -434,6 +449,11 @@ impl CacheManager {
cache.invalidate_file(file_id.file_id());
}
#[cfg(feature = "vector_index")]
if let Some(cache) = &self.vector_index_cache {
cache.invalidate_file(file_id.file_id());
}
if let Some(cache) = &self.puffin_metadata_cache {
cache.remove(&file_id.to_string());
}
@@ -486,6 +506,11 @@ impl CacheManager {
self.bloom_filter_index_cache.as_ref()
}
#[cfg(feature = "vector_index")]
pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
self.vector_index_cache.as_ref()
}
pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
self.puffin_metadata_cache.as_ref()
}
@@ -646,6 +671,9 @@ impl CacheManagerBuilder {
self.index_content_size,
self.index_content_page_size,
);
#[cfg(feature = "vector_index")]
let vector_index_cache = (self.index_content_size != 0)
.then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
let index_result_cache = (self.index_result_cache_size != 0)
.then(|| IndexResultCache::new(self.index_result_cache_size));
let puffin_metadata_cache =
@@ -672,6 +700,8 @@ impl CacheManagerBuilder {
write_cache: self.write_cache,
inverted_index_cache: Some(Arc::new(inverted_index_cache)),
bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
#[cfg(feature = "vector_index")]
vector_index_cache,
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
index_result_cache,

View File

@@ -15,6 +15,8 @@
pub mod bloom_filter_index;
pub mod inverted_index;
pub mod result_cache;
#[cfg(feature = "vector_index")]
pub mod vector_index;
use std::future::Future;
use std::hash::Hash;

View File

@@ -0,0 +1,137 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use roaring::RoaringBitmap;
use store_api::storage::{ColumnId, FileId, IndexVersion, VectorDistanceMetric, VectorIndexEngine};
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION};
const VECTOR_INDEX_CACHE_TYPE: &str = "vector_index";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct VectorIndexCacheKey {
file_id: FileId,
index_version: IndexVersion,
column_id: ColumnId,
}
impl VectorIndexCacheKey {
pub fn new(file_id: FileId, index_version: IndexVersion, column_id: ColumnId) -> Self {
Self {
file_id,
index_version,
column_id,
}
}
}
pub struct CachedVectorIndex {
pub engine: Box<dyn VectorIndexEngine>,
pub null_bitmap: RoaringBitmap,
pub size_bytes: usize,
pub dimensions: u32,
pub metric: VectorDistanceMetric,
pub total_rows: u64,
pub indexed_rows: u64,
}
impl CachedVectorIndex {
pub fn new(
engine: Box<dyn VectorIndexEngine>,
null_bitmap: RoaringBitmap,
dimensions: u32,
metric: VectorDistanceMetric,
total_rows: u64,
indexed_rows: u64,
) -> Self {
let size_bytes =
engine.memory_usage() + null_bitmap.serialized_size() + std::mem::size_of::<Self>();
Self {
engine,
null_bitmap,
size_bytes,
dimensions,
metric,
total_rows,
indexed_rows,
}
}
}
impl std::fmt::Debug for CachedVectorIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CachedVectorIndex")
.field("size_bytes", &self.size_bytes)
.field("dimensions", &self.dimensions)
.field("metric", &self.metric)
.field("total_rows", &self.total_rows)
.field("indexed_rows", &self.indexed_rows)
.field("null_bitmap_len", &self.null_bitmap.len())
.finish()
}
}
pub struct VectorIndexCache {
inner: Cache<VectorIndexCacheKey, Arc<CachedVectorIndex>>,
}
pub type VectorIndexCacheRef = Arc<VectorIndexCache>;
impl VectorIndexCache {
pub fn new(capacity: u64) -> Self {
fn to_str(cause: RemovalCause) -> &'static str {
match cause {
RemovalCause::Expired => "expired",
RemovalCause::Explicit => "explicit",
RemovalCause::Replaced => "replaced",
RemovalCause::Size => "size",
}
}
let inner = Cache::builder()
.max_capacity(capacity)
.weigher(|_k, v: &Arc<CachedVectorIndex>| v.size_bytes as u32)
.eviction_listener(|_k, v, cause| {
CACHE_BYTES
.with_label_values(&[VECTOR_INDEX_CACHE_TYPE])
.sub(v.size_bytes as i64);
CACHE_EVICTION
.with_label_values(&[VECTOR_INDEX_CACHE_TYPE, to_str(cause)])
.inc();
})
.build();
Self { inner }
}
pub fn get(&self, key: &VectorIndexCacheKey) -> Option<Arc<CachedVectorIndex>> {
self.inner.get(key)
}
pub fn insert(&self, key: VectorIndexCacheKey, value: Arc<CachedVectorIndex>) {
CACHE_BYTES
.with_label_values(&[VECTOR_INDEX_CACHE_TYPE])
.add(value.size_bytes as i64);
self.inner.insert(key, value);
}
pub fn invalidate_file(&self, file_id: FileId) {
let _ = self
.inner
.invalidate_entries_if(move |k, _| k.file_id == file_id);
}
}

View File

@@ -81,7 +81,7 @@ mod apply_staging_manifest_test;
mod puffin_index;
use std::any::Any;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
@@ -303,7 +303,7 @@ impl MitoEngine {
pub async fn get_snapshot_of_file_refs(
&self,
file_handle_regions: impl IntoIterator<Item = RegionId>,
manifest_regions: HashMap<RegionId, Vec<RegionId>>,
related_regions: HashMap<RegionId, HashSet<RegionId>>,
) -> Result<FileRefsManifest> {
let file_ref_mgr = self.file_ref_manager();
@@ -315,15 +315,30 @@ impl MitoEngine {
.filter_map(|region_id| self.find_region(region_id))
.collect();
let related_regions: Vec<(MitoRegionRef, Vec<RegionId>)> = manifest_regions
.into_iter()
.filter_map(|(related_region, queries)| {
self.find_region(related_region).map(|r| (r, queries))
})
.collect();
let dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)> = {
let dst2src = related_regions
.into_iter()
.flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
.fold(
HashMap::<RegionId, HashSet<RegionId>>::new(),
|mut acc, (k, v)| {
let entry = acc.entry(k).or_default();
entry.insert(v);
acc
},
);
let mut dst_region_to_src_regions = Vec::with_capacity(dst2src.len());
for (dst_region, srcs) in dst2src {
let Some(dst_region) = self.find_region(dst_region) else {
continue;
};
dst_region_to_src_regions.push((dst_region, srcs));
}
dst_region_to_src_regions
};
file_ref_mgr
.get_snapshot_of_file_refs(query_regions, related_regions)
.get_snapshot_of_file_refs(query_regions, dst_region_to_src_regions)
.await
}

View File

@@ -652,6 +652,14 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "vector_index")]
#[snafu(display("Failed to apply vector index: {}", reason))]
ApplyVectorIndex {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to push index value"))]
PushIndexValue {
source: index::inverted_index::error::Error,
@@ -1324,6 +1332,8 @@ impl ErrorExt for Error {
| PushIndexValue { source, .. }
| ApplyInvertedIndex { source, .. }
| IndexFinish { source, .. } => source.status_code(),
#[cfg(feature = "vector_index")]
ApplyVectorIndex { .. } => StatusCode::Internal,
PuffinReadBlob { source, .. }
| PuffinAddBlob { source, .. }
| PuffinInitStager { source, .. }

View File

@@ -134,6 +134,7 @@ async fn test_gc_worker_basic_truncate() {
let file_ref_manifest = FileRefsManifest {
file_refs: Default::default(),
manifest_version: [(region_id, version)].into(),
cross_region_refs: HashMap::new(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
let report = gc_worker.run().await.unwrap();
@@ -232,6 +233,7 @@ async fn test_gc_worker_truncate_with_ref() {
)]
.into(),
manifest_version: [(region_id, version)].into(),
cross_region_refs: HashMap::new(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
let report = gc_worker.run().await.unwrap();
@@ -313,6 +315,7 @@ async fn test_gc_worker_basic_compact() {
let file_ref_manifest = FileRefsManifest {
file_refs: Default::default(),
manifest_version: [(region_id, version)].into(),
cross_region_refs: HashMap::new(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
@@ -399,6 +402,7 @@ async fn test_gc_worker_compact_with_ref() {
.collect(),
)]),
manifest_version: [(region_id, version)].into(),
cross_region_refs: HashMap::new(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;

View File

@@ -70,11 +70,15 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;
/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
@@ -498,6 +502,16 @@ impl ScanRegion {
self.build_fulltext_index_applier(&non_field_filters),
self.build_fulltext_index_applier(&field_filters),
];
#[cfg(feature = "vector_index")]
let vector_index_applier = self.build_vector_index_applier();
#[cfg(feature = "vector_index")]
let vector_index_k = self.request.vector_search.as_ref().map(|search| {
if self.request.filters.is_empty() {
search.k
} else {
search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
}
});
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
if flat_format {
@@ -523,6 +537,10 @@ impl ScanRegion {
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution)
.with_flat_format(flat_format);
#[cfg(feature = "vector_index")]
let input = input
.with_vector_index_applier(vector_index_applier)
.with_vector_index_k(vector_index_k);
#[cfg(feature = "enterprise")]
let input = if let Some(provider) = self.extension_range_provider {
@@ -667,6 +685,31 @@ impl ScanRegion {
.flatten()
.map(Arc::new)
}
/// Build the vector index applier from vector search request.
#[cfg(feature = "vector_index")]
fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
let vector_search = self.request.vector_search.as_ref()?;
let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
let applier = VectorIndexApplier::new(
self.access_layer.table_dir().to_string(),
self.access_layer.path_type(),
self.access_layer.object_store().clone(),
self.access_layer.puffin_manager_factory().clone(),
vector_search.column_id,
vector_search.query_vector.clone(),
vector_search.metric,
)
.with_file_cache(file_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.with_vector_index_cache(vector_index_cache);
Some(Arc::new(applier))
}
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
@@ -708,6 +751,12 @@ pub struct ScanInput {
inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
/// Vector index applier for KNN search.
#[cfg(feature = "vector_index")]
pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
/// Over-fetched k for vector index scan.
#[cfg(feature = "vector_index")]
pub(crate) vector_index_k: Option<usize>,
/// Start time of the query.
pub(crate) query_start: Option<Instant>,
/// The region is using append mode.
@@ -747,6 +796,10 @@ impl ScanInput {
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
fulltext_index_appliers: [None, None],
#[cfg(feature = "vector_index")]
vector_index_applier: None,
#[cfg(feature = "vector_index")]
vector_index_k: None,
query_start: None,
append_mode: false,
filter_deleted: true,
@@ -853,6 +906,25 @@ impl ScanInput {
self
}
/// Sets vector index applier for KNN search.
#[cfg(feature = "vector_index")]
#[must_use]
pub(crate) fn with_vector_index_applier(
mut self,
applier: Option<VectorIndexApplierRef>,
) -> Self {
self.vector_index_applier = applier;
self
}
/// Sets over-fetched k for vector index scan.
#[cfg(feature = "vector_index")]
#[must_use]
pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
self.vector_index_k = k;
self
}
/// Sets start time of the query.
#[must_use]
pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
@@ -988,7 +1060,7 @@ impl ScanInput {
let predicate = self.predicate_for_file(file);
let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
let decode_pk_values = !self.compaction && self.mapper.has_tags();
let res = self
let reader = self
.access_layer
.read_sst(file.clone())
.predicate(predicate)
@@ -996,7 +1068,15 @@ impl ScanInput {
.cache(self.cache_strategy.clone())
.inverted_index_appliers(self.inverted_index_appliers.clone())
.bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
.fulltext_index_appliers(self.fulltext_index_appliers.clone())
.fulltext_index_appliers(self.fulltext_index_appliers.clone());
#[cfg(feature = "vector_index")]
let reader = {
let mut reader = reader;
reader =
reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
reader
};
let res = reader
.expected_metadata(Some(self.mapper.metadata().clone()))
.flat_format(self.flat_format)
.compaction(self.compaction)

View File

@@ -151,6 +151,8 @@ pub(crate) struct ScanMetricsSet {
rg_minmax_filtered: usize,
/// Number of row groups filtered by bloom filter index.
rg_bloom_filtered: usize,
/// Number of row groups filtered by vector index.
rg_vector_filtered: usize,
/// Number of rows in row group before filtering.
rows_before_filter: usize,
/// Number of rows in row group filtered by fulltext index.
@@ -159,6 +161,8 @@ pub(crate) struct ScanMetricsSet {
rows_inverted_filtered: usize,
/// Number of rows in row group filtered by bloom filter index.
rows_bloom_filtered: usize,
/// Number of rows filtered by vector index.
rows_vector_filtered: usize,
/// Number of rows filtered by precise filter.
rows_precise_filtered: usize,
/// Number of record batches read from SST.
@@ -255,10 +259,12 @@ impl fmt::Debug for ScanMetricsSet {
rg_inverted_filtered,
rg_minmax_filtered,
rg_bloom_filtered,
rg_vector_filtered,
rows_before_filter,
rows_fulltext_filtered,
rows_inverted_filtered,
rows_bloom_filtered,
rows_vector_filtered,
rows_precise_filtered,
num_sst_record_batches,
num_sst_batches,
@@ -320,6 +326,9 @@ impl fmt::Debug for ScanMetricsSet {
if *rg_bloom_filtered > 0 {
write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
}
if *rg_vector_filtered > 0 {
write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
}
if *rows_fulltext_filtered > 0 {
write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
}
@@ -329,6 +338,9 @@ impl fmt::Debug for ScanMetricsSet {
if *rows_bloom_filtered > 0 {
write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
}
if *rows_vector_filtered > 0 {
write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
}
if *rows_precise_filtered > 0 {
write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
}
@@ -500,10 +512,12 @@ impl ScanMetricsSet {
rg_inverted_filtered,
rg_minmax_filtered,
rg_bloom_filtered,
rg_vector_filtered,
rows_total,
rows_fulltext_filtered,
rows_inverted_filtered,
rows_bloom_filtered,
rows_vector_filtered,
rows_precise_filtered,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
@@ -525,11 +539,13 @@ impl ScanMetricsSet {
self.rg_inverted_filtered += *rg_inverted_filtered;
self.rg_minmax_filtered += *rg_minmax_filtered;
self.rg_bloom_filtered += *rg_bloom_filtered;
self.rg_vector_filtered += *rg_vector_filtered;
self.rows_before_filter += *rows_total;
self.rows_fulltext_filtered += *rows_fulltext_filtered;
self.rows_inverted_filtered += *rows_inverted_filtered;
self.rows_bloom_filtered += *rows_bloom_filtered;
self.rows_vector_filtered += *rows_vector_filtered;
self.rows_precise_filtered += *rows_precise_filtered;
self.num_sst_record_batches += *num_record_batches;
@@ -631,6 +647,10 @@ impl ScanMetricsSet {
READ_ROW_GROUPS_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rg_bloom_filtered as u64);
#[cfg(feature = "vector_index")]
READ_ROW_GROUPS_TOTAL
.with_label_values(&["vector_index_filtered"])
.inc_by(self.rg_vector_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
@@ -647,6 +667,10 @@ impl ScanMetricsSet {
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rows_bloom_filtered as u64);
#[cfg(feature = "vector_index")]
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["vector_index_filtered"])
.inc_by(self.rows_vector_filtered as u64);
}
}

View File

@@ -344,6 +344,12 @@ impl FileMeta {
.contains(&IndexType::BloomFilterIndex)
}
/// Returns true if the file has a vector index.
#[cfg(feature = "vector_index")]
pub fn vector_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::VectorIndex)
}
pub fn index_file_size(&self) -> u64 {
self.index_file_size
}

View File

@@ -87,7 +87,7 @@ impl FileReferenceManager {
pub(crate) async fn get_snapshot_of_file_refs(
&self,
query_regions_for_mem: Vec<MitoRegionRef>,
related_regions_in_manifest: Vec<(MitoRegionRef, Vec<RegionId>)>,
dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)>,
) -> Result<FileRefsManifest> {
let mut ref_files = HashMap::new();
// get from in memory file handles
@@ -99,12 +99,17 @@ impl FileReferenceManager {
let mut manifest_version = HashMap::new();
let mut cross_region_refs = HashMap::new();
// get file refs from related regions' manifests
for (related_region, queries) in &related_regions_in_manifest {
let queries = queries.iter().cloned().collect::<HashSet<_>>();
let manifest = related_region.manifest_ctx.manifest().await;
for (dst_region, src_regions) in &dst_region_to_src_regions {
let manifest = dst_region.manifest_ctx.manifest().await;
for meta in manifest.files.values() {
if queries.contains(&meta.region_id) {
if src_regions.contains(&meta.region_id) {
cross_region_refs
.entry(meta.region_id)
.or_insert_with(HashSet::new)
.insert(dst_region.region_id());
// since gc couldn't happen together with repartition
// (both the queries and related_region acquire region read lock), no need to worry about
// staging manifest in repartition here.
@@ -119,7 +124,7 @@ impl FileReferenceManager {
}
}
// not sure if related region's manifest version is needed, but record it for now.
manifest_version.insert(related_region.region_id(), manifest.manifest_version);
manifest_version.insert(dst_region.region_id(), manifest.manifest_version);
}
for r in &query_regions_for_mem {
@@ -138,6 +143,7 @@ impl FileReferenceManager {
Ok(FileRefsManifest {
file_refs: ref_files,
manifest_version,
cross_region_refs,
})
}

View File

@@ -0,0 +1,650 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Vector index applier for KNN search.
use std::sync::Arc;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::vector::distance_metric_to_usearch;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use roaring::RoaringBitmap;
use snafu::ResultExt;
use store_api::storage::{ColumnId, VectorDistanceMetric};
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::vector_index::{
CachedVectorIndex, VectorIndexCacheKey, VectorIndexCacheRef,
};
use crate::error::{ApplyVectorIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::sst::file::RegionIndexId;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::trigger_index_background_download;
use crate::sst::index::vector_index::creator::VectorIndexConfig;
use crate::sst::index::vector_index::format::VectorIndexBlobHeader;
use crate::sst::index::vector_index::{INDEX_BLOB_TYPE, engine};
/// Result of applying vector index.
#[derive(Debug)]
pub struct VectorIndexApplyOutput {
/// Row offsets in the SST file.
pub row_offsets: Vec<u64>,
}
/// Vector index applier for KNN search against SST blobs.
pub struct VectorIndexApplier {
table_dir: String,
path_type: store_api::region_request::PathType,
object_store: object_store::ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
file_cache: Option<FileCacheRef>,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
vector_index_cache: Option<VectorIndexCacheRef>,
column_id: ColumnId,
query_vector: Vec<f32>,
metric: VectorDistanceMetric,
}
pub type VectorIndexApplierRef = Arc<VectorIndexApplier>;
impl VectorIndexApplier {
pub fn new(
table_dir: String,
path_type: store_api::region_request::PathType,
object_store: object_store::ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
column_id: ColumnId,
query_vector: Vec<f32>,
metric: VectorDistanceMetric,
) -> Self {
Self {
table_dir,
path_type,
object_store,
puffin_manager_factory,
file_cache: None,
puffin_metadata_cache: None,
vector_index_cache: None,
column_id,
query_vector,
metric,
}
}
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
pub fn with_vector_index_cache(mut self, cache: Option<VectorIndexCacheRef>) -> Self {
self.vector_index_cache = cache;
self
}
/// Applies vector index to the file and returns candidates.
///
/// This method loads the vector index blob (from cache or remote), runs
/// a KNN search against the indexed vectors, and maps the HNSW keys back
/// to row offsets in the SST file. It returns only row offsets; callers
/// are responsible for any higher-level ordering or limit enforcement.
pub async fn apply_with_k(
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
k: usize,
) -> Result<VectorIndexApplyOutput> {
if k == 0 {
return Ok(VectorIndexApplyOutput {
row_offsets: Vec::new(),
});
}
let index = self.load_or_read_index(file_id, file_size_hint).await?;
let Some(index) = index else {
return Ok(VectorIndexApplyOutput {
row_offsets: Vec::new(),
});
};
if self.query_vector.len() != index.dimensions as usize {
return ApplyVectorIndexSnafu {
reason: format!(
"Query vector dimension {} does not match index dimension {}",
self.query_vector.len(),
index.dimensions
),
}
.fail();
}
if self.metric != index.metric {
return ApplyVectorIndexSnafu {
reason: format!(
"Query metric {} does not match index metric {}",
self.metric, index.metric
),
}
.fail();
}
if index.indexed_rows == 0 {
return Ok(VectorIndexApplyOutput {
row_offsets: Vec::new(),
});
}
let matches = index
.engine
.search(&self.query_vector, k.min(index.indexed_rows as usize))
.map_err(|e| {
ApplyVectorIndexSnafu {
reason: e.to_string(),
}
.build()
})?;
let row_offsets = map_hnsw_keys_to_row_offsets(
&index.null_bitmap,
index.total_rows,
index.indexed_rows,
matches.keys,
)?;
Ok(VectorIndexApplyOutput { row_offsets })
}
async fn load_or_read_index(
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<Option<Arc<CachedVectorIndex>>> {
let cache_key =
VectorIndexCacheKey::new(file_id.file_id(), file_id.version, self.column_id);
if let Some(cache) = &self.vector_index_cache
&& let Some(cached) = cache.get(&cache_key)
{
return Ok(Some(cached));
}
let reader = match self.cached_blob_reader(file_id, file_size_hint).await {
Ok(Some(reader)) => reader,
Ok(None) => self.remote_blob_reader(file_id, file_size_hint).await?,
Err(err) => {
if is_blob_not_found(&err) {
self.remote_blob_reader(file_id, file_size_hint).await?
} else {
warn!(err; "Failed to read cached vector index blob, fallback to remote");
self.remote_blob_reader(file_id, file_size_hint).await?
}
}
};
let blob_data = read_all_blob(reader, file_size_hint).await?;
if blob_data.is_empty() {
return Ok(None);
}
let cached = Arc::new(parse_vector_index_blob(&blob_data)?);
if let Some(cache) = &self.vector_index_cache {
cache.insert(cache_key, cached.clone());
}
Ok(Some(cached))
}
async fn cached_blob_reader(
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
};
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
}
let puffin_manager = self.puffin_manager_factory.build(
file_cache.local_store(),
WriteCachePathProvider::new(file_cache.clone()),
);
let blob_name = column_blob_name(self.column_id);
let reader = puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(&blob_name)
.await
.context(PuffinReadBlobSnafu)?
.reader()
.await
.context(PuffinBuildReaderSnafu)?;
Ok(Some(reader))
}
async fn remote_blob_reader(
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
trigger_index_background_download(
self.file_cache.as_ref(),
&file_id,
file_size_hint,
&path_factory,
&self.object_store,
);
let puffin_manager = self
.puffin_manager_factory
.build(self.object_store.clone(), path_factory)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let blob_name = column_blob_name(self.column_id);
puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(&blob_name)
.await
.context(PuffinReadBlobSnafu)?
.reader()
.await
.context(PuffinBuildReaderSnafu)
}
}
fn column_blob_name(column_id: ColumnId) -> String {
format!("{INDEX_BLOB_TYPE}-{}", column_id)
}
fn is_blob_not_found(err: &crate::error::Error) -> bool {
matches!(
err,
crate::error::Error::PuffinReadBlob {
source: puffin::error::Error::BlobNotFound { .. },
..
}
)
}
async fn read_all_blob(reader: BlobReader, file_size_hint: Option<u64>) -> Result<Vec<u8>> {
let metadata = reader.metadata().await.map_err(|e| {
ApplyVectorIndexSnafu {
reason: format!("Failed to read vector index metadata: {}", e),
}
.build()
})?;
if let Some(limit) = file_size_hint
&& metadata.content_length > limit
{
return ApplyVectorIndexSnafu {
reason: format!(
"Vector index blob size {} exceeds file size hint {}",
metadata.content_length, limit
),
}
.fail();
}
let bytes = reader.read(0..metadata.content_length).await.map_err(|e| {
ApplyVectorIndexSnafu {
reason: format!("Failed to read vector index data: {}", e),
}
.build()
})?;
Ok(bytes.to_vec())
}
fn parse_vector_index_blob(data: &[u8]) -> Result<CachedVectorIndex> {
let (header, mut offset) = VectorIndexBlobHeader::decode(data).map_err(|e| {
ApplyVectorIndexSnafu {
reason: e.to_string(),
}
.build()
})?;
let null_bitmap_len = header.null_bitmap_len as usize;
if data.len() < offset + null_bitmap_len {
return ApplyVectorIndexSnafu {
reason: "Vector index blob truncated while reading null bitmap".to_string(),
}
.fail();
}
let null_bitmap_bytes = &data[offset..offset + null_bitmap_len];
offset += null_bitmap_len;
let null_bitmap = RoaringBitmap::deserialize_from(null_bitmap_bytes).map_err(|e| {
ApplyVectorIndexSnafu {
reason: format!("Failed to deserialize null bitmap: {}", e),
}
.build()
})?;
let index_bytes = &data[offset..];
let config = VectorIndexConfig {
engine: header.engine_type,
dim: header.dim as usize,
metric: distance_metric_to_usearch(header.metric),
distance_metric: header.metric,
connectivity: header.connectivity as usize,
expansion_add: header.expansion_add as usize,
expansion_search: header.expansion_search as usize,
};
let engine = engine::load_engine(header.engine_type, &config, index_bytes).map_err(|e| {
ApplyVectorIndexSnafu {
reason: e.to_string(),
}
.build()
})?;
Ok(CachedVectorIndex::new(
engine,
null_bitmap,
header.dim,
header.metric,
header.total_rows,
header.indexed_rows,
))
}
fn map_hnsw_keys_to_row_offsets(
null_bitmap: &RoaringBitmap,
total_rows: u64,
indexed_rows: u64,
keys: Vec<u64>,
) -> Result<Vec<u64>> {
if total_rows == 0 {
return Ok(Vec::new());
}
let total_rows_u32 = u32::try_from(total_rows).map_err(|_| {
ApplyVectorIndexSnafu {
reason: format!("Total rows {} exceeds u32::MAX", total_rows),
}
.build()
})?;
let mut row_offsets = Vec::with_capacity(keys.len());
for key in keys {
let offset = hnsw_key_to_row_offset(null_bitmap, total_rows_u32, indexed_rows, key)?;
row_offsets.push(offset as u64);
}
Ok(row_offsets)
}
fn hnsw_key_to_row_offset(
null_bitmap: &RoaringBitmap,
total_rows: u32,
indexed_rows: u64,
key: u64,
) -> Result<u32> {
if total_rows == 0 {
return ApplyVectorIndexSnafu {
reason: "Total rows is zero".to_string(),
}
.fail();
}
if key >= indexed_rows {
return ApplyVectorIndexSnafu {
reason: format!("HNSW key {} exceeds indexed rows {}", key, indexed_rows),
}
.fail();
}
if null_bitmap.is_empty() {
return Ok(key as u32);
}
let mut left: u32 = 0;
let mut right: u32 = total_rows - 1;
while left <= right {
let mid = left + (right - left) / 2;
let nulls_before = null_bitmap.rank(mid);
let non_nulls = (mid as u64 + 1).saturating_sub(nulls_before);
if non_nulls > key {
if mid == 0 {
break;
}
right = mid - 1;
} else {
left = mid + 1;
}
}
if left >= total_rows {
return ApplyVectorIndexSnafu {
reason: "Failed to map HNSW key to row offset".to_string(),
}
.fail();
}
Ok(left)
}
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::TempDir;
use futures::io::Cursor;
use object_store::ObjectStore;
use object_store::services::Memory;
use puffin::puffin_manager::PuffinWriter;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, VectorDistanceMetric, VectorIndexEngineType};
use super::*;
use crate::access_layer::RegionFilePathFactory;
use crate::sst::file::RegionFileId;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::vector_index::creator::VectorIndexConfig;
async fn build_applier_with_blob(
blob: Vec<u8>,
column_id: ColumnId,
query_vector: Vec<f32>,
metric: VectorDistanceMetric,
) -> (TempDir, VectorIndexApplier, RegionIndexId, u64) {
let (dir, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_vector_index_applier_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = RegionFileId::new(0.into(), FileId::random());
let index_id = RegionIndexId::new(file_id, 0);
let table_dir = "table_dir".to_string();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
);
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
let blob_name = column_blob_name(column_id);
let _bytes_written = writer
.put_blob(
blob_name.as_str(),
Cursor::new(blob),
Default::default(),
Default::default(),
)
.await
.unwrap();
let file_size = writer.finish().await.unwrap();
let applier = VectorIndexApplier::new(
table_dir,
PathType::Bare,
object_store,
puffin_manager_factory,
column_id,
query_vector,
metric,
);
(dir, applier, index_id, file_size)
}
fn build_blob_with_vectors(
config: &VectorIndexConfig,
vectors: Vec<(u64, Vec<f32>)>,
null_bitmap: &RoaringBitmap,
total_rows: u64,
indexed_rows: u64,
) -> Vec<u8> {
let mut engine = engine::create_engine(config.engine, config).unwrap();
for (key, vector) in vectors {
engine.add(key, &vector).unwrap();
}
let index_size = engine.serialized_length();
let mut index_bytes = vec![0u8; index_size];
engine.save_to_buffer(&mut index_bytes).unwrap();
let mut null_bitmap_bytes = Vec::new();
null_bitmap.serialize_into(&mut null_bitmap_bytes).unwrap();
let header = VectorIndexBlobHeader::new(
config.engine,
config.dim as u32,
config.distance_metric,
config.connectivity as u16,
config.expansion_add as u16,
config.expansion_search as u16,
total_rows,
indexed_rows,
null_bitmap_bytes.len() as u32,
)
.unwrap();
let mut blob = Vec::new();
header.encode_into(&mut blob);
blob.extend_from_slice(&null_bitmap_bytes);
blob.extend_from_slice(&index_bytes);
blob
}
#[test]
fn test_hnsw_key_to_row_offset_with_nulls() {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(1);
bitmap.insert(3);
assert_eq!(hnsw_key_to_row_offset(&bitmap, 6, 4, 0).unwrap(), 0);
assert_eq!(hnsw_key_to_row_offset(&bitmap, 6, 4, 1).unwrap(), 2);
assert_eq!(hnsw_key_to_row_offset(&bitmap, 6, 4, 2).unwrap(), 4);
}
#[test]
fn test_hnsw_key_to_row_offset_without_nulls() {
let bitmap = RoaringBitmap::new();
assert_eq!(hnsw_key_to_row_offset(&bitmap, 4, 4, 3).unwrap(), 3);
}
#[test]
fn test_hnsw_key_to_row_offset_out_of_range() {
let bitmap = RoaringBitmap::new();
assert!(hnsw_key_to_row_offset(&bitmap, 4, 4, 4).is_err());
}
#[test]
fn test_map_hnsw_keys_to_row_offsets_multiple_keys() {
let bitmap = RoaringBitmap::new();
let offsets = map_hnsw_keys_to_row_offsets(&bitmap, 4, 4, vec![0, 2, 3]).unwrap();
assert_eq!(offsets, vec![0, 2, 3]);
}
#[tokio::test]
async fn test_apply_with_k_returns_offsets() {
let config = VectorIndexConfig {
engine: VectorIndexEngineType::Usearch,
dim: 2,
metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq),
distance_metric: VectorDistanceMetric::L2sq,
connectivity: 16,
expansion_add: 128,
expansion_search: 64,
};
let mut null_bitmap = RoaringBitmap::new();
null_bitmap.insert(1);
let blob = build_blob_with_vectors(
&config,
vec![(0, vec![1.0, 0.0]), (1, vec![0.0, 1.0])],
&null_bitmap,
3,
2,
);
let (_dir, applier, index_id, size_bytes) =
build_applier_with_blob(blob, 1, vec![1.0, 0.0], VectorDistanceMetric::L2sq).await;
let output = applier
.apply_with_k(index_id, Some(size_bytes), 2)
.await
.unwrap();
assert_eq!(output.row_offsets, vec![0, 2]);
}
#[tokio::test]
async fn test_apply_with_k_dimension_mismatch() {
let config = VectorIndexConfig {
engine: VectorIndexEngineType::Usearch,
dim: 2,
metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq),
distance_metric: VectorDistanceMetric::L2sq,
connectivity: 16,
expansion_add: 128,
expansion_search: 64,
};
let null_bitmap = RoaringBitmap::new();
let blob = build_blob_with_vectors(&config, vec![(0, vec![1.0, 0.0])], &null_bitmap, 1, 1);
let (_dir, applier, index_id, size_bytes) =
build_applier_with_blob(blob, 1, vec![1.0, 0.0, 0.0], VectorDistanceMetric::L2sq).await;
let res = applier.apply_with_k(index_id, Some(size_bytes), 1).await;
assert!(res.is_err());
}
#[tokio::test]
async fn test_apply_with_k_empty_blob() {
let config = VectorIndexConfig {
engine: VectorIndexEngineType::Usearch,
dim: 1,
metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq),
distance_metric: VectorDistanceMetric::L2sq,
connectivity: 16,
expansion_add: 128,
expansion_search: 64,
};
let null_bitmap = RoaringBitmap::new();
let blob = build_blob_with_vectors(&config, Vec::new(), &null_bitmap, 0, 0);
let (_dir, applier, index_id, size_bytes) =
build_applier_with_blob(blob, 1, vec![1.0], VectorDistanceMetric::L2sq).await;
let output = applier
.apply_with_k(index_id, Some(size_bytes), 1)
.await
.unwrap();
assert!(output.row_offsets.is_empty());
}
}

View File

@@ -44,6 +44,9 @@ use crate::sst::index::intermediate::{
};
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::vector_index::format::{
VECTOR_INDEX_BLOB_HEADER_SIZE, VectorIndexBlobHeader,
};
use crate::sst::index::vector_index::util::bytes_to_f32_slice;
use crate::sst::index::vector_index::{INDEX_BLOB_TYPE, engine};
@@ -323,6 +326,7 @@ impl VectorIndexer {
for (col_id, creator) in &mut self.creators {
let Some(values) = batch.field_col_value(*col_id) else {
creator.add_nulls(n);
continue;
};
@@ -561,36 +565,12 @@ impl VectorIndexer {
creator.save_to_buffer(&mut index_bytes)?;
// Header size: version(1) + engine(1) + dim(4) + metric(1) +
// connectivity(2) + expansion_add(2) + expansion_search(2) +
// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes
/// Size of the vector index blob header in bytes.
/// Header format: version(1) + engine(1) + dim(4) + metric(1) +
/// connectivity(2) + expansion_add(2) + expansion_search(2) +
/// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes
const VECTOR_INDEX_BLOB_HEADER_SIZE: usize = 33;
// connectivity(2) + expansion_add(2) + expansion_search(2) +
// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes.
let total_size =
VECTOR_INDEX_BLOB_HEADER_SIZE + null_bitmap_bytes.len() + index_bytes.len();
let mut blob_data = Vec::with_capacity(total_size);
// Write version (1 byte)
blob_data.push(1u8);
// Write engine type (1 byte)
blob_data.push(creator.engine_type().as_u8());
// Write dimension (4 bytes, little-endian)
blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes());
// Write metric (1 byte)
blob_data.push(creator.metric().as_u8());
// Write connectivity/M (2 bytes, little-endian)
blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes());
// Write expansion_add/ef_construction (2 bytes, little-endian)
blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes());
// Write expansion_search/ef_search (2 bytes, little-endian)
blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes());
// Write total_rows (8 bytes, little-endian)
blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes());
// Write indexed_rows (8 bytes, little-endian)
blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes());
// Write NULL bitmap length (4 bytes, little-endian)
let bitmap_len: u32 = null_bitmap_bytes.len().try_into().map_err(|_| {
VectorIndexBuildSnafu {
reason: format!(
@@ -601,7 +581,24 @@ impl VectorIndexer {
}
.build()
})?;
blob_data.extend_from_slice(&bitmap_len.to_le_bytes());
let header = VectorIndexBlobHeader::new(
creator.engine_type(),
creator.config.dim as u32,
creator.metric(),
creator.config.connectivity as u16,
creator.config.expansion_add as u16,
creator.config.expansion_search as u16,
creator.current_row_offset,
creator.next_hnsw_key,
bitmap_len,
)
.map_err(|e| {
VectorIndexFinishSnafu {
reason: e.to_string(),
}
.build()
})?;
header.encode_into(&mut blob_data);
// Write NULL bitmap
blob_data.extend_from_slice(&null_bitmap_bytes);
// Write vector index
@@ -686,7 +683,78 @@ impl VectorIndexer {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{TimestampMillisecondVector, UInt8Vector, UInt64Vector};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ColumnId, FileId, RegionId};
use super::*;
use crate::read::BatchColumn;
fn mock_region_metadata_with_vector() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("tag", ConcreteDataType::int64_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("vec", ConcreteDataType::vector_datatype(2), true),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_u64",
ConcreteDataType::uint64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 4,
})
.primary_key(vec![2]);
Arc::new(builder.build().unwrap())
}
fn new_batch_missing_vector_column(column_id: ColumnId, rows: usize) -> Batch {
let fields = vec![(0, SortField::new(ConcreteDataType::int64_datatype()))];
let codec = DensePrimaryKeyCodec::with_fields(fields);
let primary_key = codec.encode([ValueRef::Int64(1)].into_iter()).unwrap();
let field = BatchColumn {
column_id,
data: Arc::new(UInt64Vector::from_iter_values(0..rows as u64)),
};
Batch::new(
primary_key,
Arc::new(TimestampMillisecondVector::from_values(
(0..rows).map(|i| i as i64).collect::<Vec<_>>(),
)),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(0, rows))),
Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(1, rows))),
vec![field],
)
.unwrap()
}
#[test]
fn test_vector_index_creator() {
@@ -837,23 +905,24 @@ mod tests {
let mut index_bytes = vec![0u8; index_size];
creator.save_to_buffer(&mut index_bytes).unwrap();
// Header: 33 bytes
let header_size = 33;
let total_size = header_size + null_bitmap_bytes.len() + index_bytes.len();
let total_size =
VECTOR_INDEX_BLOB_HEADER_SIZE + null_bitmap_bytes.len() + index_bytes.len();
let mut blob_data = Vec::with_capacity(total_size);
// Write header fields
blob_data.push(1u8); // version
blob_data.push(creator.engine_type().as_u8()); // engine type
blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes()); // dimension
blob_data.push(creator.metric().as_u8()); // metric
blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes());
blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes());
blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes());
blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes()); // total_rows
blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes()); // indexed_rows
let bitmap_len: u32 = null_bitmap_bytes.len().try_into().unwrap();
blob_data.extend_from_slice(&bitmap_len.to_le_bytes());
let header = VectorIndexBlobHeader::new(
creator.engine_type(),
creator.config.dim as u32,
creator.metric(),
creator.config.connectivity as u16,
creator.config.expansion_add as u16,
creator.config.expansion_search as u16,
creator.current_row_offset,
creator.next_hnsw_key,
bitmap_len,
)
.unwrap();
header.encode_into(&mut blob_data);
blob_data.extend_from_slice(&null_bitmap_bytes);
blob_data.extend_from_slice(&index_bytes);
@@ -861,60 +930,62 @@ mod tests {
assert_eq!(blob_data.len(), total_size);
// Parse header and verify values
assert_eq!(blob_data[0], 1); // version
assert_eq!(blob_data[1], VectorIndexEngineType::Usearch.as_u8()); // engine
let dim = u32::from_le_bytes([blob_data[2], blob_data[3], blob_data[4], blob_data[5]]);
assert_eq!(dim, 4);
let metric = blob_data[6];
let (decoded, header_size) = VectorIndexBlobHeader::decode(&blob_data).unwrap();
assert_eq!(header_size, VECTOR_INDEX_BLOB_HEADER_SIZE);
assert_eq!(decoded.engine_type, VectorIndexEngineType::Usearch);
assert_eq!(decoded.dim, 4);
assert_eq!(
metric,
datatypes::schema::VectorDistanceMetric::L2sq.as_u8()
decoded.metric,
datatypes::schema::VectorDistanceMetric::L2sq
);
let connectivity = u16::from_le_bytes([blob_data[7], blob_data[8]]);
assert_eq!(connectivity, 24);
let expansion_add = u16::from_le_bytes([blob_data[9], blob_data[10]]);
assert_eq!(expansion_add, 200);
let expansion_search = u16::from_le_bytes([blob_data[11], blob_data[12]]);
assert_eq!(expansion_search, 100);
let total_rows = u64::from_le_bytes([
blob_data[13],
blob_data[14],
blob_data[15],
blob_data[16],
blob_data[17],
blob_data[18],
blob_data[19],
blob_data[20],
]);
assert_eq!(total_rows, 5);
let indexed_rows = u64::from_le_bytes([
blob_data[21],
blob_data[22],
blob_data[23],
blob_data[24],
blob_data[25],
blob_data[26],
blob_data[27],
blob_data[28],
]);
assert_eq!(indexed_rows, 3);
let null_bitmap_len =
u32::from_le_bytes([blob_data[29], blob_data[30], blob_data[31], blob_data[32]]);
assert_eq!(null_bitmap_len as usize, null_bitmap_bytes.len());
assert_eq!(decoded.connectivity, 24);
assert_eq!(decoded.expansion_add, 200);
assert_eq!(decoded.expansion_search, 100);
assert_eq!(decoded.total_rows, 5);
assert_eq!(decoded.indexed_rows, 3);
assert_eq!(decoded.null_bitmap_len as usize, null_bitmap_bytes.len());
// Verify null bitmap can be deserialized
let null_bitmap_data = &blob_data[header_size..header_size + null_bitmap_len as usize];
let null_bitmap_data =
&blob_data[header_size..header_size + decoded.null_bitmap_len as usize];
let restored_bitmap = RoaringBitmap::deserialize_from(null_bitmap_data).unwrap();
assert_eq!(restored_bitmap.len(), 2); // 2 nulls
assert!(restored_bitmap.contains(1));
assert!(restored_bitmap.contains(3));
}
#[tokio::test]
async fn test_vector_index_missing_column_as_nulls() {
let tempdir = common_test_util::temp_dir::create_temp_dir(
"test_vector_index_missing_column_as_nulls_",
);
let intm_mgr = IntermediateManager::init_fs(tempdir.path().to_string_lossy())
.await
.unwrap();
let region_metadata = mock_region_metadata_with_vector();
let mut vector_index_options = HashMap::new();
vector_index_options.insert(3, VectorIndexOptions::default());
let mut indexer = VectorIndexer::new(
FileId::random(),
&region_metadata,
intm_mgr,
None,
&vector_index_options,
)
.unwrap()
.unwrap();
let mut batch = new_batch_missing_vector_column(4, 3);
indexer.update(&mut batch).await.unwrap();
let creator = indexer.creators.get(&3).unwrap();
assert_eq!(creator.size(), 0);
assert_eq!(creator.current_row_offset, 3);
assert_eq!(creator.null_bitmap.len(), 3);
for idx in 0..3 {
assert!(creator.null_bitmap.contains(idx as u32));
}
}
}

View File

@@ -0,0 +1,324 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Vector index blob format helpers.
use std::fmt;
#[cfg(test)]
use datatypes::schema::VectorDistanceMetric as SchemaVectorDistanceMetric;
#[cfg(test)]
use index::vector::distance_metric_to_usearch;
use store_api::storage::{VectorDistanceMetric, VectorIndexEngineType};
pub(crate) const VECTOR_INDEX_BLOB_VERSION: u8 = 1;
pub(crate) const VECTOR_INDEX_BLOB_HEADER_SIZE: usize = 33;
#[derive(Debug, Clone, Copy)]
pub(crate) struct VectorIndexBlobHeader {
pub engine_type: VectorIndexEngineType,
pub dim: u32,
pub metric: VectorDistanceMetric,
pub connectivity: u16,
pub expansion_add: u16,
pub expansion_search: u16,
pub total_rows: u64,
pub indexed_rows: u64,
pub null_bitmap_len: u32,
}
impl VectorIndexBlobHeader {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
engine_type: VectorIndexEngineType,
dim: u32,
metric: VectorDistanceMetric,
connectivity: u16,
expansion_add: u16,
expansion_search: u16,
total_rows: u64,
indexed_rows: u64,
null_bitmap_len: u32,
) -> Result<Self, VectorIndexBlobFormatError> {
if total_rows < indexed_rows {
return Err(VectorIndexBlobFormatError::InvalidRowCounts {
total: total_rows,
indexed: indexed_rows,
});
}
if total_rows > u64::from(u32::MAX) || indexed_rows > u64::from(u32::MAX) {
return Err(VectorIndexBlobFormatError::RowsExceedU32 {
total: total_rows,
indexed: indexed_rows,
});
}
Ok(Self {
engine_type,
dim,
metric,
connectivity,
expansion_add,
expansion_search,
total_rows,
indexed_rows,
null_bitmap_len,
})
}
pub(crate) fn encode_into(&self, buf: &mut Vec<u8>) {
buf.push(VECTOR_INDEX_BLOB_VERSION);
buf.push(self.engine_type.as_u8());
buf.extend_from_slice(&self.dim.to_le_bytes());
buf.push(self.metric.as_u8());
buf.extend_from_slice(&self.connectivity.to_le_bytes());
buf.extend_from_slice(&self.expansion_add.to_le_bytes());
buf.extend_from_slice(&self.expansion_search.to_le_bytes());
buf.extend_from_slice(&self.total_rows.to_le_bytes());
buf.extend_from_slice(&self.indexed_rows.to_le_bytes());
buf.extend_from_slice(&self.null_bitmap_len.to_le_bytes());
}
pub(crate) fn decode(data: &[u8]) -> Result<(Self, usize), VectorIndexBlobFormatError> {
if data.len() < VECTOR_INDEX_BLOB_HEADER_SIZE {
return Err(VectorIndexBlobFormatError::Truncated("header"));
}
let mut offset = 0;
let version = read_u8(data, &mut offset)?;
if version != VECTOR_INDEX_BLOB_VERSION {
return Err(VectorIndexBlobFormatError::UnsupportedVersion(version));
}
let engine_type = VectorIndexEngineType::try_from_u8(read_u8(data, &mut offset)?)
.ok_or_else(|| VectorIndexBlobFormatError::UnknownEngine(data[offset - 1]))?;
let dim = read_u32(data, &mut offset)?;
let metric = VectorDistanceMetric::try_from_u8(read_u8(data, &mut offset)?)
.ok_or_else(|| VectorIndexBlobFormatError::UnknownMetric(data[offset - 1]))?;
let connectivity = read_u16(data, &mut offset)?;
let expansion_add = read_u16(data, &mut offset)?;
let expansion_search = read_u16(data, &mut offset)?;
let total_rows = read_u64(data, &mut offset)?;
let indexed_rows = read_u64(data, &mut offset)?;
let null_bitmap_len = read_u32(data, &mut offset)?;
let header = VectorIndexBlobHeader::new(
engine_type,
dim,
metric,
connectivity,
expansion_add,
expansion_search,
total_rows,
indexed_rows,
null_bitmap_len,
)?;
Ok((header, offset))
}
}
#[derive(Debug)]
pub(crate) enum VectorIndexBlobFormatError {
Truncated(&'static str),
UnsupportedVersion(u8),
UnknownEngine(u8),
UnknownMetric(u8),
InvalidRowCounts { total: u64, indexed: u64 },
RowsExceedU32 { total: u64, indexed: u64 },
}
impl fmt::Display for VectorIndexBlobFormatError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Truncated(label) => {
write!(f, "Vector index blob truncated while reading {}", label)
}
Self::UnsupportedVersion(version) => {
write!(f, "Unsupported vector index version {}", version)
}
Self::UnknownEngine(value) => write!(f, "Unknown vector index engine type {}", value),
Self::UnknownMetric(value) => write!(f, "Unknown vector index metric {}", value),
Self::InvalidRowCounts { total, indexed } => {
write!(
f,
"Total rows {} is smaller than indexed rows {}",
total, indexed
)
}
Self::RowsExceedU32 { total, indexed } => {
write!(
f,
"Vector index rows exceed u32::MAX (total={}, indexed={})",
total, indexed
)
}
}
}
}
fn read_exact<const N: usize>(
data: &[u8],
offset: &mut usize,
label: &'static str,
) -> Result<[u8; N], VectorIndexBlobFormatError> {
if *offset + N > data.len() {
return Err(VectorIndexBlobFormatError::Truncated(label));
}
let mut buf = [0u8; N];
buf.copy_from_slice(&data[*offset..*offset + N]);
*offset += N;
Ok(buf)
}
fn read_u8(data: &[u8], offset: &mut usize) -> Result<u8, VectorIndexBlobFormatError> {
Ok(read_exact::<1>(data, offset, "u8")?[0])
}
fn read_u16(data: &[u8], offset: &mut usize) -> Result<u16, VectorIndexBlobFormatError> {
Ok(u16::from_le_bytes(read_exact::<2>(data, offset, "u16")?))
}
fn read_u32(data: &[u8], offset: &mut usize) -> Result<u32, VectorIndexBlobFormatError> {
Ok(u32::from_le_bytes(read_exact::<4>(data, offset, "u32")?))
}
fn read_u64(data: &[u8], offset: &mut usize) -> Result<u64, VectorIndexBlobFormatError> {
Ok(u64::from_le_bytes(read_exact::<8>(data, offset, "u64")?))
}
#[cfg(test)]
mod tests {
use roaring::RoaringBitmap;
use store_api::storage::VectorIndexEngineType;
use super::*;
use crate::sst::index::vector_index::creator::VectorIndexConfig;
use crate::sst::index::vector_index::engine;
#[test]
fn test_vector_index_blob_header_roundtrip() {
let header = VectorIndexBlobHeader::new(
VectorIndexEngineType::Usearch,
4,
VectorDistanceMetric::L2sq,
24,
200,
100,
5,
3,
16,
)
.unwrap();
let mut bytes = Vec::new();
header.encode_into(&mut bytes);
let (decoded, offset) = VectorIndexBlobHeader::decode(&bytes).unwrap();
assert_eq!(offset, VECTOR_INDEX_BLOB_HEADER_SIZE);
assert_eq!(decoded.engine_type, header.engine_type);
assert_eq!(decoded.dim, header.dim);
assert_eq!(decoded.metric, header.metric);
assert_eq!(decoded.connectivity, header.connectivity);
assert_eq!(decoded.expansion_add, header.expansion_add);
assert_eq!(decoded.expansion_search, header.expansion_search);
assert_eq!(decoded.total_rows, header.total_rows);
assert_eq!(decoded.indexed_rows, header.indexed_rows);
assert_eq!(decoded.null_bitmap_len, header.null_bitmap_len);
}
#[test]
fn test_vector_index_blob_header_invalid_version() {
let mut blob = vec![0u8; VECTOR_INDEX_BLOB_HEADER_SIZE];
blob[0] = 2;
assert!(VectorIndexBlobHeader::decode(&blob).is_err());
}
#[test]
fn test_vector_index_blob_header_truncated() {
let blob = vec![0u8; VECTOR_INDEX_BLOB_HEADER_SIZE - 1];
assert!(VectorIndexBlobHeader::decode(&blob).is_err());
}
#[test]
fn test_vector_index_blob_parse_roundtrip() {
let config = VectorIndexConfig {
engine: VectorIndexEngineType::Usearch,
dim: 2,
metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq),
distance_metric: VectorDistanceMetric::L2sq,
connectivity: 16,
expansion_add: 128,
expansion_search: 64,
};
let mut engine = engine::create_engine(config.engine, &config).unwrap();
engine.add(0, &[0.0, 1.0]).unwrap();
let index_size = engine.serialized_length();
let mut index_bytes = vec![0u8; index_size];
engine.save_to_buffer(&mut index_bytes).unwrap();
let null_bitmap = RoaringBitmap::new();
let mut null_bitmap_bytes = Vec::new();
null_bitmap.serialize_into(&mut null_bitmap_bytes).unwrap();
let header = VectorIndexBlobHeader::new(
config.engine,
config.dim as u32,
VectorDistanceMetric::L2sq,
config.connectivity as u16,
config.expansion_add as u16,
config.expansion_search as u16,
1,
1,
null_bitmap_bytes.len() as u32,
)
.unwrap();
let mut blob = Vec::new();
header.encode_into(&mut blob);
blob.extend_from_slice(&null_bitmap_bytes);
blob.extend_from_slice(&index_bytes);
let (decoded, offset) = VectorIndexBlobHeader::decode(&blob).unwrap();
let null_bitmap_len = decoded.null_bitmap_len as usize;
let null_bitmap_data = &blob[offset..offset + null_bitmap_len];
let restored_bitmap = RoaringBitmap::deserialize_from(null_bitmap_data).unwrap();
assert_eq!(decoded.metric, VectorDistanceMetric::L2sq);
assert_eq!(decoded.total_rows, 1);
assert_eq!(decoded.indexed_rows, 1);
assert_eq!(restored_bitmap.len(), 0);
}
#[test]
fn test_vector_index_blob_header_format_matches_creator() {
let header = VectorIndexBlobHeader::new(
VectorIndexEngineType::Usearch,
4,
VectorDistanceMetric::L2sq,
24,
200,
100,
5,
3,
2,
)
.unwrap();
let mut bytes = Vec::new();
header.encode_into(&mut bytes);
let (decoded, header_size) = VectorIndexBlobHeader::decode(&bytes).unwrap();
assert_eq!(header_size, VECTOR_INDEX_BLOB_HEADER_SIZE);
assert_eq!(decoded.metric, SchemaVectorDistanceMetric::L2sq);
assert_eq!(decoded.total_rows, 5);
assert_eq!(decoded.indexed_rows, 3);
assert_eq!(decoded.null_bitmap_len, 2);
}
}

View File

@@ -14,8 +14,10 @@
//! Vector index module for HNSW-based approximate nearest neighbor search.
pub(crate) mod applier;
pub(crate) mod creator;
pub(crate) mod engine;
pub(crate) mod format;
pub(crate) mod util;
/// The blob type identifier for vector index in puffin files.

View File

@@ -14,6 +14,8 @@
//! Parquet reader.
#[cfg(feature = "vector_index")]
use std::collections::BTreeSet;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -41,6 +43,8 @@ use table::predicate::Predicate;
use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
#[cfg(feature = "vector_index")]
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
ReadParquetSnafu, Result,
@@ -61,6 +65,8 @@ use crate::sst::index::fulltext_index::applier::{
use crate::sst::index::inverted_index::applier::{
InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PreFilterMode, RangeBase, row_group_contains_delete,
};
@@ -74,6 +80,7 @@ use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
const INDEX_TYPE_VECTOR: &str = "vector";
macro_rules! handle_index_error {
($err:expr, $file_handle:expr, $index_type:expr) => {
@@ -117,6 +124,12 @@ pub struct ParquetReaderBuilder {
inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
/// Vector index applier for KNN search.
#[cfg(feature = "vector_index")]
vector_index_applier: Option<VectorIndexApplierRef>,
/// Over-fetched k for vector index scan.
#[cfg(feature = "vector_index")]
vector_index_k: Option<usize>,
/// Expected metadata of the region while reading the SST.
/// This is usually the latest metadata of the region. The reader use
/// it get the correct column id of a column by name.
@@ -150,6 +163,10 @@ impl ParquetReaderBuilder {
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
fulltext_index_appliers: [None, None],
#[cfg(feature = "vector_index")]
vector_index_applier: None,
#[cfg(feature = "vector_index")]
vector_index_k: None,
expected_metadata: None,
flat_format: false,
compaction: false,
@@ -211,6 +228,19 @@ impl ParquetReaderBuilder {
self
}
/// Attaches the vector index applier to the builder.
#[cfg(feature = "vector_index")]
#[must_use]
pub(crate) fn vector_index_applier(
mut self,
applier: Option<VectorIndexApplierRef>,
k: Option<usize>,
) -> Self {
self.vector_index_applier = applier;
self.vector_index_k = k;
self
}
/// Attaches the expected metadata to the builder.
#[must_use]
pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
@@ -572,6 +602,19 @@ impl ParquetReaderBuilder {
)
.await;
}
#[cfg(feature = "vector_index")]
{
self.prune_row_groups_by_vector_index(
row_group_size,
num_row_groups,
&mut output,
metrics,
)
.await;
if output.is_empty() {
return output;
}
}
output
}
@@ -799,6 +842,48 @@ impl ParquetReaderBuilder {
pruned
}
/// Prunes row groups by vector index results.
#[cfg(feature = "vector_index")]
async fn prune_row_groups_by_vector_index(
&self,
row_group_size: usize,
num_row_groups: usize,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) {
let Some(applier) = &self.vector_index_applier else {
return;
};
let Some(k) = self.vector_index_k else {
return;
};
if !self.file_handle.meta_ref().vector_index_available() {
return;
}
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = applier
.apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
.await;
let row_ids = match apply_res {
Ok(res) => res.row_offsets,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
return;
}
};
let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
{
Ok(selection) => selection,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
return;
}
};
apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
}
async fn prune_row_groups_by_fulltext_bloom(
&self,
row_group_size: usize,
@@ -983,6 +1068,29 @@ fn apply_selection_and_update_metrics(
*output = intersection;
}
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
row_offsets: Vec<u64>,
row_group_size: usize,
num_row_groups: usize,
) -> Result<RowGroupSelection> {
let mut row_ids = BTreeSet::new();
for offset in row_offsets {
let row_id = u32::try_from(offset).map_err(|_| {
ApplyVectorIndexSnafu {
reason: format!("Row offset {} exceeds u32::MAX", offset),
}
.build()
})?;
row_ids.insert(row_id);
}
Ok(RowGroupSelection::from_row_ids(
row_ids,
row_group_size,
num_row_groups,
))
}
fn all_required_row_groups_searched(
required_row_groups: &RowGroupSelection,
cached_row_groups: &RowGroupSelection,
@@ -1008,6 +1116,8 @@ pub(crate) struct ReaderFilterMetrics {
pub(crate) rg_minmax_filtered: usize,
/// Number of row groups filtered by bloom filter index.
pub(crate) rg_bloom_filtered: usize,
/// Number of row groups filtered by vector index.
pub(crate) rg_vector_filtered: usize,
/// Number of rows in row group before filtering.
pub(crate) rows_total: usize,
@@ -1017,6 +1127,8 @@ pub(crate) struct ReaderFilterMetrics {
pub(crate) rows_inverted_filtered: usize,
/// Number of rows in row group filtered by bloom filter index.
pub(crate) rows_bloom_filtered: usize,
/// Number of rows filtered by vector index.
pub(crate) rows_vector_filtered: usize,
/// Number of rows filtered by precise filter.
pub(crate) rows_precise_filtered: usize,
@@ -1036,11 +1148,13 @@ impl ReaderFilterMetrics {
self.rg_inverted_filtered += other.rg_inverted_filtered;
self.rg_minmax_filtered += other.rg_minmax_filtered;
self.rg_bloom_filtered += other.rg_bloom_filtered;
self.rg_vector_filtered += other.rg_vector_filtered;
self.rows_total += other.rows_total;
self.rows_fulltext_filtered += other.rows_fulltext_filtered;
self.rows_inverted_filtered += other.rows_inverted_filtered;
self.rows_bloom_filtered += other.rows_bloom_filtered;
self.rows_vector_filtered += other.rows_vector_filtered;
self.rows_precise_filtered += other.rows_precise_filtered;
// Merge optional applier metrics
@@ -1078,6 +1192,9 @@ impl ReaderFilterMetrics {
READ_ROW_GROUPS_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rg_bloom_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["vector_index_filtered"])
.inc_by(self.rg_vector_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
@@ -1094,6 +1211,9 @@ impl ReaderFilterMetrics {
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rows_bloom_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["vector_index_filtered"])
.inc_by(self.rows_vector_filtered as u64);
}
fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
@@ -1110,11 +1230,67 @@ impl ReaderFilterMetrics {
self.rg_bloom_filtered += row_group_count;
self.rows_bloom_filtered += row_count;
}
INDEX_TYPE_VECTOR => {
self.rg_vector_filtered += row_group_count;
self.rows_vector_filtered += row_count;
}
_ => {}
}
}
}
#[cfg(all(test, feature = "vector_index"))]
mod tests {
use super::*;
#[test]
fn test_vector_selection_from_offsets() {
let row_group_size = 4;
let num_row_groups = 3;
let selection =
vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
.unwrap();
assert_eq!(selection.row_group_count(), 3);
assert_eq!(selection.row_count(), 4);
assert!(selection.contains_non_empty_row_group(0));
assert!(selection.contains_non_empty_row_group(1));
assert!(selection.contains_non_empty_row_group(2));
}
#[test]
fn test_vector_selection_from_offsets_out_of_range() {
let row_group_size = 4;
let num_row_groups = 2;
let selection = vector_selection_from_offsets(
vec![0, 7, u64::from(u32::MAX) + 1],
row_group_size,
num_row_groups,
);
assert!(selection.is_err());
}
#[test]
fn test_vector_selection_updates_metrics() {
let row_group_size = 4;
let total_rows = 8;
let mut output = RowGroupSelection::new(row_group_size, total_rows);
let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
let mut metrics = ReaderFilterMetrics::default();
apply_selection_and_update_metrics(
&mut output,
&selection,
&mut metrics,
INDEX_TYPE_VECTOR,
);
assert_eq!(metrics.rg_vector_filtered, 1);
assert_eq!(metrics.rows_vector_filtered, 7);
assert_eq!(output.row_count(), 1);
}
}
/// Metrics for parquet metadata cache operations.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {

View File

@@ -2208,6 +2208,7 @@ mod test {
use sql::dialect::GreptimeDbDialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use sqlparser::parser::Parser;
use super::*;
use crate::expr_helper;
@@ -2225,6 +2226,39 @@ mod test {
assert!(!NAME_PATTERN_REG.is_match("#"));
}
#[test]
fn test_partition_expr_equivalence_with_swapped_operands() {
let column_name = "device_id".to_string();
let column_name_and_type =
HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
let timezone = Timezone::from_tz_string("UTC").unwrap();
let dialect = GreptimeDbDialect {};
let mut parser = Parser::new(&dialect)
.try_with_sql("device_id < 100")
.unwrap();
let expr_left = parser.parse_expr().unwrap();
let mut parser = Parser::new(&dialect)
.try_with_sql("100 > device_id")
.unwrap();
let expr_right = parser.parse_expr().unwrap();
let partition_left =
convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
let partition_right =
convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();
assert_eq!(partition_left, partition_right);
assert!([partition_left.clone()].contains(&partition_right));
let mut physical_partition_exprs = vec![partition_left];
let mut logical_partition_exprs = vec![partition_right];
physical_partition_exprs.sort_unstable();
logical_partition_exprs.sort_unstable();
assert_eq!(physical_partition_exprs, logical_partition_exprs);
}
#[tokio::test]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_parse_partitions() {

View File

@@ -185,6 +185,19 @@ impl RestrictedOp {
Self::Or => ParserBinaryOperator::Or,
}
}
fn invert_for_swap(&self) -> Self {
match self {
Self::Eq => Self::Eq,
Self::NotEq => Self::NotEq,
Self::Lt => Self::Gt,
Self::LtEq => Self::GtEq,
Self::Gt => Self::Lt,
Self::GtEq => Self::LtEq,
Self::And => Self::And,
Self::Or => Self::Or,
}
}
}
impl Display for RestrictedOp {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
@@ -208,6 +221,32 @@ impl PartitionExpr {
op,
rhs: Box::new(rhs),
}
.canonicalize()
}
/// Canonicalize to `Column op Value` form when possible for consistent equality checks.
pub fn canonicalize(self) -> Self {
let lhs = Self::canonicalize_operand(*self.lhs);
let rhs = Self::canonicalize_operand(*self.rhs);
let mut expr = Self {
lhs: Box::new(lhs),
op: self.op,
rhs: Box::new(rhs),
};
if matches!(&*expr.lhs, Operand::Value(_)) && matches!(&*expr.rhs, Operand::Column(_)) {
std::mem::swap(&mut expr.lhs, &mut expr.rhs);
expr.op = expr.op.invert_for_swap();
}
expr
}
fn canonicalize_operand(operand: Operand) -> Operand {
match operand {
Operand::Expr(expr) => Operand::Expr(expr.canonicalize()),
other => other,
}
}
/// Convert [Self] back to sqlparser's [Expr]
@@ -354,7 +393,7 @@ impl PartitionExpr {
let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
match bound {
PartitionBound::Expr(expr) => Ok(Some(expr)),
PartitionBound::Expr(expr) => Ok(Some(expr.canonicalize())),
_ => Ok(None),
}
}
@@ -494,7 +533,7 @@ mod tests {
.try_as_logical_expr()
.unwrap()
.to_string(),
"Int64(10) < a OR a IS NULL"
"a > Int64(10) OR a IS NULL"
);
// Test Gt with column on LHS
@@ -519,7 +558,7 @@ mod tests {
.try_as_logical_expr()
.unwrap()
.to_string(),
"Int64(10) > a OR a IS NULL"
"a < Int64(10) OR a IS NULL"
);
// Test GtEq with column on LHS

View File

@@ -95,8 +95,15 @@ impl FileRef {
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileRefsManifest {
pub file_refs: HashMap<RegionId, HashSet<FileRef>>,
/// Manifest version when this manifest is read for it's files
/// Manifest version when this manifest is read for its files
pub manifest_version: HashMap<RegionId, ManifestVersion>,
/// Cross-region file ownership mapping.
///
/// Key is the source/original region id (before repartition); value is the set of
/// target/destination region ids (after repartition) that currently hold files
/// originally coming from that source region.
///
pub cross_region_refs: HashMap<RegionId, HashSet<RegionId>>,
}
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -179,6 +186,8 @@ mod tests {
.insert(r1, [FileRef::new(r1, FileId::random(), None)].into());
manifest.manifest_version.insert(r0, 10);
manifest.manifest_version.insert(r1, 20);
manifest.cross_region_refs.insert(r0, [r1].into());
manifest.cross_region_refs.insert(r1, [r0].into());
let json = serde_json::to_string(&manifest).unwrap();
let parsed: FileRefsManifest = serde_json::from_str(&json).unwrap();

View File

@@ -6,6 +6,7 @@ license.workspace = true
[features]
dashboard = []
vector_index = []
[lints]
workspace = true

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::time::Duration;
use common_meta::key::TableMetadataManagerRef;
@@ -105,8 +105,10 @@ async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirG
#[tokio::test]
async fn test_gc_basic_different_store() {
let _ = dotenv::dotenv();
common_telemetry::init_default_ut_logging();
let store_type = StorageType::build_storage_types_based_on_env();
info!("store type: {:?}", store_type);
for store in store_type {
if store == StorageType::File {
continue; // no point in test gc in fs storage
@@ -190,17 +192,16 @@ async fn test_gc_basic(store_type: &StorageType) {
assert_eq!(sst_files_after_compaction.len(), 5); // 4 old + 1 new
// Step 5: Get table route information for GC procedure
let (region_routes, regions) =
let (_region_routes, regions) =
get_table_route(metasrv.table_metadata_manager(), table_id).await;
// Step 6: Create and execute BatchGcProcedure
let procedure = BatchGcProcedure::new(
metasrv.mailbox().clone(),
metasrv.table_metadata_manager().clone(),
metasrv.options().grpc.server_addr.clone(),
regions.clone(),
false, // full_file_listing
region_routes,
HashMap::new(), // related_regions (empty for this simple test)
false, // full_file_listing
Duration::from_secs(10), // timeout
);

View File

@@ -1379,6 +1379,19 @@ providers = []"#,
)
};
let vector_index_config = if cfg!(feature = "vector_index") {
r#"
[region_engine.mito.vector_index]
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
"#
} else {
"\n"
};
let expected_toml_str = format!(
r#"
enable_telemetry = true
@@ -1545,14 +1558,7 @@ create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
[region_engine.mito.vector_index]
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
[region_engine.mito.memtable]
{vector_index_config}[region_engine.mito.memtable]
type = "time_series"
[region_engine.mito.gc]