Implement log store append and file set management (#43)

* add log store impl

* add some tests

* delete failing test

* fix: concurrent close issue

* feat: use arcswap to replace unsafe AtomicPtr

* fix: use a lock to protect the rolling procedure.
fix: use try_recv instead of poll_recv in the appender task.

* chores: 1. use the tmp dir path directly instead of creating a TempDir instance; 2. inline some short functions; 3. rename some structs; 4. wrap the namespace's inner struct in an Arc.
Author: Lei, Huang
Date: 2022-06-16 19:09:09 +08:00
Committed by: GitHub
Parent: 725a261b55
Commit: e03ac2fc2b
17 changed files with 1550 additions and 27 deletions
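The "use arcswap to replace unsafe AtomicPtr" change above amounts to keeping the active log file behind an arc_swap::ArcSwap, so appenders read it lock-free while a roll publishes a new file atomically. A minimal standalone sketch of that pattern (the ActiveFile type and file names here are illustrative, not taken from this diff):

use std::sync::Arc;
use arc_swap::ArcSwap;

// Hypothetical stand-in for the active log file.
struct ActiveFile {
    name: String,
}

fn main() {
    // Readers load the current pointer without locks; writers swap in a new Arc.
    let active = ArcSwap::new(Arc::new(ActiveFile { name: "0.log".into() }));

    // A reader takes a cheap snapshot of whatever is active right now.
    let snapshot = active.load_full();
    println!("appending to {}", snapshot.name);

    // Rolling replaces the pointer atomically; existing snapshots stay valid
    // until their holders drop them, so no reader observes a torn pointer.
    let sealed = active.swap(Arc::new(ActiveFile { name: "1.log".into() }));
    println!("sealed {}", sealed.name);
}

In the diff itself, LocalFileLogStore does the same with ArcSwap<LogFile>: active_file() is load().clone() and roll_next() ends with swap().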


@@ -2,7 +2,26 @@
name = "log-store"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arc-swap = "1.5"
async-stream = "0.3"
async-trait = "0.1"
base64 = "0.13"
byteorder = "1.4"
bytes = "1.1"
common-error = { path = "../common/error" }
common-telemetry = { path = "../common/telemetry" }
crc = "3.0"
futures = "0.3"
futures-util = "0.3"
hex = "0.4"
memmap2 = "0.5"
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
tempdir = "0.3"
tokio = { version = "1.18", features = ["full"] }
[dev-dependencies]
rand = "0.8.5"


@@ -0,0 +1,57 @@
use std::any::Any;
use common_error::prelude::{ErrorExt, Snafu};
use snafu::{Backtrace, ErrorCompat};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to deserialize entry"))]
Deserialization { backtrace: Backtrace },
#[snafu(display("Entry corrupted, msg: {}", msg))]
Corrupted { msg: String, backtrace: Backtrace },
#[snafu(display("IO error, source: {}", source))]
Io {
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to open log file {}, source: {}", file_name, source))]
OpenLog {
file_name: String,
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("File name {} illegal", file_name))]
FileNameIllegal {
file_name: String,
backtrace: Backtrace,
},
#[snafu(display("Internal error, msg: {}", msg))]
Internal { msg: String, backtrace: Backtrace },
#[snafu(display("End of LogFile"))]
Eof,
#[snafu(display("File duplicate on start: {}", msg))]
DuplicateFile { msg: String },
#[snafu(display("Log file suffix is illegal: {}", suffix))]
SuffixIllegal { suffix: String },
}
impl ErrorExt for Error {
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
pub type Result<T> = std::result::Result<T, Error>;

src/log-store/src/fs.rs

@@ -0,0 +1,29 @@
use store_api::logstore::entry::{Id, Offset};
use store_api::logstore::AppendResponse;
mod config;
mod crc;
mod entry;
mod file;
mod file_name;
mod index;
mod log;
mod namespace;
#[derive(Debug, PartialEq, Eq)]
pub struct AppendResponseImpl {
entry_id: Id,
offset: Offset,
}
impl AppendResponse for AppendResponseImpl {
#[inline]
fn entry_id(&self) -> Id {
self.entry_id
}
#[inline]
fn offset(&self) -> Offset {
self.offset
}
}


@@ -0,0 +1,34 @@
#[derive(Debug, Clone)]
pub struct LogConfig {
pub append_buffer_size: usize,
pub max_log_file_size: usize,
pub log_file_dir: String,
}
impl Default for LogConfig {
/// The default config stores log files in a tmp directory and should only be used
/// in tests.
fn default() -> Self {
Self {
append_buffer_size: 128,
max_log_file_size: 1024 * 1024 * 1024,
log_file_dir: "/tmp/greptimedb".to_string(),
}
}
}
#[cfg(test)]
mod tests {
use common_telemetry::info;
use super::*;
#[test]
pub fn test_default_config() {
common_telemetry::logging::init_default_ut_logging();
let default = LogConfig::default();
info!("LogConfig::default(): {:?}", default);
assert_eq!(1024 * 1024 * 1024, default.max_log_file_size);
assert_eq!(128, default.append_buffer_size);
}
}
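Since the default directory is only meant for tests, a caller inside the crate would typically override just that field with struct-update syntax; a small illustrative snippet (the directory path is an assumption, not taken from this diff):

// Override only the log directory; other fields keep their defaults.
let config = LogConfig {
    log_file_dir: "/data/greptimedb/wal".to_string(),
    ..LogConfig::default()
};
assert_eq!(128, config.append_buffer_size);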


@@ -0,0 +1,3 @@
use crc::{Crc, CRC_32_ISCSI};
pub const CRC_ALGO: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);
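Entries checksummed with CRC_ALGO put the CRC in the last 4 bytes, covering everything before it (see the entry format below). A minimal standalone verifier sketch using the same crc crate API, assuming that framing:

use crc::{Crc, CRC_32_ISCSI};

/// Returns true if the trailing 4 little-endian bytes equal the CRC of the rest.
fn crc_matches(frame: &[u8]) -> bool {
    if frame.len() < 4 {
        return false;
    }
    let (payload, trailer) = frame.split_at(frame.len() - 4);
    let stored = u32::from_le_bytes(trailer.try_into().unwrap());
    Crc::<u32>::new(&CRC_32_ISCSI).checksum(payload) == stored
}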


@@ -0,0 +1,207 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use byteorder::{ByteOrder, LittleEndian};
use futures::Stream;
use snafu::{ensure, Backtrace, GenerateImplicitData};
use store_api::logstore::entry::{Entry, Epoch, Id, Offset};
use store_api::logstore::entry_stream::{EntryStream, SendableEntryStream};
use crate::error::{DeserializationSnafu, Error};
use crate::fs::crc;
// length + id + epoch + crc
const ENTRY_MIN_LEN: usize = 4 + 8 + 8 + 4;
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
pub data: Vec<u8>,
pub offset: Offset,
pub id: Id,
pub epoch: Epoch,
}
impl Entry for EntryImpl {
type Error = Error;
fn data(&self) -> &[u8] {
&self.data
}
fn id(&self) -> Id {
self.id
}
fn offset(&self) -> Offset {
self.offset
}
fn set_offset(&mut self, offset: Offset) {
self.offset = offset;
}
fn set_id(&mut self, id: Id) {
self.id = id;
}
fn epoch(&self) -> Epoch {
self.epoch
}
fn len(&self) -> usize {
ENTRY_MIN_LEN + self.data.len()
}
fn is_empty(&self) -> bool {
self.data.is_empty()
}
fn serialize(&self) -> Vec<u8> {
let res: Vec<u8> = self.into();
res
}
fn deserialize(b: impl AsRef<[u8]>) -> Result<Self, Self::Error> {
EntryImpl::try_from(b.as_ref())
}
}
impl EntryImpl {
pub fn new(data: impl AsRef<[u8]>) -> Self {
let data = Vec::from(data.as_ref());
Self {
id: 0,
data,
offset: 0,
epoch: 0,
}
}
}
/// Entry binary format (Little endian):
///
/// +--------+--------+--------+--------+--------+
/// |entry id| epoch  | length |  data  |  CRC   |
/// +--------+--------+--------+--------+--------+
/// | 8 bytes| 8 bytes| 4 bytes|<length>| 4 bytes|
/// +--------+--------+--------+--------+--------+
///
impl TryFrom<&[u8]> for EntryImpl {
type Error = Error;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
ensure!(value.len() >= ENTRY_MIN_LEN, DeserializationSnafu);
// TODO(hl): will use byteorder to simplify encoding/decoding.
let id_end_ofs = 8;
let epoch_end_ofs = id_end_ofs + 8;
let length_end_offset = epoch_end_ofs + 4;
let length = LittleEndian::read_u32(&value[epoch_end_ofs..length_end_offset]);
let data_end_ofs = length_end_offset + length as usize;
let crc_end_ofs = data_end_ofs + 4;
let data = Vec::from(&value[length_end_offset..data_end_ofs]);
let id = LittleEndian::read_u64(&value[0..id_end_ofs]);
let epoch = LittleEndian::read_u64(&value[id_end_ofs..epoch_end_ofs]);
let crc_read = LittleEndian::read_u32(&value[data_end_ofs..crc_end_ofs]);
// TODO(hl): add a config option to turn off CRC checksum.
let crc_calc = crc::CRC_ALGO.checksum(&value[0..data_end_ofs]);
if crc_calc != crc_read {
return Err(Error::Corrupted {
msg: format!("CRC mismatch, read: {}, calc: {}", crc_read, crc_calc),
backtrace: Backtrace::generate(),
});
}
Ok(Self {
data,
offset: 0usize,
id,
epoch,
})
}
}
impl From<&EntryImpl> for Vec<u8> {
fn from(e: &EntryImpl) -> Self {
let data_length = e.data.len();
let total_size = data_length + ENTRY_MIN_LEN;
let mut vec = vec![0u8; total_size];
let buf = vec.as_mut_slice();
let id_end_ofs = 8;
let epoch_end_ofs = id_end_ofs + 8;
let length_end_offset = epoch_end_ofs + 4;
let data_end_ofs = length_end_offset + data_length as usize;
let crc_end_ofs = data_end_ofs + 4;
LittleEndian::write_u64(buf, e.id);
LittleEndian::write_u64(&mut buf[id_end_ofs..epoch_end_ofs], e.epoch);
LittleEndian::write_u32(
&mut buf[epoch_end_ofs..length_end_offset],
data_length as u32,
); // todo check this cast
buf[length_end_offset..data_end_ofs].copy_from_slice(e.data.as_slice());
let checksum = crc::CRC_ALGO.checksum(&buf[0..data_end_ofs]);
LittleEndian::write_u32(&mut buf[data_end_ofs..crc_end_ofs], checksum);
vec
}
}
pub struct StreamImpl<'a> {
pub inner: SendableEntryStream<'a, EntryImpl, Error>,
pub start_entry_id: Id,
}
impl<'a> Stream for StreamImpl<'a> {
type Item = Result<Vec<EntryImpl>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
impl<'a> EntryStream for StreamImpl<'a> {
type Error = Error;
type Entry = EntryImpl;
fn start_id(&self) -> u64 {
self.start_entry_id
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::fs::crc::CRC_ALGO;
#[test]
pub fn test_entry_deser() {
let data = "hello, world";
let entry = EntryImpl::new(data.as_bytes());
let vec: Vec<u8> = (&entry).into();
assert_eq!(ENTRY_MIN_LEN + data.as_bytes().len(), vec.len());
let deserialized = EntryImpl::try_from(vec.as_slice()).unwrap();
assert_eq!(entry, deserialized);
}
#[test]
pub fn test_rewrite_entry_id() {
let data = "hello, world";
let mut entry = EntryImpl::new(data.as_bytes());
let mut vec: Vec<u8> = (&entry).into();
entry.set_id(123);
assert_eq!(123, entry.id());
// rewrite entry id.
LittleEndian::write_u64(&mut vec[0..8], 333);
let len = vec.len();
let checksum = CRC_ALGO.checksum(&vec[0..len - 4]);
LittleEndian::write_u32(&mut vec[len - 4..], checksum);
let entry_impl = EntryImpl::deserialize(&vec).expect("Failed to deserialize");
assert_eq!(333, entry_impl.id());
}
}


@@ -0,0 +1,519 @@
use std::fmt::{Debug, Formatter};
use std::io::SeekFrom;
use std::sync::atomic::Ordering::Acquire;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use async_stream::stream;
use byteorder::ByteOrder;
use byteorder::LittleEndian;
use common_telemetry::logging::{error, info};
use common_telemetry::warn;
use futures_util::StreamExt;
use memmap2::{Mmap, MmapOptions};
use snafu::{Backtrace, GenerateImplicitData, ResultExt};
use store_api::logstore::entry::{Entry, Id, Offset};
use store_api::logstore::entry_stream::EntryStream;
use store_api::logstore::namespace::Namespace;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver as MpscReceiver;
use tokio::sync::mpsc::Sender as MpscSender;
use tokio::sync::oneshot::Sender as OneshotSender;
use tokio::sync::{oneshot, Notify, RwLock};
use tokio::task::JoinHandle;
use tokio::time;
use crate::error::{Error, IoSnafu, OpenLogSnafu, Result};
use crate::fs::config::LogConfig;
use crate::fs::crc::CRC_ALGO;
use crate::fs::entry::{EntryImpl, StreamImpl};
use crate::fs::file_name::FileName;
use crate::fs::namespace::LocalNamespace;
use crate::fs::AppendResponseImpl;
const LOG_WRITER_BATCH_SIZE: usize = 16;
// TODO(hl): use pwrite polyfill in different platforms, avoid write syscall in each append request.
pub struct LogFile {
name: FileName,
file: Arc<RwLock<File>>,
write_offset: Arc<AtomicUsize>,
flush_offset: Arc<AtomicUsize>,
next_entry_id: Arc<AtomicU64>,
start_entry_id: u64,
pending_request_rx: Option<MpscReceiver<AppendRequest>>,
pending_request_tx: MpscSender<AppendRequest>,
notify: Arc<Notify>,
max_file_size: usize,
join_handle: Mutex<Option<JoinHandle<Result<()>>>>,
sealed: Arc<AtomicBool>,
stopped: Arc<AtomicBool>,
}
impl Debug for LogFile {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LogFile")
.field("name", &self.name)
.field("start_entry_id", &self.start_entry_id)
.field("write_offset", &self.write_offset.load(Ordering::Relaxed))
.field("flush_offset", &self.flush_offset.load(Ordering::Relaxed))
.field("next_entry_id", &self.next_entry_id.load(Ordering::Relaxed))
.field("max_file_size", &self.max_file_size)
.field("sealed", &self.sealed.load(Ordering::Relaxed))
.finish()
}
}
impl LogFile {
/// Opens a file in path with given log config.
pub async fn open(path: impl Into<String>, config: &LogConfig) -> Result<Self> {
let path = path.into();
let file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(path.clone())
.await
.context(OpenLogSnafu { file_name: &path })?;
let file_name: FileName = path.as_str().try_into()?;
let start_entry_id = file_name.entry_id();
let (tx, rx) = tokio::sync::mpsc::channel(config.append_buffer_size);
let mut log = Self {
name: file_name,
file: Arc::new(RwLock::new(file)),
write_offset: Arc::new(AtomicUsize::new(0)),
flush_offset: Arc::new(AtomicUsize::new(0)),
next_entry_id: Arc::new(AtomicU64::new(start_entry_id)),
start_entry_id,
pending_request_tx: tx,
pending_request_rx: Some(rx),
notify: Arc::new(Notify::new()),
max_file_size: config.max_log_file_size,
join_handle: Mutex::new(None),
sealed: Arc::new(AtomicBool::new(false)),
stopped: Arc::new(AtomicBool::new(false)),
};
let metadata = log.file.read().await.metadata().await.context(IoSnafu)?;
let expect_length = metadata.len() as usize;
log.write_offset.store(expect_length, Ordering::Relaxed);
log.flush_offset.store(expect_length, Ordering::Relaxed);
let replay_start_time = time::Instant::now();
let (actual_offset, next_entry_id) = log.replay().await?;
info!(
"Log file {} replay finished, last offset: {}, file start entry id: {}, next entry id: {}, elapsed time: {}ms",
path, actual_offset, start_entry_id, next_entry_id,
time::Instant::now().duration_since(replay_start_time).as_millis()
);
log.write_offset.store(actual_offset, Ordering::Relaxed);
log.flush_offset.store(actual_offset, Ordering::Relaxed);
log.next_entry_id.store(next_entry_id, Ordering::Relaxed);
log.seek(actual_offset).await?;
Ok(log)
}
/// Advances file cursor to given offset.
async fn seek(&mut self, offset: usize) -> Result<u64> {
self.file
.write()
.await
.seek(SeekFrom::Start(offset as u64))
.await
.context(IoSnafu)
}
/// Creates a file mmap region.
async fn map(&self, start: u64, length: usize) -> Result<Mmap> {
unsafe {
let file = self.file.read().await.try_clone().await.context(IoSnafu)?;
MmapOptions::new()
.offset(start)
.len(length)
.map(&file)
.context(IoSnafu)
}
}
/// Returns the persisted size of current log file.
#[allow(unused)]
#[inline]
pub fn persisted_size(&self) -> usize {
self.flush_offset.load(Ordering::Relaxed)
}
#[inline]
pub fn next_entry_id(&self) -> Id {
self.next_entry_id.load(Ordering::Relaxed)
}
/// Increases the write offset field by `delta` and returns the previous value.
#[inline]
fn inc_offset(&self, delta: usize) -> usize {
// Relaxed order is enough since no sync-with relationship
// between `offset` and any other field.
self.write_offset.fetch_add(delta, Ordering::Relaxed)
}
/// Increases the next entry id by 1 and returns the previous value.
fn inc_entry_id(&self) -> u64 {
// Relaxed order is enough since no sync-with relationship
// between `next_entry_id` and any other field.
self.next_entry_id.fetch_add(1, Ordering::Relaxed)
}
/// Starts the log file and its internal components (including the flush task, etc.).
pub async fn start(&mut self) -> Result<()> {
let notify = self.notify.clone();
let file = self.file.write().await.try_clone().await.context(IoSnafu)?;
let write_offset = self.write_offset.clone();
let flush_offset = self.flush_offset.clone();
let stopped = self.stopped.clone();
if let Some(mut rx) = self.pending_request_rx.take() {
let handle = tokio::spawn(async move {
let mut batch: Vec<AppendRequest> = Vec::with_capacity(LOG_WRITER_BATCH_SIZE);
let mut error_occurred = false;
while !stopped.load(Ordering::Acquire) {
for _ in 0..LOG_WRITER_BATCH_SIZE {
match rx.try_recv() {
Ok(req) => {
batch.push(req);
}
Err(e) => match e {
TryRecvError::Empty => {
if batch.is_empty() {
notify.notified().await;
if stopped.load(Ordering::Acquire) {
break;
}
} else {
break;
}
}
TryRecvError::Disconnected => {
info!("Channel disconnected...");
error_occurred = true;
break;
}
},
}
}
// flush all pending data to disk
let write_offset_read = write_offset.load(Ordering::Relaxed);
// TODO(hl): add flush metrics
if let Err(flush_err) = file.sync_all().await {
error!("Failed to flush log file: {}", flush_err);
error_occurred = true;
}
if error_occurred {
info!("Flush task stop");
break;
}
flush_offset.store(write_offset_read, Ordering::Relaxed);
while let Some(req) = batch.pop() {
req.complete();
}
}
// drain all pending request on stopping.
let write_offset_read = write_offset.load(Ordering::Relaxed);
if file.sync_all().await.is_ok() {
flush_offset.store(write_offset_read, Ordering::Release);
while let Ok(req) = rx.try_recv() {
req.complete()
}
}
Ok(())
});
*self.join_handle.lock().unwrap() = Some(handle);
info!("Flush task started...");
}
Ok(())
}
/// Stops the log file.
/// # Panics
/// Panics if the log file is stopped without ever having been started.
pub async fn stop(&self) -> Result<()> {
self.stopped.store(true, Ordering::Release);
let join_handle = self
.join_handle
.lock()
.unwrap()
.take()
.expect("Join handle should present");
self.notify.notify_waiters();
let res = join_handle.await.unwrap();
info!("LogFile task finished: {:?}", res);
res
}
#[inline]
pub fn start_entry_id(&self) -> Id {
self.start_entry_id
}
/// Replays the current file up to the last entry that can be read.
pub async fn replay(&mut self) -> Result<(usize, Id)> {
let log_name = self.name.to_string();
let previous_offset = self.flush_offset.load(Ordering::Relaxed);
let mut stream = self.create_stream(
// TODO(hl): LocalNamespace should be filled
LocalNamespace::default(),
0,
);
let mut last_offset = 0usize;
let mut last_entry_id: Option<Id> = None;
while let Some(res) = stream.next().await {
match res {
Ok(entries) => {
for e in entries {
last_offset += e.len();
last_entry_id = Some(e.id());
}
}
Err(e) => {
error!("Error while replay log {} {:?}", log_name, e);
break;
}
}
}
info!(
"Replay log {} finished, offset: {} -> {}",
log_name, previous_offset, last_offset
);
Ok((
last_offset,
match last_entry_id {
None => self.start_entry_id,
Some(v) => v + 1,
},
))
}
/// Creates a reader stream that asynchronously generates entries starting from the given entry id.
/// ### Notice
/// If the entry with the start entry id is not present, the stream starts from the first
/// entry with an id greater than `start_entry_id`.
pub fn create_stream(&self, _ns: impl Namespace, start_entry_id: u64) -> impl EntryStream + '_ {
let s = stream!({
let length = self.flush_offset.load(Ordering::Relaxed);
let mmap = self.map(0, length).await?;
let mut buf: &[u8] = mmap.as_ref();
if buf.len() == 0 {
info!("File is just created!");
// file is newly created
return;
}
loop {
let entry = EntryImpl::try_from(buf)?;
let entry_length = entry.len();
if entry.id() >= start_entry_id {
yield Ok(vec![entry]);
}
if buf.len() > entry_length {
buf = &buf[entry_length..];
} else {
break;
}
}
});
StreamImpl {
inner: Box::pin(s),
start_entry_id,
}
}
/// Appends an entry to the `LogFile` and returns a `Result` containing the id of the appended entry.
pub async fn append<T: Entry>(&self, e: &mut T) -> Result<AppendResponseImpl> {
if self.stopped.load(Ordering::Acquire) {
return Err(Error::Eof);
}
e.set_id(0);
let mut serialized = e.serialize();
let size = serialized.len();
if size + self.write_offset.load(Ordering::Relaxed) > self.max_file_size {
return Err(Error::Eof);
}
let entry_offset;
let entry_id;
{
let mut file = self.file.write().await;
// generate entry id
entry_id = self.inc_entry_id();
// generate in-file offset
entry_offset = self.inc_offset(size);
// rewrite encoded data
LittleEndian::write_u64(&mut serialized[0..8], entry_id);
// TODO(hl): CRC was calculated twice
let checksum = CRC_ALGO.checksum(&serialized[0..size - 4]);
LittleEndian::write_u32(&mut serialized[size - 4..], checksum);
// write to file
// TODO(hl): use io buffer and pwrite to reduce syscalls.
file.write(serialized.as_slice()).await.context(IoSnafu)?;
}
let (tx, rx) = oneshot::channel();
if self
.pending_request_tx
.send(AppendRequest {
tx,
offset: entry_offset,
id: entry_id,
})
.await
.is_err()
{
self.file.write().await.sync_all().await.context(IoSnafu)?;
Ok(AppendResponseImpl {
offset: entry_offset,
entry_id,
})
} else {
self.notify.notify_one(); // notify flush thread.
rx.await.map_err(|e| {
warn!(
"Error while waiting for append result:{}, file {}",
e,
self.name.to_string()
);
Error::Internal {
msg: "Sender already dropped".to_string(),
backtrace: Backtrace::generate(),
}
})
}
}
#[inline]
pub fn try_seal(&self) -> bool {
self.sealed
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
#[inline]
pub fn is_seal(&self) -> bool {
self.sealed.load(Acquire)
}
#[inline]
pub fn unseal(&self) {
self.sealed.store(false, Ordering::Release);
}
#[inline]
pub fn file_name(&self) -> String {
self.name.to_string()
}
}
impl ToString for LogFile {
fn to_string(&self) -> String {
format!("LogFile{{ name: {}, write_offset: {}, flush_offset: {}, start_entry_id: {}, entry_id_counter: {} }}",
self.name, self.write_offset.load(Ordering::Relaxed), self.flush_offset.load(Ordering::Relaxed), self.start_entry_id, self.next_entry_id.load(Ordering::Relaxed))
}
}
pub type LogFileRef = Arc<LogFile>;
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) struct AppendRequest {
tx: OneshotSender<AppendResponseImpl>,
offset: Offset,
id: Id,
}
impl AppendRequest {
pub fn complete(self) {
// TODO(hl): use this result.
let _ = self.tx.send(AppendResponseImpl {
offset: self.offset,
entry_id: self.id,
});
}
}
#[cfg(test)]
mod tests {
use common_telemetry::logging;
use futures_util::StreamExt;
use tempdir::TempDir;
use super::*;
use crate::fs::namespace::LocalNamespace;
pub async fn create_temp_dir(file_name: impl AsRef<str>) -> (String, TempDir) {
let dir = TempDir::new("greptimedb-store-test").unwrap();
let path_buf = dir.path().join(file_name.as_ref());
let path_str = path_buf.to_str().unwrap().to_string();
File::create(path_str.as_str()).await.unwrap();
(path_str, dir)
}
#[tokio::test]
pub async fn test_create_entry_stream() {
logging::init_default_ut_logging();
let config = LogConfig::default();
let (path, _dir) = create_temp_dir("0010.log").await;
let mut file = LogFile::open(path.clone(), &config)
.await
.unwrap_or_else(|_| panic!("Failed to open file: {}", path));
file.start().await.expect("Failed to start log file");
assert_eq!(
10,
file.append(&mut EntryImpl::new("test1".as_bytes()))
.await
.expect("Failed to append entry 1")
.entry_id
);
assert_eq!(
11,
file.append(&mut EntryImpl::new("test-2".as_bytes()))
.await
.expect("Failed to append entry 2")
.entry_id
);
let mut stream = file.create_stream(LocalNamespace::default(), 0);
let mut data = vec![];
while let Some(v) = stream.next().await {
let entries = v.unwrap();
let content = entries[0].data();
let vec = content.to_vec();
data.push(String::from_utf8(vec).unwrap());
}
assert_eq!(vec!["test1".to_string(), "test-2".to_string()], data);
drop(stream);
let result = file.stop().await;
info!("Stop file res: {:?}", result);
}
}
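The flush task above is what the "use try_recv to replace poll_recv on appender task" bullet in the commit message refers to: appenders enqueue an AppendRequest carrying a oneshot sender, wake the task through Notify, and the task drains the channel with try_recv so a single fsync can acknowledge a whole batch. A condensed, runnable sketch of that hand-off (the Req type and channel sizes are illustrative, not the crate's API):

use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Notify};

// Illustrative stand-in for AppendRequest: carries a oneshot sender for the ack.
struct Req {
    done: oneshot::Sender<u64>,
    id: u64,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Req>(128);
    let notify = Arc::new(Notify::new());
    let notify_task = notify.clone();

    let flusher = tokio::spawn(async move {
        loop {
            // Drain whatever is already queued without blocking per request.
            let mut batch = Vec::new();
            while let Ok(req) = rx.try_recv() {
                batch.push(req);
            }
            if batch.is_empty() {
                // Nothing pending: park until an appender pokes us.
                notify_task.notified().await;
                continue;
            }
            // A single fsync would go here; then acknowledge the whole batch.
            for req in batch {
                let _ = req.done.send(req.id);
            }
        }
    });

    // An appender enqueues a request, wakes the flusher, and awaits the ack.
    let (done_tx, done_rx) = oneshot::channel();
    tx.send(Req { done: done_tx, id: 7 }).await.unwrap();
    notify.notify_one();
    assert_eq!(7, done_rx.await.unwrap());

    flusher.abort();
}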


@@ -0,0 +1,111 @@
use std::fmt::{Display, Formatter};
use std::path::Path;
use snafu::OptionExt;
use store_api::logstore::entry::Id;
use crate::error::{Error, FileNameIllegalSnafu, SuffixIllegalSnafu};
use crate::fs::file_name::FileName::Log;
/// FileName represents a log file name whose entry id is padded with leading zeros.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum FileName {
/// File name with `.log` as suffix.
Log(Id),
}
impl TryFrom<&str> for FileName {
type Error = Error;
fn try_from(p: &str) -> Result<Self, Self::Error> {
let path = Path::new(p);
let extension =
path.extension()
.and_then(|s| s.to_str())
.with_context(|| FileNameIllegalSnafu {
file_name: path.to_string_lossy(),
})?;
let id: u64 = path
.file_stem()
.and_then(|s| s.to_str())
.and_then(|s| s.parse::<u64>().ok())
.with_context(|| FileNameIllegalSnafu {
file_name: p.to_string(),
})?;
Self::new_with_suffix(id, extension)
}
}
impl From<u64> for FileName {
fn from(entry_id: u64) -> Self {
Self::log(entry_id)
}
}
impl FileName {
pub fn log(entry_id: Id) -> Self {
Log(entry_id)
}
pub fn new_with_suffix(entry_id: Id, suffix: &str) -> Result<Self, Error> {
match suffix {
"log" => Ok(Log(entry_id)),
_ => SuffixIllegalSnafu { suffix }.fail(),
}
}
pub fn entry_id(&self) -> Id {
match self {
Log(id) => *id,
}
}
fn suffix(&self) -> &str {
match self {
Log(_) => ".log",
}
}
}
impl Display for FileName {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:020}{}", self.entry_id(), self.suffix())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_padding_file_name() {
let id = u64::MIN;
assert_eq!("00000000000000000000", format!("{:020}", id));
let id = 123u64;
assert_eq!("00000000000000000123", format!("{:020}", id));
let id = 123123123123u64;
assert_eq!("00000000123123123123", format!("{:020}", id));
let id = u64::MAX;
assert_eq!(u64::MAX.to_string(), format!("{:020}", id));
}
#[test]
pub fn test_file_name_to_string() {
assert_eq!("00000000000000000000.log", FileName::log(0).to_string());
assert_eq!(
u64::MAX.to_string() + ".log",
FileName::log(u64::MAX).to_string()
);
}
#[test]
pub fn test_parse_file_name() {
let path = "/path/to/any/01010010000.log";
let parsed = FileName::try_from(path).expect("Failed to parse file name");
assert_eq!(1010010000u64, parsed.entry_id());
assert_eq!(".log", parsed.suffix());
}
}


@@ -0,0 +1,68 @@
use std::collections::BTreeMap;
use std::sync::RwLock;
use store_api::logstore::entry::{Id, Offset};
use crate::error::Result;
use crate::fs::file_name::FileName;
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct Location {
pub file_name: FileName,
pub offset: Offset,
}
#[allow(dead_code)]
impl Location {
pub fn new(file_name: FileName, offset: Offset) -> Self {
Self { file_name, offset }
}
}
/// In-memory entry id to offset index.
pub trait EntryIndex {
/// Add entry id to offset mapping.
fn add_entry_id(&self, id: Id, loc: Location) -> Option<Location>;
/// Find offset by entry id.
fn find_offset_by_id(&self, id: Id) -> Result<Option<Location>>;
}
pub struct MemoryIndex {
map: RwLock<BTreeMap<Id, Location>>,
}
#[allow(dead_code)]
impl MemoryIndex {
pub fn new() -> Self {
Self {
map: RwLock::new(BTreeMap::new()),
}
}
}
impl EntryIndex for MemoryIndex {
fn add_entry_id(&self, id: Id, loc: Location) -> Option<Location> {
self.map.write().unwrap().insert(id, loc)
}
fn find_offset_by_id(&self, id: Id) -> Result<Option<Location>> {
Ok(self.map.read().unwrap().get(&id).cloned())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_entry() {
let index = MemoryIndex::new();
index.add_entry_id(1, Location::new(FileName::log(0), 1));
assert_eq!(
Location::new(FileName::log(0), 1),
index.find_offset_by_id(1).unwrap().unwrap()
);
assert_eq!(None, index.find_offset_by_id(2).unwrap());
}
}

src/log-store/src/fs/log.rs

@@ -0,0 +1,309 @@
use std::collections::BTreeMap;
use std::path::Path;
use std::sync::Arc;
use arc_swap::ArcSwap;
use common_telemetry::{error, info, warn};
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::Id;
use store_api::logstore::LogStore;
use tokio::sync::RwLock;
use crate::error::{
DuplicateFileSnafu, Error, FileNameIllegalSnafu, InternalSnafu, IoSnafu, Result,
};
use crate::fs::config::LogConfig;
use crate::fs::entry::EntryImpl;
use crate::fs::file::{LogFile, LogFileRef};
use crate::fs::file_name::FileName;
use crate::fs::namespace::LocalNamespace;
use crate::fs::AppendResponseImpl;
type FileMap = BTreeMap<u64, LogFileRef>;
pub struct LocalFileLogStore {
files: RwLock<FileMap>,
active: ArcSwap<LogFile>,
config: LogConfig,
}
impl LocalFileLogStore {
/// Opens a directory as the log store directory, initializing it if it is empty.
#[allow(unused)]
pub async fn open(config: &LogConfig) -> Result<Self> {
let mut files = Self::load_dir(&config.log_file_dir, config).await?;
if files.is_empty() {
Self::init_on_empty(&mut files, config).await?;
info!("Initialized log store directory: {}", config.log_file_dir)
}
let id = *files.keys().max().context(InternalSnafu {
msg: format!(
"log store directory is empty after initialization: {}",
config.log_file_dir
),
})?;
info!(
"Successfully loaded log store directory, files: {:?}",
files
);
let active_file = files
.get_mut(&id)
.expect("Not expected to fail when initing log store");
active_file.unseal();
let active_file_name = active_file.to_string();
info!("Log store active log file: {}", active_file_name);
// Start active log file
Arc::get_mut(active_file)
.with_context(|| InternalSnafu {
msg: format!(
"Concurrent modification on log store {} start is not allowed",
active_file_name
),
})?
.start()
.await?;
info!(
"Successfully started current active file: {}",
active_file_name
);
let active_file_cloned = active_file.clone();
Ok(Self {
files: RwLock::new(files),
active: ArcSwap::new(active_file_cloned),
config: config.clone(),
})
}
pub async fn init_on_empty(files: &mut FileMap, config: &LogConfig) -> Result<()> {
let path = Path::new(&config.log_file_dir).join(FileName::log(0).to_string());
let file_path = path.to_str().context(FileNameIllegalSnafu {
file_name: config.log_file_dir.clone(),
})?;
let file = LogFile::open(file_path, config).await?;
files.insert(0, Arc::new(file));
Ok(())
}
pub async fn load_dir(path: impl AsRef<str>, config: &LogConfig) -> Result<FileMap> {
let mut map = FileMap::new();
let mut dir = tokio::fs::read_dir(Path::new(path.as_ref()))
.await
.context(IoSnafu)?;
while let Some(f) = dir.next_entry().await.context(IoSnafu)? {
let path_buf = f.path();
let path = path_buf.to_str().context(FileNameIllegalSnafu {
file_name: path.as_ref().to_string(),
})?;
let file_name = FileName::try_from(path)?;
let start_id = file_name.entry_id();
let file = LogFile::open(path, config).await?;
info!("Load log store file {}: {:?}", start_id, file);
if map.contains_key(&start_id) {
error!("Log file with start entry id: {} already exists", start_id);
return DuplicateFileSnafu {
msg: format!("File with start id: {} duplicates on start", start_id),
}
.fail();
}
file.try_seal();
map.insert(start_id, Arc::new(file));
}
Ok(map)
}
/// Marks the current active file as sealed and creates a new log file for writing.
async fn roll_next(&self, active: LogFileRef) -> Result<()> {
// acquires lock
let mut files = self.files.write().await;
// if active is already sealed, then just return.
if active.is_seal() {
return Ok(());
}
// create and start a new log file
let entry_id = active.next_entry_id();
let path_buf =
Path::new(&self.config.log_file_dir).join(FileName::log(entry_id).to_string());
let path = path_buf.to_str().context(FileNameIllegalSnafu {
file_name: self.config.log_file_dir.clone(),
})?;
let mut new_file = LogFile::open(path, &self.config).await?;
new_file.start().await?;
let new_file = Arc::new(new_file);
files.insert(new_file.start_entry_id(), new_file.clone());
self.active.swap(new_file);
active.try_seal();
tokio::spawn(async move {
active.stop().await.unwrap();
info!("Sealed log file {} stopped.", active.file_name());
});
Ok(()) // release lock
}
pub fn active_file(&self) -> Arc<LogFile> {
self.active.load().clone()
}
}
#[async_trait::async_trait]
impl LogStore for LocalFileLogStore {
type Error = Error;
type Namespace = LocalNamespace;
type Entry = EntryImpl;
type AppendResponse = AppendResponseImpl;
async fn append(
&self,
_ns: Self::Namespace,
mut e: Self::Entry,
) -> Result<Self::AppendResponse> {
// TODO(hl): configurable retry times
for _ in 0..3 {
let current_active_file = self.active_file();
match current_active_file.append(&mut e).await {
Ok(r) => return Ok(r),
Err(e) => match e {
Error::Eof => {
self.roll_next(current_active_file.clone()).await?;
info!("Rolled to next file, retry append");
continue;
}
Error::Internal { .. } => {
warn!("File closed, try new file");
continue;
}
_ => {
error!("Failed to roll to next log file, error:{}", e);
return Err(e);
}
},
}
}
return InternalSnafu {
msg: "Failed to append entry with max retry time exceeds".to_string(),
}
.fail();
}
async fn append_batch(&self, _ns: Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
todo!()
}
async fn read(
&self,
_ns: Self::Namespace,
_id: Id,
) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
{
todo!()
}
async fn create_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
todo!()
}
async fn delete_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
todo!()
}
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
todo!()
}
}
#[cfg(test)]
mod tests {
use futures_util::StreamExt;
use rand::{distributions::Alphanumeric, Rng};
use store_api::logstore::entry::Entry;
use tempdir::TempDir;
use super::*;
#[tokio::test]
pub async fn test_roll_file() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 128,
log_file_dir: dir.path().to_str().unwrap().to_string(),
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
assert_eq!(
0,
logstore
.append(
LocalNamespace::default(),
EntryImpl::new(generate_data(100)),
)
.await
.unwrap()
.entry_id
);
assert_eq!(
1,
logstore
.append(
LocalNamespace::default(),
EntryImpl::new(generate_data(100)),
)
.await
.unwrap()
.entry_id
);
}
fn generate_data(size: usize) -> Vec<u8> {
let s: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(size)
.map(char::from)
.collect();
s.into_bytes()
}
#[tokio::test]
pub async fn test_write_and_read_data() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 128,
log_file_dir: dir.path().to_str().unwrap().to_string(),
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
let id = logstore
.append(
LocalNamespace::default(),
EntryImpl::new(generate_data(100)),
)
.await
.unwrap()
.entry_id;
assert_eq!(0, id);
let active_file = logstore.active_file();
let stream = active_file.create_stream(LocalNamespace::default(), 0);
tokio::pin!(stream);
let entries = stream.next().await.unwrap().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id(), 0);
}
}
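The rolling path above is what the "use lock to protect rolling procedure" commit bullet refers to: roll_next takes the files write lock, re-checks the sealed flag so concurrent appenders that all hit Eof only roll once, then publishes the new file through the ArcSwap. A stripped-down sketch of that double-check shape (FileStub and the field names are illustrative, not the crate's types):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use arc_swap::ArcSwap;
use tokio::sync::Mutex;

struct FileStub {
    sealed: AtomicBool,
}

struct Store {
    roll_lock: Mutex<()>,      // serializes rollers
    active: ArcSwap<FileStub>, // lock-free view for appenders
}

impl Store {
    async fn roll_next(&self, full: Arc<FileStub>) {
        let _guard = self.roll_lock.lock().await;
        if full.sealed.load(Ordering::Acquire) {
            // Another appender already rolled past this file; nothing to do.
            return;
        }
        let fresh = Arc::new(FileStub {
            sealed: AtomicBool::new(false),
        });
        self.active.swap(fresh); // appenders now pick up the new file
        full.sealed.store(true, Ordering::Release);
    }
}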


@@ -0,0 +1,40 @@
use std::sync::Arc;
use store_api::logstore::namespace::Namespace;
#[derive(Clone)]
pub struct LocalNamespace {
inner: Arc<LocalNamespaceInner>,
}
impl Default for LocalNamespace {
fn default() -> Self {
LocalNamespace::new("", 0)
}
}
struct LocalNamespaceInner {
name: String,
id: u64,
}
impl Namespace for LocalNamespace {
fn name(&self) -> &str {
self.inner.name.as_str()
}
}
#[allow(dead_code)]
impl LocalNamespace {
fn id(&self) -> u64 {
self.inner.id
}
pub fn new(name: &str, id: u64) -> Self {
let inner = Arc::new(LocalNamespaceInner {
name: name.to_string(),
id,
});
Self { inner }
}
}


@@ -1 +1,2 @@
mod error;
pub mod fs;


@@ -10,6 +10,7 @@ async-trait = "0.1"
common-error = { path = "../common/error" }
datatypes = { path = "../datatypes" }
futures = "0.3"
snafu = { version = "0.7", features = ["backtraces"] }
[dev-dependencies]
async-stream = "0.3"


@@ -1,9 +1,8 @@
//! LogStore APIs.
use common_error::prelude::ErrorExt;
use entry::Offset;
use crate::logstore::entry::Entry;
use crate::logstore::entry::{Entry, Id, Offset};
use crate::logstore::entry_stream::SendableEntryStream;
use crate::logstore::namespace::Namespace;
@@ -17,23 +16,28 @@ pub trait LogStore {
type Error: ErrorExt + Send + Sync;
type Namespace: Namespace;
type Entry: Entry;
type AppendResponse: AppendResponse;
/// Append an `Entry` to the WAL under the given namespace.
async fn append(&mut self, ns: Self::Namespace, e: Self::Entry) -> Result<Offset, Self::Error>;
async fn append(
&self,
ns: Self::Namespace,
mut e: Self::Entry,
) -> Result<Self::AppendResponse, Self::Error>;
// Append a batch of entries atomically and return the id of the first entry.
async fn append_batch(
&mut self,
&self,
ns: Self::Namespace,
e: Vec<Self::Entry>,
) -> Result<Offset, Self::Error>;
) -> Result<Id, Self::Error>;
// Create a new `EntryStream` to asynchronously generate `Entry`.
async fn read(
&self,
ns: Self::Namespace,
offset: Offset,
) -> Result<SendableEntryStream<Self::Entry>, Self::Error>;
id: Id,
) -> Result<SendableEntryStream<Self::Entry, Self::Error>, Self::Error>;
// Create a new `Namespace`.
async fn create_namespace(&mut self, ns: Self::Namespace) -> Result<(), Self::Error>;
@@ -44,3 +48,9 @@ pub trait LogStore {
// List all existing namespaces.
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
}
pub trait AppendResponse: Send + Sync {
fn entry_id(&self) -> Id;
fn offset(&self) -> Offset;
}


@@ -1,14 +1,36 @@
pub type Offset = u64;
use common_error::ext::ErrorExt;
pub type Offset = usize;
pub type Epoch = u64;
pub type Id = u64;
/// Entry is the minimal data storage unit in `LogStore`.
pub trait Entry {
pub trait Entry: Send + Sync {
type Error: ErrorExt + Send + Sync;
/// Return contained data of entry.
fn data(&self) -> &[u8];
/// Return offset of entry.
/// Return entry id that monotonically increments.
fn id(&self) -> Id;
/// Return file offset of entry.
fn offset(&self) -> Offset;
fn set_offset(&mut self, offset: Offset);
fn set_id(&mut self, id: Id);
/// Returns epoch of entry.
fn epoch(&self) -> Epoch;
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn serialize(&self) -> Vec<u8>;
fn deserialize(b: impl AsRef<[u8]>) -> Result<Self, Self::Error>
where
Self: Sized;
}


@@ -1,25 +1,28 @@
use std::pin::Pin;
use common_error::prelude::ErrorExt;
use futures::Stream;
use crate::logstore::entry::Entry;
use crate::logstore::Offset;
pub trait EntryStream: Stream<Item = Vec<Self::Entry>> {
pub trait EntryStream: Stream<Item = Result<Vec<Self::Entry>, Self::Error>> {
type Error: ErrorExt;
type Entry: Entry;
fn start_offset(&self) -> Offset;
fn start_id(&self) -> u64;
}
pub type SendableEntryStream<'a, E> = Pin<Box<dyn Stream<Item = Vec<E>> + Send + 'a>>;
pub type SendableEntryStream<'a, I, E> = Pin<Box<dyn Stream<Item = Result<Vec<I>, E>> + Send + 'a>>;
#[cfg(test)]
mod tests {
use std::any::Any;
use std::task::{Context, Poll};
use futures::StreamExt;
use super::*;
use crate::logstore::entry::Epoch;
use crate::logstore::entry::{Epoch, Id, Offset};
pub struct SimpleEntry {
/// Offset of current entry
@@ -30,18 +33,61 @@ mod tests {
data: Vec<u8>,
}
use common_error::prelude::{ErrorExt, Snafu};
use snafu::{Backtrace, ErrorCompat};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub struct Error {}
impl ErrorExt for Error {
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl Entry for SimpleEntry {
type Error = Error;
fn data(&self) -> &[u8] {
self.data.as_slice()
&self.data
}
fn id(&self) -> Id {
0u64
}
fn offset(&self) -> Offset {
self.offset
}
fn set_offset(&mut self, _offset: Offset) {}
fn set_id(&mut self, _id: Id) {}
fn epoch(&self) -> Epoch {
self.epoch
}
fn len(&self) -> usize {
self.data.len()
}
fn is_empty(&self) -> bool {
self.data.is_empty()
}
fn serialize(&self) -> Vec<u8> {
self.data.clone()
}
fn deserialize(_b: impl AsRef<[u8]>) -> Result<Self, Self::Error> {
unimplemented!()
}
}
impl SimpleEntry {
@@ -56,20 +102,21 @@ mod tests {
}
pub struct EntryStreamImpl<'a> {
inner: SendableEntryStream<'a, SimpleEntry>,
start_offset: Offset,
inner: SendableEntryStream<'a, SimpleEntry, Error>,
start_id: u64,
}
impl<'a> EntryStream for EntryStreamImpl<'a> {
type Error = Error;
type Entry = SimpleEntry;
fn start_offset(&self) -> Offset {
self.start_offset
fn start_id(&self) -> u64 {
self.start_id
}
}
impl Stream for EntryStreamImpl<'_> {
type Item = Vec<SimpleEntry>;
type Item = Result<Vec<SimpleEntry>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.inner).poll_next(cx) {
@@ -83,17 +130,18 @@ mod tests {
#[tokio::test]
pub async fn test_entry_stream() {
let stream = async_stream::stream!({
yield vec![SimpleEntry::new("test_entry".as_bytes(), 0, 128)]
yield Ok(vec![SimpleEntry::new("test_entry".as_bytes(), 0, 128)])
});
let mut stream_impl = EntryStreamImpl {
inner: Box::pin(stream),
start_offset: 1234,
start_id: 1234,
};
if let Some(v) = stream_impl.next().await {
assert_eq!(1, v.len());
assert_eq!(b"test_entry", v[0].data());
let vec = v.unwrap();
assert_eq!(1, vec.len());
assert_eq!(b"test_entry", vec[0].data());
}
}
}