From a703cd342b1f7f8faf5920cec8ef09902f94eaa8 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 15 May 2025 11:02:11 +0100 Subject: [PATCH] storage_controller: enforce generations in import upcalls (#11900) ## Problem Import up-calls did not enforce the usage of the latest generation. The import might have finished in one previous generation, but not in the latest one. Hence, the controller might try to activate a timeline before it is ready. In theory, that would be fine, but it's tricky to reason about. ## Summary of Changes Pageserver provides the current generation in the upcall to the storage controller and the later validates the generation. If the generation is stale, we return an error which stops progress of the import job. Note that the import job will retry the upcall until the stale location is detached. I'll add some proper tests for this as part of the [checkpointing PR](https://github.com/neondatabase/neon/pull/11862). Closes https://github.com/neondatabase/neon/issues/11884 --- libs/pageserver_api/src/upcall_api.rs | 9 ++ pageserver/src/controller_upcall_client.rs | 22 +++- pageserver/src/deletion_queue.rs | 2 + .../src/tenant/timeline/import_pgdata.rs | 7 +- storage_controller/src/http.rs | 12 +- storage_controller/src/service.rs | 107 ++++++++++++++++-- 6 files changed, 142 insertions(+), 17 deletions(-) diff --git a/libs/pageserver_api/src/upcall_api.rs b/libs/pageserver_api/src/upcall_api.rs index 7ee63f9036..4dce5f7817 100644 --- a/libs/pageserver_api/src/upcall_api.rs +++ b/libs/pageserver_api/src/upcall_api.rs @@ -4,6 +4,7 @@ //! See docs/rfcs/025-generation-numbers.md use serde::{Deserialize, Serialize}; +use utils::generation::Generation; use utils::id::{NodeId, TimelineId}; use crate::controller_api::NodeRegisterRequest; @@ -63,9 +64,17 @@ pub struct ValidateResponseTenant { pub valid: bool, } +#[derive(Serialize, Deserialize)] +pub struct TimelineImportStatusRequest { + pub tenant_shard_id: TenantShardId, + pub timeline_id: TimelineId, + pub generation: Generation, +} + #[derive(Serialize, Deserialize)] pub struct PutTimelineImportStatusRequest { pub tenant_shard_id: TenantShardId, pub timeline_id: TimelineId, pub status: ShardImportStatus, + pub generation: Generation, } diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index 6d186b091a..779ef3e37d 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -7,7 +7,7 @@ use pageserver_api::models::ShardImportStatus; use pageserver_api::shard::TenantShardId; use pageserver_api::upcall_api::{ PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, - ValidateRequest, ValidateRequestTenant, ValidateResponse, + TimelineImportStatusRequest, ValidateRequest, ValidateRequestTenant, ValidateResponse, }; use reqwest::Certificate; use serde::Serialize; @@ -51,12 +51,14 @@ pub trait StorageControllerUpcallApi { &self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, + generation: Generation, status: ShardImportStatus, ) -> impl Future> + Send; fn get_timeline_import_status( &self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, + generation: Generation, ) -> impl Future, RetryForeverError>> + Send; } @@ -292,6 +294,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { &self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, + generation: Generation, status: ShardImportStatus, ) -> Result<(), RetryForeverError> { let url = self @@ -302,6 +305,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { let request = PutTimelineImportStatusRequest { tenant_shard_id, timeline_id, + generation, status, }; @@ -313,15 +317,27 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { &self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, + generation: Generation, ) -> Result, RetryForeverError> { let url = self .base_url - .join(format!("timeline_import_status/{}/{}", tenant_shard_id, timeline_id).as_str()) + .join("timeline_import_status") .expect("Failed to build path"); + let request = TimelineImportStatusRequest { + tenant_shard_id, + timeline_id, + generation, + }; + Ok(backoff::retry( || async { - let response = self.http_client.get(url.clone()).send().await?; + let response = self + .http_client + .get(url.clone()) + .json(&request) + .send() + .await?; if let Err(err) = response.error_for_status_ref() { if matches!(err.status(), Some(reqwest::StatusCode::NOT_FOUND)) { diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 65b2de28cd..0bbad87c09 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -793,6 +793,7 @@ mod test { &self, _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, + _generation: Generation, _status: pageserver_api::models::ShardImportStatus, ) -> Result<(), RetryForeverError> { unimplemented!() @@ -802,6 +803,7 @@ mod test { &self, _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, + _generation: Generation, ) -> Result, RetryForeverError> { unimplemented!() } diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs index 53e15e5395..5fac9e0ce7 100644 --- a/pageserver/src/tenant/timeline/import_pgdata.rs +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -48,7 +48,11 @@ pub async fn doit( let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel); let shard_status = storcon_client - .get_timeline_import_status(timeline.tenant_shard_id, timeline.timeline_id) + .get_timeline_import_status( + timeline.tenant_shard_id, + timeline.timeline_id, + timeline.generation, + ) .await .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?; @@ -175,6 +179,7 @@ pub async fn doit( .put_timeline_import_status( timeline.tenant_shard_id, timeline.timeline_id, + timeline.generation, // TODO(vlad): What about import errors? ShardImportStatus::Done, ) diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 8d459cab9c..02c02c0e7f 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -31,7 +31,7 @@ use pageserver_api::models::{ }; use pageserver_api::shard::TenantShardId; use pageserver_api::upcall_api::{ - PutTimelineImportStatusRequest, ReAttachRequest, ValidateRequest, + PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest, }; use pageserver_client::{BlockUnblock, mgmt_api}; use routerify::Middleware; @@ -160,22 +160,22 @@ async fn handle_validate(req: Request) -> Result, ApiError> async fn handle_get_timeline_import_status(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::GenerationsApi)?; - let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?; - let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; - - let req = match maybe_forward(req).await { + let mut req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { return res; } ForwardOutcome::NotForwarded(req) => req, }; + let get_req = json_request::(&mut req).await?; + let state = get_state(&req); + json_response( StatusCode::OK, state .service - .handle_timeline_shard_import_progress(tenant_shard_id, timeline_id) + .handle_timeline_shard_import_progress(get_req) .await?, ) } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 05430733c2..852005639a 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -47,7 +47,7 @@ use pageserver_api::shard::{ }; use pageserver_api::upcall_api::{ PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, - ValidateRequest, ValidateResponse, ValidateResponseTenant, + TimelineImportStatusRequest, ValidateRequest, ValidateResponse, ValidateResponseTenant, }; use pageserver_client::{BlockUnblock, mgmt_api}; use reqwest::{Certificate, StatusCode}; @@ -194,6 +194,14 @@ pub(crate) enum LeadershipStatus { Candidate, } +enum ShardGenerationValidity { + Valid, + Mismatched { + claimed: Generation, + actual: Option, + }, +} + pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128; pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256; pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32; @@ -3909,19 +3917,36 @@ impl Service { pub(crate) async fn handle_timeline_shard_import_progress( self: &Arc, - tenant_shard_id: TenantShardId, - timeline_id: TimelineId, + req: TimelineImportStatusRequest, ) -> Result { + let validity = self + .validate_shard_generation(req.tenant_shard_id, req.generation) + .await?; + match validity { + ShardGenerationValidity::Valid => { + // fallthrough + } + ShardGenerationValidity::Mismatched { claimed, actual } => { + tracing::info!( + claimed=?claimed.into(), + actual=?actual.and_then(|g| g.into()), + "Rejecting import progress fetch from stale generation" + ); + + return Err(ApiError::BadRequest(anyhow::anyhow!("Invalid generation"))); + } + } + let maybe_import = self .persistence - .get_timeline_import(tenant_shard_id.tenant_id, timeline_id) + .get_timeline_import(req.tenant_shard_id.tenant_id, req.timeline_id) .await?; let import = maybe_import.ok_or_else(|| { ApiError::NotFound( format!( "import for {}/{} not found", - tenant_shard_id.tenant_id, timeline_id + req.tenant_shard_id.tenant_id, req.timeline_id ) .into(), ) @@ -3930,11 +3955,11 @@ impl Service { import .shard_statuses .0 - .get(&tenant_shard_id.to_index()) + .get(&req.tenant_shard_id.to_index()) .cloned() .ok_or_else(|| { ApiError::NotFound( - format!("shard {} not found", tenant_shard_id.shard_slug()).into(), + format!("shard {} not found", req.tenant_shard_id.shard_slug()).into(), ) }) } @@ -3943,6 +3968,24 @@ impl Service { self: &Arc, req: PutTimelineImportStatusRequest, ) -> Result<(), ApiError> { + let validity = self + .validate_shard_generation(req.tenant_shard_id, req.generation) + .await?; + match validity { + ShardGenerationValidity::Valid => { + // fallthrough + } + ShardGenerationValidity::Mismatched { claimed, actual } => { + tracing::info!( + claimed=?claimed.into(), + actual=?actual.and_then(|g| g.into()), + "Rejecting import progress update from stale generation" + ); + + return Err(ApiError::PreconditionFailed("Invalid generation".into())); + } + } + let res = self .persistence .update_timeline_import(req.tenant_shard_id, req.timeline_id, req.status) @@ -3977,6 +4020,56 @@ impl Service { Ok(()) } + /// Check that a provided generation for some tenant shard is the most recent one. + /// + /// Validate with the in-mem state first, and, if that passes, validate with the + /// database state which is authoritative. + async fn validate_shard_generation( + self: &Arc, + tenant_shard_id: TenantShardId, + generation: Generation, + ) -> Result { + { + let locked = self.inner.read().unwrap(); + let tenant_shard = + locked + .tenants + .get(&tenant_shard_id) + .ok_or(ApiError::InternalServerError(anyhow::anyhow!( + "{} shard not found", + tenant_shard_id + )))?; + + if tenant_shard.generation != Some(generation) { + return Ok(ShardGenerationValidity::Mismatched { + claimed: generation, + actual: tenant_shard.generation, + }); + } + } + + let mut db_generations = self + .persistence + .shard_generations(std::iter::once(&tenant_shard_id)) + .await?; + let (_tid, db_generation) = + db_generations + .pop() + .ok_or(ApiError::InternalServerError(anyhow::anyhow!( + "{} shard not found", + tenant_shard_id + )))?; + + if db_generation != Some(generation) { + return Ok(ShardGenerationValidity::Mismatched { + claimed: generation, + actual: db_generation, + }); + } + + Ok(ShardGenerationValidity::Valid) + } + /// Finalize the import of a timeline /// /// This method should be called once all shards have reported that the import is complete.