From 8be0f05570ed51fb849fcf6f5e65765196b5e1a5 Mon Sep 17 00:00:00 2001 From: Dongxu Wang Date: Thu, 24 Nov 2022 11:09:21 +0800 Subject: [PATCH] chore: able to config axum timeout in toml (#624) --- Cargo.lock | 13 +++- config/frontend.example.toml | 5 +- config/standalone.example.toml | 5 +- src/cmd/src/frontend.rs | 39 ++++++++++- src/cmd/src/standalone.rs | 23 +++++-- src/datanode/src/tests/http_test.rs | 6 +- src/frontend/src/frontend.rs | 5 +- src/frontend/src/server.rs | 6 +- src/servers/Cargo.toml | 1 + src/servers/src/http.rs | 80 ++++++++++++++++++++++- src/servers/tests/http/influxdb_test.rs | 4 +- src/servers/tests/http/opentsdb_test.rs | 4 +- src/servers/tests/http/prometheus_test.rs | 4 +- 13 files changed, 168 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b2f738a97..4ab981bced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2612,6 +2612,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.20" @@ -5419,6 +5429,7 @@ dependencies = [ "datatypes", "futures", "hex", + "humantime-serde", "hyper", "influxdb_line_protocol", "metrics", @@ -6635,7 +6646,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.4.6", + "rand 0.8.5", "static_assertions", ] diff --git a/config/frontend.example.toml b/config/frontend.example.toml index b23335f51e..a26112ba22 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -1,6 +1,9 @@ mode = 'distributed' datanode_rpc_addr = '127.0.0.1:3001' -http_addr = '127.0.0.1:4000' + +[http_options] +addr = '127.0.0.1:4000' +timeout = "30s" [meta_client_opts] metasrv_addrs = ['127.0.0.1:3002'] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index ab728b403f..54587a6e4d 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -1,9 +1,12 @@ node_id = 0 mode = 'standalone' -http_addr = '127.0.0.1:4000' wal_dir = '/tmp/greptimedb/wal/' enable_memory_catalog = false +[http_options] +addr = '127.0.0.1:4000' +timeout = "30s" + [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index b6b7b8bfad..100f411d30 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -21,6 +21,7 @@ use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; use meta_client::MetaClientOpts; +use servers::http::HttpOptions; use servers::Mode; use snafu::ResultExt; @@ -96,7 +97,10 @@ impl TryFrom for FrontendOptions { }; if let Some(addr) = cmd.http_addr { - opts.http_addr = Some(addr); + opts.http_options = Some(HttpOptions { + addr, + ..Default::default() + }); } if let Some(addr) = cmd.grpc_addr { opts.grpc_options = Some(GrpcOptions { @@ -141,6 +145,8 @@ impl TryFrom for FrontendOptions { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; #[test] @@ -157,7 +163,7 @@ mod tests { }; let opts: FrontendOptions = command.try_into().unwrap(); - assert_eq!(opts.http_addr, Some("127.0.0.1:1234".to_string())); + assert_eq!(opts.http_options.as_ref().unwrap().addr, "127.0.0.1:1234"); assert_eq!(opts.mysql_options.as_ref().unwrap().addr, "127.0.0.1:5678"); assert_eq!( opts.postgres_options.as_ref().unwrap().addr, @@ -188,4 +194,33 @@ mod tests { assert!(!opts.influxdb_options.unwrap().enable); } + + #[test] + fn test_read_from_config_file() { + let command = StartCommand { + http_addr: None, + grpc_addr: None, + mysql_addr: None, + postgres_addr: None, + opentsdb_addr: None, + influxdb_enable: None, + config_file: Some(format!( + "{}/../../config/frontend.example.toml", + std::env::current_dir().unwrap().as_path().to_str().unwrap() + )), + metasrv_addr: None, + }; + + let fe_opts = FrontendOptions::try_from(command).unwrap(); + assert_eq!(Mode::Distributed, fe_opts.mode); + assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr); + assert_eq!( + "127.0.0.1:4000".to_string(), + fe_opts.http_options.as_ref().unwrap().addr + ); + assert_eq!( + Duration::from_secs(30), + fe_opts.http_options.as_ref().unwrap().timeout + ); + } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 917d32dfe0..62fbd0e82d 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -25,6 +25,7 @@ use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; use frontend::prometheus::PrometheusOptions; use serde::{Deserialize, Serialize}; +use servers::http::HttpOptions; use servers::Mode; use snafu::ResultExt; use tokio::try_join; @@ -61,7 +62,7 @@ impl SubCommand { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct StandaloneOptions { - pub http_addr: Option, + pub http_options: Option, pub grpc_options: Option, pub mysql_options: Option, pub postgres_options: Option, @@ -77,7 +78,7 @@ pub struct StandaloneOptions { impl Default for StandaloneOptions { fn default() -> Self { Self { - http_addr: Some("127.0.0.1:4000".to_string()), + http_options: Some(HttpOptions::default()), grpc_options: Some(GrpcOptions::default()), mysql_options: Some(MysqlOptions::default()), postgres_options: Some(PostgresOptions::default()), @@ -95,7 +96,7 @@ impl Default for StandaloneOptions { impl StandaloneOptions { fn frontend_options(self) -> FrontendOptions { FrontendOptions { - http_addr: self.http_addr, + http_options: self.http_options, grpc_options: self.grpc_options, mysql_options: self.mysql_options, postgres_options: self.postgres_options, @@ -206,7 +207,10 @@ impl TryFrom for FrontendOptions { opts.mode = Mode::Standalone; if let Some(addr) = cmd.http_addr { - opts.http_addr = Some(addr); + opts.http_options = Some(HttpOptions { + addr, + ..Default::default() + }); } if let Some(addr) = cmd.rpc_addr { // frontend grpc addr conflict with datanode default grpc addr @@ -256,6 +260,8 @@ impl TryFrom for FrontendOptions { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; #[test] @@ -277,7 +283,14 @@ mod tests { let fe_opts = FrontendOptions::try_from(cmd).unwrap(); assert_eq!(Mode::Standalone, fe_opts.mode); assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr); - assert_eq!(Some("127.0.0.1:4000".to_string()), fe_opts.http_addr); + assert_eq!( + "127.0.0.1:4000".to_string(), + fe_opts.http_options.as_ref().unwrap().addr + ); + assert_eq!( + Duration::from_secs(30), + fe_opts.http_options.as_ref().unwrap().timeout + ); assert_eq!( "127.0.0.1:4001".to_string(), fe_opts.grpc_options.unwrap().addr diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs index 7348fb6430..1dcdf9405a 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/src/tests/http_test.rs @@ -21,7 +21,7 @@ use datatypes::prelude::ConcreteDataType; use frontend::frontend::FrontendOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use serde_json::json; -use servers::http::{ColumnSchema, HttpServer, JsonOutput, JsonResponse, Schema}; +use servers::http::{ColumnSchema, HttpOptions, HttpServer, JsonOutput, JsonResponse, Schema}; use test_util::TestGuard; use crate::instance::{Instance, InstanceRef}; @@ -46,7 +46,7 @@ async fn make_test_app(name: &str) -> (Router, TestGuard) { ) .await .unwrap(); - let http_server = HttpServer::new(instance); + let http_server = HttpServer::new(instance, HttpOptions::default()); (http_server.make_app(), guard) } @@ -63,7 +63,7 @@ async fn make_test_app_with_frontend(name: &str) -> (Router, TestGuard) { .await .unwrap(); frontend.start().await.unwrap(); - let mut http_server = HttpServer::new(Arc::new(frontend)); + let mut http_server = HttpServer::new(Arc::new(frontend), HttpOptions::default()); http_server.set_script_handler(instance.clone()); let app = http_server.make_app(); (app, guard) diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 8a5a538449..521ed6c834 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use meta_client::MetaClientOpts; use serde::{Deserialize, Serialize}; +use servers::http::HttpOptions; use servers::Mode; use snafu::prelude::*; @@ -31,7 +32,7 @@ use crate::server::Services; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct FrontendOptions { - pub http_addr: Option, + pub http_options: Option, pub grpc_options: Option, pub mysql_options: Option, pub postgres_options: Option, @@ -46,7 +47,7 @@ pub struct FrontendOptions { impl Default for FrontendOptions { fn default() -> Self { Self { - http_addr: Some("127.0.0.1:4000".to_string()), + http_options: Some(HttpOptions::default()), grpc_options: Some(GrpcOptions::default()), mysql_options: Some(MysqlOptions::default()), postgres_options: Some(PostgresOptions::default()), diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 9ba9587600..ede8adc8b5 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -116,10 +116,10 @@ impl Services { None }; - let http_server_and_addr = if let Some(http_addr) = &opts.http_addr { - let http_addr = parse_addr(http_addr)?; + let http_server_and_addr = if let Some(http_options) = &opts.http_options { + let http_addr = parse_addr(&http_options.addr)?; - let mut http_server = HttpServer::new(instance.clone()); + let mut http_server = HttpServer::new(instance.clone(), http_options.clone()); if opentsdb_server_and_addr.is_some() { http_server.set_opentsdb_handler(instance.clone()); } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 5a74c223fa..8a4c3f4345 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -24,6 +24,7 @@ datatypes = { path = "../datatypes" } futures = "0.3" hex = { version = "0.4" } hyper = { version = "0.14", features = ["full"] } +humantime-serde = "1.1" influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" } metrics = "0.20" num_cpus = "1.13" diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 1b90286737..3aced9a8cf 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -59,6 +59,7 @@ const HTTP_API_VERSION: &str = "v1"; pub struct HttpServer { sql_handler: SqlQueryHandlerRef, + options: HttpOptions, influxdb_handler: Option, opentsdb_handler: Option, prom_handler: Option, @@ -66,6 +67,22 @@ pub struct HttpServer { shutdown_tx: Mutex>>, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct HttpOptions { + pub addr: String, + #[serde(with = "humantime_serde")] + pub timeout: Duration, +} + +impl Default for HttpOptions { + fn default() -> Self { + Self { + addr: "127.0.0.1:4000".to_string(), + timeout: Duration::from_secs(30), + } + } +} + #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq)] pub struct ColumnSchema { name: String, @@ -271,9 +288,10 @@ pub struct ApiState { } impl HttpServer { - pub fn new(sql_handler: SqlQueryHandlerRef) -> Self { + pub fn new(sql_handler: SqlQueryHandlerRef, options: HttpOptions) -> Self { Self { sql_handler, + options, opentsdb_handler: None, influxdb_handler: None, prom_handler: None, @@ -385,8 +403,7 @@ impl HttpServer { ServiceBuilder::new() .layer(HandleErrorLayer::new(handle_error)) .layer(TraceLayer::new_for_http()) - // TODO(LFC): make timeout configurable - .layer(TimeoutLayer::new(Duration::from_secs(30))) + .layer(TimeoutLayer::new(self.options.timeout)) // custom layer .layer(middleware::from_fn(context::build_ctx)), ) @@ -443,14 +460,71 @@ async fn handle_error(err: BoxError) -> Json { #[cfg(test)] mod test { + use std::future::pending; use std::sync::Arc; + use axum::handler::Handler; + use axum::http::StatusCode; + use axum::routing::get; + use axum_test_helper::TestClient; use common_recordbatch::RecordBatches; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{StringVector, UInt32Vector}; + use tokio::sync::mpsc; use super::*; + use crate::query_handler::SqlQueryHandler; + + struct DummyInstance { + _tx: mpsc::Sender<(String, Vec)>, + } + + #[async_trait] + impl SqlQueryHandler for DummyInstance { + async fn do_query(&self, _query: &str) -> Result { + unimplemented!() + } + } + + fn timeout() -> TimeoutLayer { + TimeoutLayer::new(Duration::from_millis(10)) + } + + async fn forever() { + pending().await + } + + fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { + let instance = Arc::new(DummyInstance { _tx: tx }); + let server = HttpServer::new(instance, HttpOptions::default()); + server.make_app().route( + "/test/timeout", + get(forever.layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(|_: BoxError| async { + StatusCode::REQUEST_TIMEOUT + })) + .layer(timeout()), + )), + ) + } + + #[test] + fn test_http_options_default() { + let default = HttpOptions::default(); + assert_eq!("127.0.0.1:4000".to_string(), default.addr); + assert_eq!(Duration::from_secs(30), default.timeout) + } + + #[tokio::test] + async fn test_http_server_request_timeout() { + let (tx, _rx) = mpsc::channel(100); + let app = make_test_app(tx); + let client = TestClient::new(app); + let res = client.get("/test/timeout").send().await; + assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT); + } #[tokio::test] async fn test_recordbatches_conversion() { diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 24a21716cf..16472d3aeb 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -20,7 +20,7 @@ use axum::Router; use axum_test_helper::TestClient; use common_query::Output; use servers::error::Result; -use servers::http::HttpServer; +use servers::http::{HttpOptions, HttpServer}; use servers::influxdb::InfluxdbRequest; use servers::query_handler::{InfluxdbLineProtocolHandler, SqlQueryHandler}; use tokio::sync::mpsc; @@ -51,7 +51,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender<(String, String)>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone()); + let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); server.set_influxdb_handler(instance); server.make_app() } diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index f66281302e..0ce4465e2a 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -19,7 +19,7 @@ use axum::Router; use axum_test_helper::TestClient; use common_query::Output; use servers::error::{self, Result}; -use servers::http::HttpServer; +use servers::http::{HttpOptions, HttpServer}; use servers::opentsdb::codec::DataPoint; use servers::query_handler::{OpentsdbProtocolHandler, SqlQueryHandler}; use tokio::sync::mpsc; @@ -51,7 +51,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone()); + let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); server.set_opentsdb_handler(instance); server.make_app() } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index a5a3274dc5..ce1cd017a5 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -23,7 +23,7 @@ use axum_test_helper::TestClient; use common_query::Output; use prost::Message; use servers::error::Result; -use servers::http::HttpServer; +use servers::http::{HttpOptions, HttpServer}; use servers::prometheus; use servers::prometheus::{snappy_compress, Metrics}; use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse, SqlQueryHandler}; @@ -76,7 +76,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone()); + let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); server.set_prom_handler(instance); server.make_app() }