diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index e731db5617..62fbd2ff2f 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -3,19 +3,20 @@ use hyper::{Body, Request, Response, StatusCode};
use serde::Serialize;
use serde::Serializer;
+use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
-use crate::timeline::GlobalTimelines;
+use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult};
use crate::SafeKeeperConf;
use utils::{
http::{
endpoint,
error::ApiError,
json::{json_request, json_response},
- request::parse_request_param,
+ request::{ensure_no_body, parse_request_param},
RequestExt, RouterBuilder,
},
lsn::Lsn,
@@ -130,6 +131,44 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
+/// Deactivates the timeline and removes its data directory. See `GlobalTimelines::delete_force()`.
+async fn timeline_delete_force_handler(
+ mut request: Request<Body>,
+) -> Result<Response<Body>, ApiError> {
+ let zttid = ZTenantTimelineId::new(
+ parse_request_param(&request, "tenant_id")?,
+ parse_request_param(&request, "timeline_id")?,
+ );
+ ensure_no_body(&mut request).await?;
+ json_response(
+ StatusCode::OK,
+ GlobalTimelines::delete_force(get_conf(&request), &zttid).map_err(ApiError::from_err)?,
+ )
+}
+
+/// Deactivates all timelines for the tenant and removes its data directory.
+/// See `timeline_delete_force_handler`.
+async fn tenant_delete_force_handler(
+ mut request: Request<Body>,
+) -> Result<Response<Body>, ApiError> {
+ let tenant_id = parse_request_param(&request, "tenant_id")?;
+ ensure_no_body(&mut request).await?;
+ json_response(
+ StatusCode::OK,
+ GlobalTimelines::delete_force_all_for_tenant(get_conf(&request), &tenant_id)
+ .map_err(ApiError::from_err)?
+ .iter()
+ .map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp))
+ .collect::<HashMap<String, TimelineDeleteForceResult>>(),
+ )
+}
+
/// Used only in tests to hand craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let zttid = ZTenantTimelineId::new(
@@ -155,6 +194,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
timeline_status_handler,
)
.post("/v1/timeline", timeline_create_handler)
+ .delete(
+ "/v1/tenant/:tenant_id/timeline/:timeline_id",
+ timeline_delete_force_handler,
+ )
+ .delete("/v1/tenant/:tenant_id", tenant_delete_force_handler)
// for tests
.post(
"/v1/record_safekeeper_info/:tenant_id/:timeline_id",
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 03236d4e65..09b2e68a49 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -3,7 +3,7 @@ use std::path::PathBuf;
use std::time::Duration;
use url::Url;
-use utils::zid::{ZNodeId, ZTenantTimelineId};
+use utils::zid::{ZNodeId, ZTenantId, ZTenantTimelineId};
pub mod broker;
pub mod callmemaybe;
@@ -57,9 +57,12 @@ pub struct SafeKeeperConf {
}
impl SafeKeeperConf {
+ pub fn tenant_dir(&self, tenant_id: &ZTenantId) -> PathBuf {
+ self.workdir.join(tenant_id.to_string())
+ }
+
pub fn timeline_dir(&self, zttid: &ZTenantTimelineId) -> PathBuf {
- self.workdir
- .join(zttid.tenant_id.to_string())
+ self.tenant_dir(&zttid.tenant_id)
.join(zttid.timeline_id.to_string())
}
}
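The new `tenant_dir()` helper makes the on-disk layout explicit: one directory per tenant under the safekeeper workdir, with one subdirectory per timeline. These are exactly the directories removed by the deletion code in `timeline.rs` below. A small illustrative sketch follows; the workdir path and IDs are made up.

# Layout implied by tenant_dir()/timeline_dir(); the workdir and IDs are made-up examples.
from pathlib import Path

workdir = Path("/data/safekeeper")                # hypothetical safekeeper workdir
tenant_id = "de200bd42b49cc1814412c7e592dd6e9"
timeline_id = "3b5e1f6c1a6f8e1b2c3d4e5f60718293"

tenant_dir = workdir / tenant_id                  # removed by delete_force_all_for_tenant
timeline_dir = tenant_dir / timeline_id           # removed by delete_force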
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index c73d6af4ac..84ad53d72d 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -7,6 +7,8 @@ use etcd_broker::SkTimelineInfo;
use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::XLogSegNo;
+use serde::Serialize;
+
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self};
@@ -19,7 +21,7 @@ use tracing::*;
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
- zid::{ZNodeId, ZTenantTimelineId},
+ zid::{ZNodeId, ZTenantId, ZTenantTimelineId},
};
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
@@ -345,6 +347,20 @@ impl Timeline {
Ok(false)
}
+ /// Deactivates the timeline, assuming it is being deleted.
+ /// Returns whether the timeline was active before deactivation.
+ ///
+ /// The callmemaybe thread is stopped by the deactivation message. We assume all other threads
+ /// will stop by themselves eventually (possibly with errors, but no panics). There should be no
+ /// compute threads anyway, since the timeline is being deleted. Some WAL may be left unsent, but
+ /// that is acceptable because the timeline is being deleted.
+ pub fn deactivate_for_delete(&self) -> Result<bool> {
+ let mut shared_state = self.mutex.lock().unwrap();
+ let was_active = shared_state.active;
+ shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
+ Ok(was_active)
+ }
+
fn is_active(&self) -> bool {
let shared_state = self.mutex.lock().unwrap();
shared_state.active
@@ -515,6 +531,12 @@ lazy_static! {
});
}
+#[derive(Clone, Copy, Serialize)]
+pub struct TimelineDeleteForceResult {
+ pub dir_existed: bool,
+ pub was_active: bool,
+}
+
/// A zero-sized struct used to manage access to the global timelines map.
pub struct GlobalTimelines;
@@ -613,4 +635,78 @@ impl GlobalTimelines {
.map(|(zttid, _)| *zttid)
.collect()
}
+
+ fn delete_force_internal(
+ conf: &SafeKeeperConf,
+ zttid: &ZTenantTimelineId,
+ was_active: bool,
+ ) -> Result<TimelineDeleteForceResult> {
+ match std::fs::remove_dir_all(conf.timeline_dir(zttid)) {
+ Ok(_) => Ok(TimelineDeleteForceResult {
+ dir_existed: true,
+ was_active,
+ }),
+ Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(TimelineDeleteForceResult {
+ dir_existed: false,
+ was_active,
+ }),
+ Err(e) => Err(e.into()),
+ }
+ }
+
+ /// Deactivates the timeline (see `Timeline::deactivate_for_delete()`), then deletes
+ /// the corresponding data directory.
+ /// We assume timeline threads do not mind that `GlobalTimelines` no longer contains the timeline
+ /// and will eventually terminate without panics.
+ ///
+ /// There are multiple ways the timeline may be accidentally "re-created" (so we end up with two
+ /// `Timeline` objects in memory):
+ /// a) a compute node connects after this method is called, or
+ /// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or
+ /// c) an HTTP POST request for timeline creation is made after the timeline is already deleted.
+ /// TODO: ensure all of the above never happens.
+ pub fn delete_force(
+ conf: &SafeKeeperConf,
+ zttid: &ZTenantTimelineId,
+ ) -> Result<TimelineDeleteForceResult> {
+ info!("deleting timeline {}", zttid);
+ let was_active = match TIMELINES_STATE.lock().unwrap().timelines.remove(zttid) {
+ None => false,
+ Some(tli) => tli.deactivate_for_delete()?,
+ };
+ GlobalTimelines::delete_force_internal(conf, zttid, was_active)
+ }
+
+ /// Deactivates and deletes all timelines for the tenant, see `delete_force()`.
+ /// Returns a map from each of the tenant's timelines to its deletion result.
+ pub fn delete_force_all_for_tenant(
+ conf: &SafeKeeperConf,
+ tenant_id: &ZTenantId,
+ ) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> {
+ info!("deleting all timelines for tenant {}", tenant_id);
+ let mut state = TIMELINES_STATE.lock().unwrap();
+ let mut deleted = HashMap::new();
+ for (zttid, tli) in &state.timelines {
+ if zttid.tenant_id == *tenant_id {
+ deleted.insert(
+ *zttid,
+ GlobalTimelines::delete_force_internal(
+ conf,
+ zttid,
+ tli.deactivate_for_delete()?,
+ )?,
+ );
+ }
+ }
+ // TODO: test that the exact subset of timelines is removed.
+ state
+ .timelines
+ .retain(|zttid, _| !deleted.contains_key(zttid));
+ match std::fs::remove_dir_all(conf.tenant_dir(tenant_id)) {
+ Ok(_) => (),
+ Err(e) if e.kind() == std::io::ErrorKind::NotFound => (),
+ e => e?,
+ };
+ Ok(deleted)
+ }
}
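Force deletion is meant to be idempotent from the caller's point of view: a missing directory is reported as `dir_existed: false` rather than an error, and a timeline absent from the in-memory map is reported as `was_active: false`. Here is a short sketch of relying on that contract, assuming the `SafekeeperHttpClient` helpers added in `zenith_fixtures.py` further down; `delete_timeline_twice` is a hypothetical helper, not part of this patch.

# Sketch of the idempotent deletion contract; delete_timeline_twice is illustrative only.
def delete_timeline_twice(sk_http, tenant_id: str, timeline_id: str) -> None:
    first = sk_http.timeline_delete_force(tenant_id, timeline_id)
    assert set(first) == {"dir_existed", "was_active"}
    # A second call finds neither an in-memory timeline nor a data directory.
    second = sk_http.timeline_delete_force(tenant_id, timeline_id)
    assert second == {"dir_existed": False, "was_active": False}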
diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py
index 702c27a79b..e297f91f2c 100644
--- a/test_runner/batch_others/test_wal_acceptor.py
+++ b/test_runner/batch_others/test_wal_acceptor.py
@@ -850,3 +850,116 @@ def test_wal_deleted_after_broadcast(zenith_env_builder: ZenithEnvBuilder):
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
assert wal_size_after_checkpoint < 16 * 2.5
+
+
+def test_delete_force(zenith_env_builder: ZenithEnvBuilder):
+ zenith_env_builder.num_safekeepers = 1
+ env = zenith_env_builder.init_start()
+
+ # Create two tenants: one will be deleted, the other should be preserved.
+ tenant_id = env.initial_tenant.hex
+ timeline_id_1 = env.zenith_cli.create_branch('br1').hex # Active, delete explicitly
+ timeline_id_2 = env.zenith_cli.create_branch('br2').hex # Inactive, delete explicitly
+ timeline_id_3 = env.zenith_cli.create_branch('br3').hex # Active, delete with the tenant
+ timeline_id_4 = env.zenith_cli.create_branch('br4').hex # Inactive, delete with the tenant
+
+ tenant_id_other = env.zenith_cli.create_tenant().hex
+ timeline_id_other = env.zenith_cli.create_root_branch(
+ 'br-other', tenant_id=uuid.UUID(hex=tenant_id_other)).hex
+
+ # Populate branches
+ pg_1 = env.postgres.create_start('br1')
+ pg_2 = env.postgres.create_start('br2')
+ pg_3 = env.postgres.create_start('br3')
+ pg_4 = env.postgres.create_start('br4')
+ pg_other = env.postgres.create_start('br-other', tenant_id=uuid.UUID(hex=tenant_id_other))
+ for pg in [pg_1, pg_2, pg_3, pg_4, pg_other]:
+ with closing(pg.connect()) as conn:
+ with conn.cursor() as cur:
+ cur.execute('CREATE TABLE t(key int primary key)')
+ sk = env.safekeepers[0]
+ sk_data_dir = Path(sk.data_dir())
+ sk_http = sk.http_client()
+ assert (sk_data_dir / tenant_id / timeline_id_1).is_dir()
+ assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
+ assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
+ assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
+ assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
+
+ # Stop branches which should be inactive and restart Safekeeper to drop its in-memory state.
+ pg_2.stop_and_destroy()
+ pg_4.stop_and_destroy()
+ sk.stop()
+ sk.start()
+
+ # Ensure connections to Safekeeper are established
+ for pg in [pg_1, pg_3, pg_other]:
+ with closing(pg.connect()) as conn:
+ with conn.cursor() as cur:
+ cur.execute('INSERT INTO t (key) VALUES (1)')
+
+ # Remove initial tenant's br1 (active)
+ assert sk_http.timeline_delete_force(tenant_id, timeline_id_1) == {
+ "dir_existed": True,
+ "was_active": True,
+ }
+ assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
+ assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
+ assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
+ assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
+ assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
+
+ # Ensure repeated deletion succeeds
+ assert sk_http.timeline_delete_force(tenant_id, timeline_id_1) == {
+ "dir_existed": False, "was_active": False
+ }
+ assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
+ assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
+ assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
+ assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
+ assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
+
+ # Remove initial tenant's br2 (inactive)
+ assert sk_http.timeline_delete_force(tenant_id, timeline_id_2) == {
+ "dir_existed": True,
+ "was_active": False,
+ }
+ assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
+ assert not (sk_data_dir / tenant_id / timeline_id_2).exists()
+ assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
+ assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
+ assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
+
+ # Remove non-existing branch, should succeed
+ assert sk_http.timeline_delete_force(tenant_id, '00' * 16) == {
+ "dir_existed": False,
+ "was_active": False,
+ }
+ assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
+ assert not (sk_data_dir / tenant_id / timeline_id_2).exists()
+ assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
+ assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
+ assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
+
+ # Remove the initial tenant fully (only br3 is still active)
+ response = sk_http.tenant_delete_force(tenant_id)
+ assert response == {
+ timeline_id_3: {
+ "dir_existed": True,
+ "was_active": True,
+ }
+ }
+ assert not (sk_data_dir / tenant_id).exists()
+ assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
+
+ # Remove initial tenant again.
+ response = sk_http.tenant_delete_force(tenant_id)
+ assert response == {}
+ assert not (sk_data_dir / tenant_id).exists()
+ assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
+
+ # Ensure the other tenant still works
+ sk_http.timeline_status(tenant_id_other, timeline_id_other)
+ with closing(pg_other.connect()) as conn:
+ with conn.cursor() as cur:
+ cur.execute('INSERT INTO t (key) VALUES (123)')
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index fe20f1abbf..357db4c16d 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -1800,6 +1800,21 @@ class SafekeeperHttpClient(requests.Session):
json=body)
res.raise_for_status()
+ def timeline_delete_force(self, tenant_id: str, timeline_id: str) -> Dict[Any, Any]:
+ res = self.delete(
+ f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
+ res.raise_for_status()
+ res_json = res.json()
+ assert isinstance(res_json, dict)
+ return res_json
+
+ def tenant_delete_force(self, tenant_id: str) -> Dict[Any, Any]:
+ res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
+ res.raise_for_status()
+ res_json = res.json()
+ assert isinstance(res_json, dict)
+ return res_json
+
def get_metrics(self) -> SafekeeperMetrics:
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()