From 145685201a05d13aac514cf2e6539f77d9ca0840 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 4 Sep 2023 17:38:16 +0100 Subject: [PATCH] pageserver: add validate to control plane client --- pageserver/src/control_plane_client.rs | 141 +++++++++++++++---------- 1 file changed, 87 insertions(+), 54 deletions(-) diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index 192eb16789..7c91dd018f 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; -use hyper::StatusCode; -use pageserver_api::control_api::{ReAttachRequest, ReAttachResponse}; +use pageserver_api::control_api::{ + ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse, +}; +use serde::{de::DeserializeOwned, Serialize}; use tokio_util::sync::CancellationToken; use url::Url; use utils::{ @@ -12,12 +14,6 @@ use utils::{ use crate::config::PageServerConf; -// Backoffs when control plane requests do not succeed: compromise between reducing load -// on control plane, and retrying frequently when we are blocked on a control plane -// response to make progress. -const BACKOFF_INCREMENT: f64 = 0.1; -const BACKOFF_MAX: f64 = 10.0; - /// The Pageserver's client for using the control plane API: this is a small subset /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md) pub(crate) struct ControlPlaneClient { @@ -54,22 +50,50 @@ impl ControlPlaneClient { }) } - async fn try_re_attach( - &self, - url: Url, - request: &ReAttachRequest, - ) -> anyhow::Result { - match self.http_client.post(url).json(request).send().await { - Err(e) => Err(anyhow::Error::from(e)), - Ok(r) => { - if r.status() == StatusCode::OK { - r.json::() - .await - .map_err(anyhow::Error::from) - } else { - Err(anyhow::anyhow!("Unexpected status {}", r.status())) - } + async fn retry_http_forever(&self, url: &url::Url, request: R) -> Result + where + R: Serialize, + T: DeserializeOwned, + { + #[derive(thiserror::Error, Debug)] + enum RemoteAttemptError { + #[error("shutdown")] + Shutdown, + #[error("remote: {0}")] + Remote(reqwest::Error), + } + + match backoff::retry( + || async { + let response = self + .http_client + .post(url.clone()) + .json(&request) + .send() + .await + .map_err(|e| RemoteAttemptError::Remote(e))?; + + response + .error_for_status_ref() + .map_err(|e| RemoteAttemptError::Remote(e))?; + response + .json::() + .await + .map_err(|e| RemoteAttemptError::Remote(e)) + }, + |_| false, + 3, + u32::MAX, + "calling control plane generation validation API", + backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown), + ) + .await + { + Err(RemoteAttemptError::Shutdown) => Err(anyhow::anyhow!("Shutting down")), + Err(RemoteAttemptError::Remote(_)) => { + panic!("We retry forever, this should never be reached"); } + Ok(r) => Ok(r), } } @@ -83,37 +107,46 @@ impl ControlPlaneClient { node_id: self.node_id, }; - let mut attempt = 0; - loop { - let result = self.try_re_attach(re_attach_path.clone(), &request).await; - match result { - Ok(res) => { - tracing::info!( - "Received re-attach response with {} tenants", - res.tenants.len() - ); + let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?; + tracing::info!( + "Received re-attach response with {} tenants", + response.tenants.len() + ); - return Ok(res - .tenants - .into_iter() - .map(|t| (t.id, Generation::new(t.generation))) - .collect::>()); - } - Err(e) => { - tracing::error!("Error re-attaching tenants, retrying: {e:#}"); - backoff::exponential_backoff( - attempt, - BACKOFF_INCREMENT, - BACKOFF_MAX, - &self.cancel, - ) - .await; - if self.cancel.is_cancelled() { - return Err(anyhow::anyhow!("Shutting down")); - } - attempt += 1; - } - } - } + return Ok(response + .tenants + .into_iter() + .map(|t| (t.id, Generation::new(t.generation))) + .collect::>()); + } + + pub(crate) async fn validate( + &self, + tenants: Vec<(TenantId, Generation)>, + ) -> anyhow::Result> { + let re_attach_path = self + .base_url + .join("validate") + .expect("Failed to build validate path"); + + let request = ValidateRequest { + tenants: tenants + .into_iter() + .map(|(id, gen)| ValidateRequestTenant { + id, + gen: gen + .into() + .expect("Generation should always be valid for a Tenant doing deletions"), + }) + .collect(), + }; + + let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?; + + Ok(response + .tenants + .into_iter() + .map(|rt| (rt.id, rt.valid)) + .collect()) } }