naive implementation of advertisement generator

This commit is contained in:
Christian Schwarz
2025-06-04 15:16:55 +02:00
parent 5efb0d8072
commit 428f532f08
5 changed files with 381 additions and 1 deletions

66
Cargo.lock generated
View File

@@ -6573,6 +6573,72 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "sk_ps_discovery"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"byteorder",
"bytes",
"camino",
"camino-tempfile",
"chrono",
"clap",
"crc32c",
"env_logger",
"fail",
"futures",
"hex",
"http 1.1.0",
"http-utils",
"humantime",
"hyper 0.14.30",
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
"pem",
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",
"remote_storage",
"reqwest",
"rustls 0.23.18",
"safekeeper_api",
"safekeeper_client",
"scopeguard",
"sd-notify",
"serde",
"serde_json",
"sha2",
"smallvec",
"storage_broker",
"strum",
"strum_macros",
"thiserror 1.0.69",
"tikv-jemallocator",
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-rustls 0.26.0",
"tokio-stream",
"tokio-tar",
"tokio-util",
"tonic",
"tracing",
"url",
"utils",
"wal_decoder",
"workspace_hack",
]
[[package]]
name = "slab"
version = "0.4.8"

View File

@@ -43,7 +43,7 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"endpoint_storage",
"libs/sk_ps_discovery",
]
[workspace.package]

View File

@@ -0,0 +1,69 @@
[package]
name = "sk_ps_discovery"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
workspace_hack.workspace = true
async-stream.workspace = true
anyhow.workspace = true
byteorder.workspace = true
bytes.workspace = true
camino.workspace = true
camino-tempfile.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
crc32c.workspace = true
fail.workspace = true
hex.workspace = true
humantime.workspace = true
http.workspace = true
hyper0.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
futures.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
pageserver_api.workspace = true
postgres-protocol.workspace = true
pprof.workspace = true
rand.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
rustls.workspace = true
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
smallvec.workspace = true
strum.workspace = true
strum_macros.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tokio = { workspace = true, features = ["fs"] }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio-tar.workspace = true
tokio-util = { workspace = true }
tonic = { workspace = true }
tracing.workspace = true
url.workspace = true
metrics.workspace = true
pem.workspace = true
postgres_backend.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
sha2.workspace = true
sd-notify.workspace = true
storage_broker.workspace = true
tokio-stream.workspace = true
http-utils.workspace = true
utils.workspace = true
wal_decoder.workspace = true
env_logger.workspace = true

View File

@@ -0,0 +1,159 @@
#[cfg(test)]
mod tests;
use std::collections::{HashMap, hash_map};
use tracing::{info, warn};
use utils::{
generation::Generation,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
shard::ShardIndex,
};
/// Naive in-memory model from which commit_lsn advertisements to pageservers
/// are generated (see `get_commit_lsn_advertisements`).
#[derive(Debug, Default)]
pub struct World {
    // Which pageserver node hosts each tenant shard attachment.
    // First writer wins: `update_attachment` ignores updates that would
    // move an attachment to a different node.
    attachments: HashMap<TenantShardAttachmentId, NodeId>,
    // Latest known commit_lsn per safekeeper timeline; asserted to be
    // monotonically non-decreasing in `handle_commit_lsn_advancement`.
    commit_lsns: HashMap<TenantTimelineId, Lsn>,
    // Latest remote_consistent_lsn reported per timeline attachment;
    // backwards movement is ignored with a warning.
    remote_consistent_lsns: HashMap<TimelineAttachmentId, Lsn>,
}
/// Identifies one attachment of a tenant shard: the (tenant, shard) pair plus
/// the attachment generation, so re-attachments in a new generation are
/// distinct keys.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
struct TenantShardAttachmentId {
    tenant_id: TenantId,
    shard_id: ShardIndex,
    generation: Generation,
}
/// A timeline within a particular tenant shard attachment.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
struct TimelineAttachmentId {
    tenant_shard_attachment_id: TenantShardAttachmentId,
    timeline_id: TimelineId,
}
/// Input to [`World::update_attachment`]: attach or detach one tenant shard
/// attachment.
///
/// NOTE(review): the struct is `pub` but its fields (and the field type
/// `TenantShardAttachmentId`) are private, so external crates cannot
/// construct it yet — presumably acceptable for this naive stage; confirm
/// intended visibility.
pub struct AttachmentUpdate {
    tenant_shard_attachment_id: TenantShardAttachmentId,
    action: AttachmentUpdateAction,
}
/// The two attachment state transitions carried by [`AttachmentUpdate`].
pub enum AttachmentUpdateAction {
    /// Attach the shard to the given pageserver node.
    Attach { ps_id: NodeId },
    /// Remove the attachment, whichever node it was on.
    Detach,
}
/// A pageserver's report of the remote_consistent_lsn it has reached for one
/// timeline attachment; consumed by
/// [`World::handle_remote_consistent_lsn_advertisement`].
pub struct RemoteConsistentLsnAdv {
    attachment: TimelineAttachmentId,
    remote_consistent_lsn: Lsn,
}
impl World {
pub fn update_attachment(&mut self, upd: AttachmentUpdate) {
use AttachmentUpdateAction::*;
use hash_map::Entry::*;
let AttachmentUpdate {
tenant_shard_attachment_id,
action,
} = upd;
match (action, self.attachments.entry(tenant_shard_attachment_id)) {
(Attach { ps_id }, Occupied(e)) if *e.get() == ps_id => {
info!("attachment is already known")
}
(Attach { ps_id }, Occupied(e)) => {
warn!(current_node=%e.get(), proposed_node=%ps_id, "ignoring update that moves attachment to a different pageserver");
}
(Attach { ps_id }, Vacant(e)) => {
e.insert(ps_id);
}
(Detach, Occupied(e)) => {
e.remove();
}
(Detach, Vacant(_)) => {
info!("detachment is already known");
}
}
}
pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) {
let RemoteConsistentLsnAdv {
attachment,
remote_consistent_lsn,
} = adv;
match self.remote_consistent_lsns.entry(attachment) {
hash_map::Entry::Occupied(mut occupied_entry) => {
let current = occupied_entry.get_mut();
if !(*current <= remote_consistent_lsn) {
warn!(
"ignoring advertisement because remote_consistent_lsn is moving backwards"
);
return;
}
*current = remote_consistent_lsn;
}
hash_map::Entry::Vacant(vacant_entry) => {
info!("first time hearing from timeline attachment");
vacant_entry.insert(remote_consistent_lsn);
}
}
}
pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) {
match self.commit_lsns.entry(ttid) {
hash_map::Entry::Occupied(mut occupied_entry) => {
assert!(*occupied_entry.get() <= commit_lsn);
*occupied_entry.get_mut() = commit_lsn;
}
hash_map::Entry::Vacant(vacant_entry) => {
info!("first time learning about sk timeline");
vacant_entry.insert(commit_lsn);
}
}
}
pub fn get_commit_lsn_advertisements(&self) -> HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> {
let mut commit_lsn_advertisements_by_node: HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> =
Default::default();
for (timeline_attachment_id, remote_consistent_lsn) in
self.remote_consistent_lsns.iter().map(|(k, v)| (*k, *v))
{
let tenant_timeline_id = timeline_attachment_id.tenant_timeline_id();
if let Some(commit_lsn) = self.commit_lsns.get(&tenant_timeline_id).cloned() {
if commit_lsn > remote_consistent_lsn {
if let Some(node_id) = self
.attachments
.get(&timeline_attachment_id.tenant_shard_attachment_id)
{
let for_node = commit_lsn_advertisements_by_node
.entry(*node_id)
.or_default();
match for_node.entry(tenant_timeline_id) {
hash_map::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(commit_lsn);
}
hash_map::Entry::Occupied(occupied_entry) => {
assert_eq!(*occupied_entry.get(), commit_lsn);
}
}
}
}
}
}
commit_lsn_advertisements_by_node
}
}
impl TimelineAttachmentId {
    /// Project this timeline attachment down to its (tenant, timeline) pair,
    /// discarding the shard and generation parts of the attachment id.
    pub fn tenant_timeline_id(&self) -> TenantTimelineId {
        let Self {
            tenant_shard_attachment_id,
            timeline_id,
        } = *self;
        TenantTimelineId {
            tenant_id: tenant_shard_attachment_id.tenant_id,
            timeline_id,
        }
    }
}
impl TenantShardAttachmentId {
    /// Pair this shard attachment with a timeline to form a
    /// [`TimelineAttachmentId`]. Test-only convenience constructor.
    #[cfg(test)]
    pub fn timeline_attachment_id(self, timeline_id: TimelineId) -> TimelineAttachmentId {
        TimelineAttachmentId {
            timeline_id,
            tenant_shard_attachment_id: self,
        }
    }
}

View File

@@ -0,0 +1,86 @@
use utils::id::TenantId;
use super::*;
use crate::World;
#[test]
fn basic() {
    // End-to-end exercise of the advertisement generator: feed in
    // remote_consistent_lsn reports, attachments, and commit_lsn advances in
    // various orders and check which advertisements come out.
    let mut world = World::default();

    let tenant_id = TenantId::generate();
    let timeline_id = TimelineId::generate();
    let timeline2 = TimelineId::generate();
    // Two attachments of the same unsharded tenant in successive generations.
    let attachment1 = TenantShardAttachmentId {
        tenant_id,
        shard_id: ShardIndex::unsharded(),
        generation: Generation::Valid(2),
    };
    let attachment2 = TenantShardAttachmentId {
        tenant_id,
        shard_id: ShardIndex::unsharded(),
        generation: Generation::Valid(3),
    };
    let ps1 = NodeId(0x100);
    let ps2 = NodeId(0x200);

    // Out of order; in happy path, commit_lsn advances first, but let's test the
    // case where safekeeper doesn't know about the attachments yet first, before
    // we extend the case to the happy path.
    world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
        attachment: attachment1.timeline_attachment_id(timeline_id),
        remote_consistent_lsn: Lsn(0x23),
    });
    world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
        attachment: attachment2.timeline_attachment_id(timeline_id),
        remote_consistent_lsn: Lsn(0x42),
    });
    // SK authoritative info on which advertisements ought exist is still empty
    assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default());

    world.update_attachment(AttachmentUpdate {
        tenant_shard_attachment_id: attachment1,
        action: AttachmentUpdateAction::Attach { ps_id: ps1 },
    });
    // We have not inserted any commit_lsn info yet, so, still no advs expected
    assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default());

    // insert commit_lsn info for different timeline
    world.handle_commit_lsn_advancement(
        TenantTimelineId {
            tenant_id,
            timeline_id: timeline2,
        },
        Lsn(0x66),
    );
    // Advs should still be empty
    assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default());

    // Ok, out of order part tested. Now Safekeeper learns about the attachments.
    // insert commit_lsn info for the timeline we have remote_consistent_lsn info for
    world.handle_commit_lsn_advancement(
        TenantTimelineId {
            tenant_id,
            timeline_id,
        },
        Lsn(0x55),
    );

    // Now advertisements to attachment1 will be sent out, but attachment2 is still not known, so, no advertisements to it.
    {
        let mut advs = world.get_commit_lsn_advertisements();
        assert_eq!(advs.len(), 1);
        let advs = advs.remove(&ps1).unwrap();
        assert_eq!(advs.len(), 1);
        let (tenant_timeline_id, lsn) = advs.into_iter().next().unwrap();
        assert_eq!(
            TenantTimelineId {
                tenant_id,
                timeline_id
            },
            tenant_timeline_id
        );
        assert_eq!(lsn, Lsn(0x55));
    }

    // Once attachment2 becomes known, its node gets the same advertisement,
    // since its remote_consistent_lsn (0x42) also trails commit_lsn (0x55).
    world.update_attachment(AttachmentUpdate {
        tenant_shard_attachment_id: attachment2,
        action: AttachmentUpdateAction::Attach { ps_id: ps2 },
    });
    {
        let mut advs = world.get_commit_lsn_advertisements();
        assert_eq!(advs.len(), 2);
        for ps in [ps1, ps2] {
            let advs = advs.remove(&ps).unwrap();
            assert_eq!(
                advs,
                HashMap::from([(
                    TenantTimelineId {
                        tenant_id,
                        timeline_id
                    },
                    Lsn(0x55)
                )])
            );
        }
    }

    // When attachment2's remote_consistent_lsn catches up to commit_lsn,
    // no further advertisement is generated for it; ps1 still trails.
    world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
        attachment: attachment2.timeline_attachment_id(timeline_id),
        remote_consistent_lsn: Lsn(0x55),
    });
    {
        let mut advs = world.get_commit_lsn_advertisements();
        assert_eq!(advs.len(), 1);
        assert!(advs.remove(&ps1).is_some());
    }
}