diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 62e89608b4..237db63a5b 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -783,7 +783,7 @@ impl InformationExtension for StandaloneInformationExtension { manifest_size: region_stat.manifest_size, sst_size: region_stat.sst_size, index_size: region_stat.index_size, - manifest_version: region_stat.manifest_version, + region_manifest: region_stat.manifest.into(), } }) .collect::>(); diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index a167d89599..ed1957cbd7 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs @@ -92,8 +92,22 @@ pub struct RegionStat { pub sst_size: u64, /// The size of the SST index files in bytes. pub index_size: u64, - /// The version of manifest. - pub manifest_version: u64, + /// The manifest infoof the region. + pub region_manifest: RegionManifestInfo, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum RegionManifestInfo { + Mito { + manifest_version: u64, + flushed_entry_id: u64, + }, + Metric { + data_manifest_version: u64, + data_flushed_entry_id: u64, + metadata_manifest_version: u64, + metadata_flushed_entry_id: u64, + }, } impl Stat { @@ -167,6 +181,31 @@ impl TryFrom<&HeartbeatRequest> for Stat { } } +impl From for RegionManifestInfo { + fn from(value: store_api::region_engine::RegionManifestInfo) -> Self { + match value { + store_api::region_engine::RegionManifestInfo::Mito { + manifest_version, + flushed_entry_id, + } => RegionManifestInfo::Mito { + manifest_version, + flushed_entry_id, + }, + store_api::region_engine::RegionManifestInfo::Metric { + data_manifest_version, + data_flushed_entry_id, + metadata_manifest_version, + metadata_flushed_entry_id, + } => RegionManifestInfo::Metric { + data_manifest_version, + data_flushed_entry_id, + metadata_manifest_version, + metadata_flushed_entry_id, + }, + } + } +} + impl From<&api::v1::meta::RegionStat> for RegionStat { fn from(value: &api::v1::meta::RegionStat) -> Self { let region_stat = value @@ -187,7 +226,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat { manifest_size: region_stat.manifest_size, sst_size: region_stat.sst_size, index_size: region_stat.index_size, - manifest_version: region_stat.manifest_version, + region_manifest: region_stat.manifest.into(), } } } diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs index e478c556d3..267cf2d10b 100644 --- a/src/common/meta/src/region_registry.rs +++ b/src/common/meta/src/region_registry.rs @@ -19,13 +19,82 @@ use std::sync::{Arc, RwLock}; use common_telemetry::warn; use store_api::storage::RegionId; +use crate::datanode::RegionManifestInfo; + /// Represents information about a leader region in the cluster. /// Contains the datanode id where the leader is located, /// and the current manifest version. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct LeaderRegion { pub datanode_id: u64, - pub manifest_version: u64, + pub manifest: LeaderRegionManifestInfo, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum LeaderRegionManifestInfo { + Mito { + manifest_version: u64, + flushed_entry_id: u64, + }, + Metric { + data_manifest_version: u64, + data_flushed_entry_id: u64, + metadata_manifest_version: u64, + metadata_flushed_entry_id: u64, + }, +} + +impl From for LeaderRegionManifestInfo { + fn from(value: RegionManifestInfo) -> Self { + match value { + RegionManifestInfo::Mito { + manifest_version, + flushed_entry_id, + } => LeaderRegionManifestInfo::Mito { + manifest_version, + flushed_entry_id, + }, + RegionManifestInfo::Metric { + data_manifest_version, + data_flushed_entry_id, + metadata_manifest_version, + metadata_flushed_entry_id, + } => LeaderRegionManifestInfo::Metric { + data_manifest_version, + data_flushed_entry_id, + metadata_manifest_version, + metadata_flushed_entry_id, + }, + } + } +} + +impl LeaderRegionManifestInfo { + /// Returns the manifest version of the leader region. + pub fn manifest_version(&self) -> u64 { + match self { + LeaderRegionManifestInfo::Mito { + manifest_version, .. + } => *manifest_version, + LeaderRegionManifestInfo::Metric { + data_manifest_version, + .. + } => *data_manifest_version, + } + } + + /// Returns the flushed entry id of the leader region. + pub fn flushed_entry_id(&self) -> u64 { + match self { + LeaderRegionManifestInfo::Mito { + flushed_entry_id, .. + } => *flushed_entry_id, + LeaderRegionManifestInfo::Metric { + data_flushed_entry_id, + .. + } => *data_flushed_entry_id, + } + } } pub type LeaderRegionRegistryRef = Arc; @@ -71,13 +140,13 @@ impl LeaderRegionRegistry { entry.insert(leader_region); } Entry::Occupied(mut entry) => { - let manifest_version = entry.get().manifest_version; - if manifest_version > leader_region.manifest_version { + let manifest_version = entry.get().manifest.manifest_version(); + if manifest_version > leader_region.manifest.manifest_version() { warn!( "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}", region_id, manifest_version, - leader_region.manifest_version + leader_region.manifest.manifest_version() ); } else { entry.insert(leader_region); diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 881f0c2a9b..3336ab2a0f 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -28,6 +28,7 @@ use collect_cluster_info_handler::{ CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler, CollectFrontendClusterInfoHandler, }; +use collect_leader_region_handler::CollectLeaderRegionHandler; use collect_stats_handler::CollectStatsHandler; use common_base::Plugins; use common_meta::datanode::Stat; @@ -62,6 +63,7 @@ use crate::service::mailbox::{ pub mod check_leader_handler; pub mod collect_cluster_info_handler; +pub mod collect_leader_region_handler; pub mod collect_stats_handler; pub mod extract_stat_handler; pub mod failure_handler; @@ -570,6 +572,7 @@ impl HeartbeatHandlerGroupBuilder { if let Some(publish_heartbeat_handler) = publish_heartbeat_handler { self.add_handler_last(publish_heartbeat_handler); } + self.add_handler_last(CollectLeaderRegionHandler); self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor)); self.add_handler_last(RemapFlowPeerHandler::default()); @@ -848,7 +851,7 @@ mod tests { .unwrap(); let handlers = group.handlers; - assert_eq!(13, handlers.len()); + assert_eq!(14, handlers.len()); let names = [ "ResponseHeaderHandler", @@ -862,6 +865,7 @@ mod tests { "CollectFlownodeClusterInfoHandler", "MailboxHandler", "FilterInactiveRegionStatsHandler", + "CollectLeaderRegionHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; @@ -884,7 +888,7 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(14, handlers.len()); + assert_eq!(15, handlers.len()); let names = [ "ResponseHeaderHandler", @@ -899,6 +903,7 @@ mod tests { "MailboxHandler", "CollectStatsHandler", "FilterInactiveRegionStatsHandler", + "CollectLeaderRegionHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; @@ -918,7 +923,7 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(14, handlers.len()); + assert_eq!(15, handlers.len()); let names = [ "CollectStatsHandler", @@ -933,6 +938,7 @@ mod tests { "CollectFlownodeClusterInfoHandler", "MailboxHandler", "FilterInactiveRegionStatsHandler", + "CollectLeaderRegionHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; @@ -952,7 +958,7 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(14, handlers.len()); + assert_eq!(15, handlers.len()); let names = [ "ResponseHeaderHandler", @@ -967,6 +973,7 @@ mod tests { "MailboxHandler", "CollectStatsHandler", "FilterInactiveRegionStatsHandler", + "CollectLeaderRegionHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; @@ -986,7 +993,7 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(14, handlers.len()); + assert_eq!(15, handlers.len()); let names = [ "ResponseHeaderHandler", @@ -1000,6 +1007,7 @@ mod tests { "CollectFlownodeClusterInfoHandler", "MailboxHandler", "FilterInactiveRegionStatsHandler", + "CollectLeaderRegionHandler", "CollectStatsHandler", "ResponseHeaderHandler", "RemapFlowPeerHandler", @@ -1020,7 +1028,7 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(13, handlers.len()); + assert_eq!(14, handlers.len()); let names = [ "ResponseHeaderHandler", @@ -1034,6 +1042,7 @@ mod tests { "CollectFlownodeClusterInfoHandler", "CollectStatsHandler", "FilterInactiveRegionStatsHandler", + "CollectLeaderRegionHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; @@ -1053,7 +1062,7 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(13, handlers.len()); + assert_eq!(14, handlers.len()); let names = [ "ResponseHeaderHandler", @@ -1067,6 +1076,7 @@ mod tests { "CollectFlownodeClusterInfoHandler", "MailboxHandler", "FilterInactiveRegionStatsHandler", + "CollectLeaderRegionHandler", "ResponseHeaderHandler", "RemapFlowPeerHandler", ]; @@ -1086,7 +1096,7 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(13, handlers.len()); + assert_eq!(14, handlers.len()); let names = [ "CollectStatsHandler", @@ -1100,6 +1110,7 @@ mod tests { "CollectFlownodeClusterInfoHandler", "MailboxHandler", "FilterInactiveRegionStatsHandler", + "CollectLeaderRegionHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; diff --git a/src/meta-srv/src/handler/collect_leader_region_handler.rs b/src/meta-srv/src/handler/collect_leader_region_handler.rs new file mode 100644 index 0000000000..9021fe32eb --- /dev/null +++ b/src/meta-srv/src/handler/collect_leader_region_handler.rs @@ -0,0 +1,233 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{HeartbeatRequest, Role}; +use common_meta::region_registry::LeaderRegion; +use common_telemetry::info; +use store_api::region_engine::RegionRole; + +use crate::error::Result; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; + +pub struct CollectLeaderRegionHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for CollectLeaderRegionHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + + async fn handle( + &self, + _req: &HeartbeatRequest, + ctx: &mut Context, + acc: &mut HeartbeatAccumulator, + ) -> Result { + let Some(current_stat) = acc.stat.as_ref() else { + return Ok(HandleControl::Continue); + }; + + let mut key_values = Vec::with_capacity(current_stat.region_stats.len()); + for stat in current_stat.region_stats.iter() { + if stat.role != RegionRole::Leader { + continue; + } + + let manifest = stat.region_manifest.into(); + let value = LeaderRegion { + datanode_id: current_stat.id, + manifest, + }; + key_values.push((stat.id, value)); + } + info!("collect leader region: {:?}", key_values); + ctx.leader_region_registry.batch_put(key_values); + + Ok(HandleControl::Continue) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_meta::cache_invalidator::DummyCacheInvalidator; + use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; + use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry}; + use common_meta::sequence::SequenceBuilder; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; + + use super::*; + use crate::cluster::MetaPeerClientBuilder; + use crate::handler::{HeartbeatMailbox, Pushers}; + use crate::service::store::cached_kv::LeaderCachedKvBackend; + + fn mock_ctx() -> Context { + let in_memory = Arc::new(MemoryKvBackend::new()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( + kv_backend.clone(), + )); + let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); + let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(in_memory.clone()) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap(); + Context { + server_addr: "127.0.0.1:0000".to_string(), + in_memory, + kv_backend: kv_backend.clone(), + leader_cached_kv_backend, + meta_peer_client, + mailbox, + election: None, + is_infancy: false, + table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), + cache_invalidator: Arc::new(DummyCacheInvalidator), + leader_region_registry: Arc::new(LeaderRegionRegistry::new()), + } + } + + fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat { + RegionStat { + id, + region_manifest: RegionManifestInfo::Mito { + manifest_version, + flushed_entry_id: 0, + }, + rcus: 0, + wcus: 0, + approximate_bytes: 0, + engine: "mito".to_string(), + role, + num_rows: 0, + memtable_size: 0, + manifest_size: 0, + sst_size: 0, + index_size: 0, + } + } + + #[tokio::test] + async fn test_handle_collect_leader_region() { + let mut ctx = mock_ctx(); + + let mut acc = HeartbeatAccumulator { + stat: Some(Stat { + id: 1, + region_stats: vec![ + new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader), + new_region_stat(RegionId::new(1, 2), 2, RegionRole::Follower), + ], + addr: "127.0.0.1:0000".to_string(), + region_num: 2, + ..Default::default() + }), + ..Default::default() + }; + + let handler = CollectLeaderRegionHandler; + let control = handler + .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc) + .await + .unwrap(); + + assert_eq!(control, HandleControl::Continue); + let regions = ctx + .leader_region_registry + .batch_get(vec![RegionId::new(1, 1), RegionId::new(1, 2)].into_iter()); + assert_eq!(regions.len(), 1); + assert_eq!( + regions.get(&RegionId::new(1, 1)), + Some(&LeaderRegion { + datanode_id: 1, + manifest: LeaderRegionManifestInfo::Mito { + manifest_version: 1, + flushed_entry_id: 0, + }, + }) + ); + + // New heartbeat with new manifest version + acc.stat = Some(Stat { + id: 1, + region_stats: vec![new_region_stat(RegionId::new(1, 1), 2, RegionRole::Leader)], + timestamp_millis: 0, + addr: "127.0.0.1:0000".to_string(), + region_num: 1, + node_epoch: 0, + ..Default::default() + }); + let control = handler + .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc) + .await + .unwrap(); + + assert_eq!(control, HandleControl::Continue); + let regions = ctx + .leader_region_registry + .batch_get(vec![RegionId::new(1, 1)].into_iter()); + assert_eq!(regions.len(), 1); + assert_eq!( + regions.get(&RegionId::new(1, 1)), + Some(&LeaderRegion { + datanode_id: 1, + manifest: LeaderRegionManifestInfo::Mito { + manifest_version: 2, + flushed_entry_id: 0, + }, + }) + ); + + // New heartbeat with old manifest version + acc.stat = Some(Stat { + id: 1, + region_stats: vec![new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader)], + timestamp_millis: 0, + addr: "127.0.0.1:0000".to_string(), + region_num: 1, + node_epoch: 0, + ..Default::default() + }); + let control = handler + .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc) + .await + .unwrap(); + + assert_eq!(control, HandleControl::Continue); + let regions = ctx + .leader_region_registry + .batch_get(vec![RegionId::new(1, 1)].into_iter()); + assert_eq!(regions.len(), 1); + assert_eq!( + regions.get(&RegionId::new(1, 1)), + // The manifest version is not updated + Some(&LeaderRegion { + datanode_id: 1, + manifest: LeaderRegionManifestInfo::Mito { + manifest_version: 2, + flushed_entry_id: 0, + }, + }) + ); + } +} diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 066d149534..81e9bfaeb9 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -64,7 +64,7 @@ impl HeartbeatHandler for RegionFailureHandler { mod tests { use api::v1::meta::HeartbeatRequest; use common_catalog::consts::default_engine; - use common_meta::datanode::{RegionStat, Stat}; + use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use tokio::sync::oneshot; @@ -98,7 +98,10 @@ mod tests { manifest_size: 0, sst_size: 0, index_size: 0, - manifest_version: 0, + region_manifest: RegionManifestInfo::Mito { + manifest_version: 0, + flushed_entry_id: 0, + }, } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 7b1b1a7637..b89a570b80 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -119,7 +119,7 @@ mod test { use std::collections::{HashMap, HashSet}; use std::sync::Arc; - use common_meta::datanode::{RegionStat, Stat}; + use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; use common_meta::distributed_time_constants; use common_meta::key::table_route::TableRouteValue; use common_meta::key::test_utils::new_test_table_info; @@ -156,7 +156,10 @@ mod test { manifest_size: 0, sst_size: 0, index_size: 0, - manifest_version: 0, + region_manifest: RegionManifestInfo::Mito { + manifest_version: 0, + flushed_entry_id: 0, + }, } } diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index 63a10d3b4f..eb35f43f19 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -94,7 +94,9 @@ impl WeightCompute for RegionNumsBasedWeightCompute { mod tests { use std::collections::HashMap; - use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat, Stat}; + use common_meta::datanode::{ + DatanodeStatKey, DatanodeStatValue, RegionManifestInfo, RegionStat, Stat, + }; use common_meta::peer::Peer; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; @@ -189,7 +191,10 @@ mod tests { manifest_size: 0, sst_size: 0, index_size: 0, - manifest_version: 0, + region_manifest: RegionManifestInfo::Mito { + manifest_version: 0, + flushed_entry_id: 0, + }, }], ..Default::default() } @@ -211,7 +216,10 @@ mod tests { manifest_size: 0, sst_size: 0, index_size: 0, - manifest_version: 0, + region_manifest: RegionManifestInfo::Mito { + manifest_version: 0, + flushed_entry_id: 0, + }, }], ..Default::default() } @@ -233,7 +241,10 @@ mod tests { manifest_size: 0, sst_size: 0, index_size: 0, - manifest_version: 0, + region_manifest: RegionManifestInfo::Mito { + manifest_version: 0, + flushed_entry_id: 0, + }, }], ..Default::default() } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index d678136a34..42994c0437 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -40,8 +40,8 @@ use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, - SettableRegionRoleState, + RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, + SetRegionRoleStateResponse, SettableRegionRoleState, }; use store_api::region_request::{BatchRegionDdlRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -259,7 +259,29 @@ impl RegionEngine for MetricEngine { /// Note: Returns `None` if it's a logical region. fn region_statistic(&self, region_id: RegionId) -> Option { if self.inner.is_physical_region(region_id) { - self.inner.mito.region_statistic(region_id) + let metadata_region_id = utils::to_metadata_region_id(region_id); + let data_region_id = utils::to_data_region_id(region_id); + + let metadata_stat = self.inner.mito.region_statistic(metadata_region_id); + let data_stat = self.inner.mito.region_statistic(data_region_id); + + match (metadata_stat, data_stat) { + (Some(metadata_stat), Some(data_stat)) => Some(RegionStatistic { + num_rows: metadata_stat.num_rows + data_stat.num_rows, + memtable_size: metadata_stat.memtable_size + data_stat.memtable_size, + wal_size: metadata_stat.wal_size + data_stat.wal_size, + manifest_size: metadata_stat.manifest_size + data_stat.manifest_size, + sst_size: metadata_stat.sst_size + data_stat.sst_size, + index_size: metadata_stat.index_size + data_stat.index_size, + manifest: RegionManifestInfo::Metric { + data_flushed_entry_id: data_stat.manifest.flushed_entry_id(), + data_manifest_version: data_stat.manifest.manifest_version(), + metadata_flushed_entry_id: metadata_stat.manifest.flushed_entry_id(), + metadata_manifest_version: metadata_stat.manifest.manifest_version(), + }, + }), + _ => None, + } } else { None } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index cfbff3bf15..191300a076 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -30,7 +30,9 @@ use store_api::codec::PrimaryKeyEncoding; use store_api::logstore::provider::Provider; use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionRole, RegionStatistic, SettableRegionRoleState}; +use store_api::region_engine::{ + RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState, +}; use store_api::storage::RegionId; use crate::access_layer::AccessLayerRef; @@ -290,6 +292,7 @@ impl MitoRegion { let manifest_usage = self.stats.total_manifest_size(); let num_rows = version.ssts.num_rows() + version.memtables.num_rows(); let manifest_version = self.stats.manifest_version(); + let flushed_entry_id = version.flushed_entry_id; RegionStatistic { num_rows, @@ -298,7 +301,10 @@ impl MitoRegion { manifest_size: manifest_usage, sst_size: sst_usage, index_size: index_usage, - manifest_version, + manifest: RegionManifestInfo::Mito { + manifest_version, + flushed_entry_id, + }, } } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index dfbbcb7b3a..022ddb80df 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -382,9 +382,60 @@ pub struct RegionStatistic { /// The size of SST index files in bytes. #[serde(default)] pub index_size: u64, - /// The version of manifest. + /// The details of the region. #[serde(default)] - pub manifest_version: u64, + pub manifest: RegionManifestInfo, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum RegionManifestInfo { + Mito { + manifest_version: u64, + flushed_entry_id: u64, + }, + Metric { + data_manifest_version: u64, + data_flushed_entry_id: u64, + metadata_manifest_version: u64, + metadata_flushed_entry_id: u64, + }, +} + +impl RegionManifestInfo { + /// Returns the flushed entry id of the data region. + pub fn flushed_entry_id(&self) -> u64 { + match self { + RegionManifestInfo::Mito { + flushed_entry_id, .. + } => *flushed_entry_id, + RegionManifestInfo::Metric { + metadata_flushed_entry_id, + .. + } => *metadata_flushed_entry_id, + } + } + + /// Returns the manifest version of the data region. + pub fn manifest_version(&self) -> u64 { + match self { + RegionManifestInfo::Mito { + manifest_version, .. + } => *manifest_version, + RegionManifestInfo::Metric { + metadata_manifest_version, + .. + } => *metadata_manifest_version, + } + } +} + +impl Default for RegionManifestInfo { + fn default() -> Self { + Self::Mito { + manifest_version: 0, + flushed_entry_id: 0, + } + } } impl RegionStatistic {