Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-15 01:12:56 +00:00.

Commit message: "wip". This commit is contained in:
@@ -1,7 +1,61 @@
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
use pageserver::{tenant::storage_layer::InMemoryLayer, config::PageServerConf, context::{RequestContext, DownloadBehavior}, task_mgr::TaskKind, repository::Key, virtual_file};
|
||||
use pageserver::repository::Value;
|
||||
use utils::{id::{TimelineId, TenantId}, lsn::Lsn};
|
||||
|
||||
fn bench_writes(c: &mut Criterion) {
|
||||
// TODO setup
|
||||
// Boilerplate
|
||||
// TODO this setup can be avoided if I reuse TenantHarness but it's difficult
|
||||
// because it's only compiled for tests, and it's hacky because tbh we
|
||||
// shouldn't need this many inputs for a function that just writes bytes
|
||||
// from memory to disk. Performance-critical functions should be
|
||||
// self-contained (almost like they're separate libraries) and all the
|
||||
// monolithic pageserver machinery should live outside.
|
||||
virtual_file::init(10);
|
||||
let repo_dir = Utf8PathBuf::from(&"/home/bojan/tmp/repo_dir");
|
||||
let conf = PageServerConf::dummy_conf(repo_dir);
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
|
||||
let timeline_id = TimelineId::generate();
|
||||
let tenant_id = TenantId::generate();
|
||||
let start_lsn = Lsn(0);
|
||||
let ctx = RequestContext::new(TaskKind::LayerFlushTask, DownloadBehavior::Error);
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
fn test_img(s: &str) -> Bytes {
|
||||
let mut buf = BytesMut::new();
|
||||
buf.extend_from_slice(s.as_bytes());
|
||||
buf.resize(64, 0);
|
||||
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
// Make the InMemoryLayer that will be flushed
|
||||
let layer = rt.block_on(async {
|
||||
let l = InMemoryLayer::create(&conf, timeline_id, tenant_id, start_lsn).await.unwrap();
|
||||
|
||||
let mut lsn = Lsn(0x10);
|
||||
let mut key = Key::from_hex("012222222233333333444444445500000000").unwrap();
|
||||
let mut blknum = 0;
|
||||
for _ in 0..100 {
|
||||
key.field6 = blknum;
|
||||
let val = Value::Image(test_img(&format!("{} at {}", blknum, lsn)));
|
||||
l.put_value(key, lsn, &val, &ctx).await.unwrap();
|
||||
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
blknum += 1;
|
||||
}
|
||||
l
|
||||
});
|
||||
|
||||
rt.block_on(async {
|
||||
layer.write_to_disk_bench(&ctx).await.unwrap();
|
||||
});
|
||||
|
||||
|
||||
let mut group = c.benchmark_group("g1");
|
||||
group.bench_function("f1", |b| {
|
||||
|
||||
@@ -4,6 +4,7 @@ pub mod delta_layer;
|
||||
mod filename;
|
||||
mod image_layer;
|
||||
mod inmemory_layer;
|
||||
mod inmemory_layer_raw;
|
||||
mod layer;
|
||||
mod layer_desc;
|
||||
|
||||
|
||||
@@ -367,4 +367,61 @@ impl InMemoryLayer {
|
||||
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
|
||||
Ok(delta_layer)
|
||||
}
|
||||
|
||||
/// Write this frozen in-memory layer to disk.
|
||||
///
|
||||
/// Returns a new delta layer with all the same data as this in-memory layer
|
||||
/// Benchmark-only variant of `write_to_disk`: replays this frozen in-memory
/// layer's values through a `DeltaLayerWriter`, but — unlike `write_to_disk` —
/// never finalizes the writer and returns `()` rather than a delta layer
/// (see the TODO at the bottom).
///
/// # Errors
///
/// Propagates errors from creating the delta layer writer, reading blobs out
/// of the backing file, deserializing a stored `Value`, or writing value bytes.
pub async fn write_to_disk_bench(
    &self,
    ctx: &RequestContext,
) -> Result<()> {
    // Grab the lock in read-mode. We hold it over the I/O, but because this
    // layer is not writeable anymore, no one should be trying to acquire the
    // write lock on it, so we shouldn't block anyone. There's one exception
    // though: another thread might have grabbed a reference to this layer
    // in `get_layer_for_write' just before the checkpointer called
    // `freeze`, and then `write_to_disk` on it. When the thread gets the
    // lock, it will see that it's not writeable anymore and retry, but it
    // would have to wait until we release it. That race condition is very
    // rare though, so we just accept the potential latency hit for now.
    let inner = self.inner.read().await;

    // NOTE(review): the unwrap encodes the assumption that `end_lsn` has been
    // set (presumably by freezing the layer) before this is called — confirm
    // all callers uphold that.
    let end_lsn = *self.end_lsn.get().unwrap();

    let mut delta_layer_writer = DeltaLayerWriter::new(
        self.conf,
        self.timeline_id,
        self.tenant_id,
        Key::MIN,
        self.start_lsn..end_lsn,
    )
    .await?;

    // Scratch buffer reused across all blob reads below, so we don't allocate
    // per value.
    let mut buf = Vec::new();

    let cursor = inner.file.block_cursor();

    // Collect and sort the index entries so values are emitted in key order —
    // presumably required by the delta layer format; confirm.
    let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
    keys.sort_by_key(|k| k.0);

    // Tag all reads below as in-memory-layer page content for the context.
    let ctx = RequestContextBuilder::extend(ctx)
        .page_content_kind(PageContentKind::InMemoryLayer)
        .build();
    for (key, vec_map) in keys.iter() {
        let key = **key;
        // Write all page versions
        for (lsn, pos) in vec_map.as_slice() {
            cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
            // Deserialize only to learn whether this value initializes the
            // page; the raw bytes in `buf` are what actually gets written.
            let will_init = Value::des(&buf)?.will_init();
            delta_layer_writer
                .put_value_bytes(key, *lsn, &buf, will_init)
                .await?;
        }
    }

    // MAX is used here because we identify L0 layers by full key range
    // TODO XXX do this
    // let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
    // NOTE(review): the writer is dropped without `finish()`; what happens to
    // the partially written file depends on DeltaLayerWriter's drop behavior —
    // confirm that's acceptable even for a benchmark.
    Ok(())
}
|
||||
}
|
||||
|
||||
New file (23 lines added): pageserver/src/tenant/storage_layer/inmemory_layer_raw.rs
@@ -0,0 +1,23 @@
|
||||
|
||||
|
||||
pub struct InMemoryLayerRaw {
|
||||
}
|
||||
|
||||
impl InMemoryLayerRaw {
|
||||
pub async fn new() -> Self {
|
||||
Self {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn put_value(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &Value,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user