From a5154cf990a1d7e4fee95ef8c48ff595f0830d31 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 11 Jun 2024 14:56:30 +0100 Subject: [PATCH] storcon: add util for kicking a set of waiters repeatedly --- storage_controller/src/service.rs | 22 +++++++++++++++++++++- storage_controller/src/tenant_shard.rs | 16 ++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index db3e700309..dcc1dac165 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -15,7 +15,8 @@ use crate::{ reconciler::{ReconcileError, ReconcileUnits}, scheduler::{MaySchedule, ScheduleContext, ScheduleMode}, tenant_shard::{ - MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction, + MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization, + ScheduleOptimizationAction, }, }; use anyhow::Context; @@ -1875,6 +1876,25 @@ impl Service { Ok(()) } + /// Same as [`Service::await_waiters`], but returns the waiters which are still + /// in progress + async fn kick_waiters( + &self, + waiters: Vec, + timeout: Duration, + ) -> Vec { + let deadline = Instant::now().checked_add(timeout).unwrap(); + for waiter in waiters.iter() { + let timeout = deadline.duration_since(Instant::now()); + let _ = waiter.wait_timeout(timeout).await; + } + + waiters + .into_iter() + .filter(|waiter| matches!(waiter.get_status(), ReconcilerStatus::InProgress)) + .collect::>() + } + /// Part of [`Self::tenant_location_config`]: dissect an incoming location config request, /// and transform it into either a tenant creation of a series of shard updates. /// diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 77bbf4c604..da01da4e18 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -311,6 +311,12 @@ pub(crate) struct ReconcilerWaiter { seq: Sequence, } +pub(crate) enum ReconcilerStatus { + Done, + Failed, + InProgress, +} + #[derive(thiserror::Error, Debug)] pub(crate) enum ReconcileWaitError { #[error("Timeout waiting for shard {0}")] @@ -373,6 +379,16 @@ impl ReconcilerWaiter { Ok(()) } + + pub(crate) fn get_status(&self) -> ReconcilerStatus { + if self.seq_wait.would_wait_for(self.seq).is_err() { + ReconcilerStatus::Done + } else if self.error_seq_wait.would_wait_for(self.seq).is_err() { + ReconcilerStatus::Failed + } else { + ReconcilerStatus::InProgress + } + } } /// Having spawned a reconciler task, the tenant shard's state will carry enough