From 9ac92e50e886cc6ef8b9fa6f53d2aaa889534b12 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 21 May 2026 22:46:15 +0800 Subject: [PATCH] feat: remove primary key collection --- src/mito2/src/compaction/compactor.rs | 38 +++---------- src/mito2/src/compaction/task.rs | 1 - src/mito2/src/engine/flush_hook.rs | 20 +------ src/mito2/src/engine/flush_test.rs | 19 +------ src/mito2/src/flush.rs | 81 +++------------------------ 5 files changed, 20 insertions(+), 139 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index d498bc3a18..ef608ae0fc 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::num::NonZero; use std::sync::Arc; use std::time::Duration; @@ -39,12 +38,10 @@ use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::picker::PickerOutput; use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options}; use crate::config::MitoConfig; -use crate::engine::flush_hook::FlushHookRef; use crate::error; use crate::error::{ EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result, }; -use crate::flush::{SharedPrimaryKeys, wrap_with_pk_collector}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::region::options::RegionOptions; @@ -264,8 +261,6 @@ pub struct MergeOutput { pub compaction_time_window: Option, #[serde(skip)] pub sst_infos: Vec, - #[serde(skip)] - pub primary_keys: Vec>, } impl MergeOutput { @@ -311,7 +306,7 @@ pub trait SstMerger: Send + Sync + 'static { compaction_region: CompactionRegion, output: CompactionOutput, write_opts: WriteOptions, - ) -> Result<(Vec, Vec, Vec>)>; + ) -> Result<(Vec, Vec)>; } /// The production [`SstMerger`] that reads, merges, and writes SST files. @@ -325,7 +320,7 @@ impl SstMerger for DefaultSstMerger { compaction_region: CompactionRegion, output: CompactionOutput, write_opts: WriteOptions, - ) -> Result<(Vec, Vec, Vec>)> { + ) -> Result<(Vec, Vec)> { let region_id = compaction_region.region_id; let storage = compaction_region.region_options.storage.clone(); let index_options = compaction_region @@ -371,19 +366,6 @@ impl SstMerger for DefaultSstMerger { }; let source = builder.build_flat_sst_reader().await?; - let hook: Option = compaction_region.plugins.get(); - let pk_collector: Option = hook - .as_ref() - .map(|_| Arc::new(std::sync::Mutex::new(HashSet::new()))); - let source = if let Some(collector) = &pk_collector { - crate::read::FlatSource::new_iter( - source.schema().clone(), - wrap_with_pk_collector(source.take_iter(), &Some(collector.clone())), - ) - } else { - source - }; - let mut metrics = Metrics::new(WriteType::Compaction); let region_metadata = compaction_region.region_metadata.clone(); let sst_infos = compaction_region @@ -463,10 +445,7 @@ impl SstMerger for DefaultSstMerger { region_id, input_file_names, output_file_names, flat_format, metrics ); metrics.observe(); - let primary_keys: Vec> = pk_collector - .map(|c| c.lock().unwrap().drain().collect()) - .unwrap_or_default(); - Ok((output_files, sst_infos.into_iter().collect(), primary_keys)) + Ok((output_files, sst_infos.into_iter().collect())) } } @@ -536,7 +515,6 @@ where let mut output_files = Vec::with_capacity(tasks.len()); let mut all_sst_infos: Vec = Vec::new(); - let mut all_primary_keys: HashSet> = HashSet::new(); let mut compacted_inputs = Vec::with_capacity( tasks.iter().map(|(inputs, _)| inputs.len()).sum::() + picker_output.expired_ssts.len(), @@ -560,10 +538,9 @@ where while let Some((inputs, handle)) = spawned.pop() { let abort_handle = handle.abort_handle(); match CancellableFuture::new(handle, self.cancel_handle.clone()).await { - Ok(Ok(Ok((files, infos, pks)))) => { + Ok(Ok(Ok((files, infos)))) => { output_files.extend(files); all_sst_infos.extend(infos); - all_primary_keys.extend(pks); compacted_inputs.extend(inputs); } Ok(Ok(Err(e))) => { @@ -624,7 +601,6 @@ where files_to_remove: compacted_inputs, compaction_time_window: Some(compaction_time_window), sst_infos: all_sst_infos, - primary_keys: all_primary_keys.into_iter().collect(), }) } @@ -742,10 +718,10 @@ mod tests { _compaction_region: CompactionRegion, _output: CompactionOutput, _write_opts: WriteOptions, - ) -> Result<(Vec, Vec, Vec>)> { + ) -> Result<(Vec, Vec)> { let idx = self.call_idx.fetch_add(1, Ordering::SeqCst); match self.results.lock().unwrap().get(idx) { - Some(Ok(files)) => Ok((files.clone(), Vec::new(), Vec::new())), + Some(Ok(files)) => Ok((files.clone(), Vec::new())), Some(Err(_)) => error::InvalidMetaSnafu { reason: format!("simulated failure at index {idx}"), } @@ -914,7 +890,7 @@ mod tests { _compaction_region: CompactionRegion, _output: CompactionOutput, _write_opts: WriteOptions, - ) -> Result<(Vec, Vec, Vec>)> { + ) -> Result<(Vec, Vec)> { self.call_idx.fetch_add(1, Ordering::SeqCst); std::future::pending().await } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 4c52d8a9df..0e3725b8e6 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -300,7 +300,6 @@ impl CompactionTaskImpl { self.compaction_region.region_id, &self.compaction_region.region_metadata, &files, - &merge_output.primary_keys, ) .await; } diff --git a/src/mito2/src/engine/flush_hook.rs b/src/mito2/src/engine/flush_hook.rs index 2ad3c1d229..771b83704f 100644 --- a/src/mito2/src/engine/flush_hook.rs +++ b/src/mito2/src/engine/flush_hook.rs @@ -41,35 +41,19 @@ pub struct SstFileInfo<'a> { /// /// plugins.insert(Arc::new(MyHook) as FlushHookRef); /// ``` -/// -/// To decode primary keys into tag name-value pairs, use: -/// ```ignore -/// use mito_codec::row_converter::build_primary_key_codec; -/// -/// let codec = build_primary_key_codec(region_metadata); -/// for pk_bytes in primary_keys { -/// let decoded = codec.decode(pk_bytes)?; -/// // Dense: Vec<(ColumnId, Value)> -/// // Sparse: SparseValues with column_id -> value mapping -/// } -/// ``` #[async_trait] pub trait FlushHook: Send + Sync { /// Called after SST files are written during flush. /// /// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written. - /// - `primary_keys`: all unique primary keys (encoded bytes) across all files - /// in this flush. Decode with `build_primary_key_codec(region_metadata)`. - /// - `region_metadata`: provides the schema to decode primary keys into - /// tag/label name-value pairs. + /// - `region_metadata`: provides the schema for column type information. async fn on_sst_files_written( &self, region_id: RegionId, region_metadata: &RegionMetadataRef, files: &[SstFileInfo<'_>], - primary_keys: &[Vec], ) { - let _ = (region_id, region_metadata, files, primary_keys); + let _ = (region_id, region_metadata, files); } /// Called after the region manifest is successfully updated. diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index c6493f7e84..e85b854b1f 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -671,7 +671,6 @@ async fn test_update_topic_latest_entry_id(factory: Option) { struct MockFlushHook { sst_written_count: AtomicUsize, manifest_updated_count: AtomicUsize, - primary_keys_count: AtomicUsize, notify: Notify, } @@ -680,7 +679,6 @@ impl MockFlushHook { Self { sst_written_count: AtomicUsize::new(0), manifest_updated_count: AtomicUsize::new(0), - primary_keys_count: AtomicUsize::new(0), notify: Notify::new(), } } @@ -697,17 +695,13 @@ impl FlushHook for MockFlushHook { region_id: RegionId, _region_metadata: &RegionMetadataRef, files: &[SstFileInfo<'_>], - primary_keys: &[Vec], ) { self.sst_written_count .fetch_add(files.len(), Ordering::Relaxed); - self.primary_keys_count - .store(primary_keys.len(), Ordering::Relaxed); common_telemetry::info!( - "MockFlushHook::on_sst_files_written: region={}, files={}, primary_keys={}", + "MockFlushHook::on_sst_files_written: region={}, files={}", region_id, files.len(), - primary_keys.len(), ); for (i, file) in files.iter().enumerate() { common_telemetry::info!( @@ -729,11 +723,10 @@ impl FlushHook for MockFlushHook { ) { self.manifest_updated_count.fetch_add(1, Ordering::Relaxed); common_telemetry::info!( - "MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}, primary_keys_collected={}", + "MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}", region_id, manifest_version, edit.files_to_add.len(), - self.primary_keys_count.load(Ordering::Relaxed), ); self.notify.notify_one(); } @@ -778,7 +771,6 @@ async fn test_flush_hook() { let sst_count = hook.sst_written_count.load(Ordering::Relaxed); let manifest_count = hook.manifest_updated_count.load(Ordering::Relaxed); - let pk_count = hook.primary_keys_count.load(Ordering::Relaxed); assert!( sst_count > 0, @@ -788,15 +780,10 @@ async fn test_flush_hook() { manifest_count, 1, "Expected exactly 1 manifest update, got {manifest_count}" ); - assert!( - pk_count >= 2, - "Expected at least 2 unique primary keys (tags 'a' and 'b'), got {pk_count}" - ); common_telemetry::info!( - "test_flush_hook passed: sst_count={}, manifest_count={}, pk_count={}", + "test_flush_hook passed: sst_count={}, manifest_count={}", sst_count, manifest_count, - pk_count ); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index ad4ea9ab50..c5428ab8be 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -14,18 +14,16 @@ //! Flush related utilities and structs. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::num::NonZeroU64; +use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; use std::time::Instant; use bytes::Bytes; use common_base::Plugins; use common_telemetry::{debug, error, info}; -use datatypes::arrow::array::{Array, BinaryArray, DictionaryArray}; -use datatypes::arrow::datatypes::{SchemaRef, UInt32Type}; -use datatypes::arrow::record_batch::RecordBatch; +use datatypes::arrow::datatypes::SchemaRef; use partition::expr::PartitionExpr; use smallvec::{SmallVec, smallvec}; use snafu::ResultExt; @@ -70,35 +68,6 @@ use crate::sst::parquet::{ use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema}; use crate::worker::WorkerListener; -pub(crate) type SharedPrimaryKeys = Arc>>>; - -pub(crate) struct PkCollectingIter { - inner: BoxedRecordBatchIterator, - primary_keys: SharedPrimaryKeys, -} - -impl Iterator for PkCollectingIter { - type Item = Result; - - fn next(&mut self) -> Option { - let batch = self.inner.next(); - if let Some(Ok(ref record_batch)) = batch - && let Some(pk_col) = - record_batch.column_by_name(store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME) - && let Some(pk_dict) = pk_col - .as_any() - .downcast_ref::>() - && let Some(pk_values) = pk_dict.values().as_any().downcast_ref::() - { - let mut keys = self.primary_keys.lock().unwrap(); - for i in 0..pk_values.len() { - keys.insert(pk_values.value(i).to_vec()); - } - } - batch - } -} - /// Global write buffer (memtable) manager. /// /// Tracks write buffer (memtable) usages and decide whether the engine needs to flush. @@ -422,7 +391,6 @@ impl RegionFlushTask { encoded_part_count, flush_metrics, sst_infos, - primary_keys, } = self.do_flush_memtables(version, write_opts).await?; if !file_metas.is_empty() { @@ -461,7 +429,7 @@ impl RegionFlushTask { file_meta, }) .collect(); - hook.on_sst_files_written(self.region_id, &version.metadata, &files, &primary_keys) + hook.on_sst_files_written(self.region_id, &version.metadata, &files) .await; } @@ -527,8 +495,6 @@ impl RegionFlushTask { let mut flush_metrics = Metrics::new(WriteType::Flush); let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?; let hook: Option = self.plugins.get(); - let shared_pks: Option = - hook.as_ref().map(|_| Arc::new(Mutex::new(HashSet::new()))); let mut all_sst_infos = Vec::new(); for mem in memtables { if mem.is_empty() { @@ -562,7 +528,7 @@ impl RegionFlushTask { num_sources, results, } = self - .flush_flat_mem_ranges(version, &write_opts, mem_ranges, shared_pks.clone()) + .flush_flat_mem_ranges(version, &write_opts, mem_ranges) .await?; encoded_part_count += num_encoded; for (source_idx, result) in results.into_iter().enumerate() { @@ -615,10 +581,6 @@ impl RegionFlushTask { ); } - let primary_keys = shared_pks - .map(|pks| pks.lock().unwrap().drain().collect()) - .unwrap_or_default(); - Ok(DoFlushMemtablesResult { file_metas, flushed_bytes, @@ -626,7 +588,6 @@ impl RegionFlushTask { encoded_part_count, flush_metrics, sst_infos: all_sst_infos, - primary_keys, }) } @@ -635,7 +596,6 @@ impl RegionFlushTask { version: &VersionRef, write_opts: &WriteOptions, mem_ranges: MemtableRanges, - pk_collector: Option, ) -> Result { let batch_schema = to_flat_sst_arrow_schema( &version.metadata, @@ -648,7 +608,6 @@ impl RegionFlushTask { mem_ranges, &version.options, field_column_start, - pk_collector.clone(), )?; let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len()); let num_encoded = flat_sources.encoded.len(); @@ -794,7 +753,6 @@ struct DoFlushMemtablesResult { encoded_part_count: usize, flush_metrics: Metrics, sst_infos: Vec, - primary_keys: Vec>, } struct FlatSources { @@ -802,26 +760,12 @@ struct FlatSources { encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>, } -pub(crate) fn wrap_with_pk_collector( - iter: BoxedRecordBatchIterator, - pk_collector: &Option, -) -> BoxedRecordBatchIterator { - match pk_collector { - Some(collector) => Box::new(PkCollectingIter { - inner: iter, - primary_keys: collector.clone(), - }), - None => iter, - } -} - /// Returns the max sequence and [FlatSource] for the given memtable. fn memtable_flat_sources( schema: SchemaRef, mem_ranges: MemtableRanges, options: &RegionOptions, field_column_start: usize, - pk_collector: Option, ) -> Result { let MemtableRanges { ranges } = mem_ranges; let mut flat_sources = FlatSources { @@ -846,7 +790,6 @@ fn memtable_flat_sources( field_column_start, iter, ); - let iter = wrap_with_pk_collector(iter, &pk_collector); flat_sources .sources .push((FlatSource::new_iter(schema, iter), max_sequence)); @@ -911,7 +854,6 @@ fn memtable_flat_sources( field_column_start, std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)), )?; - let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector); flat_sources.sources.push(( FlatSource::new_iter(schema.clone(), maybe_dedup), @@ -944,7 +886,6 @@ fn memtable_flat_sources( field_column_start, input_iters, )?; - let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector); flat_sources .sources @@ -1615,7 +1556,6 @@ mod tests { mem_ranges, &options, metadata.primary_key.len(), - None, ) .unwrap(); assert!(flat_sources.encoded.is_empty()); @@ -1642,14 +1582,9 @@ mod tests { ..Default::default() }; - let flat_sources = memtable_flat_sources( - schema, - mem_ranges, - &options, - metadata.primary_key.len(), - None, - ) - .unwrap(); + let flat_sources = + memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len()) + .unwrap(); assert!(flat_sources.encoded.is_empty()); assert_eq!(1, flat_sources.sources.len());