pageserver: batch ephemeral layer writes during ingest
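
Instead of serializing and writing each value into the open ephemeral layer one at a time, the ingest path now serializes the whole batch into a single buffer (a length header followed by each value's payload), writes that buffer with one write_raw call, and records each value's index entry as the batch's base offset plus the value's offset within the buffer.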
@@ -103,6 +103,19 @@ impl EphemeralFile {
         Ok(pos)
     }
 
+    pub(crate) async fn write_raw(
+        &mut self,
+        srcbuf: &[u8],
+        ctx: &RequestContext,
+    ) -> Result<u64, io::Error> {
+        let pos = self.rw.bytes_written();
+
+        // Write the payload
+        self.rw.write_all_borrowed(srcbuf, ctx).await?;
+
+        Ok(pos)
+    }
 }
 
 /// Does the given filename look like an ephemeral file?
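
Note (not part of the diff): write_raw appends the payload and returns the offset at which it starts, which is what lets callers pack many values into one write and still address each of them. A minimal runnable model of that contract, with a plain Vec<u8> standing in for EphemeralFile:

fn append(file: &mut Vec<u8>, payload: &[u8]) -> u64 {
    // Like write_raw: remember where the payload starts, then append it.
    let pos = file.len() as u64;
    file.extend_from_slice(payload);
    pos
}

fn main() {
    let mut file = Vec::new();
    assert_eq!(append(&mut file, b"first"), 0);
    assert_eq!(append(&mut file, b"second"), 5);
}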
@@ -495,6 +495,43 @@ impl InMemoryLayer {
         self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     }
 
+    pub(crate) async fn put_batch(
+        &self,
+        buf: Vec<u8>,
+        values: Vec<(Key, Lsn, u64)>,
+        ctx: &RequestContext,
+    ) -> Result<()> {
+        let mut inner = self.inner.write().await;
+        self.assert_writable();
+
+        let base_off = {
+            inner
+                .file
+                .write_raw(
+                    &buf,
+                    &RequestContextBuilder::extend(ctx)
+                        .page_content_kind(PageContentKind::InMemoryLayer)
+                        .build(),
+                )
+                .await?
+        };
+
+        for (key, lsn, relative_off) in values {
+            let off = base_off + relative_off;
+            let vec_map = inner.index.entry(key).or_default();
+            let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
+            if old.is_some() {
+                // We already had an entry for this LSN. That's odd..
+                warn!("Key {} at {} already exists", key, lsn);
+            }
+        }
+
+        let size = inner.file.len();
+        inner.resource_units.maybe_publish_size(size);
+
+        Ok(())
+    }
+
     async fn put_value_locked(
         &self,
         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
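
Note (not part of the diff): a simplified, self-contained model of put_batch's index update, with u64 stand-ins for the pageserver's Key and Lsn types and a plain Vec of (lsn, offset) pairs in place of utils::vec_map::VecMap:

use std::collections::BTreeMap;

type Key = u64;
type Lsn = u64;

fn index_batch(
    index: &mut BTreeMap<Key, Vec<(Lsn, u64)>>,
    base_off: u64,
    values: &[(Key, Lsn, u64)],
) {
    for &(key, lsn, relative_off) in values {
        // The whole batch was written with one write_raw call, so each
        // value's absolute file offset is the batch's base offset plus
        // the value's offset within the batched buffer.
        index.entry(key).or_default().push((lsn, base_off + relative_off));
    }
}

fn main() {
    let mut index = BTreeMap::new();
    index_batch(&mut index, 1024, &[(7, 100, 0), (7, 101, 16), (9, 100, 32)]);
    assert_eq!(index[&7], vec![(100, 1024), (101, 1040)]);
    assert_eq!(index[&9], vec![(100, 1056)]);
}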
@@ -50,7 +50,6 @@ use utils::{
     vec_map::VecMap,
 };
 
-use std::pin::pin;
 use std::sync::atomic::Ordering as AtomicOrdering;
 use std::sync::{Arc, Mutex, RwLock, Weak};
 use std::time::{Duration, Instant, SystemTime};
@@ -67,6 +66,7 @@ use std::{
     collections::btree_map::Entry,
     ops::{Deref, Range},
 };
+use std::{io::Write, pin::pin};
 
 use crate::{
     aux_file::AuxFileSizeEstimator,
@@ -6079,11 +6079,63 @@ impl<'a> TimelineWriter<'a> {
         batch: VecMap<Lsn, (Key, Value)>,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        for (lsn, (key, val)) in batch {
-            self.put(key, lsn, &val, ctx).await?
+        if batch.is_empty() {
+            return Ok(());
         }
 
-        Ok(())
+        let mut values: Vec<(Key, Lsn, u64)> = Vec::new();
+        let mut cursor = std::io::Cursor::new(Vec::<u8>::new());
+        let mut batch_max_lsn: Lsn = Lsn(0);
+        let mut value_buf = smallvec::SmallVec::<[u8; 256]>::new();
+        for (lsn, (key, val)) in batch {
+            let relative_off = cursor.position();
+
+            value_buf.clear();
+            val.ser_into(&mut value_buf)?;
+            if value_buf.len() < 0x80 {
+                // short one-byte length header
+                let len_buf = [value_buf.len() as u8];
+
+                cursor.write_all(&len_buf)?;
+            } else {
+                let mut len_buf = u32::to_be_bytes(value_buf.len() as u32);
+                len_buf[0] |= 0x80;
+                cursor.write_all(&len_buf)?;
+            }
+            cursor.write_all(&value_buf)?;
+
+            // We can't write straight into the buffer, because the InMemoryLayer file format requires
+            // the size to come before the value. However... we could probably calculate the size before
+            // actually serializing the value
+            //val.ser_into(&mut cursor)?;
+
+            values.push((key, lsn, relative_off));
+            batch_max_lsn = std::cmp::max(batch_max_lsn, lsn);
+        }
+
+        let buf = cursor.into_inner();
+        let buf_size: u64 = buf.len() as u64;
+
+        let action = self.get_open_layer_action(batch_max_lsn, buf_size);
+        let layer = self
+            .handle_open_layer_action(batch_max_lsn, action, ctx)
+            .await?;
+        let res = layer.put_batch(buf, values, ctx).await;
+
+        if res.is_ok() {
+            // Update the current size only when the entire write was ok.
+            // In case of failures, we may have had partial writes which
+            // render the size tracking out of sync. That's ok because
+            // the checkpoint distance should be significantly smaller
+            // than the S3 single shot upload limit of 5GiB.
+            let state = self.write_guard.as_mut().unwrap();
+
+            state.current_size += buf_size;
+            state.prev_lsn = Some(batch_max_lsn);
+            state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
+        }
+
+        res
     }
 
     pub(crate) async fn delete_batch(
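
Note (not part of the diff): the length headers written in put_batch above use one byte for lengths below 0x80 and otherwise a 4-byte big-endian u32 with the high bit set, so payload lengths must fit in 31 bits. A minimal runnable sketch of that scheme together with a matching decoder (the decoder is an illustration, not code from this commit):

// Writes a length header in the same format as put_batch above.
fn write_len_header(out: &mut Vec<u8>, len: usize) {
    if len < 0x80 {
        // short one-byte length header
        out.push(len as u8);
    } else {
        let mut len_buf = u32::to_be_bytes(len as u32);
        len_buf[0] |= 0x80; // high bit marks the 4-byte form
        out.extend_from_slice(&len_buf);
    }
}

// The matching decoder; returns (payload length, header bytes consumed).
fn read_len_header(buf: &[u8]) -> (usize, usize) {
    if buf[0] & 0x80 == 0 {
        (buf[0] as usize, 1)
    } else {
        let len_buf = [buf[0] & 0x7f, buf[1], buf[2], buf[3]];
        (u32::from_be_bytes(len_buf) as usize, 4)
    }
}

fn main() {
    let mut out = Vec::new();
    write_len_header(&mut out, 5);
    write_len_header(&mut out, 300);
    assert_eq!(read_len_header(&out), (5, 1));
    assert_eq!(read_len_header(&out[1..]), (300, 4));
}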