mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 20:40:39 +00:00
feat: RepeatedTask adds execute-first-wait-later behavior. (#2625)
* feat: RepeatedTask adds execute-first-wait-later behavior. * feat: add inverval generator for repeate task component * feat: impl debug for dyn IntervalGenerator trait * chore: change some words * chore: instead of complicated way, we add an initial_delay to control task interval * chore: some improve by pr comment
This commit is contained in:
@@ -57,7 +57,10 @@ impl GreptimeDBTelemetryTask {
|
||||
task_fn: BoxedTaskFunction<Error>,
|
||||
should_report: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
GreptimeDBTelemetryTask::Enable((RepeatedTask::new(interval, task_fn), should_report))
|
||||
GreptimeDBTelemetryTask::Enable((
|
||||
RepeatedTask::new(interval, task_fn).with_initial_delay(Some(Duration::ZERO)),
|
||||
should_report,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn disable() -> Self {
|
||||
|
||||
@@ -40,6 +40,7 @@ pub type BoxedTaskFunction<E> = Box<dyn TaskFunction<E> + Send + Sync + 'static>
|
||||
struct TaskInner<E> {
|
||||
/// The repeated task handle. This handle is Some if the task is started.
|
||||
task_handle: Option<JoinHandle<()>>,
|
||||
|
||||
/// The task_fn to run. This is Some if the task is not started.
|
||||
task_fn: Option<BoxedTaskFunction<E>>,
|
||||
}
|
||||
@@ -50,6 +51,7 @@ pub struct RepeatedTask<E> {
|
||||
inner: Mutex<TaskInner<E>>,
|
||||
started: AtomicBool,
|
||||
interval: Duration,
|
||||
initial_delay: Option<Duration>,
|
||||
}
|
||||
|
||||
impl<E> std::fmt::Display for RepeatedTask<E> {
|
||||
@@ -75,6 +77,9 @@ impl<E> Drop for RepeatedTask<E> {
|
||||
}
|
||||
|
||||
impl<E: ErrorExt + 'static> RepeatedTask<E> {
|
||||
/// Creates a new repeated task. The `initial_delay` is the delay before the first execution.
|
||||
/// `initial_delay` default is None, the initial interval uses the `interval`.
|
||||
/// You can use `with_initial_delay` to set the `initial_delay`.
|
||||
pub fn new(interval: Duration, task_fn: BoxedTaskFunction<E>) -> Self {
|
||||
Self {
|
||||
name: task_fn.name().to_string(),
|
||||
@@ -85,9 +90,15 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
|
||||
}),
|
||||
started: AtomicBool::new(false),
|
||||
interval,
|
||||
initial_delay: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_initial_delay(mut self, initial_delay: Option<Duration>) -> Self {
|
||||
self.initial_delay = initial_delay;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn started(&self) -> bool {
|
||||
self.started.load(Ordering::Relaxed)
|
||||
}
|
||||
@@ -99,17 +110,21 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
|
||||
IllegalStateSnafu { name: &self.name }
|
||||
);
|
||||
|
||||
let interval = self.interval;
|
||||
let child = self.cancel_token.child_token();
|
||||
// Safety: The task is not started.
|
||||
let mut task_fn = inner.task_fn.take().unwrap();
|
||||
let interval = self.interval;
|
||||
let mut initial_delay = self.initial_delay;
|
||||
// TODO(hl): Maybe spawn to a blocking runtime.
|
||||
let handle = runtime.spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(interval) => {}
|
||||
_ = child.cancelled() => {
|
||||
return;
|
||||
let sleep_time = initial_delay.take().unwrap_or(interval);
|
||||
if sleep_time > Duration::ZERO {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(sleep_time) => {}
|
||||
_ = child.cancelled() => {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Err(e) = task_fn.call().await {
|
||||
@@ -192,4 +207,21 @@ mod tests {
|
||||
|
||||
assert_eq!(n.load(Ordering::Relaxed), 5);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_repeated_task_prior_exec() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let n = Arc::new(AtomicI32::new(0));
|
||||
let task_fn = TickTask { n: n.clone() };
|
||||
|
||||
let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn))
|
||||
.with_initial_delay(Some(Duration::ZERO));
|
||||
|
||||
task.start(crate::bg_runtime()).unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(550)).await;
|
||||
task.stop().await.unwrap();
|
||||
|
||||
assert_eq!(n.load(Ordering::Relaxed), 6);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user