diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index c9500b51f6..5d1870bd73 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -24,6 +24,7 @@ pub(crate) mod range; pub(crate) mod scan_region; pub(crate) mod scan_util; pub(crate) mod seq_scan; +pub(crate) mod series_scan; pub(crate) mod unordered_scan; use std::collections::{HashMap, HashSet}; diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs new file mode 100644 index 0000000000..1ecd006ce2 --- /dev/null +++ b/src/mito2/src/read/series_scan.rs @@ -0,0 +1,113 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Per-series scan implementation. + +use std::fmt; +use std::sync::Arc; + +use common_error::ext::BoxedError; +use common_recordbatch::SendableRecordBatchStream; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; +use datatypes::schema::SchemaRef; +use store_api::metadata::RegionMetadataRef; +use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties}; + +use crate::error::PartitionOutOfRangeSnafu; +use crate::read::scan_region::StreamContext; + +/// Scans a region and returns sorted rows of a series in the same partition. +/// +/// The output order is always order by `(primary key, time index)` inside every +/// partition. +/// Always returns the same series (primary key) to the same partition. +pub struct SeriesScan { + /// Properties of the scanner. + properties: ScannerProperties, + /// Context of streams. + stream_ctx: Arc, +} + +impl SeriesScan { + fn scan_partition_impl( + &self, + partition: usize, + ) -> Result { + if partition >= self.properties.partitions.len() { + return Err(BoxedError::new( + PartitionOutOfRangeSnafu { + given: partition, + all: self.properties.partitions.len(), + } + .build(), + )); + } + + todo!() + } +} + +impl RegionScanner for SeriesScan { + fn properties(&self) -> &ScannerProperties { + &self.properties + } + + fn schema(&self) -> SchemaRef { + self.stream_ctx.input.mapper.output_schema() + } + + fn metadata(&self) -> RegionMetadataRef { + self.stream_ctx.input.mapper.metadata().clone() + } + + fn scan_partition(&self, partition: usize) -> Result { + self.scan_partition_impl(partition) + } + + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + self.properties.prepare(request); + Ok(()) + } + + fn has_predicate(&self) -> bool { + let predicate = self.stream_ctx.input.predicate(); + predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false) + } + + fn set_logical_region(&mut self, logical_region: bool) { + self.properties.set_logical_region(logical_region); + } +} + +impl DisplayAs for SeriesScan { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "SeriesScan: region={}, ", + self.stream_ctx.input.mapper.metadata().region_id + )?; + match t { + DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f), + DisplayFormatType::Verbose => self.stream_ctx.format_for_explain(true, f), + } + } +} + +impl fmt::Debug for SeriesScan { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SeriesScan") + .field("num_ranges", &self.stream_ctx.ranges.len()) + .finish() + } +}