diff --git a/Cargo.lock b/Cargo.lock index c899734a27..2cdd34b5dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1987,12 +1987,12 @@ dependencies = [ "bytes", "common-base", "common-error", + "common-runtime", "common-telemetry", "crc", "futures", "futures-util", "hex", - "memmap2", "rand 0.8.5", "snafu", "store-api", @@ -2075,15 +2075,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" -[[package]] -name = "memmap2" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5172b50c23043ff43dd53e51392f36519d9b35a8f3a410d30ece5d1aedd58ae" -dependencies = [ - "libc", -] - [[package]] name = "memoffset" version = "0.6.5" diff --git a/src/common/base/src/buffer.rs b/src/common/base/src/buffer.rs index 4b7788d3f6..c50bfcaaf5 100644 --- a/src/common/base/src/buffer.rs +++ b/src/common/base/src/buffer.rs @@ -7,6 +7,7 @@ use paste::paste; use snafu::{ensure, Backtrace, ErrorCompat, ResultExt, Snafu}; #[derive(Debug, Snafu)] +#[snafu(visibility(pub))] pub enum Error { #[snafu(display( "Destination buffer overflow, src_len: {}, dst_len: {}", @@ -19,6 +20,9 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Buffer underflow"))] + Underflow { backtrace: Backtrace }, + #[snafu(display("IO operation reach EOF, source: {}", source))] Eof { source: std::io::Error, @@ -42,11 +46,20 @@ macro_rules! impl_read_le { ( $($num_ty: ty), *) => { $( paste!{ + // TODO(hl): default implementation requires allocating a + // temp buffer. maybe use more efficient impls in concrete buffers. + // see https://github.com/GrepTimeTeam/greptimedb/pull/97#discussion_r930798941 fn [](&mut self) -> Result<$num_ty> { let mut buf = [0u8; std::mem::size_of::<$num_ty>()]; self.read_to_slice(&mut buf)?; Ok($num_ty::from_le_bytes(buf)) } + + fn [](&mut self) -> Result<$num_ty> { + let mut buf = [0u8; std::mem::size_of::<$num_ty>()]; + self.peek_to_slice(&mut buf)?; + Ok($num_ty::from_le_bytes(buf)) + } } )* } @@ -65,21 +78,31 @@ macro_rules! impl_write_le { } } -pub trait Buffer: AsRef<[u8]> { - fn remaining_slice(&self) -> &[u8]; - - fn remaining_size(&self) -> usize { - self.remaining_slice().len() - } +pub trait Buffer { + /// Returns remaining data size for read. + fn remaining_size(&self) -> usize; + /// Returns true if buffer has no data for read. fn is_empty(&self) -> bool { self.remaining_size() == 0 } + /// Peeks data into dst. This method should not change internal cursor, + /// invoke `advance_by` if needed. /// # Panics /// This method **may** panic if buffer does not have enough data to be copied to dst. - fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()>; + fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()>; + /// Reads data into dst. This method will change internal cursor. + /// # Panics + /// This method **may** panic if buffer does not have enough data to be copied to dst. + fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> { + self.peek_to_slice(dst)?; + self.advance_by(dst.len()); + Ok(()) + } + + /// Advances internal cursor for next read. /// # Panics /// This method **may** panic if the offset after advancing exceeds the length of underlying buffer. fn advance_by(&mut self, by: usize); @@ -91,18 +114,18 @@ macro_rules! impl_buffer_for_bytes { ( $($buf_ty:ty), *) => { $( impl Buffer for $buf_ty { - #[inline] - fn remaining_slice(&self) -> &[u8] { - &self + fn remaining_size(&self) -> usize{ + self.len() } - fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> { + fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()> { + let dst_len = dst.len(); ensure!(self.remaining() >= dst.len(), OverflowSnafu { src_len: self.remaining_size(), - dst_len: dst.len(), + dst_len, } ); - self.copy_to_slice(dst); + dst.copy_from_slice(&self[0..dst_len]); Ok(()) } @@ -118,8 +141,21 @@ macro_rules! impl_buffer_for_bytes { impl_buffer_for_bytes![bytes::Bytes, bytes::BytesMut]; impl Buffer for &[u8] { - fn remaining_slice(&self) -> &[u8] { - self + fn remaining_size(&self) -> usize { + self.len() + } + + fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()> { + let dst_len = dst.len(); + ensure!( + self.len() >= dst.len(), + OverflowSnafu { + src_len: self.remaining_size(), + dst_len, + } + ); + dst.copy_from_slice(&self[0..dst_len]); + Ok(()) } fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> { diff --git a/src/common/base/tests/buffer_tests.rs b/src/common/base/tests/buffer_tests.rs index 350b369941..958f6312cb 100644 --- a/src/common/base/tests/buffer_tests.rs +++ b/src/common/base/tests/buffer_tests.rs @@ -4,7 +4,7 @@ mod tests { use std::assert_matches::assert_matches; - use bytes::{Buf, BytesMut}; + use bytes::{Buf, Bytes, BytesMut}; use common_base::buffer::Error::Overflow; use common_base::buffer::{Buffer, BufferMut}; use paste::paste; @@ -13,17 +13,34 @@ mod tests { pub fn test_buffer_read_write() { let mut buf = BytesMut::with_capacity(16); buf.write_u64_le(1234u64).unwrap(); - let result = buf.read_u64_le().unwrap(); + let result = buf.peek_u64_le().unwrap(); assert_eq!(1234u64, result); + buf.advance_by(8); buf.write_from_slice("hello, world".as_bytes()).unwrap(); let mut content = vec![0u8; 5]; - buf.read_to_slice(&mut content).unwrap(); + buf.peek_to_slice(&mut content).unwrap(); let read = String::from_utf8_lossy(&content); assert_eq!("hello", read); - + buf.advance_by(5); // after read, buffer should still have 7 bytes to read. assert_eq!(7, buf.remaining()); + + let mut content = vec![0u8; 6]; + buf.read_to_slice(&mut content).unwrap(); + let read = String::from_utf8_lossy(&content); + assert_eq!(", worl", read); + // after read, buffer should still have 1 byte to read. + assert_eq!(1, buf.remaining()); + } + + #[test] + pub fn test_buffer_read() { + let mut bytes = Bytes::from_static("hello".as_bytes()); + assert_eq!(5, bytes.remaining_size()); + assert_eq!(b'h', bytes.peek_u8_le().unwrap()); + bytes.advance_by(1); + assert_eq!(4, bytes.remaining_size()); } macro_rules! test_primitive_read_write { @@ -44,6 +61,22 @@ mod tests { #[test] pub fn test_read_write_from_slice_buffer() { + let mut buf = "hello".as_bytes(); + assert_eq!(104, buf.peek_u8_le().unwrap()); + buf.advance_by(1); + assert_eq!(101, buf.peek_u8_le().unwrap()); + buf.advance_by(1); + assert_eq!(108, buf.peek_u8_le().unwrap()); + buf.advance_by(1); + assert_eq!(108, buf.peek_u8_le().unwrap()); + buf.advance_by(1); + assert_eq!(111, buf.peek_u8_le().unwrap()); + buf.advance_by(1); + assert_matches!(buf.peek_u8_le(), Err(Overflow { .. })); + } + + #[test] + pub fn test_read_u8_from_slice_buffer() { let mut buf = "hello".as_bytes(); assert_eq!(104, buf.read_u8_le().unwrap()); assert_eq!(101, buf.read_u8_le().unwrap()); @@ -53,6 +86,18 @@ mod tests { assert_matches!(buf.read_u8_le(), Err(Overflow { .. })); } + #[test] + pub fn test_read_write_numbers() { + let mut buf: Vec = vec![]; + buf.write_u64_le(1234).unwrap(); + assert_eq!(1234, (&buf[..]).read_u64_le().unwrap()); + + buf.write_u32_le(4242).unwrap(); + let mut p = &buf[..]; + assert_eq!(1234, p.read_u64_le().unwrap()); + assert_eq!(4242, p.read_u32_le().unwrap()); + } + macro_rules! test_primitive_vec_read_write { ( $($num_ty: ty), *) => { $( @@ -71,15 +116,20 @@ mod tests { test_primitive_vec_read_write![u8, u16, u32, u64, i8, i16, i32, i64, f32, f64]; #[test] - pub fn test_read_write_from_vec_buffer() { + pub fn test_peek_write_from_vec_buffer() { let mut buf: Vec = vec![]; assert!(buf.write_from_slice("hello".as_bytes()).is_ok()); let mut slice = buf.as_slice(); - assert_eq!(104, slice.read_u8_le().unwrap()); - assert_eq!(101, slice.read_u8_le().unwrap()); - assert_eq!(108, slice.read_u8_le().unwrap()); - assert_eq!(108, slice.read_u8_le().unwrap()); - assert_eq!(111, slice.read_u8_le().unwrap()); + assert_eq!(104, slice.peek_u8_le().unwrap()); + slice.advance_by(1); + assert_eq!(101, slice.peek_u8_le().unwrap()); + slice.advance_by(1); + assert_eq!(108, slice.peek_u8_le().unwrap()); + slice.advance_by(1); + assert_eq!(108, slice.peek_u8_le().unwrap()); + slice.advance_by(1); + assert_eq!(111, slice.peek_u8_le().unwrap()); + slice.advance_by(1); assert_matches!(slice.read_u8_le(), Err(Overflow { .. })); } diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 5579e82ec7..9b0f36a99e 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -13,12 +13,12 @@ byteorder = "1.4" bytes = "1.1" common-base = { path = "../common/base" } common-error = { path = "../common/error" } +common-runtime = {path = "../common/runtime"} common-telemetry = { path = "../common/telemetry" } crc = "3.0" futures = "0.3" futures-util = "0.3" hex = "0.4" -memmap2 = "0.5" snafu = { version = "0.7", features = ["backtraces"] } store-api = { path = "../store-api" } tempdir = "0.3" diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 8098885db8..6fed0f1163 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -13,12 +13,18 @@ pub enum Error { #[snafu(display("Failed to decode entry, remain size: {}", size))] Decode { size: usize, backtrace: Backtrace }, + #[snafu(display("No enough data to decode, try again"))] + DecodeAgain, + #[snafu(display("Failed to append entry, source: {}", source))] Append { #[snafu(backtrace)] source: BoxedError, }, + #[snafu(display("Failed to wait for log file write complete, source: {}", source))] + Write { source: tokio::task::JoinError }, + #[snafu(display("Entry corrupted, msg: {}", msg))] Corrupted { msg: String, backtrace: Backtrace }, @@ -66,6 +72,9 @@ pub enum Error { #[snafu(display("Log file suffix is illegal: {}", suffix))] SuffixIllegal { suffix: String }, + + #[snafu(display("Failed while waiting for write to finish, source: {}", source))] + WaitWrite { source: tokio::task::JoinError }, } impl ErrorExt for Error { diff --git a/src/log-store/src/fs.rs b/src/log-store/src/fs.rs index 3ce5cdc01f..922e6813e3 100644 --- a/src/log-store/src/fs.rs +++ b/src/log-store/src/fs.rs @@ -1,12 +1,14 @@ use store_api::logstore::entry::{Id, Offset}; use store_api::logstore::AppendResponse; +mod chunk; pub mod config; mod crc; mod entry; mod file; mod file_name; mod index; +mod io; pub mod log; mod namespace; pub mod noop; diff --git a/src/log-store/src/fs/chunk.rs b/src/log-store/src/fs/chunk.rs new file mode 100644 index 0000000000..38dba7a27a --- /dev/null +++ b/src/log-store/src/fs/chunk.rs @@ -0,0 +1,220 @@ +use std::collections::LinkedList; + +use common_base::buffer::Buffer; +use common_base::buffer::UnderflowSnafu; +use snafu::ensure; + +pub const DEFAULT_CHUNK_SIZE: usize = 4096; + +#[derive(Debug)] +pub(crate) struct Chunk { + // internal data + pub data: Box<[u8]>, + // read offset + pub read_offset: usize, + // write offset + pub write_offset: usize, +} + +impl Default for Chunk { + fn default() -> Self { + let data = vec![0u8; DEFAULT_CHUNK_SIZE].into_boxed_slice(); + Self { + write_offset: 0, + read_offset: 0, + data, + } + } +} + +impl Chunk { + #[cfg(test)] + pub fn copy_from_slice(s: &[u8]) -> Self { + let src_len = s.len(); + // before [box syntax](https://github.com/rust-lang/rust/issues/49733) becomes stable, + // we can only initialize an array on heap like this. + let mut data = vec![0u8; src_len].into_boxed_slice(); + data[0..src_len].copy_from_slice(s); + Self { + read_offset: 0, + write_offset: src_len, + data, + } + } + + pub fn new(data: Box<[u8]>, write: usize) -> Self { + Self { + write_offset: write, + read_offset: 0, + data, + } + } + + pub fn len(&self) -> usize { + self.write_offset - self.read_offset + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// allows short read. + /// Calling read **will not** advance read cursor, must call `advance` manually. + pub fn read(&self, dst: &mut [u8]) -> usize { + let size = self.len().min(dst.len()); + let range = self.read_offset..(self.read_offset + size); + (&mut dst[0..size]).copy_from_slice(&self.data[range]); + size + } + + pub fn advance(&mut self, by: usize) -> usize { + assert!( + self.write_offset >= self.read_offset, + "Illegal chunk state, read: {}, write: {}", + self.read_offset, + self.write_offset + ); + let step = by.min(self.write_offset - self.read_offset); + self.read_offset += step; + step + } +} + +pub struct ChunkList { + chunks: LinkedList, +} + +impl ChunkList { + pub fn new() -> Self { + Self { + chunks: LinkedList::new(), + } + } + + pub(crate) fn push(&mut self, chunk: Chunk) { + self.chunks.push_back(chunk); + } +} + +impl Buffer for ChunkList { + fn remaining_size(&self) -> usize { + self.chunks.iter().map(|c| c.len()).sum() + } + + fn peek_to_slice(&self, mut dst: &mut [u8]) -> common_base::buffer::Result<()> { + ensure!(self.remaining_size() >= dst.len(), UnderflowSnafu); + + for c in &self.chunks { + if dst.is_empty() { + break; + } + let read = c.read(dst); + dst = &mut dst[read..]; + } + + ensure!(dst.is_empty(), UnderflowSnafu); + Ok(()) + } + + fn read_to_slice(&mut self, dst: &mut [u8]) -> common_base::buffer::Result<()> { + self.peek_to_slice(dst)?; + self.advance_by(dst.len()); + Ok(()) + } + + fn advance_by(&mut self, by: usize) { + let mut left = by; + while left > 0 { + if let Some(c) = self.chunks.front_mut() { + let actual = c.advance(left); + if c.is_empty() { + self.chunks.pop_front(); // remove first chunk + } + left -= actual; + } else { + panic!("Advance step [{}] exceeds max readable bytes", by); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + pub fn test_chunk() { + let chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes()); + assert_eq!(5, chunk.write_offset); + assert_eq!(0, chunk.read_offset); + assert_eq!(5, chunk.len()); + + let mut dst = [0u8; 3]; + assert_eq!(3, chunk.read(&mut dst)); + assert_eq!(5, chunk.write_offset); + assert_eq!(0, chunk.read_offset); + assert_eq!(5, chunk.len()); + } + + #[test] + pub fn test_chunk_short_read() { + let chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes()); + + let mut dst = vec![0u8; 8]; + let read = chunk.read(&mut dst); + assert_eq!(5, read); + assert_eq!(vec![b'h', b'e', b'l', b'l', b'o', 0x0, 0x0, 0x0], dst); + } + + #[test] + pub fn test_chunk_advance() { + let mut chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes()); + let mut dst = vec![0u8; 8]; + assert_eq!(5, chunk.read(&mut dst)); + assert_eq!(0, chunk.read_offset); + assert_eq!(5, chunk.write_offset); + + assert_eq!(1, chunk.advance(1)); + assert_eq!(1, chunk.read_offset); + assert_eq!(5, chunk.write_offset); + + assert_eq!(4, chunk.advance(5)); + assert_eq!(5, chunk.read_offset); + assert_eq!(5, chunk.write_offset); + } + + #[test] + pub fn test_composite_chunk_read() { + let mut chunks = ChunkList { + chunks: LinkedList::new(), + }; + + chunks.push(Chunk::copy_from_slice("abcd".as_bytes())); + chunks.push(Chunk::copy_from_slice("12345".as_bytes())); + assert_eq!(9, chunks.remaining_size()); + + let mut dst = [0u8; 2]; + chunks.peek_to_slice(&mut dst).unwrap(); + chunks.advance_by(2); + assert_eq!([b'a', b'b'], dst); + assert_eq!(2, chunks.chunks.len()); + + let mut dst = [0u8; 3]; + chunks.peek_to_slice(&mut dst).unwrap(); + chunks.advance_by(3); + assert_eq!([b'c', b'd', b'1'], dst); + assert_eq!(4, chunks.remaining_size()); + assert_eq!(1, chunks.chunks.len()); + + let mut dst = [0u8; 4]; + chunks.peek_to_slice(&mut dst).unwrap(); + chunks.advance_by(4); + assert_eq!([b'2', b'3', b'4', b'5'], dst); + assert_eq!(0, chunks.remaining_size()); + assert_eq!(0, chunks.chunks.len()); + + chunks.push(Chunk::copy_from_slice("uvwxyz".as_bytes())); + assert_eq!(6, chunks.remaining_size()); + assert_eq!(1, chunks.chunks.len()); + } +} diff --git a/src/log-store/src/fs/entry.rs b/src/log-store/src/fs/entry.rs index 45f3d88fc5..39a67984e2 100644 --- a/src/log-store/src/fs/entry.rs +++ b/src/log-store/src/fs/entry.rs @@ -8,11 +8,13 @@ use snafu::{ensure, ResultExt}; use store_api::logstore::entry::{Encode, Entry, Epoch, Id, Offset}; use store_api::logstore::entry_stream::{EntryStream, SendableEntryStream}; -use crate::error::{CorruptedSnafu, DecodeSnafu, EncodeSnafu, Error}; +use crate::error::{CorruptedSnafu, DecodeAgainSnafu, DecodeSnafu, EncodeSnafu, Error}; use crate::fs::crc; -// length+offset+epoch+crc +// length + offset + epoch + crc const ENTRY_MIN_LEN: usize = 4 + 8 + 8 + 4; +// length + offset + epoch +const HEADER_LENGTH: usize = 4 + 8 + 8; #[derive(Debug, PartialEq, Clone)] pub struct EntryImpl { @@ -22,6 +24,13 @@ pub struct EntryImpl { pub epoch: Epoch, } +impl EntryImpl { + #[cfg(test)] + fn set_offset(&mut self, offset: Offset) { + self.offset = offset; + } +} + impl Encode for EntryImpl { type Error = Error; @@ -46,12 +55,7 @@ impl Encode for EntryImpl { } fn decode(buf: &mut T) -> Result { - ensure!( - buf.remaining_size() >= ENTRY_MIN_LEN, - DecodeSnafu { - size: buf.remaining_size(), - } - ); + ensure!(buf.remaining_size() >= HEADER_LENGTH, DecodeAgainSnafu); macro_rules! map_err { ($stmt: expr, $var: ident) => { @@ -65,23 +69,34 @@ impl Encode for EntryImpl { } let mut digest = crc::CRC_ALGO.digest(); - let id = map_err!(buf.read_u64_le(), buf)?; + let mut header = [0u8; HEADER_LENGTH]; + buf.peek_to_slice(&mut header).unwrap(); + + let mut header = &header[..]; + let id = header.read_u64_le().unwrap(); // unwrap here is safe because header bytes must be present digest.update(&id.to_le_bytes()); - let epoch = map_err!(buf.read_u64_le(), buf)?; + + let epoch = header.read_u64_le().unwrap(); digest.update(&epoch.to_le_bytes()); - let data_len = map_err!(buf.read_u32_le(), buf)?; + + let data_len = header.read_u32_le().unwrap(); digest.update(&data_len.to_le_bytes()); + ensure!( - buf.remaining_size() >= data_len as usize, - DecodeSnafu { - size: buf.remaining_size() - } + buf.remaining_size() >= ENTRY_MIN_LEN + data_len as usize, + DecodeAgainSnafu ); + + buf.advance_by(HEADER_LENGTH); + let mut data = vec![0u8; data_len as usize]; - map_err!(buf.read_to_slice(&mut data), buf)?; + map_err!(buf.peek_to_slice(&mut data), buf)?; digest.update(&data); - let crc_read = map_err!(buf.read_u32_le(), buf)?; + buf.advance_by(data_len as usize); + + let crc_read = map_err!(buf.peek_u32_le(), buf)?; let crc_calc = digest.finalize(); + ensure!( crc_read == crc_calc, CorruptedSnafu { @@ -93,6 +108,8 @@ impl Encode for EntryImpl { } ); + buf.advance_by(4); + Ok(Self { id, data, @@ -107,9 +124,9 @@ impl Encode for EntryImpl { } impl EntryImpl { - pub(crate) fn new(data: impl AsRef<[u8]>) -> EntryImpl { + pub(crate) fn new(data: impl AsRef<[u8]>, id: Id) -> EntryImpl { EntryImpl { - id: 0, + id, data: data.as_ref().to_vec(), offset: 0, epoch: 0, @@ -132,10 +149,6 @@ impl Entry for EntryImpl { self.offset } - fn set_offset(&mut self, offset: Offset) { - self.offset = offset; - } - fn set_id(&mut self, id: Id) { self.id = id; } @@ -194,16 +207,20 @@ impl<'a> EntryStream for StreamImpl<'a> { #[cfg(test)] mod tests { + use async_stream::stream; use byteorder::{ByteOrder, LittleEndian}; + use futures::pin_mut; + use futures_util::StreamExt; + use tokio::time::Duration; use super::*; + use crate::fs::chunk::{Chunk, ChunkList}; use crate::fs::crc::CRC_ALGO; #[test] pub fn test_entry_deser() { let data = "hello, world"; - let mut entry = EntryImpl::new(data.as_bytes()); - entry.set_id(8); + let mut entry = EntryImpl::new(data.as_bytes(), 8); entry.epoch = 9; let mut buf = BytesMut::with_capacity(entry.encoded_size()); entry.encode_to(&mut buf).unwrap(); @@ -215,10 +232,9 @@ mod tests { #[test] pub fn test_rewrite_entry_id() { let data = "hello, world"; - let mut entry = EntryImpl::new(data.as_bytes()); + let entry = EntryImpl::new(data.as_bytes(), 123); let mut buffer = BytesMut::with_capacity(entry.encoded_size()); entry.encode_to(&mut buffer).unwrap(); - entry.set_id(123); assert_eq!(123, entry.id()); // rewrite entry id. @@ -230,4 +246,119 @@ mod tests { let entry_impl = EntryImpl::decode(&mut buffer.freeze()).expect("Failed to deserialize"); assert_eq!(333, entry_impl.id()); } + + fn prepare_entry_bytes(data: &str, id: Id) -> Bytes { + let mut entry = EntryImpl::new(data.as_bytes(), id); + entry.set_id(123); + entry.set_offset(456); + let mut buffer = BytesMut::with_capacity(entry.encoded_size()); + entry.encode_to(&mut buffer).unwrap(); + let len = buffer.len(); + let checksum = CRC_ALGO.checksum(&buffer[0..len - 4]); + LittleEndian::write_u32(&mut buffer[len - 4..], checksum); + buffer.freeze() + } + + /// Test decode entry from a composite buffer. + #[test] + pub fn test_composite_buffer() { + let data_1 = "hello, world"; + let bytes = prepare_entry_bytes(data_1, 0); + EntryImpl::decode(&mut bytes.clone()).unwrap(); + let c1 = Chunk::copy_from_slice(&bytes); + + let data_2 = "LoremIpsumDolor"; + let bytes = prepare_entry_bytes(data_2, 1); + EntryImpl::decode(&mut bytes.clone()).unwrap(); + let c2 = Chunk::copy_from_slice(&bytes); + + let mut chunks = ChunkList::new(); + chunks.push(c1); + chunks.push(c2); + + assert_eq!( + ENTRY_MIN_LEN * 2 + data_2.len() + data_1.len(), + chunks.remaining_size() + ); + + let mut decoded = vec![]; + while chunks.remaining_size() > 0 { + let entry_impl = EntryImpl::decode(&mut chunks).unwrap(); + decoded.push(entry_impl.data); + } + + assert_eq!( + vec![data_1.as_bytes().to_vec(), data_2.as_bytes().to_vec()], + decoded + ); + } + + // split an encoded entry to two different chunk and try decode from this composite chunk + #[test] + pub fn test_decode_split_data_from_composite_chunk() { + let data = "hello, world"; + let bytes = prepare_entry_bytes(data, 42); + assert_eq!( + hex::decode("7B0000000000000000000000000000000C00000068656C6C6F2C20776F726C645B2EEC0F") + .unwrap() + .as_slice(), + &bytes[..] + ); + let original = EntryImpl::decode(&mut bytes.clone()).unwrap(); + let split_point = bytes.len() / 2; + let (left, right) = bytes.split_at(split_point); + + let mut chunks = ChunkList::new(); + chunks.push(Chunk::copy_from_slice(left)); + chunks.push(Chunk::copy_from_slice(right)); + + assert_eq!(bytes.len(), chunks.remaining_size()); + let decoded = EntryImpl::decode(&mut chunks).unwrap(); + assert_eq!(original, decoded); + } + + // Tests decode entry from encoded entry data as two chunks + #[tokio::test] + pub async fn test_decode_from_chunk_stream() { + // prepare entry + let data = "hello, world"; + let bytes = prepare_entry_bytes(data, 42); + assert_eq!( + hex::decode("7B0000000000000000000000000000000C00000068656C6C6F2C20776F726C645B2EEC0F") + .unwrap() + .as_slice(), + &bytes[..] + ); + let original = EntryImpl::decode(&mut bytes.clone()).unwrap(); + let split_point = bytes.len() / 2; + let (left, right) = bytes.split_at(split_point); + + // prepare chunk stream + let chunk_stream = stream!({ + yield Chunk::copy_from_slice(left); + tokio::time::sleep(Duration::from_millis(10)).await; + yield Chunk::copy_from_slice(right); + }); + + pin_mut!(chunk_stream); + + let mut chunks = ChunkList::new(); + let mut decoded = vec![]; + while let Some(c) = chunk_stream.next().await { + chunks.push(c); + match EntryImpl::decode(&mut chunks) { + Ok(e) => { + decoded.push(e); + } + Err(Error::DecodeAgain { .. }) => { + continue; + } + _ => { + panic!() + } + } + } + assert_eq!(1, decoded.len()); + assert_eq!(original, decoded.into_iter().next().unwrap()); + } } diff --git a/src/log-store/src/fs/file.rs b/src/log-store/src/fs/file.rs index 2dcef32a5a..a127399c3d 100644 --- a/src/log-store/src/fs/file.rs +++ b/src/log-store/src/fs/file.rs @@ -1,32 +1,35 @@ use std::fmt::{Debug, Formatter}; -use std::io::SeekFrom; +use std::fs::{File, OpenOptions}; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use async_stream::stream; use byteorder::ByteOrder; use byteorder::LittleEndian; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use common_error::ext::BoxedError; +use common_telemetry::debug; use common_telemetry::logging::{error, info}; -use common_telemetry::warn; +use futures::Stream; use futures_util::StreamExt; -use memmap2::{Mmap, MmapOptions}; -use snafu::{Backtrace, GenerateImplicitData, ResultExt}; +use snafu::ResultExt; use store_api::logstore::entry::{Encode, Entry, Id, Offset}; use store_api::logstore::entry_stream::EntryStream; use store_api::logstore::namespace::Namespace; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio::sync::mpsc::error::TryRecvError; -use tokio::sync::mpsc::Receiver as MpscReceiver; +use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender as MpscSender; use tokio::sync::oneshot::Sender as OneshotSender; -use tokio::sync::{oneshot, Notify, RwLock}; +use tokio::sync::{oneshot, Notify}; use tokio::task::JoinHandle; use tokio::time; -use crate::error::{AppendSnafu, Error, IoSnafu, OpenLogSnafu, Result}; +use crate::error::Error::Eof; +use crate::error::{ + AppendSnafu, Error, InternalSnafu, IoSnafu, OpenLogSnafu, Result, WaitWriteSnafu, WriteSnafu, +}; +use crate::fs::chunk::{Chunk, ChunkList}; use crate::fs::config::LogConfig; use crate::fs::crc::CRC_ALGO; use crate::fs::entry::{EntryImpl, StreamImpl}; @@ -34,38 +37,80 @@ use crate::fs::file_name::FileName; use crate::fs::namespace::LocalNamespace; use crate::fs::AppendResponseImpl; +pub const CHUNK_SIZE: usize = 4096; const LOG_WRITER_BATCH_SIZE: usize = 16; -// TODO(hl): use pwrite polyfill in different platforms, avoid write syscall in each append request. -pub struct LogFile { - name: FileName, - file: Arc>, - path: String, - write_offset: Arc, - flush_offset: Arc, - next_entry_id: Arc, - start_entry_id: u64, - pending_request_rx: Option>, - pending_request_tx: MpscSender, - notify: Arc, - max_file_size: usize, - join_handle: Mutex>>>, - sealed: Arc, - stopped: Arc, +/// Wraps File operation to get rid of `&mut self` requirements +struct FileWriter { + inner: Arc, } -impl Debug for LogFile { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LogFile") - .field("name", &self.name) - .field("start_entry_id", &self.start_entry_id) - .field("write_offset", &self.write_offset.load(Ordering::Relaxed)) - .field("flush_offset", &self.flush_offset.load(Ordering::Relaxed)) - .field("next_entry_id", &self.next_entry_id.load(Ordering::Relaxed)) - .field("max_file_size", &self.max_file_size) - .field("sealed", &self.sealed.load(Ordering::Relaxed)) - .finish() +impl FileWriter { + pub fn new(file: Arc) -> Self { + Self { inner: file } } + + pub async fn write(&self, data: Bytes, offset: u64) -> Result<()> { + let file = self.inner.clone(); + let handle = common_runtime::spawn_blocking_write(move || { + crate::fs::io::pwrite_all(&file, &data, offset) + }); + handle.await.context(WriteSnafu)? + } + + /// Writes a batch of `AppendRequest` to file. + pub async fn write_batch(self: &Arc, batch: &Vec) -> Result { + let mut futures = Vec::with_capacity(batch.len()); + + let mut max_offset = 0; + for req in batch { + let offset = req.offset; + let end = req.data.len() + offset; + max_offset = max_offset.max(end); + let future = self.write(req.data.clone(), offset as u64); + futures.push(future); + } + debug!( + "Write batch, size: {}, max offset: {}", + batch.len(), + max_offset + ); + futures::future::join_all(futures) + .await + .into_iter() + .collect::>>() + .map(|_| max_offset) + } + + pub async fn flush(&self) -> Result<()> { + let file = self.inner.clone(); + common_runtime::spawn_blocking_write(move || file.sync_all().context(IoSnafu)) + .await + .context(WaitWriteSnafu)? + } +} + +pub type LogFileRef = Arc; + +pub struct LogFile { + // name of log file + name: FileName, + // file writer + writer: Arc, + // append request channel + pending_request_tx: Option>, + // flush task notifier + notify: Arc, + // flush task join handle + join_handle: Mutex>>>, + // internal state(offset, id counter...) + state: Arc, + // the start entry id of current log file + start_entry_id: u64, + // max file size of current log file + max_file_size: usize, + // buffer size for append request channel. read from config on start. + append_buffer_size: usize, } impl LogFile { @@ -77,174 +122,88 @@ impl LogFile { .read(true) .create(true) .open(path.clone()) - .await .context(OpenLogSnafu { file_name: &path })?; let file_name: FileName = path.as_str().try_into()?; let start_entry_id = file_name.entry_id(); - let (tx, rx) = tokio::sync::mpsc::channel(config.append_buffer_size); let mut log = Self { name: file_name, - file: Arc::new(RwLock::new(file)), - path: path.to_string(), - write_offset: Arc::new(AtomicUsize::new(0)), - flush_offset: Arc::new(AtomicUsize::new(0)), - next_entry_id: Arc::new(AtomicU64::new(start_entry_id)), + writer: Arc::new(FileWriter::new(Arc::new(file))), start_entry_id, - pending_request_tx: tx, - pending_request_rx: Some(rx), + pending_request_tx: None, notify: Arc::new(Notify::new()), max_file_size: config.max_log_file_size, join_handle: Mutex::new(None), - sealed: Arc::new(AtomicBool::new(false)), - stopped: Arc::new(AtomicBool::new(false)), + state: Arc::new(State::default()), + append_buffer_size: config.append_buffer_size, }; - let metadata = log.file.read().await.metadata().await.context(IoSnafu)?; + let metadata = log.writer.inner.metadata().context(IoSnafu)?; let expect_length = metadata.len() as usize; - log.write_offset.store(expect_length, Ordering::Relaxed); - log.flush_offset.store(expect_length, Ordering::Relaxed); + log.state + .write_offset + .store(expect_length, Ordering::Relaxed); + log.state + .flush_offset + .store(expect_length, Ordering::Relaxed); let replay_start_time = time::Instant::now(); let (actual_offset, next_entry_id) = log.replay().await?; info!( - "Log file {} replay finished, last offset: {}, file start entry id: {}, next entry id: {}, elapsed time: {}ms", - path, actual_offset, start_entry_id, next_entry_id, + "Log file {} replay finished, last offset: {}, file start entry id: {}, elapsed time: {}ms", + path, actual_offset, start_entry_id, time::Instant::now().duration_since(replay_start_time).as_millis() ); - log.write_offset.store(actual_offset, Ordering::Relaxed); - log.flush_offset.store(actual_offset, Ordering::Relaxed); - log.next_entry_id.store(next_entry_id, Ordering::Relaxed); - log.seek(actual_offset).await?; + log.state + .write_offset + .store(actual_offset, Ordering::Relaxed); + log.state + .flush_offset + .store(actual_offset, Ordering::Relaxed); + log.state + .last_entry_id + .store(next_entry_id, Ordering::Relaxed); Ok(log) } - /// Advances file cursor to given offset. - async fn seek(&mut self, offset: usize) -> Result { - self.file - .write() - .await - .seek(SeekFrom::Start(offset as u64)) - .await - .context(IoSnafu) - } - - /// Creates a file mmap region. - async fn map(&self, start: u64, length: usize) -> Result { - unsafe { - let file = self.file.read().await.try_clone().await.context(IoSnafu)?; - MmapOptions::new() - .offset(start) - .len(length) - .map(&file) - .context(IoSnafu) - } - } - /// Returns the persisted size of current log file. - #[allow(unused)] #[inline] pub fn persisted_size(&self) -> usize { - self.flush_offset.load(Ordering::Relaxed) - } - - #[inline] - pub fn next_entry_id(&self) -> Id { - self.next_entry_id.load(Ordering::Relaxed) - } - - /// Increases write offset field by `delta` and return the previous value. - #[inline] - fn inc_offset(&self, delta: usize) -> usize { - // Relaxed order is enough since no sync-with relationship - // between `offset` and any other field. - self.write_offset.fetch_add(delta, Ordering::Relaxed) - } - - /// Increases next entry field by `delta` and return the previous value. - fn inc_entry_id(&self) -> u64 { - // Relaxed order is enough since no sync-with relationship - // between `offset` and any other field. - self.next_entry_id.fetch_add(1, Ordering::Relaxed) + self.state.flush_offset() } /// Starts log file and it's internal components(including flush task, etc.). pub async fn start(&mut self) -> Result<()> { let notify = self.notify.clone(); - let file = self.file.write().await.try_clone().await.context(IoSnafu)?; + let writer = self.writer.clone(); + let state = self.state.clone(); - let write_offset = self.write_offset.clone(); - let flush_offset = self.flush_offset.clone(); + let (tx, mut rx) = tokio::sync::mpsc::channel(self.append_buffer_size); - let stopped = self.stopped.clone(); - - if let Some(mut rx) = self.pending_request_rx.take() { - let handle = tokio::spawn(async move { - let mut batch: Vec = Vec::with_capacity(LOG_WRITER_BATCH_SIZE); - - let mut error_occurred = false; - while !stopped.load(Ordering::Acquire) { - for _ in 0..LOG_WRITER_BATCH_SIZE { - match rx.try_recv() { - Ok(req) => { - batch.push(req); - } - Err(e) => match e { - TryRecvError::Empty => { - if batch.is_empty() { - notify.notified().await; - if stopped.load(Ordering::Acquire) { - break; - } - } else { - break; - } - } - TryRecvError::Disconnected => { - info!("Channel disconnected..."); - error_occurred = true; - break; - } - }, - } - } - - // flush all pending data to disk - let write_offset_read = write_offset.load(Ordering::Relaxed); - // TODO(hl): add flush metrics - if let Err(flush_err) = file.sync_all().await { - error!("Failed to flush log file: {}", flush_err); - error_occurred = true; - } - if error_occurred { - info!("Flush task stop"); - break; - } - let prev = flush_offset.load(Ordering::Acquire); - flush_offset.store(write_offset_read, Ordering::Relaxed); - info!("Flush offset: {} -> {}", prev, write_offset_read); - while let Some(req) = batch.pop() { - req.complete(); - } + let handle = tokio::spawn(async move { + while !state.is_stopped() { + let batch = Self::recv_batch(&mut rx, &state, ¬ify, true).await; + debug!("Receive write request, size: {}", batch.len()); + if !batch.is_empty() { + Self::handle_batch(batch, &state, &writer).await; } + } - // drain all pending request on stopping. - let write_offset_read = write_offset.load(Ordering::Relaxed); - if file.sync_all().await.is_ok() { - flush_offset.store(write_offset_read, Ordering::Release); - while let Ok(req) = rx.try_recv() { - req.complete() - } - } - Ok(()) - }); + // log file stopped + let batch = Self::recv_batch(&mut rx, &state, ¬ify, false).await; + if !batch.is_empty() { + Self::handle_batch(batch, &state, &writer).await; + } + info!("Writer task finished"); + Ok(()) + }); - *self.join_handle.lock().unwrap() = Some(handle); - info!("Flush task started: {}", self.name); - } + self.pending_request_tx = Some(tx); + *self.join_handle.lock().unwrap() = Some(handle); + info!("Flush task started: {}", self.name); Ok(()) } @@ -252,7 +211,7 @@ impl LogFile { /// # Panics /// Panics when a log file is stopped while not being started ever. pub async fn stop(&self) -> Result<()> { - self.stopped.store(true, Ordering::Release); + self.state.stopped.store(true, Ordering::Release); let join_handle = self .join_handle .lock() @@ -265,6 +224,89 @@ impl LogFile { res } + async fn handle_batch( + mut batch: Vec, + state: &Arc, + writer: &Arc, + ) { + // preserve previous write offset + let prev_write_offset = state.write_offset(); + + let mut last_id = 0; + for mut req in &mut batch { + req.offset = state + .write_offset + .fetch_add(req.data.len(), Ordering::AcqRel); + last_id = req.id; + debug!("Entry id: {}, offset: {}", req.id, req.offset,); + } + + match writer.write_batch(&batch).await { + Ok(max_offset) => match writer.flush().await { + Ok(_) => { + let prev_ofs = state.flush_offset.swap(max_offset, Ordering::Acquire); + let prev_id = state.last_entry_id.swap(last_id, Ordering::Acquire); + debug!( + "Flush offset: {} -> {}, max offset in batch: {}, entry id: {}->{}", + prev_ofs, + state.flush_offset.load(Ordering::Acquire), + max_offset, + prev_id, + state.last_entry_id.load(Ordering::Acquire), + ); + batch.into_iter().for_each(AppendRequest::complete); + } + Err(e) => { + error!("Failed to flush log file: {}", e); + batch.into_iter().for_each(|r| r.fail()); + state + .write_offset + .store(prev_write_offset, Ordering::Release); + } + }, + Err(e) => { + error!("Failed to write append requests, error: {}", e); + batch.into_iter().for_each(|r| r.fail()); + state + .write_offset + .store(prev_write_offset, Ordering::Release); + } + } + } + + async fn recv_batch( + rx: &mut Receiver, + state: &Arc, + notify: &Arc, + wait_on_empty: bool, + ) -> Vec { + let mut batch: Vec = Vec::with_capacity(LOG_WRITER_BATCH_SIZE); + for _ in 0..LOG_WRITER_BATCH_SIZE { + match rx.try_recv() { + Ok(req) => { + batch.push(req); + } + Err(e) => match e { + TryRecvError::Empty => { + if batch.is_empty() && wait_on_empty { + notify.notified().await; + if state.is_stopped() { + break; + } + } else { + break; + } + } + TryRecvError::Disconnected => { + error!("Channel unexpectedly disconnected!"); + break; + } + }, + } + } + batch + } + #[inline] pub fn start_entry_id(&self) -> Id { self.start_entry_id @@ -273,7 +315,7 @@ impl LogFile { /// Replays current file til last entry read pub async fn replay(&mut self) -> Result<(usize, Id)> { let log_name = self.name.to_string(); - let previous_offset = self.flush_offset.load(Ordering::Relaxed); + let previous_offset = self.state.flush_offset(); let ns = LocalNamespace::default(); let mut stream = self.create_stream( // TODO(hl): LocalNamespace should be filled @@ -297,16 +339,10 @@ impl LogFile { } } info!( - "Replay log {} finished, offset: {} -> {}", - log_name, previous_offset, last_offset + "Replay log {} finished, offset: {} -> {}, last entry id: {:?}", + log_name, previous_offset, last_offset, last_entry_id ); - Ok(( - last_offset, - match last_entry_id { - None => self.start_entry_id, - Some(v) => v + 1, - }, - )) + Ok((last_offset, last_entry_id.unwrap_or(self.start_entry_id))) } /// Creates a reader stream that asynchronously generates entries start from given entry id. @@ -318,27 +354,38 @@ impl LogFile { _ns: &impl Namespace, start_entry_id: u64, ) -> impl EntryStream + '_ { - let length = self.flush_offset.load(Ordering::Relaxed); + let length = self.state.flush_offset.load(Ordering::Relaxed); - let s = stream!({ - let mmap = self.map(0, length).await?; - let mut buf = &mmap[..]; - if buf.is_empty() { - info!("File is just created!"); - // file is newly created - return; - } - - while !buf.is_empty() { - let entry = EntryImpl::decode(&mut buf)?; - if entry.id() >= start_entry_id { - yield Ok(vec![entry]); + let mut chunk_stream = file_chunk_stream(self.writer.inner.clone(), 0, length, 0); + let entry_stream = stream!({ + let mut chunks = ChunkList::new(); + while let Some(chunk) = chunk_stream.next().await { + chunks.push(chunk.unwrap()); + let mut batch = vec![]; + loop { + match EntryImpl::decode(&mut chunks) { + Ok(e) => { + if e.id() >= start_entry_id { + batch.push(e); + } + } + Err(Error::DecodeAgain { .. }) => { + // no more data for decoding + break; + } + Err(e) => { + yield Err(e); + break; + } + } } + debug!("Yield batch size: {}", batch.len()); + yield Ok(batch); } }); StreamImpl { - inner: Box::pin(s), + inner: Box::pin(entry_stream), start_entry_id, } } @@ -348,201 +395,426 @@ impl LogFile { where T: Encode, { - if self.stopped.load(Ordering::Acquire) { + if self.state.is_stopped() { return Err(Error::Eof); } - e.set_id(0); + let entry_id = e.id(); let mut serialized = BytesMut::with_capacity(e.encoded_size()); e.encode_to(&mut serialized) .map_err(BoxedError::new) .context(AppendSnafu)?; let size = serialized.len(); - if size + self.write_offset.load(Ordering::Relaxed) > self.max_file_size { + if size + self.state.write_offset() > self.max_file_size { return Err(Error::Eof); } - let entry_offset; - let entry_id; - - { - let mut file = self.file.write().await; - // generate entry id - entry_id = self.inc_entry_id(); - // rewrite encoded data - LittleEndian::write_u64(&mut serialized[0..8], entry_id); - // TODO(hl): CRC was calculated twice - let checksum = CRC_ALGO.checksum(&serialized[0..size - 4]); - LittleEndian::write_u32(&mut serialized[size - 4..], checksum); - - // write to file - // TODO(hl): use io buffer and pwrite to reduce syscalls. - file.write(&serialized.freeze()).await.context(IoSnafu)?; - // generate in-file offset - entry_offset = self.inc_offset(size); - } + // rewrite encoded data + LittleEndian::write_u64(&mut serialized[0..8], entry_id); + let checksum = CRC_ALGO.checksum(&serialized[0..size - 4]); + LittleEndian::write_u32(&mut serialized[size - 4..], checksum); let (tx, rx) = oneshot::channel(); - - if self - .pending_request_tx + self.pending_request_tx + .as_ref() + .expect("Call start before write to LogFile!") .send(AppendRequest { + data: serialized.freeze(), tx, - offset: entry_offset, + offset: 0, id: entry_id, }) .await - .is_err() - { - self.file.write().await.sync_all().await.context(IoSnafu)?; - Ok(AppendResponseImpl { - offset: entry_offset, - entry_id, - }) - } else { - self.notify.notify_one(); // notify flush thread. - rx.await.map_err(|e| { - warn!( - "Error while waiting for append result:{}, file {}", - e, - self.name.to_string() - ); - Error::Internal { - msg: "Sender already dropped".to_string(), - backtrace: Backtrace::generate(), + .map_err(|_| { + InternalSnafu { + msg: "Send append request", } + .build() + })?; + + self.notify.notify_one(); // notify write thread. + + rx.await + .expect("Sender dropped while waiting for append result") + .map_err(|_| { + InternalSnafu { + msg: "Failed to write request".to_string(), + } + .build() }) - } } #[inline] pub fn try_seal(&self) -> bool { - self.sealed + self.state + .sealed .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_ok() } #[inline] pub fn is_seal(&self) -> bool { - self.sealed.load(Ordering::Acquire) + self.state.sealed.load(Ordering::Acquire) } #[inline] pub fn unseal(&self) { - self.sealed.store(false, Ordering::Release); + self.state.sealed.store(false, Ordering::Release); } #[inline] pub fn file_name(&self) -> String { self.name.to_string() } -} -impl ToString for LogFile { - fn to_string(&self) -> String { - format!("LogFile{{ name: {}, path: {}, write_offset: {}, flush_offset: {}, start_entry_id: {}, entry_id_counter: {} }}", - self.name, - self.path, - self.write_offset.load(Ordering::Relaxed), self.flush_offset.load(Ordering::Relaxed), self.start_entry_id, self.next_entry_id.load(Ordering::Relaxed)) + #[inline] + pub fn last_entry_id(&self) -> Id { + self.state.last_entry_id.load(Ordering::Acquire) } } -pub type LogFileRef = Arc; +impl Debug for LogFile { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LogFile") + .field("name", &self.name) + .field("start_entry_id", &self.start_entry_id) + .field("max_file_size", &self.max_file_size) + .field("state", &self.state) + .finish() + } +} -#[allow(dead_code)] #[derive(Debug)] pub(crate) struct AppendRequest { - tx: OneshotSender, + tx: OneshotSender>, offset: Offset, id: Id, + data: Bytes, } impl AppendRequest { + #[inline] pub fn complete(self) { - // TODO(hl): use this result. - let _ = self.tx.send(AppendResponseImpl { + let _ = self.tx.send(Ok(AppendResponseImpl { offset: self.offset, entry_id: self.id, - }); + })); + } + + #[inline] + pub fn fail(self) { + let _ = self.tx.send(Err(())); } } -// TODO(hl): uncomment this test once log file read visibility issue fixed. -// #[cfg(test)] -// mod tests { -// use std::io::Read; -// -// use common_telemetry::logging; -// use futures_util::StreamExt; -// use tempdir::TempDir; -// -// use super::*; -// use crate::fs::namespace::LocalNamespace; -// -// #[tokio::test] -// pub async fn test_create_entry_stream() { -// logging::init_default_ut_logging(); -// let config = LogConfig::default(); -// -// let dir = TempDir::new("greptimedb-store-test").unwrap(); -// let path_buf = dir.path().join("0010.log"); -// let path = path_buf.to_str().unwrap().to_string(); -// File::create(path.as_str()).await.unwrap(); -// -// let mut file = LogFile::open(path.clone(), &config) -// .await -// .unwrap_or_else(|_| panic!("Failed to open file: {}", path)); -// file.start().await.expect("Failed to start log file"); -// -// assert_eq!( -// 10, -// file.append(&mut EntryImpl::new("test1".as_bytes())) -// .await -// .expect("Failed to append entry 1") -// .entry_id -// ); -// -// assert_eq!( -// 11, -// file.append(&mut EntryImpl::new("test-2".as_bytes())) -// .await -// .expect("Failed to append entry 2") -// .entry_id -// ); -// -// let mut log_file = std::fs::File::open(path.clone()).expect("Test log file does not exist"); -// let metadata = log_file.metadata().expect("Failed to read file metadata"); -// info!("Log file metadata: {:?}", metadata); -// -// assert_eq!(59, metadata.len()); // 24+5+24+6 -// let mut content = vec![0; metadata.len() as usize]; -// log_file -// .read_exact(&mut content) -// .expect("Read log file failed"); -// -// info!( -// "Log file {:?} content: {}, size:{}", -// dir, -// hex::encode(content), -// metadata.len() -// ); -// -// let mut stream = file.create_stream(LocalNamespace::default(), 0); -// -// let mut data = vec![]; -// -// while let Some(v) = stream.next().await { -// let entries = v.unwrap(); -// let content = entries[0].data(); -// let vec = content.to_vec(); -// info!("Read entry: {}", String::from_utf8_lossy(&vec)); -// data.push(String::from_utf8(vec).unwrap()); -// } -// -// assert_eq!(vec!["test1".to_string(), "test-2".to_string()], data); -// drop(stream); -// -// let result = file.stop().await; -// info!("Stop file res: {:?}", result); -// } -// } +#[derive(Default, Debug)] +struct State { + write_offset: AtomicUsize, + flush_offset: AtomicUsize, + last_entry_id: AtomicU64, + sealed: AtomicBool, + stopped: AtomicBool, +} + +impl State { + #[inline] + pub fn is_stopped(&self) -> bool { + self.stopped.load(Ordering::Acquire) + } + + #[inline] + pub fn write_offset(&self) -> usize { + self.write_offset.load(Ordering::Acquire) + } + + #[inline] + pub fn flush_offset(&self) -> usize { + self.flush_offset.load(Ordering::Acquire) + } +} + +type SendableChunkStream = Pin> + Send>>; + +/// Creates a stream of chunks of data from file. If `buffer_size` is not 0, the returned stream +/// will have a bounded buffer and a background thread will do prefetching. When consumer cannot +/// catch up with spawned prefetch loop, the prefetch thread will be blocked and wait until buffer +/// has enough capacity. +/// +/// If the `buffer_size` is 0, there will not be a prefetching thread. File chunks will not be read +/// until stream consumer asks for next chunk. +fn file_chunk_stream( + file: Arc, + mut offset: usize, + file_size: usize, + buffer_size: usize, +) -> SendableChunkStream { + if buffer_size == 0 { + return file_chunk_stream_sync(file, offset, file_size); + } + + let (tx, mut rx) = tokio::sync::mpsc::channel(buffer_size); + common_runtime::spawn_blocking_read(move || loop { + if offset >= file_size { + return; + } + match read_at(&file, offset, file_size) { + Ok(data) => { + let data_len = data.len(); + if tx.blocking_send(Ok(data)).is_err() { + break; + } + offset += data_len; + continue; + } + Err(e) => { + error!("Failed to read file chunk, error: {}", &e); + // we're going to break any way so just forget the join result. + let _ = tx.blocking_send(Err(e)); + break; + } + } + }); + Box::pin(stream!({ + while let Some(v) = rx.recv().await { + yield v; + } + })) +} + +fn file_chunk_stream_sync( + file: Arc, + mut offset: usize, + file_size: usize, +) -> SendableChunkStream { + let s = stream!({ + loop { + if offset >= file_size { + return; + } + match read_at(&file, offset, file_size) { + Ok(data) => { + let data_len = data.len(); + yield Ok(data); + offset += data_len; + continue; + } + Err(e) => { + error!("Failed to read file chunk, error: {}", &e); + yield Err(e); + break; + } + } + } + }); + + Box::pin(s) +} + +/// Reads a chunk of data from file in a blocking manner. +/// The file may not contain enough data to fulfill the whole chunk so only data available +/// is read into chunk. The `write` field of `Chunk` indicates the end of valid data. +fn read_at(file: &Arc, offset: usize, file_length: usize) -> Result { + if offset > file_length { + return Err(Eof); + } + let size = CHUNK_SIZE.min((file_length - offset) as usize); + let mut data = Box::new([0u8; CHUNK_SIZE]); + crate::fs::io::pread_exact(file.as_ref(), &mut data[0..size], offset as u64)?; + Ok(Chunk::new(data, size)) +} + +#[cfg(test)] +mod tests { + use std::io::Read; + + use common_telemetry::logging; + use futures::pin_mut; + use futures_util::StreamExt; + use tempdir::TempDir; + use tokio::io::AsyncWriteExt; + + use super::*; + use crate::fs::namespace::LocalNamespace; + + #[tokio::test] + pub async fn test_create_entry_stream() { + logging::init_default_ut_logging(); + let config = LogConfig::default(); + + let dir = TempDir::new("greptimedb-store-test").unwrap(); + let path_buf = dir.path().join("0010.log"); + let path = path_buf.to_str().unwrap().to_string(); + File::create(path.as_str()).unwrap(); + + let mut file = LogFile::open(path.clone(), &config) + .await + .unwrap_or_else(|_| panic!("Failed to open file: {}", path)); + file.start().await.expect("Failed to start log file"); + + assert_eq!( + 10, + file.append(&mut EntryImpl::new("test1".as_bytes(), 10)) + .await + .expect("Failed to append entry 1") + .entry_id + ); + + assert_eq!( + 11, + file.append(&mut EntryImpl::new("test-2".as_bytes(), 11)) + .await + .expect("Failed to append entry 2") + .entry_id + ); + + let mut log_file = std::fs::File::open(path.clone()).expect("Test log file does not exist"); + let metadata = log_file.metadata().expect("Failed to read file metadata"); + info!("Log file metadata: {:?}", metadata); + + assert_eq!(59, metadata.len()); // 24+5+24+6 + let mut content = vec![0; metadata.len() as usize]; + log_file + .read_exact(&mut content) + .expect("Read log file failed"); + + info!( + "Log file {:?} content: {}, size:{}", + dir, + hex::encode(content), + metadata.len() + ); + + let ns = LocalNamespace::default(); + let mut stream = file.create_stream(&ns, 0); + let mut data = vec![]; + + while let Some(v) = stream.next().await { + let entries = v.unwrap(); + for e in entries { + let vec = e.data().to_vec(); + info!("Read entry: {}", String::from_utf8_lossy(&vec)); + data.push(String::from_utf8(vec).unwrap()); + } + } + + assert_eq!(vec!["test1".to_string(), "test-2".to_string()], data); + drop(stream); + + let result = file.stop().await; + info!("Stop file res: {:?}", result); + } + + #[tokio::test] + pub async fn test_read_at() { + let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap(); + let file_path = dir.path().join("chunk-stream-file-test"); + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .write(true) + .read(true) + .open(&file_path) + .await + .unwrap(); + file.write_all("1234567890ab".as_bytes()).await.unwrap(); + file.flush().await.unwrap(); + + let file = Arc::new(file.into_std().await); + let result = read_at(&file, 0, 12).unwrap(); + + assert_eq!(12, result.len()); + assert_eq!("1234567890ab".as_bytes(), &result.data[0..result.len()]); + } + + #[tokio::test] + pub async fn test_read_at_center() { + let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap(); + let file_path = dir.path().join("chunk-stream-file-test-center"); + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .write(true) + .read(true) + .open(&file_path) + .await + .unwrap(); + file.write_all("1234567890ab".as_bytes()).await.unwrap(); + file.flush().await.unwrap(); + + let file_len = file.metadata().await.unwrap().len(); + let file = Arc::new(file.into_std().await); + let result = read_at(&file, 8, file_len as usize).unwrap(); + assert_eq!(4, result.len()); + assert_eq!("90ab".as_bytes(), &result.data[0..result.len()]); + } + + #[tokio::test] + pub async fn test_file_chunk_stream() { + let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap(); + let file_path = dir.path().join("chunk-stream-file-test"); + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .write(true) + .read(true) + .open(&file_path) + .await + .unwrap(); + file.write_all(&vec![42].repeat(4096 + 1024)).await.unwrap(); + file.flush().await.unwrap(); + + let file_size = file.metadata().await.unwrap().len(); + let file = Arc::new(file.into_std().await); + let stream = file_chunk_stream(file, 0, file_size as usize, 1024); + pin_mut!(stream); + + let mut chunks = vec![]; + while let Some(r) = stream.next().await { + chunks.push(r.unwrap()); + } + assert_eq!( + vec![4096, 1024], + chunks.iter().map(|c| c.write_offset).collect::>() + ); + assert_eq!( + vec![vec![42].repeat(4096), vec![42].repeat(1024)], + chunks + .iter() + .map(|c| &c.data[0..c.write_offset]) + .collect::>() + ); + } + + #[tokio::test] + pub async fn test_sync_chunk_stream() { + let dir = tempdir::TempDir::new("greptimedb-store-test").unwrap(); + let file_path = dir.path().join("chunk-stream-file-test"); + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .write(true) + .read(true) + .open(&file_path) + .await + .unwrap(); + file.write_all(&vec![42].repeat(4096 + 1024)).await.unwrap(); + file.flush().await.unwrap(); + + let file_size = file.metadata().await.unwrap().len(); + let file = Arc::new(file.into_std().await); + let stream = file_chunk_stream_sync(file, 0, file_size as usize); + pin_mut!(stream); + + let mut chunks = vec![]; + while let Some(r) = stream.next().await { + chunks.push(r.unwrap()); + } + assert_eq!( + vec![4096, 1024], + chunks.iter().map(|c| c.write_offset).collect::>() + ); + assert_eq!( + vec![vec![42].repeat(4096), vec![42].repeat(1024)], + chunks + .iter() + .map(|c| &c.data[0..c.write_offset]) + .collect::>() + ); + } +} diff --git a/src/log-store/src/fs/io.rs b/src/log-store/src/fs/io.rs new file mode 100644 index 0000000000..c63d4d5d07 --- /dev/null +++ b/src/log-store/src/fs/io.rs @@ -0,0 +1,10 @@ +#[cfg(all(unix, not(miri)))] +mod unix; +// todo(hl): maybe support windows seek_write/seek_read +#[cfg(any(not(unix), miri))] +mod fallback; + +#[cfg(any(all(not(unix), not(windows)), miri))] +pub use fallback::{pread_exact, pwrite_all}; +#[cfg(all(unix, not(miri)))] +pub use unix::{pread_exact, pwrite_all}; diff --git a/src/log-store/src/fs/io/fallback.rs b/src/log-store/src/fs/io/fallback.rs new file mode 100644 index 0000000000..7a6ed5ce83 --- /dev/null +++ b/src/log-store/src/fs/io/fallback.rs @@ -0,0 +1,15 @@ +use std::convert::TryFrom; +use std::fs::File; + +use snafu::ResultExt; + +use crate::error::Error; + +// TODO(hl): Implement pread/pwrite for non-Unix platforms +pub fn pread_exact(file: &File, _buf: &mut [u8], _offset: u64) -> Result<(), Error> { + unimplemented!() +} + +pub fn pwrite_all(file: &File, _buf: &[u8], _offset: u64) -> Result<(), Error> { + unimplemented!() +} diff --git a/src/log-store/src/fs/io/unix.rs b/src/log-store/src/fs/io/unix.rs new file mode 100644 index 0000000000..43e38da65e --- /dev/null +++ b/src/log-store/src/fs/io/unix.rs @@ -0,0 +1,15 @@ +use std::fs::File; +use std::os::unix::fs::FileExt; + +use snafu::ResultExt; + +use crate::error::Error; +use crate::error::IoSnafu; + +pub fn pread_exact(file: &File, buf: &mut [u8], offset: u64) -> Result<(), Error> { + file.read_exact_at(buf, offset as u64).context(IoSnafu) +} + +pub fn pwrite_all(file: &File, buf: &[u8], offset: u64) -> Result<(), Error> { + file.write_all_at(buf, offset as u64).context(IoSnafu) +} diff --git a/src/log-store/src/fs/log.rs b/src/log-store/src/fs/log.rs index 1301ec561a..2ad05eb252 100644 --- a/src/log-store/src/fs/log.rs +++ b/src/log-store/src/fs/log.rs @@ -67,7 +67,7 @@ impl LocalFileLogStore { .expect("Not expected to fail when initing log store"); active_file.unseal(); - let active_file_name = active_file.to_string(); + let active_file_name = active_file.file_name(); info!("Log store active log file: {}", active_file_name); // Start active log file @@ -144,9 +144,9 @@ impl LocalFileLogStore { } // create and start a new log file - let entry_id = active.next_entry_id(); + let next_entry_id = active.last_entry_id() + 1; let path_buf = - Path::new(&self.config.log_file_dir).join(FileName::log(entry_id).to_string()); + Path::new(&self.config.log_file_dir).join(FileName::log(next_entry_id).to_string()); let path = path_buf.to_str().context(FileNameIllegalSnafu { file_name: self.config.log_file_dir.clone(), })?; @@ -210,7 +210,7 @@ impl LogStore for LocalFileLogStore { } return InternalSnafu { - msg: "Failed to append entry with max retry time exceeds".to_string(), + msg: "Failed to append entry with max retry time exceeds", } .fail(); } @@ -225,11 +225,11 @@ impl LogStore for LocalFileLogStore { id: Id, ) -> Result> { let files = self.files.read().await; - let ns = ns.clone(); let s = stream!({ for (start_id, file) in files.iter() { - if *start_id >= id { + // TODO(hl): Use index to lookup file + if *start_id <= id { let s = file.create_stream(&ns, *start_id); pin_mut!(s); while let Some(entries) = s.next().await { @@ -254,8 +254,8 @@ impl LogStore for LocalFileLogStore { todo!() } - fn entry>(&self, data: D) -> Self::Entry { - EntryImpl::new(data) + fn entry>(&self, data: D, id: Id) -> Self::Entry { + EntryImpl::new(data, id) } fn namespace(&self, name: &str) -> Self::Namespace { @@ -287,7 +287,7 @@ mod tests { assert_eq!( 0, logstore - .append(&ns, EntryImpl::new(generate_data(100)),) + .append(&ns, EntryImpl::new(generate_data(100), 0),) .await .unwrap() .entry_id @@ -296,7 +296,7 @@ mod tests { assert_eq!( 1, logstore - .append(&ns, EntryImpl::new(generate_data(100)),) + .append(&ns, EntryImpl::new(generate_data(100), 1)) .await .unwrap() .entry_id @@ -328,7 +328,7 @@ mod tests { let logstore = LocalFileLogStore::open(&config).await.unwrap(); let ns = LocalNamespace::default(); let id = logstore - .append(&ns, EntryImpl::new(generate_data(100))) + .append(&ns, EntryImpl::new(generate_data(100), 0)) .await .unwrap() .entry_id; diff --git a/src/log-store/src/fs/noop.rs b/src/log-store/src/fs/noop.rs index 25975538a6..6c723c997b 100644 --- a/src/log-store/src/fs/noop.rs +++ b/src/log-store/src/fs/noop.rs @@ -51,8 +51,8 @@ impl LogStore for NoopLogStore { todo!() } - fn entry>(&self, data: D) -> Self::Entry { - EntryImpl::new(data) + fn entry>(&self, data: D, id: Id) -> Self::Entry { + EntryImpl::new(data, id) } fn namespace(&self, name: &str) -> Self::Namespace { diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 2ded3e72d3..74bb9632a7 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -210,10 +210,6 @@ impl WriterInner { // Data after flushed sequence need to be recovered. flushed_sequence = version_control.current().flushed_sequence(); last_sequence = flushed_sequence; - // FIXME(yingwen): Now log store will overwrite the entry id by its internal entry id, - // which starts from 0. This is a hack to just make the test passes since we knows the - // entry id of log store is always equals to `sequence - 1`. Change this to - // `flushed_sequence + ` once the log store fixes this issue. let mut stream = writer_ctx.wal.read_from_wal(flushed_sequence).await?; while let Some((req_sequence, _header, request)) = stream.try_next().await? { if let Some(request) = request { @@ -222,9 +218,6 @@ impl WriterInner { // Note that memtables of `Version` may be updated during replay. let version = version_control.current(); - // FIXME(yingwen): Use req_sequence instead of `req_sequence + 1` once logstore - // won't overwrite the entry id. - let req_sequence = req_sequence + 1; if req_sequence > last_sequence { last_sequence = req_sequence; } else { diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 6341878389..61a5fc6d67 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -134,8 +134,7 @@ impl Wal { } async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> { - let mut e = self.store.entry(bytes); - e.set_id(seq); + let e = self.store.entry(bytes, seq); let res = self .store @@ -275,13 +274,14 @@ mod tests { #[tokio::test] pub async fn test_read_wal_only_header() -> Result<()> { + common_telemetry::init_default_ut_logging(); let (log_store, _tmp) = test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await; let wal = Wal::new("test_region", Arc::new(log_store)); let header = WalHeader::with_last_manifest_version(111); let (seq_num, _) = wal.write_to_wal(3, header, Payload::None).await?; - assert_eq!(0, seq_num); + assert_eq!(3, seq_num); let mut stream = wal.read_from_wal(seq_num).await?; let mut data = vec![]; diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index f9d85ea292..a1974485b5 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -50,7 +50,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { async fn list_namespaces(&self) -> Result, Self::Error>; /// Create an entry of the associate Entry type - fn entry>(&self, data: D) -> Self::Entry; + fn entry>(&self, data: D, id: Id) -> Self::Entry; /// Create a namespace of the associate Namespace type // TODO(sunng87): confusion with `create_namespace` diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 25de0d83e7..b1d9fe1aeb 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -17,8 +17,6 @@ pub trait Entry: Encode + Send + Sync { /// Return file offset of entry. fn offset(&self) -> Offset; - fn set_offset(&mut self, offset: Offset); - fn set_id(&mut self, id: Id); /// Returns epoch of entry. diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs index 531588a9d2..6d67f2873f 100644 --- a/src/store-api/src/logstore/entry_stream.rs +++ b/src/store-api/src/logstore/entry_stream.rs @@ -83,8 +83,6 @@ mod tests { self.offset } - fn set_offset(&mut self, _offset: Offset) {} - fn set_id(&mut self, _id: Id) {} fn epoch(&self) -> Epoch {