diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 694d912914..7d3e4f30fb 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -31,7 +31,7 @@ use smallvec::{smallvec, SmallVec}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties}; -use tokio::sync::mpsc::error::SendTimeoutError; +use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::Semaphore; @@ -46,7 +46,7 @@ use crate::read::seq_scan::{build_sources, SeqScan}; use crate::read::{Batch, ScannerMetrics}; /// Timeout to send a batch to a sender. -const SEND_TIMEOUT: Duration = Duration::from_millis(100); +const SEND_TIMEOUT: Duration = Duration::from_millis(10); /// List of receivers. type ReceiverList = Vec>>>; @@ -433,8 +433,47 @@ impl SenderList { } } + /// Finds a partition and tries to send the batch to the partition. + /// Returns None if it sends successfully. + fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result> { + for _ in 0..self.senders.len() { + ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu); + + let sender_idx = self.fetch_add_sender_idx(); + let Some(sender) = &self.senders[sender_idx] else { + continue; + }; + + match sender.try_send(Ok(batch)) { + Ok(()) => return Ok(None), + Err(TrySendError::Full(res)) => { + // Safety: we send Ok. + batch = res.unwrap(); + } + Err(TrySendError::Closed(res)) => { + self.senders[sender_idx] = None; + self.num_nones += 1; + // Safety: we send Ok. + batch = res.unwrap(); + } + } + } + + Ok(Some(batch)) + } + /// Finds a partition and sends the batch to the partition. async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> { + match self.try_send_batch(batch)? { + Some(b) => { + // Unable to send batch to partition. + batch = b; + } + None => { + return Ok(()); + } + } + loop { ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);