Forward all backpressure feedback to compute (#7079)

Previously, each safekeeper aggregated ps_feedback in memory and sent the
aggregate to walproposer with every AppendResponse. This PR changes that:
ps_feedback is now forwarded to walproposer right after it is received from
the pageserver, without aggregating it in memory. It also contains some
preparation for implementing backpressure support for sharding.
Arthur Petukhovsky
2024-03-12 13:14:02 +01:00
committed by GitHub
parent 09699d4bd8
commit 580e136b2e
6 changed files with 172 additions and 107 deletions
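Editor's note (not part of the commit): the new data flow is that WalSenders::record_ps_feedback no longer folds feedback into an in-memory aggregate; it broadcasts every PageserverFeedback through the timeline's WalReceivers, and the compute-facing write loop injects the most recent feedback into a clone of the last AppendResponse. Below is a minimal, self-contained sketch of that forwarding pattern, assuming a plain tokio broadcast channel; the Feedback struct and task layout are illustrative only, and the capacity of 8 simply mirrors DEFAULT_FEEDBACK_CAPACITY from the diff.

use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct Feedback {
    last_received_lsn: u64,
}

#[tokio::main]
async fn main() {
    // Small capacity is fine: only the latest feedback matters for backpressure,
    // so a slow subscriber losing stale values is acceptable.
    let (tx, mut rx) = broadcast::channel::<Feedback>(8);

    // "Pageserver side": forward each feedback immediately, no aggregation.
    let producer = tokio::spawn(async move {
        for lsn in [10, 20, 30] {
            // Err only means nobody is subscribed right now; the value is dropped.
            let _ = tx.send(Feedback { last_received_lsn: lsn });
        }
    });

    // "Compute side": push every received feedback to the walproposer right away.
    while let Ok(feedback) = rx.recv().await {
        println!("forwarding feedback, last_received_lsn = {}", feedback.last_received_lsn);
    }

    producer.await.unwrap();
}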


@@ -123,6 +123,12 @@ impl PageserverFeedback {
rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
}
}
b"shard_number" => {
let len = buf.get_i32();
// TODO: this will be implemented in the next update,
// for now, we just skip the value.
buf.advance(len as usize);
}
_ => {
let len = buf.get_i32();
warn!(


@@ -140,6 +140,13 @@ pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
});
pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_received_ps_feedbacks_total",
"Number of pageserver feedbacks received"
)
.expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
});
pub const LABEL_UNKNOWN: &str = "unknown";
@@ -301,7 +308,8 @@ pub async fn time_io_closure<E: Into<anyhow::Error>>(
#[derive(Clone)]
pub struct FullTimelineInfo {
pub ttid: TenantTimelineId,
- pub ps_feedback: PageserverFeedback,
+ pub ps_feedback_count: u64,
+ pub last_ps_feedback: PageserverFeedback,
pub wal_backup_active: bool,
pub timeline_is_active: bool,
pub num_computes: u32,
@@ -327,6 +335,7 @@ pub struct TimelineCollector {
remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
ps_feedback_count: GenericGaugeVec<AtomicU64>,
timeline_active: GenericGaugeVec<AtomicU64>,
wal_backup_active: GenericGaugeVec<AtomicU64>,
connected_computes: IntGaugeVec,
@@ -430,6 +439,15 @@ impl TimelineCollector {
.unwrap();
descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
let ps_feedback_count = GenericGaugeVec::new(
Opts::new(
"safekeeper_ps_feedback_count_total",
"Number of feedbacks received from the pageserver",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
let timeline_active = GenericGaugeVec::new(
Opts::new(
"safekeeper_timeline_active",
@@ -538,6 +556,7 @@ impl TimelineCollector {
remote_consistent_lsn,
ps_last_received_lsn,
feedback_last_time_seconds,
ps_feedback_count,
timeline_active,
wal_backup_active,
connected_computes,
@@ -570,6 +589,7 @@ impl Collector for TimelineCollector {
self.remote_consistent_lsn.reset();
self.ps_last_received_lsn.reset();
self.feedback_last_time_seconds.reset();
self.ps_feedback_count.reset();
self.timeline_active.reset();
self.wal_backup_active.reset();
self.connected_computes.reset();
@@ -646,9 +666,12 @@ impl Collector for TimelineCollector {
self.ps_last_received_lsn
.with_label_values(labels)
- .set(tli.ps_feedback.last_received_lsn.0);
+ .set(tli.last_ps_feedback.last_received_lsn.0);
self.ps_feedback_count
.with_label_values(labels)
.set(tli.ps_feedback_count);
if let Ok(unix_time) = tli
- .ps_feedback
+ .last_ps_feedback
.replytime
.duration_since(SystemTime::UNIX_EPOCH)
{
@@ -679,6 +702,7 @@ impl Collector for TimelineCollector {
mfs.extend(self.remote_consistent_lsn.collect());
mfs.extend(self.ps_last_received_lsn.collect());
mfs.extend(self.feedback_last_time_seconds.collect());
mfs.extend(self.ps_feedback_count.collect());
mfs.extend(self.timeline_active.collect());
mfs.extend(self.wal_backup_active.collect());
mfs.extend(self.connected_computes.collect());


@@ -36,11 +36,15 @@ use tokio::time::Instant;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
/// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
/// in Arc).
pub struct WalReceivers {
mutex: Mutex<WalReceiversShared>,
pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
}
/// Id under which walreceiver is registered in shmem.
@@ -48,8 +52,12 @@ type WalReceiverId = usize;
impl WalReceivers {
pub fn new() -> Arc<WalReceivers> {
let (pageserver_feedback_tx, _) =
tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
Arc::new(WalReceivers {
mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
pageserver_feedback_tx,
})
}
@@ -116,6 +124,12 @@ impl WalReceivers {
let mut shared = self.mutex.lock();
shared.slots[id] = None;
}
/// Broadcast pageserver feedback to connected walproposers.
pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
// Err means there are no subscribers, which is fine.
let _ = self.pageserver_feedback_tx.send(feedback);
}
}
/// Only a few connections are expected (normally one), so store in Vec.
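Editor's note on the broadcast semantics relied on above (assumptions about tokio::sync::broadcast, not code from this commit): send() only reports an error when there are currently no receivers, in which case the value is dropped, and a receiver returned by subscribe() only observes values sent after the subscription, which is why the walproposer handler subscribes before entering its write loop (see the handler changes later in this file's diff). A tiny sketch with placeholder u32 payloads:

use tokio::sync::broadcast;

fn main() {
    let (tx, _) = broadcast::channel::<u32>(8);

    // No subscribers yet: send() reports an error and the value is dropped,
    // which broadcast_pageserver_feedback deliberately ignores.
    assert!(tx.send(1).is_err());

    // Once a walproposer connection subscribes, subsequent feedback is delivered.
    let mut rx = tx.subscribe();
    tx.send(2).unwrap();
    assert_eq!(rx.try_recv().unwrap(), 2);
}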
@@ -197,17 +211,28 @@ impl SafekeeperPostgresHandler {
// sends, so this avoids deadlocks.
let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
let peer_addr = *pgb.get_peer_addr();
- let network_reader = NetworkReader {
+ let mut network_reader = NetworkReader {
ttid: self.ttid,
conn_id: self.conn_id,
pgb_reader: &mut pgb_reader,
peer_addr,
acceptor_handle: &mut acceptor_handle,
};
- let res = tokio::select! {
- // todo: add read|write .context to these errors
- r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
- r = network_write(pgb, reply_rx) => r,
+ // Read first message and create timeline if needed.
+ let res = network_reader.read_first_message().await;
+ let res = if let Ok((tli, next_msg)) = res {
+ let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
+ tli.get_walreceivers().pageserver_feedback_tx.subscribe();
+ tokio::select! {
+ // todo: add read|write .context to these errors
+ r = network_reader.run(msg_tx, msg_rx, reply_tx, tli.clone(), next_msg) => r,
+ r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
+ }
+ } else {
+ res.map(|_| ())
+ };
// Join pg backend back.
@@ -251,12 +276,9 @@ struct NetworkReader<'a, IO> {
}
impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
- async fn run(
- self,
- msg_tx: Sender<ProposerAcceptorMessage>,
- msg_rx: Receiver<ProposerAcceptorMessage>,
- reply_tx: Sender<AcceptorProposerMessage>,
- ) -> Result<(), CopyStreamHandlerEnd> {
+ async fn read_first_message(
+ &mut self,
+ ) -> Result<(Arc<Timeline>, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
// Receive information about server to create timeline, if not yet.
let next_msg = read_message(self.pgb_reader).await?;
let tli = match next_msg {
@@ -278,9 +300,19 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
)))
}
};
+ Ok((tli, next_msg))
+ }
+ async fn run(
+ self,
+ msg_tx: Sender<ProposerAcceptorMessage>,
+ msg_rx: Receiver<ProposerAcceptorMessage>,
+ reply_tx: Sender<AcceptorProposerMessage>,
+ tli: Arc<Timeline>,
+ next_msg: ProposerAcceptorMessage,
+ ) -> Result<(), CopyStreamHandlerEnd> {
*self.acceptor_handle = Some(WalAcceptor::spawn(
- tli.clone(),
+ tli,
msg_rx,
reply_tx,
Some(self.conn_id),
@@ -320,18 +352,46 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_writer: &mut PostgresBackend<IO>,
mut reply_rx: Receiver<AcceptorProposerMessage>,
+ mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
) -> Result<(), CopyStreamHandlerEnd> {
let mut buf = BytesMut::with_capacity(128);
+ // storing append_response to inject PageserverFeedback into it
+ let mut last_append_response = None;
loop {
- match reply_rx.recv().await {
- Some(msg) => {
- buf.clear();
- msg.serialize(&mut buf)?;
- pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
+ // trying to read either AcceptorProposerMessage or PageserverFeedback
+ let msg = tokio::select! {
+ reply = reply_rx.recv() => {
+ if let Some(msg) = reply {
+ if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
+ last_append_response = Some(append_response.clone());
+ }
+ Some(msg)
+ } else {
+ return Ok(()); // chan closed, WalAcceptor terminated
+ }
+ }
}
- None => return Ok(()), // chan closed, WalAcceptor terminated
- }
+ feedback = pageserver_feedback_rx.recv() =>
+ match (feedback, &last_append_response) {
+ (Ok(feedback), Some(append_response)) => {
+ // clone AppendResponse and inject PageserverFeedback into it
+ let mut append_response = append_response.clone();
+ append_response.pageserver_feedback = Some(feedback);
+ Some(AcceptorProposerMessage::AppendResponse(append_response))
+ }
+ _ => None,
+ }
+ };
+ let Some(msg) = msg else {
+ continue;
+ };
+ buf.clear();
+ msg.serialize(&mut buf)?;
+ pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
}
}
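Editor's note on the `_ => None` arm above (assumed tokio::sync::broadcast behavior, not part of the commit): any error from pageserver_feedback_rx.recv() (no feedback sent yet, channel closed, or a receiver that lagged more than the channel capacity behind) simply skips this iteration, and feedback arriving before the first AppendResponse has been produced is dropped because there is nothing to piggyback it on. Losing stale feedback is acceptable since only the freshest values drive backpressure. A small sketch of the assumed lag behavior:

use tokio::sync::broadcast::{self, error::TryRecvError};

fn main() {
    // Capacity 2: only the two most recent values are retained.
    let (tx, mut rx) = broadcast::channel::<u32>(2);
    for v in 0..5 {
        tx.send(v).unwrap();
    }
    // The receiver is told it lagged; older values are gone, newer ones remain.
    assert!(matches!(rx.try_recv(), Err(TryRecvError::Lagged(_))));
    assert_eq!(rx.try_recv().unwrap(), 3);
    assert_eq!(rx.try_recv().unwrap(), 4);
}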


@@ -321,7 +321,7 @@ pub struct AppendRequestHeader {
}
/// Report safekeeper state to proposer
- #[derive(Debug, Serialize)]
+ #[derive(Debug, Serialize, Clone)]
pub struct AppendResponse {
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.
@@ -334,7 +334,7 @@ pub struct AppendResponse {
// a criterion for walproposer --sync mode exit
pub commit_lsn: Lsn,
pub hs_feedback: HotStandbyFeedback,
- pub pageserver_feedback: PageserverFeedback,
+ pub pageserver_feedback: Option<PageserverFeedback>,
}
impl AppendResponse {
@@ -344,7 +344,7 @@ impl AppendResponse {
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
- pageserver_feedback: PageserverFeedback::empty(),
+ pageserver_feedback: None,
}
}
}
@@ -462,7 +462,11 @@ impl AcceptorProposerMessage {
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
- msg.pageserver_feedback.serialize(buf);
+ // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
+ // if it is not present.
+ if let Some(ref msg) = msg.pageserver_feedback {
+ msg.serialize(buf);
+ }
}
}
@@ -681,7 +685,7 @@ where
commit_lsn: self.state.commit_lsn,
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
- pageserver_feedback: PageserverFeedback::empty(),
+ pageserver_feedback: None,
};
trace!("formed AppendResponse {:?}", ar);
ar
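Editor's note on wire compatibility: with pageserver_feedback now an Option, the feedback section is serialized only when a feedback has actually been received, and, per the comment in the hunk above, walproposer's AsyncReadMessage stops decoding when the message ends early. A minimal sketch of that optional-trailing-field framing (the two u64 fields are illustrative, not the real AppendResponse layout):

use bytes::{Buf, BufMut, BytesMut};

fn serialize(xmin: u64, feedback_lsn: Option<u64>, buf: &mut BytesMut) {
    buf.put_u64_le(xmin);
    // The trailing section is simply absent when there is no feedback yet.
    if let Some(lsn) = feedback_lsn {
        buf.put_u64_le(lsn);
    }
}

fn deserialize(mut buf: &[u8]) -> (u64, Option<u64>) {
    let xmin = buf.get_u64_le();
    // The reader decodes the feedback section only if bytes remain.
    let feedback_lsn = if buf.remaining() >= 8 {
        Some(buf.get_u64_le())
    } else {
        None
    };
    (xmin, feedback_lsn)
}

fn main() {
    let mut with = BytesMut::new();
    serialize(42, Some(0x1_0000), &mut with);
    assert_eq!(deserialize(&with), (42, Some(0x1_0000)));

    let mut without = BytesMut::new();
    serialize(42, None, &mut without);
    assert_eq!(deserialize(&without), (42, None));
}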


@@ -2,6 +2,8 @@
//! with the "START_REPLICATION" message, and registry of walsenders.
use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::RECEIVED_PS_FEEDBACKS;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{Term, TermLsn};
use crate::timeline::Timeline;
use crate::wal_service::ConnectionId;
@@ -21,7 +23,7 @@ use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::pageserver_feedback::PageserverFeedback;
- use std::cmp::{max, min};
+ use std::cmp::min;
use std::net::SocketAddr;
use std::str;
use std::sync::Arc;
@@ -90,12 +92,14 @@ pub struct StandbyFeedback {
/// WalSenders registry. Timeline holds it (wrapped in Arc).
pub struct WalSenders {
mutex: Mutex<WalSendersShared>,
walreceivers: Arc<WalReceivers>,
}
impl WalSenders {
- pub fn new() -> Arc<WalSenders> {
+ pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
Arc::new(WalSenders {
mutex: Mutex::new(WalSendersShared::new()),
walreceivers,
})
}
@@ -151,22 +155,29 @@ impl WalSenders {
.min()
}
- /// Get aggregated pageserver feedback.
- pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
- self.mutex.lock().agg_ps_feedback
+ /// Returns total counter of pageserver feedbacks received and last feedback.
+ pub fn get_ps_feedback_stats(self: &Arc<WalSenders>) -> (u64, PageserverFeedback) {
+ let shared = self.mutex.lock();
+ (shared.ps_feedback_counter, shared.last_ps_feedback)
}
- /// Get aggregated pageserver and hot standby feedback (we send them to compute).
- pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
- let shared = self.mutex.lock();
- (shared.agg_ps_feedback, shared.agg_hs_feedback)
+ /// Get aggregated hot standby feedback (we send it to compute).
+ pub fn get_hotstandby(self: &Arc<WalSenders>) -> HotStandbyFeedback {
+ self.mutex.lock().agg_hs_feedback
}
/// Record new pageserver feedback, update aggregated values.
fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
let mut shared = self.mutex.lock();
shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
- shared.update_ps_feedback();
+ shared.last_ps_feedback = *feedback;
+ shared.ps_feedback_counter += 1;
+ drop(shared);
+ RECEIVED_PS_FEEDBACKS.inc();
+ // send feedback to connected walproposers
+ self.walreceivers.broadcast_pageserver_feedback(*feedback);
}
/// Record standby reply.
@@ -222,8 +233,10 @@ impl WalSenders {
struct WalSendersShared {
// aggregated over all walsenders value
agg_hs_feedback: HotStandbyFeedback,
- // aggregated over all walsenders value
- agg_ps_feedback: PageserverFeedback,
+ // last feedback ever received from any pageserver, empty if none
+ last_ps_feedback: PageserverFeedback,
+ // total counter of pageserver feedbacks received
+ ps_feedback_counter: u64,
slots: Vec<Option<WalSenderState>>,
}
@@ -231,7 +244,8 @@ impl WalSendersShared {
fn new() -> Self {
WalSendersShared {
agg_hs_feedback: HotStandbyFeedback::empty(),
- agg_ps_feedback: PageserverFeedback::empty(),
+ last_ps_feedback: PageserverFeedback::empty(),
+ ps_feedback_counter: 0,
slots: Vec::new(),
}
}
@@ -276,37 +290,6 @@ impl WalSendersShared {
}
self.agg_hs_feedback = agg;
}
- /// Update aggregated pageserver feedback. LSNs (last_received,
- /// disk_consistent, remote_consistent) and reply timestamp are just
- /// maximized; timeline_size if taken from feedback with highest
- /// last_received lsn. This is generally reasonable, but we might want to
- /// implement other policies once multiple pageservers start to be actively
- /// used.
- fn update_ps_feedback(&mut self) {
- let init = PageserverFeedback::empty();
- let acc =
- self.slots
- .iter()
- .flatten()
- .fold(init, |mut acc, ws_state| match ws_state.feedback {
- ReplicationFeedback::Pageserver(feedback) => {
- if feedback.last_received_lsn > acc.last_received_lsn {
- acc.current_timeline_size = feedback.current_timeline_size;
- }
- acc.last_received_lsn =
- max(feedback.last_received_lsn, acc.last_received_lsn);
- acc.disk_consistent_lsn =
- max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
- acc.remote_consistent_lsn =
- max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
- acc.replytime = max(feedback.replytime, acc.replytime);
- acc
- }
- ReplicationFeedback::Standby(_) => acc,
- });
- self.agg_ps_feedback = acc;
- }
}
// Serialized is used only for pretty printing in json.
@@ -443,7 +426,7 @@ impl SafekeeperPostgresHandler {
};
let mut reply_reader = ReplyReader {
reader,
- ws_guard,
+ ws_guard: ws_guard.clone(),
tli,
};
@@ -452,6 +435,18 @@ impl SafekeeperPostgresHandler {
r = sender.run() => r,
r = reply_reader.run() => r,
};
let ws_state = ws_guard
.walsenders
.mutex
.lock()
.get_slot(ws_guard.id)
.clone();
info!(
"finished streaming to {}, feedback={:?}",
ws_state.addr, ws_state.feedback,
);
// Join pg backend back.
pgb.unsplit(reply_reader.reader)?;
@@ -733,7 +728,6 @@ async fn wait_for_lsn(
#[cfg(test)]
mod tests {
use postgres_protocol::PG_EPOCH;
use utils::id::{TenantId, TimelineId};
use super::*;
@@ -792,27 +786,4 @@ mod tests {
wss.update_hs_feedback();
assert_eq!(wss.agg_hs_feedback.xmin, 42);
}
- // form pageserver feedback with given last_record_lsn / tli size and the
- // rest set to dummy values.
- fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
- ReplicationFeedback::Pageserver(PageserverFeedback {
- current_timeline_size,
- last_received_lsn,
- disk_consistent_lsn: Lsn::INVALID,
- remote_consistent_lsn: Lsn::INVALID,
- replytime: *PG_EPOCH,
- })
- }
- // test that ps aggregation works as expected
- #[test]
- fn test_ps_feedback() {
- let mut wss = WalSendersShared::new();
- push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
- push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
- wss.update_ps_feedback();
- assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
- assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
- }
}


@@ -402,6 +402,7 @@ impl Timeline {
)));
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let walreceivers = WalReceivers::new();
Ok(Timeline {
ttid,
wal_backup_launcher_tx,
@@ -410,8 +411,8 @@ impl Timeline {
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(shared_state),
- walsenders: WalSenders::new(),
- walreceivers: WalReceivers::new(),
+ walsenders: WalSenders::new(walreceivers.clone()),
+ walreceivers,
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
@@ -435,6 +436,7 @@ impl Timeline {
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
let walreceivers = WalReceivers::new();
Ok(Timeline {
ttid,
wal_backup_launcher_tx,
@@ -443,8 +445,8 @@ impl Timeline {
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
- walsenders: WalSenders::new(),
- walreceivers: WalReceivers::new(),
+ walsenders: WalSenders::new(walreceivers.clone()),
+ walreceivers,
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
@@ -656,12 +658,9 @@ impl Timeline {
let mut shared_state = self.write_shared_state().await;
rmsg = shared_state.sk.process_msg(msg).await?;
- // if this is AppendResponse, fill in proper pageserver and hot
- // standby feedback.
+ // if this is AppendResponse, fill in proper hot standby feedback.
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
- let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
- resp.hs_feedback = hs_feedback;
- resp.pageserver_feedback = ps_feedback;
+ resp.hs_feedback = self.walsenders.get_hotstandby();
}
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
@@ -898,12 +897,13 @@ impl Timeline {
return None;
}
- let ps_feedback = self.walsenders.get_ps_feedback();
+ let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
let state = self.write_shared_state().await;
if state.active {
Some(FullTimelineInfo {
ttid: self.ttid,
- ps_feedback,
+ ps_feedback_count,
+ last_ps_feedback,
wal_backup_active: state.wal_backup_active,
timeline_is_active: state.active,
num_computes: self.walreceivers.get_num() as u32,