diff --git a/src/mito2/benches/bench_filter_time_partition.rs b/src/mito2/benches/bench_filter_time_partition.rs index 46c394c618..678687502f 100644 --- a/src/mito2/benches/bench_filter_time_partition.rs +++ b/src/mito2/benches/bench_filter_time_partition.rs @@ -53,8 +53,8 @@ fn random_array(num: usize) -> BulkPart { .unwrap(); BulkPart { batch, - max_ts: max, - min_ts: min, + max_timestamp: max, + min_timestamp: min, sequence: 0, timestamp_index: 0, raw_data: None, @@ -86,8 +86,8 @@ fn filter_arrow_impl(part: &BulkPart, min: i64, max: i64) -> Option { let batch = arrow::compute::filter_record_batch(&part.batch, &predicate).unwrap(); Some(BulkPart { batch, - max_ts: max, - min_ts: min, + max_timestamp: max, + min_timestamp: min, sequence: 0, timestamp_index: part.timestamp_index, raw_data: None, diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 5a8e9d2cd5..6eb054d9eb 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -21,6 +21,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; pub use bulk::part::EncodedBulkPart; +use bytes::Bytes; use common_time::Timestamp; use datatypes::arrow::record_batch::RecordBatch; use mito_codec::key_values::KeyValue; @@ -40,6 +41,7 @@ use crate::read::prune::PruneTimeIterator; use crate::read::scan_region::PredicateGroup; use crate::region::options::{MemtableOptions, MergeMode}; use crate::sst::file::FileTimeRange; +use crate::sst::parquet::SstInfo; mod builder; pub mod bulk; @@ -199,6 +201,14 @@ pub trait Memtable: Send + Sync + fmt::Debug { /// /// A region must freeze the memtable before invoking this method. fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef; + + /// Compacts the memtable. + /// + /// The `for_flush` is true when the flush job calls this method. + fn compact(&self, for_flush: bool) -> Result<()> { + let _ = for_flush; + Ok(()) + } } pub type MemtableRef = Arc; @@ -394,6 +404,14 @@ pub(crate) struct MemScanMetricsData { pub(crate) scan_cost: Duration, } +/// Encoded range in the memtable. +pub struct EncodedRange { + /// Encoded file data. + pub data: Bytes, + /// Metadata of the encoded range. + pub sst_info: SstInfo, +} + /// Builder to build an iterator to read the range. /// The builder should know the projection and the predicate to build the iterator. pub trait IterBuilder: Send + Sync { @@ -416,6 +434,11 @@ pub trait IterBuilder: Send + Sync { } .fail() } + + /// Returns the [EncodedRange] if the range is already encoded into SST. 
+ fn encoded_range(&self) -> Option { + None + } } pub type BoxedIterBuilder = Box; diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 0129c87d9d..102aae52ca 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -21,27 +21,35 @@ pub mod part; pub mod part_reader; mod row_group_reader; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Instant; use datatypes::arrow::datatypes::SchemaRef; use mito_codec::key_values::KeyValue; +use rayon::prelude::*; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, SequenceNumber}; +use store_api::storage::{ColumnId, RegionId, SequenceNumber}; +use tokio::sync::Semaphore; use crate::error::{Result, UnsupportedOperationSnafu}; use crate::flush::WriteBufferManagerRef; use crate::memtable::bulk::context::BulkIterContext; -use crate::memtable::bulk::part::BulkPart; +use crate::memtable::bulk::part::{BulkPart, BulkPartEncodeMetrics, BulkPartEncoder}; use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ - AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, IterBuilder, - KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, + AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange, + IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup, }; +use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow}; +use crate::read::flat_merge::FlatMergeIterator; +use crate::region::options::MergeMode; use crate::sst::file::FileId; +use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM; +use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE}; use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; /// All parts in a bulk memtable. @@ -63,6 +71,166 @@ impl BulkParts { fn is_empty(&self) -> bool { self.parts.is_empty() && self.encoded_parts.is_empty() } + + /// Returns true if the bulk parts should be merged. + fn should_merge_bulk_parts(&self) -> bool { + let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count(); + // If the total number of unmerged parts is >= 8, start a merge task. + unmerged_count >= 8 + } + + /// Returns true if the encoded parts should be merged. + fn should_merge_encoded_parts(&self) -> bool { + let unmerged_count = self + .encoded_parts + .iter() + .filter(|wrapper| !wrapper.merging) + .count(); + // If the total number of unmerged encoded parts is >= 8, start a merge task. + unmerged_count >= 8 + } + + /// Collects unmerged parts and marks them as being merged. + /// Returns the collected parts to merge. + fn collect_bulk_parts_to_merge(&mut self) -> Vec { + let mut collected_parts = Vec::new(); + + for wrapper in &mut self.parts { + if !wrapper.merging { + wrapper.merging = true; + collected_parts.push(PartToMerge::Bulk { + part: wrapper.part.clone(), + file_id: wrapper.file_id, + }); + } + } + collected_parts + } + + /// Collects unmerged encoded parts within size threshold and marks them as being merged. + /// Returns the collected parts to merge. 
+ fn collect_encoded_parts_to_merge(&mut self) -> Vec { + // Find minimum size among unmerged parts + let min_size = self + .encoded_parts + .iter() + .filter(|wrapper| !wrapper.merging) + .map(|wrapper| wrapper.part.size_bytes()) + .min(); + + let Some(min_size) = min_size else { + return Vec::new(); + }; + + let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024); + let mut collected_parts = Vec::new(); + + for wrapper in &mut self.encoded_parts { + if !wrapper.merging { + let size = wrapper.part.size_bytes(); + if size <= max_allowed_size { + wrapper.merging = true; + collected_parts.push(PartToMerge::Encoded { + part: wrapper.part.clone(), + file_id: wrapper.file_id, + }); + } + } + } + collected_parts + } + + /// Installs merged encoded parts and removes the original parts by file ids. + /// Returns the total number of rows in the merged parts. + fn install_merged_parts( + &mut self, + merged_parts: I, + merged_file_ids: &HashSet, + merge_encoded: bool, + ) -> usize + where + I: IntoIterator, + { + let mut total_output_rows = 0; + + for encoded_part in merged_parts { + total_output_rows += encoded_part.metadata().num_rows; + self.encoded_parts.push(EncodedPartWrapper { + part: encoded_part, + file_id: FileId::random(), + merging: false, + }); + } + + if merge_encoded { + self.encoded_parts + .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id)); + } else { + self.parts + .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id)); + } + + total_output_rows + } + + /// Resets merging flag for parts with the given file ids. + /// Used when merging fails or is cancelled. + fn reset_merging_flags(&mut self, file_ids: &HashSet, merge_encoded: bool) { + if merge_encoded { + for wrapper in &mut self.encoded_parts { + if file_ids.contains(&wrapper.file_id) { + wrapper.merging = false; + } + } + } else { + for wrapper in &mut self.parts { + if file_ids.contains(&wrapper.file_id) { + wrapper.merging = false; + } + } + } + } +} + +/// RAII guard for managing merging flags. +/// Automatically resets merging flags when dropped if the merge operation wasn't successful. +struct MergingFlagsGuard<'a> { + bulk_parts: &'a RwLock, + file_ids: &'a HashSet, + merge_encoded: bool, + success: bool, +} + +impl<'a> MergingFlagsGuard<'a> { + /// Creates a new guard for the given file ids. + fn new( + bulk_parts: &'a RwLock, + file_ids: &'a HashSet, + merge_encoded: bool, + ) -> Self { + Self { + bulk_parts, + file_ids, + merge_encoded, + success: false, + } + } + + /// Marks the merge operation as successful. + /// When this is called, the guard will not reset the flags on drop. + fn mark_success(&mut self) { + self.success = true; + } +} + +impl<'a> Drop for MergingFlagsGuard<'a> { + fn drop(&mut self) { + if !self.success + && let Ok(mut parts) = self.bulk_parts.write() + { + parts.reset_merging_flags(self.file_ids, self.merge_encoded); + } + } } /// Memtable that ingests and scans parts directly. @@ -78,6 +246,14 @@ pub struct BulkMemtable { /// Cached flat SST arrow schema for memtable compaction. 
#[allow(dead_code)] flat_arrow_schema: SchemaRef, + /// Compactor for merging bulk parts + compactor: Arc>, + /// Dispatcher for scheduling compaction tasks + compact_dispatcher: Option>, + /// Whether the append mode is enabled + append_mode: bool, + /// Mode to handle duplicate rows while merging + merge_mode: MergeMode, } impl std::fmt::Debug for BulkMemtable { @@ -115,8 +291,8 @@ impl Memtable for BulkMemtable { let local_metrics = WriteMetrics { key_bytes: 0, value_bytes: fragment.estimated_size(), - min_ts: fragment.min_ts, - max_ts: fragment.max_ts, + min_ts: fragment.min_timestamp, + max_ts: fragment.max_timestamp, num_rows: fragment.num_rows(), max_sequence: fragment.sequence, }; @@ -126,6 +302,7 @@ impl Memtable for BulkMemtable { bulk_parts.parts.push(BulkPartWrapper { part: fragment, file_id: FileId::random(), + merging: false, }); // Since this operation should be fast, we do it in parts lock scope. @@ -135,6 +312,10 @@ impl Memtable for BulkMemtable { self.update_stats(local_metrics); } + if self.should_compact() { + self.schedule_compact(); + } + Ok(()) } @@ -157,6 +338,7 @@ impl Memtable for BulkMemtable { let mut ranges = BTreeMap::new(); let mut range_id = 0; + // TODO(yingwen): Filter ranges by sequence. let context = Arc::new(BulkIterContext::new( self.metadata.clone(), &projection, @@ -286,8 +468,46 @@ impl Memtable for BulkMemtable { max_sequence: AtomicU64::new(0), num_rows: AtomicUsize::new(0), flat_arrow_schema, + compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))), + compact_dispatcher: self.compact_dispatcher.clone(), + append_mode: self.append_mode, + merge_mode: self.merge_mode, }) } + + fn compact(&self, for_flush: bool) -> Result<()> { + let mut compactor = self.compactor.lock().unwrap(); + + if for_flush { + return Ok(()); + } + + // Try to merge regular parts first + let should_merge = self.parts.read().unwrap().should_merge_bulk_parts(); + if should_merge { + compactor.merge_bulk_parts( + &self.flat_arrow_schema, + &self.parts, + &self.metadata, + !self.append_mode, + self.merge_mode, + )?; + } + + // Then try to merge encoded parts + let should_merge = self.parts.read().unwrap().should_merge_encoded_parts(); + if should_merge { + compactor.merge_encoded_parts( + &self.flat_arrow_schema, + &self.parts, + &self.metadata, + !self.append_mode, + self.merge_mode, + )?; + } + + Ok(()) + } } impl BulkMemtable { @@ -296,12 +516,16 @@ impl BulkMemtable { id: MemtableId, metadata: RegionMetadataRef, write_buffer_manager: Option, + compact_dispatcher: Option>, + append_mode: bool, + merge_mode: MergeMode, ) -> Self { let flat_arrow_schema = to_flat_sst_arrow_schema( &metadata, &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding), ); + let region_id = metadata.region_id; Self { id, parts: Arc::new(RwLock::new(BulkParts::default())), @@ -312,6 +536,10 @@ impl BulkMemtable { max_sequence: AtomicU64::new(0), num_rows: AtomicUsize::new(0), flat_arrow_schema, + compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))), + compact_dispatcher, + append_mode, + merge_mode, } } @@ -340,6 +568,33 @@ impl BulkMemtable { .map(|part_wrapper| part_wrapper.part.estimated_series_count()) .sum() } + + /// Returns whether the memtable should be compacted. + fn should_compact(&self) -> bool { + let parts = self.parts.read().unwrap(); + parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts() + } + + /// Schedules a compaction task using the CompactDispatcher. 
+ fn schedule_compact(&self) { + if let Some(dispatcher) = &self.compact_dispatcher { + let task = MemCompactTask { + metadata: self.metadata.clone(), + parts: self.parts.clone(), + flat_arrow_schema: self.flat_arrow_schema.clone(), + compactor: self.compactor.clone(), + append_mode: self.append_mode, + merge_mode: self.merge_mode, + }; + + dispatcher.dispatch_compact(task); + } else { + // Uses synchronous compaction if no dispatcher is available. + if let Err(e) = self.compact(false) { + common_telemetry::error!(e; "Failed to compact table"); + } + } + } } /// Iterator builder for bulk range @@ -373,6 +628,10 @@ impl IterBuilder for BulkRangeIterBuilder { Ok(Box::new(iter)) } + + fn encoded_range(&self) -> Option { + None + } } /// Iterator builder for encoded bulk range @@ -407,6 +666,13 @@ impl IterBuilder for EncodedBulkRangeIterBuilder { Ok(Box::new(std::iter::empty())) } } + + fn encoded_range(&self) -> Option { + Some(EncodedRange { + data: self.part.data().clone(), + sst_info: self.part.to_sst_info(self.file_id), + }) + } } struct BulkPartWrapper { @@ -414,6 +680,8 @@ struct BulkPartWrapper { /// The unique file id for this part in memtable. #[allow(dead_code)] file_id: FileId, + /// Whether this part is currently being merged. + merging: bool, } struct EncodedPartWrapper { @@ -421,21 +689,405 @@ struct EncodedPartWrapper { /// The unique file id for this part in memtable. #[allow(dead_code)] file_id: FileId, + /// Whether this part is currently being merged. + merging: bool, +} + +/// Enum to wrap different types of parts for unified merging. +#[derive(Clone)] +enum PartToMerge { + /// Raw bulk part. + Bulk { part: BulkPart, file_id: FileId }, + /// Encoded bulk part. + Encoded { + part: EncodedBulkPart, + file_id: FileId, + }, +} + +impl PartToMerge { + /// Gets the file ID of this part. + fn file_id(&self) -> FileId { + match self { + PartToMerge::Bulk { file_id, .. } => *file_id, + PartToMerge::Encoded { file_id, .. } => *file_id, + } + } + + /// Gets the minimum timestamp of this part. + fn min_timestamp(&self) -> i64 { + match self { + PartToMerge::Bulk { part, .. } => part.min_timestamp, + PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp, + } + } + + /// Gets the maximum timestamp of this part. + fn max_timestamp(&self) -> i64 { + match self { + PartToMerge::Bulk { part, .. } => part.max_timestamp, + PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp, + } + } + + /// Gets the number of rows in this part. + fn num_rows(&self) -> usize { + match self { + PartToMerge::Bulk { part, .. } => part.num_rows(), + PartToMerge::Encoded { part, .. } => part.metadata().num_rows, + } + } + + /// Creates a record batch iterator for this part. + fn create_iterator( + self, + context: Arc, + ) -> Result> { + match self { + PartToMerge::Bulk { part, .. } => { + let iter = BulkPartRecordBatchIter::new( + part.batch, context, None, // No sequence filter for merging + ); + Ok(Some(Box::new(iter) as BoxedRecordBatchIterator)) + } + PartToMerge::Encoded { part, .. } => part.read(context, None), + } + } +} + +struct MemtableCompactor { + region_id: RegionId, + memtable_id: MemtableId, +} + +impl MemtableCompactor { + /// Creates a new MemtableCompactor. + fn new(region_id: RegionId, memtable_id: MemtableId) -> Self { + Self { + region_id, + memtable_id, + } + } + + /// Merges bulk parts and then encodes the result to an [EncodedBulkPart]. 
+ fn merge_bulk_parts( + &mut self, + arrow_schema: &SchemaRef, + bulk_parts: &RwLock, + metadata: &RegionMetadataRef, + dedup: bool, + merge_mode: MergeMode, + ) -> Result<()> { + let start = Instant::now(); + + // Collects unmerged parts and mark them as being merged + let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge(); + if parts_to_merge.is_empty() { + return Ok(()); + } + + let merged_file_ids: HashSet = + parts_to_merge.iter().map(|part| part.file_id()).collect(); + let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false); + + // Sorts parts by row count (ascending) to merge parts with similar row counts. + let mut sorted_parts = parts_to_merge; + sorted_parts.sort_unstable_by_key(|part| part.num_rows()); + + // Groups parts into chunks for concurrent processing. + let part_groups: Vec> = sorted_parts + .chunks(16) + .map(|chunk| chunk.to_vec()) + .collect(); + + let total_groups = part_groups.len(); + let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum(); + let merged_parts = part_groups + .into_par_iter() + .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode)) + .collect::>>>()?; + + // Installs merged parts. + let total_output_rows = { + let mut parts = bulk_parts.write().unwrap(); + parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false) + }; + + guard.mark_success(); + + common_telemetry::debug!( + "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}", + self.region_id, + self.memtable_id, + total_groups, + total_parts_to_merge, + total_output_rows, + start.elapsed() + ); + + Ok(()) + } + + /// Merges encoded parts and then encodes the result to an [EncodedBulkPart]. + fn merge_encoded_parts( + &mut self, + arrow_schema: &SchemaRef, + bulk_parts: &RwLock, + metadata: &RegionMetadataRef, + dedup: bool, + merge_mode: MergeMode, + ) -> Result<()> { + let start = Instant::now(); + + // Collects unmerged encoded parts within size threshold and mark them as being merged. + let parts_to_merge = { + let mut parts = bulk_parts.write().unwrap(); + parts.collect_encoded_parts_to_merge() + }; + + if parts_to_merge.is_empty() { + return Ok(()); + } + + let merged_file_ids: HashSet = + parts_to_merge.iter().map(|part| part.file_id()).collect(); + let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true); + + if parts_to_merge.len() == 1 { + // Only 1 part, don't have to merge - the guard will automatically reset the flag + return Ok(()); + } + + // Groups parts into chunks for concurrent processing. 
+ let part_groups: Vec> = parts_to_merge + .chunks(16) + .map(|chunk| chunk.to_vec()) + .collect(); + + let total_groups = part_groups.len(); + let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum(); + + let merged_parts = part_groups + .into_par_iter() + .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode)) + .collect::>>>()?; + + // Installs merged parts using iterator and get total output rows + let total_output_rows = { + let mut parts = bulk_parts.write().unwrap(); + parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true) + }; + + // Marks the operation as successful to prevent flag reset + guard.mark_success(); + + common_telemetry::debug!( + "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}", + self.region_id, + self.memtable_id, + total_groups, + total_parts_to_merge, + total_output_rows, + start.elapsed() + ); + + Ok(()) + } + + /// Merges a group of parts into a single encoded part. + fn merge_parts_group( + parts_to_merge: Vec, + arrow_schema: &SchemaRef, + metadata: &RegionMetadataRef, + dedup: bool, + merge_mode: MergeMode, + ) -> Result> { + if parts_to_merge.is_empty() { + return Ok(None); + } + + // Calculates timestamp bounds for merged data + let min_timestamp = parts_to_merge + .iter() + .map(|p| p.min_timestamp()) + .min() + .unwrap_or(i64::MAX); + let max_timestamp = parts_to_merge + .iter() + .map(|p| p.max_timestamp()) + .max() + .unwrap_or(i64::MIN); + + let context = Arc::new(BulkIterContext::new( + metadata.clone(), + &None, // No column projection for merging + None, // No predicate for merging + )); + + // Creates iterators for all parts to merge. + let iterators: Vec = parts_to_merge + .into_iter() + .filter_map(|part| part.create_iterator(context.clone()).ok().flatten()) + .collect(); + + if iterators.is_empty() { + return Ok(None); + } + + let merged_iter = + FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?; + + let boxed_iter: BoxedRecordBatchIterator = if dedup { + // Applies deduplication based on merge mode + match merge_mode { + MergeMode::LastRow => { + let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false)); + Box::new(dedup_iter) + } + MergeMode::LastNonNull => { + // Calculates field column start: total columns - fixed columns - field columns + // Field column count = total metadata columns - time index column - primary key columns + let field_column_count = + metadata.column_metadatas.len() - 1 - metadata.primary_key.len(); + let total_columns = arrow_schema.fields().len(); + let field_column_start = + total_columns - FIXED_POS_COLUMN_NUM - field_column_count; + + let dedup_iter = FlatDedupIterator::new( + merged_iter, + FlatLastNonNull::new(field_column_start, false), + ); + Box::new(dedup_iter) + } + } + } else { + Box::new(merged_iter) + }; + + // Encodes the merged iterator + let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?; + let mut metrics = BulkPartEncodeMetrics::default(); + let encoded_part = encoder.encode_record_batch_iter( + boxed_iter, + arrow_schema.clone(), + min_timestamp, + max_timestamp, + &mut metrics, + )?; + + common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics); + + Ok(encoded_part) + } +} + +/// A memtable compact task to run in background. 
+struct MemCompactTask { + metadata: RegionMetadataRef, + parts: Arc>, + + /// Cached flat SST arrow schema + flat_arrow_schema: SchemaRef, + /// Compactor for merging bulk parts + compactor: Arc>, + /// Whether the append mode is enabled + append_mode: bool, + /// Mode to handle duplicate rows while merging + merge_mode: MergeMode, +} + +impl MemCompactTask { + fn compact(&self) -> Result<()> { + let mut compactor = self.compactor.lock().unwrap(); + + // Tries to merge regular parts first + let should_merge = self.parts.read().unwrap().should_merge_bulk_parts(); + if should_merge { + compactor.merge_bulk_parts( + &self.flat_arrow_schema, + &self.parts, + &self.metadata, + !self.append_mode, + self.merge_mode, + )?; + } + + // Then tries to merge encoded parts + let should_merge = self.parts.read().unwrap().should_merge_encoded_parts(); + if should_merge { + compactor.merge_encoded_parts( + &self.flat_arrow_schema, + &self.parts, + &self.metadata, + !self.append_mode, + self.merge_mode, + )?; + } + + Ok(()) + } +} + +/// Scheduler to run compact tasks in background. +#[derive(Debug)] +pub struct CompactDispatcher { + semaphore: Arc, +} + +impl CompactDispatcher { + /// Creates a new dispatcher with the given number of max concurrent tasks. + pub fn new(permits: usize) -> Self { + Self { + semaphore: Arc::new(Semaphore::new(permits)), + } + } + + /// Dispatches a compact task to run in background. + fn dispatch_compact(&self, task: MemCompactTask) { + let semaphore = self.semaphore.clone(); + common_runtime::spawn_global(async move { + let Ok(_permit) = semaphore.acquire().await else { + return; + }; + + common_runtime::spawn_blocking_global(move || { + if let Err(e) = task.compact() { + common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id); + } + }); + }); + } } /// Builder to build a [BulkMemtable]. #[derive(Debug, Default)] pub struct BulkMemtableBuilder { write_buffer_manager: Option, + compact_dispatcher: Option>, + append_mode: bool, + merge_mode: MergeMode, } impl BulkMemtableBuilder { /// Creates a new builder with specific `write_buffer_manager`. - pub fn new(write_buffer_manager: Option) -> Self { + pub fn new( + write_buffer_manager: Option, + append_mode: bool, + merge_mode: MergeMode, + ) -> Self { Self { write_buffer_manager, + compact_dispatcher: None, + append_mode, + merge_mode, } } + + /// Sets the compact dispatcher. 
+ pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc) -> Self { + self.compact_dispatcher = Some(compact_dispatcher); + self + } } impl MemtableBuilder for BulkMemtableBuilder { @@ -444,6 +1096,9 @@ impl MemtableBuilder for BulkMemtableBuilder { id, metadata.clone(), self.write_buffer_manager.clone(), + self.compact_dispatcher.clone(), + self.append_mode, + self.merge_mode, )) } @@ -497,7 +1152,8 @@ mod tests { #[test] fn test_bulk_memtable_write_read() { let metadata = metadata_for_test(); - let memtable = BulkMemtable::new(999, metadata.clone(), None); + let memtable = + BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow); let test_data = vec![ ( @@ -559,7 +1215,8 @@ mod tests { #[test] fn test_bulk_memtable_ranges_with_projection() { let metadata = metadata_for_test(); - let memtable = BulkMemtable::new(111, metadata.clone(), None); + let memtable = + BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow); let bulk_part = create_bulk_part_with_converter( "projection_test", @@ -597,7 +1254,8 @@ mod tests { #[test] fn test_bulk_memtable_unsupported_operations() { let metadata = metadata_for_test(); - let memtable = BulkMemtable::new(111, metadata.clone(), None); + let memtable = + BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow); let key_values = build_key_values_with_ts_seq_values( &metadata, @@ -619,7 +1277,8 @@ mod tests { #[test] fn test_bulk_memtable_freeze() { let metadata = metadata_for_test(); - let memtable = BulkMemtable::new(222, metadata.clone(), None); + let memtable = + BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow); let bulk_part = create_bulk_part_with_converter( "freeze_test", @@ -640,7 +1299,8 @@ mod tests { #[test] fn test_bulk_memtable_fork() { let metadata = metadata_for_test(); - let original_memtable = BulkMemtable::new(333, metadata.clone(), None); + let original_memtable = + BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow); let bulk_part = create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500) @@ -657,4 +1317,120 @@ mod tests { assert!(!original_memtable.is_empty()); assert_eq!(1, original_memtable.stats().num_rows); } + + #[test] + fn test_bulk_memtable_ranges_multiple_parts() { + let metadata = metadata_for_test(); + let memtable = + BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow); + + let parts_data = vec![ + ( + "part1", + 1u32, + vec![1000i64, 1100i64], + vec![Some(10.0), Some(11.0)], + 100u64, + ), + ( + "part2", + 2u32, + vec![2000i64, 2100i64], + vec![Some(20.0), Some(21.0)], + 200u64, + ), + ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64), + ]; + + for (k0, k1, timestamps, values, seq) in parts_data { + let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap(); + memtable.write_bulk(part).unwrap(); + } + + let predicate_group = PredicateGroup::new(&metadata, &[]); + let ranges = memtable.ranges(None, predicate_group, None).unwrap(); + + assert_eq!(3, ranges.ranges.len()); + assert_eq!(5, ranges.stats.num_rows); + assert_eq!(3, ranges.stats.num_ranges); + + for (range_id, range) in ranges.ranges.iter() { + assert!(*range_id < 3); + assert!(range.num_rows() > 0); + assert!(range.is_record_batch()); + } + } + + #[test] + fn test_bulk_memtable_ranges_with_sequence_filter() { + let metadata = metadata_for_test(); + let memtable = + BulkMemtable::new(888, metadata.clone(), None, None, false, 
MergeMode::LastRow); + + let part = create_bulk_part_with_converter( + "seq_test", + 1, + vec![1000, 2000, 3000], + vec![Some(10.0), Some(20.0), Some(30.0)], + 500, + ) + .unwrap(); + + memtable.write_bulk(part).unwrap(); + + let predicate_group = PredicateGroup::new(&metadata, &[]); + let sequence_filter = Some(400u64); // Filters out rows with sequence > 400 + let ranges = memtable + .ranges(None, predicate_group, sequence_filter) + .unwrap(); + + assert_eq!(1, ranges.ranges.len()); + let range = ranges.ranges.get(&0).unwrap(); + + let mut record_batch_iter = range.build_record_batch_iter(None).unwrap(); + assert!(record_batch_iter.next().is_none()); + } + + #[test] + fn test_bulk_memtable_ranges_with_encoded_parts() { + let metadata = metadata_for_test(); + let memtable = + BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow); + + // Adds enough bulk parts to trigger encoding + for i in 0..10 { + let part = create_bulk_part_with_converter( + &format!("key_{}", i), + i, + vec![1000 + i as i64 * 100], + vec![Some(i as f64 * 10.0)], + 100 + i as u64, + ) + .unwrap(); + memtable.write_bulk(part).unwrap(); + } + + memtable.compact(false).unwrap(); + + let predicate_group = PredicateGroup::new(&metadata, &[]); + let ranges = memtable.ranges(None, predicate_group, None).unwrap(); + + // Should have ranges for both bulk parts and encoded parts + assert_eq!(3, ranges.ranges.len()); + assert_eq!(10, ranges.stats.num_rows); + + for (_range_id, range) in ranges.ranges.iter() { + assert!(range.num_rows() > 0); + assert!(range.is_record_batch()); + + let record_batch_iter = range.build_record_batch_iter(None).unwrap(); + let mut total_rows = 0; + for batch_result in record_batch_iter { + let batch = batch_result.unwrap(); + total_rows += batch.num_rows(); + assert!(batch.num_rows() > 0); + } + assert_eq!(total_rows, range.num_rows()); + } + } } diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index d045a5e793..1507668fdf 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -16,6 +16,7 @@ use std::collections::VecDeque; use std::sync::Arc; +use std::time::{Duration, Instant}; use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value}; use api::v1::bulk_wal_entry::Body; @@ -23,6 +24,7 @@ use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry}; use bytes::Bytes; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_recordbatch::DfRecordBatch as RecordBatch; +use common_time::Timestamp; use common_time::timestamp::TimeUnit; use datatypes::arrow; use datatypes::arrow::array::{ @@ -55,24 +57,28 @@ use table::predicate::Predicate; use crate::error::{ self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu, - EncodeSnafu, NewRecordBatchSnafu, Result, + EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result, }; use crate::memtable::BoxedRecordBatchIterator; use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::part_reader::EncodedBulkPartIter; use crate::memtable::time_series::{ValueBuilder, Values}; +use crate::sst::file::FileId; +use crate::sst::index::IndexOutput; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat}; use crate::sst::parquet::helper::parse_parquet_metadata; +use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo}; use crate::sst::to_sst_arrow_schema; const 
INIT_DICT_VALUE_CAPACITY: usize = 8; +/// A raw bulk part in the memtable. #[derive(Clone)] pub struct BulkPart { pub batch: RecordBatch, - pub max_ts: i64, - pub min_ts: i64, + pub max_timestamp: i64, + pub min_timestamp: i64, pub sequence: u64, pub timestamp_index: usize, pub raw_data: Option, @@ -91,8 +97,8 @@ impl TryFrom for BulkPart { .context(error::ConvertBulkWalEntrySnafu)?; Ok(Self { batch, - max_ts: value.max_ts, - min_ts: value.min_ts, + max_timestamp: value.max_ts, + min_timestamp: value.min_ts, sequence: value.sequence, timestamp_index: value.timestamp_index as usize, raw_data: Some(ipc), @@ -109,8 +115,8 @@ impl TryFrom<&BulkPart> for BulkWalEntry { if let Some(ipc) = &value.raw_data { Ok(BulkWalEntry { sequence: value.sequence, - max_ts: value.max_ts, - min_ts: value.min_ts, + max_ts: value.max_timestamp, + min_ts: value.min_timestamp, timestamp_index: value.timestamp_index as u32, body: Some(Body::ArrowIpc(ipc.clone())), }) @@ -130,8 +136,8 @@ impl TryFrom<&BulkPart> for BulkWalEntry { })?; Ok(BulkWalEntry { sequence: value.sequence, - max_ts: value.max_ts, - min_ts: value.min_ts, + max_ts: value.max_timestamp, + min_ts: value.min_timestamp, timestamp_index: value.timestamp_index as u32, body: Some(Body::ArrowIpc(ArrowIpc { schema: schema_bytes, @@ -145,12 +151,7 @@ impl TryFrom<&BulkPart> for BulkWalEntry { impl BulkPart { pub(crate) fn estimated_size(&self) -> usize { - self.batch - .columns() - .iter() - // If can not get slice memory size, assume 0 here. - .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0)) - .sum() + record_batch_estimated_size(&self.batch) } /// Returns the estimated series count in this BulkPart. @@ -230,6 +231,16 @@ impl BulkPart { } } +/// More accurate estimation of the size of a record batch. +pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize { + batch + .columns() + .iter() + // If can not get slice memory size, assume 0 here. + .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0)) + .sum() +} + /// Primary key column builder for handling strings specially. enum PrimaryKeyColumnBuilder { /// String dictionary builder for string types. @@ -435,8 +446,8 @@ impl BulkPartConverter { Ok(BulkPart { batch, - max_ts: self.max_ts, - min_ts: self.min_ts, + max_timestamp: self.max_ts, + min_timestamp: self.min_ts, sequence: self.max_sequence, timestamp_index, raw_data: None, @@ -517,6 +528,39 @@ impl EncodedBulkPart { &self.metadata } + /// Returns the size of the encoded data in bytes + pub(crate) fn size_bytes(&self) -> usize { + self.data.len() + } + + /// Returns the encoded data. + pub(crate) fn data(&self) -> &Bytes { + &self.data + } + + /// Converts this `EncodedBulkPart` to `SstInfo`. 
+ /// + /// # Arguments + /// * `file_id` - The SST file ID to assign to this part + /// + /// # Returns + /// Returns a `SstInfo` instance with information derived from this bulk part's metadata + pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo { + let unit = self.metadata.region_metadata.time_index_type().unit(); + SstInfo { + file_id, + time_range: ( + Timestamp::new(self.metadata.min_timestamp, unit), + Timestamp::new(self.metadata.max_timestamp, unit), + ), + file_size: self.data.len() as u64, + num_rows: self.metadata.num_rows, + num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64, + file_metadata: Some(self.metadata.parquet_metadata.clone()), + index_metadata: IndexOutput::default(), + } + } + pub(crate) fn read( &self, context: BulkIterContextRef, @@ -555,6 +599,21 @@ pub struct BulkPartMeta { pub region_metadata: RegionMetadataRef, } +/// Metrics for encoding a part. +#[derive(Default, Debug)] +pub struct BulkPartEncodeMetrics { + /// Cost of iterating over the data. + pub iter_cost: Duration, + /// Cost of writing the data. + pub write_cost: Duration, + /// Size of data before encoding. + pub raw_size: usize, + /// Size of data after encoding. + pub encoded_size: usize, + /// Number of rows in part. + pub num_rows: usize, +} + pub struct BulkPartEncoder { metadata: RegionMetadataRef, row_group_size: usize, @@ -562,22 +621,91 @@ pub struct BulkPartEncoder { } impl BulkPartEncoder { - pub(crate) fn new(metadata: RegionMetadataRef, row_group_size: usize) -> BulkPartEncoder { + pub(crate) fn new( + metadata: RegionMetadataRef, + row_group_size: usize, + ) -> Result { + // TODO(yingwen): Skip arrow schema if needed. + let json = metadata.to_json().context(InvalidMetadataSnafu)?; + let key_value_meta = + parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json); + + // TODO(yingwen): Do we need compression? let writer_props = Some( WriterProperties::builder() + .set_key_value_metadata(Some(vec![key_value_meta])) .set_write_batch_size(row_group_size) .set_max_row_group_size(row_group_size) .build(), ); - Self { + + Ok(Self { metadata, row_group_size, writer_props, - } + }) } } impl BulkPartEncoder { + /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps. 
+ pub fn encode_record_batch_iter( + &self, + iter: BoxedRecordBatchIterator, + arrow_schema: SchemaRef, + min_timestamp: i64, + max_timestamp: i64, + metrics: &mut BulkPartEncodeMetrics, + ) -> Result> { + let mut buf = Vec::with_capacity(4096); + let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone()) + .context(EncodeMemtableSnafu)?; + let mut total_rows = 0; + + // Process each batch from the iterator + let mut iter_start = Instant::now(); + for batch_result in iter { + metrics.iter_cost += iter_start.elapsed(); + let batch = batch_result?; + if batch.num_rows() == 0 { + continue; + } + + metrics.raw_size += record_batch_estimated_size(&batch); + let write_start = Instant::now(); + writer.write(&batch).context(EncodeMemtableSnafu)?; + metrics.write_cost += write_start.elapsed(); + total_rows += batch.num_rows(); + iter_start = Instant::now(); + } + metrics.iter_cost += iter_start.elapsed(); + iter_start = Instant::now(); + + if total_rows == 0 { + return Ok(None); + } + + let close_start = Instant::now(); + let file_metadata = writer.close().context(EncodeMemtableSnafu)?; + metrics.write_cost += close_start.elapsed(); + metrics.encoded_size += buf.len(); + metrics.num_rows += total_rows; + + let buf = Bytes::from(buf); + let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?); + + Ok(Some(EncodedBulkPart { + data: buf, + metadata: BulkPartMeta { + num_rows: total_rows, + max_timestamp, + min_timestamp, + parquet_metadata, + region_metadata: self.metadata.clone(), + }, + })) + } + /// Encodes bulk part to a [EncodedBulkPart], returns the encoded data. fn encode_part(&self, part: &BulkPart) -> Result> { if part.batch.num_rows() == 0 { @@ -602,8 +730,8 @@ impl BulkPartEncoder { data: buf, metadata: BulkPartMeta { num_rows: part.batch.num_rows(), - max_timestamp: part.max_ts, - min_timestamp: part.min_ts, + max_timestamp: part.max_timestamp, + min_timestamp: part.min_timestamp, parquet_metadata, region_metadata: self.metadata.clone(), }, @@ -1208,7 +1336,7 @@ mod tests { converter.append_key_values(&kv).unwrap(); } let part = converter.convert().unwrap(); - let encoder = BulkPartEncoder::new(metadata, 1024); + let encoder = BulkPartEncoder::new(metadata, 1024).unwrap(); encoder.encode_part(&part).unwrap().unwrap() } @@ -1287,7 +1415,7 @@ mod tests { converter.append_key_values(&kv).unwrap(); } let part = converter.convert().unwrap(); - let encoder = BulkPartEncoder::new(metadata, 1024); + let encoder = BulkPartEncoder::new(metadata, 1024).unwrap(); encoder.encode_part(&part).unwrap().unwrap() } @@ -1417,8 +1545,8 @@ mod tests { let bulk_part = converter.convert().unwrap(); assert_eq!(bulk_part.num_rows(), 3); - assert_eq!(bulk_part.min_ts, 1000); - assert_eq!(bulk_part.max_ts, 2000); + assert_eq!(bulk_part.min_timestamp, 1000); + assert_eq!(bulk_part.max_timestamp, 2000); assert_eq!(bulk_part.sequence, 2); assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4); @@ -1535,8 +1663,8 @@ mod tests { let bulk_part = converter.convert().unwrap(); assert_eq!(bulk_part.num_rows(), 0); - assert_eq!(bulk_part.min_ts, i64::MAX); - assert_eq!(bulk_part.max_ts, i64::MIN); + assert_eq!(bulk_part.min_timestamp, i64::MAX); + assert_eq!(bulk_part.max_timestamp, i64::MIN); assert_eq!(bulk_part.sequence, SequenceNumber::MIN); // Validate primary key columns are present in schema even for empty batch @@ -1597,8 +1725,8 @@ mod tests { let bulk_part = converter.convert().unwrap(); assert_eq!(bulk_part.num_rows(), 3); - 
assert_eq!(bulk_part.min_ts, 1000); - assert_eq!(bulk_part.max_ts, 2000); + assert_eq!(bulk_part.min_timestamp, 1000); + assert_eq!(bulk_part.max_timestamp, 2000); assert_eq!(bulk_part.sequence, 2); assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4); @@ -1801,8 +1929,8 @@ mod tests { let bulk_part = converter.convert().unwrap(); assert_eq!(bulk_part.num_rows(), 3); - assert_eq!(bulk_part.min_ts, 1000); - assert_eq!(bulk_part.max_ts, 2000); + assert_eq!(bulk_part.min_timestamp, 1000); + assert_eq!(bulk_part.max_timestamp, 2000); assert_eq!(bulk_part.sequence, 2); assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4); diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index e1d4430a4d..31cbf097d6 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -205,8 +205,8 @@ impl Memtable for SimpleBulkMemtable { self.update_stats(WriteMetrics { key_bytes: 0, value_bytes: part.estimated_size(), - min_ts: part.min_ts, - max_ts: part.max_ts, + min_ts: part.min_timestamp, + max_ts: part.max_timestamp, num_rows: part.num_rows(), max_sequence: sequence, }); @@ -717,8 +717,8 @@ mod tests { let part = BulkPart { batch: rb, sequence: 1, - min_ts: 1, - max_ts: 2, + min_timestamp: 1, + max_timestamp: 2, timestamp_index: 0, raw_data: None, }; @@ -883,8 +883,8 @@ mod tests { memtable .write_bulk(BulkPart { batch: rb_with_large_string(0, i32::MAX, ®ion_meta), - max_ts: 0, - min_ts: 0, + max_timestamp: 0, + min_timestamp: 0, sequence: 0, timestamp_index: 1, raw_data: None, @@ -895,8 +895,8 @@ mod tests { memtable .write_bulk(BulkPart { batch: rb_with_large_string(1, 3, ®ion_meta), - max_ts: 1, - min_ts: 1, + max_timestamp: 1, + min_timestamp: 1, sequence: 1, timestamp_index: 1, raw_data: None, diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 9c77f61f3a..53d4a8f74a 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -189,8 +189,8 @@ pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result
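
Note on the merge bookkeeping introduced in bulk.rs above: collect_bulk_parts_to_merge / collect_encoded_parts_to_merge flip a per-part `merging` flag while a merge is in flight, and MergingFlagsGuard is the RAII guard that undoes those flags if the merge never reaches install_merged_parts. A minimal, self-contained sketch of that pattern follows (the FileId alias and the PartWrapper/Parts types are simplified stand-ins, not the actual types from this diff):

use std::collections::HashSet;
use std::sync::RwLock;

// Simplified stand-ins for the memtable types touched by this diff.
type FileId = u64;

struct PartWrapper {
    file_id: FileId,
    merging: bool,
}

#[derive(Default)]
struct Parts {
    parts: Vec<PartWrapper>,
}

impl Parts {
    /// Clears the merging flag for the given parts so they can be picked up again.
    fn reset_merging_flags(&mut self, ids: &HashSet<FileId>) {
        for wrapper in &mut self.parts {
            if ids.contains(&wrapper.file_id) {
                wrapper.merging = false;
            }
        }
    }
}

/// Resets the merging flags on drop unless the merge was marked successful.
struct MergingFlagsGuard<'a> {
    parts: &'a RwLock<Parts>,
    file_ids: &'a HashSet<FileId>,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    fn new(parts: &'a RwLock<Parts>, file_ids: &'a HashSet<FileId>) -> Self {
        Self { parts, file_ids, success: false }
    }

    /// Once called, the guard leaves the flags alone on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl Drop for MergingFlagsGuard<'_> {
    fn drop(&mut self) {
        if self.success {
            return;
        }
        // Merge failed, was skipped, or panicked: make the parts eligible again.
        if let Ok(mut parts) = self.parts.write() {
            parts.reset_merging_flags(self.file_ids);
        }
    }
}

fn merge_marked_parts(parts: &RwLock<Parts>, ids: &HashSet<FileId>) -> Result<(), String> {
    let mut guard = MergingFlagsGuard::new(parts, ids);
    // ... merge the parts and install the resulting encoded part here ...
    guard.mark_success(); // installation done; skip the reset on drop
    Ok(())
}

Because the reset happens in Drop, an early return, an error path, or a panic that unwinds all clear the flags, so parts cannot get stuck in the merging state.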
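
The CompactDispatcher at the end of bulk.rs bounds how many background memtable compactions run at once with a tokio Semaphore and runs the actual merge on a blocking pool. A rough equivalent using plain tokio is sketched below; the Dispatcher type and dispatch signature are illustrative only, while the diff itself goes through GreptimeDB's common_runtime::spawn_global / spawn_blocking_global wrappers:

use std::sync::Arc;
use tokio::sync::Semaphore;

/// Limits the number of concurrent background compaction jobs.
struct Dispatcher {
    semaphore: Arc<Semaphore>,
}

impl Dispatcher {
    fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    fn dispatch<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let semaphore = self.semaphore.clone();
        tokio::spawn(async move {
            // A closed semaphore means the dispatcher is shutting down; drop the job.
            let Ok(permit) = semaphore.acquire_owned().await else {
                return;
            };
            // Hold the permit inside the blocking task so it is released only
            // after the CPU-bound merge finishes.
            let _ = tokio::task::spawn_blocking(move || {
                let _permit = permit;
                job();
            })
            .await;
        });
    }
}

In this sketch the owned permit is moved into the blocking closure, so the permit count limits the blocking merge work itself rather than just the async hand-off.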