From 7c469b30aa5e103bfab9b2cba2f793c9c24aece4 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 10 Jun 2025 13:51:26 -0700 Subject: [PATCH] add another debug and overflow comment --- proxy/src/batch.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/proxy/src/batch.rs b/proxy/src/batch.rs index b6c3f9be6e..61bdf2b747 100644 --- a/proxy/src/batch.rs +++ b/proxy/src/batch.rs @@ -52,7 +52,10 @@ impl BatchQueue

{ pub async fn call(&self, req: P::Req) -> P::Res { let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req); let guard = scopeguard::guard(id, move |id| { - self.inner.lock_propagate_poison().queue.remove(&id); + let mut inner = self.inner.lock_propagate_poison(); + if inner.queue.remove(&id).is_some() { + tracing::debug!("batched task cancelled before completion"); + } }); let resp = loop { @@ -112,6 +115,12 @@ impl BatchQueueInner

{ let (tx, rx) = tokio::sync::oneshot::channel(); let id = self.version; + + // Overflow concern: + // This is a u64, and we might enqueue 2^16 tasks per second. + // This gives us 2^48 seconds (9 million years). + // Even if this does overflow, it will not break, but some + // jobs with the higher version might never get prioritised. self.version += 1; self.queue.insert(id, BatchJob { req, res: tx });