feat: extension range definition (#6386)

* feat: defined extension range

Signed-off-by: luofucong <luofc@foxmail.com>

* remove feature parameters

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-06-30 10:42:40 +08:00
committed by GitHub
parent 616e76941a
commit a203909de3
17 changed files with 497 additions and 141 deletions

View File

@@ -34,6 +34,7 @@ excludes = [
"src/sql/src/statements/drop/trigger.rs",
"src/sql/src/parsers/create_parser/trigger.rs",
"src/sql/src/parsers/show_parser/trigger.rs",
"src/mito2/src/extension.rs",
]
[properties]

View File

@@ -40,7 +40,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::engine::{MitoEngine, MitoEngineBuilder};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::dummy_catalog::TableProviderFactoryRef;
@@ -162,6 +162,8 @@ pub struct DatanodeBuilder {
meta_client: Option<MetaClientRef>,
kv_backend: KvBackendRef,
cache_registry: Option<Arc<LayeredCacheRegistry>>,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}
impl DatanodeBuilder {
@@ -173,6 +175,8 @@ impl DatanodeBuilder {
meta_client: None,
kv_backend,
cache_registry: None,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: None,
}
}
@@ -199,6 +203,15 @@ impl DatanodeBuilder {
self
}
/// Installs the factory that will create extension range providers for the
/// datanode's mito engine; stored until `build_mito_engine` hands it to the
/// [MitoEngineBuilder]. Enterprise-only.
///
/// NOTE(review): despite the name, the argument is a provider *factory*;
/// consider renaming to `with_extension_range_provider_factory` to match the
/// field and the mito builder method.
#[cfg(feature = "enterprise")]
pub fn with_extension_range_provider(
    &mut self,
    extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
) -> &mut Self {
    self.extension_range_provider_factory = Some(extension_range_provider_factory);
    self
}
pub async fn build(mut self) -> Result<Datanode> {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
@@ -340,7 +353,7 @@ impl DatanodeBuilder {
}
async fn new_region_server(
&self,
&mut self,
schema_metadata_manager: SchemaMetadataManagerRef,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
@@ -376,13 +389,13 @@ impl DatanodeBuilder {
);
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
let engines = Self::build_store_engines(
opts,
object_store_manager,
schema_metadata_manager,
self.plugins.clone(),
)
.await?;
let engines = self
.build_store_engines(
object_store_manager,
schema_metadata_manager,
self.plugins.clone(),
)
.await?;
for engine in engines {
region_server.register_engine(engine);
}
@@ -394,7 +407,7 @@ impl DatanodeBuilder {
/// Builds [RegionEngineRef] from `store_engine` section in `opts`
async fn build_store_engines(
opts: &DatanodeOptions,
&mut self,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
@@ -403,7 +416,7 @@ impl DatanodeBuilder {
let mut mito_engine_config = MitoConfig::default();
let mut file_engine_config = file_engine::config::EngineConfig::default();
for engine in &opts.region_engine {
for engine in &self.opts.region_engine {
match engine {
RegionEngineConfig::Mito(config) => {
mito_engine_config = config.clone();
@@ -417,14 +430,14 @@ impl DatanodeBuilder {
}
}
let mito_engine = Self::build_mito_engine(
opts,
object_store_manager.clone(),
mito_engine_config,
schema_metadata_manager.clone(),
plugins.clone(),
)
.await?;
let mito_engine = self
.build_mito_engine(
object_store_manager.clone(),
mito_engine_config,
schema_metadata_manager.clone(),
plugins.clone(),
)
.await?;
let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
.context(BuildMetricEngineSnafu)?;
@@ -443,12 +456,13 @@ impl DatanodeBuilder {
/// Builds [MitoEngine] according to options.
async fn build_mito_engine(
opts: &DatanodeOptions,
&mut self,
object_store_manager: ObjectStoreManagerRef,
mut config: MitoConfig,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
let opts = &self.opts;
if opts.storage.is_object_storage() {
// Enable the write cache when setting object storage
config.enable_write_cache = true;
@@ -456,17 +470,27 @@ impl DatanodeBuilder {
}
let mito_engine = match &opts.wal {
DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?,
DatanodeWalConfig::RaftEngine(raft_engine_config) => {
let log_store =
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?;
let builder = MitoEngineBuilder::new(
&opts.storage.data_home,
config,
log_store,
object_store_manager,
schema_metadata_manager,
plugins,
);
#[cfg(feature = "enterprise")]
let builder = builder.with_extension_range_provider_factory(
self.extension_range_provider_factory.take(),
);
builder.try_build().await.context(BuildMitoEngineSnafu)?
}
DatanodeWalConfig::Kafka(kafka_config) => {
if kafka_config.create_index && opts.node_id.is_none() {
warn!("The WAL index creation only available in distributed mode.")
@@ -488,16 +512,21 @@ impl DatanodeBuilder {
None
};
MitoEngine::new(
let builder = MitoEngineBuilder::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?
);
#[cfg(feature = "enterprise")]
let builder = builder.with_extension_range_provider_factory(
self.extension_range_provider_factory.take(),
);
builder.try_build().await.context(BuildMitoEngineSnafu)?
}
};
Ok(mito_engine)

View File

@@ -7,6 +7,7 @@ license.workspace = true
[features]
default = []
test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"]
enterprise = []
[lints]
workspace = true

View File

@@ -97,6 +97,8 @@ use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
@@ -113,6 +115,81 @@ use crate::worker::WorkerGroup;
pub const MITO_ENGINE_NAME: &str = "mito";
/// Builder for [MitoEngine].
///
/// Collects the mandatory components via [`MitoEngineBuilder::new`], optionally
/// accepts an extension range provider factory under the "enterprise" feature,
/// and assembles the engine in [`MitoEngineBuilder::try_build`].
pub struct MitoEngineBuilder<'a, S: LogStore> {
    /// Data home directory; used to sanitize `config` in `try_build`.
    data_home: &'a str,
    config: MitoConfig,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    schema_metadata_manager: SchemaMetadataManagerRef,
    plugins: Plugins,
    /// Optional factory producing extension range providers; installed on the
    /// inner engine during `try_build`.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    /// Creates a builder from the mandatory components; the enterprise-only
    /// extension range provider factory starts as `None`.
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            plugins,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }

    /// Sets (or clears, with `None`) the extension range provider factory.
    /// Consuming builder style.
    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    /// Builds the [MitoEngine]: sanitizes the config against `data_home`,
    /// starts the worker group, wires the WAL raw entry reader, and
    /// (enterprise only) installs the extension range provider factory on the
    /// inner engine.
    ///
    /// # Errors
    /// Returns an error if config sanitization or worker startup fails.
    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        // Construct with `None` first, then overwrite via the cfg-gated helper;
        // this keeps the struct literal valid in both feature configurations.
        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };
        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
}
/// Region engine implementation for timeseries data.
#[derive(Clone)]
pub struct MitoEngine {
@@ -123,26 +200,21 @@ impl MitoEngine {
/// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`.
pub async fn new<S: LogStore>(
data_home: &str,
mut config: MitoConfig,
config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
config.sanitize(data_home)?;
Ok(MitoEngine {
inner: Arc::new(
EngineInner::new(
config,
log_store,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await?,
),
})
let builder = MitoEngineBuilder::new(
data_home,
config,
log_store,
object_store_manager,
schema_metadata_manager,
plugins,
);
builder.try_build().await
}
/// Returns true if the specific region exists.
@@ -316,6 +388,8 @@ struct EngineInner {
config: Arc<MitoConfig>,
/// The Wal raw entry reader.
wal_raw_entry_reader: Arc<dyn RawEntryReader>,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
@@ -352,28 +426,16 @@ fn prepare_batch_open_requests(
}
impl EngineInner {
/// Returns a new [EngineInner] with specific `config`, `log_store` and `object_store`.
async fn new<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<EngineInner> {
let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
Ok(EngineInner {
workers: WorkerGroup::start(
config.clone(),
log_store,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await?,
config,
wal_raw_entry_reader,
})
#[cfg(feature = "enterprise")]
#[must_use]
fn with_extension_range_provider_factory(
self,
extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
) -> Self {
Self {
extension_range_provider_factory,
..self
}
}
/// Stop the inner engine.
@@ -524,9 +586,27 @@ impl EngineInner {
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
.with_start_time(query_start);
#[cfg(feature = "enterprise")]
let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
Ok(scan_region)
}
/// Attaches an extension range provider to `scan_region` when applicable.
///
/// Extension ranges are only used for follower regions, and only when a
/// provider factory was installed on this engine; otherwise the scan region
/// is returned unchanged.
#[cfg(feature = "enterprise")]
fn maybe_fill_extension_range_provider(
    &self,
    mut scan_region: ScanRegion,
    region: MitoRegionRef,
) -> ScanRegion {
    if region.is_follower()
        && let Some(factory) = self.extension_range_provider_factory.as_ref()
    {
        scan_region
            .set_extension_range_provider(factory.create_extension_range_provider(region));
    }
    scan_region
}
/// Converts the [`RegionRole`].
fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
let region = self.find_region(region_id)?;
@@ -756,6 +836,8 @@ impl MitoEngine {
.await?,
config,
wal_raw_entry_reader,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: None,
}),
})
}

View File

@@ -31,7 +31,7 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::error::{self, Error};
use crate::error::Error;
use crate::test_util::{
build_rows, flush_region, kafka_log_store_factory, prepare_test_for_kafka_log_store, put_rows,
raft_engine_log_store_factory, rows_schema, single_kafka_log_store_factory,
@@ -255,7 +255,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
.unwrap_err();
let err = err.as_any().downcast_ref::<Error>().unwrap();
assert_matches!(err, error::Error::UnexpectedReplay { .. });
assert_matches!(err, Error::Unexpected { .. });
// It should ignore requests to writable regions.
region.set_role(RegionRole::Leader);

View File

@@ -64,18 +64,6 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}",
region_id, expected_last_entry_id, replayed_last_entry_id
))]
UnexpectedReplay {
#[snafu(implicit)]
location: Location,
region_id: RegionId,
expected_last_entry_id: u64,
replayed_last_entry_id: u64,
},
#[snafu(display("OpenDAL operator failed"))]
OpenDal {
#[snafu(implicit)]
@@ -960,20 +948,6 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Unexpected impure default value with region_id: {}, column: {}, default_value: {}",
region_id,
column,
default_value
))]
UnexpectedImpureDefault {
#[snafu(implicit)]
location: Location,
region_id: RegionId,
column: String,
default_value: String,
},
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
@@ -1020,6 +994,21 @@ pub enum Error {
location: Location,
source: mito_codec::error::Error,
},
#[snafu(display("Unexpected: {reason}"))]
Unexpected {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "enterprise")]
#[snafu(display("Failed to scan external range"))]
ScanExternalRange {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1058,12 +1047,11 @@ impl ErrorExt for Error {
| CreateDefault { .. }
| InvalidParquet { .. }
| OperateAbortedIndex { .. }
| UnexpectedReplay { .. }
| IndexEncodeNull { .. }
| UnexpectedImpureDefault { .. }
| NoCheckpoint { .. }
| NoManifests { .. }
| InstallManifestTo { .. } => StatusCode::Unexpected,
| InstallManifestTo { .. }
| Unexpected { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
@@ -1175,6 +1163,9 @@ impl ErrorExt for Error {
ConvertBulkWalEntry { source, .. } => source.status_code(),
Encode { source, .. } | Decode { source, .. } => source.status_code(),
#[cfg(feature = "enterprise")]
ScanExternalRange { source, .. } => source.status_code(),
}
}

View File

@@ -0,0 +1,75 @@
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use store_api::storage::ScanRequest;
use crate::error::Result;
use crate::read::range::RowGroupIndex;
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::PartitionMetrics;
use crate::read::BoxedBatchStream;
use crate::region::MitoRegionRef;
/// A closed timestamp interval: `(start, end)`, both endpoints inclusive.
pub type InclusiveTimeRange = (Timestamp, Timestamp);

/// [`ExtensionRange`] is used to represent a scannable "range" for mito engine, just like the
/// memtable range and sst file range, but resides on the outside.
/// It can be scanned side by side as other ranges to produce the final result, so it's very useful
/// to extend the source of data in GreptimeDB.
pub trait ExtensionRange: Send + Sync {
    /// The number of rows in this range.
    fn num_rows(&self) -> u64;

    /// The timestamp of the start and end (both inclusive) of the data within this range.
    fn time_range(&self) -> InclusiveTimeRange;

    /// The row groups number in this range.
    fn num_row_groups(&self) -> u64;

    /// Create the reader for reading this range.
    fn reader(&self, context: &StreamContext) -> BoxedExtensionRangeReader;
}

/// An owned, type-erased [`ExtensionRange`].
pub type BoxedExtensionRange = Box<dyn ExtensionRange>;
/// The reader to read an extension range.
#[async_trait]
pub trait ExtensionRangeReader: Send {
    /// Read the extension range by creating a stream that produces [`Batch`].
    ///
    /// Consumes the boxed reader. Errors are surfaced as [`BoxedError`] and
    /// wrapped by the caller (see `ScanExternalRange` in the engine's errors).
    async fn read(
        self: Box<Self>,
        context: Arc<StreamContext>,
        metrics: PartitionMetrics,
        index: RowGroupIndex,
    ) -> Result<BoxedBatchStream, BoxedError>;
}

/// An owned, type-erased [`ExtensionRangeReader`].
pub type BoxedExtensionRangeReader = Box<dyn ExtensionRangeReader>;
/// The provider to feed the extension ranges into the mito scanner.
#[async_trait]
pub trait ExtensionRangeProvider: Send + Sync {
    /// Find the extension ranges by the timestamp filter and the [`ScanRequest`].
    ///
    /// The default implementation ignores both arguments and returns no
    /// ranges, so implementors only override this when they actually have
    /// external data to contribute.
    async fn find_extension_ranges(
        &self,
        timestamp_range: TimestampRange,
        request: &ScanRequest,
    ) -> Result<Vec<BoxedExtensionRange>> {
        let _ = timestamp_range;
        let _ = request;
        Ok(vec![])
    }
}

/// An owned, type-erased [`ExtensionRangeProvider`].
pub type BoxedExtensionRangeProvider = Box<dyn ExtensionRangeProvider>;
/// The factory to create an [`ExtensionRangeProvider`], injecting some utilities.
pub trait ExtensionRangeProviderFactory: Send + Sync {
    /// Creates a provider scoped to the given region.
    fn create_extension_range_provider(&self, region: MitoRegionRef)
        -> BoxedExtensionRangeProvider;
}

/// An owned, type-erased [`ExtensionRangeProviderFactory`].
pub type BoxedExtensionRangeProviderFactory = Box<dyn ExtensionRangeProviderFactory>;

View File

@@ -33,6 +33,8 @@ pub mod compaction;
pub mod config;
pub mod engine;
pub mod error;
#[cfg(feature = "enterprise")]
pub mod extension;
pub mod flush;
pub mod manifest;
pub mod memtable;

View File

@@ -28,7 +28,7 @@ use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{
ComputeArrowSnafu, CreateDefaultSnafu, InvalidRequestSnafu, NewRecordBatchSnafu, Result,
UnexpectedImpureDefaultSnafu,
UnexpectedSnafu,
};
use crate::sst::parquet::plain_format::PLAIN_FIXED_POS_COLUMN_NUM;
@@ -202,10 +202,13 @@ fn fill_column_put_default(
num_rows: usize,
) -> Result<ArrayRef> {
if column.column_schema.is_default_impure() {
return UnexpectedImpureDefaultSnafu {
region_id,
column: &column.column_schema.name,
default_value: format!("{:?}", column.column_schema.default_constraint()),
return UnexpectedSnafu {
reason: format!(
"unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
region_id,
column.column_schema.name,
column.column_schema.default_constraint(),
),
}
.fail();
}

View File

@@ -97,6 +97,9 @@ impl RangeMeta {
Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
#[cfg(feature = "enterprise")]
Self::push_extension_ranges(input.extension_ranges(), &mut ranges);
let ranges = group_ranges_for_seq_scan(ranges);
if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
// We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
@@ -116,6 +119,9 @@ impl RangeMeta {
&mut ranges,
);
#[cfg(feature = "enterprise")]
Self::push_extension_ranges(input.extension_ranges(), &mut ranges);
ranges
}
@@ -313,6 +319,28 @@ impl RangeMeta {
});
}
}
/// Appends one [RangeMeta] per extension range to `metas`.
///
/// `index` is the range's position in the combined meta list (memtables and
/// files were pushed before this is called), so extension ranges occupy the
/// tail of the index space.
#[cfg(feature = "enterprise")]
fn push_extension_ranges(
    ranges: &[crate::extension::BoxedExtensionRange],
    metas: &mut Vec<RangeMeta>,
) {
    for range in ranges.iter() {
        let index = metas.len();
        metas.push(RangeMeta {
            time_range: range.time_range(),
            indices: smallvec![SourceIndex {
                index,
                num_row_groups: range.num_row_groups(),
            }],
            // ALL_ROW_GROUPS: the whole extension range is scanned as a
            // single row-group unit rather than being split.
            row_group_indices: smallvec![RowGroupIndex {
                index,
                row_group_index: ALL_ROW_GROUPS,
            }],
            num_rows: range.num_rows() as usize,
        });
    }
}
}
/// Groups ranges by time range.

View File

@@ -41,6 +41,8 @@ use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
@@ -208,6 +210,8 @@ pub(crate) struct ScanRegion {
/// Whether to filter out the deleted rows.
/// Usually true for normal read, and false for scan for compaction.
filter_deleted: bool,
#[cfg(feature = "enterprise")]
extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
impl ScanRegion {
@@ -229,6 +233,8 @@ impl ScanRegion {
ignore_bloom_filter: false,
start_time: None,
filter_deleted: true,
#[cfg(feature = "enterprise")]
extension_range_provider: None,
}
}
@@ -273,6 +279,14 @@ impl ScanRegion {
self.filter_deleted = filter_deleted;
}
#[cfg(feature = "enterprise")]
pub(crate) fn set_extension_range_provider(
&mut self,
extension_range_provider: BoxedExtensionRangeProvider,
) {
self.extension_range_provider = Some(extension_range_provider);
}
/// Returns a [Scanner] to scan the region.
pub(crate) async fn scanner(self) -> Result<Scanner> {
if self.use_series_scan() {
@@ -436,6 +450,16 @@ impl ScanRegion {
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution);
#[cfg(feature = "enterprise")]
let input = if let Some(provider) = self.extension_range_provider {
let ranges = provider
.find_extension_ranges(time_range, &self.request)
.await?;
input.with_extension_ranges(ranges)
} else {
input
};
Ok(input)
}
@@ -622,6 +646,8 @@ pub struct ScanInput {
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
/// Hint for the required distribution of the scanner.
pub(crate) distribution: Option<TimeSeriesDistribution>,
#[cfg(feature = "enterprise")]
extension_ranges: Vec<BoxedExtensionRange>,
}
impl ScanInput {
@@ -647,6 +673,8 @@ impl ScanInput {
merge_mode: MergeMode::default(),
series_row_selector: None,
distribution: None,
#[cfg(feature = "enterprise")]
extension_ranges: Vec::new(),
}
}
@@ -889,7 +917,16 @@ impl ScanInput {
pub(crate) fn total_rows(&self) -> usize {
let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
rows_in_files + rows_in_memtables
let rows = rows_in_files + rows_in_memtables;
#[cfg(feature = "enterprise")]
let rows = rows
+ self
.extension_ranges
.iter()
.map(|x| x.num_rows())
.sum::<u64>() as usize;
rows
}
/// Returns table predicate of all exprs.
@@ -912,6 +949,28 @@ impl ScanInput {
}
}
/// Enterprise-only extension-range accessors for [ScanInput].
///
/// The whole impl is gated on the "enterprise" feature, so the per-method
/// `#[cfg(feature = "enterprise")]` attributes the original carried were
/// redundant and have been removed.
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input (consuming builder style).
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges attached to this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// `i` indexes the combined range list (memtables, then files, then
    /// extension ranges), so the memtable and file counts are subtracted to
    /// land in the extension section. Panics if `i` is not an extension index.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
#[cfg(test)]
impl ScanInput {
/// Returns SST file ids to scan.
@@ -965,6 +1024,11 @@ impl StreamContext {
self.input.num_memtables() > index.index
}
/// Returns true if `index` (into the combined memtable/file/extension list)
/// denotes a file range: past the memtable section but before any extension
/// ranges at the tail.
pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
    !self.is_mem_range_index(index)
        && index.index < self.input.num_files() + self.input.num_memtables()
}
/// Retrieves the partition ranges.
pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
self.ranges

View File

@@ -23,6 +23,7 @@ use common_telemetry::debug;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use store_api::storage::RegionId;
use crate::error::Result;
@@ -32,8 +33,9 @@ use crate::metrics::{
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::read::{Batch, BoxedBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
/// Verbose scan metrics for a partition.
@@ -465,7 +467,7 @@ impl PartitionMetrics {
}
/// Merges [ReaderMetrics] and `build_reader_cost`.
pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
self.0.build_parts_cost.add_duration(metrics.build_cost);
let mut metrics_set = self.0.metrics.lock().unwrap();
@@ -534,18 +536,37 @@ pub(crate) fn scan_mem_ranges(
}
/// Scans file ranges at `index`.
pub(crate) fn scan_file_ranges(
pub(crate) async fn scan_file_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
read_type: &'static str,
range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
let mut reader_metrics = ReaderMetrics::default();
let ranges = range_builder
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?;
part_metrics.inc_num_file_ranges(ranges.len());
part_metrics.merge_reader_metrics(&reader_metrics);
Ok(build_file_range_scan_stream(
stream_ctx,
part_metrics,
read_type,
ranges,
))
}
/// Build the stream of scanning the input [`FileRange`]s.
pub fn build_file_range_scan_stream(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
read_type: &'static str,
ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let mut reader_metrics = ReaderMetrics::default();
let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?;
part_metrics.inc_num_file_ranges(ranges.len());
let reader_metrics = &mut ReaderMetrics::default();
for range in ranges {
let build_reader_start = Instant::now();
let reader = range.reader(stream_ctx.input.series_row_selector).await?;
@@ -568,6 +589,46 @@ pub(crate) fn scan_file_ranges(
// Reports metrics.
reader_metrics.observe_rows(read_type);
reader_metrics.filter_metrics.observe();
part_metrics.merge_reader_metrics(&reader_metrics);
part_metrics.merge_reader_metrics(reader_metrics);
}
}
/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`].
/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    // `index.index` points into the combined range list; `extension_range`
    // translates it into the extension section.
    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    reader
        .read(context, metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
/// Scans a range that is neither a memtable range nor a file range.
///
/// With the "enterprise" feature this dispatches to [`scan_extension_range`];
/// otherwise no such range can exist, so reaching this function is reported
/// as an `Unexpected` error.
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        // Silence unused-parameter warnings in the non-enterprise build.
        let _ = context;
        let _ = index;
        let _ = metrics;
        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}

View File

@@ -43,7 +43,7 @@ use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::read::{scan_util, Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::region::options::MergeMode;
/// Scans a region and returns rows in a sorted sequence.
@@ -149,7 +149,8 @@ impl SeqScan {
part_metrics,
range_builder_list.clone(),
&mut sources,
);
)
.await?;
}
common_telemetry::debug!(
@@ -270,7 +271,7 @@ impl SeqScan {
&part_metrics,
range_builder_list.clone(),
&mut sources,
);
).await?;
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
@@ -426,14 +427,14 @@ impl fmt::Debug for SeqScan {
}
/// Builds sources for the partition range and push them to the `sources` vector.
pub(crate) fn build_sources(
pub(crate) async fn build_sources(
stream_ctx: &Arc<StreamContext>,
part_range: &PartitionRange,
compaction: bool,
part_metrics: &PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
sources: &mut Vec<Source>,
) {
) -> Result<()> {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
#[cfg(debug_assertions)]
@@ -460,7 +461,7 @@ pub(crate) fn build_sources(
range_meta.time_range,
);
Box::pin(stream) as _
} else {
} else if stream_ctx.is_file_range_index(*index) {
let read_type = if compaction {
"compaction"
} else {
@@ -472,11 +473,15 @@ pub(crate) fn build_sources(
*index,
read_type,
range_builder_list.clone(),
);
)
.await?;
Box::pin(stream) as _
} else {
scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
};
sources.push(Source::Stream(stream));
}
Ok(())
}
#[cfg(test)]

View File

@@ -350,7 +350,8 @@ impl SeriesDistributor {
&part_metrics,
range_builder_list.clone(),
&mut sources,
);
)
.await?;
}
}

View File

@@ -36,7 +36,7 @@ use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};
use crate::read::{scan_util, Batch, ScannerMetrics};
/// Scans a region without providing any output ordering guarantee.
///
@@ -94,7 +94,7 @@ impl UnorderedScan {
part_metrics: PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
stream! {
try_stream! {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range_id];
for index in &range_meta.row_group_indices {
@@ -106,18 +106,27 @@ impl UnorderedScan {
range_meta.time_range,
);
for await batch in stream {
yield batch;
yield batch?;
}
} else {
} else if stream_ctx.is_file_range_index(*index) {
let stream = scan_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
"unordered_scan_files",
range_builder_list.clone(),
);
).await?;
for await batch in stream {
yield batch;
yield batch?;
}
} else {
let stream = scan_util::maybe_scan_other_ranges(
&stream_ctx,
*index,
&part_metrics,
).await?;
for await batch in stream {
yield batch?;
}
}
}

View File

@@ -44,7 +44,7 @@ use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::error::{
CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::memtable::bulk::part::BulkPart;
@@ -380,10 +380,13 @@ impl WriteRequest {
OpType::Put => {
// For put requests, we use the default value from column schema.
if column.column_schema.is_default_impure() {
UnexpectedImpureDefaultSnafu {
region_id: self.region_id,
column: &column.column_schema.name,
default_value: format!("{:?}", column.column_schema.default_constraint()),
UnexpectedSnafu {
reason: format!(
"unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
self.region_id,
column.column_schema.name,
column.column_schema.default_constraint(),
),
}
.fail()?
}
@@ -1201,7 +1204,7 @@ mod tests {
.fill_missing_columns(&metadata)
.unwrap_err()
.to_string()
.contains("Unexpected impure default value with region_id"));
.contains("unexpected impure default value with region_id"));
}
#[test]

View File

@@ -90,10 +90,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
ensure!(
// The replayed last entry id may be greater than the `expected_last_entry_id`.
last_entry_id >= expected_last_entry_id,
error::UnexpectedReplaySnafu {
region_id,
expected_last_entry_id,
replayed_last_entry_id: last_entry_id,
error::UnexpectedSnafu {
reason: format!(
"failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}",
region_id, expected_last_entry_id, last_entry_id,
),
}
)
}