Closes #9387.
## Problem
`BufferedWriter` cannot proceed while the owned buffer is flushing to
disk. We want to implement double buffering so that the flush can happen
in the background. See #9387.
## Summary of changes
- Maintain two owned buffers in `BufferedWriter`.
- The writer is in charge of copying data into an owned, aligned buffer;
once the buffer is full, it submits the buffer to the flush task.
- The flush background task is in charge of flushing the owned buffer to
disk and returning the buffer to the writer for reuse.
- The writer and the flush background task communicate through a
bidirectional channel (a minimal sketch follows this list).
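
A minimal sketch of the buffer-exchange protocol, using plain `tokio` mpsc
channels and `Vec<u8>` as a stand-in for the owned, aligned buffer. Names and
types here are illustrative, not the actual `BufferedWriter` internals:

```rust
use tokio::sync::mpsc;

type Buf = Vec<u8>; // stand-in for the owned, aligned buffer
const BUF_CAP: usize = 8192;

// Flush side: write each full buffer "to disk", then recycle it.
async fn flush_task(mut full_rx: mpsc::Receiver<Buf>, empty_tx: mpsc::Sender<Buf>) {
    while let Some(mut buf) = full_rx.recv().await {
        // (the real task would do the disk write here, e.g. write_all_at)
        buf.clear();
        if empty_tx.send(buf).await.is_err() {
            break; // writer is gone
        }
    }
}

#[tokio::main]
async fn main() {
    let (full_tx, full_rx) = mpsc::channel::<Buf>(1);
    let (empty_tx, mut empty_rx) = mpsc::channel::<Buf>(1);
    // Pre-seed the return path with the second buffer.
    empty_tx.send(Vec::with_capacity(BUF_CAP)).await.unwrap();
    let flush = tokio::spawn(flush_task(full_rx, empty_tx));

    let mut current = Vec::with_capacity(BUF_CAP);
    let chunk = [0u8; 1024];
    for _ in 0..64 {
        if current.len() + chunk.len() > BUF_CAP {
            // Hand the full buffer to the flush task and pick up the
            // recycled one; this only waits if both buffers are in flight.
            full_tx.send(std::mem::take(&mut current)).await.unwrap();
            current = empty_rx.recv().await.unwrap();
        }
        current.extend_from_slice(&chunk);
    }
    if !current.is_empty() {
        full_tx.send(current).await.unwrap();
    }
    drop(full_tx); // closes the channel; the flush task drains and exits
    flush.await.unwrap();
}
```

The point of pre-seeding the return channel is that the writer only blocks
when both buffers are in flight, i.e. when filling outpaces flushing.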
For the in-memory layer, we also need to be able to read from the
buffered writer in `get_values_reconstruct_data`. To handle this case, we
did the following (see the sketch after this list):
- Replace `VirtualFile::write_all` with `VirtualFile::write_all_at`, and
use `Arc` to share the file between the writer and the background task.
- Leverage `IoBufferMut::freeze` to get a cheaply clonable `IoBuffer`:
one clone is submitted to the channel, the other is kept within the
writer to serve reads. When we want to reuse the buffer, we invoke
`IoBuffer::into_mut`, which gives us back the mutable aligned buffer.
- `InMemoryLayer` reads are now aware of the `maybe_flushed` part of the
buffer.
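
The reclaim-for-reuse dance can be modeled with a plain `Arc` from the
standard library. A toy sketch of the idea (the real types are
`IoBufferMut`/`IoBuffer`; this only illustrates the refcount-based reclaim):

```rust
use std::sync::Arc;

fn main() {
    // "freeze": turn the owned, mutable buffer into a cheaply clonable handle.
    let buf: Vec<u8> = vec![1, 2, 3];
    let frozen: Arc<Vec<u8>> = Arc::new(buf);

    // One clone is submitted to the flush channel; the other stays in the
    // writer so reads can see the maybe_flushed data.
    let for_flush = Arc::clone(&frozen);
    let for_reads = frozen;
    assert_eq!(&for_reads[..], &[1, 2, 3]);

    // "into_mut": once the flush task has dropped its clone, the refcount is
    // back to 1 and we can reclaim the unique mutable buffer without copying.
    drop(for_flush);
    let mut reclaimed = Arc::try_unwrap(for_reads).expect("flush clone still alive");
    reclaimed.clear(); // ready for the next round of writes
}
```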
**Caveat**
- We removed the owned version of write because that interface does not
work well with buffer alignment. As a result, without direct IO enabled,
[`download_object`](a439d57050/pageserver/src/tenant/remote_timeline_client/download.rs (L243))
does one more memcpy than before this PR due to the switch to the
`_borrowed` version of the write.
- A "bypass the aligned part of the write" optimization could be
implemented later to avoid a large amount of memcpy.
**Testing**
- Use a oneshot-channel-based control mechanism to make flush behavior
deterministic in tests (a rough sketch follows this list).
- Test reading from `EphemeralFile` when the last submitted buffer is
not yet flushed, in progress, and done flushing to disk.
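
A rough sketch of the oneshot gating idea (hypothetical shape, not the
actual test harness):

```rust
use tokio::sync::oneshot;

// The flush task parks on a oneshot until the test releases it, so the test
// controls exactly when the buffer transitions from "in progress" to "done".
async fn gated_flush(data: Vec<u8>, permit: oneshot::Receiver<()>) -> Vec<u8> {
    permit.await.expect("test dropped the permit");
    // ... the real task would write `data` to disk here ...
    data
}

#[tokio::main]
async fn main() {
    let (allow_tx, allow_rx) = oneshot::channel();
    let flush = tokio::spawn(gated_flush(vec![0u8; 4096], allow_rx));

    // At this point the test can issue reads while the buffer is submitted
    // but not yet flushed, then release the flush and re-check.
    allow_tx.send(()).unwrap();
    let _buf = flush.await.unwrap();
}
```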
## Performance
We see a performance improvement for small values and a regression on
big values, likely because we are CPU-bound on top of disk write latency.
[Results](https://www.notion.so/neondatabase/Benchmarking-New-BufferedWriter-11-20-2024-143f189e0047805ba99acda89f984d51?pvs=4)
## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
## Checklist before merging
- [ ] Do not forget to reformat the commit message so it does not include
the above checklist
---------
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
257 lines · 8.1 KiB · Rust
use std::{env, num::NonZeroUsize};

use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{criterion_group, criterion_main, Criterion};
use pageserver::{
    config::PageServerConf,
    context::{DownloadBehavior, RequestContext},
    l0_flush::{L0FlushConfig, L0FlushGlobalState},
    page_cache,
    task_mgr::TaskKind,
    tenant::storage_layer::InMemoryLayer,
    virtual_file,
};
use pageserver_api::{key::Key, shard::TenantShardId, value::Value};
use utils::{
    bin_ser::BeSer,
    id::{TenantId, TimelineId},
};
use wal_decoder::serialized_batch::SerializedValueBatch;

// A very cheap hash for generating non-sequential keys.
fn murmurhash32(mut h: u32) -> u32 {
    h ^= h >> 16;
    h = h.wrapping_mul(0x85ebca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2ae35);
    h ^= h >> 16;
    h
}

enum KeyLayout {
    /// Sequential unique keys
    Sequential,
    /// Random unique keys
    Random,
    /// Random keys, but only use the bits from the mask of them
    RandomReuse(u32),
}

enum WriteDelta {
    Yes,
    No,
}

async fn ingest(
    conf: &'static PageServerConf,
    put_size: usize,
    put_count: usize,
    key_layout: KeyLayout,
    write_delta: WriteDelta,
) -> anyhow::Result<()> {
    let mut lsn = utils::lsn::Lsn(1000);
    let mut key = Key::from_i128(0x0);

    let timeline_id = TimelineId::generate();
    let tenant_id = TenantId::generate();
    let tenant_shard_id = TenantShardId::unsharded(tenant_id);

    tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;

    let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

    let gate = utils::sync::gate::Gate::default();

    let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &gate, &ctx).await?;

    let data = Value::Image(Bytes::from(vec![0u8; put_size]));
    let data_ser_size = data.serialized_size().unwrap() as usize;
    let ctx = RequestContext::new(
        pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
        pageserver::context::DownloadBehavior::Download,
    );

    const BATCH_SIZE: usize = 16;
    let mut batch = Vec::new();

    for i in 0..put_count {
        lsn += put_size as u64;

        // Generate lots of keys within a single relation, which simulates the typical bulk ingest case: people
        // usually care the most about write performance when they're blasting a huge batch of data into a huge table.
        match key_layout {
            KeyLayout::Sequential => {
                // Use sequential order to illustrate the experience a user is likely to have
                // when ingesting bulk data.
                key.field6 = i as u32;
            }
            KeyLayout::Random => {
                // Use random-order keys to avoid giving a false advantage to data structures that are
                // faster when inserting on the end.
                key.field6 = murmurhash32(i as u32);
            }
            KeyLayout::RandomReuse(mask) => {
                // Use low bits only, to limit cardinality
                key.field6 = murmurhash32(i as u32) & mask;
            }
        }

        batch.push((key.to_compact(), lsn, data_ser_size, data.clone()));
        if batch.len() >= BATCH_SIZE {
            let this_batch = std::mem::take(&mut batch);
            let serialized = SerializedValueBatch::from_values(this_batch);
            layer.put_batch(serialized, &ctx).await?;
        }
    }
    if !batch.is_empty() {
        let this_batch = std::mem::take(&mut batch);
        let serialized = SerializedValueBatch::from_values(this_batch);
        layer.put_batch(serialized, &ctx).await?;
    }
    layer.freeze(lsn + 1).await;

    if matches!(write_delta, WriteDelta::Yes) {
        let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
            max_concurrency: NonZeroUsize::new(1).unwrap(),
        });
        let (_desc, path) = layer
            .write_to_disk(&ctx, None, l0_flush_state.inner())
            .await?
            .unwrap();
        tokio::fs::remove_file(path).await?;
    }

    Ok(())
}

/// Wrapper to instantiate a tokio runtime
fn ingest_main(
    conf: &'static PageServerConf,
    put_size: usize,
    put_count: usize,
    key_layout: KeyLayout,
    write_delta: WriteDelta,
) {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async move {
        let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
        if let Err(e) = r {
            panic!("{e:?}");
        }
    });
}

/// Declare a series of benchmarks for the Pageserver's ingest write path.
///
/// This benchmark does not include WAL decode: it starts at InMemoryLayer::put_value, and ends either
/// at freezing the ephemeral layer, or writing the ephemeral layer out to an L0 (depending on whether WriteDelta is set).
///
/// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on
/// a fast disk, CPU is the bottleneck at time of writing.
fn criterion_benchmark(c: &mut Criterion) {
    let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap();
    let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent).unwrap();
    eprintln!("Data directory: {}", temp_dir.path());

    let conf: &'static PageServerConf = Box::leak(Box::new(
        pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
    ));
    virtual_file::init(
        16384,
        virtual_file::io_engine_for_bench(),
        conf.virtual_file_io_mode,
        virtual_file::SyncMode::Sync,
    );
    page_cache::init(conf.page_cache_size);

    {
        let mut group = c.benchmark_group("ingest-small-values");
        let put_size = 100usize;
        let put_count = 128 * 1024 * 1024 / put_size;
        group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
        group.sample_size(10);
        group.bench_function("ingest 128MB/100b seq", |b| {
            b.iter(|| {
                ingest_main(
                    conf,
                    put_size,
                    put_count,
                    KeyLayout::Sequential,
                    WriteDelta::Yes,
                )
            })
        });
        group.bench_function("ingest 128MB/100b rand", |b| {
            b.iter(|| {
                ingest_main(
                    conf,
                    put_size,
                    put_count,
                    KeyLayout::Random,
                    WriteDelta::Yes,
                )
            })
        });
        group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
            b.iter(|| {
                ingest_main(
                    conf,
                    put_size,
                    put_count,
                    KeyLayout::RandomReuse(0x3ff),
                    WriteDelta::Yes,
                )
            })
        });
        group.bench_function("ingest 128MB/100b seq, no delta", |b| {
            b.iter(|| {
                ingest_main(
                    conf,
                    put_size,
                    put_count,
                    KeyLayout::Sequential,
                    WriteDelta::No,
                )
            })
        });
    }

    {
        let mut group = c.benchmark_group("ingest-big-values");
        let put_size = 8192usize;
        let put_count = 128 * 1024 * 1024 / put_size;
        group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
        group.sample_size(10);
        group.bench_function("ingest 128MB/8k seq", |b| {
            b.iter(|| {
                ingest_main(
                    conf,
                    put_size,
                    put_count,
                    KeyLayout::Sequential,
                    WriteDelta::Yes,
                )
            })
        });
        group.bench_function("ingest 128MB/8k seq, no delta", |b| {
            b.iter(|| {
                ingest_main(
                    conf,
                    put_size,
                    put_count,
                    KeyLayout::Sequential,
                    WriteDelta::No,
                )
            })
        });
    }
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);