From c1bc9c0f70b14fe06e7022a1c96c6480c3334020 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 18 Aug 2023 10:28:38 +0100 Subject: [PATCH] Various test fixes + tweaks to flushing --- pageserver/src/deletion_queue.rs | 91 ++++++++++++++++----- pageserver/src/http/openapi_spec.yml | 4 +- test_runner/regress/test_remote_storage.py | 34 +++++--- test_runner/regress/test_tenant_delete.py | 6 +- test_runner/regress/test_timeline_delete.py | 8 +- 5 files changed, 103 insertions(+), 40 deletions(-) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 9d1a441a62..82ff7ca4d6 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -33,7 +33,7 @@ const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60); // If the last attempt to execute failed, wait only this long before // trying again. -const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_secs(1); +const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100); // From the S3 spec const MAX_KEYS_PER_DELETE: usize = 1000; @@ -227,13 +227,16 @@ impl DeletionQueueClient { // Wait until all previous deletions are executed pub async fn flush_execute(&self) { + debug!("flush_execute: flushing to deletion lists..."); // Flush any buffered work to deletion lists self.flush().await; // Flush execution of deletion lists let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + debug!("flush_execute: flushing execution..."); self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx) - .await + .await; + debug!("flush_execute: finished flushing execution..."); } } @@ -253,6 +256,9 @@ pub struct BackendQueueWorker { // from remote storage. executed_lists: Vec, + // These FlushOps should fire the next time we flush + pending_flushes: Vec, + // How long to wait for a message before executing anyway timeout: Duration, } @@ -264,10 +270,16 @@ impl BackendQueueWorker { DELETION_QUEUE_ERRORS .with_label_values(&["failpoint"]) .inc(); + + // Retry fast when failpoint is active, so that when it is disabled we resume promptly + self.timeout = EXECUTE_RETRY_DEADLINE; false }); if self.accumulator.is_empty() { + for f in self.pending_flushes.drain(..) { + f.fire(); + } return true; } @@ -285,6 +297,10 @@ impl BackendQueueWorker { ); self.accumulator.clear(); self.executed_lists.append(&mut self.pending_lists); + + for f in self.pending_flushes.drain(..) { + f.fire(); + } self.timeout = EXECUTE_IDLE_DEADLINE; true } @@ -436,11 +452,22 @@ impl BackendQueueWorker { self.cleanup_lists().await; } BackendQueueMessage::Flush(op) => { + if self.accumulator.is_empty() { + op.fire(); + continue; + } + self.maybe_execute().await; - self.cleanup_lists().await; - - op.fire(); + if self.accumulator.is_empty() { + // Successful flush. Clean up lists before firing, for the benefit of tests that would + // like to have a deterministic state post-flush. + self.cleanup_lists().await; + op.fire(); + } else { + // We didn't flush inline: defer until next time we successfully drain accumulatorr + self.pending_flushes.push(op); + } } } } @@ -482,12 +509,6 @@ impl FrontendQueueWorker { /// This does not return errors, because on failure to flush we do not lose /// any state: flushing will be retried implicitly on the next deadline async fn flush(&mut self) { - let key = &self.conf.remote_deletion_list_path(self.pending.sequence); - - let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list"); - let size = bytes.len(); - let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); - if self.pending.objects.is_empty() { // We do not expect to be called in this state, but handle it so that later // logging code can be assured that therre is always a first+last key to print @@ -497,6 +518,12 @@ impl FrontendQueueWorker { return; } + let key = &self.conf.remote_deletion_list_path(self.pending.sequence); + + let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list"); + let size = bytes.len(); + let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); + match self.remote_storage.upload(source, size, key, None).await { Ok(_) => { info!( @@ -682,13 +709,7 @@ impl FrontendQueueWorker { pub async fn background(&mut self) { info!("Started deletion frontend worker"); - // Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3 - if let Err(e) = self.recover().await { - // This should only happen in truly unrecoverable cases, like the recovery finding that the backend - // queue receiver has been dropped. - info!("Deletion queue recover aborted, deletion queue will not proceed ({e:#})"); - return; - } + let mut recovered: bool = false; loop { let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await { @@ -704,16 +725,38 @@ impl FrontendQueueWorker { } }; + // On first message, do recovery. This avoids unnecessary recovery very + // early in startup, and simplifies testing by avoiding a 404 reading the + // header on every first pageserver startup. + if !recovered { + // Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3 + if let Err(e) = self.recover().await { + // This should only happen in truly unrecoverable cases, like the recovery finding that the backend + // queue receiver has been dropped. + info!( + "Deletion queue recover aborted, deletion queue will not proceed ({e:#})" + ); + return; + } else { + recovered = true; + } + } + match msg { FrontendQueueMessage::Delete(op) => { - let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id); - let _span = tracing::info_span!( "deletion_frontend_enqueue", tenant_id = %op.tenant_id, timeline_id = %op.timeline_id, ); + debug!( + "Deletion enqueue {0} layers, {1} other objects", + op.layers.len(), + op.objects.len() + ); + + let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id); for layer in op.layers { // TODO go directly to remote path without composing local path let local_path = timeline_path.join(layer.file_name()); @@ -731,6 +774,7 @@ impl FrontendQueueWorker { FrontendQueueMessage::Flush(op) => { if self.pending.objects.is_empty() { // Execute immediately + debug!("No pending objects, flushing immediately"); op.fire() } else { // Execute next time we flush @@ -805,6 +849,7 @@ impl DeletionQueue { pending_lists: Vec::new(), executed_lists: Vec::new(), timeout: EXECUTE_IDLE_DEADLINE, + pending_flushes: Vec::new(), }), ) } @@ -1002,6 +1047,12 @@ mod test { ctx.runtime.block_on(client.flush_execute()); assert_remote_files(&[], &remote_timeline_path); assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix); + + // Flushing on an empty queue should succeed immediately, and not write any lists + info!("Flush-executing on empty"); + ctx.runtime.block_on(client.flush_execute()); + assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix); + Ok(()) } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 2845cf5e38..b70438a35f 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -67,8 +67,8 @@ paths: responses: "200": description: | - Flush completed: deletion has been attempted for enqueued objects. This does not guarantee - that deletion executed successfully. + Flush completed: if execute was true, then enqueued deletions have been completed. If execute was false, + then enqueued deletions have been persisted to deletion lists, and may have been completed. content: application/json: schema: diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 62bba275b4..5776f66677 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -349,7 +349,7 @@ def test_remote_storage_upload_queue_retries( # Deletion queue should not grow, because deletions wait for upload of # metadata, and we blocked that upload. - wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0)) + wait_until(10, 0.5, lambda: assert_deletion_queue(client, lambda v: v == 0)) # No more deletions should have executed assert get_deletions_executed() == deletions_executed_pre_failpoint @@ -366,19 +366,31 @@ def test_remote_storage_upload_queue_retries( log.info("Waiting to see deletions enqueued") wait_until(5, 1, lambda: assert_deletion_queue(client, lambda v: v > 0)) - # Deletions should not be processed while failpoint is still active. - client.deletion_queue_flush(execute=True) + # Run flush in the backgrorund because it will block on the failpoint + class background_flush(threading.Thread): + def run(self): + client.deletion_queue_flush(execute=True) + + flusher = background_flush() + flusher.start() + + def assert_failpoint_hit(): + assert get_deletion_errors("failpoint") > 0 + + # Our background flush thread should induce us to hit the failpoint + wait_until(20, 0.25, assert_failpoint_hit) + + # Deletions should not have been executed while failpoint is still active. assert get_deletion_queue_depth(client) is not None assert get_deletion_queue_depth(client) > 0 assert get_deletions_executed() == deletions_executed_pre_failpoint - assert get_deletion_errors("failpoint") > 0 log.info("Unblocking remote deletes") configure_storage_delete_failpoints("off") - # issue a flush to the deletion queue -- otherwise it won't retry until hits - # a deadline. - client.deletion_queue_flush(execute=True) + # An API flush should now complete + flusher.join() + # Queue should drain, which should involve executing some deletions wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0)) assert get_deletions_executed() > deletions_executed_pre_failpoint @@ -1109,11 +1121,9 @@ def test_deletion_queue_recovery( def assert_deletions_submitted(n: int): assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n - # Wait for recovery to complete (this is fast but async, so need a wait_until) - # - # After restart the failpoint is reset: execution may proceed. If we deleted enough layers - # to fill a DeleteObjects request, those will have executed already, so we check the total - # number of deletions recovered ("submitted") rather than the queue length. + # After restart, issue a flush to kick the deletion frorntend to do recovery. + # It should recover all the operations we submitted before the restart. + ps_http.deletion_queue_flush(execute=False) wait_until(20, 0.25, lambda: assert_deletions_submitted(before_restart_depth)) # The queue should drain through completely if we flush it diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index 3d9ba80a57..5a1514940d 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -53,6 +53,7 @@ def test_tenant_delete_smoke( ".*deletion frontend: Failed to write deletion list.*", ".*deletion backend: Failed to delete deletion list.*", ".*deletion backend: DeleteObjects request failed.*", + ".*deletion backend: Failed to upload deletion queue header.*", ] ) @@ -94,7 +95,9 @@ def test_tenant_delete_smoke( iterations = poll_for_remote_storage_iterations(remote_storage_kind) - tenant_delete_wait_completed(ps_http, tenant_id, iterations) + # We are running with failures enabled, so this may take some time to make + # it through all the remote storage operations required to complete + tenant_delete_wait_completed(ps_http, tenant_id, iterations * 10) tenant_path = env.tenant_dir(tenant_id=tenant_id) assert not tenant_path.exists() @@ -211,6 +214,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints( ".*deletion frontend: Failed to write deletion list.*", ".*deletion backend: Failed to delete deletion list.*", ".*deletion backend: DeleteObjects request failed.*", + ".*deletion backend: Failed to upload deletion queue header.*", ] ) diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index bd65e6cf83..64d57503a5 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -273,10 +273,6 @@ def test_delete_timeline_exercise_crash_safety_failpoints( # failpoint may not be the only error in the stack assert reason.endswith(f"failpoint: {failpoint}"), reason - # Flush deletion queue before restart/retry, so that anything logically deleted before the - # failpoint is really deleted. - ps_http.deletion_queue_flush(execute=True) - if check is Check.RETRY_WITH_RESTART: env.pageserver.stop() env.pageserver.start() @@ -710,7 +706,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): wait_until(50, 0.1, first_request_finished) # check that the timeline is gone - wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=2) + wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=4) @pytest.mark.parametrize( @@ -858,6 +854,8 @@ def test_delete_orphaned_objects( reason = timeline_info["state"]["Broken"]["reason"] assert reason.endswith(f"failpoint: {failpoint}"), reason + ps_http.deletion_queue_flush(execute=True) + for orphan in orphans: assert not orphan.exists() assert env.pageserver.log_contains(