Remove unused singular puts
@@ -1791,11 +1791,12 @@ impl<'a> DatadirModification<'a> {
        // Flush relation and SLRU data blocks, keep metadata.
        let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
        for (key, values) in self.pending_updates.drain() {
            let mut write_batch = Vec::new();
            for (lsn, value) in values {
                if key.is_rel_block_key() || key.is_slru_block_key() {
                    // This bails out on first error without modifying pending_updates.
                    // That's Ok, cf this function's doc comment.
                    writer.put(key, lsn, &value, ctx).await?;
                    write_batch.push((key, lsn, value));
                } else {
                    retained_pending_updates
                        .entry(key)
@@ -1803,6 +1804,7 @@ impl<'a> DatadirModification<'a> {
                        .push((lsn, value));
                }
            }
            writer.put_batch(write_batch, ctx).await?;
        }

        self.pending_updates = retained_pending_updates;
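The two hunks above replace one awaited writer.put per value with a single writer.put_batch per key's accumulated Vec. Below is a minimal, self-contained sketch of that accumulate-then-flush shape; Key, Lsn, Value and BatchWriter are simplified stand-ins for illustration, not the pageserver's real types.

use std::collections::HashMap;

// Simplified stand-ins for the pageserver's Key, Lsn and Value types.
type Key = u32;
type Lsn = u64;
type Value = Vec<u8>;

#[derive(Default)]
struct BatchWriter {
    written: Vec<(Key, Lsn, Value)>,
}

impl BatchWriter {
    // One call per key's batch replaces one call per (key, lsn, value) triple.
    fn put_batch(&mut self, batch: Vec<(Key, Lsn, Value)>) -> Result<(), String> {
        self.written.extend(batch);
        Ok(())
    }
}

// Drain pending updates: block-like keys go to the writer in batches,
// everything else (metadata) is retained for a later commit step.
fn flush_blocks(
    pending: &mut HashMap<Key, Vec<(Lsn, Value)>>,
    writer: &mut BatchWriter,
    is_block_key: impl Fn(Key) -> bool,
) -> Result<(), String> {
    let mut retained: HashMap<Key, Vec<(Lsn, Value)>> = HashMap::new();
    for (key, values) in pending.drain() {
        let mut write_batch = Vec::new();
        for (lsn, value) in values {
            if is_block_key(key) {
                write_batch.push((key, lsn, value));
            } else {
                retained.entry(key).or_default().push((lsn, value));
            }
        }
        // Bail out on the first error; the retained comment in the diff notes
        // that stopping here is acceptable.
        writer.put_batch(write_batch)?;
    }
    *pending = retained;
    Ok(())
}

With this shape, only one writer call per key remains, which is what allows the singular put to be deleted.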
@@ -79,6 +79,7 @@ impl EphemeralFile {
        self.rw.read_blk(blknum, ctx).await
    }

    #[cfg(test)]
    pub(crate) async fn write_blob(
        &mut self,
        srcbuf: &[u8],
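For context, the #[cfg(test)] attribute added above means write_blob is compiled only under cargo test and disappears from production builds. A minimal illustration of that gating, using an invented EphemeralBuf stand-in rather than the real EphemeralFile:

#[allow(dead_code)] // unused outside of test builds in this sketch
struct EphemeralBuf {
    bytes: Vec<u8>,
}

impl EphemeralBuf {
    // Only present in test builds; release builds never compile this symbol,
    // so the test-only write path cannot be called from production code.
    #[cfg(test)]
    fn write_blob(&mut self, srcbuf: &[u8]) -> usize {
        let off = self.bytes.len();
        self.bytes.extend_from_slice(srcbuf);
        off
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn write_blob_returns_starting_offset() {
        let mut f = EphemeralBuf { bytes: Vec::new() };
        assert_eq!(f.write_blob(b"abc"), 0);
        assert_eq!(f.write_blob(b"de"), 3);
    }
}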
@@ -32,7 +32,7 @@ use std::fmt::Write;
use std::ops::Range;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::RwLock;

use super::{
    DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValueReconstructState,
@@ -479,22 +479,8 @@ impl InMemoryLayer {
        })
    }

    // Write operations

    /// Common subroutine of the public put_wal_record() and put_page_image() functions.
    /// Adds the page version to the in-memory tree
    pub async fn put_value(
        &self,
        key: Key,
        lsn: Lsn,
        buf: &[u8],
        ctx: &RequestContext,
    ) -> Result<()> {
        let mut inner = self.inner.write().await;
        self.assert_writable();
        self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
    }

    // Write path.
    pub(crate) async fn put_batch(
        &self,
        buf: Vec<u8>,
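The deleted put_value / put_value_locked pair used a common locking split: the public method takes the tokio RwLock write guard, then delegates to a helper that works against the guard so other paths already holding the lock can reuse it. A reduced sketch of that shape follows; Layer and Inner are invented here, not the real InMemoryLayer types, and this split also suggests why the RwLockWriteGuard import could be dropped in the earlier use hunk.

use tokio::sync::{RwLock, RwLockWriteGuard};

#[derive(Default)]
struct Inner {
    // Stand-in for the layer's index: (lsn, file offset) pairs.
    entries: Vec<(u64, u64)>,
}

struct Layer {
    inner: RwLock<Inner>,
}

impl Layer {
    // Public entry point: take the write lock, then delegate.
    async fn put_value(&self, lsn: u64, off: u64) {
        let mut guard = self.inner.write().await;
        self.put_value_locked(&mut guard, lsn, off);
    }

    // Usable by any caller that already holds the write guard,
    // for example a batch path that locks once for many values.
    fn put_value_locked(&self, guard: &mut RwLockWriteGuard<'_, Inner>, lsn: u64, off: u64) {
        guard.entries.push((lsn, off));
    }
}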
@@ -532,41 +518,6 @@ impl InMemoryLayer {
        Ok(())
    }

    async fn put_value_locked(
        &self,
        locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
        key: Key,
        lsn: Lsn,
        buf: &[u8],
        ctx: &RequestContext,
    ) -> Result<()> {
        trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);

        let off = {
            locked_inner
                .file
                .write_blob(
                    buf,
                    &RequestContextBuilder::extend(ctx)
                        .page_content_kind(PageContentKind::InMemoryLayer)
                        .build(),
                )
                .await?
        };

        let vec_map = locked_inner.index.entry(key).or_default();
        let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
        if old.is_some() {
            // We already had an entry for this LSN. That's odd..
            warn!("Key {} at {} already exists", key, lsn);
        }

        let size = locked_inner.file.len();
        locked_inner.resource_units.maybe_publish_size(size);

        Ok(())
    }

    pub(crate) fn get_opened_at(&self) -> Instant {
        self.opened_at
    }
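What the removed helper did, in outline: append the serialized value to the layer's ephemeral file, then record the resulting offset under (key, lsn) in an in-memory index, updating and warning if the same LSN appears twice. A sketch of that bookkeeping with plain std types (the real code uses utils::vec_map::VecMap and newtype Key/Lsn wrappers):

use std::collections::HashMap;

type Key = u32;
type Lsn = u64;

#[derive(Default)]
struct SketchLayer {
    file: Vec<u8>,                        // stand-in for the ephemeral blob file
    index: HashMap<Key, Vec<(Lsn, u64)>>, // key -> (lsn, offset into `file`)
}

impl SketchLayer {
    // Write the blob, then remember where it landed for later reads.
    fn put_value(&mut self, key: Key, lsn: Lsn, buf: &[u8]) {
        let off = self.file.len() as u64;
        self.file.extend_from_slice(buf);

        let versions = self.index.entry(key).or_default();
        if versions.last().map(|(last_lsn, _)| *last_lsn) == Some(lsn) {
            // Duplicate (key, lsn): the real code warns and updates the last
            // entry in place via VecMap::append_or_update_last.
            *versions.last_mut().unwrap() = (lsn, off);
        } else {
            versions.push((lsn, off));
        }
    }
}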
@@ -5931,44 +5931,6 @@ enum OpenLayerAction {
}

impl<'a> TimelineWriter<'a> {
    /// Put a new page version that can be constructed from a WAL record
    ///
    /// This will implicitly extend the relation, if the page is beyond the
    /// current end-of-file.
    pub(crate) async fn put(
        &mut self,
        key: Key,
        lsn: Lsn,
        value: &Value,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        // Avoid doing allocations for "small" values.
        // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
        // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
        let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
        value.ser_into(&mut buf)?;
        let buf_size: u64 = buf.len().try_into().expect("oversized value buf");

        let action = self.get_open_layer_action(lsn, buf_size);
        let layer = self.handle_open_layer_action(lsn, action, ctx).await?;
        let res = layer.put_value(key, lsn, &buf, ctx).await;

        if res.is_ok() {
            // Update the current size only when the entire write was ok.
            // In case of failures, we may have had partial writes which
            // render the size tracking out of sync. That's ok because
            // the checkpoint distance should be significantly smaller
            // than the S3 single shot upload limit of 5GiB.
            let state = self.write_guard.as_mut().unwrap();

            state.current_size += buf_size;
            state.prev_lsn = Some(lsn);
            state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn));
        }

        res
    }

    async fn handle_open_layer_action(
        &mut self,
        at: Lsn,
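The removed method serialized the value into a SmallVec with 256 bytes of inline capacity so that typical small values never touch the heap. A standalone illustration of that behaviour with the smallvec crate, which the deleted code itself uses; spilled() reports whether the value outgrew the inline buffer:

use smallvec::SmallVec;

fn main() {
    // Inline capacity of 256 bytes, the limit cited in the removed comment.
    let mut small: SmallVec<[u8; 256]> = SmallVec::new();
    small.extend_from_slice(&[0u8; 100]);
    assert!(!small.spilled()); // fits inline, no heap allocation

    let mut large: SmallVec<[u8; 256]> = SmallVec::new();
    large.extend_from_slice(&[0u8; 1000]);
    assert!(large.spilled()); // exceeded 256 bytes, spilled to the heap
}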
@@ -6071,8 +6033,6 @@ impl<'a> TimelineWriter<'a> {
    }

    /// Put a batch of keys at the specified Lsns.
    ///
    /// The batch is sorted by Lsn (enforced by usage of [`utils::vec_map::VecMap`]).
    pub(crate) async fn put_batch(
        &mut self,
        batch: Vec<(Key, Lsn, Value)>,
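Since callers now have to go through put_batch, the doc comment's ordering note matters to them directly: the batch is expected to be ordered by Lsn, with the VecMap underneath enforcing append order. If a caller assembles entries out of order, a stable sort by Lsn restores the invariant; the helper below is hypothetical and uses simplified types without a RequestContext.

type Key = u32;
type Lsn = u64;
type Value = Vec<u8>;

// Sort a batch by Lsn before handing it to a put_batch-style API.
// sort_by_key is stable, so entries with equal Lsn keep their input order.
fn sorted_by_lsn(mut batch: Vec<(Key, Lsn, Value)>) -> Vec<(Key, Lsn, Value)> {
    batch.sort_by_key(|(_, lsn, _)| *lsn);
    batch
}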
@@ -6137,6 +6097,18 @@ impl<'a> TimelineWriter<'a> {
        res
    }

    #[cfg(test)]
    /// Test helper, for tests that would like to poke individual values without composing a batch
    pub(crate) async fn put(
        &mut self,
        key: Key,
        lsn: Lsn,
        value: &Value,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        self.put_batch(vec![(key, lsn, value.clone())], ctx).await
    }

    pub(crate) async fn delete_batch(
        &mut self,
        batch: &[(Range<Key>, Lsn)],
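The new test-only put above is just a one-element batch, which keeps a single production write path while preserving test ergonomics. A self-contained sketch of that delegation pattern, again with invented stand-ins rather than the real TimelineWriter:

type Key = u32;
type Lsn = u64;

#[derive(Clone)]
struct Value(Vec<u8>);

#[derive(Default)]
struct Writer {
    batches: Vec<Vec<(Key, Lsn, Value)>>,
}

impl Writer {
    // The single write path production code is allowed to use.
    fn put_batch(&mut self, batch: Vec<(Key, Lsn, Value)>) -> Result<(), String> {
        self.batches.push(batch);
        Ok(())
    }

    // Test-only convenience: a singular put is just a batch of one,
    // so no second write path has to be kept correct in production.
    #[cfg(test)]
    fn put(&mut self, key: Key, lsn: Lsn, value: &Value) -> Result<(), String> {
        self.put_batch(vec![(key, lsn, value.clone())])
    }
}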