WIP advertisement sending

This commit is contained in:
Christian Schwarz
2025-06-08 20:10:30 -07:00
parent 9e7556bef2
commit 906a963351
5 changed files with 66 additions and 53 deletions

View File

@@ -56,7 +56,7 @@ impl<T: Send> Sender<T> {
/// # Panics
///
/// If `try_fold` panics, any subsequent call to `send` panic.
pub async fn send<F>(&mut self, value: T, try_fold: F) -> Result<(), SendError>
pub async fn send<F>(&mut self, value: T, try_fold: F) -> Result<(), (T, SendError)>
where
F: Fn(&mut T, T) -> Result<(), T>,
{
@@ -104,7 +104,9 @@ impl<T: Send> Sender<T> {
}
Poll::Pending
}
State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),
State::ReceiverGone => {
Poll::Ready(Err((value.take().unwrap(), SendError::ReceiverGone)))
}
State::SenderGone(_)
| State::AllGone
| State::SenderDropping

View File

@@ -4,9 +4,8 @@ use tokio_util::sync::CancellationToken;
use crate::sync::spsc_fold;
pub fn channel<T: Send>(init: T) -> (Sender<T>, Receiver<T>) {
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (mut tx, rx) = spsc_fold::channel();
poll_ready(tx.send(init, |_, _| unreachable!("init")));
let cancel = CancellationToken::new();
(
Sender {
@@ -28,7 +27,7 @@ pub struct Receiver<T> {
}
impl<T: Send> Sender<T> {
pub fn send_replace(&mut self, value: T) -> Result<(), spsc_fold::SendError> {
pub fn send_replace(&mut self, value: T) -> Result<(), (T, spsc_fold::SendError)> {
poll_ready(self.tx.send(value, |old, new| {
*old = new;
Ok(())
@@ -36,9 +35,9 @@ impl<T: Send> Sender<T> {
}
}
impl<T> Receiver<T> {
pub async fn recv(&mut self) -> Result<(), spsc_fold::RecvError> {
todo!()
impl<T: Send> Receiver<T> {
pub async fn recv(&mut self) -> Result<T, spsc_fold::RecvError> {
self.rx.recv().await
}
pub async fn cancelled(&mut self) {
self.cancel.cancelled().await

View File

@@ -201,7 +201,7 @@ pub(crate) struct Manager {
pub(crate) wal_seg_size: usize,
pub(crate) walsenders: Arc<WalSenders>,
pub(crate) wal_backup: Arc<WalBackup>,
pub(crate) wal_advertiser: wal_advertiser::advmap::SafekeeperTimelineHandle,
pub(crate) wal_advertiser: wal_advertiser::SafekeeperTimelineHandle,
// current state
pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,

View File

@@ -1,27 +1,33 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc, time::Duration};
use anyhow::Context;
use tracing::{Instrument, error, info, info_span};
use utils::{
id::TenantTimelineId,
id::{NodeId, TenantTimelineId},
lsn::Lsn,
sync::{spsc_fold, spsc_watch},
};
use crate::{GlobalTimelines, SafeKeeperConf};
type Advs = HashMap<TenantTimelineId, Lsn>;
pub async fn task_main(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let mut world = sk_ps_discovery::World::default();
let mut senders: HashMap<NodeId> = HashMap::new();
let mut senders: HashMap<utils::id::NodeId, spsc_watch::Sender<Advs>> = HashMap::new();
let mut endpoints: HashMap<utils::id::NodeId, tonic::transport::Endpoint> = HashMap::new();
loop {
let advertisements = world.get_commit_lsn_advertisements();
for (node_id, advs) in advertisements {
loop {
for (node_id, mut advs) in advertisements {
'inner: loop {
let tx = senders.entry(node_id).or_insert_with(|| {
let (tx, rx) = spsc_watch::channel(advs);
let (tx, rx) = spsc_watch::channel();
tokio::spawn(
NodeTask {
PageserverTask {
ps_id: node_id,
advs: rx,
}
@@ -30,8 +36,11 @@ pub async fn task_main(
);
tx
});
if let Err(err) = tx.send_modify(advs) {
if let Err((failed, err)) = tx.send_replace(advs) {
senders.remove(&node_id);
advs = failed;
} else {
break 'inner;
}
}
}
@@ -40,14 +49,18 @@ pub async fn task_main(
struct PageserverTask {
ps_id: NodeId,
advs: spsc_watch::Receiver<HashMap<TenantTimelineId, Lsn>>,
endpoint: tonic::transport::Endpoint,
advs: spsc_watch::Receiver<Advs>,
}
impl PageserverTask {
/// Cancellation: happens through last PageserverHandle being dropped.
async fn run(mut self) {
let mut current;
loop {
let Ok(advs) = self.advs.recv().await else {
info!("main task gone, exiting");
return;
};
let res = self.run0(advs).await;
match res {
Ok(()) => {}
@@ -64,7 +77,10 @@ impl PageserverTask {
use storage_broker::wal_advertisement::pageserver_client::PageserverClient;
let stream = async_stream::stream! { loop {
for (tenant_timeline_id, commit_lsn) in advs {
yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(tenant_timeline_id), commit_lsn: Some(commit_lsn) };
yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(proto::TenantTimelineId {
tenant_id: tenant_timeline_id.tenant_id.as_ref().to_owned(),
timeline_id: tenant_timeline_id.timeline_id.as_ref().to_owned(),
}), commit_lsn: commit_lsn.0 };
}
}};
let client: PageserverClient<_> =
@@ -78,41 +94,38 @@ impl PageserverTask {
}
}
struct GlobalState {}
#[derive(Default)]
pub struct GlobalState {}
pub mod advmap {
use std::sync::Arc;
use utils::id::TenantId;
use utils::id::TenantId;
use crate::timeline::Timeline;
use crate::timeline::Timeline;
pub struct World {}
pub struct SafekeeperTimelineHandle {}
pub struct World {}
pub struct SafekeeperTimelineHandle {}
impl World {
pub fn update_pageserver_attachments(
&self,
tenant_id: TenantId,
update: safekeeper_api::models::TenantShardPageserverAttachmentChange,
) -> anyhow::Result<()> {
todo!()
}
pub fn register_timeline(
&self,
tli: Arc<Timeline>,
) -> anyhow::Result<SafekeeperTimelineHandle> {
todo!()
}
impl GlobalState {
pub fn update_pageserver_attachments(
&self,
tenant_id: TenantId,
update: safekeeper_api::models::TenantShardPageserverAttachmentChange,
) -> anyhow::Result<()> {
todo!()
}
impl SafekeeperTimelineHandle {
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
impl Default for World {
fn default() -> Self {
todo!()
}
pub fn register_timeline(
&self,
tli: Arc<Timeline>,
) -> anyhow::Result<SafekeeperTimelineHandle> {
todo!()
}
}
impl SafekeeperTimelineHandle {
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
impl Default for World {
fn default() -> Self {
todo!()
}
}

View File

@@ -11,7 +11,7 @@ service Pageserver {
message CommitLsnAdvertisement {
TenantTimelineId tenant_timeline_id = 1;
uint64 commit_lsn = 2;
}
message RemoteConsistentLsnAdvertisement {
@@ -25,6 +25,5 @@ message RemoteConsistentLsnAdvertisement {
message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
uint64 commit_lsn = 3;
}