fix: flume bug (#2298)

fix: flume
This commit is contained in:
Lei, HUANG
2023-09-01 11:11:26 +08:00
committed by Ruihang Xia
parent b42d343ae6
commit 86d56f71ef
4 changed files with 26 additions and 47 deletions

48
Cargo.lock generated
View File

@@ -517,6 +517,17 @@ dependencies = [
"term",
]
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compat"
version = "0.2.1"
@@ -3173,19 +3184,6 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.8",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -4701,7 +4699,7 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
dependencies = [
"spin 0.5.2",
"spin",
]
[[package]]
@@ -5431,6 +5429,7 @@ dependencies = [
"api",
"aquamarine",
"arc-swap",
"async-channel",
"async-compat",
"async-stream",
"async-trait",
@@ -5452,7 +5451,6 @@ dependencies = [
"datafusion",
"datafusion-common",
"datatypes",
"flume",
"futures",
"lazy_static",
"log-store",
@@ -5662,15 +5660,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]]
name = "new_debug_unreachable"
version = "1.0.4"
@@ -7627,7 +7616,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin 0.5.2",
"spin",
"untrusted",
"web-sys",
"winapi",
@@ -8957,15 +8946,6 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.7.2"

View File

@@ -13,6 +13,7 @@ anymap = "1.0.0-beta.2"
api.workspace = true
aquamarine = "0.3"
arc-swap = "1.0"
async-channel = "1.9"
async-compat = "0.2"
async-stream.workspace = true
async-trait = "0.1"
@@ -33,7 +34,6 @@ dashmap = "5.4"
datafusion-common.workspace = true
datafusion.workspace = true
datatypes = { workspace = true }
flume = "0.10"
futures.workspace = true
lazy_static = "1.4"
log-store = { workspace = true }

View File

@@ -367,8 +367,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid flume sender, location: {}", location,))]
InvalidFlumeSender { location: Location },
#[snafu(display("Invalid sender, location: {}", location,))]
InvalidSender { location: Location },
#[snafu(display("Invalid scheduler state, location: {}", location))]
InvalidSchedulerState { location: Location },
@@ -452,7 +452,7 @@ impl ErrorExt for Error {
ComputeArrow { .. } => StatusCode::Internal,
ComputeVector { .. } => StatusCode::Internal,
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
InvalidFlumeSender { .. } => StatusCode::InvalidArguments,
InvalidSender { .. } => StatusCode::InvalidArguments,
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
StopScheduler { .. } => StatusCode::Internal,
BuildPredicate { source, .. } => source.status_code(),

View File

@@ -23,9 +23,7 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::error::{
InvalidFlumeSenderSnafu, InvalidSchedulerStateSnafu, Result, StopSchedulerSnafu,
};
use crate::error::{InvalidSchedulerStateSnafu, InvalidSenderSnafu, Result, StopSchedulerSnafu};
pub type Job = Pin<Box<dyn Future<Output = ()> + Send>>;
@@ -49,7 +47,7 @@ pub type SchedulerRef = Arc<dyn Scheduler>;
/// Request scheduler based on local state.
pub struct LocalScheduler {
/// Sends jobs to flume bounded channel
sender: RwLock<Option<flume::Sender<Job>>>,
sender: RwLock<Option<async_channel::Sender<Job>>>,
/// Task handles
handles: Mutex<Vec<JoinHandle<()>>>,
/// Token used to halt the scheduler
@@ -63,7 +61,7 @@ impl LocalScheduler {
///
/// concurrency: the number of bounded receiver
pub fn new(concurrency: usize) -> Self {
let (tx, rx) = flume::unbounded();
let (tx, rx) = async_channel::unbounded();
let token = CancellationToken::new();
let state = Arc::new(AtomicU8::new(STATE_RUNNING));
@@ -79,7 +77,7 @@ impl LocalScheduler {
_ = child.cancelled() => {
break;
}
req_opt = receiver.recv_async() =>{
req_opt = receiver.recv() =>{
if let Ok(job) = req_opt {
job.await;
}
@@ -89,7 +87,7 @@ impl LocalScheduler {
// When task scheduler is cancelled, we will wait all task finished
if state_clone.load(Ordering::Relaxed) == STATE_AWAIT_TERMINATION {
// recv_async waits until all sender's been dropped.
while let Ok(job) = receiver.recv_async().await {
while let Ok(job) = receiver.recv().await {
job.await;
}
state_clone.store(STATE_STOP, Ordering::Relaxed);
@@ -119,13 +117,14 @@ impl Scheduler for LocalScheduler {
self.state.load(Ordering::Relaxed) == STATE_RUNNING,
InvalidSchedulerStateSnafu
);
self.sender
.read()
.unwrap()
.as_ref()
.context(InvalidSchedulerStateSnafu)?
.send(job)
.map_err(|_| InvalidFlumeSenderSnafu {}.build())
.try_send(job)
.map_err(|_| InvalidSenderSnafu {}.build())
}
/// if await_termination is true, scheduler will wait all tasks finished before stopping