From 428f532f08f5397b491001b61290e6cedf1c9228 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 4 Jun 2025 15:16:55 +0200 Subject: [PATCH] naive implementation of advertisement generator --- Cargo.lock | 66 +++++++++++++ Cargo.toml | 2 +- libs/sk_ps_discovery/Cargo.toml | 69 +++++++++++++ libs/sk_ps_discovery/src/lib.rs | 159 ++++++++++++++++++++++++++++++ libs/sk_ps_discovery/src/tests.rs | 86 ++++++++++++++++ 5 files changed, 381 insertions(+), 1 deletion(-) create mode 100644 libs/sk_ps_discovery/Cargo.toml create mode 100644 libs/sk_ps_discovery/src/lib.rs create mode 100644 libs/sk_ps_discovery/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 3eca2ca1b6..d82916f921 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6573,6 +6573,72 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "sk_ps_discovery" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-stream", + "byteorder", + "bytes", + "camino", + "camino-tempfile", + "chrono", + "clap", + "crc32c", + "env_logger", + "fail", + "futures", + "hex", + "http 1.1.0", + "http-utils", + "humantime", + "hyper 0.14.30", + "itertools 0.10.5", + "jsonwebtoken", + "metrics", + "once_cell", + "pageserver_api", + "parking_lot 0.12.1", + "pem", + "postgres-protocol", + "postgres_backend", + "postgres_ffi", + "pprof", + "pq_proto", + "rand 0.8.5", + "regex", + "remote_storage", + "reqwest", + "rustls 0.23.18", + "safekeeper_api", + "safekeeper_client", + "scopeguard", + "sd-notify", + "serde", + "serde_json", + "sha2", + "smallvec", + "storage_broker", + "strum", + "strum_macros", + "thiserror 1.0.69", + "tikv-jemallocator", + "tokio", + "tokio-io-timeout", + "tokio-postgres", + "tokio-rustls 0.26.0", + "tokio-stream", + "tokio-tar", + "tokio-util", + "tonic", + "tracing", + "url", + "utils", + "wal_decoder", + "workspace_hack", +] + [[package]] name = "slab" version = "0.4.8" diff --git a/Cargo.toml b/Cargo.toml index a280c446b9..11fce50bee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ members = [ "libs/proxy/postgres-protocol2", "libs/proxy/postgres-types2", "libs/proxy/tokio-postgres2", - "endpoint_storage", + "endpoint_storage", "libs/sk_ps_discovery", ] [workspace.package] diff --git a/libs/sk_ps_discovery/Cargo.toml b/libs/sk_ps_discovery/Cargo.toml new file mode 100644 index 0000000000..8c761b025e --- /dev/null +++ b/libs/sk_ps_discovery/Cargo.toml @@ -0,0 +1,69 @@ +[package] +name = "sk_ps_discovery" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +workspace_hack.workspace = true + +async-stream.workspace = true +anyhow.workspace = true +byteorder.workspace = true +bytes.workspace = true +camino.workspace = true +camino-tempfile.workspace = true +chrono.workspace = true +clap = { workspace = true, features = ["derive"] } +crc32c.workspace = true +fail.workspace = true +hex.workspace = true +humantime.workspace = true +http.workspace = true +hyper0.workspace = true +itertools.workspace = true +jsonwebtoken.workspace = true +futures.workspace = true +once_cell.workspace = true +parking_lot.workspace = true +pageserver_api.workspace = true +postgres-protocol.workspace = true +pprof.workspace = true +rand.workspace = true +regex.workspace = true +reqwest = { workspace = true, features = ["json"] } +rustls.workspace = true +scopeguard.workspace = true +serde.workspace = true +serde_json.workspace = true +smallvec.workspace = true +strum.workspace = true +strum_macros.workspace = true +thiserror.workspace = true +tikv-jemallocator.workspace = true +tokio = { workspace = true, features = ["fs"] } +tokio-io-timeout.workspace = true +tokio-postgres.workspace = true +tokio-rustls.workspace = true +tokio-tar.workspace = true +tokio-util = { workspace = true } +tonic = { workspace = true } +tracing.workspace = true +url.workspace = true +metrics.workspace = true +pem.workspace = true +postgres_backend.workspace = true +postgres_ffi.workspace = true +pq_proto.workspace = true +remote_storage.workspace = true +safekeeper_api.workspace = true +safekeeper_client.workspace = true +sha2.workspace = true +sd-notify.workspace = true +storage_broker.workspace = true +tokio-stream.workspace = true +http-utils.workspace = true +utils.workspace = true +wal_decoder.workspace = true +env_logger.workspace = true + diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs new file mode 100644 index 0000000000..026331acc2 --- /dev/null +++ b/libs/sk_ps_discovery/src/lib.rs @@ -0,0 +1,159 @@ +#[cfg(test)] +mod tests; + +use std::collections::{HashMap, hash_map}; + +use tracing::{info, warn}; +use utils::{ + generation::Generation, + id::{NodeId, TenantId, TenantTimelineId, TimelineId}, + lsn::Lsn, + shard::ShardIndex, +}; + +#[derive(Debug, Default)] +pub struct World { + attachments: HashMap, + commit_lsns: HashMap, + remote_consistent_lsns: HashMap, +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +struct TenantShardAttachmentId { + tenant_id: TenantId, + shard_id: ShardIndex, + generation: Generation, +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +struct TimelineAttachmentId { + tenant_shard_attachment_id: TenantShardAttachmentId, + timeline_id: TimelineId, +} + +pub struct AttachmentUpdate { + tenant_shard_attachment_id: TenantShardAttachmentId, + action: AttachmentUpdateAction, +} + +pub enum AttachmentUpdateAction { + Attach { ps_id: NodeId }, + Detach, +} + +pub struct RemoteConsistentLsnAdv { + attachment: TimelineAttachmentId, + remote_consistent_lsn: Lsn, +} + +impl World { + pub fn update_attachment(&mut self, upd: AttachmentUpdate) { + use AttachmentUpdateAction::*; + use hash_map::Entry::*; + let AttachmentUpdate { + tenant_shard_attachment_id, + action, + } = upd; + match (action, self.attachments.entry(tenant_shard_attachment_id)) { + (Attach { ps_id }, Occupied(e)) if *e.get() == ps_id => { + info!("attachment is already known") + } + (Attach { ps_id }, Occupied(e)) => { + warn!(current_node=%e.get(), proposed_node=%ps_id, "ignoring update that moves attachment to a different pageserver"); + } + (Attach { ps_id }, Vacant(e)) => { + e.insert(ps_id); + } + (Detach, Occupied(e)) => { + e.remove(); + } + (Detach, Vacant(_)) => { + info!("detachment is already known"); + } + } + } + pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) { + let RemoteConsistentLsnAdv { + attachment, + remote_consistent_lsn, + } = adv; + match self.remote_consistent_lsns.entry(attachment) { + hash_map::Entry::Occupied(mut occupied_entry) => { + let current = occupied_entry.get_mut(); + if !(*current <= remote_consistent_lsn) { + warn!( + "ignoring advertisement because remote_consistent_lsn is moving backwards" + ); + return; + } + *current = remote_consistent_lsn; + } + hash_map::Entry::Vacant(vacant_entry) => { + info!("first time hearing from timeline attachment"); + vacant_entry.insert(remote_consistent_lsn); + } + } + } + pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) { + match self.commit_lsns.entry(ttid) { + hash_map::Entry::Occupied(mut occupied_entry) => { + assert!(*occupied_entry.get() <= commit_lsn); + *occupied_entry.get_mut() = commit_lsn; + } + hash_map::Entry::Vacant(vacant_entry) => { + info!("first time learning about sk timeline"); + vacant_entry.insert(commit_lsn); + } + } + } + + pub fn get_commit_lsn_advertisements(&self) -> HashMap> { + let mut commit_lsn_advertisements_by_node: HashMap> = + Default::default(); + for (timeline_attachment_id, remote_consistent_lsn) in + self.remote_consistent_lsns.iter().map(|(k, v)| (*k, *v)) + { + let tenant_timeline_id = timeline_attachment_id.tenant_timeline_id(); + if let Some(commit_lsn) = self.commit_lsns.get(&tenant_timeline_id).cloned() { + if commit_lsn > remote_consistent_lsn { + if let Some(node_id) = self + .attachments + .get(&timeline_attachment_id.tenant_shard_attachment_id) + { + let for_node = commit_lsn_advertisements_by_node + .entry(*node_id) + .or_default(); + match for_node.entry(tenant_timeline_id) { + hash_map::Entry::Vacant(vacant_entry) => { + vacant_entry.insert(commit_lsn); + } + hash_map::Entry::Occupied(occupied_entry) => { + assert_eq!(*occupied_entry.get(), commit_lsn); + } + } + } + } + } + } + commit_lsn_advertisements_by_node + } +} + +impl TimelineAttachmentId { + pub fn tenant_timeline_id(&self) -> TenantTimelineId { + TenantTimelineId { + tenant_id: self.tenant_shard_attachment_id.tenant_id, + timeline_id: self.timeline_id, + } + } +} + +impl TenantShardAttachmentId { + #[cfg(test)] + pub fn timeline_attachment_id(self, timeline_id: TimelineId) -> TimelineAttachmentId { + TimelineAttachmentId { + tenant_shard_attachment_id: self, + timeline_id, + } + } +} diff --git a/libs/sk_ps_discovery/src/tests.rs b/libs/sk_ps_discovery/src/tests.rs new file mode 100644 index 0000000000..ead1067257 --- /dev/null +++ b/libs/sk_ps_discovery/src/tests.rs @@ -0,0 +1,86 @@ +use utils::id::TenantId; + +use super::*; +use crate::World; + +#[test] +fn basic() { + let mut world = World::default(); + + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + let timeline2 = TimelineId::generate(); + + let attachment1 = TenantShardAttachmentId { + tenant_id, + shard_id: ShardIndex::unsharded(), + generation: Generation::Valid(2), + }; + let attachment2 = TenantShardAttachmentId { + tenant_id, + shard_id: ShardIndex::unsharded(), + generation: Generation::Valid(3), + }; + + let ps1 = NodeId(0x100); + let ps2 = NodeId(0x200); + + // Out of order; in happy path, commit_lsn advances first, but let's test the + // case where safekeeper doesn't know about the attachments yet first, before + // we extend the case to the happy path. + + world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv { + attachment: attachment1.timeline_attachment_id(timeline_id), + remote_consistent_lsn: Lsn(0x23), + }); + world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv { + attachment: attachment2.timeline_attachment_id(timeline_id), + remote_consistent_lsn: Lsn(0x42), + }); + // SK authoritative info on which advertisements ought exist is still empty + assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default()); + world.update_attachment(AttachmentUpdate { + tenant_shard_attachment_id: attachment1, + action: AttachmentUpdateAction::Attach { ps_id: ps1 }, + }); + // We have not inserted any commit_lsn info yet, so, still no advs expected + assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default()); + // insert commit_lsn info for different timeline + world.handle_commit_lsn_advancement( + TenantTimelineId { + tenant_id, + timeline_id: timeline2, + }, + Lsn(0x66), + ); + // Advs should still be empty + assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default()); + + // Ok, out of order part tested. Now Safekeeper learns about the attachments. + + // insert commit_lsn info for the timeline we have remote_consistent_lsn info for + world.handle_commit_lsn_advancement( + TenantTimelineId { + tenant_id, + timeline_id, + }, + Lsn(0x55), + ); + dbg!(&world); + // Now advertisements to attachment1 will be sent out, but attachment2 is still not known, so, no advertisements to it. + { + let mut advs = world.get_commit_lsn_advertisements(); + assert_eq!(advs.len(), 1); + let advs = advs.remove(&ps1).unwrap(); + assert_eq!(advs.len(), 1); + let (tenant_timeline_id, lsn) = advs.into_iter().next().unwrap(); + assert_eq!( + TenantTimelineId { + tenant_id, + timeline_id + }, + tenant_timeline_id + ); + assert_eq!(lsn, Lsn(0x55)); + } +}