From 37f712aa8e9913b51fd42b5d9ea2cf76c575012d Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Fri, 5 Jun 2026 06:07:22 -0700 Subject: [PATCH] feat: use region_hook to cover all possible cases --- src/mito2/src/compaction.rs | 1 + src/mito2/src/compaction/compactor.rs | 7 +- src/mito2/src/compaction/task.rs | 15 +-- src/mito2/src/engine.rs | 2 +- src/mito2/src/engine/flush_hook.rs | 70 ----------- src/mito2/src/engine/flush_test.rs | 36 ++++-- src/mito2/src/engine/region_hook.rs | 132 +++++++++++++++++++++ src/mito2/src/flush.rs | 17 +-- src/mito2/src/region.rs | 28 ++++- src/mito2/src/region/opener.rs | 11 ++ src/mito2/src/test_util/scheduler_util.rs | 1 + src/mito2/src/worker/handle_catchup.rs | 1 + src/mito2/src/worker/handle_copy_region.rs | 9 +- src/mito2/src/worker/handle_create.rs | 1 + src/mito2/src/worker/handle_flush.rs | 1 - src/mito2/src/worker/handle_open.rs | 1 + 16 files changed, 214 insertions(+), 119 deletions(-) delete mode 100644 src/mito2/src/engine/flush_hook.rs create mode 100644 src/mito2/src/engine/region_hook.rs diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 8c98395292..e743ef82ba 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -1885,6 +1885,7 @@ mod tests { Arc::new(ManifestContext::new( manager, RegionRoleState::Leader(RegionLeaderState::Staging), + None, )) }; diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index d540b83b2c..d20270ea48 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -39,6 +39,7 @@ use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::picker::PickerOutput; use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options}; use crate::config::MitoConfig; +use crate::engine::region_hook::RegionHookRef; use crate::error; use crate::error::{ EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result, @@ -181,6 +182,7 @@ pub async fn open_compaction_region( let manifest_ctx = Arc::new(ManifestContext::new( manifest_manager, RegionRoleState::Leader(RegionLeaderState::Writable), + None, )); let file_purger = { @@ -514,6 +516,7 @@ where tasks.push((inputs_to_remove, fut)); } + let hook: Option = compaction_region.plugins.get(); let mut output_files = Vec::with_capacity(tasks.len()); let mut all_sst_infos: Vec = Vec::new(); let mut compacted_inputs = Vec::with_capacity( @@ -541,7 +544,9 @@ where match CancellableFuture::new(handle, self.cancel_handle.clone()).await { Ok(Ok(Ok((files, infos)))) => { output_files.extend(files); - all_sst_infos.extend(infos); + if hook.is_some() { + all_sst_infos.extend(infos); + } compacted_inputs.extend(inputs); } Ok(Ok(Err(e))) => { diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 6cdb3bf1db..bc024dd6db 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -28,7 +28,7 @@ use crate::compaction::LocalCompactionState; use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput}; use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager}; use crate::compaction::picker::{CompactionTask, PickerOutput}; -use crate::engine::flush_hook::{FlushHookRef, SstFileInfo}; +use crate::engine::region_hook::{RegionHookRef, SstFileInfo}; use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED}; @@ -286,7 +286,7 @@ impl CompactionTaskImpl { } async fn invoke_sst_hook(&self, merge_output: &MergeOutput) { - let hook: Option = self.compaction_region.plugins.get(); + let hook: Option = self.compaction_region.plugins.get(); if let Some(hook) = hook { let files: Vec> = merge_output .sst_infos @@ -305,14 +305,6 @@ impl CompactionTaskImpl { .await; } } - - async fn invoke_manifest_hook(&self, edit: &RegionEdit, manifest_version: ManifestVersion) { - let hook: Option = self.compaction_region.plugins.get(); - if let Some(hook) = hook { - hook.on_manifest_updated(self.compaction_region.region_id, edit, manifest_version) - .await; - } - } } #[async_trait::async_trait] @@ -361,8 +353,7 @@ impl CompactionTask for CompactionTaskImpl { }) } else { match self.update_manifest(merge_output).await { - Ok((edit, manifest_version)) => { - self.invoke_manifest_hook(&edit, manifest_version).await; + Ok((edit, _manifest_version)) => { let senders = std::mem::take(&mut self.waiters); BackgroundNotify::CompactionFinished(CompactionFinished { region_id: self.compaction_region.region_id, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 41215e1ab6..2a4a04af58 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -40,7 +40,7 @@ mod drop_test; mod edit_region_test; #[cfg(test)] mod filter_deleted_test; -pub mod flush_hook; +pub mod region_hook; #[cfg(test)] mod flush_test; #[cfg(test)] diff --git a/src/mito2/src/engine/flush_hook.rs b/src/mito2/src/engine/flush_hook.rs deleted file mode 100644 index 771b83704f..0000000000 --- a/src/mito2/src/engine/flush_hook.rs +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Flush hook extension point for SST and manifest operations. - -use std::sync::Arc; - -use async_trait::async_trait; -use store_api::ManifestVersion; -use store_api::metadata::RegionMetadataRef; -use store_api::storage::RegionId; - -use crate::manifest::action::RegionEdit; -use crate::sst::file::FileMeta; -use crate::sst::parquet::SstInfo; - -/// Information about a single SST file written during flush. -pub struct SstFileInfo<'a> { - pub sst_info_ref: &'a SstInfo, - pub file_meta: &'a FileMeta, -} - -/// Extension hook for flush operations. -/// -/// Implementations can be registered via the `Plugins` system: -/// ```ignore -/// use std::sync::Arc; -/// use common_base::Plugins; -/// use mito2::engine::flush_hook::{FlushHook, FlushHookRef}; -/// -/// plugins.insert(Arc::new(MyHook) as FlushHookRef); -/// ``` -#[async_trait] -pub trait FlushHook: Send + Sync { - /// Called after SST files are written during flush. - /// - /// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written. - /// - `region_metadata`: provides the schema for column type information. - async fn on_sst_files_written( - &self, - region_id: RegionId, - region_metadata: &RegionMetadataRef, - files: &[SstFileInfo<'_>], - ) { - let _ = (region_id, region_metadata, files); - } - - /// Called after the region manifest is successfully updated. - async fn on_manifest_updated( - &self, - region_id: RegionId, - edit: &RegionEdit, - manifest_version: ManifestVersion, - ) { - let _ = (region_id, edit, manifest_version); - } -} - -pub type FlushHookRef = Arc; diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index e85b854b1f..4fcbcade90 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -34,9 +34,9 @@ use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::Notify; use crate::config::MitoConfig; -use crate::engine::flush_hook::{FlushHook, FlushHookRef, SstFileInfo}; use crate::engine::listener::{FlushListener, StallListener}; -use crate::manifest::action::RegionEdit; +use crate::engine::region_hook::{RegionHook, RegionHookRef, SstFileInfo}; +use crate::manifest::action::RegionMetaActionList; use crate::test_util::{ CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv, build_rows, build_rows_for_key, flush_region, kafka_log_store_factory, multiple_log_store_factories, @@ -668,13 +668,14 @@ async fn test_update_topic_latest_entry_id(factory: Option) { assert_eq!(region.topic_latest_entry_id.load(Ordering::Relaxed), 1); } -struct MockFlushHook { +#[derive(Debug)] +struct MockRegionHook { sst_written_count: AtomicUsize, manifest_updated_count: AtomicUsize, notify: Notify, } -impl MockFlushHook { +impl MockRegionHook { fn new() -> Self { Self { sst_written_count: AtomicUsize::new(0), @@ -689,7 +690,7 @@ impl MockFlushHook { } #[async_trait] -impl FlushHook for MockFlushHook { +impl RegionHook for MockRegionHook { async fn on_sst_files_written( &self, region_id: RegionId, @@ -699,7 +700,7 @@ impl FlushHook for MockFlushHook { self.sst_written_count .fetch_add(files.len(), Ordering::Relaxed); common_telemetry::info!( - "MockFlushHook::on_sst_files_written: region={}, files={}", + "MockRegionHook::on_sst_files_written: region={}, files={}", region_id, files.len(), ); @@ -718,28 +719,37 @@ impl FlushHook for MockFlushHook { async fn on_manifest_updated( &self, region_id: RegionId, - edit: &RegionEdit, + action_list: &RegionMetaActionList, manifest_version: ManifestVersion, ) { self.manifest_updated_count.fetch_add(1, Ordering::Relaxed); + // Count files added across all Edit actions. + let files_added: usize = action_list + .actions + .iter() + .map(|action| match action { + crate::manifest::action::RegionMetaAction::Edit(edit) => edit.files_to_add.len(), + _ => 0, + }) + .sum(); common_telemetry::info!( - "MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}", + "MockRegionHook::on_manifest_updated: region={}, manifest_version={}, files_added={}", region_id, manifest_version, - edit.files_to_add.len(), + files_added, ); self.notify.notify_one(); } } #[tokio::test] -async fn test_flush_hook() { +async fn test_region_hook() { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let hook = Arc::new(MockFlushHook::new()); + let hook = Arc::new(MockRegionHook::new()); let plugins = Plugins::new(); - plugins.insert(hook.clone() as FlushHookRef); + plugins.insert(hook.clone() as RegionHookRef); let engine = env .create_engine_with_plugins(MitoConfig::default(), plugins) @@ -782,7 +792,7 @@ async fn test_flush_hook() { ); common_telemetry::info!( - "test_flush_hook passed: sst_count={}, manifest_count={}", + "test_region_hook passed: sst_count={}, manifest_count={}", sst_count, manifest_count, ); diff --git a/src/mito2/src/engine/region_hook.rs b/src/mito2/src/engine/region_hook.rs new file mode 100644 index 0000000000..c5ae12651d --- /dev/null +++ b/src/mito2/src/engine/region_hook.rs @@ -0,0 +1,132 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Region hook extension point for observing SST writes and manifest mutations. +//! +//! ## Design +//! +//! The [`RegionHook`] trait provides two methods with clear separation of concerns: +//! +//! - [`on_sst_files_written`]: Fires when mito2 physically writes SST **data files**. +//! Provides per-file [`SstInfo`] + [`FileMeta`] — the richest available metadata +//! (row counts, index metadata, Parquet metadata, etc.). +//! +//! - [`on_manifest_updated`]: Fires after **any** manifest write is successfully committed. +//! Receives the full [`RegionMetaActionList`] so consumers can inspect what changed +//! (file additions/removals, schema changes, truncation, partition expression changes, etc.). +//! +//! Hook implementations are registered via the [`Plugins`](common_base::Plugins) system: +//! ```ignore +//! plugins.insert(Arc::new(MyHook) as RegionHookRef); +//! ``` +//! +//! ## Coverage +//! +//! | Scenario | `on_sst_files_written` | `on_manifest_updated` | +//! |------------------------------|:----------------------:|:---------------------:| +//! | Flush (memtable → SST) | ✅ Yes | ✅ Yes | +//! | Local compaction | ✅ Yes | ✅ Yes | +//! | Remote compaction | ✅ (on compactor node) | ✅ (on compactor node) | +//! | RegionEdit / bulk ingestion | ❌ (files pre-written) | ✅ Yes | +//! | Copy region | ❌ (object-store copy) | ✅ Yes | +//! | Apply staging | ❌ (delegates to edit) | ✅ Yes | +//! | Alter (schema change) | ❌ (no SST files) | ✅ Yes | +//! | Truncate | ❌ (removes files) | ✅ Yes | +//! | Enter staging | ❌ (no SST files) | ✅ Yes | +//! | Async index build | ❌ (index files only) | ✅ Yes | +//! +//! The following paths do **not** trigger any hook: +//! - Follower region sync / catchup (manifest read-only; followers don't author changes) +//! - GC / checkpoint / drop / remap (internal bookkeeping, not logical state changes) +//! +//! ## Invocation points +//! +//! `on_sst_files_written` is invoked at the SST write site (flush task or compaction task), +//! immediately after SST files are written but **before** the manifest is committed. +//! +//! `on_manifest_updated` is centralized in [`ManifestContext::update_manifest_with_state_check`], +//! so it automatically covers all manifest write paths that go through `ManifestContext`. +//! +//! ## Future work +//! +//! A future `on_files_removed` hook may be added to observe file lifecycle end +//! (GC, drop, truncate, compaction removal). This is not yet implemented. +//! +//! [`on_sst_files_written`]: RegionHook::on_sst_files_written +//! [`on_manifest_updated`]: RegionHook::on_manifest_updated + +use std::fmt::Debug; +use std::sync::Arc; + +use async_trait::async_trait; +use store_api::ManifestVersion; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; + +use crate::manifest::action::RegionMetaActionList; +use crate::sst::file::FileMeta; +use crate::sst::parquet::SstInfo; + +/// Information about a single SST data file written during flush or compaction. +pub struct SstFileInfo<'a> { + pub sst_info_ref: &'a SstInfo, + pub file_meta: &'a FileMeta, +} + +/// Hook for observing region mutations in mito2. +/// +/// Implementations can be registered via the `Plugins` system: +/// ```ignore +/// use std::sync::Arc; +/// use common_base::Plugins; +/// use mito2::engine::region_hook::{RegionHook, RegionHookRef}; +/// +/// plugins.insert(Arc::new(MyHook) as RegionHookRef); +/// ``` +#[async_trait] +pub trait RegionHook: Send + Sync + Debug { + /// Called after SST **data files** are physically written, before manifest commit. + /// + /// This fires only when mito2 itself writes SST files (flush and compaction). + /// It does **not** fire when SST files are pre-written externally (bulk ingestion, + /// copy region) or when only index files are written (async index build). + async fn on_sst_files_written( + &self, + region_id: RegionId, + region_metadata: &RegionMetadataRef, + files: &[SstFileInfo<'_>], + ) { + let _ = (region_id, region_metadata, files); + } + + /// Called after the region manifest is successfully committed. + /// + /// Fires for **all** manifest write paths: flush, compaction, region edit, + /// copy region, alter, truncate, enter staging, index build, etc. + /// + /// Does **not** fire for: + /// - Manifest reads / follower sync (no write) + /// - GC / checkpoint (internal bookkeeping) + /// - Failed manifest updates + async fn on_manifest_updated( + &self, + region_id: RegionId, + action_list: &RegionMetaActionList, + manifest_version: ManifestVersion, + ) { + let _ = (region_id, action_list, manifest_version); + } +} + +pub type RegionHookRef = Arc; diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index c5428ab8be..3105816393 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -21,7 +21,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Instant; use bytes::Bytes; -use common_base::Plugins; use common_telemetry::{debug, error, info}; use datatypes::arrow::datatypes::SchemaRef; use partition::expr::PartitionExpr; @@ -37,7 +36,7 @@ use crate::access_layer::{ }; use crate::cache::CacheManagerRef; use crate::config::MitoConfig; -use crate::engine::flush_hook::{FlushHookRef, SstFileInfo}; +use crate::engine::region_hook::SstFileInfo; use crate::error::{ Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, @@ -277,8 +276,6 @@ pub(crate) struct RegionFlushTask { /// /// This is used to generate the file meta. pub(crate) partition_expr: Option, - /// Plugins for flush hooks. - pub(crate) plugins: Plugins, } impl RegionFlushTask { @@ -419,7 +416,7 @@ impl RegionFlushTask { ); flush_metrics.observe(); - let hook: Option = self.plugins.get(); + let hook = self.manifest_ctx.hook(); if let Some(hook) = &hook { let files: Vec> = sst_infos .iter() @@ -474,11 +471,6 @@ impl RegionFlushTask { self.reason.as_str() ); - if let Some(hook) = &hook { - hook.on_manifest_updated(self.region_id, &edit, manifest_version) - .await; - } - Ok(edit) } @@ -494,7 +486,7 @@ impl RegionFlushTask { let mut encoded_part_count = 0; let mut flush_metrics = Metrics::new(WriteType::Flush); let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?; - let hook: Option = self.plugins.get(); + let hook = self.manifest_ctx.hook(); let mut all_sst_infos = Vec::new(); for mem in memtables { if mem.is_empty() { @@ -1401,7 +1393,6 @@ mod tests { flush_semaphore: Arc::new(Semaphore::new(2)), is_staging: false, partition_expr: None, - plugins: Plugins::new(), }; task.push_sender(OptionOutputTx::from(output_tx)); scheduler @@ -1446,7 +1437,6 @@ mod tests { flush_semaphore: Arc::new(Semaphore::new(2)), is_staging: false, partition_expr: None, - plugins: Plugins::new(), }) .collect(); // Schedule first task. @@ -1633,7 +1623,6 @@ mod tests { flush_semaphore: Arc::new(Semaphore::new(2)), is_staging: false, partition_expr: None, - plugins: Plugins::new(), }) .collect(); // Schedule first task. diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 4acec5a893..93ad67bb58 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -45,6 +45,7 @@ use tokio::sync::RwLockWriteGuard; pub use utils::*; use crate::access_layer::AccessLayerRef; +use crate::engine::region_hook::RegionHookRef; use crate::error::{ FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu, @@ -947,17 +948,29 @@ pub(crate) struct ManifestContext { /// During the staging mode, the region metadata in [`VersionControlRef`] is not updated, /// so we need to store the partition info separately. staging_partition_info: Mutex>, + /// Optional region hook for observing manifest mutations. + hook: Option, } impl ManifestContext { - pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self { + pub(crate) fn new( + manager: RegionManifestManager, + state: RegionRoleState, + hook: Option, + ) -> Self { ManifestContext { manifest_manager: tokio::sync::RwLock::new(manager), state: AtomicCell::new(state), staging_partition_info: Mutex::new(None), + hook, } } + /// Returns the region hook if one is registered. + pub(crate) fn hook(&self) -> Option { + self.hook.clone() + } + pub(crate) fn staging_partition_info(&self) -> Option { self.staging_partition_info.lock().unwrap().clone() } @@ -1174,6 +1187,10 @@ impl ManifestContext { } // Now we can update the manifest. + // If a region hook is registered, clone the action list before it is consumed + // so we can pass a reference to the hook after a successful update. + let hook = self.hook.clone(); + let action_list_for_hook = hook.as_ref().map(|_| action_list.clone()); let version = manager.update(action_list, is_staging).await.inspect_err( |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id), )?; @@ -1185,6 +1202,12 @@ impl ManifestContext { ); } + if let Some(hook) = hook + && let Some(action_list) = action_list_for_hook { + hook.on_manifest_updated(manifest.metadata.region_id, &action_list, version) + .await; + } + Ok(version) } @@ -1718,6 +1741,7 @@ mod tests { let manifest_ctx = Arc::new(ManifestContext::new( manager, RegionRoleState::Leader(RegionLeaderState::Writable), + None, )); MitoRegion { @@ -2007,6 +2031,7 @@ mod tests { Arc::new(ManifestContext::new( manager, RegionRoleState::Leader(RegionLeaderState::Staging), + None, )) }; @@ -2075,6 +2100,7 @@ mod tests { let manifest_ctx = Arc::new(ManifestContext::new( manager, RegionRoleState::Leader(RegionLeaderState::Writable), + None, )); let region = MitoRegion { diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 412172aead..5e9db57d35 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -45,6 +45,7 @@ use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; use crate::cache::file_cache::{FileCache, FileType, IndexKey}; use crate::config::MitoConfig; +use crate::engine::region_hook::RegionHookRef; use crate::error; use crate::error::{ EmptyRegionDirSnafu, InvalidMetadataSnafu, InvalidRegionOptionsSnafu, ObjectStoreNotFoundSnafu, @@ -114,6 +115,7 @@ pub(crate) struct RegionOpener { replay_checkpoint: Option, file_ref_manager: FileReferenceManagerRef, partition_expr_fetcher: PartitionExprFetcherRef, + hook: Option, } impl RegionOpener { @@ -152,9 +154,16 @@ impl RegionOpener { replay_checkpoint: None, file_ref_manager, partition_expr_fetcher, + hook: None, } } + /// Sets the region hook for observing manifest mutations. + pub(crate) fn hook(mut self, hook: Option) -> Self { + self.hook = hook; + self + } + /// Sets metadata builder of the region to create. pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self { self.metadata_builder = Some(builder); @@ -358,6 +367,7 @@ impl RegionOpener { manifest_ctx: Arc::new(ManifestContext::new( manifest_manager, RegionRoleState::Leader(RegionLeaderState::Writable), + self.hook.clone(), )), file_purger: create_file_purger( config.gc.enable, @@ -601,6 +611,7 @@ impl RegionOpener { manifest_ctx: Arc::new(ManifestContext::new( manifest_manager, RegionRoleState::Follower, + self.hook.clone(), )), file_purger, provider: provider.clone(), diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 94a16f0b1d..10d4bffec5 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -144,6 +144,7 @@ impl SchedulerEnv { .await .unwrap(), RegionRoleState::Leader(RegionLeaderState::Writable), + None, )) } diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index ad517c9ef7..71d3f05c69 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -125,6 +125,7 @@ impl RegionWorkerLoop { self.partition_expr_fetcher.clone(), ) .cache(Some(self.cache_manager.clone())) + .hook(self.plugins.get()) .options(region.version().options.clone())? .skip_wal_replay(true) .open(&self.config, &self.wal) diff --git a/src/mito2/src/worker/handle_copy_region.rs b/src/mito2/src/worker/handle_copy_region.rs index aa5b26448e..b8f6acf09f 100644 --- a/src/mito2/src/worker/handle_copy_region.rs +++ b/src/mito2/src/worker/handle_copy_region.rs @@ -19,7 +19,7 @@ use store_api::storage::{FileId, RegionId}; use crate::error::{InvalidRequestSnafu, MissingManifestSnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; -use crate::region::{FileDescriptor, MitoRegionRef, RegionFileCopier, RegionMetadataLoader}; +use crate::region::{FileDescriptor, MitoRegionRef, RegionFileCopier, RegionLeaderState, RegionMetadataLoader}; use crate::request::{ BackgroundNotify, CopyRegionFromFinished, CopyRegionFromRequest, WorkerRequest, }; @@ -228,13 +228,10 @@ impl RegionWorkerLoop { committed_sequence: None, }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); - info!("Applying {edit:?} to region {target_region_id}, reason: CopyRegionFrom"); + info!("Applying {:?} to region {target_region_id}, reason: CopyRegionFrom", edit); let version = region .manifest_ctx - .manifest_manager - .write() - .await - .update(action_list, false) + .update_manifest(RegionLeaderState::Writable, action_list, false) .await?; info!( "Successfully update manifest version to {version}, region: {target_region_id}, reason: CopyRegionFrom" diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 9e812ba88f..c1ab18d59d 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -70,6 +70,7 @@ impl RegionWorkerLoop { .metadata_builder(builder) .parse_options(request.options)? .cache(Some(self.cache_manager.clone())) + .hook(self.plugins.get()) .create_or_open(&self.config, &self.wal) .await?; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 1932a31c7a..6ad0e037e1 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -166,7 +166,6 @@ impl RegionWorkerLoop { flush_semaphore: self.flush_semaphore.clone(), is_staging: region.is_staging(), partition_expr: region.maybe_staging_partition_expr_str(), - plugins: self.plugins.clone(), } } } diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index a154140d98..15e800c270 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -107,6 +107,7 @@ impl RegionWorkerLoop { ) .skip_wal_replay(request.skip_wal_replay) .cache(Some(self.cache_manager.clone())) + .hook(self.plugins.get()) .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _)) .replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id)) .parse_options(request.options.clone())