diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index cd99cda783..bd63c4d860 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -41,6 +41,8 @@ use crate::{ TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX, }; +use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP; + pub mod defaults { use crate::tenant::config::defaults::*; use const_format::formatcp; @@ -61,6 +63,8 @@ pub mod defaults { pub const DEFAULT_LOG_FORMAT: &str = "plain"; + pub const DEFAULT_CONCURRENT_TENANT_WARMUP: usize = 8; + pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize = super::ConfigurableSemaphore::DEFAULT_INITIAL.get(); @@ -94,6 +98,7 @@ pub mod defaults { #log_format = '{DEFAULT_LOG_FORMAT}' #concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}' +#concurrent_tenant_warmup = '{DEFAULT_CONCURRENT_TENANT_WARMUP}' #metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}' #cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}' @@ -180,6 +185,11 @@ pub struct PageServerConf { pub log_format: LogFormat, + /// Number of tenants which will be concurrently loaded from remote storage proactively on startup, + /// does not limit tenants loaded in response to client I/O. A lower value implicitly deprioritizes + /// loading such tenants, vs. other work in the system. + pub concurrent_tenant_warmup: ConfigurableSemaphore, + /// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed. pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore, /// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`. @@ -283,6 +293,7 @@ struct PageServerConfigBuilder { log_format: BuilderValue, + concurrent_tenant_warmup: BuilderValue, concurrent_tenant_size_logical_size_queries: BuilderValue, metric_collection_interval: BuilderValue, @@ -340,6 +351,8 @@ impl Default for PageServerConfigBuilder { .expect("cannot parse default keepalive interval")), log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()), + concurrent_tenant_warmup: Set(NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP) + .expect("Invalid default constant")), concurrent_tenant_size_logical_size_queries: Set( ConfigurableSemaphore::DEFAULT_INITIAL, ), @@ -453,6 +466,10 @@ impl PageServerConfigBuilder { self.log_format = BuilderValue::Set(log_format) } + pub fn concurrent_tenant_warmup(&mut self, u: NonZeroUsize) { + self.concurrent_tenant_warmup = BuilderValue::Set(u); + } + pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) { self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u); } @@ -518,6 +535,9 @@ impl PageServerConfigBuilder { } pub fn build(self) -> anyhow::Result { + let concurrent_tenant_warmup = self + .concurrent_tenant_warmup + .ok_or(anyhow!("missing concurrent_tenant_warmup"))?; let concurrent_tenant_size_logical_size_queries = self .concurrent_tenant_size_logical_size_queries .ok_or(anyhow!( @@ -570,6 +590,7 @@ impl PageServerConfigBuilder { .broker_keepalive_interval .ok_or(anyhow!("No broker keepalive interval provided"))?, log_format: self.log_format.ok_or(anyhow!("missing log_format"))?, + concurrent_tenant_warmup: ConfigurableSemaphore::new(concurrent_tenant_warmup), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new( concurrent_tenant_size_logical_size_queries, ), @@ -807,6 +828,11 @@ impl PageServerConf { "log_format" => builder.log_format( 
LogFormat::from_config(&parse_toml_string(key, item)?)? ), + "concurrent_tenant_warmup" => builder.concurrent_tenant_warmup({ + let input = parse_toml_string(key, item)?; + let permits = input.parse::().context("expected a number of initial permits, not {s:?}")?; + NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")? + }), "concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({ let input = parse_toml_string(key, item)?; let permits = input.parse::().context("expected a number of initial permits, not {s:?}")?; @@ -904,6 +930,10 @@ impl PageServerConf { broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(), broker_keepalive_interval: Duration::from_secs(5000), log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), + concurrent_tenant_warmup: ConfigurableSemaphore::new( + NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP) + .expect("Invalid default constant"), + ), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default( ), @@ -1122,6 +1152,9 @@ background_task_maximum_delay = '334 s' storage_broker::DEFAULT_KEEPALIVE_INTERVAL )?, log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), + concurrent_tenant_warmup: ConfigurableSemaphore::new( + NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap() + ), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(), @@ -1188,6 +1221,9 @@ background_task_maximum_delay = '334 s' broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(), broker_keepalive_interval: Duration::from_secs(5), log_format: LogFormat::Json, + concurrent_tenant_warmup: ConfigurableSemaphore::new( + NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap() + ), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(), diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 102c9d9a6a..47092c3bb0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -38,6 +38,7 @@ use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::{LocationConf, TenantConfOpt}; +use crate::tenant::mgr::GetActiveTenantError; use crate::tenant::mgr::{ GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError, TenantSlotError, TenantSlotUpsertError, TenantStateError, @@ -67,6 +68,11 @@ use utils::{ // Imports only used for testing APIs use super::models::ConfigureFailpointsRequest; +// For APIs that require an Active tenant, how long should we block waiting for that state? +// This is not functionally necessary (clients will retry), but avoids generating a lot of +// failed API calls while tenants are activating. 
+const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000); + pub struct State { conf: &'static PageServerConf, tenant_manager: Arc, @@ -233,6 +239,19 @@ impl From for ApiError { } } +impl From for ApiError { + fn from(e: GetActiveTenantError) -> ApiError { + match e { + GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)), + GetActiveTenantError::Cancelled => ApiError::ShuttingDown, + GetActiveTenantError::NotFound(gte) => gte.into(), + GetActiveTenantError::WaitForActiveTimeout { .. } => { + ApiError::ResourceUnavailable(format!("{}", e).into()) + } + } + } +} + impl From for ApiError { fn from(e: SetNewTenantConfigError) -> ApiError { match e { @@ -435,7 +454,10 @@ async fn timeline_create_handler( let state = get_state(&request); async { - let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?; + let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, false)?; + + tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; + match tenant.create_timeline( new_timeline_id, request_data.ancestor_timeline_id.map(TimelineId::from), @@ -694,11 +716,23 @@ async fn timeline_delete_handler( let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; check_permission(&request, Some(tenant_shard_id.tenant_id))?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let state = get_state(&request); - state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx) - .instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id)) + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id, false) + .map_err(|e| { + match e { + // GetTenantError has a built-in conversion to ApiError, but in this context we don't + // want to treat missing tenants as 404, to avoid ambiguity with successful deletions. + GetTenantError::NotFound(_) => ApiError::PreconditionFailed( + "Requested tenant is missing".to_string().into_boxed_str(), + ), + e => e.into(), + } + })?; + tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; + tenant.delete_timeline(timeline_id).instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id)) .await?; json_response(StatusCode::ACCEPTED, ()) @@ -1136,7 +1170,10 @@ async fn tenant_create_handler( // We created the tenant. Existing API semantics are that the tenant // is Active when this function returns. - if let res @ Err(_) = new_tenant.wait_to_become_active().await { + if let res @ Err(_) = new_tenant + .wait_to_become_active(ACTIVE_TENANT_TIMEOUT) + .await + { // This shouldn't happen because we just created the tenant directory // in tenant::mgr::create_tenant, and there aren't any remote timelines // to load, so, nothing can really fail during load. diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index ba6fd00bd1..45c01b71d1 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -684,14 +684,54 @@ pub static STARTUP_IS_LOADING: Lazy = Lazy::new(|| { .expect("Failed to register pageserver_startup_is_loading") }); -/// How long did tenants take to go from construction to active state? -pub(crate) static TENANT_ACTIVATION: Lazy = Lazy::new(|| { - register_histogram!( +/// Metrics related to the lifecycle of a [`crate::tenant::Tenant`] object: things +/// like how long it took to load. 
+///
+/// Note that these are process-global metrics, _not_ per-tenant metrics. Per-tenant
+/// metrics are rather expensive, and usually fine-grained metrics make more sense
+/// at the timeline level than the tenant level.
+pub(crate) struct TenantMetrics {
+    /// How long did tenants take to go from construction to active state?
+    pub(crate) activation: Histogram,
+    pub(crate) preload: Histogram,
+    pub(crate) attach: Histogram,
+
+    /// How many tenants are included in the initial startup of the pageserver?
+    pub(crate) startup_scheduled: IntCounter,
+    pub(crate) startup_complete: IntCounter,
+}
+
+pub(crate) static TENANT: Lazy<TenantMetrics> = Lazy::new(|| {
+    TenantMetrics {
+    activation: register_histogram!(
         "pageserver_tenant_activation_seconds",
         "Time taken by tenants to activate, in seconds",
         CRITICAL_OP_BUCKETS.into()
     )
-    .expect("Failed to register pageserver_tenant_activation_seconds metric")
+    .expect("Failed to register metric"),
+    preload: register_histogram!(
+        "pageserver_tenant_preload_seconds",
+        "Time taken by tenants to load remote metadata on startup/attach, in seconds",
+        CRITICAL_OP_BUCKETS.into()
+    )
+    .expect("Failed to register metric"),
+    attach: register_histogram!(
+        "pageserver_tenant_attach_seconds",
+        "Time taken by tenants to initialize, after remote metadata is already loaded",
+        CRITICAL_OP_BUCKETS.into()
+    )
+    .expect("Failed to register metric"),
+    startup_scheduled: register_int_counter!(
+        "pageserver_tenant_startup_scheduled",
+        "Number of tenants included in pageserver startup (doesn't count tenants attached later)"
+    ).expect("Failed to register metric"),
+    startup_complete: register_int_counter!(
+        "pageserver_tenant_startup_complete",
+        "Number of tenants that have completed warm-up, or activated on-demand during initial startup: \
+         should eventually reach `pageserver_tenant_startup_scheduled_total`. Does not include broken \
+         tenants: such cases will lead to this metric never reaching the scheduled count."
+    ).expect("Failed to register metric"),
+}
 });

 /// Each `Timeline`'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric.
@@ -2213,6 +2253,9 @@ pub fn preinitialize_metrics() { // Deletion queue stats Lazy::force(&DELETION_QUEUE); + // Tenant stats + Lazy::force(&TENANT); + // Tenant manager stats Lazy::force(&TENANT_MANAGER); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0d1be33331..1478a1a445 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -36,6 +36,8 @@ use utils::crashsafe::path_with_suffix_extension; use utils::fs_ext; use utils::sync::gate::Gate; use utils::sync::gate::GateGuard; +use utils::timeout::timeout_cancellable; +use utils::timeout::TimeoutCancellableError; use self::config::AttachedLocationConfig; use self::config::AttachmentMode; @@ -59,7 +61,7 @@ use crate::deletion_queue::DeletionQueueClient; use crate::deletion_queue::DeletionQueueError; use crate::import_datadir; use crate::is_uninit_mark; -use crate::metrics::TENANT_ACTIVATION; +use crate::metrics::TENANT; use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC}; use crate::repository::GcResult; use crate::task_mgr; @@ -226,7 +228,7 @@ pub struct Tenant { /// The value creation timestamp, used to measure activation delay, see: /// - loading_started_at: Instant, + constructed_at: Instant, state: watch::Sender, @@ -276,6 +278,11 @@ pub struct Tenant { eviction_task_tenant_state: tokio::sync::Mutex, + /// If the tenant is in Activating state, notify this to encourage it + /// to proceed to Active as soon as possible, rather than waiting for lazy + /// background warmup. + pub(crate) activate_now_sem: tokio::sync::Semaphore, + pub(crate) delete_progress: Arc>, // Cancellation token fires when we have entered shutdown(). This is a parent of @@ -622,6 +629,11 @@ impl Tenant { "attach tenant", false, async move { + scopeguard::defer! { + tracing::info!("Increment complete count"); + TENANT.startup_complete.inc(); + } + // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state. let make_broken = |t: &Tenant, err: anyhow::Error| { @@ -648,6 +660,56 @@ impl Tenant { .as_mut() .and_then(|x| x.initial_tenant_load_remote.take()); + enum AttachType<'a> { + // During pageserver startup, we are attaching this tenant lazily in the background + Warmup(tokio::sync::SemaphorePermit<'a>), + // During pageserver startup, we are attaching this tenant as soon as we can, + // because a client tried to access it. + OnDemand, + // During normal operations after startup, we are attaching a tenant. + Normal, + } + + // Before doing any I/O, wait for either or: + // - A client to attempt to access to this tenant (on-demand loading) + // - A permit to become available in the warmup semaphore (background warmup) + // + // Some-ness of init_order is how we know if we're attaching during startup or later + // in process lifetime. + let attach_type = if init_order.is_some() { + tokio::select!( + _ = tenant_clone.activate_now_sem.acquire() => { + tracing::info!("Activating tenant (on-demand)"); + AttachType::OnDemand + }, + permit_result = conf.concurrent_tenant_warmup.inner().acquire() => { + match permit_result { + Ok(p) => { + tracing::info!("Activating tenant (warmup)"); + AttachType::Warmup(p) + } + Err(_) => { + // This is unexpected: the warmup semaphore should stay alive + // for the lifetime of init_order. Log a warning and proceed. 
+                                tracing::warn!("warmup_limit semaphore unexpectedly closed");
+                                AttachType::Normal
+                            }
+                        }
+
+                    }
+                    _ = tenant_clone.cancel.cancelled() => {
+                        // This is safe, but should be pretty rare: it is interesting if a tenant
+                        // stayed in Activating for such a long time that shutdown found it in
+                        // that state.
+                        tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
+                        return Ok(());
+                    },
+                )
+            } else {
+                AttachType::Normal
+            };
+
+            let preload_timer = TENANT.preload.start_timer();
             let preload = match mode {
                 SpawnMode::Create => {None},
                 SpawnMode::Normal => {
@@ -670,6 +732,7 @@ impl Tenant {
                     }
                 }
             };
+            preload_timer.observe_duration();

             // Remote preload is complete.
             drop(remote_load_completion);
@@ -721,15 +784,39 @@ impl Tenant {
                 }
             }

+            let attach_timer = TENANT.attach.start_timer();
             match tenant_clone.attach(preload, &ctx).await {
                 Ok(()) => {
                     info!("attach finished, activating");
+                    attach_timer.observe_duration();
                     tenant_clone.activate(broker_client, None, &ctx);
                 }
                 Err(e) => {
+                    attach_timer.observe_duration();
                     make_broken(&tenant_clone, anyhow::anyhow!(e));
                 }
             }
+
+            // If we are doing an opportunistic warmup attachment at startup, initialize
+            // logical size at the same time. This is better than starting a bunch of idle tenants
+            // with cold caches and then coming back later to initialize their logical sizes.
+            //
+            // It also prevents the warmup process from competing with the concurrency limit on
+            // logical size calculations: if the logical size calculation semaphore is saturated,
+            // then warmup will wait for that before proceeding to the next tenant.
+            if let AttachType::Warmup(_permit) = attach_type {
+                let mut futs = FuturesUnordered::new();
+                let timelines: Vec<_> = tenant_clone.timelines.lock().unwrap().values().cloned().collect();
+                for t in timelines {
+                    futs.push(t.await_initial_logical_size())
+                }
+                tracing::info!("Waiting for initial logical sizes while warming up...");
+                while futs.next().await.is_some() {
+
+                }
+                tracing::info!("Warm-up complete");
+            }
+
             Ok(())
         }
         .instrument({
@@ -1696,6 +1783,15 @@ impl Tenant {
         Ok(loaded_timeline)
     }

+    pub(crate) async fn delete_timeline(
+        self: Arc<Self>,
+        timeline_id: TimelineId,
+    ) -> Result<(), DeleteTimelineError> {
+        DeleteTimelineFlow::run(&self, timeline_id, false).await?;
+
+        Ok(())
+    }
+
     /// perform one garbage collection iteration, removing old data files from disk.
     /// this function is periodically called by gc task.
     /// also it can be explicitly requested through page server api 'do_gc' command.
@@ -1857,7 +1953,7 @@ impl Tenant {
            );
            *current_state = TenantState::Active;

-            let elapsed = self.loading_started_at.elapsed();
+            let elapsed = self.constructed_at.elapsed();
            let total_timelines = timelines_accessor.len();

            // log a lot of stuff, because some tenants sometimes suffer from user-visible
@@ -1872,7 +1968,7 @@ impl Tenant {
                "activation attempt finished"
            );

-            TENANT_ACTIVATION.observe(elapsed.as_secs_f64());
+            TENANT.activation.observe(elapsed.as_secs_f64());
        });
    }
 }
@@ -2127,18 +2223,41 @@ impl Tenant {
         self.state.subscribe()
     }

-    pub(crate) async fn wait_to_become_active(&self) -> Result<(), GetActiveTenantError> {
+    /// The activate_now semaphore is initialized with zero units. As soon as
+    /// we add a unit, waiters will be able to acquire a unit and proceed.
+ pub(crate) fn activate_now(&self) { + self.activate_now_sem.add_permits(1); + } + + pub(crate) async fn wait_to_become_active( + &self, + timeout: Duration, + ) -> Result<(), GetActiveTenantError> { let mut receiver = self.state.subscribe(); loop { let current_state = receiver.borrow_and_update().clone(); match current_state { TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => { // in these states, there's a chance that we can reach ::Active - receiver.changed().await.map_err( - |_e: tokio::sync::watch::error::RecvError| - // Tenant existed but was dropped: report it as non-existent - GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id)) - )?; + self.activate_now(); + match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await { + Ok(r) => { + r.map_err( + |_e: tokio::sync::watch::error::RecvError| + // Tenant existed but was dropped: report it as non-existent + GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id)) + )? + } + Err(TimeoutCancellableError::Cancelled) => { + return Err(GetActiveTenantError::Cancelled); + } + Err(TimeoutCancellableError::Timeout) => { + return Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state: Some(self.current_state()), + wait_time: timeout, + }); + } + } } TenantState::Active { .. } => { return Ok(()); @@ -2463,7 +2582,7 @@ impl Tenant { conf, // using now here is good enough approximation to catch tenants with really long // activation times. - loading_started_at: Instant::now(), + constructed_at: Instant::now(), tenant_conf: Arc::new(RwLock::new(attached_conf)), timelines: Mutex::new(HashMap::new()), timelines_creating: Mutex::new(HashSet::new()), @@ -2475,6 +2594,7 @@ impl Tenant { cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()), cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()), + activate_now_sem: tokio::sync::Semaphore::new(0), delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())), cancel: CancellationToken::default(), gate: Gate::new(format!("Tenant<{tenant_shard_id}>")), diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index f53951e1d3..b2f14db9f7 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -28,7 +28,7 @@ use crate::control_plane_client::{ ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError, }; use crate::deletion_queue::DeletionQueueClient; -use crate::metrics::TENANT_MANAGER as METRICS; +use crate::metrics::{TENANT, TENANT_MANAGER as METRICS}; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::{ AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt, @@ -44,7 +44,6 @@ use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use super::delete::DeleteTenantError; -use super::timeline::delete::DeleteTimelineFlow; use super::TenantSharedResources; /// For a tenant that appears in TenantsMap, it may either be @@ -430,6 +429,13 @@ pub async fn init_tenant_mgr( let tenant_generations = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?; + tracing::info!( + "Attaching {} tenants at startup, warming up {} at a time", + tenant_configs.len(), + conf.concurrent_tenant_warmup.initial_permits() + ); + TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64); + // Construct `Tenant` objects and start them running for (tenant_shard_id, 
location_conf) in tenant_configs { let tenant_dir_path = conf.tenant_path(&tenant_shard_id); @@ -848,17 +854,6 @@ impl TenantManager { } } - pub(crate) async fn delete_timeline( - &self, - tenant_shard_id: TenantShardId, - timeline_id: TimelineId, - _ctx: &RequestContext, - ) -> Result<(), DeleteTimelineError> { - let tenant = self.get_attached_tenant_shard(tenant_shard_id, true)?; - DeleteTimelineFlow::run(&tenant, timeline_id, false).await?; - Ok(()) - } - #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, @@ -1221,7 +1216,10 @@ pub(crate) async fn get_active_tenant_with_timeout( // Fast path: we don't need to do any async waiting. return Ok(tenant.clone()); } - _ => (WaitFor::Tenant(tenant.clone()), tenant_shard_id), + _ => { + tenant.activate_now(); + (WaitFor::Tenant(tenant.clone()), tenant_shard_id) + } } } Some(TenantSlot::Secondary) => { @@ -1275,28 +1273,10 @@ pub(crate) async fn get_active_tenant_with_timeout( }; tracing::debug!("Waiting for tenant to enter active state..."); - match timeout_cancellable( - deadline.duration_since(Instant::now()), - cancel, - tenant.wait_to_become_active(), - ) - .await - { - Ok(Ok(())) => Ok(tenant), - Ok(Err(e)) => Err(e), - Err(TimeoutCancellableError::Timeout) => { - let latest_state = tenant.current_state(); - if latest_state == TenantState::Active { - Ok(tenant) - } else { - Err(GetActiveTenantError::WaitForActiveTimeout { - latest_state: Some(latest_state), - wait_time: timeout, - }) - } - } - Err(TimeoutCancellableError::Cancelled) => Err(GetActiveTenantError::Cancelled), - } + tenant + .wait_to_become_active(deadline.duration_since(Instant::now())) + .await?; + Ok(tenant) } pub(crate) async fn delete_tenant( diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7438215a68..1e84fa1848 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1734,6 +1734,7 @@ impl Timeline { self.current_logical_size.current_size().accuracy(), logical_size::Accuracy::Exact, ); + self.current_logical_size.initialized.add_permits(1); return; }; @@ -1779,6 +1780,11 @@ impl Timeline { cancel: CancellationToken, background_ctx: RequestContext, ) { + scopeguard::defer! { + // Irrespective of the outcome of this operation, we should unblock anyone waiting for it. + self.current_logical_size.initialized.add_permits(1); + } + enum BackgroundCalculationError { Cancelled, Other(anyhow::Error), @@ -3104,6 +3110,32 @@ impl Timeline { Ok(image_layers) } + + /// Wait until the background initial logical size calculation is complete, or + /// this Timeline is shut down. Calling this function will cause the initial + /// logical size calculation to skip waiting for the background jobs barrier. + pub(crate) async fn await_initial_logical_size(self: Arc) { + if let Some(await_bg_cancel) = self + .current_logical_size + .cancel_wait_for_background_loop_concurrency_limit_semaphore + .get() + { + await_bg_cancel.cancel(); + } else { + // We should not wait if we were not able to explicitly instruct + // the logical size cancellation to skip the concurrency limit semaphore. + // TODO: this is an unexpected case. We should restructure so that it + // can't happen. 
+            tracing::info!(
+                "await_initial_logical_size: can't get semaphore cancel token, skipping"
+            );
+        }
+
+        tokio::select!(
+            _ = self.current_logical_size.initialized.acquire() => {},
+            _ = self.cancel.cancelled() => {}
+        )
+    }
 }

 #[derive(Default)]
diff --git a/pageserver/src/tenant/timeline/logical_size.rs b/pageserver/src/tenant/timeline/logical_size.rs
index f2db8c91fc..03bc59ea38 100644
--- a/pageserver/src/tenant/timeline/logical_size.rs
+++ b/pageserver/src/tenant/timeline/logical_size.rs
@@ -34,6 +34,9 @@ pub(super) struct LogicalSize {
     pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell<CancellationToken>,

+    /// Once the initial logical size is initialized, this is notified.
+    pub(crate) initialized: tokio::sync::Semaphore,
+
     /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
     pub initial_part_end: Option<Lsn>,

@@ -125,6 +128,7 @@ impl LogicalSize {
             initial_part_end: None,
             size_added_after_initial: AtomicI64::new(0),
             did_return_approximate_to_walreceiver: AtomicBool::new(false),
+            initialized: tokio::sync::Semaphore::new(0),
         }
     }

@@ -135,6 +139,7 @@ impl LogicalSize {
             initial_part_end: Some(compute_to),
             size_added_after_initial: AtomicI64::new(0),
             did_return_approximate_to_walreceiver: AtomicBool::new(false),
+            initialized: tokio::sync::Semaphore::new(0),
         }
     }

diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py
index 24cbe34457..6e510b2eba 100644
--- a/test_runner/regress/test_timeline_size.py
+++ b/test_runner/regress/test_timeline_size.py
@@ -300,7 +300,8 @@ def test_timeline_initial_logical_size_calculation_cancellation(
     env = neon_env_builder.init_start()
     client = env.pageserver.http_client()

-    tenant_id, timeline_id = env.neon_cli.create_tenant()
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline

     # load in some data
     endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
@@ -732,3 +733,142 @@ def wait_for_timeline_size_init(
         raise Exception(
             f"timed out while waiting for current_logical_size of a timeline to reach its non-incremental value, details: {timeline_details}"
         )
+
+
+def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
+    """
+    Tenants warming up opportunistically will wait for one another's logical size calculations to complete
+    before proceeding. However, they skip this if a client is actively trying to access them.
+
+    This test is not purely about logical sizes, but logical size calculation is the phase that we
+    use as a proxy for "warming up" in this test: it happens within the semaphore guard used
+    to limit concurrent tenant warm-up.
+    """
+
+    # We will run with the limit set to 1, so that once we have one tenant stuck
+    # in a pausable failpoint, the rest are prevented from proceeding through warmup.
+    neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'"
+
+    env = neon_env_builder.init_start()
+    pageserver_http = env.pageserver.http_client()
+
+    # Create some tenants
+    n_tenants = 10
+    tenant_ids = {env.initial_tenant}
+    for _i in range(0, n_tenants - 1):
+        tenant_id = TenantId.generate()
+        env.pageserver.tenant_create(tenant_id)
+
+        # Empty tenants are not subject to waiting for logical size calculations, because
+        # those happen at the timeline level
+        timeline_id = TimelineId.generate()
+        env.neon_cli.create_timeline(
+            new_branch_name="main", tenant_id=tenant_id, timeline_id=timeline_id
+        )
+
+        tenant_ids.add(tenant_id)
+
+    # Restart pageserver with logical size calculations paused
+    env.pageserver.stop()
+    env.pageserver.start(
+        extra_env_vars={"FAILPOINTS": "timeline-calculate-logical-size-pause=pause"}
+    )
+
+    def get_tenant_states():
+        states = {}
+        for tenant_id in tenant_ids:
+            tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
+            states[tenant_id] = tenant["state"]["slug"]
+        log.info(f"Tenant states: {states}")
+        return states
+
+    def at_least_one_active():
+        assert "Active" in set(get_tenant_states().values())
+
+    # One tenant should activate, then get stuck in its logical size calculation
+    wait_until(10, 1, at_least_one_active)
+
+    # Wait some walltime to gain confidence that other tenants really are stuck and not proceeding to activate
+    time.sleep(5)
+
+    # We should see one tenant win the activation race and enter logical size calculation. The rest
+    # will stay in Attaching state, waiting for the "warmup_limit" semaphore
+    expect_activated = 1
+    states = get_tenant_states()
+    assert len([s for s in states.values() if s == "Active"]) == expect_activated
+    assert len([s for s in states.values() if s == "Attaching"]) == n_tenants - expect_activated
+
+    assert (
+        pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants
+    )
+
+    # This is zero, and subsequent checks are expect_activated - 1, because this counter does not
+    # count how many tenants are Active, it counts how many have finished warmup. The first tenant
+    # that reached Active is still stuck in its logical size calculation, and has therefore not finished warmup.
+    assert pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") == 0
+
+    # If a client accesses one of the blocked tenants, it should skip waiting for warmup and
+    # go active as fast as it can.
+ stuck_tenant_id = list( + [(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"] + )[0][0] + + endpoint = env.endpoints.create_start(branch_name="main", tenant_id=stuck_tenant_id) + endpoint.safe_psql_many( + [ + "CREATE TABLE foo (x INTEGER)", + "INSERT INTO foo SELECT g FROM generate_series(1, 10) g", + ] + ) + endpoint.stop() + + # That one that we successfully accessed is now Active + expect_activated += 1 + assert pageserver_http.tenant_status(tenant_id=stuck_tenant_id)["state"]["slug"] == "Active" + assert ( + pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") + == expect_activated - 1 + ) + + # The ones we didn't touch are still in Attaching + assert ( + len([s for s in get_tenant_states().values() if s == "Attaching"]) + == n_tenants - expect_activated + ) + + # Timeline creation operations also wake up Attaching tenants + stuck_tenant_id = list( + [(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"] + )[0][0] + pageserver_http.timeline_create(env.pg_version, stuck_tenant_id, TimelineId.generate()) + expect_activated += 1 + assert pageserver_http.tenant_status(tenant_id=stuck_tenant_id)["state"]["slug"] == "Active" + assert ( + len([s for s in get_tenant_states().values() if s == "Attaching"]) + == n_tenants - expect_activated + ) + + assert ( + pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") + == expect_activated - 1 + ) + + # When we unblock logical size calculation, all tenants should proceed to active state via + # the warmup route. + pageserver_http.configure_failpoints(("timeline-calculate-logical-size-pause", "off")) + + def all_active(): + assert all(s == "Active" for s in get_tenant_states().values()) + + wait_until(10, 1, all_active) + + # Final control check: restarting with no failpoints at all results in all tenants coming active + # without being prompted by client I/O + env.pageserver.stop() + env.pageserver.start() + wait_until(10, 1, all_active) + + assert ( + pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants + ) + assert pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") == n_tenants
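
Note: the warmup-vs-on-demand interaction is spread across several hunks above, so here is a minimal, self-contained sketch of the pattern using plain tokio primitives. Everything in it is hypothetical illustration (the `FakeTenant` type, its `attach` method, and the printed messages are invented; only the zero-permit "activate now" semaphore, the bounded warmup semaphore, and the `select!` shape mirror what the patch does in `Tenant::attach`). It assumes a tokio dependency with the "full" feature.

```rust
// A toy model of the activation flow introduced by this PR:
// - `warmup_limit` plays the role of `concurrent_tenant_warmup` (here set to 1).
// - `activate_now` starts with zero permits; a client-facing path calls
//   `add_permits(1)` so the tenant can skip the warmup queue.
use std::sync::Arc;
use tokio::sync::Semaphore;

#[derive(Clone)]
struct FakeTenant {
    activate_now: Arc<Semaphore>, // zero permits until a client asks for this tenant
}

impl FakeTenant {
    fn activate_now(&self) {
        // Nudge the tenant: lets it win the select! below without a warmup permit.
        self.activate_now.add_permits(1);
    }

    async fn attach(&self, warmup_limit: Arc<Semaphore>, name: &str) {
        tokio::select! {
            _ = self.activate_now.acquire() => {
                println!("{name}: activating on demand");
            }
            permit = warmup_limit.acquire_owned() => {
                let _permit = permit.expect("warmup semaphore closed");
                println!("{name}: activating via background warmup");
                // ...expensive warmup work would run here while holding the permit...
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let warmup_limit = Arc::new(Semaphore::new(1)); // like concurrent_tenant_warmup = '1'
    let tenants: Vec<FakeTenant> = (0..3)
        .map(|_| FakeTenant { activate_now: Arc::new(Semaphore::new(0)) })
        .collect();

    let mut tasks = Vec::new();
    for (i, t) in tenants.iter().cloned().enumerate() {
        let limit = warmup_limit.clone();
        tasks.push(tokio::spawn(async move {
            t.attach(limit, &format!("tenant-{i}")).await;
        }));
    }

    // A "client" touches tenant 2: it does not have to wait behind the warmup queue.
    tenants[2].activate_now();

    for task in tasks {
        task.await.unwrap();
    }
}
```

In the real patch the same nudge is issued from `wait_to_become_active` and `get_active_tenant_with_timeout`, which is why the HTTP handlers and the test above can pull an `Attaching` tenant to `Active` while the others stay queued behind the warmup semaphore.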