In safekeepers we have several background tasks. Previously, the `WAL backup` task was spawned by another task called `wal_backup_launcher`, which received notifications via `wal_backup_launcher_rx` and decided whether to spawn or kill the backup task associated with the timeline. This was inconvenient because every piece of code touching shared state was responsible for pushing a notification into the `wal_backup_launcher_tx` channel. It was also error prone: a notification was easy to miss, and pushing notifications in the wrong order could lead to deadlock in some cases.

We had a similar issue with the `is_active` timeline flag. That flag was calculated from the state, and code modifying the state had to call a function to update the flag. We had a few bugs where we forgot to update `is_active` in places where it could change.

To fix these issues, this PR adds a new `timeline_manager` background task associated with each timeline. This task is responsible for managing all background tasks, including the `is_active` flag used for pushing broker messages. It subscribes to timeline state updates in a loop and spawns or kills background tasks as needed.

There is a new structure called `TimelinesSet`. It stores a set of `Arc<Timeline>` and allows copying the set for iteration without holding the mutex. This replaces the `is_active` flag for the broker: the broker push task now holds a reference to the `TimelinesSet` of active timelines and uses it instead of iterating over all timelines and filtering by `is_active` (see the sketch after this description).

Also added some metrics for manager iterations and active backup tasks. Ideally the manager should perform only a few iterations, and we should not have many backup tasks spawned at the same time.

Fixes #7751

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
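A minimal sketch of the `TimelinesSet` idea described above, with simplified stand-in types: the real structure lives in the safekeeper crate and stores `Arc<Timeline>`; only `get_all` appears in the code below, so `set_present`, the `id` field, and the key type are illustrative assumptions.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the real safekeeper Timeline type.
pub struct Timeline {
    pub id: u64,
}

// A shared set of active timelines. Writers (the per-timeline manager
// task) add and remove entries; readers copy the contents out, so the
// mutex is never held across iteration or awaits.
pub struct TimelinesSet {
    timelines: Mutex<HashMap<u64, Arc<Timeline>>>,
}

impl TimelinesSet {
    // Called by the timeline manager instead of flipping an is_active flag.
    pub fn set_present(&self, tli: Arc<Timeline>, present: bool) {
        let mut guard = self.timelines.lock().unwrap();
        if present {
            guard.insert(tli.id, tli);
        } else {
            guard.remove(&tli.id);
        }
    }

    // Copy the set so the broker push loop can iterate lock-free.
    pub fn get_all(&self) -> Vec<Arc<Timeline>> {
        self.timelines.lock().unwrap().values().cloned().collect()
    }
}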
//! Communication with the broker, providing safekeeper peers and pageserver coordination.

use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;

use anyhow::Error;
use anyhow::Result;

use storage_broker::parse_proto_ttid;

use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::FilterTenantTimelineId;
use storage_broker::proto::MessageType;
use storage_broker::proto::SafekeeperDiscoveryResponse;
use storage_broker::proto::SubscribeByFilterRequest;
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TypeSubscription;
use storage_broker::proto::TypedMessage;
use storage_broker::Request;

use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::*;

use crate::metrics::BROKER_ITERATION_TIMELINES;
use crate::metrics::BROKER_PULLED_UPDATES;
use crate::metrics::BROKER_PUSHED_UPDATES;
use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;

const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;

/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
    if conf.disable_periodic_broker_push {
        info!("broker push_loop is disabled, doing nothing...");
        futures::future::pending::<()>().await; // sleep forever
        return Ok(());
    }

    let active_timelines_set = GlobalTimelines::get_global_broker_active_set();

    let mut client =
        storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
    let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);

    let outbound = async_stream::stream! {
        loop {
            // Note: we lock runtime here and in timeline methods as GlobalTimelines
            // is under plain mutex. That's ok, all this code is not performance
            // sensitive and there is no risk of deadlock as we don't await while
            // lock is held.
            let now = Instant::now();
            let all_tlis = active_timelines_set.get_all();
            let mut n_pushed_tlis = 0;
            for tli in &all_tlis {
                let sk_info = tli.get_safekeeper_info(&conf).await;
                yield sk_info;
                BROKER_PUSHED_UPDATES.inc();
                n_pushed_tlis += 1;
            }
            let elapsed = now.elapsed();

            BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
            BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);

            if elapsed > push_interval / 2 {
                info!("broker push took too long: pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
            }

            sleep(push_interval).await;
        }
    };
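    // publish_safekeeper_info consumes the outbound stream built above; since
    // that stream loops forever, this call is not expected to return in steady
    // state, so reaching the final Ok(()) is unusual.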
    client
        .publish_safekeeper_info(Request::new(outbound))
        .await?;
    Ok(())
}

/// Subscribe and fetch all the interesting data from the broker.
#[instrument(name = "broker pull", skip_all)]
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
    let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;

    // TODO: subscribe only to local timelines instead of all
    let request = SubscribeSafekeeperInfoRequest {
        subscription_key: Some(ProtoSubscriptionKey::All(())),
    };

    let mut stream = client
        .subscribe_safekeeper_info(request)
        .await
        .context("subscribe_safekeeper_info request failed")?
        .into_inner();

    let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
    let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
    let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);

    while let Some(msg) = stream.message().await? {
        stats.update_pulled();

        let proto_ttid = msg
            .tenant_timeline_id
            .as_ref()
            .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
        let ttid = parse_proto_ttid(proto_ttid)?;
        if let Ok(tli) = GlobalTimelines::get(ttid) {
            // Note that we also receive *our own* info. That's
            // important, as it is used as an indication of live
            // connection to the broker.

            // note: there are blocking operations below, but it's considered fine for now
            let res = tli.record_safekeeper_info(msg).await;
            if res.is_ok() {
                ok_counter.inc();
            } else {
                err_counter.inc();
            }
            res?;
        } else {
            not_found.inc();
        }
    }
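    // The subscription stream is not expected to end; if it does, bail so
    // task_main sees the error and respawns this loop, reconnecting to the broker.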
    bail!("end of stream");
}

/// Process incoming discover requests. This is done in a separate task to avoid
/// interfering with the normal pull/push loops.
async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
    let mut client =
        storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;

    let request = SubscribeByFilterRequest {
        types: vec![TypeSubscription {
            r#type: MessageType::SafekeeperDiscoveryRequest as i32,
        }],
        tenant_timeline_id: Some(FilterTenantTimelineId {
            enabled: false,
            tenant_timeline_id: None,
        }),
    };

    let mut stream = client
        .subscribe_by_filter(request)
        .await
        .context("subscribe_by_filter request failed")?
        .into_inner();

    let discover_counter = BROKER_PULLED_UPDATES.with_label_values(&["discover"]);

    while let Some(typed_msg) = stream.message().await? {
        stats.update_pulled();

        match typed_msg.r#type() {
            MessageType::SafekeeperDiscoveryRequest => {
                let msg = typed_msg
                    .safekeeper_discovery_request
                    .expect("proto type mismatch from broker message");

                let proto_ttid = msg
                    .tenant_timeline_id
                    .as_ref()
                    .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
                let ttid = parse_proto_ttid(proto_ttid)?;
                if let Ok(tli) = GlobalTimelines::get(ttid) {
                    // we received a discovery request for a timeline we know about
                    discover_counter.inc();

                    // create and reply with discovery response
                    let sk_info = tli.get_safekeeper_info(&conf).await;
                    let response = SafekeeperDiscoveryResponse {
                        safekeeper_id: sk_info.safekeeper_id,
                        tenant_timeline_id: sk_info.tenant_timeline_id,
                        commit_lsn: sk_info.commit_lsn,
                        safekeeper_connstr: sk_info.safekeeper_connstr,
                        availability_zone: sk_info.availability_zone,
                        standby_horizon: 0,
                    };

                    // note this is a blocking call
                    client
                        .publish_one(TypedMessage {
                            r#type: MessageType::SafekeeperDiscoveryResponse as i32,
                            safekeeper_timeline_info: None,
                            safekeeper_discovery_request: None,
                            safekeeper_discovery_response: Some(response),
                        })
                        .await?;
                }
            }

            _ => {
                warn!(
                    "unexpected message type i32 {}, {:?}",
                    typed_msg.r#type,
                    typed_msg.r#type()
                );
            }
        }
    }
    bail!("end of stream");
}

pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
    info!("started, broker endpoint {:?}", conf.broker_endpoint);

    let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
    let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
    let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
    let mut discover_handle: Option<JoinHandle<Result<(), Error>>> = None;

    let stats = Arc::new(BrokerStats::new());
    let stats_task = task_stats(stats.clone());
    tokio::pin!(stats_task);

    // Selecting on JoinHandles requires some squats; is there a better way to
    // reap tasks individually?

    // Handling failures in the task itself won't catch a panic, and in Tokio a
    // task's panic doesn't kill the whole executor, so it is better to do the
    // reaping here.
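    // The `unwrap()`s in the arms below are safe: each arm is guarded by an
    // `is_some()` precondition, and tokio::select! never polls the async
    // block of a disabled branch.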
loop {
        tokio::select! {
            res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
                // was it a panic or a normal error?
                let err = match res {
                    Ok(res_internal) => res_internal.unwrap_err(),
                    Err(err_outer) => err_outer.into(),
                };
                warn!("push task failed: {:?}", err);
                push_handle = None;
            },
            res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
                // was it a panic or a normal error?
                match res {
                    Ok(res_internal) => if let Err(err_inner) = res_internal {
                        warn!("pull task failed: {:?}", err_inner);
                    }
                    Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
                };
                pull_handle = None;
            },
            res = async { discover_handle.as_mut().unwrap().await }, if discover_handle.is_some() => {
                // was it a panic or a normal error?
                match res {
                    Ok(res_internal) => if let Err(err_inner) = res_internal {
                        warn!("discover task failed: {:?}", err_inner);
                    }
                    Err(err_outer) => { warn!("discover task panicked: {:?}", err_outer) }
                };
                discover_handle = None;
            },
            _ = ticker.tick() => {
                if push_handle.is_none() {
                    push_handle = Some(tokio::spawn(push_loop(conf.clone())));
                }
                if pull_handle.is_none() {
                    pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
                }
                if discover_handle.is_none() {
                    discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
                }
            },
            _ = &mut stats_task => {}
        }
    }
}

struct BrokerStats {
    /// Timestamp of the last received message from the broker.
    last_pulled_ts: AtomicU64,
}

impl BrokerStats {
    fn new() -> Self {
        BrokerStats {
            last_pulled_ts: AtomicU64::new(0),
        }
    }

    fn now_millis() -> u64 {
        std::time::SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time is before epoch")
            .as_millis() as u64
    }

    /// Update last_pulled timestamp to current time.
    fn update_pulled(&self) {
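        // Relaxed ordering is enough here: the timestamp is a standalone
        // heartbeat value and is not used to synchronize other memory.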
        self.last_pulled_ts
            .store(Self::now_millis(), std::sync::atomic::Ordering::Relaxed);
    }
}

/// Periodically write to logs if there are issues with receiving data from the broker.
async fn task_stats(stats: Arc<BrokerStats>) {
    let warn_duration = Duration::from_secs(10);
    let mut ticker = tokio::time::interval(warn_duration);

    loop {
        tokio::select! {
            _ = ticker.tick() => {
                let last_pulled = stats.last_pulled_ts.load(std::sync::atomic::Ordering::SeqCst);
                if last_pulled == 0 {
                    // no broker updates yet
                    continue;
                }

                let now = BrokerStats::now_millis();
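                // `now > last_pulled` also protects against the system clock
                // moving backwards between samples.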
                if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 {
                    let ts = chrono::DateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
                    info!("no broker updates for some time, last update: {:?}", ts);
                }
            }
        }
    }
}