diff --git a/Cargo.lock b/Cargo.lock index 64231ed11c..c6ca1738af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -994,9 +994,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.5.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" dependencies = [ "serde", ] diff --git a/safekeeper/benches/receive_wal.rs b/safekeeper/benches/receive_wal.rs index e32d7526ca..9465eaf92c 100644 --- a/safekeeper/benches/receive_wal.rs +++ b/safekeeper/benches/receive_wal.rs @@ -262,7 +262,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) { // Send requests. for req in reqgen { - _ = reply_rx.try_recv(); // discard any replies, to avoid blocking + while reply_rx.try_recv().is_ok() {} // discard replies, to avoid blocking let msg = ProposerAcceptorMessage::AppendRequest(req); msg_tx.send(msg).await.expect("send failed"); } diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index bbd2f86898..af2bc17301 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -217,7 +217,8 @@ pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy = Lazy::new(|| { let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE); buckets.insert(0, 0.0); buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64); - assert!(buckets.len() <= 12, "too many histogram buckets"); + // TODO: tweak this. + assert!(buckets.len() <= 16, "too many histogram buckets"); register_histogram!( "safekeeper_wal_receiver_queue_depth", diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index a0a96c6e99..05521f0791 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -7,14 +7,15 @@ use crate::metrics::{ WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL, WAL_RECEIVER_QUEUE_SIZE_TOTAL, }; -use crate::safekeeper::AcceptorProposerMessage; -use crate::safekeeper::ProposerAcceptorMessage; -use crate::safekeeper::ServerInfo; +use crate::safekeeper::{ + AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, + ServerInfo, +}; use crate::timeline::WalResidentTimeline; use crate::wal_service::ConnectionId; use crate::GlobalTimelines; use anyhow::{anyhow, Context}; -use bytes::BytesMut; +use bytes::{BufMut as _, Bytes, BytesMut}; use parking_lot::MappedMutexGuard; use parking_lot::Mutex; use parking_lot::MutexGuard; @@ -206,7 +207,8 @@ impl Drop for WalReceiverGuard { } } -pub const MSG_QUEUE_SIZE: usize = 256; +// TODO: reconsider this. +pub const MSG_QUEUE_SIZE: usize = 4096; pub const REPLY_QUEUE_SIZE: usize = 16; impl SafekeeperPostgresHandler { @@ -484,6 +486,9 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(1); /// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines. const METRICS_INTERVAL: Duration = Duration::from_secs(5); +/// The AppendRequest buffer size. +const APPEND_BUFFER_SIZE: usize = 1024 * 1024; + /// Encapsulates a task which takes messages from msg_rx, processes and pushes /// replies to reply_tx. /// @@ -530,6 +535,9 @@ impl WalAcceptor { async fn run(&mut self) -> anyhow::Result<()> { let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id); + // Buffer AppendRequests to submit them as a single large write. + let mut append_buf = BufferedAppendRequest::new(APPEND_BUFFER_SIZE); + // Periodically flush the WAL and compute metrics. let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -546,7 +554,7 @@ impl WalAcceptor { // Process inbound message. msg = self.msg_rx.recv() => { // If disconnected, break to flush WAL and return. - let Some(mut msg) = msg else { + let Some(msg) = msg else { break; }; @@ -563,11 +571,44 @@ impl WalAcceptor { // This batches multiple appends per fsync. If the channel is empty after // sending the reply, we'll schedule an immediate flush. if let ProposerAcceptorMessage::AppendRequest(append_request) = msg { - msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); - dirty = true; - } + // Try to batch multiple messages into a single large write. + if !append_buf.is_empty() || !self.msg_rx.is_empty() { + if append_buf.add(&append_request) { + continue; // message buffered, go get next message + } - self.tli.process_msg(&msg).await? + // Full buffer, write it and buffer this message for next iteration. + dirty = true; + let buf_req = append_buf.take().expect("empty buffer"); + let buf_msg = ProposerAcceptorMessage::NoFlushAppendRequest(buf_req); + let reply = self.tli.process_msg(&buf_msg).await?; + drop(buf_msg); // allow reusing buffer for add + assert!(append_buf.add(&append_request), "empty buffer rejected msg"); + reply + } else { + dirty = true; + let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); + self.tli.process_msg(&msg).await? + } + } else { + self.tli.process_msg(&msg).await? + } + } + + // If there are no pending messages, write the append buffer. + // + // NB: we don't also flush the WAL here. Otherwise we can get into a regime where we + // quickly drain msg_rx and fsync before the sender is able to repopulate msg_rx. + // This happens consistently due to Tokio scheduling, leading to overeager fsyncing. + // Instead, we perform the write without fsyncing and give the sender a chance to + // get scheduled and populate msg_rx for the next iteration. If there are no further + // messages, the next iteration will flush the WAL. + _ = future::ready(()), if self.msg_rx.is_empty() && !append_buf.is_empty() => { + dirty = true; + let buf_req = append_buf.take().expect("empty buffer"); + self.tli + .process_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(buf_req)) + .await? } // While receiving AppendRequests, flush the WAL periodically and respond with an @@ -579,11 +620,11 @@ impl WalAcceptor { .await? } - // If there are no pending messages, flush the WAL immediately. + // If there are no pending messages, flush the WAL and append buffer immediately. // // TODO: this should be done via flush_ticker.reset_immediately(), but that's always // delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866. - _ = future::ready(()), if dirty && self.msg_rx.is_empty() => { + _ = future::ready(()), if self.msg_rx.is_empty() && dirty => { dirty = false; flush_ticker.reset(); self.tli @@ -627,3 +668,115 @@ impl Drop for WalAcceptor { } } } + +/// Buffers WAL data for multiple AppendRequests, to submit them as a single write. +struct BufferedAppendRequest { + /// The buffer capacity. + capacity: usize, + /// The buffered header and WAL data. + buf: Option<(AppendRequestHeader, BytesMut)>, + /// A previous buffer that can be reused when the returned message is dropped. + reuse_buf: Option, + /// If an AppendRequest is larger than the buffer capacity (when empty), just stash it here to + /// avoid growing the buffer and copying it. This will be returned as-is. + large: Option, +} + +impl BufferedAppendRequest { + /// Creates a new append request buffer with the given capacity. + fn new(capacity: usize) -> Self { + Self { + capacity, + buf: None, + reuse_buf: None, + large: None, + } + } + + /// Adds the given append request to the buffer, if possible. Returns `false` if the message + /// can't be buffered, leaving self unmodified. An empty buffer will always accept a message. + /// + /// If the buffer is not empty, the message must have the same term and proposer and contiguous + /// `begin_lsn` and `end_lsn`. The buffer must have available capacity for the entire + /// `wal_data`. If the message is greater than an empty buffer's capacity, it is accepted but + /// simply stashed away in `large` without growing the buffer. + pub fn add(&mut self, msg: &AppendRequest) -> bool { + // If there is a stashed large message, reject further messages. + if self.large.is_some() { + return false; + } + + // If there is no existing buffer, initialize one with the message. + let Some((ref mut h, ref mut wal_data)) = self.buf else { + // If the message is larger than the buffer capacity, just stash it instead of growing. + if msg.wal_data.len() > self.capacity { + assert!(self.large.is_none()); + self.large = Some(msg.clone()); // clone is cheap with Bytes + return true; + } + + // Reuse a previous buffer, if any, or allocate a new one. + // + // TODO: try_into_mut() is essentially runtime borrow checking. If AppendRequest used a + // normal Vec we could do compile-time borrow checking instead and avoid panic. + let mut wal_data = match self.reuse_buf.take() { + Some(reuse_buf) => match reuse_buf.try_into_mut() { + Ok(mut reuse_buf) => { + assert_eq!(reuse_buf.capacity(), self.capacity); + reuse_buf.clear(); + reuse_buf + } + Err(_) => panic!("couldn't reuse buffer, still in use"), + }, + None => BytesMut::with_capacity(self.capacity), + }; + // Copy the append request into the buffer. + wal_data.put_slice(&msg.wal_data); + self.buf = Some((msg.h, wal_data)); + return true; + }; + + // The messages must have the same term and proposer. + if h.term != msg.h.term || h.proposer_uuid != msg.h.proposer_uuid { + return false; + } + // The messages must be contiguous. + if h.end_lsn != msg.h.begin_lsn { + return false; + } + // The message must fit in the buffer. + if wal_data.len() + msg.wal_data.len() > self.capacity { + return false; + } + + // Add the message to the buffer, bumping the commit and truncate LSNs. We assume that later + // messages have later commit/truncate LSNs. + h.end_lsn = msg.h.end_lsn; + h.commit_lsn = msg.h.commit_lsn; + h.truncate_lsn = msg.h.truncate_lsn; + wal_data.put_slice(&msg.wal_data); + true + } + + /// Returns true if there is no buffered message. + fn is_empty(&self) -> bool { + self.buf.is_none() && self.large.is_none() + } + + /// Takes the buffered AppendRequest (if any), leaving a None in its place. + /// + /// NB: The returned `wal_data` Bytes must be dropped before the next call to `add()`, in order + /// to reuse the buffer. This is basically runtime borrow checking, because of Bytes. + fn take(&mut self) -> Option { + // If there is a stashed large message, return it. + if let Some(large) = self.large.take() { + assert!(self.buf.is_none(), "both buf and large are set"); + return Some(large); + } + + let (h, wal_data) = self.buf.take()?; + let wal_data = wal_data.freeze(); + self.reuse_buf = Some(wal_data.clone()); // keep a reference to the buffer + Some(AppendRequest { h, wal_data }) + } +} diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index f4983d44d0..ba22c5bbcb 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -296,12 +296,13 @@ pub struct ProposerElected { /// Request with WAL message sent from proposer to safekeeper. Along the way it /// communicates commit_lsn. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct AppendRequest { pub h: AppendRequestHeader, pub wal_data: Bytes, } -#[derive(Debug, Clone, Deserialize)] + +#[derive(Debug, Clone, Copy, Deserialize)] pub struct AppendRequestHeader { // safekeeper's current term; if it is higher than proposer's, the compute is out of date. pub term: Term, @@ -1166,7 +1167,7 @@ mod tests { proposer_uuid: [0; 16], }; let mut append_request = AppendRequest { - h: ar_hdr.clone(), + h: ar_hdr, wal_data: Bytes::from_static(b"b"), }; @@ -1240,7 +1241,7 @@ mod tests { proposer_uuid: [0; 16], }; let append_request = AppendRequest { - h: ar_hdr.clone(), + h: ar_hdr, wal_data: Bytes::from_static(b"b"), }; @@ -1248,7 +1249,7 @@ mod tests { sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) .await .unwrap(); - let mut ar_hrd2 = ar_hdr.clone(); + let mut ar_hrd2 = ar_hdr; ar_hrd2.begin_lsn = Lsn(4); ar_hrd2.end_lsn = Lsn(5); let append_request = AppendRequest {