diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index c447685822..f645e3dc26 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -60,7 +60,7 @@ use crate::read::seq_scan::SeqScan; use crate::read::series_scan::SeriesScan; use crate::read::stream::ScanBatchStream; use crate::read::unordered_scan::UnorderedScan; -use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source}; +use crate::read::{BoxedRecordBatchStream, RecordBatch}; use crate::region::options::MergeMode; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; @@ -1031,39 +1031,6 @@ impl ScanInput { self } - /// Scans sources in parallel. - /// - /// # Panics if the input doesn't allow parallel scan. - #[tracing::instrument( - skip(self, sources, semaphore), - fields( - region_id = %self.region_metadata().region_id, - source_count = sources.len() - ) - )] - pub(crate) fn create_parallel_sources( - &self, - sources: Vec, - semaphore: Arc, - channel_size: usize, - ) -> Result> { - if sources.len() <= 1 { - return Ok(sources); - } - - // Spawn a task for each source. - let sources = sources - .into_iter() - .map(|source| { - let (sender, receiver) = mpsc::channel(channel_size); - self.spawn_scan_task(source, semaphore.clone(), sender); - let stream = Box::pin(ReceiverStream::new(receiver)); - Source::Stream(stream) - }) - .collect(); - Ok(sources) - } - /// Builds memtable ranges to scan by `index`. pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> { let memtable = &self.memtables[index.index]; @@ -1173,49 +1140,6 @@ impl ScanInput { Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection)) } - /// Scans the input source in another task and sends batches to the sender. - #[tracing::instrument( - skip(self, input, semaphore, sender), - fields(region_id = %self.region_metadata().region_id) - )] - pub(crate) fn spawn_scan_task( - &self, - mut input: Source, - semaphore: Arc, - sender: mpsc::Sender>, - ) { - let region_id = self.region_metadata().region_id; - let span = tracing::info_span!( - "ScanInput::parallel_scan_task", - region_id = %region_id, - stream_kind = "batch" - ); - common_runtime::spawn_global( - async move { - loop { - // We release the permit before sending result to avoid the task waiting on - // the channel with the permit held. - let maybe_batch = { - // Safety: We never close the semaphore. - let _permit = semaphore.acquire().await.unwrap(); - input.next_batch().await - }; - match maybe_batch { - Ok(Some(batch)) => { - let _ = sender.send(Ok(batch)).await; - } - Ok(None) => break, - Err(e) => { - let _ = sender.send(Err(e)).await; - break; - } - } - } - } - .instrument(span), - ); - } - /// Scans flat sources (RecordBatch streams) in parallel. /// /// # Panics if the input doesn't allow parallel scan.