From afa35eea871098c4622e97616e2a74cc8c1a1db4 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 30 May 2025 20:36:50 +0200 Subject: [PATCH] trigger now only does insertions; app loop will do cleanup; prepare API for cleanup --- libs/safekeeper_api/src/models.rs | 10 +- safekeeper/client/src/mgmt_api.rs | 8 +- .../down.sql | 2 +- .../2025-05-26-105843_sk_ps_discovery/up.sql | 33 +++---- storage_controller/src/persistence.rs | 53 +++++++++- storage_controller/src/safekeeper_client.rs | 12 +-- storage_controller/src/schema.rs | 2 + .../src/service/sk_ps_discovery.rs | 98 ++++++++----------- 8 files changed, 122 insertions(+), 96 deletions(-) diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 9460936084..7d473e8934 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -311,12 +311,14 @@ pub struct PullTimelineResponse { // TODO: add more fields? } -#[derive(Debug, Serialize, Deserialize)] -pub struct TenantShardPageserverAttachments { - pub attachments: Vec, +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "action")] +pub enum TenantShardPageserverAttachmentChange { + Attach(TenantShardPageserverAttachment), + Detach(TenantShardPageserverAttachment), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct TenantShardPageserverAttachment { pub ps_id: NodeId, pub generation: Generation, diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs index 0a52813e60..8068b9bb4d 100644 --- a/safekeeper/client/src/mgmt_api.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -9,7 +9,7 @@ use http_utils::error::HttpErrorBody; use reqwest::{IntoUrl, Method, StatusCode}; use safekeeper_api::models::{ self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, - TenantShardPageserverAttachments, TimelineCreateRequest, TimelineStatus, + TenantShardPageserverAttachmentChange, TimelineCreateRequest, TimelineStatus, }; use utils::id::{NodeId, TenantId, TimelineId}; use utils::logging::SecretString; @@ -190,16 +190,16 @@ impl Client { resp.json().await.map_err(Error::ReceiveBody) } - pub async fn put_tenant_shard_pageserver_attachments( + pub async fn post_tenant_shard_pageserver_attachments( &self, tenant_shard_id: TenantShardId, - attachments: TenantShardPageserverAttachments, + body: TenantShardPageserverAttachmentChange, ) -> Result<()> { let uri = format!( "{}/v1/tenant/{tenant_shard_id}/pageserver_attachments", self.mgmt_api_endpoint ); - let resp = self.put(uri, attachments).await?; + let resp = self.post(uri, body).await?; resp.json().await.map_err(Error::ReceiveBody) } diff --git a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/down.sql b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/down.sql index b4983adc04..c601533f43 100644 --- a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/down.sql +++ b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/down.sql @@ -10,7 +10,7 @@ DROP TRIGGER on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery on "tenant_shards DROP FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn; DROP TRIGGER on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery on "tenant_shards"; DROP FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn; -DROP FUNCTION sk_ps_discovery_enqueue_tenant; +DROP FUNCTION IF EXISTS sk_ps_discovery_enqueue_attachment_create; DROP TABLE "sk_ps_discovery"; diff --git a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql index 6742b063bc..b68891feea 100644 --- a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql +++ b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql @@ -4,34 +4,31 @@ CREATE TABLE "sk_ps_discovery"( "shard_count" INT4 NOT NULL, "ps_generation" INT4 NOT NULL, "sk_id" INT8 NOT NULL REFERENCES "safekeepers"("id") ON DELETE CASCADE, -- more efficient that trigger on "safekeepers" + "intent_state" VARCHAR NOT NULL, -- attached,detached "ps_id" INT8 NOT NULL REFERENCES "nodes"("node_id") ON DELETE CASCADE, -- more efficient that trigger on "nodes" "created_at" TIMESTAMPTZ NOT NULL, "retries" INT4 NOT NULL DEFAULT 0, "last_retry_at" TIMESTAMPTZ, + "acknowledged_at" TIMESTAMPTZ, PRIMARY KEY("tenant_id", "shard_number", "shard_count", "ps_generation", "sk_id") ); -CREATE OR REPLACE FUNCTION sk_ps_discovery_enqueue_tenant(ARG_TENANT_ID VARCHAR) +CREATE OR REPLACE FUNCTION sk_ps_discovery_enqueue_attachment_create(ARG_TENANT_ID VARCHAR) RETURNS VOID AS $$ BEGIN - - DELETE FROM sk_ps_discovery - WHERE tenant_id = ARG_TENANT_ID; - - INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, ps_id,created_at) - WITH sk_timeline_attachments AS ( + INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, intent_state, ps_id, created_at) + WITH intent_attachments AS ( SELECT DISTINCT tenant_id,unnest(array_cat(sk_set, new_sk_set)) as sk_id FROM timelines WHERE tenant_id = ARG_TENANT_ID AND timelines.deleted_at IS NULL ) - SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, NOW() + SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count, + tenant_shards.generation, intent_attachments.sk_id, 'attached', tenant_shards.generation_pageserver, NOW() FROM tenant_shards - INNER JOIN sk_timeline_attachments ON tenant_shards.tenant_id = sk_timeline_attachments.tenant_id; - - -- TODO: how do no-longer-existing attachment infos get cleaned up on the SKs? - -- I think we do need a table of tenant timeline shard attachments, we flag as "deleted" here, and background loop processes it. + INNER JOIN intent_attachments ON tenant_shards.tenant_id = intent_attachments.tenant_id + ON CONFLICT DO NOTHING; -- the first trigger creates the attachment, all others are identical because tenant shard generations are monotonic PERFORM pg_notify('sk_ps_discovery', json_build_object( 'tenant_id', ARG_TENANT_ID @@ -44,7 +41,7 @@ $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn() RETURNS TRIGGER AS $$ BEGIN - PERFORM sk_ps_discovery_enqueue_tenant(NEW.tenant_id); + PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id); RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -58,7 +55,7 @@ EXECUTE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn(); CREATE OR REPLACE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn() RETURNS TRIGGER AS $$ BEGIN - PERFORM sk_ps_discovery_enqueue_tenant(OLD.tenant_id); + PERFORM sk_ps_discovery_enqueue_attachment_create(OLD.tenant_id); RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -72,7 +69,7 @@ EXECUTE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn(); CREATE OR REPLACE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn() RETURNS TRIGGER AS $$ BEGIN - PERFORM sk_ps_discovery_enqueue_tenant(NEW.tenant_id); + PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id); RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -87,7 +84,7 @@ EXECUTE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn(); CREATE OR REPLACE FUNCTION on_timelines_INSERT_enqueue_sk_ps_discovery_triggerfn() RETURNS TRIGGER AS $$ BEGIN - PERFORM sk_ps_discovery_enqueue_tenant(NEW.tenant_id); + PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id); RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -100,7 +97,7 @@ EXECUTE FUNCTION on_timelines_INSERT_enqueue_sk_ps_discovery_triggerfn(); CREATE OR REPLACE FUNCTION on_timelines_DELETE_enqueue_sk_ps_discovery_triggerfn() RETURNS TRIGGER AS $$ BEGIN - PERFORM sk_ps_discovery_enqueue_tenant(OLD.tenant_id); + PERFORM sk_ps_discovery_enqueue_attachment_create(OLD.tenant_id); RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -113,7 +110,7 @@ EXECUTE FUNCTION on_timelines_DELETE_enqueue_sk_ps_discovery_triggerfn(); CREATE OR REPLACE FUNCTION on_timelines_UPDATE_enqueue_sk_ps_discovery_triggerfn() RETURNS TRIGGER AS $$ BEGIN - PERFORM sk_ps_discovery_enqueue_tenant(NEW.tenant_id); + PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id); RETURN NEW; END; $$ LANGUAGE plpgsql; diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index f0c7a7b65e..037818c4e0 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1905,12 +1905,14 @@ impl Persistence { pub(crate) async fn update_sk_ps_discovery_attempt( &self, pk: SkPsDiscoveryPersistencePk, + intent_state: String, update: Result<(), ()>, ) -> DatabaseResult<()> { use crate::schema::sk_ps_discovery::dsl; self.with_measured_conn(DatabaseOperation::UpdateSkPsDiscoveryAttempt, move |conn| { let pk = pk.clone(); + let intent_state = intent_state.clone(); Box::pin(async move { match update { Ok(()) => { @@ -1921,14 +1923,23 @@ impl Persistence { ps_generation, sk_id, } = pk; - diesel::delete(dsl::sk_ps_discovery) + let nrows = diesel::delete(dsl::sk_ps_discovery) + // primary key .filter(dsl::tenant_id.eq(tenant_id)) .filter(dsl::shard_number.eq(shard_number)) .filter(dsl::shard_count.eq(shard_count)) .filter(dsl::ps_generation.eq(ps_generation)) .filter(dsl::sk_id.eq(sk_id)) - .execute(conn) // TODO: check update count? + // intent_state could have changed beneath us (split brain or concurrent state gc) + // TODO: this could also just be a globally monotonic sequence number, maybe easier to reason about? + .filter(dsl::intent_state.eq(intent_state)) + .execute(conn) .await?; + if nrows != 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of deletes: {nrows}" + ))); + } } Err(_) => { let SkPsDiscoveryPersistencePk { @@ -1939,18 +1950,28 @@ impl Persistence { sk_id, } = pk; - diesel::update(dsl::sk_ps_discovery) + let nrows = diesel::update(dsl::sk_ps_discovery) + // primary key .filter(dsl::tenant_id.eq(tenant_id)) .filter(dsl::shard_number.eq(shard_number)) .filter(dsl::shard_count.eq(shard_count)) .filter(dsl::ps_generation.eq(ps_generation)) .filter(dsl::sk_id.eq(sk_id)) + // intent_state could have changed beneath us (split brain or concurrent state gc) + // TODO: this could also just be a globally monotonic sequence number, maybe easier to reason about? + .filter(dsl::intent_state.eq(intent_state)) + // action: .set(( dsl::retries.eq(dsl::retries.add(1)), // XXX: in split-brain situation we would bump twice... dsl::last_retry_at.eq(diesel::dsl::now), )) .execute(conn) // TODO: check update count? .await?; + if nrows != 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of updates: {nrows}" + ))); + } } } Ok(()) @@ -2543,7 +2564,7 @@ pub(crate) struct TimelineImportPersistence { pub(crate) shard_statuses: serde_json::Value, } -#[derive(Insertable, AsChangeset, Selectable, Clone)] +#[derive(Insertable, AsChangeset, Selectable, Clone, PartialEq, Eq, Hash, Debug)] #[diesel(table_name = crate::schema::sk_ps_discovery)] pub(crate) struct SkPsDiscoveryPersistencePk { pub(crate) tenant_id: String, @@ -2561,10 +2582,12 @@ pub(crate) struct SkPsDiscoveryPersistence { pub(crate) shard_count: i32, pub(crate) ps_generation: i32, pub(crate) sk_id: i64, + pub(crate) intent_state: String, pub(crate) ps_id: i64, pub(crate) created_at: chrono::DateTime, pub(crate) retries: i32, pub(crate) last_retry_at: Option>, + pub(crate) acknowledged_at: Option>, } impl SkPsDiscoveryPersistence { @@ -2575,4 +2598,26 @@ impl SkPsDiscoveryPersistence { shard_count: ShardCount::new(self.shard_count as u8), }) } + pub(crate) fn primary_key(&self) -> SkPsDiscoveryPersistencePk { + let SkPsDiscoveryPersistence { + tenant_id, + shard_number, + shard_count, + ps_generation, + sk_id, + intent_state: _, + ps_id: _, + created_at: _, + retries: _, + last_retry_at: _, + acknowledged_at: _, + } = self; + SkPsDiscoveryPersistencePk { + tenant_id: tenant_id.clone(), + shard_number: *shard_number, + shard_count: *shard_count, + ps_generation: *ps_generation, + sk_id: *sk_id, + } + } } diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs index 7129d224e4..a714a96e52 100644 --- a/storage_controller/src/safekeeper_client.rs +++ b/storage_controller/src/safekeeper_client.rs @@ -1,6 +1,6 @@ use safekeeper_api::models::{ self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, - TenantShardPageserverAttachments, TimelineCreateRequest, + TenantShardPageserverAttachmentChange, TimelineCreateRequest, }; use safekeeper_client::mgmt_api::{Client, Result}; use utils::id::{NodeId, TenantId, TimelineId}; @@ -167,17 +167,17 @@ impl SafekeeperClient { ) } - pub async fn put_tenant_shard_pageserver_attachments( + pub async fn post_tenant_shard_pageserver_attachments( &self, tenant_shard_id: TenantShardId, - attachments: TenantShardPageserverAttachments, + body: TenantShardPageserverAttachmentChange, ) -> Result<()> { measured_request!( - "put_tenant_shard_pageserver_attachments", - crate::metrics::Method::Put, + "post_tenant_shard_pageserver_attachments", + crate::metrics::Method::Post, &self.node_id_label, self.inner - .put_tenant_shard_pageserver_attachments(tenant_shard_id, attachments) + .post_tenant_shard_pageserver_attachments(tenant_shard_id, body) .await ) } diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index c828511271..1e1d40ea77 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -67,10 +67,12 @@ diesel::table! { shard_count -> Int4, ps_generation -> Int4, sk_id -> Int8, + intent_state -> Varchar, ps_id -> Int8, created_at -> Timestamptz, retries -> Int4, last_retry_at -> Nullable, + acknowledged_at -> Nullable, } } diff --git a/storage_controller/src/service/sk_ps_discovery.rs b/storage_controller/src/service/sk_ps_discovery.rs index bac8dec07a..3f7f09588b 100644 --- a/storage_controller/src/service/sk_ps_discovery.rs +++ b/storage_controller/src/service/sk_ps_discovery.rs @@ -6,7 +6,9 @@ use std::{ use anyhow::Context; use futures::{StreamExt, stream::FuturesUnordered}; -use safekeeper_api::models::{TenantShardPageserverAttachment, TenantShardPageserverAttachments}; +use safekeeper_api::models::{ + TenantShardPageserverAttachment, TenantShardPageserverAttachmentChange, +}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::{Instrument, Span, error, info, info_span}; @@ -14,12 +16,11 @@ use utils::{ generation::Generation, id::{NodeId, TenantId}, logging::SecretString, - shard::TenantShardId, }; use crate::{ heartbeater::SafekeeperState, - persistence::{Persistence, SkPsDiscoveryPersistence, SkPsDiscoveryPersistencePk}, + persistence::{Persistence, SkPsDiscoveryPersistence}, }; use super::Service; @@ -68,27 +69,11 @@ impl Actor { let mut sync_full_ticker = tokio::time::interval(std::time::Duration::from_secs(5)); - #[derive(PartialEq, Eq, Hash, Debug)] - struct TaskKey { - tenant_shard_id: TenantShardId, - ps_generation: Generation, - sk_id: NodeId, - } struct Task { work: SkPsDiscoveryPersistence, cancel: CancellationToken, join_handle: Option>, } - impl<'a> TryFrom<&'a SkPsDiscoveryPersistence> for TaskKey { - type Error = hex::FromHexError; - fn try_from(value: &'a SkPsDiscoveryPersistence) -> Result { - Ok(Self { - tenant_shard_id: value.tenant_shard_id()?, - ps_generation: Generation::new(value.ps_generation as u32), - sk_id: NodeId(value.sk_id as u64), - }) - } - } let mut tasks = HashMap::new(); loop { @@ -123,19 +108,19 @@ impl Actor { .context("get_all_sk_ps_discovery_work")? .into_iter() .map(|work: SkPsDiscoveryPersistence| { - anyhow::Ok(( - TaskKey::try_from(&work)?, + ( + work.primary_key(), Task { work, cancel: CancellationToken::new(), join_handle: None, }, - )) + ) }) - .collect::, _>>()?; + .collect::>(); // Carry over ongoing tasks - let mut ongoing_wait = FuturesUnordered::new(); + let mut cancelled_wait = FuturesUnordered::new(); for ( task_key, Task { @@ -156,26 +141,19 @@ impl Actor { if *planned_persistence == ongoing_persistence { *planned_jh = join_handle; *planned_cancel = cancel; - } else { - match join_handle { - Some(jh) => { - cancel.cancel(); - ongoing_wait.push(jh); - } - None => (), - } + continue; } } - hash_map::Entry::Vacant(_) => match join_handle { - Some(jh) => { - cancel.cancel(); - ongoing_wait.push(jh); - } - None => (), - }, + hash_map::Entry::Vacant(_) => (), } + cancel.cancel(); + cancelled_wait.push(async move { + if let Some(jh) = join_handle { + let _ = jh.await; + } + }); } - while let Some(_) = ongoing_wait.next().await {} + while let Some(_) = cancelled_wait.next().await {} tasks = new_tasks; // Kick off new tasks @@ -222,13 +200,8 @@ impl DeliveryAttempt { let res = self .persistence .update_sk_ps_discovery_attempt( - SkPsDiscoveryPersistencePk { - tenant_id: self.work.tenant_id, - shard_number: self.work.shard_number, - shard_count: self.work.shard_count, - ps_generation: self.work.ps_generation, - sk_id: self.work.sk_id, - }, + self.work.primary_key(), + self.work.intent_state.clone(), res.map_err(|_| ()), ) .await; @@ -247,20 +220,27 @@ impl DeliveryAttempt { anyhow::bail!("safekeeper is offline"); } } + + let body = { + let val = TenantShardPageserverAttachment { + ps_id: NodeId(self.work.ps_id as u64), + generation: Generation::new(self.work.ps_generation as u32), + }; + match self.work.intent_state.as_str() { + "attached" => TenantShardPageserverAttachmentChange::Attach(val), + "detached" => TenantShardPageserverAttachmentChange::Detach(val), + x => anyhow::bail!("unknown intent state {x:?}"), + } + }; let tenant_shard_id = self.work.tenant_shard_id()?; sk.with_client_retries( - |client| async move { - client - .put_tenant_shard_pageserver_attachments( - tenant_shard_id, - TenantShardPageserverAttachments { - attachments: vec![TenantShardPageserverAttachment { - ps_id: NodeId(self.work.ps_id as u64), - generation: Generation::new(self.work.ps_generation as u32), - }], - }, - ) - .await + |client| { + let body = body.clone(); + async move { + client + .post_tenant_shard_pageserver_attachments(tenant_shard_id, body) + .await + } }, &self.http_client, &self