From ccda17248eb5cd314f8cccc317b948e10b6f153d Mon Sep 17 00:00:00 2001 From: LFC Date: Wed, 17 Aug 2022 14:29:12 +0800 Subject: [PATCH] feat: unify servers and mysql server in datanode (#172) * address PR comments address PR comments use 3306 for mysql server's default port upgrade metric to version 0.20 move crate "servers" out of "common" make mysql io threads count configurable in config file add snafu backtrace for errors with source use common-server error for mysql server add test for grpc server refactor testing codes fix rustfmt check start mysql server in datanode move grpc server codes from datanode to common-servers feat: unify servers * rebase develop and resolve conflicts * remove an unnecessary todo Co-authored-by: luofucong --- Cargo.lock | 129 ++++++----- Cargo.toml | 2 +- config/datanode.example.toml | 3 + src/client/src/database.rs | 2 +- src/client/src/lib.rs | 2 +- src/cmd/src/datanode.rs | 37 +++- src/common/runtime/Cargo.toml | 2 +- src/common/servers/Cargo.toml | 27 --- src/common/servers/src/error.rs | 28 --- src/common/servers/src/lib.rs | 3 - src/common/servers/src/mysql/error.rs | 60 ------ src/common/servers/src/mysql/mod.rs | 4 - src/common/servers/tests/mod.rs | 1 - src/common/telemetry/Cargo.toml | 4 +- src/common/telemetry/src/metric.rs | 11 +- src/datanode/Cargo.toml | 5 +- src/datanode/src/datanode.rs | 10 +- src/datanode/src/error.rs | 18 +- src/datanode/src/instance.rs | 202 ++++++------------ src/datanode/src/lib.rs | 5 - src/datanode/src/server.rs | 57 +++-- src/datanode/src/server/grpc.rs | 44 +--- src/datanode/src/server/grpc/handler.rs | 82 +------ src/datanode/src/server/grpc/select.rs | 2 +- src/datanode/src/server/grpc/server.rs | 30 --- src/datanode/src/server/http/handler.rs | 123 ----------- src/datanode/src/sql.rs | 1 - src/datanode/src/test_util.rs | 32 --- src/datanode/src/tests.rs | 1 - src/datanode/tests/grpc_test.rs | 113 ++++++++++ src/datanode/{src => }/tests/http_test.rs | 9 +- 
src/datanode/tests/instance_test.rs | 80 +++++++ src/datanode/tests/test_util.rs | 85 ++++++++ src/query/Cargo.toml | 2 +- src/servers/Cargo.toml | 36 ++++ src/servers/src/error.rs | 89 ++++++++ src/servers/src/grpc.rs | 72 +++++++ src/servers/src/grpc/handler.rs | 29 +++ .../src/server => servers/src}/http.rs | 54 +++-- src/servers/src/http/handler.rs | 35 +++ src/servers/src/lib.rs | 6 + .../src/mysql/handler.rs} | 23 +- src/servers/src/mysql/mod.rs | 3 + .../src/mysql/server.rs} | 41 ++-- .../src/mysql/writer.rs} | 2 +- src/servers/src/query_handler.rs | 30 +++ src/{common => }/servers/src/server.rs | 0 src/servers/tests/http/http_handler_test.rs | 78 +++++++ src/servers/tests/http/mod.rs | 1 + src/servers/tests/mod.rs | 42 ++++ src/{common => }/servers/tests/mysql/mod.rs | 0 .../servers/tests/mysql/mysql_server_test.rs | 45 +--- .../servers/tests/mysql/mysql_writer_test.rs | 2 +- 53 files changed, 1036 insertions(+), 768 deletions(-) delete mode 100644 src/common/servers/Cargo.toml delete mode 100644 src/common/servers/src/error.rs delete mode 100644 src/common/servers/src/lib.rs delete mode 100644 src/common/servers/src/mysql/error.rs delete mode 100644 src/common/servers/src/mysql/mod.rs delete mode 100644 src/common/servers/tests/mod.rs delete mode 100644 src/datanode/src/server/grpc/server.rs delete mode 100644 src/datanode/src/server/http/handler.rs delete mode 100644 src/datanode/src/test_util.rs delete mode 100644 src/datanode/src/tests.rs create mode 100644 src/datanode/tests/grpc_test.rs rename src/datanode/{src => }/tests/http_test.rs (93%) create mode 100644 src/datanode/tests/instance_test.rs create mode 100644 src/datanode/tests/test_util.rs create mode 100644 src/servers/Cargo.toml create mode 100644 src/servers/src/error.rs create mode 100644 src/servers/src/grpc.rs create mode 100644 src/servers/src/grpc/handler.rs rename src/{datanode/src/server => servers/src}/http.rs (73%) create mode 100644 src/servers/src/http/handler.rs create mode 100644 
src/servers/src/lib.rs rename src/{common/servers/src/mysql/mysql_instance.rs => servers/src/mysql/handler.rs} (72%) create mode 100644 src/servers/src/mysql/mod.rs rename src/{common/servers/src/mysql/mysql_server.rs => servers/src/mysql/server.rs} (78%) rename src/{common/servers/src/mysql/mysql_writer.rs => servers/src/mysql/writer.rs} (99%) create mode 100644 src/servers/src/query_handler.rs rename src/{common => }/servers/src/server.rs (100%) create mode 100644 src/servers/tests/http/http_handler_test.rs create mode 100644 src/servers/tests/http/mod.rs create mode 100644 src/servers/tests/mod.rs rename src/{common => }/servers/tests/mysql/mod.rs (100%) rename src/{common => }/servers/tests/mysql/mysql_server_test.rs (79%) rename src/{common => }/servers/tests/mysql/mysql_writer_test.rs (94%) diff --git a/Cargo.lock b/Cargo.lock index 59291525d9..9675130378 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,15 +212,6 @@ dependencies = [ "syn", ] -[[package]] -name = "atomic-shim" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67cd4b51d303cf3501c301e8125df442128d3c6d7c69f71b27833d253de47e77" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "atomic_float" version = "0.1.0" @@ -799,7 +790,7 @@ version = "0.1.0" dependencies = [ "common-error", "common-telemetry", - "metrics 0.18.1", + "metrics 0.20.1", "once_cell", "paste", "snafu", @@ -807,31 +798,6 @@ dependencies = [ "tokio-test", ] -[[package]] -name = "common-servers" -version = "0.1.0" -dependencies = [ - "async-trait", - "catalog", - "common-base", - "common-error", - "common-recordbatch", - "common-runtime", - "common-telemetry", - "datatypes", - "futures", - "metrics 0.20.1", - "mysql_async", - "num_cpus", - "opensrv-mysql", - "query", - "rand 0.8.5", - "snafu", - "test-util", - "tokio", - "tokio-stream", -] - [[package]] name = "common-telemetry" version = "0.1.0" @@ -839,7 +805,7 @@ dependencies = [ "backtrace", "common-error", 
"console-subscriber", - "metrics 0.18.1", + "metrics 0.20.1", "metrics-exporter-prometheus", "once_cell", "opentelemetry", @@ -1267,20 +1233,23 @@ dependencies = [ "axum-macros", "axum-test-helper", "catalog", + "client", "common-base", "common-error", "common-query", "common-recordbatch", + "common-runtime", "common-telemetry", "datafusion", "datatypes", "hyper", "log-store", - "metrics 0.18.1", + "metrics 0.20.1", "object-store", "query", "serde", "serde_json", + "servers", "snafu", "sql", "storage", @@ -1744,9 +1713,6 @@ name = "hashbrown" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -dependencies = [ - "ahash", -] [[package]] name = "hashbrown" @@ -2306,16 +2272,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "metrics" -version = "0.18.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e52eb6380b6d2a10eb3434aec0885374490f5b82c8aaf5cd487a183c98be834" -dependencies = [ - "ahash", - "metrics-macros 0.5.1", -] - [[package]] name = "metrics" version = "0.19.0" @@ -2339,14 +2295,15 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.9.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b93b470b04c005178058e18ac8bb2eb3fda562cf87af5ea05ba8d44190d458c" +checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70" dependencies = [ "indexmap", - "metrics 0.18.1", + "metrics 0.20.1", "metrics-util", - "parking_lot 0.11.2", + "parking_lot 0.12.0", + "portable-atomic", "quanta", "thiserror", ] @@ -2375,17 +2332,17 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.12.1" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65a9e83b833e1d2e07010a386b197c13aa199bbd0fca5cf69bfa147972db890a" +checksum = "f7d24dc2dbae22bff6f1f9326ffce828c9f07ef9cc1e8002e5279f845432a30a" 
dependencies = [ - "atomic-shim", "crossbeam-epoch", "crossbeam-utils", - "hashbrown 0.11.2", - "metrics 0.18.1", + "hashbrown 0.12.1", + "metrics 0.20.1", "num_cpus", - "parking_lot 0.11.2", + "parking_lot 0.12.0", + "portable-atomic", "quanta", "sketches-ddsketch", ] @@ -3166,9 +3123,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "0.3.7" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ef3e12daa83946e79a4e22dff6ff8154138bfb34bef1769ec80c92bc3aa88e3" +checksum = "b303a15aeda678da614ab23306232dbd282d532f8c5919cedd41b66b9dc96560" [[package]] name = "ppv-lite86" @@ -3313,9 +3270,9 @@ dependencies = [ [[package]] name = "quanta" -version = "0.9.3" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8" +checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" dependencies = [ "crossbeam-utils", "libc", @@ -3345,7 +3302,7 @@ dependencies = [ "datatypes", "futures", "futures-util", - "metrics 0.18.1", + "metrics 0.20.1", "num", "num-traits", "rand 0.8.5", @@ -3453,9 +3410,9 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "10.3.0" +version = "10.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "738bc47119e3eeccc7e94c4a506901aea5e7b4944ecd0829cbebf4af04ceda12" +checksum = "2c49596760fce12ca21550ac21dc5a9617b2ea4b6e0aa7d8dab8ff2824fc2bba" dependencies = [ "bitflags", ] @@ -3803,6 +3760,40 @@ dependencies = [ "serde", ] +[[package]] +name = "servers" +version = "0.1.0" +dependencies = [ + "api", + "async-trait", + "axum", + "axum-macros", + "catalog", + "common-base", + "common-error", + "common-recordbatch", + "common-runtime", + "common-telemetry", + "datatypes", + "futures", + "hyper", + "metrics 0.20.1", + "mysql_async", + "num_cpus", + "opensrv-mysql", + "query", + "rand 0.8.5", + "serde", + "serde_json", + "snafu", + 
"test-util", + "tokio", + "tokio-stream", + "tonic 0.8.0", + "tower", + "tower-http", +] + [[package]] name = "sha-1" version = "0.10.0" @@ -3875,9 +3866,9 @@ checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" [[package]] name = "sketches-ddsketch" -version = "0.1.2" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76a77a8fd93886010f05e7ea0720e569d6d16c65329dbe3ec033bbbccccb017b" +checksum = "ceb945e54128e09c43d8e4f1277851bd5044c6fc540bbaa2ad888f60b3da9ae7" [[package]] name = "slab" diff --git a/Cargo.toml b/Cargo.toml index 743bfde0f8..7dd14b8809 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ members = [ "src/common/query", "src/common/recordbatch", "src/common/runtime", - "src/common/servers", "src/common/telemetry", "src/common/time", "src/cmd", @@ -19,6 +18,7 @@ members = [ "src/logical-plans", "src/object-store", "src/query", + "src/servers", "src/sql", "src/storage", "src/store-api", diff --git a/config/datanode.example.toml b/config/datanode.example.toml index e1c435352f..0a29d6ce40 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -2,6 +2,9 @@ http_addr = '0.0.0.0:3000' rpc_addr = '0.0.0.0:3001' wal_dir = '/tmp/wal' +mysql_addr = '0.0.0.0:3306' +mysql_runtime_size = 4 + [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' diff --git a/src/client/src/database.rs b/src/client/src/database.rs index ec696c1e2c..e75caec0ef 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -65,7 +65,7 @@ impl Database { let header = obj_result.header.context(MissingHeaderSnafu)?; - if StatusCode::is_success(header.code) { + if !StatusCode::is_success(header.code) { return DataNodeSnafu { code: header.code, msg: header.err_msg, diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index df2aca5339..b054dddc44 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -4,6 +4,6 @@ mod error; pub use self::{ client::Client, - 
database::Database, + database::{Database, ObjectResult}, error::{Error, Result}, }; diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 03c6a72e78..40d66e2055 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -37,6 +37,8 @@ struct StartCommand { http_addr: Option, #[clap(long)] rpc_addr: Option, + #[clap(long)] + mysql_addr: Option, #[clap(short, long)] config_file: Option, } @@ -70,11 +72,44 @@ impl TryFrom for DatanodeOptions { if let Some(addr) = cmd.http_addr { opts.http_addr = addr; } - if let Some(addr) = cmd.rpc_addr { opts.rpc_addr = addr; } + if let Some(addr) = cmd.mysql_addr { + opts.mysql_addr = addr; + } Ok(opts) } } + +#[cfg(test)] +mod tests { + use datanode::datanode::ObjectStoreConfig; + + use super::*; + + #[test] + fn test_read_from_config_file() { + let cmd = StartCommand { + http_addr: None, + rpc_addr: None, + mysql_addr: None, + config_file: Some(format!( + "{}/../../config/datanode.example.toml", + std::env::current_dir().unwrap().as_path().to_str().unwrap() + )), + }; + let options: DatanodeOptions = cmd.try_into().unwrap(); + assert_eq!("0.0.0.0:3000".to_string(), options.http_addr); + assert_eq!("0.0.0.0:3001".to_string(), options.rpc_addr); + assert_eq!("/tmp/wal".to_string(), options.wal_dir); + assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr); + assert_eq!(4, options.mysql_runtime_size); + match options.storage { + ObjectStoreConfig::File { data_dir } => { + assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir) + } + }; + } +} diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml index 8327e73b09..ef3f92d9d9 100644 --- a/src/common/runtime/Cargo.toml +++ b/src/common/runtime/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] common-error = { path = "../error" } common-telemetry = { path = "../telemetry" } -metrics = "0.18" +metrics = "0.20" once_cell = "1.12" paste = "1.0" snafu = { version = "0.7", features = ["backtraces"] } diff --git 
a/src/common/servers/Cargo.toml b/src/common/servers/Cargo.toml deleted file mode 100644 index c5b10fb2f5..0000000000 --- a/src/common/servers/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "common-servers" -version = "0.1.0" -edition = "2021" - -[dependencies] -async-trait = "0.1" -common-error = { path = "../error" } -common-recordbatch = { path = "../recordbatch" } -common-runtime = { path = "../runtime" } -common-telemetry = { path = "../telemetry" } -datatypes = { path = "../../datatypes"} -futures = "0.3" -metrics = "0.20" -num_cpus = "1.13" -opensrv-mysql = "0.1" -query = { path = "../../query" } -snafu = { version = "0.7", features = ["backtraces"] } -tokio = { version = "1.20", features = ["full"] } -tokio-stream = { version = "0.1", features = ["net"] } - -[dev-dependencies] -common-base = { path = "../base" } -catalog = { path = "../../catalog" } -mysql_async = "0.30" -rand = "0.8" -test-util = { path = "../../../test-util" } diff --git a/src/common/servers/src/error.rs b/src/common/servers/src/error.rs deleted file mode 100644 index a1ccdd1cdb..0000000000 --- a/src/common/servers/src/error.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::any::Any; - -use common_error::prelude::*; - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum Error { - #[snafu(display("MySQL server error, source: {}", source))] - MysqlServer { source: crate::mysql::error::Error }, -} - -pub type Result = std::result::Result; - -impl ErrorExt for Error { - fn status_code(&self) -> StatusCode { - match self { - Error::MysqlServer { .. 
} => StatusCode::Internal, - } - } - - fn backtrace_opt(&self) -> Option<&Backtrace> { - ErrorCompat::backtrace(self) - } - - fn as_any(&self) -> &dyn Any { - self - } -} diff --git a/src/common/servers/src/lib.rs b/src/common/servers/src/lib.rs deleted file mode 100644 index 1274427dbd..0000000000 --- a/src/common/servers/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod error; -pub mod mysql; -pub mod server; diff --git a/src/common/servers/src/mysql/error.rs b/src/common/servers/src/mysql/error.rs deleted file mode 100644 index 9f851be9f3..0000000000 --- a/src/common/servers/src/mysql/error.rs +++ /dev/null @@ -1,60 +0,0 @@ -use std::any::Any; -use std::io; - -use common_error::prelude::*; - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum Error { - #[snafu(display("Internal error: {}", err_msg))] - Internal { err_msg: String }, - - #[snafu(display("Internal IO error, source: {}", source))] - InternalIo { source: io::Error }, - - #[snafu(display("Tokio IO error: {}, source: {}", err_msg, source))] - TokioIo { err_msg: String, source: io::Error }, - - #[snafu(display("Runtime resource error, source: {}", source))] - RuntimeResource { - source: common_runtime::error::Error, - }, - - #[snafu(display("Failed to convert vector, source: {}", source))] - VectorConversion { source: datatypes::error::Error }, - - #[snafu(display("Failed to collect recordbatch, source: {}", source))] - CollectRecordbatch { - source: common_recordbatch::error::Error, - }, -} - -pub type Result = std::result::Result; - -impl ErrorExt for Error { - fn status_code(&self) -> StatusCode { - match self { - Error::Internal { .. } | Error::InternalIo { .. } | Error::TokioIo { .. } => { - StatusCode::Unexpected - } - Error::VectorConversion { .. } | Error::CollectRecordbatch { .. } => { - StatusCode::Internal - } - Error::RuntimeResource { .. 
} => StatusCode::RuntimeResourcesExhausted, - } - } - - fn backtrace_opt(&self) -> Option<&Backtrace> { - ErrorCompat::backtrace(self) - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -impl From for Error { - fn from(e: io::Error) -> Self { - Error::InternalIo { source: e } - } -} diff --git a/src/common/servers/src/mysql/mod.rs b/src/common/servers/src/mysql/mod.rs deleted file mode 100644 index 47bc2a60a2..0000000000 --- a/src/common/servers/src/mysql/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod error; -pub mod mysql_instance; -pub mod mysql_server; -pub mod mysql_writer; diff --git a/src/common/servers/tests/mod.rs b/src/common/servers/tests/mod.rs deleted file mode 100644 index 02b5c273ef..0000000000 --- a/src/common/servers/tests/mod.rs +++ /dev/null @@ -1 +0,0 @@ -mod mysql; diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index 1829b462ea..c4a41d2d52 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -13,8 +13,8 @@ deadlock_detection=["parking_lot"] backtrace = "0.3" common-error = { path = "../error" } console-subscriber = { version = "0.1", optional = true } -metrics = "0.18" -metrics-exporter-prometheus = { version = "0.9", default-features = false } +metrics = "0.20" +metrics-exporter-prometheus = { version = "0.11", default-features = false } once_cell = "1.10" opentelemetry = { version = "0.17", default-features = false, features = ["trace", "rt-tokio"] } opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] } diff --git a/src/common/telemetry/src/metric.rs b/src/common/telemetry/src/metric.rs index fb572f4a45..ad35c85473 100644 --- a/src/common/telemetry/src/metric.rs +++ b/src/common/telemetry/src/metric.rs @@ -21,7 +21,16 @@ fn init_prometheus_recorder() { let recorder = PrometheusBuilder::new().build_recorder(); let mut h = PROMETHEUS_HANDLE.as_ref().write().unwrap(); *h = Some(recorder.handle()); - metrics::clear_recorder(); + // TODO(LFC): separate metrics for 
testing and metrics for production + // `clear_recorder` is likely not expected to be called in production code, recorder should be + // globally unique and used throughout the whole lifetime of an application. + // It's marked as "unsafe" since [this PR](https://github.com/metrics-rs/metrics/pull/302), and + // "metrics" version also upgraded to 0.19. + // A quick look in the metrics codes suggests that the "unsafe" call is of no harm. However, + // it required a further investigation in how to use metric properly. + unsafe { + metrics::clear_recorder(); + } match metrics::set_boxed_recorder(Box::new(recorder)) { Ok(_) => (), Err(err) => crate::warn!("Install prometheus recorder failed, cause: {}", err), diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 8bfdd02833..2af95caaa5 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -19,15 +19,17 @@ catalog = { path = "../catalog" } common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-recordbatch = { path = "../common/recordbatch" } +common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } datatypes = { path = "../datatypes"} hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } -metrics = "0.18" +metrics = "0.20" object-store = { path = "../object-store" } query = { path = "../query" } serde = "1.0" serde_json = "1.0" +servers = { path = "../servers" } snafu = { version = "0.7", features = ["backtraces"] } sql = { path = "../sql" } storage = { path = "../storage" } @@ -42,6 +44,7 @@ tower-http = { version ="0.3", features = ["full"]} [dev-dependencies] axum-test-helper = "0.1" +client = { path = "../client" } common-query = { path = "../common/query" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} tempdir = "0.3" diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 
c7ba2fd74a..78f817a5e3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -24,6 +24,8 @@ impl Default for ObjectStoreConfig { pub struct DatanodeOptions { pub http_addr: String, pub rpc_addr: String, + pub mysql_addr: String, + pub mysql_runtime_size: u32, pub wal_dir: String, pub storage: ObjectStoreConfig, } @@ -33,6 +35,8 @@ impl Default for DatanodeOptions { Self { http_addr: "0.0.0.0:3000".to_string(), rpc_addr: "0.0.0.0:3001".to_string(), + mysql_addr: "0.0.0.0:3306".to_string(), + mysql_runtime_size: 2, wal_dir: "/tmp/wal".to_string(), storage: ObjectStoreConfig::default(), } @@ -49,15 +53,15 @@ pub struct Datanode { impl Datanode { pub async fn new(opts: DatanodeOptions) -> Result { let instance = Arc::new(Instance::new(&opts).await?); - + let services = Services::try_new(instance.clone(), &opts)?; Ok(Self { opts, - services: Services::new(instance.clone()), + services, instance, }) } - pub async fn start(&self) -> Result<()> { + pub async fn start(&mut self) -> Result<()> { self.instance.start().await?; self.services.start(&self.opts).await } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6601208481..003b54f403 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -80,10 +80,11 @@ pub enum Error { #[snafu(display("Fail to convert bytes to insert batch, {}", source))] DecodeInsert { source: DecodeError }, - // The error source of http error is clear even without backtrace now so - // a backtrace is not carried in this varaint. 
- #[snafu(display("Fail to start HTTP server, source: {}", source))] - StartHttp { source: hyper::Error }, + #[snafu(display("Failed to start server, source: {}", source))] + StartServer { + #[snafu(backtrace)] + source: servers::error::Error, + }, #[snafu(display("Fail to parse address {}, source: {}", addr, source))] ParseAddr { @@ -122,6 +123,12 @@ pub enum Error { #[snafu(display("Unsupported expr type: {}", name))] UnsupportedExpr { name: String }, + #[snafu(display("Runtime resource error, source: {}", source))] + RuntimeResource { + #[snafu(backtrace)] + source: common_runtime::error::Error, + }, + #[snafu(display("Invalid CREATE TABLE sql statement, cause: {}", msg))] InvalidCreateTableSql { msg: String, backtrace: Backtrace }, @@ -179,7 +186,7 @@ impl ErrorExt for Error { | Error::KeyColumnNotFound { .. } | Error::ConstraintNotSupported { .. } => StatusCode::InvalidArguments, // TODO(yingwen): Further categorize http error. - Error::StartHttp { .. } + Error::StartServer { .. } | Error::ParseAddr { .. } | Error::TcpBind { .. } | Error::StartGrpc { .. } @@ -190,6 +197,7 @@ impl ErrorExt for Error { Error::InitBackend { .. } => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), Error::OpenStorageEngine { source } => source.status_code(), + Error::RuntimeResource { .. 
} => StatusCode::RuntimeResourcesExhausted, } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index f4edcd3e51..dc0b20bec6 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,16 +1,18 @@ use std::{fs, path, sync::Arc}; -use api::v1::InsertExpr; +use api::v1::{object_expr, select_expr, InsertExpr, ObjectExpr, ObjectResult, SelectExpr}; +use async_trait::async_trait; use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::status_code::StatusCode; use common_telemetry::logging::info; +use common_telemetry::timer; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; use object_store::{backend::fs::Backend, util, ObjectStore}; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; -use snafu::{OptionExt, ResultExt}; +use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler}; +use snafu::prelude::*; use sql::statements::statement::Statement; use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; -#[cfg(test)] -use table::engine::TableEngineRef; use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; @@ -18,7 +20,10 @@ use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; use crate::error::{ self, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, TableNotFoundSnafu, }; +use crate::metric; +use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; use crate::server::grpc::insert::insertion_expr_to_request; +use crate::server::grpc::select::to_object_result; use crate::sql::{SqlHandler, SqlRequest}; type DefaultEngine = MitoEngine>; @@ -146,61 +151,36 @@ impl Instance { Ok(()) } - #[cfg(test)] - pub fn table_engine(&self) -> TableEngineRef { - self.sql_handler.table_engine() + async fn handle_insert(&self, insert_expr: InsertExpr) -> ObjectResult { + match self.execute_grpc_insert(insert_expr).await { + Ok(Output::AffectedRows(rows)) => 
ObjectResultBuilder::new() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + Err(err) => { + // TODO(fys): failure count + build_err_result(&err) + } + _ => unreachable!(), + } } - #[cfg(test)] - pub async fn create_test_table(&self) -> Result<()> { - use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, Schema}; - use table::engine::EngineContext; - use table::requests::CreateTableRequest; + async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult { + match select_expr.expr { + Some(select_expr::Expr::Sql(sql)) => { + let result = self.execute_sql(&sql).await; + to_object_result(result).await + } + None => ObjectResult::default(), + } + } - use crate::error::CreateTableSnafu; + pub fn sql_handler(&self) -> &SqlHandler { + &self.sql_handler + } - let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), - ]; - - let table_name = "demo"; - let table = self - .table_engine() - .create_table( - &EngineContext::default(), - CreateTableRequest { - id: 1, - catalog_name: None, - schema_name: None, - table_name: table_name.to_string(), - desc: Some(" a test table".to_string()), - schema: Arc::new( - Schema::with_timestamp_index(column_schemas, 3) - .expect("ts is expected to be timestamp column"), - ), - create_if_not_exists: true, - primary_key_indices: Vec::default(), - }, - ) - .await - .context(CreateTableSnafu { table_name })?; - - let schema_provider = self - .catalog_manager - .catalog(DEFAULT_CATALOG_NAME) - .unwrap() - .schema(DEFAULT_SCHEMA_NAME) - .unwrap(); - - schema_provider - .register_table(table_name.to_string(), table) - .unwrap(); - - Ok(()) + pub fn catalog_manager(&self) -> &CatalogManagerRef { + 
&self.catalog_manager } } @@ -243,84 +223,40 @@ async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result servers::error::Result { + let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED); + self.execute_sql(query) .await - .unwrap(); - - assert!(matches!(output, Output::AffectedRows(2))); - } - - #[tokio::test] - async fn test_execute_query() { - let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Instance::new(&opts).await.unwrap(); - instance.start().await.unwrap(); - - let output = instance - .execute_sql("select sum(number) from numbers limit 20") - .await - .unwrap(); - - match output { - Output::RecordBatch(recordbatch) => { - let numbers = util::collect(recordbatch).await.unwrap(); - let columns = numbers[0].df_recordbatch.columns(); - assert_eq!(1, columns.len()); - assert_eq!(columns[0].len(), 1); - - assert_eq!( - *columns[0].as_any().downcast_ref::().unwrap(), - UInt64Array::from_slice(&[4950]) - ); - } - _ => unreachable!(), - } - } - - #[tokio::test] - pub async fn test_execute_create() { - common_telemetry::init_default_ut_logging(); - let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Instance::new(&opts).await.unwrap(); - instance.start().await.unwrap(); - instance.create_test_table().await.unwrap(); - - let output = instance - .execute_sql( - r#"create table test_table( - host string, - ts bigint, - cpu double default 0, - memory double, - TIME INDEX (ts), - PRIMARY KEY(ts, host) - ) engine=mito with(regions=1);"#, - ) - .await - .unwrap(); - - assert!(matches!(output, Output::AffectedRows(1))); + // TODO(LFC): use snafu's `context` to include source error and backtrace. + // Ideally we should define a snafu in servers::error to wrap the error thrown + // by `execute_sql`. However, we cannot do that because that would introduce a circular + // dependency. 
+ .map_err(|e| { + servers::error::ExecuteQuerySnafu { + query, + err_msg: format!("{}", e), + } + .fail::() + .unwrap_err() + }) + } +} + +#[async_trait] +impl GrpcQueryHandler for Instance { + async fn do_query(&self, query: ObjectExpr) -> servers::error::Result { + let object_resp = match query.expr { + Some(object_expr::Expr::Insert(insert_expr)) => self.handle_insert(insert_expr).await, + Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await, + other => { + return servers::error::NotSupportedSnafu { + feat: format!("{:?}", other), + } + .fail(); + } + }; + Ok(object_resp) } } diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 2c3478b138..398ee2747b 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -6,8 +6,3 @@ pub mod instance; mod metric; pub mod server; mod sql; - -#[cfg(test)] -pub mod test_util; -#[cfg(test)] -mod tests; diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index feb8e9b235..7327c29cd1 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -1,33 +1,66 @@ pub mod grpc; -pub mod http; -use grpc::GrpcServer; -use http::HttpServer; +use std::net::SocketAddr; +use std::sync::Arc; + +use common_runtime::Builder as RuntimeBuilder; +use servers::grpc::GrpcServer; +use servers::http::HttpServer; +use servers::mysql::server::MysqlServer; +use servers::server::Server; +use snafu::ResultExt; use tokio::try_join; use crate::datanode::DatanodeOptions; -use crate::error::Result; +use crate::error::{self, Result}; use crate::instance::InstanceRef; /// All rpc services. 
pub struct Services { http_server: HttpServer, grpc_server: GrpcServer, + mysql_server: Box, } impl Services { - pub fn new(instance: InstanceRef) -> Self { - Self { + pub fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result { + let mysql_io_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.mysql_runtime_size as usize) + .thread_name("mysql-io-handlers") + .build() + .context(error::RuntimeResourceSnafu)?, + ); + Ok(Self { http_server: HttpServer::new(instance.clone()), - grpc_server: GrpcServer::new(instance), - } + grpc_server: GrpcServer::new(instance.clone()), + mysql_server: MysqlServer::create_server(instance, mysql_io_runtime), + }) } - pub async fn start(&self, opts: &DatanodeOptions) -> Result<()> { + // TODO(LFC): make servers started on demand (not starting mysql if no needed, for example) + pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> { + let http_addr = &opts.http_addr; + let http_addr: SocketAddr = http_addr + .parse() + .context(error::ParseAddrSnafu { addr: http_addr })?; + + let grpc_addr = &opts.rpc_addr; + let grpc_addr: SocketAddr = grpc_addr + .parse() + .context(error::ParseAddrSnafu { addr: grpc_addr })?; + + let mysql_addr = &opts.mysql_addr; + let mysql_addr: SocketAddr = mysql_addr + .parse() + .context(error::ParseAddrSnafu { addr: mysql_addr })?; + try_join!( - self.http_server.start(&opts.http_addr), - self.grpc_server.start(&opts.rpc_addr) - )?; + self.http_server.start(http_addr), + self.grpc_server.start(grpc_addr), + self.mysql_server.start(mysql_addr), + ) + .context(error::StartServerSnafu)?; Ok(()) } } diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 63401af1a1..285516957a 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -1,43 +1,3 @@ -mod handler; +pub(crate) mod handler; pub mod insert; -mod select; -mod server; - -use common_telemetry::logging::info; -use snafu::ResultExt; -use 
tokio::net::TcpListener; -use tokio_stream::wrappers::TcpListenerStream; - -use crate::{ - error::{Result, StartGrpcSnafu, TcpBindSnafu}, - instance::InstanceRef, - server::grpc::{handler::BatchHandler, server::Server}, -}; - -pub struct GrpcServer { - handler: BatchHandler, -} - -impl GrpcServer { - pub fn new(instance: InstanceRef) -> Self { - Self { - handler: BatchHandler::new(instance), - } - } - - pub async fn start(&self, addr: &str) -> Result<()> { - let listener = TcpListener::bind(addr) - .await - .context(TcpBindSnafu { addr })?; - let addr = listener.local_addr().context(TcpBindSnafu { addr })?; - info!("The gRPC server is running at {}", addr); - - let svc = Server::new(self.handler.clone()).into_service(); - let _ = tonic::transport::Server::builder() - .add_service(svc) - .serve_with_incoming(TcpListenerStream::new(listener)) - .await - .context(StartGrpcSnafu)?; - Ok(()) - } -} +pub mod select; diff --git a/src/datanode/src/server/grpc/handler.rs b/src/datanode/src/server/grpc/handler.rs index 7e35fb7392..b4229981a5 100644 --- a/src/datanode/src/server/grpc/handler.rs +++ b/src/datanode/src/server/grpc/handler.rs @@ -1,83 +1,10 @@ use api::v1::{ - codec::SelectResult, object_expr, object_result, select_expr, BatchRequest, BatchResponse, - DatabaseResponse, InsertExpr, MutateResult, ObjectResult, ResultHeader, SelectExpr, + codec::SelectResult, object_result, MutateResult, ObjectResult, ResultHeader, SelectResult as SelectResultRaw, }; use common_error::prelude::ErrorExt; -use common_error::status_code::StatusCode; -use query::Output; -use crate::server::grpc::{select::to_object_result, server::PROTOCOL_VERSION}; -use crate::{error::Result, error::UnsupportedExprSnafu, instance::InstanceRef}; - -#[derive(Clone)] -pub struct BatchHandler { - instance: InstanceRef, -} - -impl BatchHandler { - pub fn new(instance: InstanceRef) -> Self { - Self { instance } - } - - pub async fn batch(&self, batch_req: BatchRequest) -> Result { - let mut batch_resp = 
BatchResponse::default(); - let mut db_resp = DatabaseResponse::default(); - let databases = batch_req.databases; - - for req in databases { - let exprs = req.exprs; - - for obj_expr in exprs { - let object_resp = match obj_expr.expr { - Some(object_expr::Expr::Insert(insert_expr)) => { - self.handle_insert(insert_expr).await - } - Some(object_expr::Expr::Select(select_expr)) => { - self.handle_select(select_expr).await - } - other => { - return UnsupportedExprSnafu { - name: format!("{:?}", other), - } - .fail(); - } - }; - - db_resp.results.push(object_resp); - } - } - batch_resp.databases.push(db_resp); - Ok(batch_resp) - } - - pub async fn handle_insert(&self, insert_expr: InsertExpr) -> ObjectResult { - match self.instance.execute_grpc_insert(insert_expr).await { - Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - Err(err) => { - // TODO(fys): failure count - build_err_result(&err) - } - _ => unreachable!(), - } - } - - pub async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult { - let expr = match select_expr.expr { - Some(expr) => expr, - None => return ObjectResult::default(), - }; - match expr { - select_expr::Expr::Sql(sql) => { - let result = self.instance.execute_sql(&sql).await; - to_object_result(result).await - } - } - } -} +pub const PROTOCOL_VERSION: u32 = 1; pub type Success = u32; pub type Failure = u32; @@ -165,9 +92,8 @@ mod tests { use api::v1::{object_result, MutateResult}; use common_error::status_code::StatusCode; - use super::{build_err_result, ObjectResultBuilder}; - use crate::server::grpc::handler::UnsupportedExprSnafu; - use crate::server::grpc::server::PROTOCOL_VERSION; + use super::*; + use crate::error::UnsupportedExprSnafu; #[test] fn test_object_result_builder() { diff --git a/src/datanode/src/server/grpc/select.rs b/src/datanode/src/server/grpc/select.rs index 8d490c5963..4ff13036fb 100644 --- 
a/src/datanode/src/server/grpc/select.rs +++ b/src/datanode/src/server/grpc/select.rs @@ -13,7 +13,7 @@ use snafu::OptionExt; use crate::error::{ConversionSnafu, Result}; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; -pub(crate) async fn to_object_result(result: Result) -> ObjectResult { +pub async fn to_object_result(result: Result) -> ObjectResult { match result { Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() .status_code(StatusCode::Success as u32) diff --git a/src/datanode/src/server/grpc/server.rs b/src/datanode/src/server/grpc/server.rs deleted file mode 100644 index 433b406505..0000000000 --- a/src/datanode/src/server/grpc/server.rs +++ /dev/null @@ -1,30 +0,0 @@ -use api::v1::*; -use tonic::{Request, Response, Status}; - -use super::handler::BatchHandler; - -pub const PROTOCOL_VERSION: u32 = 1; - -#[derive(Clone)] -pub struct Server { - handler: BatchHandler, -} - -impl Server { - pub fn new(handler: BatchHandler) -> Self { - Self { handler } - } - - pub fn into_service(self) -> greptime_server::GreptimeServer { - greptime_server::GreptimeServer::new(self) - } -} - -#[tonic::async_trait] -impl greptime_server::Greptime for Server { - async fn batch(&self, req: Request) -> Result, Status> { - let req = req.into_inner(); - let res = self.handler.batch(req).await?; - Ok(Response::new(res)) - } -} diff --git a/src/datanode/src/server/http/handler.rs b/src/datanode/src/server/http/handler.rs deleted file mode 100644 index 34380442b6..0000000000 --- a/src/datanode/src/server/http/handler.rs +++ /dev/null @@ -1,123 +0,0 @@ -// http handlers - -use std::collections::HashMap; - -use axum::extract::{Extension, Query}; -use common_telemetry::{metric, timer}; - -use crate::instance::InstanceRef; -use crate::metric::METRIC_HANDLE_SQL_ELAPSED; -use crate::server::http::{HttpResponse, JsonResponse}; - -/// Handler to execute sql -#[axum_macros::debug_handler] -pub async fn sql( - Extension(instance): Extension, - Query(params): 
Query>, -) -> HttpResponse { - let _timer = timer!(METRIC_HANDLE_SQL_ELAPSED); - if let Some(sql) = params.get("sql") { - HttpResponse::Json(JsonResponse::from_output(instance.execute_sql(sql).await).await) - } else { - HttpResponse::Json(JsonResponse::with_error(Some( - "sql parameter is required.".to_string(), - ))) - } -} - -/// Handler to export metrics -#[axum_macros::debug_handler] -pub async fn metrics( - Extension(_instance): Extension, - Query(_params): Query>, -) -> HttpResponse { - if let Some(handle) = metric::try_handle() { - HttpResponse::Text(handle.render()) - } else { - HttpResponse::Text("Prometheus handle not initialized.".to_string()) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use metrics::counter; - - use super::*; - use crate::instance::Instance; - use crate::server::http::JsonOutput; - use crate::test_util::{self, TestGuard}; - - fn create_params() -> Query> { - let mut map = HashMap::new(); - map.insert( - "sql".to_string(), - "select sum(number) from numbers limit 20".to_string(), - ); - Query(map) - } - - async fn create_extension() -> (Extension, TestGuard) { - let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Arc::new(Instance::new(&opts).await.unwrap()); - instance.start().await.unwrap(); - (Extension(instance), guard) - } - - #[tokio::test] - async fn test_sql_not_provided() { - let (extension, _guard) = create_extension().await; - - let json = sql(extension, Query(HashMap::default())).await; - match json { - HttpResponse::Json(json) => { - assert!(!json.success); - assert_eq!(Some("sql parameter is required.".to_string()), json.error); - assert!(json.output.is_none()); - } - _ => unreachable!(), - } - } - - #[tokio::test] - async fn test_sql_output_rows() { - common_telemetry::init_default_ut_logging(); - let query = create_params(); - let (extension, _guard) = create_extension().await; - - let json = sql(extension, query).await; - - match json { - HttpResponse::Json(json) => { - 
assert!(json.success, "{:?}", json); - assert!(json.error.is_none()); - assert!(json.output.is_some()); - - match json.output.unwrap() { - JsonOutput::Rows(rows) => { - assert_eq!(1, rows.len()); - } - _ => unreachable!(), - } - } - _ => unreachable!(), - } - } - - #[tokio::test] - async fn test_metrics() { - metric::init_default_metrics_recorder(); - - counter!("test_metrics", 1); - - let query = create_params(); - let (extension, _guard) = create_extension().await; - let text = metrics(extension, query).await; - - match text { - HttpResponse::Text(s) => assert!(s.contains("test_metrics counter")), - _ => unreachable!(), - } - } -} diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index f4e2ddeb18..e42252c4b8 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -50,7 +50,6 @@ impl SqlHandler { .context(TableNotFoundSnafu { table_name }) } - #[cfg(test)] pub fn table_engine(&self) -> Arc { self.table_engine.clone() } diff --git a/src/datanode/src/test_util.rs b/src/datanode/src/test_util.rs deleted file mode 100644 index 30ed650a9a..0000000000 --- a/src/datanode/src/test_util.rs +++ /dev/null @@ -1,32 +0,0 @@ -use tempdir::TempDir; - -use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; - -/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`, -/// Only for test. 
-/// -/// TODO: Add a test feature -pub struct TestGuard { - _wal_tmp_dir: TempDir, - _data_tmp_dir: TempDir, -} - -pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) { - let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap(); - let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap(); - let opts = DatanodeOptions { - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), - storage: ObjectStoreConfig::File { - data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }, - ..Default::default() - }; - - ( - opts, - TestGuard { - _wal_tmp_dir: wal_tmp_dir, - _data_tmp_dir: data_tmp_dir, - }, - ) -} diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs deleted file mode 100644 index 150709f786..0000000000 --- a/src/datanode/src/tests.rs +++ /dev/null @@ -1 +0,0 @@ -mod http_test; diff --git a/src/datanode/tests/grpc_test.rs b/src/datanode/tests/grpc_test.rs new file mode 100644 index 0000000000..8cf37e8fbb --- /dev/null +++ b/src/datanode/tests/grpc_test.rs @@ -0,0 +1,113 @@ +mod test_util; + +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use api::v1::{codec::InsertBatch, column, select_expr, Column, SelectExpr}; +use client::{Client, Database, ObjectResult}; +use datanode::instance::Instance; +use servers::grpc::GrpcServer; +use servers::server::Server; + +#[tokio::test] +async fn test_insert_and_select() { + common_telemetry::init_default_ut_logging(); + + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Arc::new(Instance::new(&opts).await.unwrap()); + instance.start().await.unwrap(); + + test_util::create_test_table(&instance).await.unwrap(); + + tokio::spawn(async move { + let mut grpc_server = GrpcServer::new(instance); + let addr = "127.0.0.1:3001".parse::().unwrap(); + grpc_server.start(addr).await.unwrap() + }); + + // wait for GRPC server to start + tokio::time::sleep(Duration::from_secs(1)).await; + + let grpc_client = 
Client::connect("http://127.0.0.1:3001").await.unwrap(); + let db = Database::new("greptime", grpc_client); + + // testing data: + let expected_host_col = Column { + column_name: "host".to_string(), + values: Some(column::Values { + string_values: vec!["host1", "host2", "host3", "host4"] + .into_iter() + .map(|s| s.to_string()) + .collect(), + ..Default::default() + }), + ..Default::default() + }; + let expected_cpu_col = Column { + column_name: "cpu".to_string(), + values: Some(column::Values { + f64_values: vec![0.31, 0.41, 0.2], + ..Default::default() + }), + null_mask: vec![2], + ..Default::default() + }; + let expected_mem_col = Column { + column_name: "memory".to_string(), + values: Some(column::Values { + f64_values: vec![0.1, 0.2, 0.3], + ..Default::default() + }), + null_mask: vec![4], + ..Default::default() + }; + let expected_ts_col = Column { + column_name: "ts".to_string(), + values: Some(column::Values { + i64_values: vec![100, 101, 102, 103], + ..Default::default() + }), + ..Default::default() + }; + + // insert + let values = vec![InsertBatch { + columns: vec![ + expected_host_col.clone(), + expected_cpu_col.clone(), + expected_mem_col.clone(), + expected_ts_col.clone(), + ], + row_count: 4, + } + .into()]; + let result = db.insert("demo", values).await; + assert!(result.is_ok()); + + // select + let select_expr = SelectExpr { + expr: Some(select_expr::Expr::Sql("select * from demo".to_string())), + }; + let result = db.select(select_expr).await.unwrap(); + assert!(matches!(result, ObjectResult::Select(_))); + match result { + ObjectResult::Select(select_result) => { + assert_eq!(4, select_result.row_count); + let actual_columns = select_result.columns; + assert_eq!(4, actual_columns.len()); + + let expected_columns = vec![ + expected_ts_col, + expected_host_col, + expected_cpu_col, + expected_mem_col, + ]; + expected_columns + .iter() + .zip(actual_columns.iter()) + .for_each(|(x, y)| assert_eq!(x, y)); + } + _ => unreachable!(), + } +} diff --git 
a/src/datanode/src/tests/http_test.rs b/src/datanode/tests/http_test.rs similarity index 93% rename from src/datanode/src/tests/http_test.rs rename to src/datanode/tests/http_test.rs index 8db7cc11af..7a44b687f9 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/tests/http_test.rs @@ -1,14 +1,13 @@ -//! http server test +mod test_util; use std::sync::Arc; use axum::http::StatusCode; use axum::Router; use axum_test_helper::TestClient; - -use crate::instance::Instance; -use crate::server::http::HttpServer; -use crate::test_util::{self, TestGuard}; +use datanode::instance::Instance; +use servers::http::HttpServer; +use test_util::TestGuard; async fn make_test_app() -> (Router, TestGuard) { let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(); diff --git a/src/datanode/tests/instance_test.rs b/src/datanode/tests/instance_test.rs new file mode 100644 index 0000000000..a259e7e791 --- /dev/null +++ b/src/datanode/tests/instance_test.rs @@ -0,0 +1,80 @@ +mod test_util; + +use arrow::array::UInt64Array; +use common_recordbatch::util; +use datanode::instance::Instance; +use query::Output; + +#[tokio::test] +async fn test_execute_insert() { + common_telemetry::init_default_ut_logging(); + + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); + + test_util::create_test_table(&instance).await.unwrap(); + + let output = instance + .execute_sql( + r#"insert into demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#, + ) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(2))); +} + +#[tokio::test] +async fn test_execute_query() { + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); + + let output = instance + .execute_sql("select sum(number) from numbers limit 
20") + .await + .unwrap(); + match output { + Output::RecordBatch(recordbatch) => { + let numbers = util::collect(recordbatch).await.unwrap(); + let columns = numbers[0].df_recordbatch.columns(); + assert_eq!(1, columns.len()); + assert_eq!(columns[0].len(), 1); + + assert_eq!( + *columns[0].as_any().downcast_ref::().unwrap(), + UInt64Array::from_slice(&[4950]) + ); + } + _ => unreachable!(), + } +} + +#[tokio::test] +pub async fn test_execute_create() { + common_telemetry::init_default_ut_logging(); + + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); + + test_util::create_test_table(&instance).await.unwrap(); + + let output = instance + .execute_sql( + r#"create table test_table( + host string, + ts bigint, + cpu double default 0, + memory double, + TIME INDEX (ts), + PRIMARY KEY(ts, host) + ) engine=mito with(regions=1);"#, + ) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); +} diff --git a/src/datanode/tests/test_util.rs b/src/datanode/tests/test_util.rs new file mode 100644 index 0000000000..8a86b3b40a --- /dev/null +++ b/src/datanode/tests/test_util.rs @@ -0,0 +1,85 @@ +use std::sync::Arc; + +use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; +use datanode::error::{CreateTableSnafu, Result}; +use datanode::instance::Instance; +use datatypes::data_type::ConcreteDataType; +use datatypes::schema::{ColumnSchema, Schema}; +use snafu::ResultExt; +use table::engine::EngineContext; +use table::engine::TableEngineRef; +use table::requests::CreateTableRequest; +use tempdir::TempDir; + +/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`, +/// Only for test. 
+pub struct TestGuard { + _wal_tmp_dir: TempDir, + _data_tmp_dir: TempDir, +} + +pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) { + let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap(); + let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap(); + let opts = DatanodeOptions { + wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + storage: ObjectStoreConfig::File { + data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }, + ..Default::default() + }; + ( + opts, + TestGuard { + _wal_tmp_dir: wal_tmp_dir, + _data_tmp_dir: data_tmp_dir, + }, + ) +} + +// It's actually not dead codes, at least been used in instance_test.rs and grpc_test.rs +// However, clippy keeps warning us, so I temporary add an "allow" to bypass it. +// TODO(LFC): further investigate why clippy falsely warning "dead_code" +#[allow(dead_code)] +pub async fn create_test_table(instance: &Instance) -> Result<()> { + let column_schemas = vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ]; + + let table_name = "demo"; + let table_engine: TableEngineRef = instance.sql_handler().table_engine(); + let table = table_engine + .create_table( + &EngineContext::default(), + CreateTableRequest { + id: 1, + catalog_name: None, + schema_name: None, + table_name: table_name.to_string(), + desc: Some(" a test table".to_string()), + schema: Arc::new( + Schema::with_timestamp_index(column_schemas, 3) + .expect("ts is expected to be timestamp column"), + ), + create_if_not_exists: true, + primary_key_indices: Vec::default(), + }, + ) + .await + .context(CreateTableSnafu { table_name })?; + + let schema_provider = instance + .catalog_manager() + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + 
.schema(DEFAULT_SCHEMA_NAME) + .unwrap(); + schema_provider + .register_table(table_name.to_string(), table) + .unwrap(); + Ok(()) +} diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 127abab0fb..ebead01f94 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -22,7 +22,7 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", br datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" -metrics = "0.18" +metrics = "0.20" serde = "1.0" serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml new file mode 100644 index 0000000000..cfd5904e51 --- /dev/null +++ b/src/servers/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "servers" +version = "0.1.0" +edition = "2021" + +[dependencies] +api = { path = "../api" } +async-trait = "0.1" +axum = "0.5" +axum-macros = "0.2" +common-error = { path = "../common/error" } +common-recordbatch = { path = "../common/recordbatch" } +common-runtime = { path = "../common/runtime" } +common-telemetry = { path = "../common/telemetry" } +datatypes = { path = "../datatypes" } +futures = "0.3" +hyper = { version = "0.14", features = ["full"] } +metrics = "0.20" +num_cpus = "1.13" +opensrv-mysql = "0.1" +query = { path = "../query" } +serde = "1.0" +serde_json = "1.0" +snafu = { version = "0.7", features = ["backtraces"] } +tonic = "0.8" +tokio = { version = "1.20", features = ["full"] } +tokio-stream = { version = "0.1", features = ["net"] } +tower = { version = "0.4", features = ["full"]} +tower-http = { version ="0.3", features = ["full"]} + +[dev-dependencies] +common-base = { path = "../common/base" } +catalog = { path = "../catalog" } +mysql_async = "0.30" +rand = "0.8" +test-util = { path = "../../test-util" } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs new file mode 100644 index 0000000000..85c83225ab --- /dev/null +++ b/src/servers/src/error.rs @@ -0,0 +1,89 @@ +use 
std::any::Any; +use std::net::SocketAddr; + +use common_error::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Internal error: {}", err_msg))] + Internal { err_msg: String }, + + #[snafu(display("Internal IO error, source: {}", source))] + InternalIo { source: std::io::Error }, + + #[snafu(display("Tokio IO error: {}, source: {}", err_msg, source))] + TokioIo { + err_msg: String, + source: std::io::Error, + }, + + #[snafu(display("Failed to convert vector, source: {}", source))] + VectorConversion { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to collect recordbatch, source: {}", source))] + CollectRecordbatch { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to start HTTP server, source: {}", source))] + StartHttp { source: hyper::Error }, + + #[snafu(display("Failed to start gRPC server, source: {}", source))] + StartGrpc { source: tonic::transport::Error }, + + #[snafu(display("Failed to bind address {}, source: {}", addr, source))] + TcpBind { + addr: SocketAddr, + source: std::io::Error, + }, + + #[snafu(display("Failed to execute query: {}, error: {}", query, err_msg))] + ExecuteQuery { query: String, err_msg: String }, + + #[snafu(display("Not supported: {}", feat))] + NotSupported { feat: String }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::Internal { .. } + | Error::InternalIo { .. } + | Error::TokioIo { .. } + | Error::VectorConversion { .. } + | Error::CollectRecordbatch { .. } + | Error::StartHttp { .. } + | Error::StartGrpc { .. } + | Error::TcpBind { .. } + | Error::ExecuteQuery { .. } => StatusCode::Internal, + Error::NotSupported { .. 
} => StatusCode::InvalidArguments, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl From for tonic::Status { + fn from(err: Error) -> Self { + tonic::Status::new(tonic::Code::Internal, err.to_string()) + } +} + +impl From for Error { + fn from(e: std::io::Error) -> Self { + Error::InternalIo { source: e } + } +} diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs new file mode 100644 index 0000000000..b5bc22b4cd --- /dev/null +++ b/src/servers/src/grpc.rs @@ -0,0 +1,72 @@ +pub mod handler; + +use std::net::SocketAddr; + +use api::v1::{greptime_server, BatchRequest, BatchResponse}; +use async_trait::async_trait; +use common_telemetry::logging::info; +use snafu::ResultExt; +use tokio::net::TcpListener; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::{Request, Response, Status}; + +use crate::error::{Result, StartGrpcSnafu, TcpBindSnafu}; +use crate::grpc::handler::BatchHandler; +use crate::query_handler::GrpcQueryHandlerRef; +use crate::server::Server; + +pub struct GrpcServer { + query_handler: GrpcQueryHandlerRef, +} + +impl GrpcServer { + pub fn new(query_handler: GrpcQueryHandlerRef) -> Self { + Self { query_handler } + } + + pub fn create_service(&self) -> greptime_server::GreptimeServer { + let service = GrpcService { + handler: BatchHandler::new(self.query_handler.clone()), + }; + greptime_server::GreptimeServer::new(service) + } +} + +pub struct GrpcService { + handler: BatchHandler, +} + +#[tonic::async_trait] +impl greptime_server::Greptime for GrpcService { + async fn batch( + &self, + req: Request, + ) -> std::result::Result, Status> { + let req = req.into_inner(); + let res = self.handler.batch(req).await?; + Ok(Response::new(res)) + } +} + +#[async_trait] +impl Server for GrpcServer { + async fn shutdown(&mut self) -> Result<()> { + // TODO(LFC): shutdown grpc server + unimplemented!() + } + + async fn start(&mut self, addr: 
SocketAddr) -> Result { + let listener = TcpListener::bind(addr) + .await + .context(TcpBindSnafu { addr })?; + let addr = listener.local_addr().context(TcpBindSnafu { addr })?; + info!("GRPC server is bound to {}", addr); + + tonic::transport::Server::builder() + .add_service(self.create_service()) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .context(StartGrpcSnafu)?; + Ok(addr) + } +} diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs new file mode 100644 index 0000000000..82d91eee34 --- /dev/null +++ b/src/servers/src/grpc/handler.rs @@ -0,0 +1,29 @@ +use api::v1::{BatchRequest, BatchResponse, DatabaseResponse}; + +use crate::error::Result; +use crate::query_handler::GrpcQueryHandlerRef; + +#[derive(Clone)] +pub struct BatchHandler { + query_handler: GrpcQueryHandlerRef, +} + +impl BatchHandler { + pub fn new(query_handler: GrpcQueryHandlerRef) -> Self { + Self { query_handler } + } + + pub async fn batch(&self, batch_req: BatchRequest) -> Result { + let mut batch_resp = BatchResponse::default(); + let mut db_resp = DatabaseResponse::default(); + + for db_req in batch_req.databases { + for obj_expr in db_req.exprs { + let object_resp = self.query_handler.do_query(obj_expr).await?; + db_resp.results.push(object_resp); + } + } + batch_resp.databases.push(db_resp); + Ok(batch_resp) + } +} diff --git a/src/datanode/src/server/http.rs b/src/servers/src/http.rs similarity index 73% rename from src/datanode/src/server/http.rs rename to src/servers/src/http.rs index f662ce3535..d9f5101cc6 100644 --- a/src/datanode/src/server/http.rs +++ b/src/servers/src/http.rs @@ -1,8 +1,9 @@ -mod handler; +pub mod handler; use std::net::SocketAddr; use std::time::Duration; +use async_trait::async_trait; use axum::{ error_handling::HandleErrorLayer, response::IntoResponse, @@ -18,12 +19,12 @@ use snafu::ResultExt; use tower::{timeout::TimeoutLayer, ServiceBuilder}; use tower_http::trace::TraceLayer; -use 
crate::error::{ParseAddrSnafu, Result, StartHttpSnafu}; -use crate::server::InstanceRef; +use crate::error::{Result, StartHttpSnafu}; +use crate::query_handler::SqlQueryHandlerRef; +use crate::server::Server; -/// Http server pub struct HttpServer { - instance: InstanceRef, + query_handler: SqlQueryHandlerRef, } #[derive(Serialize, Debug)] @@ -32,14 +33,12 @@ pub enum JsonOutput { Rows(Vec), } -/// Http response #[derive(Serialize, Debug)] pub enum HttpResponse { Json(JsonResponse), Text(String), } -/// Json response #[derive(Serialize, Debug)] pub struct JsonResponse { success: bool, @@ -66,6 +65,7 @@ impl JsonResponse { output: None, } } + fn with_output(output: Option) -> Self { JsonResponse { success: true, @@ -87,6 +87,18 @@ impl JsonResponse { Err(e) => Self::with_error(Some(format!("Query engine output error: {}", e))), } } + + pub fn success(&self) -> bool { + self.success + } + + pub fn error(&self) -> Option<&String> { + self.error.as_ref() + } + + pub fn output(&self) -> Option<&JsonOutput> { + self.output.as_ref() + } } async fn shutdown_signal() { @@ -98,8 +110,8 @@ async fn shutdown_signal() { } impl HttpServer { - pub fn new(instance: InstanceRef) -> Self { - Self { instance } + pub fn new(query_handler: SqlQueryHandlerRef) -> Self { + Self { query_handler } } pub fn make_app(&self) -> Router { @@ -112,20 +124,28 @@ impl HttpServer { ServiceBuilder::new() .layer(HandleErrorLayer::new(handle_error)) .layer(TraceLayer::new_for_http()) - .layer(Extension(self.instance.clone())) - // TODO configure timeout + .layer(Extension(self.query_handler.clone())) + // TODO(LFC): make timeout configurable .layer(TimeoutLayer::new(Duration::from_secs(30))), ) } +} - pub async fn start(&self, addr: &str) -> Result<()> { +#[async_trait] +impl Server for HttpServer { + async fn shutdown(&mut self) -> Result<()> { + // TODO(LFC): shutdown http server, and remove `shutdown_signal` above + unimplemented!() + } + + async fn start(&mut self, listening: SocketAddr) -> Result 
{ let app = self.make_app(); - let socket_addr: SocketAddr = addr.parse().context(ParseAddrSnafu { addr })?; - info!("Datanode HTTP server is listening on {}", socket_addr); - let server = axum::Server::bind(&socket_addr).serve(app.into_make_service()); + let server = axum::Server::bind(&listening).serve(app.into_make_service()); + let listening = server.local_addr(); + info!("HTTP server is bound to {}", listening); let graceful = server.with_graceful_shutdown(shutdown_signal()); - - graceful.await.context(StartHttpSnafu) + graceful.await.context(StartHttpSnafu)?; + Ok(listening) } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs new file mode 100644 index 0000000000..1ecbaeb511 --- /dev/null +++ b/src/servers/src/http/handler.rs @@ -0,0 +1,35 @@ +use std::collections::HashMap; + +use axum::extract::{Extension, Query}; +use common_telemetry::metric; + +use crate::http::{HttpResponse, JsonResponse}; +use crate::query_handler::SqlQueryHandlerRef; + +/// Handler to execute sql +#[axum_macros::debug_handler] +pub async fn sql( + Extension(query_handler): Extension, + Query(params): Query>, +) -> HttpResponse { + if let Some(sql) = params.get("sql") { + HttpResponse::Json(JsonResponse::from_output(query_handler.do_query(sql).await).await) + } else { + HttpResponse::Json(JsonResponse::with_error(Some( + "sql parameter is required.".to_string(), + ))) + } +} + +/// Handler to export metrics +#[axum_macros::debug_handler] +pub async fn metrics( + Extension(_query_handler): Extension, + Query(_params): Query>, +) -> HttpResponse { + if let Some(handle) = metric::try_handle() { + HttpResponse::Text(handle.render()) + } else { + HttpResponse::Text("Prometheus handle not initialized.".to_string()) + } +} diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs new file mode 100644 index 0000000000..b469916ef1 --- /dev/null +++ b/src/servers/src/lib.rs @@ -0,0 +1,6 @@ +pub mod error; +pub mod grpc; +pub mod http; +pub mod mysql; +pub mod 
query_handler; +pub mod server; diff --git a/src/common/servers/src/mysql/mysql_instance.rs b/src/servers/src/mysql/handler.rs similarity index 72% rename from src/common/servers/src/mysql/mysql_instance.rs rename to src/servers/src/mysql/handler.rs index bdf81fa248..9c784df7b5 100644 --- a/src/common/servers/src/mysql/mysql_instance.rs +++ b/src/servers/src/mysql/handler.rs @@ -1,5 +1,4 @@ use std::io; -use std::sync::Arc; use async_trait::async_trait; use opensrv_mysql::AsyncMysqlShim; @@ -7,27 +6,19 @@ use opensrv_mysql::ErrorKind; use opensrv_mysql::ParamParser; use opensrv_mysql::QueryResultWriter; use opensrv_mysql::StatementMetaWriter; -use query::query_engine::Output; -use crate::mysql::error::{self, Result}; -use crate::mysql::mysql_writer::MysqlResultWriter; - -pub type MysqlInstanceRef = Arc; - -// TODO(LFC): Move to instance layer. -#[async_trait] -pub trait MysqlInstance { - async fn do_query(&self, query: &str) -> Result; -} +use crate::error::{self, Result}; +use crate::mysql::writer::MysqlResultWriter; +use crate::query_handler::SqlQueryHandlerRef; // An intermediate shim for executing MySQL queries. 
pub struct MysqlInstanceShim { - mysql_instance: MysqlInstanceRef, + query_handler: SqlQueryHandlerRef, } impl MysqlInstanceShim { - pub fn create(mysql_instance: MysqlInstanceRef) -> MysqlInstanceShim { - MysqlInstanceShim { mysql_instance } + pub fn create(query_handler: SqlQueryHandlerRef) -> MysqlInstanceShim { + MysqlInstanceShim { query_handler } } } @@ -72,7 +63,7 @@ impl AsyncMysqlShim for MysqlInstanceShim { query: &'a str, writer: QueryResultWriter<'a, W>, ) -> Result<()> { - let output = self.mysql_instance.do_query(query).await; + let output = self.query_handler.do_query(query).await; let mut writer = MysqlResultWriter::new(writer); writer.write(output).await diff --git a/src/servers/src/mysql/mod.rs b/src/servers/src/mysql/mod.rs new file mode 100644 index 0000000000..2c0f9eef38 --- /dev/null +++ b/src/servers/src/mysql/mod.rs @@ -0,0 +1,3 @@ +pub mod handler; +pub mod server; +pub mod writer; diff --git a/src/common/servers/src/mysql/mysql_server.rs b/src/servers/src/mysql/server.rs similarity index 78% rename from src/common/servers/src/mysql/mysql_server.rs rename to src/servers/src/mysql/server.rs index b3d1976d95..ce844de5e2 100644 --- a/src/common/servers/src/mysql/mysql_server.rs +++ b/src/servers/src/mysql/server.rs @@ -16,9 +16,9 @@ use tokio::net::TcpStream; use tokio::task::JoinHandle; use tokio_stream::wrappers::TcpListenerStream; -use crate::error as server_error; -use crate::mysql::error::{self, Result}; -use crate::mysql::mysql_instance::{MysqlInstanceRef, MysqlInstanceShim}; +use crate::error::{self, Result}; +use crate::mysql::handler::MysqlInstanceShim; +use crate::query_handler::SqlQueryHandlerRef; use crate::server::Server; pub struct MysqlServer { @@ -32,14 +32,14 @@ pub struct MysqlServer { // A handle holding the TCP accepting task. 
join_handle: Option>, - mysql_handler: MysqlInstanceRef, + query_handler: SqlQueryHandlerRef, io_runtime: Arc, } impl MysqlServer { /// Creates a new MySQL server with provided [MysqlInstance] and [Runtime]. pub fn create_server( - mysql_handler: MysqlInstanceRef, + query_handler: SqlQueryHandlerRef, io_runtime: Arc, ) -> Box { let (abort_handle, registration) = AbortHandle::new_pair(); @@ -47,7 +47,7 @@ impl MysqlServer { abort_handle, abort_registration: Some(registration), join_handle: None, - mysql_handler, + query_handler, io_runtime, }) } @@ -59,21 +59,22 @@ impl MysqlServer { err_msg: format!("Failed to bind addr {}", addr), })?; // get actually bond addr in case input addr use port 0 - let listener_addr = listener.local_addr()?; - Ok((TcpListenerStream::new(listener), listener_addr)) + let addr = listener.local_addr()?; + info!("MySQL server is bound to {}", addr); + Ok((TcpListenerStream::new(listener), addr)) } fn accept(&self, accepting_stream: Abortable) -> impl Future { let io_runtime = self.io_runtime.clone(); - let mysql_handler = self.mysql_handler.clone(); + let query_handler = self.query_handler.clone(); accepting_stream.for_each(move |tcp_stream| { let io_runtime = io_runtime.clone(); - let mysql_handler = mysql_handler.clone(); + let query_handler = query_handler.clone(); async move { match tcp_stream { Err(error) => error!("Broken pipe: {}", error), Ok(io_stream) => { - if let Err(error) = Self::handle(io_stream, io_runtime, mysql_handler) { + if let Err(error) = Self::handle(io_stream, io_runtime, query_handler) { error!("Unexpected error when handling TcpStream: {:?}", error); }; } @@ -85,10 +86,10 @@ impl MysqlServer { pub fn handle( stream: TcpStream, io_runtime: Arc, - mysql_handler: MysqlInstanceRef, + query_handler: SqlQueryHandlerRef, ) -> Result<()> { info!("MySQL connection coming from: {}", stream.peer_addr()?); - let shim = MysqlInstanceShim::create(mysql_handler); + let shim = MysqlInstanceShim::create(query_handler); // TODO(LFC): 
Relate "handler" with MySQL session; also deal with panics there. let _handler = io_runtime.spawn(AsyncMysqlIntermediary::run_on(shim, stream)); Ok(()) @@ -97,7 +98,7 @@ impl MysqlServer { #[async_trait] impl Server for MysqlServer { - async fn shutdown(&mut self) -> server_error::Result<()> { + async fn shutdown(&mut self) -> Result<()> { match self.join_handle.take() { Some(join_handle) => { self.abort_handle.abort(); @@ -112,17 +113,14 @@ impl Server for MysqlServer { None => error::InternalSnafu { err_msg: "MySQL server is not started.", } - .fail() - .context(server_error::MysqlServerSnafu), + .fail()?, } } - async fn start(&mut self, listening: SocketAddr) -> server_error::Result { + async fn start(&mut self, listening: SocketAddr) -> Result { match self.abort_registration.take() { Some(registration) => { - let (stream, listener) = Self::bind(listening) - .await - .context(server_error::MysqlServerSnafu)?; + let (stream, listener) = Self::bind(listening).await?; let stream = Abortable::new(stream, registration); self.join_handle = Some(tokio::spawn(self.accept(stream))); Ok(listener) @@ -130,8 +128,7 @@ impl Server for MysqlServer { None => error::InternalSnafu { err_msg: "MySQL server has been started.", } - .fail() - .context(server_error::MysqlServerSnafu), + .fail()?, } } } diff --git a/src/common/servers/src/mysql/mysql_writer.rs b/src/servers/src/mysql/writer.rs similarity index 99% rename from src/common/servers/src/mysql/mysql_writer.rs rename to src/servers/src/mysql/writer.rs index 305a7d0e04..42b7086d14 100644 --- a/src/common/servers/src/mysql/mysql_writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -9,7 +9,7 @@ use opensrv_mysql::{ use query::Output; use snafu::prelude::*; -use crate::mysql::error::{self, Error, Result}; +use crate::error::{self, Error, Result}; struct QueryResult { recordbatches: Vec, diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs new file mode 100644 index 0000000000..0c469c37b6 --- /dev/null 
+++ b/src/servers/src/query_handler.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use api::v1::{ObjectExpr, ObjectResult}; +use async_trait::async_trait; +use query::Output; + +use crate::error::Result; + +/// All query handler traits for various request protocols, like SQL or GRPC. +/// An instance that wishes to support a certain request protocol just implements the corresponding +/// trait; the server will handle the codec for you. +/// +/// Note: +/// Query handlers are not confined to only handle read requests, they are expected to handle +/// write requests too. So the word "query" here might seem ambiguous. However, "query" has been +/// used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the +/// word "query". + +pub type SqlQueryHandlerRef = Arc; +pub type GrpcQueryHandlerRef = Arc; + +#[async_trait] +pub trait SqlQueryHandler { + async fn do_query(&self, query: &str) -> Result; +} + +#[async_trait] +pub trait GrpcQueryHandler { + async fn do_query(&self, query: ObjectExpr) -> Result; +} diff --git a/src/common/servers/src/server.rs b/src/servers/src/server.rs similarity index 100% rename from src/common/servers/src/server.rs rename to src/servers/src/server.rs diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs new file mode 100644 index 0000000000..22c30b81fb --- /dev/null +++ b/src/servers/tests/http/http_handler_test.rs @@ -0,0 +1,78 @@ +use std::collections::HashMap; + +use axum::extract::Query; +use axum::Extension; +use common_telemetry::metric; +use metrics::counter; +use servers::http::handler as http_handler; +use servers::http::{HttpResponse, JsonOutput}; +use test_util::MemTable; + +use crate::create_testing_sql_query_handler; + +#[tokio::test] +async fn test_sql_not_provided() { + let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); + let extension = Extension(query_handler); + + let json = http_handler::sql(extension,
Query(HashMap::default())).await; + match json { + HttpResponse::Json(json) => { + assert!(!json.success()); + assert_eq!( + Some(&"sql parameter is required.".to_string()), + json.error() + ); + assert!(json.output().is_none()); + } + _ => unreachable!(), + } +} + +#[tokio::test] +async fn test_sql_output_rows() { + common_telemetry::init_default_ut_logging(); + + let query = create_query(); + let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); + let extension = Extension(query_handler); + + let json = http_handler::sql(extension, query).await; + match json { + HttpResponse::Json(json) => { + assert!(json.success(), "{:?}", json); + assert!(json.error().is_none()); + match json.output().expect("assertion failed") { + JsonOutput::Rows(rows) => { + assert_eq!(1, rows.len()); + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } +} + +#[tokio::test] +async fn test_metrics() { + metric::init_default_metrics_recorder(); + + counter!("test_metrics", 1); + + let query = create_query(); + let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); + let extension = Extension(query_handler); + + let text = http_handler::metrics(extension, query).await; + match text { + HttpResponse::Text(s) => assert!(s.contains("test_metrics counter")), + _ => unreachable!(), + } +} + +fn create_query() -> Query> { + Query(HashMap::from([( + "sql".to_string(), + "select sum(uint32s) from numbers limit 20".to_string(), + )])) +} diff --git a/src/servers/tests/http/mod.rs b/src/servers/tests/http/mod.rs new file mode 100644 index 0000000000..5d1b718df1 --- /dev/null +++ b/src/servers/tests/http/mod.rs @@ -0,0 +1 @@ +mod http_handler_test; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs new file mode 100644 index 0000000000..cca5e86628 --- /dev/null +++ b/src/servers/tests/mod.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use catalog::memory::{MemoryCatalogList, 
MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{ + CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, +}; +use query::{Output, QueryEngineFactory, QueryEngineRef}; +use servers::error::Result; +use servers::query_handler::{SqlQueryHandler, SqlQueryHandlerRef}; +use test_util::MemTable; + +mod http; +mod mysql; + +struct DummyInstance { + query_engine: QueryEngineRef, +} + +#[async_trait] +impl SqlQueryHandler for DummyInstance { + async fn do_query(&self, query: &str) -> Result { + let plan = self.query_engine.sql_to_plan(query).unwrap(); + Ok(self.query_engine.execute(&plan).await.unwrap()) + } +} + +fn create_testing_sql_query_handler(table: MemTable) -> SqlQueryHandlerRef { + let table_name = table.table_name().to_string(); + let table = Arc::new(table); + + let schema_provider = Arc::new(MemorySchemaProvider::new()); + let catalog_provider = Arc::new(MemoryCatalogProvider::new()); + let catalog_list = Arc::new(MemoryCatalogList::default()); + schema_provider.register_table(table_name, table).unwrap(); + catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); + catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + + let factory = QueryEngineFactory::new(catalog_list); + let query_engine = factory.query_engine().clone(); + Arc::new(DummyInstance { query_engine }) +} diff --git a/src/common/servers/tests/mysql/mod.rs b/src/servers/tests/mysql/mod.rs similarity index 100% rename from src/common/servers/tests/mysql/mod.rs rename to src/servers/tests/mysql/mod.rs diff --git a/src/common/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs similarity index 79% rename from src/common/servers/tests/mysql/mysql_server_test.rs rename to src/servers/tests/mysql/mysql_server_test.rs index e5a5dd4d7b..ba3f99992d 100644 --- a/src/common/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ 
-2,49 +2,30 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; -use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{ - CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, -}; use common_recordbatch::RecordBatch; use common_runtime::Builder as RuntimeBuilder; -use common_servers::mysql::error::{Result, RuntimeResourceSnafu}; -use common_servers::mysql::mysql_instance::MysqlInstance; -use common_servers::mysql::mysql_server::MysqlServer; -use common_servers::server::Server; use datatypes::schema::Schema; use mysql_async::prelude::*; -use query::{Output, QueryEngineFactory, QueryEngineRef}; use rand::rngs::StdRng; use rand::Rng; -use snafu::prelude::*; +use servers::error::Result; +use servers::mysql::server::MysqlServer; +use servers::server::Server; use test_util::MemTable; +use crate::create_testing_sql_query_handler; use crate::mysql::{all_datatype_testing_data, MysqlTextRow, TestingData}; fn create_mysql_server(table: MemTable) -> Result> { - let table_name = table.table_name().to_string(); - let table = Arc::new(table); - - let schema_provider = Arc::new(MemorySchemaProvider::new()); - schema_provider.register_table(table_name, table).unwrap(); - let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); - let catalog_list = Arc::new(MemoryCatalogList::default()); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); - let factory = QueryEngineFactory::new(catalog_list); - let query_engine = factory.query_engine().clone(); - - let mysql_instance = Arc::new(DummyMysqlInstance { query_engine }); + let query_handler = create_testing_sql_query_handler(table); let io_runtime = Arc::new( RuntimeBuilder::default() .worker_threads(4) .thread_name("mysql-io-handlers") .build() - .context(RuntimeResourceSnafu)?, + 
.unwrap(), ); - Ok(MysqlServer::create_server(mysql_instance, io_runtime)) + Ok(MysqlServer::create_server(query_handler, io_runtime)) } #[tokio::test] @@ -209,15 +190,3 @@ async fn create_connection(port: u16) -> mysql_async::Result .wait_timeout(Some(1000)); mysql_async::Conn::new(opts).await } - -struct DummyMysqlInstance { - query_engine: QueryEngineRef, -} - -#[async_trait] -impl MysqlInstance for DummyMysqlInstance { - async fn do_query(&self, query: &str) -> Result { - let plan = self.query_engine.sql_to_plan(query).unwrap(); - Ok(self.query_engine.execute(&plan).await.unwrap()) - } -} diff --git a/src/common/servers/tests/mysql/mysql_writer_test.rs b/src/servers/tests/mysql/mysql_writer_test.rs similarity index 94% rename from src/common/servers/tests/mysql/mysql_writer_test.rs rename to src/servers/tests/mysql/mysql_writer_test.rs index 064c700751..392711b425 100644 --- a/src/common/servers/tests/mysql/mysql_writer_test.rs +++ b/src/servers/tests/mysql/mysql_writer_test.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use common_servers::mysql::mysql_writer::create_mysql_column_def; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; +use servers::mysql::writer::create_mysql_column_def; use crate::mysql::{all_datatype_testing_data, TestingData};