pageserver: avoid a no-longer-needed sort during ingest

This commit is contained in:
John Spray
2024-08-02 14:43:37 +00:00
parent fd60904376
commit fce68fe84e
2 changed files with 15 additions and 15 deletions

View File

@@ -15,7 +15,6 @@ use crate::{aux_file, repository::*};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
@@ -37,7 +36,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
@@ -1833,17 +1831,20 @@ impl<'a> DatadirModification<'a> {
self.pending_nblocks = 0;
if !self.pending_updates.is_empty() {
// The put_batch call below expects expects the inputs to be sorted by Lsn,
// so we do that first.
let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
self.pending_updates
.drain()
.map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
.kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
VecMapOrdering::GreaterOrEqual,
);
// Ordering: the items in this batch do not need to be in any global order, but values for
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
// this to do efficient updates to its index.
let batch: Vec<(Key, Lsn, Value)> = self
.pending_updates
.drain()
.flat_map(|(key, values)| {
values
.into_iter()
.map(move |(lsn, value)| (key, lsn, value))
})
.collect::<Vec<_>>();
writer.put_batch(lsn_ordered_batch, ctx).await?;
writer.put_batch(batch, ctx).await?;
}
if !self.pending_deletions.is_empty() {

View File

@@ -47,7 +47,6 @@ use utils::{
bin_ser::BeSer,
fs_ext, pausable_failpoint,
sync::gate::{Gate, GateGuard},
vec_map::VecMap,
};
use std::sync::atomic::Ordering as AtomicOrdering;
@@ -6076,7 +6075,7 @@ impl<'a> TimelineWriter<'a> {
/// The batch is sorted by Lsn (enforced by usage of [`utils::vec_map::VecMap`].
pub(crate) async fn put_batch(
&mut self,
batch: VecMap<Lsn, (Key, Value)>,
batch: Vec<(Key, Lsn, Value)>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
if batch.is_empty() {
@@ -6087,7 +6086,7 @@ impl<'a> TimelineWriter<'a> {
let mut cursor = std::io::Cursor::new(Vec::<u8>::new());
let mut batch_max_lsn: Lsn = Lsn(0);
let mut value_buf = smallvec::SmallVec::<[u8; 256]>::new();
for (lsn, (key, val)) in batch {
for (key, lsn, val) in batch {
let relative_off = cursor.position();
value_buf.clear();