feat: meta refactor (#339)

* feat: heartbeat handler

* chore: heartbeat handlers lock refactor

* chore: store rpc req/res wrapper

* chore: router rpc/res wrapper

* chore: const method(request_header)

* chore: rm unnessary const fn & refactor HeartbeatHandler

* chore: refactor CreateRequest

* chore: HeartbeatAccumulator

* chore: improve router req/res convert

* fix: register race condition
This commit is contained in:
Jiachun Feng
2022-10-26 11:26:40 +08:00
committed by GitHub
parent 932b30d299
commit 00966cad69
23 changed files with 1404 additions and 319 deletions

View File

@@ -24,11 +24,7 @@ message Error {
message Peer {
uint64 id = 1;
Endpoint endpoint = 2;
}
message Endpoint {
string addr = 1;
string addr = 2;
}
message TableName {

View File

@@ -88,5 +88,5 @@ message AskLeaderRequest {
message AskLeaderResponse {
ResponseHeader header = 1;
Endpoint leader = 2;
Peer leader = 2;
}

View File

@@ -8,19 +8,21 @@ service Router {
// Fetch routing information for tables. The smallest unit is the complete
// routing information(all regions) of a table.
//
// ```text
// table_1
// table_name
// table_schema
// regions
// region_1
// mutate_endpoint
// select_endpoint_1, select_endpoint_2
// leader_peer
// follower_peer_1, follower_peer_2
// region_2
// mutate_endpoint
// select_endpoint_1, select_endpoint_2, select_endpoint_3
// leader_peer
// follower_peer_1, follower_peer_2, follower_peer_3
// region_xxx
// table_2
// ...
// ```
//
rpc Route(RouteRequest) returns (RouteResponse) {}

View File

@@ -2,56 +2,19 @@ tonic::include_proto!("greptime.v1.meta");
pub const PROTOCOL_VERSION: u64 = 1;
impl Peer {
pub fn new(id: u64, addr: impl AsRef<str>) -> Self {
Self {
id,
endpoint: Some(addr.as_ref().into()),
}
}
}
impl From<&str> for Endpoint {
fn from(s: &str) -> Self {
Self {
addr: s.to_string(),
}
}
pub const fn request_header((cluster_id, member_id): (u64, u64)) -> Option<RequestHeader> {
Some(RequestHeader::new((cluster_id, member_id)))
}
impl RequestHeader {
pub fn new(cluster_id: u64, member_id: u64) -> Self {
#[inline]
pub const fn new((cluster_id, member_id): (u64, u64)) -> Self {
Self {
protocol_version: PROTOCOL_VERSION,
cluster_id,
member_id,
}
}
pub fn with_id((cluster_id, member_id): (u64, u64)) -> Self {
Self {
protocol_version: PROTOCOL_VERSION,
cluster_id,
member_id,
}
}
}
impl HeartbeatRequest {
pub fn new(header: RequestHeader) -> Self {
Self {
header: Some(header),
..Default::default()
}
}
}
impl AskLeaderRequest {
pub fn new(header: RequestHeader) -> Self {
Self {
header: Some(header),
}
}
}
impl TableName {
@@ -69,13 +32,6 @@ impl TableName {
}
impl RouteRequest {
pub fn new(header: RequestHeader) -> Self {
Self {
header: Some(header),
..Default::default()
}
}
pub fn add_table(mut self, table_name: TableName) -> Self {
self.table_names.push(table_name);
self
@@ -83,14 +39,6 @@ impl RouteRequest {
}
impl CreateRequest {
pub fn new(header: RequestHeader, table_name: TableName) -> Self {
Self {
header: Some(header),
table_name: Some(table_name),
..Default::default()
}
}
pub fn add_partition(mut self, partition: Partition) -> Self {
self.partitions.push(partition);
self
@@ -128,20 +76,3 @@ impl Partition {
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_peer() {
let peer = Peer::new(1, "test_addr");
assert_eq!(1, peer.id);
assert_eq!(
Endpoint {
addr: "test_addr".to_string()
},
peer.endpoint.unwrap()
);
}
}

View File

@@ -1,16 +1,16 @@
use std::time::Duration;
use api::v1::meta::CreateRequest;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::Partition;
use api::v1::meta::PutRequest;
use api::v1::meta::RangeRequest;
use api::v1::meta::RequestHeader;
use api::v1::meta::TableName;
use api::v1::meta::Peer;
use common_grpc::channel_manager::ChannelConfig;
use common_grpc::channel_manager::ChannelManager;
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::CreateRequest;
use meta_client::rpc::DeleteRangeRequest;
use meta_client::rpc::Partition;
use meta_client::rpc::PutRequest;
use meta_client::rpc::RangeRequest;
use meta_client::rpc::TableName;
use tracing::event;
use tracing::subscriber;
use tracing::Level;
@@ -44,28 +44,37 @@ async fn run() {
// send heartbeats
tokio::spawn(async move {
for _ in 0..5 {
let req = HeartbeatRequest::new(RequestHeader::with_id(id));
let req = HeartbeatRequest {
peer: Some(Peer {
id: 1,
addr: "meta_client_peer".to_string(),
}),
..Default::default()
};
sender.send(req).await.unwrap();
}
tokio::time::sleep(Duration::from_secs(10)).await;
});
tokio::spawn(async move {
while let Some(res) = receiver.message().await.unwrap() {
event!(Level::INFO, "heartbeat response: {:#?}", res);
}
});
while let Some(res) = receiver.message().await.unwrap() {
event!(Level::INFO, "heartbeat response: {:#?}", res);
}
let p1 = Partition {
column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
value_list: vec![b"k1".to_vec(), b"k2".to_vec()],
};
let header = RequestHeader::with_id(id);
let p1 = Partition::new()
.column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()])
.value_list(vec![b"k1".to_vec(), b"k2".to_vec()]);
let p2 = Partition::new()
.column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()])
.value_list(vec![b"Max1".to_vec(), b"Max2".to_vec()]);
let p2 = Partition {
column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()],
};
let table_name = TableName::new("test_catlog", "test_schema", "test_table");
let create_req = CreateRequest::new(header, table_name)
let create_req = CreateRequest::new(table_name)
.add_partition(p1)
.add_partition(p2);
@@ -73,32 +82,24 @@ async fn run() {
event!(Level::INFO, "create_route result: {:#?}", res);
// put
let put_req = PutRequest {
key: b"key1".to_vec(),
value: b"value1".to_vec(),
prev_kv: true,
..Default::default()
};
let res = meta_client.put(put_req).await.unwrap();
let put = PutRequest::new()
.with_key(b"key1".to_vec())
.with_value(b"value1".to_vec())
.with_prev_kv();
let res = meta_client.put(put).await.unwrap();
event!(Level::INFO, "put result: {:#?}", res);
// get
let range_req = RangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = meta_client.range(range_req.clone()).await.unwrap();
let range = RangeRequest::new().with_key(b"key2".to_vec());
let res = meta_client.range(range.clone()).await.unwrap();
event!(Level::INFO, "get range result: {:#?}", res);
// delete
let delete_range_req = DeleteRangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = meta_client.delete_range(delete_range_req).await.unwrap();
let delete_range = DeleteRangeRequest::new().with_key(b"key1".to_vec());
let res = meta_client.delete_range(delete_range).await.unwrap();
event!(Level::INFO, "delete range result: {:#?}", res);
// get none
let res = meta_client.range(range_req).await;
let res = meta_client.range(range).await.unwrap();
event!(Level::INFO, "get range result: {:#?}", res);
}

View File

@@ -3,15 +3,6 @@ mod load_balance;
mod router;
mod store;
use api::v1::meta::CreateRequest;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use common_grpc::channel_manager::ChannelConfig;
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::info;
@@ -24,6 +15,15 @@ use self::heartbeat::HeartbeatSender;
use self::heartbeat::HeartbeatStream;
use crate::error;
use crate::error::Result;
use crate::rpc::CreateRequest;
use crate::rpc::DeleteRangeRequest;
use crate::rpc::DeleteRangeResponse;
use crate::rpc::PutRequest;
use crate::rpc::PutResponse;
use crate::rpc::RangeRequest;
use crate::rpc::RangeResponse;
use crate::rpc::RouteRequest;
use crate::rpc::RouteResponse;
pub type Id = (u64, u64);
@@ -176,8 +176,9 @@ impl MetaClient {
.context(error::NotStartedSnafu {
name: "route_client",
})?
.create(req)
.await
.create(req.into())
.await?
.try_into()
}
/// Fetch routing information for tables. The smallest unit is the complete
@@ -189,11 +190,11 @@ impl MetaClient {
/// table_schema
/// regions
/// region_1
/// mutate_endpoint
/// select_endpoint_1, select_endpoint_2
/// leader_peer
/// follower_peer_1, follower_peer_2
/// region_2
/// mutate_endpoint
/// select_endpoint_1, select_endpoint_2, select_endpoint_3
/// leader_peer
/// follower_peer_1, follower_peer_2, follower_peer_3
/// region_xxx
/// table_2
/// ...
@@ -204,8 +205,9 @@ impl MetaClient {
.context(error::NotStartedSnafu {
name: "route_client",
})?
.route(req)
.await
.route(req.into())
.await?
.try_into()
}
/// Range gets the keys in the range from the key-value store.
@@ -214,8 +216,9 @@ impl MetaClient {
.context(error::NotStartedSnafu {
name: "store_client",
})?
.range(req)
.range(req.into())
.await
.map(Into::into)
}
/// Put puts the given key into the key-value store.
@@ -224,8 +227,9 @@ impl MetaClient {
.context(error::NotStartedSnafu {
name: "store_client",
})?
.put(req)
.put(req.into())
.await
.map(Into::into)
}
/// DeleteRange deletes the given range from the key-value store.
@@ -234,8 +238,9 @@ impl MetaClient {
.context(error::NotStartedSnafu {
name: "store_client",
})?
.delete_range(req)
.delete_range(req.into())
.await
.map(Into::into)
}
#[inline]
@@ -267,6 +272,7 @@ impl MetaClient {
#[cfg(test)]
mod tests {
use super::*;
use crate::rpc::TableName;
#[tokio::test]
async fn test_meta_client_builder() {
@@ -336,7 +342,8 @@ mod tests {
meta_client.start(urls).await.unwrap();
let res = meta_client.create_route(CreateRequest::default()).await;
let req = CreateRequest::new(TableName::new("c", "s", "t"));
let res = meta_client.create_route(req).await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}

View File

@@ -2,10 +2,10 @@ use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::request_header;
use api::v1::meta::AskLeaderRequest;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::meta::RequestHeader;
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::debug;
use common_telemetry::info;
@@ -23,17 +23,24 @@ use crate::error;
use crate::error::Result;
pub struct HeartbeatSender {
id: Id,
sender: mpsc::Sender<HeartbeatRequest>,
}
impl HeartbeatSender {
#[inline]
const fn new(sender: mpsc::Sender<HeartbeatRequest>) -> Self {
Self { sender }
const fn new(id: Id, sender: mpsc::Sender<HeartbeatRequest>) -> Self {
Self { id, sender }
}
#[inline]
pub async fn send(&self, req: HeartbeatRequest) -> Result<()> {
pub fn id(&self) -> Id {
self.id
}
#[inline]
pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> {
req.header = request_header(self.id);
self.sender.send(req).await.map_err(|e| {
error::SendHeartbeatSnafu {
err_msg: e.to_string(),
@@ -45,13 +52,19 @@ impl HeartbeatSender {
#[derive(Debug)]
pub struct HeartbeatStream {
id: Id,
stream: Streaming<HeartbeatResponse>,
}
impl HeartbeatStream {
#[inline]
const fn new(stream: Streaming<HeartbeatResponse>) -> Self {
Self { stream }
const fn new(id: Id, stream: Streaming<HeartbeatResponse>) -> Self {
Self { id, stream }
}
#[inline]
pub fn id(&self) -> Id {
self.id
}
/// Fetch the next message from this stream.
@@ -141,11 +154,12 @@ impl Inner {
}
);
// TODO(jiachun): set cluster_id and member_id
let header = RequestHeader::with_id(self.id);
let header = request_header(self.id);
let mut leader = None;
for addr in &self.peers {
let req = AskLeaderRequest::new(header.clone());
let req = AskLeaderRequest {
header: header.clone(),
};
let mut client = self.make_client(addr)?;
match client.ask_leader(req).await {
Ok(res) => {
@@ -168,7 +182,11 @@ impl Inner {
let mut leader = self.make_client(leader)?;
let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
let handshake = HeartbeatRequest::new(RequestHeader::with_id(self.id));
let header = request_header(self.id);
let handshake = HeartbeatRequest {
header,
..Default::default()
};
sender.send(handshake).await.map_err(|e| {
error::SendHeartbeatSnafu {
err_msg: e.to_string(),
@@ -190,7 +208,10 @@ impl Inner {
.context(error::CreateHeartbeatStreamSnafu)?;
info!("Success to create heartbeat stream to server: {:#?}", res);
Ok((HeartbeatSender::new(sender), HeartbeatStream::new(stream)))
Ok((
HeartbeatSender::new(self.id, sender),
HeartbeatStream::new(self.id, stream),
))
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
@@ -287,23 +308,18 @@ mod test {
#[tokio::test]
async fn test_heartbeat_stream() {
let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);
let sender = HeartbeatSender::new(sender);
let sender = HeartbeatSender::new((8, 8), sender);
tokio::spawn(async move {
for i in 0..10 {
sender
.send(HeartbeatRequest::new(RequestHeader::new(i, i)))
.await
.unwrap();
for _ in 0..10 {
sender.send(HeartbeatRequest::default()).await.unwrap();
}
});
let mut i = 0;
while let Some(req) = receiver.recv().await {
let header = req.header.unwrap();
assert_eq!(i, header.cluster_id);
assert_eq!(i, header.member_id);
i += 1;
assert_eq!(8, header.cluster_id);
assert_eq!(8, header.member_id);
}
}
}

View File

@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::request_header;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::CreateRequest;
use api::v1::meta::RouteRequest;
@@ -60,8 +61,7 @@ impl Client {
#[derive(Debug)]
struct Inner {
#[allow(dead_code)]
id: Id, // TODO(jiachun): will use it later
id: Id,
channel_manager: ChannelManager,
peers: Vec<String>,
}
@@ -90,17 +90,17 @@ impl Inner {
Ok(())
}
async fn route(&self, req: RouteRequest) -> Result<RouteResponse> {
async fn route(&self, mut req: RouteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
let res = client.route(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn create(&self, req: CreateRequest) -> Result<RouteResponse> {
async fn create(&self, mut req: CreateRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
let res = client.create(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -134,8 +134,6 @@ impl Inner {
#[cfg(test)]
mod test {
use api::v1::meta::{RequestHeader, TableName};
use super::*;
#[tokio::test]
@@ -188,8 +186,10 @@ mod test {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let header = RequestHeader::new(0, 0);
let req = CreateRequest::new(header, TableName::default());
let req = CreateRequest {
header: request_header((0, 0)),
..Default::default()
};
let res = client.create(req).await;
assert!(res.is_err());
@@ -205,8 +205,10 @@ mod test {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let header = RequestHeader::new(0, 0);
let req = RouteRequest::new(header);
let req = RouteRequest {
header: request_header((0, 0)),
..Default::default()
};
let res = client.route(req).await;
assert!(res.is_err());

View File

@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::request_header;
use api::v1::meta::store_client::StoreClient;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
@@ -68,8 +69,7 @@ impl Client {
#[derive(Debug)]
struct Inner {
#[allow(dead_code)]
id: Id, // TODO(jiachun): will use it later
id: Id,
channel_manager: ChannelManager,
peers: Vec<String>,
}
@@ -98,25 +98,25 @@ impl Inner {
Ok(())
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
let res = client.range(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
async fn put(&self, mut req: PutRequest) -> Result<PutResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
let res = client.put(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
let res = client
.delete_range(req)
.await

View File

@@ -45,6 +45,12 @@ pub enum Error {
#[snafu(display("Failed create heartbeat stream to server"))]
CreateHeartbeatStream { backtrace: Backtrace },
#[snafu(display("Route info corruped: {}", err_msg))]
RouteInfoCorrupted {
err_msg: String,
backtrace: Backtrace,
},
}
#[allow(dead_code)]
@@ -70,6 +76,7 @@ impl ErrorExt for Error {
| Error::SendHeartbeat { .. }
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
Error::RouteInfoCorrupted { .. } => StatusCode::Unexpected,
}
}
}
@@ -179,4 +186,15 @@ mod tests {
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_route_info_corruped_error() {
let e = throw_none_option()
.context(RouteInfoCorruptedSnafu { err_msg: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Unexpected);
}
}

View File

@@ -1,2 +1,3 @@
pub mod client;
mod error;
pub mod rpc;

176
src/meta-client/src/rpc.rs Normal file
View File

@@ -0,0 +1,176 @@
mod router;
mod store;
use api::v1::meta::KeyValue as PbKeyValue;
use api::v1::meta::Peer as PbPeer;
use api::v1::meta::ResponseHeader as PbResponseHeader;
use api::v1::meta::TableName as PbTableName;
pub use router::CreateRequest;
pub use router::Partition;
pub use router::Region;
pub use router::RouteRequest;
pub use router::RouteResponse;
pub use router::Table;
pub use router::TableRoute;
pub use store::DeleteRangeRequest;
pub use store::DeleteRangeResponse;
pub use store::PutRequest;
pub use store::PutResponse;
pub use store::RangeRequest;
pub use store::RangeResponse;
#[derive(Debug, Clone)]
pub struct ResponseHeader(PbResponseHeader);
impl ResponseHeader {
#[inline]
pub(crate) fn new(header: PbResponseHeader) -> Self {
Self(header)
}
#[inline]
pub fn protocol_version(&self) -> u64 {
self.0.protocol_version
}
#[inline]
pub fn cluster_id(&self) -> u64 {
self.0.cluster_id
}
#[inline]
pub fn error_code(&self) -> i32 {
match self.0.error.as_ref() {
Some(err) => err.code,
None => 0,
}
}
#[inline]
pub fn error_msg(&self) -> String {
match self.0.error.as_ref() {
Some(err) => err.err_msg.clone(),
None => "None".to_string(),
}
}
}
#[derive(Debug, Clone)]
pub struct KeyValue(PbKeyValue);
impl KeyValue {
#[inline]
pub(crate) fn new(kv: PbKeyValue) -> Self {
Self(kv)
}
#[inline]
pub fn key(&self) -> &[u8] {
&self.0.key
}
#[inline]
pub fn take_key(&mut self) -> Vec<u8> {
std::mem::take(&mut self.0.key)
}
#[inline]
pub fn value(&self) -> &[u8] {
&self.0.value
}
#[inline]
pub fn take_value(&mut self) -> Vec<u8> {
std::mem::take(&mut self.0.value)
}
}
#[derive(Debug, Clone)]
pub struct TableName {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
}
impl TableName {
pub fn new(
catalog_name: impl Into<String>,
schema_name: impl Into<String>,
table_name: impl Into<String>,
) -> Self {
Self {
catalog_name: catalog_name.into(),
schema_name: schema_name.into(),
table_name: table_name.into(),
}
}
}
impl From<TableName> for PbTableName {
fn from(tb: TableName) -> Self {
Self {
catalog_name: tb.catalog_name,
schema_name: tb.schema_name,
table_name: tb.table_name,
}
}
}
impl From<PbTableName> for TableName {
fn from(tb: PbTableName) -> Self {
Self {
catalog_name: tb.catalog_name,
schema_name: tb.schema_name,
table_name: tb.table_name,
}
}
}
#[derive(Debug, Clone)]
pub struct Peer {
pub id: u64,
pub addr: String,
}
impl From<PbPeer> for Peer {
fn from(p: PbPeer) -> Self {
Self {
id: p.id,
addr: p.addr,
}
}
}
impl Peer {
pub fn new(id: u64, addr: impl Into<String>) -> Self {
Self {
id,
addr: addr.into(),
}
}
}
#[cfg(test)]
mod tests {
use api::v1::meta::{Error, ResponseHeader as PbResponseHeader};
use super::*;
#[test]
fn test_response_header_trans() {
let pb_header = PbResponseHeader {
protocol_version: 101,
cluster_id: 1,
error: Some(Error {
code: 100,
err_msg: "test".to_string(),
}),
};
let header = ResponseHeader::new(pb_header);
assert_eq!(101, header.protocol_version());
assert_eq!(1, header.cluster_id());
assert_eq!(100, header.error_code());
assert_eq!("test".to_string(), header.error_msg());
}
}

View File

@@ -0,0 +1,345 @@
use std::collections::HashMap;
use api::v1::meta::CreateRequest as PbCreateRequest;
use api::v1::meta::Partition as PbPartition;
use api::v1::meta::Region as PbRegion;
use api::v1::meta::RouteRequest as PbRouteRequest;
use api::v1::meta::RouteResponse as PbRouteResponse;
use api::v1::meta::Table as PbTable;
use snafu::OptionExt;
use super::Peer;
use super::TableName;
use crate::error;
use crate::error::Result;
#[derive(Debug, Clone, Default)]
pub struct RouteRequest {
pub table_names: Vec<TableName>,
}
impl From<RouteRequest> for PbRouteRequest {
fn from(mut req: RouteRequest) -> Self {
Self {
header: None,
table_names: req.table_names.drain(..).map(Into::into).collect(),
}
}
}
impl RouteRequest {
#[inline]
pub fn new() -> Self {
Self {
table_names: vec![],
}
}
#[inline]
pub fn add_table_name(mut self, table_name: TableName) -> Self {
self.table_names.push(table_name);
self
}
}
#[derive(Debug, Clone)]
pub struct CreateRequest {
pub table_name: TableName,
pub partitions: Vec<Partition>,
}
impl From<CreateRequest> for PbCreateRequest {
fn from(mut req: CreateRequest) -> Self {
Self {
header: None,
table_name: Some(req.table_name.into()),
partitions: req.partitions.drain(..).map(Into::into).collect(),
}
}
}
impl CreateRequest {
#[inline]
pub fn new(table_name: TableName) -> Self {
Self {
table_name,
partitions: vec![],
}
}
#[inline]
pub fn add_partition(mut self, partition: Partition) -> Self {
self.partitions.push(partition);
self
}
}
#[derive(Debug, Clone)]
pub struct RouteResponse {
pub table_routes: Vec<TableRoute>,
}
impl TryFrom<PbRouteResponse> for RouteResponse {
type Error = error::Error;
fn try_from(pb: PbRouteResponse) -> Result<Self> {
let peers: Vec<Peer> = pb.peers.into_iter().map(Into::into).collect();
let get_peer = |index: u64| peers.get(index as usize).map(ToOwned::to_owned);
let mut table_routes = Vec::with_capacity(pb.table_routes.len());
for table_route in pb.table_routes.into_iter() {
let table = table_route
.table
.context(error::RouteInfoCorruptedSnafu {
err_msg: "table required",
})?
.try_into()?;
let region_routes = table_route
.region_routes
.into_iter()
.map(|region_route| {
let region = region_route.region.map(Into::into);
let leader_peer = get_peer(region_route.leader_peer_index);
let follower_peers = region_route
.follower_peer_indexes
.into_iter()
.filter_map(get_peer)
.collect::<Vec<_>>();
RegionRoute {
region,
leader_peer,
follower_peers,
}
})
.collect::<Vec<_>>();
table_routes.push(TableRoute {
table,
region_routes,
});
}
Ok(Self { table_routes })
}
}
#[derive(Debug, Clone)]
pub struct TableRoute {
pub table: Table,
pub region_routes: Vec<RegionRoute>,
}
#[derive(Debug, Clone)]
pub struct Table {
pub table_name: TableName,
pub table_schema: Vec<u8>,
}
impl TryFrom<PbTable> for Table {
type Error = error::Error;
fn try_from(t: PbTable) -> Result<Self> {
let table_name = t
.table_name
.context(error::RouteInfoCorruptedSnafu {
err_msg: "table name requied",
})?
.into();
Ok(Self {
table_name,
table_schema: t.table_schema,
})
}
}
#[derive(Debug, Clone, Default)]
pub struct RegionRoute {
pub region: Option<Region>,
pub leader_peer: Option<Peer>,
pub follower_peers: Vec<Peer>,
}
#[derive(Debug, Clone, Default)]
pub struct Region {
pub id: u64,
pub name: String,
pub partition: Option<Partition>,
pub attrs: HashMap<String, String>,
}
impl From<PbRegion> for Region {
fn from(r: PbRegion) -> Self {
Self {
id: r.id,
name: r.name,
partition: r.partition.map(Into::into),
attrs: r.attrs,
}
}
}
#[derive(Debug, Clone)]
pub struct Partition {
pub column_list: Vec<Vec<u8>>,
pub value_list: Vec<Vec<u8>>,
}
impl From<Partition> for PbPartition {
fn from(p: Partition) -> Self {
Self {
column_list: p.column_list,
value_list: p.value_list,
}
}
}
impl From<PbPartition> for Partition {
fn from(p: PbPartition) -> Self {
Self {
column_list: p.column_list,
value_list: p.value_list,
}
}
}
#[cfg(test)]
mod tests {
use api::v1::meta::Partition as PbPartition;
use api::v1::meta::Peer as PbPeer;
use api::v1::meta::Region as PbRegion;
use api::v1::meta::RegionRoute as PbRegionRoute;
use api::v1::meta::RouteRequest as PbRouteRequest;
use api::v1::meta::RouteResponse as PbRouteResponse;
use api::v1::meta::Table as PbTable;
use api::v1::meta::TableName as PbTableName;
use api::v1::meta::TableRoute as PbTableRoute;
use super::*;
#[test]
fn test_route_request_trans() {
let req = RouteRequest {
table_names: vec![
TableName::new("c1", "s1", "t1"),
TableName::new("c2", "s2", "t2"),
],
};
let into_req: PbRouteRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!("c1", into_req.table_names.get(0).unwrap().catalog_name);
assert_eq!("s1", into_req.table_names.get(0).unwrap().schema_name);
assert_eq!("t1", into_req.table_names.get(0).unwrap().table_name);
assert_eq!("c2", into_req.table_names.get(1).unwrap().catalog_name);
assert_eq!("s2", into_req.table_names.get(1).unwrap().schema_name);
assert_eq!("t2", into_req.table_names.get(1).unwrap().table_name);
}
#[test]
fn test_create_request_trans() {
let req = CreateRequest {
table_name: TableName::new("c1", "s1", "t1"),
partitions: vec![
Partition {
column_list: vec![b"c1".to_vec(), b"c2".to_vec()],
value_list: vec![b"v1".to_vec(), b"v2".to_vec()],
},
Partition {
column_list: vec![b"c1".to_vec(), b"c2".to_vec()],
value_list: vec![b"v11".to_vec(), b"v22".to_vec()],
},
],
};
let into_req: PbCreateRequest = req.into();
assert!(into_req.header.is_none());
let table_name = into_req.table_name;
assert_eq!("c1", table_name.as_ref().unwrap().catalog_name);
assert_eq!("s1", table_name.as_ref().unwrap().schema_name);
assert_eq!("t1", table_name.as_ref().unwrap().table_name);
assert_eq!(
vec![b"c1".to_vec(), b"c2".to_vec()],
into_req.partitions.get(0).unwrap().column_list
);
assert_eq!(
vec![b"v1".to_vec(), b"v2".to_vec()],
into_req.partitions.get(0).unwrap().value_list
);
assert_eq!(
vec![b"c1".to_vec(), b"c2".to_vec()],
into_req.partitions.get(1).unwrap().column_list
);
assert_eq!(
vec![b"v11".to_vec(), b"v22".to_vec()],
into_req.partitions.get(1).unwrap().value_list
);
}
#[test]
fn test_route_response_trans() {
let res = PbRouteResponse {
header: None,
peers: vec![
PbPeer {
id: 1,
addr: "peer1".to_string(),
},
PbPeer {
id: 2,
addr: "peer2".to_string(),
},
],
table_routes: vec![PbTableRoute {
table: Some(PbTable {
table_name: Some(PbTableName {
catalog_name: "c1".to_string(),
schema_name: "s1".to_string(),
table_name: "t1".to_string(),
}),
table_schema: b"schema".to_vec(),
}),
region_routes: vec![PbRegionRoute {
region: Some(PbRegion {
id: 1,
name: "region1".to_string(),
partition: Some(PbPartition {
column_list: vec![b"c1".to_vec(), b"c2".to_vec()],
value_list: vec![b"v1".to_vec(), b"v2".to_vec()],
}),
attrs: Default::default(),
}),
leader_peer_index: 0,
follower_peer_indexes: vec![1],
}],
}],
};
let res: RouteResponse = res.try_into().unwrap();
let mut table_routes = res.table_routes;
assert_eq!(1, table_routes.len());
let table_route = table_routes.remove(0);
let table = table_route.table;
assert_eq!("c1", table.table_name.catalog_name);
assert_eq!("s1", table.table_name.schema_name);
assert_eq!("t1", table.table_name.table_name);
let mut region_routes = table_route.region_routes;
assert_eq!(1, region_routes.len());
let region_route = region_routes.remove(0);
let region = region_route.region.unwrap();
assert_eq!(1, region.id);
assert_eq!("region1", region.name);
let partition = region.partition.unwrap();
assert_eq!(vec![b"c1".to_vec(), b"c2".to_vec()], partition.column_list);
assert_eq!(vec![b"v1".to_vec(), b"v2".to_vec()], partition.value_list);
assert_eq!(1, region_route.leader_peer.as_ref().unwrap().id);
assert_eq!("peer1", region_route.leader_peer.as_ref().unwrap().addr);
assert_eq!(1, region_route.follower_peers.len());
assert_eq!(2, region_route.follower_peers.get(0).unwrap().id);
assert_eq!("peer2", region_route.follower_peers.get(0).unwrap().addr);
}
}

View File

@@ -0,0 +1,452 @@
use api::v1::meta::DeleteRangeRequest as PbDeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse as PbDeleteRangeResponse;
use api::v1::meta::PutRequest as PbPutRequest;
use api::v1::meta::PutResponse as PbPutResponse;
use api::v1::meta::RangeRequest as PbRangeRequest;
use api::v1::meta::RangeResponse as PbRangeResponse;
use super::KeyValue;
use super::ResponseHeader;
#[derive(Debug, Clone, Default)]
pub struct RangeRequest {
/// key is the first key for the range, If range_end is not given, the
/// request only looks up key.
pub key: Vec<u8>,
/// range_end is the upper bound on the requested range [key, range_end).
/// If range_end is '\0', the range is all keys >= key.
/// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
/// then the range request gets all keys prefixed with key.
/// If both key and range_end are '\0', then the range request returns all
/// keys.
pub range_end: Vec<u8>,
/// limit is a limit on the number of keys returned for the request. When
/// limit is set to 0, it is treated as no limit.
pub limit: i64,
/// keys_only when set returns only the keys and not the values.
pub keys_only: bool,
}
impl From<RangeRequest> for PbRangeRequest {
fn from(req: RangeRequest) -> Self {
Self {
header: None,
key: req.key,
range_end: req.range_end,
limit: req.limit,
keys_only: req.keys_only,
}
}
}
impl RangeRequest {
#[inline]
pub fn new() -> Self {
Self {
key: vec![],
range_end: vec![],
limit: 0,
keys_only: false,
}
}
/// key is the first key for the range, If range_end is not given, the
/// request only looks up key.
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self
}
/// range_end is the upper bound on the requested range [key, range_end).
/// If range_end is '\0', the range is all keys >= key.
/// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
/// then the range request gets all keys prefixed with key.
/// If both key and range_end are '\0', then the range request returns all
/// keys.
#[inline]
pub fn with_range_end(mut self, range_end: impl Into<Vec<u8>>) -> Self {
self.range_end = range_end.into();
self
}
/// limit is a limit on the number of keys returned for the request. When
/// limit is set to 0, it is treated as no limit.
#[inline]
pub fn with_limit(mut self, limit: i64) -> Self {
self.limit = limit;
self
}
/// keys_only when set returns only the keys and not the values.
#[inline]
pub fn with_keys_only(mut self) -> Self {
self.keys_only = true;
self
}
}
#[derive(Debug, Clone)]
pub struct RangeResponse(PbRangeResponse);
impl From<PbRangeResponse> for RangeResponse {
fn from(res: PbRangeResponse) -> Self {
Self::new(res)
}
}
impl RangeResponse {
#[inline]
pub fn new(res: PbRangeResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn take_kvs(&mut self) -> Vec<KeyValue> {
self.0.kvs.drain(..).map(KeyValue::new).collect()
}
#[inline]
pub fn more(&self) -> bool {
self.0.more
}
}
#[derive(Debug, Clone, Default)]
pub struct PutRequest {
/// key is the key, in bytes, to put into the key-value store.
pub key: Vec<u8>,
/// value is the value, in bytes, to associate with the key in the
/// key-value store.
pub value: Vec<u8>,
/// If prev_kv is set, gets the previous key-value pair before changing it.
/// The previous key-value pair will be returned in the put response.
pub prev_kv: bool,
}
impl From<PutRequest> for PbPutRequest {
fn from(req: PutRequest) -> Self {
Self {
header: None,
key: req.key,
value: req.value,
prev_kv: req.prev_kv,
}
}
}
impl PutRequest {
#[inline]
pub fn new() -> Self {
Self {
key: vec![],
value: vec![],
prev_kv: false,
}
}
/// key is the key, in bytes, to put into the key-value store.
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self
}
/// value is the value, in bytes, to associate with the key in the
/// key-value store.
#[inline]
pub fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
self.value = value.into();
self
}
/// If prev_kv is set, gets the previous key-value pair before changing it.
/// The previous key-value pair will be returned in the put response.
#[inline]
pub fn with_prev_kv(mut self) -> Self {
self.prev_kv = true;
self
}
}
#[derive(Debug, Clone)]
pub struct PutResponse(PbPutResponse);
impl From<PbPutResponse> for PutResponse {
fn from(res: PbPutResponse) -> Self {
Self::new(res)
}
}
impl PutResponse {
#[inline]
pub fn new(res: PbPutResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn take_prev_kv(&mut self) -> Option<KeyValue> {
self.0.prev_kv.take().map(KeyValue::new)
}
}
#[derive(Debug, Clone, Default)]
pub struct DeleteRangeRequest {
/// key is the first key to delete in the range.
pub key: Vec<u8>,
/// range_end is the key following the last key to delete for the range
/// [key, range_end).
/// If range_end is not given, the range is defined to contain only the key
/// argument.
/// If range_end is one bit larger than the given key, then the range is all
/// the keys with the prefix (the given key).
/// If range_end is '\0', the range is all keys greater than or equal to the
/// key argument.
pub range_end: Vec<u8>,
/// If prev_kv is set, gets the previous key-value pairs before deleting it.
/// The previous key-value pairs will be returned in the delete response.
pub prev_kv: bool,
// TODO(jiachun):
// Add a "limit" in delete request?
// To avoid a huge delete block everything.
}
impl From<DeleteRangeRequest> for PbDeleteRangeRequest {
fn from(req: DeleteRangeRequest) -> Self {
Self {
header: None,
key: req.key,
range_end: req.range_end,
prev_kv: req.prev_kv,
}
}
}
impl DeleteRangeRequest {
#[inline]
pub fn new() -> Self {
Self {
key: vec![],
range_end: vec![],
prev_kv: false,
}
}
/// key is the first key to delete in the range.
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self
}
/// range_end is the key following the last key to delete for the range
/// [key, range_end).
/// If range_end is not given, the range is defined to contain only the key
/// argument.
/// If range_end is one bit larger than the given key, then the range is all
/// the keys with the prefix (the given key).
/// If range_end is '\0', the range is all keys greater than or equal to the
/// key argument.
#[inline]
pub fn with_range_end(mut self, range_end: impl Into<Vec<u8>>) -> Self {
self.range_end = range_end.into();
self
}
/// If prev_kv is set, gets the previous key-value pairs before deleting it.
/// The previous key-value pairs will be returned in the delete response.
#[inline]
pub fn with_prev_kv(mut self) -> Self {
self.prev_kv = true;
self
}
}
#[derive(Debug, Clone)]
pub struct DeleteRangeResponse(PbDeleteRangeResponse);
impl From<PbDeleteRangeResponse> for DeleteRangeResponse {
fn from(res: PbDeleteRangeResponse) -> Self {
Self::new(res)
}
}
impl DeleteRangeResponse {
#[inline]
pub fn new(res: PbDeleteRangeResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
pub fn deleted(&self) -> i64 {
self.0.deleted
}
#[inline]
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
self.0.prev_kvs.drain(..).map(KeyValue::new).collect()
}
}
#[cfg(test)]
mod tests {
use api::v1::meta::DeleteRangeRequest as PbDeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse as PbDeleteRangeResponse;
use api::v1::meta::KeyValue as PbKeyValue;
use api::v1::meta::PutRequest as PbPutRequest;
use api::v1::meta::PutResponse as PbPutResponse;
use api::v1::meta::RangeRequest as PbRangeRequest;
use api::v1::meta::RangeResponse as PbRangeResponse;
use super::*;
#[test]
fn test_range_request_trans() {
let (key, range_end, limit) = (b"test_key1".to_vec(), b"test_range_end1".to_vec(), 1);
let req = RangeRequest::new()
.with_key(key.clone())
.with_range_end(range_end.clone())
.with_limit(limit)
.with_keys_only();
let into_req: PbRangeRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(key, into_req.key);
assert_eq!(range_end, into_req.range_end);
assert_eq!(limit, into_req.limit);
assert!(into_req.keys_only);
}
#[test]
fn test_range_response_trans() {
let pb_res = PbRangeResponse {
header: None,
kvs: vec![
PbKeyValue {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
},
PbKeyValue {
key: b"k2".to_vec(),
value: b"v2".to_vec(),
},
],
more: true,
};
let mut res = RangeResponse::new(pb_res);
assert!(res.take_header().is_none());
assert!(res.more());
let mut kvs = res.take_kvs();
let kv0 = kvs.get_mut(0).unwrap();
assert_eq!(b"k1".to_vec(), kv0.key().to_vec());
assert_eq!(b"k1".to_vec(), kv0.take_key());
assert_eq!(b"v1".to_vec(), kv0.value().to_vec());
assert_eq!(b"v1".to_vec(), kv0.take_value());
let kv1 = kvs.get_mut(1).unwrap();
assert_eq!(b"k2".to_vec(), kv1.key().to_vec());
assert_eq!(b"k2".to_vec(), kv1.take_key());
assert_eq!(b"v2".to_vec(), kv1.value().to_vec());
assert_eq!(b"v2".to_vec(), kv1.take_value());
}
#[test]
fn test_put_request_trans() {
let (key, value) = (b"test_key1".to_vec(), b"test_value1".to_vec());
let req = PutRequest::new()
.with_key(key.clone())
.with_value(value.clone())
.with_prev_kv();
let into_req: PbPutRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(key, into_req.key);
assert_eq!(value, into_req.value);
}
#[test]
fn test_put_response_trans() {
let pb_res = PbPutResponse {
header: None,
prev_kv: Some(PbKeyValue {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
}),
};
let mut res = PutResponse::new(pb_res);
assert!(res.take_header().is_none());
let mut kv = res.take_prev_kv().unwrap();
assert_eq!(b"k1".to_vec(), kv.key().to_vec());
assert_eq!(b"k1".to_vec(), kv.take_key());
assert_eq!(b"v1".to_vec(), kv.value().to_vec());
assert_eq!(b"v1".to_vec(), kv.take_value());
}
#[test]
fn test_delete_range_request_trans() {
let (key, range_end) = (b"test_key1".to_vec(), b"test_range_end1".to_vec());
let req = DeleteRangeRequest::new()
.with_key(key.clone())
.with_range_end(range_end.clone())
.with_prev_kv();
let into_req: PbDeleteRangeRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(key, into_req.key);
assert_eq!(range_end, into_req.range_end);
assert!(into_req.prev_kv);
}
#[test]
fn test_delete_range_response_trans() {
let pb_res = PbDeleteRangeResponse {
header: None,
deleted: 2,
prev_kvs: vec![
PbKeyValue {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
},
PbKeyValue {
key: b"k2".to_vec(),
value: b"v2".to_vec(),
},
],
};
let mut res: DeleteRangeResponse = pb_res.into();
assert!(res.take_header().is_none());
assert_eq!(2, res.deleted());
let mut kvs = res.take_prev_kvs();
let kv0 = kvs.get_mut(0).unwrap();
assert_eq!(b"k1".to_vec(), kv0.key().to_vec());
assert_eq!(b"k1".to_vec(), kv0.take_key());
assert_eq!(b"v1".to_vec(), kv0.value().to_vec());
assert_eq!(b"v1".to_vec(), kv0.take_value());
let kv1 = kvs.get_mut(1).unwrap();
assert_eq!(b"k2".to_vec(), kv1.key().to_vec());
assert_eq!(b"k2".to_vec(), kv1.take_key());
assert_eq!(b"v2".to_vec(), kv1.value().to_vec());
assert_eq!(b"v2".to_vec(), kv1.take_value());
}
}

View File

@@ -22,7 +22,7 @@ pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> {
})?;
let listener = TcpListenerStream::new(listener);
let meta_srv = MetaSrv::new(opts, kv_store);
let meta_srv = MetaSrv::new(opts, kv_store).await;
tonic::transport::Server::builder()
.accept_http1(true) // for admin services

View File

@@ -0,0 +1,96 @@
pub(crate) mod response_header;
use std::collections::BTreeMap;
use std::sync::Arc;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::meta::ResponseHeader;
use common_telemetry::info;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use crate::error::Result;
use crate::service::store::kv::KvStoreRef;
#[async_trait::async_trait]
pub trait HeartbeatHandler: Send + Sync {
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut HeartbeatAccumulator,
store: KvStoreRef,
) -> Result<()>;
}
#[derive(Debug, Default)]
pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
pub states: Vec<State>,
pub instructions: Vec<Instruction>,
}
impl HeartbeatAccumulator {
pub fn into_payload(self) -> Vec<Vec<u8>> {
// TODO(jiachun): to HeartbeatResponse payload
vec![]
}
}
#[derive(Debug)]
pub enum State {}
#[derive(Debug)]
pub enum Instruction {}
pub type Pusher = Sender<std::result::Result<HeartbeatResponse, tonic::Status>>;
#[derive(Clone)]
pub struct HeartbeatHandlers {
kv_store: KvStoreRef,
handlers: Arc<RwLock<Vec<Box<dyn HeartbeatHandler>>>>,
pushers: Arc<RwLock<BTreeMap<String, Pusher>>>,
}
impl HeartbeatHandlers {
pub fn new(kv_store: KvStoreRef) -> Self {
Self {
kv_store,
handlers: Arc::new(RwLock::new(Default::default())),
pushers: Arc::new(RwLock::new(Default::default())),
}
}
pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) {
let mut handlers = self.handlers.write().await;
handlers.push(Box::new(handler));
}
pub async fn register(&self, key: impl AsRef<str>, pusher: Pusher) {
let mut pushers = self.pushers.write().await;
let key = key.as_ref();
info!("Pusher register: {}", key);
pushers.insert(key.into(), pusher);
}
pub async fn unregister(&self, key: impl AsRef<str>) -> Option<Pusher> {
let mut pushers = self.pushers.write().await;
let key = key.as_ref();
info!("Pusher unregister: {}", key);
pushers.remove(key)
}
pub async fn handle(&self, req: HeartbeatRequest) -> Result<HeartbeatResponse> {
let mut acc = HeartbeatAccumulator::default();
let handlers = self.handlers.read().await;
for h in handlers.iter() {
h.handle(&req, &mut acc, self.kv_store.clone()).await?;
}
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {
header,
payload: acc.into_payload(),
};
Ok(res)
}
}

View File

@@ -0,0 +1,61 @@
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::ResponseHeader;
use api::v1::meta::PROTOCOL_VERSION;
use super::HeartbeatAccumulator;
use super::HeartbeatHandler;
use crate::error::Result;
use crate::service::store::kv::KvStoreRef;
pub struct ResponseHeaderHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for ResponseHeaderHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
acc: &mut HeartbeatAccumulator,
_store: KvStoreRef,
) -> Result<()> {
let HeartbeatRequest { header, .. } = req;
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
cluster_id: header.as_ref().map_or(0, |h| h.cluster_id),
..Default::default()
};
acc.header = Some(res_header);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::{request_header, HeartbeatResponse};
use super::*;
use crate::service::store::noop::NoopKvStore;
#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let kv_store = Arc::new(NoopKvStore {});
let req = HeartbeatRequest {
header: request_header((1, 2)),
..Default::default()
};
let mut acc = HeartbeatAccumulator::default();
let response_handler = ResponseHeaderHandler {};
response_handler
.handle(&req, &mut acc, kv_store)
.await
.unwrap();
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {
header,
payload: acc.into_payload(),
};
assert_eq!(1, res.header.unwrap().cluster_id);
}
}

View File

@@ -1,5 +1,6 @@
pub mod bootstrap;
pub mod error;
pub mod handler;
pub mod metasrv;
pub mod service;

View File

@@ -1,6 +1,8 @@
use serde::Deserialize;
use serde::Serialize;
use crate::handler::response_header::ResponseHeaderHandler;
use crate::handler::HeartbeatHandlers;
use crate::service::store::kv::KvStoreRef;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -24,18 +26,32 @@ impl Default for MetaSrvOptions {
pub struct MetaSrv {
options: MetaSrvOptions,
kv_store: KvStoreRef,
heartbeat_handlers: HeartbeatHandlers,
}
impl MetaSrv {
pub fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self {
Self { options, kv_store }
pub async fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self {
let heartbeat_handlers = HeartbeatHandlers::new(kv_store.clone());
heartbeat_handlers.add_handler(ResponseHeaderHandler).await;
Self {
options,
kv_store,
heartbeat_handlers,
}
}
#[inline]
pub fn options(&self) -> &MetaSrvOptions {
&self.options
}
#[inline]
pub fn kv_store(&self) -> KvStoreRef {
self.kv_store.clone()
}
#[inline]
pub fn heartbeat_handlers(&self) -> HeartbeatHandlers {
self.heartbeat_handlers.clone()
}
}

View File

@@ -1,11 +1,13 @@
use std::io::ErrorKind;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use api::v1::meta::heartbeat_server;
use api::v1::meta::AskLeaderRequest;
use api::v1::meta::AskLeaderResponse;
use api::v1::meta::Endpoint;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::meta::Peer;
use api::v1::meta::ResponseHeader;
use api::v1::meta::PROTOCOL_VERSION;
use common_telemetry::error;
@@ -17,13 +19,14 @@ use tonic::Request;
use tonic::Response;
use tonic::Streaming;
use super::store::kv::KvStoreRef;
use super::GrpcResult;
use super::GrpcStream;
use crate::error;
use crate::error::Result;
use crate::metasrv::MetaSrv;
static PUSHER_ID: AtomicU64 = AtomicU64::new(0);
#[async_trait::async_trait]
impl heartbeat_server::Heartbeat for MetaSrv {
type HeartbeatStream = GrpcStream<HeartbeatResponse>;
@@ -34,19 +37,29 @@ impl heartbeat_server::Heartbeat for MetaSrv {
) -> GrpcResult<Self::HeartbeatStream> {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let kv_store = self.kv_store();
let handlers = self.heartbeat_handlers();
common_runtime::spawn_bg(async move {
let mut pusher_key = None;
while let Some(msg) = in_stream.next().await {
match msg {
Ok(req) => tx
.send(
handle_heartbeat(req, kv_store.clone())
.await
.map_err(|e| e.into()),
)
.await
.expect("working rx"),
Ok(req) => {
if pusher_key.is_none() {
if let Some(ref peer) = req.peer {
let key = format!(
"{}-{}-{}",
peer.addr,
peer.id,
PUSHER_ID.fetch_add(1, Ordering::Relaxed)
);
handlers.register(&key, tx.clone()).await;
pusher_key = Some(key);
}
}
tx.send(handlers.handle(req).await.map_err(|e| e.into()))
.await
.expect("working rx");
}
Err(err) => {
if let Some(io_err) = error::match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
@@ -63,12 +76,18 @@ impl heartbeat_server::Heartbeat for MetaSrv {
}
}
}
info!("Heartbeat stream broken: {:?}", in_stream);
info!(
"Heartbeat stream broken: {:?}",
pusher_key.as_ref().unwrap_or(&"unknow".to_string())
);
if let Some(key) = pusher_key {
let _ = handlers.unregister(&key);
}
});
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(out_stream) as Self::HeartbeatStream))
Ok(Response::new(Box::pin(out_stream)))
}
async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
@@ -93,7 +112,8 @@ impl MetaSrv {
// TODO(jiachun): return leader
let res = AskLeaderResponse {
header: Some(res_header),
leader: Some(Endpoint {
leader: Some(Peer {
id: 0,
addr: self.options().server_addr.clone(),
}),
};
@@ -102,28 +122,6 @@ impl MetaSrv {
}
}
async fn handle_heartbeat(
req: HeartbeatRequest,
_kv_store: KvStoreRef,
) -> Result<HeartbeatResponse> {
let HeartbeatRequest { header, .. } = req;
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
cluster_id: header.map_or(0, |h| h.cluster_id),
..Default::default()
};
// TODO(jiachun) Do something high-end
let res = HeartbeatResponse {
header: Some(res_header),
..Default::default()
};
Ok(res)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -134,48 +132,16 @@ mod tests {
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::service::store::kv::KvStore;
#[derive(Clone)]
pub struct NoopKvStore;
#[async_trait::async_trait]
impl KvStore for NoopKvStore {
async fn range(&self, _req: RangeRequest) -> crate::Result<RangeResponse> {
unreachable!()
}
async fn put(&self, _req: PutRequest) -> crate::Result<PutResponse> {
unreachable!()
}
async fn delete_range(
&self,
_req: DeleteRangeRequest,
) -> crate::Result<DeleteRangeResponse> {
unreachable!()
}
}
#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let kv_store = Arc::new(NoopKvStore {});
let header = RequestHeader::new(1, 2);
let req = HeartbeatRequest::new(header);
let res = handle_heartbeat(req, kv_store).await.unwrap();
assert_eq!(1, res.header.unwrap().cluster_id);
}
use crate::service::store::noop::NoopKvStore;
#[tokio::test]
async fn test_ask_leader() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let header = RequestHeader::new(1, 1);
let req = AskLeaderRequest::new(header);
let req = AskLeaderRequest {
header: request_header((1, 1)),
};
let res = meta_srv.ask_leader(req.into_request()).await.unwrap();
let res = res.into_inner();

View File

@@ -41,7 +41,10 @@ async fn handle_create(req: CreateRequest, _kv_store: KvStoreRef) -> Result<Rout
let _table_name = table_name.context(error::EmptyTableNameSnafu)?;
// TODO(jiachun):
let peers = vec![Peer::new(0, "127.0.0.1:3000")];
let peers = vec![Peer {
id: 0,
addr: "127.0.0.1:3000".to_string(),
}];
Ok(RouteResponse {
peers,
@@ -59,36 +62,18 @@ mod tests {
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::service::store::kv::KvStore;
struct MockKvStore;
#[async_trait::async_trait]
impl KvStore for MockKvStore {
async fn range(&self, _req: RangeRequest) -> crate::Result<RangeResponse> {
unreachable!()
}
async fn put(&self, _req: PutRequest) -> crate::Result<PutResponse> {
unreachable!()
}
async fn delete_range(
&self,
_req: DeleteRangeRequest,
) -> crate::Result<DeleteRangeResponse> {
unreachable!()
}
}
use crate::service::store::noop::NoopKvStore;
#[should_panic]
#[tokio::test]
async fn test_handle_route() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let header = RequestHeader::new(1, 1);
let req = RouteRequest::new(header);
let req = RouteRequest {
header: request_header((1, 1)),
..Default::default()
};
let req = req
.add_table(TableName::new("catalog1", "schema1", "table1"))
.add_table(TableName::new("catalog1", "schema1", "table2"))
@@ -99,12 +84,15 @@ mod tests {
#[tokio::test]
async fn test_handle_create() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let header = RequestHeader::new(1, 1);
let table_name = TableName::new("test_catalog", "test_db", "table1");
let req = CreateRequest::new(header, table_name);
let req = CreateRequest {
header: request_header((1, 1)),
table_name: Some(table_name),
..Default::default()
};
let p0 = Partition::new()
.column_list(vec![b"col1".to_vec(), b"col2".to_vec()])
@@ -119,7 +107,7 @@ mod tests {
let res = meta_srv.create(req.into_request()).await.unwrap();
for r in res.into_inner().peers {
assert_eq!("127.0.0.1:3000", r.endpoint.unwrap().addr);
assert_eq!("127.0.0.1:3000", r.addr);
}
}
}

View File

@@ -1,5 +1,6 @@
pub mod etcd;
pub mod kv;
pub(crate) mod noop;
use api::v1::meta::store_server;
use api::v1::meta::DeleteRangeRequest;
@@ -51,32 +52,12 @@ mod tests {
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::service::store::kv::KvStore;
struct MockKvStore;
#[async_trait::async_trait]
impl KvStore for MockKvStore {
async fn range(&self, _req: RangeRequest) -> crate::Result<RangeResponse> {
Ok(RangeResponse::default())
}
async fn put(&self, _req: PutRequest) -> crate::Result<PutResponse> {
Ok(PutResponse::default())
}
async fn delete_range(
&self,
_req: DeleteRangeRequest,
) -> crate::Result<DeleteRangeResponse> {
Ok(DeleteRangeResponse::default())
}
}
use crate::service::store::noop::NoopKvStore;
#[tokio::test]
async fn test_range() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let req = RangeRequest::default();
let res = meta_srv.range(req.into_request()).await;
@@ -85,8 +66,8 @@ mod tests {
#[tokio::test]
async fn test_put() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let req = PutRequest::default();
let res = meta_srv.put(req.into_request()).await;
@@ -95,8 +76,8 @@ mod tests {
#[tokio::test]
async fn test_delete_range() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let req = DeleteRangeRequest::default();
let res = meta_srv.delete_range(req.into_request()).await;

View File

@@ -0,0 +1,29 @@
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use super::kv::KvStore;
use crate::error::Result;
/// A noop kv_store which only for test
// TODO(jiachun): Add a test feature
#[derive(Clone)]
pub struct NoopKvStore;
#[async_trait::async_trait]
impl KvStore for NoopKvStore {
async fn range(&self, _req: RangeRequest) -> Result<RangeResponse> {
Ok(RangeResponse::default())
}
async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
Ok(PutResponse::default())
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
Ok(DeleteRangeResponse::default())
}
}