storcon: make heartbeats restart aware (#8222)

## Problem
Re-attach blocks the pageserver HTTP server from starting up, so the pageserver cannot
reply to heartbeats until re-attach processing is done. This makes the storage controller
mark the node offline (not good). We worked around this by setting the interval after
which nodes are marked offline to 5 minutes, which isn't a long-term solution.

## Summary of changes
* Introduce a new `NodeAvailability` state: `WarmingUp`. It models the interval
  from receiving the re-attach request until the pageserver replies to the first
  heartbeat after re-attach.
* The heartbeat delta generator becomes aware of this state and applies a
  separate, longer grace period to warming-up nodes (see the sketch below).
* The flag `max-warming-up-interval` now models the longer timeout and
  `max-offline-interval` the shorter one, to match the names of the states.

Closes https://github.com/neondatabase/neon/issues/7552
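
To make the mechanism concrete, here is a minimal, self-contained sketch of the timeout
selection described above. It is not the PR's code: the `Availability` enum and the
`should_mark_offline` helper are simplified, illustrative stand-ins for `NodeAvailability`
and the heartbeater's delta logic.

```rust
use std::time::{Duration, Instant};

// Simplified stand-in for `NodeAvailability`: the real enum also carries a
// utilization score in its `Active` variant.
#[derive(Clone, Copy, Debug)]
enum Availability {
    Active { last_seen_at: Instant },
    // Entered when the storage controller handles the node's re-attach request;
    // left on the first successful heartbeat afterwards.
    WarmingUp { started_at: Instant },
    Offline,
}

// Hypothetical helper: should an unresponsive node be marked offline now?
// Warming-up nodes get the longer `max_warming_up_interval` grace period,
// everyone else the shorter `max_offline_interval`.
fn should_mark_offline(
    availability: Availability,
    now: Instant,
    max_offline_interval: Duration,    // --max-offline-interval
    max_warming_up_interval: Duration, // --max-warming-up-interval
) -> bool {
    match availability {
        Availability::WarmingUp { started_at } => now - started_at >= max_warming_up_interval,
        Availability::Active { last_seen_at } => now - last_seen_at >= max_offline_interval,
        Availability::Offline => false, // already offline, nothing to do
    }
}

fn main() {
    let start = Instant::now();
    let later = start + Duration::from_secs(60);
    let offline = Duration::from_secs(30);
    let warming_up = Duration::from_secs(300);

    // A node that went silent 60s ago is past the normal grace period ...
    assert!(should_mark_offline(
        Availability::Active { last_seen_at: start },
        later,
        offline,
        warming_up
    ));
    // ... but a node that is re-attaching is still within the lenient one.
    assert!(!should_mark_offline(
        Availability::WarmingUp { started_at: start },
        later,
        offline,
        warming_up
    ));
    println!("warming-up nodes survive longer heartbeat gaps than active ones");
}
```

The actual heartbeater additionally downgrades a node to offline once `max_warming_up_interval`
has elapsed without a successful heartbeat, as the diff to the heartbeater below shows.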
Vlad Lazar, 2024-07-25 14:09:12 +01:00 (committed by GitHub)
parent f76a4e0ad2
commit 9c5ad21341
17 changed files with 508 additions and 179 deletions

View File

@@ -21,7 +21,9 @@ use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
};
use pageserver_api::controller_api::{PlacementPolicy, TenantCreateRequest};
use pageserver_api::controller_api::{
NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
};
use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInfo};
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
@@ -1250,9 +1252,70 @@ async fn handle_start_all(
exit(1);
}
}
neon_start_status_check(env, retry_timeout).await?;
Ok(())
}
async fn neon_start_status_check(
env: &local_env::LocalEnv,
retry_timeout: &Duration,
) -> anyhow::Result<()> {
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
if env.control_plane_api.is_none() {
return Ok(());
}
let storcon = StorageController::from_env(env);
let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
println!("\nRunning neon status check");
for retry in 0..retries {
if retry == notice_after_retries {
println!("\nNeon status check has not passed yet, continuing to wait")
}
let mut passed = true;
let mut nodes = storcon.node_list().await?;
let mut pageservers = env.pageservers.clone();
if nodes.len() != pageservers.len() {
continue;
}
nodes.sort_by_key(|ps| ps.id);
pageservers.sort_by_key(|ps| ps.id);
for (idx, pageserver) in pageservers.iter().enumerate() {
let node = &nodes[idx];
if node.id != pageserver.id {
passed = false;
break;
}
if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
passed = false;
break;
}
}
if passed {
println!("\nNeon started and passed status check");
return Ok(());
}
tokio::time::sleep(RETRY_INTERVAL).await;
}
anyhow::bail!("\nNeon did not pass status check")
}
async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let immediate =
sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");

View File

@@ -151,7 +151,10 @@ pub struct NeonBroker {
pub struct NeonStorageControllerConf {
/// Heartbeat timeout before marking a node offline
#[serde(with = "humantime_serde")]
pub max_unavailable: Duration,
pub max_offline: Duration,
#[serde(with = "humantime_serde")]
pub max_warming_up: Duration,
/// Threshold for auto-splitting a tenant into shards
pub split_threshold: Option<u64>,
@@ -159,14 +162,16 @@ pub struct NeonStorageControllerConf {
impl NeonStorageControllerConf {
// Use a shorter pageserver unavailability interval than the default to speed up tests.
const DEFAULT_MAX_UNAVAILABLE_INTERVAL: std::time::Duration =
std::time::Duration::from_secs(10);
const DEFAULT_MAX_OFFLINE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
}
impl Default for NeonStorageControllerConf {
fn default() -> Self {
Self {
max_unavailable: Self::DEFAULT_MAX_UNAVAILABLE_INTERVAL,
max_offline: Self::DEFAULT_MAX_OFFLINE_INTERVAL,
max_warming_up: Self::DEFAULT_MAX_WARMING_UP_INTERVAL,
split_threshold: None,
}
}

View File

@@ -5,8 +5,9 @@ use crate::{
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::{
controller_api::{
NodeConfigureRequest, NodeRegisterRequest, TenantCreateRequest, TenantCreateResponse,
TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
},
models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
@@ -353,8 +354,10 @@ impl StorageController {
"--dev",
"--database-url",
&database_url,
"--max-unavailable-interval",
&humantime::Duration::from(self.config.max_unavailable).to_string(),
"--max-offline-interval",
&humantime::Duration::from(self.config.max_offline).to_string(),
"--max-warming-up-interval",
&humantime::Duration::from(self.config.max_warming_up).to_string(),
]
.into_iter()
.map(|s| s.to_string())
@@ -625,6 +628,15 @@ impl StorageController {
.await
}
pub async fn node_list(&self) -> anyhow::Result<Vec<NodeDescribeResponse>> {
self.dispatch::<(), Vec<NodeDescribeResponse>>(
Method::GET,
"control/v1/node".to_string(),
None,
)
.await
}
#[instrument(skip(self))]
pub async fn ready(&self) -> anyhow::Result<()> {
self.dispatch::<(), ()>(Method::GET, "ready".to_string(), None)

View File

@@ -1,4 +1,5 @@
use std::str::FromStr;
use std::time::Instant;
/// Request/response types for the storage controller
/// API (`/control/v1` prefix). Implemented by the server
@@ -150,11 +151,16 @@ impl UtilizationScore {
}
}
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
#[derive(Serialize, Clone, Copy, Debug)]
#[serde(into = "NodeAvailabilityWrapper")]
pub enum NodeAvailability {
// Normal, happy state
Active(UtilizationScore),
// Node is warming up, but we expect it to become available soon. Covers
// the time span between the re-attach response being composed on the storage controller
// and the first successful heartbeat after the processing of the re-attach response
// finishes on the pageserver.
WarmingUp(Instant),
// Offline: Tenants shouldn't try to attach here, but they may assume that their
// secondary locations on this node still exist. Newly added nodes are in this
// state until we successfully contact them.
@@ -164,7 +170,10 @@ pub enum NodeAvailability {
impl PartialEq for NodeAvailability {
fn eq(&self, other: &Self) -> bool {
use NodeAvailability::*;
matches!((self, other), (Active(_), Active(_)) | (Offline, Offline))
matches!(
(self, other),
(Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
)
}
}
@@ -176,6 +185,7 @@ impl Eq for NodeAvailability {}
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum NodeAvailabilityWrapper {
Active,
WarmingUp,
Offline,
}
@@ -185,6 +195,7 @@ impl From<NodeAvailabilityWrapper> for NodeAvailability {
// Assume the worst utilisation score to begin with. It will later be updated by
// the heartbeats.
NodeAvailabilityWrapper::Active => NodeAvailability::Active(UtilizationScore::worst()),
NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
}
}
@@ -194,6 +205,7 @@ impl From<NodeAvailability> for NodeAvailabilityWrapper {
fn from(val: NodeAvailability) -> Self {
match val {
NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
}
}

View File

@@ -171,14 +171,14 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
register,
};
fail::fail_point!("control-plane-client-re-attach");
let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
tracing::info!(
"Received re-attach response with {} tenants",
response.tenants.len()
);
failpoint_support::sleep_millis_async!("control-plane-client-re-attach");
Ok(response
.tenants
.into_iter()

View File

@@ -22,7 +22,8 @@ struct HeartbeaterTask {
state: HashMap<NodeId, PageserverState>,
max_unavailable_interval: Duration,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
jwt_token: Option<String>,
}
@@ -31,7 +32,9 @@ pub(crate) enum PageserverState {
Available {
last_seen_at: Instant,
utilization: PageserverUtilization,
new: bool,
},
WarmingUp {
started_at: Instant,
},
Offline,
}
@@ -57,12 +60,18 @@ pub(crate) struct Heartbeater {
impl Heartbeater {
pub(crate) fn new(
jwt_token: Option<String>,
max_unavailable_interval: Duration,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
) -> Self {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
let mut heartbeater =
HeartbeaterTask::new(receiver, jwt_token, max_unavailable_interval, cancel);
let mut heartbeater = HeartbeaterTask::new(
receiver,
jwt_token,
max_offline_interval,
max_warming_up_interval,
cancel,
);
tokio::task::spawn(async move { heartbeater.run().await });
Self { sender }
@@ -88,14 +97,16 @@ impl HeartbeaterTask {
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
jwt_token: Option<String>,
max_unavailable_interval: Duration,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
) -> Self {
Self {
receiver,
cancel,
state: HashMap::new(),
max_unavailable_interval,
max_offline_interval,
max_warming_up_interval,
jwt_token,
}
}
@@ -128,16 +139,15 @@ impl HeartbeaterTask {
heartbeat_futs.push({
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
let new_node = !self.state.contains_key(node_id);
// Clone the node and mark it as available such that the request
// goes through to the pageserver even when the node is marked offline.
// This doesn't impact the availability observed by [`crate::service::Service`].
let mut node = node.clone();
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
let mut node_clone = node.clone();
node_clone.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
async move {
let response = node
let response = node_clone
.with_client_retries(
|client| async move { client.get_utilization().await },
&jwt_token,
@@ -161,7 +171,12 @@ impl HeartbeaterTask {
PageserverState::Available {
last_seen_at: Instant::now(),
utilization,
new: new_node,
}
} else if let NodeAvailability::WarmingUp(last_seen_at) =
node.get_availability()
{
PageserverState::WarmingUp {
started_at: last_seen_at,
}
} else {
PageserverState::Offline
@@ -187,53 +202,67 @@ impl HeartbeaterTask {
}
}
}
let mut warming_up = 0;
let mut offline = 0;
for state in new_state.values() {
match state {
PageserverState::WarmingUp { .. } => {
warming_up += 1;
}
PageserverState::Offline { .. } => offline += 1,
PageserverState::Available { .. } => {}
}
}
tracing::info!(
"Heartbeat round complete for {} nodes, {} offline",
"Heartbeat round complete for {} nodes, {} warming-up, {} offline",
new_state.len(),
new_state
.values()
.filter(|s| match s {
PageserverState::Available { .. } => {
false
}
PageserverState::Offline => true,
})
.count()
warming_up,
offline
);
let mut deltas = Vec::new();
let now = Instant::now();
for (node_id, ps_state) in new_state {
for (node_id, ps_state) in new_state.iter_mut() {
use std::collections::hash_map::Entry::*;
let entry = self.state.entry(node_id);
let entry = self.state.entry(*node_id);
let mut needs_update = false;
match entry {
Occupied(ref occ) => match (occ.get(), &ps_state) {
(PageserverState::Offline, PageserverState::Offline) => {}
(PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
if now - *last_seen_at >= self.max_unavailable_interval {
deltas.push((node_id, ps_state.clone()));
if now - *last_seen_at >= self.max_offline_interval {
deltas.push((*node_id, ps_state.clone()));
needs_update = true;
}
}
(_, PageserverState::WarmingUp { started_at }) => {
if now - *started_at >= self.max_warming_up_interval {
*ps_state = PageserverState::Offline;
}
deltas.push((*node_id, ps_state.clone()));
needs_update = true;
}
_ => {
deltas.push((node_id, ps_state.clone()));
deltas.push((*node_id, ps_state.clone()));
needs_update = true;
}
},
Vacant(_) => {
// This is a new node. Don't generate a delta for it.
deltas.push((node_id, ps_state.clone()));
deltas.push((*node_id, ps_state.clone()));
}
}
match entry {
Occupied(mut occ) if needs_update => {
(*occ.get_mut()) = ps_state;
(*occ.get_mut()) = ps_state.clone();
}
Vacant(vac) => {
vac.insert(ps_state);
vac.insert(ps_state.clone());
}
_ => {}
}

View File

@@ -10,7 +10,8 @@ use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::{
Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
Config, Service, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
RECONCILER_CONCURRENCY_DEFAULT,
};
use tokio::signal::unix::SignalKind;
use tokio_util::sync::CancellationToken;
@@ -61,7 +62,12 @@ struct Cli {
/// Grace period before marking unresponsive pageserver offline
#[arg(long)]
max_unavailable_interval: Option<humantime::Duration>,
max_offline_interval: Option<humantime::Duration>,
/// More tolerant grace period before marking an unresponsive pageserver offline, used
/// around pageserver restarts
#[arg(long)]
max_warming_up_interval: Option<humantime::Duration>,
/// Size threshold for automatically splitting shards (disabled by default)
#[arg(long)]
@@ -254,10 +260,14 @@ async fn async_main() -> anyhow::Result<()> {
jwt_token: secrets.jwt_token,
control_plane_jwt_token: secrets.control_plane_jwt_token,
compute_hook_url: args.compute_hook_url,
max_unavailable_interval: args
.max_unavailable_interval
max_offline_interval: args
.max_offline_interval
.map(humantime::Duration::into)
.unwrap_or(MAX_UNAVAILABLE_INTERVAL_DEFAULT),
.unwrap_or(MAX_OFFLINE_INTERVAL_DEFAULT),
max_warming_up_interval: args
.max_warming_up_interval
.map(humantime::Duration::into)
.unwrap_or(MAX_WARMING_UP_INTERVAL_DEFAULT),
reconciler_concurrency: args
.reconciler_concurrency
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),

View File

@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
use pageserver_api::{
controller_api::{
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
TenantLocateResponseShard, UtilizationScore,
TenantLocateResponseShard,
},
shard::TenantShardId,
};
@@ -46,6 +46,8 @@ pub(crate) struct Node {
/// whether/how they changed it.
pub(crate) enum AvailabilityTransition {
ToActive,
ToWarmingUpFromActive,
ToWarmingUpFromOffline,
ToOffline,
Unchanged,
}
@@ -90,22 +92,34 @@ impl Node {
}
}
pub(crate) fn get_availability(&self) -> NodeAvailability {
self.availability
}
pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
use AvailabilityTransition::*;
use NodeAvailability::WarmingUp;
match self.get_availability_transition(availability) {
AvailabilityTransition::ToActive => {
ToActive => {
// Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
// users of previously-cloned copies of the node will still see the old cancellation
// state. For example, Reconcilers in flight will have to complete and be spawned
// again to realize that the node has become available.
self.cancel = CancellationToken::new();
}
AvailabilityTransition::ToOffline => {
ToOffline | ToWarmingUpFromActive => {
// Fire the node's cancellation token to cancel any in-flight API requests to it
self.cancel.cancel();
}
AvailabilityTransition::Unchanged => {}
Unchanged | ToWarmingUpFromOffline => {}
}
if let (WarmingUp(crnt), WarmingUp(proposed)) = (self.availability, availability) {
self.availability = WarmingUp(std::cmp::max(crnt, proposed));
} else {
self.availability = availability;
}
self.availability = availability;
}
/// Without modifying the availability of the node, convert the intended availability
@@ -120,16 +134,10 @@ impl Node {
match (self.availability, availability) {
(Offline, Active(_)) => ToActive,
(Active(_), Offline) => ToOffline,
// Consider the case when the storage controller handles the re-attach of a node
// before the heartbeats detect that the node is back online. We still need
// [`Service::node_configure`] to attempt reconciliations for shards with an
// unknown observed location.
// The unsavoury match arm below handles this situation.
(Active(lhs), Active(rhs))
if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
{
ToActive
}
(Active(_), WarmingUp(_)) => ToWarmingUpFromActive,
(WarmingUp(_), Offline) => ToOffline,
(WarmingUp(_), Active(_)) => ToActive,
(Offline, WarmingUp(_)) => ToWarmingUpFromOffline,
_ => Unchanged,
}
}
@@ -147,7 +155,7 @@ impl Node {
pub(crate) fn may_schedule(&self) -> MaySchedule {
let score = match self.availability {
NodeAvailability::Active(score) => score,
NodeAvailability::Offline => return MaySchedule::No,
NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No,
};
match self.scheduling {

View File

@@ -100,9 +100,13 @@ pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
/// How long a node may be unresponsive to heartbeats before we declare it offline.
/// This must be long enough to cover node restarts as well as normal operations: in future
/// it should be separated into distinct timeouts for startup vs. normal operation
/// (`<https://github.com/neondatabase/neon/issues/7552>`)
pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(300);
pub const MAX_OFFLINE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);
/// How long a node may be unresponsive to heartbeats during start up before we declare it
/// offline. This is much more lenient than [`MAX_OFFLINE_INTERVAL_DEFAULT`] since the pageserver's
/// handling of the re-attach response may take a long time and blocks heartbeats from
/// being handled on the pageserver side.
pub const MAX_WARMING_UP_INTERVAL_DEFAULT: Duration = Duration::from_secs(300);
#[derive(Clone, strum_macros::Display)]
enum TenantOperations {
@@ -236,7 +240,12 @@ pub struct Config {
/// Grace period within which a pageserver does not respond to heartbeats, but is still
/// considered active. Once the grace period elapses, the next heartbeat failure will
mark the pageserver offline.
pub max_unavailable_interval: Duration,
pub max_offline_interval: Duration,
/// Extended grace period within which pageserver may not respond to heartbeats.
/// This extended grace period kicks in after the node has been drained for restart
/// and/or upon handling the re-attach request from a node.
pub max_warming_up_interval: Duration,
/// How many Reconcilers may be spawned concurrently
pub reconciler_concurrency: usize,
@@ -587,6 +596,9 @@ impl Service {
online_nodes.insert(node_id, utilization);
}
PageserverState::Offline => {}
PageserverState::WarmingUp { .. } => {
unreachable!("Nodes are never marked warming-up during startup reconcile")
}
}
}
}
@@ -779,63 +791,54 @@ impl Service {
let res = self.heartbeater.heartbeat(nodes).await;
if let Ok(deltas) = res {
for (node_id, state) in deltas.0 {
let (new_node, new_availability) = match state {
PageserverState::Available {
utilization, new, ..
} => (
new,
NodeAvailability::Active(UtilizationScore(
utilization.utilization_score,
)),
let new_availability = match state {
PageserverState::Available { utilization, .. } => NodeAvailability::Active(
UtilizationScore(utilization.utilization_score),
),
PageserverState::Offline => (false, NodeAvailability::Offline),
PageserverState::WarmingUp { started_at } => {
NodeAvailability::WarmingUp(started_at)
}
PageserverState::Offline => {
// The node might have been placed in the WarmingUp state
// while the heartbeat round was on-going. Hence, filter out
// offline transitions for WarmingUp nodes that are still within
// their grace period.
if let Ok(NodeAvailability::WarmingUp(started_at)) =
self.get_node(node_id).await.map(|n| n.get_availability())
{
let now = Instant::now();
if now - started_at >= self.config.max_warming_up_interval {
NodeAvailability::Offline
} else {
NodeAvailability::WarmingUp(started_at)
}
} else {
NodeAvailability::Offline
}
}
};
if new_node {
// When the heartbeats detect a newly added node, we don't wish
// to attempt to reconcile the shards assigned to it. The node
// is likely handling its re-attach response, so reconciling now
// would be counterproductive.
//
// Instead, update the in-memory state with the details learned about the
// node.
let mut locked = self.inner.write().unwrap();
let (nodes, _tenants, scheduler) = locked.parts_mut();
// This is the code path for genuine availability transitions (i.e. node
// goes unavailable and/or comes back online).
let res = self
.node_configure(node_id, Some(new_availability), None)
.await;
let mut new_nodes = (**nodes).clone();
if let Some(node) = new_nodes.get_mut(&node_id) {
node.set_availability(new_availability);
scheduler.node_upsert(node);
match res {
Ok(()) => {}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!("Node {} was not found after heartbeat round", node_id);
}
locked.nodes = Arc::new(new_nodes);
} else {
// This is the code path for genuine availability transitions (i.e. node
// goes unavailable and/or comes back online).
let res = self
.node_configure(node_id, Some(new_availability), None)
.await;
match res {
Ok(()) => {}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!(
"Node {} was not found after heartbeat round",
node_id
);
}
Err(err) => {
// Transition to active involves reconciling: if a node responds to a heartbeat then
// becomes unavailable again, we may get an error here.
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
node_id,
err
);
}
Err(err) => {
// Transition to active involves reconciling: if a node responds to a heartbeat then
// becomes unavailable again, we may get an error here.
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
node_id,
err
);
}
}
}
@@ -1152,7 +1155,8 @@ impl Service {
let cancel = CancellationToken::new();
let heartbeater = Heartbeater::new(
config.jwt_token.clone(),
config.max_unavailable_interval,
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
);
let this = Arc::new(Self {
@@ -1664,21 +1668,23 @@ impl Service {
| NodeSchedulingPolicy::Filling
);
if !node.is_available() || reset_scheduling {
let mut new_nodes = (**nodes).clone();
if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) {
if !node.is_available() {
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
}
if reset_scheduling {
node.set_scheduling(NodeSchedulingPolicy::Active);
}
scheduler.node_upsert(node);
let new_nodes = Arc::new(new_nodes);
*nodes = new_nodes;
let mut new_nodes = (**nodes).clone();
if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) {
if reset_scheduling {
node.set_scheduling(NodeSchedulingPolicy::Active);
}
tracing::info!("Marking {} warming-up on reattach", reattach_req.node_id);
node.set_availability(NodeAvailability::WarmingUp(std::time::Instant::now()));
scheduler.node_upsert(node);
let new_nodes = Arc::new(new_nodes);
*nodes = new_nodes;
} else {
tracing::error!(
"Reattaching node {} was removed while processing the request",
reattach_req.node_id
);
}
}
@@ -4719,6 +4725,15 @@ impl Service {
// TODO: in the background, we should balance work back onto this pageserver
}
// No action required for the intermediate unavailable state.
// When we transition into active or offline from the unavailable state,
// the correct handling above will kick in.
AvailabilityTransition::ToWarmingUpFromActive => {
tracing::info!("Node {} transition to unavailable from active", node_id);
}
AvailabilityTransition::ToWarmingUpFromOffline => {
tracing::info!("Node {} transition to unavailable from offline", node_id);
}
AvailabilityTransition::Unchanged => {
tracing::debug!("Node {} no availability change during config", node_id);
}

View File

@@ -2148,6 +2148,23 @@ class StorageControllerApiException(Exception):
self.status_code = status_code
# See libs/pageserver_api/src/controller_api.rs
# for the rust definitions of the enums below
# TODO: Replace with `StrEnum` when we upgrade to python 3.11
class PageserverAvailability(str, Enum):
ACTIVE = "Active"
UNAVAILABLE = "Unavailable"
OFFLINE = "Offline"
class PageserverSchedulingPolicy(str, Enum):
ACTIVE = "Active"
DRAINING = "Draining"
FILLING = "Filling"
PAUSE = "Pause"
PAUSE_FOR_RESTART = "PauseForRestart"
class NeonStorageController(MetricsGetter, LogUtils):
def __init__(self, env: NeonEnv, auth_enabled: bool):
self.env = env
@@ -2531,26 +2548,54 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
log.info("storage controller passed consistency check")
def node_registered(self, node_id: int) -> bool:
"""
Returns true if the storage controller can confirm
it knows of pageserver with 'node_id'
"""
try:
self.node_status(node_id)
except StorageControllerApiException as e:
if e.status_code == 404:
return False
else:
raise e
return True
def poll_node_status(
self, node_id: int, desired_scheduling_policy: str, max_attempts: int, backoff: int
self,
node_id: int,
desired_availability: Optional[PageserverAvailability],
desired_scheduling_policy: Optional[PageserverSchedulingPolicy],
max_attempts: int,
backoff: int,
):
"""
Poll the node status until it reaches 'desired_scheduling_policy' or 'max_attempts' have been exhausted
Poll the node status until it reaches 'desired_scheduling_policy' and 'desired_availability'
or 'max_attempts' have been exhausted
"""
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
log.info(
f"Polling {node_id} for {desired_scheduling_policy} scheduling policy and {desired_availability} availability"
)
while max_attempts > 0:
try:
status = self.node_status(node_id)
policy = status["scheduling"]
if policy == desired_scheduling_policy:
availability = status["availability"]
if (desired_scheduling_policy is None or policy == desired_scheduling_policy) and (
desired_availability is None or availability == desired_availability
):
return
else:
max_attempts -= 1
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
log.info(
f"Status call returned {policy=} {availability=} ({max_attempts} attempts left)"
)
if max_attempts == 0:
raise AssertionError(
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
f"Status for {node_id=} did not reach {desired_scheduling_policy=} {desired_availability=}"
)
time.sleep(backoff)
@@ -2694,6 +2739,14 @@ class NeonPageserver(PgProtocol, LogUtils):
self.id, extra_env_vars=extra_env_vars, timeout_in_seconds=timeout_in_seconds
)
self.running = True
if self.env.storage_controller.running and self.env.storage_controller.node_registered(
self.id
):
self.env.storage_controller.poll_node_status(
self.id, PageserverAvailability.ACTIVE, None, max_attempts=20, backoff=1
)
return self
def stop(self, immediate: bool = False) -> "NeonPageserver":

View File

@@ -8,7 +8,12 @@ import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverAvailability,
PageserverSchedulingPolicy,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion
@@ -106,7 +111,8 @@ def test_storage_controller_many_tenants(
# Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
# TODO: tune this down as restarts get faster (https://github.com/neondatabase/neon/pull/7553), to
# guard against regressions in restart time.
"max_unavailable": "300s"
"max_offline": "30s",
"max_warming_up": "300s",
}
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
@@ -274,7 +280,11 @@ def test_storage_controller_many_tenants(
)
env.storage_controller.poll_node_status(
ps.id, "PauseForRestart", max_attempts=24, backoff=5
ps.id,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.PAUSE_FOR_RESTART,
max_attempts=24,
backoff=5,
)
shard_counts = get_consistent_node_shard_counts(env, total_shards)
@@ -285,12 +295,24 @@ def test_storage_controller_many_tenants(
assert sum(shard_counts.values()) == total_shards
ps.restart()
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=1)
env.storage_controller.poll_node_status(
ps.id,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.ACTIVE,
max_attempts=24,
backoff=1,
)
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=5)
env.storage_controller.poll_node_status(
ps.id,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.ACTIVE,
max_attempts=24,
backoff=5,
)
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")

View File

@@ -596,19 +596,26 @@ def test_multi_attach(
for ps in pageservers:
ps.stop()
# Returning to a normal healthy state: all pageservers will start, but only the one most
# recently attached via the control plane will re-attach on startup
# Returning to a normal healthy state: all pageservers will start
for ps in pageservers:
ps.start()
with pytest.raises(PageserverApiException):
_detail = http_clients[0].timeline_detail(tenant_id, timeline_id)
with pytest.raises(PageserverApiException):
_detail = http_clients[1].timeline_detail(tenant_id, timeline_id)
_detail = http_clients[2].timeline_detail(tenant_id, timeline_id)
# Pageservers are marked offline by the storage controller during the rolling restart
# above. This may trigger a rescheduling, so there's no guarantee that the tenant
# shard ends up attached to the most recent ps.
raised = 0
serving_ps_idx = None
for idx, http_client in enumerate(http_clients):
try:
_detail = http_client.timeline_detail(tenant_id, timeline_id)
serving_ps_idx = idx
except PageserverApiException:
raised += 1
assert raised == 2 and serving_ps_idx is not None
# All data we wrote while multi-attached remains readable
workload.validate(pageservers[2].id)
workload.validate(pageservers[serving_ps_idx].id)
def test_upgrade_generationless_local_file_paths(

View File

@@ -15,6 +15,10 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
# We inject a delay of 15 seconds for tenant activation below.
# Hence, bump the max delay here to not skip over the activation.
neon_env_builder.pageserver_config_override = 'background_task_maximum_delay="20s"'
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
@@ -70,7 +74,7 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
# pageserver does if a compute node connects and sends a request for the tenant
# while it's still in Loading state. (It waits for the loading to finish, and then
# processes the request.)
tenant_load_delay_ms = 5000
tenant_load_delay_ms = 15000
env.pageserver.stop()
env.pageserver.start(
extra_env_vars={"FAILPOINTS": f"before-attaching-tenant=return({tenant_load_delay_ms})"}

View File

@@ -122,7 +122,12 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
"scheduling": "Stop",
},
)
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy Stop.*")
env.storage_controller.allowed_errors.extend(
[
".*Scheduling is disabled by policy Stop.*",
".*Skipping reconcile for policy Stop.*",
]
)
# We use a fixed seed to make the test reproducible: we want a randomly
# chosen order, but not to change the order every time we run the test.

View File

@@ -207,7 +207,8 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
neon_env_builder.storage_controller_config = {
# Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
"max_unavailable": "300s"
"max_offline": "30s",
"max_warming_up": "300s",
}
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)

View File

@@ -12,6 +12,8 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverAvailability,
PageserverSchedulingPolicy,
PgBin,
StorageControllerApiException,
TokenScope,
@@ -918,6 +920,8 @@ def test_storage_controller_tenant_deletion(
class Failure:
pageserver_id: int
offline_timeout: int
must_detect_after: int
def apply(self, env: NeonEnv):
raise NotImplementedError()
@@ -930,9 +934,11 @@ class Failure:
class NodeStop(Failure):
def __init__(self, pageserver_ids, immediate):
def __init__(self, pageserver_ids, immediate, offline_timeout, must_detect_after):
self.pageserver_ids = pageserver_ids
self.immediate = immediate
self.offline_timeout = offline_timeout
self.must_detect_after = must_detect_after
def apply(self, env: NeonEnv):
for ps_id in self.pageserver_ids:
@@ -948,10 +954,42 @@ class NodeStop(Failure):
return self.pageserver_ids
class NodeRestartWithSlowReattach(Failure):
def __init__(self, pageserver_id, offline_timeout, must_detect_after):
self.pageserver_id = pageserver_id
self.offline_timeout = offline_timeout
self.must_detect_after = must_detect_after
self.thread = None
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=False)
def start_ps():
pageserver.start(
extra_env_vars={"FAILPOINTS": "control-plane-client-re-attach=return(30000)"}
)
self.thread = threading.Thread(target=start_ps)
self.thread.start()
def clear(self, env: NeonEnv):
if self.thread is not None:
self.thread.join()
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints(("control-plane-client-re-attach", "off"))
def nodes(self):
return [self.pageserver_id]
class PageserverFailpoint(Failure):
def __init__(self, failpoint, pageserver_id):
def __init__(self, failpoint, pageserver_id, offline_timeout, must_detect_after):
self.failpoint = failpoint
self.pageserver_id = pageserver_id
self.offline_timeout = offline_timeout
self.must_detect_after = must_detect_after
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
@@ -987,15 +1025,28 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
@pytest.mark.parametrize(
"failure",
[
NodeStop(pageserver_ids=[1], immediate=False),
NodeStop(pageserver_ids=[1], immediate=True),
NodeStop(pageserver_ids=[1, 2], immediate=True),
PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"),
NodeStop(pageserver_ids=[1], immediate=False, offline_timeout=20, must_detect_after=5),
NodeStop(pageserver_ids=[1], immediate=True, offline_timeout=20, must_detect_after=5),
NodeStop(pageserver_ids=[1, 2], immediate=True, offline_timeout=20, must_detect_after=5),
PageserverFailpoint(
pageserver_id=1,
failpoint="get-utilization-http-handler",
offline_timeout=20,
must_detect_after=5,
),
# Instrument a scenario where the node is slow to re-attach. The re-attach request itself
# should serve as a signal to the storage controller to use a more lenient heartbeat timeout.
NodeRestartWithSlowReattach(pageserver_id=1, offline_timeout=60, must_detect_after=15),
],
)
def test_storage_controller_heartbeats(
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, failure: Failure
):
neon_env_builder.storage_controller_config = {
"max_offline": "10s",
"max_warming_up": "20s",
}
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
@@ -1061,9 +1112,12 @@ def test_storage_controller_heartbeats(
if node["id"] in offline_node_ids:
assert node["availability"] == "Offline"
# A node is considered offline if the last successful heartbeat
# was more than 10 seconds ago (hardcoded in the storage controller).
wait_until(20, 1, nodes_offline)
start = time.time()
wait_until(failure.offline_timeout, 1, nodes_offline)
detected_after = time.time() - start
log.info(f"Detected node failures after {detected_after}s")
assert detected_after >= failure.must_detect_after
# .. expecting the tenant on the offline node to be migrated
def tenant_migrated():
@@ -1546,7 +1600,13 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
)
env.storage_controller.poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)
env.storage_controller.poll_node_status(
ps.id,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.PAUSE_FOR_RESTART,
max_attempts=6,
backoff=5,
)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
@@ -1556,12 +1616,24 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
assert sum(shard_counts.values()) == total_shards
ps.restart()
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)
env.storage_controller.poll_node_status(
ps.id,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.ACTIVE,
max_attempts=10,
backoff=1,
)
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)
env.storage_controller.poll_node_status(
ps.id,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.ACTIVE,
max_attempts=6,
backoff=5,
)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
@@ -1606,11 +1678,23 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
backoff=2,
)
env.storage_controller.poll_node_status(ps_id_to_drain, "Draining", max_attempts=6, backoff=2)
env.storage_controller.poll_node_status(
ps_id_to_drain,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.DRAINING,
max_attempts=6,
backoff=2,
)
env.storage_controller.cancel_node_drain(ps_id_to_drain)
env.storage_controller.poll_node_status(ps_id_to_drain, "Active", max_attempts=6, backoff=2)
env.storage_controller.poll_node_status(
ps_id_to_drain,
PageserverAvailability.ACTIVE,
PageserverSchedulingPolicy.ACTIVE,
max_attempts=6,
backoff=2,
)
@pytest.mark.parametrize("while_offline", [True, False])

View File

@@ -48,13 +48,12 @@ def test_threshold_based_eviction(
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
ps_http = env.pageserver.http_client()
assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
"kind": "NoEviction"
}
vps_http = env.storage_controller.pageserver_api()
assert vps_http.tenant_config(tenant_id).effective_config["eviction_policy"] is None
eviction_threshold = 10
eviction_period = 2
ps_http.set_tenant_config(
vps_http.set_tenant_config(
tenant_id,
{
"eviction_policy": {
@@ -64,7 +63,7 @@ def test_threshold_based_eviction(
},
},
)
assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
assert vps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"threshold": f"{eviction_threshold}s",
"period": f"{eviction_period}s",
@@ -73,7 +72,7 @@ def test_threshold_based_eviction(
# restart because changing tenant config is not instant
env.pageserver.restart()
assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
assert vps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"threshold": f"{eviction_threshold}s",
"period": f"{eviction_period}s",
@@ -81,7 +80,7 @@ def test_threshold_based_eviction(
# create a bunch of L1s, only the least of which will need to be resident
compaction_threshold = 3 # create L1 layers quickly
ps_http.patch_tenant_config_client_side(
vps_http.patch_tenant_config_client_side(
tenant_id,
inserts={
# Disable gc and compaction to avoid on-demand downloads from their side.
@@ -154,7 +153,7 @@ def test_threshold_based_eviction(
while time.time() - started_waiting_at < observation_window:
current = (
time.time(),
MapInfoProjection(ps_http.layer_map_info(tenant_id, timeline_id)),
MapInfoProjection(vps_http.layer_map_info(tenant_id, timeline_id)),
)
last = map_info_changes[-1] if map_info_changes else (0, None)
if last[1] is None or current[1] != last[1]: