mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
storcon: add storage controller peer client
This commit is contained in:
@@ -10,6 +10,7 @@ mod id_lock_map;
|
||||
pub mod metrics;
|
||||
mod node;
|
||||
mod pageserver_client;
|
||||
mod peer_client;
|
||||
pub mod persistence;
|
||||
mod reconciler;
|
||||
mod scheduler;
|
||||
|
||||
104
storage_controller/src/peer_client.rs
Normal file
104
storage_controller/src/peer_client.rs
Normal file
@@ -0,0 +1,104 @@
|
||||
use crate::tenant_shard::ObservedState;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use reqwest::{StatusCode, Url};
|
||||
use utils::{backoff, http::error::HttpErrorBody};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct PeerClient {
|
||||
hostname: String,
|
||||
port: i32,
|
||||
jwt: Option<String>,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum StorageControllerPeerError {
|
||||
#[error("failed to deserialize error response with status code {0} at {1}: {2}")]
|
||||
DeserializationError(StatusCode, Url, reqwest::Error),
|
||||
#[error("storage controller peer API error ({0}): {1}")]
|
||||
ApiError(StatusCode, String),
|
||||
#[error("failed to send HTTP request: {0}")]
|
||||
SendError(reqwest::Error),
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
|
||||
|
||||
pub(crate) trait ResponseErrorMessageExt: Sized {
|
||||
fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
|
||||
}
|
||||
|
||||
impl ResponseErrorMessageExt for reqwest::Response {
|
||||
async fn error_from_body(self) -> Result<Self> {
|
||||
let status = self.status();
|
||||
if !(status.is_client_error() || status.is_server_error()) {
|
||||
return Ok(self);
|
||||
}
|
||||
|
||||
let url = self.url().to_owned();
|
||||
Err(match self.json::<HttpErrorBody>().await {
|
||||
Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
|
||||
Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
|
||||
|
||||
impl PeerClient {
|
||||
pub(crate) fn new(hostname: String, port: i32, jwt: Option<String>) -> Self {
|
||||
Self {
|
||||
hostname,
|
||||
port,
|
||||
jwt,
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_step_down(&self) -> Result<GlobalObservedState> {
|
||||
let uri = format!("{}:{}/control/v1/step_down", self.hostname, self.port);
|
||||
let req = self.client.put(uri);
|
||||
let req = if let Some(jwt) = &self.jwt {
|
||||
req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
|
||||
} else {
|
||||
req
|
||||
};
|
||||
|
||||
let res = req
|
||||
.send()
|
||||
.await
|
||||
.map_err(StorageControllerPeerError::SendError)?;
|
||||
let response = res.error_from_body().await?;
|
||||
|
||||
let status = response.status();
|
||||
let url = response.url().to_owned();
|
||||
|
||||
response
|
||||
.json()
|
||||
.await
|
||||
.map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
|
||||
}
|
||||
|
||||
pub(crate) async fn step_down(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<GlobalObservedState> {
|
||||
backoff::retry(
|
||||
|| self.request_step_down(),
|
||||
|_e| false,
|
||||
4,
|
||||
8,
|
||||
"Send step down request",
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.ok_or_else(|| StorageControllerPeerError::Cancelled)
|
||||
.and_then(|x| x)
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@ use crate::{
|
||||
compute_hook::NotifyError,
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
|
||||
metrics::LeadershipStatusGroup,
|
||||
peer_client::GlobalObservedState,
|
||||
persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
|
||||
reconciler::{ReconcileError, ReconcileUnits},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
|
||||
@@ -82,7 +83,6 @@ use crate::{
|
||||
ReconcilerWaiter, TenantShard,
|
||||
},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
// For operations that should be quick, like attaching a new tenant
|
||||
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
@@ -491,10 +491,6 @@ pub(crate) enum ReconcileResultRequest {
|
||||
Stop,
|
||||
}
|
||||
|
||||
// TODO: move this into the storcon peer client when that gets added
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub(crate) struct GlobalObservedState(HashMap<TenantShardId, ObservedState>);
|
||||
|
||||
impl Service {
|
||||
pub fn get_config(&self) -> &Config {
|
||||
&self.config
|
||||
|
||||
Reference in New Issue
Block a user