diff --git a/safekeeper/src/wal_advertiser/advmap.rs b/safekeeper/src/wal_advertiser/advmap.rs index 2022f2666f..c29eef6ee4 100644 --- a/safekeeper/src/wal_advertiser/advmap.rs +++ b/safekeeper/src/wal_advertiser/advmap.rs @@ -9,13 +9,14 @@ use std::{ use anyhow::Context; use safekeeper_api::models::TenantShardPageserverAttachment; use serde::Serialize; +use storage_broker::wal_advertisement::RemoteConsistentLsnAdvertisement; use tokio::sync::{Notify, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use utils::{ generation::Generation, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, - lsn::Lsn, + lsn::{AtomicLsn, Lsn}, shard::{ShardIndex, TenantShardId}, sync::gate::GateGuard, }; @@ -45,19 +46,20 @@ impl Pageservers { match self.pageservers.entry(ps_id) { hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(), hash_map::Entry::Vacant(vacant_entry) => { - let notify = Arc::new(Notify::new()); + let notify_pending_advertisements = Arc::new(Notify::new()); let cancel = CancellationToken::new(); let state = Default::default(); tokio::spawn( PageserverTask { ps_id, state: state.clone(), - notify: Arc::clone(¬ify), + notify_pending_advertisements: Arc::clone(¬ify_pending_advertisements), cancel: cancel.clone(), + pending_advertisements: HashMap::default(), } .run(), ); - PageserverHandle { state, cancel } + PageserverHandle { state, cancel, notify_pending_advertisements } } } } @@ -65,8 +67,8 @@ impl Pageservers { struct PageserverTask { ps_id: NodeId, + notify_pending_advertisements: Arc, state: Arc>, - notify: Arc, cancel: CancellationToken, pending_advertisements: HashMap, } @@ -93,6 +95,7 @@ struct RemoteConsistentLsnAdv { #[derive(Clone)] struct PageserverHandle { state: Arc>, + notify_pending_advertisements: Arc, cancel: CancellationToken, } @@ -113,7 +116,7 @@ struct SafekeeperTimeline { tli: Arc, /// Shared with [`SafekeeperTenant::tenant_shard_attachments`]. tenant_shard_attachments: tokio::sync::watch::Receiver, - remote_consistent_lsns: Mutex>, + remote_consistent_lsns: RwLock>, } #[derive(Default)] @@ -203,9 +206,16 @@ impl World { let sk_tenant: &mut SafekeeperTenant = inner.tenants.entry(tenant_id).or_insert(todo!()); let mut shard_attachments = sk_tenant.tenant_shard_attachments.write().unwrap(); use safekeeper_api::models::TenantShardPageserverAttachmentChange::*; - let change = match arg { + match arg { Attach(a) => shard_attachments.add(a.into()), - Detach(a) => shard_attachments.remove(a.into()), + Detach(a) => { + let a: ShardAttachmentId = a.into(); + shard_attachments.remove(a); + for timeline in sk_tenant.timelines.values() { + let remote_consistent_lsns = timeline.remote_consistent_lsns.write().unwrap(); + remote_consistent_lsns.remove(a) + } + } }; Ok(()) } @@ -226,28 +236,40 @@ impl World { warn!(?adv, "safekeeper timeline is not know"); return; }; - if cfg!(feature = "testing") { - let commit_lsn = *timeline.tli.get_commit_lsn_watch_rx().borrow(); - if !(*remote_consistent_lsn <= commit_lsn) { - warn!( - ?adv, - "advertised remote_consistent_lsn is ahead of commit_lsn" - ); - return; - } - } - let current = match timeline.remote_consistent_lsns.entry(shard_attachment) { - hash_map::Entry::Occupied(mut occupied_entry) => occupied_entry.get_mut(), - hash_map::Entry::Vacant(vacant_entry) => { - info!(?adv, "first time learning about timeline shard attachment"); - vacant_entry.insert(remote_consistent_lsn) + // in most cases, we already have a record of remote_consistent_lsn and can use shared access + let remote_consistent_lsns = loop { + let read_guard = timeline.remote_consistent_lsns.read().uwnrap(); + match read_guard.get(shard_attachment) { + Some(existent) => break existent, // likely case + None => { + drop(read_guard); + let write_guard = timeline.remote_consistent_lsns.write().unwrap(); + write_guard.entry(*shard_attachment).or_insert(AtomicLsn::new(remote_consistent_lsn.0)); + drop(write_guard); + // we still hold the tenant in write mode, hence + } } }; - if !(*current <= remote_consistent_lsn) { + + read_guard.entry(shard_attachment) { + hash_map::Entry::Occupied(mut occupied_entry) => occupied_entry.get(), + hash_map::Entry::Vacant(vacant_entry) => { + info!(?adv, "first time learning about timeline shard attachment"); + vacant_entry.insert(AtomicLsn::new(remote_consistent_lsn.0)) + } + }; + let current = match timeline.remote_consistent_lsns.read().unwrap().entry(shard_attachment) { + hash_map::Entry::Occupied(mut occupied_entry) => occupied_entry.get(), + hash_map::Entry::Vacant(vacant_entry) => { + info!(?adv, "first time learning about timeline shard attachment"); + vacant_entry.insert(AtomicLsn::new(remote_consistent_lsn.0)) + } + }; + if !(current.load() <= remote_consistent_lsn) { warn!(current=%*current, ?adv, "advertised remote_consistent_lsn is lower than earlier advertisements, either delayed message or shard is behaving inconsistently"); return; } - *current = remote_consistent_lsn; + current.fetch_max(*remote_consistent_lsn); } } @@ -283,6 +305,9 @@ impl SafekeeperTimeline { }; let commit_lsn = *commit_lsn_rx.borrow_and_update(); // The rhs deref minimizes time we lock the commit_lsn_tx for pageserver_handle in tenant_shard_attachments { + for (_, rclsn) in self.remote_consistent_lsns.read().unwrap() { + + } // NB: if this function ever becomes slow / may need an .await, make sure // that other nodes continue to recieve advertisements. pageserver_handle.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn }); @@ -297,6 +322,7 @@ impl PageserverHandle { let CommitLsnAdv { ttid, commit_lsn } = adv; let mut state = self.state.write().unwrap(); state.pending_advertisements.insert(ttid, commit_lsn); + self.notify_pending_advertisements.notify_waiters(); } } @@ -322,13 +348,14 @@ impl PageserverTask { } async fn run0(&mut self) -> anyhow::Result<()> { use storage_broker::wal_advertisement::pageserver_client::PageserverClient; + use storage_broker::wal_advertisement as proto; let stream = async_stream::stream! { loop { while self.pending_advertisements.is_empty() { tokio::select! { _ = self.cancel.cancelled() => { return; } - _ = self.notify.notified() => {} + _ = self.notify_pending_advertisements.notified() => {} } let mut state = self.state.lock().unwrap(); std::mem::swap( @@ -336,6 +363,9 @@ impl PageserverTask { &mut self.pending_advertisements, ); } + for (tenant_timeline_id, commit_lsn) in self.pending_advertisements.drain() { + yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(tenant_timeline_id), commit_lsn: Some(commit_lsn) }; + } } }; let client: PageserverClient<_> = PageserverClient::connect(todo!()) .await diff --git a/storage_broker/proto/wal_advertisement.proto b/storage_broker/proto/wal_advertisement.proto index 6b0e035b57..1528dbcbd4 100644 --- a/storage_broker/proto/wal_advertisement.proto +++ b/storage_broker/proto/wal_advertisement.proto @@ -25,5 +25,6 @@ message RemoteConsistentLsnAdvertisement { message TenantTimelineId { bytes tenant_id = 1; bytes timeline_id = 2; + uint64 commit_lsn = 3; }