feat: seq scan support parallelism

This commit is contained in:
evenyag
2023-11-29 17:53:01 +08:00
parent fe2fc723bc
commit 1d44c4e236
6 changed files with 117 additions and 3 deletions

1
Cargo.lock generated
View File

@@ -4867,6 +4867,7 @@ dependencies = [
"strum 0.25.0",
"table",
"tokio",
"tokio-stream",
"tokio-util",
"uuid",
]

View File

@@ -124,6 +124,8 @@ sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
tokio = { version = "1.28", features = ["full"] }
# TODO(yingwen): replace other usage by this one.
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.7"
tonic = { version = "0.10", features = ["tls"] }

View File

@@ -58,6 +58,7 @@ snafu.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tokio.workspace = true
uuid.workspace = true

View File

@@ -39,6 +39,8 @@ use datatypes::vectors::{
TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
Vector, VectorRef,
};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -668,6 +670,8 @@ pub enum Source {
Reader(BoxedBatchReader),
/// Source from a [BoxedBatchIterator].
Iter(BoxedBatchIterator),
/// Source from a [BoxedBatchStream].
Stream(BoxedBatchStream),
}
impl Source {
@@ -676,6 +680,7 @@ impl Source {
match self {
Source::Reader(reader) => reader.next_batch().await,
Source::Iter(iter) => iter.next().transpose(),
Source::Stream(stream) => stream.try_next().await,
}
}
}
@@ -698,6 +703,9 @@ pub trait BatchReader: Send {
/// Pointer to [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;
/// Pointer to a stream that yields [Batch].
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
async fn next_batch(&mut self) -> Result<Option<Batch>> {

View File

@@ -26,7 +26,7 @@ use common_time::Timestamp;
use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{MERGE_FILTER_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedBatchStream, Source};
/// Minimum batch size to output.
const MIN_BATCH_SIZE: usize = 64;
@@ -330,6 +330,12 @@ impl MergeReaderBuilder {
self
}
/// Pushes a batch stream to sources.
pub fn push_batch_stream(&mut self, stream: BoxedBatchStream) -> &mut Self {
self.sources.push(Source::Stream(stream));
self
}
/// Sets the batch size of the reader.
pub fn batch_size(&mut self, size: usize) -> &mut Self {
self.batch_size = if size == 0 { MIN_BATCH_SIZE } else { size };

View File

@@ -25,6 +25,8 @@ use common_telemetry::{debug, error};
use common_time::range::TimestampRange;
use snafu::ResultExt;
use table::predicate::Predicate;
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManager, CacheManagerRef};
@@ -34,7 +36,7 @@ use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::compat::{self, CompatReader};
use crate::read::merge::MergeReaderBuilder;
use crate::read::projection::ProjectionMapper;
use crate::read::{BatchReader, BoxedBatchReader};
use crate::read::{BatchReader, BoxedBatchReader, BoxedBatchStream, Source};
use crate::sst::file::FileHandle;
/// Scans a region and returns rows in a sorted sequence.
@@ -57,6 +59,10 @@ pub struct SeqScan {
cache_manager: Option<CacheManagerRef>,
/// Ignores file not found error.
ignore_file_not_found: bool,
/// Parallelism to scan data.
///
/// Uses parallel reader if `parallelism > 1`.
parallelism: usize,
}
impl SeqScan {
@@ -72,6 +78,7 @@ impl SeqScan {
files: Vec::new(),
cache_manager: None,
ignore_file_not_found: false,
parallelism: 0,
}
}
@@ -117,11 +124,22 @@ impl SeqScan {
self
}
/// Sets scan parallelism.
#[must_use]
pub(crate) fn with_parallelism(mut self, parallelism: usize) -> Self {
self.parallelism = parallelism;
self
}
/// Builds a stream for the query.
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
let start = Instant::now();
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut reader = self.build_reader().await?;
let mut reader = if self.parallelism > 1 {
self.build_parallel_reader().await?
} else {
self.build_reader().await?
};
let mut metrics = Metrics {
scan_cost: start.elapsed(),
};
@@ -191,6 +209,84 @@ impl SeqScan {
Ok(Box::new(builder.build().await?))
}
/// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel.
async fn build_parallel_reader(&self) -> Result<BoxedBatchReader> {
assert!(self.parallelism > 1);
let semaphore = Arc::new(Semaphore::new(self.parallelism));
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut builder = MergeReaderBuilder::new();
for mem in &self.memtables {
let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone());
let stream = Self::scan_source_in_background(Source::Iter(iter), semaphore.clone());
builder.push_batch_stream(stream);
}
for file in &self.files {
let maybe_reader = self
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.build()
.await;
let reader = match maybe_reader {
Ok(reader) => reader,
Err(e) => {
if e.is_object_not_found() && self.ignore_file_not_found {
error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
continue;
} else {
return Err(e);
}
}
};
let reader = if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) {
Source::Reader(Box::new(reader))
} else {
// They have different schema. We need to adapt the batch first so the
// mapper can convert the it.
let compat_reader =
CompatReader::new(&self.mapper, reader.metadata().clone(), reader)?;
Source::Reader(Box::new(compat_reader))
};
let stream = Self::scan_source_in_background(reader, semaphore.clone());
builder.push_batch_stream(stream);
}
Ok(Box::new(builder.build().await?))
}
/// Scan the input source in another task.
fn scan_source_in_background(mut input: Source, semaphore: Arc<Semaphore>) -> BoxedBatchStream {
let (sender, receiver) = mpsc::channel(1);
tokio::spawn(async move {
loop {
// We release the permit before sending result to avoid the task waiting on
// the channel with the permit holded
let maybe_batch = {
// Safety: We never close the semaphore.
let _permit = semaphore.acquire().await.unwrap();
input.next_batch().await
};
match maybe_batch {
Ok(Some(batch)) => {
let _ = sender.send(Ok(batch)).await;
}
Ok(None) => break,
Err(e) => {
let _ = sender.send(Err(e)).await;
break;
}
}
}
});
Box::pin(ReceiverStream::new(receiver))
}
/// Fetch a batch from the reader and convert it into a record batch.
async fn fetch_record_batch(
reader: &mut dyn BatchReader,