diff --git a/libs/utils/src/sync/spsc_fold.rs b/libs/utils/src/sync/spsc_fold.rs index 7dfbf40411..64e664b19e 100644 --- a/libs/utils/src/sync/spsc_fold.rs +++ b/libs/utils/src/sync/spsc_fold.rs @@ -56,7 +56,7 @@ impl Sender { /// # Panics /// /// If `try_fold` panics, any subsequent call to `send` panic. - pub async fn send(&mut self, value: T, try_fold: F) -> Result<(), SendError> + pub async fn send(&mut self, value: T, try_fold: F) -> Result<(), (T, SendError)> where F: Fn(&mut T, T) -> Result<(), T>, { @@ -104,7 +104,9 @@ impl Sender { } Poll::Pending } - State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)), + State::ReceiverGone => { + Poll::Ready(Err((value.take().unwrap(), SendError::ReceiverGone))) + } State::SenderGone(_) | State::AllGone | State::SenderDropping diff --git a/libs/utils/src/sync/spsc_watch.rs b/libs/utils/src/sync/spsc_watch.rs index 6666aa3eaf..97b7e754ed 100644 --- a/libs/utils/src/sync/spsc_watch.rs +++ b/libs/utils/src/sync/spsc_watch.rs @@ -4,9 +4,8 @@ use tokio_util::sync::CancellationToken; use crate::sync::spsc_fold; -pub fn channel(init: T) -> (Sender, Receiver) { +pub fn channel() -> (Sender, Receiver) { let (mut tx, rx) = spsc_fold::channel(); - poll_ready(tx.send(init, |_, _| unreachable!("init"))); let cancel = CancellationToken::new(); ( Sender { @@ -28,7 +27,7 @@ pub struct Receiver { } impl Sender { - pub fn send_replace(&mut self, value: T) -> Result<(), spsc_fold::SendError> { + pub fn send_replace(&mut self, value: T) -> Result<(), (T, spsc_fold::SendError)> { poll_ready(self.tx.send(value, |old, new| { *old = new; Ok(()) @@ -36,9 +35,9 @@ impl Sender { } } -impl Receiver { - pub async fn recv(&mut self) -> Result<(), spsc_fold::RecvError> { - todo!() +impl Receiver { + pub async fn recv(&mut self) -> Result { + self.rx.recv().await } pub async fn cancelled(&mut self) { self.cancel.cancelled().await diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 1119e39b17..475d2eeea4 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -201,7 +201,7 @@ pub(crate) struct Manager { pub(crate) wal_seg_size: usize, pub(crate) walsenders: Arc, pub(crate) wal_backup: Arc, - pub(crate) wal_advertiser: wal_advertiser::advmap::SafekeeperTimelineHandle, + pub(crate) wal_advertiser: wal_advertiser::SafekeeperTimelineHandle, // current state pub(crate) state_version_rx: tokio::sync::watch::Receiver, diff --git a/safekeeper/src/wal_advertiser.rs b/safekeeper/src/wal_advertiser.rs index 16ae869b9f..45d115e52f 100644 --- a/safekeeper/src/wal_advertiser.rs +++ b/safekeeper/src/wal_advertiser.rs @@ -1,27 +1,33 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc, time::Duration}; +use anyhow::Context; +use tracing::{Instrument, error, info, info_span}; use utils::{ - id::TenantTimelineId, + id::{NodeId, TenantTimelineId}, + lsn::Lsn, sync::{spsc_fold, spsc_watch}, }; use crate::{GlobalTimelines, SafeKeeperConf}; +type Advs = HashMap; + pub async fn task_main( conf: Arc, global_timelines: Arc, ) -> anyhow::Result<()> { let mut world = sk_ps_discovery::World::default(); - let mut senders: HashMap = HashMap::new(); + let mut senders: HashMap> = HashMap::new(); + let mut endpoints: HashMap = HashMap::new(); loop { let advertisements = world.get_commit_lsn_advertisements(); - for (node_id, advs) in advertisements { - loop { + for (node_id, mut advs) in advertisements { + 'inner: loop { let tx = senders.entry(node_id).or_insert_with(|| { - let (tx, rx) = spsc_watch::channel(advs); + let (tx, rx) = spsc_watch::channel(); tokio::spawn( - NodeTask { + PageserverTask { ps_id: node_id, advs: rx, } @@ -30,8 +36,11 @@ pub async fn task_main( ); tx }); - if let Err(err) = tx.send_modify(advs) { + if let Err((failed, err)) = tx.send_replace(advs) { senders.remove(&node_id); + advs = failed; + } else { + break 'inner; } } } @@ -40,14 +49,18 @@ pub async fn task_main( struct PageserverTask { ps_id: NodeId, - advs: spsc_watch::Receiver>, + endpoint: tonic::transport::Endpoint, + advs: spsc_watch::Receiver, } impl PageserverTask { /// Cancellation: happens through last PageserverHandle being dropped. async fn run(mut self) { - let mut current; loop { + let Ok(advs) = self.advs.recv().await else { + info!("main task gone, exiting"); + return; + }; let res = self.run0(advs).await; match res { Ok(()) => {} @@ -64,7 +77,10 @@ impl PageserverTask { use storage_broker::wal_advertisement::pageserver_client::PageserverClient; let stream = async_stream::stream! { loop { for (tenant_timeline_id, commit_lsn) in advs { - yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(tenant_timeline_id), commit_lsn: Some(commit_lsn) }; + yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(proto::TenantTimelineId { + tenant_id: tenant_timeline_id.tenant_id.as_ref().to_owned(), + timeline_id: tenant_timeline_id.timeline_id.as_ref().to_owned(), + }), commit_lsn: commit_lsn.0 }; } }}; let client: PageserverClient<_> = @@ -78,41 +94,38 @@ impl PageserverTask { } } -struct GlobalState {} +#[derive(Default)] +pub struct GlobalState {} -pub mod advmap { - use std::sync::Arc; +use utils::id::TenantId; - use utils::id::TenantId; +use crate::timeline::Timeline; - use crate::timeline::Timeline; +pub struct World {} +pub struct SafekeeperTimelineHandle {} - pub struct World {} - pub struct SafekeeperTimelineHandle {} - - impl World { - pub fn update_pageserver_attachments( - &self, - tenant_id: TenantId, - update: safekeeper_api::models::TenantShardPageserverAttachmentChange, - ) -> anyhow::Result<()> { - todo!() - } - pub fn register_timeline( - &self, - tli: Arc, - ) -> anyhow::Result { - todo!() - } +impl GlobalState { + pub fn update_pageserver_attachments( + &self, + tenant_id: TenantId, + update: safekeeper_api::models::TenantShardPageserverAttachmentChange, + ) -> anyhow::Result<()> { + todo!() } - impl SafekeeperTimelineHandle { - pub fn ready_for_eviction(&self) -> bool { - todo!() - } - } - impl Default for World { - fn default() -> Self { - todo!() - } + pub fn register_timeline( + &self, + tli: Arc, + ) -> anyhow::Result { + todo!() + } +} +impl SafekeeperTimelineHandle { + pub fn ready_for_eviction(&self) -> bool { + todo!() + } +} +impl Default for World { + fn default() -> Self { + todo!() } } diff --git a/storage_broker/proto/wal_advertisement.proto b/storage_broker/proto/wal_advertisement.proto index 1528dbcbd4..7a49b75f76 100644 --- a/storage_broker/proto/wal_advertisement.proto +++ b/storage_broker/proto/wal_advertisement.proto @@ -11,7 +11,7 @@ service Pageserver { message CommitLsnAdvertisement { TenantTimelineId tenant_timeline_id = 1; - + uint64 commit_lsn = 2; } message RemoteConsistentLsnAdvertisement { @@ -25,6 +25,5 @@ message RemoteConsistentLsnAdvertisement { message TenantTimelineId { bytes tenant_id = 1; bytes timeline_id = 2; - uint64 commit_lsn = 3; }