Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 10:22:56 +00:00)

Compare commits: release-pr ... jcsp/tenan (8 commits)
| Author | SHA1 | Date |
|---|---|---|
| | c6867e9f32 | |
| | e9a9e6be30 | |
| | e8d4c214a3 | |
| | 8106f87945 | |
| | 5b989ff595 | |
| | 819426ec59 | |
| | 403a25d42d | |
| | 4234c886f4 | |
@@ -408,6 +408,11 @@ fn start_pageserver(
        initial_tenant_load_remote: Some(init_done_tx),
        initial_tenant_load: Some(init_remote_done_tx),
        background_jobs_can_start: background_jobs_barrier.clone(),
        warmup_limit: Arc::new(tokio::sync::Semaphore::new(
            conf.concurrent_tenant_size_logical_size_queries
                .initial_permits()
                .get(),
        )),
    };

    // Scan the local 'tenants/' directory and start loading the tenants
@@ -38,6 +38,7 @@ use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::{
    GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
    TenantSlotError, TenantSlotUpsertError, TenantStateError,
@@ -67,6 +68,11 @@ use utils::{
// Imports only used for testing APIs
use super::models::ConfigureFailpointsRequest;

// For APIs that require an Active tenant, how long should we block waiting for that state?
// This is not functionally necessary (clients will retry), but avoids generating a lot of
// failed API calls while tenants are activating.
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);

pub struct State {
    conf: &'static PageServerConf,
    tenant_manager: Arc<TenantManager>,
@@ -233,6 +239,19 @@ impl From<GetTenantError> for ApiError {
    }
}

impl From<GetActiveTenantError> for ApiError {
    fn from(e: GetActiveTenantError) -> ApiError {
        match e {
            GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
            GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
            GetActiveTenantError::NotFound(gte) => gte.into(),
            GetActiveTenantError::WaitForActiveTimeout { .. } => {
                ApiError::ResourceUnavailable(format!("{}", e).into())
            }
        }
    }
}

impl From<SetNewTenantConfigError> for ApiError {
    fn from(e: SetNewTenantConfigError) -> ApiError {
        match e {
@@ -435,7 +454,10 @@ async fn timeline_create_handler(
    let state = get_state(&request);

    async {
        let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?;
        let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, false)?;

        tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;

        match tenant.create_timeline(
            new_timeline_id,
            request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -694,11 +716,23 @@ async fn timeline_delete_handler(
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
    let state = get_state(&request);

    state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx)
        .instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
    let tenant = state
        .tenant_manager
        .get_attached_tenant_shard(tenant_shard_id, false)
        .map_err(|e| {
            match e {
                // GetTenantError has a built-in conversion to ApiError, but in this context we don't
                // want to treat missing tenants as 404, to avoid ambiguity with successful deletions.
                GetTenantError::NotFound(_) => ApiError::PreconditionFailed(
                    "Requested tenant is missing".to_string().into_boxed_str(),
                ),
                e => e.into(),
            }
        })?;
    tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
    tenant.delete_timeline(timeline_id).instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
        .await?;

    json_response(StatusCode::ACCEPTED, ())
@@ -1136,7 +1170,10 @@ async fn tenant_create_handler(

    // We created the tenant. Existing API semantics are that the tenant
    // is Active when this function returns.
    if let res @ Err(_) = new_tenant.wait_to_become_active().await {
    if let res @ Err(_) = new_tenant
        .wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
        .await
    {
        // This shouldn't happen because we just created the tenant directory
        // in tenant::mgr::create_tenant, and there aren't any remote timelines
        // to load, so, nothing can really fail during load.
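Taken together, these handler hunks replace "require the tenant to already be Active" with "fetch the shard without requiring Active, then wait up to ACTIVE_TENANT_TIMEOUT for activation", relying on the new `From<GetActiveTenantError> for ApiError` impl to surface the outcome. A minimal, self-contained sketch of that status mapping; the concrete HTTP codes are an assumption based on the `ApiError` variant names, not something this diff states:

```rust
// Toy model of the error-to-HTTP-status mapping introduced above.
// The enum and status codes are stand-ins for illustration only.
#[derive(Debug)]
enum GetActiveTenantError {
    WillNotBecomeActive,
    Cancelled,
    NotFound,
    WaitForActiveTimeout,
}

fn to_http_status(e: &GetActiveTenantError) -> u16 {
    match e {
        // Tenant is e.g. Broken or Stopping: retrying will not help.
        GetActiveTenantError::WillNotBecomeActive => 409,
        // Pageserver is shutting down: clients should retry later.
        GetActiveTenantError::Cancelled => 503,
        // Tenant does not exist on this pageserver.
        GetActiveTenantError::NotFound => 404,
        // Still activating after ACTIVE_TENANT_TIMEOUT: retryable.
        GetActiveTenantError::WaitForActiveTimeout => 503,
    }
}

fn main() {
    assert_eq!(to_http_status(&GetActiveTenantError::WaitForActiveTimeout), 503);
    println!("timeout maps to a retryable status");
}
```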
@@ -27,6 +27,8 @@ pub mod walredo;

pub mod failpoint_support;

use std::sync::Arc;

use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;
@@ -190,6 +192,11 @@ pub struct InitializationOrder {
    ///
    /// This can be broken up later on, but right now there is just one class of a background job.
    pub background_jobs_can_start: utils::completion::Barrier,

    /// Concurrency limit for attaching tenants during startup. This limit does not
    /// apply to tenants that a client tries to access: those proceed to attach as fast
    /// as they can.
    pub warmup_limit: Arc<tokio::sync::Semaphore>,
}

/// Time the future with a warning when it exceeds a threshold.
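The `warmup_limit` field is the heart of the change: a shared `tokio::sync::Semaphore` whose permits bound how many tenants may run their startup warm-up concurrently. A minimal, runnable sketch of the pattern, independent of the pageserver types; the tenant names and permit count are made up for illustration:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Allow at most 2 tenants to warm up at the same time.
    let warmup_limit = Arc::new(Semaphore::new(2));

    let mut tasks = Vec::new();
    for tenant in ["a", "b", "c", "d"] {
        let limit = warmup_limit.clone();
        tasks.push(tokio::spawn(async move {
            // Wait for a free warm-up slot before doing any expensive work.
            let _permit = limit.acquire_owned().await.expect("semaphore closed");
            println!("tenant {tenant}: warming up");
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            // The permit is released when `_permit` is dropped here.
        }));
    }
    for t in tasks {
        t.await.unwrap();
    }
}
```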
@@ -684,14 +684,54 @@ pub static STARTUP_IS_LOADING: Lazy<UIntGauge> = Lazy::new(|| {
    .expect("Failed to register pageserver_startup_is_loading")
});

/// How long did tenants take to go from construction to active state?
pub(crate) static TENANT_ACTIVATION: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
/// Metrics related to the lifecycle of a [`crate::tenant::Tenant`] object: things
/// like how long it took to load.
///
/// Note that these are process-global metrics, _not_ per-tenant metrics. Per-tenant
/// metrics are rather expensive, and usually fine grained stuff makes more sense
/// at a timeline level than tenant level.
pub(crate) struct TenantMetrics {
    /// How long did tenants take to go from construction to active state?
    pub(crate) activation: Histogram,
    pub(crate) preload: Histogram,
    pub(crate) attach: Histogram,

    /// How many tenants are included in the initial startup of the pageserver?
    pub(crate) startup_scheduled: IntCounter,
    pub(crate) startup_complete: IntCounter,
}

pub(crate) static TENANT: Lazy<TenantMetrics> = Lazy::new(|| {
    TenantMetrics {
        activation: register_histogram!(
            "pageserver_tenant_activation_seconds",
            "Time taken by tenants to activate, in seconds",
            CRITICAL_OP_BUCKETS.into()
        )
        .expect("Failed to register pageserver_tenant_activation_seconds metric")
        .expect("Failed to register metric"),
        preload: register_histogram!(
            "pageserver_tenant_preload_seconds",
            "Time taken by tenants to load remote metadata on startup/attach, in seconds",
            CRITICAL_OP_BUCKETS.into()
        )
        .expect("Failed to register metric"),
        attach: register_histogram!(
            "pageserver_tenant_attach_seconds",
            "Time taken by tenants to initialize, after remote metadata is already loaded",
            CRITICAL_OP_BUCKETS.into()
        )
        .expect("Failed to register metric"),
        startup_scheduled: register_int_counter!(
            "pageserver_tenant_startup_scheduled",
            "Number of tenants included in pageserver startup (doesn't count tenants attached later)"
        ).expect("Failed to register metric"),
        startup_complete: register_int_counter!(
            "pageserver_tenant_startup_complete",
            "Number of tenants that have completed warm-up, or activated on-demand during initial startup: \
             should eventually reach `pageserver_tenant_startup_scheduled_total`. Does not include broken \
             tenants: such cases will lead to this metric never reaching the scheduled count."
        ).expect("Failed to register metric"),
    }
});

/// Each `Timeline`'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric.
@@ -2213,6 +2253,9 @@ pub fn preinitialize_metrics() {
    // Deletion queue stats
    Lazy::force(&DELETION_QUEUE);

    // Tenant stats
    Lazy::force(&TENANT);

    // Tenant manager stats
    Lazy::force(&TENANT_MANAGER);
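The new struct groups what used to be a free-standing static (`TENANT_ACTIVATION`) into one `TenantMetrics` value, and the attach path below times each phase with `start_timer()` / `observe_duration()`. A small, runnable sketch of that pattern using the `prometheus` crate directly; the diff goes through neon's own `metrics` wrapper, so treat the exact macro paths and metric names here as assumptions:

```rust
use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};

fn main() {
    // Process-global metrics, registered once (names are illustrative).
    let preload: Histogram = register_histogram!(
        "example_tenant_preload_seconds",
        "Time taken to load remote metadata, in seconds"
    )
    .expect("Failed to register metric");
    let startup_complete: IntCounter = register_int_counter!(
        "example_tenant_startup_complete",
        "Number of tenants that finished warm-up"
    )
    .expect("Failed to register metric");

    // Time one phase of startup; the timer records when observe_duration() is called.
    let timer = preload.start_timer();
    std::thread::sleep(std::time::Duration::from_millis(10)); // stand-in for real work
    timer.observe_duration();

    startup_complete.inc();
    println!("observed {} preload samples", preload.get_sample_count());
}
```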
@@ -36,6 +36,8 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
use utils::timeout::timeout_cancellable;
use utils::timeout::TimeoutCancellableError;

use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
@@ -59,7 +61,7 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::deletion_queue::DeletionQueueError;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::metrics::TENANT_ACTIVATION;
use crate::metrics::TENANT;
use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
use crate::repository::GcResult;
use crate::task_mgr;
@@ -226,7 +228,7 @@ pub struct Tenant {

    /// The value creation timestamp, used to measure activation delay, see:
    /// <https://github.com/neondatabase/neon/issues/4025>
    loading_started_at: Instant,
    constructed_at: Instant,

    state: watch::Sender<TenantState>,

@@ -276,6 +278,11 @@ pub struct Tenant {

    eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,

    /// If the tenant is in Activating state, notify this to encourage it
    /// to proceed to Active as soon as possible, rather than waiting for lazy
    /// background warmup.
    pub(crate) activate_now: tokio::sync::Notify,

    pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,

    // Cancellation token fires when we have entered shutdown(). This is a parent of
@@ -622,6 +629,11 @@ impl Tenant {
        "attach tenant",
        false,
        async move {
            scopeguard::defer! {
                tracing::info!("Increment complete count");
                TENANT.startup_complete.inc();
            }

            // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
            let make_broken =
                |t: &Tenant, err: anyhow::Error| {
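The `scopeguard::defer!` block guarantees that `pageserver_tenant_startup_complete` is incremented no matter how the attach task exits: normal completion, an early return on shutdown, or an error that marks the tenant Broken. A minimal, runnable sketch of that guarantee; the counter here is a plain atomic rather than a Prometheus counter:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

static STARTUP_COMPLETE: AtomicUsize = AtomicUsize::new(0);

fn attach_one_tenant(fail: bool) -> Result<(), String> {
    // Runs when this function exits by any path: Ok, Err, or unwind.
    scopeguard::defer! {
        STARTUP_COMPLETE.fetch_add(1, Ordering::Relaxed);
    }

    if fail {
        return Err("attach failed, tenant marked Broken".into());
    }
    Ok(())
}

fn main() {
    let _ = attach_one_tenant(false);
    let _ = attach_one_tenant(true);
    // Both the successful and the failed attach counted as "complete".
    assert_eq!(STARTUP_COMPLETE.load(Ordering::Relaxed), 2);
    println!("startup_complete = 2");
}
```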
@@ -648,6 +660,54 @@ impl Tenant {
                .as_mut()
                .and_then(|x| x.initial_tenant_load_remote.take());

            enum AttachType<'a> {
                // During pageserver startup, we are attaching this tenant lazily in the background
                Warmup(tokio::sync::SemaphorePermit<'a>),
                // During pageserver startup, we are attaching this tenant as soon as we can,
                // because a client tried to access it.
                OnDemand,
                // During normal operations after startup, we are attaching a tenant.
                Normal,
            }

            // Before doing any I/O, wait for either:
            // - A client to attempt to access this tenant (on-demand loading)
            // - A permit to become available in the warmup semaphore (background warmup)
            let attach_type = if let Some(init_order) = &init_order {
                tokio::select!(
                    _ = tenant_clone.activate_now.notified() => {
                        tracing::info!("Activating tenant (on-demand)");
                        AttachType::OnDemand
                    },
                    permit_result = init_order.warmup_limit.acquire() => {
                        match permit_result {
                            Ok(p) => {
                                tracing::info!("Activating tenant (warmup)");
                                AttachType::Warmup(p)
                            }
                            Err(_) => {
                                // This is unexpected: the warmup semaphore should stay alive
                                // for the lifetime of init_order. Log a warning and proceed.
                                tracing::warn!("warmup_limit semaphore unexpectedly closed");
                                AttachType::Normal
                            }
                        }
                    }
                    _ = tenant_clone.cancel.cancelled() => {
                        // This is safe, but should be pretty rare: it is interesting if a tenant
                        // stayed in Activating for such a long time that shutdown found it in
                        // that state.
                        tracing::info!("Tenant shut down before activation");
                        return Ok(());
                    },
                )
            } else {
                AttachType::Normal
            };

            let preload_timer = TENANT.preload.start_timer();
            let preload = match mode {
                SpawnMode::Create => {None},
                SpawnMode::Normal => {
@@ -670,6 +730,7 @@ impl Tenant {
                }
            };
            preload_timer.observe_duration();

            // Remote preload is complete.
            drop(remote_load_completion);
@@ -721,6 +782,7 @@ impl Tenant {
                }
            }

            let attach_timer = TENANT.attach.start_timer();
            match tenant_clone.attach(preload, &ctx).await {
                Ok(()) => {
                    info!("attach finished, activating");
@@ -730,6 +792,28 @@ impl Tenant {
                    make_broken(&tenant_clone, anyhow::anyhow!(e));
                }
            }
            attach_timer.observe_duration();

            // If we are doing an opportunistic warmup attachment at startup, initialize
            // logical size at the same time. This is better than starting a bunch of idle tenants
            // with cold caches and then coming back later to initialize their logical sizes.
            //
            // It also prevents the warmup process from competing with the concurrency limit on
            // logical size calculations: if the logical size calculation semaphore is saturated,
            // then warmup will wait for that before proceeding to the next tenant.
            if let AttachType::Warmup(_permit) = attach_type {
                let mut futs = FuturesUnordered::new();
                let timelines: Vec<_> = tenant_clone.timelines.lock().unwrap().values().cloned().collect();
                for t in timelines {
                    futs.push(t.await_initial_logical_size())
                }
                tracing::info!("Waiting for initial logical sizes while warming up...");
                while futs.next().await.is_some() {
                }
                tracing::info!("Warm-up complete");
            }

            Ok(())
        }
        .instrument({
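This is the core scheduling decision: each spawned attach task races "a client wants this tenant now" (the `activate_now` Notify) against "a background warm-up slot became free" (the `warmup_limit` Semaphore), and also bails out on shutdown. A self-contained, runnable sketch of that race using the same tokio primitives; the names, permit counts, and timings are illustrative, not the pageserver's:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Notify, Semaphore};
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
enum AttachType {
    Warmup,   // got a background warm-up slot
    OnDemand, // a client asked for this tenant
    Shutdown, // the process is stopping
}

async fn decide(
    activate_now: Arc<Notify>,
    warmup: Arc<Semaphore>,
    cancel: CancellationToken,
) -> AttachType {
    tokio::select! {
        _ = activate_now.notified() => AttachType::OnDemand,
        permit = warmup.acquire_owned() => {
            // Real code holds the permit for the duration of warm-up;
            // here it is dropped when the function returns.
            let _permit = permit.expect("semaphore closed");
            AttachType::Warmup
        }
        _ = cancel.cancelled() => AttachType::Shutdown,
    }
}

#[tokio::main]
async fn main() {
    let activate_now = Arc::new(Notify::new());
    // Zero permits: background warm-up is "busy", so only a client access can win.
    let warmup = Arc::new(Semaphore::new(0));
    let cancel = CancellationToken::new();

    let task = tokio::spawn(decide(activate_now.clone(), warmup.clone(), cancel.clone()));
    tokio::time::sleep(Duration::from_millis(10)).await;
    activate_now.notify_one(); // simulate a client touching the tenant
    println!("{:?}", task.await.unwrap()); // OnDemand
}
```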
@@ -1696,6 +1780,15 @@ impl Tenant {
        Ok(loaded_timeline)
    }

    pub(crate) async fn delete_timeline(
        self: Arc<Self>,
        timeline_id: TimelineId,
    ) -> Result<(), DeleteTimelineError> {
        DeleteTimelineFlow::run(&self, timeline_id, false).await?;

        Ok(())
    }

    /// perform one garbage collection iteration, removing old data files from disk.
    /// this function is periodically called by gc task.
    /// also it can be explicitly requested through page server api 'do_gc' command.
@@ -1857,7 +1950,7 @@ impl Tenant {
            );
            *current_state = TenantState::Active;

            let elapsed = self.loading_started_at.elapsed();
            let elapsed = self.constructed_at.elapsed();
            let total_timelines = timelines_accessor.len();

            // log a lot of stuff, because some tenants sometimes suffer from user-visible
@@ -1872,7 +1965,7 @@ impl Tenant {
                "activation attempt finished"
            );

            TENANT_ACTIVATION.observe(elapsed.as_secs_f64());
            TENANT.activation.observe(elapsed.as_secs_f64());
        });
    }
}
@@ -2127,18 +2220,35 @@ impl Tenant {
        self.state.subscribe()
    }

    pub(crate) async fn wait_to_become_active(&self) -> Result<(), GetActiveTenantError> {
    pub(crate) async fn wait_to_become_active(
        &self,
        timeout: Duration,
    ) -> Result<(), GetActiveTenantError> {
        self.activate_now.notify_one();
        let mut receiver = self.state.subscribe();
        loop {
            let current_state = receiver.borrow_and_update().clone();
            match current_state {
                TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
                    // in these states, there's a chance that we can reach ::Active
                    receiver.changed().await.map_err(
                        |_e: tokio::sync::watch::error::RecvError|
                        // Tenant existed but was dropped: report it as non-existent
                        GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
                    )?;
                    match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
                        Ok(r) => {
                            r.map_err(
                                |_e: tokio::sync::watch::error::RecvError|
                                // Tenant existed but was dropped: report it as non-existent
                                GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
                            )?
                        }
                        Err(TimeoutCancellableError::Cancelled) => {
                            return Err(GetActiveTenantError::Cancelled);
                        }
                        Err(TimeoutCancellableError::Timeout) => {
                            return Err(GetActiveTenantError::WaitForActiveTimeout {
                                latest_state: Some(self.current_state()),
                                wait_time: timeout,
                            });
                        }
                    }
                }
                TenantState::Active { .. } => {
                    return Ok(());
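`wait_to_become_active` now does three things: it nudges the attach task out of its lazy warm-up wait (`activate_now.notify_one()`), it follows state changes through the `watch` channel, and it bounds each wait with `timeout_cancellable` so callers get either `WaitForActiveTimeout` or `Cancelled`. The `utils::timeout::timeout_cancellable` helper is internal to neon; below is a plausible, runnable sketch of its shape, assuming it simply races a timeout and a cancellation token against the wrapped future:

```rust
use std::future::Future;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[derive(Debug, PartialEq)]
enum TimeoutCancellableError {
    Timeout,
    Cancelled,
}

// Sketch of a timeout_cancellable-style helper: resolve the inner future,
// unless the timeout elapses or the token is cancelled first.
async fn timeout_cancellable<F: Future>(
    timeout: Duration,
    cancel: &CancellationToken,
    fut: F,
) -> Result<F::Output, TimeoutCancellableError> {
    tokio::select! {
        out = fut => Ok(out),
        _ = tokio::time::sleep(timeout) => Err(TimeoutCancellableError::Timeout),
        _ = cancel.cancelled() => Err(TimeoutCancellableError::Cancelled),
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    // A future that never completes: the 50ms timeout wins.
    let res = timeout_cancellable(
        Duration::from_millis(50),
        &cancel,
        std::future::pending::<()>(),
    )
    .await;
    assert_eq!(res.unwrap_err(), TimeoutCancellableError::Timeout);
    println!("timed out as expected");
}
```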
@@ -2463,7 +2573,7 @@ impl Tenant {
            conf,
            // using now here is a good enough approximation to catch tenants with really long
            // activation times.
            loading_started_at: Instant::now(),
            constructed_at: Instant::now(),
            tenant_conf: Arc::new(RwLock::new(attached_conf)),
            timelines: Mutex::new(HashMap::new()),
            timelines_creating: Mutex::new(HashSet::new()),
@@ -2475,6 +2585,7 @@ impl Tenant {
            cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
            cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
            eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
            activate_now: tokio::sync::Notify::new(),
            delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
            cancel: CancellationToken::default(),
            gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
@@ -28,7 +28,7 @@ use crate::control_plane_client::{
    ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::TENANT_MANAGER as METRICS;
use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{
    AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt,
@@ -44,7 +44,6 @@ use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};

use super::delete::DeleteTenantError;
use super::timeline::delete::DeleteTimelineFlow;
use super::TenantSharedResources;

/// For a tenant that appears in TenantsMap, it may either be
@@ -430,6 +429,13 @@ pub async fn init_tenant_mgr(
    let tenant_generations =
        init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;

    tracing::info!(
        "Attaching {} tenants at startup, {} at a time",
        tenant_configs.len(),
        init_order.warmup_limit.available_permits()
    );
    TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);

    // Construct `Tenant` objects and start them running
    for (tenant_shard_id, location_conf) in tenant_configs {
        let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
@@ -848,17 +854,6 @@ impl TenantManager {
        }
    }

    pub(crate) async fn delete_timeline(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        _ctx: &RequestContext,
    ) -> Result<(), DeleteTimelineError> {
        let tenant = self.get_attached_tenant_shard(tenant_shard_id, true)?;
        DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
        Ok(())
    }

    #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    pub(crate) async fn upsert_location(
        &self,
@@ -1221,7 +1216,10 @@ pub(crate) async fn get_active_tenant_with_timeout(
                // Fast path: we don't need to do any async waiting.
                return Ok(tenant.clone());
            }
            _ => (WaitFor::Tenant(tenant.clone()), tenant_shard_id),
            _ => {
                tenant.activate_now.notify_one();
                (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
            }
        }
    }
    Some(TenantSlot::Secondary) => {
@@ -1275,28 +1273,10 @@ pub(crate) async fn get_active_tenant_with_timeout(
    };

    tracing::debug!("Waiting for tenant to enter active state...");
    match timeout_cancellable(
        deadline.duration_since(Instant::now()),
        cancel,
        tenant.wait_to_become_active(),
    )
    .await
    {
        Ok(Ok(())) => Ok(tenant),
        Ok(Err(e)) => Err(e),
        Err(TimeoutCancellableError::Timeout) => {
            let latest_state = tenant.current_state();
            if latest_state == TenantState::Active {
                Ok(tenant)
            } else {
                Err(GetActiveTenantError::WaitForActiveTimeout {
                    latest_state: Some(latest_state),
                    wait_time: timeout,
                })
            }
        }
        Err(TimeoutCancellableError::Cancelled) => Err(GetActiveTenantError::Cancelled),
    }
    tenant
        .wait_to_become_active(deadline.duration_since(Instant::now()))
        .await?;
    Ok(tenant)
}

pub(crate) async fn delete_tenant(
@@ -1903,6 +1903,7 @@ impl Timeline {
            .set((calculated_size, metrics_guard.calculation_result_saved()))
            .ok()
            .expect("only this task sets it");
        self.current_logical_size.initialized.notify_one();
    }

    pub fn spawn_ondemand_logical_size_calculation(
@@ -3104,6 +3105,24 @@ impl Timeline {

        Ok(image_layers)
    }

    /// Wait until the background initial logical size calculation is complete, or
    /// this Timeline is shut down. Calling this function will cause the initial
    /// logical size calculation to skip waiting for the background jobs barrier.
    pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
        if let Some(await_bg_cancel) = self
            .current_logical_size
            .cancel_wait_for_background_loop_concurrency_limit_semaphore
            .get()
        {
            await_bg_cancel.cancel();
        }

        tokio::select!(
            _ = self.current_logical_size.initialized.notified() => {},
            _ = self.cancel.cancelled() => {}
        )
    }
}

#[derive(Default)]
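The `initialized` signal relies on a useful property of `tokio::sync::Notify`: `notify_one()` stores a single permit if no task is currently waiting, so a waiter that calls `notified().await` after the signal has already fired still returns immediately. A tiny runnable sketch of that behavior:

```rust
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let initialized = Arc::new(Notify::new());

    // Signal completion before anyone is waiting: the permit is stored.
    initialized.notify_one();

    // A later waiter consumes the stored permit and returns immediately,
    // instead of blocking forever.
    initialized.notified().await;
    println!("observed the initialization signal after the fact");
}
```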
@@ -34,6 +34,9 @@ pub(super) struct LogicalSize {
    pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore:
        OnceCell<CancellationToken>,

    /// Once the initial logical size is initialized, this is notified.
    pub(crate) initialized: tokio::sync::Notify,

    /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
    pub initial_part_end: Option<Lsn>,

@@ -125,6 +128,7 @@ impl LogicalSize {
            initial_part_end: None,
            size_added_after_initial: AtomicI64::new(0),
            did_return_approximate_to_walreceiver: AtomicBool::new(false),
            initialized: tokio::sync::Notify::new(),
        }
    }

@@ -135,6 +139,7 @@ impl LogicalSize {
            initial_part_end: Some(compute_to),
            size_added_after_initial: AtomicI64::new(0),
            did_return_approximate_to_walreceiver: AtomicBool::new(false),
            initialized: tokio::sync::Notify::new(),
        }
    }
@@ -300,7 +300,8 @@ def test_timeline_initial_logical_size_calculation_cancellation(
    env = neon_env_builder.init_start()
    client = env.pageserver.http_client()

    tenant_id, timeline_id = env.neon_cli.create_tenant()
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    # load in some data
    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
@@ -732,3 +733,139 @@ def wait_for_timeline_size_init(
    raise Exception(
        f"timed out while waiting for current_logical_size of a timeline to reach its non-incremental value, details: {timeline_details}"
    )


def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
    """
    Tenants warming up opportunistically will wait for one another's logical size calculations to complete
    before proceeding. However, they skip this if a client is actively trying to access them.

    This test is not purely about logical sizes, but logical size calculation is the phase that we
    use as a proxy for "warming up" in this test: it happens within the semaphore guard used
    to limit concurrent tenant warm-up.
    """

    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()

    # Create some tenants
    n_tenants = 10
    tenant_ids = {env.initial_tenant}
    for _i in range(0, n_tenants - 1):
        tenant_id = TenantId.generate()
        env.pageserver.tenant_create(tenant_id)

        # Empty tenants are not subject to waiting for logical size calculations, because
        # those happen at the timeline level
        branch_name = "main"
        timeline_id = TimelineId.generate()
        env.neon_cli.create_timeline(
            new_branch_name=branch_name, tenant_id=tenant_id, timeline_id=timeline_id
        )

        tenant_ids.add(tenant_id)

    # Restart pageserver with logical size calculations paused
    env.pageserver.stop()
    env.pageserver.start(
        extra_env_vars={"FAILPOINTS": "timeline-calculate-logical-size-pause=pause"}
    )

    def get_tenant_states():
        states = {}
        for tenant_id in tenant_ids:
            tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
            states[tenant_id] = tenant["state"]["slug"]
        log.info(f"Tenant states: {states}")
        return states

    def at_least_one_active():
        assert "Active" in set(get_tenant_states().values())

    # One tenant should activate, then get stuck in its logical size calculation
    wait_until(10, 1, at_least_one_active)

    # Wait some walltime to gain confidence that other tenants really are stuck and not proceeding to activate
    time.sleep(5)

    # We should see one tenant win the activation race, and enter logical size calculation. The rest
    # will stay in Attaching state, waiting for the "warmup_limit" semaphore
    expect_activated = 1
    states = get_tenant_states()
    assert len([s for s in states.values() if s == "Active"]) == expect_activated
    assert len([s for s in states.values() if s == "Attaching"]) == n_tenants - expect_activated

    assert (
        pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants
    )

    # This is zero, and subsequent checks are expect_activated - 1, because this counter does not
    # count how many tenants are Active, it counts how many have finished warmup. The first tenant
    # that reached Active is still stuck in its logical size calculation, and has therefore not finished warmup.
    assert pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") == 0

    # If a client accesses one of the blocked tenants, it should skip waiting for warmup and
    # go active as fast as it can.
    stuck_tenant_id = list(
        [(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"]
    )[0][0]

    endpoint = env.endpoints.create_start(branch_name="main", tenant_id=stuck_tenant_id)
    endpoint.safe_psql_many(
        [
            "CREATE TABLE foo (x INTEGER)",
            "INSERT INTO foo SELECT g FROM generate_series(1, 10) g",
        ]
    )
    endpoint.stop()

    # The one that we successfully accessed is now Active
    expect_activated += 1
    assert pageserver_http.tenant_status(tenant_id=stuck_tenant_id)["state"]["slug"] == "Active"
    assert (
        pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total")
        == expect_activated - 1
    )

    # The ones we didn't touch are still in Attaching
    assert (
        len([s for s in get_tenant_states().values() if s == "Attaching"])
        == n_tenants - expect_activated
    )

    # Timeline creation operations also wake up Attaching tenants
    stuck_tenant_id = list(
        [(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"]
    )[0][0]
    pageserver_http.timeline_create(env.pg_version, stuck_tenant_id, TimelineId.generate())
    expect_activated += 1
    assert pageserver_http.tenant_status(tenant_id=stuck_tenant_id)["state"]["slug"] == "Active"
    assert (
        len([s for s in get_tenant_states().values() if s == "Attaching"])
        == n_tenants - expect_activated
    )

    assert (
        pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total")
        == expect_activated - 1
    )

    # When we unblock logical size calculation, all tenants should proceed to active state via
    # the warmup route.
    pageserver_http.configure_failpoints(("timeline-calculate-logical-size-pause", "off"))

    def all_active():
        assert all(s == "Active" for s in get_tenant_states().values())

    wait_until(10, 1, all_active)

    # Final control check: restarting with no failpoints at all results in all tenants coming active
    # without being prompted by client I/O
    env.pageserver.stop()
    env.pageserver.start()
    wait_until(10, 1, all_active)

    assert (
        pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants
    )
    assert pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") == n_tenants
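The test freezes every timeline's initial logical size calculation by setting the `timeline-calculate-logical-size-pause` failpoint to `pause` via the `FAILPOINTS` environment variable, then releases it over the HTTP API. Neon wires this up through its own `failpoint_support` module (added in the lib.rs hunk above); as a rough, hedged sketch of what a pause-style failpoint looks like with the upstream `fail` crate that this machinery builds on (the failpoint name here is illustrative, not the real one):

```rust
// Sketch only: the real pageserver routes failpoints through its failpoint_support module
// and an HTTP management endpoint; this shows the underlying `fail` crate primitives.
use std::time::Duration;

fn calculate_logical_size() -> u64 {
    // With the "pause" action configured, this call blocks until the
    // failpoint is reconfigured or removed.
    fail::fail_point!("example-calculate-logical-size-pause");
    42 // stand-in for the real calculation
}

fn main() {
    let scenario = fail::FailScenario::setup(); // also honors the FAILPOINTS env var

    fail::cfg("example-calculate-logical-size-pause", "pause").unwrap();
    let worker = std::thread::spawn(calculate_logical_size);

    // The worker is now parked inside the failpoint; unblock it.
    std::thread::sleep(Duration::from_millis(100));
    fail::remove("example-calculate-logical-size-pause");

    assert_eq!(worker.join().unwrap(), 42);
    scenario.teardown();
}
```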