WIP integrate

This commit is contained in:
Christian Schwarz
2025-06-06 18:36:16 -07:00
parent 3dffbda428
commit 4f4214eea3
12 changed files with 195 additions and 9 deletions

1
Cargo.lock generated
View File

@@ -6064,6 +6064,7 @@ dependencies = [
"serde",
"serde_json",
"sha2",
"sk_ps_discovery",
"smallvec",
"storage_broker",
"strum",

View File

@@ -262,6 +262,7 @@ pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
safekeeper_client = { path = "./safekeeper/client" }
sk_ps_discovery = { path = "./libs/sk_ps_discovery" }
desim = { version = "0.1", path = "./libs/desim" }
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
storage_controller_client = { path = "./storage_controller/client" }

View File

@@ -97,6 +97,7 @@ pub mod elapsed_accum;
pub mod merge_join;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;

View File

@@ -4,3 +4,5 @@ pub mod duplex;
pub mod gate;
pub mod spsc_fold;
pub mod spsc_watch;

View File

@@ -0,0 +1,61 @@
//! Like [`tokio::sync::watch`] but the Sender gets an error when the Receiver is gone.
//!
//! TODO: an actually more efficient implementation
use tokio_util::sync::CancellationToken;
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = tokio::sync::watch::channel(init);
let cancel = CancellationToken::new();
(
Sender {
tx,
cancel: cancel.clone().drop_guard(),
},
Receiver { rx, cancel },
)
}
pub struct Sender<T> {
tx: tokio::sync::watch::Sender<T>,
cancel: tokio_util::sync::DropGuard,
}
pub struct Receiver<T> {
rx: tokio::sync::watch::Receiver<T>,
cancel: CancellationToken,
}
pub enum RecvError {
SenderGone,
}
pub enum SendError {
ReceiverGone,
}
impl<T> Sender<T> {
pub fn send_replace(&mut self, value: T) -> Result<(), (T, SendError)> {
if self.tx.receiver_count() == 0 {
// we don't provide outside access to tx, so, we know the only
// rx that is ever going to exist is gone
return Err((value, SendError::ReceiverGone));
}
self.tx.send_replace(value);
Ok(())
}
}
impl<T> Receiver<T> {
pub async fn changed(&mut self) -> Result<(), RecvError> {
match self.rx.changed().await {
Ok(()) => Ok(self.rx.borrow()),
Err(e) => Err(RecvError::SenderGone),
}
}
pub fn borrow(&self) -> impl Deref<Target = T> {
self.rx.borrow()
}
pub async fn closed(&mut self) {
self.cancel.cancelled().await
}
}

View File

@@ -63,6 +63,7 @@ pq_proto.workspace = true
remote_storage.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
sk_ps_discovery.workspace = true
sha2.workspace = true
sd-notify.workspace = true
storage_broker.workspace = true

View File

@@ -25,10 +25,11 @@ use safekeeper::defaults::{
use safekeeper::wal_backup::WalBackup;
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
WAL_ADVERTISER_RUNTIME, WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser,
wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri};
use storage_broker::{DEFAULT_ENDPOINT, Uri, wal_advertisement};
use tokio::runtime::Handle;
use tokio::signal::unix::{SignalKind, signal};
use tokio::task::JoinError;
@@ -626,6 +627,16 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.map(|res| ("broker main".to_owned(), res));
tasks_handles.push(Box::pin(broker_task_handle));
let wal_advertiser_task_handle = current_thread_rt
.as_ref()
.unwrap_or_else(WAL_ADVERTISER_RUNTIME.handle())
.spawn(
wal_advertiser::task_main(conf.clone(), global_timelines.clone())
.instrument(info_span!("wal_advertiser_main")),
)
.map(|res| ("wal advertiser task handle".to_owned(), res));
tasks_handles.push(Box::pin(wal_advertiser_task_handle));
set_build_info_metric(GIT_VERSION, BUILD_TAG);
// TODO: update tokio-stream, convert to real async Stream with

View File

@@ -199,6 +199,14 @@ pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.expect("Failed to create broker runtime")
});
pub static WAL_ADVERTISER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("wal advertiser worker")
.enable_all()
.build()
.expect("Failed to create broker runtime")
});
pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("WAL backup worker")

View File

@@ -549,7 +549,7 @@ impl Timeline {
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::advmap::World>,
wal_advertiser: Arc<wal_advertiser::GlobalState>,
) {
let (tx, rx) = self.manager_ctl.bootstrap_manager();

View File

@@ -241,7 +241,7 @@ pub async fn main_task(
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::advmap::World>,
wal_advertiser: Arc<wal_advertiser::GlobalState>,
) {
tli.set_status(Status::Started);
@@ -423,7 +423,7 @@ impl Manager {
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::advmap::World>,
wal_advertiser: Arc<wal_advertiser::GlobalState>,
) -> Manager {
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
Manager {

View File

@@ -47,7 +47,7 @@ struct GlobalTimelinesState {
conf: Arc<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
wal_advertisement: Arc<wal_advertiser::advmap::World>,
wal_advertisement: Arc<wal_advertiser::GlobalState>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
}
@@ -61,7 +61,7 @@ impl GlobalTimelinesState {
Arc<TimelinesSet>,
RateLimiter,
Arc<WalBackup>,
Arc<wal_advertiser::advmap::World>,
Arc<wal_advertiser::GlobalState>,
) {
(
self.conf.clone(),
@@ -104,7 +104,7 @@ impl GlobalTimelines {
tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
wal_advertisement: Arc::new(wal_advertiser::advmap::World::default()),
wal_advertisement: Arc::new(wal_advertiser::GlobalState::default()),
global_rate_limiter: RateLimiter::new(1, 1),
wal_backup,
}),
@@ -597,7 +597,7 @@ impl GlobalTimelines {
Ok(deleted)
}
pub fn get_wal_advertiser(&self) -> Arc<wal_advertiser::advmap::World> {
pub fn get_wal_advertiser(&self) -> Arc<wal_advertiser::GlobalState> {
self.state.lock().unwrap().wal_advertisement.clone()
}

View File

@@ -1,3 +1,103 @@
use std::collections::HashMap;
use utils::{
id::TenantTimelineId,
sync::{spsc_fold, spsc_watch},
};
use crate::{GlobalTimelines, SafeKeeperConf};
pub async fn task_main(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let mut world = sk_ps_discovery::World::default();
let mut senders: HashMap<NodeId> = HashMap::new();
loop {
let advertisements = world.get_commit_lsn_advertisements();
for (node_id, advs) in advertisements {
loop {
let tx = senders.entry(node_id).or_insert_with(|| {
let (tx, rx) = utils::sync::spsc_fold::channel();
tokio::spawn(
NodeTask {
ps_id: node_id,
advs: rx,
}
.run()
.instrument(info_span!("wal_advertiser", ps_id=%node_id)),
);
tx
});
if let Err(err) = tx.send_modify(advs) {
senders.remove(&node_id);
}
}
}
}
}
struct PageserverTask {
ps_id: NodeId,
advs: spsc_fold::Receiver<HashMap<TenantTimelineId, Lsn>>,
}
impl PageserverTask {
/// Cancellation: happens through last PageserverHandle being dropped.
async fn run(mut self) {
loop {
let Ok(advs) = self.advs.recv().await else {
return;
};
tokio::select! {
_ = self.advs.cancelled() => {
return;
}
res = self.run0() => {
if let Err(err) = res {
error!(?err, "failure sending advertisements, restarting after back-off");
// TODO: backoff? + cancellation sensitivity
tokio::time::sleep(Duration::from_secs(10)).await;
}
continue;
}
};
}
}
async fn run0(&mut self) -> anyhow::Result<()> {
use storage_broker::wal_advertisement as proto;
use storage_broker::wal_advertisement::pageserver_client::PageserverClient;
let stream = async_stream::stream! { loop {
while self.pending_advertisements.is_empty() {
tokio::select! {
_ = self.advs.cancelled() => {
return;
}
_ = self.notify_pending_advertisements.notified() => {}
}
let mut state = self.state.lock().unwrap();
std::mem::swap(
&mut state.pending_advertisements,
&mut self.pending_advertisements,
);
}
for (tenant_timeline_id, commit_lsn) in self.pending_advertisements.drain() {
yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(tenant_timeline_id), commit_lsn: Some(commit_lsn) };
}
} };
let client: PageserverClient<_> = PageserverClient::connect(todo!())
.await
.context("connect")?;
let publish_stream = client
.publish_commit_lsn_advertisements(stream)
.await
.context("publish stream")?;
}
}
struct GlobalState {}
pub mod advmap {
use std::sync::Arc;