diff --git a/src/datanode/src/metrics.rs b/src/datanode/src/metrics.rs index d11e8af9fe..12ac482826 100644 --- a/src/datanode/src/metrics.rs +++ b/src/datanode/src/metrics.rs @@ -20,13 +20,21 @@ pub const REGION_REQUEST_TYPE: &str = "datanode_region_request_type"; pub const REGION_ROLE: &str = "region_role"; pub const REGION_ID: &str = "region_id"; +pub const RESULT_TYPE: &str = "result"; lazy_static! { /// The elapsed time of handling a request in the region_server. pub static ref HANDLE_REGION_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_datanode_handle_region_request_elapsed", "datanode handle region request elapsed", - &[REGION_REQUEST_TYPE] + &[REGION_ID, REGION_REQUEST_TYPE] + ) + .unwrap(); + /// The number of rows in region request received by region server, labeled with request type. + pub static ref REGION_CHANGED_ROW_COUNT: IntCounterVec = register_int_counter_vec!( + "greptime_datanode_region_changed_row_count", + "datanode region changed row count", + &[REGION_ID, REGION_REQUEST_TYPE] ) .unwrap(); /// The elapsed time since the last received heartbeat. @@ -64,7 +72,7 @@ lazy_static! { pub static ref HEARTBEAT_RECV_COUNT: IntCounterVec = register_int_counter_vec!( "greptime_datanode_heartbeat_recv_count", "datanode heartbeat received", - &["result"] + &[RESULT_TYPE] ) .unwrap(); } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 14ccfa2816..bff28c109b 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -690,18 +690,20 @@ impl RegionServerInner { }, None => return Ok(CurrentEngine::EarlyReturn(0)), }, - RegionChange::None | RegionChange::Catchup => match current_region_status { - Some(status) => match status.clone() { - RegionEngineWithStatus::Registering(_) => { - return error::RegionNotReadySnafu { region_id }.fail() - } - RegionEngineWithStatus::Deregistering(_) => { - return error::RegionNotFoundSnafu { region_id }.fail() - } - RegionEngineWithStatus::Ready(engine) => engine, - }, - None => return error::RegionNotFoundSnafu { region_id }.fail(), - }, + RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => { + match current_region_status { + Some(status) => match status.clone() { + RegionEngineWithStatus::Registering(_) => { + return error::RegionNotReadySnafu { region_id }.fail() + } + RegionEngineWithStatus::Deregistering(_) => { + return error::RegionNotFoundSnafu { region_id }.fail() + } + RegionEngineWithStatus::Ready(engine) => engine, + }, + None => return error::RegionNotFoundSnafu { region_id }.fail(), + } + } }; Ok(CurrentEngine::Engine(engine)) @@ -885,8 +887,9 @@ impl RegionServerInner { request: RegionRequest, ) -> Result { let request_type = request.request_type(); + let region_id_str = region_id.to_string(); let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED - .with_label_values(&[request_type]) + .with_label_values(&[®ion_id_str, request_type]) .start_timer(); let region_change = match &request { @@ -899,9 +902,8 @@ impl RegionServerInner { RegionChange::Register(attribute) } RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters, - RegionRequest::Put(_) - | RegionRequest::Delete(_) - | RegionRequest::Alter(_) + RegionRequest::Put(_) | RegionRequest::Delete(_) => RegionChange::Ingest, + RegionRequest::Alter(_) | RegionRequest::Flush(_) | RegionRequest::Compact(_) | RegionRequest::Truncate(_) => RegionChange::None, @@ -922,6 +924,12 @@ impl RegionServerInner { .with_context(|_| HandleRegionRequestSnafu { region_id }) { Ok(result) => { + // Update metrics + if matches!(region_change, RegionChange::Ingest) { + crate::metrics::REGION_CHANGED_ROW_COUNT + .with_label_values(&[®ion_id_str, request_type]) + .inc_by(result.affected_rows as u64); + } // Sets corresponding region status to ready. self.set_region_status_ready(region_id, engine, region_change) .await?; @@ -968,7 +976,7 @@ impl RegionServerInner { region_change: RegionChange, ) { match region_change { - RegionChange::None => {} + RegionChange::None | RegionChange::Ingest => {} RegionChange::Register(_) => { self.region_map.remove(®ion_id); } @@ -988,7 +996,7 @@ impl RegionServerInner { ) -> Result<()> { let engine_type = engine.name(); match region_change { - RegionChange::None => {} + RegionChange::None | RegionChange::Ingest => {} RegionChange::Register(attribute) => { info!( "Region {region_id} is registered to engine {}", @@ -1129,6 +1137,7 @@ enum RegionChange { Register(RegionAttribute), Deregisters, Catchup, + Ingest, } fn is_metric_engine(engine: &str) -> bool {