diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 98d9396bf7..c77d1fd05e 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -22,6 +22,7 @@ use store_api::metadata::RegionMetadataRef; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; +use crate::config::InvertedIndexConfig; use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; use crate::read::Source; use crate::region::options::IndexOptions; @@ -141,19 +142,20 @@ impl AccessLayer { .await? } else { // Write cache is disabled. + let store = self.object_store.clone(); let indexer = IndexerBuilder { - create_inverted_index: request.create_inverted_index, - mem_threshold_index_create: request.mem_threshold_index_create, - write_buffer_size: request.index_write_buffer_size, + op_type: request.op_type, file_id, file_path: index_file_path, metadata: &request.metadata, row_group_size: write_opts.row_group_size, - object_store: self.object_store.clone(), + puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), index_options: request.index_options, + inverted_index_config: request.inverted_index_config, } - .build(); + .build() + .await; let mut writer = ParquetWriter::new_with_object_store( self.object_store.clone(), file_path, @@ -182,22 +184,26 @@ impl AccessLayer { } } +/// `OperationType` represents the origin of the `SstWriteRequest`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) enum OperationType { + Flush, + Compact, +} + /// Contents to build a SST. pub(crate) struct SstWriteRequest { + pub(crate) op_type: OperationType, pub(crate) file_id: FileId, pub(crate) metadata: RegionMetadataRef, pub(crate) source: Source, pub(crate) cache_manager: CacheManagerRef, #[allow(dead_code)] pub(crate) storage: Option, - /// Whether to create inverted index. - pub(crate) create_inverted_index: bool, - /// The threshold of memory size to create inverted index. - pub(crate) mem_threshold_index_create: Option, - /// The size of write buffer for index. - pub(crate) index_write_buffer_size: Option, - /// The options of the index for the region. + + /// Configs for index pub(crate) index_options: IndexOptions, + pub(crate) inverted_index_config: InvertedIndexConfig, } /// Creates a fs object store with atomic write dir. diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index f2731e25d0..c47846fd4d 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -29,6 +29,7 @@ use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, Inde use crate::error::{self, Result}; use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::index::IndexerBuilder; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::parquet::{SstInfo, WriteOptions}; @@ -44,7 +45,9 @@ pub struct WriteCache { #[allow(unused)] /// TODO: Remove unused after implementing async write cache object_store_manager: ObjectStoreManagerRef, - /// Intermediate manager for inverted index. + /// Puffin manager factory for index. + puffin_manager_factory: PuffinManagerFactory, + /// Intermediate manager for index. 
intermediate_manager: IntermediateManager, } @@ -58,6 +61,7 @@ impl WriteCache { object_store_manager: ObjectStoreManagerRef, cache_capacity: ReadableSize, ttl: Option, + puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, ) -> Result { let file_cache = FileCache::new(local_store, cache_capacity, ttl); @@ -66,6 +70,7 @@ impl WriteCache { Ok(Self { file_cache: Arc::new(file_cache), object_store_manager, + puffin_manager_factory, intermediate_manager, }) } @@ -76,6 +81,7 @@ impl WriteCache { object_store_manager: ObjectStoreManagerRef, cache_capacity: ReadableSize, ttl: Option, + puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, ) -> Result { info!("Init write cache on {cache_dir}, capacity: {cache_capacity}"); @@ -86,6 +92,7 @@ impl WriteCache { object_store_manager, cache_capacity, ttl, + puffin_manager_factory, intermediate_manager, ) .await @@ -112,19 +119,20 @@ impl WriteCache { let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet); let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); + let store = self.file_cache.local_store(); let indexer = IndexerBuilder { - create_inverted_index: write_request.create_inverted_index, - mem_threshold_index_create: write_request.mem_threshold_index_create, - write_buffer_size: write_request.index_write_buffer_size, + op_type: write_request.op_type, file_id, file_path: self.file_cache.cache_file_path(puffin_key), metadata: &write_request.metadata, row_group_size: write_opts.row_group_size, - object_store: self.file_cache.local_store(), + puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), index_options: write_request.index_options, + inverted_index_config: write_request.inverted_index_config, } - .build(); + .build() + .await; // Write to FileCache. 
let mut writer = ParquetWriter::new_with_object_store( @@ -148,7 +156,7 @@ impl WriteCache { let remote_store = &upload_request.remote_store; self.upload(parquet_key, parquet_path, remote_store).await?; - if sst_info.inverted_index_available { + if sst_info.index_metadata.file_size > 0 { let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); let puffin_path = &upload_request.index_upload_path; self.upload(puffin_key, puffin_path, remote_store).await?; @@ -251,6 +259,7 @@ mod tests { use common_test_util::temp_dir::create_temp_dir; use super::*; + use crate::access_layer::OperationType; use crate::cache::test_util::new_fs_store; use crate::cache::CacheManager; use crate::region::options::IndexOptions; @@ -290,15 +299,14 @@ mod tests { ]); let write_request = SstWriteRequest { + op_type: OperationType::Flush, file_id, metadata, source, storage: None, - create_inverted_index: true, - mem_threshold_index_create: None, - index_write_buffer_size: None, cache_manager: Default::default(), index_options: IndexOptions::default(), + inverted_index_config: Default::default(), }; let upload_request = SstUploadRequest { @@ -375,15 +383,14 @@ mod tests { // Write to local cache and upload sst to mock remote store let write_request = SstWriteRequest { + op_type: OperationType::Flush, file_id, metadata, source, storage: None, - create_inverted_index: false, - mem_threshold_index_create: None, - index_write_buffer_size: None, cache_manager: cache_manager.clone(), index_options: IndexOptions::default(), + inverted_index_config: Default::default(), }; let write_opts = WriteOptions { row_group_size: 512, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index a303367a34..b6821006ef 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -24,7 +24,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; -use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest}; +use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest}; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::picker::{new_picker, PickerOutput}; use crate::compaction::CompactionSstReaderBuilder; @@ -260,23 +260,6 @@ impl Compactor for DefaultCompactor { write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, ..Default::default() }; - let create_inverted_index = compaction_region - .engine_config - .inverted_index - .create_on_compaction - .auto(); - let mem_threshold_index_create = compaction_region - .engine_config - .inverted_index - .mem_threshold_on_create - .map(|m| m.as_bytes() as _); - let index_write_buffer_size = Some( - compaction_region - .engine_config - .index - .write_buffer_size - .as_bytes() as usize, - ); let region_metadata = compaction_region.region_metadata.clone(); let sst_layer = compaction_region.access_layer.clone(); @@ -291,6 +274,7 @@ impl Compactor for DefaultCompactor { .clone(); let append_mode = compaction_region.current_version.options.append_mode; let merge_mode = compaction_region.current_version.options.merge_mode(); + let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); futs.push(async move { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), @@ -307,15 +291,14 @@ impl Compactor for DefaultCompactor { let file_meta_opt = sst_layer .write_sst( SstWriteRequest { + op_type: OperationType::Compact, file_id, metadata: 
region_metadata, source: Source::Reader(reader), cache_manager, storage, - create_inverted_index, - mem_threshold_index_create, - index_write_buffer_size, index_options, + inverted_index_config, }, &write_opts, ) @@ -326,11 +309,14 @@ impl Compactor for DefaultCompactor { time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, - available_indexes: sst_info - .inverted_index_available - .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) - .unwrap_or_default(), - index_file_size: sst_info.index_file_size, + available_indexes: { + let mut indexes = SmallVec::new(); + if sst_info.index_metadata.inverted_index.is_available() { + indexes.push(IndexType::InvertedIndex); + } + indexes + }, + index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, }); diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 04d085dda8..334d2de752 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -354,6 +354,9 @@ pub struct InvertedIndexConfig { #[serde_as(as = "NoneAsEmptyString")] pub mem_threshold_on_create: Option, + /// Whether to compress the index data. + pub compress: bool, + #[deprecated = "use [IndexConfig::aux_path] instead"] #[serde(skip_serializing)] pub intermediate_path: String, @@ -370,8 +373,10 @@ impl Default for InvertedIndexConfig { create_on_flush: Mode::Auto, create_on_compaction: Mode::Auto, apply_on_query: Mode::Auto, - write_buffer_size: ReadableSize::mb(8), + compress: true, mem_threshold_on_create: Some(ReadableSize::mb(64)), + + write_buffer_size: ReadableSize::mb(8), intermediate_path: String::new(), } } diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 9179d8a074..1d598efcb4 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -580,7 +580,7 @@ async fn test_region_usage() { flush_region(&engine, region_id, None).await; let region_stat = region.region_usage(); - assert_eq!(region_stat.sst_usage, 3010); + assert_eq!(region_stat.sst_usage, 3026); // region total usage // Some memtables may share items. diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index ed665e445c..35da912b7a 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -597,13 +597,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to write puffin completely"))] - PuffinFinish { - source: puffin::error::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to add blob to puffin file"))] PuffinAddBlob { source: puffin::error::Error, @@ -891,7 +884,6 @@ impl ErrorExt for Error { | IndexFinish { source, .. } => source.status_code(), PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } - | PuffinFinish { source, .. } | PuffinAddBlob { source, .. } | PuffinInitStager { source, .. } | PuffinBuildReader { source, .. 
} => source.status_code(), diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 2d573b423b..1bd7c078b0 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -25,7 +25,7 @@ use store_api::storage::RegionId; use strum::IntoStaticStr; use tokio::sync::mpsc; -use crate::access_layer::{AccessLayerRef, SstWriteRequest}; +use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest}; use crate::cache::CacheManagerRef; use crate::config::MitoConfig; use crate::error::{ @@ -321,26 +321,17 @@ impl RegionFlushTask { let file_id = FileId::random(); let iter = mem.iter(None, None)?; let source = Source::Iter(iter); - let create_inverted_index = self.engine_config.inverted_index.create_on_flush.auto(); - let mem_threshold_index_create = self - .engine_config - .inverted_index - .mem_threshold_on_create - .map(|m| m.as_bytes() as _); - let index_write_buffer_size = - Some(self.engine_config.index.write_buffer_size.as_bytes() as usize); // Flush to level 0. let write_request = SstWriteRequest { + op_type: OperationType::Flush, file_id, metadata: version.metadata.clone(), source, cache_manager: self.cache_manager.clone(), storage: version.options.storage.clone(), - create_inverted_index, - mem_threshold_index_create, - index_write_buffer_size, index_options: self.index_options.clone(), + inverted_index_config: self.engine_config.inverted_index.clone(), }; let Some(sst_info) = self .access_layer @@ -358,11 +349,14 @@ impl RegionFlushTask { time_range: sst_info.time_range, level: 0, file_size: sst_info.file_size, - available_indexes: sst_info - .inverted_index_available - .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) - .unwrap_or_default(), - index_file_size: sst_info.index_file_size, + available_indexes: { + let mut indexes = SmallVec::new(); + if sst_info.index_metadata.inverted_index.is_available() { + indexes.push(IndexType::InvertedIndex); + } + indexes + }, + index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, }; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index c25a040295..48afb2d009 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -45,8 +45,8 @@ use crate::read::{Batch, Source}; use crate::region::options::MergeMode; use crate::region::version::VersionRef; use crate::sst::file::{overlaps, FileHandle, FileMeta}; -use crate::sst::index::applier::builder::SstIndexApplierBuilder; -use crate::sst::index::applier::SstIndexApplierRef; +use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::SstIndexApplierRef; use crate::sst::parquet::file_range::FileRange; /// A scanner scans a region and returns a [SendableRecordBatchStream]. diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 5bfee47ef7..50d40f0894 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -12,151 +12,151 @@ // See the License for the specific language governing permissions and // limitations under the License. 
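To recap the plumbing change visible in the flush path above: callers no longer pre-compute `create_inverted_index`, `mem_threshold_index_create`, and `index_write_buffer_size`. They tag the request with its origin and pass the whole `InvertedIndexConfig` through, and they read the result back from `sst_info.index_metadata` instead of a bare byte count. A minimal caller-side sketch, assuming the same flush-task fields shown above (not a verbatim excerpt, error handling elided):

    // Build the write request: op_type plus the full config replace the old three fields.
    let write_request = SstWriteRequest {
        op_type: OperationType::Flush,
        file_id: FileId::random(),
        metadata: version.metadata.clone(),
        source,
        cache_manager: self.cache_manager.clone(),
        storage: version.options.storage.clone(),
        index_options: self.index_options.clone(),
        inverted_index_config: self.engine_config.inverted_index.clone(),
    };

    // Consume the richer index metadata returned with the SST info.
    let index = &sst_info.index_metadata;
    let mut available_indexes = SmallVec::new(); // element type inferred from FileMeta::available_indexes
    if index.inverted_index.is_available() {     // i.e. inverted_index.index_size > 0
        available_indexes.push(IndexType::InvertedIndex);
    }
    let index_file_size = index.file_size;       // size of the whole puffin file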
-pub(crate) mod applier; -mod codec; -pub(crate) mod creator; +mod indexer; pub(crate) mod intermediate; +pub(crate) mod inverted_index; pub(crate) mod puffin_manager; +mod statistics; mod store; use std::num::NonZeroUsize; use common_telemetry::{debug, warn}; -use creator::SstIndexCreator; -use object_store::ObjectStore; +use puffin::puffin_manager::PuffinManager; +use puffin_manager::{SstPuffinManager, SstPuffinWriter}; +use statistics::{ByteCount, RowCount}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::RegionId; +use store_api::storage::{ColumnId, RegionId}; +use crate::access_layer::OperationType; +use crate::config::InvertedIndexConfig; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; use crate::sst::file::FileId; use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer; -const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; +/// Output of the index creation. +#[derive(Debug, Clone, Default)] +pub struct IndexOutput { + /// Size of the file. + pub file_size: u64, + /// Inverted index output. + pub inverted_index: InvertedIndexOutput, +} + +/// Output of the inverted index creation. +#[derive(Debug, Clone, Default)] +pub struct InvertedIndexOutput { + /// Size of the index. + pub index_size: ByteCount, + /// Number of rows in the index. + pub row_count: RowCount, + /// Available columns in the index. + pub columns: Vec, +} + +impl InvertedIndexOutput { + pub fn is_available(&self) -> bool { + self.index_size > 0 + } +} /// The index creator that hides the error handling details. #[derive(Default)] pub struct Indexer { file_id: FileId, region_id: RegionId, - inner: Option, last_memory_usage: usize, + + inverted_indexer: Option, + puffin_writer: Option, } impl Indexer { - /// Update the index with the given batch. + /// Updates the index with the given batch. pub async fn update(&mut self, batch: &Batch) { - if let Some(creator) = self.inner.as_mut() { - if let Err(err) = creator.update(batch).await { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to update index, region_id: {}, file_id: {}, err: {}", - self.region_id, self.file_id, err - ); - } else { - warn!( - err; "Failed to update index, skip creating index, region_id: {}, file_id: {}", - self.region_id, self.file_id, - ); - } + self.do_update(batch).await; - // Skip index creation if error occurs. - self.inner = None; - } - } - - if let Some(creator) = self.inner.as_ref() { - let memory_usage = creator.memory_usage(); - INDEX_CREATE_MEMORY_USAGE.add(memory_usage as i64 - self.last_memory_usage as i64); - self.last_memory_usage = memory_usage; - } else { - INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64); - self.last_memory_usage = 0; - } + let memory_usage = self.memory_usage(); + INDEX_CREATE_MEMORY_USAGE.add(memory_usage as i64 - self.last_memory_usage as i64); + self.last_memory_usage = memory_usage; } - /// Finish the index creation. - /// Returns the number of bytes written if success or None if failed. 
- pub async fn finish(&mut self) -> Option { - if let Some(mut creator) = self.inner.take() { - match creator.finish().await { - Ok((row_count, byte_count)) => { - debug!( - "Create index successfully, region_id: {}, file_id: {}, bytes: {}, rows: {}", - self.region_id, self.file_id, byte_count, row_count - ); - - INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64); - self.last_memory_usage = 0; - return Some(byte_count); - } - Err(err) => { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to create index, region_id: {}, file_id: {}, err: {}", - self.region_id, self.file_id, err - ); - } else { - warn!( - err; "Failed to create index, region_id: {}, file_id: {}", - self.region_id, self.file_id, - ); - } - } - } - } - + /// Finalizes the index creation. + pub async fn finish(&mut self) -> IndexOutput { INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64); self.last_memory_usage = 0; - None + + self.do_finish().await } - /// Abort the index creation. + /// Aborts the index creation. pub async fn abort(&mut self) { - if let Some(mut creator) = self.inner.take() { - if let Err(err) = creator.abort().await { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to abort index, region_id: {}, file_id: {}, err: {}", - self.region_id, self.file_id, err - ); - } else { - warn!( - err; "Failed to abort index, region_id: {}, file_id: {}", - self.region_id, self.file_id, - ); - } - } - } INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64); self.last_memory_usage = 0; + + self.do_abort().await; + } + + fn memory_usage(&self) -> usize { + self.inverted_indexer + .as_ref() + .map_or(0, |creator| creator.memory_usage()) } } pub(crate) struct IndexerBuilder<'a> { - pub(crate) create_inverted_index: bool, - pub(crate) mem_threshold_index_create: Option, - pub(crate) write_buffer_size: Option, + pub(crate) op_type: OperationType, pub(crate) file_id: FileId, pub(crate) file_path: String, pub(crate) metadata: &'a RegionMetadataRef, pub(crate) row_group_size: usize, - pub(crate) object_store: ObjectStore, + pub(crate) puffin_manager: SstPuffinManager, pub(crate) intermediate_manager: IntermediateManager, pub(crate) index_options: IndexOptions, + pub(crate) inverted_index_config: InvertedIndexConfig, } impl<'a> IndexerBuilder<'a> { - /// Sanity check for arguments and create a new [Indexer] - /// with inner [SstIndexCreator] if arguments are valid. - pub(crate) fn build(self) -> Indexer { - if !self.create_inverted_index { + /// Sanity check for arguments and create a new [Indexer] if arguments are valid. 
+ pub(crate) async fn build(self) -> Indexer { + let mut indexer = Indexer { + file_id: self.file_id, + region_id: self.metadata.region_id, + last_memory_usage: 0, + + ..Default::default() + }; + + indexer.inverted_indexer = self.build_inverted_indexer(); + if indexer.inverted_indexer.is_none() { + indexer.abort().await; + return Indexer::default(); + } + + indexer.puffin_writer = self.build_puffin_writer().await; + if indexer.puffin_writer.is_none() { + indexer.abort().await; + return Indexer::default(); + } + + indexer + } + + fn build_inverted_indexer(&self) -> Option { + let create = match self.op_type { + OperationType::Flush => self.inverted_index_config.create_on_flush.auto(), + OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(), + }; + + if !create { debug!( - "Skip creating index due to request, region_id: {}, file_id: {}", + "Skip creating inverted index due to config, region_id: {}, file_id: {}", self.metadata.region_id, self.file_id, ); - return Indexer::default(); + return None; } if self.metadata.primary_key.is_empty() { @@ -164,7 +164,7 @@ impl<'a> IndexerBuilder<'a> { "No tag columns, skip creating index, region_id: {}, file_id: {}", self.metadata.region_id, self.file_id, ); - return Indexer::default(); + return None; } let Some(mut segment_row_count) = @@ -174,7 +174,7 @@ impl<'a> IndexerBuilder<'a> { "Segment row count is 0, skip creating index, region_id: {}, file_id: {}", self.metadata.region_id, self.file_id, ); - return Indexer::default(); + return None; }; let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else { @@ -182,7 +182,7 @@ impl<'a> IndexerBuilder<'a> { "Row group size is 0, skip creating index, region_id: {}, file_id: {}", self.metadata.region_id, self.file_id, ); - return Indexer::default(); + return None; }; // if segment row count not aligned with row group size, adjust it to be aligned. 
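Taken together with the `ParquetWriter` changes later in the patch, the indexer lifecycle from the writer's point of view now looks roughly like this. This is a sketch stitched from the surrounding hunks, not a verbatim excerpt; `batches`, `index_file_path`, and the factory/manager bindings are assumed to be in scope as they are in `AccessLayer::write_sst`:

    // Build is async now because it opens a puffin writer up front.
    let mut indexer = IndexerBuilder {
        op_type: OperationType::Flush,
        file_id,
        file_path: index_file_path,
        metadata: &metadata,
        row_group_size: write_opts.row_group_size,
        puffin_manager: puffin_manager_factory.build(object_store.clone()),
        intermediate_manager: intermediate_manager.clone(),
        index_options,
        inverted_index_config,
    }
    .build()
    .await;

    // Feed every batch; outside test builds, failures are logged and the index is
    // dropped rather than failing the SST write.
    for batch in &batches {
        indexer.update(batch).await;
    }

    // Finish returns IndexOutput (file_size plus per-index details) instead of Option<u64>.
    let index_output = indexer.finish().await;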
@@ -190,31 +190,43 @@ impl<'a> IndexerBuilder<'a> { segment_row_count = row_group_size; } - let creator = SstIndexCreator::new( - self.file_path, + let mem_threshold = self + .inverted_index_config + .mem_threshold_on_create + .map(|t| t.as_bytes() as usize); + + let indexer = InvertedIndexer::new( self.file_id, self.metadata, - self.object_store, - self.intermediate_manager, - self.mem_threshold_index_create, + self.intermediate_manager.clone(), + mem_threshold, segment_row_count, - ) - .with_buffer_size(self.write_buffer_size) - .with_ignore_column_ids( - self.index_options - .inverted_index - .ignore_column_ids - .iter() - .map(|i| i.to_string()) - .collect(), + self.inverted_index_config.compress, + &self.index_options.inverted_index.ignore_column_ids, ); - Indexer { - file_id: self.file_id, - region_id: self.metadata.region_id, - inner: Some(creator), - last_memory_usage: 0, + Some(indexer) + } + + async fn build_puffin_writer(&self) -> Option { + let err = match self.puffin_manager.writer(&self.file_path).await { + Ok(writer) => return Some(writer), + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to create puffin writer, region_id: {}, file_id: {}, err: {}", + self.metadata.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to create puffin writer, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); } + + None } } @@ -226,9 +238,12 @@ mod tests { use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; use object_store::services::Memory; + use object_store::ObjectStore; + use puffin_manager::PuffinManagerFactory; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use super::*; + use crate::config::Mode; fn mock_region_metadata() -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); @@ -291,83 +306,102 @@ mod tests { IntermediateManager::new(mock_object_store()) } - #[test] - fn test_build_indexer_basic() { + #[tokio::test] + async fn test_build_indexer_basic() { + let (_d, factory) = + PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await; + let store = mock_object_store(); + let puffin_manager = factory.build(store); let metadata = mock_region_metadata(); let indexer = IndexerBuilder { - create_inverted_index: true, - mem_threshold_index_create: Some(1024), - write_buffer_size: None, + op_type: OperationType::Flush, file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, row_group_size: 1024, - object_store: mock_object_store(), + puffin_manager, intermediate_manager: mock_intm_mgr(), index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), } - .build(); + .build() + .await; - assert!(indexer.inner.is_some()); + assert!(indexer.inverted_indexer.is_some()); } - #[test] - fn test_build_indexer_disable_create() { + #[tokio::test] + async fn test_build_indexer_disable_create() { + let (_d, factory) = + PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await; + let store = mock_object_store(); + let puffin_manager = factory.build(store); let metadata = mock_region_metadata(); let indexer = IndexerBuilder { - create_inverted_index: false, - mem_threshold_index_create: Some(1024), - write_buffer_size: None, + op_type: OperationType::Flush, file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, row_group_size: 1024, - object_store: mock_object_store(), + puffin_manager, intermediate_manager: 
mock_intm_mgr(), index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig { + create_on_flush: Mode::Disable, + ..Default::default() + }, } - .build(); + .build() + .await; - assert!(indexer.inner.is_none()); + assert!(indexer.inverted_indexer.is_none()); } - #[test] - fn test_build_indexer_no_tag() { + #[tokio::test] + async fn test_build_indexer_no_tag() { + let (_d, factory) = + PuffinManagerFactory::new_for_test_async("test_build_indexer_no_tag_").await; + let store = mock_object_store(); + let puffin_manager = factory.build(store); let metadata = no_tag_region_metadata(); let indexer = IndexerBuilder { - create_inverted_index: true, - mem_threshold_index_create: Some(1024), - write_buffer_size: None, + op_type: OperationType::Flush, file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, row_group_size: 1024, - object_store: mock_object_store(), + puffin_manager, intermediate_manager: mock_intm_mgr(), index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), } - .build(); + .build() + .await; - assert!(indexer.inner.is_none()); + assert!(indexer.inverted_indexer.is_none()); } - #[test] - fn test_build_indexer_zero_row_group() { + #[tokio::test] + async fn test_build_indexer_zero_row_group() { + let (_d, factory) = + PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await; + let store = mock_object_store(); + let puffin_manager = factory.build(store); let metadata = mock_region_metadata(); let indexer = IndexerBuilder { - create_inverted_index: true, - mem_threshold_index_create: Some(1024), - write_buffer_size: None, + op_type: OperationType::Flush, file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, row_group_size: 0, - object_store: mock_object_store(), + puffin_manager, intermediate_manager: mock_intm_mgr(), index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), } - .build(); + .build() + .await; - assert!(indexer.inner.is_none()); + assert!(indexer.inverted_indexer.is_none()); } } diff --git a/src/mito2/src/sst/index/indexer.rs b/src/mito2/src/sst/index/indexer.rs new file mode 100644 index 0000000000..15d9ca1845 --- /dev/null +++ b/src/mito2/src/sst/index/indexer.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod abort; +mod finish; +mod update; diff --git a/src/mito2/src/sst/index/indexer/abort.rs b/src/mito2/src/sst/index/indexer/abort.rs new file mode 100644 index 0000000000..2e7afe5d39 --- /dev/null +++ b/src/mito2/src/sst/index/indexer/abort.rs @@ -0,0 +1,69 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::warn; +use puffin::puffin_manager::PuffinWriter; + +use crate::sst::index::Indexer; + +impl Indexer { + pub(crate) async fn do_abort(&mut self) { + self.do_abort_inverted_index().await; + self.do_abort_puffin_writer().await; + } + + async fn do_abort_inverted_index(&mut self) { + let Some(mut indexer) = self.inverted_indexer.take() else { + return; + }; + let Err(err) = indexer.abort().await else { + return; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to abort inverted index, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to abort inverted index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + } + + async fn do_abort_puffin_writer(&mut self) { + let Some(puffin_writer) = self.puffin_writer.take() else { + return; + }; + + let err = match puffin_writer.finish().await { + Ok(_) => return, + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to abort puffin writer, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to abort puffin writer, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + } +} diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs new file mode 100644 index 0000000000..31ec0c0e52 --- /dev/null +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -0,0 +1,118 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_telemetry::{debug, warn}; +use puffin::puffin_manager::PuffinWriter; + +use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer; +use crate::sst::index::puffin_manager::SstPuffinWriter; +use crate::sst::index::statistics::{ByteCount, RowCount}; +use crate::sst::index::{IndexOutput, Indexer, InvertedIndexOutput}; + +impl Indexer { + pub(crate) async fn do_finish(&mut self) -> IndexOutput { + let mut output = IndexOutput::default(); + + let Some(mut writer) = self.puffin_writer.take() else { + return output; + }; + + let success = self + .do_finish_inverted_index(&mut writer, &mut output) + .await; + if !success { + self.do_abort().await; + return IndexOutput::default(); + } + + output.file_size = self.do_finish_puffin_writer(writer).await; + output + } + + async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount { + let err = match writer.finish().await { + Ok(size) => return size, + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to finish puffin writer, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + 0 + } + + /// Returns false if the finish failed. + async fn do_finish_inverted_index( + &mut self, + puffin_writer: &mut SstPuffinWriter, + index_output: &mut IndexOutput, + ) -> bool { + let Some(mut indexer) = self.inverted_indexer.take() else { + return true; + }; + + let err = match indexer.finish(puffin_writer).await { + Ok((row_count, byte_count)) => { + self.fill_inverted_index_output( + &mut index_output.inverted_index, + row_count, + byte_count, + &indexer, + ); + return true; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to finish inverted index, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to finish inverted index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } + + fn fill_inverted_index_output( + &mut self, + output: &mut InvertedIndexOutput, + row_count: RowCount, + byte_count: ByteCount, + indexer: &InvertedIndexer, + ) { + debug!( + "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}", + self.region_id, self.file_id, byte_count, row_count + ); + + output.index_size = byte_count; + output.row_count = row_count; + output.columns = indexer.column_ids().collect(); + } +} diff --git a/src/mito2/src/sst/index/indexer/update.rs b/src/mito2/src/sst/index/indexer/update.rs new file mode 100644 index 0000000000..42302d83a7 --- /dev/null +++ b/src/mito2/src/sst/index/indexer/update.rs @@ -0,0 +1,55 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_telemetry::warn; + +use crate::read::Batch; +use crate::sst::index::Indexer; + +impl Indexer { + pub(crate) async fn do_update(&mut self, batch: &Batch) { + if batch.is_empty() { + return; + } + + if !self.do_update_inverted_index(batch).await { + self.do_abort().await; + } + } + + /// Returns false if the update failed. + async fn do_update_inverted_index(&mut self, batch: &Batch) -> bool { + let Some(creator) = self.inverted_indexer.as_mut() else { + return true; + }; + + let Err(err) = creator.update(batch).await else { + return true; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to update inverted index, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to update inverted index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } +} diff --git a/src/mito2/src/sst/index/inverted_index.rs b/src/mito2/src/sst/index/inverted_index.rs new file mode 100644 index 0000000000..d325f735a4 --- /dev/null +++ b/src/mito2/src/sst/index/inverted_index.rs @@ -0,0 +1,19 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod applier; +mod codec; +pub(crate) mod creator; + +const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs similarity index 99% rename from src/mito2/src/sst/index/applier.rs rename to src/mito2/src/sst/index/inverted_index/applier.rs index d99d5ea8cd..7463f6011f 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -30,8 +30,8 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::error::{ApplyIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE}; use crate::sst::file::FileId; +use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; -use crate::sst::index::INDEX_BLOB_TYPE; use crate::sst::location; /// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs similarity index 98% rename from src/mito2/src/sst/index/applier/builder.rs rename to src/mito2/src/sst/index/inverted_index/applier/builder.rs index 1a4c1735ab..3dcb5c0ec8 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -36,8 +36,8 @@ use store_api::storage::ColumnId; use crate::cache::file_cache::FileCacheRef; use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; -use crate::sst::index::applier::SstIndexApplier; -use crate::sst::index::codec::IndexValueCodec; +use 
crate::sst::index::inverted_index::applier::SstIndexApplier; +use crate::sst::index::inverted_index::codec::IndexValueCodec; use crate::sst::index::puffin_manager::PuffinManagerFactory; /// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan. diff --git a/src/mito2/src/sst/index/applier/builder/between.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs similarity index 97% rename from src/mito2/src/sst/index/applier/builder/between.rs rename to src/mito2/src/sst/index/inverted_index/applier/builder/between.rs index 00740c8521..c35736d42b 100644 --- a/src/mito2/src/sst/index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs @@ -16,7 +16,7 @@ use datafusion_expr::Between; use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate}; use crate::error::Result; -use crate::sst::index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; impl<'a> SstIndexApplierBuilder<'a> { /// Collects a `BETWEEN` expression in the form of `column BETWEEN lit AND lit`. @@ -62,7 +62,7 @@ mod tests { use super::*; use crate::error::Error; - use crate::sst::index::applier::builder::tests::{ + use crate::sst::index::inverted_index::applier::builder::tests::{ encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store, test_region_metadata, }; diff --git a/src/mito2/src/sst/index/applier/builder/comparison.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs similarity index 98% rename from src/mito2/src/sst/index/applier/builder/comparison.rs rename to src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs index 74a67aac6f..450e39ad7a 100644 --- a/src/mito2/src/sst/index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs @@ -17,7 +17,7 @@ use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePre use index::inverted_index::Bytes; use crate::error::Result; -use crate::sst::index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; impl<'a> SstIndexApplierBuilder<'a> { /// Collects a comparison expression in the form of @@ -134,7 +134,7 @@ mod tests { use super::*; use crate::error::Error; - use crate::sst::index::applier::builder::tests::{ + use crate::sst::index::inverted_index::applier::builder::tests::{ encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store, test_region_metadata, }; diff --git a/src/mito2/src/sst/index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs similarity index 98% rename from src/mito2/src/sst/index/applier/builder/eq_list.rs rename to src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs index a01f77d413..24f677db1d 100644 --- a/src/mito2/src/sst/index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs @@ -20,7 +20,7 @@ use index::inverted_index::search::predicate::{InListPredicate, Predicate}; use index::inverted_index::Bytes; use crate::error::Result; -use crate::sst::index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; impl<'a> SstIndexApplierBuilder<'a> { /// Collects an eq expression in the form of `column = lit`. 
@@ -124,7 +124,7 @@ impl<'a> SstIndexApplierBuilder<'a> { mod tests { use super::*; use crate::error::Error; - use crate::sst::index::applier::builder::tests::{ + use crate::sst::index::inverted_index::applier::builder::tests::{ encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, tag_column2, test_object_store, test_region_metadata, }; diff --git a/src/mito2/src/sst/index/applier/builder/in_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs similarity index 97% rename from src/mito2/src/sst/index/applier/builder/in_list.rs rename to src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs index c9e0068530..146b58aeec 100644 --- a/src/mito2/src/sst/index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs @@ -18,7 +18,7 @@ use datafusion_expr::expr::InList; use index::inverted_index::search::predicate::{InListPredicate, Predicate}; use crate::error::Result; -use crate::sst::index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; impl<'a> SstIndexApplierBuilder<'a> { /// Collects an in list expression in the form of `column IN (lit, lit, ...)`. @@ -55,7 +55,7 @@ impl<'a> SstIndexApplierBuilder<'a> { mod tests { use super::*; use crate::error::Error; - use crate::sst::index::applier::builder::tests::{ + use crate::sst::index::inverted_index::applier::builder::tests::{ encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store, test_region_metadata, }; diff --git a/src/mito2/src/sst/index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs similarity index 96% rename from src/mito2/src/sst/index/applier/builder/regex_match.rs rename to src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs index f341a03a69..3c2122f4c0 100644 --- a/src/mito2/src/sst/index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs @@ -17,7 +17,7 @@ use datafusion_expr::Expr as DfExpr; use index::inverted_index::search::predicate::{Predicate, RegexMatchPredicate}; use crate::error::Result; -use crate::sst::index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; impl<'a> SstIndexApplierBuilder<'a> { /// Collects a regex match expression in the form of `column ~ pattern`. 
@@ -49,7 +49,7 @@ mod tests { use super::*; use crate::error::Error; - use crate::sst::index::applier::builder::tests::{ + use crate::sst::index::inverted_index::applier::builder::tests::{ field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store, test_region_metadata, }; diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/inverted_index/codec.rs similarity index 90% rename from src/mito2/src/sst/index/codec.rs rename to src/mito2/src/sst/index/inverted_index/codec.rs index 0e238e9914..f2d0bbaf4a 100644 --- a/src/mito2/src/sst/index/codec.rs +++ b/src/mito2/src/sst/index/inverted_index/codec.rs @@ -17,6 +17,7 @@ use datatypes::value::{Value, ValueRef}; use memcomparable::Serializer; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; +use store_api::storage::ColumnId; use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; @@ -57,12 +58,11 @@ impl IndexValueCodec { } } -pub(crate) type ColumnId = String; - /// Decodes primary key values into their corresponding column ids, data types and values. pub struct IndexValuesCodec { - /// The tag column ids. - column_ids: Vec, + /// Tuples containing column id and its corresponding index_name (result of `to_string` on ColumnId), + /// to minimize redundant `to_string` calls. + column_ids: Vec<(ColumnId, String)>, /// The data types of tag columns. fields: Vec, /// The decoder for the primary key. @@ -75,7 +75,7 @@ impl IndexValuesCodec { let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns .map(|column| { ( - column.column_id.to_string(), + (column.column_id, column.column_id.to_string()), SortField::new(column.column_schema.data_type.clone()), ) }) @@ -93,7 +93,7 @@ impl IndexValuesCodec { pub fn decode( &self, primary_key: &[u8], - ) -> Result)>> { + ) -> Result)>> { let values = self.decoder.decode(primary_key)?; let iter = values @@ -175,13 +175,15 @@ mod tests { let codec = IndexValuesCodec::from_tag_columns(tag_columns.iter()); let mut iter = codec.decode(&primary_key).unwrap(); - let (column_id, field, value) = iter.next().unwrap(); - assert_eq!(column_id, "1"); + let ((column_id, col_id_str), field, value) = iter.next().unwrap(); + assert_eq!(*column_id, 1); + assert_eq!(col_id_str, "1"); assert_eq!(field, &SortField::new(ConcreteDataType::string_datatype())); assert_eq!(value, None); - let (column_id, field, value) = iter.next().unwrap(); - assert_eq!(column_id, "2"); + let ((column_id, col_id_str), field, value) = iter.next().unwrap(); + assert_eq!(*column_id, 2); + assert_eq!(col_id_str, "2"); assert_eq!(field, &SortField::new(ConcreteDataType::int64_datatype())); assert_eq!(value, Some(Value::Int64(10))); diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs similarity index 84% rename from src/mito2/src/sst/index/creator.rs rename to src/mito2/src/sst/index/inverted_index/creator.rs index a2553baa23..4a464f7701 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
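The creator rework that follows boils down to this call shape: the constructor no longer takes a file path or object store, compression and the ignore list are explicit arguments, and `finish` writes its blob through the caller's shared puffin writer. A condensed sketch, assuming `mem_threshold`, `segment_row_count`, and `puffin_writer` are already in scope (`InvertedIndexer` is the alias for `SstIndexCreator` introduced above):

    let mut creator = InvertedIndexer::new(
        sst_file_id,
        &region_metadata,
        intermediate_manager,
        mem_threshold,          // Option<usize> from InvertedIndexConfig::mem_threshold_on_create
        segment_row_count,      // NonZeroUsize
        /* compress */ true,    // => PutOptions { compression: Some(CompressionCodec::Zstd) }
        /* ignore_column_ids */ &[],
    );

    // The blob lands in the caller's puffin writer; closing that writer happens elsewhere
    // (Indexer::do_finish_puffin_writer in production, writer.finish() in the tests below).
    let (row_count, byte_count) = creator.finish(&mut puffin_writer).await?;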
-mod statistics; -mod temp_provider; +pub(crate) mod temp_provider; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -25,28 +24,26 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter; use index::inverted_index::create::sort_create::SortIndexCreator; use index::inverted_index::create::InvertedIndexCreator; use index::inverted_index::format::writer::InvertedIndexBlobWriter; -use object_store::ObjectStore; -use puffin::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter}; +use puffin::blob_metadata::CompressionCodec; +use puffin::puffin_manager::{PuffinWriter, PutOptions}; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; use tokio::io::duplex; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use crate::error::{ - BiSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PuffinFinishSnafu, - PushIndexValueSnafu, Result, -}; -use crate::metrics::{ - INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, + BiSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushIndexValueSnafu, + Result, }; use crate::read::Batch; use crate::sst::file::FileId; -use crate::sst::index::codec::{ColumnId, IndexValueCodec, IndexValuesCodec}; -use crate::sst::index::creator::statistics::Statistics; -use crate::sst::index::creator::temp_provider::TempFileProvider; use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; -use crate::sst::index::store::InstrumentedStore; -use crate::sst::index::INDEX_BLOB_TYPE; +use crate::sst::index::inverted_index::codec::{IndexValueCodec, IndexValuesCodec}; +use crate::sst::index::inverted_index::creator::temp_provider::TempFileProvider; +use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; +use crate::sst::index::puffin_manager::SstPuffinWriter; +use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; /// The minimum memory usage threshold for one column. const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB @@ -54,15 +51,8 @@ const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB /// The buffer size for the pipe used to send index data to the puffin blob. const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; -type ByteCount = u64; -type RowCount = usize; - /// Creates SST index. pub struct SstIndexCreator { - /// Path of index file to write. - file_path: String, - /// The store to write index files. - store: InstrumentedStore, /// The index creator. index_creator: Box, /// The provider of intermediate files. @@ -78,24 +68,27 @@ pub struct SstIndexCreator { /// Whether the index creation is aborted. aborted: bool, - /// Ignore column IDs for index creation. - ignore_column_ids: HashSet, - /// The memory usage of the index creator. memory_usage: Arc, + + /// Whether to compress the index data. + compress: bool, + + /// Ids of indexed columns. + column_ids: HashSet, } impl SstIndexCreator { /// Creates a new `SstIndexCreator`. /// Should ensure that the number of tag columns is greater than 0. 
pub fn new( - file_path: String, sst_file_id: FileId, metadata: &RegionMetadataRef, - index_store: ObjectStore, intermediate_manager: IntermediateManager, memory_usage_threshold: Option, segment_row_count: NonZeroUsize, + compress: bool, + ignore_column_ids: &[ColumnId], ) -> Self { let temp_file_provider = Arc::new(TempFileProvider::new( IntermediateLocation::new(&metadata.region_id, &sst_file_id), @@ -113,35 +106,27 @@ impl SstIndexCreator { let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count)); let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns()); + let mut column_ids = metadata + .primary_key_columns() + .map(|c| c.column_id) + .collect::>(); + for id in ignore_column_ids { + column_ids.remove(id); + } + Self { - file_path, - store: InstrumentedStore::new(index_store), codec, index_creator, temp_file_provider, - value_buf: vec![], - stats: Statistics::default(), aborted: false, - - ignore_column_ids: HashSet::default(), memory_usage, + compress, + column_ids, } } - /// Sets the write buffer size of the store. - pub fn with_buffer_size(mut self, write_buffer_size: Option) -> Self { - self.store = self.store.with_write_buffer_size(write_buffer_size); - self - } - - /// Sets the ignore column IDs for index creation. - pub fn with_ignore_column_ids(mut self, ignore_column_ids: HashSet) -> Self { - self.ignore_column_ids = ignore_column_ids; - self - } - /// Updates index with a batch of rows. /// Garbage will be cleaned up if failed to update. pub async fn update(&mut self, batch: &Batch) -> Result<()> { @@ -155,12 +140,9 @@ impl SstIndexCreator { // clean up garbage if failed to update if let Err(err) = self.do_cleanup().await { if cfg!(any(test, feature = "test")) { - panic!( - "Failed to clean up index creator, file_path: {}, err: {}", - self.file_path, err - ); + panic!("Failed to clean up index creator, err: {err}",); } else { - warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path); + warn!(err; "Failed to clean up index creator"); } } return Err(update_err); @@ -171,7 +153,10 @@ impl SstIndexCreator { /// Finishes index creation and cleans up garbage. /// Returns the number of rows and bytes written. - pub async fn finish(&mut self) -> Result<(RowCount, ByteCount)> { + pub async fn finish( + &mut self, + puffin_writer: &mut SstPuffinWriter, + ) -> Result<(RowCount, ByteCount)> { ensure!(!self.aborted, OperateAbortedIndexSnafu); if self.stats.row_count() == 0 { @@ -179,16 +164,13 @@ impl SstIndexCreator { return Ok((0, 0)); } - let finish_res = self.do_finish().await; + let finish_res = self.do_finish(puffin_writer).await; // clean up garbage no matter finish successfully or not if let Err(err) = self.do_cleanup().await { if cfg!(any(test, feature = "test")) { - panic!( - "Failed to clean up index creator, file_path: {}, err: {}", - self.file_path, err - ); + panic!("Failed to clean up index creator, err: {err}",); } else { - warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path); + warn!(err; "Failed to clean up index creator"); } } @@ -211,8 +193,8 @@ impl SstIndexCreator { let n = batch.num_rows(); guard.inc_row_count(n); - for (column_id, field, value) in self.codec.decode(batch.primary_key())? { - if self.ignore_column_ids.contains(column_id) { + for ((col_id, col_id_str), field, value) in self.codec.decode(batch.primary_key())? 
{ + if !self.column_ids.contains(col_id) { continue; } @@ -228,7 +210,7 @@ impl SstIndexCreator { // non-null value -> Some(encoded_bytes), null value -> None let value = value.is_some().then_some(self.value_buf.as_slice()); self.index_creator - .push_with_name_n(column_id, value, n) + .push_with_name_n(col_id_str, value, n) .await .context(PushIndexValueSnafu)?; } @@ -254,32 +236,18 @@ impl SstIndexCreator { /// └─────────────┘ └────────────────►│ File │ /// └──────┘ /// ``` - async fn do_finish(&mut self) -> Result<()> { + async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> { let mut guard = self.stats.record_finish(); - let file_writer = self - .store - .writer( - &self.file_path, - &INDEX_PUFFIN_WRITE_BYTES_TOTAL, - &INDEX_PUFFIN_WRITE_OP_TOTAL, - &INDEX_PUFFIN_FLUSH_OP_TOTAL, - ) - .await?; - let mut puffin_writer = PuffinFileWriter::new(file_writer); - let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); - let blob = Blob { - blob_type: INDEX_BLOB_TYPE.to_string(), - compressed_data: rx.compat(), - properties: HashMap::default(), - compression_codec: None, - }; let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write()); + let put_options = PutOptions { + compression: self.compress.then_some(CompressionCodec::Zstd), + }; let (index_finish, puffin_add_blob) = futures::join!( self.index_creator.finish(&mut index_writer), - puffin_writer.add_blob(blob) + puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), put_options) ); match ( @@ -294,11 +262,11 @@ impl SstIndexCreator { (Ok(_), e @ Err(_)) => e?, (e @ Err(_), Ok(_)) => e.map(|_| ())?, - _ => {} + (Ok(written_bytes), Ok(_)) => { + guard.inc_byte_count(written_bytes); + } } - let byte_count = puffin_writer.finish().await.context(PuffinFinishSnafu)?; - guard.inc_byte_count(byte_count); Ok(()) } @@ -308,6 +276,10 @@ impl SstIndexCreator { self.temp_file_provider.cleanup().await } + pub fn column_ids(&self) -> impl Iterator + '_ { + self.column_ids.iter().copied() + } + pub fn memory_usage(&self) -> usize { self.memory_usage.load(std::sync::atomic::Ordering::Relaxed) } @@ -326,12 +298,14 @@ mod tests { use datatypes::vectors::{UInt64Vector, UInt8Vector}; use futures::future::BoxFuture; use object_store::services::Memory; + use object_store::ObjectStore; + use puffin::puffin_manager::PuffinManager; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; use super::*; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; - use crate::sst::index::applier::builder::SstIndexApplierBuilder; + use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location; @@ -418,13 +392,13 @@ mod tests { let segment_row_count = 2; let mut creator = SstIndexCreator::new( - file_path, sst_file_id, ®ion_metadata, - object_store.clone(), intm_mgr, memory_threshold, NonZeroUsize::new(segment_row_count).unwrap(), + false, + &[], ); for (str_tag, i32_tag) in &tags { @@ -432,8 +406,11 @@ mod tests { creator.update(&batch).await.unwrap(); } - let (row_count, _) = creator.finish().await.unwrap(); + let puffin_manager = factory.build(object_store.clone()); + let mut writer = puffin_manager.writer(&file_path).await.unwrap(); + let (row_count, _) = creator.finish(&mut writer).await.unwrap(); assert_eq!(row_count, tags.len() * segment_row_count); + writer.finish().await.unwrap(); move |expr| { let _d = &d; diff --git a/src/mito2/src/sst/index/creator/temp_provider.rs 
diff --git a/src/mito2/src/sst/index/creator/temp_provider.rs b/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs
similarity index 100%
rename from src/mito2/src/sst/index/creator/temp_provider.rs
rename to src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs
diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs
index f953ca5dd4..7a9d246951 100644
--- a/src/mito2/src/sst/index/puffin_manager.rs
+++ b/src/mito2/src/sst/index/puffin_manager.rs
@@ -22,7 +22,7 @@ use puffin::error::{self as puffin_error, Result as PuffinResult};
 use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
 use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
 use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
-use puffin::puffin_manager::BlobGuard;
+use puffin::puffin_manager::{BlobGuard, PuffinManager};
 use snafu::ResultExt;

 use crate::error::{PuffinInitStagerSnafu, Result};
@@ -36,6 +36,7 @@ type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncRead>;
 type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;

 pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
+pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
 pub(crate) type SstPuffinManager =
     FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;

diff --git a/src/mito2/src/sst/index/creator/statistics.rs b/src/mito2/src/sst/index/statistics.rs
similarity index 100%
rename from src/mito2/src/sst/index/creator/statistics.rs
rename to src/mito2/src/sst/index/statistics.rs
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 84b61cda36..34819c0c71 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -20,6 +20,7 @@ use common_base::readable_size::ReadableSize;
 use parquet::file::metadata::ParquetMetaData;

 use crate::sst::file::FileTimeRange;
+use crate::sst::index::IndexOutput;
 use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

 pub(crate) mod file_range;
@@ -71,10 +72,8 @@ pub struct SstInfo {
     pub num_row_groups: u64,
     /// File Meta Data
     pub file_metadata: Option<Arc<ParquetMetaData>>,
-    /// Whether inverted index is available.
-    pub inverted_index_available: bool,
-    /// Index file size in bytes.
-    pub index_file_size: u64,
+    /// Index Meta Data
+    pub index_metadata: IndexOutput,
 }

 #[cfg(test)]
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index db2eb5b9cf..98ef033313 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -52,7 +52,7 @@ use crate::metrics::{
 use crate::read::{Batch, BatchReader};
 use crate::row_converter::{McmpRowCodec, SortField};
 use crate::sst::file::FileHandle;
-use crate::sst::index::applier::SstIndexApplierRef;
+use crate::sst::index::inverted_index::applier::SstIndexApplierRef;
 use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::metadata::MetadataLoader;
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 9a3d852f9c..1d63f5e3d0 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -134,9 +134,7 @@ where
             }
         }

-        let index_size = self.indexer.finish().await;
-        let inverted_index_available = index_size.is_some();
-        let index_file_size = index_size.unwrap_or(0);
+        let index_output = self.indexer.finish().await;

         if stats.num_rows == 0 {
             return Ok(None);
@@ -165,8 +163,7 @@ where
             num_rows: stats.num_rows,
             num_row_groups: parquet_metadata.num_row_groups() as u64,
             file_metadata: Some(Arc::new(parquet_metadata)),
-            inverted_index_available,
-            index_file_size,
+            index_metadata: index_output,
         }))
     }

diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index f1d863aa38..49b89b6a1b 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -69,6 +69,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
 use crate::read::{Batch, BatchBuilder, BatchReader};
 use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
 use crate::sst::index::intermediate::IntermediateManager;
+use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::time_provider::{StdTimeProvider, TimeProviderRef};
 use crate::worker::WorkerGroup;

@@ -604,15 +605,25 @@ impl TestEnv {
     ) -> WriteCacheRef {
         let data_home = self.data_home().display().to_string();

-        let intm_mgr = IntermediateManager::init_fs(join_dir(&data_home, "intm"))
+        let index_aux_path = self.data_home.path().join("index_aux");
+        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
+            .await
+            .unwrap();
+        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
             .await
             .unwrap();
         let object_store_manager = self.get_object_store_manager().unwrap();

-        let write_cache =
-            WriteCache::new(local_store, object_store_manager, capacity, None, intm_mgr)
-                .await
-                .unwrap();
+        let write_cache = WriteCache::new(
+            local_store,
+            object_store_manager,
+            capacity,
+            None,
+            puffin_mgr,
+            intm_mgr,
+        )
+        .await
+        .unwrap();

         Arc::new(write_cache)
     }
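[Reviewer note, not part of the patch] As the TestEnv change above shows, the write cache now takes both index helpers, and both point at the same local auxiliary directory. A hedged sketch of that wiring outside the test harness; the helper, the `ObjectStoreManagerRef` import path, and the `ReadableSize::mb` constructor are assumptions, not code from this PR.

use std::path::Path;

use common_base::readable_size::ReadableSize;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;

use crate::cache::write_cache::WriteCache;
use crate::error::Result;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;

/// Hypothetical helper that assembles a write cache the way TestEnv does above.
async fn build_write_cache_sketch(
    local_store: ObjectStore,
    object_store_manager: ObjectStoreManagerRef,
    aux_home: &Path,
) -> Result<WriteCache> {
    // The puffin staging area and the intermediate sort files share one
    // auxiliary directory on local disk.
    let index_aux_path = aux_home.join("index_aux");
    let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None).await?;
    let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()).await?;

    WriteCache::new(
        local_store,
        object_store_manager,
        ReadableSize::mb(512), // cache capacity
        None,                  // ttl
        puffin_mgr,            // new: builds the puffin manager over the cache's local store
        intm_mgr,
    )
    .await
}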
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 2a9edf15f4..2ffcc65fbe 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -149,6 +149,7 @@ impl WorkerGroup {
         let write_cache = write_cache_from_config(
             &config,
             object_store_manager.clone(),
+            puffin_manager_factory.clone(),
             intermediate_manager.clone(),
         )
         .await?;
@@ -280,6+281,7 @@ impl WorkerGroup {
         let write_cache = write_cache_from_config(
             &config,
             object_store_manager.clone(),
+            puffin_manager_factory.clone(),
             intermediate_manager.clone(),
         )
         .await?;
@@ -337,6 +339,7 @@ fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
 async fn write_cache_from_config(
     config: &MitoConfig,
     object_store_manager: ObjectStoreManagerRef,
+    puffin_manager_factory: PuffinManagerFactory,
     intermediate_manager: IntermediateManager,
 ) -> Result<Option<WriteCacheRef>> {
     if !config.enable_experimental_write_cache {
@@ -351,6 +354,7 @@ async fn write_cache_from_config(
         object_store_manager,
         config.experimental_write_cache_size,
         config.experimental_write_cache_ttl,
+        puffin_manager_factory,
         intermediate_manager,
     )
     .await?;
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 57ad46dfe0..ebe821e428 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -838,6 +838,7 @@ create_on_flush = "auto"
 create_on_compaction = "auto"
 apply_on_query = "auto"
 mem_threshold_on_create = "64.0MiB"
+compress = true

 [region_engine.mito.memtable]
 type = "time_series"