diff --git a/Cargo.lock b/Cargo.lock index a98826107b..3b74af8971 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7040,12 +7040,14 @@ dependencies = [ "common-macro", "common-query", "common-recordbatch", + "common-runtime", "common-telemetry", "common-test-util", "common-time", "datafusion", "datatypes", "futures-util", + "humantime-serde", "itertools 0.14.0", "lazy_static", "mito2", diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 73d1295417..07be39dddb 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -74,6 +74,7 @@ fn test_load_datanode_example_config() { RegionEngineConfig::File(FileEngineConfig {}), RegionEngineConfig::Metric(MetricEngineConfig { experimental_sparse_primary_key_encoding: false, + flush_metadata_region_interval: Duration::from_secs(30), }), ], logging: LoggingOptions { @@ -216,6 +217,7 @@ fn test_load_standalone_example_config() { RegionEngineConfig::File(FileEngineConfig {}), RegionEngineConfig::Metric(MetricEngineConfig { experimental_sparse_primary_key_encoding: false, + flush_metadata_region_interval: Duration::from_secs(30), }), ], storage: StorageConfig { diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 4b1e720032..25cb5a9d0c 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -57,9 +57,9 @@ use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; use crate::error::{ - self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingCacheSnafu, - MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu, - ShutdownServerSnafu, StartServerSnafu, + self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, + MissingCacheSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, + ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu, }; use crate::event_listener::{ 
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef, @@ -416,10 +416,11 @@ impl DatanodeBuilder { ) .await?; - let metric_engine = MetricEngine::new( + let metric_engine = MetricEngine::try_new( mito_engine.clone(), metric_engine_config.take().unwrap_or_default(), - ); + ) + .context(BuildMetricEngineSnafu)?; engines.push(Arc::new(mito_engine) as _); engines.push(Arc::new(metric_engine) as _); } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6c63e9115f..84e8168d91 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -336,6 +336,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to build metric engine"))] + BuildMetricEngine { + source: metric_engine::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to serialize options to TOML"))] TomlFormat { #[snafu(implicit)] @@ -452,6 +459,7 @@ impl ErrorExt for Error { FindLogicalRegions { source, .. } => source.status_code(), BuildMitoEngine { source, .. } => source.status_code(), + BuildMetricEngine { source, .. } => source.status_code(), ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. 
} => { StatusCode::RegionBusy } diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 5fe5ed3cb5..9e7a5b8545 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -18,11 +18,13 @@ common-error.workspace = true common-macro.workspace = true common-query.workspace = true common-recordbatch.workspace = true +common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true datafusion.workspace = true datatypes.workspace = true futures-util.workspace = true +humantime-serde.workspace = true itertools.workspace = true lazy_static = "1.4" mito2.workspace = true diff --git a/src/metric-engine/src/config.rs b/src/metric-engine/src/config.rs index b35b412188..20df8fa739 100644 --- a/src/metric-engine/src/config.rs +++ b/src/metric-engine/src/config.rs @@ -12,9 +12,49 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + +use common_telemetry::warn; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +/// The default flush interval of the metadata region. +pub(crate) const DEFAULT_FLUSH_METADATA_REGION_INTERVAL: Duration = Duration::from_secs(30); + +/// Configuration for the metric engine. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct EngineConfig { + /// Experimental feature to use sparse primary key encoding. pub experimental_sparse_primary_key_encoding: bool, + /// The flush interval of the metadata region. 
+ #[serde( + with = "humantime_serde", + default = "EngineConfig::default_flush_metadata_region_interval" + )] + pub flush_metadata_region_interval: Duration, +} + +impl Default for EngineConfig { + fn default() -> Self { + Self { + flush_metadata_region_interval: DEFAULT_FLUSH_METADATA_REGION_INTERVAL, + experimental_sparse_primary_key_encoding: false, + } + } +} + +impl EngineConfig { + fn default_flush_metadata_region_interval() -> Duration { + DEFAULT_FLUSH_METADATA_REGION_INTERVAL + } + + /// Sanitizes the configuration. + pub fn sanitize(&mut self) { + if self.flush_metadata_region_interval.is_zero() { + warn!( + "Flush metadata region interval is zero, override with default value: {:?}. Disable metadata region flush is forbidden.", + DEFAULT_FLUSH_METADATA_REGION_INTERVAL + ); + self.flush_metadata_region_interval = DEFAULT_FLUSH_METADATA_REGION_INTERVAL; + } + } } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 28709264b3..15caa2c456 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -34,9 +34,11 @@ use api::region::RegionResponse; use async_trait::async_trait; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; +use common_runtime::RepeatedTask; use mito2::engine::MitoEngine; pub(crate) use options::IndexOptions; use snafu::ResultExt; +pub(crate) use state::MetricEngineState; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ @@ -47,11 +49,11 @@ use store_api::region_engine::{ use store_api::region_request::{BatchRegionDdlRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; -use self::state::MetricEngineState; use crate::config::EngineConfig; use crate::data_region::DataRegion; -use crate::error::{self, Result, UnsupportedRegionRequestSnafu}; +use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, 
UnsupportedRegionRequestSnafu}; use crate::metadata_region::MetadataRegion; +use crate::repeated_task::FlushMetadataRegionTask; use crate::row_modifier::RowModifier; use crate::utils::{self, get_region_statistic}; @@ -359,19 +361,32 @@ impl RegionEngine for MetricEngine { } impl MetricEngine { - pub fn new(mito: MitoEngine, config: EngineConfig) -> Self { + pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> { let metadata_region = MetadataRegion::new(mito.clone()); let data_region = DataRegion::new(mito.clone()); - Self { - inner: Arc::new(MetricEngineInner { - mito, - metadata_region, - data_region, - state: RwLock::default(), - config, - row_modifier: RowModifier::new(), - }), - } + let state = Arc::new(RwLock::default()); + config.sanitize(); + let flush_interval = config.flush_metadata_region_interval; + let inner = Arc::new(MetricEngineInner { + mito: mito.clone(), + metadata_region, + data_region, + state: state.clone(), + config, + row_modifier: RowModifier::new(), + flush_task: RepeatedTask::new( + flush_interval, + Box::new(FlushMetadataRegionTask { + state: state.clone(), + mito: mito.clone(), + }), + ), + }); + inner + .flush_task + .start(common_runtime::global_runtime()) + .context(StartRepeatedTaskSnafu { name: "flush_task" })?; + Ok(Self { inner }) } pub fn mito(&self) -> MitoEngine { @@ -426,15 +441,21 @@ impl MetricEngine { ) -> Result { self.inner.scan_to_stream(region_id, request).await } + + /// Returns the configuration of the engine. 
+ pub fn config(&self) -> &EngineConfig { + &self.inner.config + } } struct MetricEngineInner { mito: MitoEngine, metadata_region: MetadataRegion, data_region: DataRegion, - state: RwLock<MetricEngineState>, + state: Arc<RwLock<MetricEngineState>>, config: EngineConfig, row_modifier: RowModifier, + flush_task: RepeatedTask<Error>, } #[cfg(test)] diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 78d2293302..596054623d 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -737,7 +737,7 @@ mod test { // set up let env = TestEnv::new().await; - let engine = MetricEngine::new(env.mito(), EngineConfig::default()); + let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap(); let engine_inner = engine.inner; // check create data region request diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 015e9d9f98..c0ae55e402 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -282,6 +282,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to start repeated task: {}", name))] + StartRepeatedTask { + name: String, + source: common_runtime::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -335,6 +343,8 @@ impl ErrorExt for Error { CollectRecordBatchStream { source, .. } => source.status_code(), + StartRepeatedTask { source, .. } => source.status_code(), + MetricManifestInfo { .. 
} => StatusCode::Internal, } } diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index 597e8f5897..7722f3acd1 100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -59,6 +59,7 @@ pub mod engine; pub mod error; mod metadata_region; mod metrics; +mod repeated_task; pub mod row_modifier; #[cfg(test)] mod test_util; diff --git a/src/metric-engine/src/repeated_task.rs b/src/metric-engine/src/repeated_task.rs new file mode 100644 index 0000000000..e5e7f7025f --- /dev/null +++ b/src/metric-engine/src/repeated_task.rs @@ -0,0 +1,167 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, RwLock}; +use std::time::Instant; + +use common_runtime::TaskFunction; +use common_telemetry::{debug, error}; +use mito2::engine::MitoEngine; +use store_api::region_engine::{RegionEngine, RegionRole}; +use store_api::region_request::{RegionFlushRequest, RegionRequest}; + +use crate::engine::MetricEngineState; +use crate::error::{Error, Result}; +use crate::utils; + +/// Task to flush metadata regions. +/// +/// This task is used to send flush requests to the metadata regions +/// periodically. 
+pub(crate) struct FlushMetadataRegionTask { + pub(crate) state: Arc<RwLock<MetricEngineState>>, + pub(crate) mito: MitoEngine, +} + +#[async_trait::async_trait] +impl TaskFunction<Error> for FlushMetadataRegionTask { + fn name(&self) -> &str { + "FlushMetadataRegionTask" + } + + async fn call(&mut self) -> Result<()> { + let region_ids = { + let state = self.state.read().unwrap(); + state + .physical_region_states() + .keys() + .cloned() + .collect::<Vec<_>>() + }; + + let num_region = region_ids.len(); + let now = Instant::now(); + for region_id in region_ids { + let Some(role) = self.mito.role(region_id) else { + continue; + }; + if role == RegionRole::Follower { + continue; + } + let metadata_region_id = utils::to_metadata_region_id(region_id); + if let Err(e) = self + .mito + .handle_request( + metadata_region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + { + error!(e; "Failed to flush metadata region {}", metadata_region_id); + } + } + debug!( + "Flushed {} metadata regions, elapsed: {:?}", + num_region, + now.elapsed() + ); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::time::Duration; + + use store_api::region_engine::{RegionEngine, RegionManifestInfo}; + + use crate::config::{EngineConfig, DEFAULT_FLUSH_METADATA_REGION_INTERVAL}; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn test_flush_metadata_region_task() { + let env = TestEnv::with_prefix_and_config( + "test_flush_metadata_region_task", + EngineConfig { + flush_metadata_region_interval: Duration::from_millis(100), + ..Default::default() + }, + ) + .await; + env.init_metric_region().await; + let engine = env.metric(); + // Wait for flush task run + tokio::time::sleep(Duration::from_millis(200)).await; + let physical_region_id = env.default_physical_region_id(); + let stat = engine.region_statistic(physical_region_id).unwrap(); + + assert_matches!( + stat.manifest, + RegionManifestInfo::Metric { + metadata_manifest_version: 1, 
+ metadata_flushed_entry_id: 1, + .. + } + ) + } + + #[tokio::test] + async fn test_flush_metadata_region_task_with_long_interval() { + let env = TestEnv::with_prefix_and_config( + "test_flush_metadata_region_task_with_long_interval", + EngineConfig { + flush_metadata_region_interval: Duration::from_secs(60), + ..Default::default() + }, + ) + .await; + env.init_metric_region().await; + let engine = env.metric(); + // Wait for flush task run, should not flush metadata region + tokio::time::sleep(Duration::from_millis(200)).await; + let physical_region_id = env.default_physical_region_id(); + let stat = engine.region_statistic(physical_region_id).unwrap(); + + assert_matches!( + stat.manifest, + RegionManifestInfo::Metric { + metadata_manifest_version: 0, + metadata_flushed_entry_id: 0, + .. + } + ) + } + + #[tokio::test] + async fn test_flush_metadata_region_sanitize() { + let env = TestEnv::with_prefix_and_config( + "test_flush_metadata_region_sanitize", + EngineConfig { + flush_metadata_region_interval: Duration::from_secs(0), + ..Default::default() + }, + ) + .await; + let metric = env.metric(); + let config = metric.config(); + assert_eq!( + config.flush_metadata_region_interval, + DEFAULT_FLUSH_METADATA_REGION_INTERVAL + ); + } +} diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index 6bcc002908..e750b516f1 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -54,9 +54,14 @@ impl TestEnv { /// Returns a new env with specific `prefix` for test. pub async fn with_prefix(prefix: &str) -> Self { + Self::with_prefix_and_config(prefix, EngineConfig::default()).await + } + + /// Returns a new env with specific `prefix` and `config` for test. 
+ pub async fn with_prefix_and_config(prefix: &str, config: EngineConfig) -> Self { let mut mito_env = MitoTestEnv::with_prefix(prefix); let mito = mito_env.create_engine(MitoConfig::default()).await; - let metric = MetricEngine::new(mito.clone(), EngineConfig::default()); + let metric = MetricEngine::try_new(mito.clone(), config).unwrap(); Self { mito_env, mito, @@ -84,7 +89,7 @@ impl TestEnv { .mito_env .create_follower_engine(MitoConfig::default()) .await; - let metric = MetricEngine::new(mito.clone(), EngineConfig::default()); + let metric = MetricEngine::try_new(mito.clone(), EngineConfig::default()).unwrap(); let region_id = self.default_physical_region_id(); debug!("opening default physical region: {region_id}"); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index dc5a58d794..ca7499f326 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -141,11 +141,6 @@ impl RegionWorkerLoop { // But the flush is skipped if memtables are empty. Thus should update the `topic_latest_entry_id` // when handling flush request instead of in `schedule_flush` or `flush_finished`. self.update_topic_latest_entry_id(&region); - info!( - "Region {} flush request, high watermark: {}", - region_id, - region.topic_latest_entry_id.load(Ordering::Relaxed) - ); let reason = if region.is_downgrading() { FlushReason::Downgrading @@ -268,15 +263,17 @@ impl RegionWorkerLoop { .store() .high_watermark(&region.provider) .unwrap_or(0); - if high_watermark != 0 { + let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed); + + if high_watermark != 0 && high_watermark > topic_last_entry_id { region .topic_latest_entry_id .store(high_watermark, Ordering::Relaxed); + info!( + "Region {} high watermark updated to {}", + region.region_id, high_watermark + ); } - info!( - "Region {} high watermark updated to {}", - region.region_id, high_watermark - ); } } }