diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index bd5920b3cb..855ba9ddf6 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -44,4 +44,6 @@ pub const WRITE_REJECT_TOTAL: &str = "mito.write.reject_total"; pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed"; /// Stage label. pub const STAGE_LABEL: &str = "stage"; +/// Counter of rows to write. +pub const WRITE_ROWS_TOTAL: &str = "mito.write.rows_total"; // ------ End of write related metrics diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 8e83a6442a..5270ceec32 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -15,7 +15,7 @@ use std::mem; use std::sync::Arc; -use api::v1::{Mutation, Rows, WalEntry}; +use api::v1::{Mutation, OpType, Rows, WalEntry}; use common_query::Output; use snafu::ResultExt; use store_api::logstore::LogStore; @@ -92,6 +92,14 @@ pub(crate) struct RegionWriteCtx { /// /// The i-th notify is for i-th mutation. notifiers: Vec, + /// The write operation is failed and we should not write to the mutable memtable. + failed: bool, + + // Metrics: + /// Rows to put. + pub(crate) put_num: usize, + /// Rows to delete. + pub(crate) delete_num: usize, } impl RegionWriteCtx { @@ -112,6 +120,9 @@ impl RegionWriteCtx { next_entry_id: last_entry_id + 1, wal_entry: WalEntry::default(), notifiers: Vec::new(), + failed: false, + put_num: 0, + delete_num: 0, } } @@ -130,6 +141,13 @@ impl RegionWriteCtx { // Increase sequence number. self.next_sequence += num_rows as u64; + + // Update metrics. + match OpType::from_i32(op_type) { + Some(OpType::Delete) => self.delete_num += num_rows, + Some(OpType::Put) => self.put_num += num_rows, + None => (), + } } /// Encode and add WAL entry to the writer. @@ -153,6 +171,9 @@ impl RegionWriteCtx { for notify in &mut self.notifiers { notify.err = Some(err.clone()); } + + // Fail the whole write operation. + self.failed = true; } /// Updates next entry id. @@ -164,6 +185,10 @@ impl RegionWriteCtx { pub(crate) fn write_memtable(&mut self) { debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len()); + if self.failed { + return; + } + let mutable = &self.version.memtables.mutable; // Takes mutations from the wal entry. let mutations = mem::take(&mut self.wal_entry.mutations); diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index de6b6a5c5e..873ef726fb 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -24,7 +24,10 @@ use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; use crate::error::{RejectWriteSnafu, Result}; -use crate::metrics::{STAGE_LABEL, WRITE_REJECT_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL}; +use crate::metrics::{ + STAGE_LABEL, TYPE_LABEL, WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, + WRITE_STALL_TOTAL, +}; use crate::region_write_ctx::RegionWriteCtx; use crate::request::{SenderWriteRequest, WriteRequest}; use crate::worker::RegionWorkerLoop; @@ -80,13 +83,19 @@ impl RegionWorkerLoop { } } + let (mut put_rows, mut delete_rows) = (0, 0); // Write to memtables. { let _timer = timer!(WRITE_STAGE_ELAPSED, &[(STAGE_LABEL, "write_memtable")]); for mut region_ctx in region_ctxs.into_values() { region_ctx.write_memtable(); + put_rows += region_ctx.put_num; + delete_rows += region_ctx.delete_num; } } + + counter!(WRITE_ROWS_TOTAL, put_rows as u64, TYPE_LABEL => "put"); + counter!(WRITE_ROWS_TOTAL, delete_rows as u64, TYPE_LABEL => "delete"); } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 9d608f2077..04251b81ae 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -30,7 +30,6 @@ use crate::storage::{ColumnId, RegionId, ScanRequest}; #[derive(Debug, IntoStaticStr)] pub enum RegionRequest { - // TODO: rename to InsertRequest Put(RegionPutRequest), Delete(RegionDeleteRequest), Create(RegionCreateRequest),