storage_controller: split timeline status into `status_kind` and a JSON `status` payload

Renames `TimelineStatus` to `TimelineStatusKind`, adds `TimelineStatusCreating` (the JSON
payload stored in `status` while a timeline is being created), and factors the safekeeper
timeline-creation step out into `tenant_timeline_create_safekeepers_reconcile` so that the
safekeeper reconciler can call it as well.

diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index ca5b1cff36..fbe79cc9b2 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
 use utils::generation::Generation;
 use utils::id::TimelineId;
 use utils::id::{NodeId, TenantId};
+use utils::lsn::Lsn;
 
 use crate::metrics::{
     DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
@@ -1227,7 +1228,7 @@ impl Persistence {
         &self,
         tenant_id: TenantId,
         timeline_id: TimelineId,
-        timeline_status: TimelineStatus,
+        timeline_status: TimelineStatusKind,
     ) -> DatabaseResult<()> {
         use crate::schema::timelines;
 
@@ -1295,8 +1296,9 @@ impl Persistence {
                 Ok(timelines::table
                     .filter(
-                        timelines::status
-                            .eq(String::from(TimelineStatus::Creating))
-                            .or(timelines::status.eq(String::from(TimelineStatus::Deleting))),
+                        timelines::status_kind
+                            .eq(String::from(TimelineStatusKind::Creating))
+                            .or(timelines::status_kind
+                                .eq(String::from(TimelineStatusKind::Deleting))),
                     )
                     .load::<TimelineFromDb>(conn)?)
             },
@@ -1542,7 +1544,7 @@ struct InsertUpdateSafekeeper<'a> {
     scheduling_policy: Option<&'a str>,
 }
 
-#[derive(Insertable, AsChangeset, Queryable, Selectable)]
+#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
 #[diesel(table_name = crate::schema::timelines)]
 pub(crate) struct TimelinePersistence {
     pub(crate) tenant_id: String,
@@ -1551,6 +1553,7 @@ pub(crate) struct TimelinePersistence {
     pub(crate) sk_set: Vec<i64>,
     pub(crate) new_sk_set: Vec<i64>,
     pub(crate) cplane_notified_generation: i32,
+    pub(crate) status_kind: String,
     pub(crate) status: String,
 }
 
@@ -1563,6 +1566,7 @@ pub(crate) struct TimelineFromDb {
     pub(crate) sk_set: Vec<Option<i64>>,
     pub(crate) new_sk_set: Vec<Option<i64>>,
     pub(crate) cplane_notified_generation: i32,
+    pub(crate) status_kind: String,
     pub(crate) status: String,
 }
 
@@ -1575,40 +1579,51 @@ impl TimelineFromDb {
             sk_set: self.sk_set.into_iter().filter_map(|v| v).collect(),
             new_sk_set: self.new_sk_set.into_iter().filter_map(|v| v).collect(),
             cplane_notified_generation: self.cplane_notified_generation,
+            status_kind: self.status_kind,
             status: self.status,
         }
     }
 }
 
+/// Lifecycle state of a timeline, persisted as a string in the `status_kind` column.
 #[derive(PartialEq, Eq, Copy, Clone, Debug)]
-pub(crate) enum TimelineStatus {
+pub(crate) enum TimelineStatusKind {
     Creating,
     Created,
     Deleting,
     Deleted,
 }
 
-impl FromStr for TimelineStatus {
+impl FromStr for TimelineStatusKind {
     type Err = anyhow::Error;
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         Ok(match s {
-            "creating" => TimelineStatus::Creating,
-            "created" => TimelineStatus::Created,
-            "deleting" => TimelineStatus::Deleting,
-            "deleted" => TimelineStatus::Deleted,
+            "creating" => TimelineStatusKind::Creating,
+            "created" => TimelineStatusKind::Created,
+            "deleting" => TimelineStatusKind::Deleting,
+            "deleted" => TimelineStatusKind::Deleted,
             _ => return Err(anyhow::anyhow!("unexpected timeline status: {s}")),
         })
     }
 }
 
-impl From<TimelineStatus> for String {
-    fn from(value: TimelineStatus) -> Self {
+impl From<TimelineStatusKind> for String {
+    fn from(value: TimelineStatusKind) -> Self {
         match value {
-            TimelineStatus::Creating => "creating",
-            TimelineStatus::Created => "created",
-            TimelineStatus::Deleting => "deleting",
-            TimelineStatus::Deleted => "deleted",
+            TimelineStatusKind::Creating => "creating",
+            TimelineStatusKind::Created => "created",
+            TimelineStatusKind::Deleting => "deleting",
+            TimelineStatusKind::Deleted => "deleted",
         }
         .to_string()
     }
 }
+
+/// Parameters needed to (re)issue the timeline creation request to safekeepers.
+///
+/// Serialized to JSON and stored in `status` while `status_kind` is `creating`.
+#[derive(Serialize, Deserialize)]
+pub(crate) struct TimelineStatusCreating {
+    pub(crate) pg_version: u32,
+    pub(crate) start_lsn: Lsn,
+}
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 0afa5ab4d3..83de6a1a71 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -28,7 +28,8 @@ use crate::{
     peer_client::GlobalObservedState,
     persistence::{
         AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
-        ShardGenerationState, TenantFilter, TimelinePersistence, TimelineStatus,
+        SafekeeperPersistence, ShardGenerationState, TenantFilter, TimelinePersistence,
+        TimelineStatusCreating, TimelineStatusKind,
     },
     reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
     safekeeper_client::SafekeeperClient,
@@ -3399,54 +3400,26 @@ impl Service {
         }).await??)
     }
 
-    async fn tenant_timeline_create_safekeepers(
+    /// Creates the timeline on the safekeepers in `sk_set` and persists the new status kind.
+    ///
+    /// Returns an error unless at least two safekeepers report success (quorum).
+    ///
+    /// Assumes the tenant lock is held while calling this function.
+    async fn tenant_timeline_create_safekeepers_reconcile(
         &self,
         tenant_id: TenantId,
-        timeline_info: &TimelineInfo,
-        create_mode: models::TimelineCreateRequestMode,
-    ) -> Result<(u32, Vec<NodeId>), ApiError> {
-        let timeline_id = timeline_info.timeline_id;
-        let pg_version = timeline_info.pg_version;
-        let start_lsn = match create_mode {
-            models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
-            models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
-            models::TimelineCreateRequestMode::ImportPgdata { .. } => {
-                // Can't do return Err because of async block, must do ? plus unreachable!()
-                return Err(ApiError::InternalServerError(anyhow!(
-                    "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
-                )))?;
-            }
-        };
-
-        // Choose initial set of safekeepers respecting affinity
-        let sks = self.safekeepers_for_new_timeline().await?;
-        let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
-        // Add timeline to db
-        let timeline_persist = TimelinePersistence {
-            tenant_id: tenant_id.to_string(),
-            timeline_id: timeline_id.to_string(),
-            generation: 0,
-            sk_set: sks_persistence.clone(),
-            new_sk_set: sks_persistence.clone(),
-            cplane_notified_generation: 0,
-            status: String::from(TimelineStatus::Creating),
-        };
-        self.persistence.insert_timeline(timeline_persist).await?;
-        // reconcile: create timeline on safekeepers
+        timeline_id: TimelineId,
+        timeline_persistence: &TimelinePersistence,
+        status_creating: &TimelineStatusCreating,
+        sk_persistences: &HashMap<i64, SafekeeperPersistence>,
+    ) -> Result<(), ApiError> {
         // If quorum is reached, return if we are outside of a specified timeout
-        let sk_persistences = self
-            .persistence
-            .list_safekeepers()
-            .await?
-            .into_iter()
-            .map(|p| (p.id, p))
-            .collect::<HashMap<_, _>>();
         let jwt = self.config.jwt_token.clone().map(SecretString::from);
         let mut joinset = JoinSet::new();
 
         let mut members = Vec::new();
-        for sk in sks.iter() {
-            let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else {
+        for sk in timeline_persistence.sk_set.iter() {
+            let Some(sk_p) = sk_persistences.get(sk) else {
                 return Err(ApiError::InternalServerError(anyhow!(
                     "couldn't find persisted entry for safekeeper with id {sk}"
                 )))?;
@@ -3463,17 +3436,17 @@ impl Service {
         let req = safekeeper_api::models::TimelineCreateRequest {
             commit_lsn: None,
             mconf,
-            pg_version,
-            start_lsn,
+            pg_version: status_creating.pg_version,
+            start_lsn: status_creating.start_lsn,
             system_id: None,
             tenant_id,
             timeline_id,
             wal_seg_size: None,
         };
-        for sk in sks.iter() {
+        for sk in timeline_persistence.sk_set.iter() {
             // Unwrap is fine as we already would have returned error above
-            let sk_p = sk_persistences.get(&(sk.0 as i64)).unwrap();
-            let sk_clone = *sk;
+            let sk_p = sk_persistences.get(sk).unwrap();
+            let sk_clone = NodeId(*sk as u64);
             let base_url = sk_p.base_url();
             let jwt = jwt.clone();
             let req = req.clone();
@@ -3556,7 +3529,7 @@ impl Service {
             "Got {} successful results from reconciliation",
             successful.len()
         );
-        if successful.len() < 2 {
+        let status_kind = if successful.len() < 2 {
             // Failure
             return Err(ApiError::InternalServerError(anyhow!(
                 "not enough successful reconciliations to reach quorum, please retry: {}",
@@ -3564,20 +3537,84 @@ impl Service {
             )));
         } else if successful.len() == 3 {
             // Success, state of timeline is Created
-            self.persistence
-                .update_timeline(tenant_id, timeline_id, TimelineStatus::Created)
-                .await?;
+            TimelineStatusKind::Created
         } else if successful.len() == 2 {
             // Success, state of timeline remains Creating
+            TimelineStatusKind::Creating
         } else {
             unreachable!(
                 "unexpected number of successful reconciliations {}",
                 successful.len()
             );
-        }
+        };
 
         // notify cplane about creation
-        // TODO (this should probably be in a function so that the reconciler can use it too)
+        // TODO
+
+        self.persistence
+            .update_timeline(tenant_id, timeline_id, status_kind)
+            .await?;
+        Ok(())
+    }
+
+    /// Picks safekeepers for a new timeline, persists it as `Creating`, then runs the
+    /// safekeeper creation reconcile step once.
+    async fn tenant_timeline_create_safekeepers(
+        &self,
+        tenant_id: TenantId,
+        timeline_info: &TimelineInfo,
+        create_mode: models::TimelineCreateRequestMode,
+    ) -> Result<(u32, Vec<NodeId>), ApiError> {
+        let timeline_id = timeline_info.timeline_id;
+        let pg_version = timeline_info.pg_version;
+        let start_lsn = match create_mode {
+            models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
+            models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
+            models::TimelineCreateRequestMode::ImportPgdata { .. } => {
+                // Can't do return Err because of async block, must do ? plus unreachable!()
+                return Err(ApiError::InternalServerError(anyhow!(
+                    "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
+                )))?;
+            }
+        };
+
+        // Choose initial set of safekeepers respecting affinity
+        let sks = self.safekeepers_for_new_timeline().await?;
+        let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
+        let status_creating = TimelineStatusCreating {
+            pg_version,
+            start_lsn,
+        };
+        let status = serde_json::to_string(&status_creating).unwrap();
+        // Add timeline to db
+        let timeline_persist = TimelinePersistence {
+            tenant_id: tenant_id.to_string(),
+            timeline_id: timeline_id.to_string(),
+            generation: 0,
+            sk_set: sks_persistence.clone(),
+            new_sk_set: sks_persistence.clone(),
+            cplane_notified_generation: 0,
+            status_kind: String::from(TimelineStatusKind::Creating),
+            status,
+        };
+        self.persistence
+            .insert_timeline(timeline_persist.clone())
+            .await?;
+        let sk_persistences = self
+            .persistence
+            .list_safekeepers()
+            .await?
+            .into_iter()
+            .map(|p| (p.id, p))
+            .collect::<HashMap<_, _>>();
+        self.tenant_timeline_create_safekeepers_reconcile(
+            tenant_id,
+            timeline_id,
+            &timeline_persist,
+            &status_creating,
+            &sk_persistences,
+        )
+        .await?;
         Ok((0, sks))
     }
 
diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs
index b5e9008c2a..74baea31c1 100644
--- a/storage_controller/src/service/safekeeper_reconciler.rs
+++ b/storage_controller/src/service/safekeeper_reconciler.rs
@@ -1,4 +1,4 @@
-use std::{str::FromStr, sync::Arc, time::Duration};
+use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
 
 use tokio_util::sync::CancellationToken;
 use tracing::Instrument;
@@ -9,7 +9,8 @@ use utils::{
 
 use crate::{
     id_lock_map::trace_shared_lock,
-    service::{TenantOperations, TimelineStatus},
+    persistence::SafekeeperPersistence,
+    service::{TenantOperations, TimelineStatusCreating, TimelineStatusKind},
 };
 
 use super::{Service, TimelinePersistence};
@@ -43,12 +44,25 @@ impl SafekeeperReconciler {
             .persistence
             .timelines_to_be_reconciled()
             .await?;
+        if work_list.is_empty() {
+            return Ok(());
+        }
+        let sk_persistences = self
+            .service
+            .persistence
+            .list_safekeepers()
+            .await?
+            .into_iter()
+            .map(|p| (p.id, p))
+            .collect::<HashMap<_, _>>();
         for tl in work_list {
-            let reconcile_fut = self.reconcile_timeline(&tl).instrument(tracing::info_span!(
-                "safekeeper_reconcile_timeline",
-                timeline_id = tl.timeline_id,
-                tenant_id = tl.tenant_id
-            ));
+            let reconcile_fut =
+                self.reconcile_timeline(&tl, &sk_persistences)
+                    .instrument(tracing::info_span!(
+                        "safekeeper_reconcile_timeline",
+                        timeline_id = tl.timeline_id,
+                        tenant_id = tl.tenant_id
+                    ));
 
             tokio::select! {
                 r = reconcile_fut => r?,
@@ -57,7 +71,12 @@ impl SafekeeperReconciler {
         }
         Ok(())
     }
-    async fn reconcile_timeline(&self, tl: &TimelinePersistence) -> Result<(), anyhow::Error> {
+    /// Reconciles one timeline against the safekeepers according to its persisted status kind.
+    async fn reconcile_timeline(
+        &self,
+        tl: &TimelinePersistence,
+        sk_persistences: &HashMap<i64, SafekeeperPersistence>,
+    ) -> Result<(), anyhow::Error> {
         tracing::info!(
             "Reconciling timeline on safekeepers {}/{}",
             tl.tenant_id,
@@ -80,15 +99,25 @@ impl SafekeeperReconciler {
             .persistence
             .get_timeline(tenant_id, timeline_id)
             .await?;
-        let status = TimelineStatus::from_str(&tl.status)?;
+        let status = TimelineStatusKind::from_str(&tl.status_kind)?;
         match status {
-            TimelineStatus::Created | TimelineStatus::Deleted => return Ok(()),
-            TimelineStatus::Creating => {
-                todo!()
+            TimelineStatusKind::Created | TimelineStatusKind::Deleted => (),
+            TimelineStatusKind::Creating => {
+                let status_creating: TimelineStatusCreating = serde_json::from_str(&tl.status)?;
+                self.service
+                    .tenant_timeline_create_safekeepers_reconcile(
+                        tenant_id,
+                        timeline_id,
+                        &tl,
+                        &status_creating,
+                        sk_persistences,
+                    )
+                    .await?;
             }
-            TimelineStatus::Deleting => {
+            TimelineStatusKind::Deleting => {
                 todo!()
             }
         }
+        Ok(())
     }
 }