diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index d85753bedc..15ae2a26b4 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -66,14 +66,7 @@ fn get_state(request: &Request
) -> &HttpState {
async fn handle_re_attach(mut req: Request) -> Result, ApiError> {
let reattach_req = json_request::(&mut req).await?;
let state = get_state(&req);
- json_response(
- StatusCode::OK,
- state
- .service
- .re_attach(reattach_req)
- .await
- .map_err(ApiError::InternalServerError)?,
- )
+ json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
}
/// Pageserver calls into this before doing deletions, to confirm that it still
diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs
index fb3c7f634c..7059071bee 100644
--- a/control_plane/attachment_service/src/scheduler.rs
+++ b/control_plane/attachment_service/src/scheduler.rs
@@ -175,6 +175,33 @@ impl Scheduler {
}
}
+ /// Where we have several nodes to choose from, for example when picking a secondary location
+ /// to promote to an attached location, this method may be used to pick the best choice based
+ /// on the scheduler's knowledge of utilization and availability.
+ ///
+ /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
+ /// caller can pick a node some other way.
+ pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option {
+ if nodes.is_empty() {
+ return None;
+ }
+
+ let node = nodes
+ .iter()
+ .map(|node_id| {
+ let may_schedule = self
+ .nodes
+ .get(node_id)
+ .map(|n| n.may_schedule)
+ .unwrap_or(false);
+ (*node_id, may_schedule)
+ })
+ .max_by_key(|(_n, may_schedule)| *may_schedule);
+
+ // If even the preferred node has may_schedule==false, return None
+ node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
+ }
+
pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
@@ -224,44 +251,45 @@ impl Scheduler {
}
}
+#[cfg(test)]
+pub(crate) mod test_utils {
+
+ use crate::node::Node;
+ use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
+ use std::collections::HashMap;
+ use utils::id::NodeId;
+ /// Test helper: synthesize the requested number of nodes, all in active state.
+ ///
+ /// Node IDs start at one.
+ pub(crate) fn make_test_nodes(n: u64) -> HashMap {
+ (1..n + 1)
+ .map(|i| {
+ (
+ NodeId(i),
+ Node {
+ id: NodeId(i),
+ availability: NodeAvailability::Active,
+ scheduling: NodeSchedulingPolicy::Active,
+ listen_http_addr: format!("httphost-{i}"),
+ listen_http_port: 80 + i as u16,
+ listen_pg_addr: format!("pghost-{i}"),
+ listen_pg_port: 5432 + i as u16,
+ },
+ )
+ })
+ .collect()
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
- use std::collections::HashMap;
-
- use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use utils::id::NodeId;
- use crate::{node::Node, tenant_state::IntentState};
-
+ use crate::tenant_state::IntentState;
#[test]
fn scheduler_basic() -> anyhow::Result<()> {
- let mut nodes = HashMap::new();
- nodes.insert(
- NodeId(1),
- Node {
- id: NodeId(1),
- availability: NodeAvailability::Active,
- scheduling: NodeSchedulingPolicy::Active,
- listen_http_addr: String::new(),
- listen_http_port: 0,
- listen_pg_addr: String::new(),
- listen_pg_port: 0,
- },
- );
-
- nodes.insert(
- NodeId(2),
- Node {
- id: NodeId(2),
- availability: NodeAvailability::Active,
- scheduling: NodeSchedulingPolicy::Active,
- listen_http_addr: String::new(),
- listen_http_port: 0,
- listen_pg_addr: String::new(),
- listen_pg_port: 0,
- },
- );
+ let nodes = test_utils::make_test_nodes(2);
let mut scheduler = Scheduler::new(nodes.values());
let mut t1_intent = IntentState::new();
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 6366348017..0b9a7d8a69 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -914,7 +914,15 @@ impl Service {
pub(crate) async fn re_attach(
&self,
reattach_req: ReAttachRequest,
- ) -> anyhow::Result {
+ ) -> Result {
+ // Take a re-attach as indication that the node is available: this is a precursor to proper
+ // heartbeating in https://github.com/neondatabase/neon/issues/6844
+ self.node_configure(NodeConfigureRequest {
+ node_id: reattach_req.node_id,
+ availability: Some(NodeAvailability::Active),
+ scheduling: None,
+ })?;
+
// Ordering: we must persist generation number updates before making them visible in the in-memory state
let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index 7970207e27..3cfffc6c45 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -143,6 +143,23 @@ impl IntentState {
}
}
+ /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
+ /// secondary to attached while maintaining the scheduler's reference counts.
+ pub(crate) fn promote_attached(
+ &mut self,
+ _scheduler: &mut Scheduler,
+ promote_secondary: NodeId,
+ ) {
+ // If we call this with a node that isn't in secondary, it would cause incorrect
+ // scheduler reference counting, since we assume the node is already referenced as a secondary.
+ debug_assert!(self.secondary.contains(&promote_secondary));
+
+ // TODO: when scheduler starts tracking attached + secondary counts separately, we will
+ // need to call into it here.
+ self.secondary.retain(|n| n != &promote_secondary);
+ self.attached = Some(promote_secondary);
+ }
+
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
debug_assert!(!self.secondary.contains(&new_secondary));
scheduler.node_inc_ref(new_secondary);
@@ -197,6 +214,8 @@ impl IntentState {
/// Returns true if a change was made
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
+ // TODO: when scheduler starts tracking attached + secondary counts separately, we will
+ // need to call into it here.
self.attached = None;
self.secondary.push(node_id);
true
@@ -370,6 +389,9 @@ impl TenantState {
// All remaining observed locations generate secondary intents. This includes None
// observations, as these may well have some local content on disk that is usable (this
// is an edge case that might occur if we restarted during a migration or other change)
+ //
+ // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
+ // will take care of promoting one of these secondaries to be attached.
self.observed.locations.keys().for_each(|node_id| {
if Some(*node_id) != self.intent.attached {
self.intent.secondary.push(*node_id);
@@ -377,6 +399,33 @@ impl TenantState {
});
}
+ /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
+ /// attached pageserver for a shard.
+ ///
+ /// Returns whether we modified it, and the NodeId selected.
+ fn schedule_attached(
+ &mut self,
+ scheduler: &mut Scheduler,
+ ) -> Result<(bool, NodeId), ScheduleError> {
+ // No work to do if we already have an attached tenant
+ if let Some(node_id) = self.intent.attached {
+ return Ok((false, node_id));
+ }
+
+ if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
+ // Promote a secondary
+ tracing::debug!("Promoted secondary {} to attached", promote_secondary);
+ self.intent.promote_attached(scheduler, promote_secondary);
+ Ok((true, promote_secondary))
+ } else {
+ // Pick a fresh node: either we had no secondaries or none were schedulable
+ let node_id = scheduler.schedule_shard(&self.intent.secondary)?;
+ tracing::debug!("Selected {} as attached", node_id);
+ self.intent.set_attached(scheduler, Some(node_id));
+ Ok((true, node_id))
+ }
+ }
+
pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
// TODO: before scheduling new nodes, check if any existing content in
// self.intent refers to pageservers that are offline, and pick other
@@ -387,19 +436,15 @@ impl TenantState {
// Build the set of pageservers already in use by this tenant, to avoid scheduling
// more work on the same pageservers we're already using.
- let mut used_pageservers = self.intent.all_pageservers();
let mut modified = false;
use PlacementPolicy::*;
match self.policy {
Single => {
// Should have exactly one attached, and zero secondaries
- if self.intent.attached.is_none() {
- let node_id = scheduler.schedule_shard(&used_pageservers)?;
- self.intent.set_attached(scheduler, Some(node_id));
- used_pageservers.push(node_id);
- modified = true;
- }
+ let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?;
+ modified |= modified_attached;
+
if !self.intent.secondary.is_empty() {
self.intent.clear_secondary(scheduler);
modified = true;
@@ -407,13 +452,10 @@ impl TenantState {
}
Double(secondary_count) => {
// Should have exactly one attached, and N secondaries
- if self.intent.attached.is_none() {
- let node_id = scheduler.schedule_shard(&used_pageservers)?;
- self.intent.set_attached(scheduler, Some(node_id));
- used_pageservers.push(node_id);
- modified = true;
- }
+ let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
+ modified |= modified_attached;
+ let mut used_pageservers = vec![attached_node_id];
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.push_secondary(scheduler, node_id);
@@ -702,3 +744,83 @@ impl TenantState {
}
}
}
+
+#[cfg(test)]
+pub(crate) mod tests {
+ use pageserver_api::shard::{ShardCount, ShardNumber};
+ use utils::id::TenantId;
+
+ use crate::scheduler::test_utils::make_test_nodes;
+
+ use super::*;
+
+ fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantState {
+ let tenant_id = TenantId::generate();
+ let shard_number = ShardNumber(0);
+ let shard_count = ShardCount::new(1);
+
+ let tenant_shard_id = TenantShardId {
+ tenant_id,
+ shard_number,
+ shard_count,
+ };
+ TenantState::new(
+ tenant_shard_id,
+ ShardIdentity::new(
+ shard_number,
+ shard_count,
+ pageserver_api::shard::ShardStripeSize(32768),
+ )
+ .unwrap(),
+ policy,
+ )
+ }
+
+ /// Test the scheduling behaviors used when a tenant configured for HA is subject
+ /// to nodes being marked offline.
+ #[test]
+ fn tenant_ha_scheduling() -> anyhow::Result<()> {
+ // Start with three nodes. Our tenant will only use two. The third one is
+ // expected to remain unused.
+ let mut nodes = make_test_nodes(3);
+
+ let mut scheduler = Scheduler::new(nodes.values());
+
+ let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
+ tenant_state
+ .schedule(&mut scheduler)
+ .expect("we have enough nodes, scheduling should work");
+
+ // Expect to initially be schedule on to different nodes
+ assert_eq!(tenant_state.intent.secondary.len(), 1);
+ assert!(tenant_state.intent.attached.is_some());
+
+ let attached_node_id = tenant_state.intent.attached.unwrap();
+ let secondary_node_id = *tenant_state.intent.secondary.iter().last().unwrap();
+ assert_ne!(attached_node_id, secondary_node_id);
+
+ // Notifying the attached node is offline should demote it to a secondary
+ let changed = tenant_state.intent.notify_offline(attached_node_id);
+ assert!(changed);
+
+ // Update the scheduler state to indicate the node is offline
+ nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline;
+ scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
+
+ // Scheduling the node should promote the still-available secondary node to attached
+ tenant_state
+ .schedule(&mut scheduler)
+ .expect("active nodes are available");
+ assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
+
+ // The original attached node should have been retained as a secondary
+ assert_eq!(
+ *tenant_state.intent.secondary.iter().last().unwrap(),
+ attached_node_id
+ );
+
+ tenant_state.intent.clear(&mut scheduler);
+
+ Ok(())
+ }
+}
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 8dd86bad96..5909477586 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -210,6 +210,25 @@ impl PageServerNode {
update_config: bool,
register: bool,
) -> anyhow::Result<()> {
+ // Register the node with the storage controller before starting pageserver: pageserver must be registered to
+ // successfully call /re-attach and finish starting up.
+ if register {
+ let attachment_service = AttachmentService::from_env(&self.env);
+ let (pg_host, pg_port) =
+ parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
+ let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
+ .expect("Unable to parse listen_http_addr");
+ attachment_service
+ .node_register(NodeRegisterRequest {
+ node_id: self.conf.id,
+ listen_pg_addr: pg_host.to_string(),
+ listen_pg_port: pg_port.unwrap_or(5432),
+ listen_http_addr: http_host.to_string(),
+ listen_http_port: http_port.unwrap_or(80),
+ })
+ .await?;
+ }
+
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(
@@ -248,23 +267,6 @@ impl PageServerNode {
)
.await?;
- if register {
- let attachment_service = AttachmentService::from_env(&self.env);
- let (pg_host, pg_port) =
- parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
- let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
- .expect("Unable to parse listen_http_addr");
- attachment_service
- .node_register(NodeRegisterRequest {
- node_id: self.conf.id,
- listen_pg_addr: pg_host.to_string(),
- listen_pg_port: pg_port.unwrap_or(5432),
- listen_http_addr: http_host.to_string(),
- listen_http_port: http_port.unwrap_or(80),
- })
- .await?;
- }
-
Ok(())
}
diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py
index e62d239d77..00c3a1628e 100644
--- a/test_runner/regress/test_sharding_service.py
+++ b/test_runner/regress/test_sharding_service.py
@@ -272,8 +272,13 @@ def test_sharding_service_onboarding(
env.broker.try_start()
env.attachment_service.start()
- # This is the pageserver where we'll initially create the tenant
- env.pageservers[0].start(register=False)
+ # This is the pageserver where we'll initially create the tenant. Run it in emergency
+ # mode so that it doesn't talk to storage controller, and do not register it.
+ env.pageservers[0].allowed_errors.append(".*Emergency mode!.*")
+ env.pageservers[0].start(
+ overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
+ register=False,
+ )
origin_ps = env.pageservers[0]
# This is the pageserver managed by the sharding service, where the tenant