pageserver: refactor http deletion_queue_flush

This commit is contained in:
John Spray
2023-09-12 15:27:27 +01:00
parent b76f08e863
commit 7b05ec2825

View File

@@ -5,6 +5,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use futures::TryFutureExt;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
@@ -24,7 +25,7 @@ use super::models::{
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
@@ -1159,27 +1160,20 @@ async fn deletion_queue_flush(
let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
let flush = async {
if execute {
state.deletion_queue_client.flush_execute().await
} else {
state.deletion_queue_client.flush().await
}
}
// DeletionQueueError's only case is shutting down.
.map_err(|_| ApiError::ShuttingDown);
tokio::select! {
flush_result = async {
if execute {
state.deletion_queue_client.flush_execute().await
} else {
state.deletion_queue_client.flush().await
}
} => {
match flush_result {
Ok(())=> {
json_response(StatusCode::OK, ())
},
Err(e) => {
match e {
DeletionQueueError::ShuttingDown => {
Err(ApiError::ShuttingDown)
}
}
}
}
},
res = flush => {
res.map(|()| json_response(StatusCode::OK, ()))?
}
_ = cancel.cancelled() => {
Err(ApiError::ShuttingDown)
}