storage_controller: periodic pageserver heartbeats (#7092)

## Problem
If a pageserver was offline when the storage controller started, there
was no mechanism to update the storage controller's state when the
pageserver came back online.

## Summary of changes
* Add a heartbeater module. The heartbeater must be driven by an
external loop (see the usage sketch below).
* Integrate the heartbeater into the service.
* Extend the types used by the service and scheduler to keep track of
each node's utilisation score.
* Add a background loop to drive the heartbeater and update the state
based on the deltas it generates.
  * Do an initial round of heartbeats at start-up.
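
A minimal sketch of the driving pattern, assuming the `Heartbeater`, `Node` and `NodeId` types from this commit are in scope; the function name and five-second interval are illustrative (the real driver is `Service::spawn_heartbeat_driver` further down):

```rust
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;

// Illustrative driver: the heartbeater only answers requests, so some
// external loop must tick it and apply the deltas it returns.
async fn drive_heartbeats(
    heartbeater: &Heartbeater,
    nodes: Arc<HashMap<NodeId, Node>>,
    cancel: CancellationToken,
) {
    let mut ticker = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _ = ticker.tick() => {}
            _ = cancel.cancelled() => return,
        }
        match heartbeater.heartbeat(nodes.clone()).await {
            // Each round yields deltas: only nodes whose availability
            // changed since the previous round are reported.
            Ok(deltas) => {
                for (node_id, state) in deltas.0 {
                    // Apply `state` to the service's view of `node_id` here.
                    let _ = (node_id, state);
                }
            }
            // The only error is cancellation; stop driving.
            Err(_) => return,
        }
    }
}
```

In this commit the service applies each returned delta via `node_configure`, which keeps the scheduler and tenant reconciliation in sync with availability changes.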
Vlad Lazar, 2024-03-14 15:21:36 +00:00 (committed by GitHub)
parent 9fe0193e51, commit 38767ace68
17 changed files with 779 additions and 88 deletions


@@ -12,6 +12,7 @@ clap.workspace = true
comfy-table.workspace = true
futures.workspace = true
git-version.workspace = true
+humantime.workspace = true
nix.workspace = true
once_cell.workspace = true
postgres.workspace = true


@@ -0,0 +1,227 @@
use futures::{stream::FuturesUnordered, StreamExt};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;
use pageserver_api::{
controller_api::{NodeAvailability, UtilizationScore},
models::PageserverUtilization,
};
use thiserror::Error;
use utils::id::NodeId;
use crate::node::Node;
struct HeartbeaterTask {
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
cancel: CancellationToken,
state: HashMap<NodeId, PageserverState>,
max_unavailable_interval: Duration,
jwt_token: Option<String>,
}
#[derive(Debug, Clone)]
pub(crate) enum PageserverState {
Available {
last_seen_at: Instant,
utilization: PageserverUtilization,
},
Offline,
}
#[derive(Debug)]
pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
#[derive(Debug, Error)]
pub(crate) enum HeartbeaterError {
#[error("Cancelled")]
Cancel,
}
struct HeartbeatRequest {
pageservers: Arc<HashMap<NodeId, Node>>,
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
}
pub(crate) struct Heartbeater {
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
}
impl Heartbeater {
pub(crate) fn new(
jwt_token: Option<String>,
max_unavailable_interval: Duration,
cancel: CancellationToken,
) -> Self {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
let mut heartbeater =
HeartbeaterTask::new(receiver, jwt_token, max_unavailable_interval, cancel);
tokio::task::spawn(async move { heartbeater.run().await });
Self { sender }
}
pub(crate) async fn heartbeat(
&self,
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas, HeartbeaterError> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
.send(HeartbeatRequest {
pageservers,
reply: sender,
})
.unwrap();
receiver.await.unwrap()
}
}
impl HeartbeaterTask {
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
jwt_token: Option<String>,
max_unavailable_interval: Duration,
cancel: CancellationToken,
) -> Self {
Self {
receiver,
cancel,
state: HashMap::new(),
max_unavailable_interval,
jwt_token,
}
}
async fn run(&mut self) {
loop {
tokio::select! {
request = self.receiver.recv() => {
match request {
Some(req) => {
let res = self.heartbeat(req.pageservers).await;
req.reply.send(res).unwrap();
},
None => { return; }
}
},
_ = self.cancel.cancelled() => return
}
}
}
async fn heartbeat(
&mut self,
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas, HeartbeaterError> {
let mut new_state = HashMap::new();
let mut heartbeat_futs = FuturesUnordered::new();
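// Issue one utilization request per pageserver; FuturesUnordered
// yields the responses as they complete.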
for (node_id, node) in &*pageservers {
heartbeat_futs.push({
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
// Clone the node and mark it as available such that the request
// goes through to the pageserver even when the node is marked offline.
// This doesn't impact the availability observed by [`crate::service::Service`].
let mut node = node.clone();
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
async move {
let response = node
.with_client_retries(
|client| async move { client.get_utilization().await },
&jwt_token,
2,
3,
Duration::from_secs(1),
&cancel,
)
.await;
let response = match response {
Some(r) => r,
None => {
// This indicates cancellation of the request.
// We ignore the node in this case.
return None;
}
};
let status = if let Ok(utilization) = response {
PageserverState::Available {
last_seen_at: Instant::now(),
utilization,
}
} else {
PageserverState::Offline
};
Some((*node_id, status))
}
});
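// Collect results as they complete, bailing out early if the
// heartbeater itself is cancelled.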
loop {
let maybe_status = tokio::select! {
next = heartbeat_futs.next() => {
match next {
Some(result) => result,
None => { break; }
}
},
_ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
};
if let Some((node_id, status)) = maybe_status {
new_state.insert(node_id, status);
}
}
}
let mut deltas = Vec::new();
let now = Instant::now();
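// Compare the fresh round against the previous state and emit deltas
// only for meaningful transitions: an available node is reported
// Offline only once max_unavailable_interval has elapsed since it
// was last seen.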
for (node_id, ps_state) in new_state {
use std::collections::hash_map::Entry::*;
let entry = self.state.entry(node_id);
let mut needs_update = false;
match entry {
Occupied(ref occ) => match (occ.get(), &ps_state) {
(PageserverState::Offline, PageserverState::Offline) => {}
(PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
if now - *last_seen_at >= self.max_unavailable_interval {
deltas.push((node_id, ps_state.clone()));
needs_update = true;
}
}
_ => {
deltas.push((node_id, ps_state.clone()));
needs_update = true;
}
},
Vacant(_) => {
deltas.push((node_id, ps_state.clone()));
}
}
match entry {
Occupied(mut occ) if needs_update => {
(*occ.get_mut()) = ps_state;
}
Vacant(vac) => {
vac.insert(ps_state);
}
_ => {}
}
}
Ok(AvailablityDeltas(deltas))
}
}


@@ -28,7 +28,7 @@ use utils::{
};
use pageserver_api::controller_api::{
-NodeConfigureRequest, NodeRegisterRequest, TenantShardMigrateRequest,
+NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantShardMigrateRequest,
};
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
@@ -389,7 +389,14 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
json_response(
StatusCode::OK,
-state.service.node_configure(config_req).await?,
+state
+.service
+.node_configure(
+config_req.node_id,
+config_req.availability.map(NodeAvailability::from),
+config_req.scheduling,
+)
+.await?,
)
}


@@ -3,6 +3,7 @@ use utils::seqwait::MonotonicCounter;
mod auth;
mod compute_hook;
+mod heartbeater;
pub mod http;
mod id_lock_map;
pub mod metrics;


@@ -2,7 +2,7 @@ use anyhow::{anyhow, Context};
use attachment_service::http::make_router;
use attachment_service::metrics::preinitialize_metrics;
use attachment_service::persistence::Persistence;
-use attachment_service::service::{Config, Service};
+use attachment_service::service::{Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT};
use aws_config::{BehaviorVersion, Region};
use camino::Utf8PathBuf;
use clap::Parser;
@@ -54,6 +54,10 @@ struct Cli {
/// URL to connect to postgres, like postgresql://localhost:1234/attachment_service
#[arg(long)]
database_url: Option<String>,
+/// Grace period before marking an unresponsive pageserver offline
+#[arg(long)]
+max_unavailable_interval: Option<humantime::Duration>,
}
/// Secrets may either be provided on the command line (for testing), or loaded from AWS SecretManager: this
@@ -249,6 +253,10 @@ async fn async_main() -> anyhow::Result<()> {
jwt_token: secrets.jwt_token,
control_plane_jwt_token: secrets.control_plane_jwt_token,
compute_hook_url: args.compute_hook_url,
+max_unavailable_interval: args
+.max_unavailable_interval
+.map(humantime::Duration::into)
+.unwrap_or(MAX_UNAVAILABLE_INTERVAL_DEFAULT),
};
// After loading secrets & config, but before starting anything else, apply database migrations


@@ -12,7 +12,7 @@ use serde::Serialize;
use tokio_util::sync::CancellationToken;
use utils::{backoff, id::NodeId};
-use crate::persistence::NodePersistence;
+use crate::{persistence::NodePersistence, scheduler::MaySchedule};
/// Represents the in-memory description of a Node.
///
@@ -111,8 +111,8 @@ impl Node {
use NodeAvailability::*;
match (self.availability, availability) {
-(Offline, Active) => ToActive,
-(Active, Offline) => ToOffline,
+(Offline, Active(_)) => ToActive,
+(Active(_), Offline) => ToOffline,
_ => Unchanged,
}
}
@@ -123,21 +123,21 @@ impl Node {
// a reference to the original Node's cancellation status. Checking both of these results
// in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
// when we cloned it, or if the original Node instance's cancellation token was fired.
-matches!(self.availability, NodeAvailability::Active) && !self.cancel.is_cancelled()
+matches!(self.availability, NodeAvailability::Active(_)) && !self.cancel.is_cancelled()
}
/// Is this node eligible to have work scheduled onto it?
-pub(crate) fn may_schedule(&self) -> bool {
-match self.availability {
-NodeAvailability::Active => {}
-NodeAvailability::Offline => return false,
-}
+pub(crate) fn may_schedule(&self) -> MaySchedule {
+let score = match self.availability {
+NodeAvailability::Active(score) => score,
+NodeAvailability::Offline => return MaySchedule::No,
+};
match self.scheduling {
-NodeSchedulingPolicy::Active => true,
-NodeSchedulingPolicy::Draining => false,
-NodeSchedulingPolicy::Filling => true,
-NodeSchedulingPolicy::Pause => false,
+NodeSchedulingPolicy::Active => MaySchedule::Yes(score),
+NodeSchedulingPolicy::Draining => MaySchedule::No,
+NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
+NodeSchedulingPolicy::Pause => MaySchedule::No,
}
}
@@ -155,8 +155,7 @@ impl Node {
listen_pg_addr,
listen_pg_port,
scheduling: NodeSchedulingPolicy::Filling,
-// TODO: we shouldn't really call this Active until we've heartbeated it.
-availability: NodeAvailability::Active,
+availability: NodeAvailability::Offline,
cancel: CancellationToken::new(),
}
}


@@ -1,4 +1,5 @@
use crate::{node::Node, tenant_state::TenantState};
+use pageserver_api::controller_api::UtilizationScore;
use serde::Serialize;
use std::collections::HashMap;
use utils::{http::error::ApiError, id::NodeId};
@@ -19,15 +20,34 @@ impl From<ScheduleError> for ApiError {
}
+#[derive(Serialize, Eq, PartialEq)]
+pub enum MaySchedule {
+Yes(UtilizationScore),
+No,
+}
#[derive(Serialize)]
struct SchedulerNode {
/// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
shard_count: usize,
/// Whether this node is currently eligible to have new shards scheduled (this is derived
/// from a node's availability state and scheduling policy).
-may_schedule: bool,
+may_schedule: MaySchedule,
}
+impl PartialEq for SchedulerNode {
+fn eq(&self, other: &Self) -> bool {
+let may_schedule_matches = matches!(
+(&self.may_schedule, &other.may_schedule),
+(MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
+);
+may_schedule_matches && self.shard_count == other.shard_count
+}
+}
+impl Eq for SchedulerNode {}
/// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
/// on which to run.
///
@@ -186,13 +206,15 @@ impl Scheduler {
return None;
}
+// TODO: When the utilization score returned by the pageserver becomes meaningful,
+// schedule based on that instead of the shard count.
let node = nodes
.iter()
.map(|node_id| {
let may_schedule = self
.nodes
.get(node_id)
-.map(|n| n.may_schedule)
+.map(|n| n.may_schedule != MaySchedule::No)
.unwrap_or(false);
(*node_id, may_schedule)
})
@@ -211,7 +233,7 @@ impl Scheduler {
.nodes
.iter()
.filter_map(|(k, v)| {
-if hard_exclude.contains(k) || !v.may_schedule {
+if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
None
} else {
Some((*k, v.shard_count))
@@ -230,7 +252,7 @@ impl Scheduler {
for (node_id, node) in &self.nodes {
tracing::info!(
"Node {node_id}: may_schedule={} shards={}",
-node.may_schedule,
+node.may_schedule != MaySchedule::No,
node.shard_count
);
}
@@ -255,6 +277,7 @@ impl Scheduler {
pub(crate) mod test_utils {
use crate::node::Node;
+use pageserver_api::controller_api::{NodeAvailability, UtilizationScore};
use std::collections::HashMap;
use utils::id::NodeId;
/// Test helper: synthesize the requested number of nodes, all in active state.
@@ -264,13 +287,14 @@ pub(crate) mod test_utils {
(1..n + 1)
.map(|i| {
(NodeId(i), {
-let node = Node::new(
+let mut node = Node::new(
NodeId(i),
format!("httphost-{i}"),
80 + i as u16,
format!("pghost-{i}"),
5432 + i as u16,
);
+node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
+assert!(node.is_available());
node
})


@@ -16,19 +16,13 @@ use diesel::result::DatabaseErrorKind;
use futures::{stream::FuturesUnordered, StreamExt};
use hyper::StatusCode;
use pageserver_api::{
-controller_api::{
-NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, PlacementPolicy,
-TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
-TenantShardMigrateRequest, TenantShardMigrateResponse,
-},
-models::TenantConfigRequest,
-};
-use pageserver_api::{
+controller_api::UtilizationScore,
models::{
-self, LocationConfig, LocationConfigListResponse, LocationConfigMode, ShardParameters,
-TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
-TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
-TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
+self, LocationConfig, LocationConfigListResponse, LocationConfigMode,
+PageserverUtilization, ShardParameters, TenantConfig, TenantCreateRequest,
+TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
+TenantShardSplitRequest, TenantShardSplitResponse, TenantTimeTravelRequest,
+TimelineCreateRequest, TimelineInfo,
},
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
upcall_api::{
@@ -36,6 +30,14 @@ use pageserver_api::{
ValidateResponse, ValidateResponseTenant,
},
};
+use pageserver_api::{
+controller_api::{
+NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
+TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
+TenantShardMigrateRequest, TenantShardMigrateResponse,
+},
+models::TenantConfigRequest,
+};
use pageserver_client::mgmt_api;
use tokio::sync::OwnedRwLockWriteGuard;
use tokio_util::sync::CancellationToken;
@@ -51,6 +53,7 @@ use utils::{
use crate::{
compute_hook::{self, ComputeHook},
+heartbeater::{Heartbeater, PageserverState},
node::{AvailabilityTransition, Node},
persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
reconciler::attached_location_conf,
@@ -78,6 +81,8 @@ const INITIAL_GENERATION: Generation = Generation::new(0);
/// up on unresponsive pageservers and proceed.
pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
+pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);
// Top level state available to all HTTP handlers
struct ServiceState {
tenants: BTreeMap<TenantShardId, TenantState>,
@@ -125,6 +130,11 @@ pub struct Config {
/// (this URL points to the control plane in prod). If this is None, the compute hook will
/// assume it is running in a test environment and try to update neon_local.
pub compute_hook_url: Option<String>,
+/// Grace period within which a pageserver does not respond to heartbeats, but is still
+/// considered active. Once the grace period elapses, the next heartbeat failure will
+/// mark the pageserver offline.
+pub max_unavailable_interval: Duration,
}
impl From<DatabaseError> for ApiError {
@@ -149,6 +159,8 @@ pub struct Service {
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
+heartbeater: Heartbeater,
// Channel for background cleanup from failed operations that require cleanup, such as shard split
abort_tx: tokio::sync::mpsc::UnboundedSender<TenantShardSplitAbort>,
@@ -232,8 +244,6 @@ impl Service {
let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
HashMap::new();
-let mut nodes_online = HashSet::new();
// Startup reconciliation does I/O to other services: whether they
// are responsive or not, we should aim to finish within our deadline, because:
// - If we don't, a k8s readiness hook watching /ready will kill us.
@@ -255,6 +265,9 @@ impl Service {
let mut cleanup = Vec::new();
let node_listings = self.scan_node_locations(node_scan_deadline).await;
+// Send initial heartbeat requests to nodes that replied to the location listing above.
+let nodes_online = self.initial_heartbeat_round(node_listings.keys()).await;
for (node_id, list_response) in node_listings {
let tenant_shards = list_response.tenant_shards;
tracing::info!(
@@ -262,7 +275,6 @@ impl Service {
tenant_shards.len(),
node_id
);
-nodes_online.insert(node_id);
for (tenant_shard_id, conf_opt) in tenant_shards {
let shard_observations = observed.entry(tenant_shard_id).or_default();
@@ -281,8 +293,10 @@ impl Service {
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
let mut new_nodes = (**nodes).clone();
for (node_id, node) in new_nodes.iter_mut() {
-if nodes_online.contains(node_id) {
-node.set_availability(NodeAvailability::Active);
+if let Some(utilization) = nodes_online.get(node_id) {
+node.set_availability(NodeAvailability::Active(UtilizationScore(
+utilization.utilization_score,
+)));
scheduler.node_upsert(node);
}
}
@@ -371,6 +385,49 @@ impl Service {
tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
}
async fn initial_heartbeat_round<'a>(
&self,
node_ids: impl Iterator<Item = &'a NodeId>,
) -> HashMap<NodeId, PageserverUtilization> {
assert!(!self.startup_complete.is_ready());
let all_nodes = {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
let mut nodes_to_heartbeat = HashMap::new();
for node_id in node_ids {
match all_nodes.get(node_id) {
Some(node) => {
nodes_to_heartbeat.insert(*node_id, node.clone());
}
None => {
tracing::warn!("Node {node_id} was removed during start-up");
}
}
}
let res = self
.heartbeater
.heartbeat(Arc::new(nodes_to_heartbeat))
.await;
let mut online_nodes = HashMap::new();
if let Ok(deltas) = res {
for (node_id, status) in deltas.0 {
match status {
PageserverState::Available { utilization, .. } => {
online_nodes.insert(node_id, utilization);
}
PageserverState::Offline => {}
}
}
}
online_nodes
}
/// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
///
/// The result includes only nodes which responded within the deadline
@@ -391,7 +448,7 @@ impl Service {
node_list_futs.push({
async move {
tracing::info!("Scanning shards on node {node}...");
-let timeout = Duration::from_secs(5);
+let timeout = Duration::from_secs(1);
let response = node
.with_client_retries(
|client| async move { client.list_location_config().await },
@@ -586,6 +643,56 @@ impl Service {
}
}
}
#[instrument(skip_all)]
async fn spawn_heartbeat_driver(&self) {
self.startup_complete.clone().wait().await;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL);
while !self.cancel.is_cancelled() {
tokio::select! {
_ = interval.tick() => { }
_ = self.cancel.cancelled() => return
};
let nodes = {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
let res = self.heartbeater.heartbeat(nodes).await;
if let Ok(deltas) = res {
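// Apply each delta through node_configure so that scheduler state and
// tenant reconciliation stay consistent with the availability change.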
for (node_id, state) in deltas.0 {
let new_availability = match state {
PageserverState::Available { utilization, .. } => NodeAvailability::Active(
UtilizationScore(utilization.utilization_score),
),
PageserverState::Offline => NodeAvailability::Offline,
};
let res = self
.node_configure(node_id, Some(new_availability), None)
.await;
match res {
Ok(()) => {}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!("Node {} was not found after heartbeat round", node_id);
}
Err(err) => {
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
node_id,
err
);
}
}
}
}
}
}
/// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation
/// was successful, this will update the observed state of the tenant such that subsequent
@@ -836,6 +943,12 @@ impl Service {
let (startup_completion, startup_complete) = utils::completion::channel();
let cancel = CancellationToken::new();
let heartbeater = Heartbeater::new(
config.jwt_token.clone(),
config.max_unavailable_interval,
cancel.clone(),
);
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
nodes, tenants, scheduler,
@@ -844,9 +957,10 @@ impl Service {
persistence,
compute_hook: Arc::new(ComputeHook::new(config)),
result_tx,
+heartbeater,
abort_tx,
startup_complete: startup_complete.clone(),
-cancel: CancellationToken::new(),
+cancel,
gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
@@ -899,13 +1013,28 @@ impl Service {
};
this.startup_reconcile().await;
drop(startup_completion);
}
});
tokio::task::spawn({
let this = this.clone();
let startup_complete = startup_complete.clone();
async move {
startup_complete.wait().await;
this.background_reconcile().await;
}
});
tokio::task::spawn({
let this = this.clone();
let startup_complete = startup_complete.clone();
async move {
startup_complete.wait().await;
this.spawn_heartbeat_driver().await;
}
});
Ok(this)
}
@@ -964,11 +1093,37 @@ impl Service {
}
let new_generation = if let Some(req_node_id) = attach_req.node_id {
-Some(
-self.persistence
-.increment_generation(attach_req.tenant_shard_id, req_node_id)
-.await?,
-)
+let maybe_tenant_conf = {
+let locked = self.inner.write().unwrap();
+locked
+.tenants
+.get(&attach_req.tenant_shard_id)
+.map(|t| t.config.clone())
+};
+match maybe_tenant_conf {
+Some(conf) => {
+let new_generation = self
+.persistence
+.increment_generation(attach_req.tenant_shard_id, req_node_id)
+.await?;
+// Persist the placement policy update. This is required
+// when re-attaching a detached tenant.
+self.persistence
+.update_tenant_shard(
+attach_req.tenant_shard_id,
+PlacementPolicy::Single,
+conf,
+None,
+)
+.await?;
+Some(new_generation)
+}
+None => {
+anyhow::bail!("Attach hook handling raced with tenant removal")
+}
+}
} else {
self.persistence.detach(attach_req.tenant_shard_id).await?;
None
@@ -983,6 +1138,7 @@ impl Service {
if let Some(new_generation) = new_generation {
tenant_state.generation = Some(new_generation);
+tenant_state.policy = PlacementPolicy::Single;
} else {
// This is a detach notification. We must update placement policy to avoid re-attaching
// during background scheduling/reconciliation, or during storage controller restart.
@@ -1085,7 +1241,7 @@ impl Service {
// This Node is a mutable local copy: we will set it active so that we can use its
// API client to reconcile with the node. The Node in [`Self::nodes`] will get updated
// later.
-node.set_availability(NodeAvailability::Active);
+node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
let configs = match node
.with_client_retries(
@@ -1196,7 +1352,7 @@ impl Service {
// Apply the updated generation to our in-memory state
let mut locked = self.inner.write().unwrap();
-let (nodes, tenants, _scheduler) = locked.parts_mut();
+let (nodes, tenants, scheduler) = locked.parts_mut();
let mut response = ReAttachResponse {
tenants: Vec::new(),
@@ -1271,7 +1427,8 @@ impl Service {
if !node.is_available() {
let mut new_nodes = (**nodes).clone();
if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) {
-node.set_availability(NodeAvailability::Active);
+node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
+scheduler.node_upsert(node);
}
let new_nodes = Arc::new(new_nodes);
*nodes = new_nodes;
@@ -3328,16 +3485,16 @@ impl Service {
pub(crate) async fn node_configure(
&self,
-config_req: NodeConfigureRequest,
+node_id: NodeId,
+availability: Option<NodeAvailability>,
+scheduling: Option<NodeSchedulingPolicy>,
) -> Result<(), ApiError> {
-let _node_lock = self.node_op_locks.exclusive(config_req.node_id).await;
+let _node_lock = self.node_op_locks.exclusive(node_id).await;
-if let Some(scheduling) = config_req.scheduling {
+if let Some(scheduling) = scheduling {
// Scheduling is a persistent part of Node: we must write updates to the database before
// applying them in memory
-self.persistence
-.update_node(config_req.node_id, scheduling)
-.await?;
+self.persistence.update_node(node_id, scheduling).await?;
}
// If we're activating a node, then before setting it active we must reconcile any shard locations
@@ -3346,12 +3503,12 @@ impl Service {
//
// The transition we calculate here remains valid later in the function because we hold the op lock on the node:
// nothing else can mutate its availability while we run.
-let availability_transition = if let Some(input_availability) = config_req.availability {
+let availability_transition = if let Some(input_availability) = availability {
let (activate_node, availability_transition) = {
let locked = self.inner.read().unwrap();
-let Some(node) = locked.nodes.get(&config_req.node_id) else {
+let Some(node) = locked.nodes.get(&node_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", config_req.node_id).into(),
anyhow::anyhow!("Node {} not registered", node_id).into(),
));
};
@@ -3376,17 +3533,17 @@ impl Service {
let mut new_nodes = (**nodes).clone();
-let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
+let Some(node) = new_nodes.get_mut(&node_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Node not registered").into(),
));
};
-if let Some(availability) = &config_req.availability {
+if let Some(availability) = &availability {
node.set_availability(*availability);
}
-if let Some(scheduling) = config_req.scheduling {
+if let Some(scheduling) = scheduling {
node.set_scheduling(scheduling);
// TODO: once we have a background scheduling ticker for fill/drain, kick it
@@ -3401,25 +3558,23 @@ impl Service {
// Modify scheduling state for any Tenants that are affected by a change in the node's availability state.
match availability_transition {
AvailabilityTransition::ToOffline => {
tracing::info!("Node {} transition to offline", config_req.node_id);
tracing::info!("Node {} transition to offline", node_id);
let mut tenants_affected: usize = 0;
for (tenant_shard_id, tenant_state) in tenants {
-if let Some(observed_loc) =
-tenant_state.observed.locations.get_mut(&config_req.node_id)
-{
+if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&node_id) {
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
// not assume our knowledge of the node's configuration is accurate until it comes back online
observed_loc.conf = None;
}
-if tenant_state.intent.demote_attached(config_req.node_id) {
+if tenant_state.intent.demote_attached(node_id) {
tenant_state.sequence = tenant_state.sequence.next();
match tenant_state.schedule(scheduler) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
// TODO: give TenantState a scheduling error attribute to be queried later.
-tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
+tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
if self
@@ -3435,17 +3590,15 @@ impl Service {
tracing::info!(
"Launched {} reconciler tasks for tenants affected by node {} going offline",
tenants_affected,
-config_req.node_id
+node_id
)
}
AvailabilityTransition::ToActive => {
tracing::info!("Node {} transition to active", config_req.node_id);
tracing::info!("Node {} transition to active", node_id);
// When a node comes back online, we must reconcile any tenant that has a None observed
// location on the node.
for tenant_state in locked.tenants.values_mut() {
-if let Some(observed_loc) =
-tenant_state.observed.locations.get_mut(&config_req.node_id)
-{
+if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&node_id) {
if observed_loc.conf.is_none() {
self.maybe_reconcile_shard(tenant_state, &new_nodes);
}
@@ -3455,7 +3608,7 @@ impl Service {
// TODO: in the background, we should balance work back onto this pageserver
}
AvailabilityTransition::Unchanged => {
tracing::info!("Node {} no change during config", config_req.node_id);
tracing::info!("Node {} no change during config", node_id);
}
}
@@ -3534,17 +3687,23 @@ impl Service {
)
}
-/// Check all tenants for pending reconciliation work, and reconcile those in need
+/// Check all tenants for pending reconciliation work, and reconcile those in need.
+/// Additionally, reschedule tenants that require it.
///
/// Returns how many reconciliation tasks were started
fn reconcile_all(&self) -> usize {
let mut locked = self.inner.write().unwrap();
-let pageservers = locked.nodes.clone();
-locked
-.tenants
-.iter_mut()
-.filter_map(|(_tenant_shard_id, shard)| self.maybe_reconcile_shard(shard, &pageservers))
-.count()
+let (nodes, tenants, _scheduler) = locked.parts_mut();
+let pageservers = nodes.clone();
+let mut reconciles_spawned = 0;
+for (_tenant_shard_id, shard) in tenants.iter_mut() {
+if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
+reconciles_spawned += 1;
+}
+}
+reconciles_spawned
}
pub async fn shutdown(&self) {


@@ -38,6 +38,9 @@ const COMMAND: &str = "storage_controller";
const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
+// Use a shorter pageserver unavailability interval than the default to speed up tests.
+const NEON_LOCAL_MAX_UNAVAILABLE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
@@ -269,6 +272,8 @@ impl StorageController {
// Run migrations on every startup, in case something changed.
let database_url = self.setup_database().await?;
+let max_unavailable: humantime::Duration = NEON_LOCAL_MAX_UNAVAILABLE_INTERVAL.into();
let mut args = vec![
"-l",
&self.listen,
@@ -276,6 +281,8 @@ impl StorageController {
self.path.as_ref(),
"--database-url",
&database_url,
"--max-unavailable-interval",
&max_unavailable.to_string(),
]
.into_iter()
.map(|s| s.to_string())