From dc8d63e4ea131adea5e9f55a9d2c6c52ab929380 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Sat, 28 Oct 2023 14:43:29 +0000 Subject: [PATCH] Implement prototype --- .../timeline/walreceiver/connection_manager.rs | 12 ++++++++++-- safekeeper/src/broker.rs | 13 +++++++++++++ storage_broker/src/bin/storage_broker.rs | 2 +- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index edff687a1a..89fd020d0c 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -23,10 +23,11 @@ use anyhow::Context; use chrono::{NaiveDateTime, Utc}; use pageserver_api::models::TimelineState; use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey; -use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::SubscribeSafekeeperInfoRequest; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; -use storage_broker::{BrokerClientChannel, Code, Streaming}; +use storage_broker::proto::{DiscoveryRequest, SafekeeperTimelineInfo}; +use storage_broker::{BrokerClientChannel, Code}; +use storage_broker::Streaming; use tokio::select; use tracing::*; @@ -198,6 +199,13 @@ pub(super) async fn connection_manager_loop_step( }, None => { debug!("No candidates to retry, waiting indefinitely for the broker events"); + tokio::time::sleep(Duration::from_secs(1)).await; + let tenant_timeline_id = Some(ProtoTenantTimelineId { + tenant_id: id.tenant_id.as_ref().to_owned(), + timeline_id: id.timeline_id.as_ref().to_owned(), + }); + let request = DiscoveryRequest { tenant_timeline_id }; + let _ = broker_client.send_discovery(request).await; None } } diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 723b43e488..ea7e9a00e3 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -82,6 +82,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { /// Subscribe and fetch all the interesting data from the broker. async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { + let discovery_conf = conf.clone(); let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?; // TODO: subscribe only to local timelines instead of all @@ -105,11 +106,23 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { .as_ref() .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?; let ttid = parse_proto_ttid(proto_ttid)?; + if let Ok(tli) = GlobalTimelines::get(ttid) { // Note that we also receive *our own* info. That's // important, as it is used as an indication of live // connection to the broker. + if msg.is_discovery { + let mut client = client.clone(); + let sk_info = tli.get_safekeeper_info(&discovery_conf).await; + let stream = async_stream::stream! { + yield sk_info; + }; + tokio::spawn(async move { + let _ = client.publish_safekeeper_info(stream).await; + }); + } + // note: there are blocking operations below, but it's considered fine for now let res = tli.record_safekeeper_info(msg).await; if res.is_ok() { diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index b18fcef9d7..38f8770f30 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -405,7 +405,7 @@ impl BrokerService for Broker { async fn subscribe_discovery( &self, - request: Request, + _request: Request, ) -> Result, Status> { todo!() }