timeline: explicit tracking of flush loop state: NotStarted, Running, Exited

This allows us to error out in the case where we request flush but the
flush loop is not running.
Before, we would only track whether it was started, but not when it
exited.
Better to use an enum with 3 states than a 2-state bool because then
the error message can answer the question whether we ever started
the flush loop or not.
This commit is contained in:
Christian Schwarz
2022-11-17 12:32:11 -05:00
committed by Christian Schwarz
parent 2655bdbb2e
commit d783889a1f

View File

@@ -61,6 +61,13 @@ use crate::{
storage_sync::{self, index::LayerFileMetadata},
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum FlushLoopState {
NotStarted,
Running,
Exited,
}
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -122,7 +129,7 @@ pub struct Timeline {
write_lock: Mutex<()>,
/// Used to avoid multiple `flush_loop` tasks running
flush_loop_started: Mutex<bool>,
flush_loop_state: Mutex<FlushLoopState>,
/// layer_flush_start_tx can be used to wake up the layer-flushing task.
/// The value is a counter, incremented every time a new flush cycle is requested.
@@ -755,7 +762,7 @@ impl Timeline {
upload_layers: AtomicBool::new(upload_layers),
flush_loop_started: Mutex::new(false),
flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
layer_flush_start_tx,
layer_flush_done_tx,
@@ -794,13 +801,23 @@ impl Timeline {
}
pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
let mut flush_loop_started = self.flush_loop_started.lock().unwrap();
if *flush_loop_started {
info!(
"skipping attempt to start flush_loop twice {}/{}",
self.tenant_id, self.timeline_id
);
return;
let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
match *flush_loop_state {
FlushLoopState::NotStarted => (),
FlushLoopState::Running => {
info!(
"skipping attempt to start flush_loop twice {}/{}",
self.tenant_id, self.timeline_id
);
return;
}
FlushLoopState::Exited => {
warn!(
"ignoring attempt to restart exited flush_loop {}/{}",
self.tenant_id, self.timeline_id
);
return;
}
}
let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
@@ -813,11 +830,16 @@ impl Timeline {
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_loop(layer_flush_start_rx).await; Ok(()) }
async move {
self_clone.flush_loop(layer_flush_start_rx).await;
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
assert_eq!(*flush_loop_state, FlushLoopState::Running);
*flush_loop_state = FlushLoopState::Exited;
Ok(()) }
.instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
);
*flush_loop_started = true;
*flush_loop_state = FlushLoopState::Running;
}
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
@@ -1365,8 +1387,9 @@ impl Timeline {
// finished, instead of some other flush that was started earlier.
let mut my_flush_request = 0;
if !&*self.flush_loop_started.lock().unwrap() {
anyhow::bail!("cannot flush frozen layers when flush_loop is not running")
let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
if flush_loop_state != FlushLoopState::Running {
anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
}
self.layer_flush_start_tx.send_modify(|counter| {