diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 09378c2736..9da3a519a2 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -276,7 +276,16 @@ fn start_pageserver( // Launch broker client let broker_client = WALRECEIVER_RUNTIME - .block_on(async { pageserver::broker_client::init_broker_client(conf) })?; + .block_on(async { + // Note: we do not attempt connecting here (but validate endpoints sanity). + storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval) + }) + .with_context(|| { + format!( + "create broker client for uri={:?} keepalive_interval={:?}", + &conf.broker_endpoint, conf.broker_keepalive_interval, + ) + })?; // Initialize authentication for incoming connections let http_auth; @@ -328,7 +337,7 @@ fn start_pageserver( // Scan the local 'tenants/' directory and start loading the tenants BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( conf, - broker_client, + broker_client.clone(), remote_storage.clone(), ))?; @@ -355,7 +364,7 @@ fn start_pageserver( conf, launch_ts, http_auth, - broker_client, + broker_client.clone(), remote_storage, disk_usage_eviction_state, )? diff --git a/pageserver/src/broker_client.rs b/pageserver/src/broker_client.rs deleted file mode 100644 index d895b9d7ed..0000000000 --- a/pageserver/src/broker_client.rs +++ /dev/null @@ -1,39 +0,0 @@ -//! The broker client instance of the pageserver, created during pageserver startup. -//! Used by each timelines' [`walreceiver`]. - -use crate::config::PageServerConf; - -use anyhow::Context; -use once_cell::sync::OnceCell; -use storage_broker::BrokerClientChannel; -use tracing::*; - -static BROKER_CLIENT: OnceCell = OnceCell::new(); - -/// -/// Initialize the broker client. This must be called once at page server startup. -/// -pub fn init_broker_client( - conf: &'static PageServerConf, -) -> anyhow::Result<&'static BrokerClientChannel> { - let broker_endpoint = conf.broker_endpoint.clone(); - - // Note: we do not attempt connecting here (but validate endpoints sanity). - let broker_client = - storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context( - format!( - "Failed to create broker client to {}", - &conf.broker_endpoint - ), - )?; - - if BROKER_CLIENT.set(broker_client).is_err() { - panic!("broker already initialized"); - } - - info!( - "Initialized broker client with endpoints: {}", - broker_endpoint - ); - Ok(BROKER_CLIENT.get().unwrap()) -} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 55cc46f15d..88616fb515 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -51,7 +51,7 @@ struct State { auth: Option>, allowlist_routes: Vec, remote_storage: Option, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, } @@ -60,7 +60,7 @@ impl State { conf: &'static PageServerConf, auth: Option>, remote_storage: Option, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, ) -> anyhow::Result { let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"] @@ -283,7 +283,7 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result, // XXX: Attach should provide the config, especially during tenant migration. // See https://github.com/neondatabase/neon/issues/1555 TenantConfOpt::default(), - state.broker_client, + state.broker_client.clone(), remote_storage.clone(), &ctx, ) @@ -464,7 +464,7 @@ async fn tenant_load_handler(request: Request) -> Result, A mgr::load_tenant( state.conf, tenant_id, - state.broker_client, + state.broker_client.clone(), state.remote_storage.clone(), &ctx, ) @@ -754,7 +754,7 @@ async fn tenant_create_handler(mut request: Request) -> Result>, - broker_client: &'static BrokerClientChannel, + broker_client: BrokerClientChannel, remote_storage: Option, disk_usage_eviction_state: Arc, ) -> anyhow::Result> { diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 04863886cb..4349f0e2ea 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,6 +1,5 @@ mod auth; pub mod basebackup; -pub mod broker_client; pub mod config; pub mod consumption_metrics; pub mod context; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 0a25011cee..886e0b2c35 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -172,7 +172,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<() /// pub async fn libpq_listener_main( conf: &'static PageServerConf, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, auth: Option>, listener: TcpListener, auth_type: AuthType, @@ -216,7 +216,7 @@ pub async fn libpq_listener_main( false, page_service_conn_main( conf, - broker_client, + broker_client.clone(), local_auth, socket, auth_type, @@ -238,7 +238,7 @@ pub async fn libpq_listener_main( async fn page_service_conn_main( conf: &'static PageServerConf, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, auth: Option>, socket: tokio::net::TcpStream, auth_type: AuthType, @@ -333,7 +333,7 @@ impl PageRequestMetrics { struct PageServerHandler { _conf: &'static PageServerConf, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, auth: Option>, claims: Option, @@ -347,7 +347,7 @@ struct PageServerHandler { impl PageServerHandler { pub fn new( conf: &'static PageServerConf, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, auth: Option>, connection_ctx: RequestContext, ) -> Self { @@ -506,7 +506,12 @@ impl PageServerHandler { let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb))); timeline - .import_basebackup_from_tar(&mut copyin_reader, base_lsn, self.broker_client, &ctx) + .import_basebackup_from_tar( + &mut copyin_reader, + base_lsn, + self.broker_client.clone(), + &ctx, + ) .await?; // Read the end of the tar archive. diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 95a428611a..dde81f51cb 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -239,7 +239,7 @@ impl UninitializedTimeline<'_> { self, copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin), base_lsn: Lsn, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, ctx: &RequestContext, ) -> anyhow::Result> { let raw_timeline = self.raw_timeline()?; @@ -587,7 +587,7 @@ impl Tenant { pub(crate) fn spawn_attach( conf: &'static PageServerConf, tenant_id: TenantId, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, remote_storage: GenericRemoteStorage, ctx: &RequestContext, ) -> anyhow::Result> { @@ -857,7 +857,7 @@ impl Tenant { pub fn spawn_load( conf: &'static PageServerConf, tenant_id: TenantId, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, ) -> Arc { @@ -1238,7 +1238,7 @@ impl Tenant { ancestor_timeline_id: Option, mut ancestor_start_lsn: Option, pg_version: u32, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, ctx: &RequestContext, ) -> anyhow::Result>> { anyhow::ensure!( @@ -1612,7 +1612,7 @@ impl Tenant { /// Changes tenant status to active, unless shutdown was already requested. fn activate( &self, - broker_client: &'static BrokerClientChannel, + broker_client: BrokerClientChannel, ctx: &RequestContext, ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); @@ -1654,7 +1654,7 @@ impl Tenant { for timeline in not_broken_timelines { match timeline - .activate(broker_client, ctx) + .activate(broker_client.clone(), ctx) .context("timeline activation for activating tenant") { Ok(()) => { diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 28e6b349e2..2eeca189f0 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -61,7 +61,7 @@ static TENANTS: Lazy> = Lazy::new(|| RwLock::new(TenantsMap:: #[instrument(skip_all)] pub async fn init_tenant_mgr( conf: &'static PageServerConf, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ) -> anyhow::Result<()> { // Scan local filesystem for attached tenants @@ -117,7 +117,7 @@ pub async fn init_tenant_mgr( match schedule_local_tenant_processing( conf, &tenant_dir_path, - broker_client, + broker_client.clone(), remote_storage.clone(), &ctx, ) { @@ -152,7 +152,7 @@ pub async fn init_tenant_mgr( pub fn schedule_local_tenant_processing( conf: &'static PageServerConf, tenant_path: &Path, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, ) -> anyhow::Result> { @@ -278,7 +278,7 @@ pub async fn create_tenant( conf: &'static PageServerConf, tenant_conf: TenantConfOpt, tenant_id: TenantId, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, ) -> Result, TenantMapInsertError> { @@ -408,7 +408,7 @@ pub async fn detach_tenant( pub async fn load_tenant( conf: &'static PageServerConf, tenant_id: TenantId, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { @@ -477,7 +477,7 @@ pub async fn attach_tenant( conf: &'static PageServerConf, tenant_id: TenantId, tenant_conf: TenantConfOpt, - broker_client: &'static storage_broker::BrokerClientChannel, + broker_client: storage_broker::BrokerClientChannel, remote_storage: GenericRemoteStorage, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index de4e17b7fd..9b449812ac 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -908,10 +908,10 @@ impl Timeline { pub fn activate( self: &Arc, - broker_client: &'static BrokerClientChannel, + broker_client: BrokerClientChannel, ctx: &RequestContext, ) -> anyhow::Result<()> { - self.launch_wal_receiver(ctx, (*broker_client).clone())?; + self.launch_wal_receiver(ctx, broker_client)?; self.set_state(TimelineState::Active); self.launch_eviction_task(); Ok(())