From 06428e856e6ed37b259d63d8985fec2a647db269 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 5 Aug 2024 14:30:00 +0000 Subject: [PATCH] Remove unused singular puts --- pageserver/src/pgdatadir_mapping.rs | 4 +- pageserver/src/tenant/ephemeral_file.rs | 1 + .../tenant/storage_layer/inmemory_layer.rs | 53 +------------------ pageserver/src/tenant/timeline.rs | 52 +++++------------- 4 files changed, 18 insertions(+), 92 deletions(-) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 5e3fa7ecb2..e329c9e864 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1791,11 +1791,12 @@ impl<'a> DatadirModification<'a> { // Flush relation and SLRU data blocks, keep metadata. let mut retained_pending_updates = HashMap::<_, Vec<_>>::new(); for (key, values) in self.pending_updates.drain() { + let mut write_batch = Vec::new(); for (lsn, value) in values { if key.is_rel_block_key() || key.is_slru_block_key() { // This bails out on first error without modifying pending_updates. // That's Ok, cf this function's doc comment. - writer.put(key, lsn, &value, ctx).await?; + write_batch.push((key, lsn, value)); } else { retained_pending_updates .entry(key) @@ -1803,6 +1804,7 @@ impl<'a> DatadirModification<'a> { .push((lsn, value)); } } + writer.put_batch(write_batch, ctx).await?; } self.pending_updates = retained_pending_updates; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index a727c148e0..20c8493d7d 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -79,6 +79,7 @@ impl EphemeralFile { self.rw.read_blk(blknum, ctx).await } + #[cfg(test)] pub(crate) async fn write_blob( &mut self, srcbuf: &[u8], diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index a600ed1251..5d2213c260 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -32,7 +32,7 @@ use std::fmt::Write; use std::ops::Range; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::atomic::{AtomicU64, AtomicUsize}; -use tokio::sync::{RwLock, RwLockWriteGuard}; +use tokio::sync::RwLock; use super::{ DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValueReconstructState, @@ -479,22 +479,8 @@ impl InMemoryLayer { }) } - // Write operations - - /// Common subroutine of the public put_wal_record() and put_page_image() functions. - /// Adds the page version to the in-memory tree - pub async fn put_value( - &self, - key: Key, - lsn: Lsn, - buf: &[u8], - ctx: &RequestContext, - ) -> Result<()> { - let mut inner = self.inner.write().await; - self.assert_writable(); - self.put_value_locked(&mut inner, key, lsn, buf, ctx).await - } + // Write path. pub(crate) async fn put_batch( &self, buf: Vec, @@ -532,41 +518,6 @@ impl InMemoryLayer { Ok(()) } - async fn put_value_locked( - &self, - locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>, - key: Key, - lsn: Lsn, - buf: &[u8], - ctx: &RequestContext, - ) -> Result<()> { - trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); - - let off = { - locked_inner - .file - .write_blob( - buf, - &RequestContextBuilder::extend(ctx) - .page_content_kind(PageContentKind::InMemoryLayer) - .build(), - ) - .await? - }; - - let vec_map = locked_inner.index.entry(key).or_default(); - let old = vec_map.append_or_update_last(lsn, off).unwrap().0; - if old.is_some() { - // We already had an entry for this LSN. That's odd.. - warn!("Key {} at {} already exists", key, lsn); - } - - let size = locked_inner.file.len(); - locked_inner.resource_units.maybe_publish_size(size); - - Ok(()) - } - pub(crate) fn get_opened_at(&self) -> Instant { self.opened_at } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 9c81154d9b..bbf320414f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5931,44 +5931,6 @@ enum OpenLayerAction { } impl<'a> TimelineWriter<'a> { - /// Put a new page version that can be constructed from a WAL record - /// - /// This will implicitly extend the relation, if the page is beyond the - /// current end-of-file. - pub(crate) async fn put( - &mut self, - key: Key, - lsn: Lsn, - value: &Value, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - // Avoid doing allocations for "small" values. - // In the regression test suite, the limit of 256 avoided allocations in 95% of cases: - // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061 - let mut buf = smallvec::SmallVec::<[u8; 256]>::new(); - value.ser_into(&mut buf)?; - let buf_size: u64 = buf.len().try_into().expect("oversized value buf"); - - let action = self.get_open_layer_action(lsn, buf_size); - let layer = self.handle_open_layer_action(lsn, action, ctx).await?; - let res = layer.put_value(key, lsn, &buf, ctx).await; - - if res.is_ok() { - // Update the current size only when the entire write was ok. - // In case of failures, we may have had partial writes which - // render the size tracking out of sync. That's ok because - // the checkpoint distance should be significantly smaller - // than the S3 single shot upload limit of 5GiB. - let state = self.write_guard.as_mut().unwrap(); - - state.current_size += buf_size; - state.prev_lsn = Some(lsn); - state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn)); - } - - res - } - async fn handle_open_layer_action( &mut self, at: Lsn, @@ -6071,8 +6033,6 @@ impl<'a> TimelineWriter<'a> { } /// Put a batch of keys at the specified Lsns. - /// - /// The batch is sorted by Lsn (enforced by usage of [`utils::vec_map::VecMap`]. pub(crate) async fn put_batch( &mut self, batch: Vec<(Key, Lsn, Value)>, @@ -6137,6 +6097,18 @@ impl<'a> TimelineWriter<'a> { res } + #[cfg(test)] + /// Test helper, for tests that would like to poke individual values without composing a batch + pub(crate) async fn put( + &mut self, + key: Key, + lsn: Lsn, + value: &Value, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + self.put_batch(vec![(key, lsn, value.clone())], ctx).await + } + pub(crate) async fn delete_batch( &mut self, batch: &[(Range, Lsn)],