packed index value

Christian Schwarz
2024-08-21 17:36:05 +00:00
parent 4c115bc9eb
commit e4af31cd7d
8 changed files with 237 additions and 40 deletions
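
The core of this commit is packing the in-memory layer's per-entry index value — `pos`, `len`, and `will_init` — into a single `u64` bitfield instead of a three-field struct. The following is a minimal, self-contained sketch of that packing scheme for orientation; the `pack`/`unpack` helpers and plain shift/mask arithmetic are illustrative only, whereas the committed code uses the `bit_field` crate and wraps the value in `InMemoryLayerIndexValue`. The bit widths follow from the diff: `MAX_SUPPORTED_BLOB_LEN = 0x0fff_ffff` needs 28 bits for `len`, 1 bit holds `will_init`, and the remaining 35 bits hold `pos`.

// Sketch only: illustrates the layout, not the committed implementation.
// Layout (low to high bits): 1 bit will_init | 28 bits len | 35 bits pos.
const LEN_BITS: u32 = 28; // MAX_SUPPORTED_BLOB_LEN == 0x0fff_ffff fits in 28 bits
const POS_BITS: u32 = 64 - 1 - LEN_BITS; // 35 bits remain for `pos`
const _: () = assert!(POS_BITS >= 32); // every u32 `pos` must be representable

fn pack(pos: u32, len: u32, will_init: bool) -> u64 {
    debug_assert!(len <= 0x0fff_ffff);
    (will_init as u64) | ((len as u64) << 1) | ((pos as u64) << (1 + LEN_BITS))
}

fn unpack(v: u64) -> (u32, u32, bool) {
    let will_init = v & 1 != 0;
    let len = ((v >> 1) & 0x0fff_ffff) as u32;
    let pos = (v >> (1 + LEN_BITS)) as u32;
    (pos, len, will_init)
}

fn main() {
    let packed = pack(4096, 8192, true);
    assert_eq!(unpack(packed), (4096, 8192, true));
}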

Cargo.lock

@@ -936,6 +936,12 @@ dependencies = [
"which",
]
[[package]]
name = "bit_field"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61"
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -3687,6 +3693,7 @@ dependencies = [
"async-compression",
"async-stream",
"async-trait",
"bit_field",
"byteorder",
"bytes",
"camino",


@@ -65,6 +65,7 @@ axum = { version = "0.6.20", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.65"
bit_field = "0.10.2"
bstr = "1.0"
byteorder = "1.4"
bytes = "1.0"


@@ -16,6 +16,7 @@ arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bit_field.workspace = true
byteorder.workspace = true
bytes.workspace = true
camino.workspace = true


@@ -845,6 +845,12 @@ impl Tenant {
});
};
// TODO: should also be rejecting tenant conf changes that violate this check.
if let Err(e) = crate::tenant::storage_layer::inmemory_layer::InMemoryLayerIndexValue::does_timeline_should_roll_prevent_failure(tenant_clone.get_checkpoint_distance()) {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
return Ok(());
}
let mut init_order = init_order;
// take the completion because initial tenant loading will complete when all of
// these tasks complete.


@@ -148,7 +148,7 @@ pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and other further uses.
const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;
pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
@@ -326,7 +326,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
} else {
// Write a 4-byte length header
if len > MAX_SUPPORTED_LEN {
if len > MAX_SUPPORTED_BLOB_LEN {
return (
(
io_buf.slice_len(),


@@ -4,6 +4,7 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerIndexValueUnpacked;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::write::Buffer;
@@ -229,34 +230,21 @@ impl EphemeralFile {
buf: &[u8],
will_init: bool,
ctx: &RequestContext,
) -> Result<InMemoryLayerIndexValue, io::Error> {
) -> anyhow::Result<InMemoryLayerIndexValue> {
let pos = self.bytes_written;
let len = u32::try_from(buf.len()).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
anyhow::anyhow!(
"EphemeralFile::write_blob value too large: {}: {e}",
buf.len()
),
)
let index_value = InMemoryLayerIndexValue::new(InMemoryLayerIndexValueUnpacked {
pos: self.bytes_written,
len: buf.len(),
will_init,
})?;
pos.checked_add(len).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"EphemeralFile::write_blob: overflow",
)
})?;
debug_assert_eq!(index_value.unpack().pos, pos);
debug_assert_eq!(index_value.unpack().len as usize, buf.len());
self.buffered_writer
.write_buffered_borrowed(buf, ctx)
.await?;
self.bytes_written += len;
self.bytes_written += index_value.unpack().len; // overflow of `pos + len` was already checked in InMemoryLayerIndexValue::new, also in release mode
Ok(InMemoryLayerIndexValue {
pos,
len,
will_init,
})
Ok(index_value)
}
}
@@ -373,11 +361,12 @@ mod tests {
for i in 0..write_nbytes {
assert_eq!(
index_values[i],
InMemoryLayerIndexValue {
InMemoryLayerIndexValue::new(InMemoryLayerIndexValueUnpacked {
pos: i as u32,
len: 1,
will_init: i % 2 == 0,
}
})
.unwrap()
);
let buf = Vec::with_capacity(1);
let (buf_slice, nread) = file


@@ -12,7 +12,7 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
@@ -38,6 +38,8 @@ use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
mod vectored_dio_read;
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -88,11 +90,133 @@ pub struct InMemoryLayerInner {
resource_units: GlobalResourceUnits,
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct InMemoryLayerIndexValue {
/// Support the same max blob length as blob_io, because ultimately
/// all the InMemoryLayer contents end up being written into a delta layer,
/// using the [`crate::tenant::blob_io`] module.
const MAX_SUPPORTED_BLOB_LEN: usize = crate::tenant::blob_io::MAX_SUPPORTED_BLOB_LEN;
const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
let trailing_ones = MAX_SUPPORTED_BLOB_LEN.trailing_ones() as usize;
let leading_zeroes = MAX_SUPPORTED_BLOB_LEN.leading_zeros() as usize;
assert!(trailing_ones + leading_zeroes == std::mem::size_of::<usize>() * 8);
trailing_ones
};
/// See [`InMemoryLayerInner::index`].
///
/// For space-efficiency, this value is a bitfield.
///
/// Layout:
/// - 1 bit: `will_init`
/// - [`MAX_SUPPORTED_BLOB_LEN_BITS`]: `len`
/// - [`MAX_SUPPORTED_POS_BITS`]: `pos`
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InMemoryLayerIndexValue(u64);
impl InMemoryLayerIndexValue {
/// Derive remaining space for pos.
/// TODO: define and enforce a hard limit at the [`Timeline::should_roll`] level.
/// => see also [`Self::does_timeline_should_roll_prevent_failure`]
const MAX_SUPPORTED_POS_BITS: usize = {
let remainder = 64 - 1 - MAX_SUPPORTED_BLOB_LEN_BITS;
if remainder < 32 {
panic!("pos can be u32 as per type system, support that");
}
remainder
};
// Layout
const WILL_INIT_RANGE: Range<usize> = 0..1;
const LEN_RANGE: Range<usize> =
Self::WILL_INIT_RANGE.end..Self::WILL_INIT_RANGE.end + MAX_SUPPORTED_BLOB_LEN_BITS;
const POS_RANGE: Range<usize> =
Self::LEN_RANGE.end..Self::LEN_RANGE.end + Self::MAX_SUPPORTED_POS_BITS;
const _ASSERT: () = {
if Self::POS_RANGE.end != 64 {
panic!("we don't want undefined bits for our own sanity")
}
};
/// Call this with the checkpoint distance enforced by Timeline::should_roll to check whether
/// [`Self`] can accommodate large enough values.
///
/// TODO: this check should happen much earlier, ideally at the type system level.
/// When cleaning this up, also look into the s3 max file size check that is performed in delta layer writer.
/// See also [`Self::MAX_SUPPORTED_POS_BITS`].
pub(crate) fn does_timeline_should_roll_prevent_failure(
checkpoint_distance: u64,
) -> anyhow::Result<()> {
// keep these checks consistent with Self::new()
let checkpoint_distance = u32::try_from(checkpoint_distance)
.context("checkpoint distance doesn't fit into u32")?;
checkpoint_distance
.checked_add(MAX_SUPPORTED_BLOB_LEN as u32)
.context("checkpoint distane + max supported blob len would not fit")?;
Ok(())
}
/// Checks that the `len` is within the supported range
/// and that `pos + len` fits within a u32.
pub(crate) fn new(unpacked: InMemoryLayerIndexValueUnpacked<usize>) -> anyhow::Result<Self> {
let InMemoryLayerIndexValueUnpacked {
pos,
len,
will_init,
} = unpacked;
if len > MAX_SUPPORTED_BLOB_LEN {
anyhow::bail!(
"len exceeds the maximum supported length: len={len} max={MAX_SUPPORTED_BLOB_LEN}",
);
}
const _: () = {
if MAX_SUPPORTED_BLOB_LEN > u32::MAX as usize {
panic!()
}
};
let len = u32::try_from(len).expect("see const assertion above");
pos.checked_add(len).ok_or_else(|| {
anyhow::anyhow!("pos + len overflows u32, not representable in EphemeralFile")
})?;
let mut data: u64 = 0;
use bit_field::BitField;
data.set_bits(Self::WILL_INIT_RANGE, if will_init { 1 } else { 0 });
data.set_bits(Self::LEN_RANGE, len as u64);
data.set_bits(Self::POS_RANGE, pos as u64);
Ok(Self(data))
}
#[inline]
pub(crate) fn unpack(&self) -> InMemoryLayerIndexValueUnpacked<u32> {
use bit_field::BitField;
InMemoryLayerIndexValueUnpacked {
will_init: self.0.get_bits(Self::WILL_INIT_RANGE) != 0,
len: self.0.get_bits(Self::LEN_RANGE) as u32,
pos: self.0.get_bits(Self::POS_RANGE) as u32,
}
}
}
/// Unpacked representation of the bitfielded [`InMemoryLayerIndexValue`].
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) struct InMemoryLayerIndexValueUnpacked<L> {
pub(crate) will_init: bool,
pub(crate) len: L,
pub(crate) pos: u32,
pub(crate) len: u32,
pub(crate) will_init: bool, // XXX this blows up the size, can we shrink down `len`?
}
impl InMemoryLayerIndexValueUnpacked<u32> {
#[cfg(test)]
pub(crate) fn as_usize(&self) -> InMemoryLayerIndexValueUnpacked<usize> {
InMemoryLayerIndexValueUnpacked {
will_init: self.will_init,
len: self.len as usize,
pos: self.pos,
}
}
}
impl std::fmt::Debug for InMemoryLayerInner {
@@ -302,14 +426,19 @@ impl InMemoryLayer {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, index_value) in slice.iter().rev() {
let InMemoryLayerIndexValueUnpacked {
pos,
len,
will_init,
} = index_value.unpack();
reads.entry(key).or_default().push(ValueRead {
entry_lsn: *entry_lsn,
read: vectored_dio_read::LogicalRead::new(
index_value.pos,
Vec::with_capacity(index_value.len as usize),
pos,
Vec::with_capacity(len as usize),
),
});
if index_value.will_init {
if will_init {
break;
}
}
@@ -594,20 +723,23 @@ impl InMemoryLayer {
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, entry) in vec_map.as_slice() {
let InMemoryLayerIndexValue {
for (lsn, entry) in vec_map
.as_slice()
.iter()
.map(|(lsn, entry)| (lsn, entry.unpack()))
{
let InMemoryLayerIndexValueUnpacked {
pos,
len,
will_init,
} = entry;
let buf =
Bytes::slice(&file_contents, *pos as usize..(*pos + *len) as usize);
let buf = Bytes::slice(&file_contents, pos as usize..(pos + len) as usize);
let (_buf, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),
*lsn,
buf.slice_len(),
*will_init,
will_init,
ctx,
)
.await;
@@ -634,4 +766,63 @@ impl InMemoryLayer {
}
}
mod vectored_dio_read;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_index_value() {
use InMemoryLayerIndexValueUnpacked as Unpacked;
let roundtrip = |input| {
let res = InMemoryLayerIndexValue::new(input).expect("this test expects no errors");
assert_eq!(res.unpack().as_usize(), input);
};
// will_init
roundtrip(Unpacked {
will_init: false,
len: 0,
pos: 0,
});
roundtrip(Unpacked {
will_init: true,
len: 0,
pos: 0,
});
// len
roundtrip(Unpacked {
will_init: false,
len: MAX_SUPPORTED_BLOB_LEN,
pos: 0,
});
let too_large = Unpacked {
will_init: false,
len: MAX_SUPPORTED_BLOB_LEN + 1,
pos: 0,
};
assert!(InMemoryLayerIndexValue::new(too_large).is_err());
// pos
roundtrip(Unpacked {
will_init: false,
len: 0,
pos: {
let max_as_per_supported_bits: usize =
(1 << InMemoryLayerIndexValue::MAX_SUPPORTED_POS_BITS) - 1;
if max_as_per_supported_bits < u32::MAX as usize {
panic!("current implementation has space for `pos` values > u32::MAX")
}
u32::MAX // at the type level, `pos` is a u32, so this is the largest value we can test
},
});
// pos + len
let too_large = Unpacked {
will_init: false,
len: 1,
pos: u32::MAX,
};
assert!(InMemoryLayerIndexValue::new(too_large).is_err());
}
}
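
The removed `XXX this blows up the size, can we shrink down len?` comment on the old struct is the motivation for the packing: with padding, the old three-field value occupies 12 bytes per index entry, while the packed `u64` occupies 8. A small size comparison (illustrative, not part of the diff; `OldIndexValue` is a stand-in name for the previous struct):

// Size comparison motivating the packed representation (sketch, not part of the commit).
#[allow(dead_code)] // fields exist only for the size measurement
struct OldIndexValue {
    pos: u32,
    len: u32,
    will_init: bool,
}

fn main() {
    // 4 + 4 + 1 bytes of data, padded up to the 4-byte alignment of the u32 fields.
    println!("old struct: {} bytes", std::mem::size_of::<OldIndexValue>()); // 12
    println!("packed u64: {} bytes", std::mem::size_of::<u64>()); // 8
}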


@@ -1893,6 +1893,8 @@ impl Timeline {
true
} else if projected_layer_size >= checkpoint_distance {
// NB: The InMemoryLayerIndexValue::does_timeline_should_roll_prevent_failure check relies on us
// rolling a new layer if we reach checkpoint_distance.
info!(
"Will roll layer at {} with layer size {} due to layer size ({})",
projected_lsn, layer_size, projected_layer_size
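
To make the relationship between this roll condition and the startup check concrete, here is a hedged numeric illustration; the 256 MiB checkpoint distance is an assumed example value, not something stated in the diff:

// Assumed example value; the real checkpoint_distance comes from tenant config.
const CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
const MAX_SUPPORTED_BLOB_LEN: u64 = 0x0fff_ffff;

fn main() {
    // Because the open layer is rolled once it reaches CHECKPOINT_DISTANCE, every blob
    // starts at a `pos` below roughly that value, so the largest end offset is bounded by:
    let worst_case_end = CHECKPOINT_DISTANCE + MAX_SUPPORTED_BLOB_LEN;
    // This is the sum that does_timeline_should_roll_prevent_failure verifies at startup.
    assert!(worst_case_end <= u32::MAX as u64);
    println!("worst-case end offset {worst_case_end}, u32::MAX {}", u32::MAX);
}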