feat: logstore compaction (#740)

* feat: add benchmark for wal

* add bin

* feat: impl wal compaction

* chore: This reverts commit ef9f2326

* chore: This reverts commit 9142ec0e

* fix: remove empty files

* fix: failing tests

* fix: CR comments

* fix: Mark log as stable after writer applies manifest

* fix: some cr comments and namings

* chore: rename all stable_xxx to obsolete_xxx

* chore: error message
Author: Lei, HUANG
Date: 2022-12-14 16:15:29 +08:00
Committed by: GitHub
Parent: 6a4e2e5975
Commit: 756c068166
19 changed files with 476 additions and 32 deletions

Cargo.lock (generated)

@@ -3373,6 +3373,7 @@ dependencies = [
"store-api",
"tempdir",
"tokio",
"tokio-util",
]
[[package]]


@@ -139,7 +139,16 @@ pub enum Error {
CreateDir { dir: String, source: std::io::Error },
#[snafu(display("Failed to open log store, source: {}", source))]
OpenLogStore { source: log_store::error::Error },
OpenLogStore {
#[snafu(backtrace)]
source: log_store::error::Error,
},
#[snafu(display("Failed to star log store gc task, source: {}", source))]
StartLogStore {
#[snafu(backtrace)]
source: log_store::error::Error,
},
#[snafu(display("Failed to storage engine, source: {}", source))]
OpenStorageEngine { source: StorageError },
@@ -358,6 +367,7 @@ impl ErrorExt for Error {
Error::BumpTableId { source, .. } => source.status_code(),
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::StartLogStore { source, .. } => source.status_code(),
}
}


@@ -36,12 +36,13 @@ use servers::Mode;
use snafu::prelude::*;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::table::TableIdProviderRef;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, Result,
NewCatalogSnafu, Result, StartLogStoreSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
@@ -61,6 +62,7 @@ pub struct Instance {
pub(crate) script_executor: ScriptExecutor,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
pub(crate) logstore: Arc<LocalFileLogStore>,
}
pub type InstanceRef = Arc<Instance>;
@@ -68,7 +70,7 @@ pub type InstanceRef = Arc<Instance>;
impl Instance {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let log_store = create_local_file_log_store(opts).await?;
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let meta_client = match opts.mode {
Mode::Standalone => None,
@@ -88,7 +90,7 @@ impl Instance {
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(log_store),
logstore.clone(),
object_store.clone(),
),
object_store,
@@ -158,6 +160,7 @@ impl Instance {
script_executor,
heartbeat_task,
table_id_provider,
logstore,
})
}
@@ -166,6 +169,7 @@ impl Instance {
.start()
.await
.context(NewCatalogSnafu)?;
self.logstore.start().await.context(StartLogStoreSnafu)?;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
@@ -272,16 +276,16 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul
}
pub(crate) async fn create_local_file_log_store(
opts: &DatanodeOptions,
path: impl AsRef<str>,
) -> Result<LocalFileLogStore> {
let path = path.as_ref();
// create WAL directory
fs::create_dir_all(path::Path::new(&opts.wal_dir))
.context(error::CreateDirSnafu { dir: &opts.wal_dir })?;
fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?;
info!("The WAL directory is: {}", &opts.wal_dir);
info!("The WAL directory is: {}", path);
let log_config = LogConfig {
log_file_dir: opts.wal_dir.clone(),
log_file_dir: path.to_string(),
..Default::default()
};


@@ -38,10 +38,11 @@ impl Instance {
// TODO(LFC): Delete it when callers no longer need it.
pub async fn new_mock() -> Result<Self> {
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
let mock_info = meta_srv::mocks::mock_with_memstore().await;
let meta_client = Arc::new(mock_meta_client(mock_info, 0).await);
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let (dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let logstore = Arc::new(create_local_file_log_store(dir.path().to_str().unwrap()).await?);
let mock_engine = Arc::new(MockMitoEngine::new(
TableEngineConfig::default(),
MockEngine::default(),
@@ -80,6 +81,7 @@ impl Instance {
script_executor,
heartbeat_task,
table_id_provider,
logstore,
})
}
@@ -90,13 +92,13 @@ impl Instance {
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let log_store = create_local_file_log_store(opts).await?;
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(log_store),
logstore.clone(),
object_store.clone(),
),
object_store,
@@ -132,6 +134,7 @@ impl Instance {
script_executor,
table_id_provider: Some(Arc::new(LocalTableIdProvider::default())),
heartbeat_task: Some(heartbeat_task),
logstore,
})
}
}


@@ -23,6 +23,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
tempdir = "0.3"
tokio = { version = "1.18", features = ["full"] }
tokio-util = "0.7"
[dev-dependencies]
rand = "0.8"


@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::BoxedError;
use common_error::prelude::{ErrorExt, Snafu};
use snafu::{Backtrace, ErrorCompat};
use tokio::task::JoinError;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -89,6 +90,15 @@ pub enum Error {
#[snafu(display("Failed while waiting for write to finish, source: {}", source))]
WaitWrite { source: tokio::task::JoinError },
#[snafu(display("Invalid logstore status, msg: {}", msg))]
InvalidState { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to wait for gc task to stop, source: {}", source))]
WaitGcTaskStop {
source: JoinError,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {


@@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct LogConfig {
pub append_buffer_size: usize,
pub max_log_file_size: usize,
pub log_file_dir: String,
pub gc_interval: Duration,
}
impl Default for LogConfig {
@@ -27,6 +30,7 @@ impl Default for LogConfig {
append_buffer_size: 128,
max_log_file_size: 1024 * 1024 * 1024,
log_file_dir: "/tmp/greptimedb".to_string(),
gc_interval: Duration::from_secs(10 * 60),
}
}
}
@@ -44,5 +48,6 @@ mod tests {
info!("LogConfig::default(): {:?}", default);
assert_eq!(1024 * 1024 * 1024, default.max_log_file_size);
assert_eq!(128, default.append_buffer_size);
assert_eq!(Duration::from_secs(600), default.gc_interval);
}
}
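
For reference, a minimal sketch of how a caller might shorten the new gc_interval for a test while keeping the remaining defaults, much like the added tests do. The helper name and import path are assumptions inferred from the crate layout in this diff, not part of the commit:

use std::time::Duration;

use log_store::fs::config::LogConfig; // import path assumed from the diff's module layout

// Hypothetical helper: a LogConfig with a short gc interval for tests;
// every other field falls back to Default, as in the new test cases.
fn short_gc_config(dir: &str) -> LogConfig {
    LogConfig {
        log_file_dir: dir.to_string(),
        gc_interval: Duration::from_millis(100),
        ..Default::default()
    }
}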


@@ -55,11 +55,12 @@ const LOG_WRITER_BATCH_SIZE: usize = 16;
/// Wraps File operation to get rid of `&mut self` requirements
struct FileWriter {
inner: Arc<File>,
path: String,
}
impl FileWriter {
pub fn new(file: Arc<File>) -> Self {
Self { inner: file }
pub fn new(file: Arc<File>, path: String) -> Self {
Self { inner: file, path }
}
pub async fn write(&self, data: Bytes, offset: u64) -> Result<()> {
@@ -100,6 +101,11 @@ impl FileWriter {
.await
.context(WaitWriteSnafu)?
}
pub async fn destroy(&self) -> Result<()> {
tokio::fs::remove_file(&self.path).await.context(IoSnafu)?;
Ok(())
}
}
pub type LogFileRef = Arc<LogFile>;
@@ -128,7 +134,7 @@ pub struct LogFile {
impl Drop for LogFile {
fn drop(&mut self) {
self.state.stopped.store(true, Ordering::Relaxed);
info!("Stopping log file {}", self.name);
info!("Dropping log file {}", self.name);
}
}
@@ -143,12 +149,12 @@ impl LogFile {
.open(path.clone())
.context(OpenLogSnafu { file_name: &path })?;
let file_name: FileName = path.as_str().try_into()?;
let file_name = FileName::try_from(path.as_str())?;
let start_entry_id = file_name.entry_id();
let mut log = Self {
name: file_name,
writer: Arc::new(FileWriter::new(Arc::new(file))),
writer: Arc::new(FileWriter::new(Arc::new(file), path.clone())),
start_entry_id,
pending_request_tx: None,
notify: Arc::new(Notify::new()),
@@ -243,6 +249,11 @@ impl LogFile {
res
}
pub async fn destroy(&self) -> Result<()> {
self.writer.destroy().await?;
Ok(())
}
async fn handle_batch(
mut batch: Vec<AppendRequest>,
state: &Arc<State>,
@@ -477,6 +488,11 @@ impl LogFile {
self.state.sealed.load(Ordering::Acquire)
}
#[inline]
pub fn is_stopped(&self) -> bool {
self.state.stopped.load(Ordering::Acquire)
}
#[inline]
pub fn unseal(&self) {
self.state.sealed.store(false, Ordering::Release);


@@ -12,24 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
use std::sync::Arc;
use arc_swap::ArcSwap;
use async_stream::stream;
use common_telemetry::{error, info, warn};
use common_telemetry::{debug, error, info, warn};
use futures::{pin_mut, StreamExt};
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{Encode, Entry, Id};
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::logstore::LogStore;
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::error::{
CreateDirSnafu, DuplicateFileSnafu, Error, FileNameIllegalSnafu, InternalSnafu, IoSnafu,
ReadPathSnafu, Result,
CreateDirSnafu, DuplicateFileSnafu, Error, FileNameIllegalSnafu, InternalSnafu,
InvalidStateSnafu, IoSnafu, ReadPathSnafu, Result, WaitGcTaskStopSnafu,
};
use crate::fs::config::LogConfig;
use crate::fs::entry::EntryImpl;
@@ -42,9 +44,12 @@ type FileMap = BTreeMap<u64, LogFileRef>;
#[derive(Debug)]
pub struct LocalFileLogStore {
files: RwLock<FileMap>,
files: Arc<RwLock<FileMap>>,
active: ArcSwap<LogFile>,
config: LogConfig,
obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, u64>>>,
cancel_token: Mutex<Option<CancellationToken>>,
gc_task_handle: Mutex<Option<JoinHandle<()>>>,
}
impl LocalFileLogStore {
@@ -101,9 +106,12 @@ impl LocalFileLogStore {
let active_file_cloned = active_file.clone();
Ok(Self {
files: RwLock::new(files),
files: Arc::new(RwLock::new(files)),
active: ArcSwap::new(active_file_cloned),
config: config.clone(),
obsolete_ids: Arc::new(Default::default()),
cancel_token: Mutex::new(None),
gc_task_handle: Mutex::new(None),
})
}
@@ -185,6 +193,60 @@ impl LocalFileLogStore {
}
}
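/// Runs a single gc pass: finds the lowest obsolete entry id across all
/// namespaces and removes the log files made fully obsolete by it.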
async fn gc(
files: Arc<RwLock<FileMap>>,
obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, u64>>>,
) -> Result<()> {
if let Some(lowest) = find_lowest_id(obsolete_ids).await {
gc_inner(files, lowest).await
} else {
Ok(())
}
}
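/// Returns the smallest obsolete entry id across all namespaces, or `None`
/// if no namespace has marked any entry obsolete yet.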
async fn find_lowest_id(obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, u64>>>) -> Option<u64> {
let mut lowest_obsolete = None;
{
let obsolete_ids = obsolete_ids.read().await;
for (ns, id) in obsolete_ids.iter() {
if *id <= *lowest_obsolete.get_or_insert(*id) {
lowest_obsolete = Some(*id);
debug!("Current lowest obsolete id: {}, namespace: {:?}", *id, ns);
}
}
}
lowest_obsolete
}
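/// Removes every log file whose entries all precede `obsolete_id`,
/// stopping each file before destroying it on disk.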
async fn gc_inner(files: Arc<RwLock<FileMap>>, obsolete_id: u64) -> Result<()> {
let mut files = files.write().await;
let files_to_delete = find_files_to_delete(&files, obsolete_id);
info!(
"Compacting log file up to entry id: {}, files to delete: {:?}",
obsolete_id, files_to_delete
);
for entry_id in files_to_delete {
if let Some(f) = files.remove(&entry_id) {
if !f.is_stopped() {
f.stop().await?;
}
f.destroy().await?;
info!("Destroyed log file: {}", f.file_name());
}
}
Ok(())
}
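/// Returns the start entry ids of the files that are safe to delete: a file is
/// deletable once the next file starts at or before `entry_id`, i.e. every entry
/// it contains is already obsolete.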
fn find_files_to_delete<T>(offset_map: &BTreeMap<u64, T>, entry_id: u64) -> Vec<u64> {
let mut res = vec![];
for (cur, next) in offset_map.keys().zip(offset_map.keys().skip(1)) {
if *cur < entry_id && *next <= entry_id {
res.push(*cur);
}
}
res
}
#[async_trait::async_trait]
impl LogStore for LocalFileLogStore {
type Error = Error;
@@ -192,6 +254,55 @@ impl LogStore for LocalFileLogStore {
type Entry = EntryImpl;
type AppendResponse = AppendResponseImpl;
async fn start(&self) -> Result<()> {
let files = self.files.clone();
let obsolete_ids = self.obsolete_ids.clone();
let interval = self.config.gc_interval;
let token = tokio_util::sync::CancellationToken::new();
let child = token.child_token();
let handle = common_runtime::spawn_bg(async move {
loop {
if let Err(e) = gc(files.clone(), obsolete_ids.clone()).await {
error!(e; "Failed to gc log store");
}
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = child.cancelled() => {
info!("LogStore gc task has been cancelled");
return;
}
}
}
});
*self.gc_task_handle.lock().await = Some(handle);
*self.cancel_token.lock().await = Some(token);
Ok(())
}
async fn stop(&self) -> Result<()> {
let handle = self
.gc_task_handle
.lock()
.await
.take()
.context(InvalidStateSnafu {
msg: "Logstore gc task not spawned",
})?;
let token = self
.cancel_token
.lock()
.await
.take()
.context(InvalidStateSnafu {
msg: "Logstore gc task not spawned",
})?;
token.cancel();
Ok(handle.await.context(WaitGcTaskStopSnafu)?)
}
async fn append(&self, mut entry: Self::Entry) -> Result<Self::AppendResponse> {
// TODO(hl): configurable retry times
for _ in 0..3 {
@@ -280,10 +391,25 @@ impl LogStore for LocalFileLogStore {
fn namespace(&self, id: NamespaceId) -> Self::Namespace {
LocalNamespace::new(id)
}
async fn obsolete(
&self,
namespace: Self::Namespace,
id: Id,
) -> std::result::Result<(), Self::Error> {
info!("Mark namespace obsolete entry id, {:?}:{}", namespace, id);
let mut map = self.obsolete_ids.write().await;
let prev = map.insert(namespace, id);
info!("Prev: {:?}", prev);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::time::Duration;
use futures_util::StreamExt;
use rand::distributions::Alphanumeric;
use rand::Rng;
@@ -300,6 +426,7 @@ mod tests {
append_buffer_size: 128,
max_log_file_size: 128,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
@@ -351,6 +478,7 @@ mod tests {
append_buffer_size: 128,
max_log_file_size: 128,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
let ns = LocalNamespace::new(42);
@@ -382,6 +510,7 @@ mod tests {
append_buffer_size: 128,
max_log_file_size: 1024 * 1024,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
assert_eq!(
@@ -426,4 +555,217 @@ mod tests {
assert_eq!(entries[0].id(), 1);
assert_eq!(43, entries[0].namespace_id);
}
#[test]
fn test_find_files_to_delete() {
let file_map = vec![(1u64, ()), (11u64, ()), (21u64, ()), (31u64, ())]
.into_iter()
.collect::<BTreeMap<u64, ()>>();
assert!(find_files_to_delete(&file_map, 0).is_empty());
assert!(find_files_to_delete(&file_map, 1).is_empty());
assert!(find_files_to_delete(&file_map, 2).is_empty());
assert!(find_files_to_delete(&file_map, 10).is_empty());
assert_eq!(vec![1], find_files_to_delete(&file_map, 11));
assert_eq!(vec![1], find_files_to_delete(&file_map, 20));
assert_eq!(vec![1, 11], find_files_to_delete(&file_map, 21));
assert_eq!(vec![1, 11, 21], find_files_to_delete(&file_map, 31));
assert_eq!(vec![1, 11, 21], find_files_to_delete(&file_map, 100));
}
#[tokio::test]
async fn test_find_lowest_id() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb-log-compact").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 4096,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
assert!(find_lowest_id(logstore.obsolete_ids.clone())
.await
.is_none());
logstore
.obsolete(LocalNamespace::new(1), 100)
.await
.unwrap();
assert_eq!(
Some(100),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
logstore
.obsolete(LocalNamespace::new(2), 200)
.await
.unwrap();
assert_eq!(
Some(100),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
logstore
.obsolete(LocalNamespace::new(1), 101)
.await
.unwrap();
assert_eq!(
Some(101),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
logstore
.obsolete(LocalNamespace::new(2), 202)
.await
.unwrap();
assert_eq!(
Some(101),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
logstore
.obsolete(LocalNamespace::new(1), 300)
.await
.unwrap();
assert_eq!(
Some(202),
find_lowest_id(logstore.obsolete_ids.clone()).await
);
}
#[tokio::test]
async fn test_compact_log_file() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb-log-compact").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 4096,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
for id in 0..50 {
logstore
.append(EntryImpl::new(
generate_data(990),
id,
LocalNamespace::new(42),
))
.await
.unwrap();
}
assert_eq!(
vec![0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48],
logstore
.files
.read()
.await
.keys()
.copied()
.collect::<Vec<_>>()
);
gc_inner(logstore.files.clone(), 10).await.unwrap();
assert_eq!(
vec![8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48],
logstore
.files
.read()
.await
.keys()
.copied()
.collect::<Vec<_>>()
);
gc_inner(logstore.files.clone(), 28).await.unwrap();
assert_eq!(
vec![28, 32, 36, 40, 44, 48],
logstore
.files
.read()
.await
.keys()
.copied()
.collect::<Vec<_>>()
);
gc_inner(logstore.files.clone(), 50).await.unwrap();
assert_eq!(
vec![48],
logstore
.files
.read()
.await
.keys()
.copied()
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn test_gc_task() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb-log-compact").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 4096,
log_file_dir: dir.path().to_str().unwrap().to_string(),
gc_interval: Duration::from_millis(100),
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
logstore.start().await.unwrap();
for id in 0..50 {
logstore
.append(EntryImpl::new(
generate_data(990),
id,
LocalNamespace::new(42),
))
.await
.unwrap();
}
logstore
.obsolete(LocalNamespace::new(42), 30)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;
let file_ids = logstore
.files
.read()
.await
.keys()
.cloned()
.collect::<Vec<_>>();
assert_eq!(vec![28, 32, 36, 40, 44, 48], file_ids);
let mut files = vec![];
let mut readir = tokio::fs::read_dir(dir.path()).await.unwrap();
while let Some(r) = readir.next_entry().await.transpose() {
let entry = r.unwrap();
files.push(entry.file_name().to_str().unwrap().to_string());
}
assert_eq!(
vec![
"00000000000000000028.log".to_string(),
"00000000000000000048.log".to_string(),
"00000000000000000040.log".to_string(),
"00000000000000000044.log".to_string(),
"00000000000000000036.log".to_string(),
"00000000000000000032.log".to_string()
]
.into_iter()
.collect::<HashSet<String>>(),
files.into_iter().collect::<HashSet<String>>()
);
}
}
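
As a side note, the selection rule exercised by test_find_files_to_delete above can be restated as a standalone sketch; only the rule itself is shown, not the locking or file destruction, and the main function is purely illustrative:

use std::collections::BTreeMap;

// Re-statement of the rule from the diff above: a file starting at `cur`
// can be removed once the following file starts at or before the obsolete
// entry id, because every entry it holds is then obsolete.
fn find_files_to_delete<T>(files: &BTreeMap<u64, T>, obsolete_id: u64) -> Vec<u64> {
    files
        .keys()
        .zip(files.keys().skip(1))
        .filter(|(cur, next)| **cur < obsolete_id && **next <= obsolete_id)
        .map(|(cur, _)| *cur)
        .collect()
}

fn main() {
    let files: BTreeMap<u64, ()> = [(1, ()), (11, ()), (21, ()), (31, ())].into_iter().collect();
    // Matches the expectations in test_find_files_to_delete: only files whose
    // entire id range lies below 21 are removable.
    assert_eq!(vec![1, 11], find_files_to_delete(&files, 21));
}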


@@ -14,7 +14,7 @@
use store_api::logstore::namespace::{Id, Namespace};
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct LocalNamespace {
pub(crate) id: Id,
}


@@ -33,6 +33,14 @@ impl LogStore for NoopLogStore {
type Entry = EntryImpl;
type AppendResponse = AppendResponseImpl;
async fn start(&self) -> Result<()> {
Ok(())
}
async fn stop(&self) -> Result<()> {
Ok(())
}
async fn append(&self, mut _e: Self::Entry) -> Result<Self::AppendResponse> {
Ok(AppendResponseImpl {
entry_id: 0,
@@ -72,4 +80,14 @@ impl LogStore for NoopLogStore {
fn namespace(&self, id: NamespaceId) -> Self::Namespace {
LocalNamespace::new(id)
}
async fn obsolete(
&self,
namespace: Self::Namespace,
id: Id,
) -> std::result::Result<(), Self::Error> {
let _ = namespace;
let _ = id;
Ok(())
}
}


@@ -25,6 +25,7 @@ pub async fn create_tmp_local_file_log_store(dir: &str) -> (LocalFileLogStore, T
append_buffer_size: 128,
max_log_file_size: 128,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};
(LocalFileLogStore::open(&cfg).await.unwrap(), dir)


@@ -204,6 +204,17 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display(
"Failed to mark WAL as stable, region id: {}, source: {}",
region_id,
source
))]
MarkWalStable {
region_id: u64,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("WAL data corrupted, region_id: {}, message: {}", region_id, message))]
WalDataCorrupted {
region_id: RegionId,
@@ -409,6 +420,7 @@ impl ErrorExt for Error {
PushBatch { source, .. } => source.status_code(),
AddDefault { source, .. } => source.status_code(),
ConvertChunk { source, .. } => source.status_code(),
MarkWalStable { source, .. } => source.status_code(),
}
}


@@ -223,7 +223,8 @@ impl<S: LogStore> FlushJob<S> {
edit,
self.max_memtable_id,
)
.await
.await?;
self.wal.obsolete(self.flush_sequence).await
}
/// Generates random SST file name in format: `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$`
@@ -237,9 +238,7 @@ impl<S: LogStore> Job for FlushJob<S> {
// TODO(yingwen): [flush] Support in-job parallelism (Flush memtables concurrently)
async fn run(&mut self, ctx: &Context) -> Result<()> {
let file_metas = self.write_memtables_to_layer(ctx).await?;
self.write_manifest_and_apply(&file_metas).await?;
Ok(())
}
}


@@ -225,6 +225,7 @@ impl<S: LogStore> RegionImpl<S> {
}
let wal = Wal::new(metadata.id(), store_config.log_store);
wal.obsolete(flushed_sequence).await?;
let shared = Arc::new(SharedData {
id: metadata.id(),
name,


@@ -24,7 +24,7 @@ use store_api::logstore::{AppendResponse, LogStore};
use store_api::storage::{RegionId, SequenceNumber};
use crate::codec::{Decoder, Encoder};
use crate::error::{self, Error, Result};
use crate::error::{self, Error, MarkWalStableSnafu, Result};
use crate::proto::wal::{self, PayloadType, WalHeader};
use crate::write_batch::codec::{
WriteBatchArrowDecoder, WriteBatchArrowEncoder, WriteBatchProtobufDecoder,
@@ -64,6 +64,16 @@ impl<S: LogStore> Wal<S> {
}
}
pub async fn obsolete(&self, seq: SequenceNumber) -> Result<()> {
self.store
.obsolete(self.namespace.clone(), seq)
.await
.map_err(BoxedError::new)
.context(MarkWalStableSnafu {
region_id: self.region_id,
})
}
#[inline]
pub fn region_id(&self) -> RegionId {
self.region_id


@@ -32,6 +32,10 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
type Entry: Entry;
type AppendResponse: AppendResponse;
async fn start(&self) -> Result<(), Self::Error>;
async fn stop(&self) -> Result<(), Self::Error>;
/// Append an `Entry` to WAL with given namespace
async fn append(&self, mut e: Self::Entry) -> Result<Self::AppendResponse, Self::Error>;
@@ -65,6 +69,11 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
/// Create a namespace of the associate Namespace type
// TODO(sunng87): confusion with `create_namespace`
fn namespace(&self, id: namespace::Id) -> Self::Namespace;
/// Marks all entry ids `<= id` of the given `namespace` as obsolete so that the log store can
/// safely delete a log file once every entry inside it is obsolete. This method may not delete
/// log files immediately.
async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error>;
}
pub trait AppendResponse: Send + Sync {
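
Taken together, the new start/stop/obsolete surface implies a caller-side lifecycle along these lines. This is a hedged sketch: the function name is hypothetical and the namespace and entry ids are arbitrary; only the trait methods themselves come from the diff:

use store_api::logstore::LogStore;

// Hypothetical driver illustrating the lifecycle this PR adds:
// start the background gc task, append entries, mark them obsolete, stop.
async fn wal_lifecycle<S: LogStore>(store: &S, entry: S::Entry) -> Result<(), S::Error> {
    store.start().await?;           // LocalFileLogStore spawns its gc task here
    store.append(entry).await?;     // regular WAL writes
    let ns = store.namespace(42);   // 42: arbitrary namespace id
    store.obsolete(ns, 100).await?; // entries with id <= 100 may now be reclaimed
    store.stop().await              // cancels the gc task and waits for it to exit
}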


@@ -55,7 +55,7 @@ mod tests {
#[snafu(visibility(pub))]
pub struct Error {}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct Namespace {}
impl crate::logstore::Namespace for Namespace {


@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hash::Hash;
pub type Id = u64;
pub trait Namespace: Send + Sync + Clone + std::fmt::Debug {
pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + Eq {
fn id(&self) -> Id;
}