From 411eb768b10b1f0da41c1c145eb20bc21b0a0597 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Wed, 16 Jul 2025 11:46:39 +0800 Subject: [PATCH] feat: supports null reponse format for http API (#6531) * feat: supports null reponse format for http API Signed-off-by: Dennis Zhuang * fix: license header and assertion Signed-off-by: Dennis Zhuang * chore: in seconds Signed-off-by: Dennis Zhuang --------- Signed-off-by: Dennis Zhuang --- .gitignore | 5 +- src/servers/src/http.rs | 22 +++ src/servers/src/http/handler.rs | 3 + src/servers/src/http/result.rs | 1 + src/servers/src/http/result/null_result.rs | 164 ++++++++++++++++++++ src/servers/tests/http/http_handler_test.rs | 19 ++- tests-integration/tests/http.rs | 9 ++ 7 files changed, 221 insertions(+), 2 deletions(-) create mode 100644 src/servers/src/http/result/null_result.rs diff --git a/.gitignore b/.gitignore index dbfab66be5..66f0fd9268 100644 --- a/.gitignore +++ b/.gitignore @@ -60,4 +60,7 @@ tests-fuzz/corpus/ greptimedb_data # github -!/.github \ No newline at end of file +!/.github + +# Claude code +CLAUDE.md diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 2e699180fe..f96618bb9f 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -69,6 +69,7 @@ use crate::http::result::error_result::ErrorResponse; use crate::http::result::greptime_result_v1::GreptimedbV1Response; use crate::http::result::influxdb_result_v1::InfluxdbV1Response; use crate::http::result::json_result::JsonResponse; +use crate::http::result::null_result::NullResponse; use crate::interceptor::LogIngestInterceptorRef; use crate::metrics::http_metrics_layer; use crate::metrics_handler::MetricsHandler; @@ -332,6 +333,7 @@ pub enum ResponseFormat { GreptimedbV1, InfluxdbV1, Json, + Null, } impl ResponseFormat { @@ -345,6 +347,7 @@ impl ResponseFormat { "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1), "influxdb_v1" => Some(ResponseFormat::InfluxdbV1), "json" => Some(ResponseFormat::Json), + "null" => Some(ResponseFormat::Null), _ => None, } } @@ -357,6 +360,7 @@ impl ResponseFormat { ResponseFormat::GreptimedbV1 => "greptimedb_v1", ResponseFormat::InfluxdbV1 => "influxdb_v1", ResponseFormat::Json => "json", + ResponseFormat::Null => "null", } } } @@ -414,6 +418,7 @@ pub enum HttpResponse { GreptimedbV1(GreptimedbV1Response), InfluxdbV1(InfluxdbV1Response), Json(JsonResponse), + Null(NullResponse), } impl HttpResponse { @@ -425,6 +430,7 @@ impl HttpResponse { HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(), + HttpResponse::Null(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(), } } @@ -468,6 +474,7 @@ impl IntoResponse for HttpResponse { HttpResponse::GreptimedbV1(resp) => resp.into_response(), HttpResponse::InfluxdbV1(resp) => resp.into_response(), HttpResponse::Json(resp) => resp.into_response(), + HttpResponse::Null(resp) => resp.into_response(), HttpResponse::Error(resp) => resp.into_response(), } } @@ -515,6 +522,12 @@ impl From for HttpResponse { } } +impl From for HttpResponse { + fn from(value: NullResponse) -> Self { + HttpResponse::Null(value) + } +} + #[derive(Clone)] pub struct ApiState { pub sql_handler: ServerSqlQueryHandlerRef, @@ -1520,6 +1533,7 @@ mod test { ResponseFormat::Table, ResponseFormat::Arrow, ResponseFormat::Json, + ResponseFormat::Null, ] { let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap(); @@ -1533,6 +1547,7 @@ mod test { ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await, ResponseFormat::Json => JsonResponse::from_output(outputs).await, + ResponseFormat::Null => NullResponse::from_output(outputs).await, }; match json_resp { @@ -1617,6 +1632,10 @@ mod test { } } + HttpResponse::Null(resp) => { + assert_eq!(resp.rows(), 4); + } + HttpResponse::Error(err) => unreachable!("{err:?}"), } } @@ -1648,6 +1667,7 @@ mod test { Some(ResponseFormat::InfluxdbV1) ); assert_eq!(ResponseFormat::parse("json"), Some(ResponseFormat::Json)); + assert_eq!(ResponseFormat::parse("null"), Some(ResponseFormat::Null)); // invalid formats assert_eq!(ResponseFormat::parse("invalid"), None); @@ -1662,5 +1682,7 @@ mod test { assert_eq!(ResponseFormat::GreptimedbV1.as_str(), "greptimedb_v1"); assert_eq!(ResponseFormat::InfluxdbV1.as_str(), "influxdb_v1"); assert_eq!(ResponseFormat::Json.as_str(), "json"); + assert_eq!(ResponseFormat::Null.as_str(), "null"); + assert_eq!(ResponseFormat::default().as_str(), "greptimedb_v1"); } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 4c6253c791..6cd977d7fa 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -43,6 +43,7 @@ use crate::http::result::error_result::ErrorResponse; use crate::http::result::greptime_result_v1::GreptimedbV1Response; use crate::http::result::influxdb_result_v1::InfluxdbV1Response; use crate::http::result::json_result::JsonResponse; +use crate::http::result::null_result::NullResponse; use crate::http::result::table_result::TableResponse; use crate::http::{ ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput, @@ -145,6 +146,7 @@ pub async fn sql( ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, ResponseFormat::Json => JsonResponse::from_output(outputs).await, + ResponseFormat::Null => NullResponse::from_output(outputs).await, }; if let Some(limit) = query_params.limit { @@ -336,6 +338,7 @@ pub async fn promql( ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, ResponseFormat::Json => JsonResponse::from_output(outputs).await, + ResponseFormat::Null => NullResponse::from_output(outputs).await, } }; diff --git a/src/servers/src/http/result.rs b/src/servers/src/http/result.rs index 8fd4df8576..dbad6dc3bc 100644 --- a/src/servers/src/http/result.rs +++ b/src/servers/src/http/result.rs @@ -19,5 +19,6 @@ pub(crate) mod greptime_manage_resp; pub mod greptime_result_v1; pub mod influxdb_result_v1; pub(crate) mod json_result; +pub(crate) mod null_result; pub(crate) mod prometheus_resp; pub(crate) mod table_result; diff --git a/src/servers/src/http/result/null_result.rs b/src/servers/src/http/result/null_result.rs new file mode 100644 index 0000000000..82c65a84b3 --- /dev/null +++ b/src/servers/src/http/result/null_result.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Write; + +use axum::http::{header, HeaderValue}; +use axum::response::{IntoResponse, Response}; +use common_error::status_code::StatusCode; +use common_query::Output; +use mime_guess::mime; +use serde::{Deserialize, Serialize}; + +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::result::error_result::ErrorResponse; +use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; + +#[derive(Serialize, Deserialize, Debug)] +enum Rows { + Affected(usize), + Queried(usize), +} + +/// The null format is a simple text format that outputs the number of affected rows or queried rows +#[derive(Serialize, Deserialize, Debug)] +pub struct NullResponse { + rows: Rows, + execution_time_ms: u64, +} + +impl NullResponse { + pub async fn from_output(outputs: Vec>) -> HttpResponse { + match handler::from_output(outputs).await { + Err(err) => HttpResponse::Error(err), + Ok((mut output, _)) => { + if output.len() > 1 { + HttpResponse::Error(ErrorResponse::from_error_message( + StatusCode::InvalidArguments, + "cannot output multi-statements result in null format".to_string(), + )) + } else { + match output.pop() { + Some(GreptimeQueryOutput::AffectedRows(rows)) => { + HttpResponse::Null(NullResponse { + rows: Rows::Affected(rows), + execution_time_ms: 0, + }) + } + + Some(GreptimeQueryOutput::Records(records)) => { + HttpResponse::Null(NullResponse { + rows: Rows::Queried(records.num_rows()), + execution_time_ms: 0, + }) + } + _ => HttpResponse::Error(ErrorResponse::from_error_message( + StatusCode::Unexpected, + "unexpected output type".to_string(), + )), + } + } + } + } + } + + /// Returns the number of rows affected or queried. + pub fn rows(&self) -> usize { + match &self.rows { + Rows::Affected(rows) => *rows, + Rows::Queried(rows) => *rows, + } + } + + /// Consumes `self`, updates the execution time in milliseconds, and returns the updated instance. + pub(crate) fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self + } +} + +impl IntoResponse for NullResponse { + fn into_response(self) -> Response { + let mut body = String::new(); + match self.rows { + Rows::Affected(rows) => { + let _ = writeln!(body, "{} rows affected.", rows); + } + Rows::Queried(rows) => { + let _ = writeln!(body, "{} rows in set.", rows); + } + } + let elapsed_secs = (self.execution_time_ms as f64) / 1000.0; + let _ = writeln!(body, "Elapsed: {:.3} sec.", elapsed_secs); + + let mut resp = ( + [( + header::CONTENT_TYPE, + HeaderValue::from_static(mime::TEXT_PLAIN_UTF_8.as_ref()), + )], + body, + ) + .into_response(); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_FORMAT, + HeaderValue::from_static(ResponseFormat::Null.as_str()), + ); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(self.execution_time_ms), + ); + + resp + } +} +#[cfg(test)] +mod tests { + use axum::body::to_bytes; + use axum::http; + + use super::*; + + #[tokio::test] + async fn test_into_response_format() { + let result = NullResponse { + rows: Rows::Queried(42), + execution_time_ms: 1234, + }; + let response = result.into_response(); + + // Check status code + assert_eq!(response.status(), http::StatusCode::OK); + + // Check headers + let headers = response.headers(); + assert_eq!( + headers.get(axum::http::header::CONTENT_TYPE).unwrap(), + mime::TEXT_PLAIN_UTF_8.as_ref() + ); + assert_eq!( + headers.get(&GREPTIME_DB_HEADER_FORMAT).unwrap(), + ResponseFormat::Null.as_str() + ); + assert_eq!( + headers.get(&GREPTIME_DB_HEADER_EXECUTION_TIME).unwrap(), + "1234" + ); + + // Check body + let body_bytes = to_bytes(response.into_body(), 1024).await.unwrap(); + let body = String::from_utf8(body_bytes.to_vec()).unwrap(); + assert!(body.contains("42 rows in set.")); + assert!(body.contains("Elapsed: 1.234 sec.")); + } +} diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index f76d27fd54..30ca45600c 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -218,7 +218,7 @@ async fn test_sql_form() { ctx.set_current_user(auth::userinfo_by_name(None)); let api_state = ApiState { sql_handler }; - for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] { + for format in ["greptimedb_v1", "influxdb_v1", "csv", "table", "null"] { let form = create_form(format); let json = http_handler::sql( State(api_state.clone()), @@ -310,6 +310,23 @@ async fn test_sql_form() { ), ); } + HttpResponse::Null(resp) => { + assert_eq!(resp.rows(), 1); + let resp = resp.into_response(); + assert_eq!( + resp.headers().get(header::CONTENT_TYPE), + Some(HeaderValue::from_static(mime::TEXT_PLAIN_UTF_8.as_ref())).as_ref(), + ); + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let body_str = std::str::from_utf8(&body).unwrap(); + assert!( + body_str.starts_with("1 rows in set.\n"), + "Body did not start with expected prefix: {}", + body_str + ); + } _ => unreachable!(), } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 4d9cfeac91..1b6eb32f77 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -480,6 +480,15 @@ pub async fn test_sql_api(store_type: StorageType) { body, "cpu,ts,host\r\nFloat64,TimestampMillisecond,String\r\n66.6,0,\"host, \"\"name\"\r\n" ); + // test null format + let res = client + .get("/v1/sql?format=null&sql=select cpu,ts,host from demo limit 1") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = &res.text().await; + assert!(body.contains("1 rows in set.")); + assert!(body.ends_with("sec.\n")); // test parse method let res = client.get("/v1/sql/parse?sql=desc table t").send().await;