This commit is contained in:
Christian Schwarz
2025-06-12 06:41:07 -07:00
parent 906a963351
commit a80790e0c4
13 changed files with 301 additions and 87 deletions

1
Cargo.lock generated
View File

@@ -6067,6 +6067,7 @@ dependencies = [
"sk_ps_discovery",
"smallvec",
"storage_broker",
"storage_controller_client",
"strum",
"strum_macros",
"thiserror 1.0.69",

View File

@@ -333,4 +333,7 @@ pub struct TenantShardPageserverAttachment {
pub shard_id: ShardIndex,
pub generation: Generation,
pub ps_id: NodeId,
// TODO: avoid transmitting this with every request.
// How nice things could be if there were simple DNS records for ps-$node_id.$cell.$region.$cloud.neon.tech
pub ps_hostname: String, // TODO: some type safety
}

View File

@@ -5,12 +5,12 @@ use tokio_util::sync::CancellationToken;
use crate::sync::spsc_fold;
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (mut tx, rx) = spsc_fold::channel();
let (tx, rx) = spsc_fold::channel();
let cancel = CancellationToken::new();
(
Sender {
tx,
cancel: cancel.clone().drop_guard(),
_cancel: cancel.clone().drop_guard(),
},
Receiver { rx, cancel },
)
@@ -18,7 +18,7 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
pub struct Sender<T> {
tx: spsc_fold::Sender<T>,
cancel: tokio_util::sync::DropGuard,
_cancel: tokio_util::sync::DropGuard,
}
pub struct Receiver<T> {

View File

@@ -67,6 +67,7 @@ sk_ps_discovery.workspace = true
sha2.workspace = true
sd-notify.workspace = true
storage_broker.workspace = true
storage_controller_client.workspace = true
tokio-stream.workspace = true
http-utils.workspace = true
utils.workspace = true

View File

@@ -25,11 +25,10 @@ use safekeeper::defaults::{
use safekeeper::wal_backup::WalBackup;
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_ADVERTISER_RUNTIME, WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser,
wal_service,
WAL_ADVERTISER_RUNTIME, WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri, wal_advertisement};
use storage_broker::{DEFAULT_ENDPOINT, Uri};
use tokio::runtime::Handle;
use tokio::signal::unix::{SignalKind, signal};
use tokio::task::JoinError;
@@ -627,11 +626,25 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.map(|res| ("broker main".to_owned(), res));
tasks_handles.push(Box::pin(broker_task_handle));
let ps_connectivity_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| HTTP_RUNTIME.handle())
.spawn(
global_timelines
.get_pageserver_connectivity()
.task_main()
.instrument(info_span!("pageserver_connectivity")),
)
.map(|res| ("pageserver connectivity".to_owned(), res));
tasks_handles.push(Box::pin(ps_connectivity_handle));
let wal_advertiser_task_handle = current_thread_rt
.as_ref()
.unwrap_or_else(WAL_ADVERTISER_RUNTIME.handle())
.unwrap_or_else(|| WAL_ADVERTISER_RUNTIME.handle())
.spawn(
wal_advertiser::task_main(conf.clone(), global_timelines.clone())
global_timelines
.get_wal_advertiser()
.task_main()
.instrument(info_span!("wal_advertiser_main")),
)
.map(|res| ("wal advertiser task handle".to_owned(), res));

View File

@@ -44,6 +44,7 @@ pub mod wal_backup_partial;
pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;
pub(crate) mod pageserver_connectivity;
#[cfg(any(test, feature = "benchmarking"))]
pub mod test_utils;
@@ -124,6 +125,7 @@ pub struct SafeKeeperConf {
pub ssl_ca_certs: Vec<Pem>,
pub use_https_safekeeper_api: bool,
pub enable_tls_wal_service_api: bool,
pub storage_controller_api: Option<Uri>,
}
impl SafeKeeperConf {
@@ -169,6 +171,7 @@ impl SafeKeeperConf {
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
storage_controller_api: None,
}
}
}

View File

@@ -0,0 +1,117 @@
use desim::world::Node;
use hyper::Uri;
use pageserver_api::controller_api;
use utils::id::TenantId;
use crate::timeline::Timeline;
use std::{
collections::{HashMap, hash_map},
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use anyhow::Context;
use tracing::{Instrument, error, info, info_span, warn};
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
sync::{spsc_fold, spsc_watch},
};
use crate::{GlobalTimelines, SafeKeeperConf};
type Advs = HashMap<TenantTimelineId, Lsn>;
#[derive(Default)]
pub struct GlobalState {
inner: once_cell::sync::OnceCell<tokio::sync::mpsc::Sender<Message>>,
}
enum Message {
Resolve {
ps_id: NodeId,
reply: tokio::sync::oneshot::Sender<tokio::sync::watch::Receiver<hyper::Uri>>,
},
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("cancelled")]
Cancelled,
}
impl GlobalState {
pub fn task_main(&self) -> impl 'static + Future<Output = anyhow::Result<()>> + Send {
let mut ret = None;
self.inner.get_or_init(|| {
let (tx, task_fut) = MainTask::prepare_run();
ret = Some(task_fut);
tx
});
ret.expect("must only call this method once")
}
}
struct MainTask {
rx: tokio::sync::mpsc::Receiver<Message>,
}
impl MainTask {
fn prepare_run() -> (
tokio::sync::mpsc::Sender<Message>,
impl Future<Output = anyhow::Result<()>> + Send,
) {
let (tx, rx) = tokio::sync::mpsc::channel(100 /* TODO think */);
let task = MainTask { rx };
(tx, task.task())
}
async fn task(mut self) -> anyhow::Result<()> {
// TODO: persistence
let storcon_client = todo!();
let mut resolution: HashMap<NodeId, tokio::sync::watch::Sender<hyper::Uri>> =
HashMap::new();
while let Some(rx) = self.rx.recv().await {
match rx {
Message::Resolve { ps_id, reply } => match resolution.entry(ps_id) {
hash_map::Entry::Occupied(e) => {}
hash_map::Entry::Vacant(e) => {
tokio::spawn(
ResolutionTask { ps_id, storcon_client }.run()
)
},
},
}
}
}
}
struct ResolutionTask {
ps_id: NodeId,
storcon_client: storage_controller_client::control_api::Client,
}
impl ResolutionTask {
pub async fn run(self) -> Result<Uri, Error> {
loop {
// XXX: well-defined upcall API?
let res = self
.storcon_client
.dispatch(
reqwest::Method::GET,
format!("control/v1/node/{}", self.node_id),
None,
)
.await;
let node: NodeDescribeResponse = match res {
Ok(res) => res,
Err(err) => {
warn!("storcon upcall failed")
}
};
}
}
}

View File

@@ -433,7 +433,7 @@ impl Manager {
state_version_rx: tli.get_state_version_rx(),
num_computes_rx: tli.get_walreceivers().get_num_rx(),
tli_broker_active: broker_active_set.guard(tli.clone()),
wal_advertiser: wal_advertiser.register_timeline(tli.clone()).unwrap(),
wal_advertiser: wal_advertiser.new_timeline(tli.clone()).await.unwrap(),
last_removed_segno: 0,
is_offloaded,
backup_task: None,

View File

@@ -27,7 +27,7 @@ use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_t
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::WalBackup;
use crate::wal_storage::Storage;
use crate::{SafeKeeperConf, control_file, wal_advertiser, wal_storage};
use crate::{SafeKeeperConf, control_file, pageserver_connectivity, wal_advertiser, wal_storage};
// Timeline entry in the global map: either a ready timeline, or mark that it is
// being created.
@@ -48,6 +48,7 @@ struct GlobalTimelinesState {
conf: Arc<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
wal_advertisement: Arc<wal_advertiser::GlobalState>,
pageserver_connectivity: Arc<pageserver_connectivity::GlobalState>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
}
@@ -105,6 +106,7 @@ impl GlobalTimelines {
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
wal_advertisement: Arc::new(wal_advertiser::GlobalState::default()),
pageserver_connectivity: Arc::new(pageserver_connectivity::GlobalState::default()),
global_rate_limiter: RateLimiter::new(1, 1),
wal_backup,
}),
@@ -601,6 +603,10 @@ impl GlobalTimelines {
self.state.lock().unwrap().wal_advertisement.clone()
}
pub fn get_pageserver_connectivity(&self) -> Arc<pageserver_connectivity::GlobalState> {
self.state.lock().unwrap().pageserver_connectivity.clone()
}
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
let mut state = self.state.lock().unwrap();

View File

@@ -1,7 +1,18 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
mod persistence;
mod pageserver_connectivity;
use utils::id::TenantId;
use crate::timeline::Timeline;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use anyhow::Context;
use tracing::{Instrument, error, info, info_span};
use tracing::{Instrument, error, info, info_span, warn};
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
@@ -12,44 +23,139 @@ use crate::{GlobalTimelines, SafeKeeperConf};
type Advs = HashMap<TenantTimelineId, Lsn>;
pub async fn task_main(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let mut world = sk_ps_discovery::World::default();
#[derive(Default)]
pub struct GlobalState {
inner: once_cell::sync::OnceCell<tokio::sync::mpsc::Sender<Message>>,
}
let mut senders: HashMap<utils::id::NodeId, spsc_watch::Sender<Advs>> = HashMap::new();
let mut endpoints: HashMap<utils::id::NodeId, tonic::transport::Endpoint> = HashMap::new();
loop {
let advertisements = world.get_commit_lsn_advertisements();
for (node_id, mut advs) in advertisements {
'inner: loop {
let tx = senders.entry(node_id).or_insert_with(|| {
let (tx, rx) = spsc_watch::channel();
tokio::spawn(
PageserverTask {
ps_id: node_id,
advs: rx,
}
.run()
.instrument(info_span!("wal_advertiser", ps_id=%node_id)),
);
tx
});
if let Err((failed, err)) = tx.send_replace(advs) {
senders.remove(&node_id);
advs = failed;
} else {
break 'inner;
pub struct SafekeeperTimelineHandle {
tx: tokio::sync::mpsc::Sender<Message>,
}
enum Message {
NewTimeline {
reply: tokio::sync::oneshot::Sender<Result<(), Error>>,
},
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("cancelled")]
Cancelled,
}
impl GlobalState {
pub fn task_main(&self) -> impl 'static + Future<Output = anyhow::Result<()>> + Send {
let mut ret = None;
self.inner.get_or_init(|| {
let (tx, task_fut) = MainTask::prepare_run();
ret = Some(task_fut);
tx
});
ret.expect("must only call this method once")
}
pub async fn new_timeline(
&self,
tli: Arc<Timeline>,
) -> Result<SafekeeperTimelineHandle, Error> {
let tx = self.inner.get().unwrap().clone();
let handle = SafekeeperTimelineHandle { tx };
let (reply, rx) = tokio::sync::oneshot::channel();
let Ok(()) = handle.tx.send(Message::NewTimeline { reply }).await else {
return Err(Error::Cancelled);
};
let Ok(res) = rx.await else {
return Err(Error::Cancelled);
};
Ok(handle)
}
pub fn update_pageserver_attachments(
&self,
tenant_id: TenantId,
update: safekeeper_api::models::TenantShardPageserverAttachmentChange,
) -> anyhow::Result<()> {
todo!()
}
}
impl SafekeeperTimelineHandle {
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
struct MainTask {
rx: tokio::sync::mpsc::Receiver<Message>,
world: sk_ps_discovery::World,
senders: HashMap<utils::id::NodeId, spsc_watch::Sender<Advs>>,
}
impl MainTask {
fn prepare_run() -> (
tokio::sync::mpsc::Sender<Message>,
impl Future<Output = anyhow::Result<()>> + Send,
) {
let (tx, rx) = tokio::sync::mpsc::channel(100 /* TODO think */);
let task = MainTask {
rx,
world: sk_ps_discovery::World::default(),
senders: Default::default(),
};
(tx, task.task())
}
async fn task(mut self) -> anyhow::Result<()> {
let mut adv_frequency = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = adv_frequency.tick() => {
let start = Instant::now();
self.advertisements_iteration();
let elapsed = start.elapsed();
if elapsed > Duration::from_millis(10) {
warn!(?elapsed, "advertisements iteration is slow");
}
},
message = self.rx.recv() => {
match message {
None => anyhow::bail!("last main task sender dropped, shouldn't happen, exiting"),
Some(_) => todo!(),
}
},
}
}
}
fn advertisements_iteration(&mut self) {
loop {
let advertisements = self.world.get_commit_lsn_advertisements();
for (node_id, mut advs) in advertisements {
'inner: loop {
let tx = self.senders.entry(node_id).or_insert_with(|| {
let (tx, rx) = spsc_watch::channel();
tokio::spawn(
PageserverTask {
ps_id: node_id,
endpoint: todo!(),
advs: rx,
}
.run()
.instrument(info_span!("wal_advertiser", ps_id=%node_id)),
);
tx
});
if let Err((failed, err)) = tx.send_replace(advs) {
self.senders.remove(&node_id);
advs = failed;
} else {
break 'inner;
}
}
}
}
}
}
struct PageserverTask {
ps_id: NodeId,
endpoint: tonic::transport::Endpoint,
advs: spsc_watch::Receiver<Advs>,
}
@@ -75,57 +181,21 @@ impl PageserverTask {
async fn run0(&mut self, advs: HashMap<TenantTimelineId, Lsn>) -> anyhow::Result<()> {
use storage_broker::wal_advertisement as proto;
use storage_broker::wal_advertisement::pageserver_client::PageserverClient;
let stream = async_stream::stream! { loop {
let stream = async_stream::stream! {
for (tenant_timeline_id, commit_lsn) in advs {
yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(proto::TenantTimelineId {
tenant_id: tenant_timeline_id.tenant_id.as_ref().to_owned(),
timeline_id: tenant_timeline_id.timeline_id.as_ref().to_owned(),
}), commit_lsn: commit_lsn.0 };
}
}};
let client: PageserverClient<_> =
PageserverClient::connect(todo!("how do we learn pageserver hostnames?"))
.await
.context("connect")?;
};
let mut client: PageserverClient<_> = PageserverClient::connect(self.endpoint.clone())
.await
.context("connect")?;
let publish_stream = client
.publish_commit_lsn_advertisements(stream)
.await
.context("publish stream")?;
}
}
#[derive(Default)]
pub struct GlobalState {}
use utils::id::TenantId;
use crate::timeline::Timeline;
pub struct World {}
pub struct SafekeeperTimelineHandle {}
impl GlobalState {
pub fn update_pageserver_attachments(
&self,
tenant_id: TenantId,
update: safekeeper_api::models::TenantShardPageserverAttachmentChange,
) -> anyhow::Result<()> {
todo!()
}
pub fn register_timeline(
&self,
tli: Arc<Timeline>,
) -> anyhow::Result<SafekeeperTimelineHandle> {
todo!()
}
}
impl SafekeeperTimelineHandle {
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
impl Default for World {
fn default() -> Self {
todo!()
Ok(())
}
}

View File

@@ -35,14 +35,14 @@ message SafekeeperTimelineInfo {
// LSN of the last record.
uint64 flush_lsn = 4;
// Up to which LSN safekeeper regards its WAL as committed.
uint64 commit_lsn = 5;
uint64 commit_lsn = 5; // yes
// LSN up to which safekeeper has backed WAL.
uint64 backup_lsn = 6;
// LSN of last checkpoint uploaded by pageserver.
uint64 remote_consistent_lsn = 7;
uint64 peer_horizon_lsn = 8;
uint64 local_start_lsn = 9;
uint64 standby_horizon = 14;
uint64 standby_horizon = 14; // yes
// A connection string to use for WAL receiving.
string safekeeper_connstr = 10;
// HTTP endpoint connection string.

View File

@@ -232,7 +232,7 @@ impl DeliveryAttempt {
generation: Generation::new(self.work.ps_generation as u32),
};
match self.work.intent_state.as_str() {
"attached" => TenantShardPageserverAttachmentChange::Attach(val),
"attached" => TenantShardPageserverAttachmentChange::Attach { field1: val },
"detached" => TenantShardPageserverAttachmentChange::Detach(val),
x => anyhow::bail!("unknown intent state {x:?}"),
}