test(storage): fix schedule_duplicate_tasks test (#1990)

test: fix schedule_duplicate_tasks test
This commit is contained in:
Yingwen
2023-07-19 12:05:30 +09:00
committed by GitHub
parent 62a41d2280
commit 43bde82e28

View File

@@ -315,6 +315,7 @@ mod tests {
use std::sync::atomic::{AtomicBool, AtomicI32};
use std::time::Duration;
use futures_util::future::BoxFuture;
use store_api::storage::RegionId;
use super::*;
@@ -537,10 +538,49 @@ mod tests {
.unwrap();
}
struct MockAsyncHandler<F> {
cb: F,
}
#[async_trait::async_trait]
impl<F> Handler for MockAsyncHandler<F>
where
F: Fn() -> BoxFuture<'static, ()> + Send + Sync,
{
type Request = MockRequest;
async fn handle_request(
&self,
_req: Self::Request,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> Result<()> {
let fut = (self.cb)();
fut.await;
token.try_release();
finish_notifier.notify_one();
Ok(())
}
}
#[tokio::test]
async fn test_schedule_duplicate_tasks() {
common_telemetry::init_default_ut_logging();
let handler = MockHandler { cb: || {} };
let (tx, rx) = tokio::sync::watch::channel(false);
let handler = MockAsyncHandler {
cb: move || {
let mut rx = rx.clone();
Box::pin(async move {
// Block the handler so it can't handle more requests.
loop {
rx.changed().await.unwrap();
if *rx.borrow() {
break;
}
}
}) as _ // Casts the Pin<Box<async block>> to Pin<Box<dyn Future>>
},
};
let config = SchedulerConfig {
max_inflight_tasks: 30,
};
@@ -557,6 +597,7 @@ mod tests {
scheduled_task += 1;
}
}
tx.send(true).unwrap();
scheduler.stop(true).await.unwrap();
debug!("Schedule tasks: {}", scheduled_task);
assert!(scheduled_task < 10);