diff --git a/src/log-store/src/fs/file.rs b/src/log-store/src/fs/file.rs index e718e26447..2dcef32a5a 100644 --- a/src/log-store/src/fs/file.rs +++ b/src/log-store/src/fs/file.rs @@ -274,10 +274,10 @@ impl LogFile { pub async fn replay(&mut self) -> Result<(usize, Id)> { let log_name = self.name.to_string(); let previous_offset = self.flush_offset.load(Ordering::Relaxed); + let ns = LocalNamespace::default(); let mut stream = self.create_stream( // TODO(hl): LocalNamespace should be filled - LocalNamespace::default(), - 0, + &ns, 0, ); let mut last_offset = 0usize; @@ -313,7 +313,11 @@ impl LogFile { /// ### Notice /// If the entry with start entry id is not present, the first generated entry will start with /// the first entry with an id greater than `start_entry_id`. - pub fn create_stream(&self, _ns: impl Namespace, start_entry_id: u64) -> impl EntryStream + '_ { + pub fn create_stream( + &self, + _ns: &impl Namespace, + start_entry_id: u64, + ) -> impl EntryStream + '_ { let length = self.flush_offset.load(Ordering::Relaxed); let s = stream!({ diff --git a/src/log-store/src/fs/log.rs b/src/log-store/src/fs/log.rs index dd4da5ab07..39bdbd0ce4 100644 --- a/src/log-store/src/fs/log.rs +++ b/src/log-store/src/fs/log.rs @@ -3,9 +3,13 @@ use std::path::Path; use std::sync::Arc; use arc_swap::ArcSwap; +use async_stream::stream; use common_telemetry::{error, info, warn}; +use futures::pin_mut; +use futures::StreamExt; use snafu::{OptionExt, ResultExt}; use store_api::logstore::entry::{Encode, Id}; +use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::LogStore; use tokio::sync::RwLock; @@ -167,7 +171,7 @@ impl LogStore for LocalFileLogStore { async fn append( &self, - _ns: Self::Namespace, + _ns: &Self::Namespace, mut entry: Self::Entry, ) -> Result { // TODO(hl): configurable retry times @@ -208,11 +212,24 @@ impl LogStore for LocalFileLogStore { async fn read( &self, - _ns: Self::Namespace, - _id: Id, - ) -> Result> - { - todo!() + ns: Self::Namespace, + id: Id, + ) -> Result> { + let files = self.files.read().await; + + let s = stream!({ + for (start_id, file) in files.iter() { + if *start_id >= id { + let s = file.create_stream(&ns, *start_id); + pin_mut!(s); + while let Some(entries) = s.next().await { + yield entries; + } + } + } + }); + + Ok(Box::pin(s)) } async fn create_namespace(&mut self, _ns: Self::Namespace) -> Result<()> { @@ -248,13 +265,11 @@ mod tests { }; let logstore = LocalFileLogStore::open(&config).await.unwrap(); + let ns = LocalNamespace::default(); assert_eq!( 0, logstore - .append( - LocalNamespace::default(), - EntryImpl::new(generate_data(100)), - ) + .append(&ns, EntryImpl::new(generate_data(100)),) .await .unwrap() .entry_id @@ -263,10 +278,7 @@ mod tests { assert_eq!( 1, logstore - .append( - LocalNamespace::default(), - EntryImpl::new(generate_data(100)), - ) + .append(&ns, EntryImpl::new(generate_data(100)),) .await .unwrap() .entry_id @@ -296,18 +308,15 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), }; let logstore = LocalFileLogStore::open(&config).await.unwrap(); + let ns = LocalNamespace::default(); let id = logstore - .append( - LocalNamespace::default(), - EntryImpl::new(generate_data(100)), - ) + .append(&ns, EntryImpl::new(generate_data(100))) .await .unwrap() .entry_id; assert_eq!(0, id); - let active_file = logstore.active_file(); - let stream = active_file.create_stream(LocalNamespace::default(), 0); + let stream = logstore.read(ns, 0).await.unwrap(); tokio::pin!(stream); let entries = stream.next().await.unwrap().unwrap(); diff --git a/src/log-store/src/fs/noop.rs b/src/log-store/src/fs/noop.rs index cce3131d12..1204792352 100644 --- a/src/log-store/src/fs/noop.rs +++ b/src/log-store/src/fs/noop.rs @@ -17,7 +17,7 @@ impl LogStore for NoopLogStore { async fn append( &self, - _ns: Self::Namespace, + _ns: &Self::Namespace, mut _e: Self::Entry, ) -> Result { Ok(AppendResponseImpl { diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 43f230b17b..048a8821b0 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -127,7 +127,7 @@ impl Wal { let res = self .store - .append(ns, e) + .append(&ns, e) .await .map_err(BoxedError::new) .context(error::WriteWalSnafu { name: self.name() })?; diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 6894567d17..737e0f0a61 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -21,7 +21,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { /// Append an `Entry` to WAL with given namespace async fn append( &self, - ns: Self::Namespace, + ns: &Self::Namespace, mut e: Self::Entry, ) -> Result; @@ -32,7 +32,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { e: Vec, ) -> Result; - // Create a new `EntryStream` to asynchronously generates `Entry`. + // Create a new `EntryStream` to asynchronously generates `Entry` with ids starting from `id`. async fn read( &self, ns: Self::Namespace,