feat: support append entries from multiple regions at a time (#1959)

* feat: support append entries from multiple regions at a time

* chore: add some tests

* fix: false positive mutable_key warning

* fix: append_batch api

* fix: remove unused clippy allows
Author: Lei, HUANG
Date: 2023-07-14 17:57:17 +08:00
Committed by: GitHub
Parent: ef7c5dd311
Commit: 97cfa3d6c9
5 changed files with 113 additions and 62 deletions
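
The core API change, shown in full in the store-api hunk near the end of this page, is the `LogStore::append_batch` signature: the per-call namespace parameter is removed, each entry now carries its own namespace id, and the method no longer returns entry ids. This is what allows entries from multiple regions to be appended in a single call. A condensed before/after, copied from that hunk:

    // Before: one namespace per call, entry ids returned.
    async fn append_batch(
        &self,
        ns: &Self::Namespace,
        e: Vec<Self::Entry>,
    ) -> Result<Vec<Id>, Self::Error>;

    // After: entries may span namespaces; only success or failure is returned.
    async fn append_batch(&self, e: Vec<Self::Entry>) -> Result<(), Self::Error>;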


@@ -25,7 +25,7 @@ pub struct NoopLogStore;
#[derive(Debug, Default, Clone, PartialEq)]
pub struct EntryImpl;
#[derive(Debug, Clone, Default, Hash, PartialEq)]
#[derive(Debug, Clone, Default, Eq, PartialEq, Hash)]
pub struct NamespaceImpl;
impl Namespace for NamespaceImpl {
@@ -65,8 +65,8 @@ impl LogStore for NoopLogStore {
Ok(AppendResponse { entry_id: 0 })
}
async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec<Self::Entry>) -> Result<Vec<Id>> {
Ok(vec![])
async fn append_batch(&self, _e: Vec<Self::Entry>) -> Result<()> {
Ok(())
}
async fn read(
@@ -131,10 +131,7 @@ mod tests {
let store = NoopLogStore::default();
let e = store.entry("".as_bytes(), 1, NamespaceImpl::default());
let _ = store.append(e.clone()).await.unwrap();
assert!(store
.append_batch(&NamespaceImpl::default(), vec![e])
.await
.is_ok());
assert!(store.append_batch(vec![e]).await.is_ok());
store
.create_namespace(&NamespaceImpl::default())
.await


@@ -36,6 +36,7 @@ impl EntryImpl {
}
}
}
impl NamespaceImpl {
pub fn with_id(id: Id) -> Self {
Self {
@@ -52,6 +53,8 @@ impl Hash for NamespaceImpl {
}
}
impl Eq for NamespaceImpl {}
impl Namespace for NamespaceImpl {
fn id(&self) -> store_api::logstore::namespace::Id {
self.id


@@ -20,7 +20,7 @@ use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{error, info};
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::Id;
use store_api::logstore::entry::{Entry, Id};
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Namespace as NamespaceTrait;
use store_api::logstore::{AppendResponse, LogStore};
@@ -29,9 +29,9 @@ use crate::config::LogConfig;
use crate::error;
use crate::error::{
AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu,
RaftEngineSnafu, StartGcTaskSnafu, StopGcTaskSnafu,
OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu,
};
use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace};
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace};
const NAMESPACE_PREFIX: &str = "__sys_namespace_";
const SYSTEM_NAMESPACE: u64 = 0;
@@ -52,7 +52,7 @@ impl TaskFunction<Error> for PurgeExpiredFilesFunction {
"RaftEngineLogStore-gc-task"
}
async fn call(&mut self) -> Result<(), Error> {
async fn call(&mut self) -> Result<()> {
match self.engine.purge_expired_files().context(RaftEngineSnafu) {
Ok(res) => {
// TODO(hl): the retval of purge_expired_files indicates the namespaces that need to be compacted,
@@ -72,8 +72,7 @@ impl TaskFunction<Error> for PurgeExpiredFilesFunction {
}
impl RaftEngineLogStore {
pub async fn try_new(config: LogConfig) -> Result<Self, Error> {
// TODO(hl): set according to available disk space
pub async fn try_new(config: LogConfig) -> Result<Self> {
let raft_engine_config = Config {
dir: config.log_file_dir.clone(),
purge_threshold: ReadableSize(config.purge_threshold),
@@ -103,7 +102,7 @@ impl RaftEngineLogStore {
self.gc_task.started()
}
fn start(&self) -> Result<(), Error> {
fn start(&self) -> Result<()> {
self.gc_task
.start(common_runtime::bg_runtime())
.context(StartGcTaskSnafu)
@@ -115,6 +114,24 @@ impl RaftEngineLogStore {
self.engine.last_index(namespace.id()),
)
}
/// Checks that the entry does not override the min index of its namespace.
fn check_entry(&self, e: &EntryImpl) -> Result<()> {
if cfg!(debug_assertions) {
let ns_id = e.namespace_id;
if let Some(first_index) = self.engine.first_index(ns_id) {
ensure!(
e.id() >= first_index,
OverrideCompactedEntrySnafu {
namespace: ns_id,
first_index,
attempt_index: e.id(),
}
);
}
}
Ok(())
}
}
impl Debug for RaftEngineLogStore {
@@ -130,14 +147,14 @@ impl Debug for RaftEngineLogStore {
impl LogStore for RaftEngineLogStore {
type Error = Error;
type Namespace = Namespace;
type Entry = Entry;
type Entry = EntryImpl;
async fn stop(&self) -> Result<(), Self::Error> {
async fn stop(&self) -> Result<()> {
self.gc_task.stop().await.context(StopGcTaskSnafu)
}
/// Append an entry to logstore. Currently the existence of the entry's namespace is not checked.
async fn append(&self, e: Self::Entry) -> Result<AppendResponse, Self::Error> {
async fn append(&self, e: Self::Entry) -> Result<AppendResponse> {
ensure!(self.started(), IllegalStateSnafu);
let entry_id = e.id;
let namespace_id = e.namespace_id;
@@ -166,49 +183,27 @@ impl LogStore for RaftEngineLogStore {
/// Append a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of
/// batch append.
async fn append_batch(
&self,
ns: &Self::Namespace,
entries: Vec<Self::Entry>,
) -> Result<Vec<Id>, Self::Error> {
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<()> {
ensure!(self.started(), IllegalStateSnafu);
if entries.is_empty() {
return Ok(vec![]);
return Ok(());
}
let mut min_entry_id = u64::MAX;
let entry_ids = entries
.iter()
.map(|e| {
let id = e.get_id();
if id < min_entry_id {
min_entry_id = id;
}
id
})
.collect::<Vec<_>>();
let mut batch = LogBatch::with_capacity(entries.len());
batch
.add_entries::<MessageType>(ns.id, &entries)
.context(AddEntryLogBatchSnafu)?;
if let Some(first_index) = self.engine.first_index(ns.id) {
ensure!(
min_entry_id >= first_index,
error::OverrideCompactedEntrySnafu {
namespace: ns.id,
first_index,
attempt_index: min_entry_id,
}
);
for e in entries {
self.check_entry(&e)?;
let ns_id = e.namespace_id;
batch
.add_entries::<MessageType>(ns_id, &[e])
.context(AddEntryLogBatchSnafu)?;
}
let _ = self
.engine
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;
Ok(entry_ids)
Ok(())
}
/// Create a stream of entries from logstore in the given namespace. The end of stream is
@@ -217,7 +212,7 @@ impl LogStore for RaftEngineLogStore {
&self,
ns: &Self::Namespace,
id: Id,
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>, Self::Error> {
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
ensure!(self.started(), IllegalStateSnafu);
let engine = self.engine.clone();
@@ -275,7 +270,7 @@ impl LogStore for RaftEngineLogStore {
Ok(Box::pin(s))
}
async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> {
async fn create_namespace(&self, ns: &Self::Namespace) -> Result<()> {
ensure!(
ns.id != SYSTEM_NAMESPACE,
IllegalNamespaceSnafu { ns: ns.id }
@@ -293,7 +288,7 @@ impl LogStore for RaftEngineLogStore {
Ok(())
}
async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> {
async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<()> {
ensure!(
ns.id != SYSTEM_NAMESPACE,
IllegalNamespaceSnafu { ns: ns.id }
@@ -309,7 +304,7 @@ impl LogStore for RaftEngineLogStore {
Ok(())
}
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error> {
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
ensure!(self.started(), IllegalStateSnafu);
let mut namespaces: Vec<Namespace> = vec![];
self.engine
@@ -328,7 +323,7 @@ impl LogStore for RaftEngineLogStore {
}
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
Entry {
EntryImpl {
id,
data: data.as_ref().to_vec(),
namespace_id: ns.id(),
@@ -343,7 +338,7 @@ impl LogStore for RaftEngineLogStore {
}
}
async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error> {
async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<()> {
ensure!(self.started(), IllegalStateSnafu);
let obsoleted = self.engine.compact_to(namespace.id(), id + 1);
info!(
@@ -361,7 +356,7 @@ impl LogStore for RaftEngineLogStore {
struct MessageType;
impl MessageExt for MessageType {
type Entry = Entry;
type Entry = EntryImpl;
fn index(e: &Self::Entry) -> u64 {
e.id
@@ -580,4 +575,64 @@ mod tests {
vec.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap());
assert_eq!(101, vec.first().unwrap().id);
}
#[tokio::test]
async fn test_append_batch() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("logstore-append-batch-test");
let config = LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
file_size: ReadableSize::mb(2).0,
purge_threshold: ReadableSize::mb(4).0,
purge_interval: Duration::from_secs(5),
..Default::default()
};
let logstore = RaftEngineLogStore::try_new(config).await.unwrap();
let entries = (0..8)
.flat_map(|ns_id| {
let data = [ns_id as u8].repeat(4096);
(0..16).map(move |idx| Entry::create(idx, ns_id, data.clone()))
})
.collect();
logstore.append_batch(entries).await.unwrap();
for ns_id in 0..8 {
let namespace = Namespace::with_id(ns_id);
let (first, last) = logstore.span(&namespace);
assert_eq!(0, first.unwrap());
assert_eq!(15, last.unwrap());
}
}
#[tokio::test]
async fn test_append_batch_interleaved() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("logstore-append-batch-test");
let config = LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
file_size: ReadableSize::mb(2).0,
purge_threshold: ReadableSize::mb(4).0,
purge_interval: Duration::from_secs(5),
..Default::default()
};
let logstore = RaftEngineLogStore::try_new(config).await.unwrap();
let entries = vec![
Entry::create(0, 0, [b'0'; 4096].to_vec()),
Entry::create(1, 0, [b'0'; 4096].to_vec()),
Entry::create(0, 1, [b'1'; 4096].to_vec()),
Entry::create(2, 0, [b'0'; 4096].to_vec()),
Entry::create(1, 1, [b'1'; 4096].to_vec()),
];
logstore.append_batch(entries).await.unwrap();
assert_eq!((Some(0), Some(2)), logstore.span(&Namespace::with_id(0)));
assert_eq!((Some(0), Some(1)), logstore.span(&Namespace::with_id(1)));
}
}


@@ -36,14 +36,10 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
/// Append an `Entry` to WAL with given namespace and return append response containing
/// the entry id.
async fn append(&self, mut e: Self::Entry) -> Result<AppendResponse, Self::Error>;
async fn append(&self, e: Self::Entry) -> Result<AppendResponse, Self::Error>;
/// Append a batch of entries atomically; the entries may span multiple namespaces.
async fn append_batch(
&self,
ns: &Self::Namespace,
e: Vec<Self::Entry>,
) -> Result<Vec<Id>, Self::Error>;
async fn append_batch(&self, e: Vec<Self::Entry>) -> Result<(), Self::Error>;
/// Create a new `EntryStream` to asynchronously generate `Entry` with ids
/// starting from `id`.


@@ -16,6 +16,6 @@ use std::hash::Hash;
pub type Id = u64;
pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq {
pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq + Eq {
fn id(&self) -> Id;
}