diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index c73debae4c..6a7cd40bf8 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -1104,6 +1104,7 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
                     existing_initdb_timeline_id: None,
                     pg_version: Some(args.pg_version),
                 },
+                safekeepers: None,
             },
         )
         .await?;
@@ -1164,6 +1165,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
                     existing_initdb_timeline_id: None,
                     pg_version: Some(args.pg_version),
                 },
+                safekeepers: None,
             };
             let timeline_info = storage_controller
                 .tenant_timeline_create(tenant_id, create_req)
@@ -1222,6 +1224,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
                     ancestor_start_lsn: start_lsn,
                     pg_version: None,
                 },
+                safekeepers: None,
             };
             let timeline_info = storage_controller
                 .tenant_timeline_create(tenant_id, create_req)
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index fd4879087f..db601f6b5f 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -280,6 +280,18 @@ pub struct TimelineCreateRequest {
     pub new_timeline_id: TimelineId,
     #[serde(flatten)]
     pub mode: TimelineCreateRequestMode,
+    /// Whether to also create the timeline on the safekeepers
+    pub safekeepers: Option<bool>,
+}
+
+/// Storage controller specific extensions to [`TimelineInfo`].
+#[derive(Serialize, Deserialize, Clone)]
+pub struct TimelineCreateResponseStorcon {
+    #[serde(flatten)]
+    pub timeline_info: TimelineInfo,
+
+    pub safekeepers: Option<Vec<NodeId>>,
+    pub safekeepers_generation: Option<u32>,
 }
 
 #[derive(Serialize, Deserialize, Clone)]
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index 37bfaf1139..d4e980a581 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -108,6 +108,7 @@ pub(crate) enum DatabaseOperation {
     ListMetadataHealthUnhealthy,
     ListMetadataHealthOutdated,
     ListSafekeepers,
+    InsertTimeline,
     GetLeader,
     UpdateLeader,
     SetPreferredAzs,
@@ -1049,7 +1050,7 @@ impl Persistence {
     pub(crate) async fn list_safekeepers(&self) -> DatabaseResult<Vec<SafekeeperPersistence>> {
         let safekeepers: Vec<SafekeeperPersistence> = self
             .with_measured_conn(
-                DatabaseOperation::ListNodes,
+                DatabaseOperation::ListSafekeepers,
                 move |conn| -> DatabaseResult<_> {
                     Ok(crate::schema::safekeepers::table.load::<SafekeeperPersistence>(conn)?)
                 },
@@ -1061,6 +1062,60 @@ impl Persistence {
         Ok(safekeepers)
     }
 
+    pub(crate) async fn list_safekeepers_with_timeline_count(
+        &self,
+    ) -> DatabaseResult<Vec<(NodeId, String, u64)>> {
+        #[derive(QueryableByName, PartialEq, Debug)]
+        struct SafekeeperTimelineCountResponse {
+            #[diesel(sql_type = diesel::sql_types::Int8)]
+            sk_id: i64,
+            #[diesel(sql_type = diesel::sql_types::Varchar)]
+            az_id: String,
+            #[diesel(sql_type = diesel::sql_types::Int8)]
+            timeline_count: i64,
+        }
+        let safekeepers: Vec<SafekeeperTimelineCountResponse> = self
+            .with_measured_conn(
+                DatabaseOperation::ListSafekeepers,
+                move |conn| -> DatabaseResult<_> {
+                    let query = diesel::sql_query("\
+                        SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count \
+                        FROM (select tenant_id, timeline_id, unnest(sk_set) as sk_id from timelines) as timelines_unnested \
+                        JOIN safekeepers ON (safekeepers.id = timelines_unnested.sk_id) GROUP BY safekeepers.id\
+                    ");
+                    let results: Vec<_> = query.load(conn)?;
+                    Ok(results)
+                },
+            )
+            .await?;
+
+        let safekeepers = safekeepers
+            .into_iter()
+            .map(|sk| {
+                if sk.sk_id < 0 {
+                    return Err(DatabaseError::Logical(format!(
+                        "invalid safekeeper id: {}",
+                        sk.sk_id
+                    )));
+                }
+                if sk.timeline_count < 0 {
+                    return Err(DatabaseError::Logical(format!(
+                        "invalid timeline count {} for sk: {}",
+                        sk.timeline_count, sk.sk_id
+                    )));
+                }
+                Ok((NodeId(sk.sk_id as u64), sk.az_id, sk.timeline_count as u64))
+            })
+            .collect::<Result<Vec<_>, DatabaseError>>()?;
+
+        tracing::info!(
+            "list_safekeepers_with_timeline_count: loaded {} safekeepers",
+            safekeepers.len()
+        );
+
+        Ok(safekeepers)
+    }
+
     pub(crate) async fn safekeeper_get(
         &self,
         id: i64,
@@ -1135,6 +1190,32 @@ impl Persistence {
             })
             .await
     }
+
+    /// Timelines must be persisted before we schedule them for the first time.
+    pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<()> {
+        use crate::schema::timelines;
+
+        self.with_measured_conn(
+            DatabaseOperation::InsertTimeline,
+            move |conn| -> DatabaseResult<()> {
+                let inserted_updated = diesel::insert_into(timelines::table)
+                    .values(&entry)
+                    .on_conflict((timelines::tenant_id, timelines::timeline_id))
+                    .do_nothing()
+                    .execute(conn)?;
+
+                if inserted_updated != 1 {
+                    return Err(DatabaseError::Logical(format!(
+                        "unexpected number of rows ({})",
+                        inserted_updated
+                    )));
+                }
+
+                Ok(())
+            },
+        )
+        .await
+    }
 }
 
 /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -1362,3 +1443,15 @@ struct InsertUpdateSafekeeper<'a> {
     availability_zone_id: &'a str,
     scheduling_policy: Option<&'a str>,
 }
+
+#[derive(Insertable, AsChangeset)]
+#[diesel(table_name = crate::schema::timelines)]
+pub(crate) struct TimelinePersistence {
+    pub(crate) tenant_id: String,
+    pub(crate) timeline_id: String,
+    pub(crate) generation: i32,
+    pub(crate) sk_set: Vec<i64>,
+    pub(crate) new_sk_set: Vec<i64>,
+    pub(crate) cplane_notified_generation: i32,
+    pub(crate) status: String,
+}
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 9ac9ee17ca..241229aade 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -26,7 +26,7 @@ use crate::{
     peer_client::GlobalObservedState,
     persistence::{
         AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
-        ShardGenerationState, TenantFilter,
+        ShardGenerationState, TenantFilter, TimelinePersistence,
     },
     reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
     scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
@@ -35,7 +35,7 @@ use crate::{
         ScheduleOptimization, ScheduleOptimizationAction,
     },
 };
-use anyhow::Context;
+use anyhow::{anyhow, Context};
 use control_plane::storage_controller::{
     AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
 };
@@ -54,7 +54,7 @@ use pageserver_api::{
     },
     models::{
         SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
-        TimelineArchivalConfigRequest, TopTenantShardsRequest,
+        TimelineArchivalConfigRequest, TimelineCreateResponseStorcon, TopTenantShardsRequest,
     },
 };
 use reqwest::StatusCode;
@@ -3273,8 +3273,10 @@ impl Service {
         &self,
         tenant_id: TenantId,
         mut create_req: TimelineCreateRequest,
-    ) -> Result<TimelineInfo, ApiError> {
+    ) -> Result<TimelineCreateResponseStorcon, ApiError> {
+        let safekeepers = create_req.safekeepers.unwrap_or_default();
         tracing::info!(
+            %safekeepers,
             "Creating timeline {}/{}",
             tenant_id,
             create_req.new_timeline_id,
@@ -3287,8 +3289,9 @@ impl Service {
         )
         .await;
         failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
+        let timeline_id = create_req.new_timeline_id.clone();
 
-        self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
+        let on_ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
             if targets.0.is_empty() {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant not found").into(),
@@ -3404,8 +3407,38 @@ impl Service {
             }
 
             Ok(timeline_info)
+        });
+        let on_sk_fut = async {
+            if safekeepers {
+                // Choose initial set of safekeepers respecting affinity
+                let sks = self.safekeepers_for_new_timeline().await?;
+                let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
+                // Add timeline to db
+                let timeline_persist = TimelinePersistence {
+                    tenant_id: tenant_id.to_string(),
+                    timeline_id: timeline_id.to_string(),
+                    generation: 0,
+                    sk_set: sks_persistence.clone(),
+                    new_sk_set: sks_persistence.clone(),
+                    cplane_notified_generation: 0,
+                    status: "creating".to_string(),
+                };
+                self.persistence.insert_timeline(timeline_persist).await?;
+                // TODO: reconcile: create timeline on safekeepers, return success if quorum is met after timeout (or all sks return before timeout)
+                // TODO: call /notify-safekeepers on cplane
+                Ok::<_, ApiError>((Some(0), Some(sks)))
+            } else {
+                Ok((None, None))
+            }
+        };
+        let (on_ps_res, on_sk_res) = tokio::join!(on_ps_fut, on_sk_fut);
+        let (safekeepers_generation, safekeepers) = on_sk_res?;
+        let timeline_info = on_ps_res??;
+        Ok(TimelineCreateResponseStorcon {
+            timeline_info,
+            safekeepers_generation,
+            safekeepers,
         })
-        .await?
     }
 
     pub(crate) async fn tenant_timeline_archival_config(
@@ -7653,6 +7686,33 @@ impl Service {
         global_observed
     }
 
+    pub(crate) async fn safekeepers_for_new_timeline(&self) -> Result<Vec<NodeId>, ApiError> {
+        let mut all_safekeepers = self
+            .persistence
+            .list_safekeepers_with_timeline_count()
+            .await?;
+        all_safekeepers.sort_by_key(|sk| sk.2);
+        let mut sks = Vec::new();
+        let mut azs = HashSet::new();
+        // TODO: assign to safekeepers with smallest number of timelines
+        for (sk_id, az_id, _timeline_count) in all_safekeepers.iter() {
+            if !azs.insert(az_id) {
+                continue;
+            }
+            sks.push(*sk_id);
+            if sks.len() == 3 {
+                break;
+            }
+        }
+        if sks.len() == 3 {
+            Ok(sks)
+        } else {
+            Err(ApiError::InternalServerError(anyhow!(
+                "couldn't find three safekeepers in different AZs for new timeline"
+            )))
+        }
+    }
+
     pub(crate) async fn safekeepers_list(
         &self,
     ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
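
For reference, the AZ-aware selection that safekeepers_for_new_timeline performs can be sketched as a small standalone function. This is an illustrative sketch only, not part of the patch: it assumes the (NodeId, String, u64) tuples produced by list_safekeepers_with_timeline_count, and it stubs NodeId as a plain u64 newtype (a hypothetical stand-in for the controller's NodeId type) so the snippet compiles on its own.

use std::collections::HashSet;

// Hypothetical stand-in for the controller's NodeId, used only to keep this sketch self-contained.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct NodeId(u64);

/// Pick three safekeepers in distinct availability zones, preferring the least-loaded ones,
/// mirroring the selection logic added in `safekeepers_for_new_timeline`.
fn pick_safekeepers(mut all: Vec<(NodeId, String, u64)>) -> Option<Vec<NodeId>> {
    // Least-loaded first, so the first candidate seen per AZ carries the fewest timelines.
    all.sort_by_key(|(_, _, timeline_count)| *timeline_count);

    let mut picked = Vec::new();
    let mut seen_azs = HashSet::new();
    for (sk_id, az_id, _timeline_count) in all {
        // Skip safekeepers in an AZ we have already picked from.
        if !seen_azs.insert(az_id) {
            continue;
        }
        picked.push(sk_id);
        if picked.len() == 3 {
            return Some(picked);
        }
    }
    None
}

fn main() {
    let sks = vec![
        (NodeId(1), "az-a".to_string(), 10),
        (NodeId(2), "az-a".to_string(), 2),
        (NodeId(3), "az-b".to_string(), 5),
        (NodeId(4), "az-c".to_string(), 7),
    ];
    // The least-loaded safekeeper per AZ is chosen: 2 (az-a), 3 (az-b), 4 (az-c).
    assert_eq!(
        pick_safekeepers(sks),
        Some(vec![NodeId(2), NodeId(3), NodeId(4)])
    );
}

Sorting by timeline count before the scan means the first safekeeper encountered in each AZ is also the least loaded one in that AZ, which is why the loop can simply skip AZs it has already used.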