diff --git a/libs/utils/src/vec_map.rs b/libs/utils/src/vec_map.rs index e8b0887473..10990590b2 100644 --- a/libs/utils/src/vec_map.rs +++ b/libs/utils/src/vec_map.rs @@ -99,6 +99,10 @@ impl VecMap { Ok(delta_size) } + pub fn append_fast(&mut self, key: K, value: V) { + self.data.push((key, value)) + } + /// Update the maximum key value pair or add a new key value pair to the map. /// If `key` is not respective of the `self` ordering no updates or additions /// will occur and `InvalidKey` error will be returned. diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index bd0a4be394..b27d4b40ec 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -69,7 +69,7 @@ async fn ingest( pageserver::context::DownloadBehavior::Download, ); - let batch_pages = 100; + let batch_pages = 10000; let mut batch_values = vec![]; for i in 0..put_count { @@ -97,15 +97,16 @@ async fn ingest( batch_values.push((key, lsn, value.clone())); if batch_values.len() >= batch_pages { - let batch = SerializedBatch::from_values(vec![(key, lsn, value.clone())]); + let write_batch = std::mem::take(&mut batch_values); + let batch = SerializedBatch::from_values(write_batch); - layer.put_batch(batch, &ctx).await?; + layer.put_batch(&batch, &ctx).await?; } } if !batch_values.is_empty() { let batch = SerializedBatch::from_values(vec![(key, lsn, value.clone())]); - layer.put_batch(batch, &ctx).await?; + layer.put_batch(&batch, &ctx).await?; } layer.freeze(lsn + 1).await; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c6361698a9..5dcd981afe 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -42,16 +42,34 @@ use super::{ #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub(crate) struct InMemoryLayerFileId(page_cache::FileId); -fn materialize_key(prefix: &Key, blkno: u32) -> Key { - let mut key = prefix.clone(); - key.field6 = blkno; - key +#[derive(Ord, PartialOrd, Eq, PartialEq)] +struct IndexPrefix { + field1: u8, + field2: u32, + field3: u32, + field4: u32, + field5: u8, } -fn key_to_prefix(key: &Key) -> Key { - let mut prefix = key.clone(); - prefix.field6 = 0; - prefix +fn materialize_key(prefix: &IndexPrefix, blkno: u32) -> Key { + Key { + field1: prefix.field1, + field2: prefix.field2, + field3: prefix.field3, + field4: prefix.field4, + field5: prefix.field5, + field6: blkno, + } +} + +fn key_to_prefix(key: &Key) -> IndexPrefix { + IndexPrefix { + field1: key.field1, + field2: key.field2, + field3: key.field3, + field4: key.field4, + field5: key.field5, + } } pub struct InMemoryLayer { @@ -95,7 +113,7 @@ pub struct InMemoryLayerInner { /// All versions of all pages in the layer are kept here. Indexed /// by block number and LSN. The value is an offset into the /// ephemeral file where the page version is stored. - index: BTreeMap>>, + index: BTreeMap>>, /// The values are stored in a serialized format in this file. /// Each serialized Value is preceded by a 'u32' length field. @@ -490,7 +508,7 @@ impl SerializedBatch { use std::io::Write; let mut offsets: Vec<(Key, Lsn, u64)> = Vec::new(); - let mut cursor = std::io::Cursor::new(Vec::::new()); + let mut cursor = std::io::Cursor::new(Vec::::with_capacity(batch.len() * 8192)); let mut max_lsn: Lsn = Lsn(0); let mut value_buf = smallvec::SmallVec::<[u8; 256]>::new(); for (key, lsn, val) in batch { @@ -588,11 +606,11 @@ impl InMemoryLayer { // Write path. pub async fn put_batch( &self, - serialized_batch: SerializedBatch, + serialized_batch: &SerializedBatch, ctx: &RequestContext, ) -> Result<()> { let mut inner = self.inner.write().await; - self.assert_writable(); + //self.assert_writable(); let base_off = { inner @@ -606,17 +624,17 @@ impl InMemoryLayer { .await? }; - for (key, lsn, relative_off) in serialized_batch.offsets { + for (key, lsn, relative_off) in &serialized_batch.offsets { let prefix = key_to_prefix(&key); - let relation_idx = inner.index.entry(prefix).or_default(); + + let relation_idx = match inner.index.get_mut(&prefix) { + Some(i) => i, + None => inner.index.entry(prefix).or_default(), + }; let off = base_off + relative_off; let vec_map = relation_idx.entry(key.field6).or_default(); - let old = vec_map.append_or_update_last(lsn, off).unwrap().0; - if old.is_some() { - // We already had an entry for this LSN. That's odd.. - warn!("Key {} at {} already exists", key, lsn); - } + vec_map.append_fast(*lsn, off); } let size = inner.file.len(); @@ -702,11 +720,12 @@ impl InMemoryLayer { let end_lsn = *self.end_lsn.get().unwrap(); let key_count = if let Some(key_range) = key_range { - inner - .index - .iter() - .filter(|(k, _)| key_range.contains(k)) - .count() + panic!("Update for IndexPrefix"); + // inner + // .index + // .iter() + // .filter(|(k, _)| key_range.contains(k)) + // .count() } else { inner.index.len() }; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 3cec869c77..68d5ef0ab6 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -6051,7 +6051,7 @@ impl<'a> TimelineWriter<'a> { .handle_open_layer_action(batch_max_lsn, action, ctx) .await?; - let res = layer.put_batch(serialized_batch, ctx).await; + let res = layer.put_batch(&serialized_batch, ctx).await; if res.is_ok() { // Update the current size only when the entire write was ok.