safekeeper: generation aware timeline tombstones (#12482)

## Problem
With safekeeper migration in mind, we can now pull/exclude the timeline
multiple times within the same safekeeper. To avoid races between out of
order requests, we need to ignore the pull/exclude requests if we have
already seen a higher generation.

- Closes: https://github.com/neondatabase/neon/issues/12186
- Closes: [LKB-949](https://databricks.atlassian.net/browse/LKB-949)

## Summary of changes
- Annotate timeline tombstones in safekeeper with request generation.
- Replace `ignore_tombstone` option with `mconf` in
`PullTimelineRequest`
- Switch membership in `pull_timeline` if the existing/pulled timeline
has an older generation.
- Refuse to switch membership if the timeline is being deleted
(`is_canceled`).
- Refuse to switch membership in compute greeting request if the
safekeeper is not a member of `mconf`.
- Pass `mconf` in `PullTimelineRequest` in safekeeper_service

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
This commit is contained in:
Dmitrii Kovalkov
2025-07-23 15:01:04 +04:00
committed by GitHub
parent fc242afcc2
commit 94cb9a79d9
12 changed files with 340 additions and 65 deletions

View File

@@ -161,9 +161,9 @@ pub async fn handle_request(
FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path, None).await?;
global_timelines
.load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
.load_temp_timeline(request.destination_ttid, &tli_dir_path, None)
.await?;
Ok(())

View File

@@ -193,7 +193,7 @@ pub async fn hcc_pull_timeline(
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
http_hosts: Vec::new(),
ignore_tombstone: None,
mconf: None,
};
for host in timeline.peers {
if host.0 == conf.my_id.0 {

View File

@@ -352,7 +352,7 @@ async fn timeline_exclude_handler(mut request: Request<Body>) -> Result<Response
// instead.
if data.mconf.contains(my_id) {
return Err(ApiError::Forbidden(format!(
"refused to switch into {}, node {} is member of it",
"refused to exclude timeline with {}, node {} is member of it",
data.mconf, my_id
)));
}

View File

@@ -13,8 +13,8 @@ use http_utils::error::ApiError;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use remote_storage::GenericRemoteStorage;
use reqwest::Certificate;
use safekeeper_api::Term;
use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
use safekeeper_api::{Term, membership};
use safekeeper_client::mgmt_api;
use safekeeper_client::mgmt_api::Client;
use serde::Deserialize;
@@ -453,12 +453,40 @@ pub async fn handle_request(
global_timelines: Arc<GlobalTimelines>,
wait_for_peer_timeline_status: bool,
) -> Result<PullTimelineResponse, ApiError> {
if let Some(mconf) = &request.mconf {
let sk_id = global_timelines.get_sk_id();
if !mconf.contains(sk_id) {
return Err(ApiError::BadRequest(anyhow!(
"refused to pull timeline with {mconf}, node {sk_id} is not member of it",
)));
}
}
let existing_tli = global_timelines.get(TenantTimelineId::new(
request.tenant_id,
request.timeline_id,
));
if existing_tli.is_ok() {
info!("Timeline {} already exists", request.timeline_id);
if let Ok(timeline) = existing_tli {
let cur_generation = timeline
.read_shared_state()
.await
.sk
.state()
.mconf
.generation;
info!(
"Timeline {} already exists with generation {cur_generation}",
request.timeline_id,
);
if let Some(mconf) = request.mconf {
timeline
.membership_switch(mconf)
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
}
return Ok(PullTimelineResponse {
safekeeper_host: None,
});
@@ -495,6 +523,19 @@ pub async fn handle_request(
for (i, response) in responses.into_iter().enumerate() {
match response {
Ok(status) => {
if let Some(mconf) = &request.mconf {
if status.mconf.generation > mconf.generation {
// We probably raced with another timeline membership change with higher generation.
// Ignore this request.
return Err(ApiError::Conflict(format!(
"cannot pull timeline with generation {}: timeline {} already exists with generation {} on {}",
mconf.generation,
request.timeline_id,
status.mconf.generation,
http_hosts[i],
)));
}
}
statuses.push((status, i));
}
Err(e) => {
@@ -593,15 +634,13 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
let check_tombstone = !request.ignore_tombstone.unwrap_or_default();
match pull_timeline(
status,
safekeeper_host,
sk_auth_token,
http_client,
global_timelines,
check_tombstone,
request.mconf,
)
.await
{
@@ -611,6 +650,10 @@ pub async fn handle_request(
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
safekeeper_host: None,
}),
Some(TimelineError::Deleted(_)) => Err(ApiError::Conflict(format!(
"Timeline {}/{} deleted",
request.tenant_id, request.timeline_id
))),
Some(TimelineError::CreationInProgress(_)) => {
// We don't return success here because creation might still fail.
Err(ApiError::Conflict("Creation in progress".to_owned()))
@@ -627,7 +670,7 @@ async fn pull_timeline(
sk_auth_token: Option<SecretString>,
http_client: reqwest::Client,
global_timelines: Arc<GlobalTimelines>,
check_tombstone: bool,
mconf: Option<membership::Configuration>,
) -> Result<PullTimelineResponse> {
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
info!(
@@ -689,8 +732,11 @@ async fn pull_timeline(
// fsync temp timeline directory to remember its contents.
fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
let generation = mconf.as_ref().map(|c| c.generation);
// Let's create timeline from temp directory and verify that it's correct
let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
let (commit_lsn, flush_lsn) =
validate_temp_timeline(conf, ttid, &tli_dir_path, generation).await?;
info!(
"finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
ttid, commit_lsn, flush_lsn
@@ -698,10 +744,20 @@ async fn pull_timeline(
assert!(status.commit_lsn <= status.flush_lsn);
// Finally, load the timeline.
let _tli = global_timelines
.load_temp_timeline(ttid, &tli_dir_path, check_tombstone)
let timeline = global_timelines
.load_temp_timeline(ttid, &tli_dir_path, generation)
.await?;
if let Some(mconf) = mconf {
// Switch to provided mconf to guarantee that the timeline will not
// be deleted by request with older generation.
// The generation might already be higer than the one in mconf, e.g.
// if another membership_switch request was executed between `load_temp_timeline`
// and `membership_switch`, but that's totaly fine. `membership_switch` will
// ignore switch to older generation.
timeline.membership_switch(mconf).await?;
}
Ok(PullTimelineResponse {
safekeeper_host: Some(host),
})

View File

@@ -1026,6 +1026,13 @@ where
self.state.finish_change(&state).await?;
}
if msg.mconf.generation > self.state.mconf.generation && !msg.mconf.contains(self.node_id) {
bail!(
"refused to switch into {}, node {} is not a member of it",
msg.mconf,
self.node_id,
);
}
// Switch into conf given by proposer conf if it is higher.
self.state.membership_switch(msg.mconf.clone()).await?;

View File

@@ -594,7 +594,7 @@ impl Timeline {
/// Cancel the timeline, requesting background activity to stop. Closing
/// the `self.gate` waits for that.
pub async fn cancel(&self) {
pub fn cancel(&self) {
info!("timeline {} shutting down", self.ttid);
self.cancel.cancel();
}
@@ -914,6 +914,13 @@ impl Timeline {
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let mut state = self.write_shared_state().await;
// Ensure we don't race with exclude/delete requests by checking the cancellation
// token under the write_shared_state lock.
// Exclude/delete cancel the timeline under the shared state lock,
// so the timeline cannot be deleted in the middle of the membership switch.
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
state.sk.membership_switch(to).await
}

View File

@@ -10,13 +10,13 @@ use std::time::{Duration, Instant};
use anyhow::{Context, Result, bail};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use safekeeper_api::membership::Configuration;
use safekeeper_api::membership::{Configuration, SafekeeperGeneration};
use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
use safekeeper_api::{ServerInfo, membership};
use tokio::fs;
use tracing::*;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
@@ -40,10 +40,17 @@ enum GlobalMapTimeline {
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
// this map is dropped on restart.
tombstones: HashMap<TenantTimelineId, Instant>,
/// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
/// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
/// this map is dropped on restart.
/// The timeline might also be locally deleted (excluded) via safekeeper migration algorithm. In that case,
/// the tombsone contains the corresponding safekeeper generation. The pull_timeline requests with
/// higher generation ignore such tombstones and can recreate the timeline.
timeline_tombstones: HashMap<TenantTimelineId, TimelineTombstone>,
/// A tombstone indicates that the tenant used to exist has been deleted.
/// These are created only by tenant_delete requests. They are always valid regardless of the
/// request generation.
/// This is only soft-enforced, as this map is dropped on restart.
tenant_tombstones: HashMap<TenantId, Instant>,
conf: Arc<SafeKeeperConf>,
@@ -79,7 +86,7 @@ impl GlobalTimelinesState {
Err(TimelineError::CreationInProgress(*ttid))
}
None => {
if self.has_tombstone(ttid) {
if self.has_tombstone(ttid, None) {
Err(TimelineError::Deleted(*ttid))
} else {
Err(TimelineError::NotFound(*ttid))
@@ -88,20 +95,46 @@ impl GlobalTimelinesState {
}
}
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id)
fn has_timeline_tombstone(
&self,
ttid: &TenantTimelineId,
generation: Option<SafekeeperGeneration>,
) -> bool {
if let Some(generation) = generation {
self.timeline_tombstones
.get(ttid)
.is_some_and(|t| t.is_valid(generation))
} else {
self.timeline_tombstones.contains_key(ttid)
}
}
/// Removes all blocking tombstones for the given timeline ID.
fn has_tenant_tombstone(&self, tenant_id: &TenantId) -> bool {
self.tenant_tombstones.contains_key(tenant_id)
}
/// Check if the state has a tenant or a timeline tombstone.
/// If `generation` is provided, check only for timeline tombsotnes with same or higher generation.
/// If `generation` is `None`, check for any timeline tombstone.
/// Tenant tombstones are checked regardless of the generation.
fn has_tombstone(
&self,
ttid: &TenantTimelineId,
generation: Option<SafekeeperGeneration>,
) -> bool {
self.has_timeline_tombstone(ttid, generation) || self.has_tenant_tombstone(&ttid.tenant_id)
}
/// Removes timeline tombstone for the given timeline ID.
/// Returns `true` if there have been actual changes.
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
self.tombstones.remove(ttid).is_some()
|| self.tenant_tombstones.remove(&ttid.tenant_id).is_some()
fn remove_timeline_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
self.timeline_tombstones.remove(ttid).is_some()
}
fn delete(&mut self, ttid: TenantTimelineId) {
fn delete(&mut self, ttid: TenantTimelineId, generation: Option<SafekeeperGeneration>) {
self.timelines.remove(&ttid);
self.tombstones.insert(ttid, Instant::now());
self.timeline_tombstones
.insert(ttid, TimelineTombstone::new(generation));
}
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
@@ -120,7 +153,7 @@ impl GlobalTimelines {
Self {
state: Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
timeline_tombstones: HashMap::new(),
tenant_tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
@@ -261,6 +294,8 @@ impl GlobalTimelines {
start_lsn: Lsn,
commit_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let generation = Some(mconf.generation);
let (conf, _, _, _) = {
let state = self.state.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
@@ -268,8 +303,8 @@ impl GlobalTimelines {
return Ok(timeline);
}
if state.has_tombstone(&ttid) {
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
if state.has_tombstone(&ttid, generation) {
anyhow::bail!(TimelineError::Deleted(ttid));
}
state.get_dependencies()
@@ -284,7 +319,9 @@ impl GlobalTimelines {
// immediately initialize first WAL segment as well.
let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
let timeline = self
.load_temp_timeline(ttid, &tmp_dir_path, generation)
.await?;
Ok(timeline)
}
@@ -303,7 +340,7 @@ impl GlobalTimelines {
&self,
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
check_tombstone: bool,
generation: Option<SafekeeperGeneration>,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
@@ -317,18 +354,18 @@ impl GlobalTimelines {
}
_ => {}
}
if check_tombstone {
if state.has_tombstone(&ttid) {
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
}
} else {
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
if state.remove_tombstone(&ttid) {
warn!("un-deleted timeline {ttid}");
}
if state.has_tombstone(&ttid, generation) {
// If the timeline is deleted, we refuse to recreate it.
// This is a safeguard against accidentally overwriting a timeline that was deleted
// by concurrent request.
anyhow::bail!(TimelineError::Deleted(ttid));
}
// We might have an outdated tombstone with the older generation.
// Remove it unconditionally.
state.remove_timeline_tombstone(&ttid);
state
.timelines
.insert(ttid, GlobalMapTimeline::CreationInProgress);
@@ -503,11 +540,16 @@ impl GlobalTimelines {
ttid: &TenantTimelineId,
action: DeleteOrExclude,
) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
let generation = match &action {
DeleteOrExclude::Delete | DeleteOrExclude::DeleteLocal => None,
DeleteOrExclude::Exclude(mconf) => Some(mconf.generation),
};
let tli_res = {
let state = self.state.lock().unwrap();
// Do NOT check tenant tombstones here: those were set earlier
if state.tombstones.contains_key(ttid) {
if state.has_timeline_tombstone(ttid, generation) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
info!("Timeline {ttid} was already deleted");
return Ok(TimelineDeleteResult { dir_existed: false });
@@ -528,6 +570,11 @@ impl GlobalTimelines {
// We would like to avoid holding the lock while waiting for the
// gate to finish as this is deadlock prone, so for actual
// deletion will take it second time.
//
// Canceling the timeline will block membership switch requests,
// ensuring that the timeline generation will not increase
// after this point, and we will not remove a timeline with a generation
// higher than the requested one.
if let DeleteOrExclude::Exclude(ref mconf) = action {
let shared_state = timeline.read_shared_state().await;
if shared_state.sk.state().mconf.generation > mconf.generation {
@@ -536,9 +583,9 @@ impl GlobalTimelines {
current: shared_state.sk.state().mconf.clone(),
});
}
timeline.cancel().await;
timeline.cancel();
} else {
timeline.cancel().await;
timeline.cancel();
}
timeline.close().await;
@@ -565,7 +612,7 @@ impl GlobalTimelines {
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
// are used to prevent still-running computes from re-creating the same timeline when they send data,
// and to speed up repeated deletion calls by avoiding re-listing objects.
self.state.lock().unwrap().delete(*ttid);
self.state.lock().unwrap().delete(*ttid, generation);
result
}
@@ -627,12 +674,16 @@ impl GlobalTimelines {
// may recreate a deleted timeline.
let now = Instant::now();
state
.tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
.timeline_tombstones
.retain(|_, v| now.duration_since(v.timestamp) < *tombstone_ttl);
state
.tenant_tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
}
pub fn get_sk_id(&self) -> NodeId {
self.state.lock().unwrap().conf.my_id
}
}
/// Action for delete_or_exclude.
@@ -673,6 +724,7 @@ pub async fn validate_temp_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
path: &Utf8PathBuf,
generation: Option<SafekeeperGeneration>,
) -> Result<(Lsn, Lsn)> {
let control_path = path.join("safekeeper.control");
@@ -681,6 +733,15 @@ pub async fn validate_temp_timeline(
bail!("wal_seg_size is not set");
}
if let Some(generation) = generation {
if control_store.mconf.generation > generation {
bail!(
"tmp timeline generation {} is higher than expected {generation}",
control_store.mconf.generation
);
}
}
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
let commit_lsn = control_store.commit_lsn;
@@ -688,3 +749,28 @@ pub async fn validate_temp_timeline(
Ok((commit_lsn, flush_lsn))
}
/// A tombstone for a deleted timeline.
/// The generation is passed with "exclude" request and stored in the tombstone.
/// We ignore the tombstone if the request generation is higher than
/// the tombstone generation.
/// If the tombstone doesn't have a generation, it's considered permanent,
/// e.g. after "delete" request.
struct TimelineTombstone {
timestamp: Instant,
generation: Option<SafekeeperGeneration>,
}
impl TimelineTombstone {
fn new(generation: Option<SafekeeperGeneration>) -> Self {
TimelineTombstone {
timestamp: Instant::now(),
generation,
}
}
/// Check if the timeline is still valid for the given generation.
fn is_valid(&self, generation: SafekeeperGeneration) -> bool {
self.generation.is_none_or(|g| g >= generation)
}
}