feat: datanode supports reporting the number of regions to meta (#838)

* feat: dn supports reporting the number of regions to meta

* put the heartbeat batch into the store

* cr: change region_number's parameter to &CatalogManagerRef

* cr: when the dn fails to get the region number, report region_num = -1 to meta
fys
2023-01-09 16:13:53 +08:00
committed by GitHub
parent 2679faf911
commit 9e58311ecd
10 changed files with 332 additions and 32 deletions


@@ -34,13 +34,13 @@ message HeartbeatRequest {
 message NodeStat {
   // The read capacity units during this period
-  uint64 rcus = 1;
+  int64 rcus = 1;
   // The write capacity units during this period
-  uint64 wcus = 2;
+  int64 wcus = 2;
   // How many tables on this node
-  uint64 table_num = 3;
+  int64 table_num = 3;
   // How many regions on this node
-  uint64 region_num = 4;
+  int64 region_num = 4;
   double cpu_usage = 5;
   double load = 6;
@@ -57,13 +57,13 @@ message RegionStat {
   uint64 region_id = 1;
   TableName table_name = 2;
   // The read capacity units during this period
-  uint64 rcus = 3;
+  int64 rcus = 3;
   // The write capacity units during this period
-  uint64 wcus = 4;
+  int64 wcus = 4;
   // Approximate bytes of this region
-  uint64 approximate_bytes = 5;
+  int64 approximate_bytes = 5;
   // Approximate number of rows in this region
-  uint64 approximate_rows = 6;
+  int64 approximate_rows = 6;
   // Others
   map<string, string> attrs = 100;
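
Switching these stat fields from uint64 to int64 is what makes the sentinel in the commit message possible: a datanode that fails to count its regions reports region_num = -1 rather than a bogus count. A minimal, self-contained sketch of that convention (count_regions is a hypothetical stand-in for the datanode's real counter, here hard-wired to fail):

// Sketch of the signed-sentinel convention enabled by the uint64 -> int64 change.
// `count_regions` is a hypothetical stand-in, not the real datanode API.
fn count_regions() -> Result<u64, String> {
    Err("catalog unavailable".to_string()) // simulate a counting failure
}

fn region_num_to_report() -> i64 {
    match count_regions() {
        Ok(n) => n as i64, // normal case: real region counts fit easily in i64
        Err(_) => -1,      // sentinel: the count was unavailable this period
    }
}

fn main() {
    assert_eq!(-1, region_num_to_report());
}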


@@ -19,7 +19,7 @@ use std::fmt::{Debug, Formatter};
 use std::sync::Arc;

 use common_telemetry::info;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use table::engine::{EngineContext, TableEngineRef};
 use table::metadata::TableId;
 use table::requests::CreateTableRequest;
@@ -208,3 +208,38 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
     }
     Ok(())
 }
+
+/// The number of regions in the datanode.
+pub fn region_number(catalog_manager: &CatalogManagerRef) -> Result<u64> {
+    let mut region_number: u64 = 0;
+    for catalog_name in catalog_manager.catalog_names()? {
+        let catalog =
+            catalog_manager
+                .catalog(&catalog_name)?
+                .context(error::CatalogNotFoundSnafu {
+                    catalog_name: &catalog_name,
+                })?;
+        for schema_name in catalog.schema_names()? {
+            let schema = catalog
+                .schema(&schema_name)?
+                .context(error::SchemaNotFoundSnafu {
+                    catalog: &catalog_name,
+                    schema: &schema_name,
+                })?;
+            for table_name in schema.table_names()? {
+                let table = schema
+                    .table(&table_name)?
+                    .context(error::TableNotFoundSnafu {
+                        table_info: &table_name,
+                    })?;
+                let region_numbers = &table.table_info().meta.region_numbers;
+                region_number += region_numbers.len() as u64;
+            }
+        }
+    }
+    Ok(region_number)
+}
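
The new region_number above is a plain triple traversal: for each catalog, each of its schemas, and each schema's tables, sum the length of the table's region_numbers. A self-contained sketch of the same shape over nested maps (the type aliases are hypothetical stand-ins for the CatalogManager/Catalog/Schema traits, with the error handling elided):

use std::collections::HashMap;

type Regions = Vec<u32>;                        // region numbers of one table
type Schema = HashMap<String, Regions>;         // table name -> regions
type Catalog = HashMap<String, Schema>;         // schema name -> schema
type CatalogManager = HashMap<String, Catalog>; // catalog name -> catalog

fn region_number(manager: &CatalogManager) -> u64 {
    manager
        .values()
        .flat_map(|catalog| catalog.values()) // every schema in every catalog
        .flat_map(|schema| schema.values())   // every table in every schema
        .map(|regions| regions.len() as u64)  // regions per table
        .sum()
}

fn main() {
    let mut schema = Schema::new();
    schema.insert("metrics".to_string(), vec![0, 1, 2]); // a table with 3 regions
    let mut catalog = Catalog::new();
    catalog.insert("public".to_string(), schema);
    let mut manager = CatalogManager::new();
    manager.insert("greptime".to_string(), catalog);

    assert_eq!(3, region_number(&manager));
}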


@@ -16,19 +16,20 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;

-use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, Peer};
+use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, NodeStat, Peer};
+use catalog::{region_number, CatalogManagerRef};
 use common_telemetry::{error, info, warn};
 use meta_client::client::{HeartbeatSender, MetaClient};
 use snafu::ResultExt;

 use crate::error::{MetaClientInitSnafu, Result};

-#[derive(Debug, Clone, Default)]
 pub struct HeartbeatTask {
     node_id: u64,
     server_addr: String,
     running: Arc<AtomicBool>,
     meta_client: Arc<MetaClient>,
+    catalog_manager: CatalogManagerRef,
     interval: u64,
 }
@@ -40,12 +41,18 @@ impl Drop for HeartbeatTask {
 impl HeartbeatTask {
     /// Create a new heartbeat task instance.
-    pub fn new(node_id: u64, server_addr: String, meta_client: Arc<MetaClient>) -> Self {
+    pub fn new(
+        node_id: u64,
+        server_addr: String,
+        meta_client: Arc<MetaClient>,
+        catalog_manager: CatalogManagerRef,
+    ) -> Self {
         Self {
             node_id,
             server_addr,
             running: Arc::new(AtomicBool::new(false)),
             meta_client,
+            catalog_manager,
             interval: 5_000, // default interval is set to 5 secs
         }
     }
@@ -92,16 +99,30 @@ impl HeartbeatTask {
         let server_addr = self.server_addr.clone();
         let meta_client = self.meta_client.clone();
+        let catalog_manager_clone = self.catalog_manager.clone();

         let mut tx = Self::create_streams(&meta_client, running.clone()).await?;

         common_runtime::spawn_bg(async move {
             while running.load(Ordering::Acquire) {
+                let region_num = match region_number(&catalog_manager_clone) {
+                    Ok(region_num) => region_num as i64,
+                    Err(e) => {
+                        error!("failed to get region number, err: {e:?}");
+                        -1
+                    }
+                };
                 let req = HeartbeatRequest {
                     peer: Some(Peer {
                         id: node_id,
                         addr: server_addr.clone(),
                     }),
+                    node_stat: Some(NodeStat {
+                        region_num,
+                        ..Default::default()
+                    }),
                     ..Default::default()
                 };
                 if let Err(e) = tx.send(req).await {
                     error!("Failed to send heartbeat to metasrv, error: {:?}", e);
                     match Self::create_streams(&meta_client, running.clone()).await {

@@ -146,6 +146,7 @@ impl Instance {
                 opts.node_id.context(MissingNodeIdSnafu)?,
                 opts.rpc_addr.clone(),
                 meta_client.as_ref().unwrap().clone(),
+                catalog_manager.clone(),
             )),
         };
         Ok(Self {


@@ -71,6 +71,7 @@ impl Instance {
             opts.node_id.unwrap_or(42),
             opts.rpc_addr.clone(),
             meta_client.clone(),
+            catalog_manager.clone(),
         );
         Ok(Self {
             query_engine: query_engine.clone(),


@@ -22,7 +22,7 @@ mod check_leader_handler;
 mod collect_stats_handler;
 mod instruction;
 mod keep_lease_handler;
-mod node_stat;
+pub(crate) mod node_stat;
 mod persist_stats_handler;
 mod response_header_handler;


@@ -14,8 +14,11 @@

 use api::v1::meta::HeartbeatRequest;
 use common_time::util as time_util;
+use serde::{Deserialize, Serialize};
+
+use crate::keys::StatKey;

-#[derive(Debug)]
+#[derive(Debug, Default, Serialize, Deserialize)]
 pub struct Stat {
     pub timestamp_millis: i64,
     pub cluster_id: u64,
@@ -24,13 +27,13 @@ pub struct Stat {
     /// Leader node
     pub is_leader: bool,
     /// The read capacity units during this period
-    pub rcus: u64,
+    pub rcus: i64,
     /// The write capacity units during this period
-    pub wcus: u64,
+    pub wcus: i64,
     /// How many tables on this node
-    pub table_num: u64,
+    pub table_num: i64,
     /// How many regions on this node
-    pub region_num: u64,
+    pub region_num: i64,
     pub cpu_usage: f64,
     pub load: f64,
     /// Read disk IO on this node
@@ -41,20 +44,29 @@ pub struct Stat {
     pub region_stats: Vec<RegionStat>,
 }

-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct RegionStat {
     pub id: u64,
     pub catalog: String,
     pub schema: String,
     pub table: String,
     /// The read capacity units during this period
-    pub rcus: u64,
+    pub rcus: i64,
     /// The write capacity units during this period
-    pub wcus: u64,
+    pub wcus: i64,
     /// Approximate bytes of this region
-    pub approximate_bytes: u64,
+    pub approximate_bytes: i64,
     /// Approximate number of rows in this region
-    pub approximate_rows: u64,
+    pub approximate_rows: i64,
 }

+impl Stat {
+    pub fn stat_key(&self) -> StatKey {
+        StatKey {
+            cluster_id: self.cluster_id,
+            node_id: self.id,
+        }
+    }
+}
+
 impl TryFrom<&HeartbeatRequest> for Stat {
@@ -107,3 +119,23 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::handler::node_stat::Stat;
+
+    #[test]
+    fn test_stat_key() {
+        let stat = Stat {
+            cluster_id: 3,
+            id: 101,
+            region_num: 10,
+            ..Default::default()
+        };
+
+        let stat_key = stat.stat_key();
+        assert_eq!(3, stat_key.cluster_id);
+        assert_eq!(101, stat_key.node_id);
+    }
+}
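
Deriving Serialize/Deserialize on Stat and RegionStat is what lets the metasrv persist heartbeat batches as JSON (via StatValue in keys.rs below). A trimmed sketch of that round trip, using a hypothetical two-field stand-in for the real Stat (requires the serde and serde_json crates):

use serde::{Deserialize, Serialize};

// Hypothetical two-field stand-in for the real Stat; the full struct
// serializes field-by-field in exactly the same way.
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
struct Stat {
    cluster_id: u64,
    region_num: i64,
}

fn main() {
    let stats = vec![Stat { cluster_id: 3, region_num: 10 }];

    // Serialize the whole batch as one JSON array, as StatValue does.
    let json = serde_json::to_string(&stats).unwrap();
    assert_eq!(r#"[{"cluster_id":3,"region_num":10}]"#, json);

    // And read it back.
    let back: Vec<Stat> = serde_json::from_str(&json).unwrap();
    assert_eq!(stats, back);
}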


@@ -12,10 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use api::v1::meta::HeartbeatRequest;
+use api::v1::meta::{HeartbeatRequest, PutRequest};

 use crate::error::Result;
 use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::keys::StatValue;
 use crate::metasrv::Context;

 #[derive(Default)]
@@ -33,8 +34,89 @@ impl HeartbeatHandler for PersistStatsHandler {
             return Ok(());
         }

-        // TODO(jiachun): remove stats from `acc` and persist to store
+        let stats = &mut acc.stats;
+
+        let key = match stats.get(0) {
+            Some(stat) => stat.stat_key(),
+            None => return Ok(()),
+        };
+
+        // take stats from &mut acc.stats, avoid clone of vec
+        let stats = std::mem::take(stats);
+
+        let val = &StatValue { stats };
+
+        let put = PutRequest {
+            key: key.into(),
+            value: val.try_into()?,
+            ..Default::default()
+        };
+
+        ctx.kv_store.put(put).await?;

         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::AtomicBool;
+    use std::sync::Arc;
+
+    use api::v1::meta::RangeRequest;
+
+    use super::*;
+    use crate::handler::node_stat::Stat;
+    use crate::keys::StatKey;
+    use crate::service::store::memory::MemStore;
+
+    #[tokio::test]
+    async fn test_handle_datanode_stats() {
+        let kv_store = Arc::new(MemStore::new());
+        let ctx = Context {
+            datanode_lease_secs: 30,
+            server_addr: "127.0.0.1:0000".to_string(),
+            kv_store,
+            election: None,
+            skip_all: Arc::new(AtomicBool::new(false)),
+        };
+
+        let req = HeartbeatRequest::default();
+        let mut acc = HeartbeatAccumulator {
+            stats: vec![Stat {
+                cluster_id: 3,
+                id: 101,
+                region_num: 100,
+                ..Default::default()
+            }],
+            ..Default::default()
+        };
+
+        let stats_handler = PersistStatsHandler;
+        stats_handler.handle(&req, &ctx, &mut acc).await.unwrap();
+
+        let key = StatKey {
+            cluster_id: 3,
+            node_id: 101,
+        };
+        let req = RangeRequest {
+            key: key.try_into().unwrap(),
+            ..Default::default()
+        };
+        let res = ctx.kv_store.range(req).await.unwrap();
+
+        assert_eq!(1, res.kvs.len());
+        let kv = &res.kvs[0];
+        let key: StatKey = kv.key.clone().try_into().unwrap();
+        assert_eq!(3, key.cluster_id);
+        assert_eq!(101, key.node_id);
+
+        let val: StatValue = kv.value.clone().try_into().unwrap();
+        assert_eq!(1, val.stats.len());
+        assert_eq!(100, val.stats[0].region_num);
+    }
+}
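
The one subtlety in the handler above is the std::mem::take call flagged by its "avoid clone of vec" comment: it moves the accumulated Vec out of the &mut accumulator and leaves an empty Vec behind, so the whole batch is persisted in a single PutRequest without copying. A tiny sketch of the pattern in isolation:

fn main() {
    let mut acc_stats = vec!["stat-1".to_string(), "stat-2".to_string()];

    // Move the batch out; acc_stats is left as Vec::default(), i.e. empty.
    let batch = std::mem::take(&mut acc_stats);

    assert_eq!(2, batch.len());    // the handler now owns the batch
    assert!(acc_stats.is_empty()); // nothing was cloned, nothing left behind
}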


@@ -23,17 +23,22 @@ use snafu::{ensure, OptionExt, ResultExt};

 use crate::error;
 use crate::error::Result;
+use crate::handler::node_stat::Stat;

 pub(crate) const REMOVED_PREFIX: &str = "__removed";
 pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
 pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
 pub(crate) const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";
+pub const DN_STAT_PREFIX: &str = "__meta_dnstat";

 lazy_static! {
-    static ref DATANODE_KEY_PATTERN: Regex =
+    static ref DATANODE_LEASE_KEY_PATTERN: Regex =
         Regex::new(&format!("^{DN_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
+    static ref DATANODE_STAT_KEY_PATTERN: Regex =
+        Regex::new(&format!("^{DN_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
 }

-#[derive(Debug, Clone, Eq, PartialEq)]
+#[derive(Debug, Clone, Eq, Hash, PartialEq)]
 pub struct LeaseKey {
     pub cluster_id: u64,
     pub node_id: u64,
@@ -43,7 +48,7 @@ impl FromStr for LeaseKey {
     type Err = error::Error;

     fn from_str(key: &str) -> Result<Self> {
-        let caps = DATANODE_KEY_PATTERN
+        let caps = DATANODE_LEASE_KEY_PATTERN
             .captures(key)
             .context(error::InvalidLeaseKeySnafu { key })?;
@@ -169,12 +174,135 @@ impl<'a> TableRouteKey<'a> {
     }
 }

+#[derive(Eq, PartialEq, Debug)]
+pub struct StatKey {
+    pub cluster_id: u64,
+    pub node_id: u64,
+}
+
+impl From<StatKey> for Vec<u8> {
+    fn from(value: StatKey) -> Self {
+        format!("{}-{}-{}", DN_STAT_PREFIX, value.cluster_id, value.node_id).into_bytes()
+    }
+}
+
+impl FromStr for StatKey {
+    type Err = error::Error;
+
+    fn from_str(key: &str) -> Result<Self> {
+        let caps = DATANODE_STAT_KEY_PATTERN
+            .captures(key)
+            .context(error::InvalidLeaseKeySnafu { key })?;
+
+        ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key });
+
+        let cluster_id = caps[1].to_string();
+        let node_id = caps[2].to_string();
+        let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu {
+            err_msg: format!("invalid cluster_id: {cluster_id}"),
+        })?;
+        let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
+            err_msg: format!("invalid node_id: {node_id}"),
+        })?;
+
+        Ok(Self {
+            cluster_id,
+            node_id,
+        })
+    }
+}
+
+impl TryFrom<Vec<u8>> for StatKey {
+    type Error = error::Error;
+
+    fn try_from(bytes: Vec<u8>) -> Result<Self> {
+        String::from_utf8(bytes)
+            .context(error::LeaseKeyFromUtf8Snafu {})
+            .map(|x| x.parse())?
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(transparent)]
+pub struct StatValue {
+    pub stats: Vec<Stat>,
+}
+
+impl TryFrom<&StatValue> for Vec<u8> {
+    type Error = error::Error;
+
+    fn try_from(stats: &StatValue) -> Result<Self> {
+        Ok(serde_json::to_string(stats)
+            .context(crate::error::SerializeToJsonSnafu {
+                input: format!("{stats:?}"),
+            })?
+            .into_bytes())
+    }
+}
+
+impl FromStr for StatValue {
+    type Err = error::Error;
+
+    fn from_str(value: &str) -> Result<Self> {
+        serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
+    }
+}
+
+impl TryFrom<Vec<u8>> for StatValue {
+    type Error = error::Error;
+
+    fn try_from(value: Vec<u8>) -> Result<Self> {
+        String::from_utf8(value)
+            .context(error::LeaseKeyFromUtf8Snafu {})
+            .map(|x| x.parse())?
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

     #[test]
-    fn test_datanode_lease_key() {
+    fn test_stat_key_round_trip() {
+        let key = StatKey {
+            cluster_id: 0,
+            node_id: 1,
+        };
+
+        let key_bytes: Vec<u8> = key.try_into().unwrap();
+        let new_key: StatKey = key_bytes.try_into().unwrap();
+
+        assert_eq!(0, new_key.cluster_id);
+        assert_eq!(1, new_key.node_id);
+    }
+
+    #[test]
+    fn test_stat_val_round_trip() {
+        let stat = Stat {
+            cluster_id: 0,
+            id: 101,
+            is_leader: false,
+            region_num: 100,
+            ..Default::default()
+        };
+
+        let stat_val = &StatValue { stats: vec![stat] };
+        let bytes: Vec<u8> = stat_val.try_into().unwrap();
+        let stat_val: StatValue = bytes.try_into().unwrap();
+
+        let stats = stat_val.stats;
+        assert_eq!(1, stats.len());
+
+        let stat = stats.get(0).unwrap();
+        assert_eq!(0, stat.cluster_id);
+        assert_eq!(101, stat.id);
+        assert!(!stat.is_leader);
+        assert_eq!(100, stat.region_num);
+    }
+
+    #[test]
+    fn test_lease_key_round_trip() {
         let key = LeaseKey {
             cluster_id: 0,
             node_id: 1,
@@ -187,7 +315,7 @@ mod tests {
     }

     #[test]
-    fn test_datanode_lease_value() {
+    fn test_lease_value_round_trip() {
         let value = LeaseValue {
             timestamp_millis: 111,
             node_addr: "127.0.0.1:3002".to_string(),


@@ -17,7 +17,7 @@ pub mod bootstrap;
 pub mod election;
 pub mod error;
 pub mod handler;
-mod keys;
+pub mod keys;
 pub mod lease;
 pub mod metasrv;
 #[cfg(feature = "mock")]
@@ -25,6 +25,6 @@ pub mod mocks;
 pub mod selector;
 mod sequence;
 pub mod service;
-mod util;
+pub mod util;

 pub use crate::error::Result;