diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index a300c8464f..b34ed947c0 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -301,7 +301,12 @@ pub struct PullTimelineRequest { pub tenant_id: TenantId, pub timeline_id: TimelineId, pub http_hosts: Vec, - pub ignore_tombstone: Option, + /// Membership configuration to switch to after pull. + /// It guarantees that if pull_timeline returns successfully, the timeline will + /// not be deleted by request with an older generation. + /// Storage controller always sets this field. + /// None is only allowed for manual pull_timeline requests. + pub mconf: Option, } #[derive(Debug, Serialize, Deserialize)] diff --git a/safekeeper/src/copy_timeline.rs b/safekeeper/src/copy_timeline.rs index 7984c2e2b9..1ab6246206 100644 --- a/safekeeper/src/copy_timeline.rs +++ b/safekeeper/src/copy_timeline.rs @@ -161,9 +161,9 @@ pub async fn handle_request( FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?; // now we have a ready timeline in a temp directory - validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?; + validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path, None).await?; global_timelines - .load_temp_timeline(request.destination_ttid, &tli_dir_path, true) + .load_temp_timeline(request.destination_ttid, &tli_dir_path, None) .await?; Ok(()) diff --git a/safekeeper/src/hadron.rs b/safekeeper/src/hadron.rs index 8c6a912166..f41fe2512d 100644 --- a/safekeeper/src/hadron.rs +++ b/safekeeper/src/hadron.rs @@ -193,7 +193,7 @@ pub async fn hcc_pull_timeline( tenant_id: timeline.tenant_id, timeline_id: timeline.timeline_id, http_hosts: Vec::new(), - ignore_tombstone: None, + mconf: None, }; for host in timeline.peers { if host.0 == conf.my_id.0 { diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index c9d8e7d3b0..9f4c7141ec 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -352,7 +352,7 @@ async fn timeline_exclude_handler(mut request: Request) -> Result, wait_for_peer_timeline_status: bool, ) -> Result { + if let Some(mconf) = &request.mconf { + let sk_id = global_timelines.get_sk_id(); + if !mconf.contains(sk_id) { + return Err(ApiError::BadRequest(anyhow!( + "refused to pull timeline with {mconf}, node {sk_id} is not member of it", + ))); + } + } + let existing_tli = global_timelines.get(TenantTimelineId::new( request.tenant_id, request.timeline_id, )); - if existing_tli.is_ok() { - info!("Timeline {} already exists", request.timeline_id); + if let Ok(timeline) = existing_tli { + let cur_generation = timeline + .read_shared_state() + .await + .sk + .state() + .mconf + .generation; + + info!( + "Timeline {} already exists with generation {cur_generation}", + request.timeline_id, + ); + + if let Some(mconf) = request.mconf { + timeline + .membership_switch(mconf) + .await + .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?; + } + return Ok(PullTimelineResponse { safekeeper_host: None, }); @@ -495,6 +523,19 @@ pub async fn handle_request( for (i, response) in responses.into_iter().enumerate() { match response { Ok(status) => { + if let Some(mconf) = &request.mconf { + if status.mconf.generation > mconf.generation { + // We probably raced with another timeline membership change with higher generation. + // Ignore this request. + return Err(ApiError::Conflict(format!( + "cannot pull timeline with generation {}: timeline {} already exists with generation {} on {}", + mconf.generation, + request.timeline_id, + status.mconf.generation, + http_hosts[i], + ))); + } + } statuses.push((status, i)); } Err(e) => { @@ -593,15 +634,13 @@ pub async fn handle_request( assert!(status.tenant_id == request.tenant_id); assert!(status.timeline_id == request.timeline_id); - let check_tombstone = !request.ignore_tombstone.unwrap_or_default(); - match pull_timeline( status, safekeeper_host, sk_auth_token, http_client, global_timelines, - check_tombstone, + request.mconf, ) .await { @@ -611,6 +650,10 @@ pub async fn handle_request( Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse { safekeeper_host: None, }), + Some(TimelineError::Deleted(_)) => Err(ApiError::Conflict(format!( + "Timeline {}/{} deleted", + request.tenant_id, request.timeline_id + ))), Some(TimelineError::CreationInProgress(_)) => { // We don't return success here because creation might still fail. Err(ApiError::Conflict("Creation in progress".to_owned())) @@ -627,7 +670,7 @@ async fn pull_timeline( sk_auth_token: Option, http_client: reqwest::Client, global_timelines: Arc, - check_tombstone: bool, + mconf: Option, ) -> Result { let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id); info!( @@ -689,8 +732,11 @@ async fn pull_timeline( // fsync temp timeline directory to remember its contents. fsync_async_opt(&tli_dir_path, !conf.no_sync).await?; + let generation = mconf.as_ref().map(|c| c.generation); + // Let's create timeline from temp directory and verify that it's correct - let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?; + let (commit_lsn, flush_lsn) = + validate_temp_timeline(conf, ttid, &tli_dir_path, generation).await?; info!( "finished downloading timeline {}, commit_lsn={}, flush_lsn={}", ttid, commit_lsn, flush_lsn @@ -698,10 +744,20 @@ async fn pull_timeline( assert!(status.commit_lsn <= status.flush_lsn); // Finally, load the timeline. - let _tli = global_timelines - .load_temp_timeline(ttid, &tli_dir_path, check_tombstone) + let timeline = global_timelines + .load_temp_timeline(ttid, &tli_dir_path, generation) .await?; + if let Some(mconf) = mconf { + // Switch to provided mconf to guarantee that the timeline will not + // be deleted by request with older generation. + // The generation might already be higer than the one in mconf, e.g. + // if another membership_switch request was executed between `load_temp_timeline` + // and `membership_switch`, but that's totaly fine. `membership_switch` will + // ignore switch to older generation. + timeline.membership_switch(mconf).await?; + } + Ok(PullTimelineResponse { safekeeper_host: Some(host), }) diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 09ca041e22..6c658d30fb 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -1026,6 +1026,13 @@ where self.state.finish_change(&state).await?; } + if msg.mconf.generation > self.state.mconf.generation && !msg.mconf.contains(self.node_id) { + bail!( + "refused to switch into {}, node {} is not a member of it", + msg.mconf, + self.node_id, + ); + } // Switch into conf given by proposer conf if it is higher. self.state.membership_switch(msg.mconf.clone()).await?; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index b8774b30ea..43b5b3a8d3 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -594,7 +594,7 @@ impl Timeline { /// Cancel the timeline, requesting background activity to stop. Closing /// the `self.gate` waits for that. - pub async fn cancel(&self) { + pub fn cancel(&self) { info!("timeline {} shutting down", self.ttid); self.cancel.cancel(); } @@ -914,6 +914,13 @@ impl Timeline { to: Configuration, ) -> Result { let mut state = self.write_shared_state().await; + // Ensure we don't race with exclude/delete requests by checking the cancellation + // token under the write_shared_state lock. + // Exclude/delete cancel the timeline under the shared state lock, + // so the timeline cannot be deleted in the middle of the membership switch. + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.ttid)); + } state.sk.membership_switch(to).await } diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index a81a7298a9..f63d1abdcf 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -10,13 +10,13 @@ use std::time::{Duration, Instant}; use anyhow::{Context, Result, bail}; use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; -use safekeeper_api::membership::Configuration; +use safekeeper_api::membership::{Configuration, SafekeeperGeneration}; use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult}; use safekeeper_api::{ServerInfo, membership}; use tokio::fs; use tracing::*; use utils::crashsafe::{durable_rename, fsync_async_opt}; -use utils::id::{TenantId, TenantTimelineId, TimelineId}; +use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; use crate::defaults::DEFAULT_EVICTION_CONCURRENCY; @@ -40,10 +40,17 @@ enum GlobalMapTimeline { struct GlobalTimelinesState { timelines: HashMap, - // A tombstone indicates this timeline used to exist has been deleted. These are used to prevent - // on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as - // this map is dropped on restart. - tombstones: HashMap, + /// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent + /// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as + /// this map is dropped on restart. + /// The timeline might also be locally deleted (excluded) via safekeeper migration algorithm. In that case, + /// the tombsone contains the corresponding safekeeper generation. The pull_timeline requests with + /// higher generation ignore such tombstones and can recreate the timeline. + timeline_tombstones: HashMap, + /// A tombstone indicates that the tenant used to exist has been deleted. + /// These are created only by tenant_delete requests. They are always valid regardless of the + /// request generation. + /// This is only soft-enforced, as this map is dropped on restart. tenant_tombstones: HashMap, conf: Arc, @@ -79,7 +86,7 @@ impl GlobalTimelinesState { Err(TimelineError::CreationInProgress(*ttid)) } None => { - if self.has_tombstone(ttid) { + if self.has_tombstone(ttid, None) { Err(TimelineError::Deleted(*ttid)) } else { Err(TimelineError::NotFound(*ttid)) @@ -88,20 +95,46 @@ impl GlobalTimelinesState { } } - fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool { - self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id) + fn has_timeline_tombstone( + &self, + ttid: &TenantTimelineId, + generation: Option, + ) -> bool { + if let Some(generation) = generation { + self.timeline_tombstones + .get(ttid) + .is_some_and(|t| t.is_valid(generation)) + } else { + self.timeline_tombstones.contains_key(ttid) + } } - /// Removes all blocking tombstones for the given timeline ID. + fn has_tenant_tombstone(&self, tenant_id: &TenantId) -> bool { + self.tenant_tombstones.contains_key(tenant_id) + } + + /// Check if the state has a tenant or a timeline tombstone. + /// If `generation` is provided, check only for timeline tombsotnes with same or higher generation. + /// If `generation` is `None`, check for any timeline tombstone. + /// Tenant tombstones are checked regardless of the generation. + fn has_tombstone( + &self, + ttid: &TenantTimelineId, + generation: Option, + ) -> bool { + self.has_timeline_tombstone(ttid, generation) || self.has_tenant_tombstone(&ttid.tenant_id) + } + + /// Removes timeline tombstone for the given timeline ID. /// Returns `true` if there have been actual changes. - fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool { - self.tombstones.remove(ttid).is_some() - || self.tenant_tombstones.remove(&ttid.tenant_id).is_some() + fn remove_timeline_tombstone(&mut self, ttid: &TenantTimelineId) -> bool { + self.timeline_tombstones.remove(ttid).is_some() } - fn delete(&mut self, ttid: TenantTimelineId) { + fn delete(&mut self, ttid: TenantTimelineId, generation: Option) { self.timelines.remove(&ttid); - self.tombstones.insert(ttid, Instant::now()); + self.timeline_tombstones + .insert(ttid, TimelineTombstone::new(generation)); } fn add_tenant_tombstone(&mut self, tenant_id: TenantId) { @@ -120,7 +153,7 @@ impl GlobalTimelines { Self { state: Mutex::new(GlobalTimelinesState { timelines: HashMap::new(), - tombstones: HashMap::new(), + timeline_tombstones: HashMap::new(), tenant_tombstones: HashMap::new(), conf, broker_active_set: Arc::new(TimelinesSet::default()), @@ -261,6 +294,8 @@ impl GlobalTimelines { start_lsn: Lsn, commit_lsn: Lsn, ) -> Result> { + let generation = Some(mconf.generation); + let (conf, _, _, _) = { let state = self.state.lock().unwrap(); if let Ok(timeline) = state.get(&ttid) { @@ -268,8 +303,8 @@ impl GlobalTimelines { return Ok(timeline); } - if state.has_tombstone(&ttid) { - anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate"); + if state.has_tombstone(&ttid, generation) { + anyhow::bail!(TimelineError::Deleted(ttid)); } state.get_dependencies() @@ -284,7 +319,9 @@ impl GlobalTimelines { // immediately initialize first WAL segment as well. let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?; control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?; - let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?; + let timeline = self + .load_temp_timeline(ttid, &tmp_dir_path, generation) + .await?; Ok(timeline) } @@ -303,7 +340,7 @@ impl GlobalTimelines { &self, ttid: TenantTimelineId, tmp_path: &Utf8PathBuf, - check_tombstone: bool, + generation: Option, ) -> Result> { // Check for existence and mark that we're creating it. let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = { @@ -317,18 +354,18 @@ impl GlobalTimelines { } _ => {} } - if check_tombstone { - if state.has_tombstone(&ttid) { - anyhow::bail!("timeline {ttid} is deleted, refusing to recreate"); - } - } else { - // We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust - // that the human doing this manual intervention knows what they are doing, and remove its tombstone. - // It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed. - if state.remove_tombstone(&ttid) { - warn!("un-deleted timeline {ttid}"); - } + + if state.has_tombstone(&ttid, generation) { + // If the timeline is deleted, we refuse to recreate it. + // This is a safeguard against accidentally overwriting a timeline that was deleted + // by concurrent request. + anyhow::bail!(TimelineError::Deleted(ttid)); } + + // We might have an outdated tombstone with the older generation. + // Remove it unconditionally. + state.remove_timeline_tombstone(&ttid); + state .timelines .insert(ttid, GlobalMapTimeline::CreationInProgress); @@ -503,11 +540,16 @@ impl GlobalTimelines { ttid: &TenantTimelineId, action: DeleteOrExclude, ) -> Result { + let generation = match &action { + DeleteOrExclude::Delete | DeleteOrExclude::DeleteLocal => None, + DeleteOrExclude::Exclude(mconf) => Some(mconf.generation), + }; + let tli_res = { let state = self.state.lock().unwrap(); // Do NOT check tenant tombstones here: those were set earlier - if state.tombstones.contains_key(ttid) { + if state.has_timeline_tombstone(ttid, generation) { // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do. info!("Timeline {ttid} was already deleted"); return Ok(TimelineDeleteResult { dir_existed: false }); @@ -528,6 +570,11 @@ impl GlobalTimelines { // We would like to avoid holding the lock while waiting for the // gate to finish as this is deadlock prone, so for actual // deletion will take it second time. + // + // Canceling the timeline will block membership switch requests, + // ensuring that the timeline generation will not increase + // after this point, and we will not remove a timeline with a generation + // higher than the requested one. if let DeleteOrExclude::Exclude(ref mconf) = action { let shared_state = timeline.read_shared_state().await; if shared_state.sk.state().mconf.generation > mconf.generation { @@ -536,9 +583,9 @@ impl GlobalTimelines { current: shared_state.sk.state().mconf.clone(), }); } - timeline.cancel().await; + timeline.cancel(); } else { - timeline.cancel().await; + timeline.cancel(); } timeline.close().await; @@ -565,7 +612,7 @@ impl GlobalTimelines { // Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones // are used to prevent still-running computes from re-creating the same timeline when they send data, // and to speed up repeated deletion calls by avoiding re-listing objects. - self.state.lock().unwrap().delete(*ttid); + self.state.lock().unwrap().delete(*ttid, generation); result } @@ -627,12 +674,16 @@ impl GlobalTimelines { // may recreate a deleted timeline. let now = Instant::now(); state - .tombstones - .retain(|_, v| now.duration_since(*v) < *tombstone_ttl); + .timeline_tombstones + .retain(|_, v| now.duration_since(v.timestamp) < *tombstone_ttl); state .tenant_tombstones .retain(|_, v| now.duration_since(*v) < *tombstone_ttl); } + + pub fn get_sk_id(&self) -> NodeId { + self.state.lock().unwrap().conf.my_id + } } /// Action for delete_or_exclude. @@ -673,6 +724,7 @@ pub async fn validate_temp_timeline( conf: &SafeKeeperConf, ttid: TenantTimelineId, path: &Utf8PathBuf, + generation: Option, ) -> Result<(Lsn, Lsn)> { let control_path = path.join("safekeeper.control"); @@ -681,6 +733,15 @@ pub async fn validate_temp_timeline( bail!("wal_seg_size is not set"); } + if let Some(generation) = generation { + if control_store.mconf.generation > generation { + bail!( + "tmp timeline generation {} is higher than expected {generation}", + control_store.mconf.generation + ); + } + } + let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?; let commit_lsn = control_store.commit_lsn; @@ -688,3 +749,28 @@ pub async fn validate_temp_timeline( Ok((commit_lsn, flush_lsn)) } + +/// A tombstone for a deleted timeline. +/// The generation is passed with "exclude" request and stored in the tombstone. +/// We ignore the tombstone if the request generation is higher than +/// the tombstone generation. +/// If the tombstone doesn't have a generation, it's considered permanent, +/// e.g. after "delete" request. +struct TimelineTombstone { + timestamp: Instant, + generation: Option, +} + +impl TimelineTombstone { + fn new(generation: Option) -> Self { + TimelineTombstone { + timestamp: Instant::now(), + generation, + } + } + + /// Check if the timeline is still valid for the given generation. + fn is_valid(&self, generation: SafekeeperGeneration) -> bool { + self.generation.is_none_or(|g| g >= generation) + } +} diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs index b67a679fad..7dbbd3afe4 100644 --- a/storage_controller/src/service/safekeeper_reconciler.rs +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -364,7 +364,12 @@ impl SafekeeperReconcilerInner { http_hosts, tenant_id: req.tenant_id, timeline_id, - ignore_tombstone: Some(false), + // TODO(diko): get mconf from "timelines" table and pass it here. + // Now we use pull_timeline reconciliation only for the timeline creation, + // so it's not critical right now. + // It could be fixed together with other reconciliation issues: + // https://github.com/neondatabase/neon/issues/12189 + mconf: None, }; success = self .reconcile_inner( diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index bc77a1a6b8..a60ebb85c6 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -991,6 +991,7 @@ impl Service { timeline_id: TimelineId, to_safekeepers: &[Safekeeper], from_safekeepers: &[Safekeeper], + mconf: membership::Configuration, ) -> Result<(), ApiError> { let http_hosts = from_safekeepers .iter() @@ -1009,14 +1010,11 @@ impl Service { .collect::>() ); - // TODO(diko): need to pass mconf/generation with the request - // to properly handle tombstones. Ignore tombstones for now. - // Worst case: we leave a timeline on a safekeeper which is not in the current set. let req = PullTimelineRequest { tenant_id, timeline_id, http_hosts, - ignore_tombstone: Some(true), + mconf: Some(mconf), }; const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); @@ -1336,6 +1334,7 @@ impl Service { timeline_id, &pull_to_safekeepers, &cur_safekeepers, + joint_config.clone(), ) .await?; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 88919fe888..f7917f214a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1540,6 +1540,17 @@ class NeonEnv: raise RuntimeError(f"Pageserver with ID {id} not found") + def get_safekeeper(self, id: int) -> Safekeeper: + """ + Look up a safekeeper by its ID. + """ + + for sk in self.safekeepers: + if sk.id == id: + return sk + + raise RuntimeError(f"Safekeeper with ID {id} not found") + def get_tenant_pageserver(self, tenant_id: TenantId | TenantShardId): """ Get the NeonPageserver where this tenant shard is currently attached, according @@ -5391,15 +5402,24 @@ class Safekeeper(LogUtils): return timeline_status.commit_lsn def pull_timeline( - self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId + self, + srcs: list[Safekeeper], + tenant_id: TenantId, + timeline_id: TimelineId, + mconf: MembershipConfiguration | None = None, ) -> dict[str, Any]: """ pull_timeline from srcs to self. """ src_https = [f"http://localhost:{sk.port.http}" for sk in srcs] - res = self.http_client().pull_timeline( - {"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https} - ) + body: dict[str, Any] = { + "tenant_id": str(tenant_id), + "timeline_id": str(timeline_id), + "http_hosts": src_https, + } + if mconf is not None: + body["mconf"] = mconf.__dict__ + res = self.http_client().pull_timeline(body) src_ids = [sk.id for sk in srcs] log.info(f"finished pulling timeline from {src_ids} to {self.id}") return res diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py index 371bec0c62..2ceeea37a5 100644 --- a/test_runner/regress/test_safekeeper_migration.py +++ b/test_runner/regress/test_safekeeper_migration.py @@ -1,5 +1,6 @@ from __future__ import annotations +import re from typing import TYPE_CHECKING import pytest @@ -12,7 +13,7 @@ if TYPE_CHECKING: # TODO(diko): pageserver spams with various errors during safekeeper migration. # Fix the code so it handles the migration better. -ALLOWED_PAGESERVER_ERRORS = [ +PAGESERVER_ALLOWED_ERRORS = [ ".*Timeline .* was cancelled and cannot be used anymore.*", ".*Timeline .* has been deleted.*", ".*Timeline .* was not found in global map.*", @@ -35,7 +36,7 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): "timeline_safekeeper_count": 1, } env = neon_env_builder.init_start() - env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS) + env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS) ep = env.endpoints.create("main", tenant_id=env.initial_tenant) @@ -136,7 +137,7 @@ def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBui "timeline_safekeeper_count": 3, } env = neon_env_builder.init_start() - env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS) + env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS) mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) assert len(mconf["sk_set"]) == 3 @@ -196,3 +197,92 @@ def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBui assert ( f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text ) + + +def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder): + """ + Test that safekeeper respects generations: + 1. Check that migration back and forth between two safekeepers works. + 2. Check that sk refuses to execute requests with stale generation. + """ + neon_env_builder.num_safekeepers = 3 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 1, + } + env = neon_env_builder.init_start() + env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS) + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert mconf["new_sk_set"] is None + assert len(mconf["sk_set"]) == 1 + cur_sk = mconf["sk_set"][0] + + second_sk, third_sk = [sk.id for sk in env.safekeepers if sk.id != cur_sk] + cur_gen = 1 + + # Pull the timeline manually to third_sk, so the timeline exists there with stale generation. + # This is needed for the test later. + env.get_safekeeper(third_sk).pull_timeline( + [env.get_safekeeper(cur_sk)], env.initial_tenant, env.initial_timeline + ) + + def expect_deleted(sk_id: int): + with pytest.raises(requests.exceptions.HTTPError, match="Not Found") as exc: + env.get_safekeeper(sk_id).http_client().timeline_status( + env.initial_tenant, env.initial_timeline + ) + assert exc.value.response.status_code == 404 + assert re.match(r".*timeline .* deleted.*", exc.value.response.text) + + def get_mconf(sk_id: int): + status = ( + env.get_safekeeper(sk_id) + .http_client() + .timeline_status(env.initial_tenant, env.initial_timeline) + ) + assert status.mconf is not None + return status.mconf + + def migrate(): + nonlocal cur_sk, second_sk, cur_gen + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, [second_sk] + ) + cur_sk, second_sk = second_sk, cur_sk + cur_gen += 2 + + # Migrate the timeline back and forth between cur_sk and second_sk. + for _i in range(3): + migrate() + # Timeline should exist on cur_sk. + assert get_mconf(cur_sk).generation == cur_gen + # Timeline should be deleted on second_sk. + expect_deleted(second_sk) + + # Remember current mconf. + mconf = get_mconf(cur_sk) + + # Migrate the timeline one more time. + # It increases the generation by 2. + migrate() + + # Check that sk refuses to execute the exclude request with the old mconf. + with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc: + env.get_safekeeper(cur_sk).http_client().timeline_exclude( + env.initial_tenant, env.initial_timeline, mconf + ) + assert re.match(r".*refused to switch into excluding mconf.*", exc.value.response.text) + # We shouldn't have deleted the timeline. + assert get_mconf(cur_sk).generation == cur_gen + + # Check that sk refuses to execute the pull_timeline request with the old mconf. + # Note: we try to pull from third_sk, which has a timeline with stale generation. + # Thus, we bypass some preliminary generation checks and actually test tombstones. + with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc: + env.get_safekeeper(second_sk).pull_timeline( + [env.get_safekeeper(third_sk)], env.initial_tenant, env.initial_timeline, mconf + ) + assert re.match(r".*Timeline .* deleted.*", exc.value.response.text) + # The timeline should remain deleted. + expect_deleted(second_sk)