mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-16 10:12:58 +00:00
feat: scanner prepare by request
This commit is contained in:
@@ -625,7 +625,6 @@ impl ScanInput {
|
||||
return Ok(sources);
|
||||
}
|
||||
|
||||
debug_assert!(self.parallelism.parallelism > 1);
|
||||
// Spawn a task for each source.
|
||||
let sources = sources
|
||||
.into_iter()
|
||||
|
||||
@@ -28,7 +28,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
|
||||
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
|
||||
use store_api::storage::TimeSeriesRowSelector;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
@@ -135,7 +135,8 @@ impl SeqScan {
|
||||
Self::build_reader_from_sources(stream_ctx, sources, None).await
|
||||
}
|
||||
|
||||
// TODO(yingwen): Only merge and dedup when num source > 1.
|
||||
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
|
||||
/// if possible.
|
||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
async fn build_reader_from_sources(
|
||||
stream_ctx: &StreamContext,
|
||||
@@ -143,11 +144,12 @@ impl SeqScan {
|
||||
semaphore: Option<Arc<Semaphore>>,
|
||||
) -> Result<BoxedBatchReader> {
|
||||
if let Some(semaphore) = semaphore.as_ref() {
|
||||
// Read sources in parallel. We always spawn a task so we can control the parallelism
|
||||
// by the semaphore.
|
||||
sources = stream_ctx
|
||||
.input
|
||||
.create_parallel_sources(sources, semaphore.clone())?;
|
||||
// Read sources in parallel.
|
||||
if sources.len() > 1 {
|
||||
sources = stream_ctx
|
||||
.input
|
||||
.create_parallel_sources(sources, semaphore.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
let mut builder = MergeReaderBuilder::from_sources(sources);
|
||||
@@ -194,11 +196,20 @@ impl SeqScan {
|
||||
}
|
||||
|
||||
let stream_ctx = self.stream_ctx.clone();
|
||||
// FIXME(yingwen): 1. Get target partition from prepare. 2. Get parallelism from prepare.
|
||||
let semaphore = Arc::new(Semaphore::new(self.properties.partitions.len()));
|
||||
let semaphore = if self.properties.target_partitions() > self.properties.num_partitions() {
|
||||
// We can use addtional tasks to read the data. This semaphore is partition level.
|
||||
// We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
|
||||
// of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
|
||||
// files in a part range.
|
||||
Some(Arc::new(Semaphore::new(
|
||||
self.properties.target_partitions() - self.properties.num_partitions() + 1,
|
||||
)))
|
||||
} else {
|
||||
Some(Arc::new(Semaphore::new(1)))
|
||||
};
|
||||
let partition_ranges = self.properties.partitions[partition].clone();
|
||||
let compaction = self.compaction;
|
||||
let distinguish_range = self.properties.distinguish_partition_range();
|
||||
let distinguish_range = self.properties.distinguish_partition_range;
|
||||
let part_metrics = PartitionMetrics::new(
|
||||
self.stream_ctx.input.mapper.metadata().region_id,
|
||||
partition,
|
||||
@@ -230,7 +241,7 @@ impl SeqScan {
|
||||
);
|
||||
|
||||
let mut reader =
|
||||
Self::build_reader_from_sources(&stream_ctx, sources, Some(semaphore.clone()))
|
||||
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
@@ -313,13 +324,8 @@ impl RegionScanner for SeqScan {
|
||||
self.scan_partition_impl(partition)
|
||||
}
|
||||
|
||||
fn prepare(
|
||||
&mut self,
|
||||
ranges: Vec<Vec<PartitionRange>>,
|
||||
distinguish_partition_range: bool,
|
||||
) -> Result<(), BoxedError> {
|
||||
self.properties.partitions = ranges;
|
||||
self.properties.distinguish_partition_range = distinguish_partition_range;
|
||||
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
|
||||
self.properties.prepare(request);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use datatypes::schema::SchemaRef;
|
||||
use futures::{Stream, StreamExt};
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
|
||||
use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
|
||||
|
||||
use crate::error::{PartitionOutOfRangeSnafu, Result};
|
||||
use crate::read::range::RangeBuilderList;
|
||||
@@ -144,7 +144,7 @@ impl UnorderedScan {
|
||||
);
|
||||
let stream_ctx = self.stream_ctx.clone();
|
||||
let part_ranges = self.properties.partitions[partition].clone();
|
||||
let distinguish_range = self.properties.distinguish_partition_range();
|
||||
let distinguish_range = self.properties.distinguish_partition_range;
|
||||
|
||||
let stream = try_stream! {
|
||||
part_metrics.on_first_poll();
|
||||
@@ -231,13 +231,8 @@ impl RegionScanner for UnorderedScan {
|
||||
self.stream_ctx.input.mapper.output_schema()
|
||||
}
|
||||
|
||||
fn prepare(
|
||||
&mut self,
|
||||
ranges: Vec<Vec<PartitionRange>>,
|
||||
distinguish_partition_range: bool,
|
||||
) -> Result<(), BoxedError> {
|
||||
self.properties.partitions = ranges;
|
||||
self.properties.distinguish_partition_range = distinguish_partition_range;
|
||||
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
|
||||
self.properties.prepare(request);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@ impl ParallelizeScan {
|
||||
|
||||
// update the partition ranges
|
||||
let new_exec = region_scan_exec
|
||||
.with_new_partitions(partition_ranges)
|
||||
.with_new_partitions(partition_ranges, expected_partition_num)
|
||||
.map_err(|e| DataFusionError::External(e.into_inner()))?;
|
||||
return Ok(Transformed::yes(Arc::new(new_exec)));
|
||||
}
|
||||
|
||||
@@ -206,16 +206,13 @@ pub struct ScannerProperties {
|
||||
|
||||
/// Whether to yield an empty batch to distinguish partition ranges.
|
||||
pub distinguish_partition_range: bool,
|
||||
|
||||
/// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions.
|
||||
target_partitions: usize,
|
||||
}
|
||||
|
||||
impl ScannerProperties {
|
||||
/// Initialize partitions with given parallelism for scanner.
|
||||
pub fn with_parallelism(mut self, parallelism: usize) -> Self {
|
||||
self.partitions = vec![vec![]; parallelism];
|
||||
self
|
||||
}
|
||||
|
||||
/// Set append mode for scanner.
|
||||
/// Sets append mode for scanner.
|
||||
pub fn with_append_mode(mut self, append_mode: bool) -> Self {
|
||||
self.append_mode = append_mode;
|
||||
self
|
||||
@@ -234,9 +231,24 @@ impl ScannerProperties {
|
||||
append_mode,
|
||||
total_rows,
|
||||
distinguish_partition_range: false,
|
||||
target_partitions: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the properties with the given [PrepareRequest].
|
||||
pub fn prepare(&mut self, request: PrepareRequest) {
|
||||
if let Some(ranges) = request.ranges {
|
||||
self.partitions = ranges;
|
||||
}
|
||||
if let Some(distinguish_partition_range) = request.distinguish_partition_range {
|
||||
self.distinguish_partition_range = distinguish_partition_range;
|
||||
}
|
||||
if let Some(target_partitions) = request.target_partitions {
|
||||
self.target_partitions = target_partitions;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of actual partitions.
|
||||
pub fn num_partitions(&self) -> usize {
|
||||
self.partitions.len()
|
||||
}
|
||||
@@ -249,8 +261,44 @@ impl ScannerProperties {
|
||||
self.total_rows
|
||||
}
|
||||
|
||||
pub fn distinguish_partition_range(&self) -> bool {
|
||||
self.distinguish_partition_range
|
||||
/// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
|
||||
pub fn target_partitions(&self) -> usize {
|
||||
if self.target_partitions == 0 {
|
||||
self.num_partitions()
|
||||
} else {
|
||||
self.target_partitions
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Request to override the scanner properties.
|
||||
#[derive(Default)]
|
||||
pub struct PrepareRequest {
|
||||
/// Assigned partition ranges.
|
||||
pub ranges: Option<Vec<Vec<PartitionRange>>>,
|
||||
/// Distringuishes partition range by empty batches.
|
||||
pub distinguish_partition_range: Option<bool>,
|
||||
/// The expected number of target partitions.
|
||||
pub target_partitions: Option<usize>,
|
||||
}
|
||||
|
||||
impl PrepareRequest {
|
||||
/// Sets the ranges.
|
||||
pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
|
||||
self.ranges = Some(ranges);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the distinguish partition range flag.
|
||||
pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
|
||||
self.distinguish_partition_range = Some(distinguish_partition_range);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the target partitions.
|
||||
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
|
||||
self.target_partitions = Some(target_partitions);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,11 +319,7 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
|
||||
/// Prepares the scanner with the given partition ranges.
|
||||
///
|
||||
/// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
|
||||
fn prepare(
|
||||
&mut self,
|
||||
ranges: Vec<Vec<PartitionRange>>,
|
||||
distinguish_partition_range: bool,
|
||||
) -> Result<(), BoxedError>;
|
||||
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
|
||||
|
||||
/// Scans the partition and returns a stream of record batches.
|
||||
///
|
||||
@@ -431,9 +475,7 @@ impl SinglePartitionScanner {
|
||||
Self {
|
||||
stream: Mutex::new(Some(stream)),
|
||||
schema,
|
||||
properties: ScannerProperties::default()
|
||||
.with_parallelism(1)
|
||||
.with_append_mode(append_mode),
|
||||
properties: ScannerProperties::default().with_append_mode(append_mode),
|
||||
metadata,
|
||||
}
|
||||
}
|
||||
@@ -454,13 +496,8 @@ impl RegionScanner for SinglePartitionScanner {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn prepare(
|
||||
&mut self,
|
||||
ranges: Vec<Vec<PartitionRange>>,
|
||||
distinguish_partition_range: bool,
|
||||
) -> Result<(), BoxedError> {
|
||||
self.properties.partitions = ranges;
|
||||
self.properties.distinguish_partition_range = distinguish_partition_range;
|
||||
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
|
||||
self.properties.prepare(request);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ use datafusion_common::{ColumnStatistics, DataFusionError, Statistics};
|
||||
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
|
||||
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
|
||||
use futures::{Stream, StreamExt};
|
||||
use store_api::region_engine::{PartitionRange, RegionScannerRef};
|
||||
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScannerRef};
|
||||
|
||||
use crate::table::metrics::StreamMetrics;
|
||||
|
||||
@@ -112,6 +112,7 @@ impl RegionScanExec {
|
||||
pub fn with_new_partitions(
|
||||
&self,
|
||||
partitions: Vec<Vec<PartitionRange>>,
|
||||
target_partitions: usize,
|
||||
) -> Result<Self, BoxedError> {
|
||||
if self.is_partition_set {
|
||||
warn!("Setting partition ranges more than once for RegionScanExec");
|
||||
@@ -123,8 +124,11 @@ impl RegionScanExec {
|
||||
|
||||
{
|
||||
let mut scanner = self.scanner.lock().unwrap();
|
||||
let distinguish_partition_range = scanner.properties().distinguish_partition_range();
|
||||
scanner.prepare(partitions, distinguish_partition_range)?;
|
||||
scanner.prepare(
|
||||
PrepareRequest::default()
|
||||
.with_ranges(partitions)
|
||||
.with_target_partitions(target_partitions),
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
@@ -141,9 +145,10 @@ impl RegionScanExec {
|
||||
|
||||
pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) {
|
||||
let mut scanner = self.scanner.lock().unwrap();
|
||||
let partition_ranges = scanner.properties().partitions.clone();
|
||||
// set distinguish_partition_range won't fail
|
||||
let _ = scanner.prepare(partition_ranges, distinguish_partition_range);
|
||||
let _ = scanner.prepare(
|
||||
PrepareRequest::default().with_distinguish_partition_range(distinguish_partition_range),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn time_index(&self) -> String {
|
||||
|
||||
Reference in New Issue
Block a user