diff --git a/libs/utils/src/seqwait.rs b/libs/utils/src/seqwait.rs
index 0544c5be03..375b227b99 100644
--- a/libs/utils/src/seqwait.rs
+++ b/libs/utils/src/seqwait.rs
@@ -2,11 +2,10 @@
 use std::cmp::{Eq, Ordering};
 use std::collections::BinaryHeap;
-use std::fmt::Debug;
 use std::mem;
 use std::sync::Mutex;
 use std::time::Duration;
-use tokio::sync::watch::{channel, Receiver, Sender};
+use tokio::sync::watch::{self, channel};
 use tokio::time::timeout;
 
 /// An error happened while waiting for a number
@@ -35,23 +34,73 @@ pub trait MonotonicCounter<V> {
     fn cnt_value(&self) -> V;
 }
 
-/// Internal components of a `SeqWait`
-struct SeqWaitInt<S, V>
+/// Heap of waiters, lowest numbers pop first.
+struct Waiters<V>
 where
-    S: MonotonicCounter<V>,
     V: Ord,
 {
-    waiters: BinaryHeap<Waiter<V>>,
-    current: S,
-    shutdown: bool,
+    heap: BinaryHeap<Waiter<V>>,
+    /// Channel carrying the number of the first waiter in the heap,
+    /// or None if there are no waiters.
+    status_channel: watch::Sender<Option<V>>,
+}
+
+impl<V> Waiters<V>
+where
+    V: Ord + Copy,
+{
+    fn new() -> Self {
+        Waiters {
+            heap: BinaryHeap::new(),
+            status_channel: channel(None).0,
+        }
+    }
+
+    /// `status_channel` contains the number of the first waiter in the heap.
+    /// This function should be called whenever the waiters heap changes.
+    fn update_status(&self) {
+        let first_waiter = self.heap.peek().map(|w| w.wake_num);
+        let _ = self.status_channel.send_replace(first_waiter);
+    }
+
+    /// Add a new waiter to the heap; return a channel that will be notified when the number arrives.
+    fn add(&mut self, num: V) -> watch::Receiver<()> {
+        let (tx, rx) = channel(());
+        self.heap.push(Waiter {
+            wake_num: num,
+            wake_channel: tx,
+        });
+        self.update_status();
+        rx
+    }
+
+    /// Pop all waiters <= num from the heap. Collect their channels in a vector,
+    /// so that the caller can wake them up.
+    fn pop_leq(&mut self, num: V) -> Vec<watch::Sender<()>> {
+        let mut wake_these = Vec::new();
+        while let Some(n) = self.heap.peek() {
+            if n.wake_num > num {
+                break;
+            }
+            wake_these.push(self.heap.pop().unwrap().wake_channel);
+        }
+        self.update_status();
+        wake_these
+    }
+
+    /// Used on shutdown to efficiently drop all waiters.
+    fn take_all(&mut self) -> BinaryHeap<Waiter<V>> {
+        let heap = mem::take(&mut self.heap);
+        self.update_status();
+        heap
+    }
 }
 
 struct Waiter<T>
 where
     T: Ord,
 {
-    wake_num: T,              // wake me when this number arrives ...
-    wake_channel: Sender<()>, // ... by sending a message to this channel
+    wake_num: T,                     // wake me when this number arrives ...
+    wake_channel: watch::Sender<()>, // ... by sending a message to this channel
 }
 
 // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
@@ -76,6 +125,17 @@ impl<T: Ord> PartialEq for Waiter<T> {
 
 impl<T: Ord> Eq for Waiter<T> {}
 
+/// Internal components of a `SeqWait`
+struct SeqWaitInt<S, V>
+where
+    S: MonotonicCounter<V>,
+    V: Ord,
+{
+    waiters: Waiters<V>,
+    current: S,
+    shutdown: bool,
+}
+
 /// A tool for waiting on a sequence number
 ///
 /// This provides a way to wait the arrival of a number.
@@ -108,7 +168,7 @@ where
     /// Create a new `SeqWait`, initialized to a particular number
     pub fn new(starting_num: S) -> Self {
         let internal = SeqWaitInt {
-            waiters: BinaryHeap::new(),
+            waiters: Waiters::new(),
             current: starting_num,
             shutdown: false,
         };
@@ -128,9 +188,8 @@ where
            // Block any future waiters from starting
            internal.shutdown = true;

-           // This will steal the entire waiters map.
-           // When we drop it all waiters will be woken.
-           mem::take(&mut internal.waiters)
+           // Take all waiters to drop them later.
+           internal.waiters.take_all()
            // Drop the lock as we exit this scope.
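+           // Dropping the returned heap closes each waiter's wake_channel,
+           // which wakes any still-pending wait() calls.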
    };
@@ -196,7 +255,7 @@ where
 
     /// Register and return a channel that will be notified when a number arrives,
     /// or None, if it has already arrived.
-    fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
+    fn queue_for_wait(&self, num: V) -> Result<Option<watch::Receiver<()>>, SeqWaitError> {
         let mut internal = self.internal.lock().unwrap();
         if internal.current.cnt_value() >= num {
             return Ok(None);
@@ -205,12 +264,8 @@ where
             return Err(SeqWaitError::Shutdown);
         }
 
-        // Create a new channel.
-        let (tx, rx) = channel(());
-        internal.waiters.push(Waiter {
-            wake_num: num,
-            wake_channel: tx,
-        });
+        // Add the waiter's channel to the queue.
+        let rx = internal.waiters.add(num);
         // Drop the lock as we exit this scope.
         Ok(Some(rx))
     }
@@ -231,16 +286,8 @@ where
             }
             internal.current.cnt_advance(num);
 
-            // Pop all waiters <= num from the heap. Collect them in a vector, and
-            // wake them up after releasing the lock.
-            let mut wake_these = Vec::new();
-            while let Some(n) = internal.waiters.peek() {
-                if n.wake_num > num {
-                    break;
-                }
-                wake_these.push(internal.waiters.pop().unwrap().wake_channel);
-            }
-            wake_these
+            // Pop all waiters <= num from the heap.
+            internal.waiters.pop_leq(num)
         };
 
         for tx in wake_these {
@@ -255,6 +302,23 @@ where
     pub fn load(&self) -> S {
         self.internal.lock().unwrap().current
     }
+
+    /// Get a Receiver for the current status.
+    ///
+    /// The current status is the number of the first waiter in the queue,
+    /// or None if there are no waiters.
+    ///
+    /// This receiver will be notified whenever the status changes.
+    /// It is useful for receiving notifications when the first waiter
+    /// starts waiting for a number, or when there are no more waiters left.
+    pub fn status_receiver(&self) -> watch::Receiver<Option<V>> {
+        self.internal
+            .lock()
+            .unwrap()
+            .waiters
+            .status_channel
+            .subscribe()
+    }
 }
 
 #[cfg(test)]
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 2a2c5d4ee5..5537505749 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1253,6 +1253,12 @@ impl Timeline {
         self.last_record_lsn.load()
     }
 
+    /// Subscribe to callers of wait_lsn(). The channel's value is None if there are no
+    /// wait_lsn() calls in progress, and Some(Lsn) if there is an active wait_lsn() waiter.
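+    /// Note that the channel carries only the lowest LSN currently being waited for:
+    /// the underlying SeqWait keeps its waiters in a min-heap and reports the first one.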
+    pub(crate) fn subscribe_for_wait_lsn_updates(&self) -> watch::Receiver<Option<Lsn>> {
+        self.last_record_lsn.status_receiver()
+    }
+
     pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
         self.disk_consistent_lsn.load()
     }
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index dae31934ad..7ef063c4e5 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -22,10 +22,12 @@ use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeli
 use anyhow::Context;
 use chrono::{NaiveDateTime, Utc};
 use pageserver_api::models::TimelineState;
-use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
-use storage_broker::proto::SafekeeperTimelineInfo;
-use storage_broker::proto::SubscribeSafekeeperInfoRequest;
+
 use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
+use storage_broker::proto::{
+    FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
+    SubscribeByFilterRequest, TypeSubscription, TypedMessage,
+};
 use storage_broker::{BrokerClientChannel, Code, Streaming};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
@@ -89,6 +91,14 @@ pub(super) async fn connection_manager_loop_step(
         .timeline
         .subscribe_for_state_updates();
 
+    let mut wait_lsn_status = connection_manager_state
+        .timeline
+        .subscribe_for_wait_lsn_updates();
+
+    // TODO: create a separate config option for the discovery request interval
+    let discovery_request_interval = connection_manager_state.conf.lagging_wal_timeout;
+    let mut last_discovery_ts: Option<std::time::Instant> = None;
+
     // Subscribe to the broker updates. Stream shares underlying TCP connection
     // with other streams on this client (other connection managers). When
     // object goes out of scope, stream finishes in drop() automatically.
@@ -97,10 +107,12 @@ pub(super) async fn connection_manager_loop_step(
 
     loop {
         let time_until_next_retry = connection_manager_state.time_until_next_retry();
+        let any_activity = connection_manager_state.wal_connection.is_some()
+            || !connection_manager_state.wal_stream_candidates.is_empty();
 
         // These things are happening concurrently:
         //
-        //  - cancellation request
+        // - cancellation request
         // - keep receiving WAL on the current connection
         //   - if the shared state says we need to change connection, disconnect and return
         //   - this runs in a separate task and we receive updates via a watch channel
@@ -108,6 +120,7 @@ pub(super) async fn connection_manager_loop_step(
         // - receive updates from broker
         //   - this might change the current desired connection
         // - timeline state changes to something that does not allow walreceiver to run concurrently
+        // - if there's no connection and no candidates, try to send a discovery request
 
         // NB: make sure each of the select expressions are cancellation-safe
         // (no need for arms to be cancellation-safe).
@@ -214,6 +227,65 @@
                     }
                 }
             } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
+
+            Some(()) = async {
+                // Reminder: this match arm needs to be cancellation-safe.
+                // Compute how long to wait before sending the next discovery request.
+                // The current implementation is conservative and sends discovery
+                // requests only when there are no candidates.
+
+                if any_activity {
+                    // No need to send discovery requests if there is an active connection or candidates.
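+                    // Returning None leaves this select! arm disabled until the next loop iteration.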
+                    return None;
+                }
+
+                // Wait for an active wait_lsn request.
+                while wait_lsn_status.borrow().is_none() {
+                    if wait_lsn_status.changed().await.is_err() {
+                        // wait_lsn_status channel was closed, exiting
+                        warn!("wait_lsn_status channel was closed in connection_manager_loop_step");
+                        return None;
+                    }
+                }
+
+                // All preconditions met, prepare to send a discovery request.
+                let now = std::time::Instant::now();
+                let next_discovery_ts = last_discovery_ts
+                    .map(|ts| ts + discovery_request_interval)
+                    .unwrap_or(now);
+
+                if next_discovery_ts > now {
+                    // Prevent sending discovery requests too frequently.
+                    tokio::time::sleep(next_discovery_ts - now).await;
+                }
+
+                let tenant_timeline_id = Some(ProtoTenantTimelineId {
+                    tenant_id: id.tenant_id.as_ref().to_owned(),
+                    timeline_id: id.timeline_id.as_ref().to_owned(),
+                });
+                let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
+                let msg = TypedMessage {
+                    r#type: MessageType::SafekeeperDiscoveryRequest as i32,
+                    safekeeper_timeline_info: None,
+                    safekeeper_discovery_request: Some(request),
+                    safekeeper_discovery_response: None,
+                };
+
+                last_discovery_ts = Some(std::time::Instant::now());
+                debug!("No active connection and no candidates, sending discovery request to the broker");
+
+                // Cancellation safety: we want to send a message to the broker, but the
+                // publish_one() call can be cancelled by the other select! arms. This is
+                // fine, because we only need discovery when we are not already receiving
+                // broker updates.
+                //
+                // It is possible that `last_discovery_ts` is updated without the message
+                // actually being sent, which is also fine for the same reason.
+
+                // This is a fire-and-forget request, we don't care about the response
+                let _ = broker_client.publish_one(msg).await;
+                debug!("Discovery request sent to the broker");
+                None
+            } => {}
         }
 
         if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
@@ -231,7 +303,7 @@ async fn subscribe_for_timeline_updates(
     broker_client: &mut BrokerClientChannel,
     id: TenantTimelineId,
     cancel: &CancellationToken,
-) -> Result<Streaming<SafekeeperTimelineInfo>, Cancelled> {
+) -> Result<Streaming<TypedMessage>, Cancelled> {
     let mut attempt = 0;
     loop {
         exponential_backoff(
@@ -244,17 +316,27 @@ async fn subscribe_for_timeline_updates(
         attempt += 1;
 
         // subscribe to the specific timeline
-        let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
-            tenant_id: id.tenant_id.as_ref().to_owned(),
-            timeline_id: id.timeline_id.as_ref().to_owned(),
-        });
-        let request = SubscribeSafekeeperInfoRequest {
-            subscription_key: Some(key),
+        let request = SubscribeByFilterRequest {
+            types: vec![
+                TypeSubscription {
+                    r#type: MessageType::SafekeeperTimelineInfo as i32,
+                },
+                TypeSubscription {
+                    r#type: MessageType::SafekeeperDiscoveryResponse as i32,
+                },
+            ],
+            tenant_timeline_id: Some(FilterTenantTimelineId {
+                enabled: true,
+                tenant_timeline_id: Some(ProtoTenantTimelineId {
+                    tenant_id: id.tenant_id.as_ref().to_owned(),
+                    timeline_id: id.timeline_id.as_ref().to_owned(),
+                }),
+            }),
         };
 
         match {
             tokio::select! {
-                r = broker_client.subscribe_safekeeper_info(request) => { r }
+                r = broker_client.subscribe_by_filter(request) => { r }
                 _ = cancel.cancelled() => { return Err(Cancelled); }
             }
         } {
@@ -398,7 +480,7 @@ struct RetryInfo {
 
 /// Data about the timeline to connect to, received from the broker.
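+/// Both plain safekeeper timeline infos and discovery responses are stored here,
+/// normalized into SafekeeperDiscoveryResponse by register_timeline_update().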
 #[derive(Debug, Clone)]
 struct BrokerSkTimeline {
-    timeline: SafekeeperTimelineInfo,
+    timeline: SafekeeperDiscoveryResponse,
     /// Time at which the data was fetched from the broker last time, to track the stale data.
     latest_update: NaiveDateTime,
 }
@@ -606,7 +688,41 @@ impl ConnectionManagerState {
     }
 
     /// Adds another broker timeline into the state, if it's more recent than the one already added there for the same key.
-    fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
+    fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
+        let mut is_discovery = false;
+        let timeline_update = match typed_msg.r#type() {
+            MessageType::SafekeeperTimelineInfo => {
+                let info = match typed_msg.safekeeper_timeline_info {
+                    Some(info) => info,
+                    None => {
+                        warn!("bad proto message from broker: no safekeeper_timeline_info");
+                        return;
+                    }
+                };
+                SafekeeperDiscoveryResponse {
+                    safekeeper_id: info.safekeeper_id,
+                    tenant_timeline_id: info.tenant_timeline_id,
+                    commit_lsn: info.commit_lsn,
+                    safekeeper_connstr: info.safekeeper_connstr,
+                    availability_zone: info.availability_zone,
+                }
+            }
+            MessageType::SafekeeperDiscoveryResponse => {
+                is_discovery = true;
+                match typed_msg.safekeeper_discovery_response {
+                    Some(response) => response,
+                    None => {
+                        warn!("bad proto message from broker: no safekeeper_discovery_response");
+                        return;
+                    }
+                }
+            }
+            _ => {
+                // unexpected message type, ignore it
+                return;
+            }
+        };
+
         WALRECEIVER_BROKER_UPDATES.inc();
 
         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
@@ -619,7 +735,11 @@ impl ConnectionManagerState {
         );
 
         if old_entry.is_none() {
-            info!("New SK node was added: {new_safekeeper_id}");
+            info!(
+                ?is_discovery,
+                %new_safekeeper_id,
+                "New SK node was added",
+            );
             WALRECEIVER_CANDIDATES_ADDED.inc();
         }
     }
@@ -818,7 +938,7 @@ impl ConnectionManagerState {
     fn select_connection_candidate(
         &self,
         node_to_omit: Option<NodeId>,
-    ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
+    ) -> Option<(NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
         self.applicable_connection_candidates()
             .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
             .max_by_key(|(_, info, _)| info.commit_lsn)
@@ -828,7 +948,7 @@ impl ConnectionManagerState {
     /// Some safekeepers are filtered by the retry cooldown.
     fn applicable_connection_candidates(
         &self,
-    ) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
+    ) -> impl Iterator<Item = (NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
         let now = Utc::now().naive_utc();
 
         self.wal_stream_candidates
@@ -968,19 +1088,11 @@ mod tests {
         latest_update: NaiveDateTime,
     ) -> BrokerSkTimeline {
         BrokerSkTimeline {
-            timeline: SafekeeperTimelineInfo {
+            timeline: SafekeeperDiscoveryResponse {
                 safekeeper_id: 0,
                 tenant_timeline_id: None,
-                term: 0,
-                last_log_term: 0,
-                flush_lsn: 0,
                 commit_lsn,
-                backup_lsn: 0,
-                remote_consistent_lsn: 0,
-                peer_horizon_lsn: 0,
-                local_start_lsn: 0,
                 safekeeper_connstr: safekeeper_connstr.to_owned(),
-                http_connstr: safekeeper_connstr.to_owned(),
                 availability_zone: None,
             },
             latest_update,
diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index e53ccaeb3d..09c565ce71 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -177,6 +177,10 @@ struct Args {
     /// Controls how long backup will wait until uploading the partial segment.
     #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)]
     partial_backup_timeout: Duration,
+    /// Disable the task that pushes messages to the broker every second. Intended
+    /// for use in tests.
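+    /// Even with pushes disabled, peers can still locate this safekeeper via the
+    /// on-demand discovery requests handled by discover_loop in broker.rs.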
+    #[arg(long)]
+    disable_periodic_broker_push: bool,
 }
 
 // Like PathBufValueParser, but allows empty string.
@@ -309,6 +313,7 @@ async fn main() -> anyhow::Result<()> {
         walsenders_keep_horizon: args.walsenders_keep_horizon,
         partial_backup_enabled: args.partial_backup_enabled,
         partial_backup_timeout: args.partial_backup_timeout,
+        disable_periodic_broker_push: args.disable_periodic_broker_push,
     };
 
     // initialize sentry if SENTRY_DSN is provided
diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs
index 2b1db2714b..98f58d3e49 100644
--- a/safekeeper/src/broker.rs
+++ b/safekeeper/src/broker.rs
@@ -10,11 +10,20 @@ use anyhow::Result;
 use storage_broker::parse_proto_ttid;
 use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
+use storage_broker::proto::FilterTenantTimelineId;
+use storage_broker::proto::MessageType;
+use storage_broker::proto::SafekeeperDiscoveryResponse;
+use storage_broker::proto::SubscribeByFilterRequest;
 use storage_broker::proto::SubscribeSafekeeperInfoRequest;
+use storage_broker::proto::TypeSubscription;
+use storage_broker::proto::TypedMessage;
 use storage_broker::Request;
 
+use std::sync::atomic::AtomicU64;
+use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;
+use std::time::UNIX_EPOCH;
 use tokio::task::JoinHandle;
 use tokio::time::sleep;
 use tracing::*;
@@ -31,6 +40,12 @@ const PUSH_INTERVAL_MSEC: u64 = 1000;
 
 /// Push once in a while data about all active timelines to the broker.
 async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
+    if conf.disable_periodic_broker_push {
+        info!("broker push_loop is disabled, doing nothing...");
+        futures::future::pending::<()>().await; // sleep forever
+        return Ok(());
+    }
+
     let mut client =
         storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
     let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
@@ -75,7 +90,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
 }
 
 /// Subscribe and fetch all the interesting data from the broker.
-async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
+async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
     let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
 
     // TODO: subscribe only to local timelines instead of all
@@ -94,6 +109,8 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
     let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
 
     while let Some(msg) = stream.message().await? {
+        stats.update_pulled();
+
         let proto_ttid = msg
             .tenant_timeline_id
             .as_ref()
@@ -119,12 +136,93 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
     bail!("end of stream");
 }
 
+/// Process incoming discovery requests. This is done in a separate task to avoid
+/// interfering with the normal pull/push loops.
+async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
+    let mut client =
+        storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
+
+    let request = SubscribeByFilterRequest {
+        types: vec![TypeSubscription {
+            r#type: MessageType::SafekeeperDiscoveryRequest as i32,
+        }],
+        tenant_timeline_id: Some(FilterTenantTimelineId {
+            enabled: false,
+            tenant_timeline_id: None,
+        }),
+    };
+
+    let mut stream = client
+        .subscribe_by_filter(request)
+        .await
+        .context("subscribe_by_filter request failed")?
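+        // into_inner() unwraps the tonic Response, leaving only the message stream.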
+        .into_inner();
+
+    let discover_counter = BROKER_PULLED_UPDATES.with_label_values(&["discover"]);
+
+    while let Some(typed_msg) = stream.message().await? {
+        stats.update_pulled();
+
+        match typed_msg.r#type() {
+            MessageType::SafekeeperDiscoveryRequest => {
+                let msg = typed_msg
+                    .safekeeper_discovery_request
+                    .expect("proto type mismatch from broker message");
+
+                let proto_ttid = msg
+                    .tenant_timeline_id
+                    .as_ref()
+                    .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
+                let ttid = parse_proto_ttid(proto_ttid)?;
+                if let Ok(tli) = GlobalTimelines::get(ttid) {
+                    // we received a discovery request for a timeline we know about
+                    discover_counter.inc();
+
+                    // create and reply with a discovery response
+                    let sk_info = tli.get_safekeeper_info(&conf).await;
+                    let response = SafekeeperDiscoveryResponse {
+                        safekeeper_id: sk_info.safekeeper_id,
+                        tenant_timeline_id: sk_info.tenant_timeline_id,
+                        commit_lsn: sk_info.commit_lsn,
+                        safekeeper_connstr: sk_info.safekeeper_connstr,
+                        availability_zone: sk_info.availability_zone,
+                    };
+
+                    // note: this await blocks the discover loop until the publish completes
+                    client
+                        .publish_one(TypedMessage {
+                            r#type: MessageType::SafekeeperDiscoveryResponse as i32,
+                            safekeeper_timeline_info: None,
+                            safekeeper_discovery_request: None,
+                            safekeeper_discovery_response: Some(response),
+                        })
+                        .await?;
+                }
+            }
+
+            _ => {
+                warn!(
+                    "unexpected message type {} ({:?})",
+                    typed_msg.r#type,
+                    typed_msg.r#type()
+                );
+            }
+        }
+    }
+    bail!("end of stream");
+}
+
 pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
     info!("started, broker endpoint {:?}", conf.broker_endpoint);
 
     let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
     let mut push_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
     let mut pull_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
+    let mut discover_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
+
+    let stats = Arc::new(BrokerStats::new());
+    let stats_task = task_stats(stats.clone());
+    tokio::pin!(stats_task);
 
     // Selecting on JoinHandles requires some squats; is there a better way to
     // reap tasks individually?
@@ -153,13 +251,77 @@
            };
            pull_handle = None;
        },
+       res = async { discover_handle.as_mut().unwrap().await }, if discover_handle.is_some() => {
+           // was it a panic or a normal error?
+           match res {
+               Ok(res_internal) => if let Err(err_inner) = res_internal {
+                   warn!("discover task failed: {:?}", err_inner);
+               }
+               Err(err_outer) => { warn!("discover task panicked: {:?}", err_outer) }
+           };
+           discover_handle = None;
+       },
        _ = ticker.tick() => {
            if push_handle.is_none() {
                push_handle = Some(tokio::spawn(push_loop(conf.clone())));
            }
            if pull_handle.is_none() {
-               pull_handle = Some(tokio::spawn(pull_loop(conf.clone())));
+               pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
            }
+           if discover_handle.is_none() {
+               discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
+           }
+       },
+       _ = &mut stats_task => {}
        }
    }
 }
+
+struct BrokerStats {
+    /// Timestamp of the last message received from the broker.
+    last_pulled_ts: AtomicU64,
+}
+
+impl BrokerStats {
+    fn new() -> Self {
+        BrokerStats {
+            last_pulled_ts: AtomicU64::new(0),
+        }
+    }
+
+    fn now_millis() -> u64 {
+        std::time::SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("time is before epoch")
+            .as_millis() as u64
+    }
+
+    /// Update the last_pulled timestamp to the current time.
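+    /// A relaxed store is sufficient here: the timestamp is a monitoring-only
+    /// statistic read by task_stats, with no ordering requirements.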
+    fn update_pulled(&self) {
+        self.last_pulled_ts
+            .store(Self::now_millis(), std::sync::atomic::Ordering::Relaxed);
+    }
+}
+
+/// Periodically log a note if we have not received data from the broker for a while.
+async fn task_stats(stats: Arc<BrokerStats>) {
+    let warn_duration = Duration::from_secs(10);
+    let mut ticker = tokio::time::interval(warn_duration);
+
+    loop {
+        tokio::select! {
+            _ = ticker.tick() => {
+                let last_pulled = stats.last_pulled_ts.load(std::sync::atomic::Ordering::SeqCst);
+                if last_pulled == 0 {
+                    // no broker updates yet
+                    continue;
+                }
+
+                let now = BrokerStats::now_millis();
+                if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 {
+                    let ts = chrono::NaiveDateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
+                    info!("no broker updates for some time, last update: {:?}", ts);
+                }
+            }
+        }
+    }
+}
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 9b4d4dbb38..543714a54e 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -83,6 +83,7 @@ pub struct SafeKeeperConf {
     pub walsenders_keep_horizon: bool,
     pub partial_backup_enabled: bool,
     pub partial_backup_timeout: Duration,
+    pub disable_periodic_broker_push: bool,
 }
 
 impl SafeKeeperConf {
@@ -129,6 +130,7 @@ impl SafeKeeperConf {
             walsenders_keep_horizon: false,
             partial_backup_enabled: false,
             partial_backup_timeout: Duration::from_secs(0),
+            disable_periodic_broker_push: false,
         }
     }
 }
diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs
index bc21c4d765..27e2a4453b 100644
--- a/safekeeper/tests/walproposer_sim/safekeeper.rs
+++ b/safekeeper/tests/walproposer_sim/safekeeper.rs
@@ -178,6 +178,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
         walsenders_keep_horizon: false,
         partial_backup_enabled: false,
         partial_backup_timeout: Duration::from_secs(0),
+        disable_periodic_broker_push: false,
     };
 
     let mut global = GlobalMap::new(disk, conf.clone())?;
diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs
index 4e5f8ed724..8c88b61abc 100644
--- a/storage_broker/src/bin/storage_broker.rs
+++ b/storage_broker/src/bin/storage_broker.rs
@@ -196,8 +196,13 @@ impl SubscriptionKey {
 
     /// Parse from FilterTenantTimelineId
     pub fn from_proto_filter_tenant_timeline_id(
-        f: &FilterTenantTimelineId,
+        opt: Option<&FilterTenantTimelineId>,
     ) -> Result<Self, Status> {
+        let Some(f) = opt else {
+            // No filter means subscribing to everything.
+            return Ok(SubscriptionKey::All);
+        };
+
         if !f.enabled {
             return Ok(SubscriptionKey::All);
         }
@@ -534,10 +539,7 @@ impl BrokerService for Broker {
             .remote_addr()
             .expect("TCPConnectInfo inserted by handler");
         let proto_filter = request.into_inner();
-        let ttid_filter = proto_filter
-            .tenant_timeline_id
-            .as_ref()
-            .ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;
+        let ttid_filter = proto_filter.tenant_timeline_id.as_ref();
 
         let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
         let types_set = proto_filter
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index ac1a747df3..967d133e18 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -1828,7 +1828,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
     env = neon_env_builder.init_start()
 
     tenant_id = env.initial_tenant
-    timeline_id = env.neon_cli.create_branch("test_sk_auth_restart_endpoint")
+    timeline_id = env.neon_cli.create_branch("test_idle_reconnections")
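+
+    # collect_stats() below snapshots the safekeeper_pg_queries_received_total
+    # counters so the test can track them between steps.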
 
     def collect_stats() -> Dict[str, float]:
         # we need to collect safekeeper_pg_queries_received_total metric from all safekeepers
@@ -1859,7 +1859,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
 
     collect_stats()
 
-    endpoint = env.endpoints.create_start("test_sk_auth_restart_endpoint")
+    endpoint = env.endpoints.create_start("test_idle_reconnections")
     # just write something to the timeline
     endpoint.safe_psql("create table t(i int)")
 
     collect_stats()
@@ -2007,3 +2007,47 @@ def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
     )
     log.info(f"dump_control_file response: {res}")
     assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"
+
+
+# This test disables periodic pushes from safekeepers to the broker and checks
+# that the pageserver can still discover safekeepers via discovery requests.
+def test_broker_discovery(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.LOCAL_FS)
+    env = neon_env_builder.init_start()
+
+    env.neon_cli.create_branch("test_broker_discovery")
+
+    endpoint = env.endpoints.create_start(
+        "test_broker_discovery",
+        config_lines=["shared_buffers=1MB"],
+    )
+    endpoint.safe_psql("create table t(i int, payload text)")
+    # Install the extension containing the function needed to clear the buffer cache
+    endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
+
+    def do_something():
+        time.sleep(1)
+        # generate some data to commit WAL on safekeepers
+        endpoint.safe_psql("insert into t select generate_series(1,100), 'action'")
+        # clear the buffers
+        endpoint.safe_psql("select clear_buffer_cache()")
+        # read data to fetch pages from the pageserver
+        endpoint.safe_psql("select sum(i) from t")
+
+    do_something()
+    do_something()
+
+    for sk in env.safekeepers:
+        # Disable periodic broker pushes, so the pageserver won't be able to discover
+        # safekeepers without sending a discovery request
+        sk.stop().start(extra_opts=["--disable-periodic-broker-push"])
+
+    do_something()
+    do_something()
+
+    # restart the pageserver and check that everything still works
+    env.pageserver.stop().start()
+
+    do_something()
+    do_something()