From eed100b21e321fb7833fe8bee5d6dc7e35372d29 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 5 Aug 2024 17:30:38 +0000 Subject: [PATCH] wip --- .../tenant/storage_layer/inmemory_layer.rs | 82 +++++++++++-------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index d4d0fdaaf5..a3464b5f02 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -608,8 +608,11 @@ impl InMemoryLayer { }; for (key, lsn, relative_off) in serialized_batch.offsets { + let prefix = key_to_prefix(&key); + let relation_idx = inner.index.entry(prefix).or_default(); + let off = base_off + relative_off; - let vec_map = inner.index.entry(key).or_default(); + let vec_map = relation_idx.entry(key.field6).or_default(); let old = vec_map.append_or_update_last(lsn, off).unwrap().0; if old.is_some() { // We already had an entry for this LSN. That's odd.. @@ -658,15 +661,15 @@ impl InMemoryLayer { }) .expect("frozen_local_path_str set only once"); - #[cfg(debug_assertions)] - { - let inner = self.inner.write().await; - for vec_map in inner.index.values() { - for (lsn, _pos) in vec_map.as_slice() { - assert!(*lsn < end_lsn); - } - } - } + // #[cfg(debug_assertions)] + // { + // let inner = self.inner.write().await; + // for vec_map in inner.index.values() { + // for (lsn, _pos) in vec_map.as_slice() { + // assert!(*lsn < end_lsn); + // } + // } + // } } /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta @@ -732,16 +735,20 @@ impl InMemoryLayer { let cursor = inner.file.block_cursor(); - for (key, vec_map) in inner.index.iter() { - // Write all page versions - for (lsn, pos) in vec_map.as_slice() { - cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?; - let will_init = Value::des(&buf)?.will_init(); - let res; - (buf, res) = delta_layer_writer - .put_value_bytes(*key, *lsn, buf, will_init, &ctx) - .await; - res?; + for (key_prefix, inner) in inner.index.iter() { + for (blkno, vec_map) in inner { + let key = materialize_key(key_prefix, *blkno); + + // Write all page versions + for (lsn, pos) in vec_map.as_slice() { + cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?; + let will_init = Value::des(&buf)?.will_init(); + let res; + (buf, res) = delta_layer_writer + .put_value_bytes(key, *lsn, buf, will_init, &ctx) + .await; + res?; + } } } } @@ -765,22 +772,25 @@ impl InMemoryLayer { let mut buf = Vec::new(); - for (key, vec_map) in inner.index.iter() { - // Write all page versions - for (lsn, pos) in vec_map.as_slice() { - // TODO: once we have blob lengths in the in-memory index, we can - // 1. get rid of the blob_io / BlockReaderRef::Slice business and - // 2. load the file contents into a Bytes and - // 3. the use `Bytes::slice` to get the `buf` that is our blob - // 4. pass that `buf` into `put_value_bytes` - // => https://github.com/neondatabase/neon/issues/8183 - cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; - let will_init = Value::des(&buf)?.will_init(); - let res; - (buf, res) = delta_layer_writer - .put_value_bytes(*key, *lsn, buf, will_init, ctx) - .await; - res?; + for (key_prefix, inner) in inner.index.iter() { + for (blkno, vec_map) in inner { + // Write all page versions + let key = materialize_key(key_prefix, *blkno); + for (lsn, pos) in vec_map.as_slice() { + // TODO: once we have blob lengths in the in-memory index, we can + // 1. get rid of the blob_io / BlockReaderRef::Slice business and + // 2. load the file contents into a Bytes and + // 3. the use `Bytes::slice` to get the `buf` that is our blob + // 4. pass that `buf` into `put_value_bytes` + // => https://github.com/neondatabase/neon/issues/8183 + cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; + let will_init = Value::des(&buf)?.will_init(); + let res; + (buf, res) = delta_layer_writer + .put_value_bytes(key, *lsn, buf, will_init, ctx) + .await; + res?; + } } } }