feat: in memory storage on meta leader (#856)

* chore: minor change on election

* chore: refactor some from/into

* feat: add in_memory store for leader node

* refactor: make context mutable

* feat: add ResetableKvStore trait
This commit is contained in:
Jiachun Feng
2023-01-10 15:53:34 +08:00
committed by GitHub
parent 1305924423
commit c609b193a1
15 changed files with 166 additions and 57 deletions

View File

@@ -17,7 +17,7 @@ pub mod etcd;
use crate::error::Result;
pub const LEASE_SECS: i64 = 3;
pub const PROCLAIM_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3;
pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3;
pub const ELECTION_KEY: &str = "__meta_srv_election";
#[async_trait::async_trait]
@@ -27,6 +27,13 @@ pub trait Election: Send + Sync {
/// Returns `true` if current node is the leader.
fn is_leader(&self) -> bool;
/// A newly elected leader may need to perform some (asynchronous)
/// initialization operations. This method tells the caller when those
/// initialization operations can be performed.
///
/// Note: a new leader returns `true` only on the first call.
fn in_infancy(&self) -> bool;
/// Campaign waits to acquire leadership in an election.
///
/// Multiple sessions can participate in the election,

View File

@@ -20,7 +20,7 @@ use common_telemetry::{info, warn};
use etcd_client::Client;
use snafu::{OptionExt, ResultExt};
use crate::election::{Election, ELECTION_KEY, LEASE_SECS, PROCLAIM_PERIOD_SECS};
use crate::election::{Election, ELECTION_KEY, KEEP_ALIVE_PERIOD_SECS, LEASE_SECS};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue};
@@ -29,6 +29,7 @@ pub struct EtcdElection {
leader_value: String,
client: Client,
is_leader: AtomicBool,
infancy: AtomicBool,
}
impl EtcdElection {
@@ -46,6 +47,7 @@ impl EtcdElection {
leader_value,
client,
is_leader: AtomicBool::new(false),
infancy: AtomicBool::new(false),
}))
}
}
@@ -58,6 +60,12 @@ impl Election for EtcdElection {
self.is_leader.load(Ordering::Relaxed)
}
/// Reports whether the `infancy` flag was raised, lowering it in the same
/// atomic step.
///
/// The flag is flipped from `true` to `false` with a single compare-exchange,
/// so after the flag has been raised only one call observes `true`.
fn in_infancy(&self) -> bool {
    let flipped = self
        .infancy
        .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed);
    flipped.is_ok()
}
async fn campaign(&self) -> Result<()> {
let mut lease_client = self.client.lease_client();
let mut election_client = self.client.election_client();
@@ -67,22 +75,21 @@ impl Election for EtcdElection {
.context(error::EtcdFailedSnafu)?;
let lease_id = res.id();
info!("Election grant ttl: {:?}, id: {:?}", res.ttl(), lease_id);
info!("Election grant ttl: {:?}, lease: {:?}", res.ttl(), lease_id);
// campaign
// Campaign: wait to acquire leadership in an election, returning a
// `LeaderKey` representing the leadership if successful.
//
// The call blocks until the election is won. Once it returns, `keep_alive`
// must be executed immediately to confirm that this node is a valid leader,
// because the election's lease may have expired in the meantime.
let res = election_client
.campaign(ELECTION_KEY, self.leader_value.clone(), lease_id)
.await
.context(error::EtcdFailedSnafu)?;
if let Some(leader) = res.leader() {
info!(
"[{}] becoming leader: {:?}, lease: {}",
&self.leader_value,
leader.name_str(),
leader.lease()
);
let (mut keeper, mut receiver) = self
.client
.lease_client()
@@ -90,17 +97,31 @@ impl Election for EtcdElection {
.await
.context(error::EtcdFailedSnafu)?;
let mut interval = tokio::time::interval(Duration::from_secs(PROCLAIM_PERIOD_SECS));
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(KEEP_ALIVE_PERIOD_SECS));
loop {
interval.tick().await;
keep_alive_interval.tick().await;
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
if res.ttl() > 0 {
self.is_leader.store(true, Ordering::Relaxed);
// Only after a successful `keep_alive` is the leader considered official.
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.infancy.store(true, Ordering::Relaxed);
info!(
"[{}] becoming leader: {:?}, lease: {}",
&self.leader_value,
leader.name_str(),
leader.lease()
);
}
} else {
warn!(
"Already lost leader status, lease: {}, will re-initiate election",
"Failed to keep-alive, lease: {}, will re-initiate election",
leader.lease()
);
break;

View File

@@ -15,6 +15,7 @@
pub use check_leader_handler::CheckLeaderHandler;
pub use collect_stats_handler::CollectStatsHandler;
pub use keep_lease_handler::KeepLeaseHandler;
pub use on_leader_start::OnLeaderStartHandler;
pub use persist_stats_handler::PersistStatsHandler;
pub use response_header_handler::ResponseHeaderHandler;
@@ -23,6 +24,7 @@ mod collect_stats_handler;
mod instruction;
mod keep_lease_handler;
pub(crate) mod node_stat;
mod on_leader_start;
mod persist_stats_handler;
mod response_header_handler;
@@ -44,7 +46,7 @@ pub trait HeartbeatHandler: Send + Sync {
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &Context,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()>;
}
@@ -91,11 +93,15 @@ impl HeartbeatHandlerGroup {
pushers.remove(key)
}
pub async fn handle(&self, req: HeartbeatRequest, ctx: Context) -> Result<HeartbeatResponse> {
pub async fn handle(
&self,
req: HeartbeatRequest,
mut ctx: Context,
) -> Result<HeartbeatResponse> {
let mut acc = HeartbeatAccumulator::default();
let handlers = self.handlers.read().await;
for h in handlers.iter() {
h.handle(&req, &ctx, &mut acc).await?;
h.handle(&req, &mut ctx, &mut acc).await?;
}
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {

View File

@@ -26,7 +26,7 @@ impl HeartbeatHandler for CheckLeaderHandler {
async fn handle(
&self,
_req: &HeartbeatRequest,
ctx: &Context,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if let Some(election) = &ctx.election {

View File

@@ -51,14 +51,14 @@ impl HeartbeatHandler for CollectStatsHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &Context,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if ctx.is_skip_all() {
return Ok(());
}
match Stat::try_from(req) {
match Stat::try_from(req.clone()) {
Ok(stat) => {
let key = (stat.cluster_id, stat.id);
match self.cache.entry(key) {

View File

@@ -58,7 +58,7 @@ impl HeartbeatHandler for KeepLeaseHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &Context,
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if ctx.is_skip_all() {

View File

@@ -69,10 +69,10 @@ impl Stat {
}
}
impl TryFrom<&HeartbeatRequest> for Stat {
impl TryFrom<HeartbeatRequest> for Stat {
type Error = ();
fn try_from(value: &HeartbeatRequest) -> Result<Self, Self::Error> {
fn try_from(value: HeartbeatRequest) -> Result<Self, Self::Error> {
let HeartbeatRequest {
header,
peer,
@@ -87,8 +87,8 @@ impl TryFrom<&HeartbeatRequest> for Stat {
timestamp_millis: time_util::current_time_millis(),
cluster_id: header.cluster_id,
id: peer.id,
addr: peer.addr.clone(),
is_leader: *is_leader,
addr: peer.addr,
is_leader,
rcus: node_stat.rcus,
wcus: node_stat.wcus,
table_num: node_stat.table_num,
@@ -97,15 +97,15 @@ impl TryFrom<&HeartbeatRequest> for Stat {
load: node_stat.load,
read_io_rate: node_stat.read_io_rate,
write_io_rate: node_stat.write_io_rate,
region_stats: region_stats.iter().map(RegionStat::from).collect(),
region_stats: region_stats.into_iter().map(RegionStat::from).collect(),
}),
_ => Err(()),
}
}
}
impl From<&api::v1::meta::RegionStat> for RegionStat {
fn from(value: &api::v1::meta::RegionStat) -> Self {
impl From<api::v1::meta::RegionStat> for RegionStat {
fn from(value: api::v1::meta::RegionStat) -> Self {
let table = value.table_name.as_ref();
Self {
id: value.region_id,

View File

@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::HeartbeatRequest;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
/// Heartbeat handler that resets the context's in-memory store when the
/// election reports that this node has only just become the leader.
#[derive(Default)]
pub struct OnLeaderStartHandler;

#[async_trait::async_trait]
impl HeartbeatHandler for OnLeaderStartHandler {
    /// Calls `ctx.reset_in_memory()` when an election is configured and its
    /// `in_infancy()` returns `true`. The request and the accumulator are not
    /// touched, and the result is always `Ok(())`.
    async fn handle(
        &self,
        _req: &HeartbeatRequest,
        ctx: &mut Context,
        _acc: &mut HeartbeatAccumulator,
    ) -> Result<()> {
        // Without an election there is no leadership change to react to.
        let leader_just_started = ctx
            .election
            .as_ref()
            .map_or(false, |election| election.in_infancy());

        if leader_just_started {
            ctx.reset_in_memory();
        }

        Ok(())
    }
}

View File

@@ -27,7 +27,7 @@ impl HeartbeatHandler for PersistStatsHandler {
async fn handle(
&self,
_req: &HeartbeatRequest,
ctx: &Context,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if ctx.is_skip_all() || acc.stats.is_empty() {
@@ -43,7 +43,7 @@ impl HeartbeatHandler for PersistStatsHandler {
// take stats from &mut acc.stats, avoid clone of vec
let stats = std::mem::take(stats);
let val = &StatValue { stats };
let val = StatValue { stats };
let put = PutRequest {
key: key.into(),
@@ -71,10 +71,12 @@ mod tests {
#[tokio::test]
async fn test_handle_datanode_stats() {
let in_memory = Arc::new(MemStore::new());
let kv_store = Arc::new(MemStore::new());
let ctx = Context {
let mut ctx = Context {
datanode_lease_secs: 30,
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_store,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),
@@ -92,7 +94,10 @@ mod tests {
};
let stats_handler = PersistStatsHandler;
stats_handler.handle(&req, &ctx, &mut acc).await.unwrap();
stats_handler
.handle(&req, &mut ctx, &mut acc)
.await
.unwrap();
let key = StatKey {
cluster_id: 3,

View File

@@ -26,7 +26,7 @@ impl HeartbeatHandler for ResponseHeaderHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
_ctx: &Context,
_ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let HeartbeatRequest { header, .. } = req;
@@ -53,10 +53,12 @@ mod tests {
#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let in_memory = Arc::new(MemStore::new());
let kv_store = Arc::new(MemStore::new());
let ctx = Context {
let mut ctx = Context {
datanode_lease_secs: 30,
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_store,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),
@@ -69,7 +71,10 @@ mod tests {
let mut acc = HeartbeatAccumulator::default();
let response_handler = ResponseHeaderHandler {};
response_handler.handle(&req, &ctx, &mut acc).await.unwrap();
response_handler
.handle(&req, &mut ctx, &mut acc)
.await
.unwrap();
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {
header,

View File

@@ -228,11 +228,11 @@ pub struct StatValue {
pub stats: Vec<Stat>,
}
impl TryFrom<&StatValue> for Vec<u8> {
impl TryFrom<StatValue> for Vec<u8> {
type Error = error::Error;
fn try_from(stats: &StatValue) -> Result<Self> {
Ok(serde_json::to_string(stats)
fn try_from(stats: StatValue) -> Result<Self> {
Ok(serde_json::to_string(&stats)
.context(crate::error::SerializeToJsonSnafu {
input: format!("{stats:?}"),
})?
@@ -286,7 +286,7 @@ mod tests {
..Default::default()
};
let stat_val = &StatValue { stats: vec![stat] };
let stat_val = StatValue { stats: vec![stat] };
let bytes: Vec<u8> = stat_val.try_into().unwrap();
let stat_val: StatValue = bytes.try_into().unwrap();

View File

@@ -22,12 +22,13 @@ use serde::{Deserialize, Serialize};
use crate::election::Election;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler,
PersistStatsHandler, ResponseHeaderHandler,
OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::Selector;
use crate::sequence::{Sequence, SequenceRef};
use crate::service::store::kv::KvStoreRef;
use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef};
use crate::service::store::memory::MemStore;
pub const TABLE_ID_SEQ: &str = "table_id";
@@ -55,6 +56,7 @@ impl Default for MetaSrvOptions {
pub struct Context {
pub datanode_lease_secs: i64,
pub server_addr: String,
pub in_memory: ResetableKvStoreRef,
pub kv_store: KvStoreRef,
pub election: Option<ElectionRef>,
pub skip_all: Arc<AtomicBool>,
@@ -68,6 +70,10 @@ impl Context {
/// Sets the shared `skip_all` flag to `true` (relaxed ordering).
///
/// Several heartbeat handlers return early when `is_skip_all()` is true, so
/// setting the flag makes them skip their work for this context.
pub fn set_skip_all(&self) {
self.skip_all.store(true, Ordering::Relaxed);
}
/// Resets the in-memory store by delegating to its `reset` method.
///
/// `OnLeaderStartHandler` calls this when the election reports that the
/// current node has just become the leader.
pub fn reset_in_memory(&self) {
self.in_memory.reset();
}
}
pub struct LeaderValue(pub String);
@@ -79,6 +85,9 @@ pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
pub struct MetaSrv {
started: Arc<AtomicBool>,
options: MetaSrvOptions,
// It is only valid at the leader node and is used to temporarily
// store some data that will not be persisted.
in_memory: ResetableKvStoreRef,
kv_store: KvStoreRef,
table_id_sequence: SequenceRef,
selector: SelectorRef,
@@ -97,26 +106,29 @@ impl MetaSrv {
let started = Arc::new(AtomicBool::new(false));
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {}));
let in_memory = Arc::new(MemStore::default());
let handler_group = match handler_group {
Some(hg) => hg,
None => {
let hg = HeartbeatHandlerGroup::default();
let kv_store = kv_store.clone();
hg.add_handler(ResponseHeaderHandler::default()).await;
let group = HeartbeatHandlerGroup::default();
let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone());
group.add_handler(ResponseHeaderHandler::default()).await;
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
// because even if the current meta-server node is no longer the leader it can
// still help the datanode to keep lease.
hg.add_handler(KeepLeaseHandler::new(kv_store)).await;
hg.add_handler(CheckLeaderHandler::default()).await;
hg.add_handler(CollectStatsHandler::default()).await;
hg.add_handler(PersistStatsHandler::default()).await;
hg
group.add_handler(keep_lease_handler).await;
group.add_handler(CheckLeaderHandler::default()).await;
group.add_handler(OnLeaderStartHandler::default()).await;
group.add_handler(CollectStatsHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;
group
}
};
Self {
started,
options,
in_memory,
kv_store,
table_id_sequence,
selector,
@@ -162,6 +174,11 @@ impl MetaSrv {
&self.options
}
#[inline]
pub fn in_memory(&self) -> ResetableKvStoreRef {
self.in_memory.clone()
}
#[inline]
pub fn kv_store(&self) -> KvStoreRef {
self.kv_store.clone()
@@ -191,12 +208,14 @@ impl MetaSrv {
pub fn new_ctx(&self) -> Context {
let datanode_lease_secs = self.options().datanode_lease_secs;
let server_addr = self.options().server_addr.clone();
let in_memory = self.in_memory();
let kv_store = self.kv_store();
let election = self.election();
let skip_all = Arc::new(AtomicBool::new(false));
Context {
datanode_lease_secs,
server_addr,
in_memory,
kv_store,
election,
skip_all,

View File

@@ -29,7 +29,6 @@ use crate::error;
use crate::error::Result;
use crate::service::store::kv::{KvStore, KvStoreRef};
#[derive(Clone)]
pub struct EtcdStore {
client: Client,
}

View File

@@ -23,6 +23,7 @@ use api::v1::meta::{
use crate::error::Result;
pub type KvStoreRef = Arc<dyn KvStore>;
pub type ResetableKvStoreRef = Arc<dyn ResetableKvStore>;
#[async_trait::async_trait]
pub trait KvStore: Send + Sync {
@@ -38,3 +39,7 @@ pub trait KvStore: Send + Sync {
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse>;
}
/// A [`KvStore`] that can additionally be reset through a shared reference.
pub trait ResetableKvStore: KvStore {
/// Resets the store. The exact effect is up to the implementor; the
/// `MemStore` implementation clears all of its entries.
fn reset(&self);
}

View File

@@ -15,7 +15,6 @@
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;
use api::v1::meta::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
@@ -25,12 +24,10 @@ use api::v1::meta::{
use parking_lot::RwLock;
use crate::error::Result;
use crate::service::store::kv::KvStore;
use crate::service::store::kv::{KvStore, ResetableKvStore};
/// Only for mock test
#[derive(Clone)]
pub struct MemStore {
inner: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
inner: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
}
impl Default for MemStore {
@@ -42,11 +39,17 @@ impl Default for MemStore {
impl MemStore {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(Default::default())),
inner: RwLock::new(Default::default()),
}
}
}
impl ResetableKvStore for MemStore {
    /// Removes every entry from the store while holding the write lock.
    fn reset(&self) {
        let mut entries = self.inner.write();
        entries.clear();
    }
}
#[async_trait::async_trait]
impl KvStore for MemStore {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {