diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 38eecaf7ef..8501e4980f 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -42,7 +42,7 @@ pub struct HttpState {
 
 impl HttpState {
     pub fn new(service: Arc<Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
-        let allowlist_routes = ["/status"]
+        let allowlist_routes = ["/status", "/ready", "/metrics"]
             .iter()
             .map(|v| v.parse().unwrap())
             .collect::<Vec<_>>();
@@ -325,6 +325,17 @@ async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError>
     json_response(StatusCode::OK, ())
 }
 
+/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
+/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
+async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let state = get_state(&req);
+    if state.service.startup_complete.is_ready() {
+        json_response(StatusCode::OK, ())
+    } else {
+        json_response(StatusCode::SERVICE_UNAVAILABLE, ())
+    }
+}
+
 impl From<ReconcileError> for ApiError {
     fn from(value: ReconcileError) -> Self {
         ApiError::Conflict(format!("Reconciliation error: {}", value))
@@ -380,6 +391,7 @@ pub fn make_router(
         .data(Arc::new(HttpState::new(service, auth)))
         // Non-prefixed generic endpoints (status, metrics)
         .get("/status", |r| request_span(r, handle_status))
+        .get("/ready", |r| request_span(r, handle_ready))
         // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
         .post("/upcall/v1/re-attach", |r| {
             request_span(r, handle_re_attach)
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 0ec2b9dc4c..0331087e0d 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -1,6 +1,6 @@
 use std::{
     cmp::Ordering,
-    collections::{BTreeMap, HashMap},
+    collections::{BTreeMap, HashMap, HashSet},
     str::FromStr,
     sync::Arc,
     time::{Duration, Instant},
 };
@@ -31,6 +31,7 @@ use pageserver_api::{
 use pageserver_client::mgmt_api;
 use tokio_util::sync::CancellationToken;
 use utils::{
+    backoff,
     completion::Barrier,
     generation::Generation,
     http::error::ApiError,
@@ -150,31 +151,71 @@ impl Service {
         // indeterminate, same as in [`ObservedStateLocation`])
         let mut observed = HashMap::new();
 
-        let nodes = {
-            let locked = self.inner.read().unwrap();
-            locked.nodes.clone()
-        };
+        let mut nodes_online = HashSet::new();
+
+        // TODO: give Service a cancellation token for clean shutdown
+        let cancel = CancellationToken::new();
 
         // TODO: issue these requests concurrently
-        for node in nodes.values() {
-            let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
+        {
+            let nodes = {
+                let locked = self.inner.read().unwrap();
+                locked.nodes.clone()
+            };
+            for node in nodes.values() {
+                let http_client = reqwest::ClientBuilder::new()
+                    .timeout(Duration::from_secs(5))
+                    .build()
+                    .expect("Failed to construct HTTP client");
+                let client = mgmt_api::Client::from_client(
+                    http_client,
+                    node.base_url(),
+                    self.config.jwt_token.as_deref(),
+                );
 
-            tracing::info!("Scanning shards on node {}...", node.id);
-            match client.list_location_config().await {
-                Err(e) => {
-                    tracing::warn!("Could not contact pageserver {} ({e})", node.id);
-                    // TODO: be more tolerant, apply a generous 5-10 second timeout with retries, in case
-                    // pageserver is being restarted at the same time as we are
+                fn is_fatal(e: &mgmt_api::Error) -> bool {
+                    use mgmt_api::Error::*;
+                    match e {
+                        ReceiveBody(_) | ReceiveErrorBody(_) => false,
+                        ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
+                        | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
+                        | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
+                        ApiError(_, _) => true,
+                    }
                 }
-                Ok(listing) => {
-                    tracing::info!(
-                        "Received {} shard statuses from pageserver {}, setting it to Active",
-                        listing.tenant_shards.len(),
-                        node.id
-                    );
-                    for (tenant_shard_id, conf_opt) in listing.tenant_shards {
-                        observed.insert(tenant_shard_id, (node.id, conf_opt));
+                let list_response = backoff::retry(
+                    || client.list_location_config(),
+                    is_fatal,
+                    1,
+                    5,
+                    "Location config listing",
+                    &cancel,
+                )
+                .await;
+                let Some(list_response) = list_response else {
+                    tracing::info!("Shutdown during startup_reconcile");
+                    return;
+                };
+
+                tracing::info!("Scanning shards on node {}...", node.id);
+                match list_response {
+                    Err(e) => {
+                        tracing::warn!("Could not contact pageserver {} ({e})", node.id);
+                        // TODO: be more tolerant, do some retries, in case
+                        // pageserver is being restarted at the same time as we are
+                    }
+                    Ok(listing) => {
+                        tracing::info!(
+                            "Received {} shard statuses from pageserver {}, setting it to Active",
+                            listing.tenant_shards.len(),
+                            node.id
+                        );
+                        nodes_online.insert(node.id);
+
+                        for (tenant_shard_id, conf_opt) in listing.tenant_shards {
+                            observed.insert(tenant_shard_id, (node.id, conf_opt));
+                        }
                     }
                 }
             }
@@ -185,8 +226,19 @@ impl Service {
         let mut compute_notifications = Vec::new();
 
         // Populate intent and observed states for all tenants, based on reported state on pageservers
-        let shard_count = {
+        let (shard_count, nodes) = {
             let mut locked = self.inner.write().unwrap();
+
+            // Mark nodes online if they responded to us: nodes are offline by default after a restart.
+            let mut nodes = (*locked.nodes).clone();
+            for (node_id, node) in nodes.iter_mut() {
+                if nodes_online.contains(node_id) {
+                    node.availability = NodeAvailability::Active;
+                }
+            }
+            locked.nodes = Arc::new(nodes);
+            let nodes = locked.nodes.clone();
+
             for (tenant_shard_id, (node_id, observed_loc)) in observed {
                 let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
                     cleanup.push((tenant_shard_id, node_id));
@@ -218,7 +270,7 @@ impl Service {
                 }
             }
 
-            locked.tenants.len()
+            (locked.tenants.len(), nodes)
         };
 
         // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
@@ -279,9 +331,8 @@ impl Service {
         let stream = futures::stream::iter(compute_notifications.into_iter())
            .map(|(tenant_shard_id, node_id)| {
                 let compute_hook = compute_hook.clone();
+                let cancel = cancel.clone();
                 async move {
-                    // TODO: give Service a cancellation token for clean shutdown
-                    let cancel = CancellationToken::new();
                     if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
                         tracing::error!(
                             tenant_shard_id=%tenant_shard_id,
@@ -387,7 +438,7 @@ impl Service {
             ))),
             config,
             persistence,
-            startup_complete,
+            startup_complete: startup_complete.clone(),
         });
 
         let result_task_this = this.clone();
@@ -984,6 +1035,10 @@ impl Service {
             }
         };
 
+        // TODO: if we timeout/fail on reconcile, we should still succeed this request,
+        // because otherwise a broken compute hook causes a feedback loop where
+        // location_config returns 500 and gets retried forever.
+
         if let Some(create_req) = maybe_create {
             let create_resp = self.tenant_create(create_req).await?;
             result.shards = create_resp
diff --git a/libs/utils/src/completion.rs b/libs/utils/src/completion.rs
index ca6827c9b8..ea05cf54b1 100644
--- a/libs/utils/src/completion.rs
+++ b/libs/utils/src/completion.rs
@@ -27,6 +27,11 @@ impl Barrier {
             b.wait().await
         }
     }
+
+    /// Return true if a call to wait() would complete immediately
+    pub fn is_ready(&self) -> bool {
+        futures::future::FutureExt::now_or_never(self.0.wait()).is_some()
+    }
 }
 
 impl PartialEq for Barrier {
diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index 200369df90..baea747d3c 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -56,10 +56,18 @@ pub enum ForceAwaitLogicalSize {
 
 impl Client {
     pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
+        Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
+    }
+
+    pub fn from_client(
+        client: reqwest::Client,
+        mgmt_api_endpoint: String,
+        jwt: Option<&str>,
+    ) -> Self {
         Self {
             mgmt_api_endpoint,
             authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
-            client: reqwest::Client::new(),
+            client,
         }
     }
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 3d2549a8c3..0af8098cad 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1949,6 +1949,15 @@ class NeonAttachmentService:
 
         return headers
 
+    def ready(self) -> bool:
+        resp = self.request("GET", f"{self.env.attachment_service_api}/ready")
+        if resp.status_code == 503:
+            return False
+        elif resp.status_code == 200:
+            return True
+        else:
+            raise RuntimeError(f"Unexpected status {resp.status_code} from readiness endpoint")
+
     def attach_hook_issue(
         self, tenant_shard_id: Union[TenantId, TenantShardId], pageserver_id: int
     ) -> int:
diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py
index fd811a9d02..babb0d261c 100644
--- a/test_runner/regress/test_sharding_service.py
+++ b/test_runner/regress/test_sharding_service.py
@@ -128,6 +128,38 @@ def test_sharding_service_smoke(
     assert counts[env.pageservers[2].id] == tenant_shard_count // 2
 
 
+def test_node_status_after_restart(
+    neon_env_builder: NeonEnvBuilder,
+):
+    neon_env_builder.num_pageservers = 2
+    env = neon_env_builder.init_start()
+
+    # Initially we have two online pageservers
+    nodes = env.attachment_service.node_list()
+    assert len(nodes) == 2
+
+    env.pageservers[1].stop()
+
+    env.attachment_service.stop()
+    env.attachment_service.start()
+
+    # Initially the readiness check should fail because we're trying to connect to the offline node
+    assert env.attachment_service.ready() is False
+
+    def is_ready():
+        assert env.attachment_service.ready() is True
+
+    wait_until(30, 1, is_ready)
+
+    # We loaded nodes from the database on restart
+    nodes = env.attachment_service.node_list()
+    assert len(nodes) == 2
+
+    # We should still be able to create a tenant, because the pageserver which is still online
+    # should have had its availability state set to Active.
+    env.attachment_service.tenant_create(TenantId.generate())
+
+
 def test_sharding_service_passthrough(
     neon_env_builder: NeonEnvBuilder,
 ):
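The new `Barrier::is_ready()` works by handing the `wait()` future to `futures::future::FutureExt::now_or_never`, which polls it exactly once with a no-op waker and reports whether it completed without blocking. The sketch below shows that pattern in isolation; the `Ready` wrapper and its use of a `tokio::sync::watch` channel are illustrative assumptions standing in for the `utils::completion` types, not the crate's actual implementation.

```rust
// Minimal sketch of the polling trick behind `Barrier::is_ready()`:
// poll a future once and report whether it resolved immediately.
use futures::future::FutureExt;
use tokio::sync::watch;

#[derive(Clone)]
struct Ready(watch::Receiver<bool>); // hypothetical stand-in for utils::completion::Barrier

impl Ready {
    /// True iff waiting for readiness would complete without blocking.
    fn is_ready(&self) -> bool {
        let mut rx = self.0.clone();
        async move {
            // Resolves as soon as the watched value becomes `true`.
            rx.wait_for(|done| *done).await.ok();
        }
        .now_or_never()
        .is_some()
    }
}

fn main() {
    let (tx, rx) = watch::channel(false);
    let ready = Ready(rx);

    // Startup (e.g. startup_reconcile) still in progress: probe would return 503.
    assert!(!ready.is_ready());

    // Startup finished: probe would return 200.
    tx.send(true).unwrap();
    assert!(ready.is_ready());
}
```

Because `now_or_never` only needs a no-op waker, `is_ready()` stays a synchronous method, which keeps the `/ready` handler trivial and makes the readiness state easy to check from tests.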
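For consumers of the new endpoint, the contract is: 200 means `startup_reconcile` has finished, 503 (or a connection error while the process restarts) means not ready yet. Below is a hedged sketch of a polling client mirroring the Python test's `wait_until(30, 1, is_ready)` loop; the base URL, timeout, and retry budget are illustrative assumptions, not values taken from this PR.

```rust
// Poll the attachment service's /ready endpoint until it reports 200.
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .build()?;

    // Hypothetical address; the tests take this from `env.attachment_service_api`.
    let url = "http://127.0.0.1:1234/ready";

    for _ in 0..30 {
        match client.get(url).send().await {
            // 200: startup_reconcile has finished and the service is ready.
            Ok(resp) if resp.status() == reqwest::StatusCode::OK => {
                println!("attachment service is ready");
                return Ok(());
            }
            // 503, another status, or a connection error while the process
            // is still starting: wait a second and retry.
            _ => tokio::time::sleep(Duration::from_secs(1)).await,
        }
    }
    panic!("attachment service did not become ready within ~30s");
}
```

A Kubernetes deployment would express the same loop declaratively with an `httpGet` readiness probe pointed at `/ready`, which is the intended production use per the handler's doc comment.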