From dd7cafdd97d7371a1b970a7cb66adf0b538d9ca4 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 29 Jul 2024 13:10:37 +0100 Subject: [PATCH] storcon: add storage controller peer client --- storage_controller/src/lib.rs | 1 + storage_controller/src/peer_client.rs | 104 ++++++++++++++++++++++++++ storage_controller/src/service.rs | 6 +- 3 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 storage_controller/src/peer_client.rs diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs index 8caf638904..c4d333e07d 100644 --- a/storage_controller/src/lib.rs +++ b/storage_controller/src/lib.rs @@ -10,6 +10,7 @@ mod id_lock_map; pub mod metrics; mod node; mod pageserver_client; +mod peer_client; pub mod persistence; mod reconciler; mod scheduler; diff --git a/storage_controller/src/peer_client.rs b/storage_controller/src/peer_client.rs new file mode 100644 index 0000000000..8cdc970f69 --- /dev/null +++ b/storage_controller/src/peer_client.rs @@ -0,0 +1,104 @@ +use crate::tenant_shard::ObservedState; +use pageserver_api::shard::TenantShardId; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tokio_util::sync::CancellationToken; + +use reqwest::{StatusCode, Url}; +use utils::{backoff, http::error::HttpErrorBody}; + +#[derive(Debug, Clone)] +pub(crate) struct PeerClient { + hostname: String, + port: i32, + jwt: Option, + client: reqwest::Client, +} + +#[derive(thiserror::Error, Debug)] +pub(crate) enum StorageControllerPeerError { + #[error("failed to deserialize error response with status code {0} at {1}: {2}")] + DeserializationError(StatusCode, Url, reqwest::Error), + #[error("storage controller peer API error ({0}): {1}")] + ApiError(StatusCode, String), + #[error("failed to send HTTP request: {0}")] + SendError(reqwest::Error), + #[error("Cancelled")] + Cancelled, +} + +pub(crate) type Result = std::result::Result; + +pub(crate) trait ResponseErrorMessageExt: Sized { + fn error_from_body(self) -> impl std::future::Future> + Send; +} + +impl ResponseErrorMessageExt for reqwest::Response { + async fn error_from_body(self) -> Result { + let status = self.status(); + if !(status.is_client_error() || status.is_server_error()) { + return Ok(self); + } + + let url = self.url().to_owned(); + Err(match self.json::().await { + Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg), + Err(err) => StorageControllerPeerError::DeserializationError(status, url, err), + }) + } +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub(crate) struct GlobalObservedState(pub(crate) HashMap); + +impl PeerClient { + pub(crate) fn new(hostname: String, port: i32, jwt: Option) -> Self { + Self { + hostname, + port, + jwt, + client: reqwest::Client::new(), + } + } + + async fn request_step_down(&self) -> Result { + let uri = format!("{}:{}/control/v1/step_down", self.hostname, self.port); + let req = self.client.put(uri); + let req = if let Some(jwt) = &self.jwt { + req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}")) + } else { + req + }; + + let res = req + .send() + .await + .map_err(StorageControllerPeerError::SendError)?; + let response = res.error_from_body().await?; + + let status = response.status(); + let url = response.url().to_owned(); + + response + .json() + .await + .map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err)) + } + + pub(crate) async fn step_down( + &self, + cancel: &CancellationToken, + ) -> Result { + backoff::retry( + || self.request_step_down(), + |_e| false, + 4, + 8, + "Send step down request", + cancel, + ) + .await + .ok_or_else(|| StorageControllerPeerError::Cancelled) + .and_then(|x| x) + } +} diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 050fbd537b..307e08e444 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -16,6 +16,7 @@ use crate::{ compute_hook::NotifyError, id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard}, metrics::LeadershipStatusGroup, + peer_client::GlobalObservedState, persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter}, reconciler::{ReconcileError, ReconcileUnits}, scheduler::{MaySchedule, ScheduleContext, ScheduleMode}, @@ -82,7 +83,6 @@ use crate::{ ReconcilerWaiter, TenantShard, }, }; -use serde::{Deserialize, Serialize}; // For operations that should be quick, like attaching a new tenant const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5); @@ -491,10 +491,6 @@ pub(crate) enum ReconcileResultRequest { Stop, } -// TODO: move this into the storcon peer client when that gets added -#[derive(Serialize, Deserialize, Debug, Default)] -pub(crate) struct GlobalObservedState(HashMap); - impl Service { pub fn get_config(&self) -> &Config { &self.config