rip out the old wal_advertiser::advmap impl, stubbing out with todo!()s

Christian Schwarz
2025-06-06 17:40:59 -07:00
parent 04fb256e0f
commit 3dffbda428
10 changed files with 34 additions and 419 deletions

View File

@@ -25,7 +25,7 @@ use safekeeper::defaults::{
use safekeeper::wal_backup::WalBackup;
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser, wal_service,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri};

View File

@@ -18,7 +18,6 @@ use safekeeper_api::models::ConnectionId;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{Instrument, debug, info, info_span};
use utils::auth::{Claims, JwtAuth, Scope};
use utils::generation::{self, Generation};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::postgres_client::PostgresClientProtocol;

View File

@@ -5,7 +5,7 @@ pub use routes::make_router;
pub use safekeeper_api::models;
use tokio_util::sync::CancellationToken;
use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
use crate::{GlobalTimelines, SafeKeeperConf};
pub async fn task_main_http(
conf: Arc<SafeKeeperConf>,

View File

@@ -31,14 +31,12 @@ use tracing::{Instrument, info_span};
use utils::auth::SwappableJwtAuth;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use crate::debug_dump::TimelineDigestRequest;
use crate::safekeeper::TermLsn;
use crate::timelines_global_map::DeleteOrExclude;
use crate::{
GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
wal_advertiser,
};
/// Healthcheck handler.

View File

@@ -37,7 +37,6 @@ use crate::send_interpreted_wal::{
Batch, InterpretedWalReader, InterpretedWalReaderHandle, InterpretedWalSender,
};
use crate::timeline::WalResidentTimeline;
use crate::wal_advertiser;
use crate::wal_reader_stream::StreamingWalReader;
use crate::wal_storage::WalReader;

View File

@@ -24,7 +24,6 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::{NodeId, TenantId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use utils::sync::gate::Gate;
use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};

View File

@@ -2,7 +2,6 @@
//! All timelines should always be present in this map, this is done by loading them
//! all from the disk on startup and keeping them in memory.
use std::any;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
@@ -19,12 +18,10 @@ use tracing::*;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::http::routes::DeleteOrExcludeError;
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalAcceptor;
use crate::state::TimelinePersistentState;
use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
use crate::timelines_set::TimelinesSet;

View File

@@ -1,20 +1,36 @@
//! Advertise pending WAL to all pageservers that might be interested in it.
pub mod advmap {
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc, time::Duration};
use utils::id::TenantId;
use crate::{GlobalTimelines, SafeKeeperConf};
use crate::timeline::Timeline;
pub(crate) mod advmap;
pub struct World {}
pub struct SafekeeperTimelineHandle {}
pub(crate) async fn wal_advertiser_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
todo!();
node_loop().await;
Ok(())
}
pub(crate) async fn node_loop() {
loop {}
impl World {
pub fn update_pageserver_attachments(
&self,
tenant_id: TenantId,
update: safekeeper_api::models::TenantShardPageserverAttachmentChange,
) -> anyhow::Result<()> {
todo!()
}
pub fn register_timeline(
&self,
tli: Arc<Timeline>,
) -> anyhow::Result<SafekeeperTimelineHandle> {
todo!()
}
}
impl SafekeeperTimelineHandle {
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
impl Default for World {
fn default() -> Self {
todo!()
}
}
}

View File

@@ -1,393 +0,0 @@
//! The data structures that track advertisement state.
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map, hash_map},
sync::{Arc, Mutex, RwLock},
time::Duration,
};
use anyhow::Context;
use safekeeper_api::models::TenantShardPageserverAttachment;
use serde::Serialize;
use storage_broker::wal_advertisement::RemoteConsistentLsnAdvertisement;
use tokio::sync::{Notify, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use utils::{
generation::Generation,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::{AtomicLsn, Lsn},
shard::{ShardIndex, TenantShardId},
sync::gate::GateGuard,
};
use crate::timeline::Timeline;
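/// Root of the advertisement state: tracks, per tenant, the pageserver shard
/// attachments and the timelines that advertise WAL to them.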
#[derive(Default)]
pub struct World {
cancel: CancellationToken,
inner: RwLock<WorldInner>,
pageservers: Pageservers,
}
#[derive(Default)]
struct WorldInner {
tenants: HashMap<TenantId, SafekeeperTenant>,
}
#[derive(Default)]
struct Pageservers {
cancel: CancellationToken,
pageservers: HashMap<NodeId, PageserverHandle>,
}
impl Pageservers {
fn get_handle(&mut self, ps_id: NodeId) -> PageserverHandle {
match self.pageservers.entry(ps_id) {
hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
hash_map::Entry::Vacant(vacant_entry) => {
let notify_pending_advertisements = Arc::new(Notify::new());
let cancel = CancellationToken::new();
let state: Arc<Mutex<PageserverSharedState>> = Default::default();
tokio::spawn(
PageserverTask {
ps_id,
state: Arc::clone(&state),
notify_pending_advertisements: Arc::clone(&notify_pending_advertisements),
cancel: cancel.clone(),
pending_advertisements: HashMap::default(),
}
.run(),
);
// Cache the handle so later calls reuse the same task instead of spawning a new one.
vacant_entry
.insert(PageserverHandle {
state,
notify_pending_advertisements,
cancel_on_last_drop: Arc::new(CancelOnDrop(cancel)),
})
.clone()
}
}
}
}
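/// Background task, one per pageserver node: drains pending advertisements
/// and ships them to that pageserver over a single gRPC stream (see `run0`).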
struct PageserverTask {
ps_id: NodeId,
notify_pending_advertisements: Arc<tokio::sync::Notify>,
state: Arc<Mutex<PageserverSharedState>>,
cancel: CancellationToken,
pending_advertisements: HashMap<TenantTimelineId, Lsn>,
}
#[derive(Default)]
struct PageserverSharedState {
pending_advertisements: HashMap<TenantTimelineId, Lsn>,
/// Where to reach the pageserver; not populated yet, `connect` in [`PageserverTask::run0`] is still a `todo!()`.
endpoint: Option<tonic::transport::Endpoint>,
}
struct CommitLsnAdv {
ttid: TenantTimelineId,
commit_lsn: Lsn,
}
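/// Feedback from one pageserver shard attachment: its remote_consistent_lsn
/// for a single timeline.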
#[derive(Debug)]
struct RemoteConsistentLsnAdv {
tenant_id: TenantId,
shard_attachment: ShardAttachmentId,
timeline_id: TimelineId,
remote_consistent_lsn: Lsn,
}
#[derive(Clone)]
struct PageserverHandle {
state: Arc<Mutex<PageserverSharedState>>,
notify_pending_advertisements: Arc<Notify>,
/// Cancels the pageserver task when the *last* clone of this handle is
/// dropped, matching the cancellation contract on [`PageserverTask::run`].
cancel_on_last_drop: Arc<CancelOnDrop>,
}
struct CancelOnDrop(CancellationToken);
impl std::ops::Drop for CancelOnDrop {
fn drop(&mut self) {
self.0.cancel();
}
}
#[derive(Default)]
struct SafekeeperTenant {
/// Watch channel for this tenant's shard attachments; each timeline's
/// advertisement task subscribes to it in [`World::register_timeline`].
tenant_shard_attachments: tokio::sync::watch::Sender<TenantShardAttachments>,
timelines: HashMap<TimelineId, Arc<SafekeeperTimeline>>,
}
struct SafekeeperTimeline {
tli: Arc<Timeline>,
// NB: the shard-attachments watch::Receiver is handed to `run()` directly,
// so the task can poll it mutably while this struct lives in an Arc.
remote_consistent_lsns: RwLock<HashMap<ShardAttachmentId, AtomicLsn>>,
}
#[derive(Default)]
struct TenantShardAttachments {
pageservers: Pageservers,
precise: HashSet<ShardAttachmentId>,
/// projection from `precise`
nodes: HashMap<NodeId, (usize, PageserverHandle)>, // usize is a refcount from precise
}
impl TenantShardAttachments {
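// Invariant: `nodes` has an entry for every NodeId referenced by `precise`,
// with the refcount equal to the number of attachments on that node.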
pub fn add(&mut self, a: ShardAttachmentId) {
// Record the attachment; only bump the per-node refcount if it wasn't known yet.
if !self.precise.insert(a) {
return;
}
let (refcount, _) = self
.nodes
.entry(a.ps_id)
.or_insert_with(|| (0, self.pageservers.get_handle(a.ps_id)));
*refcount += 1;
}
pub fn remove(&mut self, a: ShardAttachmentId) {
let removed = self.precise.remove(&a);
if !removed {
return;
}
let mut entry = match self.nodes.entry(a.ps_id) {
hash_map::Entry::Vacant(_) => unreachable!("was referenced by precise"),
hash_map::Entry::Occupied(occupied_entry) => occupied_entry,
};
let (refcount, _) = entry.get_mut();
*refcount = refcount
.checked_sub(1)
.expect("was referenced by precise, we add refcount in add()");
if *refcount == 0 {
entry.remove();
}
}
pub fn nodes(&self) -> impl Iterator<Item = &PageserverHandle> {
self.nodes.values().map(|(_, h)| h)
}
}
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct ShardAttachmentId {
shard_id: ShardIndex,
generation: Generation,
ps_id: NodeId,
}
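/// Returned by [`World::register_timeline`]; the timeline uses it to decide
/// whether it may be evicted.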
pub struct SafekeeperTimelineHandle {}
impl SafekeeperTimelineHandle {
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
impl World {
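/// Register a timeline with the advertisement world and spawn its
/// per-timeline advertisement task.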
pub fn register_timeline(
&self,
tli: Arc<Timeline>,
) -> anyhow::Result<SafekeeperTimelineHandle> {
let ttid = tli.ttid;
let mut inner = self.inner.write().unwrap();
let sk_tenant: &mut SafekeeperTenant = inner.tenants.entry(ttid.tenant_id).or_default();
let vacant = match sk_tenant.timelines.entry(ttid.timeline_id) {
hash_map::Entry::Occupied(_) => {
anyhow::bail!("entry for timeline already exists");
}
hash_map::Entry::Vacant(vacant) => vacant,
};
let sk_timeline = Arc::new(SafekeeperTimeline {
tli,
remote_consistent_lsns: RwLock::new(HashMap::default()), // TODO: fill from persistence inside .run()?
});
// Subscribe before spawning: the task must not borrow `sk_tenant`, which
// lives under the `inner` lock, so it gets its own receiver and its own Arc.
tokio::spawn(Arc::clone(&sk_timeline).run(sk_tenant.tenant_shard_attachments.subscribe()));
vacant.insert(sk_timeline);
Ok(SafekeeperTimelineHandle {})
}
pub fn update_pageserver_attachments(
&self,
tenant_id: TenantId,
arg: safekeeper_api::models::TenantShardPageserverAttachmentChange,
) -> anyhow::Result<()> {
let mut inner = self.inner.write().unwrap();
// NB: must be lazy: `or_insert(todo!())` would evaluate (and panic) even when the tenant already exists.
let sk_tenant: &mut SafekeeperTenant = inner.tenants.entry(tenant_id).or_insert_with(|| todo!());
use safekeeper_api::models::TenantShardPageserverAttachmentChange::*;
// The attachment set lives in a watch channel; mutate it through `send_modify`
// so the subscribed timeline tasks observe the change.
sk_tenant.tenant_shard_attachments.send_modify(|shard_attachments| match arg {
Attach(a) => shard_attachments.add(a.into()),
Detach(a) => {
let a: ShardAttachmentId = a.into();
shard_attachments.remove(a);
for timeline in sk_tenant.timelines.values() {
let mut remote_consistent_lsns = timeline.remote_consistent_lsns.write().unwrap();
remote_consistent_lsns.remove(&a);
}
}
});
Ok(())
}
pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) {
debug!(?adv, "processing advertisement");
let RemoteConsistentLsnAdv {
tenant_id,
shard_attachment,
timeline_id,
remote_consistent_lsn,
} = &adv;
let inner = self.inner.read().unwrap();
let Some(sk_tenant) = inner.tenants.get(tenant_id) else {
warn!(?adv, "tenant shard attachment is not known");
return;
};
let Some(timeline) = sk_tenant.timelines.get(timeline_id) else {
warn!(?adv, "safekeeper timeline is not known");
return;
};
// in most cases, we already have a record of remote_consistent_lsn and can use shared access
{
let read_guard = timeline.remote_consistent_lsns.read().unwrap();
if let Some(current) = read_guard.get(shard_attachment) {
// likely case
if !(current.load() <= *remote_consistent_lsn) {
warn!(current=%current.load(), ?adv, "advertised remote_consistent_lsn is lower than earlier advertisements, either delayed message or shard is behaving inconsistently");
return;
}
current.fetch_max(*remote_consistent_lsn);
return;
}
}
// slow path: first advertisement for this shard attachment, take the write lock
let mut write_guard = timeline.remote_consistent_lsns.write().unwrap();
match write_guard.entry(*shard_attachment) {
// raced with a concurrent insert between the two locks; fold our value in
hash_map::Entry::Occupied(occupied_entry) => {
occupied_entry.get().fetch_max(*remote_consistent_lsn);
}
hash_map::Entry::Vacant(vacant_entry) => {
info!(?adv, "first time learning about timeline shard attachment");
vacant_entry.insert(AtomicLsn::new(remote_consistent_lsn.0));
}
}
}
}
impl SafekeeperTimeline {
async fn run(
self: Arc<Self>,
mut tenant_shard_attachments_rx: tokio::sync::watch::Receiver<TenantShardAttachments>,
) {
let Ok(gate_guard) = self.tli.gate.enter() else {
return;
};
let cancel = self.tli.cancel.child_token();
let ttid = self.tli.ttid;
let mut commit_lsn_rx = self.tli.get_commit_lsn_watch_rx();
// arm for first iteration
commit_lsn_rx.mark_changed();
tenant_shard_attachments_rx.mark_changed();
let mut tenant_shard_attachments: Vec<PageserverHandle> = Vec::new();
loop {
tokio::select! {
_ = cancel.cancelled() => { break; }
_ = async {
// at most one advertisement per second
tokio::time::sleep(Duration::from_secs(1)).await;
commit_lsn_rx.changed().await
} => { }
_ = tenant_shard_attachments_rx.changed() => {
tenant_shard_attachments.clear();
tenant_shard_attachments.extend(tenant_shard_attachments_rx.borrow_and_update().nodes().cloned());
tenant_shard_attachments.shrink_to_fit();
}
};
let commit_lsn = *commit_lsn_rx.borrow_and_update(); // The rhs deref minimizes the time we hold the watch lock
for pageserver_handle in &tenant_shard_attachments {
for (_, _rclsn) in self.remote_consistent_lsns.read().unwrap().iter() {}
// NB: if this function ever becomes slow / may need an .await, make sure
// that other nodes continue to receive advertisements.
pageserver_handle.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn });
}
}
drop(gate_guard);
}
}
impl PageserverHandle {
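/// Queue a commit_lsn advertisement for this pageserver; the background task
/// is woken through the notify and ships it asynchronously.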
pub fn advertise_commit_lsn(&self, adv: CommitLsnAdv) {
let CommitLsnAdv { ttid, commit_lsn } = adv;
let mut state = self.state.lock().unwrap();
state.pending_advertisements.insert(ttid, commit_lsn);
self.notify_pending_advertisements.notify_waiters();
}
}
impl PageserverTask {
/// Cancellation: happens through last PageserverHandle being dropped.
async fn run(mut self) {
loop {
tokio::select! {
_ = self.cancel.cancelled() => {
return;
}
res = self.run0() => {
if let Err(err) = res {
error!(?err, "pageserver loop failed");
// TODO: backoff? + cancellation sensitivity
tokio::time::sleep(Duration::from_secs(10)).await;
}
continue;
}
};
}
}
async fn run0(&mut self) -> anyhow::Result<()> {
use storage_broker::wal_advertisement::pageserver_client::PageserverClient;
use storage_broker::wal_advertisement as proto;
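// Batch pending advertisements: wait until some exist, take them by swapping
// maps under the lock, then yield each one into the request stream.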
let stream = async_stream::stream! { loop {
while self.pending_advertisements.is_empty() {
tokio::select! {
_ = self.cancel.cancelled() => {
return;
}
_ = self.notify_pending_advertisements.notified() => {}
}
let mut state = self.state.lock().unwrap();
std::mem::swap(
&mut state.pending_advertisements,
&mut self.pending_advertisements,
);
}
for (tenant_timeline_id, commit_lsn) in self.pending_advertisements.drain() {
yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(tenant_timeline_id), commit_lsn: Some(commit_lsn) };
}
} };
// TODO: connect using the endpoint (presumably `PageserverSharedState::endpoint`).
let endpoint: tonic::transport::Endpoint = todo!();
let mut client: PageserverClient<_> = PageserverClient::connect(endpoint)
.await
.context("connect")?;
let _publish_stream = client
.publish_commit_lsn_advertisements(stream)
.await
.context("publish stream")?;
// TODO: drive the response stream to completion.
Ok(())
}
}
impl From<safekeeper_api::models::TenantShardPageserverAttachment> for ShardAttachmentId {
fn from(value: safekeeper_api::models::TenantShardPageserverAttachment) -> Self {
let safekeeper_api::models::TenantShardPageserverAttachment {
shard_id,
generation,
ps_id,
} = value;
ShardAttachmentId {
shard_id,
generation,
ps_id,
}
}
}

View File

@@ -18,7 +18,7 @@ use utils::measured_stream::MeasuredStream;
use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::TrafficMetrics;
use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
use crate::{GlobalTimelines, SafeKeeperConf};
/// Accept incoming TCP connections and spawn them into a background thread.
///