diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index 1e71219507..4de456ddc3 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -319,9 +319,18 @@ pub enum TenantShardPageserverAttachmentChange {
     Detach(TenantShardPageserverAttachment),
 }
 
+impl TenantShardPageserverAttachmentChange {
+    /// Returns the attachment carried by either variant.
+    /// NB: the payload is behind `&self`, so we must clone to return an owned value
+    /// (the struct derives `Clone` below).
+    pub fn attachment(&self) -> TenantShardPageserverAttachment {
+        match self {
+            TenantShardPageserverAttachmentChange::Attach(a) => a.clone(),
+            TenantShardPageserverAttachmentChange::Detach(a) => a.clone(),
+        }
+    }
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct TenantShardPageserverAttachment {
-    pub shard: ShardIndex,
+    pub shard_id: ShardIndex,
     pub generation: Generation,
     pub ps_id: NodeId,
 }
diff --git a/safekeeper/src/wal_advertiser/advmap.rs b/safekeeper/src/wal_advertiser/advmap.rs
index a26a245859..293c6f820e 100644
--- a/safekeeper/src/wal_advertiser/advmap.rs
+++ b/safekeeper/src/wal_advertiser/advmap.rs
@@ -1,11 +1,13 @@
 //! The data structure that track advertisement state.
 
use std::{ - collections::HashMap, + collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map, hash_map}, sync::{Arc, RwLock}, }; +use safekeeper_api::models::TenantShardPageserverAttachment; use serde::Serialize; +use tokio::sync::mpsc; use utils::{ generation::Generation, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -18,12 +20,139 @@ use crate::timeline::Timeline; #[derive(Default)] pub struct World { - pageservers: RwLock>>, + inner: RwLock, } -pub struct Pageserver { - node_id: NodeId, - attachments: RwLock>>, +struct WorldInner { + tenant_shards: HashMap, +} + +struct Pageservers { + pageservers: HashMap, +} + +impl Pageservers { + fn get_handle(&self, ps_id: NodeId) -> PageserverHandle { + match self.pageservers.entry(ps_id) { + hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(), + hash_map::Entry::Vacant(vacant_entry) => { + todo!() + } + } + } +} + +struct Pageserver { + // XXX track more fine-grained + world: Arc, +} + +struct CommitLsnAdv { + ttid: TenantTimelineId, + commit_lsn: Lsn, +} + +struct RemoteConsistentLsnAdv { + tenant_id: TenantId, + shard_attachment: ShardAttachmentId, + timeline_id: TimelineId, + remote_consistent_lsn: Lsn, +} + +#[derive(Clone)] +struct PageserverHandle { + tx: tokio::sync::mpsc::Sender, +} + +struct SafekeeperTenant { + /// Shared among all [`SafekeeperTimeline::shard_attachments`] + /// in [`Self::timelines`]. + shard_attachments: Arc>, + timelines: HashMap>, +} + +struct SafekeeperTimeline { + tli: Arc, + /// Shared among all [`SafekeeperTimeline`] instances of a [`SafekeeperTenant`], + /// and [`SafekeeperTenant::shard_attachments`]. 
+    shard_attachments: Arc<RwLock<ShardAttachments>>,
+}
+
+struct ShardAttachments {
+    pageservers: Pageservers,
+    precise: BTreeMap<ShardAttachmentId, ShardAttachment>,
+    nodes: BTreeMap<NodeId, (usize, PageserverHandle)>, // usize is a refcount from precise
+}
+
+#[derive(Default)]
+struct ShardAttachment {
+    remote_consistent_lsn: Option<Lsn>,
+}
+
+impl ShardAttachments {
+    /// Records an attachment in `precise` and bumps the per-node refcount.
+    /// The refcount only increases for a *new* precise entry, so it stays in
+    /// sync with `precise` (duplicate attach is a no-op).
+    pub fn add(&mut self, a: ShardAttachmentId) {
+        let handle = self.pageservers.get_handle(a.ps_id);
+        let (refcount, _) = self.nodes.entry(a.ps_id).or_insert_with(|| (0, handle));
+        if self.precise.insert(a, ShardAttachment::default()).is_none() {
+            *refcount += 1;
+        }
+    }
+    pub fn remove(&mut self, a: ShardAttachmentId) -> ShardAttachmentChange {
+        let removed = self.precise.remove(&a);
+        if removed.is_none() {
+            return ShardAttachmentChange::None;
+        }
+        let mut entry = match self.nodes.entry(a.ps_id) {
+            btree_map::Entry::Vacant(_) => unreachable!("was referenced by precise"),
+            btree_map::Entry::Occupied(occupied_entry) => occupied_entry,
+        };
+        // Value is (refcount, handle); only the refcount is decremented.
+        let (refcount, _) = entry.get_mut();
+        *refcount = refcount
+            .checked_sub(1)
+            .expect("was referenced by precise, we add refcount in add()");
+        if entry.get().0 == 0 {
+            entry.remove();
+        }
+        ShardAttachmentChange::Removed(a.ps_id)
+    }
+    pub fn advance_remote_consistent_lsn(
+        &mut self,
+        a: ShardAttachmentId,
+        remote_consistent_lsn: Lsn,
+    ) -> anyhow::Result<()> {
+        let Some(attachment) = self.precise.get_mut(&a) else {
+            anyhow::bail!(
+                "attachment is not known: attachment={a:?} remote_consistent_lsn={remote_consistent_lsn}"
+            );
+        };
+        let current = attachment
+            .remote_consistent_lsn
+            .get_or_insert(remote_consistent_lsn);
+        match current.cmp(&remote_consistent_lsn) {
+            std::cmp::Ordering::Less => {
+                *current = remote_consistent_lsn;
+                Ok(())
+            }
+            std::cmp::Ordering::Equal => {
+                warn!(attachment=?a, %remote_consistent_lsn, "update does not advance remote_consistent_lsn");
+                Ok(())
+            }
+            std::cmp::Ordering::Greater => {
+                // Does this need to be an error? Can we just ignore? Does anything in this function need to be an error?
+                anyhow::bail!("proposed remote_consistent_lsn is lower than current record");
+            }
+        }
+    }
+    /// Iterates the pageserver handles; the refcount half of the value is internal.
+    pub fn nodes(&self) -> impl Iterator<Item = &PageserverHandle> {
+        self.nodes.values().map(|(_refcount, handle)| handle)
+    }
+}
+
+// Clone/Copy for cheap pass-by-value; Ord family because this is a BTreeMap key.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+struct ShardAttachmentId {
+    shard_id: ShardIndex,
+    generation: Generation,
+    ps_id: NodeId,
 }
 
 pub struct PageserverAttachment {
@@ -33,19 +162,7 @@ pub struct PageserverAttachment {
     remote_consistent_lsn: RwLock<Option<Lsn>>,
 }
 
-pub struct PageserverTimeline {
-    tenant_shard_id: TenantShardId,
-    timeline_id: TimelineId,
-    generation: Generation,
-    remote_consistent_lsn: RwLock<Lsn>,
-    pageserver: Arc<Pageserver>,
-}
-
 pub struct SafekeeperTimelineHandle {}
 
-struct SafekeeperTimeline {
-    tli: Arc<Timeline>,
-    pageserver_shards: RwLock<HashMap<ShardIndex, Arc<PageserverTimeline>>>,
-}
 impl World {
     pub fn housekeeping(&self) {}
@@ -53,7 +170,7 @@ impl World {
         tokio::spawn(async move {
             SafekeeperTimeline {
                 tli,
-                pageserver_shards: todo!(),
+                shard_attachments: todo!(),
             }
             .run()
             .await;
@@ -65,7 +182,19 @@ impl World {
         tenant_id: TenantId,
         arg: safekeeper_api::models::TenantShardPageserverAttachmentChange,
     ) -> anyhow::Result<()> {
-        todo!()
+        let mut inner = self.inner.write().unwrap();
+        let sk_tenant: &mut SafekeeperTenant =
+            inner.tenant_shards.entry(tenant_id).or_insert_with(|| todo!());
+        let mut shard_attachments = sk_tenant.shard_attachments.write().unwrap();
+        use safekeeper_api::models::TenantShardPageserverAttachmentChange::*;
+        match arg {
+            Attach(a) => { shard_attachments.add(a.into()); }
+            Detach(a) => { shard_attachments.remove(a.into()); }
+        };
+        Ok(())
+    }
+    pub fn process_remote_consistent_lsn_adv(&self, adv: RemoteConsistentLsnAdv) {
+
     }
 }
@@ -79,27 +208,46 @@ impl SafekeeperTimeline {
         let ttid = self.tli.ttid;
         loop {
             let commit_lsn = *commit_lsn_rx.borrow_and_update();
-            {
-                let guard = self.pageserver_shards.read().unwrap();
-                for (shard, ps_tl) in guard.iter() {
-                    if *ps_tl.remote_consistent_lsn.read() < commit_lsn {
-                        ps_tl.pageserver.advertise(ps_tl);
-                    }
-                }
+            // Snapshot handles and drop the std RwLock guard before awaiting below.
+            let nodes: Vec<_> = self.shard_attachments.read().unwrap().nodes().cloned().collect();
+            for node in nodes {
+                node.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn }).await;
             }
-
             tokio::select! {
                 _ = cancel.cancelled() => { return; }
-                // TODO: debounce changed notifications, one every second or so is easily enough.
-                commit_lsn = commit_lsn_rx.changed() => { continue; }
+                _changed = async {
+                    // at most one advertisement per second
+                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                    commit_lsn_rx.changed().await
+                } => { continue; }
             };
         }
         drop(gate_guard);
     }
 }
 
-impl Pageserver {
-    fn advertise(&self, ps_tl: &Arc<PageserverTimeline>) {
-        todo!()
+impl PageserverHandle {
+    pub async fn advertise_commit_lsn(&self, adv: CommitLsnAdv) {
+        self.tx
+            .send(adv)
+            .await
+            .expect("Pageserver loop never drops the rx");
+    }
+}
+
+impl Pageserver {
+}
+
+impl From<TenantShardPageserverAttachment> for ShardAttachmentId {
+    fn from(value: safekeeper_api::models::TenantShardPageserverAttachment) -> Self {
+        let safekeeper_api::models::TenantShardPageserverAttachment {
+            shard_id,
+            generation,
+            ps_id,
+        } = value;
+        ShardAttachmentId {
+            shard_id,
+            generation,
+            ps_id,
+        }
     }
 }
diff --git a/storage_controller/src/service/sk_ps_discovery.rs b/storage_controller/src/service/sk_ps_discovery.rs
index 91c4a24d69..4ea15c5506 100644
--- a/storage_controller/src/service/sk_ps_discovery.rs
+++ b/storage_controller/src/service/sk_ps_discovery.rs
@@ -224,7 +224,7 @@ impl DeliveryAttempt {
         let body = {
             let val = TenantShardPageserverAttachment {
-                shard: ShardIndex {
+                shard_id: ShardIndex {
                     shard_number: utils::shard::ShardNumber(self.work.shard_number as u8),
                     shard_count: utils::shard::ShardCount(self.work.shard_count as u8),
                 },