Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-05 04:22:56 +00:00
timeline_detach_ancestor: adjust error handling (#8528)
With the additional phases from #8430, `detach_ancestor::Error` became untenable. Split it up into per-phase variants, and introduce laundering for the remaining `anyhow::Error`s so that they are most often propagated as `Error::ShuttingDown`. Additionally, complete a number of FIXMEs.

Cc: #6994
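In outline, the change introduces per-phase error variants plus a laundering helper. A minimal standalone sketch follows (it assumes the `anyhow` crate; the cancellation check is a stand-in for the concrete downcasts the real helper performs):

```rust
// Sketch only: the per-phase error split plus the laundering helper; the real
// definitions live in pageserver/src/tenant/timeline/detach_ancestor.rs and
// appear further down in this diff.
#[derive(Debug)]
enum Error {
    ShuttingDown,
    Prepare(anyhow::Error),
    DetachReparent(anyhow::Error),
    Complete(anyhow::Error),
}

impl Error {
    // If the anyhow::Error is really a cancellation/shutdown, report ShuttingDown;
    // otherwise attribute it to the phase chosen by the caller. The real check
    // downcasts to NotInitialized, TimeoutOrCancel, a cancelled DownloadError or
    // WaitCompletionError; the string match here is only a stand-in.
    fn launder(e: anyhow::Error, or_else: impl Fn(anyhow::Error) -> Error) -> Error {
        let looks_cancelled = e.chain().any(|cause| cause.to_string().contains("cancelled"));
        if looks_cancelled {
            Error::ShuttingDown
        } else {
            or_else(e)
        }
    }
}
```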
@@ -42,6 +42,10 @@ impl DownloadError {
Timeout | Other(_) => false,
}
}

pub fn is_cancelled(&self) -> bool {
matches!(self, DownloadError::Cancelled)
}
}

impl From<std::io::Error> for DownloadError {

@@ -1900,8 +1900,7 @@ async fn timeline_detach_ancestor_handler(
attempt,
ctx,
)
.await
.map_err(ApiError::InternalServerError)?;
.await?;

AncestorDetached {
reparented_timelines,
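For readers skimming the handler hunk above: dropping `.map_err(ApiError::InternalServerError)` is possible because the callee now returns `detach_ancestor::Error`, which gains a `From<Error> for ApiError` impl later in this diff, so `?` performs the conversion. A tiny stand-in illustration (these types are placeholders, not the pageserver ones):

```rust
#[derive(Debug)]
struct ApiError(String);

#[derive(Debug)]
struct DetachError(String);

impl From<DetachError> for ApiError {
    fn from(e: DetachError) -> Self {
        ApiError(format!("detach failed: {}", e.0))
    }
}

fn complete_detach() -> Result<u32, DetachError> {
    Ok(3)
}

fn handler() -> Result<u32, ApiError> {
    // `?` applies the From impl, so no explicit map_err is needed
    let reparented_timelines = complete_detach()?;
    Ok(reparented_timelines)
}

fn main() {
    println!("{:?}", handler());
}
```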
@@ -1929,61 +1929,51 @@ impl TenantManager {
prepared: PreparedTimelineDetach,
mut attempt: detach_ancestor::Attempt,
ctx: &RequestContext,
) -> Result<HashSet<TimelineId>, anyhow::Error> {
use crate::tenant::timeline::detach_ancestor::Error;
// FIXME: this is unnecessary, slotguard already has these semantics
struct RevertOnDropSlot(Option<SlotGuard>);
) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
use detach_ancestor::Error;

impl Drop for RevertOnDropSlot {
fn drop(&mut self) {
if let Some(taken) = self.0.take() {
taken.revert();
}
}
}
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist).map_err(
|e| {
use TenantSlotError::*;

impl RevertOnDropSlot {
fn into_inner(mut self) -> SlotGuard {
self.0.take().unwrap()
}
}

impl std::ops::Deref for RevertOnDropSlot {
type Target = SlotGuard;

fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap()
}
}

let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let slot_guard = RevertOnDropSlot(Some(slot_guard));
match e {
MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
}
},
)?;

let tenant = {
let Some(old_slot) = slot_guard.get_old_value() else {
anyhow::bail!(
"Tenant not found when trying to complete detaching timeline ancestor"
);
};
let old_slot = slot_guard
.get_old_value()
.as_ref()
.expect("requested MustExist");

let Some(tenant) = old_slot.get_attached() else {
anyhow::bail!("Tenant is not in attached state");
return Err(Error::DetachReparent(anyhow::anyhow!(
"Tenant is not in attached state"
)));
};

if !tenant.is_active() {
anyhow::bail!("Tenant is not active");
return Err(Error::DetachReparent(anyhow::anyhow!(
"Tenant is not active"
)));
}

tenant.clone()
};

let timeline = tenant.get_timeline(timeline_id, true)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(Error::NotFound)?;

let resp = timeline
.detach_from_ancestor_and_reparent(&tenant, prepared, ctx)
.await?;

let mut slot_guard = slot_guard.into_inner();
let mut slot_guard = slot_guard;

let tenant = if resp.reset_tenant_required() {
attempt.before_reset_tenant();

@@ -1991,17 +1981,20 @@ impl TenantManager {
let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
slot_guard.drop_old_value()?;
slot_guard.drop_old_value().expect("it was just shutdown");
}
Err(_barrier) => {
slot_guard.revert();
// this really should not happen, at all, unless shutdown was already going?
anyhow::bail!("Cannot restart Tenant, already shutting down");
// this really should not happen, at all, unless a shutdown without acquiring
// tenant slot was already going? regardless, on restart the attempt tracking
// will reset to retryable.
return Err(Error::ShuttingDown);
}
}

let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)
.map_err(|e| Error::DetachReparent(e.into()))?;

let shard_identity = config.shard;
let tenant = tenant_spawn(

@@ -2009,12 +2002,13 @@ impl TenantManager {
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(config)?,
AttachedTenantConf::try_from(config).map_err(Error::DetachReparent)?,
shard_identity,
None,
SpawnMode::Eager,
ctx,
)?;
)
.map_err(|_| Error::ShuttingDown)?;

{
let mut g = tenant.ongoing_timeline_detach.lock().unwrap();

@@ -2025,7 +2019,15 @@ impl TenantManager {
*g = Some((attempt.timeline_id, attempt.new_barrier()));
}

slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?;
// if we bail out here, we will not allow a new attempt, which should be fine.
// pageserver should be shutting down regardless? tenant_reset would help, unless it
// runs into the same problem.
slot_guard
.upsert(TenantSlot::Attached(tenant.clone()))
.map_err(|e| match e {
TenantSlotUpsertError::ShuttingDown(_) => Error::ShuttingDown,
other => Error::DetachReparent(other.into()),
})?;
tenant
} else {
tracing::info!("skipping tenant_reset as no changes made required it");

@@ -2047,7 +2049,7 @@ impl TenantManager {
Cancelled | WillNotBecomeActive(TenantState::Stopping { .. }) => {
Error::ShuttingDown
}
other => Error::Unexpected(other.into()),
other => Error::Complete(other.into()),
}
})?;

@@ -2057,19 +2059,16 @@ impl TenantManager {

let timeline = tenant
.get_timeline(attempt.timeline_id, true)
.map_err(|_| Error::DetachedNotFoundAfterRestart)?;
.map_err(Error::NotFound)?;

timeline
.complete_detaching_timeline_ancestor(&tenant, attempt, ctx)
.await
.map(|()| reparented)
.map_err(|e| e.into())
} else {
// at least the latest versions have now been downloaded and refreshed; be ready to
// retry another time.
Err(anyhow::anyhow!(
"failed to reparent all candidate timelines, please retry"
))
Err(Error::FailedToReparentAll)
}
}

@@ -2392,6 +2391,9 @@ impl SlotGuard {

/// Get any value that was present in the slot before we acquired ownership
/// of it: in state transitions, this will be the old state.
///
// FIXME: get_ prefix
// FIXME: this should be .as_ref() -- unsure why no clippy
fn get_old_value(&self) -> &Option<TenantSlot> {
&self.old_value
}
@@ -1612,6 +1612,12 @@ pub(crate) enum DownloadError {
Failpoint(failpoints::FailpointKind),
}

impl DownloadError {
pub(crate) fn is_cancelled(&self) -> bool {
matches!(self, DownloadError::DownloadCancelled)
}
}

#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
NotFound,

@@ -4342,7 +4342,7 @@ impl Timeline {
tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<detach_ancestor::DetachingAndReparenting, anyhow::Error> {
) -> Result<detach_ancestor::DetachingAndReparenting, detach_ancestor::Error> {
detach_ancestor::detach_and_reparent(self, tenant, prepared, ctx).await
}
@@ -5,7 +5,6 @@ use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
tenant::{
mgr::GetActiveTenantError,
remote_timeline_client::index::GcBlockingReason::DetachAncestor,
storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
Tenant,

@@ -23,61 +22,74 @@ use utils::{completion, generation::Generation, http::error::ApiError, id::Timel
pub(crate) enum Error {
#[error("no ancestors")]
NoAncestor,

#[error("too many ancestors")]
TooManyAncestors,

#[error("shutting down, please retry later")]
ShuttingDown,
#[error("flushing failed")]
FlushAncestor(#[source] FlushLayerError),
#[error("layer download failed")]
RewrittenDeltaDownloadFailed(#[source] crate::tenant::storage_layer::layer::DownloadError),
#[error("copying LSN prefix locally failed")]
CopyDeltaPrefix(#[source] anyhow::Error),
#[error("upload rewritten layer")]
UploadRewritten(#[source] anyhow::Error),

#[error(transparent)]
NotFound(crate::tenant::GetTimelineError),

#[error("failed to reparent all candidate timelines, please retry")]
FailedToReparentAll,

#[error("ancestor is already being detached by: {}", .0)]
OtherTimelineDetachOngoing(TimelineId),

#[error("remote copying layer failed")]
CopyFailed(#[source] anyhow::Error),
#[error("preparing to timeline ancestor detach failed")]
Prepare(#[source] anyhow::Error),

#[error("wait for tenant to activate after restarting")]
WaitToActivate(#[source] GetActiveTenantError),
#[error("detaching and reparenting failed")]
DetachReparent(#[source] anyhow::Error),

#[error("detached timeline was not found after restart")]
DetachedNotFoundAfterRestart,

#[error("unexpected error")]
Unexpected(#[source] anyhow::Error),
#[error("completing ancestor detach failed")]
Complete(#[source] anyhow::Error),

#[error("failpoint: {}", .0)]
Failpoint(&'static str),
}

impl Error {
/// Try to catch cancellation from within the `anyhow::Error`, or wrap the anyhow as the given
/// variant or fancier `or_else`.
fn launder<F>(e: anyhow::Error, or_else: F) -> Error
where
F: Fn(anyhow::Error) -> Error,
{
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::upload_queue::NotInitialized;
use remote_storage::TimeoutOrCancel;

if e.is::<NotInitialized>()
|| TimeoutOrCancel::caused_by_cancel(&e)
|| e.downcast_ref::<remote_storage::DownloadError>()
.is_some_and(|e| e.is_cancelled())
|| e.is::<WaitCompletionError>()
{
Error::ShuttingDown
} else {
or_else(e)
}
}
}

impl From<Error> for ApiError {
fn from(value: Error) -> Self {
match value {
e @ Error::NoAncestor => ApiError::Conflict(e.to_string()),
// TODO: ApiError converts the anyhow using debug formatting ... just stop using ApiError?
e @ Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{}", e)),
Error::NoAncestor => ApiError::Conflict(value.to_string()),
Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{}", value)),
Error::ShuttingDown => ApiError::ShuttingDown,
Error::OtherTimelineDetachOngoing(_) => {
ApiError::ResourceUnavailable("other timeline detach is already ongoing".into())
Error::OtherTimelineDetachOngoing(_) | Error::FailedToReparentAll => {
ApiError::ResourceUnavailable(value.to_string().into())
}
e @ Error::WaitToActivate(_) => {
let s = utils::error::report_compact_sources(&e).to_string();
ApiError::ResourceUnavailable(s.into())
}
// All of these contain shutdown errors, in fact, it's the most common
e @ Error::FlushAncestor(_)
| e @ Error::RewrittenDeltaDownloadFailed(_)
| e @ Error::CopyDeltaPrefix(_)
| e @ Error::UploadRewritten(_)
| e @ Error::CopyFailed(_)
| e @ Error::Unexpected(_)
| e @ Error::Failpoint(_) => ApiError::InternalServerError(e.into()),
Error::DetachedNotFoundAfterRestart => ApiError::NotFound(value.into()),
Error::NotFound(e) => ApiError::from(e),
// these variants should have no cancellation errors because of Error::launder
Error::Prepare(_)
| Error::DetachReparent(_)
| Error::Complete(_)
| Error::Failpoint(_) => ApiError::InternalServerError(value.into()),
}
}
}
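One practical consequence of the mapping above, exercised by the test changes at the bottom of this diff: retryable conditions (`ShuttingDown`, `FailedToReparentAll`, `OtherTimelineDetachOngoing`) now surface as 503 rather than 500. A stand-in sketch of that status-code split (the enum and helper here are placeholders, not the real types):

```rust
#[derive(Debug)]
enum Error {
    ShuttingDown,
    FailedToReparentAll,
    Prepare(String),
}

fn status_code(e: &Error) -> u16 {
    match e {
        // retryable: surfaced as 503 via ApiError::ShuttingDown / ResourceUnavailable
        Error::ShuttingDown | Error::FailedToReparentAll => 503,
        // genuine phase failure: still a 500
        Error::Prepare(_) => 500,
    }
}

fn main() {
    assert_eq!(status_code(&Error::FailedToReparentAll), 503);
    assert_eq!(status_code(&Error::Prepare("boom".into())), 500);
    println!("ok");
}
```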
@@ -95,39 +107,6 @@ impl From<super::layer_manager::Shutdown> for Error {
}
}

impl From<FlushLayerError> for Error {
fn from(value: FlushLayerError) -> Self {
match value {
FlushLayerError::Cancelled => Error::ShuttingDown,
FlushLayerError::NotRunning(_) => {
// FIXME(#6424): technically statically unreachable right now, given how we never
// drop the sender
Error::ShuttingDown
}
FlushLayerError::CreateImageLayersError(_) | FlushLayerError::Other(_) => {
Error::FlushAncestor(value)
}
}
}
}

impl From<GetActiveTenantError> for Error {
fn from(value: GetActiveTenantError) -> Self {
use pageserver_api::models::TenantState;
use GetActiveTenantError::*;

match value {
Cancelled | WillNotBecomeActive(TenantState::Stopping { .. }) | SwitchedTenant => {
Error::ShuttingDown
}
WaitForActiveTimeout { .. } | NotFound(_) | Broken(_) | WillNotBecomeActive(_) => {
// NotFound seems out-of-place
Error::WaitToActivate(value)
}
}
}
}

pub(crate) enum Progress {
Prepared(Attempt, PreparedTimelineDetach),
Done(AncestorDetached),
@@ -236,7 +215,7 @@ pub(super) async fn prepare(

let attempt = start_new_attempt(detached, tenant).await?;

utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable");
utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking-pausable");

fail::fail_point!(
"timeline-detach-ancestor::before_starting_after_locking",

@@ -265,7 +244,17 @@ pub(super) async fn prepare(
}
};

res?;
res.map_err(|e| {
use FlushLayerError::*;
match e {
Cancelled | NotRunning(_) => {
// FIXME(#6424): technically statically unreachable right now, given how we never
// drop the sender
Error::ShuttingDown
}
CreateImageLayersError(_) | Other(_) => Error::Prepare(e.into()),
}
})?;

// we do not need to wait for uploads to complete but we do need `struct Layer`,
// copying delta prefix is unsupported currently for `InMemoryLayer`.

@@ -346,7 +335,7 @@ pub(super) async fn prepare(
}
Ok(Ok(None)) => {}
Ok(Err(e)) => return Err(e),
Err(je) => return Err(Unexpected(je.into())),
Err(je) => return Err(Error::Prepare(je.into())),
}
}

@@ -394,7 +383,7 @@ pub(super) async fn prepare(
Ok(Err(failed)) => {
return Err(failed);
}
Err(je) => return Err(Unexpected(je.into())),
Err(je) => return Err(Error::Prepare(je.into())),
}
}

@@ -416,8 +405,7 @@ async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attem
crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
)
.await
// FIXME: better error
.map_err(Error::Unexpected)?;
.map_err(|e| Error::launder(e, Error::Prepare))?;

Ok(attempt)
}
@@ -546,19 +534,17 @@ async fn upload_rewritten_layer(
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<Option<Layer>, Error> {
use Error::UploadRewritten;
let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;

let Some(copied) = copied else {
return Ok(None);
};

// FIXME: better shuttingdown error
target
.remote_client
.upload_layer_file(&copied, cancel)
.await
.map_err(UploadRewritten)?;
.map_err(|e| Error::launder(e, Error::Prepare))?;

Ok(Some(copied.into()))
}

@@ -569,10 +555,8 @@ async fn copy_lsn_prefix(
target_timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed, ShuttingDown};

if target_timeline.cancel.is_cancelled() {
return Err(ShuttingDown);
return Err(Error::ShuttingDown);
}

tracing::debug!(%layer, %end_lsn, "copying lsn prefix");

@@ -586,18 +570,22 @@ async fn copy_lsn_prefix(
ctx,
)
.await
.map_err(CopyDeltaPrefix)?;
.with_context(|| format!("prepare to copy lsn prefix of ancestors {layer}"))
.map_err(Error::Prepare)?;

let resident = layer
.download_and_keep_resident()
.await
// likely shutdown
.map_err(RewrittenDeltaDownloadFailed)?;
let resident = layer.download_and_keep_resident().await.map_err(|e| {
if e.is_cancelled() {
Error::ShuttingDown
} else {
Error::Prepare(e.into())
}
})?;

let records = resident
.copy_delta_prefix(&mut writer, end_lsn, ctx)
.await
.map_err(CopyDeltaPrefix)?;
.with_context(|| format!("copy lsn prefix of ancestors {layer}"))
.map_err(Error::Prepare)?;

drop(resident);

@@ -615,9 +603,9 @@ async fn copy_lsn_prefix(
let (desc, path) = writer
.finish(reused_highest_key, ctx)
.await
.map_err(CopyDeltaPrefix)?;
.map_err(Error::Prepare)?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(CopyDeltaPrefix)?;
.map_err(Error::Prepare)?;

tracing::debug!(%layer, %copied, "new layer produced");

@@ -633,8 +621,6 @@ async fn remote_copy(
generation: Generation,
cancel: &CancellationToken,
) -> Result<Layer, Error> {
use Error::CopyFailed;

// depending if Layer::keep_resident we could hardlink

let mut metadata = adopted.metadata();

@@ -648,13 +634,12 @@ async fn remote_copy(
metadata,
);

// FIXME: better shuttingdown error
adoptee
.remote_client
.copy_timeline_layer(adopted, &owned, cancel)
.await
.map(move |()| owned)
.map_err(CopyFailed)
.map_err(|e| Error::launder(e, Error::Prepare))
}

pub(crate) enum DetachingAndReparenting {

@@ -698,7 +683,7 @@ pub(super) async fn detach_and_reparent(
tenant: &Tenant,
prepared: PreparedTimelineDetach,
_ctx: &RequestContext,
) -> Result<DetachingAndReparenting, anyhow::Error> {
) -> Result<DetachingAndReparenting, Error> {
let PreparedTimelineDetach { layers } = prepared;

#[derive(Debug)]

@@ -783,7 +768,8 @@ pub(super) async fn detach_and_reparent(
(ancestor.timeline_id, ancestor_lsn),
)
.await
.context("publish layers and detach ancestor")?;
.context("publish layers and detach ancestor")
.map_err(|e| Error::launder(e, Error::DetachReparent))?;

tracing::info!(
ancestor=%ancestor.timeline_id,

@@ -927,8 +913,7 @@ pub(super) async fn complete(
crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
)
.await
// FIXME: better error
.map_err(Error::Unexpected)?;
.map_err(|e| Error::launder(e, Error::Complete))?;

Ok(())
}
@@ -3006,8 +3006,13 @@ impl Service {
Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
}
Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, msg) => {
// avoid turning these into conflicts to remain compatible with
// pageservers, 500 errors are sadly retryable with timeline ancestor
// detach
ApiError::InternalServerError(anyhow::anyhow!("{node}: {msg}"))
}
// rest can be mapped as usual
// FIXME: this converts some 500 to 409 which is not per openapi
other => passthrough_api_error(&node, other),
}
})

@@ -3041,6 +3046,8 @@ impl Service {
?mismatching,
"shards returned different results"
);

return Err(ApiError::InternalServerError(anyhow::anyhow!("pageservers returned mixed results for ancestor detach; manual intervention is required.")));
}

Ok(any.1)
@@ -97,7 +97,7 @@ def test_ancestor_detach_branched_from(
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)

ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
flush_ep_to_pageserver(env, ep, env.initial_tenant, env.initial_timeline)

deltas = client.layer_map_info(env.initial_tenant, env.initial_timeline).delta_layers()
# there is also the in-mem layer, but ignore it for now

@@ -452,6 +452,9 @@ def test_compaction_induced_by_detaches_in_history(
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
env.pageserver.allowed_errors.append(
".*await_initial_logical_size: can't get semaphore cancel token, skipping"
)
client = env.pageserver.http_client()

def delta_layers(timeline_id: TimelineId):

@@ -524,6 +527,7 @@ def test_compaction_induced_by_detaches_in_history(
assert len([filter(lambda x: x.l0, delta_layers(branch_timeline_id))]) == 1

skip_main = branches[1:]

branch_lsn = client.timeline_detail(env.initial_tenant, branch_timeline_id)["ancestor_lsn"]

# take the fullbackup before and after inheriting the new L0s

@@ -532,6 +536,13 @@ def test_compaction_induced_by_detaches_in_history(
env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_before
)

# force initial logical sizes, so we can evict all layers from all
# timelines and exercise on-demand download for copy lsn prefix
client.timeline_detail(
env.initial_tenant, env.initial_timeline, force_await_initial_logical_size=True
)
client.evict_all_layers(env.initial_tenant, env.initial_timeline)

for _, timeline_id in skip_main:
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert reparented == set(), "we have no earlier branches at any level"

@@ -705,7 +716,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
log.info(f"stuck pageserver is id={stuck.id}")
stuck_http = stuck.http_client()
stuck_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking_pausable", "pause")
("timeline-detach-ancestor::before_starting_after_locking-pausable", "pause")
)

restarted = pageservers[int(shards[1]["node_id"])]

@@ -716,7 +727,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
restarted_http = restarted.http_client()
restarted_http.configure_failpoints(
[
("timeline-detach-ancestor::before_starting_after_locking_pausable", "pause"),
("timeline-detach-ancestor::before_starting_after_locking-pausable", "pause"),
]
)

@@ -734,7 +745,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
target.detach_ancestor(env.initial_tenant, branch_timeline_id, timeout=1)

stuck_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking_pausable", "off")
("timeline-detach-ancestor::before_starting_after_locking-pausable", "off")
)

barrier = threading.Barrier(2)

@@ -753,7 +764,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
# we have 10s, lets use 1/2 of that to help the shutdown start
time.sleep(5)
restarted_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking_pausable", "off")
("timeline-detach-ancestor::before_starting_after_locking-pausable", "off")
)
fut.result()
@@ -806,7 +817,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
after starting the detach.

What remains not tested by this:
- shutdown winning over complete
- shutdown winning over complete, see test_timeline_is_deleted_before_timeline_detach_ancestor_completes
"""

if sharded and mode == "delete_tenant":

@@ -833,7 +844,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(

detached_timeline = env.neon_cli.create_branch("detached soon", "main")

pausepoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
pausepoint = "timeline-detach-ancestor::before_starting_after_locking-pausable"

env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)

@@ -931,7 +942,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
_, offset = other.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
@pytest.mark.parametrize("mode", ["delete_reparentable_timeline"])
@pytest.mark.parametrize("mode", ["delete_reparentable_timeline", "create_reparentable_timeline"])
def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnvBuilder, mode: str):
"""
Technically possible storage controller concurrent interleaving timeline

@@ -943,12 +954,6 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
must be detached.
"""

assert (
mode == "delete_reparentable_timeline"
), "only one now, but creating reparentable timelines cannot be supported even with gc blocking"
# perhaps it could be supported by always doing this for the shard0 first, and after that for others.
# when we run shard0 to completion, we can use it's timelines to restrict which can be reparented.

shard_count = 2
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)

@@ -980,14 +985,21 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
for ps, shard_id in [(pageservers[int(x["node_id"])], x["shard_id"]) for x in shards]:
ps.http_client().timeline_checkpoint(shard_id, env.initial_timeline)

first_branch = env.neon_cli.create_branch(
"first_branch", ancestor_branch_name="main", ancestor_start_lsn=first_branch_lsn
)
def create_reparentable_timeline() -> TimelineId:
return env.neon_cli.create_branch(
"first_branch", ancestor_branch_name="main", ancestor_start_lsn=first_branch_lsn
)

if mode == "delete_reparentable_timeline":
first_branch = create_reparentable_timeline()
else:
first_branch = None

detached_branch = env.neon_cli.create_branch(
"detached_branch", ancestor_branch_name="main", ancestor_start_lsn=detached_branch_lsn
)

pausepoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
pausepoint = "timeline-detach-ancestor::before_starting_after_locking-pausable"

stuck = pageservers[int(shards[0]["node_id"])]
stuck_http = stuck.http_client().without_status_retrying()

@@ -999,12 +1011,6 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
(pausepoint, "pause"),
)

# noticed a surprising 409 if the other one would fail instead
# victim_http.configure_failpoints([
# (pausepoint, "pause"),
# ("timeline-detach-ancestor::before_starting_after_locking", "return"),
# ])

# interleaving a create_timeline which could be reparented will produce two
# permanently different reparentings: one node has reparented, other has
# not

@@ -1023,6 +1029,7 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
assert detail.get("ancestor_lsn") is None

def first_branch_gone():
assert first_branch is not None
try:
env.storage_controller.pageserver_api().timeline_detail(
env.initial_tenant, first_branch

@@ -1043,42 +1050,178 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
stuck_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_completed)

# if we would let victim fail, for some reason there'd be a 409 response instead of 500
# victim_http.configure_failpoints((pausepoint, "off"))
# with pytest.raises(PageserverApiException, match=".* 500 Internal Server Error failpoint: timeline-detach-ancestor::before_starting_after_locking") as exc:
# fut.result()
# assert exc.value.status_code == 409

env.storage_controller.pageserver_api().timeline_delete(
env.initial_tenant, first_branch
)
victim_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_branch_gone)
if mode == "delete_reparentable_timeline":
assert first_branch is not None
env.storage_controller.pageserver_api().timeline_delete(
env.initial_tenant, first_branch
)
victim_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_branch_gone)
elif mode == "create_reparentable_timeline":
first_branch = create_reparentable_timeline()
victim_http.configure_failpoints((pausepoint, "off"))
else:
raise RuntimeError("{mode}")

# it now passes, and we should get an error messages about mixed reparenting as the stuck still had something to reparent
fut.result()
mixed_results = "pageservers returned mixed results for ancestor detach; manual intervention is required."
with pytest.raises(PageserverApiException, match=mixed_results):
fut.result()

msg, offset = env.storage_controller.assert_log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*"
)
log.info(f"expected error message: {msg}")
env.storage_controller.allowed_errors.append(
".*: shards returned different results matching=0 .*"
log.info(f"expected error message: {msg.rstrip()}")
env.storage_controller.allowed_errors.extend(
[
".*: shards returned different results matching=0 .*",
f".*: InternalServerError\\({mixed_results}",
]
)

detach_timeline()
if mode == "create_reparentable_timeline":
with pytest.raises(PageserverApiException, match=mixed_results):
detach_timeline()
else:
# it is a bit shame to flag it and then it suceeds, but most
# likely there would be a retry loop which would take care of
# this in cplane
detach_timeline()

# FIXME: perhaps the above should be automatically retried, if we get mixed results?
not_found = env.storage_controller.log_contains(
retried = env.storage_controller.log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*",
offset=offset,
offset,
)

assert not_found is None
if mode == "delete_reparentable_timeline":
assert (
retried is None
), "detaching should had converged after both nodes saw the deletion"
elif mode == "create_reparentable_timeline":
assert retried is not None, "detaching should not have converged"
_, offset = retried
finally:
stuck_http.configure_failpoints((pausepoint, "off"))
victim_http.configure_failpoints((pausepoint, "off"))

if mode == "create_reparentable_timeline":
assert first_branch is not None
# now we have mixed ancestry
assert (
TimelineId(
stuck_http.timeline_detail(shards[0]["shard_id"], first_branch)[
"ancestor_timeline_id"
]
)
== env.initial_timeline
)
assert (
TimelineId(
victim_http.timeline_detail(shards[-1]["shard_id"], first_branch)[
"ancestor_timeline_id"
]
)
== detached_branch
)

# make sure we are still able to repair this by detaching the ancestor on the storage controller in case it ever happens
# if the ancestor would be deleted, we would partially fail, making deletion stuck.
env.storage_controller.pageserver_api().detach_ancestor(env.initial_tenant, first_branch)

# and we should now have good results
not_found = env.storage_controller.log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*",
offset,
)

assert not_found is None
assert (
stuck_http.timeline_detail(shards[0]["shard_id"], first_branch)["ancestor_timeline_id"]
is None
)
assert (
victim_http.timeline_detail(shards[-1]["shard_id"], first_branch)[
"ancestor_timeline_id"
]
is None
)
def test_retryable_500_hit_through_storcon_during_timeline_detach_ancestor(
neon_env_builder: NeonEnvBuilder,
):
shard_count = 2
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)

for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)

pageservers = dict((int(p.id), p) for p in env.pageservers)

env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
assert len(set(x["node_id"] for x in shards)) == shard_count

detached_branch = env.neon_cli.create_branch("detached_branch", ancestor_branch_name="main")

pausepoint = "timeline-detach-ancestor::before_starting_after_locking-pausable"
failpoint = "timeline-detach-ancestor::before_starting_after_locking"

stuck = pageservers[int(shards[0]["node_id"])]
stuck_http = stuck.http_client().without_status_retrying()
stuck_http.configure_failpoints(
(pausepoint, "pause"),
)

env.storage_controller.allowed_errors.append(
f".*Error processing HTTP request: .* failpoint: {failpoint}"
)
http = env.storage_controller.pageserver_api()

victim = pageservers[int(shards[-1]["node_id"])]
victim.allowed_errors.append(
f".*Error processing HTTP request: InternalServerError\\(failpoint: {failpoint}"
)
victim_http = victim.http_client().without_status_retrying()
victim_http.configure_failpoints([(pausepoint, "pause"), (failpoint, "return")])

def detach_timeline():
http.detach_ancestor(env.initial_tenant, detached_branch)

def paused_at_failpoint():
stuck.assert_log_contains(f"at failpoint {pausepoint}")
victim.assert_log_contains(f"at failpoint {pausepoint}")

def first_completed():
detail = stuck_http.timeline_detail(shards[0]["shard_id"], detached_branch)
log.info(detail)
assert detail.get("ancestor_lsn") is None

with ThreadPoolExecutor(max_workers=1) as pool:
try:
fut = pool.submit(detach_timeline)
wait_until(10, 1.0, paused_at_failpoint)

# let stuck complete
stuck_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_completed)

victim_http.configure_failpoints((pausepoint, "off"))

with pytest.raises(
PageserverApiException,
match=f".*failpoint: {failpoint}",
) as exc:
fut.result()
assert exc.value.status_code == 500

finally:
stuck_http.configure_failpoints((pausepoint, "off"))
victim_http.configure_failpoints((pausepoint, "off"))

victim_http.configure_failpoints((failpoint, "off"))
detach_timeline()
def test_retried_detach_ancestor_after_failed_reparenting(neon_env_builder: NeonEnvBuilder):
"""

@@ -1169,7 +1312,7 @@ def test_retried_detach_ancestor_after_failed_reparenting(neon_env_builder: Neon
match=".*failed to reparent all candidate timelines, please retry",
) as exc:
http.detach_ancestor(env.initial_tenant, detached)
assert exc.value.status_code == 500
assert exc.value.status_code == 503

# first round -- do more checking to make sure the gc gets paused
try_detach()

@@ -1323,14 +1466,11 @@ def test_timeline_is_deleted_before_timeline_detach_ancestor_completes(

http.configure_failpoints((failpoint, "off"))

with pytest.raises(PageserverApiException) as exc:
with pytest.raises(
PageserverApiException, match="NotFound: Timeline .* was not found"
) as exc:
detach.result()

# FIXME: this should be 404 but because there is another Anyhow conversion it is 500
assert exc.value.status_code == 500
env.pageserver.allowed_errors.append(
".*Error processing HTTP request: InternalServerError\\(detached timeline was not found after restart"
)
assert exc.value.status_code == 404
finally:
http.configure_failpoints((failpoint, "off"))