mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 14:32:57 +00:00
## Problem
We saw the following scenario in staging:
1. Pod A starts up. Becomes leader and steps down the previous pod
cleanly.
2. Pod B starts up (deployment).
3. Step down request from pod B to pod A times out. Pod A did not manage
to stop its reconciliations within 10 seconds and exited with return
code 1
([code](7ba8519b43/storage_controller/src/service.rs (L8686-L8702))).
4. Pod B marks itself as the leader and finishes start-up
5. k8s restarts pod A
6. k8s marks pod B as ready
7. pod A sends step down request to pod B - this succeeds => pod A is
now the leader
8. k8s kills pod A because it thinks pod B is healthy and pod A is part
of the old replica set
We end up in a situation where the only pod we have (B) is stepped down
and attempts to forward requests to a leader that doesn't exist. k8s
can't detect that pod B is in a bad state since the /status endpoint
simply returns 200 if the pod is running.
## Summary of changes
This PR includes a number of robustness improvements to the leadership
protocol:
* use a single step down task per controller
* add a new endpoint to be used as k8s liveness probe and check
leadership status there
* handle restarts explicitly (i.e. don't step yourself down)
* increase the step down retry count
* don't kill the process on long step down since k8s will just restart
it
118 lines
3.6 KiB
Rust
118 lines
3.6 KiB
Rust
use std::collections::HashMap;
|
|
use std::error::Error as _;
|
|
use std::time::Duration;
|
|
|
|
use http_utils::error::HttpErrorBody;
|
|
use hyper::Uri;
|
|
use pageserver_api::shard::TenantShardId;
|
|
use reqwest::{StatusCode, Url};
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio_util::sync::CancellationToken;
|
|
use utils::backoff;
|
|
|
|
use crate::tenant_shard::ObservedState;
|
|
|
|
/// HTTP client for talking to a peer storage controller instance,
/// e.g. asking the previous leader to step down during a leadership handover.
#[derive(Debug, Clone)]
pub(crate) struct PeerClient {
    // Base URI of the peer's API; assumed to end with a trailing slash,
    // since paths are appended directly to it (see `request_step_down`).
    uri: Uri,
    // Optional bearer token attached to every request.
    jwt: Option<String>,
    // Underlying HTTP client (shared, cheap to clone).
    client: reqwest::Client,
}
|
|
|
|
/// Errors produced when calling a peer storage controller.
#[derive(thiserror::Error, Debug)]
pub(crate) enum StorageControllerPeerError {
    /// The peer returned an error status but the body could not be parsed as
    /// an `HttpErrorBody`; the underlying error's source (if any) is appended.
    #[error(
        "failed to deserialize error response with status code {0} at {1}: {2}{}",
        .2.source().map(|e| format!(": {e}")).unwrap_or_default()
    )]
    DeserializationError(StatusCode, Url, reqwest::Error),
    /// The peer returned an error status with a well-formed JSON error body.
    #[error("storage controller peer API error ({0}): {1}")]
    ApiError(StatusCode, String),
    /// The request could not be sent or timed out; the underlying error's
    /// source (if any) is appended.
    #[error("failed to send HTTP request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
    SendError(reqwest::Error),
    /// The operation was cancelled via the caller-supplied cancellation token.
    #[error("Cancelled")]
    Cancelled,
}
|
|
|
|
pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
|
|
|
|
/// Extension trait for turning HTTP error responses into typed errors.
pub(crate) trait ResponseErrorMessageExt: Sized {
    /// Returns `Ok(self)` for non-error statuses; otherwise consumes the
    /// response and converts its body into a `StorageControllerPeerError`.
    fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
}
|
|
|
|
impl ResponseErrorMessageExt for reqwest::Response {
|
|
async fn error_from_body(self) -> Result<Self> {
|
|
let status = self.status();
|
|
if !(status.is_client_error() || status.is_server_error()) {
|
|
return Ok(self);
|
|
}
|
|
|
|
let url = self.url().to_owned();
|
|
Err(match self.json::<HttpErrorBody>().await {
|
|
Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
|
|
Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Observed state for every tenant shard, keyed by shard id.
/// This is the payload a peer returns from its step-down endpoint
/// (see `PeerClient::request_step_down`).
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
|
|
|
|
// Maximum number of step-down attempts made by `PeerClient::step_down`.
const STEP_DOWN_RETRIES: u32 = 8;

// Per-attempt timeout applied to each individual step-down HTTP request.
const STEP_DOWN_TIMEOUT: Duration = Duration::from_secs(1);
|
|
|
|
impl PeerClient {
|
|
pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
|
|
Self {
|
|
uri,
|
|
jwt,
|
|
client: http_client,
|
|
}
|
|
}
|
|
|
|
async fn request_step_down(&self) -> Result<GlobalObservedState> {
|
|
let step_down_path = format!("{}control/v1/step_down", self.uri);
|
|
let req = self.client.put(step_down_path);
|
|
let req = if let Some(jwt) = &self.jwt {
|
|
req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
|
|
} else {
|
|
req
|
|
};
|
|
|
|
let req = req.timeout(STEP_DOWN_TIMEOUT);
|
|
|
|
let res = req
|
|
.send()
|
|
.await
|
|
.map_err(StorageControllerPeerError::SendError)?;
|
|
let response = res.error_from_body().await?;
|
|
|
|
let status = response.status();
|
|
let url = response.url().to_owned();
|
|
|
|
response
|
|
.json()
|
|
.await
|
|
.map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
|
|
}
|
|
|
|
/// Request the peer to step down and return its current observed state
|
|
/// All errors are re-tried
|
|
pub(crate) async fn step_down(
|
|
&self,
|
|
cancel: &CancellationToken,
|
|
) -> Result<GlobalObservedState> {
|
|
backoff::retry(
|
|
|| self.request_step_down(),
|
|
|_e| false,
|
|
2,
|
|
STEP_DOWN_RETRIES,
|
|
"Send step down request",
|
|
cancel,
|
|
)
|
|
.await
|
|
.ok_or_else(|| StorageControllerPeerError::Cancelled)
|
|
.and_then(|x| x)
|
|
}
|
|
}
|