fix: log store write and read (#97)

* add pwrite

* write

* fix write

* error handling in write thread

* wrap some LogFile fields into a state field

* remove some unwraps

* restructure some code

* implement file chunk

* composite chunk decode

* add test for chunk stream

* fix buffer test

* remove some useless code

* add test for read_at and file_chunk_stream

* use bounded channel to implement back pressure

* reimplement entry read and decoding

* add some doc

* clean some code

* use Sender::blocking_send to replace manual spawning

* support synchronous file chunk stream

* remove useless clone

* remove set_offset from Entry trait

* cr: fix some comments

* fix: add peek methods for Buffer

* add test for read at the middle of file

* fix some minor issues on comments

* rebase onto develop

* add peek_to_slice and read_to_slice

* initialize file chunk on heap

* fix some comments in CR

* respect entry id set outside LogStore

* fix unit test

* Update src/log-store/src/fs/file.rs

Co-authored-by: evenyag <realevenyag@gmail.com>

* fix some cr comments

Co-authored-by: evenyag <realevenyag@gmail.com>
Lei, Huang
2022-08-10 11:16:04 +08:00
committed by GitHub
parent 8d51ad3429
commit d141fbc674
19 changed files with 1164 additions and 424 deletions

Cargo.lock (generated)

@@ -1987,12 +1987,12 @@ dependencies = [
"bytes",
"common-base",
"common-error",
"common-runtime",
"common-telemetry",
"crc",
"futures",
"futures-util",
"hex",
"memmap2",
"rand 0.8.5",
"snafu",
"store-api",
@@ -2075,15 +2075,6 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memmap2"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5172b50c23043ff43dd53e51392f36519d9b35a8f3a410d30ece5d1aedd58ae"
dependencies = [
"libc",
]
[[package]]
name = "memoffset"
version = "0.6.5"


@@ -7,6 +7,7 @@ use paste::paste;
use snafu::{ensure, Backtrace, ErrorCompat, ResultExt, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display(
"Destination buffer overflow, src_len: {}, dst_len: {}",
@@ -19,6 +20,9 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Buffer underflow"))]
Underflow { backtrace: Backtrace },
#[snafu(display("IO operation reach EOF, source: {}", source))]
Eof {
source: std::io::Error,
@@ -42,11 +46,20 @@ macro_rules! impl_read_le {
( $($num_ty: ty), *) => {
$(
paste!{
// TODO(hl): default implementation requires allocating a
// temp buffer. maybe use more efficient impls in concrete buffers.
// see https://github.com/GrepTimeTeam/greptimedb/pull/97#discussion_r930798941
fn [<read_ $num_ty _le>](&mut self) -> Result<$num_ty> {
let mut buf = [0u8; std::mem::size_of::<$num_ty>()];
self.read_to_slice(&mut buf)?;
Ok($num_ty::from_le_bytes(buf))
}
fn [<peek_ $num_ty _le>](&mut self) -> Result<$num_ty> {
let mut buf = [0u8; std::mem::size_of::<$num_ty>()];
self.peek_to_slice(&mut buf)?;
Ok($num_ty::from_le_bytes(buf))
}
}
)*
}
@@ -65,21 +78,31 @@ macro_rules! impl_write_le {
}
}
pub trait Buffer: AsRef<[u8]> {
fn remaining_slice(&self) -> &[u8];
fn remaining_size(&self) -> usize {
self.remaining_slice().len()
}
pub trait Buffer {
/// Returns remaining data size for read.
fn remaining_size(&self) -> usize;
/// Returns true if buffer has no data for read.
fn is_empty(&self) -> bool {
self.remaining_size() == 0
}
/// Peeks data into dst. This method should not change internal cursor,
/// invoke `advance_by` if needed.
/// # Panics
/// This method **may** panic if buffer does not have enough data to be copied to dst.
fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()>;
fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()>;
/// Reads data into dst. This method will change internal cursor.
/// # Panics
/// This method **may** panic if buffer does not have enough data to be copied to dst.
fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> {
self.peek_to_slice(dst)?;
self.advance_by(dst.len());
Ok(())
}
/// Advances internal cursor for next read.
/// # Panics
/// This method **may** panic if the offset after advancing exceeds the length of underlying buffer.
fn advance_by(&mut self, by: usize);
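The reworked Buffer trait above is peek-based: `peek_to_slice` copies bytes without moving the cursor, and the default `read_to_slice` is just peek plus `advance_by`. Below is a minimal sketch of an implementor, assuming only the trait surface shown in this hunk; `CursorBuf` and the String-based error are illustrative stand-ins, not part of the commit.

// Illustrative sketch only, not part of this diff.
type Result<T> = std::result::Result<T, String>;

pub trait Buffer {
    fn remaining_size(&self) -> usize;
    fn is_empty(&self) -> bool {
        self.remaining_size() == 0
    }
    /// Copies data into dst without moving the internal cursor.
    fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()>;
    /// Default read = peek + advance, as the trait now provides.
    fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> {
        self.peek_to_slice(dst)?;
        self.advance_by(dst.len());
        Ok(())
    }
    fn advance_by(&mut self, by: usize);
}

struct CursorBuf {
    data: Vec<u8>,
    pos: usize,
}

impl Buffer for CursorBuf {
    fn remaining_size(&self) -> usize {
        self.data.len() - self.pos
    }
    fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()> {
        if self.remaining_size() < dst.len() {
            return Err("overflow".to_string());
        }
        dst.copy_from_slice(&self.data[self.pos..self.pos + dst.len()]);
        Ok(())
    }
    fn advance_by(&mut self, by: usize) {
        assert!(self.pos + by <= self.data.len(), "advance past end");
        self.pos += by;
    }
}

Splitting peek from advance is what lets the entry decoder further down inspect a header, bail out when the buffer is short, and retry later without having consumed any bytes.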
@@ -91,18 +114,18 @@ macro_rules! impl_buffer_for_bytes {
( $($buf_ty:ty), *) => {
$(
impl Buffer for $buf_ty {
#[inline]
fn remaining_slice(&self) -> &[u8] {
&self
fn remaining_size(&self) -> usize{
self.len()
}
fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> {
fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()> {
let dst_len = dst.len();
ensure!(self.remaining() >= dst.len(), OverflowSnafu {
src_len: self.remaining_size(),
dst_len: dst.len(),
dst_len,
}
);
self.copy_to_slice(dst);
dst.copy_from_slice(&self[0..dst_len]);
Ok(())
}
@@ -118,8 +141,21 @@ macro_rules! impl_buffer_for_bytes {
impl_buffer_for_bytes![bytes::Bytes, bytes::BytesMut];
impl Buffer for &[u8] {
fn remaining_slice(&self) -> &[u8] {
self
fn remaining_size(&self) -> usize {
self.len()
}
fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()> {
let dst_len = dst.len();
ensure!(
self.len() >= dst.len(),
OverflowSnafu {
src_len: self.remaining_size(),
dst_len,
}
);
dst.copy_from_slice(&self[0..dst_len]);
Ok(())
}
fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> {


@@ -4,7 +4,7 @@
mod tests {
use std::assert_matches::assert_matches;
use bytes::{Buf, BytesMut};
use bytes::{Buf, Bytes, BytesMut};
use common_base::buffer::Error::Overflow;
use common_base::buffer::{Buffer, BufferMut};
use paste::paste;
@@ -13,17 +13,34 @@ mod tests {
pub fn test_buffer_read_write() {
let mut buf = BytesMut::with_capacity(16);
buf.write_u64_le(1234u64).unwrap();
let result = buf.read_u64_le().unwrap();
let result = buf.peek_u64_le().unwrap();
assert_eq!(1234u64, result);
buf.advance_by(8);
buf.write_from_slice("hello, world".as_bytes()).unwrap();
let mut content = vec![0u8; 5];
buf.read_to_slice(&mut content).unwrap();
buf.peek_to_slice(&mut content).unwrap();
let read = String::from_utf8_lossy(&content);
assert_eq!("hello", read);
buf.advance_by(5);
// after read, buffer should still have 7 bytes to read.
assert_eq!(7, buf.remaining());
let mut content = vec![0u8; 6];
buf.read_to_slice(&mut content).unwrap();
let read = String::from_utf8_lossy(&content);
assert_eq!(", worl", read);
// after read, buffer should still have 1 byte to read.
assert_eq!(1, buf.remaining());
}
#[test]
pub fn test_buffer_read() {
let mut bytes = Bytes::from_static("hello".as_bytes());
assert_eq!(5, bytes.remaining_size());
assert_eq!(b'h', bytes.peek_u8_le().unwrap());
bytes.advance_by(1);
assert_eq!(4, bytes.remaining_size());
}
macro_rules! test_primitive_read_write {
@@ -44,6 +61,22 @@ mod tests {
#[test]
pub fn test_read_write_from_slice_buffer() {
let mut buf = "hello".as_bytes();
assert_eq!(104, buf.peek_u8_le().unwrap());
buf.advance_by(1);
assert_eq!(101, buf.peek_u8_le().unwrap());
buf.advance_by(1);
assert_eq!(108, buf.peek_u8_le().unwrap());
buf.advance_by(1);
assert_eq!(108, buf.peek_u8_le().unwrap());
buf.advance_by(1);
assert_eq!(111, buf.peek_u8_le().unwrap());
buf.advance_by(1);
assert_matches!(buf.peek_u8_le(), Err(Overflow { .. }));
}
#[test]
pub fn test_read_u8_from_slice_buffer() {
let mut buf = "hello".as_bytes();
assert_eq!(104, buf.read_u8_le().unwrap());
assert_eq!(101, buf.read_u8_le().unwrap());
@@ -53,6 +86,18 @@ mod tests {
assert_matches!(buf.read_u8_le(), Err(Overflow { .. }));
}
#[test]
pub fn test_read_write_numbers() {
let mut buf: Vec<u8> = vec![];
buf.write_u64_le(1234).unwrap();
assert_eq!(1234, (&buf[..]).read_u64_le().unwrap());
buf.write_u32_le(4242).unwrap();
let mut p = &buf[..];
assert_eq!(1234, p.read_u64_le().unwrap());
assert_eq!(4242, p.read_u32_le().unwrap());
}
macro_rules! test_primitive_vec_read_write {
( $($num_ty: ty), *) => {
$(
@@ -71,15 +116,20 @@ mod tests {
test_primitive_vec_read_write![u8, u16, u32, u64, i8, i16, i32, i64, f32, f64];
#[test]
pub fn test_read_write_from_vec_buffer() {
pub fn test_peek_write_from_vec_buffer() {
let mut buf: Vec<u8> = vec![];
assert!(buf.write_from_slice("hello".as_bytes()).is_ok());
let mut slice = buf.as_slice();
assert_eq!(104, slice.read_u8_le().unwrap());
assert_eq!(101, slice.read_u8_le().unwrap());
assert_eq!(108, slice.read_u8_le().unwrap());
assert_eq!(108, slice.read_u8_le().unwrap());
assert_eq!(111, slice.read_u8_le().unwrap());
assert_eq!(104, slice.peek_u8_le().unwrap());
slice.advance_by(1);
assert_eq!(101, slice.peek_u8_le().unwrap());
slice.advance_by(1);
assert_eq!(108, slice.peek_u8_le().unwrap());
slice.advance_by(1);
assert_eq!(108, slice.peek_u8_le().unwrap());
slice.advance_by(1);
assert_eq!(111, slice.peek_u8_le().unwrap());
slice.advance_by(1);
assert_matches!(slice.read_u8_le(), Err(Overflow { .. }));
}


@@ -13,12 +13,12 @@ byteorder = "1.4"
bytes = "1.1"
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-runtime = {path = "../common/runtime"}
common-telemetry = { path = "../common/telemetry" }
crc = "3.0"
futures = "0.3"
futures-util = "0.3"
hex = "0.4"
memmap2 = "0.5"
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
tempdir = "0.3"


@@ -13,12 +13,18 @@ pub enum Error {
#[snafu(display("Failed to decode entry, remain size: {}", size))]
Decode { size: usize, backtrace: Backtrace },
#[snafu(display("No enough data to decode, try again"))]
DecodeAgain,
#[snafu(display("Failed to append entry, source: {}", source))]
Append {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to wait for log file write complete, source: {}", source))]
Write { source: tokio::task::JoinError },
#[snafu(display("Entry corrupted, msg: {}", msg))]
Corrupted { msg: String, backtrace: Backtrace },
@@ -66,6 +72,9 @@ pub enum Error {
#[snafu(display("Log file suffix is illegal: {}", suffix))]
SuffixIllegal { suffix: String },
#[snafu(display("Failed while waiting for write to finish, source: {}", source))]
WaitWrite { source: tokio::task::JoinError },
}
impl ErrorExt for Error {


@@ -1,12 +1,14 @@
use store_api::logstore::entry::{Id, Offset};
use store_api::logstore::AppendResponse;
mod chunk;
pub mod config;
mod crc;
mod entry;
mod file;
mod file_name;
mod index;
mod io;
pub mod log;
mod namespace;
pub mod noop;


@@ -0,0 +1,220 @@
use std::collections::LinkedList;
use common_base::buffer::Buffer;
use common_base::buffer::UnderflowSnafu;
use snafu::ensure;
pub const DEFAULT_CHUNK_SIZE: usize = 4096;
#[derive(Debug)]
pub(crate) struct Chunk {
// internal data
pub data: Box<[u8]>,
// read offset
pub read_offset: usize,
// write offset
pub write_offset: usize,
}
impl Default for Chunk {
fn default() -> Self {
let data = vec![0u8; DEFAULT_CHUNK_SIZE].into_boxed_slice();
Self {
write_offset: 0,
read_offset: 0,
data,
}
}
}
impl Chunk {
#[cfg(test)]
pub fn copy_from_slice(s: &[u8]) -> Self {
let src_len = s.len();
// before [box syntax](https://github.com/rust-lang/rust/issues/49733) becomes stable,
// we can only initialize an array on heap like this.
let mut data = vec![0u8; src_len].into_boxed_slice();
data[0..src_len].copy_from_slice(s);
Self {
read_offset: 0,
write_offset: src_len,
data,
}
}
pub fn new(data: Box<[u8]>, write: usize) -> Self {
Self {
write_offset: write,
read_offset: 0,
data,
}
}
pub fn len(&self) -> usize {
self.write_offset - self.read_offset
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// allows short read.
/// Calling read **will not** advance read cursor, must call `advance` manually.
pub fn read(&self, dst: &mut [u8]) -> usize {
let size = self.len().min(dst.len());
let range = self.read_offset..(self.read_offset + size);
(&mut dst[0..size]).copy_from_slice(&self.data[range]);
size
}
pub fn advance(&mut self, by: usize) -> usize {
assert!(
self.write_offset >= self.read_offset,
"Illegal chunk state, read: {}, write: {}",
self.read_offset,
self.write_offset
);
let step = by.min(self.write_offset - self.read_offset);
self.read_offset += step;
step
}
}
pub struct ChunkList {
chunks: LinkedList<Chunk>,
}
impl ChunkList {
pub fn new() -> Self {
Self {
chunks: LinkedList::new(),
}
}
pub(crate) fn push(&mut self, chunk: Chunk) {
self.chunks.push_back(chunk);
}
}
impl Buffer for ChunkList {
fn remaining_size(&self) -> usize {
self.chunks.iter().map(|c| c.len()).sum()
}
fn peek_to_slice(&self, mut dst: &mut [u8]) -> common_base::buffer::Result<()> {
ensure!(self.remaining_size() >= dst.len(), UnderflowSnafu);
for c in &self.chunks {
if dst.is_empty() {
break;
}
let read = c.read(dst);
dst = &mut dst[read..];
}
ensure!(dst.is_empty(), UnderflowSnafu);
Ok(())
}
fn read_to_slice(&mut self, dst: &mut [u8]) -> common_base::buffer::Result<()> {
self.peek_to_slice(dst)?;
self.advance_by(dst.len());
Ok(())
}
fn advance_by(&mut self, by: usize) {
let mut left = by;
while left > 0 {
if let Some(c) = self.chunks.front_mut() {
let actual = c.advance(left);
if c.is_empty() {
self.chunks.pop_front(); // remove first chunk
}
left -= actual;
} else {
panic!("Advance step [{}] exceeds max readable bytes", by);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_chunk() {
let chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes());
assert_eq!(5, chunk.write_offset);
assert_eq!(0, chunk.read_offset);
assert_eq!(5, chunk.len());
let mut dst = [0u8; 3];
assert_eq!(3, chunk.read(&mut dst));
assert_eq!(5, chunk.write_offset);
assert_eq!(0, chunk.read_offset);
assert_eq!(5, chunk.len());
}
#[test]
pub fn test_chunk_short_read() {
let chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes());
let mut dst = vec![0u8; 8];
let read = chunk.read(&mut dst);
assert_eq!(5, read);
assert_eq!(vec![b'h', b'e', b'l', b'l', b'o', 0x0, 0x0, 0x0], dst);
}
#[test]
pub fn test_chunk_advance() {
let mut chunk: Chunk = Chunk::copy_from_slice("hello".as_bytes());
let mut dst = vec![0u8; 8];
assert_eq!(5, chunk.read(&mut dst));
assert_eq!(0, chunk.read_offset);
assert_eq!(5, chunk.write_offset);
assert_eq!(1, chunk.advance(1));
assert_eq!(1, chunk.read_offset);
assert_eq!(5, chunk.write_offset);
assert_eq!(4, chunk.advance(5));
assert_eq!(5, chunk.read_offset);
assert_eq!(5, chunk.write_offset);
}
#[test]
pub fn test_composite_chunk_read() {
let mut chunks = ChunkList {
chunks: LinkedList::new(),
};
chunks.push(Chunk::copy_from_slice("abcd".as_bytes()));
chunks.push(Chunk::copy_from_slice("12345".as_bytes()));
assert_eq!(9, chunks.remaining_size());
let mut dst = [0u8; 2];
chunks.peek_to_slice(&mut dst).unwrap();
chunks.advance_by(2);
assert_eq!([b'a', b'b'], dst);
assert_eq!(2, chunks.chunks.len());
let mut dst = [0u8; 3];
chunks.peek_to_slice(&mut dst).unwrap();
chunks.advance_by(3);
assert_eq!([b'c', b'd', b'1'], dst);
assert_eq!(4, chunks.remaining_size());
assert_eq!(1, chunks.chunks.len());
let mut dst = [0u8; 4];
chunks.peek_to_slice(&mut dst).unwrap();
chunks.advance_by(4);
assert_eq!([b'2', b'3', b'4', b'5'], dst);
assert_eq!(0, chunks.remaining_size());
assert_eq!(0, chunks.chunks.len());
chunks.push(Chunk::copy_from_slice("uvwxyz".as_bytes()));
assert_eq!(6, chunks.remaining_size());
assert_eq!(1, chunks.chunks.len());
}
}
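This chunk module is the buffering half of the new read path; the other half is the bounded channel mentioned in the commit list ("use bounded channel to implement back pressure", "use Sender::blocking_send"). Here is a sketch of how those pieces can fit together, assuming tokio's mpsc and Unix positioned reads; the chunk size constant and reader helper are illustrative, not the PR's actual reader.

// Illustrative sketch only, not part of this diff.
use std::fs::File;
use std::os::unix::fs::FileExt; // Unix-only positioned reads
use std::sync::Arc;
use tokio::sync::mpsc;

const CHUNK_SIZE: usize = 4096; // mirrors DEFAULT_CHUNK_SIZE above

fn spawn_chunk_reader(file: Arc<File>, tx: mpsc::Sender<Vec<u8>>) {
    std::thread::spawn(move || {
        let mut offset = 0u64;
        loop {
            let mut buf = vec![0u8; CHUNK_SIZE];
            let n = match file.read_at(&mut buf, offset) {
                Ok(0) | Err(_) => break, // EOF or IO error ends the stream
                Ok(n) => n,
            };
            buf.truncate(n);
            offset += n as u64;
            // blocking_send parks this thread whenever the bounded channel
            // is full, so the consumer's pace throttles the reader: that is
            // the back pressure the commit message refers to.
            if tx.blocking_send(buf).is_err() {
                break; // receiver dropped, stop reading
            }
        }
    });
}

The consumer side would `rx.recv().await` each buffer, wrap it in a `Chunk`, push it onto a `ChunkList`, and attempt a decode, looping while the decoder reports it needs more data.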


@@ -8,11 +8,13 @@ use snafu::{ensure, ResultExt};
use store_api::logstore::entry::{Encode, Entry, Epoch, Id, Offset};
use store_api::logstore::entry_stream::{EntryStream, SendableEntryStream};
use crate::error::{CorruptedSnafu, DecodeSnafu, EncodeSnafu, Error};
use crate::error::{CorruptedSnafu, DecodeAgainSnafu, DecodeSnafu, EncodeSnafu, Error};
use crate::fs::crc;
// length+offset+epoch+crc
// length + offset + epoch + crc
const ENTRY_MIN_LEN: usize = 4 + 8 + 8 + 4;
// length + offset + epoch
const HEADER_LENGTH: usize = 4 + 8 + 8;
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
@@ -22,6 +24,13 @@ pub struct EntryImpl {
pub epoch: Epoch,
}
impl EntryImpl {
#[cfg(test)]
fn set_offset(&mut self, offset: Offset) {
self.offset = offset;
}
}
impl Encode for EntryImpl {
type Error = Error;
@@ -46,12 +55,7 @@ impl Encode for EntryImpl {
}
fn decode<T: Buffer>(buf: &mut T) -> Result<Self, Self::Error> {
ensure!(
buf.remaining_size() >= ENTRY_MIN_LEN,
DecodeSnafu {
size: buf.remaining_size(),
}
);
ensure!(buf.remaining_size() >= HEADER_LENGTH, DecodeAgainSnafu);
macro_rules! map_err {
($stmt: expr, $var: ident) => {
@@ -65,23 +69,34 @@ impl Encode for EntryImpl {
}
let mut digest = crc::CRC_ALGO.digest();
let id = map_err!(buf.read_u64_le(), buf)?;
let mut header = [0u8; HEADER_LENGTH];
buf.peek_to_slice(&mut header).unwrap();
let mut header = &header[..];
let id = header.read_u64_le().unwrap(); // unwrap here is safe because header bytes must be present
digest.update(&id.to_le_bytes());
let epoch = map_err!(buf.read_u64_le(), buf)?;
let epoch = header.read_u64_le().unwrap();
digest.update(&epoch.to_le_bytes());
let data_len = map_err!(buf.read_u32_le(), buf)?;
let data_len = header.read_u32_le().unwrap();
digest.update(&data_len.to_le_bytes());
ensure!(
buf.remaining_size() >= data_len as usize,
DecodeSnafu {
size: buf.remaining_size()
}
buf.remaining_size() >= ENTRY_MIN_LEN + data_len as usize,
DecodeAgainSnafu
);
buf.advance_by(HEADER_LENGTH);
let mut data = vec![0u8; data_len as usize];
map_err!(buf.read_to_slice(&mut data), buf)?;
map_err!(buf.peek_to_slice(&mut data), buf)?;
digest.update(&data);
let crc_read = map_err!(buf.read_u32_le(), buf)?;
buf.advance_by(data_len as usize);
let crc_read = map_err!(buf.peek_u32_le(), buf)?;
let crc_calc = digest.finalize();
ensure!(
crc_read == crc_calc,
CorruptedSnafu {
@@ -93,6 +108,8 @@ impl Encode for EntryImpl {
}
);
buf.advance_by(4);
Ok(Self {
id,
data,
@@ -107,9 +124,9 @@ impl Encode for EntryImpl {
}
impl EntryImpl {
pub(crate) fn new(data: impl AsRef<[u8]>) -> EntryImpl {
pub(crate) fn new(data: impl AsRef<[u8]>, id: Id) -> EntryImpl {
EntryImpl {
id: 0,
id,
data: data.as_ref().to_vec(),
offset: 0,
epoch: 0,
@@ -132,10 +149,6 @@ impl Entry for EntryImpl {
self.offset
}
fn set_offset(&mut self, offset: Offset) {
self.offset = offset;
}
fn set_id(&mut self, id: Id) {
self.id = id;
}
@@ -194,16 +207,20 @@ impl<'a> EntryStream for StreamImpl<'a> {
#[cfg(test)]
mod tests {
use async_stream::stream;
use byteorder::{ByteOrder, LittleEndian};
use futures::pin_mut;
use futures_util::StreamExt;
use tokio::time::Duration;
use super::*;
use crate::fs::chunk::{Chunk, ChunkList};
use crate::fs::crc::CRC_ALGO;
#[test]
pub fn test_entry_deser() {
let data = "hello, world";
let mut entry = EntryImpl::new(data.as_bytes());
entry.set_id(8);
let mut entry = EntryImpl::new(data.as_bytes(), 8);
entry.epoch = 9;
let mut buf = BytesMut::with_capacity(entry.encoded_size());
entry.encode_to(&mut buf).unwrap();
@@ -215,10 +232,9 @@ mod tests {
#[test]
pub fn test_rewrite_entry_id() {
let data = "hello, world";
let mut entry = EntryImpl::new(data.as_bytes());
let entry = EntryImpl::new(data.as_bytes(), 123);
let mut buffer = BytesMut::with_capacity(entry.encoded_size());
entry.encode_to(&mut buffer).unwrap();
entry.set_id(123);
assert_eq!(123, entry.id());
// rewrite entry id.
@@ -230,4 +246,119 @@ mod tests {
let entry_impl = EntryImpl::decode(&mut buffer.freeze()).expect("Failed to deserialize");
assert_eq!(333, entry_impl.id());
}
fn prepare_entry_bytes(data: &str, id: Id) -> Bytes {
let mut entry = EntryImpl::new(data.as_bytes(), id);
entry.set_id(123);
entry.set_offset(456);
let mut buffer = BytesMut::with_capacity(entry.encoded_size());
entry.encode_to(&mut buffer).unwrap();
let len = buffer.len();
let checksum = CRC_ALGO.checksum(&buffer[0..len - 4]);
LittleEndian::write_u32(&mut buffer[len - 4..], checksum);
buffer.freeze()
}
/// Test decode entry from a composite buffer.
#[test]
pub fn test_composite_buffer() {
let data_1 = "hello, world";
let bytes = prepare_entry_bytes(data_1, 0);
EntryImpl::decode(&mut bytes.clone()).unwrap();
let c1 = Chunk::copy_from_slice(&bytes);
let data_2 = "LoremIpsumDolor";
let bytes = prepare_entry_bytes(data_2, 1);
EntryImpl::decode(&mut bytes.clone()).unwrap();
let c2 = Chunk::copy_from_slice(&bytes);
let mut chunks = ChunkList::new();
chunks.push(c1);
chunks.push(c2);
assert_eq!(
ENTRY_MIN_LEN * 2 + data_2.len() + data_1.len(),
chunks.remaining_size()
);
let mut decoded = vec![];
while chunks.remaining_size() > 0 {
let entry_impl = EntryImpl::decode(&mut chunks).unwrap();
decoded.push(entry_impl.data);
}
assert_eq!(
vec![data_1.as_bytes().to_vec(), data_2.as_bytes().to_vec()],
decoded
);
}
// split an encoded entry to two different chunk and try decode from this composite chunk
#[test]
pub fn test_decode_split_data_from_composite_chunk() {
let data = "hello, world";
let bytes = prepare_entry_bytes(data, 42);
assert_eq!(
hex::decode("7B0000000000000000000000000000000C00000068656C6C6F2C20776F726C645B2EEC0F")
.unwrap()
.as_slice(),
&bytes[..]
);
let original = EntryImpl::decode(&mut bytes.clone()).unwrap();
let split_point = bytes.len() / 2;
let (left, right) = bytes.split_at(split_point);
let mut chunks = ChunkList::new();
chunks.push(Chunk::copy_from_slice(left));
chunks.push(Chunk::copy_from_slice(right));
assert_eq!(bytes.len(), chunks.remaining_size());
let decoded = EntryImpl::decode(&mut chunks).unwrap();
assert_eq!(original, decoded);
}
// Tests decode entry from encoded entry data as two chunks
#[tokio::test]
pub async fn test_decode_from_chunk_stream() {
// prepare entry
let data = "hello, world";
let bytes = prepare_entry_bytes(data, 42);
assert_eq!(
hex::decode("7B0000000000000000000000000000000C00000068656C6C6F2C20776F726C645B2EEC0F")
.unwrap()
.as_slice(),
&bytes[..]
);
let original = EntryImpl::decode(&mut bytes.clone()).unwrap();
let split_point = bytes.len() / 2;
let (left, right) = bytes.split_at(split_point);
// prepare chunk stream
let chunk_stream = stream!({
yield Chunk::copy_from_slice(left);
tokio::time::sleep(Duration::from_millis(10)).await;
yield Chunk::copy_from_slice(right);
});
pin_mut!(chunk_stream);
let mut chunks = ChunkList::new();
let mut decoded = vec![];
while let Some(c) = chunk_stream.next().await {
chunks.push(c);
match EntryImpl::decode(&mut chunks) {
Ok(e) => {
decoded.push(e);
}
Err(Error::DecodeAgain { .. }) => {
continue;
}
_ => {
panic!()
}
}
}
assert_eq!(1, decoded.len());
assert_eq!(original, decoded.into_iter().next().unwrap());
}
}
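The `DecodeAgain` error is what turns `EntryImpl::decode` into an incremental decoder: decoding only peeks until a whole entry is known to be buffered, so a failed attempt leaves the `ChunkList` untouched and can simply be retried after more data arrives. Below is a condensed synchronous version of the loop the async test above exercises, using the types from this commit; it assumes the same scope as these tests, where the test-only `Chunk::copy_from_slice` constructor is available.

// Illustrative sketch only, not part of this diff.
fn decode_all(raw_chunks: impl Iterator<Item = Vec<u8>>) -> Result<Vec<EntryImpl>, Error> {
    let mut chunks = ChunkList::new();
    let mut entries = vec![];
    for bytes in raw_chunks {
        chunks.push(Chunk::copy_from_slice(&bytes));
        // Drain every complete entry; a partial tail stays buffered in
        // `chunks` and is completed by a later chunk.
        loop {
            match EntryImpl::decode(&mut chunks) {
                Ok(e) => entries.push(e),
                Err(Error::DecodeAgain { .. }) => break,
                Err(e) => return Err(e),
            }
        }
    }
    Ok(entries)
}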

File diff suppressed because it is too large.


@@ -0,0 +1,10 @@
#[cfg(all(unix, not(miri)))]
mod unix;
// todo(hl): maybe support windows seek_write/seek_read
#[cfg(any(not(unix), miri))]
mod fallback;
#[cfg(any(all(not(unix), not(windows)), miri))]
pub use fallback::{pread_exact, pwrite_all};
#[cfg(all(unix, not(miri)))]
pub use unix::{pread_exact, pwrite_all};


@@ -0,0 +1,15 @@
use std::convert::TryFrom;
use std::fs::File;
use snafu::ResultExt;
use crate::error::Error;
// TODO(hl): Implement pread/pwrite for non-Unix platforms
pub fn pread_exact(file: &File, _buf: &mut [u8], _offset: u64) -> Result<(), Error> {
unimplemented!()
}
pub fn pwrite_all(file: &File, _buf: &[u8], _offset: u64) -> Result<(), Error> {
unimplemented!()
}


@@ -0,0 +1,15 @@
use std::fs::File;
use std::os::unix::fs::FileExt;
use snafu::ResultExt;
use crate::error::Error;
use crate::error::IoSnafu;
pub fn pread_exact(file: &File, buf: &mut [u8], offset: u64) -> Result<(), Error> {
file.read_exact_at(buf, offset as u64).context(IoSnafu)
}
pub fn pwrite_all(file: &File, buf: &[u8], offset: u64) -> Result<(), Error> {
file.write_all_at(buf, offset as u64).context(IoSnafu)
}
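`read_exact_at` and `write_all_at` take an absolute offset, so no shared seek cursor is involved and a single `File` handle can serve concurrent readers. A standalone sketch of the round trip these helpers wrap, using only std APIs (the temp path is illustrative):

// Illustrative sketch only, not part of this diff.
use std::fs::OpenOptions;
use std::os::unix::fs::FileExt;

fn main() -> std::io::Result<()> {
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open("/tmp/pwrite_demo.log")?;

    // Positioned write: no seek, safe to call from multiple threads.
    file.write_all_at(b"entry-data", 128)?;

    // Positioned read back from the same absolute offset.
    let mut buf = [0u8; 10];
    file.read_exact_at(&mut buf, 128)?;
    assert_eq!(&buf, b"entry-data");
    Ok(())
}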


@@ -67,7 +67,7 @@ impl LocalFileLogStore {
.expect("Not expected to fail when initing log store");
active_file.unseal();
let active_file_name = active_file.to_string();
let active_file_name = active_file.file_name();
info!("Log store active log file: {}", active_file_name);
// Start active log file
@@ -144,9 +144,9 @@ impl LocalFileLogStore {
}
// create and start a new log file
let entry_id = active.next_entry_id();
let next_entry_id = active.last_entry_id() + 1;
let path_buf =
Path::new(&self.config.log_file_dir).join(FileName::log(entry_id).to_string());
Path::new(&self.config.log_file_dir).join(FileName::log(next_entry_id).to_string());
let path = path_buf.to_str().context(FileNameIllegalSnafu {
file_name: self.config.log_file_dir.clone(),
})?;
@@ -210,7 +210,7 @@ impl LogStore for LocalFileLogStore {
}
return InternalSnafu {
msg: "Failed to append entry with max retry time exceeds".to_string(),
msg: "Failed to append entry with max retry time exceeds",
}
.fail();
}
@@ -225,11 +225,11 @@ impl LogStore for LocalFileLogStore {
id: Id,
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
let files = self.files.read().await;
let ns = ns.clone();
let s = stream!({
for (start_id, file) in files.iter() {
if *start_id >= id {
// TODO(hl): Use index to lookup file
if *start_id <= id {
let s = file.create_stream(&ns, *start_id);
pin_mut!(s);
while let Some(entries) = s.next().await {
@@ -254,8 +254,8 @@ impl LogStore for LocalFileLogStore {
todo!()
}
fn entry<D: AsRef<[u8]>>(&self, data: D) -> Self::Entry {
EntryImpl::new(data)
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id) -> Self::Entry {
EntryImpl::new(data, id)
}
fn namespace(&self, name: &str) -> Self::Namespace {
@@ -287,7 +287,7 @@ mod tests {
assert_eq!(
0,
logstore
.append(&ns, EntryImpl::new(generate_data(100)),)
.append(&ns, EntryImpl::new(generate_data(100), 0),)
.await
.unwrap()
.entry_id
@@ -296,7 +296,7 @@ mod tests {
assert_eq!(
1,
logstore
.append(&ns, EntryImpl::new(generate_data(100)),)
.append(&ns, EntryImpl::new(generate_data(100), 1))
.await
.unwrap()
.entry_id
@@ -328,7 +328,7 @@ mod tests {
let logstore = LocalFileLogStore::open(&config).await.unwrap();
let ns = LocalNamespace::default();
let id = logstore
.append(&ns, EntryImpl::new(generate_data(100)))
.append(&ns, EntryImpl::new(generate_data(100), 0))
.await
.unwrap()
.entry_id;


@@ -51,8 +51,8 @@ impl LogStore for NoopLogStore {
todo!()
}
fn entry<D: AsRef<[u8]>>(&self, data: D) -> Self::Entry {
EntryImpl::new(data)
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id) -> Self::Entry {
EntryImpl::new(data, id)
}
fn namespace(&self, name: &str) -> Self::Namespace {


@@ -210,10 +210,6 @@ impl WriterInner {
// Data after flushed sequence need to be recovered.
flushed_sequence = version_control.current().flushed_sequence();
last_sequence = flushed_sequence;
// FIXME(yingwen): Now log store will overwrite the entry id by its internal entry id,
// which starts from 0. This is a hack to just make the test passes since we knows the
// entry id of log store is always equals to `sequence - 1`. Change this to
// `flushed_sequence + ` once the log store fixes this issue.
let mut stream = writer_ctx.wal.read_from_wal(flushed_sequence).await?;
while let Some((req_sequence, _header, request)) = stream.try_next().await? {
if let Some(request) = request {
@@ -222,9 +218,6 @@ impl WriterInner {
// Note that memtables of `Version` may be updated during replay.
let version = version_control.current();
// FIXME(yingwen): Use req_sequence instead of `req_sequence + 1` once logstore
// won't overwrite the entry id.
let req_sequence = req_sequence + 1;
if req_sequence > last_sequence {
last_sequence = req_sequence;
} else {


@@ -134,8 +134,7 @@ impl<S: LogStore> Wal<S> {
}
async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> {
let mut e = self.store.entry(bytes);
e.set_id(seq);
let e = self.store.entry(bytes, seq);
let res = self
.store
@@ -275,13 +274,14 @@ mod tests {
#[tokio::test]
pub async fn test_read_wal_only_header() -> Result<()> {
common_telemetry::init_default_ut_logging();
let (log_store, _tmp) =
test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await;
let wal = Wal::new("test_region", Arc::new(log_store));
let header = WalHeader::with_last_manifest_version(111);
let (seq_num, _) = wal.write_to_wal(3, header, Payload::None).await?;
assert_eq!(0, seq_num);
assert_eq!(3, seq_num);
let mut stream = wal.read_from_wal(seq_num).await?;
let mut data = vec![];
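The WAL change above is the visible effect of "respect entry id set outside LogStore": the caller now passes the sequence number into `entry(bytes, seq)` and the store persists that id instead of assigning its own, which is why the test's expected sequence moves from 0 to 3. A self-contained sketch of the contract, with `MockStore` and `MockEntry` as illustrative stand-ins for the real trait implementors:

// Illustrative sketch only, not part of this diff.
struct AppendResponse {
    entry_id: u64,
}

struct MockEntry {
    id: u64,
    data: Vec<u8>,
}

struct MockStore;

impl MockStore {
    // Mirrors the new `LogStore::entry(data, id)` shape.
    fn entry(&self, data: &[u8], id: u64) -> MockEntry {
        MockEntry { id, data: data.to_vec() }
    }
    fn append(&self, e: MockEntry) -> AppendResponse {
        assert!(!e.data.is_empty());
        // The store persists the caller-provided id rather than
        // overwriting it with an internal counter.
        AppendResponse { entry_id: e.id }
    }
}

fn main() {
    let store = MockStore;
    let seq = 3u64; // region sequence number doubles as the entry id
    let resp = store.append(store.entry(b"wal entry", seq));
    assert_eq!(seq, resp.entry_id); // matches the updated wal test
}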


@@ -50,7 +50,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
/// Create an entry of the associate Entry type
fn entry<D: AsRef<[u8]>>(&self, data: D) -> Self::Entry;
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id) -> Self::Entry;
/// Create a namespace of the associate Namespace type
// TODO(sunng87): confusion with `create_namespace`


@@ -17,8 +17,6 @@ pub trait Entry: Encode + Send + Sync {
/// Return file offset of entry.
fn offset(&self) -> Offset;
fn set_offset(&mut self, offset: Offset);
fn set_id(&mut self, id: Id);
/// Returns epoch of entry.


@@ -83,8 +83,6 @@ mod tests {
self.offset
}
fn set_offset(&mut self, _offset: Offset) {}
fn set_id(&mut self, _id: Id) {}
fn epoch(&self) -> Epoch {