From e03ac2fc2bf9d00082d767bf6d5cf9ce74b5adeb Mon Sep 17 00:00:00 2001
From: "Lei, Huang"
Date: Thu, 16 Jun 2022 19:09:09 +0800
Subject: [PATCH] Implement log store append and file set management (#43)

* add log store impl

* add some tests

* delete failing test

* fix: concurrent close issue

* feat: use arc-swap to replace the unsafe AtomicPtr

* fix: use a lock to protect the rolling procedure.
  fix: use try_recv instead of poll_recv in the appender task.

* chores:
  1. use the tmp dir directly instead of creating a TempDir instance;
  2. inline some short functions;
  3. rename some structs;
  4. wrap the namespace's inner struct in an Arc.
---
 Cargo.lock                                 |  45 ++
 src/log-store/Cargo.toml                   |  21 +-
 src/log-store/src/error.rs                 |  57 +++
 src/log-store/src/fs.rs                    |  29 ++
 src/log-store/src/fs/config.rs             |  34 ++
 src/log-store/src/fs/crc.rs                |   3 +
 src/log-store/src/fs/entry.rs              | 207 ++++++++
 src/log-store/src/fs/file.rs               | 519 +++++++++++++++++++++
 src/log-store/src/fs/file_name.rs          | 111 +++++
 src/log-store/src/fs/index.rs              |  68 +++
 src/log-store/src/fs/log.rs                | 309 ++++++++++++
 src/log-store/src/fs/namespace.rs          |  40 ++
 src/log-store/src/lib.rs                   |   3 +-
 src/store-api/Cargo.toml                   |   1 +
 src/store-api/src/logstore.rs              |  24 +-
 src/store-api/src/logstore/entry.rs        |  28 +-
 src/store-api/src/logstore/entry_stream.rs |  78 +++-
 17 files changed, 1550 insertions(+), 27 deletions(-)
 create mode 100644 src/log-store/src/error.rs
 create mode 100644 src/log-store/src/fs.rs
 create mode 100644 src/log-store/src/fs/config.rs
 create mode 100644 src/log-store/src/fs/crc.rs
 create mode 100644 src/log-store/src/fs/entry.rs
 create mode 100644 src/log-store/src/fs/file.rs
 create mode 100644 src/log-store/src/fs/file_name.rs
 create mode 100644 src/log-store/src/fs/index.rs
 create mode 100644 src/log-store/src/fs/log.rs
 create mode 100644 src/log-store/src/fs/namespace.rs

diff --git a/Cargo.lock b/Cargo.lock
index 4fb3da8700..5fadce9985 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -692,6 +692,21 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "crc"
+version = "3.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3"
+dependencies = [
+ "crc-catalog",
+]
+
+[[package]]
+name = "crc-catalog"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff"
+
 [[package]]
 name = "crc32fast"
 version = "1.3.2"
@@ -1596,6 +1611,26 @@ dependencies = [
 [[package]]
 name = "log-store"
 version = "0.1.0"
+dependencies = [
+ "arc-swap",
+ "async-stream",
+ "async-trait",
+ "base64",
+ "byteorder",
+ "bytes",
+ "common-error",
+ "common-telemetry",
+ "crc",
+ "futures",
+ "futures-util",
+ "hex",
+ "memmap2",
+ "rand 0.8.5",
+ "snafu",
+ "store-api",
+ "tempdir",
+ "tokio",
+]
 
 [[package]]
 name = "logical-plans"
@@ -1672,6 +1707,15 @@
 version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
 
+[[package]]
+name = "memmap2"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5172b50c23043ff43dd53e51392f36519d9b35a8f3a410d30ece5d1aedd58ae"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "memoffset"
 version = "0.6.5"
@@ -3000,6 +3044,7 @@ dependencies = [
  "common-error",
  "datatypes",
  "futures",
+ "snafu",
  "tokio",
 ]
 
diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml
index eeffdf78d2..105f4633b2 100644
--- a/src/log-store/Cargo.toml
+++ b/src/log-store/Cargo.toml
@@ -2,7 +2,26 @@
 name = "log-store"
 version = "0.1.0"
 edition = "2021"
-
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+arc-swap = "1.5"
+async-stream = "0.3"
+async-trait = "0.1"
+base64 = "0.13"
+byteorder = "1.4"
+bytes = "1.1"
+common-error = { path = "../common/error" }
+common-telemetry = { path = "../common/telemetry" }
+crc = "3.0"
+futures = "0.3"
+futures-util = "0.3"
+hex = "0.4"
+memmap2 = "0.5"
+snafu = { version = "0.7", features = ["backtraces"] }
+store-api = { path = "../store-api" }
+tempdir = "0.3"
+tokio = { version = "1.18", features = ["full"] }
+
+[dev-dependencies]
+rand = "0.8.5"
diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs
new file mode 100644
index 0000000000..f25107cdcc
--- /dev/null
+++ b/src/log-store/src/error.rs
@@ -0,0 +1,57 @@
+use std::any::Any;
+
+use common_error::prelude::{ErrorExt, Snafu};
+use snafu::{Backtrace, ErrorCompat};
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum Error {
+    #[snafu(display("Failed to deserialize entry"))]
+    Deserialization { backtrace: Backtrace },
+
+    #[snafu(display("Entry corrupted, msg: {}", msg))]
+    Corrupted { msg: String, backtrace: Backtrace },
+
+    #[snafu(display("IO error, source: {}", source))]
+    Io {
+        source: std::io::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to open log file {}, source: {}", file_name, source))]
+    OpenLog {
+        file_name: String,
+        source: std::io::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("File name {} is illegal", file_name))]
+    FileNameIllegal {
+        file_name: String,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Internal error, msg: {}", msg))]
+    Internal { msg: String, backtrace: Backtrace },
+
+    #[snafu(display("End of LogFile"))]
+    Eof,
+
+    #[snafu(display("Duplicate file found on start: {}", msg))]
+    DuplicateFile { msg: String },
+
+    #[snafu(display("Log file suffix is illegal: {}", suffix))]
+    SuffixIllegal { suffix: String },
+}
+
+impl ErrorExt for Error {
+    fn backtrace_opt(&self) -> Option<&Backtrace> {
+        ErrorCompat::backtrace(self)
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/log-store/src/fs.rs b/src/log-store/src/fs.rs
new file mode 100644
index 0000000000..a852306754
--- /dev/null
+++ b/src/log-store/src/fs.rs
@@ -0,0 +1,29 @@
+use store_api::logstore::entry::{Id, Offset};
+use store_api::logstore::AppendResponse;
+
+mod config;
+mod crc;
+mod entry;
+mod file;
+mod file_name;
+mod index;
+mod log;
+mod namespace;
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct AppendResponseImpl {
+    entry_id: Id,
+    offset: Offset,
+}
+
+impl AppendResponse for AppendResponseImpl {
+    #[inline]
+    fn entry_id(&self) -> Id {
+        self.entry_id
+    }
+
+    #[inline]
+    fn offset(&self) -> Offset {
+        self.offset
+    }
+}
diff --git a/src/log-store/src/fs/config.rs b/src/log-store/src/fs/config.rs
new file mode 100644
index 0000000000..acdc565245
--- /dev/null
+++ b/src/log-store/src/fs/config.rs
@@ -0,0 +1,34 @@
+#[derive(Debug, Clone)]
+pub struct LogConfig {
+    pub append_buffer_size: usize,
+    pub max_log_file_size: usize,
+    pub log_file_dir: String,
+}
+
+impl Default for LogConfig {
+    /// The default configuration stores log files in a tmp directory and
+    /// should only be used in tests.
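+    ///
+    /// A minimal sketch of overriding individual fields (the values shown are
+    /// illustrative assumptions, not recommendations):
+    ///
+    /// ```ignore
+    /// let config = LogConfig {
+    ///     log_file_dir: "/path/to/wal".to_string(),
+    ///     ..Default::default()
+    /// };
+    /// ```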
+    fn default() -> Self {
+        Self {
+            append_buffer_size: 128,
+            max_log_file_size: 1024 * 1024 * 1024,
+            log_file_dir: "/tmp/greptimedb".to_string(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_telemetry::info;
+
+    use super::*;
+
+    #[test]
+    pub fn test_default_config() {
+        common_telemetry::logging::init_default_ut_logging();
+        let default = LogConfig::default();
+        info!("LogConfig::default(): {:?}", default);
+        assert_eq!(1024 * 1024 * 1024, default.max_log_file_size);
+        assert_eq!(128, default.append_buffer_size);
+    }
+}
diff --git a/src/log-store/src/fs/crc.rs b/src/log-store/src/fs/crc.rs
new file mode 100644
index 0000000000..2e620ca1c3
--- /dev/null
+++ b/src/log-store/src/fs/crc.rs
@@ -0,0 +1,3 @@
+use crc::{Crc, CRC_32_ISCSI};
+
+pub const CRC_ALGO: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);
diff --git a/src/log-store/src/fs/entry.rs b/src/log-store/src/fs/entry.rs
new file mode 100644
index 0000000000..13be5dc6c4
--- /dev/null
+++ b/src/log-store/src/fs/entry.rs
@@ -0,0 +1,207 @@
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::Stream;
+use snafu::{ensure, Backtrace, GenerateImplicitData};
+use store_api::logstore::entry::{Entry, Epoch, Id, Offset};
+use store_api::logstore::entry_stream::{EntryStream, SendableEntryStream};
+
+use crate::error::{DeserializationSnafu, Error};
+use crate::fs::crc;
+
+// id + epoch + length + crc
+const ENTRY_MIN_LEN: usize = 4 + 8 + 8 + 4;
+
+#[derive(Debug, PartialEq, Clone)]
+pub struct EntryImpl {
+    pub data: Vec<u8>,
+    pub offset: Offset,
+    pub id: Id,
+    pub epoch: Epoch,
+}
+
+impl Entry for EntryImpl {
+    type Error = Error;
+
+    fn data(&self) -> &[u8] {
+        &self.data
+    }
+
+    fn id(&self) -> Id {
+        self.id
+    }
+
+    fn offset(&self) -> Offset {
+        self.offset
+    }
+
+    fn set_offset(&mut self, offset: Offset) {
+        self.offset = offset;
+    }
+
+    fn set_id(&mut self, id: Id) {
+        self.id = id;
+    }
+
+    fn epoch(&self) -> Epoch {
+        self.epoch
+    }
+
+    fn len(&self) -> usize {
+        ENTRY_MIN_LEN + self.data.len()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+
+    fn serialize(&self) -> Vec<u8> {
+        let res: Vec<u8> = self.into();
+        res
+    }
+
+    fn deserialize(b: impl AsRef<[u8]>) -> Result<Self, Self::Error> {
+        EntryImpl::try_from(b.as_ref())
+    }
+}
+
+impl EntryImpl {
+    pub fn new(data: impl AsRef<[u8]>) -> Self {
+        let data = Vec::from(data.as_ref());
+        Self {
+            id: 0,
+            data,
+            offset: 0,
+            epoch: 0,
+        }
+    }
+}
+
+/// Entry binary format (little endian):
+///
+/// +--------+--------+--------+----------+--------+
+/// |entry id| epoch  | length |   data   |  CRC   |
+/// +--------+--------+--------+----------+--------+
+/// | 8 bytes| 8 bytes| 4 bytes| `length` | 4 bytes|
+/// +--------+--------+--------+----------+--------+
+impl TryFrom<&[u8]> for EntryImpl {
+    type Error = Error;
+
+    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
+        ensure!(value.len() >= ENTRY_MIN_LEN, DeserializationSnafu);
+
+        // TODO(hl): will use byteorder to simplify encoding/decoding.
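+        // The offsets below mirror the on-disk layout documented above:
+        // [0, 8) id | [8, 16) epoch | [16, 20) length | [20, 20+length) data | 4-byte CRC.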
+        let id_end_ofs = 8;
+        let epoch_end_ofs = id_end_ofs + 8;
+        let length_end_offset = epoch_end_ofs + 4;
+        let length = LittleEndian::read_u32(&value[epoch_end_ofs..length_end_offset]);
+        let data_end_ofs = length_end_offset + length as usize;
+        let crc_end_ofs = data_end_ofs + 4;
+        let data = Vec::from(&value[length_end_offset..data_end_ofs]);
+        let id = LittleEndian::read_u64(&value[0..id_end_ofs]);
+        let epoch = LittleEndian::read_u64(&value[id_end_ofs..epoch_end_ofs]);
+        let crc_read = LittleEndian::read_u32(&value[data_end_ofs..crc_end_ofs]);
+
+        // TODO(hl): add a config option to turn off the CRC checksum.
+        let crc_calc = crc::CRC_ALGO.checksum(&value[0..data_end_ofs]);
+        if crc_calc != crc_read {
+            return Err(Error::Corrupted {
+                msg: format!("CRC mismatch, read: {}, calc: {}", crc_read, crc_calc),
+                backtrace: Backtrace::generate(),
+            });
+        }
+
+        Ok(Self {
+            data,
+            offset: 0usize,
+            id,
+            epoch,
+        })
+    }
+}
+
+impl From<&EntryImpl> for Vec<u8> {
+    fn from(e: &EntryImpl) -> Self {
+        let data_length = e.data.len();
+        let total_size = data_length + ENTRY_MIN_LEN;
+        let mut vec = vec![0u8; total_size];
+
+        let buf = vec.as_mut_slice();
+
+        let id_end_ofs = 8;
+        let epoch_end_ofs = id_end_ofs + 8;
+        let length_end_offset = epoch_end_ofs + 4;
+        let data_end_ofs = length_end_offset + data_length;
+        let crc_end_ofs = data_end_ofs + 4;
+
+        LittleEndian::write_u64(buf, e.id);
+        LittleEndian::write_u64(&mut buf[id_end_ofs..epoch_end_ofs], e.epoch);
+        LittleEndian::write_u32(
+            &mut buf[epoch_end_ofs..length_end_offset],
+            data_length as u32,
+        ); // TODO(hl): check this cast
+
+        buf[length_end_offset..data_end_ofs].copy_from_slice(e.data.as_slice());
+        let checksum = crc::CRC_ALGO.checksum(&buf[0..data_end_ofs]);
+        LittleEndian::write_u32(&mut buf[data_end_ofs..crc_end_ofs], checksum);
+        vec
+    }
+}
+
+pub struct StreamImpl<'a> {
+    pub inner: SendableEntryStream<'a, EntryImpl, Error>,
+    pub start_entry_id: Id,
+}
+
+impl<'a> Stream for StreamImpl<'a> {
+    type Item = Result<Vec<EntryImpl>, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.inner).poll_next(cx)
+    }
+}
+
+impl<'a> EntryStream for StreamImpl<'a> {
+    type Error = Error;
+    type Entry = EntryImpl;
+
+    fn start_id(&self) -> u64 {
+        self.start_entry_id
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::fs::crc::CRC_ALGO;
+
+    #[test]
+    pub fn test_entry_deser() {
+        let data = "hello, world";
+        let entry = EntryImpl::new(data.as_bytes());
+        let vec: Vec<u8> = (&entry).into();
+        assert_eq!(ENTRY_MIN_LEN + data.as_bytes().len(), vec.len());
+        let deserialized = EntryImpl::try_from(vec.as_slice()).unwrap();
+        assert_eq!(entry, deserialized);
+    }
+
+    #[test]
+    pub fn test_rewrite_entry_id() {
+        let data = "hello, world";
+        let mut entry = EntryImpl::new(data.as_bytes());
+        let mut vec: Vec<u8> = (&entry).into();
+        entry.set_id(123);
+        assert_eq!(123, entry.id());
+
+        // Rewrite the entry id in the serialized buffer.
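+        // The id lives in the first 8 bytes of the buffer; the trailing CRC must be
+        // recomputed afterwards, otherwise deserialization would fail the checksum.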
+        LittleEndian::write_u64(&mut vec[0..8], 333);
+        let len = vec.len();
+        let checksum = CRC_ALGO.checksum(&vec[0..len - 4]);
+        LittleEndian::write_u32(&mut vec[len - 4..], checksum);
+
+        let entry_impl = EntryImpl::deserialize(&vec).expect("Failed to deserialize");
+        assert_eq!(333, entry_impl.id());
+    }
+}
diff --git a/src/log-store/src/fs/file.rs b/src/log-store/src/fs/file.rs
new file mode 100644
index 0000000000..71ed0ffd49
--- /dev/null
+++ b/src/log-store/src/fs/file.rs
@@ -0,0 +1,519 @@
+use std::fmt::{Debug, Formatter};
+use std::io::SeekFrom;
+use std::sync::atomic::Ordering::Acquire;
+use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+
+use async_stream::stream;
+use byteorder::ByteOrder;
+use byteorder::LittleEndian;
+use common_telemetry::logging::{error, info};
+use common_telemetry::warn;
+use futures_util::StreamExt;
+use memmap2::{Mmap, MmapOptions};
+use snafu::{Backtrace, GenerateImplicitData, ResultExt};
+use store_api::logstore::entry::{Entry, Id, Offset};
+use store_api::logstore::entry_stream::EntryStream;
+use store_api::logstore::namespace::Namespace;
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncSeekExt, AsyncWriteExt};
+use tokio::sync::mpsc::error::TryRecvError;
+use tokio::sync::mpsc::Receiver as MpscReceiver;
+use tokio::sync::mpsc::Sender as MpscSender;
+use tokio::sync::oneshot::Sender as OneshotSender;
+use tokio::sync::{oneshot, Notify, RwLock};
+use tokio::task::JoinHandle;
+use tokio::time;
+
+use crate::error::{Error, IoSnafu, OpenLogSnafu, Result};
+use crate::fs::config::LogConfig;
+use crate::fs::crc::CRC_ALGO;
+use crate::fs::entry::{EntryImpl, StreamImpl};
+use crate::fs::file_name::FileName;
+use crate::fs::namespace::LocalNamespace;
+use crate::fs::AppendResponseImpl;
+
+const LOG_WRITER_BATCH_SIZE: usize = 16;
+
+// TODO(hl): use a pwrite polyfill on different platforms to avoid a write syscall per append request.
+pub struct LogFile {
+    name: FileName,
+    file: Arc<RwLock<File>>,
+    write_offset: Arc<AtomicUsize>,
+    flush_offset: Arc<AtomicUsize>,
+    next_entry_id: Arc<AtomicU64>,
+    start_entry_id: u64,
+    pending_request_rx: Option<MpscReceiver<AppendRequest>>,
+    pending_request_tx: MpscSender<AppendRequest>,
+    notify: Arc<Notify>,
+    max_file_size: usize,
+    join_handle: Mutex<Option<JoinHandle<Result<()>>>>,
+    sealed: Arc<AtomicBool>,
+    stopped: Arc<AtomicBool>,
+}
+
+impl Debug for LogFile {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LogFile")
+            .field("name", &self.name)
+            .field("start_entry_id", &self.start_entry_id)
+            .field("write_offset", &self.write_offset.load(Ordering::Relaxed))
+            .field("flush_offset", &self.flush_offset.load(Ordering::Relaxed))
+            .field("next_entry_id", &self.next_entry_id.load(Ordering::Relaxed))
+            .field("max_file_size", &self.max_file_size)
+            .field("sealed", &self.sealed.load(Ordering::Relaxed))
+            .finish()
+    }
+}
+
+impl LogFile {
+    /// Opens a log file at `path` with the given log config.
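+    ///
+    /// A usage sketch (the path and config are illustrative; the file name must
+    /// parse as a `FileName`, i.e. a numeric entry id plus a `.log` suffix):
+    ///
+    /// ```ignore
+    /// let config = LogConfig::default();
+    /// let mut file = LogFile::open("/tmp/greptimedb/00000000000000000000.log", &config).await?;
+    /// file.start().await?;
+    /// ```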
+    pub async fn open(path: impl Into<String>, config: &LogConfig) -> Result<Self> {
+        let path = path.into();
+        let file = OpenOptions::new()
+            .write(true)
+            .read(true)
+            .create(true)
+            .open(path.clone())
+            .await
+            .context(OpenLogSnafu { file_name: &path })?;
+
+        let file_name: FileName = path.as_str().try_into()?;
+        let start_entry_id = file_name.entry_id();
+        let (tx, rx) = tokio::sync::mpsc::channel(config.append_buffer_size);
+
+        let mut log = Self {
+            name: file_name,
+            file: Arc::new(RwLock::new(file)),
+            write_offset: Arc::new(AtomicUsize::new(0)),
+            flush_offset: Arc::new(AtomicUsize::new(0)),
+            next_entry_id: Arc::new(AtomicU64::new(start_entry_id)),
+            start_entry_id,
+            pending_request_tx: tx,
+            pending_request_rx: Some(rx),
+            notify: Arc::new(Notify::new()),
+            max_file_size: config.max_log_file_size,
+            join_handle: Mutex::new(None),
+            sealed: Arc::new(AtomicBool::new(false)),
+            stopped: Arc::new(AtomicBool::new(false)),
+        };
+
+        let metadata = log.file.read().await.metadata().await.context(IoSnafu)?;
+        let expect_length = metadata.len() as usize;
+        log.write_offset.store(expect_length, Ordering::Relaxed);
+        log.flush_offset.store(expect_length, Ordering::Relaxed);
+
+        let replay_start_time = time::Instant::now();
+        let (actual_offset, next_entry_id) = log.replay().await?;
+
+        info!(
+            "Log file {} replay finished, last offset: {}, file start entry id: {}, next entry id: {}, elapsed time: {}ms",
+            path, actual_offset, start_entry_id, next_entry_id,
+            time::Instant::now().duration_since(replay_start_time).as_millis()
+        );
+
+        log.write_offset.store(actual_offset, Ordering::Relaxed);
+        log.flush_offset.store(actual_offset, Ordering::Relaxed);
+        log.next_entry_id.store(next_entry_id, Ordering::Relaxed);
+        log.seek(actual_offset).await?;
+        Ok(log)
+    }
+
+    /// Advances the file cursor to the given offset.
+    async fn seek(&mut self, offset: usize) -> Result<u64> {
+        self.file
+            .write()
+            .await
+            .seek(SeekFrom::Start(offset as u64))
+            .await
+            .context(IoSnafu)
+    }
+
+    /// Creates a file mmap region.
+    async fn map(&self, start: u64, length: usize) -> Result<Mmap> {
+        unsafe {
+            let file = self.file.read().await.try_clone().await.context(IoSnafu)?;
+            MmapOptions::new()
+                .offset(start)
+                .len(length)
+                .map(&file)
+                .context(IoSnafu)
+        }
+    }
+
+    /// Returns the persisted size of the current log file.
+    #[allow(unused)]
+    #[inline]
+    pub fn persisted_size(&self) -> usize {
+        self.flush_offset.load(Ordering::Relaxed)
+    }
+
+    #[inline]
+    pub fn next_entry_id(&self) -> Id {
+        self.next_entry_id.load(Ordering::Relaxed)
+    }
+
+    /// Increases the write offset by `delta` and returns the previous value.
+    #[inline]
+    fn inc_offset(&self, delta: usize) -> usize {
+        // Relaxed order is enough since there is no synchronizes-with relationship
+        // between `write_offset` and any other field.
+        self.write_offset.fetch_add(delta, Ordering::Relaxed)
+    }
+
+    /// Increases the next entry id by 1 and returns the previous value.
+    fn inc_entry_id(&self) -> u64 {
+        // Relaxed order is enough since there is no synchronizes-with relationship
+        // between `next_entry_id` and any other field.
+        self.next_entry_id.fetch_add(1, Ordering::Relaxed)
+    }
+
+    /// Starts the log file and its internal components (including the flush task).
+    pub async fn start(&mut self) -> Result<()> {
+        let notify = self.notify.clone();
+        let file = self.file.write().await.try_clone().await.context(IoSnafu)?;
+
+        let write_offset = self.write_offset.clone();
+        let flush_offset = self.flush_offset.clone();
+
+        let stopped = self.stopped.clone();
+
+        if let Some(mut rx) = self.pending_request_rx.take() {
+            let handle = tokio::spawn(async move {
+                let mut batch: Vec<AppendRequest> = Vec::with_capacity(LOG_WRITER_BATCH_SIZE);
+
+                let mut error_occurred = false;
+                while !stopped.load(Ordering::Acquire) {
+                    for _ in 0..LOG_WRITER_BATCH_SIZE {
+                        match rx.try_recv() {
+                            Ok(req) => {
+                                batch.push(req);
+                            }
+                            Err(e) => match e {
+                                TryRecvError::Empty => {
+                                    if batch.is_empty() {
+                                        notify.notified().await;
+                                        if stopped.load(Ordering::Acquire) {
+                                            break;
+                                        }
+                                    } else {
+                                        break;
+                                    }
+                                }
+                                TryRecvError::Disconnected => {
+                                    info!("Channel disconnected...");
+                                    error_occurred = true;
+                                    break;
+                                }
+                            },
+                        }
+                    }
+
+                    // Flush all pending data to disk.
+                    let write_offset_read = write_offset.load(Ordering::Relaxed);
+                    // TODO(hl): add flush metrics
+                    if let Err(flush_err) = file.sync_all().await {
+                        error!("Failed to flush log file: {}", flush_err);
+                        error_occurred = true;
+                    }
+                    if error_occurred {
+                        info!("Flush task stopped");
+                        break;
+                    }
+                    flush_offset.store(write_offset_read, Ordering::Relaxed);
+                    while let Some(req) = batch.pop() {
+                        req.complete();
+                    }
+                }
+
+                // Drain all pending requests on stop.
+                let write_offset_read = write_offset.load(Ordering::Relaxed);
+                if file.sync_all().await.is_ok() {
+                    flush_offset.store(write_offset_read, Ordering::Release);
+                    while let Ok(req) = rx.try_recv() {
+                        req.complete()
+                    }
+                }
+                Ok(())
+            });
+
+            *self.join_handle.lock().unwrap() = Some(handle);
+            info!("Flush task started...");
+        }
+        Ok(())
+    }
+
+    /// Stops the log file.
+    /// # Panics
+    /// Panics if the log file is stopped before ever being started.
+    pub async fn stop(&self) -> Result<()> {
+        self.stopped.store(true, Ordering::Release);
+        let join_handle = self
+            .join_handle
+            .lock()
+            .unwrap()
+            .take()
+            .expect("Join handle should be present");
+        self.notify.notify_waiters();
+        let res = join_handle.await.unwrap();
+        info!("LogFile task finished: {:?}", res);
+        res
+    }
+
+    #[inline]
+    pub fn start_entry_id(&self) -> Id {
+        self.start_entry_id
+    }
+
+    /// Replays the current file until the last entry is read.
+    pub async fn replay(&mut self) -> Result<(usize, Id)> {
+        let log_name = self.name.to_string();
+        let previous_offset = self.flush_offset.load(Ordering::Relaxed);
+        let mut stream = self.create_stream(
+            // TODO(hl): LocalNamespace should be filled
+            LocalNamespace::default(),
+            0,
+        );
+
+        let mut last_offset = 0usize;
+        let mut last_entry_id: Option<Id> = None;
+        while let Some(res) = stream.next().await {
+            match res {
+                Ok(entries) => {
+                    for e in entries {
+                        last_offset += e.len();
+                        last_entry_id = Some(e.id());
+                    }
+                }
+                Err(e) => {
+                    error!("Error while replaying log {}: {:?}", log_name, e);
+                    break;
+                }
+            }
+        }
+        info!(
+            "Replay log {} finished, offset: {} -> {}",
+            log_name, previous_offset, last_offset
+        );
+        Ok((
+            last_offset,
+            match last_entry_id {
+                None => self.start_entry_id,
+                Some(v) => v + 1,
+            },
+        ))
+    }
+
+    /// Creates a reader stream that asynchronously yields entries starting from the given entry id.
+    /// ### Notice
+    /// If the entry with the start entry id is not present, the stream will start at
+    /// the first entry with an id greater than `start_entry_id`.
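+    ///
+    /// A consumption sketch (namespace and start id are illustrative):
+    ///
+    /// ```ignore
+    /// let mut stream = file.create_stream(LocalNamespace::default(), 0);
+    /// while let Some(res) = stream.next().await {
+    ///     for entry in res? {
+    ///         println!("replayed entry: {}", entry.id());
+    ///     }
+    /// }
+    /// ```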
+    pub fn create_stream(&self, _ns: impl Namespace, start_entry_id: u64) -> impl EntryStream + '_ {
+        let s = stream!({
+            let length = self.flush_offset.load(Ordering::Relaxed);
+            let mmap = self.map(0, length).await?;
+
+            let mut buf: &[u8] = mmap.as_ref();
+            if buf.is_empty() {
+                // file is newly created
+                info!("File is just created!");
+                return;
+            }
+
+            loop {
+                let entry = EntryImpl::try_from(buf)?;
+                let entry_length = entry.len();
+                if entry.id() >= start_entry_id {
+                    yield Ok(vec![entry]);
+                }
+                if buf.len() > entry_length {
+                    buf = &buf[entry_length..];
+                } else {
+                    break;
+                }
+            }
+        });
+
+        StreamImpl {
+            inner: Box::pin(s),
+            start_entry_id,
+        }
+    }
+
+    /// Appends an entry to the `LogFile` and returns a `Result` containing the id of the appended entry.
+    pub async fn append<T: Entry>(&self, e: &mut T) -> Result<AppendResponseImpl> {
+        if self.stopped.load(Ordering::Acquire) {
+            return Err(Error::Eof);
+        }
+        e.set_id(0);
+        let mut serialized = e.serialize();
+        let size = serialized.len();
+
+        if size + self.write_offset.load(Ordering::Relaxed) > self.max_file_size {
+            return Err(Error::Eof);
+        }
+
+        let entry_offset;
+        let entry_id;
+
+        {
+            let mut file = self.file.write().await;
+            // generate entry id
+            entry_id = self.inc_entry_id();
+            // generate in-file offset
+            entry_offset = self.inc_offset(size);
+            // rewrite encoded data
+            LittleEndian::write_u64(&mut serialized[0..8], entry_id);
+            // TODO(hl): the CRC is calculated twice
+            let checksum = CRC_ALGO.checksum(&serialized[0..size - 4]);
+            LittleEndian::write_u32(&mut serialized[size - 4..], checksum);
+
+            // write to file
+            // TODO(hl): use an io buffer and pwrite to reduce syscalls.
+            file.write(serialized.as_slice()).await.context(IoSnafu)?;
+        }
+
+        let (tx, rx) = oneshot::channel();
+
+        if self
+            .pending_request_tx
+            .send(AppendRequest {
+                tx,
+                offset: entry_offset,
+                id: entry_id,
+            })
+            .await
+            .is_err()
+        {
+            self.file.write().await.sync_all().await.context(IoSnafu)?;
+            Ok(AppendResponseImpl {
+                offset: entry_offset,
+                entry_id,
+            })
+        } else {
+            self.notify.notify_one(); // notify the flush task.
+            rx.await.map_err(|e| {
+                warn!(
+                    "Error while waiting for append result: {}, file {}",
+                    e,
+                    self.name.to_string()
+                );
+                Error::Internal {
+                    msg: "Sender already dropped".to_string(),
+                    backtrace: Backtrace::generate(),
+                }
+            })
+        }
+    }
+
+    #[inline]
+    pub fn try_seal(&self) -> bool {
+        self.sealed
+            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
+            .is_ok()
+    }
+
+    #[inline]
+    pub fn is_seal(&self) -> bool {
+        self.sealed.load(Acquire)
+    }
+
+    #[inline]
+    pub fn unseal(&self) {
+        self.sealed.store(false, Ordering::Release);
+    }
+
+    #[inline]
+    pub fn file_name(&self) -> String {
+        self.name.to_string()
+    }
+}
+
+impl ToString for LogFile {
+    fn to_string(&self) -> String {
+        format!("LogFile{{ name: {}, write_offset: {}, flush_offset: {}, start_entry_id: {}, entry_id_counter: {} }}",
+            self.name, self.write_offset.load(Ordering::Relaxed), self.flush_offset.load(Ordering::Relaxed), self.start_entry_id, self.next_entry_id.load(Ordering::Relaxed))
+    }
+}
+
+pub type LogFileRef = Arc<LogFile>;
+
+#[allow(dead_code)]
+#[derive(Debug)]
+pub(crate) struct AppendRequest {
+    tx: OneshotSender<AppendResponseImpl>,
+    offset: Offset,
+    id: Id,
+}
+
+impl AppendRequest {
+    pub fn complete(self) {
+        // TODO(hl): use this result.
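+        // Sending fails only if the receiver half was already dropped, i.e. the
+        // appender gave up waiting for this response; the result is ignored for now.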
+        let _ = self.tx.send(AppendResponseImpl {
+            offset: self.offset,
+            entry_id: self.id,
+        });
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_telemetry::logging;
+    use futures_util::StreamExt;
+    use tempdir::TempDir;
+
+    use super::*;
+    use crate::fs::namespace::LocalNamespace;
+
+    pub async fn create_temp_dir(file_name: impl AsRef<str>) -> (String, TempDir) {
+        let dir = TempDir::new("greptimedb-store-test").unwrap();
+        let path_buf = dir.path().join(file_name.as_ref());
+        let path_str = path_buf.to_str().unwrap().to_string();
+        File::create(path_str.as_str()).await.unwrap();
+        (path_str, dir)
+    }
+
+    #[tokio::test]
+    pub async fn test_create_entry_stream() {
+        logging::init_default_ut_logging();
+        let config = LogConfig::default();
+        let (path, _dir) = create_temp_dir("0010.log").await;
+        let mut file = LogFile::open(path.clone(), &config)
+            .await
+            .unwrap_or_else(|_| panic!("Failed to open file: {}", path));
+        file.start().await.expect("Failed to start log file");
+
+        assert_eq!(
+            10,
+            file.append(&mut EntryImpl::new("test1".as_bytes()))
+                .await
+                .expect("Failed to append entry 1")
+                .entry_id
+        );
+
+        assert_eq!(
+            11,
+            file.append(&mut EntryImpl::new("test-2".as_bytes()))
+                .await
+                .expect("Failed to append entry 2")
+                .entry_id
+        );
+
+        let mut stream = file.create_stream(LocalNamespace::default(), 0);
+
+        let mut data = vec![];
+
+        while let Some(v) = stream.next().await {
+            let entries = v.unwrap();
+            let content = entries[0].data();
+            let vec = content.to_vec();
+            data.push(String::from_utf8(vec).unwrap());
+        }
+
+        assert_eq!(vec!["test1".to_string(), "test-2".to_string()], data);
+        drop(stream);
+
+        let result = file.stop().await;
+        info!("Stop file res: {:?}", result);
+    }
+}
diff --git a/src/log-store/src/fs/file_name.rs b/src/log-store/src/fs/file_name.rs
new file mode 100644
index 0000000000..854b306103
--- /dev/null
+++ b/src/log-store/src/fs/file_name.rs
@@ -0,0 +1,111 @@
+use std::fmt::{Display, Formatter};
+use std::path::Path;
+
+use snafu::OptionExt;
+use store_api::logstore::entry::Id;
+
+use crate::error::{Error, FileNameIllegalSnafu, SuffixIllegalSnafu};
+use crate::fs::file_name::FileName::Log;
+
+/// FileName represents a log file name whose entry id is zero-padded to 20 digits.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum FileName {
+    // File name with a `.log` suffix.
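+    // e.g. entry id 7 renders as `00000000000000000007.log` via the Display impl below.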
+    Log(Id),
+}
+
+impl TryFrom<&str> for FileName {
+    type Error = Error;
+
+    fn try_from(p: &str) -> Result<Self, Self::Error> {
+        let path = Path::new(p);
+
+        let extension =
+            path.extension()
+                .and_then(|s| s.to_str())
+                .with_context(|| FileNameIllegalSnafu {
+                    file_name: path.to_string_lossy(),
+                })?;
+
+        let id: u64 = path
+            .file_stem()
+            .and_then(|s| s.to_str())
+            .and_then(|s| s.parse::<u64>().ok())
+            .with_context(|| FileNameIllegalSnafu {
+                file_name: p.to_string(),
+            })?;
+
+        Self::new_with_suffix(id, extension)
+    }
+}
+
+impl From<u64> for FileName {
+    fn from(entry_id: u64) -> Self {
+        Self::log(entry_id)
+    }
+}
+
+impl FileName {
+    pub fn log(entry_id: Id) -> Self {
+        Log(entry_id)
+    }
+
+    pub fn new_with_suffix(entry_id: Id, suffix: &str) -> Result<Self, Error> {
+        match suffix {
+            "log" => Ok(Log(entry_id)),
+            _ => SuffixIllegalSnafu { suffix }.fail(),
+        }
+    }
+
+    pub fn entry_id(&self) -> Id {
+        match self {
+            Log(id) => *id,
+        }
+    }
+
+    fn suffix(&self) -> &str {
+        match self {
+            Log(_) => ".log",
+        }
+    }
+}
+
+impl Display for FileName {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:020}{}", self.entry_id(), self.suffix())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    pub fn test_padding_file_name() {
+        let id = u64::MIN;
+        assert_eq!("00000000000000000000", format!("{:020}", id));
+        let id = 123u64;
+        assert_eq!("00000000000000000123", format!("{:020}", id));
+        let id = 123123123123u64;
+        assert_eq!("00000000123123123123", format!("{:020}", id));
+        let id = u64::MAX;
+        assert_eq!(u64::MAX.to_string(), format!("{:020}", id));
+    }
+
+    #[test]
+    pub fn test_file_name_to_string() {
+        assert_eq!("00000000000000000000.log", FileName::log(0).to_string());
+        assert_eq!(
+            u64::MAX.to_string() + ".log",
+            FileName::log(u64::MAX).to_string()
+        );
+    }
+
+    #[test]
+    pub fn test_parse_file_name() {
+        let path = "/path/to/any/01010010000.log";
+        let parsed = FileName::try_from(path).expect("Failed to parse file name");
+        assert_eq!(1010010000u64, parsed.entry_id());
+        assert_eq!(".log", parsed.suffix());
+    }
+}
diff --git a/src/log-store/src/fs/index.rs b/src/log-store/src/fs/index.rs
new file mode 100644
index 0000000000..3504f93fe2
--- /dev/null
+++ b/src/log-store/src/fs/index.rs
@@ -0,0 +1,68 @@
+use std::collections::BTreeMap;
+use std::sync::RwLock;
+
+use store_api::logstore::entry::{Id, Offset};
+
+use crate::error::Result;
+use crate::fs::file_name::FileName;
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub struct Location {
+    pub file_name: FileName,
+    pub offset: Offset,
+}
+
+#[allow(dead_code)]
+impl Location {
+    pub fn new(file_name: FileName, offset: Offset) -> Self {
+        Self { file_name, offset }
+    }
+}
+
+/// In-memory entry id to offset index.
+pub trait EntryIndex {
+    /// Adds an entry id to offset mapping, returning the previous location if present.
+    fn add_entry_id(&self, id: Id, loc: Location) -> Option<Location>;
+
+    /// Finds the offset by entry id.
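+    /// Returns `Ok(None)` if no mapping exists for the given id.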
+    fn find_offset_by_id(&self, id: Id) -> Result<Option<Location>>;
+}
+
+pub struct MemoryIndex {
+    map: RwLock<BTreeMap<Id, Location>>,
+}
+
+#[allow(dead_code)]
+impl MemoryIndex {
+    pub fn new() -> Self {
+        Self {
+            map: RwLock::new(BTreeMap::new()),
+        }
+    }
+}
+
+impl EntryIndex for MemoryIndex {
+    fn add_entry_id(&self, id: Id, loc: Location) -> Option<Location> {
+        self.map.write().unwrap().insert(id, loc)
+    }
+
+    fn find_offset_by_id(&self, id: Id) -> Result<Option<Location>> {
+        Ok(self.map.read().unwrap().get(&id).cloned())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    pub fn test_entry() {
+        let index = MemoryIndex::new();
+        index.add_entry_id(1, Location::new(FileName::log(0), 1));
+        assert_eq!(
+            Location::new(FileName::log(0), 1),
+            index.find_offset_by_id(1).unwrap().unwrap()
+        );
+        assert_eq!(None, index.find_offset_by_id(2).unwrap());
+    }
+}
diff --git a/src/log-store/src/fs/log.rs b/src/log-store/src/fs/log.rs
new file mode 100644
index 0000000000..38ab8212aa
--- /dev/null
+++ b/src/log-store/src/fs/log.rs
@@ -0,0 +1,309 @@
+use std::collections::BTreeMap;
+use std::path::Path;
+use std::sync::Arc;
+
+use arc_swap::ArcSwap;
+use common_telemetry::{error, info, warn};
+use snafu::{OptionExt, ResultExt};
+use store_api::logstore::entry::Id;
+use store_api::logstore::LogStore;
+use tokio::sync::RwLock;
+
+use crate::error::{
+    DuplicateFileSnafu, Error, FileNameIllegalSnafu, InternalSnafu, IoSnafu, Result,
+};
+use crate::fs::config::LogConfig;
+use crate::fs::entry::EntryImpl;
+use crate::fs::file::{LogFile, LogFileRef};
+use crate::fs::file_name::FileName;
+use crate::fs::namespace::LocalNamespace;
+use crate::fs::AppendResponseImpl;
+
+type FileMap = BTreeMap<u64, LogFileRef>;
+
+pub struct LocalFileLogStore {
+    files: RwLock<FileMap>,
+    active: ArcSwap<LogFile>,
+    config: LogConfig,
+}
+
+impl LocalFileLogStore {
+    /// Opens a directory as the log store directory, initializing it if it is empty.
+    #[allow(unused)]
+    pub async fn open(config: &LogConfig) -> Result<Self> {
+        let mut files = Self::load_dir(&config.log_file_dir, config).await?;
+
+        if files.is_empty() {
+            Self::init_on_empty(&mut files, config).await?;
+            info!("Initialized log store directory: {}", config.log_file_dir)
+        }
+
+        let id = *files.keys().max().context(InternalSnafu {
+            msg: format!(
+                "log store directory is empty after initialization: {}",
+                config.log_file_dir
+            ),
+        })?;
+
+        info!(
+            "Successfully loaded log store directory, files: {:?}",
+            files
+        );
+
+        let active_file = files
+            .get_mut(&id)
+            .expect("Not expected to fail when initing log store");
+
+        active_file.unseal();
+        let active_file_name = active_file.to_string();
+        info!("Log store active log file: {}", active_file_name);
+
+        // Start the active log file
+        Arc::get_mut(active_file)
+            .with_context(|| InternalSnafu {
+                msg: format!(
+                    "Concurrent modification on log store {} start is not allowed",
+                    active_file_name
+                ),
+            })?
+            .start()
+            .await?;
+        info!(
+            "Successfully started the current active file: {}",
+            active_file_name
+        );
+
+        let active_file_cloned = active_file.clone();
+        Ok(Self {
+            files: RwLock::new(files),
+            active: ArcSwap::new(active_file_cloned),
+            config: config.clone(),
+        })
+    }
+
+    pub async fn init_on_empty(files: &mut FileMap, config: &LogConfig) -> Result<()> {
+        let path = Path::new(&config.log_file_dir).join(FileName::log(0).to_string());
+        let file_path = path.to_str().context(FileNameIllegalSnafu {
+            file_name: config.log_file_dir.clone(),
+        })?;
+        let file = LogFile::open(file_path, config).await?;
+        files.insert(0, Arc::new(file));
+        Ok(())
+    }
+
+    pub async fn load_dir(path: impl AsRef<str>, config: &LogConfig) -> Result<FileMap> {
+        let mut map = FileMap::new();
+        let mut dir = tokio::fs::read_dir(Path::new(path.as_ref()))
+            .await
+            .context(IoSnafu)?;
+
+        while let Some(f) = dir.next_entry().await.context(IoSnafu)? {
+            let path_buf = f.path();
+            let path = path_buf.to_str().context(FileNameIllegalSnafu {
+                file_name: path.as_ref().to_string(),
+            })?;
+            let file_name = FileName::try_from(path)?;
+            let start_id = file_name.entry_id();
+            let file = LogFile::open(path, config).await?;
+            info!("Load log store file {}: {:?}", start_id, file);
+            if map.contains_key(&start_id) {
+                error!("Log file with start entry id: {} already exists", start_id);
+                return DuplicateFileSnafu {
+                    msg: format!("File with start id: {} duplicates on start", start_id),
+                }
+                .fail();
+            }
+            file.try_seal();
+            map.insert(start_id, Arc::new(file));
+        }
+        Ok(map)
+    }
+
+    /// Marks the current active file as sealed and creates a new log file for writing.
+    async fn roll_next(&self, active: LogFileRef) -> Result<()> {
+        // acquires lock
+        let mut files = self.files.write().await;
+
+        // if the active file is already sealed, just return.
+        if active.is_seal() {
+            return Ok(());
+        }
+
+        // create and start a new log file
+        let entry_id = active.next_entry_id();
+        let path_buf =
+            Path::new(&self.config.log_file_dir).join(FileName::log(entry_id).to_string());
+        let path = path_buf.to_str().context(FileNameIllegalSnafu {
+            file_name: self.config.log_file_dir.clone(),
+        })?;
+
+        let mut new_file = LogFile::open(path, &self.config).await?;
+        new_file.start().await?;
+
+        let new_file = Arc::new(new_file);
+        files.insert(new_file.start_entry_id(), new_file.clone());
+
+        self.active.swap(new_file);
+        active.try_seal();
+        tokio::spawn(async move {
+            active.stop().await.unwrap();
+            info!("Sealed log file {} stopped.", active.file_name());
+        });
+        Ok(()) // release lock
+    }
+
+    pub fn active_file(&self) -> Arc<LogFile> {
+        self.active.load().clone()
+    }
+}
+
+#[async_trait::async_trait]
+impl LogStore for LocalFileLogStore {
+    type Error = Error;
+    type Namespace = LocalNamespace;
+    type Entry = EntryImpl;
+    type AppendResponse = AppendResponseImpl;
+
+    async fn append(
+        &self,
+        _ns: Self::Namespace,
+        mut e: Self::Entry,
+    ) -> Result<Self::AppendResponse> {
+        // TODO(hl): configurable retry times
+        for _ in 0..3 {
+            let current_active_file = self.active_file();
+            match current_active_file.append(&mut e).await {
+                Ok(r) => return Ok(r),
+                Err(e) => match e {
+                    Error::Eof => {
+                        self.roll_next(current_active_file.clone()).await?;
+                        info!("Rolled to next file, retry append");
+                        continue;
+                    }
+                    Error::Internal { .. } => {
+                        warn!("File closed, try new file");
+                        continue;
+                    }
+                    _ => {
+                        error!("Failed to append entry to log file, error: {}", e);
+                        return Err(e);
+                    }
+                },
+            }
+        }
+
+        InternalSnafu {
+            msg: "Failed to append entry: maximum retries exceeded".to_string(),
+        }
+        .fail()
+    }
+
+    async fn append_batch(&self, _ns: Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
+        todo!()
+    }
+
+    async fn read(
+        &self,
+        _ns: Self::Namespace,
+        _id: Id,
+    ) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
+    {
+        todo!()
+    }
+
+    async fn create_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
+        todo!()
+    }
+
+    async fn delete_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
+        todo!()
+    }
+
+    async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
+        todo!()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures_util::StreamExt;
+    use rand::{distributions::Alphanumeric, Rng};
+    use store_api::logstore::entry::Entry;
+    use tempdir::TempDir;
+
+    use super::*;
+
+    #[tokio::test]
+    pub async fn test_roll_file() {
+        common_telemetry::logging::init_default_ut_logging();
+        let dir = TempDir::new("greptimedb").unwrap();
+        let config = LogConfig {
+            append_buffer_size: 128,
+            max_log_file_size: 128,
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+        };
+
+        let logstore = LocalFileLogStore::open(&config).await.unwrap();
+        assert_eq!(
+            0,
+            logstore
+                .append(
+                    LocalNamespace::default(),
+                    EntryImpl::new(generate_data(100)),
+                )
+                .await
+                .unwrap()
+                .entry_id
+        );
+
+        assert_eq!(
+            1,
+            logstore
+                .append(
+                    LocalNamespace::default(),
+                    EntryImpl::new(generate_data(100)),
+                )
+                .await
+                .unwrap()
+                .entry_id
+        );
+    }
+
+    fn generate_data(size: usize) -> Vec<u8> {
+        let s: String = rand::thread_rng()
+            .sample_iter(&Alphanumeric)
+            .take(size)
+            .map(char::from)
+            .collect();
+        s.into_bytes()
+    }
+
+    #[tokio::test]
+    pub async fn test_write_and_read_data() {
+        common_telemetry::logging::init_default_ut_logging();
+        let dir = TempDir::new("greptimedb").unwrap();
+        let config = LogConfig {
+            append_buffer_size: 128,
+            max_log_file_size: 128,
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+        };
+        let logstore = LocalFileLogStore::open(&config).await.unwrap();
+        let id = logstore
+            .append(
+                LocalNamespace::default(),
+                EntryImpl::new(generate_data(100)),
+            )
+            .await
+            .unwrap()
+            .entry_id;
+        assert_eq!(0, id);
+
+        let active_file = logstore.active_file();
+        let stream = active_file.create_stream(LocalNamespace::default(), 0);
+        tokio::pin!(stream);
+
+        let entries = stream.next().await.unwrap().unwrap();
+        assert_eq!(entries.len(), 1);
+        assert_eq!(entries[0].id(), 0);
+    }
+}
diff --git a/src/log-store/src/fs/namespace.rs b/src/log-store/src/fs/namespace.rs
new file mode 100644
index 0000000000..d5bbd7ef28
--- /dev/null
+++ b/src/log-store/src/fs/namespace.rs
@@ -0,0 +1,40 @@
+use std::sync::Arc;
+
+use store_api::logstore::namespace::Namespace;
+
+#[derive(Clone)]
+pub struct LocalNamespace {
+    inner: Arc<LocalNamespaceInner>,
+}
+
+impl Default for LocalNamespace {
+    fn default() -> Self {
+        LocalNamespace::new("", 0)
+    }
+}
+
+struct LocalNamespaceInner {
+    name: String,
+    id: u64,
+}
+
+impl Namespace for LocalNamespace {
+    fn name(&self) -> &str {
+        self.inner.name.as_str()
+    }
+}
+
+#[allow(dead_code)]
+impl LocalNamespace {
+    fn id(&self) -> u64 {
+        self.inner.id
+    }
+
+    pub fn new(name: &str, id: u64) -> Self {
+        let inner = Arc::new(LocalNamespaceInner {
+            name: name.to_string(),
+            id,
+        });
+        Self { inner }
+    }
+}
diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs
index 8b13789179..572d575ff2 100644
--- a/src/log-store/src/lib.rs
+++ b/src/log-store/src/lib.rs
@@ -1 +1,2 @@
-
+mod error;
+pub mod fs;
diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml
index 2058ff1b62..6fcdaa0f71 100644
--- a/src/store-api/Cargo.toml
+++ b/src/store-api/Cargo.toml
@@ -10,6 +10,7 @@ async-trait = "0.1"
 common-error = { path = "../common/error" }
 datatypes = { path = "../datatypes" }
 futures = "0.3"
+snafu = { version = "0.7", features = ["backtraces"] }
 
 [dev-dependencies]
 async-stream = "0.3"
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index 333c4cbe47..b0992e684f 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -1,9 +1,8 @@
 //! LogStore APIs.
 
 use common_error::prelude::ErrorExt;
-use entry::Offset;
 
-use crate::logstore::entry::Entry;
+use crate::logstore::entry::{Entry, Id, Offset};
 use crate::logstore::entry_stream::SendableEntryStream;
 use crate::logstore::namespace::Namespace;
 
@@ -17,23 +16,28 @@ pub trait LogStore {
     type Error: ErrorExt + Send + Sync;
     type Namespace: Namespace;
     type Entry: Entry;
+    type AppendResponse: AppendResponse;
 
     /// Append an `Entry` to WAL with given namespace
-    async fn append(&mut self, ns: Self::Namespace, e: Self::Entry) -> Result<Offset, Self::Error>;
+    async fn append(
+        &self,
+        ns: Self::Namespace,
+        mut e: Self::Entry,
+    ) -> Result<Self::AppendResponse, Self::Error>;
 
     // Append a batch of entries atomically and return the offset of first entry.
     async fn append_batch(
-        &mut self,
+        &self,
         ns: Self::Namespace,
         e: Vec<Self::Entry>,
-    ) -> Result<Offset, Self::Error>;
+    ) -> Result<Id, Self::Error>;
 
     // Create a new `EntryStream` to asynchronously generates `Entry`.
     async fn read(
         &self,
         ns: Self::Namespace,
-        offset: Offset,
-    ) -> Result<SendableEntryStream<'_, Self::Entry>, Self::Error>;
+        id: Id,
+    ) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>, Self::Error>;
 
     // Create a new `Namespace`.
     async fn create_namespace(&mut self, ns: Self::Namespace) -> Result<(), Self::Error>;
@@ -44,3 +48,9 @@ pub trait LogStore {
     // List all existing namespaces.
     async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
 }
+
+pub trait AppendResponse: Send + Sync {
+    fn entry_id(&self) -> Id;
+
+    fn offset(&self) -> Offset;
+}
diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs
index 88d97ab28b..1566c68927 100644
--- a/src/store-api/src/logstore/entry.rs
+++ b/src/store-api/src/logstore/entry.rs
@@ -1,14 +1,36 @@
-pub type Offset = u64;
+use common_error::ext::ErrorExt;
+
+pub type Offset = usize;
 pub type Epoch = u64;
+pub type Id = u64;
 
 /// Entry is the minimal data storage unit in `LogStore`.
-pub trait Entry {
+pub trait Entry: Send + Sync {
+    type Error: ErrorExt + Send + Sync;
+
     /// Return contained data of entry.
     fn data(&self) -> &[u8];
 
-    /// Return offset of entry.
+    /// Return entry id that monotonically increments.
+    fn id(&self) -> Id;
+
+    /// Return file offset of entry.
     fn offset(&self) -> Offset;
 
+    fn set_offset(&mut self, offset: Offset);
+
+    fn set_id(&mut self, id: Id);
+
     /// Returns epoch of entry.
     fn epoch(&self) -> Epoch;
+
+    fn len(&self) -> usize;
+
+    fn is_empty(&self) -> bool;
+
+    fn serialize(&self) -> Vec<u8>;
+
+    fn deserialize(b: impl AsRef<[u8]>) -> Result<Self, Self::Error>
+    where
+        Self: Sized;
 }
diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs
index 9a607963dd..9e1cd1f3e3 100644
--- a/src/store-api/src/logstore/entry_stream.rs
+++ b/src/store-api/src/logstore/entry_stream.rs
@@ -1,25 +1,28 @@
 use std::pin::Pin;
 
+use common_error::prelude::ErrorExt;
 use futures::Stream;
 
 use crate::logstore::entry::Entry;
-use crate::logstore::Offset;
 
-pub trait EntryStream: Stream<Item = Vec<Self::Entry>> {
+pub trait EntryStream: Stream<Item = Result<Vec<Self::Entry>, Self::Error>> {
+    type Error: ErrorExt;
     type Entry: Entry;
-    fn start_offset(&self) -> Offset;
+
+    fn start_id(&self) -> u64;
 }
 
-pub type SendableEntryStream<'a, E> = Pin<Box<dyn Stream<Item = Vec<E>> + Send + 'a>>;
+pub type SendableEntryStream<'a, I, E> = Pin<Box<dyn Stream<Item = Result<Vec<I>, E>> + Send + 'a>>;
 
 #[cfg(test)]
 mod tests {
+    use std::any::Any;
     use std::task::{Context, Poll};
 
     use futures::StreamExt;
 
     use super::*;
-    use crate::logstore::entry::Epoch;
+    use crate::logstore::entry::{Epoch, Id, Offset};
 
     pub struct SimpleEntry {
         /// Offset of current entry
@@ -30,18 +33,61 @@ mod tests {
         data: Vec<u8>,
     }
 
+    use common_error::prelude::{ErrorExt, Snafu};
+    use snafu::{Backtrace, ErrorCompat};
+
+    #[derive(Debug, Snafu)]
+    #[snafu(visibility(pub))]
+    pub struct Error {}
+
+    impl ErrorExt for Error {
+        fn backtrace_opt(&self) -> Option<&Backtrace> {
+            ErrorCompat::backtrace(self)
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+    }
+
     impl Entry for SimpleEntry {
+        type Error = Error;
+
         fn data(&self) -> &[u8] {
-            self.data.as_slice()
+            &self.data
+        }
+
+        fn id(&self) -> Id {
+            0u64
         }
 
         fn offset(&self) -> Offset {
             self.offset
         }
 
+        fn set_offset(&mut self, _offset: Offset) {}
+
+        fn set_id(&mut self, _id: Id) {}
+
         fn epoch(&self) -> Epoch {
             self.epoch
         }
+
+        fn len(&self) -> usize {
+            self.data.len()
+        }
+
+        fn is_empty(&self) -> bool {
+            self.data.is_empty()
+        }
+
+        fn serialize(&self) -> Vec<u8> {
+            self.data.clone()
+        }
+
+        fn deserialize(_b: impl AsRef<[u8]>) -> Result<Self, Self::Error> {
+            unimplemented!()
+        }
     }
 
     impl SimpleEntry {
@@ -56,20 +102,21 @@ mod tests {
     }
 
     pub struct EntryStreamImpl<'a> {
-        inner: SendableEntryStream<'a, SimpleEntry>,
-        start_offset: Offset,
+        inner: SendableEntryStream<'a, SimpleEntry, Error>,
+        start_id: u64,
     }
 
     impl<'a> EntryStream for EntryStreamImpl<'a> {
+        type Error = Error;
         type Entry = SimpleEntry;
 
-        fn start_offset(&self) -> Offset {
-            self.start_offset
+        fn start_id(&self) -> u64 {
+            self.start_id
         }
     }
 
     impl Stream for EntryStreamImpl<'_> {
-        type Item = Vec<SimpleEntry>;
+        type Item = Result<Vec<SimpleEntry>, Error>;
 
         fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
             match Pin::new(&mut self.inner).poll_next(cx) {
@@ -83,17 +130,18 @@ mod tests {
     #[tokio::test]
     pub async fn test_entry_stream() {
         let stream = async_stream::stream!({
-            yield vec![SimpleEntry::new("test_entry".as_bytes(), 0, 128)]
+            yield Ok(vec![SimpleEntry::new("test_entry".as_bytes(), 0, 128)])
         });
         let mut stream_impl = EntryStreamImpl {
             inner: Box::pin(stream),
-            start_offset: 1234,
+            start_id: 1234,
        };
 
         if let Some(v) = stream_impl.next().await {
-            assert_eq!(1, v.len());
-            assert_eq!(b"test_entry", v[0].data());
+            let vec = v.unwrap();
+            assert_eq!(1, vec.len());
+            assert_eq!(b"test_entry", vec[0].data());
         }
     }
 }