safekeeper: allow remote deletion to proceed after dropped requests (#11042)

## Problem

If a caller times out on safekeeper timeline deletion for a large
timeline and waits a while before retrying, the deletion makes no
progress while the caller is waiting to retry. The net effect is very slow
deletion, as it only proceeds in 30-second bursts separated by 5-minute idle
periods.

Related: https://github.com/neondatabase/neon/issues/10265

## Summary of changes

- Run remote deletion in a background task
- Carry a watch::Receiver on the Timeline for other callers to join the
wait
- Restart deletion if the API is called again and the previous attempt
failed
This commit is contained in:
John Spray
2025-03-03 16:03:51 +00:00
committed by GitHub
parent a07599949f
commit b953daa21f
7 changed files with 541 additions and 305 deletions

View File

@@ -415,6 +415,9 @@ impl From<TimelineError> for ApiError {
}
}
/// We run remote deletion in a background task, this is how it sends its results back.
///
/// The watched value is `None` until the task has published an outcome, and
/// `Some(result)` once it has.
type RemoteDeletionReceiver = tokio::sync::watch::Receiver<Option<anyhow::Result<()>>>;
/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
/// It also holds SharedState and provides mutually exclusive access to it.
pub struct Timeline {
@@ -446,6 +449,8 @@ pub struct Timeline {
manager_ctl: ManagerCtl,
conf: Arc<SafeKeeperConf>,
remote_deletion: std::sync::Mutex<Option<RemoteDeletionReceiver>>,
/// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
/// this gate, you must respect [`Timeline::cancel`]
pub(crate) gate: Gate,
@@ -494,6 +499,7 @@ impl Timeline {
walreceivers,
gate: Default::default(),
cancel: CancellationToken::default(),
remote_deletion: std::sync::Mutex::new(None),
manager_ctl: ManagerCtl::new(),
conf,
broker_active: AtomicBool::new(false),
@@ -598,15 +604,95 @@ impl Timeline {
shared_state.sk.close_wal_store();
if !only_local && self.conf.is_wal_backup_enabled() {
// Note: we concurrently delete remote storage data from multiple
// safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
// do some retries anyway.
wal_backup::delete_timeline(&self.ttid).await?;
self.remote_delete().await?;
}
let dir_existed = delete_dir(&self.timeline_dir).await?;
Ok(dir_existed)
}
/// Delete timeline content from remote storage. If the returned future is dropped,
/// deletion will continue in the background.
///
/// This function ordinarily spawns a task and stashes a result receiver into [`Self::remote_deletion`]. If
/// deletion is already happening, it may simply wait for an existing task's result.
///
/// Note: we concurrently delete remote storage data from multiple
/// safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
/// do some retries anyway.
async fn remote_delete(&self) -> Result<()> {
// We will start a background task to do the deletion, so that it proceeds even if our
// API request is dropped. Future requests will see the existing deletion task and wait
// for it to complete.
let mut result_rx = {
let mut remote_deletion_state = self.remote_deletion.lock().unwrap();
let result_rx = if let Some(result_rx) = remote_deletion_state.as_ref() {
if let Some(result) = result_rx.borrow().as_ref() {
if let Err(e) = result {
// A previous remote deletion failed: we will start a new one
tracing::error!("remote deletion failed, will retry ({e})");
None
} else {
// A previous remote deletion call already succeeded
return Ok(());
}
} else {
// Remote deletion is still in flight
Some(result_rx.clone())
}
} else {
// Remote deletion was not attempted yet, start it now.
None
};
match result_rx {
Some(result_rx) => result_rx,
None => self.start_remote_delete(&mut remote_deletion_state),
}
};
// Wait for a result
let Ok(result) = result_rx.wait_for(|v| v.is_some()).await else {
// Unexpected: sender should always send a result before dropping the channel, even if it has an error
return Err(anyhow::anyhow!(
"remote deletion task future was dropped without sending a result"
));
};
result
.as_ref()
.expect("We did a wait_for on this being Some above")
.as_ref()
.map(|_| ())
.map_err(|e| anyhow::anyhow!("remote deletion failed: {e}"))
}
/// Spawn background task to do remote deletion, return a receiver for its outcome
fn start_remote_delete(
&self,
guard: &mut std::sync::MutexGuard<Option<RemoteDeletionReceiver>>,
) -> RemoteDeletionReceiver {
tracing::info!("starting remote deletion");
let (result_tx, result_rx) = tokio::sync::watch::channel(None);
let ttid = self.ttid;
tokio::task::spawn(
async move {
let r = wal_backup::delete_timeline(&ttid).await;
if let Err(e) = &r {
// Log error here in case nobody ever listens for our result (e.g. dropped API request)
tracing::error!("remote deletion failed: {e}");
}
// Ignore send results: it's legal for the Timeline to give up waiting for us.
let _ = result_tx.send(Some(r));
}
.instrument(info_span!("remote_delete", timeline = %self.ttid)),
);
**guard = Some(result_rx.clone());
result_rx
}
/// Returns if timeline is cancelled.
pub fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()

View File

@@ -21,9 +21,9 @@ use tokio::sync::{OnceCell, watch};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::backoff;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::{backoff, pausable_failpoint};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
use crate::timeline::WalResidentTimeline;
@@ -564,6 +564,12 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
// We don't currently have http requests timeout cancellation, but if/once
// we have listing should get streaming interface to make progress.
pausable_failpoint!("sk-delete-timeline-remote-pause");
fail::fail_point!("sk-delete-timeline-remote", |_| {
Err(anyhow::anyhow!("failpoint: sk-delete-timeline-remote"))
});
let cancel = CancellationToken::new(); // not really used
backoff::retry(
|| async {

View File

@@ -282,6 +282,17 @@ class S3Storage:
def timeline_path(self, tenant_id: TenantShardId | TenantId, timeline_id: TimelineId) -> str:
    """Return the key prefix of one timeline, nested under its tenant's prefix."""
    tenant_prefix = self.tenant_path(tenant_id)
    return f"{tenant_prefix}/timelines/{timeline_id}"
def safekeeper_tenants_path(self) -> str:
    """Return the prefix under which safekeeper tenant data lives: the bucket prefix itself."""
    return str(self.prefix_in_bucket)
def safekeeper_tenant_path(self, tenant_id: TenantShardId | TenantId) -> str:
    """Return the safekeeper key prefix for one tenant."""
    tenants_prefix = self.safekeeper_tenants_path()
    return f"{tenants_prefix}/{tenant_id}"
def safekeeper_timeline_path(
    self, tenant_id: TenantShardId | TenantId, timeline_id: TimelineId
) -> str:
    """Return the safekeeper key prefix for one timeline of a tenant."""
    tenant_prefix = self.safekeeper_tenant_path(tenant_id)
    return f"{tenant_prefix}/{timeline_id}"
def get_latest_generation_key(self, prefix: str, suffix: str, keys: list[str]) -> str:
"""
Gets the latest generation key from a list of keys.

View File

@@ -229,13 +229,14 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False, **kwargs
) -> dict[Any, Any]:
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
params={
"only_local": str(only_local).lower(),
},
**kwargs,
)
res.raise_for_status()
res_json = res.json()

View File

@@ -0,0 +1,92 @@
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonPageserver, Safekeeper
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.utils import get_dir_size
def is_segment_offloaded(
    sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
):
    """Return whether the safekeeper's reported backup_lsn has reached `seg_end`."""
    status = sk.http_client().timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {status}")
    offloaded = status.backup_lsn >= seg_end
    return offloaded
def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
    """Return whether the safekeeper's reported flush_lsn has reached `lsn`."""
    status = sk.http_client().timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {status}")
    caught_up = status.flush_lsn >= lsn
    return caught_up
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
    """
    Return whether the safekeeper's timeline directory is at most `target_size_mb` MiB.

    The timeline status is fetched only so it can be included in the log line.
    """
    status = sk.http_client().timeline_status(tenant_id, timeline_id)
    size_bytes = get_dir_size(sk.timeline_dir(tenant_id, timeline_id))
    sk_wal_size_mb = size_bytes / 1024 / 1024
    log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={status}")
    within_target = sk_wal_size_mb <= target_size_mb
    return within_target
def wait_lsn_force_checkpoint(
    tenant_id: TenantId,
    timeline_id: TimelineId,
    endpoint: Endpoint,
    ps: NeonPageserver,
    pageserver_conn_options=None,
):
    """
    Read the endpoint's current WAL flush LSN, then wait for the pageserver to
    receive, checkpoint and upload up to that LSN.
    """
    conn_options = pageserver_conn_options if pageserver_conn_options else {}
    first_row = endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0]
    lsn = Lsn(first_row[0])
    log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
    wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, conn_options)
def wait_lsn_force_checkpoint_at_sk(
    safekeeper: Safekeeper,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    ps: NeonPageserver,
    pageserver_conn_options=None,
):
    """
    Take the given safekeeper's flush LSN for the timeline, then wait for the
    pageserver to receive, checkpoint and upload up to that LSN.
    """
    target_lsn = safekeeper.get_flush_lsn(tenant_id, timeline_id)
    wait_lsn_force_checkpoint_at(target_lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
def wait_lsn_force_checkpoint_at(
    lsn: Lsn,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    ps: NeonPageserver,
    pageserver_conn_options=None,
):
    """
    Wait until pageserver receives given lsn, force checkpoint and wait for
    upload, i.e. remote_consistent_lsn advancement.

    If `pageserver_conn_options` contains a "password" entry, it is used as the
    auth token for the pageserver HTTP client; otherwise no token is passed.
    """
    pageserver_conn_options = pageserver_conn_options or {}

    # A missing "password" key yields None, i.e. no auth token.
    auth_token = pageserver_conn_options.get("password")

    # Build the client once and reuse it for all three calls.
    ps_http = ps.http_client(auth_token=auth_token)

    # wait for the pageserver to catch up
    wait_for_last_record_lsn(
        ps_http,
        tenant_id,
        timeline_id,
        lsn,
    )

    # force checkpoint to advance remote_consistent_lsn
    ps_http.timeline_checkpoint(tenant_id, timeline_id)

    # ensure that remote_consistent_lsn is advanced
    wait_for_upload(
        ps_http,
        tenant_id,
        timeline_id,
        lsn,
    )

View File

@@ -0,0 +1,331 @@
from __future__ import annotations
import threading
import time
from contextlib import closing
from enum import StrEnum
import pytest
import requests
from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnvBuilder,
)
from fixtures.remote_storage import S3Storage, s3_storage
from fixtures.safekeeper_utils import is_segment_offloaded
from fixtures.utils import wait_until
@pytest.mark.parametrize("auth_enabled", [False, True])
def test_safekeeper_delete_timeline(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
    """
    Delete timelines and a whole tenant on one safekeeper, with and without auth,
    checking after every step which timeline directories exist on disk and that
    a second tenant is left untouched.
    """
    neon_env_builder.auth_enabled = auth_enabled
    env = neon_env_builder.init_start()

    # FIXME: are these expected?
    env.pageserver.allowed_errors.extend(
        [
            ".*Timeline .* was not found in global map.*",
            ".*Timeline .* was cancelled and cannot be used anymore.*",
        ]
    )

    # One tenant gets deleted piece by piece, the other one must survive.
    tenant_id = env.initial_tenant
    timeline_id_1 = env.create_branch("br1")  # Active, delete explicitly
    timeline_id_2 = env.create_branch("br2")  # Inactive, delete explicitly
    timeline_id_3 = env.create_branch("br3")  # Active, delete with the tenant
    timeline_id_4 = env.create_branch("br4")  # Inactive, delete with the tenant

    tenant_id_other, timeline_id_other = env.create_tenant()

    # Populate branches
    endpoint_1 = env.endpoints.create_start("br1")
    endpoint_2 = env.endpoints.create_start("br2")
    endpoint_3 = env.endpoints.create_start("br3")
    endpoint_4 = env.endpoints.create_start("br4")
    endpoint_other = env.endpoints.create_start("main", tenant_id=tenant_id_other)
    for ep in (endpoint_1, endpoint_2, endpoint_3, endpoint_4, endpoint_other):
        with closing(ep.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute("CREATE TABLE t(key int primary key)")

    sk = env.safekeepers[0]
    sk_data_dir = sk.data_dir

    def tl_dir(tenant, timeline):
        # On-disk directory of one timeline on this safekeeper.
        return sk_data_dir / str(tenant) / str(timeline)

    if auth_enabled:
        sk_http = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
        sk_http_other = sk.http_client(
            auth_token=env.auth_keys.generate_tenant_token(tenant_id_other)
        )
        sk_http_noauth = sk.http_client(gen_sk_wide_token=False)
    else:
        sk_http = sk.http_client()
        sk_http_other = sk_http

    assert tl_dir(tenant_id, timeline_id_1).is_dir()
    assert tl_dir(tenant_id, timeline_id_2).is_dir()
    assert tl_dir(tenant_id, timeline_id_3).is_dir()
    assert tl_dir(tenant_id, timeline_id_4).is_dir()
    assert tl_dir(tenant_id_other, timeline_id_other).is_dir()

    # Stop branches which should be inactive and restart Safekeeper to drop its in-memory state.
    endpoint_2.stop_and_destroy()
    endpoint_4.stop_and_destroy()
    sk.stop()
    sk.start()

    # Ensure connections to Safekeeper are established
    for ep in (endpoint_1, endpoint_3, endpoint_other):
        with closing(ep.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute("INSERT INTO t (key) VALUES (1)")

    # Stop all computes gracefully before safekeepers stop responding to them
    endpoint_1.stop_and_destroy()
    endpoint_3.stop_and_destroy()

    # Remove initial tenant's br1 (active)
    assert sk_http.timeline_delete(tenant_id, timeline_id_1)["dir_existed"]
    assert not tl_dir(tenant_id, timeline_id_1).exists()
    assert tl_dir(tenant_id, timeline_id_2).is_dir()
    assert tl_dir(tenant_id, timeline_id_3).is_dir()
    assert tl_dir(tenant_id, timeline_id_4).is_dir()
    assert tl_dir(tenant_id_other, timeline_id_other).is_dir()

    # Ensure repeated deletion succeeds
    assert not sk_http.timeline_delete(tenant_id, timeline_id_1)["dir_existed"]
    assert not tl_dir(tenant_id, timeline_id_1).exists()
    assert tl_dir(tenant_id, timeline_id_2).is_dir()
    assert tl_dir(tenant_id, timeline_id_3).is_dir()
    assert tl_dir(tenant_id, timeline_id_4).is_dir()
    assert tl_dir(tenant_id_other, timeline_id_other).is_dir()

    if auth_enabled:
        # Ensure we cannot delete the other tenant
        for sk_h in (sk_http, sk_http_noauth):
            with pytest.raises(sk_h.HTTPError, match="Forbidden|Unauthorized"):
                assert sk_h.timeline_delete(tenant_id_other, timeline_id_other)
            with pytest.raises(sk_h.HTTPError, match="Forbidden|Unauthorized"):
                assert sk_h.tenant_delete_force(tenant_id_other)
        assert tl_dir(tenant_id_other, timeline_id_other).is_dir()

    # Remove initial tenant's br2 (inactive)
    assert sk_http.timeline_delete(tenant_id, timeline_id_2)["dir_existed"]
    assert not tl_dir(tenant_id, timeline_id_1).exists()
    assert not tl_dir(tenant_id, timeline_id_2).exists()
    assert tl_dir(tenant_id, timeline_id_3).is_dir()
    assert tl_dir(tenant_id, timeline_id_4).is_dir()
    assert tl_dir(tenant_id_other, timeline_id_other).is_dir()

    # Remove non-existing branch, should succeed
    assert not sk_http.timeline_delete(tenant_id, TimelineId("00" * 16))["dir_existed"]
    assert not tl_dir(tenant_id, timeline_id_1).exists()
    assert not tl_dir(tenant_id, timeline_id_2).exists()
    assert tl_dir(tenant_id, timeline_id_3).exists()
    assert tl_dir(tenant_id, timeline_id_4).is_dir()
    assert tl_dir(tenant_id_other, timeline_id_other).is_dir()

    # Remove initial tenant fully (two branches are active)
    response = sk_http.tenant_delete_force(tenant_id)
    assert response[str(timeline_id_3)]["dir_existed"]
    assert not (sk_data_dir / str(tenant_id)).exists()
    assert tl_dir(tenant_id_other, timeline_id_other).is_dir()

    # Remove initial tenant again.
    response = sk_http.tenant_delete_force(tenant_id)
    # assert response == {}
    assert not (sk_data_dir / str(tenant_id)).exists()
    assert tl_dir(tenant_id_other, timeline_id_other).is_dir()

    # Ensure the other tenant still works
    sk_http_other.timeline_status(tenant_id_other, timeline_id_other)
    with closing(endpoint_other.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("INSERT INTO t (key) VALUES (123)")
def test_safekeeper_delete_timeline_under_load(neon_env_builder: NeonEnvBuilder):
    """
    Test deleting timelines on a safekeeper while they're under load.

    This should not happen under normal operation, but it can happen if
    there is some rogue compute/pageserver that is writing/reading to a
    safekeeper that we're migrating a timeline away from, or if the timeline
    is being deleted while such a rogue client is running.
    """
    neon_env_builder.auth_enabled = True
    env = neon_env_builder.init_start()

    # Create two endpoints that will generate load
    timeline_id_a = env.create_branch("deleteme_a")
    timeline_id_b = env.create_branch("deleteme_b")

    endpoint_a = env.endpoints.create("deleteme_a")
    endpoint_a.start()
    endpoint_b = env.endpoints.create("deleteme_b")
    endpoint_b.start()

    # Get tenant and timeline IDs
    tenant_id = env.initial_tenant

    # Start generating load on both timelines
    def generate_load(endpoint: Endpoint):
        with closing(endpoint.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)")
                while True:
                    try:
                        cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'data'")
                    except Exception:
                        # Ignore errors since timeline may be deleted. Only Exception is
                        # caught, so SystemExit/KeyboardInterrupt are not swallowed.
                        break

    t_a = threading.Thread(target=generate_load, args=(endpoint_a,))
    t_b = threading.Thread(target=generate_load, args=(endpoint_b,))
    try:
        t_a.start()
        t_b.start()

        # Let the load run for a bit
        log.info("Warming up...")
        time.sleep(2)

        # Safekeeper errors will propagate to the pageserver: it is correct that these are
        # logged at error severity because they indicate the pageserver is trying to read
        # a timeline that it shouldn't.
        env.pageserver.allowed_errors.extend(
            [
                ".*Timeline.*was cancelled.*",
                ".*Timeline.*was not found.*",
            ]
        )

        # Try deleting timelines while under load
        sk = env.safekeepers[0]
        sk_http = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))

        # Delete first timeline
        log.info(f"Deleting {timeline_id_a}...")
        assert sk_http.timeline_delete(tenant_id, timeline_id_a, only_local=True)["dir_existed"]

        # Delete second timeline
        log.info(f"Deleting {timeline_id_b}...")
        assert sk_http.timeline_delete(tenant_id, timeline_id_b, only_local=True)["dir_existed"]

        # Verify timelines are gone from disk
        sk_data_dir = sk.data_dir
        assert not (sk_data_dir / str(tenant_id) / str(timeline_id_a)).exists()
        # NOTE(review): the matching check for timeline_id_b is disabled; the reason is
        # not visible here, so it is kept as found.
        # assert not (sk_data_dir / str(tenant_id) / str(timeline_id_b)).exists()

    finally:
        log.info("Stopping endpoints...")
        # Stop endpoints with immediate mode because we deleted the timeline out from under the compute, which may cause it to hang
        endpoint_a.stop(mode="immediate")
        endpoint_b.stop(mode="immediate")
        log.info("Joining threads...")
        t_a.join()
        t_b.join()
class RemoteDeleteFailpoint(StrEnum):
PAUSE = "sk-delete-timeline-remote-pause"
FAIL = "sk-delete-timeline-remote"
@pytest.mark.parametrize("failpoint", [RemoteDeleteFailpoint.PAUSE, RemoteDeleteFailpoint.FAIL])
def test_safekeeper_delete_remote_errors(
neon_env_builder: NeonEnvBuilder, failpoint: RemoteDeleteFailpoint
):
"""
Test that errors and delays during remote deletion are handled correctly.
"""
# Configure safekeepers with ultra-fast eviction policy
neon_env_builder.safekeeper_extra_opts = [
"--enable-offload",
"--delete-offloaded-wal",
"--control-file-save-interval",
"1s",
]
neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
env = neon_env_builder.init_start()
# FIXME: pageserver is intermittently emitting this
env.pageserver.allowed_errors.extend(
[
".*unsupported command START_WAL_PUSH in START_WAL_PUSH.*",
]
)
timeline_id_a = env.create_branch("deleteme_a")
endpoint_a = env.endpoints.create("deleteme_a")
endpoint_a.start()
with closing(endpoint_a.connect()) as conn:
with conn.cursor() as cur:
# roughly fills one segment
cur.execute("create table t(key int, value text)")
cur.execute("insert into t select generate_series(1,250000), 'payload'")
endpoint_a.stop()
# Ensure something is uploaded to remote storage
def assert_is_uploaded():
assert is_segment_offloaded(
env.safekeepers[0], env.initial_tenant, timeline_id_a, Lsn("0/2000000")
)
wait_until(assert_is_uploaded)
def list_timeline_remote():
assert isinstance(env.safekeepers_remote_storage, S3Storage)
prefix = f"{env.safekeepers_remote_storage.safekeeper_timeline_path(env.initial_tenant, timeline_id_a)}/"
listing = env.safekeepers_remote_storage.client.list_objects_v2(
Bucket=env.safekeepers_remote_storage.bucket_name,
Prefix=prefix,
)
return listing.get("Contents", [])
assert list_timeline_remote() != []
sk_http = env.safekeepers[0].http_client()
env.pageserver.http_client().timeline_delete(env.initial_tenant, timeline_id_a)
# Set up failpoint
if failpoint == RemoteDeleteFailpoint.PAUSE:
sk_http.configure_failpoints((failpoint, "pause"))
elif failpoint == RemoteDeleteFailpoint.FAIL:
sk_http.configure_failpoints((failpoint, "return"))
else:
raise NotImplementedError(f"Unknown failpoint: {failpoint}")
# Delete the timeline - this should hit the configured failpoint
if failpoint == RemoteDeleteFailpoint.PAUSE:
# Expect time out
with pytest.raises(requests.exceptions.ReadTimeout, match="timed out"):
sk_http.timeline_delete(env.initial_tenant, timeline_id_a, timeout=5)
# Assert deletion didn't happy yet
assert list_timeline_remote() != []
# Unblock the background task that should still be running
sk_http.configure_failpoints((failpoint, "off"))
# Expect that after unblocking, remote deletion proceeds
def assert_remote_deleted():
assert list_timeline_remote() == []
wait_until(assert_remote_deleted)
elif failpoint == RemoteDeleteFailpoint.FAIL:
# Expect immediate failure
with pytest.raises(sk_http.HTTPError, match="Internal Server Error"):
sk_http.timeline_delete(env.initial_tenant, timeline_id_a)
sk_http.configure_failpoints((failpoint, "off"))
else:
raise NotImplementedError(f"Unknown failpoint: {failpoint}")
# Retry should succeed
sk_http.timeline_delete(env.initial_tenant, timeline_id_a)
# Remote storage should be empty
assert list_timeline_remote() == []

View File

@@ -27,7 +27,6 @@ from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
Endpoint,
NeonEnvBuilder,
NeonPageserver,
PgBin,
PgProtocol,
Safekeeper,
@@ -38,8 +37,6 @@ from fixtures.pageserver.utils import (
assert_prefix_empty,
assert_prefix_not_empty,
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
@@ -55,9 +52,16 @@ from fixtures.safekeeper.http import (
TimelineCreateRequest,
)
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.safekeeper_utils import (
is_flush_lsn_caught_up,
is_segment_offloaded,
is_wal_trimmed,
wait_lsn_force_checkpoint,
wait_lsn_force_checkpoint_at,
wait_lsn_force_checkpoint_at_sk,
)
from fixtures.utils import (
PropagatingThread,
get_dir_size,
query_scalar,
run_only_on_default_postgres,
skip_in_debug_build,
@@ -69,68 +73,6 @@ if TYPE_CHECKING:
from typing import Any, Self
def wait_lsn_force_checkpoint(
tenant_id: TenantId,
timeline_id: TimelineId,
endpoint: Endpoint,
ps: NeonPageserver,
pageserver_conn_options=None,
):
pageserver_conn_options = pageserver_conn_options or {}
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
def wait_lsn_force_checkpoint_at_sk(
safekeeper: Safekeeper,
tenant_id: TenantId,
timeline_id: TimelineId,
ps: NeonPageserver,
pageserver_conn_options=None,
):
sk_flush_lsn = safekeeper.get_flush_lsn(tenant_id, timeline_id)
wait_lsn_force_checkpoint_at(sk_flush_lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
def wait_lsn_force_checkpoint_at(
lsn: Lsn,
tenant_id: TenantId,
timeline_id: TimelineId,
ps: NeonPageserver,
pageserver_conn_options=None,
):
"""
Wait until pageserver receives given lsn, force checkpoint and wait for
upload, i.e. remote_consistent_lsn advancement.
"""
pageserver_conn_options = pageserver_conn_options or {}
auth_token = None
if "password" in pageserver_conn_options:
auth_token = pageserver_conn_options["password"]
# wait for the pageserver to catch up
wait_for_last_record_lsn(
ps.http_client(auth_token=auth_token),
tenant_id,
timeline_id,
lsn,
)
# force checkpoint to advance remote_consistent_lsn
ps.http_client(auth_token).timeline_checkpoint(tenant_id, timeline_id)
# ensure that remote_consistent_lsn is advanced
wait_for_upload(
ps.http_client(auth_token=auth_token),
tenant_id,
timeline_id,
lsn,
)
@dataclass
class TimelineMetrics:
timeline_id: TimelineId
@@ -475,31 +417,6 @@ def wait(f, desc, timeout=30, wait_f=None):
wait_f()
def is_segment_offloaded(
sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"sk status is {tli_status}")
return tli_status.backup_lsn >= seg_end
def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"sk status is {tli_status}")
return tli_status.flush_lsn >= lsn
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(sk.timeline_dir(tenant_id, timeline_id))
sk_wal_size_mb = sk_wal_size / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
return sk_wal_size_mb <= target_size_mb
def test_wal_backup(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
remote_storage_kind = s3_storage()
@@ -1685,214 +1602,6 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
show_statuses(env.safekeepers, tenant_id, timeline_id)
@pytest.mark.parametrize("auth_enabled", [False, True])
def test_delete(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
# FIXME: are these expected?
env.pageserver.allowed_errors.extend(
[
".*Timeline .* was not found in global map.*",
".*Timeline .* was cancelled and cannot be used anymore.*",
]
)
# Create two tenants: one will be deleted, other should be preserved.
tenant_id = env.initial_tenant
timeline_id_1 = env.create_branch("br1") # Active, delete explicitly
timeline_id_2 = env.create_branch("br2") # Inactive, delete explicitly
timeline_id_3 = env.create_branch("br3") # Active, delete with the tenant
timeline_id_4 = env.create_branch("br4") # Inactive, delete with the tenant
tenant_id_other, timeline_id_other = env.create_tenant()
# Populate branches
endpoint_1 = env.endpoints.create_start("br1")
endpoint_2 = env.endpoints.create_start("br2")
endpoint_3 = env.endpoints.create_start("br3")
endpoint_4 = env.endpoints.create_start("br4")
endpoint_other = env.endpoints.create_start("main", tenant_id=tenant_id_other)
for endpoint in [endpoint_1, endpoint_2, endpoint_3, endpoint_4, endpoint_other]:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key)")
sk = env.safekeepers[0]
sk_data_dir = sk.data_dir
if not auth_enabled:
sk_http = sk.http_client()
sk_http_other = sk_http
else:
sk_http = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
sk_http_other = sk.http_client(
auth_token=env.auth_keys.generate_tenant_token(tenant_id_other)
)
sk_http_noauth = sk.http_client(gen_sk_wide_token=False)
assert (sk_data_dir / str(tenant_id) / str(timeline_id_1)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir()
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
# Stop branches which should be inactive and restart Safekeeper to drop its in-memory state.
endpoint_2.stop_and_destroy()
endpoint_4.stop_and_destroy()
sk.stop()
sk.start()
# Ensure connections to Safekeeper are established
for endpoint in [endpoint_1, endpoint_3, endpoint_other]:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO t (key) VALUES (1)")
# Stop all computes gracefully before safekeepers stop responding to them
endpoint_1.stop_and_destroy()
endpoint_3.stop_and_destroy()
# Remove initial tenant's br1 (active)
assert sk_http.timeline_delete(tenant_id, timeline_id_1)["dir_existed"]
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir()
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
# Ensure repeated deletion succeeds
assert not sk_http.timeline_delete(tenant_id, timeline_id_1)["dir_existed"]
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir()
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
if auth_enabled:
# Ensure we cannot delete the other tenant
for sk_h in [sk_http, sk_http_noauth]:
with pytest.raises(sk_h.HTTPError, match="Forbidden|Unauthorized"):
assert sk_h.timeline_delete(tenant_id_other, timeline_id_other)
with pytest.raises(sk_h.HTTPError, match="Forbidden|Unauthorized"):
assert sk_h.tenant_delete_force(tenant_id_other)
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
# Remove initial tenant's br2 (inactive)
assert sk_http.timeline_delete(tenant_id, timeline_id_2)["dir_existed"]
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_2)).exists()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir()
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
# Remove non-existing branch, should succeed
assert not sk_http.timeline_delete(tenant_id, TimelineId("00" * 16))["dir_existed"]
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_2)).exists()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).exists()
assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir()
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
# Remove initial tenant fully (two branches are active)
response = sk_http.tenant_delete_force(tenant_id)
assert response[str(timeline_id_3)]["dir_existed"]
assert not (sk_data_dir / str(tenant_id)).exists()
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
# Remove initial tenant again.
response = sk_http.tenant_delete_force(tenant_id)
# assert response == {}
assert not (sk_data_dir / str(tenant_id)).exists()
assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir()
# Ensure the other tenant still works
sk_http_other.timeline_status(tenant_id_other, timeline_id_other)
with closing(endpoint_other.connect()) as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO t (key) VALUES (123)")
def test_delete_timeline_under_load(neon_env_builder: NeonEnvBuilder):
"""
Test deleting timelines on a safekeeper while they're under load.
This should not happen under normal operation, but it can happen if
there is some rogue compute/pageserver that is writing/reading to a
safekeeper that we're migrating a timeline away from, or if the timeline
is being deleted while such a rogue client is running.
"""
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
# Create two endpoints that will generate load
timeline_id_a = env.create_branch("deleteme_a")
timeline_id_b = env.create_branch("deleteme_b")
endpoint_a = env.endpoints.create("deleteme_a")
endpoint_a.start()
endpoint_b = env.endpoints.create("deleteme_b")
endpoint_b.start()
# Get tenant and timeline IDs
tenant_id = env.initial_tenant
# Start generating load on both timelines
def generate_load(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)")
while True:
try:
cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'data'")
except: # noqa
# Ignore errors since timeline may be deleted
break
t_a = threading.Thread(target=generate_load, args=(endpoint_a,))
t_b = threading.Thread(target=generate_load, args=(endpoint_b,))
try:
t_a.start()
t_b.start()
# Let the load run for a bit
log.info("Warming up...")
time.sleep(2)
# Safekeeper errors will propagate to the pageserver: it is correct that these are
# logged at error severity because they indicate the pageserver is trying to read
# a timeline that it shouldn't.
env.pageserver.allowed_errors.extend(
[
".*Timeline.*was cancelled.*",
".*Timeline.*was not found.*",
]
)
# Try deleting timelines while under load
sk = env.safekeepers[0]
sk_http = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
# Delete first timeline
log.info(f"Deleting {timeline_id_a}...")
assert sk_http.timeline_delete(tenant_id, timeline_id_a, only_local=True)["dir_existed"]
# Delete second timeline
log.info(f"Deleting {timeline_id_b}...")
assert sk_http.timeline_delete(tenant_id, timeline_id_b, only_local=True)["dir_existed"]
# Verify timelines are gone from disk
sk_data_dir = sk.data_dir
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_a)).exists()
# assert not (sk_data_dir / str(tenant_id) / str(timeline_id_b)).exists()
finally:
log.info("Stopping endpoints...")
# Stop endpoints with immediate mode because we deleted the timeline out from under the compute, which may cause it to hang
endpoint_a.stop(mode="immediate")
endpoint_b.stop(mode="immediate")
log.info("Joining threads...")
t_a.join()
t_b.join()
# Basic pull_timeline test.
# When live_sk_change is False, compute is restarted to change set of
# safekeepers; otherwise it is live reload.