Christian Schwarz
2024-07-29 10:57:08 +00:00
parent 168913bdf0
commit 332ca2bf09
8 changed files with 206 additions and 366 deletions

View File

@@ -1,15 +1,10 @@
use std::{num::NonZeroUsize, sync::Arc};
use crate::tenant::ephemeral_file;
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
PageCached,
#[serde(rename_all = "snake_case")]
Direct {
max_concurrency: NonZeroUsize,
},
Direct { max_concurrency: NonZeroUsize },
}
impl Default for L0FlushConfig {
@@ -25,14 +20,12 @@ impl Default for L0FlushConfig {
pub struct L0FlushGlobalState(Arc<Inner>);
pub enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}
impl L0FlushGlobalState {
pub fn new(config: L0FlushConfig) -> Self {
match config {
L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
L0FlushConfig::Direct { max_concurrency } => {
let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
Self(Arc::new(Inner::Direct { semaphore }))
@@ -44,13 +37,3 @@ impl L0FlushGlobalState {
&self.0
}
}
impl L0FlushConfig {
pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
use L0FlushConfig::*;
match self {
PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
}
}
}
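For reference, a minimal standalone sketch of how the internally tagged serde representation above parses; serde_json and the literal inputs here are illustrative assumptions (the pageserver reads this from its config file), not part of the commit:

use std::num::NonZeroUsize;

#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
enum L0FlushConfig {
    PageCached,
    #[serde(rename_all = "snake_case")]
    Direct { max_concurrency: NonZeroUsize },
}

fn main() {
    // "mode" picks the variant (kebab-cased); the remaining keys are that variant's fields
    let direct: L0FlushConfig =
        serde_json::from_str(r#"{ "mode": "direct", "max_concurrency": 8 }"#).unwrap();
    assert_eq!(
        direct,
        L0FlushConfig::Direct {
            max_concurrency: NonZeroUsize::new(8).unwrap()
        }
    );
    // the page-cached mode carries no fields
    let page_cached: L0FlushConfig = serde_json::from_str(r#"{ "mode": "page-cached" }"#).unwrap();
    assert_eq!(page_cached, L0FlushConfig::PageCached);
}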

View File

@@ -2,7 +2,6 @@
//! Low-level Block-oriented I/O functions
//!
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
@@ -81,9 +80,7 @@ impl<'a> Deref for BlockLease<'a> {
/// Unlike traits, we also support the read function to be async though.
pub(crate) enum BlockReaderRef<'a> {
FileBlockReader(&'a FileBlockReader<'a>),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
Slice(&'a [u8]),
#[cfg(test)]
TestDisk(&'a super::disk_btree::tests::TestDisk),
#[cfg(test)]
@@ -100,9 +97,7 @@ impl<'a> BlockReaderRef<'a> {
use BlockReaderRef::*;
match self {
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
EphemeralFile(r) => r.read_blk(blknum, ctx).await,
Adapter(r) => r.read_blk(blknum, ctx).await,
Slice(s) => Self::read_blk_slice(s, blknum),
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
@@ -111,24 +106,6 @@ impl<'a> BlockReaderRef<'a> {
}
}
impl<'a> BlockReaderRef<'a> {
fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result<BlockLease> {
let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap();
let end = start.checked_add(PAGE_SZ).unwrap();
if end > slice.len() {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("slice too short, len={} end={}", slice.len(), end),
));
}
let slice = &slice[start..end];
let page_sized: &[u8; PAGE_SZ] = slice
.try_into()
.expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ");
Ok(BlockLease::Slice(page_sized))
}
}
///
/// A "cursor" for efficiently reading multiple pages from a BlockReader
///

View File

@@ -4,7 +4,6 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::{self, VirtualFile};
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
@@ -20,8 +19,9 @@ pub struct EphemeralFile {
rw: page_caching::RW,
}
mod page_caching;
pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
pub(super) mod page_caching;
use super::storage_layer::inmemory_layer::InMemoryLayerIndexValue;
mod zero_padded_read_write;
impl EphemeralFile {
@@ -52,16 +52,14 @@ impl EphemeralFile {
)
.await?;
let prewarm = conf.l0_flush.prewarm_on_write();
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file, prewarm, gate_guard),
rw: page_caching::RW::new(file, gate_guard),
})
}
pub(crate) fn len(&self) -> u64 {
pub(crate) fn len(&self) -> u32 {
self.rw.bytes_written()
}
@@ -74,37 +72,40 @@ impl EphemeralFile {
self.rw.load_to_vec(ctx).await
}
pub(crate) async fn read_blk(
pub(crate) async fn read_page(
&self,
blknum: u32,
dst: page_caching::PageBuf,
ctx: &RequestContext,
) -> Result<BlockLease, io::Error> {
self.rw.read_blk(blknum, ctx).await
) -> Result<page_caching::ReadResult, io::Error> {
self.rw.read_page(blknum, dst, ctx).await
}
pub(crate) async fn write_blob(
&mut self,
srcbuf: &[u8],
buf: &[u8],
ctx: &RequestContext,
) -> Result<u64, io::Error> {
) -> Result<InMemoryLayerIndexValue, io::Error> {
let pos = self.rw.bytes_written();
let len = u32::try_from(buf.len()).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
anyhow::anyhow!(
"EphemeralFile::write_blob value too large: {}: {e}",
buf.len()
),
)
})?;
pos.checked_add(len).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"EphemeralFile::write_blob: overflow",
)
})?;
// Write the length field
if srcbuf.len() < 0x80 {
// short one-byte length header
let len_buf = [srcbuf.len() as u8];
self.rw.write_all_borrowed(buf, ctx).await?;
self.rw.write_all_borrowed(&len_buf, ctx).await?;
} else {
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
len_buf[0] |= 0x80;
self.rw.write_all_borrowed(&len_buf, ctx).await?;
}
// Write the payload
self.rw.write_all_borrowed(srcbuf, ctx).await?;
Ok(pos)
Ok(InMemoryLayerIndexValue { pos, len })
}
}
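To illustrate the new write_blob contract, a simplified sketch that uses a plain Vec<u8> in place of the ephemeral file: the raw payload is appended without the old 1- or 4-byte length header, and the returned { pos, len } is what readers later use to slice the value back out. The helper names and the in-memory file are assumptions for illustration, not commit code:

struct InMemoryLayerIndexValue {
    pos: u32,
    len: u32,
}

fn append(file: &mut Vec<u8>, payload: &[u8]) -> InMemoryLayerIndexValue {
    let pos = u32::try_from(file.len()).expect("file offset fits in u32");
    let len = u32::try_from(payload.len()).expect("payload length fits in u32");
    pos.checked_add(len).expect("pos + len does not overflow u32");
    file.extend_from_slice(payload); // raw payload only; no length header anymore
    InMemoryLayerIndexValue { pos, len }
}

fn read<'a>(file: &'a [u8], entry: &InMemoryLayerIndexValue) -> &'a [u8] {
    &file[entry.pos as usize..(entry.pos + entry.len) as usize]
}

fn main() {
    let mut file = Vec::new();
    let foo = append(&mut file, b"foo");
    let bar = append(&mut file, b"bar");
    assert_eq!(read(&file, &foo), b"foo");
    assert_eq!(read(&file, &bar), b"bar");
}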
@@ -117,19 +118,11 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
}
}
impl BlockReader for EphemeralFile {
fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;
use rand::{thread_rng, RngCore};
use std::fs;
use std::str::FromStr;
@@ -160,69 +153,6 @@ mod tests {
Ok((conf, tenant_shard_id, timeline_id, ctx))
}
#[tokio::test]
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
let gate = utils::sync::gate::Gate::default();
let entered = gate.enter().unwrap();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, entered, &ctx).await?;
let pos_foo = file.write_blob(b"foo", &ctx).await?;
assert_eq!(
b"foo",
file.block_cursor()
.read_blob(pos_foo, &ctx)
.await?
.as_slice()
);
let pos_bar = file.write_blob(b"bar", &ctx).await?;
assert_eq!(
b"foo",
file.block_cursor()
.read_blob(pos_foo, &ctx)
.await?
.as_slice()
);
assert_eq!(
b"bar",
file.block_cursor()
.read_blob(pos_bar, &ctx)
.await?
.as_slice()
);
let mut blobs = Vec::new();
for i in 0..10000 {
let data = Vec::from(format!("blob{}", i).as_bytes());
let pos = file.write_blob(&data, &ctx).await?;
blobs.push((pos, data));
}
// also test with large blobs
for i in 0..100 {
let data = format!("blob{}", i).as_bytes().repeat(100);
let pos = file.write_blob(&data, &ctx).await?;
blobs.push((pos, data));
}
let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
for (pos, expected) in blobs {
let actual = cursor.read_blob(pos, &ctx).await?;
assert_eq!(actual, expected);
}
// Test a large blob that spans multiple pages
let mut large_data = vec![0; 20000];
thread_rng().fill_bytes(&mut large_data);
let pos_large = file.write_blob(&large_data, &ctx).await?;
let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
assert_eq!(result, large_data);
Ok(())
}
#[tokio::test]
async fn ephemeral_file_holds_gate_open() {
const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);

View File

@@ -3,11 +3,9 @@
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::BlockLease;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
use crate::virtual_file::VirtualFile;
use once_cell::sync::Lazy;
use std::io::{self, ErrorKind};
use std::ops::{Deref, Range};
use tokio_epoll_uring::BoundedBuf;
@@ -23,28 +21,77 @@ pub struct RW {
_gate_guard: utils::sync::gate::GateGuard,
}
/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
/// should we pre-warm the [`crate::page_cache`] with the contents?
#[derive(Clone, Copy)]
pub enum PrewarmOnWrite {
Yes,
No,
/// Result of [`RW::read_page`].
pub(crate) enum ReadResult<'a> {
EphemeralFileMutableTail(PageBuf, &'a [u8; PAGE_SZ]),
Owned(PageBuf),
}
impl ReadResult<'_> {
pub(crate) fn contents(&self) -> &[u8; PAGE_SZ] {
match self {
ReadResult::EphemeralFileMutableTail(_, buf) => buf,
ReadResult::Owned(buf) => buf.deref(),
}
}
pub(crate) fn into_page_buf(self) -> PageBuf {
match self {
ReadResult::EphemeralFileMutableTail(buf, _) => buf,
ReadResult::Owned(buf) => buf,
}
}
}
pub(crate) struct PageBuf(Box<[u8; PAGE_SZ]>);
impl From<Box<[u8; PAGE_SZ]>> for PageBuf {
fn from(buf: Box<[u8; PAGE_SZ]>) -> Self {
Self(buf)
}
}
impl Deref for PageBuf {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
&self.0
}
}
// Safety: `PageBuf` is a fixed-size buffer that is zero-initialized.
unsafe impl tokio_epoll_uring::IoBuf for PageBuf {
fn stable_ptr(&self) -> *const u8 {
self.0.as_ptr()
}
fn bytes_init(&self) -> usize {
self.0.len()
}
fn bytes_total(&self) -> usize {
self.0.len()
}
}
// Safety: the `&mut self` guarantees no aliasing. `set_init` is safe
// because the buffer is always fully initialized.
unsafe impl tokio_epoll_uring::IoBufMut for PageBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.0.as_mut_ptr()
}
unsafe fn set_init(&mut self, pos: usize) {
// this is a no-op because the buffer is always fully initialized
assert!(pos <= self.0.len());
}
}
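A small standalone demonstration (not commit code) of the stable-pointer property these unsafe impls rely on: a Box<[u8; PAGE_SZ]> keeps its heap allocation at a fixed address, so moving the owning PageBuf moves only the Box pointer, never the bytes the kernel will write into. PAGE_SZ is assumed to be the pageserver's 8 KiB page size:

const PAGE_SZ: usize = 8192; // assumed to match crate::page_cache::PAGE_SZ

struct PageBuf(Box<[u8; PAGE_SZ]>);

fn main() {
    let buf = PageBuf(Box::new([0u8; PAGE_SZ]));
    let ptr_before: *const u8 = buf.0.as_ptr();
    let moved_into = Some(buf); // move the owning value somewhere else
    let ptr_after = moved_into.as_ref().unwrap().0.as_ptr();
    assert_eq!(ptr_before, ptr_after); // the heap page did not move
}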
impl RW {
pub fn new(
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
_gate_guard: utils::sync::gate::GateGuard,
) -> Self {
pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
let page_cache_file_id = page_cache::next_file_id();
Self {
page_cache_file_id,
rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
page_cache_file_id,
file,
prewarm_on_write,
)),
rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(file)),
_gate_guard,
}
}
@@ -57,17 +104,17 @@ impl RW {
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<usize, io::Error> {
) -> Result<(), io::Error> {
// It doesn't make sense to proactively fill the page cache on the Pageserver write path
// because Compute is unlikely to access recently written data.
self.rw.write_all_borrowed(srcbuf, ctx).await
self.rw.write_all_borrowed(srcbuf, ctx).await.map(|_| ())
}
pub(crate) fn bytes_written(&self) -> u64 {
pub(crate) fn bytes_written(&self) -> u32 {
self.rw.bytes_written()
}
/// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer.
/// Load all blocks that can be read via [`Self::read_page`] into a contiguous memory buffer.
///
/// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
/// The last block is zero-padded to [`PAGE_SZ`], so, the returned buffer is always a multiple of [`PAGE_SZ`].
@@ -104,45 +151,24 @@ impl RW {
Ok(vec)
}
pub(crate) async fn read_blk(
pub(crate) async fn read_page(
&self,
blknum: u32,
buf: PageBuf,
ctx: &RequestContext,
) -> Result<BlockLease, io::Error> {
) -> Result<ReadResult, io::Error> {
match self.rw.read_blk(blknum).await? {
zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
let cache = page_cache::get();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
let buf = writer
.file
.read_exact_at(buf.slice_full(), blknum as u64 * PAGE_SZ as u64, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.rw.as_writer().file.path,
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = writer
.file
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
}
}
zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
Ok(BlockLease::EphemeralFileMutableTail(buffer))
.map(|slice| slice.into_inner())?;
Ok(ReadResult::Owned(buf))
}
zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail {
buffer: tail_ref,
} => Ok(ReadResult::EphemeralFileMutableTail(buf, tail_ref)),
}
}
}
@@ -172,22 +198,14 @@ impl Drop for RW {
}
struct PreWarmingWriter {
prewarm_on_write: PrewarmOnWrite,
nwritten_blocks: u32,
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
}
impl PreWarmingWriter {
fn new(
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
) -> Self {
fn new(file: VirtualFile) -> Self {
Self {
prewarm_on_write,
nwritten_blocks: 0,
page_cache_file_id,
file,
}
}
@@ -241,49 +259,6 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
let nblocks = buflen / PAGE_SZ;
let nblocks32 = u32::try_from(nblocks).unwrap();
if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
// Pre-warm page cache with the contents.
// At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
// benefits the code that writes InMemoryLayer=>L0 layers.
let cache = page_cache::get();
static CTX: Lazy<RequestContext> = Lazy::new(|| {
RequestContext::new(
crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
crate::context::DownloadBehavior::Error,
)
});
for blknum_in_buffer in 0..nblocks {
let blk_in_buffer =
&buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
let blknum = self
.nwritten_blocks
.checked_add(blknum_in_buffer as u32)
.unwrap();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
.await
{
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
Ok(v) => match v {
page_cache::ReadBufResult::Found(_guard) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
and this function takes &mut self, so, no concurrent read_blk is possible");
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(blk_in_buffer);
let _ = write_guard.mark_valid();
}
},
}
}
}
self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
Ok((buflen, buf))
}

View File

@@ -19,6 +19,8 @@
mod zero_padded;
use anyhow::Context;
use crate::{
context::RequestContext,
page_cache::PAGE_SZ,
@@ -69,10 +71,12 @@ where
self.buffered_writer.write_buffered_borrowed(buf, ctx).await
}
pub fn bytes_written(&self) -> u64 {
pub fn bytes_written(&self) -> u32 {
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
let flushed_offset = u32::try_from(flushed_offset).with_context(|| format!("buffered_writer.write_buffered_borrowed() disallows sizes larger than u32::MAX: {flushed_offset}")).unwrap();
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
flushed_offset + u64::try_from(buffer.pending()).unwrap()
let buffer_pending = u32::try_from(buffer.pending()).expect("TAIL_SZ is < u32::MAX");
flushed_offset.checked_add(buffer_pending).with_context(|| format!("buffered_writer.write_buffered_borrowed() disallows sizes larger than u32::MAX: {flushed_offset} + {buffer_pending}")).unwrap()
}
/// Get a slice of all blocks that [`Self::read_blk`] would return as [`ReadResult::ServedFromZeroPaddedMutableTail`].
@@ -91,10 +95,12 @@ where
}
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<ReadResult<'_, W>, std::io::Error> {
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
let flushed_offset =
u32::try_from(self.buffered_writer.as_inner().bytes_written()).expect("");
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
let buffered_offset = flushed_offset + u64::try_from(buffer.pending()).unwrap();
let read_offset = (blknum as u64) * (PAGE_SZ as u64);
let buffered_offset = flushed_offset + u32::try_from(buffer.pending()).unwrap();
let page_sz = u32::try_from(PAGE_SZ).unwrap();
let read_offset = blknum.checked_mul(page_sz).unwrap();
// The trailing page ("block") might only be partially filled,
// yet the blob_io code relies on us to return a full PAGE_SZed slice anyway.
@@ -103,28 +109,28 @@ where
// DeltaLayer probably has the same issue, not sure why it needs no special treatment.
// => check here that the read doesn't go beyond this potentially trailing
// => the zero-padding is done in the `else` branch below
let blocks_written = if buffered_offset % (PAGE_SZ as u64) == 0 {
buffered_offset / (PAGE_SZ as u64)
let blocks_written = if buffered_offset % page_sz == 0 {
buffered_offset / page_sz
} else {
(buffered_offset / (PAGE_SZ as u64)) + 1
(buffered_offset / page_sz) + 1
};
if (blknum as u64) >= blocks_written {
if blknum >= blocks_written {
return Err(std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("read past end of ephemeral_file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset:x}")));
}
// assertions for the `if-else` below
assert_eq!(
flushed_offset % (TAIL_SZ as u64), 0,
flushed_offset % (u32::try_from(TAIL_SZ).unwrap()), 0,
"we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks"
);
assert_eq!(
flushed_offset % (PAGE_SZ as u64),
flushed_offset % page_sz,
0,
"the logic below can't handle if the page is spread across the flushed part and the buffer"
);
if read_offset < flushed_offset {
assert!(read_offset + (PAGE_SZ as u64) <= flushed_offset);
assert!(read_offset + page_sz <= flushed_offset);
Ok(ReadResult::NeedsReadFromWriter {
writer: self.as_writer(),
})
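A worked, standalone example of the blocks_written computation above: a ceiling division of buffered_offset by PAGE_SZ, so a partially filled trailing page still counts as one readable, zero-padded block. The constant and the free function are assumptions for illustration only:

const PAGE_SZ: u32 = 8192;

fn blocks_written(buffered_offset: u32) -> u32 {
    if buffered_offset % PAGE_SZ == 0 {
        buffered_offset / PAGE_SZ
    } else {
        buffered_offset / PAGE_SZ + 1
    }
}

fn main() {
    assert_eq!(blocks_written(0), 0); // nothing written, nothing readable
    assert_eq!(blocks_written(1), 1); // partial trailing page counts as one block
    assert_eq!(blocks_written(PAGE_SZ), 1); // exactly one full page
    assert_eq!(blocks_written(PAGE_SZ + 1), 2); // full page plus partial tail
}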

View File

@@ -64,7 +64,7 @@ use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBufMut;
use tokio_epoll_uring::IoBuf;
use tracing::*;
use utils::{
@@ -458,7 +458,7 @@ impl DeltaLayerWriterInner {
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBufMut + Send,
Buf: IoBuf + Send,
{
assert!(
self.lsn_range.start <= lsn,
@@ -666,7 +666,7 @@ impl DeltaLayerWriter {
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBufMut + Send,
Buf: IoBuf + Send,
{
self.inner
.as_mut()

View File

@@ -8,13 +8,14 @@ use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value};
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::page_caching::PageBuf;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache, walrecord};
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
use pageserver_api::keyspace::KeySpace;
@@ -80,7 +81,7 @@ pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The value is an offset into the
/// ephemeral file where the page version is stored.
index: BTreeMap<CompactKey, VecMap<Lsn, u64>>,
index: BTreeMap<CompactKey, VecMap<Lsn, InMemoryLayerIndexValue>>,
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
@@ -89,6 +90,10 @@ pub struct InMemoryLayerInner {
resource_units: GlobalResourceUnits,
}
pub(crate) struct InMemoryLayerIndexValue {
pub(crate) pos: u32,
pub(crate) len: u32,
}
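As a rough illustration of how this index is consulted on the read path, a simplified stand-in that uses BTreeMap and u64 where the pageserver uses VecMap, Lsn, and CompactKey: per key, versions inside the requested LSN range are walked newest-first. Illustration only, not commit code:

use std::collections::BTreeMap;
use std::ops::Range;

#[derive(Debug, Clone, Copy, PartialEq)]
struct IndexValue {
    pos: u32,
    len: u32,
}

fn newest_first(index: &BTreeMap<u64, IndexValue>, lsn_range: Range<u64>) -> Vec<(u64, IndexValue)> {
    index.range(lsn_range).rev().map(|(lsn, v)| (*lsn, *v)).collect()
}

fn main() {
    let mut versions = BTreeMap::new();
    versions.insert(10, IndexValue { pos: 0, len: 3 });
    versions.insert(20, IndexValue { pos: 3, len: 5 });
    versions.insert(30, IndexValue { pos: 8, len: 2 });
    // a read at LSN < 30 must not see the version written at LSN 30
    assert_eq!(
        newest_first(&versions, 0..30),
        vec![(20, IndexValue { pos: 3, len: 5 }), (10, IndexValue { pos: 0, len: 3 })]
    );
    let (lsn, newest) = newest_first(&versions, 0..30)[0];
    println!("newest visible at lsn {lsn}: pos={} len={}", newest.pos, newest.len);
}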
impl std::fmt::Debug for InMemoryLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -230,7 +235,7 @@ impl InMemoryLayer {
}
}
pub(crate) fn try_len(&self) -> Option<u64> {
pub(crate) fn try_len(&self) -> Option<u32> {
self.inner.try_read().map(|i| i.file.len()).ok()
}
@@ -249,9 +254,7 @@ impl InMemoryLayer {
/// debugging function to print out the contents of the layer
///
/// this is likely completely unused
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
let inner = self.inner.read().await;
pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
let end_str = self.end_lsn_or_max();
println!(
@@ -259,39 +262,6 @@ impl InMemoryLayer {
self.timeline_id, self.start_lsn, end_str,
);
if !verbose {
return Ok(());
}
let cursor = inner.file.block_cursor();
let mut buf = Vec::new();
for (key, vec_map) in inner.index.iter() {
for (lsn, pos) in vec_map.as_slice() {
let mut desc = String::new();
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let val = Value::des(&buf);
match val {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len())?;
}
Ok(Value::WalRecord(rec)) => {
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
buf.len(),
rec.will_init(),
wal_desc
)?;
}
Err(err) => {
write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
}
}
println!(" key {} at {}: {}", key, lsn, desc);
}
}
Ok(())
}
@@ -311,7 +281,6 @@ impl InMemoryLayer {
.build();
let inner = self.inner.read().await;
let reader = inner.file.block_cursor();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
@@ -326,15 +295,53 @@ impl InMemoryLayer {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
// TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
let buf = reader.read_blob(*pos, &ctx).await;
if let Err(e) = buf {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
break;
}
'foreach_value: for (entry_lsn, value) in slice.iter().rev() {
let InMemoryLayerIndexValue { pos, len } = value;
let value = Value::des(&buf.unwrap());
// TODO: coalesce multiple reads that hit the same page into one page read
// Yuchen is working on a VectoredReadPlanner change to support this.
// In the meantime, we prepare the way for direct IO by doing full page reads.
let len = usize::try_from(*len).unwrap();
let mut value_buf = Vec::with_capacity(len);
let mut page_buf_storage = Some(PageBuf::from(Box::new([0u8; PAGE_SZ])));
let mut page_no = *pos / (PAGE_SZ as u32);
let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap();
while value_buf.len() < len {
let read_result = match inner
.file
.read_page(
page_no,
page_buf_storage
.take()
.expect("we put it back each iteration"),
&ctx,
)
.await
{
Ok(page) => page,
Err(e) => {
reconstruct_state
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
break 'foreach_value;
}
};
{
let page_contents = read_result.contents();
let remaining_in_page = std::cmp::min(
len - value_buf.len(),
page_contents.len() - offset_in_page,
);
value_buf.extend_from_slice(
&page_contents[offset_in_page..offset_in_page + remaining_in_page],
);
}
offset_in_page = 0;
page_no += 1;
page_buf_storage = Some(read_result.into_page_buf());
}
assert!(value_buf.len() == len);
let value = Value::des(&value_buf);
if let Err(e) = value {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
break;
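The read loop above can be summarized by this standalone sketch: starting at byte pos, copy len bytes by reading whole PAGE_SZ pages and slicing the needed range from each. A Vec<u8> stands in for the ephemeral file, and the zero-padded page read is simulated inline; the real code instead reuses a single PageBuf through read_page. Assumptions for illustration only:

const PAGE_SZ: usize = 8192;

fn read_value(file: &[u8], pos: u32, len: u32) -> Vec<u8> {
    let len = len as usize;
    let mut value_buf = Vec::with_capacity(len);
    let mut page_no = pos as usize / PAGE_SZ;
    let mut offset_in_page = pos as usize % PAGE_SZ;
    while value_buf.len() < len {
        // stand-in for read_page: the trailing page is zero-padded to PAGE_SZ
        let mut page = [0u8; PAGE_SZ];
        let start = page_no * PAGE_SZ;
        let avail = file.len().saturating_sub(start).min(PAGE_SZ);
        page[..avail].copy_from_slice(&file[start..start + avail]);

        let remaining_in_page = (len - value_buf.len()).min(PAGE_SZ - offset_in_page);
        value_buf.extend_from_slice(&page[offset_in_page..offset_in_page + remaining_in_page]);
        offset_in_page = 0; // only the first page starts mid-page
        page_no += 1;
    }
    value_buf
}

fn main() {
    // a value that starts near the end of page 0 and spills into page 1
    let mut file = vec![0u8; PAGE_SZ - 2];
    file.extend_from_slice(b"hello world");
    assert_eq!(read_value(&file, (PAGE_SZ - 2) as u32, 11), b"hello world");
}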
@@ -380,7 +387,7 @@ impl InMemoryLayer {
/// Get layer size.
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.len())
Ok(inner.file.len() as u64)
}
/// Create a new, empty, in-memory layer
@@ -441,27 +448,25 @@ impl InMemoryLayer {
) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
let off = {
locked_inner
.file
.write_blob(
buf,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build(),
)
.await?
};
let entry = locked_inner
.file
.write_blob(
buf,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build(),
)
.await?;
let vec_map = locked_inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
let old = vec_map.append_or_update_last(lsn, entry).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
}
let size = locked_inner.file.len();
locked_inner.resource_units.maybe_publish_size(size);
locked_inner.resource_units.maybe_publish_size(size as u64);
Ok(())
}
@@ -473,7 +478,7 @@ impl InMemoryLayer {
pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await;
let size = inner.file.len();
inner.resource_units.publish_size(size)
inner.resource_units.publish_size(size as u64)
}
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
@@ -536,7 +541,6 @@ impl InMemoryLayer {
use l0_flush::Inner;
let _concurrency_permit = match l0_flush_global_state {
Inner::PageCached => None,
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
};
@@ -568,34 +572,6 @@ impl InMemoryLayer {
.await?;
match l0_flush_global_state {
l0_flush::Inner::PageCached => {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let mut buf = Vec::new();
let cursor = inner.file.block_cursor();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let (tmp, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),
*lsn,
buf.slice_len(),
will_init,
&ctx,
)
.await;
res?;
buf = tmp.into_raw_slice().into_inner();
}
}
}
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
assert_eq!(
@@ -612,22 +588,15 @@ impl InMemoryLayer {
}
});
let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
let mut buf = Vec::new();
let file_contents = Bytes::from(file_contents);
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
// TODO: once we have blob lengths in the in-memory index, we can
// 1. get rid of the blob_io / BlockReaderRef::Slice business and
// 2. load the file contents into a Bytes and
// 3. the use `Bytes::slice` to get the `buf` that is our blob
// 4. pass that `buf` into `put_value_bytes`
// => https://github.com/neondatabase/neon/issues/8183
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
for (lsn, entry) in vec_map.as_slice() {
let InMemoryLayerIndexValue { pos, len } = entry;
let buf = file_contents.slice(*pos as usize..(*pos + *len) as usize);
let will_init = Value::des(&buf)?.will_init();
let (tmp, res) = delta_layer_writer
let (_buf, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),
*lsn,
@@ -637,7 +606,6 @@ impl InMemoryLayer {
)
.await;
res?;
buf = tmp.into_raw_slice().into_inner();
}
}
}

View File

@@ -1463,6 +1463,7 @@ impl Timeline {
tracing::warn!("Lock conflict while reading size of open layer");
return;
};
let current_size = current_size as u64;
let current_lsn = self.get_last_record_lsn();