diff --git a/Cargo.lock b/Cargo.lock index 59291525d9..9675130378 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,15 +212,6 @@ dependencies = [ "syn", ] -[[package]] -name = "atomic-shim" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67cd4b51d303cf3501c301e8125df442128d3c6d7c69f71b27833d253de47e77" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "atomic_float" version = "0.1.0" @@ -799,7 +790,7 @@ version = "0.1.0" dependencies = [ "common-error", "common-telemetry", - "metrics 0.18.1", + "metrics 0.20.1", "once_cell", "paste", "snafu", @@ -807,31 +798,6 @@ dependencies = [ "tokio-test", ] -[[package]] -name = "common-servers" -version = "0.1.0" -dependencies = [ - "async-trait", - "catalog", - "common-base", - "common-error", - "common-recordbatch", - "common-runtime", - "common-telemetry", - "datatypes", - "futures", - "metrics 0.20.1", - "mysql_async", - "num_cpus", - "opensrv-mysql", - "query", - "rand 0.8.5", - "snafu", - "test-util", - "tokio", - "tokio-stream", -] - [[package]] name = "common-telemetry" version = "0.1.0" @@ -839,7 +805,7 @@ dependencies = [ "backtrace", "common-error", "console-subscriber", - "metrics 0.18.1", + "metrics 0.20.1", "metrics-exporter-prometheus", "once_cell", "opentelemetry", @@ -1267,20 +1233,23 @@ dependencies = [ "axum-macros", "axum-test-helper", "catalog", + "client", "common-base", "common-error", "common-query", "common-recordbatch", + "common-runtime", "common-telemetry", "datafusion", "datatypes", "hyper", "log-store", - "metrics 0.18.1", + "metrics 0.20.1", "object-store", "query", "serde", "serde_json", + "servers", "snafu", "sql", "storage", @@ -1744,9 +1713,6 @@ name = "hashbrown" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -dependencies = [ - "ahash", -] [[package]] name = "hashbrown" @@ -2306,16 +2272,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "metrics" -version = "0.18.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e52eb6380b6d2a10eb3434aec0885374490f5b82c8aaf5cd487a183c98be834" -dependencies = [ - "ahash", - "metrics-macros 0.5.1", -] - [[package]] name = "metrics" version = "0.19.0" @@ -2339,14 +2295,15 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.9.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b93b470b04c005178058e18ac8bb2eb3fda562cf87af5ea05ba8d44190d458c" +checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70" dependencies = [ "indexmap", - "metrics 0.18.1", + "metrics 0.20.1", "metrics-util", - "parking_lot 0.11.2", + "parking_lot 0.12.0", + "portable-atomic", "quanta", "thiserror", ] @@ -2375,17 +2332,17 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.12.1" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65a9e83b833e1d2e07010a386b197c13aa199bbd0fca5cf69bfa147972db890a" +checksum = "f7d24dc2dbae22bff6f1f9326ffce828c9f07ef9cc1e8002e5279f845432a30a" dependencies = [ - "atomic-shim", "crossbeam-epoch", "crossbeam-utils", - "hashbrown 0.11.2", - "metrics 0.18.1", + "hashbrown 0.12.1", + "metrics 0.20.1", "num_cpus", - "parking_lot 0.11.2", + "parking_lot 0.12.0", + "portable-atomic", "quanta", "sketches-ddsketch", ] @@ -3166,9 +3123,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "0.3.7" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ef3e12daa83946e79a4e22dff6ff8154138bfb34bef1769ec80c92bc3aa88e3" +checksum = "b303a15aeda678da614ab23306232dbd282d532f8c5919cedd41b66b9dc96560" [[package]] name = "ppv-lite86" @@ -3313,9 +3270,9 @@ dependencies = [ [[package]] name = "quanta" -version = "0.9.3" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8" +checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" dependencies = [ "crossbeam-utils", "libc", @@ -3345,7 +3302,7 @@ dependencies = [ "datatypes", "futures", "futures-util", - "metrics 0.18.1", + "metrics 0.20.1", "num", "num-traits", "rand 0.8.5", @@ -3453,9 +3410,9 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "10.3.0" +version = "10.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "738bc47119e3eeccc7e94c4a506901aea5e7b4944ecd0829cbebf4af04ceda12" +checksum = "2c49596760fce12ca21550ac21dc5a9617b2ea4b6e0aa7d8dab8ff2824fc2bba" dependencies = [ "bitflags", ] @@ -3803,6 +3760,40 @@ dependencies = [ "serde", ] +[[package]] +name = "servers" +version = "0.1.0" +dependencies = [ + "api", + "async-trait", + "axum", + "axum-macros", + "catalog", + "common-base", + "common-error", + "common-recordbatch", + "common-runtime", + "common-telemetry", + "datatypes", + "futures", + "hyper", + "metrics 0.20.1", + "mysql_async", + "num_cpus", + "opensrv-mysql", + "query", + "rand 0.8.5", + "serde", + "serde_json", + "snafu", + "test-util", + "tokio", + "tokio-stream", + "tonic 0.8.0", + "tower", + "tower-http", +] + [[package]] name = "sha-1" version = "0.10.0" @@ -3875,9 +3866,9 @@ checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" [[package]] name = "sketches-ddsketch" -version = "0.1.2" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76a77a8fd93886010f05e7ea0720e569d6d16c65329dbe3ec033bbbccccb017b" +checksum = "ceb945e54128e09c43d8e4f1277851bd5044c6fc540bbaa2ad888f60b3da9ae7" [[package]] name = "slab" diff --git a/Cargo.toml b/Cargo.toml index 743bfde0f8..7dd14b8809 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ members = [ "src/common/query", "src/common/recordbatch", "src/common/runtime", - "src/common/servers", "src/common/telemetry", "src/common/time", "src/cmd", @@ -19,6 +18,7 @@ members = [ "src/logical-plans", "src/object-store", "src/query", + "src/servers", "src/sql", "src/storage", "src/store-api", diff --git a/config/datanode.example.toml b/config/datanode.example.toml index e1c435352f..0a29d6ce40 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -2,6 +2,9 @@ http_addr = '0.0.0.0:3000' rpc_addr = '0.0.0.0:3001' wal_dir = '/tmp/wal' +mysql_addr = '0.0.0.0:3306' +mysql_runtime_size = 4 + [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' diff --git a/src/client/src/database.rs b/src/client/src/database.rs index ec696c1e2c..e75caec0ef 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -65,7 +65,7 @@ impl Database { let header = obj_result.header.context(MissingHeaderSnafu)?; - if StatusCode::is_success(header.code) { + if !StatusCode::is_success(header.code) { return DataNodeSnafu { code: header.code, msg: header.err_msg, diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index df2aca5339..b054dddc44 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -4,6 +4,6 @@ mod error; pub use self::{ client::Client, - database::Database, + database::{Database, ObjectResult}, error::{Error, Result}, }; diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 03c6a72e78..40d66e2055 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -37,6 +37,8 @@ struct StartCommand { http_addr: Option, #[clap(long)] rpc_addr: Option, + #[clap(long)] + mysql_addr: Option, #[clap(short, long)] config_file: Option, } @@ -70,11 +72,44 @@ impl TryFrom for DatanodeOptions { if let Some(addr) = cmd.http_addr { opts.http_addr = addr; } - if let Some(addr) = cmd.rpc_addr { opts.rpc_addr = addr; } + if let Some(addr) = cmd.mysql_addr { + opts.mysql_addr = addr; + } Ok(opts) } } + +#[cfg(test)] +mod tests { + use datanode::datanode::ObjectStoreConfig; + + use super::*; + + #[test] + fn test_read_from_config_file() { + let cmd = StartCommand { + http_addr: None, + rpc_addr: None, + mysql_addr: None, + config_file: Some(format!( + "{}/../../config/datanode.example.toml", + std::env::current_dir().unwrap().as_path().to_str().unwrap() + )), + }; + let options: DatanodeOptions = cmd.try_into().unwrap(); + assert_eq!("0.0.0.0:3000".to_string(), options.http_addr); + assert_eq!("0.0.0.0:3001".to_string(), options.rpc_addr); + assert_eq!("/tmp/wal".to_string(), options.wal_dir); + assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr); + assert_eq!(4, options.mysql_runtime_size); + match options.storage { + ObjectStoreConfig::File { data_dir } => { + assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir) + } + }; + } +} diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml index 8327e73b09..ef3f92d9d9 100644 --- a/src/common/runtime/Cargo.toml +++ b/src/common/runtime/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] common-error = { path = "../error" } common-telemetry = { path = "../telemetry" } -metrics = "0.18" +metrics = "0.20" once_cell = "1.12" paste = "1.0" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/common/servers/Cargo.toml b/src/common/servers/Cargo.toml deleted file mode 100644 index c5b10fb2f5..0000000000 --- a/src/common/servers/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "common-servers" -version = "0.1.0" -edition = "2021" - -[dependencies] -async-trait = "0.1" -common-error = { path = "../error" } -common-recordbatch = { path = "../recordbatch" } -common-runtime = { path = "../runtime" } -common-telemetry = { path = "../telemetry" } -datatypes = { path = "../../datatypes"} -futures = "0.3" -metrics = "0.20" -num_cpus = "1.13" -opensrv-mysql = "0.1" -query = { path = "../../query" } -snafu = { version = "0.7", features = ["backtraces"] } -tokio = { version = "1.20", features = ["full"] } -tokio-stream = { version = "0.1", features = ["net"] } - -[dev-dependencies] -common-base = { path = "../base" } -catalog = { path = "../../catalog" } -mysql_async = "0.30" -rand = "0.8" -test-util = { path = "../../../test-util" } diff --git a/src/common/servers/src/error.rs b/src/common/servers/src/error.rs deleted file mode 100644 index a1ccdd1cdb..0000000000 --- a/src/common/servers/src/error.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::any::Any; - -use common_error::prelude::*; - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum Error { - #[snafu(display("MySQL server error, source: {}", source))] - MysqlServer { source: crate::mysql::error::Error }, -} - -pub type Result = std::result::Result; - -impl ErrorExt for Error { - fn status_code(&self) -> StatusCode { - match self { - Error::MysqlServer { .. } => StatusCode::Internal, - } - } - - fn backtrace_opt(&self) -> Option<&Backtrace> { - ErrorCompat::backtrace(self) - } - - fn as_any(&self) -> &dyn Any { - self - } -} diff --git a/src/common/servers/src/lib.rs b/src/common/servers/src/lib.rs deleted file mode 100644 index 1274427dbd..0000000000 --- a/src/common/servers/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod error; -pub mod mysql; -pub mod server; diff --git a/src/common/servers/src/mysql/error.rs b/src/common/servers/src/mysql/error.rs deleted file mode 100644 index 9f851be9f3..0000000000 --- a/src/common/servers/src/mysql/error.rs +++ /dev/null @@ -1,60 +0,0 @@ -use std::any::Any; -use std::io; - -use common_error::prelude::*; - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum Error { - #[snafu(display("Internal error: {}", err_msg))] - Internal { err_msg: String }, - - #[snafu(display("Internal IO error, source: {}", source))] - InternalIo { source: io::Error }, - - #[snafu(display("Tokio IO error: {}, source: {}", err_msg, source))] - TokioIo { err_msg: String, source: io::Error }, - - #[snafu(display("Runtime resource error, source: {}", source))] - RuntimeResource { - source: common_runtime::error::Error, - }, - - #[snafu(display("Failed to convert vector, source: {}", source))] - VectorConversion { source: datatypes::error::Error }, - - #[snafu(display("Failed to collect recordbatch, source: {}", source))] - CollectRecordbatch { - source: common_recordbatch::error::Error, - }, -} - -pub type Result = std::result::Result; - -impl ErrorExt for Error { - fn status_code(&self) -> StatusCode { - match self { - Error::Internal { .. } | Error::InternalIo { .. } | Error::TokioIo { .. } => { - StatusCode::Unexpected - } - Error::VectorConversion { .. } | Error::CollectRecordbatch { .. } => { - StatusCode::Internal - } - Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted, - } - } - - fn backtrace_opt(&self) -> Option<&Backtrace> { - ErrorCompat::backtrace(self) - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -impl From for Error { - fn from(e: io::Error) -> Self { - Error::InternalIo { source: e } - } -} diff --git a/src/common/servers/src/mysql/mod.rs b/src/common/servers/src/mysql/mod.rs deleted file mode 100644 index 47bc2a60a2..0000000000 --- a/src/common/servers/src/mysql/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod error; -pub mod mysql_instance; -pub mod mysql_server; -pub mod mysql_writer; diff --git a/src/common/servers/tests/mod.rs b/src/common/servers/tests/mod.rs deleted file mode 100644 index 02b5c273ef..0000000000 --- a/src/common/servers/tests/mod.rs +++ /dev/null @@ -1 +0,0 @@ -mod mysql; diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index 1829b462ea..c4a41d2d52 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -13,8 +13,8 @@ deadlock_detection=["parking_lot"] backtrace = "0.3" common-error = { path = "../error" } console-subscriber = { version = "0.1", optional = true } -metrics = "0.18" -metrics-exporter-prometheus = { version = "0.9", default-features = false } +metrics = "0.20" +metrics-exporter-prometheus = { version = "0.11", default-features = false } once_cell = "1.10" opentelemetry = { version = "0.17", default-features = false, features = ["trace", "rt-tokio"] } opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] } diff --git a/src/common/telemetry/src/metric.rs b/src/common/telemetry/src/metric.rs index fb572f4a45..ad35c85473 100644 --- a/src/common/telemetry/src/metric.rs +++ b/src/common/telemetry/src/metric.rs @@ -21,7 +21,16 @@ fn init_prometheus_recorder() { let recorder = PrometheusBuilder::new().build_recorder(); let mut h = PROMETHEUS_HANDLE.as_ref().write().unwrap(); *h = Some(recorder.handle()); - metrics::clear_recorder(); + // TODO(LFC): separate metrics for testing and metrics for production + // `clear_recorder` is likely not expected to be called in production code, recorder should be + // globally unique and used throughout the whole lifetime of an application. + // It's marked as "unsafe" since [this PR](https://github.com/metrics-rs/metrics/pull/302), and + // "metrics" version also upgraded to 0.19. + // A quick look in the metrics codes suggests that the "unsafe" call is of no harm. However, + // it required a further investigation in how to use metric properly. + unsafe { + metrics::clear_recorder(); + } match metrics::set_boxed_recorder(Box::new(recorder)) { Ok(_) => (), Err(err) => crate::warn!("Install prometheus recorder failed, cause: {}", err), diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 8bfdd02833..2af95caaa5 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -19,15 +19,17 @@ catalog = { path = "../catalog" } common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-recordbatch = { path = "../common/recordbatch" } +common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } datatypes = { path = "../datatypes"} hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } -metrics = "0.18" +metrics = "0.20" object-store = { path = "../object-store" } query = { path = "../query" } serde = "1.0" serde_json = "1.0" +servers = { path = "../servers" } snafu = { version = "0.7", features = ["backtraces"] } sql = { path = "../sql" } storage = { path = "../storage" } @@ -42,6 +44,7 @@ tower-http = { version ="0.3", features = ["full"]} [dev-dependencies] axum-test-helper = "0.1" +client = { path = "../client" } common-query = { path = "../common/query" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} tempdir = "0.3" diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index c7ba2fd74a..78f817a5e3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -24,6 +24,8 @@ impl Default for ObjectStoreConfig { pub struct DatanodeOptions { pub http_addr: String, pub rpc_addr: String, + pub mysql_addr: String, + pub mysql_runtime_size: u32, pub wal_dir: String, pub storage: ObjectStoreConfig, } @@ -33,6 +35,8 @@ impl Default for DatanodeOptions { Self { http_addr: "0.0.0.0:3000".to_string(), rpc_addr: "0.0.0.0:3001".to_string(), + mysql_addr: "0.0.0.0:3306".to_string(), + mysql_runtime_size: 2, wal_dir: "/tmp/wal".to_string(), storage: ObjectStoreConfig::default(), } @@ -49,15 +53,15 @@ pub struct Datanode { impl Datanode { pub async fn new(opts: DatanodeOptions) -> Result { let instance = Arc::new(Instance::new(&opts).await?); - + let services = Services::try_new(instance.clone(), &opts)?; Ok(Self { opts, - services: Services::new(instance.clone()), + services, instance, }) } - pub async fn start(&self) -> Result<()> { + pub async fn start(&mut self) -> Result<()> { self.instance.start().await?; self.services.start(&self.opts).await } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6601208481..003b54f403 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -80,10 +80,11 @@ pub enum Error { #[snafu(display("Fail to convert bytes to insert batch, {}", source))] DecodeInsert { source: DecodeError }, - // The error source of http error is clear even without backtrace now so - // a backtrace is not carried in this varaint. - #[snafu(display("Fail to start HTTP server, source: {}", source))] - StartHttp { source: hyper::Error }, + #[snafu(display("Failed to start server, source: {}", source))] + StartServer { + #[snafu(backtrace)] + source: servers::error::Error, + }, #[snafu(display("Fail to parse address {}, source: {}", addr, source))] ParseAddr { @@ -122,6 +123,12 @@ pub enum Error { #[snafu(display("Unsupported expr type: {}", name))] UnsupportedExpr { name: String }, + #[snafu(display("Runtime resource error, source: {}", source))] + RuntimeResource { + #[snafu(backtrace)] + source: common_runtime::error::Error, + }, + #[snafu(display("Invalid CREATE TABLE sql statement, cause: {}", msg))] InvalidCreateTableSql { msg: String, backtrace: Backtrace }, @@ -179,7 +186,7 @@ impl ErrorExt for Error { | Error::KeyColumnNotFound { .. } | Error::ConstraintNotSupported { .. } => StatusCode::InvalidArguments, // TODO(yingwen): Further categorize http error. - Error::StartHttp { .. } + Error::StartServer { .. } | Error::ParseAddr { .. } | Error::TcpBind { .. } | Error::StartGrpc { .. } @@ -190,6 +197,7 @@ impl ErrorExt for Error { Error::InitBackend { .. } => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), Error::OpenStorageEngine { source } => source.status_code(), + Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted, } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index f4edcd3e51..dc0b20bec6 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,16 +1,18 @@ use std::{fs, path, sync::Arc}; -use api::v1::InsertExpr; +use api::v1::{object_expr, select_expr, InsertExpr, ObjectExpr, ObjectResult, SelectExpr}; +use async_trait::async_trait; use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::status_code::StatusCode; use common_telemetry::logging::info; +use common_telemetry::timer; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; use object_store::{backend::fs::Backend, util, ObjectStore}; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; -use snafu::{OptionExt, ResultExt}; +use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler}; +use snafu::prelude::*; use sql::statements::statement::Statement; use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; -#[cfg(test)] -use table::engine::TableEngineRef; use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; @@ -18,7 +20,10 @@ use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; use crate::error::{ self, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, TableNotFoundSnafu, }; +use crate::metric; +use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; use crate::server::grpc::insert::insertion_expr_to_request; +use crate::server::grpc::select::to_object_result; use crate::sql::{SqlHandler, SqlRequest}; type DefaultEngine = MitoEngine>; @@ -146,61 +151,36 @@ impl Instance { Ok(()) } - #[cfg(test)] - pub fn table_engine(&self) -> TableEngineRef { - self.sql_handler.table_engine() + async fn handle_insert(&self, insert_expr: InsertExpr) -> ObjectResult { + match self.execute_grpc_insert(insert_expr).await { + Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + Err(err) => { + // TODO(fys): failure count + build_err_result(&err) + } + _ => unreachable!(), + } } - #[cfg(test)] - pub async fn create_test_table(&self) -> Result<()> { - use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, Schema}; - use table::engine::EngineContext; - use table::requests::CreateTableRequest; + async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult { + match select_expr.expr { + Some(select_expr::Expr::Sql(sql)) => { + let result = self.execute_sql(&sql).await; + to_object_result(result).await + } + None => ObjectResult::default(), + } + } - use crate::error::CreateTableSnafu; + pub fn sql_handler(&self) -> &SqlHandler { + &self.sql_handler + } - let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), - ]; - - let table_name = "demo"; - let table = self - .table_engine() - .create_table( - &EngineContext::default(), - CreateTableRequest { - id: 1, - catalog_name: None, - schema_name: None, - table_name: table_name.to_string(), - desc: Some(" a test table".to_string()), - schema: Arc::new( - Schema::with_timestamp_index(column_schemas, 3) - .expect("ts is expected to be timestamp column"), - ), - create_if_not_exists: true, - primary_key_indices: Vec::default(), - }, - ) - .await - .context(CreateTableSnafu { table_name })?; - - let schema_provider = self - .catalog_manager - .catalog(DEFAULT_CATALOG_NAME) - .unwrap() - .schema(DEFAULT_SCHEMA_NAME) - .unwrap(); - - schema_provider - .register_table(table_name.to_string(), table) - .unwrap(); - - Ok(()) + pub fn catalog_manager(&self) -> &CatalogManagerRef { + &self.catalog_manager } } @@ -243,84 +223,40 @@ async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result servers::error::Result { + let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED); + self.execute_sql(query) .await - .unwrap(); - - assert!(matches!(output, Output::AffectedRows(2))); - } - - #[tokio::test] - async fn test_execute_query() { - let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Instance::new(&opts).await.unwrap(); - instance.start().await.unwrap(); - - let output = instance - .execute_sql("select sum(number) from numbers limit 20") - .await - .unwrap(); - - match output { - Output::RecordBatch(recordbatch) => { - let numbers = util::collect(recordbatch).await.unwrap(); - let columns = numbers[0].df_recordbatch.columns(); - assert_eq!(1, columns.len()); - assert_eq!(columns[0].len(), 1); - - assert_eq!( - *columns[0].as_any().downcast_ref::().unwrap(), - UInt64Array::from_slice(&[4950]) - ); - } - _ => unreachable!(), - } - } - - #[tokio::test] - pub async fn test_execute_create() { - common_telemetry::init_default_ut_logging(); - let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Instance::new(&opts).await.unwrap(); - instance.start().await.unwrap(); - instance.create_test_table().await.unwrap(); - - let output = instance - .execute_sql( - r#"create table test_table( - host string, - ts bigint, - cpu double default 0, - memory double, - TIME INDEX (ts), - PRIMARY KEY(ts, host) - ) engine=mito with(regions=1);"#, - ) - .await - .unwrap(); - - assert!(matches!(output, Output::AffectedRows(1))); + // TODO(LFC): use snafu's `context` to include source error and backtrace. + // Ideally we should define a snafu in servers::error to wrap the error thrown + // by `execute_sql`. However, we cannot do that because that would introduce a circular + // dependency. + .map_err(|e| { + servers::error::ExecuteQuerySnafu { + query, + err_msg: format!("{}", e), + } + .fail::() + .unwrap_err() + }) + } +} + +#[async_trait] +impl GrpcQueryHandler for Instance { + async fn do_query(&self, query: ObjectExpr) -> servers::error::Result { + let object_resp = match query.expr { + Some(object_expr::Expr::Insert(insert_expr)) => self.handle_insert(insert_expr).await, + Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await, + other => { + return servers::error::NotSupportedSnafu { + feat: format!("{:?}", other), + } + .fail(); + } + }; + Ok(object_resp) } } diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 2c3478b138..398ee2747b 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -6,8 +6,3 @@ pub mod instance; mod metric; pub mod server; mod sql; - -#[cfg(test)] -pub mod test_util; -#[cfg(test)] -mod tests; diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index feb8e9b235..7327c29cd1 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -1,33 +1,66 @@ pub mod grpc; -pub mod http; -use grpc::GrpcServer; -use http::HttpServer; +use std::net::SocketAddr; +use std::sync::Arc; + +use common_runtime::Builder as RuntimeBuilder; +use servers::grpc::GrpcServer; +use servers::http::HttpServer; +use servers::mysql::server::MysqlServer; +use servers::server::Server; +use snafu::ResultExt; use tokio::try_join; use crate::datanode::DatanodeOptions; -use crate::error::Result; +use crate::error::{self, Result}; use crate::instance::InstanceRef; /// All rpc services. pub struct Services { http_server: HttpServer, grpc_server: GrpcServer, + mysql_server: Box, } impl Services { - pub fn new(instance: InstanceRef) -> Self { - Self { + pub fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result { + let mysql_io_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.mysql_runtime_size as usize) + .thread_name("mysql-io-handlers") + .build() + .context(error::RuntimeResourceSnafu)?, + ); + Ok(Self { http_server: HttpServer::new(instance.clone()), - grpc_server: GrpcServer::new(instance), - } + grpc_server: GrpcServer::new(instance.clone()), + mysql_server: MysqlServer::create_server(instance, mysql_io_runtime), + }) } - pub async fn start(&self, opts: &DatanodeOptions) -> Result<()> { + // TODO(LFC): make servers started on demand (not starting mysql if no needed, for example) + pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> { + let http_addr = &opts.http_addr; + let http_addr: SocketAddr = http_addr + .parse() + .context(error::ParseAddrSnafu { addr: http_addr })?; + + let grpc_addr = &opts.rpc_addr; + let grpc_addr: SocketAddr = grpc_addr + .parse() + .context(error::ParseAddrSnafu { addr: grpc_addr })?; + + let mysql_addr = &opts.mysql_addr; + let mysql_addr: SocketAddr = mysql_addr + .parse() + .context(error::ParseAddrSnafu { addr: mysql_addr })?; + try_join!( - self.http_server.start(&opts.http_addr), - self.grpc_server.start(&opts.rpc_addr) - )?; + self.http_server.start(http_addr), + self.grpc_server.start(grpc_addr), + self.mysql_server.start(mysql_addr), + ) + .context(error::StartServerSnafu)?; Ok(()) } } diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 63401af1a1..285516957a 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -1,43 +1,3 @@ -mod handler; +pub(crate) mod handler; pub mod insert; -mod select; -mod server; - -use common_telemetry::logging::info; -use snafu::ResultExt; -use tokio::net::TcpListener; -use tokio_stream::wrappers::TcpListenerStream; - -use crate::{ - error::{Result, StartGrpcSnafu, TcpBindSnafu}, - instance::InstanceRef, - server::grpc::{handler::BatchHandler, server::Server}, -}; - -pub struct GrpcServer { - handler: BatchHandler, -} - -impl GrpcServer { - pub fn new(instance: InstanceRef) -> Self { - Self { - handler: BatchHandler::new(instance), - } - } - - pub async fn start(&self, addr: &str) -> Result<()> { - let listener = TcpListener::bind(addr) - .await - .context(TcpBindSnafu { addr })?; - let addr = listener.local_addr().context(TcpBindSnafu { addr })?; - info!("The gRPC server is running at {}", addr); - - let svc = Server::new(self.handler.clone()).into_service(); - let _ = tonic::transport::Server::builder() - .add_service(svc) - .serve_with_incoming(TcpListenerStream::new(listener)) - .await - .context(StartGrpcSnafu)?; - Ok(()) - } -} +pub mod select; diff --git a/src/datanode/src/server/grpc/handler.rs b/src/datanode/src/server/grpc/handler.rs index 7e35fb7392..b4229981a5 100644 --- a/src/datanode/src/server/grpc/handler.rs +++ b/src/datanode/src/server/grpc/handler.rs @@ -1,83 +1,10 @@ use api::v1::{ - codec::SelectResult, object_expr, object_result, select_expr, BatchRequest, BatchResponse, - DatabaseResponse, InsertExpr, MutateResult, ObjectResult, ResultHeader, SelectExpr, + codec::SelectResult, object_result, MutateResult, ObjectResult, ResultHeader, SelectResult as SelectResultRaw, }; use common_error::prelude::ErrorExt; -use common_error::status_code::StatusCode; -use query::Output; -use crate::server::grpc::{select::to_object_result, server::PROTOCOL_VERSION}; -use crate::{error::Result, error::UnsupportedExprSnafu, instance::InstanceRef}; - -#[derive(Clone)] -pub struct BatchHandler { - instance: InstanceRef, -} - -impl BatchHandler { - pub fn new(instance: InstanceRef) -> Self { - Self { instance } - } - - pub async fn batch(&self, batch_req: BatchRequest) -> Result { - let mut batch_resp = BatchResponse::default(); - let mut db_resp = DatabaseResponse::default(); - let databases = batch_req.databases; - - for req in databases { - let exprs = req.exprs; - - for obj_expr in exprs { - let object_resp = match obj_expr.expr { - Some(object_expr::Expr::Insert(insert_expr)) => { - self.handle_insert(insert_expr).await - } - Some(object_expr::Expr::Select(select_expr)) => { - self.handle_select(select_expr).await - } - other => { - return UnsupportedExprSnafu { - name: format!("{:?}", other), - } - .fail(); - } - }; - - db_resp.results.push(object_resp); - } - } - batch_resp.databases.push(db_resp); - Ok(batch_resp) - } - - pub async fn handle_insert(&self, insert_expr: InsertExpr) -> ObjectResult { - match self.instance.execute_grpc_insert(insert_expr).await { - Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - Err(err) => { - // TODO(fys): failure count - build_err_result(&err) - } - _ => unreachable!(), - } - } - - pub async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult { - let expr = match select_expr.expr { - Some(expr) => expr, - None => return ObjectResult::default(), - }; - match expr { - select_expr::Expr::Sql(sql) => { - let result = self.instance.execute_sql(&sql).await; - to_object_result(result).await - } - } - } -} +pub const PROTOCOL_VERSION: u32 = 1; pub type Success = u32; pub type Failure = u32; @@ -165,9 +92,8 @@ mod tests { use api::v1::{object_result, MutateResult}; use common_error::status_code::StatusCode; - use super::{build_err_result, ObjectResultBuilder}; - use crate::server::grpc::handler::UnsupportedExprSnafu; - use crate::server::grpc::server::PROTOCOL_VERSION; + use super::*; + use crate::error::UnsupportedExprSnafu; #[test] fn test_object_result_builder() { diff --git a/src/datanode/src/server/grpc/select.rs b/src/datanode/src/server/grpc/select.rs index 8d490c5963..4ff13036fb 100644 --- a/src/datanode/src/server/grpc/select.rs +++ b/src/datanode/src/server/grpc/select.rs @@ -13,7 +13,7 @@ use snafu::OptionExt; use crate::error::{ConversionSnafu, Result}; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; -pub(crate) async fn to_object_result(result: Result) -> ObjectResult { +pub async fn to_object_result(result: Result) -> ObjectResult { match result { Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() .status_code(StatusCode::Success as u32) diff --git a/src/datanode/src/server/grpc/server.rs b/src/datanode/src/server/grpc/server.rs deleted file mode 100644 index 433b406505..0000000000 --- a/src/datanode/src/server/grpc/server.rs +++ /dev/null @@ -1,30 +0,0 @@ -use api::v1::*; -use tonic::{Request, Response, Status}; - -use super::handler::BatchHandler; - -pub const PROTOCOL_VERSION: u32 = 1; - -#[derive(Clone)] -pub struct Server { - handler: BatchHandler, -} - -impl Server { - pub fn new(handler: BatchHandler) -> Self { - Self { handler } - } - - pub fn into_service(self) -> greptime_server::GreptimeServer { - greptime_server::GreptimeServer::new(self) - } -} - -#[tonic::async_trait] -impl greptime_server::Greptime for Server { - async fn batch(&self, req: Request) -> Result, Status> { - let req = req.into_inner(); - let res = self.handler.batch(req).await?; - Ok(Response::new(res)) - } -} diff --git a/src/datanode/src/server/http/handler.rs b/src/datanode/src/server/http/handler.rs deleted file mode 100644 index 34380442b6..0000000000 --- a/src/datanode/src/server/http/handler.rs +++ /dev/null @@ -1,123 +0,0 @@ -// http handlers - -use std::collections::HashMap; - -use axum::extract::{Extension, Query}; -use common_telemetry::{metric, timer}; - -use crate::instance::InstanceRef; -use crate::metric::METRIC_HANDLE_SQL_ELAPSED; -use crate::server::http::{HttpResponse, JsonResponse}; - -/// Handler to execute sql -#[axum_macros::debug_handler] -pub async fn sql( - Extension(instance): Extension, - Query(params): Query>, -) -> HttpResponse { - let _timer = timer!(METRIC_HANDLE_SQL_ELAPSED); - if let Some(sql) = params.get("sql") { - HttpResponse::Json(JsonResponse::from_output(instance.execute_sql(sql).await).await) - } else { - HttpResponse::Json(JsonResponse::with_error(Some( - "sql parameter is required.".to_string(), - ))) - } -} - -/// Handler to export metrics -#[axum_macros::debug_handler] -pub async fn metrics( - Extension(_instance): Extension, - Query(_params): Query>, -) -> HttpResponse { - if let Some(handle) = metric::try_handle() { - HttpResponse::Text(handle.render()) - } else { - HttpResponse::Text("Prometheus handle not initialized.".to_string()) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use metrics::counter; - - use super::*; - use crate::instance::Instance; - use crate::server::http::JsonOutput; - use crate::test_util::{self, TestGuard}; - - fn create_params() -> Query> { - let mut map = HashMap::new(); - map.insert( - "sql".to_string(), - "select sum(number) from numbers limit 20".to_string(), - ); - Query(map) - } - - async fn create_extension() -> (Extension, TestGuard) { - let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(); - let instance = Arc::new(Instance::new(&opts).await.unwrap()); - instance.start().await.unwrap(); - (Extension(instance), guard) - } - - #[tokio::test] - async fn test_sql_not_provided() { - let (extension, _guard) = create_extension().await; - - let json = sql(extension, Query(HashMap::default())).await; - match json { - HttpResponse::Json(json) => { - assert!(!json.success); - assert_eq!(Some("sql parameter is required.".to_string()), json.error); - assert!(json.output.is_none()); - } - _ => unreachable!(), - } - } - - #[tokio::test] - async fn test_sql_output_rows() { - common_telemetry::init_default_ut_logging(); - let query = create_params(); - let (extension, _guard) = create_extension().await; - - let json = sql(extension, query).await; - - match json { - HttpResponse::Json(json) => { - assert!(json.success, "{:?}", json); - assert!(json.error.is_none()); - assert!(json.output.is_some()); - - match json.output.unwrap() { - JsonOutput::Rows(rows) => { - assert_eq!(1, rows.len()); - } - _ => unreachable!(), - } - } - _ => unreachable!(), - } - } - - #[tokio::test] - async fn test_metrics() { - metric::init_default_metrics_recorder(); - - counter!("test_metrics", 1); - - let query = create_params(); - let (extension, _guard) = create_extension().await; - let text = metrics(extension, query).await; - - match text { - HttpResponse::Text(s) => assert!(s.contains("test_metrics counter")), - _ => unreachable!(), - } - } -} diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index f4e2ddeb18..e42252c4b8 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -50,7 +50,6 @@ impl SqlHandler { .context(TableNotFoundSnafu { table_name }) } - #[cfg(test)] pub fn table_engine(&self) -> Arc { self.table_engine.clone() } diff --git a/src/datanode/src/test_util.rs b/src/datanode/src/test_util.rs deleted file mode 100644 index 30ed650a9a..0000000000 --- a/src/datanode/src/test_util.rs +++ /dev/null @@ -1,32 +0,0 @@ -use tempdir::TempDir; - -use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; - -/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`, -/// Only for test. -/// -/// TODO: Add a test feature -pub struct TestGuard { - _wal_tmp_dir: TempDir, - _data_tmp_dir: TempDir, -} - -pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) { - let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap(); - let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap(); - let opts = DatanodeOptions { - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), - storage: ObjectStoreConfig::File { - data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }, - ..Default::default() - }; - - ( - opts, - TestGuard { - _wal_tmp_dir: wal_tmp_dir, - _data_tmp_dir: data_tmp_dir, - }, - ) -} diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs deleted file mode 100644 index 150709f786..0000000000 --- a/src/datanode/src/tests.rs +++ /dev/null @@ -1 +0,0 @@ -mod http_test; diff --git a/src/datanode/tests/grpc_test.rs b/src/datanode/tests/grpc_test.rs new file mode 100644 index 0000000000..8cf37e8fbb --- /dev/null +++ b/src/datanode/tests/grpc_test.rs @@ -0,0 +1,113 @@ +mod test_util; + +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use api::v1::{codec::InsertBatch, column, select_expr, Column, SelectExpr}; +use client::{Client, Database, ObjectResult}; +use datanode::instance::Instance; +use servers::grpc::GrpcServer; +use servers::server::Server; + +#[tokio::test] +async fn test_insert_and_select() { + common_telemetry::init_default_ut_logging(); + + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Arc::new(Instance::new(&opts).await.unwrap()); + instance.start().await.unwrap(); + + test_util::create_test_table(&instance).await.unwrap(); + + tokio::spawn(async move { + let mut grpc_server = GrpcServer::new(instance); + let addr = "127.0.0.1:3001".parse::().unwrap(); + grpc_server.start(addr).await.unwrap() + }); + + // wait for GRPC server to start + tokio::time::sleep(Duration::from_secs(1)).await; + + let grpc_client = Client::connect("http://127.0.0.1:3001").await.unwrap(); + let db = Database::new("greptime", grpc_client); + + // testing data: + let expected_host_col = Column { + column_name: "host".to_string(), + values: Some(column::Values { + string_values: vec!["host1", "host2", "host3", "host4"] + .into_iter() + .map(|s| s.to_string()) + .collect(), + ..Default::default() + }), + ..Default::default() + }; + let expected_cpu_col = Column { + column_name: "cpu".to_string(), + values: Some(column::Values { + f64_values: vec![0.31, 0.41, 0.2], + ..Default::default() + }), + null_mask: vec![2], + ..Default::default() + }; + let expected_mem_col = Column { + column_name: "memory".to_string(), + values: Some(column::Values { + f64_values: vec![0.1, 0.2, 0.3], + ..Default::default() + }), + null_mask: vec![4], + ..Default::default() + }; + let expected_ts_col = Column { + column_name: "ts".to_string(), + values: Some(column::Values { + i64_values: vec![100, 101, 102, 103], + ..Default::default() + }), + ..Default::default() + }; + + // insert + let values = vec![InsertBatch { + columns: vec![ + expected_host_col.clone(), + expected_cpu_col.clone(), + expected_mem_col.clone(), + expected_ts_col.clone(), + ], + row_count: 4, + } + .into()]; + let result = db.insert("demo", values).await; + assert!(result.is_ok()); + + // select + let select_expr = SelectExpr { + expr: Some(select_expr::Expr::Sql("select * from demo".to_string())), + }; + let result = db.select(select_expr).await.unwrap(); + assert!(matches!(result, ObjectResult::Select(_))); + match result { + ObjectResult::Select(select_result) => { + assert_eq!(4, select_result.row_count); + let actual_columns = select_result.columns; + assert_eq!(4, actual_columns.len()); + + let expected_columns = vec![ + expected_ts_col, + expected_host_col, + expected_cpu_col, + expected_mem_col, + ]; + expected_columns + .iter() + .zip(actual_columns.iter()) + .for_each(|(x, y)| assert_eq!(x, y)); + } + _ => unreachable!(), + } +} diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/tests/http_test.rs similarity index 93% rename from src/datanode/src/tests/http_test.rs rename to src/datanode/tests/http_test.rs index 8db7cc11af..7a44b687f9 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/tests/http_test.rs @@ -1,14 +1,13 @@ -//! http server test +mod test_util; use std::sync::Arc; use axum::http::StatusCode; use axum::Router; use axum_test_helper::TestClient; - -use crate::instance::Instance; -use crate::server::http::HttpServer; -use crate::test_util::{self, TestGuard}; +use datanode::instance::Instance; +use servers::http::HttpServer; +use test_util::TestGuard; async fn make_test_app() -> (Router, TestGuard) { let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(); diff --git a/src/datanode/tests/instance_test.rs b/src/datanode/tests/instance_test.rs new file mode 100644 index 0000000000..a259e7e791 --- /dev/null +++ b/src/datanode/tests/instance_test.rs @@ -0,0 +1,80 @@ +mod test_util; + +use arrow::array::UInt64Array; +use common_recordbatch::util; +use datanode::instance::Instance; +use query::Output; + +#[tokio::test] +async fn test_execute_insert() { + common_telemetry::init_default_ut_logging(); + + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); + + test_util::create_test_table(&instance).await.unwrap(); + + let output = instance + .execute_sql( + r#"insert into demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#, + ) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(2))); +} + +#[tokio::test] +async fn test_execute_query() { + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); + + let output = instance + .execute_sql("select sum(number) from numbers limit 20") + .await + .unwrap(); + match output { + Output::RecordBatch(recordbatch) => { + let numbers = util::collect(recordbatch).await.unwrap(); + let columns = numbers[0].df_recordbatch.columns(); + assert_eq!(1, columns.len()); + assert_eq!(columns[0].len(), 1); + + assert_eq!( + *columns[0].as_any().downcast_ref::().unwrap(), + UInt64Array::from_slice(&[4950]) + ); + } + _ => unreachable!(), + } +} + +#[tokio::test] +pub async fn test_execute_create() { + common_telemetry::init_default_ut_logging(); + + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); + + test_util::create_test_table(&instance).await.unwrap(); + + let output = instance + .execute_sql( + r#"create table test_table( + host string, + ts bigint, + cpu double default 0, + memory double, + TIME INDEX (ts), + PRIMARY KEY(ts, host) + ) engine=mito with(regions=1);"#, + ) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); +} diff --git a/src/datanode/tests/test_util.rs b/src/datanode/tests/test_util.rs new file mode 100644 index 0000000000..8a86b3b40a --- /dev/null +++ b/src/datanode/tests/test_util.rs @@ -0,0 +1,85 @@ +use std::sync::Arc; + +use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; +use datanode::error::{CreateTableSnafu, Result}; +use datanode::instance::Instance; +use datatypes::data_type::ConcreteDataType; +use datatypes::schema::{ColumnSchema, Schema}; +use snafu::ResultExt; +use table::engine::EngineContext; +use table::engine::TableEngineRef; +use table::requests::CreateTableRequest; +use tempdir::TempDir; + +/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`, +/// Only for test. +pub struct TestGuard { + _wal_tmp_dir: TempDir, + _data_tmp_dir: TempDir, +} + +pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) { + let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap(); + let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap(); + let opts = DatanodeOptions { + wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + storage: ObjectStoreConfig::File { + data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }, + ..Default::default() + }; + ( + opts, + TestGuard { + _wal_tmp_dir: wal_tmp_dir, + _data_tmp_dir: data_tmp_dir, + }, + ) +} + +// It's actually not dead codes, at least been used in instance_test.rs and grpc_test.rs +// However, clippy keeps warning us, so I temporary add an "allow" to bypass it. +// TODO(LFC): further investigate why clippy falsely warning "dead_code" +#[allow(dead_code)] +pub async fn create_test_table(instance: &Instance) -> Result<()> { + let column_schemas = vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ]; + + let table_name = "demo"; + let table_engine: TableEngineRef = instance.sql_handler().table_engine(); + let table = table_engine + .create_table( + &EngineContext::default(), + CreateTableRequest { + id: 1, + catalog_name: None, + schema_name: None, + table_name: table_name.to_string(), + desc: Some(" a test table".to_string()), + schema: Arc::new( + Schema::with_timestamp_index(column_schemas, 3) + .expect("ts is expected to be timestamp column"), + ), + create_if_not_exists: true, + primary_key_indices: Vec::default(), + }, + ) + .await + .context(CreateTableSnafu { table_name })?; + + let schema_provider = instance + .catalog_manager() + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .schema(DEFAULT_SCHEMA_NAME) + .unwrap(); + schema_provider + .register_table(table_name.to_string(), table) + .unwrap(); + Ok(()) +} diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 127abab0fb..ebead01f94 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -22,7 +22,7 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", br datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" -metrics = "0.18" +metrics = "0.20" serde = "1.0" serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml new file mode 100644 index 0000000000..cfd5904e51 --- /dev/null +++ b/src/servers/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "servers" +version = "0.1.0" +edition = "2021" + +[dependencies] +api = { path = "../api" } +async-trait = "0.1" +axum = "0.5" +axum-macros = "0.2" +common-error = { path = "../common/error" } +common-recordbatch = { path = "../common/recordbatch" } +common-runtime = { path = "../common/runtime" } +common-telemetry = { path = "../common/telemetry" } +datatypes = { path = "../datatypes" } +futures = "0.3" +hyper = { version = "0.14", features = ["full"] } +metrics = "0.20" +num_cpus = "1.13" +opensrv-mysql = "0.1" +query = { path = "../query" } +serde = "1.0" +serde_json = "1.0" +snafu = { version = "0.7", features = ["backtraces"] } +tonic = "0.8" +tokio = { version = "1.20", features = ["full"] } +tokio-stream = { version = "0.1", features = ["net"] } +tower = { version = "0.4", features = ["full"]} +tower-http = { version ="0.3", features = ["full"]} + +[dev-dependencies] +common-base = { path = "../common/base" } +catalog = { path = "../catalog" } +mysql_async = "0.30" +rand = "0.8" +test-util = { path = "../../test-util" } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs new file mode 100644 index 0000000000..85c83225ab --- /dev/null +++ b/src/servers/src/error.rs @@ -0,0 +1,89 @@ +use std::any::Any; +use std::net::SocketAddr; + +use common_error::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Internal error: {}", err_msg))] + Internal { err_msg: String }, + + #[snafu(display("Internal IO error, source: {}", source))] + InternalIo { source: std::io::Error }, + + #[snafu(display("Tokio IO error: {}, source: {}", err_msg, source))] + TokioIo { + err_msg: String, + source: std::io::Error, + }, + + #[snafu(display("Failed to convert vector, source: {}", source))] + VectorConversion { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to collect recordbatch, source: {}", source))] + CollectRecordbatch { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to start HTTP server, source: {}", source))] + StartHttp { source: hyper::Error }, + + #[snafu(display("Failed to start gRPC server, source: {}", source))] + StartGrpc { source: tonic::transport::Error }, + + #[snafu(display("Failed to bind address {}, source: {}", addr, source))] + TcpBind { + addr: SocketAddr, + source: std::io::Error, + }, + + #[snafu(display("Failed to execute query: {}, error: {}", query, err_msg))] + ExecuteQuery { query: String, err_msg: String }, + + #[snafu(display("Not supported: {}", feat))] + NotSupported { feat: String }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::Internal { .. } + | Error::InternalIo { .. } + | Error::TokioIo { .. } + | Error::VectorConversion { .. } + | Error::CollectRecordbatch { .. } + | Error::StartHttp { .. } + | Error::StartGrpc { .. } + | Error::TcpBind { .. } + | Error::ExecuteQuery { .. } => StatusCode::Internal, + Error::NotSupported { .. } => StatusCode::InvalidArguments, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl From for tonic::Status { + fn from(err: Error) -> Self { + tonic::Status::new(tonic::Code::Internal, err.to_string()) + } +} + +impl From for Error { + fn from(e: std::io::Error) -> Self { + Error::InternalIo { source: e } + } +} diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs new file mode 100644 index 0000000000..b5bc22b4cd --- /dev/null +++ b/src/servers/src/grpc.rs @@ -0,0 +1,72 @@ +pub mod handler; + +use std::net::SocketAddr; + +use api::v1::{greptime_server, BatchRequest, BatchResponse}; +use async_trait::async_trait; +use common_telemetry::logging::info; +use snafu::ResultExt; +use tokio::net::TcpListener; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::{Request, Response, Status}; + +use crate::error::{Result, StartGrpcSnafu, TcpBindSnafu}; +use crate::grpc::handler::BatchHandler; +use crate::query_handler::GrpcQueryHandlerRef; +use crate::server::Server; + +pub struct GrpcServer { + query_handler: GrpcQueryHandlerRef, +} + +impl GrpcServer { + pub fn new(query_handler: GrpcQueryHandlerRef) -> Self { + Self { query_handler } + } + + pub fn create_service(&self) -> greptime_server::GreptimeServer { + let service = GrpcService { + handler: BatchHandler::new(self.query_handler.clone()), + }; + greptime_server::GreptimeServer::new(service) + } +} + +pub struct GrpcService { + handler: BatchHandler, +} + +#[tonic::async_trait] +impl greptime_server::Greptime for GrpcService { + async fn batch( + &self, + req: Request, + ) -> std::result::Result, Status> { + let req = req.into_inner(); + let res = self.handler.batch(req).await?; + Ok(Response::new(res)) + } +} + +#[async_trait] +impl Server for GrpcServer { + async fn shutdown(&mut self) -> Result<()> { + // TODO(LFC): shutdown grpc server + unimplemented!() + } + + async fn start(&mut self, addr: SocketAddr) -> Result { + let listener = TcpListener::bind(addr) + .await + .context(TcpBindSnafu { addr })?; + let addr = listener.local_addr().context(TcpBindSnafu { addr })?; + info!("GRPC server is bound to {}", addr); + + tonic::transport::Server::builder() + .add_service(self.create_service()) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .context(StartGrpcSnafu)?; + Ok(addr) + } +} diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs new file mode 100644 index 0000000000..82d91eee34 --- /dev/null +++ b/src/servers/src/grpc/handler.rs @@ -0,0 +1,29 @@ +use api::v1::{BatchRequest, BatchResponse, DatabaseResponse}; + +use crate::error::Result; +use crate::query_handler::GrpcQueryHandlerRef; + +#[derive(Clone)] +pub struct BatchHandler { + query_handler: GrpcQueryHandlerRef, +} + +impl BatchHandler { + pub fn new(query_handler: GrpcQueryHandlerRef) -> Self { + Self { query_handler } + } + + pub async fn batch(&self, batch_req: BatchRequest) -> Result { + let mut batch_resp = BatchResponse::default(); + let mut db_resp = DatabaseResponse::default(); + + for db_req in batch_req.databases { + for obj_expr in db_req.exprs { + let object_resp = self.query_handler.do_query(obj_expr).await?; + db_resp.results.push(object_resp); + } + } + batch_resp.databases.push(db_resp); + Ok(batch_resp) + } +} diff --git a/src/datanode/src/server/http.rs b/src/servers/src/http.rs similarity index 73% rename from src/datanode/src/server/http.rs rename to src/servers/src/http.rs index f662ce3535..d9f5101cc6 100644 --- a/src/datanode/src/server/http.rs +++ b/src/servers/src/http.rs @@ -1,8 +1,9 @@ -mod handler; +pub mod handler; use std::net::SocketAddr; use std::time::Duration; +use async_trait::async_trait; use axum::{ error_handling::HandleErrorLayer, response::IntoResponse, @@ -18,12 +19,12 @@ use snafu::ResultExt; use tower::{timeout::TimeoutLayer, ServiceBuilder}; use tower_http::trace::TraceLayer; -use crate::error::{ParseAddrSnafu, Result, StartHttpSnafu}; -use crate::server::InstanceRef; +use crate::error::{Result, StartHttpSnafu}; +use crate::query_handler::SqlQueryHandlerRef; +use crate::server::Server; -/// Http server pub struct HttpServer { - instance: InstanceRef, + query_handler: SqlQueryHandlerRef, } #[derive(Serialize, Debug)] @@ -32,14 +33,12 @@ pub enum JsonOutput { Rows(Vec), } -/// Http response #[derive(Serialize, Debug)] pub enum HttpResponse { Json(JsonResponse), Text(String), } -/// Json response #[derive(Serialize, Debug)] pub struct JsonResponse { success: bool, @@ -66,6 +65,7 @@ impl JsonResponse { output: None, } } + fn with_output(output: Option) -> Self { JsonResponse { success: true, @@ -87,6 +87,18 @@ impl JsonResponse { Err(e) => Self::with_error(Some(format!("Query engine output error: {}", e))), } } + + pub fn success(&self) -> bool { + self.success + } + + pub fn error(&self) -> Option<&String> { + self.error.as_ref() + } + + pub fn output(&self) -> Option<&JsonOutput> { + self.output.as_ref() + } } async fn shutdown_signal() { @@ -98,8 +110,8 @@ async fn shutdown_signal() { } impl HttpServer { - pub fn new(instance: InstanceRef) -> Self { - Self { instance } + pub fn new(query_handler: SqlQueryHandlerRef) -> Self { + Self { query_handler } } pub fn make_app(&self) -> Router { @@ -112,20 +124,28 @@ impl HttpServer { ServiceBuilder::new() .layer(HandleErrorLayer::new(handle_error)) .layer(TraceLayer::new_for_http()) - .layer(Extension(self.instance.clone())) - // TODO configure timeout + .layer(Extension(self.query_handler.clone())) + // TODO(LFC): make timeout configurable .layer(TimeoutLayer::new(Duration::from_secs(30))), ) } +} - pub async fn start(&self, addr: &str) -> Result<()> { +#[async_trait] +impl Server for HttpServer { + async fn shutdown(&mut self) -> Result<()> { + // TODO(LFC): shutdown http server, and remove `shutdown_signal` above + unimplemented!() + } + + async fn start(&mut self, listening: SocketAddr) -> Result { let app = self.make_app(); - let socket_addr: SocketAddr = addr.parse().context(ParseAddrSnafu { addr })?; - info!("Datanode HTTP server is listening on {}", socket_addr); - let server = axum::Server::bind(&socket_addr).serve(app.into_make_service()); + let server = axum::Server::bind(&listening).serve(app.into_make_service()); + let listening = server.local_addr(); + info!("HTTP server is bound to {}", listening); let graceful = server.with_graceful_shutdown(shutdown_signal()); - - graceful.await.context(StartHttpSnafu) + graceful.await.context(StartHttpSnafu)?; + Ok(listening) } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs new file mode 100644 index 0000000000..1ecbaeb511 --- /dev/null +++ b/src/servers/src/http/handler.rs @@ -0,0 +1,35 @@ +use std::collections::HashMap; + +use axum::extract::{Extension, Query}; +use common_telemetry::metric; + +use crate::http::{HttpResponse, JsonResponse}; +use crate::query_handler::SqlQueryHandlerRef; + +/// Handler to execute sql +#[axum_macros::debug_handler] +pub async fn sql( + Extension(query_handler): Extension, + Query(params): Query>, +) -> HttpResponse { + if let Some(sql) = params.get("sql") { + HttpResponse::Json(JsonResponse::from_output(query_handler.do_query(sql).await).await) + } else { + HttpResponse::Json(JsonResponse::with_error(Some( + "sql parameter is required.".to_string(), + ))) + } +} + +/// Handler to export metrics +#[axum_macros::debug_handler] +pub async fn metrics( + Extension(_query_handler): Extension, + Query(_params): Query>, +) -> HttpResponse { + if let Some(handle) = metric::try_handle() { + HttpResponse::Text(handle.render()) + } else { + HttpResponse::Text("Prometheus handle not initialized.".to_string()) + } +} diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs new file mode 100644 index 0000000000..b469916ef1 --- /dev/null +++ b/src/servers/src/lib.rs @@ -0,0 +1,6 @@ +pub mod error; +pub mod grpc; +pub mod http; +pub mod mysql; +pub mod query_handler; +pub mod server; diff --git a/src/common/servers/src/mysql/mysql_instance.rs b/src/servers/src/mysql/handler.rs similarity index 72% rename from src/common/servers/src/mysql/mysql_instance.rs rename to src/servers/src/mysql/handler.rs index bdf81fa248..9c784df7b5 100644 --- a/src/common/servers/src/mysql/mysql_instance.rs +++ b/src/servers/src/mysql/handler.rs @@ -1,5 +1,4 @@ use std::io; -use std::sync::Arc; use async_trait::async_trait; use opensrv_mysql::AsyncMysqlShim; @@ -7,27 +6,19 @@ use opensrv_mysql::ErrorKind; use opensrv_mysql::ParamParser; use opensrv_mysql::QueryResultWriter; use opensrv_mysql::StatementMetaWriter; -use query::query_engine::Output; -use crate::mysql::error::{self, Result}; -use crate::mysql::mysql_writer::MysqlResultWriter; - -pub type MysqlInstanceRef = Arc; - -// TODO(LFC): Move to instance layer. -#[async_trait] -pub trait MysqlInstance { - async fn do_query(&self, query: &str) -> Result; -} +use crate::error::{self, Result}; +use crate::mysql::writer::MysqlResultWriter; +use crate::query_handler::SqlQueryHandlerRef; // An intermediate shim for executing MySQL queries. pub struct MysqlInstanceShim { - mysql_instance: MysqlInstanceRef, + query_handler: SqlQueryHandlerRef, } impl MysqlInstanceShim { - pub fn create(mysql_instance: MysqlInstanceRef) -> MysqlInstanceShim { - MysqlInstanceShim { mysql_instance } + pub fn create(query_handler: SqlQueryHandlerRef) -> MysqlInstanceShim { + MysqlInstanceShim { query_handler } } } @@ -72,7 +63,7 @@ impl AsyncMysqlShim for MysqlInstanceShim { query: &'a str, writer: QueryResultWriter<'a, W>, ) -> Result<()> { - let output = self.mysql_instance.do_query(query).await; + let output = self.query_handler.do_query(query).await; let mut writer = MysqlResultWriter::new(writer); writer.write(output).await diff --git a/src/servers/src/mysql/mod.rs b/src/servers/src/mysql/mod.rs new file mode 100644 index 0000000000..2c0f9eef38 --- /dev/null +++ b/src/servers/src/mysql/mod.rs @@ -0,0 +1,3 @@ +pub mod handler; +pub mod server; +pub mod writer; diff --git a/src/common/servers/src/mysql/mysql_server.rs b/src/servers/src/mysql/server.rs similarity index 78% rename from src/common/servers/src/mysql/mysql_server.rs rename to src/servers/src/mysql/server.rs index b3d1976d95..ce844de5e2 100644 --- a/src/common/servers/src/mysql/mysql_server.rs +++ b/src/servers/src/mysql/server.rs @@ -16,9 +16,9 @@ use tokio::net::TcpStream; use tokio::task::JoinHandle; use tokio_stream::wrappers::TcpListenerStream; -use crate::error as server_error; -use crate::mysql::error::{self, Result}; -use crate::mysql::mysql_instance::{MysqlInstanceRef, MysqlInstanceShim}; +use crate::error::{self, Result}; +use crate::mysql::handler::MysqlInstanceShim; +use crate::query_handler::SqlQueryHandlerRef; use crate::server::Server; pub struct MysqlServer { @@ -32,14 +32,14 @@ pub struct MysqlServer { // A handle holding the TCP accepting task. join_handle: Option>, - mysql_handler: MysqlInstanceRef, + query_handler: SqlQueryHandlerRef, io_runtime: Arc, } impl MysqlServer { /// Creates a new MySQL server with provided [MysqlInstance] and [Runtime]. pub fn create_server( - mysql_handler: MysqlInstanceRef, + query_handler: SqlQueryHandlerRef, io_runtime: Arc, ) -> Box { let (abort_handle, registration) = AbortHandle::new_pair(); @@ -47,7 +47,7 @@ impl MysqlServer { abort_handle, abort_registration: Some(registration), join_handle: None, - mysql_handler, + query_handler, io_runtime, }) } @@ -59,21 +59,22 @@ impl MysqlServer { err_msg: format!("Failed to bind addr {}", addr), })?; // get actually bond addr in case input addr use port 0 - let listener_addr = listener.local_addr()?; - Ok((TcpListenerStream::new(listener), listener_addr)) + let addr = listener.local_addr()?; + info!("MySQL server is bound to {}", addr); + Ok((TcpListenerStream::new(listener), addr)) } fn accept(&self, accepting_stream: Abortable) -> impl Future { let io_runtime = self.io_runtime.clone(); - let mysql_handler = self.mysql_handler.clone(); + let query_handler = self.query_handler.clone(); accepting_stream.for_each(move |tcp_stream| { let io_runtime = io_runtime.clone(); - let mysql_handler = mysql_handler.clone(); + let query_handler = query_handler.clone(); async move { match tcp_stream { Err(error) => error!("Broken pipe: {}", error), Ok(io_stream) => { - if let Err(error) = Self::handle(io_stream, io_runtime, mysql_handler) { + if let Err(error) = Self::handle(io_stream, io_runtime, query_handler) { error!("Unexpected error when handling TcpStream: {:?}", error); }; } @@ -85,10 +86,10 @@ impl MysqlServer { pub fn handle( stream: TcpStream, io_runtime: Arc, - mysql_handler: MysqlInstanceRef, + query_handler: SqlQueryHandlerRef, ) -> Result<()> { info!("MySQL connection coming from: {}", stream.peer_addr()?); - let shim = MysqlInstanceShim::create(mysql_handler); + let shim = MysqlInstanceShim::create(query_handler); // TODO(LFC): Relate "handler" with MySQL session; also deal with panics there. let _handler = io_runtime.spawn(AsyncMysqlIntermediary::run_on(shim, stream)); Ok(()) @@ -97,7 +98,7 @@ impl MysqlServer { #[async_trait] impl Server for MysqlServer { - async fn shutdown(&mut self) -> server_error::Result<()> { + async fn shutdown(&mut self) -> Result<()> { match self.join_handle.take() { Some(join_handle) => { self.abort_handle.abort(); @@ -112,17 +113,14 @@ impl Server for MysqlServer { None => error::InternalSnafu { err_msg: "MySQL server is not started.", } - .fail() - .context(server_error::MysqlServerSnafu), + .fail()?, } } - async fn start(&mut self, listening: SocketAddr) -> server_error::Result { + async fn start(&mut self, listening: SocketAddr) -> Result { match self.abort_registration.take() { Some(registration) => { - let (stream, listener) = Self::bind(listening) - .await - .context(server_error::MysqlServerSnafu)?; + let (stream, listener) = Self::bind(listening).await?; let stream = Abortable::new(stream, registration); self.join_handle = Some(tokio::spawn(self.accept(stream))); Ok(listener) @@ -130,8 +128,7 @@ impl Server for MysqlServer { None => error::InternalSnafu { err_msg: "MySQL server has been started.", } - .fail() - .context(server_error::MysqlServerSnafu), + .fail()?, } } } diff --git a/src/common/servers/src/mysql/mysql_writer.rs b/src/servers/src/mysql/writer.rs similarity index 99% rename from src/common/servers/src/mysql/mysql_writer.rs rename to src/servers/src/mysql/writer.rs index 305a7d0e04..42b7086d14 100644 --- a/src/common/servers/src/mysql/mysql_writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -9,7 +9,7 @@ use opensrv_mysql::{ use query::Output; use snafu::prelude::*; -use crate::mysql::error::{self, Error, Result}; +use crate::error::{self, Error, Result}; struct QueryResult { recordbatches: Vec, diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs new file mode 100644 index 0000000000..0c469c37b6 --- /dev/null +++ b/src/servers/src/query_handler.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use api::v1::{ObjectExpr, ObjectResult}; +use async_trait::async_trait; +use query::Output; + +use crate::error::Result; + +/// All query handler traits for various request protocols, like SQL or GRPC. +/// Instance that wishes to support certain request protocol, just implement the corresponding +/// trait, the Server will handle codec for you. +/// +/// Note: +/// Query handlers are not confined to only handle read requests, they are expecting to handle +/// write requests too. So the "query" here not might seem ambiguity. However, "query" has been +/// used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the +/// word "query". + +pub type SqlQueryHandlerRef = Arc; +pub type GrpcQueryHandlerRef = Arc; + +#[async_trait] +pub trait SqlQueryHandler { + async fn do_query(&self, query: &str) -> Result; +} + +#[async_trait] +pub trait GrpcQueryHandler { + async fn do_query(&self, query: ObjectExpr) -> Result; +} diff --git a/src/common/servers/src/server.rs b/src/servers/src/server.rs similarity index 100% rename from src/common/servers/src/server.rs rename to src/servers/src/server.rs diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs new file mode 100644 index 0000000000..22c30b81fb --- /dev/null +++ b/src/servers/tests/http/http_handler_test.rs @@ -0,0 +1,78 @@ +use std::collections::HashMap; + +use axum::extract::Query; +use axum::Extension; +use common_telemetry::metric; +use metrics::counter; +use servers::http::handler as http_handler; +use servers::http::{HttpResponse, JsonOutput}; +use test_util::MemTable; + +use crate::create_testing_sql_query_handler; + +#[tokio::test] +async fn test_sql_not_provided() { + let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); + let extension = Extension(query_handler); + + let json = http_handler::sql(extension, Query(HashMap::default())).await; + match json { + HttpResponse::Json(json) => { + assert!(!json.success()); + assert_eq!( + Some(&"sql parameter is required.".to_string()), + json.error() + ); + assert!(json.output().is_none()); + } + _ => unreachable!(), + } +} + +#[tokio::test] +async fn test_sql_output_rows() { + common_telemetry::init_default_ut_logging(); + + let query = create_query(); + let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); + let extension = Extension(query_handler); + + let json = http_handler::sql(extension, query).await; + match json { + HttpResponse::Json(json) => { + assert!(json.success(), "{:?}", json); + assert!(json.error().is_none()); + match json.output().expect("assertion failed") { + JsonOutput::Rows(rows) => { + assert_eq!(1, rows.len()); + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } +} + +#[tokio::test] +async fn test_metrics() { + metric::init_default_metrics_recorder(); + + counter!("test_metrics", 1); + + let query = create_query(); + let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); + let extension = Extension(query_handler); + + let text = http_handler::metrics(extension, query).await; + match text { + HttpResponse::Text(s) => assert!(s.contains("test_metrics counter")), + _ => unreachable!(), + } +} + +fn create_query() -> Query> { + Query(HashMap::from([( + "sql".to_string(), + "select sum(uint32s) from numbers limit 20".to_string(), + )])) +} diff --git a/src/servers/tests/http/mod.rs b/src/servers/tests/http/mod.rs new file mode 100644 index 0000000000..5d1b718df1 --- /dev/null +++ b/src/servers/tests/http/mod.rs @@ -0,0 +1 @@ +mod http_handler_test; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs new file mode 100644 index 0000000000..cca5e86628 --- /dev/null +++ b/src/servers/tests/mod.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{ + CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, +}; +use query::{Output, QueryEngineFactory, QueryEngineRef}; +use servers::error::Result; +use servers::query_handler::{SqlQueryHandler, SqlQueryHandlerRef}; +use test_util::MemTable; + +mod http; +mod mysql; + +struct DummyInstance { + query_engine: QueryEngineRef, +} + +#[async_trait] +impl SqlQueryHandler for DummyInstance { + async fn do_query(&self, query: &str) -> Result { + let plan = self.query_engine.sql_to_plan(query).unwrap(); + Ok(self.query_engine.execute(&plan).await.unwrap()) + } +} + +fn create_testing_sql_query_handler(table: MemTable) -> SqlQueryHandlerRef { + let table_name = table.table_name().to_string(); + let table = Arc::new(table); + + let schema_provider = Arc::new(MemorySchemaProvider::new()); + let catalog_provider = Arc::new(MemoryCatalogProvider::new()); + let catalog_list = Arc::new(MemoryCatalogList::default()); + schema_provider.register_table(table_name, table).unwrap(); + catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); + catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + + let factory = QueryEngineFactory::new(catalog_list); + let query_engine = factory.query_engine().clone(); + Arc::new(DummyInstance { query_engine }) +} diff --git a/src/common/servers/tests/mysql/mod.rs b/src/servers/tests/mysql/mod.rs similarity index 100% rename from src/common/servers/tests/mysql/mod.rs rename to src/servers/tests/mysql/mod.rs diff --git a/src/common/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs similarity index 79% rename from src/common/servers/tests/mysql/mysql_server_test.rs rename to src/servers/tests/mysql/mysql_server_test.rs index e5a5dd4d7b..ba3f99992d 100644 --- a/src/common/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -2,49 +2,30 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; -use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{ - CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, -}; use common_recordbatch::RecordBatch; use common_runtime::Builder as RuntimeBuilder; -use common_servers::mysql::error::{Result, RuntimeResourceSnafu}; -use common_servers::mysql::mysql_instance::MysqlInstance; -use common_servers::mysql::mysql_server::MysqlServer; -use common_servers::server::Server; use datatypes::schema::Schema; use mysql_async::prelude::*; -use query::{Output, QueryEngineFactory, QueryEngineRef}; use rand::rngs::StdRng; use rand::Rng; -use snafu::prelude::*; +use servers::error::Result; +use servers::mysql::server::MysqlServer; +use servers::server::Server; use test_util::MemTable; +use crate::create_testing_sql_query_handler; use crate::mysql::{all_datatype_testing_data, MysqlTextRow, TestingData}; fn create_mysql_server(table: MemTable) -> Result> { - let table_name = table.table_name().to_string(); - let table = Arc::new(table); - - let schema_provider = Arc::new(MemorySchemaProvider::new()); - schema_provider.register_table(table_name, table).unwrap(); - let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); - let catalog_list = Arc::new(MemoryCatalogList::default()); - catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); - let factory = QueryEngineFactory::new(catalog_list); - let query_engine = factory.query_engine().clone(); - - let mysql_instance = Arc::new(DummyMysqlInstance { query_engine }); + let query_handler = create_testing_sql_query_handler(table); let io_runtime = Arc::new( RuntimeBuilder::default() .worker_threads(4) .thread_name("mysql-io-handlers") .build() - .context(RuntimeResourceSnafu)?, + .unwrap(), ); - Ok(MysqlServer::create_server(mysql_instance, io_runtime)) + Ok(MysqlServer::create_server(query_handler, io_runtime)) } #[tokio::test] @@ -209,15 +190,3 @@ async fn create_connection(port: u16) -> mysql_async::Result .wait_timeout(Some(1000)); mysql_async::Conn::new(opts).await } - -struct DummyMysqlInstance { - query_engine: QueryEngineRef, -} - -#[async_trait] -impl MysqlInstance for DummyMysqlInstance { - async fn do_query(&self, query: &str) -> Result { - let plan = self.query_engine.sql_to_plan(query).unwrap(); - Ok(self.query_engine.execute(&plan).await.unwrap()) - } -} diff --git a/src/common/servers/tests/mysql/mysql_writer_test.rs b/src/servers/tests/mysql/mysql_writer_test.rs similarity index 94% rename from src/common/servers/tests/mysql/mysql_writer_test.rs rename to src/servers/tests/mysql/mysql_writer_test.rs index 064c700751..392711b425 100644 --- a/src/common/servers/tests/mysql/mysql_writer_test.rs +++ b/src/servers/tests/mysql/mysql_writer_test.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use common_servers::mysql::mysql_writer::create_mysql_column_def; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; +use servers::mysql::writer::create_mysql_column_def; use crate::mysql::{all_datatype_testing_data, TestingData};