controller: improved handling of offline nodes (#6846)

Stacks on https://github.com/neondatabase/neon/pull/6823

- Pending a heartbeating mechanism (#6844 ), use /re-attach calls as a
cue to mark an offline node as active, so that a node which is
unavailable during controller startup doesn't require manual
intervention if it later starts/restarts.
- Tweak scheduling logic so that when we schedule the attached location
for a tenant, we prefer to select from secondary locations rather than
picking a fresh one.

This is an interim state until we implement #6844 and full chaos testing
for handling failures.
This commit is contained in:
John Spray
2024-02-22 14:01:06 +00:00
committed by GitHub
parent c671aeacd4
commit 9c48b5c4ab
6 changed files with 230 additions and 72 deletions

View File

@@ -66,14 +66,7 @@ fn get_state(request: &Request<Body>) -> &HttpState {
async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state
.service
.re_attach(reattach_req)
.await
.map_err(ApiError::InternalServerError)?,
)
json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
}
/// Pageserver calls into this before doing deletions, to confirm that it still

View File

@@ -175,6 +175,33 @@ impl Scheduler {
}
}
/// Where we have several nodes to choose from, for example when picking a secondary location
/// to promote to an attached location, this method may be used to pick the best choice based
/// on the scheduler's knowledge of utilization and availability.
///
/// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
/// caller can pick a node some other way.
pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
if nodes.is_empty() {
return None;
}
let node = nodes
.iter()
.map(|node_id| {
let may_schedule = self
.nodes
.get(node_id)
.map(|n| n.may_schedule)
.unwrap_or(false);
(*node_id, may_schedule)
})
.max_by_key(|(_n, may_schedule)| *may_schedule);
// If even the preferred node has may_schedule==false, return None
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
}
pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
@@ -224,44 +251,45 @@ impl Scheduler {
}
}
#[cfg(test)]
pub(crate) mod test_utils {
use crate::node::Node;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use std::collections::HashMap;
use utils::id::NodeId;
/// Test helper: synthesize the requested number of nodes, all in active state.
///
/// Node IDs start at one.
pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
(1..n + 1)
.map(|i| {
(
NodeId(i),
Node {
id: NodeId(i),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: format!("httphost-{i}"),
listen_http_port: 80 + i as u16,
listen_pg_addr: format!("pghost-{i}"),
listen_pg_port: 5432 + i as u16,
},
)
})
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use utils::id::NodeId;
use crate::{node::Node, tenant_state::IntentState};
use crate::tenant_state::IntentState;
#[test]
fn scheduler_basic() -> anyhow::Result<()> {
let mut nodes = HashMap::new();
nodes.insert(
NodeId(1),
Node {
id: NodeId(1),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: String::new(),
listen_http_port: 0,
listen_pg_addr: String::new(),
listen_pg_port: 0,
},
);
nodes.insert(
NodeId(2),
Node {
id: NodeId(2),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: String::new(),
listen_http_port: 0,
listen_pg_addr: String::new(),
listen_pg_port: 0,
},
);
let nodes = test_utils::make_test_nodes(2);
let mut scheduler = Scheduler::new(nodes.values());
let mut t1_intent = IntentState::new();

View File

@@ -914,7 +914,15 @@ impl Service {
pub(crate) async fn re_attach(
&self,
reattach_req: ReAttachRequest,
) -> anyhow::Result<ReAttachResponse> {
) -> Result<ReAttachResponse, ApiError> {
// Take a re-attach as indication that the node is available: this is a precursor to proper
// heartbeating in https://github.com/neondatabase/neon/issues/6844
self.node_configure(NodeConfigureRequest {
node_id: reattach_req.node_id,
availability: Some(NodeAvailability::Active),
scheduling: None,
})?;
// Ordering: we must persist generation number updates before making them visible in the in-memory state
let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;

View File

@@ -143,6 +143,23 @@ impl IntentState {
}
}
/// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
/// secondary to attached while maintaining the scheduler's reference counts.
pub(crate) fn promote_attached(
&mut self,
_scheduler: &mut Scheduler,
promote_secondary: NodeId,
) {
// If we call this with a node that isn't in secondary, it would cause incorrect
// scheduler reference counting, since we assume the node is already referenced as a secondary.
debug_assert!(self.secondary.contains(&promote_secondary));
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.secondary.retain(|n| n != &promote_secondary);
self.attached = Some(promote_secondary);
}
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
debug_assert!(!self.secondary.contains(&new_secondary));
scheduler.node_inc_ref(new_secondary);
@@ -197,6 +214,8 @@ impl IntentState {
/// Returns true if a change was made
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.attached = None;
self.secondary.push(node_id);
true
@@ -370,6 +389,9 @@ impl TenantState {
// All remaining observed locations generate secondary intents. This includes None
// observations, as these may well have some local content on disk that is usable (this
// is an edge case that might occur if we restarted during a migration or other change)
//
// We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
// will take care of promoting one of these secondaries to be attached.
self.observed.locations.keys().for_each(|node_id| {
if Some(*node_id) != self.intent.attached {
self.intent.secondary.push(*node_id);
@@ -377,6 +399,33 @@ impl TenantState {
});
}
/// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
/// attached pageserver for a shard.
///
/// Returns whether we modified it, and the NodeId selected.
fn schedule_attached(
&mut self,
scheduler: &mut Scheduler,
) -> Result<(bool, NodeId), ScheduleError> {
// No work to do if we already have an attached tenant
if let Some(node_id) = self.intent.attached {
return Ok((false, node_id));
}
if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
// Promote a secondary
tracing::debug!("Promoted secondary {} to attached", promote_secondary);
self.intent.promote_attached(scheduler, promote_secondary);
Ok((true, promote_secondary))
} else {
// Pick a fresh node: either we had no secondaries or none were schedulable
let node_id = scheduler.schedule_shard(&self.intent.secondary)?;
tracing::debug!("Selected {} as attached", node_id);
self.intent.set_attached(scheduler, Some(node_id));
Ok((true, node_id))
}
}
pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
// TODO: before scheduling new nodes, check if any existing content in
// self.intent refers to pageservers that are offline, and pick other
@@ -387,19 +436,15 @@ impl TenantState {
// Build the set of pageservers already in use by this tenant, to avoid scheduling
// more work on the same pageservers we're already using.
let mut used_pageservers = self.intent.all_pageservers();
let mut modified = false;
use PlacementPolicy::*;
match self.policy {
Single => {
// Should have exactly one attached, and zero secondaries
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.set_attached(scheduler, Some(node_id));
used_pageservers.push(node_id);
modified = true;
}
let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?;
modified |= modified_attached;
if !self.intent.secondary.is_empty() {
self.intent.clear_secondary(scheduler);
modified = true;
@@ -407,13 +452,10 @@ impl TenantState {
}
Double(secondary_count) => {
// Should have exactly one attached, and N secondaries
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.set_attached(scheduler, Some(node_id));
used_pageservers.push(node_id);
modified = true;
}
let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
modified |= modified_attached;
let mut used_pageservers = vec![attached_node_id];
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.push_secondary(scheduler, node_id);
@@ -702,3 +744,83 @@ impl TenantState {
}
}
}
#[cfg(test)]
pub(crate) mod tests {
use pageserver_api::shard::{ShardCount, ShardNumber};
use utils::id::TenantId;
use crate::scheduler::test_utils::make_test_nodes;
use super::*;
fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantState {
let tenant_id = TenantId::generate();
let shard_number = ShardNumber(0);
let shard_count = ShardCount::new(1);
let tenant_shard_id = TenantShardId {
tenant_id,
shard_number,
shard_count,
};
TenantState::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
shard_count,
pageserver_api::shard::ShardStripeSize(32768),
)
.unwrap(),
policy,
)
}
/// Test the scheduling behaviors used when a tenant configured for HA is subject
/// to nodes being marked offline.
#[test]
fn tenant_ha_scheduling() -> anyhow::Result<()> {
// Start with three nodes. Our tenant will only use two. The third one is
// expected to remain unused.
let mut nodes = make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
tenant_state
.schedule(&mut scheduler)
.expect("we have enough nodes, scheduling should work");
// Expect to initially be schedule on to different nodes
assert_eq!(tenant_state.intent.secondary.len(), 1);
assert!(tenant_state.intent.attached.is_some());
let attached_node_id = tenant_state.intent.attached.unwrap();
let secondary_node_id = *tenant_state.intent.secondary.iter().last().unwrap();
assert_ne!(attached_node_id, secondary_node_id);
// Notifying the attached node is offline should demote it to a secondary
let changed = tenant_state.intent.notify_offline(attached_node_id);
assert!(changed);
// Update the scheduler state to indicate the node is offline
nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline;
scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
// Scheduling the node should promote the still-available secondary node to attached
tenant_state
.schedule(&mut scheduler)
.expect("active nodes are available");
assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
// The original attached node should have been retained as a secondary
assert_eq!(
*tenant_state.intent.secondary.iter().last().unwrap(),
attached_node_id
);
tenant_state.intent.clear(&mut scheduler);
Ok(())
}
}

View File

@@ -210,6 +210,25 @@ impl PageServerNode {
update_config: bool,
register: bool,
) -> anyhow::Result<()> {
// Register the node with the storage controller before starting pageserver: pageserver must be registered to
// successfully call /re-attach and finish starting up.
if register {
let attachment_service = AttachmentService::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
attachment_service
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(
@@ -248,23 +267,6 @@ impl PageServerNode {
)
.await?;
if register {
let attachment_service = AttachmentService::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
attachment_service
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}
Ok(())
}

View File

@@ -272,8 +272,13 @@ def test_sharding_service_onboarding(
env.broker.try_start()
env.attachment_service.start()
# This is the pageserver where we'll initially create the tenant
env.pageservers[0].start(register=False)
# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to storage controller, and do not register it.
env.pageservers[0].allowed_errors.append(".*Emergency mode!.*")
env.pageservers[0].start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
register=False,
)
origin_ps = env.pageservers[0]
# This is the pageserver managed by the sharding service, where the tenant