Determine safekeeper for offloading WAL without etcd election API.

This API is rather pointless, as sane choice anyway requires knowledge of peers
status and leaders lifetime in any case can intersect, which is fine for us --
so manual elections are straightforward. Here, we deterministically choose among
the reasonably caught up safekeepers, shifting by timeline id to spread the
load.

A step towards custom broker https://github.com/neondatabase/neon/issues/2394
This commit is contained in:
Arseny Sher
2022-10-16 15:51:21 +04:00
parent 5d6553d41d
commit 40a56a302a
10 changed files with 358 additions and 318 deletions

View File

@@ -29,6 +29,9 @@ pub struct SkTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub local_start_lsn: Option<Lsn>,
/// A connection string to use for WAL receiving.
#[serde(default)]
pub safekeeper_connstr: Option<String>,

View File

@@ -75,6 +75,12 @@ impl From<[u8; 16]> for Id {
}
}
impl From<Id> for u128 {
fn from(id: Id) -> Self {
u128::from_le_bytes(id.0)
}
}
impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.hex_encode())
@@ -136,6 +142,12 @@ macro_rules! id_newtype {
}
}
impl From<$t> for u128 {
fn from(id: $t) -> Self {
u128::from(id.0)
}
}
impl fmt::Display for $t {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)

View File

@@ -802,6 +802,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -818,6 +819,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
},
etcd_version: 0,
@@ -834,6 +837,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
},
etcd_version: 0,
@@ -850,6 +854,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -909,6 +914,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -925,6 +932,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not advanced Lsn".to_string()),
},
etcd_version: 0,
@@ -941,6 +950,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
},
etcd_version: 0,
@@ -975,6 +986,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1007,6 +1020,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("smaller commit_lsn".to_string()),
},
etcd_version: 0,
@@ -1023,6 +1038,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1039,6 +1056,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -1084,6 +1103,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1100,6 +1121,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1169,6 +1192,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1185,6 +1210,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
},
etcd_version: 0,
@@ -1256,6 +1283,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1327,6 +1356,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,

View File

@@ -21,7 +21,8 @@ use metrics::set_build_info_metric;
use safekeeper::broker;
use safekeeper::control_file;
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG,
DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
use safekeeper::http;
use safekeeper::remove_wal;
@@ -79,12 +80,6 @@ fn main() -> anyhow::Result<()> {
.long("pageserver")
.takes_value(true),
)
.arg(
Arg::new("recall")
.long("recall")
.takes_value(true)
.help("Period for requestion pageserver to call for replication"),
)
.arg(
Arg::new("daemonize")
.short('d')
@@ -119,6 +114,12 @@ fn main() -> anyhow::Result<()> {
.takes_value(true)
.help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
)
.arg(
Arg::new("heartbeat-timeout")
.long("heartbeat-timeout")
.takes_value(true)
.help(formatcp!("Peer is considered dead after not receiving heartbeats from it during this period (default {}s), passed as a human readable duration.", DEFAULT_HEARTBEAT_TIMEOUT.as_secs()))
)
.arg(
Arg::new("wal-backup-threads").long("backup-threads").takes_value(true).help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")),
).arg(
@@ -127,6 +128,12 @@ fn main() -> anyhow::Result<()> {
.takes_value(true)
.help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"<BUCKETNAME>\", \"bucket_region\":\"<REGION>\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring structure on the file system.")
)
.arg(
Arg::new("max-offloader-lag")
.long("max-offloader-lag")
.takes_value(true)
.help(formatcp!("Safekeeper won't be elected for WAL offloading if it is lagging for more than this value (default {}MB) in bytes", DEFAULT_MAX_OFFLOADER_LAG / (1 << 20)))
)
.arg(
Arg::new("enable-wal-backup")
.long("enable-wal-backup")
@@ -173,10 +180,6 @@ fn main() -> anyhow::Result<()> {
conf.listen_http_addr = addr.to_owned();
}
if let Some(recall) = arg_matches.value_of("recall") {
conf.recall_period = humantime::parse_duration(recall)?;
}
let mut given_id = None;
if let Some(given_id_str) = arg_matches.value_of("id") {
given_id = Some(NodeId(
@@ -194,6 +197,16 @@ fn main() -> anyhow::Result<()> {
conf.broker_etcd_prefix = prefix.to_string();
}
if let Some(heartbeat_timeout_str) = arg_matches.value_of("heartbeat-timeout") {
conf.heartbeat_timeout =
humantime::parse_duration(heartbeat_timeout_str).with_context(|| {
format!(
"failed to parse heartbeat-timeout {}",
heartbeat_timeout_str
)
})?;
}
if let Some(backup_threads) = arg_matches.value_of("wal-backup-threads") {
conf.backup_runtime_threads = backup_threads
.parse()
@@ -206,6 +219,14 @@ fn main() -> anyhow::Result<()> {
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?);
}
if let Some(max_offloader_lag_str) = arg_matches.value_of("max-offloader-lag") {
conf.max_offloader_lag = max_offloader_lag_str.parse().with_context(|| {
format!(
"failed to parse max offloader lag {}",
max_offloader_lag_str
)
})?;
}
// Seems like there is no better way to accept bool values explicitly in clap.
conf.wal_backup_enabled = arg_matches
.value_of("enable-wal-backup")

View File

@@ -1,6 +1,5 @@
//! Communication with etcd, providing safekeeper peers and pageserver coordination.
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
@@ -12,11 +11,9 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use url::Url;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -56,113 +53,6 @@ fn timeline_safekeeper_path(
)
}
pub struct Election {
pub election_name: String,
pub candidate_name: String,
pub broker_endpoints: Vec<Url>,
}
impl Election {
pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec<Url>) -> Self {
Self {
election_name,
candidate_name,
broker_endpoints,
}
}
}
pub struct ElectionLeader {
client: Client,
keep_alive: JoinHandle<Result<()>>,
}
impl ElectionLeader {
pub async fn check_am_i(
&mut self,
election_name: String,
candidate_name: String,
) -> Result<bool> {
let resp = self.client.leader(election_name).await?;
let kv = resp
.kv()
.ok_or_else(|| anyhow!("failed to get leader response"))?;
let leader = kv.value_str()?;
Ok(leader == candidate_name)
}
pub async fn give_up(self) {
self.keep_alive.abort();
// TODO: it'll be wise to resign here but it'll happen after lease expiration anyway
// should we await for keep alive termination?
let _ = self.keep_alive.await;
}
}
pub async fn get_leader(req: &Election, leader: &mut Option<ElectionLeader>) -> Result<()> {
let mut client = Client::connect(req.broker_endpoints.clone(), None)
.await
.context("Could not connect to etcd")?;
let lease = client
.lease_grant(LEASE_TTL_SEC, None)
.await
.context("Could not acquire a lease");
let lease_id = lease.map(|l| l.id()).unwrap();
// kill previous keepalive, if any
if let Some(l) = leader.take() {
l.give_up().await;
}
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
// immediately save handle to kill task if we get canceled below
*leader = Some(ElectionLeader {
client: client.clone(),
keep_alive,
});
client
.campaign(
req.election_name.clone(),
req.candidate_name.clone(),
lease_id,
)
.await?;
Ok(())
}
async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
let (mut keeper, mut ka_stream) = client
.lease_keep_alive(lease_id)
.await
.context("failed to create keepalive stream")?;
loop {
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
sleep(push_interval).await;
}
}
pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{system_id}")
}
async fn push_sk_info(
ttid: TenantTimelineId,
mut client: Client,
@@ -236,7 +126,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let handles = active_tlis
.iter()
.map(|tli| {
let sk_info = tli.get_public_info(&conf);
let sk_info = tli.get_safekeer_info(&conf);
let key =
timeline_safekeeper_path(conf.broker_etcd_prefix.clone(), tli.ttid, conf.my_id);
let lease = leases.remove(&tli.ttid).unwrap();
@@ -282,6 +172,9 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
Some(new_info) => {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(new_info.key.id) {
// Note that we also receive *our own* info. That's
// important, as it is used as an indication of live
// connection to the broker.
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
}

View File

@@ -1,6 +1,7 @@
//! Code to deal with safekeeper control file upgrades
use crate::safekeeper::{
AcceptorState, Peers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry,
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory,
TermSwitchEntry,
};
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
@@ -134,7 +135,7 @@ pub struct SafeKeeperStateV4 {
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: Peers,
pub peers: PersistedPeers,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
@@ -165,7 +166,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
// migrate to hexing some ids
} else if version == 2 {
@@ -188,7 +189,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
// migrate to moving tenant_id/timeline_id to the top and adding some lsns
} else if version == 3 {
@@ -211,7 +212,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
// migrate to having timeline_start_lsn
} else if version == 4 {
@@ -234,7 +235,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn::INVALID,
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
} else if version == 5 {
info!("reading safekeeper control file version {}", version);

View File

@@ -1,4 +1,6 @@
use defaults::DEFAULT_WAL_BACKUP_RUNTIME_THREADS;
use defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
//
use remote_storage::RemoteStorageConfig;
use std::path::PathBuf;
@@ -36,6 +38,8 @@ pub mod defaults {
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
pub const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
pub const DEFAULT_MAX_OFFLOADER_LAG: u64 = 128 * (1 << 20);
}
#[derive(Debug, Clone)]
@@ -60,6 +64,8 @@ pub struct SafeKeeperConf {
pub broker_endpoints: Vec<Url>,
pub broker_etcd_prefix: String,
pub auth_validation_public_key_path: Option<PathBuf>,
pub heartbeat_timeout: Duration,
pub max_offloader_lag: u64,
}
impl SafeKeeperConf {
@@ -92,6 +98,8 @@ impl Default for SafeKeeperConf {
backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
wal_backup_enabled: true,
auth_validation_public_key_path: None,
heartbeat_timeout: DEFAULT_HEARTBEAT_TIMEOUT,
max_offloader_lag: DEFAULT_MAX_OFFLOADER_LAG,
}
}
}

View File

@@ -11,6 +11,7 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use tracing::*;
use crate::control_file;
@@ -132,9 +133,8 @@ pub struct ServerInfo {
pub wal_seg_size: u32,
}
/// Data published by safekeeper to the peers
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub struct PersistedPeerInfo {
/// LSN up to which safekeeper offloaded WAL to s3.
backup_lsn: Lsn,
/// Term of the last entry.
@@ -145,7 +145,7 @@ pub struct PeerInfo {
commit_lsn: Lsn,
}
impl PeerInfo {
impl PersistedPeerInfo {
fn new() -> Self {
Self {
backup_lsn: Lsn::INVALID,
@@ -156,10 +156,8 @@ impl PeerInfo {
}
}
// vector-based node id -> peer state map with very limited functionality we
// need/
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Peers(pub Vec<(NodeId, PeerInfo)>);
pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
/// Persistent information stored on safekeeper node
/// On disk data is prefixed by magic and format version and followed by checksum.
@@ -203,7 +201,7 @@ pub struct SafeKeeperState {
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: Peers,
pub peers: PersistedPeers,
}
#[derive(Debug, Clone)]
@@ -240,7 +238,12 @@ impl SafeKeeperState {
backup_lsn: local_start_lsn,
peer_horizon_lsn: local_start_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
peers: PersistedPeers(
peers
.iter()
.map(|p| (*p, PersistedPeerInfo::new()))
.collect(),
),
}
}

View File

@@ -7,7 +7,7 @@ use etcd_broker::subscription_value::SkTimelineInfo;
use postgres_ffi::XLogSegNo;
use tokio::sync::watch;
use tokio::{sync::watch, time::Instant};
use std::cmp::{max, min};
@@ -26,7 +26,7 @@ use utils::{
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo,
SafekeeperMemState, ServerInfo, Term,
};
use crate::send_wal::HotStandbyFeedback;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
@@ -36,6 +36,53 @@ use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
/// Things safekeeper should know about timeline state on peers.
#[derive(Debug, Clone)]
pub struct PeerInfo {
pub sk_id: NodeId,
/// Term of the last entry.
_last_log_term: Term,
/// LSN of the last record.
_flush_lsn: Lsn,
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
/// sk since backup_lsn.
pub local_start_lsn: Lsn,
/// When info was received.
ts: Instant,
}
impl PeerInfo {
fn from_sk_info(sk_id: NodeId, sk_info: &SkTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
sk_id,
_last_log_term: sk_info.last_log_term.unwrap_or(0),
_flush_lsn: sk_info.flush_lsn.unwrap_or(Lsn::INVALID),
commit_lsn: sk_info.commit_lsn.unwrap_or(Lsn::INVALID),
local_start_lsn: sk_info.local_start_lsn.unwrap_or(Lsn::INVALID),
ts,
}
}
}
// vector-based node id -> peer state map with very limited functionality we
// need.
#[derive(Debug, Clone, Default)]
pub struct PeersInfo(pub Vec<PeerInfo>);
impl PeersInfo {
fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
self.0.iter_mut().find(|p| p.sk_id == id)
}
fn upsert(&mut self, p: &PeerInfo) {
match self.get(p.sk_id) {
Some(rp) => *rp = p.clone(),
None => self.0.push(p.clone()),
}
}
}
/// Replica status update + hot standby feedback
#[derive(Debug, Clone, Copy)]
pub struct ReplicaState {
@@ -74,6 +121,8 @@ impl ReplicaState {
pub struct SharedState {
/// Safekeeper object
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// State of peers as we know it.
peers_info: PeersInfo,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
/// True when WAL backup launcher oversees the timeline, making sure WAL is
@@ -123,7 +172,8 @@ impl SharedState {
Ok(Self {
sk,
replicas: Vec::new(),
peers_info: PeersInfo(vec![]),
replicas: vec![],
wal_backup_active: false,
active: false,
num_computes: 0,
@@ -142,6 +192,7 @@ impl SharedState {
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
peers_info: PeersInfo(vec![]),
replicas: Vec::new(),
wal_backup_active: false,
active: false,
@@ -268,6 +319,24 @@ impl SharedState {
self.replicas.push(Some(state));
pos
}
fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
SkTimelineInfo {
last_log_term: Some(self.sk.get_epoch()),
flush_lsn: Some(self.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(self.sk.inmem.commit_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
self.get_replicas_state().remote_consistent_lsn,
self.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(self.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(self.sk.inmem.backup_lsn),
local_start_lsn: Some(self.sk.state.local_start_lsn),
}
}
}
#[derive(Debug, thiserror::Error)]
@@ -632,36 +701,25 @@ impl Timeline {
Ok(())
}
/// Return public safekeeper info for broadcasting to broker and other peers.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
/// Get safekeeper info for broadcasting to broker and other peers.
pub fn get_safekeer_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
let shared_state = self.write_shared_state();
SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(shared_state.sk.inmem.commit_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
shared_state.get_replicas_state().remote_consistent_lsn,
shared_state.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
}
shared_state.get_safekeeper_info(conf)
}
/// Update timeline state with peer safekeeper data.
pub async fn record_safekeeper_info(
&self,
sk_info: &SkTimelineInfo,
_sk_id: NodeId,
sk_id: NodeId,
) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.write_shared_state();
shared_state.sk.record_safekeeper_info(sk_info)?;
let peer_info = PeerInfo::from_sk_info(sk_id, sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = shared_state.update_status(self.ttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
@@ -673,6 +731,22 @@ impl Timeline {
Ok(())
}
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state();
let now = Instant::now();
shared_state
.peers_info
.0
.iter()
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
.filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
.cloned()
.collect()
}
/// Add send_wal replica to the in-memory vector of replicas.
pub fn add_replica(&self, state: ReplicaState) -> usize {
self.write_shared_state().add_replica(state)

View File

@@ -1,8 +1,7 @@
use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::task::JoinHandle;
use utils::id::NodeId;
use std::cmp::min;
use std::collections::HashMap;
@@ -26,14 +25,11 @@ use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::broker::{Election, ElectionLeader};
use crate::timeline::Timeline;
use crate::{broker, GlobalTimelines, SafeKeeperConf};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};
use once_cell::sync::OnceCell;
const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
@@ -70,47 +66,100 @@ struct WalBackupTimelineEntry {
handle: Option<WalBackupTaskHandle>,
}
/// Start per timeline task, if it makes sense for this safekeeper to offload.
fn consider_start_task(
async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
if let Some(wb_handle) = entry.handle.take() {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", ttid, e);
}
}
}
/// The goal is to ensure that normally only one safekeepers offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
/// time we have several ones as they PUT the same files. Also,
/// - frequently changing the offloader would be bad;
/// - electing seriously lagging safekeeper is undesirable;
/// So we deterministically choose among the reasonably caught up candidates.
fn determine_offloader(
alive_peers: &[PeerInfo],
wal_backup_lsn: Lsn,
ttid: TenantTimelineId,
conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
// TODO: remove this once we fill newly joined safekeepers since backup_lsn.
let capable_peers = alive_peers
.iter()
.filter(|p| p.local_start_lsn <= wal_backup_lsn);
match alive_peers.iter().map(|p| p.commit_lsn).max() {
None => (None, "no connected peers to elect from".to_string()),
Some(max_commit_lsn) => {
let threshold = max_commit_lsn
.checked_sub(conf.max_offloader_lag)
.unwrap_or(Lsn(0));
let mut caughtup_peers = capable_peers
.clone()
.filter(|p| p.commit_lsn >= threshold)
.collect::<Vec<_>>();
caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
// To distribute the load, shift by timeline_id.
let offloader = caughtup_peers
[(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
.sk_id;
(
Some(offloader),
format!(
"elected {} among {:?} peers, with {} of them being caughtup",
offloader,
capable_peers
.map(|p| (p.sk_id, p.commit_lsn))
.collect::<Vec<_>>(),
caughtup_peers.len()
),
)
}
}
}
/// Based on peer information determine which safekeeper should offload; if it
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
async fn update_task(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
task: &mut WalBackupTimelineEntry,
entry: &mut WalBackupTimelineEntry,
) {
if !task.timeline.can_wal_backup() {
return;
let alive_peers = entry.timeline.get_peers(conf);
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn();
let (offloader, election_dbg_str) =
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
if elected_me != (entry.handle.is_some()) {
if elected_me {
info!("elected for backup {}: {}", ttid, election_dbg_str);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, shutdown_rx)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
entry.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
} else {
info!("stepping down from backup {}: {}", ttid, election_dbg_str);
shut_down_task(ttid, entry).await;
}
}
info!("starting WAL backup task for {}", ttid);
// TODO: decide who should offload right here by simply checking current
// state instead of running elections in offloading task.
let election_name = SubscriptionKey {
cluster_prefix: conf.broker_etcd_prefix.clone(),
kind: SubscriptionKind::Operation(
ttid,
NodeKind::Safekeeper,
OperationKind::Safekeeper(SkOperationKind::WalBackup),
),
}
.watch_key();
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
task.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
}
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
@@ -158,27 +207,20 @@ async fn wal_backup_launcher_main_loop(
timeline,
handle: None,
});
consider_start_task(&conf, ttid, entry);
update_task(&conf, ttid, entry).await;
} else {
// need to stop the task
info!("stopping WAL backup task for {}", ttid);
let entry = tasks.remove(&ttid).unwrap();
if let Some(wb_handle) = entry.handle {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", ttid, e);
}
}
let mut entry = tasks.remove(&ttid).unwrap();
shut_down_task(ttid, &mut entry).await;
}
}
}
// Start known tasks, if needed and possible.
// For each timeline needing offloading, check if this safekeeper
// should do the job and start/stop the task accordingly.
_ = ticker.tick() => {
for (ttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) {
consider_start_task(&conf, *ttid, entry);
for (ttid, entry) in tasks.iter_mut() {
update_task(&conf, *ttid, entry).await;
}
}
}
@@ -190,17 +232,13 @@ struct WalBackupTask {
timeline_dir: PathBuf,
wal_seg_size: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
leader: Option<ElectionLeader>,
election: Election,
}
/// Offload single timeline. Called only after we checked that backup
/// is required (wal_backup_attend) and possible (can_wal_backup).
/// Offload single timeline.
async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: PathBuf,
mut shutdown_rx: Receiver<()>,
election: Election,
) {
info!("started");
let res = GlobalTimelines::get(ttid);
@@ -215,8 +253,6 @@ async fn backup_task_main(
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline: tli,
timeline_dir,
leader: None,
election,
};
// task is spinned up only when wal_seg_size already initialized
@@ -229,9 +265,6 @@ async fn backup_task_main(
canceled = true;
}
}
if let Some(l) = wb.leader {
l.give_up().await;
}
info!("task {}", if canceled { "canceled" } else { "terminated" });
}
@@ -239,107 +272,68 @@ impl WalBackupTask {
async fn run(&mut self) {
let mut backup_lsn = Lsn(0);
// election loop
let mut retry_attempt = 0u32;
// offload loop
loop {
let mut retry_attempt = 0u32;
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
return;
}
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
}
info!("acquiring leadership");
if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await {
error!("error during leader election {:?}", e);
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue;
}
info!("acquired leadership");
// offload loop
loop {
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("backup error: {}", e);
return;
}
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) =
UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
retry_attempt = 0;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue;
}
if let Some(l) = self.leader.as_mut() {
// Optimization idea for later:
// Avoid checking election leader every time by returning current lease grant expiration time
// Re-check leadership only after expiration time,
// such approach would reduce overhead on write-intensive workloads
match l
.check_am_i(
self.election.election_name.clone(),
self.election.candidate_name.clone(),
)
.await
{
Ok(leader) => {
if !leader {
info!("lost leadership");
break;
}
}
Err(e) => {
warn!("error validating leader, {:?}", e);
break;
}
}
}
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("backup error: {}", e);
return;
}
retry_attempt = 0;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
retry_attempt = min(retry_attempt + 1, u32::MAX);
}
retry_attempt = min(retry_attempt + 1, u32::MAX);
}
}
}