This commit is contained in:
John Spray
2024-08-05 17:30:38 +00:00
parent 4bd26c54e5
commit eed100b21e

View File

@@ -608,8 +608,11 @@ impl InMemoryLayer {
};
for (key, lsn, relative_off) in serialized_batch.offsets {
let prefix = key_to_prefix(&key);
let relation_idx = inner.index.entry(prefix).or_default();
let off = base_off + relative_off;
let vec_map = inner.index.entry(key).or_default();
let vec_map = relation_idx.entry(key.field6).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
@@ -658,15 +661,15 @@ impl InMemoryLayer {
})
.expect("frozen_local_path_str set only once");
#[cfg(debug_assertions)]
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
}
}
// #[cfg(debug_assertions)]
// {
// let inner = self.inner.write().await;
// for vec_map in inner.index.values() {
// for (lsn, _pos) in vec_map.as_slice() {
// assert!(*lsn < end_lsn);
// }
// }
// }
}
/// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
@@ -732,16 +735,20 @@ impl InMemoryLayer {
let cursor = inner.file.block_cursor();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, &ctx)
.await;
res?;
for (key_prefix, inner) in inner.index.iter() {
for (blkno, vec_map) in inner {
let key = materialize_key(key_prefix, *blkno);
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(key, *lsn, buf, will_init, &ctx)
.await;
res?;
}
}
}
}
@@ -765,22 +772,25 @@ impl InMemoryLayer {
let mut buf = Vec::new();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
// TODO: once we have blob lengths in the in-memory index, we can
// 1. get rid of the blob_io / BlockReaderRef::Slice business and
// 2. load the file contents into a Bytes and
// 3. the use `Bytes::slice` to get the `buf` that is our blob
// 4. pass that `buf` into `put_value_bytes`
// => https://github.com/neondatabase/neon/issues/8183
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, ctx)
.await;
res?;
for (key_prefix, inner) in inner.index.iter() {
for (blkno, vec_map) in inner {
// Write all page versions
let key = materialize_key(key_prefix, *blkno);
for (lsn, pos) in vec_map.as_slice() {
// TODO: once we have blob lengths in the in-memory index, we can
// 1. get rid of the blob_io / BlockReaderRef::Slice business and
// 2. load the file contents into a Bytes and
// 3. the use `Bytes::slice` to get the `buf` that is our blob
// 4. pass that `buf` into `put_value_bytes`
// => https://github.com/neondatabase/neon/issues/8183
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(key, *lsn, buf, will_init, ctx)
.await;
res?;
}
}
}
}