diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 778c5d925b..5b0b646728 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -3272,30 +3272,12 @@ impl Service {
         Ok(())
     }
 
-    pub(crate) async fn tenant_timeline_create(
+    async fn tenant_timeline_create_pageservers(
         &self,
         tenant_id: TenantId,
         mut create_req: TimelineCreateRequest,
-    ) -> Result<TimelineCreateResponseStorcon, ApiError> {
-        let safekeepers = create_req.safekeepers.unwrap_or_default();
-        tracing::info!(
-            %safekeepers,
-            "Creating timeline {}/{}",
-            tenant_id,
-            create_req.new_timeline_id,
-        );
-
-        let _tenant_lock = trace_shared_lock(
-            &self.tenant_op_locks,
-            tenant_id,
-            TenantOperations::TimelineCreate,
-        )
-        .await;
-        failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
-        let timeline_id = create_req.new_timeline_id;
-        let create_mode = create_req.mode.clone();
-
-        let timeline_info = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
+    ) -> Result<TimelineInfo, ApiError> {
+        Ok(self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
             if targets.0.is_empty() {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant not found").into(),
@@ -3411,178 +3393,207 @@ impl Service {
             }
 
             Ok(timeline_info)
-        }).await??;
-        let on_sk_fut = async {
-            if safekeepers {
-                let pg_version = timeline_info.pg_version;
-                let start_lsn = match create_mode {
-                    models::TimelineCreateRequestMode::Bootstrap { .. } => {
-                        timeline_info.last_record_lsn
-                    }
-                    models::TimelineCreateRequestMode::Branch { .. } => {
-                        timeline_info.last_record_lsn
-                    }
-                    models::TimelineCreateRequestMode::ImportPgdata { .. } => {
-                        // Can't do return Err because of async block, must do ? plus unreachable!()
-                        Err(ApiError::InternalServerError(anyhow!(
-                            "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
-                        )))?;
-                        unreachable!()
-                    }
-                };
-
-                // Choose initial set of safekeepers respecting affinity
-                let sks = self.safekeepers_for_new_timeline().await?;
-                let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<i64>>();
-                // Add timeline to db
-                let timeline_persist = TimelinePersistence {
-                    tenant_id: tenant_id.to_string(),
-                    timeline_id: timeline_id.to_string(),
-                    generation: 0,
-                    sk_set: sks_persistence.clone(),
-                    new_sk_set: sks_persistence.clone(),
-                    cplane_notified_generation: 0,
-                    status: String::from(TimelineStatus::Creating),
-                };
-                self.persistence.insert_timeline(timeline_persist).await?;
-                // reconcile: create timeline on safekeepers
-                // If quorum is reached, return if we are outside of a specified timeout
-                let sk_persistences = self
-                    .persistence
-                    .list_safekeepers()
-                    .await?
-                    .into_iter()
-                    .map(|p| (p.id, p))
-                    .collect::<HashMap<_, _>>();
-                let jwt = self.config.jwt_token.clone().map(SecretString::from);
-                let mut joinset = JoinSet::new();
-
-                let mut members = Vec::new();
-                for sk in sks.iter() {
-                    let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else {
-                        // Can't do return Err because of async block, must do ? plus unreachable!()
-                        Err(ApiError::InternalServerError(anyhow!(
-                            "couldn't find persisted entry for safekeeper with id {sk}"
-                        )))?;
-                        unreachable!()
-                    };
-                    members.push(SafekeeperId {
-                        id: NodeId(sk_p.id as u64),
-                        host: sk_p.host.clone(),
-                        pg_port: sk_p.port as u16,
-                    });
-                }
-                let mut mconf = safekeeper_api::membership::Configuration::empty();
-                mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
-
-                let req = safekeeper_api::models::TimelineCreateRequest {
-                    commit_lsn: None,
-                    mconf,
-                    pg_version,
-                    start_lsn,
-                    system_id: None,
-                    tenant_id,
-                    timeline_id,
-                    wal_seg_size: None,
-                };
-                for sk in sks.iter() {
-                    // Unwrap is fine as we already would have returned error above
-                    let sk_p = sk_persistences.get(&(sk.0 as i64)).unwrap();
-                    let sk_clone = *sk;
-                    let base_url = sk_p.base_url();
-                    let jwt = jwt.clone();
-                    let req = req.clone();
-                    joinset.spawn(async move {
-                        let client = SafekeeperClient::new(sk_clone, base_url, jwt);
-                        // TODO: logging on error, retries
-                        client.create_timeline(req).await.map_err(|e| {
-                            ApiError::InternalServerError(
-                                anyhow::Error::new(e)
-                                    .context("error creating timeline on safekeeper"),
-                            )
-                        })
-                    });
-                }
-                // After we have built the joinset, we now wait for the tasks to complete,
-                // but with a specified timeout to make sure we return swiftly, either with
-                // a failure or success.
-                const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
-                let reconcile_deadline =
-                    tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
-
-                // Treat the first two tasks to finish differently, mostly when they timeout,
-                // because then we won't have a successful quorum.
-                // For the third task, we don't rely on it succeeding, and we need this to support
-                // continuing operations even if one safekeeper is down.
-                let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
-                    (
-                        joinset.join_next().await.unwrap(),
-                        joinset.join_next().await.unwrap(),
-                    )
-                })
-                .await;
-                let mut reconcile_results = Vec::new();
-                match timeout_or_quorum {
-                    Ok((Ok(res_1), Ok(res_2))) => {
-                        reconcile_results.push(res_1);
-                        reconcile_results.push(res_2);
-                    }
-                    Ok((Err(_), Ok(_)) | (_, Err(_))) => {
-                        Err(ApiError::InternalServerError(anyhow!(
-                            "task was cancelled while reconciling timeline creation"
-                        )))?;
-                        unreachable!()
-                    }
-                    Err(_) => {
-                        Err(ApiError::InternalServerError(anyhow!(
-                            "couldn't reconcile timeline creation on safekeepers within timeout"
-                        )))?;
-                        unreachable!()
-                    }
-                }
-                let timeout_or_last = tokio::time::timeout_at(
-                    reconcile_deadline,
-                    joinset.join_next().map(Option::unwrap),
-                )
-                .await;
-                if let Ok(Ok(res)) = timeout_or_last {
-                    reconcile_results.push(res);
-                } else {
-                    // No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
-                    // TODO: maybe log?
-                }
-                // check now if quorum was reached in reconcile_results
-                let successful = reconcile_results
-                    .into_iter()
-                    .filter_map(|res| res.ok())
-                    .collect::<Vec<_>>();
-                tracing::info!(
-                    "Got {} successful results from reconciliation",
-                    successful.len()
-                );
-                if successful.len() < 2 {
-                    // Failure
-                } else if successful.len() == 3 {
-                    // Success, state of timeline is
-                } else if successful.len() == 2 {
-                } else {
-                    unreachable!(
-                        "unexpected amount of successful reconciliations {}",
-                        successful.len()
-                    );
-                }
-                // TODO
-                // TODO update database state from "creating" to "created" or something
-
-                // notify cplane about creation
-                // TODO (this should probably be in a function so that the reconciler can use it too)
-                Ok::<_, ApiError>((Some(0), Some(sks)))
-            } else {
-                Ok((None, None))
-            }
-        };
-        let on_sk_res = on_sk_fut.await;
-        let (safekeepers_generation, safekeepers) = on_sk_res?;
+        }).await??)
+    }
+
+    async fn tenant_timeline_create_safekeepers(
+        &self,
+        tenant_id: TenantId,
+        timeline_info: &TimelineInfo,
+        create_mode: models::TimelineCreateRequestMode,
+    ) -> Result<(u32, Vec<NodeId>), ApiError> {
+        let timeline_id = timeline_info.timeline_id;
+        let pg_version = timeline_info.pg_version;
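+        // Derive the safekeepers' start LSN from what the pageservers created:
+        // Bootstrap and Branch start at the returned last_record_lsn, while
+        // ImportPgdata carries no usable start LSN and is rejected below.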
+        let start_lsn = match create_mode {
+            models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
+            models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
+            models::TimelineCreateRequestMode::ImportPgdata { .. } => {
+                // Can't do return Err because of async block, must do ? plus unreachable!()
+                Err(ApiError::InternalServerError(anyhow!(
+                    "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
+                )))?;
+                unreachable!()
+            }
+        };
+
+        // Choose initial set of safekeepers respecting affinity
+        let sks = self.safekeepers_for_new_timeline().await?;
+        let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<i64>>();
+        // Add timeline to db
+        let timeline_persist = TimelinePersistence {
+            tenant_id: tenant_id.to_string(),
+            timeline_id: timeline_id.to_string(),
+            generation: 0,
+            sk_set: sks_persistence.clone(),
+            new_sk_set: sks_persistence.clone(),
+            cplane_notified_generation: 0,
+            status: String::from(TimelineStatus::Creating),
+        };
+        self.persistence.insert_timeline(timeline_persist).await?;
+        // reconcile: create timeline on safekeepers
+        // If quorum is reached, return if we are outside of a specified timeout
+        let sk_persistences = self
+            .persistence
+            .list_safekeepers()
+            .await?
+            .into_iter()
+            .map(|p| (p.id, p))
+            .collect::<HashMap<_, _>>();
+        let jwt = self.config.jwt_token.clone().map(SecretString::from);
+        let mut joinset = JoinSet::new();
+
+        let mut members = Vec::new();
+        for sk in sks.iter() {
+            let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else {
+                // Can't do return Err because of async block, must do ? plus unreachable!()
+                Err(ApiError::InternalServerError(anyhow!(
+                    "couldn't find persisted entry for safekeeper with id {sk}"
+                )))?;
+                unreachable!()
+            };
+            members.push(SafekeeperId {
+                id: NodeId(sk_p.id as u64),
+                host: sk_p.host.clone(),
+                pg_port: sk_p.port as u16,
+            });
+        }
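+        // Build the initial membership configuration from the chosen safekeeper set.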
+        let mut mconf = safekeeper_api::membership::Configuration::empty();
+        mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
+
+        let req = safekeeper_api::models::TimelineCreateRequest {
+            commit_lsn: None,
+            mconf,
+            pg_version,
+            start_lsn,
+            system_id: None,
+            tenant_id,
+            timeline_id,
+            wal_seg_size: None,
+        };
+        for sk in sks.iter() {
+            // Unwrap is fine as we already would have returned error above
+            let sk_p = sk_persistences.get(&(sk.0 as i64)).unwrap();
+            let sk_clone = *sk;
+            let base_url = sk_p.base_url();
+            let jwt = jwt.clone();
+            let req = req.clone();
+            joinset.spawn(async move {
+                let client = SafekeeperClient::new(sk_clone, base_url, jwt);
+                // TODO: logging on error, retries
+                client.create_timeline(req).await.map_err(|e| {
+                    ApiError::InternalServerError(
+                        anyhow::Error::new(e).context("error creating timeline on safekeeper"),
+                    )
+                })
+            });
+        }
+        // After we have built the joinset, we now wait for the tasks to complete,
+        // but with a specified timeout to make sure we return swiftly, either with
+        // a failure or success.
+        const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
+        let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
+
+        // Treat the first two tasks to finish differently, mostly when they timeout,
+        // because then we won't have a successful quorum.
+        // For the third task, we don't rely on it succeeding, and we need this to support
+        // continuing operations even if one safekeeper is down.
+        let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
+            (
+                joinset.join_next().await.unwrap(),
+                joinset.join_next().await.unwrap(),
+            )
+        })
+        .await;
+        let mut reconcile_results = Vec::new();
+        match timeout_or_quorum {
+            Ok((Ok(res_1), Ok(res_2))) => {
+                reconcile_results.push(res_1);
+                reconcile_results.push(res_2);
+            }
+            Ok((Err(_), Ok(_)) | (_, Err(_))) => {
+                Err(ApiError::InternalServerError(anyhow!(
+                    "task was cancelled while reconciling timeline creation"
+                )))?;
+                unreachable!()
+            }
+            Err(_) => {
+                Err(ApiError::InternalServerError(anyhow!(
+                    "couldn't reconcile timeline creation on safekeepers within timeout"
+                )))?;
+                unreachable!()
+            }
+        }
+        let timeout_or_last =
+            tokio::time::timeout_at(reconcile_deadline, joinset.join_next().map(Option::unwrap))
+                .await;
+        if let Ok(Ok(res)) = timeout_or_last {
+            reconcile_results.push(res);
+        } else {
+            // No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
+            // TODO: maybe log?
+        }
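+        // With three safekeepers, two successful responses form a quorum;
+        // the third creation is best-effort.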
+        // check now if quorum was reached in reconcile_results
+        let successful = reconcile_results
+            .into_iter()
+            .filter_map(|res| res.ok())
+            .collect::<Vec<_>>();
+        tracing::info!(
+            "Got {} successful results from reconciliation",
+            successful.len()
+        );
+        if successful.len() < 2 {
+            // Failure
+        } else if successful.len() == 3 {
+            // Success, state of timeline is
+        } else if successful.len() == 2 {
+        } else {
+            unreachable!(
+                "unexpected amount of successful reconciliations {}",
+                successful.len()
+            );
+        }
+        // TODO
+        // TODO update database state from "creating" to "created" or something
+
+        // notify cplane about creation
+        // TODO (this should probably be in a function so that the reconciler can use it too)
+        Ok((0, sks))
+    }
+
+    pub(crate) async fn tenant_timeline_create(
+        &self,
+        tenant_id: TenantId,
+        create_req: TimelineCreateRequest,
+    ) -> Result<TimelineCreateResponseStorcon, ApiError> {
+        let safekeepers = create_req.safekeepers.unwrap_or_default();
+        tracing::info!(
+            %safekeepers,
+            "Creating timeline {}/{}",
+            tenant_id,
+            create_req.new_timeline_id,
+        );
+
+        let _tenant_lock = trace_shared_lock(
+            &self.tenant_op_locks,
+            tenant_id,
+            TenantOperations::TimelineCreate,
+        )
+        .await;
+        failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
+        let create_mode = create_req.mode.clone();
+
+        let timeline_info = self
+            .tenant_timeline_create_pageservers(tenant_id, create_req)
+            .await?;
+
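+        // Safekeeper creation is opt-in: it runs only when the request set the
+        // `safekeepers` flag; otherwise the timeline exists on pageservers alone.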
+        let (safekeepers_generation, safekeepers) = if safekeepers {
+            let res = self
+                .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
+                .await?;
+            (Some(res.0), Some(res.1))
+        } else {
+            (None, None)
+        };
 
         Ok(TimelineCreateResponseStorcon {
             timeline_info,
             safekeepers_generation,