diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 38d4a403c2..8f698977a9 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -6,17 +6,13 @@ pub mod subscription_key; /// All broker values, possible to use when dealing with etcd. pub mod subscription_value; -use std::{ - collections::{hash_map, HashMap}, - str::FromStr, -}; +use std::str::FromStr; use serde::de::DeserializeOwned; use subscription_key::SubscriptionKey; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::*; -use utils::zid::{NodeId, ZTenantTimelineId}; use crate::subscription_key::SubscriptionFullKey; @@ -28,18 +24,17 @@ pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon"; /// A way to control the data retrieval from a certain subscription. pub struct BrokerSubscription { - value_updates: mpsc::UnboundedReceiver>>, + /// An unbounded channel to fetch the relevant etcd updates from. + pub value_updates: mpsc::UnboundedReceiver>, key: SubscriptionKey, - watcher_handle: JoinHandle>, + /// A subscription task handle, to allow waiting on it for the task to complete. + /// Both the updates channel and the handle require `&mut`, so it's better to keep + /// both `pub` to allow using both in the same structures without borrow checker complaining. + pub watcher_handle: JoinHandle>, watcher: Watcher, } impl BrokerSubscription { - /// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet. - pub async fn fetch_data(&mut self) -> Option>> { - self.value_updates.recv().await - } - /// Cancels the subscription, stopping the data poller and waiting for it to shut down. pub async fn cancel(mut self) -> Result<(), BrokerError> { self.watcher.cancel().await.map_err(|e| { @@ -48,15 +43,41 @@ impl BrokerSubscription { format!("Failed to cancel broker subscription, kind: {:?}", self.key), ) })?; - self.watcher_handle.await.map_err(|e| { - BrokerError::InternalError(format!( - "Failed to join the broker value updates task, kind: {:?}, error: {e}", - self.key - )) - })? + match (&mut self.watcher_handle).await { + Ok(res) => res, + Err(e) => { + if e.is_cancelled() { + // don't error on the tasks that are cancelled already + Ok(()) + } else { + Err(BrokerError::InternalError(format!( + "Panicked during broker subscription task, kind: {:?}, error: {e}", + self.key + ))) + } + } + } } } +impl Drop for BrokerSubscription { + fn drop(&mut self) { + // we poll data from etcd into the channel in the same struct, so if the whole struct gets dropped, + // no more data is used by the receiver and it's safe to cancel and drop the whole etcd subscription task. + self.watcher_handle.abort(); + } +} + +/// An update from the etcd broker. +pub struct BrokerUpdate { + /// Etcd generation version, the bigger the more actual the data is. + pub etcd_version: i64, + /// Etcd key for the corresponding value, parsed from the broker KV. + pub key: SubscriptionFullKey, + /// Current etcd value, parsed from the broker KV. + pub value: V, +} + #[derive(Debug, thiserror::Error)] pub enum BrokerError { #[error("Etcd client error: {0}. Context: {1}")] @@ -124,41 +145,21 @@ where break; } - let mut value_updates: HashMap> = HashMap::new(); - // Keep track that the timeline data updates from etcd arrive in the right order. - // https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas - // > etcd does not ensure linearizability for watch operations. 
Users are expected to verify the revision of watch responses to ensure correct ordering. - let mut value_etcd_versions: HashMap = HashMap::new(); - - let events = resp.events(); debug!("Processing {} events", events.len()); for event in events { if EventType::Put == event.event_type() { if let Some(new_etcd_kv) = event.kv() { - let new_kv_version = new_etcd_kv.version(); - match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) { - Ok(Some((key, value))) => match value_updates - .entry(key.id) - .or_default() - .entry(key.node_id) - { - hash_map::Entry::Occupied(mut o) => { - let old_etcd_kv_version = value_etcd_versions.get(&key.id).copied().unwrap_or(i64::MIN); - if old_etcd_kv_version < new_kv_version { - o.insert(value); - value_etcd_versions.insert(key.id,new_kv_version); - } else { - debug!("Skipping etcd timeline update due to older version compared to one that's already stored"); - } - } - hash_map::Entry::Vacant(v) => { - v.insert(value); - value_etcd_versions.insert(key.id,new_kv_version); - } - }, + Ok(Some((key, value))) => if let Err(e) = value_updates_sender.send(BrokerUpdate { + etcd_version: new_etcd_kv.version(), + key, + value, + }) { + info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}"); + break; + }, Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"), Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"), Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"), @@ -166,13 +167,6 @@ where } } } - - if !value_updates.is_empty() { - if let Err(e) = value_updates_sender.send(value_updates) { - info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}"); - break; - } - } } Ok(()) diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 82401e1d8c..fd9468a101 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -15,66 +15,38 @@ //! //! * handle the actual connection and WAL streaming //! -//! Handle happens dynamically, by portions of WAL being processed and registered in the server. +//! Handling happens dynamically, by portions of WAL being processed and registered in the server. //! Along with the registration, certain metadata is written to show WAL streaming progress and rely on that when considering safekeepers for connection. //! -//! ## Implementation details -//! -//! WAL receiver's implementation consists of 3 kinds of nested loops, separately handling the logic from the bullets above: -//! -//! * [`init_wal_receiver_main_thread`], a wal receiver main thread, containing the control async loop: timeline addition/removal and interruption of a whole thread handling. -//! The loop is infallible, always trying to continue with the new tasks, the only place where it can fail is its initialization. -//! All of the code inside the loop is either async or a spawn_blocking wrapper around the sync code. -//! -//! * [`timeline_wal_broker_loop_step`], a broker task, handling the etcd broker subscription and polling, safekeeper selection logic and [re]connects. -//! On every concequent broker/wal streamer connection attempt, the loop steps are forced to wait for some time before running, -//! increasing with the number of attempts (capped with some fixed value). -//! This is done endlessly, to ensure we don't miss the WAL streaming when it gets available on one of the safekeepers. -//! -//! 
Apart from the broker management, it keeps the wal streaming connection open, with the safekeeper having the most advanced timeline state. -//! The connection could be closed from safekeeper side (with error or not), could be cancelled from pageserver side from time to time. -//! -//! * [`connection_handler::handle_walreceiver_connection`], a wal streaming task, opening the libpq connection and reading the data out of it to the end. -//! Does periodic reporting of the progress, to share some of the data via external HTTP API and to ensure we're able to switch connections when needed. -//! -//! Every task is cancellable via its separate cancellation channel, -//! also every such task's dependency (broker subscription or the data source channel) cancellation/drop triggers the corresponding task cancellation either. +//! The current module contains high-level primitives used in the submodules; general synchronization, timeline acknowledgement and shutdown logic. -mod connection_handler; +mod connection_manager; +mod walreceiver_connection; -use crate::config::PageServerConf; -use crate::http::models::WalReceiverEntry; -use crate::repository::Timeline; -use crate::tenant_mgr::{self, LocalTimelineUpdate, TenantState}; -use crate::thread_mgr::ThreadKind; -use crate::{thread_mgr, DatadirTimelineImpl}; use anyhow::{ensure, Context}; -use chrono::{NaiveDateTime, Utc}; -use etcd_broker::{ - subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, - Client, -}; +use etcd_broker::Client; use itertools::Itertools; use once_cell::sync::Lazy; use std::cell::Cell; use std::collections::{hash_map, HashMap, HashSet}; +use std::future::Future; use std::num::NonZeroU64; -use std::ops::ControlFlow; use std::sync::Arc; use std::thread_local; use std::time::Duration; -use tokio::select; use tokio::{ + select, sync::{mpsc, watch, RwLock}, task::JoinHandle, }; use tracing::*; use url::Url; -use utils::lsn::Lsn; -use utils::pq_proto::ReplicationFeedback; -use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId}; -use self::connection_handler::{WalConnectionEvent, WalReceiverConnection}; +use crate::config::PageServerConf; +use crate::http::models::WalReceiverEntry; +use crate::tenant_mgr::{self, LocalTimelineUpdate, TenantState}; +use crate::thread_mgr::{self, ThreadKind}; +use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; thread_local! 
{ // Boolean that is true only for WAL receiver threads @@ -125,7 +97,7 @@ pub fn init_wal_receiver_main_thread( .build() .context("Failed to create storage sync runtime")?; let etcd_client = runtime - .block_on(etcd_broker::Client::connect(etcd_endpoints, None)) + .block_on(Client::connect(etcd_endpoints, None)) .context("Failed to connect to etcd")?; thread_mgr::spawn( @@ -162,6 +134,97 @@ pub fn init_wal_receiver_main_thread( .context("Failed to spawn wal receiver main thread") } +async fn shutdown_all_wal_connections( + local_timeline_wal_receivers: &mut HashMap>>, +) { + info!("Shutting down all WAL connections"); + let mut broker_join_handles = Vec::new(); + for (tenant_id, timelines) in local_timeline_wal_receivers.drain() { + for (timeline_id, handles) in timelines { + handles.cancellation.send(()).ok(); + broker_join_handles.push(( + ZTenantTimelineId::new(tenant_id, timeline_id), + handles.handle, + )); + } + } + + let mut tenants = HashSet::with_capacity(broker_join_handles.len()); + for (id, broker_join_handle) in broker_join_handles { + tenants.insert(id.tenant_id); + debug!("Waiting for wal broker for timeline {id} to finish"); + if let Err(e) = broker_join_handle.await { + error!("Failed to join on wal broker for timeline {id}: {e}"); + } + } + if let Err(e) = tokio::task::spawn_blocking(move || { + for tenant_id in tenants { + if let Err(e) = tenant_mgr::set_tenant_state(tenant_id, TenantState::Idle) { + error!("Failed to make tenant {tenant_id} idle: {e:?}"); + } + } + }) + .await + { + error!("Failed to await a task to make all tenants idle: {e:?}"); + } +} + +/// A handle of an asynchronous task. +/// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`] +/// and a cancellation channel that it can listen to for earlier interrupts. +/// +/// Note that the communication happens via the `watch` channel, that does not accumulate the events, replacing the old one with the never one on submission. +/// That may lead to certain events not being observed by the listener. +#[derive(Debug)] +struct TaskHandle { + handle: JoinHandle<()>, + events_receiver: watch::Receiver>, + cancellation: watch::Sender<()>, +} + +#[derive(Debug, Clone)] +pub enum TaskEvent { + Started, + NewEvent(E), + End(Result<(), String>), +} + +impl TaskHandle { + /// Initializes the task, starting it immediately after the creation. + pub fn spawn( + task: impl FnOnce(Arc>>, watch::Receiver<()>) -> Fut + Send + 'static, + ) -> Self + where + Fut: Future> + Send, + E: Sync + Send + 'static, + { + let (cancellation, cancellation_receiver) = watch::channel(()); + let (events_sender, events_receiver) = watch::channel(TaskEvent::Started); + let events_sender = Arc::new(events_sender); + + let sender = Arc::clone(&events_sender); + let handle = tokio::task::spawn(async move { + let task_result = task(sender, cancellation_receiver).await; + events_sender.send(TaskEvent::End(task_result)).ok(); + }); + + TaskHandle { + handle, + events_receiver, + cancellation, + } + } + + /// Aborts current task, waiting for it to finish. + async fn shutdown(self) { + self.cancellation.send(()).ok(); + if let Err(e) = self.handle.await { + error!("Task failed to shut down: {e}") + } + } +} + /// A step to process timeline attach/detach events to enable/disable the corresponding WAL receiver machinery. /// In addition to WAL streaming management, the step ensures that corresponding tenant has its service threads enabled or disabled. 
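The `watch`-channel caveat in the `TaskHandle` docs above is the main subtlety of this design: the channel stores only the most recent value, so a listener that falls behind silently skips intermediate events. A minimal, self-contained sketch of that behaviour (illustrative only, not part of this patch):

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (events_sender, mut events_receiver) = watch::channel("started");

    // Three state transitions happen before the listener looks at the channel.
    events_sender.send("connected").unwrap();
    events_sender.send("streaming").unwrap();
    events_sender.send("finished").unwrap();

    // `watch` keeps only the latest value: the listener wakes up once and sees
    // "finished"; the intermediate "connected"/"streaming" events are gone.
    events_receiver.changed().await.unwrap();
    assert_eq!(*events_receiver.borrow(), "finished");
}
```

Intermediate `NewEvent`s can therefore be coalesced; consumers of `TaskHandle` should only rely on the latest state read back with `borrow()`.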
/// This is done here, since only walreceiver knows when a certain tenant has no streaming enabled. @@ -171,10 +234,7 @@ async fn wal_receiver_main_thread_loop_step<'a>( broker_prefix: &'a str, etcd_client: &'a Client, timeline_updates_receiver: &'a mut mpsc::UnboundedReceiver, - local_timeline_wal_receivers: &'a mut HashMap< - ZTenantId, - HashMap, - >, + local_timeline_wal_receivers: &'a mut HashMap>>, ) { // Only react on updates from [`tenant_mgr`] on local timeline attach/detach. match timeline_updates_receiver.recv().await { @@ -185,13 +245,8 @@ async fn wal_receiver_main_thread_loop_step<'a>( LocalTimelineUpdate::Detach(id) => { match local_timeline_wal_receivers.get_mut(&id.tenant_id) { Some(wal_receivers) => { - if let hash_map::Entry::Occupied(mut o) = wal_receivers.entry(id.timeline_id) { - if let Err(e) = o.get_mut().shutdown(id).await { - error!("Failed to shut down timeline {id} wal receiver handle: {e:#}"); - return; - } else { - o.remove(); - } + if let hash_map::Entry::Occupied(o) = wal_receivers.entry(id.timeline_id) { + o.remove().shutdown().await } if wal_receivers.is_empty() { if let Err(e) = change_tenant_state(id.tenant_id, TenantState::Idle).await { @@ -207,11 +262,11 @@ async fn wal_receiver_main_thread_loop_step<'a>( } // Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly. LocalTimelineUpdate::Attach(new_id, new_timeline) => { - let timelines = local_timeline_wal_receivers + let timeline_connection_managers = local_timeline_wal_receivers .entry(new_id.tenant_id) .or_default(); - if timelines.is_empty() { + if timeline_connection_managers.is_empty() { if let Err(e) = change_tenant_state(new_id.tenant_id, TenantState::Active).await { @@ -220,13 +275,14 @@ async fn wal_receiver_main_thread_loop_step<'a>( } } - let vacant_timeline_entry = match timelines.entry(new_id.timeline_id) { - hash_map::Entry::Occupied(_) => { - debug!("Attepted to readd an existing timeline {new_id}, ignoring"); - return; - } - hash_map::Entry::Vacant(v) => v, - }; + let vacant_connection_manager_entry = + match timeline_connection_managers.entry(new_id.timeline_id) { + hash_map::Entry::Occupied(_) => { + debug!("Attepted to readd an existing timeline {new_id}, ignoring"); + return; + } + hash_map::Entry::Vacant(v) => v, + }; let (wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag) = match fetch_tenant_settings(new_id.tenant_id).await { @@ -248,48 +304,17 @@ async fn wal_receiver_main_thread_loop_step<'a>( ); } - let (cancellation_sender, mut cancellation_receiver) = watch::channel(()); - let mut wal_connection_manager = WalConnectionManager { - id: new_id, - timeline: Arc::clone(&new_timeline), - wal_connect_timeout, - lagging_wal_timeout, - max_lsn_wal_lag, - wal_connection_data: None, - wal_connection_attempt: 0, - }; - - let broker_prefix = broker_prefix.to_string(); - let mut loop_client = etcd_client.clone(); - let broker_join_handle = tokio::spawn(async move { - info!("WAL receiver broker started, connecting to etcd"); - let mut cancellation = cancellation_receiver.clone(); - loop { - select! 
{ - _ = cancellation.changed() => { - info!("Wal broker loop cancelled, shutting down"); - break; - }, - step_result = timeline_wal_broker_loop_step( - &broker_prefix, - &mut loop_client, - &mut wal_connection_manager, - &mut cancellation_receiver, - ) => match step_result { - Ok(ControlFlow::Break(())) => { - break; - } - Ok(ControlFlow::Continue(())) => {} - Err(e) => warn!("Error during wal receiver main thread step for timeline {new_id}: {e:#}"), - } - } - } - }.instrument(info_span!("timeline", id = %new_id))); - - vacant_timeline_entry.insert(TimelineWalBrokerLoopHandle { - broker_join_handle, - cancellation_sender, - }); + vacant_connection_manager_entry.insert( + connection_manager::spawn_connection_manager_task( + new_id, + broker_prefix.to_owned(), + etcd_client.clone(), + new_timeline, + wal_connect_timeout, + lagging_wal_timeout, + max_lsn_wal_lag, + ), + ); } } } @@ -324,859 +349,3 @@ async fn change_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> an .await .with_context(|| format!("Failed to spawn activation task for tenant {tenant_id}"))? } - -async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) { - if n == 0 { - return; - } - let seconds_to_wait = base.powf(f64::from(n) - 1.0).min(max_seconds); - info!("Backoff: waiting {seconds_to_wait} seconds before proceeding with the task"); - tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; -} - -async fn shutdown_all_wal_connections( - local_timeline_wal_receivers: &mut HashMap< - ZTenantId, - HashMap, - >, -) { - info!("Shutting down all WAL connections"); - let mut broker_join_handles = Vec::new(); - for (tenant_id, timelines) in local_timeline_wal_receivers.drain() { - for (timeline_id, handles) in timelines { - handles.cancellation_sender.send(()).ok(); - broker_join_handles.push(( - ZTenantTimelineId::new(tenant_id, timeline_id), - handles.broker_join_handle, - )); - } - } - - let mut tenants = HashSet::with_capacity(broker_join_handles.len()); - for (id, broker_join_handle) in broker_join_handles { - tenants.insert(id.tenant_id); - debug!("Waiting for wal broker for timeline {id} to finish"); - if let Err(e) = broker_join_handle.await { - error!("Failed to join on wal broker for timeline {id}: {e}"); - } - } - if let Err(e) = tokio::task::spawn_blocking(move || { - for tenant_id in tenants { - if let Err(e) = tenant_mgr::set_tenant_state(tenant_id, TenantState::Idle) { - error!("Failed to make tenant {tenant_id} idle: {e:?}"); - } - } - }) - .await - { - error!("Failed to spawn a task to make all tenants idle: {e:?}"); - } -} - -/// Broker WAL loop handle to cancel the loop safely when needed. -struct TimelineWalBrokerLoopHandle { - broker_join_handle: JoinHandle<()>, - cancellation_sender: watch::Sender<()>, -} - -impl TimelineWalBrokerLoopHandle { - /// Stops the broker loop, waiting for its current task to finish. - async fn shutdown(&mut self, id: ZTenantTimelineId) -> anyhow::Result<()> { - self.cancellation_sender.send(()).context( - "Unexpected: cancellation sender is dropped before the receiver in the loop is", - )?; - debug!("Waiting for wal receiver for timeline {id} to finish"); - let handle = &mut self.broker_join_handle; - handle - .await - .with_context(|| format!("Failed to join the wal reveiver broker for timeline {id}")) - } -} - -/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker. -/// Based on the updates, desides whether to start, keep or stop a WAL receiver task. 
-async fn timeline_wal_broker_loop_step( - broker_prefix: &str, - etcd_client: &mut Client, - wal_connection_manager: &mut WalConnectionManager, - cancellation: &mut watch::Receiver<()>, -) -> anyhow::Result> { - let id = wal_connection_manager.id; - - // Endlessly try to subscribe for broker updates for a given timeline. - // If there are no safekeepers to maintain the lease, the timeline subscription will be inavailable in the broker and the operation will fail constantly. - // This is ok, pageservers should anyway try subscribing (with some backoff) since it's the only way they can get the timeline WAL anyway. - let mut broker_subscription: BrokerSubscription; - let mut attempt = 0; - loop { - select! { - _ = cancellation.changed() => { - info!("Subscription backoff cancelled, shutting down"); - return Ok(ControlFlow::Break(())); - }, - _ = exponential_backoff(attempt, 2.0, 60.0) => {}, - } - attempt += 1; - - select! { - _ = cancellation.changed() => { - info!("Broker subscription loop cancelled, shutting down"); - return Ok(ControlFlow::Break(())); - }, - new_subscription = etcd_broker::subscribe_for_json_values( - etcd_client, - SubscriptionKey::sk_timeline_info(broker_prefix.to_owned(), id), - ) - .instrument(info_span!("etcd_subscription")) => match new_subscription { - Ok(new_subscription) => { - broker_subscription = new_subscription; - break; - } - Err(e) => { - warn!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in etcd: {e:#}"); - continue; - } - }, - - } - } - - info!("Subscribed for etcd timeline changes, considering walreceiver connections"); - - loop { - select! { - // the order of the polls is especially important here, since the first task to complete gets selected and the others get dropped (cancelled). - // place more frequetly updated tasks below to ensure the "slow" tasks are also reacted to. - biased; - // first, the cancellations are checked, to ensure we exit eagerly - _ = cancellation.changed() => { - info!("Broker loop cancelled, shutting down"); - break; - } - // then, we check for new events from the WAL connection: the existing connection should either return some progress data, - // or block, allowing other tasks in this `select!` to run first. - // - // We set a "timebomb" in the polling method, that waits long enough and cancels the entire loop if nothing happens during the wait. - // The wait is only initiated when no data (or a "channel closed" data) is received from the loop, ending with the break flow return. - // While waiting, more broker events are expected to be retrieved from etcd (currently, every safekeeper posts ~1 message/second). - // The timebomb ensures that we don't get stuck for too long on any of the WAL/etcd event polling, rather restarting the subscription entirely. - // - // We cannot return here eagerly on no WAL task data, since the result will get selected to early, not allowing etcd tasks to be polled properly. - // We cannot move etcd tasks above this select, since they are very frequent to finish and WAL events might get ignored. - // We need WAL events to periodically update the external data, so we cannot simply await the task result on the handler here. 
- wal_receiver_poll_result = wal_connection_manager.poll_connection_event_or_cancel() => match wal_receiver_poll_result { - ControlFlow::Break(()) => break, - ControlFlow::Continue(()) => {}, - }, - // finally, if no other tasks are completed, get another broker update and possibly reconnect - updates = broker_subscription.fetch_data() => match updates { - Some(mut all_timeline_updates) => { - match all_timeline_updates.remove(&id) { - Some(subscribed_timeline_updates) => { - match wal_connection_manager.select_connection_candidate(subscribed_timeline_updates) { - Some(candidate) => { - info!("Switching to different safekeeper {} for timeline {id}, reason: {:?}", candidate.safekeeper_id, candidate.reason); - wal_connection_manager.change_connection(candidate.safekeeper_id, candidate.wal_producer_connstr).await; - }, - None => debug!("No connection candidate was selected for timeline"), - } - } - // XXX: If we subscribe for a certain timeline, we expect only its data to come. - // But somebody could propagate a new etcd key, that has the same prefix as the subscribed one, then we'll get odd data. - // This is an error, we don't want to have overlapping prefixes for timelines, but we can complain and thow those away instead of panicking, - // since the next poll might bring the correct data. - None => error!("Timeline has an active broker subscription, but got no updates. Other data length: {}", all_timeline_updates.len()), - } - }, - None => { - info!("Subscription source end was dropped, no more updates are possible, shutting down"); - break; - }, - }, - } - } - - info!("Waiting for the current connection to close"); - wal_connection_manager.close_connection().await; - broker_subscription - .cancel() - .await - .with_context(|| format!("Failed to cancel timeline {id} subscription in etcd"))?; - Ok(ControlFlow::Continue(())) -} - -/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible. -struct WalConnectionManager { - id: ZTenantTimelineId, - timeline: Arc, - wal_connect_timeout: Duration, - lagging_wal_timeout: Duration, - max_lsn_wal_lag: NonZeroU64, - wal_connection_attempt: u32, - wal_connection_data: Option, -} - -#[derive(Debug)] -struct WalConnectionData { - safekeeper_id: NodeId, - connection: WalReceiverConnection, - connection_init_time: NaiveDateTime, - last_wal_receiver_data: Option<(ReplicationFeedback, NaiveDateTime)>, -} - -#[derive(Debug, PartialEq, Eq)] -struct NewWalConnectionCandidate { - safekeeper_id: NodeId, - wal_producer_connstr: String, - reason: ReconnectReason, -} - -/// Stores the reason why WAL connection was switched, for furter debugging purposes. -#[derive(Debug, PartialEq, Eq)] -enum ReconnectReason { - NoExistingConnection, - LaggingWal { - current_lsn: Lsn, - new_lsn: Lsn, - threshold: NonZeroU64, - }, - NoWalTimeout { - last_wal_interaction: NaiveDateTime, - check_time: NaiveDateTime, - threshold: Duration, - }, -} - -impl WalConnectionManager { - /// Tries to get more data from the WAL connection. - /// If the WAL connection channel is dropped or no data is retrieved, a "timebomb" future is started to break the existing broker subscription. - /// This future is intended to be used in the `select!` loop, so lengthy future normally gets dropped due to other futures completing. - /// If not, it's better to cancel the entire "stuck" loop and start over. 
- async fn poll_connection_event_or_cancel(&mut self) -> ControlFlow<(), ()> { - let (connection_data, wal_receiver_event) = match self.wal_connection_data.as_mut() { - Some(connection_data) => match connection_data.connection.next_event().await { - Some(event) => (connection_data, event), - None => { - warn!("WAL receiver event source stopped sending messages, waiting for other events to arrive"); - tokio::time::sleep(Duration::from_secs(30)).await; - warn!("WAL receiver without a connection spent sleeping 30s without being interrupted, aborting the loop"); - return ControlFlow::Break(()); - } - }, - None => { - tokio::time::sleep(Duration::from_secs(30)).await; - warn!("WAL receiver without a connection spent sleeping 30s without being interrupted, aborting the loop"); - return ControlFlow::Break(()); - } - }; - - match wal_receiver_event { - WalConnectionEvent::Started => { - self.wal_connection_attempt = 0; - } - WalConnectionEvent::NewWal(new_wal_data) => { - self.wal_connection_attempt = 0; - connection_data.last_wal_receiver_data = - Some((new_wal_data, Utc::now().naive_utc())); - } - WalConnectionEvent::End(wal_receiver_result) => { - match wal_receiver_result { - Ok(()) => { - info!("WAL receiver task finished, reconnecting"); - self.wal_connection_attempt = 0; - } - Err(e) => { - error!("WAL receiver task failed: {e:#}, reconnecting"); - self.wal_connection_attempt += 1; - } - } - self.close_connection().await; - } - } - - ControlFlow::Continue(()) - } - - /// Shuts down current connection (if any), waiting for it to finish. - async fn close_connection(&mut self) { - if let Some(data) = self.wal_connection_data.as_mut() { - match data.connection.shutdown().await { - Err(e) => { - error!("Failed to shutdown wal receiver connection: {e:#}"); - } - Ok(()) => self.wal_connection_data = None, - } - } - } - - /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. - async fn change_connection( - &mut self, - new_safekeeper_id: NodeId, - new_wal_producer_connstr: String, - ) { - self.close_connection().await; - self.wal_connection_data = Some(WalConnectionData { - safekeeper_id: new_safekeeper_id, - connection: WalReceiverConnection::open( - self.id, - new_safekeeper_id, - new_wal_producer_connstr, - self.wal_connect_timeout, - ), - connection_init_time: Utc::now().naive_utc(), - last_wal_receiver_data: None, - }); - } - - /// Checks current state against every fetched safekeeper state of a given timeline. - /// Returns a new candidate, if the current state is somewhat lagging, or `None` otherwise. - /// The current rules for approving new candidates: - /// * pick from the input data from etcd for currently connected safekeeper (if any) - /// * out of the rest input entries, pick one with biggest `commit_lsn` that's after than pageserver's latest Lsn for the timeline - /// * if there's no such entry, no new candidate found, abort - /// * otherwise, check if etcd updates contain currently connected safekeeper - /// * if not, that means no WAL updates happened after certain time (either none since the connection time or none since the last event after the connection) - /// Reconnect if the time exceeds the threshold. - /// * if there's one, compare its Lsn with the other candidate's, reconnect if candidate's over threshold - /// - /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently. - /// Both thresholds are configured per tenant. 
- fn select_connection_candidate( - &self, - mut safekeeper_timelines: HashMap, - ) -> Option { - let current_sk_data_updated = - self.wal_connection_data - .as_ref() - .and_then(|connection_data| { - safekeeper_timelines.remove(&connection_data.safekeeper_id) - }); - - let candidate_sk_data = safekeeper_timelines - .iter() - .filter(|(_, info)| { - info.commit_lsn > Some(self.timeline.tline.get_last_record_lsn()) - }) - .filter_map(|(sk_id, info)| { - match wal_stream_connection_string( - self.id, - info.safekeeper_connstr.as_deref()?, - ) { - Ok(connstr) => Some((sk_id, info, connstr)), - Err(e) => { - error!("Failed to create wal receiver connection string from broker data of safekeeper node {sk_id}: {e:#}"); - None - } - } - }) - .max_by_key(|(_, info, _)| info.commit_lsn); - - match (current_sk_data_updated, candidate_sk_data) { - // No better candidate than one we're already connected to: - // whatever data update comes for the connected one, we don't have a better candidate - (_, None) => None, - - // No updates from the old SK in this batch, but some candidate is available: - // check how long time ago did we receive updates from the current SK, switch connections in case it's over the threshold - (None, Some((&new_sk_id, _, new_wal_producer_connstr))) => { - match self.wal_connection_data.as_ref() { - Some(current_connection) => { - let last_sk_interaction_time = - match current_connection.last_wal_receiver_data.as_ref() { - Some((_, data_submission_time)) => *data_submission_time, - None => current_connection.connection_init_time, - }; - - let now = Utc::now().naive_utc(); - match (now - last_sk_interaction_time).to_std() { - Ok(last_interaction) => { - if last_interaction > self.lagging_wal_timeout { - return Some(NewWalConnectionCandidate { - safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, - reason: ReconnectReason::NoWalTimeout { - last_wal_interaction: last_sk_interaction_time, - check_time: now, - threshold: self.lagging_wal_timeout, - }, - }); - } - } - Err(_e) => { - warn!("Last interaction with safekeeper {} happened in the future, ignoring the candidate. 
Interaction time: {last_sk_interaction_time}, now: {now}", current_connection.safekeeper_id); - } - } - None - } - None => Some(NewWalConnectionCandidate { - safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, - reason: ReconnectReason::NoExistingConnection, - }), - } - } - // Both current SK got updated via etcd and there's another candidate with suitable Lsn: - // check how bigger the new SK Lsn is in the future compared to the current SK, switch connections in case it's over the threshold - ( - Some(current_sk_timeline), - Some((&new_sk_id, new_sk_timeline, new_wal_producer_connstr)), - ) => { - let new_lsn = new_sk_timeline.commit_lsn.unwrap_or(Lsn(0)); - let current_lsn = current_sk_timeline.commit_lsn.unwrap_or(Lsn(0)); - match new_lsn.0.checked_sub(current_lsn.0) - { - Some(new_sk_lsn_advantage) => { - if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() { - return Some( - NewWalConnectionCandidate { - safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, - reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag }, - }); - } - } - None => debug!("Best SK candidate has its commit Lsn behind the current timeline's latest consistent Lsn"), - } - - None - } - } - } -} - -fn wal_stream_connection_string( - ZTenantTimelineId { - tenant_id, - timeline_id, - }: ZTenantTimelineId, - listen_pg_addr_str: &str, -) -> anyhow::Result { - let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db"); - let me_conf = sk_connstr - .parse::() - .with_context(|| { - format!("Failed to parse pageserver connection string '{sk_connstr}' as a postgres one") - })?; - let (host, port) = utils::connstring::connection_host_port(&me_conf); - Ok(format!( - "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'" - )) -} - -#[cfg(test)] -mod tests { - use std::time::SystemTime; - - use crate::repository::{ - repo_harness::{RepoHarness, TIMELINE_ID}, - Repository, - }; - - use super::*; - - #[test] - fn no_connection_no_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("no_connection_no_candidate")?; - let mut data_manager_with_no_connection = dummy_wal_connection_manager(&harness); - data_manager_with_no_connection.wal_connection_data = None; - - let no_candidate = - data_manager_with_no_connection.select_connection_candidate(HashMap::from([ - ( - NodeId(0), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(1)), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: None, - }, - ), - ( - NodeId(2), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: None, - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("no commit_lsn".to_string()), - }, - ), - ( - NodeId(3), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: None, - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("no commit_lsn".to_string()), - }, - ), - ])); - - assert!( - no_candidate.is_none(), - "Expected no candidate selected out of non full data options, but got {no_candidate:?}" - ); - - Ok(()) - } - - #[tokio::test] - async fn connection_no_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("connection_no_candidate")?; - - let current_lsn = 100_000; - let connected_sk_id = NodeId(0); - let mut data_manager_with_connection = 
dummy_wal_connection_manager(&harness); - let mut dummy_connection_data = dummy_connection_data( - ZTenantTimelineId { - tenant_id: harness.tenant_id, - timeline_id: TIMELINE_ID, - }, - connected_sk_id, - ) - .await; - let now = Utc::now().naive_utc(); - dummy_connection_data.last_wal_receiver_data = Some(( - ReplicationFeedback { - current_timeline_size: 1, - ps_writelsn: 1, - ps_applylsn: current_lsn, - ps_flushlsn: 1, - ps_replytime: SystemTime::now(), - }, - now, - )); - dummy_connection_data.connection_init_time = now; - data_manager_with_connection.wal_connection_data = Some(dummy_connection_data); - - let no_candidate = - data_manager_with_connection.select_connection_candidate(HashMap::from([ - ( - connected_sk_id, - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn( - current_lsn + data_manager_with_connection.max_lsn_wal_lag.get() * 2 - )), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - ), - ( - NodeId(1), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(current_lsn)), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("not advanced Lsn".to_string()), - }, - ), - ( - NodeId(2), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn( - current_lsn + data_manager_with_connection.max_lsn_wal_lag.get() / 2 - )), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("not enough advanced Lsn".to_string()), - }, - ), - ])); - - assert!( - no_candidate.is_none(), - "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}" - ); - - Ok(()) - } - - #[test] - fn no_connection_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("no_connection_candidate")?; - let mut data_manager_with_no_connection = dummy_wal_connection_manager(&harness); - data_manager_with_no_connection.wal_connection_data = None; - - let only_candidate = data_manager_with_no_connection - .select_connection_candidate(HashMap::from([( - NodeId(0), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(1 + data_manager_with_no_connection - .max_lsn_wal_lag - .get())), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - )])) - .expect("Expected one candidate selected out of the only data option, but got none"); - assert_eq!(only_candidate.safekeeper_id, NodeId(0)); - assert_eq!( - only_candidate.reason, - ReconnectReason::NoExistingConnection, - "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" - ); - assert!(only_candidate - .wal_producer_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); - - let selected_lsn = 100_000; - let biggest_wal_candidate = data_manager_with_no_connection - .select_connection_candidate(HashMap::from([ - ( - NodeId(0), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(selected_lsn - 100)), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("smaller commit_lsn".to_string()), - }, - ), - ( - NodeId(1), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(selected_lsn)), - backup_lsn: None, - 
remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - ), - ( - NodeId(2), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(selected_lsn + 100)), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: None, - }, - ), - ])) - .expect( - "Expected one candidate selected out of multiple valid data options, but got none", - ); - - assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1)); - assert_eq!( - biggest_wal_candidate.reason, - ReconnectReason::NoExistingConnection, - "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" - ); - assert!(biggest_wal_candidate - .wal_producer_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); - - Ok(()) - } - - #[tokio::test] - async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("lsn_wal_over_threshcurrent_candidate")?; - let current_lsn = Lsn(100_000).align(); - - let id = ZTenantTimelineId { - tenant_id: harness.tenant_id, - timeline_id: TIMELINE_ID, - }; - - let mut data_manager_with_connection = dummy_wal_connection_manager(&harness); - let connected_sk_id = NodeId(0); - let mut dummy_connection_data = dummy_connection_data(id, connected_sk_id).await; - let lagging_wal_timeout = - chrono::Duration::from_std(data_manager_with_connection.lagging_wal_timeout)?; - let time_over_threshold = - Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; - dummy_connection_data.last_wal_receiver_data = Some(( - ReplicationFeedback { - current_timeline_size: 1, - ps_writelsn: current_lsn.0, - ps_applylsn: 1, - ps_flushlsn: 1, - ps_replytime: SystemTime::now(), - }, - time_over_threshold, - )); - dummy_connection_data.connection_init_time = time_over_threshold; - data_manager_with_connection.wal_connection_data = Some(dummy_connection_data); - - let new_lsn = Lsn(current_lsn.0 + data_manager_with_connection.max_lsn_wal_lag.get() + 1); - let candidates = HashMap::from([ - ( - connected_sk_id, - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(current_lsn), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - ), - ( - NodeId(1), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(new_lsn), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()), - }, - ), - ]); - - let over_threshcurrent_candidate = data_manager_with_connection - .select_connection_candidate(candidates) - .expect( - "Expected one candidate selected out of multiple valid data options, but got none", - ); - - assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1)); - assert_eq!( - over_threshcurrent_candidate.reason, - ReconnectReason::LaggingWal { - current_lsn, - new_lsn, - threshold: data_manager_with_connection.max_lsn_wal_lag - }, - "Should select bigger WAL safekeeper if it starts to lag enough" - ); - assert!(over_threshcurrent_candidate - .wal_producer_connstr - .contains("advanced by Lsn safekeeper")); - - Ok(()) - } - - #[tokio::test] - async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?; - let current_lsn = Lsn(100_000).align(); - - let id = 
ZTenantTimelineId { - tenant_id: harness.tenant_id, - timeline_id: TIMELINE_ID, - }; - - let mut data_manager_with_connection = dummy_wal_connection_manager(&harness); - let mut dummy_connection_data = dummy_connection_data(id, NodeId(1)).await; - let lagging_wal_timeout = - chrono::Duration::from_std(data_manager_with_connection.lagging_wal_timeout)?; - let time_over_threshold = - Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; - dummy_connection_data.last_wal_receiver_data = None; - dummy_connection_data.connection_init_time = time_over_threshold; - data_manager_with_connection.wal_connection_data = Some(dummy_connection_data); - - let over_threshcurrent_candidate = data_manager_with_connection - .select_connection_candidate(HashMap::from([( - NodeId(0), - SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(current_lsn), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - )])) - .expect( - "Expected one candidate selected out of multiple valid data options, but got none", - ); - - assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0)); - match over_threshcurrent_candidate.reason { - ReconnectReason::NoWalTimeout { - last_wal_interaction, - threshold, - .. - } => { - assert_eq!(last_wal_interaction, time_over_threshold); - assert_eq!(threshold, data_manager_with_connection.lagging_wal_timeout); - } - unexpected => panic!("Unexpected reason: {unexpected:?}"), - } - assert!(over_threshcurrent_candidate - .wal_producer_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); - - Ok(()) - } - - fn dummy_wal_connection_manager(harness: &RepoHarness) -> WalConnectionManager { - WalConnectionManager { - id: ZTenantTimelineId { - tenant_id: harness.tenant_id, - timeline_id: TIMELINE_ID, - }, - timeline: Arc::new(DatadirTimelineImpl::new( - harness - .load() - .create_empty_timeline(TIMELINE_ID, Lsn(0)) - .expect("Failed to create an empty timeline for dummy wal connection manager"), - 10_000, - )), - wal_connect_timeout: Duration::from_secs(1), - lagging_wal_timeout: Duration::from_secs(10), - max_lsn_wal_lag: NonZeroU64::new(300_000).unwrap(), - wal_connection_attempt: 0, - wal_connection_data: None, - } - } - - const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr"; - - // the function itself does not need async, but it spawns a tokio::task underneath hence neeed - // a runtime to not to panic - async fn dummy_connection_data( - id: ZTenantTimelineId, - safekeeper_id: NodeId, - ) -> WalConnectionData { - let dummy_connstr = wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR) - .expect("Failed to construct dummy wal producer connstr"); - WalConnectionData { - safekeeper_id, - connection: WalReceiverConnection::open( - id, - safekeeper_id, - dummy_connstr, - Duration::from_secs(1), - ), - connection_init_time: Utc::now().naive_utc(), - last_wal_receiver_data: None, - } - } -} diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs new file mode 100644 index 0000000000..d5ca1d5159 --- /dev/null +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -0,0 +1,1133 @@ +//! WAL receiver logic that ensures the pageserver gets connectected to safekeeper, +//! that contains the latest WAL to stream and this connection does not go stale. +//! +//! To achieve that, a etcd broker is used: safekepers propagate their timelines' state in it, +//! 
the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection. +//! Current connection state is tracked too, to ensure it's not getting stale. +//! +//! After every connection or etcd update fetched, the state gets updated correspondingly and rechecked for the new conneciton leader, +//! then a [re]connection happens, if necessary. +//! Only WAL streaming task expects to be finished, other loops (etcd, connection management) never exit unless cancelled explicitly via the dedicated channel. + +use std::{ + collections::{hash_map, HashMap}, + num::NonZeroU64, + sync::Arc, + time::Duration, +}; + +use anyhow::Context; +use chrono::{DateTime, Local, NaiveDateTime, Utc}; +use etcd_broker::{ + subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, + BrokerUpdate, Client, +}; +use tokio::select; +use tracing::*; + +use crate::DatadirTimelineImpl; +use utils::{ + lsn::Lsn, + pq_proto::ReplicationFeedback, + zid::{NodeId, ZTenantTimelineId}, +}; + +use super::{TaskEvent, TaskHandle}; + +/// Spawns the loop to take care of the timeline's WAL streaming connection. +pub(super) fn spawn_connection_manager_task( + id: ZTenantTimelineId, + broker_loop_prefix: String, + mut client: Client, + local_timeline: Arc, + wal_connect_timeout: Duration, + lagging_wal_timeout: Duration, + max_lsn_wal_lag: NonZeroU64, +) -> TaskHandle<()> { + TaskHandle::spawn(move |_, mut cancellation| { + async move { + info!("WAL receiver broker started, connecting to etcd"); + let mut walreceiver_state = WalreceiverState::new( + id, + local_timeline, + wal_connect_timeout, + lagging_wal_timeout, + max_lsn_wal_lag, + ); + loop { + select! { + _ = cancellation.changed() => { + info!("Broker subscription init cancelled, shutting down"); + if let Some(wal_connection) = walreceiver_state.wal_connection.take() + { + wal_connection.connection_task.shutdown().await; + } + return Ok(()); + }, + + _ = connection_manager_loop_step( + &broker_loop_prefix, + &mut client, + &mut walreceiver_state, + ) => {}, + } + } + } + .instrument(info_span!("wal_connection_manager", id = %id)) + }) +} + +/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker. +/// Based on the updates, desides whether to start, keep or stop a WAL receiver task. +/// If etcd subscription is cancelled, exits. +async fn connection_manager_loop_step( + broker_prefix: &str, + etcd_client: &mut Client, + walreceiver_state: &mut WalreceiverState, +) { + let id = walreceiver_state.id; + + // XXX: We never explicitly cancel etcd task, instead establishing one and never letting it go, + // running the entire loop step as much as possible to an end. + // The task removal happens implicitly on drop, both aborting the etcd subscription task and dropping the receiver channel end, + // forcing the etcd subscription to exit either way. + let mut broker_subscription = + subscribe_for_timeline_updates(etcd_client, broker_prefix, id).await; + info!("Subscribed for etcd timeline changes, waiting for new etcd data"); + + loop { + select! 
{ + broker_connection_result = &mut broker_subscription.watcher_handle => { + cleanup_broker_connection(broker_connection_result, walreceiver_state); + return; + }, + + Some(wal_connection_update) = async { + match walreceiver_state.wal_connection.as_mut() { + Some(wal_connection) => { + let receiver = &mut wal_connection.connection_task.events_receiver; + Some(match receiver.changed().await { + Ok(()) => receiver.borrow().clone(), + Err(_cancellation_error) => TaskEvent::End(Ok(())), + }) + } + None => None, + } + } => { + let (connection_update, reset_connection_attempts) = match &wal_connection_update { + TaskEvent::Started => (Some(Utc::now().naive_utc()), true), + TaskEvent::NewEvent(replication_feedback) => (Some(DateTime::::from(replication_feedback.ps_replytime).naive_utc()), true), + TaskEvent::End(end_result) => { + let should_reset_connection_attempts = match end_result { + Ok(()) => { + debug!("WAL receiving task finished"); + true + }, + Err(e) => { + warn!("WAL receiving task failed: {e}"); + false + }, + }; + walreceiver_state.wal_connection = None; + (None, should_reset_connection_attempts) + }, + }; + + if let Some(connection_update) = connection_update { + match &mut walreceiver_state.wal_connection { + Some(wal_connection) => { + wal_connection.latest_connection_update = connection_update; + + let attempts_entry = walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0); + if reset_connection_attempts { + *attempts_entry = 0; + } else { + *attempts_entry += 1; + } + }, + None => error!("Received connection update for WAL connection that is not active, update: {wal_connection_update:?}"), + } + } + }, + + broker_update = broker_subscription.value_updates.recv() => { + match broker_update { + Some(broker_update) => walreceiver_state.register_timeline_update(broker_update), + None => { + info!("Broker sender end was dropped, ending current broker loop step"); + // Ensure to cancel and wait for the broker subscription task end, to log its result. + // Broker sender end is in the broker subscription task and its drop means abnormal task completion. + // First, ensure that the task is stopped (abort can be done without errors on already stopped tasks and repeated multiple times). + broker_subscription.watcher_handle.abort(); + // Then, wait for the task to finish and print its result. If the task was finished before abort (which we assume in this abnormal case), + // a proper error message will be printed, otherwise an abortion message is printed which is ok, since we're signalled to finish anyway. + cleanup_broker_connection( + (&mut broker_subscription.watcher_handle).await, + walreceiver_state, + ); + return; + } + } + }, + } + + // Fetch more etcd timeline updates, but limit ourselves since they may arrive quickly. 
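The `Some(wal_connection_update) = async { .. }` arm above relies on `tokio::select!` pattern matching: when the async block resolves to `None` (no active WAL connection), that branch is disabled for the current iteration and the other arms keep the loop responsive. A standalone sketch of the pattern, with illustrative names only:

```rust
use tokio::sync::watch;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0_u32);
    // With `None` here, the first `select!` arm below would never fire.
    let mut maybe_source: Option<watch::Receiver<u32>> = Some(rx);

    tokio::spawn(async move {
        sleep(Duration::from_millis(10)).await;
        let _ = tx.send(42);
    });

    tokio::select! {
        // The arm is taken only when the async block yields `Some(value)`;
        // if it yields `None`, the branch is disabled and `select!` waits
        // on the remaining arms instead.
        Some(value) = async {
            match maybe_source.as_mut() {
                Some(source) => match source.changed().await {
                    Ok(()) => Some(*source.borrow()),
                    Err(_sender_dropped) => None,
                },
                None => None,
            }
        } => println!("got update: {value}"),
        _ = sleep(Duration::from_secs(1)) => println!("no update within a second"),
    }
}
```

In the manager loop this means no busy-waiting happens while `walreceiver_state.wal_connection` is `None`: the arm simply stays dormant until one of the other arms completes and the loop re-evaluates it.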
+ let mut max_events_to_poll = 100_u32; + while max_events_to_poll > 0 { + if let Ok(broker_update) = broker_subscription.value_updates.try_recv() { + walreceiver_state.register_timeline_update(broker_update); + max_events_to_poll -= 1; + } else { + break; + } + } + + if let Some(new_candidate) = walreceiver_state.next_connection_candidate() { + info!("Switching to new connection candidate: {new_candidate:?}"); + walreceiver_state + .change_connection( + new_candidate.safekeeper_id, + new_candidate.wal_producer_connstr, + ) + .await + } + } +} + +fn cleanup_broker_connection( + broker_connection_result: Result, tokio::task::JoinError>, + walreceiver_state: &mut WalreceiverState, +) { + match broker_connection_result { + Ok(Ok(())) => info!("Broker conneciton task finished, ending current broker loop step"), + Ok(Err(broker_error)) => warn!("Broker conneciton ended with error: {broker_error}"), + Err(abort_error) => { + if abort_error.is_panic() { + error!("Broker connection panicked: {abort_error}") + } else { + debug!("Broker connection aborted: {abort_error}") + } + } + } + + walreceiver_state.wal_stream_candidates.clear(); +} + +/// Endlessly try to subscribe for broker updates for a given timeline. +/// If there are no safekeepers to maintain the lease, the timeline subscription will be unavailable in the broker and the operation will fail constantly. +/// This is ok, pageservers should anyway try subscribing (with some backoff) since it's the only way they can get the timeline WAL anyway. +async fn subscribe_for_timeline_updates( + etcd_client: &mut Client, + broker_prefix: &str, + id: ZTenantTimelineId, +) -> BrokerSubscription { + let mut attempt = 0; + loop { + exponential_backoff( + attempt, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + ) + .await; + attempt += 1; + + match etcd_broker::subscribe_for_json_values( + etcd_client, + SubscriptionKey::sk_timeline_info(broker_prefix.to_owned(), id), + ) + .instrument(info_span!("etcd_subscription")) + .await + { + Ok(new_subscription) => { + return new_subscription; + } + Err(e) => { + warn!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in etcd: {e:#}"); + continue; + } + } + } +} + +const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 2.0; +const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 60.0; + +async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) { + if n == 0 { + return; + } + let seconds_to_wait = base.powf(f64::from(n) - 1.0).min(max_seconds); + info!("Backoff: waiting {seconds_to_wait} seconds before proceeding with the task"); + tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; +} + +/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible. +struct WalreceiverState { + id: ZTenantTimelineId, + /// Use pageserver data about the timeline to filter out some of the safekeepers. + local_timeline: Arc, + /// The timeout on the connection to safekeeper for WAL streaming. + wal_connect_timeout: Duration, + /// The timeout to use to determine when the current connection is "stale" and reconnect to the other one. + lagging_wal_timeout: Duration, + /// The Lsn lag to use to determine when the current connection is lagging to much behind and reconnect to the other one. + max_lsn_wal_lag: NonZeroU64, + /// Current connection to safekeeper for WAL streaming. 
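For intuition about the retry cadence: with the defaults above (`DEFAULT_BASE_BACKOFF_SECONDS = 2.0`, `DEFAULT_MAX_BACKOFF_SECONDS = 60.0`), `exponential_backoff` waits roughly 1s, 2s, 4s, and so on, doubling until it hits the 60-second cap. A throwaway check that mirrors the formula (without the sleeping and logging of the real function):

```rust
fn wait_seconds(n: u32, base: f64, max_seconds: f64) -> f64 {
    // Same arithmetic as `exponential_backoff`, minus the actual sleeping.
    if n == 0 {
        0.0
    } else {
        base.powf(f64::from(n) - 1.0).min(max_seconds)
    }
}

fn main() {
    let schedule: Vec<f64> = (0..9).map(|attempt| wait_seconds(attempt, 2.0, 60.0)).collect();
    // Attempt 0 retries immediately, then the delay doubles until the cap is reached.
    assert_eq!(schedule, [0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]);
}
```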
+ wal_connection: Option, + wal_connection_attempts: HashMap, + /// Data about all timelines, available for connection, fetched from etcd, grouped by their corresponding safekeeper node id. + wal_stream_candidates: HashMap, +} + +/// Current connection data. +#[derive(Debug)] +struct WalConnection { + /// Current safekeeper pageserver is connected to for WAL streaming. + sk_id: NodeId, + /// Connection task start time or the timestamp of a latest connection message received. + latest_connection_update: NaiveDateTime, + /// WAL streaming task handle. + connection_task: TaskHandle, +} + +/// Data about the timeline to connect to, received from etcd. +#[derive(Debug)] +struct EtcdSkTimeline { + timeline: SkTimelineInfo, + /// Etcd generation, the bigger it is, the more up to date the timeline data is. + etcd_version: i64, + /// Time at which the data was fetched from etcd last time, to track the stale data. + latest_update: NaiveDateTime, +} + +impl WalreceiverState { + fn new( + id: ZTenantTimelineId, + local_timeline: Arc, + wal_connect_timeout: Duration, + lagging_wal_timeout: Duration, + max_lsn_wal_lag: NonZeroU64, + ) -> Self { + Self { + id, + local_timeline, + wal_connect_timeout, + lagging_wal_timeout, + max_lsn_wal_lag, + wal_connection: None, + wal_stream_candidates: HashMap::new(), + wal_connection_attempts: HashMap::new(), + } + } + + /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. + async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_producer_connstr: String) { + if let Some(old_connection) = self.wal_connection.take() { + old_connection.connection_task.shutdown().await + } + + let id = self.id; + let connect_timeout = self.wal_connect_timeout; + let connection_attempt = self + .wal_connection_attempts + .get(&new_sk_id) + .copied() + .unwrap_or(0); + let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| { + async move { + exponential_backoff( + connection_attempt, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + ) + .await; + super::walreceiver_connection::handle_walreceiver_connection( + id, + &new_wal_producer_connstr, + events_sender.as_ref(), + cancellation, + connect_timeout, + ) + .await + .map_err(|e| format!("walreceiver connection handling failure: {e:#}")) + } + .instrument(info_span!("walreceiver_connection", id = %id)) + }); + + self.wal_connection = Some(WalConnection { + sk_id: new_sk_id, + latest_connection_update: Utc::now().naive_utc(), + connection_task: connection_handle, + }); + } + + /// Adds another etcd timeline into the state, if its more recent than the one already added there for the same key. + fn register_timeline_update(&mut self, timeline_update: BrokerUpdate) { + match self + .wal_stream_candidates + .entry(timeline_update.key.node_id) + { + hash_map::Entry::Occupied(mut o) => { + let existing_value = o.get_mut(); + if existing_value.etcd_version < timeline_update.etcd_version { + existing_value.etcd_version = timeline_update.etcd_version; + existing_value.timeline = timeline_update.value; + existing_value.latest_update = Utc::now().naive_utc(); + } + } + hash_map::Entry::Vacant(v) => { + v.insert(EtcdSkTimeline { + timeline: timeline_update.value, + etcd_version: timeline_update.etcd_version, + latest_update: Utc::now().naive_utc(), + }); + } + } + } + + /// Cleans up stale etcd records and checks the rest for the new connection candidate. 
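`register_timeline_update` above applies a single rule: a broker update replaces the stored entry only when its etcd version is strictly newer, which protects the candidate set against out-of-order watch responses. A reduced, self-contained sketch of that rule (types and names here are illustrative, not taken from the patch):

```rust
use std::collections::{hash_map::Entry, HashMap};

struct Versioned {
    etcd_version: i64,
    payload: &'static str,
}

// Hypothetical helper mirroring the "newest etcd version wins" upsert.
fn register(candidates: &mut HashMap<u64, Versioned>, node_id: u64, update: Versioned) {
    match candidates.entry(node_id) {
        Entry::Occupied(mut existing) => {
            // Equal or older versions are ignored, so a late-arriving watch
            // response cannot overwrite fresher data.
            if existing.get().etcd_version < update.etcd_version {
                existing.insert(update);
            }
        }
        Entry::Vacant(vacant) => {
            vacant.insert(update);
        }
    }
}

fn main() {
    let mut candidates = HashMap::new();
    register(&mut candidates, 1, Versioned { etcd_version: 3, payload: "fresh" });
    register(&mut candidates, 1, Versioned { etcd_version: 2, payload: "stale" });
    assert_eq!(candidates[&1].payload, "fresh");
}
```

The real method also refreshes `latest_update` on every accepted change, which `cleanup_old_candidates` later uses to drop safekeepers that have gone quiet for longer than `lagging_wal_timeout`.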
+    /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
+    /// The current rules for approving new candidates:
+    /// * filter out the currently connected safekeeper (if any) from the etcd input data
+    /// * out of the remaining entries, pick the one with the biggest `commit_lsn` that is ahead of the pageserver's latest Lsn for the timeline
+    /// * if there's no such entry, no new candidate is found, abort
+    /// * check the current connection time data for staleness, reconnect if stale
+    /// * otherwise, check whether the etcd updates contain the currently connected safekeeper
+    ///     * if not, no WAL updates happened after a certain time (either none since the connection was established, or none since the last event after the connection).
+    ///       Reconnect if that time exceeds the threshold.
+    ///     * if they do, compare its Lsn with the new candidate's and reconnect if the candidate is ahead by more than the threshold
+    ///
+    /// This way we make sure to keep up with the most up-to-date safekeeper, without jumping from one safekeeper to another too frequently.
+    /// Both thresholds are configured per tenant.
+    fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
+        self.cleanup_old_candidates();
+
+        match &self.wal_connection {
+            Some(existing_wal_connection) => {
+                let connected_sk_node = existing_wal_connection.sk_id;
+
+                let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) = self
+                    .applicable_connection_candidates()
+                    .filter(|&(sk_id, _, _)| sk_id != connected_sk_node)
+                    .max_by_key(|(_, info, _)| info.commit_lsn)?;
+
+                let now = Utc::now().naive_utc();
+                if let Ok(latest_interaction) =
+                    (now - existing_wal_connection.latest_connection_update).to_std()
+                {
+                    if latest_interaction > self.lagging_wal_timeout {
+                        return Some(NewWalConnectionCandidate {
+                            safekeeper_id: new_sk_id,
+                            wal_producer_connstr: new_wal_producer_connstr,
+                            reason: ReconnectReason::NoWalTimeout {
+                                last_wal_interaction: Some(
+                                    existing_wal_connection.latest_connection_update,
+                                ),
+                                check_time: now,
+                                threshold: self.lagging_wal_timeout,
+                            },
+                        });
+                    }
+                }
+
+                match self.wal_stream_candidates.get(&connected_sk_node) {
+                    Some(current_connection_etcd_data) => {
+                        let new_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0));
+                        let current_lsn = current_connection_etcd_data
+                            .timeline
+                            .commit_lsn
+                            .unwrap_or(Lsn(0));
+                        match new_lsn.0.checked_sub(current_lsn.0) {
+                            Some(new_sk_lsn_advantage) => {
+                                if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
+                                    return Some(NewWalConnectionCandidate {
+                                        safekeeper_id: new_sk_id,
+                                        wal_producer_connstr: new_wal_producer_connstr,
+                                        reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag },
+                                    });
+                                }
+                            }
+                            None => debug!("Best SK candidate has its commit Lsn behind the current timeline's latest consistent Lsn"),
+                        }
+                    }
+                    None => {
+                        return Some(NewWalConnectionCandidate {
+                            safekeeper_id: new_sk_id,
+                            wal_producer_connstr: new_wal_producer_connstr,
+                            reason: ReconnectReason::NoEtcdDataForExistingConnection,
+                        })
+                    }
+                }
+            }
+            None => {
+                let (new_sk_id, _, new_wal_producer_connstr) = self
+                    .applicable_connection_candidates()
+                    .max_by_key(|(_, info, _)| info.commit_lsn)?;
+                return Some(NewWalConnectionCandidate {
+                    safekeeper_id: new_sk_id,
+                    wal_producer_connstr: new_wal_producer_connstr,
+                    reason: ReconnectReason::NoExistingConnection,
+                });
+            }
+        }
+
+        None
+    }
+
+    fn applicable_connection_candidates(
+        &self,
+    ) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> {
+        self.wal_stream_candidates
+            .iter()
+            .filter(|(_, etcd_info)| {
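+                // Only safekeepers whose advertised `commit_lsn` is ahead of the last record Lsn
+                // the pageserver already has can provide new WAL for this timeline.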
+                etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn())
+            })
+            .filter_map(|(sk_id, etcd_info)| {
+                let info = &etcd_info.timeline;
+                match wal_stream_connection_string(
+                    self.id,
+                    info.safekeeper_connstr.as_deref()?,
+                ) {
+                    Ok(connstr) => Some((*sk_id, info, connstr)),
+                    Err(e) => {
+                        error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
+                        None
+                    }
+                }
+            })
+    }
+
+    fn cleanup_old_candidates(&mut self) {
+        self.wal_stream_candidates.retain(|_, etcd_info| {
+            if let Ok(time_since_latest_etcd_update) =
+                (Utc::now().naive_utc() - etcd_info.latest_update).to_std()
+            {
+                time_since_latest_etcd_update < self.lagging_wal_timeout
+            } else {
+                true
+            }
+        });
+    }
+}
+
+#[derive(Debug, PartialEq, Eq)]
+struct NewWalConnectionCandidate {
+    safekeeper_id: NodeId,
+    wal_producer_connstr: String,
+    reason: ReconnectReason,
+}
+
+/// Stores the reason why the WAL connection was switched, for further debugging purposes.
+#[derive(Debug, PartialEq, Eq)]
+enum ReconnectReason {
+    NoExistingConnection,
+    NoEtcdDataForExistingConnection,
+    LaggingWal {
+        current_lsn: Lsn,
+        new_lsn: Lsn,
+        threshold: NonZeroU64,
+    },
+    NoWalTimeout {
+        last_wal_interaction: Option<NaiveDateTime>,
+        check_time: NaiveDateTime,
+        threshold: Duration,
+    },
+}
+
+fn wal_stream_connection_string(
+    ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    }: ZTenantTimelineId,
+    listen_pg_addr_str: &str,
+) -> anyhow::Result<String> {
+    let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db");
+    let me_conf = sk_connstr
+        .parse::<postgres::Config>()
+        .with_context(|| {
+            format!("Failed to parse pageserver connection string '{sk_connstr}' as a postgres one")
+        })?;
+    let (host, port) = utils::connstring::connection_host_port(&me_conf);
+    Ok(format!(
+        "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
+    ))
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::SystemTime;
+
+    use crate::repository::{
+        repo_harness::{RepoHarness, TIMELINE_ID},
+        Repository,
+    };
+
+    use super::*;
+
+    #[test]
+    fn no_connection_no_candidate() -> anyhow::Result<()> {
+        let harness = RepoHarness::create("no_connection_no_candidate")?;
+        let mut state = dummy_state(&harness);
+        let now = Utc::now().naive_utc();
+
+        let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
+        let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
+
+        state.wal_connection = None;
+        state.wal_stream_candidates = HashMap::from([
+            (
+                NodeId(0),
+                EtcdSkTimeline {
+                    timeline: SkTimelineInfo {
+                        last_log_term: None,
+                        flush_lsn: None,
+                        commit_lsn: Some(Lsn(1)),
+                        backup_lsn: None,
+                        remote_consistent_lsn: None,
+                        peer_horizon_lsn: None,
+                        safekeeper_connstr: None,
+                    },
+                    etcd_version: 0,
+                    latest_update: now,
+                },
+            ),
+            (
+                NodeId(1),
+                EtcdSkTimeline {
+                    timeline: SkTimelineInfo {
+                        last_log_term: None,
+                        flush_lsn: None,
+                        commit_lsn: None,
+                        backup_lsn: None,
+                        remote_consistent_lsn: None,
+                        peer_horizon_lsn: None,
+                        safekeeper_connstr: Some("no commit_lsn".to_string()),
+                    },
+                    etcd_version: 0,
+                    latest_update: now,
+                },
+            ),
+            (
+                NodeId(2),
+                EtcdSkTimeline {
+                    timeline: SkTimelineInfo {
+                        last_log_term: None,
+                        flush_lsn: None,
+                        commit_lsn: None,
+                        backup_lsn: None,
+                        remote_consistent_lsn: None,
+                        peer_horizon_lsn: None,
+                        safekeeper_connstr: Some("no commit_lsn".to_string()),
+                    },
+                    etcd_version: 0,
+                    latest_update: now,
+                },
+            ),
+            (
+                NodeId(3),
+                EtcdSkTimeline {
+                    timeline: SkTimelineInfo {
+                        last_log_term: None,
+                        flush_lsn: None,
+
commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: delay_over_threshold, + }, + ), + ]); + + let no_candidate = state.next_connection_candidate(); + assert!( + no_candidate.is_none(), + "Expected no candidate selected out of non full data options, but got {no_candidate:?}" + ); + + Ok(()) + } + + #[tokio::test] + async fn connection_no_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("connection_no_candidate")?; + let mut state = dummy_state(&harness); + let now = Utc::now().naive_utc(); + + let connected_sk_id = NodeId(0); + let current_lsn = 100_000; + + state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap(); + state.wal_connection = Some(WalConnection { + sk_id: connected_sk_id, + latest_connection_update: now, + connection_task: TaskHandle::spawn(move |sender, _| async move { + sender + .send(TaskEvent::NewEvent(ReplicationFeedback { + current_timeline_size: 1, + ps_writelsn: 1, + ps_applylsn: current_lsn, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + })) + .ok(); + Ok(()) + }), + }); + state.wal_stream_candidates = HashMap::from([ + ( + connected_sk_id, + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(current_lsn + state.max_lsn_wal_lag.get() * 2)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(1), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(current_lsn)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("not advanced Lsn".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(2), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(current_lsn + state.max_lsn_wal_lag.get() / 2)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("not enough advanced Lsn".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ]); + + let no_candidate = state.next_connection_candidate(); + assert!( + no_candidate.is_none(), + "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}" + ); + + Ok(()) + } + + #[test] + fn no_connection_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("no_connection_candidate")?; + let mut state = dummy_state(&harness); + let now = Utc::now().naive_utc(); + + state.wal_connection = None; + state.wal_stream_candidates = HashMap::from([( + NodeId(0), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + )]); + + let only_candidate = state + .next_connection_candidate() + .expect("Expected one candidate selected out of the only data option, but got none"); + assert_eq!(only_candidate.safekeeper_id, NodeId(0)); + assert_eq!( + only_candidate.reason, + 
ReconnectReason::NoExistingConnection, + "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" + ); + assert!(only_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + + let selected_lsn = 100_000; + state.wal_stream_candidates = HashMap::from([ + ( + NodeId(0), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(selected_lsn - 100)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("smaller commit_lsn".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(1), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(selected_lsn)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(2), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(selected_lsn + 100)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: None, + }, + etcd_version: 0, + latest_update: now, + }, + ), + ]); + let biggest_wal_candidate = state.next_connection_candidate().expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1)); + assert_eq!( + biggest_wal_candidate.reason, + ReconnectReason::NoExistingConnection, + "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" + ); + assert!(biggest_wal_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + + Ok(()) + } + + #[tokio::test] + async fn connection_no_etcd_data_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("connection_no_etcd_data_candidate")?; + let mut state = dummy_state(&harness); + + let now = Utc::now().naive_utc(); + let current_lsn = Lsn(100_000).align(); + let connected_sk_id = NodeId(0); + let other_sk_id = NodeId(connected_sk_id.0 + 1); + + state.wal_connection = Some(WalConnection { + sk_id: connected_sk_id, + latest_connection_update: now, + connection_task: TaskHandle::spawn(move |sender, _| async move { + sender + .send(TaskEvent::NewEvent(ReplicationFeedback { + current_timeline_size: 1, + ps_writelsn: current_lsn.0, + ps_applylsn: 1, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + })) + .ok(); + Ok(()) + }), + }); + state.wal_stream_candidates = HashMap::from([( + other_sk_id, + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + )]); + + let only_candidate = state + .next_connection_candidate() + .expect("Expected one candidate selected out of the only data option, but got none"); + assert_eq!(only_candidate.safekeeper_id, other_sk_id); + assert_eq!( + only_candidate.reason, + ReconnectReason::NoEtcdDataForExistingConnection, + "Should select new safekeeper due to missing etcd data, even if there's an existing connection with this safekeeper" + ); + assert!(only_candidate + .wal_producer_connstr + 
.contains(DUMMY_SAFEKEEPER_CONNSTR)); + + Ok(()) + } + + #[tokio::test] + async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("lsn_wal_over_threshcurrent_candidate")?; + let mut state = dummy_state(&harness); + let current_lsn = Lsn(100_000).align(); + let now = Utc::now().naive_utc(); + + let connected_sk_id = NodeId(0); + let new_lsn = Lsn(current_lsn.0 + state.max_lsn_wal_lag.get() + 1); + + state.wal_connection = Some(WalConnection { + sk_id: connected_sk_id, + latest_connection_update: now, + connection_task: TaskHandle::spawn(move |sender, _| async move { + sender + .send(TaskEvent::NewEvent(ReplicationFeedback { + current_timeline_size: 1, + ps_writelsn: current_lsn.0, + ps_applylsn: 1, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + })) + .ok(); + Ok(()) + }), + }); + state.wal_stream_candidates = HashMap::from([ + ( + connected_sk_id, + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(current_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ( + NodeId(1), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(new_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + ), + ]); + + let over_threshcurrent_candidate = state.next_connection_candidate().expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1)); + assert_eq!( + over_threshcurrent_candidate.reason, + ReconnectReason::LaggingWal { + current_lsn, + new_lsn, + threshold: state.max_lsn_wal_lag + }, + "Should select bigger WAL safekeeper if it starts to lag enough" + ); + assert!(over_threshcurrent_candidate + .wal_producer_connstr + .contains("advanced by Lsn safekeeper")); + + Ok(()) + } + + #[tokio::test] + async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?; + let mut state = dummy_state(&harness); + let current_lsn = Lsn(100_000).align(); + let now = Utc::now().naive_utc(); + + let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?; + let time_over_threshold = + Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; + + state.wal_connection = Some(WalConnection { + sk_id: NodeId(1), + latest_connection_update: time_over_threshold, + connection_task: TaskHandle::spawn(move |sender, _| async move { + sender + .send(TaskEvent::NewEvent(ReplicationFeedback { + current_timeline_size: 1, + ps_writelsn: current_lsn.0, + ps_applylsn: 1, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + })) + .ok(); + Ok(()) + }), + }); + state.wal_stream_candidates = HashMap::from([( + NodeId(0), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(current_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + )]); + + let over_threshcurrent_candidate = state.next_connection_candidate().expect( + 
"Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0)); + match over_threshcurrent_candidate.reason { + ReconnectReason::NoWalTimeout { + last_wal_interaction, + threshold, + .. + } => { + assert_eq!(last_wal_interaction, Some(time_over_threshold)); + assert_eq!(threshold, state.lagging_wal_timeout); + } + unexpected => panic!("Unexpected reason: {unexpected:?}"), + } + assert!(over_threshcurrent_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + + Ok(()) + } + + #[tokio::test] + async fn timeout_connection_over_threshhold_current_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("timeout_connection_over_threshhold_current_candidate")?; + let mut state = dummy_state(&harness); + let current_lsn = Lsn(100_000).align(); + let now = Utc::now().naive_utc(); + + let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?; + let time_over_threshold = + Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; + + state.wal_connection = Some(WalConnection { + sk_id: NodeId(1), + latest_connection_update: time_over_threshold, + connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }), + }); + state.wal_stream_candidates = HashMap::from([( + NodeId(0), + EtcdSkTimeline { + timeline: SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(current_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + }, + etcd_version: 0, + latest_update: now, + }, + )]); + + let over_threshcurrent_candidate = state.next_connection_candidate().expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0)); + match over_threshcurrent_candidate.reason { + ReconnectReason::NoWalTimeout { + last_wal_interaction, + threshold, + .. + } => { + assert_eq!(last_wal_interaction, Some(time_over_threshold)); + assert_eq!(threshold, state.lagging_wal_timeout); + } + unexpected => panic!("Unexpected reason: {unexpected:?}"), + } + assert!(over_threshcurrent_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + + Ok(()) + } + + const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr"; + + fn dummy_state(harness: &RepoHarness) -> WalreceiverState { + WalreceiverState { + id: ZTenantTimelineId { + tenant_id: harness.tenant_id, + timeline_id: TIMELINE_ID, + }, + local_timeline: Arc::new(DatadirTimelineImpl::new( + harness + .load() + .create_empty_timeline(TIMELINE_ID, Lsn(0)) + .expect("Failed to create an empty timeline for dummy wal connection manager"), + 10_000, + )), + wal_connect_timeout: Duration::from_secs(1), + lagging_wal_timeout: Duration::from_secs(1), + max_lsn_wal_lag: NonZeroU64::new(1).unwrap(), + wal_connection: None, + wal_stream_candidates: HashMap::new(), + wal_connection_attempts: HashMap::new(), + } + } +} diff --git a/pageserver/src/walreceiver/connection_handler.rs b/pageserver/src/walreceiver/walreceiver_connection.rs similarity index 78% rename from pageserver/src/walreceiver/connection_handler.rs rename to pageserver/src/walreceiver/walreceiver_connection.rs index 97b9b8cc9b..98b36dfe48 100644 --- a/pageserver/src/walreceiver/connection_handler.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -1,5 +1,5 @@ //! 
Actual Postgres connection handler to stream WAL to the server. -//! Runs as a separate, cancellable Tokio task. + use std::{ str::FromStr, sync::Arc, @@ -10,113 +10,29 @@ use anyhow::{bail, ensure, Context}; use bytes::BytesMut; use fail::fail_point; use postgres::{SimpleQueryMessage, SimpleQueryRow}; -use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use tokio::{pin, select, sync::watch, time}; use tokio_postgres::{replication::ReplicationStream, Client}; use tokio_stream::StreamExt; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; -use utils::{ - lsn::Lsn, - pq_proto::ReplicationFeedback, - zid::{NodeId, ZTenantTimelineId}, -}; +use super::TaskEvent; use crate::{ http::models::WalReceiverEntry, repository::{Repository, Timeline}, tenant_mgr, walingest::WalIngest, }; +use postgres_ffi::waldecoder::WalStreamDecoder; +use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId}; -#[derive(Debug, Clone)] -pub enum WalConnectionEvent { - Started, - NewWal(ReplicationFeedback), - End(Result<(), String>), -} - -/// A wrapper around standalone Tokio task, to poll its updates or cancel the task. -#[derive(Debug)] -pub struct WalReceiverConnection { - handle: tokio::task::JoinHandle<()>, - cancellation: watch::Sender<()>, - events_receiver: watch::Receiver, -} - -impl WalReceiverConnection { - /// Initializes the connection task, returning a set of handles on top of it. - /// The task is started immediately after the creation, fails if no connection is established during the timeout given. - pub fn open( - id: ZTenantTimelineId, - safekeeper_id: NodeId, - wal_producer_connstr: String, - connect_timeout: Duration, - ) -> Self { - let (cancellation, mut cancellation_receiver) = watch::channel(()); - let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started); - - let handle = tokio::spawn( - async move { - let connection_result = handle_walreceiver_connection( - id, - &wal_producer_connstr, - &events_sender, - &mut cancellation_receiver, - connect_timeout, - ) - .await - .map_err(|e| { - format!("Walreceiver connection for id {id} failed with error: {e:#}") - }); - - match &connection_result { - Ok(()) => { - debug!("Walreceiver connection for id {id} ended successfully") - } - Err(e) => warn!("{e}"), - } - events_sender - .send(WalConnectionEvent::End(connection_result)) - .ok(); - } - .instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)), - ); - - Self { - handle, - cancellation, - events_receiver, - } - } - - /// Polls for the next WAL receiver event, if there's any available since the last check. - /// Blocks if there's no new event available, returns `None` if no new events will ever occur. - /// Only the last event is returned, all events received between observatins are lost. - pub async fn next_event(&mut self) -> Option { - match self.events_receiver.changed().await { - Ok(()) => Some(self.events_receiver.borrow().clone()), - Err(_cancellation_error) => None, - } - } - - /// Gracefully aborts current WAL streaming task, waiting for the current WAL streamed. - pub async fn shutdown(&mut self) -> anyhow::Result<()> { - self.cancellation.send(()).ok(); - let handle = &mut self.handle; - handle - .await - .context("Failed to join on a walreceiver connection task")?; - Ok(()) - } -} - -async fn handle_walreceiver_connection( +/// Opens a conneciton to the given wal producer and streams the WAL, sending progress messages during streaming. 
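+/// The task is cancellable through the `cancellation` watch channel; streaming progress is reported back through `events_sender`.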
+pub async fn handle_walreceiver_connection(
     id: ZTenantTimelineId,
     wal_producer_connstr: &str,
-    events_sender: &watch::Sender<WalConnectionEvent>,
-    cancellation: &mut watch::Receiver<()>,
+    events_sender: &watch::Sender<TaskEvent<ReplicationFeedback>>,
+    mut cancellation: watch::Receiver<()>,
     connect_timeout: Duration,
 ) -> anyhow::Result<()> {
     // Connect to the database in replication mode.
@@ -214,8 +130,6 @@ async fn handle_walreceiver_connection(
 
     while let Some(replication_message) = {
         select! {
-            // check for shutdown first
-            biased;
             _ = cancellation.changed() => {
                 info!("walreceiver interrupted");
                 None
@@ -344,7 +258,7 @@ async fn handle_walreceiver_connection(
                 .as_mut()
                 .zenith_status_update(data.len() as u64, &data)
                 .await?;
-            if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) {
+            if let Err(e) = events_sender.send(TaskEvent::NewEvent(zenith_status_update)) {
                 warn!("Wal connection event listener dropped, aborting the connection: {e}");
                 return Ok(());
             }
diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs
index 169b106aa9..d3f6fb8903 100644
--- a/safekeeper/src/broker.rs
+++ b/safekeeper/src/broker.rs
@@ -221,15 +221,12 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
         .await
         .context("failed to subscribe for safekeeper info")?;
     loop {
-        match subscription.fetch_data().await {
+        match subscription.value_updates.recv().await {
             Some(new_info) => {
-                for (zttid, sk_info) in new_info {
-                    // note: there are blocking operations below, but it's considered fine for now
-                    if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
-                        for (safekeeper_id, info) in sk_info {
-                            tli.record_safekeeper_info(&info, safekeeper_id).await?
-                        }
-                    }
+                // note: there are blocking operations below, but it's considered fine for now
+                if let Ok(tli) = GlobalTimelines::get(&conf, new_info.key.id, false) {
+                    tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
+                        .await?
+                }
             }
             None => {