diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 0ff03303d4..65f8ddaab4 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -15,6 +15,7 @@ use crate::walrecord::NeonWalRecord;
 use anyhow::{ensure, Context};
 use bytes::{Buf, Bytes, BytesMut};
 use enum_map::Enum;
+use itertools::Itertools;
 use pageserver_api::key::{
     dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
     rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -1492,7 +1493,7 @@ impl<'a> DatadirModification<'a> {
             return Ok(());
         }

-        let writer = self.tline.writer().await;
+        let mut writer = self.tline.writer().await;

         // Flush relation and SLRU data blocks, keep metadata.
         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
@@ -1531,13 +1532,23 @@ impl<'a> DatadirModification<'a> {
     /// All the modifications in this atomic update are stamped by the specified LSN.
     ///
     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
-        let writer = self.tline.writer().await;
+        let mut writer = self.tline.writer().await;

         let pending_nblocks = self.pending_nblocks;
         self.pending_nblocks = 0;

         if !self.pending_updates.is_empty() {
-            writer.put_batch(&self.pending_updates, ctx).await?;
+            let prev_pending_updates = std::mem::take(&mut self.pending_updates);
+
+            // The put_batch call below expects the inputs to be sorted by Lsn,
+            // so we do that first.
+            let lsn_ordered_batch: Vec<(Key, Lsn, Value)> = prev_pending_updates
+                .into_iter()
+                .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (key, lsn, val)))
+                .kmerge_by(|lhs, rhs| lhs.1 .0 < rhs.1 .0)
+                .collect();
+
+            writer.put_batch(lsn_ordered_batch, ctx).await?;
             self.pending_updates.clear();
         }

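For context, here is a minimal self-contained sketch (not part of the patch) of the k-way merge used above: per-key update lists are flattened and merged into one LSN-ordered batch. Plain `u64` and `&str` stand in for `Lsn`, `Key` and `Value`, and, as in `DatadirModification`, each per-key list is assumed to already be LSN-ascending, since `kmerge_by` merges sorted inputs rather than sorting.

```rust
// Sketch only: `u64` stands in for Lsn, `&str` for Key and Value.
use itertools::Itertools;
use std::collections::HashMap;

fn main() {
    let mut pending: HashMap<&str, Vec<(u64, &str)>> = HashMap::new();
    // Each per-key vector is appended in LSN order, mirroring `pending_updates`.
    pending.insert("key_a", vec![(0x10, "a@0x10"), (0x30, "a@0x30")]);
    pending.insert("key_b", vec![(0x20, "b@0x20"), (0x40, "b@0x40")]);

    // Flatten into one iterator per key, then k-way merge on the LSN field.
    let lsn_ordered: Vec<(&str, u64, &str)> = pending
        .into_iter()
        .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (key, lsn, val)))
        .kmerge_by(|lhs, rhs| lhs.1 < rhs.1)
        .collect();

    let lsns: Vec<u64> = lsn_ordered.iter().map(|(_, lsn, _)| *lsn).collect();
    assert_eq!(lsns, vec![0x10, 0x20, 0x30, 0x40]);
}
```

The merge itself never reorders entries within a key, so the LSN ordering of the final batch rests entirely on `pending_updates` accumulating values in ascending LSN order per key.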
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index a4d3a4142a..c646e5cf90 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3890,7 +3890,7 @@ mod tests {
             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
             .await?;

-        let writer = tline.writer().await;
+        let mut writer = tline.writer().await;
         writer
             .put(
                 *TEST_KEY,
@@ -3902,7 +3902,7 @@ mod tests {
         writer.finish_write(Lsn(0x10));
         drop(writer);

-        let writer = tline.writer().await;
+        let mut writer = tline.writer().await;
         writer
             .put(
                 *TEST_KEY,
@@ -3968,7 +3968,7 @@ mod tests {
         let tline = tenant
             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
             .await?;
-        let writer = tline.writer().await;
+        let mut writer = tline.writer().await;

         #[allow(non_snake_case)]
         let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
@@ -4002,7 +4002,7 @@ mod tests {
         let newtline = tenant
             .get_timeline(NEW_TIMELINE_ID, true)
             .expect("Should have a local timeline");
-        let new_writer = newtline.writer().await;
+        let mut new_writer = newtline.writer().await;
         new_writer
             .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
             .await?;
@@ -4034,7 +4034,7 @@ mod tests {
     ) -> anyhow::Result<()> {
         let mut lsn = start_lsn;
         {
-            let writer = tline.writer().await;
+            let mut writer = tline.writer().await;
             // Create a relation on the timeline
             writer
                 .put(
@@ -4059,7 +4059,7 @@ mod tests {
         }
         tline.freeze_and_flush().await?;
         {
-            let writer = tline.writer().await;
+            let mut writer = tline.writer().await;
             writer
                 .put(
                     *TEST_KEY,
@@ -4422,7 +4422,7 @@ mod tests {
            .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
            .await?;

-        let writer = tline.writer().await;
+        let mut writer = tline.writer().await;
         writer
             .put(
                 *TEST_KEY,
@@ -4439,7 +4439,7 @@ mod tests {
             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
             .await?;

-        let writer = tline.writer().await;
+        let mut writer = tline.writer().await;
         writer
             .put(
                 *TEST_KEY,
@@ -4456,7 +4456,7 @@ mod tests {
             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
             .await?;

-        let writer = tline.writer().await;
+        let mut writer = tline.writer().await;
         writer
             .put(
                 *TEST_KEY,
@@ -4473,7 +4473,7 @@ mod tests {
             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
             .await?;

-        let writer = tline.writer().await;
+        let mut writer = tline.writer().await;
         writer
             .put(
                 *TEST_KEY,
@@ -4535,7 +4535,7 @@ mod tests {
         for _ in 0..50 {
             for _ in 0..10000 {
                 test_key.field6 = blknum;
-                let writer = tline.writer().await;
+                let mut writer = tline.writer().await;
                 writer
                     .put(
                         test_key,
@@ -4597,7 +4597,7 @@ mod tests {
         for blknum in 0..NUM_KEYS {
             lsn = Lsn(lsn.0 + 0x10);
             test_key.field6 = blknum as u32;
-            let writer = tline.writer().await;
+            let mut writer = tline.writer().await;
             writer
                 .put(
                     test_key,
@@ -4618,7 +4618,7 @@ mod tests {
             lsn = Lsn(lsn.0 + 0x10);
             let blknum = thread_rng().gen_range(0..NUM_KEYS);
             test_key.field6 = blknum as u32;
-            let writer = tline.writer().await;
+            let mut writer = tline.writer().await;
             writer
                 .put(
                     test_key,
@@ -4686,7 +4686,7 @@ mod tests {
         for blknum in 0..NUM_KEYS {
            lsn = Lsn(lsn.0 + 0x10);
            test_key.field6 = blknum as u32;
-            let writer = tline.writer().await;
+            let mut writer = tline.writer().await;
             writer
                 .put(
                     test_key,
@@ -4715,7 +4715,7 @@ mod tests {
             lsn = Lsn(lsn.0 + 0x10);
             let blknum = thread_rng().gen_range(0..NUM_KEYS);
             test_key.field6 = blknum as u32;
-            let writer = tline.writer().await;
+            let mut writer = tline.writer().await;
             writer
                 .put(
                     test_key,
@@ -4792,7 +4792,7 @@ mod tests {
             lsn = Lsn(lsn.0 + 0x10);
             let blknum = thread_rng().gen_range(0..NUM_KEYS);
             test_key.field6 = blknum as u32;
-            let writer = tline.writer().await;
+            let mut writer = tline.writer().await;
             writer
                 .put(
                     test_key,
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index c597b15533..4b06a787ce 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -246,32 +246,17 @@ impl InMemoryLayer {
     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     /// Adds the page version to the in-memory tree
+
     pub(crate) async fn put_value(
         &self,
         key: Key,
         lsn: Lsn,
-        val: &Value,
+        buf: &[u8],
         ctx: &RequestContext,
     ) -> Result<()> {
         let mut inner = self.inner.write().await;
         self.assert_writable();
-        self.put_value_locked(&mut inner, key, lsn, val, ctx).await
-    }
-
-    pub(crate) async fn put_values(
-        &self,
-        values: &HashMap<Key, Vec<(Lsn, Value)>>,
-        ctx: &RequestContext,
-    ) -> Result<()> {
-        let mut inner = self.inner.write().await;
-        self.assert_writable();
-        for (key, vals) in values {
-            for (lsn, val) in vals {
-                self.put_value_locked(&mut inner, *key, *lsn, val, ctx)
-                    .await?;
-            }
-        }
-        Ok(())
+        self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     }

     async fn put_value_locked(
@@ -279,22 +264,16 @@ impl InMemoryLayer {
         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
         key: Key,
         lsn: Lsn,
-        val: &Value,
+        buf: &[u8],
         ctx: &RequestContext,
     ) -> Result<()> {
         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);

         let off = {
-            // Avoid doing allocations for "small" values.
-            // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
-            // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
-            let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
-            buf.clear();
-            val.ser_into(&mut buf)?;
             locked_inner
                 .file
                 .write_blob(
-                    &buf,
+                    buf,
                     &RequestContextBuilder::extend(ctx)
                         .page_content_kind(PageContentKind::InMemoryLayer)
                         .build(),
@@ -322,7 +301,12 @@ impl InMemoryLayer {
     pub async fn freeze(&self, end_lsn: Lsn) {
         let inner = self.inner.write().await;

-        assert!(self.start_lsn < end_lsn);
+        assert!(
+            self.start_lsn < end_lsn,
+            "{} >= {}",
+            self.start_lsn,
+            end_lsn
+        );
         self.end_lsn.set(end_lsn).expect("end_lsn set only once");

         for vec_map in inner.index.values() {
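The serialization buffer that used to live in `put_value_locked` (removed above) moves to the caller, `TimelineWriter::put`, further down in this patch. As a stand-alone illustration of the buffering strategy only (a byte slice stands in for the real `Value`/`BeSer` serialization), here is a small sketch with `smallvec`:

```rust
// Sketch only: a byte slice stands in for a serialized `Value`.
use smallvec::SmallVec;

fn serialize_into_buf(payload: &[u8]) -> SmallVec<[u8; 256]> {
    let mut buf = SmallVec::<[u8; 256]>::new();
    // In the patch this is `value.ser_into(&mut buf)?` via the `BeSer` trait;
    // here we just copy bytes to keep the example self-contained.
    buf.extend_from_slice(payload);
    buf
}

fn main() {
    let small = serialize_into_buf(&[0u8; 64]);
    let large = serialize_into_buf(&[0u8; 1024]);
    assert!(!small.spilled()); // fits the inline 256-byte array, no heap allocation
    assert!(large.spilled()); // oversized values fall back to the heap
}
```

The inline capacity of 256 bytes matches the limit referenced in the removed comment; only larger serialized values pay for a heap allocation.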
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index ec1dbddfc6..dcb00a1683 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -33,7 +33,7 @@ use tokio::{
 };
 use tokio_util::sync::CancellationToken;
 use tracing::*;
-use utils::sync::gate::Gate;
+use utils::{bin_ser::BeSer, sync::gate::Gate};

 use std::ops::{Deref, Range};
 use std::pin::pin;
@@ -274,7 +274,7 @@ pub struct Timeline {
     /// Locked automatically by [`TimelineWriter`] and checkpointer.
     /// Must always be acquired before the layer map/individual layer lock
     /// to avoid deadlock.
-    write_lock: tokio::sync::Mutex<()>,
+    write_lock: tokio::sync::Mutex<Option<TimelineWriterState>>,

     /// Used to avoid multiple `flush_loop` tasks running
     pub(super) flush_loop_state: Mutex<FlushLoopState>,
@@ -1051,53 +1051,10 @@ impl Timeline {
     pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
         TimelineWriter {
             tl: self,
-            _write_guard: self.write_lock.lock().await,
+            write_guard: self.write_lock.lock().await,
         }
     }

-    /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
-    /// the in-memory layer, and initiate flushing it if so.
-    ///
-    /// Also flush after a period of time without new data -- it helps
-    /// safekeepers to regard pageserver as caught up and suspend activity.
-    pub(crate) async fn check_checkpoint_distance(self: &Arc<Self>) -> anyhow::Result<()> {
-        let last_lsn = self.get_last_record_lsn();
-        let open_layer_size = {
-            let guard = self.layers.read().await;
-            let layers = guard.layer_map();
-            let Some(open_layer) = layers.open_layer.as_ref() else {
-                return Ok(());
-            };
-            open_layer.size().await?
-        };
-        let last_freeze_at = self.last_freeze_at.load();
-        let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
-        let distance = last_lsn.widening_sub(last_freeze_at);
-        // Checkpointing the open layer can be triggered by layer size or LSN range.
-        // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
-        // we want to stay below that with a big margin. The LSN distance determines how
-        // much WAL the safekeepers need to store.
-        if distance >= self.get_checkpoint_distance().into()
-            || open_layer_size > self.get_checkpoint_distance()
-            || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
-        {
-            info!(
-                "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
-                distance,
-                open_layer_size,
-                last_freeze_ts.elapsed()
-            );
-
-            self.freeze_inmem_layer(true).await;
-            self.last_freeze_at.store(last_lsn);
-            *(self.last_freeze_ts.write().unwrap()) = Instant::now();
-
-            // Wake up the layer flusher
-            self.flush_frozen_layers();
-        }
-        Ok(())
-    }
-
     pub(crate) fn activate(
         self: &Arc<Self>,
         broker_client: BrokerClientChannel,
@@ -1529,7 +1486,7 @@ impl Timeline {
             layer_flush_start_tx,
             layer_flush_done_tx,

-            write_lock: tokio::sync::Mutex::new(()),
+            write_lock: tokio::sync::Mutex::new(None),

             gc_info: std::sync::RwLock::new(GcInfo {
                 retain_lsns: Vec::new(),
@@ -2702,43 +2659,6 @@ impl Timeline {
         Ok(layer)
     }

-    async fn put_value(
-        &self,
-        key: Key,
-        lsn: Lsn,
-        val: &Value,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
-        //info!("PUT: key {} at {}", key, lsn);
-        let layer = self.get_layer_for_write(lsn).await?;
-        layer.put_value(key, lsn, val, ctx).await?;
-        Ok(())
-    }
-
-    async fn put_values(
-        &self,
-        values: &HashMap<Key, Vec<(Lsn, Value)>>,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
-        // Pick the first LSN in the batch to get the layer to write to.
-        for lsns in values.values() {
-            if let Some((lsn, _)) = lsns.first() {
-                let layer = self.get_layer_for_write(*lsn).await?;
-                layer.put_values(values, ctx).await?;
-                break;
-            }
-        }
-        Ok(())
-    }
-
-    async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
-        if let Some((_, lsn)) = tombstones.first() {
-            let layer = self.get_layer_for_write(*lsn).await?;
-            layer.put_tombstones(tombstones).await?;
-        }
-        Ok(())
-    }
-
     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
         assert!(new_lsn.is_aligned());
@@ -2749,14 +2669,20 @@ impl Timeline {
     async fn freeze_inmem_layer(&self, write_lock_held: bool) {
         // Freeze the current open in-memory layer. It will be written to disk on next
         // iteration.
+        let _write_guard = if write_lock_held {
+            None
+        } else {
+            Some(self.write_lock.lock().await)
+        };
+
+        self.freeze_inmem_layer_at(self.get_last_record_lsn()).await;
+    }
+
+    async fn freeze_inmem_layer_at(&self, at: Lsn) {
         let mut guard = self.layers.write().await;
         guard
-            .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
+            .try_freeze_in_memory_layer(at, &self.last_freeze_at)
             .await;
     }
@@ -4779,13 +4705,43 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
     PageReconstructError::from(msg)
 }

+struct TimelineWriterState {
+    open_layer: Arc<InMemoryLayer>,
+    current_size: u64,
+    // Previous Lsn which passed through
+    prev_lsn: Option<Lsn>,
+    // Largest Lsn which passed through the current writer
+    max_lsn: Option<Lsn>,
+    // Cached details of the last freeze. Avoids going through the atomic/lock on every put.
+    cached_last_freeze_at: Lsn,
+    cached_last_freeze_ts: Instant,
+}
+
+impl TimelineWriterState {
+    fn new(
+        open_layer: Arc<InMemoryLayer>,
+        current_size: u64,
+        last_freeze_at: Lsn,
+        last_freeze_ts: Instant,
+    ) -> Self {
+        Self {
+            open_layer,
+            current_size,
+            prev_lsn: None,
+            max_lsn: None,
+            cached_last_freeze_at: last_freeze_at,
+            cached_last_freeze_ts: last_freeze_ts,
+        }
+    }
+}
+
 /// Various functions to mutate the timeline.
 // TODO Currently, Deref is used to allow easy access to read methods from this trait.
 // This is probably considered a bad practice in Rust and should be fixed eventually,
 // but will cause large code changes.
 pub(crate) struct TimelineWriter<'a> {
     tl: &'a Timeline,
-    _write_guard: tokio::sync::MutexGuard<'a, ()>,
+    write_guard: tokio::sync::MutexGuard<'a, Option<TimelineWriterState>>,
 }

 impl Deref for TimelineWriter<'_> {
@@ -4796,31 +4752,189 @@ impl Deref for TimelineWriter<'_> {
     }
 }

+impl Drop for TimelineWriter<'_> {
+    fn drop(&mut self) {
+        self.write_guard.take();
+    }
+}
+
+enum OpenLayerAction {
+    Roll,
+    Open,
+    None,
+}
+
 impl<'a> TimelineWriter<'a> {
     /// Put a new page version that can be constructed from a WAL record
     ///
     /// This will implicitly extend the relation, if the page is beyond the
     /// current end-of-file.
     pub(crate) async fn put(
-        &self,
+        &mut self,
         key: Key,
         lsn: Lsn,
         value: &Value,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        self.tl.put_value(key, lsn, value, ctx).await
+        // Avoid doing allocations for "small" values.
+        // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
+        // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
+        let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
+        buf.clear();
+        value.ser_into(&mut buf)?;
+        let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
+
+        let action = self.get_open_layer_action(lsn, buf_size);
+        let layer = self.handle_open_layer_action(lsn, action).await?;
+        let res = layer.put_value(key, lsn, &buf, ctx).await;
+
+        if res.is_ok() {
+            // Update the current size only when the entire write was ok.
+            // In case of failures, we may have had partial writes which
+            // render the size tracking out of sync. That's ok because
+            // the checkpoint distance should be significantly smaller
+            // than the S3 single shot upload limit of 5GiB.
+            let state = self.write_guard.as_mut().unwrap();
+
+            state.current_size += buf_size;
+            state.prev_lsn = Some(lsn);
+            state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn));
+        }
+
+        res
     }

+    async fn handle_open_layer_action(
+        &mut self,
+        at: Lsn,
+        action: OpenLayerAction,
+    ) -> anyhow::Result<&Arc<InMemoryLayer>> {
+        match action {
+            OpenLayerAction::Roll => {
+                let max_lsn = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
+                self.tl.freeze_inmem_layer_at(max_lsn).await;
+
+                let now = Instant::now();
+                *(self.last_freeze_ts.write().unwrap()) = now;
+
+                self.tl.flush_frozen_layers();
+
+                let current_size = self.write_guard.as_ref().unwrap().current_size;
+                if current_size > self.get_checkpoint_distance() {
+                    warn!("Flushed oversized open layer with size {}", current_size)
+                }
+
+                assert!(self.write_guard.is_some());
+
+                let layer = self.tl.get_layer_for_write(at).await?;
+                let initial_size = layer.size().await?;
+                self.write_guard.replace(TimelineWriterState::new(
+                    layer,
+                    initial_size,
+                    Lsn(max_lsn.0 + 1),
+                    now,
+                ));
+            }
+            OpenLayerAction::Open => {
+                assert!(self.write_guard.is_none());
+
+                let layer = self.tl.get_layer_for_write(at).await?;
+                let initial_size = layer.size().await?;
+
+                let last_freeze_at = self.last_freeze_at.load();
+                let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
+                self.write_guard.replace(TimelineWriterState::new(
+                    layer,
+                    initial_size,
+                    last_freeze_at,
+                    last_freeze_ts,
+                ));
+            }
+            OpenLayerAction::None => {
+                assert!(self.write_guard.is_some());
+            }
+        }
+
+        Ok(&self.write_guard.as_ref().unwrap().open_layer)
+    }
+
+    fn get_open_layer_action(&self, lsn: Lsn, new_value_size: u64) -> OpenLayerAction {
+        let state = &*self.write_guard;
+        let Some(state) = &state else {
+            return OpenLayerAction::Open;
+        };
+
+        if state.prev_lsn == Some(lsn) {
+            // Rolling mid LSN is not supported by downstream code.
+            // Hence, only roll at LSN boundaries.
+            return OpenLayerAction::None;
+        }
+
+        let distance = lsn.widening_sub(state.cached_last_freeze_at);
+        let proposed_open_layer_size = state.current_size + new_value_size;
+
+        // Rolling the open layer can be triggered by:
+        // 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
+        //    the safekeepers need to store.
+        // 2. The size of the currently open layer.
+        // 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
+        //    up and suspend activity.
+        if distance >= self.get_checkpoint_distance().into() {
+            info!(
+                "Will roll layer at {} with layer size {} due to LSN distance ({})",
+                lsn, state.current_size, distance
+            );
+
+            OpenLayerAction::Roll
+        } else if state.current_size > 0
+            && proposed_open_layer_size >= self.get_checkpoint_distance()
+        {
+            info!(
+                "Will roll layer at {} with layer size {} due to layer size ({})",
+                lsn, state.current_size, proposed_open_layer_size
+            );
+
+            OpenLayerAction::Roll
+        } else if distance > 0
+            && state.cached_last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()
+        {
+            info!(
+                "Will roll layer at {} with layer size {} due to time since last flush ({:?})",
+                lsn,
+                state.current_size,
+                state.cached_last_freeze_ts.elapsed()
+            );
+
+            OpenLayerAction::Roll
+        } else {
+            OpenLayerAction::None
+        }
+    }
+
+    /// Put a batch of keys at the specified Lsns.
+    ///
+    /// The batch should be sorted by Lsn such that it's safe
+    /// to roll the open layer mid batch.
     pub(crate) async fn put_batch(
-        &self,
-        batch: &HashMap<Key, Vec<(Lsn, Value)>>,
+        &mut self,
+        batch: Vec<(Key, Lsn, Value)>,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        self.tl.put_values(batch, ctx).await
+        for (key, lsn, val) in batch {
+            self.put(key, lsn, &val, ctx).await?
+        }
+
+        Ok(())
     }

-    pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
-        self.tl.put_tombstones(batch).await
+    pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
+        if let Some((_, lsn)) = batch.first() {
+            let action = self.get_open_layer_action(*lsn, 0);
+            let layer = self.handle_open_layer_action(*lsn, action).await?;
+            layer.put_tombstones(batch).await?;
+        }
+
+        Ok(())
     }

     /// Track the end of the latest digested WAL record.
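To summarize the rolling policy implemented by `get_open_layer_action` above, here is a simplified, dependency-free model; the names, the plain `u64` LSNs and the literal thresholds are hypothetical stand-ins for `Lsn` and the tenant configuration:

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Action {
    Roll,
    Open,
    None,
}

struct WriterState {
    current_size: u64,
    prev_lsn: Option<u64>,
    last_freeze_lsn: u64,
    since_last_freeze: Duration,
}

/// Simplified model of the three roll triggers: LSN distance, projected layer
/// size, and time since the last freeze. A repeated LSN never rolls, since the
/// patch notes that rolling mid LSN is not supported downstream.
fn open_layer_action(
    state: Option<&WriterState>,
    lsn: u64,
    new_value_size: u64,
    checkpoint_distance: u64,
    checkpoint_timeout: Duration,
) -> Action {
    let Some(state) = state else {
        return Action::Open;
    };
    if state.prev_lsn == Some(lsn) {
        return Action::None;
    }
    let distance = lsn.saturating_sub(state.last_freeze_lsn);
    let projected_size = state.current_size + new_value_size;
    if distance >= checkpoint_distance
        || (state.current_size > 0 && projected_size >= checkpoint_distance)
        || (distance > 0 && state.since_last_freeze >= checkpoint_timeout)
    {
        Action::Roll
    } else {
        Action::None
    }
}

fn main() {
    let state = WriterState {
        current_size: 9_000,
        prev_lsn: Some(0x40),
        last_freeze_lsn: 0x10,
        since_last_freeze: Duration::from_secs(1),
    };
    // A 2 KiB value would push the layer past a 10 KiB checkpoint distance: roll.
    assert_eq!(
        open_layer_action(Some(&state), 0x50, 2_048, 10_240, Duration::from_secs(600)),
        Action::Roll
    );
    // No state yet: open the first layer.
    assert_eq!(
        open_layer_action(None, 0x50, 0, 10_240, Duration::from_secs(600)),
        Action::Open
    );
}
```

Because a roll can only happen once writes move past the current LSN, an open layer can overshoot the soft size limit, which is why the allowed-errors entry below tolerates the "Flushed oversized open layer" warning.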
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index 9cb53f46d1..0333fcac67 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -343,23 +343,6 @@ pub(super) async fn handle_walreceiver_connection(
                 modification.commit(&ctx).await?;
                 uncommitted_records = 0;
                 filtered_records = 0;
-
-                //
-                // We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise
-                // layer size can become much larger than `checkpoint_distance`.
-                // It can append because wal-sender is sending WAL using 125kb chucks and some WAL records can cause writing large
-                // amount of data to key-value storage. So performing this check only after processing
-                // all WAL records in the chunk, can cause huge L0 layer files.
-                //
-                timeline
-                    .check_checkpoint_distance()
-                    .await
-                    .with_context(|| {
-                        format!(
-                            "Failed to check checkpoint distance for timeline {}",
-                            timeline.timeline_id
-                        )
-                    })?;
             }
         }

@@ -406,16 +389,6 @@ pub(super) async fn handle_walreceiver_connection(
             }
         }

-        timeline
-            .check_checkpoint_distance()
-            .await
-            .with_context(|| {
-                format!(
-                    "Failed to check checkpoint distance for timeline {}",
-                    timeline.timeline_id
-                )
-            })?;
-
         if let Some(last_lsn) = status_update {
             let timeline_remote_consistent_lsn = timeline
                 .get_remote_consistent_lsn_visible()
diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py
index 74c6bddf23..8ff4341cc0 100755
--- a/test_runner/fixtures/pageserver/allowed_errors.py
+++ b/test_runner/fixtures/pageserver/allowed_errors.py
@@ -82,6 +82,11 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
     # During shutdown, DownloadError::Cancelled may be logged as an error. Cleaning this
     # up is tracked in https://github.com/neondatabase/neon/issues/6096
     ".*Cancelled, shutting down.*",
+    # Open layers are only rolled at Lsn boundaries to avoid name clashes.
+    # Hence, we can overshoot the soft limit set by checkpoint distance.
+    # This is especially pronounced in tests that set small checkpoint
+    # distances.
+    ".*Flushed oversized open layer with size.*",
 )
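Stepping back from the individual hunks: the structural core of the timeline.rs change is that the writer's rolling state now lives inside the write lock itself (`Mutex<Option<TimelineWriterState>>`) and is discarded when the writer is dropped. A minimal sketch of that ownership pattern, using hypothetical names and `std::sync` in place of tokio's async mutex:

```rust
use std::sync::{Mutex, MutexGuard};

struct State {
    current_size: u64,
}

// One lock both serializes writers and stores the state that only the
// current writer may touch.
struct Shared {
    write_lock: Mutex<Option<State>>,
}

struct Writer<'a> {
    guard: MutexGuard<'a, Option<State>>,
}

impl Drop for Writer<'_> {
    // Dropping the writer discards its state; the next writer starts fresh
    // (mirroring `TimelineWriter::drop` taking the Option out of the guard).
    fn drop(&mut self) {
        self.guard.take();
    }
}

impl Shared {
    fn writer(&self) -> Writer<'_> {
        Writer {
            guard: self.write_lock.lock().unwrap(),
        }
    }
}

impl Writer<'_> {
    fn put(&mut self, len: u64) {
        // Lazily initialize state on first use, like OpenLayerAction::Open.
        let state = self.guard.get_or_insert(State { current_size: 0 });
        state.current_size += len;
    }
}

fn main() {
    let shared = Shared {
        write_lock: Mutex::new(None),
    };
    {
        let mut w = shared.writer();
        w.put(100);
        w.put(28);
        assert_eq!(w.guard.as_ref().unwrap().current_size, 128);
    } // writer dropped here: state is cleared
    assert!(shared.write_lock.lock().unwrap().is_none());
}
```

Keeping the state under the same mutex that serializes writers means a new `TimelineWriter` always starts from a clean slate and re-opens (and re-measures) the in-memory layer on first use.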