From 4f4214eea32b20b2e77c8e0273cc3820e6b6ad0b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 6 Jun 2025 18:36:16 -0700 Subject: [PATCH] WIP integrate --- Cargo.lock | 1 + Cargo.toml | 1 + libs/utils/src/lib.rs | 1 + libs/utils/src/sync.rs | 2 + libs/utils/src/sync/spsc_watch.rs | 61 +++++++++++++++ safekeeper/Cargo.toml | 1 + safekeeper/src/bin/safekeeper.rs | 15 +++- safekeeper/src/lib.rs | 8 ++ safekeeper/src/timeline.rs | 2 +- safekeeper/src/timeline_manager.rs | 4 +- safekeeper/src/timelines_global_map.rs | 8 +- safekeeper/src/wal_advertiser.rs | 100 +++++++++++++++++++++++++ 12 files changed, 195 insertions(+), 9 deletions(-) create mode 100644 libs/utils/src/sync/spsc_watch.rs diff --git a/Cargo.lock b/Cargo.lock index 87115efff5..e4ab95ac52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6064,6 +6064,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "sk_ps_discovery", "smallvec", "storage_broker", "strum", diff --git a/Cargo.toml b/Cargo.toml index 11fce50bee..9495297956 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -262,6 +262,7 @@ pq_proto = { version = "0.1", path = "./libs/pq_proto/" } remote_storage = { version = "0.1", path = "./libs/remote_storage/" } safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" } safekeeper_client = { path = "./safekeeper/client" } +sk_ps_discovery = { path = "./libs/sk_ps_discovery" } desim = { version = "0.1", path = "./libs/desim" } storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy. 
storage_controller_client = { path = "./storage_controller/client" } diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 9d03cb0d05..7ba8b5de42 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -97,6 +97,7 @@ pub mod elapsed_accum; pub mod merge_join; + #[cfg(target_os = "linux")] pub mod linux_socket_ioctl; diff --git a/libs/utils/src/sync.rs b/libs/utils/src/sync.rs index 280637de8f..96bf500bc9 100644 --- a/libs/utils/src/sync.rs +++ b/libs/utils/src/sync.rs @@ -4,3 +4,5 @@ pub mod duplex; pub mod gate; pub mod spsc_fold; + +pub mod spsc_watch; diff --git a/libs/utils/src/sync/spsc_watch.rs b/libs/utils/src/sync/spsc_watch.rs new file mode 100644 index 0000000000..d29a820645 --- /dev/null +++ b/libs/utils/src/sync/spsc_watch.rs @@ -0,0 +1,61 @@ +//! Like [`tokio::sync::watch`] but the Sender gets an error when the Receiver is gone. +//! +//! TODO: an actually more efficient implementation + +use tokio_util::sync::CancellationToken; + +pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) { + let (tx, rx) = tokio::sync::watch::channel(init); + let cancel = CancellationToken::new(); + ( + Sender { + tx, + cancel: cancel.clone().drop_guard(), + }, + Receiver { rx, cancel }, + ) +} + +pub struct Sender<T> { + tx: tokio::sync::watch::Sender<T>, + cancel: tokio_util::sync::DropGuard, +} + +pub struct Receiver<T> { + rx: tokio::sync::watch::Receiver<T>, + cancel: CancellationToken, +} + +pub enum RecvError { + SenderGone, +} +pub enum SendError { + ReceiverGone, +} + +impl<T> Sender<T> { + pub fn send_replace(&mut self, value: T) -> Result<(), (T, SendError)> { + if self.tx.receiver_count() == 0 { + // we don't provide outside access to tx, so, we know the only + // rx that is ever going to exist is gone + return Err((value, SendError::ReceiverGone)); + } + self.tx.send_replace(value); + Ok(()) + } +} + +impl<T> Receiver<T> { + pub async fn changed(&mut self) -> Result<(), RecvError> { + match self.rx.changed().await { + Ok(()) => Ok(()), // only signals the change; the value is read via borrow() + Err(_) =>
Err(RecvError::SenderGone), + } + } + pub fn borrow(&self) -> impl std::ops::Deref<Target = T> + '_ { + self.rx.borrow() + } + pub async fn closed(&mut self) { + self.cancel.cancelled().await + } +} diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 287fada465..8be164f52c 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -63,6 +63,7 @@ pq_proto.workspace = true remote_storage.workspace = true safekeeper_api.workspace = true safekeeper_client.workspace = true +sk_ps_discovery.workspace = true sha2.workspace = true sd-notify.workspace = true storage_broker.workspace = true diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 8d31ada24f..8de2d0fb75 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -25,10 +25,11 @@ use safekeeper::defaults::{ use safekeeper::wal_backup::WalBackup; use safekeeper::{ BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, - WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service, + WAL_ADVERTISER_RUNTIME, WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser, + wal_service, }; use sd_notify::NotifyState; -use storage_broker::{DEFAULT_ENDPOINT, Uri}; +use storage_broker::{DEFAULT_ENDPOINT, Uri, wal_advertisement}; use tokio::runtime::Handle; use tokio::signal::unix::{SignalKind, signal}; use tokio::task::JoinError; @@ -626,6 +627,16 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { .map(|res| ("broker main".to_owned(), res)); tasks_handles.push(Box::pin(broker_task_handle)); + let wal_advertiser_task_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| WAL_ADVERTISER_RUNTIME.handle()) + .spawn( + wal_advertiser::task_main(conf.clone(), global_timelines.clone()) + .instrument(info_span!("wal_advertiser_main")), + ) + .map(|res| ("wal advertiser task handle".to_owned(), res)); + tasks_handles.push(Box::pin(wal_advertiser_task_handle)); + set_build_info_metric(GIT_VERSION, BUILD_TAG); // TODO: update tokio-stream,
convert to real async Stream with diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 72b3689125..c9d78e817c 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -199,6 +199,14 @@ pub static BROKER_RUNTIME: Lazy = Lazy::new(|| { .expect("Failed to create broker runtime") }); +pub static WAL_ADVERTISER_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("wal advertiser worker") + .enable_all() + .build() + .expect("Failed to create WAL advertiser runtime") +}); + pub static WAL_BACKUP_RUNTIME: Lazy = Lazy::new(|| { tokio::runtime::Builder::new_multi_thread() .thread_name("WAL backup worker") diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 7790e767e5..90d8b18fb0 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -549,7 +549,7 @@ impl Timeline { broker_active_set: Arc, partial_backup_rate_limiter: RateLimiter, wal_backup: Arc, - wal_advertiser: Arc, + wal_advertiser: Arc, ) { let (tx, rx) = self.manager_ctl.bootstrap_manager(); diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 729d3b0409..1119e39b17 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -241,7 +241,7 @@ pub async fn main_task( mut manager_rx: tokio::sync::mpsc::UnboundedReceiver, global_rate_limiter: RateLimiter, wal_backup: Arc, - wal_advertiser: Arc, + wal_advertiser: Arc, ) { tli.set_status(Status::Started); @@ -423,7 +423,7 @@ impl Manager { manager_tx: tokio::sync::mpsc::UnboundedSender, global_rate_limiter: RateLimiter, wal_backup: Arc, - wal_advertiser: Arc, + wal_advertiser: Arc, ) -> Manager { let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await; Manager { diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 10a883cd26..8b201322e5 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -47,7 +47,7 @@
struct GlobalTimelinesState { conf: Arc, broker_active_set: Arc, - wal_advertisement: Arc, + wal_advertisement: Arc, global_rate_limiter: RateLimiter, wal_backup: Arc, } @@ -61,7 +61,7 @@ impl GlobalTimelinesState { Arc, RateLimiter, Arc, - Arc, + Arc, ) { ( self.conf.clone(), @@ -104,7 +104,7 @@ impl GlobalTimelines { tombstones: HashMap::new(), conf, broker_active_set: Arc::new(TimelinesSet::default()), - wal_advertisement: Arc::new(wal_advertiser::advmap::World::default()), + wal_advertisement: Arc::new(wal_advertiser::GlobalState::default()), global_rate_limiter: RateLimiter::new(1, 1), wal_backup, }), @@ -597,7 +597,7 @@ impl GlobalTimelines { Ok(deleted) } - pub fn get_wal_advertiser(&self) -> Arc { + pub fn get_wal_advertiser(&self) -> Arc { self.state.lock().unwrap().wal_advertisement.clone() } diff --git a/safekeeper/src/wal_advertiser.rs b/safekeeper/src/wal_advertiser.rs index 11b5d524a8..1ca9133f6a 100644 --- a/safekeeper/src/wal_advertiser.rs +++ b/safekeeper/src/wal_advertiser.rs @@ -1,3 +1,103 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use utils::{ + id::TenantTimelineId, + sync::{spsc_fold, spsc_watch}, +}; + +use crate::{GlobalTimelines, SafeKeeperConf}; + +pub async fn task_main( + conf: Arc, + global_timelines: Arc, +) -> anyhow::Result<()> { + let mut world = sk_ps_discovery::World::default(); + + let mut senders: HashMap = HashMap::new(); + loop { + let advertisements = world.get_commit_lsn_advertisements(); + for (node_id, advs) in advertisements { + loop { + let tx = senders.entry(node_id).or_insert_with(|| { + let (tx, rx) = utils::sync::spsc_fold::channel(); + tokio::spawn( + PageserverTask { + ps_id: node_id, + advs: rx, + } + .run() + .instrument(info_span!("wal_advertiser", ps_id=%node_id)), + ); + tx + }); + let Err(_) = tx.send_modify(advs) else { break }; + // receiver task is gone: drop the stale sender, then retry with a fresh one + senders.remove(&node_id); + } + } + } +} + +struct PageserverTask { + ps_id: NodeId, + advs: spsc_fold::Receiver>, +} + +impl PageserverTask { + /// Cancellation: happens through last
PageserverHandle being dropped. + async fn run(mut self) { + loop { + let Ok(advs) = self.advs.recv().await else { + return; + }; + tokio::select! { + _ = self.advs.cancelled() => { + return; + } + res = self.run0() => { + if let Err(err) = res { + error!(?err, "failure sending advertisements, restarting after back-off"); + // TODO: backoff? + cancellation sensitivity + tokio::time::sleep(Duration::from_secs(10)).await; + } + continue; + } + }; + } + } + async fn run0(&mut self) -> anyhow::Result<()> { + use storage_broker::wal_advertisement as proto; + use storage_broker::wal_advertisement::pageserver_client::PageserverClient; + let stream = async_stream::stream! { loop { + while self.pending_advertisements.is_empty() { + tokio::select! { + _ = self.advs.cancelled() => { + return; + } + _ = self.notify_pending_advertisements.notified() => {} + } + let mut state = self.state.lock().unwrap(); + std::mem::swap( + &mut state.pending_advertisements, + &mut self.pending_advertisements, + ); + } + for (tenant_timeline_id, commit_lsn) in self.pending_advertisements.drain() { + yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(tenant_timeline_id), commit_lsn: Some(commit_lsn) }; + } + } }; + let client: PageserverClient<_> = PageserverClient::connect(todo!()) + .await + .context("connect")?; + let publish_stream = client + .publish_commit_lsn_advertisements(stream) + .await + .context("publish stream")?; + } +} + +#[derive(Default)] pub struct GlobalState {} + pub mod advmap { use std::sync::Arc;