mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
pageserver: add validate to control plane client
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::control_api::{ReAttachRequest, ReAttachResponse};
|
||||
use pageserver_api::control_api::{
|
||||
ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
|
||||
};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use url::Url;
|
||||
use utils::{
|
||||
@@ -12,12 +14,6 @@ use utils::{
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
|
||||
// Backoffs when control plane requests do not succeed: compromise between reducing load
|
||||
// on control plane, and retrying frequently when we are blocked on a control plane
|
||||
// response to make progress.
|
||||
const BACKOFF_INCREMENT: f64 = 0.1;
|
||||
const BACKOFF_MAX: f64 = 10.0;
|
||||
|
||||
/// The Pageserver's client for using the control plane API: this is a small subset
|
||||
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
|
||||
pub(crate) struct ControlPlaneClient {
|
||||
@@ -54,22 +50,50 @@ impl ControlPlaneClient {
|
||||
})
|
||||
}
|
||||
|
||||
async fn try_re_attach(
|
||||
&self,
|
||||
url: Url,
|
||||
request: &ReAttachRequest,
|
||||
) -> anyhow::Result<ReAttachResponse> {
|
||||
match self.http_client.post(url).json(request).send().await {
|
||||
Err(e) => Err(anyhow::Error::from(e)),
|
||||
Ok(r) => {
|
||||
if r.status() == StatusCode::OK {
|
||||
r.json::<ReAttachResponse>()
|
||||
.await
|
||||
.map_err(anyhow::Error::from)
|
||||
} else {
|
||||
Err(anyhow::anyhow!("Unexpected status {}", r.status()))
|
||||
}
|
||||
async fn retry_http_forever<R, T>(&self, url: &url::Url, request: R) -> Result<T, anyhow::Error>
|
||||
where
|
||||
R: Serialize,
|
||||
T: DeserializeOwned,
|
||||
{
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum RemoteAttemptError {
|
||||
#[error("shutdown")]
|
||||
Shutdown,
|
||||
#[error("remote: {0}")]
|
||||
Remote(reqwest::Error),
|
||||
}
|
||||
|
||||
match backoff::retry(
|
||||
|| async {
|
||||
let response = self
|
||||
.http_client
|
||||
.post(url.clone())
|
||||
.json(&request)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| RemoteAttemptError::Remote(e))?;
|
||||
|
||||
response
|
||||
.error_for_status_ref()
|
||||
.map_err(|e| RemoteAttemptError::Remote(e))?;
|
||||
response
|
||||
.json::<T>()
|
||||
.await
|
||||
.map_err(|e| RemoteAttemptError::Remote(e))
|
||||
},
|
||||
|_| false,
|
||||
3,
|
||||
u32::MAX,
|
||||
"calling control plane generation validation API",
|
||||
backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(RemoteAttemptError::Shutdown) => Err(anyhow::anyhow!("Shutting down")),
|
||||
Err(RemoteAttemptError::Remote(_)) => {
|
||||
panic!("We retry forever, this should never be reached");
|
||||
}
|
||||
Ok(r) => Ok(r),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,37 +107,46 @@ impl ControlPlaneClient {
|
||||
node_id: self.node_id,
|
||||
};
|
||||
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
let result = self.try_re_attach(re_attach_path.clone(), &request).await;
|
||||
match result {
|
||||
Ok(res) => {
|
||||
tracing::info!(
|
||||
"Received re-attach response with {} tenants",
|
||||
res.tenants.len()
|
||||
);
|
||||
let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
|
||||
tracing::info!(
|
||||
"Received re-attach response with {} tenants",
|
||||
response.tenants.len()
|
||||
);
|
||||
|
||||
return Ok(res
|
||||
.tenants
|
||||
.into_iter()
|
||||
.map(|t| (t.id, Generation::new(t.generation)))
|
||||
.collect::<HashMap<_, _>>());
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error re-attaching tenants, retrying: {e:#}");
|
||||
backoff::exponential_backoff(
|
||||
attempt,
|
||||
BACKOFF_INCREMENT,
|
||||
BACKOFF_MAX,
|
||||
&self.cancel,
|
||||
)
|
||||
.await;
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(anyhow::anyhow!("Shutting down"));
|
||||
}
|
||||
attempt += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
return Ok(response
|
||||
.tenants
|
||||
.into_iter()
|
||||
.map(|t| (t.id, Generation::new(t.generation)))
|
||||
.collect::<HashMap<_, _>>());
|
||||
}
|
||||
|
||||
pub(crate) async fn validate(
|
||||
&self,
|
||||
tenants: Vec<(TenantId, Generation)>,
|
||||
) -> anyhow::Result<HashMap<TenantId, bool>> {
|
||||
let re_attach_path = self
|
||||
.base_url
|
||||
.join("validate")
|
||||
.expect("Failed to build validate path");
|
||||
|
||||
let request = ValidateRequest {
|
||||
tenants: tenants
|
||||
.into_iter()
|
||||
.map(|(id, gen)| ValidateRequestTenant {
|
||||
id,
|
||||
gen: gen
|
||||
.into()
|
||||
.expect("Generation should always be valid for a Tenant doing deletions"),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
|
||||
|
||||
Ok(response
|
||||
.tenants
|
||||
.into_iter()
|
||||
.map(|rt| (rt.id, rt.valid))
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user