feat: make RegionScanner aware of PartitionRange (#4170)

* define PartitionRange

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add optimizer rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement interfaces

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl aggr stream

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add fallback method

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add document and rename struct

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add more comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-06-21 17:54:22 +08:00
committed by GitHub
parent ac574b66ab
commit fce65c97e3
22 changed files with 615 additions and 127 deletions

1
Cargo.lock generated
View File

@@ -10828,6 +10828,7 @@ dependencies = [
"common-error",
"common-macro",
"common-recordbatch",
"common-time",
"common-wal",
"datafusion-expr 38.0.0",
"datafusion-physical-plan 38.0.0",

View File

@@ -105,6 +105,10 @@ impl BoxedError {
inner: Box::new(err),
}
}
pub fn into_inner(self) -> Box<dyn crate::ext::ErrorExt + Send + Sync> {
self.inner
}
}
impl std::fmt::Debug for BoxedError {

View File

@@ -20,6 +20,7 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion_common::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
use snafu::{Location, Snafu};
pub type Result<T> = std::result::Result<T, Error>;
@@ -138,12 +139,28 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Cannot construct an empty stream"))]
EmptyStream {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Schema not match, left: {:?}, right: {:?}", left, right))]
SchemaNotMatch {
left: SchemaRef,
right: SchemaRef,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::NewDfRecordBatch { .. } => StatusCode::InvalidArguments,
Error::NewDfRecordBatch { .. }
| Error::EmptyStream { .. }
| Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments,
Error::DataTypes { .. }
| Error::CreateRecordBatches { .. }

View File

@@ -12,10 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::TryStreamExt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::error::Result;
use crate::{RecordBatch, RecordBatches, SendableRecordBatchStream};
use arc_swap::ArcSwapOption;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt, TryStreamExt};
use snafu::ensure;
use crate::adapter::RecordBatchMetrics;
use crate::error::{EmptyStreamSnafu, Result, SchemaNotMatchSnafu};
use crate::{
OrderOption, RecordBatch, RecordBatchStream, RecordBatches, SendableRecordBatchStream,
};
/// Collect all the items from the stream into a vector of [`RecordBatch`].
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
@@ -29,6 +39,91 @@ pub async fn collect_batches(stream: SendableRecordBatchStream) -> Result<Record
RecordBatches::try_new(schema, batches)
}
/// A stream that chains multiple streams into a single stream.
pub struct ChainedRecordBatchStream {
inputs: Vec<SendableRecordBatchStream>,
curr_index: usize,
schema: SchemaRef,
metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
}
impl ChainedRecordBatchStream {
pub fn new(inputs: Vec<SendableRecordBatchStream>) -> Result<Self> {
// check length
ensure!(!inputs.is_empty(), EmptyStreamSnafu);
// check schema
let first_schema = inputs[0].schema();
for input in inputs.iter().skip(1) {
let schema = input.schema();
ensure!(
first_schema == schema,
SchemaNotMatchSnafu {
left: first_schema,
right: schema
}
);
}
Ok(Self {
inputs,
curr_index: 0,
schema: first_schema,
metrics: Default::default(),
})
}
fn sequence_poll(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
if self.curr_index >= self.inputs.len() {
return Poll::Ready(None);
}
let curr_index = self.curr_index;
match self.inputs[curr_index].poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(batch))) => Poll::Ready(Some(Ok(batch))),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {
self.curr_index += 1;
if self.curr_index < self.inputs.len() {
self.sequence_poll(ctx)
} else {
Poll::Ready(None)
}
}
Poll::Pending => Poll::Pending,
}
}
}
impl RecordBatchStream for ChainedRecordBatchStream {
fn name(&self) -> &str {
"ChainedRecordBatchStream"
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
None
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
self.metrics.load().as_ref().map(|m| m.as_ref().clone())
}
}
impl Stream for ChainedRecordBatchStream {
type Item = Result<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.sequence_poll(ctx)
}
}
#[cfg(test)]
mod tests {
use std::pin::Pin;

View File

@@ -90,7 +90,7 @@ impl RegionEngine for FileRegionEngine {
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
let scanner = Box::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}

View File

@@ -32,14 +32,11 @@ use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner,
};
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
@@ -174,9 +171,7 @@ impl RegionEngine for MetricEngine {
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
self.handle_query(region_id, request).await
}
/// Retrieves region's metadata.
@@ -269,7 +264,7 @@ impl MetricEngine {
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
) -> Result<RegionScannerRef, BoxedError> {
self.inner
.read_region(region_id, request)
.await
@@ -277,6 +272,17 @@ impl MetricEngine {
}
}
#[cfg(test)]
impl MetricEngine {
pub async fn scan_to_stream(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
self.inner.scan_to_stream(region_id, request).await
}
}
struct MetricEngineInner {
mito: MitoEngine,
metadata_region: MetadataRegion,

View File

@@ -235,7 +235,7 @@ mod tests {
let request = ScanRequest::default();
let stream = env
.metric()
.handle_query(physical_region_id, request)
.scan_to_stream(physical_region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
@@ -255,7 +255,7 @@ mod tests {
let request = ScanRequest::default();
let stream = env
.metric()
.handle_query(logical_region_id, request)
.scan_to_stream(logical_region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -15,13 +15,12 @@
use std::sync::Arc;
use api::v1::SemanticType;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info, tracing};
use datafusion::logical_expr::{self, Expr};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef};
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, RegionScannerRef};
use store_api::storage::{RegionId, ScanRequest};
use crate::engine::MetricEngineInner;
@@ -37,7 +36,7 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream> {
) -> Result<RegionScannerRef> {
let is_reading_physical_region = self.is_physical_region(region_id);
if is_reading_physical_region {
@@ -55,13 +54,13 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream> {
) -> Result<RegionScannerRef> {
let _timer = MITO_OPERATION_ELAPSED
.with_label_values(&["read_physical"])
.start_timer();
self.mito
.scan_to_stream(region_id, request)
.handle_query(region_id, request)
.await
.context(MitoReadOperationSnafu)
}
@@ -70,7 +69,7 @@ impl MetricEngineInner {
&self,
logical_region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream> {
) -> Result<RegionScannerRef> {
let _timer = MITO_OPERATION_ELAPSED
.with_label_values(&["read"])
.start_timer();
@@ -81,7 +80,7 @@ impl MetricEngineInner {
.transform_request(physical_region_id, logical_region_id, request)
.await?;
self.mito
.scan_to_stream(data_region_id, request)
.handle_query(data_region_id, request)
.await
.context(MitoReadOperationSnafu)
}
@@ -253,6 +252,37 @@ impl MetricEngineInner {
}
}
#[cfg(test)]
impl MetricEngineInner {
pub async fn scan_to_stream(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<common_recordbatch::SendableRecordBatchStream, common_error::ext::BoxedError> {
let is_reading_physical_region = self.is_physical_region(region_id);
if is_reading_physical_region {
self.mito
.scan_to_stream(region_id, request)
.await
.map_err(common_error::ext::BoxedError::new)
} else {
let physical_region_id = self
.get_physical_region_id(region_id)
.await
.map_err(common_error::ext::BoxedError::new)?;
let request = self
.transform_request(physical_region_id, region_id, request)
.await
.map_err(common_error::ext::BoxedError::new)?;
self.mito
.scan_to_stream(physical_region_id, request)
.await
.map_err(common_error::ext::BoxedError::new)
}
}
}
#[cfg(test)]
mod test {
use store_api::region_request::RegionRequest;

View File

@@ -135,6 +135,9 @@ impl MitoEngine {
}
/// Handle substrait query and return a stream of record batches
///
/// Notice that the output stream's ordering is not guranateed. If order
/// matter, please use [`scanner`] to build a [`Scanner`] to consume.
#[tracing::instrument(skip_all)]
pub async fn scan_to_stream(
&self,

View File

@@ -697,6 +697,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Partition {} out of range, {} in total", given, all))]
PartitionOutOfRange {
given: usize,
all: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to iter data part"))]
ReadDataPart {
#[snafu(source)]
@@ -796,7 +804,8 @@ impl ErrorExt for Error {
| ColumnNotFound { .. }
| InvalidMetadata { .. }
| InvalidRegionOptions { .. }
| InvalidWalReadRequest { .. } => StatusCode::InvalidArguments,
| InvalidWalReadRequest { .. }
| PartitionOutOfRange { .. } => StatusCode::InvalidArguments,
InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated,

View File

@@ -68,8 +68,8 @@ impl Scanner {
/// Returns a [RegionScanner] to scan the region.
pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => Ok(Arc::new(seq_scan)),
Scanner::Unordered(unordered_scan) => Ok(Arc::new(unordered_scan)),
Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)),
Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)),
}
}
}

View File

@@ -21,18 +21,19 @@ use std::time::Instant;
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::debug;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties};
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use tokio::sync::Semaphore;
use crate::error::Result;
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::memtable::MemtableRef;
use crate::read::dedup::{DedupReader, LastRow};
use crate::read::merge::MergeReaderBuilder;
@@ -46,7 +47,8 @@ use crate::sst::parquet::reader::ReaderMetrics;
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index`.
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contains many [`PartitionRange`]s.
pub struct SeqScan {
/// Properties of the scanner.
properties: ScannerProperties,
@@ -61,7 +63,7 @@ impl SeqScan {
/// Creates a new [SeqScan].
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::new(ScannerPartitioning::Unknown(parallelism));
let properties = ScannerProperties::new_with_partitions(parallelism);
let stream_ctx = Arc::new(StreamContext::new(input));
Self {
@@ -72,8 +74,16 @@ impl SeqScan {
}
/// Builds a stream for the query.
///
/// The returned stream is not partitioned and will contains all the data. If want
/// partitioned scan, use [`RegionScanner::scan_partition`].
pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_opt(None)
let streams = (0..self.properties.partitions.len())
.map(|partition: usize| self.scan_partition(partition))
.collect::<Result<Vec<_>, _>>()?;
let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
Ok(Box::pin(aggr_stream))
}
/// Builds a [BoxedBatchReader] from sequential scan for compaction.
@@ -83,7 +93,7 @@ impl SeqScan {
..Default::default()
};
let maybe_reader =
Self::build_merge_reader(&self.stream_ctx, None, self.semaphore.clone(), &mut metrics)
Self::build_all_merge_reader(&self.stream_ctx, self.semaphore.clone(), &mut metrics)
.await?;
// Safety: `build_merge_reader()` always returns a reader if partition is None.
let reader = maybe_reader.unwrap();
@@ -137,32 +147,56 @@ impl SeqScan {
Ok(())
}
/// Builds a merge reader.
/// If `partition` is None, reads all partitions.
/// If the `partition` is out of bound, returns None.
async fn build_merge_reader(
/// Builds a merge reader that reads all data.
async fn build_all_merge_reader(
stream_ctx: &StreamContext,
partition: Option<usize>,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
) -> Result<Option<BoxedBatchReader>> {
// initialize parts list
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
let parts_len = parts.len();
let mut sources = Vec::new();
if let Some(index) = partition {
let Some(part) = parts.get_part(index) else {
let mut sources = Vec::with_capacity(parts_len);
for id in 0..parts_len {
let Some(part) = parts.get_part(id) else {
return Ok(None);
};
Self::build_part_sources(part, &mut sources)?;
} else {
// Safety: We initialized parts before.
for part in parts.0.as_ref().unwrap() {
Self::build_part_sources(part, &mut sources)?;
}
}
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
}
/// Builds a merge reader that reads data from one [`PartitionRange`].
///
/// If the `range_id` is out of bound, returns None.
async fn build_merge_reader(
stream_ctx: &StreamContext,
range_id: usize,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
) -> Result<Option<BoxedBatchReader>> {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
let mut sources = Vec::new();
let Some(part) = parts.get_part(range_id) else {
return Ok(None);
};
Self::build_part_sources(part, &mut sources)?;
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
}
async fn build_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Arc<Semaphore>,
) -> Result<Option<BoxedBatchReader>> {
if stream_ctx.input.parallelism.parallelism > 1 {
// Read sources in parallel. We always spawn a task so we can control the parallelism
// by the semaphore.
@@ -187,52 +221,71 @@ impl SeqScan {
}
}
/// Scans one partition or all partitions.
fn scan_partition_opt(
/// Scans the given partition when the part list is set properly.
/// Otherwise the returned stream might not contains any data.
// TODO: refactor out `uncached_scan_part_impl`.
#[allow(dead_code)]
fn scan_partition_impl(
&self,
partition: Option<usize>,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
if partition >= self.properties.partitions.len() {
return Err(BoxedError::new(
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
.build(),
));
}
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let stream = try_stream! {
let maybe_reader = Self::build_merge_reader(&stream_ctx, partition, semaphore, &mut metrics)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let Some(mut reader) = maybe_reader else {
return;
};
let cache = stream_ctx.input.cache_manager.as_deref();
let mut fetch_start = Instant::now();
while let Some(batch) = reader
.next_batch()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
for partition_range in partition_ranges {
let maybe_reader =
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let Some(mut reader) = maybe_reader else {
return;
};
let cache = stream_ctx.input.cache_manager.as_deref();
let mut fetch_start = Instant::now();
while let Some(batch) = reader
.next_batch()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
yield record_batch;
fetch_start = Instant::now();
}
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
yield record_batch;
fetch_start = Instant::now();
debug!(
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}",
stream_ctx.input.mapper.metadata().region_id,
partition,
metrics,
);
}
metrics.scan_cost += fetch_start.elapsed();
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
"Seq scan finished, region_id: {:?}, partition: {:?}, metrics: {:?}",
stream_ctx.input.mapper.metadata().region_id, partition, metrics,
);
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
@@ -242,6 +295,116 @@ impl SeqScan {
Ok(stream)
}
/// Scans the given partition when the part list is not set.
/// This method will do a lazy initialize of part list and
/// ignores the partition settings in `properties`.
fn uncached_scan_part_impl(
&self,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
let num_partitions = self.properties.partitions.len();
if partition >= num_partitions {
return Err(BoxedError::new(
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
.build(),
));
}
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
// build stream
let stream = try_stream! {
// init parts
let parts_len = {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics).await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
parts.len()
};
for id in (0..parts_len).skip(partition).step_by(num_partitions) {
let maybe_reader = Self::build_merge_reader(
&stream_ctx,
id,
semaphore.clone(),
&mut metrics,
)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let Some(mut reader) = maybe_reader else {
return;
};
let cache = stream_ctx.input.cache_manager.as_deref();
let mut fetch_start = Instant::now();
while let Some(batch) = reader
.next_batch()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
yield record_batch;
fetch_start = Instant::now();
}
metrics.scan_cost += fetch_start.elapsed();
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}",
stream_ctx.input.mapper.metadata().region_id,
partition,
metrics,
);
}
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
}
/// Initializes parts if they are not built yet.
async fn maybe_init_parts(
input: &ScanInput,
part_list: &mut ScanPartList,
metrics: &mut ScannerMetrics,
) -> Result<()> {
if part_list.is_none() {
let now = Instant::now();
let mut distributor = SeqDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
input.predicate.clone(),
);
part_list.set_parts(distributor.build_parts(input.parallelism.parallelism));
metrics.observe_init_part(now.elapsed());
}
Ok(())
}
}
impl RegionScanner for SeqScan {
@@ -254,7 +417,12 @@ impl RegionScanner for SeqScan {
}
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_opt(Some(partition))
self.uncached_scan_part_impl(partition)
}
fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
self.properties.partitions = ranges;
Ok(())
}
}
@@ -282,28 +450,6 @@ impl SeqScan {
}
}
/// Initializes parts if they are not built yet.
async fn maybe_init_parts(
input: &ScanInput,
part_list: &mut ScanPartList,
metrics: &mut ScannerMetrics,
) -> Result<()> {
if part_list.is_none() {
let now = Instant::now();
let mut distributor = SeqDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
input.predicate.clone(),
);
part_list.set_parts(distributor.build_parts(input.parallelism.parallelism));
metrics.observe_init_part(now.elapsed());
}
Ok(())
}
/// Builds [ScanPart]s that preserves order.
#[derive(Default)]
pub(crate) struct SeqDistributor {

View File

@@ -28,7 +28,7 @@ use datatypes::schema::SchemaRef;
use futures::StreamExt;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties};
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::storage::ColumnId;
use table::predicate::Predicate;
@@ -58,9 +58,8 @@ pub struct UnorderedScan {
impl UnorderedScan {
/// Creates a new [UnorderedScan].
pub(crate) fn new(input: ScanInput) -> Self {
let properties = ScannerProperties::new(ScannerPartitioning::Unknown(
input.parallelism.parallelism.max(1),
));
let properties =
ScannerProperties::new_with_partitions(input.parallelism.parallelism.max(1));
let stream_ctx = Arc::new(StreamContext::new(input));
Self {
@@ -71,7 +70,7 @@ impl UnorderedScan {
/// Scans the region and returns a stream.
pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
let part_num = self.properties.partitioning().num_partitions();
let part_num = self.properties.num_partitions();
let streams = (0..part_num)
.map(|i| self.scan_partition(i))
.collect::<Result<Vec<_>, BoxedError>>()?;
@@ -135,6 +134,11 @@ impl RegionScanner for UnorderedScan {
self.stream_ctx.input.mapper.output_schema()
}
fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
self.properties = ScannerProperties::new(ranges);
Ok(())
}
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,

View File

@@ -15,6 +15,7 @@
#![feature(let_chains)]
#![feature(int_roundings)]
#![feature(option_get_or_insert_default)]
#![feature(trait_upcasting)]
mod analyze;
pub mod dataframe;

View File

@@ -14,6 +14,7 @@
pub mod count_wildcard;
pub mod order_hint;
pub mod parallelize_scan;
pub mod remove_duplicate;
pub mod string_normalization;
#[cfg(test)]

View File

@@ -0,0 +1,97 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, Result};
use store_api::region_engine::PartitionRange;
use table::table::scan::RegionScanExec;
pub struct ParallelizeScan;
impl PhysicalOptimizerRule for ParallelizeScan {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
Self::do_optimize(plan, config)
}
fn name(&self) -> &str {
"parallelize_scan"
}
fn schema_check(&self) -> bool {
true
}
}
impl ParallelizeScan {
fn do_optimize(
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let result = plan
.transform_down(|plan| {
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
let ranges = region_scan_exec.get_partition_ranges();
let total_range_num = ranges.len();
let expected_partition_num = config.execution.target_partitions;
// assign ranges to each partition
let partition_ranges =
Self::assign_partition_range(ranges, expected_partition_num);
debug!(
"Assign {total_range_num} ranges to {expected_partition_num} partitions"
);
// update the partition ranges
region_scan_exec
.set_partitions(partition_ranges)
.map_err(|e| DataFusionError::External(e.into_inner()))?;
}
// The plan might be modified, but it's modified in-place so we always return
// Transformed::no(plan) to indicate there is no "new child"
Ok(Transformed::no(plan))
})?
.data;
Ok(result)
}
/// Distribute [`PartitionRange`]s to each partition.
///
/// Currently we use a simple round-robin strategy to assign ranges to partitions.
fn assign_partition_range(
ranges: Vec<PartitionRange>,
expected_partition_num: usize,
) -> Vec<Vec<PartitionRange>> {
let mut partition_ranges = vec![vec![]; expected_partition_num];
// round-robin assignment
for (i, range) in ranges.into_iter().enumerate() {
let partition_idx = i % expected_partition_num;
partition_ranges[partition_idx].push(range);
}
partition_ranges
}
}

View File

@@ -43,6 +43,7 @@ use table::TableRef;
use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer};
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::order_hint::OrderHintRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::type_conversion::TypeConversionRule;
@@ -112,6 +113,11 @@ impl QueryEngineState {
// add physical optimizer
let mut physical_optimizer = PhysicalOptimizer::new();
// Change TableScan's partition at first
physical_optimizer
.rules
.insert(0, Arc::new(ParallelizeScan));
// Add rule to remove duplicate nodes generated by other rules. Run this in the last.
physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
let session_state = SessionState::new_with_config_rt(session_config, runtime_env)

View File

@@ -15,6 +15,7 @@ common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
common-time.workspace = true
common-wal.workspace = true
datafusion-expr.workspace = true
datafusion-physical-plan.workspace = true

View File

@@ -24,6 +24,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use common_time::Timestamp;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::future::join_all;
@@ -141,42 +142,71 @@ impl ScannerPartitioning {
}
}
/// Represents one data range within a partition
#[derive(Debug, Clone, Copy)]
pub struct PartitionRange {
/// Start time of time index column. Inclusive.
pub start: Timestamp,
/// End time of time index column. Inclusive.
pub end: Timestamp,
/// Estimate size of this range. Is used to balance ranges between partitions.
/// No base unit, just a number.
pub estimated_size: usize,
/// Identifier to this range. Assigned by storage engine.
pub identifier: usize,
}
/// Properties of the [RegionScanner].
#[derive(Debug)]
pub struct ScannerProperties {
/// Partitions to scan.
partitioning: ScannerPartitioning,
/// A 2-dim partition ranges.
///
/// The first dim vector's length represents the output partition number. The second
/// dim is ranges within one partition.
pub partitions: Vec<Vec<PartitionRange>>,
}
impl ScannerProperties {
/// Creates a new [ScannerProperties] with the given partitioning.
pub fn new(partitioning: ScannerPartitioning) -> Self {
Self { partitioning }
/// Creates a new [`ScannerProperties`] with the given partitioning.
pub fn new(partitions: Vec<Vec<PartitionRange>>) -> Self {
Self { partitions }
}
/// Returns properties of partitions to scan.
pub fn partitioning(&self) -> &ScannerPartitioning {
&self.partitioning
/// Creates a new [`ScannerProperties`] with the given number of partitions.
pub fn new_with_partitions(partitions: usize) -> Self {
Self {
partitions: vec![vec![]; partitions],
}
}
pub fn num_partitions(&self) -> usize {
self.partitions.len()
}
}
/// A scanner that provides a way to scan the region concurrently.
/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
/// You can use this trait to implement an [ExecutionPlan](datafusion_physical_plan::ExecutionPlan).
pub trait RegionScanner: Debug + DisplayAs + Send + Sync {
/// You can use this trait to implement an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan).
pub trait RegionScanner: Debug + DisplayAs + Send {
/// Returns the properties of the scanner.
fn properties(&self) -> &ScannerProperties;
/// Returns the schema of the record batches.
fn schema(&self) -> SchemaRef;
/// Prepares the scanner with the given partition ranges.
///
/// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError>;
/// Scans the partition and returns a stream of record batches.
///
/// # Panics
/// Panics if the `partition` is out of bound.
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError>;
}
pub type RegionScannerRef = Arc<dyn RegionScanner>;
pub type RegionScannerRef = Box<dyn RegionScanner>;
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
@@ -272,7 +302,7 @@ impl SinglePartitionScanner {
Self {
stream: Mutex::new(Some(stream)),
schema,
properties: ScannerProperties::new(ScannerPartitioning::Unknown(1)),
properties: ScannerProperties::new_with_partitions(1),
}
}
}
@@ -292,6 +322,11 @@ impl RegionScanner for SinglePartitionScanner {
self.schema.clone()
}
fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
self.properties = ScannerProperties::new(ranges);
Ok(())
}
fn scan_partition(&self, _partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
let mut stream = self.stream.lock().unwrap();
stream.take().ok_or_else(|| {

View File

@@ -110,7 +110,7 @@ impl TableProvider for DfTableProviderAdapter {
.collect::<Vec<_>>()
});
let scanner = Arc::new(SinglePartitionScanner::new(stream));
let scanner = Box::new(SinglePartitionScanner::new(stream));
let mut plan = RegionScanExec::new(scanner);
if let Some(sort_expr) = sort_expr {
plan = plan.with_output_ordering(sort_expr);

View File

@@ -14,9 +14,10 @@
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use common_error::ext::BoxedError;
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::TracingContext;
@@ -31,14 +32,14 @@ use datafusion_common::DataFusionError;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use futures::{Stream, StreamExt};
use store_api::region_engine::RegionScannerRef;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use crate::table::metrics::MemoryUsageMetrics;
/// A plan to read multiple partitions from a region of a table.
#[derive(Debug)]
pub struct RegionScanExec {
scanner: RegionScannerRef,
scanner: Mutex<RegionScannerRef>,
arrow_schema: ArrowSchemaRef,
/// The expected output ordering for the plan.
output_ordering: Option<Vec<PhysicalSortExpr>>,
@@ -50,7 +51,7 @@ impl RegionScanExec {
pub fn new(scanner: RegionScannerRef) -> Self {
let arrow_schema = scanner.schema().arrow_schema().clone();
let scanner_props = scanner.properties();
let mut num_output_partition = scanner_props.partitioning().num_partitions();
let mut num_output_partition = scanner_props.num_partitions();
// The meaning of word "partition" is different in different context. For datafusion
// it's about "parallelism" and for storage it's about "data range". Thus here we add
// a special case to handle the situation where the number of storage partition is 0.
@@ -63,7 +64,7 @@ impl RegionScanExec {
ExecutionMode::Bounded,
);
Self {
scanner,
scanner: Mutex::new(scanner),
arrow_schema,
output_ordering: None,
metric: ExecutionPlanMetricsSet::new(),
@@ -76,6 +77,27 @@ impl RegionScanExec {
self.output_ordering = Some(output_ordering);
self
}
/// Get the partition ranges of the scanner. This method will collapse the ranges into
/// a single vector.
pub fn get_partition_ranges(&self) -> Vec<PartitionRange> {
let scanner = self.scanner.lock().unwrap();
let raw_ranges = &scanner.properties().partitions;
// collapse the ranges
let mut ranges = Vec::with_capacity(raw_ranges.len());
for partition in raw_ranges {
ranges.extend_from_slice(partition);
}
ranges
}
/// Update the partition ranges of underlying scanner.
pub fn set_partitions(&self, partitions: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(partitions)
}
}
impl ExecutionPlan for RegionScanExec {
@@ -113,6 +135,8 @@ impl ExecutionPlan for RegionScanExec {
let stream = self
.scanner
.lock()
.unwrap()
.scan_partition(partition)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
@@ -131,7 +155,10 @@ impl ExecutionPlan for RegionScanExec {
impl DisplayAs for RegionScanExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
// The scanner contains all information needed to display the plan.
self.scanner.fmt_as(t, f)
match self.scanner.try_lock() {
Ok(scanner) => scanner.fmt_as(t, f),
Err(_) => write!(f, "RegionScanExec <locked>"),
}
}
}
@@ -217,7 +244,7 @@ mod test {
RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
let stream = recordbatches.as_stream();
let scanner = Arc::new(SinglePartitionScanner::new(stream));
let scanner = Box::new(SinglePartitionScanner::new(stream));
let plan = RegionScanExec::new(scanner);
let actual: SchemaRef = Arc::new(
plan.properties

View File

@@ -137,6 +137,11 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
|_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after parallelize_scan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after OutputRequirements_| OutputRequirementExec_|
|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|