mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 11:30:37 +00:00
## Problem Pageservers must not delete objects or advertise updates to remote_consistent_lsn without checking that they hold the latest generation for the tenant in question (see [the RFC](https://github.com/neondatabase/neon/blob/main/docs/rfcs/025-generation-numbers.md)). In this PR: - A new "deletion queue" subsystem is introduced, through which deletions flow - `RemoteTimelineClient` is modified to send deletions through the deletion queue: - For GC & compaction, deletions flow through the full generation verifying process - For timeline deletions, deletions take a fast path that bypasses generation verification - The `last_uploaded_consistent_lsn` value in `UploadQueue` is replaced with a mechanism that maintains a "projected" LSN (equivalent to the previous property), and a "visible" LSN (which is the one that we may share with safekeepers). - Until `control_plane_api` is set, all deletions skip generation validation - Tests are introduced for the new functionality in `test_pageserver_generations.py` Once this lands, if a pageserver is configured with the `control_plane_api` configuration added in https://github.com/neondatabase/neon/pull/5163, it becomes safe to attach a tenant to multiple pageservers concurrently. --------- Co-authored-by: Joonas Koivunen <joonas@neon.tech> Co-authored-by: Christian Schwarz <christian@neon.tech>
176 lines
5.5 KiB
Rust
176 lines
5.5 KiB
Rust
use std::collections::HashMap;
|
|
|
|
use pageserver_api::control_api::{
|
|
ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
|
|
};
|
|
use serde::{de::DeserializeOwned, Serialize};
|
|
use tokio_util::sync::CancellationToken;
|
|
use url::Url;
|
|
use utils::{
|
|
backoff,
|
|
generation::Generation,
|
|
id::{NodeId, TenantId},
|
|
};
|
|
|
|
use crate::config::PageServerConf;
|
|
|
|
/// The Pageserver's client for using the control plane API: this is a small subset
|
|
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
|
|
pub struct ControlPlaneClient {
|
|
http_client: reqwest::Client,
|
|
base_url: Url,
|
|
node_id: NodeId,
|
|
cancel: CancellationToken,
|
|
}
|
|
|
|
/// Error type of operations which internally retry on all errors other than
/// the cancellation token firing: the only way they can fail is
/// [`RetryForeverError::ShuttingDown`].
///
/// `Debug` is derived so that a `Result<_, RetryForeverError>` can be used with
/// `unwrap`/`expect`/`unwrap_err` and formatted with `{:?}`; the copy and
/// comparison traits cost nothing for a fieldless enum and make the error easy
/// to assert on in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryForeverError {
    /// The operation was abandoned because shutdown was requested.
    ShuttingDown,
}
|
|
|
|
#[async_trait::async_trait]
|
|
pub trait ControlPlaneGenerationsApi {
|
|
async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError>;
|
|
async fn validate(
|
|
&self,
|
|
tenants: Vec<(TenantId, Generation)>,
|
|
) -> Result<HashMap<TenantId, bool>, RetryForeverError>;
|
|
}
|
|
|
|
impl ControlPlaneClient {
|
|
/// A None return value indicates that the input `conf` object does not have control
|
|
/// plane API enabled.
|
|
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
|
|
let mut url = match conf.control_plane_api.as_ref() {
|
|
Some(u) => u.clone(),
|
|
None => return None,
|
|
};
|
|
|
|
if let Ok(mut segs) = url.path_segments_mut() {
|
|
// This ensures that `url` ends with a slash if it doesn't already.
|
|
// That way, we can subsequently use join() to safely attach extra path elements.
|
|
segs.pop_if_empty().push("");
|
|
}
|
|
|
|
let client = reqwest::ClientBuilder::new()
|
|
.build()
|
|
.expect("Failed to construct http client");
|
|
|
|
Some(Self {
|
|
http_client: client,
|
|
base_url: url,
|
|
node_id: conf.id,
|
|
cancel: cancel.clone(),
|
|
})
|
|
}
|
|
|
|
async fn retry_http_forever<R, T>(
|
|
&self,
|
|
url: &url::Url,
|
|
request: R,
|
|
) -> Result<T, RetryForeverError>
|
|
where
|
|
R: Serialize,
|
|
T: DeserializeOwned,
|
|
{
|
|
#[derive(thiserror::Error, Debug)]
|
|
enum RemoteAttemptError {
|
|
#[error("shutdown")]
|
|
Shutdown,
|
|
#[error("remote: {0}")]
|
|
Remote(reqwest::Error),
|
|
}
|
|
|
|
match backoff::retry(
|
|
|| async {
|
|
let response = self
|
|
.http_client
|
|
.post(url.clone())
|
|
.json(&request)
|
|
.send()
|
|
.await
|
|
.map_err(RemoteAttemptError::Remote)?;
|
|
|
|
response
|
|
.error_for_status_ref()
|
|
.map_err(RemoteAttemptError::Remote)?;
|
|
response
|
|
.json::<T>()
|
|
.await
|
|
.map_err(RemoteAttemptError::Remote)
|
|
},
|
|
|_| false,
|
|
3,
|
|
u32::MAX,
|
|
"calling control plane generation validation API",
|
|
backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown),
|
|
)
|
|
.await
|
|
{
|
|
Err(RemoteAttemptError::Shutdown) => Err(RetryForeverError::ShuttingDown),
|
|
Err(RemoteAttemptError::Remote(_)) => {
|
|
panic!("We retry forever, this should never be reached");
|
|
}
|
|
Ok(r) => Ok(r),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait]
|
|
impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
|
/// Block until we get a successful response, or error out if we are shut down
|
|
async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError> {
|
|
let re_attach_path = self
|
|
.base_url
|
|
.join("re-attach")
|
|
.expect("Failed to build re-attach path");
|
|
let request = ReAttachRequest {
|
|
node_id: self.node_id,
|
|
};
|
|
|
|
let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
|
|
tracing::info!(
|
|
"Received re-attach response with {} tenants",
|
|
response.tenants.len()
|
|
);
|
|
|
|
Ok(response
|
|
.tenants
|
|
.into_iter()
|
|
.map(|t| (t.id, Generation::new(t.generation)))
|
|
.collect::<HashMap<_, _>>())
|
|
}
|
|
|
|
/// Block until we get a successful response, or error out if we are shut down
|
|
async fn validate(
|
|
&self,
|
|
tenants: Vec<(TenantId, Generation)>,
|
|
) -> Result<HashMap<TenantId, bool>, RetryForeverError> {
|
|
let re_attach_path = self
|
|
.base_url
|
|
.join("validate")
|
|
.expect("Failed to build validate path");
|
|
|
|
let request = ValidateRequest {
|
|
tenants: tenants
|
|
.into_iter()
|
|
.map(|(id, gen)| ValidateRequestTenant {
|
|
id,
|
|
gen: gen
|
|
.into()
|
|
.expect("Generation should always be valid for a Tenant doing deletions"),
|
|
})
|
|
.collect(),
|
|
};
|
|
|
|
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
|
|
|
|
Ok(response
|
|
.tenants
|
|
.into_iter()
|
|
.map(|rt| (rt.id, rt.valid))
|
|
.collect())
|
|
}
|
|
}
|