Merge from main + revise everything to use u64==usize

Christian Schwarz
2024-08-22 12:42:02 +00:00
10 changed files with 579 additions and 370 deletions

View File

@@ -10,6 +10,7 @@ use pageserver::{
page_cache,
repository::Value,
task_mgr::TaskKind,
tenant::storage_layer::inmemory_layer::SerializedBatch,
tenant::storage_layer::InMemoryLayer,
virtual_file,
};
@@ -67,13 +68,16 @@ async fn ingest(
let layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;
let value = Value::Image(Bytes::from(vec![0u8; put_size]));
let data = value.ser()?;
let data = Value::Image(Bytes::from(vec![0u8; put_size]));
let data_ser_size = data.serialized_size().unwrap() as usize;
let ctx = RequestContext::new(
pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
pageserver::context::DownloadBehavior::Download,
);
const BATCH_SIZE: usize = 16;
let mut batch = Vec::new();
for i in 0..put_count {
lsn += put_size as u64;
@@ -96,9 +100,17 @@ async fn ingest(
}
}
layer
.put_value(key.to_compact(), lsn, &data, value.will_init(), &ctx)
.await?;
batch.push((key.to_compact(), lsn, data_ser_size, data.clone()));
if batch.len() >= BATCH_SIZE {
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedBatch::from_values(this_batch);
layer.put_batch(serialized, &ctx).await?;
}
}
if !batch.is_empty() {
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedBatch::from_values(this_batch);
layer.put_batch(serialized, &ctx).await?;
}
layer.freeze(lsn + 1).await;

View File

@@ -15,12 +15,11 @@ use crate::{aux_file, repository::*};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::models::AuxFilePolicy;
@@ -37,7 +36,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
@@ -174,6 +172,7 @@ impl Timeline {
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_directory_entries: Vec::new(),
pending_bytes: 0,
lsn,
}
}
@@ -1022,21 +1021,33 @@ pub struct DatadirModification<'a> {
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
pending_lsns: Vec<Lsn>,
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
pending_updates: HashMap<Key, Vec<(Lsn, usize, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
/// An **approximation** of how large our EphemeralFile write will be when committed.
pending_bytes: usize,
}
impl<'a> DatadirModification<'a> {
// When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
// contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
// additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
/// Get the current lsn
pub(crate) fn get_lsn(&self) -> Lsn {
self.lsn
}
pub(crate) fn approx_pending_bytes(&self) -> usize {
self.pending_bytes
}
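A minimal sketch of how the new accounting is meant to be consumed; the walreceiver change later in this commit performs exactly this check (the commit(&ctx) call is an assumption about the existing DatadirModification API, shown here only for illustration):

    if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
        // Flush what we have so a single commit never serializes more than ~8 MiB.
        modification.commit(&ctx).await?;
    }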
/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
@@ -1769,21 +1780,25 @@ impl<'a> DatadirModification<'a> {
// Flush relation and SLRU data blocks, keep metadata.
let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
for (key, values) in self.pending_updates.drain() {
for (lsn, value) in values {
let mut write_batch = Vec::new();
for (lsn, value_ser_size, value) in values {
if key.is_rel_block_key() || key.is_slru_block_key() {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, lsn, &value, ctx).await?;
write_batch.push((key.to_compact(), lsn, value_ser_size, value));
} else {
retained_pending_updates
.entry(key)
.or_default()
.push((lsn, value));
retained_pending_updates.entry(key).or_default().push((
lsn,
value_ser_size,
value,
));
}
}
writer.put_batch(write_batch, ctx).await?;
}
self.pending_updates = retained_pending_updates;
self.pending_bytes = 0;
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
@@ -1809,17 +1824,20 @@ impl<'a> DatadirModification<'a> {
self.pending_nblocks = 0;
if !self.pending_updates.is_empty() {
// The put_batch call below expects the inputs to be sorted by Lsn,
// so we do that first.
let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
self.pending_updates
.drain()
.map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
.kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
VecMapOrdering::GreaterOrEqual,
);
// Ordering: the items in this batch do not need to be in any global order, but values for
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
// this to do efficient updates to its index.
let batch: Vec<(CompactKey, Lsn, usize, Value)> = self
.pending_updates
.drain()
.flat_map(|(key, values)| {
values.into_iter().map(move |(lsn, val_ser_size, value)| {
(key.to_compact(), lsn, val_ser_size, value)
})
})
.collect::<Vec<_>>();
writer.put_batch(lsn_ordered_batch, ctx).await?;
writer.put_batch(batch, ctx).await?;
}
if !self.pending_deletions.is_empty() {
@@ -1844,6 +1862,8 @@ impl<'a> DatadirModification<'a> {
writer.update_directory_entries_count(kind, count as u64);
}
self.pending_bytes = 0;
Ok(())
}
@@ -1860,7 +1880,7 @@ impl<'a> DatadirModification<'a> {
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some(values) = self.pending_updates.get(&key) {
if let Some((_, value)) = values.last() {
if let Some((_, _, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else {
@@ -1888,13 +1908,17 @@ impl<'a> DatadirModification<'a> {
fn put(&mut self, key: Key, val: Value) {
let values = self.pending_updates.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
if let Some((last_lsn, last_value)) = values.last_mut() {
if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
if *last_lsn == self.lsn {
*last_value_ser_size = val.serialized_size().unwrap() as usize;
*last_value = val;
return;
}
}
values.push((self.lsn, val));
let val_serialized_size = val.serialized_size().unwrap() as usize;
self.pending_bytes += val_serialized_size;
values.push((self.lsn, val_serialized_size, val));
}
fn delete(&mut self, key_range: Range<Key>) {

View File

@@ -4,14 +4,14 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerIndexValueUnpacked;
use crate::tenant::storage_layer::inmemory_layer::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
use anyhow::Context;
use bytes::BytesMut;
use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use tracing::error;
@@ -24,7 +24,7 @@ pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u32,
bytes_written: u64,
buffered_writer: owned_buffers_io::write::BufferedWriter<
BytesMut,
size_tracking_writer::Writer<VirtualFile>,
@@ -33,8 +33,6 @@ pub struct EphemeralFile {
_gate_guard: utils::sync::gate::GateGuard,
}
use super::storage_layer::inmemory_layer::InMemoryLayerIndexValue;
const TAIL_SZ: usize = 64 * 1024;
impl EphemeralFile {
@@ -100,7 +98,7 @@ impl Drop for EphemeralFile {
}
impl EphemeralFile {
pub(crate) fn len(&self) -> u32 {
pub(crate) fn len(&self) -> u64 {
self.bytes_written
}
@@ -141,32 +139,21 @@ impl EphemeralFile {
/// No guarantees are made about the remaining bytes in `dst`, i.e., assume their contents are random.
pub(crate) async fn read_at_to_end<B: IoBufMut + Send>(
&self,
start: u32,
start: u64,
dst: Slice<B>,
ctx: &RequestContext,
) -> std::io::Result<(Slice<B>, usize)> {
let file_size_tracking_writer = self.buffered_writer.as_inner();
let flushed_offset = u32::try_from(file_size_tracking_writer.bytes_written())
.expect("we don't allow writing more than u32::MAX bytes");
let flushed_offset = file_size_tracking_writer.bytes_written();
let buffer = self.buffered_writer.inspect_buffer();
let buffered = &buffer[0..buffer.pending()];
let dst_cap = u32::try_from(dst.bytes_total())
.with_context(|| {
format!(
"read_aligned: dst.bytes_total() is too large: {}",
dst.len()
)
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
let dst_cap = dst.bytes_total().as_u64();
let end = {
let mut end = start
.checked_add(dst_cap)
.with_context(|| {
format!("read_aligned: offset + dst.bytes_total() is too large: {start} + {dst_cap}",)
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
// saturating_add is correct here: the max file size is u64::MAX, so if
// start + dst_cap would overflow u64, the read is short anyway and `end`
// gets clamped to `bytes_written` below.
let mut end: u64 = start.saturating_add(dst_cap);
if end > self.bytes_written {
end = self.bytes_written;
}
@@ -175,11 +162,11 @@ impl EphemeralFile {
// inclusive, exclusive
#[derive(Debug)]
struct Range(u32, u32);
impl Range {
fn len(&self) -> u32 {
struct Range<N>(N, N);
impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
fn len(&self) -> N {
if self.0 > self.1 {
0
N::zero()
} else {
self.1 - self.0
}
@@ -193,7 +180,7 @@ impl EphemeralFile {
let bounds = dst.bounds();
let slice = file
.read_exact_at(
dst.slice(0..written_range.len() as usize),
dst.slice(0..written_range.len().as_usize()),
start as u64,
ctx,
)
@@ -204,14 +191,21 @@ impl EphemeralFile {
};
let dst = if buffered_range.len() > 0 {
let offset_in_buffer =
usize::try_from(buffered_range.0.checked_sub(flushed_offset).unwrap()).unwrap();
let offset_in_buffer = buffered_range
.0
.checked_sub(flushed_offset)
.unwrap()
.as_usize();
let to_copy =
&buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len() as usize)];
&buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().as_usize())];
let bounds = dst.bounds();
let mut view = dst.slice(
written_range.len() as usize
..written_range.len() as usize + buffered_range.len() as usize,
written_range.len().as_usize()
..written_range
.len()
.checked_add(buffered_range.len())
.unwrap()
.as_usize(),
);
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
@@ -222,29 +216,45 @@ impl EphemeralFile {
// TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
Ok((dst, (end - start) as usize))
Ok((dst, (end - start).as_usize()))
}
pub(crate) async fn write_blob(
/// Returns the offset at which the first byte of the input was written, for use
/// in constructing indices over the written value.
///
/// Panics if the write is short because there's no way we can recover from that.
/// TODO: make upstack handle this as an error.
pub(crate) async fn write_raw(
&mut self,
buf: &[u8],
will_init: bool,
srcbuf: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<InMemoryLayerIndexValue> {
) -> std::io::Result<u64> {
let pos = self.bytes_written;
let index_value = InMemoryLayerIndexValue::new(InMemoryLayerIndexValueUnpacked {
pos: self.bytes_written,
len: buf.len(),
will_init,
})?;
debug_assert_eq!(index_value.unpack().pos, pos);
debug_assert_eq!(index_value.unpack().len as usize, buf.len());
self.buffered_writer
.write_buffered_borrowed(buf, ctx)
.await?;
self.bytes_written += index_value.unpack().len; // index_value is checked for overflow in release mode
Ok(index_value)
let new_bytes_written = pos.checked_add(srcbuf.len().as_u64()).ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
srcbuf_len = srcbuf.len(),
),
)
})?;
// Write the payload
let nwritten = self
.buffered_writer
.write_buffered_borrowed(srcbuf, ctx)
.await?;
assert_eq!(
nwritten,
srcbuf.len(),
"buffered writer has no short writes"
);
self.bytes_written = new_bytes_written;
Ok(pos)
}
}
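A minimal usage sketch for write_raw, assuming an async, fallible context where `file` is an open EphemeralFile, `ctx` a RequestContext, and `serialized` a SerializedBatch as introduced later in this commit:

    // Append the pre-serialized batch; the returned offset is where its first byte landed.
    let base_offset = file.write_raw(&serialized.raw, &ctx).await?;
    // base_offset is then added to the batch-relative offsets when building
    // InMemoryLayerIndexValue entries, as InMemoryLayer::put_batch does.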
@@ -348,29 +358,18 @@ mod tests {
.take(write_nbytes)
.collect();
let mut index_values = Vec::new();
let mut value_offsets = Vec::new();
for i in 0..write_nbytes {
let index_value = file
.write_blob(&content[i..i + 1], i % 2 == 0, &ctx)
.await
.unwrap();
index_values.push(index_value);
let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap();
value_offsets.push(off);
}
assert!(file.len() as usize == write_nbytes);
for i in 0..write_nbytes {
assert_eq!(
index_values[i],
InMemoryLayerIndexValue::new(InMemoryLayerIndexValueUnpacked {
pos: i as u32,
len: 1,
will_init: i % 2 == 0,
})
.unwrap()
);
assert_eq!(value_offsets[i], i.as_u64());
let buf = Vec::with_capacity(1);
let (buf_slice, nread) = file
.read_at_to_end(i as u32, buf.slice_full(), &ctx)
.read_at_to_end(i.as_u64(), buf.slice_full(), &ctx)
.await
.unwrap();
let buf = buf_slice.into_inner();

View File

@@ -2,7 +2,7 @@
pub mod delta_layer;
pub mod image_layer;
pub(crate) mod inmemory_layer;
pub mod inmemory_layer;
pub(crate) mod layer;
mod layer_desc;
mod layer_name;

View File

@@ -13,6 +13,7 @@ use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Context, Result};
use assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
@@ -32,12 +33,13 @@ use std::fmt::Write;
use std::ops::Range;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::RwLock;
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
pub(crate) mod assert_u64_eq_usize;
mod vectored_dio_read;
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
@@ -124,6 +126,7 @@ impl InMemoryLayerIndexValue {
}
remainder
};
const MAX_SUPPORTED_POS: usize = (1 << Self::MAX_SUPPORTED_POS_BITS) - 1;
// Layout
const WILL_INIT_RANGE: Range<usize> = 0..1;
@@ -157,66 +160,67 @@ impl InMemoryLayerIndexValue {
/// Checks that the `len` is within the supported range
/// and that `pos + len` fits within a u32.
pub(crate) fn new(unpacked: InMemoryLayerIndexValueUnpacked<usize>) -> anyhow::Result<Self> {
let InMemoryLayerIndexValueUnpacked {
pos,
#[inline(always)]
fn new(arg: InMemoryLayerIndexValueNewArgs) -> anyhow::Result<Self> {
let InMemoryLayerIndexValueNewArgs {
base_offset,
batch_offset,
len,
will_init,
} = unpacked;
} = arg;
let pos = base_offset
.checked_add(batch_offset)
.ok_or_else(|| anyhow::anyhow!("base_offset + batch_offset overflows u64: base_offset={base_offset} batch_offset={batch_offset}"))?;
if pos.as_usize() > Self::MAX_SUPPORTED_POS {
anyhow::bail!(
"base_offset+batch_offset exceeds the maximum supported value: base_offset={base_offset} batch_offset={batch_offset} (+)={pos} max={max}",
max = Self::MAX_SUPPORTED_POS
);
}
if len > MAX_SUPPORTED_BLOB_LEN {
anyhow::bail!(
"len exceeds the maximum supported length: len={len} max={MAX_SUPPORTED_BLOB_LEN}",
);
}
const _: () = {
if MAX_SUPPORTED_BLOB_LEN > u32::MAX as usize {
panic!()
}
};
let len = u32::try_from(len).expect("see const assertion above");
pos.checked_add(len).ok_or_else(|| {
anyhow::anyhow!("pos + len overflows u32, not representable in EphemeralFile")
})?;
let mut data: u64 = 0;
use bit_field::BitField;
data.set_bits(Self::WILL_INIT_RANGE, if will_init { 1 } else { 0 });
data.set_bits(Self::LEN_RANGE, len as u64);
data.set_bits(Self::POS_RANGE, pos as u64);
data.set_bits(Self::LEN_RANGE, len.as_u64());
data.set_bits(Self::POS_RANGE, pos);
Ok(Self(data))
}
#[inline]
pub(crate) fn unpack(&self) -> InMemoryLayerIndexValueUnpacked<u32> {
#[inline(always)]
fn unpack(&self) -> InMemoryLayerIndexValueUnpacked {
use bit_field::BitField;
InMemoryLayerIndexValueUnpacked {
will_init: self.0.get_bits(Self::WILL_INIT_RANGE) != 0,
len: self.0.get_bits(Self::LEN_RANGE) as u32,
pos: self.0.get_bits(Self::POS_RANGE) as u32,
len: self.0.get_bits(Self::LEN_RANGE),
pos: self.0.get_bits(Self::POS_RANGE),
}
}
}
/// Args to [`InMemoryLayerIndexValue::new`].
#[derive(Clone, Copy)]
struct InMemoryLayerIndexValueNewArgs {
base_offset: u64,
batch_offset: u64,
len: usize,
will_init: bool,
}
/// Unpacked representation of the bitfielded [`InMemoryLayerIndexValue`].
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) struct InMemoryLayerIndexValueUnpacked<L> {
pub(crate) will_init: bool,
pub(crate) len: L,
pub(crate) pos: u32,
}
impl InMemoryLayerIndexValueUnpacked<u32> {
#[cfg(test)]
pub(crate) fn as_usize(&self) -> InMemoryLayerIndexValueUnpacked<usize> {
InMemoryLayerIndexValueUnpacked {
will_init: self.will_init,
len: self.len as usize,
pos: self.pos,
}
}
struct InMemoryLayerIndexValueUnpacked {
will_init: bool,
len: u64,
pos: u64,
}
impl std::fmt::Debug for InMemoryLayerInner {
@@ -359,7 +363,7 @@ impl InMemoryLayer {
}
}
pub(crate) fn try_len(&self) -> Option<u32> {
pub(crate) fn try_len(&self) -> Option<u64> {
self.inner.try_read().map(|i| i.file.len()).ok()
}
@@ -446,6 +450,16 @@ impl InMemoryLayer {
}
// Execute the reads.
impl vectored_dio_read::File for EphemeralFile {
async fn read_at_to_end<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
&'b self,
start: u64,
dst: tokio_epoll_uring::Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
EphemeralFile::read_at_to_end(self, start, dst, ctx).await
}
}
let f = vectored_dio_read::execute(
&inner.file,
reads
@@ -491,14 +505,65 @@ impl InMemoryLayer {
}
}
impl vectored_dio_read::File for EphemeralFile {
async fn read_at_to_end<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
&'b self,
start: u32,
dst: tokio_epoll_uring::Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
EphemeralFile::read_at_to_end(self, start, dst, ctx).await
/// Offset of a particular Value within a serialized batch.
struct SerializedBatchOffset {
key: CompactKey,
lsn: Lsn,
// TODO: separate type when we start serde-serializing this value, to avoid coupling
// in-memory representation to serialization format.
value: InMemoryLayerIndexValue,
}
pub struct SerializedBatch {
/// Blobs serialized in EphemeralFile's native format, ready for passing to [`EphemeralFile::write_raw`].
pub(crate) raw: Vec<u8>,
/// Index of values in [`Self::raw`], using offsets relative to the start of the buffer.
offsets: Vec<SerializedBatchOffset>,
/// The highest LSN of any value in the batch
pub(crate) max_lsn: Lsn,
}
impl SerializedBatch {
pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
// Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
// [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
let buffer_size = batch.iter().map(|i| i.2).sum::<usize>() + 4 * batch.len();
let mut cursor = std::io::Cursor::new(Vec::<u8>::with_capacity(buffer_size));
let mut offsets: Vec<SerializedBatchOffset> = Vec::with_capacity(batch.len());
let mut max_lsn: Lsn = Lsn(0);
for (key, lsn, val_ser_size, val) in batch {
let relative_off = cursor.position();
val.ser_into(&mut cursor)
.expect("Writing into in-memory buffer is infallible");
offsets.push(SerializedBatchOffset {
key,
lsn,
value: InMemoryLayerIndexValue::new(InMemoryLayerIndexValueNewArgs {
base_offset: 0,
batch_offset: relative_off,
len: val_ser_size,
will_init: val.will_init(),
})
.expect("higher-level code ensures that values are within supported ranges"),
});
max_lsn = std::cmp::max(max_lsn, lsn);
}
let buffer = cursor.into_inner();
// Assert that we didn't do any extra allocations while building the buffer.
debug_assert!(buffer.len() <= buffer_size);
Self {
raw: buffer,
offsets,
max_lsn,
}
}
}
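For reference, a sketch of the end-to-end batched write path this commit introduces, mirroring the ingest benchmark hunk above (`key`, `lsn`, `layer`, and `ctx` are assumed from that context):

    // Serialize one value, record its serialized size, and submit it as a batch.
    let value = Value::Image(Bytes::from(vec![0u8; 8192]));
    let val_ser_size = value.serialized_size().unwrap() as usize;
    let batch = vec![(key.to_compact(), lsn, val_ser_size, value)];
    let serialized = SerializedBatch::from_values(batch);
    layer.put_batch(serialized, &ctx).await?;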
@@ -562,55 +627,66 @@ impl InMemoryLayer {
})
}
// Write operations
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub async fn put_value(
/// Write path.
///
/// Errors are not retryable; the [`InMemoryLayer`] must be discarded and must not be read from.
/// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
/// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
pub async fn put_batch(
&self,
key: CompactKey,
lsn: Lsn,
buf: &[u8],
will_init: bool,
serialized_batch: SerializedBatch,
ctx: &RequestContext,
) -> Result<()> {
) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
self.put_value_locked(&mut inner, key, lsn, buf, will_init, ctx)
.await
}
async fn put_value_locked(
&self,
locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
key: CompactKey,
lsn: Lsn,
buf: &[u8],
will_init: bool,
ctx: &RequestContext,
) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
let base_offset = inner.file.len();
let entry = locked_inner
.file
.write_blob(
buf,
will_init,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build(),
)
.await?;
// Add the base_offset to the batch's index values which are relative to the batch start.
let index_values: Vec<SerializedBatchOffset> = serialized_batch
.offsets
.into_iter()
.map(|SerializedBatchOffset { key, lsn, value }| {
let InMemoryLayerIndexValueUnpacked {
will_init,
len,
pos,
} = value.unpack();
anyhow::Ok(SerializedBatchOffset {
key,
lsn,
value: InMemoryLayerIndexValue::new(InMemoryLayerIndexValueNewArgs {
base_offset,
batch_offset: pos,
len: len.as_usize(),
will_init,
})?,
})
})
.collect::<anyhow::Result<Vec<_>>>()?;
let vec_map = locked_inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, entry).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
// Write the batch to the file
inner.file.write_raw(&serialized_batch.raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
.checked_add(serialized_batch.raw.len().as_u64())
// write_raw would error if we were to overflow u64.
// Also, InMemoryLayerIndexValue and higher levels in
// the code don't allow the file to grow that large.
.unwrap();
assert_eq!(new_size, expected_new_len);
// Update the index with the new values
for SerializedBatchOffset { key, lsn, value } in index_values {
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, value).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
}
}
let size = locked_inner.file.len();
locked_inner.resource_units.maybe_publish_size(size as u64);
inner.resource_units.maybe_publish_size(new_size);
Ok(())
}
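A test-style sketch of the rebasing performed above, with illustrative numbers only (these are module-private types, so this merely mirrors what test_index_value below exercises):

    let v = InMemoryLayerIndexValue::new(InMemoryLayerIndexValueNewArgs {
        base_offset: 4096, // inner.file.len() at the time put_batch runs
        batch_offset: 100, // where the value sits inside SerializedBatch::raw
        len: 1,
        will_init: false,
    })
    .unwrap();
    // The packed pos is the absolute file offset of the value.
    assert_eq!(v.unpack().pos, 4096 + 100);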
@@ -772,57 +848,127 @@ mod tests {
#[test]
fn test_index_value() {
const MAX_SUPPORTED_POS: usize = InMemoryLayerIndexValue::MAX_SUPPORTED_POS;
use InMemoryLayerIndexValueNewArgs as Args;
use InMemoryLayerIndexValueUnpacked as Unpacked;
let roundtrip = |input| {
let res = InMemoryLayerIndexValue::new(input).expect("this tests expects no errors");
assert_eq!(res.unpack().as_usize(), input);
let roundtrip = |args, expect: Unpacked| {
let res = InMemoryLayerIndexValue::new(args).expect("this test expects no errors");
let InMemoryLayerIndexValueUnpacked {
will_init,
len,
pos,
} = res.unpack();
assert_eq!(will_init, expect.will_init);
assert_eq!(len, expect.len);
assert_eq!(pos, expect.pos);
};
// will_init
roundtrip(Unpacked {
will_init: false,
len: 0,
pos: 0,
});
roundtrip(Unpacked {
will_init: true,
len: 0,
pos: 0,
});
// basic roundtrip
for pos in [0, MAX_SUPPORTED_POS] {
for len in [0, MAX_SUPPORTED_BLOB_LEN] {
for will_init in [true, false] {
let expect = Unpacked {
will_init,
len: len.as_u64(),
pos: pos.as_u64(),
};
roundtrip(
Args {
will_init,
base_offset: pos.as_u64(),
batch_offset: 0,
len,
},
expect,
);
roundtrip(
Args {
will_init,
base_offset: 0,
batch_offset: pos.as_u64(),
len,
},
expect,
);
}
}
}
// len
roundtrip(Unpacked {
will_init: false,
len: MAX_SUPPORTED_BLOB_LEN,
pos: 0,
});
let too_large = Unpacked {
// too-large len
let too_large = Args {
will_init: false,
len: MAX_SUPPORTED_BLOB_LEN + 1,
pos: 0,
base_offset: 0,
batch_offset: 0,
};
assert!(InMemoryLayerIndexValue::new(too_large).is_err());
// pos
roundtrip(Unpacked {
will_init: false,
len: 0,
pos: {
let max_as_per_supported_bits: usize =
(1 << InMemoryLayerIndexValue::MAX_SUPPORTED_POS_BITS) - 1;
if max_as_per_supported_bits < u32::MAX as usize {
panic!("current implementation has space for `pos` values > u32::MAX")
}
u32::MAX // but at the type system level, we enforce u32::MAX
},
});
// too-large pos
{
let too_large = Args {
will_init: false,
len: 0,
base_offset: MAX_SUPPORTED_POS.as_u64() + 1,
batch_offset: 0,
};
assert!(InMemoryLayerIndexValue::new(too_large).is_err());
let too_large = Args {
will_init: false,
len: 0,
base_offset: 0,
batch_offset: MAX_SUPPORTED_POS.as_u64() + 1,
};
assert!(InMemoryLayerIndexValue::new(too_large).is_err());
}
// pos + len
let too_large = Unpacked {
will_init: false,
len: 1,
pos: u32::MAX,
};
assert!(InMemoryLayerIndexValue::new(too_large).is_err());
// too large (base_offset + batch_offset)
{
let too_large = Args {
will_init: false,
len: 0,
base_offset: MAX_SUPPORTED_POS.as_u64(),
batch_offset: 1,
};
assert!(InMemoryLayerIndexValue::new(too_large).is_err());
let too_large = Args {
will_init: false,
len: 0,
base_offset: MAX_SUPPORTED_POS.as_u64() - 1,
batch_offset: MAX_SUPPORTED_POS.as_u64() - 1,
};
assert!(InMemoryLayerIndexValue::new(too_large).is_err());
}
// valid special cases
// - area past the max supported pos that is accessible by len
for len in [1, MAX_SUPPORTED_BLOB_LEN] {
roundtrip(
Args {
will_init: false,
len,
base_offset: MAX_SUPPORTED_POS.as_u64(),
batch_offset: 0,
},
Unpacked {
will_init: false,
len: len as u64,
pos: MAX_SUPPORTED_POS.as_u64(),
},
);
roundtrip(
Args {
will_init: false,
len,
base_offset: 0,
batch_offset: MAX_SUPPORTED_POS.as_u64(),
},
Unpacked {
will_init: false,
len: len as u64,
pos: MAX_SUPPORTED_POS.as_u64(),
},
);
}
}
}

View File

@@ -0,0 +1,29 @@
pub(crate) const _ASSERT_U64_EQ_USIZE: () = {
if std::mem::size_of::<usize>() != std::mem::size_of::<u64>() {
panic!("the traits defined in this module assume that usize and u64 can be converted to each other without loss of information");
}
};
pub(crate) trait U64IsUsize {
fn as_usize(self) -> usize;
}
impl U64IsUsize for u64 {
#[inline(always)]
fn as_usize(self) -> usize {
let _ = _ASSERT_U64_EQ_USIZE;
self as usize
}
}
pub(crate) trait UsizeIsU64 {
fn as_u64(self) -> u64;
}
impl UsizeIsU64 for usize {
#[inline(always)]
fn as_u64(self) -> u64 {
let _ = _ASSERT_U64_EQ_USIZE;
self as u64
}
}
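A small usage sketch for these helper traits, which replace the ad-hoc `as` and `try_from(..).unwrap()` casts removed elsewhere in this commit:

    use crate::tenant::storage_layer::inmemory_layer::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};

    let n: u64 = 4096;
    let idx: usize = n.as_usize(); // lossless on 64-bit targets, enforced by the const assert
    assert_eq!(idx.as_u64(), n);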

View File

@@ -6,7 +6,10 @@ use std::{
use itertools::Itertools;
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use crate::context::RequestContext;
use crate::{
context::RequestContext,
tenant::storage_layer::inmemory_layer::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64},
};
mod sealed {
pub trait Sealed {}
@@ -16,7 +19,7 @@ mod sealed {
pub trait File: Send {
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u32,
start: u64,
dst: Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(Slice<B>, usize)>;
@@ -24,7 +27,7 @@ pub trait File: Send {
/// A logical read from [`File`]. See [`Self::new`].
pub struct LogicalRead<B: Buffer> {
pos: u32,
pos: u64,
state: RwLockRefCell<LogicalReadState<B>>,
}
@@ -38,7 +41,7 @@ enum LogicalReadState<B: Buffer> {
impl<B: Buffer> LogicalRead<B> {
/// Create a new [`LogicalRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`.
pub fn new(pos: u32, buf: B) -> Self {
pub fn new(pos: u64, buf: B) -> Self {
Self {
pos,
state: RwLockRefCell::new(LogicalReadState::NotStarted(buf)),
@@ -100,11 +103,11 @@ where
let (reads, assert_logical_reads): (_, Option<Vec<&'a LogicalRead<B>>>) = (reads, None);
// Plan which parts of which chunks need to be appended to which buffer
let mut by_chunk: BTreeMap<u32, Vec<Interest<B>>> = BTreeMap::new();
let mut by_chunk: BTreeMap<u64, Vec<Interest<B>>> = BTreeMap::new();
struct Interest<'a, B: Buffer> {
logical_read: &'a LogicalRead<B>,
offset_in_chunk: u32,
len: u32,
offset_in_chunk: u64,
len: u64,
}
for logical_read in reads {
let LogicalRead { pos, state } = logical_read;
@@ -129,14 +132,14 @@ where
// plan which chunks we need to read from
let mut remaining = req_len;
let mut chunk_no = *pos / (DIO_CHUNK_SIZE as u32);
let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap();
let mut chunk_no = *pos / (DIO_CHUNK_SIZE.as_u64());
let mut offset_in_chunk = pos.as_usize() % DIO_CHUNK_SIZE;
while remaining > 0 {
let remaining_in_chunk = std::cmp::min(remaining, DIO_CHUNK_SIZE - offset_in_chunk);
by_chunk.entry(chunk_no).or_default().push(Interest {
logical_read,
offset_in_chunk: offset_in_chunk as u32,
len: remaining_in_chunk as u32,
offset_in_chunk: offset_in_chunk.as_u64(),
len: remaining_in_chunk.as_u64(),
});
offset_in_chunk = 0;
chunk_no += 1;
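A worked example of the chunk-planning arithmetic above, assuming DIO_CHUNK_SIZE = 512 purely for illustration (the real constant is defined elsewhere in this module):

    let pos: u64 = 1000;
    assert_eq!(pos / 512, 1);              // chunk_no: chunk 1 covers bytes 512..1024
    assert_eq!((pos % 512) as usize, 488); // offset_in_chunk
    // A 600-byte logical read starting at pos takes 24 bytes from chunk 1,
    // 512 bytes from chunk 2, and the final 64 bytes from chunk 3.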
@@ -149,20 +152,20 @@ where
// However, we can merge adjacent chunks into batches of MAX_CHUNK_BATCH_SIZE
// so we issue fewer IOs = fewer roundtrips = lower overall latency.
struct PhysicalRead<'a, B: Buffer> {
start_chunk_no: u32,
nchunks: u32,
start_chunk_no: u64,
nchunks: usize,
dsts: Vec<MergedInterest<'a, B>>,
}
struct MergedInterest<'a, B: Buffer> {
logical_read: &'a LogicalRead<B>,
offset_in_physical_read: u32,
len: u32,
offset_in_physical_read: u64,
len: u64,
}
let mut physical_reads: Vec<PhysicalRead<B>> = Vec::new();
let mut by_chunk = by_chunk.into_iter().peekable();
loop {
let mut last_chunk_no = None;
let to_merge: Vec<(u32, Vec<Interest<B>>)> = by_chunk
let to_merge: Vec<(u64, Vec<Interest<B>>)> = by_chunk
.peeking_take_while(|(chunk_no, _)| {
if let Some(last_chunk_no) = last_chunk_no {
if *chunk_no != last_chunk_no + 1 {
@@ -177,7 +180,7 @@ where
let Some(start_chunk_no) = to_merge.first().map(|(chunk_no, _)| *chunk_no) else {
break;
};
let nchunks = to_merge.len() as u32;
let nchunks = to_merge.len();
let dsts = to_merge
.into_iter()
.enumerate()
@@ -190,7 +193,10 @@ where
}| {
MergedInterest {
logical_read,
offset_in_physical_read: i as u32 * DIO_CHUNK_SIZE as u32
offset_in_physical_read: i
.checked_mul(DIO_CHUNK_SIZE)
.unwrap()
.as_u64()
+ offset_in_chunk,
len,
}
@@ -208,7 +214,7 @@ where
// Execute physical reads and fill the logical read buffers
// TODO: prefetch
let get_io_buffer = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE));
let get_io_buffer = |nchunks| Vec::with_capacity(nchunks * DIO_CHUNK_SIZE);
for PhysicalRead {
start_chunk_no,
nchunks,
@@ -221,13 +227,12 @@ where
if all_done {
continue;
}
let read_offset = start_chunk_no * DIO_CHUNK_SIZE as u32;
let read_offset = start_chunk_no
.checked_mul(DIO_CHUNK_SIZE.as_u64())
.expect("we produce chunk_nos by dividing by DIO_CHUNK_SIZE earlier");
let io_buf = get_io_buffer(nchunks).slice_full();
let req_len = io_buf.len();
let (io_buf_slice, nread) = match file
.read_at_to_end(start_chunk_no * DIO_CHUNK_SIZE as u32, io_buf, ctx)
.await
{
let (io_buf_slice, nread) = match file.read_at_to_end(read_offset, io_buf, ctx).await {
Ok(t) => t,
Err(e) => {
let e = Arc::new(e);
@@ -431,7 +436,7 @@ mod tests {
.collect(),
}
}
fn test_logical_read(&self, pos: u32, len: usize) -> TestLogicalRead {
fn test_logical_read(&self, pos: u64, len: usize) -> TestLogicalRead {
let expected_result = if pos as usize + len > self.content.len() {
Err("InMemoryFile short read".to_string())
} else {
@@ -467,7 +472,7 @@ mod tests {
impl File for InMemoryFile {
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u32,
start: u64,
mut dst: Slice<B>,
_ctx: &'a RequestContext,
) -> std::io::Result<(Slice<B>, usize)> {
@@ -490,13 +495,13 @@ mod tests {
#[derive(Clone)]
struct TestLogicalRead {
pos: u32,
pos: u64,
len: usize,
expected_result: Result<Vec<u8>, String>,
}
impl TestLogicalRead {
fn new(pos: u32, len: usize, expected_result: Result<Vec<u8>, String>) -> Self {
fn new(pos: u64, len: usize, expected_result: Result<Vec<u8>, String>) -> Self {
Self {
pos,
len,
@@ -535,7 +540,7 @@ mod tests {
async fn test_blackbox() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let cs = DIO_CHUNK_SIZE;
let cs_u32 = u32::try_from(cs).unwrap();
let cs_u64 = cs.as_u64();
let file = InMemoryFile::new_random(10 * cs);
@@ -545,12 +550,12 @@ mod tests {
file.test_logical_read(1, 2),
// gap
// spans adjacent chunks
file.test_logical_read(cs_u32 - 1, 2),
file.test_logical_read(cs_u64 - 1, 2),
// gap
// tail of chunk 3, all of chunk 4, and 2 bytes of chunk 5
file.test_logical_read(3 * cs_u32 - 1, cs + 2),
file.test_logical_read(3 * cs_u64 - 1, cs + 2),
// gap
file.test_logical_read(5 * cs_u32, 1),
file.test_logical_read(5 * cs_u64, 1),
];
let num_test_logical_reads = test_logical_reads.len();
let test_logical_reads_perms = test_logical_reads
@@ -581,7 +586,7 @@ mod tests {
}
struct RecordedRead {
pos: u32,
pos: u64,
req_len: usize,
res: Vec<u8>,
}
@@ -598,7 +603,7 @@ mod tests {
impl<'x> File for RecorderFile<'x> {
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u32,
start: u64,
dst: Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(Slice<B>, usize)> {
@@ -618,8 +623,8 @@ mod tests {
let file = InMemoryFile::new_random(2 * DIO_CHUNK_SIZE);
let a = file.test_logical_read(DIO_CHUNK_SIZE as u32, 10);
let b = file.test_logical_read(DIO_CHUNK_SIZE as u32 + 30, 20);
let a = file.test_logical_read(DIO_CHUNK_SIZE.as_u64(), 10);
let b = file.test_logical_read(DIO_CHUNK_SIZE.as_u64() + 30, 20);
let recorder = RecorderFile::new(&file);
@@ -628,7 +633,7 @@ mod tests {
let recorded = recorder.recorded.borrow();
assert_eq!(recorded.len(), 1);
let RecordedRead { pos, req_len, .. } = &recorded[0];
assert_eq!(*pos, DIO_CHUNK_SIZE as u32);
assert_eq!(*pos, DIO_CHUNK_SIZE.as_u64());
assert_eq!(*req_len, DIO_CHUNK_SIZE);
}
@@ -644,7 +649,7 @@ mod tests {
let mut test_logical_reads = Vec::new();
for i in 3..3 + MAX_CHUNK_BATCH_SIZE + MAX_CHUNK_BATCH_SIZE / 2 {
test_logical_reads
.push(file.test_logical_read(i as u32 * DIO_CHUNK_SIZE as u32 + 10, 1));
.push(file.test_logical_read(i.as_u64() * DIO_CHUNK_SIZE.as_u64() + 10, 1));
}
let recorder = RecorderFile::new(&file);
@@ -673,7 +678,7 @@ mod tests {
let file = InMemoryFile::new_random(3 * DIO_CHUNK_SIZE);
let a = file.test_logical_read(0, 1); // chunk 0
let b = file.test_logical_read(2 * DIO_CHUNK_SIZE as u32, 1); // chunk 2
let b = file.test_logical_read(2 * DIO_CHUNK_SIZE.as_u64(), 1); // chunk 2
let recorder = RecorderFile::new(&file);
@@ -689,13 +694,13 @@ mod tests {
}
{
let RecordedRead { pos, req_len, .. } = &recorded[1];
assert_eq!(*pos, 2 * DIO_CHUNK_SIZE as u32);
assert_eq!(*pos, 2 * DIO_CHUNK_SIZE.as_u64());
assert_eq!(*req_len, DIO_CHUNK_SIZE);
}
}
struct ExpectedRead {
expect_pos: u32,
expect_pos: u64,
expect_len: usize,
respond: Result<Vec<u8>, String>,
}
@@ -728,7 +733,7 @@ mod tests {
impl File for MockFile {
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u32,
start: u64,
mut dst: Slice<B>,
_ctx: &'a RequestContext,
) -> std::io::Result<(Slice<B>, usize)> {
@@ -809,19 +814,19 @@ mod tests {
let test_logical_reads = vec![
// read spanning two batches
TestLogicalRead::new(
DIO_CHUNK_SIZE as u32 / 2,
DIO_CHUNK_SIZE.as_u64() / 2,
MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE,
Err("foo".to_owned()),
),
// second read in failing chunk
TestLogicalRead::new(
(MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE) as u32 + DIO_CHUNK_SIZE as u32 - 10,
(MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE).as_u64() + DIO_CHUNK_SIZE.as_u64() - 10,
5,
Err("foo".to_owned()),
),
// read unaffected
TestLogicalRead::new(
(MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE) as u32 + 2 * DIO_CHUNK_SIZE as u32 + 10,
(MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE).as_u64() + 2 * DIO_CHUNK_SIZE.as_u64() + 10,
5,
Ok(vec![1; 5]),
),
@@ -832,8 +837,8 @@ mod tests {
for test_logical_reads in test_logical_read_perms {
let file = mock_file!(
0, MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE => Ok(vec![0; MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE]),
(MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE) as u32, DIO_CHUNK_SIZE => Err("foo".to_owned()),
(MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE + 2*DIO_CHUNK_SIZE) as u32, DIO_CHUNK_SIZE => Ok(vec![1; DIO_CHUNK_SIZE]),
(MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE).as_u64(), DIO_CHUNK_SIZE => Err("foo".to_owned()),
(MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE + 2*DIO_CHUNK_SIZE).as_u64(), DIO_CHUNK_SIZE => Ok(vec![1; DIO_CHUNK_SIZE]),
);
execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await;
}
@@ -842,12 +847,12 @@ mod tests {
struct TestShortReadsSetup {
ctx: RequestContext,
file: InMemoryFile,
written: u32,
written: u64,
}
fn setup_short_chunk_read_tests() -> TestShortReadsSetup {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
assert!(DIO_CHUNK_SIZE > 20, "test assumption");
let written = (2 * DIO_CHUNK_SIZE - 10) as u32;
let written = (2 * DIO_CHUNK_SIZE - 10).as_u64();
let file = InMemoryFile::new_random(written as usize);
TestShortReadsSetup { ctx, file, written }
}
@@ -869,7 +874,7 @@ mod tests {
let recorded = recorder.recorded.borrow();
assert_eq!(recorded.len(), 1);
let RecordedRead { pos, req_len, res } = &recorded[0];
assert_eq!(*pos, DIO_CHUNK_SIZE as u32);
assert_eq!(*pos, DIO_CHUNK_SIZE.as_u64());
assert_eq!(*req_len, DIO_CHUNK_SIZE);
assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]);
}
@@ -882,10 +887,16 @@ mod tests {
// the logical reads end in the unwritten range.
//
// All should fail with UnexpectedEof and have the same IO pattern.
async fn the_impl(offset_delta: i32) {
async fn the_impl(offset_delta: i64) {
let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests();
let offset = (written as i32 + offset_delta) as u32;
let offset = u64::try_from(
i64::try_from(written)
.unwrap()
.checked_add(offset_delta)
.unwrap(),
)
.unwrap();
let a = file.test_logical_read(offset, 5);
let recorder = RecorderFile::new(&file);
let a_vr = a.make_logical_read();
@@ -900,7 +911,7 @@ mod tests {
let recorded = recorder.recorded.borrow();
assert_eq!(recorded.len(), 1);
let RecordedRead { pos, req_len, res } = &recorded[0];
assert_eq!(*pos, DIO_CHUNK_SIZE as u32);
assert_eq!(*pos, DIO_CHUNK_SIZE.as_u64());
assert_eq!(*req_len, DIO_CHUNK_SIZE);
assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]);
}

View File

@@ -22,8 +22,8 @@ use handle::ShardTimelineId;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{
KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
NON_INHERITED_SPARSE_RANGE,
CompactKey, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
@@ -44,10 +44,8 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{
bin_ser::BeSer,
fs_ext, pausable_failpoint,
sync::gate::{Gate, GateGuard},
vec_map::VecMap,
};
use std::pin::pin;
@@ -137,7 +135,10 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized};
use super::{
config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
upload_queue::NotInitialized,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
@@ -1463,7 +1464,6 @@ impl Timeline {
tracing::warn!("Lock conflict while reading size of open layer");
return;
};
let current_size = current_size as u64;
let current_lsn = self.get_last_record_lsn();
@@ -3592,34 +3592,6 @@ impl Timeline {
return Err(FlushLayerError::Cancelled);
}
// FIXME(auxfilesv2): support multiple metadata key partitions might need initdb support as well?
// This code path will not be hit during regression tests. After #7099 we have a single partition
// with two key ranges. If someone wants to fix initdb optimization in the future, this might need
// to be fixed.
// For metadata, always create delta layers.
let delta_layer = if !metadata_partition.parts.is_empty() {
assert_eq!(
metadata_partition.parts.len(),
1,
"currently sparse keyspace should only contain a single metadata keyspace"
);
let metadata_keyspace = &metadata_partition.parts[0];
self.create_delta_layer(
&frozen_layer,
Some(
metadata_keyspace.0.ranges.first().unwrap().start
..metadata_keyspace.0.ranges.last().unwrap().end,
),
ctx,
)
.await
.map_err(|e| FlushLayerError::from_anyhow(self, e))?
} else {
None
};
// For image layers, we add them immediately into the layer map.
let mut layers_to_upload = Vec::new();
layers_to_upload.extend(
self.create_image_layers(
@@ -3630,13 +3602,27 @@ impl Timeline {
)
.await?,
);
if let Some(delta_layer) = delta_layer {
layers_to_upload.push(delta_layer.clone());
(layers_to_upload, Some(delta_layer))
} else {
(layers_to_upload, None)
if !metadata_partition.parts.is_empty() {
assert_eq!(
metadata_partition.parts.len(),
1,
"currently sparse keyspace should only contain a single metadata keyspace"
);
layers_to_upload.extend(
self.create_image_layers(
// Safety: create_image_layers treats sparse keyspaces differently in that it does not scan
// every single key within the keyspace, so it is safe to force-convert it
// into a dense keyspace before calling this function.
&metadata_partition.into_dense(),
self.initdb_lsn,
ImageLayerCreationMode::Initial,
ctx,
)
.await?,
);
}
(layers_to_upload, None)
} else {
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
@@ -4046,8 +4032,6 @@ impl Timeline {
mode: ImageLayerCreationMode,
start: Key,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
assert!(!matches!(mode, ImageLayerCreationMode::Initial));
// Metadata keys image layer creation.
let mut reconstruct_state = ValuesReconstructState::default();
let data = self
@@ -4213,15 +4197,13 @@ impl Timeline {
"metadata keys must be partitioned separately"
);
}
if mode == ImageLayerCreationMode::Initial {
return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers")));
}
if mode == ImageLayerCreationMode::Try && !check_for_image_layers {
// Skip compaction if there are not enough updates. Metadata compaction will do a scan and
// might mess up with evictions.
start = img_range.end;
continue;
}
// For initial and force modes, we always generate image layers for metadata keys.
} else if let ImageLayerCreationMode::Try = mode {
// check_for_image_layers = false -> skip
// check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate
@@ -4229,7 +4211,8 @@ impl Timeline {
start = img_range.end;
continue;
}
} else if let ImageLayerCreationMode::Force = mode {
}
if let ImageLayerCreationMode::Force = mode {
// When forced to create image layers, we might try and create them where they already
// exist. This mode is only used in tests/debug.
let layers = self.layers.read().await;
@@ -4243,6 +4226,7 @@ impl Timeline {
img_range.start,
img_range.end
);
start = img_range.end;
continue;
}
}
@@ -5593,46 +5577,6 @@ enum OpenLayerAction {
}
impl<'a> TimelineWriter<'a> {
/// Put a new page version that can be constructed from a WAL record
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub(crate) async fn put(
&mut self,
key: Key,
lsn: Lsn,
value: &Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Avoid doing allocations for "small" values.
// In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
// https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
value.ser_into(&mut buf)?;
let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
let action = self.get_open_layer_action(lsn, buf_size);
let layer = self.handle_open_layer_action(lsn, action, ctx).await?;
let res = layer
.put_value(key.to_compact(), lsn, &buf, value.will_init(), ctx)
.await;
if res.is_ok() {
// Update the current size only when the entire write was ok.
// In case of failures, we may have had partial writes which
// render the size tracking out of sync. That's ok because
// the checkpoint distance should be significantly smaller
// than the S3 single shot upload limit of 5GiB.
let state = self.write_guard.as_mut().unwrap();
state.current_size += buf_size;
state.prev_lsn = Some(lsn);
state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn));
}
res
}
async fn handle_open_layer_action(
&mut self,
at: Lsn,
@@ -5738,18 +5682,58 @@ impl<'a> TimelineWriter<'a> {
}
/// Put a batch of keys at the specified Lsns.
///
/// The batch is sorted by Lsn (enforced by usage of [`utils::vec_map::VecMap`].
pub(crate) async fn put_batch(
&mut self,
batch: VecMap<Lsn, (Key, Value)>,
batch: Vec<(CompactKey, Lsn, usize, Value)>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
for (lsn, (key, val)) in batch {
self.put(key, lsn, &val, ctx).await?
if batch.is_empty() {
return Ok(());
}
Ok(())
let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch);
let batch_max_lsn = serialized_batch.max_lsn;
let buf_size: u64 = serialized_batch.raw.len() as u64;
let action = self.get_open_layer_action(batch_max_lsn, buf_size);
let layer = self
.handle_open_layer_action(batch_max_lsn, action, ctx)
.await?;
let res = layer.put_batch(serialized_batch, ctx).await;
if res.is_ok() {
// Update the current size only when the entire write was ok.
// In case of failures, we may have had partial writes which
// render the size tracking out of sync. That's ok because
// the checkpoint distance should be significantly smaller
// than the S3 single shot upload limit of 5GiB.
let state = self.write_guard.as_mut().unwrap();
state.current_size += buf_size;
state.prev_lsn = Some(batch_max_lsn);
state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
}
res
}
#[cfg(test)]
/// Test helper, for tests that would like to poke individual values without composing a batch
pub(crate) async fn put(
&mut self,
key: Key,
lsn: Lsn,
value: &Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use utils::bin_ser::BeSer;
let val_ser_size = value.serialized_size().unwrap() as usize;
self.put_batch(
vec![(key.to_compact(), lsn, val_ser_size, value.clone())],
ctx,
)
.await
}
pub(crate) async fn delete_batch(

View File

@@ -27,8 +27,8 @@ use super::TaskStateUpdate;
use crate::{
context::RequestContext,
metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
pgdatadir_mapping::DatadirModification,
task_mgr::{TaskKind, WALRECEIVER_RUNTIME},
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
walingest::WalIngest,
walrecord::DecodedWALRecord,
@@ -345,7 +345,10 @@ pub(super) async fn handle_walreceiver_connection(
// Commit every ingest_batch_size records. Even if we filtered out
// all records, we still need to call commit to advance the LSN.
uncommitted_records += 1;
if uncommitted_records >= ingest_batch_size {
if uncommitted_records >= ingest_batch_size
|| modification.approx_pending_bytes()
> DatadirModification::MAX_PENDING_BYTES
{
WAL_INGEST
.records_committed
.inc_by(uncommitted_records - filtered_records);

View File

@@ -78,6 +78,7 @@ where
.expect("must not use after we returned an error")
}
/// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted.
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn write_buffered<S: IoBuf + Send>(
&mut self,