feat: update our http api and attempt to add openapi spec support

This commit is contained in:
Ning Sun
2022-10-27 17:54:59 +08:00
parent c4e9931d31
commit 49263bcb2e
4 changed files with 317 additions and 57 deletions

198
Cargo.lock generated
View File

@@ -40,6 +40,19 @@ dependencies = [
"version_check",
]
[[package]]
name = "ahash"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57e6e951cfbb2db8de1828d49073a113a29fd7117b1596caa781a258c7e38d72"
dependencies = [
"cfg-if",
"getrandom",
"once_cell",
"serde",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.19"
@@ -49,6 +62,26 @@ dependencies = [
"memchr",
]
[[package]]
name = "aide"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a47c350c121222a7d8cc7d2efad0856ddd3a903eb62f0f5e982efb27d811c94c"
dependencies = [
"axum 0.6.0-rc.2",
"bytes",
"cfg-if",
"http",
"indexmap",
"schemars",
"serde",
"serde_json",
"thiserror",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
@@ -154,7 +187,7 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e387b20dd573a96f36b173d9027483898f944d696521afd74e2caa3c813d86e"
dependencies = [
"ahash",
"ahash 0.7.6",
"arrow-format",
"base64",
"bytemuck",
@@ -379,6 +412,24 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum-jsonschema"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d4b9fcccef8d5374b8dcd15488544440d15e0da3581d27b909f471604b90690"
dependencies = [
"aide",
"async-trait",
"axum 0.6.0-rc.2",
"http",
"http-body",
"jsonschema",
"schemars",
"serde",
"serde_json",
"tracing",
]
[[package]]
name = "axum-macros"
version = "0.3.0-rc.1"
@@ -480,6 +531,21 @@ dependencies = [
"shlex",
]
[[package]]
name = "bit-set"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1"
dependencies = [
"bit-vec",
]
[[package]]
name = "bit-vec"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb"
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -578,6 +644,12 @@ version = "3.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d"
[[package]]
name = "bytecount"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c"
[[package]]
name = "bytemuck"
version = "1.12.1"
@@ -1343,7 +1415,7 @@ name = "datafusion"
version = "7.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51"
dependencies = [
"ahash",
"ahash 0.7.6",
"arrow2",
"async-trait",
"chrono",
@@ -1385,7 +1457,7 @@ name = "datafusion-expr"
version = "7.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51"
dependencies = [
"ahash",
"ahash 0.7.6",
"arrow2",
"datafusion-common",
"sqlparser",
@@ -1396,7 +1468,7 @@ name = "datafusion-physical-expr"
version = "7.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?branch=arrow2#744b2626081db95a254fc882820fc7812f95aa51"
dependencies = [
"ahash",
"ahash 0.7.6",
"arrow2",
"blake2",
"blake3",
@@ -1588,6 +1660,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "dyn-clone"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f94fa09c2aeea5b8839e414b7b841bf429fd25b9c522116ac97ee87856d88b2"
[[package]]
name = "either"
version = "1.8.0"
@@ -1698,6 +1776,16 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fancy-regex"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0678ab2d46fa5195aaf59ad034c083d351377d4af57f3e073c074d0da3e3c766"
dependencies = [
"bit-set",
"regex",
]
[[package]]
name = "fastrand"
version = "1.8.0"
@@ -1765,6 +1853,16 @@ dependencies = [
"regex",
]
[[package]]
name = "fraction"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99df8100674344d1cee346c764684f7ad688a4dcaa1a3efb2fdb45daf9acf4f9"
dependencies = [
"lazy_static",
"num",
]
[[package]]
name = "frontend"
version = "0.1.0"
@@ -2058,7 +2156,7 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash",
"ahash 0.7.6",
]
[[package]]
@@ -2251,6 +2349,7 @@ checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
dependencies = [
"autocfg",
"hashbrown",
"serde",
]
[[package]]
@@ -2317,6 +2416,15 @@ dependencies = [
"syn",
]
[[package]]
name = "iso8601"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f21abb3d09069861499d93d41a970243a90e215df0cf763ac9a31b9b27178d0"
dependencies = [
"nom",
]
[[package]]
name = "itertools"
version = "0.10.5"
@@ -2356,6 +2464,33 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jsonschema"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ca9e2b45609132ae2214d50482c03aeee78826cd6fd53a8940915b81acedf16"
dependencies = [
"ahash 0.8.0",
"anyhow",
"base64",
"bytecount",
"fancy-regex",
"fraction",
"iso8601",
"itoa 1.0.3",
"lazy_static",
"memchr",
"num-cmp",
"parking_lot",
"percent-encoding",
"regex",
"serde",
"serde_json",
"time 0.3.14",
"url",
"uuid",
]
[[package]]
name = "jsonwebtoken"
version = "8.1.1"
@@ -2711,7 +2846,7 @@ version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b9b8653cec6897f73b519a43fba5ee3d50f62fe9af80b428accdcc093b4a849"
dependencies = [
"ahash",
"ahash 0.7.6",
"metrics-macros",
"portable-atomic",
]
@@ -2987,6 +3122,12 @@ dependencies = [
"serde",
]
[[package]]
name = "num-cmp"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63335b2e2c34fae2fb0aa2cecfd9f0832a1e24b3b32ecec612c3426d46dc8aaa"
[[package]]
name = "num-complex"
version = "0.4.2"
@@ -4350,7 +4491,7 @@ name = "rustpython-compiler-core"
version = "0.1.2"
source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3"
dependencies = [
"ahash",
"ahash 0.7.6",
"indexmap",
"itertools",
"log",
@@ -4392,7 +4533,7 @@ name = "rustpython-parser"
version = "0.1.2"
source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3"
dependencies = [
"ahash",
"ahash 0.7.6",
"lalrpop-util",
"log",
"num-bigint",
@@ -4417,7 +4558,7 @@ version = "0.1.2"
source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3"
dependencies = [
"adler32",
"ahash",
"ahash 0.7.6",
"ascii",
"atty",
"bitflags",
@@ -4546,6 +4687,31 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "schemars"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a5fb6c61f29e723026dc8e923d94c694313212abbecbbe5f55a7748eec5b307"
dependencies = [
"dyn-clone",
"indexmap",
"schemars_derive",
"serde",
"serde_json",
]
[[package]]
name = "schemars_derive"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f188d036977451159430f3b8dc82ec76364a42b7e289c2b18a9a18f4470058e9"
dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
@@ -4663,6 +4829,17 @@ dependencies = [
"syn",
]
[[package]]
name = "serde_derive_internals"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.85"
@@ -4691,9 +4868,11 @@ dependencies = [
name = "servers"
version = "0.1.0"
dependencies = [
"aide",
"api",
"async-trait",
"axum 0.6.0-rc.2",
"axum-jsonschema",
"axum-macros",
"axum-test-helper",
"bytes",
@@ -4721,6 +4900,7 @@ dependencies = [
"prost 0.11.0",
"query",
"rand 0.8.5",
"schemars",
"script",
"serde",
"serde_json",

View File

@@ -4,9 +4,11 @@ version = "0.1.0"
edition = "2021"
[dependencies]
aide = { version = "0.6", features = ["axum"] }
api = { path = "../api" }
async-trait = "0.1"
axum = "0.6.0-rc.2"
axum-jsonschema = { version = "0.2", features = [ "aide" ] }
axum-macros = "0.3.0-rc.1"
bytes = "1.2"
common-base = { path = "../common/base" }
@@ -29,6 +31,7 @@ openmetrics-parser = "0.4"
opensrv-mysql = "0.1"
pgwire = { version = "0.4" }
prost = "0.11"
schemars = "0.8"
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -6,7 +6,11 @@ pub mod prometheus;
use std::net::SocketAddr;
use std::time::Duration;
use aide::axum::routing as apirouting;
use aide::axum::{ApiRouter, IntoApiResponse};
use aide::openapi::{Info, OpenApi};
use async_trait::async_trait;
use axum::Extension;
use axum::{
error_handling::HandleErrorLayer,
http::header,
@@ -17,7 +21,10 @@ use axum::{
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use common_telemetry::logging::info;
use datatypes::data_type::DataType;
use schemars::JsonSchema;
use serde::Serialize;
use serde_json::Value;
use snafu::ResultExt;
use tower::{timeout::TimeoutLayer, ServiceBuilder};
use tower_http::trace::TraceLayer;
@@ -39,10 +46,10 @@ pub struct HttpServer {
prom_handler: Option<PrometheusProtocolHandlerRef>,
}
#[derive(Serialize, Debug)]
pub enum JsonOutput {
AffectedRows(usize),
Rows(Vec<RecordBatch>),
#[derive(Debug, Serialize, JsonSchema)]
pub struct ColumnSchema {
name: String,
data_type: String,
}
#[derive(Serialize, Debug)]
@@ -52,14 +59,73 @@ pub struct BytesResponse {
pub bytes: Vec<u8>,
}
#[derive(Serialize, Debug)]
pub enum HttpResponse {
Json(JsonResponse),
Text(String),
Bytes(BytesResponse),
#[derive(Debug, Serialize, JsonSchema)]
pub struct Schema {
column_schemas: Vec<ColumnSchema>,
}
#[derive(Serialize, Debug)]
#[derive(Debug, Serialize, JsonSchema)]
pub struct HttpRecordsOutput {
schema: Option<Schema>,
rows: Vec<Vec<Value>>,
}
impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
type Error = String;
fn try_from(
recordbatches: Vec<RecordBatch>,
) -> std::result::Result<HttpRecordsOutput, Self::Error> {
if recordbatches.is_empty() {
Ok(HttpRecordsOutput {
schema: None,
rows: vec![],
})
} else {
// safety ensured by previous empty check
let first = &recordbatches[0];
let schema = Schema {
column_schemas: first
.schema
.column_schemas()
.iter()
.map(|cs| ColumnSchema {
name: cs.name.clone(),
data_type: cs.data_type.name().to_owned(),
})
.collect(),
};
let mut rows = Vec::new();
for recordbatch in recordbatches {
for row in recordbatch.rows() {
let row = row.map_err(|e| e.to_string())?;
let value_row = row
.into_iter()
.map(|f| Value::try_from(f).map_err(|err| err.to_string()))
.collect::<std::result::Result<Vec<Value>, _>>()?;
rows.push(value_row);
}
}
Ok(HttpRecordsOutput {
schema: Some(schema),
rows,
})
}
}
}
#[derive(Serialize, Debug, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum JsonOutput {
AffectedRows(usize),
Rows(HttpRecordsOutput),
}
#[derive(Serialize, Debug, JsonSchema)]
pub struct JsonResponse {
success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -68,23 +134,6 @@ pub struct JsonResponse {
output: Option<JsonOutput>,
}
impl IntoResponse for HttpResponse {
fn into_response(self) -> Response {
match self {
HttpResponse::Json(json) => Json(json).into_response(),
HttpResponse::Text(text) => text.into_response(),
HttpResponse::Bytes(resp) => (
[
(header::CONTENT_TYPE, resp.content_type),
(header::CONTENT_ENCODING, resp.content_encoding),
],
resp.bytes,
)
.into_response(),
}
}
}
impl JsonResponse {
fn with_error(error: Option<String>) -> Self {
JsonResponse {
@@ -109,11 +158,17 @@ impl JsonResponse {
Self::with_output(Some(JsonOutput::AffectedRows(rows)))
}
Ok(Output::Stream(stream)) => match util::collect(stream).await {
Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))),
Ok(rows) => match HttpRecordsOutput::try_from(rows) {
Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))),
Err(err) => Self::with_error(Some(format!(": {}", err))),
},
Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))),
},
Ok(Output::RecordBatches(recordbatches)) => {
Self::with_output(Some(JsonOutput::Rows(recordbatches.take())))
match HttpRecordsOutput::try_from(recordbatches.take()) {
Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))),
Err(err) => Self::with_error(Some(format!(": {}", err))),
}
}
Err(e) => Self::with_error(Some(format!("Query engine output error: {}", e))),
}
@@ -140,6 +195,10 @@ async fn shutdown_signal() {
.expect("failed to install CTRL+C signal handler");
}
async fn serve_api(Extension(api): Extension<OpenApi>) -> impl IntoApiResponse {
Json(api)
}
impl HttpServer {
pub fn new(sql_handler: SqlQueryHandlerRef) -> Self {
Self {
@@ -175,6 +234,15 @@ impl HttpServer {
}
pub fn make_app(&self) -> Router {
// TODO
let mut api = OpenApi {
info: Info {
description: Some("an example API".to_string()),
..Info::default()
},
..OpenApi::default()
};
// TODO(LFC): Use released Axum.
// Axum version 0.6 introduces state within router, making router methods far more elegant
// to write. Though version 0.6 is rc, I think it's worth to upgrade.
@@ -182,10 +250,14 @@ impl HttpServer {
// handlers amongst router methods. That requires us to pack all query handlers in a shared
// state, and check-then-get the desired query handler in different router methods, which
// is a lot of tedious work.
let sql_router = Router::with_state(self.sql_handler.clone())
.route("/sql", routing::any(handler::sql))
.route("/scripts", routing::post(handler::scripts))
.route("/run-script", routing::post(handler::run_script));
let sql_router = ApiRouter::with_state(self.sql_handler.clone())
.api_route("/sql", apirouting::get(handler::sql))
.api_route("/sql", apirouting::post(handler::sql))
.api_route("/scripts", apirouting::post(handler::scripts))
.api_route("/run-script", apirouting::post(handler::run_script))
.route("/api.json", apirouting::get(serve_api))
.finish_api(&mut api)
.layer(Extension(api));
let mut router = Router::new().nest(&format!("/{}", HTTP_API_VERSION), sql_router);
@@ -212,8 +284,10 @@ impl HttpServer {
router = router.nest(&format!("/{}/prometheus", HTTP_API_VERSION), prom_router);
}
let metrics_router = Router::new().route("/", routing::get(handler::metrics));
router = router.nest(&format!("/{}/metrics", HTTP_API_VERSION), metrics_router);
router
.route("/metrics", routing::get(handler::metrics))
// middlewares
.layer(
ServiceBuilder::new()

View File

@@ -1,10 +1,13 @@
use std::collections::HashMap;
use aide::axum::IntoApiResponse;
use axum::extract::{Json, Query, State};
use axum::response::IntoResponse;
use common_telemetry::metric;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::http::{HttpResponse, JsonResponse};
use crate::http::JsonResponse;
use crate::query_handler::SqlQueryHandlerRef;
/// Handler to execute sql
@@ -12,11 +15,11 @@ use crate::query_handler::SqlQueryHandlerRef;
pub async fn sql(
State(sql_handler): State<SqlQueryHandlerRef>,
Query(params): Query<HashMap<String, String>>,
) -> HttpResponse {
) -> impl IntoApiResponse {
if let Some(sql) = params.get("sql") {
HttpResponse::Json(JsonResponse::from_output(sql_handler.do_query(sql).await).await)
Json(JsonResponse::from_output(sql_handler.do_query(sql).await).await)
} else {
HttpResponse::Json(JsonResponse::with_error(Some(
Json(JsonResponse::with_error(Some(
"sql parameter is required.".to_string(),
)))
}
@@ -24,15 +27,15 @@ pub async fn sql(
/// Handler to export metrics
#[axum_macros::debug_handler]
pub async fn metrics(Query(_params): Query<HashMap<String, String>>) -> HttpResponse {
pub async fn metrics(Query(_params): Query<HashMap<String, String>>) -> impl IntoResponse {
if let Some(handle) = metric::try_handle() {
HttpResponse::Text(handle.render())
handle.render()
} else {
HttpResponse::Text("Prometheus handle not initialized.".to_string())
"Prometheus handle not initialized.".to_owned()
}
}
#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, JsonSchema)]
pub struct ScriptExecution {
pub name: String,
pub script: String,
@@ -43,9 +46,9 @@ pub struct ScriptExecution {
pub async fn scripts(
State(query_handler): State<SqlQueryHandlerRef>,
Json(payload): Json<ScriptExecution>,
) -> HttpResponse {
) -> impl IntoApiResponse {
if payload.name.is_empty() || payload.script.is_empty() {
return HttpResponse::Json(JsonResponse::with_error(Some(
return Json(JsonResponse::with_error(Some(
"Invalid name or script".to_string(),
)));
}
@@ -58,7 +61,7 @@ pub async fn scripts(
Err(e) => JsonResponse::with_error(Some(format!("Insert script error: {}", e))),
};
HttpResponse::Json(body)
Json(body)
}
/// Handler to execute script
@@ -66,14 +69,14 @@ pub async fn scripts(
pub async fn run_script(
State(query_handler): State<SqlQueryHandlerRef>,
Query(params): Query<HashMap<String, String>>,
) -> HttpResponse {
) -> impl IntoApiResponse {
let name = params.get("name");
if name.is_none() || name.unwrap().is_empty() {
return HttpResponse::Json(JsonResponse::with_error(Some("Invalid name".to_string())));
return Json(JsonResponse::with_error(Some("Invalid name".to_string())));
}
let output = query_handler.execute_script(name.unwrap()).await;
HttpResponse::Json(JsonResponse::from_output(output).await)
Json(JsonResponse::from_output(output).await)
}