feat: implement scan logic for each partition

evenyag
2025-03-28 12:16:00 +08:00
parent 4a79c1527d
commit b74e2a7d9b
5 changed files with 369 additions and 20 deletions


@@ -710,8 +710,8 @@ pub enum Error {
error: std::io::Error,
},
#[snafu(display("Failed to filter record batch"))]
FilterRecordBatch {
#[snafu(display("Record batch error"))]
RecordBatch {
source: common_recordbatch::error::Error,
#[snafu(implicit)]
location: Location,
@@ -1021,6 +1021,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to scan series"))]
ScanSeries {
#[snafu(implicit)]
location: Location,
source: Arc<Error>,
},
#[snafu(display("Partition {} scan multiple times", partition))]
ScanMultiTimes {
partition: usize,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1143,7 +1157,7 @@ impl ErrorExt for Error {
External { source, .. } => source.status_code(),
FilterRecordBatch { source, .. } => source.status_code(),
RecordBatch { source, .. } => source.status_code(),
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
ChecksumMismatch { .. } => StatusCode::Unexpected,
@@ -1172,6 +1186,10 @@ impl ErrorExt for Error {
ManualCompactionOverride {} => StatusCode::Cancelled,
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
ScanSeries { source, .. } => source.status_code(),
ScanMultiTimes { .. } => StatusCode::InvalidArguments,
}
}
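For context, the two new variants follow snafu's context-selector pattern: the derive generates one selector per variant that can either build the error directly or convert an Option/Result. A minimal standalone sketch of the ScanMultiTimes flow, with Location and the other variants omitted for brevity:

use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Partition {} scanned multiple times", partition))]
    ScanMultiTimes { partition: usize },
}

/// Converts a missing receiver slot into a ScanMultiTimes error,
/// mirroring how take_receiver below reports a repeated scan.
fn check_slot(slot: Option<u32>, partition: usize) -> Result<u32, Error> {
    slot.context(ScanMultiTimesSnafu { partition })
}

fn main() {
    assert!(check_slot(Some(7), 0).is_ok());
    let err = check_slot(None, 1).unwrap_err();
    assert_eq!(err.to_string(), "Partition 1 scanned multiple times");
}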


@@ -21,7 +21,7 @@ use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use snafu::ResultExt;
use crate::error::{FilterRecordBatchSnafu, Result};
use crate::error::{RecordBatchSnafu, Result};
use crate::memtable::BoxedBatchIterator;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::{Batch, BatchReader};
@@ -201,7 +201,7 @@ impl PruneTimeIterator {
for filter in filters.iter() {
let result = filter
.evaluate_vector(batch.timestamps())
.context(FilterRecordBatchSnafu)?;
.context(RecordBatchSnafu)?;
mask = mask.bitand(&result);
}


@@ -149,7 +149,7 @@ impl SeqScan {
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
/// if possible.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn build_reader_from_sources(
pub(crate) async fn build_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Option<Arc<Semaphore>>,
@@ -498,7 +498,7 @@ impl fmt::Debug for SeqScan {
}
/// Builds sources for the partition range and pushes them to the `sources` vector.
fn build_sources(
pub(crate) fn build_sources(
stream_ctx: &Arc<StreamContext>,
part_range: &PartitionRange,
compaction: bool,


@@ -15,19 +15,39 @@
//! Per-series scan implementation.
use std::fmt;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use async_stream::stream;
use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::compute::concat_batches;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Semaphore;
use crate::error::PartitionOutOfRangeSnafu;
use crate::error::{
ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
ScanMultiTimesSnafu, ScanSeriesSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::PartitionMetrics;
use crate::read::seq_scan::{build_sources, SeqScan};
use crate::read::{Batch, ScannerMetrics};
/// Timeout to send a batch to a sender.
const SEND_TIMEOUT: Duration = Duration::from_millis(100);
/// List of receivers.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
/// Scans a region and returns sorted rows of a series in the same partition.
///
@@ -39,29 +59,123 @@ pub struct SeriesScan {
properties: ScannerProperties,
/// Context of streams.
stream_ctx: Arc<StreamContext>,
/// Receivers of each partition.
receivers: Mutex<ReceiverList>,
}
impl SeriesScan {
/// Creates a new [SeriesScan].
pub(crate) fn new(input: ScanInput) -> Self {
todo!()
let mut properties = ScannerProperties::default()
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
properties.partitions = vec![stream_ctx.partition_ranges()];
Self {
properties,
stream_ctx,
receivers: Mutex::new(Vec::new()),
}
}
fn scan_partition_impl(
&self,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
if partition >= self.properties.partitions.len() {
if partition >= self.properties.num_partitions() {
return Err(BoxedError::new(
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
all: self.properties.num_partitions(),
}
.build(),
));
}
todo!()
self.maybe_start_distributor();
let part_metrics = new_partition_metrics(&self.stream_ctx, partition);
let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
let stream_ctx = self.stream_ctx.clone();
let stream = try_stream! {
part_metrics.on_first_poll();
let cache = &stream_ctx.input.cache_strategy;
let mut df_record_batches = Vec::new();
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
while let Some(result) = receiver.recv().await {
let series = result.map_err(BoxedError::new).context(ExternalSnafu)?;
let convert_start = Instant::now();
df_record_batches.reserve(series.batches.len());
for batch in series.batches {
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
df_record_batches.push(record_batch.into_df_record_batch());
}
let output_schema = stream_ctx.input.mapper.output_schema();
let df_record_batch =
concat_batches(output_schema.arrow_schema(), &df_record_batches)
.context(ComputeArrowSnafu)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
df_record_batches.clear();
let record_batch =
RecordBatch::try_from_df_record_batch(output_schema, df_record_batch)?;
metrics.convert_cost += convert_start.elapsed();
let yield_start = Instant::now();
yield record_batch;
metrics.yield_cost += yield_start.elapsed();
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
fetch_start = Instant::now();
}
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
}
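Each received SeriesBatch is converted batch-by-batch and then merged with concat_batches, so every series reaches the caller as a single RecordBatch. A minimal sketch of that merge step with plain arrow-rs, using an invented one-column schema:

use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
    // Two small batches belonging to the same series.
    let b1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![1, 2]))])?;
    let b2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![3]))])?;
    // Merge them into one output batch, as the stream above does per series.
    let merged = concat_batches(&schema, &[b1, b2])?;
    assert_eq!(merged.num_rows(), 3);
    Ok(())
}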
/// Takes the receiver for the partition.
fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
let mut rx_list = self.receivers.lock().unwrap();
rx_list[partition]
.take()
.context(ScanMultiTimesSnafu { partition })
}
/// Starts the distributor if the receiver list is empty.
fn maybe_start_distributor(&self) {
let mut rx_list = self.receivers.lock().unwrap();
if !rx_list.is_empty() {
return;
}
let (senders, receivers) = new_channel_list(self.properties.num_partitions());
let mut distributor = SeriesDistributor {
stream_ctx: self.stream_ctx.clone(),
semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
partitions: self.properties.partitions.clone(),
senders,
};
common_runtime::spawn_global(async move {
distributor.execute().await;
});
*rx_list = receivers;
}
// TODO(yingwen): Reuse code.
@@ -86,6 +200,16 @@ impl SeriesScan {
}
}
fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
.map(|_| {
let (sender, receiver) = mpsc::channel(1);
(Some(sender), Some(receiver))
})
.unzip();
(SenderList::new(senders), receivers)
}
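new_channel_list pairs every partition with a bounded channel of capacity 1, and take_receiver hands each Receiver out exactly once so a partition cannot be scanned twice. A minimal sketch of that take-once pattern, with a plain Option in place of the real ScanMultiTimes error:

use std::sync::Mutex;

use tokio::sync::mpsc::{self, Receiver};

struct Scanner {
    receivers: Mutex<Vec<Option<Receiver<u64>>>>,
}

impl Scanner {
    /// Hands out the partition's receiver at most once; a second call
    /// yields None where the real code builds a ScanMultiTimes error.
    fn take_receiver(&self, partition: usize) -> Option<Receiver<u64>> {
        self.receivers.lock().unwrap()[partition].take()
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1);
    let scanner = Scanner {
        receivers: Mutex::new(vec![Some(rx)]),
    };
    drop(tx);
    assert!(scanner.take_receiver(0).is_some());
    // The second scan of partition 0 is rejected.
    assert!(scanner.take_receiver(0).is_none());
}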
impl RegionScanner for SeriesScan {
fn properties(&self) -> &ScannerProperties {
&self.properties
@@ -147,3 +271,210 @@ impl SeriesScan {
&self.stream_ctx.input
}
}
/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
/// Context for the scan stream.
stream_ctx: Arc<StreamContext>,
/// Optional semaphore for limiting the number of concurrent scans.
semaphore: Option<Arc<Semaphore>>,
/// Partition ranges to scan.
partitions: Vec<Vec<PartitionRange>>,
/// Senders of all partitions.
senders: SenderList,
}
impl SeriesDistributor {
/// Executes the distributor.
async fn execute(&mut self) {
if let Err(e) = self.scan_partitions().await {
self.senders.send_error(e).await;
}
}
/// Scans all parts.
async fn scan_partitions(&mut self) -> Result<()> {
let part_metrics = new_partition_metrics(&self.stream_ctx, self.partitions.len());
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
self.stream_ctx.input.num_memtables(),
self.stream_ctx.input.num_files(),
));
// Builds sources for all parts.
let mut sources = Vec::with_capacity(self.partitions.len());
for partition in &self.partitions {
sources.reserve(partition.len());
for part_range in partition {
build_sources(
&self.stream_ctx,
part_range,
false,
&part_metrics,
range_builder_list.clone(),
&mut sources,
);
}
}
// Builds a reader that merges sources from all parts.
let mut reader =
SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
.await?;
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let mut current_series = SeriesBatch::default();
while let Some(batch) = reader.next_batch().await? {
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
debug_assert!(!batch.is_empty());
// Defensively skips empty batches in release builds.
if batch.is_empty() {
continue;
}
let Some(last_key) = current_series.current_key() else {
current_series.push(batch);
continue;
};
if last_key == batch.primary_key() {
current_series.push(batch);
continue;
}
// Found a new series: send the current one.
let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
let yield_start = Instant::now();
self.senders.send_batch(to_send).await?;
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
// Sends the batches of the last series.
if !current_series.is_empty() {
let yield_start = Instant::now();
self.senders.send_batch(current_series).await?;
metrics.yield_cost += yield_start.elapsed();
}
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
part_metrics.on_finish();
Ok(())
}
}
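The distributor's core loop cuts a series whenever the primary key changes between consecutive batches of the sorted reader output. The same grouping idea in a minimal standalone form, with string keys standing in for primary keys:

/// Splits a key-sorted sequence of (key, value) pairs into per-key groups,
/// mirroring how the distributor detects series boundaries.
fn group_series(batches: Vec<(&str, i32)>) -> Vec<Vec<(&str, i32)>> {
    let mut groups: Vec<Vec<(&str, i32)>> = Vec::new();
    for batch in batches {
        // Same key as the current series: keep accumulating.
        if groups.last().is_some_and(|group| group[0].0 == batch.0) {
            groups.last_mut().unwrap().push(batch);
        } else {
            // Key changed (or first batch): start a new series.
            groups.push(vec![batch]);
        }
    }
    groups
}

fn main() {
    let groups = group_series(vec![("a", 1), ("a", 2), ("b", 3), ("c", 4), ("c", 5)]);
    // Three series: "a" with two batches, "b" with one, "c" with two.
    assert_eq!(groups.len(), 3);
    assert_eq!(groups[0].len(), 2);
}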
/// Batches of the same series.
#[derive(Default)]
struct SeriesBatch {
// FIXME: Use smallvec
batches: Vec<Batch>,
}
impl SeriesBatch {
/// Creates a new [SeriesBatch] from a single [Batch].
fn single(batch: Batch) -> Self {
Self {
batches: vec![batch],
}
}
fn current_key(&self) -> Option<&[u8]> {
self.batches.first().map(|batch| batch.primary_key())
}
fn push(&mut self, batch: Batch) {
self.batches.push(batch);
}
/// Returns true if there is no batch.
fn is_empty(&self) -> bool {
self.batches.is_empty()
}
}
/// List of senders.
struct SenderList {
senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
/// Number of None senders.
num_nones: usize,
/// Index of the current partition to send.
sender_idx: usize,
}
impl SenderList {
fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
Self {
senders,
num_nones,
sender_idx: 0,
}
}
/// Finds an active partition and sends the batch to it.
async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
loop {
ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
let sender_idx = self.fetch_add_sender_idx();
let Some(sender) = &self.senders[sender_idx] else {
continue;
};
// Uses a timeout so we never block indefinitely and keep sending
// batches round-robin even when some partitions don't poll their
// inputs. This can happen when an operator such as a sort-merge node
// sits above the scan, but it is rare when using SeriesScan.
match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
Ok(()) => break,
Err(SendTimeoutError::Timeout(res)) => {
// Safety: we send Ok.
batch = res.unwrap();
}
Err(SendTimeoutError::Closed(res)) => {
self.senders[sender_idx] = None;
self.num_nones += 1;
// Safety: we send Ok.
batch = res.unwrap();
}
}
}
Ok(())
}
async fn send_error(&self, error: Error) {
let error = Arc::new(error);
for sender in &self.senders {
if let Some(sender) = sender {
let result = Err(error.clone()).context(ScanSeriesSnafu);
let _ = sender.send(result).await;
}
}
}
fn fetch_add_sender_idx(&mut self) -> usize {
let sender_idx = self.sender_idx;
self.sender_idx = (self.sender_idx + 1) % self.senders.len();
sender_idx
}
}
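The round-robin send relies on tokio's send_timeout returning the unsent value inside the error, which is what lets send_batch retry the same SeriesBatch on the next receiver instead of losing it. A minimal sketch of that recover-and-retry property (capacity 1 and no polling receiver, so the second send must time out):

use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendTimeoutError;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(1);
    // Fills the single buffer slot.
    tx.send_timeout(1, Duration::from_millis(10)).await.unwrap();

    match tx.send_timeout(2, Duration::from_millis(10)).await {
        // The value comes back in the error, ready to retry elsewhere.
        Err(SendTimeoutError::Timeout(value)) => assert_eq!(value, 2),
        other => panic!("expected timeout, got {other:?}"),
    }

    assert_eq!(rx.recv().await, Some(1));
}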
fn new_partition_metrics(stream_ctx: &StreamContext, partition: usize) -> PartitionMetrics {
PartitionMetrics::new(
stream_ctx.input.mapper.metadata().region_id,
partition,
"SeriesScan",
stream_ctx.query_start,
ScannerMetrics {
prepare_scan_cost: stream_ctx.query_start.elapsed(),
..Default::default()
},
)
}


@@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::TimeSeriesRowSelector;
use crate::error::{
DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
DecodeStatsSnafu, FieldTypeMismatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
};
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
@@ -286,7 +286,7 @@ impl RangeBase {
if filter
.filter()
.evaluate_scalar(&pk_value)
.context(FilterRecordBatchSnafu)?
.context(RecordBatchSnafu)?
{
continue;
} else {
@@ -303,12 +303,12 @@ impl RangeBase {
filter
.filter()
.evaluate_vector(field_col)
.context(FilterRecordBatchSnafu)?
.context(RecordBatchSnafu)?
}
SemanticType::Timestamp => filter
.filter()
.evaluate_vector(input.timestamps())
.context(FilterRecordBatchSnafu)?,
.context(RecordBatchSnafu)?,
};
mask = mask.bitand(&result);