From eb7241c798d445cd7bcb52d14fbf6c59f4a54d32 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 19 Aug 2024 16:35:34 +0200
Subject: [PATCH] l0_flush: remove support for mode `page-cached` (#8739)

It's been rolled out everywhere; no configs are referencing it.

All code that's made dead by the removal of the config option is
removed as part of this PR.

The `page_caching::PreWarmingWriter` in `::No` mode is equivalent to a
`size_tracking_writer`, so use that instead.

part of https://github.com/neondatabase/neon/issues/7418
---
 pageserver/src/l0_flush.rs                    |  19 +-
 pageserver/src/tenant/ephemeral_file.rs       |   5 +-
 .../src/tenant/ephemeral_file/page_caching.rs | 169 ++----------------
 .../tenant/storage_layer/inmemory_layer.rs    |  68 +-------
 4 files changed, 20 insertions(+), 241 deletions(-)

diff --git a/pageserver/src/l0_flush.rs b/pageserver/src/l0_flush.rs
index 10187f2ba3..313a7961a6 100644
--- a/pageserver/src/l0_flush.rs
+++ b/pageserver/src/l0_flush.rs
@@ -1,15 +1,10 @@
 use std::{num::NonZeroUsize, sync::Arc};
 
-use crate::tenant::ephemeral_file;
-
 #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
 #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
 pub enum L0FlushConfig {
-    PageCached,
     #[serde(rename_all = "snake_case")]
-    Direct {
-        max_concurrency: NonZeroUsize,
-    },
+    Direct { max_concurrency: NonZeroUsize },
 }
 
 impl Default for L0FlushConfig {
@@ -25,14 +20,12 @@ impl Default for L0FlushConfig {
 pub struct L0FlushGlobalState(Arc<Inner>);
 
 pub enum Inner {
-    PageCached,
     Direct { semaphore: tokio::sync::Semaphore },
 }
 
 impl L0FlushGlobalState {
     pub fn new(config: L0FlushConfig) -> Self {
         match config {
-            L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
             L0FlushConfig::Direct { max_concurrency } => {
                 let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
                 Self(Arc::new(Inner::Direct { semaphore }))
@@ -44,13 +37,3 @@ impl L0FlushGlobalState {
         &self.0
     }
 }
-
-impl L0FlushConfig {
-    pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
-        use L0FlushConfig::*;
-        match self {
-            PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
-            Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
-        }
-    }
-}
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 770f3ca5f0..3eb8384d05 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -21,7 +21,6 @@ pub struct EphemeralFile {
 }
 
 mod page_caching;
-pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
 mod zero_padded_read_write;
 
 impl EphemeralFile {
@@ -52,12 +51,10 @@ impl EphemeralFile {
         )
         .await?;
 
-        let prewarm = conf.l0_flush.prewarm_on_write();
-
         Ok(EphemeralFile {
             _tenant_shard_id: tenant_shard_id,
             _timeline_id: timeline_id,
-            rw: page_caching::RW::new(file, prewarm, gate_guard),
+            rw: page_caching::RW::new(file, gate_guard),
         })
     }
 
diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs
index 7355b3b5a3..48926354f1 100644
--- a/pageserver/src/tenant/ephemeral_file/page_caching.rs
+++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs
@@ -1,15 +1,15 @@
 //! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
 //! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
+//!
+//! Subject to removal in
 
 use crate::context::RequestContext;
 use crate::page_cache::{self, PAGE_SZ};
 use crate::tenant::block_io::BlockLease;
-use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
+use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
 use crate::virtual_file::VirtualFile;
 
-use once_cell::sync::Lazy;
-use std::io::{self, ErrorKind};
-use std::ops::{Deref, Range};
+use std::io::{self};
 use tokio_epoll_uring::BoundedBuf;
 use tracing::*;
 
@@ -18,33 +18,17 @@ use super::zero_padded_read_write;
 
 /// See module-level comment.
 pub struct RW {
     page_cache_file_id: page_cache::FileId,
-    rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
+    rw: super::zero_padded_read_write::RW<size_tracking_writer::Writer<VirtualFile>>,
     /// Gate guard is held on as long as we need to do operations in the path (delete on drop).
     _gate_guard: utils::sync::gate::GateGuard,
 }
 
-/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
-/// should we pre-warm the [`crate::page_cache`] with the contents?
-#[derive(Clone, Copy)]
-pub enum PrewarmOnWrite {
-    Yes,
-    No,
-}
-
 impl RW {
-    pub fn new(
-        file: VirtualFile,
-        prewarm_on_write: PrewarmOnWrite,
-        _gate_guard: utils::sync::gate::GateGuard,
-    ) -> Self {
+    pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
         let page_cache_file_id = page_cache::next_file_id();
         Self {
             page_cache_file_id,
-            rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
-                page_cache_file_id,
-                file,
-                prewarm_on_write,
-            )),
+            rw: super::zero_padded_read_write::RW::new(size_tracking_writer::Writer::new(file)),
             _gate_guard,
         }
     }
@@ -84,10 +68,10 @@ impl RW {
         let vec = Vec::with_capacity(size);
 
         // read from disk what we've already flushed
-        let writer = self.rw.as_writer();
-        let flushed_range = writer.written_range();
-        let mut vec = writer
-            .file
+        let file_size_tracking_writer = self.rw.as_writer();
+        let flushed_range = 0..usize::try_from(file_size_tracking_writer.bytes_written()).unwrap();
+        let mut vec = file_size_tracking_writer
+            .as_inner()
             .read_exact_at(
                 vec.slice(0..(flushed_range.end - flushed_range.start)),
                 u64::try_from(flushed_range.start).unwrap(),
@@ -122,7 +106,7 @@ impl RW {
                         format!(
                             "ephemeral file: read immutable page #{}: {}: {:#}",
                             blknum,
-                            self.rw.as_writer().file.path,
+                            self.rw.as_writer().as_inner().path,
                             e,
                         ),
                     )
@@ -132,7 +116,7 @@ impl RW {
                 }
                 page_cache::ReadBufResult::NotFound(write_guard) => {
                     let write_guard = writer
-                        .file
+                        .as_inner()
                         .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
                         .await?;
                     let read_guard = write_guard.mark_valid();
@@ -154,137 +138,16 @@ impl Drop for RW {
         // unlink the file
         // we are clear to do this, because we have entered a gate
-        let res = std::fs::remove_file(&self.rw.as_writer().file.path);
+        let path = &self.rw.as_writer().as_inner().path;
+        let res = std::fs::remove_file(path);
         if let Err(e) = res {
             if e.kind() != std::io::ErrorKind::NotFound {
                 // just never log the not found errors, we cannot do anything for them; on detach
                 // the tenant directory is already gone.
                 //
                 // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
-                error!(
-                    "could not remove ephemeral file '{}': {}",
-                    self.rw.as_writer().file.path,
-                    e
-                );
+                error!("could not remove ephemeral file '{path}': {e}");
             }
         }
     }
 }
-
-struct PreWarmingWriter {
-    prewarm_on_write: PrewarmOnWrite,
-    nwritten_blocks: u32,
-    page_cache_file_id: page_cache::FileId,
-    file: VirtualFile,
-}
-
-impl PreWarmingWriter {
-    fn new(
-        page_cache_file_id: page_cache::FileId,
-        file: VirtualFile,
-        prewarm_on_write: PrewarmOnWrite,
-    ) -> Self {
-        Self {
-            prewarm_on_write,
-            nwritten_blocks: 0,
-            page_cache_file_id,
-            file,
-        }
-    }
-
-    /// Return the byte range within `file` that has been written though `write_all`.
-    ///
-    /// The returned range would be invalidated by another `write_all`. To prevent that, we capture `&_`.
-    fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
-        let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
-        struct Wrapper(Range<usize>);
-        impl Deref for Wrapper {
-            type Target = Range<usize>;
-            fn deref(&self) -> &Range<usize> {
-                &self.0
-            }
-        }
-        Wrapper(0..nwritten_blocks * PAGE_SZ)
-    }
-}
-
-impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
-    async fn write_all<Buf: IoBuf + Send>(
-        &mut self,
-        buf: FullSlice<Buf>,
-        ctx: &RequestContext,
-    ) -> std::io::Result<(usize, FullSlice<Buf>)> {
-        let buflen = buf.len();
-        assert_eq!(
-            buflen % PAGE_SZ,
-            0,
-            "{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
-        );
-
-        // Do the IO.
-        let buf = match self.file.write_all(buf, ctx).await {
-            (buf, Ok(nwritten)) => {
-                assert_eq!(nwritten, buflen);
-                buf
-            }
-            (_, Err(e)) => {
-                return Err(std::io::Error::new(
-                    ErrorKind::Other,
-                    // order error before path because path is long and error is short
-                    format!(
-                        "ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
-                        self.nwritten_blocks, buflen, e, self.file.path,
-                    ),
-                ));
-            }
-        };
-
-        let nblocks = buflen / PAGE_SZ;
-        let nblocks32 = u32::try_from(nblocks).unwrap();
-
-        if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
-            // Pre-warm page cache with the contents.
-            // At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
-            // benefits the code that writes InMemoryLayer=>L0 layers.
-
-            let cache = page_cache::get();
-            static CTX: Lazy<RequestContext> = Lazy::new(|| {
-                RequestContext::new(
-                    crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
-                    crate::context::DownloadBehavior::Error,
-                )
-            });
-            for blknum_in_buffer in 0..nblocks {
-                let blk_in_buffer =
-                    &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
-                let blknum = self
-                    .nwritten_blocks
-                    .checked_add(blknum_in_buffer as u32)
-                    .unwrap();
-                match cache
-                    .read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
-                    .await
-                {
-                    Err(e) => {
-                        error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
-                        // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
-                    }
-                    Ok(v) => match v {
-                        page_cache::ReadBufResult::Found(_guard) => {
-                            // This function takes &mut self, so, it shouldn't be possible to reach this point.
-                            unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
-                                and this function takes &mut self, so, no concurrent read_blk is possible");
-                        }
-                        page_cache::ReadBufResult::NotFound(mut write_guard) => {
-                            write_guard.copy_from_slice(blk_in_buffer);
-                            let _ = write_guard.mark_valid();
-                        }
-                    },
-                }
-            }
-        }
-
-        self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
-        Ok((buflen, buf))
-    }
-}
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 748d79c149..130d1002a0 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -13,7 +13,7 @@ use crate::tenant::ephemeral_file::EphemeralFile;
 use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::PageReconstructError;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
-use crate::{l0_flush, page_cache, walrecord};
+use crate::{l0_flush, page_cache};
 use anyhow::{anyhow, Result};
 use camino::Utf8PathBuf;
 use pageserver_api::key::CompactKey;
@@ -249,9 +249,7 @@ impl InMemoryLayer {
     /// debugging function to print out the contents of the layer
     ///
     /// this is likely completly unused
-    pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
-        let inner = self.inner.read().await;
-
+    pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
         let end_str = self.end_lsn_or_max();
 
         println!(
@@ -259,39 +257,6 @@ impl InMemoryLayer {
             self.timeline_id, self.start_lsn, end_str,
         );
 
-        if !verbose {
-            return Ok(());
-        }
-
-        let cursor = inner.file.block_cursor();
-        let mut buf = Vec::new();
-        for (key, vec_map) in inner.index.iter() {
-            for (lsn, pos) in vec_map.as_slice() {
-                let mut desc = String::new();
-                cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
-                let val = Value::des(&buf);
-                match val {
-                    Ok(Value::Image(img)) => {
-                        write!(&mut desc, " img {} bytes", img.len())?;
-                    }
-                    Ok(Value::WalRecord(rec)) => {
-                        let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
-                        write!(
-                            &mut desc,
-                            " rec {} bytes will_init: {} {}",
-                            buf.len(),
-                            rec.will_init(),
-                            wal_desc
-                        )?;
-                    }
-                    Err(err) => {
-                        write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
-                    }
-                }
-                println!("  key {} at {}: {}", key, lsn, desc);
-            }
-        }
-
         Ok(())
     }
 
@@ -536,7 +501,6 @@ impl InMemoryLayer {
         use l0_flush::Inner;
 
         let _concurrency_permit = match l0_flush_global_state {
-            Inner::PageCached => None,
             Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
         };
 
@@ -568,34 +532,6 @@ impl InMemoryLayer {
             .await?;
 
         match l0_flush_global_state {
-            l0_flush::Inner::PageCached => {
-                let ctx = RequestContextBuilder::extend(ctx)
-                    .page_content_kind(PageContentKind::InMemoryLayer)
-                    .build();
-
-                let mut buf = Vec::new();
-
-                let cursor = inner.file.block_cursor();
-
-                for (key, vec_map) in inner.index.iter() {
-                    // Write all page versions
-                    for (lsn, pos) in vec_map.as_slice() {
-                        cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
-                        let will_init = Value::des(&buf)?.will_init();
-                        let (tmp, res) = delta_layer_writer
-                            .put_value_bytes(
-                                Key::from_compact(*key),
-                                *lsn,
-                                buf.slice_len(),
-                                will_init,
-                                &ctx,
-                            )
-                            .await;
-                        res?;
-                        buf = tmp.into_raw_slice().into_inner();
-                    }
-                }
-            }
             l0_flush::Inner::Direct { .. } => {
                 let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
                 assert_eq!(
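
For context on the replacement: a size-tracking writer is a thin pass-through wrapper that forwards every buffer to the underlying `VirtualFile` and keeps a running byte count, so `0..bytes_written()` is exactly the flushed range that `PreWarmingWriter::written_range()` used to compute from `nwritten_blocks * PAGE_SZ`. Below is a minimal sketch of the idea, assuming a simplified synchronous `std::io::Write` interface; the real `owned_buffers_io::util::size_tracking_writer::Writer` is async and operates on owned buffers, and only the `new`/`bytes_written`/`as_inner` names are taken from the calls in the diff above.

    use std::io::{self, Write};

    /// Sketch: pass-through writer that tracks how many bytes reached `inner`.
    pub struct SizeTrackingWriter<W> {
        inner: W,
        bytes_written: u64,
    }

    impl<W: Write> SizeTrackingWriter<W> {
        pub fn new(inner: W) -> Self {
            Self { inner, bytes_written: 0 }
        }

        /// Forward `buf` and account for it; afterwards, `0..bytes_written()`
        /// is the byte range that has been handed to `inner`.
        pub fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
            self.inner.write_all(buf)?;
            self.bytes_written += buf.len() as u64;
            Ok(())
        }

        pub fn bytes_written(&self) -> u64 {
            self.bytes_written
        }

        /// Borrow the wrapped writer, e.g. to read back already-flushed data.
        pub fn as_inner(&self) -> &W {
            &self.inner
        }
    }

Because the wrapper adds no buffering of its own, dropping the pre-warming path leaves behavior unchanged for reads: `load_to_vec` reads `0..bytes_written()` straight from the file, and page-cache misses fall through to `as_inner().read_exact_at_page(...)` exactly as before.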