This commit is contained in:
Christian Schwarz
2025-06-03 14:55:08 +02:00
parent 88a3c9e7fd
commit 4d2f27a33f
3 changed files with 79 additions and 58 deletions

View File

@@ -42,6 +42,7 @@ impl Manager {
&& next_event.is_none()
&& self.access_service.is_empty()
&& !self.tli_broker_active.get()
&& self.wal_advertiser.ready_for_eviction()
// Partial segment of current flush_lsn is uploaded up to this flush_lsn.
&& !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
// And it is the next one after the last removed. Given that local

View File

@@ -433,7 +433,7 @@ impl Manager {
state_version_rx: tli.get_state_version_rx(),
num_computes_rx: tli.get_walreceivers().get_num_rx(),
tli_broker_active: broker_active_set.guard(tli.clone()),
wal_advertiser: wal_advertiser.spawn(tli.clone()),
wal_advertiser: wal_advertiser.spawn(tli.clone()).unwrap(),
last_removed_segno: 0,
is_offloaded,
backup_task: None,

View File

@@ -64,32 +64,29 @@ struct PageserverHandle {
tx: tokio::sync::mpsc::Sender<CommitLsnAdv>,
}
#[derive(Default)]
struct SafekeeperTenant {
/// Shared among all [`SafekeeperTimeline::shard_attachments`]
/// in [`Self::timelines`].
shard_attachments: Arc<RwLock<ShardAttachments>>,
/// Shared with [`SafekeeperTimeline::tenant_shard_attachments`].
tenant_shard_attachments: Arc<RwLock<TenantShardAttachments>>,
timelines: HashMap<TimelineId, Arc<SafekeeperTimeline>>,
}
struct SafekeeperTimeline {
tli: Arc<Timeline>,
/// Shared among all [`SafekeeperTimeline`] instances of a [`SafekeeperTenant`],
/// and [`SafekeeperTenant::shard_attachments`].
shard_attachments: Arc<RwLock<ShardAttachments>>,
}
struct ShardAttachments {
pageservers: Pageservers,
precise: BTreeMap<ShardAttachmentId, ShardAttachment>,
nodes: BTreeMap<NodeId, (usize, PageserverHandle)>, // usize is a refcount from precise
/// Shared with [`SafekeeperTenant::tenant_shard_attachments`].
tenant_shard_attachments: Arc<RwLock<TenantShardAttachments>>,
remote_consistent_lsns: HashMap<ShardAttachmentId, Lsn>,
}
#[derive(Default)]
struct ShardAttachment {
remote_consistent_lsn: Option<Lsn>,
struct TenantShardAttachments {
pageservers: Pageservers,
precise: BTreeSet<ShardAttachmentId>,
/// projection from `precise`
nodes: BTreeMap<NodeId, (usize, PageserverHandle)>, // usize is a refcount from precise
}
impl ShardAttachments {
impl TenantShardAttachments {
pub fn add(&mut self, a: ShardAttachmentId) {
let (refcount, _) = self
.nodes
@@ -97,10 +94,10 @@ impl ShardAttachments {
.or_insert_with(|| (0, self.pageservers.get_handle(a.ps_id)));
*refcount += 1;
}
pub fn remove(&mut self, a: ShardAttachmentId) -> ShardAttachmentChange {
pub fn remove(&mut self, a: ShardAttachmentId) {
let removed = self.precise.remove(&a);
if !removed {
return ShardAttachmentChange::None;
return;
}
let mut entry = match self.nodes.entry(a.ps_id) {
btree_map::Entry::Vacant(vacant_entry) => unreachable!("was referenced by precise"),
@@ -113,42 +110,14 @@ impl ShardAttachments {
if *entry.get() == 0 {
entry.remove();
}
ShardAttachmentChange::Removed(a.ps_id)
}
pub fn advance_remote_consistent_lsn(
&mut self,
a: ShardAttachmentId,
remote_consistent_lsn: Lsn,
) -> anyhow::Result<()> {
let Some(attachment) = self.precise.get_mut(&a) else {
anyhow::bail!(
"attachment is not known: attachment={a:?} remote_consistent_lsn={remote_consistent_lsn}"
);
};
let current = attachment
.remote_consistent_lsn
.get_or_insert(remote_consistent_lsn);
match current.cmp(remote_consistent_lsn) {
std::cmp::Ordering::Less => {
*current = remote_consistent_lsn;
Ok(())
}
std::cmp::Ordering::Equal => {
warn!(attachment=?a, %remote_consistent_lsn, "update does not advance remote_consistent_lsn");
Ok(())
}
std::cmp::Ordering::Greater => {
// Does this need to be an error? Can we just ignore? Does anything in this function need to be an error?
anyhow::bail!("proposed remote_consistent_lsn is lower than current record");
}
}
}
pub fn nodes(&self) -> impl Iterator<Item = &PageserverHandle> {
self.nodes.values()
}
}
#[derive(Debug)]
#[derive(Debug, Hash, PartialEq, Eq)]
struct ShardAttachmentId {
shard_id: ShardIndex,
generation: Generation,
@@ -165,17 +134,26 @@ pub struct PageserverAttachment {
pub struct SafekeeperTimelineHandle {}
impl World {
pub fn housekeeping(&self) {}
pub fn spawn(&self, tli: Arc<Timeline>) -> SafekeeperTimelineHandle {
pub fn spawn(&self, tli: Arc<Timeline>) -> anyhow::Result<SafekeeperTimelineHandle> {
let ttid = tli.ttid;
let mut inner = self.inner.write().unwrap();
let sk_tenant: &mut SafekeeperTenant = inner.tenant_shards.entry(tenant_id).or_default();
let vacant = match sk_tenant.timelines.entry(ttid.timeline_id) {
hash_map::Entry::Occupied(occupied_entry) => {
anyhow::bail!("entry for timeline already exists");
}
hash_map::Entry::Vacant(vacant) => vacant,
};
tokio::spawn(async move {
SafekeeperTimeline {
tli,
pageserver_timeline_shards: todo!(),
tenant_shard_attachments: Arc::clone(&sk_tenant.tenant_shard_attachments),
remote_consistent_lsns: HashMap::default(), // TODO: fill from persistence inside .run()?
}
.run()
.await;
});
SafekeeperTimelineHandle {}
vacant.insert(SafekeeperTimelineHandle {})
}
pub fn update_pageserver_attachments(
&self,
@@ -185,7 +163,7 @@ impl World {
let mut inner = self.inner.write().unwrap();
let sk_tenant: &mut SafekeeperTenant =
inner.tenant_shards.entry(tenant_id).or_insert(todo!());
let mut shard_attachments = sk_tenant.shard_attachments.write().unwrap();
let mut shard_attachments = sk_tenant.tenant_shard_attachments.write().unwrap();
use safekeeper_api::models::TenantShardPageserverAttachmentChange::*;
let change = match arg {
Attach(a) => shard_attachments.add(a.into()),
@@ -193,8 +171,45 @@ impl World {
};
Ok(())
}
pub fn process_remote_consistent_lsn_adv(&self, adv: RemoteConsistentLsnAdv) {
pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) {
debug!(?adv, "processing advertisement");
let RemoteConsistentLsnAdv {
tenant_id,
shard_attachment,
timeline_id,
remote_consistent_lsn,
} = adv;
let mut inner = self.inner.write().unwrap();
let Some(sk_tenant) = inner.tenant_shards.get(&tenant_id) else {
warn!(?adv, "tenant shard attachment is not known");
return;
};
let Some(timeline) = sk_tenant.timelines.get_mut(&timeline_id) else {
warn!(?adv, "safekeeper timeline is not know");
return;
};
if cfg!(feature = "testing") {
let commit_lsn = *timeline.tli.get_commit_lsn_watch_rx().borrow();
if !(remote_consistent_lsn <= commit_lsn) {
warn!(
?adv,
"advertised remote_consistent_lsn is ahead of commit_lsn"
);
return;
}
}
let current = match timeline.remote_consistent_lsns.entry(shard_attachment) {
hash_map::Entry::Occupied(mut occupied_entry) => occupied_entry.get_mut(),
hash_map::Entry::Vacant(vacant_entry) => {
info!(?adv, "first time learning about timeline shard attachment");
vacant_entry.insert(remote_consistent_lsn)
}
};
if !(*current <= remote_consistent_lsn) {
warn!(current=%*current, ?adv, "advertised remote_consistent_lsn is lower than earlier advertisements, either delayed message or shard is behaving inconsistently");
return;
}
*current = remote_consistent_lsn;
}
}
@@ -208,7 +223,7 @@ impl SafekeeperTimeline {
let ttid = self.tli.ttid;
loop {
let commit_lsn = *commit_lsn_rx.borrow_and_update();
let shard_attachments = self.shard_attachments.read().unwrap();
let shard_attachments = self.tenant_shard_attachments.read().unwrap();
for node in shard_attachments.nodes() {
node.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn });
}
@@ -225,6 +240,12 @@ impl SafekeeperTimeline {
}
}
impl SafekeeperTimelineHandle {
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
impl PageserverHandle {
pub async fn advertise_commit_lsn(&self, adv: CommitLsnAdv) {
self.tx
@@ -234,8 +255,7 @@ impl PageserverHandle {
}
}
impl Pageserver {
}
impl Pageserver {}
impl From<safekeeper_api::models::TenantShardPageserverAttachment> for ShardAttachmentId {
fn from(value: safekeeper_api::models::TenantShardPageserverAttachment) {