From f684479d9c32ad354934b1d22a84d2e7fac53bdb Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 1 Jul 2026 11:04:23 +0800 Subject: [PATCH] refactor: extract region hook function for extension (#8375) * refactor: extract region hook function for extension * refactor: add a warning for unmatched vector size * feat: reconstruct sst_info from file_meta * chore: update comments about the hook --- src/mito2/src/compaction/compactor.rs | 86 ++++++++++++++++++++++++++- src/mito2/src/compaction/task.rs | 20 +------ src/mito2/src/engine/region_hook.rs | 22 +++++-- 3 files changed, 103 insertions(+), 25 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index b712b229ae..17e5d83bf9 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -39,7 +39,7 @@ use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::picker::PickerOutput; use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options}; use crate::config::MitoConfig; -use crate::engine::region_hook::RegionHookRef; +use crate::engine::region_hook::{RegionHookRef, SstFileInfo}; use crate::error; use crate::error::{ EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result, @@ -257,6 +257,90 @@ impl CompactionRegion { Ok(()) } } + + /// Fires [`RegionHook::on_sst_files_written`] for the freshly-merged SST + /// files in `merge_output`. Shared by both compaction paths, it must run + /// before [`Compactor::update_manifest`], whose `on_manifest_updated` + /// drains the per-region state this hook populates. + pub async fn invoke_sst_hook(&self, merge_output: &MergeOutput) { + let Some(hook) = self.plugins.get::() else { + return; + }; + + // Remote compaction deserializes `MergeOutput` over gRPC, where + // `sst_infos` is `#[serde(skip)]`. Rebuild each `SstInfo` from the + // matching `FileMeta` so the hook observes real ids/rows/sizes rather + // than zeros — see [`sst_info_from_file_meta`] for what stays default. + let synthesized: Vec; + let infos: &[SstInfo] = if merge_output.sst_infos.is_empty() { + synthesized = merge_output + .files_to_add + .iter() + .map(sst_info_from_file_meta) + .collect(); + &synthesized + } else { + // `sst_infos` and `files_to_add` are documented as 1:1. If they + // ever diverge, `zip` below would silently truncate; warn so the + // mismatch is observable rather than dropping files from the hook. + if merge_output.sst_infos.len() != merge_output.files_to_add.len() { + warn!( + "sst_infos length ({}) does not match files_to_add length ({}) for region {}", + merge_output.sst_infos.len(), + merge_output.files_to_add.len(), + self.region_id + ); + } + &merge_output.sst_infos + }; + + let files: Vec> = merge_output + .files_to_add + .iter() + .zip(infos) + .map(|(meta, info)| SstFileInfo { + sst_info_ref: info, + file_meta: meta, + }) + .collect(); + hook.on_sst_files_written(self.region_id, &self.region_metadata, &files) + .await; + } +} + +/// Builds an [`SstInfo`] for the remote-compaction path by copying the scalar +/// fields that [`FileMeta`] also carries. +/// +/// Remote compaction runs off-process and ships `MergeOutput` over the wire; +/// [`MergeOutput::sst_infos`] is `#[serde(skip)]` because the parquet footer +/// (`ParquetMetaData`) is not serde-serializable. To avoid feeding the hook +/// default (zero) values for real SSTs, this rebuilds the seven fields `FileMeta` +/// and `SstInfo` share: `file_id`, `time_range`, `file_size`, +/// `max_row_group_uncompressed_size`, `num_rows`, `num_row_groups`, `num_series`. +/// +/// Two fields stay default and **cannot be recovered on the datanode**: +/// - `file_metadata` — the parquet footer, whose column min/max/null-count +/// statistics are required by hooks building richer artifacts (e.g. an Iceberg +/// manifest). That data is produced by the compactor's writer and exists only +/// in the freshly-written SST on object storage. +/// - `index_metadata` — `FileMeta` tracks indexes via `available_indexes` / +/// `indexes`, not as an [`IndexOutput`]. +/// +/// A hook that needs column stats must fetch the footer from object storage +/// (the [`CompactionRegion`]'s `access_layer` reaches the store), or the footer +/// must be shipped over the wire (revisit the `#[serde(skip)]` on `sst_infos`). +/// `num_rows` may read `0` for legacy `FileMeta`s where the count is unknown. +fn sst_info_from_file_meta(meta: &FileMeta) -> SstInfo { + SstInfo { + file_id: meta.file_id, + time_range: meta.time_range, + file_size: meta.file_size, + max_row_group_uncompressed_size: meta.max_row_group_uncompressed_size, + num_rows: meta.num_rows as usize, + num_row_groups: meta.num_row_groups, + num_series: meta.num_series, + ..Default::default() + } } /// `[MergeOutput]` represents the output of merging SST files. diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index bc024dd6db..a8da921968 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -28,7 +28,6 @@ use crate::compaction::LocalCompactionState; use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput}; use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager}; use crate::compaction::picker::{CompactionTask, PickerOutput}; -use crate::engine::region_hook::{RegionHookRef, SstFileInfo}; use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED}; @@ -286,24 +285,7 @@ impl CompactionTaskImpl { } async fn invoke_sst_hook(&self, merge_output: &MergeOutput) { - let hook: Option = self.compaction_region.plugins.get(); - if let Some(hook) = hook { - let files: Vec> = merge_output - .sst_infos - .iter() - .zip(merge_output.files_to_add.iter()) - .map(|(info, meta)| SstFileInfo { - sst_info_ref: info, - file_meta: meta, - }) - .collect(); - hook.on_sst_files_written( - self.compaction_region.region_id, - &self.compaction_region.region_metadata, - &files, - ) - .await; - } + self.compaction_region.invoke_sst_hook(merge_output).await; } } diff --git a/src/mito2/src/engine/region_hook.rs b/src/mito2/src/engine/region_hook.rs index eca254b842..d1b28f6eba 100644 --- a/src/mito2/src/engine/region_hook.rs +++ b/src/mito2/src/engine/region_hook.rs @@ -19,8 +19,8 @@ //! The [`RegionHook`] trait provides two methods with clear separation of concerns: //! //! - [`on_sst_files_written`]: Fires when mito2 physically writes SST **data files**. -//! Provides per-file [`SstInfo`] + [`FileMeta`] — the richest available metadata -//! (row counts, index metadata, Parquet metadata, etc.). +//! Provides per-file [`SstInfo`] + [`FileMeta`]; metadata richness varies by path +//! (see [`SstFileInfo`] and the coverage footnote). //! //! - [`on_manifest_updated`]: Fires after **any** manifest write is successfully committed. //! Receives the full [`RegionMetaActionList`] so consumers can inspect what changed @@ -46,9 +46,10 @@ //! | Enter staging | ❌ (no SST files) | ✅ Yes | //! | Async index build | ❌ (index files only) | ✅ Yes | //! -//! ¹ Remote compaction runs on a dedicated compactor node via `open_compaction_region()`. -//! The caller must pass plugins via `OpenCompactionRegionRequest` to enable hooks on the -//! compactor node. +//! ¹ Remote compaction runs on a dedicated compactor node via `open_compaction_region()`; +//! pass plugins via `OpenCompactionRegionRequest` to enable hooks there. `sst_infos` is +//! `#[serde(skip)]` over the wire, so each [`SstInfo`] is rebuilt from [`FileMeta`] with +//! empty footer/index — see [`SstFileInfo`] for field-level detail. //! ² Apply staging fires `on_manifest_updated` twice: once when the staging SST files are //! committed via `RegionEdit`, and once when `exit_staging_on_success` merges all staged //! manifest actions into the live manifest. @@ -168,6 +169,12 @@ impl PendingManifestHook { } /// Information about a single SST data file written during flush or compaction. +/// +/// `file_meta` is always complete. `sst_info_ref` mirrors those scalars and adds the +/// Parquet footer (`file_metadata`) and full `index_metadata` — but **only when mito2 +/// wrote the file in-process** (flush, local compaction). On remote compaction `SstInfo` +/// is rebuilt from `FileMeta`, so both are empty; hooks needing column statistics must +/// fetch the footer from object storage. pub struct SstFileInfo<'a> { pub sst_info_ref: &'a SstInfo, pub file_meta: &'a FileMeta, @@ -190,6 +197,11 @@ pub trait RegionHook: Send + Sync + Debug { /// This fires only when mito2 itself writes SST files (flush and compaction). /// It does **not** fire when SST files are pre-written externally (bulk ingestion, /// copy region) or when only index files are written (async index build). + /// + /// # Metadata availability + /// See [`SstFileInfo`]: `file_meta` is always complete, but the [`SstInfo`] footer + /// and index output are empty on remote compaction. Hooks needing column statistics + /// (e.g. an Iceberg manifest) must fetch the footer from object storage. async fn on_sst_files_written( &self, region_id: RegionId,