refactor: entry id usage

This commit is contained in:
niebayes
2023-12-24 00:24:16 +08:00
parent 60087341a4
commit aa9a2f2db5
6 changed files with 55 additions and 20 deletions

View File

@@ -66,14 +66,13 @@ impl LogStore for NoopLogStore {
async fn append(&self, mut _e: Self::Entry) -> Result<AppendResponse> {
Ok(AppendResponse {
entry_id: 0,
offset: None,
last_entry_id: Default::default(),
})
}
async fn append_batch(&self, _e: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
Ok(AppendBatchResponse {
offsets: HashMap::new(),
last_entry_ids: HashMap::new(),
})
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
@@ -179,8 +180,7 @@ impl LogStore for RaftEngineLogStore {
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;
Ok(AppendResponse {
entry_id,
offset: None,
last_entry_id: entry_id,
})
}
@@ -192,11 +192,18 @@ impl LogStore for RaftEngineLogStore {
return Ok(AppendBatchResponse::default());
}
// Records the last entry id for each region's entries.
let mut last_entry_ids = HashMap::with_capacity(entries.len());
let mut batch = LogBatch::with_capacity(entries.len());
for e in entries {
self.check_entry(&e)?;
// For raft-engine log store, the namespace id is the region id.
let ns_id = e.namespace_id;
last_entry_ids
.entry(ns_id)
.and_modify(|x: &mut u64| *x = (*x).max(e.id))
.or_insert(e.id);
batch
.add_entries::<MessageType>(ns_id, &[e])
.context(AddEntryLogBatchSnafu)?;
@@ -207,8 +214,7 @@ impl LogStore for RaftEngineLogStore {
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;
// The user of raft-engine log store does not care about the response.
Ok(AppendBatchResponse::default())
Ok(AppendBatchResponse { last_entry_ids })
}
/// Create a stream of entries from logstore in the given namespace. The end of stream is
@@ -452,7 +458,7 @@ mod tests {
))
.await
.unwrap();
assert_eq!(i, response.entry_id);
assert_eq!(i, response.last_entry_id);
}
let mut entries = HashSet::with_capacity(1024);
let mut s = logstore.read(&Namespace::with_id(1), 0).await.unwrap();

View File

@@ -416,6 +416,12 @@ pub enum Error {
error: ArrowError,
location: Location,
},
#[snafu(display("Missing the last entry id for region {}", region_id))]
MissingLastEntryId {
region_id: RegionId,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -453,7 +459,8 @@ impl ErrorExt for Error {
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
| UnexpectedReplay { .. }
| MissingLastEntryId { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }

View File

@@ -168,6 +168,7 @@ impl RegionWriteCtx {
&self.wal_options,
)?;
// We only call this method one time, but we still bump next entry id for consistency.
// Warning: this update will be ignored since the `set_next_entry_id` is called by the writer explicitly.
self.next_entry_id += 1;
Ok(())
}

View File

@@ -27,7 +27,7 @@ use futures::StreamExt;
use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::LogStore;
use store_api::logstore::{AppendBatchResponse, LogStore};
use store_api::storage::RegionId;
use crate::error::{
@@ -165,8 +165,7 @@ impl<S: LogStore> WalWriter<S> {
}
/// Write all buffered entries to the WAL.
// TODO(niebayes): returns an `AppendBatchResponse` and handle it properly.
pub async fn write_to_wal(&mut self) -> Result<()> {
pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
// TODO(yingwen): metrics.
let entries = mem::take(&mut self.entries);
@@ -175,7 +174,6 @@ impl<S: LogStore> WalWriter<S> {
.await
.map_err(BoxedError::new)
.context(WriteWalSnafu)
.map(|_| ())
}
}

View File

@@ -17,11 +17,11 @@
use std::collections::{hash_map, HashMap};
use std::sync::Arc;
use store_api::logstore::LogStore;
use store_api::logstore::{AppendBatchResponse, LogStore};
use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
use crate::error::{RejectWriteSnafu, Result};
use crate::error::{MissingLastEntryIdSnafu, RejectWriteSnafu, Result};
use crate::metrics::{
WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
@@ -73,12 +73,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_ctx.set_error(e);
}
}
if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) {
// Failed to write wal.
for mut region_ctx in region_ctxs.into_values() {
region_ctx.set_error(e.clone());
match wal_writer.write_to_wal().await.map_err(Arc::new) {
Ok(response) => {
update_next_entry_ids(&mut region_ctxs, &response);
}
Err(e) => {
// Failed to write wal.
for mut region_ctx in region_ctxs.into_values() {
region_ctx.set_error(e.clone());
}
return;
}
return;
}
}
@@ -202,3 +207,22 @@ fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetad
Ok(())
}
/// Updates the next entry id for each region with the returned last entry id contained in the response.
fn update_next_entry_ids(
region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
response: &AppendBatchResponse,
) {
for (region_id, region_ctx) in region_ctxs.iter_mut() {
// Missing a last entry id is not deemed as a critical error and hence no need to abort writing to memtable.
let Some(last_entry_id) = response.last_entry_ids.get(&region_id.as_u64()) else {
let e = MissingLastEntryIdSnafu {
region_id: *region_id,
}
.build();
region_ctx.set_error(Arc::new(e));
continue;
};
region_ctx.set_next_entry_id(last_entry_id + 1);
}
}