From 9c5ad2134123f4757febe8c4b46837254e4062fb Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Thu, 25 Jul 2024 14:09:12 +0100
Subject: [PATCH] storcon: make heartbeats restart aware (#8222)

## Problem

Re-attach blocks the pageserver's HTTP server from starting up, so the
pageserver cannot reply to heartbeats until that's done. This makes the
storage controller mark the node offline (not good). We worked around this
by setting the interval after which nodes are marked offline to 5 minutes,
which is not a long-term solution.

## Summary of changes

* Introduce a new `NodeAvailability` state: `WarmingUp`. It models the
  interval from receiving the re-attach request until the pageserver replies
  to the first heartbeat after re-attach.
* The heartbeat delta generator becomes aware of this state and uses a
  separate, longer interval for it.
* The `max-warming-up-interval` flag now models the longer timeout and
  `max-offline-interval` the shorter one, matching the names of the states.

Closes https://github.com/neondatabase/neon/issues/7552
---
 control_plane/src/bin/neon_local.rs           |  65 +++++++-
 control_plane/src/local_env.rs                |  13 +-
 control_plane/src/storage_controller.rs       |  20 ++-
 libs/pageserver_api/src/controller_api.rs     |  16 +-
 pageserver/src/control_plane_client.rs        |   4 +-
 storage_controller/src/heartbeater.rs         |  89 ++++++----
 storage_controller/src/main.rs                |  20 ++-
 storage_controller/src/node.rs                |  40 +++--
 storage_controller/src/service.rs             | 157 ++++++++++--------
 test_runner/fixtures/neon_fixtures.py         |  65 +++++++-
 .../test_storage_controller_scale.py          |  32 +++-
 .../regress/test_pageserver_generations.py    |  23 ++-
 .../regress/test_pageserver_restart.py        |   6 +-
 .../regress/test_pageserver_secondary.py      |   7 +-
 test_runner/regress/test_sharding.py          |   3 +-
 .../regress/test_storage_controller.py        | 112 +++++++++++--
 .../regress/test_threshold_based_eviction.py  |  15 +-
 17 files changed, 508 insertions(+), 179 deletions(-)

diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 4bf1b29785..51e9a51a57 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -21,7 +21,9 @@ use pageserver_api::config::{
     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
     DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
 };
-use pageserver_api::controller_api::{PlacementPolicy, TenantCreateRequest};
+use pageserver_api::controller_api::{
+    NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
+};
 use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInfo};
 use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
 use postgres_backend::AuthType;
@@ -1250,9 +1252,70 @@ async fn handle_start_all(
             exit(1);
         }
     }
+
+    neon_start_status_check(env, retry_timeout).await?;
+
     Ok(())
 }
 
+async fn neon_start_status_check(
+    env: &local_env::LocalEnv,
+    retry_timeout: &Duration,
+) -> anyhow::Result<()> {
+    const RETRY_INTERVAL: Duration = Duration::from_millis(100);
+    const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
+
+    if env.control_plane_api.is_none() {
+        return Ok(());
+    }
+
+    let storcon = StorageController::from_env(env);
+
+    let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
+    let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
+
+    println!("\nRunning neon status check");
+
+    for retry in 0..retries {
+        if retry == notice_after_retries {
+            println!("\nNeon status check has not passed yet, continuing to wait")
+        }
+
+        let mut passed = true;
+        let mut nodes = storcon.node_list().await?;
+        let mut pageservers = env.pageservers.clone();
+
+        if nodes.len() != pageservers.len() {
+            continue;
+        }
+
+        nodes.sort_by_key(|ps| ps.id);
+        pageservers.sort_by_key(|ps| ps.id);
+
+        for (idx, pageserver) in pageservers.iter().enumerate() {
+            let node = &nodes[idx];
+            if node.id != pageserver.id {
+                passed = false;
+                break;
+            }
+
+            if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
+                passed = false;
+                break;
+            }
+        }
+
+        if passed {
+            println!("\nNeon started and passed status check");
+            return Ok(());
+        }
+
+        tokio::time::sleep(RETRY_INTERVAL).await;
+    }
+
+    anyhow::bail!("\nNeon did not pass status check")
+}
+
 async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     let immediate =
         sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index 3ac3ce21df..d7830a5e70 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -151,7 +151,10 @@ pub struct NeonBroker {
 pub struct NeonStorageControllerConf {
     /// Heartbeat timeout before marking a node offline
     #[serde(with = "humantime_serde")]
-    pub max_unavailable: Duration,
+    pub max_offline: Duration,
+
+    #[serde(with = "humantime_serde")]
+    pub max_warming_up: Duration,
 
     /// Threshold for auto-splitting a tenant into shards
     pub split_threshold: Option<u64>,
 }
 
 impl NeonStorageControllerConf {
     // Use a shorter pageserver unavailability interval than the default to speed up tests.
-    const DEFAULT_MAX_UNAVAILABLE_INTERVAL: std::time::Duration =
-        std::time::Duration::from_secs(10);
+    const DEFAULT_MAX_OFFLINE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
+
+    const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
 }
 
 impl Default for NeonStorageControllerConf {
     fn default() -> Self {
         Self {
-            max_unavailable: Self::DEFAULT_MAX_UNAVAILABLE_INTERVAL,
+            max_offline: Self::DEFAULT_MAX_OFFLINE_INTERVAL,
+            max_warming_up: Self::DEFAULT_MAX_WARMING_UP_INTERVAL,
             split_threshold: None,
         }
     }
diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs
index d7aedd711a..e054e9ee57 100644
--- a/control_plane/src/storage_controller.rs
+++ b/control_plane/src/storage_controller.rs
@@ -5,8 +5,9 @@ use crate::{
 use camino::{Utf8Path, Utf8PathBuf};
 use pageserver_api::{
     controller_api::{
-        NodeConfigureRequest, NodeRegisterRequest, TenantCreateRequest, TenantCreateResponse,
-        TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
+        NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
+        TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
+        TenantShardMigrateResponse,
     },
     models::{
         TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
@@ -353,8 +354,10 @@ impl StorageController {
             "--dev",
             "--database-url",
             &database_url,
-            "--max-unavailable-interval",
-            &humantime::Duration::from(self.config.max_unavailable).to_string(),
+            "--max-offline-interval",
+            &humantime::Duration::from(self.config.max_offline).to_string(),
+            "--max-warming-up-interval",
+            &humantime::Duration::from(self.config.max_warming_up).to_string(),
         ]
         .into_iter()
         .map(|s| s.to_string())
@@ -625,6 +628,15 @@ impl StorageController {
         .await
     }
 
+    pub async fn node_list(&self) -> anyhow::Result<Vec<NodeDescribeResponse>> {
+        self.dispatch::<(), Vec<NodeDescribeResponse>>(
+            Method::GET,
"control/v1/node".to_string(), + None, + ) + .await + } + #[instrument(skip(self))] pub async fn ready(&self) -> anyhow::Result<()> { self.dispatch::<(), ()>(Method::GET, "ready".to_string(), None) diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index d0e1eb6b28..474f796040 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::time::Instant; /// Request/response types for the storage controller /// API (`/control/v1` prefix). Implemented by the server @@ -150,11 +151,16 @@ impl UtilizationScore { } } -#[derive(Serialize, Deserialize, Clone, Copy, Debug)] +#[derive(Serialize, Clone, Copy, Debug)] #[serde(into = "NodeAvailabilityWrapper")] pub enum NodeAvailability { // Normal, happy state Active(UtilizationScore), + // Node is warming up, but we expect it to become available soon. Covers + // the time span between the re-attach response being composed on the storage controller + // and the first successful heartbeat after the processing of the re-attach response + // finishes on the pageserver. + WarmingUp(Instant), // Offline: Tenants shouldn't try to attach here, but they may assume that their // secondary locations on this node still exist. Newly added nodes are in this // state until we successfully contact them. @@ -164,7 +170,10 @@ pub enum NodeAvailability { impl PartialEq for NodeAvailability { fn eq(&self, other: &Self) -> bool { use NodeAvailability::*; - matches!((self, other), (Active(_), Active(_)) | (Offline, Offline)) + matches!( + (self, other), + (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_)) + ) } } @@ -176,6 +185,7 @@ impl Eq for NodeAvailability {} #[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub enum NodeAvailabilityWrapper { Active, + WarmingUp, Offline, } @@ -185,6 +195,7 @@ impl From for NodeAvailability { // Assume the worst utilisation score to begin with. It will later be updated by // the heartbeats. 
NodeAvailabilityWrapper::Active => NodeAvailability::Active(UtilizationScore::worst()), + NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()), NodeAvailabilityWrapper::Offline => NodeAvailability::Offline, } } @@ -194,6 +205,7 @@ impl From for NodeAvailabilityWrapper { fn from(val: NodeAvailability) -> Self { match val { NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active, + NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp, NodeAvailability::Offline => NodeAvailabilityWrapper::Offline, } } diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index 26e7cc7ef8..b5d9267d79 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -171,14 +171,14 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient { register, }; - fail::fail_point!("control-plane-client-re-attach"); - let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?; tracing::info!( "Received re-attach response with {} tenants", response.tenants.len() ); + failpoint_support::sleep_millis_async!("control-plane-client-re-attach"); + Ok(response .tenants .into_iter() diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs index 14cda0a289..1bb9c17f30 100644 --- a/storage_controller/src/heartbeater.rs +++ b/storage_controller/src/heartbeater.rs @@ -22,7 +22,8 @@ struct HeartbeaterTask { state: HashMap, - max_unavailable_interval: Duration, + max_offline_interval: Duration, + max_warming_up_interval: Duration, jwt_token: Option, } @@ -31,7 +32,9 @@ pub(crate) enum PageserverState { Available { last_seen_at: Instant, utilization: PageserverUtilization, - new: bool, + }, + WarmingUp { + started_at: Instant, }, Offline, } @@ -57,12 +60,18 @@ pub(crate) struct Heartbeater { impl Heartbeater { pub(crate) fn new( jwt_token: Option, - max_unavailable_interval: Duration, + max_offline_interval: Duration, + max_warming_up_interval: Duration, cancel: CancellationToken, ) -> Self { let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::(); - let mut heartbeater = - HeartbeaterTask::new(receiver, jwt_token, max_unavailable_interval, cancel); + let mut heartbeater = HeartbeaterTask::new( + receiver, + jwt_token, + max_offline_interval, + max_warming_up_interval, + cancel, + ); tokio::task::spawn(async move { heartbeater.run().await }); Self { sender } @@ -88,14 +97,16 @@ impl HeartbeaterTask { fn new( receiver: tokio::sync::mpsc::UnboundedReceiver, jwt_token: Option, - max_unavailable_interval: Duration, + max_offline_interval: Duration, + max_warming_up_interval: Duration, cancel: CancellationToken, ) -> Self { Self { receiver, cancel, state: HashMap::new(), - max_unavailable_interval, + max_offline_interval, + max_warming_up_interval, jwt_token, } } @@ -128,16 +139,15 @@ impl HeartbeaterTask { heartbeat_futs.push({ let jwt_token = self.jwt_token.clone(); let cancel = self.cancel.clone(); - let new_node = !self.state.contains_key(node_id); // Clone the node and mark it as available such that the request // goes through to the pageserver even when the node is marked offline. // This doesn't impact the availability observed by [`crate::service::Service`]. 
- let mut node = node.clone(); - node.set_availability(NodeAvailability::Active(UtilizationScore::worst())); + let mut node_clone = node.clone(); + node_clone.set_availability(NodeAvailability::Active(UtilizationScore::worst())); async move { - let response = node + let response = node_clone .with_client_retries( |client| async move { client.get_utilization().await }, &jwt_token, @@ -161,7 +171,12 @@ impl HeartbeaterTask { PageserverState::Available { last_seen_at: Instant::now(), utilization, - new: new_node, + } + } else if let NodeAvailability::WarmingUp(last_seen_at) = + node.get_availability() + { + PageserverState::WarmingUp { + started_at: last_seen_at, } } else { PageserverState::Offline @@ -187,53 +202,67 @@ impl HeartbeaterTask { } } } + + let mut warming_up = 0; + let mut offline = 0; + for state in new_state.values() { + match state { + PageserverState::WarmingUp { .. } => { + warming_up += 1; + } + PageserverState::Offline { .. } => offline += 1, + PageserverState::Available { .. } => {} + } + } + tracing::info!( - "Heartbeat round complete for {} nodes, {} offline", + "Heartbeat round complete for {} nodes, {} warming-up, {} offline", new_state.len(), - new_state - .values() - .filter(|s| match s { - PageserverState::Available { .. } => { - false - } - PageserverState::Offline => true, - }) - .count() + warming_up, + offline ); let mut deltas = Vec::new(); let now = Instant::now(); - for (node_id, ps_state) in new_state { + for (node_id, ps_state) in new_state.iter_mut() { use std::collections::hash_map::Entry::*; - let entry = self.state.entry(node_id); + let entry = self.state.entry(*node_id); let mut needs_update = false; match entry { Occupied(ref occ) => match (occ.get(), &ps_state) { (PageserverState::Offline, PageserverState::Offline) => {} (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => { - if now - *last_seen_at >= self.max_unavailable_interval { - deltas.push((node_id, ps_state.clone())); + if now - *last_seen_at >= self.max_offline_interval { + deltas.push((*node_id, ps_state.clone())); needs_update = true; } } + (_, PageserverState::WarmingUp { started_at }) => { + if now - *started_at >= self.max_warming_up_interval { + *ps_state = PageserverState::Offline; + } + + deltas.push((*node_id, ps_state.clone())); + needs_update = true; + } _ => { - deltas.push((node_id, ps_state.clone())); + deltas.push((*node_id, ps_state.clone())); needs_update = true; } }, Vacant(_) => { // This is a new node. Don't generate a delta for it. 
-                    deltas.push((node_id, ps_state.clone()));
+                    deltas.push((*node_id, ps_state.clone()));
                 }
             }
 
             match entry {
                 Occupied(mut occ) if needs_update => {
-                    (*occ.get_mut()) = ps_state;
+                    (*occ.get_mut()) = ps_state.clone();
                 }
                 Vacant(vac) => {
-                    vac.insert(ps_state);
+                    vac.insert(ps_state.clone());
                 }
                 _ => {}
             }
diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs
index 789f96beb3..adbf5c6496 100644
--- a/storage_controller/src/main.rs
+++ b/storage_controller/src/main.rs
@@ -10,7 +10,8 @@ use storage_controller::http::make_router;
 use storage_controller::metrics::preinitialize_metrics;
 use storage_controller::persistence::Persistence;
 use storage_controller::service::{
-    Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
+    Config, Service, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
+    RECONCILER_CONCURRENCY_DEFAULT,
 };
 use tokio::signal::unix::SignalKind;
 use tokio_util::sync::CancellationToken;
@@ -61,7 +62,12 @@ struct Cli {
 
     /// Grace period before marking unresponsive pageserver offline
     #[arg(long)]
-    max_unavailable_interval: Option<humantime::Duration>,
+    max_offline_interval: Option<humantime::Duration>,
+
+    /// More tolerant grace period before marking an unresponsive pageserver offline,
+    /// used around pageserver restarts
+    #[arg(long)]
+    max_warming_up_interval: Option<humantime::Duration>,
 
     /// Size threshold for automatically splitting shards (disabled by default)
     #[arg(long)]
@@ -254,10 +260,14 @@ async fn async_main() -> anyhow::Result<()> {
         jwt_token: secrets.jwt_token,
         control_plane_jwt_token: secrets.control_plane_jwt_token,
         compute_hook_url: args.compute_hook_url,
-        max_unavailable_interval: args
-            .max_unavailable_interval
+        max_offline_interval: args
+            .max_offline_interval
             .map(humantime::Duration::into)
-            .unwrap_or(MAX_UNAVAILABLE_INTERVAL_DEFAULT),
+            .unwrap_or(MAX_OFFLINE_INTERVAL_DEFAULT),
+        max_warming_up_interval: args
+            .max_warming_up_interval
+            .map(humantime::Duration::into)
+            .unwrap_or(MAX_WARMING_UP_INTERVAL_DEFAULT),
         reconciler_concurrency: args
             .reconciler_concurrency
             .unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs
index fff44aaf26..ea765ca123 100644
--- a/storage_controller/src/node.rs
+++ b/storage_controller/src/node.rs
@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
 use pageserver_api::{
     controller_api::{
         NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
-        TenantLocateResponseShard, UtilizationScore,
+        TenantLocateResponseShard,
     },
     shard::TenantShardId,
 };
@@ -46,6 +46,8 @@ pub(crate) struct Node {
 /// whether/how they changed it.
 pub(crate) enum AvailabilityTransition {
     ToActive,
+    ToWarmingUpFromActive,
+    ToWarmingUpFromOffline,
     ToOffline,
     Unchanged,
 }
@@ -90,22 +92,34 @@ impl Node {
         }
     }
 
+    pub(crate) fn get_availability(&self) -> NodeAvailability {
+        self.availability
+    }
+
     pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
+        use AvailabilityTransition::*;
+        use NodeAvailability::WarmingUp;
+
         match self.get_availability_transition(availability) {
-            AvailabilityTransition::ToActive => {
+            ToActive => {
                 // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
                 // users of previously-cloned copies of the node will still see the old cancellation
                 // state. For example, Reconcilers in flight will have to complete and be spawned
                 // again to realize that the node has become available.
self.cancel = CancellationToken::new(); } - AvailabilityTransition::ToOffline => { + ToOffline | ToWarmingUpFromActive => { // Fire the node's cancellation token to cancel any in-flight API requests to it self.cancel.cancel(); } - AvailabilityTransition::Unchanged => {} + Unchanged | ToWarmingUpFromOffline => {} + } + + if let (WarmingUp(crnt), WarmingUp(proposed)) = (self.availability, availability) { + self.availability = WarmingUp(std::cmp::max(crnt, proposed)); + } else { + self.availability = availability; } - self.availability = availability; } /// Without modifying the availability of the node, convert the intended availability @@ -120,16 +134,10 @@ impl Node { match (self.availability, availability) { (Offline, Active(_)) => ToActive, (Active(_), Offline) => ToOffline, - // Consider the case when the storage controller handles the re-attach of a node - // before the heartbeats detect that the node is back online. We still need - // [`Service::node_configure`] to attempt reconciliations for shards with an - // unknown observed location. - // The unsavoury match arm below handles this situation. - (Active(lhs), Active(rhs)) - if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() => - { - ToActive - } + (Active(_), WarmingUp(_)) => ToWarmingUpFromActive, + (WarmingUp(_), Offline) => ToOffline, + (WarmingUp(_), Active(_)) => ToActive, + (Offline, WarmingUp(_)) => ToWarmingUpFromOffline, _ => Unchanged, } } @@ -147,7 +155,7 @@ impl Node { pub(crate) fn may_schedule(&self) -> MaySchedule { let score = match self.availability { NodeAvailability::Active(score) => score, - NodeAvailability::Offline => return MaySchedule::No, + NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No, }; match self.scheduling { diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 2a6d5d3578..860fe4802a 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -100,9 +100,13 @@ pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); /// How long a node may be unresponsive to heartbeats before we declare it offline. /// This must be long enough to cover node restarts as well as normal operations: in future -/// it should be separated into distinct timeouts for startup vs. normal operation -/// (``) -pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(300); +pub const MAX_OFFLINE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30); + +/// How long a node may be unresponsive to heartbeats during start up before we declare it +/// offline. This is much more lenient than [`MAX_OFFLINE_INTERVAL_DEFAULT`] since the pageserver's +/// handling of the re-attach response may take a long time and blocks heartbeats from +/// being handled on the pageserver side. +pub const MAX_WARMING_UP_INTERVAL_DEFAULT: Duration = Duration::from_secs(300); #[derive(Clone, strum_macros::Display)] enum TenantOperations { @@ -236,7 +240,12 @@ pub struct Config { /// Grace period within which a pageserver does not respond to heartbeats, but is still /// considered active. Once the grace period elapses, the next heartbeat failure will /// mark the pagseserver offline. - pub max_unavailable_interval: Duration, + pub max_offline_interval: Duration, + + /// Extended grace period within which pageserver may not respond to heartbeats. + /// This extended grace period kicks in after the node has been drained for restart + /// and/or upon handling the re-attach request from a node. 
+ pub max_warming_up_interval: Duration, /// How many Reconcilers may be spawned concurrently pub reconciler_concurrency: usize, @@ -587,6 +596,9 @@ impl Service { online_nodes.insert(node_id, utilization); } PageserverState::Offline => {} + PageserverState::WarmingUp { .. } => { + unreachable!("Nodes are never marked warming-up during startup reconcile") + } } } } @@ -779,63 +791,54 @@ impl Service { let res = self.heartbeater.heartbeat(nodes).await; if let Ok(deltas) = res { for (node_id, state) in deltas.0 { - let (new_node, new_availability) = match state { - PageserverState::Available { - utilization, new, .. - } => ( - new, - NodeAvailability::Active(UtilizationScore( - utilization.utilization_score, - )), + let new_availability = match state { + PageserverState::Available { utilization, .. } => NodeAvailability::Active( + UtilizationScore(utilization.utilization_score), ), - PageserverState::Offline => (false, NodeAvailability::Offline), + PageserverState::WarmingUp { started_at } => { + NodeAvailability::WarmingUp(started_at) + } + PageserverState::Offline => { + // The node might have been placed in the WarmingUp state + // while the heartbeat round was on-going. Hence, filter out + // offline transitions for WarmingUp nodes that are still within + // their grace period. + if let Ok(NodeAvailability::WarmingUp(started_at)) = + self.get_node(node_id).await.map(|n| n.get_availability()) + { + let now = Instant::now(); + if now - started_at >= self.config.max_warming_up_interval { + NodeAvailability::Offline + } else { + NodeAvailability::WarmingUp(started_at) + } + } else { + NodeAvailability::Offline + } + } }; - if new_node { - // When the heartbeats detect a newly added node, we don't wish - // to attempt to reconcile the shards assigned to it. The node - // is likely handling it's re-attach response, so reconciling now - // would be counterproductive. - // - // Instead, update the in-memory state with the details learned about the - // node. - let mut locked = self.inner.write().unwrap(); - let (nodes, _tenants, scheduler) = locked.parts_mut(); + // This is the code path for geniune availability transitions (i.e node + // goes unavailable and/or comes back online). + let res = self + .node_configure(node_id, Some(new_availability), None) + .await; - let mut new_nodes = (**nodes).clone(); - - if let Some(node) = new_nodes.get_mut(&node_id) { - node.set_availability(new_availability); - scheduler.node_upsert(node); + match res { + Ok(()) => {} + Err(ApiError::NotFound(_)) => { + // This should be rare, but legitimate since the heartbeats are done + // on a snapshot of the nodes. + tracing::info!("Node {} was not found after heartbeat round", node_id); } - - locked.nodes = Arc::new(new_nodes); - } else { - // This is the code path for geniune availability transitions (i.e node - // goes unavailable and/or comes back online). - let res = self - .node_configure(node_id, Some(new_availability), None) - .await; - - match res { - Ok(()) => {} - Err(ApiError::NotFound(_)) => { - // This should be rare, but legitimate since the heartbeats are done - // on a snapshot of the nodes. - tracing::info!( - "Node {} was not found after heartbeat round", - node_id - ); - } - Err(err) => { - // Transition to active involves reconciling: if a node responds to a heartbeat then - // becomes unavailable again, we may get an error here. 
- tracing::error!( - "Failed to update node {} after heartbeat round: {}", - node_id, - err - ); - } + Err(err) => { + // Transition to active involves reconciling: if a node responds to a heartbeat then + // becomes unavailable again, we may get an error here. + tracing::error!( + "Failed to update node {} after heartbeat round: {}", + node_id, + err + ); } } } @@ -1152,7 +1155,8 @@ impl Service { let cancel = CancellationToken::new(); let heartbeater = Heartbeater::new( config.jwt_token.clone(), - config.max_unavailable_interval, + config.max_offline_interval, + config.max_warming_up_interval, cancel.clone(), ); let this = Arc::new(Self { @@ -1664,21 +1668,23 @@ impl Service { | NodeSchedulingPolicy::Filling ); - if !node.is_available() || reset_scheduling { - let mut new_nodes = (**nodes).clone(); - if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) { - if !node.is_available() { - node.set_availability(NodeAvailability::Active(UtilizationScore::worst())); - } - - if reset_scheduling { - node.set_scheduling(NodeSchedulingPolicy::Active); - } - - scheduler.node_upsert(node); - let new_nodes = Arc::new(new_nodes); - *nodes = new_nodes; + let mut new_nodes = (**nodes).clone(); + if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) { + if reset_scheduling { + node.set_scheduling(NodeSchedulingPolicy::Active); } + + tracing::info!("Marking {} warming-up on reattach", reattach_req.node_id); + node.set_availability(NodeAvailability::WarmingUp(std::time::Instant::now())); + + scheduler.node_upsert(node); + let new_nodes = Arc::new(new_nodes); + *nodes = new_nodes; + } else { + tracing::error!( + "Reattaching node {} was removed while processing the request", + reattach_req.node_id + ); } } @@ -4719,6 +4725,15 @@ impl Service { // TODO: in the background, we should balance work back onto this pageserver } + // No action required for the intermediate unavailable state. + // When we transition into active or offline from the unavailable state, + // the correct handling above will kick in. 
+ AvailabilityTransition::ToWarmingUpFromActive => { + tracing::info!("Node {} transition to unavailable from active", node_id); + } + AvailabilityTransition::ToWarmingUpFromOffline => { + tracing::info!("Node {} transition to unavailable from offline", node_id); + } AvailabilityTransition::Unchanged => { tracing::debug!("Node {} no availability change during config", node_id); } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 9e39457c06..76ab46b01a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2148,6 +2148,23 @@ class StorageControllerApiException(Exception): self.status_code = status_code +# See libs/pageserver_api/src/controller_api.rs +# for the rust definitions of the enums below +# TODO: Replace with `StrEnum` when we upgrade to python 3.11 +class PageserverAvailability(str, Enum): + ACTIVE = "Active" + UNAVAILABLE = "Unavailable" + OFFLINE = "Offline" + + +class PageserverSchedulingPolicy(str, Enum): + ACTIVE = "Active" + DRAINING = "Draining" + FILLING = "Filling" + PAUSE = "Pause" + PAUSE_FOR_RESTART = "PauseForRestart" + + class NeonStorageController(MetricsGetter, LogUtils): def __init__(self, env: NeonEnv, auth_enabled: bool): self.env = env @@ -2531,26 +2548,54 @@ class NeonStorageController(MetricsGetter, LogUtils): ) log.info("storage controller passed consistency check") + def node_registered(self, node_id: int) -> bool: + """ + Returns true if the storage controller can confirm + it knows of pageserver with 'node_id' + """ + try: + self.node_status(node_id) + except StorageControllerApiException as e: + if e.status_code == 404: + return False + else: + raise e + + return True + def poll_node_status( - self, node_id: int, desired_scheduling_policy: str, max_attempts: int, backoff: int + self, + node_id: int, + desired_availability: Optional[PageserverAvailability], + desired_scheduling_policy: Optional[PageserverSchedulingPolicy], + max_attempts: int, + backoff: int, ): """ - Poll the node status until it reaches 'desired_scheduling_policy' or 'max_attempts' have been exhausted + Poll the node status until it reaches 'desired_scheduling_policy' and 'desired_availability' + or 'max_attempts' have been exhausted """ - log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy") + log.info( + f"Polling {node_id} for {desired_scheduling_policy} scheduling policy and {desired_availability} availability" + ) while max_attempts > 0: try: status = self.node_status(node_id) policy = status["scheduling"] - if policy == desired_scheduling_policy: + availability = status["availability"] + if (desired_scheduling_policy is None or policy == desired_scheduling_policy) and ( + desired_availability is None or availability == desired_availability + ): return else: max_attempts -= 1 - log.info(f"Status call returned {policy=} ({max_attempts} attempts left)") + log.info( + f"Status call returned {policy=} {availability=} ({max_attempts} attempts left)" + ) if max_attempts == 0: raise AssertionError( - f"Status for {node_id=} did not reach {desired_scheduling_policy=}" + f"Status for {node_id=} did not reach {desired_scheduling_policy=} {desired_availability=}" ) time.sleep(backoff) @@ -2694,6 +2739,14 @@ class NeonPageserver(PgProtocol, LogUtils): self.id, extra_env_vars=extra_env_vars, timeout_in_seconds=timeout_in_seconds ) self.running = True + + if self.env.storage_controller.running and self.env.storage_controller.node_registered( + self.id + ): + 
self.env.storage_controller.poll_node_status( + self.id, PageserverAvailability.ACTIVE, None, max_attempts=20, backoff=1 + ) + return self def stop(self, immediate: bool = False) -> "NeonPageserver": diff --git a/test_runner/performance/test_storage_controller_scale.py b/test_runner/performance/test_storage_controller_scale.py index 3a6113706f..281c9271e9 100644 --- a/test_runner/performance/test_storage_controller_scale.py +++ b/test_runner/performance/test_storage_controller_scale.py @@ -8,7 +8,12 @@ import pytest from fixtures.common_types import TenantId, TenantShardId, TimelineId from fixtures.compute_reconfigure import ComputeReconfigure from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder +from fixtures.neon_fixtures import ( + NeonEnv, + NeonEnvBuilder, + PageserverAvailability, + PageserverSchedulingPolicy, +) from fixtures.pageserver.http import PageserverHttpClient from fixtures.pg_version import PgVersion @@ -106,7 +111,8 @@ def test_storage_controller_many_tenants( # Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts. # TODO: tune this down as restarts get faster (https://github.com/neondatabase/neon/pull/7553), to # guard against regressions in restart time. - "max_unavailable": "300s" + "max_offline": "30s", + "max_warming_up": "300s", } neon_env_builder.control_plane_compute_hook_api = ( compute_reconfigure_listener.control_plane_compute_hook_api @@ -274,7 +280,11 @@ def test_storage_controller_many_tenants( ) env.storage_controller.poll_node_status( - ps.id, "PauseForRestart", max_attempts=24, backoff=5 + ps.id, + PageserverAvailability.ACTIVE, + PageserverSchedulingPolicy.PAUSE_FOR_RESTART, + max_attempts=24, + backoff=5, ) shard_counts = get_consistent_node_shard_counts(env, total_shards) @@ -285,12 +295,24 @@ def test_storage_controller_many_tenants( assert sum(shard_counts.values()) == total_shards ps.restart() - env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=1) + env.storage_controller.poll_node_status( + ps.id, + PageserverAvailability.ACTIVE, + PageserverSchedulingPolicy.ACTIVE, + max_attempts=24, + backoff=1, + ) env.storage_controller.retryable_node_operation( lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2 ) - env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=5) + env.storage_controller.poll_node_status( + ps.id, + PageserverAvailability.ACTIVE, + PageserverSchedulingPolicy.ACTIVE, + max_attempts=24, + backoff=5, + ) shard_counts = get_consistent_node_shard_counts(env, total_shards) log.info(f"Shard counts after filling node {ps.id}: {shard_counts}") diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 041942cda3..8941ddd281 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -596,19 +596,26 @@ def test_multi_attach( for ps in pageservers: ps.stop() - # Returning to a normal healthy state: all pageservers will start, but only the one most - # recently attached via the control plane will re-attach on startup + # Returning to a normal healthy state: all pageservers will start for ps in pageservers: ps.start() - with pytest.raises(PageserverApiException): - _detail = http_clients[0].timeline_detail(tenant_id, timeline_id) - with pytest.raises(PageserverApiException): - _detail = http_clients[1].timeline_detail(tenant_id, timeline_id) 
- _detail = http_clients[2].timeline_detail(tenant_id, timeline_id) + # Pageservers are marked offline by the storage controller during the rolling restart + # above. This may trigger a reschedulling, so there's no guarantee that the tenant + # shard ends up attached to the most recent ps. + raised = 0 + serving_ps_idx = None + for idx, http_client in enumerate(http_clients): + try: + _detail = http_client.timeline_detail(tenant_id, timeline_id) + serving_ps_idx = idx + except PageserverApiException: + raised += 1 + + assert raised == 2 and serving_ps_idx is not None # All data we wrote while multi-attached remains readable - workload.validate(pageservers[2].id) + workload.validate(pageservers[serving_ps_idx].id) def test_upgrade_generationless_local_file_paths( diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 4ce53df214..dccc1264e3 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -15,6 +15,10 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder): neon_env_builder.enable_pageserver_remote_storage(s3_storage()) neon_env_builder.enable_scrub_on_exit() + # We inject a delay of 15 seconds for tenant activation below. + # Hence, bump the max delay here to not skip over the activation. + neon_env_builder.pageserver_config_override = 'background_task_maximum_delay="20s"' + env = neon_env_builder.init_start() endpoint = env.endpoints.create_start("main") @@ -70,7 +74,7 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder): # pageserver does if a compute node connects and sends a request for the tenant # while it's still in Loading state. (It waits for the loading to finish, and then # processes the request.) - tenant_load_delay_ms = 5000 + tenant_load_delay_ms = 15000 env.pageserver.stop() env.pageserver.start( extra_env_vars={"FAILPOINTS": f"before-attaching-tenant=return({tenant_load_delay_ms})"} diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index 58d61eab0d..f43141c2d8 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -122,7 +122,12 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver, "scheduling": "Stop", }, ) - env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy Stop.*") + env.storage_controller.allowed_errors.extend( + [ + ".*Scheduling is disabled by policy Stop.*", + ".*Skipping reconcile for policy Stop.*", + ] + ) # We use a fixed seed to make the test reproducible: we want a randomly # chosen order, but not to change the order every time we run the test. diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 90c6e26d01..9c45af7c1b 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -207,7 +207,8 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: neon_env_builder.storage_controller_config = { # Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts. 
- "max_unavailable": "300s" + "max_offline": "30s", + "max_warming_up": "300s", } env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 741f16685e..9a47d7d651 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -12,6 +12,8 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, + PageserverAvailability, + PageserverSchedulingPolicy, PgBin, StorageControllerApiException, TokenScope, @@ -918,6 +920,8 @@ def test_storage_controller_tenant_deletion( class Failure: pageserver_id: int + offline_timeout: int + must_detect_after: int def apply(self, env: NeonEnv): raise NotImplementedError() @@ -930,9 +934,11 @@ class Failure: class NodeStop(Failure): - def __init__(self, pageserver_ids, immediate): + def __init__(self, pageserver_ids, immediate, offline_timeout, must_detect_after): self.pageserver_ids = pageserver_ids self.immediate = immediate + self.offline_timeout = offline_timeout + self.must_detect_after = must_detect_after def apply(self, env: NeonEnv): for ps_id in self.pageserver_ids: @@ -948,10 +954,42 @@ class NodeStop(Failure): return self.pageserver_ids +class NodeRestartWithSlowReattach(Failure): + def __init__(self, pageserver_id, offline_timeout, must_detect_after): + self.pageserver_id = pageserver_id + self.offline_timeout = offline_timeout + self.must_detect_after = must_detect_after + self.thread = None + + def apply(self, env: NeonEnv): + pageserver = env.get_pageserver(self.pageserver_id) + pageserver.stop(immediate=False) + + def start_ps(): + pageserver.start( + extra_env_vars={"FAILPOINTS": "control-plane-client-re-attach=return(30000)"} + ) + + self.thread = threading.Thread(target=start_ps) + self.thread.start() + + def clear(self, env: NeonEnv): + if self.thread is not None: + self.thread.join() + + pageserver = env.get_pageserver(self.pageserver_id) + pageserver.http_client().configure_failpoints(("control-plane-client-re-attach", "off")) + + def nodes(self): + return [self.pageserver_id] + + class PageserverFailpoint(Failure): - def __init__(self, failpoint, pageserver_id): + def __init__(self, failpoint, pageserver_id, offline_timeout, must_detect_after): self.failpoint = failpoint self.pageserver_id = pageserver_id + self.offline_timeout = offline_timeout + self.must_detect_after = must_detect_after def apply(self, env: NeonEnv): pageserver = env.get_pageserver(self.pageserver_id) @@ -987,15 +1025,28 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]: @pytest.mark.parametrize( "failure", [ - NodeStop(pageserver_ids=[1], immediate=False), - NodeStop(pageserver_ids=[1], immediate=True), - NodeStop(pageserver_ids=[1, 2], immediate=True), - PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"), + NodeStop(pageserver_ids=[1], immediate=False, offline_timeout=20, must_detect_after=5), + NodeStop(pageserver_ids=[1], immediate=True, offline_timeout=20, must_detect_after=5), + NodeStop(pageserver_ids=[1, 2], immediate=True, offline_timeout=20, must_detect_after=5), + PageserverFailpoint( + pageserver_id=1, + failpoint="get-utilization-http-handler", + offline_timeout=20, + must_detect_after=5, + ), + # Instrument a scenario where the node is slow to re-attach. The re-attach request itself + # should serve as a signal to the storage controller to use a more lenient heartbeat timeout. 
+ NodeRestartWithSlowReattach(pageserver_id=1, offline_timeout=60, must_detect_after=15), ], ) def test_storage_controller_heartbeats( neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, failure: Failure ): + neon_env_builder.storage_controller_config = { + "max_offline": "10s", + "max_warming_up": "20s", + } + neon_env_builder.num_pageservers = 2 env = neon_env_builder.init_configs() env.start() @@ -1061,9 +1112,12 @@ def test_storage_controller_heartbeats( if node["id"] in offline_node_ids: assert node["availability"] == "Offline" - # A node is considered offline if the last successful heartbeat - # was more than 10 seconds ago (hardcoded in the storage controller). - wait_until(20, 1, nodes_offline) + start = time.time() + wait_until(failure.offline_timeout, 1, nodes_offline) + detected_after = time.time() - start + log.info(f"Detected node failures after {detected_after}s") + + assert detected_after >= failure.must_detect_after # .. expecting the tenant on the offline node to be migrated def tenant_migrated(): @@ -1546,7 +1600,13 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder): env.storage_controller.retryable_node_operation( lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2 ) - env.storage_controller.poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5) + env.storage_controller.poll_node_status( + ps.id, + PageserverAvailability.ACTIVE, + PageserverSchedulingPolicy.PAUSE_FOR_RESTART, + max_attempts=6, + backoff=5, + ) shard_counts = get_node_shard_counts(env, tenant_ids) log.info(f"Shard counts after draining node {ps.id}: {shard_counts}") @@ -1556,12 +1616,24 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder): assert sum(shard_counts.values()) == total_shards ps.restart() - env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=10, backoff=1) + env.storage_controller.poll_node_status( + ps.id, + PageserverAvailability.ACTIVE, + PageserverSchedulingPolicy.ACTIVE, + max_attempts=10, + backoff=1, + ) env.storage_controller.retryable_node_operation( lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2 ) - env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=6, backoff=5) + env.storage_controller.poll_node_status( + ps.id, + PageserverAvailability.ACTIVE, + PageserverSchedulingPolicy.ACTIVE, + max_attempts=6, + backoff=5, + ) shard_counts = get_node_shard_counts(env, tenant_ids) log.info(f"Shard counts after filling node {ps.id}: {shard_counts}") @@ -1606,11 +1678,23 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder): backoff=2, ) - env.storage_controller.poll_node_status(ps_id_to_drain, "Draining", max_attempts=6, backoff=2) + env.storage_controller.poll_node_status( + ps_id_to_drain, + PageserverAvailability.ACTIVE, + PageserverSchedulingPolicy.DRAINING, + max_attempts=6, + backoff=2, + ) env.storage_controller.cancel_node_drain(ps_id_to_drain) - env.storage_controller.poll_node_status(ps_id_to_drain, "Active", max_attempts=6, backoff=2) + env.storage_controller.poll_node_status( + ps_id_to_drain, + PageserverAvailability.ACTIVE, + PageserverSchedulingPolicy.ACTIVE, + max_attempts=6, + backoff=2, + ) @pytest.mark.parametrize("while_offline", [True, False]) diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index b62398d427..840c7159ad 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ 
b/test_runner/regress/test_threshold_based_eviction.py @@ -48,13 +48,12 @@ def test_threshold_based_eviction( tenant_id, timeline_id = env.initial_tenant, env.initial_timeline ps_http = env.pageserver.http_client() - assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == { - "kind": "NoEviction" - } + vps_http = env.storage_controller.pageserver_api() + assert vps_http.tenant_config(tenant_id).effective_config["eviction_policy"] is None eviction_threshold = 10 eviction_period = 2 - ps_http.set_tenant_config( + vps_http.set_tenant_config( tenant_id, { "eviction_policy": { @@ -64,7 +63,7 @@ def test_threshold_based_eviction( }, }, ) - assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == { + assert vps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == { "kind": "LayerAccessThreshold", "threshold": f"{eviction_threshold}s", "period": f"{eviction_period}s", @@ -73,7 +72,7 @@ def test_threshold_based_eviction( # restart because changing tenant config is not instant env.pageserver.restart() - assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == { + assert vps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == { "kind": "LayerAccessThreshold", "threshold": f"{eviction_threshold}s", "period": f"{eviction_period}s", @@ -81,7 +80,7 @@ def test_threshold_based_eviction( # create a bunch of L1s, only the least of which will need to be resident compaction_threshold = 3 # create L1 layers quickly - ps_http.patch_tenant_config_client_side( + vps_http.patch_tenant_config_client_side( tenant_id, inserts={ # Disable gc and compaction to avoid on-demand downloads from their side. @@ -154,7 +153,7 @@ def test_threshold_based_eviction( while time.time() - started_waiting_at < observation_window: current = ( time.time(), - MapInfoProjection(ps_http.layer_map_info(tenant_id, timeline_id)), + MapInfoProjection(vps_http.layer_map_info(tenant_id, timeline_id)), ) last = map_info_changes[-1] if map_info_changes else (0, None) if last[1] is None or current[1] != last[1]: