diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 9c9c78f07e..913f874f51 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -710,8 +710,8 @@ pub enum Error {
         error: std::io::Error,
     },

-    #[snafu(display("Failed to filter record batch"))]
-    FilterRecordBatch {
+    #[snafu(display("Record batch error"))]
+    RecordBatch {
         source: common_recordbatch::error::Error,
         #[snafu(implicit)]
         location: Location,
@@ -1021,6 +1021,20 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to scan series"))]
+    ScanSeries {
+        #[snafu(implicit)]
+        location: Location,
+        source: Arc<Error>,
+    },
+
+    #[snafu(display("Partition {} scanned multiple times", partition))]
+    ScanMultiTimes {
+        partition: usize,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1143,7 +1157,7 @@ impl ErrorExt for Error {

             External { source, .. } => source.status_code(),

-            FilterRecordBatch { source, .. } => source.status_code(),
+            RecordBatch { source, .. } => source.status_code(),

             Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
             ChecksumMismatch { .. } => StatusCode::Unexpected,
@@ -1172,6 +1186,10 @@ impl ErrorExt for Error {
             ManualCompactionOverride {} => StatusCode::Cancelled,

             IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
+
+            ScanSeries { source, .. } => source.status_code(),
+
+            ScanMultiTimes { .. } => StatusCode::InvalidArguments,
         }
     }
diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs
index 25f0a385d3..80373afc11 100644
--- a/src/mito2/src/read/prune.rs
+++ b/src/mito2/src/read/prune.rs
@@ -21,7 +21,7 @@ use datatypes::arrow::array::BooleanArray;
 use datatypes::arrow::buffer::BooleanBuffer;
 use snafu::ResultExt;

-use crate::error::{FilterRecordBatchSnafu, Result};
+use crate::error::{RecordBatchSnafu, Result};
 use crate::memtable::BoxedBatchIterator;
 use crate::read::last_row::RowGroupLastRowCachedReader;
 use crate::read::{Batch, BatchReader};
@@ -201,7 +201,7 @@ impl PruneTimeIterator {
         for filter in filters.iter() {
             let result = filter
                 .evaluate_vector(batch.timestamps())
-                .context(FilterRecordBatchSnafu)?;
+                .context(RecordBatchSnafu)?;
             mask = mask.bitand(&result);
         }
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index ee56fd9d40..98929a7ba6 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -149,7 +149,7 @@ impl SeqScan {
     /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
     /// if possible.
     #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
-    async fn build_reader_from_sources(
+    pub(crate) async fn build_reader_from_sources(
         stream_ctx: &StreamContext,
         mut sources: Vec<Source>,
         semaphore: Option<Arc<Semaphore>>,
@@ -498,7 +498,7 @@ impl fmt::Debug for SeqScan {
 }

 /// Builds sources for the partition range and push them to the `sources` vector.
-fn build_sources(
+pub(crate) fn build_sources(
     stream_ctx: &Arc<StreamContext>,
     part_range: &PartitionRange,
     compaction: bool,
diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs
index d481a62a87..98bb16d2eb 100644
--- a/src/mito2/src/read/series_scan.rs
+++ b/src/mito2/src/read/series_scan.rs
@@ -15,19 +15,39 @@
 //! Per-series scan implementation.
 use std::fmt;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};

-use async_stream::stream;
+use async_stream::{stream, try_stream};
 use common_error::ext::BoxedError;
-use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
+use common_recordbatch::error::ExternalSnafu;
+use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
+use datatypes::compute::concat_batches;
 use datatypes::schema::SchemaRef;
 use futures::StreamExt;
+use snafu::{ensure, OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
-use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
+use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
+use tokio::sync::mpsc::error::SendTimeoutError;
+use tokio::sync::mpsc::{self, Receiver, Sender};
+use tokio::sync::Semaphore;

-use crate::error::PartitionOutOfRangeSnafu;
+use crate::error::{
+    ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
+    ScanMultiTimesSnafu, ScanSeriesSnafu,
+};
+use crate::read::range::RangeBuilderList;
 use crate::read::scan_region::{ScanInput, StreamContext};
+use crate::read::scan_util::PartitionMetrics;
+use crate::read::seq_scan::{build_sources, SeqScan};
+use crate::read::{Batch, ScannerMetrics};
+
+/// Timeout to send a batch to a sender.
+const SEND_TIMEOUT: Duration = Duration::from_millis(100);
+
+/// List of receivers.
+type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;

 /// Scans a region and returns sorted rows of a series in the same partition.
 ///
@@ -39,29 +59,123 @@ pub struct SeriesScan {
     properties: ScannerProperties,
     /// Context of streams.
     stream_ctx: Arc<StreamContext>,
+    /// Receivers of each partition.
+    receivers: Mutex<ReceiverList>,
 }

 impl SeriesScan {
     /// Creates a new [SeriesScan].
     pub(crate) fn new(input: ScanInput) -> Self {
-        todo!()
+        let mut properties = ScannerProperties::default()
+            .with_append_mode(input.append_mode)
+            .with_total_rows(input.total_rows());
+        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
+        properties.partitions = vec![stream_ctx.partition_ranges()];
+
+        Self {
+            properties,
+            stream_ctx,
+            receivers: Mutex::new(Vec::new()),
+        }
     }

     fn scan_partition_impl(
         &self,
         partition: usize,
     ) -> Result<SendableRecordBatchStream, BoxedError> {
-        if partition >= self.properties.partitions.len() {
+        if partition >= self.properties.num_partitions() {
             return Err(BoxedError::new(
                 PartitionOutOfRangeSnafu {
                     given: partition,
-                    all: self.properties.partitions.len(),
+                    all: self.properties.num_partitions(),
                 }
                 .build(),
             ));
         }

-        todo!()
+        self.maybe_start_distributor();
+
+        let part_metrics = new_partition_metrics(&self.stream_ctx, partition);
+        let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
+        let stream_ctx = self.stream_ctx.clone();
+
+        let stream = try_stream! {
+            part_metrics.on_first_poll();
+
+            let cache = &stream_ctx.input.cache_strategy;
+            let mut df_record_batches = Vec::new();
+            let mut metrics = ScannerMetrics::default();
+            let mut fetch_start = Instant::now();
+            while let Some(result) = receiver.recv().await {
+                let series = result.map_err(BoxedError::new).context(ExternalSnafu)?;
+
+                let convert_start = Instant::now();
+                df_record_batches.reserve(series.batches.len());
+                for batch in series.batches {
+                    metrics.scan_cost += fetch_start.elapsed();
+                    metrics.num_batches += 1;
+                    metrics.num_rows += batch.num_rows();
+
+                    let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
+                    df_record_batches.push(record_batch.into_df_record_batch());
+                }
+
+                let output_schema = stream_ctx.input.mapper.output_schema();
+                let df_record_batch =
+                    concat_batches(output_schema.arrow_schema(), &df_record_batches)
+                        .context(ComputeArrowSnafu)
+                        .map_err(BoxedError::new)
+                        .context(ExternalSnafu)?;
+                df_record_batches.clear();
+                let record_batch =
+                    RecordBatch::try_from_df_record_batch(output_schema, df_record_batch)?;
+                metrics.convert_cost += convert_start.elapsed();
+
+                let yield_start = Instant::now();
+                yield record_batch;
+                metrics.yield_cost += yield_start.elapsed();
+
+                metrics.scan_cost += fetch_start.elapsed();
+                part_metrics.merge_metrics(&metrics);
+                fetch_start = Instant::now();
+            }
+        };
+
+        let stream = Box::pin(RecordBatchStreamWrapper::new(
+            self.stream_ctx.input.mapper.output_schema(),
+            Box::pin(stream),
+        ));
+
+        Ok(stream)
+    }
+
+    /// Takes the receiver for the partition.
+    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
+        let mut rx_list = self.receivers.lock().unwrap();
+        rx_list[partition]
+            .take()
+            .context(ScanMultiTimesSnafu { partition })
+    }
+
+    /// Starts the distributor if the receiver list is empty.
+    fn maybe_start_distributor(&self) {
+        let mut rx_list = self.receivers.lock().unwrap();
+        if !rx_list.is_empty() {
+            return;
+        }
+
+        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
+        let mut distributor = SeriesDistributor {
+            stream_ctx: self.stream_ctx.clone(),
+            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
+            partitions: self.properties.partitions.clone(),
+            senders,
+        };
+        common_runtime::spawn_global(async move {
+            distributor.execute().await;
+        });
+
+        *rx_list = receivers;
+    }

     // TODO(yingwen): Reuse codes.
@@ -86,6 +200,16 @@ impl SeriesScan {
     }
 }

+fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
+    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
+        .map(|_| {
+            let (sender, receiver) = mpsc::channel(1);
+            (Some(sender), Some(receiver))
+        })
+        .unzip();
+    (SenderList::new(senders), receivers)
+}
+
 impl RegionScanner for SeriesScan {
     fn properties(&self) -> &ScannerProperties {
         &self.properties
@@ -147,3 +271,210 @@ impl SeriesScan {
         &self.stream_ctx.input
     }
 }
+
+/// The distributor scans series and distributes them to different partitions.
+struct SeriesDistributor {
+    /// Context for the scan stream.
+    stream_ctx: Arc<StreamContext>,
+    /// Optional semaphore for limiting the number of concurrent scans.
+    semaphore: Option<Arc<Semaphore>>,
+    /// Partition ranges to scan.
+    partitions: Vec<Vec<PartitionRange>>,
+    /// Senders of all partitions.
+    senders: SenderList,
+}
+
+impl SeriesDistributor {
+    /// Executes the distributor.
+    async fn execute(&mut self) {
+        if let Err(e) = self.scan_partitions().await {
+            self.senders.send_error(e).await;
+        }
+    }
+
+    /// Scans all parts.
+    async fn scan_partitions(&mut self) -> Result<()> {
+        let part_metrics = new_partition_metrics(&self.stream_ctx, self.partitions.len());
+        part_metrics.on_first_poll();
+
+        let range_builder_list = Arc::new(RangeBuilderList::new(
+            self.stream_ctx.input.num_memtables(),
+            self.stream_ctx.input.num_files(),
+        ));
+        // Builds sources for all partition ranges.
+        let mut sources = Vec::with_capacity(self.partitions.len());
+        for partition in &self.partitions {
+            sources.reserve(partition.len());
+            for part_range in partition {
+                build_sources(
+                    &self.stream_ctx,
+                    part_range,
+                    false,
+                    &part_metrics,
+                    range_builder_list.clone(),
+                    &mut sources,
+                );
+            }
+        }
+
+        // Builds a reader that merges sources from all parts.
+        let mut reader =
+            SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
+                .await?;
+        let mut metrics = ScannerMetrics::default();
+        let mut fetch_start = Instant::now();
+
+        let mut current_series = SeriesBatch::default();
+        while let Some(batch) = reader.next_batch().await? {
+            metrics.scan_cost += fetch_start.elapsed();
+            metrics.num_batches += 1;
+            metrics.num_rows += batch.num_rows();
+
+            debug_assert!(!batch.is_empty());
+            if batch.is_empty() {
+                continue;
+            }
+
+            let Some(last_key) = current_series.current_key() else {
+                current_series.push(batch);
+                continue;
+            };
+
+            if last_key == batch.primary_key() {
+                current_series.push(batch);
+                continue;
+            }
+
+            // We found a new series; send the current one.
+            let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
+            let yield_start = Instant::now();
+            self.senders.send_batch(to_send).await?;
+            metrics.yield_cost += yield_start.elapsed();
+
+            fetch_start = Instant::now();
+        }
+
+        // Sends the remaining series.
+        if !current_series.is_empty() {
+            let yield_start = Instant::now();
+            self.senders.send_batch(current_series).await?;
+            metrics.yield_cost += yield_start.elapsed();
+        }
+
+        metrics.scan_cost += fetch_start.elapsed();
+        part_metrics.merge_metrics(&metrics);
+
+        part_metrics.on_finish();
+
+        Ok(())
+    }
+}
+
+/// Batches of the same series.
+#[derive(Default)]
+struct SeriesBatch {
+    // FIXME: Use smallvec
+    batches: Vec<Batch>,
+}
+
+impl SeriesBatch {
+    /// Creates a new [SeriesBatch] from a single [Batch].
+    fn single(batch: Batch) -> Self {
+        Self {
+            batches: vec![batch],
+        }
+    }
+
+    fn current_key(&self) -> Option<&[u8]> {
+        self.batches.first().map(|batch| batch.primary_key())
+    }
+
+    fn push(&mut self, batch: Batch) {
+        self.batches.push(batch);
+    }
+
+    /// Returns true if there is no batch.
+    fn is_empty(&self) -> bool {
+        self.batches.is_empty()
+    }
+}
+
+/// List of senders.
+struct SenderList {
+    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
+    /// Number of `None` senders.
+    num_nones: usize,
+    /// Index of the next sender to try.
+    sender_idx: usize,
+}
+
+impl SenderList {
+    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
+        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
+        Self {
+            senders,
+            num_nones,
+            sender_idx: 0,
+        }
+    }
+
+    /// Finds a live partition and sends the batch to it.
+    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
+        loop {
+            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
+
+            let sender_idx = self.fetch_add_sender_idx();
+            let Some(sender) = &self.senders[sender_idx] else {
+                continue;
+            };
+            // Sends the batch in a round-robin fashion with a timeout, to
+            // avoid blocking indefinitely when some partitions don't poll
+            // their inputs. This may happen under a node that doesn't poll
+            // all of its inputs, such as a sort-merge node, but this is
+            // rare when using SeriesScan.
+            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
+                Ok(()) => break,
+                Err(SendTimeoutError::Timeout(res)) => {
+                    // Safety: we always send `Ok`, so unwrapping recovers the batch.
+                    batch = res.unwrap();
+                }
+                Err(SendTimeoutError::Closed(res)) => {
+                    self.senders[sender_idx] = None;
+                    self.num_nones += 1;
+                    // Safety: we always send `Ok`, so unwrapping recovers the batch.
+                    batch = res.unwrap();
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn send_error(&self, error: Error) {
+        let error = Arc::new(error);
+        for sender in self.senders.iter().flatten() {
+            let result = Err(error.clone()).context(ScanSeriesSnafu);
+            let _ = sender.send(result).await;
+        }
+    }
+
+    fn fetch_add_sender_idx(&mut self) -> usize {
+        let sender_idx = self.sender_idx;
+        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
+        sender_idx
+    }
+}
+
+fn new_partition_metrics(stream_ctx: &StreamContext, partition: usize) -> PartitionMetrics {
+    PartitionMetrics::new(
+        stream_ctx.input.mapper.metadata().region_id,
+        partition,
+        "SeriesScan",
+        stream_ctx.query_start,
+        ScannerMetrics {
+            prepare_scan_cost: stream_ctx.query_start.elapsed(),
+            ..Default::default()
+        },
+    )
+}
diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs
index e8241a453f..85a3d6ca0e 100644
--- a/src/mito2/src/sst/parquet/file_range.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
 use store_api::storage::TimeSeriesRowSelector;

 use crate::error::{
-    DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
+    DecodeStatsSnafu, FieldTypeMismatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
 };
 use crate::read::compat::CompatBatch;
 use crate::read::last_row::RowGroupLastRowCachedReader;
@@ -286,7 +286,7 @@ impl RangeBase {
                 if filter
                     .filter()
                     .evaluate_scalar(&pk_value)
-                    .context(FilterRecordBatchSnafu)?
+                    .context(RecordBatchSnafu)?
                 {
                     continue;
                 } else {
@@ -303,12 +303,12 @@ impl RangeBase {
                     filter
                         .filter()
                         .evaluate_vector(field_col)
-                        .context(FilterRecordBatchSnafu)?
+                        .context(RecordBatchSnafu)?
                 }
                 SemanticType::Timestamp => filter
                     .filter()
                    .evaluate_vector(input.timestamps())
-                    .context(FilterRecordBatchSnafu)?,
+                    .context(RecordBatchSnafu)?,
             };

             mask = mask.bitand(&result);
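
Reviewer notes. The sketches below are illustrative only; every name that does not appear in the diff is hypothetical.

The scan-once contract behind `take_receiver` comes down to keeping each partition's receiver in an `Option` behind a `Mutex` and `take()`-ing it on first use, so a second scan of the same partition finds `None` and maps to `ScanMultiTimes`. A minimal standalone sketch of that pattern, with a plain `u64` payload instead of `Result<SeriesBatch>`:

use std::sync::Mutex;

use tokio::sync::mpsc::{self, Receiver};

/// Hypothetical holder mirroring `SeriesScan::receivers`.
struct PartitionStreams {
    receivers: Mutex<Vec<Option<Receiver<u64>>>>,
}

impl PartitionStreams {
    /// Takes the receiver for `partition`; a second call returns an error,
    /// like `ScanMultiTimesSnafu` in the diff.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<u64>, String> {
        self.receivers.lock().unwrap()[partition]
            .take()
            .ok_or_else(|| format!("partition {partition} scanned multiple times"))
    }
}

fn main() {
    let (_tx, rx) = mpsc::channel::<u64>(1);
    let streams = PartitionStreams {
        receivers: Mutex::new(vec![Some(rx)]),
    };
    assert!(streams.take_receiver(0).is_ok());
    // The second scan of partition 0 is rejected.
    assert!(streams.take_receiver(0).is_err());
}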
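
The distributor relies on the merge reader yielding batches sorted by primary key, so each series arrives as one contiguous run of batches. A simplified model of the grouping loop in `scan_partitions`, using a stand-in `Batch` type and collecting into a `Vec` where the real code sends each finished group through the `SenderList`:

/// Stand-in for mito2's `Batch`; only the primary key matters here.
struct Batch {
    primary_key: Vec<u8>,
}

/// Batches of one time series, like `SeriesBatch` in the diff.
#[derive(Default)]
struct SeriesBatch {
    batches: Vec<Batch>,
}

/// Groups consecutive batches sharing a primary key, flushing a series
/// whenever the key changes. Assumes `input` is sorted by primary key.
fn group_by_series(input: Vec<Batch>) -> Vec<SeriesBatch> {
    let mut out = Vec::new();
    let mut current = SeriesBatch::default();
    for batch in input {
        match current.batches.first() {
            // Key changed: flush the finished series and start a new one.
            Some(first) if first.primary_key != batch.primary_key => {
                let next = SeriesBatch {
                    batches: vec![batch],
                };
                out.push(std::mem::replace(&mut current, next));
            }
            // First batch, or same key: keep accumulating.
            _ => current.batches.push(batch),
        }
    }
    if !current.batches.is_empty() {
        out.push(current);
    }
    out
}

fn main() {
    let batch = |k: u8| Batch { primary_key: vec![k] };
    let groups = group_by_series(vec![batch(1), batch(1), batch(2), batch(3)]);
    assert_eq!(groups.len(), 3);
    assert_eq!(groups[0].batches.len(), 2);
}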
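
Finally, the `SenderList::send_batch` strategy in isolation: try the next partition's channel with a bounded `send_timeout`, take the value back on `Timeout`, and drop a channel on `Closed` so a finished or stalled consumer never wedges the distributor. A runnable sketch under the same assumptions as above:

use std::time::Duration;

use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::{self, Sender};

const SEND_TIMEOUT: Duration = Duration::from_millis(100);

/// Sends `item` to the next live channel in round-robin order.
async fn send_round_robin(
    senders: &mut [Option<Sender<u64>>],
    next_idx: &mut usize,
    mut item: u64,
) -> Result<(), &'static str> {
    loop {
        // Mirrors the `ensure!(num_nones < senders.len())` guard.
        if senders.iter().all(Option::is_none) {
            return Err("all receivers are closed");
        }
        let idx = *next_idx;
        *next_idx = (*next_idx + 1) % senders.len();
        let Some(sender) = &senders[idx] else { continue };
        match sender.send_timeout(item, SEND_TIMEOUT).await {
            Ok(()) => return Ok(()),
            // Receiver is busy: take the item back and try the next one.
            Err(SendTimeoutError::Timeout(v)) => item = v,
            // Receiver dropped: forget this channel and retry elsewhere.
            Err(SendTimeoutError::Closed(v)) => {
                senders[idx] = None;
                item = v;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    let mut senders = vec![Some(tx)];
    let mut next_idx = 0;
    send_round_robin(&mut senders, &mut next_idx, 42).await.unwrap();
    assert_eq!(rx.recv().await, Some(42));
}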