fix: prune memtable/files range independently in each partition (#4998)

* feat: prune in each partition

* chore: change pick log to trace

* chore: add in progress partition scan to metrics

* feat: seqscan support pruning in partition

* chore: remove commented codes
Yingwen
2024-11-19 20:43:30 +08:00
committed by GitHub
parent 2f260d8b27
commit 63bbfd04c7
7 changed files with 245 additions and 178 deletions


@@ -16,7 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use common_telemetry::{debug, info};
use common_telemetry::{info, trace};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
@@ -114,7 +114,7 @@ impl TwcsPicker {
// Files in window exceeds file num limit
vec![enforce_file_num(&files.files, max_files)]
} else {
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
continue;
};


@@ -26,6 +26,8 @@ pub const FLUSH_REASON: &str = "reason";
pub const FILE_TYPE_LABEL: &str = "file_type";
/// Region worker id label.
pub const WORKER_LABEL: &str = "worker";
/// Partition label.
pub const PARTITION_LABEL: &str = "partition";
lazy_static! {
/// Global write buffer size in bytes.
@@ -134,6 +136,13 @@ lazy_static! {
)
.unwrap();
pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
/// Number of in-progress scans per partition.
pub static ref IN_PROGRESS_SCAN: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_in_progress_scan",
"mito in progress scan per partition",
&[TYPE_LABEL, PARTITION_LABEL]
)
.unwrap();
/// Counter of rows read from different source.
pub static ref READ_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();


@@ -14,15 +14,22 @@
//! Structs for partition ranges.
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use common_time::Timestamp;
use parquet::arrow::arrow_reader::RowSelection;
use smallvec::{smallvec, SmallVec};
use store_api::region_engine::PartitionRange;
use crate::cache::CacheManager;
use crate::memtable::MemtableRef;
use crate::error::Result;
use crate::memtable::{MemtableRange, MemtableRef};
use crate::read::scan_region::ScanInput;
use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
use crate::sst::parquet::format::parquet_row_group_time_range;
use crate::sst::parquet::reader::ReaderMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
const ALL_ROW_GROUPS: i64 = -1;
@@ -334,6 +341,160 @@ fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
new_ranges
}
/// Builder to create file ranges.
#[derive(Default)]
pub(crate) struct FileRangeBuilder {
/// Context for the file.
/// None indicates nothing to read.
context: Option<FileRangeContextRef>,
/// Row selections for each row group to read.
/// It skips the row group if it is not in the map.
row_groups: BTreeMap<usize, Option<RowSelection>>,
}
impl FileRangeBuilder {
/// Builds a file range builder from context and row groups.
pub(crate) fn new(
context: FileRangeContextRef,
row_groups: BTreeMap<usize, Option<RowSelection>>,
) -> Self {
Self {
context: Some(context),
row_groups,
}
}
/// Builds file ranges to read.
/// Negative `row_group_index` indicates all row groups.
pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
let Some(context) = self.context.clone() else {
return;
};
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(row_selection) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(FileRange::new(
context,
row_group_index,
row_selection.clone(),
));
} else {
// Scans all row groups.
ranges.extend(
self.row_groups
.iter()
.map(|(row_group_index, row_selection)| {
FileRange::new(context.clone(), *row_group_index, row_selection.clone())
}),
);
}
}
}
/// Builder to create mem ranges.
pub(crate) struct MemRangeBuilder {
/// Ranges of a memtable.
row_groups: BTreeMap<usize, MemtableRange>,
}
impl MemRangeBuilder {
/// Builds a mem range builder from row groups.
pub(crate) fn new(row_groups: BTreeMap<usize, MemtableRange>) -> Self {
Self { row_groups }
}
/// Builds mem ranges to read in the memtable.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) {
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(range) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(range.clone());
} else {
ranges.extend(self.row_groups.values().cloned());
}
}
}
/// List that manages the builders used to create file and memtable ranges.
/// Each scan partition should have its own list. The mutexes inside this list
/// allow it to be shared across streams of the same partition.
pub(crate) struct RangeBuilderList {
num_memtables: usize,
mem_builders: Mutex<Vec<Option<MemRangeBuilder>>>,
file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
}
impl RangeBuilderList {
/// Creates a new [RangeBuilderList] with the given number of memtables and files.
pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
let mem_builders = (0..num_memtables).map(|_| None).collect();
let file_builders = (0..num_files).map(|_| None).collect();
Self {
num_memtables,
mem_builders: Mutex::new(mem_builders),
file_builders: Mutex::new(file_builders),
}
}
/// Builds file ranges to read the row group at `index`.
pub(crate) async fn build_file_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
reader_metrics: &mut ReaderMetrics,
) -> Result<SmallVec<[FileRange; 2]>> {
let mut ranges = SmallVec::new();
let file_index = index.index - self.num_memtables;
let builder_opt = self.get_file_builder(file_index);
match builder_opt {
Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
None => {
let builder = input.prune_file(file_index, reader_metrics).await?;
builder.build_ranges(index.row_group_index, &mut ranges);
self.set_file_builder(file_index, Arc::new(builder));
}
}
Ok(ranges)
}
/// Builds mem ranges to read the row group at `index`.
pub(crate) fn build_mem_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
) -> SmallVec<[MemtableRange; 2]> {
let mut ranges = SmallVec::new();
let mut mem_builders = self.mem_builders.lock().unwrap();
match &mut mem_builders[index.index] {
Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
None => {
let builder = input.prune_memtable(index.index);
builder.build_ranges(index.row_group_index, &mut ranges);
mem_builders[index.index] = Some(builder);
}
}
ranges
}
fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
let file_builders = self.file_builders.lock().unwrap();
file_builders[index].clone()
}
fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
let mut file_builders = self.file_builders.lock().unwrap();
file_builders[index] = Some(builder);
}
}
#[cfg(test)]
mod tests {
use common_time::timestamp::TimeUnit;
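
The hunk above introduces three pieces: FileRangeBuilder and MemRangeBuilder hold pruning results keyed by row group (a negative row_group_index, i.e. ALL_ROW_GROUPS, means "read every row group"), and RangeBuilderList lazily builds and caches one builder per memtable/file behind a mutex, so a scan partition prunes a given source at most once and reuses the result for its remaining ranges. The sketch below is a minimal, standard-library-only illustration of that shape; ToyBuilder, ToyBuilderList and prune_source are made-up names rather than the mito2 API, and async and error handling are omitted.

use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

// Sentinel mirroring ALL_ROW_GROUPS above: a negative index selects every row group.
const ALL: i64 = -1;

// Stand-in for FileRangeBuilder/MemRangeBuilder: pruning results keyed by row group.
struct ToyBuilder {
    row_groups: BTreeMap<usize, String>,
}

impl ToyBuilder {
    fn build_ranges(&self, row_group_index: i64, out: &mut Vec<String>) {
        if row_group_index >= 0 {
            // Scan a single row group; skip it silently if pruning removed it.
            if let Some(range) = self.row_groups.get(&(row_group_index as usize)) {
                out.push(range.clone());
            }
        } else {
            // Scan all remaining row groups.
            out.extend(self.row_groups.values().cloned());
        }
    }
}

// Stand-in for RangeBuilderList: one lazily filled slot per source, so a partition
// prunes each memtable/file at most once and later ranges reuse the cached result.
struct ToyBuilderList {
    builders: Mutex<Vec<Option<Arc<ToyBuilder>>>>,
}

impl ToyBuilderList {
    fn new(num_sources: usize) -> Self {
        Self {
            builders: Mutex::new(vec![None; num_sources]),
        }
    }

    fn build_ranges(&self, source: usize, row_group_index: i64) -> Vec<String> {
        let mut slots = self.builders.lock().unwrap();
        let builder = slots[source]
            // Expensive pruning runs only on the first access to this source.
            .get_or_insert_with(|| Arc::new(prune_source(source)))
            .clone();
        drop(slots); // release the lock before materializing ranges
        let mut ranges = Vec::new();
        builder.build_ranges(row_group_index, &mut ranges);
        ranges
    }
}

// Placeholder for the real pruning done by ScanInput::prune_file / prune_memtable.
fn prune_source(source: usize) -> ToyBuilder {
    ToyBuilder {
        row_groups: (0..3).map(|i| (i, format!("source{source}-rg{i}"))).collect(),
    }
}

fn main() {
    let per_partition_list = ToyBuilderList::new(2);
    println!("{:?}", per_partition_list.build_ranges(0, 1));   // one row group of source 0
    println!("{:?}", per_partition_list.build_ranges(0, ALL)); // all row groups, builder reused
}

Because each partition owns its own list, there is no cross-partition locking or shared pruning state, which is the point of this change.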


@@ -14,9 +14,9 @@
//! Scans a region according to the scan request.
use std::collections::{BTreeMap, HashSet};
use std::collections::HashSet;
use std::fmt;
use std::sync::{Arc, Mutex as StdMutex};
use std::sync::Arc;
use std::time::Instant;
use common_error::ext::BoxedError;
@@ -24,23 +24,21 @@ use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_expr::utils::expr_to_columns;
use parquet::arrow::arrow_reader::RowSelection;
use smallvec::SmallVec;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Mutex, Semaphore};
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::memtable::{MemtableRange, MemtableRef};
use crate::memtable::MemtableRef;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{RangeMeta, RowGroupIndex};
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
@@ -51,7 +49,6 @@ use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBui
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
use crate::sst::parquet::reader::ReaderMetrics;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
@@ -639,14 +636,14 @@ impl ScanInput {
}
/// Prunes a memtable to scan and returns the builder to build readers.
fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
let memtable = &self.memtables[mem_index];
let row_groups = memtable.ranges(Some(self.mapper.column_ids()), self.predicate.clone());
MemRangeBuilder { row_groups }
MemRangeBuilder::new(row_groups)
}
/// Prunes a file to scan and returns the builder to build readers.
async fn prune_file(
pub(crate) async fn prune_file(
&self,
file_index: usize,
reader_metrics: &mut ReaderMetrics,
@@ -687,10 +684,7 @@ impl ScanInput {
)?;
file_range_ctx.set_compat_batch(Some(compat));
}
Ok(FileRangeBuilder {
context: Some(Arc::new(file_range_ctx)),
row_groups,
})
Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), row_groups))
}
/// Scans the input source in another task and sends batches to the sender.
@@ -759,8 +753,6 @@ pub(crate) struct StreamContext {
pub(crate) input: ScanInput,
/// Metadata for partition ranges.
pub(crate) ranges: Vec<RangeMeta>,
/// Lists of range builders.
range_builders: RangeBuilderList,
// Metrics:
/// The start time of the query.
@@ -773,12 +765,10 @@ impl StreamContext {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input);
READ_SST_COUNT.observe(input.num_files() as f64);
let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files());
Self {
input,
ranges,
range_builders,
query_start,
}
}
@@ -788,12 +778,10 @@ impl StreamContext {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::unordered_scan_ranges(&input);
READ_SST_COUNT.observe(input.num_files() as f64);
let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files());
Self {
input,
ranges,
range_builders,
query_start,
}
}
@@ -803,27 +791,6 @@ impl StreamContext {
self.input.num_memtables() > index.index
}
/// Creates file ranges to scan.
pub(crate) async fn build_file_ranges(
&self,
index: RowGroupIndex,
reader_metrics: &mut ReaderMetrics,
) -> Result<SmallVec<[FileRange; 2]>> {
let mut ranges = SmallVec::new();
self.range_builders
.build_file_ranges(&self.input, index, &mut ranges, reader_metrics)
.await?;
Ok(ranges)
}
/// Creates memtable ranges to scan.
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
let mut ranges = SmallVec::new();
self.range_builders
.build_mem_ranges(&self.input, index, &mut ranges);
ranges
}
/// Retrieves the partition ranges.
pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
self.ranges
@@ -859,125 +826,3 @@ impl StreamContext {
Ok(())
}
}
/// List to manages the builders to create file ranges.
struct RangeBuilderList {
mem_builders: Vec<StdMutex<Option<MemRangeBuilder>>>,
file_builders: Vec<Mutex<Option<FileRangeBuilder>>>,
}
impl RangeBuilderList {
/// Creates a new [ReaderBuilderList] with the given number of memtables and files.
fn new(num_memtables: usize, num_files: usize) -> Self {
let mem_builders = (0..num_memtables).map(|_| StdMutex::new(None)).collect();
let file_builders = (0..num_files).map(|_| Mutex::new(None)).collect();
Self {
mem_builders,
file_builders,
}
}
/// Builds file ranges to read the row group at `index`.
async fn build_file_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
ranges: &mut SmallVec<[FileRange; 2]>,
reader_metrics: &mut ReaderMetrics,
) -> Result<()> {
let file_index = index.index - self.mem_builders.len();
let mut builder_opt = self.file_builders[file_index].lock().await;
match &mut *builder_opt {
Some(builder) => builder.build_ranges(index.row_group_index, ranges),
None => {
let builder = input.prune_file(file_index, reader_metrics).await?;
builder.build_ranges(index.row_group_index, ranges);
*builder_opt = Some(builder);
}
}
Ok(())
}
/// Builds mem ranges to read the row group at `index`.
fn build_mem_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
ranges: &mut SmallVec<[MemtableRange; 2]>,
) {
let mut builder_opt = self.mem_builders[index.index].lock().unwrap();
match &mut *builder_opt {
Some(builder) => builder.build_ranges(index.row_group_index, ranges),
None => {
let builder = input.prune_memtable(index.index);
builder.build_ranges(index.row_group_index, ranges);
*builder_opt = Some(builder);
}
}
}
}
/// Builder to create file ranges.
#[derive(Default)]
struct FileRangeBuilder {
/// Context for the file.
/// None indicates nothing to read.
context: Option<FileRangeContextRef>,
/// Row selections for each row group to read.
/// It skips the row group if it is not in the map.
row_groups: BTreeMap<usize, Option<RowSelection>>,
}
impl FileRangeBuilder {
/// Builds file ranges to read.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
let Some(context) = self.context.clone() else {
return;
};
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(row_selection) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(FileRange::new(
context,
row_group_index,
row_selection.clone(),
));
} else {
// Scans all row groups.
ranges.extend(
self.row_groups
.iter()
.map(|(row_group_index, row_selection)| {
FileRange::new(context.clone(), *row_group_index, row_selection.clone())
}),
);
}
}
}
/// Builder to create mem ranges.
struct MemRangeBuilder {
/// Ranges of a memtable.
row_groups: BTreeMap<usize, MemtableRange>,
}
impl MemRangeBuilder {
/// Builds mem ranges to read in the memtable.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) {
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(range) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(range.clone());
} else {
ranges.extend(self.row_groups.values().cloned());
}
}
}


@@ -20,10 +20,12 @@ use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_telemetry::debug;
use futures::Stream;
use prometheus::IntGauge;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::read::range::RowGroupIndex;
use crate::metrics::IN_PROGRESS_SCAN;
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
@@ -41,6 +43,7 @@ struct PartitionMetricsInner {
first_poll: Duration,
metrics: ScannerMetrics,
reader_metrics: ReaderMetrics,
in_progress_scan: IntGauge,
}
impl PartitionMetricsInner {
@@ -56,6 +59,7 @@ impl Drop for PartitionMetricsInner {
fn drop(&mut self) {
self.on_finish();
self.metrics.observe_metrics();
self.in_progress_scan.dec();
debug!(
"{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}",
@@ -76,6 +80,8 @@ impl PartitionMetrics {
query_start: Instant,
metrics: ScannerMetrics,
) -> Self {
let partition_str = partition.to_string();
let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
let inner = PartitionMetricsInner {
region_id,
partition,
@@ -84,6 +90,7 @@ impl PartitionMetrics {
first_poll: Duration::default(),
metrics,
reader_metrics: ReaderMetrics::default(),
in_progress_scan,
};
Self(Arc::new(Mutex::new(inner)))
}
@@ -130,9 +137,10 @@ pub(crate) fn scan_mem_ranges(
part_metrics: PartitionMetrics,
index: RowGroupIndex,
time_range: FileTimeRange,
range_builder_list: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let ranges = stream_ctx.build_mem_ranges(index);
let ranges = range_builder_list.build_mem_ranges(&stream_ctx.input, index);
part_metrics.inc_num_mem_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
@@ -153,17 +161,18 @@ pub(crate) fn scan_file_ranges(
part_metrics: PartitionMetrics,
index: RowGroupIndex,
read_type: &'static str,
range_builder: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let mut reader_metrics = ReaderMetrics::default();
let ranges = stream_ctx
.build_file_ranges(index, &mut reader_metrics)
.await?;
let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?;
part_metrics.inc_num_file_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
let reader = range.reader(None).await?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
let compat_batch = range.compat_batch();
let mut source = Source::PruneReader(reader);
while let Some(mut batch) = source.next_batch().await? {
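
PartitionMetricsInner now holds the IN_PROGRESS_SCAN child gauge selected by scanner type and partition label and decrements it in its Drop impl; the matching increment is not visible in this hunk. Below is a minimal sketch of that decrement-on-drop pattern using the prometheus and lazy_static crates the module already relies on; ScanGuard and the metric name are illustrative, and the inc() call is an assumption about where the increment happens rather than a copy of the real code.

use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

lazy_static! {
    // Counterpart of IN_PROGRESS_SCAN: one child gauge per (scanner type, partition).
    static ref IN_PROGRESS: IntGaugeVec = register_int_gauge_vec!(
        "example_in_progress_scan",
        "in-progress scans per partition",
        &["type", "partition"]
    )
    .unwrap();
}

// Guard that mirrors PartitionMetricsInner: keep the gauge raised while a
// partition scan is alive and lower it again when the struct is dropped.
struct ScanGuard {
    in_progress_scan: IntGauge,
}

impl ScanGuard {
    fn new(scanner_type: &str, partition: usize) -> Self {
        let in_progress_scan =
            IN_PROGRESS.with_label_values(&[scanner_type, &partition.to_string()]);
        in_progress_scan.inc(); // assumed increment site; not shown in the hunk above
        Self { in_progress_scan }
    }
}

impl Drop for ScanGuard {
    fn drop(&mut self) {
        // Same idea as PartitionMetricsInner::drop: the gauge goes back down
        // even if the scan ends early or with an error.
        self.in_progress_scan.dec();
    }
}

fn main() {
    {
        let _guard = ScanGuard::new("unordered_scan", 0);
        // ... scanning work for partition 0 happens here ...
    } // gauge for ("unordered_scan", "0") decremented on drop
    println!(
        "gauge value: {}",
        IN_PROGRESS.with_label_values(&["unordered_scan", "0"]).get()
    );
}

Tying the decrement to Drop keeps the gauge from leaking when a partition stream is cancelled mid-scan, which matches how PartitionMetricsInner::drop also flushes the remaining scanner metrics.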


@@ -36,6 +36,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
@@ -131,12 +132,17 @@ impl SeqScan {
part_metrics: &PartitionMetrics,
) -> Result<BoxedBatchReader> {
let mut sources = Vec::new();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
for part_range in partition_ranges {
build_sources(
stream_ctx,
part_range,
compaction,
part_metrics,
range_builder_list.clone(),
&mut sources,
);
}
@@ -219,10 +225,21 @@ impl SeqScan {
let stream = try_stream! {
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
// Scans each part.
for part_range in partition_ranges {
let mut sources = Vec::new();
build_sources(&stream_ctx, &part_range, compaction, &part_metrics, &mut sources);
build_sources(
&stream_ctx,
&part_range,
compaction,
&part_metrics,
range_builder_list.clone(),
&mut sources,
);
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
@@ -353,6 +370,7 @@ fn build_sources(
part_range: &PartitionRange,
compaction: bool,
part_metrics: &PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
sources: &mut Vec<Source>,
) {
// Gets range meta.
@@ -365,6 +383,7 @@ fn build_sources(
part_metrics.clone(),
*index,
range_meta.time_range,
range_builder_list.clone(),
);
Box::pin(stream) as _
} else {
@@ -373,8 +392,13 @@ fn build_sources(
} else {
"seq_scan_files"
};
let stream =
scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, read_type);
let stream = scan_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
read_type,
range_builder_list.clone(),
);
Box::pin(stream) as _
};
sources.push(Source::Stream(stream));


@@ -30,6 +30,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::{Batch, ScannerMetrics};
@@ -84,18 +85,31 @@ impl UnorderedScan {
stream_ctx: Arc<StreamContext>,
part_range_id: usize,
part_metrics: PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
stream! {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range_id];
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
let stream = scan_mem_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
range_meta.time_range,
range_builder_list.clone(),
);
for await batch in stream {
yield batch;
}
} else {
let stream = scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files");
let stream = scan_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
"unordered_scan_files",
range_builder_list.clone(),
);
for await batch in stream {
yield batch;
}
@@ -136,6 +150,10 @@ impl UnorderedScan {
part_metrics.on_first_poll();
let cache = &stream_ctx.input.cache_manager;
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
// Scans each part.
for part_range in part_ranges {
let mut metrics = ScannerMetrics::default();
@@ -149,6 +167,7 @@ impl UnorderedScan {
stream_ctx.clone(),
part_range.identifier,
part_metrics.clone(),
range_builder_list.clone(),
);
for await batch in stream {
let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;