From d5b34f8917d6f4a34d19bf4b2391354b72693ed8 Mon Sep 17 00:00:00 2001 From: Jiachun Feng Date: Wed, 19 Oct 2022 11:02:58 +0800 Subject: [PATCH] feat: metasrv (#300) * meta: meta api&client * meta: heartbeat server init * feat: kv store * chore: grpc server * chore: meta server bootstrap * feat: heartbeat client * feat: route for create table * chore: a channel pool manager * feat: route client * feat: store client * chore: meta_client example * chore: change schema * chore: unit test & by cr * chore: refactor meta client * chore: add unit test --- Cargo.lock | 59 +++ Cargo.toml | 2 + config/metasrv.example.toml | 2 + src/api/build.rs | 4 + src/api/greptime/v1/meta/common.proto | 46 +++ src/api/greptime/v1/meta/heartbeat.proto | 82 ++++ src/api/greptime/v1/meta/route.proto | 93 +++++ src/api/greptime/v1/meta/store.proto | 94 +++++ src/api/src/v1.rs | 2 + src/api/src/v1/meta.rs | 139 +++++++ src/cmd/Cargo.toml | 1 + src/cmd/src/bin/greptime.rs | 8 +- src/cmd/src/error.rs | 75 +++- src/cmd/src/lib.rs | 1 + src/cmd/src/metasrv.rs | 112 ++++++ src/common/grpc/Cargo.toml | 3 + src/common/grpc/src/channel_manager.rs | 426 +++++++++++++++++++++ src/common/grpc/src/error.rs | 138 ++++++- src/common/grpc/src/lib.rs | 1 + src/datanode/Cargo.toml | 2 +- src/meta-client/Cargo.toml | 21 + src/meta-client/examples/meta_client.rs | 89 +++++ src/meta-client/src/client.rs | 332 ++++++++++++++++ src/meta-client/src/client/heartbeat.rs | 193 ++++++++++ src/meta-client/src/client/load_balance.rs | 34 ++ src/meta-client/src/client/router.rs | 218 +++++++++++ src/meta-client/src/client/store.rs | 259 +++++++++++++ src/meta-client/src/error.rs | 137 +++++++ src/meta-client/src/lib.rs | 2 + src/meta-srv/Cargo.toml | 26 ++ src/meta-srv/examples/kv_store.rs | 48 +++ src/meta-srv/src/bootstrap.rs | 37 ++ src/meta-srv/src/error.rs | 145 +++++++ src/meta-srv/src/lib.rs | 6 + src/meta-srv/src/metasrv.rs | 34 ++ src/meta-srv/src/service.rs | 13 + src/meta-srv/src/service/admin.rs | 248 ++++++++++++ src/meta-srv/src/service/admin/health.rs | 36 ++ src/meta-srv/src/service/heartbeat.rs | 159 ++++++++ src/meta-srv/src/service/router.rs | 126 ++++++ src/meta-srv/src/service/store.rs | 104 +++++ src/meta-srv/src/service/store/etcd.rs | 280 ++++++++++++++ src/meta-srv/src/service/store/kv.rs | 21 + 43 files changed, 3846 insertions(+), 12 deletions(-) create mode 100644 config/metasrv.example.toml create mode 100644 src/api/greptime/v1/meta/common.proto create mode 100644 src/api/greptime/v1/meta/heartbeat.proto create mode 100644 src/api/greptime/v1/meta/route.proto create mode 100644 src/api/greptime/v1/meta/store.proto create mode 100644 src/api/src/v1/meta.rs create mode 100644 src/cmd/src/metasrv.rs create mode 100644 src/common/grpc/src/channel_manager.rs create mode 100644 src/meta-client/Cargo.toml create mode 100644 src/meta-client/examples/meta_client.rs create mode 100644 src/meta-client/src/client.rs create mode 100644 src/meta-client/src/client/heartbeat.rs create mode 100644 src/meta-client/src/client/load_balance.rs create mode 100644 src/meta-client/src/client/router.rs create mode 100644 src/meta-client/src/client/store.rs create mode 100644 src/meta-client/src/error.rs create mode 100644 src/meta-client/src/lib.rs create mode 100644 src/meta-srv/Cargo.toml create mode 100644 src/meta-srv/examples/kv_store.rs create mode 100644 src/meta-srv/src/bootstrap.rs create mode 100644 src/meta-srv/src/error.rs create mode 100644 src/meta-srv/src/lib.rs create mode 100644 src/meta-srv/src/metasrv.rs create mode 100644 src/meta-srv/src/service.rs create mode 100644 src/meta-srv/src/service/admin.rs create mode 100644 src/meta-srv/src/service/admin/health.rs create mode 100644 src/meta-srv/src/service/heartbeat.rs create mode 100644 src/meta-srv/src/service/router.rs create mode 100644 src/meta-srv/src/service/store.rs create mode 100644 src/meta-srv/src/service/store/etcd.rs create mode 100644 src/meta-srv/src/service/store/kv.rs diff --git a/Cargo.lock b/Cargo.lock index ccde0f7d14..fbdeae5405 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -837,6 +837,7 @@ dependencies = [ "datanode", "frontend", "futures", + "meta-srv", "serde", "snafu", "tempdir", @@ -919,8 +920,11 @@ dependencies = [ "async-trait", "common-base", "common-error", + "common-runtime", "datafusion", "snafu", + "tokio", + "tonic", ] [[package]] @@ -1614,6 +1618,22 @@ dependencies = [ "str-buf", ] +[[package]] +name = "etcd-client" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1259da3b15ec7e54bd7203adb2c4335adb9ca1d47b56220d650e52c247e824a" +dependencies = [ + "http", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower", + "tower-service", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -2606,6 +2626,45 @@ dependencies = [ "autocfg", ] +[[package]] +name = "meta-client" +version = "0.1.0" +dependencies = [ + "api", + "common-error", + "common-grpc", + "common-telemetry", + "etcd-client", + "rand 0.8.5", + "snafu", + "tokio", + "tonic", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "meta-srv" +version = "0.1.0" +dependencies = [ + "api", + "async-trait", + "common-base", + "common-error", + "common-telemetry", + "etcd-client", + "futures", + "http-body", + "serde", + "snafu", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-subscriber", + "url", +] + [[package]] name = "metrics" version = "0.20.1" diff --git a/Cargo.toml b/Cargo.toml index 7deaf5eee7..4d3dfea88f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,8 @@ members = [ "src/frontend", "src/log-store", "src/logical-plans", + "src/meta-client", + "src/meta-srv", "src/object-store", "src/query", "src/script", diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml new file mode 100644 index 0000000000..1fdf4bc8cf --- /dev/null +++ b/config/metasrv.example.toml @@ -0,0 +1,2 @@ +server_addr = '0.0.0.0:3002' +store_addr = '127.0.0.1:2380' diff --git a/src/api/build.rs b/src/api/build.rs index 412a3fdcbd..1b8e171cb5 100644 --- a/src/api/build.rs +++ b/src/api/build.rs @@ -6,6 +6,10 @@ fn main() { "greptime/v1/select.proto", "greptime/v1/physical_plan.proto", "greptime/v1/greptime.proto", + "greptime/v1/meta/common.proto", + "greptime/v1/meta/heartbeat.proto", + "greptime/v1/meta/route.proto", + "greptime/v1/meta/store.proto", ], &["."], ) diff --git a/src/api/greptime/v1/meta/common.proto b/src/api/greptime/v1/meta/common.proto new file mode 100644 index 0000000000..3d3259b7f1 --- /dev/null +++ b/src/api/greptime/v1/meta/common.proto @@ -0,0 +1,46 @@ +syntax = "proto3"; + +package greptime.v1.meta; + +message RequestHeader { + uint64 protocol_version = 1; + // cluster_id is the ID of the cluster which be sent to. + uint64 cluster_id = 2; + // member_id is the ID of the sender server. + uint64 member_id = 3; +} + +message ResponseHeader { + uint64 protocol_version = 1; + // cluster_id is the ID of the cluster which sent the response. + uint64 cluster_id = 2; + Error error = 3; +} + +message Error { + int32 code = 1; + string err_msg = 2; +} + +message Peer { + uint64 id = 1; + Endpoint endpoint = 2; +} + +message Endpoint { + string addr = 1; +} + +message TimeInterval { + // The unix timestamp in millis of the start of this period. + uint64 start_timestamp_millis = 1; + // The unix timestamp in millis of the end of this period. + uint64 end_timestamp_millis = 2; +} + +message KeyValue { + // key is the key in bytes. An empty key is not allowed. + bytes key = 1; + // value is the value held by the key, in bytes. + bytes value = 2; +} diff --git a/src/api/greptime/v1/meta/heartbeat.proto b/src/api/greptime/v1/meta/heartbeat.proto new file mode 100644 index 0000000000..c8c48363f7 --- /dev/null +++ b/src/api/greptime/v1/meta/heartbeat.proto @@ -0,0 +1,82 @@ +syntax = "proto3"; + +package greptime.v1.meta; + +import "greptime/v1/meta/common.proto"; + +service Heartbeat { + // Heartbeat, there may be many contents of the heartbeat, such as: + // 1. Metadata to be registered to meta server and discoverable by other nodes. + // 2. Some performance metrics, such as Load, CPU usage, etc. + // 3. The number of computing tasks being executed. + rpc Heartbeat(stream HeartbeatRequest) returns (stream HeartbeatResponse) {} + + // Ask leader's endpoint. + rpc AskLeader(AskLeaderRequest) returns (AskLeaderResponse) {} +} + +message HeartbeatRequest { + RequestHeader header = 1; + + // Leader node + bool is_leader = 2; + // Leader Peer + Endpoint leader_endpoint = 3; + // Actually reported time interval + TimeInterval report_interval = 4; + // Node stat + NodeStat node_stat = 5; + // Region stats in this node + repeated RegionStat region_stats = 6; + // Follower nodes and stats, empty on follower nodes + repeated ReplicaStat replica_stats = 7; +} + +message NodeStat { + // The read capacity units during this period + uint64 rcus = 1; + // The write capacity units during this period + uint64 wcus = 2; + // Table number in this node + uint64 table_num = 3; + // Regon number in this node + uint64 region_num = 4; + + double cpu_usage = 5; + double load = 6; + // Read disk I/O in the node + double read_io_rate = 7; + // Write disk I/O in the node + double write_io_rate = 8; +} + +message RegionStat { + string table_name = 1; + uint64 region_id = 2; + // The read capacity units during this period + uint64 rcus = 3; + // The write capacity units during this period + uint64 wcus = 4; +} + +message ReplicaStat { + Peer peer = 1; + bool in_sync = 2; + bool is_learner = 3; +} + +message HeartbeatResponse { + ResponseHeader header = 1; + + repeated bytes payload = 2; +} + +message AskLeaderRequest { + RequestHeader header = 1; +} + +message AskLeaderResponse { + ResponseHeader header = 1; + + Endpoint leader = 2; +} diff --git a/src/api/greptime/v1/meta/route.proto b/src/api/greptime/v1/meta/route.proto new file mode 100644 index 0000000000..448b74619c --- /dev/null +++ b/src/api/greptime/v1/meta/route.proto @@ -0,0 +1,93 @@ +syntax = "proto3"; + +package greptime.v1.meta; + +import "greptime/v1/meta/common.proto"; + +service Router { + // Fetch routing information for tables. The smallest unit is the complete + // routing information(all regions) of a table. + // + // table_1 + // table_name + // table_schema + // regions + // region_1 + // mutate_endpoint + // select_endpoint_1, select_endpoint_2 + // region_2 + // mutate_endpoint + // select_endpoint_1, select_endpoint_2, select_endpoint_3 + // region_xxx + // table_2 + // ... + // + rpc Route(RouteRequest) returns (RouteResponse) {} + + rpc Create(CreateRequest) returns (CreateResponse) {} +} + +message RouteRequest { + RequestHeader header = 1; + + repeated TableName table_names = 2; +} + +message RouteResponse { + ResponseHeader header = 1; + + repeated Peer peers = 2; + repeated TableRoute table_routes = 3; +} + +message CreateRequest { + RequestHeader header = 1; + + TableName table_name = 2; + repeated Region regions = 3; +} + +message CreateResponse { + ResponseHeader header = 1; + + repeated Region regions = 2; +} + +message TableRoute { + Table table = 1; + repeated RegionRoute region_routes = 2; +} + +message RegionRoute { + Region region = 1; + // single leader node for write task + uint64 leader_peer_index = 2; + // multiple follower nodes for read task + repeated uint64 follower_peer_indexes = 3; +} + +message TableName { + string catalog_name = 1; + string schema_name = 2; + string table_name = 3; +} + +message Table { + TableName table_name = 1; + bytes table_schema = 2; +} + +message Region { + uint64 id = 1; + string name = 2; + Peer peer = 3; + + // PARTITION `region_name` VALUES LESS THAN (value_list) + message Partition { + repeated bytes column_list = 1; + repeated bytes value_list = 2; + } + Partition partition = 4; + + map attrs = 5; +} diff --git a/src/api/greptime/v1/meta/store.proto b/src/api/greptime/v1/meta/store.proto new file mode 100644 index 0000000000..acb2312ab1 --- /dev/null +++ b/src/api/greptime/v1/meta/store.proto @@ -0,0 +1,94 @@ +syntax = "proto3"; + +package greptime.v1.meta; + +import "greptime/v1/meta/common.proto"; + +service Store { + // Range gets the keys in the range from the key-value store. + rpc Range(RangeRequest) returns (RangeResponse); + + // Put puts the given key into the key-value store. + rpc Put(PutRequest) returns (PutResponse); + + // DeleteRange deletes the given range from the key-value store. + rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse); +} + +message RangeRequest { + RequestHeader header = 1; + + // key is the first key for the range, If range_end is not given, the + // request only looks up key. + bytes key = 2; + // range_end is the upper bound on the requested range [key, range_end). + // If range_end is '\0', the range is all keys >= key. + // If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), + // then the range request gets all keys prefixed with key. + // If both key and range_end are '\0', then the range request returns all + // keys. + bytes range_end = 3; + // limit is a limit on the number of keys returned for the request. When + // limit is set to 0, it is treated as no limit. + int64 limit = 4; + // keys_only when set returns only the keys and not the values. + bool keys_only = 5; +} + +message RangeResponse { + ResponseHeader header = 1; + + // kvs is the list of key-value pairs matched by the range request. + repeated KeyValue kvs = 2; + bool more = 3; +} + +message PutRequest { + RequestHeader header = 1; + + // key is the key, in bytes, to put into the key-value store. + bytes key = 2; + // value is the value, in bytes, to associate with the key in the + // key-value store. + bytes value = 3; + // If prev_kv is set, gets the previous key-value pair before changing it. + // The previous key-value pair will be returned in the put response. + bool prev_kv = 4; +} + +message PutResponse { + ResponseHeader header = 1; + + // If prev_kv is set in the request, the previous key-value pair will be + // returned. + KeyValue prev_kv = 2; +} + +message DeleteRangeRequest { + RequestHeader header = 1; + + // key is the first key to delete in the range. + bytes key = 2; + // range_end is the key following the last key to delete for the range + // [key, range_end). + // If range_end is not given, the range is defined to contain only the key + // argument. + // If range_end is one bit larger than the given key, then the range is all + // the keys with the prefix (the given key). + // If range_end is '\0', the range is all keys greater than or equal to the + // key argument. + bytes range_end = 3; + // If prev_kv is set, gets the previous key-value pairs before deleting it. + // The previous key-value pairs will be returned in the delete response. + bool prev_kv = 4; +} + +message DeleteRangeResponse { + ResponseHeader header = 1; + + // deleted is the number of keys deleted by the delete range request. + int64 deleted = 2; + // If prev_kv is set in the request, the previous key-value pairs will be + // returned. + repeated KeyValue prev_kvs = 3; +} diff --git a/src/api/src/v1.rs b/src/api/src/v1.rs index 059593da5c..1c5ceaade8 100644 --- a/src/api/src/v1.rs +++ b/src/api/src/v1.rs @@ -4,3 +4,5 @@ tonic::include_proto!("greptime.v1"); pub mod codec { tonic::include_proto!("greptime.v1.codec"); } + +pub mod meta; diff --git a/src/api/src/v1/meta.rs b/src/api/src/v1/meta.rs new file mode 100644 index 0000000000..16fbc506cb --- /dev/null +++ b/src/api/src/v1/meta.rs @@ -0,0 +1,139 @@ +tonic::include_proto!("greptime.v1.meta"); + +pub const PROTOCOL_VERSION: u64 = 1; + +impl Peer { + pub fn new(id: u64, addr: impl AsRef) -> Self { + Self { + id, + endpoint: Some(addr.as_ref().into()), + } + } +} + +impl From<&str> for Endpoint { + fn from(s: &str) -> Self { + Self { + addr: s.to_string(), + } + } +} + +impl RequestHeader { + pub fn new(cluster_id: u64, member_id: u64) -> RequestHeader { + RequestHeader { + protocol_version: PROTOCOL_VERSION, + cluster_id, + member_id, + } + } +} + +impl HeartbeatRequest { + pub fn new(header: RequestHeader) -> Self { + Self { + header: Some(header), + ..Default::default() + } + } +} + +impl AskLeaderRequest { + pub fn new(header: RequestHeader) -> Self { + Self { + header: Some(header), + } + } +} + +impl TableName { + pub fn new( + catalog: impl Into, + schema: impl Into, + table: impl Into, + ) -> Self { + Self { + catalog_name: catalog.into(), + schema_name: schema.into(), + table_name: table.into(), + } + } +} + +impl RouteRequest { + pub fn new(header: RequestHeader) -> Self { + Self { + header: Some(header), + ..Default::default() + } + } + + pub fn add_table(mut self, table_name: TableName) -> Self { + self.table_names.push(table_name); + self + } +} + +impl CreateRequest { + pub fn new(header: RequestHeader, table_name: TableName) -> Self { + Self { + header: Some(header), + table_name: Some(table_name), + ..Default::default() + } + } + + pub fn add_region(mut self, region: Region) -> Self { + self.regions.push(region); + self + } +} + +impl Region { + pub fn new(id: u64, name: impl Into, partition: region::Partition) -> Self { + Self { + id, + name: name.into(), + partition: Some(partition), + ..Default::default() + } + } + + pub fn attr(mut self, key: impl Into, val: impl Into) -> Self { + self.attrs.insert(key.into(), val.into()); + self + } +} + +impl region::Partition { + pub fn new() -> Self { + Default::default() + } + + pub fn column_list(mut self, column_list: Vec>) -> Self { + self.column_list = column_list; + self + } + + pub fn value_list(mut self, value_list: Vec>) -> Self { + self.value_list = value_list; + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_peer() { + let peer = Peer::new(1, "test_addr"); + assert_eq!(1, peer.id); + assert_eq!( + Endpoint { + addr: "test_addr".to_string() + }, + peer.endpoint.unwrap() + ); + } +} diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 1cb37040ad..fc13e7daf7 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -14,6 +14,7 @@ common-telemetry = { path = "../common/telemetry", features = ["deadlock_detecti datanode = { path = "../datanode" } frontend = { path = "../frontend" } futures = "0.3" +meta-srv = { path = "../meta-srv" } snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.18", features = ["full"] } toml = "0.5" diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index 3713f4ab4c..75243a645f 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -4,7 +4,9 @@ use clap::Parser; use cmd::datanode; use cmd::error::Result; use cmd::frontend; -use common_telemetry::{self, logging::error, logging::info}; +use cmd::metasrv; +use common_telemetry::logging::error; +use common_telemetry::logging::info; #[derive(Parser)] #[clap(name = "greptimedb")] @@ -29,6 +31,8 @@ enum SubCommand { Datanode(datanode::Command), #[clap(name = "frontend")] Frontend(frontend::Command), + #[clap(name = "metasrv")] + Metasrv(metasrv::Command), } impl SubCommand { @@ -36,6 +40,7 @@ impl SubCommand { match self { SubCommand::Datanode(cmd) => cmd.run().await, SubCommand::Frontend(cmd) => cmd.run().await, + SubCommand::Metasrv(cmd) => cmd.run().await, } } } @@ -45,6 +50,7 @@ impl fmt::Display for SubCommand { match self { SubCommand::Datanode(..) => write!(f, "greptime-datanode"), SubCommand::Frontend(..) => write!(f, "greptime-frontend"), + SubCommand::Metasrv(..) => write!(f, "greptime-metasrv"), } } } diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 9247a8abc2..5db41d96b2 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -17,14 +17,24 @@ pub enum Error { source: frontend::error::Error, }, + #[snafu(display("Failed to start meta server, source: {}", source))] + StartMetaServer { + #[snafu(backtrace)] + source: meta_srv::error::Error, + }, + #[snafu(display("Failed to read config file: {}, source: {}", path, source))] ReadConfig { - source: std::io::Error, path: String, + source: std::io::Error, + backtrace: Backtrace, }, #[snafu(display("Failed to parse config, source: {}", source))] - ParseConfig { source: toml::de::Error }, + ParseConfig { + source: toml::de::Error, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -34,6 +44,7 @@ impl ErrorExt for Error { match self { Error::StartDatanode { source } => source.status_code(), Error::StartFrontend { source } => source.status_code(), + Error::StartMetaServer { source } => source.status_code(), Error::ReadConfig { .. } | Error::ParseConfig { .. } => StatusCode::InvalidArguments, } } @@ -51,18 +62,68 @@ impl ErrorExt for Error { mod tests { use super::*; - fn raise_read_config_error() -> std::result::Result<(), std::io::Error> { - Err(std::io::ErrorKind::NotFound.into()) + type StdResult = std::result::Result<(), E>; + + #[test] + fn test_start_node_error() { + fn throw_datanode_error() -> StdResult { + datanode::error::MissingFieldSnafu { + field: "test_field", + } + .fail() + } + + let e = throw_datanode_error() + .context(StartDatanodeSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); } #[test] - fn test_error() { - let e = raise_read_config_error() + fn test_start_frontend_error() { + fn throw_frontend_error() -> StdResult { + frontend::error::InvalidSqlSnafu { err_msg: "failed" }.fail() + } + + let e = throw_frontend_error() + .context(StartFrontendSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_start_metasrv_error() { + fn throw_metasrv_error() -> StdResult { + meta_srv::error::StreamNoneSnafu {}.fail() + } + + let e = throw_metasrv_error() + .context(StartMetaServerSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_read_config_error() { + fn throw_read_config_error() -> StdResult { + Err(std::io::ErrorKind::NotFound.into()) + } + + let e = throw_read_config_error() .context(ReadConfigSnafu { path: "test" }) .err() .unwrap(); - assert!(e.backtrace_opt().is_none()); + assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::InvalidArguments); } } diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 0bffa92fd1..09ac5bbd35 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -1,4 +1,5 @@ pub mod datanode; pub mod error; pub mod frontend; +pub mod metasrv; mod toml_loader; diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs new file mode 100644 index 0000000000..f070b5d37e --- /dev/null +++ b/src/cmd/src/metasrv.rs @@ -0,0 +1,112 @@ +use clap::Parser; +use common_telemetry::logging; +use meta_srv::bootstrap; +use meta_srv::metasrv::MetaSrvOptions; +use snafu::ResultExt; + +use crate::error; +use crate::error::Error; +use crate::error::Result; +use crate::toml_loader; + +#[derive(Parser)] +pub struct Command { + #[clap(subcommand)] + subcmd: SubCommand, +} + +impl Command { + pub async fn run(self) -> Result<()> { + self.subcmd.run().await + } +} + +#[derive(Parser)] +enum SubCommand { + Start(StartCommand), +} + +impl SubCommand { + async fn run(self) -> Result<()> { + match self { + SubCommand::Start(cmd) => cmd.run().await, + } + } +} + +#[derive(Debug, Parser)] +struct StartCommand { + #[clap(long)] + server_addr: Option, + #[clap(long)] + store_addr: Option, + #[clap(short, long)] + config_file: Option, +} + +impl StartCommand { + async fn run(self) -> Result<()> { + logging::info!("MetaSrv start command: {:#?}", self); + + let opts: MetaSrvOptions = self.try_into()?; + + logging::info!("MetaSrv options: {:#?}", opts); + + bootstrap::bootstrap_meta_srv(opts) + .await + .context(error::StartMetaServerSnafu) + } +} + +impl TryFrom for MetaSrvOptions { + type Error = Error; + + fn try_from(cmd: StartCommand) -> Result { + let mut opts: MetaSrvOptions = if let Some(path) = cmd.config_file { + toml_loader::from_file!(&path)? + } else { + MetaSrvOptions::default() + }; + + if let Some(addr) = cmd.server_addr { + opts.server_addr = addr; + } + if let Some(addr) = cmd.store_addr { + opts.store_addr = addr; + } + + Ok(opts) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_read_from_cmd() { + let cmd = StartCommand { + server_addr: Some("0.0.0.0:3002".to_string()), + store_addr: Some("127.0.0.1:2380".to_string()), + config_file: None, + }; + let options: MetaSrvOptions = cmd.try_into().unwrap(); + assert_eq!("0.0.0.0:3002".to_string(), options.server_addr); + assert_eq!("127.0.0.1:2380".to_string(), options.store_addr); + } + + #[test] + fn test_read_from_config_file() { + let cmd = StartCommand { + server_addr: None, + store_addr: None, + config_file: Some(format!( + "{}/../../config/metasrv.example.toml", + std::env::current_dir().unwrap().as_path().to_str().unwrap() + )), + }; + let options: MetaSrvOptions = cmd.try_into().unwrap(); + assert_eq!("0.0.0.0:3002".to_string(), options.server_addr); + assert_eq!("127.0.0.1:2380".to_string(), options.store_addr); + } +} diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index 72f23fe0f2..50d3b48c22 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -8,8 +8,11 @@ api = { path = "../../api" } async-trait = "0.1" common-base = { path = "../base" } common-error = { path = "../error" } +common-runtime = { path = "../runtime" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } snafu = { version = "0.7", features = ["backtraces"] } +tokio = { version = "1.0", features = ["full"] } +tonic = "0.8" [dependencies.arrow] package = "arrow2" diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs new file mode 100644 index 0000000000..25545c5475 --- /dev/null +++ b/src/common/grpc/src/channel_manager.rs @@ -0,0 +1,426 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; + +use snafu::ResultExt; +use tonic::transport::Channel as InnerChannel; +use tonic::transport::Endpoint; + +use crate::error; +use crate::error::Result; + +const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; + +#[derive(Clone, Debug)] +pub struct ChannelManager { + config: Option, + pool: Arc>, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ChannelConfig { + pub timeout: Option, + pub connect_timeout: Option, + pub concurrency_limit: Option, + pub rate_limit: Option<(u64, Duration)>, + pub initial_stream_window_size: Option, + pub initial_connection_window_size: Option, + pub http2_keep_alive_interval: Option, + pub http2_keep_alive_timeout: Option, + pub http2_keep_alive_while_idle: Option, + pub http2_adaptive_window: Option, + pub tcp_keepalive: Option, + pub tcp_nodelay: bool, +} + +impl Default for ChannelConfig { + fn default() -> Self { + Self { + timeout: None, + connect_timeout: None, + concurrency_limit: None, + rate_limit: None, + initial_stream_window_size: None, + initial_connection_window_size: None, + http2_keep_alive_interval: None, + http2_keep_alive_timeout: None, + http2_keep_alive_while_idle: None, + http2_adaptive_window: None, + tcp_keepalive: None, + tcp_nodelay: true, + } + } +} + +impl ChannelConfig { + pub fn new() -> Self { + Default::default() + } + + /// A timeout to each request. + pub fn timeout(self, timeout: Duration) -> Self { + Self { + timeout: Some(timeout), + ..self + } + } + + /// A timeout to connecting to the uri. + /// + /// Defaults to no timeout. + pub fn connect_timeout(self, timeout: Duration) -> Self { + Self { + connect_timeout: Some(timeout), + ..self + } + } + + /// A concurrency limit to each request. + pub fn concurrency_limit(self, limit: usize) -> Self { + Self { + concurrency_limit: Some(limit), + ..self + } + } + + /// A rate limit to each request. + pub fn rate_limit(self, limit: u64, duration: Duration) -> Self { + Self { + rate_limit: Some((limit, duration)), + ..self + } + } + + /// Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control. + /// Default is 65,535 + pub fn initial_stream_window_size(self, size: u32) -> Self { + Self { + initial_stream_window_size: Some(size), + ..self + } + } + + /// Sets the max connection-level flow control for HTTP2 + /// + /// Default is 65,535 + pub fn initial_connection_window_size(self, size: u32) -> Self { + Self { + initial_connection_window_size: Some(size), + ..self + } + } + + /// Set http2 KEEP_ALIVE_INTERVAL. Uses hyper’s default otherwise. + pub fn http2_keep_alive_interval(self, duration: Duration) -> Self { + Self { + http2_keep_alive_interval: Some(duration), + ..self + } + } + + /// Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise. + pub fn http2_keep_alive_timeout(self, duration: Duration) -> Self { + Self { + http2_keep_alive_timeout: Some(duration), + ..self + } + } + + /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hyper’s default otherwise. + pub fn http2_keep_alive_while_idle(self, enabled: bool) -> Self { + Self { + http2_keep_alive_while_idle: Some(enabled), + ..self + } + } + + /// Sets whether to use an adaptive flow control. Uses hyper’s default otherwise. + pub fn http2_adaptive_window(self, enabled: bool) -> Self { + Self { + http2_adaptive_window: Some(enabled), + ..self + } + } + + /// Set whether TCP keepalive messages are enabled on accepted connections. + /// + /// If None is specified, keepalive is disabled, otherwise the duration specified + /// will be the time to remain idle before sending TCP keepalive probes. + /// + /// Default is no keepalive (None) + pub fn tcp_keepalive(self, duration: Duration) -> Self { + Self { + tcp_keepalive: Some(duration), + ..self + } + } + + /// Set the value of TCP_NODELAY option for accepted connections. + /// + /// Enabled by default. + pub fn tcp_nodelay(self, enabled: bool) -> Self { + Self { + tcp_nodelay: enabled, + ..self + } + } +} + +#[derive(Debug)] +struct Pool { + channels: HashMap, +} + +#[derive(Debug)] +struct Channel { + channel: InnerChannel, + access: usize, +} + +impl ChannelManager { + pub fn new() -> Self { + Default::default() + } + + pub fn with_config(config: ChannelConfig) -> Self { + let mut manager = ChannelManager::new(); + manager.config = Some(config); + manager + } + + pub fn config(&self) -> Option { + self.config.clone() + } + + pub fn get(&self, addr: impl AsRef) -> Result { + let addr = addr.as_ref(); + let mut pool = self.pool.lock().unwrap(); + if let Some(ch) = pool.get_mut(addr) { + ch.access += 1; + return Ok(ch.channel.clone()); + } + + let mut endpoint = + Endpoint::new(format!("http://{}", addr)).context(error::CreateChannelSnafu)?; + + if let Some(cfg) = &self.config { + if let Some(dur) = cfg.timeout { + endpoint = endpoint.timeout(dur); + } + if let Some(dur) = cfg.connect_timeout { + endpoint = endpoint.connect_timeout(dur); + } + if let Some(limit) = cfg.concurrency_limit { + endpoint = endpoint.concurrency_limit(limit); + } + if let Some((limit, dur)) = cfg.rate_limit { + endpoint = endpoint.rate_limit(limit, dur); + } + if let Some(size) = cfg.initial_stream_window_size { + endpoint = endpoint.initial_stream_window_size(size); + } + if let Some(size) = cfg.initial_connection_window_size { + endpoint = endpoint.initial_connection_window_size(size); + } + if let Some(dur) = cfg.http2_keep_alive_interval { + endpoint = endpoint.http2_keep_alive_interval(dur); + } + if let Some(dur) = cfg.http2_keep_alive_timeout { + endpoint = endpoint.keep_alive_timeout(dur); + } + if let Some(enabled) = cfg.http2_keep_alive_while_idle { + endpoint = endpoint.keep_alive_while_idle(enabled); + } + if let Some(enabled) = cfg.http2_adaptive_window { + endpoint = endpoint.http2_adaptive_window(enabled); + } + endpoint = endpoint + .tcp_keepalive(cfg.tcp_keepalive) + .tcp_nodelay(cfg.tcp_nodelay); + } + + let inner_channel = endpoint.connect_lazy(); + let channel = Channel { + channel: inner_channel.clone(), + access: 1, + }; + pool.put(addr, channel); + + Ok(inner_channel) + } +} + +impl Pool { + #[inline] + fn get_mut(&mut self, addr: &str) -> Option<&mut Channel> { + self.channels.get_mut(addr) + } + + #[inline] + fn put(&mut self, addr: &str, channel: Channel) { + self.channels.insert(addr.to_string(), channel); + } + + #[inline] + fn retain_channel(&mut self, f: F) + where + F: FnMut(&String, &mut Channel) -> bool, + { + self.channels.retain(f); + } +} + +impl Default for ChannelManager { + fn default() -> Self { + let pool = Pool { + channels: HashMap::default(), + }; + let pool = Arc::new(Mutex::new(pool)); + let cloned_pool = pool.clone(); + + common_runtime::spawn_bg(async move { + recycle_channel_in_loop(cloned_pool, RECYCLE_CHANNEL_INTERVAL_SECS).await; + }); + + Self { pool, config: None } + } +} + +async fn recycle_channel_in_loop(pool: Arc>, interval_secs: u64) { + let mut interval = tokio::time::interval(Duration::from_secs(interval_secs)); + + loop { + interval.tick().await; + let mut pool = pool.lock().unwrap(); + pool.retain_channel(|_, c| { + if c.access == 0 { + false + } else { + c.access = 0; + true + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[should_panic] + #[test] + fn test_invalid_addr() { + let pool = Pool { + channels: HashMap::default(), + }; + let pool = Arc::new(Mutex::new(pool)); + let mgr = ChannelManager { pool, config: None }; + let addr = "http://test"; + + let _ = mgr.get(addr).unwrap(); + } + + #[tokio::test] + async fn test_access_count() { + let pool = Pool { + channels: HashMap::default(), + }; + let pool = Arc::new(Mutex::new(pool)); + let config = ChannelConfig::new() + .timeout(Duration::from_secs(1)) + .connect_timeout(Duration::from_secs(1)) + .concurrency_limit(1) + .rate_limit(1, Duration::from_secs(1)) + .initial_stream_window_size(1) + .initial_connection_window_size(1) + .http2_keep_alive_interval(Duration::from_secs(1)) + .http2_keep_alive_timeout(Duration::from_secs(1)) + .http2_keep_alive_while_idle(true) + .http2_adaptive_window(true) + .tcp_keepalive(Duration::from_secs(1)) + .tcp_nodelay(true); + let mgr = ChannelManager { + pool, + config: Some(config), + }; + let addr = "test_uri"; + + for i in 0..10 { + { + let _ = mgr.get(addr).unwrap(); + let mut pool = mgr.pool.lock().unwrap(); + assert_eq!(i + 1, pool.get_mut(addr).unwrap().access); + } + } + + let mut pool = mgr.pool.lock().unwrap(); + + assert_eq!(10, pool.get_mut(addr).unwrap().access); + + pool.retain_channel(|_, c| { + if c.access == 0 { + false + } else { + c.access = 0; + true + } + }); + + assert_eq!(0, pool.get_mut(addr).unwrap().access); + } + + #[test] + fn test_config() { + let default_cfg = ChannelConfig::new(); + assert_eq!( + ChannelConfig { + timeout: None, + connect_timeout: None, + concurrency_limit: None, + rate_limit: None, + initial_stream_window_size: None, + initial_connection_window_size: None, + http2_keep_alive_interval: None, + http2_keep_alive_timeout: None, + http2_keep_alive_while_idle: None, + http2_adaptive_window: None, + tcp_keepalive: None, + tcp_nodelay: true, + }, + default_cfg + ); + + let cfg = default_cfg + .timeout(Duration::from_secs(3)) + .connect_timeout(Duration::from_secs(5)) + .concurrency_limit(6) + .rate_limit(5, Duration::from_secs(1)) + .initial_stream_window_size(10) + .initial_connection_window_size(20) + .http2_keep_alive_interval(Duration::from_secs(1)) + .http2_keep_alive_timeout(Duration::from_secs(3)) + .http2_keep_alive_while_idle(true) + .http2_adaptive_window(true) + .tcp_keepalive(Duration::from_secs(2)) + .tcp_nodelay(false); + + assert_eq!( + ChannelConfig { + timeout: Some(Duration::from_secs(3)), + connect_timeout: Some(Duration::from_secs(5)), + concurrency_limit: Some(6), + rate_limit: Some((5, Duration::from_secs(1))), + initial_stream_window_size: Some(10), + initial_connection_window_size: Some(20), + http2_keep_alive_interval: Some(Duration::from_secs(1)), + http2_keep_alive_timeout: Some(Duration::from_secs(3)), + http2_keep_alive_while_idle: Some(true), + http2_adaptive_window: Some(true), + tcp_keepalive: Some(Duration::from_secs(2)), + tcp_nodelay: false, + }, + cfg + ); + } +} diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs index 9c31c3dad2..55489b1468 100644 --- a/src/common/grpc/src/error.rs +++ b/src/common/grpc/src/error.rs @@ -49,6 +49,12 @@ pub enum Error { actual: String, backtrace: Backtrace, }, + + #[snafu(display("Failed to create gRPC channel, source: {}", source))] + CreateChannel { + source: tonic::transport::Error, + backtrace: Backtrace, + }, } impl ErrorExt for Error { @@ -61,9 +67,9 @@ impl ErrorExt for Error { Error::UnsupportedDfPlan { .. } | Error::UnsupportedDfExpr { .. } => { StatusCode::Unsupported } - Error::NewProjection { .. } | Error::DecodePhysicalPlanNode { .. } => { - StatusCode::Internal - } + Error::NewProjection { .. } + | Error::DecodePhysicalPlanNode { .. } + | Error::CreateChannel { .. } => StatusCode::Internal, } } @@ -75,3 +81,129 @@ impl ErrorExt for Error { self } } + +#[cfg(test)] +mod tests { + use snafu::OptionExt; + use snafu::ResultExt; + + use super::*; + + type StdResult = std::result::Result<(), E>; + + fn throw_none_option() -> Option { + None + } + + #[test] + fn test_empty_physical_plan_error() { + let e = throw_none_option() + .context(EmptyPhysicalPlanSnafu { name: "test" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_empty_physical_expr_error() { + let e = throw_none_option() + .context(EmptyPhysicalExprSnafu { name: "test" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_unsupported_df_plan_error() { + let e = throw_none_option() + .context(UnsupportedDfPlanSnafu { name: "test" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Unsupported); + } + + #[test] + fn test_unsupported_df_expr_error() { + let e = throw_none_option() + .context(UnsupportedDfExprSnafu { name: "test" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Unsupported); + } + + #[test] + fn test_missing_field_error() { + let e = throw_none_option() + .context(MissingFieldSnafu { field: "test" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_new_projection_error() { + fn throw_df_error() -> StdResult { + Err(DataFusionError::NotImplemented("".to_string())) + } + + let e = throw_df_error().context(NewProjectionSnafu).err().unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_decode_physical_plan_node_error() { + fn throw_decode_error() -> StdResult { + Err(DecodeError::new("test")) + } + + let e = throw_decode_error() + .context(DecodePhysicalPlanNodeSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_type_mismatch_error() { + let e = throw_none_option() + .context(TypeMismatchSnafu { + column_name: "", + expected: "", + actual: "", + }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_create_channel_error() { + fn throw_tonic_error() -> StdResult { + tonic::transport::Endpoint::new("http//http").map(|_| ()) + } + + let e = throw_tonic_error() + .context(CreateChannelSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } +} diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs index 4ba2ebd375..48abde3139 100644 --- a/src/common/grpc/src/lib.rs +++ b/src/common/grpc/src/lib.rs @@ -1,3 +1,4 @@ +pub mod channel_manager; pub mod error; pub mod physical; pub mod writer; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 88f469b572..d9112adef1 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -42,7 +42,7 @@ store-api = { path = "../store-api" } table = { path = "../table" } table-engine = { path = "../table-engine", features = ["test"] } tokio = { version = "1.18", features = ["full"] } -tokio-stream = { version = "0.1.8", features = ["net"] } +tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" tower = { version = "0.4", features = ["full"] } tower-http = { version = "0.3", features = ["full"] } diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml new file mode 100644 index 0000000000..953bf539f4 --- /dev/null +++ b/src/meta-client/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "meta-client" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +api = { path = "../api" } +common-error = { path = "../common/error" } +common-grpc = { path = "../common/grpc" } +common-telemetry = { path = "../common/telemetry" } +etcd-client = "0.10" +rand = "0.8" +snafu = { version = "0.7", features = ["backtraces"] } +tokio = { version = "1.18", features = ["full"] } +tonic = "0.8" + +[dev-dependencies] +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs new file mode 100644 index 0000000000..1012f6dc03 --- /dev/null +++ b/src/meta-client/examples/meta_client.rs @@ -0,0 +1,89 @@ +use std::time::Duration; + +use api::v1::meta::region::Partition; +use api::v1::meta::CreateRequest; +use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::PutRequest; +use api::v1::meta::RangeRequest; +use api::v1::meta::Region; +use api::v1::meta::RequestHeader; +use api::v1::meta::TableName; +use common_grpc::channel_manager::ChannelConfig; +use common_grpc::channel_manager::ChannelManager; +use meta_client::client::MetaClientBuilder; +use tracing::event; +use tracing::subscriber; +use tracing::Level; +use tracing_subscriber::FmtSubscriber; + +fn main() { + subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap(); + run(); +} + +#[tokio::main] +async fn run() { + let config = ChannelConfig::new() + .timeout(Duration::from_secs(3)) + .connect_timeout(Duration::from_secs(5)) + .tcp_nodelay(true); + let channel_manager = ChannelManager::with_config(config); + let mut meta_client = MetaClientBuilder::new() + .heartbeat_client(true) + .router_client(true) + .store_client(true) + .channel_manager(channel_manager) + .build(); + meta_client.start(&["127.0.0.1:3002"]).await.unwrap(); + // required only when the heartbeat_client is enabled + meta_client.ask_leader().await.unwrap(); + + let header = RequestHeader::new(0, 0); + + let p1 = Partition::new() + .column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()]) + .value_list(vec![b"k1".to_vec(), b"k2".to_vec()]); + + let p2 = Partition::new() + .column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()]) + .value_list(vec![b"Max1".to_vec(), b"Max2".to_vec()]); + + let table_name = TableName::new("test_catlog", "test_schema", "test_table"); + + let create_req = CreateRequest::new(header, table_name) + .add_region(Region::new(0, "test_region1", p1)) + .add_region(Region::new(1, "test_region2", p2)); + + let res = meta_client.create_route(create_req).await.unwrap(); + event!(Level::INFO, "create_route result: {:#?}", res); + + // put + let put_req = PutRequest { + key: b"key1".to_vec(), + value: b"value1".to_vec(), + prev_kv: true, + ..Default::default() + }; + let res = meta_client.put(put_req).await.unwrap(); + event!(Level::INFO, "put result: {:#?}", res); + + // get + let range_req = RangeRequest { + key: b"key1".to_vec(), + ..Default::default() + }; + let res = meta_client.range(range_req.clone()).await.unwrap(); + event!(Level::INFO, "get range result: {:#?}", res); + + // delete + let delete_range_req = DeleteRangeRequest { + key: b"key1".to_vec(), + ..Default::default() + }; + let res = meta_client.delete_range(delete_range_req).await.unwrap(); + event!(Level::INFO, "delete range result: {:#?}", res); + + // get none + let res = meta_client.range(range_req).await; + event!(Level::INFO, "get range result: {:#?}", res); +} diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs new file mode 100644 index 0000000000..f096093b82 --- /dev/null +++ b/src/meta-client/src/client.rs @@ -0,0 +1,332 @@ +mod heartbeat; +mod load_balance; +mod router; +mod store; + +use api::v1::meta::CreateRequest; +use api::v1::meta::CreateResponse; +use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::DeleteRangeResponse; +use api::v1::meta::PutRequest; +use api::v1::meta::PutResponse; +use api::v1::meta::RangeRequest; +use api::v1::meta::RangeResponse; +use api::v1::meta::RouteRequest; +use api::v1::meta::RouteResponse; +use common_grpc::channel_manager::ChannelConfig; +use common_grpc::channel_manager::ChannelManager; +use common_telemetry::info; +use heartbeat::Client as HeartbeatClient; +use router::Client as RouterClient; +use snafu::OptionExt; +use store::Client as StoreClient; + +use crate::error; +use crate::error::Result; + +#[derive(Clone, Debug, Default)] +pub struct MetaClientBuilder { + heartbeat_client: bool, + router_client: bool, + store_client: bool, + channel_manager: Option, +} + +impl MetaClientBuilder { + pub fn new() -> Self { + MetaClientBuilder::default() + } + + pub fn heartbeat_client(self, enabled: bool) -> Self { + Self { + heartbeat_client: enabled, + ..self + } + } + + pub fn router_client(self, enabled: bool) -> Self { + Self { + router_client: enabled, + ..self + } + } + + pub fn store_client(self, enabled: bool) -> Self { + Self { + store_client: enabled, + ..self + } + } + + pub fn channel_manager(self, channel_manager: ChannelManager) -> Self { + Self { + channel_manager: Some(channel_manager), + ..self + } + } + + pub fn build(self) -> MetaClient { + let mut meta_client = if let Some(mgr) = self.channel_manager { + MetaClient { + channel_manager: mgr, + ..Default::default() + } + } else { + Default::default() + }; + + if let (false, false, false) = + (self.heartbeat_client, self.router_client, self.store_client) + { + panic!("At least one client needs to be enabled.") + } + + let mgr = meta_client.channel_manager.clone(); + + if self.heartbeat_client { + meta_client.heartbeat_client = Some(HeartbeatClient::new(mgr.clone())); + } + if self.router_client { + meta_client.router_client = Some(RouterClient::new(mgr.clone())); + } + if self.store_client { + meta_client.store_client = Some(StoreClient::new(mgr)); + } + + meta_client + } +} + +#[derive(Clone, Debug, Default)] +pub struct MetaClient { + channel_manager: ChannelManager, + heartbeat_client: Option, + router_client: Option, + store_client: Option, +} + +impl MetaClient { + pub async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]> + Clone, + { + info!("MetaClient channel config: {:?}", self.channel_config()); + + if let Some(heartbeat_client) = &mut self.heartbeat_client { + heartbeat_client.start(urls.clone()).await?; + info!("Heartbeat client started"); + } + if let Some(router_client) = &mut self.router_client { + router_client.start(urls.clone()).await?; + info!("Router client started"); + } + if let Some(store_client) = &mut self.store_client { + store_client.start(urls).await?; + info!("Store client started"); + } + + Ok(()) + } + + pub async fn ask_leader(&self) -> Result<()> { + self.heartbeat_client() + .context(error::NotStartedSnafu { + name: "heartbeat_client", + })? + .ask_leader() + .await + } + + pub async fn refresh_members(&mut self) { + todo!() + } + + pub async fn create_route(&self, req: CreateRequest) -> Result { + self.router_client() + .context(error::NotStartedSnafu { + name: "route_client", + })? + .create(req) + .await + } + + /// Fetch routing information for tables. The smallest unit is the complete + /// routing information(all regions) of a table. + /// + /// ```text + /// table_1 + /// table_name + /// table_schema + /// regions + /// region_1 + /// mutate_endpoint + /// select_endpoint_1, select_endpoint_2 + /// region_2 + /// mutate_endpoint + /// select_endpoint_1, select_endpoint_2, select_endpoint_3 + /// region_xxx + /// table_2 + /// ... + /// ``` + /// + pub async fn route(&self, req: RouteRequest) -> Result { + self.router_client() + .context(error::NotStartedSnafu { + name: "route_client", + })? + .route(req) + .await + } + + /// Range gets the keys in the range from the key-value store. + pub async fn range(&self, req: RangeRequest) -> Result { + self.store_client() + .context(error::NotStartedSnafu { + name: "store_client", + })? + .range(req) + .await + } + + /// Put puts the given key into the key-value store. + pub async fn put(&self, req: PutRequest) -> Result { + self.store_client() + .context(error::NotStartedSnafu { + name: "store_client", + })? + .put(req) + .await + } + + /// DeleteRange deletes the given range from the key-value store. + pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + self.store_client() + .context(error::NotStartedSnafu { + name: "store_client", + })? + .delete_range(req) + .await + } + + #[inline] + pub fn heartbeat_client(&self) -> Option { + self.heartbeat_client.clone() + } + + #[inline] + pub fn router_client(&self) -> Option { + self.router_client.clone() + } + + #[inline] + pub fn store_client(&self) -> Option { + self.store_client.clone() + } + + pub fn channel_config(&self) -> Option { + self.channel_manager.config() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_meta_client_builder() { + let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; + + let mut meta_client = MetaClientBuilder::new().heartbeat_client(true).build(); + assert!(meta_client.heartbeat_client().is_some()); + assert!(meta_client.router_client().is_none()); + assert!(meta_client.store_client().is_none()); + meta_client.start(urls).await.unwrap(); + assert!(meta_client.heartbeat_client().unwrap().is_started().await); + + let mut meta_client = MetaClientBuilder::new().router_client(true).build(); + assert!(meta_client.heartbeat_client().is_none()); + assert!(meta_client.router_client().is_some()); + assert!(meta_client.store_client().is_none()); + meta_client.start(urls).await.unwrap(); + assert!(meta_client.router_client().unwrap().is_started().await); + + let mut meta_client = MetaClientBuilder::new().store_client(true).build(); + assert!(meta_client.heartbeat_client().is_none()); + assert!(meta_client.router_client().is_none()); + assert!(meta_client.store_client().is_some()); + meta_client.start(urls).await.unwrap(); + assert!(meta_client.store_client().unwrap().is_started().await); + + let mut meta_client = MetaClientBuilder::new() + .heartbeat_client(true) + .router_client(true) + .store_client(true) + .build(); + assert!(meta_client.heartbeat_client().is_some()); + assert!(meta_client.router_client().is_some()); + assert!(meta_client.store_client().is_some()); + meta_client.start(urls).await.unwrap(); + assert!(meta_client.heartbeat_client().unwrap().is_started().await); + assert!(meta_client.router_client().unwrap().is_started().await); + assert!(meta_client.store_client().unwrap().is_started().await); + } + + #[tokio::test] + async fn test_not_start_heartbeat_client() { + let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; + + let mut meta_client = MetaClientBuilder::new() + .router_client(true) + .store_client(true) + .build(); + + meta_client.start(urls).await.unwrap(); + + let res = meta_client.ask_leader().await; + + assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); + } + + #[tokio::test] + async fn test_not_start_router_client() { + let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; + + let mut meta_client = MetaClientBuilder::new() + .heartbeat_client(true) + .store_client(true) + .build(); + + meta_client.start(urls).await.unwrap(); + + let res = meta_client.create_route(CreateRequest::default()).await; + + assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); + } + + #[tokio::test] + async fn test_not_start_store_client() { + let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; + + let mut meta_client = MetaClientBuilder::new() + .heartbeat_client(true) + .router_client(true) + .build(); + + meta_client.start(urls).await.unwrap(); + + let res = meta_client.put(PutRequest::default()).await; + + assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); + } + + #[should_panic] + #[test] + fn test_enable_at_least_one_client() { + let _ = MetaClientBuilder::new() + .heartbeat_client(false) + .router_client(false) + .store_client(false) + .build(); + } +} diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs new file mode 100644 index 0000000000..a6e3188f26 --- /dev/null +++ b/src/meta-client/src/client/heartbeat.rs @@ -0,0 +1,193 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use api::v1::meta::heartbeat_client::HeartbeatClient; +use api::v1::meta::AskLeaderRequest; +use api::v1::meta::RequestHeader; +use common_grpc::channel_manager::ChannelManager; +use common_telemetry::debug; +use snafu::ensure; +use snafu::OptionExt; +use snafu::ResultExt; +use tokio::sync::RwLock; +use tonic::transport::Channel; + +use crate::error; +use crate::error::Result; + +#[derive(Clone, Debug)] +pub struct Client { + inner: Arc>, +} + +impl Client { + pub fn new(channel_manager: ChannelManager) -> Self { + let inner = Inner { + channel_manager, + peers: HashSet::default(), + leader: None, + }; + + Self { + inner: Arc::new(RwLock::new(inner)), + } + } + + pub async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + let mut inner = self.inner.write().await; + inner.start(urls).await + } + + pub async fn ask_leader(&mut self) -> Result<()> { + let mut inner = self.inner.write().await; + inner.ask_leader().await + } + + pub async fn is_started(&self) -> bool { + let inner = self.inner.read().await; + inner.is_started() + } + + // TODO(jiachun) send heartbeat +} + +#[derive(Debug)] +struct Inner { + channel_manager: ChannelManager, + peers: HashSet, + leader: Option, +} + +impl Inner { + async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + ensure!( + !self.is_started(), + error::IllegalGrpcClientStateSnafu { + err_msg: "Heartbeat client already started" + } + ); + + self.peers = urls + .as_ref() + .iter() + .map(|url| url.as_ref().to_string()) + .collect(); + + Ok(()) + } + + async fn ask_leader(&mut self) -> Result<()> { + ensure!( + self.is_started(), + error::IllegalGrpcClientStateSnafu { + err_msg: "Heartbeat client not start" + } + ); + + // TODO(jiachun): set cluster_id and member_id + let header = RequestHeader::new(0, 0); + let mut leader = None; + for addr in &self.peers { + let req = AskLeaderRequest::new(header.clone()); + let mut client = self.make_client(addr)?; + match client.ask_leader(req).await { + Ok(res) => { + if let Some(endpoint) = res.into_inner().leader { + leader = Some(endpoint.addr); + break; + } + } + Err(status) => { + debug!("Failed to ask leader from: {}, {}", addr, status); + } + } + } + self.leader = Some(leader.context(error::AskLeaderSnafu)?); + Ok(()) + } + + fn make_client(&self, addr: impl AsRef) -> Result> { + let channel = self + .channel_manager + .get(addr) + .context(error::CreateChannelSnafu)?; + + Ok(HeartbeatClient::new(channel)) + } + + #[inline] + fn is_started(&self) -> bool { + !self.peers.is_empty() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_start_client() { + let mut client = Client::new(ChannelManager::default()); + + assert!(!client.is_started().await); + + client + .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) + .await + .unwrap(); + + assert!(client.is_started().await); + } + + #[tokio::test] + async fn test_already_start() { + let mut client = Client::new(ChannelManager::default()); + client + .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) + .await + .unwrap(); + + assert!(client.is_started().await); + + let res = client.start(&["127.0.0.1:1002"]).await; + + assert!(res.is_err()); + + assert!(matches!( + res.err(), + Some(error::Error::IllegalGrpcClientState { .. }) + )); + } + + #[tokio::test] + async fn test_start_with_duplicate_peers() { + let mut client = Client::new(ChannelManager::default()); + client + .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) + .await + .unwrap(); + + assert_eq!(1, client.inner.write().await.peers.len()); + } + + #[tokio::test] + async fn test_ask_leader_unavailable() { + let mut client = Client::new(ChannelManager::default()); + client.start(&["unavailable_peer"]).await.unwrap(); + + let res = client.ask_leader().await; + + assert!(res.is_err()); + + let err = res.err().unwrap(); + assert!(matches!(err, error::Error::AskLeader { .. })); + } +} diff --git a/src/meta-client/src/client/load_balance.rs b/src/meta-client/src/client/load_balance.rs new file mode 100644 index 0000000000..3cf55d18ed --- /dev/null +++ b/src/meta-client/src/client/load_balance.rs @@ -0,0 +1,34 @@ +use rand::Rng; + +pub fn random_get(len: usize, func: F) -> Option +where + F: FnOnce(usize) -> Option, +{ + if len == 0 { + return None; + } + + let mut rng = rand::thread_rng(); + let i = rng.gen_range(0..len); + + func(i) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_random_get() { + for i in 1..100 { + let res = random_get(i, |index| Some(2 * index)); + assert!(res.unwrap() < 2 * i); + } + } + + #[test] + fn test_random_get_none() { + let res = random_get(0, |index| Some(2 * index)); + assert!(res.is_none()); + } +} diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs new file mode 100644 index 0000000000..b048cecfd4 --- /dev/null +++ b/src/meta-client/src/client/router.rs @@ -0,0 +1,218 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use api::v1::meta::router_client::RouterClient; +use api::v1::meta::CreateRequest; +use api::v1::meta::CreateResponse; +use api::v1::meta::RouteRequest; +use api::v1::meta::RouteResponse; +use common_grpc::channel_manager::ChannelManager; +use snafu::ensure; +use snafu::OptionExt; +use snafu::ResultExt; +use tokio::sync::RwLock; +use tonic::transport::Channel; + +use crate::client::load_balance as lb; +use crate::error; +use crate::error::Result; + +#[derive(Clone, Debug)] +pub struct Client { + inner: Arc>, +} + +impl Client { + pub fn new(channel_manager: ChannelManager) -> Self { + let inner = Inner { + channel_manager, + peers: vec![], + }; + + Self { + inner: Arc::new(RwLock::new(inner)), + } + } + + pub async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + let mut inner = self.inner.write().await; + inner.start(urls).await + } + + pub async fn is_started(&self) -> bool { + let inner = self.inner.read().await; + inner.is_started() + } + + pub async fn create(&self, req: CreateRequest) -> Result { + let inner = self.inner.read().await; + inner.create(req).await + } + + pub async fn route(&self, req: RouteRequest) -> Result { + let inner = self.inner.read().await; + inner.route(req).await + } +} + +#[derive(Debug)] +struct Inner { + channel_manager: ChannelManager, + peers: Vec, +} + +impl Inner { + async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + ensure!( + !self.is_started(), + error::IllegalGrpcClientStateSnafu { + err_msg: "Router client already started", + } + ); + + self.peers = urls + .as_ref() + .iter() + .map(|url| url.as_ref().to_string()) + .collect::>() + .drain() + .collect::>(); + + Ok(()) + } + + async fn route(&self, req: RouteRequest) -> Result { + let mut client = self.random_client()?; + + let res = client.route(req).await.context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + + async fn create(&self, req: CreateRequest) -> Result { + let mut client = self.random_client()?; + + let res = client.create(req).await.context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + + fn random_client(&self) -> Result> { + let len = self.peers.len(); + let peer = lb::random_get(len, |i| Some(&self.peers[i])).context( + error::IllegalGrpcClientStateSnafu { + err_msg: "Empty peers, router client may not start yet", + }, + )?; + + self.make_client(peer) + } + + fn make_client(&self, addr: impl AsRef) -> Result> { + let channel = self + .channel_manager + .get(addr) + .context(error::CreateChannelSnafu)?; + + Ok(RouterClient::new(channel)) + } + + #[inline] + fn is_started(&self) -> bool { + !self.peers.is_empty() + } +} + +#[cfg(test)] +mod test { + use api::v1::meta::{RequestHeader, TableName}; + + use super::*; + + #[tokio::test] + async fn test_start_client() { + let mut client = Client::new(ChannelManager::default()); + + assert!(!client.is_started().await); + + client + .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) + .await + .unwrap(); + + assert!(client.is_started().await); + } + + #[tokio::test] + async fn test_already_start() { + let mut client = Client::new(ChannelManager::default()); + client + .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) + .await + .unwrap(); + + assert!(client.is_started().await); + + let res = client.start(&["127.0.0.1:1002"]).await; + + assert!(res.is_err()); + + assert!(matches!( + res.err(), + Some(error::Error::IllegalGrpcClientState { .. }) + )); + } + + #[tokio::test] + async fn test_start_with_duplicate_peers() { + let mut client = Client::new(ChannelManager::default()); + client + .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) + .await + .unwrap(); + + assert_eq!(1, client.inner.write().await.peers.len()); + } + + #[tokio::test] + async fn test_create_unavailable() { + let mut client = Client::new(ChannelManager::default()); + client.start(&["unavailable_peer"]).await.unwrap(); + + let header = RequestHeader::new(0, 0); + let req = CreateRequest::new(header, TableName::default()); + let res = client.create(req).await; + + assert!(res.is_err()); + + let err = res.err().unwrap(); + assert!( + matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) + ); + } + + #[tokio::test] + async fn test_route_unavailable() { + let mut client = Client::new(ChannelManager::default()); + client.start(&["unavailable_peer"]).await.unwrap(); + + let header = RequestHeader::new(0, 0); + let req = RouteRequest::new(header); + let res = client.route(req).await; + + assert!(res.is_err()); + + let err = res.err().unwrap(); + assert!( + matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) + ); + } +} diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs new file mode 100644 index 0000000000..f6a587a229 --- /dev/null +++ b/src/meta-client/src/client/store.rs @@ -0,0 +1,259 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use api::v1::meta::store_client::StoreClient; +use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::DeleteRangeResponse; +use api::v1::meta::PutRequest; +use api::v1::meta::PutResponse; +use api::v1::meta::RangeRequest; +use api::v1::meta::RangeResponse; +use common_grpc::channel_manager::ChannelManager; +use snafu::ensure; +use snafu::OptionExt; +use snafu::ResultExt; +use tokio::sync::RwLock; +use tonic::transport::Channel; + +use crate::client::load_balance as lb; +use crate::error; +use crate::error::Result; + +#[derive(Clone, Debug)] +pub struct Client { + inner: Arc>, +} + +impl Client { + pub fn new(channel_manager: ChannelManager) -> Self { + let inner = Inner { + channel_manager, + peers: vec![], + }; + + Self { + inner: Arc::new(RwLock::new(inner)), + } + } + + pub async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + let mut inner = self.inner.write().await; + inner.start(urls).await + } + + pub async fn is_started(&self) -> bool { + let inner = self.inner.read().await; + inner.is_started() + } + + pub async fn range(&self, req: RangeRequest) -> Result { + let inner = self.inner.read().await; + inner.range(req).await + } + + pub async fn put(&self, req: PutRequest) -> Result { + let inner = self.inner.read().await; + inner.put(req).await + } + + pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + let inner = self.inner.read().await; + inner.delete_range(req).await + } +} + +#[derive(Debug)] +struct Inner { + channel_manager: ChannelManager, + peers: Vec, +} + +impl Inner { + async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + ensure!( + !self.is_started(), + error::IllegalGrpcClientStateSnafu { + err_msg: "Store client already started", + } + ); + + self.peers = urls + .as_ref() + .iter() + .map(|url| url.as_ref().to_string()) + .collect::>() + .drain() + .collect::>(); + + Ok(()) + } + + async fn range(&self, req: RangeRequest) -> Result { + let mut client = self.random_client()?; + + let res = client.range(req).await.context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + + async fn put(&self, req: PutRequest) -> Result { + let mut client = self.random_client()?; + + let res = client.put(req).await.context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + let mut client = self.random_client()?; + + let res = client + .delete_range(req) + .await + .context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + + fn random_client(&self) -> Result> { + let len = self.peers.len(); + let peer = lb::random_get(len, |i| Some(&self.peers[i])).context( + error::IllegalGrpcClientStateSnafu { + err_msg: "Empty peers, store client may not start yet", + }, + )?; + + self.make_client(peer) + } + + fn make_client(&self, addr: impl AsRef) -> Result> { + let channel = self + .channel_manager + .get(addr) + .context(error::CreateChannelSnafu)?; + + Ok(StoreClient::new(channel)) + } + + #[inline] + fn is_started(&self) -> bool { + !self.peers.is_empty() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_start_client() { + let mut client = Client::new(ChannelManager::default()); + + assert!(!client.is_started().await); + + client + .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) + .await + .unwrap(); + + assert!(client.is_started().await); + } + + #[tokio::test] + async fn test_already_start() { + let mut client = Client::new(ChannelManager::default()); + client + .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) + .await + .unwrap(); + + assert!(client.is_started().await); + + let res = client.start(&["127.0.0.1:1002"]).await; + + assert!(res.is_err()); + + assert!(matches!( + res.err(), + Some(error::Error::IllegalGrpcClientState { .. }) + )); + } + + #[tokio::test] + async fn test_start_with_duplicate_peers() { + let mut client = Client::new(ChannelManager::default()); + client + .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) + .await + .unwrap(); + + assert_eq!(1, client.inner.write().await.peers.len()); + } + + #[tokio::test] + async fn test_range_unavailable() { + let mut client = Client::new(ChannelManager::default()); + client.start(&["unknow_peer"]).await.unwrap(); + + let req = RangeRequest { + key: b"key1".to_vec(), + ..Default::default() + }; + let res = client.range(req).await; + + assert!(res.is_err()); + + let err = res.err().unwrap(); + assert!( + matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) + ); + } + + #[tokio::test] + async fn test_put_unavailable() { + let mut client = Client::new(ChannelManager::default()); + client.start(&["unavailable_peer"]).await.unwrap(); + + let req = PutRequest { + key: b"key1".to_vec(), + value: b"value1".to_vec(), + prev_kv: true, + ..Default::default() + }; + let res = client.put(req).await; + + assert!(res.is_err()); + + let err = res.err().unwrap(); + assert!( + matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) + ); + } + + #[tokio::test] + async fn test_delete_range_unavailable() { + let mut client = Client::new(ChannelManager::default()); + client.start(&["unavailable_peer"]).await.unwrap(); + + let req = DeleteRangeRequest { + key: b"key1".to_vec(), + ..Default::default() + }; + let res = client.delete_range(req).await; + + assert!(res.is_err()); + + let err = res.err().unwrap(); + assert!( + matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) + ); + } +} diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs new file mode 100644 index 0000000000..0aa215ffb6 --- /dev/null +++ b/src/meta-client/src/error.rs @@ -0,0 +1,137 @@ +use common_error::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Failed to connect to {}, source: {}", url, source))] + ConnectFailed { + url: String, + source: tonic::transport::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Illegal GRPC client state: {}", err_msg))] + IllegalGrpcClientState { + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display("Tonic internal error, source: {}", source))] + TonicStatus { + source: tonic::Status, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to ask leader from all endpoints"))] + AskLeader { backtrace: Backtrace }, + + #[snafu(display("Failed to create gRPC channel, source: {}", source))] + CreateChannel { + #[snafu(backtrace)] + source: common_grpc::error::Error, + }, + + #[snafu(display("{} not started", name))] + NotStarted { name: String, backtrace: Backtrace }, +} + +#[allow(dead_code)] +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn status_code(&self) -> StatusCode { + match self { + Error::ConnectFailed { .. } + | Error::IllegalGrpcClientState { .. } + | Error::TonicStatus { .. } + | Error::AskLeader { .. } + | Error::NotStarted { .. } + | Error::CreateChannel { .. } => StatusCode::Internal, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + type StdResult = std::result::Result<(), E>; + + fn throw_none_option() -> Option { + None + } + + #[test] + fn test_connect_failed_error() { + fn throw_tonic_error() -> StdResult { + tonic::transport::Endpoint::new("http//http").map(|_| ()) + } + + let e = throw_tonic_error() + .context(ConnectFailedSnafu { url: "" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_illegal_grpc_client_state_error() { + let e = throw_none_option() + .context(IllegalGrpcClientStateSnafu { err_msg: "" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_tonic_status_error() { + fn throw_tonic_status_error() -> StdResult { + Err(tonic::Status::new(tonic::Code::Aborted, "")) + } + + let e = throw_tonic_status_error() + .context(TonicStatusSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_ask_leader_error() { + let e = throw_none_option().context(AskLeaderSnafu).err().unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_create_channel_error() { + fn throw_common_grpc_error() -> StdResult { + tonic::transport::Endpoint::new("http//http") + .map(|_| ()) + .context(common_grpc::error::CreateChannelSnafu) + } + + let e = throw_common_grpc_error() + .context(CreateChannelSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } +} diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs new file mode 100644 index 0000000000..610c471ca3 --- /dev/null +++ b/src/meta-client/src/lib.rs @@ -0,0 +1,2 @@ +pub mod client; +mod error; diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml new file mode 100644 index 0000000000..d08cfc4aed --- /dev/null +++ b/src/meta-srv/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "meta-srv" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +api = { path = "../api" } +async-trait = "0.1" +common-base = { path = "../common/base" } +common-error = { path = "../common/error" } +common-telemetry = { path = "../common/telemetry" } +etcd-client = "0.10" +futures = "0.3" +http-body = "0.4" +snafu = { version = "0.7", features = ["backtraces"] } +serde = "1.0" +tokio = { version = "1.0", features = ["full"] } +tokio-stream = { version = "0.1", features = ["net"] } +tonic = "0.8" +url = "2.3" + +[dev-dependencies] +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/meta-srv/examples/kv_store.rs b/src/meta-srv/examples/kv_store.rs new file mode 100644 index 0000000000..707c1df152 --- /dev/null +++ b/src/meta-srv/examples/kv_store.rs @@ -0,0 +1,48 @@ +use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::PutRequest; +use api::v1::meta::RangeRequest; +use meta_srv::service::store::etcd::EtcdStore; +use tracing::event; +use tracing::subscriber; +use tracing::Level; +use tracing_subscriber::FmtSubscriber; + +fn main() { + subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap(); + run(); +} + +#[tokio::main] +async fn run() { + let kv_store = EtcdStore::with_endpoints(["127.0.0.1:2380"]).await.unwrap(); + + // put + let put_req = PutRequest { + key: b"key1".to_vec(), + value: b"value1".to_vec(), + prev_kv: true, + ..Default::default() + }; + let res = kv_store.put(put_req).await; + event!(Level::INFO, "put result: {:#?}", res); + + // get + let range_req = RangeRequest { + key: b"key1".to_vec(), + ..Default::default() + }; + let res = kv_store.range(range_req.clone()).await; + event!(Level::INFO, "get range result: {:#?}", res); + + // delete + let delete_range_req = DeleteRangeRequest { + key: b"key1".to_vec(), + ..Default::default() + }; + let res = kv_store.delete_range(delete_range_req).await; + event!(Level::INFO, "delete range result: {:#?}", res); + + // get none + let res = kv_store.range(range_req).await; + event!(Level::INFO, "get range result: {:#?}", res); +} diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs new file mode 100644 index 0000000000..7eae327227 --- /dev/null +++ b/src/meta-srv/src/bootstrap.rs @@ -0,0 +1,37 @@ +use api::v1::meta::heartbeat_server::HeartbeatServer; +use api::v1::meta::router_server::RouterServer; +use api::v1::meta::store_server::StoreServer; +use snafu::ResultExt; +use tokio::net::TcpListener; +use tokio_stream::wrappers::TcpListenerStream; + +use crate::error; +use crate::metasrv::MetaSrv; +use crate::metasrv::MetaSrvOptions; +use crate::service::admin; +use crate::service::store::etcd::EtcdStore; + +// Bootstrap the rpc server to serve incoming request +pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> { + let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?; + let meta_srv = MetaSrv::new(kv_store); + + let listener = TcpListener::bind(&opts.server_addr) + .await + .context(error::TcpBindSnafu { + addr: &opts.server_addr, + })?; + let listener = TcpListenerStream::new(listener); + + tonic::transport::Server::builder() + .accept_http1(true) // for admin services + .add_service(HeartbeatServer::new(meta_srv.clone())) + .add_service(RouterServer::new(meta_srv.clone())) + .add_service(StoreServer::new(meta_srv.clone())) + .add_service(admin::make_admin_service(meta_srv.clone())) + .serve_with_incoming(listener) + .await + .context(error::StartGrpcSnafu)?; + + Ok(()) +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs new file mode 100644 index 0000000000..e10bf905c1 --- /dev/null +++ b/src/meta-srv/src/error.rs @@ -0,0 +1,145 @@ +use common_error::prelude::*; +use tonic::{Code, Status}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Error stream request next is None"))] + StreamNone { backtrace: Backtrace }, + + #[snafu(display("Empty key is not allowed"))] + EmptyKey { backtrace: Backtrace }, + + #[snafu(display("Failed to execute via Etcd, source: {}", source))] + EtcdFailed { + source: etcd_client::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to connect to Etcd, source: {}", source))] + ConnectEtcd { + source: etcd_client::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to bind address {}, source: {}", addr, source))] + TcpBind { + addr: String, + source: std::io::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to start gRPC server, source: {}", source))] + StartGrpc { + source: tonic::transport::Error, + backtrace: Backtrace, + }, +} + +pub type Result = std::result::Result; + +impl From for Status { + fn from(err: Error) -> Self { + Status::new(Code::Internal, err.to_string()) + } +} + +impl ErrorExt for Error { + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn status_code(&self) -> StatusCode { + match self { + Error::StreamNone { .. } + | Error::EtcdFailed { .. } + | Error::ConnectEtcd { .. } + | Error::TcpBind { .. } + | Error::StartGrpc { .. } => StatusCode::Internal, + Error::EmptyKey { .. } => StatusCode::InvalidArguments, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + type StdResult = std::result::Result<(), E>; + + fn throw_none_option() -> Option { + None + } + + fn throw_etcd_client_error() -> StdResult { + Err(etcd_client::Error::InvalidArgs("".to_string())) + } + + #[test] + fn test_stream_node_error() { + let e = throw_none_option().context(StreamNoneSnafu).err().unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_empty_key_error() { + let e = throw_none_option().context(EmptyKeySnafu).err().unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_etcd_failed_error() { + let e = throw_etcd_client_error() + .context(EtcdFailedSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_connect_etcd_error() { + let e = throw_etcd_client_error() + .context(ConnectEtcdSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_tcp_bind_error() { + fn throw_std_error() -> StdResult { + Err(std::io::ErrorKind::NotFound.into()) + } + let e = throw_std_error() + .context(TcpBindSnafu { addr: "127.0.0.1" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_start_grpc_error() { + fn throw_tonic_error() -> StdResult { + tonic::transport::Endpoint::new("http//http").map(|_| ()) + } + + let e = throw_tonic_error().context(StartGrpcSnafu).err().unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } +} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs new file mode 100644 index 0000000000..e42b7ced31 --- /dev/null +++ b/src/meta-srv/src/lib.rs @@ -0,0 +1,6 @@ +pub mod bootstrap; +pub mod error; +pub mod metasrv; +pub mod service; + +pub use crate::error::Result; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs new file mode 100644 index 0000000000..f3e07cb5e5 --- /dev/null +++ b/src/meta-srv/src/metasrv.rs @@ -0,0 +1,34 @@ +use serde::Deserialize; +use serde::Serialize; + +use crate::service::store::kv::KvStoreRef; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct MetaSrvOptions { + pub server_addr: String, + pub store_addr: String, +} + +impl Default for MetaSrvOptions { + fn default() -> Self { + Self { + server_addr: "0.0.0.0:3002".to_string(), + store_addr: "0.0.0.0:2380".to_string(), + } + } +} + +#[derive(Clone)] +pub struct MetaSrv { + kv_store: KvStoreRef, +} + +impl MetaSrv { + pub fn new(kv_store: KvStoreRef) -> Self { + Self { kv_store } + } + + pub fn kv_store(&self) -> KvStoreRef { + self.kv_store.clone() + } +} diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs new file mode 100644 index 0000000000..0e8fabd8da --- /dev/null +++ b/src/meta-srv/src/service.rs @@ -0,0 +1,13 @@ +use std::pin::Pin; + +use futures::Stream; +use tonic::Response; +use tonic::Status; + +pub mod admin; +mod heartbeat; +pub mod router; +pub mod store; + +pub type GrpcResult = std::result::Result, Status>; +pub type GrpcStream = Pin> + Send + Sync + 'static>>; diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs new file mode 100644 index 0000000000..510bcc8fdc --- /dev/null +++ b/src/meta-srv/src/service/admin.rs @@ -0,0 +1,248 @@ +mod health; + +use std::collections::HashMap; +use std::convert::Infallible; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +use tonic::body::BoxBody; +use tonic::codegen::empty_body; +use tonic::codegen::http; +use tonic::codegen::BoxFuture; +use tonic::codegen::Service; +use tonic::transport::NamedService; + +use crate::metasrv::MetaSrv; + +pub fn make_admin_service(_: MetaSrv) -> Admin { + let router = Router::new().route("/health", health::HealthHandler); + + let router = Router::nest("/admin", router); + + Admin::new(router) +} + +#[async_trait::async_trait] +pub trait HttpHandler: Send + Sync { + async fn handle( + &self, + path: &str, + params: &HashMap, + ) -> crate::Result>; +} + +#[derive(Clone)] +pub struct Admin +where + Self: Send, +{ + router: Arc, +} + +impl Admin { + pub fn new(router: Router) -> Self { + Self { + router: Arc::new(router), + } + } +} + +impl NamedService for Admin { + const NAME: &'static str = "admin"; +} + +impl Service> for Admin +where + T: Send, +{ + type Response = http::Response; + type Error = Infallible; + type Future = BoxFuture; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let router = self.router.clone(); + let query_params = req + .uri() + .query() + .map(|q| { + url::form_urlencoded::parse(q.as_bytes()) + .into_owned() + .collect() + }) + .unwrap_or_else(HashMap::new); + let path = req.uri().path().to_owned(); + Box::pin(async move { router.call(&path, query_params).await }) + } +} + +#[derive(Default)] +pub struct Router { + handlers: HashMap>, +} + +impl Router { + pub fn new() -> Self { + Self { + handlers: HashMap::default(), + } + } + + pub fn nest(path: &str, router: Router) -> Self { + check_path(path); + + let handlers = router + .handlers + .into_iter() + .map(|(url, handler)| (format!("{path}{url}"), handler)) + .collect(); + + Self { handlers } + } + + pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self { + check_path(path); + + self.handlers.insert(path.to_owned(), Box::new(handler)); + + self + } + + pub async fn call( + &self, + path: &str, + params: HashMap, + ) -> Result, Infallible> { + let handler = match self.handlers.get(path) { + Some(handler) => handler, + None => { + return Ok(http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body(empty_body()) + .unwrap()) + } + }; + + let res = match handler.handle(path, ¶ms).await { + Ok(res) => res.map(boxed), + Err(e) => http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(boxed(e.to_string())) + .unwrap(), + }; + + Ok(res) + } +} + +fn check_path(path: &str) { + if path.is_empty() || !path.starts_with('/') { + panic!("paths must start with a `/`") + } +} + +fn boxed(body: String) -> BoxBody { + use http_body::Body; + + body.map_err(|_| panic!("")).boxed_unsync() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error; + + struct MockOkHandler; + + #[async_trait::async_trait] + impl HttpHandler for MockOkHandler { + async fn handle( + &self, + _: &str, + _: &HashMap, + ) -> crate::Result> { + Ok(http::Response::builder() + .status(http::StatusCode::OK) + .body("Ok".to_string()) + .unwrap()) + } + } + struct MockEmptyKeyErrorHandler; + + #[async_trait::async_trait] + impl HttpHandler for MockEmptyKeyErrorHandler { + async fn handle( + &self, + _: &str, + _: &HashMap, + ) -> crate::Result> { + error::EmptyKeySnafu {}.fail() + } + } + + #[test] + fn test_route_nest() { + let mock_handler = MockOkHandler {}; + let router = Router::new().route("/test_node", mock_handler); + let router = Router::nest("/test_root", router); + + assert_eq!(1, router.handlers.len()); + assert!(router.handlers.contains_key("/test_root/test_node")); + } + + #[should_panic] + #[test] + fn test_invalid_path() { + check_path("test_node") + } + + #[should_panic] + #[test] + fn test_empty_path() { + check_path("") + } + + #[tokio::test] + async fn test_route_call_ok() { + let mock_handler = MockOkHandler {}; + let router = Router::new().route("/test_node", mock_handler); + let router = Router::nest("/test_root", router); + + let res = router + .call("/test_root/test_node", HashMap::default()) + .await + .unwrap(); + + assert!(res.status().is_success()); + } + + #[tokio::test] + async fn test_route_call_no_handler() { + let router = Router::new(); + + let res = router + .call("/test_root/test_node", HashMap::default()) + .await + .unwrap(); + + assert_eq!(http::StatusCode::NOT_FOUND, res.status()); + } + + #[tokio::test] + async fn test_route_call_err() { + let mock_handler = MockEmptyKeyErrorHandler {}; + let router = Router::new().route("/test_node", mock_handler); + let router = Router::nest("/test_root", router); + + let res = router + .call("/test_root/test_node", HashMap::default()) + .await + .unwrap(); + + assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status()); + } +} diff --git a/src/meta-srv/src/service/admin/health.rs b/src/meta-srv/src/service/admin/health.rs new file mode 100644 index 0000000000..f24c52f656 --- /dev/null +++ b/src/meta-srv/src/service/admin/health.rs @@ -0,0 +1,36 @@ +use std::collections::HashMap; + +use tonic::codegen::http; + +use super::HttpHandler; +use crate::error::Result; + +const HTTP_OK: &str = "OK\n"; + +pub struct HealthHandler; + +#[async_trait::async_trait] +impl HttpHandler for HealthHandler { + async fn handle(&self, _: &str, _: &HashMap) -> Result> { + Ok(http::Response::builder() + .status(http::StatusCode::OK) + .body(HTTP_OK.to_owned()) + .unwrap()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_health_handle() { + let health_handler = HealthHandler {}; + let path = "any"; + let params = HashMap::default(); + let res = health_handler.handle(path, ¶ms).await.unwrap(); + + assert!(res.status().is_success()); + assert_eq!(HTTP_OK.to_owned(), res.body().to_owned()); + } +} diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs new file mode 100644 index 0000000000..6f2a9b1614 --- /dev/null +++ b/src/meta-srv/src/service/heartbeat.rs @@ -0,0 +1,159 @@ +use api::v1::meta::heartbeat_server; +use api::v1::meta::AskLeaderRequest; +use api::v1::meta::AskLeaderResponse; +use api::v1::meta::Endpoint; +use api::v1::meta::HeartbeatRequest; +use api::v1::meta::HeartbeatResponse; +use api::v1::meta::ResponseHeader; +use api::v1::meta::PROTOCOL_VERSION; +use futures::StreamExt; +use futures::TryFutureExt; +use snafu::OptionExt; +use tonic::Request; +use tonic::Response; +use tonic::Streaming; + +use super::store::kv::KvStoreRef; +use super::GrpcResult; +use super::GrpcStream; +use crate::error; +use crate::error::Result; +use crate::metasrv::MetaSrv; + +#[async_trait::async_trait] +impl heartbeat_server::Heartbeat for MetaSrv { + type HeartbeatStream = GrpcStream; + + async fn heartbeat( + &self, + req: Request>, + ) -> GrpcResult { + let msg = req + .into_inner() + .next() + .await + .context(error::StreamNoneSnafu {})??; + + let res = handle_heartbeat(msg).map_err(|e| e.into()); + + let output = futures::stream::once(res); + + Ok(Response::new(Box::pin(output))) + } + + async fn ask_leader(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let kv_store = self.kv_store(); + let res = handle_ask_leader(req, kv_store).await?; + + Ok(Response::new(res)) + } +} + +async fn handle_heartbeat(msg: HeartbeatRequest) -> Result { + let HeartbeatRequest { header, .. } = msg; + + let res_header = ResponseHeader { + protocol_version: PROTOCOL_VERSION, + cluster_id: header.map_or(0, |h| h.cluster_id), + ..Default::default() + }; + + // TODO(jiachun) Do something high-end + + let res = HeartbeatResponse { + header: Some(res_header), + ..Default::default() + }; + + Ok(res) +} + +async fn handle_ask_leader( + req: AskLeaderRequest, + _kv_store: KvStoreRef, +) -> Result { + let AskLeaderRequest { header, .. } = req; + + let res_header = ResponseHeader { + protocol_version: PROTOCOL_VERSION, + cluster_id: header.map_or(0u64, |h| h.cluster_id), + ..Default::default() + }; + + // TODO(jiachun): return leader + let res = AskLeaderResponse { + header: Some(res_header), + leader: Some(Endpoint { + addr: "127.0.0.1:3002".to_string(), + }), + }; + + Ok(res) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::meta::heartbeat_server::Heartbeat; + use api::v1::meta::*; + use tonic::IntoRequest; + + use super::*; + use crate::metasrv::MetaSrvOptions; + use crate::service::store::kv::KvStore; + + #[derive(Clone)] + pub struct NoopKvStore { + _opts: MetaSrvOptions, + } + + impl NoopKvStore { + pub fn new(opts: MetaSrvOptions) -> Self { + Self { _opts: opts } + } + } + + #[async_trait::async_trait] + impl KvStore for NoopKvStore { + async fn range(&self, _req: RangeRequest) -> crate::Result { + unreachable!() + } + + async fn put(&self, _req: PutRequest) -> crate::Result { + unreachable!() + } + + async fn delete_range( + &self, + _req: DeleteRangeRequest, + ) -> crate::Result { + unreachable!() + } + } + + #[tokio::test] + async fn test_handle_heartbeat_resp_header() { + let header = RequestHeader::new(1, 2); + let req = HeartbeatRequest::new(header); + + let res = handle_heartbeat(req).await.unwrap(); + + assert_eq!(1, res.header.unwrap().cluster_id); + } + + #[tokio::test] + async fn test_ask_leader() { + let kv_store = Arc::new(NoopKvStore::new(MetaSrvOptions::default())); + let meta_srv = MetaSrv::new(kv_store); + + let header = RequestHeader::new(1, 1); + let req = AskLeaderRequest::new(header); + + let res = meta_srv.ask_leader(req.into_request()).await.unwrap(); + let res = res.into_inner(); + assert_eq!(1, res.header.unwrap().cluster_id); + assert_eq!("127.0.0.1:3002".to_string(), res.leader.unwrap().addr); + } +} diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs new file mode 100644 index 0000000000..b1775df5ab --- /dev/null +++ b/src/meta-srv/src/service/router.rs @@ -0,0 +1,126 @@ +use api::v1::meta::router_server; +use api::v1::meta::CreateRequest; +use api::v1::meta::CreateResponse; +use api::v1::meta::Peer; +use api::v1::meta::RouteRequest; +use api::v1::meta::RouteResponse; +use tonic::Request; +use tonic::Response; + +use super::store::kv::KvStoreRef; +use super::GrpcResult; +use crate::error::Result; +use crate::metasrv::MetaSrv; + +#[async_trait::async_trait] +impl router_server::Router for MetaSrv { + async fn route(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let kv_store = self.kv_store(); + let res = handle_route(req, kv_store).await?; + + Ok(Response::new(res)) + } + + async fn create(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let kv_store = self.kv_store(); + let res = handle_create(req, kv_store).await?; + + Ok(Response::new(res)) + } +} + +async fn handle_route(_req: RouteRequest, _kv_store: KvStoreRef) -> Result { + todo!() +} + +async fn handle_create(req: CreateRequest, _kv_store: KvStoreRef) -> Result { + let CreateRequest { mut regions, .. } = req; + + // TODO(jiachun): route table + for r in &mut regions { + r.peer = Some(Peer::new(0, "127.0.0.1:3000")); + } + + Ok(CreateResponse { + regions, + ..Default::default() + }) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::meta::router_server::Router; + use api::v1::meta::*; + use tonic::IntoRequest; + + use super::*; + use crate::service::store::kv::KvStore; + + struct MockKvStore; + + #[async_trait::async_trait] + impl KvStore for MockKvStore { + async fn range(&self, _req: RangeRequest) -> crate::Result { + unreachable!() + } + + async fn put(&self, _req: PutRequest) -> crate::Result { + unreachable!() + } + + async fn delete_range( + &self, + _req: DeleteRangeRequest, + ) -> crate::Result { + unreachable!() + } + } + + #[should_panic] + #[tokio::test] + async fn test_handle_route() { + let kv_store = Arc::new(MockKvStore {}); + let meta_srv = MetaSrv::new(kv_store); + + let header = RequestHeader::new(1, 1); + let req = RouteRequest::new(header); + let req = req + .add_table(TableName::new("catalog1", "schema1", "table1")) + .add_table(TableName::new("catalog1", "schema1", "table2")) + .add_table(TableName::new("catalog1", "schema1", "table3")); + + let _res = meta_srv.route(req.into_request()).await.unwrap(); + } + + #[tokio::test] + async fn test_handle_create() { + let kv_store = Arc::new(MockKvStore {}); + let meta_srv = MetaSrv::new(kv_store); + + let header = RequestHeader::new(1, 1); + let table_name = TableName::new("test_catalog", "test_db", "table1"); + let req = CreateRequest::new(header, table_name); + + let p = region::Partition::new() + .column_list(vec![b"col1".to_vec(), b"col2".to_vec()]) + .value_list(vec![b"v1".to_vec(), b"v2".to_vec()]); + let r1 = Region::new(1, "region1", p); + + let p = region::Partition::new() + .column_list(vec![b"col1".to_vec(), b"col2".to_vec()]) + .value_list(vec![b"v11".to_vec(), b"v22".to_vec()]); + let r2 = Region::new(1, "region2", p); + + let req = req.add_region(r1).add_region(r2); + + let res = meta_srv.create(req.into_request()).await.unwrap(); + + for r in res.into_inner().regions { + assert_eq!("127.0.0.1:3000", r.peer.unwrap().endpoint.unwrap().addr); + } + } +} diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs new file mode 100644 index 0000000000..848c1a1801 --- /dev/null +++ b/src/meta-srv/src/service/store.rs @@ -0,0 +1,104 @@ +pub mod etcd; +pub mod kv; + +use api::v1::meta::store_server; +use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::DeleteRangeResponse; +use api::v1::meta::PutRequest; +use api::v1::meta::PutResponse; +use api::v1::meta::RangeRequest; +use api::v1::meta::RangeResponse; +use tonic::Request; +use tonic::Response; + +use super::GrpcResult; +use crate::metasrv::MetaSrv; + +#[async_trait::async_trait] +impl store_server::Store for MetaSrv { + async fn range(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let res = self.kv_store().range(req).await?; + + Ok(Response::new(res)) + } + + async fn put(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let res = self.kv_store().put(req).await?; + + Ok(Response::new(res)) + } + + async fn delete_range( + &self, + req: Request, + ) -> GrpcResult { + let req = req.into_inner(); + let res = self.kv_store().delete_range(req).await?; + + Ok(Response::new(res)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::meta::store_server::Store; + use api::v1::meta::*; + use tonic::IntoRequest; + + use super::*; + use crate::service::store::kv::KvStore; + + struct MockKvStore; + + #[async_trait::async_trait] + impl KvStore for MockKvStore { + async fn range(&self, _req: RangeRequest) -> crate::Result { + Ok(RangeResponse::default()) + } + + async fn put(&self, _req: PutRequest) -> crate::Result { + Ok(PutResponse::default()) + } + + async fn delete_range( + &self, + _req: DeleteRangeRequest, + ) -> crate::Result { + Ok(DeleteRangeResponse::default()) + } + } + + #[tokio::test] + async fn test_range() { + let kv_store = Arc::new(MockKvStore {}); + let meta_srv = MetaSrv::new(kv_store); + let req = RangeRequest::default(); + let res = meta_srv.range(req.into_request()).await; + + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_put() { + let kv_store = Arc::new(MockKvStore {}); + let meta_srv = MetaSrv::new(kv_store); + let req = PutRequest::default(); + let res = meta_srv.put(req.into_request()).await; + + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_delete_range() { + let kv_store = Arc::new(MockKvStore {}); + let meta_srv = MetaSrv::new(kv_store); + let req = DeleteRangeRequest::default(); + let res = meta_srv.delete_range(req.into_request()).await; + + assert!(res.is_ok()); + } +} diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs new file mode 100644 index 0000000000..d5256e9614 --- /dev/null +++ b/src/meta-srv/src/service/store/etcd.rs @@ -0,0 +1,280 @@ +use std::sync::Arc; + +use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::DeleteRangeResponse; +use api::v1::meta::KeyValue; +use api::v1::meta::PutRequest; +use api::v1::meta::PutResponse; +use api::v1::meta::RangeRequest; +use api::v1::meta::RangeResponse; +use common_error::prelude::*; +use etcd_client::Client; +use etcd_client::DeleteOptions; +use etcd_client::GetOptions; +use etcd_client::PutOptions; + +use super::kv::KvStore; +use super::kv::KvStoreRef; +use crate::error; +use crate::error::Result; + +#[derive(Clone)] +pub struct EtcdStore { + client: Client, +} + +impl EtcdStore { + pub async fn with_endpoints(endpoints: S) -> Result + where + E: AsRef, + S: AsRef<[E]>, + { + let client = Client::connect(endpoints, None) + .await + .context(error::ConnectEtcdSnafu)?; + + Ok(Arc::new(Self { client })) + } +} + +#[async_trait::async_trait] +impl KvStore for EtcdStore { + async fn range(&self, req: RangeRequest) -> Result { + let Get { key, options } = req.try_into()?; + + let res = self + .client + .kv_client() + .get(key, options) + .await + .context(error::EtcdFailedSnafu)?; + + let kvs = res + .kvs() + .iter() + .map(|kv| KvPair::new(kv).into()) + .collect::>(); + + Ok(RangeResponse { + kvs, + more: res.more(), + ..Default::default() + }) + } + + async fn put(&self, req: PutRequest) -> Result { + let Put { + key, + value, + options, + } = req.try_into()?; + + let res = self + .client + .kv_client() + .put(key, value, options) + .await + .context(error::EtcdFailedSnafu)?; + + let prev_kv = res.prev_key().map(|kv| KvPair::new(kv).into()); + + Ok(PutResponse { + prev_kv, + ..Default::default() + }) + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + let Delete { key, options } = req.try_into()?; + + let res = self + .client + .kv_client() + .delete(key, options) + .await + .context(error::EtcdFailedSnafu)?; + + let prev_kvs = res + .prev_kvs() + .iter() + .map(|kv| KvPair::new(kv).into()) + .collect::>(); + + Ok(DeleteRangeResponse { + deleted: res.deleted(), + prev_kvs, + ..Default::default() + }) + } +} + +struct Get { + key: Vec, + options: Option, +} + +impl TryFrom for Get { + type Error = error::Error; + + fn try_from(req: RangeRequest) -> Result { + let RangeRequest { + key, + range_end, + limit, + keys_only, + .. + } = req; + + ensure!(!key.is_empty(), error::EmptyKeySnafu); + + let mut options = GetOptions::default(); + if !range_end.is_empty() { + options = options.with_range(range_end); + if limit > 0 { + options = options.with_limit(limit); + } + } + if keys_only { + options = options.with_keys_only(); + } + + Ok(Get { + key, + options: Some(options), + }) + } +} + +struct Put { + key: Vec, + value: Vec, + options: Option, +} + +impl TryFrom for Put { + type Error = error::Error; + + fn try_from(req: PutRequest) -> Result { + let PutRequest { + key, + value, + prev_kv, + .. + } = req; + + let mut options = PutOptions::default(); + if prev_kv { + options = options.with_prev_key(); + } + + Ok(Put { + key, + value, + options: Some(options), + }) + } +} + +struct Delete { + key: Vec, + options: Option, +} + +impl TryFrom for Delete { + type Error = error::Error; + + fn try_from(req: DeleteRangeRequest) -> Result { + let DeleteRangeRequest { + key, + range_end, + prev_kv, + .. + } = req; + + ensure!(!key.is_empty(), error::EmptyKeySnafu); + + let mut options = DeleteOptions::default(); + if !range_end.is_empty() { + options = options.with_range(range_end); + } + if prev_kv { + options = options.with_prev_key(); + } + + Ok(Delete { + key, + options: Some(options), + }) + } +} + +struct KvPair<'a>(&'a etcd_client::KeyValue); + +impl<'a> KvPair<'a> { + /// Creates a `KvPair` from etcd KeyValue + #[inline] + const fn new(kv: &'a etcd_client::KeyValue) -> Self { + Self(kv) + } +} + +impl<'a> From> for KeyValue { + fn from(kv: KvPair<'a>) -> Self { + Self { + key: kv.0.key().to_vec(), + value: kv.0.value().to_vec(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_get() { + let req = RangeRequest { + key: b"test_key".to_vec(), + range_end: b"test_range_end".to_vec(), + limit: 64, + keys_only: true, + ..Default::default() + }; + + let get: Get = req.try_into().unwrap(); + + assert_eq!(b"test_key".to_vec(), get.key); + assert!(get.options.is_some()); + } + + #[test] + fn test_parse_put() { + let req = PutRequest { + key: b"test_key".to_vec(), + value: b"test_value".to_vec(), + prev_kv: true, + ..Default::default() + }; + + let put: Put = req.try_into().unwrap(); + + assert_eq!(b"test_key".to_vec(), put.key); + assert_eq!(b"test_value".to_vec(), put.value); + assert!(put.options.is_some()); + } + + #[test] + fn test_parse_delete() { + let req = DeleteRangeRequest { + key: b"test_key".to_vec(), + range_end: b"test_range_end".to_vec(), + prev_kv: true, + ..Default::default() + }; + + let delete: Delete = req.try_into().unwrap(); + + assert_eq!(b"test_key".to_vec(), delete.key); + assert!(delete.options.is_some()); + } +} diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs new file mode 100644 index 0000000000..f079de71b7 --- /dev/null +++ b/src/meta-srv/src/service/store/kv.rs @@ -0,0 +1,21 @@ +use std::sync::Arc; + +use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::DeleteRangeResponse; +use api::v1::meta::PutRequest; +use api::v1::meta::PutResponse; +use api::v1::meta::RangeRequest; +use api::v1::meta::RangeResponse; + +use crate::error::Result; + +pub type KvStoreRef = Arc; + +#[async_trait::async_trait] +pub trait KvStore: Send + Sync { + async fn range(&self, req: RangeRequest) -> Result; + + async fn put(&self, req: PutRequest) -> Result; + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result; +}