Implement backpressure for compute node to avoid WAL overflow

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
Co-authored-by: Alexey Kondratov <kondratov.aleksey@gmail.com>
Konstantin Knizhnik
2021-10-21 18:15:50 +03:00
committed by GitHub
parent ff563ff080
commit c310932121
10 changed files with 166 additions and 71 deletions

View File

@@ -300,7 +300,7 @@ impl PostgresNode {
         conf.append("shared_buffers", "1MB");
         conf.append("fsync", "off");
         conf.append("max_connections", "100");
-        conf.append("wal_sender_timeout", "0");
+        conf.append("wal_sender_timeout", "10s");
         conf.append("wal_level", "replica");
         conf.append("listen_addresses", &self.address.ip().to_string());
         conf.append("port", &self.address.port().to_string());

View File

@@ -695,8 +695,8 @@ impl Timeline for LayeredTimeline {
             .wait_for_timeout(lsn, TIMEOUT)
             .with_context(|| {
                 format!(
-                    "Timed out while waiting for WAL record at LSN {} to arrive",
-                    lsn
+                    "Timed out while waiting for WAL record at LSN {} to arrive, disk consistent LSN={}",
+                    lsn, self.get_disk_consistent_lsn()
                 )
             })?;
@@ -910,6 +910,10 @@ impl Timeline for LayeredTimeline {
         Ok(total_blocks * BLCKSZ as usize)
     }
 
+    fn get_disk_consistent_lsn(&self) -> Lsn {
+        self.disk_consistent_lsn.load()
+    }
+
     fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
         Box::new(LayeredTimelineWriter {
             tl: self,
View File

@@ -134,6 +134,7 @@ pub trait Timeline: Send + Sync {
     fn get_last_record_lsn(&self) -> Lsn;
     fn get_prev_record_lsn(&self) -> Lsn;
     fn get_start_lsn(&self) -> Lsn;
+    fn get_disk_consistent_lsn(&self) -> Lsn;
 
     /// Mutate the timeline with a [`TimelineWriter`].
     fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a>;

View File

@@ -284,12 +284,14 @@ fn walreceiver_main(
         if let Some(last_lsn) = status_update {
             // TODO: More thought should go into what values are sent here.
             let last_lsn = PgLsn::from(u64::from(last_lsn));
-            let write_lsn = last_lsn;
+            // We are using the disk consistent LSN as `write_lsn`, i.e. the LSN at which the
+            // page server guarantees persistence of all received data. The safekeeper is now
+            // free to remove WAL preceding `write_lsn`: it will not be requested by this page server.
+            let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn()));
             let flush_lsn = last_lsn;
             let apply_lsn = PgLsn::from(0);
             let ts = SystemTime::now();
             const NO_REPLY: u8 = 0;
             physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
         }
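The net effect on the feedback message: `flush_lsn` keeps reporting the last WAL position received from the safekeeper, while `write_lsn` now lags behind at the durable position. A sketch of these semantics with illustrative names (not the walreceiver's actual API):

```rust
// Illustrative names only; not the walreceiver's actual types.
#[allow(dead_code)]
struct StatusUpdate {
    write_lsn: u64, // pageserver's disk_consistent_lsn: WAL below this may be trimmed
    flush_lsn: u64, // last LSN received and processed from the safekeeper
    apply_lsn: u64, // unused, always 0
}

fn make_status_update(last_received_lsn: u64, disk_consistent_lsn: u64) -> StatusUpdate {
    StatusUpdate {
        write_lsn: disk_consistent_lsn,
        flush_lsn: last_received_lsn,
        apply_lsn: 0,
    }
}

fn main() {
    // The pageserver has received WAL up to 0x2000000 but only persisted up to 0x1F00000.
    let upd = make_status_update(0x200_0000, 0x1F0_0000);
    // The safekeeper may only garbage-collect WAL below write_lsn.
    assert!(upd.write_lsn <= upd.flush_lsn);
}
```

Everything between the two LSNs may still be re-requested after a pageserver restart, which is exactly why the safekeeper must retain it.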

View File

@@ -2,3 +2,4 @@
 minversion = 6.0
 log_format = %(asctime)s.%(msecs)-3d %(levelname)s [%(filename)s:%(lineno)d] %(message)s
 log_date_format = %Y-%m-%d %H:%M:%S
+log_cli = true

View File

@@ -2,7 +2,7 @@
 //! with the "START_REPLICATION" message.
 
 use crate::send_wal::SendWalHandler;
-use crate::timeline::{Timeline, TimelineTools};
+use crate::timeline::{ReplicaState, Timeline, TimelineTools};
 use anyhow::{anyhow, Context, Result};
 use bytes::Bytes;
 use log::*;
@@ -20,7 +20,7 @@ use std::{str, thread};
 use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::Lsn;
 use zenith_utils::postgres_backend::PostgresBackend;
-use zenith_utils::pq_proto::{BeMessage, FeMessage, XLogDataBody};
+use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody};
 use zenith_utils::sock_split::ReadStream;
 
 pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
@@ -32,7 +32,7 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
 type FullTransactionId = u64;
 
 /// Hot standby feedback received from replica
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
 pub struct HotStandbyFeedback {
     pub ts: TimestampTz,
     pub xmin: FullTransactionId,
@@ -49,6 +49,16 @@ impl HotStandbyFeedback {
     }
 }
 
+/// Standby status update
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StandbyReply {
+    pub write_lsn: Lsn, // disk consistent LSN
+    pub flush_lsn: Lsn, // LSN committed by quorum
+    pub apply_lsn: Lsn, // not used
+    pub reply_ts: TimestampTz,
+    pub reply_requested: bool,
+}
+
 /// A network connection that's speaking the replication protocol.
 pub struct ReplicationConn {
     /// This is an `Option` because we will spawn a background thread that will
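`StandbyReply::des` is handed `m[1..]`, i.e. the payload after the `b'r'` tag byte. A hand-rolled decoder equivalent, assuming `BeSer` writes the fields big-endian in declaration order with `bool` as a single byte, which matches PostgreSQL's Standby Status Update layout:

```rust
use std::convert::TryInto;

// Sketch of what StandbyReply::des(&m[1..]) decodes, assuming big-endian
// field order: write_lsn, flush_lsn, apply_lsn (u64), reply_ts (i64), flag (u8).
fn parse_standby_reply(m: &[u8]) -> Option<(u64, u64, u64, i64, bool)> {
    if m.len() < 3 * 8 + 8 + 1 {
        return None;
    }
    let write_lsn = u64::from_be_bytes(m[0..8].try_into().ok()?);
    let flush_lsn = u64::from_be_bytes(m[8..16].try_into().ok()?);
    let apply_lsn = u64::from_be_bytes(m[16..24].try_into().ok()?);
    let reply_ts = i64::from_be_bytes(m[24..32].try_into().ok()?);
    let reply_requested = m[32] != 0;
    Some((write_lsn, flush_lsn, apply_lsn, reply_ts, reply_requested))
}

fn main() {
    let mut m = Vec::new();
    m.extend_from_slice(&100u64.to_be_bytes()); // write_lsn
    m.extend_from_slice(&200u64.to_be_bytes()); // flush_lsn
    m.extend_from_slice(&0u64.to_be_bytes());   // apply_lsn
    m.extend_from_slice(&0i64.to_be_bytes());   // reply_ts
    m.push(0);                                  // reply_requested = false
    assert_eq!(parse_standby_reply(&m), Some((100, 200, 0, 0, false)));
}
```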
@@ -56,16 +66,15 @@ pub struct ReplicationConn {
     stream_in: Option<ReadStream>,
 }
 
-// TODO: move this to crate::timeline when there's more users
-// TODO: design a proper Timeline mock api
-trait HsFeedbackSubscriber {
-    fn add_hs_feedback(&self, _feedback: HotStandbyFeedback) {}
+/// Scope guard to unregister replication connection from timeline
+struct ReplicationConnGuard {
+    replica: usize, // replica internal ID assigned by timeline
+    timeline: Arc<Timeline>,
 }
 
-impl HsFeedbackSubscriber for Arc<Timeline> {
-    #[inline(always)]
-    fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
-        Timeline::add_hs_feedback(self, feedback);
+impl Drop for ReplicationConnGuard {
+    fn drop(&mut self) {
+        self.timeline.update_replica_state(self.replica, None);
     }
 }
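The guard replaces the old `HsFeedbackSubscriber` trait so that a replica slot is freed on every exit path of the feedback thread, including early `?` returns and panics. A self-contained miniature of the pattern (simplified types, not the safekeeper's structs):

```rust
use std::sync::{Arc, Mutex};

// Miniature registry standing in for the timeline's replica vector.
struct Registry {
    replicas: Mutex<Vec<Option<&'static str>>>,
}

struct Guard {
    id: usize,
    registry: Arc<Registry>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Runs on every exit path of the owning thread, so a dead
        // replica never lingers in the vector.
        self.registry.replicas.lock().unwrap()[self.id] = None;
    }
}

fn main() {
    let registry = Arc::new(Registry {
        replicas: Mutex::new(vec![Some("replica-0")]),
    });
    {
        let _guard = Guard { id: 0, registry: registry.clone() };
        // ... feedback loop would run here ...
    } // guard dropped: slot 0 cleared
    assert!(registry.replicas.lock().unwrap()[0].is_none());
}
```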
@@ -79,26 +88,33 @@ impl ReplicationConn {
     /// Handle incoming messages from the network.
     /// This is spawned into the background by `handle_start_replication`.
-    fn background_thread(
-        mut stream_in: impl Read,
-        subscriber: impl HsFeedbackSubscriber,
-    ) -> Result<()> {
+    fn background_thread(mut stream_in: impl Read, timeline: Arc<Timeline>) -> Result<()> {
+        let mut state = ReplicaState::new();
+        let replica = timeline.add_replica(state);
+        let _guard = ReplicationConnGuard {
+            replica,
+            timeline: timeline.clone(),
+        };
         // Wait for replica's feedback.
         while let Some(msg) = FeMessage::read(&mut stream_in)? {
             match &msg {
                 FeMessage::CopyData(m) => {
                     // There are two possible data messages that the client is supposed to send here:
-                    // `HotStandbyFeedback` and `StandbyStatusUpdate`. We only handle hot standby
-                    // feedback.
+                    // `HotStandbyFeedback` and `StandbyStatusUpdate`.
                     match m.first().cloned() {
                         Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
                             // Note: deserializing is on m[1..] because we skip the tag byte.
-                            let feedback = HotStandbyFeedback::des(&m[1..])
+                            state.hs_feedback = HotStandbyFeedback::des(&m[1..])
                                 .context("failed to deserialize HotStandbyFeedback")?;
-                            subscriber.add_hs_feedback(feedback);
+                            timeline.update_replica_state(replica, Some(state));
                         }
+                        Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
+                            let reply = StandbyReply::des(&m[1..])
+                                .context("failed to deserialize StandbyReply")?;
+                            state.disk_consistent_lsn = reply.write_lsn;
+                            timeline.update_replica_state(replica, Some(state));
+                        }
-                        Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => (),
                         _ => warn!("unexpected message {:?}", msg),
                     }
                 }
@@ -187,7 +203,7 @@ impl ReplicationConn {
         // switch to copy
         pgb.write_message(&BeMessage::CopyBothResponse)?;
 
-        let mut end_pos: Lsn;
+        let mut end_pos = Lsn(0);
         let mut wal_file: Option<File> = None;
 
         loop {
@@ -202,7 +218,18 @@ impl ReplicationConn {
             } else {
                 /* normal mode */
                 let timeline = swh.timeline.get();
-                end_pos = timeline.wait_for_lsn(start_pos);
+                if let Some(lsn) = timeline.wait_for_lsn(start_pos) {
+                    end_pos = lsn
+                } else {
+                    // timeout expired: request pageserver status
+                    pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
+                        sent_ptr: end_pos.0,
+                        timestamp: get_current_timestamp(),
+                        request_reply: true,
+                    }))
+                    .context("Failed to send KeepAlive message")?;
+                    continue;
+                }
             }
             if end_pos == END_REPLICATION_MARKER {
                 break;
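Putting the loop's two branches side by side: within the timeout the sender streams WAL as before; on timeout it pings the pageserver, which answers with a `StandbyReply` carrying its `disk_consistent_lsn`. A toy model of that decision (illustrative types, not the safekeeper's real ones):

```rust
// Toy model of the sender's per-iteration decision.
#[allow(dead_code)]
enum SendAction {
    XLogData { start: u64, end: u64 },
    KeepAliveRequestingReply { sent_ptr: u64 },
}

fn next_action(wait_result: Option<u64>, start_pos: u64, last_sent: u64) -> SendAction {
    match wait_result {
        // New WAL was committed within POLL_STATE_TIMEOUT: stream it.
        Some(end_pos) => SendAction::XLogData { start: start_pos, end: end_pos },
        // Timeout: ping the pageserver so it reports disk_consistent_lsn back.
        None => SendAction::KeepAliveRequestingReply { sent_ptr: last_sent },
    }
}

fn main() {
    // wait_for_lsn timed out: the sender pings instead of blocking forever.
    match next_action(None, 0x1000, 0x2000) {
        SendAction::KeepAliveRequestingReply { sent_ptr } => assert_eq!(sent_ptr, 0x2000),
        _ => unreachable!(),
    }
}
```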
@@ -257,18 +284,3 @@ impl ReplicationConn {
         Ok(())
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    // A no-op impl for tests
-    impl HsFeedbackSubscriber for () {}
-
-    #[test]
-    fn test_replication_conn_background_thread_eof() {
-        // Test that background_thread recognizes EOF
-        let stream: &[u8] = &[];
-        ReplicationConn::background_thread(stream, ()).unwrap();
-    }
-}

View File

@@ -191,6 +191,8 @@ pub struct AppendResponse {
     // We report back our awareness about which WAL is committed, as this is
     // a criterion for walproposer --sync mode exit
     pub commit_lsn: Lsn,
+    // Min disk consistent LSN across pageservers (WAL already applied and persisted to disk by all of them)
+    pub disk_consistent_lsn: Lsn,
     pub hs_feedback: HotStandbyFeedback,
 }
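This field is what ultimately produces the backpressure of the commit title: the walproposer in the compute node sees `disk_consistent_lsn` in every `AppendResponse` and can stall WAL generation when pageservers fall too far behind. The sketch below is illustrative only; the real check lives in the compute's walproposer, and the function name and lag limit are hypothetical:

```rust
use std::time::Duration;

// Hypothetical lag limit; the actual threshold is not part of this diff.
const MAX_REPLICATION_WRITE_LAG: u64 = 512 * 1024 * 1024; // 512 MB, assumed

fn maybe_throttle(current_insert_lsn: u64, disk_consistent_lsn: u64) {
    // Gap between WAL generated by compute and WAL persisted by the pageserver.
    let lag = current_insert_lsn.saturating_sub(disk_consistent_lsn);
    if lag > MAX_REPLICATION_WRITE_LAG {
        // Stall the backend briefly instead of overflowing safekeeper WAL storage.
        std::thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    maybe_throttle(0x4000_0000, 0x1000_0000); // lag is ~768 MB: sleeps once
}
```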
@@ -458,6 +460,7 @@ where
                 epoch: self.s.acceptor_state.epoch,
                 commit_lsn: Lsn(0),
                 flush_lsn: Lsn(0),
+                disk_consistent_lsn: Lsn(0),
                 hs_feedback: HotStandbyFeedback::empty(),
             };
             return Ok(AcceptorProposerMessage::AppendResponse(resp));
@@ -567,6 +570,7 @@ where
             epoch: self.s.acceptor_state.epoch,
             flush_lsn: self.flush_lsn,
             commit_lsn: self.s.commit_lsn,
+            disk_consistent_lsn: Lsn(0),
             // will be filled by caller code to avoid bothering safekeeper
             hs_feedback: HotStandbyFeedback::empty(),
         };

View File

@@ -11,9 +11,9 @@ use std::collections::HashMap;
 use std::fs::{self, File, OpenOptions};
 use std::io::{Seek, SeekFrom, Write};
 use std::sync::{Arc, Condvar, Mutex};
+use std::time::Duration;
 use zenith_utils::bin_ser::LeSer;
 use zenith_utils::lsn::Lsn;
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER};
@@ -25,6 +25,35 @@ use crate::WalAcceptorConf;
 use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
 
 const CONTROL_FILE_NAME: &str = "safekeeper.control";
+const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
+
+/// Replica status: hot standby feedback + disk consistent LSN
+#[derive(Debug, Clone, Copy)]
+pub struct ReplicaState {
+    /// combined disk_consistent_lsn of pageservers
+    pub disk_consistent_lsn: Lsn,
+    /// combined hot standby feedback from all replicas
+    pub hs_feedback: HotStandbyFeedback,
+}
+
+impl Default for ReplicaState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ReplicaState {
+    pub fn new() -> ReplicaState {
+        ReplicaState {
+            disk_consistent_lsn: Lsn(u64::MAX),
+            hs_feedback: HotStandbyFeedback {
+                ts: 0,
+                xmin: u64::MAX,
+                catalog_xmin: u64::MAX,
+            },
+        }
+    }
+}
 
 /// Shared state associated with database instance (tenant)
 struct SharedState {
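The `u64::MAX` sentinels in `ReplicaState::new()` are not arbitrary: they are the identity elements of the min-fold in `get_replicas_state()` below, so an empty replica set imposes no xmin horizon and no WAL-retention constraint. A tiny demonstration with plain integers:

```rust
// min-fold starting from its identity element, as get_replicas_state() does.
fn combine(replicas: &[u64]) -> u64 {
    let mut acc = u64::MAX; // identity for min
    for &x in replicas {
        acc = acc.min(x);
    }
    acc
}

fn main() {
    assert_eq!(combine(&[]), u64::MAX);     // no replicas: no constraint
    assert_eq!(combine(&[42, 17, 99]), 17); // otherwise: the most conservative value
}
```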
@@ -33,8 +62,8 @@ struct SharedState {
     /// For receiving-sending wal cooperation
     /// quorum commit LSN we've notified walsenders about
     notified_commit_lsn: Lsn,
-    /// combined hot standby feedback from all replicas
-    hs_feedback: HotStandbyFeedback,
+    /// State of replicas
+    replicas: Vec<Option<ReplicaState>>,
 }
// A named boolean.
@@ -45,6 +74,31 @@ pub enum CreateControlFile {
 }
 
 impl SharedState {
+    /// Get combined state of all alive replicas
+    pub fn get_replicas_state(&self) -> ReplicaState {
+        let mut acc = ReplicaState::new();
+        for state in self.replicas.iter().flatten() {
+            acc.hs_feedback.ts = max(acc.hs_feedback.ts, state.hs_feedback.ts);
+            acc.hs_feedback.xmin = min(acc.hs_feedback.xmin, state.hs_feedback.xmin);
+            acc.hs_feedback.catalog_xmin =
+                min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin);
+            acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn);
+        }
+        acc
+    }
+
+    /// Assign a new replica ID. We choose the first empty slot in the replicas
+    /// vector, or extend the vector if there are no free slots.
+    pub fn add_replica(&mut self, state: ReplicaState) -> usize {
+        if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) {
+            self.replicas[pos] = Some(state);
+            return pos;
+        }
+        let pos = self.replicas.len();
+        self.replicas.push(Some(state));
+        pos
+    }
+
     /// Restore SharedState from control file. Locks the control file along the
     /// way to prevent running more than one instance of safekeeper on the same
     /// data dir.
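Reusing the first empty slot keeps replica IDs stable for the lifetime of a connection and stops the vector from growing across reconnects. A standalone illustration of the policy (element type simplified to `u64`):

```rust
// Same slot-reuse policy as SharedState::add_replica, with u64 for ReplicaState.
fn add_replica(replicas: &mut Vec<Option<u64>>, state: u64) -> usize {
    if let Some(pos) = replicas.iter().position(|r| r.is_none()) {
        replicas[pos] = Some(state);
        return pos;
    }
    replicas.push(Some(state));
    replicas.len() - 1
}

fn main() {
    let mut replicas = Vec::new();
    assert_eq!(add_replica(&mut replicas, 100), 0);
    assert_eq!(add_replica(&mut replicas, 200), 1);
    replicas[0] = None; // first replica disconnects (guard drop sets None)
    assert_eq!(add_replica(&mut replicas, 300), 0); // its slot is reused
}
```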
@@ -74,21 +128,10 @@ impl SharedState {
         Ok(Self {
             notified_commit_lsn: Lsn(0),
             sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
-            hs_feedback: HotStandbyFeedback {
-                ts: 0,
-                xmin: u64::MAX,
-                catalog_xmin: u64::MAX,
-            },
+            replicas: Vec::new(),
         })
     }
 
-    /// Accumulate hot standby feedbacks from replicas
-    pub fn add_hs_feedback(&mut self, feedback: HotStandbyFeedback) {
-        self.hs_feedback.xmin = min(self.hs_feedback.xmin, feedback.xmin);
-        self.hs_feedback.catalog_xmin = min(self.hs_feedback.catalog_xmin, feedback.catalog_xmin);
-        self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts);
-    }
-
     /// Fetch and lock control file (prevent running more than one instance of safekeeper)
     /// If create=false and file doesn't exist, bails out.
     fn load_control_file(
@@ -178,20 +221,27 @@ impl Timeline {
         }
     }
 
-    /// Wait for an LSN to be committed.
+    /// Timed wait for an LSN to be committed.
     ///
     /// Returns the last committed LSN, which will be at least
-    /// as high as the LSN waited for.
+    /// as high as the LSN waited for, or None if the timeout expired.
     ///
-    pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn {
+    pub fn wait_for_lsn(&self, lsn: Lsn) -> Option<Lsn> {
         let mut shared_state = self.mutex.lock().unwrap();
         loop {
             let commit_lsn = shared_state.notified_commit_lsn;
             // This must be `>`, not `>=`.
             if commit_lsn > lsn {
-                return commit_lsn;
+                return Some(commit_lsn);
             }
-            shared_state = self.cond.wait(shared_state).unwrap();
+            let result = self
+                .cond
+                .wait_timeout(shared_state, POLL_STATE_TIMEOUT)
+                .unwrap();
+            if result.1.timed_out() {
+                return None;
+            }
+            shared_state = result.0;
         }
     }
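The loop re-checks the predicate after every wakeup because `Condvar::wait_timeout`, like `wait`, can return spuriously. A self-contained version of the same timed-wait pattern, with `u64` standing in for `Lsn`:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Timed wait: Some(committed) once the value exceeds `lsn`, None on timeout.
fn wait_for(cond: &Condvar, cell: &Mutex<u64>, lsn: u64, timeout: Duration) -> Option<u64> {
    let mut committed = cell.lock().unwrap();
    loop {
        if *committed > lsn {
            return Some(*committed);
        }
        // wait_timeout can also wake spuriously, hence the re-check in the loop.
        let (guard, res) = cond.wait_timeout(committed, timeout).unwrap();
        if res.timed_out() {
            return None;
        }
        committed = guard;
    }
}

fn main() {
    let pair = Arc::new((Mutex::new(0u64), Condvar::new()));
    let p2 = pair.clone();
    thread::spawn(move || {
        *p2.0.lock().unwrap() = 42; // "commit" new LSN
        p2.1.notify_all();
    });
    assert_eq!(wait_for(&pair.1, &pair.0, 10, Duration::from_secs(5)), Some(42));
}
```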
@@ -219,9 +269,11 @@ impl Timeline {
             // commit_lsn if we are catching up safekeeper.
             commit_lsn = shared_state.sk.commit_lsn;
-            // if this is AppendResponse, fill in proper hot standby feedback
+            // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
             if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {
-                resp.hs_feedback = shared_state.hs_feedback.clone();
+                let state = shared_state.get_replicas_state();
+                resp.hs_feedback = state.hs_feedback;
+                resp.disk_consistent_lsn = state.disk_consistent_lsn;
             }
         }
 
         // Ping wal sender that new data might be available.
@@ -233,15 +285,14 @@ impl Timeline {
         self.mutex.lock().unwrap().sk.s.clone()
     }
 
-    // Accumulate hot standby feedbacks from replicas
-    pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
+    pub fn add_replica(&self, state: ReplicaState) -> usize {
         let mut shared_state = self.mutex.lock().unwrap();
-        shared_state.add_hs_feedback(feedback);
+        shared_state.add_replica(state)
     }
 
-    pub fn get_hs_feedback(&self) -> HotStandbyFeedback {
-        let shared_state = self.mutex.lock().unwrap();
-        shared_state.hs_feedback.clone()
+    pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {
+        let mut shared_state = self.mutex.lock().unwrap();
+        shared_state.replicas[id] = state;
     }
 
     pub fn get_end_of_wal(&self) -> (Lsn, u32) {

View File

@@ -358,6 +358,7 @@ pub enum BeMessage<'a> {
     RowDescription(&'a [RowDescriptor<'a>]),
     XLogData(XLogDataBody<'a>),
     NoticeResponse(String),
+    KeepAlive(WalSndKeepAlive),
 }
 
 // One row description in RowDescription packet.
@@ -409,6 +410,13 @@ pub struct XLogDataBody<'a> {
     pub data: &'a [u8],
 }
 
+#[derive(Debug)]
+pub struct WalSndKeepAlive {
+    pub sent_ptr: u64,
+    pub timestamp: i64,
+    pub request_reply: bool,
+}
+
 pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
 
 // single text column
@@ -721,6 +729,18 @@ impl<'a> BeMessage<'a> {
                 })
                 .unwrap();
             }
+
+            BeMessage::KeepAlive(req) => {
+                buf.put_u8(b'd');
+                write_body(buf, |buf| {
+                    buf.put_u8(b'k');
+                    buf.put_u64(req.sent_ptr);
+                    buf.put_i64(req.timestamp);
+                    buf.put_u8(if req.request_reply { 1u8 } else { 0u8 });
+                    Ok::<_, io::Error>(())
+                })
+                .unwrap();
+            }
         }
 
         Ok(())
     }
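On the wire this is a CopyData packet whose body starts with `b'k'`, followed by a big-endian u64, i64, and a one-byte flag (`BufMut::put_u64`/`put_i64` write big-endian), matching the keepalive that PostgreSQL walsenders emit. A sketch of the client-side decode, assuming the outer CopyData framing has already been stripped:

```rust
use std::convert::TryInto;

// Decode the 'k' keepalive body written by BeMessage::KeepAlive above.
fn parse_keepalive(body: &[u8]) -> Option<(u64, i64, bool)> {
    match body {
        [b'k', rest @ ..] if rest.len() == 17 => {
            let sent_ptr = u64::from_be_bytes(rest[0..8].try_into().ok()?);
            let timestamp = i64::from_be_bytes(rest[8..16].try_into().ok()?);
            let request_reply = rest[16] != 0;
            Some((sent_ptr, timestamp, request_reply))
        }
        _ => None,
    }
}

fn main() {
    let mut body = vec![b'k'];
    body.extend_from_slice(&0x1234u64.to_be_bytes());
    body.extend_from_slice(&0i64.to_be_bytes());
    body.push(1); // request_reply = true: the receiver should answer with a StandbyReply
    assert_eq!(parse_keepalive(&body), Some((0x1234, 0, true)));
}
```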