L0 flush: opt-in mechanism to bypass PageCache reads and writes (#8190)

part of https://github.com/neondatabase/neon/issues/7418

# Motivation

(reproducing #7418)

When we do an `InMemoryLayer::write_to_disk`, there is a tremendous
amount of random read I/O, as deltas from the ephemeral file (written in
LSN order) are written out to the delta layer in key order.

In benchmarks (https://github.com/neondatabase/neon/pull/7409) we can
see that this delta layer writing phase is substantially more expensive
than the initial ingest of data, and that within the delta layer write a
significant amount of the CPU time is spent traversing the page cache.

# High-Level Changes

Add a new mode for L0 flush that works as follows:

* Read the full ephemeral file into memory -- layers are much smaller
than total memory, so this is affordable.
* Do all the random reads directly from this in-memory buffer instead of
going through blob IO / page cache / disk reads.
* Add a semaphore to limit how many timelines may concurrently do this
(to bound peak memory).
* Make the semaphore limit configurable via PS config (see the sketch below).
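
For reference, here is a minimal, self-contained sketch (not part of the PR) of how the two `l0_flush` modes map onto pageserver TOML. It mirrors the serde attributes on `L0FlushConfig` from the new `l0_flush` module in the diff below; the pageserver itself parses the config item via `utils::toml_edit_ext::deserialize_item`, and the `toml` crate is used here only to keep the sketch runnable.

```rust
// Sketch only: mirrors the L0FlushConfig definition added by this PR and shows
// the TOML shape it accepts ("mode" tag, kebab-case variants, snake_case fields).
use std::num::NonZeroUsize;

#[derive(Debug, PartialEq, Eq, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
enum L0FlushConfig {
    PageCached,
    #[serde(rename_all = "snake_case")]
    Direct { max_concurrency: NonZeroUsize },
}

fn main() {
    // Default, PageCache-backed behavior: l0_flush = { mode = "page-cached" }
    let page_cached: L0FlushConfig = toml::from_str(r#"mode = "page-cached""#).unwrap();
    assert_eq!(page_cached, L0FlushConfig::PageCached);

    // New opt-in behavior: l0_flush = { mode = "direct", max_concurrency = 4 }
    let direct: L0FlushConfig =
        toml::from_str("mode = \"direct\"\nmax_concurrency = 4").unwrap();
    assert_eq!(
        direct,
        L0FlushConfig::Direct {
            max_concurrency: NonZeroUsize::new(4).unwrap()
        }
    );
}
```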

# Implementation Details

The new `BlockReaderRef::Slice` is a temporary hack until we can ditch
`blob_io` for `InMemoryLayer`; the plan for that is laid out in
https://github.com/neondatabase/neon/issues/8183

# Correctness

The correctness of this change is quite obvious to me: we do what we did
before (`blob_io`) but read from memory instead of going to disk.

The highest bug potential is in doing owned-buffers IO. I refactored the
API a bit in preliminary PR
https://github.com/neondatabase/neon/pull/8186 to make it less
error-prone, but still, careful review is requested.

# Performance

I manually measured single-client ingest performance from `pgbench -i
...`.

Full report:
https://neondatabase.notion.site/2024-06-28-benchmarking-l0-flush-performance-e98cff3807f94cb38f2054d8c818fe84?pvs=4

tl;dr:

* no speed improvement during ingest, but
* significantly lower pressure on PS PageCache (eviction rate drops to
1/3)
  * (that's why I'm working on this)
* noticeably, though modestly, lower CPU time

This is good enough to merge this PR because the new mode is
opt-in.

We'll do more testing in staging & pre-prod.

# Stability / Monitoring

**memory consumption**: there's no _hard_ limit on max `InMemoryLayer`
size (aka "checkpoint distance"), hence there's no hard limit on the
memory we allocate for flushing. In practice, a) we [log a
warning](23827c6b0d/pageserver/src/tenant/timeline.rs (L5741-L5743))
when we flush oversized layers, so we'd know which tenant is to blame,
and b) if we were to put a hard limit in place, we would have to decide
what to do with an `InMemoryLayer` that exceeds it.
It seems like a better option to guarantee a max size for frozen layers,
dependent on `checkpoint_distance`, and then limit concurrency based on
that.
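
To make the "limit concurrency based on that" idea concrete, here is a back-of-the-envelope sketch (my own illustration, not code from the PR), assuming frozen layers were actually capped at `checkpoint_distance`:

```rust
// Rough sketch, not part of the PR: peak memory of the direct L0-flush path if
// frozen layers were capped at checkpoint_distance. Today the cap is soft
// (oversized layers only trigger a warning), so treat this as an approximation.
fn approx_peak_flush_memory_bytes(checkpoint_distance: u64, max_concurrency: u64) -> u64 {
    // Each concurrent flush loads one ephemeral file (~checkpoint_distance bytes)
    // into a contiguous Vec<u8> before writing the delta layer.
    checkpoint_distance * max_concurrency
}

fn main() {
    // Example numbers: 256 MiB checkpoint distance, max_concurrency = 4 => ~1 GiB.
    let bound = approx_peak_flush_memory_bytes(256 * 1024 * 1024, 4);
    println!("~{} MiB of flush buffers at peak", bound / (1024 * 1024));
}
```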

**metrics**: we do have the
[flush_time_histo](23827c6b0d/pageserver/src/tenant/timeline.rs (L3725-L3726)),
but that includes the wait time for the semaphore. We could add a
separate metric for the time spent after acquiring the semaphore, so one
can infer the wait time. Seems unnecessary at this point, though.
Commit 5de896e7d8 (parent 25eefdeb1f), authored by Christian Schwarz on 2024-07-02 16:29:09 +02:00, committed by GitHub. 12 changed files with 322 additions and 57 deletions.


@@ -421,6 +421,10 @@ fn start_pageserver(
background_jobs_can_start: background_jobs_barrier.clone(),
};
info!(config=?conf.l0_flush, "using l0_flush config");
let l0_flush_global_state =
pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone());
// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
@@ -429,6 +433,7 @@ fn start_pageserver(
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
},
order,
shutdown_pageserver.clone(),


@@ -30,11 +30,11 @@ use utils::{
logging::LogFormat,
};
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine};
use crate::{l0_flush::L0FlushConfig, tenant::timeline::GetVectoredImpl};
use crate::{tenant::config::TenantConf, virtual_file};
use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX};
@@ -296,6 +296,8 @@ pub struct PageServerConf {
///
/// Setting this to zero disables limits on total ephemeral layer size.
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: L0FlushConfig,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -403,6 +405,8 @@ struct PageServerConfigBuilder {
image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
l0_flush: BuilderValue<L0FlushConfig>,
}
impl PageServerConfigBuilder {
@@ -492,6 +496,7 @@ impl PageServerConfigBuilder {
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
}
}
}
@@ -683,6 +688,10 @@ impl PageServerConfigBuilder {
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
}
pub fn l0_flush(&mut self, value: L0FlushConfig) {
self.l0_flush = BuilderValue::Set(value);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();
@@ -741,6 +750,7 @@ impl PageServerConfigBuilder {
validate_vectored_get,
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
}
CUSTOM LOGIC
{
@@ -1023,6 +1033,9 @@ impl PageServerConf {
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
}
"l0_flush" => {
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1107,6 +1120,7 @@ impl PageServerConf {
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
}
}
}
@@ -1347,6 +1361,7 @@ background_task_maximum_delay = '334 s'
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1421,6 +1436,7 @@ background_task_maximum_delay = '334 s'
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
},
"Should be able to parse all basic config values correctly"
);


@@ -0,0 +1,46 @@
use std::{num::NonZeroUsize, sync::Arc};
use crate::tenant::ephemeral_file;
#[derive(Default, Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
#[default]
PageCached,
#[serde(rename_all = "snake_case")]
Direct { max_concurrency: NonZeroUsize },
}
#[derive(Clone)]
pub struct L0FlushGlobalState(Arc<Inner>);
pub(crate) enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}
impl L0FlushGlobalState {
pub fn new(config: L0FlushConfig) -> Self {
match config {
L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
L0FlushConfig::Direct { max_concurrency } => {
let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
Self(Arc::new(Inner::Direct { semaphore }))
}
}
}
pub(crate) fn inner(&self) -> &Arc<Inner> {
&self.0
}
}
impl L0FlushConfig {
pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
use L0FlushConfig::*;
match self {
PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
}
}
}


@@ -11,6 +11,7 @@ pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;
pub use pageserver_api::keyspace;
pub mod aux_file;
pub mod metrics;


@@ -73,6 +73,7 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::deletion_queue::DeletionQueueError;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::TENANT;
use crate::metrics::{
remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
@@ -166,6 +167,7 @@ pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
}
/// A [`Tenant`] is really an _attached_ tenant. The configuration
@@ -294,6 +296,8 @@ pub struct Tenant {
/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
l0_flush_global_state: L0FlushGlobalState,
}
impl std::fmt::Debug for Tenant {
@@ -676,6 +680,7 @@ impl Tenant {
broker_client,
remote_storage,
deletion_queue_client,
l0_flush_global_state,
} = resources;
let attach_mode = attached_conf.location.attach_mode;
@@ -690,6 +695,7 @@ impl Tenant {
tenant_shard_id,
remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
));
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
@@ -989,6 +995,7 @@ impl Tenant {
TimelineResources {
remote_client,
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
},
ctx,
)
@@ -2478,6 +2485,7 @@ impl Tenant {
tenant_shard_id: TenantShardId,
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
) -> Tenant {
debug_assert!(
!attached_conf.location.generation.is_none() || conf.control_plane_api.is_none()
@@ -2565,6 +2573,7 @@ impl Tenant {
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
l0_flush_global_state,
}
}
@@ -3302,6 +3311,7 @@ impl Tenant {
TimelineResources {
remote_client,
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
}
}
@@ -3638,6 +3648,7 @@ pub(crate) mod harness {
use utils::logging;
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::l0_flush::L0FlushConfig;
use crate::walredo::apply_neon;
use crate::{repository::Key, walrecord::NeonWalRecord};
@@ -3827,6 +3838,8 @@ pub(crate) mod harness {
self.tenant_shard_id,
self.remote_storage.clone(),
self.deletion_queue.new_client(),
// TODO: ideally we should run all unit tests with both configs
L0FlushGlobalState::new(L0FlushConfig::default()),
));
let preload = tenant


@@ -37,6 +37,7 @@ where
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
Slice(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
#[cfg(test)]
@@ -63,6 +64,7 @@ impl<'a> Deref for BlockLease<'a> {
match self {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
BlockLease::Slice(v) => v,
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
#[cfg(test)]
@@ -81,6 +83,7 @@ pub(crate) enum BlockReaderRef<'a> {
FileBlockReader(&'a FileBlockReader<'a>),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
Slice(&'a [u8]),
#[cfg(test)]
TestDisk(&'a super::disk_btree::tests::TestDisk),
#[cfg(test)]
@@ -99,6 +102,7 @@ impl<'a> BlockReaderRef<'a> {
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
EphemeralFile(r) => r.read_blk(blknum, ctx).await,
Adapter(r) => r.read_blk(blknum, ctx).await,
Slice(s) => Self::read_blk_slice(s, blknum),
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
@@ -107,6 +111,24 @@ impl<'a> BlockReaderRef<'a> {
}
}
impl<'a> BlockReaderRef<'a> {
fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result<BlockLease> {
let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap();
let end = start.checked_add(PAGE_SZ).unwrap();
if end > slice.len() {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("slice too short, len={} end={}", slice.len(), end),
));
}
let slice = &slice[start..end];
let page_sized: &[u8; PAGE_SZ] = slice
.try_into()
.expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ");
Ok(BlockLease::Slice(page_sized))
}
}
///
/// A "cursor" for efficiently reading multiple pages from a BlockReader
///


@@ -21,6 +21,7 @@ pub struct EphemeralFile {
}
mod page_caching;
pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
mod zero_padded_read_write;
impl EphemeralFile {
@@ -53,7 +54,7 @@ impl EphemeralFile {
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file),
rw: page_caching::RW::new(file, conf.l0_flush.prewarm_on_write()),
})
}
@@ -65,6 +66,11 @@ impl EphemeralFile {
self.rw.page_cache_file_id()
}
/// See [`self::page_caching::RW::load_to_vec`].
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
self.rw.load_to_vec(ctx).await
}
pub(crate) async fn read_blk(
&self,
blknum: u32,


@@ -8,6 +8,7 @@ use crate::virtual_file::VirtualFile;
use once_cell::sync::Lazy;
use std::io::{self, ErrorKind};
use std::ops::{Deref, Range};
use tokio_epoll_uring::BoundedBuf;
use tracing::*;
@@ -19,14 +20,23 @@ pub struct RW {
rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
}
/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
/// should we pre-warm the [`crate::page_cache`] with the contents?
#[derive(Clone, Copy)]
pub enum PrewarmOnWrite {
Yes,
No,
}
impl RW {
pub fn new(file: VirtualFile) -> Self {
pub fn new(file: VirtualFile, prewarm_on_write: PrewarmOnWrite) -> Self {
let page_cache_file_id = page_cache::next_file_id();
Self {
page_cache_file_id,
rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
page_cache_file_id,
file,
prewarm_on_write,
)),
}
}
@@ -49,6 +59,43 @@ impl RW {
self.rw.bytes_written()
}
/// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer.
///
/// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
/// The last block is zero-padded to [`PAGE_SZ`], so, the returned buffer is always a multiple of [`PAGE_SZ`].
pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
// round up to the next PAGE_SZ multiple, required by blob_io
let size = {
let s = usize::try_from(self.bytes_written()).unwrap();
if s % PAGE_SZ == 0 {
s
} else {
s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap()
}
};
let vec = Vec::with_capacity(size);
// read from disk what we've already flushed
let writer = self.rw.as_writer();
let flushed_range = writer.written_range();
let mut vec = writer
.file
.read_exact_at(
vec.slice(0..(flushed_range.end - flushed_range.start)),
u64::try_from(flushed_range.start).unwrap(),
ctx,
)
.await?
.into_inner();
// copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk
let buffered = self.rw.get_tail_zero_padded();
vec.extend_from_slice(buffered);
assert_eq!(vec.len(), size);
assert_eq!(vec.len() % PAGE_SZ, 0);
Ok(vec)
}
pub(crate) async fn read_blk(
&self,
blknum: u32,
@@ -116,19 +163,40 @@ impl Drop for RW {
}
struct PreWarmingWriter {
prewarm_on_write: PrewarmOnWrite,
nwritten_blocks: u32,
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
}
impl PreWarmingWriter {
fn new(page_cache_file_id: page_cache::FileId, file: VirtualFile) -> Self {
fn new(
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
) -> Self {
Self {
prewarm_on_write,
nwritten_blocks: 0,
page_cache_file_id,
file,
}
}
/// Return the byte range within `file` that has been written though `write_all`.
///
/// The returned range would be invalidated by another `write_all`. To prevent that, we capture `&_`.
fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
struct Wrapper(Range<usize>);
impl Deref for Wrapper {
type Target = Range<usize>;
fn deref(&self) -> &Range<usize> {
&self.0
}
}
Wrapper(0..nwritten_blocks * PAGE_SZ)
}
}
impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
@@ -178,45 +246,51 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
assert_eq!(&check_bounds_stuff_works, &*buf);
}
// Pre-warm page cache with the contents.
// At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
// benefits the code that writes InMemoryLayer=>L0 layers.
let nblocks = buflen / PAGE_SZ;
let nblocks32 = u32::try_from(nblocks).unwrap();
let cache = page_cache::get();
static CTX: Lazy<RequestContext> = Lazy::new(|| {
RequestContext::new(
crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
crate::context::DownloadBehavior::Error,
)
});
for blknum_in_buffer in 0..nblocks {
let blk_in_buffer = &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
let blknum = self
.nwritten_blocks
.checked_add(blknum_in_buffer as u32)
.unwrap();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
.await
{
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
Ok(v) => match v {
page_cache::ReadBufResult::Found(_guard) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
// Pre-warm page cache with the contents.
// At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
// benefits the code that writes InMemoryLayer=>L0 layers.
let cache = page_cache::get();
static CTX: Lazy<RequestContext> = Lazy::new(|| {
RequestContext::new(
crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
crate::context::DownloadBehavior::Error,
)
});
for blknum_in_buffer in 0..nblocks {
let blk_in_buffer =
&buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
let blknum = self
.nwritten_blocks
.checked_add(blknum_in_buffer as u32)
.unwrap();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
.await
{
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
Ok(v) => match v {
page_cache::ReadBufResult::Found(_guard) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
and this function takes &mut self, so, no concurrent read_blk is possible");
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(blk_in_buffer);
let _ = write_guard.mark_valid();
}
},
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(blk_in_buffer);
let _ = write_guard.mark_valid();
}
},
}
}
}
self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
Ok((buflen, buf.into_inner()))
}


@@ -75,6 +75,21 @@ where
flushed_offset + u64::try_from(buffer.pending()).unwrap()
}
/// Get a slice of all blocks that [`Self::read_blk`] would return as [`ReadResult::ServedFromZeroPaddedMutableTail`].
pub fn get_tail_zero_padded(&self) -> &[u8] {
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
let buffer_written_up_to = buffer.pending();
// pad to next page boundary
let read_up_to = if buffer_written_up_to % PAGE_SZ == 0 {
buffer_written_up_to
} else {
buffer_written_up_to
.checked_add(PAGE_SZ - (buffer_written_up_to % PAGE_SZ))
.unwrap()
};
&buffer.as_zero_padded_slice()[0..read_up_to]
}
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<ReadResult<'_, W>, std::io::Error> {
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();


@@ -6,13 +6,14 @@
//!
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value};
use crate::tenant::block_io::BlockReader;
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::ValueReconstructResult;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::{page_cache, walrecord};
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, ensure, Result};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
@@ -410,6 +411,7 @@ impl InMemoryLayer {
continue;
}
// TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
let buf = reader.read_blob(block_read.block_offset, &ctx).await;
if let Err(e) = buf {
reconstruct_state
@@ -620,6 +622,13 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().await;
let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
use l0_flush::Inner;
let _concurrency_permit = match &*l0_flush_global_state {
Inner::PageCached => None,
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
};
let end_lsn = *self.end_lsn.get().unwrap();
let key_count = if let Some(key_range) = key_range {
@@ -645,28 +654,77 @@ impl InMemoryLayer {
)
.await?;
let mut buf = Vec::new();
match &*l0_flush_global_state {
l0_flush::Inner::PageCached => {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let cursor = inner.file.block_cursor();
let mut buf = Vec::new();
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, &ctx)
.await;
res?;
let cursor = inner.file.block_cursor();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, &ctx)
.await;
res?;
}
}
}
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
assert_eq!(
file_contents.len() % PAGE_SZ,
0,
"needed by BlockReaderRef::Slice"
);
assert_eq!(file_contents.len(), {
let written = usize::try_from(inner.file.len()).unwrap();
if written % PAGE_SZ == 0 {
written
} else {
written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
}
});
let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
let mut buf = Vec::new();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
// TODO: once we have blob lengths in the in-memory index, we can
// 1. get rid of the blob_io / BlockReaderRef::Slice business and
// 2. load the file contents into a Bytes and
// 3. the use `Bytes::slice` to get the `buf` that is our blob
// 4. pass that `buf` into `put_value_bytes`
// => https://github.com/neondatabase/neon/issues/8183
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, ctx)
.await;
res?;
}
}
// Hold the permit until the IO is done; if we didn't, one could drop this future,
// thereby releasing the permit, but the Vec<u8> remains allocated until the IO completes.
// => we'd have more concurrenct Vec<u8> than allowed as per the semaphore.
drop(_concurrency_permit);
}
}
// MAX is used here because we identify L0 layers by full key range
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, &ctx).await?;
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
Ok(Some(delta_layer))
}
}


@@ -65,7 +65,6 @@ use std::{
ops::{Deref, Range},
};
use crate::metrics::GetKind;
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
use crate::{
aux_file::AuxFileSizeEstimator,
@@ -90,6 +89,10 @@ use crate::{
use crate::{
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
};
use crate::{
l0_flush::{self, L0FlushGlobalState},
metrics::GetKind,
};
use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
@@ -208,6 +211,7 @@ pub struct TimelineResources {
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
pub(crate) struct AuxFilesState {
@@ -433,6 +437,8 @@ pub struct Timeline {
/// in the future, add `extra_test_sparse_keyspace` if necessary.
#[cfg(test)]
pub(crate) extra_test_dense_keyspace: ArcSwap<KeySpace>,
pub(crate) l0_flush_global_state: L0FlushGlobalState,
}
pub struct WalReceiverInfo {
@@ -2392,6 +2398,8 @@ impl Timeline {
#[cfg(test)]
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
l0_flush_global_state: resources.l0_flush_global_state,
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;


@@ -272,6 +272,7 @@ impl DeleteTimelineFlow {
TimelineResources {
remote_client,
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
l0_flush_global_state: tenant.l0_flush_global_state.clone(),
},
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.