diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 4082af3fe6..0236496c61 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -21,11 +21,11 @@ use pageserver_api::{
         ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
         ValidateResponse, ValidateResponseTenant,
     },
-    models,
     models::{
-        LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
-        TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
-        TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
+        self, LocationConfig, LocationConfigListResponse, LocationConfigMode, ShardParameters,
+        TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
+        TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
+        TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
     },
     shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
 };
@@ -167,84 +167,53 @@ impl Service {
     /// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
     /// view of the world, and determine which pageservers are responsive.
     #[instrument(skip_all)]
-    async fn startup_reconcile(&self) {
+    async fn startup_reconcile(self: &Arc<Self>) {
         // For all tenant shards, a vector of observed states on nodes (where None means
         // indeterminate, same as in [`ObservedStateLocation`])
         let mut observed = HashMap::new();
 
         let mut nodes_online = HashSet::new();
 
-        // TODO: issue these requests concurrently
-        {
-            let nodes = {
-                let locked = self.inner.read().unwrap();
-                locked.nodes.clone()
-            };
-            for node in nodes.values() {
-                let http_client = reqwest::ClientBuilder::new()
-                    .timeout(Duration::from_secs(5))
-                    .build()
-                    .expect("Failed to construct HTTP client");
-                let client = mgmt_api::Client::from_client(
-                    http_client,
-                    node.base_url(),
-                    self.config.jwt_token.as_deref(),
-                );
+        // Startup reconciliation does I/O to other services: whether they
+        // are responsive or not, we should aim to finish within our deadline, because:
+        // - If we don't, a k8s readiness hook watching /ready will kill us.
+        // - While we're waiting for startup reconciliation, we are not fully
+        //   available for end user operations like creating/deleting tenants and timelines.
+        //
+        // We set multiple deadlines to break up the time available between the phases of work: this is
+        // arbitrary, but avoids a situation where the first phase could burn our entire timeout period.
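+        // With the splits below, the node scan may consume at most half of the total budget,
+        // and compute notifications may run until the three-quarter mark.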
+        let start_at = Instant::now();
+        let node_scan_deadline = start_at
+            .checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
+            .expect("Reconcile timeout is a modest constant");
 
-                fn is_fatal(e: &mgmt_api::Error) -> bool {
-                    use mgmt_api::Error::*;
-                    match e {
-                        ReceiveBody(_) | ReceiveErrorBody(_) => false,
-                        ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
-                        | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
-                        | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
-                        ApiError(_, _) => true,
-                    }
-                }
+        let compute_notify_deadline = start_at
+            .checked_add((STARTUP_RECONCILE_TIMEOUT / 4) * 3)
+            .expect("Reconcile timeout is a modest constant");
 
-                let list_response = backoff::retry(
-                    || client.list_location_config(),
-                    is_fatal,
-                    1,
-                    5,
-                    "Location config listing",
-                    &self.cancel,
-                )
-                .await;
-                let Some(list_response) = list_response else {
-                    tracing::info!("Shutdown during startup_reconcile");
-                    return;
-                };
+        // Accumulate a list of any tenant locations that ought to be detached
+        let mut cleanup = Vec::new();
 
-                tracing::info!("Scanning shards on node {}...", node.id);
-                match list_response {
-                    Err(e) => {
-                        tracing::warn!("Could not contact pageserver {} ({e})", node.id);
-                        // TODO: be more tolerant, do some retries, in case
-                        // pageserver is being restarted at the same time as we are
-                    }
-                    Ok(listing) => {
-                        tracing::info!(
-                            "Received {} shard statuses from pageserver {}, setting it to Active",
-                            listing.tenant_shards.len(),
-                            node.id
-                        );
-                        nodes_online.insert(node.id);
+        let node_listings = self.scan_node_locations(node_scan_deadline).await;
+        for (node_id, list_response) in node_listings {
+            let tenant_shards = list_response.tenant_shards;
+            tracing::info!(
+                "Received {} shard statuses from pageserver {}, setting it to Active",
+                tenant_shards.len(),
+                node_id
+            );
+            nodes_online.insert(node_id);
 
-                        for (tenant_shard_id, conf_opt) in listing.tenant_shards {
-                            observed.insert(tenant_shard_id, (node.id, conf_opt));
-                        }
-                    }
-                }
+            for (tenant_shard_id, conf_opt) in tenant_shards {
+                observed.insert(tenant_shard_id, (node_id, conf_opt));
             }
         }
 
-        let mut cleanup = Vec::new();
-
+        // List of tenants for which we will attempt to notify compute of their location at startup
         let mut compute_notifications = Vec::new();
 
         // Populate intent and observed states for all tenants, based on reported state on pageservers
-        let (shard_count, nodes) = {
+        let shard_count = {
             let mut locked = self.inner.write().unwrap();
             let (nodes, tenants, scheduler) = locked.parts_mut();
@@ -288,18 +257,171 @@
                 }
             }
 
-            (tenants.len(), nodes.clone())
+            tenants.len()
         };
 
         // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
         // generation_pageserver in the database.
 
-        // Clean up any tenants that were found on pageservers but are not known to us.
+        // Emit compute hook notifications for all tenants which are already stably attached. Other tenants
+        // will emit compute hook notifications when they reconcile.
+        //
+        // Ordering: we must complete these notification attempts before doing any other reconciliation for the
+        // tenants named here, because otherwise our calls to notify() might race with more recent values
+        // generated by reconciliation.
+        let notify_failures = self
+            .compute_notify_many(compute_notifications, compute_notify_deadline)
+            .await;
+
+        // Compute notify is fallible. If it fails here, do not delay overall startup: set the
+        // flag on these shards that they have a pending notification.
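+        // (The reconcile_all() pass launched below will pick these shards up and retry.)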
+        // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
+        {
+            let mut locked = self.inner.write().unwrap();
+            for tenant_shard_id in notify_failures.into_iter() {
+                if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
+                    shard.pending_compute_notification = true;
+                }
+            }
+        }
+
+        // Finally, now that the service is up and running, launch reconcile operations for any tenants
+        // which require it: under normal circumstances this should only include tenants that were in some
+        // transient state before we restarted, or any tenants whose compute hooks failed above.
+        let reconcile_tasks = self.reconcile_all();
+        // We will not wait for these reconciliation tasks to run here: we're now done with startup and
+        // normal operations may proceed.
+
+        // Clean up any tenants that were found on pageservers but are not known to us. Do this in the
+        // background because it does not need to complete in order to proceed with other work.
+        if !cleanup.is_empty() {
+            tracing::info!("Cleaning up {} locations in the background", cleanup.len());
+            tokio::task::spawn({
+                let cleanup_self = self.clone();
+                async move { cleanup_self.cleanup_locations(cleanup).await }
+            });
+        }
+
+        tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
+    }
+
+    /// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
+    ///
+    /// The result includes only nodes which responded within the deadline
+    async fn scan_node_locations(
+        &self,
+        deadline: Instant,
+    ) -> HashMap<NodeId, LocationConfigListResponse> {
+        let nodes = {
+            let locked = self.inner.read().unwrap();
+            locked.nodes.clone()
+        };
+
+        let mut node_results = HashMap::new();
+
+        let mut node_list_futs = FuturesUnordered::new();
+
+        for node in nodes.values() {
+            node_list_futs.push({
+                async move {
+                    let http_client = reqwest::ClientBuilder::new()
+                        .timeout(Duration::from_secs(5))
+                        .build()
+                        .expect("Failed to construct HTTP client");
+                    let client = mgmt_api::Client::from_client(
+                        http_client,
+                        node.base_url(),
+                        self.config.jwt_token.as_deref(),
+                    );
+
+                    fn is_fatal(e: &mgmt_api::Error) -> bool {
+                        use mgmt_api::Error::*;
+                        match e {
+                            ReceiveBody(_) | ReceiveErrorBody(_) => false,
+                            ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
+                            | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
+                            | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
+                            ApiError(_, _) => true,
+                        }
+                    }
+
+                    tracing::info!("Scanning shards on node {}...", node.id);
+                    let description = format!("List locations on {}", node.id);
+                    let response = backoff::retry(
+                        || client.list_location_config(),
+                        is_fatal,
+                        1,
+                        5,
+                        &description,
+                        &self.cancel,
+                    )
+                    .await;
+
+                    (node.id, response)
+                }
+            });
+        }
+
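+        // Drain the listing futures as they complete; any node that has not responded by the
+        // deadline is omitted from the results.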
+        loop {
+            let (node_id, result) = tokio::select! {
+                next = node_list_futs.next() => {
+                    match next {
+                        Some(result) => result,
+                        None => {
+                            // We got results for all our nodes
+                            break;
+                        }
+                    }
+                },
+                _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
+                    // Give up waiting for anyone who hasn't responded: we will yield the results that we have
+                    tracing::info!("Reached deadline while waiting for nodes to respond to location listing requests");
+                    break;
+                }
+            };
+
+            let Some(list_response) = result else {
+                tracing::info!("Shutdown during startup_reconcile");
+                break;
+            };
+
+            match list_response {
+                Err(e) => {
+                    tracing::warn!("Could not scan node {} ({e})", node_id);
+                }
+                Ok(listing) => {
+                    node_results.insert(node_id, listing);
+                }
+            }
+        }
+
+        node_results
+    }
+
+    /// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
+    ///
+    /// This is safe to run in the background, because if we don't have this TenantShardId in our map of
+    /// tenants, then it is probably something incompletely deleted before: we will not fight with any
+    /// other task trying to attach it.
+    #[instrument(skip_all)]
+    async fn cleanup_locations(&self, cleanup: Vec<(TenantShardId, NodeId)>) {
+        let nodes = self.inner.read().unwrap().nodes.clone();
+        for (tenant_shard_id, node_id) in cleanup {
             // A node reported a tenant_shard_id which is unknown to us: detach it.
-            let node = nodes
-                .get(&node_id)
-                .expect("Always exists: only known nodes are scanned");
+            let Some(node) = nodes.get(&node_id) else {
+                // This is legitimate; we run in the background and [`Self::startup_reconcile`] might have identified
+                // a location to clean up on a node that has since been removed.
+                tracing::info!(
+                    "Not cleaning up location {node_id}/{tenant_shard_id}: node not found"
+                );
+                continue;
+            };
+
+            if self.cancel.is_cancelled() {
+                break;
+            }
 
             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
             match client
@@ -332,21 +454,24 @@
                 }
             }
         }
+    }
 
-        // Emit compute hook notifications for all tenants which are already stably attached. Other tenants
-        // will emit compute hook notifications when they reconcile.
-        //
-        // Ordering: we must complete these notification attempts before doing any other reconciliation for the
-        // tenants named here, because otherwise our calls to notify() might race with more recent values
-        // generated by reconciliation.
-
-        // Compute notify is fallible. If it fails here, do not delay overall startup: set the
-        // flag on these shards that they have a pending notification.
+    /// Used during [`Self::startup_reconcile`]: issue many concurrent compute notifications.
+    ///
+    /// Returns a set of any shards for which notifications were not acked within the deadline.
+    async fn compute_notify_many(
+        &self,
+        notifications: Vec<(TenantShardId, NodeId)>,
+        deadline: Instant,
+    ) -> HashSet<TenantShardId> {
         let compute_hook = self.inner.read().unwrap().compute_hook.clone();
 
+        let attempt_shards = notifications.iter().map(|i| i.0).collect::<HashSet<_>>();
+        let mut success_shards = HashSet::new();
+
         // Construct an async stream of futures to invoke the compute notify function: we do this
         // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
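+        // buffered() keeps at most compute_hook::API_CONCURRENCY notification futures in flight at once.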
-        let stream = futures::stream::iter(compute_notifications.into_iter())
+        let mut stream = futures::stream::iter(notifications.into_iter())
             .map(|(tenant_shard_id, node_id)| {
                 let compute_hook = compute_hook.clone();
                 let cancel = self.cancel.clone();
@@ -357,33 +482,43 @@
                            node_id=%node_id,
                            "Failed to notify compute on startup for shard: {e}"
                         );
-                        Some(tenant_shard_id)
-                    } else {
                         None
+                    } else {
+                        Some(tenant_shard_id)
                     }
                 }
             })
             .buffered(compute_hook::API_CONCURRENCY);
 
-        let notify_results = stream.collect::<Vec<_>>().await;
-        // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
-        {
-            let mut locked = self.inner.write().unwrap();
-            for tenant_shard_id in notify_results.into_iter().flatten() {
-                if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
-                    shard.pending_compute_notification = true;
+        loop {
+            tokio::select! {
+                next = stream.next() => {
+                    match next {
+                        Some(Some(success_shard)) => {
+                            // A notification succeeded
+                            success_shards.insert(success_shard);
+                        },
+                        Some(None) => {
+                            // A notification that failed
+                        },
+                        None => {
+                            tracing::info!("Successfully sent all compute notifications");
+                            break;
+                        }
+                    }
+                },
+                _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
+                    // Give up sending any that didn't succeed yet
+                    tracing::info!("Reached deadline while sending compute notifications");
+                    break;
                 }
-            }
+            };
         }
 
-        // Finally, now that the service is up and running, launch reconcile operations for any tenants
-        // which require it: under normal circumstances this should only include tenants that were in some
-        // transient state before we restarted, or any tenants whose compute hooks failed above.
-        let reconcile_tasks = self.reconcile_all();
-        // We will not wait for these reconciliation tasks to run here: we're now done with startup and
-        // normal operations may proceed.
-
-        tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
+        attempt_shards
+            .difference(&success_shards)
+            .cloned()
+            .collect()
     }
 
     /// Long running background task that periodically wakes up and looks for shards that need