From aa9a2f2db5e5b5769f8b34a096be145e9b0ed5d4 Mon Sep 17 00:00:00 2001 From: niebayes Date: Sun, 24 Dec 2023 00:24:16 +0800 Subject: [PATCH] refactor: entry id usage --- src/log-store/src/noop.rs | 5 ++- src/log-store/src/raft_engine/log_store.rs | 16 ++++++--- src/mito2/src/error.rs | 9 ++++- src/mito2/src/region_write_ctx.rs | 1 + src/mito2/src/wal.rs | 6 ++-- src/mito2/src/worker/handle_write.rs | 38 ++++++++++++++++++---- 6 files changed, 55 insertions(+), 20 deletions(-) diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 1929e59a23..694641156f 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -66,14 +66,13 @@ impl LogStore for NoopLogStore { async fn append(&self, mut _e: Self::Entry) -> Result { Ok(AppendResponse { - entry_id: 0, - offset: None, + last_entry_id: Default::default(), }) } async fn append_batch(&self, _e: Vec) -> Result { Ok(AppendBatchResponse { - offsets: HashMap::new(), + last_entry_ids: HashMap::new(), }) } diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index eb14bf0cf9..7588a63096 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -179,8 +180,7 @@ impl LogStore for RaftEngineLogStore { .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; Ok(AppendResponse { - entry_id, - offset: None, + last_entry_id: entry_id, }) } @@ -192,11 +192,18 @@ impl LogStore for RaftEngineLogStore { return Ok(AppendBatchResponse::default()); } + // Records the last entry id for each region's entries. + let mut last_entry_ids = HashMap::with_capacity(entries.len()); let mut batch = LogBatch::with_capacity(entries.len()); for e in entries { self.check_entry(&e)?; + // For raft-engine log store, the namespace id is the region id. let ns_id = e.namespace_id; + last_entry_ids + .entry(ns_id) + .and_modify(|x: &mut u64| *x = (*x).max(e.id)) + .or_insert(e.id); batch .add_entries::(ns_id, &[e]) .context(AddEntryLogBatchSnafu)?; @@ -207,8 +214,7 @@ impl LogStore for RaftEngineLogStore { .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; - // The user of raft-engine log store does not care about the response. - Ok(AppendBatchResponse::default()) + Ok(AppendBatchResponse { last_entry_ids }) } /// Create a stream of entries from logstore in the given namespace. The end of stream is @@ -452,7 +458,7 @@ mod tests { )) .await .unwrap(); - assert_eq!(i, response.entry_id); + assert_eq!(i, response.last_entry_id); } let mut entries = HashSet::with_capacity(1024); let mut s = logstore.read(&Namespace::with_id(1), 0).await.unwrap(); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b1d48f8c65..3e223d88e8 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -416,6 +416,12 @@ pub enum Error { error: ArrowError, location: Location, }, + + #[snafu(display("Missing the last entry id for region {}", region_id))] + MissingLastEntryId { + region_id: RegionId, + location: Location, + }, } pub type Result = std::result::Result; @@ -453,7 +459,8 @@ impl ErrorExt for Error { | RegionCorrupted { .. } | CreateDefault { .. } | InvalidParquet { .. } - | UnexpectedReplay { .. } => StatusCode::Unexpected, + | UnexpectedReplay { .. } + | MissingLastEntryId { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } | InvalidScanIndex { .. } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 8a6decefb4..82a7117f49 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -168,6 +168,7 @@ impl RegionWriteCtx { &self.wal_options, )?; // We only call this method one time, but we still bump next entry id for consistency. + // Warning: this update will be ignored since the `set_next_entry_id` is called by the writer explicitly. self.next_entry_id += 1; Ok(()) } diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 3bbfefe96b..ac17d3df54 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -27,7 +27,7 @@ use futures::StreamExt; use prost::Message; use snafu::ResultExt; use store_api::logstore::entry::Entry; -use store_api::logstore::LogStore; +use store_api::logstore::{AppendBatchResponse, LogStore}; use store_api::storage::RegionId; use crate::error::{ @@ -165,8 +165,7 @@ impl WalWriter { } /// Write all buffered entries to the WAL. - // TODO(niebayes): returns an `AppendBatchResponse` and handle it properly. - pub async fn write_to_wal(&mut self) -> Result<()> { + pub async fn write_to_wal(&mut self) -> Result { // TODO(yingwen): metrics. let entries = mem::take(&mut self.entries); @@ -175,7 +174,6 @@ impl WalWriter { .await .map_err(BoxedError::new) .context(WriteWalSnafu) - .map(|_| ()) } } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 97a481d7d4..f1ac4af11a 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -17,11 +17,11 @@ use std::collections::{hash_map, HashMap}; use std::sync::Arc; -use store_api::logstore::LogStore; +use store_api::logstore::{AppendBatchResponse, LogStore}; use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; -use crate::error::{RejectWriteSnafu, Result}; +use crate::error::{MissingLastEntryIdSnafu, RejectWriteSnafu, Result}; use crate::metrics::{ WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL, }; @@ -73,12 +73,17 @@ impl RegionWorkerLoop { region_ctx.set_error(e); } } - if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) { - // Failed to write wal. - for mut region_ctx in region_ctxs.into_values() { - region_ctx.set_error(e.clone()); + match wal_writer.write_to_wal().await.map_err(Arc::new) { + Ok(response) => { + update_next_entry_ids(&mut region_ctxs, &response); + } + Err(e) => { + // Failed to write wal. + for mut region_ctx in region_ctxs.into_values() { + region_ctx.set_error(e.clone()); + } + return; } - return; } } @@ -202,3 +207,22 @@ fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetad Ok(()) } + +/// Updates the next entry id for each region with the returned last entry id contained in the response. +fn update_next_entry_ids( + region_ctxs: &mut HashMap, + response: &AppendBatchResponse, +) { + for (region_id, region_ctx) in region_ctxs.iter_mut() { + // Missing a last entry id is not deemed as a critical error and hence no need to abort writing to memtable. + let Some(last_entry_id) = response.last_entry_ids.get(®ion_id.as_u64()) else { + let e = MissingLastEntryIdSnafu { + region_id: *region_id, + } + .build(); + region_ctx.set_error(Arc::new(e)); + continue; + }; + region_ctx.set_next_entry_id(last_entry_id + 1); + } +}