From 786e9cf75ba482e67b7e7e0626fac21b1696c761 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 2 Feb 2024 19:22:03 +0000
Subject: [PATCH] control_plane: implement HTTP compute hook for attachment service (#6471)

## Problem

When we change which physical pageservers a tenant is attached to, we must update the control plane so that it can update computes. This is done via an HTTP hook, as described in
https://www.notion.so/neondatabase/Sharding-Service-Control-Plane-interface-6de56dd310a043bfa5c2f5564fa98365#1fe185a35d6d41f0a54279ac1a41bc94

## Summary of changes

- Add optional CLI args `--control-plane-jwt-token` and `--compute-hook-url`. If these are set, we call the configured HTTP endpoint instead of using neon_local's LocalEnv to update compute configuration.
- Implement an HTTP-driven version of ComputeHook that calls into the configured URL.
- Notify for all tenants on startup, so that we don't miss notifications if we crash partway through a change, and carry a `pending_compute_notification` flag at runtime so that a failed notification is retried later rather than lost.
- Add a test for all of this.

One might wonder: why not retry compute hook notifications "forever", rather than carrying a flag on the shard to call reconcile() again later? The reason is that we will later limit the concurrency of reconciles when dealing with larger numbers of shards, and if a reconcile is stuck waiting for the control plane to accept a notification request, it could jam up the whole system and prevent us from making other changes. From the perspective of the outside world we _do_ retry forever, but we don't retry forever within a given Reconciler lifetime.

The `pending_compute_notification` logic is predicated on later adding a background task that calls `Service::reconcile_all` on a schedule, so that anything that can fail a Reconciler::reconcile call will eventually be retried.
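For reviewers, the body that the hook POSTs to `--compute-hook-url` has the shape below. This is a minimal sketch mirroring `ComputeHookNotifyRequest` / `ComputeHookNotifyRequestShard` in `compute_hook.rs`; field types are simplified to primitives here for illustration (the real structs use `TenantId`, `NodeId` and `ShardNumber`):

```rust
// Sketch only: mirrors ComputeHookNotifyRequest from compute_hook.rs in this patch,
// with primitive field types standing in for TenantId/NodeId/ShardNumber.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct NotifyRequestShard {
    node_id: u64,     // pageserver the shard is attached to
    shard_number: u8, // shard's position within the tenant's shard count
}

#[derive(Serialize, Deserialize, Debug)]
struct NotifyRequest {
    tenant_id: String, // hex-encoded tenant id
    shards: Vec<NotifyRequestShard>,
}

// An unsharded tenant attached to pageserver 1 serializes as:
// {"tenant_id":"<tenant id>","shards":[{"node_id":1,"shard_number":0}]}
```

Note that the `shards` list is only ever a complete set for the tenant's highest shard count: `maybe_reconfigure` drops entries for stale (lower) shard counts and emits nothing until every remaining shard has an attached pageserver.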
--- Cargo.lock | 1 + control_plane/attachment_service/Cargo.toml | 1 + .../attachment_service/src/compute_hook.rs | 286 +++++++++++++++--- control_plane/attachment_service/src/main.rs | 34 ++- .../attachment_service/src/reconciler.rs | 63 +++- .../attachment_service/src/service.rs | 86 +++++- .../attachment_service/src/tenant_state.rs | 60 ++++ control_plane/src/attachment_service.rs | 6 + control_plane/src/bin/neon_local.rs | 2 +- control_plane/src/endpoint.rs | 34 ++- control_plane/src/local_env.rs | 7 +- test_runner/fixtures/neon_fixtures.py | 9 +- test_runner/regress/test_sharding_service.py | 101 ++++++- 13 files changed, 600 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 90991ab0a4..02450709d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,6 +288,7 @@ dependencies = [ "pageserver_api", "pageserver_client", "postgres_connection", + "reqwest", "serde", "serde_json", "thiserror", diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml index 1d3831eea0..d3c62d74d2 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/control_plane/attachment_service/Cargo.toml @@ -16,6 +16,7 @@ hyper.workspace = true pageserver_api.workspace = true pageserver_client.workspace = true postgres_connection.workspace = true +reqwest.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true diff --git a/control_plane/attachment_service/src/compute_hook.rs b/control_plane/attachment_service/src/compute_hook.rs index 02617cd065..9c1185f259 100644 --- a/control_plane/attachment_service/src/compute_hook.rs +++ b/control_plane/attachment_service/src/compute_hook.rs @@ -1,24 +1,76 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; -use control_plane::endpoint::ComputeControlPlane; +use control_plane::endpoint::{ComputeControlPlane, EndpointStatus}; use control_plane::local_env::LocalEnv; -use pageserver_api::shard::{ShardCount, ShardIndex, TenantShardId}; +use hyper::{Method, StatusCode}; +use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, TenantShardId}; use postgres_connection::parse_host_port; -use utils::id::{NodeId, TenantId}; +use serde::{Deserialize, Serialize}; +use tokio_util::sync::CancellationToken; +use utils::{ + backoff::{self}, + id::{NodeId, TenantId}, +}; + +use crate::service::Config; + +const BUSY_DELAY: Duration = Duration::from_secs(1); +const SLOWDOWN_DELAY: Duration = Duration::from_secs(5); + +pub(crate) const API_CONCURRENCY: usize = 32; pub(super) struct ComputeHookTenant { shards: Vec<(ShardIndex, NodeId)>, } +#[derive(Serialize, Deserialize, Debug)] +struct ComputeHookNotifyRequestShard { + node_id: NodeId, + shard_number: ShardNumber, +} + +/// Request body that we send to the control plane to notify it of where a tenant is attached +#[derive(Serialize, Deserialize, Debug)] +struct ComputeHookNotifyRequest { + tenant_id: TenantId, + shards: Vec, +} + +/// Error type for attempts to call into the control plane compute notification hook +#[derive(thiserror::Error, Debug)] +pub(crate) enum NotifyError { + // Request was not send successfully, e.g. transport error + #[error("Sending request: {0}")] + Request(#[from] reqwest::Error), + // Request could not be serviced right now due to ongoing Operation in control plane, but should be possible soon. 
+ #[error("Control plane tenant busy")] + Busy, + // Explicit 429 response asking us to retry less frequently + #[error("Control plane overloaded")] + SlowDown, + // A 503 response indicates the control plane can't handle the request right now + #[error("Control plane unavailable (status {0})")] + Unavailable(StatusCode), + // API returned unexpected non-success status. We will retry, but log a warning. + #[error("Control plane returned unexpected status {0}")] + Unexpected(StatusCode), + // We shutdown while sending + #[error("Shutting down")] + ShuttingDown, + // A response indicates we will never succeed, such as 400 or 404 + #[error("Non-retryable error {0}")] + Fatal(StatusCode), +} + impl ComputeHookTenant { - pub(super) async fn maybe_reconfigure(&mut self, tenant_id: TenantId) -> anyhow::Result<()> { + async fn maybe_reconfigure(&mut self, tenant_id: TenantId) -> Option { // Find the highest shard count and drop any shards that aren't // for that shard count. let shard_count = self.shards.iter().map(|(k, _v)| k.shard_count).max(); let Some(shard_count) = shard_count else { // No shards, nothing to do. tracing::info!("ComputeHookTenant::maybe_reconfigure: no shards"); - return Ok(()); + return None; }; self.shards.retain(|(k, _v)| k.shard_count == shard_count); @@ -26,38 +78,18 @@ impl ComputeHookTenant { .sort_by_key(|(shard, _node_id)| shard.shard_number); if self.shards.len() == shard_count.0 as usize || shard_count == ShardCount(0) { - // We have pageservers for all the shards: proceed to reconfigure compute - let env = match LocalEnv::load_config() { - Ok(e) => e, - Err(e) => { - tracing::warn!( - "Couldn't load neon_local config, skipping compute update ({e})" - ); - return Ok(()); - } - }; - let cplane = ComputeControlPlane::load(env.clone()) - .expect("Error loading compute control plane"); - - let compute_pageservers = self - .shards - .iter() - .map(|(_shard, node_id)| { - let ps_conf = env - .get_pageserver_conf(*node_id) - .expect("Unknown pageserver"); - let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr) - .expect("Unable to parse listen_pg_addr"); - (pg_host, pg_port.unwrap_or(5432)) - }) - .collect::>(); - - for (endpoint_name, endpoint) in &cplane.endpoints { - if endpoint.tenant_id == tenant_id && endpoint.status() == "running" { - tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,); - endpoint.reconfigure(compute_pageservers.clone()).await?; - } - } + // We have pageservers for all the shards: emit a configuration update + return Some(ComputeHookNotifyRequest { + tenant_id, + shards: self + .shards + .iter() + .map(|(shard, node_id)| ComputeHookNotifyRequestShard { + shard_number: shard.shard_number, + node_id: *node_id, + }) + .collect(), + }); } else { tracing::info!( "ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})", @@ -66,7 +98,7 @@ impl ComputeHookTenant { ); } - Ok(()) + None } } @@ -74,22 +106,171 @@ impl ComputeHookTenant { /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures /// the compute connection string. 
pub(super) struct ComputeHook { + config: Config, state: tokio::sync::Mutex>, + authorization_header: Option, } impl ComputeHook { - pub(super) fn new() -> Self { + pub(super) fn new(config: Config) -> Self { + let authorization_header = config + .control_plane_jwt_token + .clone() + .map(|jwt| format!("Bearer {}", jwt)); + Self { state: Default::default(), + config, + authorization_header, } } + /// For test environments: use neon_local's LocalEnv to update compute + async fn do_notify_local( + &self, + reconfigure_request: ComputeHookNotifyRequest, + ) -> anyhow::Result<()> { + let env = match LocalEnv::load_config() { + Ok(e) => e, + Err(e) => { + tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})"); + return Ok(()); + } + }; + let cplane = + ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane"); + let ComputeHookNotifyRequest { tenant_id, shards } = reconfigure_request; + + let compute_pageservers = shards + .into_iter() + .map(|shard| { + let ps_conf = env + .get_pageserver_conf(shard.node_id) + .expect("Unknown pageserver"); + let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr) + .expect("Unable to parse listen_pg_addr"); + (pg_host, pg_port.unwrap_or(5432)) + }) + .collect::>(); + + for (endpoint_name, endpoint) in &cplane.endpoints { + if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running { + tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,); + endpoint.reconfigure(compute_pageservers.clone()).await?; + } + } + + Ok(()) + } + + async fn do_notify_iteration( + &self, + client: &reqwest::Client, + url: &String, + reconfigure_request: &ComputeHookNotifyRequest, + cancel: &CancellationToken, + ) -> Result<(), NotifyError> { + let req = client.request(Method::POST, url); + let req = if let Some(value) = &self.authorization_header { + req.header(reqwest::header::AUTHORIZATION, value) + } else { + req + }; + + tracing::debug!( + "Sending notify request to {} ({:?})", + url, + reconfigure_request + ); + let send_result = req.json(&reconfigure_request).send().await; + let response = match send_result { + Ok(r) => r, + Err(e) => return Err(e.into()), + }; + + // Treat all 2xx responses as success + if response.status() >= StatusCode::OK && response.status() < StatusCode::MULTIPLE_CHOICES { + if response.status() != StatusCode::OK { + // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so + // log a warning. + tracing::warn!( + "Unexpected 2xx response code {} from control plane", + response.status() + ); + } + + return Ok(()); + } + + // Error response codes + match response.status() { + StatusCode::TOO_MANY_REQUESTS => { + // TODO: 429 handling should be global: set some state visible to other requests + // so that they will delay before starting, rather than all notifications trying + // once before backing off. 
+ tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled()) + .await + .ok(); + Err(NotifyError::SlowDown) + } + StatusCode::LOCKED => { + // Delay our retry if busy: the usual fast exponential backoff in backoff::retry + // is not appropriate + tokio::time::timeout(BUSY_DELAY, cancel.cancelled()) + .await + .ok(); + Err(NotifyError::Busy) + } + StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT + | StatusCode::BAD_GATEWAY => Err(NotifyError::Unavailable(response.status())), + StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { + Err(NotifyError::Fatal(response.status())) + } + _ => Err(NotifyError::Unexpected(response.status())), + } + } + + async fn do_notify( + &self, + url: &String, + reconfigure_request: ComputeHookNotifyRequest, + cancel: &CancellationToken, + ) -> Result<(), NotifyError> { + let client = reqwest::Client::new(); + backoff::retry( + || self.do_notify_iteration(&client, url, &reconfigure_request, cancel), + |e| matches!(e, NotifyError::Fatal(_)), + 3, + 10, + "Send compute notification", + backoff::Cancel::new(cancel.clone(), || NotifyError::ShuttingDown), + ) + .await + } + + /// Call this to notify the compute (postgres) tier of new pageservers to use + /// for a tenant. notify() is called by each shard individually, and this function + /// will decide whether an update to the tenant is sent. An update is sent on the + /// condition that: + /// - We know a pageserver for every shard. + /// - All the shards have the same shard_count (i.e. we are not mid-split) + /// + /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler + /// that is cancelled. + /// + /// This function is fallible, including in the case that the control plane is transiently + /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability + /// periods, but we don't retry forever. The **caller** is responsible for handling failures and + /// ensuring that they eventually call again to ensure that the compute is eventually notified of + /// the proper pageserver nodes for a tenant. + #[tracing::instrument(skip_all, fields(tenant_shard_id, node_id))] pub(super) async fn notify( &self, tenant_shard_id: TenantShardId, node_id: NodeId, - ) -> anyhow::Result<()> { - tracing::info!("ComputeHook::notify: {}->{}", tenant_shard_id, node_id); + cancel: &CancellationToken, + ) -> Result<(), NotifyError> { let mut locked = self.state.lock().await; let entry = locked .entry(tenant_shard_id.tenant_id) @@ -111,6 +292,25 @@ impl ComputeHook { entry.shards.push((shard_index, node_id)); } - entry.maybe_reconfigure(tenant_shard_id.tenant_id).await + let reconfigure_request = entry.maybe_reconfigure(tenant_shard_id.tenant_id).await; + let Some(reconfigure_request) = reconfigure_request else { + // The tenant doesn't yet have pageservers for all its shards: we won't notify anything + // until it does. + tracing::debug!("Tenant isn't yet ready to emit a notification",); + return Ok(()); + }; + + if let Some(notify_url) = &self.config.compute_hook_url { + self.do_notify(notify_url, reconfigure_request, cancel) + .await + } else { + self.do_notify_local(reconfigure_request) + .await + .map_err(|e| { + // This path is for testing only, so munge the error into our prod-style error type. 
+ tracing::error!("Local notification hook failed: {e}"); + NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR) + }) + } } } diff --git a/control_plane/attachment_service/src/main.rs b/control_plane/attachment_service/src/main.rs index ed65437ba2..eda9c7aad6 100644 --- a/control_plane/attachment_service/src/main.rs +++ b/control_plane/attachment_service/src/main.rs @@ -35,9 +35,18 @@ struct Cli { public_key: Option, /// Token for authenticating this service with the pageservers it controls - #[arg(short, long)] + #[arg(long)] jwt_token: Option, + /// Token for authenticating this service with the control plane, when calling + /// the compute notification endpoint + #[arg(long)] + control_plane_jwt_token: Option, + + /// URL to control plane compute notification endpoint + #[arg(long)] + compute_hook_url: Option, + /// Path to the .json file to store state (will be created if it doesn't exist) #[arg(short, long)] path: Option, @@ -53,11 +62,15 @@ struct Secrets { database_url: String, public_key: Option, jwt_token: Option, + control_plane_jwt_token: Option, } impl Secrets { const DATABASE_URL_SECRET: &'static str = "rds-neon-storage-controller-url"; - const JWT_TOKEN_SECRET: &'static str = "neon-storage-controller-pageserver-jwt-token"; + const PAGESERVER_JWT_TOKEN_SECRET: &'static str = + "neon-storage-controller-pageserver-jwt-token"; + const CONTROL_PLANE_JWT_TOKEN_SECRET: &'static str = + "neon-storage-controller-control-plane-jwt-token"; const PUBLIC_KEY_SECRET: &'static str = "neon-storage-controller-public-key"; async fn load(args: &Cli) -> anyhow::Result { @@ -95,7 +108,7 @@ impl Secrets { let jwt_token = asm .get_secret_value() - .secret_id(Self::JWT_TOKEN_SECRET) + .secret_id(Self::PAGESERVER_JWT_TOKEN_SECRET) .send() .await? .secret_string() @@ -104,6 +117,17 @@ impl Secrets { tracing::warn!("No pageserver JWT token set: this will only work if authentication is disabled on the pageserver"); } + let control_plane_jwt_token = asm + .get_secret_value() + .secret_id(Self::CONTROL_PLANE_JWT_TOKEN_SECRET) + .send() + .await? 
+ .secret_string() + .map(str::to_string); + if jwt_token.is_none() { + tracing::warn!("No control plane JWT token set: this will only work if authentication is disabled on the pageserver"); + } + let public_key = asm .get_secret_value() .secret_id(Self::PUBLIC_KEY_SECRET) @@ -125,6 +149,7 @@ impl Secrets { database_url, public_key, jwt_token, + control_plane_jwt_token, }) } @@ -137,6 +162,7 @@ impl Secrets { database_url: args.database_url.clone(), public_key, jwt_token: args.jwt_token.clone(), + control_plane_jwt_token: args.control_plane_jwt_token.clone(), }) } } @@ -165,6 +191,8 @@ async fn main() -> anyhow::Result<()> { let config = Config { jwt_token: secrets.jwt_token, + control_plane_jwt_token: secrets.control_plane_jwt_token, + compute_hook_url: args.compute_hook_url, }; let json_path = args.path; diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs index d7f4c0406a..776e1f9d1e 100644 --- a/control_plane/attachment_service/src/reconciler.rs +++ b/control_plane/attachment_service/src/reconciler.rs @@ -14,7 +14,7 @@ use utils::generation::Generation; use utils::id::{NodeId, TimelineId}; use utils::lsn::Lsn; -use crate::compute_hook::ComputeHook; +use crate::compute_hook::{ComputeHook, NotifyError}; use crate::node::Node; use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation}; @@ -37,9 +37,15 @@ pub(super) struct Reconciler { pub(crate) pageservers: Arc>, /// A hook to notify the running postgres instances when we change the location - /// of a tenant + /// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag + /// and guarantee eventual retries. pub(crate) compute_hook: Arc, + /// To avoid stalling if the cloud control plane is unavailable, we may proceed + /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed + /// so that we can set [`crate::tenant_state::TenantState::pending_compute_notification`] to ensure a later retry. + pub(crate) compute_notify_failure: bool, + /// A means to abort background reconciliation: it is essential to /// call this when something changes in the original TenantState that /// will make this reconciliation impossible or unnecessary, for @@ -52,7 +58,9 @@ pub(super) struct Reconciler { } #[derive(thiserror::Error, Debug)] -pub enum ReconcileError { +pub(crate) enum ReconcileError { + #[error(transparent)] + Notify(#[from] NotifyError), #[error(transparent)] Other(#[from] anyhow::Error), } @@ -317,9 +325,19 @@ impl Reconciler { } tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id); - self.compute_hook - .notify(self.tenant_shard_id, dest_ps_id) - .await?; + + // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach + // the origin without notifying compute, we will render the tenant unavailable. + while let Err(e) = self.compute_notify().await { + match e { + NotifyError::Fatal(_) => return Err(anyhow::anyhow!(e)), + _ => { + tracing::warn!( + "Live migration blocked by compute notification error, retrying: {e}" + ); + } + } + } // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Single, then // this location will be deleted in the general case reconciliation that runs after this. 
@@ -400,15 +418,7 @@ impl Reconciler { wanted_conf.generation = self.generation.into(); tracing::info!("Observed configuration requires update."); self.location_config(node_id, wanted_conf, None).await?; - if let Err(e) = self - .compute_hook - .notify(self.tenant_shard_id, node_id) - .await - { - tracing::warn!( - "Failed to notify compute of newly attached pageserver {node_id}: {e}" - ); - } + self.compute_notify().await?; } } } @@ -461,6 +471,29 @@ impl Reconciler { Ok(()) } + + pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> { + // Whenever a particular Reconciler emits a notification, it is always notifying for the intended + // destination. + if let Some(node_id) = self.intent.attached { + let result = self + .compute_hook + .notify(self.tenant_shard_id, node_id, &self.cancel) + .await; + if let Err(e) = &result { + // It is up to the caller whether they want to drop out on this error, but they don't have to: + // in general we should avoid letting unavailability of the cloud control plane stop us from + // making progress. + tracing::warn!("Failed to notify compute of attached pageserver {node_id}: {e}"); + // Set this flag so that in our ReconcileResult we will set the flag on the shard that it + // needs to retry at some point. + self.compute_notify_failure = true; + } + result + } else { + Ok(()) + } + } } pub(crate) fn attached_location_conf( diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 8c6a348515..6f0e3ebb74 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -12,6 +12,7 @@ use control_plane::attachment_service::{ TenantShardMigrateRequest, TenantShardMigrateResponse, }; use diesel::result::DatabaseErrorKind; +use futures::StreamExt; use hyper::StatusCode; use pageserver_api::{ control_api::{ @@ -27,6 +28,7 @@ use pageserver_api::{ shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId}, }; use pageserver_client::mgmt_api; +use tokio_util::sync::CancellationToken; use utils::{ completion::Barrier, generation::Generation, @@ -36,7 +38,7 @@ use utils::{ }; use crate::{ - compute_hook::ComputeHook, + compute_hook::{self, ComputeHook}, node::Node, persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence}, scheduler::Scheduler, @@ -66,6 +68,7 @@ struct ServiceState { impl ServiceState { fn new( + config: Config, result_tx: tokio::sync::mpsc::UnboundedSender, nodes: HashMap, tenants: BTreeMap, @@ -73,7 +76,7 @@ impl ServiceState { Self { tenants, nodes: Arc::new(nodes), - compute_hook: Arc::new(ComputeHook::new()), + compute_hook: Arc::new(ComputeHook::new(config)), result_tx, } } @@ -82,8 +85,17 @@ impl ServiceState { #[derive(Clone)] pub struct Config { // All pageservers managed by one instance of this service must have - // the same public key. + // the same public key. This JWT token will be used to authenticate + // this service to the pageservers it manages. pub jwt_token: Option, + + // This JWT token will be used to authenticate this service to the control plane. + pub control_plane_jwt_token: Option, + + /// Where the compute hook should send notifications of pageserver attachment locations + /// (this URL points to the control plane in prod). If this is None, the compute hook will + /// assume it is running in a test environment and try to update neon_local. 
+ pub compute_hook_url: Option, } impl From for ApiError { @@ -163,6 +175,8 @@ impl Service { let mut cleanup = Vec::new(); + let mut compute_notifications = Vec::new(); + // Populate intent and observed states for all tenants, based on reported state on pageservers let shard_count = { let mut locked = self.inner.write().unwrap(); @@ -187,6 +201,13 @@ impl Service { // not enough pageservers are available. The tenant may well still be available // to clients. tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}"); + } else { + // If we're both intending and observed to be attached at a particular node, we will + // emit a compute notification for this. In the case where our observed state does not + // yet match our intent, we will eventually reconcile, and that will emit a compute notification. + if let Some(attached_at) = tenant_state.stably_attached() { + compute_notifications.push((*tenant_shard_id, attached_at)); + } } } @@ -235,10 +256,57 @@ impl Service { } } + // Emit compute hook notifications for all tenants which are already stably attached. Other tenants + // will emit compute hook notifications when they reconcile. + // + // Ordering: we must complete these notification attempts before doing any other reconciliation for the + // tenants named here, because otherwise our calls to notify() might race with more recent values + // generated by reconciliation. + + // Compute notify is fallible. If it fails here, do not delay overall startup: set the + // flag on these shards that they have a pending notification. + let compute_hook = self.inner.read().unwrap().compute_hook.clone(); + + // Construct an async stream of futures to invoke the compute notify function: we do this + // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. + let stream = futures::stream::iter(compute_notifications.into_iter()) + .map(|(tenant_shard_id, node_id)| { + let compute_hook = compute_hook.clone(); + async move { + // TODO: give Service a cancellation token for clean shutdown + let cancel = CancellationToken::new(); + if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await { + tracing::error!( + tenant_shard_id=%tenant_shard_id, + node_id=%node_id, + "Failed to notify compute on startup for shard: {e}" + ); + Some(tenant_shard_id) + } else { + None + } + } + }) + .buffered(compute_hook::API_CONCURRENCY); + let notify_results = stream.collect::>().await; + + // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later. + { + let mut locked = self.inner.write().unwrap(); + for tenant_shard_id in notify_results.into_iter().flatten() { + if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) { + shard.pending_compute_notification = true; + } + } + } + // Finally, now that the service is up and running, launch reconcile operations for any tenants // which require it: under normal circumstances this should only include tenants that were in some - // transient state before we restarted. + // transient state before we restarted, or any tenants whose compute hooks failed above. let reconcile_tasks = self.reconcile_all(); + // We will not wait for these reconciliation tasks to run here: we're now done with startup and + // normal operations may proceed. 
+ tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"); } @@ -295,6 +363,7 @@ impl Service { waiter: Arc::new(SeqWait::new(Sequence::initial())), error_waiter: Arc::new(SeqWait::new(Sequence::initial())), last_error: Arc::default(), + pending_compute_notification: false, }; tenants.insert(tenant_shard_id, new_tenant); @@ -304,7 +373,10 @@ impl Service { let this = Arc::new(Self { inner: Arc::new(std::sync::RwLock::new(ServiceState::new( - result_tx, nodes, tenants, + config.clone(), + result_tx, + nodes, + tenants, ))), config, persistence, @@ -330,6 +402,10 @@ impl Service { // needed, but it is used to handle out-of-band updates via. e.g. test hook. tenant.generation = std::cmp::max(tenant.generation, result.generation); + // If the reconciler signals that it failed to notify compute, set this state on + // the shard so that a future [`TenantState::maybe_reconcile`] will try again. + tenant.pending_compute_notification = result.pending_compute_notification; + match result.result { Ok(()) => { for (node_id, loc) in &result.observed.locations { diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs index 5290197d84..a358e1ff7b 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -71,6 +71,12 @@ pub(crate) struct TenantState { /// TODO: generalize to an array of recent events /// TOOD: use a ArcSwap instead of mutex for faster reads? pub(crate) last_error: std::sync::Arc>, + + /// If we have a pending compute notification that for some reason we weren't able to send, + /// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry + /// sending it. This is the mechanism by which compute notifications are included in the scope + /// of state that we publish externally in an eventually consistent way. + pub(crate) pending_compute_notification: bool, } #[derive(Default, Clone, Debug)] @@ -164,6 +170,9 @@ pub(crate) struct ReconcileResult { pub(crate) tenant_shard_id: TenantShardId, pub(crate) generation: Generation, pub(crate) observed: ObservedState, + + /// Set [`TenantState::pending_compute_notification`] from this flag + pub(crate) pending_compute_notification: bool, } impl IntentState { @@ -226,6 +235,7 @@ impl TenantState { waiter: Arc::new(SeqWait::new(Sequence(0))), error_waiter: Arc::new(SeqWait::new(Sequence(0))), last_error: Arc::default(), + pending_compute_notification: false, } } @@ -333,6 +343,38 @@ impl TenantState { Ok(()) } + /// Query whether the tenant's observed state for attached node matches its intent state, and if so, + /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that + /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there. + /// + /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this + /// funciton should not be used to decide whether to reconcile. 
+ pub(crate) fn stably_attached(&self) -> Option { + if let Some(attach_intent) = self.intent.attached { + match self.observed.locations.get(&attach_intent) { + Some(loc) => match &loc.conf { + Some(conf) => match conf.mode { + LocationConfigMode::AttachedMulti + | LocationConfigMode::AttachedSingle + | LocationConfigMode::AttachedStale => { + // Our intent and observed state agree that this node is in an attached state. + Some(attach_intent) + } + // Our observed config is not an attached state + _ => None, + }, + // Our observed state is None, i.e. in flux + None => None, + }, + // We have no observed state for this node + None => None, + } + } else { + // Our intent is not to attach + None + } + } + fn dirty(&self) -> bool { if let Some(node_id) = self.intent.attached { let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config); @@ -354,6 +396,12 @@ impl TenantState { } } + // Even if there is no pageserver work to be done, if we have a pending notification to computes, + // wake up a reconciler to send it. + if self.pending_compute_notification { + return true; + } + false } @@ -415,11 +463,13 @@ impl TenantState { service_config: service_config.clone(), cancel: cancel.clone(), persistence: persistence.clone(), + compute_notify_failure: false, }; let reconcile_seq = self.sequence; tracing::info!("Spawning Reconciler for sequence {}", self.sequence); + let must_notify = self.pending_compute_notification; let join_handle = tokio::task::spawn(async move { // Wait for any previous reconcile task to complete before we start if let Some(old_handle) = old_handle { @@ -438,7 +488,16 @@ impl TenantState { return; } + // Attempt to make observed state match intent state let result = reconciler.reconcile().await; + + // If we know we had a pending compute notification from some previous action, send a notification irrespective + // of whether the above reconcile() did any work + if result.is_ok() && must_notify { + // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`] + reconciler.compute_notify().await.ok(); + } + result_tx .send(ReconcileResult { sequence: reconcile_seq, @@ -446,6 +505,7 @@ impl TenantState { tenant_shard_id: reconciler.tenant_shard_id, generation: reconciler.generation, observed: reconciler.observed, + pending_compute_notification: reconciler.compute_notify_failure, }) .ok(); }); diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 7816d0953b..140e5c4e34 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -457,6 +457,12 @@ impl AttachmentService { args.push(format!("--public-key={public_key_path}")); } + if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api { + args.push(format!( + "--compute-hook-url={control_plane_compute_hook_api}" + )); + } + background_process::start_process( COMMAND, &self.env.base_data_dir, diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index d5abda729f..e56007dd20 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -795,7 +795,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re &endpoint.timeline_id.to_string(), branch_name, lsn_str.as_str(), - endpoint.status(), + &format!("{}", endpoint.status()), ]); } diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index dcad22b992..b19a6a1a18 100644 --- 
a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -184,7 +184,7 @@ impl ComputeControlPlane { v.tenant_id == tenant_id && v.timeline_id == timeline_id && v.mode == mode - && v.status() != "stopped" + && v.status() != EndpointStatus::Stopped }); if let Some((key, _)) = duplicates.next() { @@ -223,6 +223,26 @@ pub struct Endpoint { features: Vec, } +#[derive(PartialEq, Eq)] +pub enum EndpointStatus { + Running, + Stopped, + Crashed, + RunningNoPidfile, +} + +impl std::fmt::Display for EndpointStatus { + fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result { + let s = match self { + Self::Running => "running", + Self::Stopped => "stopped", + Self::Crashed => "crashed", + Self::RunningNoPidfile => "running, no pidfile", + }; + write!(writer, "{}", s) + } +} + impl Endpoint { fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result { if !entry.file_type()?.is_dir() { @@ -380,16 +400,16 @@ impl Endpoint { self.endpoint_path().join("pgdata") } - pub fn status(&self) -> &str { + pub fn status(&self) -> EndpointStatus { let timeout = Duration::from_millis(300); let has_pidfile = self.pgdata().join("postmaster.pid").exists(); let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok(); match (has_pidfile, can_connect) { - (true, true) => "running", - (false, false) => "stopped", - (true, false) => "crashed", - (false, true) => "running, no pidfile", + (true, true) => EndpointStatus::Running, + (false, false) => EndpointStatus::Stopped, + (true, false) => EndpointStatus::Crashed, + (false, true) => EndpointStatus::RunningNoPidfile, } } @@ -481,7 +501,7 @@ impl Endpoint { remote_ext_config: Option<&String>, shard_stripe_size: usize, ) -> Result<()> { - if self.status() == "running" { + if self.status() == EndpointStatus::Running { anyhow::bail!("The endpoint is already running"); } diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index aefef47da7..786ea6d098 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -72,11 +72,16 @@ pub struct LocalEnv { #[serde(default)] pub safekeepers: Vec, - // Control plane location: if None, we will not run attachment_service. If set, this will + // Control plane upcall API for pageserver: if None, we will not run attachment_service. If set, this will // be propagated into each pageserver's configuration. #[serde(default)] pub control_plane_api: Option, + // Control plane upcall API for attachment service. If set, this will be propagated into the + // attachment service's configuration. + #[serde(default)] + pub control_plane_compute_hook_api: Option, + /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user. 
#[serde(default)] // A `HashMap>` would be more appropriate here, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e2a2291dbc..1e15ebe5a0 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -482,6 +482,7 @@ class NeonEnvBuilder: self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = [] self.config_init_force: Optional[str] = None self.top_output_dir = top_output_dir + self.control_plane_compute_hook_api: Optional[str] = None self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine @@ -1007,6 +1008,9 @@ class NeonEnv: # The base URL of the attachment service self.attachment_service_api: str = f"http://127.0.0.1:{self.attachment_service_port}" + # For testing this with a fake HTTP server, enable passing through a URL from config + self.control_plane_compute_hook_api = config.control_plane_compute_hook_api + self.attachment_service: NeonAttachmentService = NeonAttachmentService( self, config.auth_enabled ) @@ -1026,6 +1030,9 @@ class NeonEnv: if self.control_plane_api is not None: cfg["control_plane_api"] = self.control_plane_api + if self.control_plane_compute_hook_api is not None: + cfg["control_plane_compute_hook_api"] = self.control_plane_compute_hook_api + # Create config for pageserver http_auth_type = "NeonJWT" if config.auth_enabled else "Trust" pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust" @@ -1904,7 +1911,7 @@ class Pagectl(AbstractNeonCli): class NeonAttachmentService: - def __init__(self, env: NeonEnv, auth_enabled): + def __init__(self, env: NeonEnv, auth_enabled: bool): self.env = env self.running = False self.auth_enabled = auth_enabled diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index 3b2c9334db..346df708de 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -1,14 +1,24 @@ import time from collections import defaultdict -from fixtures.neon_fixtures import ( - NeonEnvBuilder, -) +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed from fixtures.pg_version import PgVersion from fixtures.types import TenantId, TimelineId from fixtures.utils import wait_until +from pytest_httpserver import HTTPServer +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response + + +def get_node_shard_counts(env: NeonEnv, tenant_ids): + counts: defaultdict[str, int] = defaultdict(int) + for tid in tenant_ids: + for shard in env.attachment_service.locate(tid): + counts[shard["node_id"]] += 1 + return counts def test_sharding_service_smoke( @@ -54,14 +64,7 @@ def test_sharding_service_smoke( for tid in tenant_ids: env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant) - def get_node_shard_counts(): - counts: defaultdict[str, int] = defaultdict(int) - for tid in tenant_ids: - for shard in env.attachment_service.locate(tid): - counts[shard["node_id"]] += 1 - return counts - - for node_id, count in get_node_shard_counts().items(): + for node_id, count in get_node_shard_counts(env, tenant_ids).items(): # we used a multiple of pagservers for the total shard count, # so expect equal number on all pageservers assert count == tenant_shard_count / len( @@ -89,7 +92,7 @@ def test_sharding_service_smoke( 
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"}) def node_evacuated(node_id: int): - counts = get_node_shard_counts() + counts = get_node_shard_counts(env, tenant_ids) assert counts[node_id] == 0 wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id)) @@ -98,7 +101,7 @@ def test_sharding_service_smoke( # immediately env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Active"}) time.sleep(1) - assert get_node_shard_counts()[env.pageservers[0].id] == 0 + assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0 # Delete all the tenants for tid in tenant_ids: @@ -113,7 +116,7 @@ def test_sharding_service_smoke( for tid in tenant_ids: env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant) - counts = get_node_shard_counts() + counts = get_node_shard_counts(env, tenant_ids) # Nothing should have been scheduled on the node in Draining assert counts[env.pageservers[1].id] == 0 assert counts[env.pageservers[0].id] == tenant_shard_count // 2 @@ -270,3 +273,73 @@ def test_sharding_service_onboarding( # The onboarded tenant should surviev a restart of pageserver dest_ps.stop() dest_ps.start() + + +def test_sharding_service_compute_hook( + httpserver: HTTPServer, + neon_env_builder: NeonEnvBuilder, + httpserver_listen_address, +): + """ + Test that the sharding service calls out to the configured HTTP endpoint on attachment changes + """ + + # We will run two pageserver to migrate and check that the attachment service sends notifications + # when migrating. + neon_env_builder.num_pageservers = 2 + (host, port) = httpserver_listen_address + neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify" + + # Set up fake HTTP notify endpoint + notifications = [] + + def handler(request: Request): + log.info(f"Notify request: {request}") + notifications.append(request.json) + return Response(status=200) + + httpserver.expect_request("/notify", method="POST").respond_with_handler(handler) + + # Start running + env = neon_env_builder.init_start() + + # We will to an unclean migration, which will result in deletion queue warnings + env.pageservers[0].allowed_errors.append(".*Dropped remote consistent LSN updates for tenant.*") + + # Initial notification from tenant creation + assert len(notifications) == 1 + expect = { + "tenant_id": str(env.initial_tenant), + "shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}], + } + + env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"}) + + def node_evacuated(node_id: int): + counts = get_node_shard_counts(env, [env.initial_tenant]) + assert counts[node_id] == 0 + + wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id)) + + # Additional notification from migration + log.info(f"notifications: {notifications}") + expect = { + "tenant_id": str(env.initial_tenant), + "shards": [{"node_id": int(env.pageservers[1].id), "shard_number": 0}], + } + + def received_migration_notification(): + assert len(notifications) == 2 + assert notifications[1] == expect + + wait_until(20, 0.25, received_migration_notification) + + # When we restart, we should re-emit notifications for all tenants + env.attachment_service.stop() + env.attachment_service.start() + + def received_restart_notification(): + assert len(notifications) == 3 + assert notifications[1] == expect + + wait_until(10, 1, received_restart_notification)
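
For reviewers, a hypothetical sketch (not part of this patch) of how the receiving control plane endpoint might map its internal state onto the status codes that `do_notify_iteration` interprets. The `TenantHookState` type and `notify_response` function are invented here purely for illustration:

```rust
// Hypothetical responder-side sketch: the status codes below are the ones this patch's
// do_notify_iteration() maps onto NotifyError variants; everything else here is invented.
use hyper::StatusCode;

enum TenantHookState {
    Idle,              // ready to accept the new pageserver mapping
    OperationInFlight, // an ongoing Operation owns this tenant right now
    Overloaded,        // too many notification requests globally
    Unavailable,       // e.g. maintenance or partial outage
}

fn notify_response(state: TenantHookState) -> StatusCode {
    match state {
        TenantHookState::Idle => StatusCode::OK, // success
        TenantHookState::OperationInFlight => StatusCode::LOCKED, // -> NotifyError::Busy, retried after BUSY_DELAY
        TenantHookState::Overloaded => StatusCode::TOO_MANY_REQUESTS, // -> NotifyError::SlowDown
        TenantHookState::Unavailable => StatusCode::SERVICE_UNAVAILABLE, // -> NotifyError::Unavailable
    }
}
```

The control plane's internal behaviour is out of scope for this patch; the only contract relied on is the status codes above plus an eventual 200.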