From 86d56f71ef3e16b58a460f1d77245f8239edcfdd Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 1 Sep 2023 11:11:26 +0800 Subject: [PATCH] fix: flume bug (#2298) fix: flume --- Cargo.lock | 48 +++++++++-------------------- src/mito2/Cargo.toml | 2 +- src/mito2/src/error.rs | 6 ++-- src/mito2/src/schedule/scheduler.rs | 17 +++++----- 4 files changed, 26 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 40ccd68074..c4ba4cbf1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -517,6 +517,17 @@ dependencies = [ "term", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compat" version = "0.2.1" @@ -3173,19 +3184,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "flume" -version = "0.10.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" -dependencies = [ - "futures-core", - "futures-sink", - "nanorand", - "pin-project", - "spin 0.9.8", -] - [[package]] name = "fnv" version = "1.0.7" @@ -4701,7 +4699,7 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" dependencies = [ - "spin 0.5.2", + "spin", ] [[package]] @@ -5431,6 +5429,7 @@ dependencies = [ "api", "aquamarine", "arc-swap", + "async-channel", "async-compat", "async-stream", "async-trait", @@ -5452,7 +5451,6 @@ dependencies = [ "datafusion", "datafusion-common", "datatypes", - "flume", "futures", "lazy_static", "log-store", @@ -5662,15 +5660,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "nanorand" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom", -] - [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -7627,7 +7616,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin 0.5.2", + "spin", "untrusted", "web-sys", "winapi", @@ -8957,15 +8946,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -dependencies = [ - "lock_api", -] - [[package]] name = "spki" version = "0.7.2" diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 0b9f1c8857..38b239f9a9 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -13,6 +13,7 @@ anymap = "1.0.0-beta.2" api.workspace = true aquamarine = "0.3" arc-swap = "1.0" +async-channel = "1.9" async-compat = "0.2" async-stream.workspace = true async-trait = "0.1" @@ -33,7 +34,6 @@ dashmap = "5.4" datafusion-common.workspace = true datafusion.workspace = true datatypes = { workspace = true } -flume = "0.10" futures.workspace = true lazy_static = "1.4" log-store = { workspace = true } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 2b52e20ce2..f84abfc654 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -367,8 +367,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid flume sender, location: {}", location,))] - InvalidFlumeSender { location: Location }, + #[snafu(display("Invalid sender, location: {}", location,))] + InvalidSender { location: Location }, #[snafu(display("Invalid scheduler state, location: {}", location))] InvalidSchedulerState { location: Location }, @@ -452,7 +452,7 @@ impl ErrorExt for Error { ComputeArrow { .. } => StatusCode::Internal, ComputeVector { .. } => StatusCode::Internal, PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, - InvalidFlumeSender { .. } => StatusCode::InvalidArguments, + InvalidSender { .. } => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, StopScheduler { .. } => StatusCode::Internal, BuildPredicate { source, .. } => source.status_code(), diff --git a/src/mito2/src/schedule/scheduler.rs b/src/mito2/src/schedule/scheduler.rs index 27de01c6e0..5b9de002f4 100644 --- a/src/mito2/src/schedule/scheduler.rs +++ b/src/mito2/src/schedule/scheduler.rs @@ -23,9 +23,7 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -use crate::error::{ - InvalidFlumeSenderSnafu, InvalidSchedulerStateSnafu, Result, StopSchedulerSnafu, -}; +use crate::error::{InvalidSchedulerStateSnafu, InvalidSenderSnafu, Result, StopSchedulerSnafu}; pub type Job = Pin + Send>>; @@ -49,7 +47,7 @@ pub type SchedulerRef = Arc; /// Request scheduler based on local state. pub struct LocalScheduler { /// Sends jobs to flume bounded channel - sender: RwLock>>, + sender: RwLock>>, /// Task handles handles: Mutex>>, /// Token used to halt the scheduler @@ -63,7 +61,7 @@ impl LocalScheduler { /// /// concurrency: the number of bounded receiver pub fn new(concurrency: usize) -> Self { - let (tx, rx) = flume::unbounded(); + let (tx, rx) = async_channel::unbounded(); let token = CancellationToken::new(); let state = Arc::new(AtomicU8::new(STATE_RUNNING)); @@ -79,7 +77,7 @@ impl LocalScheduler { _ = child.cancelled() => { break; } - req_opt = receiver.recv_async() =>{ + req_opt = receiver.recv() =>{ if let Ok(job) = req_opt { job.await; } @@ -89,7 +87,7 @@ impl LocalScheduler { // When task scheduler is cancelled, we will wait all task finished if state_clone.load(Ordering::Relaxed) == STATE_AWAIT_TERMINATION { // recv_async waits until all sender's been dropped. - while let Ok(job) = receiver.recv_async().await { + while let Ok(job) = receiver.recv().await { job.await; } state_clone.store(STATE_STOP, Ordering::Relaxed); @@ -119,13 +117,14 @@ impl Scheduler for LocalScheduler { self.state.load(Ordering::Relaxed) == STATE_RUNNING, InvalidSchedulerStateSnafu ); + self.sender .read() .unwrap() .as_ref() .context(InvalidSchedulerStateSnafu)? - .send(job) - .map_err(|_| InvalidFlumeSenderSnafu {}.build()) + .try_send(job) + .map_err(|_| InvalidSenderSnafu {}.build()) } /// if await_termination is true, scheduler will wait all tasks finished before stopping