diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 557b4c8df4..ff1096f265 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -656,6 +656,25 @@ impl Persistence { .get_result(conn) .await?; + let sk_ps_discovery_update_query = diesel::sql_query(r#" + INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, ps_id,created_at, last_attmpt_at) + SELECT ( + WITH sk_timeline_attachments AS ( + SELECT DISTINCT tenant_id,timeline_id,unnest(array_cat(sk_set, new_sk_set)) as sk_id FROM timelines + WHERE tenant_id = $1 + ) + SELECT $1, $2, $3, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, $4, NULL + FROM tenant_shards + INNER JOIN sk_timeline_attachments ON tenant_shards.tenant_id = sk_timeline_attachments.tenant_id + ) + "#); + sk_ps_discovery_update_query.bind::(tenant_shard_id.tenant_id.to_string()) + .bind::(tenant_shard_id.shard_number.0 as i32) + .bind::(tenant_shard_id.shard_count.literal() as i32) + .bind::(chrono::Utc::now()) + .execute(conn) + .await?; + Ok(updated) }) }) @@ -965,6 +984,8 @@ impl Persistence { } } + // TODO: schedule sk_to_ps_discovery + Ok(()) }) }) @@ -1327,6 +1348,8 @@ impl Persistence { .execute(conn) .await?; + // TODO + match inserted_updated { 0 => Ok(false), 1 => Ok(true), @@ -2387,3 +2410,16 @@ pub(crate) struct TimelineImportPersistence { pub(crate) timeline_id: String, pub(crate) shard_statuses: serde_json::Value, } + +#[derive(Insertable, AsChangeset, Clone)] +#[diesel(table_name = crate::schema::sk_ps_discovery)] +pub(crate) struct SkPsDiscoveryPersistence { + pub(crate) tenant_id: String, + pub(crate) shard_number: i32, + pub(crate) shard_count: i32, + pub(crate) ps_generation: i32, + pub(crate) sk_id: i64, + pub(crate) ps_id: i64, + pub(crate) created_at: chrono::DateTime, + pub(crate) last_attempt_at: Option>, +} diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index c0f48c670b..eac6776b01 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -490,6 +490,7 @@ pub struct Service { config: Config, persistence: Arc, compute_hook: Arc, + sk_ps_disovery: Arc, result_tx: tokio::sync::mpsc::UnboundedSender, heartbeater_ps: Heartbeater, @@ -1761,6 +1762,8 @@ impl Service { cancel.clone(), ); + let sk_ps_discovery = sk_ps_discovery::spawn(persistence.clone()); + let initial_leadership_status = if config.start_as_candidate { LeadershipStatus::Candidate } else { @@ -1780,6 +1783,7 @@ impl Service { config: config.clone(), persistence, compute_hook: Arc::new(ComputeHook::new(config.clone())?), + sk_ps_disovery, result_tx, heartbeater_ps, heartbeater_sk,