mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
cleanup around attach (#6621)
The smaller changes I found while looking around #6584. - rustfmt was not able to format handle_timeline_create - fix Generation::get_suffix always allocating - Generation was missing a `#[track_caller]` for panicky method - attach has a lot of issues, but even with this PR it cannot be formatted by rustfmt - moved the `preload` span to be on top of `attach` -- it is awaited inline - make disconnected panic! or unreachable! into expect, expect_err
This commit is contained in:
@@ -54,12 +54,10 @@ impl Generation {
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub fn get_suffix(&self) -> String {
|
||||
pub fn get_suffix(&self) -> impl std::fmt::Display {
|
||||
match self {
|
||||
Self::Valid(v) => {
|
||||
format!("-{:08x}", v)
|
||||
}
|
||||
Self::None => "".into(),
|
||||
Self::Valid(v) => GenerationFileSuffix(Some(*v)),
|
||||
Self::None => GenerationFileSuffix(None),
|
||||
Self::Broken => {
|
||||
panic!("Tried to use a broken generation");
|
||||
}
|
||||
@@ -90,6 +88,7 @@ impl Generation {
|
||||
}
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub fn next(&self) -> Generation {
|
||||
match self {
|
||||
Self::Valid(n) => Self::Valid(*n + 1),
|
||||
@@ -107,6 +106,18 @@ impl Generation {
|
||||
}
|
||||
}
|
||||
|
||||
struct GenerationFileSuffix(Option<u32>);
|
||||
|
||||
impl std::fmt::Display for GenerationFileSuffix {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
if let Some(g) = self.0 {
|
||||
write!(f, "-{g:08x}")
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Generation {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
@@ -164,4 +175,24 @@ mod test {
|
||||
assert!(Generation::none() < Generation::new(0));
|
||||
assert!(Generation::none() < Generation::new(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn suffix_is_stable() {
|
||||
use std::fmt::Write as _;
|
||||
|
||||
// the suffix must remain stable through-out the pageserver remote storage evolution and
|
||||
// not be changed accidentially without thinking about migration
|
||||
let examples = [
|
||||
(line!(), Generation::None, ""),
|
||||
(line!(), Generation::Valid(0), "-00000000"),
|
||||
(line!(), Generation::Valid(u32::MAX), "-ffffffff"),
|
||||
];
|
||||
|
||||
let mut s = String::new();
|
||||
for (line, gen, expected) in examples {
|
||||
s.clear();
|
||||
write!(s, "{}", &gen.get_suffix()).expect("string grows");
|
||||
assert_eq!(s, expected, "example on {line}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,7 +488,9 @@ async fn timeline_create_handler(
|
||||
let state = get_state(&request);
|
||||
|
||||
async {
|
||||
let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, false)?;
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id, false)?;
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
@@ -498,48 +500,62 @@ async fn timeline_create_handler(
|
||||
tracing::info!("bootstrapping");
|
||||
}
|
||||
|
||||
match tenant.create_timeline(
|
||||
new_timeline_id,
|
||||
request_data.ancestor_timeline_id.map(TimelineId::from),
|
||||
request_data.ancestor_start_lsn,
|
||||
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
|
||||
request_data.existing_initdb_timeline_id,
|
||||
state.broker_client.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await {
|
||||
match tenant
|
||||
.create_timeline(
|
||||
new_timeline_id,
|
||||
request_data.ancestor_timeline_id,
|
||||
request_data.ancestor_start_lsn,
|
||||
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
|
||||
request_data.existing_initdb_timeline_id,
|
||||
state.broker_client.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(new_timeline) => {
|
||||
// Created. Construct a TimelineInfo for it.
|
||||
let timeline_info = build_timeline_info_common(&new_timeline, &ctx, tenant::timeline::GetLogicalSizePriority::User)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let timeline_info = build_timeline_info_common(
|
||||
&new_timeline,
|
||||
&ctx,
|
||||
tenant::timeline::GetLogicalSizePriority::User,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::CREATED, timeline_info)
|
||||
}
|
||||
Err(_) if tenant.cancel.is_cancelled() => {
|
||||
// In case we get some ugly error type during shutdown, cast it into a clean 503.
|
||||
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg("Tenant shutting down".to_string()))
|
||||
}
|
||||
Err(tenant::CreateTimelineError::Conflict | tenant::CreateTimelineError::AlreadyCreating) => {
|
||||
json_response(StatusCode::CONFLICT, ())
|
||||
}
|
||||
Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
|
||||
json_response(StatusCode::NOT_ACCEPTABLE, HttpErrorBody::from_msg(
|
||||
format!("{err:#}")
|
||||
))
|
||||
}
|
||||
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
|
||||
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
|
||||
}
|
||||
Err(tenant::CreateTimelineError::ShuttingDown) => {
|
||||
json_response(StatusCode::SERVICE_UNAVAILABLE,HttpErrorBody::from_msg("tenant shutting down".to_string()))
|
||||
json_response(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
HttpErrorBody::from_msg("Tenant shutting down".to_string()),
|
||||
)
|
||||
}
|
||||
Err(
|
||||
tenant::CreateTimelineError::Conflict
|
||||
| tenant::CreateTimelineError::AlreadyCreating,
|
||||
) => json_response(StatusCode::CONFLICT, ()),
|
||||
Err(tenant::CreateTimelineError::AncestorLsn(err)) => json_response(
|
||||
StatusCode::NOT_ACCEPTABLE,
|
||||
HttpErrorBody::from_msg(format!("{err:#}")),
|
||||
),
|
||||
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => json_response(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
HttpErrorBody::from_msg(e.to_string()),
|
||||
),
|
||||
Err(tenant::CreateTimelineError::ShuttingDown) => json_response(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
HttpErrorBody::from_msg("tenant shutting down".to_string()),
|
||||
),
|
||||
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("timeline_create",
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
|
||||
timeline_id = %new_timeline_id,
|
||||
lsn=?request_data.ancestor_start_lsn,
|
||||
pg_version=?request_data.pg_version
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -644,10 +644,10 @@ impl Tenant {
|
||||
|
||||
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
|
||||
// we shut down while attaching.
|
||||
let Ok(attach_gate_guard) = tenant.gate.enter() else {
|
||||
// We just created the Tenant: nothing else can have shut it down yet
|
||||
unreachable!();
|
||||
};
|
||||
let attach_gate_guard = tenant
|
||||
.gate
|
||||
.enter()
|
||||
.expect("We just created the Tenant: nothing else can have shut it down yet");
|
||||
|
||||
// Do all the hard work in the background
|
||||
let tenant_clone = Arc::clone(&tenant);
|
||||
@@ -755,36 +755,27 @@ impl Tenant {
|
||||
AttachType::Normal
|
||||
};
|
||||
|
||||
let preload_timer = TENANT.preload.start_timer();
|
||||
let preload = match mode {
|
||||
SpawnMode::Create => {
|
||||
// Don't count the skipped preload into the histogram of preload durations
|
||||
preload_timer.stop_and_discard();
|
||||
let preload = match (&mode, &remote_storage) {
|
||||
(SpawnMode::Create, _) => {
|
||||
None
|
||||
},
|
||||
SpawnMode::Normal => {
|
||||
match &remote_storage {
|
||||
Some(remote_storage) => Some(
|
||||
match tenant_clone
|
||||
.preload(remote_storage, task_mgr::shutdown_token())
|
||||
.instrument(
|
||||
tracing::info_span!(parent: None, "attach_preload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()),
|
||||
)
|
||||
.await {
|
||||
Ok(p) => {
|
||||
preload_timer.observe_duration();
|
||||
p
|
||||
}
|
||||
,
|
||||
Err(e) => {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(e));
|
||||
return Ok(());
|
||||
}
|
||||
},
|
||||
),
|
||||
None => None,
|
||||
(SpawnMode::Normal, Some(remote_storage)) => {
|
||||
let _preload_timer = TENANT.preload.start_timer();
|
||||
let res = tenant_clone
|
||||
.preload(remote_storage, task_mgr::shutdown_token())
|
||||
.await;
|
||||
match res {
|
||||
Ok(p) => Some(p),
|
||||
Err(e) => {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(e));
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
(SpawnMode::Normal, None) => {
|
||||
let _preload_timer = TENANT.preload.start_timer();
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// Remote preload is complete.
|
||||
@@ -820,36 +811,37 @@ impl Tenant {
|
||||
info!("ready for backgound jobs barrier");
|
||||
}
|
||||
|
||||
match DeleteTenantFlow::resume_from_attach(
|
||||
let deleted = DeleteTenantFlow::resume_from_attach(
|
||||
deletion,
|
||||
&tenant_clone,
|
||||
preload,
|
||||
tenants,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(err) => {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(err));
|
||||
return Ok(());
|
||||
}
|
||||
Ok(()) => return Ok(()),
|
||||
.await;
|
||||
|
||||
if let Err(e) = deleted {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(e));
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// We will time the duration of the attach phase unless this is a creation (attach will do no work)
|
||||
let attach_timer = match mode {
|
||||
SpawnMode::Create => None,
|
||||
SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
|
||||
let attached = {
|
||||
let _attach_timer = match mode {
|
||||
SpawnMode::Create => None,
|
||||
SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
|
||||
};
|
||||
tenant_clone.attach(preload, mode, &ctx).await
|
||||
};
|
||||
match tenant_clone.attach(preload, mode, &ctx).await {
|
||||
|
||||
match attached {
|
||||
Ok(()) => {
|
||||
info!("attach finished, activating");
|
||||
if let Some(t)= attach_timer {t.observe_duration();}
|
||||
tenant_clone.activate(broker_client, None, &ctx);
|
||||
}
|
||||
Err(e) => {
|
||||
if let Some(t)= attach_timer {t.observe_duration();}
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(e));
|
||||
}
|
||||
}
|
||||
@@ -862,34 +854,26 @@ impl Tenant {
|
||||
// logical size calculations: if logical size calculation semaphore is saturated,
|
||||
// then warmup will wait for that before proceeding to the next tenant.
|
||||
if let AttachType::Warmup(_permit) = attach_type {
|
||||
let mut futs = FuturesUnordered::new();
|
||||
let timelines: Vec<_> = tenant_clone.timelines.lock().unwrap().values().cloned().collect();
|
||||
for t in timelines {
|
||||
futs.push(t.await_initial_logical_size())
|
||||
}
|
||||
let mut futs: FuturesUnordered<_> = tenant_clone.timelines.lock().unwrap().values().cloned().map(|t| t.await_initial_logical_size()).collect();
|
||||
tracing::info!("Waiting for initial logical sizes while warming up...");
|
||||
while futs.next().await.is_some() {
|
||||
|
||||
}
|
||||
while futs.next().await.is_some() {}
|
||||
tracing::info!("Warm-up complete");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation);
|
||||
span.follows_from(Span::current());
|
||||
span
|
||||
}),
|
||||
.instrument(tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation)),
|
||||
);
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn preload(
|
||||
self: &Arc<Tenant>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<TenantPreload> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
// Get list of remote timelines
|
||||
// download index files for every tenant timeline
|
||||
info!("listing remote timelines");
|
||||
@@ -3982,6 +3966,8 @@ pub(crate) mod harness {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[derive(Debug)]
|
||||
enum LoadMode {
|
||||
Local,
|
||||
Remote,
|
||||
@@ -4064,7 +4050,7 @@ pub(crate) mod harness {
|
||||
info_span!("TenantHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
|
||||
}
|
||||
|
||||
pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
|
||||
pub(crate) async fn load(&self) -> (Arc<Tenant>, RequestContext) {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
(
|
||||
self.try_load(&ctx)
|
||||
@@ -4074,31 +4060,31 @@ pub(crate) mod harness {
|
||||
)
|
||||
}
|
||||
|
||||
fn remote_empty(&self) -> bool {
|
||||
let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
|
||||
let remote_tenant_dir = self
|
||||
.remote_fs_dir
|
||||
.join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
|
||||
if std::fs::metadata(&remote_tenant_dir).is_err() {
|
||||
return true;
|
||||
}
|
||||
|
||||
match std::fs::read_dir(remote_tenant_dir)
|
||||
.unwrap()
|
||||
.flatten()
|
||||
.next()
|
||||
{
|
||||
Some(entry) => {
|
||||
tracing::debug!(
|
||||
"remote_empty: not empty, found file {}",
|
||||
entry.file_name().to_string_lossy(),
|
||||
);
|
||||
false
|
||||
}
|
||||
None => true,
|
||||
}
|
||||
/// For tests that specifically want to exercise the local load path, which does
|
||||
/// not use remote storage.
|
||||
pub(crate) async fn try_load_local(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
self.do_try_load(ctx, LoadMode::Local).await
|
||||
}
|
||||
|
||||
/// The 'load' in this function is either a local load or a normal attachment,
|
||||
pub(crate) async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
|
||||
// If we have nothing in remote storage, must use load_local instead of attach: attach
|
||||
// will error out if there are no timelines.
|
||||
//
|
||||
// See https://github.com/neondatabase/neon/issues/5456 for how we will eliminate
|
||||
// this weird state of a Tenant which exists but doesn't have any timelines.
|
||||
let mode = match self.remote_empty() {
|
||||
true => LoadMode::Local,
|
||||
false => LoadMode::Remote,
|
||||
};
|
||||
|
||||
self.do_try_load(ctx, mode).await
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), ?mode))]
|
||||
async fn do_try_load(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
@@ -4125,20 +4111,13 @@ pub(crate) mod harness {
|
||||
|
||||
match mode {
|
||||
LoadMode::Local => {
|
||||
tenant
|
||||
.load_local(ctx)
|
||||
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
|
||||
.await?;
|
||||
tenant.load_local(ctx).await?;
|
||||
}
|
||||
LoadMode::Remote => {
|
||||
let preload = tenant
|
||||
.preload(&self.remote_storage, CancellationToken::new())
|
||||
.instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
|
||||
.await?;
|
||||
tenant
|
||||
.attach(Some(preload), SpawnMode::Normal, ctx)
|
||||
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
|
||||
.await?;
|
||||
tenant.attach(Some(preload), SpawnMode::Normal, ctx).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4149,25 +4128,29 @@ pub(crate) mod harness {
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
/// For tests that specifically want to exercise the local load path, which does
|
||||
/// not use remote storage.
|
||||
pub async fn try_load_local(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
|
||||
self.do_try_load(ctx, LoadMode::Local).await
|
||||
}
|
||||
fn remote_empty(&self) -> bool {
|
||||
let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
|
||||
let remote_tenant_dir = self
|
||||
.remote_fs_dir
|
||||
.join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
|
||||
if std::fs::metadata(&remote_tenant_dir).is_err() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/// The 'load' in this function is either a local load or a normal attachment,
|
||||
pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
|
||||
// If we have nothing in remote storage, must use load_local instead of attach: attach
|
||||
// will error out if there are no timelines.
|
||||
//
|
||||
// See https://github.com/neondatabase/neon/issues/5456 for how we will eliminate
|
||||
// this weird state of a Tenant which exists but doesn't have any timelines.
|
||||
let mode = match self.remote_empty() {
|
||||
true => LoadMode::Local,
|
||||
false => LoadMode::Remote,
|
||||
};
|
||||
|
||||
self.do_try_load(ctx, mode).await
|
||||
match std::fs::read_dir(remote_tenant_dir)
|
||||
.unwrap()
|
||||
.flatten()
|
||||
.next()
|
||||
{
|
||||
Some(entry) => {
|
||||
tracing::debug!(
|
||||
"remote_empty: not empty, found file {}",
|
||||
entry.file_name().to_string_lossy(),
|
||||
);
|
||||
false
|
||||
}
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
|
||||
|
||||
@@ -6,7 +6,7 @@ use pageserver_api::{models::TenantState, shard::TenantShardId};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, instrument, Instrument, Span};
|
||||
use tracing::{error, instrument, Instrument};
|
||||
|
||||
use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId};
|
||||
|
||||
@@ -496,11 +496,7 @@ impl DeleteTenantFlow {
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "delete_tenant", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
|
||||
span.follows_from(Span::current());
|
||||
span
|
||||
}),
|
||||
.instrument(tracing::info_span!(parent: None, "delete_tenant", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::{
|
||||
use anyhow::Context;
|
||||
use pageserver_api::{models::TimelineState, shard::TenantShardId};
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument, Span};
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument};
|
||||
use utils::{crashsafe, fs_ext, id::TimelineId};
|
||||
|
||||
use crate::{
|
||||
@@ -541,12 +541,7 @@ impl DeleteTimelineFlow {
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
let span =
|
||||
tracing::info_span!(parent: None, "delete_timeline", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),timeline_id=%timeline_id);
|
||||
span.follows_from(Span::current());
|
||||
span
|
||||
}),
|
||||
.instrument(tracing::info_span!(parent: None, "delete_timeline", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),timeline_id=%timeline_id)),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user