feat: introduce flush metadata region task for metric engine (#5951)

* feat: introduce flush metadata region task for metric engine

* docs: generate config.md

* chore: add header

* test: fix unit test

* fix: fix unit tests

* chore: apply suggestions from CR

* chore: remove docs

* fix: fix unit tests
This commit is contained in:
Weny Xu
2025-04-23 12:51:22 +08:00
committed by GitHub
parent 8c4796734a
commit 55cadcd2c0
13 changed files with 289 additions and 33 deletions

2
Cargo.lock generated
View File

@@ -7040,12 +7040,14 @@ dependencies = [
"common-macro",
"common-query",
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-test-util",
"common-time",
"datafusion",
"datatypes",
"futures-util",
"humantime-serde",
"itertools 0.14.0",
"lazy_static",
"mito2",

View File

@@ -74,6 +74,7 @@ fn test_load_datanode_example_config() {
RegionEngineConfig::File(FileEngineConfig {}),
RegionEngineConfig::Metric(MetricEngineConfig {
experimental_sparse_primary_key_encoding: false,
flush_metadata_region_interval: Duration::from_secs(30),
}),
],
logging: LoggingOptions {
@@ -216,6 +217,7 @@ fn test_load_standalone_example_config() {
RegionEngineConfig::File(FileEngineConfig {}),
RegionEngineConfig::Metric(MetricEngineConfig {
experimental_sparse_primary_key_encoding: false,
flush_metadata_region_interval: Duration::from_secs(30),
}),
],
storage: StorageConfig {

View File

@@ -57,9 +57,9 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingCacheSnafu,
MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
MissingCacheSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
@@ -416,10 +416,11 @@ impl DatanodeBuilder {
)
.await?;
let metric_engine = MetricEngine::new(
let metric_engine = MetricEngine::try_new(
mito_engine.clone(),
metric_engine_config.take().unwrap_or_default(),
);
)
.context(BuildMetricEngineSnafu)?;
engines.push(Arc::new(mito_engine) as _);
engines.push(Arc::new(metric_engine) as _);
}

View File

@@ -336,6 +336,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build metric engine"))]
BuildMetricEngine {
source: metric_engine::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize options to TOML"))]
TomlFormat {
#[snafu(implicit)]
@@ -452,6 +459,7 @@ impl ErrorExt for Error {
FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
BuildMetricEngine { source, .. } => source.status_code(),
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
StatusCode::RegionBusy
}

View File

@@ -18,11 +18,13 @@ common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
datatypes.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
lazy_static = "1.4"
mito2.workspace = true

View File

@@ -12,9 +12,49 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
/// The default flush interval of the metadata region.
pub(crate) const DEFAULT_FLUSH_METADATA_REGION_INTERVAL: Duration = Duration::from_secs(30);
/// Configuration for the metric engine.
///
/// `Default` is implemented manually (not derived) so that
/// `flush_metadata_region_interval` starts at
/// `DEFAULT_FLUSH_METADATA_REGION_INTERVAL` rather than a zero duration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EngineConfig {
/// Experimental feature to use sparse primary key encoding.
pub experimental_sparse_primary_key_encoding: bool,
/// The flush interval of the metadata region.
///
/// (De)serialized through `humantime_serde`. When the field is missing
/// from the input, serde falls back to
/// `EngineConfig::default_flush_metadata_region_interval`.
/// A zero value is replaced by the default in `EngineConfig::sanitize`.
#[serde(
with = "humantime_serde",
default = "EngineConfig::default_flush_metadata_region_interval"
)]
pub flush_metadata_region_interval: Duration,
}
impl Default for EngineConfig {
    /// Builds the default configuration: sparse primary key encoding is
    /// disabled and the metadata region is flushed every
    /// `DEFAULT_FLUSH_METADATA_REGION_INTERVAL`.
    fn default() -> Self {
        let experimental_sparse_primary_key_encoding = false;
        let flush_metadata_region_interval = DEFAULT_FLUSH_METADATA_REGION_INTERVAL;

        Self {
            experimental_sparse_primary_key_encoding,
            flush_metadata_region_interval,
        }
    }
}
impl EngineConfig {
    /// Default provider referenced by the `#[serde(default = ...)]` attribute
    /// of `flush_metadata_region_interval`.
    fn default_flush_metadata_region_interval() -> Duration {
        DEFAULT_FLUSH_METADATA_REGION_INTERVAL
    }

    /// Sanitizes the configuration.
    ///
    /// A zero `flush_metadata_region_interval` is not accepted: it is logged
    /// with a warning and replaced by `DEFAULT_FLUSH_METADATA_REGION_INTERVAL`.
    /// Any non-zero interval is left untouched.
    pub fn sanitize(&mut self) {
        // Nothing to fix for a non-zero interval.
        if !self.flush_metadata_region_interval.is_zero() {
            return;
        }

        warn!(
            "Flush metadata region interval is zero, override with default value: {:?}. Disable metadata region flush is forbidden.",
            DEFAULT_FLUSH_METADATA_REGION_INTERVAL
        );
        self.flush_metadata_region_interval = DEFAULT_FLUSH_METADATA_REGION_INTERVAL;
    }
}

View File

@@ -34,9 +34,11 @@ use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_runtime::RepeatedTask;
use mito2::engine::MitoEngine;
pub(crate) use options::IndexOptions;
use snafu::ResultExt;
pub(crate) use state::MetricEngineState;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
@@ -47,11 +49,11 @@ use store_api::region_engine::{
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use self::state::MetricEngineState;
use crate::config::EngineConfig;
use crate::data_region::DataRegion;
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::repeated_task::FlushMetadataRegionTask;
use crate::row_modifier::RowModifier;
use crate::utils::{self, get_region_statistic};
@@ -359,19 +361,32 @@ impl RegionEngine for MetricEngine {
}
impl MetricEngine {
pub fn new(mito: MitoEngine, config: EngineConfig) -> Self {
pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
let metadata_region = MetadataRegion::new(mito.clone());
let data_region = DataRegion::new(mito.clone());
Self {
inner: Arc::new(MetricEngineInner {
mito,
metadata_region,
data_region,
state: RwLock::default(),
config,
row_modifier: RowModifier::new(),
}),
}
let state = Arc::new(RwLock::default());
config.sanitize();
let flush_interval = config.flush_metadata_region_interval;
let inner = Arc::new(MetricEngineInner {
mito: mito.clone(),
metadata_region,
data_region,
state: state.clone(),
config,
row_modifier: RowModifier::new(),
flush_task: RepeatedTask::new(
flush_interval,
Box::new(FlushMetadataRegionTask {
state: state.clone(),
mito: mito.clone(),
}),
),
});
inner
.flush_task
.start(common_runtime::global_runtime())
.context(StartRepeatedTaskSnafu { name: "flush_task" })?;
Ok(Self { inner })
}
pub fn mito(&self) -> MitoEngine {
@@ -426,15 +441,21 @@ impl MetricEngine {
) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
self.inner.scan_to_stream(region_id, request).await
}
/// Returns the configuration of the engine.
///
/// This is the configuration stored in the engine at construction time;
/// `try_new` calls `EngineConfig::sanitize` on it before storing it, so the
/// returned values may differ from what the caller originally passed in.
pub fn config(&self) -> &EngineConfig {
&self.inner.config
}
}
struct MetricEngineInner {
mito: MitoEngine,
metadata_region: MetadataRegion,
data_region: DataRegion,
state: RwLock<MetricEngineState>,
state: Arc<RwLock<MetricEngineState>>,
config: EngineConfig,
row_modifier: RowModifier,
flush_task: RepeatedTask<Error>,
}
#[cfg(test)]

View File

@@ -737,7 +737,7 @@ mod test {
// set up
let env = TestEnv::new().await;
let engine = MetricEngine::new(env.mito(), EngineConfig::default());
let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
let engine_inner = engine.inner;
// check create data region request

View File

@@ -282,6 +282,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to start repeated task: {}", name))]
StartRepeatedTask {
name: String,
source: common_runtime::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -335,6 +343,8 @@ impl ErrorExt for Error {
CollectRecordBatchStream { source, .. } => source.status_code(),
StartRepeatedTask { source, .. } => source.status_code(),
MetricManifestInfo { .. } => StatusCode::Internal,
}
}

View File

@@ -59,6 +59,7 @@ pub mod engine;
pub mod error;
mod metadata_region;
mod metrics;
mod repeated_task;
pub mod row_modifier;
#[cfg(test)]
mod test_util;

View File

@@ -0,0 +1,167 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, RwLock};
use std::time::Instant;
use common_runtime::TaskFunction;
use common_telemetry::{debug, error};
use mito2::engine::MitoEngine;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use crate::engine::MetricEngineState;
use crate::error::{Error, Result};
use crate::utils;
/// Task to flush metadata regions.
///
/// This task is used to send flush requests to the metadata regions
/// periodically.
pub(crate) struct FlushMetadataRegionTask {
/// Shared engine state; read on every run to collect the ids of the
/// physical regions currently tracked.
pub(crate) state: Arc<RwLock<MetricEngineState>>,
/// Mito engine used to look up each region's role and to submit the
/// flush requests.
pub(crate) mito: MitoEngine,
}
#[async_trait::async_trait]
impl TaskFunction<Error> for FlushMetadataRegionTask {
    /// Name of this repeated task.
    fn name(&self) -> &str {
        "FlushMetadataRegionTask"
    }

    /// Sends a flush request to the metadata region of every tracked
    /// physical region.
    ///
    /// Regions for which mito reports no role, and regions in the
    /// `Follower` role, are skipped. A failed flush is logged and does not
    /// abort the run, so this function always returns `Ok(())`.
    async fn call(&mut self) -> Result<()> {
        // Snapshot the region ids inside a block so the read guard is
        // dropped before any `.await` below.
        let region_ids = {
            let state = self.state.read().unwrap();
            state
                .physical_region_states()
                .keys()
                .cloned()
                .collect::<Vec<_>>()
        };

        let num_region = region_ids.len();
        // Counts only the flush requests that completed successfully, so the
        // summary log does not include skipped or failed regions.
        let mut num_flushed = 0usize;
        let now = Instant::now();
        for region_id in region_ids {
            let Some(role) = self.mito.role(region_id) else {
                continue;
            };
            if role == RegionRole::Follower {
                continue;
            }

            let metadata_region_id = utils::to_metadata_region_id(region_id);
            match self
                .mito
                .handle_request(
                    metadata_region_id,
                    RegionRequest::Flush(RegionFlushRequest {
                        row_group_size: None,
                    }),
                )
                .await
            {
                Ok(_) => num_flushed += 1,
                Err(e) => {
                    error!(e; "Failed to flush metadata region {}", metadata_region_id);
                }
            }
        }

        debug!(
            "Flushed {} of {} metadata regions, elapsed: {:?}",
            num_flushed,
            num_region,
            now.elapsed()
        );
        Ok(())
    }
}
// Tests for the periodic metadata region flush task and for the sanitization
// of `flush_metadata_region_interval`.
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;
use store_api::region_engine::{RegionEngine, RegionManifestInfo};
use crate::config::{EngineConfig, DEFAULT_FLUSH_METADATA_REGION_INTERVAL};
use crate::test_util::TestEnv;
/// With a 100ms flush interval, the metadata region is expected to have been
/// flushed by the time 200ms have passed: both the metadata manifest version
/// and the metadata flushed entry id are asserted to be 1.
///
/// NOTE(review): relies on a wall-clock sleep, so it may be timing-sensitive
/// on heavily loaded machines.
#[tokio::test]
async fn test_flush_metadata_region_task() {
let env = TestEnv::with_prefix_and_config(
"test_flush_metadata_region_task",
EngineConfig {
flush_metadata_region_interval: Duration::from_millis(100),
..Default::default()
},
)
.await;
env.init_metric_region().await;
let engine = env.metric();
// Wait for flush task run
tokio::time::sleep(Duration::from_millis(200)).await;
let physical_region_id = env.default_physical_region_id();
let stat = engine.region_statistic(physical_region_id).unwrap();
assert_matches!(
stat.manifest,
RegionManifestInfo::Metric {
metadata_manifest_version: 1,
metadata_flushed_entry_id: 1,
..
}
)
}
/// With a 60s flush interval, no flush is expected within the 200ms wait:
/// the metadata manifest version and flushed entry id are asserted to
/// still be 0.
#[tokio::test]
async fn test_flush_metadata_region_task_with_long_interval() {
let env = TestEnv::with_prefix_and_config(
"test_flush_metadata_region_task_with_long_interval",
EngineConfig {
flush_metadata_region_interval: Duration::from_secs(60),
..Default::default()
},
)
.await;
env.init_metric_region().await;
let engine = env.metric();
// Wait for flush task run, should not flush metadata region
tokio::time::sleep(Duration::from_millis(200)).await;
let physical_region_id = env.default_physical_region_id();
let stat = engine.region_statistic(physical_region_id).unwrap();
assert_matches!(
stat.manifest,
RegionManifestInfo::Metric {
metadata_manifest_version: 0,
metadata_flushed_entry_id: 0,
..
}
)
}
/// A zero flush interval passed at construction must be replaced by
/// `DEFAULT_FLUSH_METADATA_REGION_INTERVAL` in the engine's config.
#[tokio::test]
async fn test_flush_metadata_region_sanitize() {
let env = TestEnv::with_prefix_and_config(
"test_flush_metadata_region_sanitize",
EngineConfig {
flush_metadata_region_interval: Duration::from_secs(0),
..Default::default()
},
)
.await;
let metric = env.metric();
let config = metric.config();
assert_eq!(
config.flush_metadata_region_interval,
DEFAULT_FLUSH_METADATA_REGION_INTERVAL
);
}
}

View File

@@ -54,9 +54,14 @@ impl TestEnv {
/// Returns a new env with specific `prefix` for test.
///
/// Delegates to `with_prefix_and_config` with `EngineConfig::default()`.
pub async fn with_prefix(prefix: &str) -> Self {
Self::with_prefix_and_config(prefix, EngineConfig::default()).await
}
/// Returns a new env with specific `prefix` and `config` for test.
pub async fn with_prefix_and_config(prefix: &str, config: EngineConfig) -> Self {
let mut mito_env = MitoTestEnv::with_prefix(prefix);
let mito = mito_env.create_engine(MitoConfig::default()).await;
let metric = MetricEngine::new(mito.clone(), EngineConfig::default());
let metric = MetricEngine::try_new(mito.clone(), config).unwrap();
Self {
mito_env,
mito,
@@ -84,7 +89,7 @@ impl TestEnv {
.mito_env
.create_follower_engine(MitoConfig::default())
.await;
let metric = MetricEngine::new(mito.clone(), EngineConfig::default());
let metric = MetricEngine::try_new(mito.clone(), EngineConfig::default()).unwrap();
let region_id = self.default_physical_region_id();
debug!("opening default physical region: {region_id}");

View File

@@ -141,11 +141,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// But the flush is skipped if memtables are empty. Thus should update the `topic_latest_entry_id`
// when handling flush request instead of in `schedule_flush` or `flush_finished`.
self.update_topic_latest_entry_id(&region);
info!(
"Region {} flush request, high watermark: {}",
region_id,
region.topic_latest_entry_id.load(Ordering::Relaxed)
);
let reason = if region.is_downgrading() {
FlushReason::Downgrading
@@ -268,15 +263,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.store()
.high_watermark(&region.provider)
.unwrap_or(0);
if high_watermark != 0 {
let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
if high_watermark != 0 && high_watermark > topic_last_entry_id {
region
.topic_latest_entry_id
.store(high_watermark, Ordering::Relaxed);
info!(
"Region {} high watermark updated to {}",
region.region_id, high_watermark
);
}
info!(
"Region {} high watermark updated to {}",
region.region_id, high_watermark
);
}
}
}