mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-07 20:50:38 +00:00
Compare commits
13 Commits
skyzh/cli-
...
lazy_tenan
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4015f5d952 | ||
|
|
f5e0a63041 | ||
|
|
012539a0e7 | ||
|
|
41f2db3a58 | ||
|
|
1178dbc614 | ||
|
|
e4b345a3c1 | ||
|
|
963690e77a | ||
|
|
15fae34751 | ||
|
|
a3b7d068c1 | ||
|
|
198d256b7d | ||
|
|
3e044a1405 | ||
|
|
36eb1c83f3 | ||
|
|
fc27d871ed |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2617,7 +2617,6 @@ dependencies = [
|
||||
"anyhow",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"clap 4.3.0",
|
||||
"const_format",
|
||||
"enum-map",
|
||||
"postgres_ffi",
|
||||
|
||||
@@ -2,11 +2,12 @@ use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, Write};
|
||||
use std::num::NonZeroU64;
|
||||
use std::path::PathBuf;
|
||||
use std::process::{Child, Command};
|
||||
use std::{io, result};
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::{bail, Context};
|
||||
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
|
||||
use postgres_backend::AuthType;
|
||||
use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
@@ -312,8 +313,68 @@ impl PageServerNode {
|
||||
new_tenant_id: Option<TenantId>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let settings = settings.clone();
|
||||
let config = models::TenantConfig::deserialize_from_settings(settings)?;
|
||||
let mut settings = settings.clone();
|
||||
|
||||
let config = models::TenantConfig {
|
||||
checkpoint_distance: settings
|
||||
.remove("checkpoint_distance")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()?,
|
||||
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
|
||||
compaction_target_size: settings
|
||||
.remove("compaction_target_size")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()?,
|
||||
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
|
||||
compaction_threshold: settings
|
||||
.remove("compaction_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()?,
|
||||
gc_horizon: settings
|
||||
.remove("gc_horizon")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()?,
|
||||
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
|
||||
image_creation_threshold: settings
|
||||
.remove("image_creation_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()?,
|
||||
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
|
||||
walreceiver_connect_timeout: settings
|
||||
.remove("walreceiver_connect_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
lagging_wal_timeout: settings
|
||||
.remove("lagging_wal_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
max_lsn_wal_lag: settings
|
||||
.remove("max_lsn_wal_lag")
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'eviction_policy' json")?,
|
||||
min_resident_size_override: settings
|
||||
.remove("min_resident_size_override")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'min_resident_size_override' as integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
gc_feedback: settings
|
||||
.remove("gc_feedback")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
};
|
||||
|
||||
// If tenant ID was not specified, generate one
|
||||
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
|
||||
@@ -322,6 +383,9 @@ impl PageServerNode {
|
||||
new_tenant_id,
|
||||
config,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
}
|
||||
self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))?
|
||||
.json(&request)
|
||||
.send()?
|
||||
@@ -341,9 +405,81 @@ impl PageServerNode {
|
||||
pub fn tenant_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
settings: HashMap<&str, &str>,
|
||||
mut settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<()> {
|
||||
let config = models::TenantConfig::deserialize_from_settings(settings)?;
|
||||
let config = {
|
||||
// Braces to make the diff easier to read
|
||||
models::TenantConfig {
|
||||
checkpoint_distance: settings
|
||||
.remove("checkpoint_distance")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'checkpoint_distance' as an integer")?,
|
||||
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
|
||||
compaction_target_size: settings
|
||||
.remove("compaction_target_size")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_target_size' as an integer")?,
|
||||
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
|
||||
compaction_threshold: settings
|
||||
.remove("compaction_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_threshold' as an integer")?,
|
||||
gc_horizon: settings
|
||||
.remove("gc_horizon")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_horizon' as an integer")?,
|
||||
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
|
||||
image_creation_threshold: settings
|
||||
.remove("image_creation_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
|
||||
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
|
||||
walreceiver_connect_timeout: settings
|
||||
.remove("walreceiver_connect_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
lagging_wal_timeout: settings
|
||||
.remove("lagging_wal_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
max_lsn_wal_lag: settings
|
||||
.remove("max_lsn_wal_lag")
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'eviction_policy' json")?,
|
||||
min_resident_size_override: settings
|
||||
.remove("min_resident_size_override")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'min_resident_size_override' as an integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
gc_feedback: settings
|
||||
.remove("gc_feedback")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
}
|
||||
};
|
||||
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
}
|
||||
|
||||
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
|
||||
.json(&models::TenantConfigRequest { tenant_id, config })
|
||||
.send()?
|
||||
|
||||
@@ -5,7 +5,6 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
clap.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -5,7 +5,6 @@ use std::{
|
||||
};
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use clap::Parser;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use strum_macros;
|
||||
@@ -55,6 +54,10 @@ use bytes::{BufMut, Bytes, BytesMut};
|
||||
)]
|
||||
#[serde(tag = "slug", content = "data")]
|
||||
pub enum TenantState {
|
||||
/// This tenant is not yet loaded. This state is not actually used internally because not loaded tenants are handled using OnceSet.
|
||||
/// This value is needed only for reporting state of such tenants by list_tenants() function
|
||||
///
|
||||
NotLoaded,
|
||||
/// This tenant is being loaded from local disk.
|
||||
///
|
||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||
@@ -105,7 +108,7 @@ impl TenantState {
|
||||
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
|
||||
// tenant mgr startup distinguishes attaching from loading via marker file.
|
||||
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
|
||||
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
|
||||
Self::NotLoaded | Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
|
||||
// We only reach Active after successful load / attach.
|
||||
// So, call atttachment status Attached.
|
||||
Self::Active => Attached,
|
||||
@@ -202,74 +205,31 @@ impl std::ops::Deref for TenantCreateRequest {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default, clap::Parser)]
|
||||
#[clap(rename_all = "snake_case")]
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub struct TenantConfig {
|
||||
#[clap(long)]
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
#[clap(long)]
|
||||
pub checkpoint_timeout: Option<String>,
|
||||
#[clap(long)]
|
||||
pub compaction_target_size: Option<u64>,
|
||||
#[clap(long)]
|
||||
pub compaction_period: Option<String>,
|
||||
#[clap(long)]
|
||||
pub compaction_threshold: Option<usize>,
|
||||
#[clap(long)]
|
||||
pub gc_horizon: Option<u64>,
|
||||
#[clap(long)]
|
||||
pub gc_period: Option<String>,
|
||||
#[clap(long)]
|
||||
pub image_creation_threshold: Option<usize>,
|
||||
#[clap(long)]
|
||||
pub pitr_interval: Option<String>,
|
||||
#[clap(long)]
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
#[clap(long)]
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
#[clap(long)]
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
#[clap(long)]
|
||||
pub trace_read_requests: Option<bool>,
|
||||
// We defer the parsing of the eviction_policy field to the request handler.
|
||||
// Otherwise we'd have to move the types for eviction policy into this package.
|
||||
// We might do that once the eviction feature has stabilizied.
|
||||
// For now, this field is not even documented in the openapi_spec.yml.
|
||||
#[clap(long, value_parser = parse_json)]
|
||||
pub eviction_policy: Option<serde_json::Value>,
|
||||
#[clap(long)]
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
#[clap(long)]
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
pub gc_feedback: Option<bool>,
|
||||
}
|
||||
|
||||
fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
|
||||
serde_json::from_str(s)
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
pub fn deserialize_from_settings(settings: HashMap<&str, &str>) -> Result<Self, anyhow::Error> {
|
||||
// Here we are using `clap` to parse the settings. This is not ideal, but it's the easiest
|
||||
// way to simplify th code. To convert settings into a list of command line arguments, we
|
||||
// need the program name argv0, each key into a long-form option, and each value proceeding it.
|
||||
let config = TenantConfig::try_parse_from(
|
||||
std::iter::once("argv0".to_string()).chain(
|
||||
settings
|
||||
.iter()
|
||||
.flat_map(|(k, v)| [format!("--{k}"), v.to_string()]),
|
||||
),
|
||||
)
|
||||
.map_err(|e| {
|
||||
anyhow::anyhow!(
|
||||
"failed to parse: {}",
|
||||
e.to_string().split('\n').next().unwrap_or_default()
|
||||
) // only get the first line as other lines in clap errors are not useful
|
||||
})?;
|
||||
Ok(config)
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
@@ -917,28 +877,17 @@ mod tests {
|
||||
err
|
||||
);
|
||||
|
||||
let config = HashMap::from_iter(std::iter::once(("unknown_field", "unknown_value")));
|
||||
let err = TenantConfig::deserialize_from_settings(config).unwrap_err();
|
||||
let attach_request = json!({
|
||||
"config": {
|
||||
"unknown_field": "unknown_value".to_string(),
|
||||
},
|
||||
});
|
||||
let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("unexpected argument '--unknown_field' found"),
|
||||
err.to_string().contains("unknown field `unknown_field`"),
|
||||
"expect unknown field `unknown_field` error, got: {}",
|
||||
err
|
||||
);
|
||||
|
||||
let config = HashMap::from_iter(std::iter::once(("checkpoint_distance", "not_a_number")));
|
||||
let err = TenantConfig::deserialize_from_settings(config).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("invalid digit found in string") && err.to_string().contains("checkpoint_distance"),
|
||||
"expect error to contain both 'invalid digit found in string' and the field 'checkpoint_distance', got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_json_field() {
|
||||
let config = vec![("eviction_policy", "{\"kind\": \"NoEviction\"}")];
|
||||
TenantConfig::deserialize_from_settings(config.into_iter().collect()).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -962,6 +911,7 @@ mod tests {
|
||||
fn tenantstatus_activating_strum() {
|
||||
// tests added, because we use these for metrics
|
||||
let examples = [
|
||||
(line!(), TenantState::NotLoaded, "NotLoaded"),
|
||||
(line!(), TenantState::Loading, "Loading"),
|
||||
(line!(), TenantState::Attaching, "Attaching"),
|
||||
(
|
||||
|
||||
@@ -159,6 +159,12 @@ impl From<GetTenantError> for ApiError {
|
||||
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
}
|
||||
e @ GetTenantError::NotLoaded(_, _) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
}
|
||||
e @ GetTenantError::NotActivated(_, _) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1179,6 +1179,10 @@ async fn get_active_tenant_with_timeout(
|
||||
let tenant = match mgr::get_tenant(tenant_id, false).await {
|
||||
Ok(tenant) => tenant,
|
||||
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
|
||||
Err(e @ GetTenantError::NotLoaded(_, _)) => return Err(GetActiveTenantError::NotFound(e)),
|
||||
Err(e @ GetTenantError::NotActivated(_, _)) => {
|
||||
return Err(GetActiveTenantError::NotFound(e))
|
||||
}
|
||||
Err(GetTenantError::NotActive(_)) => {
|
||||
unreachable!("we're calling get_tenant with active=false")
|
||||
}
|
||||
|
||||
@@ -459,7 +459,7 @@ struct RemoteStartupData {
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum WaitToBecomeActiveError {
|
||||
pub enum WaitToBecomeActiveError {
|
||||
WillNotBecomeActive {
|
||||
tenant_id: TenantId,
|
||||
state: TenantState,
|
||||
@@ -1694,7 +1694,7 @@ impl Tenant {
|
||||
self.state.send_modify(|current_state| {
|
||||
use pageserver_api::models::ActivatingFrom;
|
||||
match &*current_state {
|
||||
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
|
||||
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
|
||||
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
|
||||
}
|
||||
TenantState::Loading => {
|
||||
@@ -1761,7 +1761,10 @@ impl Tenant {
|
||||
|
||||
// cannot stop before we're done activating, so wait out until we're done activating
|
||||
rx.wait_for(|state| match state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::NotLoaded
|
||||
| TenantState::Activating(_)
|
||||
| TenantState::Loading
|
||||
| TenantState::Attaching => {
|
||||
info!(
|
||||
"waiting for {} to turn Active|Broken|Stopping",
|
||||
<&'static str>::from(state)
|
||||
@@ -1776,7 +1779,7 @@ impl Tenant {
|
||||
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
|
||||
let mut err = None;
|
||||
let stopping = self.state.send_if_modified(|current_state| match current_state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||
}
|
||||
TenantState::Active => {
|
||||
@@ -1834,7 +1837,10 @@ impl Tenant {
|
||||
// The load & attach routines own the tenant state until it has reached `Active`.
|
||||
// So, wait until it's done.
|
||||
rx.wait_for(|state| match state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::NotLoaded
|
||||
| TenantState::Activating(_)
|
||||
| TenantState::Loading
|
||||
| TenantState::Attaching => {
|
||||
info!(
|
||||
"waiting for {} to turn Active|Broken|Stopping",
|
||||
<&'static str>::from(state)
|
||||
@@ -1849,7 +1855,7 @@ impl Tenant {
|
||||
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
|
||||
self.state.send_modify(|current_state| {
|
||||
match *current_state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::NotLoaded | TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||
}
|
||||
TenantState::Active => {
|
||||
@@ -1884,7 +1890,10 @@ impl Tenant {
|
||||
loop {
|
||||
let current_state = receiver.borrow_and_update().clone();
|
||||
match current_state {
|
||||
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
|
||||
TenantState::NotLoaded
|
||||
| TenantState::Loading
|
||||
| TenantState::Attaching
|
||||
| TenantState::Activating(_) => {
|
||||
// in these states, there's a chance that we can reach ::Active
|
||||
receiver.changed().await.map_err(
|
||||
|_e: tokio::sync::watch::error::RecvError| {
|
||||
|
||||
@@ -9,7 +9,7 @@ use tokio::fs;
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{OnceCell, RwLock};
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::*;
|
||||
|
||||
@@ -22,6 +22,7 @@ use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{
|
||||
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
|
||||
WaitToBecomeActiveError,
|
||||
};
|
||||
use crate::IGNORED_TENANT_FILE_NAME;
|
||||
|
||||
@@ -29,6 +30,65 @@ use utils::completion;
|
||||
use utils::fs_ext::PathExt;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
struct LazyTenantsMap {
|
||||
conf: &'static PageServerConf,
|
||||
map: HashMap<TenantId, OnceCell<Arc<Tenant>>>,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
}
|
||||
|
||||
impl LazyTenantsMap {
|
||||
fn load_tenant(&self, tenant_id: &TenantId) -> anyhow::Result<Arc<Tenant>> {
|
||||
let tenant_path = self.conf.tenant_path(tenant_id);
|
||||
let tenant_ignore_mark = self.conf.tenant_ignore_mark_file_path(*tenant_id);
|
||||
if tenant_ignore_mark.exists() {
|
||||
std::fs::remove_file(&tenant_ignore_mark)
|
||||
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||
}
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
|
||||
schedule_local_tenant_processing(
|
||||
self.conf,
|
||||
&tenant_path,
|
||||
self.broker_client.clone(),
|
||||
self.remote_storage.clone(),
|
||||
None,
|
||||
&ctx,
|
||||
)
|
||||
.with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))
|
||||
}
|
||||
|
||||
async fn try_load_tenant(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
wait_to_become_active: bool,
|
||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
let tenant = self
|
||||
.load_tenant(tenant_id)
|
||||
.map_err(|e| GetTenantError::NotLoaded(*tenant_id, e))?;
|
||||
if wait_to_become_active {
|
||||
tenant
|
||||
.wait_to_become_active()
|
||||
.await
|
||||
.map_err(|e| GetTenantError::NotActivated(*tenant_id, e))?;
|
||||
}
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
async fn get(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
wait_to_become_active: bool,
|
||||
) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||
let tenant = self
|
||||
.map
|
||||
.get(tenant_id)
|
||||
.ok_or(GetTenantError::NotFound(*tenant_id))?;
|
||||
tenant
|
||||
.get_or_try_init(|| self.try_load_tenant(tenant_id, wait_to_become_active))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// The tenants known to the pageserver.
|
||||
/// The enum variants are used to distinguish the different states that the pageserver can be in.
|
||||
enum TenantsMap {
|
||||
@@ -36,23 +96,31 @@ enum TenantsMap {
|
||||
Initializing,
|
||||
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
|
||||
/// New tenants can be added using [`tenant_map_insert`].
|
||||
Open(HashMap<TenantId, Arc<Tenant>>),
|
||||
Open(LazyTenantsMap),
|
||||
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
|
||||
/// Existing tenants are still accessible, but no new tenants can be created.
|
||||
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
|
||||
}
|
||||
|
||||
impl TenantsMap {
|
||||
fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
|
||||
async fn get(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
wait_to_become_active: bool,
|
||||
) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||
match self {
|
||||
TenantsMap::Initializing => None,
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
|
||||
TenantsMap::Initializing => Err(GetTenantError::NotFound(*tenant_id)),
|
||||
TenantsMap::Open(m) => m.get(tenant_id, wait_to_become_active).await,
|
||||
TenantsMap::ShuttingDown(m) => {
|
||||
m.get(tenant_id).ok_or(GetTenantError::NotFound(*tenant_id))
|
||||
}
|
||||
}
|
||||
}
|
||||
fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
|
||||
fn remove(&mut self, tenant_id: &TenantId) -> bool {
|
||||
match self {
|
||||
TenantsMap::Initializing => None,
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
|
||||
TenantsMap::Initializing => false,
|
||||
TenantsMap::Open(m) => m.map.remove(tenant_id).is_some(),
|
||||
TenantsMap::ShuttingDown(m) => m.remove(tenant_id).is_some(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -67,7 +135,7 @@ pub async fn init_tenant_mgr(
|
||||
conf: &'static PageServerConf,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
init_done: (completion::Completion, completion::Barrier),
|
||||
_init_done: (completion::Completion, completion::Barrier),
|
||||
) -> anyhow::Result<()> {
|
||||
// Scan local filesystem for attached tenants
|
||||
let tenants_dir = conf.tenants_path();
|
||||
@@ -78,8 +146,6 @@ pub async fn init_tenant_mgr(
|
||||
.await
|
||||
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
|
||||
|
||||
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
|
||||
|
||||
loop {
|
||||
match dir_entries.next_entry().await {
|
||||
Ok(None) => break,
|
||||
@@ -119,21 +185,15 @@ pub async fn init_tenant_mgr(
|
||||
continue;
|
||||
}
|
||||
|
||||
match schedule_local_tenant_processing(
|
||||
conf,
|
||||
&tenant_dir_path,
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
Some(init_done.clone()),
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
tenants.insert(tenant.tenant_id(), tenant);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
|
||||
}
|
||||
}
|
||||
let tenant_id = tenant_dir_path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.parse::<TenantId>()
|
||||
.with_context(|| {
|
||||
format!("Could not parse tenant id out of the tenant dir name in path {tenant_dir_path:?}")
|
||||
})?;
|
||||
tenants.insert(tenant_id, OnceCell::new());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -151,7 +211,12 @@ pub async fn init_tenant_mgr(
|
||||
|
||||
let mut tenants_map = TENANTS.write().await;
|
||||
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
|
||||
*tenants_map = TenantsMap::Open(tenants);
|
||||
*tenants_map = TenantsMap::Open(LazyTenantsMap {
|
||||
conf,
|
||||
broker_client,
|
||||
remote_storage,
|
||||
map: tenants,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -247,10 +312,17 @@ pub async fn shutdown_all_tenants() {
|
||||
info!("tenants map is empty");
|
||||
return;
|
||||
}
|
||||
TenantsMap::Open(tenants) => {
|
||||
let tenants_clone = tenants.clone();
|
||||
*m = TenantsMap::ShuttingDown(std::mem::take(tenants));
|
||||
tenants_clone
|
||||
TenantsMap::Open(lazy) => {
|
||||
let online_tenants: Vec<Arc<Tenant>> = lazy
|
||||
.map
|
||||
.iter()
|
||||
.filter_map(|(_, v)| v.get())
|
||||
.cloned()
|
||||
.collect();
|
||||
*m = TenantsMap::ShuttingDown(HashMap::from_iter(
|
||||
online_tenants.iter().map(|t| (t.tenant_id(), t.clone())),
|
||||
));
|
||||
online_tenants
|
||||
}
|
||||
TenantsMap::ShuttingDown(_) => {
|
||||
error!("already shutting down, this function isn't supposed to be called more than once");
|
||||
@@ -277,7 +349,8 @@ pub async fn shutdown_all_tenants() {
|
||||
// It's mesed up.
|
||||
let mut join_set = JoinSet::new();
|
||||
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
|
||||
for (tenant_id, tenant) in tenants_to_shut_down {
|
||||
for tenant in tenants_to_shut_down {
|
||||
let tenant_id = tenant.tenant_id();
|
||||
join_set.spawn(
|
||||
async move {
|
||||
match tenant.set_stopping().await {
|
||||
@@ -421,6 +494,10 @@ pub enum GetTenantError {
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
NotActive(TenantId),
|
||||
#[error("Tenant {0} can not be loaded: {1}")]
|
||||
NotLoaded(TenantId, anyhow::Error),
|
||||
#[error("Tenant {0} can not be activated: {1}")]
|
||||
NotActivated(TenantId, WaitToBecomeActiveError),
|
||||
}
|
||||
|
||||
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
|
||||
@@ -430,9 +507,7 @@ pub async fn get_tenant(
|
||||
active_only: bool,
|
||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
let m = TENANTS.read().await;
|
||||
let tenant = m
|
||||
.get(&tenant_id)
|
||||
.ok_or(GetTenantError::NotFound(tenant_id))?;
|
||||
let tenant = m.get(&tenant_id, active_only).await?;
|
||||
if active_only && !tenant.is_active() {
|
||||
Err(GetTenantError::NotActive(tenant_id))
|
||||
} else {
|
||||
@@ -558,15 +633,62 @@ pub enum TenantMapListError {
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
// Many tests are using list_tenants to check if tenant is in active state
|
||||
// With lazy loading tenants are initially in NotLoaded state.
|
||||
// To make all this tests pass, lets force loading of tenants if testing feature is specified.
|
||||
// Alternatively it is possible to pass extra parameter to list_tenants to choose between
|
||||
// eager and lazy loading of tenants.
|
||||
#[cfg(feature = "testing")]
|
||||
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().await;
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
|
||||
};
|
||||
Ok(m.iter()
|
||||
.map(|(id, tenant)| (*id, tenant.current_state()))
|
||||
.collect())
|
||||
match &*tenants {
|
||||
TenantsMap::Initializing => Err(TenantMapListError::Initializing),
|
||||
// Do not copy paste futures::future::join_all usage to production code with many tenants
|
||||
// Use a JoinSet instead
|
||||
TenantsMap::Open(m) => Ok(futures::future::join_all(m.map.iter().map(
|
||||
|(id, tenant)| async {
|
||||
(
|
||||
*id,
|
||||
tenant
|
||||
.get_or_try_init(|| m.try_load_tenant(id, false))
|
||||
.await
|
||||
.map_or(
|
||||
TenantState::broken_from_reason("Failed to load tenant".to_string()),
|
||||
|t| t.current_state(),
|
||||
),
|
||||
)
|
||||
},
|
||||
))
|
||||
.await),
|
||||
TenantsMap::ShuttingDown(m) => Ok(m
|
||||
.iter()
|
||||
.map(|(id, tenant)| (*id, tenant.current_state()))
|
||||
.collect()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "testing"))]
|
||||
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().await;
|
||||
match &*tenants {
|
||||
TenantsMap::Initializing => Err(TenantMapListError::Initializing),
|
||||
TenantsMap::Open(m) => Ok(m
|
||||
.map
|
||||
.iter()
|
||||
.map(|(id, tenant)| {
|
||||
(
|
||||
*id,
|
||||
tenant
|
||||
.get()
|
||||
.map_or(TenantState::NotLoaded, |tenant| tenant.current_state()),
|
||||
)
|
||||
})
|
||||
.collect()),
|
||||
TenantsMap::ShuttingDown(m) => Ok(m
|
||||
.iter()
|
||||
.map(|(id, tenant)| (*id, tenant.current_state()))
|
||||
.collect()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute Attach mgmt API command.
|
||||
@@ -634,22 +756,23 @@ where
|
||||
F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
|
||||
{
|
||||
let mut guard = TENANTS.write().await;
|
||||
let m = match &mut *guard {
|
||||
TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
|
||||
TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
match m.entry(tenant_id) {
|
||||
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
|
||||
tenant_id,
|
||||
e.get().current_state(),
|
||||
)),
|
||||
hash_map::Entry::Vacant(v) => match insert_fn() {
|
||||
Ok(tenant) => {
|
||||
v.insert(tenant.clone());
|
||||
Ok(tenant)
|
||||
}
|
||||
Err(e) => Err(TenantMapInsertError::Closure(e)),
|
||||
match &mut *guard {
|
||||
TenantsMap::Initializing => Err(TenantMapInsertError::StillInitializing),
|
||||
TenantsMap::ShuttingDown(_) => Err(TenantMapInsertError::ShuttingDown),
|
||||
TenantsMap::Open(m) => match m.map.entry(tenant_id) {
|
||||
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
|
||||
tenant_id,
|
||||
e.get()
|
||||
.get()
|
||||
.map_or(TenantState::NotLoaded, |tenant| tenant.current_state()),
|
||||
)),
|
||||
hash_map::Entry::Vacant(v) => match insert_fn() {
|
||||
Ok(tenant) => {
|
||||
v.insert(OnceCell::new_with(Some(tenant.clone())));
|
||||
Ok(tenant)
|
||||
}
|
||||
Err(e) => Err(TenantMapInsertError::Closure(e)),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -671,8 +794,8 @@ where
|
||||
// avoid holding the lock for the entire process.
|
||||
{
|
||||
let tenants_accessor = TENANTS.write().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
match tenants_accessor.get(&tenant_id, false).await {
|
||||
Ok(tenant) => {
|
||||
let tenant = Arc::clone(tenant);
|
||||
// don't hold TENANTS lock while set_stopping waits for activation to finish
|
||||
drop(tenants_accessor);
|
||||
@@ -689,7 +812,7 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
None => return Err(TenantStateError::NotFound(tenant_id)),
|
||||
Err(_) => return Err(TenantStateError::NotFound(tenant_id)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -704,18 +827,18 @@ where
|
||||
{
|
||||
Ok(hook_value) => {
|
||||
let mut tenants_accessor = TENANTS.write().await;
|
||||
if tenants_accessor.remove(&tenant_id).is_none() {
|
||||
if !tenants_accessor.remove(&tenant_id) {
|
||||
warn!("Tenant {tenant_id} got removed from memory before operation finished");
|
||||
}
|
||||
Ok(hook_value)
|
||||
}
|
||||
Err(e) => {
|
||||
let tenants_accessor = TENANTS.read().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
match tenants_accessor.get(&tenant_id, false).await {
|
||||
Ok(tenant) => {
|
||||
tenant.set_broken(e.to_string()).await;
|
||||
}
|
||||
None => {
|
||||
Err(_) => {
|
||||
warn!("Tenant {tenant_id} got removed from memory");
|
||||
return Err(TenantStateError::NotFound(tenant_id));
|
||||
}
|
||||
@@ -736,13 +859,7 @@ pub async fn immediate_gc(
|
||||
gc_req: TimelineGcRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
|
||||
let guard = TENANTS.read().await;
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let tenant = get_tenant(tenant_id, false).await?;
|
||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||
// Use tenant's pitr setting
|
||||
let pitr = tenant.get_pitr_interval();
|
||||
@@ -772,10 +889,6 @@ pub async fn immediate_gc(
|
||||
Ok(())
|
||||
}
|
||||
);
|
||||
|
||||
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
|
||||
drop(guard);
|
||||
|
||||
Ok(wait_task_done)
|
||||
}
|
||||
|
||||
@@ -788,7 +901,8 @@ pub async fn immediate_compact(
|
||||
let guard = TENANTS.read().await;
|
||||
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.get(&tenant_id, true)
|
||||
.await
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
@@ -21,6 +21,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
".*load failed, setting tenant state to Broken.*",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user