From 1d44c4e236e1c3ffdcdc9a71ef09b10a82807937 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 29 Nov 2023 17:53:01 +0800 Subject: [PATCH] feat: seq scan support parallelism --- Cargo.lock | 1 + Cargo.toml | 2 + src/mito2/Cargo.toml | 1 + src/mito2/src/read.rs | 8 +++ src/mito2/src/read/merge.rs | 8 ++- src/mito2/src/read/seq_scan.rs | 100 ++++++++++++++++++++++++++++++++- 6 files changed, 117 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 099d0ead57..b57bd1b23c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4867,6 +4867,7 @@ dependencies = [ "strum 0.25.0", "table", "tokio", + "tokio-stream", "tokio-util", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 449e172983..10b9743038 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,6 +124,8 @@ sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0 strum = { version = "0.25", features = ["derive"] } tempfile = "3" tokio = { version = "1.28", features = ["full"] } +# TODO(yingwen): replace other usage by this one.
+tokio-stream = { version = "0.1" } tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.7" tonic = { version = "0.10", features = ["tls"] } diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 7f340b04a7..0e2bd5ad39 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -58,6 +58,7 @@ snafu.workspace = true store-api.workspace = true strum.workspace = true table.workspace = true +tokio-stream.workspace = true tokio-util.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 1ef80b3bb1..c3ce229780 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -39,6 +39,8 @@ use datatypes::vectors::{ TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef, }; +use futures::stream::BoxStream; +use futures::TryStreamExt; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::{ColumnId, SequenceNumber}; @@ -668,6 +670,8 @@ pub enum Source { Reader(BoxedBatchReader), /// Source from a [BoxedBatchIterator]. Iter(BoxedBatchIterator), + /// Source from a [BoxedBatchStream]. + Stream(BoxedBatchStream), } impl Source { @@ -676,6 +680,7 @@ impl Source { match self { Source::Reader(reader) => reader.next_batch().await, Source::Iter(iter) => iter.next().transpose(), + Source::Stream(stream) => stream.try_next().await, } } } @@ -698,6 +703,9 @@ pub trait BatchReader: Send { /// Pointer to [BatchReader]. pub type BoxedBatchReader = Box; +/// Pointer to a stream that yields [Batch].
+pub type BoxedBatchStream = BoxStream<'static, Result>; + #[async_trait::async_trait] impl BatchReader for Box { async fn next_batch(&mut self) -> Result> { diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index ad1f51cf1d..f14b6da3c3 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -26,7 +26,7 @@ use common_time::Timestamp; use crate::error::Result; use crate::memtable::BoxedBatchIterator; use crate::metrics::{MERGE_FILTER_ROWS_TOTAL, READ_STAGE_ELAPSED}; -use crate::read::{Batch, BatchReader, BoxedBatchReader, Source}; +use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedBatchStream, Source}; /// Minimum batch size to output. const MIN_BATCH_SIZE: usize = 64; @@ -330,6 +330,12 @@ impl MergeReaderBuilder { self } + /// Pushes a batch stream to sources. + pub fn push_batch_stream(&mut self, stream: BoxedBatchStream) -> &mut Self { + self.sources.push(Source::Stream(stream)); + self + } + /// Sets the batch size of the reader. pub fn batch_size(&mut self, size: usize) -> &mut Self { self.batch_size = if size == 0 { MIN_BATCH_SIZE } else { size }; diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index b0a86072c1..c347bd81f1 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -25,6 +25,8 @@ use common_telemetry::{debug, error}; use common_time::range::TimestampRange; use snafu::ResultExt; use table::predicate::Predicate; +use tokio::sync::{mpsc, Semaphore}; +use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::{CacheManager, CacheManagerRef}; @@ -34,7 +36,7 @@ use crate::metrics::READ_STAGE_ELAPSED; use crate::read::compat::{self, CompatReader}; use crate::read::merge::MergeReaderBuilder; use crate::read::projection::ProjectionMapper; -use crate::read::{BatchReader, BoxedBatchReader}; +use crate::read::{BatchReader, BoxedBatchReader, BoxedBatchStream, Source}; use crate::sst::file::FileHandle; /// 
Scans a region and returns rows in a sorted sequence. @@ -57,6 +59,10 @@ pub struct SeqScan { cache_manager: Option, /// Ignores file not found error. ignore_file_not_found: bool, + /// Parallelism to scan data. + /// + /// Uses parallel reader if `parallelism > 1`. + parallelism: usize, } impl SeqScan { @@ -72,6 +78,7 @@ impl SeqScan { files: Vec::new(), cache_manager: None, ignore_file_not_found: false, + parallelism: 0, } } @@ -117,11 +124,22 @@ impl SeqScan { self } + /// Sets scan parallelism. + #[must_use] + pub(crate) fn with_parallelism(mut self, parallelism: usize) -> Self { + self.parallelism = parallelism; + self + } + /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { let start = Instant::now(); // Scans all memtables and SSTs. Builds a merge reader to merge results. - let mut reader = self.build_reader().await?; + let mut reader = if self.parallelism > 1 { + self.build_parallel_reader().await? + } else { + self.build_reader().await? + }; let mut metrics = Metrics { scan_cost: start.elapsed(), }; @@ -191,6 +209,84 @@ impl SeqScan { Ok(Box::new(builder.build().await?)) } + /// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel. + async fn build_parallel_reader(&self) -> Result { + assert!(self.parallelism > 1); + let semaphore = Arc::new(Semaphore::new(self.parallelism)); + + // Scans all memtables and SSTs. Builds a merge reader to merge results.
+ let mut builder = MergeReaderBuilder::new(); + for mem in &self.memtables { + let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone()); + let stream = Self::scan_source_in_background(Source::Iter(iter), semaphore.clone()); + + builder.push_batch_stream(stream); + } + for file in &self.files { + let maybe_reader = self + .access_layer + .read_sst(file.clone()) + .predicate(self.predicate.clone()) + .time_range(self.time_range) + .projection(Some(self.mapper.column_ids().to_vec())) + .cache(self.cache_manager.clone()) + .build() + .await; + let reader = match maybe_reader { + Ok(reader) => reader, + Err(e) => { + if e.is_object_not_found() && self.ignore_file_not_found { + error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id()); + continue; + } else { + return Err(e); + } + } + }; + let reader = if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) { + Source::Reader(Box::new(reader)) + } else { + // They have different schema. We need to adapt the batch first so the + // mapper can convert it. + let compat_reader = + CompatReader::new(&self.mapper, reader.metadata().clone(), reader)?; + Source::Reader(Box::new(compat_reader)) + }; + + let stream = Self::scan_source_in_background(reader, semaphore.clone()); + builder.push_batch_stream(stream); + } + Ok(Box::new(builder.build().await?)) + } + + /// Scans the input source in a spawned task and streams back its batches. + fn scan_source_in_background(mut input: Source, semaphore: Arc) -> BoxedBatchStream { + let (sender, receiver) = mpsc::channel(1); + tokio::spawn(async move { + loop { + // We release the permit before sending result to avoid the task waiting on + // the channel with the permit held. + let maybe_batch = { + // Safety: We never close the semaphore.
+ let _permit = semaphore.acquire().await.unwrap(); + input.next_batch().await + }; + // `None` means the source is exhausted. + let Some(result) = maybe_batch.transpose() else { + break; + }; + let failed = result.is_err(); + // Stops after forwarding an error, or once the receiver is dropped and + // nobody consumes the remaining batches. + if sender.send(result).await.is_err() || failed { + break; + } + } + }); + + Box::pin(ReceiverStream::new(receiver)) + } + /// Fetch a batch from the reader and convert it into a record batch. async fn fetch_record_batch( reader: &mut dyn BatchReader,