From f2e5212fed2d806c7a02e5c7456f24557fba06ac Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 16 Feb 2024 13:00:53 +0000 Subject: [PATCH] storage controller: background reconcile, graceful shutdown, better logging (#6709) ## Problem Now that the storage controller is working end to end, we start burning down the robustness aspects. ## Summary of changes - Add a background task that periodically calls `reconcile_all`. This ensures that if earlier operations couldn't succeed (e.g. because a node was unavailable), we will eventually retry. This is a naive initial implementation that can start an unlimited number of reconcile tasks: limiting reconcile concurrency is a later item in #6342 - Add a number of tracing spans in key locations: each background task, each reconciler task. - Add a top-level CancellationToken and Gate, and use these to implement a graceful shutdown that waits for tasks to shut down. This is not bulletproof yet, because within these tasks we have remote HTTP calls that aren't wrapped in cancellation/timeouts, but it creates the structure, and if we don't shut down promptly then k8s will kill us. - To protect shard splits from background reconciliation, expose the `SplitState` in memory and use it to guard any APIs that require an attached tenant. --- control_plane/attachment_service/Cargo.toml | 5 + .../attachment_service/src/compute_hook.rs | 8 +- control_plane/attachment_service/src/lib.rs | 6 + control_plane/attachment_service/src/main.rs | 28 +- .../attachment_service/src/reconciler.rs | 5 + .../attachment_service/src/scheduler.rs | 9 +- .../attachment_service/src/service.rs | 282 +++++++++++++----- .../attachment_service/src/tenant_state.rs | 116 ++++--- .../regress/test_pageserver_generations.py | 67 +++-- 9 files changed, 370 insertions(+), 156 deletions(-) diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml index 0b93211dbc..ada35295f9 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/control_plane/attachment_service/Cargo.toml @@ -4,6 +4,11 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +default = [] +# Enables test-only APIs and behaviors +testing = [] + [dependencies] anyhow.workspace = true aws-config.workspace = true diff --git a/control_plane/attachment_service/src/compute_hook.rs b/control_plane/attachment_service/src/compute_hook.rs index bac378d218..b5e90491c6 100644 --- a/control_plane/attachment_service/src/compute_hook.rs +++ b/control_plane/attachment_service/src/compute_hook.rs @@ -155,7 +155,7 @@ impl ComputeHook { for (endpoint_name, endpoint) in &cplane.endpoints { if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running { - tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,); + tracing::info!("Reconfiguring endpoint {}", endpoint_name,); endpoint.reconfigure(compute_pageservers.clone()).await?; } } @@ -177,7 +177,7 @@ impl ComputeHook { req }; - tracing::debug!( + tracing::info!( "Sending notify request to {} ({:?})", url, reconfigure_request @@ -266,7 +266,7 @@ impl ComputeHook { /// periods, but we don't retry forever. The **caller** is responsible for handling failures and /// ensuring that they eventually call again to ensure that the compute is eventually notified of /// the proper pageserver nodes for a tenant.
- #[tracing::instrument(skip_all, fields(tenant_shard_id, node_id))] + #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))] pub(super) async fn notify( &self, tenant_shard_id: TenantShardId, @@ -298,7 +298,7 @@ impl ComputeHook { let Some(reconfigure_request) = reconfigure_request else { // The tenant doesn't yet have pageservers for all its shards: we won't notify anything // until it does. - tracing::debug!("Tenant isn't yet ready to emit a notification",); + tracing::info!("Tenant isn't yet ready to emit a notification"); return Ok(()); }; diff --git a/control_plane/attachment_service/src/lib.rs b/control_plane/attachment_service/src/lib.rs index 082afb4157..238efdf5a8 100644 --- a/control_plane/attachment_service/src/lib.rs +++ b/control_plane/attachment_service/src/lib.rs @@ -37,6 +37,12 @@ impl std::fmt::Display for Sequence { } } +impl std::fmt::Debug for Sequence { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + impl MonotonicCounter for Sequence { fn cnt_advance(&mut self, v: Sequence) { assert!(*self <= v); diff --git a/control_plane/attachment_service/src/main.rs b/control_plane/attachment_service/src/main.rs index 7229a2517b..b323ae8820 100644 --- a/control_plane/attachment_service/src/main.rs +++ b/control_plane/attachment_service/src/main.rs @@ -15,6 +15,7 @@ use diesel::Connection; use metrics::launch_timestamp::LaunchTimestamp; use std::sync::Arc; use tokio::signal::unix::SignalKind; +use tokio_util::sync::CancellationToken; use utils::auth::{JwtAuth, SwappableJwtAuth}; use utils::logging::{self, LogFormat}; @@ -237,15 +238,23 @@ async fn async_main() -> anyhow::Result<()> { let auth = secrets .public_key .map(|jwt_auth| Arc::new(SwappableJwtAuth::new(jwt_auth))); - let router = make_router(service, auth) + let router = make_router(service.clone(), auth) .build() .map_err(|err| anyhow!(err))?; let router_service = utils::http::RouterService::new(router).unwrap(); - let server = hyper::Server::from_tcp(http_listener)?.serve(router_service); + // Start HTTP server + let server_shutdown = CancellationToken::new(); + let server = hyper::Server::from_tcp(http_listener)? 
+ .serve(router_service) + .with_graceful_shutdown({ + let server_shutdown = server_shutdown.clone(); + async move { + server_shutdown.cancelled().await; + } + }); tracing::info!("Serving on {0}", args.listen); - - tokio::task::spawn(server); + let server_task = tokio::task::spawn(server); // Wait until we receive a signal let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; @@ -266,5 +275,16 @@ async fn async_main() -> anyhow::Result<()> { } } + // Stop HTTP server first, so that we don't have to service requests + // while shutting down Service + server_shutdown.cancel(); + if let Err(e) = server_task.await { + tracing::error!("Error joining HTTP server task: {e}") + } + tracing::info!("Joined HTTP server task"); + + service.shutdown().await; + tracing::info!("Service shutdown complete"); + std::process::exit(0); } diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs index 65bbfa7181..a4fbd80dc3 100644 --- a/control_plane/attachment_service/src/reconciler.rs +++ b/control_plane/attachment_service/src/reconciler.rs @@ -13,6 +13,7 @@ use tokio_util::sync::CancellationToken; use utils::generation::Generation; use utils::id::{NodeId, TimelineId}; use utils::lsn::Lsn; +use utils::sync::gate::GateGuard; use crate::compute_hook::{ComputeHook, NotifyError}; use crate::node::Node; @@ -53,6 +54,10 @@ pub(super) struct Reconciler { /// the tenant is changed. pub(crate) cancel: CancellationToken, + /// Reconcilers are registered with a Gate so that during a graceful shutdown we + /// can wait for all the reconcilers to respond to their cancellation tokens. + pub(crate) _gate_guard: GateGuard, + /// Access to persistent storage for updating generation numbers pub(crate) persistence: Arc<Persistence>, } diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs index 1966a7ea2a..3b4c9e3464 100644 --- a/control_plane/attachment_service/src/scheduler.rs +++ b/control_plane/attachment_service/src/scheduler.rs @@ -77,12 +77,11 @@ impl Scheduler { return Err(ScheduleError::ImpossibleConstraint); } - for (node_id, count) in &tenant_counts { - tracing::info!("tenant_counts[{node_id}]={count}"); - } - let node_id = tenant_counts.first().unwrap().0; - tracing::info!("scheduler selected node {node_id}"); + tracing::info!( + "scheduler selected node {node_id} (eligible nodes {:?}, exclude: {hard_exclude:?})", + tenant_counts.iter().map(|i| i.0 .0).collect::<Vec<_>>() + ); *self.tenant_counts.get_mut(&node_id).unwrap() += 1; Ok(node_id) } diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 616b74e55d..149cb7f2ba 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -30,6 +30,7 @@ use pageserver_api::{ }; use pageserver_client::mgmt_api; use tokio_util::sync::CancellationToken; +use tracing::instrument; use utils::{ backoff, completion::Barrier, @@ -37,6 +38,7 @@ use utils::{ http::error::ApiError, id::{NodeId, TenantId, TimelineId}, seqwait::SeqWait, + sync::gate::Gate, }; use crate::{ @@ -124,6 +126,12 @@ pub struct Service { config: Config, persistence: Arc<Persistence>, + // Process shutdown will fire this token + cancel: CancellationToken, + + // Background tasks will hold this gate + gate: Gate, + /// This waits for initial reconciliation with pageservers to complete. Until this barrier /// passes, it isn't safe to do any actions that mutate tenants.
pub(crate) startup_complete: Barrier, @@ -144,8 +152,9 @@ impl Service { &self.config } - /// TODO: don't allow other API calls until this is done, don't start doing any background housekeeping - /// until this is done. + /// Called once on startup, this function attempts to contact all pageservers to build an up-to-date + /// view of the world, and determine which pageservers are responsive. + #[instrument(skip_all)] async fn startup_reconcile(&self) { // For all tenant shards, a vector of observed states on nodes (where None means // indeterminate, same as in [`ObservedStateLocation`]) @@ -153,9 +162,6 @@ impl Service { let mut nodes_online = HashSet::new(); - // TODO: give Service a cancellation token for clean shutdown - let cancel = CancellationToken::new(); - // TODO: issue these requests concurrently { let nodes = { @@ -190,7 +196,7 @@ impl Service { 1, 5, "Location config listing", - &cancel, + &self.cancel, ) .await; let Some(list_response) = list_response else { @@ -331,7 +337,7 @@ impl Service { let stream = futures::stream::iter(compute_notifications.into_iter()) .map(|(tenant_shard_id, node_id)| { let compute_hook = compute_hook.clone(); - let cancel = cancel.clone(); + let cancel = self.cancel.clone(); async move { if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await { tracing::error!( @@ -368,8 +374,98 @@ impl Service { tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"); } + /// Long running background task that periodically wakes up and looks for shards that need + /// reconciliation. Reconciliation is fallible, so any reconciliation tasks that fail during + /// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible + /// for those retries. + #[instrument(skip_all)] + async fn background_reconcile(&self) { + self.startup_complete.clone().wait().await; + + const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20); + + let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD); + while !self.cancel.is_cancelled() { + tokio::select! { + _ = interval.tick() => { self.reconcile_all(); } + _ = self.cancel.cancelled() => return + } + } + } + + #[instrument(skip_all)] + async fn process_results( + &self, + mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>, + ) { + loop { + // Wait for the next result, or for cancellation + let result = tokio::select! { + r = result_rx.recv() => { + match r { + Some(result) => {result}, + None => {break;} + } + } + _ = self.cancel.cancelled() => { + break; + } + }; + + tracing::info!( + "Reconcile result for sequence {}, ok={}", + result.sequence, + result.result.is_ok() + ); + let mut locked = self.inner.write().unwrap(); + let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else { + // A reconciliation result might race with removing a tenant: drop results for + // tenants that aren't in our map. + continue; + }; + + // Usually generation should only be updated via this path, so the max() isn't + // needed, but it is used to handle out-of-band updates, e.g. via the test hook. + tenant.generation = std::cmp::max(tenant.generation, result.generation); + + // If the reconciler signals that it failed to notify compute, set this state on + // the shard so that a future [`TenantState::maybe_reconcile`] will try again.
+ tenant.pending_compute_notification = result.pending_compute_notification; + + match result.result { + Ok(()) => { + for (node_id, loc) in &result.observed.locations { + if let Some(conf) = &loc.conf { + tracing::info!("Updating observed location {}: {:?}", node_id, conf); + } else { + tracing::info!("Setting observed location {} to None", node_id,) + } + } + tenant.observed = result.observed; + tenant.waiter.advance(result.sequence); + } + Err(e) => { + tracing::warn!( + "Reconcile error on tenant {}: {}", + tenant.tenant_shard_id, + e + ); + + // Ordering: populate last_error before advancing error_seq, + // so that waiters will see the correct error after waiting. + *(tenant.last_error.lock().unwrap()) = format!("{e}"); + tenant.error_waiter.advance(result.sequence); + + for (node_id, o) in result.observed.locations { + tenant.observed.locations.insert(node_id, o); + } + } + } + } + } + pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> { - let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel(); + let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel(); tracing::info!("Loading nodes from database..."); let nodes = persistence.list_nodes().await?; @@ -418,6 +514,7 @@ impl Service { observed: ObservedState::new(), config: serde_json::from_str(&tsp.config).unwrap(), reconciler: None, + splitting: tsp.splitting, waiter: Arc::new(SeqWait::new(Sequence::initial())), error_waiter: Arc::new(SeqWait::new(Sequence::initial())), last_error: Arc::default(), @@ -439,73 +536,35 @@ impl Service { config, persistence, startup_complete: startup_complete.clone(), + cancel: CancellationToken::new(), + gate: Gate::default(), }); let result_task_this = this.clone(); tokio::task::spawn(async move { - while let Some(result) = result_rx.recv().await { - tracing::info!( - "Reconcile result for sequence {}, ok={}", - result.sequence, - result.result.is_ok() - ); - let mut locked = result_task_this.inner.write().unwrap(); - let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else { - // A reconciliation result might race with removing a tenant: drop results for - // tenants that aren't in our map. - continue; - }; - - // Usually generation should only be updated via this path, so the max() isn't - // needed, but it is used to handle out-of-band updates via. e.g. test hook. - tenant.generation = std::cmp::max(tenant.generation, result.generation); - - // If the reconciler signals that it failed to notify compute, set this state on - // the shard so that a future [`TenantState::maybe_reconcile`] will try again. - tenant.pending_compute_notification = result.pending_compute_notification; - - match result.result { - Ok(()) => { - for (node_id, loc) in &result.observed.locations { - if let Some(conf) = &loc.conf { - tracing::info!( - "Updating observed location {}: {:?}", - node_id, - conf - ); - } else { - tracing::info!("Setting observed location {} to None", node_id,) - } - } - tenant.observed = result.observed; - tenant.waiter.advance(result.sequence); - } - Err(e) => { - tracing::warn!( - "Reconcile error on tenant {}: {}", - tenant.tenant_shard_id, - e - ); - - // Ordering: populate last_error before advancing error_seq, - // so that waiters will see the correct error after waiting.
- *(tenant.last_error.lock().unwrap()) = format!("{e}"); - tenant.error_waiter.advance(result.sequence); - - for (node_id, o) in result.observed.locations { - tenant.observed.locations.insert(node_id, o); - } - } - } + // Block shutdown until we're done (we must respect self.cancel) + if let Ok(_gate) = result_task_this.gate.enter() { + result_task_this.process_results(result_rx).await } }); - let startup_reconcile_this = this.clone(); - tokio::task::spawn(async move { - // Block the [`Service::startup_complete`] barrier until we're done - let _completion = startup_completion; + tokio::task::spawn({ + let this = this.clone(); + // We will block the [`Service::startup_complete`] barrier until [`Self::startup_reconcile`] + // is done. + let startup_completion = startup_completion.clone(); + async move { + // Block shutdown until we're done (we must respect self.cancel) + let Ok(_gate) = this.gate.enter() else { + return; + }; - startup_reconcile_this.startup_reconcile().await + this.startup_reconcile().await; + + drop(startup_completion); + + this.background_reconcile().await; + } }); Ok(this) } @@ -620,6 +679,28 @@ impl Service { attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff)) ); + // Trick the reconciler into not doing anything for this tenant: this helps + // tests that manually configure a tenant on the pageserver, and then call this + // attach hook: they don't want background reconciliation to modify what they + // did to the pageserver. + #[cfg(feature = "testing")] + { + if let Some(node_id) = attach_req.node_id { + tenant_state.observed.locations = HashMap::from([( + node_id, + ObservedStateLocation { + conf: Some(attached_location_conf( + tenant_state.generation, + &tenant_state.shard, + &tenant_state.config, + )), + }, + )]); + } else { + tenant_state.observed.locations.clear(); + } + } + Ok(AttachHookResponse { gen: attach_req .node_id @@ -868,6 +949,8 @@ impl Service { &compute_hook, &self.config, &self.persistence, + &self.gate, + &self.cancel, ) }) .collect::<Vec<_>>(); @@ -970,6 +1053,8 @@ impl Service { &compute_hook, &self.config, &self.persistence, + &self.gate, + &self.cancel, ); if let Some(waiter) = maybe_waiter { waiters.push(waiter); @@ -1059,6 +1144,8 @@ impl Service { } pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> { + self.ensure_attached_wait(tenant_id).await?; + // TODO: refactor into helper let targets = { let locked = self.inner.read().unwrap(); @@ -1080,8 +1167,6 @@ impl Service { targets }; - // TODO: error out if the tenant is not attached anywhere. - // Phase 1: delete on the pageservers let mut any_pending = false; for (tenant_shard_id, node) in targets { @@ -1417,9 +1502,6 @@ impl Service { let mut policy = None; let mut shard_ident = None; - // TODO: put a cancellation token on Service for clean shutdown - let cancel = CancellationToken::new(); - // A parent shard which will be split struct SplitTarget { parent_id: TenantShardId, @@ -1591,6 +1673,18 @@ impl Service { } } + // Now that I have persisted the splitting state, apply it in-memory. This is infallible, so + // callers may assume that if splitting is set in memory, then it was persisted, and if splitting + // is not set in memory, then it was not persisted.
+ { + let mut locked = self.inner.write().unwrap(); + for target in &targets { + if let Some(parent_shard) = locked.tenants.get_mut(&target.parent_id) { + parent_shard.splitting = SplitState::Splitting; + } + } + } + // FIXME: we have now committed the shard split state to the database, so any subsequent // failure needs to roll it back. We will later wrap this function in logic to roll back // the split if it fails. @@ -1650,7 +1744,7 @@ impl Service { .complete_shard_split(tenant_id, old_shard_count) .await?; - // Replace all the shards we just split with their children + // Replace all the shards we just split with their children: this phase is infallible. let mut response = TenantShardSplitResponse { new_shards: Vec::new(), }; @@ -1698,6 +1792,10 @@ impl Service { child_state.generation = generation; child_state.config = config.clone(); + // The child's TenantState::splitting is intentionally left at the default value of Idle, + // as at this point in the split process we have succeeded and this part is infallible: + // we will never need to do any special recovery from this state. + child_locations.push((child, pageserver)); locked.tenants.insert(child, child_state); @@ -1709,7 +1807,7 @@ impl Service { // Send compute notifications for all the new shards let mut failed_notifications = Vec::new(); for (child_id, child_ps) in child_locations { - if let Err(e) = compute_hook.notify(child_id, child_ps, &cancel).await { + if let Err(e) = compute_hook.notify(child_id, child_ps, &self.cancel).await { tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})", child_id, child_ps); failed_notifications.push(child_id); @@ -1785,6 +1883,8 @@ impl Service { &compute_hook, &self.config, &self.persistence, + &self.gate, + &self.cancel, ) }; @@ -1986,6 +2086,8 @@ impl Service { &compute_hook, &self.config, &self.persistence, + &self.gate, + &self.cancel, ); } } @@ -2007,6 +2109,8 @@ impl Service { &compute_hook, &self.config, &self.persistence, + &self.gate, + &self.cancel, ); } } @@ -2046,6 +2150,8 @@ impl Service { &compute_hook, &self.config, &self.persistence, + &self.gate, + &self.cancel, ) { waiters.push(waiter); } @@ -2057,6 +2163,17 @@ impl Service { let ensure_waiters = { let locked = self.inner.write().unwrap(); + // Check if the tenant is splitting: in this case, even if it is attached, + // we must act as if it is not: this blocks e.g. timeline creation/deletion + // operations during the split. + for (_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) { + if !matches!(shard.splitting, SplitState::Idle) { + return Err(ApiError::ResourceUnavailable( + "Tenant shards are currently splitting".into(), + )); + } + } + self.ensure_attached_schedule(locked, tenant_id) .map_err(ApiError::InternalServerError)? }; @@ -2088,8 +2205,25 @@ impl Service { &compute_hook, &self.config, &self.persistence, + &self.gate, + &self.cancel, ) }) .count() } + + pub async fn shutdown(&self) { + // Note that this already stops processing any results from reconciles: so + // we do not expect that our [`TenantState`] objects will reach a neat + // final state. + self.cancel.cancel(); + + // The cancellation tokens in [`crate::reconciler::Reconciler`] are children + // of our cancellation token, so we do not need to explicitly cancel each of + // them. + + // Background tasks and reconcilers hold gate guards: this waits for them all + // to complete. 
+ self.gate.close().await; + } } diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs index 1646ed9fcd..dd753ece3d 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -7,16 +7,18 @@ use pageserver_api::{ }; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use tracing::{instrument, Instrument}; use utils::{ generation::Generation, id::NodeId, seqwait::{SeqWait, SeqWaitError}, + sync::gate::Gate, }; use crate::{ compute_hook::ComputeHook, node::Node, - persistence::Persistence, + persistence::{split_state::SplitState, Persistence}, reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler}, scheduler::{ScheduleError, Scheduler}, service, PlacementPolicy, Sequence, @@ -58,6 +60,11 @@ pub(crate) struct TenantState { /// cancellation token has been fired) pub(crate) reconciler: Option<ReconcilerHandle>, + /// If a tenant is being split, then all shards with that TenantId will have a + /// SplitState set; this acts as a guard against other operations such as background + /// reconciliation and timeline creation. + pub(crate) splitting: SplitState, + /// Optionally wait for reconciliation to complete up to a particular /// sequence number. pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>, @@ -238,6 +245,7 @@ impl TenantState { observed: ObservedState::default(), config: TenantConfig::default(), reconciler: None, + splitting: SplitState::Idle, sequence: Sequence(1), waiter: Arc::new(SeqWait::new(Sequence(0))), error_waiter: Arc::new(SeqWait::new(Sequence(0))), @@ -415,6 +423,8 @@ impl TenantState { false } + #[allow(clippy::too_many_arguments)] + #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] pub(crate) fn maybe_reconcile( &mut self, result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>, @@ -422,6 +432,8 @@ impl TenantState { compute_hook: &Arc<ComputeHook>, service_config: &service::Config, persistence: &Arc<Persistence>, + gate: &Gate, + cancel: &CancellationToken, ) -> Option<ReconcilerWaiter> { // If there are any ambiguous observed states, and the nodes they refer to are available, // we should reconcile to clean them up. @@ -443,6 +455,14 @@ impl TenantState { return None; } + // If we are currently splitting, then never start a reconciler task: the splitting logic + // requires that shards are not interfered with while it runs. Do this check here rather than + // up top, so that we only log this message if we would otherwise have done a reconciliation. + if !matches!(self.splitting, SplitState::Idle) { + tracing::info!("Refusing to reconcile, splitting in progress"); + return None; + } + // Reconcile already in flight for the current sequence? if let Some(handle) = &self.reconciler { if handle.sequence == self.sequence { @@ -460,7 +480,12 @@ impl TenantState { // doing our sequence's work.
let old_handle = self.reconciler.take(); - let cancel = CancellationToken::new(); + let Ok(gate_guard) = gate.enter() else { + // Shutting down, don't start a reconciler + return None; + }; + + let reconciler_cancel = cancel.child_token(); let mut reconciler = Reconciler { tenant_shard_id: self.tenant_shard_id, shard: self.shard, @@ -471,59 +496,66 @@ impl TenantState { pageservers: pageservers.clone(), compute_hook: compute_hook.clone(), service_config: service_config.clone(), - cancel: cancel.clone(), + _gate_guard: gate_guard, + cancel: reconciler_cancel.clone(), persistence: persistence.clone(), compute_notify_failure: false, }; let reconcile_seq = self.sequence; - tracing::info!("Spawning Reconciler for sequence {}", self.sequence); + tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence); let must_notify = self.pending_compute_notification; - let join_handle = tokio::task::spawn(async move { - // Wait for any previous reconcile task to complete before we start - if let Some(old_handle) = old_handle { - old_handle.cancel.cancel(); - if let Err(e) = old_handle.handle.await { - // We can't do much with this other than log it: the task is done, so - // we may proceed with our work. - tracing::error!("Unexpected join error waiting for reconcile task: {e}"); + let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq, + tenant_id=%reconciler.tenant_shard_id.tenant_id, + shard_id=%reconciler.tenant_shard_id.shard_slug()); + let join_handle = tokio::task::spawn( + async move { + // Wait for any previous reconcile task to complete before we start + if let Some(old_handle) = old_handle { + old_handle.cancel.cancel(); + if let Err(e) = old_handle.handle.await { + // We can't do much with this other than log it: the task is done, so + // we may proceed with our work. + tracing::error!("Unexpected join error waiting for reconcile task: {e}"); + } } + + // Early check for cancellation before doing any work + // TODO: wrap all remote API operations in cancellation check + // as well. + if reconciler.cancel.is_cancelled() { + return; + } + + // Attempt to make observed state match intent state + let result = reconciler.reconcile().await; + + // If we know we had a pending compute notification from some previous action, send a notification irrespective + // of whether the above reconcile() did any work + if result.is_ok() && must_notify { + // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`] + reconciler.compute_notify().await.ok(); + } + + result_tx + .send(ReconcileResult { + sequence: reconcile_seq, + result, + tenant_shard_id: reconciler.tenant_shard_id, + generation: reconciler.generation, + observed: reconciler.observed, + pending_compute_notification: reconciler.compute_notify_failure, + }) + .ok(); } - - // Early check for cancellation before doing any work - // TODO: wrap all remote API operations in cancellation check - // as well. 
- if reconciler.cancel.is_cancelled() { - return; - } - - // Attempt to make observed state match intent state - let result = reconciler.reconcile().await; - - // If we know we had a pending compute notification from some previous action, send a notification irrespective - // of whether the above reconcile() did any work - if result.is_ok() && must_notify { - // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`] - reconciler.compute_notify().await.ok(); - } - - result_tx - .send(ReconcileResult { - sequence: reconcile_seq, - result, - tenant_shard_id: reconciler.tenant_shard_id, - generation: reconciler.generation, - observed: reconciler.observed, - pending_compute_notification: reconciler.compute_notify_failure, - }) - .ok(); - }); + .instrument(reconciler_span), + ); self.reconciler = Some(ReconcilerHandle { sequence: self.sequence, handle: join_handle, - cancel, + cancel: reconciler_cancel, }); Some(ReconcilerWaiter { diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index de9f3b6945..1070d06ed0 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -20,6 +20,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, + NeonPageserver, PgBin, S3Scrubber, last_flush_lsn_upload, @@ -62,7 +63,7 @@ def generate_uploads_and_deletions( tenant_id: Optional[TenantId] = None, timeline_id: Optional[TimelineId] = None, data: Optional[str] = None, - pageserver_id: Optional[int] = None, + pageserver: NeonPageserver, ): """ Using the environment's default tenant + timeline, generate a load pattern @@ -77,14 +78,16 @@ def generate_uploads_and_deletions( timeline_id = env.initial_timeline assert timeline_id is not None - ps_http = env.pageserver.http_client() + ps_http = pageserver.http_client() with env.endpoints.create_start( - "main", tenant_id=tenant_id, pageserver_id=pageserver_id + "main", tenant_id=tenant_id, pageserver_id=pageserver.id ) as endpoint: if init: endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") - last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id) + last_flush_lsn_upload( + env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id + ) def churn(data): endpoint.safe_psql_many( @@ -105,7 +108,9 @@ def generate_uploads_and_deletions( # We are waiting for uploads as well as local flush, in order to avoid leaving the system # in a state where there are "future layers" in remote storage that will generate deletions # after a restart. 
- last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id) + last_flush_lsn_upload( + env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id + ) ps_http.timeline_checkpoint(tenant_id, timeline_id) # Compaction should generate some GC-elegible layers @@ -205,7 +210,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder): env.neon_cli.create_tenant( tenant_id=env.initial_tenant, conf=TENANT_CONF, timeline_id=env.initial_timeline ) - generate_uploads_and_deletions(env, pageserver_id=env.pageserver.id) + generate_uploads_and_deletions(env, pageserver=env.pageserver) def parse_generation_suffix(key): m = re.match(".+-([0-9a-zA-Z]{8})$", key) @@ -233,7 +238,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder): # Starting without the override that disabled control_plane_api env.pageserver.start() - generate_uploads_and_deletions(env, pageserver_id=env.pageserver.id, init=False) + generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False) legacy_objects: list[str] = [] suffixed_objects = [] @@ -277,13 +282,16 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder): neon_env_builder.enable_pageserver_remote_storage( RemoteStorageKind.MOCK_S3, ) + neon_env_builder.num_pageservers = 2 env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) - some_other_pageserver = 1234 + attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"] + main_pageserver = env.get_pageserver(attached_to_id) + other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0] - ps_http = env.pageserver.http_client() + ps_http = main_pageserver.http_client() - generate_uploads_and_deletions(env) + generate_uploads_and_deletions(env, pageserver=main_pageserver) # Flush: pending deletions should all complete assert_deletion_queue(ps_http, lambda n: n > 0) @@ -296,14 +304,14 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder): assert timeline["remote_consistent_lsn"] == timeline["remote_consistent_lsn_visible"] assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0 - env.pageserver.allowed_errors.extend( + main_pageserver.allowed_errors.extend( [".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"] ) # Now advance the generation in the control plane: subsequent validations # from the running pageserver will fail. No more deletions should happen. 
- env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver) - generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id) + env.attachment_service.attach_hook_issue(env.initial_tenant, other_pageserver.id) + generate_uploads_and_deletions(env, init=False, pageserver=main_pageserver) assert_deletion_queue(ps_http, lambda n: n > 0) queue_depth_before = get_deletion_queue_depth(ps_http) @@ -355,9 +363,14 @@ def test_deletion_queue_recovery( neon_env_builder.enable_pageserver_remote_storage( RemoteStorageKind.MOCK_S3, ) + neon_env_builder.num_pageservers = 2 env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) - ps_http = env.pageserver.http_client() + attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"] + main_pageserver = env.get_pageserver(attached_to_id) + other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0] + + ps_http = main_pageserver.http_client() failpoints = [ # Prevent deletion lists from being executed, to build up some backlog of deletions @@ -374,7 +387,7 @@ def test_deletion_queue_recovery( ps_http.configure_failpoints(failpoints) - generate_uploads_and_deletions(env) + generate_uploads_and_deletions(env, pageserver=main_pageserver) # There should be entries in the deletion queue assert_deletion_queue(ps_http, lambda n: n > 0) @@ -401,7 +414,7 @@ def test_deletion_queue_recovery( # also wait to see the header hit the disk: this seems paranoid but the race # can really happen on a heavily overloaded test machine. def assert_header_written(): - assert (env.pageserver.workdir / "deletion" / "header-01").exists() + assert (main_pageserver.workdir / "deletion" / "header-01").exists() wait_until(20, 1, assert_header_written) @@ -411,13 +424,13 @@ def test_deletion_queue_recovery( before_restart_depth = get_deletion_queue_validated(ps_http) log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued") - env.pageserver.stop(immediate=True) + main_pageserver.stop(immediate=True) if keep_attachment == KeepAttachment.LOSE: - some_other_pageserver = 101010 + some_other_pageserver = other_pageserver.id env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver) - env.pageserver.start() + main_pageserver.start() def assert_deletions_submitted(n: int): assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n @@ -440,7 +453,7 @@ def test_deletion_queue_recovery( # validated before restart. assert get_deletion_queue_executed(ps_http) == before_restart_depth else: - env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"]) + main_pageserver.allowed_errors.extend([".*Dropping stale deletions.*"]) # If we lost the attachment, we should have dropped our pre-restart deletions. assert get_deletion_queue_dropped(ps_http) == before_restart_depth @@ -449,8 +462,8 @@ def test_deletion_queue_recovery( assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0 # Restart again - env.pageserver.stop(immediate=True) - env.pageserver.start() + main_pageserver.stop(immediate=True) + main_pageserver.start() # No deletion lists should be recovered: this demonstrates that deletion lists # were cleaned up after being executed or dropped in the previous process lifetime. 
@@ -469,7 +482,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): ps_http = env.pageserver.http_client() - generate_uploads_and_deletions(env, pageserver_id=env.pageserver.id) + generate_uploads_and_deletions(env, pageserver=env.pageserver) env.pageserver.allowed_errors.extend( [ @@ -486,7 +499,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): # Remember how many validations had happened before the control plane went offline validated = get_deletion_queue_validated(ps_http) - generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id) + generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver) # The running pageserver should stop progressing deletions time.sleep(10) @@ -502,7 +515,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): ) # The pageserver should provide service to clients - generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id) + generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver) # The pageserver should neither validate nor execute any deletions, it should have # loaded the DeletionLists from before though @@ -523,7 +536,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP env.pageserver.start() - generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id) + generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver) ps_http.deletion_queue_flush(execute=True) assert get_deletion_queue_depth(ps_http) == 0 assert get_deletion_queue_validated(ps_http) > 0 @@ -561,7 +574,7 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder): tenant_id = env.initial_tenant timeline_id = env.initial_timeline - generate_uploads_and_deletions(env) + generate_uploads_and_deletions(env, pageserver=env.pageserver) read_all(env, tenant_id, timeline_id) evict_all_layers(env, tenant_id, timeline_id)
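
The shutdown sequencing this patch introduces (fire a top-level CancellationToken, then wait on a Gate held by every background task and reconciler) can be sketched with stock tokio primitives. The snippet below is an illustrative sketch, not code from this patch: it assumes the tokio and tokio-util crates, and uses `tokio_util::task::TaskTracker` as a stand-in for the internal `utils::sync::gate::Gate`.

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    // Top-level token: process shutdown fires this once.
    let cancel = CancellationToken::new();
    // The tracker plays the role of the Gate: background work registers here.
    let tracker = TaskTracker::new();

    // Periodic background loop, analogous to Service::background_reconcile.
    tracker.spawn({
        let cancel = cancel.clone();
        async move {
            let mut interval = tokio::time::interval(Duration::from_secs(20));
            loop {
                tokio::select! {
                    _ = interval.tick() => { /* reconcile_all() would run here */ }
                    _ = cancel.cancelled() => break,
                }
            }
        }
    });

    // ... serve HTTP, wait for SIGINT/SIGTERM ...

    // Graceful shutdown: fire the token, then wait for every registered task
    // to observe it and exit before the process terminates.
    cancel.cancel();
    tracker.close();
    tracker.wait().await;
}
```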