diff --git a/Cargo.lock b/Cargo.lock index 7592b6afdc..24e79419fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1863,6 +1863,7 @@ dependencies = [ "common-test-util", "futures", "futures-util", + "humantime-serde", "object-store", "serde", "serde_json", diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index bd572fcb7f..6f76fd9387 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -18,3 +18,10 @@ use_memory_store = false # [logging] # dir = "/tmp/greptimedb/logs" # level = "info" + +# Procedure options. +[procedure] +# Max retry times of procedures. +max_retry_times = 3 +# Initial retry delay of procedures, increases exponentially. +retry_delay = "500ms" diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index 109bf4df77..11882f570b 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -7,15 +7,16 @@ license.workspace = true [dependencies] async-trait.workspace = true async-stream.workspace = true +backon = "0.4" common-error = { path = "../error" } common-runtime = { path = "../runtime" } common-telemetry = { path = "../telemetry" } futures.workspace = true +humantime-serde = "1.1" object-store = { path = "../../object-store" } serde.workspace = true serde_json = "1.0" smallvec = "1" -backon = "0.4.0" snafu.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index a124f94f6e..7a542c4157 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -16,6 +16,7 @@ pub mod error; pub mod local; +pub mod options; mod procedure; pub mod store; pub mod watcher; diff --git a/src/common/procedure/src/options.rs b/src/common/procedure/src/options.rs new file mode 100644 index 0000000000..9cebfa805c --- /dev/null +++ b/src/common/procedure/src/options.rs @@ -0,0 +1,38 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 
2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Configuration options for the procedure framework. + +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct ProcedureConfig { + /// Max retry times of procedure. + pub max_retry_times: usize, + /// Initial retry delay of procedures, increases exponentially. + #[serde(with = "humantime_serde")] + pub retry_delay: Duration, +} + +impl Default for ProcedureConfig { + fn default() -> ProcedureConfig { + ProcedureConfig { + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + } + } +} diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 3d57bf2fc2..6369c353ea 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -20,6 +20,7 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_error::prelude::BoxedError; +pub use common_procedure::options::ProcedureConfig; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; use meta_client::MetaClientOptions; @@ -344,25 +345,6 @@ impl From<&DatanodeOptions> for StorageEngineConfig { } } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct ProcedureConfig { - /// Max retry times of procedure. - pub max_retry_times: usize, - /// Initial retry delay of procedures, increases exponentially. 
- #[serde(with = "humantime_serde")] - pub retry_delay: Duration, -} - -impl Default for ProcedureConfig { - fn default() -> ProcedureConfig { - ProcedureConfig { - max_retry_times: 3, - retry_delay: Duration::from_millis(500), - } - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct DatanodeOptions { diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 3075a78970..76ce1e36f6 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use api::v1::meta::Peer; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; use common_telemetry::{error, info, warn}; @@ -38,7 +39,6 @@ use crate::selector::{Selector, SelectorType}; use crate::sequence::SequenceRef; use crate::service::mailbox::MailboxRef; use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; - pub const TABLE_ID_SEQ: &str = "table_id"; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -53,6 +53,7 @@ pub struct MetaSrvOptions { pub disable_region_failover: bool, pub http_opts: HttpOptions, pub logging: LoggingOptions, + pub procedure: ProcedureConfig, } impl Default for MetaSrvOptions { @@ -67,6 +68,7 @@ impl Default for MetaSrvOptions { disable_region_failover: false, http_opts: HttpOptions::default(), logging: LoggingOptions::default(), + procedure: ProcedureConfig::default(), } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index e338e16fb5..df20c11f59 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -160,7 +160,14 @@ impl MetaSrvBuilder { let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone()); let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence); let state_store = 
Arc::new(MetaStateStore::new(kv_store.clone())); - let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); + + let manager_config = ManagerConfig { + max_retry_times: options.procedure.max_retry_times, + retry_delay: options.procedure.retry_delay, + ..Default::default() + }; + + let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store)); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); let metadata_service = metadata_service .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 53e31c2a26..649dff7294 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -26,7 +26,7 @@ use common_meta::peer::Peer; use common_meta::DatanodeId; use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::create_temp_dir; -use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; +use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig}; use datanode::heartbeat::HeartbeatTask; use datanode::instance::Instance as DatanodeInstance; use frontend::frontend::FrontendOptions; @@ -118,13 +118,17 @@ impl GreptimeDbClusterBuilder { } async fn build_metasrv(&self, datanode_clients: Arc) -> MockInfo { - meta_srv::mocks::mock( - MetaSrvOptions::default(), - self.kv_store.clone(), - None, - Some(datanode_clients), - ) - .await + let opt = MetaSrvOptions { + procedure: ProcedureConfig { + // Due to the large network delay across data centers, + // we make max_retry_times and retry_delay larger than the defaults (in tests only). + max_retry_times: 5, + retry_delay: Duration::from_secs(1), + }, + ..Default::default() + }; + + meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await } async fn build_datanodes(