From ca5390a89d8ae4b485c3471ccdac5910a86079dd Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 6 Aug 2024 17:39:40 +0100 Subject: [PATCH] pageserver: add `bench_ingest` (#7409) ## Problem We lack a rust bench for the inmemory layer and delta layer write paths: it is useful to benchmark these components independent of postgres & WAL decoding. Related: https://github.com/neondatabase/neon/issues/8452 ## Summary of changes - Refactor DeltaLayerWriter to avoid carrying a Timeline, so that it can be cleanly tested + benched without a Tenant/Timeline test harness. It only needed the Timeline for building `Layer`, so this can be done in a separate step. - Add `bench_ingest`, which exercises a variety of workload "shapes" (big values, small values, sequential keys, random keys) - Include a small uncontroversial optimization: in `freeze`, only exhaustively walk values to assert ordering relative to end_lsn in debug mode. These benches are limited by drive performance on a lot of machines, but still useful as a local tool for iterating on CPU/memory improvements around this code path. Anecdotal measurements on Hetzner AX102 (Ryzen 7950xd): ``` ingest-small-values/ingest 128MB/100b seq time: [1.1160 s 1.1230 s 1.1289 s] thrpt: [113.38 MiB/s 113.98 MiB/s 114.70 MiB/s] Found 1 outliers among 10 measurements (10.00%) 1 (10.00%) low mild Benchmarking ingest-small-values/ingest 128MB/100b rand: Warming up for 3.0000 s Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 18.9s. ingest-small-values/ingest 128MB/100b rand time: [1.9001 s 1.9056 s 1.9110 s] thrpt: [66.982 MiB/s 67.171 MiB/s 67.365 MiB/s] Benchmarking ingest-small-values/ingest 128MB/100b rand-1024keys: Warming up for 3.0000 s Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 11.0s. ingest-small-values/ingest 128MB/100b rand-1024keys time: [1.0715 s 1.0828 s 1.0937 s] thrpt: [117.04 MiB/s 118.21 MiB/s 119.46 MiB/s] ingest-small-values/ingest 128MB/100b seq, no delta time: [425.49 ms 429.07 ms 432.04 ms] thrpt: [296.27 MiB/s 298.32 MiB/s 300.83 MiB/s] Found 1 outliers among 10 measurements (10.00%) 1 (10.00%) low mild ingest-big-values/ingest 128MB/8k seq time: [373.03 ms 375.84 ms 379.17 ms] thrpt: [337.58 MiB/s 340.57 MiB/s 343.13 MiB/s] Found 1 outliers among 10 measurements (10.00%) 1 (10.00%) high mild ingest-big-values/ingest 128MB/8k seq, no delta time: [81.534 ms 82.811 ms 83.364 ms] thrpt: [1.4994 GiB/s 1.5095 GiB/s 1.5331 GiB/s] Found 1 outliers among 10 measurements (10.00%) ``` --- pageserver/Cargo.toml | 4 + pageserver/benches/bench_ingest.rs | 235 ++++++++++++++++++ pageserver/src/l0_flush.rs | 4 +- .../src/tenant/storage_layer/delta_layer.rs | 45 ++-- .../tenant/storage_layer/inmemory_layer.rs | 37 +-- .../src/tenant/storage_layer/split_writer.rs | 11 +- pageserver/src/tenant/timeline.rs | 11 +- pageserver/src/tenant/timeline/compaction.rs | 44 ++-- .../src/tenant/timeline/detach_ancestor.rs | 6 +- 9 files changed, 322 insertions(+), 75 deletions(-) create mode 100644 pageserver/benches/bench_ingest.rs diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 43976250a4..0e748ee3db 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -108,3 +108,7 @@ harness = false [[bench]] name = "bench_walredo" harness = false + +[[bench]] +name = "bench_ingest" +harness = false diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs new file mode 100644 index 0000000000..af2b6934c6 --- /dev/null +++ b/pageserver/benches/bench_ingest.rs @@ -0,0 +1,235 @@ +use std::{env, num::NonZeroUsize}; + +use bytes::Bytes; +use camino::Utf8PathBuf; +use criterion::{criterion_group, criterion_main, Criterion}; +use pageserver::{ + config::PageServerConf, + context::{DownloadBehavior, RequestContext}, + l0_flush::{L0FlushConfig, L0FlushGlobalState}, + page_cache, + repository::Value, + task_mgr::TaskKind, + tenant::storage_layer::InMemoryLayer, + virtual_file::{self, api::IoEngineKind}, +}; +use pageserver_api::{key::Key, shard::TenantShardId}; +use utils::{ + bin_ser::BeSer, + id::{TenantId, TimelineId}, +}; + +// A very cheap hash for generating non-sequential keys. +fn murmurhash32(mut h: u32) -> u32 { + h ^= h >> 16; + h = h.wrapping_mul(0x85ebca6b); + h ^= h >> 13; + h = h.wrapping_mul(0xc2b2ae35); + h ^= h >> 16; + h +} + +enum KeyLayout { + /// Sequential unique keys + Sequential, + /// Random unique keys + Random, + /// Random keys, but only use the bits from the mask of them + RandomReuse(u32), +} + +enum WriteDelta { + Yes, + No, +} + +async fn ingest( + conf: &'static PageServerConf, + put_size: usize, + put_count: usize, + key_layout: KeyLayout, + write_delta: WriteDelta, +) -> anyhow::Result<()> { + let mut lsn = utils::lsn::Lsn(1000); + let mut key = Key::from_i128(0x0); + + let timeline_id = TimelineId::generate(); + let tenant_id = TenantId::generate(); + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + + tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?; + + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); + + let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?; + + let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?; + let ctx = RequestContext::new( + pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler, + pageserver::context::DownloadBehavior::Download, + ); + + for i in 0..put_count { + lsn += put_size as u64; + + // Generate lots of keys within a single relation, which simulates the typical bulk ingest case: people + // usually care the most about write performance when they're blasting a huge batch of data into a huge table. + match key_layout { + KeyLayout::Sequential => { + // Use sequential order to illustrate the experience a user is likely to have + // when ingesting bulk data. + key.field6 = i as u32; + } + KeyLayout::Random => { + // Use random-order keys to avoid giving a false advantage to data structures that are + // faster when inserting on the end. + key.field6 = murmurhash32(i as u32); + } + KeyLayout::RandomReuse(mask) => { + // Use low bits only, to limit cardinality + key.field6 = murmurhash32(i as u32) & mask; + } + } + + layer.put_value(key, lsn, &data, &ctx).await?; + } + layer.freeze(lsn + 1).await; + + if matches!(write_delta, WriteDelta::Yes) { + let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct { + max_concurrency: NonZeroUsize::new(1).unwrap(), + }); + let (_desc, path) = layer + .write_to_disk(&ctx, None, l0_flush_state.inner()) + .await? + .unwrap(); + tokio::fs::remove_file(path).await?; + } + + Ok(()) +} + +/// Wrapper to instantiate a tokio runtime +fn ingest_main( + conf: &'static PageServerConf, + put_size: usize, + put_count: usize, + key_layout: KeyLayout, + write_delta: WriteDelta, +) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + runtime.block_on(async move { + let r = ingest(conf, put_size, put_count, key_layout, write_delta).await; + if let Err(e) = r { + panic!("{e:?}"); + } + }); +} + +/// Declare a series of benchmarks for the Pageserver's ingest write path. +/// +/// This benchmark does not include WAL decode: it starts at InMemoryLayer::put_value, and ends either +/// at freezing the ephemeral layer, or writing the ephemeral layer out to an L0 (depending on whether WriteDelta is set). +/// +/// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on +/// a fast disk, CPU is the bottleneck at time of writing. +fn criterion_benchmark(c: &mut Criterion) { + let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap(); + let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent).unwrap(); + eprintln!("Data directory: {}", temp_dir.path()); + + let conf: &'static PageServerConf = Box::leak(Box::new( + pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()), + )); + virtual_file::init(16384, IoEngineKind::TokioEpollUring); + page_cache::init(conf.page_cache_size); + + { + let mut group = c.benchmark_group("ingest-small-values"); + let put_size = 100usize; + let put_count = 128 * 1024 * 1024 / put_size; + group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64)); + group.sample_size(10); + group.bench_function("ingest 128MB/100b seq", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Sequential, + WriteDelta::Yes, + ) + }) + }); + group.bench_function("ingest 128MB/100b rand", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Random, + WriteDelta::Yes, + ) + }) + }); + group.bench_function("ingest 128MB/100b rand-1024keys", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::RandomReuse(0x3ff), + WriteDelta::Yes, + ) + }) + }); + group.bench_function("ingest 128MB/100b seq, no delta", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Sequential, + WriteDelta::No, + ) + }) + }); + } + + { + let mut group = c.benchmark_group("ingest-big-values"); + let put_size = 8192usize; + let put_count = 128 * 1024 * 1024 / put_size; + group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64)); + group.sample_size(10); + group.bench_function("ingest 128MB/8k seq", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Sequential, + WriteDelta::Yes, + ) + }) + }); + group.bench_function("ingest 128MB/8k seq, no delta", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Sequential, + WriteDelta::No, + ) + }) + }); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/pageserver/src/l0_flush.rs b/pageserver/src/l0_flush.rs index 8945e5accd..10187f2ba3 100644 --- a/pageserver/src/l0_flush.rs +++ b/pageserver/src/l0_flush.rs @@ -24,7 +24,7 @@ impl Default for L0FlushConfig { #[derive(Clone)] pub struct L0FlushGlobalState(Arc); -pub(crate) enum Inner { +pub enum Inner { PageCached, Direct { semaphore: tokio::sync::Semaphore }, } @@ -40,7 +40,7 @@ impl L0FlushGlobalState { } } - pub(crate) fn inner(&self) -> &Arc { + pub fn inner(&self) -> &Arc { &self.0 } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 962faa6796..bff8f7cb24 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -36,13 +36,12 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi use crate::tenant::disk_btree::{ DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection, }; -use crate::tenant::storage_layer::Layer; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner, }; -use crate::tenant::{PageReconstructError, Timeline}; +use crate::tenant::PageReconstructError; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; @@ -72,7 +71,7 @@ use utils::{ lsn::Lsn, }; -use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState}; +use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState}; /// /// Header stored in the beginning of the file @@ -367,7 +366,6 @@ impl DeltaLayer { /// 3. Call `finish`. /// struct DeltaLayerWriterInner { - conf: &'static PageServerConf, pub path: Utf8PathBuf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, @@ -414,7 +412,6 @@ impl DeltaLayerWriterInner { let tree_builder = DiskBtreeBuilder::new(block_buf); Ok(Self { - conf, path, timeline_id, tenant_shard_id, @@ -489,11 +486,10 @@ impl DeltaLayerWriterInner { async fn finish( self, key_end: Key, - timeline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let temp_path = self.path.clone(); - let result = self.finish0(key_end, timeline, ctx).await; + let result = self.finish0(key_end, ctx).await; if result.is_err() { tracing::info!(%temp_path, "cleaning up temporary file after error during writing"); if let Err(e) = std::fs::remove_file(&temp_path) { @@ -506,9 +502,8 @@ impl DeltaLayerWriterInner { async fn finish0( self, key_end: Key, - timeline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -573,11 +568,9 @@ impl DeltaLayerWriterInner { // fsync the file file.sync_all().await?; - let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?; + trace!("created delta layer {}", self.path); - trace!("created delta layer {}", layer.local_path()); - - Ok(layer) + Ok((desc, self.path)) } } @@ -678,14 +671,9 @@ impl DeltaLayerWriter { pub(crate) async fn finish( mut self, key_end: Key, - timeline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result { - self.inner - .take() - .unwrap() - .finish(key_end, timeline, ctx) - .await + ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { + self.inner.take().unwrap().finish(key_end, ctx).await } #[cfg(test)] @@ -1592,8 +1580,9 @@ pub(crate) mod test { use super::*; use crate::repository::Value; use crate::tenant::harness::TIMELINE_ID; + use crate::tenant::storage_layer::{Layer, ResidentLayer}; use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner; - use crate::tenant::Tenant; + use crate::tenant::{Tenant, Timeline}; use crate::{ context::DownloadBehavior, task_mgr::TaskKind, @@ -1887,9 +1876,8 @@ pub(crate) mod test { res?; } - let resident = writer - .finish(entries_meta.key_range.end, &timeline, &ctx) - .await?; + let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?; + let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?; let inner = resident.get_as_delta(&ctx).await?; @@ -2078,7 +2066,8 @@ pub(crate) mod test { .await .unwrap(); - let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap(); + let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap(); + let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap(); copied_layer.get_as_delta(ctx).await.unwrap(); @@ -2206,7 +2195,9 @@ pub(crate) mod test { for (key, lsn, value) in deltas { writer.put_value(key, lsn, value, ctx).await?; } - let delta_layer = writer.finish(key_end, tline, ctx).await?; + + let (desc, path) = writer.finish(key_end, ctx).await?; + let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?; Ok::<_, anyhow::Error>(delta_layer) } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 6abc89c2ed..f118f3d8d8 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -11,9 +11,10 @@ use crate::repository::{Key, Value}; use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef}; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::timeline::GetVectoredError; -use crate::tenant::{PageReconstructError, Timeline}; +use crate::tenant::PageReconstructError; use crate::{l0_flush, page_cache, walrecord}; use anyhow::{anyhow, Result}; +use camino::Utf8PathBuf; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; use pageserver_api::shard::TenantShardId; @@ -32,7 +33,9 @@ use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::atomic::{AtomicU64, AtomicUsize}; use tokio::sync::{RwLock, RwLockWriteGuard}; -use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValuesReconstructState}; +use super::{ + DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState, +}; #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub(crate) struct InMemoryLayerFileId(page_cache::FileId); @@ -410,8 +413,7 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Adds the page version to the in-memory tree - - pub(crate) async fn put_value( + pub async fn put_value( &self, key: Key, lsn: Lsn, @@ -476,8 +478,6 @@ impl InMemoryLayer { /// Records the end_lsn for non-dropped layers. /// `end_lsn` is exclusive pub async fn freeze(&self, end_lsn: Lsn) { - let inner = self.inner.write().await; - assert!( self.start_lsn < end_lsn, "{} >= {}", @@ -495,9 +495,13 @@ impl InMemoryLayer { }) .expect("frozen_local_path_str set only once"); - for vec_map in inner.index.values() { - for (lsn, _pos) in vec_map.as_slice() { - assert!(*lsn < end_lsn); + #[cfg(debug_assertions)] + { + let inner = self.inner.write().await; + for vec_map in inner.index.values() { + for (lsn, _pos) in vec_map.as_slice() { + assert!(*lsn < end_lsn); + } } } } @@ -507,12 +511,12 @@ impl InMemoryLayer { /// if there are no matching keys. /// /// Returns a new delta layer with all the same data as this in-memory layer - pub(crate) async fn write_to_disk( + pub async fn write_to_disk( &self, - timeline: &Arc, ctx: &RequestContext, key_range: Option>, - ) -> Result> { + l0_flush_global_state: &l0_flush::Inner, + ) -> Result> { // Grab the lock in read-mode. We hold it over the I/O, but because this // layer is not writeable anymore, no one should be trying to acquire the // write lock on it, so we shouldn't block anyone. There's one exception @@ -524,9 +528,8 @@ impl InMemoryLayer { // rare though, so we just accept the potential latency hit for now. let inner = self.inner.read().await; - let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone(); use l0_flush::Inner; - let _concurrency_permit = match &*l0_flush_global_state { + let _concurrency_permit = match l0_flush_global_state { Inner::PageCached => None, Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await), }; @@ -556,7 +559,7 @@ impl InMemoryLayer { ) .await?; - match &*l0_flush_global_state { + match l0_flush_global_state { l0_flush::Inner::PageCached => { let ctx = RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::InMemoryLayer) @@ -621,7 +624,7 @@ impl InMemoryLayer { } // MAX is used here because we identify L0 layers by full key range - let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?; + let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?; // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``. // @@ -633,6 +636,6 @@ impl InMemoryLayer { // we dirtied when writing to the filesystem have been flushed and marked !dirty. drop(_concurrency_permit); - Ok(Some(delta_layer)) + Ok(Some((desc, path))) } } diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index a966775f9e..d7bfe48c60 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -4,6 +4,7 @@ use bytes::Bytes; use pageserver_api::key::{Key, KEY_SIZE}; use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId}; +use crate::tenant::storage_layer::Layer; use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline}; use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer}; @@ -173,8 +174,9 @@ impl SplitDeltaLayerWriter { ) .await?; let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); - self.generated_layers - .push(prev_delta_writer.finish(key, tline, ctx).await?); + let (desc, path) = prev_delta_writer.finish(key, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + self.generated_layers.push(delta_layer); } self.inner.put_value(key, lsn, val, ctx).await } @@ -190,7 +192,10 @@ impl SplitDeltaLayerWriter { inner, .. } = self; - generated_layers.push(inner.finish(end_key, tline, ctx).await?); + + let (desc, path) = inner.finish(end_key, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + generated_layers.push(delta_layer); Ok(generated_layers) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4ff87f20f1..a05e4e0712 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3709,12 +3709,14 @@ impl Timeline { let frozen_layer = Arc::clone(frozen_layer); let ctx = ctx.attached_child(); let work = async move { - let Some(new_delta) = frozen_layer - .write_to_disk(&self_clone, &ctx, key_range) + let Some((desc, path)) = frozen_layer + .write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner()) .await? else { return Ok(None); }; + let new_delta = Layer::finish_creating(self_clone.conf, &self_clone, desc, &path)?; + // The write_to_disk() above calls writer.finish() which already did the fsync of the inodes. // We just need to fsync the directory in which these inodes are linked, // which we know to be the timeline directory. @@ -5347,9 +5349,8 @@ impl Timeline { for (key, lsn, val) in deltas.data { delta_layer_writer.put_value(key, lsn, val, ctx).await?; } - let delta_layer = delta_layer_writer - .finish(deltas.key_range.end, self, ctx) - .await?; + let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?; { let mut guard = self.layers.write().await; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 1ff029a313..276d7b4967 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1104,14 +1104,16 @@ impl Timeline { || contains_hole { // ... if so, flush previous layer and prepare to write new one - new_layers.push( - writer - .take() - .unwrap() - .finish(prev_key.unwrap().next(), self, ctx) - .await - .map_err(CompactionError::Other)?, - ); + let (desc, path) = writer + .take() + .unwrap() + .finish(prev_key.unwrap().next(), ctx) + .await + .map_err(CompactionError::Other)?; + let new_delta = Layer::finish_creating(self.conf, self, desc, &path) + .map_err(CompactionError::Other)?; + + new_layers.push(new_delta); writer = None; if contains_hole { @@ -1174,12 +1176,13 @@ impl Timeline { prev_key = Some(key); } if let Some(writer) = writer { - new_layers.push( - writer - .finish(prev_key.unwrap().next(), self, ctx) - .await - .map_err(CompactionError::Other)?, - ); + let (desc, path) = writer + .finish(prev_key.unwrap().next(), ctx) + .await + .map_err(CompactionError::Other)?; + let new_delta = Layer::finish_creating(self.conf, self, desc, &path) + .map_err(CompactionError::Other)?; + new_layers.push(new_delta); } // Sync layers @@ -1966,13 +1969,16 @@ impl Timeline { for (key, lsn, val) in deltas { delta_layer_writer.put_value(key, lsn, val, ctx).await?; } + stats.produce_delta_layer(delta_layer_writer.size()); if dry_run { return Ok(None); } - let delta_layer = delta_layer_writer - .finish(delta_key.key_range.end, tline, ctx) + + let (desc, path) = delta_layer_writer + .finish(delta_key.key_range.end, ctx) .await?; + let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?; Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer))) } @@ -2413,9 +2419,9 @@ impl CompactionJobExecutor for TimelineAdaptor { )) }); - let new_delta_layer = writer - .finish(prev.unwrap().0.next(), &self.timeline, ctx) - .await?; + let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?; + let new_delta_layer = + Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?; self.new_deltas.push(new_delta_layer); Ok(()) diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index ee5f8cd52a..645b5ad2bf 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -488,10 +488,12 @@ async fn copy_lsn_prefix( // reuse the key instead of adding more holes between layers by using the real // highest key in the layer. let reused_highest_key = layer.layer_desc().key_range.end; - let copied = writer - .finish(reused_highest_key, target_timeline, ctx) + let (desc, path) = writer + .finish(reused_highest_key, ctx) .await .map_err(CopyDeltaPrefix)?; + let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path) + .map_err(CopyDeltaPrefix)?; tracing::debug!(%layer, %copied, "new layer produced");