mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 15:22:56 +00:00
chore: basic methods for SeriesScan
This commit is contained in:
@@ -24,6 +24,7 @@ pub(crate) mod range;
|
||||
pub(crate) mod scan_region;
|
||||
pub(crate) mod scan_util;
|
||||
pub(crate) mod seq_scan;
|
||||
pub(crate) mod series_scan;
|
||||
pub(crate) mod unordered_scan;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
113
src/mito2/src/read/series_scan.rs
Normal file
113
src/mito2/src/read/series_scan.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Per-series scan implementation.
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
|
||||
|
||||
use crate::error::PartitionOutOfRangeSnafu;
|
||||
use crate::read::scan_region::StreamContext;
|
||||
|
||||
/// Scans a region and returns sorted rows of a series in the same partition.
|
||||
///
|
||||
/// The output order is always order by `(primary key, time index)` inside every
|
||||
/// partition.
|
||||
/// Always returns the same series (primary key) to the same partition.
|
||||
pub struct SeriesScan {
|
||||
/// Properties of the scanner.
|
||||
properties: ScannerProperties,
|
||||
/// Context of streams.
|
||||
stream_ctx: Arc<StreamContext>,
|
||||
}
|
||||
|
||||
impl SeriesScan {
|
||||
fn scan_partition_impl(
|
||||
&self,
|
||||
partition: usize,
|
||||
) -> Result<SendableRecordBatchStream, BoxedError> {
|
||||
if partition >= self.properties.partitions.len() {
|
||||
return Err(BoxedError::new(
|
||||
PartitionOutOfRangeSnafu {
|
||||
given: partition,
|
||||
all: self.properties.partitions.len(),
|
||||
}
|
||||
.build(),
|
||||
));
|
||||
}
|
||||
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl RegionScanner for SeriesScan {
|
||||
fn properties(&self) -> &ScannerProperties {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.stream_ctx.input.mapper.output_schema()
|
||||
}
|
||||
|
||||
fn metadata(&self) -> RegionMetadataRef {
|
||||
self.stream_ctx.input.mapper.metadata().clone()
|
||||
}
|
||||
|
||||
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
|
||||
self.scan_partition_impl(partition)
|
||||
}
|
||||
|
||||
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
|
||||
self.properties.prepare(request);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn has_predicate(&self) -> bool {
|
||||
let predicate = self.stream_ctx.input.predicate();
|
||||
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
|
||||
}
|
||||
|
||||
fn set_logical_region(&mut self, logical_region: bool) {
|
||||
self.properties.set_logical_region(logical_region);
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for SeriesScan {
|
||||
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"SeriesScan: region={}, ",
|
||||
self.stream_ctx.input.mapper.metadata().region_id
|
||||
)?;
|
||||
match t {
|
||||
DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
|
||||
DisplayFormatType::Verbose => self.stream_ctx.format_for_explain(true, f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SeriesScan {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("SeriesScan")
|
||||
.field("num_ranges", &self.stream_ctx.ranges.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user