//! WAL receiver manages an open connection to safekeeper, to get the WAL it streams into. //! To do so, a current implementation needs to do the following: //! //! * acknowledge the timelines that it needs to stream WAL into. //! Pageserver is able to dynamically (un)load tenants on attach and detach, //! hence WAL receiver needs to react on such events. //! //! * get a broker subscription, stream data from it to determine that a timeline needs WAL streaming. //! For that, it watches specific keys in etcd broker and pulls the relevant data periodically. //! The data is produced by safekeepers, that push it periodically and pull it to synchronize between each other. //! Without this data, no WAL streaming is possible currently. //! //! Only one active WAL streaming connection is allowed at a time. //! The connection is supposed to be updated periodically, based on safekeeper timeline data. //! //! * handle the actual connection and WAL streaming //! //! Handling happens dynamically, by portions of WAL being processed and registered in the server. //! Along with the registration, certain metadata is written to show WAL streaming progress and rely on that when considering safekeepers for connection. //! //! The current module contains high-level primitives used in the submodules; general synchronization, timeline acknowledgement and shutdown logic. mod connection_manager; mod walreceiver_connection; use crate::config::PageServerConf; use crate::task_mgr::WALRECEIVER_RUNTIME; use anyhow::{ensure, Context}; use etcd_broker::Client; use itertools::Itertools; use once_cell::sync::OnceCell; use std::future::Future; use tokio::sync::watch; use tracing::*; use url::Url; pub use connection_manager::spawn_connection_manager_task; static ETCD_CLIENT: OnceCell = OnceCell::new(); /// /// Initialize the etcd client. This must be called once at page server startup. /// pub async fn init_etcd_client(conf: &'static PageServerConf) -> anyhow::Result<()> { let etcd_endpoints = conf.broker_endpoints.clone(); ensure!( !etcd_endpoints.is_empty(), "Cannot start wal receiver: etcd endpoints are empty" ); let etcd_client = Client::connect(etcd_endpoints.clone(), None) .await .context("Failed to connect to etcd")?; // FIXME: Should we still allow the pageserver to start, if etcd // doesn't work? It could still serve GetPage requests, with the // data it has locally and from what it can download from remote // storage if ETCD_CLIENT.set(etcd_client).is_err() { panic!("etcd already initialized"); } info!( "Initialized etcd client with endpoints: {}", etcd_endpoints.iter().map(Url::to_string).join(", ") ); Ok(()) } /// /// Get a handle to the etcd client /// pub fn get_etcd_client() -> &'static etcd_broker::Client { ETCD_CLIENT.get().expect("etcd client not initialized") } pub fn is_etcd_client_initialized() -> bool { ETCD_CLIENT.get().is_some() } /// A handle of an asynchronous task. /// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`] /// and a cancellation channel that it can listen to for earlier interrupts. /// /// Note that the communication happens via the `watch` channel, that does not accumulate the events, replacing the old one with the never one on submission. /// That may lead to certain events not being observed by the listener. #[derive(Debug)] pub struct TaskHandle { join_handle: Option>>, events_receiver: watch::Receiver>, cancellation: watch::Sender<()>, } pub enum TaskEvent { Update(TaskStateUpdate), End(anyhow::Result<()>), } #[derive(Debug, Clone)] pub enum TaskStateUpdate { Init, Started, Progress(E), } impl TaskHandle { /// Initializes the task, starting it immediately after the creation. pub fn spawn( task: impl FnOnce(watch::Sender>, watch::Receiver<()>) -> Fut + Send + 'static, ) -> Self where Fut: Future> + Send, E: Send + Sync + 'static, { let (cancellation, cancellation_receiver) = watch::channel(()); let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started); let join_handle = WALRECEIVER_RUNTIME.spawn(async move { events_sender.send(TaskStateUpdate::Started).ok(); task(events_sender, cancellation_receiver).await }); TaskHandle { join_handle: Some(join_handle), events_receiver, cancellation, } } async fn next_task_event(&mut self) -> TaskEvent { match self.events_receiver.changed().await { Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()), Err(_task_channel_part_dropped) => { TaskEvent::End(match self.join_handle.take() { Some(jh) => { if !jh.is_finished() { warn!("sender is dropped while join handle is still alive"); } jh.await .map_err(|e| anyhow::anyhow!("Failed to join task: {e}")) .and_then(|x| x) } None => { // Another option is to have an enum, join handle or result and give away the reference to it Err(anyhow::anyhow!("Task was joined more than once")) } }) } } } /// Aborts current task, waiting for it to finish. pub async fn shutdown(self) { if let Some(jh) = self.join_handle { self.cancellation.send(()).ok(); match jh.await { Ok(Ok(())) => debug!("Shutdown success"), Ok(Err(e)) => error!("Shutdown task error: {e:?}"), Err(join_error) => { if join_error.is_cancelled() { error!("Shutdown task was cancelled"); } else { error!("Shutdown task join error: {join_error}") } } } } } }