From 7480a0338a3aed73b531625f17df5089f0ab0830 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Sun, 16 Oct 2022 15:51:21 +0400 Subject: [PATCH 01/10] Determine safekeeper for offloading WAL without etcd election API. This API is rather pointless, as sane choice anyway requires knowledge of peers status and leaders lifetime in any case can intersect, which is fine for us -- so manual elections are straightforward. Here, we deterministically choose among the reasonably caught up safekeepers, shifting by timeline id to spread the load. A step towards custom broker https://github.com/neondatabase/neon/issues/2394 --- control_plane/src/safekeeper.rs | 1 - libs/etcd_broker/src/subscription_value.rs | 3 + libs/utils/src/id.rs | 12 + .../src/walreceiver/connection_manager.rs | 31 ++ safekeeper/src/bin/safekeeper.rs | 40 ++- safekeeper/src/broker.rs | 115 +------ safekeeper/src/control_file_upgrade.rs | 13 +- safekeeper/src/lib.rs | 13 +- safekeeper/src/safekeeper.rs | 19 +- safekeeper/src/timeline.rs | 131 +++++-- safekeeper/src/wal_backup.rs | 322 +++++++++--------- 11 files changed, 363 insertions(+), 337 deletions(-) diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 64a89124d2..17f5d0c109 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -123,7 +123,6 @@ impl SafekeeperNode { .args(&["--id", self.id.to_string().as_ref()]) .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) - .args(&["--recall", "1 second"]) .arg("--daemonize"), ); if !self.conf.sync { diff --git a/libs/etcd_broker/src/subscription_value.rs b/libs/etcd_broker/src/subscription_value.rs index d3e2011761..60a5411926 100644 --- a/libs/etcd_broker/src/subscription_value.rs +++ b/libs/etcd_broker/src/subscription_value.rs @@ -29,6 +29,9 @@ pub struct SkTimelineInfo { #[serde_as(as = "Option")] #[serde(default)] pub peer_horizon_lsn: Option, + #[serde_as(as = "Option")] + #[serde(default)] + pub local_start_lsn: Option, /// A connection string to use for WAL receiving. #[serde(default)] pub safekeeper_connstr: Option, diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index 059ce69ca4..f245f7c3d4 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -75,6 +75,12 @@ impl From<[u8; 16]> for Id { } } +impl From for u128 { + fn from(id: Id) -> Self { + u128::from_le_bytes(id.0) + } +} + impl fmt::Display for Id { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(&self.hex_encode()) @@ -136,6 +142,12 @@ macro_rules! id_newtype { } } + impl From<$t> for u128 { + fn from(id: $t) -> Self { + u128::from(id.0) + } + } + impl fmt::Display for $t { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 3a5d1c7ad6..2380caaff1 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -801,6 +801,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, safekeeper_connstr: None, }, etcd_version: 0, @@ -817,6 +818,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("no commit_lsn".to_string()), }, etcd_version: 0, @@ -833,6 +836,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, safekeeper_connstr: Some("no commit_lsn".to_string()), }, etcd_version: 0, @@ -849,6 +853,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, safekeeper_connstr: None, }, etcd_version: 0, @@ -908,6 +913,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -924,6 +931,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("not advanced Lsn".to_string()), }, etcd_version: 0, @@ -940,6 +949,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("not enough advanced Lsn".to_string()), }, etcd_version: 0, @@ -974,6 +985,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1006,6 +1019,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("smaller commit_lsn".to_string()), }, etcd_version: 0, @@ -1022,6 +1037,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1038,6 +1055,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: None, }, etcd_version: 0, @@ -1083,6 +1102,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1099,6 +1120,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1168,6 +1191,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1184,6 +1209,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()), }, etcd_version: 0, @@ -1255,6 +1282,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1326,6 +1355,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 9422b55d60..a867aea5af 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -21,7 +21,8 @@ use metrics::set_build_info_metric; use safekeeper::broker; use safekeeper::control_file; use safekeeper::defaults::{ - DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, + DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, + DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, }; use safekeeper::http; use safekeeper::remove_wal; @@ -72,10 +73,6 @@ fn main() -> anyhow::Result<()> { conf.listen_http_addr = addr.to_string(); } - if let Some(recall) = arg_matches.get_one::("recall") { - conf.recall_period = humantime::parse_duration(recall)?; - } - let mut given_id = None; if let Some(given_id_str) = arg_matches.get_one::("id") { given_id = Some(NodeId( @@ -93,6 +90,16 @@ fn main() -> anyhow::Result<()> { conf.broker_etcd_prefix = prefix.to_string(); } + if let Some(heartbeat_timeout_str) = arg_matches.get_one::("heartbeat-timeout") { + conf.heartbeat_timeout = + humantime::parse_duration(heartbeat_timeout_str).with_context(|| { + format!( + "failed to parse heartbeat-timeout {}", + heartbeat_timeout_str + ) + })?; + } + if let Some(backup_threads) = arg_matches.get_one::("wal-backup-threads") { conf.backup_runtime_threads = backup_threads .parse() @@ -105,6 +112,14 @@ fn main() -> anyhow::Result<()> { let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?); } + if let Some(max_offloader_lag_str) = arg_matches.get_one::("max-offloader-lag") { + conf.max_offloader_lag_bytes = max_offloader_lag_str.parse().with_context(|| { + format!( + "failed to parse max offloader lag {}", + max_offloader_lag_str + ) + })?; + } // Seems like there is no better way to accept bool values explicitly in clap. conf.wal_backup_enabled = arg_matches .get_one::("enable-wal-backup") @@ -361,11 +376,6 @@ fn cli() -> Command { .short('p') .long("pageserver"), ) - .arg( - Arg::new("recall") - .long("recall") - .help("Period for requestion pageserver to call for replication"), - ) .arg( Arg::new("daemonize") .short('d') @@ -397,6 +407,11 @@ fn cli() -> Command { .long("broker-etcd-prefix") .help("a prefix to always use when polling/pusing data in etcd from this safekeeper"), ) + .arg( + Arg::new("heartbeat-timeout") + .long("heartbeat-timeout") + .help(formatcp!("Peer is considered dead after not receiving heartbeats from it during this period (default {}s), passed as a human readable duration.", DEFAULT_HEARTBEAT_TIMEOUT.as_secs())) + ) .arg( Arg::new("wal-backup-threads").long("backup-threads").help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")), ).arg( @@ -404,6 +419,11 @@ fn cli() -> Command { .long("remote-storage") .help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"\", \"bucket_region\":\"\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]//, mirroring structure on the file system.") ) + .arg( + Arg::new("max-offloader-lag") + .long("max-offloader-lag") + .help(formatcp!("Safekeeper won't be elected for WAL offloading if it is lagging for more than this value (default {}MB) in bytes", DEFAULT_MAX_OFFLOADER_LAG_BYTES / (1 << 20))) + ) .arg( Arg::new("enable-wal-backup") .long("enable-wal-backup") diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 6a2456ecda..76135241b9 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -1,6 +1,5 @@ //! Communication with etcd, providing safekeeper peers and pageserver coordination. -use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; @@ -12,11 +11,9 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::collections::HashSet; use std::time::Duration; -use tokio::spawn; use tokio::task::JoinHandle; use tokio::{runtime, time::sleep}; use tracing::*; -use url::Url; use crate::GlobalTimelines; use crate::SafeKeeperConf; @@ -56,113 +53,6 @@ fn timeline_safekeeper_path( ) } -pub struct Election { - pub election_name: String, - pub candidate_name: String, - pub broker_endpoints: Vec, -} - -impl Election { - pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec) -> Self { - Self { - election_name, - candidate_name, - broker_endpoints, - } - } -} - -pub struct ElectionLeader { - client: Client, - keep_alive: JoinHandle>, -} - -impl ElectionLeader { - pub async fn check_am_i( - &mut self, - election_name: String, - candidate_name: String, - ) -> Result { - let resp = self.client.leader(election_name).await?; - - let kv = resp - .kv() - .ok_or_else(|| anyhow!("failed to get leader response"))?; - let leader = kv.value_str()?; - - Ok(leader == candidate_name) - } - - pub async fn give_up(self) { - self.keep_alive.abort(); - // TODO: it'll be wise to resign here but it'll happen after lease expiration anyway - // should we await for keep alive termination? - let _ = self.keep_alive.await; - } -} - -pub async fn get_leader(req: &Election, leader: &mut Option) -> Result<()> { - let mut client = Client::connect(req.broker_endpoints.clone(), None) - .await - .context("Could not connect to etcd")?; - - let lease = client - .lease_grant(LEASE_TTL_SEC, None) - .await - .context("Could not acquire a lease"); - - let lease_id = lease.map(|l| l.id()).unwrap(); - - // kill previous keepalive, if any - if let Some(l) = leader.take() { - l.give_up().await; - } - - let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id)); - // immediately save handle to kill task if we get canceled below - *leader = Some(ElectionLeader { - client: client.clone(), - keep_alive, - }); - - client - .campaign( - req.election_name.clone(), - req.candidate_name.clone(), - lease_id, - ) - .await?; - - Ok(()) -} - -async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> { - let (mut keeper, mut ka_stream) = client - .lease_keep_alive(lease_id) - .await - .context("failed to create keepalive stream")?; - - loop { - let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); - - keeper - .keep_alive() - .await - .context("failed to send LeaseKeepAliveRequest")?; - - ka_stream - .message() - .await - .context("failed to receive LeaseKeepAliveResponse")?; - - sleep(push_interval).await; - } -} - -pub fn get_candiate_name(system_id: NodeId) -> String { - format!("id_{system_id}") -} - async fn push_sk_info( ttid: TenantTimelineId, mut client: Client, @@ -236,7 +126,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { let handles = active_tlis .iter() .map(|tli| { - let sk_info = tli.get_public_info(&conf); + let sk_info = tli.get_safekeeper_info(&conf); let key = timeline_safekeeper_path(conf.broker_etcd_prefix.clone(), tli.ttid, conf.my_id); let lease = leases.remove(&tli.ttid).unwrap(); @@ -282,6 +172,9 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { Some(new_info) => { // note: there are blocking operations below, but it's considered fine for now if let Ok(tli) = GlobalTimelines::get(new_info.key.id) { + // Note that we also receive *our own* info. That's + // important, as it is used as an indication of live + // connection to the broker. tli.record_safekeeper_info(&new_info.value, new_info.key.node_id) .await? } diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index 1ce9186085..856c164be8 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -1,6 +1,7 @@ //! Code to deal with safekeeper control file upgrades use crate::safekeeper::{ - AcceptorState, Peers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry, + AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, + TermSwitchEntry, }; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; @@ -134,7 +135,7 @@ pub struct SafeKeeperStateV4 { // fundamental; but state is saved here only for informational purposes and // obviously can be stale. (Currently not saved at all, but let's provision // place to have less file version upgrades). - pub peers: Peers, + pub peers: PersistedPeers, } pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result { @@ -165,7 +166,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); // migrate to hexing some ids } else if version == 2 { @@ -188,7 +189,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); // migrate to moving tenant_id/timeline_id to the top and adding some lsns } else if version == 3 { @@ -211,7 +212,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); // migrate to having timeline_start_lsn } else if version == 4 { @@ -234,7 +235,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn::INVALID, peer_horizon_lsn: oldstate.peer_horizon_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); } else if version == 5 { info!("reading safekeeper control file version {}", version); diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index e38a5a4633..19dff79b88 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -1,4 +1,6 @@ -use defaults::DEFAULT_WAL_BACKUP_RUNTIME_THREADS; +use defaults::{ + DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, +}; // use remote_storage::RemoteStorageConfig; use std::path::PathBuf; @@ -34,8 +36,9 @@ pub mod defaults { DEFAULT_PG_LISTEN_PORT, }; - pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8; + pub const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); + pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20); } #[derive(Debug, Clone)] @@ -52,7 +55,6 @@ pub struct SafeKeeperConf { pub no_sync: bool, pub listen_pg_addr: String, pub listen_http_addr: String, - pub recall_period: Duration, pub remote_storage: Option, pub backup_runtime_threads: usize, pub wal_backup_enabled: bool, @@ -60,6 +62,8 @@ pub struct SafeKeeperConf { pub broker_endpoints: Vec, pub broker_etcd_prefix: String, pub auth_validation_public_key_path: Option, + pub heartbeat_timeout: Duration, + pub max_offloader_lag_bytes: u64, } impl SafeKeeperConf { @@ -85,13 +89,14 @@ impl Default for SafeKeeperConf { listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), remote_storage: None, - recall_period: defaults::DEFAULT_RECALL_PERIOD, my_id: NodeId(0), broker_endpoints: Vec::new(), broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS, wal_backup_enabled: true, auth_validation_public_key_path: None, + heartbeat_timeout: DEFAULT_HEARTBEAT_TIMEOUT, + max_offloader_lag_bytes: DEFAULT_MAX_OFFLOADER_LAG_BYTES, } } } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 7b11aaf92a..3f9b70f282 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -11,6 +11,7 @@ use std::cmp::max; use std::cmp::min; use std::fmt; use std::io::Read; + use tracing::*; use crate::control_file; @@ -132,9 +133,8 @@ pub struct ServerInfo { pub wal_seg_size: u32, } -/// Data published by safekeeper to the peers #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PeerInfo { +pub struct PersistedPeerInfo { /// LSN up to which safekeeper offloaded WAL to s3. backup_lsn: Lsn, /// Term of the last entry. @@ -145,7 +145,7 @@ pub struct PeerInfo { commit_lsn: Lsn, } -impl PeerInfo { +impl PersistedPeerInfo { fn new() -> Self { Self { backup_lsn: Lsn::INVALID, @@ -156,10 +156,8 @@ impl PeerInfo { } } -// vector-based node id -> peer state map with very limited functionality we -// need/ #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Peers(pub Vec<(NodeId, PeerInfo)>); +pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>); /// Persistent information stored on safekeeper node /// On disk data is prefixed by magic and format version and followed by checksum. @@ -203,7 +201,7 @@ pub struct SafeKeeperState { // fundamental; but state is saved here only for informational purposes and // obviously can be stale. (Currently not saved at all, but let's provision // place to have less file version upgrades). - pub peers: Peers, + pub peers: PersistedPeers, } #[derive(Debug, Clone)] @@ -240,7 +238,12 @@ impl SafeKeeperState { backup_lsn: local_start_lsn, peer_horizon_lsn: local_start_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()), + peers: PersistedPeers( + peers + .iter() + .map(|p| (*p, PersistedPeerInfo::new())) + .collect(), + ), } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 3fb77bf582..1930b3574a 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -7,7 +7,7 @@ use etcd_broker::subscription_value::SkTimelineInfo; use postgres_ffi::XLogSegNo; -use tokio::sync::watch; +use tokio::{sync::watch, time::Instant}; use std::cmp::{max, min}; @@ -26,7 +26,7 @@ use utils::{ use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, - SafekeeperMemState, ServerInfo, + SafekeeperMemState, ServerInfo, Term, }; use crate::send_wal::HotStandbyFeedback; use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; @@ -36,6 +36,53 @@ use crate::wal_storage; use crate::wal_storage::Storage as wal_storage_iface; use crate::SafeKeeperConf; +/// Things safekeeper should know about timeline state on peers. +#[derive(Debug, Clone)] +pub struct PeerInfo { + pub sk_id: NodeId, + /// Term of the last entry. + _last_log_term: Term, + /// LSN of the last record. + _flush_lsn: Lsn, + pub commit_lsn: Lsn, + /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new + /// sk since backup_lsn. + pub local_start_lsn: Lsn, + /// When info was received. + ts: Instant, +} + +impl PeerInfo { + fn from_sk_info(sk_id: NodeId, sk_info: &SkTimelineInfo, ts: Instant) -> PeerInfo { + PeerInfo { + sk_id, + _last_log_term: sk_info.last_log_term.unwrap_or(0), + _flush_lsn: sk_info.flush_lsn.unwrap_or(Lsn::INVALID), + commit_lsn: sk_info.commit_lsn.unwrap_or(Lsn::INVALID), + local_start_lsn: sk_info.local_start_lsn.unwrap_or(Lsn::INVALID), + ts, + } + } +} + +// vector-based node id -> peer state map with very limited functionality we +// need. +#[derive(Debug, Clone, Default)] +pub struct PeersInfo(pub Vec); + +impl PeersInfo { + fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> { + self.0.iter_mut().find(|p| p.sk_id == id) + } + + fn upsert(&mut self, p: &PeerInfo) { + match self.get(p.sk_id) { + Some(rp) => *rp = p.clone(), + None => self.0.push(p.clone()), + } + } +} + /// Replica status update + hot standby feedback #[derive(Debug, Clone, Copy)] pub struct ReplicaState { @@ -74,6 +121,8 @@ impl ReplicaState { pub struct SharedState { /// Safekeeper object sk: SafeKeeper, + /// In memory list containing state of peers sent in latest messages from them. + peers_info: PeersInfo, /// State of replicas replicas: Vec>, /// True when WAL backup launcher oversees the timeline, making sure WAL is @@ -123,7 +172,8 @@ impl SharedState { Ok(Self { sk, - replicas: Vec::new(), + peers_info: PeersInfo(vec![]), + replicas: vec![], wal_backup_active: false, active: false, num_computes: 0, @@ -142,6 +192,7 @@ impl SharedState { Ok(Self { sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?, + peers_info: PeersInfo(vec![]), replicas: Vec::new(), wal_backup_active: false, active: false, @@ -201,12 +252,6 @@ impl SharedState { self.wal_backup_active } - // Can this safekeeper offload to s3? Recently joined safekeepers might not - // have necessary WAL. - fn can_wal_backup(&self) -> bool { - self.sk.state.local_start_lsn <= self.sk.inmem.backup_lsn - } - fn get_wal_seg_size(&self) -> usize { self.sk.state.server.wal_seg_size as usize } @@ -268,6 +313,24 @@ impl SharedState { self.replicas.push(Some(state)); pos } + + fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { + SkTimelineInfo { + last_log_term: Some(self.sk.get_epoch()), + flush_lsn: Some(self.sk.wal_store.flush_lsn()), + // note: this value is not flushed to control file yet and can be lost + commit_lsn: Some(self.sk.inmem.commit_lsn), + // TODO: rework feedbacks to avoid max here + remote_consistent_lsn: Some(max( + self.get_replicas_state().remote_consistent_lsn, + self.sk.inmem.remote_consistent_lsn, + )), + peer_horizon_lsn: Some(self.sk.inmem.peer_horizon_lsn), + safekeeper_connstr: Some(conf.listen_pg_addr.clone()), + backup_lsn: Some(self.sk.inmem.backup_lsn), + local_start_lsn: Some(self.sk.state.local_start_lsn), + } + } } #[derive(Debug, thiserror::Error)] @@ -517,17 +580,6 @@ impl Timeline { self.write_shared_state().wal_backup_attend() } - /// Can this safekeeper offload to s3? Recently joined safekeepers might not - /// have necessary WAL. - pub fn can_wal_backup(&self) -> bool { - if self.is_cancelled() { - return false; - } - - let shared_state = self.write_shared_state(); - shared_state.can_wal_backup() - } - /// Returns full timeline info, required for the metrics. If the timeline is /// not active, returns None instead. pub fn info_for_metrics(&self) -> Option { @@ -632,36 +684,25 @@ impl Timeline { Ok(()) } - /// Return public safekeeper info for broadcasting to broker and other peers. - pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { + /// Get safekeeper info for broadcasting to broker and other peers. + pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { let shared_state = self.write_shared_state(); - SkTimelineInfo { - last_log_term: Some(shared_state.sk.get_epoch()), - flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()), - // note: this value is not flushed to control file yet and can be lost - commit_lsn: Some(shared_state.sk.inmem.commit_lsn), - // TODO: rework feedbacks to avoid max here - remote_consistent_lsn: Some(max( - shared_state.get_replicas_state().remote_consistent_lsn, - shared_state.sk.inmem.remote_consistent_lsn, - )), - peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), - safekeeper_connstr: Some(conf.listen_pg_addr.clone()), - backup_lsn: Some(shared_state.sk.inmem.backup_lsn), - } + shared_state.get_safekeeper_info(conf) } /// Update timeline state with peer safekeeper data. pub async fn record_safekeeper_info( &self, sk_info: &SkTimelineInfo, - _sk_id: NodeId, + sk_id: NodeId, ) -> Result<()> { let is_wal_backup_action_pending: bool; let commit_lsn: Lsn; { let mut shared_state = self.write_shared_state(); shared_state.sk.record_safekeeper_info(sk_info)?; + let peer_info = PeerInfo::from_sk_info(sk_id, sk_info, Instant::now()); + shared_state.peers_info.upsert(&peer_info); is_wal_backup_action_pending = shared_state.update_status(self.ttid); commit_lsn = shared_state.sk.inmem.commit_lsn; } @@ -673,6 +714,22 @@ impl Timeline { Ok(()) } + /// Get our latest view of alive peers status on the timeline. + /// We pass our own info through the broker as well, so when we don't have connection + /// to the broker returned vec is empty. + pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec { + let shared_state = self.write_shared_state(); + let now = Instant::now(); + shared_state + .peers_info + .0 + .iter() + // Regard peer as absent if we haven't heard from it within heartbeat_timeout. + .filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout) + .cloned() + .collect() + } + /// Add send_wal replica to the in-memory vector of replicas. pub fn add_replica(&self, state: ReplicaState) -> usize { self.write_shared_state().add_replica(state) diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index c82a003161..13287bd036 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -1,8 +1,7 @@ use anyhow::{Context, Result}; -use etcd_broker::subscription_key::{ - NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind, -}; + use tokio::task::JoinHandle; +use utils::id::NodeId; use std::cmp::min; use std::collections::HashMap; @@ -26,14 +25,11 @@ use tracing::*; use utils::{id::TenantTimelineId, lsn::Lsn}; -use crate::broker::{Election, ElectionLeader}; -use crate::timeline::Timeline; -use crate::{broker, GlobalTimelines, SafeKeeperConf}; +use crate::timeline::{PeerInfo, Timeline}; +use crate::{GlobalTimelines, SafeKeeperConf}; use once_cell::sync::OnceCell; -const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000; - const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000; @@ -70,47 +66,104 @@ struct WalBackupTimelineEntry { handle: Option, } -/// Start per timeline task, if it makes sense for this safekeeper to offload. -fn consider_start_task( +async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) { + if let Some(wb_handle) = entry.handle.take() { + // Tell the task to shutdown. Error means task exited earlier, that's ok. + let _ = wb_handle.shutdown_tx.send(()).await; + // Await the task itself. TODO: restart panicked tasks earlier. + if let Err(e) = wb_handle.handle.await { + warn!("WAL backup task for {} panicked: {}", ttid, e); + } + } +} + +/// The goal is to ensure that normally only one safekeepers offloads. However, +/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short +/// time we have several ones as they PUT the same files. Also, +/// - frequently changing the offloader would be bad; +/// - electing seriously lagging safekeeper is undesirable; +/// So we deterministically choose among the reasonably caught up candidates. +/// TODO: take into account failed attempts to deal with hypothetical situation +/// where s3 is unreachable only for some sks. +fn determine_offloader( + alive_peers: &[PeerInfo], + wal_backup_lsn: Lsn, + ttid: TenantTimelineId, + conf: &SafeKeeperConf, +) -> (Option, String) { + // TODO: remove this once we fill newly joined safekeepers since backup_lsn. + let capable_peers = alive_peers + .iter() + .filter(|p| p.local_start_lsn <= wal_backup_lsn); + match alive_peers.iter().map(|p| p.commit_lsn).max() { + None => (None, "no connected peers to elect from".to_string()), + Some(max_commit_lsn) => { + let threshold = max_commit_lsn + .checked_sub(conf.max_offloader_lag_bytes) + .unwrap_or(Lsn(0)); + let mut caughtup_peers = capable_peers + .clone() + .filter(|p| p.commit_lsn >= threshold) + .collect::>(); + caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id)); + + // To distribute the load, shift by timeline_id. + let offloader = caughtup_peers + [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize] + .sk_id; + + let mut capable_peers_dbg = capable_peers + .map(|p| (p.sk_id, p.commit_lsn)) + .collect::>(); + capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0)); + ( + Some(offloader), + format!( + "elected {} among {:?} peers, with {} of them being caughtup", + offloader, + capable_peers_dbg, + caughtup_peers.len() + ), + ) + } + } +} + +/// Based on peer information determine which safekeeper should offload; if it +/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task +/// is running, kill it. +async fn update_task( conf: &SafeKeeperConf, ttid: TenantTimelineId, - task: &mut WalBackupTimelineEntry, + entry: &mut WalBackupTimelineEntry, ) { - if !task.timeline.can_wal_backup() { - return; + let alive_peers = entry.timeline.get_peers(conf); + let wal_backup_lsn = entry.timeline.get_wal_backup_lsn(); + let (offloader, election_dbg_str) = + determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf); + let elected_me = Some(conf.my_id) == offloader; + + if elected_me != (entry.handle.is_some()) { + if elected_me { + info!("elected for backup {}: {}", ttid, election_dbg_str); + + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + let timeline_dir = conf.timeline_dir(&ttid); + + let handle = tokio::spawn( + backup_task_main(ttid, timeline_dir, shutdown_rx) + .instrument(info_span!("WAL backup task", ttid = %ttid)), + ); + + entry.handle = Some(WalBackupTaskHandle { + shutdown_tx, + handle, + }); + } else { + info!("stepping down from backup {}: {}", ttid, election_dbg_str); + shut_down_task(ttid, entry).await; + } } - info!("starting WAL backup task for {}", ttid); - - // TODO: decide who should offload right here by simply checking current - // state instead of running elections in offloading task. - let election_name = SubscriptionKey { - cluster_prefix: conf.broker_etcd_prefix.clone(), - kind: SubscriptionKind::Operation( - ttid, - NodeKind::Safekeeper, - OperationKind::Safekeeper(SkOperationKind::WalBackup), - ), - } - .watch_key(); - let my_candidate_name = broker::get_candiate_name(conf.my_id); - let election = broker::Election::new( - election_name, - my_candidate_name, - conf.broker_endpoints.clone(), - ); - - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let timeline_dir = conf.timeline_dir(&ttid); - - let handle = tokio::spawn( - backup_task_main(ttid, timeline_dir, shutdown_rx, election) - .instrument(info_span!("WAL backup task", ttid = %ttid)), - ); - - task.handle = Some(WalBackupTaskHandle { - shutdown_tx, - handle, - }); } const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000; @@ -158,27 +211,20 @@ async fn wal_backup_launcher_main_loop( timeline, handle: None, }); - consider_start_task(&conf, ttid, entry); + update_task(&conf, ttid, entry).await; } else { // need to stop the task info!("stopping WAL backup task for {}", ttid); - - let entry = tasks.remove(&ttid).unwrap(); - if let Some(wb_handle) = entry.handle { - // Tell the task to shutdown. Error means task exited earlier, that's ok. - let _ = wb_handle.shutdown_tx.send(()).await; - // Await the task itself. TODO: restart panicked tasks earlier. - if let Err(e) = wb_handle.handle.await { - warn!("WAL backup task for {} panicked: {}", ttid, e); - } - } + let mut entry = tasks.remove(&ttid).unwrap(); + shut_down_task(ttid, &mut entry).await; } } } - // Start known tasks, if needed and possible. + // For each timeline needing offloading, check if this safekeeper + // should do the job and start/stop the task accordingly. _ = ticker.tick() => { - for (ttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) { - consider_start_task(&conf, *ttid, entry); + for (ttid, entry) in tasks.iter_mut() { + update_task(&conf, *ttid, entry).await; } } } @@ -190,17 +236,13 @@ struct WalBackupTask { timeline_dir: PathBuf, wal_seg_size: usize, commit_lsn_watch_rx: watch::Receiver, - leader: Option, - election: Election, } -/// Offload single timeline. Called only after we checked that backup -/// is required (wal_backup_attend) and possible (can_wal_backup). +/// Offload single timeline. async fn backup_task_main( ttid: TenantTimelineId, timeline_dir: PathBuf, mut shutdown_rx: Receiver<()>, - election: Election, ) { info!("started"); let res = GlobalTimelines::get(ttid); @@ -215,8 +257,6 @@ async fn backup_task_main( commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), timeline: tli, timeline_dir, - leader: None, - election, }; // task is spinned up only when wal_seg_size already initialized @@ -229,9 +269,6 @@ async fn backup_task_main( canceled = true; } } - if let Some(l) = wb.leader { - l.give_up().await; - } info!("task {}", if canceled { "canceled" } else { "terminated" }); } @@ -239,106 +276,71 @@ impl WalBackupTask { async fn run(&mut self) { let mut backup_lsn = Lsn(0); - // election loop + let mut retry_attempt = 0u32; + // offload loop loop { - let mut retry_attempt = 0u32; + if retry_attempt == 0 { + // wait for new WAL to arrive + if let Err(e) = self.commit_lsn_watch_rx.changed().await { + // should never happen, as we hold Arc to timeline. + error!("commit_lsn watch shut down: {:?}", e); + return; + } + } else { + // or just sleep if we errored previously + let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS; + if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) + { + retry_delay = min(retry_delay, backoff_delay); + } + sleep(Duration::from_millis(retry_delay)).await; + } - info!("acquiring leadership"); - if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await { - error!("error during leader election {:?}", e); - sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await; + let commit_lsn = *self.commit_lsn_watch_rx.borrow(); + + // Note that backup_lsn can be higher than commit_lsn if we + // don't have much local WAL and others already uploaded + // segments we don't even have. + if backup_lsn.segment_number(self.wal_seg_size) + >= commit_lsn.segment_number(self.wal_seg_size) + { + retry_attempt = 0; + continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ + } + // Perhaps peers advanced the position, check shmem value. + backup_lsn = self.timeline.get_wal_backup_lsn(); + if backup_lsn.segment_number(self.wal_seg_size) + >= commit_lsn.segment_number(self.wal_seg_size) + { + retry_attempt = 0; continue; } - info!("acquired leadership"); - // offload loop - loop { - if retry_attempt == 0 { - // wait for new WAL to arrive - if let Err(e) = self.commit_lsn_watch_rx.changed().await { - // should never happen, as we hold Arc to timeline. - error!("commit_lsn watch shut down: {:?}", e); + match backup_lsn_range( + backup_lsn, + commit_lsn, + self.wal_seg_size, + &self.timeline_dir, + ) + .await + { + Ok(backup_lsn_result) => { + backup_lsn = backup_lsn_result; + let res = self.timeline.set_wal_backup_lsn(backup_lsn_result); + if let Err(e) = res { + error!("failed to set wal_backup_lsn: {}", e); return; } - } else { - // or just sleep if we errored previously - let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS; - if let Some(backoff_delay) = - UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) - { - retry_delay = min(retry_delay, backoff_delay); - } - sleep(Duration::from_millis(retry_delay)).await; + retry_attempt = 0; } + Err(e) => { + error!( + "failed while offloading range {}-{}: {:?}", + backup_lsn, commit_lsn, e + ); - let commit_lsn = *self.commit_lsn_watch_rx.borrow(); - - // Note that backup_lsn can be higher than commit_lsn if we - // don't have much local WAL and others already uploaded - // segments we don't even have. - if backup_lsn.segment_number(self.wal_seg_size) - >= commit_lsn.segment_number(self.wal_seg_size) - { - continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ - } - // Perhaps peers advanced the position, check shmem value. - backup_lsn = self.timeline.get_wal_backup_lsn(); - if backup_lsn.segment_number(self.wal_seg_size) - >= commit_lsn.segment_number(self.wal_seg_size) - { - continue; - } - - if let Some(l) = self.leader.as_mut() { - // Optimization idea for later: - // Avoid checking election leader every time by returning current lease grant expiration time - // Re-check leadership only after expiration time, - // such approach would reduce overhead on write-intensive workloads - - match l - .check_am_i( - self.election.election_name.clone(), - self.election.candidate_name.clone(), - ) - .await - { - Ok(leader) => { - if !leader { - info!("lost leadership"); - break; - } - } - Err(e) => { - warn!("error validating leader, {:?}", e); - break; - } - } - } - - match backup_lsn_range( - backup_lsn, - commit_lsn, - self.wal_seg_size, - &self.timeline_dir, - ) - .await - { - Ok(backup_lsn_result) => { - backup_lsn = backup_lsn_result; - let res = self.timeline.set_wal_backup_lsn(backup_lsn_result); - if let Err(e) = res { - error!("backup error: {}", e); - return; - } - retry_attempt = 0; - } - Err(e) => { - error!( - "failed while offloading range {}-{}: {:?}", - backup_lsn, commit_lsn, e - ); - - retry_attempt = min(retry_attempt + 1, u32::MAX); + if retry_attempt < u32::MAX { + retry_attempt += 1; } } } From 6ff2c61ae0b80a5d53421a32d8c31c6a742b0072 Mon Sep 17 00:00:00 2001 From: Sergey Melnikov Date: Fri, 21 Oct 2022 16:44:08 +0300 Subject: [PATCH 02/10] Refactor safekeeper s3 config and change it for new account (#2672) --- .github/ansible/neon-stress.hosts.yaml | 2 +- .github/ansible/production.hosts.yaml | 2 +- .github/ansible/staging.hosts.yaml | 2 +- .github/ansible/staging.us-east-2.hosts.yaml | 2 +- .github/ansible/systemd/safekeeper.service | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/ansible/neon-stress.hosts.yaml b/.github/ansible/neon-stress.hosts.yaml index 8afc9a5be8..dd61ac5a5e 100644 --- a/.github/ansible/neon-stress.hosts.yaml +++ b/.github/ansible/neon-stress.hosts.yaml @@ -3,7 +3,6 @@ storage: bucket_name: neon-storage-ireland bucket_region: eu-west-1 console_mgmt_base_url: http://neon-stress-console.local - env_name: neon-stress etcd_endpoints: neon-stress-etcd.local:2379 safekeeper_enable_s3_offload: 'false' pageserver_config_stub: @@ -12,6 +11,7 @@ storage: bucket_name: "{{ bucket_name }}" bucket_region: "{{ bucket_region }}" prefix_in_bucket: "{{ inventory_hostname }}" + safekeeper_s3_prefix: neon-stress/wal hostname_suffix: ".local" remote_user: admin children: diff --git a/.github/ansible/production.hosts.yaml b/.github/ansible/production.hosts.yaml index 9f9b12d25d..bca2614399 100644 --- a/.github/ansible/production.hosts.yaml +++ b/.github/ansible/production.hosts.yaml @@ -1,7 +1,6 @@ --- storage: vars: - env_name: prod-1 console_mgmt_base_url: http://console-release.local bucket_name: zenith-storage-oregon bucket_region: us-west-2 @@ -12,6 +11,7 @@ storage: bucket_name: "{{ bucket_name }}" bucket_region: "{{ bucket_region }}" prefix_in_bucket: "{{ inventory_hostname }}" + safekeeper_s3_prefix: prod-1/wal hostname_suffix: ".local" remote_user: admin diff --git a/.github/ansible/staging.hosts.yaml b/.github/ansible/staging.hosts.yaml index 7e91e8e728..44d971455d 100644 --- a/.github/ansible/staging.hosts.yaml +++ b/.github/ansible/staging.hosts.yaml @@ -3,7 +3,6 @@ storage: bucket_name: zenith-staging-storage-us-east-1 bucket_region: us-east-1 console_mgmt_base_url: http://console-staging.local - env_name: us-stage etcd_endpoints: zenith-us-stage-etcd.local:2379 pageserver_config_stub: pg_distrib_dir: /usr/local @@ -11,6 +10,7 @@ storage: bucket_name: "{{ bucket_name }}" bucket_region: "{{ bucket_region }}" prefix_in_bucket: "{{ inventory_hostname }}" + safekeeper_s3_prefix: us-stage/wal hostname_suffix: ".local" remote_user: admin diff --git a/.github/ansible/staging.us-east-2.hosts.yaml b/.github/ansible/staging.us-east-2.hosts.yaml index 5da0cce973..db3ed87c45 100644 --- a/.github/ansible/staging.us-east-2.hosts.yaml +++ b/.github/ansible/staging.us-east-2.hosts.yaml @@ -3,7 +3,6 @@ storage: bucket_name: neon-staging-storage-us-east-2 bucket_region: us-east-2 console_mgmt_base_url: http://console-staging.local - env_name: us-stage etcd_endpoints: etcd-0.us-east-2.aws.neon.build:2379 pageserver_config_stub: pg_distrib_dir: /usr/local @@ -11,6 +10,7 @@ storage: bucket_name: "{{ bucket_name }}" bucket_region: "{{ bucket_region }}" prefix_in_bucket: "pageserver/v1" + safekeeper_s3_prefix: safekeeper/v1/wal hostname_suffix: "" remote_user: ssm-user ansible_aws_ssm_region: us-east-2 diff --git a/.github/ansible/systemd/safekeeper.service b/.github/ansible/systemd/safekeeper.service index 877579fbfa..69827e36ac 100644 --- a/.github/ansible/systemd/safekeeper.service +++ b/.github/ansible/systemd/safekeeper.service @@ -6,7 +6,7 @@ After=network.target auditd.service Type=simple User=safekeeper Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib -ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ env_name }}/wal"}' +ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}' ExecReload=/bin/kill -HUP $MAINPID KillMode=mixed KillSignal=SIGINT From 5928cb33c553913c28c7857f126fbab9d3537ff6 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 21 Oct 2022 18:51:48 +0300 Subject: [PATCH 03/10] Introduce timeline state (#2651) Similar to https://github.com/neondatabase/neon/pull/2395, introduces a state field in Timeline, that's possible to subscribe to. Adjusts * walreceiver to not to have any connections if timeline is not Active * remote storage sync to not to schedule uploads if timeline is Broken * not to create timelines if a tenant/timeline is broken * automatically switches timelines' states based on tenant state Does not adjust timeline's gc, checkpointing and layer flush behaviour much, since it's not safe to cancel these processes abruptly and there's task_mgr::shutdown_tasks that does similar thing. --- libs/pageserver_api/src/models.rs | 18 ++ pageserver/src/http/openapi_spec.yml | 3 + pageserver/src/http/routes.rs | 31 ++- pageserver/src/page_service.rs | 3 +- pageserver/src/tenant.rs | 240 +++++++++++------- pageserver/src/tenant/timeline.rs | 101 ++++++-- pageserver/src/tenant_tasks.rs | 2 +- .../src/walreceiver/connection_manager.rs | 104 ++++++-- test_runner/regress/test_broken_timeline.py | 8 +- test_runner/regress/test_timeline_delete.py | 2 +- 10 files changed, 367 insertions(+), 145 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a153f1a01e..dd40ba9e1c 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -19,6 +19,22 @@ pub enum TenantState { Broken, } +/// A state of a timeline in pageserver's memory. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum TimelineState { + /// Timeline is fully operational, its background jobs are running. + Active, + /// A timeline is recognized by pageserver, but not yet ready to operate. + /// The status indicates, that the timeline could eventually go back to Active automatically: + /// for example, if the owning tenant goes back to Active again. + Suspended, + /// A timeline is recognized by pageserver, but not yet ready to operate and not allowed to + /// automatically become Active after certain events: only a management call can change this status. + Paused, + /// A timeline is recognized by the pageserver, but no longer used for any operations, as failed to get activated. + Broken, +} + #[serde_as] #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { @@ -160,6 +176,8 @@ pub struct TimelineInfo { pub remote_consistent_lsn: Option, pub awaits_download: bool, + pub state: TimelineState, + // Some of the above fields are duplicated in 'local' and 'remote', for backwards- // compatility with older clients. pub local: LocalTimelineInfo, diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 626cc07429..89609f5674 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -618,6 +618,7 @@ components: - last_record_lsn - disk_consistent_lsn - awaits_download + - state properties: timeline_id: type: string @@ -660,6 +661,8 @@ components: type: integer awaits_download: type: boolean + state: + type: string # These 'local' and 'remote' fields just duplicate some of the fields # above. They are kept for backwards-compatibility. They can be removed, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 489adbb2cf..8ec7604b8a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -129,6 +129,7 @@ async fn build_timeline_info( } }; let current_physical_size = Some(timeline.get_physical_size()); + let state = timeline.current_state(); let info = TimelineInfo { tenant_id: timeline.tenant_id, @@ -158,6 +159,7 @@ async fn build_timeline_info( remote_consistent_lsn, awaits_download, + state, // Duplicate some fields in 'local' and 'remote' fields, for backwards-compatility // with the control plane. @@ -294,7 +296,7 @@ async fn timeline_detail_handler(request: Request) -> Result) -> Result format!("{}", lsn), + LsnForTimestamp::Present(lsn) => format!("{lsn}"), LsnForTimestamp::Future(_lsn) => "future".into(), LsnForTimestamp::Past(_lsn) => "past".into(), LsnForTimestamp::NoData(_lsn) => "nodata".into(), @@ -788,16 +789,16 @@ async fn timeline_gc_handler(mut request: Request) -> Result) -> Result) -> Result Result> { - tenant_mgr::get_tenant(tenant_id, true).and_then(|tenant| tenant.get_timeline(timeline_id)) + tenant_mgr::get_tenant(tenant_id, true) + .and_then(|tenant| tenant.get_timeline(timeline_id, true)) } /// diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 69c89a80b4..84833e9c40 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -11,7 +11,8 @@ //! parent timeline, and the last LSN that has been written to disk. //! -use anyhow::{bail, ensure, Context}; +use anyhow::{bail, Context}; +use pageserver_api::models::TimelineState; use tokio::sync::watch; use tracing::*; use utils::crashsafe::path_with_suffix_extension; @@ -189,6 +190,7 @@ impl UninitializedTimeline<'_> { "Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}" ) })?; + new_timeline.set_state(TimelineState::Active); v.insert(Arc::clone(&new_timeline)); new_timeline.launch_wal_receiver(); } @@ -338,18 +340,26 @@ impl Tenant { /// Get Timeline handle for given Neon timeline ID. /// This function is idempotent. It doesn't change internal state in any way. - pub fn get_timeline(&self, timeline_id: TimelineId) -> anyhow::Result> { - self.timelines - .lock() - .unwrap() - .get(&timeline_id) - .with_context(|| { - format!( - "Timeline {} was not found for tenant {}", - timeline_id, self.tenant_id - ) - }) - .map(Arc::clone) + pub fn get_timeline( + &self, + timeline_id: TimelineId, + active_only: bool, + ) -> anyhow::Result> { + let timelines_accessor = self.timelines.lock().unwrap(); + let timeline = timelines_accessor.get(&timeline_id).with_context(|| { + format!("Timeline {}/{} was not found", self.tenant_id, timeline_id) + })?; + + if active_only && !timeline.is_active() { + anyhow::bail!( + "Timeline {}/{} is not active, state: {:?}", + self.tenant_id, + timeline_id, + timeline.current_state() + ) + } else { + Ok(Arc::clone(timeline)) + } } /// Lists timelines the tenant contains. @@ -372,6 +382,11 @@ impl Tenant { initdb_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { + anyhow::ensure!( + self.is_active(), + "Cannot create empty timelines on inactive tenant" + ); + let timelines = self.timelines.lock().unwrap(); let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; drop(timelines); @@ -408,9 +423,14 @@ impl Tenant { mut ancestor_start_lsn: Option, pg_version: u32, ) -> anyhow::Result>> { + anyhow::ensure!( + self.is_active(), + "Cannot create timelines on inactive tenant" + ); + let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate); - if self.get_timeline(new_timeline_id).is_ok() { + if self.get_timeline(new_timeline_id, false).is_ok() { debug!("timeline {new_timeline_id} already exists"); return Ok(None); } @@ -418,7 +438,7 @@ impl Tenant { let loaded_timeline = match ancestor_timeline_id { Some(ancestor_timeline_id) => { let ancestor_timeline = self - .get_timeline(ancestor_timeline_id) + .get_timeline(ancestor_timeline_id, false) .context("Cannot branch off the timeline that's not present in pageserver")?; if let Some(lsn) = ancestor_start_lsn.as_mut() { @@ -470,6 +490,11 @@ impl Tenant { pitr: Duration, checkpoint_before_gc: bool, ) -> anyhow::Result { + anyhow::ensure!( + self.is_active(), + "Cannot run GC iteration on inactive tenant" + ); + let timeline_str = target_timeline_id .map(|x| x.to_string()) .unwrap_or_else(|| "-".to_string()); @@ -486,6 +511,11 @@ impl Tenant { /// Also it can be explicitly requested per timeline through page server /// api's 'compact' command. pub fn compaction_iteration(&self) -> anyhow::Result<()> { + anyhow::ensure!( + self.is_active(), + "Cannot run compaction iteration on inactive tenant" + ); + // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // compactions. We don't want to block everything else while the @@ -493,6 +523,7 @@ impl Tenant { let timelines = self.timelines.lock().unwrap(); let timelines_to_compact = timelines .iter() + .filter(|(_, timeline)| timeline.is_active()) .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) .collect::>(); drop(timelines); @@ -515,13 +546,13 @@ impl Tenant { // checkpoints. We don't want to block everything else while the // checkpoint runs. let timelines = self.timelines.lock().unwrap(); - let timelines_to_compact = timelines + let timelines_to_checkpoint = timelines .iter() .map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline))) .collect::>(); drop(timelines); - for (timeline_id, timeline) in &timelines_to_compact { + for (timeline_id, timeline) in &timelines_to_checkpoint { let _entered = info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id) .entered(); @@ -543,7 +574,7 @@ impl Tenant { .iter() .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); - ensure!( + anyhow::ensure!( !children_exist, "Cannot delete timeline which has child timelines" ); @@ -552,7 +583,10 @@ impl Tenant { Entry::Vacant(_) => bail!("timeline not found"), }; - let layer_removal_guard = timeline_entry.get().layer_removal_guard()?; + let timeline = timeline_entry.get(); + timeline.set_state(TimelineState::Paused); + + let layer_removal_guard = timeline.layer_removal_guard()?; let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { @@ -569,58 +603,6 @@ impl Tenant { Ok(()) } - pub fn init_attach_timelines( - &self, - timelines: HashMap, - ) -> anyhow::Result<()> { - let sorted_timelines = if timelines.len() == 1 { - timelines.into_iter().collect() - } else if !timelines.is_empty() { - tree_sort_timelines(timelines)? - } else { - warn!("No timelines to attach received"); - return Ok(()); - }; - - let mut timelines_accessor = self.timelines.lock().unwrap(); - for (timeline_id, metadata) in sorted_timelines { - info!( - "Attaching timeline {} pg_version {}", - timeline_id, - metadata.pg_version() - ); - - if timelines_accessor.contains_key(&timeline_id) { - warn!( - "Timeline {}/{} already exists in the tenant map, skipping its initialization", - self.tenant_id, timeline_id - ); - continue; - } else { - let ancestor = metadata - .ancestor_timeline() - .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id)) - .cloned(); - let timeline = UninitializedTimeline { - owning_tenant: self, - timeline_id, - raw_timeline: Some(( - self.create_timeline_data(timeline_id, metadata, ancestor) - .with_context(|| { - format!("Failed to initialize timeline {timeline_id}") - })?, - TimelineUninitMark::dummy(), - )), - }; - let initialized_timeline = - timeline.initialize_with_lock(&mut timelines_accessor, true)?; - timelines_accessor.insert(timeline_id, initialized_timeline); - } - } - - Ok(()) - } - /// Allows to retrieve remote timeline index from the tenant. Used in walreceiver to grab remote consistent lsn. pub fn get_remote_index(&self) -> &RemoteIndex { &self.remote_index @@ -661,10 +643,30 @@ impl Tenant { } (_, new_state) => { self.state.send_replace(new_state); - if self.should_run_tasks() { - // Spawn gc and compaction loops. The loops will shut themselves - // down when they notice that the tenant is inactive. - crate::tenant_tasks::start_background_loops(self.tenant_id); + + let timelines_accessor = self.timelines.lock().unwrap(); + let not_broken_timelines = timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken); + match new_state { + TenantState::Active { + background_jobs_running, + } => { + if background_jobs_running { + // Spawn gc and compaction loops. The loops will shut themselves + // down when they notice that the tenant is inactive. + crate::tenant_tasks::start_background_loops(self.tenant_id); + } + + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Active); + } + } + TenantState::Paused | TenantState::Broken => { + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Suspended); + } + } } } } @@ -993,6 +995,7 @@ impl Tenant { timelines .iter() + .filter(|(_, timeline)| timeline.is_active()) .map(|(timeline_id, timeline_entry)| { // This is unresolved question for now, how to do gc in presence of remote timelines // especially when this is combined with branching. @@ -1026,7 +1029,7 @@ impl Tenant { for timeline_id in timeline_ids { // Timeline is known to be local and loaded. let timeline = self - .get_timeline(timeline_id) + .get_timeline(timeline_id, false) .with_context(|| format!("Timeline {timeline_id} was not found"))?; // If target_timeline is specified, ignore all other timelines @@ -1111,7 +1114,7 @@ impl Tenant { // Step 2 is to avoid initializing the new branch using data removed by past GC iterations // or in-queue GC iterations. - let src_timeline = self.get_timeline(src).with_context(|| { + let src_timeline = self.get_timeline(src, false).with_context(|| { format!( "No ancestor {} found for timeline {}/{}", src, self.tenant_id, dst @@ -1381,6 +1384,68 @@ impl Tenant { Ok(uninit_mark) } + + pub(super) fn init_attach_timelines( + &self, + timelines: HashMap, + ) -> anyhow::Result<()> { + let sorted_timelines = if timelines.len() == 1 { + timelines.into_iter().collect() + } else if !timelines.is_empty() { + tree_sort_timelines(timelines)? + } else { + warn!("No timelines to attach received"); + return Ok(()); + }; + + let tenant_id = self.tenant_id; + let mut timelines_accessor = self.timelines.lock().unwrap(); + for (timeline_id, metadata) in sorted_timelines { + info!( + "Attaching timeline {}/{} pg_version {}", + tenant_id, + timeline_id, + metadata.pg_version() + ); + + if timelines_accessor.contains_key(&timeline_id) { + warn!("Timeline {tenant_id}/{timeline_id} already exists in the tenant map, skipping its initialization"); + continue; + } + + let ancestor = metadata + .ancestor_timeline() + .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id)) + .cloned(); + let dummy_timeline = self + .create_timeline_data(timeline_id, metadata.clone(), ancestor.clone()) + .with_context(|| { + format!("Failed to crate dummy timeline data for {tenant_id}/{timeline_id}") + })?; + let timeline = UninitializedTimeline { + owning_tenant: self, + timeline_id, + raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())), + }; + match timeline.initialize_with_lock(&mut timelines_accessor, true) { + Ok(initialized_timeline) => { + timelines_accessor.insert(timeline_id, initialized_timeline); + } + Err(e) => { + error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}"); + let broken_timeline = self + .create_timeline_data(timeline_id, metadata, ancestor) + .with_context(|| { + format!("Failed to crate broken timeline data for {tenant_id}/{timeline_id}") + })?; + broken_timeline.set_state(TimelineState::Broken); + timelines_accessor.insert(timeline_id, Arc::new(broken_timeline)); + } + } + } + + Ok(()) + } } /// Create the cluster temporarily in 'initdbpath' directory inside the repository @@ -1608,6 +1673,9 @@ pub mod harness { timelines_to_load.insert(timeline_id, timeline_metadata); } tenant.init_attach_timelines(timelines_to_load)?; + tenant.set_state(TenantState::Active { + background_jobs_running: false, + }); Ok(tenant) } @@ -1767,7 +1835,7 @@ mod tests { // Branch the history, modify relation differently on the new timeline tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); let new_writer = newtline.writer(); new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; @@ -1923,7 +1991,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; @@ -1942,7 +2010,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ -1974,7 +2042,7 @@ mod tests { let tenant = harness.load(); tenant - .get_timeline(TIMELINE_ID) + .get_timeline(TIMELINE_ID, true) .expect("cannot load timeline"); Ok(()) @@ -1997,7 +2065,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ -2009,11 +2077,11 @@ mod tests { // check that both, child and ancestor are loaded let _child_tline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("cannot get child timeline loaded"); let _ancestor_tline = tenant - .get_timeline(TIMELINE_ID) + .get_timeline(TIMELINE_ID, true) .expect("cannot get ancestor timeline loaded"); Ok(()) @@ -2267,7 +2335,7 @@ mod tests { let new_tline_id = TimelineId::generate(); tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; tline = tenant - .get_timeline(new_tline_id) + .get_timeline(new_tline_id, true) .expect("Should have the branched timeline"); tline_id = new_tline_id; @@ -2330,7 +2398,7 @@ mod tests { let new_tline_id = TimelineId::generate(); tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; tline = tenant - .get_timeline(new_tline_id) + .get_timeline(new_tline_id, true) .expect("Should have the branched timeline"); tline_id = new_tline_id; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ccd094b65a..194ca0d857 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5,6 +5,8 @@ use bytes::Bytes; use fail::fail_point; use itertools::Itertools; use once_cell::sync::OnceCell; +use pageserver_api::models::TimelineState; +use tokio::sync::watch; use tokio::task::spawn_blocking; use tracing::*; @@ -160,6 +162,8 @@ pub struct Timeline { /// Relation size cache pub rel_size_cache: RwLock>, + + state: watch::Sender, } /// Internal structure to hold all data needed for logical size calculation. @@ -416,9 +420,11 @@ impl Timeline { /// those functions with an LSN that has been processed yet is an error. /// pub async fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> { + anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline"); + // This should never be called from the WAL receiver, because that could lead // to a deadlock. - ensure!( + anyhow::ensure!( task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnection), "wait_lsn cannot be called in WAL receiver" ); @@ -635,6 +641,35 @@ impl Timeline { } Ok(()) } + + pub fn set_state(&self, new_state: TimelineState) { + match (self.current_state(), new_state) { + (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => { + debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}"); + } + (TimelineState::Broken, _) => { + error!("Ignoring state update {new_state:?} for broken tenant"); + } + (TimelineState::Paused, TimelineState::Active) => { + debug!("Not activating a paused timeline"); + } + (_, new_state) => { + self.state.send_replace(new_state); + } + } + } + + pub fn current_state(&self) -> TimelineState { + *self.state.borrow() + } + + pub fn is_active(&self) -> bool { + self.current_state() == TimelineState::Active + } + + pub fn subscribe_for_state_updates(&self) -> watch::Receiver { + self.state.subscribe() + } } // Private functions @@ -688,8 +723,9 @@ impl Timeline { walredo_mgr: Arc, upload_layers: bool, pg_version: u32, - ) -> Timeline { + ) -> Self { let disk_consistent_lsn = metadata.disk_consistent_lsn(); + let (state, _) = watch::channel(TimelineState::Suspended); let mut result = Timeline { conf, @@ -746,6 +782,7 @@ impl Timeline { last_received_wal: Mutex::new(None), rel_size_cache: RwLock::new(HashMap::new()), + state, }; result.repartition_threshold = result.get_checkpoint_distance() / 10; result @@ -883,8 +920,6 @@ impl Timeline { } fn try_spawn_size_init_task(self: &Arc, init_lsn: Lsn) { - let timeline_id = self.timeline_id; - // Atomically check if the timeline size calculation had already started. // If the flag was not already set, this sets it. if !self @@ -901,17 +936,42 @@ impl Timeline { "initial size calculation", false, async move { - let calculated_size = self_clone.calculate_logical_size(init_lsn)?; - let result = spawn_blocking(move || { - self_clone.current_logical_size.initial_logical_size.set(calculated_size) - }).await?; - match result { - Ok(()) => info!("Successfully calculated initial logical size"), - Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), + let mut timeline_state_updates = self_clone.subscribe_for_state_updates(); + let self_calculation = Arc::clone(&self_clone); + tokio::select! { + calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => { + let calculated_size = calculation_result + .context("Failed to spawn calculation result task")? + .context("Failed to calculate logical size")?; + match self_clone.current_logical_size.initial_logical_size.set(calculated_size) { + Ok(()) => info!("Successfully calculated initial logical size"), + Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), + } + Ok(()) + }, + new_event = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + // we're running this job for active timelines only + TimelineState::Active => continue, + TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return Some(new_state), + } + } + Err(_sender_dropped_error) => return None, + } + } + } => { + match new_event { + Some(new_state) => info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"), + None => info!("Timeline dropped state updates sender, stopping init size calculation"), + } + Ok(()) + }, } - Ok(()) - } - .instrument(info_span!("initial_logical_size_calculation", timeline = %timeline_id)) + }.instrument(info_span!("initial_logical_size_calculation", tenant = %self.tenant_id, timeline = %self.timeline_id)), ); } } @@ -1356,7 +1416,7 @@ impl Timeline { false, )?; - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, @@ -1826,7 +1886,7 @@ impl Timeline { } drop(layers); - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, @@ -1930,7 +1990,7 @@ impl Timeline { /// obsolete. /// pub(super) fn gc(&self) -> anyhow::Result { - let mut result: GcResult = Default::default(); + let mut result: GcResult = GcResult::default(); let now = SystemTime::now(); fail_point!("before-timeline-gc"); @@ -2110,7 +2170,7 @@ impl Timeline { fail_point!("after-timeline-gc-removed-layers"); } - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_delete( self.tenant_id, self.timeline_id, @@ -2199,6 +2259,11 @@ impl Timeline { } } } + + fn can_upload_layers(&self) -> bool { + self.upload_layers.load(atomic::Ordering::Relaxed) + && self.current_state() != TimelineState::Broken + } } /// Helper function for get_reconstruct_data() to add the path of layers traversed diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index 030055df6d..23ce9dc699 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -175,7 +175,7 @@ async fn wait_for_active_tenant( } state => { debug!("Not running the task loop, tenant is not active with background jobs enabled: {state:?}"); - tokio::time::sleep(wait).await; + continue; } } } diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 2380caaff1..53dd2d8eac 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -12,6 +12,7 @@ use std::{ collections::{hash_map, HashMap}, num::NonZeroU64, + ops::ControlFlow, sync::Arc, time::Duration, }; @@ -26,7 +27,8 @@ use etcd_broker::{ subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, BrokerUpdate, Client, }; -use tokio::select; +use pageserver_api::models::TimelineState; +use tokio::{select, sync::watch}; use tracing::*; use crate::{ @@ -58,10 +60,7 @@ pub fn spawn_connection_manager_task( TaskKind::WalReceiverManager, Some(tenant_id), Some(timeline_id), - &format!( - "walreceiver for tenant {} timeline {}", - timeline.tenant_id, timeline.timeline_id - ), + &format!("walreceiver for timeline {tenant_id}/{timeline_id}"), false, async move { info!("WAL receiver broker started, connecting to etcd"); @@ -75,19 +74,21 @@ pub fn spawn_connection_manager_task( select! { _ = task_mgr::shutdown_watcher() => { info!("WAL receiver shutdown requested, shutting down"); - // Kill current connection, if any - if let Some(wal_connection) = walreceiver_state.wal_connection.take() - { - wal_connection.connection_task.shutdown().await; - } + walreceiver_state.shutdown().await; return Ok(()); }, - - _ = connection_manager_loop_step( + loop_step_result = connection_manager_loop_step( &broker_loop_prefix, &mut etcd_client, &mut walreceiver_state, - ) => {}, + ) => match loop_step_result { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(()) => { + info!("Connection manager loop ended, shutting down"); + walreceiver_state.shutdown().await; + return Ok(()); + } + }, } } } @@ -104,7 +105,17 @@ async fn connection_manager_loop_step( broker_prefix: &str, etcd_client: &mut Client, walreceiver_state: &mut WalreceiverState, -) { +) -> ControlFlow<(), ()> { + let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates(); + + match wait_for_active_timeline(&mut timeline_state_updates).await { + ControlFlow::Continue(()) => {} + ControlFlow::Break(()) => { + info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop"); + return ControlFlow::Break(()); + } + } + let id = TenantTimelineId { tenant_id: walreceiver_state.timeline.tenant_id, timeline_id: walreceiver_state.timeline.timeline_id, @@ -129,10 +140,12 @@ async fn connection_manager_loop_step( // - change connection if the rules decide so, or if the current connection dies // - receive updates from broker // - this might change the current desired connection + // - timeline state changes to something that does not allow walreceiver to run concurrently select! { broker_connection_result = &mut broker_subscription.watcher_handle => { + info!("Broker connection was closed from the other side, ending current broker loop step"); cleanup_broker_connection(broker_connection_result, walreceiver_state); - return; + return ControlFlow::Continue(()); }, Some(wal_connection_update) = async { @@ -185,11 +198,36 @@ async fn connection_manager_loop_step( (&mut broker_subscription.watcher_handle).await, walreceiver_state, ); - return; + return ControlFlow::Continue(()); } } }, + new_event = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = walreceiver_state.timeline.current_state(); + match new_state { + // we're already active as walreceiver, no need to reactivate + TimelineState::Active => continue, + TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return ControlFlow::Continue(new_state), + } + } + Err(_sender_dropped_error) => return ControlFlow::Break(()), + } + } + } => match new_event { + ControlFlow::Continue(new_state) => { + info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"); + return ControlFlow::Continue(()); + } + ControlFlow::Break(()) => { + info!("Timeline dropped state updates sender, stopping wal connection manager loop"); + return ControlFlow::Break(()); + } + }, + _ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {} } @@ -216,6 +254,34 @@ async fn connection_manager_loop_step( } } +async fn wait_for_active_timeline( + timeline_state_updates: &mut watch::Receiver, +) -> ControlFlow<(), ()> { + let current_state = *timeline_state_updates.borrow(); + if current_state == TimelineState::Active { + return ControlFlow::Continue(()); + } + + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + TimelineState::Active => { + debug!("Timeline state changed to active, continuing the walreceiver connection manager"); + return ControlFlow::Continue(()); + } + state => { + debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}"); + continue; + } + } + } + Err(_sender_dropped_error) => return ControlFlow::Break(()), + } + } +} + fn cleanup_broker_connection( broker_connection_result: Result, tokio::task::JoinError>, walreceiver_state: &mut WalreceiverState, @@ -723,6 +789,12 @@ impl WalreceiverState { self.wal_connection_retries.remove(&node_id); } } + + async fn shutdown(mut self) { + if let Some(wal_connection) = self.wal_connection.take() { + wal_connection.connection_task.shutdown().await; + } + } } #[derive(Debug, PartialEq, Eq)] diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 101cce9ffc..b747af4d09 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -70,18 +70,14 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # But all others are broken # First timeline would not get loaded into pageserver due to corrupt metadata file - with pytest.raises( - Exception, match=f"Timeline {timeline1} was not found for tenant {tenant1}" - ) as err: + with pytest.raises(Exception, match=f"Timeline {tenant1}/{timeline1} was not found") as err: pg1.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") # Second timeline has no ancestors, only the metadata file and no layer files # We don't have the remote storage enabled, which means timeline is in an incorrect state, # it's not loaded at all - with pytest.raises( - Exception, match=f"Timeline {timeline2} was not found for tenant {tenant2}" - ) as err: + with pytest.raises(Exception, match=f"Timeline {tenant2}/{timeline2} was not found") as err: pg2.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index de05d445ed..4a78a2746e 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -65,7 +65,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): # check 404 with pytest.raises( NeonPageserverApiException, - match=f"Timeline {leaf_timeline_id} was not found for tenant {env.initial_tenant}", + match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", ): ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) From 71ef7b666305350d59298cc9c53d721be509a093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s?= Date: Fri, 21 Oct 2022 19:02:31 +0200 Subject: [PATCH 04/10] Remove cached_property package (#2673) Co-authored-by: andres --- poetry.lock | 12 ------------ pyproject.toml | 2 -- test_runner/fixtures/neon_fixtures.py | 2 +- test_runner/regress/test_close_fds.py | 2 +- 4 files changed, 2 insertions(+), 16 deletions(-) diff --git a/poetry.lock b/poetry.lock index 2af0d97511..27de8508ce 100644 --- a/poetry.lock +++ b/poetry.lock @@ -514,14 +514,6 @@ python-versions = ">=3.7" [package.dependencies] typing-extensions = ">=4.1.0" -[[package]] -name = "cached-property" -version = "1.5.2" -description = "A decorator for caching properties in classes." -category = "main" -optional = false -python-versions = "*" - [[package]] name = "certifi" version = "2022.6.15" @@ -1647,10 +1639,6 @@ botocore-stubs = [ {file = "botocore-stubs-1.27.38.tar.gz", hash = "sha256:408e8b86b5d171b58f81c74ca9d3b5317a5a8e2d3bc2073aa841ac13b8939e56"}, {file = "botocore_stubs-1.27.38-py3-none-any.whl", hash = "sha256:7add7641e9a479a9c8366893bb522fd9ca3d58714201e43662a200a148a1bc38"}, ] -cached-property = [ - {file = "cached-property-1.5.2.tar.gz", hash = "sha256:9fa5755838eecbb2d234c3aa390bd80fbd3ac6b6869109bfc1b499f7bd89a130"}, - {file = "cached_property-1.5.2-py2.py3-none-any.whl", hash = "sha256:df4f613cf7ad9a588cc381aaf4a512d26265ecebd5eb9e1ba12f1319eb85a6a0"}, -] certifi = [ {file = "certifi-2022.6.15-py3-none-any.whl", hash = "sha256:fe86415d55e84719d75f8b69414f6438ac3547d2078ab91b67e779ef69378412"}, {file = "certifi-2022.6.15.tar.gz", hash = "sha256:84c85a9078b11105f04f3036a9482ae10e4621616db313fe045dd24743a0820d"}, diff --git a/pyproject.toml b/pyproject.toml index 9c2aa39c7c..1ee6fbe6f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,6 @@ requests = "^2.26.0" pytest-xdist = "^2.3.0" asyncpg = "^0.24.0" aiopg = "^1.3.1" -cached-property = "^1.5.2" Jinja2 = "^3.0.2" types-requests = "^2.28.5" types-psycopg2 = "^2.9.18" @@ -74,7 +73,6 @@ strict = true [[tool.mypy.overrides]] module = [ "asyncpg.*", - "cached_property.*", "pg8000.*", ] ignore_missing_imports = true diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a77b3958c9..4b2638bb2a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -17,6 +17,7 @@ import uuid from contextlib import closing, contextmanager from dataclasses import dataclass, field from enum import Flag, auto +from functools import cached_property from pathlib import Path from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union, cast @@ -27,7 +28,6 @@ import jwt import psycopg2 import pytest import requests -from cached_property import cached_property from fixtures.log_helper import log from fixtures.types import Lsn, TenantId, TimelineId diff --git a/test_runner/regress/test_close_fds.py b/test_runner/regress/test_close_fds.py index c7ea37f9c8..22f245f79b 100644 --- a/test_runner/regress/test_close_fds.py +++ b/test_runner/regress/test_close_fds.py @@ -1,10 +1,10 @@ import os.path import shutil import subprocess +import threading import time from contextlib import closing -from cached_property import threading from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv From 321aeac3d4f15e84ca615c499caf35033bc891e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lassi=20P=C3=B6l=C3=B6nen?= Date: Fri, 21 Oct 2022 20:30:20 +0300 Subject: [PATCH 05/10] Json logging capability (#2624) * Support configuring the log format as json or plain. Separately test json and plain logger. They would be competing on the same global subscriber otherwise. * Implement log_format for pageserver config * Implement configurable log format for safekeeper. --- Cargo.lock | 15 ++++++ libs/utils/Cargo.toml | 4 +- libs/utils/src/logging.rs | 78 ++++++++++++++++++++++----- libs/utils/tests/logger_json_test.rs | 36 +++++++++++++ libs/utils/tests/logger_plain_test.rs | 36 +++++++++++++ pageserver/src/bin/pageserver.rs | 2 +- pageserver/src/config.rs | 22 ++++++++ safekeeper/src/bin/safekeeper.rs | 19 +++++-- safekeeper/src/lib.rs | 7 ++- 9 files changed, 200 insertions(+), 19 deletions(-) create mode 100644 libs/utils/tests/logger_json_test.rs create mode 100644 libs/utils/tests/logger_plain_test.rs diff --git a/Cargo.lock b/Cargo.lock index 657baf5d80..13774f7fe6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3932,6 +3932,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" @@ -3942,12 +3952,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -4042,6 +4055,8 @@ dependencies = [ "serde_json", "serde_with", "signal-hook", + "strum", + "strum_macros", "tempfile", "thiserror", "tokio", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index a7baddada4..1753ee81b9 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -19,7 +19,7 @@ thiserror = "1.0" tokio = { version = "1.17", features = ["macros"]} tokio-rustls = "0.23" tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } nix = "0.25" signal-hook = "0.3.10" rand = "0.8.3" @@ -30,6 +30,8 @@ rustls-split = "0.3.0" git-version = "0.3.5" serde_with = "2.0" once_cell = "1.13.0" +strum = "0.24" +strum_macros = "0.24" metrics = { path = "../metrics" } diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 1576a54c8e..31c0e02f98 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,11 +1,35 @@ use std::{ fs::{File, OpenOptions}, path::Path, + str::FromStr, }; use anyhow::{Context, Result}; +use strum_macros::{EnumString, EnumVariantNames}; -pub fn init(log_filename: impl AsRef, daemonize: bool) -> Result { +#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)] +#[strum(serialize_all = "snake_case")] +pub enum LogFormat { + Plain, + Json, +} + +impl LogFormat { + pub fn from_config(s: &str) -> anyhow::Result { + use strum::VariantNames; + LogFormat::from_str(s).with_context(|| { + format!( + "Unrecognized log format. Please specify one of: {:?}", + LogFormat::VARIANTS + ) + }) + } +} +pub fn init( + log_filename: impl AsRef, + daemonize: bool, + log_format: LogFormat, +) -> Result { // Don't open the same file for output multiple times; // the different fds could overwrite each other's output. let log_file = OpenOptions::new() @@ -21,22 +45,50 @@ pub fn init(log_filename: impl AsRef, daemonize: bool) -> Result { let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str)); + let x: File = log_file.try_clone().unwrap(); let base_logger = tracing_subscriber::fmt() .with_env_filter(env_filter) - .with_target(false) // don't include event targets - .with_ansi(false); // don't use colors in log file; + .with_target(false) + .with_ansi(false) + .with_writer(move || -> Box { + // we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it + // if we do not use daemonization (e.g. in docker) it is better to log to stdout directly + // for example to be in line with docker log command which expects logs comimg from stdout + if daemonize { + Box::new(x.try_clone().unwrap()) + } else { + Box::new(std::io::stdout()) + } + }); - // we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it - // if we do not use daemonization (e.g. in docker) it is better to log to stdout directly - // for example to be in line with docker log command which expects logs comimg from stdout - if daemonize { - let x = log_file.try_clone().unwrap(); - base_logger - .with_writer(move || x.try_clone().unwrap()) - .init(); - } else { - base_logger.init(); + match log_format { + LogFormat::Json => base_logger.json().init(), + LogFormat::Plain => base_logger.init(), } Ok(log_file) } + +// #[cfg(test)] +// Due to global logger, can't run tests in same process. +// So until there's a non-global one, the tests are in ../tests/ as separate files. +#[macro_export(local_inner_macros)] +macro_rules! test_init_file_logger { + ($log_level:expr, $log_format:expr) => {{ + use std::str::FromStr; + std::env::set_var("RUST_LOG", $log_level); + + let tmp_dir = tempfile::TempDir::new().unwrap(); + let log_file_path = tmp_dir.path().join("logfile"); + + let log_format = $crate::logging::LogFormat::from_str($log_format).unwrap(); + let _log_file = $crate::logging::init(&log_file_path, true, log_format).unwrap(); + + let log_file = std::fs::OpenOptions::new() + .read(true) + .open(&log_file_path) + .unwrap(); + + log_file + }}; +} diff --git a/libs/utils/tests/logger_json_test.rs b/libs/utils/tests/logger_json_test.rs new file mode 100644 index 0000000000..5d63b9b004 --- /dev/null +++ b/libs/utils/tests/logger_json_test.rs @@ -0,0 +1,36 @@ +// This could be in ../src/logging.rs but since the logger is global, these +// can't be run in threads of the same process +use std::fs::File; +use std::io::{BufRead, BufReader, Lines}; +use tracing::*; +use utils::test_init_file_logger; + +fn read_lines(file: File) -> Lines> { + BufReader::new(file).lines() +} + +#[test] +fn test_json_format_has_message_and_custom_field() { + std::env::set_var("RUST_LOG", "info"); + + let log_file = test_init_file_logger!("info", "json"); + + let custom_field: &str = "hi"; + trace!(custom = %custom_field, "test log message"); + debug!(custom = %custom_field, "test log message"); + info!(custom = %custom_field, "test log message"); + warn!(custom = %custom_field, "test log message"); + error!(custom = %custom_field, "test log message"); + + let lines = read_lines(log_file); + for line in lines { + let content = line.unwrap(); + let json_object = serde_json::from_str::(&content).unwrap(); + + assert_eq!(json_object["fields"]["custom"], "hi"); + assert_eq!(json_object["fields"]["message"], "test log message"); + + assert_ne!(json_object["level"], "TRACE"); + assert_ne!(json_object["level"], "DEBUG"); + } +} diff --git a/libs/utils/tests/logger_plain_test.rs b/libs/utils/tests/logger_plain_test.rs new file mode 100644 index 0000000000..bc5abf45dd --- /dev/null +++ b/libs/utils/tests/logger_plain_test.rs @@ -0,0 +1,36 @@ +// This could be in ../src/logging.rs but since the logger is global, these +// can't be run in threads of the same process +use std::fs::File; +use std::io::{BufRead, BufReader, Lines}; +use tracing::*; +use utils::test_init_file_logger; + +fn read_lines(file: File) -> Lines> { + BufReader::new(file).lines() +} + +#[test] +fn test_plain_format_has_message_and_custom_field() { + std::env::set_var("RUST_LOG", "warn"); + + let log_file = test_init_file_logger!("warn", "plain"); + + let custom_field: &str = "hi"; + trace!(custom = %custom_field, "test log message"); + debug!(custom = %custom_field, "test log message"); + info!(custom = %custom_field, "test log message"); + warn!(custom = %custom_field, "test log message"); + error!(custom = %custom_field, "test log message"); + + let lines = read_lines(log_file); + for line in lines { + let content = line.unwrap(); + serde_json::from_str::(&content).unwrap_err(); + assert!(content.contains("custom=hi")); + assert!(content.contains("test log message")); + + assert!(!content.contains("TRACE")); + assert!(!content.contains("DEBUG")); + assert!(!content.contains("INFO")); + } +} diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 9317dd5dd7..802352be90 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -199,7 +199,7 @@ fn initialize_config( fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> { // Initialize logger - let log_file = logging::init(LOG_FILE_NAME, daemonize)?; + let log_file = logging::init(LOG_FILE_NAME, daemonize, conf.log_format)?; info!("version: {}", version()); diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 4f80fc96b5..6a372fb081 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -17,6 +17,7 @@ use toml_edit::{Document, Item}; use url::Url; use utils::{ id::{NodeId, TenantId, TimelineId}, + logging::LogFormat, postgres_backend::AuthType, }; @@ -45,6 +46,8 @@ pub mod defaults { pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192; pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100; + pub const DEFAULT_LOG_FORMAT: &str = "plain"; + /// /// Default built-in configuration file. /// @@ -63,6 +66,7 @@ pub mod defaults { # initial superuser role name to use when creating a new tenant #initial_superuser_name = '{DEFAULT_SUPERUSER}' +#log_format = '{DEFAULT_LOG_FORMAT}' # [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -126,6 +130,8 @@ pub struct PageServerConf { /// Etcd broker endpoints to connect to. pub broker_endpoints: Vec, + + pub log_format: LogFormat, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -192,6 +198,8 @@ struct PageServerConfigBuilder { profiling: BuilderValue, broker_etcd_prefix: BuilderValue, broker_endpoints: BuilderValue>, + + log_format: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -219,6 +227,7 @@ impl Default for PageServerConfigBuilder { profiling: Set(ProfilingConfig::Disabled), broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()), broker_endpoints: Set(Vec::new()), + log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()), } } } @@ -291,6 +300,10 @@ impl PageServerConfigBuilder { self.profiling = BuilderValue::Set(profiling) } + pub fn log_format(&mut self, log_format: LogFormat) { + self.log_format = BuilderValue::Set(log_format) + } + pub fn build(self) -> anyhow::Result { let broker_endpoints = self .broker_endpoints @@ -335,6 +348,7 @@ impl PageServerConfigBuilder { broker_etcd_prefix: self .broker_etcd_prefix .ok_or(anyhow!("missing broker_etcd_prefix"))?, + log_format: self.log_format.ok_or(anyhow!("missing log_format"))?, }) } } @@ -459,6 +473,9 @@ impl PageServerConf { }) .collect::>()?, ), + "log_format" => builder.log_format( + LogFormat::from_config(&parse_toml_string(key, item)?)? + ), _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -571,6 +588,7 @@ impl PageServerConf { default_tenant_conf: TenantConf::dummy_conf(), broker_endpoints: Vec::new(), broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), + log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), } } } @@ -665,6 +683,8 @@ max_file_descriptors = 333 initial_superuser_name = 'zzzz' id = 10 +log_format = 'json' + "#; #[test] @@ -704,6 +724,7 @@ id = 10 .parse() .expect("Failed to parse a valid broker endpoint URL")], broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), + log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), }, "Correct defaults should be used when no config values are provided" ); @@ -748,6 +769,7 @@ id = 10 .parse() .expect("Failed to parse a valid broker endpoint URL")], broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), + log_format: LogFormat::Json, }, "Should be able to parse all basic config values correctly" ); diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index a867aea5af..67c2c62f73 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -32,8 +32,12 @@ use safekeeper::GlobalTimelines; use safekeeper::SafeKeeperConf; use utils::auth::JwtAuth; use utils::{ - http::endpoint, id::NodeId, logging, project_git_version, shutdown::exit_now, signals, - tcp_listener, + http::endpoint, + id::NodeId, + logging::{self, LogFormat}, + project_git_version, + shutdown::exit_now, + signals, tcp_listener, }; const LOCK_FILE_NAME: &str = "safekeeper.lock"; @@ -131,11 +135,15 @@ fn main() -> anyhow::Result<()> { .get_one::("auth-validation-public-key-path") .map(PathBuf::from); + if let Some(log_format) = arg_matches.get_one::("log-format") { + conf.log_format = LogFormat::from_config(log_format)?; + } + start_safekeeper(conf, given_id, arg_matches.get_flag("init")) } fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bool) -> Result<()> { - let log_file = logging::init("safekeeper.log", conf.daemonize)?; + let log_file = logging::init("safekeeper.log", conf.daemonize, conf.log_format)?; info!("version: {GIT_VERSION}"); @@ -436,6 +444,11 @@ fn cli() -> Command { .long("auth-validation-public-key-path") .help("Path to an RSA .pem public key which is used to check JWT tokens") ) + .arg( + Arg::new("log-format") + .long("log-format") + .help("Format for logging, either 'plain' or 'json'") + ) } #[test] diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 19dff79b88..c3b8227e17 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -7,7 +7,10 @@ use std::path::PathBuf; use std::time::Duration; use url::Url; -use utils::id::{NodeId, TenantId, TenantTimelineId}; +use utils::{ + id::{NodeId, TenantId, TenantTimelineId}, + logging::LogFormat, +}; pub mod broker; pub mod control_file; @@ -64,6 +67,7 @@ pub struct SafeKeeperConf { pub auth_validation_public_key_path: Option, pub heartbeat_timeout: Duration, pub max_offloader_lag_bytes: u64, + pub log_format: LogFormat, } impl SafeKeeperConf { @@ -97,6 +101,7 @@ impl Default for SafeKeeperConf { auth_validation_public_key_path: None, heartbeat_timeout: DEFAULT_HEARTBEAT_TIMEOUT, max_offloader_lag_bytes: DEFAULT_MAX_OFFLOADER_LAG_BYTES, + log_format: LogFormat::Plain, } } } From 7b6431cbd7ca1cf7fe7fcf7001cecfd1102d7879 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 22 Oct 2022 14:59:18 +0300 Subject: [PATCH 06/10] Disable wal_log_hints by default (#2598) * Disable wal_log_hints by default * Remove obsolete comment anbout wal_log_hints --- control_plane/src/compute.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 9f32ad31c1..b3f90b5922 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -282,9 +282,7 @@ impl PostgresNode { fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> { let mut conf = PostgresConf::new(); conf.append("max_wal_senders", "10"); - // wal_log_hints is mandatory when running against pageserver (see gh issue#192) - // TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE? - conf.append("wal_log_hints", "on"); + conf.append("wal_log_hints", "off"); conf.append("max_replication_slots", "10"); conf.append("hot_standby", "on"); conf.append("shared_buffers", "1MB"); From 9f49605041cc4954eff96690c7874cc16ac3f8f8 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Sat, 22 Oct 2022 15:11:43 +0400 Subject: [PATCH 07/10] Fix division by zero panic in determine_offloader. --- safekeeper/src/wal_backup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 13287bd036..0a43d6085c 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -95,7 +95,7 @@ fn determine_offloader( let capable_peers = alive_peers .iter() .filter(|p| p.local_start_lsn <= wal_backup_lsn); - match alive_peers.iter().map(|p| p.commit_lsn).max() { + match capable_peers.clone().map(|p| p.commit_lsn).max() { None => (None, "no connected peers to elect from".to_string()), Some(max_commit_lsn) => { let threshold = max_commit_lsn From 2f399f08b2f61ea3507d108768d6d9a29113d0fc Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Sat, 22 Oct 2022 02:26:28 +0300 Subject: [PATCH 08/10] Hotfix to disable grant create on public schema `GRANT CREATE ON SCHEMA public` fails if there is no schema `public`. Disable it in release for now and make a better fix later (it is needed for v15 support). --- compute_tools/src/spec.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index e0c0e9404b..1e7cd51b6e 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -423,11 +423,11 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { ); db_client.simple_query(&alter_query)?; - // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user. - // This is needed since postgres 15, where this privilege is removed by default. - let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string(); - info!("grant query for db {} : {}", &db.name, &grant_query); - db_client.simple_query(&grant_query)?; + // // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user. + // // This is needed since postgres 15, where this privilege is removed by default. + // let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string(); + // info!("grant query for db {} : {}", &db.name, &grant_query); + // db_client.simple_query(&grant_query)?; } Ok(()) From 39897105b27fc133802b0edc676bc3c8297d00b0 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Mon, 24 Oct 2022 11:49:36 +0300 Subject: [PATCH 09/10] Check postgres version and ensure that public schema exists before running GRANT query on it --- compute_tools/src/spec.rs | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 1e7cd51b6e..58c94d74ae 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -423,11 +423,32 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { ); db_client.simple_query(&alter_query)?; - // // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user. - // // This is needed since postgres 15, where this privilege is removed by default. - // let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string(); - // info!("grant query for db {} : {}", &db.name, &grant_query); - // db_client.simple_query(&grant_query)?; + // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user. + // This is needed because since postgres 15 this privilege is removed by default. + let grant_query = "DO $$\n\ + BEGIN\n\ + IF EXISTS(\n\ + SELECT nspname\n\ + FROM pg_catalog.pg_namespace\n\ + WHERE nspname = 'public'\n\ + ) AND\n\ + current_setting('server_version_num')::int/10000 >= 15\n\ + THEN\n\ + IF EXISTS(\n\ + SELECT rolname\n\ + FROM pg_catalog.pg_roles\n\ + WHERE rolname = 'web_access'\n\ + )\n\ + THEN\n\ + GRANT CREATE ON SCHEMA public TO web_access;\n\ + END IF;\n\ + END IF;\n\ + END\n\ + $$;" + .to_string(); + + info!("grant query for db {} : {}", &db.name, &grant_query); + db_client.simple_query(&grant_query)?; } Ok(()) From df18b041c0889dc034ee59a7091f99442bc07e20 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Tue, 25 Oct 2022 13:09:41 +0300 Subject: [PATCH 10/10] Use apt version pinning instead of repo priorities Higher `bullseye` priority doesn't works for packages installed via `bullseye-updates`, e.g.: ``` libc-bin: Installed: 2.31-13+deb11u5 Candidate: 2.35-3 Version table: 2.35-3 500 500 http://ftp.debian.org/debian testing/main amd64 Packages *** 2.31-13+deb11u5 500 500 http://deb.debian.org/debian bullseye-updates/main amd64 Packages 100 /var/lib/dpkg/status 2.31-13+deb11u4 990 990 http://deb.debian.org/debian bullseye/main amd64 Packages ``` Try version pinning instead --- Dockerfile.compute-node-v14 | 4 ++-- Dockerfile.compute-node-v15 | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile.compute-node-v14 b/Dockerfile.compute-node-v14 index f5ccdf7e99..6d2b285fa3 100644 --- a/Dockerfile.compute-node-v14 +++ b/Dockerfile.compute-node-v14 @@ -9,7 +9,7 @@ ARG TAG=pinned # FROM debian:bullseye-slim AS build-deps RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \ - echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \ + echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \ apt update RUN apt update && \ apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \ @@ -191,7 +191,7 @@ RUN apt update && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ echo "Installing GLIBC 2.34" && \ echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \ - echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \ + echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \ apt update && \ apt install -y --no-install-recommends -t testing libc6 && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ diff --git a/Dockerfile.compute-node-v15 b/Dockerfile.compute-node-v15 index ec555ad932..b7b1f25103 100644 --- a/Dockerfile.compute-node-v15 +++ b/Dockerfile.compute-node-v15 @@ -14,7 +14,7 @@ ARG TAG=pinned # FROM debian:bullseye-slim AS build-deps RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \ - echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \ + echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \ apt update RUN apt update && \ apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \ @@ -196,7 +196,7 @@ RUN apt update && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ echo "Installing GLIBC 2.34" && \ echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \ - echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \ + echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \ apt update && \ apt install -y --no-install-recommends -t testing libc6 && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \