From fd60904376e17ccc0d4dc40d9a2a221e3c07aab7 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 2 Aug 2024 17:12:51 +0000
Subject: [PATCH] pageserver: batch ephemeral layer writes during ingest

---
 pageserver/src/tenant/ephemeral_file.rs      | 13 ++++
 .../tenant/storage_layer/inmemory_layer.rs   | 37 ++++++++++++
 pageserver/src/tenant/timeline.rs            | 60 +++++++++++++++++--
 3 files changed, 106 insertions(+), 4 deletions(-)

diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index bb65ae24fc..a727c148e0 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -103,6 +103,19 @@ impl EphemeralFile {
 
         Ok(pos)
     }
+
+    pub(crate) async fn write_raw(
+        &mut self,
+        srcbuf: &[u8],
+        ctx: &RequestContext,
+    ) -> Result<u64, io::Error> {
+        let pos = self.rw.bytes_written();
+
+        // Write the payload
+        self.rw.write_all_borrowed(srcbuf, ctx).await?;
+
+        Ok(pos)
+    }
 }
 
 /// Does the given filename look like an ephemeral file?
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index d5fa51d93b..a600ed1251 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -495,6 +495,43 @@ impl InMemoryLayer {
         self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     }
 
+    pub(crate) async fn put_batch(
+        &self,
+        buf: Vec<u8>,
+        values: Vec<(Key, Lsn, u64)>,
+        ctx: &RequestContext,
+    ) -> Result<()> {
+        let mut inner = self.inner.write().await;
+        self.assert_writable();
+
+        let base_off = {
+            inner
+                .file
+                .write_raw(
+                    &buf,
+                    &RequestContextBuilder::extend(ctx)
+                        .page_content_kind(PageContentKind::InMemoryLayer)
+                        .build(),
+                )
+                .await?
+        };
+
+        for (key, lsn, relative_off) in values {
+            let off = base_off + relative_off;
+            let vec_map = inner.index.entry(key).or_default();
+            let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
+            if old.is_some() {
+                // We already had an entry for this LSN. That's odd..
+                warn!("Key {} at {} already exists", key, lsn);
+            }
+        }
+
+        let size = inner.file.len();
+        inner.resource_units.maybe_publish_size(size);
+
+        Ok(())
+    }
+
     async fn put_value_locked(
         &self,
         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 490e8e6011..4c1e18b210 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -50,7 +50,6 @@ use utils::{
     vec_map::VecMap,
 };
 
-use std::pin::pin;
 use std::sync::atomic::Ordering as AtomicOrdering;
 use std::sync::{Arc, Mutex, RwLock, Weak};
 use std::time::{Duration, Instant, SystemTime};
@@ -67,6 +66,7 @@ use std::{
     collections::btree_map::Entry,
     ops::{Deref, Range},
 };
+use std::{io::Write, pin::pin};
 
 use crate::{
     aux_file::AuxFileSizeEstimator,
@@ -6079,11 +6079,63 @@ impl<'a> TimelineWriter<'a> {
         batch: VecMap<Lsn, (Key, Value)>,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        for (lsn, (key, val)) in batch {
-            self.put(key, lsn, &val, ctx).await?
+        if batch.is_empty() {
+            return Ok(());
         }
-        Ok(())
+        let mut values: Vec<(Key, Lsn, u64)> = Vec::new();
+        let mut cursor = std::io::Cursor::new(Vec::<u8>::new());
+        let mut batch_max_lsn: Lsn = Lsn(0);
+        let mut value_buf = smallvec::SmallVec::<[u8; 256]>::new();
+        for (lsn, (key, val)) in batch {
+            let relative_off = cursor.position();
+
+            value_buf.clear();
+            val.ser_into(&mut value_buf)?;
+            if value_buf.len() < 0x80 {
+                // short one-byte length header
+                let len_buf = [value_buf.len() as u8];
+
+                cursor.write_all(&len_buf)?;
+            } else {
+                let mut len_buf = u32::to_be_bytes(value_buf.len() as u32);
+                len_buf[0] |= 0x80;
+                cursor.write_all(&len_buf)?;
+            }
+            cursor.write_all(&value_buf)?;
+
+            // We can't write straight into the buffer, because the InMemoryLayer file format requires
+            // the size to come before the value. However... we could probably calculate the size before
+            // actually serializing the value
+            //val.ser_into(&mut cursor)?;
+
+            values.push((key, lsn, relative_off));
+            batch_max_lsn = std::cmp::max(batch_max_lsn, lsn);
+        }
+
+        let buf = cursor.into_inner();
+        let buf_size: u64 = buf.len() as u64;
+
+        let action = self.get_open_layer_action(batch_max_lsn, buf_size);
+        let layer = self
+            .handle_open_layer_action(batch_max_lsn, action, ctx)
+            .await?;
+        let res = layer.put_batch(buf, values, ctx).await;
+
+        if res.is_ok() {
+            // Update the current size only when the entire write was ok.
+            // In case of failures, we may have had partial writes which
+            // render the size tracking out of sync. That's ok because
+            // the checkpoint distance should be significantly smaller
+            // than the S3 single shot upload limit of 5GiB.
+            let state = self.write_guard.as_mut().unwrap();
+
+            state.current_size += buf_size;
+            state.prev_lsn = Some(batch_max_lsn);
+            state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
+        }
+
+        res
     }
 
     pub(crate) async fn delete_batch(
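
A note on the framing in the put_batch serialization loop above: because
write_raw appends the batch buffer verbatim, the loop has to reproduce the
ephemeral file's length-prefixed blob encoding itself. Values shorter than
0x80 bytes get a one-byte length header; longer values get a four-byte
big-endian length with the top bit set as a marker, which caps a single
value at 2^31 - 1 bytes. A minimal standalone sketch of that framing and
its inverse (encode_blob/decode_blob are illustrative names, not part of
this patch):

    // Append one length-prefixed value to `out`, using the same
    // one-byte/four-byte header scheme as the ingest loop above.
    fn encode_blob(out: &mut Vec<u8>, val: &[u8]) {
        if val.len() < 0x80 {
            // Short form: the length fits in 7 bits.
            out.push(val.len() as u8);
        } else {
            // Long form: 4-byte big-endian length with the top bit set.
            assert!(val.len() < 0x8000_0000);
            let mut len_buf = u32::to_be_bytes(val.len() as u32);
            len_buf[0] |= 0x80;
            out.extend_from_slice(&len_buf);
        }
        out.extend_from_slice(val);
    }

    // Decode the blob starting at `off`; returns the payload and the
    // offset of the next blob.
    fn decode_blob(buf: &[u8], off: usize) -> (&[u8], usize) {
        if buf[off] & 0x80 == 0 {
            // Short form.
            let len = buf[off] as usize;
            (&buf[off + 1..off + 1 + len], off + 1 + len)
        } else {
            // Long form: clear the marker bit before reading the length.
            let mut len_buf = [0u8; 4];
            len_buf.copy_from_slice(&buf[off..off + 4]);
            len_buf[0] &= 0x7f;
            let len = u32::from_be_bytes(len_buf) as usize;
            (&buf[off + 4..off + 4 + len], off + 4 + len)
        }
    }

Each relative_off recorded by the loop points at a length header, so a
reader positioned at base_off + relative_off in the ephemeral file can
recover the value with exactly this decode step.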
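
The commented-out ser_into call in the loop marks a possible follow-up: if
a value's serialized size were known before serializing, the intermediate
value_buf copy could be dropped by writing the header first and then
serializing straight into the cursor. A hedged sketch of that shape,
assuming a hypothetical up-front size (this patch adds no such helper):

    use std::io::{self, Write};

    // Write the length header for a known size, then let the caller
    // serialize the value directly into the same writer.
    fn write_framed<W: Write>(
        out: &mut W,
        serialized_size: usize, // assumption: computable without serializing
        ser_into: impl FnOnce(&mut W) -> io::Result<()>,
    ) -> io::Result<()> {
        if serialized_size < 0x80 {
            out.write_all(&[serialized_size as u8])?;
        } else {
            let mut len_buf = u32::to_be_bytes(serialized_size as u32);
            len_buf[0] |= 0x80;
            out.write_all(&len_buf)?;
        }
        // No intermediate buffer: the value lands right behind its header.
        ser_into(out)
    }

This trades the extra copy for a promise that serialization matches the
predicted size, which is presumably why the patch keeps the simpler
two-step approach for now.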