From 730db859c741f6e782f721de12e8ec776c4ceb0a Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 15 Jul 2024 20:47:53 +0300 Subject: [PATCH] feat(timeline_detach_ancestor): success idempotency (#8354) Right now timeline detach ancestor reports an error (409, "no ancestor") on a new attempt after successful completion. This makes it troublesome for storage controller retries. Fix it to respond with `200 OK` as if the operation had just completed quickly. Additionally, the returned timeline identifiers in the 200 OK response are now ordered so that responses between different nodes for error comparison are done by the storage controller added in #8353. Design-wise, this PR introduces a new strategy for accessing the latest uploaded IndexPart: `RemoteTimelineClient::initialized_upload_queue(&self) -> Result, NotInitialized>`. It should be a more scalable way to query the latest uploaded `IndexPart` than to add a query method for each question directly on `RemoteTimelineClient`. GC blocking will need to be introduced to make the operation fully idempotent. However, it is idempotent for the cases demonstrated by tests. Cc: #6994 --- pageserver/src/http/routes.rs | 45 +- .../src/tenant/remote_timeline_client.rs | 27 +- .../tenant/remote_timeline_client/index.rs | 26 ++ pageserver/src/tenant/timeline.rs | 8 +- .../src/tenant/timeline/detach_ancestor.rs | 130 +++++- pageserver/src/tenant/upload_queue.rs | 10 +- storage_controller/src/service.rs | 9 +- test_runner/fixtures/pageserver/http.py | 21 +- .../regress/test_timeline_detach_ancestor.py | 430 ++++++++++++++++-- 9 files changed, 632 insertions(+), 74 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 6f8f3e6389..d7ef70477f 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1721,7 +1721,9 @@ async fn timeline_detach_ancestor_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - use crate::tenant::timeline::detach_ancestor::Options; + use crate::tenant::timeline::detach_ancestor; + use pageserver_api::models::detach_ancestor::AncestorDetached; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; check_permission(&request, Some(tenant_shard_id.tenant_id))?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; @@ -1729,7 +1731,7 @@ async fn timeline_detach_ancestor_handler( let span = tracing::info_span!("detach_ancestor", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id); async move { - let mut options = Options::default(); + let mut options = detach_ancestor::Options::default(); let rewrite_concurrency = parse_query_param::<_, std::num::NonZeroUsize>(&request, "rewrite_concurrency")?; @@ -1757,27 +1759,36 @@ async fn timeline_detach_ancestor_handler( let timeline = tenant.get_timeline(timeline_id, true)?; - let (_guard, prepared) = timeline + let progress = timeline .prepare_to_detach_from_ancestor(&tenant, options, ctx) .await?; - let res = state - .tenant_manager - .complete_detaching_timeline_ancestor(tenant_shard_id, timeline_id, prepared, ctx) - .await; + // uncomment to allow early as possible Tenant::drop + // drop(tenant); - match res { - Ok(reparented_timelines) => { - let resp = pageserver_api::models::detach_ancestor::AncestorDetached { + let resp = match progress { + detach_ancestor::Progress::Prepared(_guard, prepared) => { + // it would be great to tag the guard on to the tenant activation future + let reparented_timelines = state + .tenant_manager + .complete_detaching_timeline_ancestor( + tenant_shard_id, + timeline_id, + prepared, + ctx, + ) + .await + .context("timeline detach ancestor completion") + .map_err(ApiError::InternalServerError)?; + + AncestorDetached { reparented_timelines, - }; - - json_response(StatusCode::OK, resp) + } } - Err(e) => Err(ApiError::InternalServerError( - e.context("timeline detach completion"), - )), - } + detach_ancestor::Progress::Done(resp) => resp, + }; + + json_response(StatusCode::OK, resp) } .instrument(span) .await diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index bc9364de61..66b759c8e0 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -241,7 +241,7 @@ use self::index::IndexPart; use super::metadata::MetadataUpdate; use super::storage_layer::{Layer, LayerName, ResidentLayer}; -use super::upload_queue::SetDeletedFlagProgress; +use super::upload_queue::{NotInitialized, SetDeletedFlagProgress}; use super::Generation; pub(crate) use download::{ @@ -1930,6 +1930,31 @@ impl RemoteTimelineClient { } } } + + /// Returns an accessor which will hold the UploadQueue mutex for accessing the upload queue + /// externally to RemoteTimelineClient. + pub(crate) fn initialized_upload_queue( + &self, + ) -> Result, NotInitialized> { + let mut inner = self.upload_queue.lock().unwrap(); + inner.initialized_mut()?; + Ok(UploadQueueAccessor { inner }) + } +} + +pub(crate) struct UploadQueueAccessor<'a> { + inner: std::sync::MutexGuard<'a, UploadQueue>, +} + +impl<'a> UploadQueueAccessor<'a> { + pub(crate) fn latest_uploaded_index_part(&self) -> &IndexPart { + match &*self.inner { + UploadQueue::Initialized(x) => &x.clean.0, + UploadQueue::Uninitialized | UploadQueue::Stopped(_) => { + unreachable!("checked before constructing") + } + } + } } pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath { diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 6233a3477e..b439df8edb 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -176,6 +176,24 @@ pub(crate) struct Lineage { /// /// If you are adding support for detaching from a hierarchy, consider changing the ancestry /// into a `Vec<(TimelineId, Lsn)>` to be a path instead. + // FIXME: this is insufficient even for path of two timelines for future wal recovery + // purposes: + // + // assuming a "old main" which has received most of the WAL, and has a branch "new main", + // starting a bit before "old main" last_record_lsn. the current version works fine, + // because we will know to replay wal and branch at the recorded Lsn to do wal recovery. + // + // then assuming "new main" would similarly receive a branch right before its last_record_lsn, + // "new new main". the current implementation would just store ("new main", ancestor_lsn, _) + // here. however, we cannot recover from WAL using only that information, we would need the + // whole ancestry here: + // + // ```json + // [ + // ["old main", ancestor_lsn("new main"), _], + // ["new main", ancestor_lsn("new new main"), _] + // ] + // ``` #[serde(skip_serializing_if = "Option::is_none", default)] original_ancestor: Option<(TimelineId, Lsn, NaiveDateTime)>, } @@ -217,6 +235,14 @@ impl Lineage { self.original_ancestor .is_some_and(|(_, ancestor_lsn, _)| ancestor_lsn == lsn) } + + pub(crate) fn is_detached_from_original_ancestor(&self) -> bool { + self.original_ancestor.is_some() + } + + pub(crate) fn is_reparented(&self) -> bool { + !self.reparenting_history.is_empty() + } } #[cfg(test)] diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0996616a67..239dce8786 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4733,13 +4733,7 @@ impl Timeline { tenant: &crate::tenant::Tenant, options: detach_ancestor::Options, ctx: &RequestContext, - ) -> Result< - ( - completion::Completion, - detach_ancestor::PreparedTimelineDetach, - ), - detach_ancestor::Error, - > { + ) -> Result { detach_ancestor::prepare(self, tenant, options, ctx).await } diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 4fc89330ba..49ce3db3e6 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -10,6 +10,7 @@ use crate::{ }, virtual_file::{MaybeFatalIo, VirtualFile}, }; +use pageserver_api::models::detach_ancestor::AncestorDetached; use tokio_util::sync::CancellationToken; use tracing::Instrument; use utils::{completion, generation::Generation, http::error::ApiError, id::TimelineId, lsn::Lsn}; @@ -39,6 +40,9 @@ pub(crate) enum Error { #[error("unexpected error")] Unexpected(#[source] anyhow::Error), + + #[error("failpoint: {}", .0)] + Failpoint(&'static str), } impl From for ApiError { @@ -57,11 +61,41 @@ impl From for ApiError { | e @ Error::CopyDeltaPrefix(_) | e @ Error::UploadRewritten(_) | e @ Error::CopyFailed(_) - | e @ Error::Unexpected(_) => ApiError::InternalServerError(e.into()), + | e @ Error::Unexpected(_) + | e @ Error::Failpoint(_) => ApiError::InternalServerError(e.into()), } } } +impl From for Error { + fn from(_: crate::tenant::upload_queue::NotInitialized) -> Self { + // treat all as shutting down signals, even though that is not entirely correct + // (uninitialized state) + Error::ShuttingDown + } +} + +impl From for Error { + fn from(value: FlushLayerError) -> Self { + match value { + FlushLayerError::Cancelled => Error::ShuttingDown, + FlushLayerError::NotRunning(_) => { + // FIXME(#6424): technically statically unreachable right now, given how we never + // drop the sender + Error::ShuttingDown + } + FlushLayerError::CreateImageLayersError(_) | FlushLayerError::Other(_) => { + Error::FlushAncestor(value) + } + } + } +} + +pub(crate) enum Progress { + Prepared(completion::Completion, PreparedTimelineDetach), + Done(AncestorDetached), +} + pub(crate) struct PreparedTimelineDetach { layers: Vec, } @@ -88,7 +122,7 @@ pub(super) async fn prepare( tenant: &Tenant, options: Options, ctx: &RequestContext, -) -> Result<(completion::Completion, PreparedTimelineDetach), Error> { +) -> Result { use Error::*; let Some((ancestor, ancestor_lsn)) = detached @@ -96,15 +130,67 @@ pub(super) async fn prepare( .as_ref() .map(|tl| (tl.clone(), detached.ancestor_lsn)) else { - // TODO: check if we have already been detached; for this we need to read the stored data - // on remote client, for that we need a follow-up which makes uploads cheaper and maintains - // a projection of the commited data. + { + let accessor = detached.remote_client.initialized_upload_queue()?; + + // we are safe to inspect the latest uploaded, because we can only witness this after + // restart is complete and ancestor is no more. + let latest = accessor.latest_uploaded_index_part(); + if !latest.lineage.is_detached_from_original_ancestor() { + return Err(NoAncestor); + } + } + + // detached has previously been detached; let's inspect each of the current timelines and + // report back the timelines which have been reparented by our detach + let mut all_direct_children = tenant + .timelines + .lock() + .unwrap() + .values() + .filter(|tl| matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached))) + .map(|tl| (tl.ancestor_lsn, tl.clone())) + .collect::>(); + + let mut any_shutdown = false; + + all_direct_children.retain( + |(_, tl)| match tl.remote_client.initialized_upload_queue() { + Ok(accessor) => accessor + .latest_uploaded_index_part() + .lineage + .is_reparented(), + Err(_shutdownalike) => { + // not 100% a shutdown, but let's bail early not to give inconsistent results in + // sharded enviroment. + any_shutdown = true; + true + } + }, + ); + + if any_shutdown { + // it could be one or many being deleted; have client retry + return Err(Error::ShuttingDown); + } + + let mut reparented = all_direct_children; + // why this instead of hashset? there is a reason, but I've forgotten it many times. // - // the error is wrong per openapi - return Err(NoAncestor); + // maybe if this was a hashset we would not be able to distinguish some race condition. + reparented.sort_unstable_by_key(|(lsn, tl)| (*lsn, tl.timeline_id)); + + return Ok(Progress::Done(AncestorDetached { + reparented_timelines: reparented + .into_iter() + .map(|(_, tl)| tl.timeline_id) + .collect(), + })); }; if !ancestor_lsn.is_valid() { + // rare case, probably wouldn't even load + tracing::error!("ancestor is set, but ancestor_lsn is invalid, this timeline needs fixing"); return Err(NoAncestor); } @@ -131,6 +217,15 @@ pub(super) async fn prepare( let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?; + utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable"); + + fail::fail_point!( + "timeline-detach-ancestor::before_starting_after_locking", + |_| Err(Error::Failpoint( + "timeline-detach-ancestor::before_starting_after_locking" + )) + ); + if ancestor_lsn >= ancestor.get_disk_consistent_lsn() { let span = tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id); @@ -151,7 +246,7 @@ pub(super) async fn prepare( } }; - res.map_err(FlushAncestor)?; + res?; // we do not need to wait for uploads to complete but we do need `struct Layer`, // copying delta prefix is unsupported currently for `InMemoryLayer`. @@ -159,7 +254,7 @@ pub(super) async fn prepare( elapsed_ms = started_at.elapsed().as_millis(), "froze and flushed the ancestor" ); - Ok(()) + Ok::<_, Error>(()) } .instrument(span) .await?; @@ -283,7 +378,7 @@ pub(super) async fn prepare( let prepared = PreparedTimelineDetach { layers: new_layers }; - Ok((guard, prepared)) + Ok(Progress::Prepared(guard, prepared)) } fn partition_work( @@ -350,7 +445,11 @@ async fn copy_lsn_prefix( target_timeline: &Arc, ctx: &RequestContext, ) -> Result, Error> { - use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed}; + use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed, ShuttingDown}; + + if target_timeline.cancel.is_cancelled() { + return Err(ShuttingDown); + } tracing::debug!(%layer, %end_lsn, "copying lsn prefix"); @@ -529,7 +628,7 @@ pub(super) async fn complete( match res { Ok(Some(timeline)) => { tracing::info!(reparented=%timeline.timeline_id, "reparenting done"); - reparented.push(timeline.timeline_id); + reparented.push((timeline.ancestor_lsn, timeline.timeline_id)); } Ok(None) => { // lets just ignore this for now. one or all reparented timelines could had @@ -551,5 +650,12 @@ pub(super) async fn complete( tracing::info!("failed to reparent some candidates"); } + reparented.sort_unstable(); + + let reparented = reparented + .into_iter() + .map(|(_, timeline_id)| timeline_id) + .collect(); + Ok(reparented) } diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 50c977a950..f7440ecdae 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -228,18 +228,20 @@ impl UploadQueue { Ok(self.initialized_mut().expect("we just set it")) } - pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> { + pub(crate) fn initialized_mut( + &mut self, + ) -> Result<&mut UploadQueueInitialized, NotInitialized> { use UploadQueue::*; match self { - Uninitialized => Err(NotInitialized::Uninitialized.into()), + Uninitialized => Err(NotInitialized::Uninitialized), Initialized(x) => { if x.shutting_down { - Err(NotInitialized::ShuttingDown.into()) + Err(NotInitialized::ShuttingDown) } else { Ok(x) } } - Stopped(_) => Err(NotInitialized::Stopped.into()), + Stopped(_) => Err(NotInitialized::Stopped), } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 95522525cb..3c24433c42 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -2830,9 +2830,10 @@ impl Service { match e { // no ancestor (ever) - Error::ApiError(StatusCode::CONFLICT, msg) => { - ApiError::Conflict(format!("{node}: {msg}")) - } + Error::ApiError(StatusCode::CONFLICT, msg) => ApiError::Conflict(format!( + "{node}: {}", + msg.strip_prefix("Conflict: ").unwrap_or(&msg) + )), // too many ancestors Error::ApiError(StatusCode::BAD_REQUEST, msg) => { ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}")) @@ -2859,8 +2860,6 @@ impl Service { let any = results.pop().expect("we must have at least one response"); - // FIXME: the ordering is not stable yet on pageserver, should be (ancestor_lsn, - // TimelineId) let mismatching = results .iter() .filter(|(_, res)| res != &any.1) diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 03aee9e5c5..d66b94948a 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -172,6 +172,21 @@ class PageserverHttpClient(requests.Session, MetricsGetter): if auth_token is not None: self.headers["Authorization"] = f"Bearer {auth_token}" + def without_status_retrying(self) -> PageserverHttpClient: + retries = Retry( + status=0, + connect=5, + read=False, + backoff_factor=0.2, + status_forcelist=[], + allowed_methods=None, + remove_headers_on_redirect=[], + ) + + return PageserverHttpClient( + self.port, self.is_testing_enabled_or_skip, self.auth_token, retries + ) + @property def base_url(self) -> str: return f"http://localhost:{self.port}" @@ -814,17 +829,19 @@ class PageserverHttpClient(requests.Session, MetricsGetter): tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, batch_size: int | None = None, - ) -> Set[TimelineId]: + **kwargs, + ) -> List[TimelineId]: params = {} if batch_size is not None: params["batch_size"] = batch_size res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/detach_ancestor", params=params, + **kwargs, ) self.verbose_error(res) json = res.json() - return set(map(TimelineId, json["reparented_timelines"])) + return list(map(TimelineId, json["reparented_timelines"])) def evict_layer( self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str diff --git a/test_runner/regress/test_timeline_detach_ancestor.py b/test_runner/regress/test_timeline_detach_ancestor.py index 803fcac583..d75ab4c060 100644 --- a/test_runner/regress/test_timeline_detach_ancestor.py +++ b/test_runner/regress/test_timeline_detach_ancestor.py @@ -1,5 +1,7 @@ import datetime import enum +import threading +import time from concurrent.futures import ThreadPoolExecutor from queue import Empty, Queue from threading import Barrier @@ -9,6 +11,7 @@ import pytest from fixtures.common_types import Lsn, TimelineId from fixtures.log_helper import log from fixtures.neon_fixtures import ( + LogCursor, NeonEnvBuilder, PgBin, flush_ep_to_pageserver, @@ -17,7 +20,8 @@ from fixtures.neon_fixtures import ( from fixtures.pageserver.http import HistoricLayerInfo, PageserverApiException from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_timeline_detail_404 from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind -from fixtures.utils import assert_pageserver_backups_equal +from fixtures.utils import assert_pageserver_backups_equal, wait_until +from requests import ReadTimeout def by_end_lsn(info: HistoricLayerInfo) -> Lsn: @@ -161,7 +165,7 @@ def test_ancestor_detach_branched_from( ) all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id) - assert all_reparented == set() + assert all_reparented == [] if restart_after: env.pageserver.stop() @@ -270,7 +274,7 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder): after = env.neon_cli.create_branch("after", "main", env.initial_tenant, ancestor_start_lsn=None) all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id) - assert all_reparented == {reparented, same_branchpoint} + assert set(all_reparented) == {reparented, same_branchpoint} env.pageserver.quiesce_tenants() @@ -530,7 +534,7 @@ def test_compaction_induced_by_detaches_in_history( for _, timeline_id in skip_main: reparented = client.detach_ancestor(env.initial_tenant, timeline_id) - assert reparented == set(), "we have no earlier branches at any level" + assert reparented == [], "we have no earlier branches at any level" post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id))) assert len(post_detach_l0s) == 5, "should had inherited 4 L0s, have 5 in total" @@ -561,7 +565,9 @@ def test_compaction_induced_by_detaches_in_history( @pytest.mark.parametrize("sharded", [True, False]) -def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, sharded: bool): +def test_timeline_ancestor_detach_idempotent_success( + neon_env_builder: NeonEnvBuilder, sharded: bool +): shards = 2 if sharded else 1 neon_env_builder.num_pageservers = shards @@ -579,28 +585,28 @@ def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, shard else: client = env.pageserver.http_client() - with pytest.raises(PageserverApiException, match=".* no ancestors") as info: - client.detach_ancestor(env.initial_tenant, env.initial_timeline) - assert info.value.status_code == 409 - first_branch = env.neon_cli.create_branch("first_branch") - second_branch = env.neon_cli.create_branch("second_branch", ancestor_branch_name="first_branch") - # funnily enough this does not have a prefix - with pytest.raises(PageserverApiException, match="too many ancestors") as info: - client.detach_ancestor(env.initial_tenant, second_branch) - assert info.value.status_code == 400 + _ = env.neon_cli.create_branch("second_branch", ancestor_branch_name="first_branch") - client.detach_ancestor(env.initial_tenant, first_branch) + # these two will be reparented, and they should be returned in stable order + # from pageservers OR otherwise there will be an `error!` logging from + # storage controller + reparented1 = env.neon_cli.create_branch("first_reparented", ancestor_branch_name="main") + reparented2 = env.neon_cli.create_branch("second_reparented", ancestor_branch_name="main") + + first_reparenting_response = client.detach_ancestor(env.initial_tenant, first_branch) + assert set(first_reparenting_response) == {reparented1, reparented2} # FIXME: this should be done by the http req handler for ps in pageservers.values(): ps.quiesce_tenants() - with pytest.raises(PageserverApiException, match=".* no ancestors") as info: - client.detach_ancestor(env.initial_tenant, first_branch) - # FIXME: this should be 200 OK because we've already completed it - assert info.value.status_code == 409 + for _ in range(5): + # once completed, we can retry this how many times + assert ( + client.detach_ancestor(env.initial_tenant, first_branch) == first_reparenting_response + ) client.tenant_delete(env.initial_tenant) @@ -609,7 +615,50 @@ def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, shard assert e.value.status_code == 404 +@pytest.mark.parametrize("sharded", [True, False]) +def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, sharded: bool): + # the test is split from test_timeline_ancestor_detach_idempotent_success as only these error cases should create "request was dropped before completing", + # given the current first error handling + shards = 2 if sharded else 1 + + neon_env_builder.num_pageservers = shards + env = neon_env_builder.init_start(initial_tenant_shard_count=shards if sharded else None) + + pageservers = dict((int(p.id), p) for p in env.pageservers) + + for ps in pageservers.values(): + ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + ps.allowed_errors.append( + ".* WARN .* path=/v1/tenant/.*/timeline/.*/detach_ancestor request_id=.*: request was dropped before completing" + ) + + client = ( + env.pageserver.http_client() if not sharded else env.storage_controller.pageserver_api() + ) + + with pytest.raises(PageserverApiException, match=".* no ancestors") as info: + client.detach_ancestor(env.initial_tenant, env.initial_timeline) + assert info.value.status_code == 409 + + _ = env.neon_cli.create_branch("first_branch") + + second_branch = env.neon_cli.create_branch("second_branch", ancestor_branch_name="first_branch") + + # funnily enough this does not have a prefix + with pytest.raises(PageserverApiException, match="too many ancestors") as info: + client.detach_ancestor(env.initial_tenant, second_branch) + assert info.value.status_code == 400 + + def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder): + """ + Sharded timeline detach ancestor; 4 nodes: 1 stuck, 1 restarted, 2 normal. + + Stuck node gets stuck on a pause failpoint for first storage controller request. + Restarted node remains stuck until explicit restart from test code. + + We retry the request until storage controller gets 200 OK from all nodes. + """ branch_name = "soon_detached" shard_count = 4 neon_env_builder.num_pageservers = shard_count @@ -621,8 +670,15 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder): # FIXME: should this be in the neon_env_builder.init_start? env.storage_controller.reconcile_until_idle() + # as we will stop a node, make sure there is no clever rebalancing + env.storage_controller.tenant_policy_update(env.initial_tenant, body={"scheduling": "Stop"}) + env.storage_controller.allowed_errors.append(".*: Scheduling is disabled by policy Stop .*") + shards = env.storage_controller.locate(env.initial_tenant) + utilized_pageservers = {x["node_id"] for x in shards} + assert len(utilized_pageservers) > 1, "all shards got placed on single pageserver?" + branch_timeline_id = env.neon_cli.create_branch(branch_name, tenant_id=env.initial_tenant) with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep: @@ -642,7 +698,79 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder): assert Lsn(detail["initdb_lsn"]) < lsn assert TimelineId(detail["ancestor_timeline_id"]) == env.initial_timeline - env.storage_controller.pageserver_api().detach_ancestor(env.initial_tenant, branch_timeline_id) + # make one of the nodes get stuck, but continue the initial operation + # make another of the nodes get stuck, then restart + + stuck = pageservers[int(shards[0]["node_id"])] + stuck.allowed_errors.append(".*: request was dropped before completing") + env.storage_controller.allowed_errors.append(".*: request was dropped before completing") + stuck_http = stuck.http_client() + stuck_http.configure_failpoints( + ("timeline-detach-ancestor::before_starting_after_locking_pausable", "pause") + ) + + restarted = pageservers[int(shards[1]["node_id"])] + restarted.allowed_errors.extend( + [ + ".*: request was dropped before completing", + ".*: Cancelled request finished with an error: ShuttingDown", + ] + ) + assert restarted.id != stuck.id + restarted_http = restarted.http_client() + restarted_http.configure_failpoints( + [ + ("timeline-detach-ancestor::before_starting_after_locking_pausable", "pause"), + ] + ) + + target = env.storage_controller.pageserver_api() + + with pytest.raises(ReadTimeout): + target.detach_ancestor(env.initial_tenant, branch_timeline_id, timeout=1) + + stuck_http.configure_failpoints( + ("timeline-detach-ancestor::before_starting_after_locking_pausable", "off") + ) + + barrier = threading.Barrier(2) + + def restart_restarted(): + barrier.wait() + # graceful shutdown should just work, because simultaneously unpaused + restarted.stop() + # this does not happen always, depends how fast we exit after unpausing + # restarted.assert_log_contains("Cancelled request finished with an error: ShuttingDown") + restarted.start() + + with ThreadPoolExecutor(max_workers=1) as pool: + fut = pool.submit(restart_restarted) + barrier.wait() + # we have 10s, lets use 1/2 of that to help the shutdown start + time.sleep(5) + restarted_http.configure_failpoints( + ("timeline-detach-ancestor::before_starting_after_locking_pausable", "off") + ) + fut.result() + + # detach ancestor request handling is not sensitive to http cancellation. + # this means that the "stuck" is on its way to complete the detach, but the restarted is off + # now it can either be complete on all nodes, or still in progress with + # one. + without_retrying = target.without_status_retrying() + + # this retry loop will be long enough that the tenant can always activate + reparented = None + for _ in range(10): + try: + reparented = without_retrying.detach_ancestor(env.initial_tenant, branch_timeline_id) + except PageserverApiException as info: + assert info.status_code == 503 + time.sleep(2) + else: + break + + assert reparented == [], "too many retries (None) or unexpected reparentings" for shard_info in shards: node_id = int(shard_info["node_id"]) @@ -661,8 +789,262 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder): assert count == 10000 +@pytest.mark.parametrize("mode", ["delete_timeline", "delete_tenant"]) +@pytest.mark.parametrize("sharded", [False, True]) +def test_timeline_detach_ancestor_interrupted_by_deletion( + neon_env_builder: NeonEnvBuilder, mode: str, sharded: bool +): + """ + Timeline ancestor detach interrupted by deleting either: + - the detached timeline + - the whole tenant + + after starting the detach. + + What remains not tested by this: + - shutdown winning over complete + + Shutdown winning over complete needs gc blocking and reparenting any left-overs on retry. + """ + + if sharded and mode == "delete_tenant": + # the shared/exclusive lock for tenant is blocking this: + # timeline detach ancestor takes shared, delete tenant takes exclusive + pytest.skip( + "tenant deletion while timeline ancestor detach is underway is not supported yet" + ) + + shard_count = 2 if sharded else 1 + + neon_env_builder.num_pageservers = shard_count + + env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count if sharded else None) + + for ps in env.pageservers: + ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + + pageservers = dict((int(p.id), p) for p in env.pageservers) + + detached_timeline = env.neon_cli.create_branch("detached soon", "main") + + failpoint = "timeline-detach-ancestor::before_starting_after_locking_pausable" + + env.storage_controller.reconcile_until_idle() + shards = env.storage_controller.locate(env.initial_tenant) + + assert len(set(info["node_id"] for info in shards)) == shard_count + + target = env.storage_controller.pageserver_api() if sharded else env.pageserver.http_client() + target = target.without_status_retrying() + + victim = pageservers[int(shards[-1]["node_id"])] + victim_http = victim.http_client() + victim_http.configure_failpoints((failpoint, "pause")) + + def detach_ancestor(): + target.detach_ancestor(env.initial_tenant, detached_timeline) + + def at_failpoint() -> Tuple[str, LogCursor]: + return victim.assert_log_contains(f"at failpoint {failpoint}") + + def start_delete(): + if mode == "delete_timeline": + target.timeline_delete(env.initial_tenant, detached_timeline) + elif mode == "delete_tenant": + target.tenant_delete(env.initial_tenant) + else: + raise RuntimeError(f"unimplemented mode {mode}") + + def at_waiting_on_gate_close(start_offset: LogCursor) -> LogCursor: + _, offset = victim.assert_log_contains( + "closing is taking longer than expected", offset=start_offset + ) + return offset + + def is_deleted(): + try: + if mode == "delete_timeline": + target.timeline_detail(env.initial_tenant, detached_timeline) + elif mode == "delete_tenant": + target.tenant_status(env.initial_tenant) + else: + return False + except PageserverApiException as e: + assert e.status_code == 404 + return True + else: + raise RuntimeError("waiting for 404") + + with ThreadPoolExecutor(max_workers=2) as pool: + try: + fut = pool.submit(detach_ancestor) + _, offset = wait_until(10, 1.0, at_failpoint) + + delete = pool.submit(start_delete) + + wait_until(10, 1.0, lambda: at_waiting_on_gate_close(offset)) + + victim_http.configure_failpoints((failpoint, "off")) + + delete.result() + + assert wait_until(10, 1.0, is_deleted), f"unimplemented mode {mode}" + + with pytest.raises(PageserverApiException) as exc: + fut.result() + assert exc.value.status_code == 503 + finally: + victim_http.configure_failpoints((failpoint, "off")) + + +@pytest.mark.parametrize("mode", ["delete_reparentable_timeline"]) +def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnvBuilder, mode: str): + """ + Technically possible storage controller concurrent interleaving timeline + deletion with timeline detach. + + Deletion is fine, as any sharded pageservers reach the same end state, but + creating reparentable timeline would create an issue as the two nodes would + never agree. There is a solution though: the created reparentable timeline + must be detached. + """ + + assert ( + mode == "delete_reparentable_timeline" + ), "only one now, but we could have the create just as well, need gc blocking" + + shard_count = 2 + neon_env_builder.num_pageservers = shard_count + env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count) + + for ps in env.pageservers: + ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + + pageservers = dict((int(p.id), p) for p in env.pageservers) + + env.storage_controller.reconcile_until_idle() + shards = env.storage_controller.locate(env.initial_tenant) + assert len(set(x["node_id"] for x in shards)) == shard_count + + with env.endpoints.create_start("main") as ep: + ep.safe_psql("create table foo as select i::bigint from generate_series(1, 1000) t(i)") + + # as the interleaved operation, we will delete this timeline, which was reparenting candidate + first_branch_lsn = wait_for_last_flush_lsn( + env, ep, env.initial_tenant, env.initial_timeline + ) + for ps, shard_id in [(pageservers[int(x["node_id"])], x["shard_id"]) for x in shards]: + ps.http_client().timeline_checkpoint(shard_id, env.initial_timeline) + + ep.safe_psql("create table bar as select i::bigint from generate_series(1, 2000) t(i)") + detached_branch_lsn = flush_ep_to_pageserver( + env, ep, env.initial_tenant, env.initial_timeline + ) + + for ps, shard_id in [(pageservers[int(x["node_id"])], x["shard_id"]) for x in shards]: + ps.http_client().timeline_checkpoint(shard_id, env.initial_timeline) + + first_branch = env.neon_cli.create_branch( + "first_branch", ancestor_branch_name="main", ancestor_start_lsn=first_branch_lsn + ) + detached_branch = env.neon_cli.create_branch( + "detached_branch", ancestor_branch_name="main", ancestor_start_lsn=detached_branch_lsn + ) + + pausepoint = "timeline-detach-ancestor::before_starting_after_locking_pausable" + + stuck = pageservers[int(shards[0]["node_id"])] + stuck_http = stuck.http_client().without_status_retrying() + stuck_http.configure_failpoints((pausepoint, "pause")) + + victim = pageservers[int(shards[-1]["node_id"])] + victim_http = victim.http_client().without_status_retrying() + victim_http.configure_failpoints( + (pausepoint, "pause"), + ) + + # noticed a surprising 409 if the other one would fail instead + # victim_http.configure_failpoints([ + # (pausepoint, "pause"), + # ("timeline-detach-ancestor::before_starting_after_locking", "return"), + # ]) + + # interleaving a create_timeline which could be reparented will produce two + # permanently different reparentings: one node has reparented, other has + # not + # + # with deletion there is no such problem + def detach_timeline(): + env.storage_controller.pageserver_api().detach_ancestor(env.initial_tenant, detached_branch) + + def paused_at_failpoint(): + stuck.assert_log_contains(f"at failpoint {pausepoint}") + victim.assert_log_contains(f"at failpoint {pausepoint}") + + def first_completed(): + detail = stuck_http.timeline_detail(shards[0]["shard_id"], detached_branch) + log.info(detail) + assert detail.get("ancestor_lsn") is None + + def first_branch_gone(): + try: + env.storage_controller.pageserver_api().timeline_detail( + env.initial_tenant, first_branch + ) + except PageserverApiException as e: + log.info(f"error {e}") + assert e.status_code == 404 + else: + log.info("still ok") + raise RuntimeError("not done yet") + + with ThreadPoolExecutor(max_workers=1) as pool: + try: + fut = pool.submit(detach_timeline) + wait_until(10, 1.0, paused_at_failpoint) + + # let stuck complete + stuck_http.configure_failpoints((pausepoint, "off")) + wait_until(10, 1.0, first_completed) + + # if we would let victim fail, for some reason there'd be a 409 response instead of 500 + # victim_http.configure_failpoints((pausepoint, "off")) + # with pytest.raises(PageserverApiException, match=".* 500 Internal Server Error failpoint: timeline-detach-ancestor::before_starting_after_locking") as exc: + # fut.result() + # assert exc.value.status_code == 409 + + env.storage_controller.pageserver_api().timeline_delete( + env.initial_tenant, first_branch + ) + victim_http.configure_failpoints((pausepoint, "off")) + wait_until(10, 1.0, first_branch_gone) + + # it now passes, and we should get an error messages about mixed reparenting as the stuck still had something to reparent + fut.result() + + msg, offset = env.storage_controller.assert_log_contains( + ".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*" + ) + log.info(f"expected error message: {msg}") + env.storage_controller.allowed_errors.append( + ".*: shards returned different results matching=0 .*" + ) + + detach_timeline() + + # FIXME: perhaps the above should be automatically retried, if we get mixed results? + not_found = env.storage_controller.log_contains( + ".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*", + offset=offset, + ) + + assert not_found is None + finally: + stuck_http.configure_failpoints((pausepoint, "off")) + victim_http.configure_failpoints((pausepoint, "off")) + + # TODO: -# - after starting the operation, tenant is deleted # - after starting the operation, pageserver is shutdown, restarted # - after starting the operation, bottom-most timeline is deleted, pageserver is restarted, gc is inhibited # - deletion of reparented while reparenting should fail once, then succeed (?) @@ -670,9 +1052,5 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder): # - investigate: why are layers started at uneven lsn? not just after branching, but in general. # # TEST: 1. tad which partially succeeds, one returns 500 -# 2. create branch below timeline? or delete timeline below +# 2. create branch below timeline? ~or delete reparented timeline~ (done) # 3. on retry all should report the same reparented timelines -# -# TEST: 1. tad is started, one node stalls, other restarts -# 2. client timeout before stall over -# 3. on retry with stalled and other being able to proceed