storcon: track safekeepers in memory, send heartbeats to them (#10583)

In #9011, we want to schedule timelines to safekeepers. To do such
scheduling, we need to know how utilized a safekeeper is and whether it
is available.

Therefore, send periodic heartbeats to the safekeepers and try to figure
out whether they are online.
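
The availability rule mirrors the one already used for pageserver heartbeats: every round yields a set of deltas, and a safekeeper that stops responding is only reported offline once it has been silent for longer than a configured grace period (`max_offline_interval`). Below is a minimal illustrative sketch of that rule; the helper name `availability_delta` and the simplified `SafekeeperState` are not the actual storcon API, which also carries the reported `SafekeeperUtilization`:

```rust
use std::time::{Duration, Instant};

// Simplified per-safekeeper availability state; the real variant also
// stores the utilization reported by the node.
#[derive(Clone, Debug)]
enum SafekeeperState {
    Available { last_seen_at: Instant },
    Offline,
}

/// Decide which state change, if any, to publish after a heartbeat round,
/// given the last known state of the node.
fn availability_delta(
    previous: Option<&SafekeeperState>,
    observed: &SafekeeperState,
    now: Instant,
    max_offline_interval: Duration,
) -> Option<SafekeeperState> {
    match (previous, observed) {
        // First time we hear about this node: record whatever we observed.
        (None, _) => Some(observed.clone()),
        // Still offline: nothing new to report.
        (Some(SafekeeperState::Offline), SafekeeperState::Offline) => None,
        // Went silent: only report it offline once it has been unreachable
        // for at least `max_offline_interval`.
        (Some(SafekeeperState::Available { last_seen_at }), SafekeeperState::Offline) => {
            (now - *last_seen_at >= max_offline_interval).then(|| observed.clone())
        }
        // Responded (again): always publish the fresh state.
        _ => Some(observed.clone()),
    }
}
```

In the diff below, this logic lives in the new `HeartBeat<Safekeeper, SafekeeperState>` implementation for `HeartbeaterTask`.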

Includes some code from #10440.
Author: Arpad Müller
Date: 2025-02-13 12:06:30 +01:00
Committed by: GitHub
Parent: b8095f84a0
Commit: 536bdb3209
10 changed files with 613 additions and 65 deletions

Cargo.lock

@@ -6464,6 +6464,8 @@ dependencies = [
"routerify",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"safekeeper_api",
"safekeeper_client",
"scoped-futures",
"scopeguard",
"serde",


@@ -5,7 +5,7 @@
use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
use std::error::Error as _;
use utils::{
id::{NodeId, TenantId, TimelineId},
@@ -32,6 +32,9 @@ pub enum Error {
/// Status is not ok; parsed error in body as `HttpErrorBody`.
#[error("safekeeper API: {1}")]
ApiError(StatusCode, String),
#[error("Cancelled")]
Cancelled,
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -124,9 +127,10 @@ impl Client {
self.get(&uri).await
}
pub async fn utilization(&self) -> Result<reqwest::Response> {
pub async fn utilization(&self) -> Result<SafekeeperUtilization> {
let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
self.get(&uri).await
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
async fn post<B: serde::Serialize, U: IntoUrl>(


@@ -32,6 +32,8 @@ postgres_connection.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
rustls-native-certs.workspace = true
serde.workspace = true
serde_json.workspace = true


@@ -1,6 +1,10 @@
use futures::{stream::FuturesUnordered, StreamExt};
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use std::{
collections::HashMap,
fmt::Debug,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
@@ -9,15 +13,15 @@ use tokio_util::sync::CancellationToken;
use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
use thiserror::Error;
use utils::id::NodeId;
use utils::{id::NodeId, logging::SecretString};
use crate::node::Node;
use crate::{node::Node, safekeeper::Safekeeper};
struct HeartbeaterTask {
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
struct HeartbeaterTask<Server, State> {
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
cancel: CancellationToken,
state: HashMap<NodeId, PageserverState>,
state: HashMap<NodeId, State>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
@@ -36,8 +40,17 @@ pub(crate) enum PageserverState {
Offline,
}
#[derive(Debug, Clone)]
pub(crate) enum SafekeeperState {
Available {
last_seen_at: Instant,
utilization: SafekeeperUtilization,
},
Offline,
}
#[derive(Debug)]
pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
#[derive(Debug, Error)]
pub(crate) enum HeartbeaterError {
@@ -45,23 +58,28 @@ pub(crate) enum HeartbeaterError {
Cancel,
}
struct HeartbeatRequest {
pageservers: Arc<HashMap<NodeId, Node>>,
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
struct HeartbeatRequest<Server, State> {
servers: Arc<HashMap<NodeId, Server>>,
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
}
pub(crate) struct Heartbeater {
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
pub(crate) struct Heartbeater<Server, State> {
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
}
impl Heartbeater {
#[allow(private_bounds)]
impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
pub(crate) fn new(
jwt_token: Option<String>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
) -> Self {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
let (sender, receiver) =
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
let mut heartbeater = HeartbeaterTask::new(
receiver,
jwt_token,
@@ -76,12 +94,12 @@ impl Heartbeater {
pub(crate) async fn heartbeat(
&self,
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas, HeartbeaterError> {
servers: Arc<HashMap<NodeId, Server>>,
) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
.send(HeartbeatRequest {
pageservers,
servers,
reply: sender,
})
.map_err(|_| HeartbeaterError::Cancel)?;
@@ -93,9 +111,12 @@ impl Heartbeater {
}
}
impl HeartbeaterTask {
impl<Server, State: Debug> HeartbeaterTask<Server, State>
where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
jwt_token: Option<String>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
@@ -110,14 +131,13 @@ impl HeartbeaterTask {
jwt_token,
}
}
async fn run(&mut self) {
loop {
tokio::select! {
request = self.receiver.recv() => {
match request {
Some(req) => {
let res = self.heartbeat(req.pageservers).await;
let res = self.heartbeat(req.servers).await;
req.reply.send(res).unwrap();
},
None => { return; }
@@ -127,11 +147,20 @@ impl HeartbeaterTask {
}
}
}
}
pub(crate) trait HeartBeat<Server, State> {
fn heartbeat(
&mut self,
pageservers: Arc<HashMap<NodeId, Server>>,
) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
}
impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
async fn heartbeat(
&mut self,
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas, HeartbeaterError> {
) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
let mut new_state = HashMap::new();
let mut heartbeat_futs = FuturesUnordered::new();
@@ -272,3 +301,121 @@ impl HeartbeaterTask {
Ok(AvailablityDeltas(deltas))
}
}
impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
async fn heartbeat(
&mut self,
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
let mut new_state = HashMap::new();
let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, sk) in &*safekeepers {
heartbeat_futs.push({
let jwt_token = self
.jwt_token
.as_ref()
.map(|t| SecretString::from(t.to_owned()));
let cancel = self.cancel.clone();
async move {
let response = sk
.with_client_retries(
|client| async move { client.get_utilization().await },
&jwt_token,
3,
3,
Duration::from_secs(1),
&cancel,
)
.await;
let status = match response {
Ok(utilization) => SafekeeperState::Available {
last_seen_at: Instant::now(),
utilization,
},
Err(mgmt_api::Error::Cancelled) => {
// This indicates cancellation of the request.
// We ignore the node in this case.
return None;
}
Err(_) => SafekeeperState::Offline,
};
Some((*node_id, status))
}
});
loop {
let maybe_status = tokio::select! {
next = heartbeat_futs.next() => {
match next {
Some(result) => result,
None => { break; }
}
},
_ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
};
if let Some((node_id, status)) = maybe_status {
new_state.insert(node_id, status);
}
}
}
let mut offline = 0;
for state in new_state.values() {
match state {
SafekeeperState::Offline { .. } => offline += 1,
SafekeeperState::Available { .. } => {}
}
}
tracing::info!(
"Heartbeat round complete for {} safekeepers, {} offline",
new_state.len(),
offline
);
let mut deltas = Vec::new();
let now = Instant::now();
for (node_id, sk_state) in new_state.iter_mut() {
use std::collections::hash_map::Entry::*;
let entry = self.state.entry(*node_id);
let mut needs_update = false;
match entry {
Occupied(ref occ) => match (occ.get(), &sk_state) {
(SafekeeperState::Offline, SafekeeperState::Offline) => {}
(SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
if now - *last_seen_at >= self.max_offline_interval {
deltas.push((*node_id, sk_state.clone()));
needs_update = true;
}
}
_ => {
deltas.push((*node_id, sk_state.clone()));
needs_update = true;
}
},
Vacant(_) => {
// This is a new node: report its initial state as a delta.
deltas.push((*node_id, sk_state.clone()));
}
}
match entry {
Occupied(mut occ) if needs_update => {
(*occ.get_mut()) = sk_state.clone();
}
Vacant(vac) => {
vac.insert(sk_state.clone());
}
_ => {}
}
}
Ok(AvailablityDeltas(deltas))
}
}


@@ -17,6 +17,8 @@ mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod safekeeper;
mod safekeeper_client;
mod scheduler;
mod schema;
pub mod service;


@@ -80,6 +80,11 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This includes both successful and unsuccessful
/// requests.
@@ -87,6 +92,13 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
/// node id, request name and method. This includes both successful and unsuccessful
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
pub(crate) storage_controller_passthrough_request_error:


@@ -1185,23 +1185,6 @@ impl Persistence {
Ok(safekeepers)
}
pub(crate) async fn safekeeper_get(
&self,
id: i64,
) -> Result<SafekeeperPersistence, DatabaseError> {
use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
self.with_conn(move |conn| {
Box::pin(async move {
Ok(safekeepers
.filter(id_column.eq(&id))
.select(SafekeeperPersistence::as_select())
.get_result(conn)
.await?)
})
})
.await
}
pub(crate) async fn safekeeper_upsert(
&self,
record: SafekeeperUpsert,
@@ -1554,6 +1537,21 @@ pub(crate) struct SafekeeperPersistence {
}
impl SafekeeperPersistence {
pub(crate) fn from_upsert(
upsert: SafekeeperUpsert,
scheduling_policy: SkSchedulingPolicy,
) -> Self {
crate::persistence::SafekeeperPersistence {
id: upsert.id,
region_id: upsert.region_id,
version: upsert.version,
host: upsert.host,
port: upsert.port,
http_port: upsert.http_port,
availability_zone_id: upsert.availability_zone_id,
scheduling_policy: String::from(scheduling_policy),
}
}
pub(crate) fn as_describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
let scheduling_policy =
SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| {


@@ -0,0 +1,139 @@
use std::{str::FromStr, time::Duration};
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use reqwest::StatusCode;
use safekeeper_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::{backoff, id::NodeId, logging::SecretString};
use crate::{
heartbeater::SafekeeperState,
persistence::{DatabaseError, SafekeeperPersistence},
safekeeper_client::SafekeeperClient,
};
#[derive(Clone)]
pub struct Safekeeper {
pub(crate) skp: SafekeeperPersistence,
cancel: CancellationToken,
listen_http_addr: String,
listen_http_port: u16,
id: NodeId,
availability: SafekeeperState,
}
impl Safekeeper {
pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
Self {
cancel,
listen_http_addr: skp.host.clone(),
listen_http_port: skp.http_port as u16,
id: NodeId(skp.id as u64),
skp,
availability: SafekeeperState::Offline,
}
}
pub(crate) fn base_url(&self) -> String {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
}
pub(crate) fn get_id(&self) -> NodeId {
self.id
}
pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
self.skp.as_describe_response()
}
pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
self.availability = availability;
}
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
jwt: &Option<SecretString>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
) -> mgmt_api::Result<T>
where
O: FnMut(SafekeeperClient) -> F,
F: std::future::Future<Output = mgmt_api::Result<T>>,
{
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
}
}
backoff::retry(
|| {
let http_client = reqwest::ClientBuilder::new()
.timeout(timeout)
.build()
.expect("Failed to construct HTTP client");
let client = SafekeeperClient::from_client(
self.get_id(),
http_client,
self.base_url(),
jwt.clone(),
);
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
async {
tokio::select! {
r = op_fut=> {r},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}
}
},
is_fatal,
warn_threshold,
max_retries,
&format!(
"Call to node {} ({}:{}) management API",
self.id, self.listen_http_addr, self.listen_http_port
),
cancel,
)
.await
.unwrap_or(Err(mgmt_api::Error::Cancelled))
}
pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) {
let crate::persistence::SafekeeperUpsert {
active: _,
availability_zone_id: _,
host,
http_port,
id,
port: _,
region_id: _,
version: _,
} = record.clone();
if id != self.id.0 as i64 {
// The way the function is called ensures this. If we regress on that, it's a bug.
panic!(
"id can't be changed via update_from_record function: {id} != {}",
self.id.0
);
}
self.skp = crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(),
);
self.listen_http_port = http_port as u16;
self.listen_http_addr = host;
}
}


@@ -0,0 +1,105 @@
use crate::metrics::PageserverRequestLabelGroup;
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
///
/// Analogous to [`crate::pageserver_client::PageserverClient`].
#[derive(Debug, Clone)]
pub(crate) struct SafekeeperClient {
inner: Client,
node_id_label: String,
}
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};
let latency = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_latency;
let _timer_guard = latency.start_timer(labels.clone());
let res = $invoke;
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_error;
error_counters.inc(labels)
}
res
}};
}
impl SafekeeperClient {
#[allow(dead_code)]
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
#[allow(dead_code)]
pub(crate) async fn create_timeline(
&self,
req: &TimelineCreateRequest,
) -> Result<TimelineStatus> {
measured_request!(
"create_timeline",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.create_timeline(req).await
)
}
#[allow(dead_code)]
pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineStatus> {
measured_request!(
"delete_timeline",
crate::metrics::Method::Delete,
&self.node_id_label,
self.inner.delete_timeline(tenant_id, timeline_id).await
)
}
pub(crate) async fn get_utilization(&self) -> Result<SafekeeperUtilization> {
measured_request!(
"utilization",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.utilization().await
)
}
}


@@ -2,6 +2,7 @@ pub mod chaos_injector;
mod context_iterator;
use hyper::Uri;
use safekeeper_api::models::SafekeeperUtilization;
use std::{
borrow::Cow,
cmp::Ordering,
@@ -20,6 +21,7 @@ use crate::{
},
compute_hook::{self, NotifyError},
drain_utils::{self, TenantShardDrain, TenantShardIterator},
heartbeater::SafekeeperState,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
leadership::Leadership,
metrics,
@@ -29,6 +31,7 @@ use crate::{
ShardGenerationState, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
safekeeper::Safekeeper,
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
@@ -206,6 +209,8 @@ struct ServiceState {
nodes: Arc<HashMap<NodeId, Node>>,
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
scheduler: Scheduler,
/// Ongoing background operation on the cluster if any is running.
@@ -272,6 +277,7 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
impl ServiceState {
fn new(
nodes: HashMap<NodeId, Node>,
safekeepers: HashMap<NodeId, Safekeeper>,
tenants: BTreeMap<TenantShardId, TenantShard>,
scheduler: Scheduler,
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
@@ -283,6 +289,7 @@ impl ServiceState {
leadership_status: initial_leadership_status,
tenants,
nodes: Arc::new(nodes),
safekeepers: Arc::new(safekeepers),
scheduler,
ongoing_operation: None,
delayed_reconcile_rx,
@@ -299,6 +306,23 @@ impl ServiceState {
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
}
#[allow(clippy::type_complexity)]
fn parts_mut_sk(
&mut self,
) -> (
&mut Arc<HashMap<NodeId, Node>>,
&mut Arc<HashMap<NodeId, Safekeeper>>,
&mut BTreeMap<TenantShardId, TenantShard>,
&mut Scheduler,
) {
(
&mut self.nodes,
&mut self.safekeepers,
&mut self.tenants,
&mut self.scheduler,
)
}
fn get_leadership_status(&self) -> LeadershipStatus {
self.leadership_status
}
@@ -397,7 +421,8 @@ pub struct Service {
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
heartbeater: Heartbeater,
heartbeater_ps: Heartbeater<Node, PageserverState>,
heartbeater_sk: Heartbeater<Safekeeper, SafekeeperState>,
// Channel for background cleanup from failed operations that require cleanup, such as shard split
abort_tx: tokio::sync::mpsc::UnboundedSender<TenantShardSplitAbort>,
@@ -607,7 +632,8 @@ impl Service {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
let mut nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
let (mut nodes_online, mut sks_online) =
self.initial_heartbeat_round(all_nodes.keys()).await;
// List of tenants for which we will attempt to notify compute of their location at startup
let mut compute_notifications = Vec::new();
@@ -616,7 +642,7 @@ impl Service {
tracing::info!("Populating tenant shards' states from initial pageserver scan...");
let shard_count = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let (nodes, safekeepers, tenants, scheduler) = locked.parts_mut_sk();
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
let mut new_nodes = (**nodes).clone();
@@ -628,6 +654,17 @@ impl Service {
}
*nodes = Arc::new(new_nodes);
let mut new_sks = (**safekeepers).clone();
for (node_id, node) in new_sks.iter_mut() {
if let Some((utilization, last_seen_at)) = sks_online.remove(node_id) {
node.set_availability(SafekeeperState::Available {
utilization,
last_seen_at,
});
}
}
*safekeepers = Arc::new(new_sks);
for (tenant_shard_id, observed_state) in observed.0 {
let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
for node_id in observed_state.locations.keys() {
@@ -736,7 +773,10 @@ impl Service {
async fn initial_heartbeat_round<'a>(
&self,
node_ids: impl Iterator<Item = &'a NodeId>,
) -> HashMap<NodeId, PageserverUtilization> {
) -> (
HashMap<NodeId, PageserverUtilization>,
HashMap<NodeId, (SafekeeperUtilization, Instant)>,
) {
assert!(!self.startup_complete.is_ready());
let all_nodes = {
@@ -756,14 +796,20 @@ impl Service {
}
}
let all_sks = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};
tracing::info!("Sending initial heartbeats...");
let res = self
.heartbeater
let res_ps = self
.heartbeater_ps
.heartbeat(Arc::new(nodes_to_heartbeat))
.await;
let res_sk = self.heartbeater_sk.heartbeat(all_sks).await;
let mut online_nodes = HashMap::new();
if let Ok(deltas) = res {
if let Ok(deltas) = res_ps {
for (node_id, status) in deltas.0 {
match status {
PageserverState::Available { utilization, .. } => {
@@ -777,7 +823,22 @@ impl Service {
}
}
online_nodes
let mut online_sks = HashMap::new();
if let Ok(deltas) = res_sk {
for (node_id, status) in deltas.0 {
match status {
SafekeeperState::Available {
utilization,
last_seen_at,
} => {
online_sks.insert(node_id, (utilization, last_seen_at));
}
SafekeeperState::Offline => {}
}
}
}
(online_nodes, online_sks)
}
/// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
@@ -984,8 +1045,14 @@ impl Service {
locked.nodes.clone()
};
let res = self.heartbeater.heartbeat(nodes).await;
if let Ok(deltas) = res {
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};
let res_ps = self.heartbeater_ps.heartbeat(nodes).await;
let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await;
if let Ok(deltas) = res_ps {
let mut to_handle = Vec::default();
for (node_id, state) in deltas.0 {
@@ -1086,6 +1153,18 @@ impl Service {
}
}
}
if let Ok(deltas) = res_sk {
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
for (id, state) in deltas.0 {
let Some(sk) = safekeepers.get_mut(&id) else {
tracing::info!("Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}");
continue;
};
sk.set_availability(state);
}
locked.safekeepers = Arc::new(safekeepers);
}
}
}
@@ -1311,6 +1390,17 @@ impl Service {
.storage_controller_pageserver_nodes
.set(nodes.len() as i64);
tracing::info!("Loading safekeepers from database...");
let safekeepers = persistence
.list_safekeepers()
.await?
.into_iter()
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
.collect::<Vec<_>>();
let safekeepers: HashMap<NodeId, Safekeeper> =
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
tracing::info!("Loading shards from database...");
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
tracing::info!(
@@ -1437,7 +1527,14 @@ impl Service {
let cancel = CancellationToken::new();
let reconcilers_cancel = cancel.child_token();
let heartbeater = Heartbeater::new(
let heartbeater_ps = Heartbeater::new(
config.jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
);
let heartbeater_sk = Heartbeater::new(
config.jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
@@ -1453,6 +1550,7 @@ impl Service {
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
nodes,
safekeepers,
tenants,
scheduler,
delayed_reconcile_rx,
@@ -1462,7 +1560,8 @@ impl Service {
persistence,
compute_hook: Arc::new(ComputeHook::new(config.clone())),
result_tx,
heartbeater,
heartbeater_ps,
heartbeater_sk,
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
config.reconciler_concurrency,
)),
@@ -7661,29 +7760,54 @@ impl Service {
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
self.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|v| v.as_describe_response())
.collect::<Result<Vec<_>, _>>()
let locked = self.inner.read().unwrap();
let mut list = locked
.safekeepers
.iter()
.map(|sk| sk.1.describe_response())
.collect::<Result<Vec<_>, _>>()?;
list.sort_by_key(|v| v.id);
Ok(list)
}
pub(crate) async fn get_safekeeper(
&self,
id: i64,
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
self.persistence
.safekeeper_get(id)
.await
.and_then(|v| v.as_describe_response())
let locked = self.inner.read().unwrap();
let sk = locked
.safekeepers
.get(&NodeId(id as u64))
.ok_or(diesel::result::Error::NotFound)?;
sk.describe_response()
}
pub(crate) async fn upsert_safekeeper(
&self,
record: crate::persistence::SafekeeperUpsert,
) -> Result<(), DatabaseError> {
self.persistence.safekeeper_upsert(record).await
let node_id = NodeId(record.id as u64);
self.persistence.safekeeper_upsert(record.clone()).await?;
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
match safekeepers.entry(node_id) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().update_from_record(record);
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(Safekeeper::from_persistence(
crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::Pause,
),
CancellationToken::new(),
));
}
}
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
}
pub(crate) async fn set_safekeeper_scheduling_policy(
@@ -7693,7 +7817,20 @@ impl Service {
) -> Result<(), DatabaseError> {
self.persistence
.set_safekeeper_scheduling_policy(id, scheduling_policy)
.await
.await?;
let node_id = NodeId(id as u64);
// After the change has been persisted successfully, update the in-memory state
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
let sk = safekeepers
.get_mut(&node_id)
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
sk.skp.scheduling_policy = String::from(scheduling_policy);
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
}
pub(crate) async fn update_shards_preferred_azs(