# pageserver: measure startup duration spent fetching remote indices (#5564)
## Problem

Currently it is unclear how much of the `initial_tenant_load` period is spent fetching S3 objects, and therefore how impactful changes to remote operations during startup would be.

## Summary of changes

- `Tenant::load` is refactored to load remote indices in parallel and to wait for all of these remote downloads to finish before it constructs any `Timeline` objects (see the sketch below).
- `pageserver_startup_duration_seconds` gets a new `phase` value, `initial_tenant_load_remote`, which counts the time from startup until the last tenant finishes loading remote content.
- `test_pageserver_restart` is extended to validate this phase. The previous version of the test relied on the order of dict entries, which stopped working once a phase was added, so it is refactored a bit.
- `test_pageserver_restart` used to explicitly create a branch; now it uses the default initial_timeline. This avoids startup being held up waiting for logical sizes when one of the branches is not in use.
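For orientation, here is a minimal sketch of the preload pattern the first bullet describes: spawn every timeline's index download onto a `JoinSet`, then join them all into a map before constructing any `Timeline`. This is an illustrative stand-in, not the pageserver's actual API; `download_index_part`, the `u64` `TimelineId`, and the simplified `TimelinePreload` are placeholders.

```rust
use std::collections::HashMap;
use tokio::task::JoinSet;

type TimelineId = u64;

struct TimelinePreload {
    timeline_id: TimelineId,
    // The real code stores Result<MaybeDeletedIndexPart, DownloadError>.
    index_part: Result<String, String>,
}

// Hypothetical stand-in for the S3 GET of a timeline's index_part.json.
async fn download_index_part(timeline_id: TimelineId) -> Result<String, String> {
    Ok(format!("index for timeline {timeline_id}"))
}

async fn preload_timelines(timeline_ids: Vec<TimelineId>) -> HashMap<TimelineId, TimelinePreload> {
    // Issue every index download concurrently ...
    let mut downloads = JoinSet::new();
    for timeline_id in timeline_ids {
        downloads.spawn(async move {
            let index_part = download_index_part(timeline_id).await;
            TimelinePreload { timeline_id, index_part }
        });
    }
    // ... and wait for all of them before constructing any Timeline objects;
    // this waiting period is what the initial_tenant_load_remote phase measures.
    let mut preloads = HashMap::new();
    while let Some(joined) = downloads.join_next().await {
        let preload = joined.expect("download task panicked");
        preloads.insert(preload.timeline_id, preload);
    }
    preloads
}

#[tokio::main]
async fn main() {
    let preloads = preload_timelines(vec![1, 2, 3]).await;
    let ok = preloads.values().filter(|p| p.index_part.is_ok()).count();
    println!("fetched {ok} remote indices");
}
```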
```
@@ -355,6 +355,7 @@ fn start_pageserver(
// consumer side) will be dropped once we can start the background jobs. Currently it is behind
// completing all initial logical size calculations (init_logical_size_done_rx) and a timeout
// (background_task_maximum_delay).
let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel();
let (init_done_tx, init_done_rx) = utils::completion::channel();

let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();
@@ -362,7 +363,8 @@ fn start_pageserver(
let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();

let order = pageserver::InitializationOrder {
initial_tenant_load: Some(init_done_tx),
initial_tenant_load_remote: Some(init_done_tx),
initial_tenant_load: Some(init_remote_done_tx),
initial_logical_size_can_start: init_done_rx.clone(),
initial_logical_size_attempt: Some(init_logical_size_done_tx),
background_jobs_can_start: background_jobs_barrier.clone(),
@@ -388,6 +390,9 @@ fn start_pageserver(
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed"));

init_remote_done_rx.wait().await;
startup_checkpoint("initial_tenant_load_remote", "Remote part of initial load completed");

init_done_rx.wait().await;
startup_checkpoint("initial_tenant_load", "Initial load completed");
STARTUP_IS_LOADING.set(0);
```
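The checkpoints above are gated on completion channels: each tenant load task holds a completion token per phase and drops it when that phase finishes, and `startup_checkpoint` runs once the corresponding barrier observes that every token is gone. Below is a minimal standalone sketch of that gating pattern, assuming tokio's `mpsc` close-on-drop behaviour as a stand-in for the pageserver's internal `utils::completion` helpers.

```rust
use std::time::Instant;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let started_at = Instant::now();
    // Dropping every clone of a sender closes the channel; recv() then
    // returns None, which serves as the "phase complete" barrier.
    let (remote_done_tx, mut remote_done_rx) = mpsc::channel::<()>(1);
    let (done_tx, mut done_rx) = mpsc::channel::<()>(1);

    for tenant in 0..3u32 {
        let remote_done_tx = remote_done_tx.clone();
        let done_tx = done_tx.clone();
        tokio::spawn(async move {
            // ... download this tenant's remote index files ...
            drop(remote_done_tx); // remote part of this tenant's load is finished
            // ... construct Timeline objects from local + remote metadata ...
            drop(done_tx); // this tenant is fully loaded
            let _ = tenant;
        });
    }
    // Drop the originals so only the per-tenant clones keep the channels open.
    drop(remote_done_tx);
    drop(done_tx);

    // Analogous to init_remote_done_rx.wait().await + startup_checkpoint(...).
    let _ = remote_done_rx.recv().await; // resolves once every sender is dropped
    println!("initial_tenant_load_remote at {:?}", started_at.elapsed());

    let _ = done_rx.recv().await;
    println!("initial_tenant_load at {:?}", started_at.elapsed());
}
```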
```
@@ -173,6 +173,9 @@ fn is_walkdir_io_not_found(e: &walkdir::Error) -> bool {
/// delaying is needed.
#[derive(Clone)]
pub struct InitializationOrder {
/// Each initial tenant load task carries this until it is done loading timelines from remote storage
pub initial_tenant_load_remote: Option<utils::completion::Completion>,

/// Each initial tenant load task carries this until completion.
pub initial_tenant_load: Option<utils::completion::Completion>,
```
```
@@ -23,12 +23,14 @@ use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;
use utils::completion::Completion;
use utils::crashsafe::path_with_suffix_extension;

use std::cmp::min;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
@@ -185,6 +187,11 @@ impl AttachedTenantConf {
}
}
}
struct TimelinePreload {
timeline_id: TimelineId,
client: RemoteTimelineClient,
index_part: Result<MaybeDeletedIndexPart, DownloadError>,
}

///
/// Tenant consists of multiple timelines. Keep them in a hash table.
```
```
@@ -962,6 +969,9 @@ impl Tenant {
let _completion = init_order
.as_mut()
.and_then(|x| x.initial_tenant_load.take());
let remote_load_completion = init_order
.as_mut()
.and_then(|x| x.initial_tenant_load_remote.take());

// Dont block pageserver startup on figuring out deletion status
let pending_deletion = {
@@ -986,6 +996,7 @@ impl Tenant {
// as we are no longer loading, signal completion by dropping
// the completion while we resume deletion
drop(_completion);
drop(remote_load_completion);
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
let _ = init_order
.as_mut()
@@ -1011,7 +1022,10 @@ impl Tenant {
let background_jobs_can_start =
init_order.as_ref().map(|x| &x.background_jobs_can_start);

match tenant_clone.load(init_order.as_ref(), &ctx).await {
match tenant_clone
.load(init_order.as_ref(), remote_load_completion, &ctx)
.await
{
Ok(()) => {
debug!("load finished");
```
```
@@ -1175,6 +1189,52 @@ impl Tenant {
})
}

async fn load_timeline_metadata(
self: &Arc<Tenant>,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
let mut part_downloads = JoinSet::new();
for timeline_id in timeline_ids {
let client = RemoteTimelineClient::new(
remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
self.tenant_id,
timeline_id,
self.generation,
);
part_downloads.spawn(
async move {
debug!("starting index part download");

let index_part = client.download_index_file().await;

debug!("finished index part download");

Result::<_, anyhow::Error>::Ok(TimelinePreload {
client,
timeline_id,
index_part,
})
}
.map(move |res| {
res.with_context(|| format!("download index part for timeline {timeline_id}"))
})
.instrument(info_span!("download_index_part", %timeline_id)),
);
}

let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
while let Some(result) = part_downloads.join_next().await {
let preload_result = result.context("join preload task")?;
let preload = preload_result?;
timeline_preloads.insert(preload.timeline_id, preload);
}

Ok(timeline_preloads)
}

///
/// Background task to load in-memory data structures for this tenant, from
/// files on disk. Used at pageserver startup.
```
```
@@ -1183,14 +1243,13 @@ impl Tenant {
async fn load(
self: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
remote_completion: Option<Completion>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();

debug!("loading tenant task");

crate::failpoint_support::sleep_millis_async!("before-loading-tenant");

// Load in-memory state to reflect the local files on disk
//
// Scan the directory, peek into the metadata file of each timeline, and
@@ -1209,10 +1268,38 @@ impl Tenant {
// FIXME original collect_timeline_files contained one more check:
// 1. "Timeline has no ancestor and no layer files"

// Load remote content for timelines in this tenant
let all_timeline_ids = scan
.sorted_timelines_to_load
.iter()
.map(|i| i.0)
.chain(scan.timelines_to_resume_deletion.iter().map(|i| i.0))
.collect();
let mut preload = if let Some(remote_storage) = &self.remote_storage {
Some(
self.load_timeline_metadata(all_timeline_ids, remote_storage)
.await?,
)
} else {
None
};

drop(remote_completion);

crate::failpoint_support::sleep_millis_async!("before-loading-tenant");

// Process loadable timelines first
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
let timeline_preload = preload.as_mut().map(|p| p.remove(&timeline_id).unwrap());
if let Err(e) = self
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, false)
.load_local_timeline(
timeline_id,
local_metadata,
timeline_preload,
init_order,
ctx,
false,
)
.await
{
match e {
```
```
@@ -1245,8 +1332,17 @@ impl Tenant {
}
}
Some(local_metadata) => {
let timeline_preload =
preload.as_mut().map(|p| p.remove(&timeline_id).unwrap());
if let Err(e) = self
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
.load_local_timeline(
timeline_id,
local_metadata,
timeline_preload,
init_order,
ctx,
true,
)
.await
{
match e {
@@ -1274,11 +1370,12 @@ impl Tenant {
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
#[instrument(skip(self, local_metadata, init_order, ctx))]
#[instrument(skip(self, local_metadata, init_order, preload, ctx))]
async fn load_local_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
preload: Option<TimelinePreload>,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
found_delete_mark: bool,
```
```
@@ -1287,74 +1384,81 @@ impl Tenant {

let mut resources = self.build_timeline_resources(timeline_id);

let (remote_startup_data, remote_client) = match resources.remote_client {
Some(remote_client) => match remote_client.download_index_file().await {
Ok(index_part) => {
let index_part = match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted(index_part) => {
// TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
// Example:
// start deletion operation
// finishes upload of index part
// pageserver crashes
// remote storage gets de-configured
// pageserver starts
//
// We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
// Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler");
let (remote_startup_data, remote_client) = match preload {
Some(preload) => {
let TimelinePreload {
index_part,
client: remote_client,
timeline_id: _timeline_id,
} = preload;
match index_part {
Ok(index_part) => {
let index_part = match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted(index_part) => {
// TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
// Example:
// start deletion operation
// finishes upload of index part
// pageserver crashes
// remote storage gets de-configured
// pageserver starts
//
// We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
// Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler");

remote_client
.init_upload_queue_stopped_to_continue_deletion(&index_part)
.context("init queue stopped")
remote_client
.init_upload_queue_stopped_to_continue_deletion(&index_part)
.context("init queue stopped")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;

DeleteTimelineFlow::resume_deletion(
Arc::clone(self),
timeline_id,
&local_metadata,
Some(remote_client),
self.deletion_queue_client.clone(),
init_order,
)
.await
.context("resume deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;

DeleteTimelineFlow::resume_deletion(
Arc::clone(self),
return Ok(());
}
};

let remote_metadata = index_part.metadata.clone();
(
Some(RemoteStartupData {
index_part,
remote_metadata,
}),
Some(remote_client),
)
}
Err(DownloadError::NotFound) => {
info!("no index file was found on the remote, found_delete_mark: {found_delete_mark}");

if found_delete_mark {
// We could've resumed at a point where remote index was deleted, but metadata file wasnt.
// Cleanup:
return DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(
self,
timeline_id,
&local_metadata,
Some(remote_client),
self.deletion_queue_client.clone(),
init_order,
)
.await
.context("resume deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;

return Ok(());
.context("cleanup_remaining_timeline_fs_traces")
.map_err(LoadLocalTimelineError::ResumeDeletion);
}
};

let remote_metadata = index_part.metadata.clone();
(
Some(RemoteStartupData {
index_part,
remote_metadata,
}),
Some(remote_client),
)
}
Err(DownloadError::NotFound) => {
info!("no index file was found on the remote, found_delete_mark: {found_delete_mark}");

if found_delete_mark {
// We could've resumed at a point where remote index was deleted, but metadata file wasnt.
// Cleanup:
return DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(
self,
timeline_id,
)
.await
.context("cleanup_remaining_timeline_fs_traces")
.map_err(LoadLocalTimelineError::ResumeDeletion);
// We're loading fresh timeline that didnt yet make it into remote.
(None, Some(remote_client))
}

// We're loading fresh timeline that didnt yet make it into remote.
(None, Some(remote_client))
Err(e) => return Err(LoadLocalTimelineError::Load(anyhow::Error::new(e))),
}
Err(e) => return Err(LoadLocalTimelineError::Load(anyhow::Error::new(e))),
},
}
None => {
// No remote client
if found_delete_mark {
```
```
@@ -3594,7 +3698,7 @@ pub mod harness {
self.deletion_queue.new_client(),
));
tenant
.load(None, ctx)
.load(None, None, ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
.await?;
@@ -432,7 +432,7 @@ impl DeleteTenantFlow {
// Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e g remove timelines dir)
let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id);
if timelines_path.exists() {
tenant.load(init_order, ctx).await.context("load")?;
tenant.load(init_order, None, ctx).await.context("load")?;
}

Self::background(
```
```
@@ -4,6 +4,7 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import s3_storage
from fixtures.utils import wait_until


# Test restarting page server, while safekeeper and compute node keep
@@ -16,8 +17,7 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)

env = neon_env_builder.init_start()

env.neon_cli.create_branch("test_pageserver_restart")
endpoint = env.endpoints.create_start("test_pageserver_restart")
endpoint = env.endpoints.create_start("main")
pageserver_http = env.pageserver.http_client()

pg_conn = endpoint.connect()
@@ -75,27 +75,52 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)

# Validate startup time metrics
metrics = pageserver_http.get_metrics()
# Wait for metrics to indicate startup complete, so that we can know all
# startup phases will be reflected in the subsequent checks
def assert_complete():
for sample in pageserver_http.get_metrics().query_all(
"pageserver_startup_duration_seconds"
):
labels = dict(sample.labels)
log.info(f"metric {labels['phase']}={sample.value}")
if labels["phase"] == "complete" and sample.value > 0:
return

raise AssertionError("No 'complete' metric yet")

wait_until(30, 1.0, assert_complete)

# Expectation callbacks: arg t is sample value, arg p is the previous phase's sample value
expectations = {
"initial": lambda t, p: True, # make no assumptions about the initial time point, it could be 0 in theory
expectations = [
(
"initial",
lambda t, p: True,
), # make no assumptions about the initial time point, it could be 0 in theory
# Remote phase of initial_tenant_load should happen before overall phase is complete
("initial_tenant_load_remote", lambda t, p: t >= 0.0 and t >= p),
# Initial tenant load should reflect the delay we injected
"initial_tenant_load": lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p,
("initial_tenant_load", lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p),
# Subsequent steps should occur in expected order
"initial_logical_sizes": lambda t, p: t > 0 and t >= p,
"background_jobs_can_start": lambda t, p: t > 0 and t >= p,
"complete": lambda t, p: t > 0 and t >= p,
}
("initial_logical_sizes", lambda t, p: t > 0 and t >= p),
("background_jobs_can_start", lambda t, p: t > 0 and t >= p),
("complete", lambda t, p: t > 0 and t >= p),
]

# Accumulate the runtime of each startup phase
values = {}
metrics = pageserver_http.get_metrics()
prev_value = None
for sample in metrics.query_all("pageserver_startup_duration_seconds"):
labels = dict(sample.labels)
phase = labels["phase"]
phase = sample.labels["phase"]
log.info(f"metric {phase}={sample.value}")
assert phase in expectations, f"Unexpected phase {phase}"
assert expectations[phase](
assert phase in [e[0] for e in expectations], f"Unexpected phase {phase}"
values[phase] = sample

# Apply expectations to the metrics retrieved
for phase, expectation in expectations:
assert phase in values, f"No data for phase {phase}"
sample = values[phase]
assert expectation(
sample.value, prev_value
), f"Unexpected value for {phase}: {sample.value}"
prev_value = sample.value
```