From ee3e1dbdaabeb56ef45e07b2b4b10f5fd375715a Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 10 May 2023 15:16:51 +0800 Subject: [PATCH] feat: Use LocalScheduler framework to implement FlushScheduler (#1531) * test: simplify countdownlatch * feat: impl Drop for LocalScheduler * feat(storage): Impl FlushRequest and FlushHandler * feat(storage): Use scheduler to handle flush job * chore(storage): remove unused code * feat(storage): Use new type pattern for RegionMap * feat(storage): Remove on_success callback * feat(storage): Address CR comments and add some metrics to flush --- Cargo.lock | 1 + src/storage/Cargo.toml | 1 + src/storage/src/background.rs | 120 ----------- src/storage/src/compaction/scheduler.rs | 3 +- src/storage/src/engine.rs | 118 ++++++----- src/storage/src/error.rs | 38 ++-- src/storage/src/flush.rs | 70 ++----- src/storage/src/flush/scheduler.rs | 252 +++++++++++++++++++++++ src/storage/src/lib.rs | 1 - src/storage/src/metrics.rs | 6 + src/storage/src/region.rs | 6 +- src/storage/src/region/writer.rs | 182 ++++++---------- src/storage/src/scheduler.rs | 33 +-- src/storage/src/test_util/config_util.rs | 9 +- 14 files changed, 463 insertions(+), 377 deletions(-) delete mode 100644 src/storage/src/background.rs create mode 100644 src/storage/src/flush/scheduler.rs diff --git a/Cargo.lock b/Cargo.lock index 2bfb7741eb..d42e699cbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8323,6 +8323,7 @@ dependencies = [ "futures-util", "lazy_static", "log-store", + "metrics", "object-store", "parquet", "paste", diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 1b2012d820..f8b924f4db 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -26,6 +26,7 @@ datafusion-expr.workspace = true futures.workspace = true futures-util.workspace = true lazy_static = "1.4" +metrics.workspace = true object-store = { path = "../object-store" } parquet = { workspace = true, features = ["async"] } paste.workspace = true diff --git a/src/storage/src/background.rs b/src/storage/src/background.rs deleted file mode 100644 index 2c04fd5c83..0000000000 --- a/src/storage/src/background.rs +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Background job management. - -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - -use async_trait::async_trait; -use common_runtime::{self, JoinHandle}; -use snafu::ResultExt; - -use crate::error::{self, Result}; - -/// Background job context. -#[derive(Clone, Debug, Default)] -pub struct Context { - inner: Arc, -} - -impl Context { - fn new() -> Context { - Context::default() - } - - /// Marks this context as cancelled. - /// - /// Job accessing this context should check `is_cancelled()` and exit if it - /// returns true. - pub fn cancel(&self) { - self.inner.cancelled.store(false, Ordering::Relaxed); - } - - /// Returns true if this context is cancelled. - pub fn is_cancelled(&self) -> bool { - self.inner.cancelled.load(Ordering::Relaxed) - } -} - -#[derive(Debug, Default)] -struct ContextInner { - cancelled: AtomicBool, -} - -/// Handle to the background job. -#[derive(Debug)] -pub struct JobHandle { - ctx: Context, - handle: JoinHandle>, -} - -impl JobHandle { - /// Waits until this background job is finished. - pub async fn join(self) -> Result<()> { - self.handle.await.context(error::JoinTaskSnafu)? - } - - /// Cancels this background job gracefully and waits until it exits. - #[allow(unused)] - pub async fn cancel(self) -> Result<()> { - // Tokio also provides an [`abort()`](https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html#method.abort) - // method to abort current task, consider using it if we need to abort a background job. - self.ctx.cancel(); - - self.join().await - } -} - -#[async_trait] -pub trait Job: Send { - async fn run(&mut self, ctx: &Context) -> Result<()>; -} - -type BoxedJob = Box; - -/// Thread pool that runs all background jobs. -#[async_trait] -pub trait JobPool: Send + Sync + std::fmt::Debug { - /// Submit a job to run in background. - /// - /// Returns the [JobHandle] to the job. - async fn submit(&self, job: BoxedJob) -> Result; - - /// Shutdown the manager, pending background jobs may be discarded. - async fn shutdown(&self) -> Result<()>; -} - -pub type JobPoolRef = Arc; - -#[derive(Debug)] -pub struct JobPoolImpl {} - -#[async_trait] -impl JobPool for JobPoolImpl { - async fn submit(&self, mut job: BoxedJob) -> Result { - // TODO(yingwen): [flush] Schedule background jobs to background workers, controlling parallelism. - - let ctx = Context::new(); - let job_ctx = ctx.clone(); - let handle = common_runtime::spawn_bg(async move { job.run(&job_ctx).await }); - - Ok(JobHandle { ctx, handle }) - } - - async fn shutdown(&self) -> Result<()> { - // TODO(yingwen): [flush] Stop background workers. - unimplemented!() - } -} diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs index d5fe262451..286fce8d3c 100644 --- a/src/storage/src/compaction/scheduler.rs +++ b/src/storage/src/compaction/scheduler.rs @@ -44,7 +44,8 @@ impl Request for CompactionRequestImpl { fn complete(self, result: Result<()>) { if let Some(sender) = self.sender { - // We don't care the send result as + // We don't care the send result as callers might not + // wait the result. let _ = sender.send(result); } } diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 0a80bcc1b7..c85984500f 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -26,12 +26,11 @@ use store_api::storage::{ CreateOptions, EngineContext, OpenOptions, Region, RegionDescriptor, StorageEngine, }; -use crate::background::JobPoolImpl; use crate::compaction::CompactionSchedulerRef; use crate::config::EngineConfig; use crate::error::{self, Error, Result}; use crate::file_purger::{FilePurgeHandler, FilePurgerRef}; -use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy}; +use crate::flush::{FlushScheduler, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy}; use crate::manifest::region::RegionManifest; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::metadata::RegionMetadata; @@ -132,7 +131,6 @@ enum RegionSlot { Opening, /// The region is ready for access. Ready(RegionImpl), - // TODO(yingwen): Closing state. } impl RegionSlot { @@ -181,12 +179,12 @@ impl Clone for RegionSlot { /// Used to update slot or clean the slot on failure. struct SlotGuard<'a, S: LogStore> { name: &'a str, - regions: &'a RwLock>, + regions: &'a RegionMap, skip_clean: bool, } impl<'a, S: LogStore> SlotGuard<'a, S> { - fn new(name: &'a str, regions: &'a RwLock>) -> SlotGuard<'a, S> { + fn new(name: &'a str, regions: &'a RegionMap) -> SlotGuard<'a, S> { SlotGuard { name, regions, @@ -196,13 +194,7 @@ impl<'a, S: LogStore> SlotGuard<'a, S> { /// Update the slot and skip cleaning on drop. fn update(&mut self, slot: RegionSlot) { - { - let mut regions = self.regions.write().unwrap(); - if let Some(old) = regions.get_mut(self.name) { - *old = slot; - } - } - + self.regions.update(self.name, slot); self.skip_clean = true; } } @@ -210,20 +202,70 @@ impl<'a, S: LogStore> SlotGuard<'a, S> { impl<'a, S: LogStore> Drop for SlotGuard<'a, S> { fn drop(&mut self) { if !self.skip_clean { - let mut regions = self.regions.write().unwrap(); - regions.remove(self.name); + self.regions.remove(self.name) } } } -type RegionMap = HashMap>; +/// Region slot map. +struct RegionMap(RwLock>>); + +impl RegionMap { + /// Returns a new region map. + fn new() -> RegionMap { + RegionMap(RwLock::new(HashMap::new())) + } + + /// Returns the `Some(slot)` if there is existing slot with given `name`, or insert + /// given `slot` and returns `None`. + fn get_or_occupy_slot(&self, name: &str, slot: RegionSlot) -> Option> { + { + // Try to get the region under read lock. + let regions = self.0.read().unwrap(); + if let Some(slot) = regions.get(name) { + return Some(slot.clone()); + } + } + + // Get the region under write lock. + let mut regions = self.0.write().unwrap(); + if let Some(slot) = regions.get(name) { + return Some(slot.clone()); + } + + // No slot in map, we can insert the slot now. + regions.insert(name.to_string(), slot); + + None + } + + /// Gets the region by the specific name. + fn get_region(&self, name: &str) -> Option> { + let slot = self.0.read().unwrap().get(name).cloned()?; + slot.get_ready_region() + } + + /// Update the slot by name. + fn update(&self, name: &str, slot: RegionSlot) { + let mut regions = self.0.write().unwrap(); + if let Some(old) = regions.get_mut(name) { + *old = slot; + } + } + + /// Remove region by name. + fn remove(&self, name: &str) { + let mut regions = self.0.write().unwrap(); + regions.remove(name); + } +} struct EngineInner { object_store: ObjectStore, log_store: Arc, - regions: RwLock>, + regions: Arc>, memtable_builder: MemtableBuilderRef, - flush_scheduler: FlushSchedulerRef, + flush_scheduler: FlushSchedulerRef, flush_strategy: FlushStrategyRef, compaction_scheduler: CompactionSchedulerRef, file_purger: FilePurgerRef, @@ -237,8 +279,11 @@ impl EngineInner { object_store: ObjectStore, compaction_scheduler: CompactionSchedulerRef, ) -> Self { - let job_pool = Arc::new(JobPoolImpl {}); - let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); + // TODO(yingwen): max inflight flush tasks. + let flush_scheduler = Arc::new(FlushScheduler::new( + SchedulerConfig::default(), + compaction_scheduler.clone(), + )); let file_purger = Arc::new(LocalScheduler::new( SchedulerConfig { @@ -249,7 +294,7 @@ impl EngineInner { Self { object_store, log_store, - regions: RwLock::new(Default::default()), + regions: Arc::new(RegionMap::new()), memtable_builder: Arc::new(DefaultMemtableBuilder::default()), flush_scheduler, flush_strategy: Arc::new(SizeBasedStrategy::default()), @@ -259,33 +304,10 @@ impl EngineInner { } } - /// Returns the `Some(slot)` if there is existing slot with given `name`, or insert - /// given `slot` and returns `None`. - fn get_or_occupy_slot(&self, name: &str, slot: RegionSlot) -> Option> { - { - // Try to get the region under read lock. - let regions = self.regions.read().unwrap(); - if let Some(slot) = regions.get(name) { - return Some(slot.clone()); - } - } - - // Get the region under write lock. - let mut regions = self.regions.write().unwrap(); - if let Some(slot) = regions.get(name) { - return Some(slot.clone()); - } - - // No slot in map, we can insert the slot now. - regions.insert(name.to_string(), slot); - - None - } - async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result>> { // We can wait until the state of the slot has been changed to ready, but this will // make the code more complicate, so we just return the error here. - if let Some(slot) = self.get_or_occupy_slot(name, RegionSlot::Opening) { + if let Some(slot) = self.regions.get_or_occupy_slot(name, RegionSlot::Opening) { return slot.try_get_ready_region().map(Some); } @@ -320,7 +342,10 @@ impl EngineInner { descriptor: RegionDescriptor, opts: &CreateOptions, ) -> Result> { - if let Some(slot) = self.get_or_occupy_slot(&descriptor.name, RegionSlot::Creating) { + if let Some(slot) = self + .regions + .get_or_occupy_slot(&descriptor.name, RegionSlot::Creating) + { return slot.try_get_ready_region(); } @@ -359,8 +384,7 @@ impl EngineInner { } fn get_region(&self, name: &str) -> Option> { - let slot = self.regions.read().unwrap().get(name).cloned()?; - slot.get_ready_region() + self.regions.get_region(name) } async fn region_store_config( diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 3f3d1d78dd..9bef856610 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -138,21 +138,17 @@ pub enum Error { source: std::io::Error, }, - #[snafu(display("Failed to join task, source: {}", source))] - JoinTask { - source: common_runtime::JoinError, + #[snafu(display( + "Failed to wait flushing, region_id: {}, source: {}", + region_id, + source + ))] + WaitFlush { + region_id: RegionId, + source: tokio::sync::oneshot::error::RecvError, location: Location, }, - #[snafu(display("Task already cancelled"))] - Cancelled { location: Location }, - - #[snafu(display("Failed to cancel flush, source: {}", source))] - CancelFlush { - #[snafu(backtrace)] - source: BoxedError, - }, - #[snafu(display( "Manifest protocol forbid to read, min_version: {}, supported_version: {}", min_version, @@ -455,6 +451,17 @@ pub enum Error { region_id: RegionId, source: tokio::sync::oneshot::error::RecvError, }, + + #[snafu(display( + "The flush request is duplicate, region_id: {}, sequence: {}", + region_id, + sequence + ))] + DuplicateFlush { + region_id: RegionId, + sequence: SequenceNumber, + location: Location, + }, } pub type Result = std::result::Result; @@ -492,9 +499,7 @@ impl ErrorExt for Error { Utf8 { .. } | EncodeJson { .. } | DecodeJson { .. } - | JoinTask { .. } - | Cancelled { .. } - | CancelFlush { .. } + | WaitFlush { .. } | DecodeMetaActionList { .. } | Readline { .. } | WalDataCorrupted { .. } @@ -546,7 +551,8 @@ impl ErrorExt for Error { StartManifestGcTask { .. } | StopManifestGcTask { .. } - | IllegalSchedulerState { .. } => StatusCode::Unexpected, + | IllegalSchedulerState { .. } + | DuplicateFlush { .. } => StatusCode::Unexpected, TtlCalculation { source, .. } => source.status_code(), } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index d4d87b2b74..2a212fe430 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -12,22 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::Future; -use std::pin::Pin; +mod scheduler; + use std::sync::Arc; -use async_trait::async_trait; -use common_telemetry::logging; +use common_telemetry::{logging, timer}; +pub use scheduler::{FlushHandle, FlushRequest, FlushScheduler, FlushSchedulerRef}; use store_api::logstore::LogStore; use store_api::storage::consts::WRITE_ROW_GROUP_SIZE; use store_api::storage::SequenceNumber; -use crate::background::{Context, Job, JobHandle, JobPoolRef}; use crate::config::EngineConfig; -use crate::error::{CancelledSnafu, Result}; +use crate::error::Result; use crate::manifest::action::*; use crate::manifest::region::RegionManifest; use crate::memtable::{IterContext, MemtableId, MemtableRef}; +use crate::metrics::FLUSH_ELAPSED; use crate::region::{RegionWriterRef, SharedDataRef}; use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions}; use crate::wal::Wal; @@ -127,34 +127,6 @@ impl FlushStrategy for SizeBasedStrategy { } } -#[async_trait] -pub trait FlushScheduler: Send + Sync + std::fmt::Debug { - async fn schedule_flush(&self, flush_job: Box) -> Result; -} - -#[derive(Debug)] -pub struct FlushSchedulerImpl { - job_pool: JobPoolRef, -} - -impl FlushSchedulerImpl { - pub fn new(job_pool: JobPoolRef) -> FlushSchedulerImpl { - FlushSchedulerImpl { job_pool } - } -} - -#[async_trait] -impl FlushScheduler for FlushSchedulerImpl { - async fn schedule_flush(&self, flush_job: Box) -> Result { - // TODO(yingwen): [flush] Implements flush schedule strategy, controls max background flushes. - self.job_pool.submit(flush_job).await - } -} - -pub type FlushSchedulerRef = Arc; - -pub type FlushCallback = Pin + Send + 'static>>; - pub struct FlushJob { /// Max memtable id in these memtables, /// used to remove immutable memtables in current version. @@ -173,18 +145,22 @@ pub struct FlushJob { pub wal: Wal, /// Region manifest service, used to persist metadata. pub manifest: RegionManifest, - /// Callbacks that get invoked on flush success. - pub on_success: Option, /// Storage engine config pub engine_config: Arc, } impl FlushJob { - async fn write_memtables_to_layer(&mut self, ctx: &Context) -> Result> { - if ctx.is_cancelled() { - return CancelledSnafu {}.fail(); - } + /// Execute the flush job. + async fn run(&mut self) -> Result<()> { + let _timer = timer!(FLUSH_ELAPSED); + let file_metas = self.write_memtables_to_layer().await?; + self.write_manifest_and_apply(&file_metas).await?; + + Ok(()) + } + + async fn write_memtables_to_layer(&mut self) -> Result> { let region_id = self.shared.id(); let mut futures = Vec::with_capacity(self.memtables.len()); let iter_ctx = IterContext { @@ -258,20 +234,6 @@ impl FlushJob { } } -#[async_trait] -impl Job for FlushJob { - // TODO(yingwen): [flush] Support in-job parallelism (Flush memtables concurrently) - async fn run(&mut self, ctx: &Context) -> Result<()> { - let file_metas = self.write_memtables_to_layer(ctx).await?; - self.write_manifest_and_apply(&file_metas).await?; - - if let Some(cb) = self.on_success.take() { - cb.await; - } - Ok(()) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/storage/src/flush/scheduler.rs b/src/storage/src/flush/scheduler.rs new file mode 100644 index 0000000000..9eadd63333 --- /dev/null +++ b/src/storage/src/flush/scheduler.rs @@ -0,0 +1,252 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use common_telemetry::logging; +use metrics::increment_counter; +use snafu::{ensure, ResultExt}; +use store_api::logstore::LogStore; +use store_api::storage::{RegionId, SequenceNumber}; +use tokio::sync::oneshot::{Receiver, Sender}; +use tokio::sync::{oneshot, Notify}; + +use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef}; +use crate::config::EngineConfig; +use crate::error::{DuplicateFlushSnafu, Result, WaitFlushSnafu}; +use crate::flush::FlushJob; +use crate::manifest::region::RegionManifest; +use crate::memtable::{MemtableId, MemtableRef}; +use crate::metrics::{FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL}; +use crate::region; +use crate::region::{RegionWriterRef, SharedDataRef}; +use crate::scheduler::rate_limit::BoxedRateLimitToken; +use crate::scheduler::{Handler, LocalScheduler, Request, Scheduler, SchedulerConfig}; +use crate::sst::AccessLayerRef; +use crate::wal::Wal; + +/// Key for [FlushRequest], consist of a region id and the flush +/// sequence. +type FlushKey = (RegionId, SequenceNumber); + +/// Region flush request. +pub struct FlushRequest { + /// Max memtable id in these memtables, + /// used to remove immutable memtables in current version. + pub max_memtable_id: MemtableId, + /// Memtables to be flushed. + pub memtables: Vec, + /// Last sequence of data to be flushed. + pub flush_sequence: SequenceNumber, + /// Shared data of region to be flushed. + pub shared: SharedDataRef, + /// Sst access layer of the region. + pub sst_layer: AccessLayerRef, + /// Region writer, used to persist log entry that points to the latest manifest file. + pub writer: RegionWriterRef, + /// Region write-ahead logging, used to write data/meta to the log file. + pub wal: Wal, + /// Region manifest service, used to persist metadata. + pub manifest: RegionManifest, + /// Storage engine config + pub engine_config: Arc, + /// Flush result sender. Callers should set the sender to None. + pub sender: Option>>, + + // Compaction related options: + /// TTL of the region. + pub ttl: Option, + /// Time window for compaction. + pub compaction_time_window: Option, +} + +impl FlushRequest { + #[inline] + fn region_id(&self) -> RegionId { + self.shared.id() + } +} + +impl Request for FlushRequest { + type Key = FlushKey; + + #[inline] + fn key(&self) -> FlushKey { + (self.shared.id(), self.flush_sequence) + } + + fn complete(self, result: Result<()>) { + if let Some(sender) = self.sender { + let _ = sender.send(result); + } + } +} + +impl From<&FlushRequest> for FlushJob { + fn from(req: &FlushRequest) -> FlushJob { + FlushJob { + max_memtable_id: req.max_memtable_id, + memtables: req.memtables.clone(), + flush_sequence: req.flush_sequence, + shared: req.shared.clone(), + sst_layer: req.sst_layer.clone(), + writer: req.writer.clone(), + wal: req.wal.clone(), + manifest: req.manifest.clone(), + engine_config: req.engine_config.clone(), + } + } +} + +impl From<&FlushRequest> for CompactionRequestImpl { + fn from(req: &FlushRequest) -> CompactionRequestImpl { + CompactionRequestImpl { + region_id: req.region_id(), + sst_layer: req.sst_layer.clone(), + writer: req.writer.clone(), + shared: req.shared.clone(), + manifest: req.manifest.clone(), + wal: req.wal.clone(), + ttl: req.ttl, + compaction_time_window: req.compaction_time_window, + sender: None, + sst_write_buffer_size: req.engine_config.sst_write_buffer_size, + } + } +} + +/// A handle to get the flush result. +#[derive(Debug)] +pub struct FlushHandle { + region_id: RegionId, + receiver: Receiver>, +} + +impl FlushHandle { + /// Waits until the flush job is finished. + pub async fn wait(self) -> Result<()> { + self.receiver.await.context(WaitFlushSnafu { + region_id: self.region_id, + })? + } +} + +/// Flush scheduler. +pub struct FlushScheduler { + scheduler: LocalScheduler>, +} + +pub type FlushSchedulerRef = Arc>; + +impl FlushScheduler { + /// Returns a new [FlushScheduler]. + pub fn new(config: SchedulerConfig, compaction_scheduler: CompactionSchedulerRef) -> Self { + let handler = FlushHandler { + compaction_scheduler, + }; + Self { + scheduler: LocalScheduler::new(config, handler), + } + } + + /// Schedules a flush request and return the handle to the flush task. + /// + /// # Panics + /// Panics if `sender` of the `req` is not `None`. + pub fn schedule_flush(&self, mut req: FlushRequest) -> Result { + assert!(req.sender.is_none()); + + let region_id = req.region_id(); + let sequence = req.flush_sequence; + let (sender, receiver) = oneshot::channel(); + req.sender = Some(sender); + + let scheduled = self.scheduler.schedule(req)?; + // Normally we should not have duplicate flush request. + ensure!( + scheduled, + DuplicateFlushSnafu { + region_id, + sequence, + } + ); + + increment_counter!(FLUSH_REQUESTS_TOTAL); + + Ok(FlushHandle { + region_id, + receiver, + }) + } +} + +struct FlushHandler { + compaction_scheduler: CompactionSchedulerRef, +} + +#[async_trait::async_trait] +impl Handler for FlushHandler { + type Request = FlushRequest; + + async fn handle_request( + &self, + req: FlushRequest, + token: BoxedRateLimitToken, + finish_notifier: Arc, + ) -> Result<()> { + let compaction_scheduler = self.compaction_scheduler.clone(); + common_runtime::spawn_bg(async move { + execute_flush(req, compaction_scheduler).await; + + // releases rate limit token + token.try_release(); + // notify scheduler to schedule next task when current task finishes. + finish_notifier.notify_one(); + }); + + Ok(()) + } +} + +async fn execute_flush( + req: FlushRequest, + compaction_scheduler: CompactionSchedulerRef, +) { + let mut flush_job = FlushJob::from(&req); + + if let Err(e) = flush_job.run().await { + logging::error!(e; "Failed to flush regoin {}", req.region_id()); + + increment_counter!(FLUSH_ERRORS_TOTAL); + + req.complete(Err(e)); + } else { + logging::debug!("Successfully flush region: {}", req.region_id()); + + let compaction_request = CompactionRequestImpl::from(&req); + let max_files_in_l0 = req.engine_config.max_files_in_l0; + let shared_data = req.shared.clone(); + + // If flush is success, schedule a compaction request for this region. + region::schedule_compaction( + shared_data, + compaction_scheduler, + compaction_request, + max_files_in_l0, + ); + + req.complete(Ok(())); + } +} diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 9be0890bf4..bba07f7ef3 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -14,7 +14,6 @@ //! Storage engine implementation. -mod background; mod chunk; pub mod codec; pub mod compaction; diff --git a/src/storage/src/metrics.rs b/src/storage/src/metrics.rs index 89347f1711..d37af341d0 100644 --- a/src/storage/src/metrics.rs +++ b/src/storage/src/metrics.rs @@ -16,3 +16,9 @@ /// Elapsed time of updating manifest when creating regions. pub const CREATE_REGION_UPDATE_MANIFEST: &str = "storage.create_region.update_manifest"; +/// Counter of scheduled flush requests. +pub const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total"; +/// Counter of scheduled failed flush jobs. +pub const FLUSH_ERRORS_TOTAL: &str = "storage.flush.errors_total"; +/// Elapsed time of a flush job. +pub const FLUSH_ELAPSED: &str = "storage.flush.elapsed"; diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 8c41d84f55..f9482e4748 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -42,6 +42,7 @@ use crate::manifest::action::{ use crate::manifest::region::RegionManifest; use crate::memtable::MemtableBuilderRef; use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef}; +pub(crate) use crate::region::writer::schedule_compaction; pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext}; use crate::schema::compat::CompatWrite; use crate::snapshot::SnapshotImpl; @@ -72,7 +73,6 @@ impl fmt::Debug for RegionImpl { .field("name", &self.inner.shared.name) .field("wal", &self.inner.wal) .field("flush_strategy", &self.inner.flush_strategy) - .field("flush_scheduler", &self.inner.flush_scheduler) .field("compaction_scheduler", &self.inner.compaction_scheduler) .field("sst_layer", &self.inner.sst_layer) .field("manifest", &self.inner.manifest) @@ -150,7 +150,7 @@ pub struct StoreConfig { pub sst_layer: AccessLayerRef, pub manifest: RegionManifest, pub memtable_builder: MemtableBuilderRef, - pub flush_scheduler: FlushSchedulerRef, + pub flush_scheduler: FlushSchedulerRef, pub flush_strategy: FlushStrategyRef, pub compaction_scheduler: CompactionSchedulerRef, pub engine_config: Arc, @@ -575,7 +575,7 @@ struct RegionInner { writer: RegionWriterRef, wal: Wal, flush_strategy: FlushStrategyRef, - flush_scheduler: FlushSchedulerRef, + flush_scheduler: FlushSchedulerRef, compaction_scheduler: CompactionSchedulerRef, sst_layer: AccessLayerRef, manifest: RegionManifest, diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index dea40d251e..740c11305a 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -16,9 +16,7 @@ use std::sync::Arc; use std::time::Duration; use common_base::readable_size::ReadableSize; -use common_error::prelude::BoxedError; -use common_telemetry::tracing::log::{debug, info}; -use common_telemetry::{error, logging}; +use common_telemetry::logging; use futures::TryStreamExt; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; @@ -26,11 +24,10 @@ use store_api::manifest::{Manifest, ManifestVersion, MetaAction}; use store_api::storage::{AlterRequest, FlushContext, SequenceNumber, WriteContext, WriteResponse}; use tokio::sync::{oneshot, Mutex}; -use crate::background::JobHandle; use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef}; use crate::config::EngineConfig; use crate::error::{self, Result}; -use crate::flush::{FlushCallback, FlushJob, FlushSchedulerRef, FlushStrategyRef}; +use crate::flush::{FlushHandle, FlushRequest, FlushSchedulerRef, FlushStrategyRef}; use crate::manifest::action::{ RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, }; @@ -42,7 +39,7 @@ use crate::region::{ }; use crate::schema::compat::CompatWrite; use crate::sst::AccessLayerRef; -use crate::version::{VersionControl, VersionControlRef, VersionEdit, VersionRef}; +use crate::version::{VersionControl, VersionControlRef, VersionEdit}; use crate::wal::Wal; use crate::write_batch::WriteBatch; @@ -267,7 +264,7 @@ impl RegionWriter { } // we release the writer lock once for rejecting any following potential writing requests immediately. - self.cancel_flush().await?; + self.wait_flush().await?; // TODO: canncel the compaction task @@ -288,7 +285,7 @@ impl RegionWriter { if ctx.wait { if let Some(handle) = inner.flush_handle.take() { - handle.join().await?; + handle.wait().await?; } } @@ -310,15 +307,12 @@ impl RegionWriter { .await } - /// Cancel flush task if any - async fn cancel_flush(&self) -> Result<()> { + /// Wait flush task if any + async fn wait_flush(&self) -> Result<()> { let mut inner = self.inner.lock().await; - if let Some(task) = inner.flush_handle.take() { - task.cancel() - .await - .map_err(BoxedError::new) - .context(error::CancelFlushSnafu)?; + if let Some(handle) = inner.flush_handle.take() { + handle.wait().await?; } Ok(()) @@ -328,7 +322,7 @@ impl RegionWriter { pub struct WriterContext<'a, S: LogStore> { pub shared: &'a SharedDataRef, pub flush_strategy: &'a FlushStrategyRef, - pub flush_scheduler: &'a FlushSchedulerRef, + pub flush_scheduler: &'a FlushSchedulerRef, pub compaction_scheduler: &'a CompactionSchedulerRef, pub sst_layer: &'a AccessLayerRef, pub wal: &'a Wal, @@ -359,7 +353,7 @@ impl<'a, S: LogStore> AlterContext<'a, S> { #[derive(Debug)] struct WriterInner { memtable_builder: MemtableBuilderRef, - flush_handle: Option, + flush_handle: Option, /// `WriterInner` will reject any future writing, if the closed flag is set. /// @@ -625,15 +619,13 @@ impl WriterInner { version_control.freeze_mutable(new_mutable); if let Some(flush_handle) = self.flush_handle.take() { - // Previous flush job is incomplete, wait util it is finished (write stall). + // Previous flush job is incomplete, wait util it is finished. // However the last flush job may fail, in which case, we just return error // and abort current write request. The flush handle is left empty, so the next // time we still have chance to trigger a new flush. - logging::info!("Write stall, region: {}", ctx.shared.name); - // TODO(yingwen): We should release the write lock during waiting flush done, which // needs something like async condvar. - flush_handle.join().await.map_err(|e| { + flush_handle.wait().await.map_err(|e| { logging::error!(e; "Previous flush job failed, region: {}", ctx.shared.name); e })?; @@ -647,15 +639,7 @@ impl WriterInner { return Ok(()); } - let cb = Self::build_flush_callback( - ¤t_version, - ctx, - &self.engine_config, - self.ttl, - self.compaction_time_window, - ); - - let flush_req = FlushJob { + let flush_req = FlushRequest { max_memtable_id: max_memtable_id.unwrap(), memtables: mem_to_flush, // In write thread, safe to use current committed sequence. @@ -665,14 +649,16 @@ impl WriterInner { writer: ctx.writer.clone(), wal: ctx.wal.clone(), manifest: ctx.manifest.clone(), - on_success: cb, engine_config: self.engine_config.clone(), + sender: None, + ttl: self.ttl, + compaction_time_window: self.compaction_time_window, }; - let flush_handle = ctx - .flush_scheduler - .schedule_flush(Box::new(flush_req)) - .await?; + let flush_handle = ctx.flush_scheduler.schedule_flush(flush_req).map_err(|e| { + logging::error!(e; "Failed to schedule flush request"); + e + })?; self.flush_handle = Some(flush_handle); Ok(()) @@ -711,106 +697,28 @@ impl WriterInner { let (sender, receiver) = oneshot::channel(); compaction_request.sender = Some(sender); - if Self::schedule_compaction( + if schedule_compaction( shared_data, compaction_scheduler, compaction_request, compact_ctx.max_files_in_l0, - ) - .await - { + ) { receiver .await .context(error::CompactTaskCancelSnafu { region_id })??; } } else { - Self::schedule_compaction( + schedule_compaction( shared_data, compaction_scheduler, compaction_request, compact_ctx.max_files_in_l0, - ) - .await; + ); } Ok(()) } - fn build_flush_callback( - version: &VersionRef, - ctx: &WriterContext, - config: &Arc, - ttl: Option, - compaction_time_window: Option, - ) -> Option { - let region_id = version.metadata().id(); - let compaction_request = CompactionRequestImpl { - region_id, - sst_layer: ctx.sst_layer.clone(), - writer: ctx.writer.clone(), - shared: ctx.shared.clone(), - manifest: ctx.manifest.clone(), - wal: ctx.wal.clone(), - ttl, - compaction_time_window, - sender: None, - sst_write_buffer_size: config.sst_write_buffer_size, - }; - let compaction_scheduler = ctx.compaction_scheduler.clone(); - let shared_data = ctx.shared.clone(); - let max_files_in_l0 = config.max_files_in_l0; - - let schedule_compaction_cb = Box::pin(async move { - Self::schedule_compaction( - shared_data, - compaction_scheduler, - compaction_request, - max_files_in_l0, - ) - .await; - }); - Some(schedule_compaction_cb) - } - - /// Schedule compaction task, returns whether the task is scheduled. - async fn schedule_compaction( - shared_data: SharedDataRef, - compaction_scheduler: CompactionSchedulerRef, - compaction_request: CompactionRequestImpl, - max_files_in_l0: usize, - ) -> bool { - let region_id = shared_data.id(); - let level0_file_num = shared_data - .version_control - .current() - .ssts() - .level(0) - .file_num(); - - if level0_file_num <= max_files_in_l0 { - debug!( - "No enough SST files in level 0 (threshold: {}), skip compaction", - max_files_in_l0 - ); - return false; - } - match compaction_scheduler.schedule(compaction_request) { - Ok(scheduled) => { - info!( - "Schedule region {} compaction request result: {}", - region_id, scheduled - ); - - scheduled - } - Err(e) => { - error!(e;"Failed to schedule region compaction request {}", region_id); - - false - } - } - } - async fn manual_flush(&mut self, writer_ctx: WriterContext<'_, S>) -> Result<()> { self.trigger_flush(&writer_ctx).await?; Ok(()) @@ -826,3 +734,43 @@ impl WriterInner { self.closed = true; } } + +/// Schedule compaction task, returns whether the task is scheduled. +pub(crate) fn schedule_compaction( + shared_data: SharedDataRef, + compaction_scheduler: CompactionSchedulerRef, + compaction_request: CompactionRequestImpl, + max_files_in_l0: usize, +) -> bool { + let region_id = shared_data.id(); + let level0_file_num = shared_data + .version_control + .current() + .ssts() + .level(0) + .file_num(); + + if level0_file_num <= max_files_in_l0 { + logging::debug!( + "No enough SST files in level 0 (threshold: {}), skip compaction", + max_files_in_l0 + ); + return false; + } + match compaction_scheduler.schedule(compaction_request) { + Ok(scheduled) => { + logging::info!( + "Schedule region {} compaction request result: {}", + region_id, + scheduled + ); + + scheduled + } + Err(e) => { + logging::error!(e;"Failed to schedule region compaction request {}", region_id); + + false + } + } +} diff --git a/src/storage/src/scheduler.rs b/src/storage/src/scheduler.rs index cb5faa9a62..33249fbe28 100644 --- a/src/storage/src/scheduler.rs +++ b/src/storage/src/scheduler.rs @@ -110,7 +110,20 @@ where R: Request + Send + Sync, { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LocalScheduler<...>").finish() + f.debug_struct("LocalScheduler") + .field("state", &self.state) + .finish() + } +} + +impl Drop for LocalScheduler +where + R: Request, +{ + fn drop(&mut self) { + self.state.store(STATE_STOP, Ordering::Relaxed); + + self.cancel_token.cancel(); } } @@ -310,14 +323,14 @@ mod tests { struct CountdownLatch { counter: std::sync::Mutex, - notifies: std::sync::RwLock>>, + notify: Notify, } impl CountdownLatch { fn new(size: usize) -> Self { Self { counter: std::sync::Mutex::new(size), - notifies: std::sync::RwLock::new(vec![]), + notify: Notify::new(), } } @@ -326,22 +339,14 @@ mod tests { if *counter >= 1 { *counter -= 1; if *counter == 0 { - let notifies = self.notifies.read().unwrap(); - for waiter in notifies.iter() { - waiter.notify_one(); - } + self.notify.notify_one(); } } } + /// Users should only call this once. async fn wait(&self) { - let notify = Arc::new(Notify::new()); - { - let notify = notify.clone(); - let mut notifies = self.notifies.write().unwrap(); - notifies.push(notify); - } - notify.notified().await + self.notify.notified().await } } diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 4faab3af89..b969fe4608 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -20,11 +20,10 @@ use object_store::services::Fs; use object_store::ObjectStore; use store_api::manifest::Manifest; -use crate::background::JobPoolImpl; use crate::compaction::noop::NoopCompactionScheduler; use crate::engine; use crate::file_purger::noop::NoopFilePurgeHandler; -use crate::flush::{FlushSchedulerImpl, SizeBasedStrategy}; +use crate::flush::{FlushScheduler, SizeBasedStrategy}; use crate::manifest::region::RegionManifest; use crate::memtable::DefaultMemtableBuilder; use crate::region::StoreConfig; @@ -60,14 +59,16 @@ pub async fn new_store_config_with_object_store( let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone())); let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store, None, None); manifest.start().await.unwrap(); - let job_pool = Arc::new(JobPoolImpl {}); - let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); let log_config = LogConfig { log_file_dir: log_store_dir(store_dir), ..Default::default() }; let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap()); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); + let flush_scheduler = Arc::new(FlushScheduler::new( + SchedulerConfig::default(), + compaction_scheduler.clone(), + )); let file_purger = Arc::new(LocalScheduler::new( SchedulerConfig::default(), NoopFilePurgeHandler,