diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index c68f379c9c..57654e3f35 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -26,5 +26,6 @@ connect_timeout_millis = 5000
 tcp_nodelay = false
 
 [compaction]
-max_inflight_task = 4
-max_file_in_level0 = 16
+max_inflight_tasks = 4
+max_files_in_level0 = 16
+max_purge_tasks = 32
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index f17542c86a..e67e0533cc 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -184,8 +184,9 @@ mod tests {
 
         assert_eq!(
             CompactionConfig {
-                max_inflight_task: 4,
-                max_file_in_level0: 16,
+                max_inflight_tasks: 4,
+                max_files_in_level0: 16,
+                max_purge_tasks: 32,
             },
             options.compaction
         );
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index edb9e18a8d..8d2a9f25a2 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -110,16 +110,19 @@ impl Default for WalConfig {
 #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
 pub struct CompactionConfig {
     /// Max task number that can concurrently run.
-    pub max_inflight_task: usize,
+    pub max_inflight_tasks: usize,
     /// Max files in level 0 to trigger compaction.
-    pub max_file_in_level0: usize,
+    pub max_files_in_level0: usize,
+    /// Max task number for SST purge task after compaction.
+    pub max_purge_tasks: usize,
 }
 
 impl Default for CompactionConfig {
     fn default() -> Self {
         Self {
-            max_inflight_task: 4,
-            max_file_in_level0: 8,
+            max_inflight_tasks: 4,
+            max_files_in_level0: 8,
+            max_purge_tasks: 32,
         }
     }
 }
@@ -127,7 +130,7 @@ impl Default for CompactionConfig {
 impl From<&DatanodeOptions> for SchedulerConfig {
     fn from(value: &DatanodeOptions) -> Self {
         Self {
-            max_inflight_task: value.compaction.max_inflight_task,
+            max_inflight_tasks: value.compaction.max_inflight_tasks,
         }
     }
 }
@@ -135,7 +138,8 @@ impl From<&DatanodeOptions> for SchedulerConfig {
 impl From<&DatanodeOptions> for StorageEngineConfig {
     fn from(value: &DatanodeOptions) -> Self {
         Self {
-            max_files_in_l0: value.compaction.max_file_in_level0,
+            max_files_in_l0: value.compaction.max_files_in_level0,
+            max_purge_tasks: value.compaction.max_purge_tasks,
         }
     }
 }
diff --git a/src/storage/src/compaction/noop.rs b/src/storage/src/compaction/noop.rs
index 3ee3879051..0006226852 100644
--- a/src/storage/src/compaction/noop.rs
+++ b/src/storage/src/compaction/noop.rs
@@ -82,11 +82,11 @@ where
 {
     type Request = R;
 
-    async fn schedule(&self, _request: Self::Request) -> crate::error::Result<bool> {
+    fn schedule(&self, _request: Self::Request) -> crate::error::Result<bool> {
         Ok(true)
     }
 
-    async fn stop(&self) -> crate::error::Result<()> {
+    async fn stop(&self, _await_termination: bool) -> crate::error::Result<()> {
         Ok(())
     }
 }
diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs
index e8dad10f42..921e0db360 100644
--- a/src/storage/src/compaction/scheduler.rs
+++ b/src/storage/src/compaction/scheduler.rs
@@ -109,253 +109,3 @@ where
         Ok(())
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use std::time::Duration;
-
-    use super::*;
-    use crate::scheduler::dedup_deque::DedupDeque;
-    use crate::scheduler::rate_limit::{
-        BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter,
-    };
-    use crate::scheduler::{HandlerLoop, LocalScheduler, Scheduler, SchedulerConfig};
-
-    struct CountdownLatch {
-        counter: std::sync::Mutex<usize>,
-        notifies: std::sync::RwLock<Vec<Arc<Notify>>>,
-    }
-
-    impl CountdownLatch {
-        fn new(size: usize) -> Self {
-            Self {
-                counter: std::sync::Mutex::new(size),
notifies: std::sync::RwLock::new(vec![]), - } - } - - fn countdown(&self) { - let mut counter = self.counter.lock().unwrap(); - if *counter >= 1 { - *counter -= 1; - if *counter == 0 { - let notifies = self.notifies.read().unwrap(); - for waiter in notifies.iter() { - waiter.notify_one(); - } - } - } - } - - async fn wait(&self) { - let notify = Arc::new(Notify::new()); - { - let notify = notify.clone(); - let mut notifies = self.notifies.write().unwrap(); - notifies.push(notify); - } - notify.notified().await - } - } - - #[tokio::test] - async fn test_schedule_handler() { - common_telemetry::init_default_ut_logging(); - let queue = Arc::new(std::sync::RwLock::new(DedupDeque::default())); - let latch = Arc::new(CountdownLatch::new(2)); - let latch_cloned = latch.clone(); - let handler = Arc::new(HandlerLoop { - req_queue: queue.clone(), - cancel_token: Default::default(), - task_notifier: Arc::new(Default::default()), - request_handler: MockHandler { - cb: move || { - latch_cloned.countdown(); - }, - }, - limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new( - MaxInflightTaskLimiter::new(3), - )])), - }); - - let handler_cloned = handler.clone(); - common_runtime::spawn_bg(async move { handler_cloned.run().await }); - - queue.write().unwrap().push_back(1, MockRequest::default()); - handler.task_notifier.notify_one(); - queue.write().unwrap().push_back(2, MockRequest::default()); - handler.task_notifier.notify_one(); - - tokio::time::timeout(Duration::from_secs(1), latch.wait()) - .await - .unwrap(); - } - - #[derive(Default, Debug)] - struct MockRequest { - region_id: RegionId, - } - - struct MockHandler { - cb: F, - } - - #[async_trait::async_trait] - impl Handler for MockHandler - where - F: Fn() + Send + Sync, - { - type Request = MockRequest; - - async fn handle_request( - &self, - _req: Self::Request, - token: BoxedRateLimitToken, - finish_notifier: Arc, - ) -> Result<()> { - (self.cb)(); - token.try_release(); - finish_notifier.notify_one(); - Ok(()) - } - } - - impl Request for MockRequest { - type Key = RegionId; - - fn key(&self) -> Self::Key { - self.region_id - } - } - - #[tokio::test] - async fn test_scheduler() { - let latch = Arc::new(CountdownLatch::new(2)); - let latch_cloned = latch.clone(); - - let handler = MockHandler { - cb: move || { - latch_cloned.countdown(); - }, - }; - let scheduler: LocalScheduler = LocalScheduler::new( - SchedulerConfig { - max_inflight_task: 3, - }, - handler, - ); - - scheduler - .schedule(MockRequest { region_id: 1 }) - .await - .unwrap(); - - scheduler - .schedule(MockRequest { region_id: 2 }) - .await - .unwrap(); - - tokio::time::timeout(Duration::from_secs(1), latch.wait()) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_scheduler_many() { - common_telemetry::init_default_ut_logging(); - let task_size = 100; - - let latch = Arc::new(CountdownLatch::new(task_size)); - let latch_clone = latch.clone(); - - let handler = MockHandler { - cb: move || { - latch_clone.countdown(); - }, - }; - - let config = SchedulerConfig { - max_inflight_task: 3, - }; - let scheduler = LocalScheduler::new(config, handler); - - for i in 0..task_size { - scheduler - .schedule(MockRequest { - region_id: i as RegionId, - }) - .await - .unwrap(); - } - - tokio::time::timeout(Duration::from_secs(3), latch.wait()) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_scheduler_interval() { - common_telemetry::init_default_ut_logging(); - let task_size = 100; - let latch = Arc::new(CountdownLatch::new(task_size)); - let latch_clone = 
latch.clone(); - - let handler = MockHandler { - cb: move || { - latch_clone.countdown(); - }, - }; - - let config = SchedulerConfig { - max_inflight_task: 3, - }; - let scheduler = LocalScheduler::new(config, handler); - - for i in 0..task_size / 2 { - scheduler - .schedule(MockRequest { - region_id: i as RegionId, - }) - .await - .unwrap(); - } - - tokio::time::sleep(Duration::from_millis(100)).await; - for i in task_size / 2..task_size { - scheduler - .schedule(MockRequest { - region_id: i as RegionId, - }) - .await - .unwrap(); - } - - tokio::time::timeout(Duration::from_secs(6), latch.wait()) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_schedule_duplicate_tasks() { - common_telemetry::init_default_ut_logging(); - let handler = MockHandler { cb: || {} }; - let config = SchedulerConfig { - max_inflight_task: 3, - }; - let scheduler = LocalScheduler::new(config, handler); - - let mut scheduled_task = 0; - for _ in 0..10 { - if scheduler - .schedule(MockRequest { region_id: 1 }) - .await - .unwrap() - { - scheduled_task += 1; - } - } - scheduler.stop().await.unwrap(); - debug!("Schedule tasks: {}", scheduled_task); - assert!(scheduled_task < 10); - } -} diff --git a/src/storage/src/compaction/strategy.rs b/src/storage/src/compaction/strategy.rs index e9fff670e8..24186b19ef 100644 --- a/src/storage/src/compaction/strategy.rs +++ b/src/storage/src/compaction/strategy.rs @@ -165,9 +165,10 @@ fn fit_time_bucket(span_sec: i64) -> i64 { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use super::*; + use crate::file_purger::noop::new_noop_file_purger; use crate::sst::FileMeta; #[test] @@ -227,14 +228,21 @@ mod tests { } fn new_file_handle(name: &str, start_ts_millis: i64, end_ts_millis: i64) -> FileHandle { - FileHandle::new(FileMeta { - file_name: name.to_string(), - time_range: Some(( - Timestamp::new_millisecond(start_ts_millis), - Timestamp::new_millisecond(end_ts_millis), - )), - level: 0, - }) + let file_purger = new_noop_file_purger(); + let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}); + FileHandle::new( + FileMeta { + region_id: 0, + file_name: name.to_string(), + time_range: Some(( + Timestamp::new_millisecond(start_ts_millis), + Timestamp::new_millisecond(end_ts_millis), + )), + level: 0, + }, + layer, + file_purger, + ) } fn new_file_handles(input: &[(&str, i64, i64)]) -> Vec { diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index c795197670..501c0a3314 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -17,6 +17,7 @@ use std::fmt::{Debug, Formatter}; use common_telemetry::{error, info}; use store_api::logstore::LogStore; +use store_api::storage::RegionId; use uuid::Uuid; use crate::compaction::writer::build_sst_reader; @@ -62,11 +63,12 @@ impl CompactionTaskImpl { async fn merge_ssts(&mut self) -> Result<(Vec, Vec)> { let mut futs = Vec::with_capacity(self.outputs.len()); let mut compacted_inputs = HashSet::new(); - + let region_id = self.shared_data.id(); for output in self.outputs.drain(..) { let schema = self.schema.clone(); let sst_layer = self.sst_layer.clone(); compacted_inputs.extend(output.inputs.iter().map(|f| FileMeta { + region_id, file_name: f.file_name().to_string(), time_range: *f.time_range(), level: f.level(), @@ -74,7 +76,7 @@ impl CompactionTaskImpl { // TODO(hl): Maybe spawn to runtime to exploit in-job parallelism. 
futs.push(async move { - match output.build(schema, sst_layer).await { + match output.build(region_id, schema, sst_layer).await { Ok(meta) => Ok(meta), Err(e) => Err(e), } @@ -118,7 +120,7 @@ impl CompactionTaskImpl { fn mark_files_compacting(&self, compacting: bool) { for o in &self.outputs { for input in &o.inputs { - input.set_compacting(compacting); + input.mark_compacting(compacting); } } } @@ -157,7 +159,12 @@ pub struct CompactionOutput { } impl CompactionOutput { - async fn build(&self, schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Result { + async fn build( + &self, + region_id: RegionId, + schema: RegionSchemaRef, + sst_layer: AccessLayerRef, + ) -> Result { let reader = build_sst_reader( schema, sst_layer.clone(), @@ -174,6 +181,7 @@ impl CompactionOutput { .await?; Ok(FileMeta { + region_id, file_name: output_file_name, time_range, level: self.output_level, diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index 880a13f2f3..61a891ff1a 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -97,6 +97,7 @@ mod tests { use tempdir::TempDir; use super::*; + use crate::file_purger::noop::new_noop_file_purger; use crate::memtable::{ DefaultMemtableBuilder, IterContext, KeyValues, Memtable, MemtableBuilder, }; @@ -224,11 +225,16 @@ mod tests { .write_sst(&sst::WriteOptions::default()) .await .unwrap(); - let handle = FileHandle::new(FileMeta { - file_name: sst_file_name.to_string(), - time_range, - level: 0, - }); + let handle = FileHandle::new( + FileMeta { + region_id: 0, + file_name: sst_file_name.to_string(), + time_range, + level: 0, + }, + Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}), + new_noop_file_purger(), + ); seq.fetch_add(1, Ordering::Relaxed); handle } @@ -452,11 +458,16 @@ mod tests { let output_files = ["o1.parquet", "o2.parquet", "o3.parquet"] .into_iter() .map(|f| { - FileHandle::new(FileMeta { - file_name: f.to_string(), - level: 1, - time_range: None, - }) + FileHandle::new( + FileMeta { + region_id: 0, + file_name: f.to_string(), + level: 1, + time_range: None, + }, + Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}), + new_noop_file_purger(), + ) }) .collect::>(); diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs index 26f2741461..bec107489d 100644 --- a/src/storage/src/config.rs +++ b/src/storage/src/config.rs @@ -17,10 +17,14 @@ #[derive(Debug, Clone)] pub struct EngineConfig { pub max_files_in_l0: usize, + pub max_purge_tasks: usize, } impl Default for EngineConfig { fn default() -> Self { - Self { max_files_in_l0: 8 } + Self { + max_files_in_l0: 8, + max_purge_tasks: 32, + } } } diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 0f8bd36c83..cd3050f1ee 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -28,11 +28,13 @@ use crate::background::JobPoolImpl; use crate::compaction::CompactionSchedulerRef; use crate::config::EngineConfig; use crate::error::{self, Error, Result}; +use crate::file_purger::{FilePurgeHandler, FilePurgerRef}; use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy}; use crate::manifest::region::RegionManifest; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::metadata::RegionMetadata; use crate::region::{RegionImpl, StoreConfig}; +use crate::scheduler::{LocalScheduler, SchedulerConfig}; use crate::sst::FsAccessLayer; /// [StorageEngine] implementation. 
@@ -222,6 +224,7 @@ struct EngineInner { flush_scheduler: FlushSchedulerRef, flush_strategy: FlushStrategyRef, compaction_scheduler: CompactionSchedulerRef, + file_purger: FilePurgerRef, config: Arc, } @@ -234,6 +237,13 @@ impl EngineInner { ) -> Self { let job_pool = Arc::new(JobPoolImpl {}); let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); + + let file_purger = Arc::new(LocalScheduler::new( + SchedulerConfig { + max_inflight_tasks: config.max_purge_tasks, + }, + FilePurgeHandler, + )); Self { object_store, log_store, @@ -242,6 +252,7 @@ impl EngineInner { flush_scheduler, flush_strategy: Arc::new(SizeBasedStrategy::default()), compaction_scheduler, + file_purger, config: Arc::new(config), } } @@ -341,6 +352,7 @@ impl EngineInner { flush_strategy: self.flush_strategy.clone(), compaction_scheduler: self.compaction_scheduler.clone(), engine_config: self.config.clone(), + file_purger: self.file_purger.clone(), } } } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 14d0e0a4bf..339711a4c6 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -420,11 +420,20 @@ pub enum Error { #[snafu(display("Scheduler rate limited, msg: {}", msg))] RateLimited { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to stop scheduler, source: {:?}", source))] + #[snafu(display("Cannot schedule request, scheduler's already stopped"))] + IllegalSchedulerState { backtrace: Backtrace }, + + #[snafu(display("Failed to stop scheduler, source: {}", source))] StopScheduler { source: JoinError, backtrace: Backtrace, }, + + #[snafu(display("Failed to delete SST file, source: {}", source))] + DeleteSst { + source: object_store::Error, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -497,6 +506,8 @@ impl ErrorExt for Error { DecodeParquetTimeRange { .. } => StatusCode::Unexpected, RateLimited { .. } => StatusCode::Internal, StopScheduler { .. } => StatusCode::Internal, + DeleteSst { .. } => StatusCode::StorageUnavailable, + IllegalSchedulerState { .. } => StatusCode::Unexpected, } } diff --git a/src/storage/src/file_purger.rs b/src/storage/src/file_purger.rs new file mode 100644 index 0000000000..f395e37914 --- /dev/null +++ b/src/storage/src/file_purger.rs @@ -0,0 +1,233 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use common_telemetry::{debug, error}; +use store_api::storage::RegionId; +use tokio::sync::Notify; + +use crate::scheduler::rate_limit::{BoxedRateLimitToken, RateLimitToken}; +use crate::scheduler::{Handler, LocalScheduler, Request}; +use crate::sst::AccessLayerRef; + +pub struct FilePurgeRequest { + pub region_id: RegionId, + pub file_name: String, + pub sst_layer: AccessLayerRef, +} + +impl Request for FilePurgeRequest { + type Key = String; + + fn key(&self) -> Self::Key { + format!("{}/{}", self.region_id, self.file_name) + } +} + +pub struct FilePurgeHandler; + +#[async_trait::async_trait] +impl Handler for FilePurgeHandler { + type Request = FilePurgeRequest; + + async fn handle_request( + &self, + req: Self::Request, + token: BoxedRateLimitToken, + finish_notifier: Arc, + ) -> crate::error::Result<()> { + req.sst_layer + .delete_sst(&req.file_name) + .await + .map_err(|e| { + error!(e; "Failed to delete SST file, file: {}, region: {}", req.file_name, req.region_id); + e + })?; + debug!( + "Successfully deleted SST file: {}, region: {}", + req.file_name, req.region_id + ); + token.try_release(); + finish_notifier.notify_one(); + Ok(()) + } +} + +pub type FilePurgerRef = Arc>; + +#[cfg(test)] +pub mod noop { + use std::sync::Arc; + + use tokio::sync::Notify; + + use crate::file_purger::{FilePurgeRequest, FilePurgerRef}; + use crate::scheduler::rate_limit::{BoxedRateLimitToken, RateLimitToken}; + use crate::scheduler::{Handler, LocalScheduler, SchedulerConfig}; + + pub fn new_noop_file_purger() -> FilePurgerRef { + Arc::new(LocalScheduler::new( + SchedulerConfig::default(), + NoopFilePurgeHandler, + )) + } + + #[derive(Debug)] + pub struct NoopFilePurgeHandler; + + #[async_trait::async_trait] + impl Handler for NoopFilePurgeHandler { + type Request = FilePurgeRequest; + + async fn handle_request( + &self, + _req: Self::Request, + token: BoxedRateLimitToken, + finish_notifier: Arc, + ) -> crate::error::Result<()> { + token.try_release(); + finish_notifier.notify_one(); + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use object_store::backend::fs::Builder; + use object_store::ObjectStore; + use store_api::storage::OpType; + use tempdir::TempDir; + + use super::*; + use crate::file_purger::noop::NoopFilePurgeHandler; + use crate::memtable::tests::{schema_for_test, write_kvs}; + use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder}; + use crate::scheduler::{Scheduler, SchedulerConfig}; + use crate::sst::{AccessLayer, FileHandle, FileMeta, FsAccessLayer, Source, WriteOptions}; + + struct MockRateLimitToken; + + impl RateLimitToken for MockRateLimitToken { + fn try_release(&self) {} + } + + async fn create_sst_file( + os: ObjectStore, + sst_file_name: &str, + file_purger: FilePurgerRef, + ) -> (FileHandle, String, AccessLayerRef) { + let schema = schema_for_test(); + let memtable = DefaultMemtableBuilder::default().build(schema.clone()); + + write_kvs( + &*memtable, + 10, + OpType::Put, + &[(1, 1), (2, 2)], + &[(Some(1), Some(1)), (Some(2), Some(2))], + ); + + let iter = memtable.iter(&IterContext::default()).unwrap(); + let sst_path = "table1"; + let layer = Arc::new(FsAccessLayer::new(sst_path, os.clone())); + let _sst_info = layer + .write_sst(sst_file_name, Source::Iter(iter), &WriteOptions {}) + .await + .unwrap(); + + ( + FileHandle::new( + FileMeta { + region_id: 0, + file_name: sst_file_name.to_string(), + time_range: None, + level: 0, + }, + layer.clone(), + file_purger, + ), + sst_path.to_string(), + layer as _, + ) + } + + 
#[tokio::test] + async fn test_file_purger_handler() { + let dir = TempDir::new("file-purge").unwrap(); + let object_store = ObjectStore::new( + Builder::default() + .root(dir.path().to_str().unwrap()) + .build() + .unwrap(), + ); + let sst_file_name = "test-file-purge-handler.parquet"; + + let noop_file_purger = Arc::new(LocalScheduler::new( + SchedulerConfig::default(), + NoopFilePurgeHandler, + )); + let (file, path, layer) = + create_sst_file(object_store.clone(), sst_file_name, noop_file_purger).await; + let request = FilePurgeRequest { + region_id: 0, + file_name: file.file_name().to_string(), + sst_layer: layer, + }; + + let handler = FilePurgeHandler; + let notify = Arc::new(Notify::new()); + handler + .handle_request(request, Box::new(MockRateLimitToken {}), notify.clone()) + .await + .unwrap(); + + notify.notified().await; + + let object = object_store.object(&format!("{}/{}", path, sst_file_name)); + assert!(!object.is_exist().await.unwrap()); + } + + #[tokio::test] + async fn test_file_purge_loop() { + common_telemetry::init_default_ut_logging(); + let dir = TempDir::new("file-purge").unwrap(); + let object_store = ObjectStore::new( + Builder::default() + .root(dir.path().to_str().unwrap()) + .build() + .unwrap(), + ); + let sst_file_name = "test-file-purger.parquet"; + let scheduler = Arc::new(LocalScheduler::new( + SchedulerConfig::default(), + FilePurgeHandler, + )); + let (handle, path, _layer) = + create_sst_file(object_store.clone(), sst_file_name, scheduler.clone()).await; + + { + // mark file as deleted and drop the handle, we expect the file is deleted. + handle.mark_deleted(); + drop(handle); + } + scheduler.stop(true).await.unwrap(); + assert!(!object_store + .object(&format!("{}/{}", path, sst_file_name)) + .is_exist() + .await + .unwrap()); + } +} diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 278a3aa4dc..2e1ac35a66 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -174,6 +174,7 @@ impl FlushJob { return CancelledSnafu {}.fail(); } + let region_id = self.shared.id(); let mut futures = Vec::with_capacity(self.memtables.len()); let iter_ctx = IterContext { for_flush: true, @@ -198,6 +199,7 @@ impl FlushJob { .await?; Ok(FileMeta { + region_id, file_name, time_range, level: 0, diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index a1b0382aae..85f00644da 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -40,3 +40,4 @@ mod wal; pub mod write_batch; pub use engine::EngineImpl; +mod file_purger; diff --git a/src/storage/src/manifest/test_utils.rs b/src/storage/src/manifest/test_utils.rs index cf960e78a9..1ff4a71028 100644 --- a/src/storage/src/manifest/test_utils.rs +++ b/src/storage/src/manifest/test_utils.rs @@ -41,6 +41,7 @@ pub fn build_region_edit( files_to_add: files_to_add .iter() .map(|f| FileMeta { + region_id: 0, file_name: f.to_string(), time_range: None, level: 0, @@ -49,6 +50,7 @@ pub fn build_region_edit( files_to_remove: files_to_remove .iter() .map(|f| FileMeta { + region_id: 0, file_name: f.to_string(), time_range: None, level: 0, diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 090d711b77..fb136a48a3 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -31,6 +31,7 @@ use store_api::storage::{ use crate::compaction::CompactionSchedulerRef; use crate::config::EngineConfig; use crate::error::{self, Error, Result}; +use crate::file_purger::FilePurgerRef; use crate::flush::{FlushSchedulerRef, FlushStrategyRef}; use 
crate::manifest::action::{ RawRegionMetadata, RegionChange, RegionMetaAction, RegionMetaActionList, @@ -122,6 +123,7 @@ pub struct StoreConfig { pub flush_strategy: FlushStrategyRef, pub compaction_scheduler: CompactionSchedulerRef, pub engine_config: Arc, + pub file_purger: FilePurgerRef, } pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata)); @@ -151,7 +153,13 @@ impl RegionImpl { let mutable_memtable = store_config .memtable_builder .build(metadata.schema().clone()); - let version = Version::with_manifest_version(metadata, manifest_version, mutable_memtable); + let version = Version::with_manifest_version( + metadata, + manifest_version, + mutable_memtable, + store_config.sst_layer.clone(), + store_config.file_purger.clone(), + ); let region = RegionImpl::new(version, store_config); Ok(region) @@ -198,6 +206,8 @@ impl RegionImpl { let (version, mut recovered_metadata) = match Self::recover_from_manifest( &store_config.manifest, &store_config.memtable_builder, + &store_config.sst_layer, + &store_config.file_purger, ) .await? { @@ -289,6 +299,8 @@ impl RegionImpl { async fn recover_from_manifest( manifest: &RegionManifest, memtable_builder: &MemtableBuilderRef, + sst_layer: &AccessLayerRef, + file_purger: &FilePurgerRef, ) -> Result<(Option, RecoveredMetadataMap)> { let (start, end) = Self::manifest_scan_range(); let mut iter = manifest.scan(start, end).await?; @@ -316,6 +328,8 @@ impl RegionImpl { Arc::new(region_metadata), last_manifest_version, memtable, + sst_layer.clone(), + file_purger.clone(), )); for (manifest_version, action) in actions.drain(..) { version = Self::replay_edit(manifest_version, action, version); diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 1041facb56..a48fac33e8 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -37,9 +37,12 @@ use store_api::storage::{ use tempdir::TempDir; use super::*; +use crate::file_purger::noop::NoopFilePurgeHandler; use crate::manifest::action::{RegionChange, RegionMetaActionList}; use crate::manifest::test_utils::*; use crate::memtable::DefaultMemtableBuilder; +use crate::scheduler::{LocalScheduler, SchedulerConfig}; +use crate::sst::FsAccessLayer; use crate::test_util::descriptor_util::RegionDescBuilder; use crate::test_util::{self, config_util, schema_util, write_batch_util}; @@ -285,17 +288,25 @@ async fn test_recover_region_manifets() { .unwrap(), ); - let manifest = RegionManifest::new("/manifest/", object_store); + let manifest = RegionManifest::new("/manifest/", object_store.clone()); let region_meta = Arc::new(build_region_meta()); + let sst_layer = Arc::new(FsAccessLayer::new("sst", object_store)) as _; + let file_purger = Arc::new(LocalScheduler::new( + SchedulerConfig::default(), + NoopFilePurgeHandler, + )); // Recover from empty - assert!( - RegionImpl::::recover_from_manifest(&manifest, &memtable_builder) - .await - .unwrap() - .0 - .is_none() - ); + assert!(RegionImpl::::recover_from_manifest( + &manifest, + &memtable_builder, + &sst_layer, + &file_purger + ) + .await + .unwrap() + .0 + .is_none()); { // save some actions into region_meta @@ -329,10 +340,14 @@ async fn test_recover_region_manifets() { } // try to recover - let (version, recovered_metadata) = - RegionImpl::::recover_from_manifest(&manifest, &memtable_builder) - .await - .unwrap(); + let (version, recovered_metadata) = RegionImpl::::recover_from_manifest( + &manifest, + &memtable_builder, + &sst_layer, + &file_purger, + ) + .await + .unwrap(); 
assert_eq!(42, *recovered_metadata.first_key_value().unwrap().0); let version = version.unwrap(); diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 188161f99b..9a4d2a574c 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -652,7 +652,7 @@ impl WriterInner { ); return; } - match compaction_scheduler.schedule(compaction_request).await { + match compaction_scheduler.schedule(compaction_request) { Ok(scheduled) => { info!( "Schedule region {} compaction request result: {}", diff --git a/src/storage/src/scheduler.rs b/src/storage/src/scheduler.rs index 3dfba7da60..cb6ccb3efe 100644 --- a/src/storage/src/scheduler.rs +++ b/src/storage/src/scheduler.rs @@ -14,17 +14,18 @@ use std::fmt::{Debug, Formatter}; use std::hash::Hash; +use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use async_trait::async_trait; use common_telemetry::{debug, error, info}; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; use tokio::sync::Notify; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use crate::error; -use crate::error::StopSchedulerSnafu; +use crate::error::{IllegalSchedulerStateSnafu, StopSchedulerSnafu}; use crate::scheduler::dedup_deque::DedupDeque; use crate::scheduler::rate_limit::{ BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter, RateLimiter, @@ -58,28 +59,35 @@ pub trait Handler { #[async_trait] pub trait Scheduler: Debug { type Request; + /// Schedules a request. /// Returns true if request is scheduled. Returns false if task queue already /// contains the request with same key. - async fn schedule(&self, request: Self::Request) -> error::Result; + fn schedule(&self, request: Self::Request) -> error::Result; - /// Stops scheduler. - async fn stop(&self) -> error::Result<()>; + /// Stops scheduler. If `await_termination` is set to true, the scheduler will + /// wait until all queued requests are processed. + async fn stop(&self, await_termination: bool) -> error::Result<()>; } /// Scheduler config. #[derive(Debug)] pub struct SchedulerConfig { - pub max_inflight_task: usize, + pub max_inflight_tasks: usize, } impl Default for SchedulerConfig { fn default() -> Self { Self { - max_inflight_task: 4, + max_inflight_tasks: 4, } } } + +const STATE_RUNNING: u8 = 0; +const STATE_STOP: u8 = 1; +const STATE_AWAIT_TERMINATION: u8 = 2; + /// Request scheduler based on local state. pub struct LocalScheduler { /// Request FIFO with key deduplication. @@ -90,6 +98,8 @@ pub struct LocalScheduler { task_notifier: Arc, /// Join handle of spawned request handling loop. join_handle: Mutex>>, + /// State of scheduler. 
+ state: Arc, } impl Debug for LocalScheduler @@ -108,7 +118,8 @@ where { type Request = R; - async fn schedule(&self, request: Self::Request) -> error::Result { + fn schedule(&self, request: Self::Request) -> error::Result { + ensure!(self.running(), IllegalSchedulerStateSnafu); debug!( "Schedule request: {:?}, queue size: {}", request.key(), @@ -120,7 +131,14 @@ where Ok(res) } - async fn stop(&self) -> error::Result<()> { + async fn stop(&self, await_termination: bool) -> error::Result<()> { + let state = if await_termination { + STATE_AWAIT_TERMINATION + } else { + STATE_STOP + }; + self.state.store(state, Ordering::Relaxed); + self.cancel_token.cancel(); let handle = { self.join_handle.lock().unwrap().take() }; if let Some(handle) = handle { @@ -142,15 +160,16 @@ where let request_queue = Arc::new(RwLock::new(DedupDeque::default())); let cancel_token = CancellationToken::new(); let task_notifier = Arc::new(Notify::new()); - + let state = Arc::new(AtomicU8::new(STATE_RUNNING)); let handle_loop = HandlerLoop { task_notifier: task_notifier.clone(), req_queue: request_queue.clone(), cancel_token: cancel_token.child_token(), limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new( - MaxInflightTaskLimiter::new(config.max_inflight_task), + MaxInflightTaskLimiter::new(config.max_inflight_tasks), )])), request_handler: handler, + state: state.clone(), }; let join_handle = common_runtime::spawn_bg(async move { debug!("Task handler loop spawned"); @@ -161,6 +180,7 @@ where request_queue, cancel_token, task_notifier, + state, } } @@ -169,6 +189,11 @@ where fn remaining_requests(&self) -> usize { self.request_queue.read().unwrap().len() } + + #[inline] + fn running(&self) -> bool { + self.state.load(Ordering::Relaxed) == STATE_RUNNING + } } pub struct HandlerLoop { @@ -177,6 +202,7 @@ pub struct HandlerLoop { pub task_notifier: Arc, pub request_handler: H, pub limiter: Arc>, + pub state: Arc, } impl HandlerLoop @@ -186,36 +212,52 @@ where { /// Runs scheduled requests dispatch loop. pub async fn run(&self) { - let task_notifier = self.task_notifier.clone(); let limiter = self.limiter.clone(); - loop { + while self.running() { tokio::select! { - _ = task_notifier.notified() => { - // poll requests as many as possible until rate limited, and then wait for - // notification (some task's finished). - debug!("Notified, queue size: {:?}", self.req_queue.read().unwrap().len()); - while let Some((task_key, req)) = self.poll_task().await{ - if let Ok(token) = limiter.acquire_token(&req) { - debug!("Executing request: {:?}", task_key); - if let Err(e) = self.handle_request(req, token, self.task_notifier.clone()).await { - error!(e; "Failed to submit request: {:?}", task_key); - } else { - info!("Submitted task: {:?}", task_key); - } - } else { - // rate limited, put back to req queue to wait for next schedule - debug!("Put back request {:?}, queue size: {}", task_key, self.req_queue.read().unwrap().len()); - self.put_back_req(task_key, req).await; - break; - } - } + _ = self.task_notifier.notified() => { + debug!("Notified, queue size: {:?}",self.req_queue.read().unwrap().len()); + self.poll_and_execute(&limiter).await; } _ = self.cancel_token.cancelled() => { - info!("Task scheduler stopped."); - return; + info!("Task scheduler cancelled."); + break; } } } + // For correctness, we need to poll requests from fifo again. 
+ if self.state.load(Ordering::Relaxed) == STATE_AWAIT_TERMINATION { + info!("Waiting for all pending tasks to finish."); + self.poll_and_execute(&limiter).await; + self.state.store(STATE_STOP, Ordering::Relaxed); + } + info!("Task scheduler stopped"); + } + + /// Polls and executes requests as many as possible until rate limited. + async fn poll_and_execute(&self, limiter: &Arc>) { + while let Some((task_key, req)) = self.poll_task().await { + if let Ok(token) = limiter.acquire_token(&req) { + debug!("Executing request: {:?}", task_key); + if let Err(e) = self + .handle_request(req, token, self.task_notifier.clone()) + .await + { + error!(e; "Failed to submit request: {:?}", task_key); + } else { + info!("Submitted task: {:?}", task_key); + } + } else { + // rate limited, put back to req queue to wait for next schedule + debug!( + "Put back request {:?}, queue size: {}", + task_key, + self.req_queue.read().unwrap().len() + ); + self.put_back_req(task_key, req).await; + break; + } + } } #[inline] @@ -242,4 +284,288 @@ where .handle_request(req, token, finish_notifier) .await } + + #[inline] + fn running(&self) -> bool { + self.state.load(Ordering::Relaxed) == STATE_RUNNING + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicI32; + use std::time::Duration; + + use store_api::storage::RegionId; + + use super::*; + use crate::scheduler::dedup_deque::DedupDeque; + use crate::scheduler::rate_limit::{ + BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter, + }; + use crate::scheduler::{HandlerLoop, LocalScheduler, Scheduler, SchedulerConfig}; + + struct CountdownLatch { + counter: std::sync::Mutex, + notifies: std::sync::RwLock>>, + } + + impl CountdownLatch { + fn new(size: usize) -> Self { + Self { + counter: std::sync::Mutex::new(size), + notifies: std::sync::RwLock::new(vec![]), + } + } + + fn countdown(&self) { + let mut counter = self.counter.lock().unwrap(); + if *counter >= 1 { + *counter -= 1; + if *counter == 0 { + let notifies = self.notifies.read().unwrap(); + for waiter in notifies.iter() { + waiter.notify_one(); + } + } + } + } + + async fn wait(&self) { + let notify = Arc::new(Notify::new()); + { + let notify = notify.clone(); + let mut notifies = self.notifies.write().unwrap(); + notifies.push(notify); + } + notify.notified().await + } + } + + #[tokio::test] + async fn test_schedule_handler() { + common_telemetry::init_default_ut_logging(); + let queue = Arc::new(std::sync::RwLock::new(DedupDeque::default())); + let latch = Arc::new(CountdownLatch::new(2)); + let latch_cloned = latch.clone(); + let handler = Arc::new(HandlerLoop { + req_queue: queue.clone(), + cancel_token: Default::default(), + task_notifier: Arc::new(Default::default()), + request_handler: MockHandler { + cb: move || { + latch_cloned.countdown(); + }, + }, + limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new( + MaxInflightTaskLimiter::new(3), + )])), + state: Arc::new(AtomicU8::default()), + }); + + let handler_cloned = handler.clone(); + common_runtime::spawn_bg(async move { handler_cloned.run().await }); + + queue.write().unwrap().push_back(1, MockRequest::default()); + handler.task_notifier.notify_one(); + queue.write().unwrap().push_back(2, MockRequest::default()); + handler.task_notifier.notify_one(); + + tokio::time::timeout(Duration::from_secs(1), latch.wait()) + .await + .unwrap(); + } + + #[derive(Default, Debug)] + struct MockRequest { + region_id: RegionId, + } + + struct MockHandler { + cb: F, + } + + #[async_trait::async_trait] + impl Handler for MockHandler + where + 
F: Fn() + Send + Sync, + { + type Request = MockRequest; + + async fn handle_request( + &self, + _req: Self::Request, + token: BoxedRateLimitToken, + finish_notifier: Arc, + ) -> error::Result<()> { + (self.cb)(); + token.try_release(); + finish_notifier.notify_one(); + Ok(()) + } + } + + impl Request for MockRequest { + type Key = RegionId; + + fn key(&self) -> Self::Key { + self.region_id + } + } + + #[tokio::test] + async fn test_scheduler() { + let latch = Arc::new(CountdownLatch::new(2)); + let latch_cloned = latch.clone(); + + let handler = MockHandler { + cb: move || { + latch_cloned.countdown(); + }, + }; + let scheduler: LocalScheduler = LocalScheduler::new( + SchedulerConfig { + max_inflight_tasks: 3, + }, + handler, + ); + + scheduler.schedule(MockRequest { region_id: 1 }).unwrap(); + + scheduler.schedule(MockRequest { region_id: 2 }).unwrap(); + + tokio::time::timeout(Duration::from_secs(1), latch.wait()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_scheduler_many() { + common_telemetry::init_default_ut_logging(); + let task_size = 100; + + let latch = Arc::new(CountdownLatch::new(task_size)); + let latch_clone = latch.clone(); + + let handler = MockHandler { + cb: move || { + latch_clone.countdown(); + }, + }; + + let config = SchedulerConfig { + max_inflight_tasks: 3, + }; + let scheduler = LocalScheduler::new(config, handler); + + for i in 0..task_size { + scheduler + .schedule(MockRequest { + region_id: i as RegionId, + }) + .unwrap(); + } + + tokio::time::timeout(Duration::from_secs(3), latch.wait()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_scheduler_interval() { + common_telemetry::init_default_ut_logging(); + let task_size = 100; + let latch = Arc::new(CountdownLatch::new(task_size)); + let latch_clone = latch.clone(); + + let handler = MockHandler { + cb: move || { + latch_clone.countdown(); + }, + }; + + let config = SchedulerConfig { + max_inflight_tasks: 3, + }; + let scheduler = LocalScheduler::new(config, handler); + + for i in 0..task_size / 2 { + scheduler + .schedule(MockRequest { + region_id: i as RegionId, + }) + .unwrap(); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + for i in task_size / 2..task_size { + scheduler + .schedule(MockRequest { + region_id: i as RegionId, + }) + .unwrap(); + } + + tokio::time::timeout(Duration::from_secs(6), latch.wait()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_schedule_duplicate_tasks() { + common_telemetry::init_default_ut_logging(); + let handler = MockHandler { cb: || {} }; + let config = SchedulerConfig { + max_inflight_tasks: 30, + }; + let scheduler = LocalScheduler::new(config, handler); + + let mut scheduled_task = 0; + for _ in 0..10 { + if scheduler.schedule(MockRequest { region_id: 1 }).unwrap() { + scheduled_task += 1; + } + } + scheduler.stop(true).await.unwrap(); + debug!("Schedule tasks: {}", scheduled_task); + assert!(scheduled_task < 10); + } + + #[tokio::test] + async fn test_await_termination() { + common_telemetry::init_default_ut_logging(); + + let finished = Arc::new(AtomicI32::new(0)); + let finished_clone = finished.clone(); + let handler = MockHandler { + cb: move || { + finished_clone.fetch_add(1, Ordering::Relaxed); + }, + }; + + let config = SchedulerConfig { + max_inflight_tasks: 3, + }; + let scheduler = Arc::new(LocalScheduler::new(config, handler)); + let scheduler_cloned = scheduler.clone(); + let task_scheduled = Arc::new(AtomicI32::new(0)); + let task_scheduled_cloned = task_scheduled.clone(); + + 
common_runtime::spawn_write(async move { + for i in 0..10000 { + if let Ok(res) = scheduler_cloned.schedule(MockRequest { + region_id: i as RegionId, + }) { + if res { + task_scheduled_cloned.fetch_add(1, Ordering::Relaxed); + } + } + } + }); + + tokio::time::sleep(Duration::from_millis(1)).await; + scheduler.stop(true).await.unwrap(); + let finished = finished.load(Ordering::Relaxed); + assert_eq!(finished, task_scheduled.load(Ordering::Relaxed)); + } } diff --git a/src/storage/src/scheduler/rate_limit.rs b/src/storage/src/scheduler/rate_limit.rs index c0c689e0ae..57efcbb140 100644 --- a/src/storage/src/scheduler/rate_limit.rs +++ b/src/storage/src/scheduler/rate_limit.rs @@ -45,16 +45,16 @@ pub type BoxedRateLimiter = Box + Send + Sync>; /// Limits max inflight tasks number. pub struct MaxInflightTaskLimiter { - max_inflight_task: usize, - inflight_task: Arc, + max_inflight_tasks: usize, + inflight_tasks: Arc, _phantom_data: PhantomData, } impl MaxInflightTaskLimiter { - pub fn new(max_inflight_task: usize) -> Self { + pub fn new(max_inflight_tasks: usize) -> Self { Self { - max_inflight_task, - inflight_task: Arc::new(AtomicUsize::new(0)), + max_inflight_tasks, + inflight_tasks: Arc::new(AtomicUsize::new(0)), _phantom_data: Default::default(), } } @@ -64,20 +64,20 @@ impl RateLimiter for MaxInflightTaskLimiter { type Request = R; fn acquire_token(&self, _: &Self::Request) -> Result { - if self.inflight_task.fetch_add(1, Ordering::Relaxed) >= self.max_inflight_task { - self.inflight_task.fetch_sub(1, Ordering::Relaxed); + if self.inflight_tasks.fetch_add(1, Ordering::Relaxed) >= self.max_inflight_tasks { + self.inflight_tasks.fetch_sub(1, Ordering::Relaxed); return RateLimitedSnafu { msg: format!( "Max inflight task num exceeds, current: {}, max: {}", - self.inflight_task.load(Ordering::Relaxed), - self.max_inflight_task + self.inflight_tasks.load(Ordering::Relaxed), + self.max_inflight_tasks ), } .fail(); } Ok(Box::new(MaxInflightLimiterToken::new( - self.inflight_task.clone(), + self.inflight_tasks.clone(), ))) } } @@ -160,14 +160,14 @@ mod tests { fn test_max_inflight_limiter() { let limiter = MaxInflightTaskLimiter::new(3); let t1 = limiter.acquire_token(&1).unwrap(); - assert_eq!(1, limiter.inflight_task.load(Ordering::Relaxed)); + assert_eq!(1, limiter.inflight_tasks.load(Ordering::Relaxed)); let _t2 = limiter.acquire_token(&1).unwrap(); - assert_eq!(2, limiter.inflight_task.load(Ordering::Relaxed)); + assert_eq!(2, limiter.inflight_tasks.load(Ordering::Relaxed)); let _t3 = limiter.acquire_token(&1).unwrap(); - assert_eq!(3, limiter.inflight_task.load(Ordering::Relaxed)); + assert_eq!(3, limiter.inflight_tasks.load(Ordering::Relaxed)); assert!(limiter.acquire_token(&1).is_err()); t1.try_release(); - assert_eq!(2, limiter.inflight_task.load(Ordering::Relaxed)); + assert_eq!(2, limiter.inflight_tasks.load(Ordering::Relaxed)); let _t4 = limiter.acquire_token(&1).unwrap(); } diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 92bf555b7a..25edf74e93 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -19,17 +19,21 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use async_trait::async_trait; +use common_telemetry::{error, info}; use common_time::range::TimestampRange; use common_time::Timestamp; use object_store::{util, ObjectStore}; use serde::{Deserialize, Serialize}; -use store_api::storage::ChunkReader; +use snafu::ResultExt; +use store_api::storage::{ChunkReader, RegionId}; use table::predicate::Predicate; use 
crate::chunk::ChunkReaderImpl; -use crate::error::Result; +use crate::error::{DeleteSstSnafu, Result}; +use crate::file_purger::{FilePurgeRequest, FilePurgerRef}; use crate::memtable::BoxedBatchIterator; use crate::read::{Batch, BoxedBatchReader}; +use crate::scheduler::Scheduler; use crate::schema::ProjectedSchemaRef; use crate::sst::parquet::{ParquetReader, ParquetWriter}; @@ -48,13 +52,17 @@ type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize]; #[derive(Debug, Clone)] pub struct LevelMetas { levels: LevelMetaVec, + sst_layer: AccessLayerRef, + file_purger: FilePurgerRef, } impl LevelMetas { /// Create a new LevelMetas and initialized each level. - pub fn new() -> LevelMetas { + pub fn new(sst_layer: AccessLayerRef, file_purger: FilePurgerRef) -> LevelMetas { LevelMetas { levels: new_level_meta_vec(), + sst_layer, + file_purger, } } @@ -75,18 +83,21 @@ impl LevelMetas { /// Panics if level of [FileHandle] is greater than [MAX_LEVEL]. pub fn merge( &self, - files_to_add: impl Iterator, - files_to_remove: impl Iterator, + files_to_add: impl Iterator, + files_to_remove: impl Iterator, ) -> LevelMetas { let mut merged = self.clone(); for file in files_to_add { - let level = file.level(); - merged.levels[level as usize].add_file(file); + let level = file.level; + let handle = FileHandle::new(file, self.sst_layer.clone(), self.file_purger.clone()); + merged.levels[level as usize].add_file(handle); } for file in files_to_remove { - let level = file.level(); - merged.levels[level as usize].remove_file(file); + let level = file.level; + if let Some(removed_file) = merged.levels[level as usize].remove_file(&file.file_name) { + removed_file.mark_deleted(); + } } merged } @@ -96,12 +107,6 @@ impl LevelMetas { } } -impl Default for LevelMetas { - fn default() -> LevelMetas { - LevelMetas::new() - } -} - /// Metadata of files in same SST level. #[derive(Debug, Default, Clone)] pub struct LevelMeta { @@ -124,8 +129,8 @@ impl LevelMeta { self.files.insert(file.file_name().to_string(), file); } - fn remove_file(&mut self, file_to_remove: FileHandle) { - self.files.remove(file_to_remove.file_name()); + fn remove_file(&mut self, file_to_remove: &str) -> Option { + self.files.remove(file_to_remove) } /// Returns the level of level meta. @@ -160,9 +165,13 @@ pub struct FileHandle { } impl FileHandle { - pub fn new(meta: FileMeta) -> FileHandle { + pub fn new( + meta: FileMeta, + sst_layer: AccessLayerRef, + file_purger: FilePurgerRef, + ) -> FileHandle { FileHandle { - inner: Arc::new(FileHandleInner::new(meta)), + inner: Arc::new(FileHandleInner::new(meta, sst_layer, file_purger)), } } @@ -190,9 +199,19 @@ impl FileHandle { /// Sets the compacting flag. #[inline] - pub fn set_compacting(&self, compacting: bool) { + pub fn mark_compacting(&self, compacting: bool) { self.inner.compacting.store(compacting, Ordering::Relaxed); } + + #[inline] + pub fn deleted(&self) -> bool { + self.inner.deleted.load(Ordering::Relaxed) + } + + #[inline] + pub fn mark_deleted(&self) { + self.inner.deleted.store(true, Ordering::Relaxed); + } } /// Actually data of [FileHandle]. 
@@ -202,13 +221,46 @@ impl FileHandle {
 struct FileHandleInner {
     meta: FileMeta,
     compacting: AtomicBool,
+    deleted: AtomicBool,
+    sst_layer: AccessLayerRef,
+    file_purger: FilePurgerRef,
+}
+
+impl Drop for FileHandleInner {
+    fn drop(&mut self) {
+        if self.deleted.load(Ordering::Relaxed) {
+            let request = FilePurgeRequest {
+                sst_layer: self.sst_layer.clone(),
+                file_name: self.meta.file_name.clone(),
+                region_id: self.meta.region_id,
+            };
+            match self.file_purger.schedule(request) {
+                Ok(res) => {
+                    info!(
+                        "Scheduled SST purge task, region: {}, name: {}, res: {}",
+                        self.meta.region_id, self.meta.file_name, res
+                    );
+                }
+                Err(e) => {
+                    error!(e; "Failed to schedule SST purge task, region: {}, name: {}", self.meta.region_id, self.meta.file_name);
+                }
+            }
+        }
+    }
 }
 
 impl FileHandleInner {
-    fn new(meta: FileMeta) -> FileHandleInner {
+    fn new(
+        meta: FileMeta,
+        sst_layer: AccessLayerRef,
+        file_purger: FilePurgerRef,
+    ) -> FileHandleInner {
         FileHandleInner {
             meta,
             compacting: AtomicBool::new(false),
+            deleted: AtomicBool::new(false),
+            sst_layer,
+            file_purger,
         }
     }
 }
@@ -216,7 +268,11 @@ impl FileHandleInner {
 /// Immutable metadata of a sst file.
 #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct FileMeta {
+    /// Region of file.
+    pub region_id: RegionId,
+    /// File name
     pub file_name: String,
+    /// Timestamp range of file.
     pub time_range: Option<(Timestamp, Timestamp)>,
     /// SST level of the file.
     pub level: Level,
@@ -257,8 +313,8 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
     /// Read SST file with given `file_name` and schema.
     async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader>;
 
-    /// Returns backend object store.
-    fn object_store(&self) -> ObjectStore;
+    /// Deletes a SST file with given name.
+ async fn delete_sst(&self, file_name: &str) -> Result<()>; } pub type AccessLayerRef = Arc; @@ -340,8 +396,10 @@ impl AccessLayer for FsAccessLayer { Ok(Box::new(stream)) } - fn object_store(&self) -> ObjectStore { - self.object_store.clone() + async fn delete_sst(&self, file_name: &str) -> Result<()> { + let path = self.sst_file_path(file_name); + let object = self.object_store.object(&path); + object.delete().await.context(DeleteSstSnafu) } } @@ -350,20 +408,28 @@ mod tests { use std::collections::HashSet; use super::*; + use crate::file_purger::noop::NoopFilePurgeHandler; + use crate::scheduler::{LocalScheduler, SchedulerConfig}; - fn create_handle(name: &str, level: Level) -> FileHandle { - FileHandle::new(FileMeta { + fn create_file_meta(name: &str, level: Level) -> FileMeta { + FileMeta { + region_id: 0, file_name: name.to_string(), time_range: None, level, - }) + } } #[test] fn test_level_metas_add_and_remove() { - let metas = LevelMetas::new(); + let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}); + let purger = Arc::new(LocalScheduler::new( + SchedulerConfig::default(), + NoopFilePurgeHandler, + )); + let metas = LevelMetas::new(layer, purger); let merged = metas.merge( - vec![create_handle("a", 0), create_handle("b", 0)].into_iter(), + vec![create_file_meta("a", 0), create_file_meta("b", 0)].into_iter(), vec![].into_iter(), ); @@ -377,7 +443,7 @@ mod tests { ); let merged1 = merged.merge( - vec![create_handle("c", 1), create_handle("d", 1)].into_iter(), + vec![create_file_meta("c", 1), create_file_meta("d", 1)].into_iter(), vec![].into_iter(), ); assert_eq!( @@ -400,7 +466,7 @@ mod tests { let removed1 = merged1.merge( vec![].into_iter(), - vec![create_handle("a", 0), create_handle("c", 0)].into_iter(), + vec![create_file_meta("a", 0), create_file_meta("c", 0)].into_iter(), ); assert_eq!( HashSet::from(["b".to_string()]), @@ -422,7 +488,7 @@ mod tests { let removed2 = removed1.merge( vec![].into_iter(), - vec![create_handle("c", 1), create_handle("d", 1)].into_iter(), + vec![create_file_meta("c", 1), create_file_meta("d", 1)].into_iter(), ); assert_eq!( HashSet::from(["b".to_string()]), diff --git a/src/storage/src/test_util.rs b/src/storage/src/test_util.rs index bdf9483854..1acbda1487 100644 --- a/src/storage/src/test_util.rs +++ b/src/storage/src/test_util.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod access_layer_util; pub mod config_util; pub mod descriptor_util; pub mod flush_switch; diff --git a/src/storage/src/test_util/access_layer_util.rs b/src/storage/src/test_util/access_layer_util.rs new file mode 100644 index 0000000000..32ce28fc8b --- /dev/null +++ b/src/storage/src/test_util/access_layer_util.rs @@ -0,0 +1,43 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::read::BoxedBatchReader; +use crate::sst::{AccessLayer, ReadOptions, Source, SstInfo, WriteOptions}; + +#[derive(Debug)] +pub struct MockAccessLayer; + +#[async_trait::async_trait] +impl AccessLayer for MockAccessLayer { + async fn write_sst( + &self, + _file_name: &str, + _source: Source, + _opts: &WriteOptions, + ) -> crate::error::Result { + unimplemented!() + } + + async fn read_sst( + &self, + _file_name: &str, + _opts: &ReadOptions, + ) -> crate::error::Result { + unimplemented!() + } + + async fn delete_sst(&self, _file_name: &str) -> crate::error::Result<()> { + Ok(()) + } +} diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index ca3d3fb57c..261b7d3606 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -22,10 +22,12 @@ use object_store::ObjectStore; use crate::background::JobPoolImpl; use crate::compaction::noop::NoopCompactionScheduler; use crate::engine; +use crate::file_purger::noop::NoopFilePurgeHandler; use crate::flush::{FlushSchedulerImpl, SizeBasedStrategy}; use crate::manifest::region::RegionManifest; use crate::memtable::DefaultMemtableBuilder; use crate::region::StoreConfig; +use crate::scheduler::{LocalScheduler, SchedulerConfig}; use crate::sst::FsAccessLayer; fn log_store_dir(store_dir: &str) -> String { @@ -53,6 +55,10 @@ pub async fn new_store_config( }; let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap()); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); + let file_purger = Arc::new(LocalScheduler::new( + SchedulerConfig::default(), + NoopFilePurgeHandler, + )); StoreConfig { log_store, sst_layer, @@ -62,5 +68,6 @@ pub async fn new_store_config( flush_strategy: Arc::new(SizeBasedStrategy::default()), compaction_scheduler, engine_config: Default::default(), + file_purger, } } diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 6f84b0c5f5..1a915b7ca9 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -28,12 +28,12 @@ use common_telemetry::info; use store_api::manifest::ManifestVersion; use store_api::storage::{SchemaRef, SequenceNumber}; +use crate::file_purger::FilePurgerRef; use crate::memtable::{MemtableId, MemtableRef, MemtableVersion}; use crate::metadata::RegionMetadataRef; use crate::schema::RegionSchemaRef; -use crate::sst::{FileHandle, FileMeta, LevelMetas}; +use crate::sst::{AccessLayerRef, FileMeta, LevelMetas}; use crate::sync::CowCell; - pub const INIT_COMMITTED_SEQUENCE: u64 = 0; /// Controls version of in memory state for a region. @@ -167,7 +167,12 @@ impl Version { /// Create a new `Version` with given `metadata`. #[cfg(test)] pub fn new(metadata: RegionMetadataRef, memtable: MemtableRef) -> Version { - Version::with_manifest_version(metadata, 0, memtable) + let sst_layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer) as Arc<_>; + let file_purger = Arc::new(crate::scheduler::LocalScheduler::new( + crate::scheduler::SchedulerConfig::default(), + crate::file_purger::noop::NoopFilePurgeHandler, + )); + Version::with_manifest_version(metadata, 0, memtable, sst_layer, file_purger) } /// Create a new `Version` with given `metadata` and initial `manifest_version`. 
@@ -175,11 +180,13 @@
         metadata: RegionMetadataRef,
         manifest_version: ManifestVersion,
         mutable_memtable: MemtableRef,
+        sst_layer: AccessLayerRef,
+        file_purger: FilePurgerRef,
     ) -> Version {
         Version {
             metadata,
             memtables: Arc::new(MemtableVersion::new(mutable_memtable)),
-            ssts: Arc::new(LevelMetas::new()),
+            ssts: Arc::new(LevelMetas::new(sst_layer, file_purger)),
             flushed_sequence: 0,
             manifest_version,
         }
     }
@@ -236,11 +243,10 @@ impl Version {
             self.memtables = Arc::new(removed);
         }
 
-        let handles_to_add = edit.files_to_add.into_iter().map(FileHandle::new);
-        let merged_ssts = self.ssts.merge(
-            handles_to_add,
-            edit.files_to_remove.into_iter().map(FileHandle::new),
-        );
+        let handles_to_add = edit.files_to_add.into_iter();
+        let merged_ssts = self
+            .ssts
+            .merge(handles_to_add, edit.files_to_remove.into_iter());
 
         info!(
             "After apply edit, region: {}, SST files: {:?}",