From a80790e0c45ac3ec4bca6319e8ee634b973c8035 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 12 Jun 2025 06:41:07 -0700 Subject: [PATCH] WIP --- Cargo.lock | 1 + libs/safekeeper_api/src/models.rs | 3 + libs/utils/src/sync/spsc_watch.rs | 6 +- safekeeper/Cargo.toml | 1 + safekeeper/src/bin/safekeeper.rs | 23 +- safekeeper/src/lib.rs | 3 + safekeeper/src/pageserver_connectivity.rs | 117 ++++++++++ safekeeper/src/timeline_manager.rs | 2 +- safekeeper/src/timelines_global_map.rs | 8 +- safekeeper/src/wal_advertiser.rs | 218 ++++++++++++------ safekeeper/src/wal_advertiser/persistence.rs | 0 storage_broker/proto/broker.proto | 4 +- .../src/service/sk_ps_discovery.rs | 2 +- 13 files changed, 301 insertions(+), 87 deletions(-) create mode 100644 safekeeper/src/pageserver_connectivity.rs create mode 100644 safekeeper/src/wal_advertiser/persistence.rs diff --git a/Cargo.lock b/Cargo.lock index e4ab95ac52..6907f49361 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6067,6 +6067,7 @@ dependencies = [ "sk_ps_discovery", "smallvec", "storage_broker", + "storage_controller_client", "strum", "strum_macros", "thiserror 1.0.69", diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 268b4a742f..bcafbcee31 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -333,4 +333,7 @@ pub struct TenantShardPageserverAttachment { pub shard_id: ShardIndex, pub generation: Generation, pub ps_id: NodeId, + // TODO: avoid transmitting this with every request. + // How nice things could be if there were simple DNS records for ps-$node_id.$cell.$region.$cloud.neon.tech + pub ps_hostname: String, // TODO: some type safety } diff --git a/libs/utils/src/sync/spsc_watch.rs b/libs/utils/src/sync/spsc_watch.rs index 97b7e754ed..6e0ae6e108 100644 --- a/libs/utils/src/sync/spsc_watch.rs +++ b/libs/utils/src/sync/spsc_watch.rs @@ -5,12 +5,12 @@ use tokio_util::sync::CancellationToken; use crate::sync::spsc_fold; pub fn channel() -> (Sender, Receiver) { - let (mut tx, rx) = spsc_fold::channel(); + let (tx, rx) = spsc_fold::channel(); let cancel = CancellationToken::new(); ( Sender { tx, - cancel: cancel.clone().drop_guard(), + _cancel: cancel.clone().drop_guard(), }, Receiver { rx, cancel }, ) @@ -18,7 +18,7 @@ pub fn channel() -> (Sender, Receiver) { pub struct Sender { tx: spsc_fold::Sender, - cancel: tokio_util::sync::DropGuard, + _cancel: tokio_util::sync::DropGuard, } pub struct Receiver { diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 8be164f52c..dd0c580542 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -67,6 +67,7 @@ sk_ps_discovery.workspace = true sha2.workspace = true sd-notify.workspace = true storage_broker.workspace = true +storage_controller_client.workspace = true tokio-stream.workspace = true http-utils.workspace = true utils.workspace = true diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 8de2d0fb75..62918c55c1 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -25,11 +25,10 @@ use safekeeper::defaults::{ use safekeeper::wal_backup::WalBackup; use safekeeper::{ BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, - WAL_ADVERTISER_RUNTIME, WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser, - wal_service, + WAL_ADVERTISER_RUNTIME, WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service, }; use sd_notify::NotifyState; -use storage_broker::{DEFAULT_ENDPOINT, Uri, wal_advertisement}; +use storage_broker::{DEFAULT_ENDPOINT, Uri}; use tokio::runtime::Handle; use tokio::signal::unix::{SignalKind, signal}; use tokio::task::JoinError; @@ -627,11 +626,25 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { .map(|res| ("broker main".to_owned(), res)); tasks_handles.push(Box::pin(broker_task_handle)); + let ps_connectivity_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| HTTP_RUNTIME.handle()) + .spawn( + global_timelines + .get_pageserver_connectivity() + .task_main() + .instrument(info_span!("pageserver_connectivity")), + ) + .map(|res| ("pageserver connectivity".to_owned(), res)); + tasks_handles.push(Box::pin(ps_connectivity_handle)); + let wal_advertiser_task_handle = current_thread_rt .as_ref() - .unwrap_or_else(WAL_ADVERTISER_RUNTIME.handle()) + .unwrap_or_else(|| WAL_ADVERTISER_RUNTIME.handle()) .spawn( - wal_advertiser::task_main(conf.clone(), global_timelines.clone()) + global_timelines + .get_wal_advertiser() + .task_main() .instrument(info_span!("wal_advertiser_main")), ) .map(|res| ("wal advertiser task handle".to_owned(), res)); diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index c9d78e817c..ea0a3db490 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -44,6 +44,7 @@ pub mod wal_backup_partial; pub mod wal_reader_stream; pub mod wal_service; pub mod wal_storage; +pub(crate) mod pageserver_connectivity; #[cfg(any(test, feature = "benchmarking"))] pub mod test_utils; @@ -124,6 +125,7 @@ pub struct SafeKeeperConf { pub ssl_ca_certs: Vec, pub use_https_safekeeper_api: bool, pub enable_tls_wal_service_api: bool, + pub storage_controller_api: Option, } impl SafeKeeperConf { @@ -169,6 +171,7 @@ impl SafeKeeperConf { ssl_ca_certs: Vec::new(), use_https_safekeeper_api: false, enable_tls_wal_service_api: false, + storage_controller_api: None, } } } diff --git a/safekeeper/src/pageserver_connectivity.rs b/safekeeper/src/pageserver_connectivity.rs new file mode 100644 index 0000000000..ff7181b58e --- /dev/null +++ b/safekeeper/src/pageserver_connectivity.rs @@ -0,0 +1,117 @@ +use desim::world::Node; +use hyper::Uri; +use pageserver_api::controller_api; +use utils::id::TenantId; + +use crate::timeline::Timeline; + +use std::{ + collections::{HashMap, hash_map}, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + +use anyhow::Context; +use tracing::{Instrument, error, info, info_span, warn}; +use utils::{ + id::{NodeId, TenantTimelineId}, + lsn::Lsn, + sync::{spsc_fold, spsc_watch}, +}; + +use crate::{GlobalTimelines, SafeKeeperConf}; + +type Advs = HashMap; + +#[derive(Default)] +pub struct GlobalState { + inner: once_cell::sync::OnceCell>, +} + +enum Message { + Resolve { + ps_id: NodeId, + reply: tokio::sync::oneshot::Sender>, + }, +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("cancelled")] + Cancelled, +} + +impl GlobalState { + pub fn task_main(&self) -> impl 'static + Future> + Send { + let mut ret = None; + self.inner.get_or_init(|| { + let (tx, task_fut) = MainTask::prepare_run(); + ret = Some(task_fut); + tx + }); + ret.expect("must only call this method once") + } +} + +struct MainTask { + rx: tokio::sync::mpsc::Receiver, +} + +impl MainTask { + fn prepare_run() -> ( + tokio::sync::mpsc::Sender, + impl Future> + Send, + ) { + let (tx, rx) = tokio::sync::mpsc::channel(100 /* TODO think */); + let task = MainTask { rx }; + (tx, task.task()) + } + async fn task(mut self) -> anyhow::Result<()> { + // TODO: persistence + + let storcon_client = todo!(); + + let mut resolution: HashMap> = + HashMap::new(); + + while let Some(rx) = self.rx.recv().await { + match rx { + Message::Resolve { ps_id, reply } => match resolution.entry(ps_id) { + hash_map::Entry::Occupied(e) => {} + hash_map::Entry::Vacant(e) => { + tokio::spawn( + ResolutionTask { ps_id, storcon_client }.run() + ) + }, + }, + } + } + } +} + +struct ResolutionTask { + ps_id: NodeId, + storcon_client: storage_controller_client::control_api::Client, +} + +impl ResolutionTask { + pub async fn run(self) -> Result { + loop { + // XXX: well-defined upcall API? + let res = self + .storcon_client + .dispatch( + reqwest::Method::GET, + format!("control/v1/node/{}", self.node_id), + None, + ) + .await; + let node: NodeDescribeResponse = match res { + Ok(res) => res, + Err(err) => { + warn!("storcon upcall failed") + } + }; + } + } +} diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 475d2eeea4..007e06f3df 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -433,7 +433,7 @@ impl Manager { state_version_rx: tli.get_state_version_rx(), num_computes_rx: tli.get_walreceivers().get_num_rx(), tli_broker_active: broker_active_set.guard(tli.clone()), - wal_advertiser: wal_advertiser.register_timeline(tli.clone()).unwrap(), + wal_advertiser: wal_advertiser.new_timeline(tli.clone()).await.unwrap(), last_removed_segno: 0, is_offloaded, backup_task: None, diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 8b201322e5..a72eec0c8e 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -27,7 +27,7 @@ use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_t use crate::timelines_set::TimelinesSet; use crate::wal_backup::WalBackup; use crate::wal_storage::Storage; -use crate::{SafeKeeperConf, control_file, wal_advertiser, wal_storage}; +use crate::{SafeKeeperConf, control_file, pageserver_connectivity, wal_advertiser, wal_storage}; // Timeline entry in the global map: either a ready timeline, or mark that it is // being created. @@ -48,6 +48,7 @@ struct GlobalTimelinesState { conf: Arc, broker_active_set: Arc, wal_advertisement: Arc, + pageserver_connectivity: Arc, global_rate_limiter: RateLimiter, wal_backup: Arc, } @@ -105,6 +106,7 @@ impl GlobalTimelines { conf, broker_active_set: Arc::new(TimelinesSet::default()), wal_advertisement: Arc::new(wal_advertiser::GlobalState::default()), + pageserver_connectivity: Arc::new(pageserver_connectivity::GlobalState::default()), global_rate_limiter: RateLimiter::new(1, 1), wal_backup, }), @@ -601,6 +603,10 @@ impl GlobalTimelines { self.state.lock().unwrap().wal_advertisement.clone() } + pub fn get_pageserver_connectivity(&self) -> Arc { + self.state.lock().unwrap().pageserver_connectivity.clone() + } + pub fn housekeeping(&self, tombstone_ttl: &Duration) { let mut state = self.state.lock().unwrap(); diff --git a/safekeeper/src/wal_advertiser.rs b/safekeeper/src/wal_advertiser.rs index 45d115e52f..ee1c1570a5 100644 --- a/safekeeper/src/wal_advertiser.rs +++ b/safekeeper/src/wal_advertiser.rs @@ -1,7 +1,18 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +mod persistence; +mod pageserver_connectivity; + +use utils::id::TenantId; + +use crate::timeline::Timeline; + +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; use anyhow::Context; -use tracing::{Instrument, error, info, info_span}; +use tracing::{Instrument, error, info, info_span, warn}; use utils::{ id::{NodeId, TenantTimelineId}, lsn::Lsn, @@ -12,44 +23,139 @@ use crate::{GlobalTimelines, SafeKeeperConf}; type Advs = HashMap; -pub async fn task_main( - conf: Arc, - global_timelines: Arc, -) -> anyhow::Result<()> { - let mut world = sk_ps_discovery::World::default(); +#[derive(Default)] +pub struct GlobalState { + inner: once_cell::sync::OnceCell>, +} - let mut senders: HashMap> = HashMap::new(); - let mut endpoints: HashMap = HashMap::new(); - loop { - let advertisements = world.get_commit_lsn_advertisements(); - for (node_id, mut advs) in advertisements { - 'inner: loop { - let tx = senders.entry(node_id).or_insert_with(|| { - let (tx, rx) = spsc_watch::channel(); - tokio::spawn( - PageserverTask { - ps_id: node_id, - advs: rx, - } - .run() - .instrument(info_span!("wal_advertiser", ps_id=%node_id)), - ); - tx - }); - if let Err((failed, err)) = tx.send_replace(advs) { - senders.remove(&node_id); - advs = failed; - } else { - break 'inner; +pub struct SafekeeperTimelineHandle { + tx: tokio::sync::mpsc::Sender, +} + +enum Message { + NewTimeline { + reply: tokio::sync::oneshot::Sender>, + }, +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("cancelled")] + Cancelled, +} + +impl GlobalState { + pub fn task_main(&self) -> impl 'static + Future> + Send { + let mut ret = None; + self.inner.get_or_init(|| { + let (tx, task_fut) = MainTask::prepare_run(); + ret = Some(task_fut); + tx + }); + ret.expect("must only call this method once") + } + + pub async fn new_timeline( + &self, + tli: Arc, + ) -> Result { + let tx = self.inner.get().unwrap().clone(); + let handle = SafekeeperTimelineHandle { tx }; + let (reply, rx) = tokio::sync::oneshot::channel(); + let Ok(()) = handle.tx.send(Message::NewTimeline { reply }).await else { + return Err(Error::Cancelled); + }; + let Ok(res) = rx.await else { + return Err(Error::Cancelled); + }; + Ok(handle) + } + pub fn update_pageserver_attachments( + &self, + tenant_id: TenantId, + update: safekeeper_api::models::TenantShardPageserverAttachmentChange, + ) -> anyhow::Result<()> { + todo!() + } +} +impl SafekeeperTimelineHandle { + pub fn ready_for_eviction(&self) -> bool { + todo!() + } +} + +struct MainTask { + rx: tokio::sync::mpsc::Receiver, + world: sk_ps_discovery::World, + senders: HashMap>, +} + +impl MainTask { + fn prepare_run() -> ( + tokio::sync::mpsc::Sender, + impl Future> + Send, + ) { + let (tx, rx) = tokio::sync::mpsc::channel(100 /* TODO think */); + let task = MainTask { + rx, + world: sk_ps_discovery::World::default(), + senders: Default::default(), + }; + (tx, task.task()) + } + async fn task(mut self) -> anyhow::Result<()> { + let mut adv_frequency = tokio::time::interval(Duration::from_secs(1)); + loop { + tokio::select! { + _ = adv_frequency.tick() => { + let start = Instant::now(); + self.advertisements_iteration(); + let elapsed = start.elapsed(); + if elapsed > Duration::from_millis(10) { + warn!(?elapsed, "advertisements iteration is slow"); + } + }, + message = self.rx.recv() => { + match message { + None => anyhow::bail!("last main task sender dropped, shouldn't happen, exiting"), + Some(_) => todo!(), + } + }, + } + } + } + + fn advertisements_iteration(&mut self) { + loop { + let advertisements = self.world.get_commit_lsn_advertisements(); + for (node_id, mut advs) in advertisements { + 'inner: loop { + let tx = self.senders.entry(node_id).or_insert_with(|| { + let (tx, rx) = spsc_watch::channel(); + tokio::spawn( + PageserverTask { + ps_id: node_id, + endpoint: todo!(), + advs: rx, + } + .run() + .instrument(info_span!("wal_advertiser", ps_id=%node_id)), + ); + tx + }); + if let Err((failed, err)) = tx.send_replace(advs) { + self.senders.remove(&node_id); + advs = failed; + } else { + break 'inner; + } } } } } } - struct PageserverTask { ps_id: NodeId, - endpoint: tonic::transport::Endpoint, advs: spsc_watch::Receiver, } @@ -75,57 +181,21 @@ impl PageserverTask { async fn run0(&mut self, advs: HashMap) -> anyhow::Result<()> { use storage_broker::wal_advertisement as proto; use storage_broker::wal_advertisement::pageserver_client::PageserverClient; - let stream = async_stream::stream! { loop { + let stream = async_stream::stream! { for (tenant_timeline_id, commit_lsn) in advs { yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(proto::TenantTimelineId { tenant_id: tenant_timeline_id.tenant_id.as_ref().to_owned(), timeline_id: tenant_timeline_id.timeline_id.as_ref().to_owned(), }), commit_lsn: commit_lsn.0 }; } - }}; - let client: PageserverClient<_> = - PageserverClient::connect(todo!("how do we learn pageserver hostnames?")) - .await - .context("connect")?; + }; + let mut client: PageserverClient<_> = PageserverClient::connect(self.endpoint.clone()) + .await + .context("connect")?; let publish_stream = client .publish_commit_lsn_advertisements(stream) .await .context("publish stream")?; - } -} - -#[derive(Default)] -pub struct GlobalState {} - -use utils::id::TenantId; - -use crate::timeline::Timeline; - -pub struct World {} -pub struct SafekeeperTimelineHandle {} - -impl GlobalState { - pub fn update_pageserver_attachments( - &self, - tenant_id: TenantId, - update: safekeeper_api::models::TenantShardPageserverAttachmentChange, - ) -> anyhow::Result<()> { - todo!() - } - pub fn register_timeline( - &self, - tli: Arc, - ) -> anyhow::Result { - todo!() - } -} -impl SafekeeperTimelineHandle { - pub fn ready_for_eviction(&self) -> bool { - todo!() - } -} -impl Default for World { - fn default() -> Self { - todo!() + Ok(()) } } diff --git a/safekeeper/src/wal_advertiser/persistence.rs b/safekeeper/src/wal_advertiser/persistence.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/storage_broker/proto/broker.proto b/storage_broker/proto/broker.proto index 3891685589..147635fc23 100644 --- a/storage_broker/proto/broker.proto +++ b/storage_broker/proto/broker.proto @@ -35,14 +35,14 @@ message SafekeeperTimelineInfo { // LSN of the last record. uint64 flush_lsn = 4; // Up to which LSN safekeeper regards its WAL as committed. - uint64 commit_lsn = 5; + uint64 commit_lsn = 5; // yes // LSN up to which safekeeper has backed WAL. uint64 backup_lsn = 6; // LSN of last checkpoint uploaded by pageserver. uint64 remote_consistent_lsn = 7; uint64 peer_horizon_lsn = 8; uint64 local_start_lsn = 9; - uint64 standby_horizon = 14; + uint64 standby_horizon = 14; // yes // A connection string to use for WAL receiving. string safekeeper_connstr = 10; // HTTP endpoint connection string. diff --git a/storage_controller/src/service/sk_ps_discovery.rs b/storage_controller/src/service/sk_ps_discovery.rs index 4ea15c5506..b245b6aea1 100644 --- a/storage_controller/src/service/sk_ps_discovery.rs +++ b/storage_controller/src/service/sk_ps_discovery.rs @@ -232,7 +232,7 @@ impl DeliveryAttempt { generation: Generation::new(self.work.ps_generation as u32), }; match self.work.intent_state.as_str() { - "attached" => TenantShardPageserverAttachmentChange::Attach(val), + "attached" => TenantShardPageserverAttachmentChange::Attach { field1: val }, "detached" => TenantShardPageserverAttachmentChange::Detach(val), x => anyhow::bail!("unknown intent state {x:?}"), }