mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 18:00:41 +00:00
@@ -60,7 +60,7 @@ use crate::read::seq_scan::SeqScan;
|
||||
use crate::read::series_scan::SeriesScan;
|
||||
use crate::read::stream::ScanBatchStream;
|
||||
use crate::read::unordered_scan::UnorderedScan;
|
||||
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
|
||||
use crate::read::{BoxedRecordBatchStream, RecordBatch};
|
||||
use crate::region::options::MergeMode;
|
||||
use crate::region::version::VersionRef;
|
||||
use crate::sst::file::FileHandle;
|
||||
@@ -1031,39 +1031,6 @@ impl ScanInput {
|
||||
self
|
||||
}
|
||||
|
||||
/// Scans sources in parallel.
|
||||
///
|
||||
/// # Panics if the input doesn't allow parallel scan.
|
||||
#[tracing::instrument(
|
||||
skip(self, sources, semaphore),
|
||||
fields(
|
||||
region_id = %self.region_metadata().region_id,
|
||||
source_count = sources.len()
|
||||
)
|
||||
)]
|
||||
pub(crate) fn create_parallel_sources(
|
||||
&self,
|
||||
sources: Vec<Source>,
|
||||
semaphore: Arc<Semaphore>,
|
||||
channel_size: usize,
|
||||
) -> Result<Vec<Source>> {
|
||||
if sources.len() <= 1 {
|
||||
return Ok(sources);
|
||||
}
|
||||
|
||||
// Spawn a task for each source.
|
||||
let sources = sources
|
||||
.into_iter()
|
||||
.map(|source| {
|
||||
let (sender, receiver) = mpsc::channel(channel_size);
|
||||
self.spawn_scan_task(source, semaphore.clone(), sender);
|
||||
let stream = Box::pin(ReceiverStream::new(receiver));
|
||||
Source::Stream(stream)
|
||||
})
|
||||
.collect();
|
||||
Ok(sources)
|
||||
}
|
||||
|
||||
/// Builds memtable ranges to scan by `index`.
|
||||
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
|
||||
let memtable = &self.memtables[index.index];
|
||||
@@ -1173,49 +1140,6 @@ impl ScanInput {
|
||||
Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
|
||||
}
|
||||
|
||||
/// Scans the input source in another task and sends batches to the sender.
|
||||
#[tracing::instrument(
|
||||
skip(self, input, semaphore, sender),
|
||||
fields(region_id = %self.region_metadata().region_id)
|
||||
)]
|
||||
pub(crate) fn spawn_scan_task(
|
||||
&self,
|
||||
mut input: Source,
|
||||
semaphore: Arc<Semaphore>,
|
||||
sender: mpsc::Sender<Result<Batch>>,
|
||||
) {
|
||||
let region_id = self.region_metadata().region_id;
|
||||
let span = tracing::info_span!(
|
||||
"ScanInput::parallel_scan_task",
|
||||
region_id = %region_id,
|
||||
stream_kind = "batch"
|
||||
);
|
||||
common_runtime::spawn_global(
|
||||
async move {
|
||||
loop {
|
||||
// We release the permit before sending result to avoid the task waiting on
|
||||
// the channel with the permit held.
|
||||
let maybe_batch = {
|
||||
// Safety: We never close the semaphore.
|
||||
let _permit = semaphore.acquire().await.unwrap();
|
||||
input.next_batch().await
|
||||
};
|
||||
match maybe_batch {
|
||||
Ok(Some(batch)) => {
|
||||
let _ = sender.send(Ok(batch)).await;
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(e) => {
|
||||
let _ = sender.send(Err(e)).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(span),
|
||||
);
|
||||
}
|
||||
|
||||
/// Scans flat sources (RecordBatch streams) in parallel.
|
||||
///
|
||||
/// # Panics if the input doesn't allow parallel scan.
|
||||
|
||||
Reference in New Issue
Block a user