chore: refresh route table (#4673)

* chore: remove error::

* chore: avoid to use get_raw if unnecessary

* chore: clearer method name

* feat: remap node addresses in table route

* chore: add unit test for remap address

* feat: refresh node address mapping via heartbeat

* feat: broadcast table cache invalidate on new epoch

* chore: clarify heartbeat log

* chore: remove InvalidHeartbeatRequest

* chore: add log

* feat: add role into NodeAddressKey

* chore: fix test

* Update src/common/meta/src/key/table_route.rs

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

* chore: simplify code

---------

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
This commit is contained in:
jeremyhi
2024-09-10 20:08:59 +08:00
committed by GitHub
parent d0fd79ac7f
commit dcae21208b
14 changed files with 440 additions and 81 deletions

View File

@@ -39,7 +39,7 @@ use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::find_leaders;
use crate::{cache_invalidator, metrics, ClusterId};
use crate::{metrics, ClusterId};
pub struct AlterLogicalTablesProcedure {
pub context: DdlContext,
@@ -170,12 +170,11 @@ impl AlterLogicalTablesProcedure {
}
pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
let ctx = cache_invalidator::Context::default();
let to_invalidate = self.build_table_cache_keys_to_invalidate();
self.context
.cache_invalidator
.invalidate(&ctx, &to_invalidate)
.invalidate(&Default::default(), &to_invalidate)
.await?;
Ok(Status::done())
}

View File

@@ -441,11 +441,9 @@ async fn handle_alter_table_task(
.table_metadata_manager()
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.get(table_id)
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
.context(TableRouteNotFoundSnafu { table_id })?;
ensure!(
table_route_value.is_physical(),
UnexpectedLogicalRouteTableSnafu {

View File

@@ -90,6 +90,7 @@
pub mod catalog_name;
pub mod datanode_table;
pub mod flow;
pub mod node_address;
pub mod schema_name;
pub mod table_info;
pub mod table_name;
@@ -102,7 +103,7 @@ pub mod view_info;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use bytes::Bytes;
@@ -134,6 +135,7 @@ use self::table_route::{TableRouteManager, TableRouteValue};
use self::tombstone::TombstoneManager;
use crate::ddl::utils::region_storage_path;
use crate::error::{self, Result, SerdeJsonSnafu};
use crate::key::node_address::NodeAddressValue;
use crate::key::table_route::TableRouteKey;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Txn, TxnOp};
@@ -152,12 +154,15 @@ pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
pub const CACHE_KEY_PREFIXES: [&str; 4] = [
/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
TABLE_NAME_KEY_PREFIX,
CATALOG_NAME_KEY_PREFIX,
SCHEMA_NAME_KEY_PREFIX,
TABLE_ROUTE_PREFIX,
NODE_ADDRESS_PREFIX,
];
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
@@ -210,6 +215,11 @@ lazy_static! {
.unwrap();
}
lazy_static! {
static ref NODE_ADDRESS_PATTERN: Regex =
Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
}
/// The key of metadata.
pub trait MetadataKey<'a, T> {
fn to_bytes(&self) -> Vec<u8>;
@@ -306,6 +316,12 @@ impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
}
}
impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
@@ -1230,7 +1246,8 @@ impl_metadata_value! {
FlowInfoValue,
FlowNameValue,
FlowRouteValue,
TableFlowValue
TableFlowValue,
NodeAddressValue
}
impl_optional_metadata_value! {
@@ -1952,7 +1969,7 @@ mod tests {
let table_route_value = table_metadata_manager
.table_route_manager
.table_route_storage()
.get_raw(table_id)
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();
@@ -2005,7 +2022,7 @@ mod tests {
let table_route_value = table_metadata_manager
.table_route_manager
.table_route_storage()
.get_raw(table_id)
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();

View File

@@ -0,0 +1,114 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use api::v1::meta::Role;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{InvalidMetadataSnafu, Result};
use crate::key::{MetadataKey, NODE_ADDRESS_PATTERN, NODE_ADDRESS_PREFIX};
use crate::peer::Peer;
/// The key stores node address.
///
/// The layout: `__node_address/{role}/{node_id}`
#[derive(Debug, PartialEq)]
pub struct NodeAddressKey {
pub role: Role,
pub node_id: u64,
}
impl NodeAddressKey {
pub fn new(role: Role, node_id: u64) -> Self {
Self { role, node_id }
}
pub fn with_datanode(node_id: u64) -> Self {
Self::new(Role::Datanode, node_id)
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct NodeAddressValue {
pub peer: Peer,
}
impl NodeAddressValue {
pub fn new(peer: Peer) -> Self {
Self { peer }
}
}
impl<'a> MetadataKey<'a, NodeAddressKey> for NodeAddressKey {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<NodeAddressKey> {
let key = std::str::from_utf8(bytes).map_err(|e| {
InvalidMetadataSnafu {
err_msg: format!(
"NodeAddressKey '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
let captures = NODE_ADDRESS_PATTERN
.captures(key)
.context(InvalidMetadataSnafu {
err_msg: format!("Invalid NodeAddressKey '{key}'"),
})?;
// Safety: pass the regex check above
let role = captures[1].parse::<i32>().unwrap();
let role = Role::try_from(role).map_err(|_| {
InvalidMetadataSnafu {
err_msg: format!("Invalid Role value: {role}"),
}
.build()
})?;
let node_id = captures[2].parse::<u64>().unwrap();
Ok(NodeAddressKey::new(role, node_id))
}
}
impl Display for NodeAddressKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}/{}/{}",
NODE_ADDRESS_PREFIX, self.role as i32, self.node_id
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_node_address_key() {
let key = NodeAddressKey::new(Role::Datanode, 1);
let bytes = key.to_bytes();
let key2 = NodeAddressKey::from_bytes(&bytes).unwrap();
assert_eq!(key, key2);
let key = NodeAddressKey::new(Role::Flownode, 3);
let bytes = key.to_bytes();
let key2 = NodeAddressKey::from_bytes(&bytes).unwrap();
assert_eq!(key, key2);
}
}

View File

@@ -22,9 +22,10 @@ use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::error::{
self, InvalidMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu,
TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
InvalidMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu,
};
use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
@@ -85,7 +86,7 @@ impl TableRouteValue {
debug_assert_eq!(region.region.id.table_id(), physical_table_id);
RegionId::new(table_id, region.region.id.region_number())
})
.collect::<Vec<_>>();
.collect();
TableRouteValue::logical(physical_table_id, region_routes)
}
}
@@ -189,12 +190,12 @@ impl TableRouteValue {
.region_routes
.iter()
.map(|region_route| region_route.region.id.region_number())
.collect::<Vec<_>>(),
.collect(),
TableRouteValue::Logical(x) => x
.region_ids()
.iter()
.map(|region_id| region_id.region_number())
.collect::<Vec<_>>(),
.collect(),
}
}
}
@@ -301,7 +302,7 @@ impl TableRouteManager {
Some(route) => {
ensure!(
route.is_physical(),
error::UnexpectedLogicalRouteTableSnafu {
UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{route:?} is a non-physical TableRouteValue.")
}
);
@@ -321,7 +322,7 @@ impl TableRouteManager {
) -> Result<TableId> {
let table_route = self
.storage
.get(logical_or_physical_table_id)
.get_inner(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
@@ -335,7 +336,7 @@ impl TableRouteManager {
/// Returns the [TableRouteValue::Physical] recursively.
///
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
/// Returns a [TableRouteNotFound](error::Error::TableRouteNotFound) Error if:
/// - the physical table(`logical_or_physical_table_id`) does not exist
/// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
pub async fn get_physical_table_route(
@@ -528,6 +529,15 @@ impl TableRouteStorage {
/// Returns the [`TableRouteValue`].
pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
let mut table_route = self.get_inner(table_id).await?;
if let Some(table_route) = &mut table_route {
self.remap_route_address(table_route).await?;
};
Ok(table_route)
}
async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
let key = TableRouteKey::new(table_id);
self.kv_backend
.get(&key.to_bytes())
@@ -537,7 +547,19 @@ impl TableRouteStorage {
}
/// Returns the [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`].
pub async fn get_raw(
pub async fn get_with_raw_bytes(
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
if let Some(table_route) = &mut table_route {
self.remap_route_address(table_route).await?;
};
Ok(table_route)
}
async fn get_with_raw_bytes_inner(
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
@@ -554,27 +576,27 @@ impl TableRouteStorage {
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
/// - the physical table(`logical_or_physical_table_id`) does not exist
/// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
pub async fn get_raw_physical_table_route(
pub async fn get_physical_table_route_with_raw_bytes(
&self,
logical_or_physical_table_id: TableId,
) -> Result<(TableId, DeserializedValueWithBytes<TableRouteValue>)> {
let table_route =
self.get_raw(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?;
let table_route = self
.get_with_raw_bytes(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?;
match table_route.get_inner_ref() {
TableRouteValue::Physical(_) => Ok((logical_or_physical_table_id, table_route)),
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route =
self.get_raw(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let physical_table_route = self
.get_with_raw_bytes(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
Ok((physical_table_id, physical_table_route))
}
}
@@ -582,6 +604,13 @@ impl TableRouteStorage {
/// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
let mut table_routes = self.batch_get_inner(table_ids).await?;
self.remap_routes_addresses(&mut table_routes).await?;
Ok(table_routes)
}
async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
let keys = table_ids
.iter()
.map(|id| TableRouteKey::new(*id).to_bytes())
@@ -604,8 +633,107 @@ impl TableRouteStorage {
Ok(None)
}
})
.collect::<Result<Vec<_>>>()
.collect()
}
async fn remap_routes_addresses(
&self,
table_routes: &mut [Option<TableRouteValue>],
) -> Result<()> {
let keys = table_routes
.iter()
.flat_map(|table_route| {
table_route
.as_ref()
.map(extract_address_keys)
.unwrap_or_default()
})
.collect::<HashSet<_>>()
.into_iter()
.collect();
let node_addrs = self.get_node_addresses(keys).await?;
for table_route in table_routes.iter_mut().flatten() {
set_addresses(&node_addrs, table_route)?;
}
Ok(())
}
async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
let keys = extract_address_keys(table_route).into_iter().collect();
let node_addrs = self.get_node_addresses(keys).await?;
set_addresses(&node_addrs, table_route)?;
Ok(())
}
async fn get_node_addresses(
&self,
keys: Vec<Vec<u8>>,
) -> Result<HashMap<u64, NodeAddressValue>> {
if keys.is_empty() {
return Ok(HashMap::default());
}
self.kv_backend
.batch_get(BatchGetRequest { keys })
.await?
.kvs
.into_iter()
.map(|kv| {
let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
Ok((node_id, node_addr))
})
.collect()
}
}
fn set_addresses(
node_addrs: &HashMap<u64, NodeAddressValue>,
table_route: &mut TableRouteValue,
) -> Result<()> {
let TableRouteValue::Physical(physical_table_route) = table_route else {
return Ok(());
};
for region_route in &mut physical_table_route.region_routes {
if let Some(leader) = &mut region_route.leader_peer {
if let Some(node_addr) = node_addrs.get(&leader.id) {
leader.addr = node_addr.peer.addr.clone();
}
}
for follower in &mut region_route.follower_peers {
if let Some(node_addr) = node_addrs.get(&follower.id) {
follower.addr = node_addr.peer.addr.clone();
}
}
}
Ok(())
}
fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
let TableRouteValue::Physical(physical_table_route) = table_route else {
return HashSet::default();
};
physical_table_route
.region_routes
.iter()
.flat_map(|region_route| {
region_route
.follower_peers
.iter()
.map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
.chain(
region_route
.leader_peer
.as_ref()
.map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
)
})
.collect()
}
#[cfg(test)]
@@ -614,7 +742,9 @@ mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::TxnService;
use crate::kv_backend::{KvBackend, TxnService};
use crate::peer::Peer;
use crate::rpc::store::PutRequest;
#[test]
fn test_table_route_compatibility() {
@@ -643,18 +773,18 @@ mod tests {
}
#[tokio::test]
async fn test_table_route_storage_get_raw_empty() {
async fn test_table_route_storage_get_with_raw_bytes_empty() {
let kv = Arc::new(MemoryKvBackend::default());
let table_route_storage = TableRouteStorage::new(kv);
let table_route = table_route_storage.get_raw(1024).await.unwrap();
let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
assert!(table_route.is_none());
}
#[tokio::test]
async fn test_table_route_storage_get_raw() {
async fn test_table_route_storage_get_with_raw_bytes() {
let kv = Arc::new(MemoryKvBackend::default());
let table_route_storage = TableRouteStorage::new(kv.clone());
let table_route = table_route_storage.get_raw(1024).await.unwrap();
let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
assert!(table_route.is_none());
let table_route_manager = TableRouteManager::new(kv.clone());
let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
@@ -667,7 +797,7 @@ mod tests {
.unwrap();
let r = kv.txn(txn).await.unwrap();
assert!(r.succeeded);
let table_route = table_route_storage.get_raw(1024).await.unwrap();
let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
assert!(table_route.is_some());
let got = table_route.unwrap().inner;
assert_eq!(got, table_route_value);
@@ -718,4 +848,61 @@ mod tests {
assert!(results[2].is_none());
assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
}
#[tokio::test]
async fn remap_route_address_updates_addresses() {
let kv = Arc::new(MemoryKvBackend::default());
let table_route_storage = TableRouteStorage::new(kv.clone());
let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
region_routes: vec![RegionRoute {
leader_peer: Some(Peer {
id: 1,
..Default::default()
}),
follower_peers: vec![Peer {
id: 2,
..Default::default()
}],
..Default::default()
}],
version: 0,
});
kv.put(PutRequest {
key: NodeAddressKey::with_datanode(1).to_bytes(),
value: NodeAddressValue {
peer: Peer {
addr: "addr1".to_string(),
..Default::default()
},
}
.try_as_raw_value()
.unwrap(),
..Default::default()
})
.await
.unwrap();
table_route_storage
.remap_route_address(&mut table_route)
.await
.unwrap();
if let TableRouteValue::Physical(physical_table_route) = table_route {
assert_eq!(
physical_table_route.region_routes[0]
.leader_peer
.as_ref()
.unwrap()
.addr,
"addr1"
);
assert_eq!(
physical_table_route.region_routes[0].follower_peers[0].addr,
""
);
} else {
panic!("Expected PhysicalTableRouteValue");
}
}
}

View File

@@ -192,7 +192,7 @@ impl HeartbeatTask {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
let quit_signal = Arc::new(tokio::sync::Notify::new());
let quit_signal = Arc::new(Notify::new());
let mut tx = Self::create_streams(
&meta_client,

View File

@@ -655,13 +655,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid heartbeat request: {}", err_msg))]
InvalidHeartbeatRequest {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to publish message"))]
PublishMessage {
#[snafu(source)]
@@ -809,7 +802,6 @@ impl ErrorExt for Error {
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
| Error::InitExportMetricsTask { .. }
| Error::InvalidHeartbeatRequest { .. }
| Error::ProcedureNotFound { .. }
| Error::TooManyPartitions { .. }
| Error::TomlFormat { .. } => StatusCode::InvalidArguments,

View File

@@ -15,8 +15,12 @@
use std::cmp::Ordering;
use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::instruction::CacheIdent;
use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::peer::Peer;
use common_meta::rpc::store::PutRequest;
use common_telemetry::warn;
use common_telemetry::{error, warn};
use dashmap::DashMap;
use snafu::ResultExt;
@@ -120,6 +124,13 @@ impl HeartbeatHandler for CollectStatsHandler {
true
};
// Need to refresh the [datanode -> address] mapping
if refresh {
// Safety: `epoch_stats.stats` is not empty
let last = epoch_stats.stats.last().unwrap();
rewrite_node_address(ctx, last).await;
}
if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY {
return Ok(HandleControl::Continue);
}
@@ -131,7 +142,7 @@ impl HeartbeatHandler for CollectStatsHandler {
let put = PutRequest {
key,
value,
..Default::default()
prev_kv: false,
};
let _ = ctx
@@ -144,6 +155,44 @@ impl HeartbeatHandler for CollectStatsHandler {
}
}
async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
let peer = Peer {
id: stat.id,
addr: stat.addr.clone(),
};
let key = NodeAddressKey::with_datanode(peer.id).to_bytes();
if let Ok(value) = NodeAddressValue::new(peer.clone()).try_as_raw_value() {
let put = PutRequest {
key,
value,
prev_kv: false,
};
match ctx.leader_cached_kv_backend.put(put).await {
Ok(_) => {
// broadcast invalidating cache
let cache_idents = stat
.table_ids()
.into_iter()
.map(CacheIdent::TableId)
.collect::<Vec<_>>();
if let Err(e) = ctx
.cache_invalidator
.invalidate(&Default::default(), &cache_idents)
.await
{
error!(e; "Failed to invalidate {} `NodeAddressKey` cache, peer: {:?}", cache_idents.len(), peer);
}
}
Err(e) => {
error!(e; "Failed to update NodeAddressValue: {:?}", peer);
}
}
} else {
warn!("Failed to serialize NodeAddressValue: {:?}", peer);
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use common_telemetry::warn;
use common_telemetry::{info, warn};
use super::node_stat::Stat;
use crate::error::Result;
@@ -40,12 +40,15 @@ impl HeartbeatHandler for ExtractStatHandler {
return Ok(HandleControl::Continue);
}
match Stat::try_from(req.clone()) {
match Stat::try_from(req) {
Ok(stat) => {
let _ = acc.stat.insert(stat);
}
Err(err) => {
warn!(err; "Incomplete heartbeat data: {:?}", req);
Err(Some(header)) => {
info!("New handshake request: {:?}", header);
}
Err(_) => {
warn!("Incomplete heartbeat data: {:?}", req);
}
};

View File

@@ -14,14 +14,14 @@
use std::collections::{HashMap, HashSet};
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::{HeartbeatRequest, RequestHeader};
use common_meta::ClusterId;
use common_time::util as time_util;
use serde::{Deserialize, Serialize};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::error::{Error, InvalidHeartbeatRequestSnafu};
use crate::key::DatanodeStatKey;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
@@ -79,6 +79,11 @@ impl Stat {
self.region_stats.iter().map(|s| (s.id, s.role)).collect()
}
/// Returns all table ids in the region stats.
pub fn table_ids(&self) -> HashSet<TableId> {
self.region_stats.iter().map(|s| s.id.table_id()).collect()
}
pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
if inactive_region_ids.is_empty() {
return;
@@ -92,10 +97,10 @@ impl Stat {
}
}
impl TryFrom<HeartbeatRequest> for Stat {
type Error = Error;
impl TryFrom<&HeartbeatRequest> for Stat {
type Error = Option<RequestHeader>;
fn try_from(value: HeartbeatRequest) -> Result<Self, Self::Error> {
fn try_from(value: &HeartbeatRequest) -> Result<Self, Self::Error> {
let HeartbeatRequest {
header,
peer,
@@ -107,9 +112,9 @@ impl TryFrom<HeartbeatRequest> for Stat {
match (header, peer) {
(Some(header), Some(peer)) => {
let region_stats = region_stats
.into_iter()
.map(RegionStat::try_from)
.collect::<Result<Vec<_>, _>>()?;
.iter()
.map(RegionStat::from)
.collect::<Vec<_>>();
Ok(Self {
timestamp_millis: time_util::current_time_millis(),
@@ -117,35 +122,30 @@ impl TryFrom<HeartbeatRequest> for Stat {
// datanode id
id: peer.id,
// datanode address
addr: peer.addr,
addr: peer.addr.clone(),
rcus: region_stats.iter().map(|s| s.rcus).sum(),
wcus: region_stats.iter().map(|s| s.wcus).sum(),
region_num: region_stats.len() as u64,
region_stats,
node_epoch,
node_epoch: *node_epoch,
})
}
_ => InvalidHeartbeatRequestSnafu {
err_msg: "missing header or peer",
}
.fail(),
(header, _) => Err(header.clone()),
}
}
}
impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
type Error = Error;
fn try_from(value: api::v1::meta::RegionStat) -> Result<Self, Self::Error> {
Ok(Self {
impl From<&api::v1::meta::RegionStat> for RegionStat {
fn from(value: &api::v1::meta::RegionStat) -> Self {
Self {
id: RegionId::from_u64(value.region_id),
rcus: value.rcus,
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,
engine: value.engine.to_string(),
role: RegionRole::from(value.role()),
extensions: value.extensions,
})
extensions: value.extensions.clone(),
}
}
}

View File

@@ -232,7 +232,7 @@ impl Context {
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.get_with_raw_bytes(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)

View File

@@ -146,7 +146,7 @@ mod tests {
let original_table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();

View File

@@ -140,7 +140,7 @@ mod tests {
let old_table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();

View File

@@ -337,7 +337,7 @@ mod tests {
let original_table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();