This commit is contained in:
Christian Schwarz
2025-06-03 14:00:56 +02:00
parent df36b9aa62
commit 88a3c9e7fd
3 changed files with 191 additions and 34 deletions

View File

@@ -319,9 +319,18 @@ pub enum TenantShardPageserverAttachmentChange {
Detach(TenantShardPageserverAttachment),
}
impl TenantShardPageserverAttachmentChange {
    /// Returns the attachment this change refers to, regardless of whether
    /// the change is an attach or a detach.
    pub fn attachment(&self) -> TenantShardPageserverAttachment {
        // Both variants carry the same payload. Clone it: the return type is
        // owned while `self` is only borrowed, so returning `a` directly
        // would not compile (`a` is a `&TenantShardPageserverAttachment`).
        match self {
            TenantShardPageserverAttachmentChange::Attach(a)
            | TenantShardPageserverAttachmentChange::Detach(a) => a.clone(),
        }
    }
}
/// Wire-format description of one pageserver attachment of a tenant shard.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TenantShardPageserverAttachment {
    // NOTE(review): the source showed both an old `shard` field and the
    // renamed `shard_id` field (two fields of the same type); only the
    // renamed field is kept, matching the call site that constructs this
    // struct with `shard_id` — confirm no caller still uses `shard`.
    pub shard_id: ShardIndex,
    pub generation: Generation,
    pub ps_id: NodeId,
}

View File

@@ -1,11 +1,13 @@
//! The data structure that tracks advertisement state.
use std::{
collections::HashMap,
collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map, hash_map},
sync::{Arc, RwLock},
};
use safekeeper_api::models::TenantShardPageserverAttachment;
use serde::Serialize;
use tokio::sync::mpsc;
use utils::{
generation::Generation,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -18,12 +20,139 @@ use crate::timeline::Timeline;
#[derive(Default)]
pub struct World {
pageservers: RwLock<HashMap<NodeId, Arc<Pageserver>>>,
inner: RwLock<WorldInner>,
}
pub struct Pageserver {
node_id: NodeId,
attachments: RwLock<HashMap<TenantShardId, Arc<PageserverAttachment>>>,
struct WorldInner {
tenant_shards: HashMap<TenantId, SafekeeperTenant>,
}
/// Registry of per-pageserver channel handles, keyed by node id.
struct Pageservers {
    pageservers: HashMap<NodeId, PageserverHandle>,
}
impl Pageservers {
fn get_handle(&self, ps_id: NodeId) -> PageserverHandle {
match self.pageservers.entry(ps_id) {
hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
hash_map::Entry::Vacant(vacant_entry) => {
todo!()
}
}
}
}
/// Per-pageserver state.
struct Pageserver {
    // XXX track more fine-grained state instead of holding the whole `World`.
    world: Arc<World>,
}
/// Message advertising a new commit LSN of one timeline to a pageserver.
struct CommitLsnAdv {
    // Which timeline the advertisement is about.
    ttid: TenantTimelineId,
    commit_lsn: Lsn,
}
/// A pageserver-reported update of how far one shard attachment has made a
/// timeline's data remote-consistent.
struct RemoteConsistentLsnAdv {
    tenant_id: TenantId,
    // Exact (shard, generation, node) attachment that reported the update.
    shard_attachment: ShardAttachmentId,
    timeline_id: TimelineId,
    remote_consistent_lsn: Lsn,
}
/// Cheap-to-clone sender side of the channel into a pageserver's task.
#[derive(Clone)]
struct PageserverHandle {
    tx: tokio::sync::mpsc::Sender<CommitLsnAdv>,
}
/// Safekeeper-side per-tenant state: the tenant's timelines plus the shard
/// attachment set shared among them.
struct SafekeeperTenant {
    /// Shared among all [`SafekeeperTimeline::shard_attachments`]
    /// in [`Self::timelines`].
    shard_attachments: Arc<RwLock<ShardAttachments>>,
    timelines: HashMap<TimelineId, Arc<SafekeeperTimeline>>,
}
/// Safekeeper-side per-timeline advertisement state.
struct SafekeeperTimeline {
    tli: Arc<Timeline>,
    /// Shared among all [`SafekeeperTimeline`] instances of a [`SafekeeperTenant`],
    /// and [`SafekeeperTenant::shard_attachments`].
    shard_attachments: Arc<RwLock<ShardAttachments>>,
}
/// The set of pageserver shard attachments for one tenant.
struct ShardAttachments {
    pageservers: Pageservers,
    /// One entry per exact (shard, generation, node) attachment.
    precise: BTreeMap<ShardAttachmentId, ShardAttachment>,
    nodes: BTreeMap<NodeId, (usize, PageserverHandle)>, // usize is a refcount from precise
}
/// Per-attachment state; currently just the last remote_consistent_lsn the
/// attachment reported (`None` until the first report).
#[derive(Default)]
struct ShardAttachment {
    remote_consistent_lsn: Option<Lsn>,
}
impl ShardAttachments {
pub fn add(&mut self, a: ShardAttachmentId) {
let (refcount, _) = self
.nodes
.entry(a.ps_id)
.or_insert_with(|| (0, self.pageservers.get_handle(a.ps_id)));
*refcount += 1;
}
pub fn remove(&mut self, a: ShardAttachmentId) -> ShardAttachmentChange {
let removed = self.precise.remove(&a);
if !removed {
return ShardAttachmentChange::None;
}
let mut entry = match self.nodes.entry(a.ps_id) {
btree_map::Entry::Vacant(vacant_entry) => unreachable!("was referenced by precise"),
btree_map::Entry::Occupied(occupied_entry) => occupied_entry,
};
*entry.get_mut() = entry
.get()
.checked_sub(1)
.expect("was referenced by precise, we add refcount in add()");
if *entry.get() == 0 {
entry.remove();
}
ShardAttachmentChange::Removed(a.ps_id)
}
pub fn advance_remote_consistent_lsn(
&mut self,
a: ShardAttachmentId,
remote_consistent_lsn: Lsn,
) -> anyhow::Result<()> {
let Some(attachment) = self.precise.get_mut(&a) else {
anyhow::bail!(
"attachment is not known: attachment={a:?} remote_consistent_lsn={remote_consistent_lsn}"
);
};
let current = attachment
.remote_consistent_lsn
.get_or_insert(remote_consistent_lsn);
match current.cmp(remote_consistent_lsn) {
std::cmp::Ordering::Less => {
*current = remote_consistent_lsn;
Ok(())
}
std::cmp::Ordering::Equal => {
warn!(attachment=?a, %remote_consistent_lsn, "update does not advance remote_consistent_lsn");
Ok(())
}
std::cmp::Ordering::Greater => {
// Does this need to be an error? Can we just ignore? Does anything in this function need to be an error?
anyhow::bail!("proposed remote_consistent_lsn is lower than current record");
}
}
}
pub fn nodes(&self) -> impl Iterator<Item = &PageserverHandle> {
self.nodes.values()
}
}
/// Uniquely identifies one attachment: a tenant shard, at a generation, on a
/// pageserver node.
///
/// `Ord`/`Eq` (and their partial counterparts) are required because this type
/// is the key of `ShardAttachments::precise`, a `BTreeMap`; the original
/// derived only `Debug`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct ShardAttachmentId {
    shard_id: ShardIndex,
    generation: Generation,
    ps_id: NodeId,
}
pub struct PageserverAttachment {
@@ -33,19 +162,7 @@ pub struct PageserverAttachment {
remote_consistent_lsn: RwLock<HashMap<TimelineId, Arc<PageserverTimeline>>>,
}
/// Pageserver-side view of one timeline of one tenant shard.
pub struct PageserverTimeline {
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    generation: Generation,
    /// Last remote_consistent_lsn recorded for this timeline.
    remote_consistent_lsn: RwLock<Lsn>,
    /// Back-reference to the owning pageserver.
    pageserver: Arc<Pageserver>,
}
/// Placeholder handle type; no state or methods yet.
pub struct SafekeeperTimelineHandle {}
// NOTE(review): this duplicates the `SafekeeperTimeline` defined earlier in
// this file (it appears to be the pre-refactor definition left behind); two
// structs with the same name in one module will not compile — confirm which
// definition should remain.
struct SafekeeperTimeline {
    tli: Arc<Timeline>,
    pageserver_shards: RwLock<HashMap<TenantShardId, Arc<PageserverTimeline>>>,
}
impl World {
pub fn housekeeping(&self) {}
@@ -53,7 +170,7 @@ impl World {
tokio::spawn(async move {
SafekeeperTimeline {
tli,
pageserver_shards: todo!(),
pageserver_timeline_shards: todo!(),
}
.run()
.await;
@@ -65,7 +182,19 @@ impl World {
tenant_id: TenantId,
arg: safekeeper_api::models::TenantShardPageserverAttachmentChange,
) -> anyhow::Result<()> {
todo!()
let mut inner = self.inner.write().unwrap();
let sk_tenant: &mut SafekeeperTenant =
inner.tenant_shards.entry(tenant_id).or_insert(todo!());
let mut shard_attachments = sk_tenant.shard_attachments.write().unwrap();
use safekeeper_api::models::TenantShardPageserverAttachmentChange::*;
let change = match arg {
Attach(a) => shard_attachments.add(a.into()),
Detach(a) => shard_attachments.remove(a.into()),
};
Ok(())
}
/// Entry point for remote_consistent_lsn advertisements arriving from
/// pageservers; not yet implemented — the update is currently dropped.
pub fn process_remote_consistent_lsn_adv(&self, adv: RemoteConsistentLsnAdv) {
}
}
@@ -79,27 +208,46 @@ impl SafekeeperTimeline {
let ttid = self.tli.ttid;
loop {
let commit_lsn = *commit_lsn_rx.borrow_and_update();
{
let guard = self.pageserver_shards.read().unwrap();
for (shard, ps_tl) in guard.iter() {
if *ps_tl.remote_consistent_lsn.read() < commit_lsn {
ps_tl.pageserver.advertise(ps_tl);
}
}
let shard_attachments = self.shard_attachments.read().unwrap();
for node in shard_attachments.nodes() {
node.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn });
}
tokio::select! {
_ = cancel.cancelled() => { return; }
// TODO: debounce changed notifications, one every second or so is easily enough.
commit_lsn = commit_lsn_rx.changed() => { continue; }
commit_lsn = async {
// at most one advertisement per second
tokio::time::sleep(Duration::from_secs(1)).await;
commit_lsn_rx.changed().await
} => { continue; }
};
}
drop(gate_guard);
}
}
impl Pageserver {
fn advertise(&self, ps_tl: &Arc<PageserverTimeline>) {
todo!()
impl PageserverHandle {
    /// Forwards a commit-LSN advertisement to the pageserver's task.
    ///
    /// Awaits if the channel is at capacity (backpressure). The `expect`
    /// documents the invariant that the receiving task never drops its `rx`.
    pub async fn advertise_commit_lsn(&self, adv: CommitLsnAdv) {
        self.tx
            .send(adv)
            .await
            .expect("Pageserver loop never drops the rx")
    }
}
// Intentionally empty for now; per-pageserver task methods will live here.
impl Pageserver {
}
impl From<safekeeper_api::models::TenantShardPageserverAttachment> for ShardAttachmentId {
    /// Converts the wire-format attachment description into the internal id.
    ///
    /// The original signature was missing the `-> Self` return type, so the
    /// function was declared to return `()` while the body produced a
    /// `ShardAttachmentId`.
    fn from(value: safekeeper_api::models::TenantShardPageserverAttachment) -> Self {
        // Exhaustive destructuring: a compile error here flags any new field
        // added to the wire type that this conversion would silently drop.
        let safekeeper_api::models::TenantShardPageserverAttachment {
            shard_id,
            generation,
            ps_id,
        } = value;
        ShardAttachmentId {
            shard_id,
            generation,
            ps_id,
        }
    }
}

View File

@@ -224,7 +224,7 @@ impl DeliveryAttempt {
let body = {
let val = TenantShardPageserverAttachment {
shard: ShardIndex {
shard_id: ShardIndex {
shard_number: utils::shard::ShardNumber(self.work.shard_number as u8),
shard_count: utils::shard::ShardCount(self.work.shard_count as u8),
},