feat: Update Http SQL api for dashboard requirements (#474)

* feat: make sql api output a vector to support multi-statement

* feat: add execution_time_ms to http sql and script api

* fix: use u128 for execution time

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: lint error

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Ning Sun
2022-11-14 21:40:31 +08:00
committed by GitHub
parent 281eae9f44
commit c673debc89
4 changed files with 190 additions and 55 deletions

View File

@@ -5,7 +5,8 @@ use axum::http::StatusCode;
use axum::Router;
use axum_test_helper::TestClient;
use datatypes::prelude::ConcreteDataType;
use servers::http::HttpServer;
use serde_json::json;
use servers::http::{ColumnSchema, HttpServer, JsonOutput, JsonResponse, Schema};
use servers::server::Server;
use test_util::TestGuard;
@@ -31,11 +32,11 @@ async fn test_sql_api() {
let res = client.get("/v1/sql").send().await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert_eq!(
body,
r#"{"code":1004,"error":"sql parameter is required."}"#
);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json: r#"{"code":1004,"error":"sql parameter is required."}"#
assert_eq!(body.code(), 1004);
assert_eq!(body.error().unwrap(), "sql parameter is required.");
assert!(body.execution_time_ms().is_some());
let res = client
.get("/v1/sql?sql=select * from numbers limit 10")
@@ -43,11 +44,30 @@ async fn test_sql_api() {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert_eq!(
body,
r#"{"code":0,"output":{"records":{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}}}"#
);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(records) = &output[0] {
assert_eq!(records.num_cols(), 1);
assert_eq!(records.num_rows(), 10);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![ColumnSchema::new(
"number".to_owned(),
"UInt32".to_owned()
)])
);
assert_eq!(records.rows()[0][0], json!(0));
assert_eq!(records.rows()[9][0], json!(9));
} else {
unreachable!()
}
// test insert and select
let res = client
@@ -63,11 +83,31 @@ async fn test_sql_api() {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert_eq!(
body,
r#"{"code":0,"output":{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[["host",66.6,1024.0,0]]}}}"#
);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json: r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[["host",66.6,1024.0,0]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(records) = &output[0] {
assert_eq!(records.num_cols(), 4);
assert_eq!(records.num_rows(), 1);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![
ColumnSchema::new("host".to_owned(), "String".to_owned()),
ColumnSchema::new("cpu".to_owned(), "Float64".to_owned()),
ColumnSchema::new("memory".to_owned(), "Float64".to_owned()),
ColumnSchema::new("ts".to_owned(), "Timestamp".to_owned())
])
);
assert_eq!(
records.rows()[0],
vec![json!("host"), json!(66.6), json!(1024.0), json!(0)]
);
} else {
unreachable!();
}
// select with projections
let res = client
@@ -76,11 +116,27 @@ async fn test_sql_api() {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert_eq!(
body,
r#"{"code":0,"output":{"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}}"#
);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(records) = &output[0] {
assert_eq!(records.num_cols(), 2);
assert_eq!(records.num_rows(), 1);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![
ColumnSchema::new("cpu".to_owned(), "Float64".to_owned()),
ColumnSchema::new("ts".to_owned(), "Timestamp".to_owned())
])
);
assert_eq!(records.rows()[0], vec![json!(66.6), json!(0)]);
} else {
unreachable!()
}
// select with column alias
let res = client
@@ -89,11 +145,27 @@ async fn test_sql_api() {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert_eq!(
body,
r#"{"code":0,"output":{"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}}"#
);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(records) = &output[0] {
assert_eq!(records.num_cols(), 2);
assert_eq!(records.num_rows(), 1);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![
ColumnSchema::new("c".to_owned(), "Float64".to_owned()),
ColumnSchema::new("time".to_owned(), "Timestamp".to_owned())
])
);
assert_eq!(records.rows()[0], vec![json!(66.6), json!(0)]);
} else {
unreachable!()
}
}
#[tokio::test(flavor = "multi_thread")]
@@ -135,18 +207,36 @@ def test(n):
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert_eq!(body, r#"{"code":0}"#,);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json: r#"{"code":0}"#
assert_eq!(body.code(), 0);
assert!(body.output().is_none());
// call script
let res = client.post("/v1/run-script?name=test").send().await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let body = res.text().await;
assert_eq!(
body,
r#"{"code":0,"output":{"records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}}}"#,
);
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}}]}"#
assert_eq!(body.code(), 0);
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(ref records) = output[0] {
assert_eq!(records.num_cols(), 1);
assert_eq!(records.num_rows(), 10);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![ColumnSchema::new(
"n".to_owned(),
"Float64".to_owned()
)])
);
assert_eq!(records.rows()[0][0], json!(1.0));
} else {
unreachable!()
}
}
async fn start_test_app(addr: &str) -> (SocketAddr, TestGuard) {

View File

@@ -24,7 +24,7 @@ use common_telemetry::logging::info;
use datatypes::data_type::DataType;
use futures::FutureExt;
use schemars::JsonSchema;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::ensure;
use snafu::ResultExt;
@@ -51,18 +51,32 @@ pub struct HttpServer {
shutdown_tx: Mutex<Option<Sender<()>>>,
}
#[derive(Debug, Serialize, JsonSchema)]
#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq)]
pub struct ColumnSchema {
name: String,
data_type: String,
}
#[derive(Debug, Serialize, JsonSchema)]
impl ColumnSchema {
pub fn new(name: String, data_type: String) -> ColumnSchema {
ColumnSchema { name, data_type }
}
}
#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq)]
pub struct Schema {
column_schemas: Vec<ColumnSchema>,
}
#[derive(Debug, Serialize, JsonSchema)]
impl Schema {
pub fn new(columns: Vec<ColumnSchema>) -> Schema {
Schema {
column_schemas: columns,
}
}
}
#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq)]
pub struct HttpRecordsOutput {
schema: Option<Schema>,
rows: Vec<Vec<Value>>,
@@ -79,6 +93,14 @@ impl HttpRecordsOutput {
.map(|x| x.column_schemas.len())
.unwrap_or(0)
}
pub fn schema(&self) -> Option<&Schema> {
self.schema.as_ref()
}
pub fn rows(&self) -> &Vec<Vec<Value>> {
&self.rows
}
}
impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
@@ -131,20 +153,22 @@ impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
}
}
#[derive(Serialize, Debug, JsonSchema)]
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum JsonOutput {
AffectedRows(usize),
Records(HttpRecordsOutput),
}
#[derive(Serialize, Debug, JsonSchema)]
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct JsonResponse {
code: u32,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
output: Option<JsonOutput>,
output: Option<Vec<JsonOutput>>,
#[serde(skip_serializing_if = "Option::is_none")]
execution_time_ms: Option<u128>,
}
impl JsonResponse {
@@ -153,33 +177,40 @@ impl JsonResponse {
error: Some(error),
code: error_code as u32,
output: None,
execution_time_ms: None,
}
}
fn with_output(output: Option<JsonOutput>) -> Self {
fn with_output(output: Option<Vec<JsonOutput>>) -> Self {
JsonResponse {
error: None,
code: StatusCode::Success as u32,
output,
execution_time_ms: None,
}
}
fn with_execution_time(mut self, execution_time: u128) -> Self {
self.execution_time_ms = Some(execution_time);
self
}
/// Create a json response from query result
async fn from_output(output: Result<Output>) -> Self {
match output {
Ok(Output::AffectedRows(rows)) => {
Self::with_output(Some(JsonOutput::AffectedRows(rows)))
Self::with_output(Some(vec![JsonOutput::AffectedRows(rows)]))
}
Ok(Output::Stream(stream)) => match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::try_from(rows) {
Ok(rows) => Self::with_output(Some(JsonOutput::Records(rows))),
Ok(rows) => Self::with_output(Some(vec![JsonOutput::Records(rows)])),
Err(err) => Self::with_error(err, StatusCode::Internal),
},
Err(e) => Self::with_error(format!("Recordbatch error: {}", e), e.status_code()),
},
Ok(Output::RecordBatches(recordbatches)) => {
match HttpRecordsOutput::try_from(recordbatches.take()) {
Ok(rows) => Self::with_output(Some(JsonOutput::Records(rows))),
Ok(rows) => Self::with_output(Some(vec![JsonOutput::Records(rows)])),
Err(err) => Self::with_error(err, StatusCode::Internal),
}
}
@@ -189,6 +220,10 @@ impl JsonResponse {
}
}
pub fn code(&self) -> u32 {
self.code
}
pub fn success(&self) -> bool {
self.code == (StatusCode::Success as u32)
}
@@ -197,8 +232,12 @@ impl JsonResponse {
self.error.as_ref()
}
pub fn output(&self) -> Option<&JsonOutput> {
self.output.as_ref()
pub fn output(&self) -> Option<&[JsonOutput]> {
self.output.as_deref()
}
pub fn execution_time_ms(&self) -> Option<u128> {
self.execution_time_ms
}
}
@@ -402,11 +441,11 @@ mod test {
let json_resp = JsonResponse::from_output(Ok(Output::RecordBatches(recordbatches))).await;
let json_output = json_resp.output.unwrap();
let json_output = &json_resp.output.unwrap()[0];
if let JsonOutput::Records(r) = json_output {
assert_eq!(r.num_rows(), 4);
assert_eq!(r.num_cols(), 2);
let schema = r.schema.unwrap();
let schema = r.schema.as_ref().unwrap();
assert_eq!(schema.column_schemas[0].name, "numbers");
assert_eq!(schema.column_schemas[0].data_type, "UInt32");
assert_eq!(r.rows[0][0], serde_json::Value::from(1));

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::time::Instant;
use aide::transform::TransformOperation;
use axum::extract::{Json, Query, RawBody, State};
@@ -23,14 +24,17 @@ pub async fn sql(
State(sql_handler): State<SqlQueryHandlerRef>,
Query(params): Query<SqlQuery>,
) -> Json<JsonResponse> {
if let Some(ref sql) = params.sql {
Json(JsonResponse::from_output(sql_handler.do_query(sql).await).await)
let start = Instant::now();
let resp = if let Some(ref sql) = params.sql {
JsonResponse::from_output(sql_handler.do_query(sql).await).await
} else {
Json(JsonResponse::with_error(
JsonResponse::with_error(
"sql parameter is required.".to_string(),
StatusCode::InvalidArguments,
))
}
)
};
Json(resp.with_execution_time(start.elapsed().as_millis()))
}
pub(crate) fn sql_docs(op: TransformOperation) -> TransformOperation {
@@ -104,6 +108,7 @@ pub async fn run_script(
State(query_handler): State<SqlQueryHandlerRef>,
Query(params): Query<ScriptQuery>,
) -> Json<JsonResponse> {
let start = Instant::now();
let name = params.name.as_ref();
if name.is_none() || name.unwrap().is_empty() {
@@ -111,6 +116,7 @@ pub async fn run_script(
}
let output = query_handler.execute_script(name.unwrap()).await;
let resp = JsonResponse::from_output(output).await;
Json(JsonResponse::from_output(output).await)
Json(resp.with_execution_time(start.elapsed().as_millis()))
}

View File

@@ -36,7 +36,7 @@ async fn test_sql_output_rows() {
let Json(json) = http_handler::sql(State(query_handler), query).await;
assert!(json.success(), "{:?}", json);
assert!(json.error().is_none());
match json.output().expect("assertion failed") {
match &json.output().expect("assertion failed")[0] {
JsonOutput::Records(records) => {
assert_eq!(1, records.num_rows());
}