diff --git a/Cargo.lock b/Cargo.lock index 80c932f7e5..e714d0e154 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,9 +183,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "1.0.2" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c" +checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -3234,6 +3234,19 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -4794,7 +4807,7 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" dependencies = [ - "spin", + "spin 0.5.2", ] [[package]] @@ -5544,6 +5557,7 @@ dependencies = [ "datafusion", "datafusion-common", "datatypes", + "flume", "futures", "lazy_static", "log-store", @@ -5562,6 +5576,7 @@ dependencies = [ "strum 0.21.0", "table", "tokio", + "tokio-util", "uuid", ] @@ -5752,6 +5767,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -7764,7 +7788,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -9165,6 +9189,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.5.4" @@ -9879,9 +9912,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "target-lexicon" -version = "0.12.11" +version = "0.12.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d0e916b1148c8e263850e1ebcbd046f333e0683c724876bb0da63ea4373dc8a" +checksum = "1d2faeef5759ab89935255b1a4cd98e0baf99d1085e37d36599c625dac49ae8e" [[package]] name = "temp-env" diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 76233e2057..48aa8ade17 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -33,6 +33,7 @@ dashmap = "5.4" datafusion-common.workspace = true datafusion.workspace = true datatypes = { workspace = true } +flume = "0.10" futures.workspace = true lazy_static = "1.4" log-store = { workspace = true } @@ -50,6 +51,7 @@ storage = { workspace = true } store-api = { workspace = true } strum = "0.21" table = { workspace = true } +tokio-util.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 453197643b..553106a61d 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_datasource::compression::CompressionType; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; +use common_runtime::JoinError; use datatypes::arrow::error::ArrowError; use datatypes::prelude::ConcreteDataType; use prost::{DecodeError, EncodeError}; @@ -376,6 +377,18 @@ pub enum Error { source: datatypes::error::Error, location: Location, }, + + #[snafu(display("Invalid flume sender, location: {}", location,))] + 
InvalidFlumeSender { location: Location }, + + #[snafu(display("Invalid scheduler state location: {}", location,))] + InvalidSchedulerState { location: Location }, + + #[snafu(display("Failed to stop scheduler, source: {}", source))] + StopScheduler { + source: JoinError, + location: Location, + }, } pub type Result = std::result::Result; @@ -435,6 +448,9 @@ impl ErrorExt for Error { PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, SortValues { .. } => StatusCode::Unexpected, CompactValues { source, .. } => source.status_code(), + InvalidFlumeSender { .. } => StatusCode::InvalidArguments, + InvalidSchedulerState { .. } => StatusCode::InvalidArguments, + StopScheduler { .. } => StatusCode::Internal, } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 4f5fcc747e..ab058c05b2 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -37,6 +37,8 @@ pub mod request; #[allow(dead_code)] mod row_converter; #[allow(dead_code)] +mod schedule; +#[allow(dead_code)] pub mod sst; pub mod wal; #[allow(dead_code)] diff --git a/src/mito2/src/schedule.rs b/src/mito2/src/schedule.rs new file mode 100644 index 0000000000..c5762d87ba --- /dev/null +++ b/src/mito2/src/schedule.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+pub mod scheduler;
diff --git a/src/mito2/src/schedule/scheduler.rs b/src/mito2/src/schedule/scheduler.rs
new file mode 100644
index 0000000000..e59635b462
--- /dev/null
+++ b/src/mito2/src/schedule/scheduler.rs
@@ -0,0 +1,275 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::sync::{Arc, RwLock};
+
+use common_telemetry::logging;
+use snafu::{ensure, OptionExt, ResultExt};
+use tokio::sync::Mutex;
+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+
+use crate::error::{
+    InvalidFlumeSenderSnafu, InvalidSchedulerStateSnafu, Result, StopSchedulerSnafu,
+};
+
+/// A unit of work for a [Scheduler]: a boxed, pinned, `Send` future without output.
+pub type Job = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+/// The scheduler accepts jobs and its workers poll the queue.
+const STATE_RUNNING: u8 = 0;
+/// The scheduler rejects new jobs and its workers exit.
+const STATE_STOP: u8 = 1;
+/// The scheduler rejects new jobs while its workers drain the queue before exiting.
+const STATE_AWAIT_TERMINATION: u8 = 2;
+
+/// [Scheduler] defines a set of API to schedule Jobs
+#[async_trait::async_trait]
+pub trait Scheduler {
+    /// Schedules a Job
+    fn schedule(&self, req: Job) -> Result<()>;
+
+    /// Stops scheduler. If `await_termination` is set to true, the scheduler will wait until all tasks are processed.
+    async fn stop(&self, await_termination: bool) -> Result<()>;
+}
+
+/// Request scheduler based on local state.
+pub struct LocalScheduler {
+    /// Sends jobs to the unbounded flume channel; taken (set to `None`) by `stop`
+    sender: RwLock<Option<flume::Sender<Job>>>,
+    /// Task handles
+    handles: Mutex<Vec<JoinHandle<()>>>,
+    /// Token used to halt the scheduler
+    cancel_token: CancellationToken,
+    /// State of scheduler
+    state: Arc<AtomicU8>,
+}
+
+impl LocalScheduler {
+    /// Creates a scheduler whose jobs are queued in an unbounded flume channel.
+    ///
+    /// `concurrency` is the number of background workers spawned to receive and run jobs.
+    pub fn new(concurrency: usize) -> Self {
+        let (tx, rx) = flume::unbounded();
+        let token = CancellationToken::new();
+        let state = Arc::new(AtomicU8::new(STATE_RUNNING));
+
+        let mut handles = Vec::with_capacity(concurrency);
+
+        for _ in 0..concurrency {
+            let child = token.child_token();
+            let receiver = rx.clone();
+            let state_clone = state.clone();
+            let handle = common_runtime::spawn_bg(async move {
+                while state_clone.load(Ordering::Relaxed) == STATE_RUNNING {
+                    tokio::select! {
+                        _ = child.cancelled() => {
+                            break;
+                        }
+                        req_opt = receiver.recv_async() => {
+                            if let Ok(req) = req_opt {
+                                req.await;
+                            }
+                        }
+                    }
+                }
+                // When the scheduler is stopped with `await_termination`, run every job that is
+                // still queued before the worker exits.
+                if state_clone.load(Ordering::Relaxed) == STATE_AWAIT_TERMINATION {
+                    // `recv_async` fails once the queue is empty and all senders are dropped.
+                    while let Ok(req) = receiver.recv_async().await {
+                        req.await;
+                    }
+                    state_clone.store(STATE_STOP, Ordering::Relaxed);
+                }
+            });
+            handles.push(handle);
+        }
+
+        Self {
+            sender: RwLock::new(Some(tx)),
+            cancel_token: token,
+            handles: Mutex::new(handles),
+            state,
+        }
+    }
+
+    /// Returns true while the scheduler is in `STATE_RUNNING`.
+    #[inline]
+    fn running(&self) -> bool {
+        self.state.load(Ordering::Relaxed) == STATE_RUNNING
+    }
+}
+
+#[async_trait::async_trait]
+impl Scheduler for LocalScheduler {
+    /// Queues `req` for the workers; fails once the scheduler has left `STATE_RUNNING`.
+    fn schedule(&self, req: Job) -> Result<()> {
+        ensure!(self.running(), InvalidSchedulerStateSnafu);
+        self.sender
+            .read()
+            .unwrap()
+            .as_ref()
+            .context(InvalidSchedulerStateSnafu)?
+            .send(req)
+            .map_err(|_| InvalidFlumeSenderSnafu {}.build())
+    }
+
+    /// Stops the workers and joins them; fails if the scheduler is not running.
+    async fn stop(&self, await_termination: bool) -> Result<()> {
+        let state = if await_termination {
+            STATE_AWAIT_TERMINATION
+        } else {
+            STATE_STOP
+        };
+        // Leave `STATE_RUNNING` with a single atomic step: a separate load and store would let
+        // two concurrent `stop` calls both pass the check.
+        self.state
+            .compare_exchange(STATE_RUNNING, state, Ordering::Relaxed, Ordering::Relaxed)
+            .ok()
+            .context(InvalidSchedulerStateSnafu)?;
+        // Dropping the only sender lets the draining workers see the end of the queue.
+        self.sender.write().unwrap().take();
+        self.cancel_token.cancel();
+
+        let joined = futures::future::join_all(self.handles.lock().await.drain(..)).await;
+        // All workers are gone now. Record the final state even if none of them did (no
+        // workers, or a worker panicked), so that `Drop` does not report a stopped scheduler.
+        self.state.store(STATE_STOP, Ordering::Relaxed);
+        joined
+            .into_iter()
+            .collect::<std::result::Result<Vec<_>, _>>()
+            .context(StopSchedulerSnafu)?;
+
+        Ok(())
+    }
+}
+
+impl Drop for LocalScheduler {
+    fn drop(&mut self) {
+        if self.state.load(Ordering::Relaxed) != STATE_STOP {
+            logging::error!("scheduler must be stopped before dropping, which means the state of scheduler must be STATE_STOP");
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::AtomicI32;
+    use std::sync::Arc;
+
+    use tokio::sync::Barrier;
+    use tokio::time::Duration;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_sum_cap() {
+        let task_size = 1000;
+        let sum = Arc::new(AtomicI32::new(0));
+        let local = LocalScheduler::new(task_size);
+
+        for _ in 0..task_size {
+            let sum_clone = sum.clone();
+            local
+                .schedule(Box::pin(async move {
+                    sum_clone.fetch_add(1, Ordering::Relaxed);
+                }))
+                .unwrap();
+        }
+        local.stop(true).await.unwrap();
+        assert_eq!(sum.load(Ordering::Relaxed), 1000);
+    }
+
+    #[tokio::test]
+    async fn test_sum_consumer_num() {
+        let task_size = 1000;
+        let sum = Arc::new(AtomicI32::new(0));
+        let local = LocalScheduler::new(3);
+        let mut target = 0;
+        for _ in 0..task_size {
+            let sum_clone = sum.clone();
+            let ok = local
+                .schedule(Box::pin(async move {
+                    sum_clone.fetch_add(1, Ordering::Relaxed);
+                }))
+                .is_ok();
+            if ok {
+                target += 1;
+            }
+        }
+        local.stop(true).await.unwrap();
+        assert_eq!(sum.load(Ordering::Relaxed), target);
+    }
+
+    #[tokio::test]
+    async fn test_scheduler_many() {
+        let task_size = 1000;
+
+        let barrier = Arc::new(Barrier::new(task_size + 1));
+        let local: LocalScheduler = LocalScheduler::new(task_size);
+
+        for _ in 0..task_size {
+            let barrier_clone = barrier.clone();
+            local
+                .schedule(Box::pin(async move {
+                    barrier_clone.wait().await;
+                }))
+                .unwrap();
+        }
+        barrier.wait().await;
+        local.stop(true).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_scheduler_continuous_stop() {
+        let sum = Arc::new(AtomicI32::new(0));
+        let local = Arc::new(LocalScheduler::new(1000));
+
+        let barrier = Arc::new(Barrier::new(2));
+        let barrier_clone = barrier.clone();
+        let local_stop = local.clone();
+        tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_millis(5)).await;
+            local_stop.stop(false).await.unwrap();
+            barrier_clone.wait().await;
+        });
+
+        let target = Arc::new(AtomicI32::new(0));
+        let local_task = local.clone();
+        let target_clone = target.clone();
+        let sum_clone = sum.clone();
+        tokio::spawn(async move {
+            loop {
+                let sum_c = sum_clone.clone();
+                let ok = local_task
+                    .schedule(Box::pin(async move {
+                        sum_c.fetch_add(1, Ordering::Relaxed);
+                    }))
+                    .is_ok();
+                if ok {
+                    target_clone.fetch_add(1, Ordering::Relaxed);
+                } else {
+                    break;
+                }
+                tokio::task::yield_now().await;
+            }
+        });
+        barrier.wait().await;
+        assert_eq!(sum.load(Ordering::Relaxed), target.load(Ordering::Relaxed));
+    }
+}