diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index f01daa1c37..ef36dcf501 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -805,7 +805,6 @@ impl Server for HttpServer { async fn handle_error(err: BoxError) -> Json { error!(err; "Unhandled internal error"); Json(HttpResponse::Error(ErrorResponse::from_error_message( - ResponseFormat::GreptimedbV1, StatusCode::Unexpected, format!("Unhandled internal error: {err}"), ))) diff --git a/src/servers/src/http/arrow_result.rs b/src/servers/src/http/arrow_result.rs index 025bd36cd8..e6d2441ee2 100644 --- a/src/servers/src/http/arrow_result.rs +++ b/src/servers/src/http/arrow_result.rs @@ -60,49 +60,46 @@ async fn write_arrow_bytes( } impl ArrowResponse { - pub async fn from_output(mut outputs: Vec>) -> HttpResponse { - if outputs.len() != 1 { + pub async fn from_output(mut outputs: Vec>) -> HttpResponse { + if outputs.len() > 1 { return HttpResponse::Error(ErrorResponse::from_error_message( - ResponseFormat::Arrow, StatusCode::InvalidArguments, - "Multi-statements and empty query are not allowed".to_string(), + "cannot output multi-statements result in arrow format".to_string(), )); } - match outputs.remove(0) { - Ok(output) => match output.data { - OutputData::AffectedRows(_rows) => HttpResponse::Arrow(ArrowResponse { + match outputs.pop() { + None => HttpResponse::Arrow(ArrowResponse { + data: vec![], + execution_time_ms: 0, + }), + Some(Ok(output)) => match output.data { + OutputData::AffectedRows(_) => HttpResponse::Arrow(ArrowResponse { data: vec![], execution_time_ms: 0, }), - OutputData::RecordBatches(recordbatches) => { - let schema = recordbatches.schema(); - match write_arrow_bytes(recordbatches.as_stream(), schema.arrow_schema()).await - { + OutputData::RecordBatches(batches) => { + let schema = batches.schema(); + match write_arrow_bytes(batches.as_stream(), schema.arrow_schema()).await { Ok(payload) => HttpResponse::Arrow(ArrowResponse { data: payload, execution_time_ms: 0, }), - Err(e) => { - HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::Arrow, e)) - } + Err(e) => HttpResponse::Error(ErrorResponse::from_error(e)), } } - - OutputData::Stream(recordbatches) => { - let schema = recordbatches.schema(); - match write_arrow_bytes(recordbatches, schema.arrow_schema()).await { + OutputData::Stream(batches) => { + let schema = batches.schema(); + match write_arrow_bytes(batches, schema.arrow_schema()).await { Ok(payload) => HttpResponse::Arrow(ArrowResponse { data: payload, execution_time_ms: 0, }), - Err(e) => { - HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::Arrow, e)) - } + Err(e) => HttpResponse::Error(ErrorResponse::from_error(e)), } } }, - Err(e) => HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::Arrow, e)), + Some(Err(e)) => HttpResponse::Error(ErrorResponse::from_error(e)), } } @@ -127,7 +124,7 @@ impl IntoResponse for ArrowResponse { ), ( &GREPTIME_DB_HEADER_FORMAT, - HeaderValue::from_static("ARROW"), + HeaderValue::from_static(ResponseFormat::Arrow.as_str()), ), ( &GREPTIME_DB_HEADER_EXECUTION_TIME, diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs index 2cc8901fc7..de99828fb3 100644 --- a/src/servers/src/http/authorize.rs +++ b/src/servers/src/http/authorize.rs @@ -33,7 +33,7 @@ use session::context::QueryContextBuilder; use snafu::{ensure, OptionExt, ResultExt}; use super::header::{GreptimeDbName, GREPTIME_TIMEZONE_HEADER_NAME}; -use super::{ResponseFormat, PUBLIC_APIS}; +use super::PUBLIC_APIS; use crate::error::{ self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu, NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu, @@ -88,7 +88,7 @@ pub async fn inner_auth( crate::metrics::METRIC_AUTH_FAILURE .with_label_values(&[e.status_code().as_ref()]) .inc(); - return Err(err_response(is_influxdb_request(&req), e).into_response()); + return Err(err_response(e)); } }; @@ -112,7 +112,7 @@ pub async fn inner_auth( crate::metrics::METRIC_AUTH_FAILURE .with_label_values(&[e.status_code().as_ref()]) .inc(); - Err(err_response(is_influxdb_request(&req), e).into_response()) + Err(err_response(e)) } } } @@ -128,13 +128,8 @@ pub async fn check_http_auth( } } -fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse { - let ty = if is_influxdb { - ResponseFormat::InfluxdbV1 - } else { - ResponseFormat::GreptimedbV1 - }; - (StatusCode::UNAUTHORIZED, ErrorResponse::from_error(ty, err)) +fn err_response(err: impl ErrorExt) -> Response { + (StatusCode::UNAUTHORIZED, ErrorResponse::from_error(err)).into_response() } pub fn extract_catalog_and_schema(request: &Request) -> (&str, &str) { diff --git a/src/servers/src/http/csv_result.rs b/src/servers/src/http/csv_result.rs index f0d377b01e..ad89ac21b7 100644 --- a/src/servers/src/http/csv_result.rs +++ b/src/servers/src/http/csv_result.rs @@ -35,14 +35,13 @@ pub struct CsvResponse { impl CsvResponse { pub async fn from_output(outputs: Vec>) -> HttpResponse { - match handler::from_output(ResponseFormat::Csv, outputs).await { + match handler::from_output(outputs).await { Err(err) => HttpResponse::Error(err), Ok((output, _)) => { if output.len() > 1 { HttpResponse::Error(ErrorResponse::from_error_message( - ResponseFormat::Csv, StatusCode::InvalidArguments, - "Multi-statements are not allowed".to_string(), + "cannot output multi-statements result in csv format".to_string(), )) } else { HttpResponse::Csv(CsvResponse { @@ -100,8 +99,10 @@ impl IntoResponse for CsvResponse { payload, ) .into_response(); - resp.headers_mut() - .insert(&GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static("CSV")); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_FORMAT, + HeaderValue::from_static(ResponseFormat::Csv.as_str()), + ); resp.headers_mut().insert( &GREPTIME_DB_HEADER_EXECUTION_TIME, HeaderValue::from(execution_time), diff --git a/src/servers/src/http/error_result.rs b/src/servers/src/http/error_result.rs index 5b69ba0a6a..d20de4f460 100644 --- a/src/servers/src/http/error_result.rs +++ b/src/servers/src/http/error_result.rs @@ -22,20 +22,17 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE; -use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; -use crate::http::ResponseFormat; +use crate::http::header::GREPTIME_DB_HEADER_EXECUTION_TIME; #[derive(Serialize, Deserialize, Debug, JsonSchema)] pub struct ErrorResponse { - #[serde(skip)] - ty: ResponseFormat, code: u32, error: String, execution_time_ms: u64, } impl ErrorResponse { - pub fn from_error(ty: ResponseFormat, error: impl ErrorExt) -> Self { + pub fn from_error(error: impl ErrorExt) -> Self { let code = error.status_code(); if code.should_log_error() { @@ -44,12 +41,11 @@ impl ErrorResponse { debug!("Failed to handle HTTP request, err: {:?}", error); } - Self::from_error_message(ty, code, error.output_msg()) + Self::from_error_message(code, error.output_msg()) } - pub fn from_error_message(ty: ResponseFormat, code: StatusCode, msg: String) -> Self { + pub fn from_error_message(code: StatusCode, msg: String) -> Self { ErrorResponse { - ty, code: code as u32, error: msg, execution_time_ms: 0, @@ -76,14 +72,11 @@ impl ErrorResponse { impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { - let ty = self.ty.as_str(); let code = self.code; let execution_time = self.execution_time_ms; let mut resp = Json(self).into_response(); resp.headers_mut() .insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code)); - resp.headers_mut() - .insert(&GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static(ty)); resp.headers_mut().insert( &GREPTIME_DB_HEADER_EXECUTION_TIME, HeaderValue::from(execution_time), diff --git a/src/servers/src/http/greptime_result_v1.rs b/src/servers/src/http/greptime_result_v1.rs index 1efeeccda2..ee87a5dca6 100644 --- a/src/servers/src/http/greptime_result_v1.rs +++ b/src/servers/src/http/greptime_result_v1.rs @@ -40,7 +40,7 @@ pub struct GreptimedbV1Response { impl GreptimedbV1Response { pub async fn from_output(outputs: Vec>) -> HttpResponse { - match handler::from_output(ResponseFormat::GreptimedbV1, outputs).await { + match handler::from_output(outputs).await { Ok((output, resp_metrics)) => HttpResponse::GreptimedbV1(Self { output, execution_time_ms: 0, @@ -77,7 +77,7 @@ impl IntoResponse for GreptimedbV1Response { resp.headers_mut().insert( &GREPTIME_DB_HEADER_FORMAT, - HeaderValue::from_static("greptimedb_v1"), + HeaderValue::from_static(ResponseFormat::GreptimedbV1.as_str()), ); resp.headers_mut().insert( &GREPTIME_DB_HEADER_EXECUTION_TIME, diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 614b69b3d9..002bc2dd28 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -110,7 +110,7 @@ pub async fn sql( let outputs = match result { Err((status, msg)) => { return HttpResponse::Error( - ErrorResponse::from_error_message(format, status, msg) + ErrorResponse::from_error_message(status, msg) .with_execution_time(start.elapsed().as_millis() as u64), ); } @@ -130,7 +130,6 @@ pub async fn sql( /// Create a response from query result pub async fn from_output( - ty: ResponseFormat, outputs: Vec>, ) -> Result<(Vec, HashMap), ErrorResponse> { // TODO(sunng87): this api response structure cannot represent error well. @@ -154,11 +153,11 @@ pub async fn from_output( Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) { Ok(rows) => rows, Err(err) => { - return Err(ErrorResponse::from_error(ty, err)); + return Err(ErrorResponse::from_error(err)); } }, Err(err) => { - return Err(ErrorResponse::from_error(ty, err)); + return Err(ErrorResponse::from_error(err)); } }; if let Some(physical_plan) = o.meta.plan { @@ -180,14 +179,14 @@ pub async fn from_output( results.push(GreptimeQueryOutput::Records(rows)); } Err(err) => { - return Err(ErrorResponse::from_error(ty, err)); + return Err(ErrorResponse::from_error(err)); } } } }, Err(err) => { - return Err(ErrorResponse::from_error(ty, err)); + return Err(ErrorResponse::from_error(err)); } } } @@ -239,7 +238,7 @@ pub async fn promql( let resp = if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { - let resp = ErrorResponse::from_error_message(ResponseFormat::GreptimedbV1, status, msg); + let resp = ErrorResponse::from_error_message(status, msg); HttpResponse::Error(resp) } else { let prom_query = params.into(); diff --git a/src/servers/src/http/influxdb_result_v1.rs b/src/servers/src/http/influxdb_result_v1.rs index 1cc2ade276..3fa4594728 100644 --- a/src/servers/src/http/influxdb_result_v1.rs +++ b/src/servers/src/http/influxdb_result_v1.rs @@ -15,7 +15,6 @@ use axum::http::HeaderValue; use axum::response::{IntoResponse, Response}; use axum::Json; -use common_error::ext::ErrorExt; use common_query::{Output, OutputData}; use common_recordbatch::{util, RecordBatch}; use schemars::JsonSchema; @@ -143,10 +142,6 @@ impl InfluxdbV1Response { outputs: Vec>, epoch: Option, ) -> HttpResponse { - fn make_error_response(error: impl ErrorExt) -> HttpResponse { - HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::InfluxdbV1, error)) - } - // TODO(sunng87): this api response structure cannot represent error well. // It hides successful execution results from error response let mut results = Vec::with_capacity(outputs.len()); @@ -172,11 +167,11 @@ impl InfluxdbV1Response { }); } Err(err) => { - return make_error_response(err); + return HttpResponse::Error(ErrorResponse::from_error(err)); } }, Err(err) => { - return make_error_response(err); + return HttpResponse::Error(ErrorResponse::from_error(err)); } } } @@ -189,14 +184,14 @@ impl InfluxdbV1Response { }); } Err(err) => { - return make_error_response(err); + return HttpResponse::Error(ErrorResponse::from_error(err)); } } } } } Err(err) => { - return make_error_response(err); + return HttpResponse::Error(ErrorResponse::from_error(err)); } } } @@ -222,7 +217,7 @@ impl IntoResponse for InfluxdbV1Response { let mut resp = Json(self).into_response(); resp.headers_mut().insert( &GREPTIME_DB_HEADER_FORMAT, - HeaderValue::from_static("influxdb_v1"), + HeaderValue::from_static(ResponseFormat::InfluxdbV1.as_str()), ); resp.headers_mut().insert( &GREPTIME_DB_HEADER_EXECUTION_TIME, diff --git a/src/servers/src/http/script.rs b/src/servers/src/http/script.rs index 445bc380cf..312d2766a7 100644 --- a/src/servers/src/http/script.rs +++ b/src/servers/src/http/script.rs @@ -26,19 +26,15 @@ use snafu::ResultExt; use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu}; use crate::http::error_result::ErrorResponse; -use crate::http::{ApiState, GreptimedbV1Response, HttpResponse, ResponseFormat}; +use crate::http::{ApiState, GreptimedbV1Response, HttpResponse}; macro_rules! json_err { ($e: expr) => {{ - return HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::GreptimedbV1, $e)); + return HttpResponse::Error(ErrorResponse::from_error($e)); }}; ($msg: expr, $code: expr) => {{ - return HttpResponse::Error(ErrorResponse::from_error_message( - ResponseFormat::GreptimedbV1, - $code, - $msg.to_string(), - )); + return HttpResponse::Error(ErrorResponse::from_error_message($code, $msg.to_string())); }}; } diff --git a/src/servers/src/http/table_result.rs b/src/servers/src/http/table_result.rs index 005eec479a..e601213c08 100644 --- a/src/servers/src/http/table_result.rs +++ b/src/servers/src/http/table_result.rs @@ -36,14 +36,13 @@ pub struct TableResponse { impl TableResponse { pub async fn from_output(outputs: Vec>) -> HttpResponse { - match handler::from_output(ResponseFormat::Csv, outputs).await { + match handler::from_output(outputs).await { Err(err) => HttpResponse::Error(err), Ok((output, _)) => { if output.len() > 1 { HttpResponse::Error(ErrorResponse::from_error_message( - ResponseFormat::Table, StatusCode::InvalidArguments, - "Multi-statements are not allowed".to_string(), + "cannot output multi-statements result in table format".to_string(), )) } else { HttpResponse::Table(TableResponse { @@ -143,7 +142,7 @@ impl IntoResponse for TableResponse { .into_response(); resp.headers_mut().insert( &GREPTIME_DB_HEADER_FORMAT, - HeaderValue::from_static("TABLE"), + HeaderValue::from_static(ResponseFormat::Table.as_str()), ); resp.headers_mut().insert( &GREPTIME_DB_HEADER_EXECUTION_TIME, diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index fb000ec02e..8c0a6031a5 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -27,7 +27,6 @@ use query::plan::LogicalPlan; use query::query_engine::DescribeResult; use servers::error::{Error, Result}; use servers::http::header::constants::GREPTIME_DB_HEADER_NAME; -use servers::http::header::GREPTIME_DB_HEADER_FORMAT; use servers::http::{HttpOptions, HttpServerBuilder}; use servers::influxdb::InfluxdbRequest; use servers::query_handler::grpc::GrpcQueryHandler; @@ -182,10 +181,6 @@ async fn test_influxdb_write() { .send() .await; assert_eq!(result.status(), 401); - assert_eq!( - result.headers().get(&GREPTIME_DB_HEADER_FORMAT).unwrap(), - "influxdb_v1", - ); assert_eq!( "{\"code\":7002,\"error\":\"Username and password does not match, username: greptime\",\"execution_time_ms\":0}", result.text().await @@ -198,10 +193,6 @@ async fn test_influxdb_write() { .send() .await; assert_eq!(result.status(), 401); - assert_eq!( - result.headers().get(&GREPTIME_DB_HEADER_FORMAT).unwrap(), - "influxdb_v1", - ); assert_eq!( "{\"code\":7003,\"error\":\"Not found influx http authorization info\",\"execution_time_ms\":0}", result.text().await