diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 32d3fca47c..6c774ae1ae 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -297,10 +297,20 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
         })
         .transpose()
         .context("Failed to init generic remote storage")?;
-    {
-        let _rt_guard = BACKGROUND_RUNTIME.enter();
-        tenant_mgr::init_tenant_mgr(conf, remote_storage.clone())?
-    };
+
+    let (init_result_sender, init_result_receiver) =
+        std::sync::mpsc::channel::<anyhow::Result<()>>();
+    let storage_for_spawn = remote_storage.clone();
+    let _handler = BACKGROUND_RUNTIME.spawn(async move {
+        let result = tenant_mgr::init_tenant_mgr(conf, storage_for_spawn).await;
+        init_result_sender.send(result)
+    });
+    match init_result_receiver.recv() {
+        Ok(init_result) => init_result.context("Failed to init tenant_mgr")?,
+        Err(_sender_dropped_err) => {
+            anyhow::bail!("Failed to init tenant_mgr: no init status was returned");
+        }
+    }
 
     // Spawn all HTTP related tasks in the MGMT_REQUEST_RUNTIME.
     // bind before launching separate thread so the error reported before startup exits
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 32f96b3c5c..db262598d7 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -5,7 +5,6 @@ use hyper::StatusCode;
 use hyper::{Body, Request, Response, Uri};
 use pageserver_api::models::TenantState;
 use remote_storage::GenericRemoteStorage;
-use tokio::task::JoinError;
 use tracing::*;
 
 use super::models::{
@@ -189,7 +188,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response
     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
     check_permission(&request, Some(tenant_id))?;
 
-    let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
+    let tenant = tenant_mgr::get_tenant(tenant_id, true)
+        .await
+        .map_err(ApiError::NotFound)?;
     match tenant.create_timeline(
         request_data.new_timeline_id.map(TimelineId::from),
         request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -213,26 +214,30 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
         query_param_present(&request, "include-non-incremental-physical-size");
     check_permission(&request, Some(tenant_id))?;
 
-    let _entered = info_span!("timeline_list", tenant = %tenant_id).entered();
+    let response_data = async {
+        let tenant = tenant_mgr::get_tenant(tenant_id, true)
+            .await
+            .map_err(ApiError::NotFound)?;
+        let timelines = tenant.list_timelines();
 
-    let (tenant_state, timelines) = {
-        let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
-        (tenant.current_state(), tenant.list_timelines())
-    };
+        let mut response_data = Vec::with_capacity(timelines.len());
+        for timeline in timelines {
+            let timeline_info = build_timeline_info(
+                tenant.current_state(),
+                &timeline,
+                include_non_incremental_logical_size,
+                include_non_incremental_physical_size,
+            )
+            .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
+            .map_err(ApiError::InternalServerError)?;
 
-    let mut response_data = Vec::with_capacity(timelines.len());
-    for timeline in timelines {
-        let timeline_info = build_timeline_info(
-            tenant_state,
-            &timeline,
-            include_non_incremental_logical_size,
-            include_non_incremental_physical_size,
-        )
-        .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
-        .map_err(ApiError::InternalServerError)?;
+            response_data.push(timeline_info);
+        }
 
-        response_data.push(timeline_info);
+        Ok(response_data)
     }
+    .instrument(info_span!("timeline_list", tenant = %tenant_id))
+    .await?;
 
     json_response(StatusCode::OK, response_data)
 }
@@ -281,20 +286,16 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
     check_permission(&request, Some(tenant_id))?;
 
-    let _entered =
-        info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id)
-            .entered();
-
-    let (tenant_state, timeline) = {
-        let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
-        let timeline = tenant
-            .get_timeline(timeline_id)
-            .map_err(ApiError::NotFound)?;
-        (tenant.current_state(), timeline)
-    };
-
-    let timeline_info = build_timeline_info(tenant_state, &timeline, false, false)
-        .context("Failed to get local timeline info")
-        .map_err(ApiError::InternalServerError)?;
+    let timeline_info = async {
+        let tenant = tenant_mgr::get_tenant(tenant_id, false)
+            .await
+            .map_err(ApiError::NotFound)?;
+        let timeline = tenant.get_timeline(timeline_id).map_err(ApiError::NotFound)?;
+        build_timeline_info(tenant.current_state(), &timeline, false, false)
+            .context("Failed to get local timeline info")
+            .map_err(ApiError::InternalServerError)
+    }
+    .instrument(info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id))
+    .await?;
 
     json_response(StatusCode::OK, timeline_info)
 }
@@ -389,20 +390,18 @@
 async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permission(&request, None)?;
 
-    let response_data = tokio::task::spawn_blocking(move || {
-        let _enter = info_span!("tenant_list").entered();
-        tenant_mgr::list_tenants()
-            .iter()
-            .map(|(id, state)| TenantInfo {
-                id: *id,
-                state: *state,
-                current_physical_size: None,
-                has_in_progress_downloads: Some(state.has_in_progress_downloads()),
-            })
-            .collect::<Vec<TenantInfo>>()
-    })
-    .await
-    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
+    let response_data = tenant_mgr::list_tenants()
+        .instrument(info_span!("tenant_list"))
+        .await
+        .iter()
+        .map(|(id, state)| TenantInfo {
+            id: *id,
+            state: *state,
+            current_physical_size: None,
+            has_in_progress_downloads: Some(state.has_in_progress_downloads()),
+        })
+        .collect::<Vec<TenantInfo>>();
 
     json_response(StatusCode::OK, response_data)
 }
@@ -417,9 +416,8 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;
 
-    let tenant_info = tokio::task::spawn_blocking(move || {
-        let _enter = info_span!("tenant_status_handler", tenant = %tenant_id).entered();
-        let tenant = tenant_mgr::get_tenant(tenant_id, false)?;
+    let tenant_info = async {
+        let tenant = tenant_mgr::get_tenant(tenant_id, false).await?;
 
         // Calculate total physical size of all timelines
         let mut current_physical_size = 0;
@@ -428,17 +426,15 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
         }
 
         let state = tenant.current_state();
-        let tenant_info = TenantInfo {
+        Ok(TenantInfo {
             id: tenant_id,
             state,
             current_physical_size: Some(current_physical_size),
             has_in_progress_downloads: Some(state.has_in_progress_downloads()),
-        };
-
-        Ok::<_, anyhow::Error>(tenant_info)
-    })
+        })
+    }
+    .instrument(info_span!("tenant_status_handler", tenant = %tenant_id))
     .await
-    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?
     .map_err(ApiError::InternalServerError)?;
 
     json_response(StatusCode::OK, tenant_info)
@@ -448,7 +444,9 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;
 
-    let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::InternalServerError)?;
+    let tenant = tenant_mgr::get_tenant(tenant_id, true)
+        .await
+        .map_err(ApiError::InternalServerError)?;
 
     // this can be long operation, it currently is not backed by any request coalescing or similar
     let inputs = tenant
@@ -565,22 +563,19 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
 
     let state = get_state(&request);
 
-    let new_tenant = tokio::task::spawn_blocking(move || {
-        let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
-        tenant_mgr::create_tenant(
-            state.conf,
-            tenant_conf,
-            target_tenant_id,
-            state.remote_storage.clone(),
-        )
-    })
-    .await
-    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?
-    .map_err(ApiError::InternalServerError)?;
+    let new_tenant = tenant_mgr::create_tenant(
+        state.conf,
+        tenant_conf,
+        target_tenant_id,
+        state.remote_storage.clone(),
+    )
+    .instrument(info_span!("tenant_create", tenant = ?target_tenant_id))
+    .await
+    .map_err(ApiError::InternalServerError)?;
 
     match new_tenant {
         Some(tenant) => json_response(
             StatusCode::CREATED,
             TenantCreateResponse(tenant.tenant_id()),
         ),
         None => json_response(StatusCode::CONFLICT, ()),
@@ -671,17 +666,13 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
     let tenant_id = request_data.tenant_id;
     check_permission(&request, Some(tenant_id))?;
 
     let tenant_conf = TenantConfOpt::try_from(&request_data).map_err(ApiError::BadRequest)?;
     let state = get_state(&request);
-    tokio::task::spawn_blocking(move || {
-        let _enter = info_span!("tenant_config", tenant = ?tenant_id).entered();
-
-        tenant_mgr::update_tenant_config(state.conf, tenant_conf, tenant_id)
-    })
-    .await
-    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?
-    .map_err(ApiError::InternalServerError)?;
+    tenant_mgr::update_tenant_config(state.conf, tenant_conf, tenant_id)
+        .instrument(info_span!("tenant_config", tenant = ?tenant_id))
+        .await
+        .map_err(ApiError::InternalServerError)?;
 
     json_response(StatusCode::OK, ())
 }
 
@@ -712,7 +703,7 @@ async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body
     check_permission(&request, Some(tenant_id))?;
 
     let gc_req: TimelineGcRequest = json_request(&mut request).await?;
 
-    let wait_task_done = tenant_mgr::immediate_gc(tenant_id, timeline_id, gc_req)?;
+    let wait_task_done = tenant_mgr::immediate_gc(tenant_id, timeline_id, gc_req).await?;
     let gc_result = wait_task_done
         .await
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1136,7 +1136,7 @@
 }
 
 async fn get_active_tenant_with_timeout(tenant_id: TenantId) -> anyhow::Result<Arc<Tenant>> {
-    let tenant = tenant_mgr::get_tenant(tenant_id, false)?;
+    let tenant = tenant_mgr::get_tenant(tenant_id, false).await?;
     match tokio::time::timeout(Duration::from_secs(30), tenant.wait_to_become_active()).await {
         Ok(wait_result) => wait_result
             // no .context(), the error message is good enough and some tests depend on it
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index 70de713a26..bd765dabf8 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -1,13 +1,15 @@
 //! This module acts as a switchboard to access different repositories managed by this
 //! page server.
 
-use std::collections::hash_map;
+use std::collections::{hash_map, HashMap};
 use std::ffi::OsStr;
-use std::fs;
 use std::path::Path;
 use std::sync::Arc;
+use tokio::fs;
 
 use anyhow::Context;
+use once_cell::sync::Lazy;
+use tokio::sync::RwLock;
 use tracing::*;
 
 use remote_storage::GenericRemoteStorage;
@@ -20,86 +22,49 @@ use crate::tenant_config::TenantConfOpt;
 use utils::fs_ext::PathExt;
 use utils::id::{TenantId, TimelineId};
 
-mod tenants_state {
-    use once_cell::sync::Lazy;
-    use std::{
-        collections::HashMap,
-        sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
-    };
-    use utils::id::TenantId;
-
-    use crate::tenant::Tenant;
-
-    static TENANTS: Lazy<RwLock<HashMap<TenantId, Arc<Tenant>>>> =
-        Lazy::new(|| RwLock::new(HashMap::new()));
-
-    pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<TenantId, Arc<Tenant>>> {
-        TENANTS
-            .read()
-            .expect("Failed to read() tenants lock, it got poisoned")
-    }
-
-    pub(super) fn write_tenants() -> RwLockWriteGuard<'static, HashMap<TenantId, Arc<Tenant>>> {
-        TENANTS
-            .write()
-            .expect("Failed to write() tenants lock, it got poisoned")
-    }
-}
+static TENANTS: Lazy<RwLock<HashMap<TenantId, Arc<Tenant>>>> =
+    Lazy::new(|| RwLock::new(HashMap::new()));
 
 /// Initialize repositories with locally available timelines.
 /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
 /// are scheduled for download and added to the tenant once download is completed.
-pub fn init_tenant_mgr(
+#[instrument(skip(conf, remote_storage))]
+pub async fn init_tenant_mgr(
     conf: &'static PageServerConf,
     remote_storage: Option<GenericRemoteStorage>,
 ) -> anyhow::Result<()> {
-    let _entered = info_span!("init_tenant_mgr").entered();
-
-
     // Scan local filesystem for attached tenants
     let mut number_of_tenants = 0;
     let tenants_dir = conf.tenants_path();
-    for dir_entry in std::fs::read_dir(&tenants_dir)
-        .with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))?
-    {
-        match &dir_entry {
-            Ok(dir_entry) => {
+
+    let mut dir_entries = fs::read_dir(&tenants_dir)
+        .await
+        .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
+
+    loop {
+        match dir_entries.next_entry().await {
+            Ok(None) => break,
+            Ok(Some(dir_entry)) => {
                 let tenant_dir_path = dir_entry.path();
                 if crate::is_temporary(&tenant_dir_path) {
-                    info!(
-                        "Found temporary tenant directory, removing: {}",
-                        tenant_dir_path.display()
-                    );
-                    if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
-                        error!(
-                            "Failed to remove temporary directory '{}': {:?}",
-                            tenant_dir_path.display(),
-                            e
-                        );
+                    info!("Found temporary tenant directory, removing: {tenant_dir_path:?}",);
+                    if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
+                        error!("Failed to remove temporary directory {tenant_dir_path:?}: {e:?}");
                     }
                 } else {
                     match load_local_tenant(conf, &tenant_dir_path, remote_storage.clone()) {
-                        Ok(Some(tenant)) => {
-                            tenants_state::write_tenants().insert(tenant.tenant_id(), tenant);
-                            number_of_tenants += 1;
-                        }
-                        Ok(None) => {
-                            // This case happens if we crash during attach before creating the attach marker file
-                            if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
-                                error!(
-                                    "Failed to remove empty tenant directory '{}': {e:#}",
-                                    tenant_dir_path.display()
-                                )
+                        Ok(Some(tenant)) => {
+                            TENANTS.write().await.insert(tenant.tenant_id(), tenant);
+                            number_of_tenants += 1;
+                        }
+                        Ok(None) => {
+                            // This case happens if we crash during attach before creating the attach marker file
+                            if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
+                                error!("Failed to remove empty tenant directory {tenant_dir_path:?}: {e:#}")
+                            }
+                        }
+                        Err(e) => error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}"),
                     }
-                        Err(e) => {
-                            error!(
-                                "Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}",
-                                tenants_dir.display(),
-                                dir_entry,
-                                e,
-                            );
-                        }
-                    }
                 }
             }
             Err(e) => {
@@ -107,10 +72,7 @@ pub fn init_tenant_mgr(
                 // here, the pageserver startup fails altogether, causing outage for *all*
                 // tenants. That seems worse.
                 error!(
-                    "Failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
-                    dir_entry,
-                    tenants_dir.display(),
-                    e,
+                    "Failed to list tenants dir entry in directory {tenants_dir:?}, reason: {e:?}"
                 );
             }
         }
@@ -165,7 +127,7 @@ fn load_local_tenant(
 ///
 pub async fn shutdown_all_tenants() {
     let tenants_to_shut_down = {
-        let mut m = tenants_state::write_tenants();
+        let mut m = TENANTS.write().await;
         let mut tenants_to_shut_down = Vec::with_capacity(m.len());
         for (_, tenant) in m.drain() {
             if tenant.is_active() {
@@ -199,13 +161,13 @@ pub async fn shutdown_all_tenants() {
     }
 }
 
-pub fn create_tenant(
+pub async fn create_tenant(
     conf: &'static PageServerConf,
     tenant_conf: TenantConfOpt,
     tenant_id: TenantId,
     remote_storage: Option<GenericRemoteStorage>,
 ) -> anyhow::Result<Option<Arc<Tenant>>> {
-    match tenants_state::write_tenants().entry(tenant_id) {
+    match TENANTS.write().await.entry(tenant_id) {
         hash_map::Entry::Occupied(_) => {
             debug!("tenant {tenant_id} already exists");
             Ok(None)
@@ -238,21 +200,23 @@
     }
 }
 
-pub fn update_tenant_config(
+pub async fn update_tenant_config(
     conf: &'static PageServerConf,
     tenant_conf: TenantConfOpt,
     tenant_id: TenantId,
 ) -> anyhow::Result<()> {
     info!("configuring tenant {tenant_id}");
-    get_tenant(tenant_id, true)?.update_tenant_config(tenant_conf);
+    get_tenant(tenant_id, true)
+        .await?
+        .update_tenant_config(tenant_conf);
     Tenant::persist_tenant_config(&conf.tenant_config_path(tenant_id), tenant_conf, false)?;
     Ok(())
 }
 
 /// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
 /// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
-pub fn get_tenant(tenant_id: TenantId, active_only: bool) -> anyhow::Result<Arc<Tenant>> {
-    let m = tenants_state::read_tenants();
+pub async fn get_tenant(tenant_id: TenantId, active_only: bool) -> anyhow::Result<Arc<Tenant>> {
+    let m = TENANTS.read().await;
     let tenant = m
         .get(&tenant_id)
         .with_context(|| format!("Tenant {tenant_id} not found in the local state"))?;
@@ -288,7 +252,7 @@ pub async fn delete_timeline(tenant_id: TenantId, timeline_id: TimelineId) -> an
     info!("waiting for timeline tasks to shutdown");
     task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
     info!("timeline task shutdown completed");
-    match get_tenant(tenant_id, true) {
+    match get_tenant(tenant_id, true).await {
         Ok(tenant) => {
             tenant.delete_timeline(timeline_id).await?;
         }
@@ -303,7 +267,7 @@ pub async fn detach_tenant(
     tenant_id: TenantId,
 ) -> anyhow::Result<()> {
     let tenant = match {
-        let mut tenants_accessor = tenants_state::write_tenants();
+        let mut tenants_accessor = TENANTS.write().await;
         tenants_accessor.remove(&tenant_id)
     } {
         Some(tenant) => tenant,
@@ -321,12 +285,14 @@ pub async fn detach_tenant(
     // we will attempt to remove files which no longer exist. This can be fixed by having shutdown
     // mechanism for tenant that will clean temporary data to avoid any references to ephemeral files
     let local_tenant_directory = conf.tenant_path(&tenant_id);
-    fs::remove_dir_all(&local_tenant_directory).with_context(|| {
-        format!(
-            "Failed to remove local tenant directory '{}'",
-            local_tenant_directory.display()
-        )
-    })?;
+    fs::remove_dir_all(&local_tenant_directory)
+        .await
+        .with_context(|| {
+            format!(
+                "Failed to remove local tenant directory '{}'",
+                local_tenant_directory.display()
+            )
+        })?;
 
     Ok(())
 }
@@ -334,8 +300,10 @@ pub async fn detach_tenant(
 ///
 /// Get list of tenants, for the mgmt API
 ///
-pub fn list_tenants() -> Vec<(TenantId, TenantState)> {
-    tenants_state::read_tenants()
+pub async fn list_tenants() -> Vec<(TenantId, TenantState)> {
+    TENANTS
+        .read()
+        .await
         .iter()
         .map(|(id, tenant)| (*id, tenant.current_state()))
         .collect()
@@ -350,7 +318,7 @@ pub async fn attach_tenant(
     tenant_id: TenantId,
     remote_storage: &GenericRemoteStorage,
 ) -> anyhow::Result<()> {
-    match tenants_state::write_tenants().entry(tenant_id) {
+    match TENANTS.write().await.entry(tenant_id) {
         hash_map::Entry::Occupied(e) => {
             // Cannot attach a tenant that already exists. The error message depends on
             // the state it's in.
@@ -378,12 +346,12 @@ use {
 };
 
 #[cfg(feature = "testing")]
-pub fn immediate_gc(
+pub async fn immediate_gc(
     tenant_id: TenantId,
     timeline_id: TimelineId,
     gc_req: TimelineGcRequest,
 ) -> Result<tokio::sync::oneshot::Receiver<anyhow::Result<GcResult>>, ApiError> {
-    let guard = tenants_state::read_tenants();
+    let guard = TENANTS.read().await;
 
     let tenant = guard
         .get(&tenant_id)
diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs
index d17f0eed43..d3aec933c2 100644
--- a/pageserver/src/tenant_tasks.rs
+++ b/pageserver/src/tenant_tasks.rs
@@ -155,7 +155,7 @@ async fn wait_for_active_tenant(
     wait: Duration,
 ) -> ControlFlow<(), Arc<Tenant>> {
     let tenant = loop {
-        match tenant_mgr::get_tenant(tenant_id, false) {
+        match tenant_mgr::get_tenant(tenant_id, false).await {
             Ok(tenant) => break tenant,
             Err(e) => {
                 error!("Failed to get a tenant {tenant_id}: {e:#}");