mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
feat: all distributed time together (#2423)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -8690,6 +8690,7 @@ dependencies = [
|
||||
"common-grpc",
|
||||
"common-grpc-expr",
|
||||
"common-mem-prof",
|
||||
"common-meta",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
|
||||
@@ -6,8 +6,6 @@ bind_addr = "127.0.0.1:3002"
|
||||
server_addr = "127.0.0.1:3002"
|
||||
# Etcd server address, "127.0.0.1:2379" by default.
|
||||
store_addr = "127.0.0.1:2379"
|
||||
# Datanode lease in seconds, 15 seconds by default.
|
||||
datanode_lease_secs = 15
|
||||
# Datanode selector type.
|
||||
# - "LeaseBased" (default value).
|
||||
# - "LoadBased"
|
||||
|
||||
@@ -207,7 +207,6 @@ mod tests {
|
||||
bind_addr = "127.0.0.1:3002"
|
||||
server_addr = "127.0.0.1:3002"
|
||||
store_addr = "127.0.0.1:2379"
|
||||
datanode_lease_secs = 15
|
||||
selector = "LeaseBased"
|
||||
use_memory_store = false
|
||||
|
||||
@@ -229,7 +228,6 @@ mod tests {
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
|
||||
assert_eq!("127.0.0.1:2379".to_string(), options.store_addr);
|
||||
assert_eq!(15, options.datanode_lease_secs);
|
||||
assert_eq!(SelectorType::LeaseBased, options.selector);
|
||||
assert_eq!("debug", options.logging.level.as_ref().unwrap());
|
||||
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
|
||||
|
||||
31
src/common/meta/src/distributed_time_constants.rs
Normal file
31
src/common/meta/src/distributed_time_constants.rs
Normal file
@@ -0,0 +1,31 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
/// Heartbeat interval time (is the basic unit of various time).
|
||||
pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;
|
||||
|
||||
/// The frontend will also send heartbeats to Metasrv, sending an empty
|
||||
/// heartbeat every HEARTBEAT_INTERVAL_MILLIS * 6 seconds.
|
||||
pub const FRONTEND_HEARTBEAT_INTERVAL_MILLIS: u64 = HEARTBEAT_INTERVAL_MILLIS * 6;
|
||||
|
||||
/// The lease seconds of a region. It's set by 3 heartbeat intervals
|
||||
/// (HEARTBEAT_INTERVAL_MILLIS × 3), plus some extra buffer (1 second).
|
||||
pub const REGION_LEASE_SECS: u64 =
|
||||
Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1;
|
||||
|
||||
/// When creating table or region failover, a target node needs to be selected.
|
||||
/// If the node's lease has expired, the `Selector` will not select it.
|
||||
pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
|
||||
@@ -19,6 +19,7 @@ pub mod cache_invalidator;
|
||||
pub mod datanode_manager;
|
||||
pub mod ddl;
|
||||
pub mod ddl_manager;
|
||||
pub mod distributed_time_constants;
|
||||
pub mod error;
|
||||
pub mod heartbeat;
|
||||
pub mod instruction;
|
||||
|
||||
@@ -71,13 +71,12 @@ mod test {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::RegionIdent;
|
||||
use common_meta::{distributed_time_constants, RegionIdent};
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
|
||||
use super::*;
|
||||
use crate::handler::node_stat::{RegionStat, Stat};
|
||||
use crate::metasrv::builder::MetaSrvBuilder;
|
||||
use crate::metasrv::DEFAULT_REGION_LEASE_SECS;
|
||||
use crate::service::store::kv::KvBackendAdapter;
|
||||
use crate::{table_routes, test_util};
|
||||
|
||||
@@ -147,7 +146,7 @@ mod test {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
RegionLeaseHandler::new(DEFAULT_REGION_LEASE_SECS)
|
||||
RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS)
|
||||
.handle(&req, ctx, acc)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -156,6 +155,9 @@ mod test {
|
||||
let lease = acc.region_lease.as_ref().unwrap();
|
||||
assert_eq!(lease.region_ids, vec![RegionId::new(table_id, 2).as_u64()]);
|
||||
assert_eq!(lease.duration_since_epoch, 1234);
|
||||
assert_eq!(lease.lease_seconds, DEFAULT_REGION_LEASE_SECS);
|
||||
assert_eq!(
|
||||
lease.lease_seconds,
|
||||
distributed_time_constants::REGION_LEASE_SECS
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,19 +46,12 @@ use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
|
||||
pub const TABLE_ID_SEQ: &str = "table_id";
|
||||
pub const METASRV_HOME: &str = "/tmp/metasrv";
|
||||
|
||||
pub const DEFAULT_DATANODE_LEASE_SECS: u64 = 20;
|
||||
/// The lease seconds of a region. It's set by two default heartbeat intervals (5 second × 2) plus
|
||||
/// two roundtrip time (2 second × 2 × 2), plus some extra buffer (2 second).
|
||||
pub const DEFAULT_REGION_LEASE_SECS: u64 = 20;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct MetaSrvOptions {
|
||||
pub bind_addr: String,
|
||||
pub server_addr: String,
|
||||
pub store_addr: String,
|
||||
pub datanode_lease_secs: u64,
|
||||
pub region_lease_secs: u64,
|
||||
pub selector: SelectorType,
|
||||
pub use_memory_store: bool,
|
||||
pub enable_region_failover: bool,
|
||||
@@ -76,8 +69,6 @@ impl Default for MetaSrvOptions {
|
||||
bind_addr: "127.0.0.1:3002".to_string(),
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
store_addr: "127.0.0.1:2379".to_string(),
|
||||
datanode_lease_secs: DEFAULT_DATANODE_LEASE_SECS,
|
||||
region_lease_secs: DEFAULT_REGION_LEASE_SECS,
|
||||
selector: SelectorType::default(),
|
||||
use_memory_store: false,
|
||||
enable_region_failover: true,
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::time::Duration;
|
||||
use client::client_manager::DatanodeClients;
|
||||
use common_grpc::channel_manager::ChannelConfig;
|
||||
use common_meta::ddl_manager::{DdlManager, DdlManagerRef};
|
||||
use common_meta::distributed_time_constants;
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::sequence::{Sequence, SequenceRef};
|
||||
use common_meta::state_store::KvStateStore;
|
||||
@@ -166,9 +167,9 @@ impl MetaSrvBuilder {
|
||||
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone()));
|
||||
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
|
||||
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
|
||||
let ctx = SelectorContext {
|
||||
datanode_lease_secs: options.datanode_lease_secs,
|
||||
let selector_ctx = SelectorContext {
|
||||
server_addr: options.server_addr.clone(),
|
||||
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
|
||||
kv_store: kv_store.clone(),
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
table_id: None,
|
||||
@@ -179,7 +180,7 @@ impl MetaSrvBuilder {
|
||||
&procedure_manager,
|
||||
&mailbox,
|
||||
&table_metadata_manager,
|
||||
(&selector, &ctx),
|
||||
(&selector, &selector_ctx),
|
||||
&table_id_sequence,
|
||||
);
|
||||
let _ = ddl_manager.try_start();
|
||||
@@ -188,19 +189,12 @@ impl MetaSrvBuilder {
|
||||
Some(handler_group) => handler_group,
|
||||
None => {
|
||||
let region_failover_handler = if options.enable_region_failover {
|
||||
let selector_ctx = SelectorContext {
|
||||
server_addr: options.server_addr.clone(),
|
||||
datanode_lease_secs: options.datanode_lease_secs,
|
||||
kv_store: kv_store.clone(),
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
table_id: None,
|
||||
};
|
||||
let region_failover_manager = Arc::new(RegionFailoverManager::new(
|
||||
options.region_lease_secs,
|
||||
distributed_time_constants::REGION_LEASE_SECS,
|
||||
in_memory.clone(),
|
||||
mailbox.clone(),
|
||||
procedure_manager.clone(),
|
||||
(selector.clone(), selector_ctx),
|
||||
(selector.clone(), selector_ctx.clone()),
|
||||
lock.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
));
|
||||
@@ -218,7 +212,8 @@ impl MetaSrvBuilder {
|
||||
None
|
||||
};
|
||||
|
||||
let region_lease_handler = RegionLeaseHandler::new(options.region_lease_secs);
|
||||
let region_lease_handler =
|
||||
RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS);
|
||||
|
||||
let group = HeartbeatHandlerGroup::new(pushers);
|
||||
group.add_handler(ResponseHeaderHandler).await;
|
||||
|
||||
@@ -28,6 +28,7 @@ common-error = { workspace = true }
|
||||
common-grpc = { workspace = true }
|
||||
common-grpc-expr = { workspace = true }
|
||||
common-mem-prof = { workspace = true, optional = true }
|
||||
common-meta = { workspace = true }
|
||||
common-query = { workspace = true }
|
||||
common-recordbatch = { workspace = true }
|
||||
common-runtime = { workspace = true }
|
||||
|
||||
@@ -12,10 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::distributed_time_constants;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 5000;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct HeartbeatOptions {
|
||||
@@ -31,8 +30,8 @@ impl HeartbeatOptions {
|
||||
pub fn frontend_default() -> Self {
|
||||
Self {
|
||||
// Frontend can send heartbeat with a longer interval.
|
||||
interval_millis: HEARTBEAT_INTERVAL_MILLIS * 10,
|
||||
retry_interval_millis: HEARTBEAT_INTERVAL_MILLIS,
|
||||
interval_millis: distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
|
||||
retry_interval_millis: distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -40,8 +39,8 @@ impl HeartbeatOptions {
|
||||
impl Default for HeartbeatOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval_millis: HEARTBEAT_INTERVAL_MILLIS,
|
||||
retry_interval_millis: HEARTBEAT_INTERVAL_MILLIS,
|
||||
interval_millis: distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
|
||||
retry_interval_millis: distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user