This commit is contained in:
Christian Schwarz
2025-06-03 19:45:19 +02:00
parent 36ba2b8e44
commit 5efb0d8072
2 changed files with 57 additions and 26 deletions

View File

@@ -9,13 +9,14 @@ use std::{
use anyhow::Context;
use safekeeper_api::models::TenantShardPageserverAttachment;
use serde::Serialize;
use storage_broker::wal_advertisement::RemoteConsistentLsnAdvertisement;
use tokio::sync::{Notify, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use utils::{
generation::Generation,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
lsn::{AtomicLsn, Lsn},
shard::{ShardIndex, TenantShardId},
sync::gate::GateGuard,
};
@@ -45,19 +46,20 @@ impl Pageservers {
match self.pageservers.entry(ps_id) {
hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
hash_map::Entry::Vacant(vacant_entry) => {
let notify = Arc::new(Notify::new());
let notify_pending_advertisements = Arc::new(Notify::new());
let cancel = CancellationToken::new();
let state = Default::default();
tokio::spawn(
PageserverTask {
ps_id,
state: state.clone(),
notify: Arc::clone(&notify),
notify_pending_advertisements: Arc::clone(&notify_pending_advertisements),
cancel: cancel.clone(),
pending_advertisements: HashMap::default(),
}
.run(),
);
PageserverHandle { state, cancel }
PageserverHandle { state, cancel, notify_pending_advertisements }
}
}
}
@@ -65,8 +67,8 @@ impl Pageservers {
struct PageserverTask {
ps_id: NodeId,
notify_pending_advertisements: Arc<tokio::sync::Notify>,
state: Arc<Mutex<PageserverSharedState>>,
notify: Arc<tokio::sync::Notify>,
cancel: CancellationToken,
pending_advertisements: HashMap<TenantTimelineId, Lsn>,
}
@@ -93,6 +95,7 @@ struct RemoteConsistentLsnAdv {
#[derive(Clone)]
struct PageserverHandle {
state: Arc<RwLock<PageserverSharedState>>,
notify_pending_advertisements: Arc<Notify>,
cancel: CancellationToken,
}
@@ -113,7 +116,7 @@ struct SafekeeperTimeline {
tli: Arc<Timeline>,
/// Shared with [`SafekeeperTenant::tenant_shard_attachments`].
tenant_shard_attachments: tokio::sync::watch::Receiver<TenantShardAttachments>,
remote_consistent_lsns: Mutex<HashMap<ShardAttachmentId, Lsn>>,
remote_consistent_lsns: RwLock<HashMap<ShardAttachmentId, AtomicLsn>>,
}
#[derive(Default)]
@@ -203,9 +206,16 @@ impl World {
let sk_tenant: &mut SafekeeperTenant = inner.tenants.entry(tenant_id).or_insert(todo!());
let mut shard_attachments = sk_tenant.tenant_shard_attachments.write().unwrap();
use safekeeper_api::models::TenantShardPageserverAttachmentChange::*;
let change = match arg {
match arg {
Attach(a) => shard_attachments.add(a.into()),
Detach(a) => shard_attachments.remove(a.into()),
Detach(a) => {
let a: ShardAttachmentId = a.into();
shard_attachments.remove(a);
for timeline in sk_tenant.timelines.values() {
let remote_consistent_lsns = timeline.remote_consistent_lsns.write().unwrap();
remote_consistent_lsns.remove(a)
}
}
};
Ok(())
}
@@ -226,28 +236,40 @@ impl World {
warn!(?adv, "safekeeper timeline is not know");
return;
};
if cfg!(feature = "testing") {
let commit_lsn = *timeline.tli.get_commit_lsn_watch_rx().borrow();
if !(*remote_consistent_lsn <= commit_lsn) {
warn!(
?adv,
"advertised remote_consistent_lsn is ahead of commit_lsn"
);
return;
}
}
let current = match timeline.remote_consistent_lsns.entry(shard_attachment) {
hash_map::Entry::Occupied(mut occupied_entry) => occupied_entry.get_mut(),
hash_map::Entry::Vacant(vacant_entry) => {
info!(?adv, "first time learning about timeline shard attachment");
vacant_entry.insert(remote_consistent_lsn)
// in most cases, we already have a record of remote_consistent_lsn and can use shared access
let remote_consistent_lsns = loop {
let read_guard = timeline.remote_consistent_lsns.read().uwnrap();
match read_guard.get(shard_attachment) {
Some(existent) => break existent, // likely case
None => {
drop(read_guard);
let write_guard = timeline.remote_consistent_lsns.write().unwrap();
write_guard.entry(*shard_attachment).or_insert(AtomicLsn::new(remote_consistent_lsn.0));
drop(write_guard);
// we still hold the tenant in write mode, hence
}
}
};
if !(*current <= remote_consistent_lsn) {
read_guard.entry(shard_attachment) {
hash_map::Entry::Occupied(mut occupied_entry) => occupied_entry.get(),
hash_map::Entry::Vacant(vacant_entry) => {
info!(?adv, "first time learning about timeline shard attachment");
vacant_entry.insert(AtomicLsn::new(remote_consistent_lsn.0))
}
};
let current = match timeline.remote_consistent_lsns.read().unwrap().entry(shard_attachment) {
hash_map::Entry::Occupied(mut occupied_entry) => occupied_entry.get(),
hash_map::Entry::Vacant(vacant_entry) => {
info!(?adv, "first time learning about timeline shard attachment");
vacant_entry.insert(AtomicLsn::new(remote_consistent_lsn.0))
}
};
if !(current.load() <= remote_consistent_lsn) {
warn!(current=%*current, ?adv, "advertised remote_consistent_lsn is lower than earlier advertisements, either delayed message or shard is behaving inconsistently");
return;
}
*current = remote_consistent_lsn;
current.fetch_max(*remote_consistent_lsn);
}
}
@@ -283,6 +305,9 @@ impl SafekeeperTimeline {
};
let commit_lsn = *commit_lsn_rx.borrow_and_update(); // The rhs deref minimizes time we lock the commit_lsn_tx
for pageserver_handle in tenant_shard_attachments {
for (_, rclsn) in self.remote_consistent_lsns.read().unwrap() {
}
// NB: if this function ever becomes slow / may need an .await, make sure
// that other nodes continue to recieve advertisements.
pageserver_handle.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn });
@@ -297,6 +322,7 @@ impl PageserverHandle {
let CommitLsnAdv { ttid, commit_lsn } = adv;
let mut state = self.state.write().unwrap();
state.pending_advertisements.insert(ttid, commit_lsn);
self.notify_pending_advertisements.notify_waiters();
}
}
@@ -322,13 +348,14 @@ impl PageserverTask {
}
async fn run0(&mut self) -> anyhow::Result<()> {
use storage_broker::wal_advertisement::pageserver_client::PageserverClient;
use storage_broker::wal_advertisement as proto;
let stream = async_stream::stream! { loop {
while self.pending_advertisements.is_empty() {
tokio::select! {
_ = self.cancel.cancelled() => {
return;
}
_ = self.notify.notified() => {}
_ = self.notify_pending_advertisements.notified() => {}
}
let mut state = self.state.lock().unwrap();
std::mem::swap(
@@ -336,6 +363,9 @@ impl PageserverTask {
&mut self.pending_advertisements,
);
}
for (tenant_timeline_id, commit_lsn) in self.pending_advertisements.drain() {
yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(tenant_timeline_id), commit_lsn: Some(commit_lsn) };
}
} };
let client: PageserverClient<_> = PageserverClient::connect(todo!())
.await

View File

@@ -25,5 +25,6 @@ message RemoteConsistentLsnAdvertisement {
message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
uint64 commit_lsn = 3;
}