diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index f2e075ba1d..bd99f5289d 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -10,6 +10,7 @@ use pageserver::{ page_cache, repository::Value, task_mgr::TaskKind, + tenant::storage_layer::inmemory_layer::SerializedBatch, tenant::storage_layer::InMemoryLayer, virtual_file, }; @@ -67,13 +68,16 @@ async fn ingest( let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?; - let value = Value::Image(Bytes::from(vec![0u8; put_size])); - let data = value.ser()?; + let data = Value::Image(Bytes::from(vec![0u8; put_size])); + let data_ser_size = data.serialized_size().unwrap() as usize; let ctx = RequestContext::new( pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler, pageserver::context::DownloadBehavior::Download, ); + const BATCH_SIZE: usize = 16; + let mut batch = Vec::new(); + for i in 0..put_count { lsn += put_size as u64; @@ -96,9 +100,17 @@ async fn ingest( } } - layer - .put_value(key.to_compact(), lsn, &data, value.will_init(), &ctx) - .await?; + batch.push((key.to_compact(), lsn, data_ser_size, data.clone())); + if batch.len() >= BATCH_SIZE { + let this_batch = std::mem::take(&mut batch); + let serialized = SerializedBatch::from_values(this_batch); + layer.put_batch(serialized, &ctx).await?; + } + } + if !batch.is_empty() { + let this_batch = std::mem::take(&mut batch); + let serialized = SerializedBatch::from_values(this_batch); + layer.put_batch(serialized, &ctx).await?; } layer.freeze(lsn + 1).await; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 4f7eb1a00c..d6e0b82e1d 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -15,12 +15,11 @@ use crate::{aux_file, repository::*}; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; -use itertools::Itertools; use pageserver_api::key::{ dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range, - AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, + CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, }; use pageserver_api::keyspace::SparseKeySpace; use pageserver_api::models::AuxFilePolicy; @@ -37,7 +36,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, trace, warn}; use utils::bin_ser::DeserializeError; use utils::pausable_failpoint; -use utils::vec_map::{VecMap, VecMapOrdering}; use utils::{bin_ser::BeSer, lsn::Lsn}; /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached. @@ -174,6 +172,7 @@ impl Timeline { pending_deletions: Vec::new(), pending_nblocks: 0, pending_directory_entries: Vec::new(), + pending_bytes: 0, lsn, } } @@ -1022,21 +1021,33 @@ pub struct DatadirModification<'a> { // The put-functions add the modifications here, and they are flushed to the // underlying key-value store by the 'finish' function. pending_lsns: Vec, - pending_updates: HashMap>, + pending_updates: HashMap>, pending_deletions: Vec<(Range, Lsn)>, pending_nblocks: i64, /// For special "directory" keys that store key-value maps, track the size of the map /// if it was updated in this modification. 
pending_directory_entries: Vec<(DirectoryKind, usize)>, + + /// An **approximation** of how large our EphemeralFile write will be when committed. + pending_bytes: usize, } impl<'a> DatadirModification<'a> { + // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can + // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we + // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed. + pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024; + /// Get the current lsn pub(crate) fn get_lsn(&self) -> Lsn { self.lsn } + pub(crate) fn approx_pending_bytes(&self) -> usize { + self.pending_bytes + } + /// Set the current lsn pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> { ensure!( @@ -1769,21 +1780,25 @@ impl<'a> DatadirModification<'a> { // Flush relation and SLRU data blocks, keep metadata. let mut retained_pending_updates = HashMap::<_, Vec<_>>::new(); for (key, values) in self.pending_updates.drain() { - for (lsn, value) in values { + let mut write_batch = Vec::new(); + for (lsn, value_ser_size, value) in values { if key.is_rel_block_key() || key.is_slru_block_key() { // This bails out on first error without modifying pending_updates. // That's Ok, cf this function's doc comment. - writer.put(key, lsn, &value, ctx).await?; + write_batch.push((key.to_compact(), lsn, value_ser_size, value)); } else { - retained_pending_updates - .entry(key) - .or_default() - .push((lsn, value)); + retained_pending_updates.entry(key).or_default().push(( + lsn, + value_ser_size, + value, + )); } } + writer.put_batch(write_batch, ctx).await?; } self.pending_updates = retained_pending_updates; + self.pending_bytes = 0; if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); @@ -1809,17 +1824,20 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; if !self.pending_updates.is_empty() { - // The put_batch call below expects expects the inputs to be sorted by Lsn, - // so we do that first. - let lsn_ordered_batch: VecMap = VecMap::from_iter( - self.pending_updates - .drain() - .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val)))) - .kmerge_by(|lhs, rhs| lhs.0 < rhs.0), - VecMapOrdering::GreaterOrEqual, - ); + // Ordering: the items in this batch do not need to be in any global order, but values for + // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on + // this to do efficient updates to its index. + let batch: Vec<(CompactKey, Lsn, usize, Value)> = self + .pending_updates + .drain() + .flat_map(|(key, values)| { + values.into_iter().map(move |(lsn, val_ser_size, value)| { + (key.to_compact(), lsn, val_ser_size, value) + }) + }) + .collect::>(); - writer.put_batch(lsn_ordered_batch, ctx).await?; + writer.put_batch(batch, ctx).await?; } if !self.pending_deletions.is_empty() { @@ -1844,6 +1862,8 @@ impl<'a> DatadirModification<'a> { writer.update_directory_entries_count(kind, count as u64); } + self.pending_bytes = 0; + Ok(()) } @@ -1860,7 +1880,7 @@ impl<'a> DatadirModification<'a> { // Note: we don't check pending_deletions. It is an error to request a // value that has been removed, deletion only avoids leaking storage. 
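// Editorial sketch (not part of the patch), illustrating how the pending-bytes accounting
// introduced above is meant to be used by callers: keep applying records and commit once the
// approximate payload crosses MAX_PENDING_BYTES, which is what the walreceiver hunk near the
// end of this diff does. `next_record` / `apply_record` are hypothetical stand-ins, and
// `begin_modification` / `commit` are assumed to exist as in the surrounding code base.
//
//   let mut modification = timeline.begin_modification(start_lsn);
//   while let Some(record) = next_record() {
//       apply_record(&mut modification, record)?;
//       if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
//           modification.commit(&ctx).await?;
//       }
//   }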
if let Some(values) = self.pending_updates.get(&key) { - if let Some((_, value)) = values.last() { + if let Some((_, _, value)) = values.last() { return if let Value::Image(img) = value { Ok(img.clone()) } else { @@ -1888,13 +1908,17 @@ impl<'a> DatadirModification<'a> { fn put(&mut self, key: Key, val: Value) { let values = self.pending_updates.entry(key).or_default(); // Replace the previous value if it exists at the same lsn - if let Some((last_lsn, last_value)) = values.last_mut() { + if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() { if *last_lsn == self.lsn { + *last_value_ser_size = val.serialized_size().unwrap() as usize; *last_value = val; return; } } - values.push((self.lsn, val)); + + let val_serialized_size = val.serialized_size().unwrap() as usize; + self.pending_bytes += val_serialized_size; + values.push((self.lsn, val_serialized_size, val)); } fn delete(&mut self, key_range: Range) { diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index b67a72a7f7..f81045165e 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -4,14 +4,14 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache; -use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerIndexValueUnpacked; +use crate::tenant::storage_layer::inmemory_layer::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}; use crate::virtual_file::owned_buffers_io::slice::SliceMutExt; use crate::virtual_file::owned_buffers_io::util::size_tracking_writer; use crate::virtual_file::owned_buffers_io::write::Buffer; use crate::virtual_file::{self, owned_buffers_io, VirtualFile}; -use anyhow::Context; use bytes::BytesMut; use camino::Utf8PathBuf; +use num_traits::Num; use pageserver_api::shard::TenantShardId; use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; use tracing::error; @@ -24,7 +24,7 @@ pub struct EphemeralFile { _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, page_cache_file_id: page_cache::FileId, - bytes_written: u32, + bytes_written: u64, buffered_writer: owned_buffers_io::write::BufferedWriter< BytesMut, size_tracking_writer::Writer, @@ -33,8 +33,6 @@ pub struct EphemeralFile { _gate_guard: utils::sync::gate::GateGuard, } -use super::storage_layer::inmemory_layer::InMemoryLayerIndexValue; - const TAIL_SZ: usize = 64 * 1024; impl EphemeralFile { @@ -100,7 +98,7 @@ impl Drop for EphemeralFile { } impl EphemeralFile { - pub(crate) fn len(&self) -> u32 { + pub(crate) fn len(&self) -> u64 { self.bytes_written } @@ -141,32 +139,21 @@ impl EphemeralFile { /// No guarantees are made about the remaining bytes in `dst`, i.e., assume their contents are random. 
pub(crate) async fn read_at_to_end( &self, - start: u32, + start: u64, dst: Slice, ctx: &RequestContext, ) -> std::io::Result<(Slice, usize)> { let file_size_tracking_writer = self.buffered_writer.as_inner(); - let flushed_offset = u32::try_from(file_size_tracking_writer.bytes_written()) - .expect("we don't allow writing more than u32::MAX bytes"); + let flushed_offset = file_size_tracking_writer.bytes_written(); let buffer = self.buffered_writer.inspect_buffer(); let buffered = &buffer[0..buffer.pending()]; - let dst_cap = u32::try_from(dst.bytes_total()) - .with_context(|| { - format!( - "read_aligned: dst.bytes_total() is too large: {}", - dst.len() - ) - }) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + let dst_cap = dst.bytes_total().as_u64(); let end = { - let mut end = start - .checked_add(dst_cap) - .with_context(|| { - format!("read_aligned: offset + dst.bytes_total() is too large: {start} + {dst_cap}",) - }) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + // saturating_add is correct here because the max file size is u64::MAX, so, + // if start + dst.len() > u64::MAX, then we know it will be a short read + let mut end: u64 = start.saturating_add(dst_cap); if end > self.bytes_written { end = self.bytes_written; } @@ -175,11 +162,11 @@ impl EphemeralFile { // inclusive, exclusive #[derive(Debug)] - struct Range(u32, u32); - impl Range { - fn len(&self) -> u32 { + struct Range(N, N); + impl Range { + fn len(&self) -> N { if self.0 > self.1 { - 0 + N::zero() } else { self.1 - self.0 } @@ -193,7 +180,7 @@ impl EphemeralFile { let bounds = dst.bounds(); let slice = file .read_exact_at( - dst.slice(0..written_range.len() as usize), + dst.slice(0..written_range.len().as_usize()), start as u64, ctx, ) @@ -204,14 +191,21 @@ impl EphemeralFile { }; let dst = if buffered_range.len() > 0 { - let offset_in_buffer = - usize::try_from(buffered_range.0.checked_sub(flushed_offset).unwrap()).unwrap(); + let offset_in_buffer = buffered_range + .0 + .checked_sub(flushed_offset) + .unwrap() + .as_usize(); let to_copy = - &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len() as usize)]; + &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().as_usize())]; let bounds = dst.bounds(); let mut view = dst.slice( - written_range.len() as usize - ..written_range.len() as usize + buffered_range.len() as usize, + written_range.len().as_usize() + ..written_range + .len() + .checked_add(buffered_range.len()) + .unwrap() + .as_usize(), ); view.as_mut_rust_slice_full_zeroed() .copy_from_slice(to_copy); @@ -222,29 +216,45 @@ impl EphemeralFile { // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs - Ok((dst, (end - start) as usize)) + Ok((dst, (end - start).as_usize())) } - pub(crate) async fn write_blob( + /// Returns the offset at which the first byte of the input was written, for use + /// in constructing indices over the written value. + /// + /// Panics if the write is short because there's no way we can recover from that. + /// TODO: make upstack handle this as an error. 
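// Editorial sketch (not part of the patch): the offset contract between `write_raw` and
// `read_at_to_end`, mirroring the unit test further down in this file. `file` and `ctx` are
// assumed to be in scope; the byte values are illustrative.
//
//   let off_a = file.write_raw(b"foo", &ctx).await?;     // returns 0
//   let off_b = file.write_raw(b"barbaz", &ctx).await?;  // returns 3
//   assert_eq!(file.len(), 9);
//   let buf = Vec::with_capacity(6);
//   let (slice, nread) = file.read_at_to_end(off_b, buf.slice_full(), &ctx).await?;
//   assert_eq!(&slice.into_inner()[..nread], b"barbaz");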
+ pub(crate) async fn write_raw( &mut self, - buf: &[u8], - will_init: bool, + srcbuf: &[u8], ctx: &RequestContext, - ) -> anyhow::Result { + ) -> std::io::Result { let pos = self.bytes_written; - let index_value = InMemoryLayerIndexValue::new(InMemoryLayerIndexValueUnpacked { - pos: self.bytes_written, - len: buf.len(), - will_init, - })?; - debug_assert_eq!(index_value.unpack().pos, pos); - debug_assert_eq!(index_value.unpack().len as usize, buf.len()); - self.buffered_writer - .write_buffered_borrowed(buf, ctx) - .await?; - self.bytes_written += index_value.unpack().len; // index_value is checked for overflow in release mode - Ok(index_value) + let new_bytes_written = pos.checked_add(srcbuf.len().as_u64()).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}", + srcbuf_len = srcbuf.len(), + ), + ) + })?; + + // Write the payload + let nwritten = self + .buffered_writer + .write_buffered_borrowed(srcbuf, ctx) + .await?; + assert_eq!( + nwritten, + srcbuf.len(), + "buffered writer has no short writes" + ); + + self.bytes_written = new_bytes_written; + + Ok(pos) } } @@ -348,29 +358,18 @@ mod tests { .take(write_nbytes) .collect(); - let mut index_values = Vec::new(); + let mut value_offsets = Vec::new(); for i in 0..write_nbytes { - let index_value = file - .write_blob(&content[i..i + 1], i % 2 == 0, &ctx) - .await - .unwrap(); - index_values.push(index_value); + let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap(); + value_offsets.push(off); } assert!(file.len() as usize == write_nbytes); for i in 0..write_nbytes { - assert_eq!( - index_values[i], - InMemoryLayerIndexValue::new(InMemoryLayerIndexValueUnpacked { - pos: i as u32, - len: 1, - will_init: i % 2 == 0, - }) - .unwrap() - ); + assert_eq!(value_offsets[i], i.as_u64()); let buf = Vec::with_capacity(1); let (buf_slice, nread) = file - .read_at_to_end(i as u32, buf.slice_full(), &ctx) + .read_at_to_end(i.as_u64(), buf.slice_full(), &ctx) .await .unwrap(); let buf = buf_slice.into_inner(); diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 04f89db401..133b34b8b5 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -2,7 +2,7 @@ pub mod delta_layer; pub mod image_layer; -pub(crate) mod inmemory_layer; +pub mod inmemory_layer; pub(crate) mod layer; mod layer_desc; mod layer_name; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 3e67df0aee..de5196fddd 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -13,6 +13,7 @@ use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::{l0_flush, page_cache}; use anyhow::{anyhow, Context, Result}; +use assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}; use bytes::Bytes; use camino::Utf8PathBuf; use pageserver_api::key::CompactKey; @@ -32,12 +33,13 @@ use std::fmt::Write; use std::ops::Range; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::atomic::{AtomicU64, AtomicUsize}; -use tokio::sync::{RwLock, RwLockWriteGuard}; +use tokio::sync::RwLock; use super::{ DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState, }; +pub(crate) mod assert_u64_eq_usize; mod vectored_dio_read; #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] @@ 
-124,6 +126,7 @@ impl InMemoryLayerIndexValue { } remainder }; + const MAX_SUPPORTED_POS: usize = (1 << Self::MAX_SUPPORTED_POS_BITS) - 1; // Layout const WILL_INIT_RANGE: Range = 0..1; @@ -157,66 +160,67 @@ impl InMemoryLayerIndexValue { /// Checks that the `len` is within the supported range /// and that `pos + len` fits within a u32. - pub(crate) fn new(unpacked: InMemoryLayerIndexValueUnpacked) -> anyhow::Result { - let InMemoryLayerIndexValueUnpacked { - pos, + #[inline(always)] + fn new(arg: InMemoryLayerIndexValueNewArgs) -> anyhow::Result { + let InMemoryLayerIndexValueNewArgs { + base_offset, + batch_offset, len, will_init, - } = unpacked; + } = arg; + + let pos = base_offset + .checked_add(batch_offset) + .ok_or_else(|| anyhow::anyhow!("base_offset + batch_offset overflows u64: base_offset={base_offset} batch_offset={batch_offset}"))?; + + if pos.as_usize() > Self::MAX_SUPPORTED_POS { + anyhow::bail!( + "base_offset+batch_offset exceeds the maximum supported value: base_offset={base_offset} batch_offset={batch_offset} (+)={pos} max={max}", + max = Self::MAX_SUPPORTED_POS + ); + } if len > MAX_SUPPORTED_BLOB_LEN { anyhow::bail!( "len exceeds the maximum supported length: len={len} max={MAX_SUPPORTED_BLOB_LEN}", ); } - const _: () = { - if MAX_SUPPORTED_BLOB_LEN > u32::MAX as usize { - panic!() - } - }; - let len = u32::try_from(len).expect("see const assertion above"); - - pos.checked_add(len).ok_or_else(|| { - anyhow::anyhow!("pos + len overflows u32, not representable in EphemeralFile") - })?; let mut data: u64 = 0; use bit_field::BitField; data.set_bits(Self::WILL_INIT_RANGE, if will_init { 1 } else { 0 }); - data.set_bits(Self::LEN_RANGE, len as u64); - data.set_bits(Self::POS_RANGE, pos as u64); + data.set_bits(Self::LEN_RANGE, len.as_u64()); + data.set_bits(Self::POS_RANGE, pos); Ok(Self(data)) } - #[inline] - pub(crate) fn unpack(&self) -> InMemoryLayerIndexValueUnpacked { + #[inline(always)] + fn unpack(&self) -> InMemoryLayerIndexValueUnpacked { use bit_field::BitField; InMemoryLayerIndexValueUnpacked { will_init: self.0.get_bits(Self::WILL_INIT_RANGE) != 0, - len: self.0.get_bits(Self::LEN_RANGE) as u32, - pos: self.0.get_bits(Self::POS_RANGE) as u32, + len: self.0.get_bits(Self::LEN_RANGE), + pos: self.0.get_bits(Self::POS_RANGE), } } } +/// Args to [`InMemoryLayerIndexValue::new`]. +#[derive(Clone, Copy)] +struct InMemoryLayerIndexValueNewArgs { + base_offset: u64, + batch_offset: u64, + len: usize, + will_init: bool, +} + /// Unpacked representation of the bitfielded [`InMemoryLayerIndexValue`]. #[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub(crate) struct InMemoryLayerIndexValueUnpacked { - pub(crate) will_init: bool, - pub(crate) len: L, - pub(crate) pos: u32, -} - -impl InMemoryLayerIndexValueUnpacked { - #[cfg(test)] - pub(crate) fn as_usize(&self) -> InMemoryLayerIndexValueUnpacked { - InMemoryLayerIndexValueUnpacked { - will_init: self.will_init, - len: self.len as usize, - pos: self.pos, - } - } +struct InMemoryLayerIndexValueUnpacked { + will_init: bool, + len: u64, + pos: u64, } impl std::fmt::Debug for InMemoryLayerInner { @@ -359,7 +363,7 @@ impl InMemoryLayer { } } - pub(crate) fn try_len(&self) -> Option { + pub(crate) fn try_len(&self) -> Option { self.inner.try_read().map(|i| i.file.len()).ok() } @@ -446,6 +450,16 @@ impl InMemoryLayer { } // Execute the reads. 
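// Editorial sketch (not part of the patch): how the two offsets in
// InMemoryLayerIndexValueNewArgs are meant to compose. The concrete numbers are illustrative.
//
//   // SerializedBatch::from_values packs offsets relative to the start of the batch buffer:
//   let relative = InMemoryLayerIndexValue::new(InMemoryLayerIndexValueNewArgs {
//       base_offset: 0, batch_offset: 4096, len: 128, will_init: true,
//   })?;
//   // put_batch later re-packs the same entry against the EphemeralFile's current length
//   // (say 1 MiB), so the stored pos becomes the absolute file offset:
//   let absolute = InMemoryLayerIndexValue::new(InMemoryLayerIndexValueNewArgs {
//       base_offset: 1 << 20, batch_offset: 4096, len: 128, will_init: true,
//   })?;
//   assert_eq!(absolute.unpack().pos, (1 << 20) + 4096);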
+ impl vectored_dio_read::File for EphemeralFile { + async fn read_at_to_end<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>( + &'b self, + start: u64, + dst: tokio_epoll_uring::Slice, + ctx: &'a RequestContext, + ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { + EphemeralFile::read_at_to_end(self, start, dst, ctx).await + } + } let f = vectored_dio_read::execute( &inner.file, reads @@ -491,14 +505,65 @@ impl InMemoryLayer { } } -impl vectored_dio_read::File for EphemeralFile { - async fn read_at_to_end<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>( - &'b self, - start: u32, - dst: tokio_epoll_uring::Slice, - ctx: &'a RequestContext, - ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { - EphemeralFile::read_at_to_end(self, start, dst, ctx).await +/// Offset of a particular Value within a serialized batch. +struct SerializedBatchOffset { + key: CompactKey, + lsn: Lsn, + // TODO: separate type when we start serde-serializing this value, to avoid coupling + // in-memory representation to serialization format. + value: InMemoryLayerIndexValue, +} + +pub struct SerializedBatch { + /// Blobs serialized in EphemeralFile's native format, ready for passing to [`EphemeralFile::write_raw`]. + pub(crate) raw: Vec, + + /// Index of values in [`Self::raw`], using offsets relative to the start of the buffer. + offsets: Vec, + + /// The highest LSN of any value in the batch + pub(crate) max_lsn: Lsn, +} + +impl SerializedBatch { + pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self { + // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by + // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`] + let buffer_size = batch.iter().map(|i| i.2).sum::() + 4 * batch.len(); + let mut cursor = std::io::Cursor::new(Vec::::with_capacity(buffer_size)); + + let mut offsets: Vec = Vec::with_capacity(batch.len()); + let mut max_lsn: Lsn = Lsn(0); + for (key, lsn, val_ser_size, val) in batch { + let relative_off = cursor.position(); + + val.ser_into(&mut cursor) + .expect("Writing into in-memory buffer is infallible"); + + offsets.push(SerializedBatchOffset { + key, + lsn, + value: InMemoryLayerIndexValue::new(InMemoryLayerIndexValueNewArgs { + base_offset: 0, + batch_offset: relative_off, + len: val_ser_size, + will_init: val.will_init(), + }) + .expect("higher-level code ensures that values are within supported ranges"), + }); + max_lsn = std::cmp::max(max_lsn, lsn); + } + + let buffer = cursor.into_inner(); + + // Assert that we didn't do any extra allocations while building buffer. + debug_assert!(buffer.len() <= buffer_size); + + Self { + raw: buffer, + offsets, + max_lsn, + } } } @@ -562,55 +627,66 @@ impl InMemoryLayer { }) } - // Write operations - - /// Common subroutine of the public put_wal_record() and put_page_image() functions. - /// Adds the page version to the in-memory tree - pub async fn put_value( + /// Write path. + /// + /// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from. + /// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable. + /// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors. 
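// Editorial sketch (not part of the patch): the caller-side flow for this write path, as
// exercised by bench_ingest.rs at the top of this diff. `key`, `lsn`, `value`, `layer` and
// `ctx` are assumed to be in scope.
//
//   let val_ser_size = value.serialized_size().unwrap() as usize;
//   let batch = vec![(key.to_compact(), lsn, val_ser_size, value)];
//   let serialized = SerializedBatch::from_values(batch);
//   layer.put_batch(serialized, &ctx).await?;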
+ pub async fn put_batch( &self, - key: CompactKey, - lsn: Lsn, - buf: &[u8], - will_init: bool, + serialized_batch: SerializedBatch, ctx: &RequestContext, - ) -> Result<()> { + ) -> anyhow::Result<()> { let mut inner = self.inner.write().await; self.assert_writable(); - self.put_value_locked(&mut inner, key, lsn, buf, will_init, ctx) - .await - } - async fn put_value_locked( - &self, - locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>, - key: CompactKey, - lsn: Lsn, - buf: &[u8], - will_init: bool, - ctx: &RequestContext, - ) -> Result<()> { - trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); + let base_offset = inner.file.len(); - let entry = locked_inner - .file - .write_blob( - buf, - will_init, - &RequestContextBuilder::extend(ctx) - .page_content_kind(PageContentKind::InMemoryLayer) - .build(), - ) - .await?; + // Add the base_offset to the batch's index values which are relative to the batch start. + let index_values: Vec = serialized_batch + .offsets + .into_iter() + .map(|SerializedBatchOffset { key, lsn, value }| { + let InMemoryLayerIndexValueUnpacked { + will_init, + len, + pos, + } = value.unpack(); + anyhow::Ok(SerializedBatchOffset { + key, + lsn, + value: InMemoryLayerIndexValue::new(InMemoryLayerIndexValueNewArgs { + base_offset, + batch_offset: pos, + len: len.as_usize(), + will_init, + })?, + }) + }) + .collect::>>()?; - let vec_map = locked_inner.index.entry(key).or_default(); - let old = vec_map.append_or_update_last(lsn, entry).unwrap().0; - if old.is_some() { - // We already had an entry for this LSN. That's odd.. - warn!("Key {} at {} already exists", key, lsn); + // Write the batch to the file + inner.file.write_raw(&serialized_batch.raw, ctx).await?; + let new_size = inner.file.len(); + let expected_new_len = base_offset + .checked_add(serialized_batch.raw.len().as_u64()) + // write_raw would error if we were to overflow u64. + // also InMemoryLayerIndexValue and higher levels in + //the code don't allow the file to grow that large + .unwrap(); + assert_eq!(new_size, expected_new_len); + + // Update the index with the new values + for SerializedBatchOffset { key, lsn, value } in index_values { + let vec_map = inner.index.entry(key).or_default(); + let old = vec_map.append_or_update_last(lsn, value).unwrap().0; + if old.is_some() { + // We already had an entry for this LSN. That's odd.. 
+ warn!("Key {} at {} already exists", key, lsn); + } } - let size = locked_inner.file.len(); - locked_inner.resource_units.maybe_publish_size(size as u64); + inner.resource_units.maybe_publish_size(new_size); Ok(()) } @@ -772,57 +848,127 @@ mod tests { #[test] fn test_index_value() { + const MAX_SUPPORTED_POS: usize = InMemoryLayerIndexValue::MAX_SUPPORTED_POS; + use InMemoryLayerIndexValueNewArgs as Args; use InMemoryLayerIndexValueUnpacked as Unpacked; - let roundtrip = |input| { - let res = InMemoryLayerIndexValue::new(input).expect("this tests expects no errors"); - assert_eq!(res.unpack().as_usize(), input); + + let roundtrip = |args, expect: Unpacked| { + let res = InMemoryLayerIndexValue::new(args).expect("this tests expects no errors"); + let InMemoryLayerIndexValueUnpacked { + will_init, + len, + pos, + } = res.unpack(); + assert_eq!(will_init, expect.will_init); + assert_eq!(len, expect.len); + assert_eq!(pos, expect.pos); }; - // will_init - roundtrip(Unpacked { - will_init: false, - len: 0, - pos: 0, - }); - roundtrip(Unpacked { - will_init: true, - len: 0, - pos: 0, - }); + // basic roundtrip + for pos in [0, MAX_SUPPORTED_POS] { + for len in [0, MAX_SUPPORTED_BLOB_LEN] { + for will_init in [true, false] { + let expect = Unpacked { + will_init, + len: len.as_u64(), + pos: pos.as_u64(), + }; + roundtrip( + Args { + will_init, + base_offset: pos.as_u64(), + batch_offset: 0, + len, + }, + expect, + ); + roundtrip( + Args { + will_init, + base_offset: 0, + batch_offset: pos.as_u64(), + len, + }, + expect, + ); + } + } + } - // len - roundtrip(Unpacked { - will_init: false, - len: MAX_SUPPORTED_BLOB_LEN, - pos: 0, - }); - let too_large = Unpacked { + // too-large len + let too_large = Args { will_init: false, len: MAX_SUPPORTED_BLOB_LEN + 1, - pos: 0, + base_offset: 0, + batch_offset: 0, }; assert!(InMemoryLayerIndexValue::new(too_large).is_err()); - // pos - roundtrip(Unpacked { - will_init: false, - len: 0, - pos: { - let max_as_per_supported_bits: usize = - (1 << InMemoryLayerIndexValue::MAX_SUPPORTED_POS_BITS) - 1; - if max_as_per_supported_bits < u32::MAX as usize { - panic!("current implementation has space for `pos` values > u32::MAX") - } - u32::MAX // but at the type system level, we enforce u32::MAX - }, - }); + // too-large pos + { + let too_large = Args { + will_init: false, + len: 0, + base_offset: MAX_SUPPORTED_POS.as_u64() + 1, + batch_offset: 0, + }; + assert!(InMemoryLayerIndexValue::new(too_large).is_err()); + let too_large = Args { + will_init: false, + len: 0, + base_offset: 0, + batch_offset: MAX_SUPPORTED_POS.as_u64() + 1, + }; + assert!(InMemoryLayerIndexValue::new(too_large).is_err()); + } - // pos + len - let too_large = Unpacked { - will_init: false, - len: 1, - pos: u32::MAX, - }; - assert!(InMemoryLayerIndexValue::new(too_large).is_err()); + // too large (base_offset + batch_offset) + { + let too_large = Args { + will_init: false, + len: 0, + base_offset: MAX_SUPPORTED_POS.as_u64(), + batch_offset: 1, + }; + assert!(InMemoryLayerIndexValue::new(too_large).is_err()); + let too_large = Args { + will_init: false, + len: 0, + base_offset: MAX_SUPPORTED_POS.as_u64() - 1, + batch_offset: MAX_SUPPORTED_POS.as_u64() - 1, + }; + assert!(InMemoryLayerIndexValue::new(too_large).is_err()); + } + + // valid special cases + // - area past the max supported pos that is accessible by len + for len in [1, MAX_SUPPORTED_BLOB_LEN] { + roundtrip( + Args { + will_init: false, + len, + base_offset: MAX_SUPPORTED_POS.as_u64(), + batch_offset: 0, + }, + Unpacked { + will_init: 
false, + len: len as u64, + pos: MAX_SUPPORTED_POS.as_u64(), + }, + ); + roundtrip( + Args { + will_init: false, + len, + base_offset: 0, + batch_offset: MAX_SUPPORTED_POS.as_u64(), + }, + Unpacked { + will_init: false, + len: len as u64, + pos: MAX_SUPPORTED_POS.as_u64(), + }, + ); + } } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/assert_u64_eq_usize.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/assert_u64_eq_usize.rs new file mode 100644 index 0000000000..e635a22aab --- /dev/null +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/assert_u64_eq_usize.rs @@ -0,0 +1,29 @@ +pub(crate) const _ASSERT_U64_EQ_USIZE: () = { + if std::mem::size_of::() != std::mem::size_of::() { + panic!("the traits defined in this module assume that usize and u64 can be converted to each other without loss of information"); + } +}; + +pub(crate) trait U64IsUsize { + fn as_usize(self) -> usize; +} + +impl U64IsUsize for u64 { + #[inline(always)] + fn as_usize(self) -> usize { + let _ = _ASSERT_U64_EQ_USIZE; + self as usize + } +} + +pub(crate) trait UsizeIsU64 { + fn as_u64(self) -> u64; +} + +impl UsizeIsU64 for usize { + #[inline(always)] + fn as_u64(self) -> u64 { + let _ = _ASSERT_U64_EQ_USIZE; + self as u64 + } +} diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index da4830c9cf..bbd4be03d1 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -6,7 +6,10 @@ use std::{ use itertools::Itertools; use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; -use crate::context::RequestContext; +use crate::{ + context::RequestContext, + tenant::storage_layer::inmemory_layer::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}, +}; mod sealed { pub trait Sealed {} @@ -16,7 +19,7 @@ mod sealed { pub trait File: Send { async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( &'b self, - start: u32, + start: u64, dst: Slice, ctx: &'a RequestContext, ) -> std::io::Result<(Slice, usize)>; @@ -24,7 +27,7 @@ pub trait File: Send { /// A logical read from [`File`]. See [`Self::new`]. pub struct LogicalRead { - pos: u32, + pos: u64, state: RwLockRefCell>, } @@ -38,7 +41,7 @@ enum LogicalReadState { impl LogicalRead { /// Create a new [`LogicalRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`. 
- pub fn new(pos: u32, buf: B) -> Self { + pub fn new(pos: u64, buf: B) -> Self { Self { pos, state: RwLockRefCell::new(LogicalReadState::NotStarted(buf)), @@ -100,11 +103,11 @@ where let (reads, assert_logical_reads): (_, Option>>) = (reads, None); // Plan which parts of which chunks need to be appended to which buffer - let mut by_chunk: BTreeMap>> = BTreeMap::new(); + let mut by_chunk: BTreeMap>> = BTreeMap::new(); struct Interest<'a, B: Buffer> { logical_read: &'a LogicalRead, - offset_in_chunk: u32, - len: u32, + offset_in_chunk: u64, + len: u64, } for logical_read in reads { let LogicalRead { pos, state } = logical_read; @@ -129,14 +132,14 @@ where // plan which chunks we need to read from let mut remaining = req_len; - let mut chunk_no = *pos / (DIO_CHUNK_SIZE as u32); - let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap(); + let mut chunk_no = *pos / (DIO_CHUNK_SIZE.as_u64()); + let mut offset_in_chunk = pos.as_usize() % DIO_CHUNK_SIZE; while remaining > 0 { let remaining_in_chunk = std::cmp::min(remaining, DIO_CHUNK_SIZE - offset_in_chunk); by_chunk.entry(chunk_no).or_default().push(Interest { logical_read, - offset_in_chunk: offset_in_chunk as u32, - len: remaining_in_chunk as u32, + offset_in_chunk: offset_in_chunk.as_u64(), + len: remaining_in_chunk.as_u64(), }); offset_in_chunk = 0; chunk_no += 1; @@ -149,20 +152,20 @@ where // However, we can merge adjacent chunks into batches of MAX_CHUNK_BATCH_SIZE // so we issue fewer IOs = fewer roundtrips = lower overall latency. struct PhysicalRead<'a, B: Buffer> { - start_chunk_no: u32, - nchunks: u32, + start_chunk_no: u64, + nchunks: usize, dsts: Vec>, } struct MergedInterest<'a, B: Buffer> { logical_read: &'a LogicalRead, - offset_in_physical_read: u32, - len: u32, + offset_in_physical_read: u64, + len: u64, } let mut physical_reads: Vec> = Vec::new(); let mut by_chunk = by_chunk.into_iter().peekable(); loop { let mut last_chunk_no = None; - let to_merge: Vec<(u32, Vec>)> = by_chunk + let to_merge: Vec<(u64, Vec>)> = by_chunk .peeking_take_while(|(chunk_no, _)| { if let Some(last_chunk_no) = last_chunk_no { if *chunk_no != last_chunk_no + 1 { @@ -177,7 +180,7 @@ where let Some(start_chunk_no) = to_merge.first().map(|(chunk_no, _)| *chunk_no) else { break; }; - let nchunks = to_merge.len() as u32; + let nchunks = to_merge.len(); let dsts = to_merge .into_iter() .enumerate() @@ -190,7 +193,10 @@ where }| { MergedInterest { logical_read, - offset_in_physical_read: i as u32 * DIO_CHUNK_SIZE as u32 + offset_in_physical_read: i + .checked_mul(DIO_CHUNK_SIZE) + .unwrap() + .as_u64() + offset_in_chunk, len, } @@ -208,7 +214,7 @@ where // Execute physical reads and fill the logical read buffers // TODO: prefetch - let get_io_buffer = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE)); + let get_io_buffer = |nchunks| Vec::with_capacity(nchunks * DIO_CHUNK_SIZE); for PhysicalRead { start_chunk_no, nchunks, @@ -221,13 +227,12 @@ where if all_done { continue; } - let read_offset = start_chunk_no * DIO_CHUNK_SIZE as u32; + let read_offset = start_chunk_no + .checked_mul(DIO_CHUNK_SIZE.as_u64()) + .expect("we produce chunk_nos by dividing by DIO_CHUNK_SIZE earlier"); let io_buf = get_io_buffer(nchunks).slice_full(); let req_len = io_buf.len(); - let (io_buf_slice, nread) = match file - .read_at_to_end(start_chunk_no * DIO_CHUNK_SIZE as u32, io_buf, ctx) - .await - { + let (io_buf_slice, nread) = match file.read_at_to_end(read_offset, io_buf, ctx).await { Ok(t) => t, Err(e) => { let e = Arc::new(e); @@ 
-431,7 +436,7 @@ mod tests { .collect(), } } - fn test_logical_read(&self, pos: u32, len: usize) -> TestLogicalRead { + fn test_logical_read(&self, pos: u64, len: usize) -> TestLogicalRead { let expected_result = if pos as usize + len > self.content.len() { Err("InMemoryFile short read".to_string()) } else { @@ -467,7 +472,7 @@ mod tests { impl File for InMemoryFile { async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( &'b self, - start: u32, + start: u64, mut dst: Slice, _ctx: &'a RequestContext, ) -> std::io::Result<(Slice, usize)> { @@ -490,13 +495,13 @@ mod tests { #[derive(Clone)] struct TestLogicalRead { - pos: u32, + pos: u64, len: usize, expected_result: Result, String>, } impl TestLogicalRead { - fn new(pos: u32, len: usize, expected_result: Result, String>) -> Self { + fn new(pos: u64, len: usize, expected_result: Result, String>) -> Self { Self { pos, len, @@ -535,7 +540,7 @@ mod tests { async fn test_blackbox() { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let cs = DIO_CHUNK_SIZE; - let cs_u32 = u32::try_from(cs).unwrap(); + let cs_u64 = cs.as_u64(); let file = InMemoryFile::new_random(10 * cs); @@ -545,12 +550,12 @@ mod tests { file.test_logical_read(1, 2), // gap // spans adjacent chunks - file.test_logical_read(cs_u32 - 1, 2), + file.test_logical_read(cs_u64 - 1, 2), // gap // tail of chunk 3, all of chunk 4, and 2 bytes of chunk 5 - file.test_logical_read(3 * cs_u32 - 1, cs + 2), + file.test_logical_read(3 * cs_u64 - 1, cs + 2), // gap - file.test_logical_read(5 * cs_u32, 1), + file.test_logical_read(5 * cs_u64, 1), ]; let num_test_logical_reads = test_logical_reads.len(); let test_logical_reads_perms = test_logical_reads @@ -581,7 +586,7 @@ mod tests { } struct RecordedRead { - pos: u32, + pos: u64, req_len: usize, res: Vec, } @@ -598,7 +603,7 @@ mod tests { impl<'x> File for RecorderFile<'x> { async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( &'b self, - start: u32, + start: u64, dst: Slice, ctx: &'a RequestContext, ) -> std::io::Result<(Slice, usize)> { @@ -618,8 +623,8 @@ mod tests { let file = InMemoryFile::new_random(2 * DIO_CHUNK_SIZE); - let a = file.test_logical_read(DIO_CHUNK_SIZE as u32, 10); - let b = file.test_logical_read(DIO_CHUNK_SIZE as u32 + 30, 20); + let a = file.test_logical_read(DIO_CHUNK_SIZE.as_u64(), 10); + let b = file.test_logical_read(DIO_CHUNK_SIZE.as_u64() + 30, 20); let recorder = RecorderFile::new(&file); @@ -628,7 +633,7 @@ mod tests { let recorded = recorder.recorded.borrow(); assert_eq!(recorded.len(), 1); let RecordedRead { pos, req_len, .. } = &recorded[0]; - assert_eq!(*pos, DIO_CHUNK_SIZE as u32); + assert_eq!(*pos, DIO_CHUNK_SIZE.as_u64()); assert_eq!(*req_len, DIO_CHUNK_SIZE); } @@ -644,7 +649,7 @@ mod tests { let mut test_logical_reads = Vec::new(); for i in 3..3 + MAX_CHUNK_BATCH_SIZE + MAX_CHUNK_BATCH_SIZE / 2 { test_logical_reads - .push(file.test_logical_read(i as u32 * DIO_CHUNK_SIZE as u32 + 10, 1)); + .push(file.test_logical_read(i.as_u64() * DIO_CHUNK_SIZE.as_u64() + 10, 1)); } let recorder = RecorderFile::new(&file); @@ -673,7 +678,7 @@ mod tests { let file = InMemoryFile::new_random(3 * DIO_CHUNK_SIZE); let a = file.test_logical_read(0, 1); // chunk 0 - let b = file.test_logical_read(2 * DIO_CHUNK_SIZE as u32, 1); // chunk 2 + let b = file.test_logical_read(2 * DIO_CHUNK_SIZE.as_u64(), 1); // chunk 2 let recorder = RecorderFile::new(&file); @@ -689,13 +694,13 @@ mod tests { } { let RecordedRead { pos, req_len, .. 
} = &recorded[1]; - assert_eq!(*pos, 2 * DIO_CHUNK_SIZE as u32); + assert_eq!(*pos, 2 * DIO_CHUNK_SIZE.as_u64()); assert_eq!(*req_len, DIO_CHUNK_SIZE); } } struct ExpectedRead { - expect_pos: u32, + expect_pos: u64, expect_len: usize, respond: Result, String>, } @@ -728,7 +733,7 @@ mod tests { impl File for MockFile { async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( &'b self, - start: u32, + start: u64, mut dst: Slice, _ctx: &'a RequestContext, ) -> std::io::Result<(Slice, usize)> { @@ -809,19 +814,19 @@ mod tests { let test_logical_reads = vec![ // read spanning two batches TestLogicalRead::new( - DIO_CHUNK_SIZE as u32 / 2, + DIO_CHUNK_SIZE.as_u64() / 2, MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE, Err("foo".to_owned()), ), // second read in failing chunk TestLogicalRead::new( - (MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE) as u32 + DIO_CHUNK_SIZE as u32 - 10, + (MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE).as_u64() + DIO_CHUNK_SIZE.as_u64() - 10, 5, Err("foo".to_owned()), ), // read unaffected TestLogicalRead::new( - (MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE) as u32 + 2 * DIO_CHUNK_SIZE as u32 + 10, + (MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE).as_u64() + 2 * DIO_CHUNK_SIZE.as_u64() + 10, 5, Ok(vec![1; 5]), ), @@ -832,8 +837,8 @@ mod tests { for test_logical_reads in test_logical_read_perms { let file = mock_file!( 0, MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE => Ok(vec![0; MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE]), - (MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE) as u32, DIO_CHUNK_SIZE => Err("foo".to_owned()), - (MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE + 2*DIO_CHUNK_SIZE) as u32, DIO_CHUNK_SIZE => Ok(vec![1; DIO_CHUNK_SIZE]), + (MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE).as_u64(), DIO_CHUNK_SIZE => Err("foo".to_owned()), + (MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE + 2*DIO_CHUNK_SIZE).as_u64(), DIO_CHUNK_SIZE => Ok(vec![1; DIO_CHUNK_SIZE]), ); execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await; } @@ -842,12 +847,12 @@ mod tests { struct TestShortReadsSetup { ctx: RequestContext, file: InMemoryFile, - written: u32, + written: u64, } fn setup_short_chunk_read_tests() -> TestShortReadsSetup { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); assert!(DIO_CHUNK_SIZE > 20, "test assumption"); - let written = (2 * DIO_CHUNK_SIZE - 10) as u32; + let written = (2 * DIO_CHUNK_SIZE - 10).as_u64(); let file = InMemoryFile::new_random(written as usize); TestShortReadsSetup { ctx, file, written } } @@ -869,7 +874,7 @@ mod tests { let recorded = recorder.recorded.borrow(); assert_eq!(recorded.len(), 1); let RecordedRead { pos, req_len, res } = &recorded[0]; - assert_eq!(*pos, DIO_CHUNK_SIZE as u32); + assert_eq!(*pos, DIO_CHUNK_SIZE.as_u64()); assert_eq!(*req_len, DIO_CHUNK_SIZE); assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]); } @@ -882,10 +887,16 @@ mod tests { // the logical reads end in the unwritten range. // // All should fail with UnexpectedEof and have the same IO pattern. 
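// Editorial sketch (not part of the patch): the chunk-planning arithmetic from execute()'s
// planning loop above, in isolation. The DIO_CHUNK_SIZE value below is illustrative only; the
// real constant lives elsewhere in this module.
fn plan_chunks(pos: u64, len: usize) -> Vec<(u64, usize, usize)> {
    const DIO_CHUNK_SIZE: usize = 512; // illustrative
    let mut out = Vec::new();
    let mut remaining = len;
    let mut chunk_no = pos / DIO_CHUNK_SIZE as u64;
    let mut offset_in_chunk = (pos % DIO_CHUNK_SIZE as u64) as usize;
    while remaining > 0 {
        // A logical read never takes more from a chunk than what is left of that chunk.
        let take = std::cmp::min(remaining, DIO_CHUNK_SIZE - offset_in_chunk);
        out.push((chunk_no, offset_in_chunk, take));
        offset_in_chunk = 0;
        chunk_no += 1;
        remaining -= take;
    }
    out
}
// For example, a 2-byte read starting at DIO_CHUNK_SIZE - 1 spans two chunks, matching the
// "spans adjacent chunks" case in test_blackbox above:
//   plan_chunks(511, 2) == vec![(0, 511, 1), (1, 0, 1)]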
- async fn the_impl(offset_delta: i32) { + async fn the_impl(offset_delta: i64) { let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests(); - let offset = (written as i32 + offset_delta) as u32; + let offset = u64::try_from( + i64::try_from(written) + .unwrap() + .checked_add(offset_delta) + .unwrap(), + ) + .unwrap(); let a = file.test_logical_read(offset, 5); let recorder = RecorderFile::new(&file); let a_vr = a.make_logical_read(); @@ -900,7 +911,7 @@ mod tests { let recorded = recorder.recorded.borrow(); assert_eq!(recorded.len(), 1); let RecordedRead { pos, req_len, res } = &recorded[0]; - assert_eq!(*pos, DIO_CHUNK_SIZE as u32); + assert_eq!(*pos, DIO_CHUNK_SIZE.as_u64()); assert_eq!(*req_len, DIO_CHUNK_SIZE); assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]); } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 27e4f5ab0b..6220d25489 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -22,8 +22,8 @@ use handle::ShardTimelineId; use once_cell::sync::Lazy; use pageserver_api::{ key::{ - KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE, - NON_INHERITED_SPARSE_RANGE, + CompactKey, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, + NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE, }, keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning}, models::{ @@ -44,10 +44,8 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tracing::*; use utils::{ - bin_ser::BeSer, fs_ext, pausable_failpoint, sync::gate::{Gate, GateGuard}, - vec_map::VecMap, }; use std::pin::pin; @@ -137,7 +135,10 @@ use self::layer_manager::LayerManager; use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; -use super::{config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized}; +use super::{ + config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint, + upload_queue::NotInitialized, +}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe}; use super::{ @@ -1463,7 +1464,6 @@ impl Timeline { tracing::warn!("Lock conflict while reading size of open layer"); return; }; - let current_size = current_size as u64; let current_lsn = self.get_last_record_lsn(); @@ -3592,34 +3592,6 @@ impl Timeline { return Err(FlushLayerError::Cancelled); } - // FIXME(auxfilesv2): support multiple metadata key partitions might need initdb support as well? - // This code path will not be hit during regression tests. After #7099 we have a single partition - // with two key ranges. If someone wants to fix initdb optimization in the future, this might need - // to be fixed. - - // For metadata, always create delta layers. - let delta_layer = if !metadata_partition.parts.is_empty() { - assert_eq!( - metadata_partition.parts.len(), - 1, - "currently sparse keyspace should only contain a single metadata keyspace" - ); - let metadata_keyspace = &metadata_partition.parts[0]; - self.create_delta_layer( - &frozen_layer, - Some( - metadata_keyspace.0.ranges.first().unwrap().start - ..metadata_keyspace.0.ranges.last().unwrap().end, - ), - ctx, - ) - .await - .map_err(|e| FlushLayerError::from_anyhow(self, e))? - } else { - None - }; - - // For image layers, we add them immediately into the layer map. 
let mut layers_to_upload = Vec::new(); layers_to_upload.extend( self.create_image_layers( @@ -3630,13 +3602,27 @@ impl Timeline { ) .await?, ); - - if let Some(delta_layer) = delta_layer { - layers_to_upload.push(delta_layer.clone()); - (layers_to_upload, Some(delta_layer)) - } else { - (layers_to_upload, None) + if !metadata_partition.parts.is_empty() { + assert_eq!( + metadata_partition.parts.len(), + 1, + "currently sparse keyspace should only contain a single metadata keyspace" + ); + layers_to_upload.extend( + self.create_image_layers( + // Safety: create_image_layers treat sparse keyspaces differently that it does not scan + // every single key within the keyspace, and therefore, it's safe to force converting it + // into a dense keyspace before calling this function. + &metadata_partition.into_dense(), + self.initdb_lsn, + ImageLayerCreationMode::Initial, + ctx, + ) + .await?, + ); } + + (layers_to_upload, None) } else { // Normal case, write out a L0 delta layer file. // `create_delta_layer` will not modify the layer map. @@ -4046,8 +4032,6 @@ impl Timeline { mode: ImageLayerCreationMode, start: Key, ) -> Result { - assert!(!matches!(mode, ImageLayerCreationMode::Initial)); - // Metadata keys image layer creation. let mut reconstruct_state = ValuesReconstructState::default(); let data = self @@ -4213,15 +4197,13 @@ impl Timeline { "metadata keys must be partitioned separately" ); } - if mode == ImageLayerCreationMode::Initial { - return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers"))); - } if mode == ImageLayerCreationMode::Try && !check_for_image_layers { // Skip compaction if there are not enough updates. Metadata compaction will do a scan and // might mess up with evictions. start = img_range.end; continue; } + // For initial and force modes, we always generate image layers for metadata keys. } else if let ImageLayerCreationMode::Try = mode { // check_for_image_layers = false -> skip // check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate @@ -4229,7 +4211,8 @@ impl Timeline { start = img_range.end; continue; } - } else if let ImageLayerCreationMode::Force = mode { + } + if let ImageLayerCreationMode::Force = mode { // When forced to create image layers, we might try and create them where they already // exist. This mode is only used in tests/debug. let layers = self.layers.read().await; @@ -4243,6 +4226,7 @@ impl Timeline { img_range.start, img_range.end ); + start = img_range.end; continue; } } @@ -5593,46 +5577,6 @@ enum OpenLayerAction { } impl<'a> TimelineWriter<'a> { - /// Put a new page version that can be constructed from a WAL record - /// - /// This will implicitly extend the relation, if the page is beyond the - /// current end-of-file. - pub(crate) async fn put( - &mut self, - key: Key, - lsn: Lsn, - value: &Value, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - // Avoid doing allocations for "small" values. 
- // In the regression test suite, the limit of 256 avoided allocations in 95% of cases: - // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061 - let mut buf = smallvec::SmallVec::<[u8; 256]>::new(); - value.ser_into(&mut buf)?; - let buf_size: u64 = buf.len().try_into().expect("oversized value buf"); - - let action = self.get_open_layer_action(lsn, buf_size); - let layer = self.handle_open_layer_action(lsn, action, ctx).await?; - let res = layer - .put_value(key.to_compact(), lsn, &buf, value.will_init(), ctx) - .await; - - if res.is_ok() { - // Update the current size only when the entire write was ok. - // In case of failures, we may have had partial writes which - // render the size tracking out of sync. That's ok because - // the checkpoint distance should be significantly smaller - // than the S3 single shot upload limit of 5GiB. - let state = self.write_guard.as_mut().unwrap(); - - state.current_size += buf_size; - state.prev_lsn = Some(lsn); - state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn)); - } - - res - } - async fn handle_open_layer_action( &mut self, at: Lsn, @@ -5738,18 +5682,58 @@ impl<'a> TimelineWriter<'a> { } /// Put a batch of keys at the specified Lsns. - /// - /// The batch is sorted by Lsn (enforced by usage of [`utils::vec_map::VecMap`]. pub(crate) async fn put_batch( &mut self, - batch: VecMap, + batch: Vec<(CompactKey, Lsn, usize, Value)>, ctx: &RequestContext, ) -> anyhow::Result<()> { - for (lsn, (key, val)) in batch { - self.put(key, lsn, &val, ctx).await? + if batch.is_empty() { + return Ok(()); } - Ok(()) + let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch); + let batch_max_lsn = serialized_batch.max_lsn; + let buf_size: u64 = serialized_batch.raw.len() as u64; + + let action = self.get_open_layer_action(batch_max_lsn, buf_size); + let layer = self + .handle_open_layer_action(batch_max_lsn, action, ctx) + .await?; + + let res = layer.put_batch(serialized_batch, ctx).await; + + if res.is_ok() { + // Update the current size only when the entire write was ok. + // In case of failures, we may have had partial writes which + // render the size tracking out of sync. That's ok because + // the checkpoint distance should be significantly smaller + // than the S3 single shot upload limit of 5GiB. 
+ let state = self.write_guard.as_mut().unwrap(); + + state.current_size += buf_size; + state.prev_lsn = Some(batch_max_lsn); + state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn)); + } + + res + } + + #[cfg(test)] + /// Test helper, for tests that would like to poke individual values without composing a batch + pub(crate) async fn put( + &mut self, + key: Key, + lsn: Lsn, + value: &Value, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + use utils::bin_ser::BeSer; + let val_ser_size = value.serialized_size().unwrap() as usize; + self.put_batch( + vec![(key.to_compact(), lsn, val_ser_size, value.clone())], + ctx, + ) + .await } pub(crate) async fn delete_batch( diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index b5c577af72..0114473eda 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -27,8 +27,8 @@ use super::TaskStateUpdate; use crate::{ context::RequestContext, metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST}, - task_mgr::TaskKind, - task_mgr::WALRECEIVER_RUNTIME, + pgdatadir_mapping::DatadirModification, + task_mgr::{TaskKind, WALRECEIVER_RUNTIME}, tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo}, walingest::WalIngest, walrecord::DecodedWALRecord, @@ -345,7 +345,10 @@ pub(super) async fn handle_walreceiver_connection( // Commit every ingest_batch_size records. Even if we filtered out // all records, we still need to call commit to advance the LSN. uncommitted_records += 1; - if uncommitted_records >= ingest_batch_size { + if uncommitted_records >= ingest_batch_size + || modification.approx_pending_bytes() + > DatadirModification::MAX_PENDING_BYTES + { WAL_INGEST .records_committed .inc_by(uncommitted_records - filtered_records); diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index f8f37b17e3..568cf62e56 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -78,6 +78,7 @@ where .expect("must not use after we returned an error") } + /// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted. #[cfg_attr(target_os = "macos", allow(dead_code))] pub async fn write_buffered( &mut self,