mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
pageserver: add bench_ingest (#7409)
## Problem

We lack a Rust bench for the in-memory layer and delta layer write paths: it is useful to benchmark these components independently of Postgres & WAL decoding.

Related: https://github.com/neondatabase/neon/issues/8452

## Summary of changes

- Refactor DeltaLayerWriter to avoid carrying a Timeline, so that it can be cleanly tested + benched without a Tenant/Timeline test harness. It only needed the Timeline for building `Layer`, so this can be done in a separate step.
- Add `bench_ingest`, which exercises a variety of workload "shapes" (big values, small values, sequential keys, random keys).
- Include a small uncontroversial optimization: in `freeze`, only exhaustively walk values to assert ordering relative to end_lsn in debug mode.

These benches are limited by drive performance on a lot of machines, but are still useful as a local tool for iterating on CPU/memory improvements around this code path.

Anecdotal measurements on Hetzner AX102 (Ryzen 7950xd):

```
ingest-small-values/ingest 128MB/100b seq
    time: [1.1160 s 1.1230 s 1.1289 s]
    thrpt: [113.38 MiB/s 113.98 MiB/s 114.70 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
    1 (10.00%) low mild
Benchmarking ingest-small-values/ingest 128MB/100b rand: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 18.9s.
ingest-small-values/ingest 128MB/100b rand
    time: [1.9001 s 1.9056 s 1.9110 s]
    thrpt: [66.982 MiB/s 67.171 MiB/s 67.365 MiB/s]
Benchmarking ingest-small-values/ingest 128MB/100b rand-1024keys: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 11.0s.
ingest-small-values/ingest 128MB/100b rand-1024keys
    time: [1.0715 s 1.0828 s 1.0937 s]
    thrpt: [117.04 MiB/s 118.21 MiB/s 119.46 MiB/s]
ingest-small-values/ingest 128MB/100b seq, no delta
    time: [425.49 ms 429.07 ms 432.04 ms]
    thrpt: [296.27 MiB/s 298.32 MiB/s 300.83 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
    1 (10.00%) low mild
ingest-big-values/ingest 128MB/8k seq
    time: [373.03 ms 375.84 ms 379.17 ms]
    thrpt: [337.58 MiB/s 340.57 MiB/s 343.13 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
    1 (10.00%) high mild
ingest-big-values/ingest 128MB/8k seq, no delta
    time: [81.534 ms 82.811 ms 83.364 ms]
    thrpt: [1.4994 GiB/s 1.5095 GiB/s 1.5331 GiB/s]
Found 1 outliers among 10 measurements (10.00%)
```
This commit is contained in:
@@ -108,3 +108,7 @@ harness = false
|
|||||||
[[bench]]
|
[[bench]]
|
||||||
name = "bench_walredo"
|
name = "bench_walredo"
|
||||||
harness = false
|
harness = false
|
||||||
|
|
||||||
|
[[bench]]
|
||||||
|
name = "bench_ingest"
|
||||||
|
harness = false
|
||||||
|
|||||||
235
pageserver/benches/bench_ingest.rs
Normal file
235
pageserver/benches/bench_ingest.rs
Normal file
@@ -0,0 +1,235 @@
|
|||||||
|
use std::{env, num::NonZeroUsize};
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use camino::Utf8PathBuf;
|
||||||
|
use criterion::{criterion_group, criterion_main, Criterion};
|
||||||
|
use pageserver::{
|
||||||
|
config::PageServerConf,
|
||||||
|
context::{DownloadBehavior, RequestContext},
|
||||||
|
l0_flush::{L0FlushConfig, L0FlushGlobalState},
|
||||||
|
page_cache,
|
||||||
|
repository::Value,
|
||||||
|
task_mgr::TaskKind,
|
||||||
|
tenant::storage_layer::InMemoryLayer,
|
||||||
|
virtual_file::{self, api::IoEngineKind},
|
||||||
|
};
|
||||||
|
use pageserver_api::{key::Key, shard::TenantShardId};
|
||||||
|
use utils::{
|
||||||
|
bin_ser::BeSer,
|
||||||
|
id::{TenantId, TimelineId},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// A very cheap integer mixer for generating non-sequential keys from a loop counter.
///
/// Applies three xor-shift steps (by 16, 13 and 16 bits) interleaved with two wrapping
/// multiplications, matching the 32-bit finalization mix of MurmurHash3.
fn murmurhash32(mut h: u32) -> u32 {
    const MIX_A: u32 = 0x85ebca6b;
    const MIX_B: u32 = 0xc2b2ae35;

    h = (h ^ (h >> 16)).wrapping_mul(MIX_A);
    h = (h ^ (h >> 13)).wrapping_mul(MIX_B);
    h ^ (h >> 16)
}
|
||||||
|
|
||||||
|
/// Selects how the benchmark derives the key for each successive put.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KeyLayout {
    /// Sequential unique keys
    Sequential,
    /// Random unique keys
    Random,
    /// Random keys restricted to the bits set in the given mask, which limits key cardinality
    /// so that keys are reused across puts.
    RandomReuse(u32),
}
|
||||||
|
|
||||||
|
/// Whether an ingest pass should also write the frozen in-memory layer to disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriteDelta {
    /// Write the frozen layer to disk (and remove the resulting file afterwards).
    Yes,
    /// Stop after freezing the in-memory layer.
    No,
}
|
||||||
|
|
||||||
|
async fn ingest(
|
||||||
|
conf: &'static PageServerConf,
|
||||||
|
put_size: usize,
|
||||||
|
put_count: usize,
|
||||||
|
key_layout: KeyLayout,
|
||||||
|
write_delta: WriteDelta,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut lsn = utils::lsn::Lsn(1000);
|
||||||
|
let mut key = Key::from_i128(0x0);
|
||||||
|
|
||||||
|
let timeline_id = TimelineId::generate();
|
||||||
|
let tenant_id = TenantId::generate();
|
||||||
|
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||||
|
|
||||||
|
tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
|
||||||
|
|
||||||
|
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||||
|
|
||||||
|
let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?;
|
||||||
|
|
||||||
|
let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
|
||||||
|
let ctx = RequestContext::new(
|
||||||
|
pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
|
||||||
|
pageserver::context::DownloadBehavior::Download,
|
||||||
|
);
|
||||||
|
|
||||||
|
for i in 0..put_count {
|
||||||
|
lsn += put_size as u64;
|
||||||
|
|
||||||
|
// Generate lots of keys within a single relation, which simulates the typical bulk ingest case: people
|
||||||
|
// usually care the most about write performance when they're blasting a huge batch of data into a huge table.
|
||||||
|
match key_layout {
|
||||||
|
KeyLayout::Sequential => {
|
||||||
|
// Use sequential order to illustrate the experience a user is likely to have
|
||||||
|
// when ingesting bulk data.
|
||||||
|
key.field6 = i as u32;
|
||||||
|
}
|
||||||
|
KeyLayout::Random => {
|
||||||
|
// Use random-order keys to avoid giving a false advantage to data structures that are
|
||||||
|
// faster when inserting on the end.
|
||||||
|
key.field6 = murmurhash32(i as u32);
|
||||||
|
}
|
||||||
|
KeyLayout::RandomReuse(mask) => {
|
||||||
|
// Use low bits only, to limit cardinality
|
||||||
|
key.field6 = murmurhash32(i as u32) & mask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
layer.put_value(key, lsn, &data, &ctx).await?;
|
||||||
|
}
|
||||||
|
layer.freeze(lsn + 1).await;
|
||||||
|
|
||||||
|
if matches!(write_delta, WriteDelta::Yes) {
|
||||||
|
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
|
||||||
|
max_concurrency: NonZeroUsize::new(1).unwrap(),
|
||||||
|
});
|
||||||
|
let (_desc, path) = layer
|
||||||
|
.write_to_disk(&ctx, None, l0_flush_state.inner())
|
||||||
|
.await?
|
||||||
|
.unwrap();
|
||||||
|
tokio::fs::remove_file(path).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wrapper to instantiate a tokio runtime
|
||||||
|
fn ingest_main(
|
||||||
|
conf: &'static PageServerConf,
|
||||||
|
put_size: usize,
|
||||||
|
put_count: usize,
|
||||||
|
key_layout: KeyLayout,
|
||||||
|
write_delta: WriteDelta,
|
||||||
|
) {
|
||||||
|
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
runtime.block_on(async move {
|
||||||
|
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
|
||||||
|
if let Err(e) = r {
|
||||||
|
panic!("{e:?}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Declare a series of benchmarks for the Pageserver's ingest write path.
|
||||||
|
///
|
||||||
|
/// This benchmark does not include WAL decode: it starts at InMemoryLayer::put_value, and ends either
|
||||||
|
/// at freezing the ephemeral layer, or writing the ephemeral layer out to an L0 (depending on whether WriteDelta is set).
|
||||||
|
///
|
||||||
|
/// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on
|
||||||
|
/// a fast disk, CPU is the bottleneck at time of writing.
|
||||||
|
fn criterion_benchmark(c: &mut Criterion) {
|
||||||
|
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap();
|
||||||
|
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent).unwrap();
|
||||||
|
eprintln!("Data directory: {}", temp_dir.path());
|
||||||
|
|
||||||
|
let conf: &'static PageServerConf = Box::leak(Box::new(
|
||||||
|
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
|
||||||
|
));
|
||||||
|
virtual_file::init(16384, IoEngineKind::TokioEpollUring);
|
||||||
|
page_cache::init(conf.page_cache_size);
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut group = c.benchmark_group("ingest-small-values");
|
||||||
|
let put_size = 100usize;
|
||||||
|
let put_count = 128 * 1024 * 1024 / put_size;
|
||||||
|
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
|
||||||
|
group.sample_size(10);
|
||||||
|
group.bench_function("ingest 128MB/100b seq", |b| {
|
||||||
|
b.iter(|| {
|
||||||
|
ingest_main(
|
||||||
|
conf,
|
||||||
|
put_size,
|
||||||
|
put_count,
|
||||||
|
KeyLayout::Sequential,
|
||||||
|
WriteDelta::Yes,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
group.bench_function("ingest 128MB/100b rand", |b| {
|
||||||
|
b.iter(|| {
|
||||||
|
ingest_main(
|
||||||
|
conf,
|
||||||
|
put_size,
|
||||||
|
put_count,
|
||||||
|
KeyLayout::Random,
|
||||||
|
WriteDelta::Yes,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
|
||||||
|
b.iter(|| {
|
||||||
|
ingest_main(
|
||||||
|
conf,
|
||||||
|
put_size,
|
||||||
|
put_count,
|
||||||
|
KeyLayout::RandomReuse(0x3ff),
|
||||||
|
WriteDelta::Yes,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
|
||||||
|
b.iter(|| {
|
||||||
|
ingest_main(
|
||||||
|
conf,
|
||||||
|
put_size,
|
||||||
|
put_count,
|
||||||
|
KeyLayout::Sequential,
|
||||||
|
WriteDelta::No,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut group = c.benchmark_group("ingest-big-values");
|
||||||
|
let put_size = 8192usize;
|
||||||
|
let put_count = 128 * 1024 * 1024 / put_size;
|
||||||
|
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
|
||||||
|
group.sample_size(10);
|
||||||
|
group.bench_function("ingest 128MB/8k seq", |b| {
|
||||||
|
b.iter(|| {
|
||||||
|
ingest_main(
|
||||||
|
conf,
|
||||||
|
put_size,
|
||||||
|
put_count,
|
||||||
|
KeyLayout::Sequential,
|
||||||
|
WriteDelta::Yes,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
|
||||||
|
b.iter(|| {
|
||||||
|
ingest_main(
|
||||||
|
conf,
|
||||||
|
put_size,
|
||||||
|
put_count,
|
||||||
|
KeyLayout::Sequential,
|
||||||
|
WriteDelta::No,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register `criterion_benchmark` under a criterion group named `benches`.
criterion_group!(benches, criterion_benchmark);
|
||||||
|
// Hand the `benches` group to criterion's `criterion_main!` macro, which presumably supplies the
// bench binary's entry point (the Cargo.toml entry for this bench sets `harness = false`).
criterion_main!(benches);
|
||||||
@@ -24,7 +24,7 @@ impl Default for L0FlushConfig {
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct L0FlushGlobalState(Arc<Inner>);
|
pub struct L0FlushGlobalState(Arc<Inner>);
|
||||||
|
|
||||||
pub(crate) enum Inner {
|
pub enum Inner {
|
||||||
PageCached,
|
PageCached,
|
||||||
Direct { semaphore: tokio::sync::Semaphore },
|
Direct { semaphore: tokio::sync::Semaphore },
|
||||||
}
|
}
|
||||||
@@ -40,7 +40,7 @@ impl L0FlushGlobalState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn inner(&self) -> &Arc<Inner> {
|
pub fn inner(&self) -> &Arc<Inner> {
|
||||||
&self.0
|
&self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,13 +36,12 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
|
|||||||
use crate::tenant::disk_btree::{
|
use crate::tenant::disk_btree::{
|
||||||
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||||
};
|
};
|
||||||
use crate::tenant::storage_layer::Layer;
|
|
||||||
use crate::tenant::timeline::GetVectoredError;
|
use crate::tenant::timeline::GetVectoredError;
|
||||||
use crate::tenant::vectored_blob_io::{
|
use crate::tenant::vectored_blob_io::{
|
||||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||||
VectoredReadPlanner,
|
VectoredReadPlanner,
|
||||||
};
|
};
|
||||||
use crate::tenant::{PageReconstructError, Timeline};
|
use crate::tenant::PageReconstructError;
|
||||||
use crate::virtual_file::{self, VirtualFile};
|
use crate::virtual_file::{self, VirtualFile};
|
||||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||||
@@ -72,7 +71,7 @@ use utils::{
|
|||||||
lsn::Lsn,
|
lsn::Lsn,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState};
|
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
|
||||||
|
|
||||||
///
|
///
|
||||||
/// Header stored in the beginning of the file
|
/// Header stored in the beginning of the file
|
||||||
@@ -367,7 +366,6 @@ impl DeltaLayer {
|
|||||||
/// 3. Call `finish`.
|
/// 3. Call `finish`.
|
||||||
///
|
///
|
||||||
struct DeltaLayerWriterInner {
|
struct DeltaLayerWriterInner {
|
||||||
conf: &'static PageServerConf,
|
|
||||||
pub path: Utf8PathBuf,
|
pub path: Utf8PathBuf,
|
||||||
timeline_id: TimelineId,
|
timeline_id: TimelineId,
|
||||||
tenant_shard_id: TenantShardId,
|
tenant_shard_id: TenantShardId,
|
||||||
@@ -414,7 +412,6 @@ impl DeltaLayerWriterInner {
|
|||||||
let tree_builder = DiskBtreeBuilder::new(block_buf);
|
let tree_builder = DiskBtreeBuilder::new(block_buf);
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
conf,
|
|
||||||
path,
|
path,
|
||||||
timeline_id,
|
timeline_id,
|
||||||
tenant_shard_id,
|
tenant_shard_id,
|
||||||
@@ -489,11 +486,10 @@ impl DeltaLayerWriterInner {
|
|||||||
async fn finish(
|
async fn finish(
|
||||||
self,
|
self,
|
||||||
key_end: Key,
|
key_end: Key,
|
||||||
timeline: &Arc<Timeline>,
|
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> anyhow::Result<ResidentLayer> {
|
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||||
let temp_path = self.path.clone();
|
let temp_path = self.path.clone();
|
||||||
let result = self.finish0(key_end, timeline, ctx).await;
|
let result = self.finish0(key_end, ctx).await;
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
|
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
|
||||||
if let Err(e) = std::fs::remove_file(&temp_path) {
|
if let Err(e) = std::fs::remove_file(&temp_path) {
|
||||||
@@ -506,9 +502,8 @@ impl DeltaLayerWriterInner {
|
|||||||
async fn finish0(
|
async fn finish0(
|
||||||
self,
|
self,
|
||||||
key_end: Key,
|
key_end: Key,
|
||||||
timeline: &Arc<Timeline>,
|
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> anyhow::Result<ResidentLayer> {
|
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||||
let index_start_blk =
|
let index_start_blk =
|
||||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||||
|
|
||||||
@@ -573,11 +568,9 @@ impl DeltaLayerWriterInner {
|
|||||||
// fsync the file
|
// fsync the file
|
||||||
file.sync_all().await?;
|
file.sync_all().await?;
|
||||||
|
|
||||||
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
|
trace!("created delta layer {}", self.path);
|
||||||
|
|
||||||
trace!("created delta layer {}", layer.local_path());
|
Ok((desc, self.path))
|
||||||
|
|
||||||
Ok(layer)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -678,14 +671,9 @@ impl DeltaLayerWriter {
|
|||||||
pub(crate) async fn finish(
|
pub(crate) async fn finish(
|
||||||
mut self,
|
mut self,
|
||||||
key_end: Key,
|
key_end: Key,
|
||||||
timeline: &Arc<Timeline>,
|
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> anyhow::Result<ResidentLayer> {
|
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||||
self.inner
|
self.inner.take().unwrap().finish(key_end, ctx).await
|
||||||
.take()
|
|
||||||
.unwrap()
|
|
||||||
.finish(key_end, timeline, ctx)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -1592,8 +1580,9 @@ pub(crate) mod test {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::repository::Value;
|
use crate::repository::Value;
|
||||||
use crate::tenant::harness::TIMELINE_ID;
|
use crate::tenant::harness::TIMELINE_ID;
|
||||||
|
use crate::tenant::storage_layer::{Layer, ResidentLayer};
|
||||||
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
|
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
|
||||||
use crate::tenant::Tenant;
|
use crate::tenant::{Tenant, Timeline};
|
||||||
use crate::{
|
use crate::{
|
||||||
context::DownloadBehavior,
|
context::DownloadBehavior,
|
||||||
task_mgr::TaskKind,
|
task_mgr::TaskKind,
|
||||||
@@ -1887,9 +1876,8 @@ pub(crate) mod test {
|
|||||||
res?;
|
res?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let resident = writer
|
let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
|
||||||
.finish(entries_meta.key_range.end, &timeline, &ctx)
|
let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
let inner = resident.get_as_delta(&ctx).await?;
|
let inner = resident.get_as_delta(&ctx).await?;
|
||||||
|
|
||||||
@@ -2078,7 +2066,8 @@ pub(crate) mod test {
|
|||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
|
let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
|
||||||
|
let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
|
||||||
|
|
||||||
copied_layer.get_as_delta(ctx).await.unwrap();
|
copied_layer.get_as_delta(ctx).await.unwrap();
|
||||||
|
|
||||||
@@ -2206,7 +2195,9 @@ pub(crate) mod test {
|
|||||||
for (key, lsn, value) in deltas {
|
for (key, lsn, value) in deltas {
|
||||||
writer.put_value(key, lsn, value, ctx).await?;
|
writer.put_value(key, lsn, value, ctx).await?;
|
||||||
}
|
}
|
||||||
let delta_layer = writer.finish(key_end, tline, ctx).await?;
|
|
||||||
|
let (desc, path) = writer.finish(key_end, ctx).await?;
|
||||||
|
let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
|
||||||
|
|
||||||
Ok::<_, anyhow::Error>(delta_layer)
|
Ok::<_, anyhow::Error>(delta_layer)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,9 +11,10 @@ use crate::repository::{Key, Value};
|
|||||||
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
|
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
|
||||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||||
use crate::tenant::timeline::GetVectoredError;
|
use crate::tenant::timeline::GetVectoredError;
|
||||||
use crate::tenant::{PageReconstructError, Timeline};
|
use crate::tenant::PageReconstructError;
|
||||||
use crate::{l0_flush, page_cache, walrecord};
|
use crate::{l0_flush, page_cache, walrecord};
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
|
use camino::Utf8PathBuf;
|
||||||
use pageserver_api::keyspace::KeySpace;
|
use pageserver_api::keyspace::KeySpace;
|
||||||
use pageserver_api::models::InMemoryLayerInfo;
|
use pageserver_api::models::InMemoryLayerInfo;
|
||||||
use pageserver_api::shard::TenantShardId;
|
use pageserver_api::shard::TenantShardId;
|
||||||
@@ -32,7 +33,9 @@ use std::sync::atomic::Ordering as AtomicOrdering;
|
|||||||
use std::sync::atomic::{AtomicU64, AtomicUsize};
|
use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||||
|
|
||||||
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValuesReconstructState};
|
use super::{
|
||||||
|
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
|
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
|
||||||
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
|
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
|
||||||
@@ -410,8 +413,7 @@ impl InMemoryLayer {
|
|||||||
|
|
||||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||||
/// Adds the page version to the in-memory tree
|
/// Adds the page version to the in-memory tree
|
||||||
|
pub async fn put_value(
|
||||||
pub(crate) async fn put_value(
|
|
||||||
&self,
|
&self,
|
||||||
key: Key,
|
key: Key,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
@@ -476,8 +478,6 @@ impl InMemoryLayer {
|
|||||||
/// Records the end_lsn for non-dropped layers.
|
/// Records the end_lsn for non-dropped layers.
|
||||||
/// `end_lsn` is exclusive
|
/// `end_lsn` is exclusive
|
||||||
pub async fn freeze(&self, end_lsn: Lsn) {
|
pub async fn freeze(&self, end_lsn: Lsn) {
|
||||||
let inner = self.inner.write().await;
|
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
self.start_lsn < end_lsn,
|
self.start_lsn < end_lsn,
|
||||||
"{} >= {}",
|
"{} >= {}",
|
||||||
@@ -495,9 +495,13 @@ impl InMemoryLayer {
|
|||||||
})
|
})
|
||||||
.expect("frozen_local_path_str set only once");
|
.expect("frozen_local_path_str set only once");
|
||||||
|
|
||||||
for vec_map in inner.index.values() {
|
#[cfg(debug_assertions)]
|
||||||
for (lsn, _pos) in vec_map.as_slice() {
|
{
|
||||||
assert!(*lsn < end_lsn);
|
let inner = self.inner.write().await;
|
||||||
|
for vec_map in inner.index.values() {
|
||||||
|
for (lsn, _pos) in vec_map.as_slice() {
|
||||||
|
assert!(*lsn < end_lsn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -507,12 +511,12 @@ impl InMemoryLayer {
|
|||||||
/// if there are no matching keys.
|
/// if there are no matching keys.
|
||||||
///
|
///
|
||||||
/// Returns a new delta layer with all the same data as this in-memory layer
|
/// Returns a new delta layer with all the same data as this in-memory layer
|
||||||
pub(crate) async fn write_to_disk(
|
pub async fn write_to_disk(
|
||||||
&self,
|
&self,
|
||||||
timeline: &Arc<Timeline>,
|
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
key_range: Option<Range<Key>>,
|
key_range: Option<Range<Key>>,
|
||||||
) -> Result<Option<ResidentLayer>> {
|
l0_flush_global_state: &l0_flush::Inner,
|
||||||
|
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
|
||||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||||
// layer is not writeable anymore, no one should be trying to acquire the
|
// layer is not writeable anymore, no one should be trying to acquire the
|
||||||
// write lock on it, so we shouldn't block anyone. There's one exception
|
// write lock on it, so we shouldn't block anyone. There's one exception
|
||||||
@@ -524,9 +528,8 @@ impl InMemoryLayer {
|
|||||||
// rare though, so we just accept the potential latency hit for now.
|
// rare though, so we just accept the potential latency hit for now.
|
||||||
let inner = self.inner.read().await;
|
let inner = self.inner.read().await;
|
||||||
|
|
||||||
let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
|
|
||||||
use l0_flush::Inner;
|
use l0_flush::Inner;
|
||||||
let _concurrency_permit = match &*l0_flush_global_state {
|
let _concurrency_permit = match l0_flush_global_state {
|
||||||
Inner::PageCached => None,
|
Inner::PageCached => None,
|
||||||
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
|
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
|
||||||
};
|
};
|
||||||
@@ -556,7 +559,7 @@ impl InMemoryLayer {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
match &*l0_flush_global_state {
|
match l0_flush_global_state {
|
||||||
l0_flush::Inner::PageCached => {
|
l0_flush::Inner::PageCached => {
|
||||||
let ctx = RequestContextBuilder::extend(ctx)
|
let ctx = RequestContextBuilder::extend(ctx)
|
||||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||||
@@ -621,7 +624,7 @@ impl InMemoryLayer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// MAX is used here because we identify L0 layers by full key range
|
// MAX is used here because we identify L0 layers by full key range
|
||||||
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
|
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
|
||||||
|
|
||||||
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
|
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
|
||||||
//
|
//
|
||||||
@@ -633,6 +636,6 @@ impl InMemoryLayer {
|
|||||||
// we dirtied when writing to the filesystem have been flushed and marked !dirty.
|
// we dirtied when writing to the filesystem have been flushed and marked !dirty.
|
||||||
drop(_concurrency_permit);
|
drop(_concurrency_permit);
|
||||||
|
|
||||||
Ok(Some(delta_layer))
|
Ok(Some((desc, path)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use bytes::Bytes;
|
|||||||
use pageserver_api::key::{Key, KEY_SIZE};
|
use pageserver_api::key::{Key, KEY_SIZE};
|
||||||
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
|
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
|
||||||
|
|
||||||
|
use crate::tenant::storage_layer::Layer;
|
||||||
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
|
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
|
||||||
|
|
||||||
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
|
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
|
||||||
@@ -173,8 +174,9 @@ impl SplitDeltaLayerWriter {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
|
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
|
||||||
self.generated_layers
|
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
|
||||||
.push(prev_delta_writer.finish(key, tline, ctx).await?);
|
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||||
|
self.generated_layers.push(delta_layer);
|
||||||
}
|
}
|
||||||
self.inner.put_value(key, lsn, val, ctx).await
|
self.inner.put_value(key, lsn, val, ctx).await
|
||||||
}
|
}
|
||||||
@@ -190,7 +192,10 @@ impl SplitDeltaLayerWriter {
|
|||||||
inner,
|
inner,
|
||||||
..
|
..
|
||||||
} = self;
|
} = self;
|
||||||
generated_layers.push(inner.finish(end_key, tline, ctx).await?);
|
|
||||||
|
let (desc, path) = inner.finish(end_key, ctx).await?;
|
||||||
|
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||||
|
generated_layers.push(delta_layer);
|
||||||
Ok(generated_layers)
|
Ok(generated_layers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3709,12 +3709,14 @@ impl Timeline {
|
|||||||
let frozen_layer = Arc::clone(frozen_layer);
|
let frozen_layer = Arc::clone(frozen_layer);
|
||||||
let ctx = ctx.attached_child();
|
let ctx = ctx.attached_child();
|
||||||
let work = async move {
|
let work = async move {
|
||||||
let Some(new_delta) = frozen_layer
|
let Some((desc, path)) = frozen_layer
|
||||||
.write_to_disk(&self_clone, &ctx, key_range)
|
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
let new_delta = Layer::finish_creating(self_clone.conf, &self_clone, desc, &path)?;
|
||||||
|
|
||||||
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
|
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
|
||||||
// We just need to fsync the directory in which these inodes are linked,
|
// We just need to fsync the directory in which these inodes are linked,
|
||||||
// which we know to be the timeline directory.
|
// which we know to be the timeline directory.
|
||||||
@@ -5347,9 +5349,8 @@ impl Timeline {
|
|||||||
for (key, lsn, val) in deltas.data {
|
for (key, lsn, val) in deltas.data {
|
||||||
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
|
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
|
||||||
}
|
}
|
||||||
let delta_layer = delta_layer_writer
|
let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?;
|
||||||
.finish(deltas.key_range.end, self, ctx)
|
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut guard = self.layers.write().await;
|
let mut guard = self.layers.write().await;
|
||||||
|
|||||||
@@ -1104,14 +1104,16 @@ impl Timeline {
|
|||||||
|| contains_hole
|
|| contains_hole
|
||||||
{
|
{
|
||||||
// ... if so, flush previous layer and prepare to write new one
|
// ... if so, flush previous layer and prepare to write new one
|
||||||
new_layers.push(
|
let (desc, path) = writer
|
||||||
writer
|
.take()
|
||||||
.take()
|
.unwrap()
|
||||||
.unwrap()
|
.finish(prev_key.unwrap().next(), ctx)
|
||||||
.finish(prev_key.unwrap().next(), self, ctx)
|
.await
|
||||||
.await
|
.map_err(CompactionError::Other)?;
|
||||||
.map_err(CompactionError::Other)?,
|
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||||
);
|
.map_err(CompactionError::Other)?;
|
||||||
|
|
||||||
|
new_layers.push(new_delta);
|
||||||
writer = None;
|
writer = None;
|
||||||
|
|
||||||
if contains_hole {
|
if contains_hole {
|
||||||
@@ -1174,12 +1176,13 @@ impl Timeline {
|
|||||||
prev_key = Some(key);
|
prev_key = Some(key);
|
||||||
}
|
}
|
||||||
if let Some(writer) = writer {
|
if let Some(writer) = writer {
|
||||||
new_layers.push(
|
let (desc, path) = writer
|
||||||
writer
|
.finish(prev_key.unwrap().next(), ctx)
|
||||||
.finish(prev_key.unwrap().next(), self, ctx)
|
.await
|
||||||
.await
|
.map_err(CompactionError::Other)?;
|
||||||
.map_err(CompactionError::Other)?,
|
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||||
);
|
.map_err(CompactionError::Other)?;
|
||||||
|
new_layers.push(new_delta);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync layers
|
// Sync layers
|
||||||
@@ -1966,13 +1969,16 @@ impl Timeline {
|
|||||||
for (key, lsn, val) in deltas {
|
for (key, lsn, val) in deltas {
|
||||||
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
|
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.produce_delta_layer(delta_layer_writer.size());
|
stats.produce_delta_layer(delta_layer_writer.size());
|
||||||
if dry_run {
|
if dry_run {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
let delta_layer = delta_layer_writer
|
|
||||||
.finish(delta_key.key_range.end, tline, ctx)
|
let (desc, path) = delta_layer_writer
|
||||||
|
.finish(delta_key.key_range.end, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?;
|
||||||
Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
|
Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2413,9 +2419,9 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
|||||||
))
|
))
|
||||||
});
|
});
|
||||||
|
|
||||||
let new_delta_layer = writer
|
let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
|
||||||
.finish(prev.unwrap().0.next(), &self.timeline, ctx)
|
let new_delta_layer =
|
||||||
.await?;
|
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
|
||||||
|
|
||||||
self.new_deltas.push(new_delta_layer);
|
self.new_deltas.push(new_delta_layer);
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -488,10 +488,12 @@ async fn copy_lsn_prefix(
|
|||||||
// reuse the key instead of adding more holes between layers by using the real
|
// reuse the key instead of adding more holes between layers by using the real
|
||||||
// highest key in the layer.
|
// highest key in the layer.
|
||||||
let reused_highest_key = layer.layer_desc().key_range.end;
|
let reused_highest_key = layer.layer_desc().key_range.end;
|
||||||
let copied = writer
|
let (desc, path) = writer
|
||||||
.finish(reused_highest_key, target_timeline, ctx)
|
.finish(reused_highest_key, ctx)
|
||||||
.await
|
.await
|
||||||
.map_err(CopyDeltaPrefix)?;
|
.map_err(CopyDeltaPrefix)?;
|
||||||
|
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
|
||||||
|
.map_err(CopyDeltaPrefix)?;
|
||||||
|
|
||||||
tracing::debug!(%layer, %copied, "new layer produced");
|
tracing::debug!(%layer, %copied, "new layer produced");
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user