feat: try send before send

reduce the send timeout to 10ms
This commit is contained in:
evenyag
2025-04-09 14:34:39 +08:00
parent 8fa1ebcc3e
commit 6293bb1f5b

View File

@@ -31,7 +31,7 @@ use smallvec::{smallvec, SmallVec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Semaphore;
@@ -46,7 +46,7 @@ use crate::read::seq_scan::{build_sources, SeqScan};
use crate::read::{Batch, ScannerMetrics};
/// Timeout to send a batch to a sender.
const SEND_TIMEOUT: Duration = Duration::from_millis(100);
const SEND_TIMEOUT: Duration = Duration::from_millis(10);
/// List of receivers.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
@@ -433,8 +433,47 @@ impl SenderList {
}
}
/// Finds a partition and tries to send the batch to the partition.
/// Returns None if it sends successfully.
fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
for _ in 0..self.senders.len() {
ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
let sender_idx = self.fetch_add_sender_idx();
let Some(sender) = &self.senders[sender_idx] else {
continue;
};
match sender.try_send(Ok(batch)) {
Ok(()) => return Ok(None),
Err(TrySendError::Full(res)) => {
// Safety: we send Ok.
batch = res.unwrap();
}
Err(TrySendError::Closed(res)) => {
self.senders[sender_idx] = None;
self.num_nones += 1;
// Safety: we send Ok.
batch = res.unwrap();
}
}
}
Ok(Some(batch))
}
/// Finds a partition and sends the batch to the partition.
async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
match self.try_send_batch(batch)? {
Some(b) => {
// Unable to send batch to partition.
batch = b;
}
None => {
return Ok(());
}
}
loop {
ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);