diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index 4de456ddc3..268b4a742f 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -320,7 +320,7 @@ pub enum TenantShardPageserverAttachmentChange {
 }
 
 impl TenantShardPageserverAttachmentChange {
-    pub fn attachment(&self) -> TenantShardPageserverAttachment {
+    pub fn attachment(&self) -> &TenantShardPageserverAttachment {
         match self {
             TenantShardPageserverAttachmentChange::Attach(a) => a,
             TenantShardPageserverAttachmentChange::Detach(a) => a,
diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs
index 3fb1a5c229..c03e422bf6 100644
--- a/safekeeper/src/timeline_manager.rs
+++ b/safekeeper/src/timeline_manager.rs
@@ -433,7 +433,7 @@
             state_version_rx: tli.get_state_version_rx(),
             num_computes_rx: tli.get_walreceivers().get_num_rx(),
             tli_broker_active: broker_active_set.guard(tli.clone()),
-            wal_advertiser: wal_advertiser.spawn(tli.clone()).unwrap(),
+            wal_advertiser: wal_advertiser.register_timeline(tli.clone()).unwrap(),
             last_removed_segno: 0,
             is_offloaded,
             backup_task: None,
diff --git a/safekeeper/src/wal_advertiser/advmap.rs b/safekeeper/src/wal_advertiser/advmap.rs
index 0f877a9267..68c39f7f1a 100644
--- a/safekeeper/src/wal_advertiser/advmap.rs
+++ b/safekeeper/src/wal_advertiser/advmap.rs
@@ -2,12 +2,15 @@
 use std::{
     collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map, hash_map},
-    sync::{Arc, RwLock},
+    sync::{Arc, Mutex, RwLock},
+    time::Duration,
 };
 
 use safekeeper_api::models::TenantShardPageserverAttachment;
 use serde::Serialize;
-use tokio::sync::mpsc;
+use tokio::sync::{Notify, mpsc};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, info, warn};
 use utils::{
     generation::Generation,
     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -20,31 +23,55 @@
 use crate::timeline::Timeline;
 
 #[derive(Default)]
 pub struct World {
+    cancel: CancellationToken,
     inner: RwLock<WorldInner>,
+    pageservers: Pageservers,
 }
 
+#[derive(Default)]
 struct WorldInner {
-    tenant_shards: HashMap<TenantId, SafekeeperTenant>,
+    tenants: HashMap<TenantId, SafekeeperTenant>,
 }
 
+#[derive(Default)]
 struct Pageservers {
+    cancel: CancellationToken,
     pageservers: HashMap<NodeId, PageserverHandle>,
 }
 
 impl Pageservers {
-    fn get_handle(&self, ps_id: NodeId) -> PageserverHandle {
+    fn get_handle(&mut self, ps_id: NodeId) -> PageserverHandle {
         match self.pageservers.entry(ps_id) {
             hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
             hash_map::Entry::Vacant(vacant_entry) => {
-                todo!()
+                // First advertisement for this pageserver: spawn its task and
+                // remember the handle so later callers reuse it.
+                let notify = Arc::new(Notify::new());
+                let cancel = CancellationToken::new();
+                let state: Arc<Mutex<PageserverSharedState>> = Default::default();
+                tokio::spawn(
+                    PageserverTask {
+                        ps_id,
+                        state: state.clone(),
+                        notify: Arc::clone(&notify),
+                        cancel: cancel.clone(),
+                    }
+                    .run(),
+                );
+                vacant_entry
+                    .insert(PageserverHandle {
+                        state,
+                        notify,
+                        cancel,
+                    })
+                    .clone()
             }
         }
     }
 }
 
-struct Pageserver {
-    // XXX track more fine-grained
-    world: Arc<World>,
+struct PageserverTask {
+    ps_id: NodeId,
+    state: Arc<Mutex<PageserverSharedState>>,
+    notify: Arc<Notify>,
+    cancel: CancellationToken,
+}
+
+#[derive(Default)]
+struct PageserverSharedState {
+    pending_advertisements: HashMap<TenantTimelineId, Lsn>,
 }
 
 struct CommitLsnAdv {
@@ -52,6 +79,7 @@
     commit_lsn: Lsn,
 }
 
+#[derive(Debug)]
 struct RemoteConsistentLsnAdv {
     tenant_id: TenantId,
     shard_attachment: ShardAttachmentId,
@@ -61,29 +89,36 @@
 #[derive(Clone)]
 struct PageserverHandle {
-    tx: tokio::sync::mpsc::Sender<CommitLsnAdv>,
+    state: Arc<Mutex<PageserverSharedState>>,
+    /// Wakes the per-pageserver task when new advertisements are queued.
+    notify: Arc<Notify>,
+    cancel: CancellationToken,
+}
+
+impl std::ops::Drop for PageserverHandle {
+    fn drop(&mut self) {
+        self.cancel.cancel();
+    }
 }
 
 #[derive(Default)]
 struct SafekeeperTenant {
     /// Shared with [`SafekeeperTimeline::tenant_shard_attachments`].
-    tenant_shard_attachments: Arc<RwLock<TenantShardAttachments>>,
+    tenant_shard_attachments: tokio::sync::watch::Sender<TenantShardAttachments>,
     timelines: HashMap<TimelineId, SafekeeperTimelineHandle>,
 }
 
 struct SafekeeperTimeline {
     tli: Arc<Timeline>,
     /// Shared with [`SafekeeperTenant::tenant_shard_attachments`].
-    tenant_shard_attachments: Arc<RwLock<TenantShardAttachments>>,
+    tenant_shard_attachments: tokio::sync::watch::Receiver<TenantShardAttachments>,
     remote_consistent_lsns: HashMap<ShardAttachmentId, Lsn>,
 }
 
 #[derive(Default)]
 struct TenantShardAttachments {
     pageservers: Pageservers,
-    precise: BTreeSet<ShardAttachmentId>,
+    precise: HashSet<ShardAttachmentId>,
     /// projection from `precise`
-    nodes: BTreeMap<NodeId, usize>, // usize is a refcount from precise
+    nodes: HashMap<NodeId, (usize, PageserverHandle)>, // usize is a refcount from precise
 }
 
 impl TenantShardAttachments {
@@ -100,20 +135,20 @@
             return;
         }
         let mut entry = match self.nodes.entry(a.ps_id) {
-            btree_map::Entry::Vacant(vacant_entry) => unreachable!("was referenced by precise"),
-            btree_map::Entry::Occupied(occupied_entry) => occupied_entry,
+            hash_map::Entry::Vacant(vacant_entry) => unreachable!("was referenced by precise"),
+            hash_map::Entry::Occupied(occupied_entry) => occupied_entry,
         };
-        *entry.get_mut() = entry
-            .get()
+        let (refcount, _) = entry.get_mut();
+        *refcount = refcount
             .checked_sub(1)
             .expect("was referenced by precise, we add refcount in add()");
-        if *entry.get() == 0 {
+        if *refcount == 0 {
             entry.remove();
         }
     }
 
     pub fn nodes(&self) -> impl Iterator<Item = &PageserverHandle> {
-        self.nodes.values()
+        self.nodes.values().map(|(_, h)| h)
     }
 }
 
@@ -124,20 +159,14 @@ struct ShardAttachmentId {
     ps_id: NodeId,
 }
 
-pub struct PageserverAttachment {
-    pageserver: NodeId,
-    tenant_shard_id: TenantShardId,
-    generation: Generation,
-    remote_consistent_lsn: RwLock<HashMap<TimelineId, Option<Lsn>>>,
-}
-
-pub struct SafekeeperTimelineHandle {}
-
 impl World {
-    pub fn spawn(&self, tli: Arc<Timeline>) -> anyhow::Result<SafekeeperTimelineHandle> {
+    pub fn register_timeline(
+        &self,
+        tli: Arc<Timeline>,
+    ) -> anyhow::Result<SafekeeperTimelineHandle> {
         let ttid = tli.ttid;
         let mut inner = self.inner.write().unwrap();
-        let sk_tenant: &mut SafekeeperTenant = inner.tenant_shards.entry(tenant_id).or_default();
+        let sk_tenant: &mut SafekeeperTenant = inner.tenants.entry(ttid.tenant_id).or_default();
         let vacant = match sk_tenant.timelines.entry(ttid.timeline_id) {
             hash_map::Entry::Occupied(occupied_entry) => {
                 anyhow::bail!("entry for timeline already exists");
@@ -147,13 +176,13 @@
             tokio::spawn(async move {
                 SafekeeperTimeline {
                     tli,
-                    tenant_shard_attachments: Arc::clone(&sk_tenant.tenant_shard_attachments),
+                    tenant_shard_attachments: sk_tenant.tenant_shard_attachments.subscribe(),
                     remote_consistent_lsns: HashMap::default(), // TODO: fill from persistence inside .run()?
                 }
                 .run()
                 .await;
             });
-        vacant.insert(SafekeeperTimelineHandle {})
+        Ok(vacant.insert(SafekeeperTimelineHandle { tx }).clone())
     }
 
     pub fn update_pageserver_attachments(
         &self,
@@ -161,8 +190,7 @@
         arg: safekeeper_api::models::TenantShardPageserverAttachmentChange,
     ) -> anyhow::Result<()> {
         let mut inner = self.inner.write().unwrap();
-        let sk_tenant: &mut SafekeeperTenant =
-            inner.tenant_shards.entry(tenant_id).or_insert(todo!());
+        let sk_tenant: &mut SafekeeperTenant = inner.tenants.entry(tenant_id).or_insert(todo!());
         let mut shard_attachments = sk_tenant.tenant_shard_attachments.write().unwrap();
         use safekeeper_api::models::TenantShardPageserverAttachmentChange::*;
         let change = match arg {
@@ -178,9 +206,9 @@
             shard_attachment,
             timeline_id,
             remote_consistent_lsn,
-        } = adv;
+        } = &adv;
         let mut inner = self.inner.write().unwrap();
-        let Some(sk_tenant) = inner.tenant_shards.get(&tenant_id) else {
+        let Some(sk_tenant) = inner.tenants.get(tenant_id) else {
            warn!(?adv, "tenant shard attachment is not known");
            return;
        };
@@ -190,7 +218,7 @@
         };
         if cfg!(feature = "testing") {
             let commit_lsn = *timeline.tli.get_commit_lsn_watch_rx().borrow();
-            if !(remote_consistent_lsn <= commit_lsn) {
+            if !(*remote_consistent_lsn <= commit_lsn) {
                 warn!(
                     ?adv,
                     "advertised remote_consistent_lsn is ahead of commit_lsn"
@@ -214,48 +242,80 @@
 }
 
 impl SafekeeperTimeline {
-    async fn run(self) {
+    async fn run(mut self) {
         let Ok(gate_guard) = self.tli.gate.enter() else {
             return;
         };
         let cancel = self.tli.cancel.child_token();
-        let mut commit_lsn_rx = self.tli.get_commit_lsn_watch_rx();
+        let ttid = self.tli.ttid;
+
+        let mut commit_lsn_rx = self.tli.get_commit_lsn_watch_rx();
+
+        // arm for first iteration
+        commit_lsn_rx.mark_changed();
+        self.tenant_shard_attachments.mark_changed();
+
+        let mut tenant_shard_attachments: Vec<PageserverHandle> = Vec::new();
         loop {
-            let commit_lsn = *commit_lsn_rx.borrow_and_update();
-            let shard_attachments = self.tenant_shard_attachments.read().unwrap();
-            for node in shard_attachments.nodes() {
-                node.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn });
-            }
             tokio::select! {
                 _ = cancel.cancelled() => {
                     return;
                 }
-                commit_lsn = async {
+                _ = async {
                     // at most one advertisement per second
                     tokio::time::sleep(Duration::from_secs(1)).await;
                     commit_lsn_rx.changed().await
-                } => { continue; }
+                } => { }
+                _ = self.tenant_shard_attachments.changed() => {
+                    tenant_shard_attachments.clear();
+                    tenant_shard_attachments.extend(
+                        self.tenant_shard_attachments
+                            .borrow_and_update()
+                            .nodes()
+                            .cloned(),
+                    );
+                    tenant_shard_attachments.shrink_to_fit();
+                }
            };
+            let commit_lsn = *commit_lsn_rx.borrow_and_update(); // The rhs deref minimizes time we lock the commit_lsn_tx
+            for pageserver_handle in &tenant_shard_attachments {
+                // NB: if this function ever becomes slow / may need an .await, make sure
+                // that other nodes continue to receive advertisements.
+                pageserver_handle.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn });
+            }
-            drop(shard_attachments);
         }
         drop(gate_guard);
     }
 }
 
-impl SafekeeperTimelineHandle {
-    pub fn ready_for_eviction(&self) -> bool {
-        todo!()
-    }
-}
-
 impl PageserverHandle {
-    pub async fn advertise_commit_lsn(&self, adv: CommitLsnAdv) {
-        self.tx
-            .send(adv)
-            .await
-            .expect("Pageserver loop never drops the rx");
+    pub fn advertise_commit_lsn(&self, adv: CommitLsnAdv) {
+        let mut state = self.state.lock().unwrap();
+        state
+            .pending_advertisements
+            .insert(adv.ttid, adv.commit_lsn);
+        // wake the per-pageserver task so it flushes the queued advertisement
+        self.notify.notify_one();
     }
 }
 
-impl Pageserver {}
+impl PageserverTask {
+    /// Cancellation: happens through last PageserverHandle being dropped.
+    async fn run(mut self) {
+        let mut pending_advertisements = HashMap::new();
+        loop {
+            tokio::select! {
+                _ = self.cancel.cancelled() => {
+                    return;
+                }
+                _ = self.notify.notified() => {}
+            }
+            {
+                let mut state = self.state.lock().unwrap();
+                pending_advertisements.clear();
+                pending_advertisements =
+                    std::mem::replace(&mut state.pending_advertisements, pending_advertisements);
+                drop(state);
+            }
+
+            todo!("grpc call");
+
+            // batch updates by sleeping
+            tokio::time::sleep(Duration::from_secs(1)).await;
+        }
+    }
+}
 
 impl From<safekeeper_api::models::TenantShardPageserverAttachment> for ShardAttachmentId {
     fn from(value: safekeeper_api::models::TenantShardPageserverAttachment) -> Self {
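
Review note on the pattern used by PageserverHandle / PageserverTask above: producers overwrite a Mutex-protected map of pending advertisements and wake a per-pageserver task through a Notify, so only the latest LSN per timeline gets flushed, and a CancellationToken tears the task down. Below is a minimal, self-contained sketch of that pattern, not code from this PR: the Handle, run_task, and flush names and the u64 keys are placeholders, and it assumes tokio (full features) and tokio-util as dependencies.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

#[derive(Clone, Default)]
struct Handle {
    pending: Arc<Mutex<HashMap<u64, u64>>>, // timeline id -> latest advertised lsn
    notify: Arc<Notify>,
}

impl Handle {
    fn advertise(&self, timeline: u64, lsn: u64) {
        // Later advertisements for the same timeline overwrite earlier ones,
        // so the consumer only ever sends the most recent value.
        self.pending.lock().unwrap().insert(timeline, lsn);
        // Notify stores a permit if nobody is waiting, so the wake-up is not lost.
        self.notify.notify_one();
    }
}

async fn run_task(handle: Handle, cancel: CancellationToken) {
    let mut batch = HashMap::new();
    loop {
        tokio::select! {
            _ = cancel.cancelled() => return,
            _ = handle.notify.notified() => {}
        }
        // Swap the shared map out while holding the lock only briefly.
        batch.clear();
        batch = std::mem::replace(&mut *handle.pending.lock().unwrap(), batch);
        flush(&batch).await;
        // Batch further updates by sleeping before the next drain.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

async fn flush(batch: &HashMap<u64, u64>) {
    // Stand-in for the RPC the real task would make.
    for (timeline, lsn) in batch {
        println!("advertise timeline {timeline}: lsn {lsn}");
    }
}

#[tokio::main]
async fn main() {
    let handle = Handle::default();
    let cancel = CancellationToken::new();
    let task = tokio::spawn(run_task(handle.clone(), cancel.clone()));

    handle.advertise(1, 100);
    handle.advertise(1, 200); // may coalesce with the previous call if the task has not drained yet
    tokio::time::sleep(Duration::from_millis(50)).await;

    cancel.cancel();
    task.await.unwrap();
}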