diff --git a/Cargo.lock b/Cargo.lock index 90991ab0a4..02450709d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,6 +288,7 @@ dependencies = [ "pageserver_api", "pageserver_client", "postgres_connection", + "reqwest", "serde", "serde_json", "thiserror", diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml index 1d3831eea0..d3c62d74d2 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/control_plane/attachment_service/Cargo.toml @@ -16,6 +16,7 @@ hyper.workspace = true pageserver_api.workspace = true pageserver_client.workspace = true postgres_connection.workspace = true +reqwest.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true diff --git a/control_plane/attachment_service/src/compute_hook.rs b/control_plane/attachment_service/src/compute_hook.rs index 02617cd065..9c1185f259 100644 --- a/control_plane/attachment_service/src/compute_hook.rs +++ b/control_plane/attachment_service/src/compute_hook.rs @@ -1,24 +1,76 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; -use control_plane::endpoint::ComputeControlPlane; +use control_plane::endpoint::{ComputeControlPlane, EndpointStatus}; use control_plane::local_env::LocalEnv; -use pageserver_api::shard::{ShardCount, ShardIndex, TenantShardId}; +use hyper::{Method, StatusCode}; +use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, TenantShardId}; use postgres_connection::parse_host_port; -use utils::id::{NodeId, TenantId}; +use serde::{Deserialize, Serialize}; +use tokio_util::sync::CancellationToken; +use utils::{ + backoff::{self}, + id::{NodeId, TenantId}, +}; + +use crate::service::Config; + +const BUSY_DELAY: Duration = Duration::from_secs(1); +const SLOWDOWN_DELAY: Duration = Duration::from_secs(5); + +pub(crate) const API_CONCURRENCY: usize = 32; pub(super) struct ComputeHookTenant { shards: Vec<(ShardIndex, NodeId)>, } +#[derive(Serialize, Deserialize, Debug)] +struct ComputeHookNotifyRequestShard { + node_id: NodeId, + shard_number: ShardNumber, +} + +/// Request body that we send to the control plane to notify it of where a tenant is attached +#[derive(Serialize, Deserialize, Debug)] +struct ComputeHookNotifyRequest { + tenant_id: TenantId, + shards: Vec<ComputeHookNotifyRequestShard>, +} + +/// Error type for attempts to call into the control plane compute notification hook +#[derive(thiserror::Error, Debug)] +pub(crate) enum NotifyError { + // Request was not sent successfully, e.g. transport error + #[error("Sending request: {0}")] + Request(#[from] reqwest::Error), + // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon. + #[error("Control plane tenant busy")] + Busy, + // Explicit 429 response asking us to retry less frequently + #[error("Control plane overloaded")] + SlowDown, + // A 503 response indicates the control plane can't handle the request right now + #[error("Control plane unavailable (status {0})")] + Unavailable(StatusCode), + // API returned unexpected non-success status. We will retry, but log a warning.
+ #[error("Control plane returned unexpected status {0}")] + Unexpected(StatusCode), + // We shutdown while sending + #[error("Shutting down")] + ShuttingDown, + // A response indicates we will never succeed, such as 400 or 404 + #[error("Non-retryable error {0}")] + Fatal(StatusCode), +} + impl ComputeHookTenant { - pub(super) async fn maybe_reconfigure(&mut self, tenant_id: TenantId) -> anyhow::Result<()> { + async fn maybe_reconfigure(&mut self, tenant_id: TenantId) -> Option { // Find the highest shard count and drop any shards that aren't // for that shard count. let shard_count = self.shards.iter().map(|(k, _v)| k.shard_count).max(); let Some(shard_count) = shard_count else { // No shards, nothing to do. tracing::info!("ComputeHookTenant::maybe_reconfigure: no shards"); - return Ok(()); + return None; }; self.shards.retain(|(k, _v)| k.shard_count == shard_count); @@ -26,38 +78,18 @@ impl ComputeHookTenant { .sort_by_key(|(shard, _node_id)| shard.shard_number); if self.shards.len() == shard_count.0 as usize || shard_count == ShardCount(0) { - // We have pageservers for all the shards: proceed to reconfigure compute - let env = match LocalEnv::load_config() { - Ok(e) => e, - Err(e) => { - tracing::warn!( - "Couldn't load neon_local config, skipping compute update ({e})" - ); - return Ok(()); - } - }; - let cplane = ComputeControlPlane::load(env.clone()) - .expect("Error loading compute control plane"); - - let compute_pageservers = self - .shards - .iter() - .map(|(_shard, node_id)| { - let ps_conf = env - .get_pageserver_conf(*node_id) - .expect("Unknown pageserver"); - let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr) - .expect("Unable to parse listen_pg_addr"); - (pg_host, pg_port.unwrap_or(5432)) - }) - .collect::>(); - - for (endpoint_name, endpoint) in &cplane.endpoints { - if endpoint.tenant_id == tenant_id && endpoint.status() == "running" { - tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,); - endpoint.reconfigure(compute_pageservers.clone()).await?; - } - } + // We have pageservers for all the shards: emit a configuration update + return Some(ComputeHookNotifyRequest { + tenant_id, + shards: self + .shards + .iter() + .map(|(shard, node_id)| ComputeHookNotifyRequestShard { + shard_number: shard.shard_number, + node_id: *node_id, + }) + .collect(), + }); } else { tracing::info!( "ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})", @@ -66,7 +98,7 @@ impl ComputeHookTenant { ); } - Ok(()) + None } } @@ -74,22 +106,171 @@ impl ComputeHookTenant { /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures /// the compute connection string. 
pub(super) struct ComputeHook { + config: Config, state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>, + authorization_header: Option<String>, } impl ComputeHook { - pub(super) fn new() -> Self { + pub(super) fn new(config: Config) -> Self { + let authorization_header = config + .control_plane_jwt_token + .clone() + .map(|jwt| format!("Bearer {}", jwt)); + Self { state: Default::default(), + config, + authorization_header, } } + /// For test environments: use neon_local's LocalEnv to update compute + async fn do_notify_local( + &self, + reconfigure_request: ComputeHookNotifyRequest, + ) -> anyhow::Result<()> { + let env = match LocalEnv::load_config() { + Ok(e) => e, + Err(e) => { + tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})"); + return Ok(()); + } + }; + let cplane = + ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane"); + let ComputeHookNotifyRequest { tenant_id, shards } = reconfigure_request; + + let compute_pageservers = shards + .into_iter() + .map(|shard| { + let ps_conf = env + .get_pageserver_conf(shard.node_id) + .expect("Unknown pageserver"); + let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr) + .expect("Unable to parse listen_pg_addr"); + (pg_host, pg_port.unwrap_or(5432)) + }) + .collect::<Vec<_>>(); + + for (endpoint_name, endpoint) in &cplane.endpoints { + if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running { + tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,); + endpoint.reconfigure(compute_pageservers.clone()).await?; + } + } + + Ok(()) + } + + async fn do_notify_iteration( + &self, + client: &reqwest::Client, + url: &String, + reconfigure_request: &ComputeHookNotifyRequest, + cancel: &CancellationToken, + ) -> Result<(), NotifyError> { + let req = client.request(Method::POST, url); + let req = if let Some(value) = &self.authorization_header { + req.header(reqwest::header::AUTHORIZATION, value) + } else { + req + }; + + tracing::debug!( + "Sending notify request to {} ({:?})", + url, + reconfigure_request + ); + let send_result = req.json(&reconfigure_request).send().await; + let response = match send_result { + Ok(r) => r, + Err(e) => return Err(e.into()), + }; + + // Treat all 2xx responses as success + if response.status() >= StatusCode::OK && response.status() < StatusCode::MULTIPLE_CHOICES { + if response.status() != StatusCode::OK { + // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so + // log a warning. + tracing::warn!( + "Unexpected 2xx response code {} from control plane", + response.status() + ); + } + + return Ok(()); + } + + // Error response codes + match response.status() { + StatusCode::TOO_MANY_REQUESTS => { + // TODO: 429 handling should be global: set some state visible to other requests + // so that they will delay before starting, rather than all notifications trying + // once before backing off.
+ tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled()) + .await + .ok(); + Err(NotifyError::SlowDown) + } + StatusCode::LOCKED => { + // Delay our retry if busy: the usual fast exponential backoff in backoff::retry + // is not appropriate + tokio::time::timeout(BUSY_DELAY, cancel.cancelled()) + .await + .ok(); + Err(NotifyError::Busy) + } + StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT + | StatusCode::BAD_GATEWAY => Err(NotifyError::Unavailable(response.status())), + StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { + Err(NotifyError::Fatal(response.status())) + } + _ => Err(NotifyError::Unexpected(response.status())), + } + } + + async fn do_notify( + &self, + url: &String, + reconfigure_request: ComputeHookNotifyRequest, + cancel: &CancellationToken, + ) -> Result<(), NotifyError> { + let client = reqwest::Client::new(); + backoff::retry( + || self.do_notify_iteration(&client, url, &reconfigure_request, cancel), + |e| matches!(e, NotifyError::Fatal(_)), + 3, + 10, + "Send compute notification", + backoff::Cancel::new(cancel.clone(), || NotifyError::ShuttingDown), + ) + .await + } + + /// Call this to notify the compute (postgres) tier of new pageservers to use + /// for a tenant. notify() is called by each shard individually, and this function + /// will decide whether an update to the tenant is sent. An update is sent on the + /// condition that: + /// - We know a pageserver for every shard. + /// - All the shards have the same shard_count (i.e. we are not mid-split) + /// + /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler + /// that is cancelled. + /// + /// This function is fallible, including in the case that the control plane is transiently + /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability + /// periods, but we don't retry forever. The **caller** is responsible for handling failures and + /// ensuring that they eventually call again to ensure that the compute is eventually notified of + /// the proper pageserver nodes for a tenant. + #[tracing::instrument(skip_all, fields(tenant_shard_id, node_id))] pub(super) async fn notify( &self, tenant_shard_id: TenantShardId, node_id: NodeId, - ) -> anyhow::Result<()> { - tracing::info!("ComputeHook::notify: {}->{}", tenant_shard_id, node_id); + cancel: &CancellationToken, + ) -> Result<(), NotifyError> { let mut locked = self.state.lock().await; let entry = locked .entry(tenant_shard_id.tenant_id) @@ -111,6 +292,25 @@ impl ComputeHook { entry.shards.push((shard_index, node_id)); } - entry.maybe_reconfigure(tenant_shard_id.tenant_id).await + let reconfigure_request = entry.maybe_reconfigure(tenant_shard_id.tenant_id).await; + let Some(reconfigure_request) = reconfigure_request else { + // The tenant doesn't yet have pageservers for all its shards: we won't notify anything + // until it does. + tracing::debug!("Tenant isn't yet ready to emit a notification",); + return Ok(()); + }; + + if let Some(notify_url) = &self.config.compute_hook_url { + self.do_notify(notify_url, reconfigure_request, cancel) + .await + } else { + self.do_notify_local(reconfigure_request) + .await + .map_err(|e| { + // This path is for testing only, so munge the error into our prod-style error type. 
+ tracing::error!("Local notification hook failed: {e}"); + NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR) + }) + } } } diff --git a/control_plane/attachment_service/src/main.rs b/control_plane/attachment_service/src/main.rs index ed65437ba2..eda9c7aad6 100644 --- a/control_plane/attachment_service/src/main.rs +++ b/control_plane/attachment_service/src/main.rs @@ -35,9 +35,18 @@ struct Cli { public_key: Option, /// Token for authenticating this service with the pageservers it controls - #[arg(short, long)] + #[arg(long)] jwt_token: Option, + /// Token for authenticating this service with the control plane, when calling + /// the compute notification endpoint + #[arg(long)] + control_plane_jwt_token: Option, + + /// URL to control plane compute notification endpoint + #[arg(long)] + compute_hook_url: Option, + /// Path to the .json file to store state (will be created if it doesn't exist) #[arg(short, long)] path: Option, @@ -53,11 +62,15 @@ struct Secrets { database_url: String, public_key: Option, jwt_token: Option, + control_plane_jwt_token: Option, } impl Secrets { const DATABASE_URL_SECRET: &'static str = "rds-neon-storage-controller-url"; - const JWT_TOKEN_SECRET: &'static str = "neon-storage-controller-pageserver-jwt-token"; + const PAGESERVER_JWT_TOKEN_SECRET: &'static str = + "neon-storage-controller-pageserver-jwt-token"; + const CONTROL_PLANE_JWT_TOKEN_SECRET: &'static str = + "neon-storage-controller-control-plane-jwt-token"; const PUBLIC_KEY_SECRET: &'static str = "neon-storage-controller-public-key"; async fn load(args: &Cli) -> anyhow::Result { @@ -95,7 +108,7 @@ impl Secrets { let jwt_token = asm .get_secret_value() - .secret_id(Self::JWT_TOKEN_SECRET) + .secret_id(Self::PAGESERVER_JWT_TOKEN_SECRET) .send() .await? .secret_string() @@ -104,6 +117,17 @@ impl Secrets { tracing::warn!("No pageserver JWT token set: this will only work if authentication is disabled on the pageserver"); } + let control_plane_jwt_token = asm + .get_secret_value() + .secret_id(Self::CONTROL_PLANE_JWT_TOKEN_SECRET) + .send() + .await? 
+ .secret_string() + .map(str::to_string); + if control_plane_jwt_token.is_none() { + tracing::warn!("No control plane JWT token set: this will only work if authentication is disabled on the control plane compute hook"); + } + let public_key = asm .get_secret_value() .secret_id(Self::PUBLIC_KEY_SECRET) @@ -125,6 +149,7 @@ impl Secrets { database_url, public_key, jwt_token, + control_plane_jwt_token, }) } @@ -137,6 +162,7 @@ impl Secrets { database_url: args.database_url.clone(), public_key, jwt_token: args.jwt_token.clone(), + control_plane_jwt_token: args.control_plane_jwt_token.clone(), }) } } @@ -165,6 +191,8 @@ async fn main() -> anyhow::Result<()> { let config = Config { jwt_token: secrets.jwt_token, + control_plane_jwt_token: secrets.control_plane_jwt_token, + compute_hook_url: args.compute_hook_url, }; let json_path = args.path; diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs index d7f4c0406a..776e1f9d1e 100644 --- a/control_plane/attachment_service/src/reconciler.rs +++ b/control_plane/attachment_service/src/reconciler.rs @@ -14,7 +14,7 @@ use utils::generation::Generation; use utils::id::{NodeId, TimelineId}; use utils::lsn::Lsn; -use crate::compute_hook::ComputeHook; +use crate::compute_hook::{ComputeHook, NotifyError}; use crate::node::Node; use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation}; @@ -37,9 +37,15 @@ pub(super) struct Reconciler { pub(crate) pageservers: Arc<HashMap<NodeId, Node>>, /// A hook to notify the running postgres instances when we change the location - /// of a tenant + /// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag + /// and guarantee eventual retries. pub(crate) compute_hook: Arc<ComputeHook>, + /// To avoid stalling if the cloud control plane is unavailable, we may proceed + /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed + /// so that we can set [`crate::tenant_state::TenantState::pending_compute_notification`] to ensure a later retry. + pub(crate) compute_notify_failure: bool, + /// A means to abort background reconciliation: it is essential to /// call this when something changes in the original TenantState that /// will make this reconciliation impossible or unnecessary, for @@ -52,7 +58,9 @@ pub(super) struct Reconciler { } #[derive(thiserror::Error, Debug)] -pub enum ReconcileError { +pub(crate) enum ReconcileError { + #[error(transparent)] + Notify(#[from] NotifyError), #[error(transparent)] Other(#[from] anyhow::Error), } @@ -317,9 +325,19 @@ impl Reconciler { } tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id); - self.compute_hook - .notify(self.tenant_shard_id, dest_ps_id) - .await?; + + // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach + // the origin without notifying compute, we will render the tenant unavailable. + while let Err(e) = self.compute_notify().await { + match e { + NotifyError::Fatal(_) => return Err(anyhow::anyhow!(e)), + _ => { + tracing::warn!( + "Live migration blocked by compute notification error, retrying: {e}" + ); + } + } + } // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Single, then // this location will be deleted in the general case reconciliation that runs after this.
@@ -400,15 +418,7 @@ impl Reconciler { wanted_conf.generation = self.generation.into(); tracing::info!("Observed configuration requires update."); self.location_config(node_id, wanted_conf, None).await?; - if let Err(e) = self - .compute_hook - .notify(self.tenant_shard_id, node_id) - .await - { - tracing::warn!( - "Failed to notify compute of newly attached pageserver {node_id}: {e}" - ); - } + self.compute_notify().await?; } } } @@ -461,6 +471,29 @@ impl Reconciler { Ok(()) } + + pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> { + // Whenever a particular Reconciler emits a notification, it is always notifying for the intended + // destination. + if let Some(node_id) = self.intent.attached { + let result = self + .compute_hook + .notify(self.tenant_shard_id, node_id, &self.cancel) + .await; + if let Err(e) = &result { + // It is up to the caller whether they want to drop out on this error, but they don't have to: + // in general we should avoid letting unavailability of the cloud control plane stop us from + // making progress. + tracing::warn!("Failed to notify compute of attached pageserver {node_id}: {e}"); + // Set this flag so that in our ReconcileResult we will set the flag on the shard that it + // needs to retry at some point. + self.compute_notify_failure = true; + } + result + } else { + Ok(()) + } + } } pub(crate) fn attached_location_conf( diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 8c6a348515..6f0e3ebb74 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -12,6 +12,7 @@ use control_plane::attachment_service::{ TenantShardMigrateRequest, TenantShardMigrateResponse, }; use diesel::result::DatabaseErrorKind; +use futures::StreamExt; use hyper::StatusCode; use pageserver_api::{ control_api::{ @@ -27,6 +28,7 @@ use pageserver_api::{ shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId}, }; use pageserver_client::mgmt_api; +use tokio_util::sync::CancellationToken; use utils::{ completion::Barrier, generation::Generation, @@ -36,7 +38,7 @@ use utils::{ }; use crate::{ - compute_hook::ComputeHook, + compute_hook::{self, ComputeHook}, node::Node, persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence}, scheduler::Scheduler, @@ -66,6 +68,7 @@ struct ServiceState { impl ServiceState { fn new( + config: Config, result_tx: tokio::sync::mpsc::UnboundedSender, nodes: HashMap, tenants: BTreeMap, @@ -73,7 +76,7 @@ impl ServiceState { Self { tenants, nodes: Arc::new(nodes), - compute_hook: Arc::new(ComputeHook::new()), + compute_hook: Arc::new(ComputeHook::new(config)), result_tx, } } @@ -82,8 +85,17 @@ impl ServiceState { #[derive(Clone)] pub struct Config { // All pageservers managed by one instance of this service must have - // the same public key. + // the same public key. This JWT token will be used to authenticate + // this service to the pageservers it manages. pub jwt_token: Option, + + // This JWT token will be used to authenticate this service to the control plane. + pub control_plane_jwt_token: Option, + + /// Where the compute hook should send notifications of pageserver attachment locations + /// (this URL points to the control plane in prod). If this is None, the compute hook will + /// assume it is running in a test environment and try to update neon_local. 
+ pub compute_hook_url: Option, } impl From for ApiError { @@ -163,6 +175,8 @@ impl Service { let mut cleanup = Vec::new(); + let mut compute_notifications = Vec::new(); + // Populate intent and observed states for all tenants, based on reported state on pageservers let shard_count = { let mut locked = self.inner.write().unwrap(); @@ -187,6 +201,13 @@ impl Service { // not enough pageservers are available. The tenant may well still be available // to clients. tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}"); + } else { + // If we're both intending and observed to be attached at a particular node, we will + // emit a compute notification for this. In the case where our observed state does not + // yet match our intent, we will eventually reconcile, and that will emit a compute notification. + if let Some(attached_at) = tenant_state.stably_attached() { + compute_notifications.push((*tenant_shard_id, attached_at)); + } } } @@ -235,10 +256,57 @@ impl Service { } } + // Emit compute hook notifications for all tenants which are already stably attached. Other tenants + // will emit compute hook notifications when they reconcile. + // + // Ordering: we must complete these notification attempts before doing any other reconciliation for the + // tenants named here, because otherwise our calls to notify() might race with more recent values + // generated by reconciliation. + + // Compute notify is fallible. If it fails here, do not delay overall startup: set the + // flag on these shards that they have a pending notification. + let compute_hook = self.inner.read().unwrap().compute_hook.clone(); + + // Construct an async stream of futures to invoke the compute notify function: we do this + // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. + let stream = futures::stream::iter(compute_notifications.into_iter()) + .map(|(tenant_shard_id, node_id)| { + let compute_hook = compute_hook.clone(); + async move { + // TODO: give Service a cancellation token for clean shutdown + let cancel = CancellationToken::new(); + if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await { + tracing::error!( + tenant_shard_id=%tenant_shard_id, + node_id=%node_id, + "Failed to notify compute on startup for shard: {e}" + ); + Some(tenant_shard_id) + } else { + None + } + } + }) + .buffered(compute_hook::API_CONCURRENCY); + let notify_results = stream.collect::>().await; + + // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later. + { + let mut locked = self.inner.write().unwrap(); + for tenant_shard_id in notify_results.into_iter().flatten() { + if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) { + shard.pending_compute_notification = true; + } + } + } + // Finally, now that the service is up and running, launch reconcile operations for any tenants // which require it: under normal circumstances this should only include tenants that were in some - // transient state before we restarted. + // transient state before we restarted, or any tenants whose compute hooks failed above. let reconcile_tasks = self.reconcile_all(); + // We will not wait for these reconciliation tasks to run here: we're now done with startup and + // normal operations may proceed. 
+ tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"); } @@ -295,6 +363,7 @@ impl Service { waiter: Arc::new(SeqWait::new(Sequence::initial())), error_waiter: Arc::new(SeqWait::new(Sequence::initial())), last_error: Arc::default(), + pending_compute_notification: false, }; tenants.insert(tenant_shard_id, new_tenant); @@ -304,7 +373,10 @@ impl Service { let this = Arc::new(Self { inner: Arc::new(std::sync::RwLock::new(ServiceState::new( - result_tx, nodes, tenants, + config.clone(), + result_tx, + nodes, + tenants, ))), config, persistence, @@ -330,6 +402,10 @@ impl Service { // needed, but it is used to handle out-of-band updates via. e.g. test hook. tenant.generation = std::cmp::max(tenant.generation, result.generation); + // If the reconciler signals that it failed to notify compute, set this state on + // the shard so that a future [`TenantState::maybe_reconcile`] will try again. + tenant.pending_compute_notification = result.pending_compute_notification; + match result.result { Ok(()) => { for (node_id, loc) in &result.observed.locations { diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs index 5290197d84..a358e1ff7b 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -71,6 +71,12 @@ pub(crate) struct TenantState { /// TODO: generalize to an array of recent events /// TOOD: use a ArcSwap instead of mutex for faster reads? pub(crate) last_error: std::sync::Arc>, + + /// If we have a pending compute notification that for some reason we weren't able to send, + /// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry + /// sending it. This is the mechanism by which compute notifications are included in the scope + /// of state that we publish externally in an eventually consistent way. + pub(crate) pending_compute_notification: bool, } #[derive(Default, Clone, Debug)] @@ -164,6 +170,9 @@ pub(crate) struct ReconcileResult { pub(crate) tenant_shard_id: TenantShardId, pub(crate) generation: Generation, pub(crate) observed: ObservedState, + + /// Set [`TenantState::pending_compute_notification`] from this flag + pub(crate) pending_compute_notification: bool, } impl IntentState { @@ -226,6 +235,7 @@ impl TenantState { waiter: Arc::new(SeqWait::new(Sequence(0))), error_waiter: Arc::new(SeqWait::new(Sequence(0))), last_error: Arc::default(), + pending_compute_notification: false, } } @@ -333,6 +343,38 @@ impl TenantState { Ok(()) } + /// Query whether the tenant's observed state for attached node matches its intent state, and if so, + /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that + /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there. + /// + /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this + /// funciton should not be used to decide whether to reconcile. 
+ pub(crate) fn stably_attached(&self) -> Option { + if let Some(attach_intent) = self.intent.attached { + match self.observed.locations.get(&attach_intent) { + Some(loc) => match &loc.conf { + Some(conf) => match conf.mode { + LocationConfigMode::AttachedMulti + | LocationConfigMode::AttachedSingle + | LocationConfigMode::AttachedStale => { + // Our intent and observed state agree that this node is in an attached state. + Some(attach_intent) + } + // Our observed config is not an attached state + _ => None, + }, + // Our observed state is None, i.e. in flux + None => None, + }, + // We have no observed state for this node + None => None, + } + } else { + // Our intent is not to attach + None + } + } + fn dirty(&self) -> bool { if let Some(node_id) = self.intent.attached { let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config); @@ -354,6 +396,12 @@ impl TenantState { } } + // Even if there is no pageserver work to be done, if we have a pending notification to computes, + // wake up a reconciler to send it. + if self.pending_compute_notification { + return true; + } + false } @@ -415,11 +463,13 @@ impl TenantState { service_config: service_config.clone(), cancel: cancel.clone(), persistence: persistence.clone(), + compute_notify_failure: false, }; let reconcile_seq = self.sequence; tracing::info!("Spawning Reconciler for sequence {}", self.sequence); + let must_notify = self.pending_compute_notification; let join_handle = tokio::task::spawn(async move { // Wait for any previous reconcile task to complete before we start if let Some(old_handle) = old_handle { @@ -438,7 +488,16 @@ impl TenantState { return; } + // Attempt to make observed state match intent state let result = reconciler.reconcile().await; + + // If we know we had a pending compute notification from some previous action, send a notification irrespective + // of whether the above reconcile() did any work + if result.is_ok() && must_notify { + // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`] + reconciler.compute_notify().await.ok(); + } + result_tx .send(ReconcileResult { sequence: reconcile_seq, @@ -446,6 +505,7 @@ impl TenantState { tenant_shard_id: reconciler.tenant_shard_id, generation: reconciler.generation, observed: reconciler.observed, + pending_compute_notification: reconciler.compute_notify_failure, }) .ok(); }); diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 7816d0953b..140e5c4e34 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -457,6 +457,12 @@ impl AttachmentService { args.push(format!("--public-key={public_key_path}")); } + if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api { + args.push(format!( + "--compute-hook-url={control_plane_compute_hook_api}" + )); + } + background_process::start_process( COMMAND, &self.env.base_data_dir, diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index d5abda729f..e56007dd20 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -795,7 +795,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re &endpoint.timeline_id.to_string(), branch_name, lsn_str.as_str(), - endpoint.status(), + &format!("{}", endpoint.status()), ]); } diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index dcad22b992..b19a6a1a18 100644 --- 
a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -184,7 +184,7 @@ impl ComputeControlPlane { v.tenant_id == tenant_id && v.timeline_id == timeline_id && v.mode == mode - && v.status() != "stopped" + && v.status() != EndpointStatus::Stopped }); if let Some((key, _)) = duplicates.next() { @@ -223,6 +223,26 @@ pub struct Endpoint { features: Vec, } +#[derive(PartialEq, Eq)] +pub enum EndpointStatus { + Running, + Stopped, + Crashed, + RunningNoPidfile, +} + +impl std::fmt::Display for EndpointStatus { + fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result { + let s = match self { + Self::Running => "running", + Self::Stopped => "stopped", + Self::Crashed => "crashed", + Self::RunningNoPidfile => "running, no pidfile", + }; + write!(writer, "{}", s) + } +} + impl Endpoint { fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result { if !entry.file_type()?.is_dir() { @@ -380,16 +400,16 @@ impl Endpoint { self.endpoint_path().join("pgdata") } - pub fn status(&self) -> &str { + pub fn status(&self) -> EndpointStatus { let timeout = Duration::from_millis(300); let has_pidfile = self.pgdata().join("postmaster.pid").exists(); let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok(); match (has_pidfile, can_connect) { - (true, true) => "running", - (false, false) => "stopped", - (true, false) => "crashed", - (false, true) => "running, no pidfile", + (true, true) => EndpointStatus::Running, + (false, false) => EndpointStatus::Stopped, + (true, false) => EndpointStatus::Crashed, + (false, true) => EndpointStatus::RunningNoPidfile, } } @@ -481,7 +501,7 @@ impl Endpoint { remote_ext_config: Option<&String>, shard_stripe_size: usize, ) -> Result<()> { - if self.status() == "running" { + if self.status() == EndpointStatus::Running { anyhow::bail!("The endpoint is already running"); } diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index aefef47da7..786ea6d098 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -72,11 +72,16 @@ pub struct LocalEnv { #[serde(default)] pub safekeepers: Vec, - // Control plane location: if None, we will not run attachment_service. If set, this will + // Control plane upcall API for pageserver: if None, we will not run attachment_service. If set, this will // be propagated into each pageserver's configuration. #[serde(default)] pub control_plane_api: Option, + // Control plane upcall API for attachment service. If set, this will be propagated into the + // attachment service's configuration. + #[serde(default)] + pub control_plane_compute_hook_api: Option, + /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user. 
#[serde(default)] // A `HashMap>` would be more appropriate here, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e2a2291dbc..1e15ebe5a0 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -482,6 +482,7 @@ class NeonEnvBuilder: self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = [] self.config_init_force: Optional[str] = None self.top_output_dir = top_output_dir + self.control_plane_compute_hook_api: Optional[str] = None self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine @@ -1007,6 +1008,9 @@ class NeonEnv: # The base URL of the attachment service self.attachment_service_api: str = f"http://127.0.0.1:{self.attachment_service_port}" + # For testing this with a fake HTTP server, enable passing through a URL from config + self.control_plane_compute_hook_api = config.control_plane_compute_hook_api + self.attachment_service: NeonAttachmentService = NeonAttachmentService( self, config.auth_enabled ) @@ -1026,6 +1030,9 @@ class NeonEnv: if self.control_plane_api is not None: cfg["control_plane_api"] = self.control_plane_api + if self.control_plane_compute_hook_api is not None: + cfg["control_plane_compute_hook_api"] = self.control_plane_compute_hook_api + # Create config for pageserver http_auth_type = "NeonJWT" if config.auth_enabled else "Trust" pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust" @@ -1904,7 +1911,7 @@ class Pagectl(AbstractNeonCli): class NeonAttachmentService: - def __init__(self, env: NeonEnv, auth_enabled): + def __init__(self, env: NeonEnv, auth_enabled: bool): self.env = env self.running = False self.auth_enabled = auth_enabled diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index 3b2c9334db..346df708de 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -1,14 +1,24 @@ import time from collections import defaultdict -from fixtures.neon_fixtures import ( - NeonEnvBuilder, -) +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed from fixtures.pg_version import PgVersion from fixtures.types import TenantId, TimelineId from fixtures.utils import wait_until +from pytest_httpserver import HTTPServer +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response + + +def get_node_shard_counts(env: NeonEnv, tenant_ids): + counts: defaultdict[str, int] = defaultdict(int) + for tid in tenant_ids: + for shard in env.attachment_service.locate(tid): + counts[shard["node_id"]] += 1 + return counts def test_sharding_service_smoke( @@ -54,14 +64,7 @@ def test_sharding_service_smoke( for tid in tenant_ids: env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant) - def get_node_shard_counts(): - counts: defaultdict[str, int] = defaultdict(int) - for tid in tenant_ids: - for shard in env.attachment_service.locate(tid): - counts[shard["node_id"]] += 1 - return counts - - for node_id, count in get_node_shard_counts().items(): + for node_id, count in get_node_shard_counts(env, tenant_ids).items(): # we used a multiple of pagservers for the total shard count, # so expect equal number on all pageservers assert count == tenant_shard_count / len( @@ -89,7 +92,7 @@ def test_sharding_service_smoke( 
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"}) def node_evacuated(node_id: int): - counts = get_node_shard_counts() + counts = get_node_shard_counts(env, tenant_ids) assert counts[node_id] == 0 wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id)) @@ -98,7 +101,7 @@ def test_sharding_service_smoke( # immediately env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Active"}) time.sleep(1) - assert get_node_shard_counts()[env.pageservers[0].id] == 0 + assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0 # Delete all the tenants for tid in tenant_ids: @@ -113,7 +116,7 @@ def test_sharding_service_smoke( for tid in tenant_ids: env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant) - counts = get_node_shard_counts() + counts = get_node_shard_counts(env, tenant_ids) # Nothing should have been scheduled on the node in Draining assert counts[env.pageservers[1].id] == 0 assert counts[env.pageservers[0].id] == tenant_shard_count // 2 @@ -270,3 +273,73 @@ def test_sharding_service_onboarding( # The onboarded tenant should surviev a restart of pageserver dest_ps.stop() dest_ps.start() + + +def test_sharding_service_compute_hook( + httpserver: HTTPServer, + neon_env_builder: NeonEnvBuilder, + httpserver_listen_address, +): + """ + Test that the sharding service calls out to the configured HTTP endpoint on attachment changes + """ + + # We will run two pageservers so that we can migrate between them, and check that the attachment service + # sends notifications when migrating. + neon_env_builder.num_pageservers = 2 + (host, port) = httpserver_listen_address + neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify" + + # Set up fake HTTP notify endpoint + notifications = [] + + def handler(request: Request): + log.info(f"Notify request: {request}") + notifications.append(request.json) + return Response(status=200) + + httpserver.expect_request("/notify", method="POST").respond_with_handler(handler) + + # Start running + env = neon_env_builder.init_start() + + # We will do an unclean migration, which will result in deletion queue warnings + env.pageservers[0].allowed_errors.append(".*Dropped remote consistent LSN updates for tenant.*") + + # Initial notification from tenant creation + assert len(notifications) == 1 + expect = { + "tenant_id": str(env.initial_tenant), + "shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}], + } + assert notifications[0] == expect + + env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"}) + + def node_evacuated(node_id: int): + counts = get_node_shard_counts(env, [env.initial_tenant]) + assert counts[node_id] == 0 + + wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id)) + + # Additional notification from migration + log.info(f"notifications: {notifications}") + expect = { + "tenant_id": str(env.initial_tenant), + "shards": [{"node_id": int(env.pageservers[1].id), "shard_number": 0}], + } + + def received_migration_notification(): + assert len(notifications) == 2 + assert notifications[1] == expect + + wait_until(20, 0.25, received_migration_notification) + + # When we restart, we should re-emit notifications for all tenants + env.attachment_service.stop() + env.attachment_service.start() + + def received_restart_notification(): + assert len(notifications) == 3 + assert notifications[2] == expect + + wait_until(10, 1, received_restart_notification)
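For reference, the wire format defined by ComputeHookNotifyRequest above is a flat JSON object carrying the tenant ID and one entry per shard, sorted by shard number; a notification is only emitted once every shard has an attached pageserver and all shards agree on the shard count. The test above only covers an unsharded tenant, so the following is a sketch of what a receiver would see for a hypothetical two-shard tenant attached to pageservers 1 and 2 (the tenant ID and node IDs are invented for illustration):

# Illustrative only: JSON body POSTed to the compute hook for a hypothetical two-shard tenant.
# Field names follow ComputeHookNotifyRequest / ComputeHookNotifyRequestShard above.
two_shard_notification = {
    "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",  # hypothetical tenant ID
    "shards": [
        {"node_id": 1, "shard_number": 0},  # shard 0 attached to pageserver node 1
        {"node_id": 2, "shard_number": 1},  # shard 1 attached to pageserver node 2
    ],
}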
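The status-code handling in do_notify_iteration also defines the protocol a receiving endpoint can use to push back: 423 maps to NotifyError::Busy (retried after BUSY_DELAY), 429 to SlowDown (retried after SLOWDOWN_DELAY), 502/503/504 to Unavailable (retried with backoff), 400/401/403 to Fatal (not retried), and any 2xx counts as success. Below is a minimal sketch of a receiver that uses this, in the same werkzeug style as the test fixture above; the busy check and apply step are hypothetical stand-ins, not part of this change:

from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response


def control_plane_is_busy(tenant_id: str) -> bool:
    # Hypothetical stand-in: a real control plane would check for an in-flight operation
    # on this tenant (e.g. an ongoing migration or split of its own).
    return False


def notify_handler(request: Request) -> Response:
    # Response codes map onto NotifyError variants in compute_hook.rs:
    # 423 -> Busy, 429 -> SlowDown, 502/503/504 -> Unavailable, 400/401/403 -> Fatal.
    body = request.json
    if control_plane_is_busy(body["tenant_id"]):
        return Response(status=423)  # the storage service retries after BUSY_DELAY
    # Hypothetical apply step: persist the pageserver mapping from body["shards"] here.
    return Response(status=200)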