From e5617021a7b08000733c122fa487c22fde85c026 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 25 May 2023 15:47:42 +0200 Subject: [PATCH] refactor: eliminate global storage_broker client state (#4318) (This is prep work to make `Timeline::activate` infallible.) This patch removes the global storage_broker client instance from the pageserver codebase. Instead, pageserver startup instantiates it and passes it down to the `Timeline::activate` function, which in turn passes it to the WalReceiver, which is the entity that actually uses it. Patch series: - #4316 - #4317 - #4318 - #4319 --- pageserver/src/bin/pageserver.rs | 26 +++++++++++++---- pageserver/src/broker_client.rs | 48 ------------------------------- pageserver/src/http/routes.rs | 32 +++++++++++++++++---- pageserver/src/lib.rs | 1 - pageserver/src/page_service.rs | 23 +++++++++++++-- pageserver/src/tenant.rs | 21 ++++++++++---- pageserver/src/tenant/mgr.rs | 18 ++++++++---- pageserver/src/tenant/timeline.rs | 16 ++++------- storage_broker/src/lib.rs | 3 ++ 9 files changed, 104 insertions(+), 84 deletions(-) delete mode 100644 pageserver/src/broker_client.rs diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d843b01ed7..d9d3d9d662 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -9,6 +9,7 @@ use clap::{Arg, ArgAction, Command}; use fail::FailScenario; use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp}; use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task}; +use pageserver::task_mgr::WALRECEIVER_RUNTIME; use remote_storage::GenericRemoteStorage; use tracing::*; @@ -18,9 +19,7 @@ use pageserver::{ context::{DownloadBehavior, RequestContext}, http, page_cache, page_service, task_mgr, task_mgr::TaskKind, - task_mgr::{ - BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME, - }, + task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME}, tenant::mgr, virtual_file, }; @@ -276,7 +275,18 @@ fn start_pageserver( let pageserver_listener = tcp_listener::bind(pg_addr)?; // Launch broker client - WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?; + // The storage_broker::connect call needs to happen inside a tokio runtime thread. + let broker_client = WALRECEIVER_RUNTIME + .block_on(async { + // Note: we do not attempt connecting here (but validate endpoints sanity). + storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval) + }) + .with_context(|| { + format!( + "create broker client for uri={:?} keepalive_interval={:?}", + &conf.broker_endpoint, conf.broker_keepalive_interval, + ) + })?; // Initialize authentication for incoming connections let http_auth; @@ -326,7 +336,11 @@ fn start_pageserver( let remote_storage = create_remote_storage_client(conf)?; // Scan the local 'tenants/' directory and start loading the tenants - BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(conf, remote_storage.clone()))?; + BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( + conf, + broker_client.clone(), + remote_storage.clone(), + ))?; // shared state between the disk-usage backed eviction background task and the http endpoint // that allows triggering disk-usage based eviction manually. note that the http endpoint @@ -351,6 +365,7 @@ fn start_pageserver( conf, launch_ts, http_auth, + broker_client.clone(), remote_storage, disk_usage_eviction_state, )? @@ -427,6 +442,7 @@ fn start_pageserver( async move { page_service::libpq_listener_main( conf, + broker_client, pg_auth, pageserver_listener, conf.pg_auth_type, diff --git a/pageserver/src/broker_client.rs b/pageserver/src/broker_client.rs deleted file mode 100644 index 6c92967ca3..0000000000 --- a/pageserver/src/broker_client.rs +++ /dev/null @@ -1,48 +0,0 @@ -//! The broker client instance of the pageserver, created during pageserver startup. -//! Used by each timelines' [`walreceiver`]. - -use crate::config::PageServerConf; - -use anyhow::Context; -use once_cell::sync::OnceCell; -use storage_broker::BrokerClientChannel; -use tracing::*; - -static BROKER_CLIENT: OnceCell = OnceCell::new(); - -/// -/// Initialize the broker client. This must be called once at page server startup. -/// -pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> { - let broker_endpoint = conf.broker_endpoint.clone(); - - // Note: we do not attempt connecting here (but validate endpoints sanity). - let broker_client = - storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context( - format!( - "Failed to create broker client to {}", - &conf.broker_endpoint - ), - )?; - - if BROKER_CLIENT.set(broker_client).is_err() { - panic!("broker already initialized"); - } - - info!( - "Initialized broker client with endpoints: {}", - broker_endpoint - ); - Ok(()) -} - -/// -/// Get a handle to the broker client -/// -pub fn get_broker_client() -> &'static BrokerClientChannel { - BROKER_CLIENT.get().expect("broker client not initialized") -} - -pub fn is_broker_client_initialized() -> bool { - BROKER_CLIENT.get().is_some() -} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 1ca3fdb54a..25e0d88e70 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -7,6 +7,7 @@ use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; use pageserver_api::models::{DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest}; use remote_storage::GenericRemoteStorage; +use storage_broker::BrokerClientChannel; use tenant_size_model::{SizeResult, StorageModel}; use tokio_util::sync::CancellationToken; use tracing::*; @@ -53,6 +54,7 @@ struct State { auth: Option>, allowlist_routes: Vec, remote_storage: Option, + broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, } @@ -61,6 +63,7 @@ impl State { conf: &'static PageServerConf, auth: Option>, remote_storage: Option, + broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, ) -> anyhow::Result { let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"] @@ -72,6 +75,7 @@ impl State { auth, allowlist_routes, remote_storage, + broker_client, disk_usage_eviction_state, }) } @@ -303,6 +307,8 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result) -> Result) -> Result, A let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let state = get_state(&request); - mgr::load_tenant(state.conf, tenant_id, state.remote_storage.clone(), &ctx) - .instrument(info_span!("load", tenant = %tenant_id)) - .await?; + mgr::load_tenant( + state.conf, + tenant_id, + state.broker_client.clone(), + state.remote_storage.clone(), + &ctx, + ) + .instrument(info_span!("load", tenant = %tenant_id)) + .await?; json_response(StatusCode::ACCEPTED, ()) } @@ -775,6 +789,7 @@ async fn tenant_create_handler(mut request: Request) -> Result>, + broker_client: BrokerClientChannel, remote_storage: Option, disk_usage_eviction_state: Arc, ) -> anyhow::Result> { @@ -1176,8 +1192,14 @@ pub fn make_router( Ok(router .data(Arc::new( - State::new(conf, auth, remote_storage, disk_usage_eviction_state) - .context("Failed to initialize router state")?, + State::new( + conf, + auth, + remote_storage, + broker_client, + disk_usage_eviction_state, + ) + .context("Failed to initialize router state")?, )) .get("/v1/status", |r| RequestSpan(status_handler).handle(r)) .put( diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 04863886cb..4349f0e2ea 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,6 +1,5 @@ mod auth; pub mod basebackup; -pub mod broker_client; pub mod config; pub mod consumption_metrics; pub mod context; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index fd442783f9..9e9285a009 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -174,6 +174,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<() /// pub async fn libpq_listener_main( conf: &'static PageServerConf, + broker_client: storage_broker::BrokerClientChannel, auth: Option>, listener: TcpListener, auth_type: AuthType, @@ -215,7 +216,14 @@ pub async fn libpq_listener_main( None, "serving compute connection task", false, - page_service_conn_main(conf, local_auth, socket, auth_type, connection_ctx), + page_service_conn_main( + conf, + broker_client.clone(), + local_auth, + socket, + auth_type, + connection_ctx, + ), ); } Err(err) => { @@ -232,6 +240,7 @@ pub async fn libpq_listener_main( async fn page_service_conn_main( conf: &'static PageServerConf, + broker_client: storage_broker::BrokerClientChannel, auth: Option>, socket: tokio::net::TcpStream, auth_type: AuthType, @@ -268,7 +277,7 @@ async fn page_service_conn_main( // and create a child per-query context when it invokes process_query. // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler // and create the per-query context in process_query ourselves. - let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx); + let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx); let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; match pgbackend @@ -326,6 +335,7 @@ impl PageRequestMetrics { struct PageServerHandler { _conf: &'static PageServerConf, + broker_client: storage_broker::BrokerClientChannel, auth: Option>, claims: Option, @@ -339,11 +349,13 @@ struct PageServerHandler { impl PageServerHandler { pub fn new( conf: &'static PageServerConf, + broker_client: storage_broker::BrokerClientChannel, auth: Option>, connection_ctx: RequestContext, ) -> Self { PageServerHandler { _conf: conf, + broker_client, auth, claims: None, connection_ctx, @@ -496,7 +508,12 @@ impl PageServerHandler { let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb))); timeline - .import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx) + .import_basebackup_from_tar( + &mut copyin_reader, + base_lsn, + self.broker_client.clone(), + &ctx, + ) .await?; // Read the end of the tar archive. diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 6806b2c99d..e247fbf423 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -16,6 +16,7 @@ use futures::FutureExt; use pageserver_api::models::TimelineState; use remote_storage::DownloadError; use remote_storage::GenericRemoteStorage; +use storage_broker::BrokerClientChannel; use tokio::sync::watch; use tokio::task::JoinSet; use tracing::*; @@ -238,6 +239,7 @@ impl UninitializedTimeline<'_> { self, copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin), base_lsn: Lsn, + broker_client: storage_broker::BrokerClientChannel, ctx: &RequestContext, ) -> anyhow::Result> { let raw_timeline = self.raw_timeline()?; @@ -264,7 +266,7 @@ impl UninitializedTimeline<'_> { // updated it for the layers that we created during the import. let mut timelines = self.owning_tenant.timelines.lock().unwrap(); let tl = self.initialize_with_lock(ctx, &mut timelines, false)?; - tl.activate(ctx)?; + tl.activate(broker_client, ctx)?; Ok(tl) } @@ -613,6 +615,7 @@ impl Tenant { pub(crate) fn spawn_attach( conf: &'static PageServerConf, tenant_id: TenantId, + broker_client: storage_broker::BrokerClientChannel, remote_storage: GenericRemoteStorage, ctx: &RequestContext, ) -> anyhow::Result> { @@ -644,7 +647,7 @@ impl Tenant { async move { let doit = async { tenant_clone.attach(&ctx).await?; - tenant_clone.activate(&ctx)?; + tenant_clone.activate(broker_client, &ctx)?; anyhow::Ok(()) }; match doit.await { @@ -882,6 +885,7 @@ impl Tenant { pub fn spawn_load( conf: &'static PageServerConf, tenant_id: TenantId, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, ) -> Arc { @@ -918,7 +922,7 @@ impl Tenant { async move { let doit = async { tenant_clone.load(&ctx).await?; - tenant_clone.activate(&ctx)?; + tenant_clone.activate(broker_client, &ctx)?; anyhow::Ok(()) }; match doit.await { @@ -1262,6 +1266,7 @@ impl Tenant { ancestor_timeline_id: Option, mut ancestor_start_lsn: Option, pg_version: u32, + broker_client: storage_broker::BrokerClientChannel, ctx: &RequestContext, ) -> anyhow::Result>> { anyhow::ensure!( @@ -1328,7 +1333,7 @@ impl Tenant { } }; - loaded_timeline.activate(ctx).context("activate timeline")?; + loaded_timeline.activate(broker_client, ctx)?; if let Some(remote_client) = loaded_timeline.remote_client.as_ref() { // Wait for the upload of the 'index_part.json` file to finish, so that when we return @@ -1633,7 +1638,11 @@ impl Tenant { } /// Changes tenant status to active, unless shutdown was already requested. - fn activate(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { + fn activate( + self: &Arc, + broker_client: BrokerClientChannel, + ctx: &RequestContext, + ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); let mut result = Ok(()); @@ -1673,7 +1682,7 @@ impl Tenant { for timeline in not_broken_timelines { match timeline - .activate(ctx) + .activate(broker_client.clone(), ctx) .context("timeline activation for activating tenant") { Ok(()) => { diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index fa9769b0f8..dbb9577bf0 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -58,9 +58,10 @@ static TENANTS: Lazy> = Lazy::new(|| RwLock::new(TenantsMap:: /// Initialize repositories with locally available timelines. /// Timelines that are only partially available locally (remote storage has more data than this pageserver) /// are scheduled for download and added to the tenant once download is completed. -#[instrument(skip(conf, remote_storage))] +#[instrument(skip_all)] pub async fn init_tenant_mgr( conf: &'static PageServerConf, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ) -> anyhow::Result<()> { // Scan local filesystem for attached tenants @@ -116,6 +117,7 @@ pub async fn init_tenant_mgr( match schedule_local_tenant_processing( conf, &tenant_dir_path, + broker_client.clone(), remote_storage.clone(), &ctx, ) { @@ -150,6 +152,7 @@ pub async fn init_tenant_mgr( pub fn schedule_local_tenant_processing( conf: &'static PageServerConf, tenant_path: &Path, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, ) -> anyhow::Result> { @@ -186,7 +189,7 @@ pub fn schedule_local_tenant_processing( let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() { info!("tenant {tenant_id} has attaching mark file, resuming its attach operation"); if let Some(remote_storage) = remote_storage { - match Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx) { + match Tenant::spawn_attach(conf, tenant_id, broker_client, remote_storage, ctx) { Ok(tenant) => tenant, Err(e) => { error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}"); @@ -204,7 +207,7 @@ pub fn schedule_local_tenant_processing( } else { info!("tenant {tenant_id} is assumed to be loadable, starting load operation"); // Start loading the tenant into memory. It will initially be in Loading state. - Tenant::spawn_load(conf, tenant_id, remote_storage, ctx) + Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx) }; Ok(tenant) } @@ -275,6 +278,7 @@ pub async fn create_tenant( conf: &'static PageServerConf, tenant_conf: TenantConfOpt, tenant_id: TenantId, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, ) -> Result, TenantMapInsertError> { @@ -287,7 +291,7 @@ pub async fn create_tenant( // See https://github.com/neondatabase/neon/issues/4233 let created_tenant = - schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?; + schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 @@ -421,6 +425,7 @@ pub async fn detach_tenant( pub async fn load_tenant( conf: &'static PageServerConf, tenant_id: TenantId, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { @@ -432,7 +437,7 @@ pub async fn load_tenant( .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; } - let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, remote_storage, ctx) + let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx) .with_context(|| { format!("Failed to schedule tenant processing in path {tenant_path:?}") })?; @@ -489,6 +494,7 @@ pub async fn attach_tenant( conf: &'static PageServerConf, tenant_id: TenantId, tenant_conf: TenantConfOpt, + broker_client: storage_broker::BrokerClientChannel, remote_storage: GenericRemoteStorage, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { @@ -504,7 +510,7 @@ pub async fn attach_tenant( .context("check for attach marker file existence")?; anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file"); - let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, Some(remote_storage), ctx)?; + let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 3c951c1188..9b449812ac 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -31,7 +31,6 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; -use crate::broker_client::{get_broker_client, is_broker_client_initialized}; use crate::context::{DownloadBehavior, RequestContext}; use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; use crate::tenant::storage_layer::{ @@ -907,15 +906,12 @@ impl Timeline { Ok(()) } - pub fn activate(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { - if is_broker_client_initialized() { - self.launch_wal_receiver(ctx, get_broker_client().clone())?; - } else if cfg!(test) { - info!("not launching WAL receiver because broker client hasn't been initialized"); - } else { - anyhow::bail!("broker client not initialized"); - } - + pub fn activate( + self: &Arc, + broker_client: BrokerClientChannel, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + self.launch_wal_receiver(ctx, broker_client)?; self.set_state(TimelineState::Active); self.launch_eviction_task(); Ok(()) diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index 8441aaf625..4bc561449d 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -40,6 +40,9 @@ pub type BrokerClientChannel = BrokerServiceClient; // Create connection object configured to run TLS if schema starts with https:// // and plain text otherwise. Connection is lazy, only endpoint sanity is // validated here. +// +// NB: this function is not async, but still must be run on a tokio runtime thread +// because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call. pub fn connect(endpoint: U, keepalive_interval: Duration) -> anyhow::Result where U: std::convert::TryInto,