From fb0585229e11bb1c8da084e4b708a1827fbf1a0f Mon Sep 17 00:00:00 2001 From: "Lei, Huang" Date: Thu, 26 May 2022 11:30:50 +0800 Subject: [PATCH] refactor: Entry should be a trait (#37) --- src/store-api/src/logstore.rs | 7 +- src/store-api/src/logstore/entry.rs | 49 +++-------- src/store-api/src/logstore/entry_stream.rs | 95 +++++++++++++++------- 3 files changed, 81 insertions(+), 70 deletions(-) diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index ee015a3710..333c4cbe47 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -16,15 +16,16 @@ pub mod namespace; pub trait LogStore { type Error: ErrorExt + Send + Sync; type Namespace: Namespace; + type Entry: Entry; /// Append an `Entry` to WAL with given namespace - async fn append(&mut self, ns: Self::Namespace, e: Entry) -> Result; + async fn append(&mut self, ns: Self::Namespace, e: Self::Entry) -> Result; // Append a batch of entries atomically and return the offset of first entry. async fn append_batch( &mut self, ns: Self::Namespace, - e: Vec, + e: Vec, ) -> Result; // Create a new `EntryStream` to asynchronously generates `Entry`. @@ -32,7 +33,7 @@ pub trait LogStore { &self, ns: Self::Namespace, offset: Offset, - ) -> Result; + ) -> Result, Self::Error>; // Create a new `Namespace`. async fn create_namespace(&mut self, ns: Self::Namespace) -> Result<(), Self::Error>; diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 9ea7d1f327..88d97ab28b 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -1,43 +1,14 @@ pub type Offset = u64; pub type Epoch = u64; -pub struct Entry { - /// Offset of current entry - offset: Offset, - /// Epoch of current entry - epoch: Epoch, - /// Binary data of current entry - data: Vec, -} - -impl Entry { - pub fn new(data: impl AsRef<[u8]>, offset: Offset, epoch: u64) -> Self { - let data = data.as_ref().to_vec(); - Self { - data, - offset, - epoch, - } - } - - pub fn data(&self) -> &[u8] { - self.data.as_slice() - } - - pub fn offset(&self) -> Offset { - self.offset - } - - pub fn epoch(&self) -> Epoch { - self.epoch - } - - /// Return total length of entry after serialization(maybe CRC and length field) - pub fn len(&self) -> usize { - self.data.len() + 8 - } - - pub fn is_empty(&self) -> bool { - self.data.len() == 0 - } +/// Entry is the minimal data storage unit in `LogStore`. +pub trait Entry { + /// Return contained data of entry. + fn data(&self) -> &[u8]; + + /// Return offset of entry. + fn offset(&self) -> Offset; + + /// Returns epoch of entry. + fn epoch(&self) -> Epoch; } diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs index 96c2c1e17e..9a607963dd 100644 --- a/src/store-api/src/logstore/entry_stream.rs +++ b/src/store-api/src/logstore/entry_stream.rs @@ -1,51 +1,90 @@ use std::pin::Pin; -use std::task::{Context, Poll}; use futures::Stream; use crate::logstore::entry::Entry; use crate::logstore::Offset; -pub trait EntryStream: Stream> { +pub trait EntryStream: Stream> { + type Entry: Entry; fn start_offset(&self) -> Offset; } -pub type SendableEntryStream<'a> = Pin> + Send + 'a>>; - -pub struct EntryStreamImpl<'a> { - inner: SendableEntryStream<'a>, - start_offset: Offset, -} - -impl<'a> EntryStream for EntryStreamImpl<'a> { - fn start_offset(&self) -> Offset { - self.start_offset - } -} - -impl Stream for EntryStreamImpl<'_> { - type Item = Vec; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Ready(Some(v)) => Poll::Ready(Some(v)), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} +pub type SendableEntryStream<'a, E> = Pin> + Send + 'a>>; #[cfg(test)] mod tests { + use std::task::{Context, Poll}; use futures::StreamExt; use super::*; + use crate::logstore::entry::Epoch; + + pub struct SimpleEntry { + /// Offset of current entry + offset: Offset, + /// Epoch of current entry + epoch: Epoch, + /// Binary data of current entry + data: Vec, + } + + impl Entry for SimpleEntry { + fn data(&self) -> &[u8] { + self.data.as_slice() + } + + fn offset(&self) -> Offset { + self.offset + } + + fn epoch(&self) -> Epoch { + self.epoch + } + } + + impl SimpleEntry { + pub fn new(data: impl AsRef<[u8]>, offset: Offset, epoch: u64) -> Self { + let data = data.as_ref().to_vec(); + Self { + data, + offset, + epoch, + } + } + } + + pub struct EntryStreamImpl<'a> { + inner: SendableEntryStream<'a, SimpleEntry>, + start_offset: Offset, + } + + impl<'a> EntryStream for EntryStreamImpl<'a> { + type Entry = SimpleEntry; + + fn start_offset(&self) -> Offset { + self.start_offset + } + } + + impl Stream for EntryStreamImpl<'_> { + type Item = Vec; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_next(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(v)), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + } #[tokio::test] pub async fn test_entry_stream() { - let stream = - async_stream::stream!({ yield vec![Entry::new("test_entry".as_bytes(), 0, 128)] }); + let stream = async_stream::stream!({ + yield vec![SimpleEntry::new("test_entry".as_bytes(), 0, 128)] + }); let mut stream_impl = EntryStreamImpl { inner: Box::pin(stream),