safekeeper: block deletion on protocol handler shutdown (#9364)

## Problem

Two recently observed log errors indicate that safekeeper tasks for a
timeline were still running after that timeline's deletion had started.
- https://github.com/neondatabase/neon/issues/8972
- https://github.com/neondatabase/neon/issues/8974

These code paths do not have a mechanism that coordinates task shutdown
with the overall shutdown of the timeline.

## Summary of changes

- Add a `Gate` to `Timeline`
- Take the gate as part of the resident timeline guard: any code that holds
a guard keeping a timeline resident should also hold a guard over the
timeline's total lifetime.
- Take the gate in the WAL removal task
- Respect `Timeline::cancel` in the WAL send/recv code, so that we do not
block shutdown indefinitely.
- Add a test that deletes timelines with open pageserver+compute
connections, to check that these get torn down as expected.

There is some risk to introducing gates: if code holds a gate without
properly respecting a cancellation token, it can cause shutdown hangs.
In practice this risk is lower for safekeepers than for other services,
because in a healthy timeline deletion the compute is shut down first,
then the timeline is deleted on the pageserver, and only then on the
safekeepers -- which makes it much less likely that some protocol handler
will still be running.
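To make the intended interplay concrete, below is a minimal sketch (not code from this PR) of how the gate and cancellation token are meant to be used together. It assumes the `utils::sync::gate::Gate` API that appears in the diff (`enter()` returning a guard, `close().await`); `spawn_background_task` and `do_background_work` are hypothetical names used only for illustration.

```rust
// Sketch of the gate + cancellation pattern, assuming the neon-internal
// `utils::sync::gate::Gate` API shown in this diff. Names below that are not
// in the diff (`spawn_background_task`, `do_background_work`) are placeholders.
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use utils::sync::gate::Gate;

struct Timeline {
    gate: Gate,
    cancel: CancellationToken,
}

fn spawn_background_task(tli: Arc<Timeline>) {
    // Refuse to start if shutdown has already begun: `enter()` fails once the
    // gate is closed.
    let Ok(gate_guard) = tli.gate.enter() else {
        return;
    };
    tokio::spawn(async move {
        // Holding the guard blocks `shutdown` below until this task exits, so
        // the task must watch the cancellation token and drop out promptly.
        let _gate_guard = gate_guard;
        tokio::select! {
            _ = do_background_work(&tli) => {}
            _ = tli.cancel.cancelled() => {}
        }
    });
}

async fn shutdown(tli: &Timeline) {
    tli.cancel.cancel(); // ask every gate holder to stop
    tli.gate.close().await; // wait until all guards are dropped; deletion is safe after this
}

async fn do_background_work(_tli: &Timeline) {
    // placeholder for e.g. a WAL removal task or a protocol handler loop
}
```

The key invariant is the one stated above: anything holding a gate guard must also watch `Timeline::cancel`, otherwise `gate.close()` in `shutdown` would hang.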

Closes: #8972
Closes: #8974
Author: John Spray
Date: 2024-11-20 11:07:45 +00:00 (committed by GitHub)
Parent: 3ae0b2149e
Commit: 33dce25af8
9 changed files with 270 additions and 118 deletions

View File

@@ -834,7 +834,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
use CopyStreamHandlerEnd::*;
let expected_end = match &end {
ServerInitiated(_) | CopyDone | CopyFail | Terminate | EOF => true,
ServerInitiated(_) | CopyDone | CopyFail | Terminate | EOF | Cancelled => true,
CopyStreamHandlerEnd::Disconnected(ConnectionError::Io(io_error))
if is_expected_io_error(io_error) =>
{
@@ -874,6 +874,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
// message from server' when it receives ErrorResponse (anything but
// CopyData/CopyDone) back.
CopyFail => Some((end.to_string(), SQLSTATE_SUCCESSFUL_COMPLETION)),
// When cancelled, send no response: we must not risk blocking on sending that response
Cancelled => None,
_ => None,
};
if let Some((err, errcode)) = err_to_send_and_errcode {
@@ -1051,6 +1054,8 @@ pub enum CopyStreamHandlerEnd {
/// The connection was lost
#[error("connection error: {0}")]
Disconnected(#[from] ConnectionError),
#[error("Shutdown")]
Cancelled,
/// Some other error
#[error(transparent)]
Other(#[from] anyhow::Error),

View File

@@ -239,6 +239,10 @@ impl SafekeeperPostgresHandler {
pgb: &mut PostgresBackend<IO>,
tli: &mut Option<WalResidentTimeline>,
) -> Result<(), CopyStreamHandlerEnd> {
// The `tli` parameter is only used for passing _out_ a timeline, one should
// not have been passed in.
assert!(tli.is_none());
// Notify the libpq client that it's allowed to send `CopyData` messages
pgb.write_message(&BeMessage::CopyBothResponse).await?;
@@ -256,6 +260,7 @@ impl SafekeeperPostgresHandler {
// sends, so this avoids deadlocks.
let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
let peer_addr = *pgb.get_peer_addr();
let mut network_reader = NetworkReader {
ttid: self.ttid,
conn_id: self.conn_id,
@@ -275,10 +280,14 @@ impl SafekeeperPostgresHandler {
.subscribe();
*tli = Some(timeline.wal_residence_guard().await?);
let timeline_cancel = timeline.cancel.clone();
tokio::select! {
// todo: add read|write .context to these errors
r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
_ = timeline_cancel.cancelled() => {
return Err(CopyStreamHandlerEnd::Cancelled);
}
}
} else {
res.map(|_| ())
@@ -303,7 +312,7 @@ impl SafekeeperPostgresHandler {
// Otherwise, WalAcceptor thread must have errored.
match wal_acceptor_res {
Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
Ok(Ok(_)) => Ok(()), // Clean shutdown
Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
"WalAcceptor task panicked",
@@ -356,6 +365,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
Ok((tli, next_msg))
}
/// This function is cancellation-safe (only does network I/O and channel read/writes).
async fn run(
self,
msg_tx: Sender<ProposerAcceptorMessage>,
@@ -397,6 +407,7 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
loop {
let started = Instant::now();
let size = next_msg.size();
match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
Ok(()) => {}
// Slow send, log a message and keep trying. Log context has timeline ID.
@@ -428,6 +439,8 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
/// Read replies from WalAcceptor and pass them back to socket. Returns Ok(())
/// if reply_rx closed; it must mean WalAcceptor terminated, joining it should
/// tell the error.
///
/// This function is cancellation-safe (only does network I/O and channel read/writes).
async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_writer: &mut PostgresBackend<IO>,
mut reply_rx: Receiver<AcceptorProposerMessage>,
@@ -461,7 +474,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
Some(AcceptorProposerMessage::AppendResponse(append_response))
}
_ => None,
}
},
};
let Some(msg) = msg else {
@@ -527,6 +540,10 @@ impl WalAcceptor {
/// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
/// it must mean that network thread terminated.
///
/// This function is *not* cancellation safe, it does local disk I/O: it should always
/// be allowed to run to completion. It respects Timeline::cancel and shuts down cleanly
/// when that gets triggered.
async fn run(&mut self) -> anyhow::Result<()> {
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
@@ -541,7 +558,7 @@ impl WalAcceptor {
// Tracks whether we have unflushed appends.
let mut dirty = false;
loop {
while !self.tli.is_cancelled() {
let reply = tokio::select! {
// Process inbound message.
msg = self.msg_rx.recv() => {
@@ -599,6 +616,10 @@ impl WalAcceptor {
WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64);
None // no reply
}
_ = self.tli.cancel.cancelled() => {
break;
}
};
// Send reply, if any.
@@ -610,7 +631,7 @@ impl WalAcceptor {
}
// Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259.
if dirty {
if dirty && !self.tli.cancel.is_cancelled() {
self.tli
.process_msg(&ProposerAcceptorMessage::FlushWAL)
.await?;

View File

@@ -456,6 +456,8 @@ impl SafekeeperPostgresHandler {
// not synchronized with sends, so this avoids deadlocks.
let reader = pgb.split().context("START_REPLICATION split")?;
let tli_cancel = tli.cancel.clone();
let mut sender = WalSender {
pgb,
// should succeed since we're already holding another guard
@@ -479,6 +481,9 @@ impl SafekeeperPostgresHandler {
// todo: add read|write .context to these errors
r = sender.run() => r,
r = reply_reader.run() => r,
_ = tli_cancel.cancelled() => {
return Err(CopyStreamHandlerEnd::Cancelled);
}
};
let ws_state = ws_guard
@@ -557,6 +562,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
/// Send WAL until
/// - an error occurs
/// - receiver is caughtup and there is no computes (if streaming up to commit_lsn)
/// - timeline's cancellation token fires
///
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
/// convenience.
@@ -601,15 +607,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
};
let send_buf = &send_buf[..send_size];
// and send it
self.pgb
.write_message(&BeMessage::XLogData(XLogDataBody {
// and send it, while respecting Timeline::cancel
let msg = BeMessage::XLogData(XLogDataBody {
wal_start: self.start_pos.0,
wal_end: self.end_pos.0,
timestamp: get_current_timestamp(),
data: send_buf,
}))
.await?;
});
self.pgb.write_message(&msg).await?;
if let Some(appname) = &self.appname {
if appname == "replica" {
@@ -674,13 +679,13 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
}
}
self.pgb
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
let msg = BeMessage::KeepAlive(WalSndKeepAlive {
wal_end: self.end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.await?;
});
self.pgb.write_message(&msg).await?;
}
}

View File

@@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
use utils::id::TenantId;
use utils::sync::gate::Gate;
use std::cmp::max;
use std::ops::{Deref, DerefMut};
@@ -467,6 +468,10 @@ pub struct Timeline {
timeline_dir: Utf8PathBuf,
manager_ctl: ManagerCtl,
/// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
/// this gate, you must respect [`Timeline::cancel`]
pub(crate) gate: Gate,
/// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
pub(crate) cancel: CancellationToken,
@@ -508,6 +513,7 @@ impl Timeline {
mutex: RwLock::new(shared_state),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
gate: Default::default(),
cancel: CancellationToken::default(),
manager_ctl: ManagerCtl::new(),
broker_active: AtomicBool::new(false),
@@ -533,56 +539,6 @@ impl Timeline {
))
}
/// Initialize fresh timeline on disk and start background tasks. If init
/// fails, timeline is cancelled and cannot be used anymore.
///
/// Init is transactional, so if it fails, created files will be deleted,
/// and state on disk should remain unchanged.
pub async fn init_new(
self: &Arc<Timeline>,
shared_state: &mut WriteGuardSharedState<'_>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
) -> Result<()> {
match fs::metadata(&self.timeline_dir).await {
Ok(_) => {
// Timeline directory exists on disk, we should leave state unchanged
// and return error.
bail!(TimelineError::Invalid(self.ttid));
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
return Err(e.into());
}
}
// Create timeline directory.
fs::create_dir_all(&self.timeline_dir).await?;
// Write timeline to disk and start background tasks.
if let Err(e) = shared_state.sk.state_mut().flush().await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
warn!(
"failed to remove timeline {} directory after bootstrap failure: {}",
self.ttid, fs_err
);
}
return Err(e);
}
self.bootstrap(
shared_state,
conf,
broker_active_set,
partial_backup_rate_limiter,
);
Ok(())
}
/// Bootstrap new or existing timeline starting background tasks.
pub fn bootstrap(
self: &Arc<Timeline>,
@@ -593,33 +549,61 @@ impl Timeline {
) {
let (tx, rx) = self.manager_ctl.bootstrap_manager();
let Ok(gate_guard) = self.gate.enter() else {
// Init raced with shutdown
return;
};
// Start manager task which will monitor timeline state and update
// background tasks.
tokio::spawn(timeline_manager::main_task(
ManagerTimeline { tli: self.clone() },
conf.clone(),
tokio::spawn({
let this = self.clone();
let conf = conf.clone();
async move {
let _gate_guard = gate_guard;
timeline_manager::main_task(
ManagerTimeline { tli: this },
conf,
broker_active_set,
tx,
rx,
partial_backup_rate_limiter,
));
)
.await
}
});
}
/// Background timeline activities (which hold Timeline::gate) will no
/// longer run once this function completes.
pub async fn shutdown(&self) {
info!("timeline {} shutting down", self.ttid);
self.cancel.cancel();
// Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts
// to read deleted files.
self.gate.close().await;
}
/// Delete timeline from disk completely, by removing timeline directory.
/// Background timeline activities will stop eventually.
///
/// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
/// deletion API endpoint is retriable.
///
/// Timeline must be in shut-down state (i.e. call [`Self::shutdown`] first)
pub async fn delete(
&self,
shared_state: &mut WriteGuardSharedState<'_>,
only_local: bool,
) -> Result<bool> {
self.cancel(shared_state);
// Assert that [`Self::shutdown`] was already called
assert!(self.cancel.is_cancelled());
assert!(self.gate.close_complete());
// Close associated FDs. Nobody will be able to touch timeline data once
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.close_wal_store();
// TODO: It's better to wait for s3 offloader termination before
// removing data from s3. Though since s3 doesn't have transactions it
// still wouldn't guarantee absense of data after removal.
let conf = GlobalTimelines::get_global_config();
if !only_local && conf.is_wal_backup_enabled() {
// Note: we concurrently delete remote storage data from multiple
@@ -631,16 +615,6 @@ impl Timeline {
Ok(dir_existed)
}
/// Cancel timeline to prevent further usage. Background tasks will stop
/// eventually after receiving cancellation signal.
fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) {
info!("timeline {} is cancelled", self.ttid);
self.cancel.cancel();
// Close associated FDs. Nobody will be able to touch timeline data once
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.close_wal_store();
}
/// Returns if timeline is cancelled.
pub fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()

View File

@@ -7,6 +7,7 @@
use std::collections::HashSet;
use tracing::debug;
use utils::sync::gate::GateGuard;
use crate::timeline_manager::ManagerCtlMessage;
@@ -16,6 +17,12 @@ pub struct GuardId(u64);
pub struct ResidenceGuard {
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
guard_id: GuardId,
/// [`ResidenceGuard`] represents a guarantee that a timeline's data remains resident,
/// which by extension also means the timeline is not shut down (since after shut down
/// our data may be deleted). Therefore everyone holding a residence guard must also
/// hold a guard on [`crate::timeline::Timeline::gate`]
_gate_guard: GateGuard,
}
impl Drop for ResidenceGuard {
@@ -52,7 +59,8 @@ impl AccessService {
self.guards.is_empty()
}
pub(crate) fn create_guard(&mut self) -> ResidenceGuard {
/// `timeline_gate_guard` is a guarantee that the timeline is not shut down
pub(crate) fn create_guard(&mut self, timeline_gate_guard: GateGuard) -> ResidenceGuard {
let guard_id = self.next_guard_id;
self.next_guard_id += 1;
self.guards.insert(guard_id);
@@ -63,6 +71,7 @@ impl AccessService {
ResidenceGuard {
manager_tx: self.manager_tx.clone(),
guard_id,
_gate_guard: timeline_gate_guard,
}
}

View File

@@ -266,9 +266,11 @@ pub async fn main_task(
// Start recovery task which always runs on the timeline.
if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
let tli = mgr.wal_resident_timeline();
// Recovery task is only spawned if we can get a residence guard (i.e. timeline is not already shutting down)
if let Ok(tli) = mgr.wal_resident_timeline() {
mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
}
}
// If timeline is evicted, reflect that in the metric.
if mgr.is_offloaded {
@@ -375,6 +377,13 @@ pub async fn main_task(
// shutdown background tasks
if mgr.conf.is_wal_backup_enabled() {
if let Some(backup_task) = mgr.backup_task.take() {
// If we fell through here, then the timeline is shutting down. This is important
// because otherwise joining on the wal_backup handle might hang.
assert!(mgr.tli.cancel.is_cancelled());
backup_task.join().await;
}
wal_backup::update_task(&mut mgr, false, &last_state).await;
}
@@ -442,10 +451,18 @@ impl Manager {
/// Get a WalResidentTimeline.
/// Manager code must use this function instead of one from `Timeline`
/// directly, because it will deadlock.
pub(crate) fn wal_resident_timeline(&mut self) -> WalResidentTimeline {
///
/// This function is fallible because the guard may not be created if the timeline is
/// shutting down.
pub(crate) fn wal_resident_timeline(&mut self) -> anyhow::Result<WalResidentTimeline> {
assert!(!self.is_offloaded);
let guard = self.access_service.create_guard();
WalResidentTimeline::new(self.tli.clone(), guard)
let guard = self.access_service.create_guard(
self.tli
.gate
.enter()
.map_err(|_| anyhow::anyhow!("Timeline shutting down"))?,
);
Ok(WalResidentTimeline::new(self.tli.clone(), guard))
}
/// Get a snapshot of the timeline state.
@@ -559,6 +576,11 @@ impl Manager {
if removal_horizon_segno > self.last_removed_segno {
// we need to remove WAL
let Ok(timeline_gate_guard) = self.tli.gate.enter() else {
tracing::info!("Timeline shutdown, not spawning WAL removal task");
return;
};
let remover = match self.tli.read_shared_state().await.sk {
StateSK::Loaded(ref sk) => {
crate::wal_storage::Storage::remove_up_to(&sk.wal_store, removal_horizon_segno)
@@ -573,6 +595,8 @@ impl Manager {
self.wal_removal_task = Some(tokio::spawn(
async move {
let _timeline_gate_guard = timeline_gate_guard;
remover.await?;
Ok(removal_horizon_segno)
}
@@ -619,10 +643,15 @@ impl Manager {
return;
}
let Ok(resident) = self.wal_resident_timeline() else {
// Shutting down
return;
};
// Get WalResidentTimeline and start partial backup task.
let cancel = CancellationToken::new();
let handle = tokio::spawn(wal_backup_partial::main_task(
self.wal_resident_timeline(),
resident,
self.conf.clone(),
self.global_rate_limiter.clone(),
cancel.clone(),
@@ -664,7 +693,7 @@ impl Manager {
self.partial_backup_task = None;
}
let tli = self.wal_resident_timeline();
let tli = self.wal_resident_timeline()?;
let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await;
// Reset might fail e.g. when cfile is already reset but s3 removal
// failed, so set manager state to None beforehand. In any case caller
@@ -688,7 +717,12 @@ impl Manager {
let guard = if self.is_offloaded {
Err(anyhow::anyhow!("timeline is offloaded, can't get a guard"))
} else {
Ok(self.access_service.create_guard())
match self.tli.gate.enter() {
Ok(gate_guard) => Ok(self.access_service.create_guard(gate_guard)),
Err(_) => Err(anyhow::anyhow!(
"timeline is shutting down, can't get a guard"
)),
}
};
if tx.send(guard).is_err() {
@@ -699,7 +733,10 @@ impl Manager {
let result = if self.is_offloaded {
None
} else {
Some(self.access_service.create_guard())
match self.tli.gate.enter() {
Ok(gate_guard) => Some(self.access_service.create_guard(gate_guard)),
Err(_) => None,
}
};
if tx.send(result).is_err() {

View File

@@ -457,10 +457,12 @@ impl GlobalTimelines {
Ok(timeline) => {
let was_active = timeline.broker_active.load(Ordering::Relaxed);
info!("deleting timeline {}, only_local={}", ttid, only_local);
timeline.shutdown().await;
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state().await;
info!("deleting timeline {}, only_local={}", ttid, only_local);
let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
Ok(TimelineDeleteForceResult {

View File

@@ -25,7 +25,6 @@ use tokio::fs::File;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{watch, OnceCell};
use tokio::time::sleep;
use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
@@ -46,6 +45,14 @@ pub struct WalBackupTaskHandle {
handle: JoinHandle<()>,
}
impl WalBackupTaskHandle {
pub(crate) async fn join(self) {
if let Err(e) = self.handle.await {
error!("WAL backup task panicked: {}", e);
}
}
}
/// Do we have anything to upload to S3, i.e. should safekeepers run backup activity?
pub(crate) fn is_wal_backup_required(
wal_seg_size: usize,
@@ -74,11 +81,12 @@ pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &St
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let async_task = backup_task_main(
mgr.wal_resident_timeline(),
mgr.conf.backup_parallel_jobs,
shutdown_rx,
);
let Ok(resident) = mgr.wal_resident_timeline() else {
info!("Timeline shut down");
return;
};
let async_task = backup_task_main(resident, mgr.conf.backup_parallel_jobs, shutdown_rx);
let handle = if mgr.conf.current_thread_runtime {
tokio::spawn(async_task)
@@ -108,9 +116,7 @@ async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task panicked: {}", e);
}
wb_handle.join().await;
}
}
@@ -214,6 +220,7 @@ async fn backup_task_main(
let _guard = WAL_BACKUP_TASKS.guard();
info!("started");
let cancel = tli.tli.cancel.clone();
let mut wb = WalBackupTask {
wal_seg_size: tli.get_wal_seg_size().await,
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
@@ -230,25 +237,34 @@ async fn backup_task_main(
_ = wb.run() => {}
_ = shutdown_rx.recv() => {
canceled = true;
},
_ = cancel.cancelled() => {
canceled = true;
}
}
info!("task {}", if canceled { "canceled" } else { "terminated" });
}
impl WalBackupTask {
/// This function must be called from a select! that also respects self.timeline's
/// cancellation token. This is done in [`backup_task_main`].
///
/// The future returned by this function is safe to drop at any time because it
/// does not write to local disk.
async fn run(&mut self) {
let mut backup_lsn = Lsn(0);
let mut retry_attempt = 0u32;
// offload loop
loop {
while !self.timeline.cancel.is_cancelled() {
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
// should never happen, as we hold Arc to timeline and transmitter's lifetime
// is within Timeline's
error!("commit_lsn watch shut down: {:?}", e);
return;
}
};
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
@@ -256,7 +272,7 @@ impl WalBackupTask {
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
tokio::time::sleep(Duration::from_millis(retry_delay)).await;
}
let commit_lsn = *self.commit_lsn_watch_rx.borrow();

View File

@@ -1784,6 +1784,89 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
cur.execute("INSERT INTO t (key) VALUES (123)")
def test_delete_timeline_under_load(neon_env_builder: NeonEnvBuilder):
"""
Test deleting timelines on a safekeeper while they're under load.
This should not happen under normal operation, but it can happen if
there is some rogue compute/pageserver that is writing/reading to a
safekeeper that we're migrating a timeline away from, or if the timeline
is being deleted while such a rogue client is running.
"""
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
# Create two endpoints that will generate load
timeline_id_a = env.create_branch("deleteme_a")
timeline_id_b = env.create_branch("deleteme_b")
endpoint_a = env.endpoints.create("deleteme_a")
endpoint_a.start()
endpoint_b = env.endpoints.create("deleteme_b")
endpoint_b.start()
# Get tenant and timeline IDs
tenant_id = env.initial_tenant
# Start generating load on both timelines
def generate_load(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)")
while True:
try:
cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'data'")
except: # noqa
# Ignore errors since timeline may be deleted
break
t_a = threading.Thread(target=generate_load, args=(endpoint_a,))
t_b = threading.Thread(target=generate_load, args=(endpoint_b,))
try:
t_a.start()
t_b.start()
# Let the load run for a bit
log.info("Warming up...")
time.sleep(2)
# Safekeeper errors will propagate to the pageserver: it is correct that these are
# logged at error severity because they indicate the pageserver is trying to read
# a timeline that it shouldn't.
env.pageserver.allowed_errors.extend(
[
".*Timeline.*was cancelled.*",
".*Timeline.*was not found.*",
]
)
# Try deleting timelines while under load
sk = env.safekeepers[0]
sk_http = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
# Delete first timeline
log.info(f"Deleting {timeline_id_a}...")
assert sk_http.timeline_delete(tenant_id, timeline_id_a, only_local=True)["dir_existed"]
# Delete second timeline
log.info(f"Deleting {timeline_id_b}...")
assert sk_http.timeline_delete(tenant_id, timeline_id_b, only_local=True)["dir_existed"]
# Verify timelines are gone from disk
sk_data_dir = sk.data_dir
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_a)).exists()
# assert not (sk_data_dir / str(tenant_id) / str(timeline_id_b)).exists()
finally:
log.info("Stopping endpoints...")
# Stop endpoints with immediate mode because we deleted the timeline out from under the compute, which may cause it to hang
endpoint_a.stop(mode="immediate")
endpoint_b.stop(mode="immediate")
log.info("Joining threads...")
t_a.join()
t_b.join()
# Basic pull_timeline test.
# When live_sk_change is False, compute is restarted to change set of
# safekeepers; otherwise it is live reload.