From 6b98dd3fb43a93c62c6bb37643017d8a988f43bf Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 27 Feb 2024 18:10:39 +0100
Subject: [PATCH] Revert "pageserver: roll open layer in timeline writer (#6661)"

This reverts commit 587cb705b898565d459d044df84d1ac2633f00bf.

Conflicts:
	pageserver/src/tenant.rs
	pageserver/src/tenant/timeline.rs

The conflicts were with
* pageserver: adjust checkpoint distance for sharded tenants (#6852)
* pageserver: add vectored get implementation (#6576)
---
 pageserver/src/pgdatadir_mapping.rs | 17 +-
 pageserver/src/tenant.rs | 32 +-
 .../tenant/storage_layer/inmemory_layer.rs | 38 +-
 pageserver/src/tenant/timeline.rs | 324 ++++++------------
 .../walreceiver/walreceiver_connection.rs | 27 ++
 .../fixtures/pageserver/allowed_errors.py | 5 -
 6 files changed, 178 insertions(+), 265 deletions(-)

diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 65f8ddaab4..0ff03303d4 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -15,7 +15,6 @@ use crate::walrecord::NeonWalRecord;
 use anyhow::{ensure, Context};
 use bytes::{Buf, Bytes, BytesMut};
 use enum_map::Enum;
-use itertools::Itertools;
 use pageserver_api::key::{
 dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
 rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -1493,7 +1492,7 @@ impl<'a> DatadirModification<'a> {
 return Ok(());
 }

- let mut writer = self.tline.writer().await;
+ let writer = self.tline.writer().await;

 // Flush relation and SLRU data blocks, keep metadata.
 let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
@@ -1532,23 +1531,13 @@ impl<'a> DatadirModification<'a> {
 /// All the modifications in this atomic update are stamped by the specified LSN.
 ///
 pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
- let mut writer = self.tline.writer().await;
+ let writer = self.tline.writer().await;

 let pending_nblocks = self.pending_nblocks;
 self.pending_nblocks = 0;

 if !self.pending_updates.is_empty() {
- let prev_pending_updates = std::mem::take(&mut self.pending_updates);
-
- // The put_batch call below expects the inputs to be sorted by Lsn,
- // so we do that first.
- let lsn_ordered_batch: Vec<(Key, Lsn, Value)> = prev_pending_updates - .into_iter() - .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (key, lsn, val))) - .kmerge_by(|lhs, rhs| lhs.1 .0 < rhs.1 .0) - .collect(); - - writer.put_batch(lsn_ordered_batch, ctx).await?; + writer.put_batch(&self.pending_updates, ctx).await?; self.pending_updates.clear(); } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ebdff5f924..8e25332e85 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3853,7 +3853,7 @@ mod tests { .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) .await?; - let mut writer = tline.writer().await; + let writer = tline.writer().await; writer .put( *TEST_KEY, @@ -3865,7 +3865,7 @@ mod tests { writer.finish_write(Lsn(0x10)); drop(writer); - let mut writer = tline.writer().await; + let writer = tline.writer().await; writer .put( *TEST_KEY, @@ -3931,7 +3931,7 @@ mod tests { let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; - let mut writer = tline.writer().await; + let writer = tline.writer().await; #[allow(non_snake_case)] let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap(); @@ -3965,7 +3965,7 @@ mod tests { let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); - let mut new_writer = newtline.writer().await; + let new_writer = newtline.writer().await; new_writer .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx) .await?; @@ -3997,7 +3997,7 @@ mod tests { ) -> anyhow::Result<()> { let mut lsn = start_lsn; { - let mut writer = tline.writer().await; + let writer = tline.writer().await; // Create a relation on the timeline writer .put( @@ -4022,7 +4022,7 @@ mod tests { } tline.freeze_and_flush().await?; { - let mut writer = tline.writer().await; + let writer = tline.writer().await; writer .put( *TEST_KEY, @@ -4385,7 +4385,7 @@ mod tests { .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) .await?; - let mut writer = tline.writer().await; + let writer = tline.writer().await; writer .put( *TEST_KEY, @@ -4402,7 +4402,7 @@ mod tests { .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) .await?; - let mut writer = tline.writer().await; + let writer = tline.writer().await; writer .put( *TEST_KEY, @@ -4419,7 +4419,7 @@ mod tests { .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) .await?; - let mut writer = tline.writer().await; + let writer = tline.writer().await; writer .put( *TEST_KEY, @@ -4436,7 +4436,7 @@ mod tests { .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) .await?; - let mut writer = tline.writer().await; + let writer = tline.writer().await; writer .put( *TEST_KEY, @@ -4493,7 +4493,7 @@ mod tests { for _ in 0..repeat { for _ in 0..key_count { test_key.field6 = blknum; - let mut writer = timeline.writer().await; + let writer = timeline.writer().await; writer .put( test_key, @@ -4664,7 +4664,7 @@ mod tests { for blknum in 0..NUM_KEYS { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; - let mut writer = tline.writer().await; + let writer = tline.writer().await; writer .put( test_key, @@ -4685,7 +4685,7 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; - let mut writer = tline.writer().await; + let writer = tline.writer().await; writer .put( test_key, @@ -4753,7 +4753,7 @@ mod tests { for blknum in 0..NUM_KEYS { lsn = Lsn(lsn.0 + 0x10); test_key.field6 
= blknum as u32;
- let mut writer = tline.writer().await;
+ let writer = tline.writer().await;
 writer
 .put(
 test_key,
@@ -4782,7 +4782,7 @@ mod tests {
 lsn = Lsn(lsn.0 + 0x10);
 let blknum = thread_rng().gen_range(0..NUM_KEYS);
 test_key.field6 = blknum as u32;
- let mut writer = tline.writer().await;
+ let writer = tline.writer().await;
 writer
 .put(
 test_key,
@@ -4859,7 +4859,7 @@ mod tests {
 lsn = Lsn(lsn.0 + 0x10);
 let blknum = thread_rng().gen_range(0..NUM_KEYS);
 test_key.field6 = blknum as u32;
- let mut writer = tline.writer().await;
+ let writer = tline.writer().await;
 writer
 .put(
 test_key,
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 5f1db21d49..e7da28b8d6 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -336,17 +336,32 @@ impl InMemoryLayer {
 /// Common subroutine of the public put_wal_record() and put_page_image() functions.
 /// Adds the page version to the in-memory tree
-
 pub(crate) async fn put_value(
 &self,
 key: Key,
 lsn: Lsn,
- buf: &[u8],
+ val: &Value,
 ctx: &RequestContext,
 ) -> Result<()> {
 let mut inner = self.inner.write().await;
 self.assert_writable();
- self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
+ self.put_value_locked(&mut inner, key, lsn, val, ctx).await
+ }
+
+ pub(crate) async fn put_values(
+ &self,
+ values: &HashMap<Key, Vec<(Lsn, Value)>>,
+ ctx: &RequestContext,
+ ) -> Result<()> {
+ let mut inner = self.inner.write().await;
+ self.assert_writable();
+ for (key, vals) in values {
+ for (lsn, val) in vals {
+ self.put_value_locked(&mut inner, *key, *lsn, val, ctx)
+ .await?;
+ }
+ }
+ Ok(())
 }
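Note the interface `put_values` takes back: updates grouped per key, each inner vector ordered by LSN, all applied under a single acquisition of the write lock (versus one lock round-trip per value in `put_value`). A minimal standalone sketch of that access pattern, using plain stand-ins for the pageserver's `Key`, `Lsn`, and `Value` types (illustrative only, not the real API):

```rust
use std::collections::HashMap;

// Stand-ins for the pageserver's Key, Lsn, and Value types.
type Key = u64;
type Lsn = u64;
type Value = Vec<u8>;

/// Apply a batch of per-key, LSN-ordered updates in one pass; the single
/// `&mut` borrow plays the role of the one-time write-lock acquisition.
fn put_values(store: &mut HashMap<(Key, Lsn), Value>, values: &HashMap<Key, Vec<(Lsn, Value)>>) {
    for (key, vals) in values {
        for (lsn, val) in vals {
            store.insert((*key, *lsn), val.clone());
        }
    }
}

fn main() {
    let mut store = HashMap::new();
    let mut batch: HashMap<Key, Vec<(Lsn, Value)>> = HashMap::new();
    batch.entry(1).or_default().push((0x10, b"foo".to_vec()));
    batch.entry(1).or_default().push((0x20, b"bar".to_vec()));
    put_values(&mut store, &batch);
    assert_eq!(store[&(1, 0x20)], b"bar".to_vec());
}
```

Within one key the vector is LSN-sorted, but iteration across keys is unordered; that is exactly why the rolled-forward version being reverted here needed the `kmerge_by` sort removed in the first hunk.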
 async fn put_value_locked(
@@ -354,16 +369,22 @@ impl InMemoryLayer {
 locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
 key: Key,
 lsn: Lsn,
- buf: &[u8],
+ val: &Value,
 ctx: &RequestContext,
 ) -> Result<()> {
 trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);

 let off = {
+ // Avoid doing allocations for "small" values.
+ // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
+ // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
+ let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
+ buf.clear();
+ val.ser_into(&mut buf)?;
 locked_inner
 .file
 .write_blob(
- buf,
+ &buf,
 &RequestContextBuilder::extend(ctx)
 .page_content_kind(PageContentKind::InMemoryLayer)
 .build(),
@@ -391,12 +412,7 @@ impl InMemoryLayer {
 pub async fn freeze(&self, end_lsn: Lsn) {
 let inner = self.inner.write().await;

- assert!(
- self.start_lsn < end_lsn,
- "{} >= {}",
- self.start_lsn,
- end_lsn
- );
+ assert!(self.start_lsn < end_lsn);
 self.end_lsn.set(end_lsn).expect("end_lsn set only once");

 for vec_map in inner.index.values() {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 2c2351d531..4d0760fda0 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -26,15 +26,6 @@ use pageserver_api::{
 };
 use rand::Rng;
 use serde_with::serde_as;
-use storage_broker::BrokerClientChannel;
-use tokio::{
- runtime::Handle,
- sync::{oneshot, watch},
-};
-use tokio_util::sync::CancellationToken;
-use tracing::*;
-use utils::{bin_ser::BeSer, sync::gate::Gate};
-
 use std::ops::{Deref, Range};
 use std::pin::pin;
 use std::sync::atomic::Ordering as AtomicOrdering;
 use std::{
 cmp::{max, min, Ordering},
 ops::ControlFlow,
 };
+use storage_broker::BrokerClientChannel;
+use tokio::{
+ runtime::Handle,
+ sync::{oneshot, watch},
+};
+use tokio_util::sync::CancellationToken;
+use tracing::*;
+use utils::sync::gate::Gate;

 use crate::pgdatadir_mapping::DirectoryKind;
 use crate::tenant::timeline::logical_size::CurrentLogicalSize;
@@ -274,7 +273,7 @@ pub struct Timeline {
 /// Locked automatically by [`TimelineWriter`] and checkpointer.
 /// Must always be acquired before the layer map/individual layer lock
 /// to avoid deadlock.
- write_lock: tokio::sync::Mutex<Option<TimelineWriterState>>,
+ write_lock: tokio::sync::Mutex<()>,

 /// Used to avoid multiple `flush_loop` tasks running
 pub(super) flush_loop_state: Mutex<FlushLoopState>,
@@ -1204,10 +1203,58 @@ impl Timeline {
 pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
 TimelineWriter {
 tl: self,
- write_guard: self.write_lock.lock().await,
+ _write_guard: self.write_lock.lock().await,
 }
 }

+ /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
+ /// the in-memory layer, and initiate flushing it if so.
+ ///
+ /// Also flush after a period of time without new data -- it helps
+ /// safekeepers to regard pageserver as caught up and suspend activity.
+ pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
+ let last_lsn = self.get_last_record_lsn();
+ let open_layer_size = {
+ let guard = self.layers.read().await;
+ let layers = guard.layer_map();
+ let Some(open_layer) = layers.open_layer.as_ref() else {
+ return Ok(());
+ };
+ open_layer.size().await?
+ };
+ let last_freeze_at = self.last_freeze_at.load();
+ let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
+ let distance = last_lsn.widening_sub(last_freeze_at);
+ // Rolling the open layer can be triggered by:
+ // 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
+ // the safekeepers need to store. For sharded tenants, we multiply by shard count to
+ // account for how writes are distributed across shards: we expect each node to consume
+ // 1/count of the LSN on average.
+ // 2. The size of the currently open layer.
+ // 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
+ // up and suspend activity.
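The `if` restored in the next hunk implements exactly these three triggers. Distilled into a standalone predicate, purely for illustration (simplified parameter types, not the pageserver's actual signatures):

```rust
use std::time::{Duration, Instant};

/// The three roll triggers from the comment above: LSN distance (scaled by
/// shard count), open-layer size, and time since the last roll.
fn should_roll(
    distance: i128, // last_record_lsn minus last_freeze_at
    open_layer_size: u64,
    checkpoint_distance: u64,
    checkpoint_timeout: Duration,
    shard_count: u8,
    last_freeze_ts: Instant,
) -> bool {
    distance >= checkpoint_distance as i128 * shard_count as i128
        || open_layer_size > checkpoint_distance
        || (distance > 0 && last_freeze_ts.elapsed() >= checkpoint_timeout)
}

fn main() {
    // An 8-shard tenant tolerates 8x the configured distance on each shard,
    // since each shard sees only ~1/8 of the LSN range on average.
    let roll = should_roll(
        1 << 30,                  // 1 GiB of LSN distance
        0,                        // empty open layer
        256 << 20,                // 256 MiB checkpoint_distance
        Duration::from_secs(600), // checkpoint_timeout
        8,
        Instant::now(),
    );
    assert!(!roll); // 1 GiB < 8 * 256 MiB, nothing else triggers
}
```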
+ if (distance
+ >= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128)
+ || open_layer_size > self.get_checkpoint_distance()
+ || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
+ {
+ info!(
+ "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
+ distance,
+ open_layer_size,
+ last_freeze_ts.elapsed()
+ );
+
+ self.freeze_inmem_layer(true).await;
+ self.last_freeze_at.store(last_lsn);
+ *(self.last_freeze_ts.write().unwrap()) = Instant::now();
+
+ // Wake up the layer flusher
+ self.flush_frozen_layers();
+ }
+ Ok(())
+ }
+
 pub(crate) fn activate(
 self: &Arc<Self>,
 broker_client: BrokerClientChannel,
@@ -1639,7 +1686,7 @@ impl Timeline {
 layer_flush_start_tx,
 layer_flush_done_tx,

- write_lock: tokio::sync::Mutex::new(None),
+ write_lock: tokio::sync::Mutex::new(()),

 gc_info: std::sync::RwLock::new(GcInfo {
 retain_lsns: Vec::new(),
@@ -2980,6 +3027,43 @@ impl Timeline {
 Ok(layer)
 }

+ async fn put_value(
+ &self,
+ key: Key,
+ lsn: Lsn,
+ val: &Value,
+ ctx: &RequestContext,
+ ) -> anyhow::Result<()> {
+ //info!("PUT: key {} at {}", key, lsn);
+ let layer = self.get_layer_for_write(lsn).await?;
+ layer.put_value(key, lsn, val, ctx).await?;
+ Ok(())
+ }
+
+ async fn put_values(
+ &self,
+ values: &HashMap<Key, Vec<(Lsn, Value)>>,
+ ctx: &RequestContext,
+ ) -> anyhow::Result<()> {
+ // Pick the first LSN in the batch to get the layer to write to.
+ for lsns in values.values() {
+ if let Some((lsn, _)) = lsns.first() {
+ let layer = self.get_layer_for_write(*lsn).await?;
+ layer.put_values(values, ctx).await?;
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
+ if let Some((_, lsn)) = tombstones.first() {
+ let layer = self.get_layer_for_write(*lsn).await?;
+ layer.put_tombstones(tombstones).await?;
+ }
+ Ok(())
+ }
+
 pub(crate) fn finish_write(&self, new_lsn: Lsn) {
 assert!(new_lsn.is_aligned());
@@ -2990,20 +3074,14 @@ impl Timeline {
 async fn freeze_inmem_layer(&self, write_lock_held: bool) {
 // Freeze the current open in-memory layer. It will be written to disk on next
 // iteration.
-
 let _write_guard = if write_lock_held {
 None
 } else {
 Some(self.write_lock.lock().await)
 };
-
- self.freeze_inmem_layer_at(self.get_last_record_lsn()).await;
- }
-
- async fn freeze_inmem_layer_at(&self, at: Lsn) {
 let mut guard = self.layers.write().await;
 guard
- .try_freeze_in_memory_layer(at, &self.last_freeze_at)
+ .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
 .await;
 }
@@ -4982,43 +5060,13 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
 PageReconstructError::from(msg)
 }

-struct TimelineWriterState {
- open_layer: Arc<InMemoryLayer>,
- current_size: u64,
- // Previous Lsn which passed through
- prev_lsn: Option<Lsn>,
- // Largest Lsn which passed through the current writer
- max_lsn: Option<Lsn>,
- // Cached details of the last freeze. Avoids going through the atomic/lock on every put.
- cached_last_freeze_at: Lsn,
- cached_last_freeze_ts: Instant,
-}
-
-impl TimelineWriterState {
- fn new(
- open_layer: Arc<InMemoryLayer>,
- current_size: u64,
- last_freeze_at: Lsn,
- last_freeze_ts: Instant,
- ) -> Self {
- Self {
- open_layer,
- current_size,
- prev_lsn: None,
- max_lsn: None,
- cached_last_freeze_at: last_freeze_at,
- cached_last_freeze_ts: last_freeze_ts,
- }
- }
-}
-
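The struct deleted above carried per-writer bookkeeping so that the hot `put` path did not have to touch the shared `last_freeze_at` atomic or the `last_freeze_ts` lock on every value. A toy sketch of that caching idea, with hypothetical names and plain `u64` LSNs:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Shared timeline state: updated rarely, on layer roll.
struct Shared {
    last_freeze_at: AtomicU64,
}

/// Per-writer cache: read the shared atomic once when a layer is opened,
/// then consult the cheap local copy on every put.
struct WriterState {
    cached_last_freeze_at: u64,
}

impl WriterState {
    fn new(shared: &Shared) -> Self {
        Self {
            cached_last_freeze_at: shared.last_freeze_at.load(Ordering::Acquire),
        }
    }

    fn distance(&self, lsn: u64) -> u64 {
        lsn.saturating_sub(self.cached_last_freeze_at)
    }
}

fn main() {
    let shared = Shared {
        last_freeze_at: AtomicU64::new(0x10),
    };
    let writer = WriterState::new(&shared);
    assert_eq!(writer.distance(0x40), 0x30);
}
```

On a roll, the writer re-reads the shared state once and refreshes its copy, which is what the removed `handle_open_layer_action` in the hunks below did.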
 /// Various functions to mutate the timeline.
 // TODO Currently, Deref is used to allow easy access to read methods from this trait.
 // This is probably considered a bad practice in Rust and should be fixed eventually,
 // but will cause large code changes.
 pub(crate) struct TimelineWriter<'a> {
 tl: &'a Timeline,
- write_guard: tokio::sync::MutexGuard<'a, Option<TimelineWriterState>>,
+ _write_guard: tokio::sync::MutexGuard<'a, ()>,
 }

 impl Deref for TimelineWriter<'_> {
@@ -5029,193 +5077,31 @@ impl Deref for TimelineWriter<'_> {
 }
 }

-impl Drop for TimelineWriter<'_> {
- fn drop(&mut self) {
- self.write_guard.take();
- }
-}
-
-enum OpenLayerAction {
- Roll,
- Open,
- None,
-}
-
 impl<'a> TimelineWriter<'a> {
 /// Put a new page version that can be constructed from a WAL record
 ///
 /// This will implicitly extend the relation, if the page is beyond the
 /// current end-of-file.
 pub(crate) async fn put(
- &mut self,
+ &self,
 key: Key,
 lsn: Lsn,
 value: &Value,
 ctx: &RequestContext,
 ) -> anyhow::Result<()> {
- // Avoid doing allocations for "small" values.
- // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
- // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
- let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
- buf.clear();
- value.ser_into(&mut buf)?;
- let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
-
- let action = self.get_open_layer_action(lsn, buf_size);
- let layer = self.handle_open_layer_action(lsn, action).await?;
- let res = layer.put_value(key, lsn, &buf, ctx).await;
-
- if res.is_ok() {
- // Update the current size only when the entire write was ok.
- // In case of failures, we may have had partial writes which
- // render the size tracking out of sync. That's ok because
- // the checkpoint distance should be significantly smaller
- // than the S3 single shot upload limit of 5GiB.
- let state = self.write_guard.as_mut().unwrap();
-
- state.current_size += buf_size;
- state.prev_lsn = Some(lsn);
- state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn));
- }
-
- res
+ self.tl.put_value(key, lsn, value, ctx).await
 }

- async fn handle_open_layer_action(
- &mut self,
- at: Lsn,
- action: OpenLayerAction,
- ) -> anyhow::Result<&Arc<InMemoryLayer>> {
- match action {
- OpenLayerAction::Roll => {
- let max_lsn = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
- self.tl.freeze_inmem_layer_at(max_lsn).await;
-
- let now = Instant::now();
- *(self.last_freeze_ts.write().unwrap()) = now;
-
- self.tl.flush_frozen_layers();
-
- let current_size = self.write_guard.as_ref().unwrap().current_size;
- if current_size > self.get_checkpoint_distance() {
- warn!("Flushed oversized open layer with size {}", current_size)
- }
-
- assert!(self.write_guard.is_some());
-
- let layer = self.tl.get_layer_for_write(at).await?;
- let initial_size = layer.size().await?;
- self.write_guard.replace(TimelineWriterState::new(
- layer,
- initial_size,
- Lsn(max_lsn.0 + 1),
- now,
- ));
- }
- OpenLayerAction::Open => {
- assert!(self.write_guard.is_none());
-
- let layer = self.tl.get_layer_for_write(at).await?;
- let initial_size = layer.size().await?;
-
- let last_freeze_at = self.last_freeze_at.load();
- let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
- self.write_guard.replace(TimelineWriterState::new(
- layer,
- initial_size,
- last_freeze_at,
- last_freeze_ts,
- ));
- }
- OpenLayerAction::None => {
- assert!(self.write_guard.is_some());
- }
- }
-
- Ok(&self.write_guard.as_ref().unwrap().open_layer)
- }
-
- fn get_open_layer_action(&self, lsn: Lsn, new_value_size: u64) -> OpenLayerAction {
- let state = &*self.write_guard;
- let Some(state) = &state else {
- return OpenLayerAction::Open;
- };
-
- if state.prev_lsn == Some(lsn) {
- // Rolling mid LSN is not supported by downstream code.
- // Hence, only roll at LSN boundaries.
- return OpenLayerAction::None;
- }
-
- let distance = lsn.widening_sub(state.cached_last_freeze_at);
- let proposed_open_layer_size = state.current_size + new_value_size;
-
- // Rolling the open layer can be triggered by:
- // 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
- // the safekeepers need to store. For sharded tenants, we multiply by shard count to
- // account for how writes are distributed across shards: we expect each node to consume
- // 1/count of the LSN on average.
- // 2. The size of the currently open layer.
- // 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
- // up and suspend activity.
- if distance
- >= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128
- {
- info!(
- "Will roll layer at {} with layer size {} due to LSN distance ({})",
- lsn, state.current_size, distance
- );
-
- OpenLayerAction::Roll
- } else if state.current_size > 0
- && proposed_open_layer_size >= self.get_checkpoint_distance()
- {
- info!(
- "Will roll layer at {} with layer size {} due to layer size ({})",
- lsn, state.current_size, proposed_open_layer_size
- );
-
- OpenLayerAction::Roll
- } else if distance > 0
- && state.cached_last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()
- {
- info!(
- "Will roll layer at {} with layer size {} due to time since last flush ({:?})",
- lsn,
- state.current_size,
- state.cached_last_freeze_ts.elapsed()
- );
-
- OpenLayerAction::Roll
- } else {
- OpenLayerAction::None
- }
- }
-
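Besides the three roll triggers, the `get_open_layer_action` removed above encodes one more rule worth calling out: it refuses to roll in the middle of a single LSN, since downstream code cannot handle two layers covering the same LSN. A condensed sketch of the three-way decision, keeping only the size trigger for brevity (simplified types, illustrative only):

```rust
/// The writer's three-way decision, condensed. The real version also
/// weighs LSN distance and elapsed time, as shown in the hunks above.
enum OpenLayerAction {
    Roll,
    Open,
    None,
}

struct WriterState {
    prev_lsn: Option<u64>,
    current_size: u64,
}

fn get_open_layer_action(
    state: Option<&WriterState>,
    lsn: u64,
    checkpoint_distance: u64,
) -> OpenLayerAction {
    let Some(state) = state else {
        // No open layer yet: open one.
        return OpenLayerAction::Open;
    };
    if state.prev_lsn == Some(lsn) {
        // Never roll in the middle of a single LSN: two layers covering
        // the same LSN would clash, so roll only at LSN boundaries.
        return OpenLayerAction::None;
    }
    if state.current_size >= checkpoint_distance {
        return OpenLayerAction::Roll;
    }
    OpenLayerAction::None
}

fn main() {
    let state = WriterState { prev_lsn: Some(0x40), current_size: u64::MAX };
    // Oversized, but the incoming value is still at the previous LSN: no roll.
    assert!(matches!(
        get_open_layer_action(Some(&state), 0x40, 1024),
        OpenLayerAction::None
    ));
}
```

This LSN-boundary rule is also why the reverted code could overshoot the soft `checkpoint_distance` limit, the situation covered by the allowed-errors pattern removed at the end of this patch.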
- /// Put a batch of keys at the specified Lsns.
- ///
- /// The batch should be sorted by Lsn such that it's safe
- /// to roll the open layer mid batch.
 pub(crate) async fn put_batch(
- &mut self,
- batch: Vec<(Key, Lsn, Value)>,
+ &self,
+ batch: &HashMap<Key, Vec<(Lsn, Value)>>,
 ctx: &RequestContext,
 ) -> anyhow::Result<()> {
- for (key, lsn, val) in batch {
- self.put(key, lsn, &val, ctx).await?
- }
-
- Ok(())
+ self.tl.put_values(batch, ctx).await
 }

- pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
- if let Some((_, lsn)) = batch.first() {
- let action = self.get_open_layer_action(*lsn, 0);
- let layer = self.handle_open_layer_action(*lsn, action).await?;
- layer.put_tombstones(batch).await?;
- }
-
- Ok(())
+ pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
+ self.tl.put_tombstones(batch).await
 }

 /// Track the end of the latest digested WAL record.
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index 0333fcac67..9cb53f46d1 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -343,6 +343,23 @@ pub(super) async fn handle_walreceiver_connection(
 modification.commit(&ctx).await?;
 uncommitted_records = 0;
 filtered_records = 0;
+
+ //
+ // We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise
+ // layer size can become much larger than `checkpoint_distance`.
+ // It can happen because the wal-sender sends WAL in 125kb chunks and some WAL records can cause writing a large
+ // amount of data to key-value storage. So performing this check only after processing
+ // all WAL records in the chunk can cause huge L0 layer files.
+ //
+ timeline
+ .check_checkpoint_distance()
+ .await
+ .with_context(|| {
+ format!(
+ "Failed to check checkpoint distance for timeline {}",
+ timeline.timeline_id
+ )
+ })?;
 }
 }

@@ -389,6 +406,16 @@ pub(super) async fn handle_walreceiver_connection(
 }
 }

+ timeline
+ .check_checkpoint_distance()
+ .await
+ .with_context(|| {
+ format!(
+ "Failed to check checkpoint distance for timeline {}",
+ timeline.timeline_id
+ )
+ })?;
+
 if let Some(last_lsn) = status_update {
 let timeline_remote_consistent_lsn = timeline
 .get_remote_consistent_lsn_visible()
diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py
index 8ff4341cc0..74c6bddf23 100755
--- a/test_runner/fixtures/pageserver/allowed_errors.py
+++ b/test_runner/fixtures/pageserver/allowed_errors.py
@@ -82,11 +82,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
 # During shutdown, DownloadError::Cancelled may be logged as an error. Cleaning this
 # up is tracked in https://github.com/neondatabase/neon/issues/6096
 ".*Cancelled, shutting down.*",
- # Open layers are only rolled at Lsn boundaries to avoid name clashes.
- # Hence, we can overshoot the soft limit set by checkpoint distance.
- # This is especially pronounced in tests that set small checkpoint
- # distances.
- ".*Flushed oversized open layer with size.*",
 )
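For reference, the `kmerge_by` pipeline removed from `DatadirModification::commit` in the first hunk is what produced the globally LSN-ordered batch that made mid-batch layer rolls safe. A standalone sketch of that merge, assuming the `itertools` crate and using `u64` stand-ins for `Key`, `Lsn`, and `Value`:

```rust
use itertools::Itertools;
use std::collections::HashMap;

fn main() {
    // Per-key updates, each vector already sorted by LSN, as accumulated in
    // DatadirModification::pending_updates.
    let mut pending: HashMap<u64, Vec<(u64, &str)>> = HashMap::new();
    pending.insert(1, vec![(0x10, "a"), (0x30, "c")]);
    pending.insert(2, vec![(0x20, "b")]);

    // k-way merge of the pre-sorted per-key runs into one LSN-ordered batch;
    // since each run is already sorted, no global sort is needed.
    let lsn_ordered: Vec<(u64, u64, &str)> = pending
        .into_iter()
        .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (key, lsn, val)))
        .kmerge_by(|lhs, rhs| lhs.1 < rhs.1)
        .collect();

    let lsns: Vec<u64> = lsn_ordered.iter().map(|(_, lsn, _)| *lsn).collect();
    assert_eq!(lsns, vec![0x10, 0x20, 0x30]);
}
```

With this revert, `put_batch` goes back to accepting the unsorted per-key `HashMap` directly, so the merge step (and the `itertools` import) is no longer needed.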