mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-06 21:32:58 +00:00
feat: introduce CollectLeaderRegionHandler (#5811)
* feat: introduce `CollectLeaderRegionHandler` * feat: add to default handler group * fix: correct unit test * chore: rename
This commit is contained in:
@@ -783,7 +783,7 @@ impl InformationExtension for StandaloneInformationExtension {
|
|||||||
manifest_size: region_stat.manifest_size,
|
manifest_size: region_stat.manifest_size,
|
||||||
sst_size: region_stat.sst_size,
|
sst_size: region_stat.sst_size,
|
||||||
index_size: region_stat.index_size,
|
index_size: region_stat.index_size,
|
||||||
manifest_version: region_stat.manifest_version,
|
region_manifest: region_stat.manifest.into(),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|||||||
@@ -92,8 +92,22 @@ pub struct RegionStat {
|
|||||||
pub sst_size: u64,
|
pub sst_size: u64,
|
||||||
/// The size of the SST index files in bytes.
|
/// The size of the SST index files in bytes.
|
||||||
pub index_size: u64,
|
pub index_size: u64,
|
||||||
/// The version of manifest.
|
/// The manifest infoof the region.
|
||||||
pub manifest_version: u64,
|
pub region_manifest: RegionManifestInfo,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||||
|
pub enum RegionManifestInfo {
|
||||||
|
Mito {
|
||||||
|
manifest_version: u64,
|
||||||
|
flushed_entry_id: u64,
|
||||||
|
},
|
||||||
|
Metric {
|
||||||
|
data_manifest_version: u64,
|
||||||
|
data_flushed_entry_id: u64,
|
||||||
|
metadata_manifest_version: u64,
|
||||||
|
metadata_flushed_entry_id: u64,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stat {
|
impl Stat {
|
||||||
@@ -167,6 +181,31 @@ impl TryFrom<&HeartbeatRequest> for Stat {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
|
||||||
|
fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
|
||||||
|
match value {
|
||||||
|
store_api::region_engine::RegionManifestInfo::Mito {
|
||||||
|
manifest_version,
|
||||||
|
flushed_entry_id,
|
||||||
|
} => RegionManifestInfo::Mito {
|
||||||
|
manifest_version,
|
||||||
|
flushed_entry_id,
|
||||||
|
},
|
||||||
|
store_api::region_engine::RegionManifestInfo::Metric {
|
||||||
|
data_manifest_version,
|
||||||
|
data_flushed_entry_id,
|
||||||
|
metadata_manifest_version,
|
||||||
|
metadata_flushed_entry_id,
|
||||||
|
} => RegionManifestInfo::Metric {
|
||||||
|
data_manifest_version,
|
||||||
|
data_flushed_entry_id,
|
||||||
|
metadata_manifest_version,
|
||||||
|
metadata_flushed_entry_id,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<&api::v1::meta::RegionStat> for RegionStat {
|
impl From<&api::v1::meta::RegionStat> for RegionStat {
|
||||||
fn from(value: &api::v1::meta::RegionStat) -> Self {
|
fn from(value: &api::v1::meta::RegionStat) -> Self {
|
||||||
let region_stat = value
|
let region_stat = value
|
||||||
@@ -187,7 +226,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
|
|||||||
manifest_size: region_stat.manifest_size,
|
manifest_size: region_stat.manifest_size,
|
||||||
sst_size: region_stat.sst_size,
|
sst_size: region_stat.sst_size,
|
||||||
index_size: region_stat.index_size,
|
index_size: region_stat.index_size,
|
||||||
manifest_version: region_stat.manifest_version,
|
region_manifest: region_stat.manifest.into(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,13 +19,82 @@ use std::sync::{Arc, RwLock};
|
|||||||
use common_telemetry::warn;
|
use common_telemetry::warn;
|
||||||
use store_api::storage::RegionId;
|
use store_api::storage::RegionId;
|
||||||
|
|
||||||
|
use crate::datanode::RegionManifestInfo;
|
||||||
|
|
||||||
/// Represents information about a leader region in the cluster.
|
/// Represents information about a leader region in the cluster.
|
||||||
/// Contains the datanode id where the leader is located,
|
/// Contains the datanode id where the leader is located,
|
||||||
/// and the current manifest version.
|
/// and the current manifest version.
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||||
pub struct LeaderRegion {
|
pub struct LeaderRegion {
|
||||||
pub datanode_id: u64,
|
pub datanode_id: u64,
|
||||||
pub manifest_version: u64,
|
pub manifest: LeaderRegionManifestInfo,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||||
|
pub enum LeaderRegionManifestInfo {
|
||||||
|
Mito {
|
||||||
|
manifest_version: u64,
|
||||||
|
flushed_entry_id: u64,
|
||||||
|
},
|
||||||
|
Metric {
|
||||||
|
data_manifest_version: u64,
|
||||||
|
data_flushed_entry_id: u64,
|
||||||
|
metadata_manifest_version: u64,
|
||||||
|
metadata_flushed_entry_id: u64,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<RegionManifestInfo> for LeaderRegionManifestInfo {
|
||||||
|
fn from(value: RegionManifestInfo) -> Self {
|
||||||
|
match value {
|
||||||
|
RegionManifestInfo::Mito {
|
||||||
|
manifest_version,
|
||||||
|
flushed_entry_id,
|
||||||
|
} => LeaderRegionManifestInfo::Mito {
|
||||||
|
manifest_version,
|
||||||
|
flushed_entry_id,
|
||||||
|
},
|
||||||
|
RegionManifestInfo::Metric {
|
||||||
|
data_manifest_version,
|
||||||
|
data_flushed_entry_id,
|
||||||
|
metadata_manifest_version,
|
||||||
|
metadata_flushed_entry_id,
|
||||||
|
} => LeaderRegionManifestInfo::Metric {
|
||||||
|
data_manifest_version,
|
||||||
|
data_flushed_entry_id,
|
||||||
|
metadata_manifest_version,
|
||||||
|
metadata_flushed_entry_id,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LeaderRegionManifestInfo {
|
||||||
|
/// Returns the manifest version of the leader region.
|
||||||
|
pub fn manifest_version(&self) -> u64 {
|
||||||
|
match self {
|
||||||
|
LeaderRegionManifestInfo::Mito {
|
||||||
|
manifest_version, ..
|
||||||
|
} => *manifest_version,
|
||||||
|
LeaderRegionManifestInfo::Metric {
|
||||||
|
data_manifest_version,
|
||||||
|
..
|
||||||
|
} => *data_manifest_version,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the flushed entry id of the leader region.
|
||||||
|
pub fn flushed_entry_id(&self) -> u64 {
|
||||||
|
match self {
|
||||||
|
LeaderRegionManifestInfo::Mito {
|
||||||
|
flushed_entry_id, ..
|
||||||
|
} => *flushed_entry_id,
|
||||||
|
LeaderRegionManifestInfo::Metric {
|
||||||
|
data_flushed_entry_id,
|
||||||
|
..
|
||||||
|
} => *data_flushed_entry_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
|
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
|
||||||
@@ -71,13 +140,13 @@ impl LeaderRegionRegistry {
|
|||||||
entry.insert(leader_region);
|
entry.insert(leader_region);
|
||||||
}
|
}
|
||||||
Entry::Occupied(mut entry) => {
|
Entry::Occupied(mut entry) => {
|
||||||
let manifest_version = entry.get().manifest_version;
|
let manifest_version = entry.get().manifest.manifest_version();
|
||||||
if manifest_version > leader_region.manifest_version {
|
if manifest_version > leader_region.manifest.manifest_version() {
|
||||||
warn!(
|
warn!(
|
||||||
"Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
|
"Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
|
||||||
region_id,
|
region_id,
|
||||||
manifest_version,
|
manifest_version,
|
||||||
leader_region.manifest_version
|
leader_region.manifest.manifest_version()
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
entry.insert(leader_region);
|
entry.insert(leader_region);
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ use collect_cluster_info_handler::{
|
|||||||
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
|
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
|
||||||
CollectFrontendClusterInfoHandler,
|
CollectFrontendClusterInfoHandler,
|
||||||
};
|
};
|
||||||
|
use collect_leader_region_handler::CollectLeaderRegionHandler;
|
||||||
use collect_stats_handler::CollectStatsHandler;
|
use collect_stats_handler::CollectStatsHandler;
|
||||||
use common_base::Plugins;
|
use common_base::Plugins;
|
||||||
use common_meta::datanode::Stat;
|
use common_meta::datanode::Stat;
|
||||||
@@ -62,6 +63,7 @@ use crate::service::mailbox::{
|
|||||||
|
|
||||||
pub mod check_leader_handler;
|
pub mod check_leader_handler;
|
||||||
pub mod collect_cluster_info_handler;
|
pub mod collect_cluster_info_handler;
|
||||||
|
pub mod collect_leader_region_handler;
|
||||||
pub mod collect_stats_handler;
|
pub mod collect_stats_handler;
|
||||||
pub mod extract_stat_handler;
|
pub mod extract_stat_handler;
|
||||||
pub mod failure_handler;
|
pub mod failure_handler;
|
||||||
@@ -570,6 +572,7 @@ impl HeartbeatHandlerGroupBuilder {
|
|||||||
if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
|
if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
|
||||||
self.add_handler_last(publish_heartbeat_handler);
|
self.add_handler_last(publish_heartbeat_handler);
|
||||||
}
|
}
|
||||||
|
self.add_handler_last(CollectLeaderRegionHandler);
|
||||||
self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
|
self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
|
||||||
self.add_handler_last(RemapFlowPeerHandler::default());
|
self.add_handler_last(RemapFlowPeerHandler::default());
|
||||||
|
|
||||||
@@ -848,7 +851,7 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let handlers = group.handlers;
|
let handlers = group.handlers;
|
||||||
assert_eq!(13, handlers.len());
|
assert_eq!(14, handlers.len());
|
||||||
|
|
||||||
let names = [
|
let names = [
|
||||||
"ResponseHeaderHandler",
|
"ResponseHeaderHandler",
|
||||||
@@ -862,6 +865,7 @@ mod tests {
|
|||||||
"CollectFlownodeClusterInfoHandler",
|
"CollectFlownodeClusterInfoHandler",
|
||||||
"MailboxHandler",
|
"MailboxHandler",
|
||||||
"FilterInactiveRegionStatsHandler",
|
"FilterInactiveRegionStatsHandler",
|
||||||
|
"CollectLeaderRegionHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"RemapFlowPeerHandler",
|
"RemapFlowPeerHandler",
|
||||||
];
|
];
|
||||||
@@ -884,7 +888,7 @@ mod tests {
|
|||||||
|
|
||||||
let group = builder.build().unwrap();
|
let group = builder.build().unwrap();
|
||||||
let handlers = group.handlers;
|
let handlers = group.handlers;
|
||||||
assert_eq!(14, handlers.len());
|
assert_eq!(15, handlers.len());
|
||||||
|
|
||||||
let names = [
|
let names = [
|
||||||
"ResponseHeaderHandler",
|
"ResponseHeaderHandler",
|
||||||
@@ -899,6 +903,7 @@ mod tests {
|
|||||||
"MailboxHandler",
|
"MailboxHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"FilterInactiveRegionStatsHandler",
|
"FilterInactiveRegionStatsHandler",
|
||||||
|
"CollectLeaderRegionHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"RemapFlowPeerHandler",
|
"RemapFlowPeerHandler",
|
||||||
];
|
];
|
||||||
@@ -918,7 +923,7 @@ mod tests {
|
|||||||
|
|
||||||
let group = builder.build().unwrap();
|
let group = builder.build().unwrap();
|
||||||
let handlers = group.handlers;
|
let handlers = group.handlers;
|
||||||
assert_eq!(14, handlers.len());
|
assert_eq!(15, handlers.len());
|
||||||
|
|
||||||
let names = [
|
let names = [
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
@@ -933,6 +938,7 @@ mod tests {
|
|||||||
"CollectFlownodeClusterInfoHandler",
|
"CollectFlownodeClusterInfoHandler",
|
||||||
"MailboxHandler",
|
"MailboxHandler",
|
||||||
"FilterInactiveRegionStatsHandler",
|
"FilterInactiveRegionStatsHandler",
|
||||||
|
"CollectLeaderRegionHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"RemapFlowPeerHandler",
|
"RemapFlowPeerHandler",
|
||||||
];
|
];
|
||||||
@@ -952,7 +958,7 @@ mod tests {
|
|||||||
|
|
||||||
let group = builder.build().unwrap();
|
let group = builder.build().unwrap();
|
||||||
let handlers = group.handlers;
|
let handlers = group.handlers;
|
||||||
assert_eq!(14, handlers.len());
|
assert_eq!(15, handlers.len());
|
||||||
|
|
||||||
let names = [
|
let names = [
|
||||||
"ResponseHeaderHandler",
|
"ResponseHeaderHandler",
|
||||||
@@ -967,6 +973,7 @@ mod tests {
|
|||||||
"MailboxHandler",
|
"MailboxHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"FilterInactiveRegionStatsHandler",
|
"FilterInactiveRegionStatsHandler",
|
||||||
|
"CollectLeaderRegionHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"RemapFlowPeerHandler",
|
"RemapFlowPeerHandler",
|
||||||
];
|
];
|
||||||
@@ -986,7 +993,7 @@ mod tests {
|
|||||||
|
|
||||||
let group = builder.build().unwrap();
|
let group = builder.build().unwrap();
|
||||||
let handlers = group.handlers;
|
let handlers = group.handlers;
|
||||||
assert_eq!(14, handlers.len());
|
assert_eq!(15, handlers.len());
|
||||||
|
|
||||||
let names = [
|
let names = [
|
||||||
"ResponseHeaderHandler",
|
"ResponseHeaderHandler",
|
||||||
@@ -1000,6 +1007,7 @@ mod tests {
|
|||||||
"CollectFlownodeClusterInfoHandler",
|
"CollectFlownodeClusterInfoHandler",
|
||||||
"MailboxHandler",
|
"MailboxHandler",
|
||||||
"FilterInactiveRegionStatsHandler",
|
"FilterInactiveRegionStatsHandler",
|
||||||
|
"CollectLeaderRegionHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"ResponseHeaderHandler",
|
"ResponseHeaderHandler",
|
||||||
"RemapFlowPeerHandler",
|
"RemapFlowPeerHandler",
|
||||||
@@ -1020,7 +1028,7 @@ mod tests {
|
|||||||
|
|
||||||
let group = builder.build().unwrap();
|
let group = builder.build().unwrap();
|
||||||
let handlers = group.handlers;
|
let handlers = group.handlers;
|
||||||
assert_eq!(13, handlers.len());
|
assert_eq!(14, handlers.len());
|
||||||
|
|
||||||
let names = [
|
let names = [
|
||||||
"ResponseHeaderHandler",
|
"ResponseHeaderHandler",
|
||||||
@@ -1034,6 +1042,7 @@ mod tests {
|
|||||||
"CollectFlownodeClusterInfoHandler",
|
"CollectFlownodeClusterInfoHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"FilterInactiveRegionStatsHandler",
|
"FilterInactiveRegionStatsHandler",
|
||||||
|
"CollectLeaderRegionHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"RemapFlowPeerHandler",
|
"RemapFlowPeerHandler",
|
||||||
];
|
];
|
||||||
@@ -1053,7 +1062,7 @@ mod tests {
|
|||||||
|
|
||||||
let group = builder.build().unwrap();
|
let group = builder.build().unwrap();
|
||||||
let handlers = group.handlers;
|
let handlers = group.handlers;
|
||||||
assert_eq!(13, handlers.len());
|
assert_eq!(14, handlers.len());
|
||||||
|
|
||||||
let names = [
|
let names = [
|
||||||
"ResponseHeaderHandler",
|
"ResponseHeaderHandler",
|
||||||
@@ -1067,6 +1076,7 @@ mod tests {
|
|||||||
"CollectFlownodeClusterInfoHandler",
|
"CollectFlownodeClusterInfoHandler",
|
||||||
"MailboxHandler",
|
"MailboxHandler",
|
||||||
"FilterInactiveRegionStatsHandler",
|
"FilterInactiveRegionStatsHandler",
|
||||||
|
"CollectLeaderRegionHandler",
|
||||||
"ResponseHeaderHandler",
|
"ResponseHeaderHandler",
|
||||||
"RemapFlowPeerHandler",
|
"RemapFlowPeerHandler",
|
||||||
];
|
];
|
||||||
@@ -1086,7 +1096,7 @@ mod tests {
|
|||||||
|
|
||||||
let group = builder.build().unwrap();
|
let group = builder.build().unwrap();
|
||||||
let handlers = group.handlers;
|
let handlers = group.handlers;
|
||||||
assert_eq!(13, handlers.len());
|
assert_eq!(14, handlers.len());
|
||||||
|
|
||||||
let names = [
|
let names = [
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
@@ -1100,6 +1110,7 @@ mod tests {
|
|||||||
"CollectFlownodeClusterInfoHandler",
|
"CollectFlownodeClusterInfoHandler",
|
||||||
"MailboxHandler",
|
"MailboxHandler",
|
||||||
"FilterInactiveRegionStatsHandler",
|
"FilterInactiveRegionStatsHandler",
|
||||||
|
"CollectLeaderRegionHandler",
|
||||||
"CollectStatsHandler",
|
"CollectStatsHandler",
|
||||||
"RemapFlowPeerHandler",
|
"RemapFlowPeerHandler",
|
||||||
];
|
];
|
||||||
|
|||||||
233
src/meta-srv/src/handler/collect_leader_region_handler.rs
Normal file
233
src/meta-srv/src/handler/collect_leader_region_handler.rs
Normal file
@@ -0,0 +1,233 @@
|
|||||||
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use api::v1::meta::{HeartbeatRequest, Role};
|
||||||
|
use common_meta::region_registry::LeaderRegion;
|
||||||
|
use common_telemetry::info;
|
||||||
|
use store_api::region_engine::RegionRole;
|
||||||
|
|
||||||
|
use crate::error::Result;
|
||||||
|
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
|
||||||
|
use crate::metasrv::Context;
|
||||||
|
|
||||||
|
pub struct CollectLeaderRegionHandler;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl HeartbeatHandler for CollectLeaderRegionHandler {
|
||||||
|
fn is_acceptable(&self, role: Role) -> bool {
|
||||||
|
role == Role::Datanode
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
&self,
|
||||||
|
_req: &HeartbeatRequest,
|
||||||
|
ctx: &mut Context,
|
||||||
|
acc: &mut HeartbeatAccumulator,
|
||||||
|
) -> Result<HandleControl> {
|
||||||
|
let Some(current_stat) = acc.stat.as_ref() else {
|
||||||
|
return Ok(HandleControl::Continue);
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut key_values = Vec::with_capacity(current_stat.region_stats.len());
|
||||||
|
for stat in current_stat.region_stats.iter() {
|
||||||
|
if stat.role != RegionRole::Leader {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let manifest = stat.region_manifest.into();
|
||||||
|
let value = LeaderRegion {
|
||||||
|
datanode_id: current_stat.id,
|
||||||
|
manifest,
|
||||||
|
};
|
||||||
|
key_values.push((stat.id, value));
|
||||||
|
}
|
||||||
|
info!("collect leader region: {:?}", key_values);
|
||||||
|
ctx.leader_region_registry.batch_put(key_values);
|
||||||
|
|
||||||
|
Ok(HandleControl::Continue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use common_meta::cache_invalidator::DummyCacheInvalidator;
|
||||||
|
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
|
||||||
|
use common_meta::key::TableMetadataManager;
|
||||||
|
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||||
|
use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
|
||||||
|
use common_meta::sequence::SequenceBuilder;
|
||||||
|
use store_api::region_engine::RegionRole;
|
||||||
|
use store_api::storage::RegionId;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use crate::cluster::MetaPeerClientBuilder;
|
||||||
|
use crate::handler::{HeartbeatMailbox, Pushers};
|
||||||
|
use crate::service::store::cached_kv::LeaderCachedKvBackend;
|
||||||
|
|
||||||
|
fn mock_ctx() -> Context {
|
||||||
|
let in_memory = Arc::new(MemoryKvBackend::new());
|
||||||
|
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||||
|
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
|
||||||
|
kv_backend.clone(),
|
||||||
|
));
|
||||||
|
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
|
||||||
|
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
|
||||||
|
let meta_peer_client = MetaPeerClientBuilder::default()
|
||||||
|
.election(None)
|
||||||
|
.in_memory(in_memory.clone())
|
||||||
|
.build()
|
||||||
|
.map(Arc::new)
|
||||||
|
// Safety: all required fields set at initialization
|
||||||
|
.unwrap();
|
||||||
|
Context {
|
||||||
|
server_addr: "127.0.0.1:0000".to_string(),
|
||||||
|
in_memory,
|
||||||
|
kv_backend: kv_backend.clone(),
|
||||||
|
leader_cached_kv_backend,
|
||||||
|
meta_peer_client,
|
||||||
|
mailbox,
|
||||||
|
election: None,
|
||||||
|
is_infancy: false,
|
||||||
|
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
|
||||||
|
cache_invalidator: Arc::new(DummyCacheInvalidator),
|
||||||
|
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat {
|
||||||
|
RegionStat {
|
||||||
|
id,
|
||||||
|
region_manifest: RegionManifestInfo::Mito {
|
||||||
|
manifest_version,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
},
|
||||||
|
rcus: 0,
|
||||||
|
wcus: 0,
|
||||||
|
approximate_bytes: 0,
|
||||||
|
engine: "mito".to_string(),
|
||||||
|
role,
|
||||||
|
num_rows: 0,
|
||||||
|
memtable_size: 0,
|
||||||
|
manifest_size: 0,
|
||||||
|
sst_size: 0,
|
||||||
|
index_size: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_handle_collect_leader_region() {
|
||||||
|
let mut ctx = mock_ctx();
|
||||||
|
|
||||||
|
let mut acc = HeartbeatAccumulator {
|
||||||
|
stat: Some(Stat {
|
||||||
|
id: 1,
|
||||||
|
region_stats: vec![
|
||||||
|
new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader),
|
||||||
|
new_region_stat(RegionId::new(1, 2), 2, RegionRole::Follower),
|
||||||
|
],
|
||||||
|
addr: "127.0.0.1:0000".to_string(),
|
||||||
|
region_num: 2,
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let handler = CollectLeaderRegionHandler;
|
||||||
|
let control = handler
|
||||||
|
.handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(control, HandleControl::Continue);
|
||||||
|
let regions = ctx
|
||||||
|
.leader_region_registry
|
||||||
|
.batch_get(vec![RegionId::new(1, 1), RegionId::new(1, 2)].into_iter());
|
||||||
|
assert_eq!(regions.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
regions.get(&RegionId::new(1, 1)),
|
||||||
|
Some(&LeaderRegion {
|
||||||
|
datanode_id: 1,
|
||||||
|
manifest: LeaderRegionManifestInfo::Mito {
|
||||||
|
manifest_version: 1,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
// New heartbeat with new manifest version
|
||||||
|
acc.stat = Some(Stat {
|
||||||
|
id: 1,
|
||||||
|
region_stats: vec![new_region_stat(RegionId::new(1, 1), 2, RegionRole::Leader)],
|
||||||
|
timestamp_millis: 0,
|
||||||
|
addr: "127.0.0.1:0000".to_string(),
|
||||||
|
region_num: 1,
|
||||||
|
node_epoch: 0,
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
let control = handler
|
||||||
|
.handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(control, HandleControl::Continue);
|
||||||
|
let regions = ctx
|
||||||
|
.leader_region_registry
|
||||||
|
.batch_get(vec![RegionId::new(1, 1)].into_iter());
|
||||||
|
assert_eq!(regions.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
regions.get(&RegionId::new(1, 1)),
|
||||||
|
Some(&LeaderRegion {
|
||||||
|
datanode_id: 1,
|
||||||
|
manifest: LeaderRegionManifestInfo::Mito {
|
||||||
|
manifest_version: 2,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
// New heartbeat with old manifest version
|
||||||
|
acc.stat = Some(Stat {
|
||||||
|
id: 1,
|
||||||
|
region_stats: vec![new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader)],
|
||||||
|
timestamp_millis: 0,
|
||||||
|
addr: "127.0.0.1:0000".to_string(),
|
||||||
|
region_num: 1,
|
||||||
|
node_epoch: 0,
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
let control = handler
|
||||||
|
.handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(control, HandleControl::Continue);
|
||||||
|
let regions = ctx
|
||||||
|
.leader_region_registry
|
||||||
|
.batch_get(vec![RegionId::new(1, 1)].into_iter());
|
||||||
|
assert_eq!(regions.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
regions.get(&RegionId::new(1, 1)),
|
||||||
|
// The manifest version is not updated
|
||||||
|
Some(&LeaderRegion {
|
||||||
|
datanode_id: 1,
|
||||||
|
manifest: LeaderRegionManifestInfo::Mito {
|
||||||
|
manifest_version: 2,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -64,7 +64,7 @@ impl HeartbeatHandler for RegionFailureHandler {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use api::v1::meta::HeartbeatRequest;
|
use api::v1::meta::HeartbeatRequest;
|
||||||
use common_catalog::consts::default_engine;
|
use common_catalog::consts::default_engine;
|
||||||
use common_meta::datanode::{RegionStat, Stat};
|
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
|
||||||
use store_api::region_engine::RegionRole;
|
use store_api::region_engine::RegionRole;
|
||||||
use store_api::storage::RegionId;
|
use store_api::storage::RegionId;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
@@ -98,7 +98,10 @@ mod tests {
|
|||||||
manifest_size: 0,
|
manifest_size: 0,
|
||||||
sst_size: 0,
|
sst_size: 0,
|
||||||
index_size: 0,
|
index_size: 0,
|
||||||
manifest_version: 0,
|
region_manifest: RegionManifestInfo::Mito {
|
||||||
|
manifest_version: 0,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
acc.stat = Some(Stat {
|
acc.stat = Some(Stat {
|
||||||
|
|||||||
@@ -119,7 +119,7 @@ mod test {
|
|||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use common_meta::datanode::{RegionStat, Stat};
|
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
|
||||||
use common_meta::distributed_time_constants;
|
use common_meta::distributed_time_constants;
|
||||||
use common_meta::key::table_route::TableRouteValue;
|
use common_meta::key::table_route::TableRouteValue;
|
||||||
use common_meta::key::test_utils::new_test_table_info;
|
use common_meta::key::test_utils::new_test_table_info;
|
||||||
@@ -156,7 +156,10 @@ mod test {
|
|||||||
manifest_size: 0,
|
manifest_size: 0,
|
||||||
sst_size: 0,
|
sst_size: 0,
|
||||||
index_size: 0,
|
index_size: 0,
|
||||||
manifest_version: 0,
|
region_manifest: RegionManifestInfo::Mito {
|
||||||
|
manifest_version: 0,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -94,7 +94,9 @@ impl WeightCompute for RegionNumsBasedWeightCompute {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat, Stat};
|
use common_meta::datanode::{
|
||||||
|
DatanodeStatKey, DatanodeStatValue, RegionManifestInfo, RegionStat, Stat,
|
||||||
|
};
|
||||||
use common_meta::peer::Peer;
|
use common_meta::peer::Peer;
|
||||||
use store_api::region_engine::RegionRole;
|
use store_api::region_engine::RegionRole;
|
||||||
use store_api::storage::RegionId;
|
use store_api::storage::RegionId;
|
||||||
@@ -189,7 +191,10 @@ mod tests {
|
|||||||
manifest_size: 0,
|
manifest_size: 0,
|
||||||
sst_size: 0,
|
sst_size: 0,
|
||||||
index_size: 0,
|
index_size: 0,
|
||||||
manifest_version: 0,
|
region_manifest: RegionManifestInfo::Mito {
|
||||||
|
manifest_version: 0,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
},
|
||||||
}],
|
}],
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
}
|
||||||
@@ -211,7 +216,10 @@ mod tests {
|
|||||||
manifest_size: 0,
|
manifest_size: 0,
|
||||||
sst_size: 0,
|
sst_size: 0,
|
||||||
index_size: 0,
|
index_size: 0,
|
||||||
manifest_version: 0,
|
region_manifest: RegionManifestInfo::Mito {
|
||||||
|
manifest_version: 0,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
},
|
||||||
}],
|
}],
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
}
|
||||||
@@ -233,7 +241,10 @@ mod tests {
|
|||||||
manifest_size: 0,
|
manifest_size: 0,
|
||||||
sst_size: 0,
|
sst_size: 0,
|
||||||
index_size: 0,
|
index_size: 0,
|
||||||
manifest_version: 0,
|
region_manifest: RegionManifestInfo::Mito {
|
||||||
|
manifest_version: 0,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
},
|
||||||
}],
|
}],
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,8 +40,8 @@ use store_api::manifest::ManifestVersion;
|
|||||||
use store_api::metadata::RegionMetadataRef;
|
use store_api::metadata::RegionMetadataRef;
|
||||||
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
|
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
|
||||||
use store_api::region_engine::{
|
use store_api::region_engine::{
|
||||||
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
|
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
|
||||||
SettableRegionRoleState,
|
SetRegionRoleStateResponse, SettableRegionRoleState,
|
||||||
};
|
};
|
||||||
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
|
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
|
||||||
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
|
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
|
||||||
@@ -259,7 +259,29 @@ impl RegionEngine for MetricEngine {
|
|||||||
/// Note: Returns `None` if it's a logical region.
|
/// Note: Returns `None` if it's a logical region.
|
||||||
fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
|
fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
|
||||||
if self.inner.is_physical_region(region_id) {
|
if self.inner.is_physical_region(region_id) {
|
||||||
self.inner.mito.region_statistic(region_id)
|
let metadata_region_id = utils::to_metadata_region_id(region_id);
|
||||||
|
let data_region_id = utils::to_data_region_id(region_id);
|
||||||
|
|
||||||
|
let metadata_stat = self.inner.mito.region_statistic(metadata_region_id);
|
||||||
|
let data_stat = self.inner.mito.region_statistic(data_region_id);
|
||||||
|
|
||||||
|
match (metadata_stat, data_stat) {
|
||||||
|
(Some(metadata_stat), Some(data_stat)) => Some(RegionStatistic {
|
||||||
|
num_rows: metadata_stat.num_rows + data_stat.num_rows,
|
||||||
|
memtable_size: metadata_stat.memtable_size + data_stat.memtable_size,
|
||||||
|
wal_size: metadata_stat.wal_size + data_stat.wal_size,
|
||||||
|
manifest_size: metadata_stat.manifest_size + data_stat.manifest_size,
|
||||||
|
sst_size: metadata_stat.sst_size + data_stat.sst_size,
|
||||||
|
index_size: metadata_stat.index_size + data_stat.index_size,
|
||||||
|
manifest: RegionManifestInfo::Metric {
|
||||||
|
data_flushed_entry_id: data_stat.manifest.flushed_entry_id(),
|
||||||
|
data_manifest_version: data_stat.manifest.manifest_version(),
|
||||||
|
metadata_flushed_entry_id: metadata_stat.manifest.flushed_entry_id(),
|
||||||
|
metadata_manifest_version: metadata_stat.manifest.manifest_version(),
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,7 +30,9 @@ use store_api::codec::PrimaryKeyEncoding;
|
|||||||
use store_api::logstore::provider::Provider;
|
use store_api::logstore::provider::Provider;
|
||||||
use store_api::manifest::ManifestVersion;
|
use store_api::manifest::ManifestVersion;
|
||||||
use store_api::metadata::RegionMetadataRef;
|
use store_api::metadata::RegionMetadataRef;
|
||||||
use store_api::region_engine::{RegionRole, RegionStatistic, SettableRegionRoleState};
|
use store_api::region_engine::{
|
||||||
|
RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
|
||||||
|
};
|
||||||
use store_api::storage::RegionId;
|
use store_api::storage::RegionId;
|
||||||
|
|
||||||
use crate::access_layer::AccessLayerRef;
|
use crate::access_layer::AccessLayerRef;
|
||||||
@@ -290,6 +292,7 @@ impl MitoRegion {
|
|||||||
let manifest_usage = self.stats.total_manifest_size();
|
let manifest_usage = self.stats.total_manifest_size();
|
||||||
let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
|
let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
|
||||||
let manifest_version = self.stats.manifest_version();
|
let manifest_version = self.stats.manifest_version();
|
||||||
|
let flushed_entry_id = version.flushed_entry_id;
|
||||||
|
|
||||||
RegionStatistic {
|
RegionStatistic {
|
||||||
num_rows,
|
num_rows,
|
||||||
@@ -298,7 +301,10 @@ impl MitoRegion {
|
|||||||
manifest_size: manifest_usage,
|
manifest_size: manifest_usage,
|
||||||
sst_size: sst_usage,
|
sst_size: sst_usage,
|
||||||
index_size: index_usage,
|
index_size: index_usage,
|
||||||
manifest_version,
|
manifest: RegionManifestInfo::Mito {
|
||||||
|
manifest_version,
|
||||||
|
flushed_entry_id,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -382,9 +382,60 @@ pub struct RegionStatistic {
|
|||||||
/// The size of SST index files in bytes.
|
/// The size of SST index files in bytes.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub index_size: u64,
|
pub index_size: u64,
|
||||||
/// The version of manifest.
|
/// The details of the region.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub manifest_version: u64,
|
pub manifest: RegionManifestInfo,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum RegionManifestInfo {
|
||||||
|
Mito {
|
||||||
|
manifest_version: u64,
|
||||||
|
flushed_entry_id: u64,
|
||||||
|
},
|
||||||
|
Metric {
|
||||||
|
data_manifest_version: u64,
|
||||||
|
data_flushed_entry_id: u64,
|
||||||
|
metadata_manifest_version: u64,
|
||||||
|
metadata_flushed_entry_id: u64,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RegionManifestInfo {
|
||||||
|
/// Returns the flushed entry id of the data region.
|
||||||
|
pub fn flushed_entry_id(&self) -> u64 {
|
||||||
|
match self {
|
||||||
|
RegionManifestInfo::Mito {
|
||||||
|
flushed_entry_id, ..
|
||||||
|
} => *flushed_entry_id,
|
||||||
|
RegionManifestInfo::Metric {
|
||||||
|
metadata_flushed_entry_id,
|
||||||
|
..
|
||||||
|
} => *metadata_flushed_entry_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the manifest version of the data region.
|
||||||
|
pub fn manifest_version(&self) -> u64 {
|
||||||
|
match self {
|
||||||
|
RegionManifestInfo::Mito {
|
||||||
|
manifest_version, ..
|
||||||
|
} => *manifest_version,
|
||||||
|
RegionManifestInfo::Metric {
|
||||||
|
metadata_manifest_version,
|
||||||
|
..
|
||||||
|
} => *metadata_manifest_version,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RegionManifestInfo {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Mito {
|
||||||
|
manifest_version: 0,
|
||||||
|
flushed_entry_id: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RegionStatistic {
|
impl RegionStatistic {
|
||||||
|
|||||||
Reference in New Issue
Block a user