stop ordering reparented timelines by (ancestor_lsn, timeline_id)

I was thinking of the case where multiple timelines are reparented at the same ancestor_lsn, but that is not a problem when the reparented timelines are compared as a set.
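
A minimal sketch of the reasoning, not part of the patch itself (the string timeline ids below are placeholders): a Vec response only compares equal when the order matches, which is presumably why the old code sorted by (ancestor_lsn, timeline_id) before returning; a HashSet compares by membership alone, so ties on ancestor_lsn no longer matter.

    use std::collections::HashSet;

    fn main() {
        // Two shards reporting the same reparented timelines in different orders.
        let shard_a = vec!["tl-aaaa", "tl-bbbb", "tl-cccc"];
        let shard_b = vec!["tl-cccc", "tl-aaaa", "tl-bbbb"];

        // Vec equality is order-sensitive; without a deterministic sort on both
        // sides this comparison would spuriously mismatch.
        assert_ne!(shard_a, shard_b);

        // HashSet equality only compares membership, so no ordering is needed,
        // even when several timelines share the same ancestor_lsn.
        let set_a: HashSet<&str> = shard_a.into_iter().collect();
        let set_b: HashSet<&str> = shard_b.into_iter().collect();
        assert_eq!(set_a, set_b);
    }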
@@ -1,6 +1,8 @@
+use std::collections::HashSet;
+
 use utils::id::TimelineId;
 
 #[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
 pub struct AncestorDetached {
-    pub reparented_timelines: Vec<TimelineId>,
+    pub reparented_timelines: HashSet<TimelineId>,
 }
@@ -1832,7 +1832,6 @@ async fn timeline_detach_ancestor_handler(
             detach_ancestor::Progress::Done(resp) => resp,
         };

-        // FIXME: if the ordering is really needed and not a hashset, move it here?
        json_response(StatusCode::OK, resp)
    }
    .instrument(span)
@@ -13,7 +13,7 @@ use pageserver_api::upcall_api::ReAttachResponseTenant;
 use rand::{distributions::Alphanumeric, Rng};
 use std::borrow::Cow;
 use std::cmp::Ordering;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::ops::Deref;
 use std::sync::Arc;
 use std::time::Duration;
@@ -1975,7 +1975,7 @@ impl TenantManager {
        prepared: PreparedTimelineDetach,
        mut attempt: detach_ancestor::Attempt,
        ctx: &RequestContext,
-    ) -> Result<Vec<TimelineId>, anyhow::Error> {
+    ) -> Result<HashSet<TimelineId>, anyhow::Error> {
        // FIXME: this is unnecessary, slotguard already has these semantics
        struct RevertOnDropSlot(Option<SlotGuard>);

@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{collections::HashSet, sync::Arc};
 
 use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
 use crate::{
@@ -647,16 +647,7 @@ pub(super) async fn prepare(
        ));
    }

-    // `detached` has previously been detached; let's inspect each of the current timelines and
-    // report back the timelines which have been reparented by our detach, or which are still
-    // reparentable
-
    let reparented_timelines = reparented_direct_children(detached, tenant)?;
-
-    // IDEA? add the non-reparented in to the response -- these would be the reparentable, but
-    // no longer reparentable because they appeared *after* gc blocking was released.
-    //
-    // will not be needed once we have the locking in.
    return Ok(Progress::Done(AncestorDetached {
        reparented_timelines,
    }));
@@ -862,7 +853,7 @@ pub(super) async fn prepare(
 fn reparented_direct_children(
     detached: &Arc<Timeline>,
     tenant: &Tenant,
-) -> Result<Vec<TimelineId>, Error> {
+) -> Result<HashSet<TimelineId>, Error> {
     let mut all_direct_children = tenant
         .timelines
         .lock()
@@ -872,7 +863,7 @@ fn reparented_direct_children(
            let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));

            if is_direct_child {
-                Some((tl.ancestor_lsn, tl.clone()))
+                Some(tl.clone())
            } else {
                if let Some(timeline) = tl.ancestor_timeline.as_ref() {
                    assert_ne!(timeline.timeline_id, detached.timeline_id);
@@ -886,34 +877,27 @@ fn reparented_direct_children(

    let mut any_shutdown = false;

-    all_direct_children.retain(
-        |(_, tl)| match tl.remote_client.initialized_upload_queue() {
-            Ok(accessor) => accessor
-                .latest_uploaded_index_part()
-                .lineage
-                .is_reparented(),
-            Err(_shutdownalike) => {
-                // not 100% a shutdown, but let's bail early not to give inconsistent results in
-                // sharded enviroment.
-                any_shutdown = true;
-                true
-            }
-        },
-    );
+    all_direct_children.retain(|tl| match tl.remote_client.initialized_upload_queue() {
+        Ok(accessor) => accessor
+            .latest_uploaded_index_part()
+            .lineage
+            .is_reparented(),
+        Err(_shutdownalike) => {
+            // not 100% a shutdown, but let's bail early not to give inconsistent results in
+            // sharded enviroment.
+            any_shutdown = true;
+            true
+        }
+    });

    if any_shutdown {
        // it could be one or many being deleted; have client retry
        return Err(Error::ShuttingDown);
    }

-    let mut reparented = all_direct_children;
-    // why this instead of hashset? there is a reason, but I've forgotten it many times.
-    //
-    // maybe if this was a hashset we would not be able to distinguish some race condition.
-    reparented.sort_unstable_by_key(|(lsn, tl)| (*lsn, tl.timeline_id));
-    Ok(reparented
+    Ok(all_direct_children
        .into_iter()
-        .map(|(_, tl)| tl.timeline_id)
+        .map(|tl| tl.timeline_id)
        .collect())
 }

@@ -1070,7 +1054,7 @@ async fn remote_copy(
 pub(crate) enum DetachingAndReparenting {
    /// All of the following timeline ids were reparented and the timeline ancestor detach must be
    /// marked as completed.
-    Reparented(Vec<TimelineId>),
+    Reparented(HashSet<TimelineId>),

    /// Some of the reparentings failed. The timeline ancestor detach must **not** be marked as
    /// completed.
@@ -1080,7 +1064,7 @@ pub(crate) enum DetachingAndReparenting {

    /// Detaching and reparentings were completed in a previous attempt. Timeline ancestor detach
    /// must be marked as completed.
-    AlreadyDone(Vec<TimelineId>),
+    AlreadyDone(HashSet<TimelineId>),
 }

 impl DetachingAndReparenting {
@@ -1093,7 +1077,7 @@ impl DetachingAndReparenting {
        }
    }

-    pub(crate) fn completed(self) -> Option<Vec<TimelineId>> {
+    pub(crate) fn completed(self) -> Option<HashSet<TimelineId>> {
        use DetachingAndReparenting::*;
        match self {
            Reparented(x) | AlreadyDone(x) => Some(x),
@@ -2868,7 +2868,6 @@ impl Service {
        }

        // no shard needs to go first/last; the operation should be idempotent
        // TODO: it would be great to ensure that all shards return the same error
        let mut results = self
            .tenant_for_shards(targets, |tenant_shard_id, node| {
                futures::FutureExt::boxed(detach_one(
@@ -2887,6 +2886,7 @@ impl Service {
            .filter(|(_, res)| res != &any.1)
            .collect::<Vec<_>>();
        if !mismatching.is_empty() {
+            // this can be hit by races which should not happen because operation lock on cplane
            let matching = results.len() - mismatching.len();
            tracing::error!(
                matching,
@@ -837,7 +837,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
        timeline_id: TimelineId,
        batch_size: int | None = None,
        **kwargs,
-    ) -> List[TimelineId]:
+    ) -> Set[TimelineId]:
        params = {}
        if batch_size is not None:
            params["batch_size"] = batch_size
@@ -848,7 +848,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
        )
        self.verbose_error(res)
        json = res.json()
-        return list(map(TimelineId, json["reparented_timelines"]))
+        return set(map(TimelineId, json["reparented_timelines"]))

    def evict_layer(
        self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
@@ -165,7 +165,7 @@ def test_ancestor_detach_branched_from(
    )

    all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
-    assert all_reparented == []
+    assert all_reparented == set()

    if restart_after:
        env.pageserver.stop()
@@ -534,7 +534,7 @@ def test_compaction_induced_by_detaches_in_history(

    for _, timeline_id in skip_main:
        reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
-        assert reparented == [], "we have no earlier branches at any level"
+        assert reparented == set(), "we have no earlier branches at any level"

    post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
    assert len(post_detach_l0s) == 5, "should had inherited 4 L0s, have 5 in total"
@@ -774,7 +774,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
    else:
        break

-    assert reparented == [], "too many retries (None) or unexpected reparentings"
+    assert reparented == set(), "too many retries (None) or unexpected reparentings"

    for shard_info in shards:
        node_id = int(shard_info["node_id"])
@@ -1203,7 +1203,7 @@ def test_retried_detach_ancestor_after_failed_reparenting(neon_env_builder: Neon
    http.configure_failpoints(("timeline-detach-ancestor::complete_before_uploading", "off"))

    reparented_resp = http.detach_ancestor(env.initial_tenant, detached)
-    assert reparented_resp == timelines
+    assert reparented_resp == set(timelines)
    # no need to quiesce_tenants anymore, because completion does that

    reparented, not_reparented = reparenting_progress(timelines)