finish prototyping event changes via triggers

This commit is contained in:
Christian Schwarz
2025-05-27 18:28:22 +02:00
parent a6bd4a3be6
commit 3836ee8539
8 changed files with 224 additions and 75 deletions

1
Cargo.lock generated
View File

@@ -6690,6 +6690,7 @@ name = "storage_controller"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"bytes",
"camino",
"chrono",

View File

@@ -15,6 +15,7 @@ testing = []
[dependencies]
anyhow.workspace = true
async-stream.workspace = true
bytes.workspace = true
camino.workspace = true
chrono.workspace = true

View File

@@ -1,5 +1,9 @@
DROP TRIGGER on_sk_timeline_update_enqueue_sk_ps_discovery;
DROP TRIGGER on_ps_tenant_shard_change_enqueue_sk_ps_discovery;
DROP TRIGGER on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery on "tenant_shards";
DROP FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn;
DROP TRIGGER on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery on "tenant_shards";
DROP FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn;
DROP TRIGGER on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery on "tenant_shards";
DROP FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn;
DROP FUNCTION sk_ps_discovery_enqueue_tenant;
DROP TABLE "sk_ps_discovery";

View File

@@ -10,46 +10,68 @@ CREATE TABLE "sk_ps_discovery"(
PRIMARY KEY("tenant_id", "shard_number", "shard_count", "ps_generation", "sk_id")
);
CREATE OR REPLACE FUNCTION sk_ps_discovery_enqueue_tenant()
RETURNS TRIGGER AS $$
CREATE OR REPLACE FUNCTION sk_ps_discovery_enqueue_tenant(ARG_TENANT_ID VARCHAR)
RETURNS VOID AS $$
BEGIN
DELETE FROM sk_ps_discovery
WHERE tenant_id = NEW.tenant_id;
WHERE tenant_id = ARG_TENANT_ID;
INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, ps_id,created_at,last_attempt_at)
SELECT (
WITH sk_timeline_attachments AS (
SELECT DISTINCT tenant_id,timeline_id,unnest(array_cat(sk_set, new_sk_set)) as sk_id FROM timelines
WHERE tenant_id = NEW.tenant_id
WHERE tenant_id = ARG_TENANT_ID
)
SELECT NEW.tenant_id, NEW.tenant_id, NEW.tenant_id, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, NOW(), NULL
SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, NOW(), NULL
FROM tenant_shards
INNER JOIN sk_timeline_attachments ON tenant_shards.tenant_id = sk_timeline_attachments.tenant_id
);
INNER JOIN sk_timeline_attachments ON tenant_shards.tenant_id = sk_timeline_attachments.tenant_id;
NOTIFY neon_storcon_sk_ps_discovery, json_build_object(
'tenant_id', NEW.tenant_id
)::text;
RETURN NEW;
PERFORM pg_notify('sk_ps_discovery', json_build_object(
'tenant_id', ARG_TENANT_ID
)::text);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_change_enqueue_sk_ps_discovery
AFTER INSERT OR UPDATE OR DELETE
-- Trigger function for row inserts into "tenant_shards": re-enqueues sk_ps_discovery
-- work for the tenant that owns the inserted row.
CREATE OR REPLACE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn()
RETURNS TRIGGER AS $$
BEGIN
-- PERFORM calls the helper and discards its (VOID) result.
PERFORM sk_ps_discovery_enqueue_tenant(NEW.tenant_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery
AFTER INSERT
ON "tenant_shards"
WHEN OLD.generation IS DISTINCT FROM NEW.generation
FOR EACH STATEMENT
EXECUTE FUNCTION sk_ps_discovery_enqueue_tenant();
FOR EACH ROW
EXECUTE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn();
CREATE OR REPLACE TRIGGER on_sk_timeline_update_enqueue_sk_ps_discovery
AFTER INSERT OR UPDATE OR DELETE
ON "timelines"
WHEN
OLD.sk_set IS DISTINCT FROM NEW.sk_set
OR
OLD.new_sk_set IS DISTINCT FROM NEW.new_sk_set
FOR EACH STATEMENT
EXECUTE FUNCTION sk_ps_discovery_enqueue_tenant();
-- Trigger function for row deletes from "tenant_shards": re-enqueues sk_ps_discovery
-- work for the tenant that owned the deleted row.
CREATE OR REPLACE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn()
RETURNS TRIGGER AS $$
BEGIN
-- The deleted row (OLD) identifies the tenant to re-enqueue.
PERFORM sk_ps_discovery_enqueue_tenant(OLD.tenant_id);
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
-- Fire the function above once per deleted "tenant_shards" row.
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery
AFTER DELETE
ON "tenant_shards"
FOR EACH ROW
EXECUTE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn();
-- Trigger function for row updates of "tenant_shards": re-enqueues sk_ps_discovery
-- work for the tenant that owns the updated row.
CREATE OR REPLACE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn()
RETURNS TRIGGER AS $$
BEGIN
-- Uses the post-update row (NEW) to identify the tenant.
PERFORM sk_ps_discovery_enqueue_tenant(NEW.tenant_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Fire the function above once per updated "tenant_shards" row.
-- NOTE(review): there is no WHEN clause, so this fires for every updated row regardless of
-- which columns changed -- confirm whether it should be limited to relevant column changes.
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery
AFTER UPDATE
ON "tenant_shards"
FOR EACH ROW
EXECUTE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn();
-- TODO: same for `timelines` table

View File

@@ -436,7 +436,7 @@ async fn async_main() -> anyhow::Result<()> {
};
// Validate that we can connect to the database
Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;
Persistence::await_connection(secrets.database_url.clone(), args.db_connect_timeout.into()).await?;
let persistence = Arc::new(Persistence::new(secrets.database_url).await);

View File

@@ -1,6 +1,7 @@
pub(crate) mod split_state;
use std::collections::HashMap;
use std::io::Write;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -15,8 +16,8 @@ use diesel_async::pooled_connection::bb8::Pool;
use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig};
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use diesel_migrations::{EmbeddedMigrations, embed_migrations};
use futures::FutureExt;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
@@ -31,6 +32,7 @@ use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
use rustls::crypto::ring;
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use tracing::info;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -40,7 +42,6 @@ use crate::metrics::{
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
};
use crate::node::Node;
use crate::schema::sk_ps_discovery;
use crate::timeline_import::{
TimelineImport, TimelineImportUpdateError, TimelineImportUpdateFollowUp,
};
@@ -75,6 +76,8 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
// Pooled diesel-async Postgres connections.
connection_pool: Pool<AsyncPgConnection>,
// Factory that opens a fresh raw `tokio_postgres` client/connection pair on every call;
// `listen_sk_ps_discovery` uses it to obtain a dedicated connection for `LISTEN`.
connect_tokio_postgres:
Box<dyn Sync + Send + 'static + Fn() -> BoxFuture<'static, TokioPostgresConnectResult>>,
}
/// Legacy format, for use in JSON compat objects in test environment
@@ -178,10 +181,11 @@ impl Persistence {
pub async fn new(database_url: String) -> Self {
let mut mgr_config = ManagerConfig::default();
mgr_config.custom_setup = Box::new(establish_connection_rustls);
mgr_config.custom_setup =
Box::new(|config| establish_connection_rustls_diesel(config.to_owned()));
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(
database_url,
database_url.clone(),
mgr_config,
);
@@ -198,20 +202,25 @@ impl Persistence {
.await
.expect("Could not build connection pool");
Self { connection_pool }
Self {
connection_pool,
connect_tokio_postgres: Box::new(move || {
establish_connection_rustls_tokio_postgres(database_url.clone())
}),
}
}
/// A helper for use during startup, where we would like to tolerate concurrent restarts of the
/// database and the storage controller, therefore the database might not be available right away
pub async fn await_connection(
database_url: &str,
database_url: String,
timeout: Duration,
) -> Result<(), diesel::ConnectionError> {
let started_at = Instant::now();
log_postgres_connstr_info(database_url)
log_postgres_connstr_info(&database_url)
.map_err(|e| diesel::ConnectionError::InvalidConnectionUrl(e.to_string()))?;
loop {
match establish_connection_rustls(database_url).await {
match establish_connection_rustls_diesel(database_url.clone()).await {
Ok(_) => {
tracing::info!("Connected to database.");
return Ok(());
@@ -1822,6 +1831,60 @@ impl Persistence {
})
.await
}
pub(crate) async fn listen_sk_ps_discovery(
&self,
) -> DatabaseResult<
Pin<Box<dyn Send + 'static + futures::Stream<Item = Result<TenantId, serde_json::Error>>>>,
> {
let (client, mut conn) = (&self.connect_tokio_postgres)().await?;
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let mut stream = futures::stream::poll_fn(move |cx| conn.poll_message(cx));
while let Some(msg) = stream.next().await {
info!(?msg, "async message");
match msg {
Ok(tokio_postgres::AsyncMessage::Notification(notification))
if notification.channel() == "sk_ps_discovery" =>
{
let Ok(()) = tx.send(notification).await else {
tracing::info!(
"sk_ps_discovery notification rx dropped, stopping async notification processing"
);
break;
};
}
Ok(_) => {}
Err(err) => {
tracing::error!(?err, "tokio_postgres poll_message error");
break;
}
}
}
tracing::info!("sk_ps_discovery notification stream returned None, exiting");
});
client
.batch_execute("LISTEN sk_ps_discovery;")
.await
.expect("TODO");
#[derive(serde::Deserialize)]
struct Notification {
tenant_id: TenantId,
}
Ok(Box::pin(async_stream::stream! {
while let Some(msg) = rx.recv().await {
let msg: Result<Notification, _> = serde_json::from_str(msg.payload());
let msg = msg.map(|Notification { tenant_id }| tenant_id );
yield msg;
}
tracing::info!("sk_ps_discovery channel closed, stopping stream");
// keep client alive inside the returned sream object, othrwise `conn` ends as soon as we return from this function
drop(client);
}))
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
@@ -1910,21 +1973,40 @@ fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
})
}
fn establish_connection_rustls(config: &str) -> BoxFuture<ConnectionResult<AsyncPgConnection>> {
let fut = async {
/// Result of opening a raw `tokio_postgres` connection over rustls: the `Client` handle plus
/// the `Connection` half, which `listen_sk_ps_discovery` polls for async messages.
type TokioPostgresConnectResult = ConnectionResult<(
tokio_postgres::Client,
tokio_postgres::Connection<
tokio_postgres::Socket,
tokio_postgres_rustls::RustlsStream<tokio_postgres::Socket>,
>,
)>;
fn establish_connection_rustls_tokio_postgres(
config: String,
) -> BoxFuture<'static, TokioPostgresConnectResult> {
let fut = async move {
// We first set up the way we want rustls to work.
let rustls_config = client_config_with_root_certs()
.map_err(|err| ConnectionError::BadConnection(format!("{err:?}")))?;
let tls = tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config);
let (client, conn) = tokio_postgres::connect(config, tls)
let (client, conn) = tokio_postgres::connect(&config, tls)
.await
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
AsyncPgConnection::try_from_client_and_connection(client, conn).await
Ok((client, conn))
};
fut.boxed()
}
/// Connects to Postgres over rustls and wraps the resulting `tokio_postgres` client/connection
/// pair in a diesel-async `AsyncPgConnection`.
///
/// `config` is the connection string; it is moved into the returned `'static` future.
fn establish_connection_rustls_diesel(
    config: String,
) -> BoxFuture<'static, ConnectionResult<AsyncPgConnection>> {
    let connect = async move {
        let (pg_client, pg_conn) = establish_connection_rustls_tokio_postgres(config).await?;
        AsyncPgConnection::try_from_client_and_connection(pg_client, pg_conn).await
    };
    connect.boxed()
}
#[cfg_attr(test, test)]
fn test_config_debug_censors_password() {
let has_pw =

View File

@@ -489,6 +489,7 @@ pub struct Service {
inner: Arc<std::sync::RwLock<ServiceState>>,
config: Config,
persistence: Arc<Persistence>,
sk_ps_discovery: sk_ps_discovery::ActorClient,
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
@@ -1767,6 +1768,8 @@ impl Service {
LeadershipStatus::Leader
};
let sk_ps_discovery = sk_ps_discovery::spawn(persistence.clone());
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
nodes,
@@ -1779,6 +1782,7 @@ impl Service {
))),
config: config.clone(),
persistence,
sk_ps_discovery,
compute_hook: Arc::new(ComputeHook::new(config.clone())?),
result_tx,
heartbeater_ps,

View File

@@ -1,46 +1,81 @@
use utils::{
generation::Generation,
id::{NodeId, TenantId, TimelineId},
shard::TenantShardId,
};
use std::sync::Arc;
use anyhow::Context;
use futures::StreamExt;
use tracing::info;
use utils::id::TenantId;
use crate::persistence::Persistence;
pub struct ActorClient {}
enum Message {
PageserverAttachmentNew {
tenant_shard_id: TenantShardId,
ps_generation: Generation,
ps_id: NodeId,
},
PageserverAttachmentRemoved {
tenant_shard_id: TenantShardId,
ps_generation: Generation,
ps_id: NodeId,
},
SafekeeperConfigChange {
tenant_id: TenantId,
timeline_id: TimelineId,
safekeeper_ids: Vec<NodeId>,
},
SafekeeperDelete {
safekeeper_id: NodeId,
},
/// Handle to the sk_ps_discovery actor; holds the sending half of the actor's message channel.
pub struct ActorClient {
tx: tokio::sync::mpsc::UnboundedSender<Message>,
}
struct Actor {}
/// State owned by the background sk_ps_discovery task.
struct Actor {
// Used to subscribe to `sk_ps_discovery` notifications from the database.
persistence: Arc<Persistence>,
// Receiving half of the channel whose sender is held by `ActorClient`.
rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
}
pub async fn spawn(persistence: Arc<Persistence>) -> ActorClient {
let actor = Actor { persistence };
/// Messages sent from `ActorClient` to the `Actor`; there are no variants yet.
#[derive(Debug)]
enum Message {}
pub fn spawn(persistence: Arc<Persistence>) -> ActorClient {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let actor = Actor { persistence, rx };
tokio::spawn(actor.run());
ActorClient {}
ActorClient { tx }
}
impl ActorClient {}
impl Actor {
async fn run(self) {
loop {}
/// Drives the actor until `run0` completes successfully.
///
/// Every `run0` error is logged and followed by a fixed one-second pause before retrying.
async fn run(mut self) {
    while let Err(err) = self.run0().await {
        tracing::error!(
            ?err,
            "sk_ps_discovery actor encountered an error, restarting after backoff"
        );
        // TODO: proper backoff
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
    info!("sk_ps_discovery actor exiting after shutdown signal observed");
}
/// Runs one subscription session: subscribes to `sk_ps_discovery` notifications, then
/// processes notifications and client messages until shutdown or failure.
///
/// Returns `Ok(())` once every sender for `rx` has been dropped; `run` treats that as the
/// shutdown signal and stops.
///
/// # Errors
///
/// Fails if the subscription cannot be established, if the subscription stream ends, or if a
/// notification payload cannot be parsed. `run` retries after such errors.
async fn run0(&mut self) -> anyhow::Result<()> {
    let mut subscription = self
        .persistence
        .listen_sk_ps_discovery()
        .await
        .context("listen to sk_ps_discovery")?;
    loop {
        tokio::select! {
            maybe_res = subscription.next() => {
                match maybe_res {
                    None => {
                        anyhow::bail!("subscription should never end");
                    }
                    Some(Ok(tenant_id)) => {
                        let tenant_id: TenantId = tenant_id;
                        info!(?tenant_id, "notify for tenant_id");
                    }
                    Some(Err(err)) => {
                        let err: serde_json::Error = err;
                        anyhow::bail!("incorrect notification format: {err:?}"); // FIXME repeat message in error so it can be debugged ?
                    }
                }
            }
            msg = self.rx.recv() => {
                match msg {
                    // Channel closed: no sender is left, so nobody can talk to this actor
                    // anymore. Exit cleanly instead of panicking.
                    None => return Ok(()),
                    // `Message` has no variants yet, so this arm cannot be reached; the empty
                    // match forces handling to be added here once variants exist.
                    Some(msg) => match msg {},
                }
            }
        }
    }
}
}