diff --git a/Cargo.lock b/Cargo.lock
index ad26abc63d..305cce0a89 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3373,6 +3373,7 @@ dependencies = [
  "store-api",
  "tempdir",
  "tokio",
+ "tokio-util",
 ]
 
 [[package]]
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index fa5fb8c4b4..1597ac7388 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -139,7 +139,16 @@ pub enum Error {
     CreateDir { dir: String, source: std::io::Error },
 
     #[snafu(display("Failed to open log store, source: {}", source))]
-    OpenLogStore { source: log_store::error::Error },
+    OpenLogStore {
+        #[snafu(backtrace)]
+        source: log_store::error::Error,
+    },
+
+    #[snafu(display("Failed to start log store gc task, source: {}", source))]
+    StartLogStore {
+        #[snafu(backtrace)]
+        source: log_store::error::Error,
+    },
 
     #[snafu(display("Failed to storage engine, source: {}", source))]
     OpenStorageEngine { source: StorageError },
@@ -358,6 +367,7 @@ impl ErrorExt for Error {
             Error::BumpTableId { source, .. } => source.status_code(),
             Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
             Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
+            Error::StartLogStore { source, .. } => source.status_code(),
         }
     }
 
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 88c3939d31..9d09e4724c 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -36,12 +36,13 @@ use servers::Mode;
 use snafu::prelude::*;
 use storage::config::EngineConfig as StorageEngineConfig;
 use storage::EngineImpl;
+use store_api::logstore::LogStore;
 use table::table::TableIdProviderRef;
 
 use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
 use crate::error::{
     self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
-    NewCatalogSnafu, Result,
+    NewCatalogSnafu, Result, StartLogStoreSnafu,
 };
 use crate::heartbeat::HeartbeatTask;
 use crate::script::ScriptExecutor;
@@ -61,6 +62,7 @@ pub struct Instance {
     pub(crate) script_executor: ScriptExecutor,
     pub(crate) table_id_provider: Option<TableIdProviderRef>,
     pub(crate) heartbeat_task: Option<HeartbeatTask>,
+    pub(crate) logstore: Arc<LocalFileLogStore>,
 }
 
 pub type InstanceRef = Arc<Instance>;
@@ -68,7 +70,7 @@ pub type InstanceRef = Arc<Instance>;
 impl Instance {
     pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
         let object_store = new_object_store(&opts.storage).await?;
-        let log_store = create_local_file_log_store(opts).await?;
+        let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
 
         let meta_client = match opts.mode {
             Mode::Standalone => None,
@@ -88,7 +90,7 @@ impl Instance {
             TableEngineConfig::default(),
             EngineImpl::new(
                 StorageEngineConfig::default(),
-                Arc::new(log_store),
+                logstore.clone(),
                 object_store.clone(),
             ),
             object_store,
@@ -158,6 +160,7 @@ impl Instance {
             script_executor,
             heartbeat_task,
             table_id_provider,
+            logstore,
         })
     }
 
@@ -166,6 +169,7 @@ impl Instance {
             .start()
             .await
             .context(NewCatalogSnafu)?;
+        self.logstore.start().await.context(StartLogStoreSnafu)?;
         if let Some(task) = &self.heartbeat_task {
             task.start().await?;
         }
@@ -272,16 +276,16 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Result<MetaClient> {
 }
 
 pub(crate) async fn create_local_file_log_store(
-    opts: &DatanodeOptions,
+    path: impl AsRef<str>,
 ) -> Result<LocalFileLogStore> {
+    let path = path.as_ref();
     // create WAL directory
-    fs::create_dir_all(path::Path::new(&opts.wal_dir))
-        .context(error::CreateDirSnafu { dir: &opts.wal_dir })?;
+    fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?;
 
-    info!("The WAL directory is: {}", &opts.wal_dir);
+    info!("The WAL directory is: {}", path);
 
     let log_config = LogConfig {
-        log_file_dir: opts.wal_dir.clone(),
+        log_file_dir: path.to_string(),
         ..Default::default()
     };
 
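Note on the error plumbing above: the new `StartLogStore` variant follows the same snafu wrapping pattern as `OpenLogStore`, delegating both backtrace and status code to the inner log-store error. A self-contained sketch of that pattern (simplified names, not the actual datanode types):

```rust
use snafu::{prelude::*, Backtrace};

#[derive(Debug, Snafu)]
enum StoreError {
    #[snafu(display("gc task not running"))]
    NotRunning { backtrace: Backtrace },
}

#[derive(Debug, Snafu)]
enum DatanodeError {
    #[snafu(display("Failed to start log store gc task, source: {}", source))]
    StartLogStore {
        // Reuse the inner error's backtrace instead of capturing a new one.
        #[snafu(backtrace)]
        source: StoreError,
    },
}

fn start_store() -> Result<(), StoreError> {
    NotRunningSnafu.fail()
}

fn start_datanode() -> Result<(), DatanodeError> {
    // `.context(...)` wraps the inner error into the outer variant.
    start_store().context(StartLogStoreSnafu)
}

fn main() {
    if let Err(e) = start_datanode() {
        println!("{e}");
    }
}
```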
diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs
index b37d491690..f8a0460012 100644
--- a/src/datanode/src/mock.rs
+++ b/src/datanode/src/mock.rs
@@ -38,10 +38,11 @@ impl Instance {
     // TODO(LFC): Delete it when callers no longer need it.
     pub async fn new_mock() -> Result<Self> {
         use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
-
         let mock_info = meta_srv::mocks::mock_with_memstore().await;
         let meta_client = Arc::new(mock_meta_client(mock_info, 0).await);
-        let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
+        let (dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
+
+        let logstore = Arc::new(create_local_file_log_store(dir.path().to_str().unwrap()).await?);
         let mock_engine = Arc::new(MockMitoEngine::new(
             TableEngineConfig::default(),
             MockEngine::default(),
@@ -80,6 +81,7 @@ impl Instance {
             script_executor,
             heartbeat_task,
             table_id_provider,
+            logstore,
         })
     }
 
@@ -90,13 +92,13 @@ impl Instance {
 
     pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
         let object_store = new_object_store(&opts.storage).await?;
-        let log_store = create_local_file_log_store(opts).await?;
+        let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
         let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
         let table_engine = Arc::new(DefaultEngine::new(
             TableEngineConfig::default(),
             EngineImpl::new(
                 StorageEngineConfig::default(),
-                Arc::new(log_store),
+                logstore.clone(),
                 object_store.clone(),
             ),
             object_store,
@@ -132,6 +134,7 @@ impl Instance {
             script_executor,
             table_id_provider: Some(Arc::new(LocalTableIdProvider::default())),
             heartbeat_task: Some(heartbeat_task),
+            logstore,
         })
     }
 }
diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml
index 77dcd45619..77530c6ac1 100644
--- a/src/log-store/Cargo.toml
+++ b/src/log-store/Cargo.toml
@@ -23,6 +23,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
 store-api = { path = "../store-api" }
 tempdir = "0.3"
 tokio = { version = "1.18", features = ["full"] }
+tokio-util = "0.7"
 
 [dev-dependencies]
 rand = "0.8"
diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs
index 9b26e3db4d..e510038ddb 100644
--- a/src/log-store/src/error.rs
+++ b/src/log-store/src/error.rs
@@ -17,6 +17,7 @@ use std::any::Any;
 use common_error::ext::BoxedError;
 use common_error::prelude::{ErrorExt, Snafu};
 use snafu::{Backtrace, ErrorCompat};
+use tokio::task::JoinError;
 
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
@@ -89,6 +90,15 @@ pub enum Error {
 
     #[snafu(display("Failed while waiting for write to finish, source: {}", source))]
     WaitWrite { source: tokio::task::JoinError },
+
+    #[snafu(display("Invalid logstore status, msg: {}", msg))]
+    InvalidState { msg: String, backtrace: Backtrace },
+
+    #[snafu(display("Failed to wait for gc task to stop, source: {}", source))]
+    WaitGcTaskStop {
+        source: JoinError,
+        backtrace: Backtrace,
+    },
 }
 
 impl ErrorExt for Error {
diff --git a/src/log-store/src/fs/config.rs b/src/log-store/src/fs/config.rs
index c46abcd5a4..3a6c84a2ee 100644
--- a/src/log-store/src/fs/config.rs
+++ b/src/log-store/src/fs/config.rs
@@ -12,11 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::time::Duration;
+
 #[derive(Debug, Clone)]
 pub struct LogConfig {
     pub append_buffer_size: usize,
     pub max_log_file_size: usize,
     pub log_file_dir: String,
+    pub gc_interval: Duration,
 }
 
 impl Default for LogConfig {
@@ -27,6 +30,7 @@ impl Default for LogConfig {
             append_buffer_size: 128,
             max_log_file_size: 1024 * 1024 * 1024,
             log_file_dir: "/tmp/greptimedb".to_string(),
+            gc_interval: Duration::from_secs(10 * 60),
         }
     }
 }
@@ -44,5 +48,6 @@ mod tests {
         info!("LogConfig::default(): {:?}", default);
         assert_eq!(1024 * 1024 * 1024, default.max_log_file_size);
         assert_eq!(128, default.append_buffer_size);
+        assert_eq!(Duration::from_secs(600), default.gc_interval);
     }
 }
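With `gc_interval` defaulting to ten minutes, callers that need a different cadence can rely on struct-update syntax, as the tests later in this diff do. A minimal sketch (assuming the `log_store::fs::config` module path for the crate above):

```rust
use std::time::Duration;

use log_store::fs::config::LogConfig;

fn main() {
    // Override only the gc interval; every other field keeps its default.
    let config = LogConfig {
        gc_interval: Duration::from_secs(60),
        ..Default::default()
    };
    assert_eq!(Duration::from_secs(60), config.gc_interval);
}
```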
diff --git a/src/log-store/src/fs/file.rs b/src/log-store/src/fs/file.rs
index 57c7ade569..163cbe22d6 100644
--- a/src/log-store/src/fs/file.rs
+++ b/src/log-store/src/fs/file.rs
@@ -55,11 +55,12 @@ const LOG_WRITER_BATCH_SIZE: usize = 16;
 /// Wraps File operation to get rid of `&mut self` requirements
 struct FileWriter {
     inner: Arc<File>,
+    path: String,
 }
 
 impl FileWriter {
-    pub fn new(file: Arc<File>) -> Self {
-        Self { inner: file }
+    pub fn new(file: Arc<File>, path: String) -> Self {
+        Self { inner: file, path }
     }
 
     pub async fn write(&self, data: Bytes, offset: u64) -> Result<()> {
@@ -100,6 +101,11 @@ impl FileWriter {
             .await
             .context(WaitWriteSnafu)?
     }
+
+    pub async fn destroy(&self) -> Result<()> {
+        tokio::fs::remove_file(&self.path).await.context(IoSnafu)?;
+        Ok(())
+    }
 }
 
 pub type LogFileRef = Arc<LogFile>;
@@ -128,7 +134,7 @@ pub struct LogFile {
 impl Drop for LogFile {
     fn drop(&mut self) {
         self.state.stopped.store(true, Ordering::Relaxed);
-        info!("Stopping log file {}", self.name);
+        info!("Dropping log file {}", self.name);
     }
 }
 
@@ -143,12 +149,12 @@ impl LogFile {
             .open(path.clone())
             .context(OpenLogSnafu { file_name: &path })?;
 
-        let file_name: FileName = path.as_str().try_into()?;
+        let file_name = FileName::try_from(path.as_str())?;
         let start_entry_id = file_name.entry_id();
 
         let mut log = Self {
             name: file_name,
-            writer: Arc::new(FileWriter::new(Arc::new(file))),
+            writer: Arc::new(FileWriter::new(Arc::new(file), path.clone())),
             start_entry_id,
             pending_request_tx: None,
             notify: Arc::new(Notify::new()),
@@ -243,6 +249,11 @@ impl LogFile {
         res
     }
 
+    pub async fn destroy(&self) -> Result<()> {
+        self.writer.destroy().await?;
+        Ok(())
+    }
+
     async fn handle_batch(
         mut batch: Vec<AppendRequest>,
         state: &Arc<State>,
@@ -477,6 +488,11 @@ impl LogFile {
         self.state.sealed.load(Ordering::Acquire)
     }
 
+    #[inline]
+    pub fn is_stopped(&self) -> bool {
+        self.state.stopped.load(Ordering::Acquire)
+    }
+
     #[inline]
     pub fn unseal(&self) {
         self.state.sealed.store(false, Ordering::Release);
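The `path` field added to `FileWriter` exists so a log file can later delete itself: unlinking needs the path, not the open handle. A stripped-down sketch of the idea (hypothetical `Writer` type, not the code above):

```rust
struct Writer {
    path: String, // retained so destroy() can unlink the file later
}

impl Writer {
    // Remove the backing file from disk once all its entries are obsolete.
    async fn destroy(&self) -> std::io::Result<()> {
        tokio::fs::remove_file(&self.path).await
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    tokio::fs::write("example.log", b"data").await?;
    let writer = Writer {
        path: "example.log".to_string(),
    };
    writer.destroy().await
}
```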
diff --git a/src/log-store/src/fs/log.rs b/src/log-store/src/fs/log.rs
index 0ecd0c0a3d..38c8dc285f 100644
--- a/src/log-store/src/fs/log.rs
+++ b/src/log-store/src/fs/log.rs
@@ -12,24 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
 use std::path::Path;
 use std::sync::Arc;
 
 use arc_swap::ArcSwap;
 use async_stream::stream;
-use common_telemetry::{error, info, warn};
+use common_telemetry::{debug, error, info, warn};
 use futures::{pin_mut, StreamExt};
 use snafu::{OptionExt, ResultExt};
 use store_api::logstore::entry::{Encode, Entry, Id};
 use store_api::logstore::entry_stream::SendableEntryStream;
 use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
 use store_api::logstore::LogStore;
-use tokio::sync::RwLock;
+use tokio::sync::{Mutex, RwLock};
+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
 
 use crate::error::{
-    CreateDirSnafu, DuplicateFileSnafu, Error, FileNameIllegalSnafu, InternalSnafu, IoSnafu,
-    ReadPathSnafu, Result,
+    CreateDirSnafu, DuplicateFileSnafu, Error, FileNameIllegalSnafu, InternalSnafu,
+    InvalidStateSnafu, IoSnafu, ReadPathSnafu, Result, WaitGcTaskStopSnafu,
 };
 use crate::fs::config::LogConfig;
 use crate::fs::entry::EntryImpl;
@@ -42,9 +44,12 @@ type FileMap = BTreeMap<u64, LogFileRef>;
 
 #[derive(Debug)]
 pub struct LocalFileLogStore {
-    files: RwLock<FileMap>,
+    files: Arc<RwLock<FileMap>>,
     active: ArcSwap<LogFile>,
     config: LogConfig,
+    obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, u64>>>,
+    cancel_token: Mutex<Option<CancellationToken>>,
+    gc_task_handle: Mutex<Option<JoinHandle<()>>>,
 }
 
 impl LocalFileLogStore {
@@ -101,9 +106,12 @@ impl LocalFileLogStore {
         let active_file_cloned = active_file.clone();
 
         Ok(Self {
-            files: RwLock::new(files),
+            files: Arc::new(RwLock::new(files)),
             active: ArcSwap::new(active_file_cloned),
             config: config.clone(),
+            obsolete_ids: Arc::new(Default::default()),
+            cancel_token: Mutex::new(None),
+            gc_task_handle: Mutex::new(None),
         })
     }
 
@@ -185,6 +193,60 @@ impl LocalFileLogStore {
     }
 }
 
+async fn gc(
+    files: Arc<RwLock<FileMap>>,
+    obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, u64>>>,
+) -> Result<()> {
+    if let Some(lowest) = find_lowest_id(obsolete_ids).await {
+        gc_inner(files, lowest).await
+    } else {
+        Ok(())
+    }
+}
+
+async fn find_lowest_id(obsolete_ids: Arc<RwLock<HashMap<LocalNamespace, u64>>>) -> Option<u64> {
+    let mut lowest_obsolete = None;
+    {
+        let obsolete_ids = obsolete_ids.read().await;
+        for (ns, id) in obsolete_ids.iter() {
+            if *id <= *lowest_obsolete.get_or_insert(*id) {
+                lowest_obsolete = Some(*id);
+                debug!("Current lowest obsolete id: {}, namespace: {:?}", *id, ns);
+            }
+        }
+    }
+    lowest_obsolete
+}
+
+async fn gc_inner(files: Arc<RwLock<FileMap>>, obsolete_id: u64) -> Result<()> {
+    let mut files = files.write().await;
+    let files_to_delete = find_files_to_delete(&files, obsolete_id);
+    info!(
+        "Compacting log files up to entry id: {}, files to delete: {:?}",
+        obsolete_id, files_to_delete
+    );
+    for entry_id in files_to_delete {
+        if let Some(f) = files.remove(&entry_id) {
+            if !f.is_stopped() {
+                f.stop().await?;
+            }
+            f.destroy().await?;
+            info!("Destroyed log file: {}", f.file_name());
+        }
+    }
+    Ok(())
+}
+
+fn find_files_to_delete<T>(offset_map: &BTreeMap<u64, T>, entry_id: u64) -> Vec<u64> {
+    let mut res = vec![];
+    for (cur, next) in offset_map.keys().zip(offset_map.keys().skip(1)) {
+        if *cur < entry_id && *next <= entry_id {
+            res.push(*cur);
+        }
+    }
+    res
+}
+
 #[async_trait::async_trait]
 impl LogStore for LocalFileLogStore {
     type Error = Error;
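The retention rule in `find_files_to_delete` above deserves a second look: a file whose first entry id is `cur` covers entries in `[cur, next)`, so it may be deleted only when the *next* file starts at or below the obsolete id; the last (active) file has no successor and therefore always survives. A standalone sketch of just that rule (hypothetical helper, with `()` in place of file handles):

```rust
use std::collections::BTreeMap;

// A file starting at `cur` holds entries in [cur, next); it is deletable only
// if that whole range is <= obsolete_id.
fn deletable_starts(starts: &BTreeMap<u64, ()>, obsolete_id: u64) -> Vec<u64> {
    starts
        .keys()
        .zip(starts.keys().skip(1))
        .filter(|(cur, next)| **cur < obsolete_id && **next <= obsolete_id)
        .map(|(cur, _)| *cur)
        .collect()
}

fn main() {
    let starts: BTreeMap<u64, ()> = [(1, ()), (11, ()), (21, ())].into_iter().collect();
    // Entries up to 11 are obsolete: the file covering [1, 11) can go, but the
    // file starting at 11 must stay since it may hold entries beyond 11.
    assert_eq!(vec![1], deletable_starts(&starts, 11));
    // While the obsolete id is still inside the first file, nothing is deletable.
    assert!(deletable_starts(&starts, 10).is_empty());
}
```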
@@ -192,6 +254,55 @@ impl LogStore for LocalFileLogStore {
     type Namespace = LocalNamespace;
     type Entry = EntryImpl;
     type AppendResponse = AppendResponseImpl;
 
+    async fn start(&self) -> Result<()> {
+        let files = self.files.clone();
+        let obsolete_ids = self.obsolete_ids.clone();
+        let interval = self.config.gc_interval;
+        let token = tokio_util::sync::CancellationToken::new();
+        let child = token.child_token();
+
+        let handle = common_runtime::spawn_bg(async move {
+            loop {
+                if let Err(e) = gc(files.clone(), obsolete_ids.clone()).await {
+                    error!(e; "Failed to gc log store");
+                }
+
+                tokio::select! {
+                    _ = tokio::time::sleep(interval) => {}
+                    _ = child.cancelled() => {
+                        info!("LogStore gc task has been cancelled");
+                        return;
+                    }
+                }
+            }
+        });
+
+        *self.gc_task_handle.lock().await = Some(handle);
+        *self.cancel_token.lock().await = Some(token);
+        Ok(())
+    }
+
+    async fn stop(&self) -> Result<()> {
+        let handle = self
+            .gc_task_handle
+            .lock()
+            .await
+            .take()
+            .context(InvalidStateSnafu {
+                msg: "Logstore gc task not spawned",
+            })?;
+        let token = self
+            .cancel_token
+            .lock()
+            .await
+            .take()
+            .context(InvalidStateSnafu {
+                msg: "Logstore gc task not spawned",
+            })?;
+        token.cancel();
+        Ok(handle.await.context(WaitGcTaskStopSnafu)?)
+    }
+
     async fn append(&self, mut entry: Self::Entry) -> Result<Self::AppendResponse> {
         // TODO(hl): configurable retry times
         for _ in 0..3 {
@@ -280,10 +391,25 @@ impl LogStore for LocalFileLogStore {
     fn namespace(&self, id: NamespaceId) -> Self::Namespace {
         LocalNamespace::new(id)
     }
+
+    async fn obsolete(
+        &self,
+        namespace: Self::Namespace,
+        id: Id,
+    ) -> std::result::Result<(), Self::Error> {
+        info!("Marking entries of namespace {:?} up to id {} as obsolete", namespace, id);
+        let mut map = self.obsolete_ids.write().await;
+        let prev = map.insert(namespace, id);
+        info!("Previous obsolete entry id: {:?}", prev);
+        Ok(())
+    }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashSet;
+    use std::time::Duration;
+
     use futures_util::StreamExt;
     use rand::distributions::Alphanumeric;
     use rand::Rng;
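The `start`/`stop` pair above is a standard cancellation pattern: the child token lives in the task, the parent token and `JoinHandle` live in the store, and `stop` cancels first, then joins. A self-contained sketch (plain `tokio::spawn` standing in for `common_runtime::spawn_bg`):

```rust
use std::time::Duration;

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.child_token();

    let handle = tokio::spawn(async move {
        loop {
            // ... one gc pass would run here ...
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_millis(100)) => {}
                _ = child.cancelled() => return, // stop() was requested
            }
        }
    });

    // Cancel first, then join, so no gc pass is in flight once this returns.
    token.cancel();
    handle.await.unwrap();
}
```

Note that the loop runs one gc pass before its first sleep, so a freshly started store performs an initial cleanup without waiting a full interval.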
@@ -300,6 +426,7 @@ mod tests {
             append_buffer_size: 128,
             max_log_file_size: 128,
             log_file_dir: dir.path().to_str().unwrap().to_string(),
+            ..Default::default()
         };
 
         let logstore = LocalFileLogStore::open(&config).await.unwrap();
@@ -351,6 +478,7 @@ mod tests {
             append_buffer_size: 128,
             max_log_file_size: 128,
             log_file_dir: dir.path().to_str().unwrap().to_string(),
+            ..Default::default()
         };
         let logstore = LocalFileLogStore::open(&config).await.unwrap();
         let ns = LocalNamespace::new(42);
@@ -382,6 +510,7 @@ mod tests {
             append_buffer_size: 128,
             max_log_file_size: 1024 * 1024,
             log_file_dir: dir.path().to_str().unwrap().to_string(),
+            ..Default::default()
         };
         let logstore = LocalFileLogStore::open(&config).await.unwrap();
         assert_eq!(
@@ -426,4 +555,217 @@ mod tests {
         assert_eq!(entries[0].id(), 1);
         assert_eq!(43, entries[0].namespace_id);
     }
+
+    #[test]
+    fn test_find_files_to_delete() {
+        let file_map = vec![(1u64, ()), (11u64, ()), (21u64, ()), (31u64, ())]
+            .into_iter()
+            .collect::<BTreeMap<u64, ()>>();
+
+        assert!(find_files_to_delete(&file_map, 0).is_empty());
+        assert!(find_files_to_delete(&file_map, 1).is_empty());
+        assert!(find_files_to_delete(&file_map, 2).is_empty());
+        assert!(find_files_to_delete(&file_map, 10).is_empty());
+
+        assert_eq!(vec![1], find_files_to_delete(&file_map, 11));
+        assert_eq!(vec![1], find_files_to_delete(&file_map, 20));
+        assert_eq!(vec![1, 11], find_files_to_delete(&file_map, 21));
+
+        assert_eq!(vec![1, 11, 21], find_files_to_delete(&file_map, 31));
+        assert_eq!(vec![1, 11, 21], find_files_to_delete(&file_map, 100));
+    }
+
+    #[tokio::test]
+    async fn test_find_lowest_id() {
+        common_telemetry::logging::init_default_ut_logging();
+        let dir = TempDir::new("greptimedb-log-compact").unwrap();
+        let config = LogConfig {
+            append_buffer_size: 128,
+            max_log_file_size: 4096,
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+            ..Default::default()
+        };
+        let logstore = LocalFileLogStore::open(&config).await.unwrap();
+        assert!(find_lowest_id(logstore.obsolete_ids.clone())
+            .await
+            .is_none());
+
+        logstore
+            .obsolete(LocalNamespace::new(1), 100)
+            .await
+            .unwrap();
+        assert_eq!(
+            Some(100),
+            find_lowest_id(logstore.obsolete_ids.clone()).await
+        );
+
+        logstore
+            .obsolete(LocalNamespace::new(2), 200)
+            .await
+            .unwrap();
+        assert_eq!(
+            Some(100),
+            find_lowest_id(logstore.obsolete_ids.clone()).await
+        );
+
+        logstore
+            .obsolete(LocalNamespace::new(1), 101)
+            .await
+            .unwrap();
+        assert_eq!(
+            Some(101),
+            find_lowest_id(logstore.obsolete_ids.clone()).await
+        );
+
+        logstore
+            .obsolete(LocalNamespace::new(2), 202)
+            .await
+            .unwrap();
+        assert_eq!(
+            Some(101),
+            find_lowest_id(logstore.obsolete_ids.clone()).await
+        );
+
+        logstore
+            .obsolete(LocalNamespace::new(1), 300)
+            .await
+            .unwrap();
+        assert_eq!(
+            Some(202),
+            find_lowest_id(logstore.obsolete_ids.clone()).await
+        );
+    }
+
+    #[tokio::test]
+    async fn test_compact_log_file() {
+        common_telemetry::logging::init_default_ut_logging();
+        let dir = TempDir::new("greptimedb-log-compact").unwrap();
+        let config = LogConfig {
+            append_buffer_size: 128,
+            max_log_file_size: 4096,
+            log_file_dir: dir.path().to_str().unwrap().to_string(),
+            ..Default::default()
+        };
+        let logstore = LocalFileLogStore::open(&config).await.unwrap();
+
+        for id in 0..50 {
+            logstore
+                .append(EntryImpl::new(
+                    generate_data(990),
+                    id,
+                    LocalNamespace::new(42),
+                ))
+                .await
+                .unwrap();
+        }
+
+        assert_eq!(
+            vec![0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48],
+            logstore
+                .files
+                .read()
+                .await
+                .keys()
+                .copied()
+                .collect::<Vec<u64>>()
+        );
+
+        gc_inner(logstore.files.clone(), 10).await.unwrap();
+
+        assert_eq!(
+            vec![8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48],
+            logstore
+                .files
+                .read()
+                .await
+                .keys()
+                .copied()
+                .collect::<Vec<u64>>()
+        );
+
+        gc_inner(logstore.files.clone(), 28).await.unwrap();
+
+        assert_eq!(
+            vec![28, 32, 36, 40, 44, 48],
+            logstore
+                .files
+                .read()
+                .await
+                .keys()
+                .copied()
+                .collect::<Vec<u64>>()
+        );
+
+        gc_inner(logstore.files.clone(), 50).await.unwrap();
+
+        assert_eq!(
+            vec![48],
+            logstore
+                .files
+                .read()
+                .await
+                .keys()
+                .copied()
+                .collect::<Vec<u64>>()
+        );
+    }
+
"00000000000000000032.log".to_string() + ] + .into_iter() + .collect::>(), + files.into_iter().collect::>() + ); + } } diff --git a/src/log-store/src/fs/namespace.rs b/src/log-store/src/fs/namespace.rs index 97a833a1d0..05203903b7 100644 --- a/src/log-store/src/fs/namespace.rs +++ b/src/log-store/src/fs/namespace.rs @@ -14,7 +14,7 @@ use store_api::logstore::namespace::{Id, Namespace}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct LocalNamespace { pub(crate) id: Id, } diff --git a/src/log-store/src/fs/noop.rs b/src/log-store/src/fs/noop.rs index 8d52f3fad7..099d3a9ce9 100644 --- a/src/log-store/src/fs/noop.rs +++ b/src/log-store/src/fs/noop.rs @@ -33,6 +33,14 @@ impl LogStore for NoopLogStore { type Entry = EntryImpl; type AppendResponse = AppendResponseImpl; + async fn start(&self) -> Result<()> { + Ok(()) + } + + async fn stop(&self) -> Result<()> { + Ok(()) + } + async fn append(&self, mut _e: Self::Entry) -> Result { Ok(AppendResponseImpl { entry_id: 0, @@ -72,4 +80,14 @@ impl LogStore for NoopLogStore { fn namespace(&self, id: NamespaceId) -> Self::Namespace { LocalNamespace::new(id) } + + async fn obsolete( + &self, + namespace: Self::Namespace, + id: Id, + ) -> std::result::Result<(), Self::Error> { + let _ = namespace; + let _ = id; + Ok(()) + } } diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index fb11928ae3..a8d4d24f88 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -25,6 +25,7 @@ pub async fn create_tmp_local_file_log_store(dir: &str) -> (LocalFileLogStore, T append_buffer_size: 128, max_log_file_size: 128, log_file_dir: dir.path().to_str().unwrap().to_string(), + ..Default::default() }; (LocalFileLogStore::open(&cfg).await.unwrap(), dir) diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index de6d8e002b..bc86199f23 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -204,6 +204,17 @@ pub enum Error { source: BoxedError, }, + #[snafu(display( + "Failed to mark WAL as stable, region id: {}, source: {}", + region_id, + source + ))] + MarkWalStable { + region_id: u64, + #[snafu(backtrace)] + source: BoxedError, + }, + #[snafu(display("WAL data corrupted, region_id: {}, message: {}", region_id, message))] WalDataCorrupted { region_id: RegionId, @@ -409,6 +420,7 @@ impl ErrorExt for Error { PushBatch { source, .. } => source.status_code(), AddDefault { source, .. } => source.status_code(), ConvertChunk { source, .. } => source.status_code(), + MarkWalStable { source, .. 
diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs
index 9996919b3b..51c8f00565 100644
--- a/src/storage/src/flush.rs
+++ b/src/storage/src/flush.rs
@@ -223,7 +223,8 @@ impl<S: LogStore> FlushJob<S> {
             edit,
             self.max_memtable_id,
         )
-        .await
+        .await?;
+        self.wal.obsolete(self.flush_sequence).await
     }
 
     /// Generates random SST file name in format: `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$`
@@ -237,9 +238,7 @@ impl<S: LogStore> Job for FlushJob<S> {
     // TODO(yingwen): [flush] Support in-job parallelism (Flush memtables concurrently)
     async fn run(&mut self, ctx: &Context) -> Result<()> {
         let file_metas = self.write_memtables_to_layer(ctx).await?;
-        self.write_manifest_and_apply(&file_metas).await?;
-
-        Ok(())
+        self.write_manifest_and_apply(&file_metas).await
     }
 }
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs
index 59d7d7e855..305f877f86 100644
--- a/src/storage/src/region.rs
+++ b/src/storage/src/region.rs
@@ -225,6 +225,7 @@ impl<S: LogStore> RegionImpl<S> {
         }
 
         let wal = Wal::new(metadata.id(), store_config.log_store);
+        wal.obsolete(flushed_sequence).await?;
         let shared = Arc::new(SharedData {
             id: metadata.id(),
             name,
diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs
index acd5baca63..10d3882cc8 100644
--- a/src/storage/src/wal.rs
+++ b/src/storage/src/wal.rs
@@ -24,7 +24,7 @@ use store_api::logstore::{AppendResponse, LogStore};
 use store_api::storage::{RegionId, SequenceNumber};
 
 use crate::codec::{Decoder, Encoder};
-use crate::error::{self, Error, Result};
+use crate::error::{self, Error, MarkWalStableSnafu, Result};
 use crate::proto::wal::{self, PayloadType, WalHeader};
 use crate::write_batch::codec::{
     WriteBatchArrowDecoder, WriteBatchArrowEncoder, WriteBatchProtobufDecoder,
@@ -64,6 +64,16 @@ impl<S: LogStore> Wal<S> {
         }
     }
 
+    pub async fn obsolete(&self, seq: SequenceNumber) -> Result<()> {
+        self.store
+            .obsolete(self.namespace.clone(), seq)
+            .await
+            .map_err(BoxedError::new)
+            .context(MarkWalStableSnafu {
+                region_id: self.region_id,
+            })
+    }
+
     #[inline]
     pub fn region_id(&self) -> RegionId {
         self.region_id
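Taken together, flush and region-open now both funnel into `Wal::obsolete`: flush marks everything up to the flushed sequence, and a reopened region re-marks the recovered `flushed_sequence` so files left over from before a restart can be reclaimed. A condensed, self-contained sketch of that call order (`FakeWal` is a stand-in for `Wal<S>`; only the ordering matters):

```rust
// Stand-in for Wal<S>: records the highest sequence known to be stable.
struct FakeWal {
    obsolete_up_to: Option<u64>,
}

impl FakeWal {
    // Counterpart of Wal::obsolete.
    async fn obsolete(&mut self, seq: u64) {
        self.obsolete_up_to = Some(seq);
    }
}

#[tokio::main]
async fn main() {
    let mut wal = FakeWal { obsolete_up_to: None };

    // 1. Flush path: after the manifest edit is applied, mark the flushed
    //    sequence obsolete so the log-store gc task may drop whole files.
    let flush_sequence = 42;
    wal.obsolete(flush_sequence).await;

    // 2. Region open: replay never needs entries below the recovered
    //    flushed sequence, so they are marked obsolete again on startup.
    assert_eq!(Some(42), wal.obsolete_up_to);
}
```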
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index effe838d44..80be436ea0 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -32,6 +32,10 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
     type Entry: Entry;
     type AppendResponse: AppendResponse;
 
+    async fn start(&self) -> Result<(), Self::Error>;
+
+    async fn stop(&self) -> Result<(), Self::Error>;
+
     /// Append an `Entry` to WAL with given namespace
     async fn append(&self, mut e: Self::Entry) -> Result<Self::AppendResponse, Self::Error>;
 
@@ -65,6 +69,11 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
     /// Create a namespace of the associate Namespace type
     // TODO(sunng87): confusion with `create_namespace`
     fn namespace(&self, id: namespace::Id) -> Self::Namespace;
+
+    /// Marks all entries with ids `<= id` in the given `namespace` as obsolete, so that the
+    /// log store can safely delete log files whose entries are all obsolete. This method does
+    /// not necessarily delete log files immediately.
+    async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error>;
 }
 
 pub trait AppendResponse: Send + Sync {
diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs
index 6d9cfa9c8e..ac8c574bb1 100644
--- a/src/store-api/src/logstore/entry_stream.rs
+++ b/src/store-api/src/logstore/entry_stream.rs
@@ -55,7 +55,7 @@ mod tests {
     #[snafu(visibility(pub))]
     pub struct Error {}
 
-    #[derive(Debug, Clone)]
+    #[derive(Debug, Clone, Eq, PartialEq, Hash)]
     pub struct Namespace {}
 
     impl crate::logstore::Namespace for Namespace {
diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs
index bf71ca4641..d056939c95 100644
--- a/src/store-api/src/logstore/namespace.rs
+++ b/src/store-api/src/logstore/namespace.rs
@@ -12,8 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::hash::Hash;
+
 pub type Id = u64;
 
-pub trait Namespace: Send + Sync + Clone + std::fmt::Debug {
+pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + Eq {
     fn id(&self) -> Id;
 }
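For implementors and callers alike, the expected lifecycle of the extended trait looks roughly like this (a sketch against the trait as defined above; the `100` is an arbitrary entry id):

```rust
use store_api::logstore::LogStore;

async fn lifecycle<S: LogStore>(store: &S, ns: S::Namespace) -> Result<(), S::Error> {
    store.start().await?; // spawn background work such as the gc task
    // ... append entries; flushes eventually make prefixes of the WAL stable ...
    store.obsolete(ns, 100).await?; // entries with id <= 100 may now be reclaimed
    store.stop().await // cancel and join background tasks
}
```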