feat: implement manual type for async index build (#7104)

* feat: prepare for index_build command

Signed-off-by: SNC123 <sinhco@outlook.com>

* feat: impl manual index build

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: clippy and fmt

Signed-off-by: SNC123 <sinhco@outlook.com>

* test: add idempotency check for manual build

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: apply suggestions

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: update proto

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: apply suggestions

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: fmt

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: update proto source to greptimedb

Signed-off-by: SNC123 <sinhco@outlook.com>

* fix: cargo.lock

Signed-off-by: SNC123 <sinhco@outlook.com>

---------

Signed-off-by: SNC123 <sinhco@outlook.com>
Author: Sicong Hu
Date: 2025-11-25 23:21:30 +08:00 (committed by GitHub)
Parent: 6b6d1ce7c4
Commit: 2783a5218e
28 changed files with 748 additions and 27 deletions

View File

@@ -161,6 +161,7 @@ impl ObjbenchCommand {
level: 0,
file_size,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows,

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod build_index_table;
mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;
@@ -26,6 +27,7 @@ use reconcile_catalog::ReconcileCatalogFunction;
use reconcile_database::ReconcileDatabaseFunction;
use reconcile_table::ReconcileTableFunction;
use crate::admin::build_index_table::BuildIndexFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
@@ -40,6 +42,7 @@ impl AdminFunction {
registry.register(CompactRegionFunction::factory());
registry.register(FlushTableFunction::factory());
registry.register(CompactTableFunction::factory());
registry.register(BuildIndexFunction::factory());
registry.register(FlushFlowFunction::factory());
registry.register(ReconcileCatalogFunction::factory());
registry.register(ReconcileDatabaseFunction::factory());

View File

@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::datatypes::DataType as ArrowDataType;
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
UnsupportedInputDataTypeSnafu,
};
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::*;
use session::context::QueryContextRef;
use session::table_name::table_name_to_full_name;
use snafu::{ResultExt, ensure};
use table::requests::BuildIndexTableRequest;
use crate::handlers::TableMutationHandlerRef;
#[admin_fn(
name = BuildIndexFunction,
display_name = build_index,
sig_fn = build_index_signature,
ret = uint64
)]
pub(crate) async fn build_index(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);
let ValueRef::String(table_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "build_index",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;
let affected_rows = table_mutation_handler
.build_index(
BuildIndexTableRequest {
catalog_name,
schema_name,
table_name,
},
query_ctx.clone(),
)
.await?;
Ok(Value::from(affected_rows as u64))
}
fn build_index_signature() -> Signature {
Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
}
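
Assuming this function is exposed the same way as the existing flush/compact admin functions it is registered alongside, it would be invoked through GreptimeDB's ADMIN statement, e.g. `ADMIN build_index('my_table')`, where the single Utf8 argument is the table name resolved against the session's catalog and schema ('my_table' here is illustrative).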

View File

@@ -25,7 +25,9 @@ use common_query::Output;
use common_query::error::Result;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
use table::requests::{
BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};
/// A trait for handling table mutations in `QueryEngine`.
#[async_trait]
@@ -47,6 +49,13 @@ pub trait TableMutationHandler: Send + Sync {
ctx: QueryContextRef,
) -> Result<AffectedRows>;
/// Trigger an index build task for the table.
async fn build_index(
&self,
request: BuildIndexTableRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
/// Trigger a flush task for a table region.
async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
-> Result<AffectedRows>;

View File

@@ -44,7 +44,8 @@ impl FunctionState {
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest,
InsertRequest,
};
use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
@@ -120,6 +121,14 @@ impl FunctionState {
Ok(ROWS)
}
async fn build_index(
&self,
_request: BuildIndexTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn flush_region(
&self,
_region_id: RegionId,

View File

@@ -431,6 +431,10 @@ impl ColumnSchema {
Ok(())
}
pub fn is_indexed(&self) -> bool {
self.is_inverted_indexed() || self.is_fulltext_indexed() || self.is_skipping_indexed()
}
}
/// Column extended type set in column schema's metadata.

View File

@@ -397,6 +397,7 @@ impl DefaultCompactor {
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
num_rows: sst_info.num_rows as u64,

View File

@@ -75,6 +75,7 @@ pub fn new_file_handle_with_size_and_sequence(
level,
file_size,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,

View File

@@ -19,7 +19,9 @@ use std::sync::Arc;
use api::v1::Rows;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AlterKind, RegionAlterRequest, RegionRequest, SetIndexOption};
use store_api::region_request::{
AlterKind, RegionAlterRequest, RegionBuildIndexRequest, RegionRequest, SetIndexOption,
};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::{IndexBuildMode, MitoConfig, Mode};
@@ -71,11 +73,9 @@ async fn num_of_index_files(engine: &MitoEngine, scanner: &Scanner, region_id: R
index_files_count
}
#[allow(dead_code)]
fn assert_listener_counts(
listener: &IndexBuildListener,
expected_begin_count: usize,
expected_success_count: usize,
) {
assert_eq!(listener.begin_count(), expected_begin_count);
@@ -299,3 +299,171 @@ async fn test_index_build_type_schema_change() {
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
}
#[tokio::test]
async fn test_index_build_type_manual_basic() {
let mut env = TestEnv::with_prefix("test_index_build_type_manual_").await;
let listener = Arc::new(IndexBuildListener::default());
let engine = env
.create_engine_with(
async_build_mode_config(false), // Disable index file creation on flush.
None,
Some(listener.clone()),
None,
)
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
// Create a region with index.
let request = CreateRequestBuilder::new().build_with_index();
let table_dir = request.table_dir.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush and make sure there is no index file (because create_on_flush is disabled).
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
reopen_region(&engine, region_id, table_dir.clone(), true, HashMap::new()).await;
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
// Index build task is triggered on flush, but not finished.
assert_listener_counts(&listener, 1, 0);
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 0);
// Trigger manual index build task and make sure index file is built without flush or compaction.
let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
engine.handle_request(region_id, request).await.unwrap();
listener.wait_finish(1).await;
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_listener_counts(&listener, 2, 1);
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
// Test idempotency: Second manual index build request on the same file.
let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
engine.handle_request(region_id, request).await.unwrap();
reopen_region(&engine, region_id, table_dir.clone(), true, HashMap::new()).await;
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
// Should still be 2 begin and 1 finish - no new task should be created for already indexed file.
assert_listener_counts(&listener, 2, 1);
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
// Test idempotency again: Third manual index build request to further verify.
let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
engine.handle_request(region_id, request).await.unwrap();
reopen_region(&engine, region_id, table_dir.clone(), true, HashMap::new()).await;
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_listener_counts(&listener, 2, 1);
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
}
#[tokio::test]
async fn test_index_build_type_manual_consistency() {
let mut env = TestEnv::with_prefix("test_index_build_type_manual_consistency_").await;
let listener = Arc::new(IndexBuildListener::default());
let engine = env
.create_engine_with(
async_build_mode_config(true),
None,
Some(listener.clone()),
None,
)
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
// Create a region with index.
let create_request = CreateRequestBuilder::new().build_with_index();
let table_dir = create_request.table_dir.clone();
let column_schemas = rows_schema(&create_request);
engine
.handle_request(region_id, RegionRequest::Create(create_request.clone()))
.await
.unwrap();
assert_listener_counts(&listener, 0, 0);
// Flush and make sure index file exists.
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
listener.wait_finish(1).await;
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_listener_counts(&listener, 1, 1);
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
// Check index build task for consistent file will be skipped.
let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
engine.handle_request(region_id, request).await.unwrap();
// Reopen the region to ensure the task wasn't skipped due to insufficient time.
reopen_region(&engine, region_id, table_dir.clone(), true, HashMap::new()).await;
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
// Because the file is consistent, no new index build task is triggered.
assert_listener_counts(&listener, 1, 1);
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
let mut altered_metadata = create_request.column_metadatas.clone();
// Set index for field_0.
altered_metadata[1].column_schema.set_inverted_index(true);
let sync_columns_request = RegionAlterRequest {
kind: AlterKind::SyncColumns {
column_metadatas: altered_metadata,
},
};
// Use SyncColumns to avoid triggering SchemaChange index build.
engine
.handle_request(region_id, RegionRequest::Alter(sync_columns_request))
.await
.unwrap();
reopen_region(&engine, region_id, table_dir, true, HashMap::new()).await;
// SyncColumns won't trigger index build.
assert_listener_counts(&listener, 1, 1);
let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
engine.handle_request(region_id, request).await.unwrap();
listener.wait_finish(2).await; // previous 1 + new 1
// Because the file is inconsistent, new index build task is triggered.
assert_listener_counts(&listener, 2, 2);
}

View File

@@ -637,6 +637,7 @@ impl RegionFlushTask {
level: 0,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
num_rows: sst_info.num_rows as u64,

View File

@@ -264,6 +264,7 @@ async fn checkpoint_with_different_compression_types() {
level: 0,
file_size: 1024000,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,
@@ -331,6 +332,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
level: 0,
file_size: 1024000,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,

View File

@@ -426,6 +426,7 @@ mod tests {
level: 0,
file_size: 1024,
available_indexes: SmallVec::new(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 100,

View File

@@ -26,8 +26,9 @@ use common_time::Timestamp;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};
use store_api::storage::{ColumnId, FileId, RegionId};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
@@ -79,6 +80,8 @@ where
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;
/// Type to store index types for a column.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
/// Cross-region file id.
///
@@ -143,7 +146,17 @@ pub struct FileMeta {
/// Size of the file.
pub file_size: u64,
/// Available indexes of the file.
pub available_indexes: SmallVec<[IndexType; 4]>,
pub available_indexes: IndexTypes,
/// Created indexes of the file for each column.
///
/// This is essentially a more granular, column-level version of `available_indexes`,
/// primarily used for manual index building in the asynchronous index construction mode.
///
/// For backward compatibility, older `FileMeta` versions might only contain `available_indexes`.
/// In such cases, we cannot deduce specific column index information from `available_indexes` alone.
/// Therefore, defaulting this `indexes` field to an empty list during deserialization is a
/// reasonable and necessary step to ensure column information consistency.
pub indexes: Vec<ColumnIndexMetadata>,
/// Size of the index file.
pub index_file_size: u64,
/// File ID of the index file.
@@ -206,6 +219,7 @@ impl Debug for FileMeta {
if !self.available_indexes.is_empty() {
debug_struct
.field("available_indexes", &self.available_indexes)
.field("indexes", &self.indexes)
.field("index_file_size", &ReadableSize(self.index_file_size));
}
debug_struct
@@ -236,6 +250,24 @@ pub enum IndexType {
BloomFilterIndex,
}
/// Metadata of indexes created for a specific column in an SST file.
///
/// This structure tracks which index types have been successfully created for a column.
/// It provides more granular, column-level index information compared to the file-level
/// `available_indexes` field in [`FileMeta`].
///
/// This is primarily used for:
/// - Manual index building in asynchronous index construction mode
/// - Verifying index consistency between files and region metadata
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ColumnIndexMetadata {
/// The column ID this index metadata applies to.
pub column_id: ColumnId,
/// List of index types that have been successfully created for this column.
pub created_indexes: IndexTypes,
}
impl FileMeta {
pub fn exists_index(&self) -> bool {
!self.available_indexes.is_empty()
@@ -261,6 +293,40 @@ impl FileMeta {
self.index_file_size
}
/// Check whether the file index is consistent with the given region metadata.
pub fn is_index_consistent_with_region(&self, metadata: &[ColumnMetadata]) -> bool {
let id_to_indexes = self
.indexes
.iter()
.map(|index| (index.column_id, index.created_indexes.clone()))
.collect::<std::collections::HashMap<_, _>>();
for column in metadata {
if !column.column_schema.is_indexed() {
continue;
}
if let Some(indexes) = id_to_indexes.get(&column.column_id) {
if column.column_schema.is_inverted_indexed()
&& !indexes.contains(&IndexType::InvertedIndex)
{
return false;
}
if column.column_schema.is_fulltext_indexed()
&& !indexes.contains(&IndexType::FulltextIndex)
{
return false;
}
if column.column_schema.is_skipping_indexed()
&& !indexes.contains(&IndexType::BloomFilterIndex)
{
return false;
}
} else {
return false;
}
}
true
}
/// Returns the cross-region file id.
pub fn file_id(&self) -> RegionFileId {
RegionFileId::new(self.region_id, self.file_id)
@@ -475,6 +541,10 @@ pub async fn delete_files(
mod tests {
use std::str::FromStr;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{
ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions,
};
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
@@ -488,6 +558,10 @@ mod tests {
level,
file_size: 0,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 0,
index_file_id: None,
num_rows: 0,
@@ -510,7 +584,7 @@ mod tests {
fn test_deserialize_from_string() {
let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
\"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
\"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
\"available_indexes\":[\"InvertedIndex\"],\"indexes\":[{\"column_id\": 0, \"created_indexes\": [\"InvertedIndex\"]}],\"level\":0}";
let file_meta = create_file_meta(
FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
0,
@@ -535,6 +609,10 @@ mod tests {
level: 0,
file_size: 0,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 0,
index_file_id: None,
num_rows: 0,
@@ -653,4 +731,147 @@ mod tests {
let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
assert!(file_meta_empty.partition_expr.is_none());
}
#[test]
fn test_file_meta_indexes_backward_compatibility() {
// Old FileMeta format without the 'indexes' field
let json_old_file_meta = r#"{
"region_id": 0,
"file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
"time_range": [
{"value": 0, "unit": "Millisecond"},
{"value": 0, "unit": "Millisecond"}
],
"available_indexes": ["InvertedIndex"],
"level": 0,
"file_size": 0,
"index_file_size": 0,
"num_rows": 0,
"num_row_groups": 0
}"#;
let deserialized_file_meta: FileMeta = serde_json::from_str(json_old_file_meta).unwrap();
// Verify backward compatibility: indexes field should default to empty vec
assert_eq!(deserialized_file_meta.indexes, vec![]);
let expected_indexes: IndexTypes = SmallVec::from_iter([IndexType::InvertedIndex]);
assert_eq!(deserialized_file_meta.available_indexes, expected_indexes);
assert_eq!(
deserialized_file_meta.file_id,
FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap()
);
}
#[test]
fn test_is_index_consistent_with_region() {
fn new_column_meta(
id: ColumnId,
name: &str,
inverted: bool,
fulltext: bool,
skipping: bool,
) -> ColumnMetadata {
let mut column_schema =
ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
if inverted {
column_schema = column_schema.with_inverted_index(true);
}
if fulltext {
column_schema = column_schema
.with_fulltext_options(FulltextOptions::new_unchecked(
true,
FulltextAnalyzer::English,
false,
FulltextBackend::Bloom,
1000,
0.01,
))
.unwrap();
}
if skipping {
column_schema = column_schema
.with_skipping_options(SkippingIndexOptions::new_unchecked(
1024,
0.01,
datatypes::schema::SkippingIndexType::BloomFilter,
))
.unwrap();
}
ColumnMetadata {
column_schema,
semantic_type: api::v1::SemanticType::Tag,
column_id: id,
}
}
// Case 1: Perfect match. File has exactly the required indexes.
let mut file_meta = FileMeta {
indexes: vec![ColumnIndexMetadata {
column_id: 1,
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
..Default::default()
};
let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
assert!(file_meta.is_index_consistent_with_region(&region_meta));
// Case 2: Superset match. File has more indexes than required.
file_meta.indexes = vec![ColumnIndexMetadata {
column_id: 1,
created_indexes: SmallVec::from_iter([
IndexType::InvertedIndex,
IndexType::BloomFilterIndex,
]),
}];
let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
assert!(file_meta.is_index_consistent_with_region(&region_meta));
// Case 3: Missing index type. File has the column but lacks the required index type.
file_meta.indexes = vec![ColumnIndexMetadata {
column_id: 1,
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}];
let region_meta = vec![new_column_meta(1, "tag1", true, true, false)]; // Requires fulltext too
assert!(!file_meta.is_index_consistent_with_region(&region_meta));
// Case 4: Missing column. Region requires an index on a column not in the file's index list.
file_meta.indexes = vec![ColumnIndexMetadata {
column_id: 2, // File only has index for column 2
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}];
let region_meta = vec![new_column_meta(1, "tag1", true, false, false)]; // Requires index on column 1
assert!(!file_meta.is_index_consistent_with_region(&region_meta));
// Case 5: No indexes required by region. Should always be consistent.
file_meta.indexes = vec![ColumnIndexMetadata {
column_id: 1,
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}];
let region_meta = vec![new_column_meta(1, "tag1", false, false, false)]; // No index required
assert!(file_meta.is_index_consistent_with_region(&region_meta));
// Case 6: Empty file indexes. Region requires an index.
file_meta.indexes = vec![];
let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
assert!(!file_meta.is_index_consistent_with_region(&region_meta));
// Case 7: Multiple columns, one is inconsistent.
file_meta.indexes = vec![
ColumnIndexMetadata {
column_id: 1,
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
},
ColumnIndexMetadata {
column_id: 2, // Column 2 is missing the required BloomFilterIndex
created_indexes: SmallVec::from_iter([IndexType::FulltextIndex]),
},
];
let region_meta = vec![
new_column_meta(1, "tag1", true, false, false),
new_column_meta(2, "tag2", false, true, true), // Requires Fulltext and BloomFilter
];
assert!(!file_meta.is_index_consistent_with_region(&region_meta));
}
}

View File

@@ -185,7 +185,9 @@ mod tests {
use super::*;
use crate::access_layer::AccessLayer;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId};
use crate::sst::file::{
ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
@@ -233,6 +235,7 @@ mod tests {
level: 0,
file_size: 4096,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,
@@ -301,6 +304,10 @@ mod tests {
level: 0,
file_size: 4096,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 4096,
index_file_id: None,
num_rows: 1024,

View File

@@ -208,7 +208,7 @@ mod tests {
use store_api::storage::{FileId, RegionId};
use super::*;
use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};
use crate::sst::file::{ColumnIndexMetadata, FileMeta, FileTimeRange, IndexType, RegionFileId};
#[tokio::test]
async fn test_file_ref_mgr() {
@@ -225,6 +225,10 @@ mod tests {
level: 0,
file_size: 4096,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 4096,
index_file_id: None,
num_rows: 1024,

View File

@@ -60,7 +60,9 @@ use crate::request::{
WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
use crate::sst::file::{
ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId,
};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
use crate::sst::index::intermediate::IntermediateManager;
@@ -101,6 +103,35 @@ impl IndexOutput {
}
indexes
}
pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
if self.inverted_index.is_available() {
for &col in &self.inverted_index.columns {
map.entry(col).or_default().push(IndexType::InvertedIndex);
}
}
if self.fulltext_index.is_available() {
for &col in &self.fulltext_index.columns {
map.entry(col).or_default().push(IndexType::FulltextIndex);
}
}
if self.bloom_filter.is_available() {
for &col in &self.bloom_filter.columns {
map.entry(col)
.or_default()
.push(IndexType::BloomFilterIndex);
}
}
map.into_iter()
.map(|(column_id, created_indexes)| ColumnIndexMetadata {
column_id,
created_indexes,
})
.collect::<Vec<_>>()
}
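// Illustrative note (not part of the diff): for an SST whose inverted
// index covers columns {1, 2} and whose bloom filter covers column 2,
// build_indexes() would yield, in no guaranteed order (it drains a HashMap):
//   [ColumnIndexMetadata { column_id: 1, created_indexes: [InvertedIndex] },
//    ColumnIndexMetadata { column_id: 2, created_indexes: [InvertedIndex, BloomFilterIndex] }]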
}
/// Base output of the index creation.
@@ -416,7 +447,7 @@ impl IndexerBuilderImpl {
}
/// Type of an index build task.
#[derive(Debug, Clone, IntoStaticStr)]
#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
pub enum IndexBuildType {
/// Build index when schema change.
SchemaChange,
@@ -728,6 +759,7 @@ impl IndexBuildTask {
index_file_id: FileId,
) -> Result<RegionEdit> {
self.file_meta.available_indexes = output.build_available_indexes();
self.file_meta.indexes = output.build_indexes();
self.file_meta.index_file_size = output.file_size;
self.file_meta.index_file_id = Some(index_file_id);
let edit = RegionEdit {

View File

@@ -767,6 +767,7 @@ mod tests {
level: 0,
file_size: info.file_size,
available_indexes: info.index_metadata.build_available_indexes(),
indexes: info.index_metadata.build_indexes(),
index_file_size: info.index_metadata.file_size,
index_file_id: None,
num_row_groups: info.num_row_groups,

View File

@@ -124,6 +124,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
level: 0,
file_size: 0,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,

View File

@@ -102,6 +102,7 @@ impl VersionControlBuilder {
level: 0,
file_size: 0, // We don't care file size.
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,
@@ -192,6 +193,7 @@ pub(crate) fn apply_edit(
level: 0,
file_size: 0, // We don't care file size.
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,

View File

@@ -85,7 +85,6 @@ impl<S> RegionWorkerLoop<S> {
}
/// Handles manual build index requests.
/// TODO(SNC123): Support admin function of manual index building later.
pub(crate) async fn handle_build_index_request(
&mut self,
region_id: RegionId,
@@ -126,10 +125,16 @@ impl<S> RegionWorkerLoop<S> {
.collect();
let build_tasks = if request.file_metas.is_empty() {
// NOTE: Currently, rebuilding the index will reconstruct the index for all
// files in the region, which is a simplified approach and is not yet available for
// production use; further optimization is required.
all_files.values().cloned().collect::<Vec<_>>()
// If no specific files are provided, find files whose index is inconsistent with the region metadata.
all_files
.values()
.filter(|file| {
!file
.meta_ref()
.is_index_consistent_with_region(&version.metadata.column_metadatas)
})
.cloned()
.collect::<Vec<_>>()
} else {
request
.file_metas

View File

@@ -15,19 +15,19 @@
use std::sync::Arc;
use api::v1::region::region_request::Body as RegionRequestBody;
use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader};
use api::v1::region::{BuildIndexRequest, CompactRequest, FlushRequest, RegionRequestHeader};
use catalog::CatalogManagerRef;
use common_catalog::build_db_string;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info};
use common_telemetry::{debug, error, info};
use futures_util::future;
use partition::manager::{PartitionInfo, PartitionRuleManagerRef};
use session::context::QueryContextRef;
use snafu::prelude::*;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, FlushTableRequest};
use table::requests::{BuildIndexTableRequest, CompactTableRequest, FlushTableRequest};
use crate::error::{
CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu,
@@ -90,6 +90,43 @@ impl Requester {
.await
}
/// Handle the request to build index for table.
pub async fn handle_table_build_index(
&self,
request: BuildIndexTableRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
let partitions = self
.get_table_partitions(
&request.catalog_name,
&request.schema_name,
&request.table_name,
)
.await?;
let requests = partitions
.into_iter()
.map(|partition| {
RegionRequestBody::BuildIndex(BuildIndexRequest {
region_id: partition.id.into(),
})
})
.collect();
info!(
"Handle table manual build index for table {}",
request.table_name
);
debug!("Request details: {:?}", request);
self.do_request(
requests,
Some(build_db_string(&request.catalog_name, &request.schema_name)),
&ctx,
)
.await
}
/// Handle the request to compact table.
pub async fn handle_table_compaction(
&self,
@@ -201,6 +238,7 @@ impl Requester {
let region_id = match req {
RegionRequestBody::Flush(req) => req.region_id,
RegionRequestBody::Compact(req) => req.region_id,
RegionRequestBody::BuildIndex(req) => req.region_id,
_ => {
error!("Unsupported region request: {:?}", req);
return UnsupportedRegionRequestSnafu {}.fail();

View File

@@ -23,8 +23,8 @@ use session::context::QueryContextRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::requests::{
CompactTableRequest, DeleteRequest as TableDeleteRequest, FlushTableRequest,
InsertRequest as TableInsertRequest,
BuildIndexTableRequest, CompactTableRequest, DeleteRequest as TableDeleteRequest,
FlushTableRequest, InsertRequest as TableInsertRequest,
};
use crate::delete::DeleterRef;
@@ -97,6 +97,18 @@ impl TableMutationHandler for TableMutationOperator {
.context(query_error::TableMutationSnafu)
}
async fn build_index(
&self,
request: BuildIndexTableRequest,
ctx: QueryContextRef,
) -> QueryResult<AffectedRows> {
self.requester
.handle_table_build_index(request, ctx)
.await
.map_err(BoxedError::new)
.context(query_error::TableMutationSnafu)
}
async fn flush_region(
&self,
region_id: RegionId,

View File

@@ -22,9 +22,10 @@ use api::v1::column_def::{
};
use api::v1::region::bulk_insert_request::Body;
use api::v1::region::{
AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
region_request, truncate_request,
};
use api::v1::{
self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
@@ -166,6 +167,7 @@ impl RegionRequest {
region_request::Body::Alter(alter) => make_region_alter(alter),
region_request::Body::Flush(flush) => make_region_flush(flush),
region_request::Body::Compact(compact) => make_region_compact(compact),
region_request::Body::BuildIndex(index) => make_region_build_index(index),
region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
region_request::Body::Creates(creates) => make_region_creates(creates),
region_request::Body::Drops(drops) => make_region_drops(drops),
@@ -354,6 +356,14 @@ fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionR
)])
}
fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = index.region_id.into();
Ok(vec![(
region_id,
RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
)])
}
fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = truncate.region_id.into();
match truncate.kind {

View File

@@ -395,6 +395,13 @@ pub struct FlushTableRequest {
pub table_name: String,
}
#[derive(Debug, Clone, Default)]
pub struct BuildIndexTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
}
#[derive(Debug, Clone, PartialEq)]
pub struct CompactTableRequest {
pub catalog_name: String,
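
Putting the pieces together, here is a minimal sketch (not from the diff; the crate path of `TableMutationHandlerRef` and the catalog/schema/table names are assumptions) of driving the new table-level path through the `TableMutationHandler` trait extended above:

use common_query::error::Result;
use session::context::QueryContextRef;
use table::requests::BuildIndexTableRequest;

// `TableMutationHandlerRef` comes from the handlers module shown earlier;
// the exact import path is an assumption.
use crate::handlers::TableMutationHandlerRef;

async fn trigger_build_index(
    handler: &TableMutationHandlerRef,
    ctx: QueryContextRef,
) -> Result<()> {
    let request = BuildIndexTableRequest {
        catalog_name: "greptime".to_string(), // illustrative names
        schema_name: "public".to_string(),
        table_name: "monitor".to_string(),
    };
    // Fans out one BuildIndexRequest per region of the table
    // (see Requester::handle_table_build_index in this diff).
    let _affected_rows = handler.build_index(request, ctx).await?;
    Ok(())
}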