feat: meta procedure options (#1937)

* feat: meta procedure options

* chore: tune meta procedure options in tests

* Update src/common/procedure/Cargo.toml

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
Weny Xu
2023-07-12 11:22:08 +09:00
committed by GitHub
parent fa12392d2c
commit 264c5ea720
9 changed files with 73 additions and 30 deletions

1
Cargo.lock generated
View File

@@ -1863,6 +1863,7 @@ dependencies = [
"common-test-util",
"futures",
"futures-util",
"humantime-serde",
"object-store",
"serde",
"serde_json",

View File

@@ -18,3 +18,10 @@ use_memory_store = false
# [logging]
# dir = "/tmp/greptimedb/logs"
# level = "info"
# Procedure storage options.
[procedure]
# Procedure max retry time.
max_retry_times = 3
# Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"

View File

@@ -7,15 +7,16 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
async-stream.workspace = true
backon = "0.4"
common-error = { path = "../error" }
common-runtime = { path = "../runtime" }
common-telemetry = { path = "../telemetry" }
futures.workspace = true
humantime-serde = "1.1"
object-store = { path = "../../object-store" }
serde.workspace = true
serde_json = "1.0"
smallvec = "1"
backon = "0.4.0"
snafu.workspace = true
tokio.workspace = true
uuid.workspace = true

View File

@@ -16,6 +16,7 @@
pub mod error;
pub mod local;
pub mod options;
mod procedure;
pub mod store;
pub mod watcher;

View File

@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Common traits and structures for the procedure framework.
use std::time::Duration;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ProcedureConfig {
/// Max retry times of procedure.
pub max_retry_times: usize,
/// Initial retry delay of procedures, increases exponentially.
#[serde(with = "humantime_serde")]
pub retry_delay: Duration,
}
impl Default for ProcedureConfig {
fn default() -> ProcedureConfig {
ProcedureConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
}
}
}

View File

@@ -20,6 +20,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_error::prelude::BoxedError;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use meta_client::MetaClientOptions;
@@ -344,25 +345,6 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ProcedureConfig {
/// Max retry times of procedure.
pub max_retry_times: usize,
/// Initial retry delay of procedures, increases exponentially.
#[serde(with = "humantime_serde")]
pub retry_delay: Duration,
}
impl Default for ProcedureConfig {
fn default() -> ProcedureConfig {
ProcedureConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct DatanodeOptions {

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use api::v1::meta::Peer;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_telemetry::{error, info, warn};
@@ -38,7 +39,6 @@ use crate::selector::{Selector, SelectorType};
use crate::sequence::SequenceRef;
use crate::service::mailbox::MailboxRef;
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
pub const TABLE_ID_SEQ: &str = "table_id";
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -53,6 +53,7 @@ pub struct MetaSrvOptions {
pub disable_region_failover: bool,
pub http_opts: HttpOptions,
pub logging: LoggingOptions,
pub procedure: ProcedureConfig,
}
impl Default for MetaSrvOptions {
@@ -67,6 +68,7 @@ impl Default for MetaSrvOptions {
disable_region_failover: false,
http_opts: HttpOptions::default(),
logging: LoggingOptions::default(),
procedure: ProcedureConfig::default(),
}
}
}

View File

@@ -160,7 +160,14 @@ impl MetaSrvBuilder {
let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone());
let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence);
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
let manager_config = ManagerConfig {
max_retry_times: options.procedure.max_retry_times,
retry_delay: options.procedure.retry_delay,
..Default::default()
};
let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store));
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));
let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));

View File

@@ -26,7 +26,7 @@ use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::create_temp_dir;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig};
use datanode::heartbeat::HeartbeatTask;
use datanode::instance::Instance as DatanodeInstance;
use frontend::frontend::FrontendOptions;
@@ -118,13 +118,17 @@ impl GreptimeDbClusterBuilder {
}
async fn build_metasrv(&self, datanode_clients: Arc<DatanodeClients>) -> MockInfo {
meta_srv::mocks::mock(
MetaSrvOptions::default(),
self.kv_store.clone(),
None,
Some(datanode_clients),
)
.await
let opt = MetaSrvOptions {
procedure: ProcedureConfig {
// Due to large network delay during cross data-center.
// We only make max_retry_times and retry_delay large than the default in tests.
max_retry_times: 5,
retry_delay: Duration::from_secs(1),
},
..Default::default()
};
meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await
}
async fn build_datanodes(