Compare commits

..

13 Commits

Author SHA1 Message Date
Konstantin Knizhnik
4015f5d952 Add comment for NotLoaded TenantState 2023-06-30 15:14:34 +03:00
Konstantin Knizhnik
f5e0a63041 Update pageserver/src/tenant/mgr.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-06-29 22:30:20 +03:00
Konstantin Knizhnik
012539a0e7 Update pageserver/src/tenant/mgr.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-06-29 22:26:53 +03:00
Konstantin Knizhnik
41f2db3a58 Add comment about list_tenants() 2023-06-04 14:15:48 +03:00
Konstantin Knizhnik
1178dbc614 Add GetTenantError::NotActivated 2023-06-04 09:38:00 +03:00
Konstantin Knizhnik
e4b345a3c1 Wait for tenant activation during lazy loading 2023-06-03 23:06:30 +03:00
Konstantin Knizhnik
963690e77a Fix unit tests 2023-06-03 16:01:13 +03:00
Konstantin Knizhnik
15fae34751 Fix style 2023-06-03 14:35:43 +03:00
Konstantin Knizhnik
a3b7d068c1 Make clippy happy 2023-06-03 08:39:24 +03:00
Konstantin Knizhnik
198d256b7d Load tenants in list_tenants for testing feature 2023-06-02 22:58:41 +03:00
Konstantin Knizhnik
3e044a1405 Add TenantState::NotLoaded 2023-06-02 22:06:39 +03:00
Konstantin Knizhnik
36eb1c83f3 Make clippy happy 2023-06-02 18:17:51 +03:00
Konstantin Knizhnik
fc27d871ed Lazy loading of tenants on pageserver startup 2023-06-02 15:37:44 +03:00
9 changed files with 373 additions and 155 deletions

1
Cargo.lock generated
View File

@@ -2617,7 +2617,6 @@ dependencies = [
"anyhow",
"byteorder",
"bytes",
"clap 4.3.0",
"const_format",
"enum-map",
"postgres_ffi",

View File

@@ -2,11 +2,12 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Write};
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::process::{Child, Command};
use std::{io, result};
use anyhow::Context;
use anyhow::{bail, Context};
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
@@ -312,8 +313,68 @@ impl PageServerNode {
new_tenant_id: Option<TenantId>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let settings = settings.clone();
let config = models::TenantConfig::deserialize_from_settings(settings)?;
let mut settings = settings.clone();
let config = models::TenantConfig {
checkpoint_distance: settings
.remove("checkpoint_distance")
.map(|x| x.parse::<u64>())
.transpose()?,
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
compaction_target_size: settings
.remove("compaction_target_size")
.map(|x| x.parse::<u64>())
.transpose()?,
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings
.remove("compaction_threshold")
.map(|x| x.parse::<usize>())
.transpose()?,
gc_horizon: settings
.remove("gc_horizon")
.map(|x| x.parse::<u64>())
.transpose()?,
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
image_creation_threshold: settings
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
.transpose()?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings
.remove("lagging_wal_timeout")
.map(|x| x.to_string()),
max_lsn_wal_lag: settings
.remove("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.remove("eviction_policy")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
min_resident_size_override: settings
.remove("min_resident_size_override")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
gc_feedback: settings
.remove("gc_feedback")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
};
// If tenant ID was not specified, generate one
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
@@ -322,6 +383,9 @@ impl PageServerNode {
new_tenant_id,
config,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
}
self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))?
.json(&request)
.send()?
@@ -341,9 +405,81 @@ impl PageServerNode {
pub fn tenant_config(
&self,
tenant_id: TenantId,
settings: HashMap<&str, &str>,
mut settings: HashMap<&str, &str>,
) -> anyhow::Result<()> {
let config = models::TenantConfig::deserialize_from_settings(settings)?;
let config = {
// Braces to make the diff easier to read
models::TenantConfig {
checkpoint_distance: settings
.remove("checkpoint_distance")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'checkpoint_distance' as an integer")?,
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
compaction_target_size: settings
.remove("compaction_target_size")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'compaction_target_size' as an integer")?,
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings
.remove("compaction_threshold")
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'compaction_threshold' as an integer")?,
gc_horizon: settings
.remove("gc_horizon")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'gc_horizon' as an integer")?,
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
image_creation_threshold: settings
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings
.remove("lagging_wal_timeout")
.map(|x| x.to_string()),
max_lsn_wal_lag: settings
.remove("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.remove("eviction_policy")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
min_resident_size_override: settings
.remove("min_resident_size_override")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as an integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
gc_feedback: settings
.remove("gc_feedback")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
}
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
}
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
.json(&models::TenantConfigRequest { tenant_id, config })
.send()?

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
clap.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true

View File

@@ -5,7 +5,6 @@ use std::{
};
use byteorder::{BigEndian, ReadBytesExt};
use clap::Parser;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
@@ -55,6 +54,10 @@ use bytes::{BufMut, Bytes, BytesMut};
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is not yet loaded. This state is not actually used internally, because tenants that are not yet loaded are represented by an empty `OnceCell`.
/// This value is only needed to report the state of such tenants from the `list_tenants()` function.
///
NotLoaded,
/// This tenant is being loaded from local disk.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
@@ -105,7 +108,7 @@ impl TenantState {
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
Self::NotLoaded | Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
// We only reach Active after successful load / attach.
// So, call attachment status Attached.
Self::Active => Attached,
@@ -202,74 +205,31 @@ impl std::ops::Deref for TenantCreateRequest {
}
}
#[derive(Serialize, Deserialize, Debug, Default, clap::Parser)]
#[clap(rename_all = "snake_case")]
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TenantConfig {
#[clap(long)]
pub checkpoint_distance: Option<u64>,
#[clap(long)]
pub checkpoint_timeout: Option<String>,
#[clap(long)]
pub compaction_target_size: Option<u64>,
#[clap(long)]
pub compaction_period: Option<String>,
#[clap(long)]
pub compaction_threshold: Option<usize>,
#[clap(long)]
pub gc_horizon: Option<u64>,
#[clap(long)]
pub gc_period: Option<String>,
#[clap(long)]
pub image_creation_threshold: Option<usize>,
#[clap(long)]
pub pitr_interval: Option<String>,
#[clap(long)]
pub walreceiver_connect_timeout: Option<String>,
#[clap(long)]
pub lagging_wal_timeout: Option<String>,
#[clap(long)]
pub max_lsn_wal_lag: Option<NonZeroU64>,
#[clap(long)]
pub trace_read_requests: Option<bool>,
// We defer the parsing of the eviction_policy field to the request handler.
// Otherwise we'd have to move the types for eviction policy into this package.
// We might do that once the eviction feature has stabilized.
// For now, this field is not even documented in the openapi_spec.yml.
#[clap(long, value_parser = parse_json)]
pub eviction_policy: Option<serde_json::Value>,
#[clap(long)]
pub min_resident_size_override: Option<u64>,
#[clap(long)]
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub gc_feedback: Option<bool>,
}
fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
serde_json::from_str(s)
}
impl TenantConfig {
pub fn deserialize_from_settings(settings: HashMap<&str, &str>) -> Result<Self, anyhow::Error> {
// Here we are using `clap` to parse the settings. This is not ideal, but it's the easiest
// way to simplify th code. To convert settings into a list of command line arguments, we
// need the program name argv0, each key into a long-form option, and each value proceeding it.
let config = TenantConfig::try_parse_from(
std::iter::once("argv0".to_string()).chain(
settings
.iter()
.flat_map(|(k, v)| [format!("--{k}"), v.to_string()]),
),
)
.map_err(|e| {
anyhow::anyhow!(
"failed to parse: {}",
e.to_string().split('\n').next().unwrap_or_default()
) // only get the first line as other lines in clap errors are not useful
})?;
Ok(config)
}
}
#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
@@ -917,28 +877,17 @@ mod tests {
err
);
let config = HashMap::from_iter(std::iter::once(("unknown_field", "unknown_value")));
let err = TenantConfig::deserialize_from_settings(config).unwrap_err();
let attach_request = json!({
"config": {
"unknown_field": "unknown_value".to_string(),
},
});
let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
assert!(
err.to_string()
.contains("unexpected argument '--unknown_field' found"),
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
);
let config = HashMap::from_iter(std::iter::once(("checkpoint_distance", "not_a_number")));
let err = TenantConfig::deserialize_from_settings(config).unwrap_err();
assert!(
err.to_string().contains("invalid digit found in string") && err.to_string().contains("checkpoint_distance"),
"expect error to contain both 'invalid digit found in string' and the field 'checkpoint_distance', got: {}",
err
);
}
#[test]
fn test_parse_json_field() {
let config = vec![("eviction_policy", "{\"kind\": \"NoEviction\"}")];
TenantConfig::deserialize_from_settings(config.into_iter().collect()).unwrap();
}
#[test]
@@ -962,6 +911,7 @@ mod tests {
fn tenantstatus_activating_strum() {
// tests added, because we use these for metrics
let examples = [
(line!(), TenantState::NotLoaded, "NotLoaded"),
(line!(), TenantState::Loading, "Loading"),
(line!(), TenantState::Attaching, "Attaching"),
(

View File

@@ -159,6 +159,12 @@ impl From<GetTenantError> for ApiError {
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::InternalServerError(anyhow::Error::new(e))
}
e @ GetTenantError::NotLoaded(_, _) => {
ApiError::InternalServerError(anyhow::Error::new(e))
}
e @ GetTenantError::NotActivated(_, _) => {
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
}

View File

@@ -1179,6 +1179,10 @@ async fn get_active_tenant_with_timeout(
let tenant = match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(e @ GetTenantError::NotLoaded(_, _)) => return Err(GetActiveTenantError::NotFound(e)),
Err(e @ GetTenantError::NotActivated(_, _)) => {
return Err(GetActiveTenantError::NotFound(e))
}
Err(GetTenantError::NotActive(_)) => {
unreachable!("we're calling get_tenant with active=false")
}

View File

@@ -459,7 +459,7 @@ struct RemoteStartupData {
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitToBecomeActiveError {
pub enum WaitToBecomeActiveError {
WillNotBecomeActive {
tenant_id: TenantId,
state: TenantState,
@@ -1694,7 +1694,7 @@ impl Tenant {
self.state.send_modify(|current_state| {
use pageserver_api::models::ActivatingFrom;
match &*current_state {
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
}
TenantState::Loading => {
@@ -1761,7 +1761,10 @@ impl Tenant {
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::NotLoaded
| TenantState::Activating(_)
| TenantState::Loading
| TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
@@ -1776,7 +1779,7 @@ impl Tenant {
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
@@ -1834,7 +1837,10 @@ impl Tenant {
// The load & attach routines own the tenant state until it has reached `Active`.
// So, wait until it's done.
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::NotLoaded
| TenantState::Activating(_)
| TenantState::Loading
| TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
@@ -1849,7 +1855,7 @@ impl Tenant {
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
@@ -1884,7 +1890,10 @@ impl Tenant {
loop {
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
TenantState::NotLoaded
| TenantState::Loading
| TenantState::Attaching
| TenantState::Activating(_) => {
// in these states, there's a chance that we can reach ::Active
receiver.changed().await.map_err(
|_e: tokio::sync::watch::error::RecvError| {

View File

@@ -9,7 +9,7 @@ use tokio::fs;
use anyhow::Context;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tokio::sync::{OnceCell, RwLock};
use tokio::task::JoinSet;
use tracing::*;
@@ -22,6 +22,7 @@ use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
WaitToBecomeActiveError,
};
use crate::IGNORED_TENANT_FILE_NAME;
@@ -29,6 +30,65 @@ use utils::completion;
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
/// Tenants known to the pageserver, each held in a cell that is filled the first time
/// the tenant is requested, together with everything needed to perform that load.
struct LazyTenantsMap {
/// Pageserver configuration; used to resolve the tenant directory and ignore-mark paths on load.
conf: &'static PageServerConf,
/// One cell per known tenant. An empty cell means the tenant has not been loaded yet.
map: HashMap<TenantId, OnceCell<Arc<Tenant>>>,
/// Cloned and handed to `schedule_local_tenant_processing` for every tenant load.
broker_client: storage_broker::BrokerClientChannel,
/// Cloned and handed to `schedule_local_tenant_processing` for every tenant load.
remote_storage: Option<GenericRemoteStorage>,
}
impl LazyTenantsMap {
    /// Schedules processing of the on-disk tenant `tenant_id` and returns its handle.
    ///
    /// A leftover tenant ignore mark file is removed first, if one exists.
    ///
    /// # Errors
    ///
    /// Fails if the ignore mark cannot be removed or if
    /// `schedule_local_tenant_processing` reports an error.
    fn load_tenant(&self, tenant_id: &TenantId) -> anyhow::Result<Arc<Tenant>> {
        let tenant_path = self.conf.tenant_path(tenant_id);
        let tenant_ignore_mark = self.conf.tenant_ignore_mark_file_path(*tenant_id);
        if tenant_ignore_mark.exists() {
            std::fs::remove_file(&tenant_ignore_mark)
                .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
        }
        let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
        schedule_local_tenant_processing(
            self.conf,
            &tenant_path,
            self.broker_client.clone(),
            self.remote_storage.clone(),
            None,
            &ctx,
        )
        .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))
    }

    /// Loads the tenant and, when `wait_to_become_active` is set, waits for its activation.
    ///
    /// # Errors
    ///
    /// * [`GetTenantError::NotLoaded`] if [`Self::load_tenant`] fails.
    /// * [`GetTenantError::NotActivated`] if waiting for activation fails.
    async fn try_load_tenant(
        &self,
        tenant_id: &TenantId,
        wait_to_become_active: bool,
    ) -> Result<Arc<Tenant>, GetTenantError> {
        let tenant = self
            .load_tenant(tenant_id)
            .map_err(|e| GetTenantError::NotLoaded(*tenant_id, e))?;
        if wait_to_become_active {
            tenant
                .wait_to_become_active()
                .await
                .map_err(|e| GetTenantError::NotActivated(*tenant_id, e))?;
        }
        Ok(tenant)
    }

    /// Returns the tenant `tenant_id`, loading it first if its cell is still empty.
    ///
    /// `wait_to_become_active` only applies to the call that performs the load:
    /// that call (and callers queued on the same cell) waits for activation.
    /// A tenant that is already in its cell is returned as is.
    ///
    /// # Errors
    ///
    /// * [`GetTenantError::NotFound`] if the tenant is not in the map.
    /// * [`GetTenantError::NotLoaded`] if loading fails; the cell stays empty, so a
    ///   later call retries the load.
    /// * [`GetTenantError::NotActivated`] if the freshly loaded tenant fails to
    ///   activate; the tenant is nevertheless kept in its cell.
    async fn get(
        &self,
        tenant_id: &TenantId,
        wait_to_become_active: bool,
    ) -> Result<&Arc<Tenant>, GetTenantError> {
        let cell = self
            .map
            .get(tenant_id)
            .ok_or(GetTenantError::NotFound(*tenant_id))?;

        // The activation failure is carried out of the initializer separately, so
        // that the initializer itself still succeeds and the tenant is stored.
        // Returning `Err` from the initializer would leave the cell empty, and the
        // next lookup would call `load_tenant` again for a tenant whose processing
        // has already been scheduled.
        let mut activation_error = None;
        let activation_error_slot = &mut activation_error;
        let tenant = cell
            .get_or_try_init(move || async move {
                let tenant = self
                    .load_tenant(tenant_id)
                    .map_err(|e| GetTenantError::NotLoaded(*tenant_id, e))?;
                if wait_to_become_active {
                    if let Err(e) = tenant.wait_to_become_active().await {
                        *activation_error_slot = Some(e);
                    }
                }
                Ok::<_, GetTenantError>(tenant)
            })
            .await?;

        match activation_error {
            Some(e) => Err(GetTenantError::NotActivated(*tenant_id, e)),
            None => Ok(tenant),
        }
    }
}
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
enum TenantsMap {
@@ -36,23 +96,31 @@ enum TenantsMap {
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been discovered; each one is loaded lazily on first access.
/// New tenants can be added using [`tenant_map_insert`].
Open(HashMap<TenantId, Arc<Tenant>>),
Open(LazyTenantsMap),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
}
impl TenantsMap {
fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
async fn get(
&self,
tenant_id: &TenantId,
wait_to_become_active: bool,
) -> Result<&Arc<Tenant>, GetTenantError> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
TenantsMap::Initializing => Err(GetTenantError::NotFound(*tenant_id)),
TenantsMap::Open(m) => m.get(tenant_id, wait_to_become_active).await,
TenantsMap::ShuttingDown(m) => {
m.get(tenant_id).ok_or(GetTenantError::NotFound(*tenant_id))
}
}
}
fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
fn remove(&mut self, tenant_id: &TenantId) -> bool {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
TenantsMap::Initializing => false,
TenantsMap::Open(m) => m.map.remove(tenant_id).is_some(),
TenantsMap::ShuttingDown(m) => m.remove(tenant_id).is_some(),
}
}
}
@@ -67,7 +135,7 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: (completion::Completion, completion::Barrier),
_init_done: (completion::Completion, completion::Barrier),
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
@@ -78,8 +146,6 @@ pub async fn init_tenant_mgr(
.await
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
loop {
match dir_entries.next_entry().await {
Ok(None) => break,
@@ -119,21 +185,15 @@ pub async fn init_tenant_mgr(
continue;
}
match schedule_local_tenant_processing(
conf,
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
Some(init_done.clone()),
&ctx,
) {
Ok(tenant) => {
tenants.insert(tenant.tenant_id(), tenant);
}
Err(e) => {
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
}
}
let tenant_id = tenant_dir_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TenantId>()
.with_context(|| {
format!("Could not parse tenant id out of the tenant dir name in path {tenant_dir_path:?}")
})?;
tenants.insert(tenant_id, OnceCell::new());
}
}
Err(e) => {
@@ -151,7 +211,12 @@ pub async fn init_tenant_mgr(
let mut tenants_map = TENANTS.write().await;
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
*tenants_map = TenantsMap::Open(tenants);
*tenants_map = TenantsMap::Open(LazyTenantsMap {
conf,
broker_client,
remote_storage,
map: tenants,
});
Ok(())
}
@@ -247,10 +312,17 @@ pub async fn shutdown_all_tenants() {
info!("tenants map is empty");
return;
}
TenantsMap::Open(tenants) => {
let tenants_clone = tenants.clone();
*m = TenantsMap::ShuttingDown(std::mem::take(tenants));
tenants_clone
TenantsMap::Open(lazy) => {
let online_tenants: Vec<Arc<Tenant>> = lazy
.map
.iter()
.filter_map(|(_, v)| v.get())
.cloned()
.collect();
*m = TenantsMap::ShuttingDown(HashMap::from_iter(
online_tenants.iter().map(|t| (t.tenant_id(), t.clone())),
));
online_tenants
}
TenantsMap::ShuttingDown(_) => {
error!("already shutting down, this function isn't supposed to be called more than once");
@@ -277,7 +349,8 @@ pub async fn shutdown_all_tenants() {
// It's messed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
for (tenant_id, tenant) in tenants_to_shut_down {
for tenant in tenants_to_shut_down {
let tenant_id = tenant.tenant_id();
join_set.spawn(
async move {
match tenant.set_stopping().await {
@@ -421,6 +494,10 @@ pub enum GetTenantError {
NotFound(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
#[error("Tenant {0} can not be loaded: {1}")]
NotLoaded(TenantId, anyhow::Error),
#[error("Tenant {0} can not be activated: {1}")]
NotActivated(TenantId, WaitToBecomeActiveError),
}
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
@@ -430,9 +507,7 @@ pub async fn get_tenant(
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let m = TENANTS.read().await;
let tenant = m
.get(&tenant_id)
.ok_or(GetTenantError::NotFound(tenant_id))?;
let tenant = m.get(&tenant_id, active_only).await?;
if active_only && !tenant.is_active() {
Err(GetTenantError::NotActive(tenant_id))
} else {
@@ -558,15 +633,62 @@ pub enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
// Many tests use list_tenants to check whether a tenant is in the active state.
// With lazy loading, tenants are initially in the NotLoaded state.
// To make all these tests pass, let's force loading of tenants if the testing feature is enabled.
// Alternatively, an extra parameter could be passed to list_tenants to choose between
// eager and lazy loading of tenants.
#[cfg(feature = "testing")]
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
let tenants = TENANTS.read().await;
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
};
Ok(m.iter()
.map(|(id, tenant)| (*id, tenant.current_state()))
.collect())
match &*tenants {
TenantsMap::Initializing => Err(TenantMapListError::Initializing),
// Do not copy paste futures::future::join_all usage to production code with many tenants
// Use a JoinSet instead
TenantsMap::Open(m) => Ok(futures::future::join_all(m.map.iter().map(
|(id, tenant)| async {
(
*id,
tenant
.get_or_try_init(|| m.try_load_tenant(id, false))
.await
.map_or(
TenantState::broken_from_reason("Failed to load tenant".to_string()),
|t| t.current_state(),
),
)
},
))
.await),
TenantsMap::ShuttingDown(m) => Ok(m
.iter()
.map(|(id, tenant)| (*id, tenant.current_state()))
.collect()),
}
}
/// Lists every known tenant with its current state, without triggering any load.
///
/// While the map is open, a tenant whose cell is still empty is reported as
/// [`TenantState::NotLoaded`].
///
/// # Errors
///
/// Returns [`TenantMapListError::Initializing`] while the tenants map is still
/// being initialized.
#[cfg(not(feature = "testing"))]
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
    let guard = TENANTS.read().await;
    let listing = match &*guard {
        TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
        TenantsMap::Open(lazy) => lazy
            .map
            .iter()
            .map(|(id, cell)| {
                // An empty cell means nobody has requested this tenant yet.
                let state = match cell.get() {
                    Some(tenant) => tenant.current_state(),
                    None => TenantState::NotLoaded,
                };
                (*id, state)
            })
            .collect(),
        TenantsMap::ShuttingDown(loaded) => loaded
            .iter()
            .map(|(id, tenant)| (*id, tenant.current_state()))
            .collect(),
    };
    Ok(listing)
}
/// Execute Attach mgmt API command.
@@ -634,22 +756,23 @@ where
F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
{
let mut guard = TENANTS.write().await;
let m = match &mut *guard {
TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
TenantsMap::Open(m) => m,
};
match m.entry(tenant_id) {
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
e.get().current_state(),
)),
hash_map::Entry::Vacant(v) => match insert_fn() {
Ok(tenant) => {
v.insert(tenant.clone());
Ok(tenant)
}
Err(e) => Err(TenantMapInsertError::Closure(e)),
match &mut *guard {
TenantsMap::Initializing => Err(TenantMapInsertError::StillInitializing),
TenantsMap::ShuttingDown(_) => Err(TenantMapInsertError::ShuttingDown),
TenantsMap::Open(m) => match m.map.entry(tenant_id) {
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
e.get()
.get()
.map_or(TenantState::NotLoaded, |tenant| tenant.current_state()),
)),
hash_map::Entry::Vacant(v) => match insert_fn() {
Ok(tenant) => {
v.insert(OnceCell::new_with(Some(tenant.clone())));
Ok(tenant)
}
Err(e) => Err(TenantMapInsertError::Closure(e)),
},
},
}
}
@@ -671,8 +794,8 @@ where
// avoid holding the lock for the entire process.
{
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
match tenants_accessor.get(&tenant_id, false).await {
Ok(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
@@ -689,7 +812,7 @@ where
}
}
}
None => return Err(TenantStateError::NotFound(tenant_id)),
Err(_) => return Err(TenantStateError::NotFound(tenant_id)),
}
}
@@ -704,18 +827,18 @@ where
{
Ok(hook_value) => {
let mut tenants_accessor = TENANTS.write().await;
if tenants_accessor.remove(&tenant_id).is_none() {
if !tenants_accessor.remove(&tenant_id) {
warn!("Tenant {tenant_id} got removed from memory before operation finished");
}
Ok(hook_value)
}
Err(e) => {
let tenants_accessor = TENANTS.read().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
match tenants_accessor.get(&tenant_id, false).await {
Ok(tenant) => {
tenant.set_broken(e.to_string()).await;
}
None => {
Err(_) => {
warn!("Tenant {tenant_id} got removed from memory");
return Err(TenantStateError::NotFound(tenant_id));
}
@@ -736,13 +859,7 @@ pub async fn immediate_gc(
gc_req: TimelineGcRequest,
ctx: &RequestContext,
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
let guard = TENANTS.read().await;
let tenant = guard
.get(&tenant_id)
.map(Arc::clone)
.with_context(|| format!("tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;
let tenant = get_tenant(tenant_id, false).await?;
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
@@ -772,10 +889,6 @@ pub async fn immediate_gc(
Ok(())
}
);
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
drop(guard);
Ok(wait_task_done)
}
@@ -788,7 +901,8 @@ pub async fn immediate_compact(
let guard = TENANTS.read().await;
let tenant = guard
.get(&tenant_id)
.get(&tenant_id, true)
.await
.map(Arc::clone)
.with_context(|| format!("tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;

View File

@@ -21,6 +21,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*load failed, setting tenant state to Broken.*",
]
)