diff --git a/licenserc.toml b/licenserc.toml index 44d644e76b..3330dabe34 100644 --- a/licenserc.toml +++ b/licenserc.toml @@ -34,6 +34,7 @@ excludes = [ "src/sql/src/statements/drop/trigger.rs", "src/sql/src/parsers/create_parser/trigger.rs", "src/sql/src/parsers/show_parser/trigger.rs", + "src/mito2/src/extension.rs", ] [properties] diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 5c87641947..dcf49199b3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -40,7 +40,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::MetaClientRef; use metric_engine::engine::MetricEngine; use mito2::config::MitoConfig; -use mito2::engine::MitoEngine; +use mito2::engine::{MitoEngine, MitoEngineBuilder}; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; use query::dummy_catalog::TableProviderFactoryRef; @@ -162,6 +162,8 @@ pub struct DatanodeBuilder { meta_client: Option, kv_backend: KvBackendRef, cache_registry: Option>, + #[cfg(feature = "enterprise")] + extension_range_provider_factory: Option, } impl DatanodeBuilder { @@ -173,6 +175,8 @@ impl DatanodeBuilder { meta_client: None, kv_backend, cache_registry: None, + #[cfg(feature = "enterprise")] + extension_range_provider_factory: None, } } @@ -199,6 +203,15 @@ impl DatanodeBuilder { self } + #[cfg(feature = "enterprise")] + pub fn with_extension_range_provider( + &mut self, + extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory, + ) -> &mut Self { + self.extension_range_provider_factory = Some(extension_range_provider_factory); + self + } + pub async fn build(mut self) -> Result { let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; @@ -340,7 +353,7 @@ impl DatanodeBuilder { } async fn new_region_server( - &self, + &mut self, schema_metadata_manager: SchemaMetadataManagerRef, event_listener: RegionServerEventListenerRef, ) -> Result { @@ -376,13 +389,13 @@ impl DatanodeBuilder { ); let object_store_manager = Self::build_object_store_manager(&opts.storage).await?; - let engines = Self::build_store_engines( - opts, - object_store_manager, - schema_metadata_manager, - self.plugins.clone(), - ) - .await?; + let engines = self + .build_store_engines( + object_store_manager, + schema_metadata_manager, + self.plugins.clone(), + ) + .await?; for engine in engines { region_server.register_engine(engine); } @@ -394,7 +407,7 @@ impl DatanodeBuilder { /// Builds [RegionEngineRef] from `store_engine` section in `opts` async fn build_store_engines( - opts: &DatanodeOptions, + &mut self, object_store_manager: ObjectStoreManagerRef, schema_metadata_manager: SchemaMetadataManagerRef, plugins: Plugins, @@ -403,7 +416,7 @@ impl DatanodeBuilder { let mut mito_engine_config = MitoConfig::default(); let mut file_engine_config = file_engine::config::EngineConfig::default(); - for engine in &opts.region_engine { + for engine in &self.opts.region_engine { match engine { RegionEngineConfig::Mito(config) => { mito_engine_config = config.clone(); @@ -417,14 +430,14 @@ impl DatanodeBuilder { } } - let mito_engine = Self::build_mito_engine( - opts, - object_store_manager.clone(), - mito_engine_config, - schema_metadata_manager.clone(), - plugins.clone(), - ) - .await?; + let mito_engine = self + .build_mito_engine( + object_store_manager.clone(), + mito_engine_config, + schema_metadata_manager.clone(), + plugins.clone(), + ) + .await?; let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config) .context(BuildMetricEngineSnafu)?; @@ -443,12 +456,13 @@ impl DatanodeBuilder { /// Builds [MitoEngine] according to options. async fn build_mito_engine( - opts: &DatanodeOptions, + &mut self, object_store_manager: ObjectStoreManagerRef, mut config: MitoConfig, schema_metadata_manager: SchemaMetadataManagerRef, plugins: Plugins, ) -> Result { + let opts = &self.opts; if opts.storage.is_object_storage() { // Enable the write cache when setting object storage config.enable_write_cache = true; @@ -456,17 +470,27 @@ impl DatanodeBuilder { } let mito_engine = match &opts.wal { - DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new( - &opts.storage.data_home, - config, - Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config) - .await?, - object_store_manager, - schema_metadata_manager, - plugins, - ) - .await - .context(BuildMitoEngineSnafu)?, + DatanodeWalConfig::RaftEngine(raft_engine_config) => { + let log_store = + Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config) + .await?; + + let builder = MitoEngineBuilder::new( + &opts.storage.data_home, + config, + log_store, + object_store_manager, + schema_metadata_manager, + plugins, + ); + + #[cfg(feature = "enterprise")] + let builder = builder.with_extension_range_provider_factory( + self.extension_range_provider_factory.take(), + ); + + builder.try_build().await.context(BuildMitoEngineSnafu)? + } DatanodeWalConfig::Kafka(kafka_config) => { if kafka_config.create_index && opts.node_id.is_none() { warn!("The WAL index creation only available in distributed mode.") @@ -488,16 +512,21 @@ impl DatanodeBuilder { None }; - MitoEngine::new( + let builder = MitoEngineBuilder::new( &opts.storage.data_home, config, Self::build_kafka_log_store(kafka_config, global_index_collector).await?, object_store_manager, schema_metadata_manager, plugins, - ) - .await - .context(BuildMitoEngineSnafu)? + ); + + #[cfg(feature = "enterprise")] + let builder = builder.with_extension_range_provider_factory( + self.extension_range_provider_factory.take(), + ); + + builder.try_build().await.context(BuildMitoEngineSnafu)? } }; Ok(mito_engine) diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 71255687df..2379c693f0 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [features] default = [] test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"] +enterprise = [] [lints] workspace = true diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 0bc0b64a51..a6232591c4 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -97,6 +97,8 @@ use crate::error::{ InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu, }; +#[cfg(feature = "enterprise")] +use crate::extension::BoxedExtensionRangeProviderFactory; use crate::manifest::action::RegionEdit; use crate::memtable::MemtableStats; use crate::metrics::HANDLE_REQUEST_ELAPSED; @@ -113,6 +115,81 @@ use crate::worker::WorkerGroup; pub const MITO_ENGINE_NAME: &str = "mito"; +pub struct MitoEngineBuilder<'a, S: LogStore> { + data_home: &'a str, + config: MitoConfig, + log_store: Arc, + object_store_manager: ObjectStoreManagerRef, + schema_metadata_manager: SchemaMetadataManagerRef, + plugins: Plugins, + #[cfg(feature = "enterprise")] + extension_range_provider_factory: Option, +} + +impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { + pub fn new( + data_home: &'a str, + config: MitoConfig, + log_store: Arc, + object_store_manager: ObjectStoreManagerRef, + schema_metadata_manager: SchemaMetadataManagerRef, + plugins: Plugins, + ) -> Self { + Self { + data_home, + config, + log_store, + object_store_manager, + schema_metadata_manager, + plugins, + #[cfg(feature = "enterprise")] + extension_range_provider_factory: None, + } + } + + #[cfg(feature = "enterprise")] + #[must_use] + pub fn with_extension_range_provider_factory( + self, + extension_range_provider_factory: Option, + ) -> Self { + Self { + extension_range_provider_factory, + ..self + } + } + + pub async fn try_build(mut self) -> Result { + self.config.sanitize(self.data_home)?; + + let config = Arc::new(self.config); + let workers = WorkerGroup::start( + config.clone(), + self.log_store.clone(), + self.object_store_manager, + self.schema_metadata_manager, + self.plugins, + ) + .await?; + let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store)); + let inner = EngineInner { + workers, + config, + wal_raw_entry_reader, + #[cfg(feature = "enterprise")] + extension_range_provider_factory: None, + }; + + #[cfg(feature = "enterprise")] + let inner = + inner.with_extension_range_provider_factory(self.extension_range_provider_factory); + + Ok(MitoEngine { + inner: Arc::new(inner), + }) + } +} + /// Region engine implementation for timeseries data. #[derive(Clone)] pub struct MitoEngine { @@ -123,26 +200,21 @@ impl MitoEngine { /// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`. pub async fn new( data_home: &str, - mut config: MitoConfig, + config: MitoConfig, log_store: Arc, object_store_manager: ObjectStoreManagerRef, schema_metadata_manager: SchemaMetadataManagerRef, plugins: Plugins, ) -> Result { - config.sanitize(data_home)?; - - Ok(MitoEngine { - inner: Arc::new( - EngineInner::new( - config, - log_store, - object_store_manager, - schema_metadata_manager, - plugins, - ) - .await?, - ), - }) + let builder = MitoEngineBuilder::new( + data_home, + config, + log_store, + object_store_manager, + schema_metadata_manager, + plugins, + ); + builder.try_build().await } /// Returns true if the specific region exists. @@ -316,6 +388,8 @@ struct EngineInner { config: Arc, /// The Wal raw entry reader. wal_raw_entry_reader: Arc, + #[cfg(feature = "enterprise")] + extension_range_provider_factory: Option, } type TopicGroupedRegionOpenRequests = HashMap>; @@ -352,28 +426,16 @@ fn prepare_batch_open_requests( } impl EngineInner { - /// Returns a new [EngineInner] with specific `config`, `log_store` and `object_store`. - async fn new( - config: MitoConfig, - log_store: Arc, - object_store_manager: ObjectStoreManagerRef, - schema_metadata_manager: SchemaMetadataManagerRef, - plugins: Plugins, - ) -> Result { - let config = Arc::new(config); - let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone())); - Ok(EngineInner { - workers: WorkerGroup::start( - config.clone(), - log_store, - object_store_manager, - schema_metadata_manager, - plugins, - ) - .await?, - config, - wal_raw_entry_reader, - }) + #[cfg(feature = "enterprise")] + #[must_use] + fn with_extension_range_provider_factory( + self, + extension_range_provider_factory: Option, + ) -> Self { + Self { + extension_range_provider_factory, + ..self + } } /// Stop the inner engine. @@ -524,9 +586,27 @@ impl EngineInner { .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) .with_start_time(query_start); + #[cfg(feature = "enterprise")] + let scan_region = self.maybe_fill_extension_range_provider(scan_region, region); + Ok(scan_region) } + #[cfg(feature = "enterprise")] + fn maybe_fill_extension_range_provider( + &self, + mut scan_region: ScanRegion, + region: MitoRegionRef, + ) -> ScanRegion { + if region.is_follower() + && let Some(factory) = self.extension_range_provider_factory.as_ref() + { + scan_region + .set_extension_range_provider(factory.create_extension_range_provider(region)); + } + scan_region + } + /// Converts the [`RegionRole`]. fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> { let region = self.find_region(region_id)?; @@ -756,6 +836,8 @@ impl MitoEngine { .await?, config, wal_raw_entry_reader, + #[cfg(feature = "enterprise")] + extension_range_provider_factory: None, }), }) } diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index deb39e507b..73134265cb 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -31,7 +31,7 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::engine::MitoEngine; -use crate::error::{self, Error}; +use crate::error::Error; use crate::test_util::{ build_rows, flush_region, kafka_log_store_factory, prepare_test_for_kafka_log_store, put_rows, raft_engine_log_store_factory, rows_schema, single_kafka_log_store_factory, @@ -255,7 +255,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option().unwrap(); - assert_matches!(err, error::Error::UnexpectedReplay { .. }); + assert_matches!(err, Error::Unexpected { .. }); // It should ignore requests to writable regions. region.set_role(RegionRole::Leader); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b1ae251753..6ea0386f03 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -64,18 +64,6 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}", - region_id, expected_last_entry_id, replayed_last_entry_id - ))] - UnexpectedReplay { - #[snafu(implicit)] - location: Location, - region_id: RegionId, - expected_last_entry_id: u64, - replayed_last_entry_id: u64, - }, - #[snafu(display("OpenDAL operator failed"))] OpenDal { #[snafu(implicit)] @@ -960,20 +948,6 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Unexpected impure default value with region_id: {}, column: {}, default_value: {}", - region_id, - column, - default_value - ))] - UnexpectedImpureDefault { - #[snafu(implicit)] - location: Location, - region_id: RegionId, - column: String, - default_value: String, - }, - #[snafu(display("Manual compaction is override by following operations."))] ManualCompactionOverride {}, @@ -1020,6 +994,21 @@ pub enum Error { location: Location, source: mito_codec::error::Error, }, + + #[snafu(display("Unexpected: {reason}"))] + Unexpected { + reason: String, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "enterprise")] + #[snafu(display("Failed to scan external range"))] + ScanExternalRange { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1058,12 +1047,11 @@ impl ErrorExt for Error { | CreateDefault { .. } | InvalidParquet { .. } | OperateAbortedIndex { .. } - | UnexpectedReplay { .. } | IndexEncodeNull { .. } - | UnexpectedImpureDefault { .. } | NoCheckpoint { .. } | NoManifests { .. } - | InstallManifestTo { .. } => StatusCode::Unexpected, + | InstallManifestTo { .. } + | Unexpected { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } @@ -1175,6 +1163,9 @@ impl ErrorExt for Error { ConvertBulkWalEntry { source, .. } => source.status_code(), Encode { source, .. } | Decode { source, .. } => source.status_code(), + + #[cfg(feature = "enterprise")] + ScanExternalRange { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/extension.rs b/src/mito2/src/extension.rs new file mode 100644 index 0000000000..2f93731873 --- /dev/null +++ b/src/mito2/src/extension.rs @@ -0,0 +1,75 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use common_error::ext::BoxedError; +use common_time::range::TimestampRange; +use common_time::Timestamp; +use store_api::storage::ScanRequest; + +use crate::error::Result; +use crate::read::range::RowGroupIndex; +use crate::read::scan_region::StreamContext; +use crate::read::scan_util::PartitionMetrics; +use crate::read::BoxedBatchStream; +use crate::region::MitoRegionRef; + +pub type InclusiveTimeRange = (Timestamp, Timestamp); + +/// [`ExtensionRange`] is used to represent a scannable "range" for mito engine, just like the +/// memtable range and sst file range, but resides on the outside. +/// It can be scanned side by side as other ranges to produce the final result, so it's very useful +/// to extend the source of data in GreptimeDB. +pub trait ExtensionRange: Send + Sync { + /// The number of rows in this range. + fn num_rows(&self) -> u64; + + /// The timestamp of the start and end (both inclusive) of the data within this range. + fn time_range(&self) -> InclusiveTimeRange; + + /// The row groups number in this range. + fn num_row_groups(&self) -> u64; + + /// Create the reader for reading this range. + fn reader(&self, context: &StreamContext) -> BoxedExtensionRangeReader; +} + +pub type BoxedExtensionRange = Box; + +/// The reader to read an extension range. +#[async_trait] +pub trait ExtensionRangeReader: Send { + /// Read the extension range by creating a stream that produces [`Batch`]. + async fn read( + self: Box, + context: Arc, + metrics: PartitionMetrics, + index: RowGroupIndex, + ) -> Result; +} + +pub type BoxedExtensionRangeReader = Box; + +/// The provider to feed the extension ranges into the mito scanner. +#[async_trait] +pub trait ExtensionRangeProvider: Send + Sync { + /// Find the extension ranges by the timestamp filter and the [`ScanRequest`]. + async fn find_extension_ranges( + &self, + timestamp_range: TimestampRange, + request: &ScanRequest, + ) -> Result> { + let _ = timestamp_range; + let _ = request; + Ok(vec![]) + } +} + +pub type BoxedExtensionRangeProvider = Box; + +/// The factory to create an [`ExtensionRangeProvider`], injecting some utilities. +pub trait ExtensionRangeProviderFactory: Send + Sync { + fn create_extension_range_provider(&self, region: MitoRegionRef) + -> BoxedExtensionRangeProvider; +} + +pub type BoxedExtensionRangeProviderFactory = Box; diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 503b60d5c5..ad4045c86e 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -33,6 +33,8 @@ pub mod compaction; pub mod config; pub mod engine; pub mod error; +#[cfg(feature = "enterprise")] +pub mod extension; pub mod flush; pub mod manifest; pub mod memtable; diff --git a/src/mito2/src/read/plain_batch.rs b/src/mito2/src/read/plain_batch.rs index e0fc5b903c..627e93789e 100644 --- a/src/mito2/src/read/plain_batch.rs +++ b/src/mito2/src/read/plain_batch.rs @@ -28,7 +28,7 @@ use store_api::storage::{RegionId, SequenceNumber}; use crate::error::{ ComputeArrowSnafu, CreateDefaultSnafu, InvalidRequestSnafu, NewRecordBatchSnafu, Result, - UnexpectedImpureDefaultSnafu, + UnexpectedSnafu, }; use crate::sst::parquet::plain_format::PLAIN_FIXED_POS_COLUMN_NUM; @@ -202,10 +202,13 @@ fn fill_column_put_default( num_rows: usize, ) -> Result { if column.column_schema.is_default_impure() { - return UnexpectedImpureDefaultSnafu { - region_id, - column: &column.column_schema.name, - default_value: format!("{:?}", column.column_schema.default_constraint()), + return UnexpectedSnafu { + reason: format!( + "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}", + region_id, + column.column_schema.name, + column.column_schema.default_constraint(), + ), } .fail(); } diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 41edde66d8..865d61111a 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -97,6 +97,9 @@ impl RangeMeta { Self::push_seq_mem_ranges(&input.memtables, &mut ranges); Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges); + #[cfg(feature = "enterprise")] + Self::push_extension_ranges(input.extension_ranges(), &mut ranges); + let ranges = group_ranges_for_seq_scan(ranges); if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) { // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries. @@ -116,6 +119,9 @@ impl RangeMeta { &mut ranges, ); + #[cfg(feature = "enterprise")] + Self::push_extension_ranges(input.extension_ranges(), &mut ranges); + ranges } @@ -313,6 +319,28 @@ impl RangeMeta { }); } } + + #[cfg(feature = "enterprise")] + fn push_extension_ranges( + ranges: &[crate::extension::BoxedExtensionRange], + metas: &mut Vec, + ) { + for range in ranges.iter() { + let index = metas.len(); + metas.push(RangeMeta { + time_range: range.time_range(), + indices: smallvec![SourceIndex { + index, + num_row_groups: range.num_row_groups(), + }], + row_group_indices: smallvec![RowGroupIndex { + index, + row_group_index: ALL_ROW_GROUPS, + }], + num_rows: range.num_rows() as usize, + }); + } + } } /// Groups ranges by time range. diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index ad74b525b0..a98777f9ce 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -41,6 +41,8 @@ use crate::access_layer::AccessLayerRef; use crate::cache::CacheStrategy; use crate::config::DEFAULT_SCAN_CHANNEL_SIZE; use crate::error::Result; +#[cfg(feature = "enterprise")] +use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider}; use crate::memtable::MemtableRange; use crate::metrics::READ_SST_COUNT; use crate::read::compat::{self, CompatBatch}; @@ -208,6 +210,8 @@ pub(crate) struct ScanRegion { /// Whether to filter out the deleted rows. /// Usually true for normal read, and false for scan for compaction. filter_deleted: bool, + #[cfg(feature = "enterprise")] + extension_range_provider: Option, } impl ScanRegion { @@ -229,6 +233,8 @@ impl ScanRegion { ignore_bloom_filter: false, start_time: None, filter_deleted: true, + #[cfg(feature = "enterprise")] + extension_range_provider: None, } } @@ -273,6 +279,14 @@ impl ScanRegion { self.filter_deleted = filter_deleted; } + #[cfg(feature = "enterprise")] + pub(crate) fn set_extension_range_provider( + &mut self, + extension_range_provider: BoxedExtensionRangeProvider, + ) { + self.extension_range_provider = Some(extension_range_provider); + } + /// Returns a [Scanner] to scan the region. pub(crate) async fn scanner(self) -> Result { if self.use_series_scan() { @@ -436,6 +450,16 @@ impl ScanRegion { .with_merge_mode(self.version.options.merge_mode()) .with_series_row_selector(self.request.series_row_selector) .with_distribution(self.request.distribution); + + #[cfg(feature = "enterprise")] + let input = if let Some(provider) = self.extension_range_provider { + let ranges = provider + .find_extension_ranges(time_range, &self.request) + .await?; + input.with_extension_ranges(ranges) + } else { + input + }; Ok(input) } @@ -622,6 +646,8 @@ pub struct ScanInput { pub(crate) series_row_selector: Option, /// Hint for the required distribution of the scanner. pub(crate) distribution: Option, + #[cfg(feature = "enterprise")] + extension_ranges: Vec, } impl ScanInput { @@ -647,6 +673,8 @@ impl ScanInput { merge_mode: MergeMode::default(), series_row_selector: None, distribution: None, + #[cfg(feature = "enterprise")] + extension_ranges: Vec::new(), } } @@ -889,7 +917,16 @@ impl ScanInput { pub(crate) fn total_rows(&self) -> usize { let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum(); let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum(); - rows_in_files + rows_in_memtables + + let rows = rows_in_files + rows_in_memtables; + #[cfg(feature = "enterprise")] + let rows = rows + + self + .extension_ranges + .iter() + .map(|x| x.num_rows()) + .sum::() as usize; + rows } /// Returns table predicate of all exprs. @@ -912,6 +949,28 @@ impl ScanInput { } } +#[cfg(feature = "enterprise")] +impl ScanInput { + #[must_use] + pub(crate) fn with_extension_ranges(self, extension_ranges: Vec) -> Self { + Self { + extension_ranges, + ..self + } + } + + #[cfg(feature = "enterprise")] + pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] { + &self.extension_ranges + } + + /// Get a boxed [ExtensionRange] by the index in all ranges. + #[cfg(feature = "enterprise")] + pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange { + &self.extension_ranges[i - self.num_memtables() - self.num_files()] + } +} + #[cfg(test)] impl ScanInput { /// Returns SST file ids to scan. @@ -965,6 +1024,11 @@ impl StreamContext { self.input.num_memtables() > index.index } + pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool { + !self.is_mem_range_index(index) + && index.index < self.input.num_files() + self.input.num_memtables() + } + /// Retrieves the partition ranges. pub(crate) fn partition_ranges(&self) -> Vec { self.ranges diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 5e965cb83c..ce4505ac7a 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -23,6 +23,7 @@ use common_telemetry::debug; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time}; use futures::Stream; use prometheus::IntGauge; +use smallvec::SmallVec; use store_api::storage::RegionId; use crate::error::Result; @@ -32,8 +33,9 @@ use crate::metrics::{ }; use crate::read::range::{RangeBuilderList, RowGroupIndex}; use crate::read::scan_region::StreamContext; -use crate::read::{Batch, ScannerMetrics, Source}; +use crate::read::{Batch, BoxedBatchStream, ScannerMetrics, Source}; use crate::sst::file::FileTimeRange; +use crate::sst::parquet::file_range::FileRange; use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics}; /// Verbose scan metrics for a partition. @@ -465,7 +467,7 @@ impl PartitionMetrics { } /// Merges [ReaderMetrics] and `build_reader_cost`. - pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) { + pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) { self.0.build_parts_cost.add_duration(metrics.build_cost); let mut metrics_set = self.0.metrics.lock().unwrap(); @@ -534,18 +536,37 @@ pub(crate) fn scan_mem_ranges( } /// Scans file ranges at `index`. -pub(crate) fn scan_file_ranges( +pub(crate) async fn scan_file_ranges( stream_ctx: Arc, part_metrics: PartitionMetrics, index: RowGroupIndex, read_type: &'static str, range_builder: Arc, +) -> Result>> { + let mut reader_metrics = ReaderMetrics::default(); + let ranges = range_builder + .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics) + .await?; + part_metrics.inc_num_file_ranges(ranges.len()); + part_metrics.merge_reader_metrics(&reader_metrics); + + Ok(build_file_range_scan_stream( + stream_ctx, + part_metrics, + read_type, + ranges, + )) +} + +/// Build the stream of scanning the input [`FileRange`]s. +pub fn build_file_range_scan_stream( + stream_ctx: Arc, + part_metrics: PartitionMetrics, + read_type: &'static str, + ranges: SmallVec<[FileRange; 2]>, ) -> impl Stream> { try_stream! { - let mut reader_metrics = ReaderMetrics::default(); - let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?; - part_metrics.inc_num_file_ranges(ranges.len()); - + let reader_metrics = &mut ReaderMetrics::default(); for range in ranges { let build_reader_start = Instant::now(); let reader = range.reader(stream_ctx.input.series_row_selector).await?; @@ -568,6 +589,46 @@ pub(crate) fn scan_file_ranges( // Reports metrics. reader_metrics.observe_rows(read_type); reader_metrics.filter_metrics.observe(); - part_metrics.merge_reader_metrics(&reader_metrics); + part_metrics.merge_reader_metrics(reader_metrics); + } +} + +/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`]. +#[cfg(feature = "enterprise")] +pub(crate) async fn scan_extension_range( + context: Arc, + index: RowGroupIndex, + metrics: PartitionMetrics, +) -> Result { + use snafu::ResultExt; + + let range = context.input.extension_range(index.index); + let reader = range.reader(context.as_ref()); + reader + .read(context, metrics, index) + .await + .context(crate::error::ScanExternalRangeSnafu) +} + +pub(crate) async fn maybe_scan_other_ranges( + context: &Arc, + index: RowGroupIndex, + metrics: &PartitionMetrics, +) -> Result { + #[cfg(feature = "enterprise")] + { + scan_extension_range(context.clone(), index, metrics.clone()).await + } + + #[cfg(not(feature = "enterprise"))] + { + let _ = context; + let _ = index; + let _ = metrics; + + crate::error::UnexpectedSnafu { + reason: "no other ranges scannable", + } + .fail() } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 1e61c9bdb1..93357da454 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -43,7 +43,7 @@ use crate::read::scan_util::{ scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList, }; use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream}; -use crate::read::{Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source}; +use crate::read::{scan_util, Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source}; use crate::region::options::MergeMode; /// Scans a region and returns rows in a sorted sequence. @@ -149,7 +149,8 @@ impl SeqScan { part_metrics, range_builder_list.clone(), &mut sources, - ); + ) + .await?; } common_telemetry::debug!( @@ -270,7 +271,7 @@ impl SeqScan { &part_metrics, range_builder_list.clone(), &mut sources, - ); + ).await?; let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); @@ -426,14 +427,14 @@ impl fmt::Debug for SeqScan { } /// Builds sources for the partition range and push them to the `sources` vector. -pub(crate) fn build_sources( +pub(crate) async fn build_sources( stream_ctx: &Arc, part_range: &PartitionRange, compaction: bool, part_metrics: &PartitionMetrics, range_builder_list: Arc, sources: &mut Vec, -) { +) -> Result<()> { // Gets range meta. let range_meta = &stream_ctx.ranges[part_range.identifier]; #[cfg(debug_assertions)] @@ -460,7 +461,7 @@ pub(crate) fn build_sources( range_meta.time_range, ); Box::pin(stream) as _ - } else { + } else if stream_ctx.is_file_range_index(*index) { let read_type = if compaction { "compaction" } else { @@ -472,11 +473,15 @@ pub(crate) fn build_sources( *index, read_type, range_builder_list.clone(), - ); + ) + .await?; Box::pin(stream) as _ + } else { + scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await? }; sources.push(Source::Stream(stream)); } + Ok(()) } #[cfg(test)] diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 4fab1fa52e..b031180bfe 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -350,7 +350,8 @@ impl SeriesDistributor { &part_metrics, range_builder_list.clone(), &mut sources, - ); + ) + .await?; } } diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index eb58c79562..f3ecc96301 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -36,7 +36,7 @@ use crate::read::scan_util::{ scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList, }; use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream}; -use crate::read::{Batch, ScannerMetrics}; +use crate::read::{scan_util, Batch, ScannerMetrics}; /// Scans a region without providing any output ordering guarantee. /// @@ -94,7 +94,7 @@ impl UnorderedScan { part_metrics: PartitionMetrics, range_builder_list: Arc, ) -> impl Stream> { - stream! { + try_stream! { // Gets range meta. let range_meta = &stream_ctx.ranges[part_range_id]; for index in &range_meta.row_group_indices { @@ -106,18 +106,27 @@ impl UnorderedScan { range_meta.time_range, ); for await batch in stream { - yield batch; + yield batch?; } - } else { + } else if stream_ctx.is_file_range_index(*index) { let stream = scan_file_ranges( stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files", range_builder_list.clone(), - ); + ).await?; for await batch in stream { - yield batch; + yield batch?; + } + } else { + let stream = scan_util::maybe_scan_other_ranges( + &stream_ctx, + *index, + &part_metrics, + ).await?; + for await batch in stream { + yield batch?; } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 2332ea1232..b9786c6421 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -44,7 +44,7 @@ use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::error::{ CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, - FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu, + FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu, }; use crate::manifest::action::RegionEdit; use crate::memtable::bulk::part::BulkPart; @@ -380,10 +380,13 @@ impl WriteRequest { OpType::Put => { // For put requests, we use the default value from column schema. if column.column_schema.is_default_impure() { - UnexpectedImpureDefaultSnafu { - region_id: self.region_id, - column: &column.column_schema.name, - default_value: format!("{:?}", column.column_schema.default_constraint()), + UnexpectedSnafu { + reason: format!( + "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}", + self.region_id, + column.column_schema.name, + column.column_schema.default_constraint(), + ), } .fail()? } @@ -1201,7 +1204,7 @@ mod tests { .fill_missing_columns(&metadata) .unwrap_err() .to_string() - .contains("Unexpected impure default value with region_id")); + .contains("unexpected impure default value with region_id")); } #[test] diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 095c769452..44b0b1bf62 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -90,10 +90,11 @@ impl RegionWorkerLoop { ensure!( // The replayed last entry id may be greater than the `expected_last_entry_id`. last_entry_id >= expected_last_entry_id, - error::UnexpectedReplaySnafu { - region_id, - expected_last_entry_id, - replayed_last_entry_id: last_entry_id, + error::UnexpectedSnafu { + reason: format!( + "failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}", + region_id, expected_last_entry_id, last_entry_id, + ), } ) }