) -> Result
}
/// Deactivates the timeline and removes its data directory.
-async fn timeline_delete_force_handler(
- mut request: Request,
-) -> Result, ApiError> {
+async fn timeline_delete_handler(mut request: Request) -> Result, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
+ let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
check_permission(&request, Some(ttid.tenant_id))?;
ensure_no_body(&mut request).await?;
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
// error handling here when we're able to.
- let resp = GlobalTimelines::delete_force(&ttid)
+ let resp = GlobalTimelines::delete(&ttid, only_local)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, resp)
}
/// Deactivates all timelines for the tenant and removes its data directory.
-/// See `timeline_delete_force_handler`.
-async fn tenant_delete_force_handler(
- mut request: Request,
-) -> Result, ApiError> {
+/// See `timeline_delete_handler`.
+async fn tenant_delete_handler(mut request: Request) -> Result, ApiError> {
let tenant_id = parse_request_param(&request, "tenant_id")?;
+ let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
check_permission(&request, Some(tenant_id))?;
ensure_no_body(&mut request).await?;
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
// Using an `InternalServerError` should be fixed when the types support it
- let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
+ let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
.await
.map_err(ApiError::InternalServerError)?;
json_response(
@@ -512,10 +510,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder
request_span(r, timeline_status_handler)
})
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
- request_span(r, timeline_delete_force_handler)
+ request_span(r, timeline_delete_handler)
})
.delete("/v1/tenant/:tenant_id", |r| {
- request_span(r, tenant_delete_force_handler)
+ request_span(r, tenant_delete_handler)
})
.post("/v1/pull_timeline", |r| {
request_span(r, timeline_pull_handler)
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 6618df5efa..f18a1ec22d 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -88,6 +88,10 @@ impl SafeKeeperConf {
self.tenant_dir(&ttid.tenant_id)
.join(ttid.timeline_id.to_string())
}
+
+ pub fn is_wal_backup_enabled(&self) -> bool {
+ self.remote_storage.is_some() && self.wal_backup_enabled
+ }
}
impl SafeKeeperConf {
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 879b805796..ee3e4c8ead 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -407,7 +407,7 @@ impl SafekeeperPostgresHandler {
self.conf.timeline_dir(&tli.ttid),
&persisted_state,
start_pos,
- self.conf.wal_backup_enabled,
+ self.conf.is_wal_backup_enabled(),
)?;
// Split to concurrently receive and send data; replies are generally
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 1a8df92828..ec7dd7d89b 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -33,12 +33,13 @@ use crate::safekeeper::{
};
use crate::send_wal::WalSenders;
use crate::state::{TimelineMemState, TimelinePersistentState};
+use crate::wal_backup::{self};
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::FullTimelineInfo;
use crate::wal_storage::Storage as wal_storage_iface;
-use crate::SafeKeeperConf;
use crate::{debug_dump, wal_storage};
+use crate::{GlobalTimelines, SafeKeeperConf};
/// Things safekeeper should know about timeline state on peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -471,14 +472,29 @@ impl Timeline {
}
}
- /// Delete timeline from disk completely, by removing timeline directory. Background
- /// timeline activities will stop eventually.
- pub async fn delete_from_disk(
+ /// Delete timeline from disk completely, by removing timeline directory.
+ /// Background timeline activities will stop eventually.
+ ///
+ /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
+ /// deletion API endpoint is retriable.
+ pub async fn delete(
&self,
shared_state: &mut MutexGuard<'_, SharedState>,
+ only_local: bool,
) -> Result<(bool, bool)> {
let was_active = shared_state.active;
self.cancel(shared_state);
+
+ // TODO: It's better to wait for s3 offloader termination before
+ // removing data from s3. Though since s3 doesn't have transactions it
+ // still wouldn't guarantee absense of data after removal.
+ let conf = GlobalTimelines::get_global_config();
+ if !only_local && conf.is_wal_backup_enabled() {
+ // Note: we concurrently delete remote storage data from multiple
+ // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
+ // do some retries anyway.
+ wal_backup::delete_timeline(&self.ttid).await?;
+ }
let dir_existed = delete_dir(&self.timeline_dir).await?;
Ok((dir_existed, was_active))
}
diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs
index 92ac5ba66d..079e706ff8 100644
--- a/safekeeper/src/timelines_global_map.rs
+++ b/safekeeper/src/timelines_global_map.rs
@@ -327,16 +327,20 @@ impl GlobalTimelines {
}
/// Cancels timeline, then deletes the corresponding data directory.
- pub async fn delete_force(ttid: &TenantTimelineId) -> Result {
+ /// If only_local, doesn't remove WAL segments in remote storage.
+ pub async fn delete(
+ ttid: &TenantTimelineId,
+ only_local: bool,
+ ) -> Result {
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
match tli_res {
Ok(timeline) => {
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state().await;
- info!("deleting timeline {}", ttid);
+ info!("deleting timeline {}, only_local={}", ttid, only_local);
let (dir_existed, was_active) =
- timeline.delete_from_disk(&mut shared_state).await?;
+ timeline.delete(&mut shared_state, only_local).await?;
// Remove timeline from the map.
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
@@ -369,8 +373,11 @@ impl GlobalTimelines {
/// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
/// created simultaneously. In that case the function will return error and the caller should
/// retry tenant deletion again later.
+ ///
+ /// If only_local, doesn't remove WAL segments in remote storage.
pub async fn delete_force_all_for_tenant(
tenant_id: &TenantId,
+ only_local: bool,
) -> Result> {
info!("deleting all timelines for tenant {}", tenant_id);
let to_delete = Self::get_all_for_tenant(*tenant_id);
@@ -379,7 +386,7 @@ impl GlobalTimelines {
let mut deleted = HashMap::new();
for tli in &to_delete {
- match Self::delete_force(&tli.ttid).await {
+ match Self::delete(&tli.ttid, only_local).await {
Ok(result) => {
deleted.insert(tli.ttid, result);
}
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index e4499eaf50..c47381351d 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -4,6 +4,8 @@ use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+use utils::backoff;
use utils::id::NodeId;
use std::cmp::min;
@@ -166,6 +168,17 @@ async fn update_task(
}
}
+static REMOTE_STORAGE: OnceCell