From e03ee25214a6c67712b3498fc7e18ea1f926d7d1 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 21 May 2026 09:46:21 +0800 Subject: [PATCH] fix: address review comments --- src/mito2/src/compaction/task.rs | 2 +- src/mito2/src/engine/flush_hook.rs | 2 +- src/mito2/src/flush.rs | 38 ++++++++++++++++++++---------- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 91014db837..4c52d8a9df 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -292,8 +292,8 @@ impl CompactionTaskImpl { .iter() .zip(merge_output.files_to_add.iter()) .map(|(info, meta)| SstFileInfo { - file_meta: meta.clone(), sst_info_ref: info, + file_meta: meta, }) .collect(); hook.on_sst_files_written( diff --git a/src/mito2/src/engine/flush_hook.rs b/src/mito2/src/engine/flush_hook.rs index 9e0528e173..2ad3c1d229 100644 --- a/src/mito2/src/engine/flush_hook.rs +++ b/src/mito2/src/engine/flush_hook.rs @@ -28,7 +28,7 @@ use crate::sst::parquet::SstInfo; /// Information about a single SST file written during flush. pub struct SstFileInfo<'a> { pub sst_info_ref: &'a SstInfo, - pub file_meta: FileMeta, + pub file_meta: &'a FileMeta, } /// Extension hook for flush operations. 
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 03cd96b9c5..ad4ea9ab50 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -82,18 +82,17 @@ impl Iterator for PkCollectingIter { fn next(&mut self) -> Option { let batch = self.inner.next(); - if let Some(Ok(ref record_batch)) = batch { - let pk_col_idx = record_batch.num_columns().saturating_sub(3); - if let Some(pk_col) = record_batch.columns().get(pk_col_idx) - && let Some(pk_dict) = pk_col - .as_any() - .downcast_ref::>() - && let Some(pk_values) = pk_dict.values().as_any().downcast_ref::() - { - let mut keys = self.primary_keys.lock().unwrap(); - for i in 0..pk_values.len() { - keys.insert(pk_values.value(i).to_vec()); - } + if let Some(Ok(ref record_batch)) = batch + && let Some(pk_col) = + record_batch.column_by_name(store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME) + && let Some(pk_dict) = pk_col + .as_any() + .downcast_ref::>() + && let Some(pk_values) = pk_dict.values().as_any().downcast_ref::() + { + let mut keys = self.primary_keys.lock().unwrap(); + for i in 0..pk_values.len() { + keys.insert(pk_values.value(i).to_vec()); } } batch @@ -401,6 +400,8 @@ impl RegionFlushTask { /// Flushes memtables to level 0 SSTs and updates the manifest. /// Returns the [RegionEdit] to apply. async fn flush_memtables(&self, version_data: &VersionControlData) -> Result { + // We must use the immutable memtables list and entry ids from the `version_data` + // for consistency as others might already modify the version in the `version_control`. 
let version = &version_data.version; let timer = FLUSH_ELAPSED .with_label_values(&["flush_memtables"]) @@ -457,7 +458,7 @@ impl RegionFlushTask { .zip(file_metas.iter()) .map(|(sst_info, file_meta)| SstFileInfo { sst_info_ref: sst_info, - file_meta: file_meta.clone(), + file_meta, }) .collect(); hook.on_sst_files_written(self.region_id, &version.metadata, &files, &primary_keys) @@ -469,6 +470,7 @@ impl RegionFlushTask { files_to_remove: Vec::new(), timestamp_ms: Some(chrono::Utc::now().timestamp_millis()), compaction_time_window: None, + // The last entry has been flushed. flushed_entry_id: Some(version_data.last_entry_id), flushed_sequence: Some(version_data.committed_sequence), committed_sequence: None, @@ -483,6 +485,7 @@ impl RegionFlushTask { let expected_state = if matches!(self.reason, FlushReason::Downgrading) { RegionLeaderState::Downgrading } else { + // Check if region is in staging mode let current_state = self.manifest_ctx.current_state(); if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) { RegionLeaderState::Staging @@ -490,6 +493,8 @@ impl RegionFlushTask { RegionLeaderState::Writable } }; + // We will leak files if the manifest update fails, but we ignore them for simplicity. We can + // add a cleanup job to remove them later. let manifest_version = self .manifest_ctx .update_manifest(expected_state, action_list, self.is_staging) @@ -527,9 +532,11 @@ impl RegionFlushTask { let mut all_sst_infos = Vec::new(); for mem in memtables { if mem.is_empty() { + // Skip empty memtables. continue; } + // Compact the memtable first, this waits for the background compaction to finish. let compact_start = std::time::Instant::now(); if let Err(e) = mem.compact(true) { common_telemetry::error!(e; "Failed to compact memtable before flush"); @@ -537,12 +544,16 @@ impl RegionFlushTask { let compact_cost = compact_start.elapsed(); flush_metrics.compact_memtable += compact_cost; + // Sets `for_flush` flag to true. 
let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?; let num_mem_ranges = mem_ranges.ranges.len(); + // Aggregate stats from all ranges let num_mem_rows = mem_ranges.num_rows(); let memtable_series_count = mem_ranges.series_count(); let memtable_id = mem.id(); + // Increases series count for each mem range. We assume each mem range has different series, so + // the counter may have more series than the actual series count. series_count += memtable_series_count; let flush_start = Instant::now(); @@ -557,6 +568,7 @@ impl RegionFlushTask { for (source_idx, result) in results.into_iter().enumerate() { let (max_sequence, ssts_written, metrics) = result?; if ssts_written.is_empty() { + // No data written. continue; }