Compare commits

...

4 Commits

Author SHA1 Message Date
Joonas Koivunen
0678febff8 fix: close semaphore on stop if not already closed 2024-02-05 15:17:39 +02:00
Joonas Koivunen
7c37fad092 nag if shutdown is taking longer than 1s 2024-02-05 15:17:39 +02:00
Joonas Koivunen
c3c9889985 chore: add time at shutdown to Shutdown op
so we can nag if it's longer 1s.
2024-02-05 15:17:39 +02:00
Joonas Koivunen
fc88328c05 refactor: looks like a deadlock
however it's not because the acquire_owned will not capture the
environment. better to still grab the semaphore, then execute the
acquire.
2024-02-05 10:51:18 +02:00
2 changed files with 54 additions and 22 deletions

View File

@@ -942,7 +942,7 @@ impl RemoteTimelineClient {
tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
});
let fut = {
let sem = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = match &mut *guard {
UploadQueue::Stopped(_) => return Ok(()),
@@ -958,25 +958,32 @@ impl RemoteTimelineClient {
// made cancellable.
if !upload_queue.shutting_down {
upload_queue.shutting_down = true;
upload_queue.queued_operations.push_back(UploadOp::Shutdown);
upload_queue
.queued_operations
.push_back(UploadOp::Shutdown {
since: std::time::Instant::now(),
});
// this operation is not counted similar to Barrier
self.launch_queued_tasks(upload_queue);
}
upload_queue.shutdown_ready.clone().acquire_owned()
upload_queue.shutdown_ready.clone()
};
let res = fut.await;
let mut closed = std::pin::pin!(sem.acquire());
let res = tokio::select! {
res = &mut closed => res,
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
tracing::warn!("still waiting for UploadQueueInitialized to shutdown");
closed.await
}
};
scopeguard::ScopeGuard::into_inner(sg);
match res {
Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
Err(_closed) => {
// expected
}
}
res.expect_err("shutdown_ready should not have been added permits, only closed");
self.stop()
}
@@ -1249,8 +1256,7 @@ impl RemoteTimelineClient {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}
UploadOp::Barrier(_) | UploadOp::Shutdown => {
UploadOp::Barrier(_) | UploadOp::Shutdown { .. } => {
upload_queue.inprogress_tasks.is_empty()
}
};
@@ -1265,10 +1271,11 @@ impl RemoteTimelineClient {
break;
}
if let UploadOp::Shutdown = next_op {
if let UploadOp::Shutdown { since } = next_op {
// leave the op in the queue but do not start more tasks; it will be dropped when
// the stop is called.
upload_queue.shutdown_ready.close();
assert!(upload_queue.shutting_down);
Self::communicate_shutdown(&upload_queue.shutdown_ready, since);
break;
}
@@ -1292,7 +1299,9 @@ impl RemoteTimelineClient {
sender.send_replace(());
continue;
}
UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
UploadOp::Shutdown { .. } => {
unreachable!("shutdown is intentionally never popped off")
}
};
// Assign unique ID to this task
@@ -1331,7 +1340,24 @@ impl RemoteTimelineClient {
}
}
///
fn communicate_shutdown(
shutdown_ready: &tokio::sync::Semaphore,
shutting_down_since: &std::time::Instant,
) {
if shutdown_ready.is_closed() {
return;
}
// there cannot be any races because the semaphore is from &mut UploadQueueInitialized
shutdown_ready.close();
let elapsed = shutting_down_since.elapsed();
if elapsed > std::time::Duration::from_secs(1) {
tracing::warn!(
elapsed_ms = elapsed.as_millis(),
"it took longer than expected to shutdown RemoteTimelineClient"
);
}
}
/// Perform an upload task.
///
/// The task is in the `inprogress_tasks` list. This function will try to
@@ -1341,7 +1367,6 @@ impl RemoteTimelineClient {
///
/// The task can be shut down, however. That leads to stopping the whole
/// queue.
///
async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
// Loop to retry until it completes.
loop {
@@ -1428,7 +1453,7 @@ impl RemoteTimelineClient {
.await
.map_err(|e| anyhow::anyhow!(e))
}
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown { .. } => {
// unreachable. Barrier operations are handled synchronously in
// launch_queued_tasks
warn!("unexpected {unexpected:?} operation in perform_upload_task");
@@ -1525,7 +1550,7 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_deletions -= 1;
None
}
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
UploadOp::Barrier(..) | UploadOp::Shutdown { .. } => unreachable!(),
};
// Launch any queued tasks that were unblocked by this one.
@@ -1580,7 +1605,7 @@ impl RemoteTimelineClient {
reason: "should we track deletes? positive or negative sign?",
},
),
UploadOp::Barrier(..) | UploadOp::Shutdown => {
UploadOp::Barrier(..) | UploadOp::Shutdown { .. } => {
// we do not account these
return None;
}
@@ -1684,6 +1709,13 @@ impl RemoteTimelineClient {
// Tear down queued ops
for op in qi.queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
if let UploadOp::Shutdown { since } = &op {
// just in case we have a waiter on the RemoteTimelineClient::shutdown
assert!(qi.shutting_down);
Self::communicate_shutdown(&qi.shutdown_ready, since);
}
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);

View File

@@ -292,7 +292,7 @@ pub(crate) enum UploadOp {
/// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
/// this is the same as a Barrier.
Shutdown,
Shutdown { since: std::time::Instant },
}
impl std::fmt::Display for UploadOp {
@@ -314,7 +314,7 @@ impl std::fmt::Display for UploadOp {
write!(f, "Delete({} layers)", delete.layers.len())
}
UploadOp::Barrier(_) => write!(f, "Barrier"),
UploadOp::Shutdown => write!(f, "Shutdown"),
UploadOp::Shutdown { since } => write!(f, "Shutdown(since: {:?})", since.elapsed()),
}
}
}