From 97cfa3d6c97f4a4ad3413a20e29b5f0859e304fc Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 14 Jul 2023 17:57:17 +0800 Subject: [PATCH] feat: support append entries from multiple regions at a time (#1959) * feat: support append entries from multiple regions at a time * chore: add some tests * fix: false postive mutable_key warning * fix: append_batch api * fix: remove unused clippy allows --- src/log-store/src/noop.rs | 11 +- src/log-store/src/raft_engine.rs | 3 + src/log-store/src/raft_engine/log_store.rs | 151 ++++++++++++++------- src/store-api/src/logstore.rs | 8 +- src/store-api/src/logstore/namespace.rs | 2 +- 5 files changed, 113 insertions(+), 62 deletions(-) diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 7d286ed08e..330911e70a 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -25,7 +25,7 @@ pub struct NoopLogStore; #[derive(Debug, Default, Clone, PartialEq)] pub struct EntryImpl; -#[derive(Debug, Clone, Default, Hash, PartialEq)] +#[derive(Debug, Clone, Default, Eq, PartialEq, Hash)] pub struct NamespaceImpl; impl Namespace for NamespaceImpl { @@ -65,8 +65,8 @@ impl LogStore for NoopLogStore { Ok(AppendResponse { entry_id: 0 }) } - async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec) -> Result> { - Ok(vec![]) + async fn append_batch(&self, _e: Vec) -> Result<()> { + Ok(()) } async fn read( @@ -131,10 +131,7 @@ mod tests { let store = NoopLogStore::default(); let e = store.entry("".as_bytes(), 1, NamespaceImpl::default()); let _ = store.append(e.clone()).await.unwrap(); - assert!(store - .append_batch(&NamespaceImpl::default(), vec![e]) - .await - .is_ok()); + assert!(store.append_batch(vec![e]).await.is_ok()); store .create_namespace(&NamespaceImpl::default()) .await diff --git a/src/log-store/src/raft_engine.rs b/src/log-store/src/raft_engine.rs index 104540066a..b053c491ba 100644 --- a/src/log-store/src/raft_engine.rs +++ b/src/log-store/src/raft_engine.rs @@ -36,6 +36,7 @@ impl EntryImpl { } } } + impl NamespaceImpl { pub fn with_id(id: Id) -> Self { Self { @@ -52,6 +53,8 @@ impl Hash for NamespaceImpl { } } +impl Eq for NamespaceImpl {} + impl Namespace for NamespaceImpl { fn id(&self) -> store_api::logstore::namespace::Id { self.id diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index cfa434d7ac..af71baa26a 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -20,7 +20,7 @@ use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{error, info}; use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode}; use snafu::{ensure, ResultExt}; -use store_api::logstore::entry::Id; +use store_api::logstore::entry::{Entry, Id}; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Namespace as NamespaceTrait; use store_api::logstore::{AppendResponse, LogStore}; @@ -29,9 +29,9 @@ use crate::config::LogConfig; use crate::error; use crate::error::{ AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, - RaftEngineSnafu, StartGcTaskSnafu, StopGcTaskSnafu, + OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, }; -use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace}; +use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace}; const NAMESPACE_PREFIX: &str = "__sys_namespace_"; const SYSTEM_NAMESPACE: u64 = 0; @@ -52,7 +52,7 @@ impl TaskFunction for PurgeExpiredFilesFunction { "RaftEngineLogStore-gc-task" } - async fn call(&mut self) -> Result<(), Error> { + async fn call(&mut self) -> Result<()> { match self.engine.purge_expired_files().context(RaftEngineSnafu) { Ok(res) => { // TODO(hl): the retval of purge_expired_files indicates the namespaces need to be compact, @@ -72,8 +72,7 @@ impl TaskFunction for PurgeExpiredFilesFunction { } impl RaftEngineLogStore { - pub async fn try_new(config: LogConfig) -> Result { - // TODO(hl): set according to available disk space + pub async fn try_new(config: LogConfig) -> Result { let raft_engine_config = Config { dir: config.log_file_dir.clone(), purge_threshold: ReadableSize(config.purge_threshold), @@ -103,7 +102,7 @@ impl RaftEngineLogStore { self.gc_task.started() } - fn start(&self) -> Result<(), Error> { + fn start(&self) -> Result<()> { self.gc_task .start(common_runtime::bg_runtime()) .context(StartGcTaskSnafu) @@ -115,6 +114,24 @@ impl RaftEngineLogStore { self.engine.last_index(namespace.id()), ) } + + /// Checks if entry does not override the min index of namespace. + fn check_entry(&self, e: &EntryImpl) -> Result<()> { + if cfg!(debug_assertions) { + let ns_id = e.namespace_id; + if let Some(first_index) = self.engine.first_index(ns_id) { + ensure!( + e.id() >= first_index, + OverrideCompactedEntrySnafu { + namespace: ns_id, + first_index, + attempt_index: e.id(), + } + ); + } + } + Ok(()) + } } impl Debug for RaftEngineLogStore { @@ -130,14 +147,14 @@ impl Debug for RaftEngineLogStore { impl LogStore for RaftEngineLogStore { type Error = Error; type Namespace = Namespace; - type Entry = Entry; + type Entry = EntryImpl; - async fn stop(&self) -> Result<(), Self::Error> { + async fn stop(&self) -> Result<()> { self.gc_task.stop().await.context(StopGcTaskSnafu) } /// Append an entry to logstore. Currently of existence of entry's namespace is not checked. - async fn append(&self, e: Self::Entry) -> Result { + async fn append(&self, e: Self::Entry) -> Result { ensure!(self.started(), IllegalStateSnafu); let entry_id = e.id; let namespace_id = e.namespace_id; @@ -166,49 +183,27 @@ impl LogStore for RaftEngineLogStore { /// Append a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of /// batch append. - async fn append_batch( - &self, - ns: &Self::Namespace, - entries: Vec, - ) -> Result, Self::Error> { + async fn append_batch(&self, entries: Vec) -> Result<()> { ensure!(self.started(), IllegalStateSnafu); if entries.is_empty() { - return Ok(vec![]); + return Ok(()); } - let mut min_entry_id = u64::MAX; - let entry_ids = entries - .iter() - .map(|e| { - let id = e.get_id(); - if id < min_entry_id { - min_entry_id = id; - } - id - }) - .collect::>(); - let mut batch = LogBatch::with_capacity(entries.len()); - batch - .add_entries::(ns.id, &entries) - .context(AddEntryLogBatchSnafu)?; - if let Some(first_index) = self.engine.first_index(ns.id) { - ensure!( - min_entry_id >= first_index, - error::OverrideCompactedEntrySnafu { - namespace: ns.id, - first_index, - attempt_index: min_entry_id, - } - ); + for e in entries { + self.check_entry(&e)?; + let ns_id = e.namespace_id; + batch + .add_entries::(ns_id, &[e]) + .context(AddEntryLogBatchSnafu)?; } let _ = self .engine .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; - Ok(entry_ids) + Ok(()) } /// Create a stream of entries from logstore in the given namespace. The end of stream is @@ -217,7 +212,7 @@ impl LogStore for RaftEngineLogStore { &self, ns: &Self::Namespace, id: Id, - ) -> Result, Self::Error> { + ) -> Result> { ensure!(self.started(), IllegalStateSnafu); let engine = self.engine.clone(); @@ -275,7 +270,7 @@ impl LogStore for RaftEngineLogStore { Ok(Box::pin(s)) } - async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> { + async fn create_namespace(&self, ns: &Self::Namespace) -> Result<()> { ensure!( ns.id != SYSTEM_NAMESPACE, IllegalNamespaceSnafu { ns: ns.id } @@ -293,7 +288,7 @@ impl LogStore for RaftEngineLogStore { Ok(()) } - async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> { + async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<()> { ensure!( ns.id != SYSTEM_NAMESPACE, IllegalNamespaceSnafu { ns: ns.id } @@ -309,7 +304,7 @@ impl LogStore for RaftEngineLogStore { Ok(()) } - async fn list_namespaces(&self) -> Result, Self::Error> { + async fn list_namespaces(&self) -> Result> { ensure!(self.started(), IllegalStateSnafu); let mut namespaces: Vec = vec![]; self.engine @@ -328,7 +323,7 @@ impl LogStore for RaftEngineLogStore { } fn entry>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry { - Entry { + EntryImpl { id, data: data.as_ref().to_vec(), namespace_id: ns.id(), @@ -343,7 +338,7 @@ impl LogStore for RaftEngineLogStore { } } - async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error> { + async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<()> { ensure!(self.started(), IllegalStateSnafu); let obsoleted = self.engine.compact_to(namespace.id(), id + 1); info!( @@ -361,7 +356,7 @@ impl LogStore for RaftEngineLogStore { struct MessageType; impl MessageExt for MessageType { - type Entry = Entry; + type Entry = EntryImpl; fn index(e: &Self::Entry) -> u64 { e.id @@ -580,4 +575,64 @@ mod tests { vec.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap()); assert_eq!(101, vec.first().unwrap().id); } + + #[tokio::test] + async fn test_append_batch() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("logstore-append-batch-test"); + + let config = LogConfig { + log_file_dir: dir.path().to_str().unwrap().to_string(), + file_size: ReadableSize::mb(2).0, + purge_threshold: ReadableSize::mb(4).0, + purge_interval: Duration::from_secs(5), + ..Default::default() + }; + + let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); + + let entries = (0..8) + .flat_map(|ns_id| { + let data = [ns_id as u8].repeat(4096); + (0..16).map(move |idx| Entry::create(idx, ns_id, data.clone())) + }) + .collect(); + + logstore.append_batch(entries).await.unwrap(); + for ns_id in 0..8 { + let namespace = Namespace::with_id(ns_id); + let (first, last) = logstore.span(&namespace); + assert_eq!(0, first.unwrap()); + assert_eq!(15, last.unwrap()); + } + } + + #[tokio::test] + async fn test_append_batch_interleaved() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("logstore-append-batch-test"); + + let config = LogConfig { + log_file_dir: dir.path().to_str().unwrap().to_string(), + file_size: ReadableSize::mb(2).0, + purge_threshold: ReadableSize::mb(4).0, + purge_interval: Duration::from_secs(5), + ..Default::default() + }; + + let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); + + let entries = vec![ + Entry::create(0, 0, [b'0'; 4096].to_vec()), + Entry::create(1, 0, [b'0'; 4096].to_vec()), + Entry::create(0, 1, [b'1'; 4096].to_vec()), + Entry::create(2, 0, [b'0'; 4096].to_vec()), + Entry::create(1, 1, [b'1'; 4096].to_vec()), + ]; + + logstore.append_batch(entries).await.unwrap(); + + assert_eq!((Some(0), Some(2)), logstore.span(&Namespace::with_id(0))); + assert_eq!((Some(0), Some(1)), logstore.span(&Namespace::with_id(1))); + } } diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index f00914015c..975715074c 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -36,14 +36,10 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { /// Append an `Entry` to WAL with given namespace and return append response containing /// the entry id. - async fn append(&self, mut e: Self::Entry) -> Result; + async fn append(&self, e: Self::Entry) -> Result; /// Append a batch of entries atomically and return the offset of first entry. - async fn append_batch( - &self, - ns: &Self::Namespace, - e: Vec, - ) -> Result, Self::Error>; + async fn append_batch(&self, e: Vec) -> Result<(), Self::Error>; /// Create a new `EntryStream` to asynchronously generates `Entry` with ids /// starting from `id`. diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs index 8cecd47588..35a136d809 100644 --- a/src/store-api/src/logstore/namespace.rs +++ b/src/store-api/src/logstore/namespace.rs @@ -16,6 +16,6 @@ use std::hash::Hash; pub type Id = u64; -pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq { +pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq + Eq { fn id(&self) -> Id; }