feat: introduce flush and compaction hook

This commit is contained in:
Ning Sun
2026-05-13 15:07:52 +08:00
parent 4668dd43bd
commit dd454f7ebb
12 changed files with 459 additions and 41 deletions

View File

@@ -515,6 +515,7 @@ impl CompactionScheduler {
file_purger: None,
ttl: Some(ttl),
max_parallelism,
plugins: self.plugins.clone(),
};
let picker_output = {

View File

@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;
use common_base::Plugins;
use common_base::cancellation::{CancellableFuture, CancellationHandle};
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, info, warn};
@@ -37,10 +39,12 @@ use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::PickerOutput;
use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
use crate::config::MitoConfig;
use crate::engine::flush_hook::FlushHookRef;
use crate::error;
use crate::error::{
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
};
use crate::flush::{SharedPrimaryKeys, wrap_with_pk_collector};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::options::RegionOptions;
@@ -53,8 +57,8 @@ use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::sst::parquet::WriteOptions;
use crate::sst::parquet::metadata::extract_primary_key_range;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::version::{SstVersion, SstVersionRef};
/// Region version for compaction that does not hold memtables.
@@ -106,6 +110,8 @@ pub struct CompactionRegion {
/// The parallel is inside this compaction task, not across different compaction tasks.
/// It can be different windows of the same compaction task or something like this.
pub max_parallelism: usize,
pub(crate) plugins: Plugins,
}
/// OpenCompactionRegionRequest represents the request to open a compaction region.
@@ -230,6 +236,7 @@ pub async fn open_compaction_region(
file_purger: Some(file_purger),
ttl: Some(ttl),
max_parallelism: req.max_parallelism,
plugins: Plugins::new(),
})
}
@@ -250,11 +257,15 @@ impl CompactionRegion {
}
/// `[MergeOutput]` represents the output of merging SST files.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
pub compaction_time_window: Option<i64>,
#[serde(skip)]
pub sst_infos: Vec<SstInfo>,
#[serde(skip)]
pub primary_keys: Vec<Vec<u8>>,
}
impl MergeOutput {
@@ -300,7 +311,7 @@ pub trait SstMerger: Send + Sync + 'static {
compaction_region: CompactionRegion,
output: CompactionOutput,
write_opts: WriteOptions,
) -> Result<Vec<FileMeta>>;
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)>;
}
/// The production [`SstMerger`] that reads, merges, and writes SST files.
@@ -314,7 +325,7 @@ impl SstMerger for DefaultSstMerger {
compaction_region: CompactionRegion,
output: CompactionOutput,
write_opts: WriteOptions,
) -> Result<Vec<FileMeta>> {
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)> {
let region_id = compaction_region.region_id;
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
@@ -359,6 +370,20 @@ impl SstMerger for DefaultSstMerger {
merge_mode,
};
let source = builder.build_flat_sst_reader().await?;
let hook: Option<FlushHookRef> = compaction_region.plugins.get();
let pk_collector: Option<SharedPrimaryKeys> = hook
.as_ref()
.map(|_| Arc::new(std::sync::Mutex::new(HashSet::new())));
let source = if let Some(collector) = &pk_collector {
crate::read::FlatSource::new_iter(
source.schema().clone(),
wrap_with_pk_collector(source.take_iter(), &Some(collector.clone())),
)
} else {
source
};
let mut metrics = Metrics::new(WriteType::Compaction);
let region_metadata = compaction_region.region_metadata.clone();
let sst_infos = compaction_region
@@ -400,7 +425,7 @@ impl SstMerger for DefaultSstMerger {
};
let output_files = sst_infos
.into_iter()
.iter()
.map(|sst_info| {
let pk_range = sst_info
.file_metadata
@@ -438,7 +463,10 @@ impl SstMerger for DefaultSstMerger {
region_id, input_file_names, output_file_names, flat_format, metrics
);
metrics.observe();
Ok(output_files)
let primary_keys: Vec<Vec<u8>> = pk_collector
.map(|c| c.lock().unwrap().drain().collect())
.unwrap_or_default();
Ok((output_files, sst_infos.into_iter().collect(), primary_keys))
}
}
@@ -507,6 +535,8 @@ where
}
let mut output_files = Vec::with_capacity(tasks.len());
let mut all_sst_infos: Vec<SstInfo> = Vec::new();
let mut all_primary_keys: HashSet<Vec<u8>> = HashSet::new();
let mut compacted_inputs = Vec::with_capacity(
tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
+ picker_output.expired_ssts.len(),
@@ -530,8 +560,10 @@ where
while let Some((inputs, handle)) = spawned.pop() {
let abort_handle = handle.abort_handle();
match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
Ok(Ok(Ok(files))) => {
Ok(Ok(Ok((files, infos, pks)))) => {
output_files.extend(files);
all_sst_infos.extend(infos);
all_primary_keys.extend(pks);
compacted_inputs.extend(inputs);
}
Ok(Ok(Err(e))) => {
@@ -591,6 +623,8 @@ where
files_to_add: output_files,
files_to_remove: compacted_inputs,
compaction_time_window: Some(compaction_time_window),
sst_infos: all_sst_infos,
primary_keys: all_primary_keys.into_iter().collect(),
})
}
@@ -679,6 +713,7 @@ mod tests {
file_purger: None,
ttl: None,
max_parallelism: 1,
plugins: Plugins::new(),
}
}
@@ -707,10 +742,10 @@ mod tests {
_compaction_region: CompactionRegion,
_output: CompactionOutput,
_write_opts: WriteOptions,
) -> Result<Vec<FileMeta>> {
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)> {
let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
match self.results.lock().unwrap().get(idx) {
Some(Ok(files)) => Ok(files.clone()),
Some(Ok(files)) => Ok((files.clone(), Vec::new(), Vec::new())),
Some(Err(_)) => error::InvalidMetaSnafu {
reason: format!("simulated failure at index {idx}"),
}
@@ -879,7 +914,7 @@ mod tests {
_compaction_region: CompactionRegion,
_output: CompactionOutput,
_write_opts: WriteOptions,
) -> Result<Vec<FileMeta>> {
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)> {
self.call_idx.fetch_add(1, Ordering::SeqCst);
std::future::pending().await
}

View File

@@ -27,6 +27,7 @@ use crate::compaction::LocalCompactionState;
use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput};
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::engine::flush_hook::{FlushHookRef, SstFileInfo};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
@@ -282,6 +283,43 @@ impl CompactionTaskImpl {
);
}
}
/// Reports the SST files of a finished merge to the registered flush hook, if any.
///
/// Entries of `merge_output.sst_infos` are paired positionally with the entries of
/// `merge_output.files_to_add`; pairing stops at the end of the shorter list.
/// Returns immediately when the region's plugins hold no [`FlushHookRef`].
async fn invoke_sst_hook(&self, merge_output: &MergeOutput) {
    let registered: Option<FlushHookRef> = self.compaction_region.plugins.get();
    let Some(hook) = registered else {
        return;
    };

    let pair_count = merge_output
        .sst_infos
        .len()
        .min(merge_output.files_to_add.len());
    let mut files: Vec<SstFileInfo<'_>> = Vec::with_capacity(pair_count);
    for (sst_info, file_meta) in merge_output
        .sst_infos
        .iter()
        .zip(&merge_output.files_to_add)
    {
        // The hook receives an owned copy of the file meta and a borrow of the SST info.
        files.push(SstFileInfo {
            file_meta: file_meta.clone(),
            sst_info_ref: sst_info,
        });
    }

    hook.on_sst_files_written(
        self.compaction_region.region_id,
        &self.compaction_region.region_metadata,
        &files,
        &merge_output.primary_keys,
    )
    .await;
}
/// Forwards `edit` to the `on_manifest_updated` callback of the registered flush hook, if any.
///
/// The manifest version handed to the hook is the manifest manager's last version at the
/// time this method runs. Returns immediately when no [`FlushHookRef`] is registered.
// NOTE(review): the version is read back from the manager instead of being returned by the
// manifest update itself -- confirm no other manifest update can land in between.
async fn invoke_manifest_hook(&self, edit: &RegionEdit) {
    let registered: Option<FlushHookRef> = self.compaction_region.plugins.get();
    let Some(hook) = registered else {
        return;
    };

    let manifest_version = {
        // The read guard is released before the hook is awaited.
        let manager = self
            .compaction_region
            .manifest_ctx
            .manifest_manager
            .read()
            .await;
        manager.last_version()
    };

    let region_id = self.compaction_region.region_id;
    hook.on_manifest_updated(region_id, edit, manifest_version)
        .await;
}
}
#[async_trait::async_trait]
@@ -320,6 +358,7 @@ impl CompactionTask for CompactionTaskImpl {
.await
{
Ok(Ok(merge_output)) => {
self.invoke_sst_hook(&merge_output).await;
// Stop accepting cancellation once we are about to publish the compaction edit.
if !self.state.mark_commit_started() {
let senders = std::mem::take(&mut self.waiters);
@@ -330,6 +369,7 @@ impl CompactionTask for CompactionTaskImpl {
} else {
match self.update_manifest(merge_output).await {
Ok(edit) => {
self.invoke_manifest_hook(&edit).await;
let senders = std::mem::take(&mut self.waiters);
BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.compaction_region.region_id,

View File

@@ -40,6 +40,7 @@ mod drop_test;
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
pub mod flush_hook;
#[cfg(test)]
mod flush_test;
#[cfg(test)]

View File

@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Flush hook extension point for SST and manifest operations.
use std::sync::Arc;
use async_trait::async_trait;
use store_api::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::manifest::action::RegionEdit;
use crate::sst::file::FileMeta;
use crate::sst::parquet::SstInfo;
/// Information about a single SST file handed to a [`FlushHook`].
///
/// Instances are built both for files written by a flush and for files produced by a
/// compaction merge.
pub struct SstFileInfo<'a> {
/// Borrowed [`SstInfo`] of the written file (exposes e.g. `file_id`, `num_rows`,
/// `num_series` and `file_size`).
pub sst_info_ref: &'a SstInfo,
/// Owned [`FileMeta`] of the same file.
pub file_meta: FileMeta,
}
/// Extension hook for flush and compaction operations.
///
/// Both the flush task and the compaction task look this hook up in the engine's
/// `Plugins` and await its callbacks inline, so a slow implementation delays the
/// task that invoked it. Every method has a default implementation that does nothing.
///
/// Implementations can be registered via the `Plugins` system:
/// ```ignore
/// use std::sync::Arc;
/// use common_base::Plugins;
/// use mito2::engine::flush_hook::{FlushHook, FlushHookRef};
///
/// plugins.insert(Arc::new(MyHook) as FlushHookRef);
/// ```
///
/// To decode primary keys into tag name-value pairs, use:
/// ```ignore
/// use mito_codec::row_converter::build_primary_key_codec;
///
/// let codec = build_primary_key_codec(region_metadata);
/// for pk_bytes in primary_keys {
/// let decoded = codec.decode(pk_bytes)?;
/// // Dense: Vec<(ColumnId, Value)>
/// // Sparse: SparseValues with column_id -> value mapping
/// }
/// ```
#[async_trait]
pub trait FlushHook: Send + Sync {
/// Called after SST files are written by a flush or a compaction, before the
/// corresponding manifest update is attempted.
///
/// - `region_id`: the region the files belong to.
/// - `region_metadata`: provides the schema to decode primary keys into
/// tag/label name-value pairs.
/// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written.
/// - `primary_keys`: the unique primary keys (encoded bytes) collected across all
/// files of this flush or compaction. Decode with
/// `build_primary_key_codec(region_metadata)`.
async fn on_sst_files_written(
&self,
region_id: RegionId,
region_metadata: &RegionMetadataRef,
files: &[SstFileInfo<'_>],
primary_keys: &[Vec<u8>],
) {
// Default: ignore the notification; the binding only marks the arguments as used.
let _ = (region_id, region_metadata, files, primary_keys);
}
/// Called after the region manifest is successfully updated.
///
/// - `edit`: the region edit that was written to the manifest.
/// - `manifest_version`: the manifest version reported by the caller for this update.
async fn on_manifest_updated(
&self,
region_id: RegionId,
edit: &RegionEdit,
manifest_version: ManifestVersion,
) {
// Default: ignore the notification; the binding only marks the arguments as used.
let _ = (region_id, edit, manifest_version);
}
}
/// Shared, reference-counted handle to a [`FlushHook`] implementation; this is the type
/// stored in and looked up from `Plugins`.
pub type FlushHookRef = Arc<dyn FlushHook>;

View File

@@ -15,21 +15,28 @@
//! Flush tests for mito engine.
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::time::Duration;
use api::v1::Rows;
use async_trait::async_trait;
use common_base::Plugins;
use common_recordbatch::RecordBatches;
use common_time::util::current_time_millis;
use common_wal::options::WAL_OPTIONS_KEY;
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Notify;
use crate::config::MitoConfig;
use crate::engine::flush_hook::{FlushHook, FlushHookRef, SstFileInfo};
use crate::engine::listener::{FlushListener, StallListener};
use crate::manifest::action::RegionEdit;
use crate::test_util::{
CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv, build_rows,
build_rows_for_key, flush_region, kafka_log_store_factory, multiple_log_store_factories,
@@ -660,3 +667,136 @@ async fn test_update_topic_latest_entry_id(factory: Option<LogStoreFactory>) {
.unwrap();
assert_eq!(region.topic_latest_entry_id.load(Ordering::Relaxed), 1);
}
/// Test double for [`FlushHook`] that counts callbacks and signals manifest updates.
struct MockFlushHook {
// Running total of files reported through `on_sst_files_written`.
sst_written_count: AtomicUsize,
// Number of `on_manifest_updated` invocations.
manifest_updated_count: AtomicUsize,
// Number of primary keys reported by the most recent `on_sst_files_written` call
// (overwritten on each call, not accumulated).
primary_keys_count: AtomicUsize,
// Signalled once per `on_manifest_updated` call so a test can wait for it.
notify: Notify,
}
impl MockFlushHook {
    /// Builds a hook whose counters all start at zero.
    fn new() -> Self {
        let zero = || AtomicUsize::new(0);
        Self {
            sst_written_count: zero(),
            manifest_updated_count: zero(),
            primary_keys_count: zero(),
            notify: Notify::new(),
        }
    }

    /// Waits until `notify` has been signalled by `on_manifest_updated`.
    async fn wait_for_manifest_update(&self) {
        let signalled = self.notify.notified();
        signalled.await;
    }
}
#[async_trait]
impl FlushHook for MockFlushHook {
    /// Adds the number of reported files to `sst_written_count`, overwrites
    /// `primary_keys_count` with the number of reported keys, and logs every file.
    async fn on_sst_files_written(
        &self,
        region_id: RegionId,
        _region_metadata: &RegionMetadataRef,
        files: &[SstFileInfo<'_>],
        primary_keys: &[Vec<u8>],
    ) {
        let num_files = files.len();
        let num_primary_keys = primary_keys.len();

        self.sst_written_count
            .fetch_add(num_files, Ordering::Relaxed);
        self.primary_keys_count
            .store(num_primary_keys, Ordering::Relaxed);

        common_telemetry::info!(
            "MockFlushHook::on_sst_files_written: region={}, files={}, primary_keys={}",
            region_id,
            num_files,
            num_primary_keys,
        );

        let mut index = 0;
        for file in files {
            let info = file.sst_info_ref;
            common_telemetry::info!(
                "  file[{}]: file_id={}, num_rows={}, num_series={}, file_size={}",
                index,
                info.file_id,
                info.num_rows,
                info.num_series,
                info.file_size,
            );
            index += 1;
        }
    }

    /// Bumps `manifest_updated_count`, logs the update and wakes one waiter on `notify`.
    async fn on_manifest_updated(
        &self,
        region_id: RegionId,
        edit: &RegionEdit,
        manifest_version: ManifestVersion,
    ) {
        self.manifest_updated_count.fetch_add(1, Ordering::Relaxed);

        let files_added = edit.files_to_add.len();
        let primary_keys_collected = self.primary_keys_count.load(Ordering::Relaxed);
        common_telemetry::info!(
            "MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}, primary_keys_collected={}",
            region_id,
            manifest_version,
            files_added,
            primary_keys_collected,
        );

        self.notify.notify_one();
    }
}
/// Checks that a [`FlushHook`] registered through `Plugins` is invoked by a manual flush:
/// it must see at least one SST file, exactly one manifest update and at least two
/// primary keys.
#[tokio::test]
async fn test_flush_hook() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
// Keep a typed handle to the mock so its counters can be inspected after the flush.
let hook = Arc::new(MockFlushHook::new());
let plugins = Plugins::new();
plugins.insert(hook.clone() as FlushHookRef);
let engine = env
.create_engine_with_plugins(MitoConfig::default(), plugins)
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Write rows under two different keys ("a" and "b") so two primary keys are expected.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 10, 0),
};
put_rows(&engine, region_id, rows).await;
let rows2 = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 10, 0),
};
put_rows(&engine, region_id, rows2).await;
flush_region(&engine, region_id, None).await;
// Block until the mock has been told about the manifest update before reading counters.
hook.wait_for_manifest_update().await;
let sst_count = hook.sst_written_count.load(Ordering::Relaxed);
let manifest_count = hook.manifest_updated_count.load(Ordering::Relaxed);
let pk_count = hook.primary_keys_count.load(Ordering::Relaxed);
assert!(
sst_count > 0,
"Expected at least 1 SST file, got {sst_count}"
);
assert_eq!(
manifest_count, 1,
"Expected exactly 1 manifest update, got {manifest_count}"
);
assert!(
pk_count >= 2,
"Expected at least 2 unique primary keys (tags 'a' and 'b'), got {pk_count}"
);
common_telemetry::info!(
"test_flush_hook passed: sst_count={}, manifest_count={}, pk_count={}",
sst_count,
manifest_count,
pk_count
);
}

View File

@@ -14,15 +14,18 @@
//! Flush related utilities and structs.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use bytes::Bytes;
use common_base::Plugins;
use common_telemetry::{debug, error, info};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::array::{Array, BinaryArray, DictionaryArray};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
@@ -36,6 +39,7 @@ use crate::access_layer::{
};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::engine::flush_hook::{FlushHookRef, SstFileInfo};
use crate::error::{
Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
RegionTruncatedSnafu, Result,
@@ -66,6 +70,36 @@ use crate::sst::parquet::{
use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;
/// Set of encoded primary keys shared between the iterators that collect keys and the
/// task that later drains them; the mutex guards concurrent inserts.
pub(crate) type SharedPrimaryKeys = Arc<Mutex<HashSet<Vec<u8>>>>;
/// Record batch iterator adapter that passes batches through unchanged while recording
/// the primary keys found in them into a [`SharedPrimaryKeys`] set.
pub(crate) struct PkCollectingIter {
// The wrapped iterator whose items are forwarded as is.
inner: BoxedRecordBatchIterator,
// Destination set for the primary keys observed in the forwarded batches.
primary_keys: SharedPrimaryKeys,
}
impl Iterator for PkCollectingIter {
    type Item = Result<RecordBatch, Error>;

    /// Returns the next item of the wrapped iterator unchanged.
    ///
    /// For every successfully produced batch, the column located three positions before
    /// the end is inspected. If it is a `UInt32`-keyed dictionary with binary values,
    /// each dictionary value is recorded in the shared primary key set. Batches that do
    /// not match this layout, errors and end-of-stream are forwarded without collecting.
    ///
    /// # Panics
    ///
    /// Panics if the mutex protecting the shared set is poisoned.
    // NOTE(review): all dictionary *values* are recorded, including any not referenced by
    // the batch's keys -- confirm upstream never hands out batches with a wider dictionary.
    fn next(&mut self) -> Option<Self::Item> {
        let batch = self.inner.next();
        if let Some(Ok(ref record_batch)) = batch {
            let pk_col_idx = record_batch.num_columns().saturating_sub(3);
            if let Some(pk_col) = record_batch.columns().get(pk_col_idx)
                && let Some(pk_dict) = pk_col
                    .as_any()
                    .downcast_ref::<DictionaryArray<UInt32Type>>()
                && let Some(pk_values) = pk_dict.values().as_any().downcast_ref::<BinaryArray>()
                // Nothing to record: do not contend on the shared lock.
                && !pk_values.is_empty()
            {
                let mut keys = self.primary_keys.lock().unwrap();
                for i in 0..pk_values.len() {
                    let key = pk_values.value(i);
                    // The same series usually shows up in many batches; probe with the
                    // borrowed slice first so only unseen keys pay for an allocation.
                    if !keys.contains(key) {
                        keys.insert(key.to_vec());
                    }
                }
            }
        }
        batch
    }
}
/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usages and decide whether the engine needs to flush.
@@ -275,6 +309,8 @@ pub(crate) struct RegionFlushTask {
///
/// This is used to generate the file meta.
pub(crate) partition_expr: Option<String>,
/// Plugins for flush hooks.
pub(crate) plugins: Plugins,
}
impl RegionFlushTask {
@@ -365,8 +401,6 @@ impl RegionFlushTask {
/// Flushes memtables to level 0 SSTs and updates the manifest.
/// Returns the [RegionEdit] to apply.
async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
// We must use the immutable memtables list and entry ids from the `version_data`
// for consistency as others might already modify the version in the `version_control`.
let version = &version_data.version;
let timer = FLUSH_ELAPSED
.with_label_values(&["flush_memtables"])
@@ -386,6 +420,8 @@ impl RegionFlushTask {
series_count,
encoded_part_count,
flush_metrics,
sst_infos,
primary_keys,
} = self.do_flush_memtables(version, write_opts).await?;
if !file_metas.is_empty() {
@@ -414,12 +450,25 @@ impl RegionFlushTask {
);
flush_metrics.observe();
let hook: Option<FlushHookRef> = self.plugins.get();
if let Some(hook) = &hook {
let files: Vec<SstFileInfo<'_>> = sst_infos
.iter()
.zip(file_metas.iter())
.map(|(sst_info, file_meta)| SstFileInfo {
sst_info_ref: sst_info,
file_meta: file_meta.clone(),
})
.collect();
hook.on_sst_files_written(self.region_id, &version.metadata, &files, &primary_keys)
.await;
}
let edit = RegionEdit {
files_to_add: file_metas,
files_to_remove: Vec::new(),
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
compaction_time_window: None,
// The last entry has been flushed.
flushed_entry_id: Some(version_data.last_entry_id),
flushed_sequence: Some(version_data.committed_sequence),
committed_sequence: None,
@@ -434,7 +483,6 @@ impl RegionFlushTask {
let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
RegionLeaderState::Downgrading
} else {
// Check if region is in staging mode
let current_state = self.manifest_ctx.current_state();
if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
RegionLeaderState::Staging
@@ -442,19 +490,22 @@ impl RegionFlushTask {
RegionLeaderState::Writable
}
};
// We will leak files if the manifest update fails, but we ignore them for simplicity. We can
// add a cleanup job to remove them later.
let version = self
let manifest_version = self
.manifest_ctx
.update_manifest(expected_state, action_list, self.is_staging)
.await?;
info!(
"Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
"Successfully update manifest version to {manifest_version}, region: {}, is_staging: {}, reason: {}",
self.region_id,
self.is_staging,
self.reason.as_str()
);
if let Some(hook) = &hook {
hook.on_manifest_updated(self.region_id, &edit, manifest_version)
.await;
}
Ok(edit)
}
@@ -470,13 +521,15 @@ impl RegionFlushTask {
let mut encoded_part_count = 0;
let mut flush_metrics = Metrics::new(WriteType::Flush);
let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
let hook: Option<FlushHookRef> = self.plugins.get();
let shared_pks: Option<SharedPrimaryKeys> =
hook.as_ref().map(|_| Arc::new(Mutex::new(HashSet::new())));
let mut all_sst_infos = Vec::new();
for mem in memtables {
if mem.is_empty() {
// Skip empty memtables.
continue;
}
// Compact the memtable first, this waits the background compaction to finish.
let compact_start = std::time::Instant::now();
if let Err(e) = mem.compact(true) {
common_telemetry::error!(e; "Failed to compact memtable before flush");
@@ -484,16 +537,12 @@ impl RegionFlushTask {
let compact_cost = compact_start.elapsed();
flush_metrics.compact_memtable += compact_cost;
// Sets `for_flush` flag to true.
let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
let num_mem_ranges = mem_ranges.ranges.len();
// Aggregate stats from all ranges
let num_mem_rows = mem_ranges.num_rows();
let memtable_series_count = mem_ranges.series_count();
let memtable_id = mem.id();
// Increases series count for each mem range. We consider each mem range has different series so
// the counter may have more series than the actual series count.
series_count += memtable_series_count;
let flush_start = Instant::now();
@@ -502,13 +551,12 @@ impl RegionFlushTask {
num_sources,
results,
} = self
.flush_flat_mem_ranges(version, &write_opts, mem_ranges)
.flush_flat_mem_ranges(version, &write_opts, mem_ranges, shared_pks.clone())
.await?;
encoded_part_count += num_encoded;
for (source_idx, result) in results.into_iter().enumerate() {
let (max_sequence, ssts_written, metrics) = result?;
if ssts_written.is_empty() {
// No data written.
continue;
}
@@ -523,7 +571,7 @@ impl RegionFlushTask {
flush_metrics = flush_metrics.merge(metrics);
for sst_info in ssts_written {
for sst_info in &ssts_written {
flushed_bytes += sst_info.file_size;
let pk_range = sst_info
.file_metadata
@@ -537,6 +585,9 @@ impl RegionFlushTask {
pk_range,
));
}
if hook.is_some() {
all_sst_infos.extend(ssts_written);
}
}
common_telemetry::debug!(
@@ -552,12 +603,18 @@ impl RegionFlushTask {
);
}
let primary_keys = shared_pks
.map(|pks| pks.lock().unwrap().drain().collect())
.unwrap_or_default();
Ok(DoFlushMemtablesResult {
file_metas,
flushed_bytes,
series_count,
encoded_part_count,
flush_metrics,
sst_infos: all_sst_infos,
primary_keys,
})
}
@@ -566,6 +623,7 @@ impl RegionFlushTask {
version: &VersionRef,
write_opts: &WriteOptions,
mem_ranges: MemtableRanges,
pk_collector: Option<SharedPrimaryKeys>,
) -> Result<FlushFlatMemResult> {
let batch_schema = to_flat_sst_arrow_schema(
&version.metadata,
@@ -578,6 +636,7 @@ impl RegionFlushTask {
mem_ranges,
&version.options,
field_column_start,
pk_collector.clone(),
)?;
let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
let num_encoded = flat_sources.encoded.len();
@@ -626,7 +685,7 @@ impl RegionFlushTask {
fn new_file_meta(
region_id: RegionId,
max_sequence: u64,
sst_info: SstInfo,
sst_info: &SstInfo,
partition_expr: Option<PartitionExpr>,
primary_key_range: Option<(Bytes, Bytes)>,
) -> FileMeta {
@@ -722,6 +781,8 @@ struct DoFlushMemtablesResult {
series_count: usize,
encoded_part_count: usize,
flush_metrics: Metrics,
sst_infos: Vec<SstInfo>,
primary_keys: Vec<Vec<u8>>,
}
struct FlatSources {
@@ -729,12 +790,26 @@ struct FlatSources {
encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
pub(crate) fn wrap_with_pk_collector(
iter: BoxedRecordBatchIterator,
pk_collector: &Option<SharedPrimaryKeys>,
) -> BoxedRecordBatchIterator {
match pk_collector {
Some(collector) => Box::new(PkCollectingIter {
inner: iter,
primary_keys: collector.clone(),
}),
None => iter,
}
}
/// Returns the max sequence and [FlatSource] for the given memtable.
fn memtable_flat_sources(
schema: SchemaRef,
mem_ranges: MemtableRanges,
options: &RegionOptions,
field_column_start: usize,
pk_collector: Option<SharedPrimaryKeys>,
) -> Result<FlatSources> {
let MemtableRanges { ranges } = mem_ranges;
let mut flat_sources = FlatSources {
@@ -759,6 +834,7 @@ fn memtable_flat_sources(
field_column_start,
iter,
);
let iter = wrap_with_pk_collector(iter, &pk_collector);
flat_sources
.sources
.push((FlatSource::new_iter(schema, iter), max_sequence));
@@ -823,6 +899,7 @@ fn memtable_flat_sources(
field_column_start,
std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
)?;
let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector);
flat_sources.sources.push((
FlatSource::new_iter(schema.clone(), maybe_dedup),
@@ -855,6 +932,7 @@ fn memtable_flat_sources(
field_column_start,
input_iters,
)?;
let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector);
flat_sources
.sources
@@ -1370,6 +1448,7 @@ mod tests {
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
partition_expr: None,
plugins: Plugins::new(),
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
@@ -1414,6 +1493,7 @@ mod tests {
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
partition_expr: None,
plugins: Plugins::new(),
})
.collect();
// Schedule first task.
@@ -1523,6 +1603,7 @@ mod tests {
mem_ranges,
&options,
metadata.primary_key.len(),
None,
)
.unwrap();
assert!(flat_sources.encoded.is_empty());
@@ -1549,9 +1630,14 @@ mod tests {
..Default::default()
};
let flat_sources =
memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
.unwrap();
let flat_sources = memtable_flat_sources(
schema,
mem_ranges,
&options,
metadata.primary_key.len(),
None,
)
.unwrap();
assert!(flat_sources.encoded.is_empty());
assert_eq!(1, flat_sources.sources.len());
@@ -1600,6 +1686,7 @@ mod tests {
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
partition_expr: None,
plugins: Plugins::new(),
})
.collect();
// Schedule first task.

View File

@@ -693,7 +693,7 @@ impl RegionManifestManager {
version
}
fn last_version(&self) -> ManifestVersion {
pub fn last_version(&self) -> ManifestVersion {
self.last_version.load(Ordering::Relaxed)
}

View File

@@ -1163,7 +1163,7 @@ impl FlatSource {
}
}
pub(crate) fn schema(&self) -> &SchemaRef {
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
@@ -1171,7 +1171,6 @@ impl FlatSource {
self.inner.next_batch().await
}
#[cfg(test)]
pub(crate) fn take_iter(self) -> BoxedRecordBatchIterator {
match self.inner {
FlatSourceInner::Iter(iter) => iter,

View File

@@ -298,10 +298,20 @@ impl TestEnv {
}
pub(crate) async fn new_mito_engine(&self, config: MitoConfig) -> MitoEngine {
self.new_mito_engine_with_plugins(config, Plugins::new())
.await
}
pub(crate) async fn new_mito_engine_with_plugins(
&self,
config: MitoConfig,
plugins: Plugins,
) -> MitoEngine {
async fn create<S: LogStore>(
zelf: &TestEnv,
config: MitoConfig,
log_store: Arc<S>,
plugins: Plugins,
) -> MitoEngine {
let data_home = zelf.data_home().display().to_string();
MitoEngine::new(
@@ -312,15 +322,15 @@ impl TestEnv {
zelf.schema_metadata_manager.clone(),
zelf.file_ref_manager.clone(),
zelf.partition_expr_fetcher.clone(),
Plugins::new(),
plugins,
)
.await
.unwrap()
}
match self.log_store.as_ref().unwrap().clone() {
LogStoreImpl::RaftEngine(log_store) => create(self, config, log_store).await,
LogStoreImpl::Kafka(log_store) => create(self, config, log_store).await,
LogStoreImpl::RaftEngine(log_store) => create(self, config, log_store, plugins).await,
LogStoreImpl::Kafka(log_store) => create(self, config, log_store, plugins).await,
}
}
@@ -335,6 +345,21 @@ impl TestEnv {
self.new_mito_engine(config).await
}
/// Creates a new engine with specific config and plugins.
pub async fn create_engine_with_plugins(
&mut self,
config: MitoConfig,
plugins: Plugins,
) -> MitoEngine {
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
let object_store_manager = Arc::new(object_store_manager);
self.log_store = Some(log_store.clone());
self.object_store_manager = Some(object_store_manager.clone());
self.new_mito_engine_with_plugins(config, plugins).await
}
/// Creates a new engine with specific config and existing logstore and object store manager.
pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine {
self.new_mito_engine(config).await

View File

@@ -619,6 +619,7 @@ impl<S: LogStore> WorkerStarter<S> {
file_ref_manager: self.file_ref_manager.clone(),
partition_expr_fetcher: self.partition_expr_fetcher,
flush_semaphore: self.flush_semaphore,
plugins: self.plugins,
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
@@ -892,6 +893,8 @@ struct RegionWorkerLoop<S> {
partition_expr_fetcher: PartitionExprFetcherRef,
/// Semaphore to control flush concurrency.
flush_semaphore: Arc<Semaphore>,
/// Plugins for flush hooks.
plugins: Plugins,
}
impl<S: LogStore> RegionWorkerLoop<S> {

View File

@@ -166,6 +166,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
flush_semaphore: self.flush_semaphore.clone(),
is_staging: region.is_staging(),
partition_expr: region.maybe_staging_partition_expr_str(),
plugins: self.plugins.clone(),
}
}
}