Compare commits

...

13 Commits

Author SHA1 Message Date
Konstantin Knizhnik
4015f5d952 Add comment for NotLoaded TenantState 2023-06-30 15:14:34 +03:00
Konstantin Knizhnik
f5e0a63041 Update pageserver/src/tenant/mgr.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-06-29 22:30:20 +03:00
Konstantin Knizhnik
012539a0e7 Update pageserver/src/tenant/mgr.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-06-29 22:26:53 +03:00
Konstantin Knizhnik
41f2db3a58 Add comment about list_tenants() 2023-06-04 14:15:48 +03:00
Konstantin Knizhnik
1178dbc614 Add GetTenantError::NotActivated 2023-06-04 09:38:00 +03:00
Konstantin Knizhnik
e4b345a3c1 Wait for tenant activation during lazy loading 2023-06-03 23:06:30 +03:00
Konstantin Knizhnik
963690e77a Fix unit tests 2023-06-03 16:01:13 +03:00
Konstantin Knizhnik
15fae34751 Fix style 2023-06-03 14:35:43 +03:00
Konstantin Knizhnik
a3b7d068c1 Make clippy happy 2023-06-03 08:39:24 +03:00
Konstantin Knizhnik
198d256b7d Load tenants in list_tenants for testing feature 2023-06-02 22:58:41 +03:00
Konstantin Knizhnik
3e044a1405 Add TenantState::NotLoaded 2023-06-02 22:06:39 +03:00
Konstantin Knizhnik
36eb1c83f3 Make clippy happy 2023-06-02 18:17:51 +03:00
Konstantin Knizhnik
fc27d871ed Lazy loading of tenants on pageserver startup 2023-06-02 15:37:44 +03:00
6 changed files with 224 additions and 85 deletions

View File

@@ -54,6 +54,10 @@ use bytes::{BufMut, Bytes, BytesMut};
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is not yet loaded. This state is not actually used internally, because not-yet-loaded tenants are handled via `OnceCell`.
/// This value is needed only for reporting the state of such tenants from the `list_tenants()` function.
///
NotLoaded,
/// This tenant is being loaded from local disk.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
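For context: tenants that have not been loaded yet are kept behind a `tokio::sync::OnceCell`, which is why `NotLoaded` never occurs as an internal state. A minimal sketch of that lazy-init pattern — the `Tenant` struct and `load_tenant` function below are hypothetical stand-ins, not the real pageserver types:

```rust
use std::sync::Arc;
use tokio::sync::OnceCell;

// Hypothetical stand-in for the real pageserver Tenant.
struct Tenant {
    id: u64,
}

// Stand-in loader; the real code reads tenant files from disk.
async fn load_tenant(id: u64) -> Result<Arc<Tenant>, String> {
    Ok(Arc::new(Tenant { id }))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let cell: OnceCell<Arc<Tenant>> = OnceCell::new();

    // The first accessor runs the loader; later accessors reuse the cached Arc.
    let tenant = cell.get_or_try_init(|| load_tenant(7)).await?;
    assert_eq!(tenant.id, 7);

    // Once initialized, get() returns the value without loading again, so the
    // cell itself encodes "loaded vs. not loaded" -- no enum variant needed.
    assert!(cell.get().is_some());
    Ok(())
}
```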
@@ -104,7 +108,7 @@ impl TenantState {
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
-Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
Self::NotLoaded | Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
// We only reach Active after successful load / attach.
// So, call attachment status Attached.
Self::Active => Attached,
@@ -907,6 +911,7 @@ mod tests {
fn tenantstatus_activating_strum() {
// tests added, because we use these for metrics
let examples = [
(line!(), TenantState::NotLoaded, "NotLoaded"),
(line!(), TenantState::Loading, "Loading"),
(line!(), TenantState::Attaching, "Attaching"),
(

View File

@@ -159,6 +159,12 @@ impl From<GetTenantError> for ApiError {
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::InternalServerError(anyhow::Error::new(e))
}
e @ GetTenantError::NotLoaded(_, _) => {
ApiError::InternalServerError(anyhow::Error::new(e))
}
e @ GetTenantError::NotActivated(_, _) => {
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
}

View File

@@ -1179,6 +1179,10 @@ async fn get_active_tenant_with_timeout(
let tenant = match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(e @ GetTenantError::NotLoaded(_, _)) => return Err(GetActiveTenantError::NotFound(e)),
Err(e @ GetTenantError::NotActivated(_, _)) => {
return Err(GetActiveTenantError::NotFound(e))
}
Err(GetTenantError::NotActive(_)) => {
unreachable!("we're calling get_tenant with active=false")
}

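Both new arms rely on Rust's `binding @ pattern` syntax: the variant is matched while the whole error value stays bound, so it can be forwarded without being reconstructed. A tiny self-contained illustration — the enum here is a simplified stand-in, not the real `GetTenantError`:

```rust
#[derive(Debug)]
enum GetTenantError {
    NotFound(u64),
    NotLoaded(u64, String),
}

fn describe(err: GetTenantError) -> String {
    match err {
        // `e @ pattern` matches on the variant but binds the entire value,
        // so `e` can be wrapped or logged as a whole.
        e @ GetTenantError::NotFound(_) => format!("not found: {e:?}"),
        e @ GetTenantError::NotLoaded(_, _) => format!("not loaded: {e:?}"),
    }
}

fn main() {
    println!("{}", describe(GetTenantError::NotLoaded(7, "io error".into())));
}
```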
View File

@@ -459,7 +459,7 @@ struct RemoteStartupData {
}
#[derive(Debug, thiserror::Error)]
-pub(crate) enum WaitToBecomeActiveError {
pub enum WaitToBecomeActiveError {
WillNotBecomeActive {
tenant_id: TenantId,
state: TenantState,
@@ -1694,7 +1694,7 @@ impl Tenant {
self.state.send_modify(|current_state| {
use pageserver_api::models::ActivatingFrom;
match &*current_state {
-TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
}
TenantState::Loading => {
@@ -1761,7 +1761,10 @@ impl Tenant {
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
-TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::NotLoaded
| TenantState::Activating(_)
| TenantState::Loading
| TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
@@ -1776,7 +1779,7 @@ impl Tenant {
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
-TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
@@ -1834,7 +1837,10 @@ impl Tenant {
// The load & attach routines own the tenant state until it has reached `Active`.
// So, wait until it's done.
rx.wait_for(|state| match state {
-TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::NotLoaded
| TenantState::Activating(_)
| TenantState::Loading
| TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
@@ -1849,7 +1855,7 @@ impl Tenant {
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
self.state.send_modify(|current_state| {
match *current_state {
-TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
@@ -1884,7 +1890,10 @@ impl Tenant {
loop {
let current_state = receiver.borrow_and_update().clone();
match current_state {
-TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
TenantState::NotLoaded
| TenantState::Loading
| TenantState::Attaching
| TenantState::Activating(_) => {
// in these states, there's a chance that we can reach ::Active
receiver.changed().await.map_err(
|_e: tokio::sync::watch::error::RecvError| {

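The repeated `rx.wait_for(...)` calls in this file all follow the same `tokio::sync::watch` pattern: block until the observed state leaves the transitional variants. A standalone sketch with a toy state enum (names are illustrative, not the pageserver's):

```rust
use tokio::sync::watch;

#[derive(Clone, Debug, PartialEq)]
enum State {
    NotLoaded,
    Loading,
    Active,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(State::NotLoaded);

    // Another task drives the state machine forward.
    tokio::spawn(async move {
        tx.send(State::Loading).unwrap();
        tx.send(State::Active).unwrap();
    });

    // Block until the state is no longer transitional, mirroring the
    // NotLoaded | Loading | Attaching | Activating(_) arms in the diff.
    let state = rx
        .wait_for(|s| !matches!(s, State::NotLoaded | State::Loading))
        .await
        .expect("sender dropped before leaving a transitional state");
    assert_eq!(*state, State::Active);
}
```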
View File

@@ -9,7 +9,7 @@ use tokio::fs;
use anyhow::Context;
use once_cell::sync::Lazy;
-use tokio::sync::RwLock;
use tokio::sync::{OnceCell, RwLock};
use tokio::task::JoinSet;
use tracing::*;
@@ -22,6 +22,7 @@ use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
WaitToBecomeActiveError,
};
use crate::IGNORED_TENANT_FILE_NAME;
@@ -29,6 +30,65 @@ use utils::completion;
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
struct LazyTenantsMap {
conf: &'static PageServerConf,
map: HashMap<TenantId, OnceCell<Arc<Tenant>>>,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
}
impl LazyTenantsMap {
fn load_tenant(&self, tenant_id: &TenantId) -> anyhow::Result<Arc<Tenant>> {
let tenant_path = self.conf.tenant_path(tenant_id);
let tenant_ignore_mark = self.conf.tenant_ignore_mark_file_path(*tenant_id);
if tenant_ignore_mark.exists() {
std::fs::remove_file(&tenant_ignore_mark)
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
schedule_local_tenant_processing(
self.conf,
&tenant_path,
self.broker_client.clone(),
self.remote_storage.clone(),
None,
&ctx,
)
.with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))
}
async fn try_load_tenant(
&self,
tenant_id: &TenantId,
wait_to_become_active: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let tenant = self
.load_tenant(tenant_id)
.map_err(|e| GetTenantError::NotLoaded(*tenant_id, e))?;
if wait_to_become_active {
tenant
.wait_to_become_active()
.await
.map_err(|e| GetTenantError::NotActivated(*tenant_id, e))?;
}
Ok(tenant)
}
async fn get(
&self,
tenant_id: &TenantId,
wait_to_become_active: bool,
) -> Result<&Arc<Tenant>, GetTenantError> {
let tenant = self
.map
.get(tenant_id)
.ok_or(GetTenantError::NotFound(*tenant_id))?;
tenant
.get_or_try_init(|| self.try_load_tenant(tenant_id, wait_to_become_active))
.await
}
}
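One property this map depends on, worth calling out: tokio's `OnceCell::get_or_try_init` does not cache failures. If `try_load_tenant` errors, the cell stays empty and the next access retries the load, so a transient load failure does not permanently wedge a tenant. A standalone sketch of that behavior:

```rust
use tokio::sync::OnceCell;

#[tokio::main]
async fn main() {
    let cell: OnceCell<u32> = OnceCell::new();

    // A failing initializer leaves the cell uninitialized...
    let first: Result<&u32, &str> = cell
        .get_or_try_init(|| async { Err("disk unavailable") })
        .await;
    assert!(first.is_err());
    assert!(cell.get().is_none());

    // ...so a later access retries the load instead of caching the failure.
    let second = cell
        .get_or_try_init(|| async { Ok::<u32, &str>(42) })
        .await;
    assert_eq!(second, Ok(&42));
}
```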
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
enum TenantsMap {
@@ -36,23 +96,31 @@ enum TenantsMap {
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_insert`].
-Open(HashMap<TenantId, Arc<Tenant>>),
Open(LazyTenantsMap),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
}
impl TenantsMap {
-fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
async fn get(
&self,
tenant_id: &TenantId,
wait_to_become_active: bool,
) -> Result<&Arc<Tenant>, GetTenantError> {
match self {
-TenantsMap::Initializing => None,
-TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
TenantsMap::Initializing => Err(GetTenantError::NotFound(*tenant_id)),
TenantsMap::Open(m) => m.get(tenant_id, wait_to_become_active).await,
TenantsMap::ShuttingDown(m) => {
m.get(tenant_id).ok_or(GetTenantError::NotFound(*tenant_id))
}
}
}
-fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
fn remove(&mut self, tenant_id: &TenantId) -> bool {
match self {
-TenantsMap::Initializing => None,
-TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
TenantsMap::Initializing => false,
TenantsMap::Open(m) => m.map.remove(tenant_id).is_some(),
TenantsMap::ShuttingDown(m) => m.remove(tenant_id).is_some(),
}
}
}
@@ -67,7 +135,7 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
-init_done: (completion::Completion, completion::Barrier),
_init_done: (completion::Completion, completion::Barrier),
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
@@ -78,8 +146,6 @@ pub async fn init_tenant_mgr(
.await
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
loop {
match dir_entries.next_entry().await {
Ok(None) => break,
@@ -119,21 +185,15 @@ pub async fn init_tenant_mgr(
continue;
}
-match schedule_local_tenant_processing(
-conf,
-&tenant_dir_path,
-broker_client.clone(),
-remote_storage.clone(),
-Some(init_done.clone()),
-&ctx,
-) {
-Ok(tenant) => {
-tenants.insert(tenant.tenant_id(), tenant);
-}
-Err(e) => {
-error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
-}
-}
let tenant_id = tenant_dir_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TenantId>()
.with_context(|| {
format!("Could not parse tenant id out of the tenant dir name in path {tenant_dir_path:?}")
})?;
tenants.insert(tenant_id, OnceCell::new());
}
}
Err(e) => {
@@ -151,7 +211,12 @@ pub async fn init_tenant_mgr(
let mut tenants_map = TENANTS.write().await;
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
-*tenants_map = TenantsMap::Open(tenants);
*tenants_map = TenantsMap::Open(LazyTenantsMap {
conf,
broker_client,
remote_storage,
map: tenants,
});
Ok(())
}
@@ -247,10 +312,17 @@ pub async fn shutdown_all_tenants() {
info!("tenants map is empty");
return;
}
-TenantsMap::Open(tenants) => {
-let tenants_clone = tenants.clone();
-*m = TenantsMap::ShuttingDown(std::mem::take(tenants));
-tenants_clone
TenantsMap::Open(lazy) => {
let online_tenants: Vec<Arc<Tenant>> = lazy
.map
.iter()
.filter_map(|(_, v)| v.get())
.cloned()
.collect();
*m = TenantsMap::ShuttingDown(HashMap::from_iter(
online_tenants.iter().map(|t| (t.tenant_id(), t.clone())),
));
online_tenants
}
TenantsMap::ShuttingDown(_) => {
error!("already shutting down, this function isn't supposed to be called more than once");
@@ -277,7 +349,8 @@ pub async fn shutdown_all_tenants() {
// It's messed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
-for (tenant_id, tenant) in tenants_to_shut_down {
for tenant in tenants_to_shut_down {
let tenant_id = tenant.tenant_id();
join_set.spawn(
async move {
match tenant.set_stopping().await {
@@ -421,6 +494,10 @@ pub enum GetTenantError {
NotFound(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
#[error("Tenant {0} can not be loaded: {1}")]
NotLoaded(TenantId, anyhow::Error),
#[error("Tenant {0} can not be activated: {1}")]
NotActivated(TenantId, WaitToBecomeActiveError),
}
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
@@ -430,9 +507,7 @@ pub async fn get_tenant(
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let m = TENANTS.read().await;
-let tenant = m
-.get(&tenant_id)
-.ok_or(GetTenantError::NotFound(tenant_id))?;
let tenant = m.get(&tenant_id, active_only).await?;
if active_only && !tenant.is_active() {
Err(GetTenantError::NotActive(tenant_id))
} else {
@@ -558,15 +633,62 @@ pub enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
// Many tests use list_tenants() to check whether a tenant is in the Active state.
// With lazy loading, tenants are initially in the NotLoaded state.
// To make all these tests pass, force loading of the tenants when the "testing" feature is enabled.
// Alternatively, an extra parameter could be passed to list_tenants() to choose between
// eager and lazy loading of tenants.
#[cfg(feature = "testing")]
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
let tenants = TENANTS.read().await;
-let m = match &*tenants {
-TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
-TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
-};
-Ok(m.iter()
-.map(|(id, tenant)| (*id, tenant.current_state()))
-.collect())
match &*tenants {
TenantsMap::Initializing => Err(TenantMapListError::Initializing),
// Do not copy-paste this futures::future::join_all usage into production code with many tenants;
// use a JoinSet instead (see the sketch after this function).
TenantsMap::Open(m) => Ok(futures::future::join_all(m.map.iter().map(
|(id, tenant)| async {
(
*id,
tenant
.get_or_try_init(|| m.try_load_tenant(id, false))
.await
.map_or(
TenantState::broken_from_reason("Failed to load tenant".to_string()),
|t| t.current_state(),
),
)
},
))
.await),
TenantsMap::ShuttingDown(m) => Ok(m
.iter()
.map(|(id, tenant)| (*id, tenant.current_state()))
.collect()),
}
}
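The comment above warns against `futures::future::join_all` here; a `JoinSet` spawns each load as its own task, so loads proceed in parallel across the runtime's workers instead of being driven inside one future. A sketch of the recommended shape — `load_state` is a hypothetical stand-in for loading a tenant and reading its state:

```rust
use tokio::task::JoinSet;

// Hypothetical stand-in for loading one tenant and reporting its state.
async fn load_state(id: u64) -> (u64, &'static str) {
    (id, "Active")
}

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();
    for id in 0..4u64 {
        // Each load runs as an independent task, so one slow tenant
        // does not hold up progress on all the others.
        set.spawn(load_state(id));
    }

    let mut states = Vec::new();
    while let Some(joined) = set.join_next().await {
        states.push(joined.expect("load task panicked"));
    }
    println!("{states:?}");
}
```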
#[cfg(not(feature = "testing"))]
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
let tenants = TENANTS.read().await;
match &*tenants {
TenantsMap::Initializing => Err(TenantMapListError::Initializing),
TenantsMap::Open(m) => Ok(m
.map
.iter()
.map(|(id, tenant)| {
(
*id,
tenant
.get()
.map_or(TenantState::NotLoaded, |tenant| tenant.current_state()),
)
})
.collect()),
TenantsMap::ShuttingDown(m) => Ok(m
.iter()
.map(|(id, tenant)| (*id, tenant.current_state()))
.collect()),
}
}
/// Execute Attach mgmt API command.
@@ -634,22 +756,23 @@ where
F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
{
let mut guard = TENANTS.write().await;
-let m = match &mut *guard {
-TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
-TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
-TenantsMap::Open(m) => m,
-};
-match m.entry(tenant_id) {
-hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
-tenant_id,
-e.get().current_state(),
-)),
-hash_map::Entry::Vacant(v) => match insert_fn() {
-Ok(tenant) => {
-v.insert(tenant.clone());
-Ok(tenant)
-}
-Err(e) => Err(TenantMapInsertError::Closure(e)),
match &mut *guard {
TenantsMap::Initializing => Err(TenantMapInsertError::StillInitializing),
TenantsMap::ShuttingDown(_) => Err(TenantMapInsertError::ShuttingDown),
TenantsMap::Open(m) => match m.map.entry(tenant_id) {
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
e.get()
.get()
.map_or(TenantState::NotLoaded, |tenant| tenant.current_state()),
)),
hash_map::Entry::Vacant(v) => match insert_fn() {
Ok(tenant) => {
v.insert(OnceCell::new_with(Some(tenant.clone())));
Ok(tenant)
}
Err(e) => Err(TenantMapInsertError::Closure(e)),
},
},
}
}
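Note the `OnceCell::new_with(Some(tenant.clone()))` above: it builds a cell that is already initialized, so a freshly created tenant is immediately visible to `get()` with no lazy load. A quick sketch of that constructor:

```rust
use tokio::sync::OnceCell;

fn main() {
    // A cell pre-populated at construction: no initializer will ever run.
    let loaded: OnceCell<u32> = OnceCell::new_with(Some(7));
    assert_eq!(loaded.get(), Some(&7));

    // An empty cell reports None until someone initializes it.
    let pending: OnceCell<u32> = OnceCell::new_with(None);
    assert!(pending.get().is_none());
}
```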
@@ -671,8 +794,8 @@ where
// avoid holding the lock for the entire process.
{
let tenants_accessor = TENANTS.write().await;
-match tenants_accessor.get(&tenant_id) {
-Some(tenant) => {
match tenants_accessor.get(&tenant_id, false).await {
Ok(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
@@ -689,7 +812,7 @@ where
}
}
}
-None => return Err(TenantStateError::NotFound(tenant_id)),
Err(_) => return Err(TenantStateError::NotFound(tenant_id)),
}
}
@@ -704,18 +827,18 @@ where
{
Ok(hook_value) => {
let mut tenants_accessor = TENANTS.write().await;
-if tenants_accessor.remove(&tenant_id).is_none() {
if !tenants_accessor.remove(&tenant_id) {
warn!("Tenant {tenant_id} got removed from memory before operation finished");
}
Ok(hook_value)
}
Err(e) => {
let tenants_accessor = TENANTS.read().await;
-match tenants_accessor.get(&tenant_id) {
-Some(tenant) => {
match tenants_accessor.get(&tenant_id, false).await {
Ok(tenant) => {
tenant.set_broken(e.to_string()).await;
}
-None => {
Err(_) => {
warn!("Tenant {tenant_id} got removed from memory");
return Err(TenantStateError::NotFound(tenant_id));
}
@@ -736,13 +859,7 @@ pub async fn immediate_gc(
gc_req: TimelineGcRequest,
ctx: &RequestContext,
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
-let guard = TENANTS.read().await;
-let tenant = guard
-.get(&tenant_id)
-.map(Arc::clone)
-.with_context(|| format!("tenant {tenant_id}"))
-.map_err(ApiError::NotFound)?;
let tenant = get_tenant(tenant_id, false).await?;
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
@@ -772,10 +889,6 @@ pub async fn immediate_gc(
Ok(())
}
);
-// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
-drop(guard);
Ok(wait_task_done)
}
@@ -788,7 +901,8 @@ pub async fn immediate_compact(
let guard = TENANTS.read().await;
let tenant = guard
-.get(&tenant_id)
.get(&tenant_id, true)
.await
.map(Arc::clone)
.with_context(|| format!("tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;

View File

@@ -21,6 +21,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*load failed, setting tenant state to Broken.*",
]
)