diff --git a/Cargo.lock b/Cargo.lock index c3bb7b87b3..867a86bd3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6678,6 +6678,7 @@ dependencies = [ "rustls 0.23.18", "tokio", "tokio-rustls 0.26.0", + "tokio-util", "tonic", "tonic-build", "tracing", diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 73ad7f5842..1e71219507 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -10,7 +10,7 @@ use utils::generation::Generation; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; use utils::pageserver_feedback::PageserverFeedback; -use utils::shard::{ShardIndex, TenantShardId}; +use utils::shard::ShardIndex; use crate::membership::Configuration; use crate::{ServerInfo, Term}; diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs index 4167839e28..d5e332750c 100644 --- a/libs/utils/src/postgres_client.rs +++ b/libs/utils/src/postgres_client.rs @@ -46,6 +46,8 @@ pub struct ConnectionConfigArgs<'a> { pub auth_token: Option<&'a str>, pub availability_zone: Option<&'a str>, + + pub pageserver_generation: Option, } impl<'a> ConnectionConfigArgs<'a> { @@ -72,6 +74,10 @@ impl<'a> ConnectionConfigArgs<'a> { )); } + if let Some(pageserver_generation) = self.pageserver_generation { + options.push(format!("pageserver_generation={pageserver_generation}")); + } + options } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 4c2572a577..2f2b652c2e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -423,11 +423,14 @@ fn start_pageserver( .map(storage_broker::Certificate::from_pem), ); // Note: we do not attempt connecting here (but validate endpoints sanity). - storage_broker::connect( + let service_client = storage_broker::connect( conf.broker_endpoint.clone(), conf.broker_keepalive_interval, tls_config, - ) + )?; + anyhow::Ok(storage_broker::TimelineUpdatesSubscriber::new( + service_client, + )) }) .with_context(|| { format!( diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 2edec9dda1..3182097818 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -100,7 +100,7 @@ pub struct State { auth: Option>, allowlist_routes: &'static [&'static str], remote_storage: GenericRemoteStorage, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, secondary_controller: SecondaryController, @@ -114,7 +114,7 @@ impl State { tenant_manager: Arc, auth: Option>, remote_storage: GenericRemoteStorage, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, secondary_controller: SecondaryController, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 441049f47d..dce89be44b 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -48,7 +48,6 @@ use remote_timeline_client::{ download_tenant_manifest, }; use secondary::heatmap::{HeatMapTenant, HeatMapTimeline}; -use storage_broker::BrokerClientChannel; use timeline::compaction::{CompactionOutcome, GcCompactionQueue}; use timeline::import_pgdata::ImportingTimeline; use timeline::offload::{OffloadError, offload_timeline}; @@ -153,7 +152,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines"; /// as the shared remote storage client and process initialization state. #[derive(Clone)] pub struct TenantSharedResources { - pub broker_client: storage_broker::BrokerClientChannel, + pub broker_client: storage_broker::TimelineUpdatesSubscriber, pub remote_storage: GenericRemoteStorage, pub deletion_queue_client: DeletionQueueClient, pub l0_flush_global_state: L0FlushGlobalState, @@ -2107,7 +2106,7 @@ impl TenantShard { async fn unoffload_timeline( self: &Arc, timeline_id: TimelineId, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: RequestContext, ) -> Result, TimelineArchivalError> { info!("unoffloading timeline"); @@ -2242,7 +2241,7 @@ impl TenantShard { self: &Arc, timeline_id: TimelineId, new_state: TimelineArchivalState, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: RequestContext, ) -> Result<(), TimelineArchivalError> { info!("setting timeline archival config"); @@ -2571,7 +2570,7 @@ impl TenantShard { pub(crate) async fn create_timeline( self: &Arc, params: CreateTimelineParams, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: &RequestContext, ) -> Result, CreateTimelineError> { if !self.is_active() { @@ -3299,7 +3298,7 @@ impl TenantShard { /// to delay background jobs. Background jobs can be started right away when None is given. fn activate( self: &Arc, - broker_client: BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d7f5958128..a2430c7ec9 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -61,7 +61,6 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp}; use rand::Rng; use remote_storage::DownloadError; use serde_with::serde_as; -use storage_broker::BrokerClientChannel; use tokio::runtime::Handle; use tokio::sync::mpsc::Sender; use tokio::sync::{Notify, oneshot, watch}; @@ -2080,7 +2079,7 @@ impl Timeline { pub(crate) fn activate( self: &Arc, parent: Arc, - broker_client: BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { @@ -3114,7 +3113,7 @@ impl Timeline { fn launch_wal_receiver( self: &Arc, ctx: &RequestContext, - broker_client: BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ) { info!( "launching WAL receiver for timeline {} of tenant {}", diff --git a/pageserver/src/tenant/timeline/uninit.rs b/pageserver/src/tenant/timeline/uninit.rs index beebf35462..3da49a1007 100644 --- a/pageserver/src/tenant/timeline/uninit.rs +++ b/pageserver/src/tenant/timeline/uninit.rs @@ -161,7 +161,7 @@ impl<'t> UninitializedTimeline<'t> { tenant: Arc, copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin), base_lsn: Lsn, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: &RequestContext, ) -> anyhow::Result> { self.write(|raw_timeline| async move { diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index 0f73eb839b..eb36c6ca32 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -28,7 +28,6 @@ use std::num::NonZeroU64; use std::sync::Arc; use std::time::Duration; -use storage_broker::BrokerClientChannel; use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::*; @@ -70,7 +69,7 @@ impl WalReceiver { pub fn start( timeline: Arc, conf: WalReceiverConf, - mut broker_client: BrokerClientChannel, + mut broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: &RequestContext, ) -> Self { let tenant_shard_id = timeline.tenant_shard_id; diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 3c3608d1bd..7f86f45393 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -17,19 +17,12 @@ use std::time::Duration; use anyhow::Context; use chrono::{NaiveDateTime, Utc}; +use futures::StreamExt; use pageserver_api::models::TimelineState; use postgres_connection::PgConnectionConfig; -use storage_broker::proto::{ - FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse, - SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription, - TypedMessage, -}; -use storage_broker::{BrokerClientChannel, Code, Streaming}; +use storage_broker::proto::SafekeeperDiscoveryResponse; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::backoff::{ - DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff, -}; use utils::id::{NodeId, TenantTimelineId}; use utils::lsn::Lsn; use utils::postgres_client::{ @@ -56,7 +49,7 @@ pub(crate) struct Cancelled; /// /// Not cancellation-safe. Use `cancel` token to request cancellation. pub(super) async fn connection_manager_loop_step( - broker_client: &mut BrokerClientChannel, + broker_client: &mut storage_broker::TimelineUpdatesSubscriber, connection_manager_state: &mut ConnectionManagerState, ctx: &RequestContext, cancel: &CancellationToken, @@ -81,11 +74,6 @@ pub(super) async fn connection_manager_loop_step( WALRECEIVER_ACTIVE_MANAGERS.dec(); } - let id = TenantTimelineId { - tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id, - timeline_id: connection_manager_state.timeline.timeline_id, - }; - let mut timeline_state_updates = connection_manager_state .timeline .subscribe_for_state_updates(); @@ -101,7 +89,12 @@ pub(super) async fn connection_manager_loop_step( // Subscribe to the broker updates. Stream shares underlying TCP connection // with other streams on this client (other connection managers). When // object goes out of scope, stream finishes in drop() automatically. - let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?; + let (timeline_updates, mut discovery_requester) = broker_client.subscribe( + connection_manager_state.timeline.tenant_shard_id, + connection_manager_state.timeline.timeline_id, + cancel, + ); + let mut timeline_updates = Box::pin(timeline_updates); debug!("Subscribed for broker timeline updates"); loop { @@ -155,29 +148,10 @@ pub(super) async fn connection_manager_loop_step( } }, - // Got a new update from the broker - broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => { - match broker_update { - Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update), - Err(status) => { - match status.code() { - Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => { - // tonic's error handling doesn't provide a clear code for disconnections: we get - // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe" - // => https://github.com/neondatabase/neon/issues/9562 - info!("broker disconnected: {status}"); - }, - _ => { - warn!("broker subscription failed: {status}"); - } - } - return Ok(()); - } - Ok(None) => { - error!("broker subscription stream ended"); // can't happen - return Ok(()); - } - } + // Got a new update from the broker. + // The stream ends with None if and only if `cancel` is cancelled. + Some(timeline_update) = timeline_updates.next() => { + connection_manager_state.register_timeline_update(timeline_update) }, new_event = async { @@ -258,32 +232,11 @@ pub(super) async fn connection_manager_loop_step( tokio::time::sleep(next_discovery_ts - now).await; } - let tenant_timeline_id = Some(ProtoTenantTimelineId { - tenant_id: id.tenant_id.as_ref().to_owned(), - timeline_id: id.timeline_id.as_ref().to_owned(), - }); - let request = SafekeeperDiscoveryRequest { tenant_timeline_id }; - let msg = TypedMessage { - r#type: MessageType::SafekeeperDiscoveryRequest as i32, - safekeeper_timeline_info: None, - safekeeper_discovery_request: Some(request), - safekeeper_discovery_response: None, - }; + info!("No active connection and no candidates, sending discovery request to the broker"); + discovery_requester.request().await; last_discovery_ts = Some(std::time::Instant::now()); - info!("No active connection and no candidates, sending discovery request to the broker"); - // Cancellation safety: we want to send a message to the broker, but publish_one() - // function can get cancelled by the other select! arm. This is absolutely fine, because - // we just want to receive broker updates and discovery is not important if we already - // receive updates. - // - // It is possible that `last_discovery_ts` will be updated, but the message will not be sent. - // This is totally fine because of the reason above. - - // This is a fire-and-forget request, we don't care about the response - let _ = broker_client.publish_one(msg).await; - debug!("Discovery request sent to the broker"); None } => {} } @@ -298,63 +251,6 @@ pub(super) async fn connection_manager_loop_step( } } -/// Endlessly try to subscribe for broker updates for a given timeline. -async fn subscribe_for_timeline_updates( - broker_client: &mut BrokerClientChannel, - id: TenantTimelineId, - cancel: &CancellationToken, -) -> Result, Cancelled> { - let mut attempt = 0; - loop { - exponential_backoff( - attempt, - DEFAULT_BASE_BACKOFF_SECONDS, - DEFAULT_MAX_BACKOFF_SECONDS, - cancel, - ) - .await; - attempt += 1; - - // subscribe to the specific timeline - let request = SubscribeByFilterRequest { - types: vec![ - TypeSubscription { - r#type: MessageType::SafekeeperTimelineInfo as i32, - }, - TypeSubscription { - r#type: MessageType::SafekeeperDiscoveryResponse as i32, - }, - ], - tenant_timeline_id: Some(FilterTenantTimelineId { - enabled: true, - tenant_timeline_id: Some(ProtoTenantTimelineId { - tenant_id: id.tenant_id.as_ref().to_owned(), - timeline_id: id.timeline_id.as_ref().to_owned(), - }), - }), - }; - - match { - tokio::select! { - r = broker_client.subscribe_by_filter(request) => { r } - _ = cancel.cancelled() => { return Err(Cancelled); } - } - } { - Ok(resp) => { - return Ok(resp.into_inner()); - } - Err(e) => { - // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and - // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error. - info!( - "Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}" - ); - continue; - } - } - } -} - const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1; const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0; const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5; @@ -695,44 +591,14 @@ impl ConnectionManagerState { } /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key. - fn register_timeline_update(&mut self, typed_msg: TypedMessage) { - let mut is_discovery = false; - let timeline_update = match typed_msg.r#type() { - MessageType::SafekeeperTimelineInfo => { - let info = match typed_msg.safekeeper_timeline_info { - Some(info) => info, - None => { - warn!("bad proto message from broker: no safekeeper_timeline_info"); - return; - } - }; - SafekeeperDiscoveryResponse { - safekeeper_id: info.safekeeper_id, - tenant_timeline_id: info.tenant_timeline_id, - commit_lsn: info.commit_lsn, - safekeeper_connstr: info.safekeeper_connstr, - availability_zone: info.availability_zone, - standby_horizon: info.standby_horizon, - } - } - MessageType::SafekeeperDiscoveryResponse => { - is_discovery = true; - match typed_msg.safekeeper_discovery_response { - Some(response) => response, - None => { - warn!("bad proto message from broker: no safekeeper_discovery_response"); - return; - } - } - } - _ => { - // unexpected message - return; - } - }; - + fn register_timeline_update(&mut self, timeline_update: storage_broker::TimelineShardUpdate) { WALRECEIVER_BROKER_UPDATES.inc(); + let storage_broker::TimelineShardUpdate { + is_discovery, + inner: timeline_update, + } = timeline_update; + trace!( "safekeeper info update: standby_horizon(cutoff)={}", timeline_update.standby_horizon @@ -1013,7 +879,9 @@ impl ConnectionManagerState { shard_stripe_size, listen_pg_addr_str: info.safekeeper_connstr.as_ref(), auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()), - availability_zone: self.conf.availability_zone.as_deref() + availability_zone: self.conf.availability_zone.as_deref(), + // TODO: do we still have the emergency mode that runs without generations? If so, this expect would panic in that mode. + pageserver_generation: Some(self.timeline.generation.into().expect("attachments always have a generation number nowadays")), }; match wal_stream_connection_config(connection_conf_args) { diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 8d31ada24f..dade95259a 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -25,7 +25,7 @@ use safekeeper::defaults::{ use safekeeper::wal_backup::WalBackup; use safekeeper::{ BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, - WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service, + WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser, wal_service, }; use sd_notify::NotifyState; use storage_broker::{DEFAULT_ENDPOINT, Uri}; diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 3b15cf8d70..41e9d6a6cd 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -50,7 +50,8 @@ async fn push_loop( conf.broker_endpoint.clone(), conf.broker_keepalive_interval, make_tls_config(&conf), - )?; + )? + .into_raw_grpc_client(); let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); let outbound = async_stream::stream! { @@ -97,7 +98,8 @@ async fn pull_loop( conf.broker_endpoint.clone(), conf.broker_keepalive_interval, make_tls_config(&conf), - )?; + )? + .into_raw_grpc_client(); // TODO: subscribe only to local timelines instead of all let request = SubscribeSafekeeperInfoRequest { @@ -153,7 +155,8 @@ async fn discover_loop( conf.broker_endpoint.clone(), conf.broker_keepalive_interval, make_tls_config(&conf), - )?; + )? + .into_raw_grpc_client(); let request = SubscribeByFilterRequest { types: vec![TypeSubscription { diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index b54bee8bfb..fd3d762903 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -18,6 +18,7 @@ use safekeeper_api::models::ConnectionId; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{Instrument, debug, info, info_span}; use utils::auth::{Claims, JwtAuth, Scope}; +use utils::generation::{self, Generation}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; use utils::postgres_client::PostgresClientProtocol; @@ -37,6 +38,7 @@ pub struct SafekeeperPostgresHandler { pub timeline_id: Option, pub ttid: TenantTimelineId, pub shard: Option, + pub pageserver_generation: Option, pub protocol: Option, /// Unique connection id is logged in spans for observability. pub conn_id: ConnectionId, @@ -159,6 +161,7 @@ impl postgres_backend::Handler let mut shard_count: Option = None; let mut shard_number: Option = None; let mut shard_stripe_size: Option = None; + let mut pageserver_generation: Option = None; for opt in options { // FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy, @@ -201,6 +204,12 @@ impl postgres_backend::Handler format!("Failed to parse {value} as shard stripe size") })?); } + Some(("pageserver_generation", value)) => { + self.pageserver_generation = + Some(value.parse::().map(Generation::new).with_context( + || format!("Failed to parse {value} as generation"), + )?); + } _ => continue, } } @@ -259,6 +268,12 @@ impl postgres_backend::Handler tracing::Span::current().record("shard", tracing::field::display(slug)); } } + if let Some(pageserver_generation) = self.pageserver_generation { + tracing::Span::current().record( + "pageserver_generation", + tracing::field::display(pageserver_generation.get_suffix()), + ); + } Ok(()) } else { @@ -370,6 +385,7 @@ impl SafekeeperPostgresHandler { timeline_id: None, ttid: TenantTimelineId::empty(), shard: None, + pageserver_generation: None, protocol: None, conn_id, claims: None, diff --git a/safekeeper/src/http/mod.rs b/safekeeper/src/http/mod.rs index 0003310763..acfb910787 100644 --- a/safekeeper/src/http/mod.rs +++ b/safekeeper/src/http/mod.rs @@ -5,7 +5,7 @@ pub use routes::make_router; pub use safekeeper_api::models; use tokio_util::sync::CancellationToken; -use crate::{GlobalTimelines, SafeKeeperConf}; +use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser}; pub async fn task_main_http( conf: Arc, diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 337601c44a..1e76a0a3cd 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -17,9 +17,10 @@ use hyper::{Body, Request, Response, StatusCode}; use pem::Pem; use postgres_ffi::WAL_SEGMENT_SIZE; use safekeeper_api::models::{ - AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult, - TenantShardPageserverAttachmentChange, TermSwitchApiEntry, TimelineCopyRequest, - TimelineCreateRequest, TimelineDeleteResult, TimelineStatus, TimelineTermBumpRequest, + AcceptorStateStatus, PullTimelineRequest, PutTenantPageserverLocationRequest, SafekeeperStatus, + SkTimelineInfo, TenantDeleteResult, TenantShardPageserverLocation, TermSwitchApiEntry, + TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus, + TimelineTermBumpRequest, }; use safekeeper_api::{ServerInfo, membership, models}; use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId}; @@ -31,12 +32,15 @@ use tracing::{Instrument, info_span}; use utils::auth::SwappableJwtAuth; use utils::id::{TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; +use utils::shard::TenantShardId; use crate::debug_dump::TimelineDigestRequest; use crate::safekeeper::TermLsn; use crate::timelines_global_map::DeleteOrExclude; +use crate::wal_advertiser::advmap::UpdatePageserverAttachmentsArg; use crate::{ GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline, + wal_advertiser, }; /// Healthcheck handler. @@ -72,7 +76,7 @@ async fn post_tenant_pageserver_attachments(mut request: Request) -> Resul check_permission(&request, Some(tenant_id))?; let body: TenantShardPageserverAttachmentChange = json_request(&mut request).await?; let global_timelines = get_global_timelines(&request); - + match body { TenantShardPageserverAttachmentChange::Attach(tenant_shard_pageserver_attachment) => { todo!() @@ -107,6 +111,39 @@ async fn tenant_delete_handler(mut request: Request) -> Result, +) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + + let PutTenantPageserverLocationRequest { + pageserver_locations, + }: PutTenantPageserverLocationRequest = json_request(&mut request).await?; + + let global_timelines = get_global_timelines(&request); + let wal_advertiser = global_timelines.get_wal_advertiser(); + wal_advertiser + .update_pageserver_attachments( + tenant_shard_id, + pageserver_locations + .into_iter() + .map( + |TenantShardPageserverLocation { + generation, + pageserver_node_id, + }| UpdatePageserverAttachmentsArg { + generation, + pageserver_node_id, + }, + ) + .collect(), + ) + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, ()) +} + async fn timeline_create_handler(mut request: Request) -> Result, ApiError> { let request_data: TimelineCreateRequest = json_request(&mut request).await?; @@ -740,6 +777,9 @@ pub fn make_router( .delete("/v1/tenant/:tenant_id", |r| { request_span(r, tenant_delete_handler) }) + .put("/v1/tenant/:tenant_shard_id/pageserver_attachments", |r| { + request_span(r, tenant_put_pageserver_attachments) + }) // Will be used in the future instead of implicit timeline creation .post("/v1/tenant/timeline", |r| { request_span(r, timeline_create_handler) diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index b4d9cadd6d..72b3689125 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -38,6 +38,7 @@ pub mod timeline_eviction; pub mod timeline_guard; pub mod timeline_manager; pub mod timelines_set; +pub mod wal_advertiser; pub mod wal_backup; pub mod wal_backup_partial; pub mod wal_reader_stream; diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 577a2f694e..b2b4e935f0 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -360,6 +360,7 @@ async fn recovery_stream( listen_pg_addr_str: &donor.pg_connstr, auth_token: None, availability_zone: None, + pageserver_generation: None, }; let cfg = wal_stream_connection_config(connection_conf_args)?; let mut cfg = cfg.to_tokio_postgres_config(); diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 05f827494e..f52d53316c 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -37,6 +37,7 @@ use crate::send_interpreted_wal::{ Batch, InterpretedWalReader, InterpretedWalReaderHandle, InterpretedWalSender, }; use crate::timeline::WalResidentTimeline; +use crate::wal_advertiser; use crate::wal_reader_stream::StreamingWalReader; use crate::wal_storage::WalReader; @@ -657,10 +658,29 @@ impl SafekeeperPostgresHandler { let tli_cancel = tli.cancel.clone(); + let wal_advertiser = match (self.shard, self.pageserver_generation) { + (Some(shard), Some(pageserver_generation)) => { + Some(tli.wal_advertiser.get_pageserver_timeline( + self.ttid, + shard.shard_index(), + pageserver_generation, + )) + } + (shard, pageserver_generation) => { + debug!( + ?shard, + ?pageserver_generation, + "cannot feedback last_record_lsn to wal_advertiser subsystem, client must specify shard and pageserver_generation" + ); + None + } + }; + let mut reply_reader = ReplyReader { reader, ws_guard: ws_guard.clone(), tli, + wal_advertiser, }; let res = tokio::select! { @@ -977,6 +997,7 @@ struct ReplyReader { reader: PostgresBackendReader, ws_guard: Arc, tli: WalResidentTimeline, + wal_advertiser: Option>, } impl ReplyReader { @@ -1023,6 +1044,9 @@ impl ReplyReader { self.tli .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn) .await; + if let Some(wal_advertiser) = &self.wal_advertiser { + wal_advertiser.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn); + } // in principle new remote_consistent_lsn could allow to // deactivate the timeline, but we check that regularly through // broker updated, not need to do it here diff --git a/safekeeper/src/test_utils.rs b/safekeeper/src/test_utils.rs index e2817c8337..c4cfd0656e 100644 --- a/safekeeper/src/test_utils.rs +++ b/safekeeper/src/test_utils.rs @@ -108,6 +108,7 @@ impl Env { &timeline_dir, &remote_path, shared_state, + todo!(), conf.clone(), wal_backup.clone(), ); diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 588bd4f2c9..f1e0710382 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -24,6 +24,7 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::{NodeId, TenantId, TenantTimelineId}; use utils::lsn::Lsn; +use utils::shard::TenantShardId; use utils::sync::gate::Gate; use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics}; @@ -39,7 +40,9 @@ use crate::wal_backup; use crate::wal_backup::{WalBackup, remote_timeline_path}; use crate::wal_backup_partial::PartialRemoteSegment; use crate::wal_storage::{Storage as wal_storage_iface, WalReader}; -use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage}; +use crate::{ + SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_advertiser, wal_storage, +}; fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo { PeerInfo { @@ -447,6 +450,7 @@ pub struct Timeline { /// synchronized with the disk. This is tokio mutex as we write WAL to disk /// while holding it, ensuring that consensus checks are in order. mutex: RwLock, + pub(crate) wal_advertiser: Arc, walsenders: Arc, walreceivers: Arc, timeline_dir: Utf8PathBuf, @@ -478,6 +482,7 @@ impl Timeline { timeline_dir: &Utf8Path, remote_path: &RemotePath, shared_state: SharedState, + wal_advertiser: Arc, conf: Arc, wal_backup: Arc, ) -> Arc { @@ -502,6 +507,7 @@ impl Timeline { shared_state_version_tx, shared_state_version_rx, mutex: RwLock::new(shared_state), + wal_advertiser, walsenders: WalSenders::new(walreceivers.clone()), walreceivers, gate: Default::default(), @@ -522,6 +528,7 @@ impl Timeline { conf: Arc, ttid: TenantTimelineId, wal_backup: Arc, + wal_advertiser: Arc, ) -> Result> { let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered(); @@ -534,6 +541,7 @@ impl Timeline { &timeline_dir, &remote_path, shared_state, + wal_advertiser, conf, wal_backup, )) diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index af33bcbd20..0898aaee9f 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -2,6 +2,7 @@ //! All timelines should always be present in this map, this is done by loading them //! all from the disk on startup and keeping them in memory. +use std::any; use std::collections::HashMap; use std::str::FromStr; use std::sync::{Arc, Mutex}; @@ -11,13 +12,16 @@ use anyhow::{Context, Result, bail}; use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; use safekeeper_api::membership::Configuration; -use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult}; +use safekeeper_api::models::{ + SafekeeperUtilization, TenantShardPageserverLocation, TimelineDeleteResult, +}; use safekeeper_api::{ServerInfo, membership}; use tokio::fs; use tracing::*; use utils::crashsafe::{durable_rename, fsync_async_opt}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; +use utils::shard::TenantShardId; use crate::defaults::DEFAULT_EVICTION_CONCURRENCY; use crate::http::routes::DeleteOrExcludeError; @@ -27,7 +31,7 @@ use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_t use crate::timelines_set::TimelinesSet; use crate::wal_backup::WalBackup; use crate::wal_storage::Storage; -use crate::{SafeKeeperConf, control_file, wal_storage}; +use crate::{SafeKeeperConf, control_file, wal_advertiser, wal_storage}; // Timeline entry in the global map: either a ready timeline, or mark that it is // being created. @@ -47,6 +51,7 @@ struct GlobalTimelinesState { conf: Arc, broker_active_set: Arc, + wal_advertisement: Arc, global_rate_limiter: RateLimiter, wal_backup: Arc, } @@ -54,18 +59,26 @@ struct GlobalTimelinesState { impl GlobalTimelinesState { /// Get dependencies for a timeline constructor. fn get_dependencies( + &self, + , ) -> ( + Arc, + Arc, + RateLimiter, Arc, + , + Arc, ) { ( self.conf.clone(), self.broker_active_set.clone(), self.global_rate_limiter.clone(), self.wal_backup.clone(), + self.wal_advertisement.clone(), ) } @@ -101,6 +114,7 @@ impl GlobalTimelines { tombstones: HashMap::new(), conf, broker_active_set: Arc::new(TimelinesSet::default()), + wal_advertisement: Arc::new(wal_advertiser::advmap::World::default()), global_rate_limiter: RateLimiter::new(1, 1), wal_backup, }), @@ -158,12 +172,13 @@ impl GlobalTimelines { /// just lock and unlock it for each timeline -- this function is called /// during init when nothing else is running, so this is fine. async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> { - let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = { + let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup, wal_advertiser) = { let state = self.state.lock().unwrap(); state.get_dependencies() }; let timelines_dir = get_tenant_dir(&conf, &tenant_id); + for timelines_dir_entry in std::fs::read_dir(&timelines_dir) .with_context(|| format!("failed to list timelines dir {}", timelines_dir))? { @@ -173,7 +188,8 @@ impl GlobalTimelines { TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or("")) { let ttid = TenantTimelineId::new(tenant_id, timeline_id); - match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) { + let wal_advertiser = wal_advertiser.load_timeline(ttid); + match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone(), wal_advertiser) { Ok(tli) => { let mut shared_state = tli.write_shared_state().await; self.state @@ -283,7 +299,7 @@ impl GlobalTimelines { check_tombstone: bool, ) -> Result> { // Check for existence and mark that we're creating it. - let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = { + let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup, wal_advertiser) = { let mut state = self.state.lock().unwrap(); match state.timelines.get(&ttid) { Some(GlobalMapTimeline::CreationInProgress) => { @@ -312,13 +328,16 @@ impl GlobalTimelines { }; // Do the actual move and reflect the result in the map. + let wal_advertiser = wal_advertiser.load_timeline(ttid); match GlobalTimelines::install_temp_timeline( ttid, tmp_path, - conf.clone(), + wal_advertiser, conf.clone(), wal_backup.clone(), ) - .await + + .await + { Ok(timeline) => { let mut timeline_shared_state = timeline.write_shared_state().await; @@ -359,6 +378,7 @@ impl GlobalTimelines { async fn install_temp_timeline( ttid: TenantTimelineId, tmp_path: &Utf8PathBuf, + wal_advertiser: Arc, conf: Arc, wal_backup: Arc, ) -> Result> { @@ -402,7 +422,7 @@ impl GlobalTimelines { // Do the move. durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?; - Timeline::load_timeline(conf, ttid, wal_backup) + Timeline::load_timeline(conf, ttid, wal_backup, wal_advertiser) } /// Get a timeline from the global map. If it's not present, it doesn't exist on disk, @@ -590,6 +610,10 @@ impl GlobalTimelines { Ok(deleted) } + pub fn get_wal_advertiser(&self) -> Arc { + self.state.lock().unwrap().wal_advertisement.clone() + } + pub fn housekeeping(&self, tombstone_ttl: &Duration) { let mut state = self.state.lock().unwrap(); diff --git a/safekeeper/src/wal_advertiser.rs b/safekeeper/src/wal_advertiser.rs new file mode 100644 index 0000000000..5908be3e5d --- /dev/null +++ b/safekeeper/src/wal_advertiser.rs @@ -0,0 +1,20 @@ +//! Advertise pending WAL to all pageservers that might be interested in it. + +use std::{collections::HashSet, sync::Arc, time::Duration}; + +use crate::{GlobalTimelines, SafeKeeperConf}; + +pub(crate) mod advmap; + +pub(crate) async fn wal_advertiser_loop( + conf: Arc, + global_timelines: Arc, +) -> anyhow::Result<()> { + todo!(); + node_loop().await; + Ok(()) +} + +pub(crate) async fn node_loop() { + loop {} +} diff --git a/safekeeper/src/wal_advertiser/advmap.rs b/safekeeper/src/wal_advertiser/advmap.rs new file mode 100644 index 0000000000..07048c78d0 --- /dev/null +++ b/safekeeper/src/wal_advertiser/advmap.rs @@ -0,0 +1,78 @@ +//! The data structure that track advertisement state. + +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use serde::Serialize; +use utils::{ + generation::Generation, + id::{NodeId, TenantTimelineId, TimelineId}, + lsn::Lsn, + shard::{ShardIndex, TenantShardId}, +}; + +#[derive(Default)] +pub struct World { + pageservers: RwLock>>, +} + +pub struct Pageserver { + node_id: NodeId, + attachments: RwLock>>, +} + +pub struct PageserverAttachment { + pageserver: NodeId, + tenant_shard_id: TenantShardId, + generation: Generation, + remote_consistent_lsn: RwLock>>, +} + +pub struct PageserverTimeline { + pageserver: NodeId, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + generation: Generation, + remote_consistent_lsn: RwLock, +} + +pub struct SafekeeperTimeline {} + +pub struct UpdatePageserverAttachmentsArg { + pub generation: Generation, + pub pageserver_node_id: NodeId, +} + +impl World { + pub fn housekeeping(&self) {} + pub fn load_timeline(&self, ttid: TenantTimelineId) -> Arc { + todo!() + } + pub fn update_pageserver_attachments( + &self, + tenant_shard_id: TenantShardId, + arg: Vec, + ) -> anyhow::Result<()> { + todo!() + } +} + +impl SafekeeperTimeline { + pub fn get_pageserver_timeline( + &self, + ttld: TenantTimelineId, + shard: ShardIndex, + pageserver_generation: Generation, + ) -> Arc { + assert!(!pageserver_generation.is_none()); + todo!() + } +} + +impl PageserverTimeline { + pub fn update_remote_consistent_lsn(&self, lsn: Lsn) -> anyhow::Result<()> { + todo!() + } +} diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 6e007265b2..855fe2c288 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -18,7 +18,7 @@ use utils::measured_stream::MeasuredStream; use crate::handler::SafekeeperPostgresHandler; use crate::metrics::TrafficMetrics; -use crate::{GlobalTimelines, SafeKeeperConf}; +use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser}; /// Accept incoming TCP connections and spawn them into a background thread. /// @@ -51,7 +51,7 @@ pub async fn task_main( error!("connection handler exited: {}", err); } } - .instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty)), + .instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty, pageserver_generation = field::Empty)), ); } } diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml index 67b276c8fe..d6fcefbaf2 100644 --- a/storage_broker/Cargo.toml +++ b/storage_broker/Cargo.toml @@ -27,6 +27,7 @@ parking_lot.workspace = true prost.workspace = true tonic.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio-util.workspace = true tokio-rustls.workspace = true tracing.workspace = true metrics.workspace = true diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index 149656a191..a6edea695d 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -1,10 +1,14 @@ use std::time::Duration; +use futures::Stream; use proto::TenantTimelineId as ProtoTenantTimelineId; -use proto::broker_service_client::BrokerServiceClient; +use tokio_util::sync::CancellationToken; use tonic::Status; -use tonic::codegen::StdError; -use tonic::transport::{Channel, Endpoint}; +use tonic::transport::Endpoint; +use tracing::{debug, error, info, warn}; +use utils::backoff::{ + DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff, +}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; // Code generated by protobuf. @@ -22,6 +26,7 @@ pub mod metrics; pub use hyper::Uri; pub use tonic::transport::{Certificate, ClientTlsConfig}; pub use tonic::{Code, Request, Streaming}; +use utils::shard::TenantShardId; pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051"; pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}"); @@ -29,9 +34,199 @@ pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LIST pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms"; pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000); -// BrokerServiceClient charged with tonic provided Channel transport; helps to -// avoid depending on tonic directly in user crates. -pub type BrokerClientChannel = BrokerServiceClient; +#[derive(Clone)] +pub struct TimelineUpdatesSubscriber { + client: proto::broker_service_client::BrokerServiceClient, +} + +/// Wrapper type to weed out all places in the codebase that interact directly with the gRPC generated code. +pub struct BrokerClientChannel { + client: proto::broker_service_client::BrokerServiceClient, +} + +impl BrokerClientChannel { + pub fn into_raw_grpc_client( + self, + ) -> proto::broker_service_client::BrokerServiceClient { + self.client + } +} + +pub struct TimelineShardUpdate { + pub is_discovery: bool, + pub inner: proto::SafekeeperDiscoveryResponse, +} + +pub struct DiscoveryRequester { + id: ProtoTenantTimelineId, + client: proto::broker_service_client::BrokerServiceClient, +} + +impl TimelineUpdatesSubscriber { + pub fn new(service_client: BrokerClientChannel) -> Self { + Self { + client: service_client.client.clone(), + } + } + pub fn subscribe( + &mut self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + cancel: &CancellationToken, + ) -> (impl Stream, DiscoveryRequester) { + let id = ProtoTenantTimelineId { + tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(), + timeline_id: timeline_id.as_ref().to_owned(), + }; + let discovery_requester = DiscoveryRequester { + id: id.clone(), + client: self.client.clone(), + }; + let stream = async_stream::stream! { + let mut attempt = 0; + 'resubscribe: loop { + exponential_backoff( + attempt, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + cancel, + ) + .await; + attempt += 1; + + use proto::*; + // subscribe to the specific timeline + let request = SubscribeByFilterRequest { + types: vec![ + TypeSubscription { + r#type: MessageType::SafekeeperTimelineInfo as i32, + }, + TypeSubscription { + r#type: MessageType::SafekeeperDiscoveryResponse as i32, + }, + ], + tenant_timeline_id: Some(FilterTenantTimelineId { + enabled: true, + tenant_timeline_id: Some(id.clone()), + }), + }; + + let res = tokio::select! { + r = self.client.subscribe_by_filter(request) => { r } + _ = cancel.cancelled() => { return; } + }; + let mut update_stream = match res + { + Ok(resp) => { + resp.into_inner() + } + Err(e) => { + // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and + // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error. + info!( + attempt, "failed to subscribe: {e:#}" + ); + continue 'resubscribe; + } + }; + loop { + let broker_update = tokio::select!{ + _ = cancel.cancelled() => { + return; + } + update = update_stream.message() => { update } + }; + match broker_update { + Ok(Some(typed_msg)) => { + let mut is_discovery = false; + let timeline_update = match typed_msg.r#type() { + MessageType::SafekeeperTimelineInfo => { + let info = match typed_msg.safekeeper_timeline_info { + Some(info) => info, + None => { + warn!("bad proto message from broker: no safekeeper_timeline_info"); + continue 'resubscribe; + } + }; + SafekeeperDiscoveryResponse { + safekeeper_id: info.safekeeper_id, + tenant_timeline_id: info.tenant_timeline_id, + commit_lsn: info.commit_lsn, + safekeeper_connstr: info.safekeeper_connstr, + availability_zone: info.availability_zone, + standby_horizon: info.standby_horizon, + } + } + MessageType::SafekeeperDiscoveryResponse => { + is_discovery = true; + match typed_msg.safekeeper_discovery_response { + Some(response) => response, + None => { + warn!("bad proto message from broker: no safekeeper_discovery_response"); + continue 'resubscribe; + } + } + } + _ => { + // unexpected message + warn!("unexpected message from broker: {typed_msg:?}"); + continue 'resubscribe; + } + }; + attempt = 0; // reset backoff iff we received a valid update + yield TimelineShardUpdate{is_discovery, inner: timeline_update }; + }, + Err(status) => { + match status.code() { + Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => { + // tonic's error handling doesn't provide a clear code for disconnections: we get + // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe" + // => https://github.com/neondatabase/neon/issues/9562 + info!("broker disconnected: {status}"); + }, + _ => { + warn!("broker subscription failed: {status}"); + } + } + continue 'resubscribe; + } + Ok(None) => { + error!("broker subscription stream ended"); // can't happen + continue 'resubscribe; + } + } + } + } + }; + (stream, discovery_requester) + } +} + +impl DiscoveryRequester { + pub async fn request(&mut self) { + let request = proto::SafekeeperDiscoveryRequest { + tenant_timeline_id: Some(self.id.clone()), + }; + let msg = proto::TypedMessage { + r#type: proto::MessageType::SafekeeperDiscoveryRequest as i32, + safekeeper_timeline_info: None, + safekeeper_discovery_request: Some(request), + safekeeper_discovery_response: None, + }; + + // Cancellation safety: we want to send a message to the broker, but publish_one() + // function can get cancelled by the other select! arm. This is absolutely fine, because + // we just want to receive broker updates and discovery is not important if we already + // receive updates. + // + // It is possible that `last_discovery_ts` will be updated, but the message will not be sent. + // This is totally fine because of the reason above. + + // This is a fire-and-forget request, we don't care about the response + let _ = self.client.publish_one(msg).await; + debug!("Discovery request sent to the broker"); + } +} // Create connection object configured to run TLS if schema starts with https:// // and plain text otherwise. Connection is lazy, only endpoint sanity is @@ -67,19 +262,9 @@ where .connect_timeout(DEFAULT_CONNECT_TIMEOUT); // keep_alive_timeout is 20s by default on both client and server side let channel = tonic_endpoint.connect_lazy(); - Ok(BrokerClientChannel::new(channel)) -} - -impl BrokerClientChannel { - /// Create a new client to the given endpoint, but don't actually connect until the first request. - pub async fn connect_lazy(dst: D) -> Result - where - D: std::convert::TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy(); - Ok(Self::new(conn)) - } + Ok(BrokerClientChannel { + client: proto::broker_service_client::BrokerServiceClient::new(channel), + }) } // parse variable length bytes from protobuf