From 1e9918ddf9c35bd0d43ca69296b0db0585e7ca4b Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 9 Feb 2023 11:43:20 +0800 Subject: [PATCH] feat: compaction scheduler and rate limiter (#947) * wip: compaction schdduler * feat: imple simple compaction scheduler * fix: typo * feat: add generic parameter to make scheduler friendly to tests * chore: add more tests * fix: CR comments * fix: CR comments * fix: ensure idempotency for rate limit token * fix: Cr ct omments --- Cargo.lock | 1 + Cargo.toml | 1 + src/log-store/Cargo.toml | 2 +- src/storage/Cargo.toml | 1 + src/storage/src/compaction.rs | 19 + src/storage/src/compaction/dedup_deque.rs | 101 +++++ src/storage/src/compaction/picker.rs | 67 ++++ src/storage/src/compaction/rate_limit.rs | 186 +++++++++ src/storage/src/compaction/scheduler.rs | 455 ++++++++++++++++++++++ src/storage/src/compaction/task.rs | 70 ++++ src/storage/src/error.rs | 12 + src/storage/src/lib.rs | 1 + 12 files changed, 915 insertions(+), 1 deletion(-) create mode 100644 src/storage/src/compaction.rs create mode 100644 src/storage/src/compaction/dedup_deque.rs create mode 100644 src/storage/src/compaction/picker.rs create mode 100644 src/storage/src/compaction/rate_limit.rs create mode 100644 src/storage/src/compaction/scheduler.rs create mode 100644 src/storage/src/compaction/task.rs diff --git a/Cargo.lock b/Cargo.lock index c210b1a6f4..c792e77f64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7075,6 +7075,7 @@ dependencies = [ "table", "tempdir", "tokio", + "tokio-util", "tonic", "tonic-build", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 0c8fcb60fe..e2f1c55c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.28" tokio = { version = "1.24.2", features = ["full"] } +tokio-util = "0.7" tonic = "0.8" uuid = { version = "1", features = ["serde", "v4", "fast-rng"] } diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index d9dcf32d4d..f7a83e5a5b 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -30,7 +30,7 @@ snafu = { version = "0.7", features = ["backtraces"] } store-api = { path = "../store-api" } tempdir = "0.3" tokio.workspace = true -tokio-util = "0.7" +tokio-util.workspace = true [dev-dependencies] rand = "0.8" diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index bdda0431a9..b19a2012c9 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -34,6 +34,7 @@ snafu = { version = "0.7", features = ["backtraces"] } store-api = { path = "../store-api" } table = { path = "../table" } tokio.workspace = true +tokio-util.workspace = true tonic.workspace = true uuid.workspace = true diff --git a/src/storage/src/compaction.rs b/src/storage/src/compaction.rs new file mode 100644 index 0000000000..93a944affe --- /dev/null +++ b/src/storage/src/compaction.rs @@ -0,0 +1,19 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod dedup_deque; +mod picker; +mod rate_limit; +mod scheduler; +mod task; diff --git a/src/storage/src/compaction/dedup_deque.rs b/src/storage/src/compaction/dedup_deque.rs new file mode 100644 index 0000000000..f129d5556a --- /dev/null +++ b/src/storage/src/compaction/dedup_deque.rs @@ -0,0 +1,101 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; +use std::fmt::{Debug, Formatter}; +use std::hash::Hash; + +/// Deque with key deduplication. +#[derive(Default)] +pub struct DedupDeque { + deque: VecDeque, + existing: HashMap, +} + +impl DedupDeque { + /// Pushes a key value to the back of deque. + /// Returns true if the deque does not already contain value with the same key, otherwise + /// returns false. + pub fn push_back(&mut self, key: K, value: V) -> bool { + debug_assert_eq!(self.deque.len(), self.existing.len()); + if let Entry::Vacant(entry) = self.existing.entry(key.clone()) { + entry.insert(value); + self.deque.push_back(key); + return true; + } + false + } + + /// Pushes a key value to the front of deque. + /// Returns true if the deque does not already contain value with the same key, otherwise + /// returns false. + pub fn push_front(&mut self, key: K, value: V) -> bool { + if let Entry::Vacant(entry) = self.existing.entry(key.clone()) { + entry.insert(value); + self.deque.push_front(key); + return true; + } + false + } + + /// Pops a pair from the back of deque. Returns [None] if the deque is empty. + pub fn pop_front(&mut self) -> Option<(K, V)> { + debug_assert_eq!(self.deque.len(), self.existing.len()); + let key = self.deque.pop_front()?; + let value = self.existing.remove(&key)?; + Some((key, value)) + } + + pub fn len(&self) -> usize { + debug_assert_eq!(self.deque.len(), self.existing.len()); + self.deque.len() + } +} + +impl Debug for DedupDeque +where + K: Debug, + V: Debug, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DedupDeque") + .field("deque", &self.deque) + .field("existing", &self.existing) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dedup_deque() { + let mut deque = DedupDeque::default(); + assert!(deque.push_back(1, "hello".to_string())); + assert_eq!(1, deque.len()); + assert!(deque.push_back(2, "world".to_string())); + assert_eq!(2, deque.len()); + assert_eq!((1, "hello".to_string()), deque.pop_front().unwrap()); + assert_eq!(1, deque.len()); + assert_eq!((2, "world".to_string()), deque.pop_front().unwrap()); + assert_eq!(0, deque.len()); + + // insert duplicated item + assert!(deque.push_back(1, "hello".to_string())); + assert!(!deque.push_back(1, "world".to_string())); + assert_eq!((1, "hello".to_string()), deque.pop_front().unwrap()); + } +} diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs new file mode 100644 index 0000000000..7b84c593c5 --- /dev/null +++ b/src/storage/src/compaction/picker.rs @@ -0,0 +1,67 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::compaction::scheduler::CompactionRequestImpl; +use crate::compaction::task::{CompactionTask, CompactionTaskImpl}; + +/// Picker picks input SST files and build the compaction task. +/// Different compaction strategy may implement different pickers. +pub trait Picker: Send + 'static { + fn pick(&self, req: &R) -> crate::error::Result; +} + +/// L0 -> L1 all-to-all compaction based on time windows. +pub(crate) struct SimplePicker {} + +#[allow(unused)] +impl SimplePicker { + pub fn new() -> Self { + Self {} + } +} + +impl Picker for SimplePicker { + fn pick(&self, _req: &CompactionRequestImpl) -> crate::error::Result { + todo!() + } +} + +#[cfg(test)] +pub mod tests { + use std::marker::PhantomData; + + use super::*; + use crate::compaction::scheduler::CompactionRequest; + use crate::compaction::task::tests::{CallbackRef, NoopCompactionTask}; + + pub(crate) struct MockPicker { + pub cbs: Vec, + _phantom_data: PhantomData, + } + + impl MockPicker { + pub fn new(cbs: Vec) -> Self { + Self { + cbs, + _phantom_data: Default::default(), + } + } + } + + impl Picker for MockPicker { + fn pick(&self, _req: &R) -> crate::error::Result { + Ok(NoopCompactionTask::new(self.cbs.clone())) + } + } +} diff --git a/src/storage/src/compaction/rate_limit.rs b/src/storage/src/compaction/rate_limit.rs new file mode 100644 index 0000000000..6b96afe7ee --- /dev/null +++ b/src/storage/src/compaction/rate_limit.rs @@ -0,0 +1,186 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::marker::PhantomData; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; + +use crate::error::{CompactionRateLimitedSnafu, Result}; + +pub trait RateLimitToken { + /// Releases the token. + /// ### Note + /// Implementation should guarantee the idempotency. + fn try_release(&self); +} + +pub type BoxedRateLimitToken = Box; + +impl RateLimitToken for Box { + fn try_release(&self) { + (**self).try_release() + } +} + +/// Rate limiter +pub trait RateLimiter { + type Request; + + /// Acquires a token from rate limiter. Returns `Err` on failure. + fn acquire_token(&self, req: &Self::Request) -> Result; +} + +pub type BoxedRateLimiter = Box + Send + Sync>; + +/// Limits max inflight tasks number. +pub struct MaxInflightTaskLimiter { + max_inflight_task: usize, + inflight_task: Arc, + _phantom_data: PhantomData, +} + +#[allow(unused)] +impl MaxInflightTaskLimiter { + pub fn new(max_inflight_task: usize) -> Self { + Self { + max_inflight_task, + inflight_task: Arc::new(AtomicUsize::new(0)), + _phantom_data: Default::default(), + } + } +} + +impl RateLimiter for MaxInflightTaskLimiter { + type Request = R; + + fn acquire_token(&self, _: &Self::Request) -> Result { + if self.inflight_task.fetch_add(1, Ordering::Relaxed) >= self.max_inflight_task { + self.inflight_task.fetch_sub(1, Ordering::Relaxed); + return CompactionRateLimitedSnafu { + msg: format!( + "Max inflight task num exceeds, current: {}, max: {}", + self.inflight_task.load(Ordering::Relaxed), + self.max_inflight_task + ), + } + .fail(); + } + + Ok(Box::new(MaxInflightLimiterToken::new( + self.inflight_task.clone(), + ))) + } +} + +pub struct MaxInflightLimiterToken { + counter: Arc, + released: AtomicBool, +} + +impl MaxInflightLimiterToken { + pub fn new(counter: Arc) -> Self { + Self { + counter, + released: AtomicBool::new(false), + } + } +} + +impl RateLimitToken for MaxInflightLimiterToken { + fn try_release(&self) { + if self + .released + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + self.counter.fetch_sub(1, Ordering::Relaxed); + } + } +} + +/// A composite rate limiter that allows token acquisition only when all internal limiters allow. +pub struct CascadeRateLimiter { + limits: Vec>, +} + +impl CascadeRateLimiter { + pub fn new(limits: Vec>) -> Self { + Self { limits } + } +} + +impl RateLimiter for CascadeRateLimiter { + type Request = T; + + fn acquire_token(&self, req: &Self::Request) -> Result { + let mut res = vec![]; + for limit in &self.limits { + match limit.acquire_token(req) { + Ok(token) => { + res.push(token); + } + Err(e) => { + res.iter().for_each(RateLimitToken::try_release); + return Err(e); + } + } + } + Ok(Box::new(CompositeToken { tokens: res })) + } +} + +/// Composite token that releases all acquired token when released. +pub struct CompositeToken { + tokens: Vec, +} + +impl RateLimitToken for CompositeToken { + fn try_release(&self) { + for token in &self.tokens { + token.try_release(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_max_inflight_limiter() { + let limiter = MaxInflightTaskLimiter::new(3); + let t1 = limiter.acquire_token(&1).unwrap(); + assert_eq!(1, limiter.inflight_task.load(Ordering::Relaxed)); + let _t2 = limiter.acquire_token(&1).unwrap(); + assert_eq!(2, limiter.inflight_task.load(Ordering::Relaxed)); + let _t3 = limiter.acquire_token(&1).unwrap(); + assert_eq!(3, limiter.inflight_task.load(Ordering::Relaxed)); + assert!(limiter.acquire_token(&1).is_err()); + t1.try_release(); + assert_eq!(2, limiter.inflight_task.load(Ordering::Relaxed)); + let _t4 = limiter.acquire_token(&1).unwrap(); + } + + #[test] + fn test_cascade_limiter() { + let limiter: CascadeRateLimiter = + CascadeRateLimiter::new(vec![Box::new(MaxInflightTaskLimiter::new(3))]); + let t1 = limiter.acquire_token(&1).unwrap(); + let _t2 = limiter.acquire_token(&1).unwrap(); + let _t3 = limiter.acquire_token(&1).unwrap(); + assert!(limiter.acquire_token(&1).is_err()); + t1.try_release(); + let _t4 = limiter.acquire_token(&1).unwrap(); + } +} diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs new file mode 100644 index 0000000000..c22d977894 --- /dev/null +++ b/src/storage/src/compaction/scheduler.rs @@ -0,0 +1,455 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::marker::PhantomData; +use std::sync::{Arc, Mutex, RwLock}; + +use async_trait::async_trait; +use common_telemetry::{debug, info}; +use snafu::ResultExt; +use table::metadata::TableId; +use tokio::sync::Notify; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +use crate::compaction::dedup_deque::DedupDeque; +use crate::compaction::picker::Picker; +use crate::compaction::rate_limit::{ + BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter, RateLimitToken, RateLimiter, +}; +use crate::compaction::task::CompactionTask; +use crate::error::{Result, StopCompactionSchedulerSnafu}; + +/// Table compaction request. +#[derive(Default)] +pub struct CompactionRequestImpl { + table_id: TableId, +} + +impl CompactionRequest for CompactionRequestImpl { + #[inline] + fn table_id(&self) -> TableId { + self.table_id + } +} + +pub trait CompactionRequest: Send + Sync + Default + 'static { + fn table_id(&self) -> TableId; +} + +#[derive(Debug)] +pub struct CompactionSchedulerConfig { + max_inflight_task: usize, +} + +impl Default for CompactionSchedulerConfig { + fn default() -> Self { + Self { + max_inflight_task: 16, + } + } +} + +/// CompactionScheduler defines a set of API to schedule compaction tasks. +#[async_trait] +pub trait CompactionScheduler { + /// Schedules a compaction request. + /// Returns true if request is scheduled. Returns false if task queue already + /// contains the request with same table id. + async fn schedule(&self, request: R) -> Result; + + /// Stops compaction scheduler. + async fn stop(&self) -> Result<()>; +} + +/// Compaction task scheduler based on local state. +#[allow(unused)] +pub struct LocalCompactionScheduler { + request_queue: Arc>>, + cancel_token: CancellationToken, + task_notifier: Arc, + join_handle: Mutex>>, +} + +#[async_trait] +impl CompactionScheduler for LocalCompactionScheduler +where + R: CompactionRequest + Send + Sync, +{ + async fn schedule(&self, request: R) -> Result { + debug!( + "Schedule request: {}, queue size: {}", + request.table_id(), + self.remaining_requests().await + ); + let mut queue = self.request_queue.write().unwrap(); + let res = queue.push_back(request.table_id(), request); + self.task_notifier.notify_one(); + Ok(res) + } + + async fn stop(&self) -> Result<()> { + self.cancel_token.cancel(); + let handle = { self.join_handle.lock().unwrap().take() }; + if let Some(handle) = handle { + handle.await.context(StopCompactionSchedulerSnafu)?; + } + Ok(()) + } +} + +#[allow(unused)] +impl LocalCompactionScheduler +where + R: CompactionRequest, +{ + pub fn new(config: CompactionSchedulerConfig, picker: P) -> Self + where + T: CompactionTask, + P: Picker + Send + Sync, + { + let request_queue: Arc>> = + Arc::new(RwLock::new(DedupDeque::default())); + let cancel_token = CancellationToken::new(); + let task_notifier = Arc::new(Notify::new()); + + let handler = CompactionHandler { + task_notifier: task_notifier.clone(), + req_queue: request_queue.clone(), + cancel_token: cancel_token.child_token(), + limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new( + MaxInflightTaskLimiter::new(config.max_inflight_task), + )])), + picker, + _phantom_data: PhantomData::::default(), + }; + let join_handle = common_runtime::spawn_bg(async move { + debug!("Compaction handler loop spawned"); + handler.run().await; + }); + Self { + join_handle: Mutex::new(Some(join_handle)), + request_queue, + cancel_token, + task_notifier, + } + } + + async fn remaining_requests(&self) -> usize { + self.request_queue.read().unwrap().len() + } +} + +#[allow(unused)] +struct CompactionHandler> { + req_queue: Arc>>, + cancel_token: CancellationToken, + task_notifier: Arc, + limiter: Arc>, + picker: P, + _phantom_data: PhantomData, +} + +#[allow(unused)] +impl> CompactionHandler { + /// Runs table compaction requests dispatch loop. + pub async fn run(&self) { + let task_notifier = self.task_notifier.clone(); + let limiter = self.limiter.clone(); + loop { + tokio::select! { + _ = task_notifier.notified() => { + // poll requests as many as possible until rate limited, and then wait for + // notification (some task's finished). + debug!("Notified, queue size: {:?}", self.req_queue.read().unwrap().len()); + while let Some((table_id, req)) = self.poll_task().await { + if let Ok(token) = limiter.acquire_token(&req) { + debug!("Executing compaction request: {}", table_id); + self.handle_compaction_request(req, token).await; + } else { + // compaction rate limited, put back to req queue to wait for next + // schedule + debug!("Put back request {}, queue size: {}", table_id, self.req_queue.read().unwrap().len()); + self.put_back_req(table_id, req).await; + break; + } + } + } + _ = self.cancel_token.cancelled() => { + info!("Compaction tasks scheduler stopped."); + return; + } + } + } + } + + #[inline] + async fn poll_task(&self) -> Option<(TableId, R)> { + let mut queue = self.req_queue.write().unwrap(); + queue.pop_front() + } + + /// Puts request back to the front of request queue. + #[inline] + async fn put_back_req(&self, table_id: TableId, req: R) { + let mut queue = self.req_queue.write().unwrap(); + queue.push_front(table_id, req); + } + + // Handles compaction request, submit task to bg runtime. + async fn handle_compaction_request( + &self, + mut req: R, + token: BoxedRateLimitToken, + ) -> Result<()> { + let cloned_notify = self.task_notifier.clone(); + let task = self.build_compaction_task(req).await?; + + // TODO(hl): we need to keep a track of task handle here to allow task cancellation. + common_runtime::spawn_bg(async move { + task.run().await; // TODO(hl): handle errors + + // releases rate limit token + token.try_release(); + // notify scheduler to schedule next task when current task finishes. + cloned_notify.notify_one(); + }); + + Ok(()) + } + + // TODO(hl): generate compaction task(find SSTs to compact along with the output of compaction) + async fn build_compaction_task(&self, req: R) -> crate::error::Result { + self.picker.pick(&req) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use crate::compaction::picker::tests::MockPicker; + use crate::compaction::rate_limit::MaxInflightTaskLimiter; + + struct CountdownLatch { + counter: std::sync::Mutex, + notifies: std::sync::RwLock>>, + } + + impl CountdownLatch { + fn new(size: usize) -> Self { + Self { + counter: std::sync::Mutex::new(size), + notifies: std::sync::RwLock::new(vec![]), + } + } + + fn countdown(&self) { + let mut counter = self.counter.lock().unwrap(); + if *counter >= 1 { + *counter -= 1; + if *counter == 0 { + let notifies = self.notifies.read().unwrap(); + for waiter in notifies.iter() { + waiter.notify_one(); + } + } + } + } + + async fn wait(&self) { + let notify = Arc::new(Notify::new()); + { + let notify = notify.clone(); + let mut notifies = self.notifies.write().unwrap(); + notifies.push(notify); + } + notify.notified().await + } + } + + #[tokio::test] + async fn test_schedule_handler() { + common_telemetry::init_default_ut_logging(); + let queue = Arc::new(RwLock::new(DedupDeque::default())); + let latch = Arc::new(CountdownLatch::new(2)); + let latch_cloned = latch.clone(); + let picker = MockPicker::new(vec![Arc::new(move || { + latch_cloned.countdown(); + })]); + let handler = Arc::new(CompactionHandler { + req_queue: queue.clone(), + cancel_token: Default::default(), + task_notifier: Arc::new(Default::default()), + limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new( + MaxInflightTaskLimiter::new(3), + )])), + picker, + _phantom_data: Default::default(), + }); + + let handler_cloned = handler.clone(); + common_runtime::spawn_bg(async move { handler_cloned.run().await }); + + queue + .write() + .unwrap() + .push_back(1, CompactionRequestImpl::default()); + handler.task_notifier.notify_one(); + queue + .write() + .unwrap() + .push_back(2, CompactionRequestImpl::default()); + handler.task_notifier.notify_one(); + + tokio::time::timeout(Duration::from_secs(1), latch.wait()) + .await + .unwrap(); + } + + #[derive(Default, Debug)] + struct MockRequest { + table_id: TableId, + } + + impl CompactionRequest for MockRequest { + fn table_id(&self) -> TableId { + self.table_id + } + } + + #[tokio::test] + async fn test_scheduler() { + let latch = Arc::new(CountdownLatch::new(2)); + let latch_cloned = latch.clone(); + + let picker = MockPicker::new(vec![Arc::new(move || latch_cloned.countdown())]); + let scheduler = LocalCompactionScheduler::new( + CompactionSchedulerConfig { + max_inflight_task: 3, + }, + picker, + ); + + scheduler + .schedule(MockRequest { table_id: 1 }) + .await + .unwrap(); + + scheduler + .schedule(MockRequest { table_id: 2 }) + .await + .unwrap(); + + tokio::time::timeout(Duration::from_secs(1), latch.wait()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_scheduler_many() { + common_telemetry::init_default_ut_logging(); + let task_size = 100; + + let latch = Arc::new(CountdownLatch::new(task_size)); + let latch_clone = latch.clone(); + + let picker = MockPicker::new(vec![Arc::new(move || { + latch_clone.countdown(); + })]); + + let config = CompactionSchedulerConfig { + max_inflight_task: 3, + }; + let scheduler = LocalCompactionScheduler::new(config, picker); + + for i in 0..task_size { + scheduler + .schedule(MockRequest { + table_id: i as TableId, + }) + .await + .unwrap(); + } + + tokio::time::timeout(Duration::from_secs(3), latch.wait()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_scheduler_interval() { + common_telemetry::init_default_ut_logging(); + let task_size = 100; + let latch = Arc::new(CountdownLatch::new(task_size)); + let latch_clone = latch.clone(); + + let picker = MockPicker::new(vec![Arc::new(move || { + latch_clone.countdown(); + })]); + + let config = CompactionSchedulerConfig { + max_inflight_task: 3, + }; + let scheduler = LocalCompactionScheduler::new(config, picker); + + for i in 0..task_size / 2 { + scheduler + .schedule(MockRequest { + table_id: i as TableId, + }) + .await + .unwrap(); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + for i in task_size / 2..task_size { + scheduler + .schedule(MockRequest { + table_id: i as TableId, + }) + .await + .unwrap(); + } + + tokio::time::timeout(Duration::from_secs(6), latch.wait()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_schedule_duplicate_tasks() { + common_telemetry::init_default_ut_logging(); + let picker = MockPicker::new(vec![]); + let config = CompactionSchedulerConfig { + max_inflight_task: 3, + }; + let scheduler = LocalCompactionScheduler::new(config, picker); + + let mut scheduled_task = 0; + for _ in 0..10 { + if scheduler + .schedule(MockRequest { table_id: 1 }) + .await + .unwrap() + { + scheduled_task += 1; + } + } + scheduler.stop().await.unwrap(); + debug!("Schedule tasks: {}", scheduled_task); + assert!(scheduled_task < 10); + } +} diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs new file mode 100644 index 0000000000..7428b6f23b --- /dev/null +++ b/src/storage/src/compaction/task.rs @@ -0,0 +1,70 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::Result; +use crate::sst::FileHandle; + +#[async_trait::async_trait] +pub trait CompactionTask: Send + Sync + 'static { + async fn run(&self) -> Result<()>; +} + +#[allow(unused)] +pub(crate) struct CompactionTaskImpl { + inputs: Vec, +} + +#[async_trait::async_trait] +impl CompactionTask for CompactionTaskImpl { + // TODO(hl): Actual SST compaction tasks + async fn run(&self) -> Result<()> { + Ok(()) + } +} + +#[allow(unused)] +pub(crate) struct CompactionInput { + input_level: u8, + output_level: u8, + file: FileHandle, +} + +#[cfg(test)] +pub mod tests { + use std::sync::Arc; + + use super::*; + use crate::compaction::task::CompactionTask; + + pub type CallbackRef = Arc; + pub struct NoopCompactionTask { + pub cbs: Vec, + } + + impl NoopCompactionTask { + pub fn new(cbs: Vec) -> Self { + Self { cbs } + } + } + + #[async_trait::async_trait] + impl CompactionTask for NoopCompactionTask { + async fn run(&self) -> Result<()> { + for cb in &self.cbs { + cb() + } + Ok(()) + } + } +} diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index fbb121eff3..f79602002e 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -23,6 +23,7 @@ use serde_json::error::Error as JsonError; use store_api::manifest::action::ProtocolVersion; use store_api::manifest::ManifestVersion; use store_api::storage::{RegionId, SequenceNumber}; +use tokio::task::JoinError; use crate::metadata::Error as MetadataError; use crate::write_batch; @@ -412,6 +413,15 @@ pub enum Error { #[snafu(display("Failed to decode parquet file time range, msg: {}", msg))] DecodeParquetTimeRange { msg: String, backtrace: Backtrace }, + + #[snafu(display("Compaction rate limited, msg: {}", msg))] + CompactionRateLimited { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to stop compaction scheduler, source: {:?}", source))] + StopCompactionScheduler { + source: JoinError, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -481,6 +491,8 @@ impl ErrorExt for Error { ConvertChunk { source, .. } => source.status_code(), MarkWalObsolete { source, .. } => source.status_code(), DecodeParquetTimeRange { .. } => StatusCode::Unexpected, + CompactionRateLimited { .. } => StatusCode::Internal, + StopCompactionScheduler { .. } => StatusCode::Internal, } } diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 2d6d2bc514..cc6c592e47 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -17,6 +17,7 @@ mod background; mod chunk; pub mod codec; +mod compaction; pub mod config; mod engine; pub mod error;