diff --git a/storage_controller/migrations/2025-01-15-000118_safekeeper_timelines/up.sql b/storage_controller/migrations/2025-01-15-000118_safekeeper_timelines/up.sql index 18daa5e987..749d727827 100644 --- a/storage_controller/migrations/2025-01-15-000118_safekeeper_timelines/up.sql +++ b/storage_controller/migrations/2025-01-15-000118_safekeeper_timelines/up.sql @@ -8,3 +8,4 @@ CREATE TABLE timelines ( cplane_notified_generation INTEGER NOT NULL, status VARCHAR NOT NULL ); +CREATE INDEX timelines_idx ON timelines(status, tenant_id, timeline_id); \ No newline at end of file diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 31ef00f32b..77a04340a1 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1073,13 +1073,17 @@ impl Persistence { az_id: String, #[diesel(sql_type = diesel::sql_types::Int8)] timeline_count: i64, + #[diesel(sql_type = diesel::sql_types::Varchar)] + host: String, + #[diesel(sql_type = diesel::sql_types::Int8)] + port: i64, } let safekeepers: Vec = self .with_measured_conn( DatabaseOperation::ListSafekeepers, move |conn| -> DatabaseResult<_> { let query = diesel::sql_query("\ - SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count \ + SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count, safekeepers.host as host, safekeepers.port as port \ FROM (select tenant_id, timeline_id, unnest(sk_set) as sk_id from timelines) as timelines_unnested \ JOIN safekeepers ON (safekeepers.id = timelines_unnested.id)\ "); @@ -1458,3 +1462,36 @@ pub(crate) struct TimelinePersistence { pub(crate) cplane_notified_generation: i32, pub(crate) status: String, } + +#[derive(PartialEq, Eq, Copy, Clone, Debug)] +pub(crate) enum TimelineStatus { + Creating, + Created, + Deleting, + Deleted, +} + +impl FromStr for TimelineStatus { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result { + Ok(match s { + "creating" => TimelineStatus::Creating, + "created" => TimelineStatus::Created, + "deleting" => TimelineStatus::Deleting, + "deleted" => TimelineStatus::Deleted, + _ => return Err(anyhow::anyhow!("unexpected timeline status: {s}")), + }) + } +} + +impl From for String { + fn from(value: TimelineStatus) -> Self { + match value { + TimelineStatus::Creating => "creating", + TimelineStatus::Created => "created", + TimelineStatus::Deleting => "deleting", + TimelineStatus::Deleted => "deleted", + } + .to_string() + } +} diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs index b48fe46e19..2ade4c1f33 100644 --- a/storage_controller/src/safekeeper_client.rs +++ b/storage_controller/src/safekeeper_client.rs @@ -63,7 +63,10 @@ impl SafekeeperClient { } } - pub(crate) async fn create_timeline(&self, req: TimelineCreateRequest) -> Result { + pub(crate) async fn create_timeline( + &self, + req: &TimelineCreateRequest, + ) -> Result { measured_request!( "create_timeline", crate::metrics::Method::Post, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index f0184e576f..778c5d925b 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -2,6 +2,7 @@ pub mod chaos_injector; mod context_iterator; use hyper::Uri; +use safekeeper_api::membership::{MemberSet, SafekeeperId}; use std::{ borrow::Cow, cmp::Ordering, @@ -26,7 +27,7 @@ use crate::{ peer_client::GlobalObservedState, persistence::{ AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence, - ShardGenerationState, TenantFilter, TimelinePersistence, + ShardGenerationState, TenantFilter, TimelinePersistence, TimelineStatus, }, reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder}, safekeeper_client::SafekeeperClient, @@ -85,7 +86,6 @@ use utils::{ http::error::ApiError, id::{NodeId, TenantId, TimelineId}, logging::SecretString, - lsn::Lsn, pausable_failpoint, sync::gate::Gate, }; @@ -3292,9 +3292,10 @@ impl Service { ) .await; failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock"); - let timeline_id = create_req.new_timeline_id.clone(); + let timeline_id = create_req.new_timeline_id; + let create_mode = create_req.mode.clone(); - let on_ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move { + let timeline_info = self.tenant_remote_mutation(tenant_id, move |mut targets| async move { if targets.0.is_empty() { return Err(ApiError::NotFound( anyhow::anyhow!("Tenant not found").into(), @@ -3410,9 +3411,26 @@ impl Service { } Ok(timeline_info) - }); + }).await??; let on_sk_fut = async { if safekeepers { + let pg_version = timeline_info.pg_version; + let start_lsn = match create_mode { + models::TimelineCreateRequestMode::Bootstrap { .. } => { + timeline_info.last_record_lsn + } + models::TimelineCreateRequestMode::Branch { .. } => { + timeline_info.last_record_lsn + } + models::TimelineCreateRequestMode::ImportPgdata { .. } => { + // Can't do return Err because of async block, must do ? plus unreachable!() + Err(ApiError::InternalServerError(anyhow!( + "import pgdata doesn't specify the start lsn, aborting creation on safekeepers" + )))?; + unreachable!() + } + }; + // Choose initial set of safekeepers respecting affinity let sks = self.safekeepers_for_new_timeline().await?; let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::>(); @@ -3424,24 +3442,11 @@ impl Service { sk_set: sks_persistence.clone(), new_sk_set: sks_persistence.clone(), cplane_notified_generation: 0, - status: "creating".to_string(), + status: String::from(TimelineStatus::Creating), }; self.persistence.insert_timeline(timeline_persist).await?; // reconcile: create timeline on safekeepers // If quorum is reached, return if we are outside of a specified timeout - let pg_version = 0; // TODO: pg_version - let start_lsn = Lsn::INVALID; // TODO: start_lsn - - let req = safekeeper_api::models::TimelineCreateRequest { - commit_lsn: None, - mconf: safekeeper_api::membership::Configuration::empty(), - pg_version, - start_lsn, - system_id: None, - tenant_id: tenant_id.clone(), - timeline_id: timeline_id.clone(), - wal_seg_size: None, - }; let sk_persistences = self .persistence .list_safekeepers() @@ -3451,6 +3456,8 @@ impl Service { .collect::>(); let jwt = self.config.jwt_token.clone().map(SecretString::from); let mut joinset = JoinSet::new(); + + let mut members = Vec::new(); for sk in sks.iter() { let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else { // Can't do return Err because of async block, must do ? plus unreachable!() @@ -3459,6 +3466,28 @@ impl Service { )))?; unreachable!() }; + members.push(SafekeeperId { + id: NodeId(sk_p.id as u64), + host: sk_p.host.clone(), + pg_port: sk_p.port as u16, + }); + } + let mut mconf = safekeeper_api::membership::Configuration::empty(); + mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?; + + let req = safekeeper_api::models::TimelineCreateRequest { + commit_lsn: None, + mconf, + pg_version, + start_lsn, + system_id: None, + tenant_id, + timeline_id, + wal_seg_size: None, + }; + for sk in sks.iter() { + // Unwrap is fine as we already would have returned error above + let sk_p = sk_persistences.get(&(sk.0 as i64)).unwrap(); let sk_clone = *sk; let base_url = sk_p.base_url(); let jwt = jwt.clone(); @@ -3474,11 +3503,17 @@ impl Service { }) }); } + // After we have built the joinset, we now wait for the tasks to complete, + // but with a specified timeout to make sure we return swiftly, either with + // a failure or success. const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT; - // Return within the timeout, success if a quorum was reached, then do backgroud reconciliation. + // Treat the first two tasks to finish differently, mostly when they timeout, + // because then we won't have a successful quorum. + // For the third task, we don't rely on it succeeding, and we need this to support + // continuing operations even if one safekeeper is down. let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async { ( joinset.join_next().await.unwrap(), @@ -3517,6 +3552,25 @@ impl Service { // TODO: maybe log? } // check now if quorum was reached in reconcile_results + let successful = reconcile_results + .into_iter() + .filter_map(|res| res.ok()) + .collect::>(); + tracing::info!( + "Got {} successful results from reconciliation", + successful.len() + ); + if successful.len() < 2 { + // Failure + } else if successful.len() == 3 { + // Success, state of timeline is + } else if successful.len() == 2 { + } else { + unreachable!( + "unexpected amount of successful reconciliations {}", + successful.len() + ); + } // TODO // TODO update database state from "creating" to "created" or something @@ -3527,9 +3581,8 @@ impl Service { Ok((None, None)) } }; - let (on_ps_res, on_sk_res) = tokio::join!(on_ps_fut, on_sk_fut); + let on_sk_res = on_sk_fut.await; let (safekeepers_generation, safekeepers) = on_sk_res?; - let timeline_info = on_ps_res??; Ok(TimelineCreateResponseStorcon { timeline_info, safekeepers_generation, @@ -7801,11 +7854,11 @@ impl Service { } } if sks.len() == 3 { - return Ok(sks); + Ok(sks) } else { - return Err(ApiError::InternalServerError(anyhow!( + Err(ApiError::InternalServerError(anyhow!( "couldn't find three safekeepers in different AZs for new timeline" - ))); + ))) } }