refactor: remove associate type in scheduler to simplify it #2153 (#2194)

* feature: add a simple scheduler using flume

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* fix: only use a sender rather clone many senders

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* fix: use select to avoid loop

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* feat: add parameters in new function to build the flume capacity and number of receivers

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* test: add countdownlatch test concurrency

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* test: add barrier replacing countdownlatch to test concurrency and add wait all tasks finished in stop

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: add some document annotation

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: add license header

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: add Cargo.lock

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: Cargo.toml format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: delete println in test

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* feat: add error handle

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* fix: fix error handle and add test scheduler stop

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: spelling mistake

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* fix: wait all tasks finished

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: add todo which need wrap Future returned by send_async

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* test: remove unnessary sleep in test

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* fix: resolve some conflicts

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* fix: resolve conversation

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* feat: modify the function of schedule to synchronize and drop sender after stopping scheduler

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

---------

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>
This commit is contained in:
Bamboo1
2023-08-23 14:28:00 +08:00
committed by GitHub
parent af95e46512
commit 4dbc32f532
6 changed files with 339 additions and 6 deletions

45
Cargo.lock generated
View File

@@ -183,9 +183,9 @@ dependencies = [
[[package]]
name = "anstyle-wincon"
version = "1.0.2"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c"
checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
dependencies = [
"anstyle",
"windows-sys 0.48.0",
@@ -3234,6 +3234,19 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.8",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -4794,7 +4807,7 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
dependencies = [
"spin",
"spin 0.5.2",
]
[[package]]
@@ -5544,6 +5557,7 @@ dependencies = [
"datafusion",
"datafusion-common",
"datatypes",
"flume",
"futures",
"lazy_static",
"log-store",
@@ -5562,6 +5576,7 @@ dependencies = [
"strum 0.21.0",
"table",
"tokio",
"tokio-util",
"uuid",
]
@@ -5752,6 +5767,15 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]]
name = "new_debug_unreachable"
version = "1.0.4"
@@ -7764,7 +7788,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@@ -9165,6 +9189,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.5.4"
@@ -9879,9 +9912,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "target-lexicon"
version = "0.12.11"
version = "0.12.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d0e916b1148c8e263850e1ebcbd046f333e0683c724876bb0da63ea4373dc8a"
checksum = "1d2faeef5759ab89935255b1a4cd98e0baf99d1085e37d36599c625dac49ae8e"
[[package]]
name = "temp-env"

View File

@@ -33,6 +33,7 @@ dashmap = "5.4"
datafusion-common.workspace = true
datafusion.workspace = true
datatypes = { workspace = true }
flume = "0.10"
futures.workspace = true
lazy_static = "1.4"
log-store = { workspace = true }
@@ -50,6 +51,7 @@ storage = { workspace = true }
store-api = { workspace = true }
strum = "0.21"
table = { workspace = true }
tokio-util.workspace = true
tokio.workspace = true
uuid.workspace = true

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_runtime::JoinError;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use prost::{DecodeError, EncodeError};
@@ -376,6 +377,18 @@ pub enum Error {
source: datatypes::error::Error,
location: Location,
},
#[snafu(display("Invalid flume sender, location: {}", location,))]
InvalidFlumeSender { location: Location },
#[snafu(display("Invalid scheduler state location: {}", location,))]
InvalidSchedulerState { location: Location },
#[snafu(display("Failed to stop scheduler, source: {}", source))]
StopScheduler {
source: JoinError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -435,6 +448,9 @@ impl ErrorExt for Error {
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
SortValues { .. } => StatusCode::Unexpected,
CompactValues { source, .. } => source.status_code(),
InvalidFlumeSender { .. } => StatusCode::InvalidArguments,
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
StopScheduler { .. } => StatusCode::Internal,
}
}

View File

@@ -37,6 +37,8 @@ pub mod request;
#[allow(dead_code)]
mod row_converter;
#[allow(dead_code)]
mod schedule;
#[allow(dead_code)]
pub mod sst;
pub mod wal;
#[allow(dead_code)]

15
src/mito2/src/schedule.rs Normal file
View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod scheduler;

View File

@@ -0,0 +1,265 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, RwLock};
use common_telemetry::logging;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::error::{
InvalidFlumeSenderSnafu, InvalidSchedulerStateSnafu, Result, StopSchedulerSnafu,
};
pub type Job = Pin<Box<dyn Future<Output = ()> + Send>>;
///The state of scheduler
const STATE_RUNNING: u8 = 0;
const STATE_STOP: u8 = 1;
const STATE_AWAIT_TERMINATION: u8 = 2;
/// [Scheduler] defines a set of API to schedule Jobs
#[async_trait::async_trait]
pub trait Scheduler {
/// Schedules a Job
fn schedule(&self, req: Job) -> Result<()>;
/// Stops scheduler. If `await_termination` is set to true, the scheduler will wait until all tasks are processed.
async fn stop(&self, await_termination: bool) -> Result<()>;
}
/// Request scheduler based on local state.
pub struct LocalScheduler {
/// Sends jobs to flume bounded channel
sender: RwLock<Option<flume::Sender<Job>>>,
/// Task handles
handles: Mutex<Vec<JoinHandle<()>>>,
/// Token used to halt the scheduler
cancel_token: CancellationToken,
/// State of scheduler
state: Arc<AtomicU8>,
}
impl LocalScheduler {
/// cap: flume bounded cap
/// concurrency: the number of bounded receiver
pub fn new(concurrency: usize) -> Self {
let (tx, rx) = flume::unbounded();
let token = CancellationToken::new();
let state = Arc::new(AtomicU8::new(STATE_RUNNING));
let mut handles = Vec::with_capacity(concurrency);
for _ in 0..concurrency {
let child = token.child_token();
let receiver = rx.clone();
let state_clone = state.clone();
let handle = common_runtime::spawn_bg(async move {
while state_clone.load(Ordering::Relaxed) == STATE_RUNNING {
tokio::select! {
_ = child.cancelled() => {
break;
}
req_opt = receiver.recv_async() =>{
if let Ok(req) = req_opt {
req.await;
}
}
}
}
// When task scheduler is cancelled, we will wait all task finished
if state_clone.load(Ordering::Relaxed) == STATE_AWAIT_TERMINATION {
// recv_async waits until all sender's been dropped.
while let Ok(req) = receiver.recv_async().await {
req.await;
}
state_clone.store(STATE_STOP, Ordering::Relaxed);
}
});
handles.push(handle);
}
Self {
sender: RwLock::new(Some(tx)),
cancel_token: token,
handles: Mutex::new(handles),
state,
}
}
#[inline]
fn running(&self) -> bool {
self.state.load(Ordering::Relaxed) == STATE_RUNNING
}
}
#[async_trait::async_trait]
impl Scheduler for LocalScheduler {
fn schedule(&self, req: Job) -> Result<()> {
ensure!(
self.state.load(Ordering::Relaxed) == STATE_RUNNING,
InvalidSchedulerStateSnafu
);
self.sender
.read()
.unwrap()
.as_ref()
.context(InvalidSchedulerStateSnafu)?
.send(req)
.map_err(|_| InvalidFlumeSenderSnafu {}.build())
}
async fn stop(&self, await_termination: bool) -> Result<()> {
ensure!(
self.state.load(Ordering::Relaxed) == STATE_RUNNING,
InvalidSchedulerStateSnafu
);
let state = if await_termination {
STATE_AWAIT_TERMINATION
} else {
STATE_STOP
};
self.sender.write().unwrap().take();
self.state.store(state, Ordering::Relaxed);
self.cancel_token.cancel();
futures::future::join_all(self.handles.lock().await.drain(..))
.await
.into_iter()
.collect::<std::result::Result<Vec<_>, _>>()
.context(StopSchedulerSnafu)?;
Ok(())
}
}
impl Drop for LocalScheduler {
fn drop(&mut self) {
if self.state.load(Ordering::Relaxed) != STATE_STOP {
logging::error!("scheduler must be stopped before dropping, which means the state of scheduler must be STATE_STOP");
}
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicI32;
use std::sync::Arc;
use tokio::sync::Barrier;
use tokio::time::Duration;
use super::*;
#[tokio::test]
async fn test_sum_cap() {
let task_size = 1000;
let sum = Arc::new(AtomicI32::new(0));
let local = LocalScheduler::new(task_size);
for _ in 0..task_size {
let sum_clone = sum.clone();
local
.schedule(Box::pin(async move {
sum_clone.fetch_add(1, Ordering::Relaxed);
}))
.unwrap();
}
local.stop(true).await.unwrap();
assert_eq!(sum.load(Ordering::Relaxed), 1000);
}
#[tokio::test]
async fn test_sum_consumer_num() {
let task_size = 1000;
let sum = Arc::new(AtomicI32::new(0));
let local = LocalScheduler::new(3);
let mut target = 0;
for _ in 0..task_size {
let sum_clone = sum.clone();
let ok = local
.schedule(Box::pin(async move {
sum_clone.fetch_add(1, Ordering::Relaxed);
}))
.is_ok();
if ok {
target += 1;
}
}
local.stop(true).await.unwrap();
assert_eq!(sum.load(Ordering::Relaxed), target);
}
#[tokio::test]
async fn test_scheduler_many() {
let task_size = 1000;
let barrier = Arc::new(Barrier::new(task_size + 1));
let local: LocalScheduler = LocalScheduler::new(task_size);
for _ in 0..task_size {
let barrier_clone = barrier.clone();
local
.schedule(Box::pin(async move {
barrier_clone.wait().await;
}))
.unwrap();
}
barrier.wait().await;
local.stop(true).await.unwrap();
}
#[tokio::test]
async fn test_scheduler_continuous_stop() {
let sum = Arc::new(AtomicI32::new(0));
let local = Arc::new(LocalScheduler::new(1000));
let barrier = Arc::new(Barrier::new(2));
let barrier_clone = barrier.clone();
let local_stop = local.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(5)).await;
local_stop.stop(false).await.unwrap();
barrier_clone.wait().await;
});
let target = Arc::new(AtomicI32::new(0));
let local_task = local.clone();
let target_clone = target.clone();
let sum_clone = sum.clone();
tokio::spawn(async move {
loop {
let sum_c = sum_clone.clone();
let ok = local_task
.schedule(Box::pin(async move {
sum_c.fetch_add(1, Ordering::Relaxed);
}))
.is_ok();
if ok {
target_clone.fetch_add(1, Ordering::Relaxed);
} else {
break;
}
tokio::task::yield_now().await;
}
});
barrier.wait().await;
assert_eq!(sum.load(Ordering::Relaxed), target.load(Ordering::Relaxed));
}
}