mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 18:10:37 +00:00
add infrastructure to expect use of initdb_lsn flush optimization
This commit is contained in:
@@ -84,9 +84,14 @@ use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
enum FlushLoopState {
|
||||
pub(super) enum FlushLoopState {
|
||||
NotStarted,
|
||||
Running,
|
||||
Running {
|
||||
#[cfg(test)]
|
||||
expect_initdb_optimization: bool,
|
||||
#[cfg(test)]
|
||||
initdb_optimization_count: usize,
|
||||
},
|
||||
Exited,
|
||||
}
|
||||
|
||||
@@ -183,7 +188,7 @@ pub struct Timeline {
|
||||
write_lock: Mutex<()>,
|
||||
|
||||
/// Used to avoid multiple `flush_loop` tasks running
|
||||
flush_loop_state: Mutex<FlushLoopState>,
|
||||
pub(super) flush_loop_state: Mutex<FlushLoopState>,
|
||||
|
||||
/// layer_flush_start_tx can be used to wake up the layer-flushing task.
|
||||
/// The value is a counter, incremented every time a new flush cycle is requested.
|
||||
@@ -1497,7 +1502,7 @@ impl Timeline {
|
||||
let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
|
||||
match *flush_loop_state {
|
||||
FlushLoopState::NotStarted => (),
|
||||
FlushLoopState::Running => {
|
||||
FlushLoopState::Running { .. } => {
|
||||
info!(
|
||||
"skipping attempt to start flush_loop twice {}/{}",
|
||||
self.tenant_id, self.timeline_id
|
||||
@@ -1517,6 +1522,12 @@ impl Timeline {
|
||||
let self_clone = Arc::clone(self);
|
||||
|
||||
info!("spawning flush loop");
|
||||
*flush_loop_state = FlushLoopState::Running {
|
||||
#[cfg(test)]
|
||||
expect_initdb_optimization: false,
|
||||
#[cfg(test)]
|
||||
initdb_optimization_count: 0,
|
||||
};
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::LayerFlushTask,
|
||||
@@ -1528,14 +1539,12 @@ impl Timeline {
|
||||
let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
|
||||
self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
|
||||
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
|
||||
assert_eq!(*flush_loop_state, FlushLoopState::Running);
|
||||
assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
|
||||
*flush_loop_state = FlushLoopState::Exited;
|
||||
Ok(())
|
||||
}
|
||||
.instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
|
||||
);
|
||||
|
||||
*flush_loop_state = FlushLoopState::Running;
|
||||
}
|
||||
|
||||
/// Creates and starts the wal receiver.
|
||||
@@ -2385,10 +2394,17 @@ impl Timeline {
|
||||
}
|
||||
ValueReconstructResult::Missing => {
|
||||
return Err(layer_traversal_error(
|
||||
format!(
|
||||
"could not find data for key {} at LSN {}, for request at LSN {}",
|
||||
key, cont_lsn, request_lsn
|
||||
),
|
||||
if cfg!(test) {
|
||||
format!(
|
||||
"could not find data for key {} at LSN {}, for request at LSN {}\n{}",
|
||||
key, cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
"could not find data for key {} at LSN {}, for request at LSN {}",
|
||||
key, cont_lsn, request_lsn
|
||||
)
|
||||
},
|
||||
traversal_path,
|
||||
));
|
||||
}
|
||||
@@ -2644,9 +2660,10 @@ impl Timeline {
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
ensure!(
|
||||
lsn > last_record_lsn,
|
||||
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
|
||||
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}",
|
||||
lsn,
|
||||
last_record_lsn,
|
||||
std::backtrace::Backtrace::force_capture(),
|
||||
);
|
||||
|
||||
// Do we have a layer open for writing already?
|
||||
@@ -2781,7 +2798,7 @@ impl Timeline {
|
||||
let mut my_flush_request = 0;
|
||||
|
||||
let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
|
||||
if flush_loop_state != FlushLoopState::Running {
|
||||
if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
|
||||
anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
|
||||
}
|
||||
|
||||
@@ -2831,6 +2848,18 @@ impl Timeline {
|
||||
let lsn_range = frozen_layer.get_lsn_range();
|
||||
let layer_paths_to_upload =
|
||||
if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
|
||||
#[cfg(test)]
|
||||
match &mut *self.flush_loop_state.lock().unwrap() {
|
||||
FlushLoopState::NotStarted | FlushLoopState::Exited => {
|
||||
panic!("flush loop not running")
|
||||
}
|
||||
FlushLoopState::Running {
|
||||
initdb_optimization_count,
|
||||
..
|
||||
} => {
|
||||
*initdb_optimization_count += 1;
|
||||
}
|
||||
}
|
||||
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
|
||||
// require downloading anything during initial import.
|
||||
let (partitioning, _lsn) = self
|
||||
@@ -2839,6 +2868,18 @@ impl Timeline {
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
|
||||
.await?
|
||||
} else {
|
||||
#[cfg(test)]
|
||||
match &mut *self.flush_loop_state.lock().unwrap() {
|
||||
FlushLoopState::NotStarted | FlushLoopState::Exited => {
|
||||
panic!("flush loop not running")
|
||||
}
|
||||
FlushLoopState::Running {
|
||||
expect_initdb_optimization,
|
||||
..
|
||||
} => {
|
||||
assert!(!*expect_initdb_optimization, "expected initdb optimization");
|
||||
}
|
||||
}
|
||||
// normal case, write out a L0 delta layer file.
|
||||
let this = self.clone();
|
||||
let frozen_layer = frozen_layer.clone();
|
||||
|
||||
Reference in New Issue
Block a user