feat: report per-region metrics on region server (#5893)

* feat: report per-region metrics on region server

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename Change to Ingest

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-04-15 14:45:00 +08:00
committed by GitHub
parent 8d485e9be0
commit 96fbce1797
2 changed files with 37 additions and 20 deletions

View File

@@ -20,13 +20,21 @@ pub const REGION_REQUEST_TYPE: &str = "datanode_region_request_type";
pub const REGION_ROLE: &str = "region_role";
pub const REGION_ID: &str = "region_id";
pub const RESULT_TYPE: &str = "result";
lazy_static! {
/// The elapsed time of handling a request in the region_server.
pub static ref HANDLE_REGION_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_datanode_handle_region_request_elapsed",
"datanode handle region request elapsed",
&[REGION_REQUEST_TYPE]
&[REGION_ID, REGION_REQUEST_TYPE]
)
.unwrap();
/// The number of rows in region request received by region server, labeled with request type.
pub static ref REGION_CHANGED_ROW_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_datanode_region_changed_row_count",
"datanode region changed row count",
&[REGION_ID, REGION_REQUEST_TYPE]
)
.unwrap();
/// The elapsed time since the last received heartbeat.
@@ -64,7 +72,7 @@ lazy_static! {
pub static ref HEARTBEAT_RECV_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_datanode_heartbeat_recv_count",
"datanode heartbeat received",
&["result"]
&[RESULT_TYPE]
)
.unwrap();
}

View File

@@ -690,18 +690,20 @@ impl RegionServerInner {
},
None => return Ok(CurrentEngine::EarlyReturn(0)),
},
RegionChange::None | RegionChange::Catchup => match current_region_status {
Some(status) => match status.clone() {
RegionEngineWithStatus::Registering(_) => {
return error::RegionNotReadySnafu { region_id }.fail()
}
RegionEngineWithStatus::Deregistering(_) => {
return error::RegionNotFoundSnafu { region_id }.fail()
}
RegionEngineWithStatus::Ready(engine) => engine,
},
None => return error::RegionNotFoundSnafu { region_id }.fail(),
},
RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
match current_region_status {
Some(status) => match status.clone() {
RegionEngineWithStatus::Registering(_) => {
return error::RegionNotReadySnafu { region_id }.fail()
}
RegionEngineWithStatus::Deregistering(_) => {
return error::RegionNotFoundSnafu { region_id }.fail()
}
RegionEngineWithStatus::Ready(engine) => engine,
},
None => return error::RegionNotFoundSnafu { region_id }.fail(),
}
}
};
Ok(CurrentEngine::Engine(engine))
@@ -885,8 +887,9 @@ impl RegionServerInner {
request: RegionRequest,
) -> Result<RegionResponse> {
let request_type = request.request_type();
let region_id_str = region_id.to_string();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[request_type])
.with_label_values(&[&region_id_str, request_type])
.start_timer();
let region_change = match &request {
@@ -899,9 +902,8 @@ impl RegionServerInner {
RegionChange::Register(attribute)
}
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
RegionRequest::Put(_)
| RegionRequest::Delete(_)
| RegionRequest::Alter(_)
RegionRequest::Put(_) | RegionRequest::Delete(_) => RegionChange::Ingest,
RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => RegionChange::None,
@@ -922,6 +924,12 @@ impl RegionServerInner {
.with_context(|_| HandleRegionRequestSnafu { region_id })
{
Ok(result) => {
// Update metrics
if matches!(region_change, RegionChange::Ingest) {
crate::metrics::REGION_CHANGED_ROW_COUNT
.with_label_values(&[&region_id_str, request_type])
.inc_by(result.affected_rows as u64);
}
// Sets corresponding region status to ready.
self.set_region_status_ready(region_id, engine, region_change)
.await?;
@@ -968,7 +976,7 @@ impl RegionServerInner {
region_change: RegionChange,
) {
match region_change {
RegionChange::None => {}
RegionChange::None | RegionChange::Ingest => {}
RegionChange::Register(_) => {
self.region_map.remove(&region_id);
}
@@ -988,7 +996,7 @@ impl RegionServerInner {
) -> Result<()> {
let engine_type = engine.name();
match region_change {
RegionChange::None => {}
RegionChange::None | RegionChange::Ingest => {}
RegionChange::Register(attribute) => {
info!(
"Region {region_id} is registered to engine {}",
@@ -1129,6 +1137,7 @@ enum RegionChange {
Register(RegionAttribute),
Deregisters,
Catchup,
Ingest,
}
fn is_metric_engine(engine: &str) -> bool {