diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0b2f7876db..63ba4c10e6 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -61,6 +61,13 @@ use crate::{ storage_sync::{self, index::LayerFileMetadata}, }; +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +enum FlushLoopState { + NotStarted, + Running, + Exited, +} + pub struct Timeline { conf: &'static PageServerConf, tenant_conf: Arc>, @@ -122,7 +129,7 @@ pub struct Timeline { write_lock: Mutex<()>, /// Used to avoid multiple `flush_loop` tasks running - flush_loop_started: Mutex, + flush_loop_state: Mutex, /// layer_flush_start_tx can be used to wake up the layer-flushing task. /// The value is a counter, incremented every time a new flush cycle is requested. @@ -755,7 +762,7 @@ impl Timeline { upload_layers: AtomicBool::new(upload_layers), - flush_loop_started: Mutex::new(false), + flush_loop_state: Mutex::new(FlushLoopState::NotStarted), layer_flush_start_tx, layer_flush_done_tx, @@ -794,13 +801,23 @@ impl Timeline { } pub(super) fn maybe_spawn_flush_loop(self: &Arc) { - let mut flush_loop_started = self.flush_loop_started.lock().unwrap(); - if *flush_loop_started { - info!( - "skipping attempt to start flush_loop twice {}/{}", - self.tenant_id, self.timeline_id - ); - return; + let mut flush_loop_state = self.flush_loop_state.lock().unwrap(); + match *flush_loop_state { + FlushLoopState::NotStarted => (), + FlushLoopState::Running => { + info!( + "skipping attempt to start flush_loop twice {}/{}", + self.tenant_id, self.timeline_id + ); + return; + } + FlushLoopState::Exited => { + warn!( + "ignoring attempt to restart exited flush_loop {}/{}", + self.tenant_id, self.timeline_id + ); + return; + } } let layer_flush_start_rx = self.layer_flush_start_tx.subscribe(); @@ -813,11 +830,16 @@ impl Timeline { Some(self.timeline_id), "layer flush task", false, - async move { self_clone.flush_loop(layer_flush_start_rx).await; Ok(()) } + async move { + self_clone.flush_loop(layer_flush_start_rx).await; + let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap(); + assert_eq!(*flush_loop_state, FlushLoopState::Running); + *flush_loop_state = FlushLoopState::Exited; + Ok(()) } .instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id)) ); - *flush_loop_started = true; + *flush_loop_state = FlushLoopState::Running; } pub(super) fn launch_wal_receiver(self: &Arc) { @@ -1365,8 +1387,9 @@ impl Timeline { // finished, instead of some other flush that was started earlier. let mut my_flush_request = 0; - if !&*self.flush_loop_started.lock().unwrap() { - anyhow::bail!("cannot flush frozen layers when flush_loop is not running") + let flush_loop_state = { *self.flush_loop_state.lock().unwrap() }; + if flush_loop_state != FlushLoopState::Running { + anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}") } self.layer_flush_start_tx.send_modify(|counter| {