refactor: extract region hook function for extension (#8375)

* refactor: extract region hook function for extension

* refactor: add a warning for unmatched vector size

* feat: reconstruct sst_info from file_meta

* chore: update comments about the hook
This commit is contained in:
Ning Sun
2026-07-01 11:04:23 +08:00
committed by GitHub
parent 5a7a9c9f07
commit f684479d9c
3 changed files with 103 additions and 25 deletions

View File

@@ -39,7 +39,7 @@ use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::PickerOutput;
use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
use crate::config::MitoConfig;
use crate::engine::region_hook::RegionHookRef;
use crate::engine::region_hook::{RegionHookRef, SstFileInfo};
use crate::error;
use crate::error::{
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
@@ -257,6 +257,90 @@ impl CompactionRegion {
Ok(())
}
}
/// Fires [`RegionHook::on_sst_files_written`] for the freshly-merged SST
/// files in `merge_output`. Shared by both compaction paths, it must run
/// before [`Compactor::update_manifest`], whose `on_manifest_updated`
/// drains the per-region state this hook populates.
pub async fn invoke_sst_hook(&self, merge_output: &MergeOutput) {
let Some(hook) = self.plugins.get::<RegionHookRef>() else {
return;
};
// Remote compaction deserializes `MergeOutput` over gRPC, where
// `sst_infos` is `#[serde(skip)]`. Rebuild each `SstInfo` from the
// matching `FileMeta` so the hook observes real ids/rows/sizes rather
// than zeros — see [`sst_info_from_file_meta`] for what stays default.
let synthesized: Vec<SstInfo>;
let infos: &[SstInfo] = if merge_output.sst_infos.is_empty() {
synthesized = merge_output
.files_to_add
.iter()
.map(sst_info_from_file_meta)
.collect();
&synthesized
} else {
// `sst_infos` and `files_to_add` are documented as 1:1. If they
// ever diverge, `zip` below would silently truncate; warn so the
// mismatch is observable rather than dropping files from the hook.
if merge_output.sst_infos.len() != merge_output.files_to_add.len() {
warn!(
"sst_infos length ({}) does not match files_to_add length ({}) for region {}",
merge_output.sst_infos.len(),
merge_output.files_to_add.len(),
self.region_id
);
}
&merge_output.sst_infos
};
let files: Vec<SstFileInfo<'_>> = merge_output
.files_to_add
.iter()
.zip(infos)
.map(|(meta, info)| SstFileInfo {
sst_info_ref: info,
file_meta: meta,
})
.collect();
hook.on_sst_files_written(self.region_id, &self.region_metadata, &files)
.await;
}
}
/// Builds an [`SstInfo`] for the remote-compaction path by copying the scalar
/// fields that [`FileMeta`] also carries.
///
/// Remote compaction runs off-process and ships `MergeOutput` over the wire;
/// [`MergeOutput::sst_infos`] is `#[serde(skip)]` because the parquet footer
/// (`ParquetMetaData`) is not serde-serializable. To avoid feeding the hook
/// default (zero) values for real SSTs, this rebuilds the seven fields `FileMeta`
/// and `SstInfo` share: `file_id`, `time_range`, `file_size`,
/// `max_row_group_uncompressed_size`, `num_rows`, `num_row_groups`, `num_series`.
///
/// Two fields stay default and **cannot be recovered on the datanode**:
/// - `file_metadata` — the parquet footer, whose column min/max/null-count
/// statistics are required by hooks building richer artifacts (e.g. an Iceberg
/// manifest). That data is produced by the compactor's writer and exists only
/// in the freshly-written SST on object storage.
/// - `index_metadata` — `FileMeta` tracks indexes via `available_indexes` /
/// `indexes`, not as an [`IndexOutput`].
///
/// A hook that needs column stats must fetch the footer from object storage
/// (the [`CompactionRegion`]'s `access_layer` reaches the store), or the footer
/// must be shipped over the wire (revisit the `#[serde(skip)]` on `sst_infos`).
/// `num_rows` may read `0` for legacy `FileMeta`s where the count is unknown.
fn sst_info_from_file_meta(meta: &FileMeta) -> SstInfo {
SstInfo {
file_id: meta.file_id,
time_range: meta.time_range,
file_size: meta.file_size,
max_row_group_uncompressed_size: meta.max_row_group_uncompressed_size,
num_rows: meta.num_rows as usize,
num_row_groups: meta.num_row_groups,
num_series: meta.num_series,
..Default::default()
}
}
/// `[MergeOutput]` represents the output of merging SST files.

View File

@@ -28,7 +28,6 @@ use crate::compaction::LocalCompactionState;
use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput};
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::engine::region_hook::{RegionHookRef, SstFileInfo};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
@@ -286,24 +285,7 @@ impl CompactionTaskImpl {
}
async fn invoke_sst_hook(&self, merge_output: &MergeOutput) {
let hook: Option<RegionHookRef> = self.compaction_region.plugins.get();
if let Some(hook) = hook {
let files: Vec<SstFileInfo<'_>> = merge_output
.sst_infos
.iter()
.zip(merge_output.files_to_add.iter())
.map(|(info, meta)| SstFileInfo {
sst_info_ref: info,
file_meta: meta,
})
.collect();
hook.on_sst_files_written(
self.compaction_region.region_id,
&self.compaction_region.region_metadata,
&files,
)
.await;
}
self.compaction_region.invoke_sst_hook(merge_output).await;
}
}

View File

@@ -19,8 +19,8 @@
//! The [`RegionHook`] trait provides two methods with clear separation of concerns:
//!
//! - [`on_sst_files_written`]: Fires when mito2 physically writes SST **data files**.
//! Provides per-file [`SstInfo`] + [`FileMeta`] — the richest available metadata
//! (row counts, index metadata, Parquet metadata, etc.).
//! Provides per-file [`SstInfo`] + [`FileMeta`]; metadata richness varies by path
//! (see [`SstFileInfo`] and the coverage footnote).
//!
//! - [`on_manifest_updated`]: Fires after **any** manifest write is successfully committed.
//! Receives the full [`RegionMetaActionList`] so consumers can inspect what changed
@@ -46,9 +46,10 @@
//! | Enter staging | ❌ (no SST files) | ✅ Yes |
//! | Async index build | ❌ (index files only) | ✅ Yes |
//!
//! ¹ Remote compaction runs on a dedicated compactor node via `open_compaction_region()`.
//! The caller must pass plugins via `OpenCompactionRegionRequest` to enable hooks on the
//! compactor node.
//! ¹ Remote compaction runs on a dedicated compactor node via `open_compaction_region()`;
//! pass plugins via `OpenCompactionRegionRequest` to enable hooks there. `sst_infos` is
//! `#[serde(skip)]` over the wire, so each [`SstInfo`] is rebuilt from [`FileMeta`] with
//! empty footer/index — see [`SstFileInfo`] for field-level detail.
//! ² Apply staging fires `on_manifest_updated` twice: once when the staging SST files are
//! committed via `RegionEdit`, and once when `exit_staging_on_success` merges all staged
//! manifest actions into the live manifest.
@@ -168,6 +169,12 @@ impl PendingManifestHook {
}
/// Information about a single SST data file written during flush or compaction.
///
/// `file_meta` is always complete. `sst_info_ref` mirrors those scalars and adds the
/// Parquet footer (`file_metadata`) and full `index_metadata` — but **only when mito2
/// wrote the file in-process** (flush, local compaction). On remote compaction `SstInfo`
/// is rebuilt from `FileMeta`, so both are empty; hooks needing column statistics must
/// fetch the footer from object storage.
pub struct SstFileInfo<'a> {
pub sst_info_ref: &'a SstInfo,
pub file_meta: &'a FileMeta,
@@ -190,6 +197,11 @@ pub trait RegionHook: Send + Sync + Debug {
/// This fires only when mito2 itself writes SST files (flush and compaction).
/// It does **not** fire when SST files are pre-written externally (bulk ingestion,
/// copy region) or when only index files are written (async index build).
///
/// # Metadata availability
/// See [`SstFileInfo`]: `file_meta` is always complete, but the [`SstInfo`] footer
/// and index output are empty on remote compaction. Hooks needing column statistics
/// (e.g. an Iceberg manifest) must fetch the footer from object storage.
async fn on_sst_files_written(
&self,
region_id: RegionId,