From 0a5043fae571d47dfc4fd9e89b9bb5c4d6fd9dd3 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 27 Mar 2023 20:02:41 +0300 Subject: [PATCH] refactor: less static mutexes --- pageserver/src/bin/pageserver.rs | 27 ++++++++++++++++++---- pageserver/src/disk_usage_eviction_task.rs | 20 ++++++++++++---- pageserver/src/http/routes.rs | 11 ++++++++- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index f2f318adda..1400ef0e31 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -8,7 +8,9 @@ use anyhow::{anyhow, Context}; use clap::{Arg, ArgAction, Command}; use fail::FailScenario; use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp}; -use pageserver::disk_usage_eviction_task::launch_disk_usage_global_eviction_task; +use pageserver::disk_usage_eviction_task::{ + launch_disk_usage_global_eviction_task, DiskUsageEvictionState, +}; use remote_storage::GenericRemoteStorage; use tracing::*; @@ -320,8 +322,17 @@ fn start_pageserver( // Scan the local 'tenants/' directory and start loading the tenants BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(conf, remote_storage.clone()))?; + // shared state between the background task and the http endpoint; note that the http endpoint + // is still accessible even if background task is not configured as long as remote storage has + // been configured. + let disk_usage_eviction_state: Arc = Arc::default(); + if let Some(remote_storage) = &remote_storage { - launch_disk_usage_global_eviction_task(conf, remote_storage.clone())?; + launch_disk_usage_global_eviction_task( + conf, + remote_storage.clone(), + disk_usage_eviction_state.clone(), + )?; } // Start up the service to handle HTTP mgmt API request. We created the @@ -329,9 +340,15 @@ fn start_pageserver( { let _rt_guard = MGMT_REQUEST_RUNTIME.enter(); - let router = http::make_router(conf, launch_ts, http_auth, remote_storage)? - .build() - .map_err(|err| anyhow!(err))?; + let router = http::make_router( + conf, + launch_ts, + http_auth, + remote_storage, + disk_usage_eviction_state, + )? + .build() + .map_err(|err| anyhow!(err))?; let service = utils::http::RouterService::new(router).unwrap(); let server = hyper::Server::from_tcp(http_listener)? .serve(service) diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index a7ab5b8db7..38aeaf275a 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -70,9 +70,16 @@ pub struct DiskUsageEvictionTaskConfig { pub period: Duration, } +#[derive(Default)] +pub struct DiskUsageEvictionState { + /// Exclude http requests and background task from running at the same time. + mutex: tokio::sync::Mutex<()>, +} + pub fn launch_disk_usage_global_eviction_task( conf: &'static PageServerConf, storage: GenericRemoteStorage, + state: Arc, ) -> anyhow::Result<()> { let Some(task_config) = &conf.disk_usage_based_eviction else { info!("disk usage based eviction task not configured"); @@ -100,6 +107,7 @@ pub fn launch_disk_usage_global_eviction_task( false, async move { disk_usage_eviction_task( + &*state, task_config, storage, tenants_dir_fd, @@ -116,6 +124,7 @@ pub fn launch_disk_usage_global_eviction_task( #[instrument(skip_all)] async fn disk_usage_eviction_task( + state: &DiskUsageEvictionState, task_config: &DiskUsageEvictionTaskConfig, storage: GenericRemoteStorage, tenants_dir_fd: Dir, @@ -147,6 +156,7 @@ async fn disk_usage_eviction_task( async { let res = disk_usage_eviction_task_iteration( + state, task_config, &storage, &mut tenants_dir_fd, @@ -182,6 +192,7 @@ pub trait Usage: Clone + Copy + std::fmt::Debug { } async fn disk_usage_eviction_task_iteration( + state: &DiskUsageEvictionState, task_config: &DiskUsageEvictionTaskConfig, storage: &GenericRemoteStorage, tenants_dir_fd: &mut SyncWrapper, @@ -189,7 +200,7 @@ async fn disk_usage_eviction_task_iteration( ) -> anyhow::Result<()> { let usage_pre = filesystem_level_usage::get(tenants_dir_fd, task_config) .context("get filesystem-level disk usage before evictions")?; - let res = disk_usage_eviction_task_iteration_impl(storage, usage_pre, cancel).await; + let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await; match res { Ok(outcome) => { debug!(?outcome, "disk_usage_eviction_iteration finished"); @@ -273,14 +284,13 @@ struct LayerCount { #[allow(clippy::needless_late_init)] pub async fn disk_usage_eviction_task_iteration_impl( + state: &DiskUsageEvictionState, storage: &GenericRemoteStorage, usage_pre: U, cancel: &CancellationToken, ) -> anyhow::Result> { - static MUTEX: once_cell::sync::Lazy> = - once_cell::sync::Lazy::new(|| tokio::sync::Mutex::new(())); - - let _g = MUTEX + let _g = state + .mutex .try_lock() .map_err(|_| anyhow::anyhow!("iteration is already executing"))?; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 49be55f9e9..d0ed97fefe 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -18,6 +18,7 @@ use super::models::{ TimelineCreateRequest, TimelineGcRequest, TimelineInfo, }; use crate::context::{DownloadBehavior, RequestContext}; +use crate::disk_usage_eviction_task::DiskUsageEvictionState; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::TenantConfOpt; @@ -48,6 +49,7 @@ struct State { auth: Option>, allowlist_routes: Vec, remote_storage: Option, + disk_usage_eviction_state: Arc, } impl State { @@ -55,6 +57,7 @@ impl State { conf: &'static PageServerConf, auth: Option>, remote_storage: Option, + disk_usage_eviction_state: Arc, ) -> anyhow::Result { let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"] .iter() @@ -65,6 +68,7 @@ impl State { auth, allowlist_routes, remote_storage, + disk_usage_eviction_state, }) } } @@ -1124,6 +1128,8 @@ async fn disk_usage_eviction_run(mut r: Request) -> Result, ))) }; + let state = state.disk_usage_eviction_state.clone(); + let cancel = CancellationToken::new(); let child_cancel = cancel.clone(); let _g = cancel.drop_guard(); @@ -1137,6 +1143,7 @@ async fn disk_usage_eviction_run(mut r: Request) -> Result, false, async move { let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( + &state, &storage, usage, &child_cancel, @@ -1168,6 +1175,7 @@ pub fn make_router( launch_ts: &'static LaunchTimestamp, auth: Option>, remote_storage: Option, + disk_usage_eviction_state: Arc, ) -> anyhow::Result> { let spec = include_bytes!("openapi_spec.yml"); let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc"); @@ -1212,7 +1220,8 @@ pub fn make_router( Ok(router .data(Arc::new( - State::new(conf, auth, remote_storage).context("Failed to initialize router state")?, + State::new(conf, auth, remote_storage, disk_usage_eviction_state) + .context("Failed to initialize router state")?, )) .get("/v1/status", |r| RequestSpan(status_handler).handle(r)) .put(