mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: add json format output for http interface (#4797)
* feat: json output format for http * feat: add json result test case * fix: typo and refactor a piece of code * fix: cargo check * move affected_rows to top level
This commit is contained in:
@@ -59,6 +59,7 @@ use crate::http::error_result::ErrorResponse;
|
||||
use crate::http::greptime_result_v1::GreptimedbV1Response;
|
||||
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
|
||||
use crate::http::influxdb_result_v1::InfluxdbV1Response;
|
||||
use crate::http::json_result::JsonResponse;
|
||||
use crate::http::prometheus::{
|
||||
build_info_query, format_query, instant_query, label_values_query, labels_query, range_query,
|
||||
series_query,
|
||||
@@ -97,6 +98,7 @@ pub mod error_result;
|
||||
pub mod greptime_manage_resp;
|
||||
pub mod greptime_result_v1;
|
||||
pub mod influxdb_result_v1;
|
||||
pub mod json_result;
|
||||
pub mod table_result;
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
@@ -279,6 +281,7 @@ pub enum ResponseFormat {
|
||||
#[default]
|
||||
GreptimedbV1,
|
||||
InfluxdbV1,
|
||||
Json,
|
||||
}
|
||||
|
||||
impl ResponseFormat {
|
||||
@@ -289,6 +292,7 @@ impl ResponseFormat {
|
||||
"table" => Some(ResponseFormat::Table),
|
||||
"greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
|
||||
"influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
|
||||
"json" => Some(ResponseFormat::Json),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -300,6 +304,7 @@ impl ResponseFormat {
|
||||
ResponseFormat::Table => "table",
|
||||
ResponseFormat::GreptimedbV1 => "greptimedb_v1",
|
||||
ResponseFormat::InfluxdbV1 => "influxdb_v1",
|
||||
ResponseFormat::Json => "json",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -356,6 +361,7 @@ pub enum HttpResponse {
|
||||
Error(ErrorResponse),
|
||||
GreptimedbV1(GreptimedbV1Response),
|
||||
InfluxdbV1(InfluxdbV1Response),
|
||||
Json(JsonResponse),
|
||||
}
|
||||
|
||||
impl HttpResponse {
|
||||
@@ -366,6 +372,7 @@ impl HttpResponse {
|
||||
HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(),
|
||||
HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
|
||||
HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
|
||||
HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(),
|
||||
HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
|
||||
}
|
||||
}
|
||||
@@ -375,6 +382,7 @@ impl HttpResponse {
|
||||
HttpResponse::Csv(resp) => resp.with_limit(limit).into(),
|
||||
HttpResponse::Table(resp) => resp.with_limit(limit).into(),
|
||||
HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(),
|
||||
HttpResponse::Json(resp) => resp.with_limit(limit).into(),
|
||||
_ => self,
|
||||
}
|
||||
}
|
||||
@@ -407,6 +415,7 @@ impl IntoResponse for HttpResponse {
|
||||
HttpResponse::Table(resp) => resp.into_response(),
|
||||
HttpResponse::GreptimedbV1(resp) => resp.into_response(),
|
||||
HttpResponse::InfluxdbV1(resp) => resp.into_response(),
|
||||
HttpResponse::Json(resp) => resp.into_response(),
|
||||
HttpResponse::Error(resp) => resp.into_response(),
|
||||
}
|
||||
}
|
||||
@@ -452,6 +461,12 @@ impl From<InfluxdbV1Response> for HttpResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JsonResponse> for HttpResponse {
|
||||
fn from(value: JsonResponse) -> Self {
|
||||
HttpResponse::Json(value)
|
||||
}
|
||||
}
|
||||
|
||||
async fn serve_api(Extension(api): Extension<OpenApi>) -> impl IntoApiResponse {
|
||||
Json(api)
|
||||
}
|
||||
@@ -1131,6 +1146,7 @@ mod test {
|
||||
ResponseFormat::Csv,
|
||||
ResponseFormat::Table,
|
||||
ResponseFormat::Arrow,
|
||||
ResponseFormat::Json,
|
||||
] {
|
||||
let recordbatches =
|
||||
RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
|
||||
@@ -1141,6 +1157,7 @@ mod test {
|
||||
ResponseFormat::Table => TableResponse::from_output(outputs).await,
|
||||
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
|
||||
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
|
||||
ResponseFormat::Json => JsonResponse::from_output(outputs).await,
|
||||
};
|
||||
|
||||
match json_resp {
|
||||
@@ -1210,6 +1227,21 @@ mod test {
|
||||
assert_eq!(rb.num_columns(), 2);
|
||||
assert_eq!(rb.num_rows(), 4);
|
||||
}
|
||||
|
||||
HttpResponse::Json(resp) => {
|
||||
let output = &resp.output()[0];
|
||||
if let GreptimeQueryOutput::Records(r) = output {
|
||||
assert_eq!(r.num_rows(), 4);
|
||||
assert_eq!(r.num_cols(), 2);
|
||||
assert_eq!(r.schema.column_schemas[0].name, "numbers");
|
||||
assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
|
||||
assert_eq!(r.rows[0][0], serde_json::Value::from(1));
|
||||
assert_eq!(r.rows[0][1], serde_json::Value::Null);
|
||||
} else {
|
||||
panic!("invalid output type");
|
||||
}
|
||||
}
|
||||
|
||||
HttpResponse::Error(err) => unreachable!("{err:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ use crate::http::csv_result::CsvResponse;
|
||||
use crate::http::error_result::ErrorResponse;
|
||||
use crate::http::greptime_result_v1::GreptimedbV1Response;
|
||||
use crate::http::influxdb_result_v1::InfluxdbV1Response;
|
||||
use crate::http::json_result::JsonResponse;
|
||||
use crate::http::table_result::TableResponse;
|
||||
use crate::http::{
|
||||
ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
|
||||
@@ -138,6 +139,7 @@ pub async fn sql(
|
||||
ResponseFormat::Table => TableResponse::from_output(outputs).await,
|
||||
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
|
||||
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
|
||||
ResponseFormat::Json => JsonResponse::from_output(outputs).await,
|
||||
};
|
||||
|
||||
if let Some(limit) = query_params.limit {
|
||||
|
||||
137
src/servers/src/http/json_result.rs
Normal file
137
src/servers/src/http/json_result.rs
Normal file
@@ -0,0 +1,137 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use axum::http::{header, HeaderValue};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_query::Output;
|
||||
use mime_guess::mime;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{json, Map, Value};
|
||||
|
||||
use super::process_with_limit;
|
||||
use crate::http::error_result::ErrorResponse;
|
||||
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
|
||||
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
|
||||
|
||||
/// The json format here is different from the default json output of `GreptimedbV1` result.
|
||||
/// `JsonResponse` is intended to make it easier for user to consume data.
|
||||
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
|
||||
pub struct JsonResponse {
|
||||
output: Vec<GreptimeQueryOutput>,
|
||||
execution_time_ms: u64,
|
||||
}
|
||||
|
||||
impl JsonResponse {
|
||||
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
|
||||
match handler::from_output(outputs).await {
|
||||
Err(err) => HttpResponse::Error(err),
|
||||
Ok((output, _)) => {
|
||||
if output.len() > 1 {
|
||||
HttpResponse::Error(ErrorResponse::from_error_message(
|
||||
StatusCode::InvalidArguments,
|
||||
"cannot output multi-statements result in json format".to_string(),
|
||||
))
|
||||
} else {
|
||||
HttpResponse::Json(JsonResponse {
|
||||
output,
|
||||
execution_time_ms: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn output(&self) -> &[GreptimeQueryOutput] {
|
||||
&self.output
|
||||
}
|
||||
|
||||
pub fn with_execution_time(mut self, execution_time: u64) -> Self {
|
||||
self.execution_time_ms = execution_time;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn execution_time_ms(&self) -> u64 {
|
||||
self.execution_time_ms
|
||||
}
|
||||
|
||||
pub fn with_limit(mut self, limit: usize) -> Self {
|
||||
self.output = process_with_limit(self.output, limit);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoResponse for JsonResponse {
|
||||
fn into_response(mut self) -> Response {
|
||||
debug_assert!(
|
||||
self.output.len() <= 1,
|
||||
"self.output has extra elements: {}",
|
||||
self.output.len()
|
||||
);
|
||||
|
||||
let execution_time = self.execution_time_ms;
|
||||
let payload = match self.output.pop() {
|
||||
None => String::default(),
|
||||
Some(GreptimeQueryOutput::AffectedRows(n)) => json!({
|
||||
"data": [],
|
||||
"affected_rows": n,
|
||||
"execution_time_ms": execution_time,
|
||||
})
|
||||
.to_string(),
|
||||
|
||||
Some(GreptimeQueryOutput::Records(records)) => {
|
||||
let schema = records.schema();
|
||||
|
||||
let data: Vec<Map<String, Value>> = records
|
||||
.rows
|
||||
.iter()
|
||||
.map(|row| {
|
||||
schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, col)| (col.name.clone(), row[i].clone()))
|
||||
.collect::<Map<String, Value>>()
|
||||
})
|
||||
.collect();
|
||||
|
||||
json!({
|
||||
"data": data,
|
||||
"execution_time_ms": execution_time,
|
||||
})
|
||||
.to_string()
|
||||
}
|
||||
};
|
||||
|
||||
(
|
||||
[
|
||||
(
|
||||
header::CONTENT_TYPE,
|
||||
HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()),
|
||||
),
|
||||
(
|
||||
GREPTIME_DB_HEADER_FORMAT.clone(),
|
||||
HeaderValue::from_static(ResponseFormat::Json.as_str()),
|
||||
),
|
||||
(
|
||||
GREPTIME_DB_HEADER_EXECUTION_TIME.clone(),
|
||||
HeaderValue::from(execution_time),
|
||||
),
|
||||
],
|
||||
payload,
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
}
|
||||
@@ -182,6 +182,22 @@ pub async fn test_sql_api(store_type: StorageType) {
|
||||
})).unwrap()
|
||||
);
|
||||
|
||||
// test json result format
|
||||
let res = client
|
||||
.get("/v1/sql?format=json&sql=select * from numbers limit 10")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = res.json::<Value>().await;
|
||||
let data = body.get("data").expect("Missing 'data' field in response");
|
||||
|
||||
let expected = json!([
|
||||
{"number": 0}, {"number": 1}, {"number": 2}, {"number": 3}, {"number": 4},
|
||||
{"number": 5}, {"number": 6}, {"number": 7}, {"number": 8}, {"number": 9}
|
||||
]);
|
||||
assert_eq!(data, &expected);
|
||||
|
||||
// test insert and select
|
||||
let res = client
|
||||
.get("/v1/sql?sql=insert into demo values('host', 66.6, 1024, 0)")
|
||||
@@ -1307,7 +1323,7 @@ transform:
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let body: serde_json::Value = res.json().await;
|
||||
let body: Value = res.json().await;
|
||||
let schema = &body["schema"];
|
||||
let rows = &body["rows"];
|
||||
assert_eq!(
|
||||
|
||||
Reference in New Issue
Block a user