refactor: less static mutexes

This commit is contained in:
Joonas Koivunen
2023-03-27 20:02:41 +03:00
committed by Joonas Koivunen
parent 0462563d31
commit 0a5043fae5
3 changed files with 47 additions and 11 deletions

View File

@@ -8,7 +8,9 @@ use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use fail::FailScenario;
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::disk_usage_eviction_task::launch_disk_usage_global_eviction_task;
use pageserver::disk_usage_eviction_task::{
launch_disk_usage_global_eviction_task, DiskUsageEvictionState,
};
use remote_storage::GenericRemoteStorage;
use tracing::*;
@@ -320,8 +322,17 @@ fn start_pageserver(
// Scan the local 'tenants/' directory and start loading the tenants
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(conf, remote_storage.clone()))?;
// shared state between the background task and the http endpoint; note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
// been configured.
let disk_usage_eviction_state: Arc<DiskUsageEvictionState> = Arc::default();
if let Some(remote_storage) = &remote_storage {
launch_disk_usage_global_eviction_task(conf, remote_storage.clone())?;
launch_disk_usage_global_eviction_task(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
)?;
}
// Start up the service to handle HTTP mgmt API request. We created the
@@ -329,9 +340,15 @@ fn start_pageserver(
{
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
let router = http::make_router(conf, launch_ts, http_auth, remote_storage)?
.build()
.map_err(|err| anyhow!(err))?;
let router = http::make_router(
conf,
launch_ts,
http_auth,
remote_storage,
disk_usage_eviction_state,
)?
.build()
.map_err(|err| anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?
.serve(service)

View File

@@ -70,9 +70,16 @@ pub struct DiskUsageEvictionTaskConfig {
pub period: Duration,
}
#[derive(Default)]
pub struct DiskUsageEvictionState {
/// Exclude http requests and background task from running at the same time.
mutex: tokio::sync::Mutex<()>,
}
pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<DiskUsageEvictionState>,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
@@ -100,6 +107,7 @@ pub fn launch_disk_usage_global_eviction_task(
false,
async move {
disk_usage_eviction_task(
&*state,
task_config,
storage,
tenants_dir_fd,
@@ -116,6 +124,7 @@ pub fn launch_disk_usage_global_eviction_task(
#[instrument(skip_all)]
async fn disk_usage_eviction_task(
state: &DiskUsageEvictionState,
task_config: &DiskUsageEvictionTaskConfig,
storage: GenericRemoteStorage,
tenants_dir_fd: Dir,
@@ -147,6 +156,7 @@ async fn disk_usage_eviction_task(
async {
let res = disk_usage_eviction_task_iteration(
state,
task_config,
&storage,
&mut tenants_dir_fd,
@@ -182,6 +192,7 @@ pub trait Usage: Clone + Copy + std::fmt::Debug {
}
async fn disk_usage_eviction_task_iteration(
state: &DiskUsageEvictionState,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir_fd: &mut SyncWrapper<Dir>,
@@ -189,7 +200,7 @@ async fn disk_usage_eviction_task_iteration(
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir_fd, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res = disk_usage_eviction_task_iteration_impl(storage, usage_pre, cancel).await;
let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
match res {
Ok(outcome) => {
debug!(?outcome, "disk_usage_eviction_iteration finished");
@@ -273,14 +284,13 @@ struct LayerCount {
#[allow(clippy::needless_late_init)]
pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &DiskUsageEvictionState,
storage: &GenericRemoteStorage,
usage_pre: U,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
static MUTEX: once_cell::sync::Lazy<tokio::sync::Mutex<()>> =
once_cell::sync::Lazy::new(|| tokio::sync::Mutex::new(()));
let _g = MUTEX
let _g = state
.mutex
.try_lock()
.map_err(|_| anyhow::anyhow!("iteration is already executing"))?;

View File

@@ -18,6 +18,7 @@ use super::models::{
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::disk_usage_eviction_task::DiskUsageEvictionState;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
@@ -48,6 +49,7 @@ struct State {
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
disk_usage_eviction_state: Arc<DiskUsageEvictionState>,
}
impl State {
@@ -55,6 +57,7 @@ impl State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
disk_usage_eviction_state: Arc<DiskUsageEvictionState>,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
.iter()
@@ -65,6 +68,7 @@ impl State {
auth,
allowlist_routes,
remote_storage,
disk_usage_eviction_state,
})
}
}
@@ -1124,6 +1128,8 @@ async fn disk_usage_eviction_run(mut r: Request<Body>) -> Result<Response<Body>,
)))
};
let state = state.disk_usage_eviction_state.clone();
let cancel = CancellationToken::new();
let child_cancel = cancel.clone();
let _g = cancel.drop_guard();
@@ -1137,6 +1143,7 @@ async fn disk_usage_eviction_run(mut r: Request<Body>) -> Result<Response<Body>,
false,
async move {
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state,
&storage,
usage,
&child_cancel,
@@ -1168,6 +1175,7 @@ pub fn make_router(
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
disk_usage_eviction_state: Arc<DiskUsageEvictionState>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
let spec = include_bytes!("openapi_spec.yml");
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
@@ -1212,7 +1220,8 @@ pub fn make_router(
Ok(router
.data(Arc::new(
State::new(conf, auth, remote_storage).context("Failed to initialize router state")?,
State::new(conf, auth, remote_storage, disk_usage_eviction_state)
.context("Failed to initialize router state")?,
))
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
.put(