//! Batch processing system based on intrusive linked lists. //! //! Enqueuing a batch job requires no allocations, with //! direct support for cancelling jobs early. use std::collections::BTreeMap; use std::pin::pin; use std::sync::Mutex; use futures::future::Either; use scopeguard::ScopeGuard; use tokio::sync::oneshot::error::TryRecvError; use crate::ext::LockExt; pub trait QueueProcessing: Send + 'static { type Req: Send + 'static; type Res: Send; /// Get the desired batch size. fn batch_size(&self, queue_size: usize) -> usize; /// This applies a full batch of events. /// Must respond with a full batch of replies. /// /// If this apply can error, it's expected that errors be forwarded to each Self::Res. /// /// Batching does not need to happen atomically. fn apply(&mut self, req: Vec) -> impl Future> + Send; } pub struct BatchQueue { processor: tokio::sync::Mutex

, inner: Mutex>, } struct BatchJob { req: P::Req, res: tokio::sync::oneshot::Sender, } impl BatchQueue

{ pub fn new(p: P) -> Self { Self { processor: tokio::sync::Mutex::new(p), inner: Mutex::new(BatchQueueInner { version: 0, queue: BTreeMap::new(), }), } } pub async fn call(&self, req: P::Req) -> P::Res { let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req); let guard = scopeguard::guard(id, move |id| { let mut inner = self.inner.lock_propagate_poison(); if inner.queue.remove(&id).is_some() { tracing::debug!("batched task cancelled before completion"); } }); let resp = loop { // try become the leader, or try wait for success. let mut processor = match futures::future::select(rx, pin!(self.processor.lock())).await { // we got the resp. Either::Left((resp, _)) => break resp.ok(), // we are the leader. Either::Right((p, rx_)) => { rx = rx_; p } }; let (reqs, resps) = self.inner.lock_propagate_poison().get_batch(&processor); // apply a batch. let values = processor.apply(reqs).await; // send response values. for (tx, value) in std::iter::zip(resps, values) { // sender hung up but that's fine. drop(tx.send(value)); } match rx.try_recv() { Ok(resp) => break Some(resp), Err(TryRecvError::Closed) => break None, // edge case - there was a race condition where // we became the leader but were not in the batch. // // Example: // thread 1: register job id=1 // thread 2: register job id=2 // thread 2: processor.lock().await // thread 1: processor.lock().await // thread 2: becomes leader, batch_size=1, jobs=[1]. Err(TryRecvError::Empty) => {} } }; // already removed. ScopeGuard::into_inner(guard); resp.expect("no response found. batch processer should not panic") } } struct BatchQueueInner { version: u64, queue: BTreeMap>, } impl BatchQueueInner

{ fn register_job(&mut self, req: P::Req) -> (u64, tokio::sync::oneshot::Receiver) { let (tx, rx) = tokio::sync::oneshot::channel(); let id = self.version; // Overflow concern: // This is a u64, and we might enqueue 2^16 tasks per second. // This gives us 2^48 seconds (9 million years). // Even if this does overflow, it will not break, but some // jobs with the higher version might never get prioritised. self.version += 1; self.queue.insert(id, BatchJob { req, res: tx }); (id, rx) } fn get_batch(&mut self, p: &P) -> (Vec, Vec>) { let batch_size = p.batch_size(self.queue.len()); let mut reqs = Vec::with_capacity(batch_size); let mut resps = Vec::with_capacity(batch_size); while reqs.len() < batch_size { let Some((_, job)) = self.queue.pop_first() else { break; }; reqs.push(job.req); resps.push(job.res); } (reqs, resps) } }