diff --git a/libs/pageserver_api/src/upcall_api.rs b/libs/pageserver_api/src/upcall_api.rs index 7ee63f9036..4dce5f7817 100644 --- a/libs/pageserver_api/src/upcall_api.rs +++ b/libs/pageserver_api/src/upcall_api.rs @@ -4,6 +4,7 @@ //! See docs/rfcs/025-generation-numbers.md use serde::{Deserialize, Serialize}; +use utils::generation::Generation; use utils::id::{NodeId, TimelineId}; use crate::controller_api::NodeRegisterRequest; @@ -63,9 +64,17 @@ pub struct ValidateResponseTenant { pub valid: bool, } +#[derive(Serialize, Deserialize)] +pub struct TimelineImportStatusRequest { + pub tenant_shard_id: TenantShardId, + pub timeline_id: TimelineId, + pub generation: Generation, +} + #[derive(Serialize, Deserialize)] pub struct PutTimelineImportStatusRequest { pub tenant_shard_id: TenantShardId, pub timeline_id: TimelineId, pub status: ShardImportStatus, + pub generation: Generation, } diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index 6d186b091a..779ef3e37d 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -7,7 +7,7 @@ use pageserver_api::models::ShardImportStatus; use pageserver_api::shard::TenantShardId; use pageserver_api::upcall_api::{ PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, - ValidateRequest, ValidateRequestTenant, ValidateResponse, + TimelineImportStatusRequest, ValidateRequest, ValidateRequestTenant, ValidateResponse, }; use reqwest::Certificate; use serde::Serialize; @@ -51,12 +51,14 @@ pub trait StorageControllerUpcallApi { &self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, + generation: Generation, status: ShardImportStatus, ) -> impl Future> + Send; fn get_timeline_import_status( &self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, + generation: Generation, ) -> impl Future, RetryForeverError>> + Send; } @@ -292,6 +294,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { &self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, + generation: Generation, status: ShardImportStatus, ) -> Result<(), RetryForeverError> { let url = self @@ -302,6 +305,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { let request = PutTimelineImportStatusRequest { tenant_shard_id, timeline_id, + generation, status, }; @@ -313,15 +317,27 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { &self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, + generation: Generation, ) -> Result, RetryForeverError> { let url = self .base_url - .join(format!("timeline_import_status/{}/{}", tenant_shard_id, timeline_id).as_str()) + .join("timeline_import_status") .expect("Failed to build path"); + let request = TimelineImportStatusRequest { + tenant_shard_id, + timeline_id, + generation, + }; + Ok(backoff::retry( || async { - let response = self.http_client.get(url.clone()).send().await?; + let response = self + .http_client + .get(url.clone()) + .json(&request) + .send() + .await?; if let Err(err) = response.error_for_status_ref() { if matches!(err.status(), Some(reqwest::StatusCode::NOT_FOUND)) { diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 65b2de28cd..0bbad87c09 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -793,6 +793,7 @@ mod test { &self, _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, + _generation: Generation, _status: pageserver_api::models::ShardImportStatus, ) -> Result<(), RetryForeverError> { unimplemented!() @@ -802,6 +803,7 @@ mod test { &self, _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, + _generation: Generation, ) -> Result, RetryForeverError> { unimplemented!() } diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs index 53e15e5395..5fac9e0ce7 100644 --- a/pageserver/src/tenant/timeline/import_pgdata.rs +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -48,7 +48,11 @@ pub async fn doit( let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel); let shard_status = storcon_client - .get_timeline_import_status(timeline.tenant_shard_id, timeline.timeline_id) + .get_timeline_import_status( + timeline.tenant_shard_id, + timeline.timeline_id, + timeline.generation, + ) .await .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?; @@ -175,6 +179,7 @@ pub async fn doit( .put_timeline_import_status( timeline.tenant_shard_id, timeline.timeline_id, + timeline.generation, // TODO(vlad): What about import errors? ShardImportStatus::Done, ) diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 8d459cab9c..02c02c0e7f 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -31,7 +31,7 @@ use pageserver_api::models::{ }; use pageserver_api::shard::TenantShardId; use pageserver_api::upcall_api::{ - PutTimelineImportStatusRequest, ReAttachRequest, ValidateRequest, + PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest, }; use pageserver_client::{BlockUnblock, mgmt_api}; use routerify::Middleware; @@ -160,22 +160,22 @@ async fn handle_validate(req: Request) -> Result, ApiError> async fn handle_get_timeline_import_status(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::GenerationsApi)?; - let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?; - let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; - - let req = match maybe_forward(req).await { + let mut req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { return res; } ForwardOutcome::NotForwarded(req) => req, }; + let get_req = json_request::(&mut req).await?; + let state = get_state(&req); + json_response( StatusCode::OK, state .service - .handle_timeline_shard_import_progress(tenant_shard_id, timeline_id) + .handle_timeline_shard_import_progress(get_req) .await?, ) } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 05430733c2..852005639a 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -47,7 +47,7 @@ use pageserver_api::shard::{ }; use pageserver_api::upcall_api::{ PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, - ValidateRequest, ValidateResponse, ValidateResponseTenant, + TimelineImportStatusRequest, ValidateRequest, ValidateResponse, ValidateResponseTenant, }; use pageserver_client::{BlockUnblock, mgmt_api}; use reqwest::{Certificate, StatusCode}; @@ -194,6 +194,14 @@ pub(crate) enum LeadershipStatus { Candidate, } +enum ShardGenerationValidity { + Valid, + Mismatched { + claimed: Generation, + actual: Option, + }, +} + pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128; pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256; pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32; @@ -3909,19 +3917,36 @@ impl Service { pub(crate) async fn handle_timeline_shard_import_progress( self: &Arc, - tenant_shard_id: TenantShardId, - timeline_id: TimelineId, + req: TimelineImportStatusRequest, ) -> Result { + let validity = self + .validate_shard_generation(req.tenant_shard_id, req.generation) + .await?; + match validity { + ShardGenerationValidity::Valid => { + // fallthrough + } + ShardGenerationValidity::Mismatched { claimed, actual } => { + tracing::info!( + claimed=?claimed.into(), + actual=?actual.and_then(|g| g.into()), + "Rejecting import progress fetch from stale generation" + ); + + return Err(ApiError::BadRequest(anyhow::anyhow!("Invalid generation"))); + } + } + let maybe_import = self .persistence - .get_timeline_import(tenant_shard_id.tenant_id, timeline_id) + .get_timeline_import(req.tenant_shard_id.tenant_id, req.timeline_id) .await?; let import = maybe_import.ok_or_else(|| { ApiError::NotFound( format!( "import for {}/{} not found", - tenant_shard_id.tenant_id, timeline_id + req.tenant_shard_id.tenant_id, req.timeline_id ) .into(), ) @@ -3930,11 +3955,11 @@ impl Service { import .shard_statuses .0 - .get(&tenant_shard_id.to_index()) + .get(&req.tenant_shard_id.to_index()) .cloned() .ok_or_else(|| { ApiError::NotFound( - format!("shard {} not found", tenant_shard_id.shard_slug()).into(), + format!("shard {} not found", req.tenant_shard_id.shard_slug()).into(), ) }) } @@ -3943,6 +3968,24 @@ impl Service { self: &Arc, req: PutTimelineImportStatusRequest, ) -> Result<(), ApiError> { + let validity = self + .validate_shard_generation(req.tenant_shard_id, req.generation) + .await?; + match validity { + ShardGenerationValidity::Valid => { + // fallthrough + } + ShardGenerationValidity::Mismatched { claimed, actual } => { + tracing::info!( + claimed=?claimed.into(), + actual=?actual.and_then(|g| g.into()), + "Rejecting import progress update from stale generation" + ); + + return Err(ApiError::PreconditionFailed("Invalid generation".into())); + } + } + let res = self .persistence .update_timeline_import(req.tenant_shard_id, req.timeline_id, req.status) @@ -3977,6 +4020,56 @@ impl Service { Ok(()) } + /// Check that a provided generation for some tenant shard is the most recent one. + /// + /// Validate with the in-mem state first, and, if that passes, validate with the + /// database state which is authoritative. + async fn validate_shard_generation( + self: &Arc, + tenant_shard_id: TenantShardId, + generation: Generation, + ) -> Result { + { + let locked = self.inner.read().unwrap(); + let tenant_shard = + locked + .tenants + .get(&tenant_shard_id) + .ok_or(ApiError::InternalServerError(anyhow::anyhow!( + "{} shard not found", + tenant_shard_id + )))?; + + if tenant_shard.generation != Some(generation) { + return Ok(ShardGenerationValidity::Mismatched { + claimed: generation, + actual: tenant_shard.generation, + }); + } + } + + let mut db_generations = self + .persistence + .shard_generations(std::iter::once(&tenant_shard_id)) + .await?; + let (_tid, db_generation) = + db_generations + .pop() + .ok_or(ApiError::InternalServerError(anyhow::anyhow!( + "{} shard not found", + tenant_shard_id + )))?; + + if db_generation != Some(generation) { + return Ok(ShardGenerationValidity::Mismatched { + claimed: generation, + actual: db_generation, + }); + } + + Ok(ShardGenerationValidity::Valid) + } + /// Finalize the import of a timeline /// /// This method should be called once all shards have reported that the import is complete.