From 7c74112b2a6e23c07bfd9cc62c240cd6bbdd3bd9 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 22 Aug 2024 11:04:42 +0100
Subject: [PATCH] pageserver: batch InMemoryLayer `put`s, remove need to sort items by LSN during ingest (#8591)

## Problem/Solution

TimelineWriter::put_batch is simply a loop over individual puts. Each put acquires and releases locks, and checks whether it needs to start a new layer. Batching these is more efficient, but more importantly it unlocks future changes where we can pre-build serialized buffers much earlier in the ingest process, potentially even on the safekeeper (imagine a future model where some variant of DatadirModification lives on the safekeeper).

Ensuring that the values in put_batch are written to one layer also enables a simplification upstream, where we no longer need to write values in LSN order. This saves us a sort, but also simplifies follow-on refactors to DatadirModification: we can store metadata keys and data keys separately at that level without needing to zip them together in LSN order later.

A short sketch of how a caller drives the new batched write path is included after the diff below.

## Why?

In this PR, these changes are simply optimizations, but they are motivated by evolving the ingest path in the direction of disentangling DatadirModification from Timeline. It may not be obvious how right now, but the general idea is that we'll end up with three phases of ingest:

- A) Decode WAL records and build a DatadirModification with all the simple data contents already in a big serialized buffer ready to write to an ephemeral layer **<-- this part can be pipelined and parallelized, and done on a safekeeper!**
- B) Let that DatadirModification see a Timeline, so that it can also generate all the metadata updates that require a read-modify-write of existing pages
- C) Dump the results of B into an ephemeral layer.

Related: https://github.com/neondatabase/neon/issues/8452

## Caveats

Building one big monolithic buffer of values to write to disk is ordinarily an anti-pattern: we prefer nice streaming I/O. However:

- In future, when we do this first decode stage on the safekeeper, it would be inefficient to serialize a Vec of Value, and then later deserialize it just to add blob size headers while writing into the ephemeral layer format. The idea is that for bulk write data, we will serialize exactly once.
- The monolithic buffer is a stepping stone to pipelining more of this: by serializing earlier (rather than at the final put_value), we will be able to parallelize the WAL decoding and bulk serialization of data page writes.
- The ephemeral layer's buffered writer already stalls writes while it waits to flush: so while, yes, we'll stall for a couple of milliseconds to write a couple of megabytes, we already have stalls like this, just distributed across smaller writes.

## Benchmarks

This PR is primarily a stepping stone to safekeeper ingest filtering, but it also provides a modest efficiency improvement to the `wal_recovery` part of `test_bulk_ingest`.

test_bulk_ingest:
```
test_bulk_insert[neon-release-pg16].insert: 23.659 s
test_bulk_insert[neon-release-pg16].pageserver_writes: 5,428 MB
test_bulk_insert[neon-release-pg16].peak_mem: 626 MB
test_bulk_insert[neon-release-pg16].size: 0 MB
test_bulk_insert[neon-release-pg16].data_uploaded: 1,922 MB
test_bulk_insert[neon-release-pg16].num_files_uploaded: 8
test_bulk_insert[neon-release-pg16].wal_written: 1,382 MB
test_bulk_insert[neon-release-pg16].wal_recovery: 18.981 s
test_bulk_insert[neon-release-pg16].compaction: 0.055 s

vs.
tip of main: test_bulk_insert[neon-release-pg16].insert: 24.001 s test_bulk_insert[neon-release-pg16].pageserver_writes: 5,428 MB test_bulk_insert[neon-release-pg16].peak_mem: 604 MB test_bulk_insert[neon-release-pg16].size: 0 MB test_bulk_insert[neon-release-pg16].data_uploaded: 1,922 MB test_bulk_insert[neon-release-pg16].num_files_uploaded: 8 test_bulk_insert[neon-release-pg16].wal_written: 1,382 MB test_bulk_insert[neon-release-pg16].wal_recovery: 23.586 s test_bulk_insert[neon-release-pg16].compaction: 0.054 s ``` --- pageserver/benches/bench_ingest.rs | 19 ++- pageserver/src/pgdatadir_mapping.rs | 70 +++++++--- pageserver/src/tenant/ephemeral_file.rs | 35 +++-- pageserver/src/tenant/storage_layer.rs | 2 +- .../tenant/storage_layer/inmemory_layer.rs | 131 +++++++++++++----- pageserver/src/tenant/timeline.rs | 101 +++++++------- .../walreceiver/walreceiver_connection.rs | 9 +- 7 files changed, 247 insertions(+), 120 deletions(-) diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index 0336302de0..bd99f5289d 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -10,6 +10,7 @@ use pageserver::{ page_cache, repository::Value, task_mgr::TaskKind, + tenant::storage_layer::inmemory_layer::SerializedBatch, tenant::storage_layer::InMemoryLayer, virtual_file, }; @@ -67,12 +68,16 @@ async fn ingest( let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?; - let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?; + let data = Value::Image(Bytes::from(vec![0u8; put_size])); + let data_ser_size = data.serialized_size().unwrap() as usize; let ctx = RequestContext::new( pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler, pageserver::context::DownloadBehavior::Download, ); + const BATCH_SIZE: usize = 16; + let mut batch = Vec::new(); + for i in 0..put_count { lsn += put_size as u64; @@ -95,7 +100,17 @@ async fn ingest( } } - layer.put_value(key.to_compact(), lsn, &data, &ctx).await?; + batch.push((key.to_compact(), lsn, data_ser_size, data.clone())); + if batch.len() >= BATCH_SIZE { + let this_batch = std::mem::take(&mut batch); + let serialized = SerializedBatch::from_values(this_batch); + layer.put_batch(serialized, &ctx).await?; + } + } + if !batch.is_empty() { + let this_batch = std::mem::take(&mut batch); + let serialized = SerializedBatch::from_values(this_batch); + layer.put_batch(serialized, &ctx).await?; } layer.freeze(lsn + 1).await; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 4f7eb1a00c..d6e0b82e1d 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -15,12 +15,11 @@ use crate::{aux_file, repository::*}; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; -use itertools::Itertools; use pageserver_api::key::{ dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range, - AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, + CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, }; use pageserver_api::keyspace::SparseKeySpace; use pageserver_api::models::AuxFilePolicy; @@ -37,7 +36,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, trace, warn}; use 
utils::bin_ser::DeserializeError; use utils::pausable_failpoint; -use utils::vec_map::{VecMap, VecMapOrdering}; use utils::{bin_ser::BeSer, lsn::Lsn}; /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached. @@ -174,6 +172,7 @@ impl Timeline { pending_deletions: Vec::new(), pending_nblocks: 0, pending_directory_entries: Vec::new(), + pending_bytes: 0, lsn, } } @@ -1022,21 +1021,33 @@ pub struct DatadirModification<'a> { // The put-functions add the modifications here, and they are flushed to the // underlying key-value store by the 'finish' function. pending_lsns: Vec, - pending_updates: HashMap>, + pending_updates: HashMap>, pending_deletions: Vec<(Range, Lsn)>, pending_nblocks: i64, /// For special "directory" keys that store key-value maps, track the size of the map /// if it was updated in this modification. pending_directory_entries: Vec<(DirectoryKind, usize)>, + + /// An **approximation** of how large our EphemeralFile write will be when committed. + pending_bytes: usize, } impl<'a> DatadirModification<'a> { + // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can + // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we + // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed. + pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024; + /// Get the current lsn pub(crate) fn get_lsn(&self) -> Lsn { self.lsn } + pub(crate) fn approx_pending_bytes(&self) -> usize { + self.pending_bytes + } + /// Set the current lsn pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> { ensure!( @@ -1769,21 +1780,25 @@ impl<'a> DatadirModification<'a> { // Flush relation and SLRU data blocks, keep metadata. let mut retained_pending_updates = HashMap::<_, Vec<_>>::new(); for (key, values) in self.pending_updates.drain() { - for (lsn, value) in values { + let mut write_batch = Vec::new(); + for (lsn, value_ser_size, value) in values { if key.is_rel_block_key() || key.is_slru_block_key() { // This bails out on first error without modifying pending_updates. // That's Ok, cf this function's doc comment. - writer.put(key, lsn, &value, ctx).await?; + write_batch.push((key.to_compact(), lsn, value_ser_size, value)); } else { - retained_pending_updates - .entry(key) - .or_default() - .push((lsn, value)); + retained_pending_updates.entry(key).or_default().push(( + lsn, + value_ser_size, + value, + )); } } + writer.put_batch(write_batch, ctx).await?; } self.pending_updates = retained_pending_updates; + self.pending_bytes = 0; if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); @@ -1809,17 +1824,20 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; if !self.pending_updates.is_empty() { - // The put_batch call below expects expects the inputs to be sorted by Lsn, - // so we do that first. - let lsn_ordered_batch: VecMap = VecMap::from_iter( - self.pending_updates - .drain() - .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val)))) - .kmerge_by(|lhs, rhs| lhs.0 < rhs.0), - VecMapOrdering::GreaterOrEqual, - ); + // Ordering: the items in this batch do not need to be in any global order, but values for + // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on + // this to do efficient updates to its index. 
+ let batch: Vec<(CompactKey, Lsn, usize, Value)> = self + .pending_updates + .drain() + .flat_map(|(key, values)| { + values.into_iter().map(move |(lsn, val_ser_size, value)| { + (key.to_compact(), lsn, val_ser_size, value) + }) + }) + .collect::>(); - writer.put_batch(lsn_ordered_batch, ctx).await?; + writer.put_batch(batch, ctx).await?; } if !self.pending_deletions.is_empty() { @@ -1844,6 +1862,8 @@ impl<'a> DatadirModification<'a> { writer.update_directory_entries_count(kind, count as u64); } + self.pending_bytes = 0; + Ok(()) } @@ -1860,7 +1880,7 @@ impl<'a> DatadirModification<'a> { // Note: we don't check pending_deletions. It is an error to request a // value that has been removed, deletion only avoids leaking storage. if let Some(values) = self.pending_updates.get(&key) { - if let Some((_, value)) = values.last() { + if let Some((_, _, value)) = values.last() { return if let Value::Image(img) = value { Ok(img.clone()) } else { @@ -1888,13 +1908,17 @@ impl<'a> DatadirModification<'a> { fn put(&mut self, key: Key, val: Value) { let values = self.pending_updates.entry(key).or_default(); // Replace the previous value if it exists at the same lsn - if let Some((last_lsn, last_value)) = values.last_mut() { + if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() { if *last_lsn == self.lsn { + *last_value_ser_size = val.serialized_size().unwrap() as usize; *last_value = val; return; } } - values.push((self.lsn, val)); + + let val_serialized_size = val.serialized_size().unwrap() as usize; + self.pending_bytes += val_serialized_size; + values.push((self.lsn, val_serialized_size, val)); } fn delete(&mut self, key_range: Range) { diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 3eb8384d05..44f0fc7ab1 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -79,6 +79,8 @@ impl EphemeralFile { self.rw.read_blk(blknum, ctx).await } + #[cfg(test)] + // This is a test helper: outside of tests, we are always written to via a pre-serialized batch. pub(crate) async fn write_blob( &mut self, srcbuf: &[u8], @@ -86,17 +88,30 @@ impl EphemeralFile { ) -> Result { let pos = self.rw.bytes_written(); - // Write the length field - if srcbuf.len() < 0x80 { - // short one-byte length header - let len_buf = [srcbuf.len() as u8]; + let mut len_bytes = std::io::Cursor::new(Vec::new()); + crate::tenant::storage_layer::inmemory_layer::SerializedBatch::write_blob_length( + srcbuf.len(), + &mut len_bytes, + ); + let len_bytes = len_bytes.into_inner(); - self.rw.write_all_borrowed(&len_buf, ctx).await?; - } else { - let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32); - len_buf[0] |= 0x80; - self.rw.write_all_borrowed(&len_buf, ctx).await?; - } + // Write the length field + self.rw.write_all_borrowed(&len_bytes, ctx).await?; + + // Write the payload + self.rw.write_all_borrowed(srcbuf, ctx).await?; + + Ok(pos) + } + + /// Returns the offset at which the first byte of the input was written, for use + /// in constructing indices over the written value. 
+ pub(crate) async fn write_raw( + &mut self, + srcbuf: &[u8], + ctx: &RequestContext, + ) -> Result { + let pos = self.rw.bytes_written(); // Write the payload self.rw.write_all_borrowed(srcbuf, ctx).await?; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 04f89db401..133b34b8b5 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -2,7 +2,7 @@ pub mod delta_layer; pub mod image_layer; -pub(crate) mod inmemory_layer; +pub mod inmemory_layer; pub(crate) mod layer; mod layer_desc; mod layer_name; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 130d1002a0..a71b4dd83b 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -33,7 +33,7 @@ use std::fmt::Write; use std::ops::Range; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::atomic::{AtomicU64, AtomicUsize}; -use tokio::sync::{RwLock, RwLockWriteGuard}; +use tokio::sync::RwLock; use super::{ DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState, @@ -320,6 +320,82 @@ impl InMemoryLayer { } } +/// Offset of a particular Value within a serialized batch. +struct SerializedBatchOffset { + key: CompactKey, + lsn: Lsn, + /// offset in bytes from the start of the batch's buffer to the Value's serialized size header. + offset: u64, +} + +pub struct SerializedBatch { + /// Blobs serialized in EphemeralFile's native format, ready for passing to [`EphemeralFile::write_raw`]. + pub(crate) raw: Vec, + + /// Index of values in [`Self::raw`], using offsets relative to the start of the buffer. + offsets: Vec, + + /// The highest LSN of any value in the batch + pub(crate) max_lsn: Lsn, +} + +impl SerializedBatch { + /// Write a blob length in the internal format of the EphemeralFile + pub(crate) fn write_blob_length(len: usize, cursor: &mut std::io::Cursor>) { + use std::io::Write; + + if len < 0x80 { + // short one-byte length header + let len_buf = [len as u8]; + + cursor + .write_all(&len_buf) + .expect("Writing to Vec is infallible"); + } else { + let mut len_buf = u32::to_be_bytes(len as u32); + len_buf[0] |= 0x80; + cursor + .write_all(&len_buf) + .expect("Writing to Vec is infallible"); + } + } + + pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self { + // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by + // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`] + let buffer_size = batch.iter().map(|i| i.2).sum::() + 4 * batch.len(); + let mut cursor = std::io::Cursor::new(Vec::::with_capacity(buffer_size)); + + let mut offsets: Vec = Vec::with_capacity(batch.len()); + let mut max_lsn: Lsn = Lsn(0); + for (key, lsn, val_ser_size, val) in batch { + let relative_off = cursor.position(); + + Self::write_blob_length(val_ser_size, &mut cursor); + val.ser_into(&mut cursor) + .expect("Writing into in-memory buffer is infallible"); + + offsets.push(SerializedBatchOffset { + key, + lsn, + offset: relative_off, + }); + max_lsn = std::cmp::max(max_lsn, lsn); + } + + let buffer = cursor.into_inner(); + + // Assert that we didn't do any extra allocations while building buffer. 
+ debug_assert!(buffer.len() <= buffer_size); + + Self { + raw: buffer, + offsets, + max_lsn, + } + } +} + fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result { write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0) } @@ -380,37 +456,20 @@ impl InMemoryLayer { }) } - // Write operations - - /// Common subroutine of the public put_wal_record() and put_page_image() functions. - /// Adds the page version to the in-memory tree - pub async fn put_value( + // Write path. + pub async fn put_batch( &self, - key: CompactKey, - lsn: Lsn, - buf: &[u8], + serialized_batch: SerializedBatch, ctx: &RequestContext, ) -> Result<()> { let mut inner = self.inner.write().await; self.assert_writable(); - self.put_value_locked(&mut inner, key, lsn, buf, ctx).await - } - async fn put_value_locked( - &self, - locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>, - key: CompactKey, - lsn: Lsn, - buf: &[u8], - ctx: &RequestContext, - ) -> Result<()> { - trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); - - let off = { - locked_inner + let base_off = { + inner .file - .write_blob( - buf, + .write_raw( + &serialized_batch.raw, &RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::InMemoryLayer) .build(), @@ -418,15 +477,23 @@ impl InMemoryLayer { .await? }; - let vec_map = locked_inner.index.entry(key).or_default(); - let old = vec_map.append_or_update_last(lsn, off).unwrap().0; - if old.is_some() { - // We already had an entry for this LSN. That's odd.. - warn!("Key {} at {} already exists", key, lsn); + for SerializedBatchOffset { + key, + lsn, + offset: relative_off, + } in serialized_batch.offsets + { + let off = base_off + relative_off; + let vec_map = inner.index.entry(key).or_default(); + let old = vec_map.append_or_update_last(lsn, off).unwrap().0; + if old.is_some() { + // We already had an entry for this LSN. That's odd.. 
+ warn!("Key {} at {} already exists", key, lsn); + } } - let size = locked_inner.file.len(); - locked_inner.resource_units.maybe_publish_size(size); + let size = inner.file.len(); + inner.resource_units.maybe_publish_size(size); Ok(()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 80e3843021..e90f65942f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -22,8 +22,8 @@ use handle::ShardTimelineId; use once_cell::sync::Lazy; use pageserver_api::{ key::{ - KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE, - NON_INHERITED_SPARSE_RANGE, + CompactKey, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, + NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE, }, keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning}, models::{ @@ -44,10 +44,8 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tracing::*; use utils::{ - bin_ser::BeSer, fs_ext, pausable_failpoint, sync::gate::{Gate, GateGuard}, - vec_map::VecMap, }; use std::pin::pin; @@ -137,7 +135,10 @@ use self::layer_manager::LayerManager; use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; -use super::{config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized}; +use super::{ + config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint, + upload_queue::NotInitialized, +}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe}; use super::{ @@ -5574,44 +5575,6 @@ enum OpenLayerAction { } impl<'a> TimelineWriter<'a> { - /// Put a new page version that can be constructed from a WAL record - /// - /// This will implicitly extend the relation, if the page is beyond the - /// current end-of-file. - pub(crate) async fn put( - &mut self, - key: Key, - lsn: Lsn, - value: &Value, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - // Avoid doing allocations for "small" values. - // In the regression test suite, the limit of 256 avoided allocations in 95% of cases: - // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061 - let mut buf = smallvec::SmallVec::<[u8; 256]>::new(); - value.ser_into(&mut buf)?; - let buf_size: u64 = buf.len().try_into().expect("oversized value buf"); - - let action = self.get_open_layer_action(lsn, buf_size); - let layer = self.handle_open_layer_action(lsn, action, ctx).await?; - let res = layer.put_value(key.to_compact(), lsn, &buf, ctx).await; - - if res.is_ok() { - // Update the current size only when the entire write was ok. - // In case of failures, we may have had partial writes which - // render the size tracking out of sync. That's ok because - // the checkpoint distance should be significantly smaller - // than the S3 single shot upload limit of 5GiB. - let state = self.write_guard.as_mut().unwrap(); - - state.current_size += buf_size; - state.prev_lsn = Some(lsn); - state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn)); - } - - res - } - async fn handle_open_layer_action( &mut self, at: Lsn, @@ -5717,18 +5680,58 @@ impl<'a> TimelineWriter<'a> { } /// Put a batch of keys at the specified Lsns. - /// - /// The batch is sorted by Lsn (enforced by usage of [`utils::vec_map::VecMap`]. 
pub(crate) async fn put_batch( &mut self, - batch: VecMap, + batch: Vec<(CompactKey, Lsn, usize, Value)>, ctx: &RequestContext, ) -> anyhow::Result<()> { - for (lsn, (key, val)) in batch { - self.put(key, lsn, &val, ctx).await? + if batch.is_empty() { + return Ok(()); } - Ok(()) + let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch); + let batch_max_lsn = serialized_batch.max_lsn; + let buf_size: u64 = serialized_batch.raw.len() as u64; + + let action = self.get_open_layer_action(batch_max_lsn, buf_size); + let layer = self + .handle_open_layer_action(batch_max_lsn, action, ctx) + .await?; + + let res = layer.put_batch(serialized_batch, ctx).await; + + if res.is_ok() { + // Update the current size only when the entire write was ok. + // In case of failures, we may have had partial writes which + // render the size tracking out of sync. That's ok because + // the checkpoint distance should be significantly smaller + // than the S3 single shot upload limit of 5GiB. + let state = self.write_guard.as_mut().unwrap(); + + state.current_size += buf_size; + state.prev_lsn = Some(batch_max_lsn); + state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn)); + } + + res + } + + #[cfg(test)] + /// Test helper, for tests that would like to poke individual values without composing a batch + pub(crate) async fn put( + &mut self, + key: Key, + lsn: Lsn, + value: &Value, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + use utils::bin_ser::BeSer; + let val_ser_size = value.serialized_size().unwrap() as usize; + self.put_batch( + vec![(key.to_compact(), lsn, val_ser_size, value.clone())], + ctx, + ) + .await } pub(crate) async fn delete_batch( diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index b5c577af72..0114473eda 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -27,8 +27,8 @@ use super::TaskStateUpdate; use crate::{ context::RequestContext, metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST}, - task_mgr::TaskKind, - task_mgr::WALRECEIVER_RUNTIME, + pgdatadir_mapping::DatadirModification, + task_mgr::{TaskKind, WALRECEIVER_RUNTIME}, tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo}, walingest::WalIngest, walrecord::DecodedWALRecord, @@ -345,7 +345,10 @@ pub(super) async fn handle_walreceiver_connection( // Commit every ingest_batch_size records. Even if we filtered out // all records, we still need to call commit to advance the LSN. uncommitted_records += 1; - if uncommitted_records >= ingest_batch_size { + if uncommitted_records >= ingest_batch_size + || modification.approx_pending_bytes() + > DatadirModification::MAX_PENDING_BYTES + { WAL_INGEST .records_committed .inc_by(uncommitted_records - filtered_records);
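
For illustration, here is a minimal sketch (not part of the patch) of how a caller drives the new batched write path, modelled on the `bench_ingest.rs` changes above: values are accumulated as `(CompactKey, Lsn, serialized_size, Value)` tuples, serialized once via `SerializedBatch::from_values`, and written with a single `put_batch` call. The `ingest_batched` helper and its signature are invented for this example.

```rust
use anyhow::Result;
use pageserver::context::RequestContext;
use pageserver::repository::Value;
use pageserver::tenant::storage_layer::inmemory_layer::SerializedBatch;
use pageserver::tenant::storage_layer::InMemoryLayer;
use pageserver_api::key::CompactKey;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;

/// Illustrative helper (not in the patch): accumulate values and flush them to the
/// open in-memory layer in fixed-size batches, mirroring the updated bench_ingest.rs.
async fn ingest_batched(
    layer: &InMemoryLayer,
    values: impl IntoIterator<Item = (CompactKey, Lsn, Value)>,
    ctx: &RequestContext,
) -> Result<()> {
    const BATCH_SIZE: usize = 16;
    let mut batch: Vec<(CompactKey, Lsn, usize, Value)> = Vec::new();

    for (key, lsn, value) in values {
        // Each entry carries its serialized size up front so that
        // SerializedBatch::from_values can pre-allocate one flat buffer.
        let ser_size = value.serialized_size().unwrap() as usize;
        batch.push((key, lsn, ser_size, value));

        if batch.len() >= BATCH_SIZE {
            let this_batch = std::mem::take(&mut batch);
            // One serialization pass and one write into the ephemeral file,
            // instead of per-value lock acquisition and blob writes.
            layer
                .put_batch(SerializedBatch::from_values(this_batch), ctx)
                .await?;
        }
    }

    // Flush the tail of the batch, if any.
    if !batch.is_empty() {
        layer
            .put_batch(SerializedBatch::from_values(batch), ctx)
            .await?;
    }

    Ok(())
}
```

Inside the pageserver, `TimelineWriter::put_batch` wraps the same pattern: `DatadirModification::commit` hands it the whole pending batch of `(CompactKey, Lsn, usize, Value)` tuples, and the writer serializes them once and appends the result to a single in-memory layer.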