feat: compaction scheduler and rate limiter (#947)

* wip: compaction scheduler

* feat: implement simple compaction scheduler

* fix: typo

* feat: add generic parameter to make scheduler friendly to tests

* chore: add more tests

* fix: CR comments

* fix: CR comments

* fix: ensure idempotency for rate limit token

* fix: CR comments
Lei, HUANG
2023-02-09 11:43:20 +08:00
committed by GitHub
parent 4ce62f850b
commit 1e9918ddf9
12 changed files with 915 additions and 1 deletion

Cargo.lock (generated)

@@ -7075,6 +7075,7 @@ dependencies = [
"table",
"tempdir",
"tokio",
"tokio-util",
"tonic",
"tonic-build",
"uuid",


@@ -70,6 +70,7 @@ serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.28"
tokio = { version = "1.24.2", features = ["full"] }
tokio-util = "0.7"
tonic = "0.8"
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }


@@ -30,7 +30,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
tempdir = "0.3"
tokio.workspace = true
tokio-util = "0.7"
tokio-util.workspace = true
[dev-dependencies]
rand = "0.8"


@@ -34,6 +34,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
table = { path = "../table" }
tokio.workspace = true
tokio-util.workspace = true
tonic.workspace = true
uuid.workspace = true


@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod dedup_deque;
mod picker;
mod rate_limit;
mod scheduler;
mod task;


@@ -0,0 +1,101 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Formatter};
use std::hash::Hash;
/// Deque with key deduplication.
#[derive(Default)]
pub struct DedupDeque<K, V> {
deque: VecDeque<K>,
existing: HashMap<K, V>,
}
impl<K: Eq + Hash + Clone, V> DedupDeque<K, V> {
/// Pushes a key-value pair to the back of the deque.
/// Returns `true` if the deque did not already contain a value with the same key,
/// otherwise returns `false`.
pub fn push_back(&mut self, key: K, value: V) -> bool {
debug_assert_eq!(self.deque.len(), self.existing.len());
if let Entry::Vacant(entry) = self.existing.entry(key.clone()) {
entry.insert(value);
self.deque.push_back(key);
return true;
}
false
}
/// Pushes a key-value pair to the front of the deque.
/// Returns `true` if the deque did not already contain a value with the same key,
/// otherwise returns `false`.
pub fn push_front(&mut self, key: K, value: V) -> bool {
if let Entry::Vacant(entry) = self.existing.entry(key.clone()) {
entry.insert(value);
self.deque.push_front(key);
return true;
}
false
}
/// Pops a key-value pair from the front of the deque. Returns [None] if the deque is empty.
pub fn pop_front(&mut self) -> Option<(K, V)> {
debug_assert_eq!(self.deque.len(), self.existing.len());
let key = self.deque.pop_front()?;
let value = self.existing.remove(&key)?;
Some((key, value))
}
pub fn len(&self) -> usize {
debug_assert_eq!(self.deque.len(), self.existing.len());
self.deque.len()
}
}
impl<K, V> Debug for DedupDeque<K, V>
where
K: Debug,
V: Debug,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DedupDeque")
.field("deque", &self.deque)
.field("existing", &self.existing)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dedup_deque() {
let mut deque = DedupDeque::default();
assert!(deque.push_back(1, "hello".to_string()));
assert_eq!(1, deque.len());
assert!(deque.push_back(2, "world".to_string()));
assert_eq!(2, deque.len());
assert_eq!((1, "hello".to_string()), deque.pop_front().unwrap());
assert_eq!(1, deque.len());
assert_eq!((2, "world".to_string()), deque.pop_front().unwrap());
assert_eq!(0, deque.len());
// insert duplicated item
assert!(deque.push_back(1, "hello".to_string()));
assert!(!deque.push_back(1, "world".to_string()));
assert_eq!((1, "hello".to_string()), deque.pop_front().unwrap());
}
}
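
For reference, a minimal usage sketch (not part of this diff; crate-internal imports and the function name are illustrative) of the deduplication guarantee: a key is rejected while an entry with that key is still queued, and accepted again once it has been popped.

use crate::compaction::dedup_deque::DedupDeque;

fn dedup_deque_sketch() {
    let mut deque: DedupDeque<u32, String> = DedupDeque::default();
    assert!(deque.push_back(1, "first".to_string())); // accepted: key 1 is not queued yet
    assert!(!deque.push_back(1, "second".to_string())); // rejected: key 1 is already queued
    assert!(deque.push_back(2, "third".to_string())); // accepted: a different key
    assert_eq!((1, "first".to_string()), deque.pop_front().unwrap());
    assert!(deque.push_back(1, "second".to_string())); // accepted again after the pop
}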


@@ -0,0 +1,67 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::compaction::scheduler::CompactionRequestImpl;
use crate::compaction::task::{CompactionTask, CompactionTaskImpl};
/// Picker picks input SST files and builds the compaction task.
/// Different compaction strategies may implement different pickers.
pub trait Picker<R, T: CompactionTask>: Send + 'static {
fn pick(&self, req: &R) -> crate::error::Result<T>;
}
/// L0 -> L1 all-to-all compaction based on time windows.
pub(crate) struct SimplePicker {}
#[allow(unused)]
impl SimplePicker {
pub fn new() -> Self {
Self {}
}
}
impl Picker<CompactionRequestImpl, CompactionTaskImpl> for SimplePicker {
fn pick(&self, _req: &CompactionRequestImpl) -> crate::error::Result<CompactionTaskImpl> {
todo!()
}
}
#[cfg(test)]
pub mod tests {
use std::marker::PhantomData;
use super::*;
use crate::compaction::scheduler::CompactionRequest;
use crate::compaction::task::tests::{CallbackRef, NoopCompactionTask};
pub(crate) struct MockPicker<R: CompactionRequest> {
pub cbs: Vec<CallbackRef>,
_phantom_data: PhantomData<R>,
}
impl<R: CompactionRequest> MockPicker<R> {
pub fn new(cbs: Vec<CallbackRef>) -> Self {
Self {
cbs,
_phantom_data: Default::default(),
}
}
}
impl<R: CompactionRequest> Picker<R, NoopCompactionTask> for MockPicker<R> {
fn pick(&self, _req: &R) -> crate::error::Result<NoopCompactionTask> {
Ok(NoopCompactionTask::new(self.cbs.clone()))
}
}
}


@@ -0,0 +1,186 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use crate::error::{CompactionRateLimitedSnafu, Result};
pub trait RateLimitToken {
/// Releases the token.
/// ### Note
/// Implementations should guarantee idempotency.
fn try_release(&self);
}
pub type BoxedRateLimitToken = Box<dyn RateLimitToken + Send + Sync>;
impl<T: RateLimitToken + ?Sized> RateLimitToken for Box<T> {
fn try_release(&self) {
(**self).try_release()
}
}
/// Rate limiter
pub trait RateLimiter {
type Request;
/// Acquires a token from the rate limiter. Returns `Err` on failure.
fn acquire_token(&self, req: &Self::Request) -> Result<BoxedRateLimitToken>;
}
pub type BoxedRateLimiter<R> = Box<dyn RateLimiter<Request = R> + Send + Sync>;
/// Limits the maximum number of inflight tasks.
pub struct MaxInflightTaskLimiter<R> {
max_inflight_task: usize,
inflight_task: Arc<AtomicUsize>,
_phantom_data: PhantomData<R>,
}
#[allow(unused)]
impl<R> MaxInflightTaskLimiter<R> {
pub fn new(max_inflight_task: usize) -> Self {
Self {
max_inflight_task,
inflight_task: Arc::new(AtomicUsize::new(0)),
_phantom_data: Default::default(),
}
}
}
impl<R> RateLimiter for MaxInflightTaskLimiter<R> {
type Request = R;
fn acquire_token(&self, _: &Self::Request) -> Result<BoxedRateLimitToken> {
if self.inflight_task.fetch_add(1, Ordering::Relaxed) >= self.max_inflight_task {
self.inflight_task.fetch_sub(1, Ordering::Relaxed);
return CompactionRateLimitedSnafu {
msg: format!(
"Max inflight task num exceeds, current: {}, max: {}",
self.inflight_task.load(Ordering::Relaxed),
self.max_inflight_task
),
}
.fail();
}
Ok(Box::new(MaxInflightLimiterToken::new(
self.inflight_task.clone(),
)))
}
}
pub struct MaxInflightLimiterToken {
counter: Arc<AtomicUsize>,
released: AtomicBool,
}
impl MaxInflightLimiterToken {
pub fn new(counter: Arc<AtomicUsize>) -> Self {
Self {
counter,
released: AtomicBool::new(false),
}
}
}
impl RateLimitToken for MaxInflightLimiterToken {
fn try_release(&self) {
if self
.released
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
self.counter.fetch_sub(1, Ordering::Relaxed);
}
}
}
/// A composite rate limiter that allows token acquisition only when all internal limiters allow.
pub struct CascadeRateLimiter<T> {
limits: Vec<BoxedRateLimiter<T>>,
}
impl<T> CascadeRateLimiter<T> {
pub fn new(limits: Vec<BoxedRateLimiter<T>>) -> Self {
Self { limits }
}
}
impl<T> RateLimiter for CascadeRateLimiter<T> {
type Request = T;
fn acquire_token(&self, req: &Self::Request) -> Result<BoxedRateLimitToken> {
let mut res = vec![];
for limit in &self.limits {
match limit.acquire_token(req) {
Ok(token) => {
res.push(token);
}
Err(e) => {
res.iter().for_each(RateLimitToken::try_release);
return Err(e);
}
}
}
Ok(Box::new(CompositeToken { tokens: res }))
}
}
/// Composite token that releases all acquired tokens when released.
pub struct CompositeToken {
tokens: Vec<BoxedRateLimitToken>,
}
impl RateLimitToken for CompositeToken {
fn try_release(&self) {
for token in &self.tokens {
token.try_release();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_max_inflight_limiter() {
let limiter = MaxInflightTaskLimiter::new(3);
let t1 = limiter.acquire_token(&1).unwrap();
assert_eq!(1, limiter.inflight_task.load(Ordering::Relaxed));
let _t2 = limiter.acquire_token(&1).unwrap();
assert_eq!(2, limiter.inflight_task.load(Ordering::Relaxed));
let _t3 = limiter.acquire_token(&1).unwrap();
assert_eq!(3, limiter.inflight_task.load(Ordering::Relaxed));
assert!(limiter.acquire_token(&1).is_err());
t1.try_release();
assert_eq!(2, limiter.inflight_task.load(Ordering::Relaxed));
let _t4 = limiter.acquire_token(&1).unwrap();
}
#[test]
fn test_cascade_limiter() {
let limiter: CascadeRateLimiter<usize> =
CascadeRateLimiter::new(vec![Box::new(MaxInflightTaskLimiter::new(3))]);
let t1 = limiter.acquire_token(&1).unwrap();
let _t2 = limiter.acquire_token(&1).unwrap();
let _t3 = limiter.acquire_token(&1).unwrap();
assert!(limiter.acquire_token(&1).is_err());
t1.try_release();
let _t4 = limiter.acquire_token(&1).unwrap();
}
}
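
A sketch (not part of this diff; crate-internal imports assumed, names illustrative) of the all-or-nothing semantics of CascadeRateLimiter: a token is granted only when every inner limiter grants one, tokens acquired before a failure are rolled back, and try_release is idempotent.

use crate::compaction::rate_limit::{
    BoxedRateLimiter, CascadeRateLimiter, MaxInflightTaskLimiter, RateLimitToken, RateLimiter,
};
use crate::error::Result;

fn cascade_limiter_sketch() -> Result<()> {
    // The tighter limiter (2 inflight tasks) dominates the looser one (8).
    let limits: Vec<BoxedRateLimiter<()>> = vec![
        Box::new(MaxInflightTaskLimiter::new(2)),
        Box::new(MaxInflightTaskLimiter::new(8)),
    ];
    let limiter = CascadeRateLimiter::new(limits);
    let t1 = limiter.acquire_token(&())?;
    let _t2 = limiter.acquire_token(&())?;
    assert!(limiter.acquire_token(&()).is_err()); // rejected by the 2-task limiter
    t1.try_release();
    t1.try_release(); // the second release is a no-op thanks to the idempotency guarantee
    let _t3 = limiter.acquire_token(&())?; // capacity is available again
    Ok(())
}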


@@ -0,0 +1,455 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::marker::PhantomData;
use std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
use common_telemetry::{debug, info};
use snafu::ResultExt;
use table::metadata::TableId;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::compaction::dedup_deque::DedupDeque;
use crate::compaction::picker::Picker;
use crate::compaction::rate_limit::{
BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter, RateLimitToken, RateLimiter,
};
use crate::compaction::task::CompactionTask;
use crate::error::{Result, StopCompactionSchedulerSnafu};
/// Table compaction request.
#[derive(Default)]
pub struct CompactionRequestImpl {
table_id: TableId,
}
impl CompactionRequest for CompactionRequestImpl {
#[inline]
fn table_id(&self) -> TableId {
self.table_id
}
}
pub trait CompactionRequest: Send + Sync + Default + 'static {
fn table_id(&self) -> TableId;
}
#[derive(Debug)]
pub struct CompactionSchedulerConfig {
max_inflight_task: usize,
}
impl Default for CompactionSchedulerConfig {
fn default() -> Self {
Self {
max_inflight_task: 16,
}
}
}
/// CompactionScheduler defines a set of APIs to schedule compaction tasks.
#[async_trait]
pub trait CompactionScheduler<R> {
/// Schedules a compaction request.
/// Returns `true` if the request is scheduled. Returns `false` if the task queue already
/// contains a request with the same table id.
async fn schedule(&self, request: R) -> Result<bool>;
/// Stops compaction scheduler.
async fn stop(&self) -> Result<()>;
}
/// Compaction task scheduler based on local state.
#[allow(unused)]
pub struct LocalCompactionScheduler<R: CompactionRequest> {
request_queue: Arc<RwLock<DedupDeque<TableId, R>>>,
cancel_token: CancellationToken,
task_notifier: Arc<Notify>,
join_handle: Mutex<Option<JoinHandle<()>>>,
}
#[async_trait]
impl<R> CompactionScheduler<R> for LocalCompactionScheduler<R>
where
R: CompactionRequest + Send + Sync,
{
async fn schedule(&self, request: R) -> Result<bool> {
debug!(
"Schedule request: {}, queue size: {}",
request.table_id(),
self.remaining_requests().await
);
let mut queue = self.request_queue.write().unwrap();
let res = queue.push_back(request.table_id(), request);
self.task_notifier.notify_one();
Ok(res)
}
async fn stop(&self) -> Result<()> {
self.cancel_token.cancel();
let handle = { self.join_handle.lock().unwrap().take() };
if let Some(handle) = handle {
handle.await.context(StopCompactionSchedulerSnafu)?;
}
Ok(())
}
}
#[allow(unused)]
impl<R> LocalCompactionScheduler<R>
where
R: CompactionRequest,
{
pub fn new<P, T>(config: CompactionSchedulerConfig, picker: P) -> Self
where
T: CompactionTask,
P: Picker<R, T> + Send + Sync,
{
let request_queue: Arc<RwLock<DedupDeque<TableId, R>>> =
Arc::new(RwLock::new(DedupDeque::default()));
let cancel_token = CancellationToken::new();
let task_notifier = Arc::new(Notify::new());
let handler = CompactionHandler {
task_notifier: task_notifier.clone(),
req_queue: request_queue.clone(),
cancel_token: cancel_token.child_token(),
limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new(
MaxInflightTaskLimiter::new(config.max_inflight_task),
)])),
picker,
_phantom_data: PhantomData::<T>::default(),
};
let join_handle = common_runtime::spawn_bg(async move {
debug!("Compaction handler loop spawned");
handler.run().await;
});
Self {
join_handle: Mutex::new(Some(join_handle)),
request_queue,
cancel_token,
task_notifier,
}
}
async fn remaining_requests(&self) -> usize {
self.request_queue.read().unwrap().len()
}
}
#[allow(unused)]
struct CompactionHandler<R, T: CompactionTask, P: Picker<R, T>> {
req_queue: Arc<RwLock<DedupDeque<TableId, R>>>,
cancel_token: CancellationToken,
task_notifier: Arc<Notify>,
limiter: Arc<CascadeRateLimiter<R>>,
picker: P,
_phantom_data: PhantomData<T>,
}
#[allow(unused)]
impl<R, T: CompactionTask, P: Picker<R, T>> CompactionHandler<R, T, P> {
/// Runs the table compaction request dispatch loop.
pub async fn run(&self) {
let task_notifier = self.task_notifier.clone();
let limiter = self.limiter.clone();
loop {
tokio::select! {
_ = task_notifier.notified() => {
// Poll as many requests as possible until rate limited, then wait for a
// notification (some task has finished).
debug!("Notified, queue size: {:?}", self.req_queue.read().unwrap().len());
while let Some((table_id, req)) = self.poll_task().await {
if let Ok(token) = limiter.acquire_token(&req) {
debug!("Executing compaction request: {}", table_id);
self.handle_compaction_request(req, token).await;
} else {
// Compaction is rate limited; put the request back into the queue and wait
// for the next schedule.
debug!("Put back request {}, queue size: {}", table_id, self.req_queue.read().unwrap().len());
self.put_back_req(table_id, req).await;
break;
}
}
}
_ = self.cancel_token.cancelled() => {
info!("Compaction tasks scheduler stopped.");
return;
}
}
}
}
#[inline]
async fn poll_task(&self) -> Option<(TableId, R)> {
let mut queue = self.req_queue.write().unwrap();
queue.pop_front()
}
/// Puts the request back to the front of the request queue.
#[inline]
async fn put_back_req(&self, table_id: TableId, req: R) {
let mut queue = self.req_queue.write().unwrap();
queue.push_front(table_id, req);
}
// Handles a compaction request by submitting the task to the background runtime.
async fn handle_compaction_request(
&self,
mut req: R,
token: BoxedRateLimitToken,
) -> Result<()> {
let cloned_notify = self.task_notifier.clone();
let task = self.build_compaction_task(req).await?;
// TODO(hl): we need to keep a track of task handle here to allow task cancellation.
common_runtime::spawn_bg(async move {
task.run().await; // TODO(hl): handle errors
// releases rate limit token
token.try_release();
// notify scheduler to schedule next task when current task finishes.
cloned_notify.notify_one();
});
Ok(())
}
// TODO(hl): generate the compaction task (find SSTs to compact along with the compaction output)
async fn build_compaction_task(&self, req: R) -> crate::error::Result<T> {
self.picker.pick(&req)
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::compaction::picker::tests::MockPicker;
use crate::compaction::rate_limit::MaxInflightTaskLimiter;
struct CountdownLatch {
counter: std::sync::Mutex<usize>,
notifies: std::sync::RwLock<Vec<Arc<Notify>>>,
}
impl CountdownLatch {
fn new(size: usize) -> Self {
Self {
counter: std::sync::Mutex::new(size),
notifies: std::sync::RwLock::new(vec![]),
}
}
fn countdown(&self) {
let mut counter = self.counter.lock().unwrap();
if *counter >= 1 {
*counter -= 1;
if *counter == 0 {
let notifies = self.notifies.read().unwrap();
for waiter in notifies.iter() {
waiter.notify_one();
}
}
}
}
async fn wait(&self) {
let notify = Arc::new(Notify::new());
{
let notify = notify.clone();
let mut notifies = self.notifies.write().unwrap();
notifies.push(notify);
}
notify.notified().await
}
}
#[tokio::test]
async fn test_schedule_handler() {
common_telemetry::init_default_ut_logging();
let queue = Arc::new(RwLock::new(DedupDeque::default()));
let latch = Arc::new(CountdownLatch::new(2));
let latch_cloned = latch.clone();
let picker = MockPicker::new(vec![Arc::new(move || {
latch_cloned.countdown();
})]);
let handler = Arc::new(CompactionHandler {
req_queue: queue.clone(),
cancel_token: Default::default(),
task_notifier: Arc::new(Default::default()),
limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new(
MaxInflightTaskLimiter::new(3),
)])),
picker,
_phantom_data: Default::default(),
});
let handler_cloned = handler.clone();
common_runtime::spawn_bg(async move { handler_cloned.run().await });
queue
.write()
.unwrap()
.push_back(1, CompactionRequestImpl::default());
handler.task_notifier.notify_one();
queue
.write()
.unwrap()
.push_back(2, CompactionRequestImpl::default());
handler.task_notifier.notify_one();
tokio::time::timeout(Duration::from_secs(1), latch.wait())
.await
.unwrap();
}
#[derive(Default, Debug)]
struct MockRequest {
table_id: TableId,
}
impl CompactionRequest for MockRequest {
fn table_id(&self) -> TableId {
self.table_id
}
}
#[tokio::test]
async fn test_scheduler() {
let latch = Arc::new(CountdownLatch::new(2));
let latch_cloned = latch.clone();
let picker = MockPicker::new(vec![Arc::new(move || latch_cloned.countdown())]);
let scheduler = LocalCompactionScheduler::new(
CompactionSchedulerConfig {
max_inflight_task: 3,
},
picker,
);
scheduler
.schedule(MockRequest { table_id: 1 })
.await
.unwrap();
scheduler
.schedule(MockRequest { table_id: 2 })
.await
.unwrap();
tokio::time::timeout(Duration::from_secs(1), latch.wait())
.await
.unwrap();
}
#[tokio::test]
async fn test_scheduler_many() {
common_telemetry::init_default_ut_logging();
let task_size = 100;
let latch = Arc::new(CountdownLatch::new(task_size));
let latch_clone = latch.clone();
let picker = MockPicker::new(vec![Arc::new(move || {
latch_clone.countdown();
})]);
let config = CompactionSchedulerConfig {
max_inflight_task: 3,
};
let scheduler = LocalCompactionScheduler::new(config, picker);
for i in 0..task_size {
scheduler
.schedule(MockRequest {
table_id: i as TableId,
})
.await
.unwrap();
}
tokio::time::timeout(Duration::from_secs(3), latch.wait())
.await
.unwrap();
}
#[tokio::test]
async fn test_scheduler_interval() {
common_telemetry::init_default_ut_logging();
let task_size = 100;
let latch = Arc::new(CountdownLatch::new(task_size));
let latch_clone = latch.clone();
let picker = MockPicker::new(vec![Arc::new(move || {
latch_clone.countdown();
})]);
let config = CompactionSchedulerConfig {
max_inflight_task: 3,
};
let scheduler = LocalCompactionScheduler::new(config, picker);
for i in 0..task_size / 2 {
scheduler
.schedule(MockRequest {
table_id: i as TableId,
})
.await
.unwrap();
}
tokio::time::sleep(Duration::from_millis(100)).await;
for i in task_size / 2..task_size {
scheduler
.schedule(MockRequest {
table_id: i as TableId,
})
.await
.unwrap();
}
tokio::time::timeout(Duration::from_secs(6), latch.wait())
.await
.unwrap();
}
#[tokio::test]
async fn test_schedule_duplicate_tasks() {
common_telemetry::init_default_ut_logging();
let picker = MockPicker::new(vec![]);
let config = CompactionSchedulerConfig {
max_inflight_task: 3,
};
let scheduler = LocalCompactionScheduler::new(config, picker);
let mut scheduled_task = 0;
for _ in 0..10 {
if scheduler
.schedule(MockRequest { table_id: 1 })
.await
.unwrap()
{
scheduled_task += 1;
}
}
scheduler.stop().await.unwrap();
debug!("Schedule tasks: {}", scheduled_task);
assert!(scheduled_task < 10);
}
}
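
A sketch (not part of this diff; MyRequest, scheduler_sketch, and the picker parameter are illustrative) of how a caller drives the scheduler: requests for the same table id are deduplicated while queued, and stop cancels the dispatch loop and joins the background task.

use table::metadata::TableId;

use crate::compaction::picker::Picker;
use crate::compaction::scheduler::{
    CompactionRequest, CompactionScheduler, CompactionSchedulerConfig, LocalCompactionScheduler,
};
use crate::compaction::task::CompactionTask;
use crate::error::Result;

#[derive(Default)]
struct MyRequest {
    table_id: TableId,
}

impl CompactionRequest for MyRequest {
    fn table_id(&self) -> TableId {
        self.table_id
    }
}

async fn scheduler_sketch<P, T>(picker: P) -> Result<()>
where
    T: CompactionTask,
    P: Picker<MyRequest, T> + Send + Sync,
{
    let scheduler = LocalCompactionScheduler::new(CompactionSchedulerConfig::default(), picker);
    // The first request for table 1 is queued.
    assert!(scheduler.schedule(MyRequest { table_id: 1 }).await?);
    // A second request for the same table is deduplicated (returns Ok(false)) while the
    // first one is still waiting in the queue.
    let _maybe_duplicate = scheduler.schedule(MyRequest { table_id: 1 }).await?;
    // Stop cancels the dispatch loop and joins the background task.
    scheduler.stop().await
}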


@@ -0,0 +1,70 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::Result;
use crate::sst::FileHandle;
#[async_trait::async_trait]
pub trait CompactionTask: Send + Sync + 'static {
async fn run(&self) -> Result<()>;
}
#[allow(unused)]
pub(crate) struct CompactionTaskImpl {
inputs: Vec<CompactionInput>,
}
#[async_trait::async_trait]
impl CompactionTask for CompactionTaskImpl {
// TODO(hl): Actual SST compaction tasks
async fn run(&self) -> Result<()> {
Ok(())
}
}
#[allow(unused)]
pub(crate) struct CompactionInput {
input_level: u8,
output_level: u8,
file: FileHandle,
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use super::*;
use crate::compaction::task::CompactionTask;
pub type CallbackRef = Arc<dyn Fn() + Send + Sync>;
pub struct NoopCompactionTask {
pub cbs: Vec<CallbackRef>,
}
impl NoopCompactionTask {
pub fn new(cbs: Vec<CallbackRef>) -> Self {
Self { cbs }
}
}
#[async_trait::async_trait]
impl CompactionTask for NoopCompactionTask {
async fn run(&self) -> Result<()> {
for cb in &self.cbs {
cb()
}
Ok(())
}
}
}
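
A sketch (illustrative only; TimedTask is not part of this diff) showing that CompactionTask::run is the single entry point the scheduler awaits, so a wrapper task can add cross-cutting behavior such as timing without touching the scheduler.

use common_telemetry::debug;

use crate::compaction::task::CompactionTask;
use crate::error::Result;

pub struct TimedTask<T: CompactionTask> {
    inner: T,
}

impl<T: CompactionTask> TimedTask<T> {
    pub fn new(inner: T) -> Self {
        Self { inner }
    }
}

#[async_trait::async_trait]
impl<T: CompactionTask> CompactionTask for TimedTask<T> {
    async fn run(&self) -> Result<()> {
        let start = std::time::Instant::now();
        // Delegate to the wrapped task and record how long it took.
        let res = self.inner.run().await;
        debug!("Compaction task finished in {:?}", start.elapsed());
        res
    }
}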


@@ -23,6 +23,7 @@ use serde_json::error::Error as JsonError;
use store_api::manifest::action::ProtocolVersion;
use store_api::manifest::ManifestVersion;
use store_api::storage::{RegionId, SequenceNumber};
use tokio::task::JoinError;
use crate::metadata::Error as MetadataError;
use crate::write_batch;
@@ -412,6 +413,15 @@ pub enum Error {
#[snafu(display("Failed to decode parquet file time range, msg: {}", msg))]
DecodeParquetTimeRange { msg: String, backtrace: Backtrace },
#[snafu(display("Compaction rate limited, msg: {}", msg))]
CompactionRateLimited { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to stop compaction scheduler, source: {:?}", source))]
StopCompactionScheduler {
source: JoinError,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -481,6 +491,8 @@ impl ErrorExt for Error {
ConvertChunk { source, .. } => source.status_code(),
MarkWalObsolete { source, .. } => source.status_code(),
DecodeParquetTimeRange { .. } => StatusCode::Unexpected,
CompactionRateLimited { .. } => StatusCode::Internal,
StopCompactionScheduler { .. } => StatusCode::Internal,
}
}


@@ -17,6 +17,7 @@
mod background;
mod chunk;
pub mod codec;
mod compaction;
pub mod config;
mod engine;
pub mod error;