safekeeper: exclude API (#10757)

## Problem

https://github.com/neondatabase/neon/pull/10241 added configuration
switch endpoint, but it didn't delete timeline if node was excluded.

## Summary of changes

Add a separate /exclude API endpoint which similarly accepts a membership
configuration from which the sk is supposed to be excluded. The implementation
deletes the timeline locally.

Some more small related tweaks:
- make mconf switch API PUT instead of POST as it is idempotent;
- return 409 if switch was refused instead of 200 with requested &
current;
- remove unused was_active flag from delete response;
- remove meaningless _force suffix from delete function names;
- reuse timeline.rs delete_dir function in timelines_global_map instead
of its own copy.

part of https://github.com/neondatabase/neon/issues/9965
This commit is contained in:
Arseny Sher
2025-02-26 22:26:33 +03:00
committed by GitHub
parent c1a040447d
commit 643a48210f
6 changed files with 212 additions and 75 deletions

View File

@@ -85,12 +85,12 @@ impl MemberSet {
Ok(MemberSet { m: members })
}
pub fn contains(&self, sk: &SafekeeperId) -> bool {
self.m.iter().any(|m| m.id == sk.id)
pub fn contains(&self, sk: NodeId) -> bool {
self.m.iter().any(|m| m.id == sk)
}
pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
if self.contains(&sk) {
if self.contains(sk.id) {
bail!(format!(
"sk {} is already member of the set {}",
sk.id, self
@@ -130,6 +130,11 @@ impl Configuration {
new_members: None,
}
}
/// Returns true if `sk_id` belongs to this configuration, i.e. it is listed
/// in `members` or, when a `new_members` set is present, in that set.
pub fn contains(&self, sk_id: NodeId) -> bool {
    if self.members.contains(sk_id) {
        return true;
    }
    match self.new_members.as_ref() {
        Some(new_members) => new_members.contains(sk_id),
        None => false,
    }
}
}
impl Display for Configuration {

View File

@@ -19,7 +19,7 @@ use safekeeper_api::models::{
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
TimelineCopyRequest, TimelineCreateRequest, TimelineStatus, TimelineTermBumpRequest,
};
use safekeeper_api::{ServerInfo, models};
use safekeeper_api::{ServerInfo, membership, models};
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
use tokio::sync::mpsc;
use tokio::task;
@@ -32,7 +32,7 @@ use utils::lsn::Lsn;
use crate::debug_dump::TimelineDigestRequest;
use crate::safekeeper::TermLsn;
use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::timelines_global_map::{DeleteOrExclude, TimelineDeleteResult};
use crate::{
GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
};
@@ -73,10 +73,13 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
check_permission(&request, Some(tenant_id))?;
ensure_no_body(&mut request).await?;
let global_timelines = get_global_timelines(&request);
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
// Using an `InternalServerError` should be fixed when the types support it
let action = if only_local {
DeleteOrExclude::DeleteLocal
} else {
DeleteOrExclude::Delete
};
let delete_info = global_timelines
.delete_force_all_for_tenant(&tenant_id, only_local)
.delete_all_for_tenant(&tenant_id, action)
.await
.map_err(ApiError::InternalServerError)?;
json_response(
@@ -84,7 +87,7 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
delete_info
.iter()
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteForceResult>>(),
.collect::<HashMap<String, TimelineDeleteResult>>(),
)
}
@@ -208,12 +211,15 @@ async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<
check_permission(&request, Some(ttid.tenant_id))?;
ensure_no_body(&mut request).await?;
let global_timelines = get_global_timelines(&request);
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
// error handling here when we're able to.
let action = if only_local {
DeleteOrExclude::DeleteLocal
} else {
DeleteOrExclude::Delete
};
let resp = global_timelines
.delete(&ttid, only_local)
.delete_or_exclude(&ttid, action)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(ApiError::from)?;
json_response(StatusCode::OK, resp)
}
@@ -267,6 +273,64 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
Ok(response)
}
/// Error type for delete_or_exclude: either generation conflict or something
/// internal.
#[derive(thiserror::Error, Debug)]
pub enum DeleteOrExcludeError {
/// The request to switch into an excluding configuration was refused.
/// Carries both configurations so the rendered message shows them.
#[error("refused to switch into excluding mconf {requested}, current: {current}")]
Conflict {
/// Configuration the caller asked to switch into.
requested: membership::Configuration,
/// Configuration the timeline held when the request was refused.
current: membership::Configuration,
},
/// Any other failure, passed through unchanged; `#[from]` lets `?`
/// convert an `anyhow::Error` into this variant.
#[error(transparent)]
Other(#[from] anyhow::Error),
}
/// Maps `DeleteOrExcludeError` onto `ApiError`: a refused switch becomes
/// `ApiError::Conflict` carrying the rendered error message, anything else
/// becomes `ApiError::InternalServerError` wrapping the underlying error.
impl From<DeleteOrExcludeError> for ApiError {
    fn from(de: DeleteOrExcludeError) -> ApiError {
        match de {
            DeleteOrExcludeError::Other(inner) => ApiError::InternalServerError(inner),
            refused @ DeleteOrExcludeError::Conflict { .. } => {
                ApiError::Conflict(refused.to_string())
            }
        }
    }
}
/// Exclude endpoint handler: deletes the timeline from this node once the
/// node is no longer part of the membership configuration.
///
/// The request body has the same shape as for the membership endpoint (a
/// configuration that does not include this node). A single endpoint could
/// in principle serve both actions, but since this one deletes data the two
/// are kept separate.
async fn timeline_exclude_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id = parse_request_param(&request, "tenant_id")?;
    let timeline_id = parse_request_param(&request, "timeline_id")?;
    let ttid = TenantTimelineId::new(tenant_id, timeline_id);
    check_permission(&request, Some(ttid.tenant_id))?;

    let global_timelines = get_global_timelines(&request);
    let body: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
    let my_id = get_conf(&request).my_id;

    // A configuration that still lists this node must go through the
    // membership switch endpoint instead of this one.
    let mconf = body.mconf;
    if mconf.contains(my_id) {
        let msg = format!(
            "refused to switch into {}, node {} is member of it",
            mconf, my_id
        );
        return Err(ApiError::Forbidden(msg));
    }

    match global_timelines
        .delete_or_exclude(&ttid, DeleteOrExclude::Exclude(mconf))
        .await
    {
        Ok(resp) => json_response(StatusCode::OK, resp),
        Err(e) => Err(ApiError::from(e)),
    }
}
/// Consider switching timeline membership configuration to the provided one.
async fn timeline_membership_handler(
mut request: Request<Body>,
@@ -281,12 +345,29 @@ async fn timeline_membership_handler(
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
let my_id = get_conf(&request).my_id;
// If request excludes us, exclude endpoint should be used instead.
if !data.mconf.contains(my_id) {
return Err(ApiError::Forbidden(format!(
"refused to switch into {}, node {} is not a member of it",
data.mconf, my_id
)));
}
let req_gen = data.mconf.generation;
let response = tli
.membership_switch(data.mconf)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
// Return 409 if request was ignored.
if req_gen == response.current_conf.generation {
json_response(StatusCode::OK, response)
} else {
Err(ApiError::Conflict(format!(
"request to switch into {} ignored, current generation {}",
req_gen, response.current_conf.generation
)))
}
}
async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -637,11 +718,14 @@ pub fn make_router(
.post("/v1/pull_timeline", |r| {
request_span(r, timeline_pull_handler)
})
.put("/v1/tenant/:tenant_id/timeline/:timeline_id/exclude", |r| {
request_span(r, timeline_exclude_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|r| request_span(r, timeline_snapshot_handler),
)
.post(
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/membership",
|r| request_span(r, timeline_membership_handler),
)

View File

@@ -558,11 +558,18 @@ impl Timeline {
});
}
/// Background timeline activities (which hold Timeline::gate) will no
/// longer run once this function completes.
pub async fn shutdown(&self) {
/// Cancel the timeline, requesting background activity to stop. Closing
/// the `self.gate` waits for that.
///
/// This function only logs and triggers `self.cancel`; it does not itself
/// wait for anything to finish.
pub async fn cancel(&self) {
info!("timeline {} shutting down", self.ttid);
// Signal cancellation only; waiting for users of the timeline is done by
// closing the gate.
self.cancel.cancel();
}
/// Background timeline activities (which hold Timeline::gate) will no
/// longer run once this function completes. `Self::cancel` must have been
/// already called.
pub async fn close(&self) {
assert!(self.cancel.is_cancelled());
// Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts
// to read deleted files.
@@ -574,13 +581,13 @@ impl Timeline {
/// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
/// deletion API endpoint is retriable.
///
/// Timeline must be in shut-down state (i.e. call [`Self::shutdown`] first)
/// Timeline must be in shut-down state (i.e. call [`Self::close`] first)
pub async fn delete(
&self,
shared_state: &mut WriteGuardSharedState<'_>,
only_local: bool,
) -> Result<bool> {
// Assert that [`Self::shutdown`] was already called
// Assert that [`Self::close`] was already called
assert!(self.cancel.is_cancelled());
assert!(self.gate.close_complete());
@@ -1106,7 +1113,7 @@ impl ManagerTimeline {
}
/// Deletes directory and its contents. Returns false if directory does not exist.
async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
pub async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
match fs::remove_dir_all(path).await {
Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),

View File

@@ -4,16 +4,15 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use anyhow::{Context, Result, bail};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use safekeeper_api::ServerInfo;
use safekeeper_api::membership::Configuration;
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_api::{ServerInfo, membership};
use serde::Serialize;
use tokio::fs;
use tracing::*;
@@ -22,9 +21,10 @@ use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::http::routes::DeleteOrExcludeError;
use crate::rate_limit::RateLimiter;
use crate::state::TimelinePersistentState;
use crate::timeline::{Timeline, TimelineError, get_tenant_dir, get_timeline_dir};
use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
use crate::timelines_set::TimelinesSet;
use crate::wal_storage::Storage;
use crate::{SafeKeeperConf, control_file, wal_storage};
@@ -448,23 +448,20 @@ impl GlobalTimelines {
.collect()
}
/// Cancels timeline, then deletes the corresponding data directory.
/// If only_local, doesn't remove WAL segments in remote storage.
pub(crate) async fn delete(
/// Delete timeline, only locally on this node or globally (also cleaning
/// remote storage WAL), depending on `action` value.
pub(crate) async fn delete_or_exclude(
&self,
ttid: &TenantTimelineId,
only_local: bool,
) -> Result<TimelineDeleteForceResult> {
action: DeleteOrExclude,
) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
let tli_res = {
let state = self.state.lock().unwrap();
if state.tombstones.contains_key(ttid) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
info!("Timeline {ttid} was already deleted");
return Ok(TimelineDeleteForceResult {
dir_existed: false,
was_active: false,
});
return Ok(TimelineDeleteResult { dir_existed: false });
}
state.get(ttid)
@@ -472,32 +469,47 @@ impl GlobalTimelines {
let result = match tli_res {
Ok(timeline) => {
let was_active = timeline.broker_active.load(Ordering::Relaxed);
info!("deleting timeline {}, action={:?}", ttid, action);
info!("deleting timeline {}, only_local={}", ttid, only_local);
timeline.shutdown().await;
// If node is getting excluded, check the generation first.
// Then, while holding the lock cancel the timeline; it will be
// unusable after this point, and if node is added back first
// deletion must be completed and node seeded anew.
//
// We would like to avoid holding the lock while waiting for the
// gate to finish as this is deadlock prone, so for actual
// deletion will take it second time.
if let DeleteOrExclude::Exclude(ref mconf) = action {
let shared_state = timeline.read_shared_state().await;
if shared_state.sk.state().mconf.generation > mconf.generation {
return Err(DeleteOrExcludeError::Conflict {
requested: mconf.clone(),
current: shared_state.sk.state().mconf.clone(),
});
}
timeline.cancel().await;
} else {
timeline.cancel().await;
}
timeline.close().await;
info!("timeline {ttid} shut down for deletion");
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state().await;
let only_local = !matches!(action, DeleteOrExclude::Delete);
let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
Ok(TimelineDeleteForceResult {
dir_existed,
was_active, // TODO: we probably should remove this field
})
Ok(TimelineDeleteResult { dir_existed })
}
Err(_) => {
// Timeline is not in memory, but it may still exist on disk in broken state.
let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
let dir_existed = delete_dir(dir_path)?;
let dir_existed = delete_dir(&dir_path).await?;
Ok(TimelineDeleteForceResult {
dir_existed,
was_active: false,
})
Ok(TimelineDeleteResult { dir_existed })
}
};
@@ -515,11 +527,11 @@ impl GlobalTimelines {
/// retry tenant deletion again later.
///
/// If only_local, doesn't remove WAL segments in remote storage.
pub async fn delete_force_all_for_tenant(
pub async fn delete_all_for_tenant(
&self,
tenant_id: &TenantId,
only_local: bool,
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
action: DeleteOrExclude,
) -> Result<HashMap<TenantTimelineId, TimelineDeleteResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
let to_delete = self.get_all_for_tenant(*tenant_id);
@@ -527,7 +539,7 @@ impl GlobalTimelines {
let mut deleted = HashMap::new();
for tli in &to_delete {
match self.delete(&tli.ttid, only_local).await {
match self.delete_or_exclude(&tli.ttid, action.clone()).await {
Ok(result) => {
deleted.insert(tli.ttid, result);
}
@@ -541,17 +553,15 @@ impl GlobalTimelines {
// If there was an error, return it.
if let Some(e) = err {
return Err(e);
return Err(anyhow::Error::from(e));
}
// There may be broken timelines on disk, so delete the whole tenant dir as well.
// Note that we could concurrently create new timelines while we were deleting them,
// so the directory may be not empty. In this case timelines will have bad state
// and timeline background jobs can panic.
delete_dir(get_tenant_dir(
self.state.lock().unwrap().conf.as_ref(),
tenant_id,
))?;
let tenant_dir = get_tenant_dir(self.state.lock().unwrap().conf.as_ref(), tenant_id);
delete_dir(&tenant_dir).await?;
Ok(deleted)
}
@@ -570,18 +580,20 @@ impl GlobalTimelines {
}
#[derive(Clone, Copy, Serialize)]
pub struct TimelineDeleteForceResult {
pub struct TimelineDeleteResult {
pub dir_existed: bool,
pub was_active: bool,
}
/// Deletes directory and it's contents. Returns false if directory does not exist.
fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
match std::fs::remove_dir_all(path) {
Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(e) => Err(e.into()),
}
/// Action for delete_or_exclude.
///
/// `delete_or_exclude` treats every variant except `Delete` as a local-only
/// deletion.
#[derive(Clone, Debug)]
pub enum DeleteOrExclude {
/// Delete timeline globally.
Delete,
/// Legacy mode until we fully migrate to generations: like exclude deletes
/// timeline only locally, but ignores generation number.
DeleteLocal,
/// This node is getting excluded, delete timeline locally. Carries the
/// configuration that excludes the node; `delete_or_exclude` refuses the
/// request if the timeline's current generation is higher than this one's.
Exclude(membership::Configuration),
}
/// Create temp directory for a new timeline. It needs to be located on the same

View File

@@ -273,10 +273,22 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, dict)
return res_json
def timeline_exclude(
    self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
) -> dict[str, Any]:
    """
    Call the safekeeper's timeline `/exclude` endpoint with configuration `to`.

    Sends `to.to_json()` in a PUT request, raises on a non-success HTTP
    status, and returns the decoded JSON body, which must be a dict.
    """
    url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/exclude"
    response = self.put(url, data=to.to_json())
    response.raise_for_status()
    payload = response.json()
    assert isinstance(payload, dict)
    return payload
def membership_switch(
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
) -> TimelineMembershipSwitchResponse:
res = self.post(
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership",
data=to.to_json(),
)

View File

@@ -1686,7 +1686,7 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("auth_enabled", [False, True])
def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
def test_delete(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
@@ -2215,13 +2215,21 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
# These are expected after timeline deletion on safekeepers.
env.pageserver.allowed_errors.extend(
[
".*Timeline .* was not found in global map.*",
".*Timeline .* was cancelled and cannot be used anymore.*",
]
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
sk = env.safekeepers[0]
http_cli = sk.http_client()
sk_id_1 = SafekeeperId(env.safekeepers[0].id, "localhost", sk.port.pg_tenant_only)
sk_id_1 = SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only)
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock
# Request to switch before timeline creation should fail.
@@ -2249,19 +2257,28 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
log.info(f"conf after restart: {after_restart}")
assert after_restart.generation == 4
# Switch into disjoint conf.
non_joint = Configuration(generation=5, members=[sk_id_2], new_members=None)
# Switch into non joint conf of which sk is not a member, must fail.
non_joint_not_member = Configuration(generation=5, members=[sk_id_2], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint_not_member)
# Switch into good non joint conf.
non_joint = Configuration(generation=6, members=[sk_id_1], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
log.info(f"non joint switch resp: {resp}")
assert resp.previous_conf.generation == 4
assert resp.current_conf.generation == 5
assert resp.current_conf.generation == 6
# Switch request to lower conf should be ignored.
lower_conf = Configuration(generation=3, members=[], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
log.info(f"lower switch resp: {resp}")
assert resp.previous_conf.generation == 5
assert resp.current_conf.generation == 5
# Switch request to lower conf should be rejected.
lower_conf = Configuration(generation=3, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
# Now, exclude sk from the membership, timeline should be deleted.
excluded_conf = Configuration(generation=7, members=[sk_id_2], new_members=None)
http_cli.timeline_exclude(tenant_id, timeline_id, excluded_conf)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.timeline_status(tenant_id, timeline_id)
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries