Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-31 09:10:38 +00:00.

Compare commits: diko/baseb...erik/batch (1 commit)
| Author | SHA1 | Date |
|---|---|---|
| | 4308ffe5c0 | |

Changed files include Cargo.lock (generated, 4 lines changed).
```diff
@@ -994,9 +994,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
 [[package]]
 name = "bytes"
-version = "1.5.0"
+version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
+checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
 dependencies = [
  "serde",
 ]
```
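The `bytes` bump from 1.5.0 to 1.8.0 is presumably what makes `Bytes::try_into_mut` available to the buffer-reuse code later in this diff; to my knowledge that method first appeared in bytes 1.8.0.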
```diff
@@ -262,7 +262,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {

     // Send requests.
     for req in reqgen {
-        _ = reply_rx.try_recv(); // discard any replies, to avoid blocking
+        while reply_rx.try_recv().is_ok() {} // discard replies, to avoid blocking
         let msg = ProposerAcceptorMessage::AppendRequest(req);
         msg_tx.send(msg).await.expect("send failed");
     }
```
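The benchmark fix matters because the reply channel is bounded: discarding at most one reply per request can still let replies accumulate until the acceptor blocks on its reply sender, while a drain loop empties the queue every iteration. A minimal sketch of the difference, using a hypothetical bounded channel rather than the benchmark's real setup:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Hypothetical bounded reply channel standing in for reply_rx.
    let (reply_tx, mut reply_rx) = mpsc::channel::<u64>(16);
    for i in 0..16 {
        reply_tx.send(i).await.unwrap(); // fill the channel to capacity
    }

    // Old approach: frees at most one slot, so the sender can still block.
    let _ = reply_rx.try_recv();
    assert!(!reply_rx.is_empty());

    // New approach: drains every queued reply before sending the next request.
    while reply_rx.try_recv().is_ok() {}
    assert!(reply_rx.is_empty());
}
```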
```diff
@@ -217,7 +217,8 @@ pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
     let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
     buckets.insert(0, 0.0);
     buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
-    assert!(buckets.len() <= 12, "too many histogram buckets");
+    // TODO: tweak this.
+    assert!(buckets.len() <= 16, "too many histogram buckets");

     register_histogram!(
         "safekeeper_wal_receiver_queue_depth",
```
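The relaxed assert follows from the bucket arithmetic once `MSG_QUEUE_SIZE` grows to 4096 (see the constant change below): assuming `pow2_buckets(1, 4096)` yields the 13 powers of two from 1 through 4096, prepending 0 and inserting `MSG_QUEUE_SIZE - 1` before the last bucket gives 15 buckets, over the old limit of 12 but under 16. A sketch with a stand-in `pow2_buckets` (its real semantics are assumed here):

```rust
// Stand-in for the crate's pow2_buckets; assumed to produce the powers of two
// from min through max inclusive.
fn pow2_buckets(min: usize, max: usize) -> Vec<f64> {
    (0..)
        .map(|i| 1usize << i)
        .take_while(|&v| v <= max)
        .filter(|&v| v >= min)
        .map(|v| v as f64)
        .collect()
}

fn main() {
    const MSG_QUEUE_SIZE: usize = 4096;
    let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE); // [1, 2, 4, ..., 4096]
    buckets.insert(0, 0.0); // explicit zero bucket
    buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64); // 4095, before 4096
    assert_eq!(buckets.len(), 15); // 13 powers of two + 0 + 4095
    assert!(buckets.len() <= 16); // the old limit of 12 would have panicked
}
```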
```diff
@@ -7,14 +7,15 @@ use crate::metrics::{
     WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
     WAL_RECEIVER_QUEUE_SIZE_TOTAL,
 };
-use crate::safekeeper::AcceptorProposerMessage;
-use crate::safekeeper::ProposerAcceptorMessage;
-use crate::safekeeper::ServerInfo;
+use crate::safekeeper::{
+    AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
+    ServerInfo,
+};
 use crate::timeline::WalResidentTimeline;
 use crate::wal_service::ConnectionId;
 use crate::GlobalTimelines;
 use anyhow::{anyhow, Context};
-use bytes::BytesMut;
+use bytes::{BufMut as _, Bytes, BytesMut};
 use parking_lot::MappedMutexGuard;
 use parking_lot::Mutex;
 use parking_lot::MutexGuard;
```
```diff
@@ -206,7 +207,8 @@ impl Drop for WalReceiverGuard {
     }
 }

-pub const MSG_QUEUE_SIZE: usize = 256;
+// TODO: reconsider this.
+pub const MSG_QUEUE_SIZE: usize = 4096;
 pub const REPLY_QUEUE_SIZE: usize = 16;

 impl SafekeeperPostgresHandler {
```
```diff
@@ -484,6 +486,9 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
 /// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
 const METRICS_INTERVAL: Duration = Duration::from_secs(5);

+/// The AppendRequest buffer size.
+const APPEND_BUFFER_SIZE: usize = 1024 * 1024;
+
 /// Encapsulates a task which takes messages from msg_rx, processes and pushes
 /// replies to reply_tx.
 ///
```
```diff
@@ -530,6 +535,9 @@ impl WalAcceptor {
     async fn run(&mut self) -> anyhow::Result<()> {
         let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);

+        // Buffer AppendRequests to submit them as a single large write.
+        let mut append_buf = BufferedAppendRequest::new(APPEND_BUFFER_SIZE);
+
         // Periodically flush the WAL and compute metrics.
         let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
         flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
```
```diff
@@ -546,7 +554,7 @@ impl WalAcceptor {
                 // Process inbound message.
                 msg = self.msg_rx.recv() => {
                     // If disconnected, break to flush WAL and return.
-                    let Some(mut msg) = msg else {
+                    let Some(msg) = msg else {
                         break;
                     };

```
```diff
@@ -563,11 +571,44 @@ impl WalAcceptor {
                     // This batches multiple appends per fsync. If the channel is empty after
                     // sending the reply, we'll schedule an immediate flush.
                     if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
-                        msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
-                        dirty = true;
-                    }
-
-                    self.tli.process_msg(&msg).await?
+                        // Try to batch multiple messages into a single large write.
+                        if !append_buf.is_empty() || !self.msg_rx.is_empty() {
+                            if append_buf.add(&append_request) {
+                                continue; // message buffered, go get next message
+                            }
+
+                            // Full buffer, write it and buffer this message for next iteration.
+                            dirty = true;
+                            let buf_req = append_buf.take().expect("empty buffer");
+                            let buf_msg = ProposerAcceptorMessage::NoFlushAppendRequest(buf_req);
+                            let reply = self.tli.process_msg(&buf_msg).await?;
+                            drop(buf_msg); // allow reusing buffer for add
+                            assert!(append_buf.add(&append_request), "empty buffer rejected msg");
+                            reply
+                        } else {
+                            dirty = true;
+                            let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
+                            self.tli.process_msg(&msg).await?
+                        }
+                    } else {
+                        self.tli.process_msg(&msg).await?
+                    }
                 }

+                // If there are no pending messages, write the append buffer.
+                //
+                // NB: we don't also flush the WAL here. Otherwise we can get into a regime where we
+                // quickly drain msg_rx and fsync before the sender is able to repopulate msg_rx.
+                // This happens consistently due to Tokio scheduling, leading to overeager fsyncing.
+                // Instead, we perform the write without fsyncing and give the sender a chance to
+                // get scheduled and populate msg_rx for the next iteration. If there are no further
+                // messages, the next iteration will flush the WAL.
+                _ = future::ready(()), if self.msg_rx.is_empty() && !append_buf.is_empty() => {
+                    dirty = true;
+                    let buf_req = append_buf.take().expect("empty buffer");
+                    self.tli
+                        .process_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(buf_req))
+                        .await?
+                }
+
                 // While receiving AppendRequests, flush the WAL periodically and respond with an
```
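Both new arms lean on a `tokio::select!` idiom: an arm on `future::ready(())` with an `if` guard is always ready, so (assuming biased polling, as this sketch does) it fires exactly when the guard holds and no earlier arm is ready, acting as a "when idle, do this" branch. A self-contained sketch with stand-in state:

```rust
use std::future;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(4);
    tx.send(1).await.unwrap();
    let mut dirty = false;

    loop {
        tokio::select! {
            biased;
            // Inbound messages take priority over the idle branch.
            Some(msg) = rx.recv() => {
                println!("buffering {msg}");
                dirty = true; // work is buffered but not yet flushed
            }
            // Always ready; fires once the queue is empty and work is pending.
            _ = future::ready(()), if rx.is_empty() && dirty => {
                println!("flushing");
                dirty = false;
                break; // end the demo
            }
        }
    }
}
```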
```diff
@@ -579,11 +620,11 @@ impl WalAcceptor {
                         .await?
                 }

-                // If there are no pending messages, flush the WAL immediately.
+                // If there are no pending messages, flush the WAL and append buffer immediately.
                 //
                 // TODO: this should be done via flush_ticker.reset_immediately(), but that's always
                 // delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
-                _ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
+                _ = future::ready(()), if self.msg_rx.is_empty() && dirty => {
                     dirty = false;
                     flush_ticker.reset();
                     self.tli
```
```diff
@@ -627,3 +668,115 @@ impl Drop for WalAcceptor {
         }
     }
 }
+
+/// Buffers WAL data for multiple AppendRequests, to submit them as a single write.
+struct BufferedAppendRequest {
+    /// The buffer capacity.
+    capacity: usize,
+    /// The buffered header and WAL data.
+    buf: Option<(AppendRequestHeader, BytesMut)>,
+    /// A previous buffer that can be reused when the returned message is dropped.
+    reuse_buf: Option<Bytes>,
+    /// If an AppendRequest is larger than the buffer capacity (when empty), just stash it here to
+    /// avoid growing the buffer and copying it. This will be returned as-is.
+    large: Option<AppendRequest>,
+}
+
+impl BufferedAppendRequest {
+    /// Creates a new append request buffer with the given capacity.
+    fn new(capacity: usize) -> Self {
+        Self {
+            capacity,
+            buf: None,
+            reuse_buf: None,
+            large: None,
+        }
+    }
+
+    /// Adds the given append request to the buffer, if possible. Returns `false` if the message
+    /// can't be buffered, leaving self unmodified. An empty buffer will always accept a message.
+    ///
+    /// If the buffer is not empty, the message must have the same term and proposer and contiguous
+    /// `begin_lsn` and `end_lsn`. The buffer must have available capacity for the entire
+    /// `wal_data`. If the message is greater than an empty buffer's capacity, it is accepted but
+    /// simply stashed away in `large` without growing the buffer.
+    pub fn add(&mut self, msg: &AppendRequest) -> bool {
+        // If there is a stashed large message, reject further messages.
+        if self.large.is_some() {
+            return false;
+        }
+
+        // If there is no existing buffer, initialize one with the message.
+        let Some((ref mut h, ref mut wal_data)) = self.buf else {
+            // If the message is larger than the buffer capacity, just stash it instead of growing.
+            if msg.wal_data.len() > self.capacity {
+                assert!(self.large.is_none());
+                self.large = Some(msg.clone()); // clone is cheap with Bytes
+                return true;
+            }
+
+            // Reuse a previous buffer, if any, or allocate a new one.
+            //
+            // TODO: try_into_mut() is essentially runtime borrow checking. If AppendRequest used a
+            // normal Vec<u8> we could do compile-time borrow checking instead and avoid panic.
+            let mut wal_data = match self.reuse_buf.take() {
+                Some(reuse_buf) => match reuse_buf.try_into_mut() {
+                    Ok(mut reuse_buf) => {
+                        assert_eq!(reuse_buf.capacity(), self.capacity);
+                        reuse_buf.clear();
+                        reuse_buf
+                    }
+                    Err(_) => panic!("couldn't reuse buffer, still in use"),
+                },
+                None => BytesMut::with_capacity(self.capacity),
+            };
+            // Copy the append request into the buffer.
+            wal_data.put_slice(&msg.wal_data);
+            self.buf = Some((msg.h, wal_data));
+            return true;
+        };
+
+        // The messages must have the same term and proposer.
+        if h.term != msg.h.term || h.proposer_uuid != msg.h.proposer_uuid {
+            return false;
+        }
+        // The messages must be contiguous.
+        if h.end_lsn != msg.h.begin_lsn {
+            return false;
+        }
+        // The message must fit in the buffer.
+        if wal_data.len() + msg.wal_data.len() > self.capacity {
+            return false;
+        }
+
+        // Add the message to the buffer, bumping the commit and truncate LSNs. We assume that later
+        // messages have later commit/truncate LSNs.
+        h.end_lsn = msg.h.end_lsn;
+        h.commit_lsn = msg.h.commit_lsn;
+        h.truncate_lsn = msg.h.truncate_lsn;
+        wal_data.put_slice(&msg.wal_data);
+        true
+    }
+
+    /// Returns true if there is no buffered message.
+    fn is_empty(&self) -> bool {
+        self.buf.is_none() && self.large.is_none()
+    }
+
+    /// Takes the buffered AppendRequest (if any), leaving a None in its place.
+    ///
+    /// NB: The returned `wal_data` Bytes must be dropped before the next call to `add()`, in order
+    /// to reuse the buffer. This is basically runtime borrow checking, because of Bytes.
+    fn take(&mut self) -> Option<AppendRequest> {
+        // If there is a stashed large message, return it.
+        if let Some(large) = self.large.take() {
+            assert!(self.buf.is_none(), "both buf and large are set");
+            return Some(large);
+        }
+
+        let (h, wal_data) = self.buf.take()?;
+        let wal_data = wal_data.freeze();
+        self.reuse_buf = Some(wal_data.clone()); // keep a reference to the buffer
+        Some(AppendRequest { h, wal_data })
+    }
+}
```
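The `reuse_buf` mechanism is easiest to see in isolation: `take()` freezes the `BytesMut` into `Bytes` and keeps a clone, and once the caller drops its handle the allocation becomes unique again, so `Bytes::try_into_mut` can recover it; this is the "runtime borrow checking" the comments mention. A minimal model, independent of the safekeeper types:

```rust
use bytes::{Bytes, BytesMut};

fn main() {
    // Fill a buffer, as add() does.
    let mut buf = BytesMut::with_capacity(1024);
    buf.extend_from_slice(b"wal data");

    // take(): freeze and hand the data out, keeping a reuse handle.
    let out: Bytes = buf.freeze();
    let reuse: Bytes = out.clone();

    // While the caller still holds `out`, the allocation is shared and
    // cannot be reclaimed; try_into_mut returns Err(self).
    let reuse = match reuse.try_into_mut() {
        Ok(_) => unreachable!("two handles are alive"),
        Err(still_shared) => still_shared,
    };

    drop(out); // the caller is done with the message

    // Now `reuse` is the only handle, so the buffer can be recovered.
    let mut buf = reuse.try_into_mut().expect("unique again");
    buf.clear();
    buf.extend_from_slice(b"next"); // reuses the allocation
    assert_eq!(&buf[..], b"next");
}
```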
```diff
@@ -296,12 +296,13 @@ pub struct ProposerElected {

 /// Request with WAL message sent from proposer to safekeeper. Along the way it
 /// communicates commit_lsn.
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct AppendRequest {
     pub h: AppendRequestHeader,
     pub wal_data: Bytes,
 }
-#[derive(Debug, Clone, Deserialize)]
+
+#[derive(Debug, Clone, Copy, Deserialize)]
 pub struct AppendRequestHeader {
     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     pub term: Term,
```
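The `Copy` derive on `AppendRequestHeader` is what lets the test hunks below drop their `.clone()` calls: assigning the header copies it implicitly and leaves the original binding usable. A trivial illustration with a stand-in header type:

```rust
#[derive(Debug, Clone, Copy)]
struct Header {
    term: u64,
}

fn main() {
    let h = Header { term: 7 };
    let h2 = h; // copies implicitly; `h` is still usable afterwards
    let mut h3 = h; // and so is this, mirroring `let mut ar_hrd2 = ar_hdr;`
    h3.term += 1;
    println!("{:?} {:?} {:?}", h, h2, h3);
}
```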
```diff
@@ -1166,7 +1167,7 @@ mod tests {
             proposer_uuid: [0; 16],
         };
         let mut append_request = AppendRequest {
-            h: ar_hdr.clone(),
+            h: ar_hdr,
             wal_data: Bytes::from_static(b"b"),
         };

```
```diff
@@ -1240,7 +1241,7 @@ mod tests {
             proposer_uuid: [0; 16],
         };
         let append_request = AppendRequest {
-            h: ar_hdr.clone(),
+            h: ar_hdr,
             wal_data: Bytes::from_static(b"b"),
         };

```
```diff
@@ -1248,7 +1249,7 @@ mod tests {
         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
             .await
             .unwrap();
-        let mut ar_hrd2 = ar_hdr.clone();
+        let mut ar_hrd2 = ar_hdr;
         ar_hrd2.begin_lsn = Lsn(4);
         ar_hrd2.end_lsn = Lsn(5);
         let append_request = AppendRequest {
```