From 86dd8c96d3c76ae45d60f5af7effe17c236dbfdb Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 9 Jun 2023 18:14:00 +0200 Subject: [PATCH] add infrastructure to expect use of initdb_lsn flush optimization --- pageserver/src/tenant/timeline.rs | 67 +++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 13 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 71f83bf127..97b68976f5 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -84,9 +84,14 @@ use super::remote_timeline_client::RemoteTimelineClient; use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] -enum FlushLoopState { +pub(super) enum FlushLoopState { NotStarted, - Running, + Running { + #[cfg(test)] + expect_initdb_optimization: bool, + #[cfg(test)] + initdb_optimization_count: usize, + }, Exited, } @@ -183,7 +188,7 @@ pub struct Timeline { write_lock: Mutex<()>, /// Used to avoid multiple `flush_loop` tasks running - flush_loop_state: Mutex, + pub(super) flush_loop_state: Mutex, /// layer_flush_start_tx can be used to wake up the layer-flushing task. /// The value is a counter, incremented every time a new flush cycle is requested. @@ -1497,7 +1502,7 @@ impl Timeline { let mut flush_loop_state = self.flush_loop_state.lock().unwrap(); match *flush_loop_state { FlushLoopState::NotStarted => (), - FlushLoopState::Running => { + FlushLoopState::Running { .. } => { info!( "skipping attempt to start flush_loop twice {}/{}", self.tenant_id, self.timeline_id @@ -1517,6 +1522,12 @@ impl Timeline { let self_clone = Arc::clone(self); info!("spawning flush loop"); + *flush_loop_state = FlushLoopState::Running { + #[cfg(test)] + expect_initdb_optimization: false, + #[cfg(test)] + initdb_optimization_count: 0, + }; task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::LayerFlushTask, @@ -1528,14 +1539,12 @@ impl Timeline { let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error); self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await; let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap(); - assert_eq!(*flush_loop_state, FlushLoopState::Running); + assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..})); *flush_loop_state = FlushLoopState::Exited; Ok(()) } .instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id)) ); - - *flush_loop_state = FlushLoopState::Running; } /// Creates and starts the wal receiver. @@ -2385,10 +2394,17 @@ impl Timeline { } ValueReconstructResult::Missing => { return Err(layer_traversal_error( - format!( - "could not find data for key {} at LSN {}, for request at LSN {}", - key, cont_lsn, request_lsn - ), + if cfg!(test) { + format!( + "could not find data for key {} at LSN {}, for request at LSN {}\n{}", + key, cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(), + ) + } else { + format!( + "could not find data for key {} at LSN {}, for request at LSN {}", + key, cont_lsn, request_lsn + ) + }, traversal_path, )); } @@ -2644,9 +2660,10 @@ impl Timeline { let last_record_lsn = self.get_last_record_lsn(); ensure!( lsn > last_record_lsn, - "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})", + "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}", lsn, last_record_lsn, + std::backtrace::Backtrace::force_capture(), ); // Do we have a layer open for writing already? @@ -2781,7 +2798,7 @@ impl Timeline { let mut my_flush_request = 0; let flush_loop_state = { *self.flush_loop_state.lock().unwrap() }; - if flush_loop_state != FlushLoopState::Running { + if !matches!(flush_loop_state, FlushLoopState::Running { .. }) { anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}") } @@ -2831,6 +2848,18 @@ impl Timeline { let lsn_range = frozen_layer.get_lsn_range(); let layer_paths_to_upload = if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) { + #[cfg(test)] + match &mut *self.flush_loop_state.lock().unwrap() { + FlushLoopState::NotStarted | FlushLoopState::Exited => { + panic!("flush loop not running") + } + FlushLoopState::Running { + initdb_optimization_count, + .. + } => { + *initdb_optimization_count += 1; + } + } // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not // require downloading anything during initial import. let (partitioning, _lsn) = self @@ -2839,6 +2868,18 @@ impl Timeline { self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx) .await? } else { + #[cfg(test)] + match &mut *self.flush_loop_state.lock().unwrap() { + FlushLoopState::NotStarted | FlushLoopState::Exited => { + panic!("flush loop not running") + } + FlushLoopState::Running { + expect_initdb_optimization, + .. + } => { + assert!(!*expect_initdb_optimization, "expected initdb optimization"); + } + } // normal case, write out a L0 delta layer file. let this = self.clone(); let frozen_layer = frozen_layer.clone();