From 6cf1da35eef7b0c9a5dc21bcd1eb6120e0fbc6dc Mon Sep 17 00:00:00 2001 From: Jiachun Feng Date: Wed, 6 Jul 2022 20:56:16 +0800 Subject: [PATCH] feat: add grpc impl (#50) * feat: add grpc impl * feat: add grpc server * some ut * verson format: a.b * code style * admin request/response * by cr * admin api * by cr * chore: by cr * chore: by cr --- Cargo.lock | 97 +++++++++++++++++++++- Cargo.toml | 2 + src/api/Cargo.toml | 13 +++ src/api/build.rs | 5 ++ src/api/greptime/v1/admin.proto | 9 ++ src/api/greptime/v1/database.proto | 52 ++++++++++++ src/api/greptime/v1/greptime.proto | 20 +++++ src/api/src/lib.rs | 1 + src/api/src/v1.rs | 1 + src/client/Cargo.toml | 12 +++ src/client/src/admin.rs | 14 ++++ src/client/src/client.rs | 59 +++++++++++++ src/client/src/database.rs | 76 +++++++++++++++++ src/client/src/error.rs | 27 ++++++ src/client/src/lib.rs | 9 ++ src/cmd/Cargo.toml | 2 +- src/datanode/Cargo.toml | 3 + src/datanode/src/error.rs | 31 ++++++- src/datanode/src/server.rs | 12 ++- src/datanode/src/server/grpc.rs | 42 +++++++++- src/datanode/src/server/grpc/handler.rs | 18 ++++ src/datanode/src/server/grpc/processors.rs | 1 - src/datanode/src/server/grpc/server.rs | 28 +++++++ 23 files changed, 524 insertions(+), 10 deletions(-) create mode 100644 src/api/Cargo.toml create mode 100644 src/api/build.rs create mode 100644 src/api/greptime/v1/admin.proto create mode 100644 src/api/greptime/v1/database.proto create mode 100644 src/api/greptime/v1/greptime.proto create mode 100644 src/api/src/lib.rs create mode 100644 src/api/src/v1.rs create mode 100644 src/client/Cargo.toml create mode 100644 src/client/src/admin.rs create mode 100644 src/client/src/client.rs create mode 100644 src/client/src/database.rs create mode 100644 src/client/src/error.rs create mode 100644 src/client/src/lib.rs create mode 100644 src/datanode/src/server/grpc/handler.rs delete mode 100644 src/datanode/src/server/grpc/processors.rs create mode 100644 src/datanode/src/server/grpc/server.rs diff --git a/Cargo.lock b/Cargo.lock index 73f3d47f9b..a8cd94147c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,6 +67,15 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" +[[package]] +name = "api" +version = "0.1.0" +dependencies = [ + "prost", + "tonic", + "tonic-build", +] + [[package]] name = "arc-swap" version = "1.5.0" @@ -522,6 +531,25 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "client" +version = "0.1.0" +dependencies = [ + "api", + "common-error", + "snafu", + "tonic", +] + +[[package]] +name = "cmake" +version = "0.1.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" +dependencies = [ + "cc", +] + [[package]] name = "cmd" version = "0.1.0" @@ -935,6 +963,7 @@ dependencies = [ name = "datanode" version = "0.1.0" dependencies = [ + "api", "arrow2", "async-trait", "axum", @@ -957,6 +986,8 @@ dependencies = [ "table", "table-engine", "tokio", + "tokio-stream", + "tonic", "tower", "tower-http", ] @@ -1889,6 +1920,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "multiversion" version = "0.6.1" @@ -2438,6 +2475,16 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "prettyplease" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3662417e650bd6af740f5b8b3501776aa10c3d5cbd10b40263ed250d3770884" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2473,14 +2520,36 @@ dependencies = [ [[package]] name = "prost" -version = "0.10.3" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc03e116981ff7d8da8e5c220e374587b98d294af7ba7dd7fda761158f00086f" +checksum = "71adf41db68aa0daaefc69bb30bcd68ded9b9abaad5d1fbb6304c4fb390e083e" dependencies = [ "bytes", "prost-derive", ] +[[package]] +name = "prost-build" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae5a4388762d5815a9fc0dea33c56b021cdc8dde0c55e0c9ca57197254b0cab" +dependencies = [ + "bytes", + "cfg-if", + "cmake", + "heck 0.4.0", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost", + "prost-types", + "regex", + "tempfile", + "which", +] + [[package]] name = "prost-derive" version = "0.10.1" @@ -3468,6 +3537,19 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "tonic-build" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", +] + [[package]] name = "tower" version = "0.4.12" @@ -3859,6 +3941,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "which" +version = "4.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae" +dependencies = [ + "either", + "lazy_static", + "libc", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 2b13f34b41..eb662fbdb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,7 @@ [workspace] members = [ + "src/api", + "src/client", "src/common/base", "src/common/error", "src/common/function", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml new file mode 100644 index 0000000000..7def8aec3b --- /dev/null +++ b/src/api/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "api" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +prost = "0.10" +tonic = "0.7" + +[build-dependencies] +tonic-build = "0.7" diff --git a/src/api/build.rs b/src/api/build.rs new file mode 100644 index 0000000000..70ce3453be --- /dev/null +++ b/src/api/build.rs @@ -0,0 +1,5 @@ +fn main() { + tonic_build::configure() + .compile(&["greptime/v1/greptime.proto"], &["."]) + .expect("compile proto"); +} diff --git a/src/api/greptime/v1/admin.proto b/src/api/greptime/v1/admin.proto new file mode 100644 index 0000000000..327b5d75b6 --- /dev/null +++ b/src/api/greptime/v1/admin.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package greptime.v1; + +// TODO(jiachun) +message AdminRequest {} + +// TODO(jiachun) +message AdminResponse {} diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto new file mode 100644 index 0000000000..bd522bcdc7 --- /dev/null +++ b/src/api/greptime/v1/database.proto @@ -0,0 +1,52 @@ +syntax = "proto3"; + +package greptime.v1; + +message DatabaseRequest { + string name = 1; + repeated ObjectExpr exprs = 2; +} + +message DatabaseResponse { + repeated ObjectResult results = 1; +} + +message ObjectExpr { + ExprHeader header = 1; + InsertExpr insert = 2; + SelectExpr select = 3; + UpdateExpr update = 4; + DeleteExpr delete = 5; +} + +message SelectExpr { + string select = 1; // sql, promql, etc. +} + +message InsertExpr { + string table_name = 1; + repeated bytes values = 2; +} + +// TODO(jiachun) +message UpdateExpr {} +// TODO(jiachun) +message DeleteExpr {} + +message ObjectResult { + ResultHeader header = 1; + string schema = 2; + repeated bytes results = 3; +} + +message ExprHeader { + uint32 version = 1; +} + +message ResultHeader { + uint32 version = 1; + uint32 success = 2; + uint32 failure = 3; + uint32 code = 4; + string err_msg = 5; +} diff --git a/src/api/greptime/v1/greptime.proto b/src/api/greptime/v1/greptime.proto new file mode 100644 index 0000000000..f8c2c3aabd --- /dev/null +++ b/src/api/greptime/v1/greptime.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package greptime.v1; + +import "greptime/v1/admin.proto"; +import "greptime/v1/database.proto"; + +service Greptime { + rpc Batch(BatchRequest) returns (BatchResponse) {} +} + +message BatchRequest { + repeated AdminRequest admins = 1; + repeated DatabaseRequest databases = 2; +} + +message BatchResponse { + repeated AdminResponse admins = 1; + repeated DatabaseResponse databases = 2; +} diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs new file mode 100644 index 0000000000..a3a6d96c3f --- /dev/null +++ b/src/api/src/lib.rs @@ -0,0 +1 @@ +pub mod v1; diff --git a/src/api/src/v1.rs b/src/api/src/v1.rs new file mode 100644 index 0000000000..cc09b9e9ab --- /dev/null +++ b/src/api/src/v1.rs @@ -0,0 +1 @@ +tonic::include_proto!("greptime.v1"); diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml new file mode 100644 index 0000000000..3ad86d4e52 --- /dev/null +++ b/src/client/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "client" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +api = { path = "../api" } +common-error = { path = "../common/error" } +snafu = { version = "0.7", features = ["backtraces"] } +tonic = "0.7" diff --git a/src/client/src/admin.rs b/src/client/src/admin.rs new file mode 100644 index 0000000000..2eada14a2f --- /dev/null +++ b/src/client/src/admin.rs @@ -0,0 +1,14 @@ +use api::v1::*; + +#[derive(Clone, Debug)] +pub struct Admin { + client: Client, +} + +impl Admin { + pub fn new(client: Client) -> Self { + Self { client } + } + + // TODO(jiachun): admin api +} diff --git a/src/client/src/client.rs b/src/client/src/client.rs new file mode 100644 index 0000000000..bd46a0add0 --- /dev/null +++ b/src/client/src/client.rs @@ -0,0 +1,59 @@ +use api::v1::{greptime_client::GreptimeClient, *}; +use snafu::{OptionExt, ResultExt}; +use tonic::transport::Channel; + +use crate::error; +use crate::Result; + +#[derive(Clone, Debug)] +pub struct Client { + client: GreptimeClient, +} + +impl Client { + pub async fn connect(url: impl Into) -> Result { + let url = url.into(); + let client = GreptimeClient::connect(url.clone()) + .await + .context(error::ConnectFailedSnafu { url })?; + Ok(Self { client }) + } + + pub async fn admin(&self, req: AdminRequest) -> Result { + let req = BatchRequest { + admins: vec![req], + ..Default::default() + }; + + let mut res = self.batch(req).await?; + res.admins.pop().context(error::MissingResultSnafu { + name: "admins", + expected: 1_usize, + actual: 0_usize, + }) + } + + pub async fn database(&self, req: DatabaseRequest) -> Result { + let req = BatchRequest { + databases: vec![req], + ..Default::default() + }; + + let mut res = self.batch(req).await?; + res.databases.pop().context(error::MissingResultSnafu { + name: "database", + expected: 1_usize, + actual: 0_usize, + }) + } + + pub async fn batch(&self, req: BatchRequest) -> Result { + let res = self + .client + .clone() + .batch(req) + .await + .context(error::TonicStatusSnafu)?; + Ok(res.into_inner()) + } +} diff --git a/src/client/src/database.rs b/src/client/src/database.rs new file mode 100644 index 0000000000..fc58a1cdc0 --- /dev/null +++ b/src/client/src/database.rs @@ -0,0 +1,76 @@ +use api::v1::*; +use snafu::ensure; + +use crate::error; +use crate::{Client, Result}; + +pub const PROTOCOL_VERSION: u32 = 1; + +pub type Bytes = Vec; + +#[derive(Clone, Debug)] +pub struct Database { + name: String, + client: Client, +} + +impl Database { + pub fn new(name: impl Into, client: Client) -> Self { + Self { + name: name.into(), + client, + } + } + + pub fn name(&self) -> &str { + &self.name + } + + pub async fn insert(&self, table: impl Into, values: Vec) -> Result<()> { + let header = ExprHeader { + version: PROTOCOL_VERSION, + }; + let insert = InsertExpr { + table_name: table.into(), + values, + }; + let expr = ObjectExpr { + header: Some(header), + insert: Some(insert), + ..Default::default() + }; + + self.object(expr).await?; + + Ok(()) + } + + // TODO(jiachun) select/update/delete + + async fn object(&self, expr: ObjectExpr) -> Result { + let res = self.objects(vec![expr]).await?.pop().unwrap(); + Ok(res) + } + + async fn objects(&self, exprs: Vec) -> Result> { + let expr_count = exprs.len(); + let req = DatabaseRequest { + name: self.name.clone(), + exprs, + }; + + let res = self.client.database(req).await?; + let res = res.results; + + ensure!( + res.len() == expr_count, + error::MissingResultSnafu { + name: "object_results", + expected: expr_count, + actual: res.len(), + } + ); + + Ok(res) + } +} diff --git a/src/client/src/error.rs b/src/client/src/error.rs new file mode 100644 index 0000000000..974b05678b --- /dev/null +++ b/src/client/src/error.rs @@ -0,0 +1,27 @@ +use common_error::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Connect failed to {}, source: {}", url, source))] + ConnectFailed { + url: String, + source: tonic::transport::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Missing {}, expected {}, actual {}", name, expected, actual))] + MissingResult { + name: String, + expected: usize, + actual: usize, + }, + + #[snafu(display("Tonic internal error, source: {}", source))] + TonicStatus { + source: tonic::Status, + backtrace: Backtrace, + }, +} + +pub type Result = std::result::Result; diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs new file mode 100644 index 0000000000..df2aca5339 --- /dev/null +++ b/src/client/src/lib.rs @@ -0,0 +1,9 @@ +mod client; +mod database; +mod error; + +pub use self::{ + client::Client, + database::Database, + error::{Error, Result}, +}; diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 7552d838ee..974cbd9ecd 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -13,4 +13,4 @@ common-error = { path = "../common/error" } common-telemetry = { path = "../common/telemetry", features = ["deadlock_detection"]} datanode = { path = "../datanode" } snafu = { version = "0.7", features = ["backtraces"] } -tokio = { version = "1.18.0", features = ["full"] } +tokio = { version = "1.18", features = ["full"] } diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 11419444d2..8a9089c483 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +api = { path = "../api" } async-trait = "0.1" axum = "0.5" axum-macros = "0.2" @@ -25,6 +26,8 @@ store-api = { path = "../store-api" } table = { path = "../table" } table-engine = { path = "../table-engine" } tokio = { version = "1.18", features = ["full"] } +tonic = "0.7" +tokio-stream = { version = "0.1.8", features = ["net"] } tower = { version = "0.4", features = ["full"]} tower-http = { version ="0.3", features = ["full"]} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 1efa11b499..f8e45608c4 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -83,6 +83,15 @@ pub enum Error { addr: String, source: std::net::AddrParseError, }, + + #[snafu(display("Fail to bind address {}, source: {}", addr, source))] + TcpBind { + addr: String, + source: std::io::Error, + }, + + #[snafu(display("Fail to start gRPC server, source: {}", source))] + StartGrpc { source: tonic::transport::Error }, } pub type Result = std::result::Result; @@ -91,16 +100,19 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(), - // TODO(yingwen): Further categorize http error. - Error::StartHttp { .. } | Error::ParseAddr { .. } => StatusCode::Internal, Error::CreateTable { source, .. } => source.status_code(), Error::GetTable { source, .. } => source.status_code(), + Error::Insert { source, .. } => source.status_code(), Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, Error::ColumnValuesNumberMismatch { .. } | Error::ParseSqlValue { .. } | Error::ColumnTypeMismatch { .. } => StatusCode::InvalidArguments, - Error::Insert { source, .. } => source.status_code(), + // TODO(yingwen): Further categorize http error. + Error::StartHttp { .. } + | Error::ParseAddr { .. } + | Error::TcpBind { .. } + | Error::StartGrpc { .. } => StatusCode::Internal, } } @@ -113,6 +125,12 @@ impl ErrorExt for Error { } } +impl From for tonic::Status { + fn from(err: Error) -> Self { + tonic::Status::new(tonic::Code::Internal, err.to_string()) + } +} + #[cfg(test)] mod tests { use common_error::mock::MockError; @@ -130,11 +148,18 @@ mod tests { assert_eq!(StatusCode::Internal, err.status_code()); } + fn assert_tonic_internal_error(err: Error) { + let s: tonic::Status = err.into(); + assert_eq!(s.code(), tonic::Code::Internal); + } + #[test] fn test_error() { let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap(); assert_internal_error(&err); + assert_tonic_internal_error(err); let err = throw_query_error().context(NewCatalogSnafu).err().unwrap(); assert_internal_error(&err); + assert_tonic_internal_error(err); } } diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 8ad559d59d..feb8e9b235 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -1,7 +1,9 @@ pub mod grpc; pub mod http; +use grpc::GrpcServer; use http::HttpServer; +use tokio::try_join; use crate::datanode::DatanodeOptions; use crate::error::Result; @@ -10,16 +12,22 @@ use crate::instance::InstanceRef; /// All rpc services. pub struct Services { http_server: HttpServer, + grpc_server: GrpcServer, } impl Services { pub fn new(instance: InstanceRef) -> Self { Self { - http_server: HttpServer::new(instance), + http_server: HttpServer::new(instance.clone()), + grpc_server: GrpcServer::new(instance), } } pub async fn start(&self, opts: &DatanodeOptions) -> Result<()> { - self.http_server.start(&opts.http_addr).await + try_join!( + self.http_server.start(&opts.http_addr), + self.grpc_server.start(&opts.rpc_addr) + )?; + Ok(()) } } diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 1e0df3b152..064745234c 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -1 +1,41 @@ -mod processors; +use common_telemetry::logging::info; +use snafu::ResultExt; +use tokio::net::TcpListener; +use tokio_stream::wrappers::TcpListenerStream; + +use crate::{ + error::{Result, StartGrpcSnafu, TcpBindSnafu}, + instance::InstanceRef, + server::grpc::{handler::BatchHandler, server::Server}, +}; + +mod handler; +mod server; + +pub struct GrpcServer { + handler: BatchHandler, +} + +impl GrpcServer { + pub fn new(instance: InstanceRef) -> Self { + Self { + handler: BatchHandler::new(instance), + } + } + + pub async fn start(&self, addr: &str) -> Result<()> { + let listener = TcpListener::bind(addr) + .await + .context(TcpBindSnafu { addr })?; + let addr = listener.local_addr().context(TcpBindSnafu { addr })?; + info!("The gRPC server is running at {}", addr); + + let svc = Server::new(self.handler.clone()).into_service(); + let _ = tonic::transport::Server::builder() + .add_service(svc) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .context(StartGrpcSnafu)?; + Ok(()) + } +} diff --git a/src/datanode/src/server/grpc/handler.rs b/src/datanode/src/server/grpc/handler.rs new file mode 100644 index 0000000000..2eaa75a05b --- /dev/null +++ b/src/datanode/src/server/grpc/handler.rs @@ -0,0 +1,18 @@ +use api::v1::*; + +use crate::{error::Result, instance::InstanceRef}; + +#[derive(Clone)] +pub struct BatchHandler {} + +impl BatchHandler { + pub fn new(_instance: InstanceRef) -> Self { + Self {} + } + + pub async fn batch(&self, mut batch_req: BatchRequest) -> Result { + let batch_res = BatchResponse::default(); + let _databases = std::mem::take(&mut batch_req.databases); + Ok(batch_res) + } +} diff --git a/src/datanode/src/server/grpc/processors.rs b/src/datanode/src/server/grpc/processors.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/src/datanode/src/server/grpc/processors.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/datanode/src/server/grpc/server.rs b/src/datanode/src/server/grpc/server.rs new file mode 100644 index 0000000000..0661d15ebb --- /dev/null +++ b/src/datanode/src/server/grpc/server.rs @@ -0,0 +1,28 @@ +use api::v1::*; +use tonic::{Request, Response, Status}; + +use super::handler::BatchHandler; + +#[derive(Clone)] +pub struct Server { + handler: BatchHandler, +} + +impl Server { + pub fn new(handler: BatchHandler) -> Self { + Self { handler } + } + + pub fn into_service(self) -> greptime_server::GreptimeServer { + greptime_server::GreptimeServer::new(self) + } +} + +#[tonic::async_trait] +impl greptime_server::Greptime for Server { + async fn batch(&self, req: Request) -> Result, Status> { + let req = req.into_inner(); + let res = self.handler.batch(req).await?; + Ok(Response::new(res)) + } +}