From 06b592f00f5e78f16df169804f6a83468ed088ea Mon Sep 17 00:00:00 2001 From: "Lei, Huang" Date: Tue, 24 May 2022 16:12:23 +0800 Subject: [PATCH] feat: add WAL definitions (#35) * feat: add WAL definitions * rename and add some tests --- Cargo.lock | 4 ++ src/store-api/Cargo.toml | 6 +++ src/store-api/src/lib.rs | 2 +- src/store-api/src/logstore.rs | 45 ++++++++++++++++ src/store-api/src/logstore/entry.rs | 43 ++++++++++++++++ src/store-api/src/logstore/entry_stream.rs | 60 ++++++++++++++++++++++ src/store-api/src/logstore/namespace.rs | 3 ++ src/store-api/src/wal.rs | 1 - 8 files changed, 162 insertions(+), 2 deletions(-) create mode 100644 src/store-api/src/logstore.rs create mode 100644 src/store-api/src/logstore/entry.rs create mode 100644 src/store-api/src/logstore/entry_stream.rs create mode 100644 src/store-api/src/logstore/namespace.rs delete mode 100644 src/store-api/src/wal.rs diff --git a/Cargo.lock b/Cargo.lock index 10bd083294..8be8b330cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2813,8 +2813,12 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" name = "store-api" version = "0.1.0" dependencies = [ + "async-stream", + "async-trait", "common-error", "datatypes", + "futures", + "tokio", ] [[package]] diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 99cab0b2b6..e4dd1ce826 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -6,5 +6,11 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1" common-error = { path = "../common/error" } datatypes = { path = "../datatypes" } +futures = "0.3" + +[dev-dependencies] +async-stream = "0.3" +tokio = { version = "1.18", features = ["full"] } \ No newline at end of file diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs index 4dea7f7301..d1efe4c280 100644 --- a/src/store-api/src/lib.rs +++ b/src/store-api/src/lib.rs @@ -1,4 +1,4 @@ //! Storage related APIs +pub mod logstore; pub mod storage; -pub mod wal; diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs new file mode 100644 index 0000000000..ee015a3710 --- /dev/null +++ b/src/store-api/src/logstore.rs @@ -0,0 +1,45 @@ +//! LogStore APIs. + +use common_error::prelude::ErrorExt; +use entry::Offset; + +use crate::logstore::entry::Entry; +use crate::logstore::entry_stream::SendableEntryStream; +use crate::logstore::namespace::Namespace; + +pub mod entry; +pub mod entry_stream; +pub mod namespace; + +/// `LogStore` serves as a Write-Ahead-Log for storage engine. +#[async_trait::async_trait] +pub trait LogStore { + type Error: ErrorExt + Send + Sync; + type Namespace: Namespace; + + /// Append an `Entry` to WAL with given namespace + async fn append(&mut self, ns: Self::Namespace, e: Entry) -> Result; + + // Append a batch of entries atomically and return the offset of first entry. + async fn append_batch( + &mut self, + ns: Self::Namespace, + e: Vec, + ) -> Result; + + // Create a new `EntryStream` to asynchronously generates `Entry`. + async fn read( + &self, + ns: Self::Namespace, + offset: Offset, + ) -> Result; + + // Create a new `Namespace`. + async fn create_namespace(&mut self, ns: Self::Namespace) -> Result<(), Self::Error>; + + // Delete an existing `Namespace` with given ref. + async fn delete_namespace(&mut self, ns: Self::Namespace) -> Result<(), Self::Error>; + + // List all existing namespaces. + async fn list_namespaces(&self) -> Result, Self::Error>; +} diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs new file mode 100644 index 0000000000..9ea7d1f327 --- /dev/null +++ b/src/store-api/src/logstore/entry.rs @@ -0,0 +1,43 @@ +pub type Offset = u64; +pub type Epoch = u64; + +pub struct Entry { + /// Offset of current entry + offset: Offset, + /// Epoch of current entry + epoch: Epoch, + /// Binary data of current entry + data: Vec, +} + +impl Entry { + pub fn new(data: impl AsRef<[u8]>, offset: Offset, epoch: u64) -> Self { + let data = data.as_ref().to_vec(); + Self { + data, + offset, + epoch, + } + } + + pub fn data(&self) -> &[u8] { + self.data.as_slice() + } + + pub fn offset(&self) -> Offset { + self.offset + } + + pub fn epoch(&self) -> Epoch { + self.epoch + } + + /// Return total length of entry after serialization(maybe CRC and length field) + pub fn len(&self) -> usize { + self.data.len() + 8 + } + + pub fn is_empty(&self) -> bool { + self.data.len() == 0 + } +} diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs new file mode 100644 index 0000000000..96c2c1e17e --- /dev/null +++ b/src/store-api/src/logstore/entry_stream.rs @@ -0,0 +1,60 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::Stream; + +use crate::logstore::entry::Entry; +use crate::logstore::Offset; + +pub trait EntryStream: Stream> { + fn start_offset(&self) -> Offset; +} + +pub type SendableEntryStream<'a> = Pin> + Send + 'a>>; + +pub struct EntryStreamImpl<'a> { + inner: SendableEntryStream<'a>, + start_offset: Offset, +} + +impl<'a> EntryStream for EntryStreamImpl<'a> { + fn start_offset(&self) -> Offset { + self.start_offset + } +} + +impl Stream for EntryStreamImpl<'_> { + type Item = Vec; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_next(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(v)), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod tests { + + use futures::StreamExt; + + use super::*; + + #[tokio::test] + pub async fn test_entry_stream() { + let stream = + async_stream::stream!({ yield vec![Entry::new("test_entry".as_bytes(), 0, 128)] }); + + let mut stream_impl = EntryStreamImpl { + inner: Box::pin(stream), + start_offset: 1234, + }; + + if let Some(v) = stream_impl.next().await { + assert_eq!(1, v.len()); + assert_eq!(b"test_entry", v[0].data()); + } + } +} diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs new file mode 100644 index 0000000000..9d1f7b3f94 --- /dev/null +++ b/src/store-api/src/logstore/namespace.rs @@ -0,0 +1,3 @@ +pub trait Namespace: Send + Sync + Clone { + fn name(&self) -> &str; +} diff --git a/src/store-api/src/wal.rs b/src/store-api/src/wal.rs deleted file mode 100644 index 2b436a8b5c..0000000000 --- a/src/store-api/src/wal.rs +++ /dev/null @@ -1 +0,0 @@ -//! WAL APIs.