diff --git a/Cargo.lock b/Cargo.lock index d919537818..c3bb7b87b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6690,6 +6690,7 @@ name = "storage_controller" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "bytes", "camino", "chrono", diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index c41e174d9d..b0d0ea0933 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -15,6 +15,7 @@ testing = [] [dependencies] anyhow.workspace = true +async-stream.workspace = true bytes.workspace = true camino.workspace = true chrono.workspace = true diff --git a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/down.sql b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/down.sql index 543fdfe3e2..46c8379fda 100644 --- a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/down.sql +++ b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/down.sql @@ -1,5 +1,9 @@ -DROP TRIGGER on_sk_timeline_update_enqueue_sk_ps_discovery; -DROP TRIGGER on_ps_tenant_shard_change_enqueue_sk_ps_discovery; +DROP TRIGGER on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery on "tenant_shards"; +DROP FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn; +DROP TRIGGER on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery on "tenant_shards"; +DROP FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn; +DROP TRIGGER on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery on "tenant_shards"; +DROP FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn; DROP FUNCTION sk_ps_discovery_enqueue_tenant; DROP TABLE "sk_ps_discovery"; diff --git a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql index 44f14e7c5e..067dba8c11 100644 --- a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql +++ b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql @@ -10,46 +10,68 @@ CREATE TABLE "sk_ps_discovery"( PRIMARY KEY("tenant_id", "shard_number", "shard_count", "ps_generation", "sk_id") ); -CREATE OR REPLACE FUNCTION sk_ps_discovery_enqueue_tenant() -RETURNS TRIGGER AS $$ +CREATE OR REPLACE FUNCTION sk_ps_discovery_enqueue_tenant(ARG_TENANT_ID VARCHAR) +RETURNS VOID AS $$ BEGIN DELETE FROM sk_ps_discovery - WHERE tenant_id = NEW.tenant_id; + WHERE tenant_id = ARG_TENANT_ID; INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, ps_id,created_at,last_attempt_at) - SELECT ( WITH sk_timeline_attachments AS ( SELECT DISTINCT tenant_id,timeline_id,unnest(array_cat(sk_set, new_sk_set)) as sk_id FROM timelines - WHERE tenant_id = NEW.tenant_id + WHERE tenant_id = ARG_TENANT_ID ) - SELECT NEW.tenant_id, NEW.tenant_id, NEW.tenant_id, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, NOW(), NULL + SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, NOW(), NULL FROM tenant_shards - INNER JOIN sk_timeline_attachments ON tenant_shards.tenant_id = sk_timeline_attachments.tenant_id - ); + INNER JOIN sk_timeline_attachments ON tenant_shards.tenant_id = sk_timeline_attachments.tenant_id; - NOTIFY neon_storcon_sk_ps_discovery, json_build_object( - 'tenant_id', NEW.tenant_id - )::text; - - RETURN NEW; + PERFORM pg_notify('sk_ps_discovery', json_build_object( + 'tenant_id', ARG_TENANT_ID + )::text); END; $$ LANGUAGE plpgsql; -CREATE OR REPLACE TRIGGER on_ps_tenant_shard_change_enqueue_sk_ps_discovery -AFTER INSERT OR UPDATE OR DELETE + +CREATE OR REPLACE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn() +RETURNS TRIGGER AS $$ +BEGIN + PERFORM sk_ps_discovery_enqueue_tenant(NEW.tenant_id); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +CREATE OR REPLACE TRIGGER on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery +AFTER INSERT ON "tenant_shards" -WHEN OLD.generation IS DISTINCT FROM NEW.generation -FOR EACH STATEMENT -EXECUTE FUNCTION sk_ps_discovery_enqueue_tenant(); +FOR EACH ROW +EXECUTE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn(); -CREATE OR REPLACE TRIGGER on_sk_timeline_update_enqueue_sk_ps_discovery -AFTER INSERT OR UPDATE OR DELETE -ON "timelines" -WHEN - OLD.sk_set IS DISTINCT FROM NEW.sk_set - OR - OLD.new_sk_set IS DISTINCT FROM NEW.new_sk_set -FOR EACH STATEMENT -EXECUTE FUNCTION sk_ps_discovery_enqueue_tenant(); +CREATE OR REPLACE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn() +RETURNS TRIGGER AS $$ +BEGIN + PERFORM sk_ps_discovery_enqueue_tenant(OLD.tenant_id); + RETURN OLD; +END; +$$ LANGUAGE plpgsql; +CREATE OR REPLACE TRIGGER on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery +AFTER DELETE +ON "tenant_shards" +FOR EACH ROW +EXECUTE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn(); + + +CREATE OR REPLACE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn() +RETURNS TRIGGER AS $$ +BEGIN + PERFORM sk_ps_discovery_enqueue_tenant(NEW.tenant_id); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +CREATE OR REPLACE TRIGGER on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery +AFTER UPDATE +ON "tenant_shards" +FOR EACH ROW +EXECUTE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn(); + +-- TODO: same for `timelines` table diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 2eea2f9d10..f55a3d04de 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -436,7 +436,7 @@ async fn async_main() -> anyhow::Result<()> { }; // Validate that we can connect to the database - Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?; + Persistence::await_connection(secrets.database_url.clone(), args.db_connect_timeout.into()).await?; let persistence = Arc::new(Persistence::new(secrets.database_url).await); diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 557b4c8df4..030c0d0c91 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1,6 +1,7 @@ pub(crate) mod split_state; use std::collections::HashMap; use std::io::Write; +use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -15,8 +16,8 @@ use diesel_async::pooled_connection::bb8::Pool; use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig}; use diesel_async::{AsyncPgConnection, RunQueryDsl}; use diesel_migrations::{EmbeddedMigrations, embed_migrations}; -use futures::FutureExt; use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; use itertools::Itertools; use pageserver_api::controller_api::{ AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy, @@ -31,6 +32,7 @@ use rustls::client::danger::{ServerCertVerified, ServerCertVerifier}; use rustls::crypto::ring; use scoped_futures::ScopedBoxFuture; use serde::{Deserialize, Serialize}; +use tracing::info; use utils::generation::Generation; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; @@ -40,7 +42,6 @@ use crate::metrics::{ DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY, }; use crate::node::Node; -use crate::schema::sk_ps_discovery; use crate::timeline_import::{ TimelineImport, TimelineImportUpdateError, TimelineImportUpdateFollowUp, }; @@ -75,6 +76,8 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline. pub struct Persistence { connection_pool: Pool, + connect_tokio_postgres: + Box BoxFuture<'static, TokioPostgresConnectResult>>, } /// Legacy format, for use in JSON compat objects in test environment @@ -178,10 +181,11 @@ impl Persistence { pub async fn new(database_url: String) -> Self { let mut mgr_config = ManagerConfig::default(); - mgr_config.custom_setup = Box::new(establish_connection_rustls); + mgr_config.custom_setup = + Box::new(|config| establish_connection_rustls_diesel(config.to_owned())); let manager = AsyncDieselConnectionManager::::new_with_config( - database_url, + database_url.clone(), mgr_config, ); @@ -198,20 +202,25 @@ impl Persistence { .await .expect("Could not build connection pool"); - Self { connection_pool } + Self { + connection_pool, + connect_tokio_postgres: Box::new(move || { + establish_connection_rustls_tokio_postgres(database_url.clone()) + }), + } } /// A helper for use during startup, where we would like to tolerate concurrent restarts of the /// database and the storage controller, therefore the database might not be available right away pub async fn await_connection( - database_url: &str, + database_url: String, timeout: Duration, ) -> Result<(), diesel::ConnectionError> { let started_at = Instant::now(); - log_postgres_connstr_info(database_url) + log_postgres_connstr_info(&database_url) .map_err(|e| diesel::ConnectionError::InvalidConnectionUrl(e.to_string()))?; loop { - match establish_connection_rustls(database_url).await { + match establish_connection_rustls_diesel(database_url.clone()).await { Ok(_) => { tracing::info!("Connected to database."); return Ok(()); @@ -1822,6 +1831,60 @@ impl Persistence { }) .await } + + pub(crate) async fn listen_sk_ps_discovery( + &self, + ) -> DatabaseResult< + Pin>>>, + > { + let (client, mut conn) = (&self.connect_tokio_postgres)().await?; + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + tokio::spawn(async move { + let mut stream = futures::stream::poll_fn(move |cx| conn.poll_message(cx)); + while let Some(msg) = stream.next().await { + info!(?msg, "async message"); + match msg { + Ok(tokio_postgres::AsyncMessage::Notification(notification)) + if notification.channel() == "sk_ps_discovery" => + { + let Ok(()) = tx.send(notification).await else { + tracing::info!( + "sk_ps_discovery notification rx dropped, stopping async notification processing" + ); + break; + }; + } + Ok(_) => {} + Err(err) => { + tracing::error!(?err, "tokio_postgres poll_message error"); + break; + } + } + } + tracing::info!("sk_ps_discovery notification stream returned None, exiting"); + }); + + client + .batch_execute("LISTEN sk_ps_discovery;") + .await + .expect("TODO"); + + #[derive(serde::Deserialize)] + struct Notification { + tenant_id: TenantId, + } + Ok(Box::pin(async_stream::stream! { + while let Some(msg) = rx.recv().await { + let msg: Result = serde_json::from_str(msg.payload()); + let msg = msg.map(|Notification { tenant_id }| tenant_id ); + yield msg; + } + tracing::info!("sk_ps_discovery channel closed, stopping stream"); + // keep client alive inside the returned sream object, othrwise `conn` ends as soon as we return from this function + drop(client); + })) + } } pub(crate) fn load_certs() -> anyhow::Result> { @@ -1910,21 +1973,40 @@ fn client_config_with_root_certs() -> anyhow::Result { }) } -fn establish_connection_rustls(config: &str) -> BoxFuture> { - let fut = async { +type TokioPostgresConnectResult = ConnectionResult<( + tokio_postgres::Client, + tokio_postgres::Connection< + tokio_postgres::Socket, + tokio_postgres_rustls::RustlsStream, + >, +)>; + +fn establish_connection_rustls_tokio_postgres( + config: String, +) -> BoxFuture<'static, TokioPostgresConnectResult> { + let fut = async move { // We first set up the way we want rustls to work. let rustls_config = client_config_with_root_certs() .map_err(|err| ConnectionError::BadConnection(format!("{err:?}")))?; let tls = tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config); - let (client, conn) = tokio_postgres::connect(config, tls) + let (client, conn) = tokio_postgres::connect(&config, tls) .await .map_err(|e| ConnectionError::BadConnection(e.to_string()))?; - - AsyncPgConnection::try_from_client_and_connection(client, conn).await + Ok((client, conn)) }; fut.boxed() } +fn establish_connection_rustls_diesel( + config: String, +) -> BoxFuture<'static, ConnectionResult> { + async { + let (client, conn) = establish_connection_rustls_tokio_postgres(config).await?; + AsyncPgConnection::try_from_client_and_connection(client, conn).await + } + .boxed() +} + #[cfg_attr(test, test)] fn test_config_debug_censors_password() { let has_pw = diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index c0f48c670b..3d83328a86 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -489,6 +489,7 @@ pub struct Service { inner: Arc>, config: Config, persistence: Arc, + sk_ps_discovery: sk_ps_discovery::ActorClient, compute_hook: Arc, result_tx: tokio::sync::mpsc::UnboundedSender, @@ -1767,6 +1768,8 @@ impl Service { LeadershipStatus::Leader }; + let sk_ps_discovery = sk_ps_discovery::spawn(persistence.clone()); + let this = Arc::new(Self { inner: Arc::new(std::sync::RwLock::new(ServiceState::new( nodes, @@ -1779,6 +1782,7 @@ impl Service { ))), config: config.clone(), persistence, + sk_ps_discovery, compute_hook: Arc::new(ComputeHook::new(config.clone())?), result_tx, heartbeater_ps, diff --git a/storage_controller/src/service/sk_ps_discovery.rs b/storage_controller/src/service/sk_ps_discovery.rs index c46373d0e8..7782bae2bb 100644 --- a/storage_controller/src/service/sk_ps_discovery.rs +++ b/storage_controller/src/service/sk_ps_discovery.rs @@ -1,46 +1,81 @@ -use utils::{ - generation::Generation, - id::{NodeId, TenantId, TimelineId}, - shard::TenantShardId, -}; +use std::sync::Arc; + +use anyhow::Context; +use futures::StreamExt; +use tracing::info; +use utils::id::TenantId; use crate::persistence::Persistence; -pub struct ActorClient {} - -enum Message { - PageserverAttachmentNew { - tenant_shard_id: TenantShardId, - ps_generation: Generation, - ps_id: NodeId, - }, - PageserverAttachmentRemoved { - tenant_shard_id: TenantShardId, - ps_generation: Generation, - ps_id: NodeId, - }, - SafekeeperConfigChange { - tenant_id: TenantId, - timeline_id: TimelineId, - safekeeper_ids: Vec, - }, - SafekeeperDelete { - safekeeper_id: NodeId, - }, +pub struct ActorClient { + tx: tokio::sync::mpsc::UnboundedSender, } -struct Actor {} +struct Actor { + persistence: Arc, + rx: tokio::sync::mpsc::UnboundedReceiver, +} -pub async fn spawn(persistence: Arc) -> ActorClient { - let actor = Actor { persistence }; +#[derive(Debug)] +enum Message {} + +pub fn spawn(persistence: Arc) -> ActorClient { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let actor = Actor { persistence, rx }; tokio::spawn(actor.run()); - ActorClient {} + ActorClient { tx } } impl ActorClient {} impl Actor { - async fn run(self) { - loop {} + async fn run(mut self) { + loop { + match self.run0().await { + Ok(()) => { + info!("sk_ps_discovery actor exiting after shutdown signal observed"); + return; + } + Err(err) => { + tracing::error!( + ?err, + "sk_ps_discovery actor encountered an error, restarting after backoff" + ); + // TODO: proper backoff + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } + } + + async fn run0(&mut self) -> anyhow::Result<()> { + let mut subscription = self + .persistence + .listen_sk_ps_discovery() + .await + .context("listen to sk_ps_discovery")?; + + loop { + tokio::select! { + maybe_res = subscription.next() => { + match maybe_res { + None => { + anyhow::bail!("subscription should never end"); + } + Some(Ok(tenant_id)) => { + let tenant_id: TenantId = tenant_id; + info!(?tenant_id, "notify for tenant_id"); + } + Some(Err(err)) => { + let err: serde_json::Error = err; + anyhow::bail!("incorrect notification format: {err:?}"); // FIXME repeat message in error so it can be debugged ? + } + } + } + msg = self.rx.recv() => { + todo!("{msg:?}"); + } + } + } } }