diff --git a/Cargo.lock b/Cargo.lock index 867a86bd3c..3eca2ca1b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6077,6 +6077,7 @@ dependencies = [ "tokio-stream", "tokio-tar", "tokio-util", + "tonic", "tracing", "tracing-subscriber", "url", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 0a8cc415be..287fada465 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -52,6 +52,7 @@ tokio-postgres.workspace = true tokio-rustls.workspace = true tokio-tar.workspace = true tokio-util = { workspace = true } +tonic = { workspace = true } tracing.workspace = true url.workspace = true metrics.workspace = true diff --git a/safekeeper/src/wal_advertiser/advmap.rs b/safekeeper/src/wal_advertiser/advmap.rs index 68c39f7f1a..2022f2666f 100644 --- a/safekeeper/src/wal_advertiser/advmap.rs +++ b/safekeeper/src/wal_advertiser/advmap.rs @@ -6,11 +6,12 @@ use std::{ time::Duration, }; +use anyhow::Context; use safekeeper_api::models::TenantShardPageserverAttachment; use serde::Serialize; use tokio::sync::{Notify, mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use utils::{ generation::Generation, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -67,11 +68,13 @@ struct PageserverTask { state: Arc>, notify: Arc, cancel: CancellationToken, + pending_advertisements: HashMap, } #[derive(Default)] struct PageserverSharedState { pending_advertisements: HashMap, + endpoint: tonic::endpoint::Endpoint, } struct CommitLsnAdv { @@ -110,7 +113,7 @@ struct SafekeeperTimeline { tli: Arc, /// Shared with [`SafekeeperTenant::tenant_shard_attachments`]. tenant_shard_attachments: tokio::sync::watch::Receiver, - remote_consistent_lsns: HashMap, + remote_consistent_lsns: Mutex>, } #[derive(Default)] @@ -159,6 +162,13 @@ struct ShardAttachmentId { ps_id: NodeId, } +pub struct SafekeeperTimelineHandle {} +impl SafekeeperTimelineHandle { + pub fn ready_for_eviction(&self) -> bool { + todo!() + } +} + impl World { pub fn register_timeline( &self, @@ -182,7 +192,7 @@ impl World { .run() .await; }); - Ok(vacant.insert(SafekeeperTimelineHandle { tx }).clone()) + Ok(vacant.insert(SafekeeperTimelineHandle {}).clone()) } pub fn update_pageserver_attachments( &self, @@ -277,7 +287,6 @@ impl SafekeeperTimeline { // that other nodes continue to recieve advertisements. pageserver_handle.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn }); } - drop(shard_attachments); } drop(gate_guard); } @@ -285,6 +294,7 @@ impl SafekeeperTimeline { impl PageserverHandle { pub fn advertise_commit_lsn(&self, adv: CommitLsnAdv) { + let CommitLsnAdv { ttid, commit_lsn } = adv; let mut state = self.state.write().unwrap(); state.pending_advertisements.insert(ttid, commit_lsn); } @@ -299,22 +309,42 @@ impl PageserverTask { _ = self.cancel.cancelled() => { return; } - _ = self.notify.notified() => {} - } - { - let mut state = self.state.locck().unwrap(); - pending_advertisements.clear(); - pending_advertisements = - std::mem::replace(&mut state.pending_advertisements, pending_advertisements); - drop(state); - } - - todo!("grpc call"); - - // batch updates by sleeping - tokio::time::sleep(Duration::from_secs(1)).await; + res = self.run0() => { + if let Err(err) = res { + error!(?err, "pageserver loop failed"); + // TODO: backoff? + cancellation sensitivity + tokio::time::sleep(Duration::from_secs(10)).await; + } + continue; + } + }; } } + async fn run0(&mut self) -> anyhow::Result<()> { + use storage_broker::wal_advertisement::pageserver_client::PageserverClient; + let stream = async_stream::stream! { loop { + while self.pending_advertisements.is_empty() { + tokio::select! { + _ = self.cancel.cancelled() => { + return; + } + _ = self.notify.notified() => {} + } + let mut state = self.state.lock().unwrap(); + std::mem::swap( + &mut state.pending_advertisements, + &mut self.pending_advertisements, + ); + } + } }; + let client: PageserverClient<_> = PageserverClient::connect(todo!()) + .await + .context("connect")?; + let publish_stream = client + .publish_commit_lsn_advertisements(stream) + .await + .context("publish stream")?; + } } impl From for ShardAttachmentId { diff --git a/storage_broker/build.rs b/storage_broker/build.rs index 08dadeacd5..450a8a4db8 100644 --- a/storage_broker/build.rs +++ b/storage_broker/build.rs @@ -5,7 +5,13 @@ fn main() -> Result<(), Box> { // easy location, but apparently interference with cachepot sometimes fails // the build then. Anyway, per cargo docs build script shouldn't output to // anywhere but $OUT_DIR. - tonic_build::compile_protos("proto/broker.proto") - .unwrap_or_else(|e| panic!("failed to compile protos {:?}", e)); + let protos = [ + "proto/broker.proto", + "proto/wal_advertisement.proto", + ]; + for proto in protos { + tonic_build::compile_protos(proto) + .unwrap_or_else(|e| panic!("failed to compile protos {:?}", e)); + } Ok(()) } diff --git a/storage_broker/proto/wal_advertisement.proto b/storage_broker/proto/wal_advertisement.proto new file mode 100644 index 0000000000..6b0e035b57 --- /dev/null +++ b/storage_broker/proto/wal_advertisement.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package wal_advertisement; + +service Pageserver { + rpc PublishCommitLsnAdvertisements(stream CommitLsnAdvertisement) returns (google.protobuf.Empty) {}; + rpc SubscribeRemoteConsistentLsnAdvertisements(google.protobuf.Empty) returns (stream RemoteConsistentLsnAdvertisement) {}; +} + +message CommitLsnAdvertisement { + TenantTimelineId tenant_timeline_id = 1; + +} + +message RemoteConsistentLsnAdvertisement { + bytes tenant_id = 1; + uint32 shard_id = 2; + bytes timeline_id = 3; + uint64 generation = 4; + uint64 remote_consistent_lsn = 5; +} + +message TenantTimelineId { + bytes tenant_id = 1; + bytes timeline_id = 2; +} + diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index a6edea695d..712c214a24 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -20,6 +20,11 @@ pub mod proto { tonic::include_proto!("storage_broker"); } +pub mod wal_advertisement { + #![allow(clippy::derive_partial_eq_without_eq)] + tonic::include_proto!("wal_advertisement"); +} + pub mod metrics; // Re-exports to avoid direct tonic dependency in user crates.