From fce68fe84ec0cffb3d415f57157c172d0e8cbe5d Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Aug 2024 14:43:37 +0000 Subject: [PATCH] pageserver: avoid a no-longer-needed sort during ingest --- pageserver/src/pgdatadir_mapping.rs | 25 +++++++++++++------------ pageserver/src/tenant/timeline.rs | 5 ++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 85f3a6e0fb..5e3fa7ecb2 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -15,7 +15,6 @@ use crate::{aux_file, repository::*}; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; -use itertools::Itertools; use pageserver_api::key::{ dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key, @@ -37,7 +36,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, trace, warn}; use utils::bin_ser::DeserializeError; use utils::pausable_failpoint; -use utils::vec_map::{VecMap, VecMapOrdering}; use utils::{bin_ser::BeSer, lsn::Lsn}; /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached. @@ -1833,17 +1831,20 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; if !self.pending_updates.is_empty() { - // The put_batch call below expects expects the inputs to be sorted by Lsn, - // so we do that first. - let lsn_ordered_batch: VecMap = VecMap::from_iter( - self.pending_updates - .drain() - .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val)))) - .kmerge_by(|lhs, rhs| lhs.0 < rhs.0), - VecMapOrdering::GreaterOrEqual, - ); + // Ordering: the items in this batch do not need to be in any global order, but values for + // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on + // this to do efficient updates to its index. + let batch: Vec<(Key, Lsn, Value)> = self + .pending_updates + .drain() + .flat_map(|(key, values)| { + values + .into_iter() + .map(move |(lsn, value)| (key, lsn, value)) + }) + .collect::>(); - writer.put_batch(lsn_ordered_batch, ctx).await?; + writer.put_batch(batch, ctx).await?; } if !self.pending_deletions.is_empty() { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4c1e18b210..9c81154d9b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -47,7 +47,6 @@ use utils::{ bin_ser::BeSer, fs_ext, pausable_failpoint, sync::gate::{Gate, GateGuard}, - vec_map::VecMap, }; use std::sync::atomic::Ordering as AtomicOrdering; @@ -6076,7 +6075,7 @@ impl<'a> TimelineWriter<'a> { /// The batch is sorted by Lsn (enforced by usage of [`utils::vec_map::VecMap`]. pub(crate) async fn put_batch( &mut self, - batch: VecMap, + batch: Vec<(Key, Lsn, Value)>, ctx: &RequestContext, ) -> anyhow::Result<()> { if batch.is_empty() { @@ -6087,7 +6086,7 @@ impl<'a> TimelineWriter<'a> { let mut cursor = std::io::Cursor::new(Vec::::new()); let mut batch_max_lsn: Lsn = Lsn(0); let mut value_buf = smallvec::SmallVec::<[u8; 256]>::new(); - for (lsn, (key, val)) in batch { + for (key, lsn, val) in batch { let relative_off = cursor.position(); value_buf.clear();