From 779865d389130c34bfd3944a5f456ceecc7c8174 Mon Sep 17 00:00:00 2001
From: Sicong Hu
Date: Fri, 10 Oct 2025 11:29:32 +0800
Subject: [PATCH] feat: introduce IndexBuildTask for async index build (#6927)

* feat: add framework for asynchronous index building

Signed-off-by: SNC123

* test: add unit tests for IndexBuildTask

Signed-off-by: SNC123

* chore: clippy,format,fix-udeps

Signed-off-by: SNC123

* fix: correct write cache logic in IndexBuildTask

Signed-off-by: SNC123

* chore: clippy, resolve conflicts

Signed-off-by: SNC123

* chore: resolve conflicts

Signed-off-by: SNC123

* fix: apply review suggestions

Signed-off-by: SNC123

* chore: resolve conflicts

Signed-off-by: SNC123

* fix: clean up index files in aborted case

Signed-off-by: SNC123

* refactor: move manifest update logic into IndexBuildTask

Signed-off-by: SNC123

* fix: enhance check file logic and error handling

Signed-off-by: SNC123

---------

Signed-off-by: SNC123
---
 src/mito2/src/access_layer.rs                |  16 +-
 src/mito2/src/cache/write_cache.rs           |  30 +-
 src/mito2/src/compaction/compactor.rs        |   2 +
 src/mito2/src/config.rs                      |  18 +
 src/mito2/src/flush.rs                       |   1 +
 src/mito2/src/request.rs                     |  33 +
 src/mito2/src/sst/file.rs                    |   8 +
 src/mito2/src/sst/index.rs                   | 742 ++++++++++++++++++-
 src/mito2/src/sst/parquet.rs                 |  23 +-
 src/mito2/src/sst/parquet/writer.rs          |  37 +-
 src/mito2/src/sst/version.rs                 |   6 +-
 src/mito2/src/test_util/scheduler_util.rs    |  10 +-
 src/mito2/src/worker.rs                      |  27 +
 src/mito2/src/worker/handle_rebuild_index.rs | 149 ++++
 tests-integration/tests/http.rs              |   2 +
 15 files changed, 1049 insertions(+), 55 deletions(-)
 create mode 100644 src/mito2/src/worker/handle_rebuild_index.rs

diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index 6af56265cb..e5401209ca 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -32,7 +32,7 @@ use store_api::storage::{FileId, RegionId, SequenceNumber};
 use crate::cache::CacheManagerRef;
 use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
 use crate::cache::write_cache::SstUploadRequest;
-use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
+use crate::config::{BloomFilterConfig, FulltextIndexConfig, IndexConfig, InvertedIndexConfig};
 use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
 use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
 use crate::read::{FlatSource, Source};
@@ -40,7 +40,7 @@ use crate::region::options::IndexOptions;
 use crate::sst::file::{FileHandle, RegionFileId};
 use crate::sst::index::IndexerBuilderImpl;
 use crate::sst::index::intermediate::IntermediateManager;
-use crate::sst::index::puffin_manager::PuffinManagerFactory;
+use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
 use crate::sst::location::{self, region_dir_from_table_dir};
 use crate::sst::parquet::reader::ParquetReaderBuilder;
 use crate::sst::parquet::writer::ParquetWriter;
@@ -204,6 +204,14 @@ impl AccessLayer {
         &self.intermediate_manager
     }
 
+    /// Build the puffin manager.
+    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
+        let store = self.object_store.clone();
+        let path_provider =
+            RegionFilePathFactory::new(self.table_dir().to_string(), self.path_type());
+        self.puffin_manager_factory.build(store, path_provider)
+    }
+
     /// Deletes a SST file (and its index file if it has one) with given file id.
     pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> {
         let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
@@ -273,7 +281,7 @@ impl AccessLayer {
         let store = self.object_store.clone();
         let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
         let indexer_builder = IndexerBuilderImpl {
-            op_type: request.op_type,
+            build_type: request.op_type.into(),
             metadata: request.metadata.clone(),
             row_group_size: write_opts.row_group_size,
             puffin_manager: self
@@ -292,6 +300,7 @@ impl AccessLayer {
         let mut writer = ParquetWriter::new_with_object_store(
             self.object_store.clone(),
             request.metadata,
+            request.index_config,
             indexer_builder,
             path_provider,
             Metrics::new(write_type),
@@ -435,6 +444,7 @@ pub struct SstWriteRequest {
 
     /// Configs for index
     pub index_options: IndexOptions,
+    pub index_config: IndexConfig,
     pub inverted_index_config: InvertedIndexConfig,
     pub fulltext_index_config: FulltextIndexConfig,
     pub bloom_filter_index_config: BloomFilterConfig,
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 3a4c9b1ece..d2b7e34997 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -36,7 +36,7 @@ use crate::metrics::{
 use crate::sst::file::RegionFileId;
 use crate::sst::index::IndexerBuilderImpl;
 use crate::sst::index::intermediate::IntermediateManager;
-use crate::sst::index::puffin_manager::PuffinManagerFactory;
+use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
 use crate::sst::parquet::writer::ParquetWriter;
 use crate::sst::parquet::{SstInfo, WriteOptions};
 use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
@@ -101,6 +101,13 @@ impl WriteCache {
         self.file_cache.clone()
     }
 
+    /// Build the puffin manager.
+    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
+        let store = self.file_cache.local_store();
+        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
+        self.puffin_manager_factory.build(store, path_provider)
+    }
+
     /// Put encoded SST data to the cache and upload to the remote object store.
     pub(crate) async fn put_and_upload_sst(
         &self,
@@ -151,6 +158,11 @@ impl WriteCache {
         Ok(metrics)
     }
 
+    /// Returns the intermediate manager of the write cache.
+    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
+        &self.intermediate_manager
+    }
+
     /// Writes SST to the cache and then uploads it to the remote object store.
     pub(crate) async fn write_and_upload_sst(
         &self,
@@ -164,7 +176,7 @@ impl WriteCache {
         let store = self.file_cache.local_store();
         let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
         let indexer = IndexerBuilderImpl {
-            op_type: write_request.op_type,
+            build_type: write_request.op_type.into(),
             metadata: write_request.metadata.clone(),
             row_group_size: write_opts.row_group_size,
             puffin_manager: self
@@ -182,6 +194,7 @@ impl WriteCache {
         let mut writer = ParquetWriter::new_with_object_store(
             store.clone(),
             write_request.metadata,
+            write_request.index_config,
             indexer,
             path_provider.clone(),
             Metrics::new(write_type),
@@ -342,7 +355,7 @@ impl WriteCache {
     }
 
     /// Uploads a Parquet file or a Puffin file to the remote object store.
-    async fn upload(
+    pub(crate) async fn upload(
         &self,
         index_key: IndexKey,
         upload_path: &str,
@@ -423,7 +436,7 @@ pub struct SstUploadRequest {
 }
 
 /// A structs to track files to upload and clean them if upload failed.
-struct UploadTracker {
+pub(crate) struct UploadTracker {
     /// Id of the region to track.
     region_id: RegionId,
     /// Paths of files uploaded successfully.
@@ -432,7 +445,7 @@ impl UploadTracker {
 
 impl UploadTracker {
     /// Creates a new instance of `UploadTracker` for a given region.
-    fn new(region_id: RegionId) -> Self {
+    pub(crate) fn new(region_id: RegionId) -> Self {
         Self {
             region_id,
             files_uploaded: Vec::new(),
@@ -440,12 +453,12 @@ impl UploadTracker {
     }
 
     /// Add a file path to the list of uploaded files.
-    fn push_uploaded_file(&mut self, path: String) {
+    pub(crate) fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
     }
 
     /// Cleans uploaded files and files in the file cache at best effort.
-    async fn clean(
+    pub(crate) async fn clean(
         &self,
         sst_info: &SstInfoArray,
         file_cache: &FileCacheRef,
@@ -529,6 +542,7 @@ mod tests {
             max_sequence: None,
             cache_manager: Default::default(),
             index_options: IndexOptions::default(),
+            index_config: Default::default(),
             inverted_index_config: Default::default(),
             fulltext_index_config: Default::default(),
             bloom_filter_index_config: Default::default(),
@@ -627,6 +641,7 @@ mod tests {
             max_sequence: None,
             cache_manager: cache_manager.clone(),
             index_options: IndexOptions::default(),
+            index_config: Default::default(),
             inverted_index_config: Default::default(),
             fulltext_index_config: Default::default(),
             bloom_filter_index_config: Default::default(),
@@ -706,6 +721,7 @@ mod tests {
             max_sequence: None,
             cache_manager: cache_manager.clone(),
             index_options: IndexOptions::default(),
+            index_config: Default::default(),
             inverted_index_config: Default::default(),
             fulltext_index_config: Default::default(),
             bloom_filter_index_config: Default::default(),
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index 0cf946416c..8de26b6570 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -345,6 +345,7 @@ impl Compactor for DefaultCompactor {
         let flat_format = compaction_region
             .engine_config
             .enable_experimental_flat_format;
+        let index_config = compaction_region.engine_config.index.clone();
         let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
         let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
         let bloom_filter_index_config =
@@ -389,6 +390,7 @@ impl Compactor for DefaultCompactor {
             storage,
             max_sequence: max_sequence.map(NonZero::get),
             index_options,
+            index_config,
             inverted_index_config,
             fulltext_index_config,
             bloom_filter_index_config,
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 966edfdf00..20e7550b2f 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -80,6 +80,8 @@ pub struct MitoConfig {
     pub compress_manifest: bool,
 
     // Background job configs:
+    /// Max number of running background index build jobs (default: 1/8 of cpu cores).
+    pub max_background_index_builds: usize,
     /// Max number of running background flush jobs (default: 1/2 of cpu cores).
     pub max_background_flushes: usize,
     /// Max number of running background compaction jobs (default: 1/4 of cpu cores).
@@ -157,6 +159,7 @@ impl Default for MitoConfig {
             experimental_manifest_keep_removed_file_count: 256,
             experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
             compress_manifest: false,
+            max_background_index_builds: divide_num_cpus(8),
             max_background_flushes: divide_num_cpus(2),
             max_background_compactions: divide_num_cpus(4),
             max_background_purges: common_config::utils::get_cpus(),
@@ -308,6 +311,17 @@ impl MitoConfig {
     }
 }
 
+/// Index build mode.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
+#[serde(rename_all = "snake_case")]
+pub enum IndexBuildMode {
+    /// Build index synchronously.
+    #[default]
+    Sync,
+    /// Build index asynchronously.
+    Async,
+}
+
 #[serde_as]
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
@@ -331,6 +345,9 @@ pub struct IndexConfig {
     #[serde(with = "humantime_serde")]
     pub staging_ttl: Option<Duration>,
 
+    /// Index build mode.
+    pub build_mode: IndexBuildMode,
+
     /// Write buffer size for creating the index.
     pub write_buffer_size: ReadableSize,
 
@@ -350,6 +367,7 @@ impl Default for IndexConfig {
             aux_path: String::new(),
             staging_size: ReadableSize::gb(2),
             staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
+            build_mode: IndexBuildMode::default(),
             write_buffer_size: ReadableSize::mb(8),
             metadata_cache_size: ReadableSize::mb(64),
             content_cache_size: ReadableSize::mb(128),
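Illustrative example (not part of the patch). A minimal sketch of how the two new
knobs added above fit together; `MitoConfig`, `IndexBuildMode`, and the field names
come from this patch, while the surrounding setup is an assumption:

    // Assumed context: inside mito2, where MitoConfig and IndexBuildMode are in scope.
    use crate::config::{IndexBuildMode, MitoConfig};

    let mut config = MitoConfig::default();
    // Cap on concurrently running background index build jobs
    // (defaults to divide_num_cpus(8), i.e. 1/8 of the cores).
    config.max_background_index_builds = 2;
    // Make the SST writer skip inline index creation; an IndexBuildTask
    // scheduled on the region worker builds and publishes the index later.
    config.index.build_mode = IndexBuildMode::Async;

In TOML terms this corresponds to `build_mode = "async"` in the index section of the
engine config, as exercised by the tests-integration/tests/http.rs change below.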
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 80ef610634..eb5e605ce1 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -658,6 +658,7 @@ impl RegionFlushTask {
             storage: version.options.storage.clone(),
             max_sequence: Some(max_sequence),
             index_options: self.index_options.clone(),
+            index_config: self.engine_config.index.clone(),
             inverted_index_config: self.engine_config.inverted_index.clone(),
             fulltext_index_config: self.engine_config.fulltext_index.clone(),
             bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index 508a767e20..6ec797e2dc 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -50,6 +50,8 @@ use crate::manifest::action::{RegionEdit, TruncateKind};
 use crate::memtable::MemtableId;
 use crate::memtable::bulk::part::BulkPart;
 use crate::metrics::COMPACTION_ELAPSED_TOTAL;
+use crate::sst::file::FileMeta;
+use crate::sst::index::IndexBuildType;
 use crate::wal::EntryId;
 use crate::wal::entry_distributor::WalEntryReceiver;
 
@@ -597,6 +599,10 @@ pub(crate) enum WorkerRequest {
     /// Keep the manifest of a region up to date.
     SyncRegion(RegionSyncRequest),
 
+    /// Build indexes of a region.
+    #[allow(dead_code)]
+    BuildIndexRegion(RegionBuildIndexRequest),
+
     /// Bulk inserts request and region metadata.
     BulkInserts {
         metadata: Option<RegionMetadataRef>,
@@ -776,6 +782,11 @@ pub(crate) enum BackgroundNotify {
     FlushFinished(FlushFinished),
     /// Flush has failed.
     FlushFailed(FlushFailed),
+    /// Index build has finished.
+    IndexBuildFinished(IndexBuildFinished),
+    /// Index build has failed.
+    #[allow(dead_code)]
+    IndexBuildFailed(IndexBuildFailed),
     /// Compaction has finished.
     CompactionFinished(CompactionFinished),
     /// Compaction has failed.
@@ -832,6 +843,20 @@ pub(crate) struct FlushFailed {
     pub(crate) err: Arc<Error>,
 }
 
+#[derive(Debug)]
+pub(crate) struct IndexBuildFinished {
+    #[allow(dead_code)]
+    pub(crate) region_id: RegionId,
+    pub(crate) edit: RegionEdit,
+}
+
+/// Notifies an index build job has failed.
+#[derive(Debug)]
+pub(crate) struct IndexBuildFailed {
+    #[allow(dead_code)]
+    pub(crate) err: Arc<Error>,
+}
+
 /// Notifies a compaction job has finished.
 #[derive(Debug)]
 pub(crate) struct CompactionFinished {
@@ -924,6 +949,14 @@ pub(crate) struct RegionEditResult {
     pub(crate) result: Result<()>,
 }
 
+#[derive(Debug)]
+pub(crate) struct RegionBuildIndexRequest {
+    pub(crate) region_id: RegionId,
+    pub(crate) build_type: IndexBuildType,
+    /// Files that need index builds; an empty list means all files.
+    pub(crate) file_metas: Vec<FileMeta>,
+}
+
 #[derive(Debug)]
 pub(crate) struct RegionSyncRequest {
     pub(crate) region_id: RegionId,
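Illustrative example (not part of the patch). A hedged sketch of how a manual
rebuild could be requested once callers are wired up — the `BuildIndexRegion`
variant is still `#[allow(dead_code)]` here, so the sender side shown is an
assumption built only from the types above:

    // Assumed: `sender` is the region worker's mpsc::Sender<WorkerRequestWithTime>
    // and `region_id` identifies an open region.
    let request = WorkerRequest::BuildIndexRegion(RegionBuildIndexRequest {
        region_id,
        build_type: IndexBuildType::Manual,
        // An empty list means: rebuild indexes for every file in the region.
        file_metas: vec![],
    });
    let _ = sender.send(WorkerRequestWithTime::new(request)).await;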
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index 0e5635610f..4ddde55746 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -317,6 +317,10 @@ impl FileHandle {
         &self.inner.meta
     }
 
+    pub fn file_purger(&self) -> FilePurgerRef {
+        self.inner.file_purger.clone()
+    }
+
     pub fn size(&self) -> u64 {
         self.inner.meta.file_size
     }
@@ -332,6 +336,10 @@ impl FileHandle {
     pub fn level(&self) -> Level {
         self.inner.meta.level
     }
+
+    pub fn is_deleted(&self) -> bool {
+        self.inner.deleted.load(Ordering::Relaxed)
+    }
 }
 
 /// Inner data of [FileHandle].
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index fa53807ef3..a79adcf787 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -22,25 +22,40 @@ mod statistics;
 pub(crate) mod store;
 
 use std::num::NonZeroUsize;
+use std::sync::Arc;
 
 use bloom_filter::creator::BloomFilterIndexer;
-use common_telemetry::{debug, warn};
+use common_telemetry::{debug, info, warn};
 use datatypes::arrow::record_batch::RecordBatch;
 use puffin_manager::SstPuffinManager;
-use smallvec::SmallVec;
+use smallvec::{SmallVec, smallvec};
 use statistics::{ByteCount, RowCount};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, FileId, RegionId};
+use strum::IntoStaticStr;
+use tokio::sync::{mpsc, oneshot};
 
-use crate::access_layer::OperationType;
+use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
+use crate::cache::file_cache::{FileType, IndexKey};
+use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
 use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
+use crate::error::Result;
+use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
 use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
-use crate::read::Batch;
+use crate::read::{Batch, BatchReader};
 use crate::region::options::IndexOptions;
-use crate::sst::file::IndexType;
+use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
+use crate::region::{ManifestContextRef, RegionLeaderState};
+use crate::request::{
+    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, WorkerRequest, WorkerRequestWithTime,
+};
+use crate::schedule::scheduler::{Job, SchedulerRef};
+use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
+use crate::sst::file_purger::FilePurgerRef;
 use crate::sst::index::fulltext_index::creator::FulltextIndexer;
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::inverted_index::creator::InvertedIndexer;
+use crate::sst::parquet::SstInfo;
 
 pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
 pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
@@ -179,9 +194,9 @@ pub trait IndexerBuilder {
     /// Builds indexer of given file id to [index_file_path].
     async fn build(&self, file_id: FileId) -> Indexer;
 }
-
+#[derive(Clone)]
 pub(crate) struct IndexerBuilderImpl {
-    pub(crate) op_type: OperationType,
+    pub(crate) build_type: IndexBuildType,
     pub(crate) metadata: RegionMetadataRef,
     pub(crate) row_group_size: usize,
     pub(crate) puffin_manager: SstPuffinManager,
@@ -221,9 +236,10 @@ impl IndexerBuilder for IndexerBuilderImpl {
 
 impl IndexerBuilderImpl {
     fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
-        let create = match self.op_type {
-            OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
-            OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
+        let create = match self.build_type {
+            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
+            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
+            _ => true,
         };
 
         if !create {
@@ -281,9 +297,10 @@ impl IndexerBuilderImpl {
     }
 
     async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
-        let create = match self.op_type {
-            OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
-            OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
+        let create = match self.build_type {
+            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
+            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
+            _ => true,
         };
 
         if !create {
@@ -334,9 +351,10 @@ impl IndexerBuilderImpl {
     }
 
     fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
-        let create = match self.op_type {
-            OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
-            OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
+        let create = match self.build_type {
+            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
+            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
+            _ => true,
         };
 
         if !create {
@@ -384,11 +402,279 @@ impl IndexerBuilderImpl {
     }
 }
 
+/// Type of an index build task.
+#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
+pub enum IndexBuildType {
+    /// Build index when the schema changes.
+    SchemaChange,
+    /// Create or update index after a flush.
+    Flush,
+    /// Create or update index after a compaction.
+    Compact,
+    /// Manually build index.
+    Manual,
+}
+
+impl IndexBuildType {
+    fn as_str(&self) -> &'static str {
+        self.into()
+    }
+}
+
+impl From<OperationType> for IndexBuildType {
+    fn from(op_type: OperationType) -> Self {
+        match op_type {
+            OperationType::Flush => IndexBuildType::Flush,
+            OperationType::Compact => IndexBuildType::Compact,
+        }
+    }
+}
+
+/// Outcome of an index build task.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum IndexBuildOutcome {
+    Finished,
+    Aborted(String),
+}
+
+pub struct IndexBuildTask {
+    /// The file meta to build index for.
+    pub file_meta: FileMeta,
+    pub reason: IndexBuildType,
+    pub access_layer: AccessLayerRef,
+    pub(crate) manifest_ctx: ManifestContextRef,
+    pub write_cache: Option<WriteCacheRef>,
+    pub file_purger: FilePurgerRef,
+    /// When write cache is enabled, the indexer builder should be built from the write cache.
+    /// Otherwise, it should be built from the access layer.
+    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
+    /// Request sender to notify the region worker.
+    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
+    /// Optional sender to send the result back to the caller.
+    pub result_sender: Option<oneshot::Sender<IndexBuildOutcome>>,
+}
+
+impl IndexBuildTask {
+    fn into_index_build_job(mut self, version_control: &VersionControlRef) -> Job {
+        let version_data = version_control.current();
+
+        Box::pin(async move {
+            self.do_index_build(version_data).await;
+        })
+    }
+
+    async fn do_index_build(&mut self, version_data: VersionControlData) {
+        let outcome = match self.index_build(&version_data).await {
+            Ok(outcome) => outcome,
+            Err(e) => {
+                warn!(
+                    e; "Index build task failed, region: {}, file_id: {}",
+                    self.file_meta.region_id, self.file_meta.file_id,
+                );
+                IndexBuildOutcome::Aborted(format!("Index build failed: {}", e))
+            }
+        };
+        if let Some(sender) = self.result_sender.take() {
+            let _ = sender.send(outcome);
+        }
+    }
+
+    // Checks if the SST file still exists in the object store and the version
+    // to avoid conflicts with compaction.
+    async fn check_sst_file_exists(&self, version: &VersionRef) -> bool {
+        let region_id = self.file_meta.region_id;
+        let file_id = self.file_meta.file_id;
+
+        let found_in_version = version
+            .ssts
+            .levels()
+            .iter()
+            .flat_map(|level| level.files.iter())
+            .any(|(id, handle)| {
+                *id == self.file_meta.file_id && !handle.is_deleted() && !handle.compacting()
+            });
+        if !found_in_version {
+            warn!(
+                "File id {} not found in region version for index build, region: {}",
+                file_id, region_id
+            );
+            false
+        } else {
+            // If the file's metadata is present in the current version, the physical SST file
+            // is guaranteed to exist on object store. The file purger removes the physical
+            // file only after its metadata is removed from the version.
+            true
+        }
+    }
+
+    async fn index_build(
+        &mut self,
+        version_data: &VersionControlData,
+    ) -> Result<IndexBuildOutcome> {
+        let version = &version_data.version;
+        let mut indexer = self.indexer_builder.build(self.file_meta.file_id).await;
+        let mut parquet_reader = self
+            .access_layer
+            .read_sst(FileHandle::new(
+                self.file_meta.clone(),
+                self.file_purger.clone(),
+            ))
+            .build()
+            .await?;
+
+        // TODO(SNC123): optimize index batch
+        loop {
+            match parquet_reader.next_batch().await {
+                Ok(Some(batch)) => {
+                    indexer.update(&mut batch.clone()).await;
+                }
+                Ok(None) => break,
+                Err(e) => {
+                    indexer.abort().await;
+                    return Err(e);
+                }
+            }
+        }
+        let index_output = indexer.finish().await;
+
+        if index_output.file_size > 0 {
+            // Check SST file existence again after building index.
+            if !self.check_sst_file_exists(version).await {
+                // Calls abort to clean up index files.
+                indexer.abort().await;
+                return Ok(IndexBuildOutcome::Aborted(format!(
+                    "SST file not found during index build, region: {}, file_id: {}",
+                    self.file_meta.region_id, self.file_meta.file_id
+                )));
+            }
+
+            // Upload index file if write cache is enabled.
+            self.maybe_upload_index_file(index_output.clone()).await?;
+
+            let worker_request = match self.update_manifest(index_output).await {
+                Ok(edit) => {
+                    let index_build_finished = IndexBuildFinished {
+                        region_id: self.file_meta.region_id,
+                        edit,
+                    };
+                    WorkerRequest::Background {
+                        region_id: self.file_meta.region_id,
+                        notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
+                    }
+                }
+                Err(e) => {
+                    let err = Arc::new(e);
+                    WorkerRequest::Background {
+                        region_id: self.file_meta.region_id,
+                        notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
+                    }
+                }
+            };
+
+            let _ = self
+                .request_sender
+                .send(WorkerRequestWithTime::new(worker_request))
+                .await;
+        }
+        Ok(IndexBuildOutcome::Finished)
+    }
+
+    async fn maybe_upload_index_file(&self, output: IndexOutput) -> Result<()> {
+        if let Some(write_cache) = &self.write_cache {
+            let file_id = self.file_meta.file_id;
+            let region_id = self.file_meta.region_id;
+            let remote_store = self.access_layer.object_store();
+            let mut upload_tracker = UploadTracker::new(region_id);
+            let mut err = None;
+            let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
+            let puffin_path = RegionFilePathFactory::new(
+                self.access_layer.table_dir().to_string(),
+                self.access_layer.path_type(),
+            )
+            .build_index_file_path(RegionFileId::new(region_id, file_id));
+            if let Err(e) = write_cache
+                .upload(puffin_key, &puffin_path, remote_store)
+                .await
+            {
+                err = Some(e);
+            }
+            upload_tracker.push_uploaded_file(puffin_path);
+            if let Some(err) = err {
+                // Cleans index files on failure.
+                upload_tracker
+                    .clean(
+                        &smallvec![SstInfo {
+                            file_id,
+                            index_metadata: output,
+                            ..Default::default()
+                        }],
+                        &write_cache.file_cache(),
+                        remote_store,
+                    )
+                    .await;
+                return Err(err);
+            }
+        } else {
+            debug!("write cache is not available, skip uploading index file");
+        }
+        Ok(())
+    }
+
+    async fn update_manifest(&mut self, output: IndexOutput) -> Result<RegionEdit> {
+        self.file_meta.available_indexes = output.build_available_indexes();
+        self.file_meta.index_file_size = output.file_size;
+        let edit = RegionEdit {
+            files_to_add: vec![self.file_meta.clone()],
+            files_to_remove: vec![],
+            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
+            flushed_sequence: None,
+            flushed_entry_id: None,
+            committed_sequence: None,
+            compaction_time_window: None,
+        };
+        let version = self
+            .manifest_ctx
+            .update_manifest(
+                RegionLeaderState::Writable,
+                RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
+            )
+            .await?;
+        info!(
+            "Successfully updated manifest version to {version}, region: {}, reason: {}",
+            self.file_meta.region_id,
+            self.reason.as_str()
+        );
+        Ok(edit)
+    }
+}
+
+#[derive(Clone)]
+pub struct IndexBuildScheduler {
+    scheduler: SchedulerRef,
+}
+
+impl IndexBuildScheduler {
+    pub fn new(scheduler: SchedulerRef) -> Self {
+        IndexBuildScheduler { scheduler }
+    }
+
+    pub(crate) fn schedule_build(
+        &mut self,
+        version_control: &VersionControlRef,
+        task: IndexBuildTask,
+    ) -> Result<()> {
+        let job = task.into_index_build_job(version_control);
+        self.scheduler.schedule(job)?;
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
 
     use api::v1::SemanticType;
+    use common_base::readable_size::ReadableSize;
+    use datafusion_common::HashMap;
     use datatypes::data_type::ConcreteDataType;
     use datatypes::schema::{
         ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
@@ -397,11 +683,21 @@ mod tests {
     use object_store::services::Memory;
     use puffin_manager::PuffinManagerFactory;
     use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
+    use tokio::sync::{mpsc, oneshot};
 
     use super::*;
-    use crate::access_layer::FilePathProvider;
-    use crate::config::{FulltextIndexConfig, Mode};
+    use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType};
+    use crate::cache::write_cache::WriteCache;
+    use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
+    use crate::memtable::time_partition::TimePartitions;
+    use crate::region::version::{VersionBuilder, VersionControl};
     use crate::sst::file::RegionFileId;
+    use crate::sst::file_purger::NoopFilePurger;
+    use crate::sst::location;
+    use crate::sst::parquet::WriteOptions;
+    use crate::test_util::memtable_util::EmptyMemtableBuilder;
+    use crate::test_util::scheduler_util::SchedulerEnv;
+    use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
 
     struct MetaConfig {
         with_inverted: bool,
@@ -489,7 +785,6 @@ mod tests {
     async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
         IntermediateManager::init_fs(path).await.unwrap()
     }
-
     struct NoopPathProvider;
 
     impl FilePathProvider for NoopPathProvider {
@@ -502,6 +797,82 @@ mod tests {
         }
     }
 
+    async fn mock_sst_file(
+        metadata: RegionMetadataRef,
+        env: &SchedulerEnv,
+        build_mode: IndexBuildMode,
+    ) -> SstInfo {
+        let source = new_source(&[
+            new_batch_by_range(&["a", "d"], 0, 60),
+            new_batch_by_range(&["b", "f"], 0, 40),
+            new_batch_by_range(&["b", "h"], 100, 200),
+        ]);
+        let mut index_config = MitoConfig::default().index;
+        index_config.build_mode = build_mode;
+        let write_request = SstWriteRequest {
+            op_type: OperationType::Flush,
+            metadata: metadata.clone(),
+            source: either::Left(source),
+            storage: None,
+            max_sequence: None,
+            cache_manager: Default::default(),
+            index_options: IndexOptions::default(),
+            index_config,
+            inverted_index_config: Default::default(),
+            fulltext_index_config: Default::default(),
+            bloom_filter_index_config: Default::default(),
+        };
+        env.access_layer
+            .write_sst(write_request, &WriteOptions::default(), WriteType::Flush)
+            .await
+            .unwrap()
+            .0
+            .remove(0)
+    }
+
+    async fn mock_version_control(
+        metadata: RegionMetadataRef,
+        file_purger: FilePurgerRef,
+        files: HashMap<FileId, FileMeta>,
+    ) -> VersionControlRef {
+        let mutable = Arc::new(TimePartitions::new(
+            metadata.clone(),
+            Arc::new(EmptyMemtableBuilder::default()),
+            0,
+            None,
+        ));
+        let version_builder = VersionBuilder::new(metadata, mutable)
+            .add_files(file_purger, files.values().cloned())
+            .build();
+        Arc::new(VersionControl::new(version_builder))
+    }
+
+    async fn mock_indexer_builder(
+        metadata: RegionMetadataRef,
+        env: &SchedulerEnv,
+    ) -> Arc<dyn IndexerBuilder + Send + Sync> {
+        let (dir, factory) =
+            PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
+        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
+        let puffin_manager = factory.build(
+            env.access_layer.object_store().clone(),
+            RegionFilePathFactory::new(
+                env.access_layer.table_dir().to_string(),
+                env.access_layer.path_type(),
+            ),
+        );
+        Arc::new(IndexerBuilderImpl {
+            build_type: IndexBuildType::Flush,
+            metadata,
+            row_group_size: 1024,
+            puffin_manager,
+            intermediate_manager: intm_manager,
+            index_options: IndexOptions::default(),
+            inverted_index_config: InvertedIndexConfig::default(),
+            fulltext_index_config: FulltextIndexConfig::default(),
+            bloom_filter_index_config: BloomFilterConfig::default(),
+        })
+    }
+
     #[tokio::test]
     async fn test_build_indexer_basic() {
         let (dir, factory) =
@@ -514,7 +885,7 @@ mod tests {
             with_skipping_bloom: true,
         });
         let indexer = IndexerBuilderImpl {
-            op_type: OperationType::Flush,
+            build_type: IndexBuildType::Flush,
             metadata,
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -544,7 +915,7 @@ mod tests {
             with_skipping_bloom: true,
         });
         let indexer = IndexerBuilderImpl {
-            op_type: OperationType::Flush,
+            build_type: IndexBuildType::Flush,
             metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -565,7 +936,7 @@ mod tests {
         assert!(indexer.bloom_filter_indexer.is_some());
 
         let indexer = IndexerBuilderImpl {
-            op_type: OperationType::Compact,
+            build_type: IndexBuildType::Compact,
             metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -586,7 +957,7 @@ mod tests {
         assert!(indexer.bloom_filter_indexer.is_some());
 
         let indexer = IndexerBuilderImpl {
-            op_type: OperationType::Compact,
+            build_type: IndexBuildType::Compact,
             metadata,
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -619,7 +990,7 @@ mod tests {
             with_skipping_bloom: true,
         });
         let indexer = IndexerBuilderImpl {
-            op_type: OperationType::Flush,
+            build_type: IndexBuildType::Flush,
             metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -642,7 +1013,7 @@ mod tests {
             with_skipping_bloom: true,
         });
         let indexer = IndexerBuilderImpl {
-            op_type: OperationType::Flush,
+            build_type: IndexBuildType::Flush,
             metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -665,7 +1036,7 @@ mod tests {
             with_skipping_bloom: false,
         });
         let indexer = IndexerBuilderImpl {
-            op_type: OperationType::Flush,
+            build_type: IndexBuildType::Flush,
             metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -695,7 +1066,7 @@ mod tests {
             with_skipping_bloom: true,
         });
         let indexer = IndexerBuilderImpl {
-            op_type: OperationType::Flush,
+            build_type: IndexBuildType::Flush,
             metadata,
             row_group_size: 0,
             puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -710,4 +1081,321 @@ mod tests {
 
         assert!(indexer.inverted_indexer.is_none());
     }
+
+    #[tokio::test]
+    async fn test_index_build_task_sst_not_exist() {
+        let env = SchedulerEnv::new().await;
+        let (tx, _rx) = mpsc::channel(4);
+        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
+        let mut scheduler = env.mock_index_build_scheduler();
+        let metadata = Arc::new(sst_region_metadata());
+        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
+        let file_purger = Arc::new(NoopFilePurger {});
+        let files = HashMap::new();
+        let version_control =
+            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
+        let region_id = metadata.region_id;
+        let indexer_builder = mock_indexer_builder(metadata, &env).await;
+
+        // Create mock task.
+        let task = IndexBuildTask {
+            file_meta: FileMeta {
+                region_id,
+                file_id: FileId::random(),
+                file_size: 100,
+                ..Default::default()
+            },
+            reason: IndexBuildType::Flush,
+            access_layer: env.access_layer.clone(),
+            manifest_ctx,
+            write_cache: None,
+            file_purger,
+            indexer_builder,
+            request_sender: tx,
+            result_sender: Some(result_tx),
+        };
+
+        // Schedule the build task and check the result.
+        scheduler.schedule_build(&version_control, task).unwrap();
+        match result_rx.await.unwrap() {
+            IndexBuildOutcome::Aborted(_) => {}
+            _ => panic!("Expect aborted result due to missing SST file"),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_index_build_task_sst_exist() {
+        let env = SchedulerEnv::new().await;
+        let mut scheduler = env.mock_index_build_scheduler();
+        let metadata = Arc::new(sst_region_metadata());
+        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
+        let region_id = metadata.region_id;
+        let file_purger = Arc::new(NoopFilePurger {});
+        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
+        let file_meta = FileMeta {
+            region_id,
+            file_id: sst_info.file_id,
+            file_size: sst_info.file_size,
+            index_file_size: sst_info.index_metadata.file_size,
+            num_rows: sst_info.num_rows as u64,
+            num_row_groups: sst_info.num_row_groups,
+            ..Default::default()
+        };
+        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
+        let version_control =
+            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
+        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
+
+        // Create mock task.
+        let (tx, mut rx) = mpsc::channel(4);
+        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
+        let task = IndexBuildTask {
+            file_meta: file_meta.clone(),
+            reason: IndexBuildType::Flush,
+            access_layer: env.access_layer.clone(),
+            manifest_ctx,
+            write_cache: None,
+            file_purger,
+            indexer_builder,
+            request_sender: tx,
+            result_sender: Some(result_tx),
+        };
+
+        scheduler.schedule_build(&version_control, task).unwrap();
+
+        // The task should finish successfully.
+        assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
+
+        // A notification should be sent to the worker to update the manifest.
+        let worker_req = rx.recv().await.unwrap().request;
+        match worker_req {
+            WorkerRequest::Background {
+                region_id: req_region_id,
+                notify: BackgroundNotify::IndexBuildFinished(finished),
+            } => {
+                assert_eq!(req_region_id, region_id);
+                assert_eq!(finished.edit.files_to_add.len(), 1);
+                let updated_meta = &finished.edit.files_to_add[0];
+
+                // The mock indexer builder creates all index types.
+                assert!(!updated_meta.available_indexes.is_empty());
+                assert!(updated_meta.index_file_size > 0);
+                assert_eq!(updated_meta.file_id, file_meta.file_id);
+            }
+            _ => panic!("Unexpected worker request: {:?}", worker_req),
+        }
+    }
+
+    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
+        let env = SchedulerEnv::new().await;
+        let mut scheduler = env.mock_index_build_scheduler();
+        let metadata = Arc::new(sst_region_metadata());
+        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
+        let file_purger = Arc::new(NoopFilePurger {});
+        let region_id = metadata.region_id;
+        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
+        let file_meta = FileMeta {
+            region_id,
+            file_id: sst_info.file_id,
+            file_size: sst_info.file_size,
+            index_file_size: sst_info.index_metadata.file_size,
+            num_rows: sst_info.num_rows as u64,
+            num_row_groups: sst_info.num_row_groups,
+            ..Default::default()
+        };
+        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
+        let version_control =
+            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
+        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
+
+        // Create mock task.
+        let (tx, _rx) = mpsc::channel(4);
+        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
+        let task = IndexBuildTask {
+            file_meta: file_meta.clone(),
+            reason: IndexBuildType::Flush,
+            access_layer: env.access_layer.clone(),
+            manifest_ctx,
+            write_cache: None,
+            file_purger,
+            indexer_builder,
+            request_sender: tx,
+            result_sender: Some(result_tx),
+        };
+
+        scheduler.schedule_build(&version_control, task).unwrap();
+
+        let puffin_path = location::index_file_path(
+            env.access_layer.table_dir(),
+            RegionFileId::new(region_id, file_meta.file_id),
+            env.access_layer.path_type(),
+        );
+
+        if build_mode == IndexBuildMode::Async {
+            // The index file should not exist before the task finishes.
+            assert!(
+                !env.access_layer
+                    .object_store()
+                    .exists(&puffin_path)
+                    .await
+                    .unwrap()
+            );
+        } else {
+            // The index file should exist before the task finishes.
+            assert!(
+                env.access_layer
+                    .object_store()
+                    .exists(&puffin_path)
+                    .await
+                    .unwrap()
+            );
+        }
+
+        // The task should finish successfully.
+        assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
+
+        // The index file should exist after the task finishes.
+        assert!(
+            env.access_layer
+                .object_store()
+                .exists(&puffin_path)
+                .await
+                .unwrap()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_index_build_task_build_mode() {
+        schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
+        schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
+    }
+
+    #[tokio::test]
+    async fn test_index_build_task_no_index() {
+        let env = SchedulerEnv::new().await;
+        let mut scheduler = env.mock_index_build_scheduler();
+        let mut metadata = sst_region_metadata();
+        // Unset indexes in metadata to simulate the no-index scenario.
+        metadata.column_metadatas.iter_mut().for_each(|col| {
+            col.column_schema.set_inverted_index(false);
+            let _ = col.column_schema.unset_skipping_options();
+        });
+        let region_id = metadata.region_id;
+        let metadata = Arc::new(metadata);
+        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
+        let file_purger = Arc::new(NoopFilePurger {});
+        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
+        let file_meta = FileMeta {
+            region_id,
+            file_id: sst_info.file_id,
+            file_size: sst_info.file_size,
+            index_file_size: sst_info.index_metadata.file_size,
+            num_rows: sst_info.num_rows as u64,
+            num_row_groups: sst_info.num_row_groups,
+            ..Default::default()
+        };
+        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
+        let version_control =
+            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
+        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
+
+        // Create mock task.
+        let (tx, mut rx) = mpsc::channel(4);
+        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
+        let task = IndexBuildTask {
+            file_meta: file_meta.clone(),
+            reason: IndexBuildType::Flush,
+            access_layer: env.access_layer.clone(),
+            manifest_ctx,
+            write_cache: None,
+            file_purger,
+            indexer_builder,
+            request_sender: tx,
+            result_sender: Some(result_tx),
+        };
+
+        scheduler.schedule_build(&version_control, task).unwrap();
+
+        // The task should finish successfully.
+        assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
+
+        // No index is built, so no notification should be sent to the worker.
+        let _ = rx.recv().await.is_none();
+    }
+
+    #[tokio::test]
+    async fn test_index_build_task_with_write_cache() {
+        let env = SchedulerEnv::new().await;
+        let mut scheduler = env.mock_index_build_scheduler();
+        let metadata = Arc::new(sst_region_metadata());
+        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
+        let file_purger = Arc::new(NoopFilePurger {});
+        let region_id = metadata.region_id;
+
+        let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
+        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
+
+        // Create mock write cache.
+        let write_cache = Arc::new(
+            WriteCache::new_fs(
+                dir.path().to_str().unwrap(),
+                ReadableSize::mb(10),
+                None,
+                factory,
+                intm_manager,
+            )
+            .await
+            .unwrap(),
+        );
+        // Indexer builder built from the write cache.
+        let indexer_builder = Arc::new(IndexerBuilderImpl {
+            build_type: IndexBuildType::Flush,
+            metadata: metadata.clone(),
+            row_group_size: 1024,
+            puffin_manager: write_cache.build_puffin_manager().clone(),
+            intermediate_manager: write_cache.intermediate_manager().clone(),
+            index_options: IndexOptions::default(),
+            inverted_index_config: InvertedIndexConfig::default(),
+            fulltext_index_config: FulltextIndexConfig::default(),
+            bloom_filter_index_config: BloomFilterConfig::default(),
+        });
+
+        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
+        let file_meta = FileMeta {
+            region_id,
+            file_id: sst_info.file_id,
+            file_size: sst_info.file_size,
+            index_file_size: sst_info.index_metadata.file_size,
+            num_rows: sst_info.num_rows as u64,
+            num_row_groups: sst_info.num_row_groups,
+            ..Default::default()
+        };
+        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
+        let version_control =
+            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
+
+        // Create mock task.
+        let (tx, mut _rx) = mpsc::channel(4);
+        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
+        let task = IndexBuildTask {
+            file_meta: file_meta.clone(),
+            reason: IndexBuildType::Flush,
+            access_layer: env.access_layer.clone(),
+            manifest_ctx,
+            write_cache: Some(write_cache.clone()),
+            file_purger,
+            indexer_builder,
+            request_sender: tx,
+            result_sender: Some(result_tx),
+        };
+
+        scheduler.schedule_build(&version_control, task).unwrap();
+
+        // The task should finish successfully.
+        assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
+
+        // The write cache should contain the uploaded index file.
+        let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
+        assert!(write_cache.file_cache().contains_key(&index_key));
+    }
 }
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index ff2a52769d..9b56ffd4ae 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -67,7 +67,7 @@ impl Default for WriteOptions {
 }
 
 /// Parquet SST info returned by the writer.
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct SstInfo {
     /// SST file id.
     pub file_id: FileId,
@@ -110,17 +110,16 @@ mod tests {
     use tokio_util::compat::FuturesAsyncWriteCompatExt;
 
     use super::*;
-    use crate::access_layer::{
-        FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType,
-    };
+    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
     use crate::cache::{CacheManager, CacheStrategy, PageKey};
+    use crate::config::IndexConfig;
     use crate::read::{BatchBuilder, BatchReader, FlatSource};
     use crate::region::options::{IndexOptions, InvertedIndexOptions};
     use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
     use crate::sst::file_purger::NoopFilePurger;
     use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
     use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
-    use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
+    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
     use crate::sst::parquet::format::PrimaryKeyWriteFormat;
     use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
     use crate::sst::parquet::writer::ParquetWriter;
@@ -183,6 +182,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             NoopIndexBuilder,
             file_path,
             Metrics::new(WriteType::Flush),
@@ -244,6 +244,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             NoopIndexBuilder,
             FixedPathProvider {
                 region_file_id: handle.file_id(),
@@ -329,6 +330,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             NoopIndexBuilder,
             FixedPathProvider {
                 region_file_id: handle.file_id(),
@@ -377,6 +379,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             NoopIndexBuilder,
             FixedPathProvider {
                 region_file_id: handle.file_id(),
@@ -435,6 +438,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             NoopIndexBuilder,
             FixedPathProvider {
                 region_file_id: handle.file_id(),
@@ -478,6 +482,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             NoopIndexBuilder,
             FixedPathProvider {
                 region_file_id: handle.file_id(),
@@ -635,6 +640,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             NoopIndexBuilder,
             path_provider,
             Metrics::new(WriteType::Flush),
@@ -692,7 +698,7 @@ mod tests {
         let intermediate_manager = env.get_intermediate_manager();
 
         let indexer_builder = IndexerBuilderImpl {
-            op_type: OperationType::Flush,
+            build_type: IndexBuildType::Flush,
             metadata: metadata.clone(),
             row_group_size,
             puffin_manager,
@@ -711,6 +717,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             indexer_builder,
             file_path.clone(),
             Metrics::new(WriteType::Flush),
@@ -1066,7 +1073,7 @@ mod tests {
         let intermediate_manager = env.get_intermediate_manager();
 
         let indexer_builder = IndexerBuilderImpl {
-            op_type: OperationType::Flush,
+            build_type: IndexBuildType::Flush,
             metadata: metadata.clone(),
             row_group_size,
             puffin_manager,
@@ -1085,6 +1092,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             indexer_builder,
             file_path.clone(),
             Metrics::new(WriteType::Flush),
@@ -1140,6 +1148,7 @@ mod tests {
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
+            IndexConfig::default(),
             NoopIndexBuilder,
             file_path,
             Metrics::new(WriteType::Flush),
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 3940e166a7..01e1e95a9c 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -46,12 +46,13 @@ use tokio::io::AsyncWrite;
 use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
 
 use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
+use crate::config::{IndexBuildMode, IndexConfig};
 use crate::error::{
     InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
 };
 use crate::read::{Batch, FlatSource, Source};
 use crate::sst::file::RegionFileId;
-use crate::sst::index::{Indexer, IndexerBuilder};
+use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
 use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
 use crate::sst::parquet::format::PrimaryKeyWriteFormat;
 use crate::sst::parquet::helper::parse_parquet_metadata;
@@ -68,6 +69,8 @@ pub struct ParquetWriter {
+        let mut index_output = IndexOutput::default();
+        match self.index_config.build_mode {
+            IndexBuildMode::Sync => {
+                index_output = self.current_indexer.as_mut().unwrap().finish().await;
+            }
+            IndexBuildMode::Async => {
+                debug!(
+                    "Index for file {} will be built asynchronously later",
+                    self.current_file
+                );
+            }
+        }
 
         current_writer.flush().await.context(WriteParquetSnafu)?;
         let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
@@ -252,11 +270,16 @@ where
         stats.update(&batch);
         let start = Instant::now();
         // safety: self.current_indexer must be set when first batch has been written.
-        self.current_indexer
-            .as_mut()
-            .unwrap()
-            .update(&mut batch)
-            .await;
+        match self.index_config.build_mode {
+            IndexBuildMode::Sync => {
+                self.current_indexer
+                    .as_mut()
+                    .unwrap()
+                    .update(&mut batch)
+                    .await;
+            }
+            IndexBuildMode::Async => {}
+        }
         self.metrics.update_index += start.elapsed();
         if let Some(max_file_size) = opts.max_file_size
             && self.bytes_written.load(Ordering::Relaxed) > max_file_size
diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs
index e2cfc859e0..6cae6ce83d 100644
--- a/src/mito2/src/sst/version.rs
+++ b/src/mito2/src/sst/version.rs
@@ -45,7 +45,8 @@ impl SstVersion {
         &self.levels
     }
 
-    /// Add files to the version.
+    /// Add files to the version. If a file with the same `file_id` already exists,
+    /// it will be overwritten with the new file.
     ///
     /// # Panics
     /// Panics if level of [FileMeta] is greater than [MAX_LEVEL].
@@ -58,8 +59,7 @@ impl SstVersion {
         let level = file.level;
         self.levels[level as usize]
             .files
-            .entry(file.file_id)
-            .or_insert_with(|| FileHandle::new(file, file_purger.clone()));
+            .insert(file.file_id, FileHandle::new(file, file_purger.clone()));
     }
 }
 
diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs
index 90f272c1eb..4e3bc9fa62 100644
--- a/src/mito2/src/test_util/scheduler_util.rs
+++ b/src/mito2/src/test_util/scheduler_util.rs
@@ -35,6 +35,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
 use crate::region::{ManifestContext, ManifestContextRef, RegionLeaderState, RegionRoleState};
 use crate::request::WorkerRequestWithTime;
 use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef};
+use crate::sst::index::IndexBuildScheduler;
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::worker::WorkerListener;
@@ -42,7 +43,7 @@ use crate::worker::WorkerListener;
 /// Scheduler mocker.
 pub(crate) struct SchedulerEnv {
     #[allow(unused)]
-    path: TempDir,
+    pub(crate) path: TempDir,
     /// Mock access layer for test.
     pub(crate) access_layer: AccessLayerRef,
     scheduler: Option<SchedulerRef>,
@@ -108,6 +109,13 @@ impl SchedulerEnv {
         FlushScheduler::new(scheduler)
     }
 
+    /// Creates a new index build scheduler.
+    pub(crate) fn mock_index_build_scheduler(&self) -> IndexBuildScheduler {
+        let scheduler = self.get_scheduler();
+
+        IndexBuildScheduler::new(scheduler)
+    }
+
     /// Creates a new manifest context.
     pub(crate) async fn mock_manifest_context(
         &self,
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 9716ce3a7f..9ceb10d2ec 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -24,6 +24,7 @@ mod handle_drop;
 mod handle_flush;
 mod handle_manifest;
 mod handle_open;
+mod handle_rebuild_index;
 mod handle_truncate;
 mod handle_write;
 
@@ -67,6 +68,7 @@ use crate::request::{
 };
 use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
 use crate::sst::file_ref::FileReferenceManagerRef;
+use crate::sst::index::IndexBuildScheduler;
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::time_provider::{StdTimeProvider, TimeProviderRef};
@@ -127,6 +129,8 @@ pub(crate) struct WorkerGroup {
     flush_job_pool: SchedulerRef,
     /// Compaction background job pool.
     compact_job_pool: SchedulerRef,
+    /// Scheduler for index build jobs.
+    index_build_job_pool: SchedulerRef,
     /// Scheduler for file purgers.
     purge_scheduler: SchedulerRef,
     /// Cache.
@@ -163,6 +167,8 @@ impl WorkerGroup {
         let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
             .await?
             .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
+        let index_build_job_pool =
+            Arc::new(LocalScheduler::new(config.max_background_index_builds));
         let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
         let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
         let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
@@ -198,6 +204,7 @@ impl WorkerGroup {
                 log_store: log_store.clone(),
                 object_store_manager: object_store_manager.clone(),
                 write_buffer_manager: write_buffer_manager.clone(),
+                index_build_job_pool: index_build_job_pool.clone(),
                 flush_job_pool: flush_job_pool.clone(),
                 compact_job_pool: compact_job_pool.clone(),
                 purge_scheduler: purge_scheduler.clone(),
@@ -222,6 +229,7 @@ impl WorkerGroup {
             workers,
             flush_job_pool,
             compact_job_pool,
+            index_build_job_pool,
             purge_scheduler,
             cache_manager,
             file_ref_manager,
@@ -239,6 +247,8 @@ impl WorkerGroup {
         self.flush_job_pool.stop(true).await?;
         // Stops the purge scheduler gracefully.
         self.purge_scheduler.stop(true).await?;
+        // Stops the index build job pool gracefully.
+        self.index_build_job_pool.stop(true).await?;
 
         try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
 
@@ -319,6 +329,8 @@ impl WorkerGroup {
                     .with_notifier(flush_sender.clone()),
             )
         });
+        let index_build_job_pool =
+            Arc::new(LocalScheduler::new(config.max_background_index_builds));
         let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
         let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
         let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
@@ -356,6 +368,7 @@ impl WorkerGroup {
                 log_store: log_store.clone(),
                 object_store_manager: object_store_manager.clone(),
                 write_buffer_manager: write_buffer_manager.clone(),
+                index_build_job_pool: index_build_job_pool.clone(),
                 flush_job_pool: flush_job_pool.clone(),
                 compact_job_pool: compact_job_pool.clone(),
                 purge_scheduler: purge_scheduler.clone(),
@@ -380,6 +393,7 @@ impl WorkerGroup {
             workers,
             flush_job_pool,
             compact_job_pool,
+            index_build_job_pool,
             purge_scheduler,
             cache_manager,
             file_ref_manager,
@@ -437,6 +451,7 @@ struct WorkerStarter<S: LogStore> {
     object_store_manager: ObjectStoreManagerRef,
     write_buffer_manager: WriteBufferManagerRef,
     compact_job_pool: SchedulerRef,
+    index_build_job_pool: SchedulerRef,
     flush_job_pool: SchedulerRef,
     purge_scheduler: SchedulerRef,
     listener: WorkerListener,
@@ -482,6 +497,7 @@ impl<S: LogStore> WorkerStarter<S> {
             ),
             purge_scheduler: self.purge_scheduler.clone(),
             write_buffer_manager: self.write_buffer_manager,
+            index_build_scheduler: IndexBuildScheduler::new(self.index_build_job_pool),
             flush_scheduler: FlushScheduler::new(self.flush_job_pool),
             compaction_scheduler: CompactionScheduler::new(
                 self.compact_job_pool,
@@ -725,6 +741,8 @@ struct RegionWorkerLoop<S> {
     purge_scheduler: SchedulerRef,
     /// Engine write buffer manager.
     write_buffer_manager: WriteBufferManagerRef,
+    /// Scheduler for index build tasks.
+    index_build_scheduler: IndexBuildScheduler,
     /// Schedules background flush requests.
     flush_scheduler: FlushScheduler,
     /// Scheduler for compaction tasks.
@@ -913,6 +931,9 @@
             WorkerRequest::EditRegion(request) => {
                 self.handle_region_edit(request).await;
             }
+            WorkerRequest::BuildIndexRegion(request) => {
+                self.handle_rebuild_index(request).await;
+            }
             WorkerRequest::Stop => {
                 debug_assert!(!self.running.load(Ordering::Relaxed));
             }
@@ -1021,6 +1042,12 @@
                 self.handle_flush_finished(region_id, req).await
             }
             BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
+            BackgroundNotify::IndexBuildFinished(req) => {
+                self.handle_index_build_finished(region_id, req).await
+            }
+            BackgroundNotify::IndexBuildFailed(req) => {
+                self.handle_index_build_failed(region_id, req).await
+            }
             BackgroundNotify::CompactionFinished(req) => {
                 self.handle_compaction_finished(region_id, req).await
             }
diff --git a/src/mito2/src/worker/handle_rebuild_index.rs b/src/mito2/src/worker/handle_rebuild_index.rs
new file mode 100644
index 0000000000..4128e09183
--- /dev/null
+++ b/src/mito2/src/worker/handle_rebuild_index.rs
@@ -0,0 +1,149 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Handles index build requests.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use common_telemetry::{error, warn};
+use store_api::logstore::LogStore;
+use store_api::storage::{FileId, RegionId};
+use tokio::sync::oneshot;
+
+use crate::region::MitoRegionRef;
+use crate::request::{IndexBuildFailed, IndexBuildFinished, RegionBuildIndexRequest};
+use crate::sst::file::FileHandle;
+use crate::sst::index::{IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl};
+use crate::sst::parquet::WriteOptions;
+use crate::worker::RegionWorkerLoop;
+
+impl<S: LogStore> RegionWorkerLoop<S> {
+    pub(crate) fn new_index_build_task(
+        &self,
+        region: &MitoRegionRef,
+        file: FileHandle,
+        build_type: IndexBuildType,
+        result_sender: Option<oneshot::Sender<IndexBuildOutcome>>,
+    ) -> IndexBuildTask {
+        let version = region.version();
+        let access_layer = region.access_layer.clone();
+
+        let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
+            write_cache.build_puffin_manager()
+        } else {
+            access_layer.build_puffin_manager()
+        };
+
+        let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
+            write_cache.intermediate_manager().clone()
+        } else {
+            access_layer.intermediate_manager().clone()
+        };
+
+        let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
+            build_type: build_type.clone(),
+            metadata: version.metadata.clone(),
+            inverted_index_config: self.config.inverted_index.clone(),
+            fulltext_index_config: self.config.fulltext_index.clone(),
+            bloom_filter_index_config: self.config.bloom_filter_index.clone(),
+            index_options: version.options.index_options.clone(),
+            row_group_size: WriteOptions::default().row_group_size,
+            intermediate_manager,
+            puffin_manager,
+        });
+
+        IndexBuildTask {
+            file_meta: file.meta_ref().clone(),
+            reason: build_type,
+            access_layer: access_layer.clone(),
+            manifest_ctx: region.manifest_ctx.clone(),
+            write_cache: self.cache_manager.write_cache().cloned(),
+            file_purger: file.file_purger(),
+            request_sender: self.sender.clone(),
+            indexer_builder: indexer_builder_ref.clone(),
+            result_sender,
+        }
+    }
+
+    pub(crate) async fn handle_rebuild_index(&mut self, request: RegionBuildIndexRequest) {
+        let region_id = request.region_id;
+        let Some(region) = self.regions.get_region(region_id) else {
+            return;
+        };
+
+        let version_control = region.version_control.clone();
+        let version = version_control.current().version;
+
+        let all_files: HashMap<FileId, FileHandle> = version
+            .ssts
+            .levels()
+            .iter()
+            .flat_map(|level| level.files.iter())
+            .filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
+            .map(|(id, handle)| (*id, handle.clone()))
+            .collect();
+
+        let build_tasks = if request.file_metas.is_empty() {
+            // NOTE: Currently, rebuilding the index will reconstruct the index for all
+            // files in the region, which is a simplified approach and is not yet available for
+            // production use; further optimization is required.
+            all_files.values().cloned().collect::<Vec<_>>()
+        } else {
+            request
+                .file_metas
+                .iter()
+                .filter_map(|meta| all_files.get(&meta.file_id).cloned())
+                .collect::<Vec<_>>()
+        };
+
+        for file_handle in build_tasks {
+            let task =
+                self.new_index_build_task(&region, file_handle, request.build_type.clone(), None);
+            let _ = self
+                .index_build_scheduler
+                .schedule_build(&region.version_control, task);
+        }
+    }
+
+    pub(crate) async fn handle_index_build_finished(
+        &mut self,
+        region_id: RegionId,
+        request: IndexBuildFinished,
+    ) {
+        let region = match self.regions.get_region(region_id) {
+            Some(region) => region,
+            None => {
+                warn!(
+                    "Region not found for index build finished, region_id: {}",
+                    region_id
+                );
+                return;
+            }
+        };
+        region.version_control.apply_edit(
+            Some(request.edit.clone()),
+            &[],
+            region.file_purger.clone(),
+        );
+    }
+
+    pub(crate) async fn handle_index_build_failed(
+        &mut self,
+        region_id: RegionId,
+        request: IndexBuildFailed,
+    ) {
+        error!(request.err; "Index build failed for region: {}", region_id);
+        // TODO(SNC123): Implement error handling logic after IndexBuildScheduler optimization.
+    }
+}
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 2bae3bf8c8..bba04439bd 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1508,6 +1508,7 @@ enable_experimental_flat_format = false
 aux_path = ""
 staging_size = "2GiB"
 staging_ttl = "7days"
+build_mode = "sync"
 write_buffer_size = "8MiB"
 content_cache_page_size = "64KiB"
 
@@ -1591,6 +1592,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
             "result_cache_size =",
             "name =",
             "recovery_parallelism =",
+            "max_background_index_builds =",
             "max_background_flushes =",
             "max_background_compactions =",
             "max_background_purges =",
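Illustrative example (not part of the patch). How a caller holding a `result_sender`
could observe task completion, mirroring what the unit tests in src/mito2/src/sst/index.rs
do above; the surrounding setup (`task`, `scheduler`, `version_control`) is assumed:

    // Assumed: `task` is an IndexBuildTask built as in new_index_build_task,
    // and `scheduler` is the worker's IndexBuildScheduler.
    let (result_tx, result_rx) = tokio::sync::oneshot::channel::<IndexBuildOutcome>();
    task.result_sender = Some(result_tx);
    scheduler.schedule_build(&version_control, task)?;
    match result_rx.await {
        // Index built and published to the manifest via a RegionEdit.
        Ok(IndexBuildOutcome::Finished) => {}
        // SST vanished (e.g. compacted away) or the build failed; reason explains why.
        Ok(IndexBuildOutcome::Aborted(_reason)) => {}
        // Task was dropped without reporting a result.
        Err(_) => {}
    }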