feat: metasrv (#300)

* meta: meta api&client

* meta: heartbeat server init

* feat: kv store

* chore: grpc server

* chore: meta server bootstrap

* feat: heartbeat client

* feat: route for create table

* chore: a channel pool manager

* feat: route client

* feat: store client

* chore: meta_client example

* chore: change schema

* chore: unit test & by cr

* chore: refactor meta client

* chore: add unit test
This commit is contained in:
Jiachun Feng
2022-10-19 11:02:58 +08:00
committed by GitHub
parent 4d08ee6fbb
commit d5b34f8917
43 changed files with 3846 additions and 12 deletions

59
Cargo.lock generated
View File

@@ -837,6 +837,7 @@ dependencies = [
"datanode",
"frontend",
"futures",
"meta-srv",
"serde",
"snafu",
"tempdir",
@@ -919,8 +920,11 @@ dependencies = [
"async-trait",
"common-base",
"common-error",
"common-runtime",
"datafusion",
"snafu",
"tokio",
"tonic",
]
[[package]]
@@ -1614,6 +1618,22 @@ dependencies = [
"str-buf",
]
[[package]]
name = "etcd-client"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1259da3b15ec7e54bd7203adb2c4335adb9ca1d47b56220d650e52c247e824a"
dependencies = [
"http",
"prost",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tower",
"tower-service",
]
[[package]]
name = "event-listener"
version = "2.5.3"
@@ -2606,6 +2626,45 @@ dependencies = [
"autocfg",
]
[[package]]
name = "meta-client"
version = "0.1.0"
dependencies = [
"api",
"common-error",
"common-grpc",
"common-telemetry",
"etcd-client",
"rand 0.8.5",
"snafu",
"tokio",
"tonic",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "meta-srv"
version = "0.1.0"
dependencies = [
"api",
"async-trait",
"common-base",
"common-error",
"common-telemetry",
"etcd-client",
"futures",
"http-body",
"serde",
"snafu",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-subscriber",
"url",
]
[[package]]
name = "metrics"
version = "0.20.1"

View File

@@ -19,6 +19,8 @@ members = [
"src/frontend",
"src/log-store",
"src/logical-plans",
"src/meta-client",
"src/meta-srv",
"src/object-store",
"src/query",
"src/script",

View File

@@ -0,0 +1,2 @@
server_addr = '0.0.0.0:3002'
store_addr = '127.0.0.1:2380'

View File

@@ -6,6 +6,10 @@ fn main() {
"greptime/v1/select.proto",
"greptime/v1/physical_plan.proto",
"greptime/v1/greptime.proto",
"greptime/v1/meta/common.proto",
"greptime/v1/meta/heartbeat.proto",
"greptime/v1/meta/route.proto",
"greptime/v1/meta/store.proto",
],
&["."],
)

View File

@@ -0,0 +1,46 @@
syntax = "proto3";
package greptime.v1.meta;
message RequestHeader {
uint64 protocol_version = 1;
// cluster_id is the ID of the cluster which be sent to.
uint64 cluster_id = 2;
// member_id is the ID of the sender server.
uint64 member_id = 3;
}
message ResponseHeader {
uint64 protocol_version = 1;
// cluster_id is the ID of the cluster which sent the response.
uint64 cluster_id = 2;
Error error = 3;
}
message Error {
int32 code = 1;
string err_msg = 2;
}
message Peer {
uint64 id = 1;
Endpoint endpoint = 2;
}
message Endpoint {
string addr = 1;
}
message TimeInterval {
// The unix timestamp in millis of the start of this period.
uint64 start_timestamp_millis = 1;
// The unix timestamp in millis of the end of this period.
uint64 end_timestamp_millis = 2;
}
message KeyValue {
// key is the key in bytes. An empty key is not allowed.
bytes key = 1;
// value is the value held by the key, in bytes.
bytes value = 2;
}

View File

@@ -0,0 +1,82 @@
syntax = "proto3";
package greptime.v1.meta;
import "greptime/v1/meta/common.proto";
service Heartbeat {
// Heartbeat, there may be many contents of the heartbeat, such as:
// 1. Metadata to be registered to meta server and discoverable by other nodes.
// 2. Some performance metrics, such as Load, CPU usage, etc.
// 3. The number of computing tasks being executed.
rpc Heartbeat(stream HeartbeatRequest) returns (stream HeartbeatResponse) {}
// Ask leader's endpoint.
rpc AskLeader(AskLeaderRequest) returns (AskLeaderResponse) {}
}
message HeartbeatRequest {
RequestHeader header = 1;
// Leader node
bool is_leader = 2;
// Leader Peer
Endpoint leader_endpoint = 3;
// Actually reported time interval
TimeInterval report_interval = 4;
// Node stat
NodeStat node_stat = 5;
// Region stats in this node
repeated RegionStat region_stats = 6;
// Follower nodes and stats, empty on follower nodes
repeated ReplicaStat replica_stats = 7;
}
message NodeStat {
// The read capacity units during this period
uint64 rcus = 1;
// The write capacity units during this period
uint64 wcus = 2;
// Table number in this node
uint64 table_num = 3;
// Regon number in this node
uint64 region_num = 4;
double cpu_usage = 5;
double load = 6;
// Read disk I/O in the node
double read_io_rate = 7;
// Write disk I/O in the node
double write_io_rate = 8;
}
message RegionStat {
string table_name = 1;
uint64 region_id = 2;
// The read capacity units during this period
uint64 rcus = 3;
// The write capacity units during this period
uint64 wcus = 4;
}
message ReplicaStat {
Peer peer = 1;
bool in_sync = 2;
bool is_learner = 3;
}
message HeartbeatResponse {
ResponseHeader header = 1;
repeated bytes payload = 2;
}
message AskLeaderRequest {
RequestHeader header = 1;
}
message AskLeaderResponse {
ResponseHeader header = 1;
Endpoint leader = 2;
}

View File

@@ -0,0 +1,93 @@
syntax = "proto3";
package greptime.v1.meta;
import "greptime/v1/meta/common.proto";
service Router {
// Fetch routing information for tables. The smallest unit is the complete
// routing information(all regions) of a table.
//
// table_1
// table_name
// table_schema
// regions
// region_1
// mutate_endpoint
// select_endpoint_1, select_endpoint_2
// region_2
// mutate_endpoint
// select_endpoint_1, select_endpoint_2, select_endpoint_3
// region_xxx
// table_2
// ...
//
rpc Route(RouteRequest) returns (RouteResponse) {}
rpc Create(CreateRequest) returns (CreateResponse) {}
}
message RouteRequest {
RequestHeader header = 1;
repeated TableName table_names = 2;
}
message RouteResponse {
ResponseHeader header = 1;
repeated Peer peers = 2;
repeated TableRoute table_routes = 3;
}
message CreateRequest {
RequestHeader header = 1;
TableName table_name = 2;
repeated Region regions = 3;
}
message CreateResponse {
ResponseHeader header = 1;
repeated Region regions = 2;
}
message TableRoute {
Table table = 1;
repeated RegionRoute region_routes = 2;
}
message RegionRoute {
Region region = 1;
// single leader node for write task
uint64 leader_peer_index = 2;
// multiple follower nodes for read task
repeated uint64 follower_peer_indexes = 3;
}
message TableName {
string catalog_name = 1;
string schema_name = 2;
string table_name = 3;
}
message Table {
TableName table_name = 1;
bytes table_schema = 2;
}
message Region {
uint64 id = 1;
string name = 2;
Peer peer = 3;
// PARTITION `region_name` VALUES LESS THAN (value_list)
message Partition {
repeated bytes column_list = 1;
repeated bytes value_list = 2;
}
Partition partition = 4;
map<string, string> attrs = 5;
}

View File

@@ -0,0 +1,94 @@
syntax = "proto3";
package greptime.v1.meta;
import "greptime/v1/meta/common.proto";
service Store {
// Range gets the keys in the range from the key-value store.
rpc Range(RangeRequest) returns (RangeResponse);
// Put puts the given key into the key-value store.
rpc Put(PutRequest) returns (PutResponse);
// DeleteRange deletes the given range from the key-value store.
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse);
}
message RangeRequest {
RequestHeader header = 1;
// key is the first key for the range, If range_end is not given, the
// request only looks up key.
bytes key = 2;
// range_end is the upper bound on the requested range [key, range_end).
// If range_end is '\0', the range is all keys >= key.
// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
// then the range request gets all keys prefixed with key.
// If both key and range_end are '\0', then the range request returns all
// keys.
bytes range_end = 3;
// limit is a limit on the number of keys returned for the request. When
// limit is set to 0, it is treated as no limit.
int64 limit = 4;
// keys_only when set returns only the keys and not the values.
bool keys_only = 5;
}
message RangeResponse {
ResponseHeader header = 1;
// kvs is the list of key-value pairs matched by the range request.
repeated KeyValue kvs = 2;
bool more = 3;
}
message PutRequest {
RequestHeader header = 1;
// key is the key, in bytes, to put into the key-value store.
bytes key = 2;
// value is the value, in bytes, to associate with the key in the
// key-value store.
bytes value = 3;
// If prev_kv is set, gets the previous key-value pair before changing it.
// The previous key-value pair will be returned in the put response.
bool prev_kv = 4;
}
message PutResponse {
ResponseHeader header = 1;
// If prev_kv is set in the request, the previous key-value pair will be
// returned.
KeyValue prev_kv = 2;
}
message DeleteRangeRequest {
RequestHeader header = 1;
// key is the first key to delete in the range.
bytes key = 2;
// range_end is the key following the last key to delete for the range
// [key, range_end).
// If range_end is not given, the range is defined to contain only the key
// argument.
// If range_end is one bit larger than the given key, then the range is all
// the keys with the prefix (the given key).
// If range_end is '\0', the range is all keys greater than or equal to the
// key argument.
bytes range_end = 3;
// If prev_kv is set, gets the previous key-value pairs before deleting it.
// The previous key-value pairs will be returned in the delete response.
bool prev_kv = 4;
}
message DeleteRangeResponse {
ResponseHeader header = 1;
// deleted is the number of keys deleted by the delete range request.
int64 deleted = 2;
// If prev_kv is set in the request, the previous key-value pairs will be
// returned.
repeated KeyValue prev_kvs = 3;
}

View File

@@ -4,3 +4,5 @@ tonic::include_proto!("greptime.v1");
pub mod codec {
tonic::include_proto!("greptime.v1.codec");
}
pub mod meta;

139
src/api/src/v1/meta.rs Normal file
View File

@@ -0,0 +1,139 @@
tonic::include_proto!("greptime.v1.meta");
pub const PROTOCOL_VERSION: u64 = 1;
impl Peer {
pub fn new(id: u64, addr: impl AsRef<str>) -> Self {
Self {
id,
endpoint: Some(addr.as_ref().into()),
}
}
}
impl From<&str> for Endpoint {
fn from(s: &str) -> Self {
Self {
addr: s.to_string(),
}
}
}
impl RequestHeader {
pub fn new(cluster_id: u64, member_id: u64) -> RequestHeader {
RequestHeader {
protocol_version: PROTOCOL_VERSION,
cluster_id,
member_id,
}
}
}
impl HeartbeatRequest {
pub fn new(header: RequestHeader) -> Self {
Self {
header: Some(header),
..Default::default()
}
}
}
impl AskLeaderRequest {
pub fn new(header: RequestHeader) -> Self {
Self {
header: Some(header),
}
}
}
impl TableName {
pub fn new(
catalog: impl Into<String>,
schema: impl Into<String>,
table: impl Into<String>,
) -> Self {
Self {
catalog_name: catalog.into(),
schema_name: schema.into(),
table_name: table.into(),
}
}
}
impl RouteRequest {
pub fn new(header: RequestHeader) -> Self {
Self {
header: Some(header),
..Default::default()
}
}
pub fn add_table(mut self, table_name: TableName) -> Self {
self.table_names.push(table_name);
self
}
}
impl CreateRequest {
pub fn new(header: RequestHeader, table_name: TableName) -> Self {
Self {
header: Some(header),
table_name: Some(table_name),
..Default::default()
}
}
pub fn add_region(mut self, region: Region) -> Self {
self.regions.push(region);
self
}
}
impl Region {
pub fn new(id: u64, name: impl Into<String>, partition: region::Partition) -> Self {
Self {
id,
name: name.into(),
partition: Some(partition),
..Default::default()
}
}
pub fn attr(mut self, key: impl Into<String>, val: impl Into<String>) -> Self {
self.attrs.insert(key.into(), val.into());
self
}
}
impl region::Partition {
pub fn new() -> Self {
Default::default()
}
pub fn column_list(mut self, column_list: Vec<Vec<u8>>) -> Self {
self.column_list = column_list;
self
}
pub fn value_list(mut self, value_list: Vec<Vec<u8>>) -> Self {
self.value_list = value_list;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_peer() {
let peer = Peer::new(1, "test_addr");
assert_eq!(1, peer.id);
assert_eq!(
Endpoint {
addr: "test_addr".to_string()
},
peer.endpoint.unwrap()
);
}
}

View File

@@ -14,6 +14,7 @@ common-telemetry = { path = "../common/telemetry", features = ["deadlock_detecti
datanode = { path = "../datanode" }
frontend = { path = "../frontend" }
futures = "0.3"
meta-srv = { path = "../meta-srv" }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18", features = ["full"] }
toml = "0.5"

View File

@@ -4,7 +4,9 @@ use clap::Parser;
use cmd::datanode;
use cmd::error::Result;
use cmd::frontend;
use common_telemetry::{self, logging::error, logging::info};
use cmd::metasrv;
use common_telemetry::logging::error;
use common_telemetry::logging::info;
#[derive(Parser)]
#[clap(name = "greptimedb")]
@@ -29,6 +31,8 @@ enum SubCommand {
Datanode(datanode::Command),
#[clap(name = "frontend")]
Frontend(frontend::Command),
#[clap(name = "metasrv")]
Metasrv(metasrv::Command),
}
impl SubCommand {
@@ -36,6 +40,7 @@ impl SubCommand {
match self {
SubCommand::Datanode(cmd) => cmd.run().await,
SubCommand::Frontend(cmd) => cmd.run().await,
SubCommand::Metasrv(cmd) => cmd.run().await,
}
}
}
@@ -45,6 +50,7 @@ impl fmt::Display for SubCommand {
match self {
SubCommand::Datanode(..) => write!(f, "greptime-datanode"),
SubCommand::Frontend(..) => write!(f, "greptime-frontend"),
SubCommand::Metasrv(..) => write!(f, "greptime-metasrv"),
}
}
}

View File

@@ -17,14 +17,24 @@ pub enum Error {
source: frontend::error::Error,
},
#[snafu(display("Failed to start meta server, source: {}", source))]
StartMetaServer {
#[snafu(backtrace)]
source: meta_srv::error::Error,
},
#[snafu(display("Failed to read config file: {}, source: {}", path, source))]
ReadConfig {
source: std::io::Error,
path: String,
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to parse config, source: {}", source))]
ParseConfig { source: toml::de::Error },
ParseConfig {
source: toml::de::Error,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -34,6 +44,7 @@ impl ErrorExt for Error {
match self {
Error::StartDatanode { source } => source.status_code(),
Error::StartFrontend { source } => source.status_code(),
Error::StartMetaServer { source } => source.status_code(),
Error::ReadConfig { .. } | Error::ParseConfig { .. } => StatusCode::InvalidArguments,
}
}
@@ -51,18 +62,68 @@ impl ErrorExt for Error {
mod tests {
use super::*;
fn raise_read_config_error() -> std::result::Result<(), std::io::Error> {
Err(std::io::ErrorKind::NotFound.into())
type StdResult<E> = std::result::Result<(), E>;
#[test]
fn test_start_node_error() {
fn throw_datanode_error() -> StdResult<datanode::error::Error> {
datanode::error::MissingFieldSnafu {
field: "test_field",
}
.fail()
}
let e = throw_datanode_error()
.context(StartDatanodeSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_error() {
let e = raise_read_config_error()
fn test_start_frontend_error() {
fn throw_frontend_error() -> StdResult<frontend::error::Error> {
frontend::error::InvalidSqlSnafu { err_msg: "failed" }.fail()
}
let e = throw_frontend_error()
.context(StartFrontendSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_start_metasrv_error() {
fn throw_metasrv_error() -> StdResult<meta_srv::error::Error> {
meta_srv::error::StreamNoneSnafu {}.fail()
}
let e = throw_metasrv_error()
.context(StartMetaServerSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_read_config_error() {
fn throw_read_config_error() -> StdResult<std::io::Error> {
Err(std::io::ErrorKind::NotFound.into())
}
let e = throw_read_config_error()
.context(ReadConfigSnafu { path: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_none());
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
}

View File

@@ -1,4 +1,5 @@
pub mod datanode;
pub mod error;
pub mod frontend;
pub mod metasrv;
mod toml_loader;

112
src/cmd/src/metasrv.rs Normal file
View File

@@ -0,0 +1,112 @@
use clap::Parser;
use common_telemetry::logging;
use meta_srv::bootstrap;
use meta_srv::metasrv::MetaSrvOptions;
use snafu::ResultExt;
use crate::error;
use crate::error::Error;
use crate::error::Result;
use crate::toml_loader;
#[derive(Parser)]
pub struct Command {
#[clap(subcommand)]
subcmd: SubCommand,
}
impl Command {
pub async fn run(self) -> Result<()> {
self.subcmd.run().await
}
}
#[derive(Parser)]
enum SubCommand {
Start(StartCommand),
}
impl SubCommand {
async fn run(self) -> Result<()> {
match self {
SubCommand::Start(cmd) => cmd.run().await,
}
}
}
#[derive(Debug, Parser)]
struct StartCommand {
#[clap(long)]
server_addr: Option<String>,
#[clap(long)]
store_addr: Option<String>,
#[clap(short, long)]
config_file: Option<String>,
}
impl StartCommand {
async fn run(self) -> Result<()> {
logging::info!("MetaSrv start command: {:#?}", self);
let opts: MetaSrvOptions = self.try_into()?;
logging::info!("MetaSrv options: {:#?}", opts);
bootstrap::bootstrap_meta_srv(opts)
.await
.context(error::StartMetaServerSnafu)
}
}
impl TryFrom<StartCommand> for MetaSrvOptions {
type Error = Error;
fn try_from(cmd: StartCommand) -> Result<Self> {
let mut opts: MetaSrvOptions = if let Some(path) = cmd.config_file {
toml_loader::from_file!(&path)?
} else {
MetaSrvOptions::default()
};
if let Some(addr) = cmd.server_addr {
opts.server_addr = addr;
}
if let Some(addr) = cmd.store_addr {
opts.store_addr = addr;
}
Ok(opts)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_read_from_cmd() {
let cmd = StartCommand {
server_addr: Some("0.0.0.0:3002".to_string()),
store_addr: Some("127.0.0.1:2380".to_string()),
config_file: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
}
#[test]
fn test_read_from_config_file() {
let cmd = StartCommand {
server_addr: None,
store_addr: None,
config_file: Some(format!(
"{}/../../config/metasrv.example.toml",
std::env::current_dir().unwrap().as_path().to_str().unwrap()
)),
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
}
}

View File

@@ -8,8 +8,11 @@ api = { path = "../../api" }
async-trait = "0.1"
common-base = { path = "../base" }
common-error = { path = "../error" }
common-runtime = { path = "../runtime" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }
tonic = "0.8"
[dependencies.arrow]
package = "arrow2"

View File

@@ -0,0 +1,426 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use snafu::ResultExt;
use tonic::transport::Channel as InnerChannel;
use tonic::transport::Endpoint;
use crate::error;
use crate::error::Result;
const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
#[derive(Clone, Debug)]
pub struct ChannelManager {
config: Option<ChannelConfig>,
pool: Arc<Mutex<Pool>>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChannelConfig {
pub timeout: Option<Duration>,
pub connect_timeout: Option<Duration>,
pub concurrency_limit: Option<usize>,
pub rate_limit: Option<(u64, Duration)>,
pub initial_stream_window_size: Option<u32>,
pub initial_connection_window_size: Option<u32>,
pub http2_keep_alive_interval: Option<Duration>,
pub http2_keep_alive_timeout: Option<Duration>,
pub http2_keep_alive_while_idle: Option<bool>,
pub http2_adaptive_window: Option<bool>,
pub tcp_keepalive: Option<Duration>,
pub tcp_nodelay: bool,
}
impl Default for ChannelConfig {
fn default() -> Self {
Self {
timeout: None,
connect_timeout: None,
concurrency_limit: None,
rate_limit: None,
initial_stream_window_size: None,
initial_connection_window_size: None,
http2_keep_alive_interval: None,
http2_keep_alive_timeout: None,
http2_keep_alive_while_idle: None,
http2_adaptive_window: None,
tcp_keepalive: None,
tcp_nodelay: true,
}
}
}
impl ChannelConfig {
pub fn new() -> Self {
Default::default()
}
/// A timeout to each request.
pub fn timeout(self, timeout: Duration) -> Self {
Self {
timeout: Some(timeout),
..self
}
}
/// A timeout to connecting to the uri.
///
/// Defaults to no timeout.
pub fn connect_timeout(self, timeout: Duration) -> Self {
Self {
connect_timeout: Some(timeout),
..self
}
}
/// A concurrency limit to each request.
pub fn concurrency_limit(self, limit: usize) -> Self {
Self {
concurrency_limit: Some(limit),
..self
}
}
/// A rate limit to each request.
pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
Self {
rate_limit: Some((limit, duration)),
..self
}
}
/// Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control.
/// Default is 65,535
pub fn initial_stream_window_size(self, size: u32) -> Self {
Self {
initial_stream_window_size: Some(size),
..self
}
}
/// Sets the max connection-level flow control for HTTP2
///
/// Default is 65,535
pub fn initial_connection_window_size(self, size: u32) -> Self {
Self {
initial_connection_window_size: Some(size),
..self
}
}
/// Set http2 KEEP_ALIVE_INTERVAL. Uses hypers default otherwise.
pub fn http2_keep_alive_interval(self, duration: Duration) -> Self {
Self {
http2_keep_alive_interval: Some(duration),
..self
}
}
/// Set http2 KEEP_ALIVE_TIMEOUT. Uses hypers default otherwise.
pub fn http2_keep_alive_timeout(self, duration: Duration) -> Self {
Self {
http2_keep_alive_timeout: Some(duration),
..self
}
}
/// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hypers default otherwise.
pub fn http2_keep_alive_while_idle(self, enabled: bool) -> Self {
Self {
http2_keep_alive_while_idle: Some(enabled),
..self
}
}
/// Sets whether to use an adaptive flow control. Uses hypers default otherwise.
pub fn http2_adaptive_window(self, enabled: bool) -> Self {
Self {
http2_adaptive_window: Some(enabled),
..self
}
}
/// Set whether TCP keepalive messages are enabled on accepted connections.
///
/// If None is specified, keepalive is disabled, otherwise the duration specified
/// will be the time to remain idle before sending TCP keepalive probes.
///
/// Default is no keepalive (None)
pub fn tcp_keepalive(self, duration: Duration) -> Self {
Self {
tcp_keepalive: Some(duration),
..self
}
}
/// Set the value of TCP_NODELAY option for accepted connections.
///
/// Enabled by default.
pub fn tcp_nodelay(self, enabled: bool) -> Self {
Self {
tcp_nodelay: enabled,
..self
}
}
}
#[derive(Debug)]
struct Pool {
channels: HashMap<String, Channel>,
}
#[derive(Debug)]
struct Channel {
channel: InnerChannel,
access: usize,
}
impl ChannelManager {
pub fn new() -> Self {
Default::default()
}
pub fn with_config(config: ChannelConfig) -> Self {
let mut manager = ChannelManager::new();
manager.config = Some(config);
manager
}
pub fn config(&self) -> Option<ChannelConfig> {
self.config.clone()
}
pub fn get(&self, addr: impl AsRef<str>) -> Result<InnerChannel> {
let addr = addr.as_ref();
let mut pool = self.pool.lock().unwrap();
if let Some(ch) = pool.get_mut(addr) {
ch.access += 1;
return Ok(ch.channel.clone());
}
let mut endpoint =
Endpoint::new(format!("http://{}", addr)).context(error::CreateChannelSnafu)?;
if let Some(cfg) = &self.config {
if let Some(dur) = cfg.timeout {
endpoint = endpoint.timeout(dur);
}
if let Some(dur) = cfg.connect_timeout {
endpoint = endpoint.connect_timeout(dur);
}
if let Some(limit) = cfg.concurrency_limit {
endpoint = endpoint.concurrency_limit(limit);
}
if let Some((limit, dur)) = cfg.rate_limit {
endpoint = endpoint.rate_limit(limit, dur);
}
if let Some(size) = cfg.initial_stream_window_size {
endpoint = endpoint.initial_stream_window_size(size);
}
if let Some(size) = cfg.initial_connection_window_size {
endpoint = endpoint.initial_connection_window_size(size);
}
if let Some(dur) = cfg.http2_keep_alive_interval {
endpoint = endpoint.http2_keep_alive_interval(dur);
}
if let Some(dur) = cfg.http2_keep_alive_timeout {
endpoint = endpoint.keep_alive_timeout(dur);
}
if let Some(enabled) = cfg.http2_keep_alive_while_idle {
endpoint = endpoint.keep_alive_while_idle(enabled);
}
if let Some(enabled) = cfg.http2_adaptive_window {
endpoint = endpoint.http2_adaptive_window(enabled);
}
endpoint = endpoint
.tcp_keepalive(cfg.tcp_keepalive)
.tcp_nodelay(cfg.tcp_nodelay);
}
let inner_channel = endpoint.connect_lazy();
let channel = Channel {
channel: inner_channel.clone(),
access: 1,
};
pool.put(addr, channel);
Ok(inner_channel)
}
}
impl Pool {
#[inline]
fn get_mut(&mut self, addr: &str) -> Option<&mut Channel> {
self.channels.get_mut(addr)
}
#[inline]
fn put(&mut self, addr: &str, channel: Channel) {
self.channels.insert(addr.to_string(), channel);
}
#[inline]
fn retain_channel<F>(&mut self, f: F)
where
F: FnMut(&String, &mut Channel) -> bool,
{
self.channels.retain(f);
}
}
impl Default for ChannelManager {
fn default() -> Self {
let pool = Pool {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let cloned_pool = pool.clone();
common_runtime::spawn_bg(async move {
recycle_channel_in_loop(cloned_pool, RECYCLE_CHANNEL_INTERVAL_SECS).await;
});
Self { pool, config: None }
}
}
async fn recycle_channel_in_loop(pool: Arc<Mutex<Pool>>, interval_secs: u64) {
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
loop {
interval.tick().await;
let mut pool = pool.lock().unwrap();
pool.retain_channel(|_, c| {
if c.access == 0 {
false
} else {
c.access = 0;
true
}
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[should_panic]
#[test]
fn test_invalid_addr() {
let pool = Pool {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let mgr = ChannelManager { pool, config: None };
let addr = "http://test";
let _ = mgr.get(addr).unwrap();
}
#[tokio::test]
async fn test_access_count() {
let pool = Pool {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let config = ChannelConfig::new()
.timeout(Duration::from_secs(1))
.connect_timeout(Duration::from_secs(1))
.concurrency_limit(1)
.rate_limit(1, Duration::from_secs(1))
.initial_stream_window_size(1)
.initial_connection_window_size(1)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
.http2_keep_alive_while_idle(true)
.http2_adaptive_window(true)
.tcp_keepalive(Duration::from_secs(1))
.tcp_nodelay(true);
let mgr = ChannelManager {
pool,
config: Some(config),
};
let addr = "test_uri";
for i in 0..10 {
{
let _ = mgr.get(addr).unwrap();
let mut pool = mgr.pool.lock().unwrap();
assert_eq!(i + 1, pool.get_mut(addr).unwrap().access);
}
}
let mut pool = mgr.pool.lock().unwrap();
assert_eq!(10, pool.get_mut(addr).unwrap().access);
pool.retain_channel(|_, c| {
if c.access == 0 {
false
} else {
c.access = 0;
true
}
});
assert_eq!(0, pool.get_mut(addr).unwrap().access);
}
#[test]
fn test_config() {
let default_cfg = ChannelConfig::new();
assert_eq!(
ChannelConfig {
timeout: None,
connect_timeout: None,
concurrency_limit: None,
rate_limit: None,
initial_stream_window_size: None,
initial_connection_window_size: None,
http2_keep_alive_interval: None,
http2_keep_alive_timeout: None,
http2_keep_alive_while_idle: None,
http2_adaptive_window: None,
tcp_keepalive: None,
tcp_nodelay: true,
},
default_cfg
);
let cfg = default_cfg
.timeout(Duration::from_secs(3))
.connect_timeout(Duration::from_secs(5))
.concurrency_limit(6)
.rate_limit(5, Duration::from_secs(1))
.initial_stream_window_size(10)
.initial_connection_window_size(20)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(3))
.http2_keep_alive_while_idle(true)
.http2_adaptive_window(true)
.tcp_keepalive(Duration::from_secs(2))
.tcp_nodelay(false);
assert_eq!(
ChannelConfig {
timeout: Some(Duration::from_secs(3)),
connect_timeout: Some(Duration::from_secs(5)),
concurrency_limit: Some(6),
rate_limit: Some((5, Duration::from_secs(1))),
initial_stream_window_size: Some(10),
initial_connection_window_size: Some(20),
http2_keep_alive_interval: Some(Duration::from_secs(1)),
http2_keep_alive_timeout: Some(Duration::from_secs(3)),
http2_keep_alive_while_idle: Some(true),
http2_adaptive_window: Some(true),
tcp_keepalive: Some(Duration::from_secs(2)),
tcp_nodelay: false,
},
cfg
);
}
}

View File

@@ -49,6 +49,12 @@ pub enum Error {
actual: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to create gRPC channel, source: {}", source))]
CreateChannel {
source: tonic::transport::Error,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
@@ -61,9 +67,9 @@ impl ErrorExt for Error {
Error::UnsupportedDfPlan { .. } | Error::UnsupportedDfExpr { .. } => {
StatusCode::Unsupported
}
Error::NewProjection { .. } | Error::DecodePhysicalPlanNode { .. } => {
StatusCode::Internal
}
Error::NewProjection { .. }
| Error::DecodePhysicalPlanNode { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
}
}
@@ -75,3 +81,129 @@ impl ErrorExt for Error {
self
}
}
#[cfg(test)]
mod tests {
use snafu::OptionExt;
use snafu::ResultExt;
use super::*;
type StdResult<E> = std::result::Result<(), E>;
fn throw_none_option() -> Option<String> {
None
}
#[test]
fn test_empty_physical_plan_error() {
let e = throw_none_option()
.context(EmptyPhysicalPlanSnafu { name: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_empty_physical_expr_error() {
let e = throw_none_option()
.context(EmptyPhysicalExprSnafu { name: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_unsupported_df_plan_error() {
let e = throw_none_option()
.context(UnsupportedDfPlanSnafu { name: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Unsupported);
}
#[test]
fn test_unsupported_df_expr_error() {
let e = throw_none_option()
.context(UnsupportedDfExprSnafu { name: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Unsupported);
}
#[test]
fn test_missing_field_error() {
let e = throw_none_option()
.context(MissingFieldSnafu { field: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_new_projection_error() {
fn throw_df_error() -> StdResult<DataFusionError> {
Err(DataFusionError::NotImplemented("".to_string()))
}
let e = throw_df_error().context(NewProjectionSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_decode_physical_plan_node_error() {
fn throw_decode_error() -> StdResult<DecodeError> {
Err(DecodeError::new("test"))
}
let e = throw_decode_error()
.context(DecodePhysicalPlanNodeSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_type_mismatch_error() {
let e = throw_none_option()
.context(TypeMismatchSnafu {
column_name: "",
expected: "",
actual: "",
})
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_create_channel_error() {
fn throw_tonic_error() -> StdResult<tonic::transport::Error> {
tonic::transport::Endpoint::new("http//http").map(|_| ())
}
let e = throw_tonic_error()
.context(CreateChannelSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
}

View File

@@ -1,3 +1,4 @@
pub mod channel_manager;
pub mod error;
pub mod physical;
pub mod writer;

View File

@@ -42,7 +42,7 @@ store-api = { path = "../store-api" }
table = { path = "../table" }
table-engine = { path = "../table-engine", features = ["test"] }
tokio = { version = "1.18", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["net"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }

View File

@@ -0,0 +1,21 @@
[package]
name = "meta-client"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { path = "../api" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-telemetry = { path = "../common/telemetry" }
etcd-client = "0.10"
rand = "0.8"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18", features = ["full"] }
tonic = "0.8"
[dev-dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@@ -0,0 +1,89 @@
use std::time::Duration;
use api::v1::meta::region::Partition;
use api::v1::meta::CreateRequest;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::PutRequest;
use api::v1::meta::RangeRequest;
use api::v1::meta::Region;
use api::v1::meta::RequestHeader;
use api::v1::meta::TableName;
use common_grpc::channel_manager::ChannelConfig;
use common_grpc::channel_manager::ChannelManager;
use meta_client::client::MetaClientBuilder;
use tracing::event;
use tracing::subscriber;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;
fn main() {
subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap();
run();
}
#[tokio::main]
async fn run() {
let config = ChannelConfig::new()
.timeout(Duration::from_secs(3))
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new()
.heartbeat_client(true)
.router_client(true)
.store_client(true)
.channel_manager(channel_manager)
.build();
meta_client.start(&["127.0.0.1:3002"]).await.unwrap();
// required only when the heartbeat_client is enabled
meta_client.ask_leader().await.unwrap();
let header = RequestHeader::new(0, 0);
let p1 = Partition::new()
.column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()])
.value_list(vec![b"k1".to_vec(), b"k2".to_vec()]);
let p2 = Partition::new()
.column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()])
.value_list(vec![b"Max1".to_vec(), b"Max2".to_vec()]);
let table_name = TableName::new("test_catlog", "test_schema", "test_table");
let create_req = CreateRequest::new(header, table_name)
.add_region(Region::new(0, "test_region1", p1))
.add_region(Region::new(1, "test_region2", p2));
let res = meta_client.create_route(create_req).await.unwrap();
event!(Level::INFO, "create_route result: {:#?}", res);
// put
let put_req = PutRequest {
key: b"key1".to_vec(),
value: b"value1".to_vec(),
prev_kv: true,
..Default::default()
};
let res = meta_client.put(put_req).await.unwrap();
event!(Level::INFO, "put result: {:#?}", res);
// get
let range_req = RangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = meta_client.range(range_req.clone()).await.unwrap();
event!(Level::INFO, "get range result: {:#?}", res);
// delete
let delete_range_req = DeleteRangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = meta_client.delete_range(delete_range_req).await.unwrap();
event!(Level::INFO, "delete range result: {:#?}", res);
// get none
let res = meta_client.range(range_req).await;
event!(Level::INFO, "get range result: {:#?}", res);
}

View File

@@ -0,0 +1,332 @@
mod heartbeat;
mod load_balance;
mod router;
mod store;
use api::v1::meta::CreateRequest;
use api::v1::meta::CreateResponse;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use common_grpc::channel_manager::ChannelConfig;
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::info;
use heartbeat::Client as HeartbeatClient;
use router::Client as RouterClient;
use snafu::OptionExt;
use store::Client as StoreClient;
use crate::error;
use crate::error::Result;
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
heartbeat_client: bool,
router_client: bool,
store_client: bool,
channel_manager: Option<ChannelManager>,
}
impl MetaClientBuilder {
pub fn new() -> Self {
MetaClientBuilder::default()
}
pub fn heartbeat_client(self, enabled: bool) -> Self {
Self {
heartbeat_client: enabled,
..self
}
}
pub fn router_client(self, enabled: bool) -> Self {
Self {
router_client: enabled,
..self
}
}
pub fn store_client(self, enabled: bool) -> Self {
Self {
store_client: enabled,
..self
}
}
pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
Self {
channel_manager: Some(channel_manager),
..self
}
}
pub fn build(self) -> MetaClient {
let mut meta_client = if let Some(mgr) = self.channel_manager {
MetaClient {
channel_manager: mgr,
..Default::default()
}
} else {
Default::default()
};
if let (false, false, false) =
(self.heartbeat_client, self.router_client, self.store_client)
{
panic!("At least one client needs to be enabled.")
}
let mgr = meta_client.channel_manager.clone();
if self.heartbeat_client {
meta_client.heartbeat_client = Some(HeartbeatClient::new(mgr.clone()));
}
if self.router_client {
meta_client.router_client = Some(RouterClient::new(mgr.clone()));
}
if self.store_client {
meta_client.store_client = Some(StoreClient::new(mgr));
}
meta_client
}
}
#[derive(Clone, Debug, Default)]
pub struct MetaClient {
channel_manager: ChannelManager,
heartbeat_client: Option<HeartbeatClient>,
router_client: Option<RouterClient>,
store_client: Option<StoreClient>,
}
impl MetaClient {
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]> + Clone,
{
info!("MetaClient channel config: {:?}", self.channel_config());
if let Some(heartbeat_client) = &mut self.heartbeat_client {
heartbeat_client.start(urls.clone()).await?;
info!("Heartbeat client started");
}
if let Some(router_client) = &mut self.router_client {
router_client.start(urls.clone()).await?;
info!("Router client started");
}
if let Some(store_client) = &mut self.store_client {
store_client.start(urls).await?;
info!("Store client started");
}
Ok(())
}
pub async fn ask_leader(&self) -> Result<()> {
self.heartbeat_client()
.context(error::NotStartedSnafu {
name: "heartbeat_client",
})?
.ask_leader()
.await
}
pub async fn refresh_members(&mut self) {
todo!()
}
pub async fn create_route(&self, req: CreateRequest) -> Result<CreateResponse> {
self.router_client()
.context(error::NotStartedSnafu {
name: "route_client",
})?
.create(req)
.await
}
/// Fetch routing information for tables. The smallest unit is the complete
/// routing information(all regions) of a table.
///
/// ```text
/// table_1
/// table_name
/// table_schema
/// regions
/// region_1
/// mutate_endpoint
/// select_endpoint_1, select_endpoint_2
/// region_2
/// mutate_endpoint
/// select_endpoint_1, select_endpoint_2, select_endpoint_3
/// region_xxx
/// table_2
/// ...
/// ```
///
pub async fn route(&self, req: RouteRequest) -> Result<RouteResponse> {
self.router_client()
.context(error::NotStartedSnafu {
name: "route_client",
})?
.route(req)
.await
}
/// Range gets the keys in the range from the key-value store.
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
.range(req)
.await
}
/// Put puts the given key into the key-value store.
pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
.put(req)
.await
}
/// DeleteRange deletes the given range from the key-value store.
pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
.delete_range(req)
.await
}
#[inline]
pub fn heartbeat_client(&self) -> Option<HeartbeatClient> {
self.heartbeat_client.clone()
}
#[inline]
pub fn router_client(&self) -> Option<RouterClient> {
self.router_client.clone()
}
#[inline]
pub fn store_client(&self) -> Option<StoreClient> {
self.store_client.clone()
}
pub fn channel_config(&self) -> Option<ChannelConfig> {
self.channel_manager.config()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_meta_client_builder() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new().heartbeat_client(true).build();
assert!(meta_client.heartbeat_client().is_some());
assert!(meta_client.router_client().is_none());
assert!(meta_client.store_client().is_none());
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new().router_client(true).build();
assert!(meta_client.heartbeat_client().is_none());
assert!(meta_client.router_client().is_some());
assert!(meta_client.store_client().is_none());
meta_client.start(urls).await.unwrap();
assert!(meta_client.router_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new().store_client(true).build();
assert!(meta_client.heartbeat_client().is_none());
assert!(meta_client.router_client().is_none());
assert!(meta_client.store_client().is_some());
meta_client.start(urls).await.unwrap();
assert!(meta_client.store_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new()
.heartbeat_client(true)
.router_client(true)
.store_client(true)
.build();
assert!(meta_client.heartbeat_client().is_some());
assert!(meta_client.router_client().is_some());
assert!(meta_client.store_client().is_some());
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
assert!(meta_client.router_client().unwrap().is_started().await);
assert!(meta_client.store_client().unwrap().is_started().await);
}
#[tokio::test]
async fn test_not_start_heartbeat_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new()
.router_client(true)
.store_client(true)
.build();
meta_client.start(urls).await.unwrap();
let res = meta_client.ask_leader().await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
#[tokio::test]
async fn test_not_start_router_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new()
.heartbeat_client(true)
.store_client(true)
.build();
meta_client.start(urls).await.unwrap();
let res = meta_client.create_route(CreateRequest::default()).await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
#[tokio::test]
async fn test_not_start_store_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new()
.heartbeat_client(true)
.router_client(true)
.build();
meta_client.start(urls).await.unwrap();
let res = meta_client.put(PutRequest::default()).await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
#[should_panic]
#[test]
fn test_enable_at_least_one_client() {
let _ = MetaClientBuilder::new()
.heartbeat_client(false)
.router_client(false)
.store_client(false)
.build();
}
}

View File

@@ -0,0 +1,193 @@
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::AskLeaderRequest;
use api::v1::meta::RequestHeader;
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::debug;
use snafu::ensure;
use snafu::OptionExt;
use snafu::ResultExt;
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::error;
use crate::error::Result;
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
impl Client {
pub fn new(channel_manager: ChannelManager) -> Self {
let inner = Inner {
channel_manager,
peers: HashSet::default(),
leader: None,
};
Self {
inner: Arc::new(RwLock::new(inner)),
}
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
}
pub async fn ask_leader(&mut self) -> Result<()> {
let mut inner = self.inner.write().await;
inner.ask_leader().await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
// TODO(jiachun) send heartbeat
}
#[derive(Debug)]
struct Inner {
channel_manager: ChannelManager,
peers: HashSet<String>,
leader: Option<String>,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Heartbeat client already started"
}
);
self.peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect();
Ok(())
}
async fn ask_leader(&mut self) -> Result<()> {
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Heartbeat client not start"
}
);
// TODO(jiachun): set cluster_id and member_id
let header = RequestHeader::new(0, 0);
let mut leader = None;
for addr in &self.peers {
let req = AskLeaderRequest::new(header.clone());
let mut client = self.make_client(addr)?;
match client.ask_leader(req).await {
Ok(res) => {
if let Some(endpoint) = res.into_inner().leader {
leader = Some(endpoint.addr);
break;
}
}
Err(status) => {
debug!("Failed to ask leader from: {}, {}", addr, status);
}
}
}
self.leader = Some(leader.context(error::AskLeaderSnafu)?);
Ok(())
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(HeartbeatClient::new(channel))
}
#[inline]
fn is_started(&self) -> bool {
!self.peers.is_empty()
}
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new(ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new(ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(
res.err(),
Some(error::Error::IllegalGrpcClientState { .. })
));
}
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new(ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
#[tokio::test]
async fn test_ask_leader_unavailable() {
let mut client = Client::new(ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let res = client.ask_leader().await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(matches!(err, error::Error::AskLeader { .. }));
}
}

View File

@@ -0,0 +1,34 @@
use rand::Rng;
pub fn random_get<T, F>(len: usize, func: F) -> Option<T>
where
F: FnOnce(usize) -> Option<T>,
{
if len == 0 {
return None;
}
let mut rng = rand::thread_rng();
let i = rng.gen_range(0..len);
func(i)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_random_get() {
for i in 1..100 {
let res = random_get(i, |index| Some(2 * index));
assert!(res.unwrap() < 2 * i);
}
}
#[test]
fn test_random_get_none() {
let res = random_get(0, |index| Some(2 * index));
assert!(res.is_none());
}
}

View File

@@ -0,0 +1,218 @@
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::CreateRequest;
use api::v1::meta::CreateResponse;
use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use common_grpc::channel_manager::ChannelManager;
use snafu::ensure;
use snafu::OptionExt;
use snafu::ResultExt;
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::client::load_balance as lb;
use crate::error;
use crate::error::Result;
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
impl Client {
pub fn new(channel_manager: ChannelManager) -> Self {
let inner = Inner {
channel_manager,
peers: vec![],
};
Self {
inner: Arc::new(RwLock::new(inner)),
}
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
pub async fn create(&self, req: CreateRequest) -> Result<CreateResponse> {
let inner = self.inner.read().await;
inner.create(req).await
}
pub async fn route(&self, req: RouteRequest) -> Result<RouteResponse> {
let inner = self.inner.read().await;
inner.route(req).await
}
}
#[derive(Debug)]
struct Inner {
channel_manager: ChannelManager,
peers: Vec<String>,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Router client already started",
}
);
self.peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<HashSet<_>>()
.drain()
.collect::<Vec<_>>();
Ok(())
}
async fn route(&self, req: RouteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
let res = client.route(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn create(&self, req: CreateRequest) -> Result<CreateResponse> {
let mut client = self.random_client()?;
let res = client.create(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
fn random_client(&self) -> Result<RouterClient<Channel>> {
let len = self.peers.len();
let peer = lb::random_get(len, |i| Some(&self.peers[i])).context(
error::IllegalGrpcClientStateSnafu {
err_msg: "Empty peers, router client may not start yet",
},
)?;
self.make_client(peer)
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<RouterClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(RouterClient::new(channel))
}
#[inline]
fn is_started(&self) -> bool {
!self.peers.is_empty()
}
}
#[cfg(test)]
mod test {
use api::v1::meta::{RequestHeader, TableName};
use super::*;
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new(ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new(ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(
res.err(),
Some(error::Error::IllegalGrpcClientState { .. })
));
}
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new(ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
#[tokio::test]
async fn test_create_unavailable() {
let mut client = Client::new(ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let header = RequestHeader::new(0, 0);
let req = CreateRequest::new(header, TableName::default());
let res = client.create(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
#[tokio::test]
async fn test_route_unavailable() {
let mut client = Client::new(ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let header = RequestHeader::new(0, 0);
let req = RouteRequest::new(header);
let res = client.route(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
}

View File

@@ -0,0 +1,259 @@
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::store_client::StoreClient;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use common_grpc::channel_manager::ChannelManager;
use snafu::ensure;
use snafu::OptionExt;
use snafu::ResultExt;
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::client::load_balance as lb;
use crate::error;
use crate::error::Result;
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
impl Client {
pub fn new(channel_manager: ChannelManager) -> Self {
let inner = Inner {
channel_manager,
peers: vec![],
};
Self {
inner: Arc::new(RwLock::new(inner)),
}
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let inner = self.inner.read().await;
inner.range(req).await
}
pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
let inner = self.inner.read().await;
inner.put(req).await
}
pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let inner = self.inner.read().await;
inner.delete_range(req).await
}
}
#[derive(Debug)]
struct Inner {
channel_manager: ChannelManager,
peers: Vec<String>,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Store client already started",
}
);
self.peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<HashSet<_>>()
.drain()
.collect::<Vec<_>>();
Ok(())
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let mut client = self.random_client()?;
let res = client.range(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
let mut client = self.random_client()?;
let res = client.put(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let mut client = self.random_client()?;
let res = client
.delete_range(req)
.await
.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
fn random_client(&self) -> Result<StoreClient<Channel>> {
let len = self.peers.len();
let peer = lb::random_get(len, |i| Some(&self.peers[i])).context(
error::IllegalGrpcClientStateSnafu {
err_msg: "Empty peers, store client may not start yet",
},
)?;
self.make_client(peer)
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<StoreClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(StoreClient::new(channel))
}
#[inline]
fn is_started(&self) -> bool {
!self.peers.is_empty()
}
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new(ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new(ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(
res.err(),
Some(error::Error::IllegalGrpcClientState { .. })
));
}
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new(ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
#[tokio::test]
async fn test_range_unavailable() {
let mut client = Client::new(ChannelManager::default());
client.start(&["unknow_peer"]).await.unwrap();
let req = RangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = client.range(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
#[tokio::test]
async fn test_put_unavailable() {
let mut client = Client::new(ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let req = PutRequest {
key: b"key1".to_vec(),
value: b"value1".to_vec(),
prev_kv: true,
..Default::default()
};
let res = client.put(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
#[tokio::test]
async fn test_delete_range_unavailable() {
let mut client = Client::new(ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let req = DeleteRangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = client.delete_range(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
}

View File

@@ -0,0 +1,137 @@
use common_error::prelude::*;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to connect to {}, source: {}", url, source))]
ConnectFailed {
url: String,
source: tonic::transport::Error,
backtrace: Backtrace,
},
#[snafu(display("Illegal GRPC client state: {}", err_msg))]
IllegalGrpcClientState {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Tonic internal error, source: {}", source))]
TonicStatus {
source: tonic::Status,
backtrace: Backtrace,
},
#[snafu(display("Failed to ask leader from all endpoints"))]
AskLeader { backtrace: Backtrace },
#[snafu(display("Failed to create gRPC channel, source: {}", source))]
CreateChannel {
#[snafu(backtrace)]
source: common_grpc::error::Error,
},
#[snafu(display("{} not started", name))]
NotStarted { name: String, backtrace: Backtrace },
}
#[allow(dead_code)]
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn status_code(&self) -> StatusCode {
match self {
Error::ConnectFailed { .. }
| Error::IllegalGrpcClientState { .. }
| Error::TonicStatus { .. }
| Error::AskLeader { .. }
| Error::NotStarted { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
type StdResult<E> = std::result::Result<(), E>;
fn throw_none_option() -> Option<String> {
None
}
#[test]
fn test_connect_failed_error() {
fn throw_tonic_error() -> StdResult<tonic::transport::Error> {
tonic::transport::Endpoint::new("http//http").map(|_| ())
}
let e = throw_tonic_error()
.context(ConnectFailedSnafu { url: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_illegal_grpc_client_state_error() {
let e = throw_none_option()
.context(IllegalGrpcClientStateSnafu { err_msg: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_tonic_status_error() {
fn throw_tonic_status_error() -> StdResult<tonic::Status> {
Err(tonic::Status::new(tonic::Code::Aborted, ""))
}
let e = throw_tonic_status_error()
.context(TonicStatusSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_ask_leader_error() {
let e = throw_none_option().context(AskLeaderSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_create_channel_error() {
fn throw_common_grpc_error() -> StdResult<common_grpc::Error> {
tonic::transport::Endpoint::new("http//http")
.map(|_| ())
.context(common_grpc::error::CreateChannelSnafu)
}
let e = throw_common_grpc_error()
.context(CreateChannelSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
}

View File

@@ -0,0 +1,2 @@
pub mod client;
mod error;

26
src/meta-srv/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
name = "meta-srv"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-telemetry = { path = "../common/telemetry" }
etcd-client = "0.10"
futures = "0.3"
http-body = "0.4"
snafu = { version = "0.7", features = ["backtraces"] }
serde = "1.0"
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
url = "2.3"
[dev-dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@@ -0,0 +1,48 @@
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::PutRequest;
use api::v1::meta::RangeRequest;
use meta_srv::service::store::etcd::EtcdStore;
use tracing::event;
use tracing::subscriber;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;
fn main() {
subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap();
run();
}
#[tokio::main]
async fn run() {
let kv_store = EtcdStore::with_endpoints(["127.0.0.1:2380"]).await.unwrap();
// put
let put_req = PutRequest {
key: b"key1".to_vec(),
value: b"value1".to_vec(),
prev_kv: true,
..Default::default()
};
let res = kv_store.put(put_req).await;
event!(Level::INFO, "put result: {:#?}", res);
// get
let range_req = RangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = kv_store.range(range_req.clone()).await;
event!(Level::INFO, "get range result: {:#?}", res);
// delete
let delete_range_req = DeleteRangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = kv_store.delete_range(delete_range_req).await;
event!(Level::INFO, "delete range result: {:#?}", res);
// get none
let res = kv_store.range(range_req).await;
event!(Level::INFO, "get range result: {:#?}", res);
}

View File

@@ -0,0 +1,37 @@
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use crate::error;
use crate::metasrv::MetaSrv;
use crate::metasrv::MetaSrvOptions;
use crate::service::admin;
use crate::service::store::etcd::EtcdStore;
// Bootstrap the rpc server to serve incoming request
pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> {
let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?;
let meta_srv = MetaSrv::new(kv_store);
let listener = TcpListener::bind(&opts.server_addr)
.await
.context(error::TcpBindSnafu {
addr: &opts.server_addr,
})?;
let listener = TcpListenerStream::new(listener);
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::new(meta_srv.clone()))
.add_service(RouterServer::new(meta_srv.clone()))
.add_service(StoreServer::new(meta_srv.clone()))
.add_service(admin::make_admin_service(meta_srv.clone()))
.serve_with_incoming(listener)
.await
.context(error::StartGrpcSnafu)?;
Ok(())
}

145
src/meta-srv/src/error.rs Normal file
View File

@@ -0,0 +1,145 @@
use common_error::prelude::*;
use tonic::{Code, Status};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Error stream request next is None"))]
StreamNone { backtrace: Backtrace },
#[snafu(display("Empty key is not allowed"))]
EmptyKey { backtrace: Backtrace },
#[snafu(display("Failed to execute via Etcd, source: {}", source))]
EtcdFailed {
source: etcd_client::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to connect to Etcd, source: {}", source))]
ConnectEtcd {
source: etcd_client::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to bind address {}, source: {}", addr, source))]
TcpBind {
addr: String,
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to start gRPC server, source: {}", source))]
StartGrpc {
source: tonic::transport::Error,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for Status {
fn from(err: Error) -> Self {
Status::new(Code::Internal, err.to_string())
}
}
impl ErrorExt for Error {
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn status_code(&self) -> StatusCode {
match self {
Error::StreamNone { .. }
| Error::EtcdFailed { .. }
| Error::ConnectEtcd { .. }
| Error::TcpBind { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. } => StatusCode::InvalidArguments,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
type StdResult<E> = std::result::Result<(), E>;
fn throw_none_option() -> Option<String> {
None
}
fn throw_etcd_client_error() -> StdResult<etcd_client::Error> {
Err(etcd_client::Error::InvalidArgs("".to_string()))
}
#[test]
fn test_stream_node_error() {
let e = throw_none_option().context(StreamNoneSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_empty_key_error() {
let e = throw_none_option().context(EmptyKeySnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_etcd_failed_error() {
let e = throw_etcd_client_error()
.context(EtcdFailedSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_connect_etcd_error() {
let e = throw_etcd_client_error()
.context(ConnectEtcdSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_tcp_bind_error() {
fn throw_std_error() -> StdResult<std::io::Error> {
Err(std::io::ErrorKind::NotFound.into())
}
let e = throw_std_error()
.context(TcpBindSnafu { addr: "127.0.0.1" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_start_grpc_error() {
fn throw_tonic_error() -> StdResult<tonic::transport::Error> {
tonic::transport::Endpoint::new("http//http").map(|_| ())
}
let e = throw_tonic_error().context(StartGrpcSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
}

6
src/meta-srv/src/lib.rs Normal file
View File

@@ -0,0 +1,6 @@
pub mod bootstrap;
pub mod error;
pub mod metasrv;
pub mod service;
pub use crate::error::Result;

View File

@@ -0,0 +1,34 @@
use serde::Deserialize;
use serde::Serialize;
use crate::service::store::kv::KvStoreRef;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MetaSrvOptions {
pub server_addr: String,
pub store_addr: String,
}
impl Default for MetaSrvOptions {
fn default() -> Self {
Self {
server_addr: "0.0.0.0:3002".to_string(),
store_addr: "0.0.0.0:2380".to_string(),
}
}
}
#[derive(Clone)]
pub struct MetaSrv {
kv_store: KvStoreRef,
}
impl MetaSrv {
pub fn new(kv_store: KvStoreRef) -> Self {
Self { kv_store }
}
pub fn kv_store(&self) -> KvStoreRef {
self.kv_store.clone()
}
}

View File

@@ -0,0 +1,13 @@
use std::pin::Pin;
use futures::Stream;
use tonic::Response;
use tonic::Status;
pub mod admin;
mod heartbeat;
pub mod router;
pub mod store;
pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;
pub type GrpcStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + Sync + 'static>>;

View File

@@ -0,0 +1,248 @@
mod health;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use tonic::body::BoxBody;
use tonic::codegen::empty_body;
use tonic::codegen::http;
use tonic::codegen::BoxFuture;
use tonic::codegen::Service;
use tonic::transport::NamedService;
use crate::metasrv::MetaSrv;
pub fn make_admin_service(_: MetaSrv) -> Admin {
let router = Router::new().route("/health", health::HealthHandler);
let router = Router::nest("/admin", router);
Admin::new(router)
}
#[async_trait::async_trait]
pub trait HttpHandler: Send + Sync {
async fn handle(
&self,
path: &str,
params: &HashMap<String, String>,
) -> crate::Result<http::Response<String>>;
}
#[derive(Clone)]
pub struct Admin
where
Self: Send,
{
router: Arc<Router>,
}
impl Admin {
pub fn new(router: Router) -> Self {
Self {
router: Arc::new(router),
}
}
}
impl NamedService for Admin {
const NAME: &'static str = "admin";
}
impl<T> Service<http::Request<T>> for Admin
where
T: Send,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<T>) -> Self::Future {
let router = self.router.clone();
let query_params = req
.uri()
.query()
.map(|q| {
url::form_urlencoded::parse(q.as_bytes())
.into_owned()
.collect()
})
.unwrap_or_else(HashMap::new);
let path = req.uri().path().to_owned();
Box::pin(async move { router.call(&path, query_params).await })
}
}
#[derive(Default)]
pub struct Router {
handlers: HashMap<String, Box<dyn HttpHandler>>,
}
impl Router {
pub fn new() -> Self {
Self {
handlers: HashMap::default(),
}
}
pub fn nest(path: &str, router: Router) -> Self {
check_path(path);
let handlers = router
.handlers
.into_iter()
.map(|(url, handler)| (format!("{path}{url}"), handler))
.collect();
Self { handlers }
}
pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
check_path(path);
self.handlers.insert(path.to_owned(), Box::new(handler));
self
}
pub async fn call(
&self,
path: &str,
params: HashMap<String, String>,
) -> Result<http::Response<BoxBody>, Infallible> {
let handler = match self.handlers.get(path) {
Some(handler) => handler,
None => {
return Ok(http::Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body(empty_body())
.unwrap())
}
};
let res = match handler.handle(path, &params).await {
Ok(res) => res.map(boxed),
Err(e) => http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(boxed(e.to_string()))
.unwrap(),
};
Ok(res)
}
}
fn check_path(path: &str) {
if path.is_empty() || !path.starts_with('/') {
panic!("paths must start with a `/`")
}
}
fn boxed(body: String) -> BoxBody {
use http_body::Body;
body.map_err(|_| panic!("")).boxed_unsync()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error;
struct MockOkHandler;
#[async_trait::async_trait]
impl HttpHandler for MockOkHandler {
async fn handle(
&self,
_: &str,
_: &HashMap<String, String>,
) -> crate::Result<http::Response<String>> {
Ok(http::Response::builder()
.status(http::StatusCode::OK)
.body("Ok".to_string())
.unwrap())
}
}
struct MockEmptyKeyErrorHandler;
#[async_trait::async_trait]
impl HttpHandler for MockEmptyKeyErrorHandler {
async fn handle(
&self,
_: &str,
_: &HashMap<String, String>,
) -> crate::Result<http::Response<String>> {
error::EmptyKeySnafu {}.fail()
}
}
#[test]
fn test_route_nest() {
let mock_handler = MockOkHandler {};
let router = Router::new().route("/test_node", mock_handler);
let router = Router::nest("/test_root", router);
assert_eq!(1, router.handlers.len());
assert!(router.handlers.contains_key("/test_root/test_node"));
}
#[should_panic]
#[test]
fn test_invalid_path() {
check_path("test_node")
}
#[should_panic]
#[test]
fn test_empty_path() {
check_path("")
}
#[tokio::test]
async fn test_route_call_ok() {
let mock_handler = MockOkHandler {};
let router = Router::new().route("/test_node", mock_handler);
let router = Router::nest("/test_root", router);
let res = router
.call("/test_root/test_node", HashMap::default())
.await
.unwrap();
assert!(res.status().is_success());
}
#[tokio::test]
async fn test_route_call_no_handler() {
let router = Router::new();
let res = router
.call("/test_root/test_node", HashMap::default())
.await
.unwrap();
assert_eq!(http::StatusCode::NOT_FOUND, res.status());
}
#[tokio::test]
async fn test_route_call_err() {
let mock_handler = MockEmptyKeyErrorHandler {};
let router = Router::new().route("/test_node", mock_handler);
let router = Router::nest("/test_root", router);
let res = router
.call("/test_root/test_node", HashMap::default())
.await
.unwrap();
assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status());
}
}

View File

@@ -0,0 +1,36 @@
use std::collections::HashMap;
use tonic::codegen::http;
use super::HttpHandler;
use crate::error::Result;
const HTTP_OK: &str = "OK\n";
pub struct HealthHandler;
#[async_trait::async_trait]
impl HttpHandler for HealthHandler {
async fn handle(&self, _: &str, _: &HashMap<String, String>) -> Result<http::Response<String>> {
Ok(http::Response::builder()
.status(http::StatusCode::OK)
.body(HTTP_OK.to_owned())
.unwrap())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_health_handle() {
let health_handler = HealthHandler {};
let path = "any";
let params = HashMap::default();
let res = health_handler.handle(path, &params).await.unwrap();
assert!(res.status().is_success());
assert_eq!(HTTP_OK.to_owned(), res.body().to_owned());
}
}

View File

@@ -0,0 +1,159 @@
use api::v1::meta::heartbeat_server;
use api::v1::meta::AskLeaderRequest;
use api::v1::meta::AskLeaderResponse;
use api::v1::meta::Endpoint;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::meta::ResponseHeader;
use api::v1::meta::PROTOCOL_VERSION;
use futures::StreamExt;
use futures::TryFutureExt;
use snafu::OptionExt;
use tonic::Request;
use tonic::Response;
use tonic::Streaming;
use super::store::kv::KvStoreRef;
use super::GrpcResult;
use super::GrpcStream;
use crate::error;
use crate::error::Result;
use crate::metasrv::MetaSrv;
#[async_trait::async_trait]
impl heartbeat_server::Heartbeat for MetaSrv {
type HeartbeatStream = GrpcStream<HeartbeatResponse>;
async fn heartbeat(
&self,
req: Request<Streaming<HeartbeatRequest>>,
) -> GrpcResult<Self::HeartbeatStream> {
let msg = req
.into_inner()
.next()
.await
.context(error::StreamNoneSnafu {})??;
let res = handle_heartbeat(msg).map_err(|e| e.into());
let output = futures::stream::once(res);
Ok(Response::new(Box::pin(output)))
}
async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
let req = req.into_inner();
let kv_store = self.kv_store();
let res = handle_ask_leader(req, kv_store).await?;
Ok(Response::new(res))
}
}
async fn handle_heartbeat(msg: HeartbeatRequest) -> Result<HeartbeatResponse> {
let HeartbeatRequest { header, .. } = msg;
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
cluster_id: header.map_or(0, |h| h.cluster_id),
..Default::default()
};
// TODO(jiachun) Do something high-end
let res = HeartbeatResponse {
header: Some(res_header),
..Default::default()
};
Ok(res)
}
async fn handle_ask_leader(
req: AskLeaderRequest,
_kv_store: KvStoreRef,
) -> Result<AskLeaderResponse> {
let AskLeaderRequest { header, .. } = req;
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
cluster_id: header.map_or(0u64, |h| h.cluster_id),
..Default::default()
};
// TODO(jiachun): return leader
let res = AskLeaderResponse {
header: Some(res_header),
leader: Some(Endpoint {
addr: "127.0.0.1:3002".to_string(),
}),
};
Ok(res)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::heartbeat_server::Heartbeat;
use api::v1::meta::*;
use tonic::IntoRequest;
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::service::store::kv::KvStore;
#[derive(Clone)]
pub struct NoopKvStore {
_opts: MetaSrvOptions,
}
impl NoopKvStore {
pub fn new(opts: MetaSrvOptions) -> Self {
Self { _opts: opts }
}
}
#[async_trait::async_trait]
impl KvStore for NoopKvStore {
async fn range(&self, _req: RangeRequest) -> crate::Result<RangeResponse> {
unreachable!()
}
async fn put(&self, _req: PutRequest) -> crate::Result<PutResponse> {
unreachable!()
}
async fn delete_range(
&self,
_req: DeleteRangeRequest,
) -> crate::Result<DeleteRangeResponse> {
unreachable!()
}
}
#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let header = RequestHeader::new(1, 2);
let req = HeartbeatRequest::new(header);
let res = handle_heartbeat(req).await.unwrap();
assert_eq!(1, res.header.unwrap().cluster_id);
}
#[tokio::test]
async fn test_ask_leader() {
let kv_store = Arc::new(NoopKvStore::new(MetaSrvOptions::default()));
let meta_srv = MetaSrv::new(kv_store);
let header = RequestHeader::new(1, 1);
let req = AskLeaderRequest::new(header);
let res = meta_srv.ask_leader(req.into_request()).await.unwrap();
let res = res.into_inner();
assert_eq!(1, res.header.unwrap().cluster_id);
assert_eq!("127.0.0.1:3002".to_string(), res.leader.unwrap().addr);
}
}

View File

@@ -0,0 +1,126 @@
use api::v1::meta::router_server;
use api::v1::meta::CreateRequest;
use api::v1::meta::CreateResponse;
use api::v1::meta::Peer;
use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use tonic::Request;
use tonic::Response;
use super::store::kv::KvStoreRef;
use super::GrpcResult;
use crate::error::Result;
use crate::metasrv::MetaSrv;
#[async_trait::async_trait]
impl router_server::Router for MetaSrv {
async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let kv_store = self.kv_store();
let res = handle_route(req, kv_store).await?;
Ok(Response::new(res))
}
async fn create(&self, req: Request<CreateRequest>) -> GrpcResult<CreateResponse> {
let req = req.into_inner();
let kv_store = self.kv_store();
let res = handle_create(req, kv_store).await?;
Ok(Response::new(res))
}
}
async fn handle_route(_req: RouteRequest, _kv_store: KvStoreRef) -> Result<RouteResponse> {
todo!()
}
async fn handle_create(req: CreateRequest, _kv_store: KvStoreRef) -> Result<CreateResponse> {
let CreateRequest { mut regions, .. } = req;
// TODO(jiachun): route table
for r in &mut regions {
r.peer = Some(Peer::new(0, "127.0.0.1:3000"));
}
Ok(CreateResponse {
regions,
..Default::default()
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::router_server::Router;
use api::v1::meta::*;
use tonic::IntoRequest;
use super::*;
use crate::service::store::kv::KvStore;
struct MockKvStore;
#[async_trait::async_trait]
impl KvStore for MockKvStore {
async fn range(&self, _req: RangeRequest) -> crate::Result<RangeResponse> {
unreachable!()
}
async fn put(&self, _req: PutRequest) -> crate::Result<PutResponse> {
unreachable!()
}
async fn delete_range(
&self,
_req: DeleteRangeRequest,
) -> crate::Result<DeleteRangeResponse> {
unreachable!()
}
}
#[should_panic]
#[tokio::test]
async fn test_handle_route() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let header = RequestHeader::new(1, 1);
let req = RouteRequest::new(header);
let req = req
.add_table(TableName::new("catalog1", "schema1", "table1"))
.add_table(TableName::new("catalog1", "schema1", "table2"))
.add_table(TableName::new("catalog1", "schema1", "table3"));
let _res = meta_srv.route(req.into_request()).await.unwrap();
}
#[tokio::test]
async fn test_handle_create() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let header = RequestHeader::new(1, 1);
let table_name = TableName::new("test_catalog", "test_db", "table1");
let req = CreateRequest::new(header, table_name);
let p = region::Partition::new()
.column_list(vec![b"col1".to_vec(), b"col2".to_vec()])
.value_list(vec![b"v1".to_vec(), b"v2".to_vec()]);
let r1 = Region::new(1, "region1", p);
let p = region::Partition::new()
.column_list(vec![b"col1".to_vec(), b"col2".to_vec()])
.value_list(vec![b"v11".to_vec(), b"v22".to_vec()]);
let r2 = Region::new(1, "region2", p);
let req = req.add_region(r1).add_region(r2);
let res = meta_srv.create(req.into_request()).await.unwrap();
for r in res.into_inner().regions {
assert_eq!("127.0.0.1:3000", r.peer.unwrap().endpoint.unwrap().addr);
}
}
}

View File

@@ -0,0 +1,104 @@
pub mod etcd;
pub mod kv;
use api::v1::meta::store_server;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use tonic::Request;
use tonic::Response;
use super::GrpcResult;
use crate::metasrv::MetaSrv;
#[async_trait::async_trait]
impl store_server::Store for MetaSrv {
async fn range(&self, req: Request<RangeRequest>) -> GrpcResult<RangeResponse> {
let req = req.into_inner();
let res = self.kv_store().range(req).await?;
Ok(Response::new(res))
}
async fn put(&self, req: Request<PutRequest>) -> GrpcResult<PutResponse> {
let req = req.into_inner();
let res = self.kv_store().put(req).await?;
Ok(Response::new(res))
}
async fn delete_range(
&self,
req: Request<DeleteRangeRequest>,
) -> GrpcResult<DeleteRangeResponse> {
let req = req.into_inner();
let res = self.kv_store().delete_range(req).await?;
Ok(Response::new(res))
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::store_server::Store;
use api::v1::meta::*;
use tonic::IntoRequest;
use super::*;
use crate::service::store::kv::KvStore;
struct MockKvStore;
#[async_trait::async_trait]
impl KvStore for MockKvStore {
async fn range(&self, _req: RangeRequest) -> crate::Result<RangeResponse> {
Ok(RangeResponse::default())
}
async fn put(&self, _req: PutRequest) -> crate::Result<PutResponse> {
Ok(PutResponse::default())
}
async fn delete_range(
&self,
_req: DeleteRangeRequest,
) -> crate::Result<DeleteRangeResponse> {
Ok(DeleteRangeResponse::default())
}
}
#[tokio::test]
async fn test_range() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let req = RangeRequest::default();
let res = meta_srv.range(req.into_request()).await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_put() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let req = PutRequest::default();
let res = meta_srv.put(req.into_request()).await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_delete_range() {
let kv_store = Arc::new(MockKvStore {});
let meta_srv = MetaSrv::new(kv_store);
let req = DeleteRangeRequest::default();
let res = meta_srv.delete_range(req.into_request()).await;
assert!(res.is_ok());
}
}

View File

@@ -0,0 +1,280 @@
use std::sync::Arc;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::KeyValue;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use common_error::prelude::*;
use etcd_client::Client;
use etcd_client::DeleteOptions;
use etcd_client::GetOptions;
use etcd_client::PutOptions;
use super::kv::KvStore;
use super::kv::KvStoreRef;
use crate::error;
use crate::error::Result;
#[derive(Clone)]
pub struct EtcdStore {
client: Client,
}
impl EtcdStore {
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<KvStoreRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
{
let client = Client::connect(endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;
Ok(Arc::new(Self { client }))
}
}
#[async_trait::async_trait]
impl KvStore for EtcdStore {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let Get { key, options } = req.try_into()?;
let res = self
.client
.kv_client()
.get(key, options)
.await
.context(error::EtcdFailedSnafu)?;
let kvs = res
.kvs()
.iter()
.map(|kv| KvPair::new(kv).into())
.collect::<Vec<_>>();
Ok(RangeResponse {
kvs,
more: res.more(),
..Default::default()
})
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
let Put {
key,
value,
options,
} = req.try_into()?;
let res = self
.client
.kv_client()
.put(key, value, options)
.await
.context(error::EtcdFailedSnafu)?;
let prev_kv = res.prev_key().map(|kv| KvPair::new(kv).into());
Ok(PutResponse {
prev_kv,
..Default::default()
})
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let Delete { key, options } = req.try_into()?;
let res = self
.client
.kv_client()
.delete(key, options)
.await
.context(error::EtcdFailedSnafu)?;
let prev_kvs = res
.prev_kvs()
.iter()
.map(|kv| KvPair::new(kv).into())
.collect::<Vec<_>>();
Ok(DeleteRangeResponse {
deleted: res.deleted(),
prev_kvs,
..Default::default()
})
}
}
struct Get {
key: Vec<u8>,
options: Option<GetOptions>,
}
impl TryFrom<RangeRequest> for Get {
type Error = error::Error;
fn try_from(req: RangeRequest) -> Result<Self> {
let RangeRequest {
key,
range_end,
limit,
keys_only,
..
} = req;
ensure!(!key.is_empty(), error::EmptyKeySnafu);
let mut options = GetOptions::default();
if !range_end.is_empty() {
options = options.with_range(range_end);
if limit > 0 {
options = options.with_limit(limit);
}
}
if keys_only {
options = options.with_keys_only();
}
Ok(Get {
key,
options: Some(options),
})
}
}
struct Put {
key: Vec<u8>,
value: Vec<u8>,
options: Option<PutOptions>,
}
impl TryFrom<PutRequest> for Put {
type Error = error::Error;
fn try_from(req: PutRequest) -> Result<Self> {
let PutRequest {
key,
value,
prev_kv,
..
} = req;
let mut options = PutOptions::default();
if prev_kv {
options = options.with_prev_key();
}
Ok(Put {
key,
value,
options: Some(options),
})
}
}
struct Delete {
key: Vec<u8>,
options: Option<DeleteOptions>,
}
impl TryFrom<DeleteRangeRequest> for Delete {
type Error = error::Error;
fn try_from(req: DeleteRangeRequest) -> Result<Self> {
let DeleteRangeRequest {
key,
range_end,
prev_kv,
..
} = req;
ensure!(!key.is_empty(), error::EmptyKeySnafu);
let mut options = DeleteOptions::default();
if !range_end.is_empty() {
options = options.with_range(range_end);
}
if prev_kv {
options = options.with_prev_key();
}
Ok(Delete {
key,
options: Some(options),
})
}
}
struct KvPair<'a>(&'a etcd_client::KeyValue);
impl<'a> KvPair<'a> {
/// Creates a `KvPair` from etcd KeyValue
#[inline]
const fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}
}
impl<'a> From<KvPair<'a>> for KeyValue {
fn from(kv: KvPair<'a>) -> Self {
Self {
key: kv.0.key().to_vec(),
value: kv.0.value().to_vec(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_get() {
let req = RangeRequest {
key: b"test_key".to_vec(),
range_end: b"test_range_end".to_vec(),
limit: 64,
keys_only: true,
..Default::default()
};
let get: Get = req.try_into().unwrap();
assert_eq!(b"test_key".to_vec(), get.key);
assert!(get.options.is_some());
}
#[test]
fn test_parse_put() {
let req = PutRequest {
key: b"test_key".to_vec(),
value: b"test_value".to_vec(),
prev_kv: true,
..Default::default()
};
let put: Put = req.try_into().unwrap();
assert_eq!(b"test_key".to_vec(), put.key);
assert_eq!(b"test_value".to_vec(), put.value);
assert!(put.options.is_some());
}
#[test]
fn test_parse_delete() {
let req = DeleteRangeRequest {
key: b"test_key".to_vec(),
range_end: b"test_range_end".to_vec(),
prev_kv: true,
..Default::default()
};
let delete: Delete = req.try_into().unwrap();
assert_eq!(b"test_key".to_vec(), delete.key);
assert!(delete.options.is_some());
}
}

View File

@@ -0,0 +1,21 @@
use std::sync::Arc;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use crate::error::Result;
pub type KvStoreRef = Arc<dyn KvStore>;
#[async_trait::async_trait]
pub trait KvStore: Send + Sync {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse>;
async fn put(&self, req: PutRequest) -> Result<PutResponse>;
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse>;
}