Various test fixes + tweaks to flushing

This commit is contained in:
John Spray
2023-08-18 10:28:38 +01:00
parent 2de5efa208
commit c1bc9c0f70
5 changed files with 103 additions and 40 deletions

View File

@@ -33,7 +33,7 @@ const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60);
// If the last attempt to execute failed, wait only this long before
// trying again.
const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_secs(1);
const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100);
// From the S3 spec
const MAX_KEYS_PER_DELETE: usize = 1000;
@@ -227,13 +227,16 @@ impl DeletionQueueClient {
// Wait until all previous deletions are executed
pub async fn flush_execute(&self) {
debug!("flush_execute: flushing to deletion lists...");
// Flush any buffered work to deletion lists
self.flush().await;
// Flush execution of deletion lists
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
debug!("flush_execute: flushing execution...");
self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx)
.await
.await;
debug!("flush_execute: finished flushing execution...");
}
}
@@ -253,6 +256,9 @@ pub struct BackendQueueWorker {
// from remote storage.
executed_lists: Vec<DeletionList>,
// These FlushOps should fire the next time we flush
pending_flushes: Vec<FlushOp>,
// How long to wait for a message before executing anyway
timeout: Duration,
}
@@ -264,10 +270,16 @@ impl BackendQueueWorker {
DELETION_QUEUE_ERRORS
.with_label_values(&["failpoint"])
.inc();
// Retry fast when failpoint is active, so that when it is disabled we resume promptly
self.timeout = EXECUTE_RETRY_DEADLINE;
false
});
if self.accumulator.is_empty() {
for f in self.pending_flushes.drain(..) {
f.fire();
}
return true;
}
@@ -285,6 +297,10 @@ impl BackendQueueWorker {
);
self.accumulator.clear();
self.executed_lists.append(&mut self.pending_lists);
for f in self.pending_flushes.drain(..) {
f.fire();
}
self.timeout = EXECUTE_IDLE_DEADLINE;
true
}
@@ -436,11 +452,22 @@ impl BackendQueueWorker {
self.cleanup_lists().await;
}
BackendQueueMessage::Flush(op) => {
if self.accumulator.is_empty() {
op.fire();
continue;
}
self.maybe_execute().await;
self.cleanup_lists().await;
op.fire();
if self.accumulator.is_empty() {
// Successful flush. Clean up lists before firing, for the benefit of tests that would
// like to have a deterministic state post-flush.
self.cleanup_lists().await;
op.fire();
} else {
// We didn't flush inline: defer until next time we successfully drain accumulatorr
self.pending_flushes.push(op);
}
}
}
}
@@ -482,12 +509,6 @@ impl FrontendQueueWorker {
/// This does not return errors, because on failure to flush we do not lose
/// any state: flushing will be retried implicitly on the next deadline
async fn flush(&mut self) {
let key = &self.conf.remote_deletion_list_path(self.pending.sequence);
let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list");
let size = bytes.len();
let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
if self.pending.objects.is_empty() {
// We do not expect to be called in this state, but handle it so that later
// logging code can be assured that therre is always a first+last key to print
@@ -497,6 +518,12 @@ impl FrontendQueueWorker {
return;
}
let key = &self.conf.remote_deletion_list_path(self.pending.sequence);
let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list");
let size = bytes.len();
let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
match self.remote_storage.upload(source, size, key, None).await {
Ok(_) => {
info!(
@@ -682,13 +709,7 @@ impl FrontendQueueWorker {
pub async fn background(&mut self) {
info!("Started deletion frontend worker");
// Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3
if let Err(e) = self.recover().await {
// This should only happen in truly unrecoverable cases, like the recovery finding that the backend
// queue receiver has been dropped.
info!("Deletion queue recover aborted, deletion queue will not proceed ({e:#})");
return;
}
let mut recovered: bool = false;
loop {
let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await {
@@ -704,16 +725,38 @@ impl FrontendQueueWorker {
}
};
// On first message, do recovery. This avoids unnecessary recovery very
// early in startup, and simplifies testing by avoiding a 404 reading the
// header on every first pageserver startup.
if !recovered {
// Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3
if let Err(e) = self.recover().await {
// This should only happen in truly unrecoverable cases, like the recovery finding that the backend
// queue receiver has been dropped.
info!(
"Deletion queue recover aborted, deletion queue will not proceed ({e:#})"
);
return;
} else {
recovered = true;
}
}
match msg {
FrontendQueueMessage::Delete(op) => {
let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id);
let _span = tracing::info_span!(
"deletion_frontend_enqueue",
tenant_id = %op.tenant_id,
timeline_id = %op.timeline_id,
);
debug!(
"Deletion enqueue {0} layers, {1} other objects",
op.layers.len(),
op.objects.len()
);
let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id);
for layer in op.layers {
// TODO go directly to remote path without composing local path
let local_path = timeline_path.join(layer.file_name());
@@ -731,6 +774,7 @@ impl FrontendQueueWorker {
FrontendQueueMessage::Flush(op) => {
if self.pending.objects.is_empty() {
// Execute immediately
debug!("No pending objects, flushing immediately");
op.fire()
} else {
// Execute next time we flush
@@ -805,6 +849,7 @@ impl DeletionQueue {
pending_lists: Vec::new(),
executed_lists: Vec::new(),
timeout: EXECUTE_IDLE_DEADLINE,
pending_flushes: Vec::new(),
}),
)
}
@@ -1002,6 +1047,12 @@ mod test {
ctx.runtime.block_on(client.flush_execute());
assert_remote_files(&[], &remote_timeline_path);
assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix);
// Flushing on an empty queue should succeed immediately, and not write any lists
info!("Flush-executing on empty");
ctx.runtime.block_on(client.flush_execute());
assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix);
Ok(())
}

View File

@@ -67,8 +67,8 @@ paths:
responses:
"200":
description: |
Flush completed: deletion has been attempted for enqueued objects. This does not guarantee
that deletion executed successfully.
Flush completed: if execute was true, then enqueued deletions have been completed. If execute was false,
then enqueued deletions have been persisted to deletion lists, and may have been completed.
content:
application/json:
schema:

View File

@@ -349,7 +349,7 @@ def test_remote_storage_upload_queue_retries(
# Deletion queue should not grow, because deletions wait for upload of
# metadata, and we blocked that upload.
wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0))
wait_until(10, 0.5, lambda: assert_deletion_queue(client, lambda v: v == 0))
# No more deletions should have executed
assert get_deletions_executed() == deletions_executed_pre_failpoint
@@ -366,19 +366,31 @@ def test_remote_storage_upload_queue_retries(
log.info("Waiting to see deletions enqueued")
wait_until(5, 1, lambda: assert_deletion_queue(client, lambda v: v > 0))
# Deletions should not be processed while failpoint is still active.
client.deletion_queue_flush(execute=True)
# Run flush in the backgrorund because it will block on the failpoint
class background_flush(threading.Thread):
def run(self):
client.deletion_queue_flush(execute=True)
flusher = background_flush()
flusher.start()
def assert_failpoint_hit():
assert get_deletion_errors("failpoint") > 0
# Our background flush thread should induce us to hit the failpoint
wait_until(20, 0.25, assert_failpoint_hit)
# Deletions should not have been executed while failpoint is still active.
assert get_deletion_queue_depth(client) is not None
assert get_deletion_queue_depth(client) > 0
assert get_deletions_executed() == deletions_executed_pre_failpoint
assert get_deletion_errors("failpoint") > 0
log.info("Unblocking remote deletes")
configure_storage_delete_failpoints("off")
# issue a flush to the deletion queue -- otherwise it won't retry until hits
# a deadline.
client.deletion_queue_flush(execute=True)
# An API flush should now complete
flusher.join()
# Queue should drain, which should involve executing some deletions
wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0))
assert get_deletions_executed() > deletions_executed_pre_failpoint
@@ -1109,11 +1121,9 @@ def test_deletion_queue_recovery(
def assert_deletions_submitted(n: int):
assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n
# Wait for recovery to complete (this is fast but async, so need a wait_until)
#
# After restart the failpoint is reset: execution may proceed. If we deleted enough layers
# to fill a DeleteObjects request, those will have executed already, so we check the total
# number of deletions recovered ("submitted") rather than the queue length.
# After restart, issue a flush to kick the deletion frorntend to do recovery.
# It should recover all the operations we submitted before the restart.
ps_http.deletion_queue_flush(execute=False)
wait_until(20, 0.25, lambda: assert_deletions_submitted(before_restart_depth))
# The queue should drain through completely if we flush it

View File

@@ -53,6 +53,7 @@ def test_tenant_delete_smoke(
".*deletion frontend: Failed to write deletion list.*",
".*deletion backend: Failed to delete deletion list.*",
".*deletion backend: DeleteObjects request failed.*",
".*deletion backend: Failed to upload deletion queue header.*",
]
)
@@ -94,7 +95,9 @@ def test_tenant_delete_smoke(
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
# We are running with failures enabled, so this may take some time to make
# it through all the remote storage operations required to complete
tenant_delete_wait_completed(ps_http, tenant_id, iterations * 10)
tenant_path = env.tenant_dir(tenant_id=tenant_id)
assert not tenant_path.exists()
@@ -211,6 +214,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
".*deletion frontend: Failed to write deletion list.*",
".*deletion backend: Failed to delete deletion list.*",
".*deletion backend: DeleteObjects request failed.*",
".*deletion backend: Failed to upload deletion queue header.*",
]
)

View File

@@ -273,10 +273,6 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
# failpoint may not be the only error in the stack
assert reason.endswith(f"failpoint: {failpoint}"), reason
# Flush deletion queue before restart/retry, so that anything logically deleted before the
# failpoint is really deleted.
ps_http.deletion_queue_flush(execute=True)
if check is Check.RETRY_WITH_RESTART:
env.pageserver.stop()
env.pageserver.start()
@@ -710,7 +706,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_until(50, 0.1, first_request_finished)
# check that the timeline is gone
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=2)
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=4)
@pytest.mark.parametrize(
@@ -858,6 +854,8 @@ def test_delete_orphaned_objects(
reason = timeline_info["state"]["Broken"]["reason"]
assert reason.endswith(f"failpoint: {failpoint}"), reason
ps_http.deletion_queue_flush(execute=True)
for orphan in orphans:
assert not orphan.exists()
assert env.pageserver.log_contains(