From 49263bcb2eeb8124de875b00e181979ac4aa8d87 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 27 Oct 2022 17:54:59 +0800 Subject: [PATCH] feat: update our http api and attempt to add openapi spec support --- Cargo.lock | 198 ++++++++++++++++++++++++++++++-- src/servers/Cargo.toml | 3 + src/servers/src/http.rs | 142 +++++++++++++++++------ src/servers/src/http/handler.rs | 31 ++--- 4 files changed, 317 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 391b9b2c2c..87cd7c94d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,6 +40,19 @@ dependencies = [ "version_check", ] +[[package]] +name = "ahash" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57e6e951cfbb2db8de1828d49073a113a29fd7117b1596caa781a258c7e38d72" +dependencies = [ + "cfg-if", + "getrandom", + "once_cell", + "serde", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.19" @@ -49,6 +62,26 @@ dependencies = [ "memchr", ] +[[package]] +name = "aide" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47c350c121222a7d8cc7d2efad0856ddd3a903eb62f0f5e982efb27d811c94c" +dependencies = [ + "axum 0.6.0-rc.2", + "bytes", + "cfg-if", + "http", + "indexmap", + "schemars", + "serde", + "serde_json", + "thiserror", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -154,7 +187,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e387b20dd573a96f36b173d9027483898f944d696521afd74e2caa3c813d86e" dependencies = [ - "ahash", + "ahash 0.7.6", "arrow-format", "base64", "bytemuck", @@ -379,6 +412,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-jsonschema" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d4b9fcccef8d5374b8dcd15488544440d15e0da3581d27b909f471604b90690" +dependencies = [ + "aide", + "async-trait", + "axum 0.6.0-rc.2", + "http", + "http-body", + "jsonschema", + "schemars", + "serde", + "serde_json", + "tracing", +] + [[package]] name = "axum-macros" version = "0.3.0-rc.1" @@ -480,6 +531,21 @@ dependencies = [ "shlex", ] +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -578,6 +644,12 @@ version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "bytemuck" version = "1.12.1" @@ -1343,7 +1415,7 @@ name = "datafusion" version = "7.0.0" source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51" dependencies = [ - "ahash", + "ahash 0.7.6", "arrow2", "async-trait", "chrono", @@ -1385,7 +1457,7 @@ name = "datafusion-expr" version = "7.0.0" source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51" dependencies = [ - "ahash", + "ahash 0.7.6", "arrow2", "datafusion-common", "sqlparser", @@ -1396,7 +1468,7 @@ name = "datafusion-physical-expr" version = "7.0.0" source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51" dependencies = [ - "ahash", + "ahash 0.7.6", "arrow2", "blake2", "blake3", @@ -1588,6 +1660,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dyn-clone" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f94fa09c2aeea5b8839e414b7b841bf429fd25b9c522116ac97ee87856d88b2" + [[package]] name = "either" version = "1.8.0" @@ -1698,6 +1776,16 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fancy-regex" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0678ab2d46fa5195aaf59ad034c083d351377d4af57f3e073c074d0da3e3c766" +dependencies = [ + "bit-set", + "regex", +] + [[package]] name = "fastrand" version = "1.8.0" @@ -1765,6 +1853,16 @@ dependencies = [ "regex", ] +[[package]] +name = "fraction" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99df8100674344d1cee346c764684f7ad688a4dcaa1a3efb2fdb45daf9acf4f9" +dependencies = [ + "lazy_static", + "num", +] + [[package]] name = "frontend" version = "0.1.0" @@ -2058,7 +2156,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "ahash", + "ahash 0.7.6", ] [[package]] @@ -2251,6 +2349,7 @@ checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", "hashbrown", + "serde", ] [[package]] @@ -2317,6 +2416,15 @@ dependencies = [ "syn", ] +[[package]] +name = "iso8601" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f21abb3d09069861499d93d41a970243a90e215df0cf763ac9a31b9b27178d0" +dependencies = [ + "nom", +] + [[package]] name = "itertools" version = "0.10.5" @@ -2356,6 +2464,33 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonschema" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ca9e2b45609132ae2214d50482c03aeee78826cd6fd53a8940915b81acedf16" +dependencies = [ + "ahash 0.8.0", + "anyhow", + "base64", + "bytecount", + "fancy-regex", + "fraction", + "iso8601", + "itoa 1.0.3", + "lazy_static", + "memchr", + "num-cmp", + "parking_lot", + "percent-encoding", + "regex", + "serde", + "serde_json", + "time 0.3.14", + "url", + "uuid", +] + [[package]] name = "jsonwebtoken" version = "8.1.1" @@ -2711,7 +2846,7 @@ version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b9b8653cec6897f73b519a43fba5ee3d50f62fe9af80b428accdcc093b4a849" dependencies = [ - "ahash", + "ahash 0.7.6", "metrics-macros", "portable-atomic", ] @@ -2987,6 +3122,12 @@ dependencies = [ "serde", ] +[[package]] +name = "num-cmp" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63335b2e2c34fae2fb0aa2cecfd9f0832a1e24b3b32ecec612c3426d46dc8aaa" + [[package]] name = "num-complex" version = "0.4.2" @@ -4350,7 +4491,7 @@ name = "rustpython-compiler-core" version = "0.1.2" source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" dependencies = [ - "ahash", + "ahash 0.7.6", "indexmap", "itertools", "log", @@ -4392,7 +4533,7 @@ name = "rustpython-parser" version = "0.1.2" source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" dependencies = [ - "ahash", + "ahash 0.7.6", "lalrpop-util", "log", "num-bigint", @@ -4417,7 +4558,7 @@ version = "0.1.2" source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" dependencies = [ "adler32", - "ahash", + "ahash 0.7.6", "ascii", "atty", "bitflags", @@ -4546,6 +4687,31 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "schemars" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a5fb6c61f29e723026dc8e923d94c694313212abbecbbe5f55a7748eec5b307" +dependencies = [ + "dyn-clone", + "indexmap", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f188d036977451159430f3b8dc82ec76364a42b7e289c2b18a9a18f4470058e9" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -4663,6 +4829,17 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_derive_internals" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_json" version = "1.0.85" @@ -4691,9 +4868,11 @@ dependencies = [ name = "servers" version = "0.1.0" dependencies = [ + "aide", "api", "async-trait", "axum 0.6.0-rc.2", + "axum-jsonschema", "axum-macros", "axum-test-helper", "bytes", @@ -4721,6 +4900,7 @@ dependencies = [ "prost 0.11.0", "query", "rand 0.8.5", + "schemars", "script", "serde", "serde_json", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index ae590a6d64..213f34da0a 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -4,9 +4,11 @@ version = "0.1.0" edition = "2021" [dependencies] +aide = { version = "0.6", features = ["axum"] } api = { path = "../api" } async-trait = "0.1" axum = "0.6.0-rc.2" +axum-jsonschema = { version = "0.2", features = [ "aide" ] } axum-macros = "0.3.0-rc.1" bytes = "1.2" common-base = { path = "../common/base" } @@ -29,6 +31,7 @@ openmetrics-parser = "0.4" opensrv-mysql = "0.1" pgwire = { version = "0.4" } prost = "0.11" +schemars = "0.8" serde = "1.0" serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index bf27968c6f..6268def631 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -6,7 +6,11 @@ pub mod prometheus; use std::net::SocketAddr; use std::time::Duration; +use aide::axum::routing as apirouting; +use aide::axum::{ApiRouter, IntoApiResponse}; +use aide::openapi::{Info, OpenApi}; use async_trait::async_trait; +use axum::Extension; use axum::{ error_handling::HandleErrorLayer, http::header, @@ -17,7 +21,10 @@ use axum::{ use common_query::Output; use common_recordbatch::{util, RecordBatch}; use common_telemetry::logging::info; +use datatypes::data_type::DataType; +use schemars::JsonSchema; use serde::Serialize; +use serde_json::Value; use snafu::ResultExt; use tower::{timeout::TimeoutLayer, ServiceBuilder}; use tower_http::trace::TraceLayer; @@ -39,10 +46,10 @@ pub struct HttpServer { prom_handler: Option, } -#[derive(Serialize, Debug)] -pub enum JsonOutput { - AffectedRows(usize), - Rows(Vec), +#[derive(Debug, Serialize, JsonSchema)] +pub struct ColumnSchema { + name: String, + data_type: String, } #[derive(Serialize, Debug)] @@ -52,14 +59,73 @@ pub struct BytesResponse { pub bytes: Vec, } -#[derive(Serialize, Debug)] -pub enum HttpResponse { - Json(JsonResponse), - Text(String), - Bytes(BytesResponse), +#[derive(Debug, Serialize, JsonSchema)] +pub struct Schema { + column_schemas: Vec, } -#[derive(Serialize, Debug)] +#[derive(Debug, Serialize, JsonSchema)] +pub struct HttpRecordsOutput { + schema: Option, + rows: Vec>, +} + +impl TryFrom> for HttpRecordsOutput { + type Error = String; + + fn try_from( + recordbatches: Vec, + ) -> std::result::Result { + if recordbatches.is_empty() { + Ok(HttpRecordsOutput { + schema: None, + rows: vec![], + }) + } else { + // safety ensured by previous empty check + let first = &recordbatches[0]; + let schema = Schema { + column_schemas: first + .schema + .column_schemas() + .iter() + .map(|cs| ColumnSchema { + name: cs.name.clone(), + data_type: cs.data_type.name().to_owned(), + }) + .collect(), + }; + + let mut rows = Vec::new(); + + for recordbatch in recordbatches { + for row in recordbatch.rows() { + let row = row.map_err(|e| e.to_string())?; + let value_row = row + .into_iter() + .map(|f| Value::try_from(f).map_err(|err| err.to_string())) + .collect::, _>>()?; + + rows.push(value_row); + } + } + + Ok(HttpRecordsOutput { + schema: Some(schema), + rows, + }) + } + } +} + +#[derive(Serialize, Debug, JsonSchema)] +#[serde(rename_all = "lowercase")] +pub enum JsonOutput { + AffectedRows(usize), + Rows(HttpRecordsOutput), +} + +#[derive(Serialize, Debug, JsonSchema)] pub struct JsonResponse { success: bool, #[serde(skip_serializing_if = "Option::is_none")] @@ -68,23 +134,6 @@ pub struct JsonResponse { output: Option, } -impl IntoResponse for HttpResponse { - fn into_response(self) -> Response { - match self { - HttpResponse::Json(json) => Json(json).into_response(), - HttpResponse::Text(text) => text.into_response(), - HttpResponse::Bytes(resp) => ( - [ - (header::CONTENT_TYPE, resp.content_type), - (header::CONTENT_ENCODING, resp.content_encoding), - ], - resp.bytes, - ) - .into_response(), - } - } -} - impl JsonResponse { fn with_error(error: Option) -> Self { JsonResponse { @@ -109,11 +158,17 @@ impl JsonResponse { Self::with_output(Some(JsonOutput::AffectedRows(rows))) } Ok(Output::Stream(stream)) => match util::collect(stream).await { - Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))), + Ok(rows) => match HttpRecordsOutput::try_from(rows) { + Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))), + Err(err) => Self::with_error(Some(format!(": {}", err))), + }, Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))), }, Ok(Output::RecordBatches(recordbatches)) => { - Self::with_output(Some(JsonOutput::Rows(recordbatches.take()))) + match HttpRecordsOutput::try_from(recordbatches.take()) { + Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))), + Err(err) => Self::with_error(Some(format!(": {}", err))), + } } Err(e) => Self::with_error(Some(format!("Query engine output error: {}", e))), } @@ -140,6 +195,10 @@ async fn shutdown_signal() { .expect("failed to install CTRL+C signal handler"); } +async fn serve_api(Extension(api): Extension) -> impl IntoApiResponse { + Json(api) +} + impl HttpServer { pub fn new(sql_handler: SqlQueryHandlerRef) -> Self { Self { @@ -175,6 +234,15 @@ impl HttpServer { } pub fn make_app(&self) -> Router { + // TODO + let mut api = OpenApi { + info: Info { + description: Some("an example API".to_string()), + ..Info::default() + }, + ..OpenApi::default() + }; + // TODO(LFC): Use released Axum. // Axum version 0.6 introduces state within router, making router methods far more elegant // to write. Though version 0.6 is rc, I think it's worth to upgrade. @@ -182,10 +250,14 @@ impl HttpServer { // handlers amongst router methods. That requires us to pack all query handlers in a shared // state, and check-then-get the desired query handler in different router methods, which // is a lot of tedious work. - let sql_router = Router::with_state(self.sql_handler.clone()) - .route("/sql", routing::any(handler::sql)) - .route("/scripts", routing::post(handler::scripts)) - .route("/run-script", routing::post(handler::run_script)); + let sql_router = ApiRouter::with_state(self.sql_handler.clone()) + .api_route("/sql", apirouting::get(handler::sql)) + .api_route("/sql", apirouting::post(handler::sql)) + .api_route("/scripts", apirouting::post(handler::scripts)) + .api_route("/run-script", apirouting::post(handler::run_script)) + .route("/api.json", apirouting::get(serve_api)) + .finish_api(&mut api) + .layer(Extension(api)); let mut router = Router::new().nest(&format!("/{}", HTTP_API_VERSION), sql_router); @@ -212,8 +284,10 @@ impl HttpServer { router = router.nest(&format!("/{}/prometheus", HTTP_API_VERSION), prom_router); } + let metrics_router = Router::new().route("/", routing::get(handler::metrics)); + router = router.nest(&format!("/{}/metrics", HTTP_API_VERSION), metrics_router); + router - .route("/metrics", routing::get(handler::metrics)) // middlewares .layer( ServiceBuilder::new() diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index afcc07d75b..4a4f4612ba 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -1,10 +1,13 @@ use std::collections::HashMap; +use aide::axum::IntoApiResponse; use axum::extract::{Json, Query, State}; +use axum::response::IntoResponse; use common_telemetry::metric; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::http::{HttpResponse, JsonResponse}; +use crate::http::JsonResponse; use crate::query_handler::SqlQueryHandlerRef; /// Handler to execute sql @@ -12,11 +15,11 @@ use crate::query_handler::SqlQueryHandlerRef; pub async fn sql( State(sql_handler): State, Query(params): Query>, -) -> HttpResponse { +) -> impl IntoApiResponse { if let Some(sql) = params.get("sql") { - HttpResponse::Json(JsonResponse::from_output(sql_handler.do_query(sql).await).await) + Json(JsonResponse::from_output(sql_handler.do_query(sql).await).await) } else { - HttpResponse::Json(JsonResponse::with_error(Some( + Json(JsonResponse::with_error(Some( "sql parameter is required.".to_string(), ))) } @@ -24,15 +27,15 @@ pub async fn sql( /// Handler to export metrics #[axum_macros::debug_handler] -pub async fn metrics(Query(_params): Query>) -> HttpResponse { +pub async fn metrics(Query(_params): Query>) -> impl IntoResponse { if let Some(handle) = metric::try_handle() { - HttpResponse::Text(handle.render()) + handle.render() } else { - HttpResponse::Text("Prometheus handle not initialized.".to_string()) + "Prometheus handle not initialized.".to_owned() } } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, JsonSchema)] pub struct ScriptExecution { pub name: String, pub script: String, @@ -43,9 +46,9 @@ pub struct ScriptExecution { pub async fn scripts( State(query_handler): State, Json(payload): Json, -) -> HttpResponse { +) -> impl IntoApiResponse { if payload.name.is_empty() || payload.script.is_empty() { - return HttpResponse::Json(JsonResponse::with_error(Some( + return Json(JsonResponse::with_error(Some( "Invalid name or script".to_string(), ))); } @@ -58,7 +61,7 @@ pub async fn scripts( Err(e) => JsonResponse::with_error(Some(format!("Insert script error: {}", e))), }; - HttpResponse::Json(body) + Json(body) } /// Handler to execute script @@ -66,14 +69,14 @@ pub async fn scripts( pub async fn run_script( State(query_handler): State, Query(params): Query>, -) -> HttpResponse { +) -> impl IntoApiResponse { let name = params.get("name"); if name.is_none() || name.unwrap().is_empty() { - return HttpResponse::Json(JsonResponse::with_error(Some("Invalid name".to_string()))); + return Json(JsonResponse::with_error(Some("Invalid name".to_string()))); } let output = query_handler.execute_script(name.unwrap()).await; - HttpResponse::Json(JsonResponse::from_output(output).await) + Json(JsonResponse::from_output(output).await) }