From 99399c112a737c035632410e847fac8fcd737ead Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 24 Jan 2023 19:00:18 +0100 Subject: [PATCH] move walreceiver module under timeline Walreceiver is a per-timeline abstraction. Move it there to reflect the hierarchy of abstractions and task_mgr tasks. The code that sets up the global storage_broker client is not timeline-scoped. So, break it out into a separate module. The motivation for this change is to prepare the code base for replacing the task_mgr global task registry with a more ownership-oriented approach to manage task lifetimes. I removed TaskStateUpdate::Init because, after doing the changes, rustc warned that it was never constructed. A quick search through the commit history shows that this has always been true since commit fb68d01449edb4be9a0d064d69a442dd3688783e Author: Dmitry Rodionov Date: Mon Sep 26 23:57:02 2022 +0300 Preserve task result in TaskHandle by keeping join handle around (#2521) So, the warning is not an indication of some accidental code removal. This is PR: https://github.com/neondatabase/neon/pull/3456 --- pageserver/src/bin/pageserver.rs | 2 +- pageserver/src/broker_client.rs | 48 +++++++++++++++++++ pageserver/src/lib.rs | 2 +- pageserver/src/tenant/timeline.rs | 5 +- .../src/{ => tenant/timeline}/walreceiver.rs | 44 ----------------- .../walreceiver/connection_manager.rs | 12 ++--- .../walreceiver/walreceiver_connection.rs | 3 +- 7 files changed, 61 insertions(+), 55 deletions(-) create mode 100644 pageserver/src/broker_client.rs rename pageserver/src/{ => tenant/timeline}/walreceiver.rs (83%) rename pageserver/src/{ => tenant/timeline}/walreceiver/connection_manager.rs (99%) rename pageserver/src/{ => tenant/timeline}/walreceiver/walreceiver_connection.rs (99%) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 52b0c7f2be..f2cd93bd3a 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -250,7 +250,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> { let signals = signals::install_shutdown_handlers()?; // Launch broker client - WALRECEIVER_RUNTIME.block_on(pageserver::walreceiver::init_broker_client(conf))?; + WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?; // Initialize authentication for incoming connections let auth = match &conf.auth_type { diff --git a/pageserver/src/broker_client.rs b/pageserver/src/broker_client.rs new file mode 100644 index 0000000000..6c92967ca3 --- /dev/null +++ b/pageserver/src/broker_client.rs @@ -0,0 +1,48 @@ +//! The broker client instance of the pageserver, created during pageserver startup. +//! Used by each timelines' [`walreceiver`]. + +use crate::config::PageServerConf; + +use anyhow::Context; +use once_cell::sync::OnceCell; +use storage_broker::BrokerClientChannel; +use tracing::*; + +static BROKER_CLIENT: OnceCell = OnceCell::new(); + +/// +/// Initialize the broker client. This must be called once at page server startup. +/// +pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> { + let broker_endpoint = conf.broker_endpoint.clone(); + + // Note: we do not attempt connecting here (but validate endpoints sanity). + let broker_client = + storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context( + format!( + "Failed to create broker client to {}", + &conf.broker_endpoint + ), + )?; + + if BROKER_CLIENT.set(broker_client).is_err() { + panic!("broker already initialized"); + } + + info!( + "Initialized broker client with endpoints: {}", + broker_endpoint + ); + Ok(()) +} + +/// +/// Get a handle to the broker client +/// +pub fn get_broker_client() -> &'static BrokerClientChannel { + BROKER_CLIENT.get().expect("broker client not initialized") +} + +pub fn is_broker_client_initialized() -> bool { + BROKER_CLIENT.get().is_some() +} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3ea2165da9..09e21ae755 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,5 +1,6 @@ mod auth; pub mod basebackup; +pub mod broker_client; pub mod config; pub mod consumption_metrics; pub mod context; @@ -16,7 +17,6 @@ pub mod tenant; pub mod trace; pub mod virtual_file; pub mod walingest; -pub mod walreceiver; pub mod walrecord; pub mod walredo; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 134c8eed3c..c473a80f32 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1,5 +1,7 @@ //! +mod walreceiver; + use anyhow::{anyhow, bail, ensure, Context}; use bytes::Bytes; use fail::fail_point; @@ -23,6 +25,7 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; +use crate::broker_client::is_broker_client_initialized; use crate::context::{DownloadBehavior, RequestContext}; use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; use crate::tenant::storage_layer::{ @@ -59,11 +62,11 @@ use crate::page_cache; use crate::repository::GcResult; use crate::repository::{Key, Value}; use crate::task_mgr::TaskKind; -use crate::walreceiver::{is_broker_client_initialized, spawn_connection_manager_task}; use crate::walredo::WalRedoManager; use crate::METADATA_FILE_NAME; use crate::ZERO_PAGE; use crate::{is_temporary, task_mgr}; +use walreceiver::spawn_connection_manager_task; use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs similarity index 83% rename from pageserver/src/walreceiver.rs rename to pageserver/src/tenant/timeline/walreceiver.rs index fc9daadc5c..f33a12c5cc 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -23,58 +23,15 @@ mod connection_manager; mod walreceiver_connection; -use crate::config::PageServerConf; use crate::task_mgr::WALRECEIVER_RUNTIME; -use anyhow::Context; -use once_cell::sync::OnceCell; use std::future::Future; -use storage_broker::BrokerClientChannel; use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::*; pub use connection_manager::spawn_connection_manager_task; -static BROKER_CLIENT: OnceCell = OnceCell::new(); - -/// -/// Initialize the broker client. This must be called once at page server startup. -/// -pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> { - let broker_endpoint = conf.broker_endpoint.clone(); - - // Note: we do not attempt connecting here (but validate endpoints sanity). - let broker_client = - storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context( - format!( - "Failed to create broker client to {}", - &conf.broker_endpoint - ), - )?; - - if BROKER_CLIENT.set(broker_client).is_err() { - panic!("broker already initialized"); - } - - info!( - "Initialized broker client with endpoints: {}", - broker_endpoint - ); - Ok(()) -} - -/// -/// Get a handle to the broker client -/// -pub fn get_broker_client() -> &'static BrokerClientChannel { - BROKER_CLIENT.get().expect("broker client not initialized") -} - -pub fn is_broker_client_initialized() -> bool { - BROKER_CLIENT.get().is_some() -} - /// A handle of an asynchronous task. /// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`] /// and a cancellation token that it can listen to for earlier interrupts. @@ -95,7 +52,6 @@ pub enum TaskEvent { #[derive(Debug, Clone)] pub enum TaskStateUpdate { - Init, Started, Progress(E), } diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs similarity index 99% rename from pageserver/src/walreceiver/connection_manager.rs rename to pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 3af408c61b..cd7c7c51d2 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -11,11 +11,12 @@ use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration}; +use super::TaskStateUpdate; +use crate::broker_client::get_broker_client; use crate::context::RequestContext; -use crate::task_mgr::TaskKind; use crate::task_mgr::WALRECEIVER_RUNTIME; +use crate::task_mgr::{self, TaskKind}; use crate::tenant::Timeline; -use crate::{task_mgr, walreceiver::TaskStateUpdate}; use anyhow::Context; use chrono::{NaiveDateTime, Utc}; use pageserver_api::models::TimelineState; @@ -28,10 +29,7 @@ use storage_broker::Streaming; use tokio::{select, sync::watch}; use tracing::*; -use crate::{ - exponential_backoff, walreceiver::get_broker_client, DEFAULT_BASE_BACKOFF_SECONDS, - DEFAULT_MAX_BACKOFF_SECONDS, -}; +use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS}; use postgres_connection::{parse_host_port, PgConnectionConfig}; use utils::{ id::{NodeId, TenantTimelineId}, @@ -149,7 +147,7 @@ async fn connection_manager_loop_step( let wal_connection = walreceiver_state.wal_connection.as_mut() .expect("Should have a connection, as checked by the corresponding select! guard"); match wal_connection_update { - TaskEvent::Update(TaskStateUpdate::Init | TaskStateUpdate::Started) => {}, + TaskEvent::Update(TaskStateUpdate::Started) => {}, TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => { if new_status.has_processed_wal { // We have advanced last_record_lsn by processing the WAL received diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs similarity index 99% rename from pageserver/src/walreceiver/walreceiver_connection.rs rename to pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 1836195fd4..7e06c398af 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -22,8 +22,9 @@ use tokio_postgres::{replication::ReplicationStream, Client}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; +use super::TaskStateUpdate; use crate::context::RequestContext; -use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate}; +use crate::metrics::LIVE_CONNECTIONS_COUNT; use crate::{ task_mgr, task_mgr::TaskKind,