mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
refactor(timeline_detach_ancestor): replace ordered reparented with a hashset (#8629)
Earlier I was thinking we'd need a (ancestor_lsn, timeline_id) ordered list of reparented. Turns out we did not need it at all. Replace it with an unordered hashset. Additionally refactor the reparented direct children query out, it will later be used from more places. Split off from #8430. Cc: #6994
This commit is contained in:
committed by
John Spray
parent
658d763915
commit
40e3c913bb
@@ -1,6 +1,8 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use utils::id::TimelineId;
|
||||
|
||||
#[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct AncestorDetached {
|
||||
pub reparented_timelines: Vec<TimelineId>,
|
||||
pub reparented_timelines: HashSet<TimelineId>,
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use pageserver_api::upcall_api::ReAttachResponseTenant;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use std::borrow::Cow;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -1966,7 +1966,8 @@ impl TenantManager {
|
||||
timeline_id: TimelineId,
|
||||
prepared: PreparedTimelineDetach,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TimelineId>, anyhow::Error> {
|
||||
) -> Result<HashSet<TimelineId>, anyhow::Error> {
|
||||
// FIXME: this is unnecessary, slotguard already has these semantics
|
||||
struct RevertOnDropSlot(Option<SlotGuard>);
|
||||
|
||||
impl Drop for RevertOnDropSlot {
|
||||
|
||||
@@ -3286,10 +3286,6 @@ impl Timeline {
|
||||
Ok(ancestor.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn get_ancestor_timeline(&self) -> Option<Arc<Timeline>> {
|
||||
self.ancestor_timeline.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
|
||||
&self.shard_identity
|
||||
}
|
||||
@@ -4366,7 +4362,7 @@ impl Timeline {
|
||||
tenant: &crate::tenant::Tenant,
|
||||
prepared: detach_ancestor::PreparedTimelineDetach,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TimelineId>, anyhow::Error> {
|
||||
) -> Result<HashSet<TimelineId>, anyhow::Error> {
|
||||
detach_ancestor::complete(self, tenant, prepared, ctx).await
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
|
||||
use crate::{
|
||||
@@ -146,50 +146,9 @@ pub(super) async fn prepare(
|
||||
}
|
||||
}
|
||||
|
||||
// detached has previously been detached; let's inspect each of the current timelines and
|
||||
// report back the timelines which have been reparented by our detach
|
||||
let mut all_direct_children = tenant
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter(|tl| matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached)))
|
||||
.map(|tl| (tl.ancestor_lsn, tl.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut any_shutdown = false;
|
||||
|
||||
all_direct_children.retain(
|
||||
|(_, tl)| match tl.remote_client.initialized_upload_queue() {
|
||||
Ok(accessor) => accessor
|
||||
.latest_uploaded_index_part()
|
||||
.lineage
|
||||
.is_reparented(),
|
||||
Err(_shutdownalike) => {
|
||||
// not 100% a shutdown, but let's bail early not to give inconsistent results in
|
||||
// sharded enviroment.
|
||||
any_shutdown = true;
|
||||
true
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
if any_shutdown {
|
||||
// it could be one or many being deleted; have client retry
|
||||
return Err(Error::ShuttingDown);
|
||||
}
|
||||
|
||||
let mut reparented = all_direct_children;
|
||||
// why this instead of hashset? there is a reason, but I've forgotten it many times.
|
||||
//
|
||||
// maybe if this was a hashset we would not be able to distinguish some race condition.
|
||||
reparented.sort_unstable_by_key(|(lsn, tl)| (*lsn, tl.timeline_id));
|
||||
|
||||
let reparented_timelines = reparented_direct_children(detached, tenant)?;
|
||||
return Ok(Progress::Done(AncestorDetached {
|
||||
reparented_timelines: reparented
|
||||
.into_iter()
|
||||
.map(|(_, tl)| tl.timeline_id)
|
||||
.collect(),
|
||||
reparented_timelines,
|
||||
}));
|
||||
};
|
||||
|
||||
@@ -386,6 +345,57 @@ pub(super) async fn prepare(
|
||||
Ok(Progress::Prepared(guard, prepared))
|
||||
}
|
||||
|
||||
fn reparented_direct_children(
|
||||
detached: &Arc<Timeline>,
|
||||
tenant: &Tenant,
|
||||
) -> Result<HashSet<TimelineId>, Error> {
|
||||
let mut all_direct_children = tenant
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter_map(|tl| {
|
||||
let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));
|
||||
|
||||
if is_direct_child {
|
||||
Some(tl.clone())
|
||||
} else {
|
||||
if let Some(timeline) = tl.ancestor_timeline.as_ref() {
|
||||
assert_ne!(timeline.timeline_id, detached.timeline_id, "we cannot have two timelines with the same timeline_id live");
|
||||
}
|
||||
None
|
||||
}
|
||||
})
|
||||
// Collect to avoid lock taking order problem with Tenant::timelines and
|
||||
// Timeline::remote_client
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut any_shutdown = false;
|
||||
|
||||
all_direct_children.retain(|tl| match tl.remote_client.initialized_upload_queue() {
|
||||
Ok(accessor) => accessor
|
||||
.latest_uploaded_index_part()
|
||||
.lineage
|
||||
.is_reparented(),
|
||||
Err(_shutdownalike) => {
|
||||
// not 100% a shutdown, but let's bail early not to give inconsistent results in
|
||||
// sharded enviroment.
|
||||
any_shutdown = true;
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
if any_shutdown {
|
||||
// it could be one or many being deleted; have client retry
|
||||
return Err(Error::ShuttingDown);
|
||||
}
|
||||
|
||||
Ok(all_direct_children
|
||||
.into_iter()
|
||||
.map(|tl| tl.timeline_id)
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn partition_work(
|
||||
ancestor_lsn: Lsn,
|
||||
source: &LayerManager,
|
||||
@@ -544,11 +554,12 @@ pub(super) async fn complete(
|
||||
tenant: &Tenant,
|
||||
prepared: PreparedTimelineDetach,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<Vec<TimelineId>, anyhow::Error> {
|
||||
) -> Result<HashSet<TimelineId>, anyhow::Error> {
|
||||
let PreparedTimelineDetach { layers } = prepared;
|
||||
|
||||
let ancestor = detached
|
||||
.get_ancestor_timeline()
|
||||
.ancestor_timeline
|
||||
.as_ref()
|
||||
.expect("must still have a ancestor");
|
||||
let ancestor_lsn = detached.get_ancestor_lsn();
|
||||
|
||||
@@ -588,7 +599,7 @@ pub(super) async fn complete(
|
||||
}
|
||||
|
||||
let tl_ancestor = tl.ancestor_timeline.as_ref()?;
|
||||
let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
|
||||
let is_same = Arc::ptr_eq(ancestor, tl_ancestor);
|
||||
let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
|
||||
|
||||
let is_deleting = tl
|
||||
@@ -629,13 +640,18 @@ pub(super) async fn complete(
|
||||
});
|
||||
|
||||
let reparenting_candidates = tasks.len();
|
||||
let mut reparented = Vec::with_capacity(tasks.len());
|
||||
let mut reparented = HashSet::with_capacity(tasks.len());
|
||||
|
||||
while let Some(res) = tasks.join_next().await {
|
||||
match res {
|
||||
Ok(Some(timeline)) => {
|
||||
tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
|
||||
reparented.push((timeline.ancestor_lsn, timeline.timeline_id));
|
||||
|
||||
assert!(
|
||||
reparented.insert(timeline.timeline_id),
|
||||
"duplicate reparenting? timeline_id={}",
|
||||
timeline.timeline_id
|
||||
);
|
||||
}
|
||||
Ok(None) => {
|
||||
// lets just ignore this for now. one or all reparented timelines could had
|
||||
@@ -657,12 +673,5 @@ pub(super) async fn complete(
|
||||
tracing::info!("failed to reparent some candidates");
|
||||
}
|
||||
|
||||
reparented.sort_unstable();
|
||||
|
||||
let reparented = reparented
|
||||
.into_iter()
|
||||
.map(|(_, timeline_id)| timeline_id)
|
||||
.collect();
|
||||
|
||||
Ok(reparented)
|
||||
}
|
||||
|
||||
@@ -2954,7 +2954,6 @@ impl Service {
|
||||
}
|
||||
|
||||
// no shard needs to go first/last; the operation should be idempotent
|
||||
// TODO: it would be great to ensure that all shards return the same error
|
||||
let mut results = self
|
||||
.tenant_for_shards(targets, |tenant_shard_id, node| {
|
||||
futures::FutureExt::boxed(detach_one(
|
||||
@@ -2973,6 +2972,7 @@ impl Service {
|
||||
.filter(|(_, res)| res != &any.1)
|
||||
.collect::<Vec<_>>();
|
||||
if !mismatching.is_empty() {
|
||||
// this can be hit by races which should not happen because operation lock on cplane
|
||||
let matching = results.len() - mismatching.len();
|
||||
tracing::error!(
|
||||
matching,
|
||||
|
||||
@@ -857,7 +857,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
timeline_id: TimelineId,
|
||||
batch_size: int | None = None,
|
||||
**kwargs,
|
||||
) -> List[TimelineId]:
|
||||
) -> Set[TimelineId]:
|
||||
params = {}
|
||||
if batch_size is not None:
|
||||
params["batch_size"] = batch_size
|
||||
@@ -868,7 +868,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
)
|
||||
self.verbose_error(res)
|
||||
json = res.json()
|
||||
return list(map(TimelineId, json["reparented_timelines"]))
|
||||
return set(map(TimelineId, json["reparented_timelines"]))
|
||||
|
||||
def evict_layer(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
|
||||
|
||||
@@ -165,7 +165,7 @@ def test_ancestor_detach_branched_from(
|
||||
)
|
||||
|
||||
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
|
||||
assert all_reparented == []
|
||||
assert all_reparented == set()
|
||||
|
||||
if restart_after:
|
||||
env.pageserver.stop()
|
||||
@@ -534,7 +534,7 @@ def test_compaction_induced_by_detaches_in_history(
|
||||
|
||||
for _, timeline_id in skip_main:
|
||||
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
|
||||
assert reparented == [], "we have no earlier branches at any level"
|
||||
assert reparented == set(), "we have no earlier branches at any level"
|
||||
|
||||
post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
|
||||
assert len(post_detach_l0s) == 5, "should had inherited 4 L0s, have 5 in total"
|
||||
@@ -774,7 +774,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
|
||||
else:
|
||||
break
|
||||
|
||||
assert reparented == [], "too many retries (None) or unexpected reparentings"
|
||||
assert reparented == set(), "too many retries (None) or unexpected reparentings"
|
||||
|
||||
for shard_info in shards:
|
||||
node_id = int(shard_info["node_id"])
|
||||
|
||||
Reference in New Issue
Block a user