mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
storage_controller: enforce generations in import upcalls (#11900)
## Problem Import up-calls did not enforce the usage of the latest generation. The import might have finished in a previous generation, but not in the latest one. Hence, the controller might try to activate a timeline before it is ready. In theory, that would be fine, but it's tricky to reason about. ## Summary of Changes Pageserver provides the current generation in the upcall to the storage controller and the latter validates the generation. If the generation is stale, we return an error which stops progress of the import job. Note that the import job will retry the upcall until the stale location is detached. I'll add some proper tests for this as part of the [checkpointing PR](https://github.com/neondatabase/neon/pull/11862). Closes https://github.com/neondatabase/neon/issues/11884
This commit is contained in:
@@ -31,7 +31,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::upcall_api::{
|
||||
PutTimelineImportStatusRequest, ReAttachRequest, ValidateRequest,
|
||||
PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest,
|
||||
};
|
||||
use pageserver_client::{BlockUnblock, mgmt_api};
|
||||
use routerify::Middleware;
|
||||
@@ -160,22 +160,22 @@ async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError>
|
||||
async fn handle_get_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::GenerationsApi)?;
|
||||
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
let mut req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
}
|
||||
ForwardOutcome::NotForwarded(req) => req,
|
||||
};
|
||||
|
||||
let get_req = json_request::<TimelineImportStatusRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req);
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
state
|
||||
.service
|
||||
.handle_timeline_shard_import_progress(tenant_shard_id, timeline_id)
|
||||
.handle_timeline_shard_import_progress(get_req)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ use pageserver_api::shard::{
|
||||
};
|
||||
use pageserver_api::upcall_api::{
|
||||
PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
|
||||
ValidateRequest, ValidateResponse, ValidateResponseTenant,
|
||||
TimelineImportStatusRequest, ValidateRequest, ValidateResponse, ValidateResponseTenant,
|
||||
};
|
||||
use pageserver_client::{BlockUnblock, mgmt_api};
|
||||
use reqwest::{Certificate, StatusCode};
|
||||
@@ -194,6 +194,14 @@ pub(crate) enum LeadershipStatus {
|
||||
Candidate,
|
||||
}
|
||||
|
||||
/// Outcome of comparing a generation claimed by a caller against the generation
/// recorded for a tenant shard (see `validate_shard_generation`).
enum ShardGenerationValidity {
|
||||
/// The claimed generation matched every recorded generation that was checked.
Valid,
|
||||
/// The claimed generation differed from a recorded one.
Mismatched {
|
||||
/// The generation supplied by the caller.
claimed: Generation,
|
||||
/// The recorded generation that the claim was compared against;
/// `None` when no generation was recorded for the shard.
actual: Option<Generation>,
|
||||
},
|
||||
}
|
||||
|
||||
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
|
||||
pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
|
||||
pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
|
||||
@@ -3909,19 +3917,36 @@ impl Service {
|
||||
|
||||
pub(crate) async fn handle_timeline_shard_import_progress(
|
||||
self: &Arc<Self>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
req: TimelineImportStatusRequest,
|
||||
) -> Result<ShardImportStatus, ApiError> {
|
||||
let validity = self
|
||||
.validate_shard_generation(req.tenant_shard_id, req.generation)
|
||||
.await?;
|
||||
match validity {
|
||||
ShardGenerationValidity::Valid => {
|
||||
// fallthrough
|
||||
}
|
||||
ShardGenerationValidity::Mismatched { claimed, actual } => {
|
||||
tracing::info!(
|
||||
claimed=?claimed.into(),
|
||||
actual=?actual.and_then(|g| g.into()),
|
||||
"Rejecting import progress fetch from stale generation"
|
||||
);
|
||||
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!("Invalid generation")));
|
||||
}
|
||||
}
|
||||
|
||||
let maybe_import = self
|
||||
.persistence
|
||||
.get_timeline_import(tenant_shard_id.tenant_id, timeline_id)
|
||||
.get_timeline_import(req.tenant_shard_id.tenant_id, req.timeline_id)
|
||||
.await?;
|
||||
|
||||
let import = maybe_import.ok_or_else(|| {
|
||||
ApiError::NotFound(
|
||||
format!(
|
||||
"import for {}/{} not found",
|
||||
tenant_shard_id.tenant_id, timeline_id
|
||||
req.tenant_shard_id.tenant_id, req.timeline_id
|
||||
)
|
||||
.into(),
|
||||
)
|
||||
@@ -3930,11 +3955,11 @@ impl Service {
|
||||
import
|
||||
.shard_statuses
|
||||
.0
|
||||
.get(&tenant_shard_id.to_index())
|
||||
.get(&req.tenant_shard_id.to_index())
|
||||
.cloned()
|
||||
.ok_or_else(|| {
|
||||
ApiError::NotFound(
|
||||
format!("shard {} not found", tenant_shard_id.shard_slug()).into(),
|
||||
format!("shard {} not found", req.tenant_shard_id.shard_slug()).into(),
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -3943,6 +3968,24 @@ impl Service {
|
||||
self: &Arc<Self>,
|
||||
req: PutTimelineImportStatusRequest,
|
||||
) -> Result<(), ApiError> {
|
||||
let validity = self
|
||||
.validate_shard_generation(req.tenant_shard_id, req.generation)
|
||||
.await?;
|
||||
match validity {
|
||||
ShardGenerationValidity::Valid => {
|
||||
// fallthrough
|
||||
}
|
||||
ShardGenerationValidity::Mismatched { claimed, actual } => {
|
||||
tracing::info!(
|
||||
claimed=?claimed.into(),
|
||||
actual=?actual.and_then(|g| g.into()),
|
||||
"Rejecting import progress update from stale generation"
|
||||
);
|
||||
|
||||
return Err(ApiError::PreconditionFailed("Invalid generation".into()));
|
||||
}
|
||||
}
|
||||
|
||||
let res = self
|
||||
.persistence
|
||||
.update_timeline_import(req.tenant_shard_id, req.timeline_id, req.status)
|
||||
@@ -3977,6 +4020,56 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check that a provided generation for some tenant shard is the most recent one.
|
||||
///
|
||||
/// Validate with the in-mem state first, and, if that passes, validate with the
|
||||
/// database state which is authoritative.
///
/// Returns `Ok(ShardGenerationValidity::Valid)` when both the in-memory and the
/// database generation equal `Some(generation)`, and
/// `Ok(ShardGenerationValidity::Mismatched { .. })` as soon as either one differs.
///
/// # Errors
///
/// Returns `ApiError::InternalServerError` if `tenant_shard_id` is missing from the
/// in-memory tenants map or from the result of the database query. Errors from the
/// database query itself are propagated with `?`.
|
||||
async fn validate_shard_generation(
|
||||
self: &Arc<Self>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
generation: Generation,
|
||||
) -> Result<ShardGenerationValidity, ApiError> {
|
||||
// Inner scope: the read guard on `self.inner` is dropped at the end of this
// block, i.e. before the database query below is awaited.
{
|
||||
let locked = self.inner.read().unwrap();
|
||||
let tenant_shard =
|
||||
locked
|
||||
.tenants
|
||||
.get(&tenant_shard_id)
|
||||
.ok_or(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"{} shard not found",
|
||||
tenant_shard_id
|
||||
)))?;
|
||||
|
||||
// In-memory generation differs from the claim (or is unset): report the
// mismatch without consulting the database.
if tenant_shard.generation != Some(generation) {
|
||||
return Ok(ShardGenerationValidity::Mismatched {
|
||||
claimed: generation,
|
||||
actual: tenant_shard.generation,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// The in-memory check passed; now compare against the persisted generation.
let mut db_generations = self
|
||||
.persistence
|
||||
.shard_generations(std::iter::once(&tenant_shard_id))
|
||||
.await?;
|
||||
let (_tid, db_generation) =
|
||||
db_generations
|
||||
// Exactly one shard id was queried; take one entry from the result, if any.
.pop()
|
||||
.ok_or(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"{} shard not found",
|
||||
tenant_shard_id
|
||||
)))?;
|
||||
|
||||
// Persisted generation differs from the claim (or is unset).
if db_generation != Some(generation) {
|
||||
return Ok(ShardGenerationValidity::Mismatched {
|
||||
claimed: generation,
|
||||
actual: db_generation,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(ShardGenerationValidity::Valid)
|
||||
}
|
||||
|
||||
/// Finalize the import of a timeline
|
||||
///
|
||||
/// This method should be called once all shards have reported that the import is complete.
|
||||
|
||||
Reference in New Issue
Block a user