From b1de46c18d341a0dd35c3fdcbcea89947150dd9c Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Wed, 1 Nov 2023 20:50:20 -0400 Subject: [PATCH] wip --- pageserver/benches/bench_writes.rs | 56 +++++++++++++++++- pageserver/src/tenant/storage_layer.rs | 1 + .../tenant/storage_layer/inmemory_layer.rs | 57 +++++++++++++++++++ .../storage_layer/inmemory_layer_raw.rs | 23 ++++++++ 4 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 pageserver/src/tenant/storage_layer/inmemory_layer_raw.rs diff --git a/pageserver/benches/bench_writes.rs b/pageserver/benches/bench_writes.rs index c5de45856f..a94c208355 100644 --- a/pageserver/benches/bench_writes.rs +++ b/pageserver/benches/bench_writes.rs @@ -1,7 +1,61 @@ +use bytes::{Bytes, BytesMut}; +use camino::{Utf8Path, Utf8PathBuf}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use pageserver::{tenant::storage_layer::InMemoryLayer, config::PageServerConf, context::{RequestContext, DownloadBehavior}, task_mgr::TaskKind, repository::Key, virtual_file}; +use pageserver::repository::Value; +use utils::{id::{TimelineId, TenantId}, lsn::Lsn}; fn bench_writes(c: &mut Criterion) { - // TODO setup + // Boilerplate + // TODO this setup can be avoided if I reuse TenantHarness but it's difficult + // because it's only compiled for tests, and it's hacky because tbh we + // shouldn't need this many inputs for a function that just writes bytes + // from memory to disk. Performance-critical functions should be + // self-contained (almost like they're separate libraries) and all the + // monolithic pageserver machinery should live outside. + virtual_file::init(10); + let repo_dir = Utf8PathBuf::from(&"/home/bojan/tmp/repo_dir"); + let conf = PageServerConf::dummy_conf(repo_dir); + let conf: &'static PageServerConf = Box::leak(Box::new(conf)); + let timeline_id = TimelineId::generate(); + let tenant_id = TenantId::generate(); + let start_lsn = Lsn(0); + let ctx = RequestContext::new(TaskKind::LayerFlushTask, DownloadBehavior::Error); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + fn test_img(s: &str) -> Bytes { + let mut buf = BytesMut::new(); + buf.extend_from_slice(s.as_bytes()); + buf.resize(64, 0); + + buf.freeze() + } + + // Make the InMemoryLayer that will be flushed + let layer = rt.block_on(async { + let l = InMemoryLayer::create(&conf, timeline_id, tenant_id, start_lsn).await.unwrap(); + + let mut lsn = Lsn(0x10); + let mut key = Key::from_hex("012222222233333333444444445500000000").unwrap(); + let mut blknum = 0; + for _ in 0..100 { + key.field6 = blknum; + let val = Value::Image(test_img(&format!("{} at {}", blknum, lsn))); + l.put_value(key, lsn, &val, &ctx).await.unwrap(); + + lsn = Lsn(lsn.0 + 0x10); + blknum += 1; + } + l + }); + + rt.block_on(async { + layer.write_to_disk_bench(&ctx).await.unwrap(); + }); + let mut group = c.benchmark_group("g1"); group.bench_function("f1", |b| { diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 3b2a61dcba..61526d0501 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -4,6 +4,7 @@ pub mod delta_layer; mod filename; mod image_layer; mod inmemory_layer; +mod inmemory_layer_raw; mod layer; mod layer_desc; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 1275250bf0..bafbfd8d84 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -367,4 +367,61 @@ impl InMemoryLayer { let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?; Ok(delta_layer) } + + /// Write this frozen in-memory layer to disk. + /// + /// Returns a new delta layer with all the same data as this in-memory layer + pub async fn write_to_disk_bench( + &self, + ctx: &RequestContext, + ) -> Result<()> { + // Grab the lock in read-mode. We hold it over the I/O, but because this + // layer is not writeable anymore, no one should be trying to acquire the + // write lock on it, so we shouldn't block anyone. There's one exception + // though: another thread might have grabbed a reference to this layer + // in `get_layer_for_write' just before the checkpointer called + // `freeze`, and then `write_to_disk` on it. When the thread gets the + // lock, it will see that it's not writeable anymore and retry, but it + // would have to wait until we release it. That race condition is very + // rare though, so we just accept the potential latency hit for now. + let inner = self.inner.read().await; + + let end_lsn = *self.end_lsn.get().unwrap(); + + let mut delta_layer_writer = DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_id, + Key::MIN, + self.start_lsn..end_lsn, + ) + .await?; + + let mut buf = Vec::new(); + + let cursor = inner.file.block_cursor(); + + let mut keys: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); + keys.sort_by_key(|k| k.0); + + let ctx = RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::InMemoryLayer) + .build(); + for (key, vec_map) in keys.iter() { + let key = **key; + // Write all page versions + for (lsn, pos) in vec_map.as_slice() { + cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?; + let will_init = Value::des(&buf)?.will_init(); + delta_layer_writer + .put_value_bytes(key, *lsn, &buf, will_init) + .await?; + } + } + + // MAX is used here because we identify L0 layers by full key range + // TODO XXX do this + // let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?; + Ok(()) + } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer_raw.rs b/pageserver/src/tenant/storage_layer/inmemory_layer_raw.rs new file mode 100644 index 0000000000..02d9886fa2 --- /dev/null +++ b/pageserver/src/tenant/storage_layer/inmemory_layer_raw.rs @@ -0,0 +1,23 @@ + + +pub struct InMemoryLayerRaw { +} + +impl InMemoryLayerRaw { + pub async fn new() -> Self { + Self { + + } + } + + pub async fn put_value( + &self, + key: Key, + lsn: Lsn, + val: &Value, + ctx: &RequestContext, + ) -> Result<()> { + + Ok(()) + } +}