mirror of
https://github.com/neondatabase/neon.git
synced 2026-04-21 16:30:38 +00:00
Compare commits
16 Commits
quantumish
...
jcsp/faste
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
12ca2550ba | ||
|
|
a56bef7d9c | ||
|
|
6162506162 | ||
|
|
b182666712 | ||
|
|
b0147d96ea | ||
|
|
c35c420edc | ||
|
|
f534793707 | ||
|
|
a9cff1e881 | ||
|
|
2afcc58ce0 | ||
|
|
6a267cf9e3 | ||
|
|
081161060e | ||
|
|
2ce975e405 | ||
|
|
775a4958d7 | ||
|
|
e37236fd56 | ||
|
|
425c0c314f | ||
|
|
201f1b5948 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -6850,6 +6850,7 @@ dependencies = [
|
||||
"serde_path_to_error",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"smallvec",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror",
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
///
|
||||
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
|
||||
/// for what we actually store in these fields.
|
||||
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Ord, Serialize, Deserialize)]
|
||||
pub struct Key {
|
||||
pub field1: u8,
|
||||
pub field2: u32,
|
||||
@@ -22,6 +22,41 @@ pub struct Key {
|
||||
pub field6: u32,
|
||||
}
|
||||
|
||||
impl PartialOrd for Key {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
if self.field1 == other.field1
|
||||
&& self.field2 == other.field2
|
||||
&& self.field3 == other.field3
|
||||
&& self.field4 == other.field4
|
||||
&& self.field5 == other.field5
|
||||
{
|
||||
self.field6.partial_cmp(&other.field6)
|
||||
} else {
|
||||
match self.field1.partial_cmp(&other.field1) {
|
||||
Some(core::cmp::Ordering::Equal) => {}
|
||||
ord => return ord,
|
||||
}
|
||||
match self.field2.partial_cmp(&other.field2) {
|
||||
Some(core::cmp::Ordering::Equal) => {}
|
||||
ord => return ord,
|
||||
}
|
||||
match self.field3.partial_cmp(&other.field3) {
|
||||
Some(core::cmp::Ordering::Equal) => {}
|
||||
ord => return ord,
|
||||
}
|
||||
match self.field4.partial_cmp(&other.field4) {
|
||||
Some(core::cmp::Ordering::Equal) => {}
|
||||
ord => return ord,
|
||||
}
|
||||
match self.field5.partial_cmp(&other.field5) {
|
||||
Some(core::cmp::Ordering::Equal) => {}
|
||||
ord => return ord,
|
||||
}
|
||||
self.field6.partial_cmp(&other.field6)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The storage key size.
|
||||
pub const KEY_SIZE: usize = 18;
|
||||
|
||||
|
||||
@@ -132,7 +132,7 @@ pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
|
||||
pub const XLOG_BLCKSZ: usize = 8192;
|
||||
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
|
||||
|
||||
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
|
||||
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 128;
|
||||
|
||||
// Export some version independent functions that are used outside of this mod
|
||||
pub use v14::xlog_utils::encode_logical_message;
|
||||
|
||||
@@ -36,6 +36,7 @@ routerify.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
smallvec.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
use std::{alloc::Layout, cmp::Ordering, ops::RangeBounds};
|
||||
|
||||
use smallvec::SmallVec;
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum VecMapOrdering {
|
||||
Greater,
|
||||
GreaterOrEqual,
|
||||
}
|
||||
|
||||
const INLINE_ELEMENTS: usize = 1;
|
||||
|
||||
/// Ordered map datastructure implemented in a Vec.
|
||||
/// Append only - can only add keys that are larger than the
|
||||
/// current max key.
|
||||
@@ -13,7 +17,7 @@ pub enum VecMapOrdering {
|
||||
/// during `VecMap` construction.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct VecMap<K, V> {
|
||||
data: Vec<(K, V)>,
|
||||
data: SmallVec<[(K, V); INLINE_ELEMENTS]>,
|
||||
ordering: VecMapOrdering,
|
||||
}
|
||||
|
||||
@@ -37,14 +41,18 @@ pub enum VecMapError {
|
||||
impl<K: Ord, V> VecMap<K, V> {
|
||||
pub fn new(ordering: VecMapOrdering) -> Self {
|
||||
Self {
|
||||
data: Vec::new(),
|
||||
data: Default::default(),
|
||||
ordering,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.data.len()
|
||||
}
|
||||
|
||||
pub fn with_capacity(capacity: usize, ordering: VecMapOrdering) -> Self {
|
||||
Self {
|
||||
data: Vec::with_capacity(capacity),
|
||||
data: SmallVec::with_capacity(capacity),
|
||||
ordering,
|
||||
}
|
||||
}
|
||||
@@ -119,6 +127,11 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
Ok((None, delta_size))
|
||||
}
|
||||
|
||||
/// Where the key is known to be unique, and we don't want any instrumentation
|
||||
pub fn append2(&mut self, key: K, value: V) {
|
||||
self.data.push((key, value));
|
||||
}
|
||||
|
||||
/// Split the map into two.
|
||||
///
|
||||
/// The left map contains everything before `cutoff` (exclusive).
|
||||
@@ -135,11 +148,11 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
|
||||
(
|
||||
VecMap {
|
||||
data: self.data[..split_idx].to_vec(),
|
||||
data: SmallVec::from(&self.data[..split_idx]),
|
||||
ordering: self.ordering,
|
||||
},
|
||||
VecMap {
|
||||
data: self.data[split_idx..].to_vec(),
|
||||
data: SmallVec::from(&self.data[split_idx..]),
|
||||
ordering: self.ordering,
|
||||
},
|
||||
)
|
||||
@@ -186,7 +199,10 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
/// Instrument an operation on the underlying [`Vec`].
|
||||
/// Will panic if the operation decreases capacity.
|
||||
/// Returns the increase in memory usage caused by the op.
|
||||
fn instrument_vec_op(&mut self, op: impl FnOnce(&mut Vec<(K, V)>)) -> usize {
|
||||
fn instrument_vec_op(
|
||||
&mut self,
|
||||
op: impl FnOnce(&mut SmallVec<[(K, V); INLINE_ELEMENTS]>),
|
||||
) -> usize {
|
||||
let old_cap = self.data.capacity();
|
||||
op(&mut self.data);
|
||||
let new_cap = self.data.capacity();
|
||||
@@ -226,7 +242,7 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
|
||||
impl<K: Ord, V> IntoIterator for VecMap<K, V> {
|
||||
type Item = (K, V);
|
||||
type IntoIter = std::vec::IntoIter<(K, V)>;
|
||||
type IntoIter = smallvec::IntoIter<[(K, V); INLINE_ELEMENTS]>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.data.into_iter()
|
||||
|
||||
@@ -174,6 +174,7 @@ impl Timeline {
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
pending_directory_entries: Vec::new(),
|
||||
latest_rel_sizes: Default::default(),
|
||||
lsn,
|
||||
}
|
||||
}
|
||||
@@ -1045,6 +1046,11 @@ pub struct DatadirModification<'a> {
|
||||
pending_deletions: Vec<(Range<Key>, Lsn)>,
|
||||
pending_nblocks: i64,
|
||||
|
||||
// We update relation sizes when appending. Since writing is single threaded, once we
|
||||
// have updated a relation size we may be sure that its size is unchanged within the
|
||||
// same DatadirModification
|
||||
latest_rel_sizes: HashMap<RelTag, u32>,
|
||||
|
||||
/// For special "directory" keys that store key-value maps, track the size of the map
|
||||
/// if it was updated in this modification.
|
||||
pending_directory_entries: Vec<(DirectoryKind, usize)>,
|
||||
@@ -1407,7 +1413,10 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
let old_size = match self.latest_rel_sizes.get(&rel) {
|
||||
Some(s) => *s,
|
||||
None => self.get(size_key, ctx).await?.get_u32_le(),
|
||||
};
|
||||
|
||||
// only extend relation here. never decrease the size
|
||||
if nblocks > old_size {
|
||||
@@ -1418,6 +1427,8 @@ impl<'a> DatadirModification<'a> {
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
|
||||
self.pending_nblocks += nblocks as i64 - old_size as i64;
|
||||
|
||||
self.latest_rel_sizes.insert(rel, nblocks);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
const TAIL_SZ: usize = 64 * 1024;
|
||||
const TAIL_SZ: usize = 4096 * 1024;
|
||||
|
||||
/// See module-level comment.
|
||||
pub struct RW<W: OwnedAsyncWriter> {
|
||||
|
||||
@@ -521,6 +521,30 @@ impl InMemoryLayer {
|
||||
self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn put_values(
|
||||
&self,
|
||||
mut values: Vec<(Lsn, Key, smallvec::SmallVec<[u8; 256]>, u64)>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
for (_lsn, _key, buf, off) in &mut values {
|
||||
*off = self.put_value_locked2(&mut inner, &buf, ctx).await?;
|
||||
}
|
||||
|
||||
for (lsn, key, _buf, off) in values.into_iter() {
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
|
||||
// Use fast version of append, since we know our LSNs are already sorted
|
||||
vec_map.append2(lsn, off);
|
||||
}
|
||||
|
||||
let size = inner.file.len();
|
||||
inner.resource_units.maybe_publish_size(size);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn put_value_locked(
|
||||
&self,
|
||||
locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
|
||||
@@ -556,6 +580,27 @@ impl InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn put_value_locked2(
|
||||
&self,
|
||||
locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
|
||||
buf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64> {
|
||||
let off = {
|
||||
locked_inner
|
||||
.file
|
||||
.write_blob(
|
||||
buf,
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build(),
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
Ok(off)
|
||||
}
|
||||
|
||||
pub(crate) fn get_opened_at(&self) -> Instant {
|
||||
self.opened_at
|
||||
}
|
||||
@@ -574,8 +619,6 @@ impl InMemoryLayer {
|
||||
/// Records the end_lsn for non-dropped layers.
|
||||
/// `end_lsn` is exclusive
|
||||
pub async fn freeze(&self, end_lsn: Lsn) {
|
||||
let inner = self.inner.write().await;
|
||||
|
||||
assert!(
|
||||
self.start_lsn < end_lsn,
|
||||
"{} >= {}",
|
||||
@@ -593,9 +636,13 @@ impl InMemoryLayer {
|
||||
})
|
||||
.expect("frozen_local_path_str set only once");
|
||||
|
||||
for vec_map in inner.index.values() {
|
||||
for (lsn, _pos) in vec_map.as_slice() {
|
||||
assert!(*lsn < end_lsn);
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
let inner = self.inner.write().await;
|
||||
for vec_map in inner.index.values() {
|
||||
for (lsn, _pos) in vec_map.as_slice() {
|
||||
debug_assert!(*lsn < end_lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5990,10 +5990,45 @@ impl<'a> TimelineWriter<'a> {
|
||||
batch: VecMap<Lsn, (Key, Value)>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
for (lsn, (key, val)) in batch {
|
||||
self.put(key, lsn, &val, ctx).await?
|
||||
if batch.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let first_lsn = batch.as_slice().first().unwrap().0;
|
||||
let last_lsn = batch.as_slice().last().unwrap().0;
|
||||
let mut total_serialized_size = 0;
|
||||
|
||||
let mut serialized = Vec::with_capacity(batch.len());
|
||||
for (l, (k, v)) in batch.into_iter() {
|
||||
// Avoid doing allocations for "small" values.
|
||||
// In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
|
||||
// https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
|
||||
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
|
||||
v.ser_into(&mut buf)
|
||||
.expect("Serialization of Value is infallible");
|
||||
let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
|
||||
total_serialized_size += buf_size;
|
||||
serialized.push((l, k, buf, 0));
|
||||
}
|
||||
|
||||
let action = self.get_open_layer_action(first_lsn, total_serialized_size);
|
||||
let layer = self
|
||||
.handle_open_layer_action(first_lsn, action, ctx)
|
||||
.await?;
|
||||
|
||||
layer.put_values(serialized, ctx).await?;
|
||||
|
||||
// Update the current size only when the entire write was ok.
|
||||
// In case of failures, we may have had partial writes which
|
||||
// render the size tracking out of sync. That's ok because
|
||||
// the checkpoint distance should be significantly smaller
|
||||
// than the S3 single shot upload limit of 5GiB.
|
||||
let state = self.write_guard.as_mut().unwrap();
|
||||
|
||||
state.current_size += total_serialized_size;
|
||||
state.prev_lsn = Some(last_lsn);
|
||||
state.max_lsn = std::cmp::max(state.max_lsn, Some(last_lsn));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -456,7 +456,7 @@ impl SafekeeperPostgresHandler {
|
||||
// not synchronized with sends, so this avoids deadlocks.
|
||||
let reader = pgb.split().context("START_REPLICATION split")?;
|
||||
|
||||
let mut sender = WalSender {
|
||||
let mut sender = Box::new(WalSender {
|
||||
pgb,
|
||||
// should succeed since we're already holding another guard
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
@@ -468,7 +468,7 @@ impl SafekeeperPostgresHandler {
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: [0; MAX_SEND_SIZE],
|
||||
};
|
||||
});
|
||||
let mut reply_reader = ReplyReader {
|
||||
reader,
|
||||
ws_guard: ws_guard.clone(),
|
||||
@@ -586,6 +586,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
|
||||
let send_buf = &mut self.send_buf[..send_size];
|
||||
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
|
||||
@@ -45,6 +45,12 @@ class PgCompare(ABC):
|
||||
def flush(self):
|
||||
pass
|
||||
|
||||
def flush1(self):
    """No-op in this base implementation.

    Subclasses that split flushing into two stages override this as the
    first stage.
    """
    return None
|
||||
|
||||
def flush2(self):
    """No-op in this base implementation.

    Subclasses that split flushing into two stages override this as the
    second stage.
    """
    return None
|
||||
|
||||
@abstractmethod
|
||||
def report_peak_memory_use(self):
|
||||
pass
|
||||
@@ -130,7 +136,13 @@ class NeonCompare(PgCompare):
|
||||
return self._pg_bin
|
||||
|
||||
def flush(self):
    """Run both flush stages in order: `flush1`, then `flush2`."""
    for stage in ("flush1", "flush2"):
        getattr(self, stage)()
|
||||
|
||||
def flush1(self):
    """First flush stage: call `wait_for_last_flush_lsn` for this tenant and timeline.

    NOTE(review): presumably this blocks until the pageserver has caught up
    with the endpoint's last flush LSN; confirm against the helper.
    """
    env, endpoint = self.env, self._pg
    wait_for_last_flush_lsn(env, endpoint, self.tenant, self.timeline)
|
||||
|
||||
def flush2(self):
    """Second flush stage: request a timeline checkpoint, then a GC pass.

    Both requests go through the pageserver HTTP client for this tenant and
    timeline; the GC call is passed `0` as its third argument.
    """
    tenant, timeline = self.tenant, self.timeline
    self.pageserver_http_client.timeline_checkpoint(tenant, timeline)
    self.pageserver_http_client.timeline_gc(tenant, timeline, 0)
|
||||
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import time
|
||||
from contextlib import closing
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport
|
||||
from fixtures.common_types import Lsn
|
||||
from fixtures.compare_fixtures import NeonCompare, PgCompare
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
@@ -17,7 +18,6 @@ from fixtures.pg_version import PgVersion
|
||||
# 3. Disk space used
|
||||
# 4. Peak memory usage
|
||||
#
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/7124")
|
||||
def test_bulk_insert(neon_with_baseline: PgCompare):
|
||||
env = neon_with_baseline
|
||||
|
||||
@@ -28,10 +28,15 @@ def test_bulk_insert(neon_with_baseline: PgCompare):
|
||||
cur.execute("create table huge (i int, j int);")
|
||||
|
||||
# Run INSERT, recording the time and I/O it takes
|
||||
log.info("Writing...")
|
||||
with env.record_pageserver_writes("pageserver_writes"):
|
||||
with env.record_duration("insert"):
|
||||
cur.execute("insert into huge values (generate_series(1, 5000000), 0);")
|
||||
env.flush()
|
||||
cur.execute("insert into huge values (generate_series(1, 20000000), 0);")
|
||||
env.flush1()
|
||||
|
||||
log.info("Finished writing")
|
||||
|
||||
env.flush2()
|
||||
|
||||
env.report_peak_memory_use()
|
||||
env.report_size()
|
||||
@@ -70,8 +75,12 @@ def measure_recovery_time(env: NeonCompare):
|
||||
env.env.pageserver.tenant_create(tenant_id=env.tenant, generation=attach_gen)
|
||||
|
||||
# Measure recovery time
|
||||
with env.record_duration("wal_recovery"):
|
||||
client.timeline_create(pg_version, env.tenant, env.timeline)
|
||||
client.timeline_create(pg_version, env.tenant, env.timeline)
|
||||
|
||||
log.info("Recovering...")
|
||||
with env.record_duration("wal_recovery"):
|
||||
# Flush, which will also wait for lsn to catch up
|
||||
env.flush()
|
||||
env.flush1()
|
||||
|
||||
log.info("Finished recovering")
|
||||
time.sleep(5)
|
||||
|
||||
Reference in New Issue
Block a user