feat: remove primary key collection

This commit is contained in:
Ning Sun
2026-05-21 22:46:15 +08:00
parent e03ee25214
commit 9ac92e50e8
5 changed files with 20 additions and 139 deletions

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;
@@ -39,12 +38,10 @@ use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::PickerOutput;
use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
use crate::config::MitoConfig;
use crate::engine::flush_hook::FlushHookRef;
use crate::error;
use crate::error::{
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
};
use crate::flush::{SharedPrimaryKeys, wrap_with_pk_collector};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::options::RegionOptions;
@@ -264,8 +261,6 @@ pub struct MergeOutput {
pub compaction_time_window: Option<i64>,
#[serde(skip)]
pub sst_infos: Vec<SstInfo>,
#[serde(skip)]
pub primary_keys: Vec<Vec<u8>>,
}
impl MergeOutput {
@@ -311,7 +306,7 @@ pub trait SstMerger: Send + Sync + 'static {
compaction_region: CompactionRegion,
output: CompactionOutput,
write_opts: WriteOptions,
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)>;
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)>;
}
/// The production [`SstMerger`] that reads, merges, and writes SST files.
@@ -325,7 +320,7 @@ impl SstMerger for DefaultSstMerger {
compaction_region: CompactionRegion,
output: CompactionOutput,
write_opts: WriteOptions,
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)> {
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
let region_id = compaction_region.region_id;
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
@@ -371,19 +366,6 @@ impl SstMerger for DefaultSstMerger {
};
let source = builder.build_flat_sst_reader().await?;
let hook: Option<FlushHookRef> = compaction_region.plugins.get();
let pk_collector: Option<SharedPrimaryKeys> = hook
.as_ref()
.map(|_| Arc::new(std::sync::Mutex::new(HashSet::new())));
let source = if let Some(collector) = &pk_collector {
crate::read::FlatSource::new_iter(
source.schema().clone(),
wrap_with_pk_collector(source.take_iter(), &Some(collector.clone())),
)
} else {
source
};
let mut metrics = Metrics::new(WriteType::Compaction);
let region_metadata = compaction_region.region_metadata.clone();
let sst_infos = compaction_region
@@ -463,10 +445,7 @@ impl SstMerger for DefaultSstMerger {
region_id, input_file_names, output_file_names, flat_format, metrics
);
metrics.observe();
let primary_keys: Vec<Vec<u8>> = pk_collector
.map(|c| c.lock().unwrap().drain().collect())
.unwrap_or_default();
Ok((output_files, sst_infos.into_iter().collect(), primary_keys))
Ok((output_files, sst_infos.into_iter().collect()))
}
}
@@ -536,7 +515,6 @@ where
let mut output_files = Vec::with_capacity(tasks.len());
let mut all_sst_infos: Vec<SstInfo> = Vec::new();
let mut all_primary_keys: HashSet<Vec<u8>> = HashSet::new();
let mut compacted_inputs = Vec::with_capacity(
tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
+ picker_output.expired_ssts.len(),
@@ -560,10 +538,9 @@ where
while let Some((inputs, handle)) = spawned.pop() {
let abort_handle = handle.abort_handle();
match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
Ok(Ok(Ok((files, infos, pks)))) => {
Ok(Ok(Ok((files, infos)))) => {
output_files.extend(files);
all_sst_infos.extend(infos);
all_primary_keys.extend(pks);
compacted_inputs.extend(inputs);
}
Ok(Ok(Err(e))) => {
@@ -624,7 +601,6 @@ where
files_to_remove: compacted_inputs,
compaction_time_window: Some(compaction_time_window),
sst_infos: all_sst_infos,
primary_keys: all_primary_keys.into_iter().collect(),
})
}
@@ -742,10 +718,10 @@ mod tests {
_compaction_region: CompactionRegion,
_output: CompactionOutput,
_write_opts: WriteOptions,
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)> {
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
match self.results.lock().unwrap().get(idx) {
Some(Ok(files)) => Ok((files.clone(), Vec::new(), Vec::new())),
Some(Ok(files)) => Ok((files.clone(), Vec::new())),
Some(Err(_)) => error::InvalidMetaSnafu {
reason: format!("simulated failure at index {idx}"),
}
@@ -914,7 +890,7 @@ mod tests {
_compaction_region: CompactionRegion,
_output: CompactionOutput,
_write_opts: WriteOptions,
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)> {
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
self.call_idx.fetch_add(1, Ordering::SeqCst);
std::future::pending().await
}

View File

@@ -300,7 +300,6 @@ impl CompactionTaskImpl {
self.compaction_region.region_id,
&self.compaction_region.region_metadata,
&files,
&merge_output.primary_keys,
)
.await;
}

View File

@@ -41,35 +41,19 @@ pub struct SstFileInfo<'a> {
///
/// plugins.insert(Arc::new(MyHook) as FlushHookRef);
/// ```
///
/// To decode primary keys into tag name-value pairs, use:
/// ```ignore
/// use mito_codec::row_converter::build_primary_key_codec;
///
/// let codec = build_primary_key_codec(region_metadata);
/// for pk_bytes in primary_keys {
/// let decoded = codec.decode(pk_bytes)?;
/// // Dense: Vec<(ColumnId, Value)>
/// // Sparse: SparseValues with column_id -> value mapping
/// }
/// ```
#[async_trait]
pub trait FlushHook: Send + Sync {
/// Called after SST files are written during flush.
///
/// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written.
/// - `primary_keys`: all unique primary keys (encoded bytes) across all files
/// in this flush. Decode with `build_primary_key_codec(region_metadata)`.
/// - `region_metadata`: provides the schema to decode primary keys into
/// tag/label name-value pairs.
/// - `region_metadata`: provides the schema for column type information.
async fn on_sst_files_written(
&self,
region_id: RegionId,
region_metadata: &RegionMetadataRef,
files: &[SstFileInfo<'_>],
primary_keys: &[Vec<u8>],
) {
let _ = (region_id, region_metadata, files, primary_keys);
let _ = (region_id, region_metadata, files);
}
/// Called after the region manifest is successfully updated.

View File

@@ -671,7 +671,6 @@ async fn test_update_topic_latest_entry_id(factory: Option<LogStoreFactory>) {
/// Test flush hook that records how often each callback fired.
struct MockFlushHook {
/// Running total of SST files reported to `on_sst_files_written`.
sst_written_count: AtomicUsize,
/// Number of `on_manifest_updated` invocations observed.
manifest_updated_count: AtomicUsize,
/// Length of the `primary_keys` slice from the most recent
/// `on_sst_files_written` call (stored, not accumulated).
primary_keys_count: AtomicUsize,
/// Signalled once at the end of every `on_manifest_updated` call.
notify: Notify,
}
@@ -680,7 +679,6 @@ impl MockFlushHook {
Self {
sst_written_count: AtomicUsize::new(0),
manifest_updated_count: AtomicUsize::new(0),
primary_keys_count: AtomicUsize::new(0),
notify: Notify::new(),
}
}
@@ -697,17 +695,13 @@ impl FlushHook for MockFlushHook {
region_id: RegionId,
_region_metadata: &RegionMetadataRef,
files: &[SstFileInfo<'_>],
primary_keys: &[Vec<u8>],
) {
self.sst_written_count
.fetch_add(files.len(), Ordering::Relaxed);
self.primary_keys_count
.store(primary_keys.len(), Ordering::Relaxed);
common_telemetry::info!(
"MockFlushHook::on_sst_files_written: region={}, files={}, primary_keys={}",
"MockFlushHook::on_sst_files_written: region={}, files={}",
region_id,
files.len(),
primary_keys.len(),
);
for (i, file) in files.iter().enumerate() {
common_telemetry::info!(
@@ -729,11 +723,10 @@ impl FlushHook for MockFlushHook {
) {
self.manifest_updated_count.fetch_add(1, Ordering::Relaxed);
common_telemetry::info!(
"MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}, primary_keys_collected={}",
"MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}",
region_id,
manifest_version,
edit.files_to_add.len(),
self.primary_keys_count.load(Ordering::Relaxed),
);
self.notify.notify_one();
}
@@ -778,7 +771,6 @@ async fn test_flush_hook() {
let sst_count = hook.sst_written_count.load(Ordering::Relaxed);
let manifest_count = hook.manifest_updated_count.load(Ordering::Relaxed);
let pk_count = hook.primary_keys_count.load(Ordering::Relaxed);
assert!(
sst_count > 0,
@@ -788,15 +780,10 @@ async fn test_flush_hook() {
manifest_count, 1,
"Expected exactly 1 manifest update, got {manifest_count}"
);
assert!(
pk_count >= 2,
"Expected at least 2 unique primary keys (tags 'a' and 'b'), got {pk_count}"
);
common_telemetry::info!(
"test_flush_hook passed: sst_count={}, manifest_count={}, pk_count={}",
"test_flush_hook passed: sst_count={}, manifest_count={}",
sst_count,
manifest_count,
pk_count
);
}

View File

@@ -14,18 +14,16 @@
//! Flush related utilities and structs.
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use bytes::Bytes;
use common_base::Plugins;
use common_telemetry::{debug, error, info};
use datatypes::arrow::array::{Array, BinaryArray, DictionaryArray};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::datatypes::SchemaRef;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
@@ -70,35 +68,6 @@ use crate::sst::parquet::{
use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;
/// Reference-counted, mutex-guarded set of primary keys stored as raw byte vectors.
pub(crate) type SharedPrimaryKeys = Arc<Mutex<HashSet<Vec<u8>>>>;
/// Record batch iterator adapter that records primary keys into a shared set
/// while forwarding every item of the wrapped iterator.
pub(crate) struct PkCollectingIter {
/// The wrapped iterator whose items are passed through.
inner: BoxedRecordBatchIterator,
/// Shared set that receives the primary key bytes seen so far.
primary_keys: SharedPrimaryKeys,
}
impl Iterator for PkCollectingIter {
type Item = Result<RecordBatch, Error>;
fn next(&mut self) -> Option<Self::Item> {
let batch = self.inner.next();
if let Some(Ok(ref record_batch)) = batch
&& let Some(pk_col) =
record_batch.column_by_name(store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME)
&& let Some(pk_dict) = pk_col
.as_any()
.downcast_ref::<DictionaryArray<UInt32Type>>()
&& let Some(pk_values) = pk_dict.values().as_any().downcast_ref::<BinaryArray>()
{
let mut keys = self.primary_keys.lock().unwrap();
for i in 0..pk_values.len() {
keys.insert(pk_values.value(i).to_vec());
}
}
batch
}
}
/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usages and decide whether the engine needs to flush.
@@ -422,7 +391,6 @@ impl RegionFlushTask {
encoded_part_count,
flush_metrics,
sst_infos,
primary_keys,
} = self.do_flush_memtables(version, write_opts).await?;
if !file_metas.is_empty() {
@@ -461,7 +429,7 @@ impl RegionFlushTask {
file_meta,
})
.collect();
hook.on_sst_files_written(self.region_id, &version.metadata, &files, &primary_keys)
hook.on_sst_files_written(self.region_id, &version.metadata, &files)
.await;
}
@@ -527,8 +495,6 @@ impl RegionFlushTask {
let mut flush_metrics = Metrics::new(WriteType::Flush);
let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
let hook: Option<FlushHookRef> = self.plugins.get();
let shared_pks: Option<SharedPrimaryKeys> =
hook.as_ref().map(|_| Arc::new(Mutex::new(HashSet::new())));
let mut all_sst_infos = Vec::new();
for mem in memtables {
if mem.is_empty() {
@@ -562,7 +528,7 @@ impl RegionFlushTask {
num_sources,
results,
} = self
.flush_flat_mem_ranges(version, &write_opts, mem_ranges, shared_pks.clone())
.flush_flat_mem_ranges(version, &write_opts, mem_ranges)
.await?;
encoded_part_count += num_encoded;
for (source_idx, result) in results.into_iter().enumerate() {
@@ -615,10 +581,6 @@ impl RegionFlushTask {
);
}
let primary_keys = shared_pks
.map(|pks| pks.lock().unwrap().drain().collect())
.unwrap_or_default();
Ok(DoFlushMemtablesResult {
file_metas,
flushed_bytes,
@@ -626,7 +588,6 @@ impl RegionFlushTask {
encoded_part_count,
flush_metrics,
sst_infos: all_sst_infos,
primary_keys,
})
}
@@ -635,7 +596,6 @@ impl RegionFlushTask {
version: &VersionRef,
write_opts: &WriteOptions,
mem_ranges: MemtableRanges,
pk_collector: Option<SharedPrimaryKeys>,
) -> Result<FlushFlatMemResult> {
let batch_schema = to_flat_sst_arrow_schema(
&version.metadata,
@@ -648,7 +608,6 @@ impl RegionFlushTask {
mem_ranges,
&version.options,
field_column_start,
pk_collector.clone(),
)?;
let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
let num_encoded = flat_sources.encoded.len();
@@ -794,7 +753,6 @@ struct DoFlushMemtablesResult {
encoded_part_count: usize,
flush_metrics: Metrics,
sst_infos: Vec<SstInfo>,
primary_keys: Vec<Vec<u8>>,
}
struct FlatSources {
@@ -802,26 +760,12 @@ struct FlatSources {
encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
/// Optionally decorates `iter` so that it records primary keys.
///
/// With `Some(collector)`, returns a [`PkCollectingIter`] around `iter` that
/// shares the collector's set. With `None`, returns `iter` untouched.
pub(crate) fn wrap_with_pk_collector(
    iter: BoxedRecordBatchIterator,
    pk_collector: &Option<SharedPrimaryKeys>,
) -> BoxedRecordBatchIterator {
    let Some(shared) = pk_collector.as_ref() else {
        return iter;
    };
    Box::new(PkCollectingIter {
        inner: iter,
        primary_keys: Arc::clone(shared),
    })
}
/// Returns the max sequence and [FlatSource] for the given memtable.
fn memtable_flat_sources(
schema: SchemaRef,
mem_ranges: MemtableRanges,
options: &RegionOptions,
field_column_start: usize,
pk_collector: Option<SharedPrimaryKeys>,
) -> Result<FlatSources> {
let MemtableRanges { ranges } = mem_ranges;
let mut flat_sources = FlatSources {
@@ -846,7 +790,6 @@ fn memtable_flat_sources(
field_column_start,
iter,
);
let iter = wrap_with_pk_collector(iter, &pk_collector);
flat_sources
.sources
.push((FlatSource::new_iter(schema, iter), max_sequence));
@@ -911,7 +854,6 @@ fn memtable_flat_sources(
field_column_start,
std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
)?;
let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector);
flat_sources.sources.push((
FlatSource::new_iter(schema.clone(), maybe_dedup),
@@ -944,7 +886,6 @@ fn memtable_flat_sources(
field_column_start,
input_iters,
)?;
let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector);
flat_sources
.sources
@@ -1615,7 +1556,6 @@ mod tests {
mem_ranges,
&options,
metadata.primary_key.len(),
None,
)
.unwrap();
assert!(flat_sources.encoded.is_empty());
@@ -1642,14 +1582,9 @@ mod tests {
..Default::default()
};
let flat_sources = memtable_flat_sources(
schema,
mem_ranges,
&options,
metadata.primary_key.len(),
None,
)
.unwrap();
let flat_sources =
memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
.unwrap();
assert!(flat_sources.encoded.is_empty());
assert_eq!(1, flat_sources.sources.len());