Error metric and retries

This commit is contained in:
John Spray
2023-08-14 16:42:21 +01:00
parent 164f916a40
commit d40f8475a5
2 changed files with 100 additions and 25 deletions

View File

@@ -1,4 +1,4 @@
use crate::metrics::{DELETION_QUEUE_EXECUTED, DELETION_QUEUE_SUBMITTED};
use crate::metrics::{DELETION_QUEUE_ERRORS, DELETION_QUEUE_EXECUTED, DELETION_QUEUE_SUBMITTED};
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
use serde::Serialize;
@@ -24,13 +24,15 @@ const FLUSH_DEFAULT_DEADLINE: Duration = Duration::from_millis(10000);
// more objects before doing the flush.
const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100);
// TODO: metrics for queue length, deletions executed, deletion errors
// After this length of time, execute deletions which are elegible to run,
// even if we haven't accumulated enough for a full-sized DeleteObjects
const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60);
// If the last attempt to execute failed, wait only this long before
// trying again.
const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_secs(1);
// TODO: adminstrative "panic button" config property to disable all deletions
// TODO: implement admin API hook to flush deletion queue, for use in integration tests
// that would like to assert deleted objects are gone
// TODO: configurable for how long to wait before executing deletions
/// We aggregate object deletions from many tenants in one place, for several reasons:
@@ -40,14 +42,11 @@ const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100);
/// - Globally control throughput of deletions, as these are a low priority task: do
/// not compete with the same S3 clients/connections used for higher priority uploads.
///
/// There are two parts ot this, frontend and backend, joined by channels:
/// There are two parts to this, frontend and backend, joined by channels:
/// - DeletionQueueWorker consumes the frontend queue: the "DeletionQueue" that makes up
/// the public interface and accepts deletion requests.
/// - BackendQueueWorker consumes the backend queue: a queue of DeletionList that have
/// already been written to S3 and are now eligible for final deletion.
///
///
///
///
/// There are three queues internally:
/// - Incoming deletes (the DeletionQueue that the outside world sees)
@@ -139,6 +138,7 @@ impl DeletionQueueClient {
layers: Vec<LayerFileName>,
) {
DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64);
info!("pushed!");
self.do_push(FrontendQueueMessage::Delete(DeletionOp {
tenant_id,
timeline_id,
@@ -166,6 +166,10 @@ impl DeletionQueueClient {
// Wait until all previous deletions are executed
pub async fn flush_execute(&self) {
// Flush any buffered work to deletion lists
self.flush().await;
// Flush execution of deletion lists
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx)
.await
@@ -187,30 +191,52 @@ pub struct BackendQueueWorker {
// DeletionLists we have fully executed, which may be deleted
// from remote storage.
executed_lists: Vec<DeletionList>,
// How long to wait for a message before executing anyway
timeout: Duration,
}
impl BackendQueueWorker {
async fn maybe_execute(&mut self) {
async fn maybe_execute(&mut self) -> bool {
fail::fail_point!("deletion-queue-before-execute", |_| {
return;
info!("Skipping execution, failpoint set");
DELETION_QUEUE_ERRORS
.with_label_values(&["failpoint"])
.inc();
return false;
});
if self.accumulator.is_empty() {
return true;
}
match self.remote_storage.delete_objects(&self.accumulator).await {
Ok(()) => {
DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64);
info!(
"Executed deletion batch {}..{}",
self.accumulator
.first()
.expect("accumulator should be non-empty"),
self.accumulator
.last()
.expect("accumulator should be non-empty"),
);
self.accumulator.clear();
self.executed_lists.append(&mut self.pending_lists);
self.timeout = EXECUTE_IDLE_DEADLINE;
true
}
Err(e) => {
warn!("Batch deletion failed: {e}, will retry");
// TODO: increment error counter
warn!("DeleteObjects request failed: {e}, will retry");
DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc();
self.timeout = EXECUTE_RETRY_DEADLINE;
false
}
}
}
pub async fn background(&mut self) {
let _span = tracing::info_span!("deletion_backend");
// TODO: if we would like to be able to defer deletions while a Layer still has
// refs (but it will be elegible for deletion after process ends), then we may
// add an ephemeral part to BackendQueueMessage::Delete that tracks which keys
@@ -222,7 +248,22 @@ impl BackendQueueWorker {
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
while let Some(msg) = self.rx.recv().await {
loop {
let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await {
Ok(Some(m)) => m,
Ok(None) => {
// All queue senders closed
info!("Shutting down");
break;
}
Err(_) => {
// Timeout, we hit deadline to execute whatever we have in hand
self.maybe_execute().await;
continue;
}
};
match msg {
BackendQueueMessage::Delete(mut list) => {
if list.objects.is_empty() {
@@ -282,16 +323,17 @@ impl BackendQueueWorker {
.truncate(self.executed_lists.len() - executed_keys.len());
}
Err(e) => {
warn!("Failed to purge deletion lists: {e}");
warn!("Failed to delete deletion list(s): {e}");
// Do nothing: the elements remain in executed_lists, and purge will be retried
// next time we process some deletions and go around the loop.
DELETION_QUEUE_ERRORS
.with_label_values(&["delete_list"])
.inc();
}
}
}
BackendQueueMessage::Flush(op) => {
while !self.accumulator.is_empty() {
self.maybe_execute().await;
}
self.maybe_execute().await;
op.fire();
}
@@ -341,8 +383,29 @@ impl FrontendQueueWorker {
let size = bytes.len();
let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
if self.pending.objects.is_empty() {
// We do not expect to be called in this state, but handle it so that later
// logging code can be assured that therre is always a first+last key to print
for f in self.pending_flushes.drain(..) {
f.fire();
}
return;
}
match self.remote_storage.upload(source, size, &key, None).await {
Ok(_) => {
info!(
"Stored deletion list {key} ({0}..{1})",
self.pending
.objects
.first()
.expect("list should be non-empty"),
self.pending
.objects
.last()
.expect("list should be non-empty"),
);
for f in self.pending_flushes.drain(..) {
f.fire();
}
@@ -361,9 +424,10 @@ impl FrontendQueueWorker {
}
}
Err(e) => {
DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc();
warn!(
sequence = self.pending.sequence,
"Failed to flush deletion list, will retry later ({e})"
"Failed to write deletion list to remote storage, will retry later ({e})"
)
}
}
@@ -372,6 +436,7 @@ impl FrontendQueueWorker {
/// This is the front-end ingest, where we bundle up deletion requests into DeletionList
/// and write them out, for later
pub async fn background(&mut self) {
info!("Started deletion frontend worker");
loop {
let flush_delay = self.deadline.duration_since(Instant::now());
@@ -400,7 +465,7 @@ impl FrontendQueueWorker {
let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id);
let _span = tracing::info_span!(
"execute_deletion",
"deletion_frontend_enqueue",
tenant_id = %op.tenant_id,
timeline_id = %op.timeline_id,
);
@@ -498,6 +563,7 @@ impl DeletionQueue {
accumulator: Vec::new(),
pending_lists: Vec::new(),
executed_lists: Vec::new(),
timeout: EXECUTE_IDLE_DEADLINE,
}),
)
}

View File

@@ -660,7 +660,7 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
pub(crate) static DELETION_QUEUE_SUBMITTED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_deletion_queue_submitted",
"pageserver_deletion_queue_submitted_total",
"Number of objects submitted for deletion"
)
.expect("failed to define a metric")
@@ -668,12 +668,21 @@ pub(crate) static DELETION_QUEUE_SUBMITTED: Lazy<IntCounter> = Lazy::new(|| {
pub(crate) static DELETION_QUEUE_EXECUTED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_deletion_queue_executed",
"pageserver_deletion_queue_executed_total",
"Number of objects deleted"
)
.expect("failed to define a metric")
});
pub(crate) static DELETION_QUEUE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_deletion_queue_errors_total",
"Incremented on retryable remote I/O errors writing deletion lists or executing deletions.",
&["op_kind"],
)
.expect("failed to define a metric")
});
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_timeline_client_bytes_started",