From 88df05753132bb93034d990298807713b5c88be5 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 17 Jan 2024 23:06:44 +0300 Subject: [PATCH] Delete WAL segments from s3 when timeline is deleted. In the most straightforward way; safekeeper performs it in DELETE endpoint implementation, with no coordination between sks. delete_force endpoint in the code is renamed to delete as there is only one way to delete. --- safekeeper/src/http/routes.rs | 20 +++---- safekeeper/src/lib.rs | 4 ++ safekeeper/src/send_wal.rs | 2 +- safekeeper/src/timeline.rs | 24 ++++++-- safekeeper/src/timelines_global_map.rs | 15 +++-- safekeeper/src/wal_backup.rs | 56 ++++++++++++++++--- test_runner/fixtures/neon_fixtures.py | 10 +++- test_runner/fixtures/pageserver/utils.py | 26 ++++----- .../regress/test_pageserver_generations.py | 15 ++++- .../regress/test_pageserver_secondary.py | 2 +- test_runner/regress/test_tenant_delete.py | 18 +++--- test_runner/regress/test_timeline_delete.py | 22 ++++---- test_runner/regress/test_wal_acceptor.py | 48 ++++++++++++---- 13 files changed, 184 insertions(+), 78 deletions(-) diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index ec715f6d2e..919b6b2982 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -288,34 +288,32 @@ async fn timeline_files_handler(request: Request) -> Result } /// Deactivates the timeline and removes its data directory. -async fn timeline_delete_force_handler( - mut request: Request, -) -> Result, ApiError> { +async fn timeline_delete_handler(mut request: Request) -> Result, ApiError> { let ttid = TenantTimelineId::new( parse_request_param(&request, "tenant_id")?, parse_request_param(&request, "timeline_id")?, ); + let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false); check_permission(&request, Some(ttid.tenant_id))?; ensure_no_body(&mut request).await?; // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better // error handling here when we're able to. - let resp = GlobalTimelines::delete_force(&ttid) + let resp = GlobalTimelines::delete(&ttid, only_local) .await .map_err(ApiError::InternalServerError)?; json_response(StatusCode::OK, resp) } /// Deactivates all timelines for the tenant and removes its data directory. -/// See `timeline_delete_force_handler`. -async fn tenant_delete_force_handler( - mut request: Request, -) -> Result, ApiError> { +/// See `timeline_delete_handler`. +async fn tenant_delete_handler(mut request: Request) -> Result, ApiError> { let tenant_id = parse_request_param(&request, "tenant_id")?; + let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false); check_permission(&request, Some(tenant_id))?; ensure_no_body(&mut request).await?; // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons; // Using an `InternalServerError` should be fixed when the types support it - let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id) + let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local) .await .map_err(ApiError::InternalServerError)?; json_response( @@ -512,10 +510,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder request_span(r, timeline_status_handler) }) .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { - request_span(r, timeline_delete_force_handler) + request_span(r, timeline_delete_handler) }) .delete("/v1/tenant/:tenant_id", |r| { - request_span(r, tenant_delete_force_handler) + request_span(r, tenant_delete_handler) }) .post("/v1/pull_timeline", |r| { request_span(r, timeline_pull_handler) diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 6618df5efa..f18a1ec22d 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -88,6 +88,10 @@ impl SafeKeeperConf { self.tenant_dir(&ttid.tenant_id) .join(ttid.timeline_id.to_string()) } + + pub fn is_wal_backup_enabled(&self) -> bool { + self.remote_storage.is_some() && self.wal_backup_enabled + } } impl SafeKeeperConf { diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 879b805796..ee3e4c8ead 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -407,7 +407,7 @@ impl SafekeeperPostgresHandler { self.conf.timeline_dir(&tli.ttid), &persisted_state, start_pos, - self.conf.wal_backup_enabled, + self.conf.is_wal_backup_enabled(), )?; // Split to concurrently receive and send data; replies are generally diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 1a8df92828..ec7dd7d89b 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -33,12 +33,13 @@ use crate::safekeeper::{ }; use crate::send_wal::WalSenders; use crate::state::{TimelineMemState, TimelinePersistentState}; +use crate::wal_backup::{self}; use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; use crate::metrics::FullTimelineInfo; use crate::wal_storage::Storage as wal_storage_iface; -use crate::SafeKeeperConf; use crate::{debug_dump, wal_storage}; +use crate::{GlobalTimelines, SafeKeeperConf}; /// Things safekeeper should know about timeline state on peers. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -471,14 +472,29 @@ impl Timeline { } } - /// Delete timeline from disk completely, by removing timeline directory. Background - /// timeline activities will stop eventually. - pub async fn delete_from_disk( + /// Delete timeline from disk completely, by removing timeline directory. + /// Background timeline activities will stop eventually. + /// + /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but + /// deletion API endpoint is retriable. + pub async fn delete( &self, shared_state: &mut MutexGuard<'_, SharedState>, + only_local: bool, ) -> Result<(bool, bool)> { let was_active = shared_state.active; self.cancel(shared_state); + + // TODO: It's better to wait for s3 offloader termination before + // removing data from s3. Though since s3 doesn't have transactions it + // still wouldn't guarantee absense of data after removal. + let conf = GlobalTimelines::get_global_config(); + if !only_local && conf.is_wal_backup_enabled() { + // Note: we concurrently delete remote storage data from multiple + // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we + // do some retries anyway. + wal_backup::delete_timeline(&self.ttid).await?; + } let dir_existed = delete_dir(&self.timeline_dir).await?; Ok((dir_existed, was_active)) } diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 92ac5ba66d..079e706ff8 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -327,16 +327,20 @@ impl GlobalTimelines { } /// Cancels timeline, then deletes the corresponding data directory. - pub async fn delete_force(ttid: &TenantTimelineId) -> Result { + /// If only_local, doesn't remove WAL segments in remote storage. + pub async fn delete( + ttid: &TenantTimelineId, + only_local: bool, + ) -> Result { let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid); match tli_res { Ok(timeline) => { // Take a lock and finish the deletion holding this mutex. let mut shared_state = timeline.write_shared_state().await; - info!("deleting timeline {}", ttid); + info!("deleting timeline {}, only_local={}", ttid, only_local); let (dir_existed, was_active) = - timeline.delete_from_disk(&mut shared_state).await?; + timeline.delete(&mut shared_state, only_local).await?; // Remove timeline from the map. // FIXME: re-enable it once we fix the issue with recreation of deleted timelines @@ -369,8 +373,11 @@ impl GlobalTimelines { /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are /// created simultaneously. In that case the function will return error and the caller should /// retry tenant deletion again later. + /// + /// If only_local, doesn't remove WAL segments in remote storage. pub async fn delete_force_all_for_tenant( tenant_id: &TenantId, + only_local: bool, ) -> Result> { info!("deleting all timelines for tenant {}", tenant_id); let to_delete = Self::get_all_for_tenant(*tenant_id); @@ -379,7 +386,7 @@ impl GlobalTimelines { let mut deleted = HashMap::new(); for tli in &to_delete { - match Self::delete_force(&tli.ttid).await { + match Self::delete(&tli.ttid, only_local).await { Ok(result) => { deleted.insert(tli.ttid, result); } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index e4499eaf50..c47381351d 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -4,6 +4,8 @@ use camino::{Utf8Path, Utf8PathBuf}; use futures::stream::FuturesOrdered; use futures::StreamExt; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use utils::backoff; use utils::id::NodeId; use std::cmp::min; @@ -166,6 +168,17 @@ async fn update_task( } } +static REMOTE_STORAGE: OnceCell> = OnceCell::new(); + +// Storage must be configured and initialized when this is called. +fn get_configured_remote_storage() -> &'static GenericRemoteStorage { + REMOTE_STORAGE + .get() + .expect("failed to get remote storage") + .as_ref() + .unwrap() +} + const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000; /// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup @@ -199,7 +212,7 @@ pub async fn wal_backup_launcher_task_main( ttid = wal_backup_launcher_rx.recv() => { // channel is never expected to get closed let ttid = ttid.unwrap(); - if conf.remote_storage.is_none() || !conf.wal_backup_enabled { + if !conf.is_wal_backup_enabled() { continue; /* just drain the channel and do nothing */ } async { @@ -484,18 +497,12 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec { res } -static REMOTE_STORAGE: OnceCell> = OnceCell::new(); - async fn backup_object( source_file: &Utf8Path, target_file: &RemotePath, size: usize, ) -> Result<()> { - let storage = REMOTE_STORAGE - .get() - .expect("failed to get remote storage") - .as_ref() - .unwrap(); + let storage = get_configured_remote_storage(); let file = File::open(&source_file) .await @@ -532,6 +539,39 @@ pub async fn read_object( Ok(Box::pin(reader)) } +/// Delete WAL files for the given timeline. Remote storage must be configured +/// when called. +pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> { + let storage = get_configured_remote_storage(); + let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string()); + let remote_path = RemotePath::new(&ttid_path)?; + + // A backoff::retry is used here for two reasons: + // - To provide a backoff rather than busy-polling the API on errors + // - To absorb transient 429/503 conditions without hitting our error + // logging path for issues deleting objects. + // + // Note: listing segments might take a long time if there are many of them. + // We don't currently have http requests timeout cancellation, but if/once + // we have listing should get streaming interface to make progress. + let token = CancellationToken::new(); // not really used + backoff::retry( + || async { + let files = storage.list_files(Some(&remote_path)).await?; + storage.delete_objects(&files).await?; + Ok(()) + }, + |_| false, + 3, + 10, + "executing WAL segments deletion batch", + backoff::Cancel::new(token, || anyhow::anyhow!("canceled")), + ) + .await?; + + Ok(()) +} + /// Copy segments from one timeline to another. Used in copy_timeline. pub async fn copy_s3_segments( wal_seg_size: usize, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index ffd93004d2..d98aedf4d0 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3352,9 +3352,15 @@ class SafekeeperHttpClient(requests.Session): ) res.raise_for_status() - def timeline_delete_force(self, tenant_id: TenantId, timeline_id: TimelineId) -> Dict[Any, Any]: + # only_local doesn't remove segments in the remote storage. + def timeline_delete( + self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False + ) -> Dict[Any, Any]: res = self.delete( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}" + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", + params={ + "only_local": str(only_local).lower(), + }, ) res.raise_for_status() res_json = res.json() diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index d0bb566408..a6c4b8e930 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -1,11 +1,11 @@ import time -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef from fixtures.log_helper import log from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient -from fixtures.remote_storage import RemoteStorageKind, S3Storage +from fixtures.remote_storage import RemoteStorage, RemoteStorageKind, S3Storage from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId from fixtures.utils import wait_until @@ -233,23 +233,18 @@ def timeline_delete_wait_completed( wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations, interval) -if TYPE_CHECKING: - # TODO avoid by combining remote storage related stuff in single type - # and just passing in this type instead of whole builder - from fixtures.neon_fixtures import NeonEnvBuilder - - +# remote_storage must not be None, but that's easier for callers to make mypy happy def assert_prefix_empty( - neon_env_builder: "NeonEnvBuilder", + remote_storage: Optional[RemoteStorage], prefix: Optional[str] = None, allowed_postfix: Optional[str] = None, ): - response = list_prefix(neon_env_builder, prefix) + assert remote_storage is not None + response = list_prefix(remote_storage, prefix) keys = response["KeyCount"] objects: List[ObjectTypeDef] = response.get("Contents", []) common_prefixes = response.get("CommonPrefixes", []) - remote_storage = neon_env_builder.pageserver_remote_storage is_mock_s3 = isinstance(remote_storage, S3Storage) and not remote_storage.cleanup if is_mock_s3: @@ -283,19 +278,20 @@ def assert_prefix_empty( ), f"remote dir with prefix {prefix} is not empty after deletion: {objects}" -def assert_prefix_not_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None): - response = list_prefix(neon_env_builder, prefix) +# remote_storage must not be None, but that's easier for callers to make mypy happy +def assert_prefix_not_empty(remote_storage: Optional[RemoteStorage], prefix: Optional[str] = None): + assert remote_storage is not None + response = list_prefix(remote_storage, prefix) assert response["KeyCount"] != 0, f"remote dir with prefix {prefix} is empty: {response}" def list_prefix( - neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None, delimiter: str = "/" + remote: RemoteStorage, prefix: Optional[str] = None, delimiter: str = "/" ) -> ListObjectsV2OutputTypeDef: """ Note that this function takes into account prefix_in_bucket. """ # For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api. - remote = neon_env_builder.pageserver_remote_storage assert isinstance(remote, S3Storage), "localfs is currently not supported" assert remote.client is not None diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index dd55d737ac..63f6130af5 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -216,8 +216,14 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder): log.info(f"group: {m.group(1)}") return int(m.group(1), 16) + assert neon_env_builder.pageserver_remote_storage is not None pre_upgrade_keys = list( - [o["Key"] for o in list_prefix(neon_env_builder, delimiter="")["Contents"]] + [ + o["Key"] + for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[ + "Contents" + ] + ] ) for key in pre_upgrade_keys: assert parse_generation_suffix(key) is None @@ -232,7 +238,12 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder): legacy_objects: list[str] = [] suffixed_objects = [] post_upgrade_keys = list( - [o["Key"] for o in list_prefix(neon_env_builder, delimiter="")["Contents"]] + [ + o["Key"] + for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[ + "Contents" + ] + ] ) for key in post_upgrade_keys: log.info(f"post-upgrade key: {key}") diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index a9eff99a0c..521b96779a 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -504,7 +504,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder): tenant_delete_wait_completed(ps_attached.http_client(), tenant_id, 10) assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index 7c52fb3071..7a5b1c0fc2 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -75,7 +75,7 @@ def test_tenant_delete_smoke( wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id) assert_prefix_not_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -96,7 +96,7 @@ def test_tenant_delete_smoke( assert not tenant_path.exists() assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -207,7 +207,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints( last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id) assert_prefix_not_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -268,7 +268,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints( # Check remote is empty assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -304,7 +304,7 @@ def test_tenant_delete_is_resumed_on_attach( # sanity check, data should be there assert_prefix_not_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -343,7 +343,7 @@ def test_tenant_delete_is_resumed_on_attach( ) assert_prefix_not_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -378,7 +378,7 @@ def test_tenant_delete_is_resumed_on_attach( ps_http.deletion_queue_flush(execute=True) assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -543,7 +543,7 @@ def test_tenant_delete_concurrent( # Physical deletion should have happened assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -645,7 +645,7 @@ def test_tenant_delete_races_timeline_creation( # Physical deletion should have happened assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 82ffcb1177..352b82d525 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -191,7 +191,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints( last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id) assert_prefix_not_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -275,7 +275,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints( # Check remote is empty if remote_storage_kind is RemoteStorageKind.MOCK_S3: assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -449,7 +449,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild assert all([tl["state"] == "Active" for tl in timelines]) assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -466,7 +466,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -482,7 +482,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild wait_until( 2, 0.5, - lambda: assert_prefix_empty(neon_env_builder), + lambda: assert_prefix_empty(neon_env_builder.pageserver_remote_storage), ) @@ -673,7 +673,7 @@ def test_timeline_delete_works_for_remote_smoke( for timeline_id in timeline_ids: assert_prefix_not_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -690,7 +690,7 @@ def test_timeline_delete_works_for_remote_smoke( timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id) assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -703,7 +703,7 @@ def test_timeline_delete_works_for_remote_smoke( # for some reason the check above doesnt immediately take effect for the below. # Assume it is mock server inconsistency and check twice. - wait_until(2, 0.5, lambda: assert_prefix_empty(neon_env_builder)) + wait_until(2, 0.5, lambda: assert_prefix_empty(neon_env_builder.pageserver_remote_storage)) def test_delete_orphaned_objects( @@ -791,7 +791,7 @@ def test_timeline_delete_resumed_on_attach( last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id) assert_prefix_not_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -839,7 +839,7 @@ def test_timeline_delete_resumed_on_attach( assert reason.endswith(f"failpoint: {failpoint}"), reason assert_prefix_not_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", @@ -870,7 +870,7 @@ def test_timeline_delete_resumed_on_attach( assert not tenant_path.exists() assert_prefix_empty( - neon_env_builder, + neon_env_builder.pageserver_remote_storage, prefix="/".join( ( "tenants", diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index b4ce633531..2f8e69165e 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -33,13 +33,19 @@ from fixtures.neon_fixtures import ( last_flush_lsn_upload, ) from fixtures.pageserver.utils import ( + assert_prefix_empty, + assert_prefix_not_empty, timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, ) from fixtures.pg_version import PgVersion from fixtures.port_distributor import PortDistributor -from fixtures.remote_storage import RemoteStorageKind, default_remote_storage +from fixtures.remote_storage import ( + RemoteStorageKind, + default_remote_storage, + s3_storage, +) from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import get_dir_size, query_scalar, start_in_background @@ -118,7 +124,8 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder): with env.pageserver.http_client() as pageserver_http: timeline_details = [ pageserver_http.timeline_detail( - tenant_id=tenant_id, timeline_id=branch_names_to_timeline_ids[branch_name] + tenant_id=tenant_id, + timeline_id=branch_names_to_timeline_ids[branch_name], ) for branch_name in branch_names ] @@ -457,10 +464,19 @@ def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, def test_wal_backup(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 - neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) + remote_storage_kind = s3_storage() + neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind) env = neon_env_builder.init_start() + # These are expected after timeline deletion on safekeepers. + env.pageserver.allowed_errors.extend( + [ + ".*Timeline .* was not found in global map.*", + ".*Timeline .* was cancelled and cannot be used anymore.*", + ] + ) + tenant_id = env.initial_tenant timeline_id = env.neon_cli.create_branch("test_safekeepers_wal_backup") endpoint = env.endpoints.create_start("test_safekeepers_wal_backup") @@ -488,7 +504,8 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder): # put one of safekeepers down again env.safekeepers[0].stop() # restart postgres - endpoint.stop_and_destroy().create_start("test_safekeepers_wal_backup") + endpoint.stop() + endpoint = env.endpoints.create_start("test_safekeepers_wal_backup") # and ensure offloading still works with closing(endpoint.connect()) as conn: with conn.cursor() as cur: @@ -498,6 +515,17 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder): partial(is_segment_offloaded, env.safekeepers[1], tenant_id, timeline_id, seg_end), f"segment ending at {seg_end} get offloaded", ) + env.safekeepers[0].start() + endpoint.stop() + + # Test that after timeline deletion remote objects are gone. + prefix = "/".join([str(tenant_id), str(timeline_id)]) + assert_prefix_not_empty(neon_env_builder.safekeepers_remote_storage, prefix) + + for sk in env.safekeepers: + sk_http = sk.http_client() + sk_http.timeline_delete(tenant_id, timeline_id) + assert_prefix_empty(neon_env_builder.safekeepers_remote_storage, prefix) def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder): @@ -586,7 +614,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder): # advancing peer_horizon_lsn. for sk in env.safekeepers: cli = sk.http_client() - cli.timeline_delete_force(tenant_id, timeline_id) + cli.timeline_delete(tenant_id, timeline_id, only_local=True) # restart safekeeper to clear its in-memory state sk.stop() # wait all potenital in flight pushes to broker arrive before starting @@ -1623,7 +1651,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): endpoint_3.stop_and_destroy() # Remove initial tenant's br1 (active) - assert sk_http.timeline_delete_force(tenant_id, timeline_id_1)["dir_existed"] + assert sk_http.timeline_delete(tenant_id, timeline_id_1)["dir_existed"] assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists() assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir() assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir() @@ -1631,7 +1659,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() # Ensure repeated deletion succeeds - assert not sk_http.timeline_delete_force(tenant_id, timeline_id_1)["dir_existed"] + assert not sk_http.timeline_delete(tenant_id, timeline_id_1)["dir_existed"] assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists() assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir() assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir() @@ -1642,13 +1670,13 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): # Ensure we cannot delete the other tenant for sk_h in [sk_http, sk_http_noauth]: with pytest.raises(sk_h.HTTPError, match="Forbidden|Unauthorized"): - assert sk_h.timeline_delete_force(tenant_id_other, timeline_id_other) + assert sk_h.timeline_delete(tenant_id_other, timeline_id_other) with pytest.raises(sk_h.HTTPError, match="Forbidden|Unauthorized"): assert sk_h.tenant_delete_force(tenant_id_other) assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() # Remove initial tenant's br2 (inactive) - assert sk_http.timeline_delete_force(tenant_id, timeline_id_2)["dir_existed"] + assert sk_http.timeline_delete(tenant_id, timeline_id_2)["dir_existed"] assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists() assert not (sk_data_dir / str(tenant_id) / str(timeline_id_2)).exists() assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir() @@ -1656,7 +1684,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() # Remove non-existing branch, should succeed - assert not sk_http.timeline_delete_force(tenant_id, TimelineId("00" * 16))["dir_existed"] + assert not sk_http.timeline_delete(tenant_id, TimelineId("00" * 16))["dir_existed"] assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists() assert not (sk_data_dir / str(tenant_id) / str(timeline_id_2)).exists() assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).exists()