diff --git a/libs/etcd_broker/src/subscription_value.rs b/libs/etcd_broker/src/subscription_value.rs index d3e2011761..60a5411926 100644 --- a/libs/etcd_broker/src/subscription_value.rs +++ b/libs/etcd_broker/src/subscription_value.rs @@ -29,6 +29,9 @@ pub struct SkTimelineInfo { #[serde_as(as = "Option")] #[serde(default)] pub peer_horizon_lsn: Option, + #[serde_as(as = "Option")] + #[serde(default)] + pub local_start_lsn: Option, /// A connection string to use for WAL receiving. #[serde(default)] pub safekeeper_connstr: Option, diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index 059ce69ca4..f245f7c3d4 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -75,6 +75,12 @@ impl From<[u8; 16]> for Id { } } +impl From for u128 { + fn from(id: Id) -> Self { + u128::from_le_bytes(id.0) + } +} + impl fmt::Display for Id { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(&self.hex_encode()) @@ -136,6 +142,12 @@ macro_rules! id_newtype { } } + impl From<$t> for u128 { + fn from(id: $t) -> Self { + u128::from(id.0) + } + } + impl fmt::Display for $t { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 29179e9871..c843a5c703 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -802,6 +802,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, safekeeper_connstr: None, }, etcd_version: 0, @@ -818,6 +819,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("no commit_lsn".to_string()), }, etcd_version: 0, @@ -834,6 +837,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, safekeeper_connstr: Some("no commit_lsn".to_string()), }, etcd_version: 0, @@ -850,6 +854,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, safekeeper_connstr: None, }, etcd_version: 0, @@ -909,6 +914,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -925,6 +932,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("not advanced Lsn".to_string()), }, etcd_version: 0, @@ -941,6 +950,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("not enough advanced Lsn".to_string()), }, etcd_version: 0, @@ -975,6 +986,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1007,6 +1020,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("smaller commit_lsn".to_string()), }, etcd_version: 0, @@ -1023,6 +1038,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1039,6 +1056,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, 
peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: None, }, etcd_version: 0, @@ -1084,6 +1103,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1100,6 +1121,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1169,6 +1192,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1185,6 +1210,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()), }, etcd_version: 0, @@ -1256,6 +1283,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1327,6 +1356,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 3f55d823cc..dbaa989c9b 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -21,7 +21,8 @@ use metrics::set_build_info_metric; use safekeeper::broker; use safekeeper::control_file; use safekeeper::defaults::{ - DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, + DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG, + DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, }; use safekeeper::http; use safekeeper::remove_wal; @@ -79,12 +80,6 @@ fn main() -> anyhow::Result<()> { .long("pageserver") .takes_value(true), ) - .arg( - Arg::new("recall") - .long("recall") - .takes_value(true) - .help("Period for requestion pageserver to call for replication"), - ) .arg( Arg::new("daemonize") .short('d') @@ -119,6 +114,12 @@ fn main() -> anyhow::Result<()> { .takes_value(true) .help("a prefix to always use when polling/pusing data in etcd from this safekeeper"), ) + .arg( + Arg::new("heartbeat-timeout") + .long("heartbeat-timeout") + .takes_value(true) + .help(formatcp!("Peer is considered dead after not receiving heartbeats from it during this period (default {}s), passed as a human readable duration.", DEFAULT_HEARTBEAT_TIMEOUT.as_secs())) + ) .arg( Arg::new("wal-backup-threads").long("backup-threads").takes_value(true).help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")), ).arg( @@ -127,6 +128,12 @@ fn main() -> anyhow::Result<()> { .takes_value(true) .help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. 
{\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"\", \"bucket_region\":\"\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]//, mirroring structure on the file system.") ) + .arg( + Arg::new("max-offloader-lag") + .long("max-offloader-lag") + .takes_value(true) + .help(formatcp!("Safekeeper won't be elected for WAL offloading if it is lagging for more than this value (default {}MB) in bytes", DEFAULT_MAX_OFFLOADER_LAG / (1 << 20))) + ) .arg( Arg::new("enable-wal-backup") .long("enable-wal-backup") @@ -173,10 +180,6 @@ fn main() -> anyhow::Result<()> { conf.listen_http_addr = addr.to_owned(); } - if let Some(recall) = arg_matches.value_of("recall") { - conf.recall_period = humantime::parse_duration(recall)?; - } - let mut given_id = None; if let Some(given_id_str) = arg_matches.value_of("id") { given_id = Some(NodeId( @@ -194,6 +197,16 @@ fn main() -> anyhow::Result<()> { conf.broker_etcd_prefix = prefix.to_string(); } + if let Some(heartbeat_timeout_str) = arg_matches.value_of("heartbeat-timeout") { + conf.heartbeat_timeout = + humantime::parse_duration(heartbeat_timeout_str).with_context(|| { + format!( + "failed to parse heartbeat-timeout {}", + heartbeat_timeout_str + ) + })?; + } + if let Some(backup_threads) = arg_matches.value_of("wal-backup-threads") { conf.backup_runtime_threads = backup_threads .parse() @@ -206,6 +219,14 @@ fn main() -> anyhow::Result<()> { let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?); } + if let Some(max_offloader_lag_str) = arg_matches.value_of("max-offloader-lag") { + conf.max_offloader_lag = max_offloader_lag_str.parse().with_context(|| { + format!( + "failed to parse max offloader lag {}", + max_offloader_lag_str + ) + })?; + } // Seems like there is no better way to accept bool values explicitly in clap. conf.wal_backup_enabled = arg_matches .value_of("enable-wal-backup") diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 6a2456ecda..acc45625ca 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -1,6 +1,5 @@ //! Communication with etcd, providing safekeeper peers and pageserver coordination. 
-use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; @@ -12,11 +11,9 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::collections::HashSet; use std::time::Duration; -use tokio::spawn; use tokio::task::JoinHandle; use tokio::{runtime, time::sleep}; use tracing::*; -use url::Url; use crate::GlobalTimelines; use crate::SafeKeeperConf; @@ -56,113 +53,6 @@ fn timeline_safekeeper_path( ) } -pub struct Election { - pub election_name: String, - pub candidate_name: String, - pub broker_endpoints: Vec, -} - -impl Election { - pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec) -> Self { - Self { - election_name, - candidate_name, - broker_endpoints, - } - } -} - -pub struct ElectionLeader { - client: Client, - keep_alive: JoinHandle>, -} - -impl ElectionLeader { - pub async fn check_am_i( - &mut self, - election_name: String, - candidate_name: String, - ) -> Result { - let resp = self.client.leader(election_name).await?; - - let kv = resp - .kv() - .ok_or_else(|| anyhow!("failed to get leader response"))?; - let leader = kv.value_str()?; - - Ok(leader == candidate_name) - } - - pub async fn give_up(self) { - self.keep_alive.abort(); - // TODO: it'll be wise to resign here but it'll happen after lease expiration anyway - // should we await for keep alive termination? - let _ = self.keep_alive.await; - } -} - -pub async fn get_leader(req: &Election, leader: &mut Option) -> Result<()> { - let mut client = Client::connect(req.broker_endpoints.clone(), None) - .await - .context("Could not connect to etcd")?; - - let lease = client - .lease_grant(LEASE_TTL_SEC, None) - .await - .context("Could not acquire a lease"); - - let lease_id = lease.map(|l| l.id()).unwrap(); - - // kill previous keepalive, if any - if let Some(l) = leader.take() { - l.give_up().await; - } - - let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id)); - // immediately save handle to kill task if we get canceled below - *leader = Some(ElectionLeader { - client: client.clone(), - keep_alive, - }); - - client - .campaign( - req.election_name.clone(), - req.candidate_name.clone(), - lease_id, - ) - .await?; - - Ok(()) -} - -async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> { - let (mut keeper, mut ka_stream) = client - .lease_keep_alive(lease_id) - .await - .context("failed to create keepalive stream")?; - - loop { - let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); - - keeper - .keep_alive() - .await - .context("failed to send LeaseKeepAliveRequest")?; - - ka_stream - .message() - .await - .context("failed to receive LeaseKeepAliveResponse")?; - - sleep(push_interval).await; - } -} - -pub fn get_candiate_name(system_id: NodeId) -> String { - format!("id_{system_id}") -} - async fn push_sk_info( ttid: TenantTimelineId, mut client: Client, @@ -236,7 +126,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { let handles = active_tlis .iter() .map(|tli| { - let sk_info = tli.get_public_info(&conf); + let sk_info = tli.get_safekeer_info(&conf); let key = timeline_safekeeper_path(conf.broker_etcd_prefix.clone(), tli.ttid, conf.my_id); let lease = leases.remove(&tli.ttid).unwrap(); @@ -282,6 +172,9 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { Some(new_info) => { // note: there are blocking operations below, but it's considered fine for now if let Ok(tli) = GlobalTimelines::get(new_info.key.id) { + // Note that we also receive *our own* info. 
That's + // important, as it is used as an indication of live + // connection to the broker. tli.record_safekeeper_info(&new_info.value, new_info.key.node_id) .await? } diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index 1ce9186085..856c164be8 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -1,6 +1,7 @@ //! Code to deal with safekeeper control file upgrades use crate::safekeeper::{ - AcceptorState, Peers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry, + AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, + TermSwitchEntry, }; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; @@ -134,7 +135,7 @@ pub struct SafeKeeperStateV4 { // fundamental; but state is saved here only for informational purposes and // obviously can be stale. (Currently not saved at all, but let's provision // place to have less file version upgrades). - pub peers: Peers, + pub peers: PersistedPeers, } pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result { @@ -165,7 +166,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); // migrate to hexing some ids } else if version == 2 { @@ -188,7 +189,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); // migrate to moving tenant_id/timeline_id to the top and adding some lsns } else if version == 3 { @@ -211,7 +212,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); // migrate to having timeline_start_lsn } else if version == 4 { @@ -234,7 +235,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn::INVALID, peer_horizon_lsn: oldstate.peer_horizon_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); } else if version == 5 { info!("reading safekeeper control file version {}", version); diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index e38a5a4633..81bc248b93 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -1,4 +1,6 @@ -use defaults::DEFAULT_WAL_BACKUP_RUNTIME_THREADS; +use defaults::{ + DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, +}; // use remote_storage::RemoteStorageConfig; use std::path::PathBuf; @@ -36,6 +38,8 @@ pub mod defaults { pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8; + pub const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); + pub const DEFAULT_MAX_OFFLOADER_LAG: u64 = 128 * (1 << 20); } #[derive(Debug, Clone)] @@ -60,6 +64,8 @@ pub struct SafeKeeperConf { pub broker_endpoints: Vec, pub broker_etcd_prefix: String, pub auth_validation_public_key_path: Option, + pub heartbeat_timeout: Duration, + pub max_offloader_lag: u64, } impl SafeKeeperConf { @@ -92,6 +98,8 @@ impl Default for SafeKeeperConf { backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS, wal_backup_enabled: true, auth_validation_public_key_path: None, + heartbeat_timeout: 
DEFAULT_HEARTBEAT_TIMEOUT, + max_offloader_lag: DEFAULT_MAX_OFFLOADER_LAG, } } } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 7b11aaf92a..3f9b70f282 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -11,6 +11,7 @@ use std::cmp::max; use std::cmp::min; use std::fmt; use std::io::Read; + use tracing::*; use crate::control_file; @@ -132,9 +133,8 @@ pub struct ServerInfo { pub wal_seg_size: u32, } -/// Data published by safekeeper to the peers #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PeerInfo { +pub struct PersistedPeerInfo { /// LSN up to which safekeeper offloaded WAL to s3. backup_lsn: Lsn, /// Term of the last entry. @@ -145,7 +145,7 @@ pub struct PeerInfo { commit_lsn: Lsn, } -impl PeerInfo { +impl PersistedPeerInfo { fn new() -> Self { Self { backup_lsn: Lsn::INVALID, @@ -156,10 +156,8 @@ impl PeerInfo { } } -// vector-based node id -> peer state map with very limited functionality we -// need/ #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Peers(pub Vec<(NodeId, PeerInfo)>); +pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>); /// Persistent information stored on safekeeper node /// On disk data is prefixed by magic and format version and followed by checksum. @@ -203,7 +201,7 @@ pub struct SafeKeeperState { // fundamental; but state is saved here only for informational purposes and // obviously can be stale. (Currently not saved at all, but let's provision // place to have less file version upgrades). - pub peers: Peers, + pub peers: PersistedPeers, } #[derive(Debug, Clone)] @@ -240,7 +238,12 @@ impl SafeKeeperState { backup_lsn: local_start_lsn, peer_horizon_lsn: local_start_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()), + peers: PersistedPeers( + peers + .iter() + .map(|p| (*p, PersistedPeerInfo::new())) + .collect(), + ), } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 3fb77bf582..0b5935349b 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -7,7 +7,7 @@ use etcd_broker::subscription_value::SkTimelineInfo; use postgres_ffi::XLogSegNo; -use tokio::sync::watch; +use tokio::{sync::watch, time::Instant}; use std::cmp::{max, min}; @@ -26,7 +26,7 @@ use utils::{ use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, - SafekeeperMemState, ServerInfo, + SafekeeperMemState, ServerInfo, Term, }; use crate::send_wal::HotStandbyFeedback; use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; @@ -36,6 +36,53 @@ use crate::wal_storage; use crate::wal_storage::Storage as wal_storage_iface; use crate::SafeKeeperConf; +/// Things safekeeper should know about timeline state on peers. +#[derive(Debug, Clone)] +pub struct PeerInfo { + pub sk_id: NodeId, + /// Term of the last entry. + _last_log_term: Term, + /// LSN of the last record. + _flush_lsn: Lsn, + pub commit_lsn: Lsn, + /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new + /// sk since backup_lsn. + pub local_start_lsn: Lsn, + /// When info was received. 
+ ts: Instant, +} + +impl PeerInfo { + fn from_sk_info(sk_id: NodeId, sk_info: &SkTimelineInfo, ts: Instant) -> PeerInfo { + PeerInfo { + sk_id, + _last_log_term: sk_info.last_log_term.unwrap_or(0), + _flush_lsn: sk_info.flush_lsn.unwrap_or(Lsn::INVALID), + commit_lsn: sk_info.commit_lsn.unwrap_or(Lsn::INVALID), + local_start_lsn: sk_info.local_start_lsn.unwrap_or(Lsn::INVALID), + ts, + } + } +} + +// vector-based node id -> peer state map with very limited functionality we +// need. +#[derive(Debug, Clone, Default)] +pub struct PeersInfo(pub Vec); + +impl PeersInfo { + fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> { + self.0.iter_mut().find(|p| p.sk_id == id) + } + + fn upsert(&mut self, p: &PeerInfo) { + match self.get(p.sk_id) { + Some(rp) => *rp = p.clone(), + None => self.0.push(p.clone()), + } + } +} + /// Replica status update + hot standby feedback #[derive(Debug, Clone, Copy)] pub struct ReplicaState { @@ -74,6 +121,8 @@ impl ReplicaState { pub struct SharedState { /// Safekeeper object sk: SafeKeeper, + /// State of peers as we know it. + peers_info: PeersInfo, /// State of replicas replicas: Vec>, /// True when WAL backup launcher oversees the timeline, making sure WAL is @@ -123,7 +172,8 @@ impl SharedState { Ok(Self { sk, - replicas: Vec::new(), + peers_info: PeersInfo(vec![]), + replicas: vec![], wal_backup_active: false, active: false, num_computes: 0, @@ -142,6 +192,7 @@ impl SharedState { Ok(Self { sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?, + peers_info: PeersInfo(vec![]), replicas: Vec::new(), wal_backup_active: false, active: false, @@ -268,6 +319,24 @@ impl SharedState { self.replicas.push(Some(state)); pos } + + fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { + SkTimelineInfo { + last_log_term: Some(self.sk.get_epoch()), + flush_lsn: Some(self.sk.wal_store.flush_lsn()), + // note: this value is not flushed to control file yet and can be lost + commit_lsn: Some(self.sk.inmem.commit_lsn), + // TODO: rework feedbacks to avoid max here + remote_consistent_lsn: Some(max( + self.get_replicas_state().remote_consistent_lsn, + self.sk.inmem.remote_consistent_lsn, + )), + peer_horizon_lsn: Some(self.sk.inmem.peer_horizon_lsn), + safekeeper_connstr: Some(conf.listen_pg_addr.clone()), + backup_lsn: Some(self.sk.inmem.backup_lsn), + local_start_lsn: Some(self.sk.state.local_start_lsn), + } + } } #[derive(Debug, thiserror::Error)] @@ -632,36 +701,25 @@ impl Timeline { Ok(()) } - /// Return public safekeeper info for broadcasting to broker and other peers. - pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { + /// Get safekeeper info for broadcasting to broker and other peers. + pub fn get_safekeer_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { let shared_state = self.write_shared_state(); - SkTimelineInfo { - last_log_term: Some(shared_state.sk.get_epoch()), - flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()), - // note: this value is not flushed to control file yet and can be lost - commit_lsn: Some(shared_state.sk.inmem.commit_lsn), - // TODO: rework feedbacks to avoid max here - remote_consistent_lsn: Some(max( - shared_state.get_replicas_state().remote_consistent_lsn, - shared_state.sk.inmem.remote_consistent_lsn, - )), - peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), - safekeeper_connstr: Some(conf.listen_pg_addr.clone()), - backup_lsn: Some(shared_state.sk.inmem.backup_lsn), - } + shared_state.get_safekeeper_info(conf) } /// Update timeline state with peer safekeeper data. 
pub async fn record_safekeeper_info( &self, sk_info: &SkTimelineInfo, - _sk_id: NodeId, + sk_id: NodeId, ) -> Result<()> { let is_wal_backup_action_pending: bool; let commit_lsn: Lsn; { let mut shared_state = self.write_shared_state(); shared_state.sk.record_safekeeper_info(sk_info)?; + let peer_info = PeerInfo::from_sk_info(sk_id, sk_info, Instant::now()); + shared_state.peers_info.upsert(&peer_info); is_wal_backup_action_pending = shared_state.update_status(self.ttid); commit_lsn = shared_state.sk.inmem.commit_lsn; } @@ -673,6 +731,22 @@ impl Timeline { Ok(()) } + /// Get our latest view of alive peers status on the timeline. + /// We pass our own info through the broker as well, so when we don't have connection + /// to the broker returned vec is empty. + pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec { + let shared_state = self.write_shared_state(); + let now = Instant::now(); + shared_state + .peers_info + .0 + .iter() + // Regard peer as absent if we haven't heard from it within heartbeat_timeout. + .filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout) + .cloned() + .collect() + } + /// Add send_wal replica to the in-memory vector of replicas. pub fn add_replica(&self, state: ReplicaState) -> usize { self.write_shared_state().add_replica(state) diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index c82a003161..797dab86b0 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -1,8 +1,7 @@ use anyhow::{Context, Result}; -use etcd_broker::subscription_key::{ - NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind, -}; + use tokio::task::JoinHandle; +use utils::id::NodeId; use std::cmp::min; use std::collections::HashMap; @@ -26,14 +25,11 @@ use tracing::*; use utils::{id::TenantTimelineId, lsn::Lsn}; -use crate::broker::{Election, ElectionLeader}; -use crate::timeline::Timeline; -use crate::{broker, GlobalTimelines, SafeKeeperConf}; +use crate::timeline::{PeerInfo, Timeline}; +use crate::{GlobalTimelines, SafeKeeperConf}; use once_cell::sync::OnceCell; -const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000; - const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000; @@ -70,47 +66,100 @@ struct WalBackupTimelineEntry { handle: Option, } -/// Start per timeline task, if it makes sense for this safekeeper to offload. -fn consider_start_task( +async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) { + if let Some(wb_handle) = entry.handle.take() { + // Tell the task to shutdown. Error means task exited earlier, that's ok. + let _ = wb_handle.shutdown_tx.send(()).await; + // Await the task itself. TODO: restart panicked tasks earlier. + if let Err(e) = wb_handle.handle.await { + warn!("WAL backup task for {} panicked: {}", ttid, e); + } + } +} + +/// The goal is to ensure that normally only one safekeepers offloads. However, +/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short +/// time we have several ones as they PUT the same files. Also, +/// - frequently changing the offloader would be bad; +/// - electing seriously lagging safekeeper is undesirable; +/// So we deterministically choose among the reasonably caught up candidates. +fn determine_offloader( + alive_peers: &[PeerInfo], + wal_backup_lsn: Lsn, + ttid: TenantTimelineId, + conf: &SafeKeeperConf, +) -> (Option, String) { + // TODO: remove this once we fill newly joined safekeepers since backup_lsn. 
+ let capable_peers = alive_peers + .iter() + .filter(|p| p.local_start_lsn <= wal_backup_lsn); + match alive_peers.iter().map(|p| p.commit_lsn).max() { + None => (None, "no connected peers to elect from".to_string()), + Some(max_commit_lsn) => { + let threshold = max_commit_lsn + .checked_sub(conf.max_offloader_lag) + .unwrap_or(Lsn(0)); + let mut caughtup_peers = capable_peers + .clone() + .filter(|p| p.commit_lsn >= threshold) + .collect::>(); + caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id)); + + // To distribute the load, shift by timeline_id. + let offloader = caughtup_peers + [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize] + .sk_id; + + ( + Some(offloader), + format!( + "elected {} among {:?} peers, with {} of them being caughtup", + offloader, + capable_peers + .map(|p| (p.sk_id, p.commit_lsn)) + .collect::>(), + caughtup_peers.len() + ), + ) + } + } +} + +/// Based on peer information determine which safekeeper should offload; if it +/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task +/// is running, kill it. +async fn update_task( conf: &SafeKeeperConf, ttid: TenantTimelineId, - task: &mut WalBackupTimelineEntry, + entry: &mut WalBackupTimelineEntry, ) { - if !task.timeline.can_wal_backup() { - return; + let alive_peers = entry.timeline.get_peers(conf); + let wal_backup_lsn = entry.timeline.get_wal_backup_lsn(); + let (offloader, election_dbg_str) = + determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf); + let elected_me = Some(conf.my_id) == offloader; + + if elected_me != (entry.handle.is_some()) { + if elected_me { + info!("elected for backup {}: {}", ttid, election_dbg_str); + + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + let timeline_dir = conf.timeline_dir(&ttid); + + let handle = tokio::spawn( + backup_task_main(ttid, timeline_dir, shutdown_rx) + .instrument(info_span!("WAL backup task", ttid = %ttid)), + ); + + entry.handle = Some(WalBackupTaskHandle { + shutdown_tx, + handle, + }); + } else { + info!("stepping down from backup {}: {}", ttid, election_dbg_str); + shut_down_task(ttid, entry).await; + } } - info!("starting WAL backup task for {}", ttid); - - // TODO: decide who should offload right here by simply checking current - // state instead of running elections in offloading task. - let election_name = SubscriptionKey { - cluster_prefix: conf.broker_etcd_prefix.clone(), - kind: SubscriptionKind::Operation( - ttid, - NodeKind::Safekeeper, - OperationKind::Safekeeper(SkOperationKind::WalBackup), - ), - } - .watch_key(); - let my_candidate_name = broker::get_candiate_name(conf.my_id); - let election = broker::Election::new( - election_name, - my_candidate_name, - conf.broker_endpoints.clone(), - ); - - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let timeline_dir = conf.timeline_dir(&ttid); - - let handle = tokio::spawn( - backup_task_main(ttid, timeline_dir, shutdown_rx, election) - .instrument(info_span!("WAL backup task", ttid = %ttid)), - ); - - task.handle = Some(WalBackupTaskHandle { - shutdown_tx, - handle, - }); } const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000; @@ -158,27 +207,20 @@ async fn wal_backup_launcher_main_loop( timeline, handle: None, }); - consider_start_task(&conf, ttid, entry); + update_task(&conf, ttid, entry).await; } else { // need to stop the task info!("stopping WAL backup task for {}", ttid); - - let entry = tasks.remove(&ttid).unwrap(); - if let Some(wb_handle) = entry.handle { - // Tell the task to shutdown. Error means task exited earlier, that's ok. 
- let _ = wb_handle.shutdown_tx.send(()).await; - // Await the task itself. TODO: restart panicked tasks earlier. - if let Err(e) = wb_handle.handle.await { - warn!("WAL backup task for {} panicked: {}", ttid, e); - } - } + let mut entry = tasks.remove(&ttid).unwrap(); + shut_down_task(ttid, &mut entry).await; } } } - // Start known tasks, if needed and possible. + // For each timeline needing offloading, check if this safekeeper + // should do the job and start/stop the task accordingly. _ = ticker.tick() => { - for (ttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) { - consider_start_task(&conf, *ttid, entry); + for (ttid, entry) in tasks.iter_mut() { + update_task(&conf, *ttid, entry).await; } } } @@ -190,17 +232,13 @@ struct WalBackupTask { timeline_dir: PathBuf, wal_seg_size: usize, commit_lsn_watch_rx: watch::Receiver, - leader: Option, - election: Election, } -/// Offload single timeline. Called only after we checked that backup -/// is required (wal_backup_attend) and possible (can_wal_backup). +/// Offload single timeline. async fn backup_task_main( ttid: TenantTimelineId, timeline_dir: PathBuf, mut shutdown_rx: Receiver<()>, - election: Election, ) { info!("started"); let res = GlobalTimelines::get(ttid); @@ -215,8 +253,6 @@ async fn backup_task_main( commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), timeline: tli, timeline_dir, - leader: None, - election, }; // task is spinned up only when wal_seg_size already initialized @@ -229,9 +265,6 @@ async fn backup_task_main( canceled = true; } } - if let Some(l) = wb.leader { - l.give_up().await; - } info!("task {}", if canceled { "canceled" } else { "terminated" }); } @@ -239,107 +272,68 @@ impl WalBackupTask { async fn run(&mut self) { let mut backup_lsn = Lsn(0); - // election loop + let mut retry_attempt = 0u32; + // offload loop loop { - let mut retry_attempt = 0u32; + if retry_attempt == 0 { + // wait for new WAL to arrive + if let Err(e) = self.commit_lsn_watch_rx.changed().await { + // should never happen, as we hold Arc to timeline. + error!("commit_lsn watch shut down: {:?}", e); + return; + } + } else { + // or just sleep if we errored previously + let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS; + if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) + { + retry_delay = min(retry_delay, backoff_delay); + } + sleep(Duration::from_millis(retry_delay)).await; + } - info!("acquiring leadership"); - if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await { - error!("error during leader election {:?}", e); - sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await; + let commit_lsn = *self.commit_lsn_watch_rx.borrow(); + + // Note that backup_lsn can be higher than commit_lsn if we + // don't have much local WAL and others already uploaded + // segments we don't even have. + if backup_lsn.segment_number(self.wal_seg_size) + >= commit_lsn.segment_number(self.wal_seg_size) + { + continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ + } + // Perhaps peers advanced the position, check shmem value. + backup_lsn = self.timeline.get_wal_backup_lsn(); + if backup_lsn.segment_number(self.wal_seg_size) + >= commit_lsn.segment_number(self.wal_seg_size) + { continue; } - info!("acquired leadership"); - // offload loop - loop { - if retry_attempt == 0 { - // wait for new WAL to arrive - if let Err(e) = self.commit_lsn_watch_rx.changed().await { - // should never happen, as we hold Arc to timeline. 
- error!("commit_lsn watch shut down: {:?}", e); + match backup_lsn_range( + backup_lsn, + commit_lsn, + self.wal_seg_size, + &self.timeline_dir, + ) + .await + { + Ok(backup_lsn_result) => { + backup_lsn = backup_lsn_result; + let res = self.timeline.set_wal_backup_lsn(backup_lsn_result); + if let Err(e) = res { + error!("backup error: {}", e); return; } - } else { - // or just sleep if we errored previously - let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS; - if let Some(backoff_delay) = - UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) - { - retry_delay = min(retry_delay, backoff_delay); - } - sleep(Duration::from_millis(retry_delay)).await; + retry_attempt = 0; } + Err(e) => { + error!( + "failed while offloading range {}-{}: {:?}", + backup_lsn, commit_lsn, e + ); - let commit_lsn = *self.commit_lsn_watch_rx.borrow(); - - // Note that backup_lsn can be higher than commit_lsn if we - // don't have much local WAL and others already uploaded - // segments we don't even have. - if backup_lsn.segment_number(self.wal_seg_size) - >= commit_lsn.segment_number(self.wal_seg_size) - { - continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ - } - // Perhaps peers advanced the position, check shmem value. - backup_lsn = self.timeline.get_wal_backup_lsn(); - if backup_lsn.segment_number(self.wal_seg_size) - >= commit_lsn.segment_number(self.wal_seg_size) - { - continue; - } - - if let Some(l) = self.leader.as_mut() { - // Optimization idea for later: - // Avoid checking election leader every time by returning current lease grant expiration time - // Re-check leadership only after expiration time, - // such approach would reduce overhead on write-intensive workloads - - match l - .check_am_i( - self.election.election_name.clone(), - self.election.candidate_name.clone(), - ) - .await - { - Ok(leader) => { - if !leader { - info!("lost leadership"); - break; - } - } - Err(e) => { - warn!("error validating leader, {:?}", e); - break; - } - } - } - - match backup_lsn_range( - backup_lsn, - commit_lsn, - self.wal_seg_size, - &self.timeline_dir, - ) - .await - { - Ok(backup_lsn_result) => { - backup_lsn = backup_lsn_result; - let res = self.timeline.set_wal_backup_lsn(backup_lsn_result); - if let Err(e) = res { - error!("backup error: {}", e); - return; - } - retry_attempt = 0; - } - Err(e) => { - error!( - "failed while offloading range {}-{}: {:?}", - backup_lsn, commit_lsn, e - ); - - retry_attempt = min(retry_attempt + 1, u32::MAX); - } + retry_attempt = min(retry_attempt + 1, u32::MAX); } } }