diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 7e2b376212..827f74fcce 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -355,6 +355,7 @@ fn start_pageserver(
     // consumer side) will be dropped once we can start the background jobs. Currently it is behind
     // completing all initial logical size calculations (init_logical_size_done_rx) and a timeout
     // (background_task_maximum_delay).
+    let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel();
     let (init_done_tx, init_done_rx) = utils::completion::channel();
 
     let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();
@@ -362,7 +363,8 @@ fn start_pageserver(
     let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();
 
     let order = pageserver::InitializationOrder {
-        initial_tenant_load: Some(init_done_tx),
+        initial_tenant_load_remote: Some(init_done_tx),
+        initial_tenant_load: Some(init_remote_done_tx),
         initial_logical_size_can_start: init_done_rx.clone(),
         initial_logical_size_attempt: Some(init_logical_size_done_tx),
         background_jobs_can_start: background_jobs_barrier.clone(),
@@ -388,6 +390,9 @@ fn start_pageserver(
             // NOTE: unlike many futures in pageserver, this one is cancellation-safe
             let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed"));
 
+            init_remote_done_rx.wait().await;
+            startup_checkpoint("initial_tenant_load_remote", "Remote part of initial load completed");
+
             init_done_rx.wait().await;
             startup_checkpoint("initial_tenant_load", "Initial load completed");
             STARTUP_IS_LOADING.set(0);
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 8199cd38e6..32d43053bd 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -173,6 +173,9 @@ fn is_walkdir_io_not_found(e: &walkdir::Error) -> bool {
 /// delaying is needed.
 #[derive(Clone)]
 pub struct InitializationOrder {
+    /// Each initial tenant load task carries this until it is done loading timelines from remote storage.
+    pub initial_tenant_load_remote: Option<utils::completion::Completion>,
+
     /// Each initial tenant load task carries this until completion.
     pub initial_tenant_load: Option<utils::completion::Completion>,
 
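The staged startup above hangs off `utils::completion::channel()`: each stage holds a `Completion` guard and the startup task `wait()`s on the paired barrier, so a stage is signalled by *dropping* the guard rather than by sending a value. Below is a minimal toy model of those semantics, not the real `utils::completion` API from neon's `utils` crate; it is only a sketch of the behavior the checkpoints rely on:

```rust
// Toy drop-to-signal channel: `Barrier::wait()` returns once every clone of
// the `Completion` guard has been dropped. Assumed semantics only.
use std::sync::Arc;
use tokio::sync::watch;

#[derive(Clone)]
pub struct Completion {
    // The watch sender closes when the last `Completion` clone is dropped.
    _tx: Arc<watch::Sender<()>>,
}

#[derive(Clone)]
pub struct Barrier(watch::Receiver<()>);

impl Barrier {
    pub async fn wait(mut self) {
        // `changed()` errors out once the sender side is gone, i.e. once all
        // `Completion` guards have been dropped.
        while self.0.changed().await.is_ok() {}
    }
}

pub fn channel() -> (Completion, Barrier) {
    let (tx, rx) = watch::channel(());
    (Completion { _tx: Arc::new(tx) }, Barrier(rx))
}

#[tokio::main]
async fn main() {
    let (completion, barrier) = channel();
    let waiter = tokio::spawn(barrier.wait());
    drop(completion); // signal: the guarded phase is done
    waiter.await.unwrap();
    println!("phase complete");
}
```

Drop-based signalling is what makes the early-return paths in the tenant load task below correct by construction: whether the task finishes, fails, or bails out to resume a deletion, the guard is dropped and startup proceeds.
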
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 8f7710ffcf..88b2423eec 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -23,12 +23,14 @@ use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::completion;
+use utils::completion::Completion;
 use utils::crashsafe::path_with_suffix_extension;
 
 use std::cmp::min;
 use std::collections::hash_map::Entry;
 use std::collections::BTreeSet;
 use std::collections::HashMap;
+use std::collections::HashSet;
 use std::fmt::Debug;
 use std::fmt::Display;
 use std::fs;
@@ -185,6 +187,11 @@ impl AttachedTenantConf {
         }
     }
 }
+struct TimelinePreload {
+    timeline_id: TimelineId,
+    client: RemoteTimelineClient,
+    index_part: Result<MaybeDeletedIndexPart, DownloadError>,
+}
 
 ///
 /// Tenant consists of multiple timelines. Keep them in a hash table.
@@ -962,6 +969,9 @@ impl Tenant {
             let _completion = init_order
                 .as_mut()
                 .and_then(|x| x.initial_tenant_load.take());
+            let remote_load_completion = init_order
+                .as_mut()
+                .and_then(|x| x.initial_tenant_load_remote.take());
 
             // Dont block pageserver startup on figuring out deletion status
             let pending_deletion = {
@@ -986,6 +996,7 @@ impl Tenant {
                 // as we are no longer loading, signal completion by dropping
                 // the completion while we resume deletion
                 drop(_completion);
+                drop(remote_load_completion);
                 // do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
                 let _ = init_order
                     .as_mut()
@@ -1011,7 +1022,10 @@ impl Tenant {
             let background_jobs_can_start =
                 init_order.as_ref().map(|x| &x.background_jobs_can_start);
 
-            match tenant_clone.load(init_order.as_ref(), &ctx).await {
+            match tenant_clone
+                .load(init_order.as_ref(), remote_load_completion, &ctx)
+                .await
+            {
                 Ok(()) => {
                     debug!("load finished");
@@ -1175,6 +1189,52 @@ impl Tenant {
         })
     }
 
+    async fn load_timeline_metadata(
+        self: &Arc<Self>,
+        timeline_ids: HashSet<TimelineId>,
+        remote_storage: &GenericRemoteStorage,
+    ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
+        let mut part_downloads = JoinSet::new();
+        for timeline_id in timeline_ids {
+            let client = RemoteTimelineClient::new(
+                remote_storage.clone(),
+                self.deletion_queue_client.clone(),
+                self.conf,
+                self.tenant_id,
+                timeline_id,
+                self.generation,
+            );
+            part_downloads.spawn(
+                async move {
+                    debug!("starting index part download");
+
+                    let index_part = client.download_index_file().await;
+
+                    debug!("finished index part download");
+
+                    Result::<_, anyhow::Error>::Ok(TimelinePreload {
+                        client,
+                        timeline_id,
+                        index_part,
+                    })
+                }
+                .map(move |res| {
+                    res.with_context(|| format!("download index part for timeline {timeline_id}"))
+                })
+                .instrument(info_span!("download_index_part", %timeline_id)),
+            );
+        }
+
+        let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
+        while let Some(result) = part_downloads.join_next().await {
+            let preload_result = result.context("join preload task")?;
+            let preload = preload_result?;
+            timeline_preloads.insert(preload.timeline_id, preload);
+        }
+
+        Ok(timeline_preloads)
+    }
+
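The new `load_timeline_metadata` is a fan-out/fan-in: one `JoinSet` task per timeline downloads its index part concurrently, and the drain loop collects the results into a map keyed by timeline ID. The same shape in a self-contained sketch; `fetch_index_part` here is a hypothetical stand-in for `RemoteTimelineClient::download_index_file`:

```rust
// Fan-out/fan-in with tokio's JoinSet, mirroring load_timeline_metadata.
use std::collections::HashMap;
use tokio::task::JoinSet;

async fn fetch_index_part(timeline_id: u64) -> Result<String, std::io::Error> {
    // Placeholder for the real remote-storage download.
    Ok(format!("index_part for timeline {timeline_id}"))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut part_downloads = JoinSet::new();
    for timeline_id in [1u64, 2, 3] {
        // All downloads run concurrently; the JoinSet aborts leftovers on drop.
        part_downloads.spawn(async move { (timeline_id, fetch_index_part(timeline_id).await) });
    }

    let mut preloads: HashMap<u64, String> = HashMap::new();
    while let Some(joined) = part_downloads.join_next().await {
        // First `?`: the task panicked or was aborted. Second `?`: the download failed.
        let (timeline_id, result) = joined?;
        preloads.insert(timeline_id, result?);
    }

    println!("{preloads:?}");
    Ok(())
}
```

Note the design choice the real function makes on top of this shape: individual download failures are kept inside `TimelinePreload::index_part` rather than failing the whole set, so per-timeline error handling can happen later in `load_local_timeline`.
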
"Timeline has no ancestor and no layer files" + // Load remote content for timelines in this tenant + let all_timeline_ids = scan + .sorted_timelines_to_load + .iter() + .map(|i| i.0) + .chain(scan.timelines_to_resume_deletion.iter().map(|i| i.0)) + .collect(); + let mut preload = if let Some(remote_storage) = &self.remote_storage { + Some( + self.load_timeline_metadata(all_timeline_ids, remote_storage) + .await?, + ) + } else { + None + }; + + drop(remote_completion); + + crate::failpoint_support::sleep_millis_async!("before-loading-tenant"); + // Process loadable timelines first for (timeline_id, local_metadata) in scan.sorted_timelines_to_load { + let timeline_preload = preload.as_mut().map(|p| p.remove(&timeline_id).unwrap()); if let Err(e) = self - .load_local_timeline(timeline_id, local_metadata, init_order, ctx, false) + .load_local_timeline( + timeline_id, + local_metadata, + timeline_preload, + init_order, + ctx, + false, + ) .await { match e { @@ -1245,8 +1332,17 @@ impl Tenant { } } Some(local_metadata) => { + let timeline_preload = + preload.as_mut().map(|p| p.remove(&timeline_id).unwrap()); if let Err(e) = self - .load_local_timeline(timeline_id, local_metadata, init_order, ctx, true) + .load_local_timeline( + timeline_id, + local_metadata, + timeline_preload, + init_order, + ctx, + true, + ) .await { match e { @@ -1274,11 +1370,12 @@ impl Tenant { /// Subroutine of `load_tenant`, to load an individual timeline /// /// NB: The parent is assumed to be already loaded! - #[instrument(skip(self, local_metadata, init_order, ctx))] + #[instrument(skip(self, local_metadata, init_order, preload, ctx))] async fn load_local_timeline( self: &Arc, timeline_id: TimelineId, local_metadata: TimelineMetadata, + preload: Option, init_order: Option<&InitializationOrder>, ctx: &RequestContext, found_delete_mark: bool, @@ -1287,74 +1384,81 @@ impl Tenant { let mut resources = self.build_timeline_resources(timeline_id); - let (remote_startup_data, remote_client) = match resources.remote_client { - Some(remote_client) => match remote_client.download_index_file().await { - Ok(index_part) => { - let index_part = match index_part { - MaybeDeletedIndexPart::IndexPart(index_part) => index_part, - MaybeDeletedIndexPart::Deleted(index_part) => { - // TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation. - // Example: - // start deletion operation - // finishes upload of index part - // pageserver crashes - // remote storage gets de-configured - // pageserver starts - // - // We don't really anticipate remote storage to be de-configured, so, for now, this is fine. - // Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099. - info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler"); + let (remote_startup_data, remote_client) = match preload { + Some(preload) => { + let TimelinePreload { + index_part, + client: remote_client, + timeline_id: _timeline_id, + } = preload; + match index_part { + Ok(index_part) => { + let index_part = match index_part { + MaybeDeletedIndexPart::IndexPart(index_part) => index_part, + MaybeDeletedIndexPart::Deleted(index_part) => { + // TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation. 
-                            // Example:
-                            //  start deletion operation
-                            //  finishes upload of index part
-                            //  pageserver crashes
-                            //  remote storage gets de-configured
-                            //  pageserver starts
-                            //
-                            // We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
-                            // Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
-                            info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler");
-
-                            remote_client
-                                .init_upload_queue_stopped_to_continue_deletion(&index_part)
-                                .context("init queue stopped")
-                                .map_err(LoadLocalTimelineError::ResumeDeletion)?;
-
-                            DeleteTimelineFlow::resume_deletion(
-                                Arc::clone(self),
-                                timeline_id,
-                                &local_metadata,
-                                Some(remote_client),
-                                self.deletion_queue_client.clone(),
-                                init_order,
-                            )
-                            .await
-                            .context("resume deletion")
-                            .map_err(LoadLocalTimelineError::ResumeDeletion)?;
-
-                            return Ok(());
-                        }
-                    };
-
-                    let remote_metadata = index_part.metadata.clone();
-                    (
-                        Some(RemoteStartupData {
-                            index_part,
-                            remote_metadata,
-                        }),
-                        Some(remote_client),
-                    )
-                }
-                Err(DownloadError::NotFound) => {
-                    info!("no index file was found on the remote, found_delete_mark: {found_delete_mark}");
-
-                    if found_delete_mark {
-                        // We could've resumed at a point where remote index was deleted, but metadata file wasnt.
-                        // Cleanup:
-                        return DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(
-                            self,
-                            timeline_id,
-                        )
-                        .await
-                        .context("cleanup_remaining_timeline_fs_traces")
-                        .map_err(LoadLocalTimelineError::ResumeDeletion);
-                    }
-
-                    // We're loading fresh timeline that didnt yet make it into remote.
-                    (None, Some(remote_client))
-                }
-                Err(e) => return Err(LoadLocalTimelineError::Load(anyhow::Error::new(e))),
-            },
+        let (remote_startup_data, remote_client) = match preload {
+            Some(preload) => {
+                let TimelinePreload {
+                    index_part,
+                    client: remote_client,
+                    timeline_id: _timeline_id,
+                } = preload;
+                match index_part {
+                    Ok(index_part) => {
+                        let index_part = match index_part {
+                            MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
+                            MaybeDeletedIndexPart::Deleted(index_part) => {
+                                // TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
+                                // Example:
+                                //  start deletion operation
+                                //  finishes upload of index part
+                                //  pageserver crashes
+                                //  remote storage gets de-configured
+                                //  pageserver starts
+                                //
+                                // We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
+                                // Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
+                                info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler");
+
+                                remote_client
+                                    .init_upload_queue_stopped_to_continue_deletion(&index_part)
+                                    .context("init queue stopped")
+                                    .map_err(LoadLocalTimelineError::ResumeDeletion)?;
+
+                                DeleteTimelineFlow::resume_deletion(
+                                    Arc::clone(self),
+                                    timeline_id,
+                                    &local_metadata,
+                                    Some(remote_client),
+                                    self.deletion_queue_client.clone(),
+                                    init_order,
+                                )
+                                .await
+                                .context("resume deletion")
+                                .map_err(LoadLocalTimelineError::ResumeDeletion)?;
+
+                                return Ok(());
+                            }
+                        };
+
+                        let remote_metadata = index_part.metadata.clone();
+                        (
+                            Some(RemoteStartupData {
+                                index_part,
+                                remote_metadata,
+                            }),
+                            Some(remote_client),
+                        )
+                    }
+                    Err(DownloadError::NotFound) => {
+                        info!("no index file was found on the remote, found_delete_mark: {found_delete_mark}");
+
+                        if found_delete_mark {
+                            // We could've resumed at a point where the remote index was deleted, but the metadata file wasn't.
+                            // Cleanup:
+                            return DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(
+                                self,
+                                timeline_id,
+                            )
+                            .await
+                            .context("cleanup_remaining_timeline_fs_traces")
+                            .map_err(LoadLocalTimelineError::ResumeDeletion);
+                        }
+
+                        // We're loading a fresh timeline that didn't yet make it into remote storage.
+                        (None, Some(remote_client))
+                    }
+                    Err(e) => return Err(LoadLocalTimelineError::Load(anyhow::Error::new(e))),
+                }
+            }
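The rewritten arm preserves the old decision table; only the source of `index_part` changed (preloaded up front instead of downloaded inline). Condensed, with illustrative names rather than the pageserver's real types:

```rust
// Condensed decision table for the match above. The enum and strings are
// illustrative stand-ins, not the pageserver's real types.
enum IndexFetch {
    Present,       // MaybeDeletedIndexPart::IndexPart: normal startup path
    MarkedDeleted, // MaybeDeletedIndexPart::Deleted: resume timeline deletion
    NotFound,      // DownloadError::NotFound: timeline never reached remote storage
    OtherError,    // any other download error: fail the tenant load
}

fn next_step(fetch: IndexFetch, found_delete_mark: bool) -> &'static str {
    match fetch {
        IndexFetch::Present => "build RemoteStartupData and load the timeline",
        IndexFetch::MarkedDeleted => "re-init the stopped upload queue and resume deletion",
        IndexFetch::NotFound if found_delete_mark => "clean up remaining local fs traces",
        IndexFetch::NotFound => "load as a fresh timeline not yet uploaded",
        IndexFetch::OtherError => "abort the tenant load with an error",
    }
}

fn main() {
    assert_eq!(
        next_step(IndexFetch::NotFound, true),
        "clean up remaining local fs traces"
    );
    assert_eq!(
        next_step(IndexFetch::Present, false),
        "build RemoteStartupData and load the timeline"
    );
}
```
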
             None => {
                 // No remote client
                 if found_delete_mark {
@@ -3594,7 +3698,7 @@ pub mod harness {
                 self.deletion_queue.new_client(),
             ));
             tenant
-                .load(None, ctx)
+                .load(None, None, ctx)
                 .instrument(info_span!("try_load", tenant_id=%self.tenant_id))
                 .await?;
 
diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs
index 3cb58aec6b..0db6213bbf 100644
--- a/pageserver/src/tenant/delete.rs
+++ b/pageserver/src/tenant/delete.rs
@@ -432,7 +432,7 @@ impl DeleteTenantFlow {
         // Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e g remove timelines dir)
         let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id);
         if timelines_path.exists() {
-            tenant.load(init_order, ctx).await.context("load")?;
+            tenant.load(init_order, None, ctx).await.context("load")?;
         }
 
         Self::background(
diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py
index f7dc80a6d8..b49a1a40dd 100644
--- a/test_runner/regress/test_pageserver_restart.py
+++ b/test_runner/regress/test_pageserver_restart.py
@@ -4,6 +4,7 @@ import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder
 from fixtures.remote_storage import s3_storage
+from fixtures.utils import wait_until
 
 
 # Test restarting page server, while safekeeper and compute node keep
@@ -16,8 +17,7 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
     env = neon_env_builder.init_start()
 
-    env.neon_cli.create_branch("test_pageserver_restart")
-    endpoint = env.endpoints.create_start("test_pageserver_restart")
+    endpoint = env.endpoints.create_start("main")
 
     pageserver_http = env.pageserver.http_client()
 
     pg_conn = endpoint.connect()
@@ -75,27 +75,52 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
     cur.execute("SELECT count(*) FROM foo")
     assert cur.fetchone() == (100000,)
 
-    # Validate startup time metrics
-    metrics = pageserver_http.get_metrics()
+    # Wait for metrics to indicate startup complete, so that we know all
+    # startup phases will be reflected in the subsequent checks
+    def assert_complete():
+        for sample in pageserver_http.get_metrics().query_all(
+            "pageserver_startup_duration_seconds"
+        ):
+            labels = dict(sample.labels)
+            log.info(f"metric {labels['phase']}={sample.value}")
+            if labels["phase"] == "complete" and sample.value > 0:
+                return
+
+        raise AssertionError("No 'complete' metric yet")
+
+    wait_until(30, 1.0, assert_complete)
 
     # Expectation callbacks: arg t is sample value, arg p is the previous phase's sample value
-    expectations = {
-        "initial": lambda t, p: True,  # make no assumptions about the initial time point, it could be 0 in theory
+    expectations = [
+        (
+            "initial",
+            lambda t, p: True,
+        ),  # make no assumptions about the initial time point, it could be 0 in theory
+        # Remote phase of initial_tenant_load should happen before the overall phase is complete
+        ("initial_tenant_load_remote", lambda t, p: t >= 0.0 and t >= p),
         # Initial tenant load should reflect the delay we injected
-        "initial_tenant_load": lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p,
+        ("initial_tenant_load", lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p),
         # Subsequent steps should occur in expected order
-        "initial_logical_sizes": lambda t, p: t > 0 and t >= p,
"background_jobs_can_start": lambda t, p: t > 0 and t >= p, - "complete": lambda t, p: t > 0 and t >= p, - } + ("initial_logical_sizes", lambda t, p: t > 0 and t >= p), + ("background_jobs_can_start", lambda t, p: t > 0 and t >= p), + ("complete", lambda t, p: t > 0 and t >= p), + ] + # Accumulate the runtime of each startup phase + values = {} + metrics = pageserver_http.get_metrics() prev_value = None for sample in metrics.query_all("pageserver_startup_duration_seconds"): - labels = dict(sample.labels) - phase = labels["phase"] + phase = sample.labels["phase"] log.info(f"metric {phase}={sample.value}") - assert phase in expectations, f"Unexpected phase {phase}" - assert expectations[phase]( + assert phase in [e[0] for e in expectations], f"Unexpected phase {phase}" + values[phase] = sample + + # Apply expectations to the metrics retrieved + for phase, expectation in expectations: + assert phase in values, f"No data for phase {phase}" + sample = values[phase] + assert expectation( sample.value, prev_value ), f"Unexpected value for {phase}: {sample.value}" prev_value = sample.value