pass the BrokerClientChannel by value & clone it as necessary

It's a wrapper around an inner Arc anyways

Also, this gets rid of the OnceCell
This commit is contained in:
Christian Schwarz
2023-05-24 12:29:05 +02:00
parent def5eb8542
commit b54431bbd3
8 changed files with 44 additions and 70 deletions

View File

@@ -276,7 +276,16 @@ fn start_pageserver(
// Launch broker client
let broker_client = WALRECEIVER_RUNTIME
.block_on(async { pageserver::broker_client::init_broker_client(conf) })?;
.block_on(async {
// Note: we do not attempt connecting here (but validate endpoints sanity).
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
})
.with_context(|| {
format!(
"create broker client for uri={:?} keepalive_interval={:?}",
&conf.broker_endpoint, conf.broker_keepalive_interval,
)
})?;
// Initialize authentication for incoming connections
let http_auth;
@@ -328,7 +337,7 @@ fn start_pageserver(
// Scan the local 'tenants/' directory and start loading the tenants
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client,
broker_client.clone(),
remote_storage.clone(),
))?;
@@ -355,7 +364,7 @@ fn start_pageserver(
conf,
launch_ts,
http_auth,
broker_client,
broker_client.clone(),
remote_storage,
disk_usage_eviction_state,
)?

View File

@@ -1,39 +0,0 @@
//! The broker client instance of the pageserver, created during pageserver startup.
//! Used by each timelines' [`walreceiver`].
use crate::config::PageServerConf;
use anyhow::Context;
use once_cell::sync::OnceCell;
use storage_broker::BrokerClientChannel;
use tracing::*;
static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the broker client. This must be called once at page server startup.
///
pub fn init_broker_client(
conf: &'static PageServerConf,
) -> anyhow::Result<&'static BrokerClientChannel> {
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).
let broker_client =
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
),
)?;
if BROKER_CLIENT.set(broker_client).is_err() {
panic!("broker already initialized");
}
info!(
"Initialized broker client with endpoints: {}",
broker_endpoint
);
Ok(BROKER_CLIENT.get().unwrap())
}

View File

@@ -51,7 +51,7 @@ struct State {
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
}
@@ -60,7 +60,7 @@ impl State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
@@ -283,7 +283,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
state.broker_client,
state.broker_client.clone(),
&ctx,
)
.await {
@@ -411,7 +411,7 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
// XXX: Attach should provide the config, especially during tenant migration.
// See https://github.com/neondatabase/neon/issues/1555
TenantConfOpt::default(),
state.broker_client,
state.broker_client.clone(),
remote_storage.clone(),
&ctx,
)
@@ -464,7 +464,7 @@ async fn tenant_load_handler(request: Request<Body>) -> Result<Response<Body>, A
mgr::load_tenant(
state.conf,
tenant_id,
state.broker_client,
state.broker_client.clone(),
state.remote_storage.clone(),
&ctx,
)
@@ -754,7 +754,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
state.conf,
tenant_conf,
target_tenant_id,
state.broker_client,
state.broker_client.clone(),
state.remote_storage.clone(),
&ctx,
)
@@ -1110,7 +1110,7 @@ pub fn make_router(
conf: &'static PageServerConf,
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<JwtAuth>>,
broker_client: &'static BrokerClientChannel,
broker_client: BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {

View File

@@ -1,6 +1,5 @@
mod auth;
pub mod basebackup;
pub mod broker_client;
pub mod config;
pub mod consumption_metrics;
pub mod context;

View File

@@ -172,7 +172,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
///
pub async fn libpq_listener_main(
conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
@@ -216,7 +216,7 @@ pub async fn libpq_listener_main(
false,
page_service_conn_main(
conf,
broker_client,
broker_client.clone(),
local_auth,
socket,
auth_type,
@@ -238,7 +238,7 @@ pub async fn libpq_listener_main(
async fn page_service_conn_main(
conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
@@ -333,7 +333,7 @@ impl PageRequestMetrics {
struct PageServerHandler {
_conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
claims: Option<Claims>,
@@ -347,7 +347,7 @@ struct PageServerHandler {
impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
connection_ctx: RequestContext,
) -> Self {
@@ -506,7 +506,12 @@ impl PageServerHandler {
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, self.broker_client, &ctx)
.import_basebackup_from_tar(
&mut copyin_reader,
base_lsn,
self.broker_client.clone(),
&ctx,
)
.await?;
// Read the end of the tar archive.

View File

@@ -239,7 +239,7 @@ impl UninitializedTimeline<'_> {
self,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
@@ -587,7 +587,7 @@ impl Tenant {
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -857,7 +857,7 @@ impl Tenant {
pub fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Arc<Tenant> {
@@ -1238,7 +1238,7 @@ impl Tenant {
ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Option<Arc<Timeline>>> {
anyhow::ensure!(
@@ -1612,7 +1612,7 @@ impl Tenant {
/// Changes tenant status to active, unless shutdown was already requested.
fn activate(
&self,
broker_client: &'static BrokerClientChannel,
broker_client: BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
@@ -1654,7 +1654,7 @@ impl Tenant {
for timeline in not_broken_timelines {
match timeline
.activate(broker_client, ctx)
.activate(broker_client.clone(), ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {

View File

@@ -61,7 +61,7 @@ static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
@@ -117,7 +117,7 @@ pub async fn init_tenant_mgr(
match schedule_local_tenant_processing(
conf,
&tenant_dir_path,
broker_client,
broker_client.clone(),
remote_storage.clone(),
&ctx,
) {
@@ -152,7 +152,7 @@ pub async fn init_tenant_mgr(
pub fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -278,7 +278,7 @@ pub async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
@@ -408,7 +408,7 @@ pub async fn detach_tenant(
pub async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
@@ -477,7 +477,7 @@ pub async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_conf: TenantConfOpt,
broker_client: &'static storage_broker::BrokerClientChannel,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {

View File

@@ -908,10 +908,10 @@ impl Timeline {
pub fn activate(
self: &Arc<Self>,
broker_client: &'static BrokerClientChannel,
broker_client: BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.launch_wal_receiver(ctx, (*broker_client).clone())?;
self.launch_wal_receiver(ctx, broker_client)?;
self.set_state(TimelineState::Active);
self.launch_eviction_task();
Ok(())