diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 77a04340a1..47f069bb59 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -22,6 +22,7 @@ use pageserver_api::shard::ShardStripeSize; use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId}; use serde::{Deserialize, Serialize}; use utils::generation::Generation; +use utils::id::TimelineId; use utils::id::{NodeId, TenantId}; use crate::metrics::{ @@ -1220,6 +1221,35 @@ impl Persistence { ) .await } + pub(crate) async fn update_timeline( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + timeline_status: TimelineStatus, + ) -> DatabaseResult<()> { + use crate::schema::timelines; + + self.with_measured_conn( + DatabaseOperation::InsertTimeline, + move |conn| -> DatabaseResult<()> { + let inserted_updated = diesel::update(timelines::table) + .filter(timelines::tenant_id.eq(tenant_id.to_string())) + .filter(timelines::timeline_id.eq(timeline_id.to_string())) + .set(timelines::status.eq(String::from(timeline_status))) + .execute(conn)?; + + if inserted_updated != 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({})", + inserted_updated + ))); + } + + Ok(()) + }, + ) + .await + } } /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 5b0b646728..1dabf2b273 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -3395,6 +3395,7 @@ impl Service { Ok(timeline_info) }).await??) } + async fn tenant_timeline_create_safekeepers( &self, tenant_id: TenantId, @@ -3408,10 +3409,9 @@ impl Service { models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn, models::TimelineCreateRequestMode::ImportPgdata { .. } => { // Can't do return Err because of async block, must do ? plus unreachable!() - Err(ApiError::InternalServerError(anyhow!( + return Err(ApiError::InternalServerError(anyhow!( "import pgdata doesn't specify the start lsn, aborting creation on safekeepers" )))?; - unreachable!() } }; @@ -3444,11 +3444,9 @@ impl Service { let mut members = Vec::new(); for sk in sks.iter() { let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else { - // Can't do return Err because of async block, must do ? plus unreachable!() - Err(ApiError::InternalServerError(anyhow!( + return Err(ApiError::InternalServerError(anyhow!( "couldn't find persisted entry for safekeeper with id {sk}" )))?; - unreachable!() }; members.push(SafekeeperId { id: NodeId(sk_p.id as u64), @@ -3510,16 +3508,14 @@ impl Service { reconcile_results.push(res_2); } Ok((Err(_), Ok(_)) | (_, Err(_))) => { - Err(ApiError::InternalServerError(anyhow!( + return Err(ApiError::InternalServerError(anyhow!( "task was cancelled while reconciling timeline creation" - )))?; - unreachable!() + ))); } Err(_) => { - Err(ApiError::InternalServerError(anyhow!( + return Err(ApiError::InternalServerError(anyhow!( "couldn't reconcile timeline creation on safekeepers within timeout" - )))?; - unreachable!() + ))); } } let timeout_or_last = @@ -3542,17 +3538,23 @@ impl Service { ); if successful.len() < 2 { // Failure + return Err(ApiError::InternalServerError(anyhow!( + "not enough successful reconciliations to reach quorum, please retry: {}", + successful.len() + ))); } else if successful.len() == 3 { - // Success, state of timeline is + // Success, state of timeline is Created + self.persistence + .update_timeline(tenant_id, timeline_id, TimelineStatus::Created) + .await?; } else if successful.len() == 2 { + // Success, state of timeline remains Creating } else { unreachable!( - "unexpected amount of successful reconciliations {}", + "unexpected number of successful reconciliations {}", successful.len() ); } - // TODO - // TODO update database state from "creating" to "created" or something // notify cplane about creation // TODO (this should probably be in a function so that the reconciler can use it too)