From 563adbabe98e4aed8ad2ab1c491f1dbcb41f1688 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Fri, 31 Mar 2023 10:42:00 +0800 Subject: [PATCH] feat!: improve region manifest service (#1268) * feat: try to use batch delete in ManifestLogStorage * feat: clean temp dir when startup with file backend * refactor: export region manifest checkpoint actions magin and refactor storage options * feat: purge unused manifest and checkpoint files by repeat gc task * chore: debug deleted logs * feat: adds RepeatedTask and refactor all gc tasks * chore: clean code * feat: export gc_duration to manifest config * test: assert gc works * fix: typo * Update src/common/runtime/src/error.rs Co-authored-by: LFC * Update src/common/runtime/src/repeated_task.rs Co-authored-by: LFC * Update src/common/runtime/src/repeated_task.rs Co-authored-by: LFC * fix: format * Update src/common/runtime/src/repeated_task.rs Co-authored-by: Yingwen * chore: by CR comments * chore: by CR comments * fix: serde default for StorageConfig * chore: remove compaction config in StandaloneOptions --------- Co-authored-by: LFC Co-authored-by: Yingwen --- Cargo.lock | 2 + config/datanode.example.toml | 10 +- config/standalone.example.toml | 11 +- src/cmd/src/datanode.rs | 33 ++-- src/cmd/src/standalone.rs | 11 +- src/common/runtime/Cargo.toml | 2 + src/common/runtime/src/error.rs | 14 ++ src/common/runtime/src/lib.rs | 2 + src/common/runtime/src/repeated_task.rs | 174 +++++++++++++++++++++ src/datanode/src/datanode.rs | 46 +++++- src/datanode/src/error.rs | 4 + src/datanode/src/instance.rs | 18 ++- src/datanode/src/tests/test_util.rs | 13 +- src/frontend/src/tests.rs | 22 ++- src/log-store/src/error.rs | 16 +- src/log-store/src/raft_engine/log_store.rs | 118 ++++++-------- src/mito/src/manifest.rs | 2 +- src/mito/src/table.rs | 4 +- src/object-store/src/lib.rs | 1 + src/storage/src/config.rs | 6 + src/storage/src/engine.rs | 44 ++++-- src/storage/src/error.rs | 19 ++- src/storage/src/manifest/impl_.rs | 88 +++++++++-- src/storage/src/manifest/region.rs | 58 +++++-- src/storage/src/manifest/storage.rs | 118 +++++++++++--- src/storage/src/region.rs | 3 +- src/storage/src/region/tests.rs | 3 +- src/storage/src/test_util/config_util.rs | 4 +- src/store-api/src/manifest.rs | 9 ++ src/store-api/src/manifest/storage.rs | 4 + tests-integration/src/test_util.rs | 17 +- 31 files changed, 686 insertions(+), 190 deletions(-) create mode 100644 src/common/runtime/src/repeated_task.rs diff --git a/Cargo.lock b/Cargo.lock index b2d67598c1..2703420dcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1730,6 +1730,7 @@ dependencies = [ name = "common-runtime" version = "0.1.1" dependencies = [ + "async-trait", "common-error", "common-telemetry", "metrics", @@ -1738,6 +1739,7 @@ dependencies = [ "snafu", "tokio", "tokio-test", + "tokio-util", ] [[package]] diff --git a/config/datanode.example.toml b/config/datanode.example.toml index c2f2ccca90..cab3f17428 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -37,11 +37,19 @@ type = "File" data_dir = "/tmp/greptimedb/data/" # Compaction options, see `standalone.example.toml`. -[compaction] +[storage.compaction] max_inflight_tasks = 4 max_files_in_level0 = 8 max_purge_tasks = 32 +# Storage manifest options +[storage.manifest] +# Region checkpoint actions margin. +# Create a checkpoint every actions. +checkpoint_margin = 10 +# Region manifest logs and checkpoints gc execution duration +gc_duration = '30s' + # Procedure storage options, see `standalone.example.toml`. 
# [procedure.store] # type = "File" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index c9a0d28f9f..e06ca04a04 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -99,7 +99,7 @@ type = "File" data_dir = "/tmp/greptimedb/data/" # Compaction options. -[compaction] +[storage.compaction] # Max task number that can concurrently run. max_inflight_tasks = 4 # Max files in level 0 to trigger compaction. @@ -107,6 +107,15 @@ max_files_in_level0 = 8 # Max task number for SST purge task after compaction. max_purge_tasks = 32 +# Storage manifest options +[storage.manifest] +# Region checkpoint actions margin. +# Create a checkpoint every actions. +checkpoint_margin = 10 +# Region manifest logs and checkpoints gc execution duration +gc_duration = '30s' + + # Procedure storage options. # Uncomment to enable. # [procedure.store] diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 7085987ab0..27492982f8 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -146,7 +146,7 @@ impl TryFrom for DatanodeOptions { } if let Some(data_dir) = cmd.data_dir { - opts.storage = ObjectStoreConfig::File(FileConfig { data_dir }); + opts.storage.store = ObjectStoreConfig::File(FileConfig { data_dir }); } if let Some(wal_dir) = cmd.wal_dir { @@ -167,7 +167,7 @@ mod tests { use std::time::Duration; use common_test_util::temp_dir::create_named_temp_file; - use datanode::datanode::{CompactionConfig, ObjectStoreConfig}; + use datanode::datanode::{CompactionConfig, ObjectStoreConfig, RegionManifestConfig}; use servers::Mode; use super::*; @@ -203,10 +203,14 @@ mod tests { type = "File" data_dir = "/tmp/greptimedb/data/" - [compaction] - max_inflight_tasks = 4 - max_files_in_level0 = 8 + [storage.compaction] + max_inflight_tasks = 3 + max_files_in_level0 = 7 max_purge_tasks = 32 + + [storage.manifest] + checkpoint_margin = 9 + gc_duration = '7s' "#; write!(file, "{}", toml_str).unwrap(); @@ -237,9 +241,9 @@ mod tests { assert_eq!(3000, timeout_millis); assert!(tcp_nodelay); - match options.storage { - ObjectStoreConfig::File(FileConfig { data_dir }) => { - assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir) + match &options.storage.store { + ObjectStoreConfig::File(FileConfig { data_dir, .. }) => { + assert_eq!("/tmp/greptimedb/data/", data_dir) } ObjectStoreConfig::S3 { .. } => unreachable!(), ObjectStoreConfig::Oss { .. 
} => unreachable!(), @@ -247,11 +251,18 @@ mod tests { assert_eq!( CompactionConfig { - max_inflight_tasks: 4, - max_files_in_level0: 8, + max_inflight_tasks: 3, + max_files_in_level0: 7, max_purge_tasks: 32, }, - options.compaction + options.storage.compaction, + ); + assert_eq!( + RegionManifestConfig { + checkpoint_margin: Some(9), + gc_duration: Some(Duration::from_secs(7)), + }, + options.storage.manifest, ); } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b159052aaf..b5fcfa35e3 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -17,9 +17,7 @@ use std::sync::Arc; use clap::Parser; use common_base::Plugins; use common_telemetry::info; -use datanode::datanode::{ - CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, -}; +use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig, WalConfig}; use datanode::instance::InstanceRef; use frontend::frontend::FrontendOptions; use frontend::grpc::GrpcOptions; @@ -82,8 +80,7 @@ pub struct StandaloneOptions { pub prometheus_options: Option, pub prom_options: Option, pub wal: WalConfig, - pub storage: ObjectStoreConfig, - pub compaction: CompactionConfig, + pub storage: StorageConfig, pub procedure: Option, } @@ -101,8 +98,7 @@ impl Default for StandaloneOptions { prometheus_options: Some(PrometheusOptions::default()), prom_options: Some(PromOptions::default()), wal: WalConfig::default(), - storage: ObjectStoreConfig::default(), - compaction: CompactionConfig::default(), + storage: StorageConfig::default(), procedure: None, } } @@ -129,7 +125,6 @@ impl StandaloneOptions { enable_memory_catalog: self.enable_memory_catalog, wal: self.wal, storage: self.storage, - compaction: self.compaction, procedure: self.procedure, ..Default::default() } diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml index e29674e7e5..18cccda32f 100644 --- a/src/common/runtime/Cargo.toml +++ b/src/common/runtime/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +async-trait.workspace = true common-error = { path = "../error" } common-telemetry = { path = "../telemetry" } metrics = "0.20" @@ -12,6 +13,7 @@ once_cell = "1.12" paste.workspace = true snafu.workspace = true tokio.workspace = true +tokio-util.workspace = true [dev-dependencies] tokio-test = "0.4" diff --git a/src/common/runtime/src/error.rs b/src/common/runtime/src/error.rs index 0a4c4d0730..49088825a9 100644 --- a/src/common/runtime/src/error.rs +++ b/src/common/runtime/src/error.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_error::prelude::*; +use tokio::task::JoinError; pub type Result = std::result::Result; @@ -26,6 +27,19 @@ pub enum Error { source: std::io::Error, backtrace: Backtrace, }, + #[snafu(display("Repeated task {} not started yet", name))] + IllegalState { name: String, backtrace: Backtrace }, + + #[snafu(display( + "Failed to wait for repeated task {} to stop, source: {}", + name, + source + ))] + WaitGcTaskStop { + name: String, + source: JoinError, + backtrace: Backtrace, + }, } impl ErrorExt for Error { diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index b7bed53c77..2a5b3a6b65 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -15,6 +15,7 @@ pub mod error; mod global; pub mod metric; +mod repeated_task; pub mod runtime; pub use global::{ @@ -23,4 +24,5 @@ pub use global::{ spawn_read, spawn_write, write_runtime, }; +pub use 
crate::repeated_task::{RepeatedTask, TaskFunction, TaskFunctionRef}; pub use crate::runtime::{Builder, JoinError, JoinHandle, Runtime}; diff --git a/src/common/runtime/src/repeated_task.rs b/src/common/runtime/src/repeated_task.rs new file mode 100644 index 0000000000..4af75dc488 --- /dev/null +++ b/src/common/runtime/src/repeated_task.rs @@ -0,0 +1,174 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use common_error::prelude::ErrorExt; +use common_telemetry::logging; +use snafu::{ensure, OptionExt, ResultExt}; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +use crate::error::{IllegalStateSnafu, Result, WaitGcTaskStopSnafu}; +use crate::Runtime; + +#[async_trait::async_trait] +pub trait TaskFunction { + async fn call(&self) -> std::result::Result<(), E>; + fn name(&self) -> &str; +} + +pub type TaskFunctionRef = Arc + Send + Sync>; + +pub struct RepeatedTask { + cancel_token: Mutex>, + task_handle: Mutex>>, + started: AtomicBool, + interval: Duration, + task_fn: TaskFunctionRef, +} + +impl std::fmt::Display for RepeatedTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "RepeatedTask({})", self.task_fn.name()) + } +} + +impl std::fmt::Debug for RepeatedTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("RepeatedTask") + .field(&self.task_fn.name()) + .finish() + } +} + +impl RepeatedTask { + pub fn new(interval: Duration, task_fn: TaskFunctionRef) -> Self { + Self { + cancel_token: Mutex::new(None), + task_handle: Mutex::new(None), + started: AtomicBool::new(false), + interval, + task_fn, + } + } + + pub fn started(&self) -> bool { + self.started.load(Ordering::Relaxed) + } + + pub async fn start(&self, runtime: Runtime) -> Result<()> { + let token = CancellationToken::new(); + let interval = self.interval; + let child = token.child_token(); + let task_fn = self.task_fn.clone(); + // TODO(hl): Maybe spawn to a blocking runtime. + let handle = runtime.spawn(async move { + loop { + tokio::select! 
{ + _ = tokio::time::sleep(interval) => {} + _ = child.cancelled() => { + return; + } + } + if let Err(e) = task_fn.call().await { + logging::error!(e; "Failed to run repeated task: {}", task_fn.name()); + } + } + }); + *self.cancel_token.lock().await = Some(token); + *self.task_handle.lock().await = Some(handle); + self.started.store(true, Ordering::Relaxed); + + logging::info!( + "Repeated task {} started with interval: {:?}", + self.task_fn.name(), + self.interval + ); + + Ok(()) + } + + pub async fn stop(&self) -> Result<()> { + let name = self.task_fn.name(); + ensure!( + self.started + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok(), + IllegalStateSnafu { name } + ); + let token = self + .cancel_token + .lock() + .await + .take() + .context(IllegalStateSnafu { name })?; + let handle = self + .task_handle + .lock() + .await + .take() + .context(IllegalStateSnafu { name })?; + + token.cancel(); + handle.await.context(WaitGcTaskStopSnafu { name })?; + + logging::info!("Repeated task {} stopped", name); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicI32; + + use super::*; + + struct TickTask { + n: AtomicI32, + } + + #[async_trait::async_trait] + impl TaskFunction for TickTask { + fn name(&self) -> &str { + "test" + } + + async fn call(&self) -> Result<()> { + self.n.fetch_add(1, Ordering::Relaxed); + + Ok(()) + } + } + + #[tokio::test] + async fn test_repeated_task() { + common_telemetry::init_default_ut_logging(); + + let task_fn = Arc::new(TickTask { + n: AtomicI32::new(0), + }); + + let task = RepeatedTask::new(Duration::from_millis(100), task_fn.clone()); + + task.start(crate::bg_runtime()).await.unwrap(); + tokio::time::sleep(Duration::from_millis(550)).await; + task.stop().await.unwrap(); + + assert_eq!(task_fn.n.load(Ordering::Relaxed), 5); + } +} diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index a21057bb7f..17f1998265 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -29,6 +29,7 @@ use crate::server::Services; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024); +/// Object storage config #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum ObjectStoreConfig { @@ -37,6 +38,16 @@ pub enum ObjectStoreConfig { Oss(OssConfig), } +/// Storage engine config +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(default)] +pub struct StorageConfig { + #[serde(flatten)] + pub store: ObjectStoreConfig, + pub compaction: CompactionConfig, + pub manifest: RegionManifestConfig, +} + #[derive(Debug, Clone, Serialize, Default, Deserialize)] #[serde(default)] pub struct FileConfig { @@ -107,6 +118,27 @@ impl Default for WalConfig { } } +/// Options for region manifest +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +#[serde(default)] +pub struct RegionManifestConfig { + /// Region manifest checkpoint actions margin. + /// Manifest service create a checkpoint every [checkpoint_margin] actions. + pub checkpoint_margin: Option, + /// Region manifest logs and checkpoints gc task execution duration. 
+ #[serde(with = "humantime_serde")] + pub gc_duration: Option, +} + +impl Default for RegionManifestConfig { + fn default() -> Self { + Self { + checkpoint_margin: Some(10u16), + gc_duration: Some(Duration::from_secs(30)), + } + } +} + /// Options for table compaction #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] #[serde(default)] @@ -132,7 +164,7 @@ impl Default for CompactionConfig { impl From<&DatanodeOptions> for SchedulerConfig { fn from(value: &DatanodeOptions) -> Self { Self { - max_inflight_tasks: value.compaction.max_inflight_tasks, + max_inflight_tasks: value.storage.compaction.max_inflight_tasks, } } } @@ -140,8 +172,10 @@ impl From<&DatanodeOptions> for SchedulerConfig { impl From<&DatanodeOptions> for StorageEngineConfig { fn from(value: &DatanodeOptions) -> Self { Self { - max_files_in_l0: value.compaction.max_files_in_level0, - max_purge_tasks: value.compaction.max_purge_tasks, + manifest_checkpoint_margin: value.storage.manifest.checkpoint_margin, + manifest_gc_duration: value.storage.manifest.gc_duration, + max_files_in_l0: value.storage.compaction.max_files_in_level0, + max_purge_tasks: value.storage.compaction.max_purge_tasks, } } } @@ -192,8 +226,7 @@ pub struct DatanodeOptions { pub mysql_runtime_size: usize, pub meta_client_options: Option, pub wal: WalConfig, - pub storage: ObjectStoreConfig, - pub compaction: CompactionConfig, + pub storage: StorageConfig, pub procedure: Option, } @@ -210,8 +243,7 @@ impl Default for DatanodeOptions { mysql_runtime_size: 2, meta_client_options: None, wal: WalConfig::default(), - storage: ObjectStoreConfig::default(), - compaction: CompactionConfig::default(), + storage: StorageConfig::default(), procedure: None, } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index e7a85a58b7..01f9fbc27f 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -181,6 +181,9 @@ pub enum Error { #[snafu(display("Failed to create directory {}, source: {}", dir, source))] CreateDir { dir: String, source: std::io::Error }, + #[snafu(display("Failed to remove directory {}, source: {}", dir, source))] + RemoveDir { dir: String, source: std::io::Error }, + #[snafu(display("Failed to open log store, source: {}", source))] OpenLogStore { #[snafu(backtrace)] @@ -576,6 +579,7 @@ impl ErrorExt for Error { | TcpBind { .. } | StartGrpc { .. } | CreateDir { .. } + | RemoveDir { .. } | InsertSystemCatalog { .. } | RenameTable { .. } | Catalog { .. 
} diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 5f448c4729..ae2495dd58 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -104,7 +104,7 @@ impl Instance { meta_client: Option>, compaction_scheduler: CompactionSchedulerRef, ) -> Result { - let object_store = new_object_store(&opts.storage).await?; + let object_store = new_object_store(&opts.storage.store).await?; let log_store = Arc::new(create_log_store(&opts.wal).await?); let table_engine = Arc::new(DefaultEngine::new( @@ -430,15 +430,27 @@ pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Res info!("The file storage directory is: {}", &data_dir); let atomic_write_dir = format!("{data_dir}/.tmp/"); + if path::Path::new(&atomic_write_dir).exists() { + info!( + "Begin to clean temp storage directory: {}", + &atomic_write_dir + ); + fs::remove_dir_all(&atomic_write_dir).context(error::RemoveDirSnafu { + dir: &atomic_write_dir, + })?; + info!("Cleaned temp storage directory: {}", &atomic_write_dir); + } let mut builder = FsBuilder::default(); builder.root(&data_dir).atomic_write_dir(&atomic_write_dir); - Ok(ObjectStore::new(builder) + let object_store = ObjectStore::new(builder) .context(error::InitBackendSnafu { config: store_config.clone(), })? - .finish()) + .finish(); + + Ok(object_store) } /// Create metasrv client instance and spawn heartbeat loop. diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 11a3e57ee1..220010ccf3 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -32,7 +32,9 @@ use sql::statements::tql::Tql; use table::engine::{EngineContext, TableEngineRef}; use table::requests::{CreateTableRequest, TableOptions}; -use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, WalConfig}; +use crate::datanode::{ + DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig, +}; use crate::error::{CreateTableSnafu, Result}; use crate::instance::Instance; use crate::sql::SqlHandler; @@ -130,9 +132,12 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) dir: wal_tmp_dir.path().to_str().unwrap().to_string(), ..Default::default() }, - storage: ObjectStoreConfig::File(FileConfig { - data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }), + storage: StorageConfig { + store: ObjectStoreConfig::File(FileConfig { + data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }), + ..Default::default() + }, mode: Mode::Standalone, ..Default::default() }; diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 75e8a70ea1..9de5b8857c 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -24,7 +24,9 @@ use client::Client; use common_grpc::channel_manager::ChannelManager; use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::{create_temp_dir, TempDir}; -use datanode::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, WalConfig}; +use datanode::datanode::{ + DatanodeOptions, FileConfig, ObjectStoreConfig, StorageConfig, WalConfig, +}; use datanode::instance::Instance as DatanodeInstance; use meta_client::client::MetaClientBuilder; use meta_client::rpc::Peer; @@ -98,9 +100,12 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) dir: wal_tmp_dir.path().to_str().unwrap().to_string(), ..Default::default() }, - storage: ObjectStoreConfig::File(FileConfig { - data_dir: 
data_tmp_dir.path().to_str().unwrap().to_string(), - }), + storage: StorageConfig { + store: ObjectStoreConfig::File(FileConfig { + data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }), + ..Default::default() + }, mode: Mode::Standalone, ..Default::default() }; @@ -185,9 +190,12 @@ async fn create_distributed_datanode( dir: wal_tmp_dir.path().to_str().unwrap().to_string(), ..Default::default() }, - storage: ObjectStoreConfig::File(FileConfig { - data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }), + storage: StorageConfig { + store: ObjectStoreConfig::File(FileConfig { + data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }), + ..Default::default() + }, mode: Mode::Distributed, ..Default::default() }; diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index cdb7a63302..2136e79f3a 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -15,16 +15,22 @@ use std::any::Any; use common_error::prelude::{ErrorExt, Snafu}; +use common_runtime::error::Error as RuntimeError; use snafu::{Backtrace, ErrorCompat}; -use tokio::task::JoinError; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Failed to wait for gc task to stop, source: {}", source))] - WaitGcTaskStop { - source: JoinError, - backtrace: Backtrace, + #[snafu(display("Failed to start log store gc task, source: {}", source))] + StartGcTask { + #[snafu(backtrace)] + source: RuntimeError, + }, + + #[snafu(display("Failed to stop log store gc task, source: {}", source))] + StopGcTask { + #[snafu(backtrace)] + source: RuntimeError, }, #[snafu(display("Failed to add entry to LogBatch, source: {}", source))] diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 48dfc187b2..39ca5d6789 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -13,25 +13,22 @@ // limitations under the License. 
use std::fmt::{Debug, Formatter}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use async_stream::stream; +use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{error, info}; use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use store_api::logstore::entry::Id; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Namespace as NamespaceTrait; use store_api::logstore::{AppendResponse, LogStore}; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; use crate::config::LogConfig; use crate::error::{ AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, - RaftEngineSnafu, WaitGcTaskStopSnafu, + RaftEngineSnafu, StartGcTaskSnafu, StopGcTaskSnafu, }; use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace}; @@ -41,9 +38,36 @@ const SYSTEM_NAMESPACE: u64 = 0; pub struct RaftEngineLogStore { config: LogConfig, engine: Arc, - cancel_token: Mutex>, - gc_task_handle: Mutex>>, - started: AtomicBool, + gc_task: RepeatedTask, +} + +pub struct PurgeExpiredFilesFunction { + engine: Arc, +} + +#[async_trait::async_trait] +impl TaskFunction for PurgeExpiredFilesFunction { + fn name(&self) -> &str { + "RaftEngineLogStore-gc-task" + } + + async fn call(&self) -> Result<(), Error> { + match self.engine.purge_expired_files().context(RaftEngineSnafu) { + Ok(res) => { + // TODO(hl): the retval of purge_expired_files indicates the namespaces need to be compact, + // which is useful when monitoring regions failed to flush it's memtable to SSTs. + info!( + "Successfully purged logstore files, namespaces need compaction: {:?}", + res + ); + } + Err(e) => { + error!(e; "Failed to purge files in logstore"); + } + } + + Ok(()) + } } impl RaftEngineLogStore { @@ -58,56 +82,31 @@ impl RaftEngineLogStore { ..Default::default() }; let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?); + let gc_task = RepeatedTask::new( + config.purge_interval, + Arc::new(PurgeExpiredFilesFunction { + engine: engine.clone(), + }), + ); + let log_store = Self { config, engine, - cancel_token: Mutex::new(None), - gc_task_handle: Mutex::new(None), - started: AtomicBool::new(false), + gc_task, }; log_store.start().await?; Ok(log_store) } pub fn started(&self) -> bool { - self.started.load(Ordering::Relaxed) + self.gc_task.started() } async fn start(&self) -> Result<(), Error> { - let engine_clone = self.engine.clone(); - let interval = self.config.purge_interval; - let token = CancellationToken::new(); - let child = token.child_token(); - // TODO(hl): Maybe spawn to a blocking runtime. - let handle = common_runtime::spawn_bg(async move { - loop { - tokio::select! { - _ = tokio::time::sleep(interval) => {} - _ = child.cancelled() => { - info!("LogStore gc task has been cancelled"); - return; - } - } - match engine_clone.purge_expired_files().context(RaftEngineSnafu) { - Ok(res) => { - // TODO(hl): the retval of purge_expired_files indicates the namespaces need to be compact, - // which is useful when monitoring regions failed to flush it's memtable to SSTs. 
- info!( - "Successfully purged logstore files, namespaces need compaction: {:?}", - res - ); - } - Err(e) => { - error!(e; "Failed to purge files in logstore"); - } - } - } - }); - *self.cancel_token.lock().await = Some(token); - *self.gc_task_handle.lock().await = Some(handle); - self.started.store(true, Ordering::Relaxed); - info!("RaftEngineLogStore started with config: {:?}", self.config); - Ok(()) + self.gc_task + .start(common_runtime::bg_runtime()) + .await + .context(StartGcTaskSnafu) } } @@ -115,7 +114,7 @@ impl Debug for RaftEngineLogStore { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("RaftEngineLogsStore") .field("config", &self.config) - .field("started", &self.started.load(Ordering::Relaxed)) + .field("started", &self.gc_task.started()) .finish() } } @@ -127,28 +126,7 @@ impl LogStore for RaftEngineLogStore { type Entry = Entry; async fn stop(&self) -> Result<(), Self::Error> { - ensure!( - self.started - .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) - .is_ok(), - IllegalStateSnafu - ); - let handle = self - .gc_task_handle - .lock() - .await - .take() - .context(IllegalStateSnafu)?; - let token = self - .cancel_token - .lock() - .await - .take() - .context(IllegalStateSnafu)?; - token.cancel(); - handle.await.context(WaitGcTaskStopSnafu)?; - info!("RaftEngineLogStore stopped"); - Ok(()) + self.gc_task.stop().await.context(StopGcTaskSnafu) } /// Append an entry to logstore. Currently of existence of entry's namespace is not checked. diff --git a/src/mito/src/manifest.rs b/src/mito/src/manifest.rs index 24ae4d39e5..db1f42f77b 100644 --- a/src/mito/src/manifest.rs +++ b/src/mito/src/manifest.rs @@ -80,7 +80,7 @@ mod tests { async fn test_table_manifest() { let (_dir, object_store) = test_util::new_test_object_store("test_table_manifest").await; - let manifest = TableManifest::new("manifest/", object_store, None); + let manifest = TableManifest::create("manifest/", object_store); let mut iter = manifest.scan(0, 100).await.unwrap(); assert!(iter.next_action().await.unwrap().is_none()); diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 9b8ec3239e..d72109f0b1 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -471,7 +471,7 @@ impl MitoTable { regions: HashMap, object_store: ObjectStore, ) -> Result> { - let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store, None); + let manifest = TableManifest::create(&table_manifest_dir(table_dir), object_store); // TODO(dennis): save manifest version into catalog? let _manifest_version = manifest @@ -487,7 +487,7 @@ impl MitoTable { } pub(crate) fn build_manifest(table_dir: &str, object_store: ObjectStore) -> TableManifest { - TableManifest::new(&table_manifest_dir(table_dir), object_store, None) + TableManifest::create(&table_manifest_dir(table_dir), object_store) } pub(crate) async fn recover_table_info( diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 9fd9c578bb..d39e8ff41c 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+pub use opendal::raw::normalize_path as raw_normalize_path; pub use opendal::raw::oio::Pager; pub use opendal::{ layers, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey, diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs index bec107489d..51b3185ae6 100644 --- a/src/storage/src/config.rs +++ b/src/storage/src/config.rs @@ -14,8 +14,12 @@ //! storage engine config +use std::time::Duration; + #[derive(Debug, Clone)] pub struct EngineConfig { + pub manifest_checkpoint_margin: Option, + pub manifest_gc_duration: Option, pub max_files_in_l0: usize, pub max_purge_tasks: usize, } @@ -23,6 +27,8 @@ pub struct EngineConfig { impl Default for EngineConfig { fn default() -> Self { Self { + manifest_checkpoint_margin: Some(10), + manifest_gc_duration: Some(Duration::from_secs(30)), max_files_in_l0: 8, max_purge_tasks: 32, } diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 3c773a93d7..16d2f05bb1 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -21,6 +21,7 @@ use common_telemetry::logging::info; use object_store::{util, ObjectStore}; use snafu::ResultExt; use store_api::logstore::LogStore; +use store_api::manifest::Manifest; use store_api::storage::{ CreateOptions, EngineContext, OpenOptions, Region, RegionDescriptor, StorageEngine, }; @@ -290,8 +291,15 @@ impl EngineInner { let mut guard = SlotGuard::new(name, &self.regions); - let store_config = - self.region_store_config(&opts.parent_dir, opts.write_buffer_size, name, opts.ttl); + let store_config = self + .region_store_config( + &opts.parent_dir, + opts.write_buffer_size, + name, + &self.config, + opts.ttl, + ) + .await?; let region = match RegionImpl::open(name.to_string(), store_config, opts).await? { None => return Ok(None), @@ -321,12 +329,15 @@ impl EngineInner { .context(error::InvalidRegionDescSnafu { region: ®ion_name, })?; - let store_config = self.region_store_config( - &opts.parent_dir, - opts.write_buffer_size, - ®ion_name, - opts.ttl, - ); + let store_config = self + .region_store_config( + &opts.parent_dir, + opts.write_buffer_size, + ®ion_name, + &self.config, + opts.ttl, + ) + .await?; let region = RegionImpl::create(metadata, store_config).await?; @@ -342,25 +353,32 @@ impl EngineInner { slot.get_ready_region() } - fn region_store_config( + async fn region_store_config( &self, parent_dir: &str, write_buffer_size: Option, region_name: &str, + config: &EngineConfig, ttl: Option, - ) -> StoreConfig { + ) -> Result> { let parent_dir = util::normalize_dir(parent_dir); let sst_dir = ®ion_sst_dir(&parent_dir, region_name); let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone())); let manifest_dir = region_manifest_dir(&parent_dir, region_name); - let manifest = RegionManifest::with_checkpointer(&manifest_dir, self.object_store.clone()); + let manifest = RegionManifest::with_checkpointer( + &manifest_dir, + self.object_store.clone(), + config.manifest_checkpoint_margin, + config.manifest_gc_duration, + ); + manifest.start().await?; let flush_strategy = write_buffer_size .map(|size| Arc::new(SizeBasedStrategy::new(size)) as Arc<_>) .unwrap_or_else(|| self.flush_strategy.clone()); - StoreConfig { + Ok(StoreConfig { log_store: self.log_store.clone(), sst_layer, manifest, @@ -371,7 +389,7 @@ impl EngineInner { engine_config: self.config.clone(), file_purger: self.file_purger.clone(), ttl, - } + }) } } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index bec8d32716..6148dc4999 100644 --- 
a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -17,6 +17,7 @@ use std::io::Error as IoError; use std::str::Utf8Error; use common_error::prelude::*; +use common_runtime::error::Error as RuntimeError; use datatypes::arrow::error::ArrowError; use datatypes::prelude::ConcreteDataType; use serde_json::error::Error as JsonError; @@ -423,6 +424,18 @@ pub enum Error { #[snafu(display("Cannot schedule request, scheduler's already stopped"))] IllegalSchedulerState { backtrace: Backtrace }, + #[snafu(display("Failed to start manifest gc task: {}", source))] + StartManifestGcTask { + #[snafu(backtrace)] + source: RuntimeError, + }, + + #[snafu(display("Failed to stop manifest gc task: {}", source))] + StopManifestGcTask { + #[snafu(backtrace)] + source: RuntimeError, + }, + #[snafu(display("Failed to stop scheduler, source: {}", source))] StopScheduler { source: JoinError, @@ -524,7 +537,11 @@ impl ErrorExt for Error { StatusCode::Internal } DeleteSst { .. } => StatusCode::StorageUnavailable, - IllegalSchedulerState { .. } => StatusCode::Unexpected, + + StartManifestGcTask { .. } + | StopManifestGcTask { .. } + | IllegalSchedulerState { .. } => StatusCode::Unexpected, + TtlCalculation { source, .. } => source.status_code(), } } diff --git a/src/storage/src/manifest/impl_.rs b/src/storage/src/manifest/impl_.rs index e2b94025c7..b320dab5b1 100644 --- a/src/storage/src/manifest/impl_.rs +++ b/src/storage/src/manifest/impl_.rs @@ -15,43 +15,72 @@ use std::marker::PhantomData; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Duration; use arc_swap::ArcSwap; use async_trait::async_trait; +use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{debug, logging, warn}; use object_store::ObjectStore; -use snafu::ensure; +use snafu::{ensure, ResultExt}; use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion}; use store_api::manifest::*; -use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result}; +use crate::error::{ + Error, ManifestProtocolForbidWriteSnafu, Result, StartManifestGcTaskSnafu, + StopManifestGcTaskSnafu, +}; use crate::manifest::action::RegionCheckpoint; use crate::manifest::checkpoint::Checkpointer; use crate::manifest::storage::{ManifestObjectStore, ObjectStoreLogIterator}; -// TODO(dennis): export this option to table options or storage options. -const CHECKPOINT_ACTIONS_MARGIN: u64 = 10; +const CHECKPOINT_ACTIONS_MARGIN: u16 = 10; +const GC_DURATION_SECS: u64 = 30; #[derive(Clone, Debug)] pub struct ManifestImpl, M: MetaAction> { inner: Arc>, checkpointer: Option>>, last_checkpoint_version: Arc, + checkpoint_actions_margin: u16, + gc_task: Option>>, } -impl, M: MetaAction> ManifestImpl { +impl, M: 'static + MetaAction> + ManifestImpl +{ pub fn new( manifest_dir: &str, object_store: ObjectStore, + checkpoint_actions_margin: Option, + gc_duration: Option, checkpointer: Option>>, ) -> Self { + let inner = Arc::new(ManifestImplInner::new(manifest_dir, object_store)); + let gc_task = if checkpointer.is_some() { + // only start gc task when checkpoint is enabled. 
+ Some(Arc::new(RepeatedTask::new( + gc_duration.unwrap_or_else(|| Duration::from_secs(GC_DURATION_SECS)), + inner.clone() as _, + ))) + } else { + None + }; + ManifestImpl { - inner: Arc::new(ManifestImplInner::new(manifest_dir, object_store)), + inner, checkpointer, + checkpoint_actions_margin: checkpoint_actions_margin + .unwrap_or(CHECKPOINT_ACTIONS_MARGIN), last_checkpoint_version: Arc::new(AtomicU64::new(MIN_VERSION)), + gc_task, } } + pub fn create(manifest_dir: &str, object_store: ObjectStore) -> Self { + Self::new(manifest_dir, object_store, None, None, None) + } + pub fn checkpointer(&self) -> &Option>> { &self.checkpointer } @@ -100,7 +129,7 @@ impl, M: 'static + MetaAction Result { let version = self.inner.save(action_list).await?; if version - self.last_checkpoint_version.load(Ordering::Relaxed) - >= CHECKPOINT_ACTIONS_MARGIN + >= self.checkpoint_actions_margin as u64 { let s = self.do_checkpoint().await?; debug!("Manifest checkpoint, checkpoint: {:#?}", s); @@ -135,6 +164,24 @@ impl, M: 'static + MetaAction ManifestVersion { self.inner.last_version() } + + async fn start(&self) -> Result<()> { + if let Some(task) = &self.gc_task { + task.start(common_runtime::bg_runtime()) + .await + .context(StartManifestGcTaskSnafu)?; + } + + Ok(()) + } + + async fn stop(&self) -> Result<()> { + if let Some(task) = &self.gc_task { + task.stop().await.context(StopManifestGcTaskSnafu)?; + } + + Ok(()) + } } #[derive(Debug)] @@ -183,6 +230,29 @@ impl> MetaActionIterator for MetaActionIteratorImpl } } +#[async_trait::async_trait] +impl, M: MetaAction> TaskFunction + for ManifestImplInner +{ + fn name(&self) -> &str { + "region-manifest-gc" + } + + async fn call(&self) -> Result<()> { + if let Some((last_version, _)) = self.store.load_last_checkpoint().await? { + // Purge all manifest and checkpoint files before last_version. + let deleted = self.store.delete_until(last_version).await?; + debug!( + "Deleted {} logs from region manifest storage(path={}).", + deleted, + self.store.path() + ); + } + + Ok(()) + } +} + impl, M: MetaAction> ManifestImplInner { fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { let (reader_version, writer_version) = action::supported_protocol_version(); @@ -283,8 +353,8 @@ impl, M: MetaAction> ManifestImplInn // It happens when saving checkpoint successfully, but failed at saving checkpoint metadata(the "__last_checkpoint" file). // Then we try to use the old checkpoint and do the checkpoint next time. // If the old checkpoint was deleted, it's fine that we return the latest checkpoint. - // the only side effect is leaving some unused checkpoint checkpoint files. - // TODO(dennis): delete unused checkpoint files + // the only side effect is leaving some unused checkpoint checkpoint files, + // and they will be purged by gc task. warn!("The checkpoint manifest version {} in {} is greater than checkpoint metadata version {}.", self.store.path(), checkpoint.last_version(), version); if let Some((_, bytes)) = self.store.load_checkpoint(version).await? 
{ diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 2a2dc39efb..ce1457ec76 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use common_telemetry::info; @@ -115,17 +116,10 @@ impl Checkpointer for RegionManifestCheckpointer { }; manifest.save_checkpoint(&checkpoint).await?; - // TODO(dennis): background task to clean old manifest actions and checkpoints. manifest .manifest_store() .delete(start_version, last_version + 1) .await?; - if start_version > MIN_VERSION { - manifest - .manifest_store() - .delete_checkpoint(start_version - 1) - .await? - } info!("Region manifest checkpoint, start_version: {}, last_version: {}, compacted actions: {}", start_version, last_version, compacted_actions); @@ -138,10 +132,17 @@ impl Checkpointer for RegionManifestCheckpointer { } impl RegionManifest { - pub fn with_checkpointer(manifest_dir: &str, object_store: ObjectStore) -> Self { + pub fn with_checkpointer( + manifest_dir: &str, + object_store: ObjectStore, + checkpoint_actions_margin: Option, + gc_duration: Option, + ) -> Self { Self::new( manifest_dir, object_store, + checkpoint_actions_margin, + gc_duration, Some(Arc::new(RegionManifestCheckpointer { flushed_manifest_version: AtomicU64::new(0), })), @@ -184,7 +185,8 @@ mod tests { builder.root(&tmp_dir.path().to_string_lossy()); let object_store = ObjectStore::new(builder).unwrap().finish(); - let manifest = RegionManifest::with_checkpointer("/manifest/", object_store); + let manifest = RegionManifest::with_checkpointer("/manifest/", object_store, None, None); + manifest.start().await.unwrap(); let region_meta = Arc::new(build_region_meta()); @@ -274,6 +276,8 @@ mod tests { // Reach end assert!(iter.next_action().await.unwrap().is_none()); + + manifest.stop().await.unwrap(); } async fn assert_scan(manifest: &RegionManifest, start_version: ManifestVersion, expected: u64) { @@ -294,7 +298,13 @@ mod tests { builder.root(&tmp_dir.path().to_string_lossy()); let object_store = ObjectStore::new(builder).unwrap().finish(); - let manifest = RegionManifest::with_checkpointer("/manifest/", object_store); + let manifest = RegionManifest::with_checkpointer( + "/manifest/", + object_store, + None, + Some(Duration::from_millis(50)), + ); + manifest.start().await.unwrap(); let region_meta = Arc::new(build_region_meta()); let new_region_meta = Arc::new(build_altered_region_meta()); @@ -325,12 +335,15 @@ mod tests { // update flushed manifest version for doing checkpoint manifest.set_flushed_manifest_version(2); + let mut checkpoint_versions = vec![]; + // do a checkpoint let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap(); let last_checkpoint = manifest.last_checkpoint().await.unwrap().unwrap(); assert_eq!(checkpoint, last_checkpoint); assert_eq!(checkpoint.compacted_actions, 3); assert_eq!(checkpoint.last_version, 2); + checkpoint_versions.push(2); let alterd_raw_meta = RawRegionMetadata::from(new_region_meta.as_ref()); assert!(matches!(&checkpoint.checkpoint, Some(RegionManifestData { committed_sequence: 99, @@ -376,8 +389,8 @@ mod tests { } assert_scan(&manifest, 3, 2).await; - // do another checkpoints + // do another checkpoints // compacted RegionChange manifest.set_flushed_manifest_version(3); let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap(); @@ -385,6 +398,7 @@ mod tests { 
assert_eq!(checkpoint, last_checkpoint); assert_eq!(checkpoint.compacted_actions, 1); assert_eq!(checkpoint.last_version, 3); + checkpoint_versions.push(3); assert!(matches!(&checkpoint.checkpoint, Some(RegionManifestData { committed_sequence: 200, metadata, @@ -406,6 +420,7 @@ mod tests { assert_eq!(checkpoint, last_checkpoint); assert_eq!(checkpoint.compacted_actions, 1); assert_eq!(checkpoint.last_version, 4); + checkpoint_versions.push(4); assert!(matches!(&checkpoint.checkpoint, Some(RegionManifestData { committed_sequence: 200, metadata, @@ -427,5 +442,26 @@ mod tests { .await .unwrap() .is_none()); + + // wait for gc + tokio::time::sleep(Duration::from_millis(60)).await; + + for v in checkpoint_versions { + if v < 4 { + // ensure old checkpoints were purged. + assert!(manifest + .manifest_store() + .load_checkpoint(v) + .await + .unwrap() + .is_none()); + } else { + // the last checkpoints is still exists. + let last_checkpoint = manifest.last_checkpoint().await.unwrap().unwrap(); + assert_eq!(checkpoint, last_checkpoint); + } + } + + manifest.stop().await.unwrap(); } } diff --git a/src/storage/src/manifest/storage.rs b/src/storage/src/manifest/storage.rs index ac2736c513..92a420472d 100644 --- a/src/storage/src/manifest/storage.rs +++ b/src/storage/src/manifest/storage.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use common_telemetry::logging; use futures::TryStreamExt; use lazy_static::lazy_static; -use object_store::{util, Entry, ErrorKind, ObjectStore}; +use object_store::{raw_normalize_path, util, Entry, ErrorKind, ObjectStore}; use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -31,7 +31,8 @@ use crate::error::{ }; lazy_static! { - static ref RE: Regex = Regex::new("^\\d+\\.json$").unwrap(); + static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json$").unwrap(); + static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap(); } const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint"; @@ -46,20 +47,24 @@ pub fn checkpoint_file(version: ManifestVersion) -> String { format!("{version:020}.checkpoint") } -/// Return's the delta file version from path +/// Return's the file manifest version from path /// /// # Panics -/// Panics if the file path is not a valid delta file. +/// Panics if the file path is not a valid delta or checkpoint file. 
#[inline] -pub fn delta_version(path: &str) -> ManifestVersion { +pub fn file_version(path: &str) -> ManifestVersion { let s = path.split('.').next().unwrap(); - s.parse() - .unwrap_or_else(|_| panic!("Invalid delta file: {path}")) + s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}")) } #[inline] pub fn is_delta_file(file_name: &str) -> bool { - RE.is_match(file_name) + DELTA_RE.is_match(file_name) +} + +#[inline] +pub fn is_checkpoint_file(file_name: &str) -> bool { + CHECKPOINT_RE.is_match(file_name) } pub struct ObjectStoreLogIterator { @@ -162,7 +167,7 @@ impl ManifestLogStorage for ManifestObjectStore { .try_filter_map(|e| async move { let file_name = e.name(); if is_delta_file(file_name) { - let version = delta_version(file_name); + let version = file_version(file_name); if version >= start && version < end { Ok(Some((version, e))) } else { @@ -184,6 +189,48 @@ impl ManifestLogStorage for ManifestObjectStore { }) } + async fn delete_until(&self, end: ManifestVersion) -> Result { + let streamer = self + .object_store + .list(&self.path) + .await + .context(ListObjectsSnafu { path: &self.path })?; + + let paths: Vec<_> = streamer + .try_filter_map(|e| async move { + let file_name = e.name(); + if is_delta_file(file_name) || is_checkpoint_file(file_name) { + let version = file_version(file_name); + if version < end { + Ok(Some(e.path().to_string())) + } else { + Ok(None) + } + } else { + Ok(None) + } + }) + .try_collect::>() + .await + .context(ListObjectsSnafu { path: &self.path })?; + let ret = paths.len(); + + logging::debug!( + "Deleting {} logs from manifest storage path {}.", + ret, + self.path + ); + + self.object_store + .remove(paths) + .await + .with_context(|_| DeleteObjectSnafu { + path: self.path.clone(), + })?; + + Ok(ret) + } + async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.delta_file_path(version); self.object_store @@ -193,14 +240,22 @@ impl ManifestLogStorage for ManifestObjectStore { } async fn delete(&self, start: ManifestVersion, end: ManifestVersion) -> Result<()> { - //TODO(dennis): delete in batch or concurrently? 
- for v in start..end { - let path = self.delta_file_path(v); - self.object_store - .delete(&path) - .await - .context(DeleteObjectSnafu { path })?; - } + let raw_paths = (start..end) + .map(|v| self.delta_file_path(v)) + .collect::>(); + + let paths = raw_paths + .iter() + .map(|p| raw_normalize_path(p)) + .collect::>(); + + self.object_store + .remove(paths) + .await + .with_context(|_| DeleteObjectSnafu { + path: raw_paths.join(","), + })?; + Ok(()) } @@ -243,13 +298,11 @@ impl ManifestLogStorage for ManifestObjectStore { version: ManifestVersion, ) -> Result)>> { let path = self.checkpoint_file_path(version); - let checkpoint = self - .object_store - .read(&path) - .await - .context(ReadObjectSnafu { path })?; - - Ok(Some((version, checkpoint))) + match self.object_store.read(&path).await { + Ok(checkpoint) => Ok(Some((version, checkpoint))), + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(ReadObjectSnafu { path }), + } } async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<()> { @@ -351,5 +404,22 @@ mod tests { let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap(); assert_eq!(checkpoint, "checkpoint".as_bytes()); assert_eq!(3, v); + + //delete (,4) logs and checkpoints + log_store.delete_until(4).await.unwrap(); + assert!(log_store.load_checkpoint(3).await.unwrap().is_none()); + assert!(log_store.load_last_checkpoint().await.unwrap().is_none()); + let mut it = log_store.scan(0, 11).await.unwrap(); + let (version, bytes) = it.next_log().await.unwrap().unwrap(); + assert_eq!(4, version); + assert_eq!("hello, 4".as_bytes(), bytes); + assert!(it.next_log().await.unwrap().is_none()); + + // delete all logs and checkpoints + log_store.delete_until(11).await.unwrap(); + assert!(log_store.load_checkpoint(3).await.unwrap().is_none()); + assert!(log_store.load_last_checkpoint().await.unwrap().is_none()); + let mut it = log_store.scan(0, 11).await.unwrap(); + assert!(it.next_log().await.unwrap().is_none()); } } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 69694c1efe..38bbb74536 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -629,7 +629,8 @@ impl RegionInner { } async fn close(&self) -> Result<()> { - self.writer.close().await + self.writer.close().await?; + self.manifest.stop().await } async fn flush(&self, ctx: &FlushContext) -> Result<()> { diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 4c27a0892e..27319dacc1 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -314,7 +314,8 @@ async fn test_recover_region_manifets() { builder.root(&tmp_dir.path().to_string_lossy()); let object_store = ObjectStore::new(builder).unwrap().finish(); - let manifest = RegionManifest::with_checkpointer("/manifest/", object_store.clone()); + let manifest = + RegionManifest::with_checkpointer("/manifest/", object_store.clone(), None, None); let region_meta = Arc::new(build_region_meta()); let sst_layer = Arc::new(FsAccessLayer::new("sst", object_store)) as _; diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 46356bfdac..e498d5869a 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -18,6 +18,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; use object_store::services::Fs; use object_store::ObjectStore; +use store_api::manifest::Manifest; use crate::background::JobPoolImpl; 
use crate::compaction::noop::NoopCompactionScheduler; @@ -57,7 +58,8 @@ pub async fn new_store_config_with_object_store( let manifest_dir = engine::region_manifest_dir(parent_dir, region_name); let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone())); - let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store); + let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store, None, None); + manifest.start().await.unwrap(); let job_pool = Arc::new(JobPoolImpl {}); let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); let log_config = LogConfig { diff --git a/src/store-api/src/manifest.rs b/src/store-api/src/manifest.rs index 1806afc76f..985339c694 100644 --- a/src/store-api/src/manifest.rs +++ b/src/store-api/src/manifest.rs @@ -102,4 +102,13 @@ pub trait Manifest: Send + Sync + Clone + 'static { /// Returns the last(or latest) manifest version. fn last_version(&self) -> ManifestVersion; + + /// Start the service + async fn start(&self) -> Result<(), Self::Error> { + Ok(()) + } + /// Stop the service + async fn stop(&self) -> Result<(), Self::Error> { + Ok(()) + } } diff --git a/src/store-api/src/manifest/storage.rs b/src/store-api/src/manifest/storage.rs index d7e7578c90..c93c1c9808 100644 --- a/src/store-api/src/manifest/storage.rs +++ b/src/store-api/src/manifest/storage.rs @@ -36,6 +36,10 @@ pub trait ManifestLogStorage { end: ManifestVersion, ) -> Result; + /// Delete logs which version is less than specified version. + /// Returns the delete logs number. + async fn delete_until(&self, version: ManifestVersion) -> Result; + /// Save a log async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<(), Self::Error>; diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index bbb5b8b25a..090311bd56 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -24,7 +24,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datanode::datanode::{ - DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, WalConfig, + DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, StorageConfig, WalConfig, }; use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::{Instance, InstanceRef}; @@ -100,8 +100,7 @@ fn get_test_store_config( access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap(), bucket: env::var("GT_OSS_BUCKET").unwrap(), endpoint: env::var("GT_OSS_ENDPOINT").unwrap(), - cache_path: None, - cache_capacity: None, + ..Default::default() }; let mut builder = Oss::default(); @@ -127,10 +126,7 @@ fn get_test_store_config( access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap(), secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap(), bucket: env::var("GT_S3_BUCKET").unwrap(), - endpoint: None, - region: None, - cache_path: None, - cache_capacity: None, + ..Default::default() }; let mut builder = S3::default(); @@ -189,14 +185,17 @@ pub fn create_tmp_dir_and_datanode_opts( ) -> (DatanodeOptions, TestGuard) { let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); - let (storage, data_tmp_dir) = get_test_store_config(&store_type, name); + let (store, data_tmp_dir) = get_test_store_config(&store_type, name); let opts = DatanodeOptions { wal: WalConfig { dir: wal_tmp_dir.path().to_str().unwrap().to_string(), ..Default::default() }, - storage, + storage: StorageConfig { + 
store, + ..Default::default() + }, mode: Mode::Standalone, ..Default::default() };
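
Editor's note (appended; not part of the patch): this change introduces a reusable `RepeatedTask`/`TaskFunction` abstraction in `common-runtime` and moves both the raft-engine log purge loop and the new region-manifest gc loop onto it. The sketch below mirrors the unit test added in `src/common/runtime/src/repeated_task.rs`. The generic error parameter is reconstructed (the diff rendering above strips angle-bracket generics) and the `#[tokio::main]` harness is only an illustration, so treat this as a hedged usage sketch, not code from the patch.

```rust
// Sketch only: follows the test in repeated_task.rs added by this patch.
// Assumptions: the trait/struct are generic over an error type (stripped in the
// rendering above), here taken to be common_runtime::error::Error.
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_runtime::error::{Error, Result};
use common_runtime::{RepeatedTask, TaskFunction};

struct TickTask {
    n: AtomicI32,
}

#[async_trait::async_trait]
impl TaskFunction<Error> for TickTask {
    fn name(&self) -> &str {
        "tick-task"
    }

    async fn call(&self) -> Result<()> {
        // The body runs once per interval until `stop()` cancels the loop.
        self.n.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
}

// `#[tokio::main]` is illustrative; inside GreptimeDB the task is spawned on
// one of the crate's own runtimes instead.
#[tokio::main]
async fn main() -> Result<()> {
    let task_fn = Arc::new(TickTask { n: AtomicI32::new(0) });

    // Run every 100ms on the shared background runtime, as the log-store and
    // manifest gc tasks in this patch do via common_runtime::bg_runtime().
    let task: RepeatedTask<Error> = RepeatedTask::new(Duration::from_millis(100), task_fn.clone());
    task.start(common_runtime::bg_runtime()).await?;
    tokio::time::sleep(Duration::from_millis(550)).await;
    task.stop().await?;

    // Roughly 5 ticks over 550ms with a 100ms interval.
    println!("ticks: {}", task_fn.n.load(Ordering::Relaxed));
    Ok(())
}
```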
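
A second appended note: the values behind the new `[storage.manifest]` config section (`checkpoint_margin`, `gc_duration`) flow through `EngineConfig` into `RegionManifest::with_checkpointer`, and the manifest must now be `start()`ed (which spawns the gc `RepeatedTask`) and is `stop()`ped from `RegionInner::close()`. The sketch below follows the tests in `src/storage/src/manifest/region.rs`; the module path for `RegionManifest` and the `Fs` builder usage are inferred from the surrounding diff, so it is illustrative rather than verbatim.

```rust
// Sketch of the manifest lifecycle added by this patch, written as it would sit
// inside the storage crate (RegionManifest is crate-internal). The import path
// below is assumed from src/storage/src/manifest/region.rs.
use std::time::Duration;

use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::manifest::Manifest;

use crate::manifest::region::RegionManifest;

async fn manifest_lifecycle() {
    // Local file-backed object store, as in the patch's tests.
    let tmp_dir = create_temp_dir("manifest-gc-example");
    let mut builder = Fs::default();
    builder.root(&tmp_dir.path().to_string_lossy());
    let object_store = ObjectStore::new(builder).unwrap().finish();

    // Checkpoint every 10 actions and purge stale deltas/checkpoints every 30s:
    // the same defaults exposed as `[storage.manifest]` in the example configs.
    let manifest = RegionManifest::with_checkpointer(
        "/manifest/",
        object_store,
        Some(10),                      // checkpoint_margin
        Some(Duration::from_secs(30)), // gc_duration
    );

    // start() launches the RepeatedTask that calls delete_until(last_checkpoint)
    // on the manifest store; stop() is invoked when the region closes.
    manifest.start().await.unwrap();
    // ... save actions, do_checkpoint(), etc. ...
    manifest.stop().await.unwrap();
}
```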