diff --git a/Cargo.lock b/Cargo.lock
index b70498a449..6e3980e8fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5351,7 +5351,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=437e6a1ef8139e3946dcc043ea22c7fe6877019d#437e6a1ef8139e3946dcc043ea22c7fe6877019d"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0df99f09f1d6785055b2d9da96fc4ecc2bdf6803#0df99f09f1d6785055b2d9da96fc4ecc2bdf6803"
 dependencies = [
  "prost 0.13.5",
  "prost-types 0.13.5",
diff --git a/Cargo.toml b/Cargo.toml
index e97e3360ba..18f61b2fce 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -148,7 +148,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "437e6a1ef8139e3946dcc043ea22c7fe6877019d" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0df99f09f1d6785055b2d9da96fc4ecc2bdf6803" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs
index dffb971072..a8ff8b4daf 100644
--- a/src/cmd/src/datanode/objbench.rs
+++ b/src/cmd/src/datanode/objbench.rs
@@ -161,6 +161,7 @@ impl ObjbenchCommand {
             level: 0,
             file_size,
             available_indexes: Default::default(),
+            indexes: Default::default(),
             index_file_size: 0,
             index_file_id: None,
             num_rows,
diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs
index 11270c3282..e7fd186b86 100644
--- a/src/common/function/src/admin.rs
+++ b/src/common/function/src/admin.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod build_index_table;
 mod flush_compact_region;
 mod flush_compact_table;
 mod migrate_region;
@@ -26,6 +27,7 @@ use reconcile_catalog::ReconcileCatalogFunction;
 use reconcile_database::ReconcileDatabaseFunction;
 use reconcile_table::ReconcileTableFunction;
 
+use crate::admin::build_index_table::BuildIndexFunction;
 use crate::flush_flow::FlushFlowFunction;
 use crate::function_registry::FunctionRegistry;
 
@@ -40,6 +42,7 @@ impl AdminFunction {
         registry.register(CompactRegionFunction::factory());
         registry.register(FlushTableFunction::factory());
         registry.register(CompactTableFunction::factory());
+        registry.register(BuildIndexFunction::factory());
         registry.register(FlushFlowFunction::factory());
         registry.register(ReconcileCatalogFunction::factory());
         registry.register(ReconcileDatabaseFunction::factory());
diff --git a/src/common/function/src/admin/build_index_table.rs b/src/common/function/src/admin/build_index_table.rs
new file mode 100644
index 0000000000..155f198c79
--- /dev/null
+++ b/src/common/function/src/admin/build_index_table.rs
@@ -0,0 +1,86 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
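+
+// `ADMIN BUILD_INDEX(<table_name>)` triggers an index build task for every
+// region of the given table. This is mainly useful in the asynchronous index
+// build mode, where an SST file may be flushed before its index file exists.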
+
+use arrow::datatypes::DataType as ArrowDataType;
+use common_error::ext::BoxedError;
+use common_macro::admin_fn;
+use common_query::error::{
+    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
+    UnsupportedInputDataTypeSnafu,
+};
+use datafusion_expr::{Signature, Volatility};
+use datatypes::prelude::*;
+use session::context::QueryContextRef;
+use session::table_name::table_name_to_full_name;
+use snafu::{ResultExt, ensure};
+use table::requests::BuildIndexTableRequest;
+
+use crate::handlers::TableMutationHandlerRef;
+
+#[admin_fn(
+    name = BuildIndexFunction,
+    display_name = build_index,
+    sig_fn = build_index_signature,
+    ret = uint64
+)]
+pub(crate) async fn build_index(
+    table_mutation_handler: &TableMutationHandlerRef,
+    query_ctx: &QueryContextRef,
+    params: &[ValueRef<'_>],
+) -> Result<Value> {
+    ensure!(
+        params.len() == 1,
+        InvalidFuncArgsSnafu {
+            err_msg: format!(
+                "The length of the args is not correct, expect 1, have: {}",
+                params.len()
+            ),
+        }
+    );
+
+    let ValueRef::String(table_name) = params[0] else {
+        return UnsupportedInputDataTypeSnafu {
+            function: "build_index",
+            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
+        }
+        .fail();
+    };
+
+    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
+        .map_err(BoxedError::new)
+        .context(TableMutationSnafu)?;
+
+    let affected_rows = table_mutation_handler
+        .build_index(
+            BuildIndexTableRequest {
+                catalog_name,
+                schema_name,
+                table_name,
+            },
+            query_ctx.clone(),
+        )
+        .await?;
+
+    Ok(Value::from(affected_rows as u64))
+}
+
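+/// One argument: the table name as a UTF-8 string, optionally qualified;
+/// it is resolved against the query context via `table_name_to_full_name`.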
+fn build_index_signature() -> Signature {
+    Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
+}
diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs
index e7ab67e312..0e6060e90c 100644
--- a/src/common/function/src/handlers.rs
+++ b/src/common/function/src/handlers.rs
@@ -25,7 +25,9 @@ use common_query::Output;
 use common_query::error::Result;
 use session::context::QueryContextRef;
 use store_api::storage::RegionId;
-use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
+use table::requests::{
+    BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
+};
 
 /// A trait for handling table mutations in `QueryEngine`.
 #[async_trait]
@@ -47,6 +49,13 @@ pub trait TableMutationHandler: Send + Sync {
         ctx: QueryContextRef,
     ) -> Result<AffectedRows>;
 
+    /// Trigger an index build task for the table.
+    async fn build_index(
+        &self,
+        request: BuildIndexTableRequest,
+        ctx: QueryContextRef,
+    ) -> Result<AffectedRows>;
+
     /// Trigger a flush task for a table region.
     async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef) -> Result<AffectedRows>;
diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs
index f90479b923..d1a3d341b4 100644
--- a/src/common/function/src/state.rs
+++ b/src/common/function/src/state.rs
@@ -44,7 +44,8 @@ impl FunctionState {
         use session::context::QueryContextRef;
         use store_api::storage::RegionId;
         use table::requests::{
-            CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
+            BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest,
+            InsertRequest,
         };
 
         use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
@@ -120,6 +121,14 @@ impl FunctionState {
                 Ok(ROWS)
             }
 
+            async fn build_index(
+                &self,
+                _request: BuildIndexTableRequest,
+                _ctx: QueryContextRef,
+            ) -> Result<AffectedRows> {
+                Ok(ROWS)
+            }
+
             async fn flush_region(
                 &self,
                 _region_id: RegionId,
diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs
index 2ba7beb701..9272ba4b21 100644
--- a/src/datatypes/src/schema/column_schema.rs
+++ b/src/datatypes/src/schema/column_schema.rs
@@ -431,6 +431,11 @@ impl ColumnSchema {
 
         Ok(())
     }
+
+    /// Returns whether any index (inverted, fulltext, or skipping) is set for the column.
+    pub fn is_indexed(&self) -> bool {
+        self.is_inverted_indexed() || self.is_fulltext_indexed() || self.is_skipping_indexed()
+    }
 }
 
 /// Column extended type set in column schema's metadata.
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index 3c84a9ec92..3705132e41 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -397,6 +397,7 @@ impl DefaultCompactor {
                     level: output.output_level,
                     file_size: sst_info.file_size,
                     available_indexes: sst_info.index_metadata.build_available_indexes(),
+                    indexes: sst_info.index_metadata.build_indexes(),
                     index_file_size: sst_info.index_metadata.file_size,
                     index_file_id: None,
                     num_rows: sst_info.num_rows as u64,
diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs
index 781b905349..90960b9841 100644
--- a/src/mito2/src/compaction/test_util.rs
+++ b/src/mito2/src/compaction/test_util.rs
@@ -75,6 +75,7 @@ pub fn new_file_handle_with_size_and_sequence(
         level,
         file_size,
         available_indexes: Default::default(),
+        indexes: Default::default(),
         index_file_size: 0,
         index_file_id: None,
         num_rows: 0,
diff --git a/src/mito2/src/engine/index_build_test.rs b/src/mito2/src/engine/index_build_test.rs
index 9b71aa2bb3..7c5ec19c7f 100644
--- a/src/mito2/src/engine/index_build_test.rs
+++ b/src/mito2/src/engine/index_build_test.rs
@@ -19,7 +19,9 @@ use std::sync::Arc;
 
 use api::v1::Rows;
 use store_api::region_engine::RegionEngine;
-use store_api::region_request::{AlterKind, RegionAlterRequest, RegionRequest, SetIndexOption};
+use store_api::region_request::{
+    AlterKind, RegionAlterRequest, RegionBuildIndexRequest, RegionRequest, SetIndexOption,
+};
 use store_api::storage::{RegionId, ScanRequest};
 
 use crate::config::{IndexBuildMode, MitoConfig, Mode};
@@ -71,11 +73,9 @@ async fn num_of_index_files(engine: &MitoEngine, scanner: &Scanner, region_id: R
     index_files_count
 }
 
-#[allow(dead_code)]
 fn assert_listener_counts(
     listener: &IndexBuildListener,
     expected_begin_count: usize,
-
     expected_success_count: usize,
 ) {
     assert_eq!(listener.begin_count(), expected_begin_count);
@@ -299,3 +299,171 @@ async fn test_index_build_type_schema_change() {
     assert_eq!(scanner.num_files(), 1);
     assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
 }
+
+#[tokio::test]
+async fn test_index_build_type_manual_basic() {
+    let mut env = TestEnv::with_prefix("test_index_build_type_manual_").await;
+    let listener = Arc::new(IndexBuildListener::default());
+    let engine = env
+        .create_engine_with(
+            async_build_mode_config(false), // Disable index file creation on flush.
+            None,
+            Some(listener.clone()),
+            None,
+        )
+        .await;
+
+    let region_id = RegionId::new(1, 1);
+    env.get_schema_metadata_manager()
+        .register_region_table_info(
+            region_id.table_id(),
+            "test_table",
+            "test_catalog",
+            "test_schema",
+            None,
+            env.get_kv_backend(),
+        )
+        .await;
+
+    // Create a region with index.
+    let request = CreateRequestBuilder::new().build_with_index();
+    let table_dir = request.table_dir.clone();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    // Flush and make sure there is no index file (because create_on_flush is disabled).
+    put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
+    reopen_region(&engine, region_id, table_dir.clone(), true, HashMap::new()).await;
+    let scanner = engine
+        .scanner(region_id, ScanRequest::default())
+        .await
+        .unwrap();
+    // The index build task is triggered on flush but has not finished yet.
+    assert_listener_counts(&listener, 1, 0);
+    assert_eq!(scanner.num_files(), 1);
+    assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 0);
+
+    // Trigger a manual index build task and make sure the index file is built without flush or compaction.
+    let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
+    engine.handle_request(region_id, request).await.unwrap();
+    listener.wait_finish(1).await;
+    let scanner = engine
+        .scanner(region_id, ScanRequest::default())
+        .await
+        .unwrap();
+    assert_listener_counts(&listener, 2, 1);
+    assert_eq!(scanner.num_files(), 1);
+    assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
+
+    // Test idempotency: a second manual index build request on the same file.
+    let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
+    engine.handle_request(region_id, request).await.unwrap();
+    reopen_region(&engine, region_id, table_dir.clone(), true, HashMap::new()).await;
+    let scanner = engine
+        .scanner(region_id, ScanRequest::default())
+        .await
+        .unwrap();
+    // Still 2 begins and 1 finish: no new task is created for an already indexed file.
+    assert_listener_counts(&listener, 2, 1);
+    assert_eq!(scanner.num_files(), 1);
+    assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
+
+    // Test idempotency again: a third manual index build request further verifies this.
+    let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
+    engine.handle_request(region_id, request).await.unwrap();
+    reopen_region(&engine, region_id, table_dir.clone(), true, HashMap::new()).await;
+    let scanner = engine
+        .scanner(region_id, ScanRequest::default())
+        .await
+        .unwrap();
+    assert_listener_counts(&listener, 2, 1);
+    assert_eq!(scanner.num_files(), 1);
+    assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
+}
+
+#[tokio::test]
+async fn test_index_build_type_manual_consistency() {
+    let mut env = TestEnv::with_prefix("test_index_build_type_manual_consistency_").await;
+    let listener = Arc::new(IndexBuildListener::default());
+    let engine = env
+        .create_engine_with(
+            async_build_mode_config(true),
+            None,
+            Some(listener.clone()),
+            None,
+        )
+        .await;
+
+    let region_id = RegionId::new(1, 1);
+    env.get_schema_metadata_manager()
+        .register_region_table_info(
+            region_id.table_id(),
+            "test_table",
+            "test_catalog",
+            "test_schema",
+            None,
+            env.get_kv_backend(),
+        )
+        .await;
+
+    // Create a region with index.
+    let create_request = CreateRequestBuilder::new().build_with_index();
+    let table_dir = create_request.table_dir.clone();
+    let column_schemas = rows_schema(&create_request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(create_request.clone()))
+        .await
+        .unwrap();
+    assert_listener_counts(&listener, 0, 0);
+
+    // Flush and make sure the index file exists.
+    put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
+    listener.wait_finish(1).await;
+    let scanner = engine
+        .scanner(region_id, ScanRequest::default())
+        .await
+        .unwrap();
+    assert_listener_counts(&listener, 1, 1);
+    assert_eq!(scanner.num_files(), 1);
+    assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
+
+    // Check that an index build task for a consistent file is skipped.
+    let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
+    engine.handle_request(region_id, request).await.unwrap();
+    // Reopen the region to ensure the task wasn't skipped due to insufficient time.
+    reopen_region(&engine, region_id, table_dir.clone(), true, HashMap::new()).await;
+    let scanner = engine
+        .scanner(region_id, ScanRequest::default())
+        .await
+        .unwrap();
+    // Because the file is consistent, no new index build task is triggered.
+    assert_listener_counts(&listener, 1, 1);
+    assert_eq!(scanner.num_files(), 1);
+    assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
+
+    let mut altered_metadata = create_request.column_metadatas.clone();
+    // Set an inverted index on field_0.
+    altered_metadata[1].column_schema.set_inverted_index(true);
+    let sync_columns_request = RegionAlterRequest {
+        kind: AlterKind::SyncColumns {
+            column_metadatas: altered_metadata,
+        },
+    };
+    // Use SyncColumns to avoid triggering a SchemaChange index build.
+    engine
+        .handle_request(region_id, RegionRequest::Alter(sync_columns_request))
+        .await
+        .unwrap();
+    reopen_region(&engine, region_id, table_dir, true, HashMap::new()).await;
+    // SyncColumns won't trigger an index build.
+    assert_listener_counts(&listener, 1, 1);
+
+    let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
+    engine.handle_request(region_id, request).await.unwrap();
+    listener.wait_finish(2).await; // previous 1 + new 1
+    // Because the file is now inconsistent with the region metadata, a new index build task is triggered.
+    assert_listener_counts(&listener, 2, 2);
+}
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 819a227e4b..efd10e96a2 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -637,6 +637,7 @@ impl RegionFlushTask {
                 level: 0,
                 file_size: sst_info.file_size,
                 available_indexes: sst_info.index_metadata.build_available_indexes(),
+                indexes: sst_info.index_metadata.build_indexes(),
                 index_file_size: sst_info.index_metadata.file_size,
                 index_file_id: None,
                 num_rows: sst_info.num_rows as u64,
diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs
index 71391457bb..6f5ca235e1 100644
--- a/src/mito2/src/manifest/tests/checkpoint.rs
+++ b/src/mito2/src/manifest/tests/checkpoint.rs
@@ -264,6 +264,7 @@ async fn checkpoint_with_different_compression_types() {
             level: 0,
             file_size: 1024000,
             available_indexes: Default::default(),
+            indexes: Default::default(),
             index_file_size: 0,
             index_file_id: None,
             num_rows: 0,
@@ -331,6 +332,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
             level: 0,
             file_size: 1024000,
             available_indexes: Default::default(),
+            indexes: Default::default(),
             index_file_size: 0,
             index_file_id: None,
             num_rows: 0,
diff --git a/src/mito2/src/remap_manifest.rs b/src/mito2/src/remap_manifest.rs
index cafb62f191..79f816bb13 100644
--- a/src/mito2/src/remap_manifest.rs
+++ b/src/mito2/src/remap_manifest.rs
@@ -426,6 +426,7 @@ mod tests {
             level: 0,
             file_size: 1024,
             available_indexes: SmallVec::new(),
+            indexes: Default::default(),
             index_file_size: 0,
             index_file_id: None,
             num_rows: 100,
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index ad13ff5808..caead6e3f4 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -26,8 +26,9 @@ use common_time::Timestamp;
 use partition::expr::PartitionExpr;
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
+use store_api::metadata::ColumnMetadata;
 use store_api::region_request::PathType;
-use store_api::storage::{FileId, RegionId};
+use store_api::storage::{ColumnId, FileId, RegionId};
 
 use crate::access_layer::AccessLayerRef;
 use crate::cache::CacheManagerRef;
@@ -79,6 +80,8 @@ where
 pub type Level = u8;
 /// Maximum level of SSTs.
 pub const MAX_LEVEL: Level = 2;
+/// Type to store index types for a column.
+pub type IndexTypes = SmallVec<[IndexType; 4]>;
 
 /// Cross-region file id.
 ///
@@ -143,7 +146,17 @@ pub struct FileMeta {
     /// Size of the file.
     pub file_size: u64,
     /// Available indexes of the file.
-    pub available_indexes: SmallVec<[IndexType; 4]>,
+    pub available_indexes: IndexTypes,
+    /// Created indexes of the file for each column.
+    ///
+    /// This is essentially a more granular, column-level version of `available_indexes`,
+    /// primarily used for manual index building in the asynchronous index construction mode.
+    ///
+    /// For backward compatibility, older `FileMeta` versions may only contain `available_indexes`.
+    /// In such cases, we cannot deduce column-level index information from `available_indexes`
+    /// alone, so this `indexes` field defaults to an empty list during deserialization, which
+    /// keeps the column information consistent.
+    pub indexes: Vec<ColumnIndexMetadata>,
     /// Size of the index file.
     pub index_file_size: u64,
     /// File ID of the index file.
@@ -206,6 +219,7 @@ impl Debug for FileMeta {
         if !self.available_indexes.is_empty() {
             debug_struct
                 .field("available_indexes", &self.available_indexes)
+                .field("indexes", &self.indexes)
                 .field("index_file_size", &ReadableSize(self.index_file_size));
         }
         debug_struct
@@ -236,6 +250,24 @@ pub enum IndexType {
     BloomFilterIndex,
 }
 
+/// Metadata of indexes created for a specific column in an SST file.
+///
+/// This structure tracks which index types have been successfully created for a column.
+/// It provides more granular, column-level index information compared to the file-level
+/// `available_indexes` field in [`FileMeta`].
+///
+/// This is primarily used for:
+/// - Manual index building in the asynchronous index construction mode
+/// - Verifying index consistency between files and region metadata
+#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
+#[serde(default)]
+pub struct ColumnIndexMetadata {
+    /// The column ID this index metadata applies to.
+    pub column_id: ColumnId,
+    /// List of index types that have been successfully created for this column.
+    pub created_indexes: IndexTypes,
+}
+
 impl FileMeta {
     pub fn exists_index(&self) -> bool {
         !self.available_indexes.is_empty()
@@ -261,6 +293,40 @@ impl FileMeta {
         self.index_file_size
     }
 
+    /// Checks whether the indexes built for this file cover every index required by the region metadata.
+    pub fn is_index_consistent_with_region(&self, metadata: &[ColumnMetadata]) -> bool {
+        let id_to_indexes = self
+            .indexes
+            .iter()
+            .map(|index| (index.column_id, index.created_indexes.clone()))
+            .collect::<HashMap<_, _>>();
+        for column in metadata {
+            if !column.column_schema.is_indexed() {
+                continue;
+            }
+            if let Some(indexes) = id_to_indexes.get(&column.column_id) {
+                if column.column_schema.is_inverted_indexed()
+                    && !indexes.contains(&IndexType::InvertedIndex)
+                {
+                    return false;
+                }
+                if column.column_schema.is_fulltext_indexed()
+                    && !indexes.contains(&IndexType::FulltextIndex)
+                {
+                    return false;
+                }
+                if column.column_schema.is_skipping_indexed()
+                    && !indexes.contains(&IndexType::BloomFilterIndex)
+                {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+        }
+        true
+    }
+
     /// Returns the cross-region file id.
     pub fn file_id(&self) -> RegionFileId {
         RegionFileId::new(self.region_id, self.file_id)
@@ -475,6 +541,10 @@ pub async fn delete_files(
 mod tests {
     use std::str::FromStr;
 
+    use datatypes::prelude::ConcreteDataType;
+    use datatypes::schema::{
+        ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions,
+    };
     use datatypes::value::Value;
     use partition::expr::{PartitionExpr, col};
 
@@ -488,6 +558,10 @@ mod tests {
             level,
             file_size: 0,
             available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            indexes: vec![ColumnIndexMetadata {
+                column_id: 0,
+                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            }],
             index_file_size: 0,
             index_file_id: None,
             num_rows: 0,
@@ -510,7 +584,7 @@ mod tests {
     fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
-       \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
+       \"available_indexes\":[\"InvertedIndex\"],\"indexes\":[{\"column_id\": 0, \"created_indexes\": [\"InvertedIndex\"]}],\"level\":0}";
         let file_meta = create_file_meta(
             FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
             0,
@@ -535,6 +609,10 @@ mod tests {
             level: 0,
             file_size: 0,
             available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            indexes: vec![ColumnIndexMetadata {
+                column_id: 0,
+                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            }],
             index_file_size: 0,
             index_file_id: None,
             num_rows: 0,
@@ -653,4 +731,148 @@ mod tests {
         let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
         assert!(file_meta_empty.partition_expr.is_none());
     }
+
+    #[test]
+    fn test_file_meta_indexes_backward_compatibility() {
+        // Old FileMeta format without the 'indexes' field.
+        let json_old_file_meta = r#"{
+            "region_id": 0,
+            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
+            "time_range": [
+                {"value": 0, "unit": "Millisecond"},
+                {"value": 0, "unit": "Millisecond"}
+            ],
+            "available_indexes": ["InvertedIndex"],
+            "level": 0,
+            "file_size": 0,
+            "index_file_size": 0,
+            "num_rows": 0,
+            "num_row_groups": 0
+        }"#;
+
+        let deserialized_file_meta: FileMeta = serde_json::from_str(json_old_file_meta).unwrap();
+
+        // Verify backward compatibility: the `indexes` field should default to an empty vec.
+        assert_eq!(deserialized_file_meta.indexes, vec![]);
+
+        let expected_indexes: IndexTypes = SmallVec::from_iter([IndexType::InvertedIndex]);
+        assert_eq!(deserialized_file_meta.available_indexes, expected_indexes);
+
+        assert_eq!(
+            deserialized_file_meta.file_id,
+            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap()
+        );
+    }
+
+    #[test]
+    fn test_is_index_consistent_with_region() {
+        fn new_column_meta(
+            id: ColumnId,
+            name: &str,
+            inverted: bool,
+            fulltext: bool,
+            skipping: bool,
+        ) -> ColumnMetadata {
+            let mut column_schema =
+                ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
+            if inverted {
+                column_schema = column_schema.with_inverted_index(true);
+            }
+            if fulltext {
+                column_schema = column_schema
+                    .with_fulltext_options(FulltextOptions::new_unchecked(
+                        true,
+                        FulltextAnalyzer::English,
+                        false,
+                        FulltextBackend::Bloom,
+                        1000,
+                        0.01,
+                    ))
+                    .unwrap();
+            }
+            if skipping {
+                column_schema = column_schema
+                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
+                        1024,
+                        0.01,
+                        datatypes::schema::SkippingIndexType::BloomFilter,
+                    ))
+                    .unwrap();
+            }
+
+            ColumnMetadata {
+                column_schema,
+                semantic_type: api::v1::SemanticType::Tag,
+                column_id: id,
+            }
+        }
+
+        // Case 1: Perfect match. The file has exactly the required indexes.
+        let mut file_meta = FileMeta {
+            indexes: vec![ColumnIndexMetadata {
+                column_id: 1,
+                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            }],
+            ..Default::default()
+        };
+        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
+        assert!(file_meta.is_index_consistent_with_region(&region_meta));
+
+        // Case 2: Superset match. The file has more indexes than required.
+        file_meta.indexes = vec![ColumnIndexMetadata {
+            column_id: 1,
+            created_indexes: SmallVec::from_iter([
+                IndexType::InvertedIndex,
+                IndexType::BloomFilterIndex,
+            ]),
+        }];
+        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
+        assert!(file_meta.is_index_consistent_with_region(&region_meta));
+
+        // Case 3: Missing index type. The file has the column but lacks the required index type.
+        file_meta.indexes = vec![ColumnIndexMetadata {
+            column_id: 1,
+            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+        }];
+        let region_meta = vec![new_column_meta(1, "tag1", true, true, false)]; // Requires fulltext too.
+        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
+
+        // Case 4: Missing column. The region requires an index on a column not in the file's index list.
+        file_meta.indexes = vec![ColumnIndexMetadata {
+            column_id: 2, // The file only has an index for column 2.
+            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+        }];
+        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)]; // Requires an index on column 1.
+        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
+
+        // Case 5: No indexes required by the region. Should always be consistent.
+        file_meta.indexes = vec![ColumnIndexMetadata {
+            column_id: 1,
+            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+        }];
+        let region_meta = vec![new_column_meta(1, "tag1", false, false, false)]; // No index required.
+        assert!(file_meta.is_index_consistent_with_region(&region_meta));
+
+        // Case 6: Empty file indexes while the region requires an index.
+        file_meta.indexes = vec![];
+        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
+        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
+
+        // Case 7: Multiple columns, one of them inconsistent.
+        file_meta.indexes = vec![
+            ColumnIndexMetadata {
+                column_id: 1,
+                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            },
+            ColumnIndexMetadata {
+                column_id: 2, // Column 2 is missing the required BloomFilterIndex.
+                created_indexes: SmallVec::from_iter([IndexType::FulltextIndex]),
+            },
+        ];
+        let region_meta = vec![
+            new_column_meta(1, "tag1", true, false, false),
+            new_column_meta(2, "tag2", false, true, true), // Requires Fulltext and BloomFilter.
+        ];
+        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
+    }
 }
diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs
index 81a004ff15..bf1c2ee5c3 100644
--- a/src/mito2/src/sst/file_purger.rs
+++ b/src/mito2/src/sst/file_purger.rs
@@ -185,7 +185,9 @@ mod tests {
     use super::*;
     use crate::access_layer::AccessLayer;
     use crate::schedule::scheduler::{LocalScheduler, Scheduler};
-    use crate::sst::file::{FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId};
+    use crate::sst::file::{
+        ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
+    };
     use crate::sst::index::intermediate::IntermediateManager;
     use crate::sst::index::puffin_manager::PuffinManagerFactory;
     use crate::sst::location;
@@ -233,6 +235,7 @@ mod tests {
             level: 0,
             file_size: 4096,
             available_indexes: Default::default(),
+            indexes: Default::default(),
             index_file_size: 0,
             index_file_id: None,
             num_rows: 0,
@@ -301,6 +304,10 @@ mod tests {
             level: 0,
             file_size: 4096,
             available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            indexes: vec![ColumnIndexMetadata {
+                column_id: 0,
+                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            }],
             index_file_size: 4096,
             index_file_id: None,
             num_rows: 1024,
diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs
index a281aeb5fa..e3cc38640c 100644
--- a/src/mito2/src/sst/file_ref.rs
+++ b/src/mito2/src/sst/file_ref.rs
@@ -208,7 +208,7 @@ mod tests {
     use store_api::storage::{FileId, RegionId};
 
     use super::*;
-    use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};
+    use crate::sst::file::{ColumnIndexMetadata, FileMeta, FileTimeRange, IndexType, RegionFileId};
 
     #[tokio::test]
     async fn test_file_ref_mgr() {
@@ -225,6 +225,10 @@ mod tests {
             level: 0,
             file_size: 4096,
             available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            indexes: vec![ColumnIndexMetadata {
+                column_id: 0,
+                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            }],
             index_file_size: 4096,
             index_file_id: None,
             num_rows: 1024,
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index c51a3893e6..c01f64556f 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -60,7 +60,9 @@ use crate::request::{
     WorkerRequestWithTime,
 };
 use crate::schedule::scheduler::{Job, SchedulerRef};
-use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
+use crate::sst::file::{
+    ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId,
+};
 use crate::sst::file_purger::FilePurgerRef;
 use crate::sst::index::fulltext_index::creator::FulltextIndexer;
 use crate::sst::index::intermediate::IntermediateManager;
@@ -101,6 +103,38 @@ impl IndexOutput {
         }
         indexes
     }
+
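+    /// Groups the per-column results of index creation into
+    /// [`ColumnIndexMetadata`] entries, one for each column that has at
+    /// least one successfully built index type.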
+    pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
+        let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
+
+        if self.inverted_index.is_available() {
+            for &col in &self.inverted_index.columns {
+                map.entry(col).or_default().push(IndexType::InvertedIndex);
+            }
+        }
+        if self.fulltext_index.is_available() {
+            for &col in &self.fulltext_index.columns {
+                map.entry(col).or_default().push(IndexType::FulltextIndex);
+            }
+        }
+        if self.bloom_filter.is_available() {
+            for &col in &self.bloom_filter.columns {
+                map.entry(col)
+                    .or_default()
+                    .push(IndexType::BloomFilterIndex);
+            }
+        }
+
+        map.into_iter()
+            .map(|(column_id, created_indexes)| ColumnIndexMetadata {
+                column_id,
+                created_indexes,
+            })
+            .collect::<Vec<_>>()
+    }
 }
 
 /// Base output of the index creation.
@@ -416,7 +447,7 @@ impl IndexerBuilderImpl {
 }
 
 /// Type of an index build task.
-#[derive(Debug, Clone, IntoStaticStr)]
+#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
 pub enum IndexBuildType {
     /// Build index when schema change.
     SchemaChange,
@@ -728,6 +759,7 @@ impl IndexBuildTask {
         index_file_id: FileId,
     ) -> Result<RegionEdit> {
         self.file_meta.available_indexes = output.build_available_indexes();
+        self.file_meta.indexes = output.build_indexes();
         self.file_meta.index_file_size = output.file_size;
         self.file_meta.index_file_id = Some(index_file_id);
         let edit = RegionEdit {
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 95ba0b28b3..4553372569 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -767,6 +767,7 @@ mod tests {
             level: 0,
             file_size: info.file_size,
             available_indexes: info.index_metadata.build_available_indexes(),
+            indexes: info.index_metadata.build_indexes(),
             index_file_size: info.index_metadata.file_size,
             index_file_id: None,
             num_row_groups: info.num_row_groups,
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index dc75a1e08c..8299c9e3da 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -124,6 +124,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
         level: 0,
         file_size: 0,
         available_indexes: Default::default(),
+        indexes: Default::default(),
         index_file_size: 0,
         index_file_id: None,
         num_rows: 0,
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index 53b28478c9..68fe723ab0 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -102,6 +102,7 @@ impl VersionControlBuilder {
             level: 0,
             file_size: 0, // We don't care file size.
             available_indexes: Default::default(),
+            indexes: Default::default(),
             index_file_size: 0,
             index_file_id: None,
             num_rows: 0,
@@ -192,6 +193,7 @@ pub(crate) fn apply_edit(
             level: 0,
             file_size: 0, // We don't care file size.
             available_indexes: Default::default(),
+            indexes: Default::default(),
             index_file_size: 0,
             index_file_id: None,
             num_rows: 0,
diff --git a/src/mito2/src/worker/handle_rebuild_index.rs b/src/mito2/src/worker/handle_rebuild_index.rs
index 6e7e96d7f2..5030cd77cd 100644
--- a/src/mito2/src/worker/handle_rebuild_index.rs
+++ b/src/mito2/src/worker/handle_rebuild_index.rs
@@ -85,7 +85,6 @@ impl RegionWorkerLoop {
     }
 
     /// Handles manual build index requests.
-    /// TODO(SNC123): Support admin function of manual index building later.
     pub(crate) async fn handle_build_index_request(
         &mut self,
         region_id: RegionId,
@@ -126,10 +125,16 @@ impl RegionWorkerLoop {
             .collect();
 
         let build_tasks = if request.file_metas.is_empty() {
-            // NOTE: Currently, rebuilding the index will reconstruct the index for all
-            // files in the region, which is a simplified approach and is not yet available for
-            // production use; further optimization is required.
-            all_files.values().cloned().collect::<Vec<_>>()
+            // If no specific files are provided, find files whose index is inconsistent with the region metadata.
+            all_files
+                .values()
+                .filter(|file| {
+                    !file
+                        .meta_ref()
+                        .is_index_consistent_with_region(&version.metadata.column_metadatas)
+                })
+                .cloned()
+                .collect::<Vec<_>>()
         } else {
             request
                 .file_metas
diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs
index 1bca461842..a5ed045313 100644
--- a/src/operator/src/request.rs
+++ b/src/operator/src/request.rs
@@ -15,19 +15,19 @@
 use std::sync::Arc;
 
 use api::v1::region::region_request::Body as RegionRequestBody;
-use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader};
+use api::v1::region::{BuildIndexRequest, CompactRequest, FlushRequest, RegionRequestHeader};
 use catalog::CatalogManagerRef;
 use common_catalog::build_db_string;
 use common_meta::node_manager::{AffectedRows, NodeManagerRef};
 use common_meta::peer::Peer;
 use common_telemetry::tracing_context::TracingContext;
-use common_telemetry::{error, info};
+use common_telemetry::{debug, error, info};
 use futures_util::future;
 use partition::manager::{PartitionInfo, PartitionRuleManagerRef};
 use session::context::QueryContextRef;
 use snafu::prelude::*;
 use store_api::storage::RegionId;
-use table::requests::{CompactTableRequest, FlushTableRequest};
+use table::requests::{BuildIndexTableRequest, CompactTableRequest, FlushTableRequest};
 
 use crate::error::{
     CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu,
@@ -90,6 +90,43 @@ impl Requester {
             .await
     }
 
+    /// Handle the request to build indexes for a table.
+    pub async fn handle_table_build_index(
+        &self,
+        request: BuildIndexTableRequest,
+        ctx: QueryContextRef,
+    ) -> Result<AffectedRows> {
+        let partitions = self
+            .get_table_partitions(
+                &request.catalog_name,
+                &request.schema_name,
+                &request.table_name,
+            )
+            .await?;
+
+        let requests = partitions
+            .into_iter()
+            .map(|partition| {
+                RegionRequestBody::BuildIndex(BuildIndexRequest {
+                    region_id: partition.id.into(),
+                })
+            })
+            .collect();
+
+        info!(
+            "Handling manual index build for table {}",
+            request.table_name
+        );
+        debug!("Request details: {:?}", request);
+
+        self.do_request(
+            requests,
+            Some(build_db_string(&request.catalog_name, &request.schema_name)),
+            &ctx,
+        )
+        .await
+    }
+
     /// Handle the request to compact table.
     pub async fn handle_table_compaction(
         &self,
@@ -201,6 +238,7 @@ impl Requester {
         let region_id = match req {
             RegionRequestBody::Flush(req) => req.region_id,
             RegionRequestBody::Compact(req) => req.region_id,
+            RegionRequestBody::BuildIndex(req) => req.region_id,
             _ => {
                 error!("Unsupported region request: {:?}", req);
                 return UnsupportedRegionRequestSnafu {}.fail();
diff --git a/src/operator/src/table.rs b/src/operator/src/table.rs
index 52c37bb401..13ed57200c 100644
--- a/src/operator/src/table.rs
+++ b/src/operator/src/table.rs
@@ -23,8 +23,8 @@ use session::context::QueryContextRef;
 use snafu::ResultExt;
 use store_api::storage::RegionId;
 use table::requests::{
-    CompactTableRequest, DeleteRequest as TableDeleteRequest, FlushTableRequest,
-    InsertRequest as TableInsertRequest,
+    BuildIndexTableRequest, CompactTableRequest, DeleteRequest as TableDeleteRequest,
+    FlushTableRequest, InsertRequest as TableInsertRequest,
 };
 
 use crate::delete::DeleterRef;
@@ -97,6 +97,18 @@ impl TableMutationHandler for TableMutationOperator {
             .context(query_error::TableMutationSnafu)
     }
 
+    async fn build_index(
+        &self,
+        request: BuildIndexTableRequest,
+        ctx: QueryContextRef,
+    ) -> QueryResult<AffectedRows> {
+        self.requester
+            .handle_table_build_index(request, ctx)
+            .await
+            .map_err(BoxedError::new)
+            .context(query_error::TableMutationSnafu)
+    }
+
     async fn flush_region(
         &self,
         region_id: RegionId,
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index 7fb4b7fda0..bbd532c17c 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -22,9 +22,10 @@ use api::v1::column_def::{
 };
 use api::v1::region::bulk_insert_request::Body;
 use api::v1::region::{
-    AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
-    CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
-    OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
+    AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
+    CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
+    FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
+    region_request, truncate_request,
 };
 use api::v1::{
     self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
@@ -166,6 +167,7 @@ impl RegionRequest {
             region_request::Body::Alter(alter) => make_region_alter(alter),
             region_request::Body::Flush(flush) => make_region_flush(flush),
             region_request::Body::Compact(compact) => make_region_compact(compact),
+            region_request::Body::BuildIndex(index) => make_region_build_index(index),
             region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
             region_request::Body::Creates(creates) => make_region_creates(creates),
             region_request::Body::Drops(drops) => make_region_drops(drops),
@@ -354,6 +356,14 @@ fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>>
 }
 
+fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let region_id = index.region_id.into();
+    Ok(vec![(
+        region_id,
+        RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
+    )])
+}
+
 fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
     let region_id = truncate.region_id.into();
     match truncate.kind {
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index 6cdf945480..3b4058e083 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -395,6 +395,17 @@ pub struct FlushTableRequest {
     pub catalog_name: String,
     pub schema_name: String,
     pub table_name: String,
 }
 
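+/// A request to build indexes for a table.
+///
+/// The operator fans this request out as one region-level `BuildIndexRequest`
+/// per region of the table.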
+#[derive(Debug, Clone, Default)]
+pub struct BuildIndexTableRequest {
+    pub catalog_name: String,
+    pub schema_name: String,
+    pub table_name: String,
+}
+
 #[derive(Debug, Clone, PartialEq)]
 pub struct CompactTableRequest {
     pub catalog_name: String,
diff --git a/tests/cases/standalone/common/function/admin/build_index_table.result b/tests/cases/standalone/common/function/admin/build_index_table.result
new file mode 100644
index 0000000000..4951df9e0d
--- /dev/null
+++ b/tests/cases/standalone/common/function/admin/build_index_table.result
@@ -0,0 +1,72 @@
+CREATE TABLE test (
+    ts TIMESTAMP TIME INDEX,
+    msg TEXT,
+);
+
+Affected Rows: 0
+
+INSERT INTO test VALUES
+(1,"The quick brown fox jumps over the lazy dog"),
+(2,"The quick brown fox jumps over the lazy cat"),
+(3,"The quick brown fox jumps over the lazy mouse"),
+(4,"The quick brown fox jumps over the lazy rabbit"),
+(5,"The quick brown fox jumps over the lazy turtle");
+
+Affected Rows: 5
+
+SELECT * FROM test;
+
++-------------------------+------------------------------------------------+
+| ts                      | msg                                            |
++-------------------------+------------------------------------------------+
+| 1970-01-01T00:00:00.001 | The quick brown fox jumps over the lazy dog    |
+| 1970-01-01T00:00:00.002 | The quick brown fox jumps over the lazy cat    |
+| 1970-01-01T00:00:00.003 | The quick brown fox jumps over the lazy mouse  |
+| 1970-01-01T00:00:00.004 | The quick brown fox jumps over the lazy rabbit |
+| 1970-01-01T00:00:00.005 | The quick brown fox jumps over the lazy turtle |
++-------------------------+------------------------------------------------+
+
+ADMIN FLUSH_TABLE('test');
+
++---------------------------+
+| ADMIN FLUSH_TABLE('test') |
++---------------------------+
+| 0                         |
++---------------------------+
+
+-- SQLNESS SLEEP 1s
+-- No fulltext index yet
+SELECT index_size FROM INFORMATION_SCHEMA.REGION_STATISTICS;
+
++------------+
+| index_size |
++------------+
+| 0          |
++------------+
+
+ALTER TABLE test MODIFY COLUMN msg SET FULLTEXT INDEX;
+
+Affected Rows: 0
+
+ADMIN BUILD_INDEX('test');
+
++---------------------------+
+| ADMIN BUILD_INDEX('test') |
++---------------------------+
+| 0                         |
++---------------------------+
+
+-- SQLNESS SLEEP 1s
+-- Fulltext index built
+SELECT index_size FROM INFORMATION_SCHEMA.REGION_STATISTICS;
+
++------------+
+| index_size |
++------------+
+| 318        |
++------------+
+
+DROP TABLE test;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/function/admin/build_index_table.sql b/tests/cases/standalone/common/function/admin/build_index_table.sql
new file mode 100644
index 0000000000..da3b84f23d
--- /dev/null
+++ b/tests/cases/standalone/common/function/admin/build_index_table.sql
@@ -0,0 +1,29 @@
+CREATE TABLE test (
+    ts TIMESTAMP TIME INDEX,
+    msg TEXT,
+);
+
+INSERT INTO test VALUES
+(1,"The quick brown fox jumps over the lazy dog"),
+(2,"The quick brown fox jumps over the lazy cat"),
+(3,"The quick brown fox jumps over the lazy mouse"),
+(4,"The quick brown fox jumps over the lazy rabbit"),
+(5,"The quick brown fox jumps over the lazy turtle");
+
+SELECT * FROM test;
+
+ADMIN FLUSH_TABLE('test');
+
+-- SQLNESS SLEEP 1s
+-- No fulltext index yet
+SELECT index_size FROM INFORMATION_SCHEMA.REGION_STATISTICS;
+
+ALTER TABLE test MODIFY COLUMN msg SET FULLTEXT INDEX;
+
+ADMIN BUILD_INDEX('test');
+
+-- SQLNESS SLEEP 1s
+-- Fulltext index built
+SELECT index_size FROM INFORMATION_SCHEMA.REGION_STATISTICS;
+
+DROP TABLE test;