Compare commits

...

1 Commits

Author SHA1 Message Date
Erik Grinaker
4308ffe5c0 safekeeper: batch AppendRequest writes 2024-11-13 15:09:57 +01:00
5 changed files with 176 additions and 21 deletions

4
Cargo.lock generated
View File

@@ -994,9 +994,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.5.0"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
dependencies = [
"serde",
]

View File

@@ -262,7 +262,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
// Send requests.
for req in reqgen {
_ = reply_rx.try_recv(); // discard any replies, to avoid blocking
while reply_rx.try_recv().is_ok() {} // discard replies, to avoid blocking
let msg = ProposerAcceptorMessage::AppendRequest(req);
msg_tx.send(msg).await.expect("send failed");
}

View File

@@ -217,7 +217,8 @@ pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
buckets.insert(0, 0.0);
buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
assert!(buckets.len() <= 12, "too many histogram buckets");
// TODO: tweak this.
assert!(buckets.len() <= 16, "too many histogram buckets");
register_histogram!(
"safekeeper_wal_receiver_queue_depth",

View File

@@ -7,14 +7,15 @@ use crate::metrics::{
WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
WAL_RECEIVER_QUEUE_SIZE_TOTAL,
};
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
ServerInfo,
};
use crate::timeline::WalResidentTimeline;
use crate::wal_service::ConnectionId;
use crate::GlobalTimelines;
use anyhow::{anyhow, Context};
use bytes::BytesMut;
use bytes::{BufMut as _, Bytes, BytesMut};
use parking_lot::MappedMutexGuard;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
@@ -206,7 +207,8 @@ impl Drop for WalReceiverGuard {
}
}
pub const MSG_QUEUE_SIZE: usize = 256;
// TODO: reconsider this.
pub const MSG_QUEUE_SIZE: usize = 4096;
pub const REPLY_QUEUE_SIZE: usize = 16;
impl SafekeeperPostgresHandler {
@@ -484,6 +486,9 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
const METRICS_INTERVAL: Duration = Duration::from_secs(5);
/// The AppendRequest buffer size.
const APPEND_BUFFER_SIZE: usize = 1024 * 1024;
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx.
///
@@ -530,6 +535,9 @@ impl WalAcceptor {
async fn run(&mut self) -> anyhow::Result<()> {
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
// Buffer AppendRequests to submit them as a single large write.
let mut append_buf = BufferedAppendRequest::new(APPEND_BUFFER_SIZE);
// Periodically flush the WAL and compute metrics.
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
@@ -546,7 +554,7 @@ impl WalAcceptor {
// Process inbound message.
msg = self.msg_rx.recv() => {
// If disconnected, break to flush WAL and return.
let Some(mut msg) = msg else {
let Some(msg) = msg else {
break;
};
@@ -563,11 +571,44 @@ impl WalAcceptor {
// This batches multiple appends per fsync. If the channel is empty after
// sending the reply, we'll schedule an immediate flush.
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
dirty = true;
}
// Try to batch multiple messages into a single large write.
if !append_buf.is_empty() || !self.msg_rx.is_empty() {
if append_buf.add(&append_request) {
continue; // message buffered, go get next message
}
self.tli.process_msg(&msg).await?
// Full buffer, write it and buffer this message for next iteration.
dirty = true;
let buf_req = append_buf.take().expect("empty buffer");
let buf_msg = ProposerAcceptorMessage::NoFlushAppendRequest(buf_req);
let reply = self.tli.process_msg(&buf_msg).await?;
drop(buf_msg); // allow reusing buffer for add
assert!(append_buf.add(&append_request), "empty buffer rejected msg");
reply
} else {
dirty = true;
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
self.tli.process_msg(&msg).await?
}
} else {
self.tli.process_msg(&msg).await?
}
}
// If there are no pending messages, write the append buffer.
//
// NB: we don't also flush the WAL here. Otherwise we can get into a regime where we
// quickly drain msg_rx and fsync before the sender is able to repopulate msg_rx.
// This happens consistently due to Tokio scheduling, leading to overeager fsyncing.
// Instead, we perform the write without fsyncing and give the sender a chance to
// get scheduled and populate msg_rx for the next iteration. If there are no further
// messages, the next iteration will flush the WAL.
_ = future::ready(()), if self.msg_rx.is_empty() && !append_buf.is_empty() => {
dirty = true;
let buf_req = append_buf.take().expect("empty buffer");
self.tli
.process_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(buf_req))
.await?
}
// While receiving AppendRequests, flush the WAL periodically and respond with an
@@ -579,11 +620,11 @@ impl WalAcceptor {
.await?
}
// If there are no pending messages, flush the WAL immediately.
// If there are no pending messages, flush the WAL and append buffer immediately.
//
// TODO: this should be done via flush_ticker.reset_immediately(), but that's always
// delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
_ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
_ = future::ready(()), if self.msg_rx.is_empty() && dirty => {
dirty = false;
flush_ticker.reset();
self.tli
@@ -627,3 +668,115 @@ impl Drop for WalAcceptor {
}
}
}
/// Buffers WAL data for multiple AppendRequests, to submit them as a single write.
struct BufferedAppendRequest {
/// The buffer capacity.
capacity: usize,
/// The buffered header and WAL data.
buf: Option<(AppendRequestHeader, BytesMut)>,
/// A previous buffer that can be reused when the returned message is dropped.
reuse_buf: Option<Bytes>,
/// If an AppendRequest is larger than the buffer capacity (when empty), just stash it here to
/// avoid growing the buffer and copying it. This will be returned as-is.
large: Option<AppendRequest>,
}
impl BufferedAppendRequest {
/// Creates a new append request buffer with the given capacity.
fn new(capacity: usize) -> Self {
Self {
capacity,
buf: None,
reuse_buf: None,
large: None,
}
}
/// Adds the given append request to the buffer, if possible. Returns `false` if the message
/// can't be buffered, leaving self unmodified. An empty buffer will always accept a message.
///
/// If the buffer is not empty, the message must have the same term and proposer and contiguous
/// `begin_lsn` and `end_lsn`. The buffer must have available capacity for the entire
/// `wal_data`. If the message is greater than an empty buffer's capacity, it is accepted but
/// simply stashed away in `large` without growing the buffer.
pub fn add(&mut self, msg: &AppendRequest) -> bool {
// If there is a stashed large message, reject further messages.
if self.large.is_some() {
return false;
}
// If there is no existing buffer, initialize one with the message.
let Some((ref mut h, ref mut wal_data)) = self.buf else {
// If the message is larger than the buffer capacity, just stash it instead of growing.
if msg.wal_data.len() > self.capacity {
assert!(self.large.is_none());
self.large = Some(msg.clone()); // clone is cheap with Bytes
return true;
}
// Reuse a previous buffer, if any, or allocate a new one.
//
// TODO: try_into_mut() is essentially runtime borrow checking. If AppendRequest used a
// normal Vec<u8> we could do compile-time borrow checking instead and avoid panic.
let mut wal_data = match self.reuse_buf.take() {
Some(reuse_buf) => match reuse_buf.try_into_mut() {
Ok(mut reuse_buf) => {
assert_eq!(reuse_buf.capacity(), self.capacity);
reuse_buf.clear();
reuse_buf
}
Err(_) => panic!("couldn't reuse buffer, still in use"),
},
None => BytesMut::with_capacity(self.capacity),
};
// Copy the append request into the buffer.
wal_data.put_slice(&msg.wal_data);
self.buf = Some((msg.h, wal_data));
return true;
};
// The messages must have the same term and proposer.
if h.term != msg.h.term || h.proposer_uuid != msg.h.proposer_uuid {
return false;
}
// The messages must be contiguous.
if h.end_lsn != msg.h.begin_lsn {
return false;
}
// The message must fit in the buffer.
if wal_data.len() + msg.wal_data.len() > self.capacity {
return false;
}
// Add the message to the buffer, bumping the commit and truncate LSNs. We assume that later
// messages have later commit/truncate LSNs.
h.end_lsn = msg.h.end_lsn;
h.commit_lsn = msg.h.commit_lsn;
h.truncate_lsn = msg.h.truncate_lsn;
wal_data.put_slice(&msg.wal_data);
true
}
/// Returns true if there is no buffered message.
fn is_empty(&self) -> bool {
self.buf.is_none() && self.large.is_none()
}
/// Takes the buffered AppendRequest (if any), leaving a None in its place.
///
/// NB: The returned `wal_data` Bytes must be dropped before the next call to `add()`, in order
/// to reuse the buffer. This is basically runtime borrow checking, because of Bytes.
fn take(&mut self) -> Option<AppendRequest> {
// If there is a stashed large message, return it.
if let Some(large) = self.large.take() {
assert!(self.buf.is_none(), "both buf and large are set");
return Some(large);
}
let (h, wal_data) = self.buf.take()?;
let wal_data = wal_data.freeze();
self.reuse_buf = Some(wal_data.clone()); // keep a reference to the buffer
Some(AppendRequest { h, wal_data })
}
}

View File

@@ -296,12 +296,13 @@ pub struct ProposerElected {
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct AppendRequest {
pub h: AppendRequestHeader,
pub wal_data: Bytes,
}
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Copy, Deserialize)]
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
@@ -1166,7 +1167,7 @@ mod tests {
proposer_uuid: [0; 16],
};
let mut append_request = AppendRequest {
h: ar_hdr.clone(),
h: ar_hdr,
wal_data: Bytes::from_static(b"b"),
};
@@ -1240,7 +1241,7 @@ mod tests {
proposer_uuid: [0; 16],
};
let append_request = AppendRequest {
h: ar_hdr.clone(),
h: ar_hdr,
wal_data: Bytes::from_static(b"b"),
};
@@ -1248,7 +1249,7 @@ mod tests {
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap();
let mut ar_hrd2 = ar_hdr.clone();
let mut ar_hrd2 = ar_hdr;
ar_hrd2.begin_lsn = Lsn(4);
ar_hrd2.end_lsn = Lsn(5);
let append_request = AppendRequest {