feat(timeline_detach_ancestor): success idempotency (#8354)

Right now timeline detach ancestor reports an error (409, "no ancestor")
on a new attempt after successful completion. This makes it troublesome
for storage controller retries. Fix it to respond with `200 OK` as if
the operation had just completed quickly.

Additionally, the returned timeline identifiers in the 200 OK response
are now ordered so that responses between different nodes for error
comparison are done by the storage controller added in #8353.

Design-wise, this PR introduces a new strategy for accessing the latest
uploaded IndexPart:
`RemoteTimelineClient::initialized_upload_queue(&self) ->
Result<UploadQueueAccessor<'_>, NotInitialized>`. It should be a more
scalable way to query the latest uploaded `IndexPart` than to add a
query method for each question directly on `RemoteTimelineClient`.

GC blocking will need to be introduced to make the operation fully
idempotent. However, it is idempotent for the cases demonstrated by
tests.

Cc: #6994
This commit is contained in:
Joonas Koivunen
2024-07-15 20:47:53 +03:00
committed by Christian Schwarz
parent 2a3a136474
commit 957f99cad5
9 changed files with 632 additions and 74 deletions

View File

@@ -1721,7 +1721,9 @@ async fn timeline_detach_ancestor_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
use crate::tenant::timeline::detach_ancestor::Options;
use crate::tenant::timeline::detach_ancestor;
use pageserver_api::models::detach_ancestor::AncestorDetached;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -1729,7 +1731,7 @@ async fn timeline_detach_ancestor_handler(
let span = tracing::info_span!("detach_ancestor", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
async move {
let mut options = Options::default();
let mut options = detach_ancestor::Options::default();
let rewrite_concurrency =
parse_query_param::<_, std::num::NonZeroUsize>(&request, "rewrite_concurrency")?;
@@ -1757,27 +1759,36 @@ async fn timeline_detach_ancestor_handler(
let timeline = tenant.get_timeline(timeline_id, true)?;
let (_guard, prepared) = timeline
let progress = timeline
.prepare_to_detach_from_ancestor(&tenant, options, ctx)
.await?;
let res = state
.tenant_manager
.complete_detaching_timeline_ancestor(tenant_shard_id, timeline_id, prepared, ctx)
.await;
// uncomment to allow early as possible Tenant::drop
// drop(tenant);
match res {
Ok(reparented_timelines) => {
let resp = pageserver_api::models::detach_ancestor::AncestorDetached {
let resp = match progress {
detach_ancestor::Progress::Prepared(_guard, prepared) => {
// it would be great to tag the guard on to the tenant activation future
let reparented_timelines = state
.tenant_manager
.complete_detaching_timeline_ancestor(
tenant_shard_id,
timeline_id,
prepared,
ctx,
)
.await
.context("timeline detach ancestor completion")
.map_err(ApiError::InternalServerError)?;
AncestorDetached {
reparented_timelines,
};
json_response(StatusCode::OK, resp)
}
}
Err(e) => Err(ApiError::InternalServerError(
e.context("timeline detach completion"),
)),
}
detach_ancestor::Progress::Done(resp) => resp,
};
json_response(StatusCode::OK, resp)
}
.instrument(span)
.await

View File

@@ -241,7 +241,7 @@ use self::index::IndexPart;
use super::metadata::MetadataUpdate;
use super::storage_layer::{Layer, LayerName, ResidentLayer};
use super::upload_queue::SetDeletedFlagProgress;
use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
use super::Generation;
pub(crate) use download::{
@@ -1930,6 +1930,31 @@ impl RemoteTimelineClient {
}
}
}
/// Returns an accessor which will hold the UploadQueue mutex for accessing the upload queue
/// externally to RemoteTimelineClient.
pub(crate) fn initialized_upload_queue(
&self,
) -> Result<UploadQueueAccessor<'_>, NotInitialized> {
let mut inner = self.upload_queue.lock().unwrap();
inner.initialized_mut()?;
Ok(UploadQueueAccessor { inner })
}
}
pub(crate) struct UploadQueueAccessor<'a> {
inner: std::sync::MutexGuard<'a, UploadQueue>,
}
impl<'a> UploadQueueAccessor<'a> {
pub(crate) fn latest_uploaded_index_part(&self) -> &IndexPart {
match &*self.inner {
UploadQueue::Initialized(x) => &x.clean.0,
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
unreachable!("checked before constructing")
}
}
}
}
pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {

View File

@@ -176,6 +176,24 @@ pub(crate) struct Lineage {
///
/// If you are adding support for detaching from a hierarchy, consider changing the ancestry
/// into a `Vec<(TimelineId, Lsn)>` to be a path instead.
// FIXME: this is insufficient even for path of two timelines for future wal recovery
// purposes:
//
// assuming a "old main" which has received most of the WAL, and has a branch "new main",
// starting a bit before "old main" last_record_lsn. the current version works fine,
// because we will know to replay wal and branch at the recorded Lsn to do wal recovery.
//
// then assuming "new main" would similarly receive a branch right before its last_record_lsn,
// "new new main". the current implementation would just store ("new main", ancestor_lsn, _)
// here. however, we cannot recover from WAL using only that information, we would need the
// whole ancestry here:
//
// ```json
// [
// ["old main", ancestor_lsn("new main"), _],
// ["new main", ancestor_lsn("new new main"), _]
// ]
// ```
#[serde(skip_serializing_if = "Option::is_none", default)]
original_ancestor: Option<(TimelineId, Lsn, NaiveDateTime)>,
}
@@ -217,6 +235,14 @@ impl Lineage {
self.original_ancestor
.is_some_and(|(_, ancestor_lsn, _)| ancestor_lsn == lsn)
}
pub(crate) fn is_detached_from_original_ancestor(&self) -> bool {
self.original_ancestor.is_some()
}
pub(crate) fn is_reparented(&self) -> bool {
!self.reparenting_history.is_empty()
}
}
#[cfg(test)]

View File

@@ -4733,13 +4733,7 @@ impl Timeline {
tenant: &crate::tenant::Tenant,
options: detach_ancestor::Options,
ctx: &RequestContext,
) -> Result<
(
completion::Completion,
detach_ancestor::PreparedTimelineDetach,
),
detach_ancestor::Error,
> {
) -> Result<detach_ancestor::Progress, detach_ancestor::Error> {
detach_ancestor::prepare(self, tenant, options, ctx).await
}

View File

@@ -10,6 +10,7 @@ use crate::{
},
virtual_file::{MaybeFatalIo, VirtualFile},
};
use pageserver_api::models::detach_ancestor::AncestorDetached;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{completion, generation::Generation, http::error::ApiError, id::TimelineId, lsn::Lsn};
@@ -39,6 +40,9 @@ pub(crate) enum Error {
#[error("unexpected error")]
Unexpected(#[source] anyhow::Error),
#[error("failpoint: {}", .0)]
Failpoint(&'static str),
}
impl From<Error> for ApiError {
@@ -57,11 +61,41 @@ impl From<Error> for ApiError {
| e @ Error::CopyDeltaPrefix(_)
| e @ Error::UploadRewritten(_)
| e @ Error::CopyFailed(_)
| e @ Error::Unexpected(_) => ApiError::InternalServerError(e.into()),
| e @ Error::Unexpected(_)
| e @ Error::Failpoint(_) => ApiError::InternalServerError(e.into()),
}
}
}
impl From<crate::tenant::upload_queue::NotInitialized> for Error {
fn from(_: crate::tenant::upload_queue::NotInitialized) -> Self {
// treat all as shutting down signals, even though that is not entirely correct
// (uninitialized state)
Error::ShuttingDown
}
}
impl From<FlushLayerError> for Error {
fn from(value: FlushLayerError) -> Self {
match value {
FlushLayerError::Cancelled => Error::ShuttingDown,
FlushLayerError::NotRunning(_) => {
// FIXME(#6424): technically statically unreachable right now, given how we never
// drop the sender
Error::ShuttingDown
}
FlushLayerError::CreateImageLayersError(_) | FlushLayerError::Other(_) => {
Error::FlushAncestor(value)
}
}
}
}
pub(crate) enum Progress {
Prepared(completion::Completion, PreparedTimelineDetach),
Done(AncestorDetached),
}
pub(crate) struct PreparedTimelineDetach {
layers: Vec<Layer>,
}
@@ -88,7 +122,7 @@ pub(super) async fn prepare(
tenant: &Tenant,
options: Options,
ctx: &RequestContext,
) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
) -> Result<Progress, Error> {
use Error::*;
let Some((ancestor, ancestor_lsn)) = detached
@@ -96,15 +130,67 @@ pub(super) async fn prepare(
.as_ref()
.map(|tl| (tl.clone(), detached.ancestor_lsn))
else {
// TODO: check if we have already been detached; for this we need to read the stored data
// on remote client, for that we need a follow-up which makes uploads cheaper and maintains
// a projection of the commited data.
{
let accessor = detached.remote_client.initialized_upload_queue()?;
// we are safe to inspect the latest uploaded, because we can only witness this after
// restart is complete and ancestor is no more.
let latest = accessor.latest_uploaded_index_part();
if !latest.lineage.is_detached_from_original_ancestor() {
return Err(NoAncestor);
}
}
// detached has previously been detached; let's inspect each of the current timelines and
// report back the timelines which have been reparented by our detach
let mut all_direct_children = tenant
.timelines
.lock()
.unwrap()
.values()
.filter(|tl| matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached)))
.map(|tl| (tl.ancestor_lsn, tl.clone()))
.collect::<Vec<_>>();
let mut any_shutdown = false;
all_direct_children.retain(
|(_, tl)| match tl.remote_client.initialized_upload_queue() {
Ok(accessor) => accessor
.latest_uploaded_index_part()
.lineage
.is_reparented(),
Err(_shutdownalike) => {
// not 100% a shutdown, but let's bail early not to give inconsistent results in
// sharded enviroment.
any_shutdown = true;
true
}
},
);
if any_shutdown {
// it could be one or many being deleted; have client retry
return Err(Error::ShuttingDown);
}
let mut reparented = all_direct_children;
// why this instead of hashset? there is a reason, but I've forgotten it many times.
//
// the error is wrong per openapi
return Err(NoAncestor);
// maybe if this was a hashset we would not be able to distinguish some race condition.
reparented.sort_unstable_by_key(|(lsn, tl)| (*lsn, tl.timeline_id));
return Ok(Progress::Done(AncestorDetached {
reparented_timelines: reparented
.into_iter()
.map(|(_, tl)| tl.timeline_id)
.collect(),
}));
};
if !ancestor_lsn.is_valid() {
// rare case, probably wouldn't even load
tracing::error!("ancestor is set, but ancestor_lsn is invalid, this timeline needs fixing");
return Err(NoAncestor);
}
@@ -131,6 +217,15 @@ pub(super) async fn prepare(
let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable");
fail::fail_point!(
"timeline-detach-ancestor::before_starting_after_locking",
|_| Err(Error::Failpoint(
"timeline-detach-ancestor::before_starting_after_locking"
))
);
if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
let span =
tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
@@ -151,7 +246,7 @@ pub(super) async fn prepare(
}
};
res.map_err(FlushAncestor)?;
res?;
// we do not need to wait for uploads to complete but we do need `struct Layer`,
// copying delta prefix is unsupported currently for `InMemoryLayer`.
@@ -159,7 +254,7 @@ pub(super) async fn prepare(
elapsed_ms = started_at.elapsed().as_millis(),
"froze and flushed the ancestor"
);
Ok(())
Ok::<_, Error>(())
}
.instrument(span)
.await?;
@@ -283,7 +378,7 @@ pub(super) async fn prepare(
let prepared = PreparedTimelineDetach { layers: new_layers };
Ok((guard, prepared))
Ok(Progress::Prepared(guard, prepared))
}
fn partition_work(
@@ -350,7 +445,11 @@ async fn copy_lsn_prefix(
target_timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};
use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed, ShuttingDown};
if target_timeline.cancel.is_cancelled() {
return Err(ShuttingDown);
}
tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
@@ -529,7 +628,7 @@ pub(super) async fn complete(
match res {
Ok(Some(timeline)) => {
tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
reparented.push(timeline.timeline_id);
reparented.push((timeline.ancestor_lsn, timeline.timeline_id));
}
Ok(None) => {
// lets just ignore this for now. one or all reparented timelines could had
@@ -551,5 +650,12 @@ pub(super) async fn complete(
tracing::info!("failed to reparent some candidates");
}
reparented.sort_unstable();
let reparented = reparented
.into_iter()
.map(|(_, timeline_id)| timeline_id)
.collect();
Ok(reparented)
}

View File

@@ -228,18 +228,20 @@ impl UploadQueue {
Ok(self.initialized_mut().expect("we just set it"))
}
pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
pub(crate) fn initialized_mut(
&mut self,
) -> Result<&mut UploadQueueInitialized, NotInitialized> {
use UploadQueue::*;
match self {
Uninitialized => Err(NotInitialized::Uninitialized.into()),
Uninitialized => Err(NotInitialized::Uninitialized),
Initialized(x) => {
if x.shutting_down {
Err(NotInitialized::ShuttingDown.into())
Err(NotInitialized::ShuttingDown)
} else {
Ok(x)
}
}
Stopped(_) => Err(NotInitialized::Stopped.into()),
Stopped(_) => Err(NotInitialized::Stopped),
}
}

View File

@@ -2830,9 +2830,10 @@ impl Service {
match e {
// no ancestor (ever)
Error::ApiError(StatusCode::CONFLICT, msg) => {
ApiError::Conflict(format!("{node}: {msg}"))
}
Error::ApiError(StatusCode::CONFLICT, msg) => ApiError::Conflict(format!(
"{node}: {}",
msg.strip_prefix("Conflict: ").unwrap_or(&msg)
)),
// too many ancestors
Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
@@ -2859,8 +2860,6 @@ impl Service {
let any = results.pop().expect("we must have at least one response");
// FIXME: the ordering is not stable yet on pageserver, should be (ancestor_lsn,
// TimelineId)
let mismatching = results
.iter()
.filter(|(_, res)| res != &any.1)

View File

@@ -172,6 +172,21 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
def without_status_retrying(self) -> PageserverHttpClient:
retries = Retry(
status=0,
connect=5,
read=False,
backoff_factor=0.2,
status_forcelist=[],
allowed_methods=None,
remove_headers_on_redirect=[],
)
return PageserverHttpClient(
self.port, self.is_testing_enabled_or_skip, self.auth_token, retries
)
@property
def base_url(self) -> str:
return f"http://localhost:{self.port}"
@@ -814,17 +829,19 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
tenant_id: Union[TenantId, TenantShardId],
timeline_id: TimelineId,
batch_size: int | None = None,
) -> Set[TimelineId]:
**kwargs,
) -> List[TimelineId]:
params = {}
if batch_size is not None:
params["batch_size"] = batch_size
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/detach_ancestor",
params=params,
**kwargs,
)
self.verbose_error(res)
json = res.json()
return set(map(TimelineId, json["reparented_timelines"]))
return list(map(TimelineId, json["reparented_timelines"]))
def evict_layer(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str

View File

@@ -1,5 +1,7 @@
import datetime
import enum
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue
from threading import Barrier
@@ -9,6 +11,7 @@ import pytest
from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
LogCursor,
NeonEnvBuilder,
PgBin,
flush_ep_to_pageserver,
@@ -17,7 +20,8 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.http import HistoricLayerInfo, PageserverApiException
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_timeline_detail_404
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.utils import assert_pageserver_backups_equal
from fixtures.utils import assert_pageserver_backups_equal, wait_until
from requests import ReadTimeout
def by_end_lsn(info: HistoricLayerInfo) -> Lsn:
@@ -161,7 +165,7 @@ def test_ancestor_detach_branched_from(
)
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == set()
assert all_reparented == []
if restart_after:
env.pageserver.stop()
@@ -270,7 +274,7 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder):
after = env.neon_cli.create_branch("after", "main", env.initial_tenant, ancestor_start_lsn=None)
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == {reparented, same_branchpoint}
assert set(all_reparented) == {reparented, same_branchpoint}
env.pageserver.quiesce_tenants()
@@ -530,7 +534,7 @@ def test_compaction_induced_by_detaches_in_history(
for _, timeline_id in skip_main:
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert reparented == set(), "we have no earlier branches at any level"
assert reparented == [], "we have no earlier branches at any level"
post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
assert len(post_detach_l0s) == 5, "should had inherited 4 L0s, have 5 in total"
@@ -561,7 +565,9 @@ def test_compaction_induced_by_detaches_in_history(
@pytest.mark.parametrize("sharded", [True, False])
def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, sharded: bool):
def test_timeline_ancestor_detach_idempotent_success(
neon_env_builder: NeonEnvBuilder, sharded: bool
):
shards = 2 if sharded else 1
neon_env_builder.num_pageservers = shards
@@ -579,28 +585,28 @@ def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, shard
else:
client = env.pageserver.http_client()
with pytest.raises(PageserverApiException, match=".* no ancestors") as info:
client.detach_ancestor(env.initial_tenant, env.initial_timeline)
assert info.value.status_code == 409
first_branch = env.neon_cli.create_branch("first_branch")
second_branch = env.neon_cli.create_branch("second_branch", ancestor_branch_name="first_branch")
# funnily enough this does not have a prefix
with pytest.raises(PageserverApiException, match="too many ancestors") as info:
client.detach_ancestor(env.initial_tenant, second_branch)
assert info.value.status_code == 400
_ = env.neon_cli.create_branch("second_branch", ancestor_branch_name="first_branch")
client.detach_ancestor(env.initial_tenant, first_branch)
# these two will be reparented, and they should be returned in stable order
# from pageservers OR otherwise there will be an `error!` logging from
# storage controller
reparented1 = env.neon_cli.create_branch("first_reparented", ancestor_branch_name="main")
reparented2 = env.neon_cli.create_branch("second_reparented", ancestor_branch_name="main")
first_reparenting_response = client.detach_ancestor(env.initial_tenant, first_branch)
assert set(first_reparenting_response) == {reparented1, reparented2}
# FIXME: this should be done by the http req handler
for ps in pageservers.values():
ps.quiesce_tenants()
with pytest.raises(PageserverApiException, match=".* no ancestors") as info:
client.detach_ancestor(env.initial_tenant, first_branch)
# FIXME: this should be 200 OK because we've already completed it
assert info.value.status_code == 409
for _ in range(5):
# once completed, we can retry this how many times
assert (
client.detach_ancestor(env.initial_tenant, first_branch) == first_reparenting_response
)
client.tenant_delete(env.initial_tenant)
@@ -609,7 +615,50 @@ def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, shard
assert e.value.status_code == 404
@pytest.mark.parametrize("sharded", [True, False])
def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, sharded: bool):
# the test is split from test_timeline_ancestor_detach_idempotent_success as only these error cases should create "request was dropped before completing",
# given the current first error handling
shards = 2 if sharded else 1
neon_env_builder.num_pageservers = shards
env = neon_env_builder.init_start(initial_tenant_shard_count=shards if sharded else None)
pageservers = dict((int(p.id), p) for p in env.pageservers)
for ps in pageservers.values():
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
ps.allowed_errors.append(
".* WARN .* path=/v1/tenant/.*/timeline/.*/detach_ancestor request_id=.*: request was dropped before completing"
)
client = (
env.pageserver.http_client() if not sharded else env.storage_controller.pageserver_api()
)
with pytest.raises(PageserverApiException, match=".* no ancestors") as info:
client.detach_ancestor(env.initial_tenant, env.initial_timeline)
assert info.value.status_code == 409
_ = env.neon_cli.create_branch("first_branch")
second_branch = env.neon_cli.create_branch("second_branch", ancestor_branch_name="first_branch")
# funnily enough this does not have a prefix
with pytest.raises(PageserverApiException, match="too many ancestors") as info:
client.detach_ancestor(env.initial_tenant, second_branch)
assert info.value.status_code == 400
def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
"""
Sharded timeline detach ancestor; 4 nodes: 1 stuck, 1 restarted, 2 normal.
Stuck node gets stuck on a pause failpoint for first storage controller request.
Restarted node remains stuck until explicit restart from test code.
We retry the request until storage controller gets 200 OK from all nodes.
"""
branch_name = "soon_detached"
shard_count = 4
neon_env_builder.num_pageservers = shard_count
@@ -621,8 +670,15 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
# FIXME: should this be in the neon_env_builder.init_start?
env.storage_controller.reconcile_until_idle()
# as we will stop a node, make sure there is no clever rebalancing
env.storage_controller.tenant_policy_update(env.initial_tenant, body={"scheduling": "Stop"})
env.storage_controller.allowed_errors.append(".*: Scheduling is disabled by policy Stop .*")
shards = env.storage_controller.locate(env.initial_tenant)
utilized_pageservers = {x["node_id"] for x in shards}
assert len(utilized_pageservers) > 1, "all shards got placed on single pageserver?"
branch_timeline_id = env.neon_cli.create_branch(branch_name, tenant_id=env.initial_tenant)
with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep:
@@ -642,7 +698,79 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
assert Lsn(detail["initdb_lsn"]) < lsn
assert TimelineId(detail["ancestor_timeline_id"]) == env.initial_timeline
env.storage_controller.pageserver_api().detach_ancestor(env.initial_tenant, branch_timeline_id)
# make one of the nodes get stuck, but continue the initial operation
# make another of the nodes get stuck, then restart
stuck = pageservers[int(shards[0]["node_id"])]
stuck.allowed_errors.append(".*: request was dropped before completing")
env.storage_controller.allowed_errors.append(".*: request was dropped before completing")
stuck_http = stuck.http_client()
stuck_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking_pausable", "pause")
)
restarted = pageservers[int(shards[1]["node_id"])]
restarted.allowed_errors.extend(
[
".*: request was dropped before completing",
".*: Cancelled request finished with an error: ShuttingDown",
]
)
assert restarted.id != stuck.id
restarted_http = restarted.http_client()
restarted_http.configure_failpoints(
[
("timeline-detach-ancestor::before_starting_after_locking_pausable", "pause"),
]
)
target = env.storage_controller.pageserver_api()
with pytest.raises(ReadTimeout):
target.detach_ancestor(env.initial_tenant, branch_timeline_id, timeout=1)
stuck_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking_pausable", "off")
)
barrier = threading.Barrier(2)
def restart_restarted():
barrier.wait()
# graceful shutdown should just work, because simultaneously unpaused
restarted.stop()
# this does not happen always, depends how fast we exit after unpausing
# restarted.assert_log_contains("Cancelled request finished with an error: ShuttingDown")
restarted.start()
with ThreadPoolExecutor(max_workers=1) as pool:
fut = pool.submit(restart_restarted)
barrier.wait()
# we have 10s, lets use 1/2 of that to help the shutdown start
time.sleep(5)
restarted_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking_pausable", "off")
)
fut.result()
# detach ancestor request handling is not sensitive to http cancellation.
# this means that the "stuck" is on its way to complete the detach, but the restarted is off
# now it can either be complete on all nodes, or still in progress with
# one.
without_retrying = target.without_status_retrying()
# this retry loop will be long enough that the tenant can always activate
reparented = None
for _ in range(10):
try:
reparented = without_retrying.detach_ancestor(env.initial_tenant, branch_timeline_id)
except PageserverApiException as info:
assert info.status_code == 503
time.sleep(2)
else:
break
assert reparented == [], "too many retries (None) or unexpected reparentings"
for shard_info in shards:
node_id = int(shard_info["node_id"])
@@ -661,8 +789,262 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
assert count == 10000
@pytest.mark.parametrize("mode", ["delete_timeline", "delete_tenant"])
@pytest.mark.parametrize("sharded", [False, True])
def test_timeline_detach_ancestor_interrupted_by_deletion(
neon_env_builder: NeonEnvBuilder, mode: str, sharded: bool
):
"""
Timeline ancestor detach interrupted by deleting either:
- the detached timeline
- the whole tenant
after starting the detach.
What remains not tested by this:
- shutdown winning over complete
Shutdown winning over complete needs gc blocking and reparenting any left-overs on retry.
"""
if sharded and mode == "delete_tenant":
# the shared/exclusive lock for tenant is blocking this:
# timeline detach ancestor takes shared, delete tenant takes exclusive
pytest.skip(
"tenant deletion while timeline ancestor detach is underway is not supported yet"
)
shard_count = 2 if sharded else 1
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count if sharded else None)
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
pageservers = dict((int(p.id), p) for p in env.pageservers)
detached_timeline = env.neon_cli.create_branch("detached soon", "main")
failpoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
assert len(set(info["node_id"] for info in shards)) == shard_count
target = env.storage_controller.pageserver_api() if sharded else env.pageserver.http_client()
target = target.without_status_retrying()
victim = pageservers[int(shards[-1]["node_id"])]
victim_http = victim.http_client()
victim_http.configure_failpoints((failpoint, "pause"))
def detach_ancestor():
target.detach_ancestor(env.initial_tenant, detached_timeline)
def at_failpoint() -> Tuple[str, LogCursor]:
return victim.assert_log_contains(f"at failpoint {failpoint}")
def start_delete():
if mode == "delete_timeline":
target.timeline_delete(env.initial_tenant, detached_timeline)
elif mode == "delete_tenant":
target.tenant_delete(env.initial_tenant)
else:
raise RuntimeError(f"unimplemented mode {mode}")
def at_waiting_on_gate_close(start_offset: LogCursor) -> LogCursor:
_, offset = victim.assert_log_contains(
"closing is taking longer than expected", offset=start_offset
)
return offset
def is_deleted():
try:
if mode == "delete_timeline":
target.timeline_detail(env.initial_tenant, detached_timeline)
elif mode == "delete_tenant":
target.tenant_status(env.initial_tenant)
else:
return False
except PageserverApiException as e:
assert e.status_code == 404
return True
else:
raise RuntimeError("waiting for 404")
with ThreadPoolExecutor(max_workers=2) as pool:
try:
fut = pool.submit(detach_ancestor)
_, offset = wait_until(10, 1.0, at_failpoint)
delete = pool.submit(start_delete)
wait_until(10, 1.0, lambda: at_waiting_on_gate_close(offset))
victim_http.configure_failpoints((failpoint, "off"))
delete.result()
assert wait_until(10, 1.0, is_deleted), f"unimplemented mode {mode}"
with pytest.raises(PageserverApiException) as exc:
fut.result()
assert exc.value.status_code == 503
finally:
victim_http.configure_failpoints((failpoint, "off"))
@pytest.mark.parametrize("mode", ["delete_reparentable_timeline"])
def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnvBuilder, mode: str):
"""
Technically possible storage controller concurrent interleaving timeline
deletion with timeline detach.
Deletion is fine, as any sharded pageservers reach the same end state, but
creating reparentable timeline would create an issue as the two nodes would
never agree. There is a solution though: the created reparentable timeline
must be detached.
"""
assert (
mode == "delete_reparentable_timeline"
), "only one now, but we could have the create just as well, need gc blocking"
shard_count = 2
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
pageservers = dict((int(p.id), p) for p in env.pageservers)
env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
assert len(set(x["node_id"] for x in shards)) == shard_count
with env.endpoints.create_start("main") as ep:
ep.safe_psql("create table foo as select i::bigint from generate_series(1, 1000) t(i)")
# as the interleaved operation, we will delete this timeline, which was reparenting candidate
first_branch_lsn = wait_for_last_flush_lsn(
env, ep, env.initial_tenant, env.initial_timeline
)
for ps, shard_id in [(pageservers[int(x["node_id"])], x["shard_id"]) for x in shards]:
ps.http_client().timeline_checkpoint(shard_id, env.initial_timeline)
ep.safe_psql("create table bar as select i::bigint from generate_series(1, 2000) t(i)")
detached_branch_lsn = flush_ep_to_pageserver(
env, ep, env.initial_tenant, env.initial_timeline
)
for ps, shard_id in [(pageservers[int(x["node_id"])], x["shard_id"]) for x in shards]:
ps.http_client().timeline_checkpoint(shard_id, env.initial_timeline)
first_branch = env.neon_cli.create_branch(
"first_branch", ancestor_branch_name="main", ancestor_start_lsn=first_branch_lsn
)
detached_branch = env.neon_cli.create_branch(
"detached_branch", ancestor_branch_name="main", ancestor_start_lsn=detached_branch_lsn
)
pausepoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
stuck = pageservers[int(shards[0]["node_id"])]
stuck_http = stuck.http_client().without_status_retrying()
stuck_http.configure_failpoints((pausepoint, "pause"))
victim = pageservers[int(shards[-1]["node_id"])]
victim_http = victim.http_client().without_status_retrying()
victim_http.configure_failpoints(
(pausepoint, "pause"),
)
# noticed a surprising 409 if the other one would fail instead
# victim_http.configure_failpoints([
# (pausepoint, "pause"),
# ("timeline-detach-ancestor::before_starting_after_locking", "return"),
# ])
# interleaving a create_timeline which could be reparented will produce two
# permanently different reparentings: one node has reparented, other has
# not
#
# with deletion there is no such problem
def detach_timeline():
env.storage_controller.pageserver_api().detach_ancestor(env.initial_tenant, detached_branch)
def paused_at_failpoint():
stuck.assert_log_contains(f"at failpoint {pausepoint}")
victim.assert_log_contains(f"at failpoint {pausepoint}")
def first_completed():
detail = stuck_http.timeline_detail(shards[0]["shard_id"], detached_branch)
log.info(detail)
assert detail.get("ancestor_lsn") is None
def first_branch_gone():
try:
env.storage_controller.pageserver_api().timeline_detail(
env.initial_tenant, first_branch
)
except PageserverApiException as e:
log.info(f"error {e}")
assert e.status_code == 404
else:
log.info("still ok")
raise RuntimeError("not done yet")
with ThreadPoolExecutor(max_workers=1) as pool:
try:
fut = pool.submit(detach_timeline)
wait_until(10, 1.0, paused_at_failpoint)
# let stuck complete
stuck_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_completed)
# if we would let victim fail, for some reason there'd be a 409 response instead of 500
# victim_http.configure_failpoints((pausepoint, "off"))
# with pytest.raises(PageserverApiException, match=".* 500 Internal Server Error failpoint: timeline-detach-ancestor::before_starting_after_locking") as exc:
# fut.result()
# assert exc.value.status_code == 409
env.storage_controller.pageserver_api().timeline_delete(
env.initial_tenant, first_branch
)
victim_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_branch_gone)
# it now passes, and we should get an error messages about mixed reparenting as the stuck still had something to reparent
fut.result()
msg, offset = env.storage_controller.assert_log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*"
)
log.info(f"expected error message: {msg}")
env.storage_controller.allowed_errors.append(
".*: shards returned different results matching=0 .*"
)
detach_timeline()
# FIXME: perhaps the above should be automatically retried, if we get mixed results?
not_found = env.storage_controller.log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*",
offset=offset,
)
assert not_found is None
finally:
stuck_http.configure_failpoints((pausepoint, "off"))
victim_http.configure_failpoints((pausepoint, "off"))
# TODO:
# - after starting the operation, tenant is deleted
# - after starting the operation, pageserver is shutdown, restarted
# - after starting the operation, bottom-most timeline is deleted, pageserver is restarted, gc is inhibited
# - deletion of reparented while reparenting should fail once, then succeed (?)
@@ -670,9 +1052,5 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
# - investigate: why are layers started at uneven lsn? not just after branching, but in general.
#
# TEST: 1. tad which partially succeeds, one returns 500
# 2. create branch below timeline? or delete timeline below
# 2. create branch below timeline? ~or delete reparented timeline~ (done)
# 3. on retry all should report the same reparented timelines
#
# TEST: 1. tad is started, one node stalls, other restarts
# 2. client timeout before stall over
# 3. on retry with stalled and other being able to proceed