Compare commits

...

16 Commits

Author SHA1 Message Date
John Spray
12ca2550ba big batches, doesn't change much 2024-07-21 17:40:37 +01:00
John Spray
a56bef7d9c Set MAX_SEND_SIZE to 1MB 2024-07-20 23:24:34 +01:00
John Spray
6162506162 Set default batch size to 1000 2024-07-20 23:09:36 +01:00
John Spray
b182666712 Avoid querying the same relation's size repeatedly in one
DatadirModification
2024-07-20 23:03:31 +01:00
John Spray
b0147d96ea utils: use SmallVec in VecMap 2024-07-20 20:55:13 +01:00
John Spray
c35c420edc downgrade exhaustive assertion to debug 2024-07-20 20:52:58 +01:00
John Spray
f534793707 f optimize 2024-07-20 20:51:43 +01:00
John Spray
a9cff1e881 speculative maybe-optimization for Key::partial_cmp 2024-07-20 20:51:30 +01:00
John Spray
2afcc58ce0 faster vecmap appends 2024-07-20 20:27:31 +01:00
John Spray
6a267cf9e3 avoid some allocs 2024-07-20 20:21:31 +01:00
John Spray
081161060e 4MB write buffer 2024-07-20 20:10:32 +01:00
John Spray
2ce975e405 better benchmark 2024-07-20 20:03:29 +01:00
John Spray
775a4958d7 moar CPU efficiency 2024-07-20 20:03:29 +01:00
John Spray
e37236fd56 moar cpu efficiency 2024-07-20 18:22:29 +01:00
John Spray
425c0c314f pageserver: CPU efficiency in put_batch 2024-07-20 18:22:29 +01:00
John Spray
201f1b5948 tests: reinstate test_bulk_insert 2024-07-20 18:18:10 +01:00
12 changed files with 195 additions and 27 deletions

1
Cargo.lock generated
View File

@@ -6850,6 +6850,7 @@ dependencies = [
"serde_path_to_error",
"serde_with",
"signal-hook",
"smallvec",
"strum",
"strum_macros",
"thiserror",

View File

@@ -12,7 +12,7 @@ use crate::reltag::{BlockNumber, RelTag, SlruKind};
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Ord, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
@@ -22,6 +22,41 @@ pub struct Key {
pub field6: u32,
}
impl PartialOrd for Key {
    /// Delegates to the derived [`Ord`] implementation.
    ///
    /// `Key` derives `Ord`, so `partial_cmp` must agree with `cmp` exactly
    /// (see clippy's `derive_ord_xor_partial_ord`): a hand-rolled cascade
    /// here risks silently diverging from the derived ordering. The previous
    /// "all fields equal" fast path also bought nothing — when field1..field5
    /// are equal, the lexicographic cascade performs the same five `Equal`
    /// comparisons before reaching field6, so the pre-check only added work
    /// on the unequal path. Derived `Ord` is the same field1..field6
    /// lexicographic comparison, kept in one place.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
/// The storage key size.
pub const KEY_SIZE: usize = 18;

View File

@@ -132,7 +132,7 @@ pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
pub const XLOG_BLCKSZ: usize = 8192;
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 128;
// Export some version independent functions that are used outside of this mod
pub use v14::xlog_utils::encode_logical_message;

View File

@@ -36,6 +36,7 @@ routerify.workspace = true
serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
smallvec.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-tar.workspace = true

View File

@@ -1,11 +1,15 @@
use std::{alloc::Layout, cmp::Ordering, ops::RangeBounds};
use smallvec::SmallVec;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VecMapOrdering {
Greater,
GreaterOrEqual,
}
const INLINE_ELEMENTS: usize = 1;
/// Ordered map datastructure implemented in a Vec.
/// Append only - can only add keys that are larger than the
/// current max key.
@@ -13,7 +17,7 @@ pub enum VecMapOrdering {
/// during `VecMap` construction.
#[derive(Clone, Debug)]
pub struct VecMap<K, V> {
data: Vec<(K, V)>,
data: SmallVec<[(K, V); INLINE_ELEMENTS]>,
ordering: VecMapOrdering,
}
@@ -37,14 +41,18 @@ pub enum VecMapError {
impl<K: Ord, V> VecMap<K, V> {
pub fn new(ordering: VecMapOrdering) -> Self {
Self {
data: Vec::new(),
data: Default::default(),
ordering,
}
}
/// Returns the number of key/value entries currently stored in the map.
pub fn len(&self) -> usize {
self.data.len()
}
pub fn with_capacity(capacity: usize, ordering: VecMapOrdering) -> Self {
Self {
data: Vec::with_capacity(capacity),
data: SmallVec::with_capacity(capacity),
ordering,
}
}
@@ -119,6 +127,11 @@ impl<K: Ord, V> VecMap<K, V> {
Ok((None, delta_size))
}
/// Append a `(key, value)` pair without ordering validation and without the
/// memory-usage instrumentation that the checked append paths perform.
///
/// NOTE(review): the caller must guarantee that `key` respects this map's
/// `VecMapOrdering` relative to the current last key — this method pushes
/// unconditionally, so a mis-ordered key silently breaks the sorted-map
/// invariant that lookups presumably rely on.
pub fn append2(&mut self, key: K, value: V) {
self.data.push((key, value));
}
/// Split the map into two.
///
/// The left map contains everything before `cutoff` (exclusive).
@@ -135,11 +148,11 @@ impl<K: Ord, V> VecMap<K, V> {
(
VecMap {
data: self.data[..split_idx].to_vec(),
data: SmallVec::from(&self.data[..split_idx]),
ordering: self.ordering,
},
VecMap {
data: self.data[split_idx..].to_vec(),
data: SmallVec::from(&self.data[split_idx..]),
ordering: self.ordering,
},
)
@@ -186,7 +199,10 @@ impl<K: Ord, V> VecMap<K, V> {
/// Instrument an operation on the underlying [`Vec`].
/// Will panic if the operation decreases capacity.
/// Returns the increase in memory usage caused by the op.
fn instrument_vec_op(&mut self, op: impl FnOnce(&mut Vec<(K, V)>)) -> usize {
fn instrument_vec_op(
&mut self,
op: impl FnOnce(&mut SmallVec<[(K, V); INLINE_ELEMENTS]>),
) -> usize {
let old_cap = self.data.capacity();
op(&mut self.data);
let new_cap = self.data.capacity();
@@ -226,7 +242,7 @@ impl<K: Ord, V> VecMap<K, V> {
impl<K: Ord, V> IntoIterator for VecMap<K, V> {
type Item = (K, V);
type IntoIter = std::vec::IntoIter<(K, V)>;
type IntoIter = smallvec::IntoIter<[(K, V); INLINE_ELEMENTS]>;
fn into_iter(self) -> Self::IntoIter {
self.data.into_iter()

View File

@@ -174,6 +174,7 @@ impl Timeline {
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_directory_entries: Vec::new(),
latest_rel_sizes: Default::default(),
lsn,
}
}
@@ -1045,6 +1046,11 @@ pub struct DatadirModification<'a> {
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
// We update relation sizes when appending. Since writing is single threaded, once we
// have updated a relation size we may be sure that its size is unchanged within the
// same DatadirModification
latest_rel_sizes: HashMap<RelTag, u32>,
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
@@ -1407,7 +1413,10 @@ impl<'a> DatadirModification<'a> {
// Put size
let size_key = rel_size_to_key(rel);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
let old_size = match self.latest_rel_sizes.get(&rel) {
Some(s) => *s,
None => self.get(size_key, ctx).await?.get_u32_le(),
};
// only extend relation here. never decrease the size
if nblocks > old_size {
@@ -1418,6 +1427,8 @@ impl<'a> DatadirModification<'a> {
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
self.pending_nblocks += nblocks as i64 - old_size as i64;
self.latest_rel_sizes.insert(rel, nblocks);
}
Ok(())
}

View File

@@ -28,7 +28,7 @@ use crate::{
},
};
const TAIL_SZ: usize = 64 * 1024;
const TAIL_SZ: usize = 4096 * 1024;
/// See module-level comment.
pub struct RW<W: OwnedAsyncWriter> {

View File

@@ -521,6 +521,30 @@ impl InMemoryLayer {
self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
}
/// Write a batch of serialized values into this layer's file, then publish
/// one index entry per value mapping `(key, lsn)` to the blob offset.
///
/// The fourth tuple element is an offset slot supplied by the caller; its
/// incoming value is ignored. LSNs are expected to arrive already sorted,
/// which is why the unvalidated `append2` is used for the index inserts.
pub(crate) async fn put_values(
    &self,
    values: Vec<(Lsn, Key, smallvec::SmallVec<[u8; 256]>, u64)>,
    ctx: &RequestContext,
) -> Result<()> {
    let mut inner = self.inner.write().await;
    self.assert_writable();

    // Phase 1: persist every blob, remembering the offset each write landed at.
    let mut offsets = Vec::with_capacity(values.len());
    for (_lsn, _key, buf, _off) in &values {
        let off = self.put_value_locked2(&mut inner, buf, ctx).await?;
        offsets.push(off);
    }

    // Phase 2: record the (lsn -> offset) entries in the per-key index.
    for ((lsn, key, _buf, _off), off) in values.into_iter().zip(offsets) {
        // LSNs are pre-sorted, so skip the ordering-checked append.
        inner.index.entry(key).or_default().append2(lsn, off);
    }

    let size = inner.file.len();
    inner.resource_units.maybe_publish_size(size);
    Ok(())
}
async fn put_value_locked(
&self,
locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
@@ -556,6 +580,27 @@ impl InMemoryLayer {
Ok(())
}
/// Write one pre-serialized value blob into the layer file while the inner
/// write lock is already held, returning the offset the blob was stored at.
async fn put_value_locked2(
    &self,
    locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
    buf: &[u8],
    ctx: &RequestContext,
) -> Result<u64> {
    // Tag the request context so this write is attributed to in-memory
    // layer content.
    let write_ctx = RequestContextBuilder::extend(ctx)
        .page_content_kind(PageContentKind::InMemoryLayer)
        .build();
    let off = locked_inner.file.write_blob(buf, &write_ctx).await?;
    Ok(off)
}
pub(crate) fn get_opened_at(&self) -> Instant {
self.opened_at
}
@@ -574,8 +619,6 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
pub async fn freeze(&self, end_lsn: Lsn) {
let inner = self.inner.write().await;
assert!(
self.start_lsn < end_lsn,
"{} >= {}",
@@ -593,9 +636,13 @@ impl InMemoryLayer {
})
.expect("frozen_local_path_str set only once");
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
#[cfg(debug_assertions)]
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
debug_assert!(*lsn < end_lsn);
}
}
}
}

View File

@@ -5990,10 +5990,45 @@ impl<'a> TimelineWriter<'a> {
batch: VecMap<Lsn, (Key, Value)>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
for (lsn, (key, val)) in batch {
self.put(key, lsn, &val, ctx).await?
if batch.is_empty() {
return Ok(());
}
let first_lsn = batch.as_slice().first().unwrap().0;
let last_lsn = batch.as_slice().last().unwrap().0;
let mut total_serialized_size = 0;
let mut serialized = Vec::with_capacity(batch.len());
for (l, (k, v)) in batch.into_iter() {
// Avoid doing allocations for "small" values.
// In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
// https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
v.ser_into(&mut buf)
.expect("Serialization of Value is infallible");
let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
total_serialized_size += buf_size;
serialized.push((l, k, buf, 0));
}
let action = self.get_open_layer_action(first_lsn, total_serialized_size);
let layer = self
.handle_open_layer_action(first_lsn, action, ctx)
.await?;
layer.put_values(serialized, ctx).await?;
// Update the current size only when the entire write was ok.
// In case of failures, we may have had partial writes which
// render the size tracking out of sync. That's ok because
// the checkpoint distance should be significantly smaller
// than the S3 single shot upload limit of 5GiB.
let state = self.write_guard.as_mut().unwrap();
state.current_size += total_serialized_size;
state.prev_lsn = Some(last_lsn);
state.max_lsn = std::cmp::max(state.max_lsn, Some(last_lsn));
Ok(())
}

View File

@@ -456,7 +456,7 @@ impl SafekeeperPostgresHandler {
// not synchronized with sends, so this avoids deadlocks.
let reader = pgb.split().context("START_REPLICATION split")?;
let mut sender = WalSender {
let mut sender = Box::new(WalSender {
pgb,
// should succeed since we're already holding another guard
tli: tli.wal_residence_guard().await?,
@@ -468,7 +468,7 @@ impl SafekeeperPostgresHandler {
ws_guard: ws_guard.clone(),
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
};
});
let mut reply_reader = ReplyReader {
reader,
ws_guard: ws_guard.clone(),
@@ -586,6 +586,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
}
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
let send_buf = &mut self.send_buf[..send_size];
let send_size: usize;
{
// If uncommitted part is being pulled, check that the term is

View File

@@ -45,6 +45,12 @@ class PgCompare(ABC):
# Base-class flush hook: no-op by default; concrete comparison fixtures
# (e.g. NeonCompare) override this to wait for WAL flush and checkpoint.
def flush(self):
pass
def flush1(self):
    """First flush phase; no-op by default, overridden by subclasses.

    The previous body (`_x = 1 + 2`) was dead code with no effect; an
    explicit ``pass`` states the no-op intent.
    """
    pass
def flush2(self):
    """Second flush phase; no-op by default, overridden by subclasses.

    The previous body (`_x = 1 + 2`) was dead code with no effect; an
    explicit ``pass`` states the no-op intent.
    """
    pass
@abstractmethod
def report_peak_memory_use(self):
pass
@@ -130,7 +136,13 @@ class NeonCompare(PgCompare):
return self._pg_bin
# Full flush: run both phases back to back. Split into flush1/flush2 so
# benchmarks can invoke (and time) the two phases separately.
def flush(self):
self.flush1()
self.flush2()
# Phase 1: wait until the pageserver has ingested WAL up to the compute's
# last flush LSN for this tenant/timeline.
def flush1(self):
wait_for_last_flush_lsn(self.env, self._pg, self.tenant, self.timeline)
# Phase 2: force a pageserver checkpoint, then run GC with horizon 0 on the
# timeline.
def flush2(self):
self.pageserver_http_client.timeline_checkpoint(self.tenant, self.timeline)
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)

View File

@@ -1,9 +1,10 @@
import time
from contextlib import closing
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.common_types import Lsn
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.log_helper import log
from fixtures.pg_version import PgVersion
@@ -17,7 +18,6 @@ from fixtures.pg_version import PgVersion
# 3. Disk space used
# 4. Peak memory usage
#
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/7124")
def test_bulk_insert(neon_with_baseline: PgCompare):
env = neon_with_baseline
@@ -28,10 +28,15 @@ def test_bulk_insert(neon_with_baseline: PgCompare):
cur.execute("create table huge (i int, j int);")
# Run INSERT, recording the time and I/O it takes
log.info("Writing...")
with env.record_pageserver_writes("pageserver_writes"):
with env.record_duration("insert"):
cur.execute("insert into huge values (generate_series(1, 5000000), 0);")
env.flush()
cur.execute("insert into huge values (generate_series(1, 20000000), 0);")
env.flush1()
log.info("Finished writing")
env.flush2()
env.report_peak_memory_use()
env.report_size()
@@ -70,8 +75,12 @@ def measure_recovery_time(env: NeonCompare):
env.env.pageserver.tenant_create(tenant_id=env.tenant, generation=attach_gen)
# Measure recovery time
with env.record_duration("wal_recovery"):
client.timeline_create(pg_version, env.tenant, env.timeline)
client.timeline_create(pg_version, env.tenant, env.timeline)
log.info("Recovering...")
with env.record_duration("wal_recovery"):
# Flush, which will also wait for lsn to catch up
env.flush()
env.flush1()
log.info("Finished recovering")
time.sleep(5)