From d783889a1fc58f00ceb5d2bf0dcfe5f7ca3f4406 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 17 Nov 2022 12:32:11 -0500 Subject: [PATCH] timeline: explicit tracking of flush loop state: NotStarted, Running, Exited This allows us to error out in the case where we request flush but the flush loop is not running. Before, we would only track whether it was started, but not when it exited. Better to use an enum with 3 states than a 2-state bool because then the error message can answer the question whether we ever started the flush loop or not. --- pageserver/src/tenant/timeline.rs | 49 +++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0b2f7876db..63ba4c10e6 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -61,6 +61,13 @@ use crate::{ storage_sync::{self, index::LayerFileMetadata}, }; +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +enum FlushLoopState { + NotStarted, + Running, + Exited, +} + pub struct Timeline { conf: &'static PageServerConf, tenant_conf: Arc>, @@ -122,7 +129,7 @@ pub struct Timeline { write_lock: Mutex<()>, /// Used to avoid multiple `flush_loop` tasks running - flush_loop_started: Mutex, + flush_loop_state: Mutex, /// layer_flush_start_tx can be used to wake up the layer-flushing task. /// The value is a counter, incremented every time a new flush cycle is requested. @@ -755,7 +762,7 @@ impl Timeline { upload_layers: AtomicBool::new(upload_layers), - flush_loop_started: Mutex::new(false), + flush_loop_state: Mutex::new(FlushLoopState::NotStarted), layer_flush_start_tx, layer_flush_done_tx, @@ -794,13 +801,23 @@ impl Timeline { } pub(super) fn maybe_spawn_flush_loop(self: &Arc) { - let mut flush_loop_started = self.flush_loop_started.lock().unwrap(); - if *flush_loop_started { - info!( - "skipping attempt to start flush_loop twice {}/{}", - self.tenant_id, self.timeline_id - ); - return; + let mut flush_loop_state = self.flush_loop_state.lock().unwrap(); + match *flush_loop_state { + FlushLoopState::NotStarted => (), + FlushLoopState::Running => { + info!( + "skipping attempt to start flush_loop twice {}/{}", + self.tenant_id, self.timeline_id + ); + return; + } + FlushLoopState::Exited => { + warn!( + "ignoring attempt to restart exited flush_loop {}/{}", + self.tenant_id, self.timeline_id + ); + return; + } } let layer_flush_start_rx = self.layer_flush_start_tx.subscribe(); @@ -813,11 +830,16 @@ impl Timeline { Some(self.timeline_id), "layer flush task", false, - async move { self_clone.flush_loop(layer_flush_start_rx).await; Ok(()) } + async move { + self_clone.flush_loop(layer_flush_start_rx).await; + let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap(); + assert_eq!(*flush_loop_state, FlushLoopState::Running); + *flush_loop_state = FlushLoopState::Exited; + Ok(()) } .instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id)) ); - *flush_loop_started = true; + *flush_loop_state = FlushLoopState::Running; } pub(super) fn launch_wal_receiver(self: &Arc) { @@ -1365,8 +1387,9 @@ impl Timeline { // finished, instead of some other flush that was started earlier. let mut my_flush_request = 0; - if !&*self.flush_loop_started.lock().unwrap() { - anyhow::bail!("cannot flush frozen layers when flush_loop is not running") + let flush_loop_state = { *self.flush_loop_state.lock().unwrap() }; + if flush_loop_state != FlushLoopState::Running { + anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}") } self.layer_flush_start_tx.send_modify(|counter| {