mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 07:00:38 +00:00
Safekeeper: add timeline/tenant force delete HTTP endpoings (closes #895)
* There is no auth in Safekeeper HTTP at all currently, so simply calling `check_permission` is not enough. * There are no checks of Safekeeper still working with the data, as "still working" is burry now: a timeline may be "active" while there are no compute nodes and all data is propagated. * Still, callmemaybe is deactivated, and timeline is removed from the internal map. It can easily sneak back in case of race conditions and implicit creations, though.
This commit is contained in:
committed by
Egor Suvorov
parent
07b85e7cfc
commit
bf899a57d9
@@ -3,19 +3,20 @@ use hyper::{Body, Request, Response, StatusCode};
|
||||
|
||||
use serde::Serialize;
|
||||
use serde::Serializer;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::safekeeper::Term;
|
||||
use crate::safekeeper::TermHistory;
|
||||
use crate::timeline::GlobalTimelines;
|
||||
use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult};
|
||||
use crate::SafeKeeperConf;
|
||||
use utils::{
|
||||
http::{
|
||||
endpoint,
|
||||
error::ApiError,
|
||||
json::{json_request, json_response},
|
||||
request::parse_request_param,
|
||||
request::{ensure_no_body, parse_request_param},
|
||||
RequestExt, RouterBuilder,
|
||||
},
|
||||
lsn::Lsn,
|
||||
@@ -130,6 +131,44 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
json_response(StatusCode::CREATED, ())
|
||||
}
|
||||
|
||||
/// Deactivates the timeline and removes its data directory.
|
||||
///
|
||||
/// It does not try to stop any processing of the timeline; there is no such code at the time of writing.
|
||||
/// However, it tries to check whether the timeline was active and report it to caller just in case.
|
||||
/// Note that this information is inaccurate:
|
||||
/// 1. There is a race condition between checking the timeline for activity and actual directory deletion.
|
||||
/// 2. At the time of writing Safekeeper rarely marks a timeline inactive. E.g. disconnecting the compute node does nothing.
|
||||
async fn timeline_delete_force_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let zttid = ZTenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
ensure_no_body(&mut request).await?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
GlobalTimelines::delete_force(get_conf(&request), &zttid).map_err(ApiError::from_err)?,
|
||||
)
|
||||
}
|
||||
|
||||
/// Deactivates all timelines for the tenant and removes its data directory.
|
||||
/// See `timeline_delete_force_handler`.
|
||||
async fn tenant_delete_force_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id = parse_request_param(&request, "tenant_id")?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
GlobalTimelines::delete_force_all_for_tenant(get_conf(&request), &tenant_id)
|
||||
.map_err(ApiError::from_err)?
|
||||
.iter()
|
||||
.map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteForceResult>>(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Used only in tests to hand craft required data.
|
||||
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let zttid = ZTenantTimelineId::new(
|
||||
@@ -155,6 +194,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
timeline_status_handler,
|
||||
)
|
||||
.post("/v1/timeline", timeline_create_handler)
|
||||
.delete(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id",
|
||||
timeline_delete_force_handler,
|
||||
)
|
||||
.delete("/v1/tenant/:tenant_id", tenant_delete_force_handler)
|
||||
// for tests
|
||||
.post(
|
||||
"/v1/record_safekeeper_info/:tenant_id/:timeline_id",
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use url::Url;
|
||||
|
||||
use utils::zid::{ZNodeId, ZTenantTimelineId};
|
||||
use utils::zid::{ZNodeId, ZTenantId, ZTenantTimelineId};
|
||||
|
||||
pub mod broker;
|
||||
pub mod callmemaybe;
|
||||
@@ -57,9 +57,12 @@ pub struct SafeKeeperConf {
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
pub fn tenant_dir(&self, tenant_id: &ZTenantId) -> PathBuf {
|
||||
self.workdir.join(tenant_id.to_string())
|
||||
}
|
||||
|
||||
pub fn timeline_dir(&self, zttid: &ZTenantTimelineId) -> PathBuf {
|
||||
self.workdir
|
||||
.join(zttid.tenant_id.to_string())
|
||||
self.tenant_dir(&zttid.tenant_id)
|
||||
.join(zttid.timeline_id.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@ use etcd_broker::SkTimelineInfo;
|
||||
use lazy_static::lazy_static;
|
||||
use postgres_ffi::xlog_utils::XLogSegNo;
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
use std::fs::{self};
|
||||
@@ -19,7 +21,7 @@ use tracing::*;
|
||||
use utils::{
|
||||
lsn::Lsn,
|
||||
pq_proto::ZenithFeedback,
|
||||
zid::{ZNodeId, ZTenantTimelineId},
|
||||
zid::{ZNodeId, ZTenantId, ZTenantTimelineId},
|
||||
};
|
||||
|
||||
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
|
||||
@@ -345,6 +347,20 @@ impl Timeline {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
/// Deactivates the timeline, assuming it is being deleted.
|
||||
/// Returns whether the timeline was already active.
|
||||
///
|
||||
/// The callmemaybe thread is stopped by the deactivation message. We assume all other threads
|
||||
/// will stop by themselves eventually (possibly with errors, but no panics). There should be no
|
||||
/// compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
|
||||
/// we're deleting the timeline anyway.
|
||||
pub fn deactivate_for_delete(&self) -> Result<bool> {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
let was_active = shared_state.active;
|
||||
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
|
||||
Ok(was_active)
|
||||
}
|
||||
|
||||
fn is_active(&self) -> bool {
|
||||
let shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.active
|
||||
@@ -515,6 +531,12 @@ lazy_static! {
|
||||
});
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Serialize)]
|
||||
pub struct TimelineDeleteForceResult {
|
||||
pub dir_existed: bool,
|
||||
pub was_active: bool,
|
||||
}
|
||||
|
||||
/// A zero-sized struct used to manage access to the global timelines map.
|
||||
pub struct GlobalTimelines;
|
||||
|
||||
@@ -613,4 +635,78 @@ impl GlobalTimelines {
|
||||
.map(|(zttid, _)| *zttid)
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn delete_force_internal(
|
||||
conf: &SafeKeeperConf,
|
||||
zttid: &ZTenantTimelineId,
|
||||
was_active: bool,
|
||||
) -> Result<TimelineDeleteForceResult> {
|
||||
match std::fs::remove_dir_all(conf.timeline_dir(zttid)) {
|
||||
Ok(_) => Ok(TimelineDeleteForceResult {
|
||||
dir_existed: true,
|
||||
was_active,
|
||||
}),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(TimelineDeleteForceResult {
|
||||
dir_existed: false,
|
||||
was_active,
|
||||
}),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Deactivates and deletes the timeline, see `Timeline::deactivate_for_delete()`, the deletes
|
||||
/// the corresponding data directory.
|
||||
/// We assume all timeline threads do not care about `GlobalTimelines` not containing the timeline
|
||||
/// anymore, and they will eventually terminate without panics.
|
||||
///
|
||||
/// There are multiple ways the timeline may be accidentally "re-created" (so we end up with two
|
||||
/// `Timeline` objects in memory):
|
||||
/// a) a compute node connects after this method is called, or
|
||||
/// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or
|
||||
/// c) an HTTP POST request for timeline creation is made after the timeline is already deleted.
|
||||
/// TODO: ensure all of the above never happens.
|
||||
pub fn delete_force(
|
||||
conf: &SafeKeeperConf,
|
||||
zttid: &ZTenantTimelineId,
|
||||
) -> Result<TimelineDeleteForceResult> {
|
||||
info!("deleting timeline {}", zttid);
|
||||
let was_active = match TIMELINES_STATE.lock().unwrap().timelines.remove(zttid) {
|
||||
None => false,
|
||||
Some(tli) => tli.deactivate_for_delete()?,
|
||||
};
|
||||
GlobalTimelines::delete_force_internal(conf, zttid, was_active)
|
||||
}
|
||||
|
||||
/// Deactivates and deletes all timelines for the tenant, see `delete()`.
|
||||
/// Returns map of all timelines which the tenant had, `true` if a timeline was active.
|
||||
pub fn delete_force_all_for_tenant(
|
||||
conf: &SafeKeeperConf,
|
||||
tenant_id: &ZTenantId,
|
||||
) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> {
|
||||
info!("deleting all timelines for tenant {}", tenant_id);
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
let mut deleted = HashMap::new();
|
||||
for (zttid, tli) in &state.timelines {
|
||||
if zttid.tenant_id == *tenant_id {
|
||||
deleted.insert(
|
||||
*zttid,
|
||||
GlobalTimelines::delete_force_internal(
|
||||
conf,
|
||||
zttid,
|
||||
tli.deactivate_for_delete()?,
|
||||
)?,
|
||||
);
|
||||
}
|
||||
}
|
||||
// TODO: test that the exact subset of timelines is removed.
|
||||
state
|
||||
.timelines
|
||||
.retain(|zttid, _| !deleted.contains_key(zttid));
|
||||
match std::fs::remove_dir_all(conf.tenant_dir(tenant_id)) {
|
||||
Ok(_) => (),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => (),
|
||||
e => e?,
|
||||
};
|
||||
Ok(deleted)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -850,3 +850,116 @@ def test_wal_deleted_after_broadcast(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
|
||||
assert wal_size_after_checkpoint < 16 * 2.5
|
||||
|
||||
|
||||
def test_delete_force(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# Create two tenants: one will be deleted, other should be preserved.
|
||||
tenant_id = env.initial_tenant.hex
|
||||
timeline_id_1 = env.zenith_cli.create_branch('br1').hex # Acive, delete explicitly
|
||||
timeline_id_2 = env.zenith_cli.create_branch('br2').hex # Inactive, delete explictly
|
||||
timeline_id_3 = env.zenith_cli.create_branch('br3').hex # Active, delete with the tenant
|
||||
timeline_id_4 = env.zenith_cli.create_branch('br4').hex # Inactive, delete with the tenant
|
||||
|
||||
tenant_id_other = env.zenith_cli.create_tenant().hex
|
||||
timeline_id_other = env.zenith_cli.create_root_branch(
|
||||
'br-other', tenant_id=uuid.UUID(hex=tenant_id_other)).hex
|
||||
|
||||
# Populate branches
|
||||
pg_1 = env.postgres.create_start('br1')
|
||||
pg_2 = env.postgres.create_start('br2')
|
||||
pg_3 = env.postgres.create_start('br3')
|
||||
pg_4 = env.postgres.create_start('br4')
|
||||
pg_other = env.postgres.create_start('br-other', tenant_id=uuid.UUID(hex=tenant_id_other))
|
||||
for pg in [pg_1, pg_2, pg_3, pg_4, pg_other]:
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('CREATE TABLE t(key int primary key)')
|
||||
sk = env.safekeepers[0]
|
||||
sk_data_dir = Path(sk.data_dir())
|
||||
sk_http = sk.http_client()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_1).is_dir()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
|
||||
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
|
||||
|
||||
# Stop branches which should be inactive and restart Safekeeper to drop its in-memory state.
|
||||
pg_2.stop_and_destroy()
|
||||
pg_4.stop_and_destroy()
|
||||
sk.stop()
|
||||
sk.start()
|
||||
|
||||
# Ensure connections to Safekeeper are established
|
||||
for pg in [pg_1, pg_3, pg_other]:
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('INSERT INTO t (key) VALUES (1)')
|
||||
|
||||
# Remove initial tenant's br1 (active)
|
||||
assert sk_http.timeline_delete_force(tenant_id, timeline_id_1) == {
|
||||
"dir_existed": True,
|
||||
"was_active": True,
|
||||
}
|
||||
assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
|
||||
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
|
||||
|
||||
# Ensure repeated deletion succeeds
|
||||
assert sk_http.timeline_delete_force(tenant_id, timeline_id_1) == {
|
||||
"dir_existed": False, "was_active": False
|
||||
}
|
||||
assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
|
||||
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
|
||||
|
||||
# Remove initial tenant's br2 (inactive)
|
||||
assert sk_http.timeline_delete_force(tenant_id, timeline_id_2) == {
|
||||
"dir_existed": True,
|
||||
"was_active": False,
|
||||
}
|
||||
assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
|
||||
assert not (sk_data_dir / tenant_id / timeline_id_2).exists()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
|
||||
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
|
||||
|
||||
# Remove non-existing branch, should succeed
|
||||
assert sk_http.timeline_delete_force(tenant_id, '00' * 16) == {
|
||||
"dir_existed": False,
|
||||
"was_active": False,
|
||||
}
|
||||
assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
|
||||
assert not (sk_data_dir / tenant_id / timeline_id_2).exists()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_3).exists()
|
||||
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
|
||||
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
|
||||
|
||||
# Remove initial tenant fully (two branches are active)
|
||||
response = sk_http.tenant_delete_force(tenant_id)
|
||||
assert response == {
|
||||
timeline_id_3: {
|
||||
"dir_existed": True,
|
||||
"was_active": True,
|
||||
}
|
||||
}
|
||||
assert not (sk_data_dir / tenant_id).exists()
|
||||
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
|
||||
|
||||
# Remove initial tenant again.
|
||||
response = sk_http.tenant_delete_force(tenant_id)
|
||||
assert response == {}
|
||||
assert not (sk_data_dir / tenant_id).exists()
|
||||
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
|
||||
|
||||
# Ensure the other tenant still works
|
||||
sk_http.timeline_status(tenant_id_other, timeline_id_other)
|
||||
with closing(pg_other.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('INSERT INTO t (key) VALUES (123)')
|
||||
|
||||
@@ -1800,6 +1800,21 @@ class SafekeeperHttpClient(requests.Session):
|
||||
json=body)
|
||||
res.raise_for_status()
|
||||
|
||||
def timeline_delete_force(self, tenant_id: str, timeline_id: str) -> Dict[Any, Any]:
|
||||
res = self.delete(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def tenant_delete_force(self, tenant_id: str) -> Dict[Any, Any]:
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def get_metrics(self) -> SafekeeperMetrics:
|
||||
request_result = self.get(f"http://localhost:{self.port}/metrics")
|
||||
request_result.raise_for_status()
|
||||
|
||||
Reference in New Issue
Block a user