diff --git a/libs/pageserver_api/src/models/detach_ancestor.rs b/libs/pageserver_api/src/models/detach_ancestor.rs index ae5a21bab9..ad74d343ae 100644 --- a/libs/pageserver_api/src/models/detach_ancestor.rs +++ b/libs/pageserver_api/src/models/detach_ancestor.rs @@ -1,6 +1,8 @@ +use std::collections::HashSet; + use utils::id::TimelineId; #[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)] pub struct AncestorDetached { - pub reparented_timelines: Vec, + pub reparented_timelines: HashSet, } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 7901fc3554..3f592f167e 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -13,7 +13,7 @@ use pageserver_api::upcall_api::ReAttachResponseTenant; use rand::{distributions::Alphanumeric, Rng}; use std::borrow::Cow; use std::cmp::Ordering; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; @@ -1966,7 +1966,8 @@ impl TenantManager { timeline_id: TimelineId, prepared: PreparedTimelineDetach, ctx: &RequestContext, - ) -> Result, anyhow::Error> { + ) -> Result, anyhow::Error> { + // FIXME: this is unnecessary, slotguard already has these semantics struct RevertOnDropSlot(Option); impl Drop for RevertOnDropSlot { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 8f9ff78fd8..76dcb5645f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3286,10 +3286,6 @@ impl Timeline { Ok(ancestor.clone()) } - pub(crate) fn get_ancestor_timeline(&self) -> Option> { - self.ancestor_timeline.clone() - } - pub(crate) fn get_shard_identity(&self) -> &ShardIdentity { &self.shard_identity } @@ -4366,7 +4362,7 @@ impl Timeline { tenant: &crate::tenant::Tenant, prepared: detach_ancestor::PreparedTimelineDetach, ctx: &RequestContext, - ) -> Result, anyhow::Error> { + ) -> Result, anyhow::Error> { 
detach_ancestor::complete(self, tenant, prepared, ctx).await } diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 7f63b53e86..3b52adc77b 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use super::{layer_manager::LayerManager, FlushLayerError, Timeline}; use crate::{ @@ -146,50 +146,9 @@ pub(super) async fn prepare( } } - // detached has previously been detached; let's inspect each of the current timelines and - // report back the timelines which have been reparented by our detach - let mut all_direct_children = tenant - .timelines - .lock() - .unwrap() - .values() - .filter(|tl| matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached))) - .map(|tl| (tl.ancestor_lsn, tl.clone())) - .collect::>(); - - let mut any_shutdown = false; - - all_direct_children.retain( - |(_, tl)| match tl.remote_client.initialized_upload_queue() { - Ok(accessor) => accessor - .latest_uploaded_index_part() - .lineage - .is_reparented(), - Err(_shutdownalike) => { - // not 100% a shutdown, but let's bail early not to give inconsistent results in - // sharded enviroment. - any_shutdown = true; - true - } - }, - ); - - if any_shutdown { - // it could be one or many being deleted; have client retry - return Err(Error::ShuttingDown); - } - - let mut reparented = all_direct_children; - // why this instead of hashset? there is a reason, but I've forgotten it many times. - // - // maybe if this was a hashset we would not be able to distinguish some race condition. 
- reparented.sort_unstable_by_key(|(lsn, tl)| (*lsn, tl.timeline_id)); - + let reparented_timelines = reparented_direct_children(detached, tenant)?; return Ok(Progress::Done(AncestorDetached { - reparented_timelines: reparented - .into_iter() - .map(|(_, tl)| tl.timeline_id) - .collect(), + reparented_timelines, })); }; @@ -386,6 +345,57 @@ pub(super) async fn prepare( Ok(Progress::Prepared(guard, prepared)) } +fn reparented_direct_children( + detached: &Arc, + tenant: &Tenant, +) -> Result, Error> { + let mut all_direct_children = tenant + .timelines + .lock() + .unwrap() + .values() + .filter_map(|tl| { + let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached)); + + if is_direct_child { + Some(tl.clone()) + } else { + if let Some(timeline) = tl.ancestor_timeline.as_ref() { + assert_ne!(timeline.timeline_id, detached.timeline_id, "we cannot have two timelines with the same timeline_id live"); + } + None + } + }) + // Collect to avoid lock taking order problem with Tenant::timelines and + // Timeline::remote_client + .collect::>(); + + let mut any_shutdown = false; + + all_direct_children.retain(|tl| match tl.remote_client.initialized_upload_queue() { + Ok(accessor) => accessor + .latest_uploaded_index_part() + .lineage + .is_reparented(), + Err(_shutdownalike) => { + // not 100% a shutdown, but let's bail early not to give inconsistent results in + // sharded environment. 
+ any_shutdown = true; + true + } + }); + + if any_shutdown { + // it could be one or many being deleted; have client retry + return Err(Error::ShuttingDown); + } + + Ok(all_direct_children + .into_iter() + .map(|tl| tl.timeline_id) + .collect()) +} + fn partition_work( ancestor_lsn: Lsn, source: &LayerManager, @@ -544,11 +554,12 @@ pub(super) async fn complete( tenant: &Tenant, prepared: PreparedTimelineDetach, _ctx: &RequestContext, -) -> Result, anyhow::Error> { +) -> Result, anyhow::Error> { let PreparedTimelineDetach { layers } = prepared; let ancestor = detached - .get_ancestor_timeline() + .ancestor_timeline + .as_ref() .expect("must still have a ancestor"); let ancestor_lsn = detached.get_ancestor_lsn(); @@ -588,7 +599,7 @@ pub(super) async fn complete( } let tl_ancestor = tl.ancestor_timeline.as_ref()?; - let is_same = Arc::ptr_eq(&ancestor, tl_ancestor); + let is_same = Arc::ptr_eq(ancestor, tl_ancestor); let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn; let is_deleting = tl @@ -629,13 +640,18 @@ pub(super) async fn complete( }); let reparenting_candidates = tasks.len(); - let mut reparented = Vec::with_capacity(tasks.len()); + let mut reparented = HashSet::with_capacity(tasks.len()); while let Some(res) = tasks.join_next().await { match res { Ok(Some(timeline)) => { tracing::info!(reparented=%timeline.timeline_id, "reparenting done"); - reparented.push((timeline.ancestor_lsn, timeline.timeline_id)); + + assert!( + reparented.insert(timeline.timeline_id), + "duplicate reparenting? timeline_id={}", + timeline.timeline_id + ); } Ok(None) => { // lets just ignore this for now. 
one or all reparented timelines could had @@ -657,12 +673,5 @@ pub(super) async fn complete( tracing::info!("failed to reparent some candidates"); } - reparented.sort_unstable(); - - let reparented = reparented - .into_iter() - .map(|(_, timeline_id)| timeline_id) - .collect(); - Ok(reparented) } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 6940bf2c64..e391ce65e6 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -2954,7 +2954,6 @@ impl Service { } // no shard needs to go first/last; the operation should be idempotent - // TODO: it would be great to ensure that all shards return the same error let mut results = self .tenant_for_shards(targets, |tenant_shard_id, node| { futures::FutureExt::boxed(detach_one( @@ -2973,6 +2972,7 @@ impl Service { .filter(|(_, res)| res != &any.1) .collect::>(); if !mismatching.is_empty() { + // this can be hit by races which should not happen because operation lock on cplane let matching = results.len() - mismatching.len(); tracing::error!( matching, diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 5be59d3749..65d6ff5d62 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -857,7 +857,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): timeline_id: TimelineId, batch_size: int | None = None, **kwargs, - ) -> List[TimelineId]: + ) -> Set[TimelineId]: params = {} if batch_size is not None: params["batch_size"] = batch_size @@ -868,7 +868,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): ) self.verbose_error(res) json = res.json() - return list(map(TimelineId, json["reparented_timelines"])) + return set(map(TimelineId, json["reparented_timelines"])) def evict_layer( self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str diff --git a/test_runner/regress/test_timeline_detach_ancestor.py 
b/test_runner/regress/test_timeline_detach_ancestor.py index 38f8dfa885..b3767a2766 100644 --- a/test_runner/regress/test_timeline_detach_ancestor.py +++ b/test_runner/regress/test_timeline_detach_ancestor.py @@ -165,7 +165,7 @@ def test_ancestor_detach_branched_from( ) all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id) - assert all_reparented == [] + assert all_reparented == set() if restart_after: env.pageserver.stop() @@ -534,7 +534,7 @@ def test_compaction_induced_by_detaches_in_history( for _, timeline_id in skip_main: reparented = client.detach_ancestor(env.initial_tenant, timeline_id) - assert reparented == [], "we have no earlier branches at any level" + assert reparented == set(), "we have no earlier branches at any level" post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id))) assert len(post_detach_l0s) == 5, "should had inherited 4 L0s, have 5 in total" @@ -774,7 +774,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder): else: break - assert reparented == [], "too many retries (None) or unexpected reparentings" + assert reparented == set(), "too many retries (None) or unexpected reparentings" for shard_info in shards: node_id = int(shard_info["node_id"])