mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
pageserver: rename control plane client & chunk validation requests (#8997)
## Problem - In https://github.com/neondatabase/neon/pull/8784, the validate controller API is modified to check generations directly in the database. It batches tenants into separate queries to avoid generating a huge statement, but - While updating this, I realized that "control_plane_client" is a kind of confusing name for the client code now that it primarily talks to the storage controller (the case of talking to the control plane will go away in a few months). ## Summary of changes - Big rename to "ControllerUpcallClient" -- this reflects the storage controller's api naming, where the paths used by the pageserver are in `/upcall/` - When sending validate requests, break them up into chunks so that we avoid possible edge cases of generating any HTTP requests that require database I/O across many thousands of tenants. This PR mixes a functional change with a refactor, but the commits are cleanly separated -- only the last commit is a functional change. --------- Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
@@ -15,7 +15,7 @@ use clap::{Arg, ArgAction, Command};
|
||||
|
||||
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
|
||||
use pageserver::config::PageserverIdentity;
|
||||
use pageserver::control_plane_client::ControlPlaneClient;
|
||||
use pageserver::controller_upcall_client::ControllerUpcallClient;
|
||||
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
|
||||
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
|
||||
use pageserver::task_mgr::{COMPUTE_REQUEST_RUNTIME, WALRECEIVER_RUNTIME};
|
||||
@@ -396,7 +396,7 @@ fn start_pageserver(
|
||||
// Set up deletion queue
|
||||
let (deletion_queue, deletion_workers) = DeletionQueue::new(
|
||||
remote_storage.clone(),
|
||||
ControlPlaneClient::new(conf, &shutdown_pageserver),
|
||||
ControllerUpcallClient::new(conf, &shutdown_pageserver),
|
||||
conf,
|
||||
);
|
||||
if let Some(deletion_workers) = deletion_workers {
|
||||
|
||||
@@ -17,9 +17,12 @@ use utils::{backoff, failpoint_support, generation::Generation, id::NodeId};
|
||||
use crate::{config::PageServerConf, virtual_file::on_fatal_io_error};
|
||||
use pageserver_api::config::NodeMetadata;
|
||||
|
||||
/// The Pageserver's client for using the control plane API: this is a small subset
|
||||
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
|
||||
pub struct ControlPlaneClient {
|
||||
/// The Pageserver's client for using the storage controller upcall API: this is a small API
|
||||
/// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
|
||||
///
|
||||
/// The server presenting this API may either be the storage controller or some other
|
||||
/// service (such as the Neon control plane) providing a store of generation numbers.
|
||||
pub struct ControllerUpcallClient {
|
||||
http_client: reqwest::Client,
|
||||
base_url: Url,
|
||||
node_id: NodeId,
|
||||
@@ -45,7 +48,7 @@ pub trait ControlPlaneGenerationsApi {
|
||||
) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
|
||||
}
|
||||
|
||||
impl ControlPlaneClient {
|
||||
impl ControllerUpcallClient {
|
||||
/// A None return value indicates that the input `conf` object does not have control
|
||||
/// plane API enabled.
|
||||
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
|
||||
@@ -114,7 +117,7 @@ impl ControlPlaneClient {
|
||||
}
|
||||
}
|
||||
|
||||
impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
impl ControlPlaneGenerationsApi for ControllerUpcallClient {
|
||||
/// Block until we get a successful response, or error out if we are shut down
|
||||
async fn re_attach(
|
||||
&self,
|
||||
@@ -216,29 +219,38 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
.join("validate")
|
||||
.expect("Failed to build validate path");
|
||||
|
||||
let request = ValidateRequest {
|
||||
tenants: tenants
|
||||
.into_iter()
|
||||
.map(|(id, gen)| ValidateRequestTenant {
|
||||
id,
|
||||
gen: gen
|
||||
.into()
|
||||
.expect("Generation should always be valid for a Tenant doing deletions"),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
// When sending validate requests, break them up into chunks so that we
|
||||
// avoid possible edge cases of generating any HTTP requests that
|
||||
// require database I/O across many thousands of tenants.
|
||||
let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
|
||||
for tenant_chunk in (tenants).chunks(128) {
|
||||
let request = ValidateRequest {
|
||||
tenants: tenant_chunk
|
||||
.iter()
|
||||
.map(|(id, generation)| ValidateRequestTenant {
|
||||
id: *id,
|
||||
gen: (*generation).into().expect(
|
||||
"Generation should always be valid for a Tenant doing deletions",
|
||||
),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
failpoint_support::sleep_millis_async!("control-plane-client-validate-sleep", &self.cancel);
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(RetryForeverError::ShuttingDown);
|
||||
failpoint_support::sleep_millis_async!(
|
||||
"control-plane-client-validate-sleep",
|
||||
&self.cancel
|
||||
);
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(RetryForeverError::ShuttingDown);
|
||||
}
|
||||
|
||||
let response: ValidateResponse =
|
||||
self.retry_http_forever(&re_attach_path, request).await?;
|
||||
for rt in response.tenants {
|
||||
result.insert(rt.id, rt.valid);
|
||||
}
|
||||
}
|
||||
|
||||
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
|
||||
|
||||
Ok(response
|
||||
.tenants
|
||||
.into_iter()
|
||||
.map(|rt| (rt.id, rt.valid))
|
||||
.collect())
|
||||
Ok(result.into_iter().collect())
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::control_plane_client::ControlPlaneGenerationsApi;
|
||||
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::remote_timeline_client::remote_timeline_path;
|
||||
@@ -622,7 +622,7 @@ impl DeletionQueue {
|
||||
/// If remote_storage is None, then the returned workers will also be None.
|
||||
pub fn new<C>(
|
||||
remote_storage: GenericRemoteStorage,
|
||||
control_plane_client: Option<C>,
|
||||
controller_upcall_client: Option<C>,
|
||||
conf: &'static PageServerConf,
|
||||
) -> (Self, Option<DeletionQueueWorkers<C>>)
|
||||
where
|
||||
@@ -662,7 +662,7 @@ impl DeletionQueue {
|
||||
conf,
|
||||
backend_rx,
|
||||
executor_tx,
|
||||
control_plane_client,
|
||||
controller_upcall_client,
|
||||
lsn_table.clone(),
|
||||
cancel.clone(),
|
||||
),
|
||||
@@ -704,7 +704,7 @@ mod test {
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::{
|
||||
control_plane_client::RetryForeverError,
|
||||
controller_upcall_client::RetryForeverError,
|
||||
repository::Key,
|
||||
tenant::{harness::TenantHarness, storage_layer::DeltaLayerName},
|
||||
};
|
||||
|
||||
@@ -25,8 +25,8 @@ use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::control_plane_client::ControlPlaneGenerationsApi;
|
||||
use crate::control_plane_client::RetryForeverError;
|
||||
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
|
||||
use crate::controller_upcall_client::RetryForeverError;
|
||||
use crate::metrics;
|
||||
use crate::virtual_file::MaybeFatalIo;
|
||||
|
||||
@@ -61,7 +61,7 @@ where
|
||||
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
|
||||
// Client for calling into control plane API for validation of deletes
|
||||
control_plane_client: Option<C>,
|
||||
controller_upcall_client: Option<C>,
|
||||
|
||||
// DeletionLists which are waiting generation validation. Not safe to
|
||||
// execute until [`validate`] has processed them.
|
||||
@@ -94,7 +94,7 @@ where
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
control_plane_client: Option<C>,
|
||||
controller_upcall_client: Option<C>,
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
@@ -102,7 +102,7 @@ where
|
||||
conf,
|
||||
rx,
|
||||
tx,
|
||||
control_plane_client,
|
||||
controller_upcall_client,
|
||||
lsn_table,
|
||||
pending_lists: Vec::new(),
|
||||
validated_lists: Vec::new(),
|
||||
@@ -145,8 +145,8 @@ where
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let tenants_valid = if let Some(control_plane_client) = &self.control_plane_client {
|
||||
match control_plane_client
|
||||
let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client {
|
||||
match controller_upcall_client
|
||||
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -6,7 +6,7 @@ pub mod basebackup;
|
||||
pub mod config;
|
||||
pub mod consumption_metrics;
|
||||
pub mod context;
|
||||
pub mod control_plane_client;
|
||||
pub mod controller_upcall_client;
|
||||
pub mod deletion_queue;
|
||||
pub mod disk_usage_eviction_task;
|
||||
pub mod http;
|
||||
|
||||
@@ -30,8 +30,8 @@ use utils::{backoff, completion, crashsafe};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::control_plane_client::{
|
||||
ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
|
||||
use crate::controller_upcall_client::{
|
||||
ControlPlaneGenerationsApi, ControllerUpcallClient, RetryForeverError,
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
|
||||
@@ -122,7 +122,7 @@ pub(crate) enum ShardSelector {
|
||||
Known(ShardIndex),
|
||||
}
|
||||
|
||||
/// A convenience for use with the re_attach ControlPlaneClient function: rather
|
||||
/// A convenience for use with the re_attach ControllerUpcallClient function: rather
|
||||
/// than the serializable struct, we build this enum that encapsulates
|
||||
/// the invariant that attached tenants always have generations.
|
||||
///
|
||||
@@ -341,7 +341,7 @@ async fn init_load_generations(
|
||||
"Emergency mode! Tenants will be attached unsafely using their last known generation"
|
||||
);
|
||||
emergency_generations(tenant_confs)
|
||||
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
|
||||
} else if let Some(client) = ControllerUpcallClient::new(conf, cancel) {
|
||||
info!("Calling control plane API to re-attach tenants");
|
||||
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
|
||||
match client.re_attach(conf).await {
|
||||
|
||||
Reference in New Issue
Block a user