diff --git a/Cargo.lock b/Cargo.lock
index d41f362a5a..5ded1ed48a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -936,6 +936,12 @@ dependencies = [
  "which",
 ]
 
+[[package]]
+name = "bit_field"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61"
+
 [[package]]
 name = "bitflags"
 version = "1.3.2"
@@ -3687,6 +3693,7 @@ dependencies = [
  "async-compression",
  "async-stream",
  "async-trait",
+ "bit_field",
  "byteorder",
  "bytes",
  "camino",
diff --git a/Cargo.toml b/Cargo.toml
index e513757e16..5f2cfdb957 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -65,6 +65,7 @@ axum = { version = "0.6.20", features = ["ws"] }
 base64 = "0.13.0"
 bincode = "1.3"
 bindgen = "0.65"
+bit_field = "0.10.2"
 bstr = "1.0"
 byteorder = "1.4"
 bytes = "1.0"
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index cb368a69f0..85c5e24afc 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -16,6 +16,7 @@ arc-swap.workspace = true
 async-compression.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
+bit_field.workspace = true
 byteorder.workspace = true
 bytes.workspace = true
 camino.workspace = true
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 8ab8d08ce1..ec10c9efe8 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -845,6 +845,12 @@ impl Tenant {
                 });
             };
 
+            // TODO: should also be rejecting tenant conf changes that violate this check.
+            if let Err(e) = crate::tenant::storage_layer::inmemory_layer::InMemoryLayerIndexValue::does_timeline_should_roll_prevent_failure(tenant_clone.get_checkpoint_distance()) {
+                make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
+                return Ok(());
+            }
+
             let mut init_order = init_order;
             // take the completion because initial tenant loading will complete when all of
             // these tasks complete.
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index a245c99a88..dd70f6bbff 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -148,7 +148,7 @@ pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
 
 /// The maximum size of blobs we support. The highest few bits
 /// are reserved for compression and other further uses.
-const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
+pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;
 
 pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
 pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
@@ -326,7 +326,7 @@ impl BlobWriter {
             (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
         } else {
             // Write a 4-byte length header
-            if len > MAX_SUPPORTED_LEN {
+            if len > MAX_SUPPORTED_BLOB_LEN {
                 return (
                     (
                         io_buf.slice_len(),
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 46f02e8703..b67a72a7f7 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -4,6 +4,7 @@
 use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::page_cache;
+use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerIndexValueUnpacked;
 use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
 use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
 use crate::virtual_file::owned_buffers_io::write::Buffer;
@@ -229,34 +230,21 @@ impl EphemeralFile {
         buf: &[u8],
         will_init: bool,
         ctx: &RequestContext,
-    ) -> Result<InMemoryLayerIndexValue, io::Error> {
+    ) -> anyhow::Result<InMemoryLayerIndexValue> {
         let pos = self.bytes_written;
 
-        let len = u32::try_from(buf.len()).map_err(|e| {
-            std::io::Error::new(
-                std::io::ErrorKind::Other,
-                anyhow::anyhow!(
-                    "EphemeralFile::write_blob value too large: {}: {e}",
-                    buf.len()
-                ),
-            )
+        let index_value = InMemoryLayerIndexValue::new(InMemoryLayerIndexValueUnpacked {
+            pos: self.bytes_written,
+            len: buf.len(),
+            will_init,
         })?;
-        pos.checked_add(len).ok_or_else(|| {
-            io::Error::new(
-                io::ErrorKind::InvalidInput,
-                "EphemeralFile::write_blob: overflow",
-            )
-        })?;
-
+        debug_assert_eq!(index_value.unpack().pos, pos);
+        debug_assert_eq!(index_value.unpack().len as usize, buf.len());
         self.buffered_writer
             .write_buffered_borrowed(buf, ctx)
             .await?;
-        self.bytes_written += len;
+        self.bytes_written += index_value.unpack().len; // index_value is checked for overflow in release mode
 
-        Ok(InMemoryLayerIndexValue {
-            pos,
-            len,
-            will_init,
-        })
+        Ok(index_value)
     }
 }
 
@@ -373,11 +361,12 @@ mod tests {
         for i in 0..write_nbytes {
            assert_eq!(
                 index_values[i],
-                InMemoryLayerIndexValue {
+                InMemoryLayerIndexValue::new(InMemoryLayerIndexValueUnpacked {
                     pos: i as u32,
                     len: 1,
                     will_init: i % 2 == 0,
-                }
+                })
+                .unwrap()
             );
             let buf = Vec::with_capacity(1);
             let (buf_slice, nread) = file
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 49fa3a9e06..5e11049ee0 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -12,7 +12,7 @@ use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::PageReconstructError;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
 use crate::{l0_flush, page_cache};
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
 use bytes::Bytes;
 use camino::Utf8PathBuf;
 use pageserver_api::key::CompactKey;
@@ -38,6 +38,8 @@ use super::{
     DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
 };
 
+mod vectored_dio_read;
+
 #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
 pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
 
@@ -88,11 +90,133 @@ pub struct InMemoryLayerInner {
     resource_units: GlobalResourceUnits,
 }
 
-#[derive(Debug, PartialEq, Eq)]
-pub(crate) struct InMemoryLayerIndexValue {
+
+/// Support the same max blob length as blob_io, because ultimately
ultimately +/// all the InMemoryLayer contents end up being written into a delta layer, +/// using the [`crate::tenant::blob_io`]. +const MAX_SUPPORTED_BLOB_LEN: usize = crate::tenant::blob_io::MAX_SUPPORTED_BLOB_LEN; +const MAX_SUPPORTED_BLOB_LEN_BITS: usize = { + let trailing_ones = MAX_SUPPORTED_BLOB_LEN.trailing_ones() as usize; + let leading_zeroes = MAX_SUPPORTED_BLOB_LEN.leading_zeros() as usize; + assert!(trailing_ones + leading_zeroes == std::mem::size_of::() * 8); + trailing_ones +}; + +/// See [`InMemoryLayerInner::index`]. +/// +/// For space-efficiency, this value is a bitfield. +/// +/// Layout: +/// - 1 bit: `will_init` +/// - [`MAX_SUPPORTED_BLOB_LEN_BITS`]: `len` +/// - [`MAX_SUPPORTED_POS_BITS`]: `pos` +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct InMemoryLayerIndexValue(u64); + +impl InMemoryLayerIndexValue { + /// Derive remaining space for pos. + /// TODO: define and enforce a hard limit at the [`Timline::should_roll`] level. + /// => see also [`Self::does_timeline_should_roll_prevent_failure`] + const MAX_SUPPORTED_POS_BITS: usize = { + let remainder = 64 - 1 - MAX_SUPPORTED_BLOB_LEN_BITS; + if remainder < 32 { + panic!("pos can be u32 as per type system, support that"); + } + remainder + }; + + // Layout + const WILL_INIT_RANGE: Range = 0..1; + const LEN_RANGE: Range = + Self::WILL_INIT_RANGE.end..Self::WILL_INIT_RANGE.end + MAX_SUPPORTED_BLOB_LEN_BITS; + const POS_RANGE: Range = + Self::LEN_RANGE.end..Self::LEN_RANGE.end + Self::MAX_SUPPORTED_POS_BITS; + const _ASSERT: () = { + if Self::POS_RANGE.end != 64 { + panic!("we don't want undefined bits for our own sanity") + } + }; + + /// Call this with the checkpoint distance enforced by Timeline::should_roll to check whether + /// [`Self`] can accomodate large enough values. + /// + /// TODO: this check should happen much earlier, ideally at the type system level. + /// When cleaning this up, also look into the s3 max file size check that is performed in delta layer writer. + /// See also [`Self::MAX_SUPPORTED_POS_BITS`]. + pub(crate) fn does_timeline_should_roll_prevent_failure( + checkpoint_distance: u64, + ) -> anyhow::Result<()> { + // keep these checks concsistent with Self::new() + let checkpoint_distance = u32::try_from(checkpoint_distance) + .context("checkpoint distance doesn't fit into u32")?; + checkpoint_distance + .checked_add(MAX_SUPPORTED_BLOB_LEN as u32) + .context("checkpoint distane + max supported blob len would not fit")?; + Ok(()) + } + + /// Checks that the `len` is within the supported range + /// and that `pos + len` fits within a u32. 
+    pub(crate) fn new(unpacked: InMemoryLayerIndexValueUnpacked<usize>) -> anyhow::Result<Self> {
+        let InMemoryLayerIndexValueUnpacked {
+            pos,
+            len,
+            will_init,
+        } = unpacked;
+
+        if len > MAX_SUPPORTED_BLOB_LEN {
+            anyhow::bail!(
+                "len exceeds the maximum supported length: len={len} max={MAX_SUPPORTED_BLOB_LEN}",
+            );
+        }
+        const _: () = {
+            if MAX_SUPPORTED_BLOB_LEN > u32::MAX as usize {
+                panic!()
+            }
+        };
+        let len = u32::try_from(len).expect("see const assertion above");
+
+        pos.checked_add(len).ok_or_else(|| {
+            anyhow::anyhow!("pos + len overflows u32, not representable in EphemeralFile")
+        })?;
+
+        let mut data: u64 = 0;
+        use bit_field::BitField;
+        data.set_bits(Self::WILL_INIT_RANGE, if will_init { 1 } else { 0 });
+        data.set_bits(Self::LEN_RANGE, len as u64);
+        data.set_bits(Self::POS_RANGE, pos as u64);
+
+        Ok(Self(data))
+    }
+
+    #[inline]
+    pub(crate) fn unpack(&self) -> InMemoryLayerIndexValueUnpacked<u32> {
+        use bit_field::BitField;
+        InMemoryLayerIndexValueUnpacked {
+            will_init: self.0.get_bits(Self::WILL_INIT_RANGE) != 0,
+            len: self.0.get_bits(Self::LEN_RANGE) as u32,
+            pos: self.0.get_bits(Self::POS_RANGE) as u32,
+        }
+    }
+}
+
+/// Unpacked representation of the bitfielded [`InMemoryLayerIndexValue`].
+#[derive(Clone, Copy, PartialEq, Eq, Debug)]
+pub(crate) struct InMemoryLayerIndexValueUnpacked<L> {
+    pub(crate) will_init: bool,
+    pub(crate) len: L,
     pub(crate) pos: u32,
-    pub(crate) len: u32,
-    pub(crate) will_init: bool, // XXX this blows up the size, can we shrink down `len`?
+}
+
+impl InMemoryLayerIndexValueUnpacked<u32> {
+    #[cfg(test)]
+    pub(crate) fn as_usize(&self) -> InMemoryLayerIndexValueUnpacked<usize> {
+        InMemoryLayerIndexValueUnpacked {
+            will_init: self.will_init,
+            len: self.len as usize,
+            pos: self.pos,
+        }
+    }
 }
 
 impl std::fmt::Debug for InMemoryLayerInner {
@@ -302,14 +426,19 @@ impl InMemoryLayer {
                 let slice = vec_map.slice_range(lsn_range);
                 for (entry_lsn, index_value) in slice.iter().rev() {
+                    let InMemoryLayerIndexValueUnpacked {
+                        pos,
+                        len,
+                        will_init,
+                    } = index_value.unpack();
                     reads.entry(key).or_default().push(ValueRead {
                         entry_lsn: *entry_lsn,
                         read: vectored_dio_read::LogicalRead::new(
-                            index_value.pos,
-                            Vec::with_capacity(index_value.len as usize),
+                            pos,
+                            Vec::with_capacity(len as usize),
                         ),
                     });
-                    if index_value.will_init {
+                    if will_init {
                         break;
                     }
                 }
@@ -594,20 +723,23 @@ impl InMemoryLayer {
         for (key, vec_map) in inner.index.iter() {
             // Write all page versions
-            for (lsn, entry) in vec_map.as_slice() {
-                let InMemoryLayerIndexValue {
+            for (lsn, entry) in vec_map
+                .as_slice()
+                .iter()
+                .map(|(lsn, entry)| (lsn, entry.unpack()))
+            {
+                let InMemoryLayerIndexValueUnpacked {
                     pos,
                     len,
                     will_init,
                 } = entry;
-                let buf =
-                    Bytes::slice(&file_contents, *pos as usize..(*pos + *len) as usize);
+                let buf = Bytes::slice(&file_contents, pos as usize..(pos + len) as usize);
                 let (_buf, res) = delta_layer_writer
                     .put_value_bytes(
                         Key::from_compact(*key),
                         *lsn,
                         buf.slice_len(),
-                        *will_init,
+                        will_init,
                         ctx,
                     )
                     .await;
@@ -634,4 +766,63 @@ impl InMemoryLayer {
     }
 }
 
-mod vectored_dio_read;
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_index_value() {
+        use super::InMemoryLayerIndexValueUnpacked as Unpacked;
+        let roundtrip = |input| {
+            let res = InMemoryLayerIndexValue::new(input).expect("this test expects no errors");
+            assert_eq!(res.unpack().as_usize(), input);
+        };
+
+        // will_init
+        roundtrip(Unpacked {
+            will_init: false,
+            len: 0,
+            pos: 0,
+        });
+        roundtrip(Unpacked {
+            will_init: true,
+            len: 0,
+            pos: 0,
+        });
+
+        // len
+        roundtrip(Unpacked {
+            will_init: false,
+            len: MAX_SUPPORTED_BLOB_LEN,
+            pos: 0,
+        });
+        let too_large = Unpacked {
+            will_init: false,
+            len: MAX_SUPPORTED_BLOB_LEN + 1,
+            pos: 0,
+        };
+        assert!(InMemoryLayerIndexValue::new(too_large).is_err());
+
+        // pos
+        roundtrip(Unpacked {
+            will_init: false,
+            len: 0,
+            pos: {
+                let max_as_per_supported_bits: usize =
+                    (1 << InMemoryLayerIndexValue::MAX_SUPPORTED_POS_BITS) - 1;
+                if max_as_per_supported_bits < u32::MAX as usize {
+                    panic!("current implementation is expected to have space for all u32 `pos` values")
+                }
+                u32::MAX // the bitfield could hold more, but at the type system level `pos` is a u32
+            },
+        });
+
+        // pos + len
+        let too_large = Unpacked {
+            will_init: false,
+            len: 1,
+            pos: u32::MAX,
+        };
+        assert!(InMemoryLayerIndexValue::new(too_large).is_err());
+    }
+}
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 6d920b9a8a..bd44e452ee 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1893,6 +1893,8 @@ impl Timeline {
             true
         } else if projected_layer_size >= checkpoint_distance {
+            // NB: the InMemoryLayerIndexValue::does_timeline_should_roll_prevent_failure check relies on us
+            // rolling a new layer if we reach checkpoint_distance.
             info!(
                 "Will roll layer at {} with layer size {} due to layer size ({})",
                 projected_lsn, layer_size, projected_layer_size
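
For readers unfamiliar with the `bit_field` crate, here is a minimal standalone sketch (not part of the patch) of the packing scheme the new `InMemoryLayerIndexValue` uses: one `u64` holding `will_init` (1 bit), `len`, and `pos`. The bit ranges and the helper names `pack`/`unpack` below are illustrative assumptions; the patch derives the real widths from `MAX_SUPPORTED_BLOB_LEN` at compile time.

```rust
use std::ops::Range;

use bit_field::BitField; // provides get_bits/set_bits on integer types

// Illustrative widths: 0x0fff_ffff needs 28 bits for `len`,
// leaving 64 - 1 - 28 = 35 bits for `pos`.
const WILL_INIT: Range<usize> = 0..1;
const LEN: Range<usize> = 1..29;
const POS: Range<usize> = 29..64;

fn pack(pos: u32, len: u32, will_init: bool) -> u64 {
    let mut data = 0u64;
    data.set_bits(WILL_INIT, u64::from(will_init));
    data.set_bits(LEN, u64::from(len));
    data.set_bits(POS, u64::from(pos));
    data
}

fn unpack(data: u64) -> (u32, u32, bool) {
    (
        data.get_bits(POS) as u32,
        data.get_bits(LEN) as u32,
        data.get_bits(WILL_INIT) != 0,
    )
}

fn main() {
    let entry = pack(4096, 8192, true);
    assert_eq!(unpack(entry), (4096, 8192, true));
    // 8 bytes per index entry, vs. the 12 bytes (after padding, on typical
    // targets) of the previous { pos: u32, len: u32, will_init: bool } struct.
    assert_eq!(std::mem::size_of_val(&entry), 8);
}
```

The motivation matches the removed `XXX this blows up the size` comment: the in-memory index keeps one of these values per (key, lsn) entry, so trimming the struct down to a single `u64` saves padding across potentially millions of entries.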