mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-06 01:40:37 +00:00
Compare commits
13 Commits
split-prox
...
lazy_tenan
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4015f5d952 | ||
|
|
f5e0a63041 | ||
|
|
012539a0e7 | ||
|
|
41f2db3a58 | ||
|
|
1178dbc614 | ||
|
|
e4b345a3c1 | ||
|
|
963690e77a | ||
|
|
15fae34751 | ||
|
|
a3b7d068c1 | ||
|
|
198d256b7d | ||
|
|
3e044a1405 | ||
|
|
36eb1c83f3 | ||
|
|
fc27d871ed |
@@ -54,6 +54,10 @@ use bytes::{BufMut, Bytes, BytesMut};
|
|||||||
)]
|
)]
|
||||||
#[serde(tag = "slug", content = "data")]
|
#[serde(tag = "slug", content = "data")]
|
||||||
pub enum TenantState {
|
pub enum TenantState {
|
||||||
|
/// This tenant is not yet loaded. This state is not actually used internally because not loaded tenants are handled using OnceSet.
|
||||||
|
/// This value is needed only for reporting state of such tenants by list_tenants() function
|
||||||
|
///
|
||||||
|
NotLoaded,
|
||||||
/// This tenant is being loaded from local disk.
|
/// This tenant is being loaded from local disk.
|
||||||
///
|
///
|
||||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||||
@@ -104,7 +108,7 @@ impl TenantState {
|
|||||||
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
|
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
|
||||||
// tenant mgr startup distinguishes attaching from loading via marker file.
|
// tenant mgr startup distinguishes attaching from loading via marker file.
|
||||||
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
|
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
|
||||||
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
|
Self::NotLoaded | Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
|
||||||
// We only reach Active after successful load / attach.
|
// We only reach Active after successful load / attach.
|
||||||
// So, call atttachment status Attached.
|
// So, call atttachment status Attached.
|
||||||
Self::Active => Attached,
|
Self::Active => Attached,
|
||||||
@@ -907,6 +911,7 @@ mod tests {
|
|||||||
fn tenantstatus_activating_strum() {
|
fn tenantstatus_activating_strum() {
|
||||||
// tests added, because we use these for metrics
|
// tests added, because we use these for metrics
|
||||||
let examples = [
|
let examples = [
|
||||||
|
(line!(), TenantState::NotLoaded, "NotLoaded"),
|
||||||
(line!(), TenantState::Loading, "Loading"),
|
(line!(), TenantState::Loading, "Loading"),
|
||||||
(line!(), TenantState::Attaching, "Attaching"),
|
(line!(), TenantState::Attaching, "Attaching"),
|
||||||
(
|
(
|
||||||
|
|||||||
@@ -159,6 +159,12 @@ impl From<GetTenantError> for ApiError {
|
|||||||
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
|
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
|
||||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||||
}
|
}
|
||||||
|
e @ GetTenantError::NotLoaded(_, _) => {
|
||||||
|
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||||
|
}
|
||||||
|
e @ GetTenantError::NotActivated(_, _) => {
|
||||||
|
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1179,6 +1179,10 @@ async fn get_active_tenant_with_timeout(
|
|||||||
let tenant = match mgr::get_tenant(tenant_id, false).await {
|
let tenant = match mgr::get_tenant(tenant_id, false).await {
|
||||||
Ok(tenant) => tenant,
|
Ok(tenant) => tenant,
|
||||||
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
|
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
|
||||||
|
Err(e @ GetTenantError::NotLoaded(_, _)) => return Err(GetActiveTenantError::NotFound(e)),
|
||||||
|
Err(e @ GetTenantError::NotActivated(_, _)) => {
|
||||||
|
return Err(GetActiveTenantError::NotFound(e))
|
||||||
|
}
|
||||||
Err(GetTenantError::NotActive(_)) => {
|
Err(GetTenantError::NotActive(_)) => {
|
||||||
unreachable!("we're calling get_tenant with active=false")
|
unreachable!("we're calling get_tenant with active=false")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -459,7 +459,7 @@ struct RemoteStartupData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub(crate) enum WaitToBecomeActiveError {
|
pub enum WaitToBecomeActiveError {
|
||||||
WillNotBecomeActive {
|
WillNotBecomeActive {
|
||||||
tenant_id: TenantId,
|
tenant_id: TenantId,
|
||||||
state: TenantState,
|
state: TenantState,
|
||||||
@@ -1694,7 +1694,7 @@ impl Tenant {
|
|||||||
self.state.send_modify(|current_state| {
|
self.state.send_modify(|current_state| {
|
||||||
use pageserver_api::models::ActivatingFrom;
|
use pageserver_api::models::ActivatingFrom;
|
||||||
match &*current_state {
|
match &*current_state {
|
||||||
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
|
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
|
||||||
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
|
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
|
||||||
}
|
}
|
||||||
TenantState::Loading => {
|
TenantState::Loading => {
|
||||||
@@ -1761,7 +1761,10 @@ impl Tenant {
|
|||||||
|
|
||||||
// cannot stop before we're done activating, so wait out until we're done activating
|
// cannot stop before we're done activating, so wait out until we're done activating
|
||||||
rx.wait_for(|state| match state {
|
rx.wait_for(|state| match state {
|
||||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
TenantState::NotLoaded
|
||||||
|
| TenantState::Activating(_)
|
||||||
|
| TenantState::Loading
|
||||||
|
| TenantState::Attaching => {
|
||||||
info!(
|
info!(
|
||||||
"waiting for {} to turn Active|Broken|Stopping",
|
"waiting for {} to turn Active|Broken|Stopping",
|
||||||
<&'static str>::from(state)
|
<&'static str>::from(state)
|
||||||
@@ -1776,7 +1779,7 @@ impl Tenant {
|
|||||||
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
|
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
|
||||||
let mut err = None;
|
let mut err = None;
|
||||||
let stopping = self.state.send_if_modified(|current_state| match current_state {
|
let stopping = self.state.send_if_modified(|current_state| match current_state {
|
||||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||||
}
|
}
|
||||||
TenantState::Active => {
|
TenantState::Active => {
|
||||||
@@ -1834,7 +1837,10 @@ impl Tenant {
|
|||||||
// The load & attach routines own the tenant state until it has reached `Active`.
|
// The load & attach routines own the tenant state until it has reached `Active`.
|
||||||
// So, wait until it's done.
|
// So, wait until it's done.
|
||||||
rx.wait_for(|state| match state {
|
rx.wait_for(|state| match state {
|
||||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
TenantState::NotLoaded
|
||||||
|
| TenantState::Activating(_)
|
||||||
|
| TenantState::Loading
|
||||||
|
| TenantState::Attaching => {
|
||||||
info!(
|
info!(
|
||||||
"waiting for {} to turn Active|Broken|Stopping",
|
"waiting for {} to turn Active|Broken|Stopping",
|
||||||
<&'static str>::from(state)
|
<&'static str>::from(state)
|
||||||
@@ -1849,7 +1855,7 @@ impl Tenant {
|
|||||||
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
|
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
|
||||||
self.state.send_modify(|current_state| {
|
self.state.send_modify(|current_state| {
|
||||||
match *current_state {
|
match *current_state {
|
||||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||||
}
|
}
|
||||||
TenantState::Active => {
|
TenantState::Active => {
|
||||||
@@ -1884,7 +1890,10 @@ impl Tenant {
|
|||||||
loop {
|
loop {
|
||||||
let current_state = receiver.borrow_and_update().clone();
|
let current_state = receiver.borrow_and_update().clone();
|
||||||
match current_state {
|
match current_state {
|
||||||
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
|
TenantState::NotLoaded
|
||||||
|
| TenantState::Loading
|
||||||
|
| TenantState::Attaching
|
||||||
|
| TenantState::Activating(_) => {
|
||||||
// in these states, there's a chance that we can reach ::Active
|
// in these states, there's a chance that we can reach ::Active
|
||||||
receiver.changed().await.map_err(
|
receiver.changed().await.map_err(
|
||||||
|_e: tokio::sync::watch::error::RecvError| {
|
|_e: tokio::sync::watch::error::RecvError| {
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use tokio::fs;
|
|||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::{OnceCell, RwLock};
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
|
|
||||||
@@ -22,6 +22,7 @@ use crate::task_mgr::{self, TaskKind};
|
|||||||
use crate::tenant::config::TenantConfOpt;
|
use crate::tenant::config::TenantConfOpt;
|
||||||
use crate::tenant::{
|
use crate::tenant::{
|
||||||
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
|
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
|
||||||
|
WaitToBecomeActiveError,
|
||||||
};
|
};
|
||||||
use crate::IGNORED_TENANT_FILE_NAME;
|
use crate::IGNORED_TENANT_FILE_NAME;
|
||||||
|
|
||||||
@@ -29,6 +30,65 @@ use utils::completion;
|
|||||||
use utils::fs_ext::PathExt;
|
use utils::fs_ext::PathExt;
|
||||||
use utils::id::{TenantId, TimelineId};
|
use utils::id::{TenantId, TimelineId};
|
||||||
|
|
||||||
|
struct LazyTenantsMap {
|
||||||
|
conf: &'static PageServerConf,
|
||||||
|
map: HashMap<TenantId, OnceCell<Arc<Tenant>>>,
|
||||||
|
broker_client: storage_broker::BrokerClientChannel,
|
||||||
|
remote_storage: Option<GenericRemoteStorage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LazyTenantsMap {
|
||||||
|
fn load_tenant(&self, tenant_id: &TenantId) -> anyhow::Result<Arc<Tenant>> {
|
||||||
|
let tenant_path = self.conf.tenant_path(tenant_id);
|
||||||
|
let tenant_ignore_mark = self.conf.tenant_ignore_mark_file_path(*tenant_id);
|
||||||
|
if tenant_ignore_mark.exists() {
|
||||||
|
std::fs::remove_file(&tenant_ignore_mark)
|
||||||
|
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||||
|
}
|
||||||
|
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
|
||||||
|
schedule_local_tenant_processing(
|
||||||
|
self.conf,
|
||||||
|
&tenant_path,
|
||||||
|
self.broker_client.clone(),
|
||||||
|
self.remote_storage.clone(),
|
||||||
|
None,
|
||||||
|
&ctx,
|
||||||
|
)
|
||||||
|
.with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn try_load_tenant(
|
||||||
|
&self,
|
||||||
|
tenant_id: &TenantId,
|
||||||
|
wait_to_become_active: bool,
|
||||||
|
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||||
|
let tenant = self
|
||||||
|
.load_tenant(tenant_id)
|
||||||
|
.map_err(|e| GetTenantError::NotLoaded(*tenant_id, e))?;
|
||||||
|
if wait_to_become_active {
|
||||||
|
tenant
|
||||||
|
.wait_to_become_active()
|
||||||
|
.await
|
||||||
|
.map_err(|e| GetTenantError::NotActivated(*tenant_id, e))?;
|
||||||
|
}
|
||||||
|
Ok(tenant)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get(
|
||||||
|
&self,
|
||||||
|
tenant_id: &TenantId,
|
||||||
|
wait_to_become_active: bool,
|
||||||
|
) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||||
|
let tenant = self
|
||||||
|
.map
|
||||||
|
.get(tenant_id)
|
||||||
|
.ok_or(GetTenantError::NotFound(*tenant_id))?;
|
||||||
|
tenant
|
||||||
|
.get_or_try_init(|| self.try_load_tenant(tenant_id, wait_to_become_active))
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The tenants known to the pageserver.
|
/// The tenants known to the pageserver.
|
||||||
/// The enum variants are used to distinguish the different states that the pageserver can be in.
|
/// The enum variants are used to distinguish the different states that the pageserver can be in.
|
||||||
enum TenantsMap {
|
enum TenantsMap {
|
||||||
@@ -36,23 +96,31 @@ enum TenantsMap {
|
|||||||
Initializing,
|
Initializing,
|
||||||
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
|
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
|
||||||
/// New tenants can be added using [`tenant_map_insert`].
|
/// New tenants can be added using [`tenant_map_insert`].
|
||||||
Open(HashMap<TenantId, Arc<Tenant>>),
|
Open(LazyTenantsMap),
|
||||||
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
|
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
|
||||||
/// Existing tenants are still accessible, but no new tenants can be created.
|
/// Existing tenants are still accessible, but no new tenants can be created.
|
||||||
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
|
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TenantsMap {
|
impl TenantsMap {
|
||||||
fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
|
async fn get(
|
||||||
|
&self,
|
||||||
|
tenant_id: &TenantId,
|
||||||
|
wait_to_become_active: bool,
|
||||||
|
) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||||
match self {
|
match self {
|
||||||
TenantsMap::Initializing => None,
|
TenantsMap::Initializing => Err(GetTenantError::NotFound(*tenant_id)),
|
||||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
|
TenantsMap::Open(m) => m.get(tenant_id, wait_to_become_active).await,
|
||||||
|
TenantsMap::ShuttingDown(m) => {
|
||||||
|
m.get(tenant_id).ok_or(GetTenantError::NotFound(*tenant_id))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
|
fn remove(&mut self, tenant_id: &TenantId) -> bool {
|
||||||
match self {
|
match self {
|
||||||
TenantsMap::Initializing => None,
|
TenantsMap::Initializing => false,
|
||||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
|
TenantsMap::Open(m) => m.map.remove(tenant_id).is_some(),
|
||||||
|
TenantsMap::ShuttingDown(m) => m.remove(tenant_id).is_some(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -67,7 +135,7 @@ pub async fn init_tenant_mgr(
|
|||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
broker_client: storage_broker::BrokerClientChannel,
|
broker_client: storage_broker::BrokerClientChannel,
|
||||||
remote_storage: Option<GenericRemoteStorage>,
|
remote_storage: Option<GenericRemoteStorage>,
|
||||||
init_done: (completion::Completion, completion::Barrier),
|
_init_done: (completion::Completion, completion::Barrier),
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// Scan local filesystem for attached tenants
|
// Scan local filesystem for attached tenants
|
||||||
let tenants_dir = conf.tenants_path();
|
let tenants_dir = conf.tenants_path();
|
||||||
@@ -78,8 +146,6 @@ pub async fn init_tenant_mgr(
|
|||||||
.await
|
.await
|
||||||
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
|
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
|
||||||
|
|
||||||
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match dir_entries.next_entry().await {
|
match dir_entries.next_entry().await {
|
||||||
Ok(None) => break,
|
Ok(None) => break,
|
||||||
@@ -119,21 +185,15 @@ pub async fn init_tenant_mgr(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
match schedule_local_tenant_processing(
|
let tenant_id = tenant_dir_path
|
||||||
conf,
|
.file_name()
|
||||||
&tenant_dir_path,
|
.and_then(OsStr::to_str)
|
||||||
broker_client.clone(),
|
.unwrap_or_default()
|
||||||
remote_storage.clone(),
|
.parse::<TenantId>()
|
||||||
Some(init_done.clone()),
|
.with_context(|| {
|
||||||
&ctx,
|
format!("Could not parse tenant id out of the tenant dir name in path {tenant_dir_path:?}")
|
||||||
) {
|
})?;
|
||||||
Ok(tenant) => {
|
tenants.insert(tenant_id, OnceCell::new());
|
||||||
tenants.insert(tenant.tenant_id(), tenant);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -151,7 +211,12 @@ pub async fn init_tenant_mgr(
|
|||||||
|
|
||||||
let mut tenants_map = TENANTS.write().await;
|
let mut tenants_map = TENANTS.write().await;
|
||||||
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
|
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
|
||||||
*tenants_map = TenantsMap::Open(tenants);
|
*tenants_map = TenantsMap::Open(LazyTenantsMap {
|
||||||
|
conf,
|
||||||
|
broker_client,
|
||||||
|
remote_storage,
|
||||||
|
map: tenants,
|
||||||
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -247,10 +312,17 @@ pub async fn shutdown_all_tenants() {
|
|||||||
info!("tenants map is empty");
|
info!("tenants map is empty");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
TenantsMap::Open(tenants) => {
|
TenantsMap::Open(lazy) => {
|
||||||
let tenants_clone = tenants.clone();
|
let online_tenants: Vec<Arc<Tenant>> = lazy
|
||||||
*m = TenantsMap::ShuttingDown(std::mem::take(tenants));
|
.map
|
||||||
tenants_clone
|
.iter()
|
||||||
|
.filter_map(|(_, v)| v.get())
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
*m = TenantsMap::ShuttingDown(HashMap::from_iter(
|
||||||
|
online_tenants.iter().map(|t| (t.tenant_id(), t.clone())),
|
||||||
|
));
|
||||||
|
online_tenants
|
||||||
}
|
}
|
||||||
TenantsMap::ShuttingDown(_) => {
|
TenantsMap::ShuttingDown(_) => {
|
||||||
error!("already shutting down, this function isn't supposed to be called more than once");
|
error!("already shutting down, this function isn't supposed to be called more than once");
|
||||||
@@ -277,7 +349,8 @@ pub async fn shutdown_all_tenants() {
|
|||||||
// It's mesed up.
|
// It's mesed up.
|
||||||
let mut join_set = JoinSet::new();
|
let mut join_set = JoinSet::new();
|
||||||
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
|
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
|
||||||
for (tenant_id, tenant) in tenants_to_shut_down {
|
for tenant in tenants_to_shut_down {
|
||||||
|
let tenant_id = tenant.tenant_id();
|
||||||
join_set.spawn(
|
join_set.spawn(
|
||||||
async move {
|
async move {
|
||||||
match tenant.set_stopping().await {
|
match tenant.set_stopping().await {
|
||||||
@@ -421,6 +494,10 @@ pub enum GetTenantError {
|
|||||||
NotFound(TenantId),
|
NotFound(TenantId),
|
||||||
#[error("Tenant {0} is not active")]
|
#[error("Tenant {0} is not active")]
|
||||||
NotActive(TenantId),
|
NotActive(TenantId),
|
||||||
|
#[error("Tenant {0} can not be loaded: {1}")]
|
||||||
|
NotLoaded(TenantId, anyhow::Error),
|
||||||
|
#[error("Tenant {0} can not be activated: {1}")]
|
||||||
|
NotActivated(TenantId, WaitToBecomeActiveError),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
|
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
|
||||||
@@ -430,9 +507,7 @@ pub async fn get_tenant(
|
|||||||
active_only: bool,
|
active_only: bool,
|
||||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||||
let m = TENANTS.read().await;
|
let m = TENANTS.read().await;
|
||||||
let tenant = m
|
let tenant = m.get(&tenant_id, active_only).await?;
|
||||||
.get(&tenant_id)
|
|
||||||
.ok_or(GetTenantError::NotFound(tenant_id))?;
|
|
||||||
if active_only && !tenant.is_active() {
|
if active_only && !tenant.is_active() {
|
||||||
Err(GetTenantError::NotActive(tenant_id))
|
Err(GetTenantError::NotActive(tenant_id))
|
||||||
} else {
|
} else {
|
||||||
@@ -558,15 +633,62 @@ pub enum TenantMapListError {
|
|||||||
///
|
///
|
||||||
/// Get list of tenants, for the mgmt API
|
/// Get list of tenants, for the mgmt API
|
||||||
///
|
///
|
||||||
|
// Many tests are using list_tenants to check if tenant is in active state
|
||||||
|
// With lazy loading tenants are initially in NotLoaded state.
|
||||||
|
// To make all this tests pass, lets force loading of tenants if testing feature is specified.
|
||||||
|
// Alternatively it is possible to pass extra parameter to list_tenants to choose between
|
||||||
|
// eager and lazy loading of tenants.
|
||||||
|
#[cfg(feature = "testing")]
|
||||||
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||||
let tenants = TENANTS.read().await;
|
let tenants = TENANTS.read().await;
|
||||||
let m = match &*tenants {
|
match &*tenants {
|
||||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
TenantsMap::Initializing => Err(TenantMapListError::Initializing),
|
||||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
|
// Do not copy paste futures::future::join_all usage to production code with many tenants
|
||||||
};
|
// Use a JoinSet instead
|
||||||
Ok(m.iter()
|
TenantsMap::Open(m) => Ok(futures::future::join_all(m.map.iter().map(
|
||||||
.map(|(id, tenant)| (*id, tenant.current_state()))
|
|(id, tenant)| async {
|
||||||
.collect())
|
(
|
||||||
|
*id,
|
||||||
|
tenant
|
||||||
|
.get_or_try_init(|| m.try_load_tenant(id, false))
|
||||||
|
.await
|
||||||
|
.map_or(
|
||||||
|
TenantState::broken_from_reason("Failed to load tenant".to_string()),
|
||||||
|
|t| t.current_state(),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await),
|
||||||
|
TenantsMap::ShuttingDown(m) => Ok(m
|
||||||
|
.iter()
|
||||||
|
.map(|(id, tenant)| (*id, tenant.current_state()))
|
||||||
|
.collect()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "testing"))]
|
||||||
|
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||||
|
let tenants = TENANTS.read().await;
|
||||||
|
match &*tenants {
|
||||||
|
TenantsMap::Initializing => Err(TenantMapListError::Initializing),
|
||||||
|
TenantsMap::Open(m) => Ok(m
|
||||||
|
.map
|
||||||
|
.iter()
|
||||||
|
.map(|(id, tenant)| {
|
||||||
|
(
|
||||||
|
*id,
|
||||||
|
tenant
|
||||||
|
.get()
|
||||||
|
.map_or(TenantState::NotLoaded, |tenant| tenant.current_state()),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect()),
|
||||||
|
TenantsMap::ShuttingDown(m) => Ok(m
|
||||||
|
.iter()
|
||||||
|
.map(|(id, tenant)| (*id, tenant.current_state()))
|
||||||
|
.collect()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute Attach mgmt API command.
|
/// Execute Attach mgmt API command.
|
||||||
@@ -634,22 +756,23 @@ where
|
|||||||
F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
|
F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
|
||||||
{
|
{
|
||||||
let mut guard = TENANTS.write().await;
|
let mut guard = TENANTS.write().await;
|
||||||
let m = match &mut *guard {
|
match &mut *guard {
|
||||||
TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
|
TenantsMap::Initializing => Err(TenantMapInsertError::StillInitializing),
|
||||||
TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
|
TenantsMap::ShuttingDown(_) => Err(TenantMapInsertError::ShuttingDown),
|
||||||
TenantsMap::Open(m) => m,
|
TenantsMap::Open(m) => match m.map.entry(tenant_id) {
|
||||||
};
|
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
|
||||||
match m.entry(tenant_id) {
|
tenant_id,
|
||||||
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
|
e.get()
|
||||||
tenant_id,
|
.get()
|
||||||
e.get().current_state(),
|
.map_or(TenantState::NotLoaded, |tenant| tenant.current_state()),
|
||||||
)),
|
)),
|
||||||
hash_map::Entry::Vacant(v) => match insert_fn() {
|
hash_map::Entry::Vacant(v) => match insert_fn() {
|
||||||
Ok(tenant) => {
|
Ok(tenant) => {
|
||||||
v.insert(tenant.clone());
|
v.insert(OnceCell::new_with(Some(tenant.clone())));
|
||||||
Ok(tenant)
|
Ok(tenant)
|
||||||
}
|
}
|
||||||
Err(e) => Err(TenantMapInsertError::Closure(e)),
|
Err(e) => Err(TenantMapInsertError::Closure(e)),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -671,8 +794,8 @@ where
|
|||||||
// avoid holding the lock for the entire process.
|
// avoid holding the lock for the entire process.
|
||||||
{
|
{
|
||||||
let tenants_accessor = TENANTS.write().await;
|
let tenants_accessor = TENANTS.write().await;
|
||||||
match tenants_accessor.get(&tenant_id) {
|
match tenants_accessor.get(&tenant_id, false).await {
|
||||||
Some(tenant) => {
|
Ok(tenant) => {
|
||||||
let tenant = Arc::clone(tenant);
|
let tenant = Arc::clone(tenant);
|
||||||
// don't hold TENANTS lock while set_stopping waits for activation to finish
|
// don't hold TENANTS lock while set_stopping waits for activation to finish
|
||||||
drop(tenants_accessor);
|
drop(tenants_accessor);
|
||||||
@@ -689,7 +812,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => return Err(TenantStateError::NotFound(tenant_id)),
|
Err(_) => return Err(TenantStateError::NotFound(tenant_id)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -704,18 +827,18 @@ where
|
|||||||
{
|
{
|
||||||
Ok(hook_value) => {
|
Ok(hook_value) => {
|
||||||
let mut tenants_accessor = TENANTS.write().await;
|
let mut tenants_accessor = TENANTS.write().await;
|
||||||
if tenants_accessor.remove(&tenant_id).is_none() {
|
if !tenants_accessor.remove(&tenant_id) {
|
||||||
warn!("Tenant {tenant_id} got removed from memory before operation finished");
|
warn!("Tenant {tenant_id} got removed from memory before operation finished");
|
||||||
}
|
}
|
||||||
Ok(hook_value)
|
Ok(hook_value)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let tenants_accessor = TENANTS.read().await;
|
let tenants_accessor = TENANTS.read().await;
|
||||||
match tenants_accessor.get(&tenant_id) {
|
match tenants_accessor.get(&tenant_id, false).await {
|
||||||
Some(tenant) => {
|
Ok(tenant) => {
|
||||||
tenant.set_broken(e.to_string()).await;
|
tenant.set_broken(e.to_string()).await;
|
||||||
}
|
}
|
||||||
None => {
|
Err(_) => {
|
||||||
warn!("Tenant {tenant_id} got removed from memory");
|
warn!("Tenant {tenant_id} got removed from memory");
|
||||||
return Err(TenantStateError::NotFound(tenant_id));
|
return Err(TenantStateError::NotFound(tenant_id));
|
||||||
}
|
}
|
||||||
@@ -736,13 +859,7 @@ pub async fn immediate_gc(
|
|||||||
gc_req: TimelineGcRequest,
|
gc_req: TimelineGcRequest,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
|
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
|
||||||
let guard = TENANTS.read().await;
|
let tenant = get_tenant(tenant_id, false).await?;
|
||||||
let tenant = guard
|
|
||||||
.get(&tenant_id)
|
|
||||||
.map(Arc::clone)
|
|
||||||
.with_context(|| format!("tenant {tenant_id}"))
|
|
||||||
.map_err(ApiError::NotFound)?;
|
|
||||||
|
|
||||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||||
// Use tenant's pitr setting
|
// Use tenant's pitr setting
|
||||||
let pitr = tenant.get_pitr_interval();
|
let pitr = tenant.get_pitr_interval();
|
||||||
@@ -772,10 +889,6 @@ pub async fn immediate_gc(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
|
|
||||||
drop(guard);
|
|
||||||
|
|
||||||
Ok(wait_task_done)
|
Ok(wait_task_done)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -788,7 +901,8 @@ pub async fn immediate_compact(
|
|||||||
let guard = TENANTS.read().await;
|
let guard = TENANTS.read().await;
|
||||||
|
|
||||||
let tenant = guard
|
let tenant = guard
|
||||||
.get(&tenant_id)
|
.get(&tenant_id, true)
|
||||||
|
.await
|
||||||
.map(Arc::clone)
|
.map(Arc::clone)
|
||||||
.with_context(|| format!("tenant {tenant_id}"))
|
.with_context(|| format!("tenant {tenant_id}"))
|
||||||
.map_err(ApiError::NotFound)?;
|
.map_err(ApiError::NotFound)?;
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
|||||||
".*will not become active. Current state: Broken.*",
|
".*will not become active. Current state: Broken.*",
|
||||||
".*failed to load metadata.*",
|
".*failed to load metadata.*",
|
||||||
".*load failed.*load local timeline.*",
|
".*load failed.*load local timeline.*",
|
||||||
|
".*load failed, setting tenant state to Broken.*",
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user