mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Delete WAL segments from s3 when timeline is deleted.
In the most straightforward way; safekeeper performs it in DELETE endpoint implementation, with no coordination between sks. delete_force endpoint in the code is renamed to delete as there is only one way to delete.
This commit is contained in:
@@ -288,34 +288,32 @@ async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>
|
||||
}
|
||||
|
||||
/// Deactivates the timeline and removes its data directory.
|
||||
async fn timeline_delete_force_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
|
||||
// error handling here when we're able to.
|
||||
let resp = GlobalTimelines::delete_force(&ttid)
|
||||
let resp = GlobalTimelines::delete(&ttid, only_local)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
/// Deactivates all timelines for the tenant and removes its data directory.
|
||||
/// See `timeline_delete_force_handler`.
|
||||
async fn tenant_delete_force_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
/// See `timeline_delete_handler`.
|
||||
async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id = parse_request_param(&request, "tenant_id")?;
|
||||
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
|
||||
// Using an `InternalServerError` should be fixed when the types support it
|
||||
let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
|
||||
let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(
|
||||
@@ -512,10 +510,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
request_span(r, timeline_status_handler)
|
||||
})
|
||||
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
request_span(r, timeline_delete_force_handler)
|
||||
request_span(r, timeline_delete_handler)
|
||||
})
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_force_handler)
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
.post("/v1/pull_timeline", |r| {
|
||||
request_span(r, timeline_pull_handler)
|
||||
|
||||
@@ -88,6 +88,10 @@ impl SafeKeeperConf {
|
||||
self.tenant_dir(&ttid.tenant_id)
|
||||
.join(ttid.timeline_id.to_string())
|
||||
}
|
||||
|
||||
pub fn is_wal_backup_enabled(&self) -> bool {
|
||||
self.remote_storage.is_some() && self.wal_backup_enabled
|
||||
}
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
|
||||
@@ -407,7 +407,7 @@ impl SafekeeperPostgresHandler {
|
||||
self.conf.timeline_dir(&tli.ttid),
|
||||
&persisted_state,
|
||||
start_pos,
|
||||
self.conf.wal_backup_enabled,
|
||||
self.conf.is_wal_backup_enabled(),
|
||||
)?;
|
||||
|
||||
// Split to concurrently receive and send data; replies are generally
|
||||
|
||||
@@ -33,12 +33,13 @@ use crate::safekeeper::{
|
||||
};
|
||||
use crate::send_wal::WalSenders;
|
||||
use crate::state::{TimelineMemState, TimelinePersistentState};
|
||||
use crate::wal_backup::{self};
|
||||
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
|
||||
use crate::metrics::FullTimelineInfo;
|
||||
use crate::wal_storage::Storage as wal_storage_iface;
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::{debug_dump, wal_storage};
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
/// Things safekeeper should know about timeline state on peers.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -471,14 +472,29 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete timeline from disk completely, by removing timeline directory. Background
|
||||
/// timeline activities will stop eventually.
|
||||
pub async fn delete_from_disk(
|
||||
/// Delete timeline from disk completely, by removing timeline directory.
|
||||
/// Background timeline activities will stop eventually.
|
||||
///
|
||||
/// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
|
||||
/// deletion API endpoint is retriable.
|
||||
pub async fn delete(
|
||||
&self,
|
||||
shared_state: &mut MutexGuard<'_, SharedState>,
|
||||
only_local: bool,
|
||||
) -> Result<(bool, bool)> {
|
||||
let was_active = shared_state.active;
|
||||
self.cancel(shared_state);
|
||||
|
||||
// TODO: It's better to wait for s3 offloader termination before
|
||||
// removing data from s3. Though since s3 doesn't have transactions it
|
||||
// still wouldn't guarantee absense of data after removal.
|
||||
let conf = GlobalTimelines::get_global_config();
|
||||
if !only_local && conf.is_wal_backup_enabled() {
|
||||
// Note: we concurrently delete remote storage data from multiple
|
||||
// safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
|
||||
// do some retries anyway.
|
||||
wal_backup::delete_timeline(&self.ttid).await?;
|
||||
}
|
||||
let dir_existed = delete_dir(&self.timeline_dir).await?;
|
||||
Ok((dir_existed, was_active))
|
||||
}
|
||||
|
||||
@@ -327,16 +327,20 @@ impl GlobalTimelines {
|
||||
}
|
||||
|
||||
/// Cancels timeline, then deletes the corresponding data directory.
|
||||
pub async fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
|
||||
/// If only_local, doesn't remove WAL segments in remote storage.
|
||||
pub async fn delete(
|
||||
ttid: &TenantTimelineId,
|
||||
only_local: bool,
|
||||
) -> Result<TimelineDeleteForceResult> {
|
||||
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
|
||||
match tli_res {
|
||||
Ok(timeline) => {
|
||||
// Take a lock and finish the deletion holding this mutex.
|
||||
let mut shared_state = timeline.write_shared_state().await;
|
||||
|
||||
info!("deleting timeline {}", ttid);
|
||||
info!("deleting timeline {}, only_local={}", ttid, only_local);
|
||||
let (dir_existed, was_active) =
|
||||
timeline.delete_from_disk(&mut shared_state).await?;
|
||||
timeline.delete(&mut shared_state, only_local).await?;
|
||||
|
||||
// Remove timeline from the map.
|
||||
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
|
||||
@@ -369,8 +373,11 @@ impl GlobalTimelines {
|
||||
/// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
|
||||
/// created simultaneously. In that case the function will return error and the caller should
|
||||
/// retry tenant deletion again later.
|
||||
///
|
||||
/// If only_local, doesn't remove WAL segments in remote storage.
|
||||
pub async fn delete_force_all_for_tenant(
|
||||
tenant_id: &TenantId,
|
||||
only_local: bool,
|
||||
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
|
||||
info!("deleting all timelines for tenant {}", tenant_id);
|
||||
let to_delete = Self::get_all_for_tenant(*tenant_id);
|
||||
@@ -379,7 +386,7 @@ impl GlobalTimelines {
|
||||
|
||||
let mut deleted = HashMap::new();
|
||||
for tli in &to_delete {
|
||||
match Self::delete_force(&tli.ttid).await {
|
||||
match Self::delete(&tli.ttid, only_local).await {
|
||||
Ok(result) => {
|
||||
deleted.insert(tli.ttid, result);
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::stream::FuturesOrdered;
|
||||
use futures::StreamExt;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff;
|
||||
use utils::id::NodeId;
|
||||
|
||||
use std::cmp::min;
|
||||
@@ -166,6 +168,17 @@ async fn update_task(
|
||||
}
|
||||
}
|
||||
|
||||
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
|
||||
|
||||
// Storage must be configured and initialized when this is called.
|
||||
fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
|
||||
REMOTE_STORAGE
|
||||
.get()
|
||||
.expect("failed to get remote storage")
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
|
||||
|
||||
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
|
||||
@@ -199,7 +212,7 @@ pub async fn wal_backup_launcher_task_main(
|
||||
ttid = wal_backup_launcher_rx.recv() => {
|
||||
// channel is never expected to get closed
|
||||
let ttid = ttid.unwrap();
|
||||
if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
|
||||
if !conf.is_wal_backup_enabled() {
|
||||
continue; /* just drain the channel and do nothing */
|
||||
}
|
||||
async {
|
||||
@@ -484,18 +497,12 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
|
||||
res
|
||||
}
|
||||
|
||||
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
|
||||
|
||||
async fn backup_object(
|
||||
source_file: &Utf8Path,
|
||||
target_file: &RemotePath,
|
||||
size: usize,
|
||||
) -> Result<()> {
|
||||
let storage = REMOTE_STORAGE
|
||||
.get()
|
||||
.expect("failed to get remote storage")
|
||||
.as_ref()
|
||||
.unwrap();
|
||||
let storage = get_configured_remote_storage();
|
||||
|
||||
let file = File::open(&source_file)
|
||||
.await
|
||||
@@ -532,6 +539,39 @@ pub async fn read_object(
|
||||
Ok(Box::pin(reader))
|
||||
}
|
||||
|
||||
/// Delete WAL files for the given timeline. Remote storage must be configured
|
||||
/// when called.
|
||||
pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
|
||||
let storage = get_configured_remote_storage();
|
||||
let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
|
||||
let remote_path = RemotePath::new(&ttid_path)?;
|
||||
|
||||
// A backoff::retry is used here for two reasons:
|
||||
// - To provide a backoff rather than busy-polling the API on errors
|
||||
// - To absorb transient 429/503 conditions without hitting our error
|
||||
// logging path for issues deleting objects.
|
||||
//
|
||||
// Note: listing segments might take a long time if there are many of them.
|
||||
// We don't currently have http requests timeout cancellation, but if/once
|
||||
// we have listing should get streaming interface to make progress.
|
||||
let token = CancellationToken::new(); // not really used
|
||||
backoff::retry(
|
||||
|| async {
|
||||
let files = storage.list_files(Some(&remote_path)).await?;
|
||||
storage.delete_objects(&files).await?;
|
||||
Ok(())
|
||||
},
|
||||
|_| false,
|
||||
3,
|
||||
10,
|
||||
"executing WAL segments deletion batch",
|
||||
backoff::Cancel::new(token, || anyhow::anyhow!("canceled")),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Copy segments from one timeline to another. Used in copy_timeline.
|
||||
pub async fn copy_s3_segments(
|
||||
wal_seg_size: usize,
|
||||
|
||||
@@ -3352,9 +3352,15 @@ class SafekeeperHttpClient(requests.Session):
|
||||
)
|
||||
res.raise_for_status()
|
||||
|
||||
def timeline_delete_force(self, tenant_id: TenantId, timeline_id: TimelineId) -> Dict[Any, Any]:
|
||||
# only_local doesn't remove segments in the remote storage.
|
||||
def timeline_delete(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
|
||||
) -> Dict[Any, Any]:
|
||||
res = self.delete(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}"
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
|
||||
params={
|
||||
"only_local": str(only_local).lower(),
|
||||
},
|
||||
)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.remote_storage import RemoteStorageKind, S3Storage
|
||||
from fixtures.remote_storage import RemoteStorage, RemoteStorageKind, S3Storage
|
||||
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
@@ -233,23 +233,18 @@ def timeline_delete_wait_completed(
|
||||
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations, interval)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# TODO avoid by combining remote storage related stuff in single type
|
||||
# and just passing in this type instead of whole builder
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
# remote_storage must not be None, but that's easier for callers to make mypy happy
|
||||
def assert_prefix_empty(
|
||||
neon_env_builder: "NeonEnvBuilder",
|
||||
remote_storage: Optional[RemoteStorage],
|
||||
prefix: Optional[str] = None,
|
||||
allowed_postfix: Optional[str] = None,
|
||||
):
|
||||
response = list_prefix(neon_env_builder, prefix)
|
||||
assert remote_storage is not None
|
||||
response = list_prefix(remote_storage, prefix)
|
||||
keys = response["KeyCount"]
|
||||
objects: List[ObjectTypeDef] = response.get("Contents", [])
|
||||
common_prefixes = response.get("CommonPrefixes", [])
|
||||
|
||||
remote_storage = neon_env_builder.pageserver_remote_storage
|
||||
is_mock_s3 = isinstance(remote_storage, S3Storage) and not remote_storage.cleanup
|
||||
|
||||
if is_mock_s3:
|
||||
@@ -283,19 +278,20 @@ def assert_prefix_empty(
|
||||
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
|
||||
|
||||
|
||||
def assert_prefix_not_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
|
||||
response = list_prefix(neon_env_builder, prefix)
|
||||
# remote_storage must not be None, but that's easier for callers to make mypy happy
|
||||
def assert_prefix_not_empty(remote_storage: Optional[RemoteStorage], prefix: Optional[str] = None):
|
||||
assert remote_storage is not None
|
||||
response = list_prefix(remote_storage, prefix)
|
||||
assert response["KeyCount"] != 0, f"remote dir with prefix {prefix} is empty: {response}"
|
||||
|
||||
|
||||
def list_prefix(
|
||||
neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None, delimiter: str = "/"
|
||||
remote: RemoteStorage, prefix: Optional[str] = None, delimiter: str = "/"
|
||||
) -> ListObjectsV2OutputTypeDef:
|
||||
"""
|
||||
Note that this function takes into account prefix_in_bucket.
|
||||
"""
|
||||
# For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api.
|
||||
remote = neon_env_builder.pageserver_remote_storage
|
||||
assert isinstance(remote, S3Storage), "localfs is currently not supported"
|
||||
assert remote.client is not None
|
||||
|
||||
|
||||
@@ -216,8 +216,14 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
log.info(f"group: {m.group(1)}")
|
||||
return int(m.group(1), 16)
|
||||
|
||||
assert neon_env_builder.pageserver_remote_storage is not None
|
||||
pre_upgrade_keys = list(
|
||||
[o["Key"] for o in list_prefix(neon_env_builder, delimiter="")["Contents"]]
|
||||
[
|
||||
o["Key"]
|
||||
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
|
||||
"Contents"
|
||||
]
|
||||
]
|
||||
)
|
||||
for key in pre_upgrade_keys:
|
||||
assert parse_generation_suffix(key) is None
|
||||
@@ -232,7 +238,12 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
legacy_objects: list[str] = []
|
||||
suffixed_objects = []
|
||||
post_upgrade_keys = list(
|
||||
[o["Key"] for o in list_prefix(neon_env_builder, delimiter="")["Contents"]]
|
||||
[
|
||||
o["Key"]
|
||||
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
|
||||
"Contents"
|
||||
]
|
||||
]
|
||||
)
|
||||
for key in post_upgrade_keys:
|
||||
log.info(f"post-upgrade key: {key}")
|
||||
|
||||
@@ -504,7 +504,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_delete_wait_completed(ps_attached.http_client(), tenant_id, 10)
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
|
||||
@@ -75,7 +75,7 @@ def test_tenant_delete_smoke(
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id)
|
||||
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -96,7 +96,7 @@ def test_tenant_delete_smoke(
|
||||
assert not tenant_path.exists()
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -207,7 +207,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -268,7 +268,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
|
||||
|
||||
# Check remote is empty
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -304,7 +304,7 @@ def test_tenant_delete_is_resumed_on_attach(
|
||||
|
||||
# sanity check, data should be there
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -343,7 +343,7 @@ def test_tenant_delete_is_resumed_on_attach(
|
||||
)
|
||||
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -378,7 +378,7 @@ def test_tenant_delete_is_resumed_on_attach(
|
||||
|
||||
ps_http.deletion_queue_flush(execute=True)
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -543,7 +543,7 @@ def test_tenant_delete_concurrent(
|
||||
|
||||
# Physical deletion should have happened
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -645,7 +645,7 @@ def test_tenant_delete_races_timeline_creation(
|
||||
|
||||
# Physical deletion should have happened
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
|
||||
@@ -191,7 +191,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
|
||||
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
|
||||
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -275,7 +275,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
|
||||
# Check remote is empty
|
||||
if remote_storage_kind is RemoteStorageKind.MOCK_S3:
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -449,7 +449,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
assert all([tl["state"] == "Active" for tl in timelines])
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -466,7 +466,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
)
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -482,7 +482,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
wait_until(
|
||||
2,
|
||||
0.5,
|
||||
lambda: assert_prefix_empty(neon_env_builder),
|
||||
lambda: assert_prefix_empty(neon_env_builder.pageserver_remote_storage),
|
||||
)
|
||||
|
||||
|
||||
@@ -673,7 +673,7 @@ def test_timeline_delete_works_for_remote_smoke(
|
||||
|
||||
for timeline_id in timeline_ids:
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -690,7 +690,7 @@ def test_timeline_delete_works_for_remote_smoke(
|
||||
timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id)
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -703,7 +703,7 @@ def test_timeline_delete_works_for_remote_smoke(
|
||||
|
||||
# for some reason the check above doesnt immediately take effect for the below.
|
||||
# Assume it is mock server inconsistency and check twice.
|
||||
wait_until(2, 0.5, lambda: assert_prefix_empty(neon_env_builder))
|
||||
wait_until(2, 0.5, lambda: assert_prefix_empty(neon_env_builder.pageserver_remote_storage))
|
||||
|
||||
|
||||
def test_delete_orphaned_objects(
|
||||
@@ -791,7 +791,7 @@ def test_timeline_delete_resumed_on_attach(
|
||||
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
|
||||
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -839,7 +839,7 @@ def test_timeline_delete_resumed_on_attach(
|
||||
assert reason.endswith(f"failpoint: {failpoint}"), reason
|
||||
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
@@ -870,7 +870,7 @@ def test_timeline_delete_resumed_on_attach(
|
||||
assert not tenant_path.exists()
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
|
||||
@@ -33,13 +33,19 @@ from fixtures.neon_fixtures import (
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_prefix_empty,
|
||||
assert_prefix_not_empty,
|
||||
timeline_delete_wait_completed,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import RemoteStorageKind, default_remote_storage
|
||||
from fixtures.remote_storage import (
|
||||
RemoteStorageKind,
|
||||
default_remote_storage,
|
||||
s3_storage,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import get_dir_size, query_scalar, start_in_background
|
||||
|
||||
@@ -118,7 +124,8 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder):
|
||||
with env.pageserver.http_client() as pageserver_http:
|
||||
timeline_details = [
|
||||
pageserver_http.timeline_detail(
|
||||
tenant_id=tenant_id, timeline_id=branch_names_to_timeline_ids[branch_name]
|
||||
tenant_id=tenant_id,
|
||||
timeline_id=branch_names_to_timeline_ids[branch_name],
|
||||
)
|
||||
for branch_name in branch_names
|
||||
]
|
||||
@@ -457,10 +464,19 @@ def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId,
|
||||
|
||||
def test_wal_backup(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
remote_storage_kind = s3_storage()
|
||||
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# These are expected after timeline deletion on safekeepers.
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Timeline .* was not found in global map.*",
|
||||
".*Timeline .* was cancelled and cannot be used anymore.*",
|
||||
]
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_safekeepers_wal_backup")
|
||||
endpoint = env.endpoints.create_start("test_safekeepers_wal_backup")
|
||||
@@ -488,7 +504,8 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder):
|
||||
# put one of safekeepers down again
|
||||
env.safekeepers[0].stop()
|
||||
# restart postgres
|
||||
endpoint.stop_and_destroy().create_start("test_safekeepers_wal_backup")
|
||||
endpoint.stop()
|
||||
endpoint = env.endpoints.create_start("test_safekeepers_wal_backup")
|
||||
# and ensure offloading still works
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -498,6 +515,17 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder):
|
||||
partial(is_segment_offloaded, env.safekeepers[1], tenant_id, timeline_id, seg_end),
|
||||
f"segment ending at {seg_end} get offloaded",
|
||||
)
|
||||
env.safekeepers[0].start()
|
||||
endpoint.stop()
|
||||
|
||||
# Test that after timeline deletion remote objects are gone.
|
||||
prefix = "/".join([str(tenant_id), str(timeline_id)])
|
||||
assert_prefix_not_empty(neon_env_builder.safekeepers_remote_storage, prefix)
|
||||
|
||||
for sk in env.safekeepers:
|
||||
sk_http = sk.http_client()
|
||||
sk_http.timeline_delete(tenant_id, timeline_id)
|
||||
assert_prefix_empty(neon_env_builder.safekeepers_remote_storage, prefix)
|
||||
|
||||
|
||||
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
|
||||
@@ -586,7 +614,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
|
||||
# advancing peer_horizon_lsn.
|
||||
for sk in env.safekeepers:
|
||||
cli = sk.http_client()
|
||||
cli.timeline_delete_force(tenant_id, timeline_id)
|
||||
cli.timeline_delete(tenant_id, timeline_id, only_local=True)
|
||||
# restart safekeeper to clear its in-memory state
|
||||
sk.stop()
|
||||
# wait all potenital in flight pushes to broker arrive before starting
|
||||
@@ -1623,7 +1651,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
endpoint_3.stop_and_destroy()
|
||||
|
||||
# Remove initial tenant's br1 (active)
|
||||
assert sk_http.timeline_delete_force(tenant_id, timeline_id_1)["dir_existed"]
|
||||
assert sk_http.timeline_delete(tenant_id, timeline_id_1)["dir_existed"]
|
||||
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
|
||||
assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir()
|
||||
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir()
|
||||
@@ -1631,7 +1659,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
|
||||
|
||||
# Ensure repeated deletion succeeds
|
||||
assert not sk_http.timeline_delete_force(tenant_id, timeline_id_1)["dir_existed"]
|
||||
assert not sk_http.timeline_delete(tenant_id, timeline_id_1)["dir_existed"]
|
||||
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
|
||||
assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir()
|
||||
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir()
|
||||
@@ -1642,13 +1670,13 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
# Ensure we cannot delete the other tenant
|
||||
for sk_h in [sk_http, sk_http_noauth]:
|
||||
with pytest.raises(sk_h.HTTPError, match="Forbidden|Unauthorized"):
|
||||
assert sk_h.timeline_delete_force(tenant_id_other, timeline_id_other)
|
||||
assert sk_h.timeline_delete(tenant_id_other, timeline_id_other)
|
||||
with pytest.raises(sk_h.HTTPError, match="Forbidden|Unauthorized"):
|
||||
assert sk_h.tenant_delete_force(tenant_id_other)
|
||||
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
|
||||
|
||||
# Remove initial tenant's br2 (inactive)
|
||||
assert sk_http.timeline_delete_force(tenant_id, timeline_id_2)["dir_existed"]
|
||||
assert sk_http.timeline_delete(tenant_id, timeline_id_2)["dir_existed"]
|
||||
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
|
||||
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_2)).exists()
|
||||
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir()
|
||||
@@ -1656,7 +1684,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
|
||||
|
||||
# Remove non-existing branch, should succeed
|
||||
assert not sk_http.timeline_delete_force(tenant_id, TimelineId("00" * 16))["dir_existed"]
|
||||
assert not sk_http.timeline_delete(tenant_id, TimelineId("00" * 16))["dir_existed"]
|
||||
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
|
||||
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_2)).exists()
|
||||
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).exists()
|
||||
|
||||
Reference in New Issue
Block a user