refactor: move election trait and implementations to the common-meta crate (#7820)

* refactor: move election impl to common-meta

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: adding back comment

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-03-24 18:21:31 +08:00
committed by GitHub
parent 0e22d6a72b
commit c8c2e09eed
14 changed files with 218 additions and 145 deletions

View File

@@ -0,0 +1,242 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod etcd;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub mod rds;
use std::fmt::{self, Debug};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use common_telemetry::{error, info, warn};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::{self, Receiver, Sender};
use crate::error::Result;
pub const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
/// The value of the leader. It is used to store the leader's address.
pub struct LeaderValue(pub String);
impl<T: AsRef<[u8]>> From<T> for LeaderValue {
fn from(value: T) -> Self {
let string = String::from_utf8_lossy(value.as_ref());
Self(string.to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
// The metasrv's address
pub addr: String,
// The node build version
pub version: String,
// The node build git commit hash
pub git_commit: String,
// The node start timestamp in milliseconds
pub start_time_ms: u64,
// The node total cpu millicores
#[serde(default)]
pub total_cpu_millicores: i64,
// The node total memory bytes
#[serde(default)]
pub total_memory_bytes: i64,
/// The node build cpu usage millicores
#[serde(default)]
pub cpu_usage_millicores: i64,
/// The node build memory usage bytes
#[serde(default)]
pub memory_usage_bytes: i64,
// The node hostname
#[serde(default)]
pub hostname: String,
}
// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated top-level fields are removed from the proto.
#[allow(deprecated)]
impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
fn from(node_info: MetasrvNodeInfo) -> Self {
Self {
peer: Some(api::v1::meta::Peer {
addr: node_info.addr,
..Default::default()
}),
// TODO(zyy17): The following top-level fields are deprecated. They are kept for backward compatibility and will be removed in a future version.
// New code should use the fields in `info.NodeInfo` instead.
version: node_info.version.clone(),
git_commit: node_info.git_commit.clone(),
start_time_ms: node_info.start_time_ms,
cpus: node_info.total_cpu_millicores as u32,
memory_bytes: node_info.total_memory_bytes as u64,
// The canonical location for node information.
info: Some(api::v1::meta::NodeInfo {
version: node_info.version,
git_commit: node_info.git_commit,
start_time_ms: node_info.start_time_ms,
total_cpu_millicores: node_info.total_cpu_millicores,
total_memory_bytes: node_info.total_memory_bytes,
cpu_usage_millicores: node_info.cpu_usage_millicores,
memory_usage_bytes: node_info.memory_usage_bytes,
cpus: node_info.total_cpu_millicores as u32,
memory_bytes: node_info.total_memory_bytes as u64,
hostname: node_info.hostname,
}),
}
}
}
/// Messages sent when the leader changes.
#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
Elected(Arc<dyn LeaderKey>),
StepDown(Arc<dyn LeaderKey>),
}
/// LeaderKey is a key that represents the leader of metasrv.
/// The structure is corresponding to [etcd_client::LeaderKey].
pub trait LeaderKey: Send + Sync + Debug {
/// The name in byte. name is the election identifier that corresponds to the leadership key.
fn name(&self) -> &[u8];
/// The key in byte. key is an opaque key representing the ownership of the election. If the key
/// is deleted, then leadership is lost.
fn key(&self) -> &[u8];
/// The creation revision of the key.
fn revision(&self) -> i64;
/// The lease ID of the election leader.
fn lease_id(&self) -> i64;
}
impl fmt::Display for LeaderChangeMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let leader_key = match self {
LeaderChangeMessage::Elected(leader_key) => {
write!(f, "Elected(")?;
leader_key
}
LeaderChangeMessage::StepDown(leader_key) => {
write!(f, "StepDown(")?;
leader_key
}
};
write!(f, "LeaderKey {{ ")?;
write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
write!(f, ", rev: {}", leader_key.revision())?;
write!(f, ", lease: {}", leader_key.lease_id())?;
write!(f, " }})")
}
}
fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
let (tx, mut rx) = broadcast::channel(100);
let _handle = common_runtime::spawn_global(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_value}] is elected as leader: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_value}] is stepping down: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
},
Err(RecvError::Lagged(_)) => {
warn!("Log printing is too slow or leader changed too fast!");
}
Err(RecvError::Closed) => break,
}
}
});
tx
}
/// Sends a leader change message to the channel and sets the `is_leader` flag.
/// If a leader is elected, it will also set the `leader_infancy` flag to true.
fn send_leader_change_and_set_flags(
is_leader: &AtomicBool,
leader_infancy: &AtomicBool,
tx: &Sender<LeaderChangeMessage>,
msg: LeaderChangeMessage,
) {
let is_elected = matches!(msg, LeaderChangeMessage::Elected(_));
if is_leader
.compare_exchange(!is_elected, is_elected, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
if is_elected {
leader_infancy.store(true, Ordering::Release);
}
if let Err(e) = tx.send(msg) {
error!(e; "Failed to send leader change message");
}
}
}
#[async_trait::async_trait]
pub trait Election: Send + Sync {
type Leader;
/// Returns `true` if current node is the leader.
fn is_leader(&self) -> bool;
/// When a new leader is born, it may need some initialization
/// operations (asynchronous), this method tells us when these
/// initialization operations can be performed.
///
/// note: a new leader will only return true on the first call.
fn in_leader_infancy(&self) -> bool;
/// Registers a candidate for the election.
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;
/// Gets all candidates in the election.
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;
/// Campaign waits to acquire leadership in an election.
///
/// Multiple sessions can participate in the election,
/// but only one can be the leader at a time.
async fn campaign(&self) -> Result<()>;
/// Resets the campaign.
///
/// Reset the client and the leader flag if needed.
async fn reset_campaign(&self) {}
/// Returns the leader value for the current election.
async fn leader(&self) -> Result<Self::Leader>;
/// Releases election leadership so other campaigners may
/// acquire leadership on the election.
async fn resign(&self) -> Result<()>;
fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage>;
}
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

View File

@@ -0,0 +1,297 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use common_telemetry::{error, info, warn};
use etcd_client::{
Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
};
use snafu::{OptionExt, ResultExt, ensure};
use tokio::sync::broadcast;
use tokio::sync::broadcast::Receiver;
use tokio::time::{MissedTickBehavior, timeout};
use crate::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use crate::election::{
CANDIDATE_LEASE_SECS, Election, ElectionRef, KEEP_ALIVE_INTERVAL_SECS, LeaderChangeMessage,
LeaderKey, LeaderValue, MetasrvNodeInfo, listen_leader_change,
send_leader_change_and_set_flags,
};
use crate::error;
use crate::error::Result;
use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
impl LeaderKey for EtcdLeaderKey {
fn name(&self) -> &[u8] {
self.name()
}
fn key(&self) -> &[u8] {
self.key()
}
fn revision(&self) -> i64 {
self.rev()
}
fn lease_id(&self) -> i64 {
self.lease()
}
}
pub struct EtcdElection {
leader_value: String,
client: Client,
is_leader: AtomicBool,
infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
}
impl EtcdElection {
pub async fn with_etcd_client<E>(
leader_value: E,
client: Client,
store_key_prefix: String,
) -> Result<ElectionRef>
where
E: AsRef<str>,
{
let leader_value: String = leader_value.as_ref().into();
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
client,
is_leader: AtomicBool::new(false),
infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
}))
}
fn election_key(&self) -> String {
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
fn candidate_root(&self) -> String {
format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
}
fn candidate_key(&self) -> String {
format!("{}{}", self.candidate_root(), self.leader_value)
}
}
#[async_trait::async_trait]
impl Election for EtcdElection {
type Leader = LeaderValue;
fn is_leader(&self) -> bool {
self.is_leader.load(Ordering::Relaxed)
}
fn in_leader_infancy(&self) -> bool {
self.infancy
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
let mut lease_client = self.client.lease_client();
let res: etcd_client::LeaseGrantResponse = lease_client
.grant(CANDIDATE_LEASE_SECS as i64, None)
.await
.context(error::EtcdFailedSnafu)?;
let lease_id = res.id();
// The register info: key is the candidate key, value is its node info(addr, version, git_commit).
let key = self.candidate_key().into_bytes();
let value = serde_json::to_string(node_info)
.with_context(|_| error::SerializeToJsonSnafu {
input: format!("{node_info:?}"),
})?
.into_bytes();
// Puts with the lease id
self.client
.kv_client()
.put(key, value, Some(PutOptions::new().with_lease(lease_id)))
.await
.context(error::EtcdFailedSnafu)?;
let (mut keeper, mut receiver) = lease_client
.keep_alive(lease_id)
.await
.context(error::EtcdFailedSnafu)?;
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));
loop {
let _ = keep_alive_interval.tick().await;
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)?
&& res.ttl() <= 0
{
warn!("Candidate lease expired, key: {}", self.candidate_key());
break;
}
}
Ok(())
}
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
let key = self.candidate_root().into_bytes();
let res = self
.client
.kv_client()
.get(key, Some(GetOptions::new().with_prefix()))
.await
.context(error::EtcdFailedSnafu)?;
let mut nodes = Vec::with_capacity(res.kvs().len());
for kv in res.kvs() {
let node =
serde_json::from_slice::<MetasrvNodeInfo>(kv.value()).with_context(|_| {
error::DeserializeFromJsonSnafu {
input: String::from_utf8_lossy(kv.value()),
}
})?;
nodes.push(node);
}
Ok(nodes)
}
async fn campaign(&self) -> Result<()> {
let mut lease_client = self.client.lease_client();
let mut election_client = self.client.election_client();
let res = lease_client
.grant(META_LEASE_SECS as i64, None)
.await
.context(error::EtcdFailedSnafu)?;
let lease_id = res.id();
info!("Election grant ttl: {:?}, lease: {:?}", res.ttl(), lease_id);
// Campaign, waits to acquire leadership in an election, returning
// a LeaderKey representing the leadership if successful.
//
// The method will be blocked until the election is won, and after
// passing the method, it is necessary to execute `keep_alive` immediately
// to confirm that it is a valid leader, because it is possible that the
// election's lease expires.
let res = election_client
.campaign(self.election_key(), self.leader_value.clone(), lease_id)
.await
.context(error::EtcdFailedSnafu)?;
if let Some(leader) = res.leader() {
let (mut keeper, mut receiver) = lease_client
.keep_alive(lease_id)
.await
.context(error::EtcdFailedSnafu)?;
let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
// The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
match timeout(
keep_lease_duration,
self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
)
.await
{
Ok(Ok(())) => {
let _ = keep_alive_interval.tick().await;
}
Ok(Err(err)) => {
error!(err; "Failed to keep alive");
break;
}
Err(_) => {
error!("Refresh lease timeout");
break;
}
}
}
send_leader_change_and_set_flags(
&self.is_leader,
&self.infancy,
&self.leader_watcher,
LeaderChangeMessage::StepDown(Arc::new(leader.clone())),
);
}
Ok(())
}
async fn leader(&self) -> Result<LeaderValue> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
} else {
let res = self
.client
.election_client()
.leader(self.election_key())
.await
.context(error::EtcdFailedSnafu)?;
let leader_value = res.kv().context(error::ElectionNoLeaderSnafu)?.value();
Ok(leader_value.into())
}
}
async fn resign(&self) -> Result<()> {
todo!()
}
fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage> {
self.leader_watcher.subscribe()
}
}
impl EtcdElection {
async fn keep_alive(
&self,
keeper: &mut LeaseKeeper,
receiver: &mut LeaseKeepAliveStream,
leader: EtcdLeaderKey,
) -> Result<()> {
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
ensure!(
res.ttl() > 0,
error::UnexpectedSnafu {
err_msg: "Failed to refresh the lease".to_string(),
}
);
// Only after a successful `keep_alive` is the leader considered official.
send_leader_change_and_set_flags(
&self.is_leader,
&self.infancy,
&self.leader_watcher,
LeaderChangeMessage::Elected(Arc::new(leader.clone())),
);
}
Ok(())
}
}

View File

@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(feature = "mysql_kvbackend")]
pub mod mysql;
#[cfg(feature = "pg_kvbackend")]
pub mod postgres;
use common_time::Timestamp;
use itertools::Itertools;
use snafu::OptionExt;
use crate::election::LeaderKey;
use crate::error::{Result, UnexpectedSnafu};
// Separator between value and expire time in the lease string.
// A lease is put into rds election in the format:
// <node_info> || __metadata_lease_sep || <expire_time>
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
/// Parses the value and expire time from the given string retrieved from rds.
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
let (value, expire_time) =
value
.split(LEASE_SEP)
.collect_tuple()
.with_context(|| UnexpectedSnafu {
err_msg: format!(
"Invalid value {}, expect node info || {} || expire time",
value, LEASE_SEP
),
})?;
// Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS'
let expire_time = match Timestamp::from_str(expire_time, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
err_msg: format!("Invalid timestamp: {}", expire_time),
}
.fail()?,
};
Ok((value.to_string(), expire_time))
}
/// LeaderKey used for [LeaderChangeMessage] in rds election components.
#[derive(Debug, Clone, Default)]
struct RdsLeaderKey {
name: Vec<u8>,
key: Vec<u8>,
rev: i64,
lease: i64,
}
impl LeaderKey for RdsLeaderKey {
fn name(&self) -> &[u8] {
&self.name
}
fn key(&self) -> &[u8] {
&self.key
}
fn revision(&self) -> i64 {
self.rev
}
fn lease_id(&self) -> i64 {
self.lease
}
}
/// Lease information for rds election.
#[derive(Default, Clone, Debug)]
struct Lease {
leader_value: String,
expire_time: Timestamp,
current: Timestamp,
// `origin` is the original value of the lease, used for CAS.
origin: String,
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -338,6 +338,24 @@ pub enum Error {
location: Location,
},
#[snafu(display("Metasrv election has no leader at this moment"))]
ElectionNoLeader {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Metasrv election leader lease expired"))]
ElectionLeaderLeaseExpired {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Metasrv election leader lease changed during election"))]
ElectionLeaderLeaseChanged {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Table already exists, table: {}", table_name))]
TableAlreadyExists {
table_name: String,
@@ -751,6 +769,15 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get Postgres client"))]
GetPostgresClient {
#[snafu(source)]
error: deadpool::managed::PoolError<tokio_postgres::Error>,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to {} Postgres transaction", operation))]
PostgresTransaction {
@@ -805,6 +832,24 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to decode sql value"))]
DecodeSqlValue {
#[snafu(source)]
error: sqlx::error::Error,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to acquire mysql client from pool"))]
AcquireMySqlClient {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to {} MySql transaction", operation))]
MySqlTransaction {
@@ -822,6 +867,15 @@ pub enum Error {
location: Location,
},
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
SqlExecutionTimeout {
sql: String,
duration: std::time::Duration,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Datanode table info not found, table id: {}, datanode id: {}",
table_id,
@@ -1075,7 +1129,10 @@ impl ErrorExt for Error {
| GetCache { .. }
| GetLatestCacheRetryExceeded { .. }
| SerializeToJson { .. }
| DeserializeFromJson { .. } => StatusCode::Internal,
| DeserializeFromJson { .. }
| ElectionNoLeader { .. }
| ElectionLeaderLeaseExpired { .. }
| ElectionLeaderLeaseChanged { .. } => StatusCode::Internal,
NoLeader { .. } => StatusCode::TableUnavailable,
ValueNotExist { .. }
@@ -1198,15 +1255,18 @@ impl ErrorExt for Error {
PostgresExecution { .. }
| CreatePostgresPool { .. }
| GetPostgresConnection { .. }
| GetPostgresClient { .. }
| PostgresTransaction { .. }
| PostgresTlsConfig { .. }
| InvalidTlsConfig { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
StatusCode::Internal
}
MySqlExecution { .. }
| CreateMySqlPool { .. }
| DecodeSqlValue { .. }
| AcquireMySqlClient { .. }
| MySqlTransaction { .. } => StatusCode::Internal,
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
RdsTransactionRetryFailed { .. } => StatusCode::Internal,
RdsTransactionRetryFailed { .. } | SqlExecutionTimeout { .. } => StatusCode::Internal,
DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}

View File

@@ -22,6 +22,7 @@ pub mod datanode;
pub mod ddl;
pub mod ddl_manager;
pub mod distributed_time_constants;
pub mod election;
pub mod error;
pub mod flow_name;
pub mod heartbeat;