From b3aabb6706bcd0fb5357d59da03df1dff121b5a5 Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Sun, 14 Sep 2025 21:36:24 +0800
Subject: [PATCH] feat: support flush and compact flat format files (#6949)

* feat: basic functions for flush/compact flat format

Signed-off-by: evenyag

* feat: bridge flush and compaction for flat format

Signed-off-by: evenyag

* feat: add write cache support

Signed-off-by: evenyag

* style: fix clippy

Signed-off-by: evenyag

* chore: change log level to debug

Signed-off-by: evenyag

* refactor: wrap duplicated code to merge and dedup iter

Signed-off-by: evenyag

* refactor: wrap some code into flush_flat_mem_ranges

Signed-off-by: evenyag

* refactor: extract logic into do_flush_memtables

Signed-off-by: evenyag

---------

Signed-off-by: evenyag
---
 src/mito2/src/access_layer.rs         |  68 +++-
 src/mito2/src/cache/write_cache.rs    |  79 +++-
 src/mito2/src/compaction.rs           |  28 +-
 src/mito2/src/compaction/compactor.rs |  19 +-
 src/mito2/src/flush.rs                | 499 ++++++++++++++++++++------
 src/mito2/src/memtable.rs             |  18 +
 src/mito2/src/read/seq_scan.rs        |  54 +++
 src/mito2/src/sst/parquet.rs          |   2 +-
 8 files changed, 643 insertions(+), 124 deletions(-)

diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index 1ab47c6386..e65bec34c2 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -13,10 +13,11 @@
 // limitations under the License.

 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};

 use async_stream::try_stream;
 use common_time::Timestamp;
+use either::Either;
 use futures::{Stream, TryStreamExt};
 use object_store::services::Fs;
 use object_store::util::{join_dir, with_instrument_layers};
@@ -34,7 +35,7 @@ use crate::cache::write_cache::SstUploadRequest;
 use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
 use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
 use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
-use crate::read::Source;
+use crate::read::{FlatSource, Source};
 use crate::region::options::IndexOptions;
 use crate::sst::file::{FileHandle, FileId, RegionFileId};
 use crate::sst::index::IndexerBuilderImpl;
@@ -44,6 +45,7 @@ use crate::sst::location::{self, region_dir_from_table_dir};
 use crate::sst::parquet::reader::ParquetReaderBuilder;
 use crate::sst::parquet::writer::ParquetWriter;
 use crate::sst::parquet::{SstInfo, WriteOptions};
+use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

 pub type AccessLayerRef = Arc<AccessLayer>;
 /// SST write results.
@@ -288,9 +290,16 @@ impl AccessLayer {
             )
             .await
             .with_file_cleaner(cleaner);
-            let ssts = writer
-                .write_all(request.source, request.max_sequence, write_opts)
-                .await?;
+            let ssts = match request.source {
+                Either::Left(source) => {
+                    writer
+                        .write_all(source, request.max_sequence, write_opts)
+                        .await?
+                }
+                Either::Right(flat_source) => {
+                    writer.write_all_flat(flat_source, write_opts).await?
+                }
+            };
             let metrics = writer.into_metrics();
             (ssts, metrics)
         };
@@ -310,6 +319,53 @@ impl AccessLayer {
         Ok((sst_info, metrics))
     }
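The write path above now dispatches on the request's source: batch sources keep the existing write_all path, while flat sources (Arrow record batches) take write_all_flat and carry no sequence argument. A minimal standalone sketch of this dispatch, with placeholder types standing in for mito2's Source and FlatSource:

use either::Either;

struct BatchSource;           // stand-in for mito2's Source
struct FlatRecordBatchSource; // stand-in for mito2's FlatSource

fn dispatch(source: Either<BatchSource, FlatRecordBatchSource>) -> &'static str {
    match source {
        // Existing row/batch path; keeps the max_sequence argument.
        Either::Left(_batch) => "write_all",
        // Flat path writes Arrow record batches directly.
        Either::Right(_flat) => "write_all_flat",
    }
}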
+    /// Puts an encoded SST to the write cache (if enabled) and uploads it to the object store.
+    pub(crate) async fn put_sst(
+        &self,
+        data: &bytes::Bytes,
+        region_id: RegionId,
+        sst_info: &SstInfo,
+        cache_manager: &CacheManagerRef,
+    ) -> Result<Metrics> {
+        if let Some(write_cache) = cache_manager.write_cache() {
+            // Write to cache and upload to remote store
+            let upload_request = SstUploadRequest {
+                dest_path_provider: RegionFilePathFactory::new(
+                    self.table_dir.clone(),
+                    self.path_type,
+                ),
+                remote_store: self.object_store.clone(),
+            };
+            write_cache
+                .put_and_upload_sst(data, region_id, sst_info, upload_request)
+                .await
+        } else {
+            let start = Instant::now();
+            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
+            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
+            let sst_file_path =
+                path_provider.build_sst_file_path(RegionFileId::new(region_id, sst_info.file_id));
+            let mut writer = self
+                .object_store
+                .writer_with(&sst_file_path)
+                .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
+                .concurrent(DEFAULT_WRITE_CONCURRENCY)
+                .await
+                .context(OpenDalSnafu)?;
+            if let Err(err) = writer.write(data.clone()).await.context(OpenDalSnafu) {
+                cleaner.clean_by_file_id(sst_info.file_id).await;
+                return Err(err);
+            }
+            if let Err(err) = writer.close().await.context(OpenDalSnafu) {
+                cleaner.clean_by_file_id(sst_info.file_id).await;
+                return Err(err);
+            }
+            let mut metrics = Metrics::new(WriteType::Flush);
+            metrics.write_batch = start.elapsed();
+            Ok(metrics)
+        }
+    }
+
     /// Lists the SST entries from the storage layer in the table directory.
     pub fn storage_sst_entries(&self) -> impl Stream<Item = Result<Entry>> + use<> {
         let object_store = self.object_store.clone();
@@ -363,7 +419,7 @@ pub enum OperationType {
 pub struct SstWriteRequest {
     pub op_type: OperationType,
     pub metadata: RegionMetadataRef,
-    pub source: Source,
+    pub source: Either<Source, FlatSource>,
     pub cache_manager: CacheManagerRef,
     #[allow(dead_code)]
     pub storage: Option<String>,

diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 322548b11c..498239bc7f 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -37,8 +37,8 @@ use crate::sst::file::RegionFileId;
 use crate::sst::index::IndexerBuilderImpl;
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
-use crate::sst::parquet::WriteOptions;
 use crate::sst::parquet::writer::ParquetWriter;
+use crate::sst::parquet::{SstInfo, WriteOptions};
 use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

 /// A cache for uploading files to remote object stores.
@@ -101,6 +101,66 @@ impl WriteCache {
         self.file_cache.clone()
     }
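When the write cache is disabled, put_sst streams the encoded bytes straight to the object store and removes the partial file on failure. A reduced sketch of that pattern against the OpenDAL Operator API; the chunk size and concurrency here are illustrative stand-ins for DEFAULT_WRITE_BUFFER_SIZE and DEFAULT_WRITE_CONCURRENCY:

use opendal::Operator;

async fn put_bytes(store: &Operator, path: &str, data: bytes::Bytes) -> opendal::Result<()> {
    let result = async {
        let mut writer = store
            .writer_with(path)
            .chunk(8 * 1024 * 1024) // illustrative chunk size
            .concurrent(8)          // illustrative concurrency
            .await?;
        writer.write(data).await?;
        writer.close().await?;
        Ok(())
    }
    .await;
    if result.is_err() {
        // Best-effort removal of the partial object, the role TempFileCleaner plays above.
        let _ = store.delete(path).await;
    }
    result
}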
+    /// Puts encoded SST data into the cache and uploads it to the remote object store.
+    pub(crate) async fn put_and_upload_sst(
+        &self,
+        data: &bytes::Bytes,
+        region_id: RegionId,
+        sst_info: &SstInfo,
+        upload_request: SstUploadRequest,
+    ) -> Result<Metrics> {
+        let file_id = sst_info.file_id;
+        let mut metrics = Metrics::new(WriteType::Flush);
+
+        // Create index key for the SST file
+        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
+
+        // Write to cache first
+        let cache_start = Instant::now();
+        let cache_path = self.file_cache.cache_file_path(parquet_key);
+        let mut cache_writer = self
+            .file_cache
+            .local_store()
+            .writer(&cache_path)
+            .await
+            .context(crate::error::OpenDalSnafu)?;
+
+        cache_writer
+            .write(data.clone())
+            .await
+            .context(crate::error::OpenDalSnafu)?;
+        cache_writer
+            .close()
+            .await
+            .context(crate::error::OpenDalSnafu)?;
+
+        // Register in file cache
+        let index_value = IndexValue {
+            file_size: data.len() as u32,
+        };
+        self.file_cache.put(parquet_key, index_value).await;
+        metrics.write_batch = cache_start.elapsed();
+
+        // Upload to remote store
+        let upload_start = Instant::now();
+        let region_file_id = RegionFileId::new(region_id, file_id);
+        let remote_path = upload_request
+            .dest_path_provider
+            .build_sst_file_path(region_file_id);
+
+        if let Err(e) = self
+            .upload(parquet_key, &remote_path, &upload_request.remote_store)
+            .await
+        {
+            // Clean up cache on failure
+            self.remove(parquet_key).await;
+            return Err(e);
+        }
+
+        metrics.upload_parquet = upload_start.elapsed();
+        Ok(metrics)
+    }
+
     /// Writes SST to the cache and then uploads it to the remote object store.
     pub(crate) async fn write_and_upload_sst(
         &self,
@@ -139,9 +199,14 @@ impl WriteCache {
             .await
             .with_file_cleaner(cleaner);

-        let sst_info = writer
-            .write_all(write_request.source, write_request.max_sequence, write_opts)
-            .await?;
+        let sst_info = match write_request.source {
+            either::Left(source) => {
+                writer
+                    .write_all(source, write_request.max_sequence, write_opts)
+                    .await?
+            }
+            either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
+        };
         let mut metrics = writer.into_metrics();

         // Upload sst file to remote object store.
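put_and_upload_sst lands the bytes in the local file cache, registers the entry, uploads, and evicts the entry if the upload fails, so the cache never advertises a file the remote store does not hold. A control-flow sketch with the file cache reduced to a HashMap:

use std::collections::HashMap;

fn put_and_upload(
    cache: &mut HashMap<String, u32>,
    key: &str,
    data: &[u8],
    upload: impl FnOnce(&[u8]) -> Result<(), String>,
) -> Result<(), String> {
    // Register the entry, like IndexValue { file_size } in the patch.
    cache.insert(key.to_string(), data.len() as u32);
    if let Err(e) = upload(data) {
        // Roll back the cache registration on upload failure.
        cache.remove(key);
        return Err(e);
    }
    Ok(())
}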
@@ -469,7 +534,7 @@ mod tests {
         let write_request = SstWriteRequest {
             op_type: OperationType::Flush,
             metadata,
-            source,
+            source: either::Left(source),
             storage: None,
             max_sequence: None,
             cache_manager: Default::default(),
@@ -567,7 +632,7 @@ mod tests {
         let write_request = SstWriteRequest {
             op_type: OperationType::Flush,
             metadata,
-            source,
+            source: either::Left(source),
             storage: None,
             max_sequence: None,
             cache_manager: cache_manager.clone(),
@@ -646,7 +711,7 @@ mod tests {
         let write_request = SstWriteRequest {
             op_type: OperationType::Flush,
             metadata,
-            source,
+            source: either::Left(source),
             storage: None,
             max_sequence: None,
             cache_manager: cache_manager.clone(),

diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index ed61ddc2ed..99eb8e7056 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -55,10 +55,10 @@ use crate::error::{
     TimeRangePredicateOverflowSnafu, TimeoutSnafu,
 };
 use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
-use crate::read::BoxedBatchReader;
 use crate::read::projection::ProjectionMapper;
 use crate::read::scan_region::{PredicateGroup, ScanInput};
 use crate::read::seq_scan::SeqScan;
+use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
 use crate::region::options::MergeMode;
 use crate::region::version::VersionControlRef;
 use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
@@ -662,6 +662,32 @@ impl CompactionSstReaderBuilder<'_> {
             .build_reader_for_compaction()
             .await
     }
+
+    /// Builds a [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
+    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
+        let mut scan_input =
+            ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata, true)?)
+                .with_files(self.inputs.to_vec())
+                .with_append_mode(self.append_mode)
+                // We use a special cache strategy for compaction.
+                .with_cache(CacheStrategy::Compaction(self.cache))
+                .with_filter_deleted(self.filter_deleted)
+                // We ignore file-not-found errors during compaction.
+                .with_ignore_file_not_found(true)
+                .with_merge_mode(self.merge_mode)
+                .with_flat_format(true);
+
+        // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
+        // by converting time ranges into a predicate.
+        if let Some(time_range) = self.time_range {
+            scan_input =
+                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
+        }
+
+        SeqScan::new(scan_input, true)
+            .build_flat_reader_for_compaction()
+            .await
+    }
 }

 /// Converts time range to predicates so that rows outside the range will be filtered.
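build_flat_sst_reader configures the scan through chained with_* calls. A self-contained sketch of that builder style; ScanConfig is illustrative, not the real ScanInput:

#[derive(Default, Debug)]
struct ScanConfig {
    append_mode: bool,
    filter_deleted: bool,
    ignore_file_not_found: bool,
    flat_format: bool,
}

impl ScanConfig {
    fn with_append_mode(mut self, v: bool) -> Self { self.append_mode = v; self }
    fn with_filter_deleted(mut self, v: bool) -> Self { self.filter_deleted = v; self }
    fn with_ignore_file_not_found(mut self, v: bool) -> Self { self.ignore_file_not_found = v; self }
    fn with_flat_format(mut self, v: bool) -> Self { self.flat_format = v; self }
}

fn main() {
    let cfg = ScanConfig::default()
        .with_ignore_file_not_found(true) // compaction tolerates purged files
        .with_flat_format(true);          // yield Arrow record batches
    println!("{cfg:?}");
}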
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index bac14906d5..4fe7cae839 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -42,7 +42,7 @@ use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList
 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
 use crate::manifest::storage::manifest_compress_type;
 use crate::metrics;
-use crate::read::Source;
+use crate::read::{FlatSource, Source};
 use crate::region::opener::new_manifest_dir;
 use crate::region::options::RegionOptions;
 use crate::region::version::VersionRef;
@@ -335,6 +335,8 @@ impl Compactor for DefaultCompactor {
         let region_id = compaction_region.region_id;
         let cache_manager = compaction_region.cache_manager.clone();
         let storage = compaction_region.region_options.storage.clone();
+        // TODO: Set flat_format from mito config
+        let flat_format = false;
         let index_options = compaction_region
             .current_version
             .options
@@ -359,7 +361,7 @@ impl Compactor for DefaultCompactor {
                 .iter()
                 .map(|f| f.file_id().to_string())
                 .join(",");
-            let reader = CompactionSstReaderBuilder {
+            let builder = CompactionSstReaderBuilder {
                 metadata: region_metadata.clone(),
                 sst_layer: sst_layer.clone(),
                 cache: cache_manager.clone(),
@@ -368,15 +370,20 @@ impl Compactor for DefaultCompactor {
                 filter_deleted: output.filter_deleted,
                 time_range: output.output_time_range,
                 merge_mode,
-            }
-            .build_sst_reader()
-            .await?;
+            };
+            let source = if flat_format {
+                let reader = builder.build_flat_sst_reader().await?;
+                either::Right(FlatSource::Stream(reader))
+            } else {
+                let reader = builder.build_sst_reader().await?;
+                either::Left(Source::Reader(reader))
+            };
             let (sst_infos, metrics) = sst_layer
                 .write_sst(
                     SstWriteRequest {
                         op_type: OperationType::Compact,
                         metadata: region_metadata,
-                        source: Source::Reader(reader),
+                        source,
                         cache_manager,
                         storage,
                         max_sequence: max_sequence.map(NonZero::get),

diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index cc40b6fed0..3e2099beea 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -18,32 +18,41 @@ use std::collections::HashMap;
 use std::num::NonZeroU64;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::Instant;

 use common_telemetry::{debug, error, info, trace};
+use datatypes::arrow::datatypes::SchemaRef;
+use either::Either;
+use partition::expr::PartitionExpr;
+use smallvec::{SmallVec, smallvec};
 use snafu::ResultExt;
 use store_api::storage::RegionId;
 use strum::IntoStaticStr;
 use tokio::sync::{mpsc, watch};

-use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType};
+use crate::access_layer::{
+    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
+};
 use crate::cache::CacheManagerRef;
 use crate::config::MitoConfig;
 use crate::error::{
-    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, RegionClosedSnafu, RegionDroppedSnafu,
-    RegionTruncatedSnafu, Result,
+    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
+    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
 };
 use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
-use crate::memtable::MemtableRanges;
+use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges};
 use crate::metrics::{
     FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
     INFLIGHT_FLUSH_COUNT,
 };
-use crate::read::Source;
 use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
+use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
+use crate::read::flat_merge::FlatMergeIterator;
 use crate::read::merge::MergeReaderBuilder;
 use crate::read::scan_region::PredicateGroup;
-use crate::region::options::{IndexOptions, MergeMode};
-use crate::region::version::{VersionControlData, VersionControlRef};
+use crate::read::{FlatSource, Source};
+use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
+use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
 use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
 use crate::request::{
     BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
@@ -51,7 +60,8 @@ use crate::request::{
 };
 use crate::schedule::scheduler::{Job, SchedulerRef};
 use crate::sst::file::FileMeta;
-use crate::sst::parquet::WriteOptions;
+use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
+use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
 use crate::worker::WorkerListener;

 /// Global write buffer (memtable) manager.
@@ -343,100 +353,12 @@ impl RegionFlushTask {
             write_opts.row_group_size = row_group_size;
         }

-        let memtables = version.memtables.immutables();
-        let mut file_metas = Vec::with_capacity(memtables.len());
-        let mut flushed_bytes = 0;
-        let mut series_count = 0;
-        let mut flush_metrics = Metrics::new(WriteType::Flush);
-        for mem in memtables {
-            if mem.is_empty() {
-                // Skip empty memtables.
-                continue;
-            }
-
-            let MemtableRanges { ranges, stats } =
-                mem.ranges(None, PredicateGroup::default(), None)?;
-
-            let max_sequence = stats.max_sequence();
-            series_count += stats.series_count();
-
-            let source = if ranges.len() == 1 {
-                let only_range = ranges.into_values().next().unwrap();
-                let iter = only_range.build_iter()?;
-                Source::Iter(iter)
-            } else {
-                // todo(hl): a workaround since sync version of MergeReader is wip.
-                let sources = ranges
-                    .into_values()
-                    .map(|r| r.build_iter().map(Source::Iter))
-                    .collect::<Result<Vec<_>>>()?;
-                let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
-                let maybe_dedup = if version.options.append_mode {
-                    // no dedup in append mode
-                    Box::new(merge_reader) as _
-                } else {
-                    // dedup according to merge mode
-                    match version.options.merge_mode.unwrap_or(MergeMode::LastRow) {
-                        MergeMode::LastRow => {
-                            Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
-                        }
-                        MergeMode::LastNonNull => {
-                            Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
-                        }
-                    }
-                };
-                Source::Reader(maybe_dedup)
-            };
-
-            // Flush to level 0.
-            let write_request = SstWriteRequest {
-                op_type: OperationType::Flush,
-                metadata: version.metadata.clone(),
-                source,
-                cache_manager: self.cache_manager.clone(),
-                storage: version.options.storage.clone(),
-                max_sequence: Some(max_sequence),
-                index_options: self.index_options.clone(),
-                inverted_index_config: self.engine_config.inverted_index.clone(),
-                fulltext_index_config: self.engine_config.fulltext_index.clone(),
-                bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
-            };
-
-            let (ssts_written, metrics) = self
-                .access_layer
-                .write_sst(write_request, &write_opts, WriteType::Flush)
-                .await?;
-            if ssts_written.is_empty() {
-                // No data written.
-                continue;
-            }
-            flush_metrics = flush_metrics.merge(metrics);
-
-            // Convert partition expression once outside the map
-            let partition_expr = match &version.metadata.partition_expr {
-                None => None,
-                Some(json_expr) if json_expr.is_empty() => None,
-                Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
-                    .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
-            };
-
-            file_metas.extend(ssts_written.into_iter().map(|sst_info| {
-                flushed_bytes += sst_info.file_size;
-                FileMeta {
-                    region_id: self.region_id,
-                    file_id: sst_info.file_id,
-                    time_range: sst_info.time_range,
-                    level: 0,
-                    file_size: sst_info.file_size,
-                    available_indexes: sst_info.index_metadata.build_available_indexes(),
-                    index_file_size: sst_info.index_metadata.file_size,
-                    num_rows: sst_info.num_rows as u64,
-                    num_row_groups: sst_info.num_row_groups,
-                    sequence: NonZeroU64::new(max_sequence),
-                    partition_expr: partition_expr.clone(),
-                }
-            }));
-        }
+        let DoFlushMemtablesResult {
+            file_metas,
+            flushed_bytes,
+            series_count,
+            flush_metrics,
+        } = self.do_flush_memtables(version, write_opts).await?;

         if !file_metas.is_empty() {
             FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
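The removed block above chose its reader inline; that choice now lives in memtable_source, shown later in this patch. The decision itself is compact: append mode skips dedup entirely, otherwise the region's merge mode selects the dedup strategy. A standalone sketch with the reader types reduced to strings:

#[derive(Clone, Copy)]
enum MergeMode { LastRow, LastNonNull }

fn pick_reader(append_mode: bool, merge_mode: Option<MergeMode>) -> &'static str {
    if append_mode {
        // Append mode keeps every row, so the plain merge reader suffices.
        "MergeReader (no dedup)"
    } else {
        match merge_mode.unwrap_or(MergeMode::LastRow) {
            MergeMode::LastRow => "DedupReader<LastRow>: keep the last row per key",
            MergeMode::LastNonNull => "DedupReader<LastNonNull>: merge last non-null fields",
        }
    }
}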
@@ -493,8 +415,237 @@ impl RegionFlushTask {
         Ok(edit)
     }

+    async fn do_flush_memtables(
+        &self,
+        version: &VersionRef,
+        write_opts: WriteOptions,
+    ) -> Result<DoFlushMemtablesResult> {
+        let memtables = version.memtables.immutables();
+        let mut file_metas = Vec::with_capacity(memtables.len());
+        let mut flushed_bytes = 0;
+        let mut series_count = 0;
+        // Convert partition expression once outside the map
+        let partition_expr = match &version.metadata.partition_expr {
+            None => None,
+            Some(json_expr) if json_expr.is_empty() => None,
+            Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
+                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
+        };
+        let mut flush_metrics = Metrics::new(WriteType::Flush);
+        for mem in memtables {
+            if mem.is_empty() {
+                // Skip empty memtables.
+                continue;
+            }
+
+            // Compact the memtable first; this waits for the background compaction to finish.
+            let compact_start = std::time::Instant::now();
+            if let Err(e) = mem.compact(true) {
+                common_telemetry::error!(e; "Failed to compact memtable before flush");
+            }
+            let compact_cost = compact_start.elapsed();
+
+            let mem_ranges = mem.ranges(None, PredicateGroup::default(), None)?;
+            let num_mem_ranges = mem_ranges.ranges.len();
+            let num_mem_rows = mem_ranges.stats.num_rows();
+            let memtable_id = mem.id();
+            // Increase the series count for each mem range. We assume each mem range holds
+            // different series, so the counter may overcount the actual series count.
+            series_count += mem_ranges.stats.series_count();
+
+            if mem_ranges.is_record_batch() {
+                let flush_start = Instant::now();
+                let FlushFlatMemResult {
+                    num_encoded,
+                    max_sequence,
+                    num_sources,
+                    results,
+                } = self
+                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
+                    .await?;
+                for (source_idx, result) in results.into_iter().enumerate() {
+                    let (ssts_written, metrics) = result?;
+                    if ssts_written.is_empty() {
+                        // No data written.
+                        continue;
+                    }
+
+                    common_telemetry::debug!(
+                        "Region {} flushed memtable {}, source {}/{}, metrics: {:?}",
+                        self.region_id,
+                        memtable_id,
+                        source_idx,
+                        num_sources,
+                        metrics
+                    );
+
+                    flush_metrics = flush_metrics.merge(metrics);
+
+                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
+                        flushed_bytes += sst_info.file_size;
+                        Self::new_file_meta(
+                            self.region_id,
+                            max_sequence,
+                            sst_info,
+                            partition_expr.clone(),
+                        )
+                    }));
+                }
+
+                common_telemetry::debug!(
+                    "Region {} flushed {} sources for memtable {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
+                    self.region_id,
+                    num_sources,
+                    memtable_id,
+                    num_mem_ranges,
+                    num_encoded,
+                    num_mem_rows,
+                    flush_start.elapsed(),
+                    compact_cost,
+                );
+            } else {
+                let max_sequence = mem_ranges.stats.max_sequence();
+                let source = memtable_source(mem_ranges, &version.options).await?;
+
+                // Flush to level 0.
+                let source = Either::Left(source);
+                let write_request = self.new_write_request(version, max_sequence, source);
+
+                let (ssts_written, metrics) = self
+                    .access_layer
+                    .write_sst(write_request, &write_opts, WriteType::Flush)
+                    .await?;
+                if ssts_written.is_empty() {
+                    // No data written.
+                    continue;
+                }
+
+                common_telemetry::debug!(
+                    "Region {} flushed one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
+                    self.region_id,
+                    num_mem_ranges,
+                    num_mem_rows,
+                    metrics
+                );
+
+                flush_metrics = flush_metrics.merge(metrics);
+
+                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
+                    flushed_bytes += sst_info.file_size;
+                    Self::new_file_meta(
+                        self.region_id,
+                        max_sequence,
+                        sst_info,
+                        partition_expr.clone(),
+                    )
+                }));
+            }
+        }
+
+        Ok(DoFlushMemtablesResult {
+            file_metas,
+            flushed_bytes,
+            series_count,
+            flush_metrics,
+        })
+    }
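Both branches of do_flush_memtables end with the same bookkeeping: fold the written SSTs into FileMetas via new_file_meta while accumulating flushed_bytes. A stripped-down sketch of that accounting, with SstOut as a stand-in for SstInfo:

struct SstOut { file_size: u64 } // stand-in for mito2's SstInfo

fn account(ssts: Vec<SstOut>, flushed_bytes: &mut u64) -> usize {
    let mut file_count = 0;
    for sst in ssts {
        *flushed_bytes += sst.file_size; // mirrors `flushed_bytes += sst_info.file_size`
        file_count += 1;                 // stands in for file_metas.extend(...)
    }
    file_count
}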
+
+    async fn flush_flat_mem_ranges(
+        &self,
+        version: &VersionRef,
+        write_opts: &WriteOptions,
+        mem_ranges: MemtableRanges,
+    ) -> Result<FlushFlatMemResult> {
+        let batch_schema =
+            to_flat_sst_arrow_schema(&version.metadata, &FlatSchemaOptions::default());
+        let flat_sources = memtable_flat_sources(
+            batch_schema,
+            mem_ranges,
+            &version.options,
+            version.metadata.primary_key.len(),
+        )?;
+        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
+        let num_encoded = flat_sources.encoded.len();
+        let max_sequence = flat_sources.max_sequence;
+        for source in flat_sources.sources {
+            let source = Either::Right(source);
+            let write_request = self.new_write_request(version, max_sequence, source);
+            let access_layer = self.access_layer.clone();
+            let write_opts = write_opts.clone();
+            let task = common_runtime::spawn_global(async move {
+                access_layer
+                    .write_sst(write_request, &write_opts, WriteType::Flush)
+                    .await
+            });
+            tasks.push(task);
+        }
+        for encoded in flat_sources.encoded {
+            let access_layer = self.access_layer.clone();
+            let cache_manager = self.cache_manager.clone();
+            let region_id = version.metadata.region_id;
+            let task = common_runtime::spawn_global(async move {
+                let metrics = access_layer
+                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
+                    .await?;
+                Ok((smallvec![encoded.sst_info], metrics))
+            });
+            tasks.push(task);
+        }
+        let num_sources = tasks.len();
+        let results = futures::future::try_join_all(tasks)
+            .await
+            .context(JoinSnafu)?;
+        Ok(FlushFlatMemResult {
+            num_encoded,
+            max_sequence,
+            num_sources,
+            results,
+        })
+    }
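flush_flat_mem_ranges fans out one spawned task per source and joins them with try_join_all, so a single JoinError fails the flush while per-task write errors surface later through the inner Results. A minimal tokio sketch of the pattern; mito2 uses common_runtime::spawn_global, tokio::spawn stands in here:

async fn fan_out(inputs: Vec<u64>) -> Result<Vec<u64>, tokio::task::JoinError> {
    let tasks: Vec<_> = inputs
        .into_iter()
        .map(|n| tokio::spawn(async move { n * 2 })) // stand-in for write_sst / put_sst
        .collect();
    // One JoinError (panic or cancellation) fails the whole flush attempt.
    futures::future::try_join_all(tasks).await
}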
+
+    fn new_file_meta(
+        region_id: RegionId,
+        max_sequence: u64,
+        sst_info: SstInfo,
+        partition_expr: Option<PartitionExpr>,
+    ) -> FileMeta {
+        FileMeta {
+            region_id,
+            file_id: sst_info.file_id,
+            time_range: sst_info.time_range,
+            level: 0,
+            file_size: sst_info.file_size,
+            available_indexes: sst_info.index_metadata.build_available_indexes(),
+            index_file_size: sst_info.index_metadata.file_size,
+            num_rows: sst_info.num_rows as u64,
+            num_row_groups: sst_info.num_row_groups,
+            sequence: NonZeroU64::new(max_sequence),
+            partition_expr,
+        }
+    }
+
+    fn new_write_request(
+        &self,
+        version: &VersionRef,
+        max_sequence: u64,
+        source: Either<Source, FlatSource>,
+    ) -> SstWriteRequest {
+        SstWriteRequest {
+            op_type: OperationType::Flush,
+            metadata: version.metadata.clone(),
+            source,
+            cache_manager: self.cache_manager.clone(),
+            storage: version.options.storage.clone(),
+            max_sequence: Some(max_sequence),
+            index_options: self.index_options.clone(),
+            inverted_index_config: self.engine_config.inverted_index.clone(),
+            fulltext_index_config: self.engine_config.fulltext_index.clone(),
+            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
+        }
+    }
+
     /// Notify flush job status.
-    async fn send_worker_request(&self, request: WorkerRequest) {
+    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
         if let Err(e) = self
             .request_sender
             .send(WorkerRequestWithTime::new(request))
@@ -515,6 +666,148 @@ impl RegionFlushTask {
     }
 }

+struct FlushFlatMemResult {
+    num_encoded: usize,
+    max_sequence: u64,
+    num_sources: usize,
+    results: Vec<Result<(SstInfoArray, Metrics)>>,
+}
+
+struct DoFlushMemtablesResult {
+    file_metas: Vec<FileMeta>,
+    flushed_bytes: u64,
+    series_count: usize,
+    flush_metrics: Metrics,
+}
+
+/// Returns a [Source] for the given memtable.
+async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
+    let source = if mem_ranges.ranges.len() == 1 {
+        let only_range = mem_ranges.ranges.into_values().next().unwrap();
+        let iter = only_range.build_iter()?;
+        Source::Iter(iter)
+    } else {
+        // todo(hl): a workaround since the sync version of MergeReader is WIP.
+        let sources = mem_ranges
+            .ranges
+            .into_values()
+            .map(|r| r.build_iter().map(Source::Iter))
+            .collect::<Result<Vec<_>>>()?;
+        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
+        let maybe_dedup = if options.append_mode {
+            // no dedup in append mode
+            Box::new(merge_reader) as _
+        } else {
+            // dedup according to merge mode
+            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
+                MergeMode::LastRow => {
+                    Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
+                }
+                MergeMode::LastNonNull => {
+                    Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
+                }
+            }
+        };
+        Source::Reader(maybe_dedup)
+    };
+    Ok(source)
+}
+
+struct FlatSources {
+    max_sequence: u64,
+    sources: SmallVec<[FlatSource; 4]>,
+    encoded: SmallVec<[EncodedRange; 4]>,
+}
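FlatSources keeps its sources in SmallVec<[_; 4]>, so the common case of a handful of ranges per flush stays on the stack. A small demonstration of the inline-capacity behavior this relies on:

use smallvec::{SmallVec, smallvec};

fn main() {
    let mut sources: SmallVec<[u32; 4]> = smallvec![1, 2, 3, 4];
    assert!(!sources.spilled()); // four elements still live inline
    sources.push(5);             // the fifth element spills to the heap
    assert!(sources.spilled());
}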
+
+// TODO(yingwen): Flushes into multiple files in parallel.
+/// Returns the max sequence and the [FlatSource]s for the given memtable.
+fn memtable_flat_sources(
+    schema: SchemaRef,
+    mem_ranges: MemtableRanges,
+    options: &RegionOptions,
+    field_column_start: usize,
+) -> Result<FlatSources> {
+    let MemtableRanges { ranges, stats } = mem_ranges;
+    let max_sequence = stats.max_sequence();
+    let mut flat_sources = FlatSources {
+        max_sequence,
+        sources: SmallVec::new(),
+        encoded: SmallVec::new(),
+    };
+
+    if ranges.len() == 1 {
+        let only_range = ranges.into_values().next().unwrap();
+        if let Some(encoded) = only_range.encoded() {
+            flat_sources.encoded.push(encoded);
+        } else {
+            let iter = only_range.build_record_batch_iter(None)?;
+            flat_sources.sources.push(FlatSource::Iter(iter));
+        }
+    } else {
+        let min_flush_rows = stats.num_rows / 8;
+        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
+        let mut last_iter_rows = 0;
+        let num_ranges = ranges.len();
+        let mut input_iters = Vec::with_capacity(num_ranges);
+        for (_range_id, range) in ranges {
+            if let Some(encoded) = range.encoded() {
+                flat_sources.encoded.push(encoded);
+                continue;
+            }
+
+            let iter = range.build_record_batch_iter(None)?;
+            input_iters.push(iter);
+            last_iter_rows += range.num_rows();
+
+            if last_iter_rows > min_flush_rows {
+                let maybe_dedup = merge_and_dedup(
+                    &schema,
+                    options,
+                    field_column_start,
+                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
+                )?;
+
+                flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
+                last_iter_rows = 0;
+            }
+        }
+
+        // Handle remaining iters.
+        if !input_iters.is_empty() {
+            let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;

+            flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
+        }
+    }
+
+    Ok(flat_sources)
+}
+
+fn merge_and_dedup(
+    schema: &SchemaRef,
+    options: &RegionOptions,
+    field_column_start: usize,
+    input_iters: Vec<BoxedRecordBatchIterator>,
+) -> Result<BoxedRecordBatchIterator> {
+    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
+    let maybe_dedup = if options.append_mode {
+        // No dedup in append mode
+        Box::new(merge_iter) as _
+    } else {
+        // Dedup according to merge mode.
+        match options.merge_mode() {
+            MergeMode::LastRow => {
+                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
+            }
+            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
+                merge_iter,
+                FlatLastNonNull::new(field_column_start, false),
+            )) as _,
+        }
+    };
+    Ok(maybe_dedup)
+}
+
 /// Manages background flushes of a worker.
 pub(crate) struct FlushScheduler {
     /// Tracks regions need to flush.

diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 6eb054d9eb..434c23650d 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -151,6 +151,19 @@ pub struct MemtableRanges {
     pub stats: MemtableStats,
 }

+impl IterBuilder for MemtableRanges {
+    fn build(&self, _metrics: Option) -> Result {
+        UnsupportedOperationSnafu {
+            err_msg: "MemtableRanges does not support building an iterator",
+        }
+        .fail()
+    }
+
+    fn is_record_batch(&self) -> bool {
+        self.ranges.values().all(|range| range.is_record_batch())
+    }
+}
+
 /// In memory write buffer.
 pub trait Memtable: Send + Sync + fmt::Debug {
     /// Returns the id of this memtable.
@@ -528,6 +541,11 @@ impl MemtableRange {
     pub fn num_rows(&self) -> usize {
         self.num_rows
     }
+
+    /// Returns the encoded range if available.
+    pub fn encoded(&self) -> Option<EncodedRange> {
+        self.context.builder.encoded_range()
+    }
 }

 #[cfg(test)]
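The grouping loop in memtable_flat_sources cuts a merge group once the buffered ranges exceed max(total_rows / 8, DEFAULT_ROW_GROUP_SIZE) rows, bounding both the number of output files and the size of each merge. The heuristic in isolation; the row-group constant mirrors the one defined in sst/parquet.rs:

const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;

fn group_ranges(range_rows: &[usize]) -> Vec<Vec<usize>> {
    let total: usize = range_rows.iter().sum();
    let min_flush_rows = (total / 8).max(DEFAULT_ROW_GROUP_SIZE);
    let mut groups = Vec::new();
    let mut current = Vec::new();
    let mut rows = 0;
    for (idx, &n) in range_rows.iter().enumerate() {
        current.push(idx); // buffer this range's iterator
        rows += n;
        if rows > min_flush_rows {
            // Cut a merge group once enough rows are buffered.
            groups.push(std::mem::take(&mut current));
            rows = 0;
        }
    }
    if !current.is_empty() {
        groups.push(current); // handle remaining iters
    }
    groups
}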
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 5ccbe0ae30..8191b9f993 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -139,6 +139,27 @@ impl SeqScan {
         Ok(Box::new(reader))
     }

+    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
+    ///
+    /// # Panics
+    /// Panics if the compaction flag is not set.
+    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
+        assert!(self.compaction);
+
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
+        debug_assert_eq!(1, self.properties.partitions.len());
+        let partition_ranges = &self.properties.partitions[0];
+
+        let reader = Self::merge_all_flat_ranges_for_compaction(
+            &self.stream_ctx,
+            partition_ranges,
+            &part_metrics,
+        )
+        .await?;
+        Ok(reader)
+    }
+
     /// Builds a merge reader that reads all ranges.
     /// Callers MUST not split ranges before calling this method.
     async fn merge_all_ranges_for_compaction(
@@ -172,6 +193,39 @@ impl SeqScan {
         Self::build_reader_from_sources(stream_ctx, sources, None).await
     }

+    /// Builds a merge reader that reads all flat ranges.
+    /// Callers MUST not split ranges before calling this method.
+    async fn merge_all_flat_ranges_for_compaction(
+        stream_ctx: &Arc<StreamContext>,
+        partition_ranges: &[PartitionRange],
+        part_metrics: &PartitionMetrics,
+    ) -> Result<BoxedRecordBatchStream> {
+        let mut sources = Vec::new();
+        let range_builder_list = Arc::new(RangeBuilderList::new(
+            stream_ctx.input.num_memtables(),
+            stream_ctx.input.num_files(),
+        ));
+        for part_range in partition_ranges {
+            build_flat_sources(
+                stream_ctx,
+                part_range,
+                true,
+                part_metrics,
+                range_builder_list.clone(),
+                &mut sources,
+            )
+            .await?;
+        }
+
+        common_telemetry::debug!(
+            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
+            stream_ctx.input.mapper.metadata().region_id,
+            partition_ranges.len(),
+            sources.len()
+        );
+        Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
+    }
+
     /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
     /// if possible.
     #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]

diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index d14de32128..f03e0947f8 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -43,7 +43,7 @@ pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
 pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

 /// Parquet write options.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct WriteOptions {
     /// Buffer size for async writer.
     pub write_buffer_size: ReadableSize,
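WriteOptions derives Clone because each spawned flush task takes its own copy, as in `let write_opts = write_opts.clone();` earlier in this patch. A minimal sketch of why the derive is needed once writes move onto separate tasks:

#[derive(Debug, Clone)]
struct WriteOpts {
    row_group_size: usize, // mirrors WriteOptions::row_group_size
}

fn spawn_per_task(opts: &WriteOpts) -> std::thread::JoinHandle<usize> {
    // Without Clone, moving `opts` into the task would mean borrowing across
    // the spawn boundary, which the 'static bound on spawned tasks forbids.
    let opts = opts.clone();
    std::thread::spawn(move || opts.row_group_size)
}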