mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
fix(mito): Do not write to memtables if writing wal is failed (#2561)
* feat: add writes total metrics * fix: don't write memtable if write ctx is failed * feat: write rows metrics
This commit is contained in:
@@ -44,4 +44,6 @@ pub const WRITE_REJECT_TOTAL: &str = "mito.write.reject_total";
|
||||
pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed";
|
||||
/// Stage label.
|
||||
pub const STAGE_LABEL: &str = "stage";
|
||||
/// Counter of rows to write.
|
||||
pub const WRITE_ROWS_TOTAL: &str = "mito.write.rows_total";
|
||||
// ------ End of write related metrics
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::{Mutation, Rows, WalEntry};
|
||||
use api::v1::{Mutation, OpType, Rows, WalEntry};
|
||||
use common_query::Output;
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
@@ -92,6 +92,14 @@ pub(crate) struct RegionWriteCtx {
|
||||
///
|
||||
/// The i-th notify is for i-th mutation.
|
||||
notifiers: Vec<WriteNotify>,
|
||||
/// The write operation is failed and we should not write to the mutable memtable.
|
||||
failed: bool,
|
||||
|
||||
// Metrics:
|
||||
/// Rows to put.
|
||||
pub(crate) put_num: usize,
|
||||
/// Rows to delete.
|
||||
pub(crate) delete_num: usize,
|
||||
}
|
||||
|
||||
impl RegionWriteCtx {
|
||||
@@ -112,6 +120,9 @@ impl RegionWriteCtx {
|
||||
next_entry_id: last_entry_id + 1,
|
||||
wal_entry: WalEntry::default(),
|
||||
notifiers: Vec::new(),
|
||||
failed: false,
|
||||
put_num: 0,
|
||||
delete_num: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,6 +141,13 @@ impl RegionWriteCtx {
|
||||
|
||||
// Increase sequence number.
|
||||
self.next_sequence += num_rows as u64;
|
||||
|
||||
// Update metrics.
|
||||
match OpType::from_i32(op_type) {
|
||||
Some(OpType::Delete) => self.delete_num += num_rows,
|
||||
Some(OpType::Put) => self.put_num += num_rows,
|
||||
None => (),
|
||||
}
|
||||
}
|
||||
|
||||
/// Encode and add WAL entry to the writer.
|
||||
@@ -153,6 +171,9 @@ impl RegionWriteCtx {
|
||||
for notify in &mut self.notifiers {
|
||||
notify.err = Some(err.clone());
|
||||
}
|
||||
|
||||
// Fail the whole write operation.
|
||||
self.failed = true;
|
||||
}
|
||||
|
||||
/// Updates next entry id.
|
||||
@@ -164,6 +185,10 @@ impl RegionWriteCtx {
|
||||
pub(crate) fn write_memtable(&mut self) {
|
||||
debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());
|
||||
|
||||
if self.failed {
|
||||
return;
|
||||
}
|
||||
|
||||
let mutable = &self.version.memtables.mutable;
|
||||
// Takes mutations from the wal entry.
|
||||
let mutations = mem::take(&mut self.wal_entry.mutations);
|
||||
|
||||
@@ -24,7 +24,10 @@ use store_api::metadata::RegionMetadata;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::{RejectWriteSnafu, Result};
|
||||
use crate::metrics::{STAGE_LABEL, WRITE_REJECT_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL};
|
||||
use crate::metrics::{
|
||||
STAGE_LABEL, TYPE_LABEL, WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED,
|
||||
WRITE_STALL_TOTAL,
|
||||
};
|
||||
use crate::region_write_ctx::RegionWriteCtx;
|
||||
use crate::request::{SenderWriteRequest, WriteRequest};
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
@@ -80,13 +83,19 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
}
|
||||
}
|
||||
|
||||
let (mut put_rows, mut delete_rows) = (0, 0);
|
||||
// Write to memtables.
|
||||
{
|
||||
let _timer = timer!(WRITE_STAGE_ELAPSED, &[(STAGE_LABEL, "write_memtable")]);
|
||||
for mut region_ctx in region_ctxs.into_values() {
|
||||
region_ctx.write_memtable();
|
||||
put_rows += region_ctx.put_num;
|
||||
delete_rows += region_ctx.delete_num;
|
||||
}
|
||||
}
|
||||
|
||||
counter!(WRITE_ROWS_TOTAL, put_rows as u64, TYPE_LABEL => "put");
|
||||
counter!(WRITE_ROWS_TOTAL, delete_rows as u64, TYPE_LABEL => "delete");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,6 @@ use crate::storage::{ColumnId, RegionId, ScanRequest};
|
||||
|
||||
#[derive(Debug, IntoStaticStr)]
|
||||
pub enum RegionRequest {
|
||||
// TODO: rename to InsertRequest
|
||||
Put(RegionPutRequest),
|
||||
Delete(RegionDeleteRequest),
|
||||
Create(RegionCreateRequest),
|
||||
|
||||
Reference in New Issue
Block a user