diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index bfe1520e68..0415ed05bd 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -49,6 +49,11 @@ impl ResponseErrorMessageExt for reqwest::Response {
     }
 }
 
+pub enum ForceAwaitLogicalSize {
+    Yes,
+    No,
+}
+
 impl Client {
     pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
         Self {
@@ -92,11 +97,18 @@ impl Client {
         &self,
         tenant_id: TenantId,
         timeline_id: TimelineId,
+        force_await_logical_size: ForceAwaitLogicalSize,
     ) -> Result<TimelineInfo> {
         let uri = format!(
             "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
             self.mgmt_api_endpoint
         );
+
+        let uri = match force_await_logical_size {
+            ForceAwaitLogicalSize::Yes => format!("{}?force-await-initial-logical-size={}", uri, true),
+            ForceAwaitLogicalSize::No => uri,
+        };
+
         self.get(&uri)
             .await?
             .json()
diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs
index 85a3e695de..2d61b0e252 100644
--- a/pageserver/pagebench/src/cmd/basebackup.rs
+++ b/pageserver/pagebench/src/cmd/basebackup.rs
@@ -1,4 +1,5 @@
 use anyhow::Context;
+use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
 use pageserver_client::page_service::BasebackupRequest;
 use utils::id::TenantTimelineId;
 
@@ -92,10 +93,12 @@ async fn main_impl(
     for timeline in &timelines {
         js.spawn({
             let timeline = *timeline;
-            // FIXME: this triggers initial logical size calculation
-            // https://github.com/neondatabase/neon/issues/6168
             let info = mgmt_api_client
-                .timeline_info(timeline.tenant_id, timeline.timeline_id)
+                .timeline_info(
+                    timeline.tenant_id,
+                    timeline.timeline_id,
+                    ForceAwaitLogicalSize::No,
+                )
                 .await
                 .unwrap();
             async move {
diff --git a/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs
index d46ae94e8a..98938d780a 100644
--- a/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs
+++ b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs
@@ -4,6 +4,8 @@ use humantime::Duration;
 use tokio::task::JoinSet;
 use utils::id::TenantTimelineId;
 
+use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
+
 #[derive(clap::Parser)]
 pub(crate) struct Args {
     #[clap(long, default_value = "http://localhost:9898")]
@@ -56,14 +58,15 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
     for tl in timelines {
         let mgmt_api_client = Arc::clone(&mgmt_api_client);
         js.spawn(async move {
-            // TODO: API to explicitly trigger initial logical size computation.
-            // Should probably also avoid making it a side effect of timeline details to trigger initial logical size calculation.
-            // => https://github.com/neondatabase/neon/issues/6168
             let info = mgmt_api_client
-                .timeline_info(tl.tenant_id, tl.timeline_id)
+                .timeline_info(tl.tenant_id, tl.timeline_id, ForceAwaitLogicalSize::Yes)
                 .await
                 .unwrap();
+            // Polling should not be strictly required here, since we await the
+            // initial logical size; however, the request can land before the
+            // timeline is initialised, in which case the reported logical size
+            // is only an approximation.
             if let Some(period) = args.poll_for_completion {
                 let mut ticker = tokio::time::interval(period.into());
                 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
@@ -71,7 +74,7 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
                 while !info.current_logical_size_is_accurate {
                     ticker.tick().await;
                     info = mgmt_api_client
-                        .timeline_info(tl.tenant_id, tl.timeline_id)
+                        .timeline_info(tl.tenant_id, tl.timeline_id, ForceAwaitLogicalSize::Yes)
                         .await
                         .unwrap();
                 }
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index feca08aeaf..af56a1b455 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -323,11 +323,21 @@ impl From for ApiError {
 async fn build_timeline_info(
     timeline: &Arc<Timeline>,
     include_non_incremental_logical_size: bool,
+    force_await_initial_logical_size: bool,
     ctx: &RequestContext,
 ) -> anyhow::Result<TimelineInfo> {
     crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
 
-    let mut info = build_timeline_info_common(timeline, ctx).await?;
+    if force_await_initial_logical_size {
+        timeline.clone().await_initial_logical_size().await
+    }
+
+    let mut info = build_timeline_info_common(
+        timeline,
+        ctx,
+        tenant::timeline::GetLogicalSizePriority::Background,
+    )
+    .await?;
     if include_non_incremental_logical_size {
         // XXX we should be using spawn_ondemand_logical_size_calculation here.
         // Otherwise, if someone deletes the timeline / detaches the tenant while
@@ -344,6 +354,7 @@ async fn build_timeline_info(
 async fn build_timeline_info_common(
     timeline: &Arc<Timeline>,
     ctx: &RequestContext,
+    logical_size_task_priority: tenant::timeline::GetLogicalSizePriority,
 ) -> anyhow::Result<TimelineInfo> {
     crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
     let initdb_lsn = timeline.initdb_lsn;
@@ -366,8 +377,7 @@ async fn build_timeline_info_common(
         Lsn(0) => None,
         lsn @ Lsn(_) => Some(lsn),
     };
-    let current_logical_size =
-        timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx);
+    let current_logical_size = timeline.get_current_logical_size(logical_size_task_priority, ctx);
     let current_physical_size = Some(timeline.layer_size_sum().await);
     let state = timeline.current_state();
     let remote_consistent_lsn_projected = timeline
@@ -478,7 +488,7 @@ async fn timeline_create_handler(
     .await
     {
         Ok(new_timeline) => {
             // Created. Construct a TimelineInfo for it.
-            let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
+            let timeline_info = build_timeline_info_common(&new_timeline, &ctx, tenant::timeline::GetLogicalSizePriority::User)
                 .await
                 .map_err(ApiError::InternalServerError)?;
             json_response(StatusCode::CREATED, timeline_info)
@@ -514,6 +524,8 @@ async fn timeline_list_handler(
     let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
     let include_non_incremental_logical_size: Option<bool> =
         parse_query_param(&request, "include-non-incremental-logical-size")?;
+    let force_await_initial_logical_size: Option<bool> =
+        parse_query_param(&request, "force-await-initial-logical-size")?;
     check_permission(&request, Some(tenant_shard_id.tenant_id))?;
 
     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -527,6 +539,7 @@ async fn timeline_list_handler(
             let timeline_info = build_timeline_info(
                 &timeline,
                 include_non_incremental_logical_size.unwrap_or(false),
+                force_await_initial_logical_size.unwrap_or(false),
                 &ctx,
             )
             .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
@@ -554,6 +567,8 @@ async fn timeline_detail_handler(
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     let include_non_incremental_logical_size: Option<bool> =
         parse_query_param(&request, "include-non-incremental-logical-size")?;
+    let force_await_initial_logical_size: Option<bool> =
+        parse_query_param(&request, "force-await-initial-logical-size")?;
     check_permission(&request, Some(tenant_shard_id.tenant_id))?;
 
     // Logical size calculation needs downloading.
@@ -569,6 +584,7 @@ async fn timeline_detail_handler(
         let timeline_info = build_timeline_info(
             &timeline,
             include_non_incremental_logical_size.unwrap_or(false),
+            force_await_initial_logical_size.unwrap_or(false),
             &ctx,
         )
         .await
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 8f7a5769a3..371b7465eb 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -130,6 +130,13 @@ macro_rules! pausable_failpoint {
             .expect("spawn_blocking");
         }
     };
+    ($name:literal, $cond:expr) => {
+        if cfg!(feature = "testing") {
+            if $cond {
+                pausable_failpoint!($name)
+            }
+        }
+    };
 }
 
 pub mod blob_io;
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index f5adf9d639..3f29e9f6a5 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -945,8 +945,18 @@ impl LayerInner {
             Ok((Err(e), _permit)) => {
                 // sleep already happened in the spawned task, if it was not cancelled
                 let consecutive_failures = self.consecutive_failures.load(Ordering::Relaxed);
-                tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
-                Err(DownloadError::DownloadFailed)
+
+                match e.downcast_ref::<remote_storage::DownloadError>() {
+                    // If the download failed due to its cancellation token,
+                    // propagate the cancellation error upstream.
+                    Some(remote_storage::DownloadError::Cancelled) => {
+                        Err(DownloadError::DownloadCancelled)
+                    }
+                    _ => {
+                        tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
+                        Err(DownloadError::DownloadFailed)
+                    }
+                }
             }
             Err(_gone) => Err(DownloadError::DownloadCancelled),
         }
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index aa5894cc37..2b2fcc7711 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -65,6 +65,11 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
         .with_label_values(&[loop_kind.as_static_str()])
         .guard();
 
+    pausable_failpoint!(
+        "initial-size-calculation-permit-pause",
+        loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
+    );
+
     match CONCURRENT_BACKGROUND_TASKS.acquire().await {
         Ok(permit) => permit,
         Err(_closed) => unreachable!("we never close the semaphore"),
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index a779dcc436..b24de342f8 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -441,6 +441,7 @@ class PageserverHttpClient(requests.Session):
         timeline_id: TimelineId,
         include_non_incremental_logical_size: bool = False,
         include_timeline_dir_layer_file_size_sum: bool = False,
+        force_await_initial_logical_size: bool = False,
         **kwargs,
     ) -> Dict[Any, Any]:
         params = {}
@@ -448,6 +449,8 @@ class PageserverHttpClient(requests.Session):
             params["include-non-incremental-logical-size"] = "true"
         if include_timeline_dir_layer_file_size_sum:
             params["include-timeline-dir-layer-file-size-sum"] = "true"
+        if force_await_initial_logical_size:
+            params["force-await-initial-logical-size"] = "true"
 
         res = self.get(
             f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py
index 11685d1d48..92f2e72378 100644
--- a/test_runner/regress/test_timeline_size.py
+++ b/test_runner/regress/test_timeline_size.py
@@ -923,3 +923,68 @@ def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
     # Check that all the stuck tenants proceed to active (apart from the one that deletes)
     wait_until(10, 1, all_active)
     assert len(get_tenant_states()) == n_tenants - 1
+
+
+def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
+    """
+    /v1/tenant/:tenant_shard_id/timeline and /v1/tenant/:tenant_shard_id/timeline/:timeline_id
+    should not bump the priority of the initial logical size computation
+    background task, unless the force-await-initial-logical-size query param
+    is set to true.
+
+    This test verifies the invariant stated above. A few tricks are involved:
+    1. Detach the tenant and re-attach it after the pageserver is restarted. This circumvents
+       the warm-up which forces the initial logical size calculation.
+    2. A fail point (initial-size-calculation-permit-pause) is used to block the initial
+       computation of the logical size until forced.
+    3. A fail point (walreceiver-after-ingest) is used to pause the walreceiver since
+       otherwise it would force the logical size computation.
+    """
+    env = neon_env_builder.init_start()
+    client = env.pageserver.http_client()
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    # load in some data
+    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
+    endpoint.safe_psql_many(
+        [
+            "CREATE TABLE foo (x INTEGER)",
+            "INSERT INTO foo SELECT g FROM generate_series(1, 10000) g",
+        ]
+    )
+    wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
+
+    # restart with failpoint inside initial size calculation task
+    log.info(f"Detaching tenant {tenant_id} and stopping pageserver...")
+
+    endpoint.stop()
+    env.pageserver.tenant_detach(tenant_id)
+    env.pageserver.stop()
+    env.pageserver.start(
+        extra_env_vars={
+            "FAILPOINTS": "initial-size-calculation-permit-pause=pause;walreceiver-after-ingest=pause"
+        }
+    )
+
+    log.info(f"Re-attaching tenant {tenant_id}...")
+    env.pageserver.tenant_attach(tenant_id)
+
+    # kick off initial size calculation task (the response we get here is the estimated size)
+    def assert_initial_logical_size_not_prioritised():
+        details = client.timeline_detail(tenant_id, timeline_id)
+        assert details["current_logical_size_is_accurate"] is False
+
+    assert_initial_logical_size_not_prioritised()
+
+    # ensure that's actually the case
+    time.sleep(2)
+    assert_initial_logical_size_not_prioritised()
+
+    details = client.timeline_detail(tenant_id, timeline_id, force_await_initial_logical_size=True)
+    assert details["current_logical_size_is_accurate"] is True
+
+    client.configure_failpoints(
+        [("initial-size-calculation-permit-pause", "off"), ("walreceiver-after-ingest", "off")]
+    )
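
For illustration only, not part of the patch: a minimal sketch of how a caller of the management API client might use the new `ForceAwaitLogicalSize` knob introduced above. Only `Client::timeline_info`, `ForceAwaitLogicalSize`, and the `current_logical_size_is_accurate` field come from the diff; the helper name and the surrounding setup are assumptions.

```rust
use pageserver_client::mgmt_api::{Client, ForceAwaitLogicalSize};
use utils::id::TenantTimelineId;

// Hypothetical helper: ask the pageserver to finish the initial logical size
// computation before reporting timeline details, then check whether the
// reported size is exact rather than an estimate.
async fn logical_size_is_accurate(client: &Client, ttid: TenantTimelineId) -> bool {
    let info = client
        // ForceAwaitLogicalSize::Yes appends ?force-await-initial-logical-size=true
        // to the request, so the detail handler awaits the initial computation.
        .timeline_info(ttid.tenant_id, ttid.timeline_id, ForceAwaitLogicalSize::Yes)
        .await
        .unwrap();
    info.current_logical_size_is_accurate
}
```

With `ForceAwaitLogicalSize::No` the same call only reads whatever size the background task has produced so far, which is what the basebackup benchmark relies on to avoid bumping the priority of the initial logical size task.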