From 536bdb32098c0332478ef0fb22ae92c9a4577785 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Thu, 13 Feb 2025 12:06:30 +0100
Subject: [PATCH] storcon: track safekeepers in memory, send heartbeats to them (#10583)

In #9011, we want to schedule timelines to safekeepers. To do such
scheduling, we need to know how utilized a safekeeper is and whether it is
available. Therefore, send periodic heartbeats to the safekeepers and use
the responses to determine whether each one is online.

Includes some code from #10440.
---
 Cargo.lock                                  |   2 +
 safekeeper/client/src/mgmt_api.rs           |  10 +-
 storage_controller/Cargo.toml               |   2 +
 storage_controller/src/heartbeater.rs       | 189 +++++++++++++++++---
 storage_controller/src/lib.rs               |   2 +
 storage_controller/src/metrics.rs           |  12 ++
 storage_controller/src/persistence.rs       |  32 ++--
 storage_controller/src/safekeeper.rs        | 139 ++++++++++++++
 storage_controller/src/safekeeper_client.rs | 105 +++++++++++
 storage_controller/src/service.rs           | 185 ++++++++++++++++---
 10 files changed, 613 insertions(+), 65 deletions(-)
 create mode 100644 storage_controller/src/safekeeper.rs
 create mode 100644 storage_controller/src/safekeeper_client.rs

diff --git a/Cargo.lock b/Cargo.lock
index 407c8170bb..b3a88d46ac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6464,6 +6464,8 @@ dependencies = [
  "routerify",
  "rustls 0.23.18",
  "rustls-native-certs 0.8.0",
+ "safekeeper_api",
+ "safekeeper_client",
  "scoped-futures",
  "scopeguard",
  "serde",
diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs
index df049f3eba..d4f47fc96d 100644
--- a/safekeeper/client/src/mgmt_api.rs
+++ b/safekeeper/client/src/mgmt_api.rs
@@ -5,7 +5,7 @@
 use http_utils::error::HttpErrorBody;
 use reqwest::{IntoUrl, Method, StatusCode};
-use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
+use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
 use std::error::Error as _;
 use utils::{
     id::{NodeId, TenantId, TimelineId},
@@ -32,6 +32,9 @@ pub enum Error {
     /// Status is not ok; parsed error in body as `HttpErrorBody`.
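// The commit message above describes classifying each safekeeper as online or
// offline based on its heartbeat response, and this hunk adds an `Error::Cancelled`
// variant so cancelled probes can be told apart from real failures. Below is a
// minimal, self-contained sketch of that classification; `Utilization`,
// `ProbeResult` and `Availability` are simplified stand-ins, not the PR's types.
use std::time::Instant;

/// Stand-in for the utilization payload returned by a successful heartbeat.
#[derive(Debug, Clone)]
struct Utilization {
    timeline_count: u64,
}

/// Stand-in for the per-safekeeper availability state kept in memory.
#[derive(Debug, Clone)]
enum Availability {
    Available {
        last_seen_at: Instant,
        utilization: Utilization,
    },
    Offline,
}

/// Outcome of a single heartbeat probe (hypothetical).
enum ProbeResult {
    Ok(Utilization),
    Cancelled,
    Error,
}

/// A cancelled probe yields no verdict at all (the node is simply skipped this
/// round); any other failure marks the safekeeper offline.
fn classify(result: ProbeResult) -> Option<Availability> {
    match result {
        ProbeResult::Ok(utilization) => Some(Availability::Available {
            last_seen_at: Instant::now(),
            utilization,
        }),
        ProbeResult::Cancelled => None,
        ProbeResult::Error => Some(Availability::Offline),
    }
}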
#[error("safekeeper API: {1}")] ApiError(StatusCode, String), + + #[error("Cancelled")] + Cancelled, } pub type Result = std::result::Result; @@ -124,9 +127,10 @@ impl Client { self.get(&uri).await } - pub async fn utilization(&self) -> Result { + pub async fn utilization(&self) -> Result { let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint); - self.get(&uri).await + let resp = self.get(&uri).await?; + resp.json().await.map_err(Error::ReceiveBody) } async fn post( diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 91d8098cb9..69276bfde4 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -32,6 +32,8 @@ postgres_connection.workspace = true rand.workspace = true reqwest = { workspace = true, features = ["stream"] } routerify.workspace = true +safekeeper_api.workspace = true +safekeeper_client.workspace = true rustls-native-certs.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs index b7e66d33eb..6f110d3294 100644 --- a/storage_controller/src/heartbeater.rs +++ b/storage_controller/src/heartbeater.rs @@ -1,6 +1,10 @@ use futures::{stream::FuturesUnordered, StreamExt}; +use safekeeper_api::models::SafekeeperUtilization; +use safekeeper_client::mgmt_api; use std::{ collections::HashMap, + fmt::Debug, + future::Future, sync::Arc, time::{Duration, Instant}, }; @@ -9,15 +13,15 @@ use tokio_util::sync::CancellationToken; use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization}; use thiserror::Error; -use utils::id::NodeId; +use utils::{id::NodeId, logging::SecretString}; -use crate::node::Node; +use crate::{node::Node, safekeeper::Safekeeper}; -struct HeartbeaterTask { - receiver: tokio::sync::mpsc::UnboundedReceiver, +struct HeartbeaterTask { + receiver: tokio::sync::mpsc::UnboundedReceiver>, cancel: CancellationToken, - state: HashMap, + state: HashMap, max_offline_interval: Duration, max_warming_up_interval: Duration, @@ -36,8 +40,17 @@ pub(crate) enum PageserverState { Offline, } +#[derive(Debug, Clone)] +pub(crate) enum SafekeeperState { + Available { + last_seen_at: Instant, + utilization: SafekeeperUtilization, + }, + Offline, +} + #[derive(Debug)] -pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>); +pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, State)>); #[derive(Debug, Error)] pub(crate) enum HeartbeaterError { @@ -45,23 +58,28 @@ pub(crate) enum HeartbeaterError { Cancel, } -struct HeartbeatRequest { - pageservers: Arc>, - reply: tokio::sync::oneshot::Sender>, +struct HeartbeatRequest { + servers: Arc>, + reply: tokio::sync::oneshot::Sender, HeartbeaterError>>, } -pub(crate) struct Heartbeater { - sender: tokio::sync::mpsc::UnboundedSender, +pub(crate) struct Heartbeater { + sender: tokio::sync::mpsc::UnboundedSender>, } -impl Heartbeater { +#[allow(private_bounds)] +impl Heartbeater +where + HeartbeaterTask: HeartBeat, +{ pub(crate) fn new( jwt_token: Option, max_offline_interval: Duration, max_warming_up_interval: Duration, cancel: CancellationToken, ) -> Self { - let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::(); + let (sender, receiver) = + tokio::sync::mpsc::unbounded_channel::>(); let mut heartbeater = HeartbeaterTask::new( receiver, jwt_token, @@ -76,12 +94,12 @@ impl Heartbeater { pub(crate) async fn heartbeat( &self, - pageservers: Arc>, - ) -> Result { + servers: Arc>, + ) -> Result, HeartbeaterError> { let (sender, receiver) = 
tokio::sync::oneshot::channel(); self.sender .send(HeartbeatRequest { - pageservers, + servers, reply: sender, }) .map_err(|_| HeartbeaterError::Cancel)?; @@ -93,9 +111,12 @@ impl Heartbeater { } } -impl HeartbeaterTask { +impl HeartbeaterTask +where + HeartbeaterTask: HeartBeat, +{ fn new( - receiver: tokio::sync::mpsc::UnboundedReceiver, + receiver: tokio::sync::mpsc::UnboundedReceiver>, jwt_token: Option, max_offline_interval: Duration, max_warming_up_interval: Duration, @@ -110,14 +131,13 @@ impl HeartbeaterTask { jwt_token, } } - async fn run(&mut self) { loop { tokio::select! { request = self.receiver.recv() => { match request { Some(req) => { - let res = self.heartbeat(req.pageservers).await; + let res = self.heartbeat(req.servers).await; req.reply.send(res).unwrap(); }, None => { return; } @@ -127,11 +147,20 @@ impl HeartbeaterTask { } } } +} +pub(crate) trait HeartBeat { + fn heartbeat( + &mut self, + pageservers: Arc>, + ) -> impl Future, HeartbeaterError>> + Send; +} + +impl HeartBeat for HeartbeaterTask { async fn heartbeat( &mut self, pageservers: Arc>, - ) -> Result { + ) -> Result, HeartbeaterError> { let mut new_state = HashMap::new(); let mut heartbeat_futs = FuturesUnordered::new(); @@ -272,3 +301,121 @@ impl HeartbeaterTask { Ok(AvailablityDeltas(deltas)) } } + +impl HeartBeat for HeartbeaterTask { + async fn heartbeat( + &mut self, + safekeepers: Arc>, + ) -> Result, HeartbeaterError> { + let mut new_state = HashMap::new(); + + let mut heartbeat_futs = FuturesUnordered::new(); + for (node_id, sk) in &*safekeepers { + heartbeat_futs.push({ + let jwt_token = self + .jwt_token + .as_ref() + .map(|t| SecretString::from(t.to_owned())); + let cancel = self.cancel.clone(); + + async move { + let response = sk + .with_client_retries( + |client| async move { client.get_utilization().await }, + &jwt_token, + 3, + 3, + Duration::from_secs(1), + &cancel, + ) + .await; + + let status = match response { + Ok(utilization) => SafekeeperState::Available { + last_seen_at: Instant::now(), + utilization, + }, + Err(mgmt_api::Error::Cancelled) => { + // This indicates cancellation of the request. + // We ignore the node in this case. + return None; + } + Err(_) => SafekeeperState::Offline, + }; + + Some((*node_id, status)) + } + }); + + loop { + let maybe_status = tokio::select! { + next = heartbeat_futs.next() => { + match next { + Some(result) => result, + None => { break; } + } + }, + _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); } + }; + + if let Some((node_id, status)) = maybe_status { + new_state.insert(node_id, status); + } + } + } + + let mut offline = 0; + for state in new_state.values() { + match state { + SafekeeperState::Offline { .. } => offline += 1, + SafekeeperState::Available { .. } => {} + } + } + + tracing::info!( + "Heartbeat round complete for {} safekeepers, {} offline", + new_state.len(), + offline + ); + + let mut deltas = Vec::new(); + let now = Instant::now(); + for (node_id, sk_state) in new_state.iter_mut() { + use std::collections::hash_map::Entry::*; + let entry = self.state.entry(*node_id); + + let mut needs_update = false; + match entry { + Occupied(ref occ) => match (occ.get(), &sk_state) { + (SafekeeperState::Offline, SafekeeperState::Offline) => {} + (SafekeeperState::Available { last_seen_at, .. 
}, SafekeeperState::Offline) => { + if now - *last_seen_at >= self.max_offline_interval { + deltas.push((*node_id, sk_state.clone())); + needs_update = true; + } + } + _ => { + deltas.push((*node_id, sk_state.clone())); + needs_update = true; + } + }, + Vacant(_) => { + // This is a new node. Don't generate a delta for it. + deltas.push((*node_id, sk_state.clone())); + } + } + + match entry { + Occupied(mut occ) if needs_update => { + (*occ.get_mut()) = sk_state.clone(); + } + Vacant(vac) => { + vac.insert(sk_state.clone()); + } + _ => {} + } + } + + Ok(AvailablityDeltas(deltas)) + } +} diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs index f5823935e1..5f2c081927 100644 --- a/storage_controller/src/lib.rs +++ b/storage_controller/src/lib.rs @@ -17,6 +17,8 @@ mod pageserver_client; mod peer_client; pub mod persistence; mod reconciler; +mod safekeeper; +mod safekeeper_client; mod scheduler; mod schema; pub mod service; diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs index 4164e3dc2b..6d67e0d130 100644 --- a/storage_controller/src/metrics.rs +++ b/storage_controller/src/metrics.rs @@ -80,6 +80,11 @@ pub(crate) struct StorageControllerMetricGroup { pub(crate) storage_controller_pageserver_request_error: measured::CounterVec, + /// Count of HTTP requests to the safekeeper that resulted in an error, + /// broken down by the safekeeper node id, request name and method + pub(crate) storage_controller_safekeeper_request_error: + measured::CounterVec, + /// Latency of HTTP requests to the pageserver, broken down by pageserver /// node id, request name and method. This include both successful and unsuccessful /// requests. @@ -87,6 +92,13 @@ pub(crate) struct StorageControllerMetricGroup { pub(crate) storage_controller_pageserver_request_latency: measured::HistogramVec, + /// Latency of HTTP requests to the safekeeper, broken down by safekeeper + /// node id, request name and method. This include both successful and unsuccessful + /// requests. + #[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))] + pub(crate) storage_controller_safekeeper_request_latency: + measured::HistogramVec, + /// Count of pass-through HTTP requests to the pageserver that resulted in an error, /// broken down by the pageserver node id, request name and method pub(crate) storage_controller_passthrough_request_error: diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index c4e5b39589..67b60eadf3 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1185,23 +1185,6 @@ impl Persistence { Ok(safekeepers) } - pub(crate) async fn safekeeper_get( - &self, - id: i64, - ) -> Result { - use crate::schema::safekeepers::dsl::{id as id_column, safekeepers}; - self.with_conn(move |conn| { - Box::pin(async move { - Ok(safekeepers - .filter(id_column.eq(&id)) - .select(SafekeeperPersistence::as_select()) - .get_result(conn) - .await?) 
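// The safekeeper heartbeat round earlier in this diff turns each set of probe
// results into deltas: an `Offline` observation only produces a delta once the node
// has been silent for `max_offline_interval`, while any other change (or a newly
// discovered node) is reported immediately. A self-contained sketch of that rule
// under simplified types; names here are illustrative, not the PR's API.
use std::collections::HashMap;
use std::time::{Duration, Instant};

type NodeId = u64;

#[derive(Clone, Debug, PartialEq)]
enum State {
    Available { last_seen_at: Instant },
    Offline,
}

fn deltas(
    previous: &mut HashMap<NodeId, State>,
    observed: HashMap<NodeId, State>,
    max_offline_interval: Duration,
    now: Instant,
) -> Vec<(NodeId, State)> {
    let mut out = Vec::new();
    for (id, new_state) in observed {
        let changed = match previous.get(&id) {
            // Grace period: keep treating the node as available until it has been
            // unreachable for long enough.
            Some(State::Available { last_seen_at }) if new_state == State::Offline => {
                now.duration_since(*last_seen_at) >= max_offline_interval
            }
            // No change: nothing to report.
            Some(prev) if *prev == new_state => false,
            // Any other transition, or a node we have never seen, is reported right away.
            _ => true,
        };
        if changed {
            out.push((id, new_state.clone()));
            previous.insert(id, new_state);
        }
    }
    out
}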
- }) - }) - .await - } - pub(crate) async fn safekeeper_upsert( &self, record: SafekeeperUpsert, @@ -1554,6 +1537,21 @@ pub(crate) struct SafekeeperPersistence { } impl SafekeeperPersistence { + pub(crate) fn from_upsert( + upsert: SafekeeperUpsert, + scheduling_policy: SkSchedulingPolicy, + ) -> Self { + crate::persistence::SafekeeperPersistence { + id: upsert.id, + region_id: upsert.region_id, + version: upsert.version, + host: upsert.host, + port: upsert.port, + http_port: upsert.http_port, + availability_zone_id: upsert.availability_zone_id, + scheduling_policy: String::from(scheduling_policy), + } + } pub(crate) fn as_describe_response(&self) -> Result { let scheduling_policy = SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| { diff --git a/storage_controller/src/safekeeper.rs b/storage_controller/src/safekeeper.rs new file mode 100644 index 0000000000..be073d0cb9 --- /dev/null +++ b/storage_controller/src/safekeeper.rs @@ -0,0 +1,139 @@ +use std::{str::FromStr, time::Duration}; + +use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy}; +use reqwest::StatusCode; +use safekeeper_client::mgmt_api; +use tokio_util::sync::CancellationToken; +use utils::{backoff, id::NodeId, logging::SecretString}; + +use crate::{ + heartbeater::SafekeeperState, + persistence::{DatabaseError, SafekeeperPersistence}, + safekeeper_client::SafekeeperClient, +}; + +#[derive(Clone)] +pub struct Safekeeper { + pub(crate) skp: SafekeeperPersistence, + cancel: CancellationToken, + listen_http_addr: String, + listen_http_port: u16, + id: NodeId, + availability: SafekeeperState, +} + +impl Safekeeper { + pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self { + Self { + cancel, + listen_http_addr: skp.host.clone(), + listen_http_port: skp.http_port as u16, + id: NodeId(skp.id as u64), + skp, + availability: SafekeeperState::Offline, + } + } + pub(crate) fn base_url(&self) -> String { + format!("http://{}:{}", self.listen_http_addr, self.listen_http_port) + } + + pub(crate) fn get_id(&self) -> NodeId { + self.id + } + pub(crate) fn describe_response(&self) -> Result { + self.skp.as_describe_response() + } + pub(crate) fn set_availability(&mut self, availability: SafekeeperState) { + self.availability = availability; + } + /// Perform an operation (which is given a [`SafekeeperClient`]) with retries + pub(crate) async fn with_client_retries( + &self, + mut op: O, + jwt: &Option, + warn_threshold: u32, + max_retries: u32, + timeout: Duration, + cancel: &CancellationToken, + ) -> mgmt_api::Result + where + O: FnMut(SafekeeperClient) -> F, + F: std::future::Future>, + { + fn is_fatal(e: &mgmt_api::Error) -> bool { + use mgmt_api::Error::*; + match e { + ReceiveBody(_) | ReceiveErrorBody(_) => false, + ApiError(StatusCode::SERVICE_UNAVAILABLE, _) + | ApiError(StatusCode::GATEWAY_TIMEOUT, _) + | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false, + ApiError(_, _) => true, + Cancelled => true, + } + } + + backoff::retry( + || { + let http_client = reqwest::ClientBuilder::new() + .timeout(timeout) + .build() + .expect("Failed to construct HTTP client"); + + let client = SafekeeperClient::from_client( + self.get_id(), + http_client, + self.base_url(), + jwt.clone(), + ); + + let node_cancel_fut = self.cancel.cancelled(); + + let op_fut = op(client); + + async { + tokio::select! 
{ + r = op_fut=> {r}, + _ = node_cancel_fut => { + Err(mgmt_api::Error::Cancelled) + }} + } + }, + is_fatal, + warn_threshold, + max_retries, + &format!( + "Call to node {} ({}:{}) management API", + self.id, self.listen_http_addr, self.listen_http_port + ), + cancel, + ) + .await + .unwrap_or(Err(mgmt_api::Error::Cancelled)) + } + + pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) { + let crate::persistence::SafekeeperUpsert { + active: _, + availability_zone_id: _, + host, + http_port, + id, + port: _, + region_id: _, + version: _, + } = record.clone(); + if id != self.id.0 as i64 { + // The way the function is called ensures this. If we regress on that, it's a bug. + panic!( + "id can't be changed via update_from_record function: {id} != {}", + self.id.0 + ); + } + self.skp = crate::persistence::SafekeeperPersistence::from_upsert( + record, + SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(), + ); + self.listen_http_port = http_port as u16; + self.listen_http_addr = host; + } +} diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs new file mode 100644 index 0000000000..bb494f20fa --- /dev/null +++ b/storage_controller/src/safekeeper_client.rs @@ -0,0 +1,105 @@ +use crate::metrics::PageserverRequestLabelGroup; +use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus}; +use safekeeper_client::mgmt_api::{Client, Result}; +use utils::{ + id::{NodeId, TenantId, TimelineId}, + logging::SecretString, +}; + +/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage +/// controller to collect metrics in a non-intrusive manner. +/// +/// Analogous to [`crate::pageserver_client::PageserverClient`]. +#[derive(Debug, Clone)] +pub(crate) struct SafekeeperClient { + inner: Client, + node_id_label: String, +} + +macro_rules! 
measured_request { + ($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{ + let labels = PageserverRequestLabelGroup { + pageserver_id: $node_id, + path: $name, + method: $method, + }; + + let latency = &crate::metrics::METRICS_REGISTRY + .metrics_group + .storage_controller_safekeeper_request_latency; + let _timer_guard = latency.start_timer(labels.clone()); + + let res = $invoke; + + if res.is_err() { + let error_counters = &crate::metrics::METRICS_REGISTRY + .metrics_group + .storage_controller_pageserver_request_error; + error_counters.inc(labels) + } + + res + }}; +} + +impl SafekeeperClient { + #[allow(dead_code)] + pub(crate) fn new( + node_id: NodeId, + mgmt_api_endpoint: String, + jwt: Option, + ) -> Self { + Self { + inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt), + node_id_label: node_id.0.to_string(), + } + } + + pub(crate) fn from_client( + node_id: NodeId, + raw_client: reqwest::Client, + mgmt_api_endpoint: String, + jwt: Option, + ) -> Self { + Self { + inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt), + node_id_label: node_id.0.to_string(), + } + } + + #[allow(dead_code)] + pub(crate) async fn create_timeline( + &self, + req: &TimelineCreateRequest, + ) -> Result { + measured_request!( + "create_timeline", + crate::metrics::Method::Post, + &self.node_id_label, + self.inner.create_timeline(req).await + ) + } + + #[allow(dead_code)] + pub(crate) async fn delete_timeline( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Result { + measured_request!( + "delete_timeline", + crate::metrics::Method::Delete, + &self.node_id_label, + self.inner.delete_timeline(tenant_id, timeline_id).await + ) + } + + pub(crate) async fn get_utilization(&self) -> Result { + measured_request!( + "utilization", + crate::metrics::Method::Get, + &self.node_id_label, + self.inner.utilization().await + ) + } +} diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 6829663a4c..b9db46fe4a 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -2,6 +2,7 @@ pub mod chaos_injector; mod context_iterator; use hyper::Uri; +use safekeeper_api::models::SafekeeperUtilization; use std::{ borrow::Cow, cmp::Ordering, @@ -20,6 +21,7 @@ use crate::{ }, compute_hook::{self, NotifyError}, drain_utils::{self, TenantShardDrain, TenantShardIterator}, + heartbeater::SafekeeperState, id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard}, leadership::Leadership, metrics, @@ -29,6 +31,7 @@ use crate::{ ShardGenerationState, TenantFilter, }, reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder}, + safekeeper::Safekeeper, scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode}, tenant_shard::{ MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus, @@ -206,6 +209,8 @@ struct ServiceState { nodes: Arc>, + safekeepers: Arc>, + scheduler: Scheduler, /// Ongoing background operation on the cluster if any is running. 
@@ -272,6 +277,7 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError { impl ServiceState { fn new( nodes: HashMap, + safekeepers: HashMap, tenants: BTreeMap, scheduler: Scheduler, delayed_reconcile_rx: tokio::sync::mpsc::Receiver, @@ -283,6 +289,7 @@ impl ServiceState { leadership_status: initial_leadership_status, tenants, nodes: Arc::new(nodes), + safekeepers: Arc::new(safekeepers), scheduler, ongoing_operation: None, delayed_reconcile_rx, @@ -299,6 +306,23 @@ impl ServiceState { (&mut self.nodes, &mut self.tenants, &mut self.scheduler) } + #[allow(clippy::type_complexity)] + fn parts_mut_sk( + &mut self, + ) -> ( + &mut Arc>, + &mut Arc>, + &mut BTreeMap, + &mut Scheduler, + ) { + ( + &mut self.nodes, + &mut self.safekeepers, + &mut self.tenants, + &mut self.scheduler, + ) + } + fn get_leadership_status(&self) -> LeadershipStatus { self.leadership_status } @@ -397,7 +421,8 @@ pub struct Service { compute_hook: Arc, result_tx: tokio::sync::mpsc::UnboundedSender, - heartbeater: Heartbeater, + heartbeater_ps: Heartbeater, + heartbeater_sk: Heartbeater, // Channel for background cleanup from failed operations that require cleanup, such as shard split abort_tx: tokio::sync::mpsc::UnboundedSender, @@ -607,7 +632,8 @@ impl Service { let locked = self.inner.read().unwrap(); locked.nodes.clone() }; - let mut nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await; + let (mut nodes_online, mut sks_online) = + self.initial_heartbeat_round(all_nodes.keys()).await; // List of tenants for which we will attempt to notify compute of their location at startup let mut compute_notifications = Vec::new(); @@ -616,7 +642,7 @@ impl Service { tracing::info!("Populating tenant shards' states from initial pageserver scan..."); let shard_count = { let mut locked = self.inner.write().unwrap(); - let (nodes, tenants, scheduler) = locked.parts_mut(); + let (nodes, safekeepers, tenants, scheduler) = locked.parts_mut_sk(); // Mark nodes online if they responded to us: nodes are offline by default after a restart. let mut new_nodes = (**nodes).clone(); @@ -628,6 +654,17 @@ impl Service { } *nodes = Arc::new(new_nodes); + let mut new_sks = (**safekeepers).clone(); + for (node_id, node) in new_sks.iter_mut() { + if let Some((utilization, last_seen_at)) = sks_online.remove(node_id) { + node.set_availability(SafekeeperState::Available { + utilization, + last_seen_at, + }); + } + } + *safekeepers = Arc::new(new_sks); + for (tenant_shard_id, observed_state) in observed.0 { let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else { for node_id in observed_state.locations.keys() { @@ -736,7 +773,10 @@ impl Service { async fn initial_heartbeat_round<'a>( &self, node_ids: impl Iterator, - ) -> HashMap { + ) -> ( + HashMap, + HashMap, + ) { assert!(!self.startup_complete.is_ready()); let all_nodes = { @@ -756,14 +796,20 @@ impl Service { } } + let all_sks = { + let locked = self.inner.read().unwrap(); + locked.safekeepers.clone() + }; + tracing::info!("Sending initial heartbeats..."); - let res = self - .heartbeater + let res_ps = self + .heartbeater_ps .heartbeat(Arc::new(nodes_to_heartbeat)) .await; + let res_sk = self.heartbeater_sk.heartbeat(all_sks).await; let mut online_nodes = HashMap::new(); - if let Ok(deltas) = res { + if let Ok(deltas) = res_ps { for (node_id, status) in deltas.0 { match status { PageserverState::Available { utilization, .. 
} => { @@ -777,7 +823,22 @@ impl Service { } } - online_nodes + let mut online_sks = HashMap::new(); + if let Ok(deltas) = res_sk { + for (node_id, status) in deltas.0 { + match status { + SafekeeperState::Available { + utilization, + last_seen_at, + } => { + online_sks.insert(node_id, (utilization, last_seen_at)); + } + SafekeeperState::Offline => {} + } + } + } + + (online_nodes, online_sks) } /// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline. @@ -984,8 +1045,14 @@ impl Service { locked.nodes.clone() }; - let res = self.heartbeater.heartbeat(nodes).await; - if let Ok(deltas) = res { + let safekeepers = { + let locked = self.inner.read().unwrap(); + locked.safekeepers.clone() + }; + + let res_ps = self.heartbeater_ps.heartbeat(nodes).await; + let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await; + if let Ok(deltas) = res_ps { let mut to_handle = Vec::default(); for (node_id, state) in deltas.0 { @@ -1086,6 +1153,18 @@ impl Service { } } } + if let Ok(deltas) = res_sk { + let mut locked = self.inner.write().unwrap(); + let mut safekeepers = (*locked.safekeepers).clone(); + for (id, state) in deltas.0 { + let Some(sk) = safekeepers.get_mut(&id) else { + tracing::info!("Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}"); + continue; + }; + sk.set_availability(state); + } + locked.safekeepers = Arc::new(safekeepers); + } } } @@ -1311,6 +1390,17 @@ impl Service { .storage_controller_pageserver_nodes .set(nodes.len() as i64); + tracing::info!("Loading safekeepers from database..."); + let safekeepers = persistence + .list_safekeepers() + .await? + .into_iter() + .map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new())) + .collect::>(); + let safekeepers: HashMap = + safekeepers.into_iter().map(|n| (n.get_id(), n)).collect(); + tracing::info!("Loaded {} safekeepers from database.", safekeepers.len()); + tracing::info!("Loading shards from database..."); let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?; tracing::info!( @@ -1437,7 +1527,14 @@ impl Service { let cancel = CancellationToken::new(); let reconcilers_cancel = cancel.child_token(); - let heartbeater = Heartbeater::new( + let heartbeater_ps = Heartbeater::new( + config.jwt_token.clone(), + config.max_offline_interval, + config.max_warming_up_interval, + cancel.clone(), + ); + + let heartbeater_sk = Heartbeater::new( config.jwt_token.clone(), config.max_offline_interval, config.max_warming_up_interval, @@ -1453,6 +1550,7 @@ impl Service { let this = Arc::new(Self { inner: Arc::new(std::sync::RwLock::new(ServiceState::new( nodes, + safekeepers, tenants, scheduler, delayed_reconcile_rx, @@ -1462,7 +1560,8 @@ impl Service { persistence, compute_hook: Arc::new(ComputeHook::new(config.clone())), result_tx, - heartbeater, + heartbeater_ps, + heartbeater_sk, reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new( config.reconciler_concurrency, )), @@ -7661,29 +7760,54 @@ impl Service { pub(crate) async fn safekeepers_list( &self, ) -> Result, DatabaseError> { - self.persistence - .list_safekeepers() - .await? 
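// Startup above loads every persisted safekeeper row into an in-memory map keyed by
// node id, and availability starts as `Offline` until the initial heartbeat round
// reports back. A small sketch of that construction; `PersistedSafekeeper` and the
// other types are illustrative stand-ins, not the PR's `SafekeeperPersistence`.
use std::collections::HashMap;

type NodeId = u64;

/// Stand-in for a row read from the safekeepers table.
#[derive(Clone, Debug)]
struct PersistedSafekeeper {
    id: i64,
    host: String,
}

#[derive(Clone, Debug)]
enum Availability {
    Available,
    Offline,
}

#[derive(Clone, Debug)]
struct Safekeeper {
    persisted: PersistedSafekeeper,
    availability: Availability,
}

/// Everything is pessimistically `Offline` until the first heartbeat proves otherwise.
fn load_safekeepers(rows: Vec<PersistedSafekeeper>) -> HashMap<NodeId, Safekeeper> {
    rows.into_iter()
        .map(|row| {
            (
                row.id as u64,
                Safekeeper {
                    persisted: row,
                    availability: Availability::Offline,
                },
            )
        })
        .collect()
}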
- .into_iter() - .map(|v| v.as_describe_response()) - .collect::, _>>() + let locked = self.inner.read().unwrap(); + let mut list = locked + .safekeepers + .iter() + .map(|sk| sk.1.describe_response()) + .collect::, _>>()?; + list.sort_by_key(|v| v.id); + Ok(list) } pub(crate) async fn get_safekeeper( &self, id: i64, ) -> Result { - self.persistence - .safekeeper_get(id) - .await - .and_then(|v| v.as_describe_response()) + let locked = self.inner.read().unwrap(); + let sk = locked + .safekeepers + .get(&NodeId(id as u64)) + .ok_or(diesel::result::Error::NotFound)?; + sk.describe_response() } pub(crate) async fn upsert_safekeeper( &self, record: crate::persistence::SafekeeperUpsert, ) -> Result<(), DatabaseError> { - self.persistence.safekeeper_upsert(record).await + let node_id = NodeId(record.id as u64); + self.persistence.safekeeper_upsert(record.clone()).await?; + { + let mut locked = self.inner.write().unwrap(); + let mut safekeepers = (*locked.safekeepers).clone(); + match safekeepers.entry(node_id) { + std::collections::hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().update_from_record(record); + } + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(Safekeeper::from_persistence( + crate::persistence::SafekeeperPersistence::from_upsert( + record, + SkSchedulingPolicy::Pause, + ), + CancellationToken::new(), + )); + } + } + locked.safekeepers = Arc::new(safekeepers); + } + Ok(()) } pub(crate) async fn set_safekeeper_scheduling_policy( @@ -7693,7 +7817,20 @@ impl Service { ) -> Result<(), DatabaseError> { self.persistence .set_safekeeper_scheduling_policy(id, scheduling_policy) - .await + .await?; + let node_id = NodeId(id as u64); + // After the change has been persisted successfully, update the in-memory state + { + let mut locked = self.inner.write().unwrap(); + let mut safekeepers = (*locked.safekeepers).clone(); + let sk = safekeepers + .get_mut(&node_id) + .ok_or(DatabaseError::Logical("Not found".to_string()))?; + sk.skp.scheduling_policy = String::from(scheduling_policy); + + locked.safekeepers = Arc::new(safekeepers); + } + Ok(()) } pub(crate) async fn update_shards_preferred_azs(
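// `upsert_safekeeper` and `set_safekeeper_scheduling_policy` above persist the change
// first and only then patch the in-memory record, which keeps the scheduling policy
// as a string (the database column type). A minimal sketch of that string/enum
// round-trip and of the persist-then-update ordering; `Policy` and the closure-based
// `persist` argument are illustrative assumptions, not the real `SkSchedulingPolicy`
// API.
use std::str::FromStr;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Policy {
    Active,
    Pause,
}

impl FromStr for Policy {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Policy::Active),
            "pause" => Ok(Policy::Pause),
            other => Err(format!("unknown scheduling policy: {other}")),
        }
    }
}

impl From<Policy> for String {
    fn from(p: Policy) -> String {
        match p {
            Policy::Active => "active".to_string(),
            Policy::Pause => "pause".to_string(),
        }
    }
}

/// The in-memory copy is only touched after the durable write succeeds, so a failed
/// database update leaves memory and storage consistent with each other.
fn set_policy(
    persist: impl FnOnce(Policy) -> Result<(), String>,
    in_memory_policy: &mut String,
    policy: Policy,
) -> Result<(), String> {
    persist(policy)?;
    *in_memory_policy = String::from(policy);
    Ok(())
}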