diff --git a/proxy/src/batch.rs b/proxy/src/batch.rs index b6c3f9be6e..61bdf2b747 100644 --- a/proxy/src/batch.rs +++ b/proxy/src/batch.rs @@ -52,7 +52,10 @@ impl BatchQueue

{ pub async fn call(&self, req: P::Req) -> P::Res { let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req); let guard = scopeguard::guard(id, move |id| { - self.inner.lock_propagate_poison().queue.remove(&id); + let mut inner = self.inner.lock_propagate_poison(); + if inner.queue.remove(&id).is_some() { + tracing::debug!("batched task cancelled before completion"); + } }); let resp = loop { @@ -112,6 +115,12 @@ impl BatchQueueInner

{ let (tx, rx) = tokio::sync::oneshot::channel(); let id = self.version; + + // Overflow concern: + // This is a u64, and we might enqueue 2^16 tasks per second. + // This gives us 2^48 seconds (9 million years). + // Even if this does overflow, it will not break, but some + // jobs with the higher version might never get prioritised. self.version += 1; self.queue.insert(id, BatchJob { req, res: tx });