diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs
index d25fe56b92..758dd54ec1 100644
--- a/pageserver/src/tenant/secondary.rs
+++ b/pageserver/src/tenant/secondary.rs
@@ -1,11 +1,12 @@
 pub mod heatmap;
 mod heatmap_uploader;
+mod scheduler;
 
 use std::sync::Arc;
 
 use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
 
-use self::heatmap_uploader::heatmap_uploader_task;
+use self::{heatmap_uploader::heatmap_uploader_task, scheduler::TenantScoped};
 
 use super::mgr::TenantManager;
 
@@ -19,6 +20,14 @@ enum UploadCommand {
     Upload(TenantShardId),
 }
 
+impl TenantScoped for UploadCommand {
+    fn get_tenant_shard_id(&self) -> &TenantShardId {
+        match self {
+            Self::Upload(id) => &id,
+        }
+    }
+}
+
 struct CommandRequest<T> {
     payload: T,
     response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
 }
@@ -89,7 +98,9 @@ pub fn spawn_tasks(
                 background_jobs_can_start,
                 cancel,
             )
-            .await
+            .await;
+
+            Ok(())
         },
     );
diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs
index ece2b93ce1..fc32d315dd 100644
--- a/pageserver/src/tenant/secondary/heatmap_uploader.rs
+++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs
@@ -1,14 +1,14 @@
 use std::{
     collections::HashMap,
     sync::{Arc, Weak},
-    time::{Duration, Instant},
+    time::Instant,
 };
 
 use crate::{
     metrics::SECONDARY_MODE,
     tenant::{
         config::AttachmentMode, mgr::TenantManager, remote_timeline_client::remote_heatmap_path,
-        secondary::CommandResponse, span::debug_assert_current_span_has_tenant_id, Tenant,
+        span::debug_assert_current_span_has_tenant_id, Tenant,
     },
 };
 
@@ -16,28 +16,41 @@ use md5;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::GenericRemoteStorage;
 
+use super::{
+    scheduler::{
+        yielding_loop, HasBarrier, JobGenerator, SchedulingResult, TenantBackgroundJobs,
+        TenantScoped,
+    },
+    CommandRequest,
+};
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
 use tracing::instrument;
 use utils::{backoff, completion::Barrier};
 
-use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
-
-/// Period between heatmap uploader walking Tenants to look for work to do.
-/// If any tenants have a heatmap upload period lower than this, it will be adjusted
-/// downward to match.
-const DEFAULT_SCHEDULING_INTERVAL: Duration = Duration::from_millis(60000);
-const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_millis(1000);
+use super::{heatmap::HeatMapTenant, UploadCommand};
 
 struct WriteInProgress {
     barrier: Barrier,
 }
 
+impl HasBarrier for WriteInProgress {
+    fn get_barrier(&self) -> Barrier {
+        self.barrier.clone()
+    }
+}
+
 struct UploadPending {
     tenant: Arc<Tenant>,
     last_digest: Option<md5::Digest>,
 }
 
+impl TenantScoped for UploadPending {
+    fn get_tenant_shard_id(&self) -> &TenantShardId {
+        self.tenant.get_tenant_shard_id()
+    }
+}
+
 struct WriteComplete {
     tenant_shard_id: TenantShardId,
     completed_at: Instant,
@@ -45,6 +58,18 @@ struct WriteComplete {
     digest: Option<md5::Digest>,
     next_upload: Option<Instant>,
 }
+
+impl TenantScoped for WriteComplete {
+    fn get_tenant_shard_id(&self) -> &TenantShardId {
+        &self.tenant_shard_id
+    }
+}
+
+impl TenantScoped for Tenant {
+    fn get_tenant_shard_id(&self) -> &TenantShardId {
+        self.get_tenant_shard_id()
+    }
+}
+
 /// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember
 /// when we last did a write. We only populate this after doing at least one
 /// write for a tenant -- this avoids holding state for tenants that have
@@ -77,258 +102,124 @@ struct HeatmapUploader {
     cancel: CancellationToken,
 
     tenants: HashMap<TenantShardId, UploaderTenantState>,
-
-    /// Tenants with work to do, for which tasks should be spawned as soon as concurrency
-    /// limits permit it.
-    tenants_pending: std::collections::VecDeque<UploadPending>,
-
-    /// Tenants for which a task in `tasks` has been spawned.
-    tenants_uploading: HashMap<TenantShardId, WriteInProgress>,
-
-    tasks: JoinSet<()>,
-
-    /// Channel for our child tasks to send results to: we use a channel for results rather than
-    /// just getting task results via JoinSet because we need the channel's recv() "sleep until something
-    /// is available" semantic, rather than JoinSet::join_next()'s "sleep until next thing is available _or_ I'm empty"
-    /// behavior.
-    task_result_tx: tokio::sync::mpsc::UnboundedSender<WriteComplete>,
-    task_result_rx: tokio::sync::mpsc::UnboundedReceiver<WriteComplete>,
-
-    concurrent_uploads: usize,
-
-    scheduling_interval: Duration,
 }
 
-/// The uploader task runs a loop that periodically wakes up and schedules tasks for
-/// tenants that require an upload, or handles any commands that have been sent into
-/// `command_queue`. No I/O is done in this loop: that all happens in the tasks we
-/// spawn.
-///
-/// Scheduling iterations are somewhat infrequent. However, each one will enqueue
-/// all tenants that require an upload, and in between scheduling iterations we will
-/// continue to spawn new tasks for pending tenants, as our concurrency limit permits.
-///
-/// While we take a CancellationToken here, it is subordinate to the CancellationTokens
-/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise
-/// we might block waiting on a Tenant.
 pub(super) async fn heatmap_uploader_task(
     tenant_manager: Arc<TenantManager>,
     remote_storage: GenericRemoteStorage,
-    mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
+    command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
     background_jobs_can_start: Barrier,
     cancel: CancellationToken,
-) -> anyhow::Result<()> {
-    let concurrent_uploads = tenant_manager.get_conf().heatmap_upload_concurrency;
+) {
+    let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;
 
-    let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
-
-    let mut uploader = HeatmapUploader {
+    let generator = HeatmapUploader {
         tenant_manager,
         remote_storage,
        cancel: cancel.clone(),
-        tasks: JoinSet::new(),
         tenants: HashMap::new(),
-        tenants_pending: std::collections::VecDeque::new(),
-        tenants_uploading: HashMap::new(),
-        task_result_tx: result_tx,
-        task_result_rx: result_rx,
-        concurrent_uploads,
-        scheduling_interval: DEFAULT_SCHEDULING_INTERVAL,
     };
+    let mut scheduler = Scheduler::new(generator, concurrency);
 
-    tracing::info!("Waiting for background_jobs_can start...");
-    background_jobs_can_start.wait().await;
-    tracing::info!("background_jobs_can is ready, proceeding.");
-
-    while !cancel.is_cancelled() {
-        // Look for new work: this is relatively expensive because we have to go acquire the lock on
-        // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
-        // require an upload.
-        uploader.schedule_iteration().await?;
-
-        // Between scheduling iterations, we will:
-        //  - Drain any complete tasks and spawn pending tasks
-        //  - Handle incoming administrative commands
-        //  - Check our cancellation token
-        let next_scheduling_iteration = Instant::now()
-            .checked_add(uploader.scheduling_interval)
-            .unwrap_or_else(|| {
-                tracing::warn!(
-                    "Scheduling interval invalid ({}s), running immediately!",
-                    uploader.scheduling_interval.as_secs_f64()
-                );
-                Instant::now()
-            });
-        loop {
-            tokio::select! {
-                _ = cancel.cancelled() => {
-                    // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
-                    tracing::info!("Heatmap uploader joining tasks");
-                    while let Some(_r) = uploader.tasks.join_next().await {};
-                    tracing::info!("Heatmap uploader terminating");
-
-                    break;
-                },
-                _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
-                    tracing::debug!("heatmap_uploader_task: woke for scheduling interval");
-                    break;},
-                cmd = command_queue.recv() => {
-                    tracing::debug!("heatmap_uploader_task: woke for command queue");
-                    let cmd = match cmd {
-                        Some(c) =>c,
-                        None => {
-                            // SecondaryController was destroyed, and this has raced with
-                            // our CancellationToken
-                            tracing::info!("Heatmap uploader terminating");
-                            cancel.cancel();
-                            break;
-                        }
-                    };
-
-                    let CommandRequest{
-                        response_tx,
-                        payload
-                    } = cmd;
-                    uploader.handle_command(payload, response_tx);
-                },
-                _ = uploader.process_next_completion() => {
-                    if !cancel.is_cancelled() {
-                        uploader.spawn_pending();
-                    }
-                }
-            }
-        }
-    }
-
-    Ok(())
+    scheduler
+        .run(command_queue, background_jobs_can_start, cancel)
+        .instrument(info_span!("heatmap_uploader"))
+        .await
 }
 
-impl HeatmapUploader {
-    /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
-    async fn schedule_iteration(&mut self) -> anyhow::Result<()> {
+type Scheduler = TenantBackgroundJobs<
+    HeatmapUploader,
+    UploadPending,
+    WriteInProgress,
+    WriteComplete,
+    UploadCommand,
+>;
+
+#[async_trait::async_trait]
+impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
+    for HeatmapUploader
+{
+    async fn schedule(&mut self) -> SchedulingResult<UploadPending> {
         // Cull any entries in self.tenants whose Arc is gone
         self.tenants
             .retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some());
 
-        // The priority order of previously scheduled work may be invalidated by current state: drop
-        // all pending work (it will be re-scheduled if still needed)
-        self.tenants_pending.clear();
-
-        // Used a fixed 'now' through the following loop, for efficiency and fairness.
         let now = Instant::now();
 
-        // While iterating over the potentially-long list of tenants, we will periodically yield
-        // to avoid blocking executor.
-        const YIELD_ITERATIONS: usize = 1000;
+        let mut result = SchedulingResult {
+            jobs: Vec::new(),
+            want_interval: None,
+        };
 
-        // Iterate over tenants looking for work to do.
         let tenants = self.tenant_manager.get_attached_active_tenant_shards();
-        for (i, tenant) in tenants.into_iter().enumerate() {
-            // Process is shutting down, drop out
-            if self.cancel.is_cancelled() {
-                return Ok(());
-            }
-
-            // Skip tenants that already have a write in flight
-            if self
-                .tenants_uploading
-                .contains_key(tenant.get_tenant_shard_id())
-            {
-                continue;
-            }
-
-            self.maybe_schedule_upload(&now, tenant);
-
-            if i + 1 % YIELD_ITERATIONS == 0 {
-                tokio::task::yield_now().await;
-            }
-        }
-
-        // Spawn tasks for as many of our pending tenants as we can.
-        self.spawn_pending();
-
-        Ok(())
-    }
-
-    ///
-    /// Cancellation: this method is cancel-safe.
-    async fn process_next_completion(&mut self) {
-        match self.task_result_rx.recv().await {
-            Some(r) => {
-                self.on_completion(r);
-            }
-            None => {
-                unreachable!("Result sender is stored on Self");
-            }
-        }
-    }
-
-    /// The 'maybe' refers to the tenant's state: whether it is configured
-    /// for heatmap uploads at all, and whether sufficient time has passed
-    /// since the last upload.
-    fn maybe_schedule_upload(&mut self, now: &Instant, tenant: Arc<Tenant>) {
-        match tenant.get_heatmap_period() {
-            None => {
-                // Heatmaps are disabled for this tenant
-                return;
-            }
-            Some(period) => {
-                // If any tenant has asked for uploads more frequent than our scheduling interval,
-                // reduce it to match so that we can keep up. This is mainly useful in testing, where
-                // we may set rather short intervals.
-                if period < self.scheduling_interval {
-                    self.scheduling_interval = std::cmp::max(period, MIN_SCHEDULING_INTERVAL);
+        yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
+            match tenant.get_heatmap_period() {
+                None => {
+                    // Heatmaps are disabled for this tenant
+                    return;
+                }
+                Some(period) => {
+                    // If any tenant has asked for uploads more frequent than our scheduling interval,
+                    // reduce it to match so that we can keep up. This is mainly useful in testing, where
+                    // we may set rather short intervals.
+                    result.want_interval = match result.want_interval {
+                        None => Some(period),
+                        Some(existing) => Some(std::cmp::min(period, existing)),
+                    }
                 }
             }
-        }
 
-        // Stale attachments do not upload anything: if we are in this state, there is probably some
-        // other attachment in mode Single or Multi running on another pageserver, and we don't
-        // want to thrash and overwrite their heatmap uploads.
-        if tenant.get_attach_mode() == AttachmentMode::Stale {
-            return;
-        }
+            // Stale attachments do not upload anything: if we are in this state, there is probably some
+            // other attachment in mode Single or Multi running on another pageserver, and we don't
+            // want to thrash and overwrite their heatmap uploads.
+            if tenant.get_attach_mode() == AttachmentMode::Stale {
+                return;
+            }
 
-        // Create an entry in self.tenants if one doesn't already exist: this will later be updated
-        // with the completion time in on_completion.
-        let state = self
-            .tenants
-            .entry(*tenant.get_tenant_shard_id())
-            .or_insert_with(|| UploaderTenantState {
-                tenant: Arc::downgrade(&tenant),
-                last_upload: None,
-                next_upload: Some(Instant::now()),
-                last_digest: None,
+            // Create an entry in self.tenants if one doesn't already exist: this will later be updated
+            // with the completion time in on_completion.
+            let state = self
+                .tenants
+                .entry(*tenant.get_tenant_shard_id())
+                .or_insert_with(|| UploaderTenantState {
+                    tenant: Arc::downgrade(&tenant),
+                    last_upload: None,
+                    next_upload: Some(Instant::now()),
+                    last_digest: None,
+                });
+
+            // Decline to do the upload if insufficient time has passed
+            if state.next_upload.map(|nu| nu > now).unwrap_or(false) {
+                return;
+            }
+
+            let last_digest = state.last_digest;
+            result.jobs.push(UploadPending {
+                tenant,
+                last_digest,
+            });
+        })
+        .await
+        .ok();
 
-        // Decline to do the upload if insufficient time has passed
-        if state.next_upload.map(|nu| &nu > now).unwrap_or(false) {
-            return;
-        }
+        result
+    }
 
-        let last_digest = state.last_digest;
-        self.tenants_pending.push_back(UploadPending {
+    fn spawn(
+        &mut self,
+        join_set: &mut JoinSet<()>,
+        result_tx: tokio::sync::mpsc::UnboundedSender<WriteComplete>,
+        job: UploadPending,
+    ) -> WriteInProgress {
+        let UploadPending {
             tenant,
             last_digest,
-        })
-    }
+        } = job;
 
-    fn spawn_pending(&mut self) {
-        while !self.tenants_pending.is_empty()
-            && self.tenants_uploading.len() < self.concurrent_uploads
-        {
-            // unwrap: loop condition includes !is_empty()
-            let pending = self.tenants_pending.pop_front().unwrap();
-            self.spawn_upload(pending.tenant, pending.last_digest);
-        }
-    }
-
-    fn spawn_upload(&mut self, tenant: Arc<Tenant>, last_digest: Option<md5::Digest>) {
         let remote_storage = self.remote_storage.clone();
-        let tenant_shard_id = *tenant.get_tenant_shard_id();
         let (completion, barrier) = utils::completion::channel();
-        let result_tx = self.task_result_tx.clone();
-        self.tasks.spawn(async move {
+        join_set.spawn(async move {
             // Guard for the barrier in [`WriteInProgress`]
             let _completion = completion;
@@ -375,9 +266,25 @@ impl HeatmapUploader {
             })
             .ok();
         });
+
+        WriteInProgress { barrier }
+    }
 
-        self.tenants_uploading
-            .insert(tenant_shard_id, WriteInProgress { barrier });
+    fn on_command(&mut self, command: UploadCommand) -> anyhow::Result<UploadPending> {
+        let tenant_shard_id = command.get_tenant_shard_id();
+
+        tracing::info!(
+            tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
+            "Starting heatmap write on command");
+        let tenant = self
+            .tenant_manager
+            .get_attached_tenant_shard(*tenant_shard_id, true)
+            .map_err(|e| anyhow::anyhow!(e))?;
+
+        Ok(UploadPending {
+            // Ignore our state for last digest: this forces an upload even if nothing has changed
+            last_digest: None,
+            tenant,
+        })
     }
 
     #[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))]
@@ -389,7 +296,6 @@ impl HeatmapUploader {
             digest,
             next_upload,
         } = completion;
-        self.tenants_uploading.remove(&tenant_shard_id);
         use std::collections::hash_map::Entry;
         match self.tenants.entry(tenant_shard_id) {
             Entry::Vacant(_) => {
@@ -402,69 +308,6 @@ impl HeatmapUploader {
             }
         }
     }
-
-    fn handle_command(
-        &mut self,
-        command: UploadCommand,
-        response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
-    ) {
-        match command {
-            UploadCommand::Upload(tenant_shard_id) => {
-                // If an upload was ongoing for this tenant, let it finish first.
-                let barrier = if let Some(writing_state) =
-                    self.tenants_uploading.get(&tenant_shard_id)
-                {
-                    tracing::info!(
-                        tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
-                        "Waiting for heatmap write to complete");
-                    writing_state.barrier.clone()
-                } else {
-                    // Spawn the upload then immediately wait for it. This will block processing of other commands and
-                    // starting of other background work.
-                    tracing::info!(
-                        tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
-                        "Starting heatmap write on command");
-                    let tenant = match self
-                        .tenant_manager
-                        .get_attached_tenant_shard(tenant_shard_id, true)
-                    {
-                        Ok(t) => t,
-                        Err(e) => {
-                            // Drop result of send: we don't care if caller dropped their receiver
-                            drop(response_tx.send(CommandResponse {
-                                result: Err(e.into()),
-                            }));
-                            return;
-                        }
-                    };
-                    self.spawn_upload(tenant, None);
-                    let writing_state = self
-                        .tenants_uploading
-                        .get(&tenant_shard_id)
-                        .expect("We just inserted this");
-                    tracing::info!(
-                        tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
-                        "Waiting for heatmap upload to complete");
-
-                    writing_state.barrier.clone()
-                };
-
-                // This task does no I/O: it only listens for a barrier's completion and then
-                // sends to the command response channel. It is therefore safe to spawn this without
-                // any gates/task_mgr hooks.
-                tokio::task::spawn(async move {
-                    barrier.wait().await;
-
-                    tracing::info!(
-                        tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
-                        "Heatmap upload complete");
-
-                    // Drop result of send: we don't care if caller dropped their receiver
-                    drop(response_tx.send(CommandResponse { result: Ok(()) }))
-                });
-            }
-        }
-    }
 }
 
 enum UploadHeatmapOutcome {
diff --git a/pageserver/src/tenant/secondary/scheduler.rs b/pageserver/src/tenant/secondary/scheduler.rs
new file mode 100644
index 0000000000..16c4e66c3b
--- /dev/null
+++ b/pageserver/src/tenant/secondary/scheduler.rs
@@ -0,0 +1,378 @@
+use async_trait;
+use std::{
+    collections::HashMap,
+    marker::PhantomData,
+    time::{Duration, Instant},
+};
+
+use pageserver_api::shard::TenantShardId;
+use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
+use utils::completion::Barrier;
+
+use super::{CommandRequest, CommandResponse};
+
+/// Scheduling interval is the time between calls to JobGenerator::schedule.
+/// When we schedule jobs, the job generator may provide a hint of its preferred
+/// interval, which we will respect within these bounds.
+const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
+const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);
+
+#[derive(thiserror::Error, Debug)]
+pub(super) enum YieldingLoopError {
+    #[error("Cancelled")]
+    Cancelled,
+}
+
+/// Helper for long synchronous loops, e.g. over all tenants in the system. Periodically
+/// yields to avoid blocking the executor, and after resuming checks the provided
+/// cancellation token to drop out promptly on shutdown.
+pub(super) async fn yielding_loop<I, T, F>(
+    interval: usize,
+    cancel: &CancellationToken,
+    iter: I,
+    mut visitor: F,
+) -> Result<(), YieldingLoopError>
+where
+    I: Iterator<Item = T>,
+    F: FnMut(T),
+{
+    for (i, item) in iter.enumerate() {
+        visitor(item);
+
+        if (i + 1) % interval == 0 {
+            tokio::task::yield_now().await;
+            if cancel.is_cancelled() {
+                return Err(YieldingLoopError::Cancelled);
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Scheduling helper for background work across many tenants.
+///
+/// PE: a 'PEnding' type for job descriptors that are ready to run
+/// PR: a 'Running' type for jobs that have been spawned
+/// C : a 'Completion' type that spawned jobs will send when they finish
+pub(super) struct TenantBackgroundJobs<G, PE, PR, C, CMD>
+where
+    C: TenantScoped,
+    PE: TenantScoped,
+    PR: HasBarrier,
+    G: JobGenerator<PE, PR, C, CMD>,
+{
+    generator: G,
+
+    /// Ready to run. Will progress to `running` once concurrent limit is satisfied, or
+    /// be removed on next scheduling pass.
+    pending: std::collections::VecDeque<PE>,
+
+    /// Tasks currently running in Self::tasks for these tenants. Check this map
+    /// before pushing more work into pending for the same tenant.
+    running: HashMap<TenantShardId, PR>,
+
+    tasks: JoinSet<()>,
+
+    /// Channel for our child tasks to send results to: we use a channel for results rather than
+    /// just getting task results via JoinSet because we need the channel's recv() "sleep until something
+    /// is available" semantic, rather than JoinSet::join_next()'s "sleep until next thing is available _or_ I'm empty"
+    /// behavior.
+    task_result_tx: tokio::sync::mpsc::UnboundedSender<C>,
+    task_result_rx: tokio::sync::mpsc::UnboundedReceiver<C>,
+
+    concurrency: usize,
+
+    /// How often we would like schedule_iteration to be called.
+    pub(super) scheduling_interval: Duration,
+
+    _phantom: PhantomData<(PE, PR, C, CMD)>,
+}
+
+/// For types that logically belong to a particular tenant shard, and can
+/// provide its ID on demand.
+pub(super) trait TenantScoped {
+    fn get_tenant_shard_id(&self) -> &TenantShardId;
+}
+
+/// For types that contain a Barrier that may be waited on
+pub(super) trait HasBarrier {
+    fn get_barrier(&self) -> Barrier;
+}
+
+pub(super) struct SchedulingResult<PE> {
+    pub(super) jobs: Vec<PE>,
+    /// The job generator would like to be called again this soon
+    pub(super) want_interval: Option<Duration>,
+}
+
+#[async_trait::async_trait]
+pub(crate) trait JobGenerator<PE, PR, C, CMD>
+where
+    C: TenantScoped,
+    PE: TenantScoped,
+    PR: HasBarrier,
+{
+    /// Called at each scheduling interval. Return a list of jobs to run, most urgent first.
+    ///
+    /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
+    /// Implementations should take care to yield the executor periodically if running
+    /// very long loops.
+    ///
+    /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
+    /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
+    /// and re-generated.
+    async fn schedule(&mut self) -> SchedulingResult<PE>;
+
+    /// Called when a pending job is ready to be run.
+    ///
+    /// The spawn operation _must_ spawn a task. The task spawned _must_ send
+    /// its result to the provided result channel (including in error cases).
+    /// TODO: refactor so that implementer can't violate these invariants.
+    fn spawn(
+        &mut self,
+        join_set: &mut JoinSet<()>,
+        result_tx: tokio::sync::mpsc::UnboundedSender<C>,
+        pending_job: PE,
+    ) -> PR;
+
+    /// Called when a job previously spawned with spawn() transmits its completion
+    fn on_completion(&mut self, completion: C);
+
+    /// Called when a command is received. A job will be spawned immediately if the return
+    /// value is Ok, ignoring concurrency limits and the pending queue.
+    fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PE>;
+}
+
+impl<G, PE, PR, C, CMD> TenantBackgroundJobs<G, PE, PR, C, CMD>
+where
+    C: TenantScoped,
+    PE: TenantScoped,
+    PR: HasBarrier,
+    G: JobGenerator<PE, PR, C, CMD>,
+{
+    pub(super) fn new(generator: G, concurrency: usize) -> Self {
+        let (task_result_tx, task_result_rx) = tokio::sync::mpsc::unbounded_channel();
+
+        Self {
+            generator,
+            pending: std::collections::VecDeque::new(),
+            running: HashMap::new(),
+            tasks: JoinSet::new(),
+            task_result_rx,
+            task_result_tx,
+            concurrency,
+            scheduling_interval: MAX_SCHEDULING_INTERVAL,
+            _phantom: PhantomData,
+        }
+    }
+
+    pub(super) async fn run(
+        &mut self,
+        mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
+        background_jobs_can_start: Barrier,
+        cancel: CancellationToken,
+    ) {
+        tracing::info!("Waiting for background_jobs_can start...");
+        background_jobs_can_start.wait().await;
+        tracing::info!("background_jobs_can is ready, proceeding.");
+
+        while !cancel.is_cancelled() {
+            // Look for new work: this is relatively expensive because we have to go acquire the lock on
+            // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
+            // require an upload.
+            self.schedule_iteration(&cancel).await;
+
+            if cancel.is_cancelled() {
+                return;
+            }
+
+            // Schedule some work, if concurrency limit permits it
+            self.spawn_pending();
+
+            // Between scheduling iterations, we will:
+            //  - Drain any complete tasks and spawn pending tasks
+            //  - Handle incoming administrative commands
+            //  - Check our cancellation token
+            let next_scheduling_iteration = Instant::now()
+                .checked_add(self.scheduling_interval)
+                .unwrap_or_else(|| {
+                    tracing::warn!(
+                        "Scheduling interval invalid ({}s), running immediately!",
+                        self.scheduling_interval.as_secs_f64()
+                    );
+                    Instant::now()
+                });
+            loop {
+                tokio::select! {
+                    _ = cancel.cancelled() => {
+                        tracing::info!("joining tasks");
+                        self.shutdown().await;
+                        tracing::info!("terminating on cancellation token.");
+
+                        break;
+                    },
+                    _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
+                        tracing::debug!("woke for scheduling interval");
+                        break;},
+                    cmd = command_queue.recv() => {
+                        tracing::debug!("woke for command queue");
+                        let cmd = match cmd {
+                            Some(c) => c,
+                            None => {
+                                // SecondaryController was destroyed, and this has raced with
+                                // our CancellationToken
+                                tracing::info!("terminating on command queue destruction");
+                                cancel.cancel();
+                                break;
+                            }
+                        };
+
+                        let CommandRequest{
+                            response_tx,
+                            payload
+                        } = cmd;
+                        self.handle_command(payload, response_tx);
+                    },
+                    _ = async {
+                        let completion = self.process_next_completion().await;
+                        self.generator.on_completion(completion);
+                        if !cancel.is_cancelled() {
+                            self.spawn_pending();
+                        }
+                    } => {}
+                }
+            }
+        }
+    }
+
+    /// For all pending tenants that are eligible for execution, spawn their task.
+    ///
+    /// Caller provides the spawn operation, we track the resulting execution.
+    ///
+    /// The spawn operation _must_ spawn a task. The task spawned _must_ send
+    /// its result to the provided result channel (including in error cases).
+    /// TODO: refactor so that caller can't violate these invariants.
+    fn spawn_pending(&mut self) {
+        while !self.pending.is_empty() && self.running.len() < self.concurrency {
+            // unwrap: loop condition includes !is_empty()
+            let pending = self.pending.pop_front().unwrap();
+            let tenant_shard_id = *pending.get_tenant_shard_id();
+            let in_progress =
+                self.generator
+                    .spawn(&mut self.tasks, self.task_result_tx.clone(), pending);
+
+            self.running.insert(tenant_shard_id, in_progress);
+        }
+    }
+
+    /// For administrative commands: skip the pending queue, ignore concurrency limits
+    fn spawn_now(&mut self, job: PE) -> &PR {
+        let tenant_shard_id = *job.get_tenant_shard_id();
+        let in_progress = self
+            .generator
+            .spawn(&mut self.tasks, self.task_result_tx.clone(), job);
+
+        self.running.insert(tenant_shard_id, in_progress);
+        self.running
+            .get(&tenant_shard_id)
+            .expect("We just inserted this")
+    }
+
+    /// Wait until the next task completes, and handle its completion
+    ///
+    /// Cancellation: this method is cancel-safe.
+    async fn process_next_completion(&mut self) -> C {
+        match self.task_result_rx.recv().await {
+            Some(r) => {
+                self.running.remove(r.get_tenant_shard_id());
+                r
+            }
+            None => {
+                unreachable!("Result sender is stored on Self");
+            }
+        }
+    }
+
+    /// Convert the command into a pending job, spawn it, and when the spawned
+    /// job completes, send the result down `response_tx`.
+    fn handle_command(
+        &mut self,
+        cmd: CMD,
+        response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
+    ) {
+        let job = match self.generator.on_command(cmd) {
+            Ok(j) => j,
+            Err(e) => {
+                response_tx.send(CommandResponse { result: Err(e) }).ok();
+                return;
+            }
+        };
+
+        let tenant_shard_id = job.get_tenant_shard_id();
+        let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
+            barrier
+        } else {
+            let running = self.spawn_now(job);
+            running.get_barrier().clone()
+        };
+
+        // This task does no I/O: it only listens for a barrier's completion and then
+        // sends to the command response channel. It is therefore safe to spawn this without
+        // any gates/task_mgr hooks.
+        tokio::task::spawn(async move {
+            barrier.wait().await;
+
+            response_tx.send(CommandResponse { result: Ok(()) }).ok();
+        });
+    }
+
+    fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
+        self.running.get(tenant_shard_id).map(|r| r.get_barrier())
+    }
+
+    /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
+    ///
+    /// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`]
+    ///
+    /// This function resets the pending list: it is assumed that the caller may change their mind about
+    /// which tenants need work between calls to schedule_iteration.
+    async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
+        let SchedulingResult {
+            jobs,
+            want_interval,
+        } = self.generator.schedule().await;
+
+        // Adjust interval based on feedback from the job generator
+        if let Some(want_interval) = want_interval {
+            // Calculation uses second granularity: this scheduler is not intended for high frequency tasks
+            self.scheduling_interval = Duration::from_secs(std::cmp::min(
+                std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
+                MAX_SCHEDULING_INTERVAL.as_secs(),
+            ));
+        }
+
+        // The priority order of previously scheduled work may be invalidated by current state: drop
+        // all pending work (it will be re-scheduled if still needed)
+        self.pending.clear();
+
+        // While iterating over the potentially-long list of tenants, we will periodically yield
+        // to avoid blocking the executor.
+        yielding_loop(1000, cancel, jobs.into_iter(), |job| {
+            // Skip tenants that already have a write in flight
+            if !self.running.contains_key(job.get_tenant_shard_id()) {
+                self.pending.push_back(job);
+            }
+        })
+        .await
+        .ok();
+    }
+
+    /// It is the caller's responsibility to make sure that the tasks they scheduled
+    /// respect an appropriate cancellation token, to shut down promptly.
+    async fn shutdown(&mut self) {
+        // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
+        while let Some(_r) = self.tasks.join_next().await {}
+    }
+}
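
Editor's note: for readers who want to hang another background job off this scheduler, the sketch below shows one way a JobGenerator implementation could be wired up. It is illustrative only and not part of the patch: the Refresh* types, RefreshGenerator, and refresh_task are invented names, and the sketch assumes it lives in a sibling module under tenant::secondary so the pub(super) items from scheduler.rs above (JobGenerator, TenantBackgroundJobs, TenantScoped, HasBarrier, SchedulingResult, CommandRequest) are in scope.

// Hypothetical example, not part of this patch. All Refresh* names are invented;
// only the scheduler items come from scheduler.rs above.
use std::time::Duration;

use pageserver_api::shard::TenantShardId;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::completion::Barrier;

use super::scheduler::{
    HasBarrier, JobGenerator, SchedulingResult, TenantBackgroundJobs, TenantScoped,
};
use super::CommandRequest;

struct RefreshPending { tenant_shard_id: TenantShardId }
struct RefreshRunning { barrier: Barrier }
struct RefreshComplete { tenant_shard_id: TenantShardId }
enum RefreshCommand { Refresh(TenantShardId) }

impl TenantScoped for RefreshPending {
    fn get_tenant_shard_id(&self) -> &TenantShardId { &self.tenant_shard_id }
}
impl TenantScoped for RefreshComplete {
    fn get_tenant_shard_id(&self) -> &TenantShardId { &self.tenant_shard_id }
}
impl HasBarrier for RefreshRunning {
    fn get_barrier(&self) -> Barrier { self.barrier.clone() }
}

/// The generator owns whatever per-tenant bookkeeping the job needs.
struct RefreshGenerator {}

#[async_trait::async_trait]
impl JobGenerator<RefreshPending, RefreshRunning, RefreshComplete, RefreshCommand>
    for RefreshGenerator
{
    async fn schedule(&mut self) -> SchedulingResult<RefreshPending> {
        // Walk owned state and emit the jobs that are due; empty here for brevity.
        SchedulingResult {
            jobs: Vec::new(),
            want_interval: Some(Duration::from_secs(10)),
        }
    }

    fn spawn(
        &mut self,
        join_set: &mut JoinSet<()>,
        result_tx: tokio::sync::mpsc::UnboundedSender<RefreshComplete>,
        job: RefreshPending,
    ) -> RefreshRunning {
        let (completion, barrier) = utils::completion::channel();
        join_set.spawn(async move {
            // Dropping _completion on exit is what releases the barrier.
            let _completion = completion;
            // ... per-tenant I/O would go here ...
            result_tx
                .send(RefreshComplete { tenant_shard_id: job.tenant_shard_id })
                .ok();
        });
        RefreshRunning { barrier }
    }

    fn on_completion(&mut self, _completion: RefreshComplete) {
        // Update per-tenant bookkeeping here.
    }

    fn on_command(&mut self, cmd: RefreshCommand) -> anyhow::Result<RefreshPending> {
        match cmd {
            RefreshCommand::Refresh(tenant_shard_id) => Ok(RefreshPending { tenant_shard_id }),
        }
    }
}

type RefreshScheduler = TenantBackgroundJobs<
    RefreshGenerator,
    RefreshPending,
    RefreshRunning,
    RefreshComplete,
    RefreshCommand,
>;

async fn refresh_task(
    command_queue: tokio::sync::mpsc::Receiver<CommandRequest<RefreshCommand>>,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) {
    let mut scheduler = RefreshScheduler::new(RefreshGenerator {}, 8);
    scheduler
        .run(command_queue, background_jobs_can_start, cancel)
        .await;
}

The shape mirrors the heatmap uploader above: the generator does its I/O only inside the task it spawns onto the JoinSet, sends exactly one completion per job, and returns a type holding the barrier so that handle_command can wait for an in-flight job instead of spawning a duplicate.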
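
A smaller sketch of the yielding_loop helper on its own, since its contract (yield to the executor every N items, then bail out with YieldingLoopError::Cancelled if the token fired while we were parked) is easy to miss when reading the scheduler. The count_active function, its Option return type, and its placement under tenant::secondary are invented for the example; only yielding_loop comes from the patch.

// Hypothetical call site for yielding_loop; assumes visibility of the
// pub(super) helper from somewhere under tenant::secondary.
use pageserver_api::shard::TenantShardId;
use tokio_util::sync::CancellationToken;

use super::scheduler::yielding_loop;

async fn count_active(ids: Vec<TenantShardId>, cancel: &CancellationToken) -> Option<usize> {
    let mut seen = 0;
    // Visit items synchronously, yielding every 1000 iterations and giving up
    // (Err -> None) if `cancel` fires during one of those yields.
    yielding_loop(1000, cancel, ids.into_iter(), |_id| {
        seen += 1; // stand-in for real per-tenant inspection
    })
    .await
    .ok()?;
    Some(seen)
}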