refactor: eliminate global storage_broker client state

(This is prep work to make `Timeline::activate` infallible.)

This patch removes the global storage_broker client instance from the
pageserver codebase.

Instead, pageserver startup instantiates it and passes it down to
the `Timeline::activate` function, which in turn passes it to
the WalReceiver, which is the entity that actually uses it.
This commit is contained in:
Christian Schwarz
2023-05-23 18:22:01 +02:00
parent d5337e6a65
commit 17b081d294
7 changed files with 101 additions and 55 deletions

View File

@@ -18,9 +18,7 @@ use pageserver::{
context::{DownloadBehavior, RequestContext},
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
},
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
tenant::mgr,
virtual_file,
};
@@ -276,7 +274,7 @@ fn start_pageserver(
let pageserver_listener = tcp_listener::bind(pg_addr)?;
// Launch broker client
WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?;
let broker_client = pageserver::broker_client::init_broker_client(conf)?;
// Initialize authentication for incoming connections
let http_auth;
@@ -326,7 +324,11 @@ fn start_pageserver(
let remote_storage = create_remote_storage_client(conf)?;
// Scan the local 'tenants/' directory and start loading the tenants
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(conf, remote_storage.clone()))?;
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client,
remote_storage.clone(),
))?;
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
@@ -351,6 +353,7 @@ fn start_pageserver(
conf,
launch_ts,
http_auth,
broker_client,
remote_storage,
disk_usage_eviction_state,
)?
@@ -427,6 +430,7 @@ fn start_pageserver(
async move {
page_service::libpq_listener_main(
conf,
broker_client,
pg_auth,
pageserver_listener,
conf.pg_auth_type,

View File

@@ -13,7 +13,9 @@ static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the broker client. This must be called once at page server startup.
///
pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
pub fn init_broker_client(
conf: &'static PageServerConf,
) -> anyhow::Result<&'static BrokerClientChannel> {
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).
@@ -33,16 +35,5 @@ pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result
"Initialized broker client with endpoints: {}",
broker_endpoint
);
Ok(())
}
///
/// Get a handle to the broker client
///
pub fn get_broker_client() -> &'static BrokerClientChannel {
BROKER_CLIENT.get().expect("broker client not initialized")
}
pub fn is_broker_client_initialized() -> bool {
BROKER_CLIENT.get().is_some()
Ok(BROKER_CLIENT.get().unwrap())
}

View File

@@ -7,6 +7,7 @@ use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -50,6 +51,7 @@ struct State {
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: &'static storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
}
@@ -58,6 +60,7 @@ impl State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: &'static storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
@@ -69,6 +72,7 @@ impl State {
auth,
allowlist_routes,
remote_storage,
broker_client,
disk_usage_eviction_state,
})
}
@@ -270,6 +274,8 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
let state = get_state(&request);
async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
match tenant.create_timeline(
@@ -277,6 +283,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
state.broker_client,
&ctx,
)
.await {
@@ -404,6 +411,7 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
// XXX: Attach should provide the config, especially during tenant migration.
// See https://github.com/neondatabase/neon/issues/1555
TenantConfOpt::default(),
state.broker_client,
remote_storage.clone(),
&ctx,
)
@@ -453,9 +461,15 @@ async fn tenant_load_handler(request: Request<Body>) -> Result<Response<Body>, A
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
mgr::load_tenant(state.conf, tenant_id, state.remote_storage.clone(), &ctx)
.instrument(info_span!("load", tenant = %tenant_id))
.await?;
mgr::load_tenant(
state.conf,
tenant_id,
state.broker_client,
state.remote_storage.clone(),
&ctx,
)
.instrument(info_span!("load", tenant = %tenant_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
}
@@ -740,6 +754,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
state.conf,
tenant_conf,
target_tenant_id,
state.broker_client,
state.remote_storage.clone(),
&ctx,
)
@@ -1095,6 +1110,7 @@ pub fn make_router(
conf: &'static PageServerConf,
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<JwtAuth>>,
broker_client: &'static BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
@@ -1141,8 +1157,14 @@ pub fn make_router(
Ok(router
.data(Arc::new(
State::new(conf, auth, remote_storage, disk_usage_eviction_state)
.context("Failed to initialize router state")?,
State::new(
conf,
auth,
remote_storage,
broker_client,
disk_usage_eviction_state,
)
.context("Failed to initialize router state")?,
))
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
.put(

View File

@@ -172,6 +172,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
///
pub async fn libpq_listener_main(
conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
@@ -213,7 +214,14 @@ pub async fn libpq_listener_main(
None,
"serving compute connection task",
false,
page_service_conn_main(conf, local_auth, socket, auth_type, connection_ctx),
page_service_conn_main(
conf,
broker_client,
local_auth,
socket,
auth_type,
connection_ctx,
),
);
}
Err(err) => {
@@ -230,6 +238,7 @@ pub async fn libpq_listener_main(
async fn page_service_conn_main(
conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
@@ -266,7 +275,7 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx);
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
@@ -324,6 +333,7 @@ impl PageRequestMetrics {
struct PageServerHandler {
_conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
claims: Option<Claims>,
@@ -337,11 +347,13 @@ struct PageServerHandler {
impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
connection_ctx: RequestContext,
) -> Self {
PageServerHandler {
_conf: conf,
broker_client,
auth,
claims: None,
connection_ctx,
@@ -494,7 +506,7 @@ impl PageServerHandler {
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, self.broker_client, &ctx)
.await?;
// Read the end of the tar archive.

View File

@@ -16,6 +16,7 @@ use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing::*;
@@ -238,6 +239,7 @@ impl UninitializedTimeline<'_> {
self,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: &'static storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
@@ -264,7 +266,7 @@ impl UninitializedTimeline<'_> {
// updated it for the layers that we created during the import.
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
tl.activate(ctx)?;
tl.activate(broker_client, ctx)?;
Ok(tl)
}
@@ -585,6 +587,7 @@ impl Tenant {
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: &'static storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -614,10 +617,13 @@ impl Tenant {
"attach tenant",
false,
async move {
match tenant_clone.attach(&ctx).await {
Ok(_) => {
tenant_clone.activate(&ctx).unwrap(); // WIP
}
let doit = async {
tenant_clone.attach(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match doit.await {
Ok(_) => {}
Err(e) => {
tenant_clone.set_broken(e.to_string());
error!("error attaching tenant: {:?}", e);
@@ -845,6 +851,7 @@ impl Tenant {
pub fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: &'static storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Arc<Tenant> {
@@ -879,10 +886,13 @@ impl Tenant {
"initial tenant load",
false,
async move {
match tenant_clone.load(&ctx).await {
Ok(()) => {
tenant_clone.activate(&ctx).unwrap(); // WIP
}
let doit = async {
tenant_clone.load(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match doit.await {
Ok(()) => {}
Err(err) => {
tenant_clone.set_broken(err.to_string());
error!("could not load tenant {tenant_id}: {err:?}");
@@ -902,7 +912,7 @@ impl Tenant {
/// Background task to load in-memory data structures for this tenant, from
/// files on disk. Used at pageserver startup.
///
#[instrument(skip(self, ctx), fields(tenant_id=%self.tenant_id))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
info!("loading tenant task");
@@ -1216,6 +1226,7 @@ impl Tenant {
ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
broker_client: &'static storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Option<Arc<Timeline>>> {
anyhow::ensure!(
@@ -1282,7 +1293,7 @@ impl Tenant {
}
};
loaded_timeline.activate(ctx).context("activate timeline")?;
loaded_timeline.activate(broker_client, ctx)?;
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
@@ -1587,7 +1598,11 @@ impl Tenant {
}
/// Changes tenant status to active, unless shutdown was already requested.
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
fn activate(
&self,
broker_client: &'static BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let mut result = Ok(());
@@ -1627,7 +1642,7 @@ impl Tenant {
for timeline in not_broken_timelines {
match timeline
.activate(ctx)
.activate(broker_client, ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {

View File

@@ -58,9 +58,10 @@ static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::
/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
#[instrument(skip(conf, remote_storage))]
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: &'static storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
@@ -116,6 +117,7 @@ pub async fn init_tenant_mgr(
match schedule_local_tenant_processing(
conf,
&tenant_dir_path,
broker_client,
remote_storage.clone(),
&ctx,
) {
@@ -150,6 +152,7 @@ pub async fn init_tenant_mgr(
pub fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
broker_client: &'static storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -186,7 +189,7 @@ pub fn schedule_local_tenant_processing(
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
if let Some(remote_storage) = remote_storage {
match Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx) {
match Tenant::spawn_attach(conf, tenant_id, broker_client, remote_storage, ctx) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
@@ -204,7 +207,7 @@ pub fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(conf, tenant_id, remote_storage, ctx)
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
};
Ok(tenant)
}
@@ -275,6 +278,7 @@ pub async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
broker_client: &'static storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
@@ -287,7 +291,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -404,6 +408,7 @@ pub async fn detach_tenant(
pub async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: &'static storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
@@ -415,7 +420,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, remote_storage, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -472,6 +477,7 @@ pub async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_conf: TenantConfOpt,
broker_client: &'static storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
@@ -487,7 +493,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, Some(remote_storage), ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

View File

@@ -32,7 +32,6 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::broker_client::{get_broker_client, is_broker_client_initialized};
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
@@ -897,15 +896,12 @@ impl Timeline {
Ok(())
}
pub fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
if is_broker_client_initialized() {
self.launch_wal_receiver(ctx, get_broker_client().clone())?;
} else if cfg!(test) {
info!("not launching WAL receiver because broker client hasn't been initialized");
} else {
anyhow::bail!("broker client not initialized");
}
pub fn activate(
self: &Arc<Self>,
broker_client: &'static BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.launch_wal_receiver(ctx, (*broker_client).clone())?;
self.set_state(TimelineState::Active);
self.launch_eviction_task();
Ok(())