feat: add grpc impl (#50)

* feat: add grpc impl

* feat: add grpc server

* some ut

* verson format: a.b

* code style

* admin request/response

* by cr

* admin api

* by cr

* chore: by cr

* chore: by cr
This commit is contained in:
Jiachun Feng
2022-07-06 20:56:16 +08:00
committed by GitHub
parent 008f62afc1
commit 6cf1da35ee
23 changed files with 524 additions and 10 deletions

97
Cargo.lock generated
View File

@@ -67,6 +67,15 @@ version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
[[package]]
name = "api"
version = "0.1.0"
dependencies = [
"prost",
"tonic",
"tonic-build",
]
[[package]]
name = "arc-swap"
version = "1.5.0"
@@ -522,6 +531,25 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "client"
version = "0.1.0"
dependencies = [
"api",
"common-error",
"snafu",
"tonic",
]
[[package]]
name = "cmake"
version = "0.1.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a"
dependencies = [
"cc",
]
[[package]]
name = "cmd"
version = "0.1.0"
@@ -935,6 +963,7 @@ dependencies = [
name = "datanode"
version = "0.1.0"
dependencies = [
"api",
"arrow2",
"async-trait",
"axum",
@@ -957,6 +986,8 @@ dependencies = [
"table",
"table-engine",
"tokio",
"tokio-stream",
"tonic",
"tower",
"tower-http",
]
@@ -1889,6 +1920,12 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "multiversion"
version = "0.6.1"
@@ -2438,6 +2475,16 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "prettyplease"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3662417e650bd6af740f5b8b3501776aa10c3d5cbd10b40263ed250d3770884"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@@ -2473,14 +2520,36 @@ dependencies = [
[[package]]
name = "prost"
version = "0.10.3"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc03e116981ff7d8da8e5c220e374587b98d294af7ba7dd7fda761158f00086f"
checksum = "71adf41db68aa0daaefc69bb30bcd68ded9b9abaad5d1fbb6304c4fb390e083e"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae5a4388762d5815a9fc0dea33c56b021cdc8dde0c55e0c9ca57197254b0cab"
dependencies = [
"bytes",
"cfg-if",
"cmake",
"heck 0.4.0",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prost",
"prost-types",
"regex",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.10.1"
@@ -3468,6 +3537,19 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "tonic-build"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]]
name = "tower"
version = "0.4.12"
@@ -3859,6 +3941,17 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "which"
version = "4.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae"
dependencies = [
"either",
"lazy_static",
"libc",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@@ -1,5 +1,7 @@
[workspace]
members = [
"src/api",
"src/client",
"src/common/base",
"src/common/error",
"src/common/function",

13
src/api/Cargo.toml Normal file
View File

@@ -0,0 +1,13 @@
[package]
name = "api"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
prost = "0.10"
tonic = "0.7"
[build-dependencies]
tonic-build = "0.7"

5
src/api/build.rs Normal file
View File

@@ -0,0 +1,5 @@
fn main() {
tonic_build::configure()
.compile(&["greptime/v1/greptime.proto"], &["."])
.expect("compile proto");
}

View File

@@ -0,0 +1,9 @@
syntax = "proto3";
package greptime.v1;
// TODO(jiachun)
message AdminRequest {}
// TODO(jiachun)
message AdminResponse {}

View File

@@ -0,0 +1,52 @@
syntax = "proto3";
package greptime.v1;
message DatabaseRequest {
string name = 1;
repeated ObjectExpr exprs = 2;
}
message DatabaseResponse {
repeated ObjectResult results = 1;
}
message ObjectExpr {
ExprHeader header = 1;
InsertExpr insert = 2;
SelectExpr select = 3;
UpdateExpr update = 4;
DeleteExpr delete = 5;
}
message SelectExpr {
string select = 1; // sql, promql, etc.
}
message InsertExpr {
string table_name = 1;
repeated bytes values = 2;
}
// TODO(jiachun)
message UpdateExpr {}
// TODO(jiachun)
message DeleteExpr {}
message ObjectResult {
ResultHeader header = 1;
string schema = 2;
repeated bytes results = 3;
}
message ExprHeader {
uint32 version = 1;
}
message ResultHeader {
uint32 version = 1;
uint32 success = 2;
uint32 failure = 3;
uint32 code = 4;
string err_msg = 5;
}

View File

@@ -0,0 +1,20 @@
syntax = "proto3";
package greptime.v1;
import "greptime/v1/admin.proto";
import "greptime/v1/database.proto";
service Greptime {
rpc Batch(BatchRequest) returns (BatchResponse) {}
}
message BatchRequest {
repeated AdminRequest admins = 1;
repeated DatabaseRequest databases = 2;
}
message BatchResponse {
repeated AdminResponse admins = 1;
repeated DatabaseResponse databases = 2;
}

1
src/api/src/lib.rs Normal file
View File

@@ -0,0 +1 @@
pub mod v1;

1
src/api/src/v1.rs Normal file
View File

@@ -0,0 +1 @@
tonic::include_proto!("greptime.v1");

12
src/client/Cargo.toml Normal file
View File

@@ -0,0 +1,12 @@
[package]
name = "client"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { path = "../api" }
common-error = { path = "../common/error" }
snafu = { version = "0.7", features = ["backtraces"] }
tonic = "0.7"

14
src/client/src/admin.rs Normal file
View File

@@ -0,0 +1,14 @@
use api::v1::*;
#[derive(Clone, Debug)]
pub struct Admin {
client: Client,
}
impl Admin {
pub fn new(client: Client) -> Self {
Self { client }
}
// TODO(jiachun): admin api
}

59
src/client/src/client.rs Normal file
View File

@@ -0,0 +1,59 @@
use api::v1::{greptime_client::GreptimeClient, *};
use snafu::{OptionExt, ResultExt};
use tonic::transport::Channel;
use crate::error;
use crate::Result;
#[derive(Clone, Debug)]
pub struct Client {
client: GreptimeClient<Channel>,
}
impl Client {
pub async fn connect(url: impl Into<String>) -> Result<Self> {
let url = url.into();
let client = GreptimeClient::connect(url.clone())
.await
.context(error::ConnectFailedSnafu { url })?;
Ok(Self { client })
}
pub async fn admin(&self, req: AdminRequest) -> Result<AdminResponse> {
let req = BatchRequest {
admins: vec![req],
..Default::default()
};
let mut res = self.batch(req).await?;
res.admins.pop().context(error::MissingResultSnafu {
name: "admins",
expected: 1_usize,
actual: 0_usize,
})
}
pub async fn database(&self, req: DatabaseRequest) -> Result<DatabaseResponse> {
let req = BatchRequest {
databases: vec![req],
..Default::default()
};
let mut res = self.batch(req).await?;
res.databases.pop().context(error::MissingResultSnafu {
name: "database",
expected: 1_usize,
actual: 0_usize,
})
}
pub async fn batch(&self, req: BatchRequest) -> Result<BatchResponse> {
let res = self
.client
.clone()
.batch(req)
.await
.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
}

View File

@@ -0,0 +1,76 @@
use api::v1::*;
use snafu::ensure;
use crate::error;
use crate::{Client, Result};
pub const PROTOCOL_VERSION: u32 = 1;
pub type Bytes = Vec<u8>;
#[derive(Clone, Debug)]
pub struct Database {
name: String,
client: Client,
}
impl Database {
pub fn new(name: impl Into<String>, client: Client) -> Self {
Self {
name: name.into(),
client,
}
}
pub fn name(&self) -> &str {
&self.name
}
pub async fn insert(&self, table: impl Into<String>, values: Vec<Bytes>) -> Result<()> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let insert = InsertExpr {
table_name: table.into(),
values,
};
let expr = ObjectExpr {
header: Some(header),
insert: Some(insert),
..Default::default()
};
self.object(expr).await?;
Ok(())
}
// TODO(jiachun) select/update/delete
async fn object(&self, expr: ObjectExpr) -> Result<ObjectResult> {
let res = self.objects(vec![expr]).await?.pop().unwrap();
Ok(res)
}
async fn objects(&self, exprs: Vec<ObjectExpr>) -> Result<Vec<ObjectResult>> {
let expr_count = exprs.len();
let req = DatabaseRequest {
name: self.name.clone(),
exprs,
};
let res = self.client.database(req).await?;
let res = res.results;
ensure!(
res.len() == expr_count,
error::MissingResultSnafu {
name: "object_results",
expected: expr_count,
actual: res.len(),
}
);
Ok(res)
}
}

27
src/client/src/error.rs Normal file
View File

@@ -0,0 +1,27 @@
use common_error::prelude::*;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Connect failed to {}, source: {}", url, source))]
ConnectFailed {
url: String,
source: tonic::transport::Error,
backtrace: Backtrace,
},
#[snafu(display("Missing {}, expected {}, actual {}", name, expected, actual))]
MissingResult {
name: String,
expected: usize,
actual: usize,
},
#[snafu(display("Tonic internal error, source: {}", source))]
TonicStatus {
source: tonic::Status,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;

9
src/client/src/lib.rs Normal file
View File

@@ -0,0 +1,9 @@
mod client;
mod database;
mod error;
pub use self::{
client::Client,
database::Database,
error::{Error, Result},
};

View File

@@ -13,4 +13,4 @@ common-error = { path = "../common/error" }
common-telemetry = { path = "../common/telemetry", features = ["deadlock_detection"]}
datanode = { path = "../datanode" }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18.0", features = ["full"] }
tokio = { version = "1.18", features = ["full"] }

View File

@@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
axum = "0.5"
axum-macros = "0.2"
@@ -25,6 +26,8 @@ store-api = { path = "../store-api" }
table = { path = "../table" }
table-engine = { path = "../table-engine" }
tokio = { version = "1.18", features = ["full"] }
tonic = "0.7"
tokio-stream = { version = "0.1.8", features = ["net"] }
tower = { version = "0.4", features = ["full"]}
tower-http = { version ="0.3", features = ["full"]}

View File

@@ -83,6 +83,15 @@ pub enum Error {
addr: String,
source: std::net::AddrParseError,
},
#[snafu(display("Fail to bind address {}, source: {}", addr, source))]
TcpBind {
addr: String,
source: std::io::Error,
},
#[snafu(display("Fail to start gRPC server, source: {}", source))]
StartGrpc { source: tonic::transport::Error },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -91,16 +100,19 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(),
// TODO(yingwen): Further categorize http error.
Error::StartHttp { .. } | Error::ParseAddr { .. } => StatusCode::Internal,
Error::CreateTable { source, .. } => source.status_code(),
Error::GetTable { source, .. } => source.status_code(),
Error::Insert { source, .. } => source.status_code(),
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::ColumnValuesNumberMismatch { .. }
| Error::ParseSqlValue { .. }
| Error::ColumnTypeMismatch { .. } => StatusCode::InvalidArguments,
Error::Insert { source, .. } => source.status_code(),
// TODO(yingwen): Further categorize http error.
Error::StartHttp { .. }
| Error::ParseAddr { .. }
| Error::TcpBind { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
}
}
@@ -113,6 +125,12 @@ impl ErrorExt for Error {
}
}
impl From<Error> for tonic::Status {
fn from(err: Error) -> Self {
tonic::Status::new(tonic::Code::Internal, err.to_string())
}
}
#[cfg(test)]
mod tests {
use common_error::mock::MockError;
@@ -130,11 +148,18 @@ mod tests {
assert_eq!(StatusCode::Internal, err.status_code());
}
fn assert_tonic_internal_error(err: Error) {
let s: tonic::Status = err.into();
assert_eq!(s.code(), tonic::Code::Internal);
}
#[test]
fn test_error() {
let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap();
assert_internal_error(&err);
assert_tonic_internal_error(err);
let err = throw_query_error().context(NewCatalogSnafu).err().unwrap();
assert_internal_error(&err);
assert_tonic_internal_error(err);
}
}

View File

@@ -1,7 +1,9 @@
pub mod grpc;
pub mod http;
use grpc::GrpcServer;
use http::HttpServer;
use tokio::try_join;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
@@ -10,16 +12,22 @@ use crate::instance::InstanceRef;
/// All rpc services.
pub struct Services {
http_server: HttpServer,
grpc_server: GrpcServer,
}
impl Services {
pub fn new(instance: InstanceRef) -> Self {
Self {
http_server: HttpServer::new(instance),
http_server: HttpServer::new(instance.clone()),
grpc_server: GrpcServer::new(instance),
}
}
pub async fn start(&self, opts: &DatanodeOptions) -> Result<()> {
self.http_server.start(&opts.http_addr).await
try_join!(
self.http_server.start(&opts.http_addr),
self.grpc_server.start(&opts.rpc_addr)
)?;
Ok(())
}
}

View File

@@ -1 +1,41 @@
mod processors;
use common_telemetry::logging::info;
use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use crate::{
error::{Result, StartGrpcSnafu, TcpBindSnafu},
instance::InstanceRef,
server::grpc::{handler::BatchHandler, server::Server},
};
mod handler;
mod server;
pub struct GrpcServer {
handler: BatchHandler,
}
impl GrpcServer {
pub fn new(instance: InstanceRef) -> Self {
Self {
handler: BatchHandler::new(instance),
}
}
pub async fn start(&self, addr: &str) -> Result<()> {
let listener = TcpListener::bind(addr)
.await
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
info!("The gRPC server is running at {}", addr);
let svc = Server::new(self.handler.clone()).into_service();
let _ = tonic::transport::Server::builder()
.add_service(svc)
.serve_with_incoming(TcpListenerStream::new(listener))
.await
.context(StartGrpcSnafu)?;
Ok(())
}
}

View File

@@ -0,0 +1,18 @@
use api::v1::*;
use crate::{error::Result, instance::InstanceRef};
#[derive(Clone)]
pub struct BatchHandler {}
impl BatchHandler {
pub fn new(_instance: InstanceRef) -> Self {
Self {}
}
pub async fn batch(&self, mut batch_req: BatchRequest) -> Result<BatchResponse> {
let batch_res = BatchResponse::default();
let _databases = std::mem::take(&mut batch_req.databases);
Ok(batch_res)
}
}

View File

@@ -1 +0,0 @@

View File

@@ -0,0 +1,28 @@
use api::v1::*;
use tonic::{Request, Response, Status};
use super::handler::BatchHandler;
#[derive(Clone)]
pub struct Server {
handler: BatchHandler,
}
impl Server {
pub fn new(handler: BatchHandler) -> Self {
Self { handler }
}
pub fn into_service(self) -> greptime_server::GreptimeServer<Self> {
greptime_server::GreptimeServer::new(self)
}
}
#[tonic::async_trait]
impl greptime_server::Greptime for Server {
async fn batch(&self, req: Request<BatchRequest>) -> Result<Response<BatchResponse>, Status> {
let req = req.into_inner();
let res = self.handler.batch(req).await?;
Ok(Response::new(res))
}
}