Discover safekeepers via broker request (#7279)

We had an incident where pageserver requests timed out because the
pageserver couldn't fetch WAL from safekeepers. The incident was caused
by a bug in the safekeeper's timeline activation logic, which prevented
the pageserver from finding safekeepers.
That bug has since been fixed, but a similar bug could still appear in
the future due to the overall complexity.

We add a new broker message to "signal interest" in a timeline. This
signal is sent by the pageserver's `wait_lsn`, and safekeepers that
receive it start broadcasting broker messages for that timeline. Every
broker subscriber can then find the safekeepers and connect to them (to
start fetching WAL).

This feature is not limited to pageservers: any service that wants to
download WAL from safekeepers can use this discovery request.
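
As a rough sketch of what signalling interest looks like on the wire, the snippet below publishes a `SafekeeperDiscoveryRequest` wrapped in a `TypedMessage`, using the proto types and `publish_one` call introduced in this change; the helper function and its parameters are hypothetical:

```rust
use storage_broker::proto::{
    MessageType, SafekeeperDiscoveryRequest, TenantTimelineId as ProtoTenantTimelineId,
    TypedMessage,
};
use storage_broker::BrokerClientChannel;

/// Hypothetical helper: announce interest in a timeline via the broker.
/// Safekeepers hosting the timeline reply with SafekeeperDiscoveryResponse
/// messages, which any broker subscriber filtered on that timeline can pick up.
async fn signal_interest(
    broker_client: &mut BrokerClientChannel,
    tenant_id: Vec<u8>,
    timeline_id: Vec<u8>,
) -> anyhow::Result<()> {
    let request = SafekeeperDiscoveryRequest {
        tenant_timeline_id: Some(ProtoTenantTimelineId {
            tenant_id,
            timeline_id,
        }),
    };
    let msg = TypedMessage {
        r#type: MessageType::SafekeeperDiscoveryRequest as i32,
        safekeeper_timeline_info: None,
        safekeeper_discovery_request: Some(request),
        safekeeper_discovery_response: None,
    };
    // Fire-and-forget: responses arrive on the broker subscription,
    // not as a reply to this publish.
    broker_client.publish_one(msg).await?;
    Ok(())
}
```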

This commit changes the pageserver's connection_manager (walreceiver) to
send a SafekeeperDiscoveryRequest when there is no information about
safekeepers in memory. The current implementation sends these requests
only while there is an active wait_lsn() call, and no more often than
once per 10 seconds.
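
Distilled from the new select! arm in connection_manager (see the diff below), the send decision roughly amounts to the following predicate; names are simplified, and the interval is currently reused from `lagging_wal_timeout`:

```rust
use std::time::{Duration, Instant};

/// Sketch of the throttling rule checked before publishing a discovery request.
/// `last_discovery_ts` is None until the first request has been sent.
fn should_send_discovery(
    has_connection_or_candidates: bool,
    wait_lsn_is_active: bool,
    last_discovery_ts: Option<Instant>,
    interval: Duration, // reused from lagging_wal_timeout, ~10 seconds
) -> bool {
    // Only discover when there is nothing to connect to and someone is
    // actually waiting for WAL.
    if has_connection_or_candidates || !wait_lsn_is_active {
        return false;
    }
    match last_discovery_ts {
        None => true,
        Some(ts) => Instant::now() >= ts + interval,
    }
}
```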

Add `test_broker_discovery` to test this: safekeepers started with
`--disable-periodic-broker-push` do not push info to the broker, so the
pageserver must use a discovery request to start fetching WAL.

Add task_stats to the safekeeper's broker module to log a warning if no
message has been received from the broker for the last 10 seconds.
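
The check itself is simple; a minimal sketch of the staleness test used by that task (the real loop lives in the safekeeper broker module diff below):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// `last_pulled_ts` holds the time (millis since epoch) of the last message
/// pulled from the broker; 0 means nothing has been received yet.
fn broker_is_stale(last_pulled_ts: &AtomicU64, now_millis: u64, warn_after: Duration) -> bool {
    let last = last_pulled_ts.load(Ordering::Relaxed);
    last != 0 && now_millis > last && now_millis - last > warn_after.as_millis() as u64
}
```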

Closes #5471

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
Author: Arthur Petukhovsky
Date: 2024-04-30 21:50:03 +03:00
Committed by: GitHub
Commit: 50a45e67dc (parent: fcbe60f436)
9 changed files with 464 additions and 66 deletions


@@ -2,11 +2,10 @@
use std::cmp::{Eq, Ordering};
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::mem;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::watch::{self, channel};
use tokio::time::timeout;
/// An error happened while waiting for a number
@@ -35,23 +34,73 @@ pub trait MonotonicCounter<V> {
fn cnt_value(&self) -> V;
}
/// Internal components of a `SeqWait`
struct SeqWaitInt<S, V>
/// Heap of waiters, lowest numbers pop first.
struct Waiters<V>
where
S: MonotonicCounter<V>,
V: Ord,
{
waiters: BinaryHeap<Waiter<V>>,
current: S,
shutdown: bool,
heap: BinaryHeap<Waiter<V>>,
/// Number of the first waiter in the heap, or None if there are no waiters.
status_channel: watch::Sender<Option<V>>,
}
impl<V> Waiters<V>
where
V: Ord + Copy,
{
fn new() -> Self {
Waiters {
heap: BinaryHeap::new(),
status_channel: channel(None).0,
}
}
/// `status_channel` contains the number of the first waiter in the heap.
/// This function should be called whenever waiters heap changes.
fn update_status(&self) {
let first_waiter = self.heap.peek().map(|w| w.wake_num);
let _ = self.status_channel.send_replace(first_waiter);
}
/// Add new waiter to the heap, return a channel that will be notified when the number arrives.
fn add(&mut self, num: V) -> watch::Receiver<()> {
let (tx, rx) = channel(());
self.heap.push(Waiter {
wake_num: num,
wake_channel: tx,
});
self.update_status();
rx
}
/// Pop all waiters <= num from the heap. Collect channels in a vector,
/// so that caller can wake them up.
fn pop_leq(&mut self, num: V) -> Vec<watch::Sender<()>> {
let mut wake_these = Vec::new();
while let Some(n) = self.heap.peek() {
if n.wake_num > num {
break;
}
wake_these.push(self.heap.pop().unwrap().wake_channel);
}
self.update_status();
wake_these
}
/// Used on shutdown to efficiently drop all waiters.
fn take_all(&mut self) -> BinaryHeap<Waiter<V>> {
let heap = mem::take(&mut self.heap);
self.update_status();
heap
}
}
struct Waiter<T>
where
T: Ord,
{
wake_num: T, // wake me when this number arrives ...
wake_channel: Sender<()>, // ... by sending a message to this channel
wake_num: T, // wake me when this number arrives ...
wake_channel: watch::Sender<()>, // ... by sending a message to this channel
}
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
@@ -76,6 +125,17 @@ impl<T: Ord> PartialEq for Waiter<T> {
impl<T: Ord> Eq for Waiter<T> {}
/// Internal components of a `SeqWait`
struct SeqWaitInt<S, V>
where
S: MonotonicCounter<V>,
V: Ord,
{
waiters: Waiters<V>,
current: S,
shutdown: bool,
}
/// A tool for waiting on a sequence number
///
/// This provides a way to wait the arrival of a number.
@@ -108,7 +168,7 @@ where
/// Create a new `SeqWait`, initialized to a particular number
pub fn new(starting_num: S) -> Self {
let internal = SeqWaitInt {
waiters: BinaryHeap::new(),
waiters: Waiters::new(),
current: starting_num,
shutdown: false,
};
@@ -128,9 +188,8 @@ where
// Block any future waiters from starting
internal.shutdown = true;
// This will steal the entire waiters map.
// When we drop it all waiters will be woken.
mem::take(&mut internal.waiters)
// Take all waiters to drop them later.
internal.waiters.take_all()
// Drop the lock as we exit this scope.
};
@@ -196,7 +255,7 @@ where
/// Register and return a channel that will be notified when a number arrives,
/// or None, if it has already arrived.
fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
fn queue_for_wait(&self, num: V) -> Result<Option<watch::Receiver<()>>, SeqWaitError> {
let mut internal = self.internal.lock().unwrap();
if internal.current.cnt_value() >= num {
return Ok(None);
@@ -205,12 +264,8 @@ where
return Err(SeqWaitError::Shutdown);
}
// Create a new channel.
let (tx, rx) = channel(());
internal.waiters.push(Waiter {
wake_num: num,
wake_channel: tx,
});
// Add waiter channel to the queue.
let rx = internal.waiters.add(num);
// Drop the lock as we exit this scope.
Ok(Some(rx))
}
@@ -231,16 +286,8 @@ where
}
internal.current.cnt_advance(num);
// Pop all waiters <= num from the heap. Collect them in a vector, and
// wake them up after releasing the lock.
let mut wake_these = Vec::new();
while let Some(n) = internal.waiters.peek() {
if n.wake_num > num {
break;
}
wake_these.push(internal.waiters.pop().unwrap().wake_channel);
}
wake_these
// Pop all waiters <= num from the heap.
internal.waiters.pop_leq(num)
};
for tx in wake_these {
@@ -255,6 +302,23 @@ where
pub fn load(&self) -> S {
self.internal.lock().unwrap().current
}
/// Get a Receiver for the current status.
///
/// The current status is the number of the first waiter in the queue,
/// or None if there are no waiters.
///
/// This receiver will be notified whenever the status changes.
/// It is useful for receiving notifications when the first waiter
/// starts waiting for a number, or when there are no more waiters left.
pub fn status_receiver(&self) -> watch::Receiver<Option<V>> {
self.internal
.lock()
.unwrap()
.waiters
.status_channel
.subscribe()
}
}
#[cfg(test)]


@@ -1253,6 +1253,12 @@ impl Timeline {
self.last_record_lsn.load()
}
/// Subscribe to callers of wait_lsn(). The value of the channel is None if there are no
/// wait_lsn() calls in progress, and Some(Lsn) if there is an active waiter for wait_lsn().
pub(crate) fn subscribe_for_wait_lsn_updates(&self) -> watch::Receiver<Option<Lsn>> {
self.last_record_lsn.status_receiver()
}
pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn.load()
}


@@ -22,10 +22,12 @@ use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeli
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use pageserver_api::models::TimelineState;
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SubscribeByFilterRequest, TypeSubscription, TypedMessage,
};
use storage_broker::{BrokerClientChannel, Code, Streaming};
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -89,6 +91,14 @@ pub(super) async fn connection_manager_loop_step(
.timeline
.subscribe_for_state_updates();
let mut wait_lsn_status = connection_manager_state
.timeline
.subscribe_for_wait_lsn_updates();
// TODO: create a separate config option for discovery request interval
let discovery_request_interval = connection_manager_state.conf.lagging_wal_timeout;
let mut last_discovery_ts: Option<std::time::Instant> = None;
// Subscribe to the broker updates. Stream shares underlying TCP connection
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
@@ -97,10 +107,12 @@ pub(super) async fn connection_manager_loop_step(
loop {
let time_until_next_retry = connection_manager_state.time_until_next_retry();
let any_activity = connection_manager_state.wal_connection.is_some()
|| !connection_manager_state.wal_stream_candidates.is_empty();
// These things are happening concurrently:
//
// - cancellation request
// - cancellation request
// - keep receiving WAL on the current connection
// - if the shared state says we need to change connection, disconnect and return
// - this runs in a separate task and we receive updates via a watch channel
@@ -108,6 +120,7 @@ pub(super) async fn connection_manager_loop_step(
// - receive updates from broker
// - this might change the current desired connection
// - timeline state changes to something that does not allow walreceiver to run concurrently
// - if there's no connection and no candidates, try to send a discovery request
// NB: make sure each of the select expressions are cancellation-safe
// (no need for arms to be cancellation-safe).
@@ -214,6 +227,65 @@ pub(super) async fn connection_manager_loop_step(
}
}
} => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
Some(()) = async {
// Reminder: this match arm needs to be cancellation-safe.
// Calculating time needed to wait until sending the next discovery request.
// Current implementation is conservative and sends discovery requests only when there are no candidates.
if any_activity {
// No need to send discovery requests if there is an active connection or candidates.
return None;
}
// Waiting for an active wait_lsn request.
while wait_lsn_status.borrow().is_none() {
if wait_lsn_status.changed().await.is_err() {
// wait_lsn_status channel was closed, exiting
warn!("wait_lsn_status channel was closed in connection_manager_loop_step");
return None;
}
}
// All preconditions met, preparing to send a discovery request.
let now = std::time::Instant::now();
let next_discovery_ts = last_discovery_ts
.map(|ts| ts + discovery_request_interval)
.unwrap_or_else(|| now);
if next_discovery_ts > now {
// Prevent sending discovery requests too frequently.
tokio::time::sleep(next_discovery_ts - now).await;
}
let tenant_timeline_id = Some(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
});
let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
let msg = TypedMessage {
r#type: MessageType::SafekeeperDiscoveryRequest as i32,
safekeeper_timeline_info: None,
safekeeper_discovery_request: Some(request),
safekeeper_discovery_response: None,
};
last_discovery_ts = Some(std::time::Instant::now());
debug!("No active connection and no candidates, sending discovery request to the broker");
// Cancellation safety: we want to send a message to the broker, but publish_one()
// function can get cancelled by the other select! arm. This is absolutely fine, because
// we just want to receive broker updates and discovery is not important if we already
// receive updates.
//
// It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
// This is totally fine because of the reason above.
// This is a fire-and-forget request, we don't care about the response
let _ = broker_client.publish_one(msg).await;
debug!("Discovery request sent to the broker");
None
} => {}
}
if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
@@ -231,7 +303,7 @@ async fn subscribe_for_timeline_updates(
broker_client: &mut BrokerClientChannel,
id: TenantTimelineId,
cancel: &CancellationToken,
) -> Result<Streaming<SafekeeperTimelineInfo>, Cancelled> {
) -> Result<Streaming<TypedMessage>, Cancelled> {
let mut attempt = 0;
loop {
exponential_backoff(
@@ -244,17 +316,27 @@ async fn subscribe_for_timeline_updates(
attempt += 1;
// subscribe to the specific timeline
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
});
let request = SubscribeSafekeeperInfoRequest {
subscription_key: Some(key),
let request = SubscribeByFilterRequest {
types: vec![
TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo as i32,
},
TypeSubscription {
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
},
],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
}),
}),
};
match {
tokio::select! {
r = broker_client.subscribe_safekeeper_info(request) => { r }
r = broker_client.subscribe_by_filter(request) => { r }
_ = cancel.cancelled() => { return Err(Cancelled); }
}
} {
@@ -398,7 +480,7 @@ struct RetryInfo {
/// Data about the timeline to connect to, received from the broker.
#[derive(Debug, Clone)]
struct BrokerSkTimeline {
timeline: SafekeeperTimelineInfo,
timeline: SafekeeperDiscoveryResponse,
/// Time at which the data was fetched from the broker last time, to track the stale data.
latest_update: NaiveDateTime,
}
@@ -606,7 +688,41 @@ impl ConnectionManagerState {
}
/// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
let mut is_discovery = false;
let timeline_update = match typed_msg.r#type() {
MessageType::SafekeeperTimelineInfo => {
let info = match typed_msg.safekeeper_timeline_info {
Some(info) => info,
None => {
warn!("bad proto message from broker: no safekeeper_timeline_info");
return;
}
};
SafekeeperDiscoveryResponse {
safekeeper_id: info.safekeeper_id,
tenant_timeline_id: info.tenant_timeline_id,
commit_lsn: info.commit_lsn,
safekeeper_connstr: info.safekeeper_connstr,
availability_zone: info.availability_zone,
}
}
MessageType::SafekeeperDiscoveryResponse => {
is_discovery = true;
match typed_msg.safekeeper_discovery_response {
Some(response) => response,
None => {
warn!("bad proto message from broker: no safekeeper_discovery_response");
return;
}
}
}
_ => {
// unexpected message
return;
}
};
WALRECEIVER_BROKER_UPDATES.inc();
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
@@ -619,7 +735,11 @@ impl ConnectionManagerState {
);
if old_entry.is_none() {
info!("New SK node was added: {new_safekeeper_id}");
info!(
?is_discovery,
%new_safekeeper_id,
"New SK node was added",
);
WALRECEIVER_CANDIDATES_ADDED.inc();
}
}
@@ -818,7 +938,7 @@ impl ConnectionManagerState {
fn select_connection_candidate(
&self,
node_to_omit: Option<NodeId>,
) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
) -> Option<(NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
self.applicable_connection_candidates()
.filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
.max_by_key(|(_, info, _)| info.commit_lsn)
@@ -828,7 +948,7 @@ impl ConnectionManagerState {
/// Some safekeepers are filtered by the retry cooldown.
fn applicable_connection_candidates(
&self,
) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
) -> impl Iterator<Item = (NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
let now = Utc::now().naive_utc();
self.wal_stream_candidates
@@ -968,19 +1088,11 @@ mod tests {
latest_update: NaiveDateTime,
) -> BrokerSkTimeline {
BrokerSkTimeline {
timeline: SafekeeperTimelineInfo {
timeline: SafekeeperDiscoveryResponse {
safekeeper_id: 0,
tenant_timeline_id: None,
term: 0,
last_log_term: 0,
flush_lsn: 0,
commit_lsn,
backup_lsn: 0,
remote_consistent_lsn: 0,
peer_horizon_lsn: 0,
local_start_lsn: 0,
safekeeper_connstr: safekeeper_connstr.to_owned(),
http_connstr: safekeeper_connstr.to_owned(),
availability_zone: None,
},
latest_update,


@@ -177,6 +177,10 @@ struct Args {
/// Controls how long backup will wait until uploading the partial segment.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)]
partial_backup_timeout: Duration,
/// Disable task to push messages to broker every second. Supposed to
/// be used in tests.
#[arg(long)]
disable_periodic_broker_push: bool,
}
// Like PathBufValueParser, but allows empty string.
@@ -309,6 +313,7 @@ async fn main() -> anyhow::Result<()> {
walsenders_keep_horizon: args.walsenders_keep_horizon,
partial_backup_enabled: args.partial_backup_enabled,
partial_backup_timeout: args.partial_backup_timeout,
disable_periodic_broker_push: args.disable_periodic_broker_push,
};
// initialize sentry if SENTRY_DSN is provided


@@ -10,11 +10,20 @@ use anyhow::Result;
use storage_broker::parse_proto_ttid;
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::FilterTenantTimelineId;
use storage_broker::proto::MessageType;
use storage_broker::proto::SafekeeperDiscoveryResponse;
use storage_broker::proto::SubscribeByFilterRequest;
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TypeSubscription;
use storage_broker::proto::TypedMessage;
use storage_broker::Request;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::*;
@@ -31,6 +40,12 @@ const PUSH_INTERVAL_MSEC: u64 = 1000;
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
if conf.disable_periodic_broker_push {
info!("broker push_loop is disabled, doing nothing...");
futures::future::pending::<()>().await; // sleep forever
return Ok(());
}
let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
@@ -75,7 +90,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
}
/// Subscribe and fetch all the interesting data from the broker.
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
// TODO: subscribe only to local timelines instead of all
@@ -94,6 +109,8 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
while let Some(msg) = stream.message().await? {
stats.update_pulled();
let proto_ttid = msg
.tenant_timeline_id
.as_ref()
@@ -119,12 +136,93 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
bail!("end of stream");
}
/// Process incoming discover requests. This is done in a separate task to avoid
/// interfering with the normal pull/push loops.
async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
let request = SubscribeByFilterRequest {
types: vec![TypeSubscription {
r#type: MessageType::SafekeeperDiscoveryRequest as i32,
}],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: false,
tenant_timeline_id: None,
}),
};
let mut stream = client
.subscribe_by_filter(request)
.await
.context("subscribe_by_filter request failed")?
.into_inner();
let discover_counter = BROKER_PULLED_UPDATES.with_label_values(&["discover"]);
while let Some(typed_msg) = stream.message().await? {
stats.update_pulled();
match typed_msg.r#type() {
MessageType::SafekeeperDiscoveryRequest => {
let msg = typed_msg
.safekeeper_discovery_request
.expect("proto type mismatch from broker message");
let proto_ttid = msg
.tenant_timeline_id
.as_ref()
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
let ttid = parse_proto_ttid(proto_ttid)?;
if let Ok(tli) = GlobalTimelines::get(ttid) {
// we received a discovery request for a timeline we know about
discover_counter.inc();
// create and reply with discovery response
let sk_info = tli.get_safekeeper_info(&conf).await;
let response = SafekeeperDiscoveryResponse {
safekeeper_id: sk_info.safekeeper_id,
tenant_timeline_id: sk_info.tenant_timeline_id,
commit_lsn: sk_info.commit_lsn,
safekeeper_connstr: sk_info.safekeeper_connstr,
availability_zone: sk_info.availability_zone,
};
// note this is a blocking call
client
.publish_one(TypedMessage {
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
safekeeper_timeline_info: None,
safekeeper_discovery_request: None,
safekeeper_discovery_response: Some(response),
})
.await?;
}
}
_ => {
warn!(
"unexpected message type i32 {}, {:?}",
typed_msg.r#type,
typed_msg.r#type()
);
}
}
}
bail!("end of stream");
}
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
info!("started, broker endpoint {:?}", conf.broker_endpoint);
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
let mut discover_handle: Option<JoinHandle<Result<(), Error>>> = None;
let stats = Arc::new(BrokerStats::new());
let stats_task = task_stats(stats.clone());
tokio::pin!(stats_task);
// Selecting on JoinHandles requires some squats; is there a better way to
// reap tasks individually?
@@ -153,13 +251,77 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
};
pull_handle = None;
},
res = async { discover_handle.as_mut().unwrap().await }, if discover_handle.is_some() => {
// was it panic or normal error?
match res {
Ok(res_internal) => if let Err(err_inner) = res_internal {
warn!("discover task failed: {:?}", err_inner);
}
Err(err_outer) => { warn!("discover task panicked: {:?}", err_outer) }
};
discover_handle = None;
},
_ = ticker.tick() => {
if push_handle.is_none() {
push_handle = Some(tokio::spawn(push_loop(conf.clone())));
}
if pull_handle.is_none() {
pull_handle = Some(tokio::spawn(pull_loop(conf.clone())));
pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
}
if discover_handle.is_none() {
discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
}
},
_ = &mut stats_task => {}
}
}
}
struct BrokerStats {
/// Timestamp of the last received message from the broker.
last_pulled_ts: AtomicU64,
}
impl BrokerStats {
fn new() -> Self {
BrokerStats {
last_pulled_ts: AtomicU64::new(0),
}
}
fn now_millis() -> u64 {
std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time is before epoch")
.as_millis() as u64
}
/// Update last_pulled timestamp to current time.
fn update_pulled(&self) {
self.last_pulled_ts
.store(Self::now_millis(), std::sync::atomic::Ordering::Relaxed);
}
}
/// Periodically write to logs if there are issues with receiving data from the broker.
async fn task_stats(stats: Arc<BrokerStats>) {
let warn_duration = Duration::from_secs(10);
let mut ticker = tokio::time::interval(warn_duration);
loop {
tokio::select! {
_ = ticker.tick() => {
let last_pulled = stats.last_pulled_ts.load(std::sync::atomic::Ordering::SeqCst);
if last_pulled == 0 {
// no broker updates yet
continue;
}
let now = BrokerStats::now_millis();
if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 {
let ts = chrono::NaiveDateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
info!("no broker updates for some time, last update: {:?}", ts);
}
}
}
}


@@ -83,6 +83,7 @@ pub struct SafeKeeperConf {
pub walsenders_keep_horizon: bool,
pub partial_backup_enabled: bool,
pub partial_backup_timeout: Duration,
pub disable_periodic_broker_push: bool,
}
impl SafeKeeperConf {
@@ -129,6 +130,7 @@ impl SafeKeeperConf {
walsenders_keep_horizon: false,
partial_backup_enabled: false,
partial_backup_timeout: Duration::from_secs(0),
disable_periodic_broker_push: false,
}
}
}


@@ -178,6 +178,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
walsenders_keep_horizon: false,
partial_backup_enabled: false,
partial_backup_timeout: Duration::from_secs(0),
disable_periodic_broker_push: false,
};
let mut global = GlobalMap::new(disk, conf.clone())?;


@@ -196,8 +196,13 @@ impl SubscriptionKey {
/// Parse from FilterTenantTimelineId
pub fn from_proto_filter_tenant_timeline_id(
f: &FilterTenantTimelineId,
opt: Option<&FilterTenantTimelineId>,
) -> Result<Self, Status> {
if opt.is_none() {
return Ok(SubscriptionKey::All);
}
let f = opt.unwrap();
if !f.enabled {
return Ok(SubscriptionKey::All);
}
@@ -534,10 +539,7 @@ impl BrokerService for Broker {
.remote_addr()
.expect("TCPConnectInfo inserted by handler");
let proto_filter = request.into_inner();
let ttid_filter = proto_filter
.tenant_timeline_id
.as_ref()
.ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;
let ttid_filter = proto_filter.tenant_timeline_id.as_ref();
let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
let types_set = proto_filter


@@ -1828,7 +1828,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_sk_auth_restart_endpoint")
timeline_id = env.neon_cli.create_branch("test_idle_reconnections")
def collect_stats() -> Dict[str, float]:
# we need to collect safekeeper_pg_queries_received_total metric from all safekeepers
@@ -1859,7 +1859,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
collect_stats()
endpoint = env.endpoints.create_start("test_sk_auth_restart_endpoint")
endpoint = env.endpoints.create_start("test_idle_reconnections")
# just write something to the timeline
endpoint.safe_psql("create table t(i int)")
collect_stats()
@@ -2007,3 +2007,47 @@ def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
)
log.info(f"dump_control_file response: {res}")
assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"
# Test disables periodic pushes from safekeeper to the broker and checks that
# pageserver can still discover safekeepers with discovery requests.
def test_broker_discovery(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_broker_discovery")
endpoint = env.endpoints.create_start(
"test_broker_discovery",
config_lines=["shared_buffers=1MB"],
)
endpoint.safe_psql("create table t(i int, payload text)")
# Install extension containing function needed to clear buffer
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
def do_something():
time.sleep(1)
# generate some data to commit WAL on safekeepers
endpoint.safe_psql("insert into t select generate_series(1,100), 'action'")
# clear the buffers
endpoint.safe_psql("select clear_buffer_cache()")
# read data to fetch pages from pageserver
endpoint.safe_psql("select sum(i) from t")
do_something()
do_something()
for sk in env.safekeepers:
# Disable periodic broker push, so pageserver won't be able to discover
# safekeepers without sending a discovery request
sk.stop().start(extra_opts=["--disable-periodic-broker-push"])
do_something()
do_something()
# restart pageserver and check how everything works
env.pageserver.stop().start()
do_something()
do_something()