feat: use region_hook to cover all possible cases

This commit is contained in:
Ning Sun
2026-06-05 06:07:22 -07:00
parent d34071000b
commit 37f712aa8e
16 changed files with 214 additions and 119 deletions

View File

@@ -1885,6 +1885,7 @@ mod tests {
Arc::new(ManifestContext::new(
manager,
RegionRoleState::Leader(RegionLeaderState::Staging),
None,
))
};

View File

@@ -39,6 +39,7 @@ use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::PickerOutput;
use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
use crate::config::MitoConfig;
use crate::engine::region_hook::RegionHookRef;
use crate::error;
use crate::error::{
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
@@ -181,6 +182,7 @@ pub async fn open_compaction_region(
let manifest_ctx = Arc::new(ManifestContext::new(
manifest_manager,
RegionRoleState::Leader(RegionLeaderState::Writable),
None,
));
let file_purger = {
@@ -514,6 +516,7 @@ where
tasks.push((inputs_to_remove, fut));
}
let hook: Option<RegionHookRef> = compaction_region.plugins.get();
let mut output_files = Vec::with_capacity(tasks.len());
let mut all_sst_infos: Vec<SstInfo> = Vec::new();
let mut compacted_inputs = Vec::with_capacity(
@@ -541,7 +544,9 @@ where
match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
Ok(Ok(Ok((files, infos)))) => {
output_files.extend(files);
all_sst_infos.extend(infos);
if hook.is_some() {
all_sst_infos.extend(infos);
}
compacted_inputs.extend(inputs);
}
Ok(Ok(Err(e))) => {

View File

@@ -28,7 +28,7 @@ use crate::compaction::LocalCompactionState;
use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput};
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::engine::flush_hook::{FlushHookRef, SstFileInfo};
use crate::engine::region_hook::{RegionHookRef, SstFileInfo};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
@@ -286,7 +286,7 @@ impl CompactionTaskImpl {
}
async fn invoke_sst_hook(&self, merge_output: &MergeOutput) {
let hook: Option<FlushHookRef> = self.compaction_region.plugins.get();
let hook: Option<RegionHookRef> = self.compaction_region.plugins.get();
if let Some(hook) = hook {
let files: Vec<SstFileInfo<'_>> = merge_output
.sst_infos
@@ -305,14 +305,6 @@ impl CompactionTaskImpl {
.await;
}
}
async fn invoke_manifest_hook(&self, edit: &RegionEdit, manifest_version: ManifestVersion) {
let hook: Option<FlushHookRef> = self.compaction_region.plugins.get();
if let Some(hook) = hook {
hook.on_manifest_updated(self.compaction_region.region_id, edit, manifest_version)
.await;
}
}
}
#[async_trait::async_trait]
@@ -361,8 +353,7 @@ impl CompactionTask for CompactionTaskImpl {
})
} else {
match self.update_manifest(merge_output).await {
Ok((edit, manifest_version)) => {
self.invoke_manifest_hook(&edit, manifest_version).await;
Ok((edit, _manifest_version)) => {
let senders = std::mem::take(&mut self.waiters);
BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.compaction_region.region_id,

View File

@@ -40,7 +40,7 @@ mod drop_test;
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
pub mod flush_hook;
pub mod region_hook;
#[cfg(test)]
mod flush_test;
#[cfg(test)]

View File

@@ -1,70 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Flush hook extension point for SST and manifest operations.
use std::sync::Arc;
use async_trait::async_trait;
use store_api::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::manifest::action::RegionEdit;
use crate::sst::file::FileMeta;
use crate::sst::parquet::SstInfo;
/// Information about a single SST file written during flush.
pub struct SstFileInfo<'a> {
pub sst_info_ref: &'a SstInfo,
pub file_meta: &'a FileMeta,
}
/// Extension hook for flush operations.
///
/// Implementations can be registered via the `Plugins` system:
/// ```ignore
/// use std::sync::Arc;
/// use common_base::Plugins;
/// use mito2::engine::flush_hook::{FlushHook, FlushHookRef};
///
/// plugins.insert(Arc::new(MyHook) as FlushHookRef);
/// ```
#[async_trait]
pub trait FlushHook: Send + Sync {
/// Called after SST files are written during flush.
///
/// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written.
/// - `region_metadata`: provides the schema for column type information.
async fn on_sst_files_written(
&self,
region_id: RegionId,
region_metadata: &RegionMetadataRef,
files: &[SstFileInfo<'_>],
) {
let _ = (region_id, region_metadata, files);
}
/// Called after the region manifest is successfully updated.
async fn on_manifest_updated(
&self,
region_id: RegionId,
edit: &RegionEdit,
manifest_version: ManifestVersion,
) {
let _ = (region_id, edit, manifest_version);
}
}
pub type FlushHookRef = Arc<dyn FlushHook>;

View File

@@ -34,9 +34,9 @@ use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Notify;
use crate::config::MitoConfig;
use crate::engine::flush_hook::{FlushHook, FlushHookRef, SstFileInfo};
use crate::engine::listener::{FlushListener, StallListener};
use crate::manifest::action::RegionEdit;
use crate::engine::region_hook::{RegionHook, RegionHookRef, SstFileInfo};
use crate::manifest::action::RegionMetaActionList;
use crate::test_util::{
CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv, build_rows,
build_rows_for_key, flush_region, kafka_log_store_factory, multiple_log_store_factories,
@@ -668,13 +668,14 @@ async fn test_update_topic_latest_entry_id(factory: Option<LogStoreFactory>) {
assert_eq!(region.topic_latest_entry_id.load(Ordering::Relaxed), 1);
}
struct MockFlushHook {
#[derive(Debug)]
struct MockRegionHook {
sst_written_count: AtomicUsize,
manifest_updated_count: AtomicUsize,
notify: Notify,
}
impl MockFlushHook {
impl MockRegionHook {
fn new() -> Self {
Self {
sst_written_count: AtomicUsize::new(0),
@@ -689,7 +690,7 @@ impl MockFlushHook {
}
#[async_trait]
impl FlushHook for MockFlushHook {
impl RegionHook for MockRegionHook {
async fn on_sst_files_written(
&self,
region_id: RegionId,
@@ -699,7 +700,7 @@ impl FlushHook for MockFlushHook {
self.sst_written_count
.fetch_add(files.len(), Ordering::Relaxed);
common_telemetry::info!(
"MockFlushHook::on_sst_files_written: region={}, files={}",
"MockRegionHook::on_sst_files_written: region={}, files={}",
region_id,
files.len(),
);
@@ -718,28 +719,37 @@ impl FlushHook for MockFlushHook {
async fn on_manifest_updated(
&self,
region_id: RegionId,
edit: &RegionEdit,
action_list: &RegionMetaActionList,
manifest_version: ManifestVersion,
) {
self.manifest_updated_count.fetch_add(1, Ordering::Relaxed);
// Count files added across all Edit actions.
let files_added: usize = action_list
.actions
.iter()
.map(|action| match action {
crate::manifest::action::RegionMetaAction::Edit(edit) => edit.files_to_add.len(),
_ => 0,
})
.sum();
common_telemetry::info!(
"MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}",
"MockRegionHook::on_manifest_updated: region={}, manifest_version={}, files_added={}",
region_id,
manifest_version,
edit.files_to_add.len(),
files_added,
);
self.notify.notify_one();
}
}
#[tokio::test]
async fn test_flush_hook() {
async fn test_region_hook() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let hook = Arc::new(MockFlushHook::new());
let hook = Arc::new(MockRegionHook::new());
let plugins = Plugins::new();
plugins.insert(hook.clone() as FlushHookRef);
plugins.insert(hook.clone() as RegionHookRef);
let engine = env
.create_engine_with_plugins(MitoConfig::default(), plugins)
@@ -782,7 +792,7 @@ async fn test_flush_hook() {
);
common_telemetry::info!(
"test_flush_hook passed: sst_count={}, manifest_count={}",
"test_region_hook passed: sst_count={}, manifest_count={}",
sst_count,
manifest_count,
);

View File

@@ -0,0 +1,132 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Region hook extension point for observing SST writes and manifest mutations.
//!
//! ## Design
//!
//! The [`RegionHook`] trait provides two methods with clear separation of concerns:
//!
//! - [`on_sst_files_written`]: Fires when mito2 physically writes SST **data files**.
//! Provides per-file [`SstInfo`] + [`FileMeta`] — the richest available metadata
//! (row counts, index metadata, Parquet metadata, etc.).
//!
//! - [`on_manifest_updated`]: Fires after **any** manifest write is successfully committed.
//! Receives the full [`RegionMetaActionList`] so consumers can inspect what changed
//! (file additions/removals, schema changes, truncation, partition expression changes, etc.).
//!
//! Hook implementations are registered via the [`Plugins`](common_base::Plugins) system:
//! ```ignore
//! plugins.insert(Arc::new(MyHook) as RegionHookRef);
//! ```
//!
//! ## Coverage
//!
//! | Scenario | `on_sst_files_written` | `on_manifest_updated` |
//! |------------------------------|:----------------------:|:---------------------:|
//! | Flush (memtable → SST) | ✅ Yes | ✅ Yes |
//! | Local compaction | ✅ Yes | ✅ Yes |
//! | Remote compaction | ✅ (on compactor node) | ✅ (on compactor node) |
//! | RegionEdit / bulk ingestion | ❌ (files pre-written) | ✅ Yes |
//! | Copy region | ❌ (object-store copy) | ✅ Yes |
//! | Apply staging | ❌ (delegates to edit) | ✅ Yes |
//! | Alter (schema change) | ❌ (no SST files) | ✅ Yes |
//! | Truncate | ❌ (removes files) | ✅ Yes |
//! | Enter staging | ❌ (no SST files) | ✅ Yes |
//! | Async index build | ❌ (index files only) | ✅ Yes |
//!
//! The following paths do **not** trigger any hook:
//! - Follower region sync / catchup (manifest read-only; followers don't author changes)
//! - GC / checkpoint / drop / remap (internal bookkeeping, not logical state changes)
//!
//! ## Invocation points
//!
//! `on_sst_files_written` is invoked at the SST write site (flush task or compaction task),
//! immediately after SST files are written but **before** the manifest is committed.
//!
//! `on_manifest_updated` is centralized in [`ManifestContext::update_manifest_with_state_check`],
//! so it automatically covers all manifest write paths that go through `ManifestContext`.
//!
//! ## Future work
//!
//! A future `on_files_removed` hook may be added to observe file lifecycle end
//! (GC, drop, truncate, compaction removal). This is not yet implemented.
//!
//! [`on_sst_files_written`]: RegionHook::on_sst_files_written
//! [`on_manifest_updated`]: RegionHook::on_manifest_updated
use std::fmt::Debug;
use std::sync::Arc;
use async_trait::async_trait;
use store_api::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::manifest::action::RegionMetaActionList;
use crate::sst::file::FileMeta;
use crate::sst::parquet::SstInfo;
/// Information about a single SST data file written during flush or compaction.
pub struct SstFileInfo<'a> {
pub sst_info_ref: &'a SstInfo,
pub file_meta: &'a FileMeta,
}
/// Hook for observing region mutations in mito2.
///
/// Implementations can be registered via the `Plugins` system:
/// ```ignore
/// use std::sync::Arc;
/// use common_base::Plugins;
/// use mito2::engine::region_hook::{RegionHook, RegionHookRef};
///
/// plugins.insert(Arc::new(MyHook) as RegionHookRef);
/// ```
#[async_trait]
pub trait RegionHook: Send + Sync + Debug {
/// Called after SST **data files** are physically written, before manifest commit.
///
/// This fires only when mito2 itself writes SST files (flush and compaction).
/// It does **not** fire when SST files are pre-written externally (bulk ingestion,
/// copy region) or when only index files are written (async index build).
async fn on_sst_files_written(
&self,
region_id: RegionId,
region_metadata: &RegionMetadataRef,
files: &[SstFileInfo<'_>],
) {
let _ = (region_id, region_metadata, files);
}
/// Called after the region manifest is successfully committed.
///
/// Fires for **all** manifest write paths: flush, compaction, region edit,
/// copy region, alter, truncate, enter staging, index build, etc.
///
/// Does **not** fire for:
/// - Manifest reads / follower sync (no write)
/// - GC / checkpoint (internal bookkeeping)
/// - Failed manifest updates
async fn on_manifest_updated(
&self,
region_id: RegionId,
action_list: &RegionMetaActionList,
manifest_version: ManifestVersion,
) {
let _ = (region_id, action_list, manifest_version);
}
}
pub type RegionHookRef = Arc<dyn RegionHook>;

View File

@@ -21,7 +21,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use bytes::Bytes;
use common_base::Plugins;
use common_telemetry::{debug, error, info};
use datatypes::arrow::datatypes::SchemaRef;
use partition::expr::PartitionExpr;
@@ -37,7 +36,7 @@ use crate::access_layer::{
};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::engine::flush_hook::{FlushHookRef, SstFileInfo};
use crate::engine::region_hook::SstFileInfo;
use crate::error::{
Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
RegionTruncatedSnafu, Result,
@@ -277,8 +276,6 @@ pub(crate) struct RegionFlushTask {
///
/// This is used to generate the file meta.
pub(crate) partition_expr: Option<String>,
/// Plugins for flush hooks.
pub(crate) plugins: Plugins,
}
impl RegionFlushTask {
@@ -419,7 +416,7 @@ impl RegionFlushTask {
);
flush_metrics.observe();
let hook: Option<FlushHookRef> = self.plugins.get();
let hook = self.manifest_ctx.hook();
if let Some(hook) = &hook {
let files: Vec<SstFileInfo<'_>> = sst_infos
.iter()
@@ -474,11 +471,6 @@ impl RegionFlushTask {
self.reason.as_str()
);
if let Some(hook) = &hook {
hook.on_manifest_updated(self.region_id, &edit, manifest_version)
.await;
}
Ok(edit)
}
@@ -494,7 +486,7 @@ impl RegionFlushTask {
let mut encoded_part_count = 0;
let mut flush_metrics = Metrics::new(WriteType::Flush);
let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
let hook: Option<FlushHookRef> = self.plugins.get();
let hook = self.manifest_ctx.hook();
let mut all_sst_infos = Vec::new();
for mem in memtables {
if mem.is_empty() {
@@ -1401,7 +1393,6 @@ mod tests {
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
partition_expr: None,
plugins: Plugins::new(),
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
@@ -1446,7 +1437,6 @@ mod tests {
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
partition_expr: None,
plugins: Plugins::new(),
})
.collect();
// Schedule first task.
@@ -1633,7 +1623,6 @@ mod tests {
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
partition_expr: None,
plugins: Plugins::new(),
})
.collect();
// Schedule first task.

View File

@@ -45,6 +45,7 @@ use tokio::sync::RwLockWriteGuard;
pub use utils::*;
use crate::access_layer::AccessLayerRef;
use crate::engine::region_hook::RegionHookRef;
use crate::error::{
FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
@@ -947,17 +948,29 @@ pub(crate) struct ManifestContext {
/// During the staging mode, the region metadata in [`VersionControlRef`] is not updated,
/// so we need to store the partition info separately.
staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
/// Optional region hook for observing manifest mutations.
hook: Option<RegionHookRef>,
}
impl ManifestContext {
pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
pub(crate) fn new(
manager: RegionManifestManager,
state: RegionRoleState,
hook: Option<RegionHookRef>,
) -> Self {
ManifestContext {
manifest_manager: tokio::sync::RwLock::new(manager),
state: AtomicCell::new(state),
staging_partition_info: Mutex::new(None),
hook,
}
}
/// Returns the region hook if one is registered.
pub(crate) fn hook(&self) -> Option<RegionHookRef> {
self.hook.clone()
}
pub(crate) fn staging_partition_info(&self) -> Option<StagingPartitionInfo> {
self.staging_partition_info.lock().unwrap().clone()
}
@@ -1174,6 +1187,10 @@ impl ManifestContext {
}
// Now we can update the manifest.
// If a region hook is registered, clone the action list before it is consumed
// so we can pass a reference to the hook after a successful update.
let hook = self.hook.clone();
let action_list_for_hook = hook.as_ref().map(|_| action_list.clone());
let version = manager.update(action_list, is_staging).await.inspect_err(
|e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
)?;
@@ -1185,6 +1202,12 @@ impl ManifestContext {
);
}
if let Some(hook) = hook
&& let Some(action_list) = action_list_for_hook {
hook.on_manifest_updated(manifest.metadata.region_id, &action_list, version)
.await;
}
Ok(version)
}
@@ -1718,6 +1741,7 @@ mod tests {
let manifest_ctx = Arc::new(ManifestContext::new(
manager,
RegionRoleState::Leader(RegionLeaderState::Writable),
None,
));
MitoRegion {
@@ -2007,6 +2031,7 @@ mod tests {
Arc::new(ManifestContext::new(
manager,
RegionRoleState::Leader(RegionLeaderState::Staging),
None,
))
};
@@ -2075,6 +2100,7 @@ mod tests {
let manifest_ctx = Arc::new(ManifestContext::new(
manager,
RegionRoleState::Leader(RegionLeaderState::Writable),
None,
));
let region = MitoRegion {

View File

@@ -45,6 +45,7 @@ use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCache, FileType, IndexKey};
use crate::config::MitoConfig;
use crate::engine::region_hook::RegionHookRef;
use crate::error;
use crate::error::{
EmptyRegionDirSnafu, InvalidMetadataSnafu, InvalidRegionOptionsSnafu, ObjectStoreNotFoundSnafu,
@@ -114,6 +115,7 @@ pub(crate) struct RegionOpener {
replay_checkpoint: Option<u64>,
file_ref_manager: FileReferenceManagerRef,
partition_expr_fetcher: PartitionExprFetcherRef,
hook: Option<RegionHookRef>,
}
impl RegionOpener {
@@ -152,9 +154,16 @@ impl RegionOpener {
replay_checkpoint: None,
file_ref_manager,
partition_expr_fetcher,
hook: None,
}
}
/// Sets the region hook for observing manifest mutations.
pub(crate) fn hook(mut self, hook: Option<RegionHookRef>) -> Self {
self.hook = hook;
self
}
/// Sets metadata builder of the region to create.
pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
self.metadata_builder = Some(builder);
@@ -358,6 +367,7 @@ impl RegionOpener {
manifest_ctx: Arc::new(ManifestContext::new(
manifest_manager,
RegionRoleState::Leader(RegionLeaderState::Writable),
self.hook.clone(),
)),
file_purger: create_file_purger(
config.gc.enable,
@@ -601,6 +611,7 @@ impl RegionOpener {
manifest_ctx: Arc::new(ManifestContext::new(
manifest_manager,
RegionRoleState::Follower,
self.hook.clone(),
)),
file_purger,
provider: provider.clone(),

View File

@@ -144,6 +144,7 @@ impl SchedulerEnv {
.await
.unwrap(),
RegionRoleState::Leader(RegionLeaderState::Writable),
None,
))
}

View File

@@ -125,6 +125,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.partition_expr_fetcher.clone(),
)
.cache(Some(self.cache_manager.clone()))
.hook(self.plugins.get())
.options(region.version().options.clone())?
.skip_wal_replay(true)
.open(&self.config, &self.wal)

View File

@@ -19,7 +19,7 @@ use store_api::storage::{FileId, RegionId};
use crate::error::{InvalidRequestSnafu, MissingManifestSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::{FileDescriptor, MitoRegionRef, RegionFileCopier, RegionMetadataLoader};
use crate::region::{FileDescriptor, MitoRegionRef, RegionFileCopier, RegionLeaderState, RegionMetadataLoader};
use crate::request::{
BackgroundNotify, CopyRegionFromFinished, CopyRegionFromRequest, WorkerRequest,
};
@@ -228,13 +228,10 @@ impl<S> RegionWorkerLoop<S> {
committed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
info!("Applying {edit:?} to region {target_region_id}, reason: CopyRegionFrom");
info!("Applying {:?} to region {target_region_id}, reason: CopyRegionFrom", edit);
let version = region
.manifest_ctx
.manifest_manager
.write()
.await
.update(action_list, false)
.update_manifest(RegionLeaderState::Writable, action_list, false)
.await?;
info!(
"Successfully update manifest version to {version}, region: {target_region_id}, reason: CopyRegionFrom"

View File

@@ -70,6 +70,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.metadata_builder(builder)
.parse_options(request.options)?
.cache(Some(self.cache_manager.clone()))
.hook(self.plugins.get())
.create_or_open(&self.config, &self.wal)
.await?;

View File

@@ -166,7 +166,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
flush_semaphore: self.flush_semaphore.clone(),
is_staging: region.is_staging(),
partition_expr: region.maybe_staging_partition_expr_str(),
plugins: self.plugins.clone(),
}
}
}

View File

@@ -107,6 +107,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
)
.skip_wal_replay(request.skip_wal_replay)
.cache(Some(self.cache_manager.clone()))
.hook(self.plugins.get())
.wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
.replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id))
.parse_options(request.options.clone())