John Spray
2024-08-05 18:39:26 +00:00
parent 1a99aa4834
commit 5664eadb17
4 changed files with 53 additions and 29 deletions

View File

@@ -99,6 +99,10 @@ impl<K: Ord, V> VecMap<K, V> {
         Ok(delta_size)
     }
 
+    pub fn append_fast(&mut self, key: K, value: V) {
+        self.data.push((key, value))
+    }
+
     /// Update the maximum key value pair or add a new key value pair to the map.
     /// If `key` is not respective of the `self` ordering no updates or additions
     /// will occur and `InvalidKey` error will be returned.
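The new append_fast bypasses the ordering checks that VecMap's existing append paths perform and trusts the caller to push keys in ascending order. A minimal stand-alone sketch of that trade-off, using an illustrative SortedVec rather than the real VecMap and its error types:

// Illustrative stand-in for VecMap: a Vec of (key, value) kept sorted by key.
struct SortedVec<K: Ord, V> {
    data: Vec<(K, V)>,
}

impl<K: Ord, V> SortedVec<K, V> {
    /// Checked append: refuses keys that would break ascending order
    /// (the real VecMap::append returns an error in that case).
    fn append(&mut self, key: K, value: V) -> Result<(), &'static str> {
        if let Some((last, _)) = self.data.last() {
            if *last >= key {
                return Err("key is not greater than the current maximum");
            }
        }
        self.data.push((key, value));
        Ok(())
    }

    /// Unchecked append, analogous to the new append_fast: a plain push,
    /// with only a debug assertion guarding the ordering invariant.
    fn append_fast(&mut self, key: K, value: V) {
        debug_assert!(self.data.last().map_or(true, |(last, _)| *last < key));
        self.data.push((key, value));
    }
}

In a hot ingest loop the unchecked variant removes a comparison and an error branch per value, at the cost of trusting the caller to preserve the ordering invariant.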

View File

@@ -69,7 +69,7 @@ async fn ingest(
         pageserver::context::DownloadBehavior::Download,
     );
 
-    let batch_pages = 100;
+    let batch_pages = 10000;
     let mut batch_values = vec![];
     for i in 0..put_count {
@@ -97,15 +97,16 @@ async fn ingest(
         batch_values.push((key, lsn, value.clone()));
         if batch_values.len() >= batch_pages {
-            let batch = SerializedBatch::from_values(vec![(key, lsn, value.clone())]);
+            let write_batch = std::mem::take(&mut batch_values);
+            let batch = SerializedBatch::from_values(write_batch);
-            layer.put_batch(batch, &ctx).await?;
+            layer.put_batch(&batch, &ctx).await?;
         }
     }
 
     if !batch_values.is_empty() {
         let batch = SerializedBatch::from_values(vec![(key, lsn, value.clone())]);
-        layer.put_batch(batch, &ctx).await?;
+        layer.put_batch(&batch, &ctx).await?;
     }
 
     layer.freeze(lsn + 1).await;
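Two things change in the benchmark's flush path: the accumulated batch is drained with std::mem::take instead of rebuilding a one-element vec, and the resulting SerializedBatch is passed to put_batch by reference. The mem::take pattern in isolation (flush and the item type below are placeholders, not benchmark code):

/// Placeholder for building a SerializedBatch and calling put_batch.
fn flush(batch: Vec<u64>) {
    println!("flushing {} items", batch.len());
}

fn main() {
    let batch_size = 100;
    let mut pending: Vec<u64> = Vec::new();

    for item in 0..1_000u64 {
        pending.push(item);
        if pending.len() >= batch_size {
            // Move the filled Vec out and leave a fresh, empty one behind;
            // the accumulated items are handed over without cloning.
            flush(std::mem::take(&mut pending));
        }
    }

    // Final partial batch, if any.
    if !pending.is_empty() {
        flush(std::mem::take(&mut pending));
    }
}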

View File

@@ -42,16 +42,34 @@ use super::{
 #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
 pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
 
-fn materialize_key(prefix: &Key, blkno: u32) -> Key {
-    let mut key = prefix.clone();
-    key.field6 = blkno;
-    key
-}
-
-fn key_to_prefix(key: &Key) -> Key {
-    let mut prefix = key.clone();
-    prefix.field6 = 0;
-    prefix
-}
+#[derive(Ord, PartialOrd, Eq, PartialEq)]
+struct IndexPrefix {
+    field1: u8,
+    field2: u32,
+    field3: u32,
+    field4: u32,
+    field5: u8,
+}
+
+fn materialize_key(prefix: &IndexPrefix, blkno: u32) -> Key {
+    Key {
+        field1: prefix.field1,
+        field2: prefix.field2,
+        field3: prefix.field3,
+        field4: prefix.field4,
+        field5: prefix.field5,
+        field6: blkno,
+    }
+}
+
+fn key_to_prefix(key: &Key) -> IndexPrefix {
+    IndexPrefix {
+        field1: key.field1,
+        field2: key.field2,
+        field3: key.field3,
+        field4: key.field4,
+        field5: key.field5,
+    }
+}
 
 pub struct InMemoryLayer {
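key_to_prefix and materialize_key are meant to be inverses once the block number is reattached: stripping field6 and then putting it back should reproduce the original key. A small round-trip check, assuming the Key and IndexPrefix items from this hunk are in scope and that Key implements PartialEq and Debug; the field values are arbitrary:

#[test]
fn index_prefix_round_trip() {
    // Arbitrary key; field1..field5 form the prefix, field6 is the block number.
    let key = Key {
        field1: 0x00,
        field2: 1,
        field3: 2,
        field4: 3,
        field5: 0,
        field6: 42,
    };
    let prefix = key_to_prefix(&key);
    // Reattaching the block number must give back the original key.
    assert_eq!(materialize_key(&prefix, key.field6), key);
}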
@@ -95,7 +113,7 @@ pub struct InMemoryLayerInner {
     /// All versions of all pages in the layer are kept here. Indexed
     /// by block number and LSN. The value is an offset into the
     /// ephemeral file where the page version is stored.
-    index: BTreeMap<Key, BTreeMap<u32, VecMap<Lsn, u64>>>,
+    index: BTreeMap<IndexPrefix, BTreeMap<u32, VecMap<Lsn, u64>>>,
 
     /// The values are stored in a serialized format in this file.
     /// Each serialized Value is preceded by a 'u32' length field.
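Per block, the VecMap<Lsn, u64> holds the file offsets of successive page versions in LSN order, and a read wants the newest version at or before the request LSN. A stand-alone sketch of that search on a plain sorted slice (the real VecMap has its own lookup helpers; std's partition_point is used here as the equivalent):

/// `versions` is a per-block list of (LSN, offset into the ephemeral file),
/// sorted by LSN. Return the offset of the newest version with LSN <= `lsn`.
fn latest_at_or_before(versions: &[(u64, u64)], lsn: u64) -> Option<u64> {
    // Number of entries with LSN <= lsn; the wanted version sits just before
    // that boundary, unless the boundary is at the start.
    let idx = versions.partition_point(|&(entry_lsn, _)| entry_lsn <= lsn);
    idx.checked_sub(1).map(|i| versions[i].1)
}

fn main() {
    let versions: &[(u64, u64)] = &[(10, 0), (20, 8192), (30, 16384)];
    assert_eq!(latest_at_or_before(versions, 5), None);
    assert_eq!(latest_at_or_before(versions, 20), Some(8192));
    assert_eq!(latest_at_or_before(versions, 25), Some(8192));
}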
@@ -490,7 +508,7 @@ impl SerializedBatch {
         use std::io::Write;
 
         let mut offsets: Vec<(Key, Lsn, u64)> = Vec::new();
-        let mut cursor = std::io::Cursor::new(Vec::<u8>::new());
+        let mut cursor = std::io::Cursor::new(Vec::<u8>::with_capacity(batch.len() * 8192));
         let mut max_lsn: Lsn = Lsn(0);
         let mut value_buf = smallvec::SmallVec::<[u8; 256]>::new();
         for (key, lsn, val) in batch {
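Pre-sizing the cursor's backing Vec avoids repeated reallocation while the batch is serialized; the 8192 multiplier is a per-value size guess (roughly one 8 KiB page image), not a hard bound. A small sketch of writing length-prefixed payloads through a pre-allocated Cursor; the framing here is illustrative, not the pageserver's exact serialization:

use std::io::Write;

fn main() -> std::io::Result<()> {
    let batch_len: usize = 4;
    // Seed the capacity with a per-value estimate; the Vec still grows
    // if a value turns out to be larger than the guess.
    let mut cursor = std::io::Cursor::new(Vec::<u8>::with_capacity(batch_len * 8192));

    for i in 0..batch_len as u8 {
        // Stand-in for a serialized value: a u32 length prefix, then the bytes.
        let payload = vec![i; 128];
        cursor.write_all(&(payload.len() as u32).to_le_bytes())?;
        cursor.write_all(&payload)?;
    }

    let buf = cursor.into_inner();
    println!("wrote {} bytes into a buffer of capacity {}", buf.len(), buf.capacity());
    Ok(())
}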
@@ -588,11 +606,11 @@ impl InMemoryLayer {
     // Write path.
     pub async fn put_batch(
         &self,
-        serialized_batch: SerializedBatch,
+        serialized_batch: &SerializedBatch,
         ctx: &RequestContext,
     ) -> Result<()> {
         let mut inner = self.inner.write().await;
-        self.assert_writable();
+        //self.assert_writable();
 
         let base_off = {
             inner
@@ -606,17 +624,17 @@ impl InMemoryLayer {
                 .await?
         };
 
-        for (key, lsn, relative_off) in serialized_batch.offsets {
+        for (key, lsn, relative_off) in &serialized_batch.offsets {
             let prefix = key_to_prefix(&key);
-            let relation_idx = inner.index.entry(prefix).or_default();
+            let relation_idx = match inner.index.get_mut(&prefix) {
+                Some(i) => i,
+                None => inner.index.entry(prefix).or_default(),
+            };
             let off = base_off + relative_off;
             let vec_map = relation_idx.entry(key.field6).or_default();
-            let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
-            if old.is_some() {
-                // We already had an entry for this LSN. That's odd..
-                warn!("Key {} at {} already exists", key, lsn);
-            }
+            vec_map.append_fast(*lsn, off);
         }
 
         let size = inner.file.len();
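The write loop now borrows the batch, splits each key into its IndexPrefix, and appends the (LSN, offset) pair with the unchecked append_fast. A compact stand-alone sketch of that insertion path over plain std types; it uses entry()/or_default() on both levels, whereas the diff first tries get_mut on the outer map, and it assumes the batch is already ordered by (key, LSN), which is what an unchecked append relies on:

use std::collections::BTreeMap;

// Stand-ins for IndexPrefix, Lsn and the shape of InMemoryLayerInner::index.
type Prefix = (u8, u32, u32, u32, u8);
type Lsn = u64;
type LayerIndex = BTreeMap<Prefix, BTreeMap<u32, Vec<(Lsn, u64)>>>;

/// Insert one batch of (prefix, blkno, lsn, relative offset) entries;
/// `base_off` is where the batch was written in the backing file.
fn apply_batch(index: &mut LayerIndex, base_off: u64, offsets: &[(Prefix, u32, Lsn, u64)]) {
    for (prefix, blkno, lsn, relative_off) in offsets {
        let versions = index.entry(*prefix).or_default().entry(*blkno).or_default();
        // Unchecked push, like append_fast: correct only because the batch
        // is ordered by (key, LSN).
        versions.push((*lsn, base_off + relative_off));
    }
}

fn main() {
    let mut index = LayerIndex::new();
    let prefix = (0u8, 1, 2, 3, 0u8);
    apply_batch(&mut index, 4096, &[(prefix, 7, 10, 0), (prefix, 7, 20, 8192)]);
    assert_eq!(index[&prefix][&7], vec![(10, 4096), (20, 12288)]);
}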
@@ -702,11 +720,12 @@ impl InMemoryLayer {
         let end_lsn = *self.end_lsn.get().unwrap();
 
         let key_count = if let Some(key_range) = key_range {
-            inner
-                .index
-                .iter()
-                .filter(|(k, _)| key_range.contains(k))
-                .count()
+            panic!("Update for IndexPrefix");
+            // inner
+            //     .index
+            //     .iter()
+            //     .filter(|(k, _)| key_range.contains(k))
+            //     .count()
         } else {
             inner.index.len()
         };
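The key_range branch of key_count is left as a panic with the old code in comments, because the outer map is now keyed by IndexPrefix rather than Key. One hypothetical way to restore it (not part of the commit) is to materialize a representative Key per prefix with block number 0, mirroring the old scheme where the prefix was a Key with field6 == 0, and test that against the range; a sketch assuming the IndexPrefix, Key, VecMap, Lsn and materialize_key items from the hunks above:

use std::collections::BTreeMap;
use std::ops::Range;

// Hypothetical helper, not taken from the commit.
fn key_count_in_range(
    index: &BTreeMap<IndexPrefix, BTreeMap<u32, VecMap<Lsn, u64>>>,
    key_range: &Range<Key>,
) -> usize {
    index
        .iter()
        // Compare on a materialized key with blkno 0, like the old
        // field6 == 0 prefix comparison did.
        .filter(|(prefix, _)| key_range.contains(&materialize_key(prefix, 0)))
        .count()
}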

View File

@@ -6051,7 +6051,7 @@ impl<'a> TimelineWriter<'a> {
             .handle_open_layer_action(batch_max_lsn, action, ctx)
             .await?;
 
-        let res = layer.put_batch(serialized_batch, ctx).await;
+        let res = layer.put_batch(&serialized_batch, ctx).await;
 
         if res.is_ok() {
             // Update the current size only when the entire write was ok.