mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 12:00:42 +00:00
Move tracking of walsenders out of Timeline.
Refactors walsenders out of timeline.rs, to make it less convoluted, into a separate WalSenders struct with its own lock, but otherwise keeping the same structure. Tracking of the in-memory remote_consistent_lsn is also moved there, as it is mainly received from the pageserver. The state of a walsender (feedback) is also restructured to be cleaner; it is now either PageserverFeedback or StandbyFeedback(StandbyReply, HotStandbyFeedback), but not both.
This commit is contained in:
@@ -947,9 +947,10 @@ impl<'a> BeMessage<'a> {
|
||||
pub struct PageserverFeedback {
|
||||
/// Last known size of the timeline. Used to enforce timeline size limit.
|
||||
pub current_timeline_size: u64,
|
||||
/// LSN last received and ingested by the pageserver.
|
||||
/// LSN last received and ingested by the pageserver. Controls backpressure.
|
||||
pub last_received_lsn: u64,
|
||||
/// LSN up to which data is persisted by the pageserver to its local disc.
|
||||
/// Controls backpressure.
|
||||
pub disk_consistent_lsn: u64,
|
||||
/// LSN up to which data is persisted by the pageserver on s3; safekeepers
|
||||
/// consider WAL before it can be removed.
|
||||
@@ -968,7 +969,7 @@ impl PageserverFeedback {
|
||||
last_received_lsn: 0,
|
||||
remote_consistent_lsn: 0,
|
||||
disk_consistent_lsn: 0,
|
||||
replytime: SystemTime::now(),
|
||||
replytime: *PG_EPOCH,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -91,7 +91,7 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||
// connection to the broker.
|
||||
|
||||
// note: there are blocking operations below, but it's considered fine for now
|
||||
tli.record_safekeeper_info(&msg).await?
|
||||
tli.record_safekeeper_info(msg).await?
|
||||
}
|
||||
}
|
||||
bail!("end of stream");
|
||||
|
||||
@@ -22,7 +22,7 @@ use crate::safekeeper::SafekeeperMemState;
|
||||
use crate::safekeeper::TermHistory;
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
use crate::timeline::ReplicaState;
|
||||
use crate::send_wal::WalSenderState;
|
||||
use crate::GlobalTimelines;
|
||||
|
||||
/// Various filters that influence the resulting JSON output.
|
||||
@@ -87,7 +87,7 @@ pub struct Timeline {
|
||||
pub struct Memory {
|
||||
pub is_cancelled: bool,
|
||||
pub peers_info_len: usize,
|
||||
pub replicas: Vec<Option<ReplicaState>>,
|
||||
pub walsenders: Vec<WalSenderState>,
|
||||
pub wal_backup_active: bool,
|
||||
pub active: bool,
|
||||
pub num_computes: u32,
|
||||
|
||||
@@ -144,7 +144,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
commit_lsn: inmem.commit_lsn,
|
||||
backup_lsn: inmem.backup_lsn,
|
||||
peer_horizon_lsn: inmem.peer_horizon_lsn,
|
||||
remote_consistent_lsn: inmem.remote_consistent_lsn,
|
||||
remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
|
||||
};
|
||||
json_response(StatusCode::OK, status)
|
||||
}
|
||||
@@ -246,7 +246,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
};
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
tli.record_safekeeper_info(&proto_sk_info)
|
||||
tli.record_safekeeper_info(proto_sk_info)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
|
||||
@@ -15,11 +15,11 @@ use metrics::{
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use pq_proto::PageserverFeedback;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
use crate::{
|
||||
safekeeper::{SafeKeeperState, SafekeeperMemState},
|
||||
timeline::ReplicaState,
|
||||
GlobalTimelines,
|
||||
};
|
||||
|
||||
@@ -231,7 +231,7 @@ pub fn time_io_closure(closure: impl FnOnce() -> Result<()>) -> Result<f64> {
|
||||
/// Metrics for a single timeline.
|
||||
pub struct FullTimelineInfo {
|
||||
pub ttid: TenantTimelineId,
|
||||
pub replicas: Vec<ReplicaState>,
|
||||
pub ps_feedback: PageserverFeedback,
|
||||
pub wal_backup_active: bool,
|
||||
pub timeline_is_active: bool,
|
||||
pub num_computes: u32,
|
||||
@@ -242,6 +242,7 @@ pub struct FullTimelineInfo {
|
||||
pub persisted_state: SafeKeeperState,
|
||||
|
||||
pub flush_lsn: Lsn,
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
|
||||
pub wal_storage: WalStorageMetrics,
|
||||
}
|
||||
@@ -514,19 +515,6 @@ impl Collector for TimelineCollector {
|
||||
let timeline_id = tli.ttid.timeline_id.to_string();
|
||||
let labels = &[tenant_id.as_str(), timeline_id.as_str()];
|
||||
|
||||
let mut most_advanced: Option<pq_proto::PageserverFeedback> = None;
|
||||
for replica in tli.replicas.iter() {
|
||||
if let Some(replica_feedback) = replica.pageserver_feedback {
|
||||
if let Some(current) = most_advanced {
|
||||
if current.last_received_lsn < replica_feedback.last_received_lsn {
|
||||
most_advanced = Some(replica_feedback);
|
||||
}
|
||||
} else {
|
||||
most_advanced = Some(replica_feedback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.commit_lsn
|
||||
.with_label_values(labels)
|
||||
.set(tli.mem_state.commit_lsn.into());
|
||||
@@ -544,7 +532,7 @@ impl Collector for TimelineCollector {
|
||||
.set(tli.mem_state.peer_horizon_lsn.into());
|
||||
self.remote_consistent_lsn
|
||||
.with_label_values(labels)
|
||||
.set(tli.mem_state.remote_consistent_lsn.into());
|
||||
.set(tli.remote_consistent_lsn.into());
|
||||
self.timeline_active
|
||||
.with_label_values(labels)
|
||||
.set(tli.timeline_is_active as u64);
|
||||
@@ -567,15 +555,17 @@ impl Collector for TimelineCollector {
|
||||
.with_label_values(labels)
|
||||
.set(tli.wal_storage.flush_wal_seconds);
|
||||
|
||||
if let Some(feedback) = most_advanced {
|
||||
self.ps_last_received_lsn
|
||||
self.ps_last_received_lsn
|
||||
.with_label_values(labels)
|
||||
.set(tli.ps_feedback.last_received_lsn);
|
||||
if let Ok(unix_time) = tli
|
||||
.ps_feedback
|
||||
.replytime
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
{
|
||||
self.feedback_last_time_seconds
|
||||
.with_label_values(labels)
|
||||
.set(feedback.last_received_lsn);
|
||||
if let Ok(unix_time) = feedback.replytime.duration_since(SystemTime::UNIX_EPOCH) {
|
||||
self.feedback_last_time_seconds
|
||||
.with_label_values(labels)
|
||||
.set(unix_time.as_secs());
|
||||
}
|
||||
.set(unix_time.as_secs());
|
||||
}
|
||||
|
||||
if tli.last_removed_segno != 0 {
|
||||
|
||||
@@ -212,7 +212,6 @@ pub struct SafekeeperMemState {
|
||||
pub commit_lsn: Lsn,
|
||||
pub backup_lsn: Lsn,
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
#[serde(with = "hex")]
|
||||
pub proposer_uuid: PgUuid,
|
||||
}
|
||||
@@ -540,7 +539,6 @@ where
|
||||
commit_lsn: state.commit_lsn,
|
||||
backup_lsn: state.backup_lsn,
|
||||
peer_horizon_lsn: state.peer_horizon_lsn,
|
||||
remote_consistent_lsn: state.remote_consistent_lsn,
|
||||
proposer_uuid: state.proposer_uuid,
|
||||
},
|
||||
state,
|
||||
@@ -781,10 +779,6 @@ where
|
||||
|
||||
// Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
|
||||
self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
|
||||
// Initializing remote_consistent_lsn sets that we have nothing to
|
||||
// stream to pageserver(s) immediately after creation.
|
||||
self.inmem.remote_consistent_lsn =
|
||||
max(self.inmem.remote_consistent_lsn, state.timeline_start_lsn);
|
||||
|
||||
state.acceptor_state.term_history = msg.term_history.clone();
|
||||
self.persist_control_file(state)?;
|
||||
@@ -837,7 +831,6 @@ where
|
||||
state.commit_lsn = self.inmem.commit_lsn;
|
||||
state.backup_lsn = self.inmem.backup_lsn;
|
||||
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
|
||||
state.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
|
||||
state.proposer_uuid = self.inmem.proposer_uuid;
|
||||
self.state.persist(&state)
|
||||
}
|
||||
@@ -940,14 +933,12 @@ where
|
||||
self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
|
||||
self.inmem.backup_lsn = new_backup_lsn;
|
||||
|
||||
let new_remote_consistent_lsn = max(
|
||||
Lsn(sk_info.remote_consistent_lsn),
|
||||
self.inmem.remote_consistent_lsn,
|
||||
);
|
||||
// value in sk_info should be maximized over our local in memory value.
|
||||
let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn);
|
||||
assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn);
|
||||
sync_control_file |= self.state.remote_consistent_lsn
|
||||
+ (self.state.server.wal_seg_size as u64)
|
||||
< new_remote_consistent_lsn;
|
||||
self.inmem.remote_consistent_lsn = new_remote_consistent_lsn;
|
||||
|
||||
let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
|
||||
sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
|
||||
@@ -955,7 +946,12 @@ where
|
||||
self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
|
||||
|
||||
if sync_control_file {
|
||||
self.persist_control_file(self.state.clone())?;
|
||||
let mut state = self.state.clone();
|
||||
// Note: we do not persist remote_consistent_lsn in other paths of
|
||||
// persisting cf -- that is not much needed currently. We could do
|
||||
// that by storing Arc to walsenders in Safekeeper.
|
||||
state.remote_consistent_lsn = new_remote_consistent_lsn;
|
||||
self.persist_control_file(state)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
//! This module implements the streaming side of replication protocol, starting
|
||||
//! with the "START_REPLICATION" message.
|
||||
//! with the "START_REPLICATION" message, and registry of walsenders.
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::timeline::{ReplicaState, Timeline};
|
||||
use crate::timeline::Timeline;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::wal_storage::WalReader;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::Context as AnyhowContext;
|
||||
use bytes::Bytes;
|
||||
use parking_lot::Mutex;
|
||||
use postgres_backend::PostgresBackend;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
|
||||
use postgres_ffi::get_current_timestamp;
|
||||
@@ -14,8 +16,12 @@ use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
|
||||
use pq_proto::{BeMessage, PageserverFeedback, WalSndKeepAlive, XLogDataBody};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use utils::http::json::display_serialize;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::AtomicLsn;
|
||||
|
||||
use std::cmp::min;
|
||||
use std::cmp::{max, min};
|
||||
use std::net::SocketAddr;
|
||||
use std::str;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -40,6 +46,8 @@ pub struct HotStandbyFeedback {
|
||||
pub catalog_xmin: FullTransactionId,
|
||||
}
|
||||
|
||||
const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
|
||||
|
||||
impl HotStandbyFeedback {
|
||||
pub fn empty() -> HotStandbyFeedback {
|
||||
HotStandbyFeedback {
|
||||
@@ -51,24 +59,293 @@ impl HotStandbyFeedback {
|
||||
}
|
||||
|
||||
/// Standby status update
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct StandbyReply {
|
||||
pub write_lsn: Lsn, // last lsn received by pageserver
|
||||
pub flush_lsn: Lsn, // pageserver's disk consistent lSN
|
||||
pub apply_lsn: Lsn, // pageserver's remote consistent lSN
|
||||
pub reply_ts: TimestampTz,
|
||||
pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
|
||||
pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
|
||||
pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
|
||||
pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
|
||||
pub reply_requested: bool,
|
||||
}
|
||||
|
||||
/// Scope guard to unregister replication connection from timeline
|
||||
struct ReplicationConnGuard {
|
||||
replica: usize, // replica internal ID assigned by timeline
|
||||
timeline: Arc<Timeline>,
|
||||
impl StandbyReply {
|
||||
fn empty() -> Self {
|
||||
StandbyReply {
|
||||
write_lsn: Lsn::INVALID,
|
||||
flush_lsn: Lsn::INVALID,
|
||||
apply_lsn: Lsn::INVALID,
|
||||
reply_ts: 0,
|
||||
reply_requested: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ReplicationConnGuard {
|
||||
#[derive(Debug, Clone, Copy, Serialize)]
|
||||
pub struct StandbyFeedback {
|
||||
reply: StandbyReply,
|
||||
hs_feedback: HotStandbyFeedback,
|
||||
}
|
||||
|
||||
/// WalSenders registry. Timeline holds it (wrapped in Arc).
|
||||
pub struct WalSenders {
|
||||
/// Lsn maximized over all walsenders *and* peer data, so might be higher
|
||||
/// than what we receive from replicas.
|
||||
remote_consistent_lsn: AtomicLsn,
|
||||
mutex: Mutex<WalSendersShared>,
|
||||
}
|
||||
|
||||
impl WalSenders {
|
||||
pub fn new(remote_consistent_lsn: Lsn) -> Arc<WalSenders> {
|
||||
Arc::new(WalSenders {
|
||||
remote_consistent_lsn: AtomicLsn::from(remote_consistent_lsn),
|
||||
mutex: Mutex::new(WalSendersShared::new()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Register new walsender. Returned guard provides access to the slot and
|
||||
/// automatically deregisters in Drop.
|
||||
fn register(
|
||||
self: &Arc<WalSenders>,
|
||||
ttid: TenantTimelineId,
|
||||
addr: SocketAddr,
|
||||
conn_id: ConnectionId,
|
||||
appname: Option<String>,
|
||||
) -> WalSenderGuard {
|
||||
let slots = &mut self.mutex.lock().slots;
|
||||
let walsender_state = WalSenderState {
|
||||
ttid,
|
||||
addr,
|
||||
conn_id,
|
||||
appname,
|
||||
feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
|
||||
};
|
||||
// find empty slot or create new one
|
||||
let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
|
||||
slots[pos] = Some(walsender_state);
|
||||
pos
|
||||
} else {
|
||||
let pos = slots.len();
|
||||
slots.push(Some(walsender_state));
|
||||
pos
|
||||
};
|
||||
WalSenderGuard {
|
||||
id: pos,
|
||||
walsenders: self.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get state of all walsenders.
|
||||
pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
|
||||
self.mutex.lock().slots.iter().flatten().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get aggregated pageserver feedback.
|
||||
pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
|
||||
self.mutex.lock().agg_ps_feedback
|
||||
}
|
||||
|
||||
/// Get aggregated pageserver and hot standby feedback (we send them to compute).
|
||||
pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
|
||||
let shared = self.mutex.lock();
|
||||
(shared.agg_ps_feedback, shared.agg_hs_feedback)
|
||||
}
|
||||
|
||||
/// Record new pageserver feedback, update aggregated values.
|
||||
fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
|
||||
let mut shared = self.mutex.lock();
|
||||
shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
|
||||
shared.update_ps_feedback();
|
||||
self.update_remote_consistent_lsn(Lsn(shared.agg_ps_feedback.remote_consistent_lsn));
|
||||
}
|
||||
|
||||
/// Record standby reply.
|
||||
fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
|
||||
let mut shared = self.mutex.lock();
|
||||
let slot = shared.get_slot_mut(id);
|
||||
match &mut slot.feedback {
|
||||
ReplicationFeedback::Standby(sf) => sf.reply = *reply,
|
||||
ReplicationFeedback::Pageserver(_) => {
|
||||
slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
|
||||
reply: *reply,
|
||||
hs_feedback: HotStandbyFeedback::empty(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Record hot standby feedback, update aggregated value.
|
||||
fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
|
||||
let mut shared = self.mutex.lock();
|
||||
let slot = shared.get_slot_mut(id);
|
||||
match &mut slot.feedback {
|
||||
ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
|
||||
ReplicationFeedback::Pageserver(_) => {
|
||||
slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
|
||||
reply: StandbyReply::empty(),
|
||||
hs_feedback: *feedback,
|
||||
})
|
||||
}
|
||||
}
|
||||
shared.update_hs_feedback();
|
||||
}
|
||||
|
||||
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
|
||||
/// client is not pageserver.
|
||||
fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
let shared = self.mutex.lock();
|
||||
let slot = shared.get_slot(id);
|
||||
match slot.feedback {
|
||||
ReplicationFeedback::Pageserver(feedback) => Some(Lsn(feedback.remote_consistent_lsn)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get remote_consistent_lsn maximized across all walsenders and peers.
|
||||
pub fn get_remote_consistent_lsn(self: &Arc<WalSenders>) -> Lsn {
|
||||
self.remote_consistent_lsn.load()
|
||||
}
|
||||
|
||||
/// Update maximized remote_consistent_lsn, return new (potentially) value.
|
||||
pub fn update_remote_consistent_lsn(self: &Arc<WalSenders>, candidate: Lsn) -> Lsn {
|
||||
self.remote_consistent_lsn
|
||||
.fetch_max(candidate)
|
||||
.max(candidate)
|
||||
}
|
||||
|
||||
/// Unregister walsender.
|
||||
fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
|
||||
let mut shared = self.mutex.lock();
|
||||
shared.slots[id] = None;
|
||||
shared.update_hs_feedback();
|
||||
}
|
||||
}
|
||||
|
||||
struct WalSendersShared {
|
||||
// aggregated over all walsenders value
|
||||
agg_hs_feedback: HotStandbyFeedback,
|
||||
// aggregated over all walsenders value
|
||||
agg_ps_feedback: PageserverFeedback,
|
||||
slots: Vec<Option<WalSenderState>>,
|
||||
}
|
||||
|
||||
impl WalSendersShared {
|
||||
fn new() -> Self {
|
||||
WalSendersShared {
|
||||
agg_hs_feedback: HotStandbyFeedback::empty(),
|
||||
agg_ps_feedback: PageserverFeedback::empty(),
|
||||
slots: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get content of provided id slot, it must exist.
|
||||
fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
|
||||
self.slots[id].as_ref().expect("walsender doesn't exist")
|
||||
}
|
||||
|
||||
/// Get mut content of provided id slot, it must exist.
|
||||
fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
|
||||
self.slots[id].as_mut().expect("walsender doesn't exist")
|
||||
}
|
||||
|
||||
/// Update aggregated hot standy feedback. We just take min of valid xmins
|
||||
/// and ts.
|
||||
fn update_hs_feedback(&mut self) {
|
||||
let mut agg = HotStandbyFeedback::empty();
|
||||
for ws_state in self.slots.iter().flatten() {
|
||||
if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
|
||||
let hs_feedback = standby_feedback.hs_feedback;
|
||||
// doing Option math like op1.iter().chain(op2.iter()).min()
|
||||
// would be nicer, but we serialize/deserialize this struct
|
||||
// directly, so leave as is for now
|
||||
if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
|
||||
if agg.xmin != INVALID_FULL_TRANSACTION_ID {
|
||||
agg.xmin = min(agg.xmin, hs_feedback.xmin);
|
||||
} else {
|
||||
agg.xmin = hs_feedback.xmin;
|
||||
}
|
||||
agg.ts = min(agg.ts, hs_feedback.ts);
|
||||
}
|
||||
if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
|
||||
if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
|
||||
agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
|
||||
} else {
|
||||
agg.catalog_xmin = hs_feedback.catalog_xmin;
|
||||
}
|
||||
agg.ts = min(agg.ts, hs_feedback.ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.agg_hs_feedback = agg;
|
||||
}
|
||||
|
||||
/// Update aggregated pageserver feedback. LSNs (last_received,
|
||||
/// disk_consistent, remote_consistent) and reply timestamp are just
|
||||
/// maximized; timeline_size if taken from feedback with highest
|
||||
/// last_received lsn. This is generally reasonable, but we might want to
|
||||
/// implement other policies once multiple pageservers start to be actively
|
||||
/// used.
|
||||
fn update_ps_feedback(&mut self) {
|
||||
let init = PageserverFeedback::empty();
|
||||
let acc =
|
||||
self.slots
|
||||
.iter()
|
||||
.flatten()
|
||||
.fold(init, |mut acc, ws_state| match ws_state.feedback {
|
||||
ReplicationFeedback::Pageserver(feedback) => {
|
||||
if feedback.last_received_lsn > acc.last_received_lsn {
|
||||
acc.current_timeline_size = feedback.current_timeline_size;
|
||||
}
|
||||
acc.last_received_lsn =
|
||||
max(feedback.last_received_lsn, acc.last_received_lsn);
|
||||
acc.disk_consistent_lsn =
|
||||
max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
|
||||
acc.remote_consistent_lsn =
|
||||
max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
|
||||
acc.replytime = max(feedback.replytime, acc.replytime);
|
||||
acc
|
||||
}
|
||||
ReplicationFeedback::Standby(_) => acc,
|
||||
});
|
||||
self.agg_ps_feedback = acc;
|
||||
}
|
||||
}
|
||||
|
||||
// Serialized is used only for pretty printing in json.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct WalSenderState {
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
ttid: TenantTimelineId,
|
||||
addr: SocketAddr,
|
||||
conn_id: ConnectionId,
|
||||
// postgres application_name
|
||||
appname: Option<String>,
|
||||
feedback: ReplicationFeedback,
|
||||
}
|
||||
|
||||
// Receiver is either pageserver or regular standby, which have different
|
||||
// feedbacks.
|
||||
#[derive(Debug, Clone, Copy, Serialize)]
|
||||
enum ReplicationFeedback {
|
||||
Pageserver(PageserverFeedback),
|
||||
Standby(StandbyFeedback),
|
||||
}
|
||||
|
||||
// id of the occupied slot in WalSenders to access it (and save in the
|
||||
// WalSenderGuard). We could give Arc directly to the slot, but there is not
|
||||
// much sense in that as values aggregation which is performed on each feedback
|
||||
// receival iterates over all walsenders.
|
||||
pub type WalSenderId = usize;
|
||||
|
||||
/// Scope guard to access slot in WalSenders registry and unregister from it in
|
||||
/// Drop.
|
||||
pub struct WalSenderGuard {
|
||||
id: WalSenderId,
|
||||
walsenders: Arc<WalSenders>,
|
||||
}
|
||||
|
||||
impl Drop for WalSenderGuard {
|
||||
fn drop(&mut self) {
|
||||
self.timeline.remove_replica(self.replica);
|
||||
self.walsenders.unregister(self.id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,16 +374,13 @@ impl SafekeeperPostgresHandler {
|
||||
let tli =
|
||||
GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
|
||||
|
||||
let state = ReplicaState::new();
|
||||
// This replica_id is used below to check if it's time to stop replication.
|
||||
let replica_id = tli.add_replica(state);
|
||||
|
||||
// Use a guard object to remove our entry from the timeline, when the background
|
||||
// thread and us have both finished using it.
|
||||
let _guard = Arc::new(ReplicationConnGuard {
|
||||
replica: replica_id,
|
||||
timeline: tli.clone(),
|
||||
});
|
||||
// Use a guard object to remove our entry from the timeline when we are done.
|
||||
let ws_guard = Arc::new(tli.get_walsenders().register(
|
||||
self.ttid,
|
||||
*pgb.get_peer_addr(),
|
||||
self.conn_id,
|
||||
self.appname.clone(),
|
||||
));
|
||||
|
||||
// Walproposer gets special handling: safekeeper must give proposer all
|
||||
// local WAL till the end, whether committed or not (walproposer will
|
||||
@@ -154,16 +428,11 @@ impl SafekeeperPostgresHandler {
|
||||
end_pos,
|
||||
stop_pos,
|
||||
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
|
||||
replica_id,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: [0; MAX_SEND_SIZE],
|
||||
};
|
||||
let mut reply_reader = ReplyReader {
|
||||
reader,
|
||||
tli,
|
||||
replica_id,
|
||||
feedback: ReplicaState::new(),
|
||||
};
|
||||
let mut reply_reader = ReplyReader { reader, ws_guard };
|
||||
|
||||
let res = tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
@@ -190,7 +459,7 @@ struct WalSender<'a, IO> {
|
||||
// in recovery.
|
||||
stop_pos: Option<Lsn>,
|
||||
commit_lsn_watch_rx: Receiver<Lsn>,
|
||||
replica_id: usize,
|
||||
ws_guard: Arc<WalSenderGuard>,
|
||||
wal_reader: WalReader,
|
||||
// buffer for readling WAL into to send it
|
||||
send_buf: [u8; MAX_SEND_SIZE],
|
||||
@@ -264,14 +533,20 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
return Ok(());
|
||||
}
|
||||
// Timed out waiting for WAL, check for termination and send KA
|
||||
if self.tli.should_walsender_stop(self.replica_id) {
|
||||
// Terminate if there is nothing more to send.
|
||||
// TODO close the stream properly
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
self.appname, self.start_pos,
|
||||
)));
|
||||
if let Some(remote_consistent_lsn) = self
|
||||
.ws_guard
|
||||
.walsenders
|
||||
.get_ws_remote_consistent_lsn(self.ws_guard.id)
|
||||
{
|
||||
if self.tli.should_walsender_stop(remote_consistent_lsn) {
|
||||
// Terminate if there is nothing more to send.
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
self.appname, self.start_pos,
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
self.pgb
|
||||
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
sent_ptr: self.end_pos.0,
|
||||
@@ -286,9 +561,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
/// A half driving receiving replies.
|
||||
struct ReplyReader<IO> {
|
||||
reader: PostgresBackendReader<IO>,
|
||||
tli: Arc<Timeline>,
|
||||
replica_id: usize,
|
||||
feedback: ReplicaState,
|
||||
ws_guard: Arc<WalSenderGuard>,
|
||||
}
|
||||
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
||||
@@ -303,29 +576,32 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
||||
match msg.first().cloned() {
|
||||
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
|
||||
// Note: deserializing is on m[1..] because we skip the tag byte.
|
||||
self.feedback.hs_feedback = HotStandbyFeedback::des(&msg[1..])
|
||||
let hs_feedback = HotStandbyFeedback::des(&msg[1..])
|
||||
.context("failed to deserialize HotStandbyFeedback")?;
|
||||
self.tli
|
||||
.update_replica_state(self.replica_id, self.feedback);
|
||||
self.ws_guard
|
||||
.walsenders
|
||||
.record_hs_feedback(self.ws_guard.id, &hs_feedback);
|
||||
}
|
||||
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
|
||||
let _reply =
|
||||
let reply =
|
||||
StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
|
||||
// This must be a regular postgres replica,
|
||||
// because pageserver doesn't send this type of messages to safekeeper.
|
||||
// Currently we just ignore this, tracking progress for them is not supported.
|
||||
self.ws_guard
|
||||
.walsenders
|
||||
.record_standby_reply(self.ws_guard.id, &reply);
|
||||
}
|
||||
Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
|
||||
// pageserver sends this.
|
||||
// Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
|
||||
let buf = Bytes::copy_from_slice(&msg[9..]);
|
||||
let reply = PageserverFeedback::parse(buf);
|
||||
let ps_feedback = PageserverFeedback::parse(buf);
|
||||
|
||||
trace!("PageserverFeedback is {:?}", reply);
|
||||
self.feedback.pageserver_feedback = Some(reply);
|
||||
|
||||
self.tli
|
||||
.update_replica_state(self.replica_id, self.feedback);
|
||||
trace!("PageserverFeedback is {:?}", ps_feedback);
|
||||
self.ws_guard
|
||||
.walsenders
|
||||
.record_ps_feedback(self.ws_guard.id, &ps_feedback);
|
||||
// in principle new remote_consistent_lsn could allow to
|
||||
// deactivate the timeline, but we check that regularly through
|
||||
// broker updated, not need to do it here
|
||||
}
|
||||
_ => warn!("unexpected message {:?}", msg),
|
||||
}
|
||||
@@ -368,3 +644,89 @@ async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> anyhow::Result<Option
|
||||
Err(_) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use postgres_protocol::PG_EPOCH;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::*;
|
||||
|
||||
fn mock_ttid() -> TenantTimelineId {
|
||||
TenantTimelineId {
|
||||
tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
|
||||
timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn mock_addr() -> SocketAddr {
|
||||
"127.0.0.1:8080".parse().unwrap()
|
||||
}
|
||||
|
||||
// add to wss specified feedback setting other fields to dummy values
|
||||
fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
|
||||
let walsender_state = WalSenderState {
|
||||
ttid: mock_ttid(),
|
||||
addr: mock_addr(),
|
||||
conn_id: 1,
|
||||
appname: None,
|
||||
feedback,
|
||||
};
|
||||
wss.slots.push(Some(walsender_state))
|
||||
}
|
||||
|
||||
// form standby feedback with given hot standby feedback ts/xmin and the
|
||||
// rest set to dummy values.
|
||||
fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
|
||||
ReplicationFeedback::Standby(StandbyFeedback {
|
||||
reply: StandbyReply::empty(),
|
||||
hs_feedback: HotStandbyFeedback {
|
||||
ts,
|
||||
xmin,
|
||||
catalog_xmin: 0,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// test that hs aggregation works as expected
|
||||
#[test]
|
||||
fn test_hs_feedback_no_valid() {
|
||||
let mut wss = WalSendersShared::new();
|
||||
push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
|
||||
wss.update_hs_feedback();
|
||||
assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_hs_feedback() {
|
||||
let mut wss = WalSendersShared::new();
|
||||
push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
|
||||
push_feedback(&mut wss, hs_feedback(1, 42));
|
||||
push_feedback(&mut wss, hs_feedback(1, 64));
|
||||
wss.update_hs_feedback();
|
||||
assert_eq!(wss.agg_hs_feedback.xmin, 42);
|
||||
}
|
||||
|
||||
// form pageserver feedback with given last_record_lsn / tli size and the
|
||||
// rest set to dummy values.
|
||||
fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
|
||||
ReplicationFeedback::Pageserver(PageserverFeedback {
|
||||
current_timeline_size,
|
||||
last_received_lsn,
|
||||
disk_consistent_lsn: Lsn::INVALID,
|
||||
remote_consistent_lsn: Lsn::INVALID,
|
||||
replytime: *PG_EPOCH,
|
||||
})
|
||||
}
|
||||
|
||||
// test that ps aggregation works as expected
|
||||
#[test]
|
||||
fn test_ps_feedback() {
|
||||
let mut wss = WalSendersShared::new();
|
||||
push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
|
||||
push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
|
||||
wss.update_ps_feedback();
|
||||
assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
|
||||
assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,10 +4,10 @@
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use pq_proto::PageserverFeedback;
|
||||
use serde::Serialize;
|
||||
use std::cmp::{max, min};
|
||||
|
||||
use std::cmp::max;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::{
|
||||
sync::{mpsc::Sender, watch},
|
||||
time::Instant,
|
||||
@@ -26,7 +26,7 @@ use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
||||
SafekeeperMemState, ServerInfo, Term,
|
||||
};
|
||||
use crate::send_wal::HotStandbyFeedback;
|
||||
use crate::send_wal::WalSenders;
|
||||
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
|
||||
use crate::metrics::FullTimelineInfo;
|
||||
@@ -81,48 +81,12 @@ impl PeersInfo {
|
||||
}
|
||||
}
|
||||
|
||||
/// Replica status update + hot standby feedback
|
||||
#[derive(Debug, Clone, Copy, Serialize)]
|
||||
pub struct ReplicaState {
|
||||
/// last known lsn received by replica
|
||||
pub last_received_lsn: Lsn, // None means we don't know
|
||||
/// combined remote consistent lsn of pageservers
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
/// combined hot standby feedback from all replicas
|
||||
pub hs_feedback: HotStandbyFeedback,
|
||||
/// Replication specific feedback received from pageserver, if any
|
||||
pub pageserver_feedback: Option<PageserverFeedback>,
|
||||
}
|
||||
|
||||
impl Default for ReplicaState {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl ReplicaState {
|
||||
pub fn new() -> ReplicaState {
|
||||
ReplicaState {
|
||||
last_received_lsn: Lsn::MAX,
|
||||
remote_consistent_lsn: Lsn(0),
|
||||
hs_feedback: HotStandbyFeedback {
|
||||
ts: 0,
|
||||
xmin: u64::MAX,
|
||||
catalog_xmin: u64::MAX,
|
||||
},
|
||||
pageserver_feedback: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared state associated with database instance
|
||||
pub struct SharedState {
|
||||
/// Safekeeper object
|
||||
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
|
||||
/// In memory list containing state of peers sent in latest messages from them.
|
||||
peers_info: PeersInfo,
|
||||
/// State of replicas
|
||||
replicas: Vec<Option<ReplicaState>>,
|
||||
/// True when WAL backup launcher oversees the timeline, making sure WAL is
|
||||
/// offloaded; tracking this lets us bother the launcher less.
|
||||
wal_backup_active: bool,
|
||||
@@ -171,7 +135,6 @@ impl SharedState {
|
||||
Ok(Self {
|
||||
sk,
|
||||
peers_info: PeersInfo(vec![]),
|
||||
replicas: vec![],
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
num_computes: 0,
|
||||
@@ -191,7 +154,6 @@ impl SharedState {
|
||||
Ok(Self {
|
||||
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
|
||||
peers_info: PeersInfo(vec![]),
|
||||
replicas: Vec::new(),
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
num_computes: 0,
|
||||
@@ -199,17 +161,17 @@ impl SharedState {
|
||||
})
|
||||
}
|
||||
|
||||
fn is_active(&self) -> bool {
|
||||
fn is_active(&self, remote_consistent_lsn: Lsn) -> bool {
|
||||
self.is_wal_backup_required()
|
||||
// FIXME: add tracking of relevant pageservers and check them here individually,
|
||||
// otherwise migration won't work (we suspend too early).
|
||||
|| self.sk.inmem.remote_consistent_lsn < self.sk.inmem.commit_lsn
|
||||
|| remote_consistent_lsn < self.sk.inmem.commit_lsn
|
||||
}
|
||||
|
||||
/// Mark timeline active/inactive and return whether s3 offloading requires
|
||||
/// start/stop action.
|
||||
fn update_status(&mut self, ttid: TenantTimelineId) -> bool {
|
||||
let is_active = self.is_active();
|
||||
fn update_status(&mut self, remote_consistent_lsn: Lsn, ttid: TenantTimelineId) -> bool {
|
||||
let is_active = self.is_active(remote_consistent_lsn);
|
||||
if self.active != is_active {
|
||||
info!("timeline {} active={} now", ttid, is_active);
|
||||
}
|
||||
@@ -254,68 +216,11 @@ impl SharedState {
|
||||
self.sk.state.server.wal_seg_size as usize
|
||||
}
|
||||
|
||||
/// Get combined state of all alive replicas
|
||||
pub fn get_replicas_state(&self) -> ReplicaState {
|
||||
let mut acc = ReplicaState::new();
|
||||
for state in self.replicas.iter().flatten() {
|
||||
acc.hs_feedback.ts = max(acc.hs_feedback.ts, state.hs_feedback.ts);
|
||||
acc.hs_feedback.xmin = min(acc.hs_feedback.xmin, state.hs_feedback.xmin);
|
||||
acc.hs_feedback.catalog_xmin =
|
||||
min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin);
|
||||
|
||||
// FIXME
|
||||
// If multiple pageservers are streaming WAL and send feedback for the same timeline simultaneously,
|
||||
// this code is not correct.
|
||||
// Now the most advanced feedback is used.
|
||||
// If one pageserver lags when another doesn't, the backpressure won't be activated on compute and lagging
|
||||
// pageserver is prone to timeout errors.
|
||||
//
|
||||
// To choose what feedback to use and resend to compute node,
|
||||
// we need to know which pageserver compute node considers to be main.
|
||||
// See https://github.com/neondatabase/neon/issues/1171
|
||||
//
|
||||
if let Some(pageserver_feedback) = state.pageserver_feedback {
|
||||
if let Some(acc_feedback) = acc.pageserver_feedback {
|
||||
if acc_feedback.last_received_lsn < pageserver_feedback.last_received_lsn {
|
||||
warn!("More than one pageserver is streaming WAL for the timeline. Feedback resolving is not fully supported yet.");
|
||||
acc.pageserver_feedback = Some(pageserver_feedback);
|
||||
}
|
||||
} else {
|
||||
acc.pageserver_feedback = Some(pageserver_feedback);
|
||||
}
|
||||
|
||||
// last lsn received by pageserver
|
||||
// FIXME if multiple pageservers are streaming WAL, last_received_lsn must be tracked per pageserver.
|
||||
// See https://github.com/neondatabase/neon/issues/1171
|
||||
acc.last_received_lsn = Lsn::from(pageserver_feedback.last_received_lsn);
|
||||
|
||||
// When at least one pageserver has preserved data up to remote_consistent_lsn,
|
||||
// safekeeper is free to delete it, so choose max of all pageservers.
|
||||
acc.remote_consistent_lsn = max(
|
||||
Lsn::from(pageserver_feedback.remote_consistent_lsn),
|
||||
acc.remote_consistent_lsn,
|
||||
);
|
||||
}
|
||||
}
|
||||
acc
|
||||
}
|
||||
|
||||
/// Assign new replica ID. We choose first empty cell in the replicas vector
|
||||
/// or extend the vector if there are no free slots.
|
||||
pub fn add_replica(&mut self, state: ReplicaState) -> usize {
|
||||
if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) {
|
||||
self.replicas[pos] = Some(state);
|
||||
return pos;
|
||||
}
|
||||
let pos = self.replicas.len();
|
||||
self.replicas.push(Some(state));
|
||||
pos
|
||||
}
|
||||
|
||||
fn get_safekeeper_info(
|
||||
&self,
|
||||
ttid: &TenantTimelineId,
|
||||
conf: &SafeKeeperConf,
|
||||
remote_consistent_lsn: Lsn,
|
||||
) -> SafekeeperTimelineInfo {
|
||||
SafekeeperTimelineInfo {
|
||||
safekeeper_id: conf.my_id.0,
|
||||
@@ -328,11 +233,7 @@ impl SharedState {
|
||||
// note: this value is not flushed to control file yet and can be lost
|
||||
commit_lsn: self.sk.inmem.commit_lsn.0,
|
||||
// TODO: rework feedbacks to avoid max here
|
||||
remote_consistent_lsn: max(
|
||||
self.get_replicas_state().remote_consistent_lsn,
|
||||
self.sk.inmem.remote_consistent_lsn,
|
||||
)
|
||||
.0,
|
||||
remote_consistent_lsn: remote_consistent_lsn.0,
|
||||
peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: conf.listen_pg_addr.clone(),
|
||||
backup_lsn: self.sk.inmem.backup_lsn.0,
|
||||
@@ -387,6 +288,7 @@ pub struct Timeline {
|
||||
/// Safekeeper and other state, that should remain consistent and synchronized
|
||||
/// with the disk.
|
||||
mutex: Mutex<SharedState>,
|
||||
walsenders: Arc<WalSenders>,
|
||||
|
||||
/// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
|
||||
cancellation_tx: watch::Sender<bool>,
|
||||
@@ -409,6 +311,7 @@ impl Timeline {
|
||||
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
||||
|
||||
let shared_state = SharedState::restore(&conf, &ttid)?;
|
||||
let rcl = shared_state.sk.state.remote_consistent_lsn;
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
|
||||
watch::channel(shared_state.sk.state.commit_lsn);
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
@@ -419,6 +322,7 @@ impl Timeline {
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
mutex: Mutex::new(shared_state),
|
||||
walsenders: WalSenders::new(rcl),
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
timeline_dir: conf.timeline_dir(&ttid),
|
||||
@@ -444,6 +348,7 @@ impl Timeline {
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
mutex: Mutex::new(SharedState::create_new(&conf, &ttid, state)?),
|
||||
walsenders: WalSenders::new(Lsn(0)),
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
timeline_dir: conf.timeline_dir(&ttid),
|
||||
@@ -475,7 +380,7 @@ impl Timeline {
|
||||
match || -> Result<()> {
|
||||
shared_state.sk.persist()?;
|
||||
// TODO: add more initialization steps here
|
||||
shared_state.update_status(self.ttid);
|
||||
self.update_status(shared_state);
|
||||
Ok(())
|
||||
}() {
|
||||
Ok(_) => Ok(()),
|
||||
@@ -531,6 +436,10 @@ impl Timeline {
|
||||
self.mutex.lock()
|
||||
}
|
||||
|
||||
fn update_status(&self, shared_state: &mut SharedState) -> bool {
|
||||
shared_state.update_status(self.get_walsenders().get_remote_consistent_lsn(), self.ttid)
|
||||
}
|
||||
|
||||
/// Register compute connection, starting timeline-related activity if it is
|
||||
/// not running yet.
|
||||
pub async fn on_compute_connect(&self) -> Result<()> {
|
||||
@@ -542,7 +451,7 @@ impl Timeline {
|
||||
{
|
||||
let mut shared_state = self.write_shared_state();
|
||||
shared_state.num_computes += 1;
|
||||
is_wal_backup_action_pending = shared_state.update_status(self.ttid);
|
||||
is_wal_backup_action_pending = self.update_status(&mut shared_state);
|
||||
}
|
||||
// Wake up wal backup launcher, if offloading not started yet.
|
||||
if is_wal_backup_action_pending {
|
||||
@@ -559,7 +468,7 @@ impl Timeline {
|
||||
{
|
||||
let mut shared_state = self.write_shared_state();
|
||||
shared_state.num_computes -= 1;
|
||||
is_wal_backup_action_pending = shared_state.update_status(self.ttid);
|
||||
is_wal_backup_action_pending = self.update_status(&mut shared_state);
|
||||
}
|
||||
// Wake up wal backup launcher, if it is time to stop the offloading.
|
||||
if is_wal_backup_action_pending {
|
||||
@@ -574,26 +483,19 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if walsender should stop sending WAL to pageserver.
|
||||
/// TODO: check this pageserver is actually interested in this timeline.
|
||||
pub fn should_walsender_stop(&self, replica_id: usize) -> bool {
|
||||
/// Returns true if walsender should stop sending WAL to pageserver. We
|
||||
/// terminate it if remote_consistent_lsn reached commit_lsn and there are no
|
||||
/// computes. While there might be nothing to stream already, we learn about
|
||||
/// remote_consistent_lsn update through replication feedback, and we want
|
||||
/// to stop pushing to the broker if pageserver is fully caught up.
|
||||
pub fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return true;
|
||||
}
|
||||
let mut shared_state = self.write_shared_state();
|
||||
let shared_state = self.write_shared_state();
|
||||
if shared_state.num_computes == 0 {
|
||||
let replica_state = shared_state.replicas[replica_id].unwrap();
|
||||
let reported_remote_consistent_lsn = replica_state
|
||||
.pageserver_feedback
|
||||
.map(|f| Lsn(f.remote_consistent_lsn))
|
||||
.unwrap_or(Lsn::INVALID);
|
||||
let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
|
||||
(reported_remote_consistent_lsn!= Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
|
||||
reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
|
||||
if stop {
|
||||
shared_state.update_status(self.ttid);
|
||||
return true;
|
||||
}
|
||||
return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
|
||||
reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
|
||||
}
|
||||
false
|
||||
}
|
||||
@@ -628,13 +530,12 @@ impl Timeline {
|
||||
let mut shared_state = self.write_shared_state();
|
||||
rmsg = shared_state.sk.process_msg(msg)?;
|
||||
|
||||
// if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
|
||||
// if this is AppendResponse, fill in proper pageserver and hot
|
||||
// standby feedback.
|
||||
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
|
||||
let state = shared_state.get_replicas_state();
|
||||
resp.hs_feedback = state.hs_feedback;
|
||||
if let Some(pageserver_feedback) = state.pageserver_feedback {
|
||||
resp.pageserver_feedback = pageserver_feedback;
|
||||
}
|
||||
let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
|
||||
resp.hs_feedback = hs_feedback;
|
||||
resp.pageserver_feedback = ps_feedback;
|
||||
}
|
||||
|
||||
commit_lsn = shared_state.sk.inmem.commit_lsn;
|
||||
@@ -684,19 +585,29 @@ impl Timeline {
|
||||
/// Get safekeeper info for broadcasting to broker and other peers.
|
||||
pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
|
||||
let shared_state = self.write_shared_state();
|
||||
shared_state.get_safekeeper_info(&self.ttid, conf)
|
||||
shared_state.get_safekeeper_info(
|
||||
&self.ttid,
|
||||
conf,
|
||||
self.walsenders.get_remote_consistent_lsn(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Update timeline state with peer safekeeper data.
|
||||
pub async fn record_safekeeper_info(&self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
|
||||
pub async fn record_safekeeper_info(&self, mut sk_info: SafekeeperTimelineInfo) -> Result<()> {
|
||||
// Update local remote_consistent_lsn in memory (in .walsenders) and in
|
||||
// sk_info to pass it down to control file.
|
||||
sk_info.remote_consistent_lsn = self
|
||||
.walsenders
|
||||
.update_remote_consistent_lsn(Lsn(sk_info.remote_consistent_lsn))
|
||||
.0;
|
||||
let is_wal_backup_action_pending: bool;
|
||||
let commit_lsn: Lsn;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state();
|
||||
shared_state.sk.record_safekeeper_info(sk_info)?;
|
||||
let peer_info = PeerInfo::from_sk_info(sk_info, Instant::now());
|
||||
shared_state.sk.record_safekeeper_info(&sk_info)?;
|
||||
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
|
||||
shared_state.peers_info.upsert(&peer_info);
|
||||
is_wal_backup_action_pending = shared_state.update_status(self.ttid);
|
||||
is_wal_backup_action_pending = self.update_status(&mut shared_state);
|
||||
commit_lsn = shared_state.sk.inmem.commit_lsn;
|
||||
}
|
||||
self.commit_lsn_watch_tx.send(commit_lsn)?;
|
||||
@@ -723,22 +634,8 @@ impl Timeline {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Add send_wal replica to the in-memory vector of replicas.
|
||||
pub fn add_replica(&self, state: ReplicaState) -> usize {
|
||||
self.write_shared_state().add_replica(state)
|
||||
}
|
||||
|
||||
/// Update replication replica state.
|
||||
pub fn update_replica_state(&self, id: usize, state: ReplicaState) {
|
||||
let mut shared_state = self.write_shared_state();
|
||||
shared_state.replicas[id] = Some(state);
|
||||
}
|
||||
|
||||
/// Remove send_wal replica from the in-memory vector of replicas.
|
||||
pub fn remove_replica(&self, id: usize) {
|
||||
let mut shared_state = self.write_shared_state();
|
||||
assert!(shared_state.replicas[id].is_some());
|
||||
shared_state.replicas[id] = None;
|
||||
pub fn get_walsenders(&self) -> &Arc<WalSenders> {
|
||||
&self.walsenders
|
||||
}
|
||||
|
||||
/// Returns flush_lsn.
|
||||
@@ -781,16 +678,12 @@ impl Timeline {
|
||||
return None;
|
||||
}
|
||||
|
||||
let ps_feedback = self.walsenders.get_ps_feedback();
|
||||
let state = self.write_shared_state();
|
||||
if state.active {
|
||||
Some(FullTimelineInfo {
|
||||
ttid: self.ttid,
|
||||
replicas: state
|
||||
.replicas
|
||||
.iter()
|
||||
.filter_map(|r| r.as_ref())
|
||||
.copied()
|
||||
.collect(),
|
||||
ps_feedback,
|
||||
wal_backup_active: state.wal_backup_active,
|
||||
timeline_is_active: state.active,
|
||||
num_computes: state.num_computes,
|
||||
@@ -799,6 +692,7 @@ impl Timeline {
|
||||
mem_state: state.sk.inmem.clone(),
|
||||
persisted_state: state.sk.state.clone(),
|
||||
flush_lsn: state.sk.wal_store.flush_lsn(),
|
||||
remote_consistent_lsn: self.get_walsenders().get_remote_consistent_lsn(),
|
||||
wal_storage: state.sk.wal_store.get_metrics(),
|
||||
})
|
||||
} else {
|
||||
@@ -816,7 +710,7 @@ impl Timeline {
|
||||
debug_dump::Memory {
|
||||
is_cancelled: self.is_cancelled(),
|
||||
peers_info_len: state.peers_info.0.len(),
|
||||
replicas: state.replicas.clone(),
|
||||
walsenders: self.walsenders.get_all(),
|
||||
wal_backup_active: state.wal_backup_active,
|
||||
active: state.active,
|
||||
num_computes: state.num_computes,
|
||||
|
||||
@@ -2586,6 +2586,7 @@ class SafekeeperTimelineStatus:
|
||||
commit_lsn: Lsn
|
||||
timeline_start_lsn: Lsn
|
||||
backup_lsn: Lsn
|
||||
peer_horizon_lsn: Lsn
|
||||
remote_consistent_lsn: Lsn
|
||||
|
||||
|
||||
@@ -2643,6 +2644,7 @@ class SafekeeperHttpClient(requests.Session):
|
||||
commit_lsn=Lsn(resj["commit_lsn"]),
|
||||
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
|
||||
backup_lsn=Lsn(resj["backup_lsn"]),
|
||||
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
|
||||
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
|
||||
)
|
||||
|
||||
|
||||
@@ -299,7 +299,7 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
|
||||
raise RuntimeError(
|
||||
f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}"
|
||||
)
|
||||
time.sleep(0.5)
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
# Test that old WAL consumed by peers and pageserver is removed from safekeepers.
|
||||
@@ -383,12 +383,15 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
wait(
|
||||
lambda first_segments=first_segments: all(not os.path.exists(p) for p in first_segments),
|
||||
"first segment get removed",
|
||||
wait_f=lambda http_cli=http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
|
||||
f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}"
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# Wait for something, defined as f() returning True, raising error if this
|
||||
# doesn't happen within timeout seconds.
|
||||
def wait(f, desc, timeout=30):
|
||||
# doesn't happen within timeout seconds, and calling wait_f while waiting.
|
||||
def wait(f, desc, timeout=30, wait_f=None):
|
||||
started_at = time.time()
|
||||
while True:
|
||||
if f():
|
||||
@@ -397,6 +400,8 @@ def wait(f, desc, timeout=30):
|
||||
if elapsed > timeout:
|
||||
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
|
||||
time.sleep(0.5)
|
||||
if wait_f is not None:
|
||||
wait_f()
|
||||
|
||||
|
||||
def is_segment_offloaded(
|
||||
|
||||
Reference in New Issue
Block a user