From f67a8a173ec889a163f0b89b43dd6957da45b82c Mon Sep 17 00:00:00 2001 From: HaoyuHuang Date: Mon, 14 Jul 2025 09:37:04 -0700 Subject: [PATCH] A few SK changes (#12577) # TLDR This PR is a no-op. ## Problem When a SK loses a disk, it must recover all WALs from the very beginning. This may take days/weeks to catch up to the latest WALs for all timelines it owns. ## Summary of changes When SK starts up, if it finds that it has 0 timelines, - it will ask SC for the timeline it owns. - Then, pulls the timeline from its peer safekeepers to restore the WAL redundancy right away. After pulling timeline is complete, it will become active and accepts new WALs. The current impl is a prototype. We can optimize the impl further, e.g., parallel pull timelines. --------- Co-authored-by: Haoyu Huang --- control_plane/storcon_cli/src/main.rs | 1 + libs/pageserver_api/src/controller_api.rs | 39 ++ libs/utils/src/ip_address.rs | 73 ++++ libs/utils/src/lib.rs | 3 + pageserver/src/controller_upcall_client.rs | 1 + safekeeper/client/src/mgmt_api.rs | 9 +- safekeeper/src/bin/safekeeper.rs | 24 ++ safekeeper/src/hadron.rs | 388 ++++++++++++++++++ safekeeper/src/http/routes.rs | 11 +- safekeeper/src/lib.rs | 12 + safekeeper/src/metrics.rs | 37 ++ safekeeper/src/pull_timeline.rs | 128 ++++-- .../tests/walproposer_sim/safekeeper.rs | 5 + test_runner/regress/test_wal_restore.py | 113 +++++ 14 files changed, 808 insertions(+), 36 deletions(-) create mode 100644 libs/utils/src/ip_address.rs create mode 100644 safekeeper/src/hadron.rs diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 24fd34a87a..fcc5549beb 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -476,6 +476,7 @@ async fn main() -> anyhow::Result<()> { listen_http_port, listen_https_port, availability_zone_id: AvailabilityZone(availability_zone_id), + node_ip_addr: None, }), ) .await?; diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index b02c6a613a..8f86b03f72 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Display; +use std::net::IpAddr; use std::str::FromStr; use std::time::{Duration, Instant}; @@ -60,6 +61,11 @@ pub struct NodeRegisterRequest { pub listen_https_port: Option, pub availability_zone_id: AvailabilityZone, + + // Reachable IP address of the PS/SK registering, if known. + // Hadron Cluster Coordiantor will update the DNS record of the registering node + // with this IP address. + pub node_ip_addr: Option, } #[derive(Serialize, Deserialize)] @@ -545,6 +551,39 @@ pub struct SafekeeperDescribeResponse { pub scheduling_policy: SkSchedulingPolicy, } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct TimelineSafekeeperPeer { + pub node_id: NodeId, + pub listen_http_addr: String, + pub http_port: i32, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SCSafekeeperTimeline { + // SC does not know the tenant id. 
+ pub timeline_id: TimelineId, + pub peers: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SCSafekeeperTimelinesResponse { + pub timelines: Vec, + pub safekeeper_peers: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SafekeeperTimeline { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub peers: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SafekeeperTimelinesResponse { + pub timelines: Vec, + pub safekeeper_peers: Vec, +} + #[derive(Serialize, Deserialize, Clone)] pub struct SafekeeperSchedulingPolicyRequest { pub scheduling_policy: SkSchedulingPolicy, diff --git a/libs/utils/src/ip_address.rs b/libs/utils/src/ip_address.rs new file mode 100644 index 0000000000..d0834d0ba5 --- /dev/null +++ b/libs/utils/src/ip_address.rs @@ -0,0 +1,73 @@ +use std::env::{VarError, var}; +use std::error::Error; +use std::net::IpAddr; +use std::str::FromStr; + +/// Name of the environment variable containing the reachable IP address of the node. If set, the IP address contained in this +/// environment variable is used as the reachable IP address of the pageserver or safekeeper node during node registration. +/// In a Kubernetes environment, this environment variable should be set by Kubernetes to the Pod IP (specified in the Pod +/// template). +pub const HADRON_NODE_IP_ADDRESS: &str = "HADRON_NODE_IP_ADDRESS"; + +/// Read the reachable IP address of this page server from env var HADRON_NODE_IP_ADDRESS. +/// In Kubernetes this environment variable is set to the Pod IP (specified in the Pod template). +pub fn read_node_ip_addr_from_env() -> Result, Box> { + match var(HADRON_NODE_IP_ADDRESS) { + Ok(v) => { + if let Ok(addr) = IpAddr::from_str(&v) { + Ok(Some(addr)) + } else { + Err(format!("Invalid IP address string: {v}. Cannot be parsed as either an IPv4 or an IPv6 address.").into()) + } + } + Err(VarError::NotPresent) => Ok(None), + Err(e) => Err(e.into()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + use std::net::{Ipv4Addr, Ipv6Addr}; + + #[test] + fn test_read_node_ip_addr_from_env() { + // SAFETY: test code + unsafe { + // Test with a valid IPv4 address + env::set_var(HADRON_NODE_IP_ADDRESS, "192.168.1.1"); + let result = read_node_ip_addr_from_env().unwrap(); + assert_eq!(result, Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)))); + + // Test with a valid IPv6 address + env::set_var( + HADRON_NODE_IP_ADDRESS, + "2001:0db8:85a3:0000:0000:8a2e:0370:7334", + ); + } + let result = read_node_ip_addr_from_env().unwrap(); + assert_eq!( + result, + Some(IpAddr::V6( + Ipv6Addr::from_str("2001:0db8:85a3:0000:0000:8a2e:0370:7334").unwrap() + )) + ); + + // Test with an invalid IP address + // SAFETY: test code + unsafe { + env::set_var(HADRON_NODE_IP_ADDRESS, "invalid_ip"); + } + let result = read_node_ip_addr_from_env(); + assert!(result.is_err()); + + // Test with no environment variable set + // SAFETY: test code + unsafe { + env::remove_var(HADRON_NODE_IP_ADDRESS); + } + let result = read_node_ip_addr_from_env().unwrap(); + assert_eq!(result, None); + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 2b81da017d..69771be5dc 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -26,6 +26,9 @@ pub mod auth; // utility functions and helper traits for unified unique id generation/serialization etc. pub mod id; +// utility functions to obtain reachable IP addresses in PS/SK nodes. 
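+// Illustrative call site (mirrors how safekeeper/src/hadron.rs uses it): read the address once at
+// startup and forward it as `NodeRegisterRequest::node_ip_addr`, e.g.
+//     let node_ip_addr = ip_address::read_node_ip_addr_from_env()?;
+// When the variable is not set (e.g. outside Kubernetes) the function simply returns Ok(None).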
+pub mod ip_address; + pub mod shard; mod hex; diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index f1f9aaf43c..be1de43d18 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -194,6 +194,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { listen_http_port: m.http_port, listen_https_port: m.https_port, availability_zone_id: az_id.expect("Checked above"), + node_ip_addr: None, }) } Err(e) => { diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs index b4bb193a4b..3c8db3029e 100644 --- a/safekeeper/client/src/mgmt_api.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -6,10 +6,10 @@ use std::error::Error as _; use http_utils::error::HttpErrorBody; -use reqwest::{IntoUrl, Method, StatusCode}; +use reqwest::{IntoUrl, Method, Response, StatusCode}; use safekeeper_api::models::{ self, PullTimelineRequest, PullTimelineResponse, SafekeeperStatus, SafekeeperUtilization, - TimelineCreateRequest, TimelineStatus, + TimelineCreateRequest, }; use utils::id::{NodeId, TenantId, TimelineId}; use utils::logging::SecretString; @@ -161,13 +161,12 @@ impl Client { &self, tenant_id: TenantId, timeline_id: TimelineId, - ) -> Result { + ) -> Result { let uri = format!( "{}/v1/tenant/{}/timeline/{}", self.mgmt_api_endpoint, tenant_id, timeline_id ); - let resp = self.get(&uri).await?; - resp.json().await.map_err(Error::ReceiveBody) + self.get(&uri).await } pub async fn snapshot( diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index b2d5976ef4..79cf2f9149 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -23,6 +23,7 @@ use safekeeper::defaults::{ DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE, }; +use safekeeper::hadron; use safekeeper::wal_backup::WalBackup; use safekeeper::{ BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, @@ -252,6 +253,10 @@ struct Args { /// Run in development mode (disables security checks) #[arg(long, help = "Run in development mode (disables security checks)")] dev: bool, + /* BEGIN_HADRON */ + #[arg(long)] + enable_pull_timeline_on_startup: bool, + /* END_HADRON */ } // Like PathBufValueParser, but allows empty string. @@ -435,6 +440,11 @@ async fn main() -> anyhow::Result<()> { use_https_safekeeper_api: args.use_https_safekeeper_api, enable_tls_wal_service_api: args.enable_tls_wal_service_api, force_metric_collection_on_scrape: args.force_metric_collection_on_scrape, + /* BEGIN_HADRON */ + advertise_pg_addr_tenant_only: None, + enable_pull_timeline_on_startup: args.enable_pull_timeline_on_startup, + hcc_base_url: None, + /* END_HADRON */ }); // initialize sentry if SENTRY_DSN is provided @@ -529,6 +539,20 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { // Load all timelines from disk to memory. global_timelines.init().await?; + /* BEGIN_HADRON */ + if conf.enable_pull_timeline_on_startup && global_timelines.timelines_count() == 0 { + match hadron::hcc_pull_timelines(&conf, global_timelines.clone()).await { + Ok(_) => { + info!("Successfully pulled all timelines from peer safekeepers"); + } + Err(e) => { + error!("Failed to pull timelines from peer safekeepers: {:?}", e); + return Err(e); + } + } + } + /* END_HADRON */ + // Run everything in current thread rt, if asked. 
if conf.current_thread_runtime { info!("running in current thread runtime"); diff --git a/safekeeper/src/hadron.rs b/safekeeper/src/hadron.rs new file mode 100644 index 0000000000..b41bf2c3da --- /dev/null +++ b/safekeeper/src/hadron.rs @@ -0,0 +1,388 @@ +use pem::Pem; +use safekeeper_api::models::PullTimelineRequest; +use std::{collections::HashMap, env::VarError, net::IpAddr, sync::Arc, time::Duration}; +use tokio::time::sleep; +use tokio_util::sync::CancellationToken; +use url::Url; +use utils::{backoff, id::TenantTimelineId, ip_address}; + +use anyhow::Result; +use pageserver_api::controller_api::{ + AvailabilityZone, NodeRegisterRequest, SafekeeperTimeline, SafekeeperTimelinesResponse, +}; + +use crate::{ + GlobalTimelines, SafeKeeperConf, + metrics::{ + SK_RECOVERY_PULL_TIMELINE_ERRORS, SK_RECOVERY_PULL_TIMELINE_OKS, + SK_RECOVERY_PULL_TIMELINE_SECONDS, SK_RECOVERY_PULL_TIMELINES_SECONDS, + }, + pull_timeline, + timelines_global_map::DeleteOrExclude, +}; + +// Extract information in the SafeKeeperConf to build a NodeRegisterRequest used to register the safekeeper with the HCC. +fn build_node_registeration_request( + conf: &SafeKeeperConf, + node_ip_addr: Option, +) -> Result { + let advertise_pg_addr_with_port = conf + .advertise_pg_addr_tenant_only + .as_deref() + .expect("advertise_pg_addr_tenant_only is required to register with HCC"); + + // Extract host/port from the string. + let (advertise_host_addr, pg_port_str) = advertise_pg_addr_with_port.split_at( + advertise_pg_addr_with_port + .rfind(':') + .ok_or(anyhow::anyhow!("Invalid advertise_pg_addr"))?, + ); + // Need the `[1..]` to remove the leading ':'. + let pg_port = pg_port_str[1..] + .parse::() + .map_err(|e| anyhow::anyhow!("Cannot parse PG port: {}", e))?; + + let (_, http_port_str) = conf.listen_http_addr.split_at( + conf.listen_http_addr + .rfind(':') + .ok_or(anyhow::anyhow!("Invalid listen_http_addr"))?, + ); + let http_port = http_port_str[1..] + .parse::() + .map_err(|e| anyhow::anyhow!("Cannot parse HTTP port: {}", e))?; + + Ok(NodeRegisterRequest { + node_id: conf.my_id, + listen_pg_addr: advertise_host_addr.to_string(), + listen_pg_port: pg_port, + listen_http_addr: advertise_host_addr.to_string(), + listen_http_port: http_port, + node_ip_addr, + availability_zone_id: AvailabilityZone("todo".to_string()), + listen_grpc_addr: None, + listen_grpc_port: None, + listen_https_port: None, + }) +} + +// Retrieve the JWT token used for authenticating with HCC from the environment variable. +// Returns None if the token cannot be retrieved. +fn get_hcc_auth_token() -> Option { + match std::env::var("HCC_AUTH_TOKEN") { + Ok(v) => { + tracing::info!("Loaded JWT token for authentication with HCC"); + Some(v) + } + Err(VarError::NotPresent) => { + tracing::info!("No JWT token for authentication with HCC detected"); + None + } + Err(_) => { + tracing::info!( + "Failed to either load to detect non-present HCC_AUTH_TOKEN environment variable" + ); + None + } + } +} + +async fn send_safekeeper_register_request( + request_url: &Url, + auth_token: &Option, + request: &NodeRegisterRequest, +) -> Result<()> { + let client = reqwest::Client::new(); + let mut req_builder = client + .post(request_url.clone()) + .header("Content-Type", "application/json"); + if let Some(token) = auth_token { + req_builder = req_builder.bearer_auth(token); + } + req_builder + .json(&request) + .send() + .await? + .error_for_status()?; + Ok(()) +} + +/// Registers this safe keeper with the HCC. 
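+/// A no-op when `hcc_base_url` is unset. Otherwise it builds a `NodeRegisterRequest` (including
+/// the optional node IP read from HADRON_NODE_IP_ADDRESS) and POSTs it to
+/// /hadron-internal/v1/sk, retrying with `backoff::retry`.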
+pub async fn register(conf: &SafeKeeperConf) -> Result<()> { + match conf.hcc_base_url.as_ref() { + None => { + tracing::info!("HCC base URL is not set, skipping registration"); + Ok(()) + } + Some(hcc_base_url) => { + // The following operations acquiring the auth token and the node IP address both read environment + // variables. It's fine for now as this `register()` function is only called once during startup. + // If we start to talk to HCC more regularly in the safekeeper we should probably consider + // refactoring things into a "HadronClusterCoordinatorClient" struct. + let auth_token = get_hcc_auth_token(); + let node_ip_addr = + ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address."); + + let request = build_node_registeration_request(conf, node_ip_addr)?; + let cancel = CancellationToken::new(); + let request_url = hcc_base_url.clone().join("/hadron-internal/v1/sk")?; + + backoff::retry( + || async { + send_safekeeper_register_request(&request_url, &auth_token, &request).await + }, + |_| false, + 3, + u32::MAX, + "Calling the HCC safekeeper register API", + &cancel, + ) + .await + .ok_or(anyhow::anyhow!( + "Error in forever retry loop. This error should never be surfaced." + ))? + } + } +} + +async fn safekeeper_list_timelines_request( + conf: &SafeKeeperConf, +) -> Result { + if conf.hcc_base_url.is_none() { + tracing::info!("HCC base URL is not set, skipping registration"); + return Err(anyhow::anyhow!("HCC base URL is not set")); + } + + // The following operations acquiring the auth token and the node IP address both read environment + // variables. It's fine for now as this `register()` function is only called once during startup. + // If we start to talk to HCC more regularly in the safekeeper we should probably consider + // refactoring things into a "HadronClusterCoordinatorClient" struct. + let auth_token = get_hcc_auth_token(); + let method = format!("/control/v1/safekeeper/{}/timelines", conf.my_id.0); + let request_url = conf.hcc_base_url.as_ref().unwrap().clone().join(&method)?; + + let client = reqwest::Client::new(); + let mut req_builder = client + .get(request_url.clone()) + .header("Content-Type", "application/json") + .query(&[("id", conf.my_id.0)]); + if let Some(token) = auth_token { + req_builder = req_builder.bearer_auth(token); + } + let response = req_builder + .send() + .await? + .error_for_status()? + .json::() + .await?; + Ok(response) +} + +// Returns true on success, false otherwise. 
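+// Builds a PullTimelineRequest against the peer safekeepers reported by the storage controller
+// (skipping this node itself) and calls pull_timeline::handle_request with
+// wait_for_peer_timeline_status = true, so a donor is only chosen once every listed peer has
+// reported its status (or reported the timeline as absent). On failure the partially pulled
+// timeline is removed again via DeleteOrExclude::DeleteLocal so that a later retry starts clean.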
+pub async fn hcc_pull_timeline( + timeline: SafekeeperTimeline, + conf: &SafeKeeperConf, + global_timelines: Arc, + nodeid_http: &HashMap, +) -> bool { + let mut request = PullTimelineRequest { + tenant_id: timeline.tenant_id, + timeline_id: timeline.timeline_id, + http_hosts: Vec::new(), + ignore_tombstone: None, + }; + for host in timeline.peers { + if host.0 == conf.my_id.0 { + continue; + } + if let Some(http_host) = nodeid_http.get(&host.0) { + request.http_hosts.push(http_host.clone()); + } + } + + let ca_certs = match conf + .ssl_ca_certs + .iter() + .map(Pem::contents) + .map(reqwest::Certificate::from_der) + .collect::, _>>() + { + Ok(result) => result, + Err(_) => { + return false; + } + }; + match pull_timeline::handle_request( + request, + conf.sk_auth_token.clone(), + ca_certs, + global_timelines.clone(), + true, + ) + .await + { + Ok(resp) => { + tracing::info!( + "Completed pulling tenant {} timeline {} from SK {:?}", + timeline.tenant_id, + timeline.timeline_id, + resp.safekeeper_host + ); + return true; + } + Err(e) => { + tracing::error!( + "Failed to pull tenant {} timeline {} from SK {}", + timeline.tenant_id, + timeline.timeline_id, + e + ); + + let ttid = TenantTimelineId { + tenant_id: timeline.tenant_id, + timeline_id: timeline.timeline_id, + }; + // Revert the failed timeline pull. + // Notice that not found timeline returns OK also. + match global_timelines + .delete_or_exclude(&ttid, DeleteOrExclude::DeleteLocal) + .await + { + Ok(dr) => { + tracing::info!( + "Deleted tenant {} timeline {} DirExists: {}", + timeline.tenant_id, + timeline.timeline_id, + dr.dir_existed, + ); + } + Err(e) => { + tracing::error!( + "Failed to delete tenant {} timeline {} from global_timelines: {}", + timeline.tenant_id, + timeline.timeline_id, + e + ); + } + } + } + } + false +} + +pub async fn hcc_pull_timeline_till_success( + timeline: SafekeeperTimeline, + conf: &SafeKeeperConf, + global_timelines: Arc, + nodeid_http: &HashMap, +) { + const MAX_PULL_TIMELINE_RETRIES: u64 = 100; + for i in 0..MAX_PULL_TIMELINE_RETRIES { + if hcc_pull_timeline( + timeline.clone(), + conf, + global_timelines.clone(), + nodeid_http, + ) + .await + { + SK_RECOVERY_PULL_TIMELINE_OKS.inc(); + return; + } + tracing::error!( + "Failed to pull timeline {} from SK peers, retrying {}/{}", + timeline.timeline_id, + i + 1, + MAX_PULL_TIMELINE_RETRIES + ); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + SK_RECOVERY_PULL_TIMELINE_ERRORS.inc(); +} + +pub async fn hcc_pull_timelines( + conf: &SafeKeeperConf, + global_timelines: Arc, +) -> Result<()> { + let _timer = SK_RECOVERY_PULL_TIMELINES_SECONDS.start_timer(); + tracing::info!("Start pulling timelines from SK peers"); + + let mut response = SafekeeperTimelinesResponse { + timelines: Vec::new(), + safekeeper_peers: Vec::new(), + }; + for i in 0..100 { + match safekeeper_list_timelines_request(conf).await { + Ok(timelines) => { + response = timelines; + } + Err(e) => { + tracing::error!("Failed to list timelines from HCC: {}", e); + if i == 99 { + return Err(e); + } + } + } + sleep(Duration::from_millis(100)).await; + } + + let mut nodeid_http = HashMap::new(); + for sk in response.safekeeper_peers { + nodeid_http.insert( + sk.node_id.0, + format!("http://{}:{}", sk.listen_http_addr, sk.http_port), + ); + } + tracing::info!("Received {} timelines from HCC", response.timelines.len()); + for timeline in response.timelines { + let _timer = SK_RECOVERY_PULL_TIMELINE_SECONDS + .with_label_values(&[ + &timeline.tenant_id.to_string(), + 
&timeline.timeline_id.to_string(), + ]) + .start_timer(); + hcc_pull_timeline_till_success(timeline, conf, global_timelines.clone(), &nodeid_http) + .await; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use utils::id::NodeId; + + #[test] + fn test_build_node_registeration_request() { + // Test that: + // 1. We always extract the host name and port used to register with the HCC from the + // `advertise_pg_addr` if it is set. + // 2. The correct ports are extracted from `advertise_pg_addr` and `listen_http_addr`. + let mut conf = SafeKeeperConf::dummy(); + conf.my_id = NodeId(1); + conf.advertise_pg_addr_tenant_only = + Some("safe-keeper-1.safe-keeper.hadron.svc.cluster.local:5454".to_string()); + // `listen_pg_addr` and `listen_pg_addr_tenant_only` are not used for node registration. Set them to a different + // host and port values and make sure that they don't show up in the node registration request. + conf.listen_pg_addr = "0.0.0.0:5456".to_string(); + conf.listen_pg_addr_tenant_only = Some("0.0.0.0:5456".to_string()); + conf.listen_http_addr = "0.0.0.0:7676".to_string(); + let node_ip_addr: Option = Some("127.0.0.1".parse().unwrap()); + + let request = build_node_registeration_request(&conf, node_ip_addr).unwrap(); + assert_eq!(request.node_id, NodeId(1)); + assert_eq!( + request.listen_pg_addr, + "safe-keeper-1.safe-keeper.hadron.svc.cluster.local" + ); + assert_eq!(request.listen_pg_port, 5454); + assert_eq!( + request.listen_http_addr, + "safe-keeper-1.safe-keeper.hadron.svc.cluster.local" + ); + assert_eq!(request.listen_http_port, 7676); + assert_eq!( + request.node_ip_addr, + Some(IpAddr::V4("127.0.0.1".parse().unwrap())) + ); + } +} diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 4b061c65d9..a0ee2facb5 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -241,9 +241,14 @@ async fn timeline_pull_handler(mut request: Request) -> Result, pub availability_zone: Option, pub no_sync: bool, + /* BEGIN_HADRON */ + pub advertise_pg_addr_tenant_only: Option, + pub enable_pull_timeline_on_startup: bool, + pub hcc_base_url: Option, + /* END_HADRON */ pub broker_endpoint: Uri, pub broker_keepalive_interval: Duration, pub heartbeat_timeout: Duration, @@ -185,6 +192,11 @@ impl SafeKeeperConf { use_https_safekeeper_api: false, enable_tls_wal_service_api: false, force_metric_collection_on_scrape: true, + /* BEGIN_HADRON */ + advertise_pg_addr_tenant_only: None, + enable_pull_timeline_on_startup: false, + hcc_base_url: None, + /* END_HADRON */ } } } diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 1f98651e71..e1af51c115 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -85,6 +85,43 @@ pub static WAL_STORAGE_LIMIT_ERRORS: Lazy = Lazy::new(|| { ) .expect("Failed to register safekeeper_wal_storage_limit_errors counter") }); +pub static SK_RECOVERY_PULL_TIMELINE_ERRORS: Lazy = Lazy::new(|| { + register_int_counter!( + "safekeeper_recovery_pull_timeline_errors", + concat!( + "Number of errors due to pull_timeline errors during SK lost disk recovery.", + "An increase in this metric indicates pull timelines runs into error." 
+ ) + ) + .expect("Failed to register safekeeper_recovery_pull_timeline_errors counter") +}); +pub static SK_RECOVERY_PULL_TIMELINE_OKS: Lazy = Lazy::new(|| { + register_int_counter!( + "safekeeper_recovery_pull_timeline_oks", + concat!( + "Number of successful pull_timeline during SK lost disk recovery.", + "An increase in this metric indicates pull timelines is successful." + ) + ) + .expect("Failed to register safekeeper_recovery_pull_timeline_oks counter") +}); +pub static SK_RECOVERY_PULL_TIMELINES_SECONDS: Lazy = Lazy::new(|| { + register_histogram!( + "safekeeper_recovery_pull_timelines_seconds", + "Seconds to pull timelines", + DISK_FSYNC_SECONDS_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_recovery_pull_timelines_seconds histogram") +}); +pub static SK_RECOVERY_PULL_TIMELINE_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + "safekeeper_recovery_pull_timeline_seconds", + "Seconds to pull timeline", + &["tenant_id", "timeline_id"], + DISK_FSYNC_SECONDS_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_recovery_pull_timeline_seconds histogram vec") +}); /* END_HADRON */ pub static PERSIST_CONTROL_FILE_SECONDS: Lazy = Lazy::new(|| { register_histogram!( diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 1c9e5bade5..b4c4877b2c 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt, TryStreamExt}; +use http::StatusCode; use http_utils::error::ApiError; use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo}; use remote_storage::GenericRemoteStorage; @@ -21,10 +22,11 @@ use tokio::fs::OpenOptions; use tokio::io::AsyncWrite; use tokio::sync::mpsc; use tokio::task; +use tokio::time::sleep; use tokio_tar::{Archive, Builder, Header}; use tokio_util::io::{CopyToBytes, SinkWriter}; use tokio_util::sync::PollSender; -use tracing::{error, info, instrument}; +use tracing::{error, info, instrument, warn}; use utils::crashsafe::fsync_async_opt; use utils::id::{NodeId, TenantTimelineId}; use utils::logging::SecretString; @@ -449,6 +451,7 @@ pub async fn handle_request( sk_auth_token: Option, ssl_ca_certs: Vec, global_timelines: Arc, + wait_for_peer_timeline_status: bool, ) -> Result { let existing_tli = global_timelines.get(TenantTimelineId::new( request.tenant_id, @@ -472,37 +475,100 @@ pub async fn handle_request( let http_hosts = request.http_hosts.clone(); // Figure out statuses of potential donors. 
- let responses: Vec> = - futures::future::join_all(http_hosts.iter().map(|url| async { - let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone()); - let info = cclient - .timeline_status(request.tenant_id, request.timeline_id) - .await?; - Ok(info) - })) - .await; - let mut statuses = Vec::new(); - for (i, response) in responses.into_iter().enumerate() { - match response { - Ok(status) => { - statuses.push((status, i)); - } - Err(e) => { - info!("error fetching status from {}: {e}", http_hosts[i]); + if !wait_for_peer_timeline_status { + let responses: Vec> = + futures::future::join_all(http_hosts.iter().map(|url| async { + let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone()); + let resp = cclient + .timeline_status(request.tenant_id, request.timeline_id) + .await?; + let info: TimelineStatus = resp + .json() + .await + .context("Failed to deserialize timeline status") + .map_err(|e| mgmt_api::Error::ReceiveErrorBody(e.to_string()))?; + Ok(info) + })) + .await; + + for (i, response) in responses.into_iter().enumerate() { + match response { + Ok(status) => { + statuses.push((status, i)); + } + Err(e) => { + info!("error fetching status from {}: {e}", http_hosts[i]); + } } } - } - // Allow missing responses from up to one safekeeper (say due to downtime) - // e.g. if we created a timeline on PS A and B, with C being offline. Then B goes - // offline and C comes online. Then we want a pull on C with A and B as hosts to work. - let min_required_successful = (http_hosts.len() - 1).max(1); - if statuses.len() < min_required_successful { - return Err(ApiError::InternalServerError(anyhow::anyhow!( - "only got {} successful status responses. required: {min_required_successful}", - statuses.len() - ))); + // Allow missing responses from up to one safekeeper (say due to downtime) + // e.g. if we created a timeline on PS A and B, with C being offline. Then B goes + // offline and C comes online. Then we want a pull on C with A and B as hosts to work. + let min_required_successful = (http_hosts.len() - 1).max(1); + if statuses.len() < min_required_successful { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "only got {} successful status responses. required: {min_required_successful}", + statuses.len() + ))); + } + } else { + let mut retry = true; + // We must get status from all other peers. + // Otherwise, we may run into split-brain scenario. + while retry { + statuses.clear(); + retry = false; + for (i, url) in http_hosts.iter().enumerate() { + let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone()); + match cclient + .timeline_status(request.tenant_id, request.timeline_id) + .await + { + Ok(resp) => { + if resp.status() == StatusCode::NOT_FOUND { + warn!( + "Timeline {} not found on peer SK {}, no need to pull it", + TenantTimelineId::new(request.tenant_id, request.timeline_id), + url + ); + return Ok(PullTimelineResponse { + safekeeper_host: None, + }); + } + let info: TimelineStatus = resp + .json() + .await + .context("Failed to deserialize timeline status") + .map_err(ApiError::InternalServerError)?; + statuses.push((info, i)); + } + Err(e) => { + match e { + // If we get a 404, it means the timeline doesn't exist on this safekeeper. + // We can ignore this error. 
+ mgmt_api::Error::ApiError(status, _) + if status == StatusCode::NOT_FOUND => + { + warn!( + "Timeline {} not found on peer SK {}, no need to pull it", + TenantTimelineId::new(request.tenant_id, request.timeline_id), + url + ); + return Ok(PullTimelineResponse { + safekeeper_host: None, + }); + } + _ => {} + } + retry = true; + error!("Failed to get timeline status from {}: {:#}", url, e); + } + } + } + sleep(std::time::Duration::from_millis(100)).await; + } } // Find the most advanced safekeeper @@ -511,6 +577,12 @@ pub async fn handle_request( .max_by_key(|(status, _)| { ( status.acceptor_state.epoch, + /* BEGIN_HADRON */ + // We need to pull from the SK with the highest term. + // This is because another compute may come online and vote the same highest term again on the other two SKs. + // Then, there will be 2 computes running on the same term. + status.acceptor_state.term, + /* END_HADRON */ status.flush_lsn, status.commit_lsn, ) diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index 280cd790a4..393df6228e 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -191,6 +191,11 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { use_https_safekeeper_api: false, enable_tls_wal_service_api: false, force_metric_collection_on_scrape: true, + /* BEGIN_HADRON */ + enable_pull_timeline_on_startup: false, + advertise_pg_addr_tenant_only: None, + hcc_base_url: None, + /* END_HADRON */ }; let mut global = GlobalMap::new(disk, conf.clone())?; diff --git a/test_runner/regress/test_wal_restore.py b/test_runner/regress/test_wal_restore.py index 0bb63308bb..573016f772 100644 --- a/test_runner/regress/test_wal_restore.py +++ b/test_runner/regress/test_wal_restore.py @@ -3,6 +3,7 @@ from __future__ import annotations import sys import tarfile import tempfile +from pathlib import Path from typing import TYPE_CHECKING import pytest @@ -198,3 +199,115 @@ def test_wal_restore_http(neon_env_builder: NeonEnvBuilder, broken_tenant: bool) # the table is back now! restored = env.endpoints.create_start("main") assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)] + + +# BEGIN_HADRON +# TODO: re-enable once CM python is integreated. 
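# (A dependency-light skeleton of this test is sketched after the END_HADRON marker below.)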
+# def clear_directory(directory): +# for item in os.listdir(directory): +# item_path = os.path.join(directory, item) +# if os.path.isdir(item_path): +# log.info(f"removing SK directory: {item_path}") +# shutil.rmtree(item_path) +# else: +# log.info(f"removing SK file: {item_path}") +# os.remove(item_path) + + +# def test_sk_pull_timelines( +# neon_env_builder: NeonEnvBuilder, +# ): +# DBNAME = "regression" +# superuser_name = "databricks_superuser" +# neon_env_builder.num_safekeepers = 3 +# neon_env_builder.num_pageservers = 4 +# neon_env_builder.safekeeper_extra_opts = ["--enable-pull-timeline-on-startup"] +# neon_env_builder.enable_safekeeper_remote_storage(s3_storage()) + +# env = neon_env_builder.init_start(initial_tenant_shard_count=4) + +# env.compute_manager.start(base_port=env.compute_manager_port) + +# test_creator = "test_creator" +# test_metastore_id = uuid4() +# test_account_id = uuid4() +# test_workspace_id = 1 +# test_workspace_url = "http://test_workspace_url" +# test_metadata_version = 1 +# test_metadata = { +# "state": "INSTANCE_PROVISIONING", +# "admin_rolename": "admin", +# "admin_password_scram": "abc123456", +# } + +# test_instance_name_1 = "test_instance_1" +# test_instance_read_write_compute_pool_1 = { +# "instance_name": test_instance_name_1, +# "compute_pool_name": "compute_pool_1", +# "creator": test_creator, +# "capacity": 2.0, +# "node_count": 1, +# "metadata_version": 0, +# "metadata": { +# "state": "INSTANCE_PROVISIONING", +# }, +# } + +# test_instance_1_readable_secondaries_enabled = False + +# # Test creation +# create_instance_with_retries( +# env, +# test_instance_name_1, +# test_creator, +# test_metastore_id, +# test_account_id, +# test_workspace_id, +# test_workspace_url, +# test_instance_read_write_compute_pool_1, +# test_metadata_version, +# test_metadata, +# test_instance_1_readable_secondaries_enabled, +# ) +# instance = env.compute_manager.get_instance_by_name(test_instance_name_1, test_workspace_id) +# log.info(f"haoyu Instance created: {instance}") +# assert instance["instance_name"] == test_instance_name_1 +# test_instance_id = instance["instance_id"] +# instance_detail = env.compute_manager.describe_instance(test_instance_id) +# log.info(f"haoyu Instance detail: {instance_detail}") + +# env.initial_tenant = instance_detail[0]["tenant_id"] +# env.initial_timeline = instance_detail[0]["timeline_id"] + +# # Connect to postgres and create a database called "regression". +# endpoint = env.endpoints.create_start("main") +# endpoint.safe_psql(f"CREATE ROLE {superuser_name}") +# endpoint.safe_psql(f"CREATE DATABASE {DBNAME}") + +# endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);") +# # Write some data. ~20 MB. +# num_rows = 0 +# for _i in range(0, 20000): +# endpoint.safe_psql( +# "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False +# ) +# num_rows += 1 + +# log.info(f"SKs {env.storage_controller.hcc_sk_node_list()}") + +# env.safekeepers[0].stop(immediate=True) +# clear_directory(env.safekeepers[0].data_dir) +# env.safekeepers[0].start() + +# # PG can still write data. ~20 MB. +# for _i in range(0, 20000): +# endpoint.safe_psql( +# "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False +# ) +# num_rows += 1 + +# tuples = endpoint.safe_psql("SELECT COUNT(*) FROM usertable;") +# assert tuples[0][0] == num_rows +# endpoint.stop_and_destroy() + +# END_HADRON
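
# BEGIN_HADRON
# Illustrative skeleton only: a dependency-light shape for re-enabling the test above. It uses
# only fixtures that already appear in this file (NeonEnvBuilder, Safekeeper.stop/start/data_dir,
# Endpoint.safe_psql, and the clear_directory helper). The coordinator wiring stays a TODO,
# because pull-on-startup needs the /control/v1/safekeeper/{id}/timelines endpoint from the HCC.
#
# def test_sk_pull_timelines_skeleton(neon_env_builder: NeonEnvBuilder):
#     neon_env_builder.num_safekeepers = 3
#     neon_env_builder.safekeeper_extra_opts = ["--enable-pull-timeline-on-startup"]
#     env = neon_env_builder.init_start()
#
#     # TODO: point the safekeepers at the coordinator (hcc_base_url) once CM python is integrated.
#
#     endpoint = env.endpoints.create_start("main")
#     endpoint.safe_psql("CREATE TABLE usertable (ycsb_key INT, field0 TEXT)")
#     endpoint.safe_psql(
#         "INSERT INTO usertable SELECT g, repeat('a', 1000) FROM generate_series(1, 1000) g"
#     )
#
#     # Simulate a lost disk: stop one safekeeper, wipe its directory, and restart it. On startup
#     # it should pull its timelines from the two healthy peers before accepting new WAL.
#     env.safekeepers[0].stop(immediate=True)
#     clear_directory(env.safekeepers[0].data_dir)
#     env.safekeepers[0].start()
#
#     # Writes must keep working after the recovery.
#     endpoint.safe_psql(
#         "INSERT INTO usertable SELECT g, repeat('a', 1000) FROM generate_series(1001, 2000) g"
#     )
#     assert endpoint.safe_psql("SELECT count(*) FROM usertable") == [(2000,)]
# END_HADRON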