Use async RwLock around tenants (#3009)
A step towards more async code in our repo, to help avoid most of the odd blocking calls that might deadlock, as mentioned in https://github.com/neondatabase/neon/issues/2975.
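The core of the change: the global tenants map is now guarded by a `tokio::sync::RwLock` instead of a `std::sync::RwLock`. A blocking `std` lock held across an `.await` point can stall the executor thread and deadlock; the async lock suspends the task instead, and has no poisoning to `expect()` on. A minimal sketch of the pattern (placeholder `Tenant` type and `u32` in place of `TenantId`; not the pageserver's real code):

use std::{collections::HashMap, sync::Arc};

use once_cell::sync::Lazy;
use tokio::sync::RwLock;

// Placeholder for the real `pageserver::tenant::Tenant`.
struct Tenant;

// Same shape as the `TENANTS` static introduced in the diff below.
static TENANTS: Lazy<RwLock<HashMap<u32, Arc<Tenant>>>> =
    Lazy::new(|| RwLock::new(HashMap::new()));

async fn count_tenants() -> usize {
    // Awaiting the lock yields to the runtime instead of blocking the thread.
    let guard = TENANTS.read().await;
    guard.len()
} // the read guard is dropped here, releasing the lock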
@@ -297,10 +297,20 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
         })
         .transpose()
         .context("Failed to init generic remote storage")?;
-    {
-        let _rt_guard = BACKGROUND_RUNTIME.enter();
-        tenant_mgr::init_tenant_mgr(conf, remote_storage.clone())?
-    };
+
+    let (init_result_sender, init_result_receiver) =
+        std::sync::mpsc::channel::<anyhow::Result<()>>();
+    let storage_for_spawn = remote_storage.clone();
+    let _handler = BACKGROUND_RUNTIME.spawn(async move {
+        let result = tenant_mgr::init_tenant_mgr(conf, storage_for_spawn).await;
+        init_result_sender.send(result)
+    });
+    match init_result_receiver.recv() {
+        Ok(init_result) => init_result.context("Failed to init tenant_mgr")?,
+        Err(_sender_dropped_err) => {
+            anyhow::bail!("Failed to init tenant_mgr: no init status was returned");
+        }
+    }

     // Spawn all HTTP related tasks in the MGMT_REQUEST_RUNTIME.
     // bind before launching separate thread so the error reported before startup exits
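Note on the startup hunk above: `start_pageserver` is a plain synchronous function, so it can no longer call the now-async `init_tenant_mgr` directly. It spawns the future onto the background runtime and blocks on a std `mpsc` channel until the task reports back. A self-contained sketch of that sync-to-async bridge (with a trivial `init` standing in for `tenant_mgr::init_tenant_mgr`):

use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

static BACKGROUND_RUNTIME: Lazy<Runtime> =
    Lazy::new(|| Runtime::new().expect("failed to create tokio runtime"));

// Stand-in for the async tenant manager initialization.
async fn init() -> anyhow::Result<()> {
    Ok(())
}

fn start() -> anyhow::Result<()> {
    let (tx, rx) = std::sync::mpsc::channel::<anyhow::Result<()>>();
    BACKGROUND_RUNTIME.spawn(async move {
        // A send error only means the receiver was dropped; nothing to do then.
        let _ = tx.send(init().await);
    });
    // Block the synchronous caller until the spawned task sends its result.
    match rx.recv() {
        Ok(init_result) => init_result,
        Err(_sender_dropped) => anyhow::bail!("init task exited without reporting a result"),
    }
}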
@@ -5,7 +5,6 @@ use hyper::StatusCode;
 use hyper::{Body, Request, Response, Uri};
 use pageserver_api::models::TenantState;
 use remote_storage::GenericRemoteStorage;
-use tokio::task::JoinError;
 use tracing::*;

 use super::models::{
@@ -189,7 +188,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
         .new_timeline_id
         .unwrap_or_else(TimelineId::generate);

-    let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
+    let tenant = tenant_mgr::get_tenant(tenant_id, true)
+        .await
+        .map_err(ApiError::NotFound)?;
     match tenant.create_timeline(
         new_timeline_id,
         request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -217,26 +218,30 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
         query_param_present(&request, "include-non-incremental-physical-size");
     check_permission(&request, Some(tenant_id))?;

-    let _entered = info_span!("timeline_list", tenant = %tenant_id).entered();
-
-    let (tenant_state, timelines) = {
-        let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
-        (tenant.current_state(), tenant.list_timelines())
-    };
-
-    let mut response_data = Vec::with_capacity(timelines.len());
-    for timeline in timelines {
-        let timeline_info = build_timeline_info(
-            tenant_state,
-            &timeline,
-            include_non_incremental_logical_size,
-            include_non_incremental_physical_size,
-        )
-        .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
-        .map_err(ApiError::InternalServerError)?;
-
-        response_data.push(timeline_info);
-    }
+    let response_data = async {
+        let tenant = tenant_mgr::get_tenant(tenant_id, true)
+            .await
+            .map_err(ApiError::NotFound)?;
+        let timelines = tenant.list_timelines();
+
+        let mut response_data = Vec::with_capacity(timelines.len());
+        for timeline in timelines {
+            let timeline_info = build_timeline_info(
+                tenant.current_state(),
+                &timeline,
+                include_non_incremental_logical_size,
+                include_non_incremental_physical_size,
+            )
+            .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
+            .map_err(ApiError::InternalServerError)?;
+
+            response_data.push(timeline_info);
+        }
+
+        Ok(response_data)
+    }
+    .instrument(info_span!("timeline_list", tenant = %tenant_id))
+    .await?;

     json_response(StatusCode::OK, response_data)
 }
@@ -281,20 +286,16 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permission(&request, Some(tenant_id))?;

     let timeline_info = async {
-        let (tenant_state, timeline) = tokio::task::spawn_blocking(move || {
-            let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
-            Ok((
-                tenant.current_state(),
-                tenant.get_timeline(timeline_id, false),
-            ))
-        })
-        .await
-        .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
+        let tenant = tenant_mgr::get_tenant(tenant_id, true)
+            .await
+            .map_err(ApiError::NotFound)?;

-        let timeline = timeline.map_err(ApiError::NotFound)?;
+        let timeline = tenant
+            .get_timeline(timeline_id, false)
+            .map_err(ApiError::NotFound)?;

         let timeline_info = build_timeline_info(
-            tenant_state,
+            tenant.current_state(),
             &timeline,
             include_non_incremental_logical_size,
             include_non_incremental_physical_size,
@@ -322,6 +323,7 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);

     let timeline = tenant_mgr::get_tenant(tenant_id, true)
+        .await
         .and_then(|tenant| tenant.get_timeline(timeline_id, true))
         .map_err(ApiError::NotFound)?;
     let result = match timeline
@@ -395,20 +397,17 @@ async fn tenant_detach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
 async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permission(&request, None)?;

-    let response_data = tokio::task::spawn_blocking(move || {
-        let _enter = info_span!("tenant_list").entered();
-        tenant_mgr::list_tenants()
-            .iter()
-            .map(|(id, state)| TenantInfo {
-                id: *id,
-                state: *state,
-                current_physical_size: None,
-                has_in_progress_downloads: Some(state.has_in_progress_downloads()),
-            })
-            .collect::<Vec<TenantInfo>>()
-    })
-    .await
-    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
+    let response_data = tenant_mgr::list_tenants()
+        .instrument(info_span!("tenant_list"))
+        .await
+        .iter()
+        .map(|(id, state)| TenantInfo {
+            id: *id,
+            state: *state,
+            current_physical_size: None,
+            has_in_progress_downloads: Some(state.has_in_progress_downloads()),
+        })
+        .collect::<Vec<TenantInfo>>();

     json_response(StatusCode::OK, response_data)
 }
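A pattern that repeats through these handler hunks: the `info_span!(...).entered()` guard (and the `spawn_blocking` wrapper it lived in) is replaced by `Instrument::instrument`. An entered span guard must not be held across an `.await`, while an instrumented future enters its span on every poll and exits it on every yield. A small sketch with a toy async body (assumes only the `tracing` crate):

use tracing::{info_span, Instrument};

async fn list_something() -> Vec<u32> {
    async {
        // Events emitted while this future runs carry the surrounding span.
        tracing::info!("inside the span");
        vec![1, 2, 3]
    }
    .instrument(info_span!("tenant_list"))
    .await
}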
@@ -417,9 +416,8 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;

-    let tenant_info = tokio::task::spawn_blocking(move || {
-        let _enter = info_span!("tenant_status_handler", tenant = %tenant_id).entered();
-        let tenant = tenant_mgr::get_tenant(tenant_id, false)?;
+    let tenant_info = async {
+        let tenant = tenant_mgr::get_tenant(tenant_id, false).await?;

         // Calculate total physical size of all timelines
         let mut current_physical_size = 0;
@@ -428,17 +426,15 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiError> {
         }

         let state = tenant.current_state();
-        let tenant_info = TenantInfo {
+        Ok(TenantInfo {
             id: tenant_id,
             state,
             current_physical_size: Some(current_physical_size),
             has_in_progress_downloads: Some(state.has_in_progress_downloads()),
-        };
-
-        Ok::<_, anyhow::Error>(tenant_info)
-    })
+        })
+    }
+    .instrument(info_span!("tenant_status_handler", tenant = %tenant_id))
     .await
-    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?
     .map_err(ApiError::InternalServerError)?;

     json_response(StatusCode::OK, tenant_info)
@@ -448,7 +444,9 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;

-    let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::InternalServerError)?;
+    let tenant = tenant_mgr::get_tenant(tenant_id, true)
+        .await
+        .map_err(ApiError::InternalServerError)?;

     // this can be long operation, it currently is not backed by any request coalescing or similar
     let inputs = tenant
@@ -565,22 +563,19 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
         .map(TenantId::from)
         .unwrap_or_else(TenantId::generate);

-    let new_tenant = tokio::task::spawn_blocking(move || {
-        let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
-        let state = get_state(&request);
+    let state = get_state(&request);

-        tenant_mgr::create_tenant(
-            state.conf,
-            tenant_conf,
-            target_tenant_id,
-            state.remote_storage.clone(),
-        )
-        // FIXME: `create_tenant` can fail from both user and internal errors. Replace this
-        // with better error handling once the type permits it
-        .map_err(ApiError::InternalServerError)
-    })
+    let new_tenant = tenant_mgr::create_tenant(
+        state.conf,
+        tenant_conf,
+        target_tenant_id,
+        state.remote_storage.clone(),
+    )
+    .instrument(info_span!("tenant_create", tenant = ?target_tenant_id))
     .await
-    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
+    // FIXME: `create_tenant` can fail from both user and internal errors. Replace this
+    // with better error handling once the type permits it
+    .map_err(ApiError::InternalServerError)?;

     Ok(match new_tenant {
         Some(tenant) => {
@@ -671,17 +666,13 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
         );
     }

-    tokio::task::spawn_blocking(move || {
-        let _enter = info_span!("tenant_config", tenant = ?tenant_id).entered();
-
-        let state = get_state(&request);
-        tenant_mgr::update_tenant_config(state.conf, tenant_conf, tenant_id)
-            // FIXME: `update_tenant_config` can fail because of both user and internal errors.
-            // Replace this `map_err` with better error handling once the type permits it
-            .map_err(ApiError::InternalServerError)
-    })
-    .await
-    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
+    let state = get_state(&request);
+    tenant_mgr::update_tenant_config(state.conf, tenant_conf, tenant_id)
+        .instrument(info_span!("tenant_config", tenant = ?tenant_id))
+        .await
+        // FIXME: `update_tenant_config` can fail because of both user and internal errors.
+        // Replace this `map_err` with better error handling once the type permits it
+        .map_err(ApiError::InternalServerError)?;

     json_response(StatusCode::OK, ())
 }
@@ -728,7 +719,7 @@ async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {

     let gc_req: TimelineGcRequest = json_request(&mut request).await?;

-    let wait_task_done = tenant_mgr::immediate_gc(tenant_id, timeline_id, gc_req)?;
+    let wait_task_done = tenant_mgr::immediate_gc(tenant_id, timeline_id, gc_req).await?;
     let gc_result = wait_task_done
         .await
         .context("wait for gc task")
@@ -745,7 +736,9 @@ async fn timeline_compact_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     check_permission(&request, Some(tenant_id))?;

-    let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
+    let tenant = tenant_mgr::get_tenant(tenant_id, true)
+        .await
+        .map_err(ApiError::NotFound)?;
     let timeline = tenant
         .get_timeline(timeline_id, true)
         .map_err(ApiError::NotFound)?;
@@ -764,7 +757,9 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     check_permission(&request, Some(tenant_id))?;

-    let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
+    let tenant = tenant_mgr::get_tenant(tenant_id, true)
+        .await
+        .map_err(ApiError::NotFound)?;
     let timeline = tenant
         .get_timeline(timeline_id, true)
         .map_err(ApiError::NotFound)?;
@@ -941,7 +941,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
 /// ensures that queries don't fail immediately after pageserver startup, because
 /// all tenants are still loading.
 async fn get_active_tenant_with_timeout(tenant_id: TenantId) -> Result<Arc<Tenant>> {
-    let tenant = tenant_mgr::get_tenant(tenant_id, false)?;
+    let tenant = tenant_mgr::get_tenant(tenant_id, false).await?;
     match tokio::time::timeout(Duration::from_secs(30), tenant.wait_to_become_active()).await {
         Ok(wait_result) => wait_result
             // no .context(), the error message is good enough and some tests depend on it
@@ -1,13 +1,15 @@
 //! This module acts as a switchboard to access different repositories managed by this
 //! page server.

-use std::collections::hash_map;
+use std::collections::{hash_map, HashMap};
 use std::ffi::OsStr;
-use std::fs;
 use std::path::Path;
 use std::sync::Arc;
+use tokio::fs;

 use anyhow::Context;
+use once_cell::sync::Lazy;
+use tokio::sync::RwLock;
 use tracing::*;

 use remote_storage::GenericRemoteStorage;
@@ -20,86 +22,49 @@ use crate::tenant_config::TenantConfOpt;
 use utils::fs_ext::PathExt;
 use utils::id::{TenantId, TimelineId};

-mod tenants_state {
-    use once_cell::sync::Lazy;
-    use std::{
-        collections::HashMap,
-        sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
-    };
-    use utils::id::TenantId;
-
-    use crate::tenant::Tenant;
-
-    static TENANTS: Lazy<RwLock<HashMap<TenantId, Arc<Tenant>>>> =
-        Lazy::new(|| RwLock::new(HashMap::new()));
-
-    pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<TenantId, Arc<Tenant>>> {
-        TENANTS
-            .read()
-            .expect("Failed to read() tenants lock, it got poisoned")
-    }
-
-    pub(super) fn write_tenants() -> RwLockWriteGuard<'static, HashMap<TenantId, Arc<Tenant>>> {
-        TENANTS
-            .write()
-            .expect("Failed to write() tenants lock, it got poisoned")
-    }
-}
+static TENANTS: Lazy<RwLock<HashMap<TenantId, Arc<Tenant>>>> =
+    Lazy::new(|| RwLock::new(HashMap::new()));

 /// Initialize repositories with locally available timelines.
 /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
 /// are scheduled for download and added to the tenant once download is completed.
-pub fn init_tenant_mgr(
+#[instrument(skip(conf, remote_storage))]
+pub async fn init_tenant_mgr(
     conf: &'static PageServerConf,
     remote_storage: Option<GenericRemoteStorage>,
 ) -> anyhow::Result<()> {
-    let _entered = info_span!("init_tenant_mgr").entered();
-
     // Scan local filesystem for attached tenants
     let mut number_of_tenants = 0;
     let tenants_dir = conf.tenants_path();
-    for dir_entry in std::fs::read_dir(&tenants_dir)
-        .with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))?
-    {
-        match &dir_entry {
-            Ok(dir_entry) => {
+
+    let mut dir_entries = fs::read_dir(&tenants_dir)
+        .await
+        .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
+
+    loop {
+        match dir_entries.next_entry().await {
+            Ok(None) => break,
+            Ok(Some(dir_entry)) => {
                 let tenant_dir_path = dir_entry.path();
                 if crate::is_temporary(&tenant_dir_path) {
-                    info!(
-                        "Found temporary tenant directory, removing: {}",
-                        tenant_dir_path.display()
-                    );
-                    if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
-                        error!(
-                            "Failed to remove temporary directory '{}': {:?}",
-                            tenant_dir_path.display(),
-                            e
-                        );
-                    }
+                    info!("Found temporary tenant directory, removing: {tenant_dir_path:?}",);
+                    if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
+                        error!("Failed to remove temporary directory {tenant_dir_path:?}: {e:?}");
+                    }
                 } else {
                     match load_local_tenant(conf, &tenant_dir_path, remote_storage.clone()) {
                         Ok(Some(tenant)) => {
-                            tenants_state::write_tenants().insert(tenant.tenant_id(), tenant);
+                            TENANTS.write().await.insert(tenant.tenant_id(), tenant);
                             number_of_tenants += 1;
                         }
                         Ok(None) => {
                             // This case happens if we crash during attach before creating the attach marker file
-                            if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
-                                error!(
-                                    "Failed to remove empty tenant directory '{}': {e:#}",
-                                    tenant_dir_path.display()
-                                )
-                            }
+                            if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
+                                error!("Failed to remove empty tenant directory {tenant_dir_path:?}: {e:#}")
+                            }
                         }
-                        Err(e) => {
-                            error!(
-                                "Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}",
-                                tenants_dir.display(),
-                                dir_entry,
-                                e
-                            );
-                        }
+                        Err(e) => error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}"),
                     }
                 }
             }
             Err(e) => {
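The scan loop above changes shape because `tokio::fs::ReadDir` is not an `Iterator`: entries are pulled with `next_entry().await`, which returns `Ok(Some(entry))`, `Ok(None)` at the end of the directory, or `Err` for a failed entry. A minimal sketch with simplified error handling (unlike the real loop, which logs the error and keeps scanning):

use std::path::Path;

async fn count_dir_entries(dir: &Path) -> std::io::Result<usize> {
    let mut entries = tokio::fs::read_dir(dir).await?;
    let mut count = 0;
    loop {
        match entries.next_entry().await {
            Ok(None) => break,           // end of directory reached
            Ok(Some(_entry)) => count += 1,
            Err(e) => return Err(e),     // the real code logs and continues
        }
    }
    Ok(count)
}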
@@ -107,10 +72,7 @@ pub fn init_tenant_mgr(
                 // here, the pageserver startup fails altogether, causing outage for *all*
                 // tenants. That seems worse.
                 error!(
-                    "Failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
-                    dir_entry,
-                    tenants_dir.display(),
-                    e,
+                    "Failed to list tenants dir entry in directory {tenants_dir:?}, reason: {e:?}"
                 );
             }
         }
@@ -165,7 +127,7 @@ fn load_local_tenant(
 ///
 pub async fn shutdown_all_tenants() {
     let tenants_to_shut_down = {
-        let mut m = tenants_state::write_tenants();
+        let mut m = TENANTS.write().await;
         let mut tenants_to_shut_down = Vec::with_capacity(m.len());
         for (_, tenant) in m.drain() {
             if tenant.is_active() {
@@ -199,13 +161,13 @@ pub async fn shutdown_all_tenants() {
     }
 }

-pub fn create_tenant(
+pub async fn create_tenant(
     conf: &'static PageServerConf,
     tenant_conf: TenantConfOpt,
     tenant_id: TenantId,
     remote_storage: Option<GenericRemoteStorage>,
 ) -> anyhow::Result<Option<Arc<Tenant>>> {
-    match tenants_state::write_tenants().entry(tenant_id) {
+    match TENANTS.write().await.entry(tenant_id) {
         hash_map::Entry::Occupied(_) => {
             debug!("tenant {tenant_id} already exists");
             Ok(None)
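`create_tenant` here (and `attach_tenant` further down) keeps check-and-insert race-free by doing the `entry` lookup through the write guard itself; the guard temporary lives for the whole `match`, so no other task can insert the same id in between. A sketch with toy types (`u32` keys, `String` payloads; not the real tenant types):

use std::collections::{hash_map, HashMap};
use std::sync::Arc;

use once_cell::sync::Lazy;
use tokio::sync::RwLock;

static MAP: Lazy<RwLock<HashMap<u32, Arc<String>>>> =
    Lazy::new(|| RwLock::new(HashMap::new()));

// Returns None when the id already exists, mirroring create_tenant's Ok(None).
async fn create(id: u32) -> Option<Arc<String>> {
    match MAP.write().await.entry(id) {
        hash_map::Entry::Occupied(_) => None,
        hash_map::Entry::Vacant(v) => {
            let value = Arc::new(format!("tenant-{id}"));
            v.insert(Arc::clone(&value));
            Some(value)
        }
    }
}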
@@ -238,21 +200,23 @@ pub fn create_tenant(
     }
 }

-pub fn update_tenant_config(
+pub async fn update_tenant_config(
     conf: &'static PageServerConf,
     tenant_conf: TenantConfOpt,
     tenant_id: TenantId,
 ) -> anyhow::Result<()> {
     info!("configuring tenant {tenant_id}");
-    get_tenant(tenant_id, true)?.update_tenant_config(tenant_conf);
+    get_tenant(tenant_id, true)
+        .await?
+        .update_tenant_config(tenant_conf);
     Tenant::persist_tenant_config(&conf.tenant_config_path(tenant_id), tenant_conf, false)?;
     Ok(())
 }

 /// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
 /// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
-pub fn get_tenant(tenant_id: TenantId, active_only: bool) -> anyhow::Result<Arc<Tenant>> {
-    let m = tenants_state::read_tenants();
+pub async fn get_tenant(tenant_id: TenantId, active_only: bool) -> anyhow::Result<Arc<Tenant>> {
+    let m = TENANTS.read().await;
     let tenant = m
         .get(&tenant_id)
         .with_context(|| format!("Tenant {tenant_id} not found in the local state"))?;
@@ -288,7 +252,7 @@ pub async fn delete_timeline(tenant_id: TenantId, timeline_id: TimelineId) -> anyhow::Result<()> {
     info!("waiting for timeline tasks to shutdown");
     task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
     info!("timeline task shutdown completed");
-    match get_tenant(tenant_id, true) {
+    match get_tenant(tenant_id, true).await {
         Ok(tenant) => {
             tenant.delete_timeline(timeline_id).await?;
         }
@@ -303,7 +267,7 @@ pub async fn detach_tenant(
     tenant_id: TenantId,
 ) -> anyhow::Result<()> {
     let tenant = match {
-        let mut tenants_accessor = tenants_state::write_tenants();
+        let mut tenants_accessor = TENANTS.write().await;
         tenants_accessor.remove(&tenant_id)
     } {
         Some(tenant) => tenant,
@@ -321,12 +285,14 @@ pub async fn detach_tenant(
     // we will attempt to remove files which no longer exist. This can be fixed by having shutdown
     // mechanism for tenant that will clean temporary data to avoid any references to ephemeral files
     let local_tenant_directory = conf.tenant_path(&tenant_id);
-    fs::remove_dir_all(&local_tenant_directory).with_context(|| {
-        format!(
-            "Failed to remove local tenant directory '{}'",
-            local_tenant_directory.display()
-        )
-    })?;
+    fs::remove_dir_all(&local_tenant_directory)
+        .await
+        .with_context(|| {
+            format!(
+                "Failed to remove local tenant directory '{}'",
+                local_tenant_directory.display()
+            )
+        })?;

     Ok(())
 }
@@ -334,8 +300,10 @@ pub async fn detach_tenant(
 ///
 /// Get list of tenants, for the mgmt API
 ///
-pub fn list_tenants() -> Vec<(TenantId, TenantState)> {
-    tenants_state::read_tenants()
+pub async fn list_tenants() -> Vec<(TenantId, TenantState)> {
+    TENANTS
+        .read()
+        .await
         .iter()
         .map(|(id, tenant)| (*id, tenant.current_state()))
         .collect()
@@ -350,7 +318,7 @@ pub async fn attach_tenant(
     tenant_id: TenantId,
     remote_storage: &GenericRemoteStorage,
 ) -> anyhow::Result<()> {
-    match tenants_state::write_tenants().entry(tenant_id) {
+    match TENANTS.write().await.entry(tenant_id) {
         hash_map::Entry::Occupied(e) => {
             // Cannot attach a tenant that already exists. The error message depends on
             // the state it's in.
@@ -378,12 +346,12 @@ use {
 };

 #[cfg(feature = "testing")]
-pub fn immediate_gc(
+pub async fn immediate_gc(
     tenant_id: TenantId,
     timeline_id: TimelineId,
     gc_req: TimelineGcRequest,
 ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
-    let guard = tenants_state::read_tenants();
+    let guard = TENANTS.read().await;

     let tenant = guard
         .get(&tenant_id)
@@ -155,7 +155,7 @@ async fn wait_for_active_tenant(
     wait: Duration,
 ) -> ControlFlow<(), Arc<Tenant>> {
     let tenant = loop {
-        match tenant_mgr::get_tenant(tenant_id, false) {
+        match tenant_mgr::get_tenant(tenant_id, false).await {
             Ok(tenant) => break tenant,
             Err(e) => {
                 error!("Failed to get a tenant {tenant_id}: {e:#}");