chore: refactor meta protocol (#332)

* chore: refactor channel_config

* chore: refactor grpc protocol

* feat: heartbeat streams
This commit is contained in:
Jiachun Feng
2022-10-21 20:30:57 +08:00
committed by GitHub
parent bc9a2df9bf
commit b650656ae3
22 changed files with 617 additions and 310 deletions

3
Cargo.lock generated
View File

@@ -2639,6 +2639,7 @@ dependencies = [
"rand 0.8.5",
"snafu",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-subscriber",
@@ -2652,9 +2653,11 @@ dependencies = [
"async-trait",
"common-base",
"common-error",
"common-runtime",
"common-telemetry",
"etcd-client",
"futures",
"h2",
"http-body",
"serde",
"snafu",

View File

@@ -1,2 +1,3 @@
bind_addr = '127.0.0.1:3002'
server_addr = '0.0.0.0:3002'
store_addr = '127.0.0.1:2380'

View File

@@ -31,6 +31,12 @@ message Endpoint {
string addr = 1;
}
message TableName {
string catalog_name = 1;
string schema_name = 2;
string table_name = 3;
}
message TimeInterval {
// The unix timestamp in millis of the start of this period.
uint64 start_timestamp_millis = 1;

View File

@@ -18,10 +18,10 @@ service Heartbeat {
message HeartbeatRequest {
RequestHeader header = 1;
// Self peer
Peer peer = 2;
// Leader node
bool is_leader = 2;
// Leader Peer
Endpoint leader_endpoint = 3;
bool is_leader = 3;
// Actually reported time interval
TimeInterval report_interval = 4;
// Node stat
@@ -48,15 +48,25 @@ message NodeStat {
double read_io_rate = 7;
// Write disk I/O in the node
double write_io_rate = 8;
// Others
map<string, string> attrs = 100;
}
message RegionStat {
string table_name = 1;
uint64 region_id = 2;
uint64 region_id = 1;
TableName table_name = 2;
// The read capacity units during this period
uint64 rcus = 3;
// The write capacity units during this period
uint64 wcus = 4;
// Approximate region size
uint64 approximate_size = 5;
// Approximate number of rows
uint64 approximate_rows = 6;
// Others
map<string, string> attrs = 100;
}
message ReplicaStat {

View File

@@ -24,7 +24,7 @@ service Router {
//
rpc Route(RouteRequest) returns (RouteResponse) {}
rpc Create(CreateRequest) returns (CreateResponse) {}
rpc Create(CreateRequest) returns (RouteResponse) {}
}
message RouteRequest {
@@ -44,13 +44,7 @@ message CreateRequest {
RequestHeader header = 1;
TableName table_name = 2;
repeated Region regions = 3;
}
message CreateResponse {
ResponseHeader header = 1;
repeated Region regions = 2;
repeated Partition partitions = 3;
}
message TableRoute {
@@ -66,12 +60,6 @@ message RegionRoute {
repeated uint64 follower_peer_indexes = 3;
}
message TableName {
string catalog_name = 1;
string schema_name = 2;
string table_name = 3;
}
message Table {
TableName table_name = 1;
bytes table_schema = 2;
@@ -80,14 +68,13 @@ message Table {
message Region {
uint64 id = 1;
string name = 2;
Peer peer = 3;
// PARTITION `region_name` VALUES LESS THAN (value_list)
message Partition {
repeated bytes column_list = 1;
repeated bytes value_list = 2;
}
Partition partition = 4;
map<string, string> attrs = 5;
Partition partition = 3;
map<string, string> attrs = 100;
}
// PARTITION `region_name` VALUES LESS THAN (value_list)
message Partition {
repeated bytes column_list = 1;
repeated bytes value_list = 2;
}

View File

@@ -20,8 +20,16 @@ impl From<&str> for Endpoint {
}
impl RequestHeader {
pub fn new(cluster_id: u64, member_id: u64) -> RequestHeader {
RequestHeader {
pub fn new(cluster_id: u64, member_id: u64) -> Self {
Self {
protocol_version: PROTOCOL_VERSION,
cluster_id,
member_id,
}
}
pub fn with_id((cluster_id, member_id): (u64, u64)) -> Self {
Self {
protocol_version: PROTOCOL_VERSION,
cluster_id,
member_id,
@@ -83,14 +91,14 @@ impl CreateRequest {
}
}
pub fn add_region(mut self, region: Region) -> Self {
self.regions.push(region);
pub fn add_partition(mut self, partition: Partition) -> Self {
self.partitions.push(partition);
self
}
}
impl Region {
pub fn new(id: u64, name: impl Into<String>, partition: region::Partition) -> Self {
pub fn new(id: u64, name: impl Into<String>, partition: Partition) -> Self {
Self {
id,
name: name.into(),
@@ -105,7 +113,7 @@ impl Region {
}
}
impl region::Partition {
impl Partition {
pub fn new() -> Self {
Default::default()
}

View File

@@ -36,6 +36,8 @@ impl SubCommand {
#[derive(Debug, Parser)]
struct StartCommand {
#[clap(long)]
bind_addr: Option<String>,
#[clap(long)]
server_addr: Option<String>,
#[clap(long)]
@@ -68,6 +70,9 @@ impl TryFrom<StartCommand> for MetaSrvOptions {
MetaSrvOptions::default()
};
if let Some(addr) = cmd.bind_addr {
opts.bind_addr = addr;
}
if let Some(addr) = cmd.server_addr {
opts.server_addr = addr;
}
@@ -86,11 +91,13 @@ mod tests {
#[test]
fn test_read_from_cmd() {
let cmd = StartCommand {
bind_addr: Some("127.0.0.1:3002".to_string()),
server_addr: Some("0.0.0.0:3002".to_string()),
store_addr: Some("127.0.0.1:2380".to_string()),
config_file: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
}
@@ -98,6 +105,7 @@ mod tests {
#[test]
fn test_read_from_config_file() {
let cmd = StartCommand {
bind_addr: None,
server_addr: None,
store_addr: None,
config_file: Some(format!(
@@ -106,6 +114,7 @@ mod tests {
)),
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
}

View File

@@ -14,10 +14,95 @@ const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
#[derive(Clone, Debug)]
pub struct ChannelManager {
config: Option<ChannelConfig>,
config: ChannelConfig,
pool: Arc<Mutex<Pool>>,
}
impl Default for ChannelManager {
fn default() -> Self {
ChannelManager::with_config(ChannelConfig::default())
}
}
impl ChannelManager {
pub fn new() -> Self {
Default::default()
}
pub fn with_config(config: ChannelConfig) -> Self {
let pool = Pool {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let cloned_pool = pool.clone();
common_runtime::spawn_bg(async move {
recycle_channel_in_loop(cloned_pool, RECYCLE_CHANNEL_INTERVAL_SECS).await;
});
Self { pool, config }
}
pub fn config(&self) -> &ChannelConfig {
&self.config
}
pub fn get(&self, addr: impl AsRef<str>) -> Result<InnerChannel> {
let addr = addr.as_ref();
let mut pool = self.pool.lock().unwrap();
if let Some(ch) = pool.get_mut(addr) {
ch.access += 1;
return Ok(ch.channel.clone());
}
let mut endpoint =
Endpoint::new(format!("http://{}", addr)).context(error::CreateChannelSnafu)?;
if let Some(dur) = self.config.timeout {
endpoint = endpoint.timeout(dur);
}
if let Some(dur) = self.config.connect_timeout {
endpoint = endpoint.connect_timeout(dur);
}
if let Some(limit) = self.config.concurrency_limit {
endpoint = endpoint.concurrency_limit(limit);
}
if let Some((limit, dur)) = self.config.rate_limit {
endpoint = endpoint.rate_limit(limit, dur);
}
if let Some(size) = self.config.initial_stream_window_size {
endpoint = endpoint.initial_stream_window_size(size);
}
if let Some(size) = self.config.initial_connection_window_size {
endpoint = endpoint.initial_connection_window_size(size);
}
if let Some(dur) = self.config.http2_keep_alive_interval {
endpoint = endpoint.http2_keep_alive_interval(dur);
}
if let Some(dur) = self.config.http2_keep_alive_timeout {
endpoint = endpoint.keep_alive_timeout(dur);
}
if let Some(enabled) = self.config.http2_keep_alive_while_idle {
endpoint = endpoint.keep_alive_while_idle(enabled);
}
if let Some(enabled) = self.config.http2_adaptive_window {
endpoint = endpoint.http2_adaptive_window(enabled);
}
endpoint = endpoint
.tcp_keepalive(self.config.tcp_keepalive)
.tcp_nodelay(self.config.tcp_nodelay);
let inner_channel = endpoint.connect_lazy();
let channel = Channel {
channel: inner_channel.clone(),
access: 1,
};
pool.put(addr, channel);
Ok(inner_channel)
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChannelConfig {
pub timeout: Option<Duration>,
@@ -172,85 +257,6 @@ struct Pool {
channels: HashMap<String, Channel>,
}
#[derive(Debug)]
struct Channel {
channel: InnerChannel,
access: usize,
}
impl ChannelManager {
pub fn new() -> Self {
Default::default()
}
pub fn with_config(config: ChannelConfig) -> Self {
let mut manager = ChannelManager::new();
manager.config = Some(config);
manager
}
pub fn config(&self) -> Option<ChannelConfig> {
self.config.clone()
}
pub fn get(&self, addr: impl AsRef<str>) -> Result<InnerChannel> {
let addr = addr.as_ref();
let mut pool = self.pool.lock().unwrap();
if let Some(ch) = pool.get_mut(addr) {
ch.access += 1;
return Ok(ch.channel.clone());
}
let mut endpoint =
Endpoint::new(format!("http://{}", addr)).context(error::CreateChannelSnafu)?;
if let Some(cfg) = &self.config {
if let Some(dur) = cfg.timeout {
endpoint = endpoint.timeout(dur);
}
if let Some(dur) = cfg.connect_timeout {
endpoint = endpoint.connect_timeout(dur);
}
if let Some(limit) = cfg.concurrency_limit {
endpoint = endpoint.concurrency_limit(limit);
}
if let Some((limit, dur)) = cfg.rate_limit {
endpoint = endpoint.rate_limit(limit, dur);
}
if let Some(size) = cfg.initial_stream_window_size {
endpoint = endpoint.initial_stream_window_size(size);
}
if let Some(size) = cfg.initial_connection_window_size {
endpoint = endpoint.initial_connection_window_size(size);
}
if let Some(dur) = cfg.http2_keep_alive_interval {
endpoint = endpoint.http2_keep_alive_interval(dur);
}
if let Some(dur) = cfg.http2_keep_alive_timeout {
endpoint = endpoint.keep_alive_timeout(dur);
}
if let Some(enabled) = cfg.http2_keep_alive_while_idle {
endpoint = endpoint.keep_alive_while_idle(enabled);
}
if let Some(enabled) = cfg.http2_adaptive_window {
endpoint = endpoint.http2_adaptive_window(enabled);
}
endpoint = endpoint
.tcp_keepalive(cfg.tcp_keepalive)
.tcp_nodelay(cfg.tcp_nodelay);
}
let inner_channel = endpoint.connect_lazy();
let channel = Channel {
channel: inner_channel.clone(),
access: 1,
};
pool.put(addr, channel);
Ok(inner_channel)
}
}
impl Pool {
#[inline]
fn get_mut(&mut self, addr: &str) -> Option<&mut Channel> {
@@ -271,20 +277,10 @@ impl Pool {
}
}
impl Default for ChannelManager {
fn default() -> Self {
let pool = Pool {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let cloned_pool = pool.clone();
common_runtime::spawn_bg(async move {
recycle_channel_in_loop(cloned_pool, RECYCLE_CHANNEL_INTERVAL_SECS).await;
});
Self { pool, config: None }
}
#[derive(Debug)]
struct Channel {
channel: InnerChannel,
access: usize,
}
async fn recycle_channel_in_loop(pool: Arc<Mutex<Pool>>, interval_secs: u64) {
@@ -315,7 +311,10 @@ mod tests {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let mgr = ChannelManager { pool, config: None };
let mgr = ChannelManager {
pool,
..Default::default()
};
let addr = "http://test";
let _ = mgr.get(addr).unwrap();
@@ -340,10 +339,7 @@ mod tests {
.http2_adaptive_window(true)
.tcp_keepalive(Duration::from_secs(1))
.tcp_nodelay(true);
let mgr = ChannelManager {
pool,
config: Some(config),
};
let mgr = ChannelManager { pool, config };
let addr = "test_uri";
for i in 0..10 {

View File

@@ -13,6 +13,7 @@ etcd-client = "0.10"
rand = "0.8"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
[dev-dependencies]

View File

@@ -1,11 +1,11 @@
use std::time::Duration;
use api::v1::meta::region::Partition;
use api::v1::meta::CreateRequest;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::Partition;
use api::v1::meta::PutRequest;
use api::v1::meta::RangeRequest;
use api::v1::meta::Region;
use api::v1::meta::RequestHeader;
use api::v1::meta::TableName;
use common_grpc::channel_manager::ChannelConfig;
@@ -23,22 +23,37 @@ fn main() {
#[tokio::main]
async fn run() {
let id = (1000u64, 2000u64);
let config = ChannelConfig::new()
.timeout(Duration::from_secs(3))
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new()
.heartbeat_client(true)
.router_client(true)
.store_client(true)
let mut meta_client = MetaClientBuilder::new(id.0, id.1)
.enable_heartbeat()
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client.start(&["127.0.0.1:3002"]).await.unwrap();
// required only when the heartbeat_client is enabled
meta_client.ask_leader().await.unwrap();
let header = RequestHeader::new(0, 0);
let (sender, mut receiver) = meta_client.heartbeat().await.unwrap();
// send heartbeats
tokio::spawn(async move {
for _ in 0..5 {
let req = HeartbeatRequest::new(RequestHeader::with_id(id));
sender.send(req).await.unwrap();
}
});
while let Some(res) = receiver.message().await.unwrap() {
event!(Level::INFO, "heartbeat response: {:#?}", res);
}
let header = RequestHeader::with_id(id);
let p1 = Partition::new()
.column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()])
@@ -51,8 +66,8 @@ async fn run() {
let table_name = TableName::new("test_catlog", "test_schema", "test_table");
let create_req = CreateRequest::new(header, table_name)
.add_region(Region::new(0, "test_region1", p1))
.add_region(Region::new(1, "test_region2", p2));
.add_partition(p1)
.add_partition(p2);
let res = meta_client.create_route(create_req).await.unwrap();
event!(Level::INFO, "create_route result: {:#?}", res);

View File

@@ -4,7 +4,6 @@ mod router;
mod store;
use api::v1::meta::CreateRequest;
use api::v1::meta::CreateResponse;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
@@ -21,39 +20,47 @@ use router::Client as RouterClient;
use snafu::OptionExt;
use store::Client as StoreClient;
use self::heartbeat::HeartbeatSender;
use self::heartbeat::HeartbeatStream;
use crate::error;
use crate::error::Result;
pub type Id = (u64, u64);
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
heartbeat_client: bool,
router_client: bool,
store_client: bool,
id: Id,
enable_heartbeat: bool,
enable_router: bool,
enable_store: bool,
channel_manager: Option<ChannelManager>,
}
impl MetaClientBuilder {
pub fn new() -> Self {
MetaClientBuilder::default()
pub fn new(cluster_id: u64, member_id: u64) -> Self {
Self {
id: (cluster_id, member_id),
..Default::default()
}
}
pub fn heartbeat_client(self, enabled: bool) -> Self {
pub fn enable_heartbeat(self) -> Self {
Self {
heartbeat_client: enabled,
enable_heartbeat: true,
..self
}
}
pub fn router_client(self, enabled: bool) -> Self {
pub fn enable_router(self) -> Self {
Self {
router_client: enabled,
enable_router: true,
..self
}
}
pub fn store_client(self, enabled: bool) -> Self {
pub fn enable_store(self) -> Self {
Self {
store_client: enabled,
enable_store: true,
..self
}
}
@@ -66,39 +73,37 @@ impl MetaClientBuilder {
}
pub fn build(self) -> MetaClient {
let mut meta_client = if let Some(mgr) = self.channel_manager {
MetaClient {
channel_manager: mgr,
..Default::default()
}
let mut client = if let Some(mgr) = self.channel_manager {
MetaClient::with_channel_manager(self.id, mgr)
} else {
Default::default()
MetaClient::new(self.id)
};
if let (false, false, false) =
(self.heartbeat_client, self.router_client, self.store_client)
(self.enable_heartbeat, self.enable_router, self.enable_store)
{
panic!("At least one client needs to be enabled.")
}
let mgr = meta_client.channel_manager.clone();
let mgr = client.channel_manager.clone();
if self.heartbeat_client {
meta_client.heartbeat_client = Some(HeartbeatClient::new(mgr.clone()));
if self.enable_heartbeat {
client.heartbeat_client = Some(HeartbeatClient::new(self.id, mgr.clone()));
}
if self.router_client {
meta_client.router_client = Some(RouterClient::new(mgr.clone()));
if self.enable_router {
client.router_client = Some(RouterClient::new(self.id, mgr.clone()));
}
if self.store_client {
meta_client.store_client = Some(StoreClient::new(mgr));
if self.enable_store {
client.store_client = Some(StoreClient::new(self.id, mgr));
}
meta_client
client
}
}
#[derive(Clone, Debug, Default)]
pub struct MetaClient {
id: Id,
channel_manager: ChannelManager,
heartbeat_client: Option<HeartbeatClient>,
router_client: Option<RouterClient>,
@@ -106,6 +111,21 @@ pub struct MetaClient {
}
impl MetaClient {
pub fn new(id: Id) -> Self {
Self {
id,
..Default::default()
}
}
pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
Self {
id,
channel_manager,
..Default::default()
}
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
@@ -142,7 +162,16 @@ impl MetaClient {
todo!()
}
pub async fn create_route(&self, req: CreateRequest) -> Result<CreateResponse> {
pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
self.heartbeat_client()
.context(error::NotStartedSnafu {
name: "heartbeat_client",
})?
.heartbeat()
.await
}
pub async fn create_route(&self, req: CreateRequest) -> Result<RouteResponse> {
self.router_client()
.context(error::NotStartedSnafu {
name: "route_client",
@@ -224,9 +253,15 @@ impl MetaClient {
self.store_client.clone()
}
pub fn channel_config(&self) -> Option<ChannelConfig> {
#[inline]
pub fn channel_config(&self) -> &ChannelConfig {
self.channel_manager.config()
}
#[inline]
pub fn id(&self) -> Id {
self.id
}
}
#[cfg(test)]
@@ -237,32 +272,34 @@ mod tests {
async fn test_meta_client_builder() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new().heartbeat_client(true).build();
let mut meta_client = MetaClientBuilder::new(0, 0).enable_heartbeat().build();
assert!(meta_client.heartbeat_client().is_some());
assert!(meta_client.router_client().is_none());
assert!(meta_client.store_client().is_none());
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new().router_client(true).build();
let mut meta_client = MetaClientBuilder::new(0, 0).enable_router().build();
assert!(meta_client.heartbeat_client().is_none());
assert!(meta_client.router_client().is_some());
assert!(meta_client.store_client().is_none());
meta_client.start(urls).await.unwrap();
assert!(meta_client.router_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new().store_client(true).build();
let mut meta_client = MetaClientBuilder::new(0, 0).enable_store().build();
assert!(meta_client.heartbeat_client().is_none());
assert!(meta_client.router_client().is_none());
assert!(meta_client.store_client().is_some());
meta_client.start(urls).await.unwrap();
assert!(meta_client.store_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new()
.heartbeat_client(true)
.router_client(true)
.store_client(true)
let mut meta_client = MetaClientBuilder::new(1, 2)
.enable_heartbeat()
.enable_router()
.enable_store()
.build();
assert_eq!(1, meta_client.id().0);
assert_eq!(2, meta_client.id().1);
assert!(meta_client.heartbeat_client().is_some());
assert!(meta_client.router_client().is_some());
assert!(meta_client.store_client().is_some());
@@ -276,9 +313,9 @@ mod tests {
async fn test_not_start_heartbeat_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new()
.router_client(true)
.store_client(true)
let mut meta_client = MetaClientBuilder::new(0, 0)
.enable_router()
.enable_store()
.build();
meta_client.start(urls).await.unwrap();
@@ -292,9 +329,9 @@ mod tests {
async fn test_not_start_router_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new()
.heartbeat_client(true)
.store_client(true)
let mut meta_client = MetaClientBuilder::new(0, 0)
.enable_heartbeat()
.enable_store()
.build();
meta_client.start(urls).await.unwrap();
@@ -308,9 +345,9 @@ mod tests {
async fn test_not_start_store_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new()
.heartbeat_client(true)
.router_client(true)
let mut meta_client = MetaClientBuilder::new(0, 0)
.enable_heartbeat()
.enable_router()
.build();
meta_client.start(urls).await.unwrap();
@@ -323,10 +360,6 @@ mod tests {
#[should_panic]
#[test]
fn test_enable_at_least_one_client() {
let _ = MetaClientBuilder::new()
.heartbeat_client(false)
.router_client(false)
.store_client(false)
.build();
let _ = MetaClientBuilder::new(0, 0).build();
}
}

View File

@@ -3,34 +3,79 @@ use std::sync::Arc;
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::AskLeaderRequest;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::meta::RequestHeader;
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::debug;
use common_telemetry::info;
use snafu::ensure;
use snafu::OptionExt;
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tonic::Streaming;
use super::Id;
use crate::error;
use crate::error::Result;
pub struct HeartbeatSender {
sender: mpsc::Sender<HeartbeatRequest>,
}
impl HeartbeatSender {
#[inline]
const fn new(sender: mpsc::Sender<HeartbeatRequest>) -> Self {
Self { sender }
}
#[inline]
pub async fn send(&self, req: HeartbeatRequest) -> Result<()> {
self.sender.send(req).await.map_err(|e| {
error::SendHeartbeatSnafu {
err_msg: e.to_string(),
}
.build()
})
}
}
#[derive(Debug)]
pub struct HeartbeatStream {
stream: Streaming<HeartbeatResponse>,
}
impl HeartbeatStream {
#[inline]
const fn new(stream: Streaming<HeartbeatResponse>) -> Self {
Self { stream }
}
/// Fetch the next message from this stream.
#[inline]
pub async fn message(&mut self) -> Result<Option<HeartbeatResponse>> {
self.stream.message().await.context(error::TonicStatusSnafu)
}
}
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
impl Client {
pub fn new(channel_manager: ChannelManager) -> Self {
let inner = Inner {
pub fn new(id: Id, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
channel_manager,
peers: HashSet::default(),
leader: None,
};
}));
Self {
inner: Arc::new(RwLock::new(inner)),
}
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
@@ -47,16 +92,20 @@ impl Client {
inner.ask_leader().await
}
pub async fn heartbeat(&mut self) -> Result<(HeartbeatSender, HeartbeatStream)> {
let inner = self.inner.read().await;
inner.heartbeat().await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
// TODO(jiachun) send heartbeat
}
#[derive(Debug)]
struct Inner {
id: Id,
channel_manager: ChannelManager,
peers: HashSet<String>,
leader: Option<String>,
@@ -93,7 +142,7 @@ impl Inner {
);
// TODO(jiachun): set cluster_id and member_id
let header = RequestHeader::new(0, 0);
let header = RequestHeader::with_id(self.id);
let mut leader = None;
for addr in &self.peers {
let req = AskLeaderRequest::new(header.clone());
@@ -114,6 +163,36 @@ impl Inner {
Ok(())
}
async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
let leader = self.leader.as_ref().context(error::NoLeaderSnafu)?;
let mut leader = self.make_client(leader)?;
let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
let handshake = HeartbeatRequest::new(RequestHeader::with_id(self.id));
sender.send(handshake).await.map_err(|e| {
error::SendHeartbeatSnafu {
err_msg: e.to_string(),
}
.build()
})?;
let receiver = ReceiverStream::new(receiver);
let mut stream = leader
.heartbeat(receiver)
.await
.context(error::TonicStatusSnafu)?
.into_inner();
let res = stream
.message()
.await
.context(error::TonicStatusSnafu)?
.context(error::CreateHeartbeatStreamSnafu)?;
info!("Success to create heartbeat stream to server: {:#?}", res);
Ok((HeartbeatSender::new(sender), HeartbeatStream::new(stream)))
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
let channel = self
.channel_manager
@@ -135,7 +214,7 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
assert!(!client.is_started().await);
@@ -149,7 +228,7 @@ mod test {
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
@@ -169,7 +248,7 @@ mod test {
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
@@ -180,7 +259,7 @@ mod test {
#[tokio::test]
async fn test_ask_leader_unavailable() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let res = client.ask_leader().await;
@@ -190,4 +269,41 @@ mod test {
let err = res.err().unwrap();
assert!(matches!(err, error::Error::AskLeader { .. }));
}
#[tokio::test]
async fn test_heartbeat_unavailable() {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
client.inner.write().await.leader = Some("unavailable".to_string());
let res = client.heartbeat().await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(matches!(err, error::Error::TonicStatus { .. }));
}
#[tokio::test]
async fn test_heartbeat_stream() {
let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);
let sender = HeartbeatSender::new(sender);
tokio::spawn(async move {
for i in 0..10 {
sender
.send(HeartbeatRequest::new(RequestHeader::new(i, i)))
.await
.unwrap();
}
});
let mut i = 0;
while let Some(req) = receiver.recv().await {
let header = req.header.unwrap();
assert_eq!(i, header.cluster_id);
assert_eq!(i, header.member_id);
i += 1;
}
}
}

View File

@@ -3,7 +3,6 @@ use std::sync::Arc;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::CreateRequest;
use api::v1::meta::CreateResponse;
use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use common_grpc::channel_manager::ChannelManager;
@@ -13,6 +12,7 @@ use snafu::ResultExt;
use tokio::sync::RwLock;
use tonic::transport::Channel;
use super::Id;
use crate::client::load_balance as lb;
use crate::error;
use crate::error::Result;
@@ -23,15 +23,14 @@ pub struct Client {
}
impl Client {
pub fn new(channel_manager: ChannelManager) -> Self {
let inner = Inner {
pub fn new(id: Id, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
channel_manager,
peers: vec![],
};
}));
Self {
inner: Arc::new(RwLock::new(inner)),
}
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
@@ -48,7 +47,7 @@ impl Client {
inner.is_started()
}
pub async fn create(&self, req: CreateRequest) -> Result<CreateResponse> {
pub async fn create(&self, req: CreateRequest) -> Result<RouteResponse> {
let inner = self.inner.read().await;
inner.create(req).await
}
@@ -61,6 +60,8 @@ impl Client {
#[derive(Debug)]
struct Inner {
#[allow(dead_code)]
id: Id, // TODO(jiachun): will use it later
channel_manager: ChannelManager,
peers: Vec<String>,
}
@@ -97,7 +98,7 @@ impl Inner {
Ok(res.into_inner())
}
async fn create(&self, req: CreateRequest) -> Result<CreateResponse> {
async fn create(&self, req: CreateRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
let res = client.create(req).await.context(error::TonicStatusSnafu)?;
@@ -139,7 +140,7 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
assert!(!client.is_started().await);
@@ -153,7 +154,7 @@ mod test {
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
@@ -173,7 +174,7 @@ mod test {
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
@@ -184,7 +185,7 @@ mod test {
#[tokio::test]
async fn test_create_unavailable() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let header = RequestHeader::new(0, 0);
@@ -201,7 +202,7 @@ mod test {
#[tokio::test]
async fn test_route_unavailable() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let header = RequestHeader::new(0, 0);

View File

@@ -15,6 +15,7 @@ use snafu::ResultExt;
use tokio::sync::RwLock;
use tonic::transport::Channel;
use super::Id;
use crate::client::load_balance as lb;
use crate::error;
use crate::error::Result;
@@ -25,15 +26,14 @@ pub struct Client {
}
impl Client {
pub fn new(channel_manager: ChannelManager) -> Self {
let inner = Inner {
pub fn new(id: Id, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
channel_manager,
peers: vec![],
};
}));
Self {
inner: Arc::new(RwLock::new(inner)),
}
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
@@ -68,6 +68,8 @@ impl Client {
#[derive(Debug)]
struct Inner {
#[allow(dead_code)]
id: Id, // TODO(jiachun): will use it later
channel_manager: ChannelManager,
peers: Vec<String>,
}
@@ -155,7 +157,7 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
assert!(!client.is_started().await);
@@ -169,7 +171,7 @@ mod test {
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
@@ -189,7 +191,7 @@ mod test {
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
@@ -200,7 +202,7 @@ mod test {
#[tokio::test]
async fn test_range_unavailable() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unknow_peer"]).await.unwrap();
let req = RangeRequest {
@@ -219,7 +221,7 @@ mod test {
#[tokio::test]
async fn test_put_unavailable() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let req = PutRequest {
@@ -240,7 +242,7 @@ mod test {
#[tokio::test]
async fn test_delete_range_unavailable() {
let mut client = Client::new(ChannelManager::default());
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let req = DeleteRangeRequest {

View File

@@ -25,6 +25,9 @@ pub enum Error {
#[snafu(display("Failed to ask leader from all endpoints"))]
AskLeader { backtrace: Backtrace },
#[snafu(display("No leader, should ask leader first"))]
NoLeader { backtrace: Backtrace },
#[snafu(display("Failed to create gRPC channel, source: {}", source))]
CreateChannel {
#[snafu(backtrace)]
@@ -33,6 +36,15 @@ pub enum Error {
#[snafu(display("{} not started", name))]
NotStarted { name: String, backtrace: Backtrace },
#[snafu(display("Failed to send heartbeat: {}", err_msg))]
SendHeartbeat {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Failed create heartbeat stream to server"))]
CreateHeartbeatStream { backtrace: Backtrace },
}
#[allow(dead_code)]
@@ -53,7 +65,10 @@ impl ErrorExt for Error {
| Error::IllegalGrpcClientState { .. }
| Error::TonicStatus { .. }
| Error::AskLeader { .. }
| Error::NoLeader { .. }
| Error::NotStarted { .. }
| Error::SendHeartbeat { .. }
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
}
}
@@ -118,6 +133,14 @@ mod tests {
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_no_leader_error() {
let e = throw_none_option().context(NoLeaderSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_create_channel_error() {
fn throw_common_grpc_error() -> StdResult<common_grpc::Error> {
@@ -134,4 +157,26 @@ mod tests {
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_send_heartbeat_error() {
let e = throw_none_option()
.context(SendHeartbeatSnafu { err_msg: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_create_heartbeat_stream_error() {
let e = throw_none_option()
.context(CreateHeartbeatStreamSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
}

View File

@@ -9,10 +9,12 @@ api = { path = "../api" }
async-trait = "0.1"
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
etcd-client = "0.10"
futures = "0.3"
http-body = "0.4"
h2 = "0.3"
serde = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }

View File

@@ -14,15 +14,16 @@ use crate::service::store::etcd::EtcdStore;
// Bootstrap the rpc server to serve incoming request
pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> {
let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?;
let meta_srv = MetaSrv::new(kv_store);
let listener = TcpListener::bind(&opts.server_addr)
let listener = TcpListener::bind(&opts.bind_addr)
.await
.context(error::TcpBindSnafu {
addr: &opts.server_addr,
addr: &opts.bind_addr,
})?;
let listener = TcpListenerStream::new(listener);
let meta_srv = MetaSrv::new(opts, kv_store);
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::new(meta_srv.clone()))

View File

@@ -34,6 +34,9 @@ pub enum Error {
source: tonic::transport::Error,
backtrace: Backtrace,
},
#[snafu(display("Empty table name"))]
EmptyTableName { backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -60,11 +63,35 @@ impl ErrorExt for Error {
| Error::ConnectEtcd { .. }
| Error::TcpBind { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. } => StatusCode::InvalidArguments,
Error::EmptyKey { .. } | Error::EmptyTableName { .. } => StatusCode::InvalidArguments,
}
}
}
// for form tonic
pub(crate) fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
let mut err: &(dyn std::error::Error + 'static) = err_status;
loop {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
return Some(io_err);
}
// h2::Error do not expose std::io::Error with `source()`
// https://github.com/hyperium/h2/pull/462
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
if let Some(io_err) = h2_err.get_io() {
return Some(io_err);
}
}
err = match err.source() {
Some(err) => err,
None => return None,
};
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -142,4 +169,15 @@ mod tests {
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_empty_table_error() {
let e = throw_none_option()
.context(EmptyTableNameSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
}

View File

@@ -5,6 +5,7 @@ use crate::service::store::kv::KvStoreRef;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MetaSrvOptions {
pub bind_addr: String,
pub server_addr: String,
pub store_addr: String,
}
@@ -12,6 +13,7 @@ pub struct MetaSrvOptions {
impl Default for MetaSrvOptions {
fn default() -> Self {
Self {
bind_addr: "0.0.0.0:3002".to_string(),
server_addr: "0.0.0.0:3002".to_string(),
store_addr: "0.0.0.0:2380".to_string(),
}
@@ -20,12 +22,17 @@ impl Default for MetaSrvOptions {
#[derive(Clone)]
pub struct MetaSrv {
options: MetaSrvOptions,
kv_store: KvStoreRef,
}
impl MetaSrv {
pub fn new(kv_store: KvStoreRef) -> Self {
Self { kv_store }
pub fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self {
Self { options, kv_store }
}
pub fn options(&self) -> &MetaSrvOptions {
&self.options
}
pub fn kv_store(&self) -> KvStoreRef {

View File

@@ -1,3 +1,5 @@
use std::io::ErrorKind;
use api::v1::meta::heartbeat_server;
use api::v1::meta::AskLeaderRequest;
use api::v1::meta::AskLeaderResponse;
@@ -6,9 +8,11 @@ use api::v1::meta::HeartbeatRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::meta::ResponseHeader;
use api::v1::meta::PROTOCOL_VERSION;
use common_telemetry::error;
use common_telemetry::info;
use futures::StreamExt;
use futures::TryFutureExt;
use snafu::OptionExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Request;
use tonic::Response;
use tonic::Streaming;
@@ -28,30 +32,81 @@ impl heartbeat_server::Heartbeat for MetaSrv {
&self,
req: Request<Streaming<HeartbeatRequest>>,
) -> GrpcResult<Self::HeartbeatStream> {
let msg = req
.into_inner()
.next()
.await
.context(error::StreamNoneSnafu {})??;
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let res = handle_heartbeat(msg).map_err(|e| e.into());
let kv_store = self.kv_store();
common_runtime::spawn_bg(async move {
while let Some(msg) = in_stream.next().await {
match msg {
Ok(req) => tx
.send(
handle_heartbeat(req, kv_store.clone())
.await
.map_err(|e| e.into()),
)
.await
.expect("working rx"),
Err(err) => {
if let Some(io_err) = error::match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
// client disconnected in unexpected way
error!("Client disconnected: broken pipe");
break;
}
}
let output = futures::stream::once(res);
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was droped
}
}
}
}
info!("Heartbeat stream broken: {:?}", in_stream);
});
Ok(Response::new(Box::pin(output)))
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(out_stream) as Self::HeartbeatStream))
}
async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
let req = req.into_inner();
let kv_store = self.kv_store();
let res = handle_ask_leader(req, kv_store).await?;
let res = self.handle_ask_leader(req).await?;
Ok(Response::new(res))
}
}
async fn handle_heartbeat(msg: HeartbeatRequest) -> Result<HeartbeatResponse> {
let HeartbeatRequest { header, .. } = msg;
impl MetaSrv {
// TODO(jiachun): move out when we can get the leader peer from kv store
async fn handle_ask_leader(&self, req: AskLeaderRequest) -> Result<AskLeaderResponse> {
let AskLeaderRequest { header, .. } = req;
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
cluster_id: header.map_or(0u64, |h| h.cluster_id),
..Default::default()
};
// TODO(jiachun): return leader
let res = AskLeaderResponse {
header: Some(res_header),
leader: Some(Endpoint {
addr: self.options().server_addr.clone(),
}),
};
Ok(res)
}
}
async fn handle_heartbeat(
req: HeartbeatRequest,
_kv_store: KvStoreRef,
) -> Result<HeartbeatResponse> {
let HeartbeatRequest { header, .. } = req;
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
@@ -69,29 +124,6 @@ async fn handle_heartbeat(msg: HeartbeatRequest) -> Result<HeartbeatResponse> {
Ok(res)
}
async fn handle_ask_leader(
req: AskLeaderRequest,
_kv_store: KvStoreRef,
) -> Result<AskLeaderResponse> {
let AskLeaderRequest { header, .. } = req;
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
cluster_id: header.map_or(0u64, |h| h.cluster_id),
..Default::default()
};
// TODO(jiachun): return leader
let res = AskLeaderResponse {
header: Some(res_header),
leader: Some(Endpoint {
addr: "127.0.0.1:3002".to_string(),
}),
};
Ok(res)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -105,15 +137,7 @@ mod tests {
use crate::service::store::kv::KvStore;
#[derive(Clone)]
pub struct NoopKvStore {
_opts: MetaSrvOptions,
}
impl NoopKvStore {
pub fn new(opts: MetaSrvOptions) -> Self {
Self { _opts: opts }
}
}
pub struct NoopKvStore;
#[async_trait::async_trait]
impl KvStore for NoopKvStore {
@@ -135,18 +159,20 @@ mod tests {
#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let kv_store = Arc::new(NoopKvStore {});
let header = RequestHeader::new(1, 2);
let req = HeartbeatRequest::new(header);
let res = handle_heartbeat(req).await.unwrap();
let res = handle_heartbeat(req, kv_store).await.unwrap();
assert_eq!(1, res.header.unwrap().cluster_id);
}
#[tokio::test]
async fn test_ask_leader() {
let kv_store = Arc::new(NoopKvStore::new(MetaSrvOptions::default()));
let meta_srv = MetaSrv::new(kv_store);
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let header = RequestHeader::new(1, 1);
let req = AskLeaderRequest::new(header);
@@ -154,6 +180,6 @@ mod tests {
let res = meta_srv.ask_leader(req.into_request()).await.unwrap();
let res = res.into_inner();
assert_eq!(1, res.header.unwrap().cluster_id);
assert_eq!("127.0.0.1:3002".to_string(), res.leader.unwrap().addr);
assert_eq!(meta_srv.options().bind_addr, res.leader.unwrap().addr);
}
}

View File

@@ -1,14 +1,15 @@
use api::v1::meta::router_server;
use api::v1::meta::CreateRequest;
use api::v1::meta::CreateResponse;
use api::v1::meta::Peer;
use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use snafu::OptionExt;
use tonic::Request;
use tonic::Response;
use super::store::kv::KvStoreRef;
use super::GrpcResult;
use crate::error;
use crate::error::Result;
use crate::metasrv::MetaSrv;
@@ -22,7 +23,7 @@ impl router_server::Router for MetaSrv {
Ok(Response::new(res))
}
async fn create(&self, req: Request<CreateRequest>) -> GrpcResult<CreateResponse> {
async fn create(&self, req: Request<CreateRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let kv_store = self.kv_store();
let res = handle_create(req, kv_store).await?;
@@ -35,16 +36,15 @@ async fn handle_route(_req: RouteRequest, _kv_store: KvStoreRef) -> Result<Route
todo!()
}
async fn handle_create(req: CreateRequest, _kv_store: KvStoreRef) -> Result<CreateResponse> {
let CreateRequest { mut regions, .. } = req;
async fn handle_create(req: CreateRequest, _kv_store: KvStoreRef) -> Result<RouteResponse> {
let CreateRequest { table_name, .. } = req;
let _table_name = table_name.context(error::EmptyTableNameSnafu)?;
// TODO(jiachun): route table
for r in &mut regions {
r.peer = Some(Peer::new(0, "127.0.0.1:3000"));
}
// TODO(jiachun):
let peers = vec![Peer::new(0, "127.0.0.1:3000")];
Ok(CreateResponse {
regions,
Ok(RouteResponse {
peers,
..Default::default()
})
}
@@ -58,6 +58,7 @@ mod tests {
use tonic::IntoRequest;
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::service::store::kv::KvStore;
struct MockKvStore;
@@ -84,7 +85,7 @@ mod tests {
#[tokio::test]
async fn test_handle_route() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let header = RequestHeader::new(1, 1);
let req = RouteRequest::new(header);
@@ -99,28 +100,26 @@ mod tests {
#[tokio::test]
async fn test_handle_create() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let header = RequestHeader::new(1, 1);
let table_name = TableName::new("test_catalog", "test_db", "table1");
let req = CreateRequest::new(header, table_name);
let p = region::Partition::new()
let p0 = Partition::new()
.column_list(vec![b"col1".to_vec(), b"col2".to_vec()])
.value_list(vec![b"v1".to_vec(), b"v2".to_vec()]);
let r1 = Region::new(1, "region1", p);
let p = region::Partition::new()
let p1 = Partition::new()
.column_list(vec![b"col1".to_vec(), b"col2".to_vec()])
.value_list(vec![b"v11".to_vec(), b"v22".to_vec()]);
let r2 = Region::new(1, "region2", p);
let req = req.add_region(r1).add_region(r2);
let req = req.add_partition(p0).add_partition(p1);
let res = meta_srv.create(req.into_request()).await.unwrap();
for r in res.into_inner().regions {
assert_eq!("127.0.0.1:3000", r.peer.unwrap().endpoint.unwrap().addr);
for r in res.into_inner().peers {
assert_eq!("127.0.0.1:3000", r.endpoint.unwrap().addr);
}
}
}

View File

@@ -50,6 +50,7 @@ mod tests {
use tonic::IntoRequest;
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::service::store::kv::KvStore;
struct MockKvStore;
@@ -75,7 +76,7 @@ mod tests {
#[tokio::test]
async fn test_range() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let req = RangeRequest::default();
let res = meta_srv.range(req.into_request()).await;
@@ -85,7 +86,7 @@ mod tests {
#[tokio::test]
async fn test_put() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let req = PutRequest::default();
let res = meta_srv.put(req.into_request()).await;
@@ -95,7 +96,7 @@ mod tests {
#[tokio::test]
async fn test_delete_range() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store);
let req = DeleteRangeRequest::default();
let res = meta_srv.delete_range(req.into_request()).await;