chore: able to config axum timeout in toml (#624)

This commit is contained in:
Dongxu Wang
2022-11-24 11:09:21 +08:00
committed by GitHub
parent 69f06eec8b
commit 8be0f05570
13 changed files with 168 additions and 27 deletions

13
Cargo.lock generated
View File

@@ -2612,6 +2612,16 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "humantime-serde"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
dependencies = [
"humantime",
"serde",
]
[[package]]
name = "hyper"
version = "0.14.20"
@@ -5419,6 +5429,7 @@ dependencies = [
"datatypes",
"futures",
"hex",
"humantime-serde",
"hyper",
"influxdb_line_protocol",
"metrics",
@@ -6635,7 +6646,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if",
"rand 0.4.6",
"rand 0.8.5",
"static_assertions",
]

View File

@@ -1,6 +1,9 @@
mode = 'distributed'
datanode_rpc_addr = '127.0.0.1:3001'
http_addr = '127.0.0.1:4000'
[http_options]
addr = '127.0.0.1:4000'
timeout = "30s"
[meta_client_opts]
metasrv_addrs = ['127.0.0.1:3002']

View File

@@ -1,9 +1,12 @@
node_id = 0
mode = 'standalone'
http_addr = '127.0.0.1:4000'
wal_dir = '/tmp/greptimedb/wal/'
enable_memory_catalog = false
[http_options]
addr = '127.0.0.1:4000'
timeout = "30s"
[storage]
type = 'File'
data_dir = '/tmp/greptimedb/data/'

View File

@@ -21,6 +21,7 @@ use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use meta_client::MetaClientOpts;
use servers::http::HttpOptions;
use servers::Mode;
use snafu::ResultExt;
@@ -96,7 +97,10 @@ impl TryFrom<StartCommand> for FrontendOptions {
};
if let Some(addr) = cmd.http_addr {
opts.http_addr = Some(addr);
opts.http_options = Some(HttpOptions {
addr,
..Default::default()
});
}
if let Some(addr) = cmd.grpc_addr {
opts.grpc_options = Some(GrpcOptions {
@@ -141,6 +145,8 @@ impl TryFrom<StartCommand> for FrontendOptions {
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
#[test]
@@ -157,7 +163,7 @@ mod tests {
};
let opts: FrontendOptions = command.try_into().unwrap();
assert_eq!(opts.http_addr, Some("127.0.0.1:1234".to_string()));
assert_eq!(opts.http_options.as_ref().unwrap().addr, "127.0.0.1:1234");
assert_eq!(opts.mysql_options.as_ref().unwrap().addr, "127.0.0.1:5678");
assert_eq!(
opts.postgres_options.as_ref().unwrap().addr,
@@ -188,4 +194,33 @@ mod tests {
assert!(!opts.influxdb_options.unwrap().enable);
}
#[test]
fn test_read_from_config_file() {
let command = StartCommand {
http_addr: None,
grpc_addr: None,
mysql_addr: None,
postgres_addr: None,
opentsdb_addr: None,
influxdb_enable: None,
config_file: Some(format!(
"{}/../../config/frontend.example.toml",
std::env::current_dir().unwrap().as_path().to_str().unwrap()
)),
metasrv_addr: None,
};
let fe_opts = FrontendOptions::try_from(command).unwrap();
assert_eq!(Mode::Distributed, fe_opts.mode);
assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr);
assert_eq!(
"127.0.0.1:4000".to_string(),
fe_opts.http_options.as_ref().unwrap().addr
);
assert_eq!(
Duration::from_secs(30),
fe_opts.http_options.as_ref().unwrap().timeout
);
}
}

View File

@@ -25,6 +25,7 @@ use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prometheus::PrometheusOptions;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::Mode;
use snafu::ResultExt;
use tokio::try_join;
@@ -61,7 +62,7 @@ impl SubCommand {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StandaloneOptions {
pub http_addr: Option<String>,
pub http_options: Option<HttpOptions>,
pub grpc_options: Option<GrpcOptions>,
pub mysql_options: Option<MysqlOptions>,
pub postgres_options: Option<PostgresOptions>,
@@ -77,7 +78,7 @@ pub struct StandaloneOptions {
impl Default for StandaloneOptions {
fn default() -> Self {
Self {
http_addr: Some("127.0.0.1:4000".to_string()),
http_options: Some(HttpOptions::default()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
@@ -95,7 +96,7 @@ impl Default for StandaloneOptions {
impl StandaloneOptions {
fn frontend_options(self) -> FrontendOptions {
FrontendOptions {
http_addr: self.http_addr,
http_options: self.http_options,
grpc_options: self.grpc_options,
mysql_options: self.mysql_options,
postgres_options: self.postgres_options,
@@ -206,7 +207,10 @@ impl TryFrom<StartCommand> for FrontendOptions {
opts.mode = Mode::Standalone;
if let Some(addr) = cmd.http_addr {
opts.http_addr = Some(addr);
opts.http_options = Some(HttpOptions {
addr,
..Default::default()
});
}
if let Some(addr) = cmd.rpc_addr {
// frontend grpc addr conflict with datanode default grpc addr
@@ -256,6 +260,8 @@ impl TryFrom<StartCommand> for FrontendOptions {
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
#[test]
@@ -277,7 +283,14 @@ mod tests {
let fe_opts = FrontendOptions::try_from(cmd).unwrap();
assert_eq!(Mode::Standalone, fe_opts.mode);
assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr);
assert_eq!(Some("127.0.0.1:4000".to_string()), fe_opts.http_addr);
assert_eq!(
"127.0.0.1:4000".to_string(),
fe_opts.http_options.as_ref().unwrap().addr
);
assert_eq!(
Duration::from_secs(30),
fe_opts.http_options.as_ref().unwrap().timeout
);
assert_eq!(
"127.0.0.1:4001".to_string(),
fe_opts.grpc_options.unwrap().addr

View File

@@ -21,7 +21,7 @@ use datatypes::prelude::ConcreteDataType;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use serde_json::json;
use servers::http::{ColumnSchema, HttpServer, JsonOutput, JsonResponse, Schema};
use servers::http::{ColumnSchema, HttpOptions, HttpServer, JsonOutput, JsonResponse, Schema};
use test_util::TestGuard;
use crate::instance::{Instance, InstanceRef};
@@ -46,7 +46,7 @@ async fn make_test_app(name: &str) -> (Router, TestGuard) {
)
.await
.unwrap();
let http_server = HttpServer::new(instance);
let http_server = HttpServer::new(instance, HttpOptions::default());
(http_server.make_app(), guard)
}
@@ -63,7 +63,7 @@ async fn make_test_app_with_frontend(name: &str) -> (Router, TestGuard) {
.await
.unwrap();
frontend.start().await.unwrap();
let mut http_server = HttpServer::new(Arc::new(frontend));
let mut http_server = HttpServer::new(Arc::new(frontend), HttpOptions::default());
http_server.set_script_handler(instance.clone());
let app = http_server.make_app();
(app, guard)

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::Mode;
use snafu::prelude::*;
@@ -31,7 +32,7 @@ use crate::server::Services;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FrontendOptions {
pub http_addr: Option<String>,
pub http_options: Option<HttpOptions>,
pub grpc_options: Option<GrpcOptions>,
pub mysql_options: Option<MysqlOptions>,
pub postgres_options: Option<PostgresOptions>,
@@ -46,7 +47,7 @@ pub struct FrontendOptions {
impl Default for FrontendOptions {
fn default() -> Self {
Self {
http_addr: Some("127.0.0.1:4000".to_string()),
http_options: Some(HttpOptions::default()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),

View File

@@ -116,10 +116,10 @@ impl Services {
None
};
let http_server_and_addr = if let Some(http_addr) = &opts.http_addr {
let http_addr = parse_addr(http_addr)?;
let http_server_and_addr = if let Some(http_options) = &opts.http_options {
let http_addr = parse_addr(&http_options.addr)?;
let mut http_server = HttpServer::new(instance.clone());
let mut http_server = HttpServer::new(instance.clone(), http_options.clone());
if opentsdb_server_and_addr.is_some() {
http_server.set_opentsdb_handler(instance.clone());
}

View File

@@ -24,6 +24,7 @@ datatypes = { path = "../datatypes" }
futures = "0.3"
hex = { version = "0.4" }
hyper = { version = "0.14", features = ["full"] }
humantime-serde = "1.1"
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
metrics = "0.20"
num_cpus = "1.13"

View File

@@ -59,6 +59,7 @@ const HTTP_API_VERSION: &str = "v1";
pub struct HttpServer {
sql_handler: SqlQueryHandlerRef,
options: HttpOptions,
influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>,
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
prom_handler: Option<PrometheusProtocolHandlerRef>,
@@ -66,6 +67,22 @@ pub struct HttpServer {
shutdown_tx: Mutex<Option<Sender<()>>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HttpOptions {
pub addr: String,
#[serde(with = "humantime_serde")]
pub timeout: Duration,
}
impl Default for HttpOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:4000".to_string(),
timeout: Duration::from_secs(30),
}
}
}
#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq)]
pub struct ColumnSchema {
name: String,
@@ -271,9 +288,10 @@ pub struct ApiState {
}
impl HttpServer {
pub fn new(sql_handler: SqlQueryHandlerRef) -> Self {
pub fn new(sql_handler: SqlQueryHandlerRef, options: HttpOptions) -> Self {
Self {
sql_handler,
options,
opentsdb_handler: None,
influxdb_handler: None,
prom_handler: None,
@@ -385,8 +403,7 @@ impl HttpServer {
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(TraceLayer::new_for_http())
// TODO(LFC): make timeout configurable
.layer(TimeoutLayer::new(Duration::from_secs(30)))
.layer(TimeoutLayer::new(self.options.timeout))
// custom layer
.layer(middleware::from_fn(context::build_ctx)),
)
@@ -443,14 +460,71 @@ async fn handle_error(err: BoxError) -> Json<JsonResponse> {
#[cfg(test)]
mod test {
use std::future::pending;
use std::sync::Arc;
use axum::handler::Handler;
use axum::http::StatusCode;
use axum::routing::get;
use axum_test_helper::TestClient;
use common_recordbatch::RecordBatches;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVector, UInt32Vector};
use tokio::sync::mpsc;
use super::*;
use crate::query_handler::SqlQueryHandler;
struct DummyInstance {
_tx: mpsc::Sender<(String, Vec<u8>)>,
}
#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _query: &str) -> Result<Output> {
unimplemented!()
}
}
fn timeout() -> TimeoutLayer {
TimeoutLayer::new(Duration::from_millis(10))
}
async fn forever() {
pending().await
}
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let instance = Arc::new(DummyInstance { _tx: tx });
let server = HttpServer::new(instance, HttpOptions::default());
server.make_app().route(
"/test/timeout",
get(forever.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(|_: BoxError| async {
StatusCode::REQUEST_TIMEOUT
}))
.layer(timeout()),
)),
)
}
#[test]
fn test_http_options_default() {
let default = HttpOptions::default();
assert_eq!("127.0.0.1:4000".to_string(), default.addr);
assert_eq!(Duration::from_secs(30), default.timeout)
}
#[tokio::test]
async fn test_http_server_request_timeout() {
let (tx, _rx) = mpsc::channel(100);
let app = make_test_app(tx);
let client = TestClient::new(app);
let res = client.get("/test/timeout").send().await;
assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
}
#[tokio::test]
async fn test_recordbatches_conversion() {

View File

@@ -20,7 +20,7 @@ use axum::Router;
use axum_test_helper::TestClient;
use common_query::Output;
use servers::error::Result;
use servers::http::HttpServer;
use servers::http::{HttpOptions, HttpServer};
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::{InfluxdbLineProtocolHandler, SqlQueryHandler};
use tokio::sync::mpsc;
@@ -51,7 +51,7 @@ impl SqlQueryHandler for DummyInstance {
fn make_test_app(tx: mpsc::Sender<(String, String)>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone());
let mut server = HttpServer::new(instance.clone(), HttpOptions::default());
server.set_influxdb_handler(instance);
server.make_app()
}

View File

@@ -19,7 +19,7 @@ use axum::Router;
use axum_test_helper::TestClient;
use common_query::Output;
use servers::error::{self, Result};
use servers::http::HttpServer;
use servers::http::{HttpOptions, HttpServer};
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::{OpentsdbProtocolHandler, SqlQueryHandler};
use tokio::sync::mpsc;
@@ -51,7 +51,7 @@ impl SqlQueryHandler for DummyInstance {
fn make_test_app(tx: mpsc::Sender<String>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone());
let mut server = HttpServer::new(instance.clone(), HttpOptions::default());
server.set_opentsdb_handler(instance);
server.make_app()
}

View File

@@ -23,7 +23,7 @@ use axum_test_helper::TestClient;
use common_query::Output;
use prost::Message;
use servers::error::Result;
use servers::http::HttpServer;
use servers::http::{HttpOptions, HttpServer};
use servers::prometheus;
use servers::prometheus::{snappy_compress, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse, SqlQueryHandler};
@@ -76,7 +76,7 @@ impl SqlQueryHandler for DummyInstance {
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone());
let mut server = HttpServer::new(instance.clone(), HttpOptions::default());
server.set_prom_handler(instance);
server.make_app()
}