This commit is contained in:
Christian Schwarz
2025-06-03 18:42:54 +02:00
parent 4d2f27a33f
commit 1de0f41403
3 changed files with 120 additions and 60 deletions

View File

@@ -320,7 +320,7 @@ pub enum TenantShardPageserverAttachmentChange {
}
impl TenantShardPageserverAttachmentChange {
pub fn attachment(&self) -> TenantShardPageserverAttachment {
pub fn attachment(&self) -> &TenantShardPageserverAttachment {
match self {
TenantShardPageserverAttachmentChange::Attach(a) => a,
TenantShardPageserverAttachmentChange::Detach(a) => a,

View File

@@ -433,7 +433,7 @@ impl Manager {
state_version_rx: tli.get_state_version_rx(),
num_computes_rx: tli.get_walreceivers().get_num_rx(),
tli_broker_active: broker_active_set.guard(tli.clone()),
wal_advertiser: wal_advertiser.spawn(tli.clone()).unwrap(),
wal_advertiser: wal_advertiser.register_timeline(tli.clone()).unwrap(),
last_removed_segno: 0,
is_offloaded,
backup_task: None,

View File

@@ -2,12 +2,15 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map, hash_map},
sync::{Arc, RwLock},
sync::{Arc, Mutex, RwLock},
time::Duration,
};
use safekeeper_api::models::TenantShardPageserverAttachment;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::sync::{Notify, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use utils::{
generation::Generation,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -20,31 +23,55 @@ use crate::timeline::Timeline;
#[derive(Default)]
pub struct World {
cancel: CancellationToken,
inner: RwLock<WorldInner>,
pageservers: Pageservers,
}
#[derive(Default)]
struct WorldInner {
tenant_shards: HashMap<TenantId, SafekeeperTenant>,
tenants: HashMap<TenantId, SafekeeperTenant>,
}
#[derive(Default)]
struct Pageservers {
cancel: CancellationToken,
pageservers: HashMap<NodeId, PageserverHandle>,
}
impl Pageservers {
fn get_handle(&self, ps_id: NodeId) -> PageserverHandle {
fn get_handle(&mut self, ps_id: NodeId) -> PageserverHandle {
match self.pageservers.entry(ps_id) {
hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
hash_map::Entry::Vacant(vacant_entry) => {
todo!()
let notify = Arc::new(Notify::new());
let cancel = CancellationToken::new();
let state = Default::default();
tokio::spawn(
PageserverTask {
ps_id,
state: state.clone(),
notify: Arc::clone(&notify),
cancel: cancel.clone(),
}
.run(),
);
PageserverHandle { state, cancel }
}
}
}
}
struct Pageserver {
// XXX track more fine-grained
world: Arc<World>,
/// State owned by the background task of a single pageserver.
///
/// `Pageservers::get_handle` builds one of these and spawns its `run()` future
/// when no handle exists yet for the node.
struct PageserverTask {
/// Id of the node this task was created for.
ps_id: NodeId,
/// Shared with the handle side; `run()` takes the pending advertisements out of it.
state: Arc<Mutex<PageserverSharedState>>,
/// `run()` waits on this before it drains `state`.
// NOTE(review): no code visible here ever signals this `Notify` — confirm who wakes the task.
notify: Arc<tokio::sync::Notify>,
/// `run()` returns once this token is cancelled.
cancel: CancellationToken,
}
/// Mutable state shared between a pageserver's handle(s) and its background task.
#[derive(Default)]
struct PageserverSharedState {
/// Advertisements not yet picked up by the task, keyed by timeline.
/// `PageserverHandle::advertise_commit_lsn` inserts here (a later insert for the
/// same timeline replaces the earlier value); `PageserverTask::run` swaps the
/// whole map out on each iteration.
pending_advertisements: HashMap<TenantTimelineId, Lsn>,
}
struct CommitLsnAdv {
@@ -52,6 +79,7 @@ struct CommitLsnAdv {
commit_lsn: Lsn,
}
#[derive(Debug)]
struct RemoteConsistentLsnAdv {
tenant_id: TenantId,
shard_attachment: ShardAttachmentId,
@@ -61,29 +89,36 @@ struct RemoteConsistentLsnAdv {
#[derive(Clone)]
struct PageserverHandle {
tx: tokio::sync::mpsc::Sender<CommitLsnAdv>,
state: Arc<RwLock<PageserverSharedState>>,
cancel: CancellationToken,
}
/// Cancels this handle's `cancel` token whenever a handle value is dropped.
// NOTE(review): `PageserverHandle` derives `Clone` and `Pageservers::get_handle`
// returns clones of the stored handle, so dropping ANY clone cancels the token.
// Presumably the intent is "cancel when the last handle goes away" — confirm, and
// if so move the cancellation into a reference-counted guard.
impl std::ops::Drop for PageserverHandle {
fn drop(&mut self) {
self.cancel.cancel();
}
}
#[derive(Default)]
struct SafekeeperTenant {
/// Shared with [`SafekeeperTimeline::tenant_shard_attachments`].
tenant_shard_attachments: Arc<RwLock<TenantShardAttachments>>,
tenant_shard_attachments: tokio::sync::watch::Sender<TenantShardAttachments>,
timelines: HashMap<TimelineId, Arc<SafekeeperTimeline>>,
}
struct SafekeeperTimeline {
tli: Arc<Timeline>,
/// Shared with [`SafekeeperTenant::tenant_shard_attachments`].
tenant_shard_attachments: Arc<RwLock<TenantShardAttachments>>,
tenant_shard_attachments: tokio::sync::watch::Receiver<TenantShardAttachments>,
remote_consistent_lsns: HashMap<ShardAttachmentId, Lsn>,
}
#[derive(Default)]
struct TenantShardAttachments {
pageservers: Pageservers,
precise: BTreeSet<ShardAttachmentId>,
precise: HashSet<ShardAttachmentId>,
/// projection from `precise`
nodes: BTreeMap<NodeId, (usize, PageserverHandle)>, // usize is a refcount from precise
nodes: HashMap<NodeId, (usize, PageserverHandle)>, // usize is a refcount from precise
}
impl TenantShardAttachments {
@@ -100,20 +135,20 @@ impl TenantShardAttachments {
return;
}
let mut entry = match self.nodes.entry(a.ps_id) {
btree_map::Entry::Vacant(vacant_entry) => unreachable!("was referenced by precise"),
btree_map::Entry::Occupied(occupied_entry) => occupied_entry,
hash_map::Entry::Vacant(vacant_entry) => unreachable!("was referenced by precise"),
hash_map::Entry::Occupied(occupied_entry) => occupied_entry,
};
*entry.get_mut() = entry
.get()
let (refcount, _) = entry.get_mut();
*refcount = refcount
.checked_sub(1)
.expect("was referenced by precise, we add refcount in add()");
if *entry.get() == 0 {
if *refcount == 0 {
entry.remove();
}
}
pub fn nodes(&self) -> impl Iterator<Item = &PageserverHandle> {
self.nodes.values()
self.nodes.values().map(|(_, h)| h)
}
}
@@ -124,20 +159,14 @@ struct ShardAttachmentId {
ps_id: NodeId,
}
pub struct PageserverAttachment {
pageserver: NodeId,
tenant_shard_id: TenantShardId,
generation: Generation,
remote_consistent_lsn: RwLock<HashMap<TimelineId, Arc<PageserverTimeline>>>,
}
pub struct SafekeeperTimelineHandle {}
impl World {
pub fn spawn(&self, tli: Arc<Timeline>) -> anyhow::Result<SafekeeperTimelineHandle> {
pub fn register_timeline(
&self,
tli: Arc<Timeline>,
) -> anyhow::Result<SafekeeperTimelineHandle> {
let ttid = tli.ttid;
let mut inner = self.inner.write().unwrap();
let sk_tenant: &mut SafekeeperTenant = inner.tenant_shards.entry(tenant_id).or_default();
let sk_tenant: &mut SafekeeperTenant = inner.tenants.entry(ttid.tenant_id).or_default();
let vacant = match sk_tenant.timelines.entry(ttid.timeline_id) {
hash_map::Entry::Occupied(occupied_entry) => {
anyhow::bail!("entry for timeline already exists");
@@ -147,13 +176,13 @@ impl World {
tokio::spawn(async move {
SafekeeperTimeline {
tli,
tenant_shard_attachments: Arc::clone(&sk_tenant.tenant_shard_attachments),
tenant_shard_attachments: sk_tenant.tenant_shard_attachments.subscribe(),
remote_consistent_lsns: HashMap::default(), // TODO: fill from persistence inside .run()?
}
.run()
.await;
});
vacant.insert(SafekeeperTimelineHandle {})
Ok(vacant.insert(SafekeeperTimelineHandle { tx }).clone())
}
pub fn update_pageserver_attachments(
&self,
@@ -161,8 +190,7 @@ impl World {
arg: safekeeper_api::models::TenantShardPageserverAttachmentChange,
) -> anyhow::Result<()> {
let mut inner = self.inner.write().unwrap();
let sk_tenant: &mut SafekeeperTenant =
inner.tenant_shards.entry(tenant_id).or_insert(todo!());
let sk_tenant: &mut SafekeeperTenant = inner.tenants.entry(tenant_id).or_insert(todo!());
let mut shard_attachments = sk_tenant.tenant_shard_attachments.write().unwrap();
use safekeeper_api::models::TenantShardPageserverAttachmentChange::*;
let change = match arg {
@@ -178,9 +206,9 @@ impl World {
shard_attachment,
timeline_id,
remote_consistent_lsn,
} = adv;
} = &adv;
let mut inner = self.inner.write().unwrap();
let Some(sk_tenant) = inner.tenant_shards.get(&tenant_id) else {
let Some(sk_tenant) = inner.tenants.get(&tenant_id) else {
warn!(?adv, "tenant shard attachment is not known");
return;
};
@@ -190,7 +218,7 @@ impl World {
};
if cfg!(feature = "testing") {
let commit_lsn = *timeline.tli.get_commit_lsn_watch_rx().borrow();
if !(remote_consistent_lsn <= commit_lsn) {
if !(*remote_consistent_lsn <= commit_lsn) {
warn!(
?adv,
"advertised remote_consistent_lsn is ahead of commit_lsn"
@@ -214,48 +242,80 @@ impl World {
}
impl SafekeeperTimeline {
async fn run(self) {
async fn run(mut self) {
let Ok(gate_guard) = self.tli.gate.enter() else {
return;
};
let cancel = self.tli.cancel.child_token();
let mut commit_lsn_rx = self.tli.get_commit_lsn_watch_rx();
let ttid = self.tli.ttid;
let mut commit_lsn_rx = self.tli.get_commit_lsn_watch_rx();
// arm for first iteration
commit_lsn_rx.mark_changed();
self.tenant_shard_attachments.mark_changed();
let mut tenant_shard_attachments: Vec<PageserverHandle> = Vec::new();
loop {
let commit_lsn = *commit_lsn_rx.borrow_and_update();
let shard_attachments = self.tenant_shard_attachments.read().unwrap();
for node in shard_attachments.nodes() {
node.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn });
}
tokio::select! {
_ = cancel.cancelled() => { return; }
commit_lsn = async {
_ = async {
// at most one advertisement per second
tokio::time::sleep(Duration::from_secs(1)).await;
commit_lsn_rx.changed().await
} => { continue; }
} => { }
_ = self.tenant_shard_attachments.changed() => {
tenant_shard_attachments.clear();
tenant_shard_attachments.extend(self.tenant_shard_attachments.borrow_and_update().nodes().cloned().collect());
tenant_shard_attachments.shrink_to_fit();
}
};
let commit_lsn = *commit_lsn_rx.borrow_and_update(); // The rhs deref minimizes time we lock the commit_lsn_tx
for pageserver_handle in tenant_shard_attachments {
// NB: if this function ever becomes slow / may need an .await, make sure
// that other nodes continue to receive advertisements.
pageserver_handle.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn });
}
drop(shard_attachments);
}
drop(gate_guard);
}
}
impl SafekeeperTimelineHandle {
/// Not implemented yet: calling this currently panics via `todo!()`.
// NOTE(review): judging by the name, presumably meant to report whether the
// timeline may be evicted — confirm the intended semantics before implementing.
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
impl PageserverHandle {
pub async fn advertise_commit_lsn(&self, adv: CommitLsnAdv) {
self.tx
.send(adv)
.await
.expect("Pageserver loop never drops the rx");
pub fn advertise_commit_lsn(&self, adv: CommitLsnAdv) {
let mut state = self.state.write().unwrap();
state.pending_advertisements.insert(ttid, commit_lsn);
}
}
impl Pageserver {}
impl PageserverTask {
    /// Main loop of the per-pageserver task.
    ///
    /// Each iteration waits for either cancellation or a wake-up on `notify`,
    /// then takes the whole batch of pending advertisements out of the shared
    /// state. Sending the batch is still a `todo!()`, so the first wake-up
    /// currently panics and the batching sleep below is never reached.
    ///
    /// Cancellation: the loop returns as soon as `cancel` fires. The intent is
    /// that this happens when the last `PageserverHandle` is dropped.
    // NOTE(review): `PageserverHandle`'s `Drop` impl cancels on every drop and the
    // handle is `Clone` — confirm that "last handle" is what actually happens.
    async fn run(mut self) {
        // Reused across iterations so the map allocation is recycled.
        let mut pending_advertisements = HashMap::new();
        loop {
            tokio::select! {
                _ = self.cancel.cancelled() => {
                    return;
                }
                _ = self.notify.notified() => {}
            }
            {
                // Hand the (emptied) local map to the shared state and take the
                // filled one; the scope keeps the std mutex guard from living
                // across the `.await` below.
                let mut state = self.state.lock().unwrap();
                pending_advertisements.clear();
                std::mem::swap(&mut state.pending_advertisements, &mut pending_advertisements);
            }
            todo!("grpc call");
            // batch updates by sleeping
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}
impl From<safekeeper_api::models::TenantShardPageserverAttachment> for ShardAttachmentId {
fn from(value: safekeeper_api::models::TenantShardPageserverAttachment) {