From ca732d45f987a4d71dabf92667f6cb1b26bf3390 Mon Sep 17 00:00:00 2001 From: LFC Date: Mon, 26 Sep 2022 15:47:43 +0800 Subject: [PATCH] feat: opentsdb support (#274) * feat: opentsdb support * fix: tests * fix: resolve CR comments * fix: resolve CR comments * fix: resolve CR comments * fix: resolve CR comments * refactor: remove feature flags for opentsdb and pg * fix: resolve CR comments * fix: resolve CR comments Co-authored-by: luofucong --- Cargo.lock | 74 ++++- src/cmd/Cargo.toml | 4 - src/cmd/src/datanode.rs | 10 +- src/cmd/src/frontend.rs | 51 +++- src/common/error/src/status_code.rs | 1 + src/datanode/Cargo.toml | 7 +- src/datanode/src/datanode.rs | 4 - src/datanode/src/instance.rs | 5 +- src/datanode/src/server.rs | 5 - src/datanode/src/tests/instance_test.rs | 16 +- src/frontend/Cargo.toml | 4 - src/frontend/src/error.rs | 24 +- src/frontend/src/frontend.rs | 21 +- src/frontend/src/instance.rs | 99 +------ src/frontend/src/instance/opentsdb.rs | 280 ++++++++++++++++++ src/frontend/src/lib.rs | 5 + src/frontend/src/mysql.rs | 16 + src/frontend/src/opentsdb.rs | 16 + src/frontend/src/postgres.rs | 16 + src/frontend/src/server.rs | 58 ++-- src/frontend/src/tests.rs | 61 ++++ src/servers/Cargo.toml | 15 +- src/servers/src/error.rs | 59 +++- src/servers/src/http.rs | 54 +++- src/servers/src/http/handler.rs | 15 +- src/servers/src/http/opentsdb.rs | 225 ++++++++++++++ src/servers/src/lib.rs | 5 +- src/servers/src/mysql/server.rs | 89 ++---- src/servers/src/opentsdb.rs | 102 +++++++ src/servers/src/opentsdb/codec.rs | 280 ++++++++++++++++++ src/servers/src/opentsdb/connection.rs | 190 ++++++++++++ src/servers/src/opentsdb/handler.rs | 184 ++++++++++++ src/servers/src/postgres/server.rs | 87 ++---- src/servers/src/query_handler.rs | 9 + src/servers/src/server.rs | 106 ++++++- src/servers/src/shutdown.rs | 51 ++++ src/servers/tests/http/http_handler_test.rs | 18 +- src/servers/tests/http/mod.rs | 1 + src/servers/tests/http/opentsdb_test.rs | 195 ++++++++++++ src/servers/tests/mod.rs | 2 +- src/servers/tests/opentsdb.rs | 186 ++++++++++++ src/table-engine/src/error.rs | 3 +- .../src/table/test_util/mock_engine.rs | 23 +- 43 files changed, 2300 insertions(+), 376 deletions(-) create mode 100644 src/frontend/src/instance/opentsdb.rs create mode 100644 src/frontend/src/mysql.rs create mode 100644 src/frontend/src/opentsdb.rs create mode 100644 src/frontend/src/postgres.rs create mode 100644 src/frontend/src/tests.rs create mode 100644 src/servers/src/http/opentsdb.rs create mode 100644 src/servers/src/opentsdb.rs create mode 100644 src/servers/src/opentsdb/codec.rs create mode 100644 src/servers/src/opentsdb/connection.rs create mode 100644 src/servers/src/opentsdb/handler.rs create mode 100644 src/servers/src/shutdown.rs create mode 100644 src/servers/tests/http/opentsdb_test.rs create mode 100644 src/servers/tests/opentsdb.rs diff --git a/Cargo.lock b/Cargo.lock index 500022b528..4c5d338f47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,7 +286,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.2.8", "bitflags", "bytes", "futures-util", @@ -294,7 +294,38 @@ dependencies = [ "http-body", "hyper", "itoa 1.0.3", - "matchit", + "matchit 0.5.0", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.6.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2628a243073c55aef15a1c1fe45c87f21b84f9e89ca9e7b262a180d3d03543d" +dependencies = [ + "async-trait", + "axum-core 0.3.0-rc.2", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa 1.0.3", + "matchit 0.6.0", "memchr", "mime", "percent-encoding", @@ -327,10 +358,26 @@ dependencies = [ ] [[package]] -name = "axum-macros" -version = "0.2.3" +name = "axum-core" +version = "0.3.0-rc.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6293dae2ec708e679da6736e857cf8532886ef258e92930f38279c12641628b8" +checksum = "473bd0762170028bb6b5068be9e97de2a9f0af3bf2084498d840498f47194d3d" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-macros" +version = "0.3.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "247a599903eb2e02abbaf2facc6396140df7af6dcc84e64ce3b71d117702fa22" dependencies = [ "heck 0.4.0", "proc-macro2", @@ -344,7 +391,7 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5f0c689f3a3cb707ea097813153343b74dcf73b3e46dedb25be91a24050913" dependencies = [ - "axum", + "axum 0.5.16", "bytes", "http", "http-body", @@ -1372,7 +1419,7 @@ dependencies = [ "api", "arrow2", "async-trait", - "axum", + "axum 0.6.0-rc.2", "axum-macros", "axum-test-helper", "catalog", @@ -2606,6 +2653,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" +[[package]] +name = "matchit" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dfc802da7b1cf80aefffa0c7b2f77247c8b32206cc83c270b61264f5b360a80" + [[package]] name = "matrixmultiply" version = "0.3.2" @@ -4633,8 +4686,10 @@ version = "0.1.0" dependencies = [ "api", "async-trait", - "axum", + "axum 0.6.0-rc.2", "axum-macros", + "axum-test-helper", + "bytes", "catalog", "common-base", "common-error", @@ -4662,6 +4717,7 @@ dependencies = [ "tokio", "tokio-postgres", "tokio-stream", + "tokio-test", "tonic", "tower", "tower-http", @@ -5482,7 +5538,7 @@ checksum = "11cd56bdb54ef93935a6a79dbd1d91f1ebd4c64150fd61654031fd6b8b775c91" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.5.16", "base64", "bytes", "futures-core", diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index f77be51b13..1cb37040ad 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -21,7 +21,3 @@ toml = "0.5" [dev-dependencies] serde = "1.0" tempdir = "0.3" - -[features] -default = ["postgres"] -postgres = ["datanode/postgres"] diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 57973bb55d..6fd3682a67 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -39,7 +39,6 @@ struct StartCommand { rpc_addr: Option, #[clap(long)] mysql_addr: Option, - #[cfg(feature = "postgres")] #[clap(long)] postgres_addr: Option, #[clap(short, long)] @@ -81,7 +80,6 @@ impl TryFrom for DatanodeOptions { if let Some(addr) = cmd.mysql_addr { opts.mysql_addr = addr; } - #[cfg(feature = "postgres")] if let Some(addr) = cmd.postgres_addr { opts.postgres_addr = addr; } @@ -102,7 +100,6 @@ mod tests { http_addr: None, rpc_addr: None, mysql_addr: None, - #[cfg(feature = "postgres")] postgres_addr: None, config_file: Some(format!( "{}/../../config/datanode.example.toml", @@ -116,11 +113,8 @@ mod tests { assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr); assert_eq!(4, options.mysql_runtime_size); - #[cfg(feature = "postgres")] - { - assert_eq!("0.0.0.0:5432".to_string(), options.postgres_addr); - assert_eq!(4, options.postgres_runtime_size); - } + assert_eq!("0.0.0.0:5432".to_string(), options.postgres_addr); + assert_eq!(4, options.postgres_runtime_size); match options.storage { ObjectStoreConfig::File { data_dir } => { diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 9256ba7d69..4da2dbf925 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -1,5 +1,8 @@ use clap::Parser; use frontend::frontend::{Frontend, FrontendOptions}; +use frontend::mysql::MysqlOptions; +use frontend::opentsdb::OpentsdbOptions; +use frontend::postgres::PostgresOptions; use snafu::ResultExt; use crate::error::{self, Result}; @@ -38,9 +41,10 @@ struct StartCommand { grpc_addr: Option, #[clap(long)] mysql_addr: Option, - #[cfg(feature = "postgres")] #[clap(long)] postgres_addr: Option, + #[clap(long)] + opentsdb_addr: Option, #[clap(short, long)] config_file: Option, } @@ -70,11 +74,22 @@ impl TryFrom for FrontendOptions { opts.grpc_addr = Some(addr); } if let Some(addr) = cmd.mysql_addr { - opts.mysql_addr = Some(addr); + opts.mysql_options = Some(MysqlOptions { + addr, + ..Default::default() + }); } - #[cfg(feature = "postgres")] if let Some(addr) = cmd.postgres_addr { - opts.postgres_addr = Some(addr); + opts.postgres_options = Some(PostgresOptions { + addr, + ..Default::default() + }); + } + if let Some(addr) = cmd.opentsdb_addr { + opts.opentsdb_options = Some(OpentsdbOptions { + addr, + ..Default::default() + }); } Ok(opts) } @@ -90,24 +105,36 @@ mod tests { http_addr: Some("127.0.0.1:1234".to_string()), grpc_addr: None, mysql_addr: Some("127.0.0.1:5678".to_string()), - #[cfg(feature = "postgres")] postgres_addr: Some("127.0.0.1:5432".to_string()), + opentsdb_addr: Some("127.0.0.1:4321".to_string()), config_file: None, }; let opts: FrontendOptions = command.try_into().unwrap(); assert_eq!(opts.http_addr, Some("127.0.0.1:1234".to_string())); - assert_eq!(opts.mysql_addr, Some("127.0.0.1:5678".to_string())); - #[cfg(feature = "postgres")] - assert_eq!(opts.postgres_addr, Some("127.0.0.1:5432".to_string())); + assert_eq!(opts.mysql_options.as_ref().unwrap().addr, "127.0.0.1:5678"); + assert_eq!( + opts.postgres_options.as_ref().unwrap().addr, + "127.0.0.1:5432" + ); + assert_eq!( + opts.opentsdb_options.as_ref().unwrap().addr, + "127.0.0.1:4321" + ); let default_opts = FrontendOptions::default(); assert_eq!(opts.grpc_addr, default_opts.grpc_addr); - assert_eq!(opts.mysql_runtime_size, default_opts.mysql_runtime_size); - #[cfg(feature = "postgres")] assert_eq!( - opts.postgres_runtime_size, - default_opts.postgres_runtime_size + opts.mysql_options.as_ref().unwrap().runtime_size, + default_opts.mysql_options.as_ref().unwrap().runtime_size + ); + assert_eq!( + opts.postgres_options.as_ref().unwrap().runtime_size, + default_opts.postgres_options.as_ref().unwrap().runtime_size + ); + assert_eq!( + opts.opentsdb_options.as_ref().unwrap().runtime_size, + default_opts.opentsdb_options.as_ref().unwrap().runtime_size ); } } diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index b60d65a872..299c8fd1bc 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -36,6 +36,7 @@ pub enum StatusCode { TableAlreadyExists = 4000, TableNotFound = 4001, TableColumnNotFound = 4002, + TableColumnExists = 4003, // ====== End of catalog related status code ======= // ====== Begin of storage related status code ===== diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index f83560efb1..7bad02b547 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -4,17 +4,16 @@ version = "0.1.0" edition = "2021" [features] -default = ["python", "postgres"] +default = ["python"] python = [ "dep:script" ] -postgres = ["servers/postgres"] [dependencies] api = { path = "../api" } async-trait = "0.1" -axum = "0.5" -axum-macros = "0.2" +axum = "0.6.0-rc.2" +axum-macros = "0.3.0-rc.1" catalog = { path = "../catalog" } common-base = { path = "../common/base" } common-error = { path = "../common/error" } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 0ad523eecd..b7785d8d30 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -26,9 +26,7 @@ pub struct DatanodeOptions { pub rpc_addr: String, pub mysql_addr: String, pub mysql_runtime_size: u32, - #[cfg(feature = "postgres")] pub postgres_addr: String, - #[cfg(feature = "postgres")] pub postgres_runtime_size: u32, pub wal_dir: String, pub storage: ObjectStoreConfig, @@ -41,9 +39,7 @@ impl Default for DatanodeOptions { rpc_addr: "0.0.0.0:3001".to_string(), mysql_addr: "0.0.0.0:3306".to_string(), mysql_runtime_size: 2, - #[cfg(feature = "postgres")] postgres_addr: "0.0.0.0:5432".to_string(), - #[cfg(feature = "postgres")] postgres_runtime_size: 2, wal_dir: "/tmp/greptimedb/wal".to_string(), storage: ObjectStoreConfig::default(), diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index df298b6751..7eeb8b55c8 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -220,10 +220,9 @@ impl Instance { pub fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } -} -#[cfg(test)] -impl Instance { + // This method is used in other crate's testing codes, so move it out of "cfg(test)". + // TODO(LFC): Delete it when callers no longer need it. pub async fn new_mock() -> Result { use table_engine::table::test_util::new_test_object_store; use table_engine::table::test_util::MockEngine; diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 182a4b5775..7a9ac13b6c 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -21,7 +21,6 @@ pub struct Services { http_server: HttpServer, grpc_server: GrpcServer, mysql_server: Box, - #[cfg(feature = "postgres")] postgres_server: Box, } @@ -34,7 +33,6 @@ impl Services { .build() .context(error::RuntimeResourceSnafu)?, ); - #[cfg(feature = "postgres")] let postgres_io_runtime = Arc::new( RuntimeBuilder::default() .worker_threads(opts.postgres_runtime_size as usize) @@ -46,7 +44,6 @@ impl Services { http_server: HttpServer::new(instance.clone()), grpc_server: GrpcServer::new(instance.clone(), instance.clone()), mysql_server: MysqlServer::create_server(instance.clone(), mysql_io_runtime), - #[cfg(feature = "postgres")] postgres_server: Box::new(PostgresServer::new(instance, postgres_io_runtime)), }) } @@ -65,7 +62,6 @@ impl Services { addr: &opts.mysql_addr, })?; - #[cfg(feature = "postgres")] let postgres_addr: SocketAddr = opts.postgres_addr.parse().context(error::ParseAddrSnafu { addr: &opts.postgres_addr, @@ -75,7 +71,6 @@ impl Services { self.http_server.start(http_addr), self.grpc_server.start(grpc_addr), self.mysql_server.start(mysql_addr), - #[cfg(feature = "postgres")] self.postgres_server.start(postgres_addr), ) .context(error::StartServerSnafu)?; diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 94062a2bbf..faa0316db5 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -193,8 +193,6 @@ pub async fn test_create_table_illegal_timestamp_type() { #[tokio::test] async fn test_alter_table() { - common_telemetry::init_default_ut_logging(); - // TODO(LFC) Use real Mito engine when we can alter its region schema, // and delete the `new_mock` method. let instance = Instance::new_mock().await.unwrap(); @@ -237,13 +235,13 @@ async fn test_alter_table() { let pretty_print = arrow_print::write(&recordbatch); let pretty_print = pretty_print.lines().collect::>(); let expected = vec![ - "+-------+---------------------+-----+--------+--------+", - "| host | ts | cpu | memory | my_tag |", - "+-------+---------------------+-----+--------+--------+", - "| host1 | 1970-01-01 00:00:01 | 1.1 | 100 | |", - "| host2 | 1970-01-01 00:00:02 | 2.2 | 200 | hello |", - "| host3 | 1970-01-01 00:00:03 | 3.3 | 300 | |", - "+-------+---------------------+-----+--------+--------+", + "+-------+-----+--------+---------------------+--------+", + "| host | cpu | memory | ts | my_tag |", + "+-------+-----+--------+---------------------+--------+", + "| host1 | 1.1 | 100 | 1970-01-01 00:00:01 | |", + "| host2 | 2.2 | 200 | 1970-01-01 00:00:02 | hello |", + "| host3 | 3.3 | 300 | 1970-01-01 00:00:03 | |", + "+-------+-----+--------+---------------------+--------+", ]; assert_eq!(pretty_print, expected); } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index b8a3341bb4..3fea2cf4f2 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -37,7 +37,3 @@ futures = "0.3" tempdir = "0.3" tonic = "0.8" tower = "0.4" - -[features] -default = ["postgres"] -postgres = ["servers/postgres"] diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 1a06497a61..4659115196 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -12,6 +12,12 @@ pub enum Error { source: client::Error, }, + #[snafu(display("Failed to request Datanode, source: {}", source))] + RequestDatanode { + #[snafu(backtrace)] + source: client::Error, + }, + #[snafu(display("Runtime resource error, source: {}", source))] RuntimeResource { #[snafu(backtrace)] @@ -64,6 +70,18 @@ pub enum Error { err_msg: String, backtrace: Backtrace, }, + + #[snafu(display("Incomplete GRPC result: {}", err_msg))] + IncompleteGrpcResult { + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to execute OpenTSDB put, reason: {}", reason))] + ExecOpentsdbPut { + reason: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -78,8 +96,12 @@ impl ErrorExt for Error { Error::StartServer { source, .. } => source.status_code(), Error::ParseSql { source } => source.status_code(), Error::ConvertColumnDefaultConstraint { source, .. } => source.status_code(), + Error::RequestDatanode { source } => source.status_code(), Error::ColumnDataType { .. } => StatusCode::Internal, - Error::IllegalFrontendState { .. } => StatusCode::Unexpected, + Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => { + StatusCode::Unexpected + } + Error::ExecOpentsdbPut { .. } => StatusCode::Internal, } } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 6f55cc1e9d..9ca7e60081 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -5,18 +5,18 @@ use snafu::prelude::*; use crate::error::{self, Result}; use crate::instance::Instance; +use crate::mysql::MysqlOptions; +use crate::opentsdb::OpentsdbOptions; +use crate::postgres::PostgresOptions; use crate::server::Services; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct FrontendOptions { pub http_addr: Option, pub grpc_addr: Option, - pub mysql_addr: Option, - pub mysql_runtime_size: u32, - #[cfg(feature = "postgres")] - pub postgres_addr: Option, - #[cfg(feature = "postgres")] - pub postgres_runtime_size: u32, + pub mysql_options: Option, + pub postgres_options: Option, + pub opentsdb_options: Option, } impl Default for FrontendOptions { @@ -24,12 +24,9 @@ impl Default for FrontendOptions { Self { http_addr: Some("0.0.0.0:4000".to_string()), grpc_addr: Some("0.0.0.0:4001".to_string()), - mysql_addr: Some("0.0.0.0:4002".to_string()), - mysql_runtime_size: 2, - #[cfg(feature = "postgres")] - postgres_addr: Some("0.0.0.0:4003".to_string()), - #[cfg(feature = "postgres")] - postgres_runtime_size: 2, + mysql_options: Some(MysqlOptions::default()), + postgres_options: Some(PostgresOptions::default()), + opentsdb_options: Some(OpentsdbOptions::default()), } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 9dc98b3ae0..38ada0f2c0 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -1,3 +1,5 @@ +mod opentsdb; + use std::collections::HashMap; use std::sync::Arc; @@ -258,30 +260,21 @@ mod tests { use std::assert_matches::assert_matches; use api::v1::codec::{InsertBatch, SelectResult}; - use api::v1::greptime_client::GreptimeClient; use api::v1::{ admin_expr, admin_result, column, object_expr, object_result, select_expr, Column, ExprHeader, MutateResult, SelectExpr, }; use datafusion::arrow_print; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; - use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; - use datanode::instance::Instance as DatanodeInstance; use datatypes::schema::ColumnDefaultConstraint; use datatypes::value::Value; - use servers::grpc::GrpcServer; - use tempdir::TempDir; - use tonic::transport::{Endpoint, Server}; - use tower::service_fn; use super::*; + use crate::tests; #[tokio::test] async fn test_execute_sql() { - common_telemetry::init_default_ut_logging(); - - let datanode_instance = create_datanode_instance().await; - let frontend_instance = create_frontend_instance(datanode_instance).await; + let instance = tests::create_frontend_instance().await; let sql = r#"CREATE TABLE demo( host STRING, @@ -292,9 +285,7 @@ mod tests { TIME INDEX (ts), PRIMARY KEY(ts, host) ) engine=mito with(regions=1);"#; - let output = SqlQueryHandler::do_query(&*frontend_instance, sql) - .await - .unwrap(); + let output = SqlQueryHandler::do_query(&*instance, sql).await.unwrap(); match output { Output::AffectedRows(rows) => assert_eq!(rows, 1), _ => unreachable!(), @@ -305,18 +296,14 @@ mod tests { ('frontend.host2', null, null, 2000), ('frontend.host3', 3.3, 300, 3000) "#; - let output = SqlQueryHandler::do_query(&*frontend_instance, sql) - .await - .unwrap(); + let output = SqlQueryHandler::do_query(&*instance, sql).await.unwrap(); match output { Output::AffectedRows(rows) => assert_eq!(rows, 3), _ => unreachable!(), } let sql = "select * from demo"; - let output = SqlQueryHandler::do_query(&*frontend_instance, sql) - .await - .unwrap(); + let output = SqlQueryHandler::do_query(&*instance, sql).await.unwrap(); match output { Output::RecordBatches(recordbatches) => { let recordbatches = recordbatches @@ -341,9 +328,7 @@ mod tests { }; let sql = "select * from demo where ts>cast(1000000000 as timestamp)"; // use nanoseconds as where condition - let output = SqlQueryHandler::do_query(&*frontend_instance, sql) - .await - .unwrap(); + let output = SqlQueryHandler::do_query(&*instance, sql).await.unwrap(); match output { Output::RecordBatches(recordbatches) => { let recordbatches = recordbatches @@ -369,10 +354,7 @@ mod tests { #[tokio::test] async fn test_execute_grpc() { - common_telemetry::init_default_ut_logging(); - - let datanode_instance = create_datanode_instance().await; - let frontend_instance = create_frontend_instance(datanode_instance).await; + let instance = tests::create_frontend_instance().await; // testing data: let expected_host_col = Column { @@ -432,7 +414,7 @@ mod tests { header: Some(ExprHeader::default()), expr: Some(admin_expr::Expr::Create(create_expr)), }; - let result = GrpcAdminHandler::exec_admin_request(&*frontend_instance, admin_expr) + let result = GrpcAdminHandler::exec_admin_request(&*instance, admin_expr) .await .unwrap(); assert_matches!( @@ -462,7 +444,7 @@ mod tests { header: Some(ExprHeader::default()), expr: Some(object_expr::Expr::Insert(insert_expr)), }; - let result = GrpcQueryHandler::do_query(&*frontend_instance, object_expr) + let result = GrpcQueryHandler::do_query(&*instance, object_expr) .await .unwrap(); assert_matches!( @@ -480,7 +462,7 @@ mod tests { expr: Some(select_expr::Expr::Sql("select * from demo".to_string())), })), }; - let result = GrpcQueryHandler::do_query(&*frontend_instance, object_expr) + let result = GrpcQueryHandler::do_query(&*instance, object_expr) .await .unwrap(); match result.result { @@ -508,63 +490,6 @@ mod tests { } } - async fn create_datanode_instance() -> Arc { - let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap(); - let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap(); - let opts = DatanodeOptions { - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), - storage: ObjectStoreConfig::File { - data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }, - ..Default::default() - }; - - let instance = Arc::new(DatanodeInstance::new(&opts).await.unwrap()); - instance.start().await.unwrap(); - instance - } - - async fn create_frontend_instance(datanode_instance: Arc) -> Arc { - let (client, server) = tokio::io::duplex(1024); - - // create a mock datanode grpc service, see example here: - // https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs - let datanode_service = - GrpcServer::new(datanode_instance.clone(), datanode_instance).create_service(); - tokio::spawn(async move { - Server::builder() - .add_service(datanode_service) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) - .await - }); - - // Move client to an option so we can _move_ the inner value - // on the first attempt to connect. All other attempts will fail. - let mut client = Some(client); - // "http://[::]:50051" is just a placeholder, does not actually connect to it, - // see https://github.com/hyperium/tonic/issues/727#issuecomment-881532934 - let channel = Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(service_fn(move |_| { - let client = client.take(); - - async move { - if let Some(client) = client { - Ok(client) - } else { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Client already taken", - )) - } - } - })) - .await - .unwrap(); - let client = Client::with_client(GreptimeClient::new(channel)); - Arc::new(Instance::with_client(client)) - } - fn create_expr() -> CreateExpr { let column_defs = vec![ GrpcColumnDef { diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs new file mode 100644 index 0000000000..4c3b6564ed --- /dev/null +++ b/src/frontend/src/instance/opentsdb.rs @@ -0,0 +1,280 @@ +use std::collections::HashMap; + +use api::v1::{alter_expr, AddColumn, AlterExpr, ColumnDataType, ColumnDef, CreateExpr}; +use async_trait::async_trait; +use client::{Error as ClientError, ObjectResult}; +use common_error::prelude::{BoxedError, StatusCode}; +use common_telemetry::info; +use servers::error as server_error; +use servers::opentsdb::codec::{ + DataPoint, OPENTSDB_TIMESTAMP_COLUMN_NAME, OPENTSDB_VALUE_COLUMN_NAME, +}; +use servers::query_handler::OpentsdbProtocolHandler; +use snafu::prelude::*; + +use crate::error::{self, Result}; +use crate::instance::Instance; + +#[async_trait] +impl OpentsdbProtocolHandler for Instance { + async fn exec(&self, data_point: &DataPoint) -> server_error::Result<()> { + // TODO(LFC): Insert metrics in batch, then make OpentsdbLineProtocolHandler::exec received multiple data points, when + // metric table and tags can be created upon insertion. + self.insert_opentsdb_metric(data_point) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::PutOpentsdbDataPointSnafu { + data_point: format!("{:?}", data_point), + })?; + Ok(()) + } +} + +impl Instance { + async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> { + let expr = data_point.as_grpc_insert(); + + let result = self.db.insert(expr.clone()).await; + + let object_result = match result { + Ok(result) => result, + Err(ClientError::Datanode { code, .. }) => { + let retry = if code == StatusCode::TableNotFound as u32 { + self.create_opentsdb_metric(data_point).await?; + true + } else if code == StatusCode::TableColumnNotFound as u32 { + self.create_opentsdb_tags(data_point).await?; + true + } else { + false + }; + if retry { + self.db + .insert(expr.clone()) + .await + .context(error::RequestDatanodeSnafu)? + } else { + // `unwrap_err` is safe because we are matching "result" in "Err" arm + return Err(result.context(error::RequestDatanodeSnafu).unwrap_err()); + } + } + Err(_) => { + return Err(result.context(error::RequestDatanodeSnafu).unwrap_err()); + } + }; + + match object_result { + ObjectResult::Mutate(mutate) => { + if mutate.success != 1 || mutate.failure != 0 { + return error::ExecOpentsdbPutSnafu { + reason: format!("illegal result: {:?}", mutate), + } + .fail(); + } + } + ObjectResult::Select(_) => unreachable!(), + } + Ok(()) + } + + async fn create_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> { + let mut column_defs = Vec::with_capacity(2 + data_point.tags().len()); + + let ts_column = ColumnDef { + name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Timestamp as i32, + is_nullable: false, + ..Default::default() + }; + column_defs.push(ts_column); + + let value_column = ColumnDef { + name: OPENTSDB_VALUE_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: false, + ..Default::default() + }; + column_defs.push(value_column); + + for (tagk, _) in data_point.tags().iter() { + column_defs.push(ColumnDef { + name: tagk.to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + ..Default::default() + }) + } + + let expr = CreateExpr { + catalog_name: None, + schema_name: None, + table_name: data_point.metric().to_string(), + desc: Some(format!( + "Table for OpenTSDB metric: {}", + &data_point.metric() + )), + column_defs, + time_index: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(), + primary_keys: vec![], + create_if_not_exists: true, + table_options: HashMap::new(), + }; + + let result = self + .admin + .create(expr) + .await + .context(error::RequestDatanodeSnafu)?; + let header = result.header.context(error::IncompleteGrpcResultSnafu { + err_msg: "'header' is missing", + })?; + if header.code == (StatusCode::Success as u32) + || header.code == (StatusCode::TableAlreadyExists as u32) + { + info!( + "OpenTSDB metric table for \"{}\" is created!", + data_point.metric() + ); + Ok(()) + } else { + error::ExecOpentsdbPutSnafu { + reason: format!("error code: {}", header.code), + } + .fail() + } + } + + async fn create_opentsdb_tags(&self, data_point: &DataPoint) -> Result<()> { + // TODO(LFC): support adding columns in one request + for (tagk, _) in data_point.tags().iter() { + let tag_column = ColumnDef { + name: tagk.to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + ..Default::default() + }; + let expr = AlterExpr { + catalog_name: None, + schema_name: None, + table_name: data_point.metric().to_string(), + kind: Some(alter_expr::Kind::AddColumn(AddColumn { + column_def: Some(tag_column), + })), + }; + + let result = self + .admin + .alter(expr) + .await + .context(error::RequestDatanodeSnafu)?; + let header = result.header.context(error::IncompleteGrpcResultSnafu { + err_msg: "'header' is missing", + })?; + if header.code != (StatusCode::Success as u32) + && header.code != (StatusCode::TableColumnExists as u32) + { + return error::ExecOpentsdbPutSnafu { + reason: format!("error code: {}", header.code), + } + .fail(); + } + info!( + "OpenTSDB tag \"{}\" for metric \"{}\" is added!", + tagk, + data_point.metric() + ); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use common_query::Output; + use datafusion::arrow_print; + use servers::query_handler::SqlQueryHandler; + + use super::*; + use crate::tests; + + #[tokio::test] + async fn test_exec() { + let instance = tests::create_frontend_instance().await; + instance + .exec( + &DataPoint::try_create( + "put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0", + ) + .unwrap(), + ) + .await + .unwrap(); + instance + .exec(&DataPoint::try_create("put sys.procs.running 1479496100 42 host=web01").unwrap()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_insert_opentsdb_metric() { + let instance = tests::create_frontend_instance().await; + + let data_point1 = DataPoint::new( + "my_metric_1".to_string(), + 1000, + 1.0, + vec![ + ("tagk1".to_string(), "tagv1".to_string()), + ("tagk2".to_string(), "tagv2".to_string()), + ], + ); + // should create new table "my_metric_1" directly + let result = instance.insert_opentsdb_metric(&data_point1).await; + assert!(result.is_ok()); + + let data_point2 = DataPoint::new( + "my_metric_1".to_string(), + 2000, + 2.0, + vec![ + ("tagk2".to_string(), "tagv2".to_string()), + ("tagk3".to_string(), "tagv3".to_string()), + ], + ); + // should create new column "tagk3" directly + let result = instance.insert_opentsdb_metric(&data_point2).await; + assert!(result.is_ok()); + + let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); + // should handle null tags properly + let result = instance.insert_opentsdb_metric(&data_point3).await; + assert!(result.is_ok()); + + let output = instance + .do_query("select * from my_metric_1") + .await + .unwrap(); + match output { + Output::RecordBatches(recordbatches) => { + let recordbatches = recordbatches + .take() + .into_iter() + .map(|r| r.df_recordbatch) + .collect::>(); + let pretty_print = arrow_print::write(&recordbatches); + let pretty_print = pretty_print.lines().collect::>(); + let expected = vec![ + "+---------------------+-------+-------+-------+-------+", + "| timestamp | value | tagk1 | tagk2 | tagk3 |", + "+---------------------+-------+-------+-------+-------+", + "| 1970-01-01 00:00:01 | 1 | tagv1 | tagv2 | |", + "| 1970-01-01 00:00:02 | 2 | | tagv2 | tagv3 |", + "| 1970-01-01 00:00:03 | 3 | | | |", + "+---------------------+-------+-------+-------+-------+", + ]; + assert_eq!(pretty_print, expected); + } + _ => unreachable!(), + }; + } +} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 47e168e09f..932e213a87 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -3,4 +3,9 @@ pub mod error; pub mod frontend; pub mod instance; +pub mod mysql; +pub mod opentsdb; +pub mod postgres; mod server; +#[cfg(test)] +mod tests; diff --git a/src/frontend/src/mysql.rs b/src/frontend/src/mysql.rs new file mode 100644 index 0000000000..f1d688790e --- /dev/null +++ b/src/frontend/src/mysql.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MysqlOptions { + pub addr: String, + pub runtime_size: usize, +} + +impl Default for MysqlOptions { + fn default() -> Self { + Self { + addr: "0.0.0.0:4002".to_string(), + runtime_size: 2, + } + } +} diff --git a/src/frontend/src/opentsdb.rs b/src/frontend/src/opentsdb.rs new file mode 100644 index 0000000000..2752167342 --- /dev/null +++ b/src/frontend/src/opentsdb.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OpentsdbOptions { + pub addr: String, + pub runtime_size: usize, +} + +impl Default for OpentsdbOptions { + fn default() -> Self { + Self { + addr: "0.0.0.0:4242".to_string(), + runtime_size: 2, + } + } +} diff --git a/src/frontend/src/postgres.rs b/src/frontend/src/postgres.rs new file mode 100644 index 0000000000..1c949318f8 --- /dev/null +++ b/src/frontend/src/postgres.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PostgresOptions { + pub addr: String, + pub runtime_size: usize, +} + +impl Default for PostgresOptions { + fn default() -> Self { + Self { + addr: "0.0.0.0:4003".to_string(), + runtime_size: 2, + } + } +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 12f6d07766..44674f791f 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -5,7 +5,7 @@ use common_runtime::Builder as RuntimeBuilder; use servers::grpc::GrpcServer; use servers::http::HttpServer; use servers::mysql::server::MysqlServer; -#[cfg(feature = "postgres")] +use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; use servers::server::Server; use snafu::ResultExt; @@ -19,16 +19,6 @@ pub(crate) struct Services; impl Services { pub(crate) async fn start(opts: &FrontendOptions, instance: InstanceRef) -> Result<()> { - let http_server_and_addr = if let Some(http_addr) = &opts.http_addr { - let http_addr = parse_addr(http_addr)?; - - let http_server = HttpServer::new(instance.clone()); - - Some((Box::new(http_server) as _, http_addr)) - } else { - None - }; - let grpc_server_and_addr = if let Some(grpc_addr) = &opts.grpc_addr { let grpc_addr = parse_addr(grpc_addr)?; @@ -39,12 +29,12 @@ impl Services { None }; - let mysql_server_and_addr = if let Some(mysql_addr) = &opts.mysql_addr { - let mysql_addr = parse_addr(mysql_addr)?; + let mysql_server_and_addr = if let Some(opts) = &opts.mysql_options { + let mysql_addr = parse_addr(&opts.addr)?; let mysql_io_runtime = Arc::new( RuntimeBuilder::default() - .worker_threads(opts.mysql_runtime_size as usize) + .worker_threads(opts.runtime_size) .thread_name("mysql-io-handlers") .build() .context(error::RuntimeResourceSnafu)?, @@ -57,13 +47,12 @@ impl Services { None }; - #[cfg(feature = "postgres")] - let postgres_server_and_addr = if let Some(pg_addr) = &opts.postgres_addr { - let pg_addr = parse_addr(pg_addr)?; + let postgres_server_and_addr = if let Some(opts) = &opts.postgres_options { + let pg_addr = parse_addr(&opts.addr)?; let pg_io_runtime = Arc::new( RuntimeBuilder::default() - .worker_threads(opts.postgres_runtime_size as usize) + .worker_threads(opts.runtime_size) .thread_name("pg-io-handlers") .build() .context(error::RuntimeResourceSnafu)?, @@ -77,12 +66,43 @@ impl Services { None }; + let opentsdb_server_and_addr = if let Some(opts) = &opts.opentsdb_options { + let addr = parse_addr(&opts.addr)?; + + let io_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.runtime_size) + .thread_name("opentsdb-io-handlers") + .build() + .context(error::RuntimeResourceSnafu)?, + ); + + let server = OpentsdbServer::create_server(instance.clone(), io_runtime); + + Some((server, addr)) + } else { + None + }; + + let http_server_and_addr = if let Some(http_addr) = &opts.http_addr { + let http_addr = parse_addr(http_addr)?; + + let mut http_server = HttpServer::new(instance.clone()); + if opentsdb_server_and_addr.is_some() { + http_server.set_opentsdb_handler(instance.clone()); + } + + Some((Box::new(http_server) as _, http_addr)) + } else { + None + }; + try_join!( start_server(http_server_and_addr), start_server(grpc_server_and_addr), start_server(mysql_server_and_addr), - #[cfg(feature = "postgres")] start_server(postgres_server_and_addr), + start_server(opentsdb_server_and_addr) ) .context(error::StartServerSnafu)?; Ok(()) diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs new file mode 100644 index 0000000000..bc2b473e97 --- /dev/null +++ b/src/frontend/src/tests.rs @@ -0,0 +1,61 @@ +use std::sync::Arc; + +use api::v1::greptime_client::GreptimeClient; +use client::Client; +use datanode::instance::Instance as DatanodeInstance; +use servers::grpc::GrpcServer; +use tonic::transport::{Endpoint, Server}; +use tower::service_fn; + +use crate::instance::Instance; + +async fn create_datanode_instance() -> Arc { + // TODO(LFC) Use real Mito engine when we can alter its region schema, + // and delete the `new_mock` method. + let instance = Arc::new(DatanodeInstance::new_mock().await.unwrap()); + instance.start().await.unwrap(); + instance +} + +pub(crate) async fn create_frontend_instance() -> Arc { + let datanode_instance = create_datanode_instance().await; + + let (client, server) = tokio::io::duplex(1024); + + // create a mock datanode grpc service, see example here: + // https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs + let datanode_service = + GrpcServer::new(datanode_instance.clone(), datanode_instance).create_service(); + tokio::spawn(async move { + Server::builder() + .add_service(datanode_service) + .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .await + }); + + // Move client to an option so we can _move_ the inner value + // on the first attempt to connect. All other attempts will fail. + let mut client = Some(client); + // "http://[::]:50051" is just a placeholder, does not actually connect to it, + // see https://github.com/hyperium/tonic/issues/727#issuecomment-881532934 + let channel = Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(service_fn(move |_| { + let client = client.take(); + + async move { + if let Some(client) = client { + Ok(client) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Client already taken", + )) + } + } + })) + .await + .unwrap(); + let client = Client::with_client(GreptimeClient::new(channel)); + Arc::new(Instance::with_client(client)) +} diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 9d7143dae8..95852cd68c 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -6,8 +6,9 @@ edition = "2021" [dependencies] api = { path = "../api" } async-trait = "0.1" -axum = "0.5" -axum-macros = "0.2" +axum = "0.6.0-rc.2" +axum-macros = "0.3.0-rc.1" +bytes = "1.2" common-error = { path = "../common/error" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } @@ -16,12 +17,12 @@ common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } futures = "0.3" -hex = { version = "0.4", optional = true } +hex = { version = "0.4" } hyper = { version = "0.14", features = ["full"] } metrics = "0.20" num_cpus = "1.13" opensrv-mysql = "0.1" -pgwire = { version = "0.3", optional = true } +pgwire = { version = "0.3" } query = { path = "../query" } serde = "1.0" serde_json = "1.0" @@ -32,11 +33,8 @@ tonic = "0.8" tower = { version = "0.4", features = ["full"] } tower-http = { version = "0.3", features = ["full"] } -[features] -default = ["postgres"] -postgres = ["hex", "pgwire"] - [dev-dependencies] +axum-test-helper = "0.1" catalog = { path = "../catalog" } common-base = { path = "../common/base" } mysql_async = "0.30" @@ -44,3 +42,4 @@ rand = "0.8" script = { path = "../script", features = ["python"] } test-util = { path = "../../test-util" } tokio-postgres = "0.7" +tokio-test = "0.4" diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index da14cb60c3..946b5e08f7 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -1,7 +1,11 @@ use std::any::Any; use std::net::SocketAddr; +use axum::http::StatusCode as HttpStatusCode; +use axum::response::{IntoResponse, Response}; +use axum::Json; use common_error::prelude::*; +use serde_json::json; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -71,6 +75,35 @@ pub enum Error { reason: String, backtrace: Backtrace, }, + + #[snafu(display("Connection reset by peer"))] + ConnResetByPeer { backtrace: Backtrace }, + + #[snafu(display("Hyper error, source: {}", source))] + Hyper { source: hyper::Error }, + + #[snafu(display("Invalid OpenTSDB line, source: {}", source))] + InvalidOpentsdbLine { + source: std::string::FromUtf8Error, + backtrace: Backtrace, + }, + + #[snafu(display("Invalid OpenTSDB Json request, source: {}", source))] + InvalidOpentsdbJsonRequest { + source: serde_json::error::Error, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to put OpenTSDB data point: {:?}, source: {}", + data_point, + source + ))] + PutOpentsdbDataPoint { + data_point: String, + #[snafu(backtrace)] + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -90,9 +123,16 @@ impl ErrorExt for Error { InsertScript { source, .. } | ExecuteScript { source, .. } - | ExecuteQuery { source, .. } => source.status_code(), + | ExecuteQuery { source, .. } + | PutOpentsdbDataPoint { source, .. } => source.status_code(), - NotSupported { .. } | InvalidQuery { .. } => StatusCode::InvalidArguments, + NotSupported { .. } + | InvalidQuery { .. } + | ConnResetByPeer { .. } + | InvalidOpentsdbLine { .. } + | InvalidOpentsdbJsonRequest { .. } => StatusCode::InvalidArguments, + + Hyper { .. } => StatusCode::Unknown, } } @@ -116,3 +156,18 @@ impl From for Error { Error::InternalIo { source: e } } } + +impl IntoResponse for Error { + fn into_response(self) -> Response { + let (status, error_message) = match self { + Error::InvalidOpentsdbLine { .. } + | Error::InvalidOpentsdbJsonRequest { .. } + | Error::InvalidQuery { .. } => (HttpStatusCode::BAD_REQUEST, self.to_string()), + _ => (HttpStatusCode::INTERNAL_SERVER_ERROR, self.to_string()), + }; + let body = Json(json!({ + "error": error_message, + })); + (status, body).into_response() + } +} diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 56f3fdb96e..a9922f49d1 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -1,4 +1,5 @@ pub mod handler; +pub mod opentsdb; use std::net::SocketAddr; use std::time::Duration; @@ -8,7 +9,7 @@ use axum::{ error_handling::HandleErrorLayer, response::IntoResponse, response::{Json, Response}, - routing, BoxError, Extension, Router, + routing, BoxError, Router, }; use common_query::Output; use common_recordbatch::{util, RecordBatch}; @@ -19,13 +20,15 @@ use tower::{timeout::TimeoutLayer, ServiceBuilder}; use tower_http::trace::TraceLayer; use crate::error::{Result, StartHttpSnafu}; +use crate::query_handler::OpentsdbProtocolHandlerRef; use crate::query_handler::SqlQueryHandlerRef; use crate::server::Server; const HTTP_API_VERSION: &str = "v1"; pub struct HttpServer { - query_handler: SqlQueryHandlerRef, + sql_handler: SqlQueryHandlerRef, + opentsdb_handler: Option, } #[derive(Serialize, Debug)] @@ -114,27 +117,50 @@ async fn shutdown_signal() { } impl HttpServer { - pub fn new(query_handler: SqlQueryHandlerRef) -> Self { - Self { query_handler } + pub fn new(sql_handler: SqlQueryHandlerRef) -> Self { + Self { + sql_handler, + opentsdb_handler: None, + } + } + + pub fn set_opentsdb_handler(&mut self, handler: OpentsdbProtocolHandlerRef) { + debug_assert!( + self.opentsdb_handler.is_none(), + "OpenTSDB handler can be set only once!" + ); + self.opentsdb_handler.get_or_insert(handler); } pub fn make_app(&self) -> Router { - Router::new() - .nest( - &format!("/{}", HTTP_API_VERSION), - Router::new() - // handlers - .route("/sql", routing::get(handler::sql)) - .route("/scripts", routing::post(handler::scripts)) - .route("/run-script", routing::post(handler::run_script)), - ) + // TODO(LFC): Use released Axum. + // Axum version 0.6 introduces state within router, making router methods far more elegant + // to write. Though version 0.6 is rc, I think it's worth to upgrade. + // Prior to version 0.6, we only have a single "Extension" to share all query + // handlers amongst router methods. That requires us to pack all query handlers in a shared + // state, and check-then-get the desired query handler in different router methods, which + // is a lot of tedious work. + let sql_router = Router::with_state(self.sql_handler.clone()) + .route("/sql", routing::get(handler::sql)) + .route("/scripts", routing::post(handler::scripts)) + .route("/run-script", routing::post(handler::run_script)); + + let mut router = Router::new().nest(&format!("/{}", HTTP_API_VERSION), sql_router); + + if let Some(opentsdb_handler) = self.opentsdb_handler.clone() { + let opentsdb_router = Router::with_state(opentsdb_handler.clone()) + .route("/api/put", routing::post(opentsdb::put)); + + router = router.nest(&format!("/{}/opentsdb", HTTP_API_VERSION), opentsdb_router); + } + + router .route("/metrics", routing::get(handler::metrics)) // middlewares .layer( ServiceBuilder::new() .layer(HandleErrorLayer::new(handle_error)) .layer(TraceLayer::new_for_http()) - .layer(Extension(self.query_handler.clone())) // TODO(LFC): make timeout configurable .layer(TimeoutLayer::new(Duration::from_secs(30))), ) diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 81fb403835..afcc07d75b 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use axum::extract::{Extension, Json, Query}; +use axum::extract::{Json, Query, State}; use common_telemetry::metric; use serde::{Deserialize, Serialize}; @@ -10,11 +10,11 @@ use crate::query_handler::SqlQueryHandlerRef; /// Handler to execute sql #[axum_macros::debug_handler] pub async fn sql( - Extension(query_handler): Extension, + State(sql_handler): State, Query(params): Query>, ) -> HttpResponse { if let Some(sql) = params.get("sql") { - HttpResponse::Json(JsonResponse::from_output(query_handler.do_query(sql).await).await) + HttpResponse::Json(JsonResponse::from_output(sql_handler.do_query(sql).await).await) } else { HttpResponse::Json(JsonResponse::with_error(Some( "sql parameter is required.".to_string(), @@ -24,10 +24,7 @@ pub async fn sql( /// Handler to export metrics #[axum_macros::debug_handler] -pub async fn metrics( - Extension(_query_handler): Extension, - Query(_params): Query>, -) -> HttpResponse { +pub async fn metrics(Query(_params): Query>) -> HttpResponse { if let Some(handle) = metric::try_handle() { HttpResponse::Text(handle.render()) } else { @@ -44,7 +41,7 @@ pub struct ScriptExecution { /// Handler to insert and compile script #[axum_macros::debug_handler] pub async fn scripts( - Extension(query_handler): Extension, + State(query_handler): State, Json(payload): Json, ) -> HttpResponse { if payload.name.is_empty() || payload.script.is_empty() { @@ -67,7 +64,7 @@ pub async fn scripts( /// Handler to execute script #[axum_macros::debug_handler] pub async fn run_script( - Extension(query_handler): Extension, + State(query_handler): State, Query(params): Query>, ) -> HttpResponse { let name = params.get("name"); diff --git a/src/servers/src/http/opentsdb.rs b/src/servers/src/http/opentsdb.rs new file mode 100644 index 0000000000..216077827a --- /dev/null +++ b/src/servers/src/http/opentsdb.rs @@ -0,0 +1,225 @@ +use std::collections::HashMap; + +use axum::extract::{Query, RawBody, State}; +use axum::http::StatusCode as HttpStatusCode; +use axum::Json; +use hyper::Body; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{self, Error, Result}; +use crate::opentsdb::codec::DataPoint; +use crate::query_handler::OpentsdbProtocolHandlerRef; + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +enum OneOrMany { + One(T), + Vec(Vec), +} + +impl From> for Vec { + fn from(from: OneOrMany) -> Self { + match from { + OneOrMany::One(val) => vec![val], + OneOrMany::Vec(vec) => vec, + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct DataPointRequest { + metric: String, + timestamp: i64, + value: f64, + tags: HashMap, +} + +impl From for DataPoint { + fn from(request: DataPointRequest) -> Self { + let ts_millis = DataPoint::timestamp_to_millis(request.timestamp); + + let tags = request + .tags + .into_iter() + .map(|(k, v)| (k, v)) + .collect::>(); + + DataPoint::new(request.metric, ts_millis, request.value, tags) + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +pub enum OpentsdbPutResponse { + Empty, + Debug(OpentsdbDebuggingResponse), +} + +// Please refer to the OpenTSDB documents of ["api/put"](http://opentsdb.net/docs/build/html/api_http/put.html) +// for more details. +#[axum_macros::debug_handler] +pub async fn put( + State(opentsdb_handler): State, + Query(params): Query>, + RawBody(body): RawBody, +) -> Result<(HttpStatusCode, Json)> { + let summary = params.contains_key("summary"); + let details = params.contains_key("details"); + + let data_points = parse_data_points(body).await?; + + let response = if !summary && !details { + for data_point in data_points.into_iter() { + if let Err(e) = opentsdb_handler.exec(&data_point.into()).await { + // Not debugging purpose, failed fast. + return error::InternalSnafu { + err_msg: e.to_string(), + } + .fail(); + } + } + (HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty)) + } else { + let mut response = OpentsdbDebuggingResponse { + success: 0, + failed: 0, + errors: if details { + Some(Vec::with_capacity(data_points.len())) + } else { + None + }, + }; + + for data_point in data_points.into_iter() { + let result = opentsdb_handler.exec(&data_point.clone().into()).await; + match result { + Ok(()) => response.on_success(), + Err(e) => { + response.on_failed(data_point, e); + } + } + } + ( + HttpStatusCode::OK, + Json(OpentsdbPutResponse::Debug(response)), + ) + }; + Ok(response) +} + +async fn parse_data_points(body: Body) -> Result> { + let body = hyper::body::to_bytes(body) + .await + .context(error::HyperSnafu)?; + let data_points = serde_json::from_slice::>(&body[..]) + .context(error::InvalidOpentsdbJsonRequestSnafu)?; + Ok(data_points.into()) +} + +#[derive(Serialize, Deserialize, Debug)] +struct OpentsdbDetailError { + datapoint: DataPointRequest, + error: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct OpentsdbDebuggingResponse { + success: i32, + failed: i32, + #[serde(skip_serializing_if = "Option::is_none")] + errors: Option>, +} + +impl OpentsdbDebuggingResponse { + fn on_success(&mut self) { + self.success += 1; + } + + fn on_failed(&mut self, datapoint: DataPointRequest, error: Error) { + self.failed += 1; + + if let Some(details) = self.errors.as_mut() { + let error = OpentsdbDetailError { + datapoint, + error: error.to_string(), + }; + details.push(error); + }; + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_into_opentsdb_data_point() { + let request = DataPointRequest { + metric: "hello".to_string(), + timestamp: 1234, + value: 1.0, + tags: HashMap::from([("foo".to_string(), "a".to_string())]), + }; + let data_point: DataPoint = request.into(); + assert_eq!(data_point.metric(), "hello"); + assert_eq!(data_point.ts_millis(), 1234000); + assert_eq!(data_point.value(), 1.0); + assert_eq!( + data_point.tags(), + &vec![("foo".to_string(), "a".to_string())] + ); + } + + #[tokio::test] + async fn test_parse_data_points() { + let raw_data_point1 = r#"{ + "metric": "sys.cpu.nice", + "timestamp": 1346846400, + "value": 18, + "tags": { + "host": "web01", + "dc": "lga" + } + }"#; + let data_point1 = serde_json::from_str::(raw_data_point1).unwrap(); + + let raw_data_point2 = r#"{ + "metric": "sys.cpu.nice", + "timestamp": 1346846400, + "value": 9, + "tags": { + "host": "web02", + "dc": "lga" + } + }"#; + let data_point2 = serde_json::from_str::(raw_data_point2).unwrap(); + + let body = Body::from(raw_data_point1); + let data_points = parse_data_points(body).await.unwrap(); + assert_eq!(data_points.len(), 1); + assert_eq!(data_points[0], data_point1); + + let body = Body::from(format!("[{},{}]", raw_data_point1, raw_data_point2)); + let data_points = parse_data_points(body).await.unwrap(); + assert_eq!(data_points.len(), 2); + assert_eq!(data_points[0], data_point1); + assert_eq!(data_points[1], data_point2); + + let body = Body::from(""); + let result = parse_data_points(body).await; + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Invalid OpenTSDB Json request, source: EOF while parsing a value at line 1 column 0" + ); + + let body = Body::from("hello world"); + let result = parse_data_points(body).await; + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Invalid OpenTSDB Json request, source: expected value at line 1 column 1" + ); + } +} diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index d59ba05e21..5b75f6e4b7 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -1,8 +1,11 @@ +#![feature(assert_matches)] + pub mod error; pub mod grpc; pub mod http; pub mod mysql; -#[cfg(feature = "postgres")] +pub mod opentsdb; pub mod postgres; pub mod query_handler; pub mod server; +mod shutdown; diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index 3f282c0aec..ff1b28145d 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -5,69 +5,39 @@ use std::sync::Arc; use async_trait::async_trait; use common_runtime::Runtime; use common_telemetry::logging::{error, info}; -use futures::future::AbortHandle; -use futures::future::AbortRegistration; -use futures::future::Abortable; use futures::StreamExt; use opensrv_mysql::AsyncMysqlIntermediary; -use snafu::prelude::*; use tokio; use tokio::net::TcpStream; -use tokio::task::JoinHandle; -use tokio_stream::wrappers::TcpListenerStream; -use crate::error::{self, Result}; +use crate::error::Result; use crate::mysql::handler::MysqlInstanceShim; use crate::query_handler::SqlQueryHandlerRef; -use crate::server::Server; +use crate::server::{AbortableStream, BaseTcpServer, Server}; pub struct MysqlServer { - // `abort_handle` and `abort_registration` are used in pairs in shutting down MySQL server. - // They work like sender and receiver for aborting stream. When the server is shutting down, - // calling `abort_handle.abort()` will "notify" `abort_registration` to stop emitting new - // elements in the stream. - abort_handle: AbortHandle, - abort_registration: Option, - - // A handle holding the TCP accepting task. - join_handle: Option>, - + base_server: BaseTcpServer, query_handler: SqlQueryHandlerRef, - io_runtime: Arc, } impl MysqlServer { - /// Creates a new MySQL server with provided [MysqlInstance] and [Runtime]. pub fn create_server( query_handler: SqlQueryHandlerRef, io_runtime: Arc, ) -> Box { - let (abort_handle, registration) = AbortHandle::new_pair(); Box::new(MysqlServer { - abort_handle, - abort_registration: Some(registration), - join_handle: None, + base_server: BaseTcpServer::create_server("MySQL", io_runtime), query_handler, - io_runtime, }) } - async fn bind(addr: SocketAddr) -> Result<(TcpListenerStream, SocketAddr)> { - let listener = tokio::net::TcpListener::bind(addr) - .await - .context(error::TokioIoSnafu { - err_msg: format!("Failed to bind addr {}", addr), - })?; - // get actually bond addr in case input addr use port 0 - let addr = listener.local_addr()?; - info!("MySQL server is bound to {}", addr); - Ok((TcpListenerStream::new(listener), addr)) - } - - fn accept(&self, accepting_stream: Abortable) -> impl Future { - let io_runtime = self.io_runtime.clone(); + fn accept( + &self, + io_runtime: Arc, + stream: AbortableStream, + ) -> impl Future { let query_handler = self.query_handler.clone(); - accepting_stream.for_each(move |tcp_stream| { + stream.for_each(move |tcp_stream| { let io_runtime = io_runtime.clone(); let query_handler = query_handler.clone(); async move { @@ -99,40 +69,15 @@ impl MysqlServer { #[async_trait] impl Server for MysqlServer { async fn shutdown(&mut self) -> Result<()> { - match self.join_handle.take() { - Some(join_handle) => { - self.abort_handle.abort(); - - if let Err(error) = join_handle.await { - // Couldn't use `error!(e; xxx)` as JoinError doesn't implement ErrorExt. - error!( - "Unexpected error during shutdown MySQL server, error: {}", - error - ); - } else { - info!("MySQL server is shutdown.") - } - Ok(()) - } - None => error::InternalSnafu { - err_msg: "MySQL server is not started.", - } - .fail()?, - } + self.base_server.shutdown().await } async fn start(&mut self, listening: SocketAddr) -> Result { - match self.abort_registration.take() { - Some(registration) => { - let (stream, listener) = Self::bind(listening).await?; - let stream = Abortable::new(stream, registration); - self.join_handle = Some(tokio::spawn(self.accept(stream))); - Ok(listener) - } - None => error::InternalSnafu { - err_msg: "MySQL server has been started.", - } - .fail()?, - } + let (stream, addr) = self.base_server.bind(listening).await?; + + let io_runtime = self.base_server.io_runtime(); + let join_handle = tokio::spawn(self.accept(io_runtime, stream)); + self.base_server.start_with(join_handle)?; + Ok(addr) } } diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs new file mode 100644 index 0000000000..aaf04b0bd2 --- /dev/null +++ b/src/servers/src/opentsdb.rs @@ -0,0 +1,102 @@ +pub mod codec; +pub mod connection; +mod handler; + +use std::future::Future; +use std::net::SocketAddr; +use std::sync::Arc; + +use async_trait::async_trait; +use common_runtime::Runtime; +use common_telemetry::logging::error; +use futures::StreamExt; +use tokio::sync::broadcast; + +use crate::error::Result; +use crate::opentsdb::connection::Connection; +use crate::opentsdb::handler::Handler; +use crate::query_handler::OpentsdbProtocolHandlerRef; +use crate::server::{AbortableStream, BaseTcpServer, Server}; +use crate::shutdown::Shutdown; + +pub struct OpentsdbServer { + base_server: BaseTcpServer, + query_handler: OpentsdbProtocolHandlerRef, + + /// Broadcasts a shutdown signal to all active connections. + /// + /// When a connection task is spawned, it is passed a broadcast receiver handle. We can send + /// a `()` value via `notify_shutdown` or just drop `notify_shutdown`, then each active + /// connection receives it, reaches a safe terminal state, and completes the task. + notify_shutdown: Option>, +} + +impl OpentsdbServer { + pub fn create_server( + query_handler: OpentsdbProtocolHandlerRef, + io_runtime: Arc, + ) -> Box { + // When the provided `shutdown` future completes, we must send a shutdown + // message to all active connections. We use a broadcast channel for this + // purpose. The call below ignores the receiver of the broadcast pair, and when + // a receiver is needed, the subscribe() method on the sender is used to create + // one. + let (notify_shutdown, _) = broadcast::channel(1); + + Box::new(OpentsdbServer { + base_server: BaseTcpServer::create_server("OpenTSDB", io_runtime), + query_handler, + notify_shutdown: Some(notify_shutdown), + }) + } + + fn accept( + &self, + io_runtime: Arc, + stream: AbortableStream, + ) -> impl Future { + let query_handler = self.query_handler.clone(); + let notify_shutdown = self + .notify_shutdown + .clone() + .expect("`notify_shutdown` must be present when accepting connection!"); + stream.for_each(move |stream| { + let io_runtime = io_runtime.clone(); + let query_handler = query_handler.clone(); + let shutdown = Shutdown::new(notify_shutdown.subscribe()); + async move { + match stream { + Ok(stream) => { + let connection = Connection::new(stream); + let mut handler = Handler::new(query_handler, connection, shutdown); + + let _ = io_runtime.spawn(async move { + if let Err(e) = handler.run().await { + error!(e; "Unexpected error when handling OpenTSDB connection"); + } + }); + } + Err(error) => error!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt. + }; + } + }) + } +} + +#[async_trait] +impl Server for OpentsdbServer { + async fn shutdown(&mut self) -> Result<()> { + self.base_server.shutdown().await?; + drop(self.notify_shutdown.take()); + Ok(()) + } + + async fn start(&mut self, listening: SocketAddr) -> Result { + let (stream, addr) = self.base_server.bind(listening).await?; + + let io_runtime = self.base_server.io_runtime(); + let join_handle = tokio::spawn(self.accept(io_runtime, stream)); + self.base_server.start_with(join_handle)?; + Ok(addr) + } +} diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs new file mode 100644 index 0000000000..41cd849c6e --- /dev/null +++ b/src/servers/src/opentsdb/codec.rs @@ -0,0 +1,280 @@ +use api::v1::codec::InsertBatch; +use api::v1::{column, insert_expr, Column, InsertExpr}; + +use crate::error::{self, Result}; + +pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "timestamp"; +pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "value"; + +#[derive(Debug)] +pub struct DataPoint { + metric: String, + ts_millis: i64, + value: f64, + tags: Vec<(String, String)>, +} + +impl DataPoint { + pub fn new(metric: String, ts_millis: i64, value: f64, tags: Vec<(String, String)>) -> Self { + Self { + metric, + ts_millis, + value, + tags, + } + } + + pub fn try_create(line: &str) -> Result { + let tokens = line.split_whitespace().collect::>(); + let cmd = if tokens.is_empty() { "" } else { tokens[0] }; + // OpenTSDB command is case sensitive, verified in real OpenTSDB. + if cmd != "put" { + return error::InvalidQuerySnafu { + reason: format!("unknown command {}.", cmd), + } + .fail(); + } + if tokens.len() < 4 { + return error::InvalidQuerySnafu { + reason: format!( + "put: illegal argument: not enough arguments (need least 4, got {})", + tokens.len() + ), + } + .fail(); + } + + let metric = tokens[1]; + + let ts_millis = match tokens[2].parse::() { + Ok(t) => Self::timestamp_to_millis(t), + Err(_) => { + return error::InvalidQuerySnafu { + reason: format!("put: invalid timestamp: {}", tokens[2]), + } + .fail() + } + }; + + let value = match tokens[3].parse::() { + Ok(v) => v, + Err(_) => { + return error::InvalidQuerySnafu { + reason: format!("put: invalid value: {}", tokens[3]), + } + .fail() + } + }; + + let mut tags = Vec::with_capacity(tokens.len() - 4); + for token in tokens.iter().skip(4) { + let tag = token.split('=').collect::>(); + if tag.len() != 2 || tag[0].is_empty() || tag[1].is_empty() { + return error::InvalidQuerySnafu { + reason: format!("put: invalid tag: {}", token), + } + .fail(); + } + let tagk = tag[0].to_string(); + let tagv = tag[1].to_string(); + if tags.iter().any(|(t, _)| t == &tagk) { + return error::InvalidQuerySnafu { + reason: format!("put: illegal argument: duplicate tag: {}", tagk), + } + .fail(); + } + tags.push((tagk, tagv)); + } + + Ok(DataPoint { + metric: metric.to_string(), + ts_millis, + value, + tags, + }) + } + + pub fn metric(&self) -> &str { + &self.metric + } + + pub fn tags(&self) -> &Vec<(String, String)> { + &self.tags + } + + pub fn ts_millis(&self) -> i64 { + self.ts_millis + } + + pub fn value(&self) -> f64 { + self.value + } + + pub fn as_grpc_insert(&self) -> InsertExpr { + let mut columns = Vec::with_capacity(2 + self.tags.len()); + + let ts_column = Column { + column_name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(), + values: Some(column::Values { + ts_millis_values: vec![self.ts_millis], + ..Default::default() + }), + ..Default::default() + }; + columns.push(ts_column); + + let value_column = Column { + column_name: OPENTSDB_VALUE_COLUMN_NAME.to_string(), + values: Some(column::Values { + f64_values: vec![self.value], + ..Default::default() + }), + ..Default::default() + }; + columns.push(value_column); + + for (tagk, tagv) in self.tags.iter() { + columns.push(Column { + column_name: tagk.to_string(), + values: Some(column::Values { + string_values: vec![tagv.to_string()], + ..Default::default() + }), + ..Default::default() + }); + } + + let batch = InsertBatch { + columns, + row_count: 1, + }; + InsertExpr { + table_name: self.metric.clone(), + expr: Some(insert_expr::Expr::Values(insert_expr::Values { + values: vec![batch.into()], + })), + } + } + + pub fn timestamp_to_millis(t: i64) -> i64 { + // 9999999999999 (13 digits) is of date "Sat Nov 20 2286 17:46:39 UTC", + // 999999999999 (12 digits) is "Sun Sep 09 2001 01:46:39 UTC", + // so timestamp digits less than 13 means we got seconds here. + // (We are not expecting to store data that is 21 years ago, are we?) + if t.abs().to_string().len() < 13 { + t * 1000 + } else { + t + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_try_create() { + fn test_illegal_line(line: &str, expected_err: &str) { + let result = DataPoint::try_create(line); + match result.unwrap_err() { + error::Error::InvalidQuery { reason, .. } => { + assert_eq!(reason, expected_err) + } + _ => unreachable!(), + } + } + + test_illegal_line("no_put", "unknown command no_put."); + test_illegal_line( + "put", + "put: illegal argument: not enough arguments (need least 4, got 1)", + ); + test_illegal_line( + "put metric.foo notatime 42 host=web01", + "put: invalid timestamp: notatime", + ); + test_illegal_line( + "put metric.foo 1000 notavalue host=web01", + "put: invalid value: notavalue", + ); + test_illegal_line("put metric.foo 1000 42 host=", "put: invalid tag: host="); + test_illegal_line( + "put metric.foo 1000 42 host=web01 host=web02", + "put: illegal argument: duplicate tag: host", + ); + + let data_point = DataPoint::try_create( + "put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0", + ) + .unwrap(); + assert_eq!(data_point.metric, "sys.if.bytes.out"); + assert_eq!(data_point.ts_millis, 1479496100000); + assert_eq!(data_point.value, 1.3e3); + assert_eq!( + data_point.tags, + vec![ + ("host".to_string(), "web01".to_string()), + ("interface".to_string(), "eth0".to_string()) + ] + ); + + let data_point = + DataPoint::try_create("put sys.procs.running 1479496100 42 host=web01").unwrap(); + assert_eq!(data_point.metric, "sys.procs.running"); + assert_eq!(data_point.ts_millis, 1479496100000); + assert_eq!(data_point.value, 42f64); + assert_eq!( + data_point.tags, + vec![("host".to_string(), "web01".to_string())] + ); + } + + #[test] + fn test_as_grpc_insert() { + let data_point = DataPoint { + metric: "my_metric_1".to_string(), + ts_millis: 1000, + value: 1.0, + tags: vec![ + ("tagk1".to_string(), "tagv1".to_string()), + ("tagk2".to_string(), "tagv2".to_string()), + ], + }; + + let grpc_insert = data_point.as_grpc_insert(); + assert_eq!(grpc_insert.table_name, "my_metric_1"); + + match grpc_insert.expr { + Some(insert_expr::Expr::Values(insert_expr::Values { values })) => { + assert_eq!(values.len(), 1); + let insert_batch = InsertBatch::try_from(values[0].as_slice()).unwrap(); + assert_eq!(insert_batch.row_count, 1); + let columns = insert_batch.columns; + assert_eq!(columns.len(), 4); + + assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME); + assert_eq!( + columns[0].values.as_ref().unwrap().ts_millis_values, + vec![1000] + ); + + assert_eq!(columns[1].column_name, OPENTSDB_VALUE_COLUMN_NAME); + assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]); + + assert_eq!(columns[2].column_name, "tagk1"); + assert_eq!( + columns[2].values.as_ref().unwrap().string_values, + vec!["tagv1"] + ); + + assert_eq!(columns[3].column_name, "tagk2"); + assert_eq!( + columns[3].values.as_ref().unwrap().string_values, + vec!["tagv2"] + ); + } + _ => unreachable!(), + } + } +} diff --git a/src/servers/src/opentsdb/connection.rs b/src/servers/src/opentsdb/connection.rs new file mode 100644 index 0000000000..3ef2b099e0 --- /dev/null +++ b/src/servers/src/opentsdb/connection.rs @@ -0,0 +1,190 @@ +//! Modified from Tokio's mini-redis example. + +use bytes::{Buf, BytesMut}; +use snafu::ResultExt; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter}; + +use crate::error::{self, Result}; + +type Line = String; + +#[derive(Debug)] +pub struct Connection { + stream: BufWriter, + buffer: BytesMut, +} + +impl Connection { + pub fn new(stream: S) -> Connection { + Connection { + stream: BufWriter::new(stream), + buffer: BytesMut::with_capacity(4 * 1024), + } + } + + /// Read one line from the underlying stream. + /// + /// The function waits until it has retrieved enough data to parse a line (terminated by \r\n). + /// Any data remaining in the read buffer after the line has been parsed is kept there for the + /// next call to `read_line`. + /// + /// # Returns + /// + /// On success, the received line is returned. If the stream is closed in a way that + /// doesn't break a line in half, it returns `None`. Otherwise, an error is returned. + pub async fn read_line(&mut self) -> Result> { + loop { + // Attempt to parse a line from the buffered data. If enough data + // has been buffered, the line is returned. + if let Some(line) = self.parse_line()? { + return Ok(Some(line)); + } + + // There is not enough buffered data as a line. Attempt to read more from the socket. + // On success, the number of bytes is returned. `0` indicates "end of stream". + if self.stream.read_buf(&mut self.buffer).await? == 0 { + // The remote closed the connection. For this to be a clean shutdown, there should + // be no data in the read buffer. If there is, this means that the peer closed the + // socket while sending a line. + if self.buffer.is_empty() { + return Ok(None); + } else { + return error::ConnResetByPeerSnafu {}.fail(); + } + } + } + } + + /// Tries to parse a line from the buffer. + /// + /// If the buffer contains enough data, the line is returned and the buffered data is removed. + /// If not enough data has been buffered yet, `Ok(None)` is returned. + /// If the buffered data does not represent a valid UTF8 line, `Err` is returned. + fn parse_line(&mut self) -> Result> { + if self.buffer.is_empty() { + return Ok(None); + } + + let buf = &self.buffer[..]; + if let Some(pos) = buf.windows(2).position(|w| w == [b'\r', b'\n']) { + let line = buf[0..pos].to_vec(); + + self.buffer.advance(pos + 2); + + Ok(Some( + String::from_utf8(line).context(error::InvalidOpentsdbLineSnafu)?, + )) + } else { + // There is not enough data present in the read buffer to parse a single line. We must + // wait for more data to be received from the socket. + Ok(None) + } + } + + pub async fn write_line(&mut self, line: String) -> Result<()> { + self.stream + .write_all(line.as_bytes()) + .await + .context(error::InternalIoSnafu)?; + self.stream + .write(b"\r\n") + .await + .context(error::InternalIoSnafu)?; + self.stream.flush().await.context(error::InternalIoSnafu)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::io::Write; + + use bytes::BufMut; + use tokio_test::io::Builder; + + use super::*; + + #[tokio::test] + async fn test_read_line() { + let mock = Builder::new() + .read(b"This is") + .read(b" a line.\r\n") + .build(); + let mut conn = Connection::new(mock); + let line = conn.read_line().await.unwrap(); + assert_eq!(line, Some("This is a line.".to_string())); + + let line = conn.read_line().await.unwrap(); + assert_eq!(line, None); + + let buffer = &mut conn.buffer; + buffer + .writer() + .write_all(b"simulating buffer has remaining data") + .unwrap(); + let result = conn.read_line().await; + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Connection reset by peer")); + } + + #[test] + fn test_parse_line() { + let mock = Builder::new().build(); + let mut conn = Connection::new(mock); + + // initially, no data in the buffer, so return None + let line = conn.parse_line(); + assert_matches!(line, Ok(None)); + + // still has no line, but we have data in the buffer + { + let buffer = &mut conn.buffer; + buffer.writer().write_all(b"This is a ").unwrap(); + let line = conn.parse_line(); + assert_matches!(line, Ok(None)); + } + let buffer = &conn.buffer[..]; + assert_eq!(String::from_utf8(buffer.to_vec()).unwrap(), "This is a "); + + // finally gets a line, and the buffer has the remaining data + { + let buffer = &mut conn.buffer; + buffer + .writer() + .write_all(b"line.\r\n another line's remaining data") + .unwrap(); + let line = conn.parse_line().unwrap(); + assert_eq!(line, Some("This is a line.".to_string())); + } + let buffer = &conn.buffer[..]; + assert_eq!( + String::from_utf8(buffer.to_vec()).unwrap(), + " another line's remaining data" + ); + + // expected failed on not valid utf-8 line + let buffer = &mut conn.buffer; + buffer.writer().write_all(b"Hello Wor\xffld.\r\n").unwrap(); + let result = conn.parse_line(); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Invalid OpenTSDB line, source: invalid utf-8 sequence")); + } + + #[tokio::test] + async fn test_write_err() { + let mock = Builder::new() + .write(b"An OpenTSDB error.") + .write(b"\r\n") + .build(); + let mut conn = Connection::new(mock); + let result = conn.write_line("An OpenTSDB error.".to_string()).await; + assert!(result.is_ok()); + } +} diff --git a/src/servers/src/opentsdb/handler.rs b/src/servers/src/opentsdb/handler.rs new file mode 100644 index 0000000000..280315cac4 --- /dev/null +++ b/src/servers/src/opentsdb/handler.rs @@ -0,0 +1,184 @@ +//! Modified from Tokio's mini-redis example. + +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::error::Result; +use crate::opentsdb::codec::DataPoint; +use crate::opentsdb::connection::Connection; +use crate::query_handler::OpentsdbProtocolHandlerRef; +use crate::shutdown::Shutdown; + +/// Per-connection handler. Reads requests from `connection` and applies the OpenTSDB metric to +/// [OpentsdbLineProtocolHandler]. +pub(crate) struct Handler { + query_handler: OpentsdbProtocolHandlerRef, + + /// The TCP connection decorated with OpenTSDB line protocol encoder / decoder implemented + /// using a buffered `TcpStream`. + /// + /// When TCP listener receives an inbound connection, the `TcpStream` is passed to + /// `Connection::new`, which initializes the associated buffers. The byte level protocol + /// parsing details is encapsulated in `Connection`. + connection: Connection, + + /// Listen for shutdown notifications. + /// + /// A wrapper around the `broadcast::Receiver` paired with the sender in TCP connections + /// listener. The connection handler processes requests from the connection until the peer + /// disconnects **or** a shutdown notification is received from `shutdown`. In the latter case, + /// any in-flight work being processed for the peer is continued until it reaches a safe state, + /// at which point the connection is terminated. (Graceful shutdown.) + shutdown: Shutdown, +} + +impl Handler { + pub(crate) fn new( + query_handler: OpentsdbProtocolHandlerRef, + connection: Connection, + shutdown: Shutdown, + ) -> Self { + Self { + query_handler, + connection, + shutdown, + } + } + + pub(crate) async fn run(&mut self) -> Result<()> { + while !self.shutdown.is_shutdown() { + // While reading a request, also listen for the shutdown signal. + let maybe_line = tokio::select! { + line = self.connection.read_line() => line?, + _ = self.shutdown.recv() => { + // If a shutdown signal is received, return from `run`. + // This will result in the task terminating. + return Ok(()); + } + }; + + // If `None` is returned from `read_line()` then the peer closed the socket. There is + // no further work to do and the task can be terminated. + let line = match maybe_line { + Some(line) => line, + None => return Ok(()), + }; + + // Close connection upon receiving "quit" line. With actual OpenTSDB, telnet just won't + // quit, the connection to OpenTSDB server can be closed only via terminating telnet + // session manually, for example, close the terminal window. That is a little annoying, + // so I added "quit" command to the line protocol, to make telnet client able to quit + // gracefully. + if line.trim().eq_ignore_ascii_case("quit") { + return Ok(()); + } + + match DataPoint::try_create(&line) { + Ok(data_point) => { + let result = self.query_handler.exec(&data_point).await; + if let Err(e) = result { + self.connection.write_line(e.to_string()).await?; + } + } + Err(e) => { + self.connection.write_line(e.to_string()).await?; + } + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + use std::sync::Arc; + + use async_trait::async_trait; + use tokio::net::{TcpListener, TcpStream}; + use tokio::sync::{broadcast, mpsc}; + + use super::*; + use crate::error; + use crate::query_handler::OpentsdbProtocolHandler; + + struct DummyQueryHandler { + tx: mpsc::Sender, + } + + #[async_trait] + impl OpentsdbProtocolHandler for DummyQueryHandler { + async fn exec(&self, data_point: &DataPoint) -> Result<()> { + let metric = data_point.metric(); + if metric == "should_failed" { + return error::InternalSnafu { + err_msg: "expected", + } + .fail(); + } + self.tx.send(metric.to_string()).await.unwrap(); + Ok(()) + } + } + + #[tokio::test] + async fn test_run() { + let (tx, mut rx) = mpsc::channel(100); + + let query_handler = Arc::new(DummyQueryHandler { tx }); + let (notify_shutdown, _) = broadcast::channel(1); + let addr = start_server(query_handler, notify_shutdown).await; + + let stream = TcpStream::connect(addr).await.unwrap(); + let mut client = Connection::new(stream); + + client + .write_line("put my_metric_1 1000 1.0 host=web01".to_string()) + .await + .unwrap(); + assert_eq!(rx.recv().await.unwrap(), "my_metric_1"); + + client + .write_line("put my_metric_2 1000 1.0 host=web01".to_string()) + .await + .unwrap(); + assert_eq!(rx.recv().await.unwrap(), "my_metric_2"); + + client + .write_line("put should_failed 1000 1.0 host=web01".to_string()) + .await + .unwrap(); + let resp = client.read_line().await.unwrap(); + assert_eq!(resp, Some("Internal error: expected".to_string())); + + client.write_line("get".to_string()).await.unwrap(); + let resp = client.read_line().await.unwrap(); + assert_eq!( + resp, + Some("Invalid query: unknown command get.".to_string()) + ); + } + + async fn start_server( + query_handler: OpentsdbProtocolHandlerRef, + notify_shutdown: broadcast::Sender<()>, + ) -> SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + loop { + let (stream, _) = listener.accept().await.unwrap(); + + let query_handler = query_handler.clone(); + let connection = Connection::new(stream); + let shutdown = Shutdown::new(notify_shutdown.subscribe()); + tokio::spawn(async move { + Handler::new(query_handler, connection, shutdown) + .run() + .await + }); + } + }); + addr + } +} diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index f34e60d9de..43aec90e4b 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -4,68 +4,40 @@ use std::sync::Arc; use async_trait::async_trait; use common_runtime::Runtime; -use common_telemetry::logging::{error, info}; -use futures::future::AbortHandle; -use futures::future::AbortRegistration; -use futures::future::Abortable; +use common_telemetry::logging::error; use futures::StreamExt; use pgwire::api::auth::noop::NoopStartupHandler; use pgwire::tokio::process_socket; -use snafu::prelude::*; use tokio; -use tokio::task::JoinHandle; -use tokio_stream::wrappers::TcpListenerStream; -use crate::error::{self, Result}; +use crate::error::Result; use crate::postgres::handler::PostgresServerHandler; use crate::query_handler::SqlQueryHandlerRef; -use crate::server::Server; +use crate::server::{AbortableStream, BaseTcpServer, Server}; pub struct PostgresServer { - // See MySQL module for usage of these types - abort_handle: AbortHandle, - abort_registration: Option, - - // A handle holding the TCP accepting task. - join_handle: Option>, - + base_server: BaseTcpServer, auth_handler: Arc, query_handler: Arc, - io_runtime: Arc, } impl PostgresServer { /// Creates a new Postgres server with provided query_handler and async runtime pub fn new(query_handler: SqlQueryHandlerRef, io_runtime: Arc) -> PostgresServer { - let (abort_handle, registration) = AbortHandle::new_pair(); let postgres_handler = Arc::new(PostgresServerHandler::new(query_handler)); let startup_handler = Arc::new(NoopStartupHandler); PostgresServer { - abort_handle, - abort_registration: Some(registration), - join_handle: None, - + base_server: BaseTcpServer::create_server("Postgres", io_runtime), auth_handler: startup_handler, query_handler: postgres_handler, - - io_runtime, } } - async fn bind(addr: SocketAddr) -> Result<(TcpListenerStream, SocketAddr)> { - let listener = tokio::net::TcpListener::bind(addr) - .await - .context(error::TokioIoSnafu { - err_msg: format!("Failed to bind addr {}", addr), - })?; - // get actually bond addr in case input addr use port 0 - let addr = listener.local_addr()?; - info!("Postgres server is bound to {}", addr); - Ok((TcpListenerStream::new(listener), addr)) - } - - fn accept(&self, accepting_stream: Abortable) -> impl Future { - let io_runtime = self.io_runtime.clone(); + fn accept( + &self, + io_runtime: Arc, + accepting_stream: AbortableStream, + ) -> impl Future { let auth_handler = self.auth_handler.clone(); let query_handler = self.query_handler.clone(); @@ -97,40 +69,15 @@ impl PostgresServer { #[async_trait] impl Server for PostgresServer { async fn shutdown(&mut self) -> Result<()> { - match self.join_handle.take() { - Some(join_handle) => { - self.abort_handle.abort(); - - if let Err(error) = join_handle.await { - // Couldn't use `error!(e; xxx)` as JoinError doesn't implement ErrorExt. - error!( - "Unexpected error during shutdown Postgres server, error: {}", - error - ); - } else { - info!("Postgres server is shutdown.") - } - Ok(()) - } - None => error::InternalSnafu { - err_msg: "Postgres server is not started.", - } - .fail()?, - } + self.base_server.shutdown().await } async fn start(&mut self, listening: SocketAddr) -> Result { - match self.abort_registration.take() { - Some(registration) => { - let (stream, listener) = Self::bind(listening).await?; - let stream = Abortable::new(stream, registration); - self.join_handle = Some(tokio::spawn(self.accept(stream))); - Ok(listener) - } - None => error::InternalSnafu { - err_msg: "Postgres server has been started.", - } - .fail()?, - } + let (stream, addr) = self.base_server.bind(listening).await?; + + let io_runtime = self.base_server.io_runtime(); + let join_handle = tokio::spawn(self.accept(io_runtime, stream)); + self.base_server.start_with(join_handle)?; + Ok(addr) } } diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 1d904846c9..74400a46b3 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -5,6 +5,7 @@ use async_trait::async_trait; use common_query::Output; use crate::error::Result; +use crate::opentsdb::codec::DataPoint; /// All query handler traits for various request protocols, like SQL or GRPC. /// Instance that wishes to support certain request protocol, just implement the corresponding @@ -19,6 +20,7 @@ use crate::error::Result; pub type SqlQueryHandlerRef = Arc; pub type GrpcQueryHandlerRef = Arc; pub type GrpcAdminHandlerRef = Arc; +pub type OpentsdbProtocolHandlerRef = Arc; #[async_trait] pub trait SqlQueryHandler { @@ -36,3 +38,10 @@ pub trait GrpcQueryHandler { pub trait GrpcAdminHandler { async fn exec_admin_request(&self, expr: AdminExpr) -> Result; } + +#[async_trait] +pub trait OpentsdbProtocolHandler { + /// A successful request will not return a response. + /// Only on error will the socket return a line of data. + async fn exec(&self, data_point: &DataPoint) -> Result<()>; +} diff --git a/src/servers/src/server.rs b/src/servers/src/server.rs index 77ffd615d7..1075cc6d2f 100644 --- a/src/servers/src/server.rs +++ b/src/servers/src/server.rs @@ -1,11 +1,115 @@ use std::net::SocketAddr; +use std::sync::Arc; use async_trait::async_trait; +use common_runtime::Runtime; +use common_telemetry::logging::{error, info}; +use futures::future::AbortRegistration; +use futures::future::{AbortHandle, Abortable}; +use snafu::ResultExt; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::TcpListenerStream; -use crate::error::Result; +use crate::error::{self, Result}; + +pub(crate) type AbortableStream = Abortable; #[async_trait] pub trait Server: Send { async fn shutdown(&mut self) -> Result<()>; async fn start(&mut self, listening: SocketAddr) -> Result; } + +pub(crate) struct BaseTcpServer { + name: String, + + // `abort_handle` and `abort_registration` are used in pairs in shutting down the server. + // They work like sender and receiver for aborting stream. When the server is shutting down, + // calling `abort_handle.abort()` will "notify" `abort_registration` to stop emitting new + // elements in the stream. + abort_handle: AbortHandle, + abort_registration: Option, + + // A handle holding the TCP accepting task. + join_handle: Option>, + + io_runtime: Arc, +} + +impl BaseTcpServer { + pub(crate) fn create_server(name: impl Into, io_runtime: Arc) -> Self { + let (abort_handle, registration) = AbortHandle::new_pair(); + Self { + name: name.into(), + abort_handle, + abort_registration: Some(registration), + join_handle: None, + io_runtime, + } + } + + pub(crate) async fn shutdown(&mut self) -> Result<()> { + match self.join_handle.take() { + Some(join_handle) => { + self.abort_handle.abort(); + + if let Err(error) = join_handle.await { + // Couldn't use `error!(e; xxx)` because JoinError doesn't implement ErrorExt. + error!( + "Unexpected error during shutdown {} server, error: {}", + &self.name, error + ); + } else { + info!("{} server is shutdown.", &self.name); + } + Ok(()) + } + None => error::InternalSnafu { + err_msg: format!("{} server is not started.", &self.name), + } + .fail()?, + } + } + + pub(crate) async fn bind( + &mut self, + addr: SocketAddr, + ) -> Result<(Abortable, SocketAddr)> { + match self.abort_registration.take() { + Some(registration) => { + let listener = + tokio::net::TcpListener::bind(addr) + .await + .context(error::TokioIoSnafu { + err_msg: format!("Failed to bind addr {}", addr), + })?; + // get actually bond addr in case input addr use port 0 + let addr = listener.local_addr()?; + info!("{} server started at {}", &self.name, addr); + + let stream = TcpListenerStream::new(listener); + let stream = Abortable::new(stream, registration); + Ok((stream, addr)) + } + None => error::InternalSnafu { + err_msg: format!("{} server has been started.", &self.name), + } + .fail()?, + } + } + + pub(crate) fn start_with(&mut self, join_handle: JoinHandle<()>) -> Result<()> { + if self.join_handle.is_some() { + return error::InternalSnafu { + err_msg: format!("{} server has been started.", &self.name), + } + .fail(); + } + let _ = self.join_handle.insert(join_handle); + Ok(()) + } + + pub(crate) fn io_runtime(&self) -> Arc { + self.io_runtime.clone() + } +} diff --git a/src/servers/src/shutdown.rs b/src/servers/src/shutdown.rs new file mode 100644 index 0000000000..6ae90c7de9 --- /dev/null +++ b/src/servers/src/shutdown.rs @@ -0,0 +1,51 @@ +//! Copied from tokio's mini-redis example. + +use tokio::sync::broadcast; + +/// Listens for the server shutdown signal. +/// +/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is +/// ever sent. Once a value has been sent via the broadcast channel, the server +/// should shutdown. +/// +/// The `Shutdown` struct listens for the signal and tracks that the signal has +/// been received. Callers may query for whether the shutdown signal has been +/// received or not. +#[derive(Debug)] +pub(crate) struct Shutdown { + /// `true` if the shutdown signal has been received + shutdown: bool, + + /// The receive half of the channel used to listen for shutdown. + notify: broadcast::Receiver<()>, +} + +impl Shutdown { + /// Create a new `Shutdown` backed by the given `broadcast::Receiver`. + pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown { + Shutdown { + shutdown: false, + notify, + } + } + + /// Returns `true` if the shutdown signal has been received. + pub(crate) fn is_shutdown(&self) -> bool { + self.shutdown + } + + /// Receive the shutdown notice, waiting if necessary. + pub(crate) async fn recv(&mut self) { + // If the shutdown signal has already been received, then return + // immediately. + if self.shutdown { + return; + } + + // Cannot receive a "lag error" as only one value is ever sent. + let _ = self.notify.recv().await; + + // Remember that the signal has been received. + self.shutdown = true; + } +} diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 92d37d8ac5..7177c9a71d 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; -use axum::extract::{Json, Query}; -use axum::Extension; +use axum::extract::{Json, Query, State}; use common_telemetry::metric; use metrics::counter; use servers::http::handler as http_handler; @@ -14,9 +13,7 @@ use crate::create_testing_sql_query_handler; #[tokio::test] async fn test_sql_not_provided() { let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); - let extension = Extension(query_handler); - - let json = http_handler::sql(extension, Query(HashMap::default())).await; + let json = http_handler::sql(State(query_handler), Query(HashMap::default())).await; match json { HttpResponse::Json(json) => { assert!(!json.success()); @@ -36,9 +33,8 @@ async fn test_sql_output_rows() { let query = create_query(); let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); - let extension = Extension(query_handler); - let json = http_handler::sql(extension, query).await; + let json = http_handler::sql(State(query_handler), query).await; match json { HttpResponse::Json(json) => { assert!(json.success(), "{:?}", json); @@ -61,10 +57,7 @@ async fn test_metrics() { counter!("test_metrics", 1); let query = create_query(); - let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); - let extension = Extension(query_handler); - - let text = http_handler::metrics(extension, query).await; + let text = http_handler::metrics(query).await; match text { HttpResponse::Text(s) => assert!(s.contains("test_metrics counter")), _ => unreachable!(), @@ -77,9 +70,8 @@ async fn test_scripts() { let exec = create_script_payload(); let query_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); - let extension = Extension(query_handler); - let json = http_handler::scripts(extension, exec).await; + let json = http_handler::scripts(State(query_handler), exec).await; match json { HttpResponse::Json(json) => { assert!(json.success(), "{:?}", json); diff --git a/src/servers/tests/http/mod.rs b/src/servers/tests/http/mod.rs index 5d1b718df1..5e8292dceb 100644 --- a/src/servers/tests/http/mod.rs +++ b/src/servers/tests/http/mod.rs @@ -1 +1,2 @@ mod http_handler_test; +mod opentsdb_test; diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs new file mode 100644 index 0000000000..f0066df5cd --- /dev/null +++ b/src/servers/tests/http/opentsdb_test.rs @@ -0,0 +1,195 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use axum::Router; +use axum_test_helper::TestClient; +use common_query::Output; +use servers::error::{self, Result}; +use servers::http::HttpServer; +use servers::opentsdb::codec::DataPoint; +use servers::query_handler::{OpentsdbProtocolHandler, SqlQueryHandler}; +use tokio::sync::mpsc; + +struct DummyInstance { + tx: mpsc::Sender, +} + +#[async_trait] +impl OpentsdbProtocolHandler for DummyInstance { + async fn exec(&self, data_point: &DataPoint) -> Result<()> { + if data_point.metric() == "should_failed" { + return error::InternalSnafu { + err_msg: "expected", + } + .fail(); + } + let _ = self.tx.send(data_point.metric().to_string()).await; + Ok(()) + } +} + +#[async_trait] +impl SqlQueryHandler for DummyInstance { + async fn do_query(&self, _query: &str) -> Result { + unimplemented!() + } + + async fn insert_script(&self, _name: &str, _script: &str) -> Result<()> { + unimplemented!() + } + + async fn execute_script(&self, _name: &str) -> Result { + unimplemented!() + } +} + +fn make_test_app(tx: mpsc::Sender) -> Router { + let instance = Arc::new(DummyInstance { tx }); + let mut server = HttpServer::new(instance.clone()); + server.set_opentsdb_handler(instance); + server.make_app() +} + +#[tokio::test] +async fn test_opentsdb_put() { + let (tx, mut rx) = mpsc::channel(100); + + let app = make_test_app(tx); + let client = TestClient::new(app); + + // single data point put + let result = client + .post("/v1/opentsdb/api/put") + .body(create_data_point("m1")) + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // multiple data point put + let result = client + .post("/v1/opentsdb/api/put") + .body(format!( + "[{},{}]", + create_data_point("m2"), + create_data_point("m3") + )) + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // bad data point + let result = client + .post("/v1/opentsdb/api/put") + .body("hello, world") + .send() + .await; + assert_eq!(result.status(), 400); + assert_eq!( + result.text().await, + "{\"error\":\"Invalid OpenTSDB Json request, source: expected value at line 1 column 1\"}" + ); + + // internal server error + let result = client + .post("/v1/opentsdb/api/put") + .body(create_data_point("should_failed")) + .send() + .await; + assert_eq!(result.status(), 500); + assert_eq!( + result.text().await, + "{\"error\":\"Internal error: Internal error: expected\"}" + ); + + let mut metrics = vec![]; + while let Ok(s) = rx.try_recv() { + metrics.push(s); + } + assert_eq!( + metrics, + vec!["m1".to_string(), "m2".to_string(), "m3".to_string()] + ); +} + +#[tokio::test] +async fn test_opentsdb_debug_put() { + let (tx, mut rx) = mpsc::channel(100); + + let app = make_test_app(tx); + let client = TestClient::new(app); + + // single data point summary debug put + let result = client + .post("/v1/opentsdb/api/put?summary") + .body(create_data_point("m11")) + .send() + .await; + assert_eq!(result.status(), 200); + assert_eq!(result.text().await, "{\"success\":1,\"failed\":0}"); + + let result = client + .post("/v1/opentsdb/api/put?summary") + .body(create_data_point("should_failed")) + .send() + .await; + assert_eq!(result.status(), 200); + assert_eq!(result.text().await, "{\"success\":0,\"failed\":1}"); + + let result = client + .post("/v1/opentsdb/api/put?details") + .body(create_data_point("should_failed")) + .send() + .await; + assert_eq!(result.status(), 200); + assert_eq!(result.text().await, "{\"success\":0,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: expected\"}]}"); + + // multiple data point summary debug put + let result = client + .post("/v1/opentsdb/api/put?summary") + .body(format!( + "[{},{}]", + create_data_point("should_failed"), + create_data_point("m22"), + )) + .send() + .await; + assert_eq!(result.status(), 200); + assert_eq!(result.text().await, "{\"success\":1,\"failed\":1}"); + + let result = client + .post("/v1/opentsdb/api/put?details") + .body(format!( + "[{},{}]", + create_data_point("should_failed"), + create_data_point("m33") + )) + .send() + .await; + assert_eq!(result.status(), 200); + assert_eq!(result.text().await, "{\"success\":1,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: expected\"}]}"); + + let mut metrics = vec![]; + while let Ok(s) = rx.try_recv() { + metrics.push(s); + } + assert_eq!( + metrics, + vec!["m11".to_string(), "m22".to_string(), "m33".to_string()] + ); +} + +fn create_data_point(metric: &str) -> String { + format!( + r#"{{ + "metric": "{}", + "timestamp": 1000, + "value": 1, + "tags": {{ + "host": "web01" + }} + }}"#, + metric + ) +} diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 54abbe7935..e907f032b4 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -18,7 +18,7 @@ use script::{ engine::{CompileContext, EvalContext, Script, ScriptEngine}, python::{PyEngine, PyScript}, }; -#[cfg(feature = "postgres")] +mod opentsdb; mod postgres; struct DummyInstance { diff --git a/src/servers/tests/opentsdb.rs b/src/servers/tests/opentsdb.rs new file mode 100644 index 0000000000..8e04cd2640 --- /dev/null +++ b/src/servers/tests/opentsdb.rs @@ -0,0 +1,186 @@ +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use common_runtime::Builder as RuntimeBuilder; +use rand::rngs::StdRng; +use rand::Rng; +use servers::error::{self as server_error, Result}; +use servers::opentsdb::codec::DataPoint; +use servers::opentsdb::connection::Connection; +use servers::opentsdb::OpentsdbServer; +use servers::query_handler::OpentsdbProtocolHandler; +use servers::server::Server; +use tokio::net::TcpStream; +use tokio::sync::mpsc; + +struct DummyOpentsdbInstance { + tx: mpsc::Sender, +} + +#[async_trait] +impl OpentsdbProtocolHandler for DummyOpentsdbInstance { + async fn exec(&self, data_point: &DataPoint) -> Result<()> { + let metric = data_point.metric(); + if metric == "should_failed" { + return server_error::InternalSnafu { + err_msg: "expected", + } + .fail(); + } + let i = metric.parse::().unwrap(); + let _ = self.tx.send(i * i).await; + Ok(()) + } +} + +fn create_opentsdb_server(tx: mpsc::Sender) -> Result> { + let query_handler = Arc::new(DummyOpentsdbInstance { tx }); + let io_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(2) + .thread_name("opentsdb-io-handlers") + .build() + .unwrap(), + ); + Ok(OpentsdbServer::create_server(query_handler, io_runtime)) +} + +#[tokio::test] +async fn test_start_opentsdb_server() -> Result<()> { + let (tx, _) = mpsc::channel(100); + let mut server = create_opentsdb_server(tx)?; + let listening = "127.0.0.1:0".parse::().unwrap(); + let result = server.start(listening).await; + assert!(result.is_ok()); + + let result = server.start(listening).await; + assert!(result + .unwrap_err() + .to_string() + .contains("OpenTSDB server has been started.")); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_shutdown_opentsdb_server() -> Result<()> { + let (tx, _) = mpsc::channel(100); + let mut server = create_opentsdb_server(tx)?; + let result = server.shutdown().await; + assert!(result + .unwrap_err() + .to_string() + .contains("OpenTSDB server is not started.")); + + let listening = "127.0.0.1:0".parse::().unwrap(); + let addr = server.start(listening).await?; + + let mut join_handles = vec![]; + for _ in 0..2 { + join_handles.push(tokio::spawn(async move { + for i in 0..1000 { + let stream = TcpStream::connect(addr).await; + match stream { + Ok(stream) => { + let mut connection = Connection::new(stream); + let result = connection.write_line(format!("put {} 1 1", i)).await; + if let Err(e) = result { + return Err(e.to_string()); + } + } + Err(e) => return Err(e.to_string()), + } + } + Ok(()) + })) + } + + tokio::time::sleep(Duration::from_millis(10)).await; + let result = server.shutdown().await; + assert!(result.is_ok()); + + for handle in join_handles.iter_mut() { + let result = handle.await.unwrap(); + assert!(result.is_err()); + let error = result.unwrap_err(); + assert!(error.contains("Connection refused") || error.contains("Connection reset by peer")); + } + Ok(()) +} + +#[tokio::test] +async fn test_query() -> Result<()> { + let (tx, mut rx) = mpsc::channel(10); + let mut server = create_opentsdb_server(tx)?; + let listening = "127.0.0.1:0".parse::().unwrap(); + let addr = server.start(listening).await?; + + let stream = TcpStream::connect(addr).await.unwrap(); + let mut connection = Connection::new(stream); + connection.write_line("put 100 1 1".to_string()).await?; + assert_eq!(rx.recv().await.unwrap(), 10000); + + connection + .write_line("foo illegal put line".to_string()) + .await + .unwrap(); + let result = connection.read_line().await?; + assert_eq!( + result, + Some("Invalid query: unknown command foo.".to_string()) + ); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_query_concurrently() -> Result<()> { + let threads = 4; + let expect_executed_queries_per_worker = 1000; + let (tx, mut rx) = mpsc::channel(threads * expect_executed_queries_per_worker); + + let mut server = create_opentsdb_server(tx)?; + let listening = "127.0.0.1:0".parse::().unwrap(); + let addr = server.start(listening).await?; + + let mut join_handles = vec![]; + for _ in 0..threads { + join_handles.push(tokio::spawn(async move { + let mut rand: StdRng = rand::SeedableRng::from_entropy(); + + let stream = TcpStream::connect(addr).await.unwrap(); + let mut connection = Connection::new(stream); + for i in 0..expect_executed_queries_per_worker { + connection + .write_line(format!("put {} 1 1", i)) + .await + .unwrap(); + + let should_recreate_conn = rand.gen_range(0..100) == 1; + if should_recreate_conn { + let stream = TcpStream::connect(addr).await.unwrap(); + connection = Connection::new(stream); + } + } + expect_executed_queries_per_worker + })) + } + + let mut total_pending_queries = threads * expect_executed_queries_per_worker; + for handle in join_handles.iter_mut() { + total_pending_queries -= handle.await.unwrap(); + } + assert_eq!(0, total_pending_queries); + + let mut expected_result: i32 = (threads + * (0..expect_executed_queries_per_worker) + .map(|i| i * i) + .sum::()) as i32; + while let Some(i) = rx.recv().await { + expected_result -= i; + if expected_result == 0 { + break; + } + } + Ok(()) +} diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index 5b7100c8b0..ae52ab1673 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -196,12 +196,13 @@ impl ErrorExt for Error { | BuildTableInfo { .. } | BuildRegionDescriptor { .. } | TableExists { .. } - | ColumnExists { .. } | ProjectedColumnNotFound { .. } | MissingTimestampIndex { .. } | UnsupportedDefaultConstraint { .. } | TableNotFound { .. } => StatusCode::InvalidArguments, + ColumnExists { .. } => StatusCode::TableColumnExists, + TableInfoNotFound { .. } => StatusCode::Unexpected, ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable, diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs index d46c63f449..4512e74e85 100644 --- a/src/table-engine/src/table/test_util/mock_engine.rs +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use common_error::mock::MockError; use common_telemetry::logging; use datatypes::prelude::{Value, VectorBuilder, VectorRef}; -use datatypes::schema::ColumnSchema; +use datatypes::schema::{ColumnSchema, Schema}; use storage::metadata::{RegionMetaImpl, RegionMetadata}; use storage::write_batch::{Mutation, WriteBatch}; use store_api::storage::{ @@ -75,14 +75,31 @@ impl Snapshot for MockSnapshot { async fn scan( &self, _ctx: &ReadContext, - _request: ScanRequest, + request: ScanRequest, ) -> Result> { let memtable = { let memtable = self.region.memtable.read().unwrap(); memtable.clone() }; + + let schema = self.schema(); + let projection_schema = if let Some(projection) = request.projection { + let mut columns = Vec::with_capacity(projection.len()); + for idx in projection { + columns.push( + schema + .column_schema_by_name(schema.column_name_by_index(idx)) + .unwrap() + .clone(), + ); + } + Arc::new(Schema::new(columns)) + } else { + schema.clone() + }; + let reader = MockChunkReader { - schema: self.schema().clone(), + schema: projection_schema, memtable, read: false, };