From ec4072f84577eeb2a92d97fa77281efe50325730 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 6 Dec 2024 11:12:39 +0100 Subject: [PATCH] pageserver: add `wait_until_flushed` parameter for timeline checkpoint (#10013) ## Problem I'm writing an ingest benchmark in #9812. To time S3 uploads, I need to schedule a flush of the Pageserver's in-memory layer, but don't actually want to wait around for it to complete (which will take a minute). ## Summary of changes Add a parameter `wait_until_flush` (default `true`) for `timeline/checkpoint` to control whether to wait for the flush to complete. --- pageserver/src/http/routes.rs | 12 ++++++++---- pageserver/src/tenant/timeline.rs | 26 ++++++++++++++++--------- test_runner/fixtures/pageserver/http.py | 5 ++++- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index b3981b4a8e..b7fddb065c 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2148,16 +2148,20 @@ async fn timeline_checkpoint_handler( // By default, checkpoints come with a compaction, but this may be optionally disabled by tests that just want to flush + upload. let compact = parse_query_param::<_, bool>(&request, "compact")?.unwrap_or(true); + let wait_until_flushed: bool = + parse_query_param(&request, "wait_until_flushed")?.unwrap_or(true); + let wait_until_uploaded = parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; - timeline - .freeze_and_flush() - .await - .map_err(|e| { + if wait_until_flushed { + timeline.freeze_and_flush().await + } else { + timeline.freeze().await.and(Ok(())) + }.map_err(|e| { match e { tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown, other => ApiError::InternalServerError(other.into()), diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fc69525bf4..aab6703a3c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1457,23 +1457,31 @@ impl Timeline { Ok(lease) } - /// Flush to disk all data that was written with the put_* functions + /// Freeze the current open in-memory layer. It will be written to disk on next iteration. + /// Returns the flush request ID which can be awaited with wait_flush_completion(). + #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))] + pub(crate) async fn freeze(&self) -> Result { + self.freeze0().await + } + + /// Freeze and flush the open in-memory layer, waiting for it to be written to disk. #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))] pub(crate) async fn freeze_and_flush(&self) -> Result<(), FlushLayerError> { self.freeze_and_flush0().await } + /// Freeze the current open in-memory layer. It will be written to disk on next iteration. + /// Returns the flush request ID which can be awaited with wait_flush_completion(). + pub(crate) async fn freeze0(&self) -> Result { + let mut g = self.write_lock.lock().await; + let to_lsn = self.get_last_record_lsn(); + self.freeze_inmem_layer_at(to_lsn, &mut g).await + } + // This exists to provide a non-span creating version of `freeze_and_flush` we can call without // polluting the span hierarchy. pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> { - let token = { - // Freeze the current open in-memory layer. It will be written to disk on next - // iteration. - let mut g = self.write_lock.lock().await; - - let to_lsn = self.get_last_record_lsn(); - self.freeze_inmem_layer_at(to_lsn, &mut g).await? - }; + let token = self.freeze0().await?; self.wait_flush_completion(token).await } diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 4cf3ece396..0832eac22f 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -850,6 +850,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): force_repartition=False, force_image_layer_creation=False, force_l0_compaction=False, + wait_until_flushed=True, wait_until_uploaded=False, compact: bool | None = None, **kwargs, @@ -862,6 +863,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter): query["force_image_layer_creation"] = "true" if force_l0_compaction: query["force_l0_compaction"] = "true" + if not wait_until_flushed: + query["wait_until_flushed"] = "false" if wait_until_uploaded: query["wait_until_uploaded"] = "true" @@ -869,7 +872,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): query["compact"] = "true" if compact else "false" log.info( - f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}" + f"Requesting checkpoint: tenant={tenant_id} timeline={timeline_id} wait_until_flushed={wait_until_flushed} wait_until_uploaded={wait_until_uploaded} compact={compact}" ) res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",