feat: add WAL definitions (#35)

* feat: add WAL definitions

* rename and add some tests
This commit is contained in:
Lei, Huang
2022-05-24 16:12:23 +08:00
committed by GitHub
parent 1594da337f
commit 06b592f00f
8 changed files with 162 additions and 2 deletions

4
Cargo.lock generated
View File

@@ -2813,8 +2813,12 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
name = "store-api"
version = "0.1.0"
dependencies = [
"async-stream",
"async-trait",
"common-error",
"datatypes",
"futures",
"tokio",
]
[[package]]

View File

@@ -6,5 +6,11 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
common-error = { path = "../common/error" }
datatypes = { path = "../datatypes" }
futures = "0.3"
[dev-dependencies]
async-stream = "0.3"
tokio = { version = "1.18", features = ["full"] }

View File

@@ -1,4 +1,4 @@
//! Storage related APIs
pub mod logstore;
pub mod storage;
pub mod wal;

View File

@@ -0,0 +1,45 @@
//! LogStore APIs.
use common_error::prelude::ErrorExt;
use entry::Offset;
use crate::logstore::entry::Entry;
use crate::logstore::entry_stream::SendableEntryStream;
use crate::logstore::namespace::Namespace;
pub mod entry;
pub mod entry_stream;
pub mod namespace;
/// `LogStore` serves as a Write-Ahead-Log for storage engine.
#[async_trait::async_trait]
pub trait LogStore {
type Error: ErrorExt + Send + Sync;
type Namespace: Namespace;
/// Append an `Entry` to WAL with given namespace
async fn append(&mut self, ns: Self::Namespace, e: Entry) -> Result<Offset, Self::Error>;
// Append a batch of entries atomically and return the offset of first entry.
async fn append_batch(
&mut self,
ns: Self::Namespace,
e: Vec<Entry>,
) -> Result<Offset, Self::Error>;
// Create a new `EntryStream` to asynchronously generates `Entry`.
async fn read(
&self,
ns: Self::Namespace,
offset: Offset,
) -> Result<SendableEntryStream, Self::Error>;
// Create a new `Namespace`.
async fn create_namespace(&mut self, ns: Self::Namespace) -> Result<(), Self::Error>;
// Delete an existing `Namespace` with given ref.
async fn delete_namespace(&mut self, ns: Self::Namespace) -> Result<(), Self::Error>;
// List all existing namespaces.
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
}

View File

@@ -0,0 +1,43 @@
pub type Offset = u64;
pub type Epoch = u64;
pub struct Entry {
/// Offset of current entry
offset: Offset,
/// Epoch of current entry
epoch: Epoch,
/// Binary data of current entry
data: Vec<u8>,
}
impl Entry {
pub fn new(data: impl AsRef<[u8]>, offset: Offset, epoch: u64) -> Self {
let data = data.as_ref().to_vec();
Self {
data,
offset,
epoch,
}
}
pub fn data(&self) -> &[u8] {
self.data.as_slice()
}
pub fn offset(&self) -> Offset {
self.offset
}
pub fn epoch(&self) -> Epoch {
self.epoch
}
/// Return total length of entry after serialization(maybe CRC and length field)
pub fn len(&self) -> usize {
self.data.len() + 8
}
pub fn is_empty(&self) -> bool {
self.data.len() == 0
}
}

View File

@@ -0,0 +1,60 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;
use crate::logstore::entry::Entry;
use crate::logstore::Offset;
pub trait EntryStream: Stream<Item = Vec<Entry>> {
fn start_offset(&self) -> Offset;
}
pub type SendableEntryStream<'a> = Pin<Box<dyn Stream<Item = Vec<Entry>> + Send + 'a>>;
pub struct EntryStreamImpl<'a> {
inner: SendableEntryStream<'a>,
start_offset: Offset,
}
impl<'a> EntryStream for EntryStreamImpl<'a> {
fn start_offset(&self) -> Offset {
self.start_offset
}
}
impl Stream for EntryStreamImpl<'_> {
type Item = Vec<Entry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
mod tests {
use futures::StreamExt;
use super::*;
#[tokio::test]
pub async fn test_entry_stream() {
let stream =
async_stream::stream!({ yield vec![Entry::new("test_entry".as_bytes(), 0, 128)] });
let mut stream_impl = EntryStreamImpl {
inner: Box::pin(stream),
start_offset: 1234,
};
if let Some(v) = stream_impl.next().await {
assert_eq!(1, v.len());
assert_eq!(b"test_entry", v[0].data());
}
}
}

View File

@@ -0,0 +1,3 @@
pub trait Namespace: Send + Sync + Clone {
fn name(&self) -> &str;
}

View File

@@ -1 +0,0 @@
//! WAL APIs.