mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 16:32:54 +00:00
feat: supports null reponse format for http API (#6531)
* feat: supports null reponse format for http API Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * fix: license header and assertion Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * chore: in seconds Signed-off-by: Dennis Zhuang <killme2008@gmail.com> --------- Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
This commit is contained in:
5
.gitignore
vendored
5
.gitignore
vendored
@@ -60,4 +60,7 @@ tests-fuzz/corpus/
|
||||
greptimedb_data
|
||||
|
||||
# github
|
||||
!/.github
|
||||
!/.github
|
||||
|
||||
# Claude code
|
||||
CLAUDE.md
|
||||
|
||||
@@ -69,6 +69,7 @@ use crate::http::result::error_result::ErrorResponse;
|
||||
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
|
||||
use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
|
||||
use crate::http::result::json_result::JsonResponse;
|
||||
use crate::http::result::null_result::NullResponse;
|
||||
use crate::interceptor::LogIngestInterceptorRef;
|
||||
use crate::metrics::http_metrics_layer;
|
||||
use crate::metrics_handler::MetricsHandler;
|
||||
@@ -332,6 +333,7 @@ pub enum ResponseFormat {
|
||||
GreptimedbV1,
|
||||
InfluxdbV1,
|
||||
Json,
|
||||
Null,
|
||||
}
|
||||
|
||||
impl ResponseFormat {
|
||||
@@ -345,6 +347,7 @@ impl ResponseFormat {
|
||||
"greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
|
||||
"influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
|
||||
"json" => Some(ResponseFormat::Json),
|
||||
"null" => Some(ResponseFormat::Null),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -357,6 +360,7 @@ impl ResponseFormat {
|
||||
ResponseFormat::GreptimedbV1 => "greptimedb_v1",
|
||||
ResponseFormat::InfluxdbV1 => "influxdb_v1",
|
||||
ResponseFormat::Json => "json",
|
||||
ResponseFormat::Null => "null",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -414,6 +418,7 @@ pub enum HttpResponse {
|
||||
GreptimedbV1(GreptimedbV1Response),
|
||||
InfluxdbV1(InfluxdbV1Response),
|
||||
Json(JsonResponse),
|
||||
Null(NullResponse),
|
||||
}
|
||||
|
||||
impl HttpResponse {
|
||||
@@ -425,6 +430,7 @@ impl HttpResponse {
|
||||
HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
|
||||
HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
|
||||
HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(),
|
||||
HttpResponse::Null(resp) => resp.with_execution_time(execution_time).into(),
|
||||
HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
|
||||
}
|
||||
}
|
||||
@@ -468,6 +474,7 @@ impl IntoResponse for HttpResponse {
|
||||
HttpResponse::GreptimedbV1(resp) => resp.into_response(),
|
||||
HttpResponse::InfluxdbV1(resp) => resp.into_response(),
|
||||
HttpResponse::Json(resp) => resp.into_response(),
|
||||
HttpResponse::Null(resp) => resp.into_response(),
|
||||
HttpResponse::Error(resp) => resp.into_response(),
|
||||
}
|
||||
}
|
||||
@@ -515,6 +522,12 @@ impl From<JsonResponse> for HttpResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NullResponse> for HttpResponse {
|
||||
fn from(value: NullResponse) -> Self {
|
||||
HttpResponse::Null(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ApiState {
|
||||
pub sql_handler: ServerSqlQueryHandlerRef,
|
||||
@@ -1520,6 +1533,7 @@ mod test {
|
||||
ResponseFormat::Table,
|
||||
ResponseFormat::Arrow,
|
||||
ResponseFormat::Json,
|
||||
ResponseFormat::Null,
|
||||
] {
|
||||
let recordbatches =
|
||||
RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
|
||||
@@ -1533,6 +1547,7 @@ mod test {
|
||||
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
|
||||
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
|
||||
ResponseFormat::Json => JsonResponse::from_output(outputs).await,
|
||||
ResponseFormat::Null => NullResponse::from_output(outputs).await,
|
||||
};
|
||||
|
||||
match json_resp {
|
||||
@@ -1617,6 +1632,10 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
HttpResponse::Null(resp) => {
|
||||
assert_eq!(resp.rows(), 4);
|
||||
}
|
||||
|
||||
HttpResponse::Error(err) => unreachable!("{err:?}"),
|
||||
}
|
||||
}
|
||||
@@ -1648,6 +1667,7 @@ mod test {
|
||||
Some(ResponseFormat::InfluxdbV1)
|
||||
);
|
||||
assert_eq!(ResponseFormat::parse("json"), Some(ResponseFormat::Json));
|
||||
assert_eq!(ResponseFormat::parse("null"), Some(ResponseFormat::Null));
|
||||
|
||||
// invalid formats
|
||||
assert_eq!(ResponseFormat::parse("invalid"), None);
|
||||
@@ -1662,5 +1682,7 @@ mod test {
|
||||
assert_eq!(ResponseFormat::GreptimedbV1.as_str(), "greptimedb_v1");
|
||||
assert_eq!(ResponseFormat::InfluxdbV1.as_str(), "influxdb_v1");
|
||||
assert_eq!(ResponseFormat::Json.as_str(), "json");
|
||||
assert_eq!(ResponseFormat::Null.as_str(), "null");
|
||||
assert_eq!(ResponseFormat::default().as_str(), "greptimedb_v1");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ use crate::http::result::error_result::ErrorResponse;
|
||||
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
|
||||
use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
|
||||
use crate::http::result::json_result::JsonResponse;
|
||||
use crate::http::result::null_result::NullResponse;
|
||||
use crate::http::result::table_result::TableResponse;
|
||||
use crate::http::{
|
||||
ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
|
||||
@@ -145,6 +146,7 @@ pub async fn sql(
|
||||
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
|
||||
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
|
||||
ResponseFormat::Json => JsonResponse::from_output(outputs).await,
|
||||
ResponseFormat::Null => NullResponse::from_output(outputs).await,
|
||||
};
|
||||
|
||||
if let Some(limit) = query_params.limit {
|
||||
@@ -336,6 +338,7 @@ pub async fn promql(
|
||||
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
|
||||
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
|
||||
ResponseFormat::Json => JsonResponse::from_output(outputs).await,
|
||||
ResponseFormat::Null => NullResponse::from_output(outputs).await,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -19,5 +19,6 @@ pub(crate) mod greptime_manage_resp;
|
||||
pub mod greptime_result_v1;
|
||||
pub mod influxdb_result_v1;
|
||||
pub(crate) mod json_result;
|
||||
pub(crate) mod null_result;
|
||||
pub(crate) mod prometheus_resp;
|
||||
pub(crate) mod table_result;
|
||||
|
||||
164
src/servers/src/http/result/null_result.rs
Normal file
164
src/servers/src/http/result/null_result.rs
Normal file
@@ -0,0 +1,164 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::Write;
|
||||
|
||||
use axum::http::{header, HeaderValue};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_query::Output;
|
||||
use mime_guess::mime;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
|
||||
use crate::http::result::error_result::ErrorResponse;
|
||||
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
enum Rows {
|
||||
Affected(usize),
|
||||
Queried(usize),
|
||||
}
|
||||
|
||||
/// The null format is a simple text format that outputs the number of affected rows or queried rows
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct NullResponse {
|
||||
rows: Rows,
|
||||
execution_time_ms: u64,
|
||||
}
|
||||
|
||||
impl NullResponse {
|
||||
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
|
||||
match handler::from_output(outputs).await {
|
||||
Err(err) => HttpResponse::Error(err),
|
||||
Ok((mut output, _)) => {
|
||||
if output.len() > 1 {
|
||||
HttpResponse::Error(ErrorResponse::from_error_message(
|
||||
StatusCode::InvalidArguments,
|
||||
"cannot output multi-statements result in null format".to_string(),
|
||||
))
|
||||
} else {
|
||||
match output.pop() {
|
||||
Some(GreptimeQueryOutput::AffectedRows(rows)) => {
|
||||
HttpResponse::Null(NullResponse {
|
||||
rows: Rows::Affected(rows),
|
||||
execution_time_ms: 0,
|
||||
})
|
||||
}
|
||||
|
||||
Some(GreptimeQueryOutput::Records(records)) => {
|
||||
HttpResponse::Null(NullResponse {
|
||||
rows: Rows::Queried(records.num_rows()),
|
||||
execution_time_ms: 0,
|
||||
})
|
||||
}
|
||||
_ => HttpResponse::Error(ErrorResponse::from_error_message(
|
||||
StatusCode::Unexpected,
|
||||
"unexpected output type".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of rows affected or queried.
|
||||
pub fn rows(&self) -> usize {
|
||||
match &self.rows {
|
||||
Rows::Affected(rows) => *rows,
|
||||
Rows::Queried(rows) => *rows,
|
||||
}
|
||||
}
|
||||
|
||||
/// Consumes `self`, updates the execution time in milliseconds, and returns the updated instance.
|
||||
pub(crate) fn with_execution_time(mut self, execution_time: u64) -> Self {
|
||||
self.execution_time_ms = execution_time;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoResponse for NullResponse {
|
||||
fn into_response(self) -> Response {
|
||||
let mut body = String::new();
|
||||
match self.rows {
|
||||
Rows::Affected(rows) => {
|
||||
let _ = writeln!(body, "{} rows affected.", rows);
|
||||
}
|
||||
Rows::Queried(rows) => {
|
||||
let _ = writeln!(body, "{} rows in set.", rows);
|
||||
}
|
||||
}
|
||||
let elapsed_secs = (self.execution_time_ms as f64) / 1000.0;
|
||||
let _ = writeln!(body, "Elapsed: {:.3} sec.", elapsed_secs);
|
||||
|
||||
let mut resp = (
|
||||
[(
|
||||
header::CONTENT_TYPE,
|
||||
HeaderValue::from_static(mime::TEXT_PLAIN_UTF_8.as_ref()),
|
||||
)],
|
||||
body,
|
||||
)
|
||||
.into_response();
|
||||
resp.headers_mut().insert(
|
||||
&GREPTIME_DB_HEADER_FORMAT,
|
||||
HeaderValue::from_static(ResponseFormat::Null.as_str()),
|
||||
);
|
||||
resp.headers_mut().insert(
|
||||
&GREPTIME_DB_HEADER_EXECUTION_TIME,
|
||||
HeaderValue::from(self.execution_time_ms),
|
||||
);
|
||||
|
||||
resp
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use axum::body::to_bytes;
|
||||
use axum::http;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_into_response_format() {
|
||||
let result = NullResponse {
|
||||
rows: Rows::Queried(42),
|
||||
execution_time_ms: 1234,
|
||||
};
|
||||
let response = result.into_response();
|
||||
|
||||
// Check status code
|
||||
assert_eq!(response.status(), http::StatusCode::OK);
|
||||
|
||||
// Check headers
|
||||
let headers = response.headers();
|
||||
assert_eq!(
|
||||
headers.get(axum::http::header::CONTENT_TYPE).unwrap(),
|
||||
mime::TEXT_PLAIN_UTF_8.as_ref()
|
||||
);
|
||||
assert_eq!(
|
||||
headers.get(&GREPTIME_DB_HEADER_FORMAT).unwrap(),
|
||||
ResponseFormat::Null.as_str()
|
||||
);
|
||||
assert_eq!(
|
||||
headers.get(&GREPTIME_DB_HEADER_EXECUTION_TIME).unwrap(),
|
||||
"1234"
|
||||
);
|
||||
|
||||
// Check body
|
||||
let body_bytes = to_bytes(response.into_body(), 1024).await.unwrap();
|
||||
let body = String::from_utf8(body_bytes.to_vec()).unwrap();
|
||||
assert!(body.contains("42 rows in set."));
|
||||
assert!(body.contains("Elapsed: 1.234 sec."));
|
||||
}
|
||||
}
|
||||
@@ -218,7 +218,7 @@ async fn test_sql_form() {
|
||||
ctx.set_current_user(auth::userinfo_by_name(None));
|
||||
let api_state = ApiState { sql_handler };
|
||||
|
||||
for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] {
|
||||
for format in ["greptimedb_v1", "influxdb_v1", "csv", "table", "null"] {
|
||||
let form = create_form(format);
|
||||
let json = http_handler::sql(
|
||||
State(api_state.clone()),
|
||||
@@ -310,6 +310,23 @@ async fn test_sql_form() {
|
||||
),
|
||||
);
|
||||
}
|
||||
HttpResponse::Null(resp) => {
|
||||
assert_eq!(resp.rows(), 1);
|
||||
let resp = resp.into_response();
|
||||
assert_eq!(
|
||||
resp.headers().get(header::CONTENT_TYPE),
|
||||
Some(HeaderValue::from_static(mime::TEXT_PLAIN_UTF_8.as_ref())).as_ref(),
|
||||
);
|
||||
let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
|
||||
.await
|
||||
.unwrap();
|
||||
let body_str = std::str::from_utf8(&body).unwrap();
|
||||
assert!(
|
||||
body_str.starts_with("1 rows in set.\n"),
|
||||
"Body did not start with expected prefix: {}",
|
||||
body_str
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -480,6 +480,15 @@ pub async fn test_sql_api(store_type: StorageType) {
|
||||
body,
|
||||
"cpu,ts,host\r\nFloat64,TimestampMillisecond,String\r\n66.6,0,\"host, \"\"name\"\r\n"
|
||||
);
|
||||
// test null format
|
||||
let res = client
|
||||
.get("/v1/sql?format=null&sql=select cpu,ts,host from demo limit 1")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let body = &res.text().await;
|
||||
assert!(body.contains("1 rows in set."));
|
||||
assert!(body.ends_with("sec.\n"));
|
||||
|
||||
// test parse method
|
||||
let res = client.get("/v1/sql/parse?sql=desc table t").send().await;
|
||||
|
||||
Reference in New Issue
Block a user