pageserver: add CompactKey, use it in InMemoryLayer (#8652)

## Problem

This follows a PR that insists all input keys are representable in 16
bytes:
- https://github.com/neondatabase/neon/pull/8648

& a PR that prevents postgres from sending us keys that use the high
bits of field2:
- https://github.com/neondatabase/neon/pull/8657

Motivation for this change:
1. Ingest is bottlenecked on CPU
2. InMemoryLayer can create huge (~1M value) BTreeMap<Key,_> for its
index.
3. Maps over i128 are much faster than maps over an arbitrary 18 byte
struct.

It may still be worthwhile to make the index two-tier to optimize for
the case where only the last 4 bytes (blkno) of the key vary frequently,
but simply using the i128 representation of keys has a big impact for
very little effort.

Related: #8452 

## Summary of changes

- Introduce `CompactKey` type which contains an i128
- Use this instead of Key in InMemoryLayer's index, converting back and
forth as needed.

## Performance

All the small-value `bench_ingest` cases show improved throughput.

The one that exercises this index most directly shows a 35% throughput
increase:

```
ingest-small-values/ingest 128MB/100b seq, no delta
                        time:   [374.29 ms 378.56 ms 383.38 ms]
                        thrpt:  [333.88 MiB/s 338.13 MiB/s 341.98 MiB/s]
                 change:
                        time:   [-26.993% -26.117% -25.111%] (p = 0.00 < 0.05)
                        thrpt:  [+33.531% +35.349% +36.974%]
                        Performance has improved.
```
This commit is contained in:
John Spray
2024-08-13 11:48:23 +01:00
committed by GitHub
parent d24f1b6c04
commit 3379cbcaa4
4 changed files with 39 additions and 14 deletions

View File

@@ -22,6 +22,11 @@ pub struct Key {
pub field6: u32,
}
/// When working with large numbers of Keys in-memory, it is more efficient to handle them as i128 than as
/// a struct of fields.
#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd)]
pub struct CompactKey(i128);
/// The storage key size.
pub const KEY_SIZE: usize = 18;
@@ -130,6 +135,14 @@ impl Key {
}
}
pub fn to_compact(&self) -> CompactKey {
CompactKey(self.to_i128())
}
pub fn from_compact(k: CompactKey) -> Self {
Self::from_i128(k.0)
}
pub const fn next(&self) -> Key {
self.add(1)
}
@@ -199,6 +212,13 @@ impl fmt::Display for Key {
}
}
impl fmt::Display for CompactKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let k = Key::from_compact(*self);
k.fmt(f)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,

View File

@@ -95,7 +95,7 @@ async fn ingest(
}
}
layer.put_value(key, lsn, &data, &ctx).await?;
layer.put_value(key.to_compact(), lsn, &data, &ctx).await?;
}
layer.freeze(lsn + 1).await;

View File

@@ -15,6 +15,7 @@ use crate::tenant::PageReconstructError;
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, Result};
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
@@ -78,7 +79,7 @@ pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The value is an offset into the
/// ephemeral file where the page version is stored.
index: BTreeMap<Key, VecMap<Lsn, u64>>,
index: BTreeMap<CompactKey, VecMap<Lsn, u64>>,
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
@@ -312,8 +313,12 @@ impl InMemoryLayer {
let reader = inner.file.block_cursor();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner.index.range(range.start..range.end) {
let lsn_range = match reconstruct_state.get_cached_lsn(key) {
for (key, vec_map) in inner
.index
.range(range.start.to_compact()..range.end.to_compact())
{
let key = Key::from_compact(*key);
let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
None => self.start_lsn..end_lsn,
};
@@ -324,20 +329,18 @@ impl InMemoryLayer {
// TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
let buf = reader.read_blob(*pos, &ctx).await;
if let Err(e) = buf {
reconstruct_state
.on_key_error(*key, PageReconstructError::from(anyhow!(e)));
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
break;
}
let value = Value::des(&buf.unwrap());
if let Err(e) = value {
reconstruct_state
.on_key_error(*key, PageReconstructError::from(anyhow!(e)));
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
break;
}
let key_situation =
reconstruct_state.update_key(key, *entry_lsn, value.unwrap());
reconstruct_state.update_key(&key, *entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
break;
}
@@ -417,7 +420,7 @@ impl InMemoryLayer {
/// Adds the page version to the in-memory tree
pub async fn put_value(
&self,
key: Key,
key: CompactKey,
lsn: Lsn,
buf: &[u8],
ctx: &RequestContext,
@@ -430,7 +433,7 @@ impl InMemoryLayer {
async fn put_value_locked(
&self,
locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
key: Key,
key: CompactKey,
lsn: Lsn,
buf: &[u8],
ctx: &RequestContext,
@@ -539,6 +542,8 @@ impl InMemoryLayer {
let end_lsn = *self.end_lsn.get().unwrap();
let key_count = if let Some(key_range) = key_range {
let key_range = key_range.start.to_compact()..key_range.end.to_compact();
inner
.index
.iter()
@@ -578,7 +583,7 @@ impl InMemoryLayer {
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, &ctx)
.put_value_bytes(Key::from_compact(*key), *lsn, buf, will_init, &ctx)
.await;
res?;
}
@@ -617,7 +622,7 @@ impl InMemoryLayer {
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, ctx)
.put_value_bytes(Key::from_compact(*key), *lsn, buf, will_init, ctx)
.await;
res?;
}

View File

@@ -5553,7 +5553,7 @@ impl<'a> TimelineWriter<'a> {
let action = self.get_open_layer_action(lsn, buf_size);
let layer = self.handle_open_layer_action(lsn, action, ctx).await?;
let res = layer.put_value(key, lsn, &buf, ctx).await;
let res = layer.put_value(key.to_compact(), lsn, &buf, ctx).await;
if res.is_ok() {
// Update the current size only when the entire write was ok.