mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 22:42:57 +00:00
pageserver: add wait_until_flushed parameter for timeline checkpoint (#10013)
## Problem I'm writing an ingest benchmark in #9812. To time S3 uploads, I need to schedule a flush of the Pageserver's in-memory layer, but don't actually want to wait around for it to complete (which will take a minute). ## Summary of changes Add a parameter `wait_until_flush` (default `true`) for `timeline/checkpoint` to control whether to wait for the flush to complete.
This commit is contained in:
@@ -2148,16 +2148,20 @@ async fn timeline_checkpoint_handler(
|
||||
// By default, checkpoints come with a compaction, but this may be optionally disabled by tests that just want to flush + upload.
|
||||
let compact = parse_query_param::<_, bool>(&request, "compact")?.unwrap_or(true);
|
||||
|
||||
let wait_until_flushed: bool =
|
||||
parse_query_param(&request, "wait_until_flushed")?.unwrap_or(true);
|
||||
|
||||
let wait_until_uploaded =
|
||||
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
|
||||
timeline
|
||||
.freeze_and_flush()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if wait_until_flushed {
|
||||
timeline.freeze_and_flush().await
|
||||
} else {
|
||||
timeline.freeze().await.and(Ok(()))
|
||||
}.map_err(|e| {
|
||||
match e {
|
||||
tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
|
||||
other => ApiError::InternalServerError(other.into()),
|
||||
|
||||
@@ -1457,23 +1457,31 @@ impl Timeline {
|
||||
Ok(lease)
|
||||
}
|
||||
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
|
||||
/// Returns the flush request ID which can be awaited with wait_flush_completion().
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
|
||||
pub(crate) async fn freeze(&self) -> Result<u64, FlushLayerError> {
|
||||
self.freeze0().await
|
||||
}
|
||||
|
||||
/// Freeze and flush the open in-memory layer, waiting for it to be written to disk.
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
|
||||
pub(crate) async fn freeze_and_flush(&self) -> Result<(), FlushLayerError> {
|
||||
self.freeze_and_flush0().await
|
||||
}
|
||||
|
||||
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
|
||||
/// Returns the flush request ID which can be awaited with wait_flush_completion().
|
||||
pub(crate) async fn freeze0(&self) -> Result<u64, FlushLayerError> {
|
||||
let mut g = self.write_lock.lock().await;
|
||||
let to_lsn = self.get_last_record_lsn();
|
||||
self.freeze_inmem_layer_at(to_lsn, &mut g).await
|
||||
}
|
||||
|
||||
// This exists to provide a non-span creating version of `freeze_and_flush` we can call without
|
||||
// polluting the span hierarchy.
|
||||
pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
|
||||
let token = {
|
||||
// Freeze the current open in-memory layer. It will be written to disk on next
|
||||
// iteration.
|
||||
let mut g = self.write_lock.lock().await;
|
||||
|
||||
let to_lsn = self.get_last_record_lsn();
|
||||
self.freeze_inmem_layer_at(to_lsn, &mut g).await?
|
||||
};
|
||||
let token = self.freeze0().await?;
|
||||
self.wait_flush_completion(token).await
|
||||
}
|
||||
|
||||
|
||||
@@ -850,6 +850,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
force_repartition=False,
|
||||
force_image_layer_creation=False,
|
||||
force_l0_compaction=False,
|
||||
wait_until_flushed=True,
|
||||
wait_until_uploaded=False,
|
||||
compact: bool | None = None,
|
||||
**kwargs,
|
||||
@@ -862,6 +863,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
query["force_image_layer_creation"] = "true"
|
||||
if force_l0_compaction:
|
||||
query["force_l0_compaction"] = "true"
|
||||
if not wait_until_flushed:
|
||||
query["wait_until_flushed"] = "false"
|
||||
if wait_until_uploaded:
|
||||
query["wait_until_uploaded"] = "true"
|
||||
|
||||
@@ -869,7 +872,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
query["compact"] = "true" if compact else "false"
|
||||
|
||||
log.info(
|
||||
f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}"
|
||||
f"Requesting checkpoint: tenant={tenant_id} timeline={timeline_id} wait_until_flushed={wait_until_flushed} wait_until_uploaded={wait_until_uploaded} compact={compact}"
|
||||
)
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
|
||||
|
||||
Reference in New Issue
Block a user