diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index af1e0dbb97..c5d9210702 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -15,7 +15,7 @@ on: - 'docker/**' workflow_dispatch: -name: Continuous integration for developing +name: CI env: RUST_TOOLCHAIN: nightly-2022-07-14 diff --git a/src/api/src/v1/meta.rs b/src/api/src/v1/meta.rs index 5c2d0046ad..087c00be88 100644 --- a/src/api/src/v1/meta.rs +++ b/src/api/src/v1/meta.rs @@ -71,11 +71,22 @@ impl ResponseHeader { error: Some(error), } } + + #[inline] + pub fn is_not_leader(&self) -> bool { + if let Some(error) = &self.error { + if error.code == ErrorCode::NotLeader as i32 { + return true; + } + } + false + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ErrorCode { NoActiveDatanodes = 1, + NotLeader = 2, } impl Error { @@ -86,6 +97,24 @@ impl Error { err_msg: "No active datanodes".to_string(), } } + + #[inline] + pub fn is_not_leader() -> Self { + Self { + code: ErrorCode::NotLeader as i32, + err_msg: "Current server is not leader".to_string(), + } + } +} + +impl HeartbeatResponse { + #[inline] + pub fn is_not_leader(&self) -> bool { + if let Some(header) = &self.header { + return header.is_not_leader(); + } + false + } } macro_rules! gen_set_header { diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index fb00a35e10..aef4aad713 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -18,9 +18,10 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tonic::Streaming; -use super::Id; +use crate::client::Id; use crate::error; use crate::error::Result; +use crate::rpc::util; pub struct HeartbeatSender { id: Id, @@ -70,7 +71,11 @@ impl HeartbeatStream { /// Fetch the next message from this stream. #[inline] pub async fn message(&mut self) -> Result> { - self.stream.message().await.context(error::TonicStatusSnafu) + let res = self.stream.message().await.context(error::TonicStatusSnafu); + if let Ok(Some(heartbeat)) = &res { + util::check_response_header(heartbeat.header.as_ref())?; + } + res } } @@ -106,7 +111,8 @@ impl Client { } pub async fn heartbeat(&mut self) -> Result<(HeartbeatSender, HeartbeatStream)> { - let inner = self.inner.read().await; + let mut inner = self.inner.write().await; + inner.ask_leader().await?; inner.heartbeat().await } diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs index 18089fd9a7..52277bb4fe 100644 --- a/src/meta-client/src/client/router.rs +++ b/src/meta-client/src/client/router.rs @@ -12,8 +12,8 @@ use snafu::ResultExt; use tokio::sync::RwLock; use tonic::transport::Channel; -use super::Id; use crate::client::load_balance as lb; +use crate::client::Id; use crate::error; use crate::error::Result; diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index d87fd0c616..36b236c730 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -19,8 +19,8 @@ use snafu::ResultExt; use tokio::sync::RwLock; use tonic::transport::Channel; -use super::Id; use crate::client::load_balance as lb; +use crate::client::Id; use crate::error; use crate::error::Result; diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index e02c5b9216..4226cdbea6 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -1,4 +1,5 @@ -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; pub mod client; pub mod error; diff --git a/src/meta-client/src/rpc/router.rs b/src/meta-client/src/rpc/router.rs index 614c898e4b..936fa076fb 100644 --- a/src/meta-client/src/rpc/router.rs +++ b/src/meta-client/src/rpc/router.rs @@ -6,14 +6,16 @@ use api::v1::meta::Region as PbRegion; use api::v1::meta::RouteRequest as PbRouteRequest; use api::v1::meta::RouteResponse as PbRouteResponse; use api::v1::meta::Table as PbTable; -use serde::{Deserialize, Serialize, Serializer}; +use serde::Deserialize; +use serde::Serialize; +use serde::Serializer; use snafu::OptionExt; -use super::util; -use super::Peer; -use super::TableName; use crate::error; use crate::error::Result; +use crate::rpc::util; +use crate::rpc::Peer; +use crate::rpc::TableName; #[derive(Debug, Clone, Default)] pub struct RouteRequest { diff --git a/src/meta-client/src/rpc/store.rs b/src/meta-client/src/rpc/store.rs index 343c1e1d93..4ecb1cb25b 100644 --- a/src/meta-client/src/rpc/store.rs +++ b/src/meta-client/src/rpc/store.rs @@ -10,11 +10,11 @@ use api::v1::meta::PutResponse as PbPutResponse; use api::v1::meta::RangeRequest as PbRangeRequest; use api::v1::meta::RangeResponse as PbRangeResponse; -use super::util; -use super::KeyValue; -use super::ResponseHeader; use crate::error; use crate::error::Result; +use crate::rpc::util; +use crate::rpc::KeyValue; +use crate::rpc::ResponseHeader; #[derive(Debug, Clone, Default)] pub struct RangeRequest { diff --git a/src/meta-client/src/rpc/util.rs b/src/meta-client/src/rpc/util.rs index d563b4a860..7b84d9b81e 100644 --- a/src/meta-client/src/rpc/util.rs +++ b/src/meta-client/src/rpc/util.rs @@ -6,7 +6,7 @@ use crate::error::Result; #[inline] pub(crate) fn check_response_header(header: Option<&ResponseHeader>) -> Result<()> { if let Some(header) = header { - if let Some(ref error) = header.error { + if let Some(error) = &header.error { let code = error.code; let err_msg = &error.err_msg; return error::IllegalServerStateSnafu { code, err_msg }.fail(); diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 85b9208bcf..4ce1f847ca 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -5,6 +5,7 @@ use snafu::ResultExt; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; +use crate::election::etcd::EtcdElection; use crate::error; use crate::metasrv::MetaSrv; use crate::metasrv::MetaSrvOptions; @@ -14,6 +15,7 @@ use crate::service::store::etcd::EtcdStore; // Bootstrap the rpc server to serve incoming request pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> { let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?; + let election = EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?; let listener = TcpListener::bind(&opts.bind_addr) .await @@ -22,7 +24,8 @@ pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> { })?; let listener = TcpListenerStream::new(listener); - let meta_srv = MetaSrv::new(opts, kv_store, None).await; + let meta_srv = MetaSrv::new(opts, kv_store, None, Some(election)).await; + meta_srv.start().await; tonic::transport::Server::builder() .accept_http1(true) // for admin services diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs new file mode 100644 index 0000000000..09a6e2512a --- /dev/null +++ b/src/meta-srv/src/election.rs @@ -0,0 +1,28 @@ +pub(crate) mod etcd; + +use crate::error::Result; + +pub const LEASE_SECS: i64 = 3; +pub const PROCLAIM_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3; +pub const ELECTION_KEY: &str = "__meta_srv_election"; + +#[async_trait::async_trait] +pub trait Election: Send + Sync { + type Leader; + + /// Returns `true` if current node is the leader. + fn is_leader(&self) -> bool; + + /// Campaign waits to acquire leadership in an election. + /// + /// Multiple sessions can participate in the election, + /// but only one can be the leader at a time. + async fn campaign(&self) -> Result<()>; + + /// Returns the leader value for the current election. + async fn leader(&self) -> Result; + + /// Releases election leadership so other campaigners may + /// acquire leadership on the election. + async fn resign(&self) -> Result<()>; +} diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs new file mode 100644 index 0000000000..e94653e34d --- /dev/null +++ b/src/meta-srv/src/election/etcd.rs @@ -0,0 +1,129 @@ +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use common_telemetry::info; +use common_telemetry::warn; +use etcd_client::Client; +use snafu::OptionExt; +use snafu::ResultExt; + +use crate::election::Election; +use crate::election::ELECTION_KEY; +use crate::election::LEASE_SECS; +use crate::election::PROCLAIM_PERIOD_SECS; +use crate::error; +use crate::error::Result; +use crate::metasrv::ElectionRef; +use crate::metasrv::LeaderValue; + +pub struct EtcdElection { + leader_value: String, + client: Client, + is_leader: AtomicBool, +} + +impl EtcdElection { + pub async fn with_endpoints(leader_value: E, endpoints: S) -> Result + where + E: AsRef, + S: AsRef<[E]>, + { + let leader_value = leader_value.as_ref().into(); + let client = Client::connect(endpoints, None) + .await + .context(error::ConnectEtcdSnafu)?; + + Ok(Arc::new(Self { + leader_value, + client, + is_leader: AtomicBool::new(false), + })) + } +} + +#[async_trait::async_trait] +impl Election for EtcdElection { + type Leader = LeaderValue; + + fn is_leader(&self) -> bool { + self.is_leader.load(Ordering::Relaxed) + } + + async fn campaign(&self) -> Result<()> { + let mut lease_client = self.client.lease_client(); + let mut election_client = self.client.election_client(); + let res = lease_client + .grant(LEASE_SECS, None) + .await + .context(error::EtcdFailedSnafu)?; + let lease_id = res.id(); + + info!("Election grant ttl: {:?}, id: {:?}", res.ttl(), lease_id); + + // campaign + let res = election_client + .campaign(ELECTION_KEY, self.leader_value.clone(), lease_id) + .await + .context(error::EtcdFailedSnafu)?; + + if let Some(leader) = res.leader() { + info!( + "[{}] becoming leader: {:?}, lease: {}", + &self.leader_value, + leader.name_str(), + leader.lease() + ); + + let (mut keeper, mut receiver) = self + .client + .lease_client() + .keep_alive(lease_id) + .await + .context(error::EtcdFailedSnafu)?; + + let mut interval = tokio::time::interval(Duration::from_secs(PROCLAIM_PERIOD_SECS)); + loop { + interval.tick().await; + keeper.keep_alive().await.context(error::EtcdFailedSnafu)?; + + if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? { + if res.ttl() > 0 { + self.is_leader.store(true, Ordering::Relaxed); + } else { + warn!( + "Already lost leader status, lease: {}, will re-initiate election", + leader.lease() + ); + break; + } + } + } + + self.is_leader.store(false, Ordering::Relaxed); + } + + Ok(()) + } + + async fn leader(&self) -> Result { + if self.is_leader.load(Ordering::Relaxed) { + Ok(LeaderValue(self.leader_value.clone())) + } else { + let res = self + .client + .election_client() + .leader(ELECTION_KEY) + .await + .context(error::EtcdFailedSnafu)?; + let leader_value = res.kv().context(error::NoLeaderSnafu)?.value(); + let leader_value = String::from_utf8_lossy(leader_value).to_string(); + Ok(LeaderValue(leader_value)) + } + } + + async fn resign(&self) -> Result<()> { + todo!() + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index fcbab4ed58..b35e9cd046 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -1,5 +1,6 @@ use common_error::prelude::*; -use tonic::{Code, Status}; +use tonic::Code; +use tonic::Status; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -106,6 +107,9 @@ pub enum Error { err_msg: String, backtrace: Backtrace, }, + + #[snafu(display("MetaSrv has no leader at this moment"))] + NoLeader { backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -134,6 +138,7 @@ impl ErrorExt for Error { | Error::SerializeToJson { .. } | Error::DeserializeFromJson { .. } | Error::DecodeTableRoute { .. } + | Error::NoLeader { .. } | Error::StartGrpc { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::EmptyTableName { .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 697ea62c29..de2ca2b50d 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -1,3 +1,4 @@ +pub(crate) mod check_leader; pub(crate) mod datanode_lease; pub(crate) mod response_header; @@ -12,7 +13,7 @@ use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; use crate::error::Result; -use crate::service::store::kv::KvStoreRef; +use crate::metasrv::Context; #[async_trait::async_trait] pub trait HeartbeatHandler: Send + Sync { @@ -24,24 +25,6 @@ pub trait HeartbeatHandler: Send + Sync { ) -> Result<()>; } -#[derive(Clone)] -pub struct Context { - pub server_addr: String, // also server_id - pub kv_store: KvStoreRef, -} - -impl Context { - #[inline] - pub fn server_addr(&self) -> &str { - &self.server_addr - } - - #[inline] - pub fn kv_store(&self) -> KvStoreRef { - self.kv_store.clone() - } -} - #[derive(Debug, Default)] pub struct HeartbeatAccumulator { pub header: Option, diff --git a/src/meta-srv/src/handler/check_leader.rs b/src/meta-srv/src/handler/check_leader.rs new file mode 100644 index 0000000000..b9f7c4e750 --- /dev/null +++ b/src/meta-srv/src/handler/check_leader.rs @@ -0,0 +1,32 @@ +use api::v1::meta::Error; +use api::v1::meta::HeartbeatRequest; + +use crate::error::Result; +use crate::handler::HeartbeatAccumulator; +use crate::handler::HeartbeatHandler; +use crate::metasrv::Context; + +pub struct CheckLeaderHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for CheckLeaderHandler { + async fn handle( + &self, + _req: &HeartbeatRequest, + ctx: &Context, + acc: &mut HeartbeatAccumulator, + ) -> Result<()> { + if let Some(election) = &ctx.election { + if election.is_leader() { + return Ok(()); + } + } + + if let Some(header) = &mut acc.header { + header.error = Some(Error::is_not_leader()); + ctx.set_skip_all(); + } + + Ok(()) + } +} diff --git a/src/meta-srv/src/handler/datanode_lease.rs b/src/meta-srv/src/handler/datanode_lease.rs index 45c88f0278..06ab50a0e8 100644 --- a/src/meta-srv/src/handler/datanode_lease.rs +++ b/src/meta-srv/src/handler/datanode_lease.rs @@ -3,12 +3,12 @@ use api::v1::meta::PutRequest; use common_telemetry::info; use common_time::util as time_util; -use super::Context; -use super::HeartbeatAccumulator; -use super::HeartbeatHandler; use crate::error::Result; +use crate::handler::HeartbeatAccumulator; +use crate::handler::HeartbeatHandler; use crate::keys::LeaseKey; use crate::keys::LeaseValue; +use crate::metasrv::Context; pub struct DatanodeLeaseHandler; @@ -20,8 +20,12 @@ impl HeartbeatHandler for DatanodeLeaseHandler { ctx: &Context, _acc: &mut HeartbeatAccumulator, ) -> Result<()> { + if ctx.is_skip_all() { + return Ok(()); + } + let HeartbeatRequest { header, peer, .. } = req; - if let Some(ref peer) = peer { + if let Some(peer) = &peer { let key = LeaseKey { cluster_id: header.as_ref().map_or(0, |h| h.cluster_id), node_id: peer.id, @@ -41,9 +45,61 @@ impl HeartbeatHandler for DatanodeLeaseHandler { ..Default::default() }; - let _ = ctx.kv_store().put(put).await?; + ctx.kv_store.put(put).await?; } Ok(()) } } + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicBool; + use std::sync::Arc; + + use api::v1::meta::Peer; + use api::v1::meta::RangeRequest; + use api::v1::meta::RequestHeader; + + use super::*; + use crate::service::store::memory::MemStore; + + #[tokio::test] + async fn test_handle_datanode_lease() { + let kv_store = Arc::new(MemStore::new()); + let ctx = Context { + datanode_lease_secs: 30, + server_addr: "0.0.0.0:0000".to_string(), + kv_store, + election: None, + skip_all: Arc::new(AtomicBool::new(false)), + }; + + let req = HeartbeatRequest { + header: Some(RequestHeader::new((1, 2))), + peer: Some(Peer { + id: 3, + addr: "127.0.0.1:1111".to_string(), + }), + ..Default::default() + }; + let mut acc = HeartbeatAccumulator::default(); + + let lease_handler = DatanodeLeaseHandler {}; + lease_handler.handle(&req, &ctx, &mut acc).await.unwrap(); + + let key = LeaseKey { + cluster_id: 1, + node_id: 3, + }; + + let req = RangeRequest { + key: key.try_into().unwrap(), + ..Default::default() + }; + + let res = ctx.kv_store.range(req).await.unwrap(); + + assert_eq!(1, res.kvs.len()); + } +} diff --git a/src/meta-srv/src/handler/response_header.rs b/src/meta-srv/src/handler/response_header.rs index 76fa7b2508..be87232fd6 100644 --- a/src/meta-srv/src/handler/response_header.rs +++ b/src/meta-srv/src/handler/response_header.rs @@ -2,10 +2,10 @@ use api::v1::meta::HeartbeatRequest; use api::v1::meta::ResponseHeader; use api::v1::meta::PROTOCOL_VERSION; -use super::Context; -use super::HeartbeatAccumulator; -use super::HeartbeatHandler; use crate::error::Result; +use crate::handler::HeartbeatAccumulator; +use crate::handler::HeartbeatHandler; +use crate::metasrv::Context; pub struct ResponseHeaderHandler; @@ -30,6 +30,7 @@ impl HeartbeatHandler for ResponseHeaderHandler { #[cfg(test)] mod tests { + use std::sync::atomic::AtomicBool; use std::sync::Arc; use api::v1::meta::{HeartbeatResponse, RequestHeader}; @@ -42,8 +43,11 @@ mod tests { async fn test_handle_heartbeat_resp_header() { let kv_store = Arc::new(MemStore::new()); let ctx = Context { + datanode_lease_secs: 30, server_addr: "0.0.0.0:0000".to_string(), kv_store, + election: None, + skip_all: Arc::new(AtomicBool::new(false)), }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 79744f8fb5..865d2cb33d 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -1,5 +1,6 @@ #![feature(btree_drain_filter)] pub mod bootstrap; +mod election; pub mod error; pub mod handler; mod keys; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index c7d886a61f..8bd9793a52 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -1,9 +1,15 @@ +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::Arc; use api::v1::meta::Peer; +use common_telemetry::info; +use common_telemetry::warn; use serde::Deserialize; use serde::Serialize; +use crate::election::Election; +use crate::handler::check_leader::CheckLeaderHandler; use crate::handler::datanode_lease::DatanodeLeaseHandler; use crate::handler::response_header::ResponseHeaderHandler; use crate::handler::HeartbeatHandlerGroup; @@ -37,18 +43,36 @@ impl Default for MetaSrvOptions { #[derive(Clone)] pub struct Context { pub datanode_lease_secs: i64, + pub server_addr: String, pub kv_store: KvStoreRef, + pub election: Option, + pub skip_all: Arc, } +impl Context { + pub fn is_skip_all(&self) -> bool { + self.skip_all.load(Ordering::Relaxed) + } + + pub fn set_skip_all(&self) { + self.skip_all.store(true, Ordering::Relaxed); + } +} + +pub struct LeaderValue(pub String); + pub type SelectorRef = Arc>>; +pub type ElectionRef = Arc>; #[derive(Clone)] pub struct MetaSrv { + started: Arc, options: MetaSrvOptions, kv_store: KvStoreRef, table_id_sequence: SequenceRef, selector: SelectorRef, handler_group: HeartbeatHandlerGroup, + election: Option, } impl MetaSrv { @@ -56,22 +80,59 @@ impl MetaSrv { options: MetaSrvOptions, kv_store: KvStoreRef, selector: Option, + election: Option, ) -> Self { + let started = Arc::new(AtomicBool::new(false)); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 10, kv_store.clone())); let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {})); let handler_group = HeartbeatHandlerGroup::default(); handler_group.add_handler(ResponseHeaderHandler).await; + handler_group.add_handler(CheckLeaderHandler).await; handler_group.add_handler(DatanodeLeaseHandler).await; Self { + started, options, kv_store, table_id_sequence, selector, handler_group, + election, } } + pub async fn start(&self) { + if self + .started + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + warn!("MetaSrv already started"); + return; + } + + if let Some(election) = self.election() { + let election = election.clone(); + let started = self.started.clone(); + common_runtime::spawn_bg(async move { + while started.load(Ordering::Relaxed) { + let res = election.campaign().await; + if let Err(e) = res { + warn!("MetaSrv election error: {}", e); + } + info!("MetaSrv re-initiate election"); + } + info!("MetaSrv stopped"); + }); + } + + info!("MetaSrv started"); + } + + pub fn shutdown(&self) { + self.started.store(false, Ordering::Relaxed); + } + #[inline] pub fn options(&self) -> &MetaSrvOptions { &self.options @@ -97,13 +158,24 @@ impl MetaSrv { self.handler_group.clone() } + #[inline] + pub fn election(&self) -> Option { + self.election.clone() + } + #[inline] pub fn new_ctx(&self) -> Context { let datanode_lease_secs = self.options().datanode_lease_secs; + let server_addr = self.options().server_addr.clone(); let kv_store = self.kv_store(); + let election = self.election(); + let skip_all = Arc::new(AtomicBool::new(false)); Context { datanode_lease_secs, + server_addr, kv_store, + election, + skip_all, } } } diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index a90dd01a21..f1ad3d836c 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -41,7 +41,7 @@ pub async fn mock( selector: Option, ) -> MockInfo { let server_addr = opts.server_addr.clone(); - let meta_srv = MetaSrv::new(opts, kv_store, selector).await; + let meta_srv = MetaSrv::new(opts, kv_store, selector, None).await; let (client, server) = tokio::io::duplex(1024); tokio::spawn(async move { tonic::transport::Server::builder() diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index d584c692d9..c4936011b4 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -1,13 +1,13 @@ use api::v1::meta::Peer; use common_time::util as time_util; -use super::Namespace; -use super::Selector; use crate::error::Result; use crate::keys::LeaseKey; use crate::keys::LeaseValue; use crate::lease; use crate::metasrv::Context; +use crate::selector::Namespace; +use crate::selector::Selector; pub struct LeaseBasedSelector; diff --git a/src/meta-srv/src/service/admin/health.rs b/src/meta-srv/src/service/admin/health.rs index f24c52f656..60e7d7310a 100644 --- a/src/meta-srv/src/service/admin/health.rs +++ b/src/meta-srv/src/service/admin/health.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use tonic::codegen::http; -use super::HttpHandler; use crate::error::Result; +use crate::service::admin::HttpHandler; const HTTP_OK: &str = "OK\n"; diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 82f91ec7d2..3d8189ffe9 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -9,9 +9,9 @@ use api::v1::meta::HeartbeatRequest; use api::v1::meta::HeartbeatResponse; use api::v1::meta::Peer; use api::v1::meta::ResponseHeader; -use api::v1::meta::PROTOCOL_VERSION; use common_telemetry::error; use common_telemetry::info; +use common_telemetry::warn; use futures::StreamExt; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -19,12 +19,12 @@ use tonic::Request; use tonic::Response; use tonic::Streaming; -use super::GrpcResult; -use super::GrpcStream; use crate::error; use crate::error::Result; -use crate::handler::Context; +use crate::metasrv::Context; use crate::metasrv::MetaSrv; +use crate::service::GrpcResult; +use crate::service::GrpcStream; static PUSHER_ID: AtomicU64 = AtomicU64::new(0); @@ -39,17 +39,15 @@ impl heartbeat_server::Heartbeat for MetaSrv { let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); let handler_group = self.handler_group(); - let ctx = Context { - server_addr: self.options().server_addr.clone(), - kv_store: self.kv_store(), - }; + let ctx = self.new_ctx(); common_runtime::spawn_bg(async move { let mut pusher_key = None; while let Some(msg) = in_stream.next().await { + let mut quit = false; match msg { Ok(req) => { if pusher_key.is_none() { - if let Some(ref peer) = req.peer { + if let Some(peer) = &req.peer { let key = format!( "{}-{}-{}", peer.addr, @@ -61,14 +59,16 @@ impl heartbeat_server::Heartbeat for MetaSrv { } } - tx.send( - handler_group - .handle(req, ctx.clone()) - .await - .map_err(|e| e.into()), - ) - .await - .expect("working rx"); + let res = handler_group + .handle(req, ctx.clone()) + .await + .map_err(|e| e.into()); + + if let Ok(res) = &res { + quit = res.is_not_leader(); + } + + tx.send(res).await.expect("working rx"); } Err(err) => { if let Some(io_err) = error::match_for_io_error(&err) { @@ -85,6 +85,11 @@ impl heartbeat_server::Heartbeat for MetaSrv { } } } + + if quit { + warn!("Quit because it is no longer the leader"); + break; + } } info!( "Heartbeat stream broken: {:?}", @@ -102,34 +107,34 @@ impl heartbeat_server::Heartbeat for MetaSrv { async fn ask_leader(&self, req: Request) -> GrpcResult { let req = req.into_inner(); - let res = self.handle_ask_leader(req).await?; + let ctx = self.new_ctx(); + let res = handle_ask_leader(req, ctx).await?; Ok(Response::new(res)) } } -impl MetaSrv { - // TODO(jiachun): move out when we can get the leader peer from kv store - async fn handle_ask_leader(&self, req: AskLeaderRequest) -> Result { - let AskLeaderRequest { header, .. } = req; +async fn handle_ask_leader(req: AskLeaderRequest, ctx: Context) -> Result { + let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id); - let res_header = ResponseHeader { - protocol_version: PROTOCOL_VERSION, - cluster_id: header.map_or(0u64, |h| h.cluster_id), - ..Default::default() - }; + let addr = match ctx.election { + Some(election) => { + if election.is_leader() { + ctx.server_addr + } else { + election.leader().await?.0 + } + } + None => ctx.server_addr, + }; - // TODO(jiachun): return leader - let res = AskLeaderResponse { - header: Some(res_header), - leader: Some(Peer { - id: 0, - addr: self.options().server_addr.clone(), - }), - }; + let leader = Some(Peer { + id: 0, // TODO(jiachun): meta node should have a Id + addr, + }); - Ok(res) - } + let header = Some(ResponseHeader::success(cluster_id)); + Ok(AskLeaderResponse { header, leader }) } #[cfg(test)] @@ -147,7 +152,7 @@ mod tests { #[tokio::test] async fn test_ask_leader() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None).await; let req = AskLeaderRequest { header: Some(RequestHeader::new((1, 1))), diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 1eb861151c..976650fc5b 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -20,8 +20,6 @@ use snafu::ResultExt; use tonic::Request; use tonic::Response; -use super::store::kv::KvStoreRef; -use super::GrpcResult; use crate::error; use crate::error::Result; use crate::keys::TableRouteKey; @@ -29,6 +27,8 @@ use crate::metasrv::Context; use crate::metasrv::MetaSrv; use crate::metasrv::SelectorRef; use crate::sequence::SequenceRef; +use crate::service::store::kv::KvStoreRef; +use crate::service::GrpcResult; #[async_trait::async_trait] impl router_server::Router for MetaSrv { @@ -71,7 +71,7 @@ async fn handle_route(req: RouteRequest, ctx: Context) -> Result peers, mut table_route, } = tr; - if let Some(ref mut table_route) = table_route { + if let Some(table_route) = &mut table_route { for rr in &mut table_route.region_routes { if let Some(peer) = peers.get(rr.leader_peer_index as usize) { rr.leader_peer_index = peer_dict.get_or_insert(peer.clone()) as u64; @@ -83,7 +83,7 @@ async fn handle_route(req: RouteRequest, ctx: Context) -> Result } } - if let Some(ref mut table) = table_route.table { + if let Some(table) = &mut table_route.table { table.table_schema = tg.as_bytes().context(error::InvalidCatalogValueSnafu)?; } } diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index 290df1a6ca..47096a0673 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -16,8 +16,8 @@ use api::v1::meta::RangeResponse; use tonic::Request; use tonic::Response; -use super::GrpcResult; use crate::metasrv::MetaSrv; +use crate::service::GrpcResult; #[async_trait::async_trait] impl store_server::Store for MetaSrv { @@ -78,7 +78,7 @@ mod tests { #[tokio::test] async fn test_range() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None).await; let req = RangeRequest::default(); let res = meta_srv.range(req.into_request()).await; @@ -88,7 +88,7 @@ mod tests { #[tokio::test] async fn test_put() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None).await; let req = PutRequest::default(); let res = meta_srv.put(req.into_request()).await; @@ -98,7 +98,7 @@ mod tests { #[tokio::test] async fn test_batch_put() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None).await; let req = BatchPutRequest::default(); let res = meta_srv.batch_put(req.into_request()).await; @@ -108,7 +108,7 @@ mod tests { #[tokio::test] async fn test_compare_and_put() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None).await; let req = CompareAndPutRequest::default(); let res = meta_srv.compare_and_put(req.into_request()).await; @@ -118,7 +118,7 @@ mod tests { #[tokio::test] async fn test_delete_range() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None).await; let req = DeleteRangeRequest::default(); let res = meta_srv.delete_range(req.into_request()).await; diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index aa71890b13..d7023cd48e 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -23,10 +23,10 @@ use etcd_client::Txn; use etcd_client::TxnOp; use etcd_client::TxnOpResponse; -use super::kv::KvStore; -use super::kv::KvStoreRef; use crate::error; use crate::error::Result; +use crate::service::store::kv::KvStore; +use crate::service::store::kv::KvStoreRef; #[derive(Clone)] pub struct EtcdStore { diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs index 294abc32e0..d68d90f436 100644 --- a/src/meta-srv/src/service/store/memory.rs +++ b/src/meta-srv/src/service/store/memory.rs @@ -17,8 +17,8 @@ use api::v1::meta::RangeResponse; use api::v1::meta::ResponseHeader; use parking_lot::RwLock; -use super::kv::KvStore; use crate::error::Result; +use crate::service::store::kv::KvStore; /// Only for mock test #[derive(Clone)]