From 316d8434829929b7e47fbe525b8b2c4e38360f3b Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 8 Jan 2024 18:54:27 +0800 Subject: [PATCH] feat: support CSV format in sql HTTP API (#3062) * chore: fix typo Signed-off-by: tison * add csv format Signed-off-by: tison * flatten response Signed-off-by: tison * more flatten response Signed-off-by: tison * add CSV format Signed-off-by: tison * format InfluxdbV1Response Signed-off-by: tison * format ErrorResponse Signed-off-by: tison * propagate ErrorResponse to InfluxdbV1Response Signed-off-by: tison * format GreptimedbV1Response Signed-off-by: tison * format CsvResponse Signed-off-by: tison * impl IntoResponse for QueryResponse Signed-off-by: tison * promql Signed-off-by: tison * sql Signed-off-by: tison * compile Signed-off-by: tison * fixup aide Signed-off-by: tison * clear debt Signed-off-by: tison * fixup UT test_recordbatches_conversion Signed-off-by: tison * fixup IT cases Signed-off-by: tison * fixup more IT cases Signed-off-by: tison * fixup test-integration cases Signed-off-by: tison * update comment Signed-off-by: tison * fixup deserialize and most query < 1ms Signed-off-by: tison * fixup auth tests Signed-off-by: tison * fixup tests Signed-off-by: tison * fixup and align X-GreptimeDB headers Signed-off-by: tison * fixup compile Signed-off-by: tison --------- Signed-off-by: tison --- src/client/src/error.rs | 8 +- src/common/error/src/lib.rs | 4 +- src/meta-client/src/error.rs | 8 +- src/servers/src/error.rs | 9 +- src/servers/src/http.rs | 329 +++++++------------- src/servers/src/http/authorize.rs | 10 +- src/servers/src/http/csv_result.rs | 111 +++++++ src/servers/src/http/error_result.rs | 98 ++++++ src/servers/src/http/greptime_result_v1.rs | 71 +++++ src/servers/src/http/handler.rs | 143 ++++++--- src/servers/src/http/header.rs | 7 +- src/servers/src/http/influxdb_result_v1.rs | 102 +++--- src/servers/src/http/script.rs | 31 +- src/servers/tests/http/authorize.rs | 9 +- src/servers/tests/http/http_handler_test.rs | 160 +++++----- src/servers/tests/http/influxdb_test.rs | 13 +- tests-integration/tests/http.rs | 136 ++------ 17 files changed, 709 insertions(+), 540 deletions(-) create mode 100644 src/servers/src/http/csv_result.rs create mode 100644 src/servers/src/http/error_result.rs create mode 100644 src/servers/src/http/greptime_result_v1.rs diff --git a/src/client/src/error.rs b/src/client/src/error.rs index ae573d037d..cac8ebb0b5 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -16,7 +16,7 @@ use std::any::Any; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; -use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG}; +use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG}; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; use tonic::{Code, Status}; @@ -115,7 +115,7 @@ impl From for Error { .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok()) } - let code = get_metadata_value(&e, GREPTIME_ERROR_CODE) + let code = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_CODE) .and_then(|s| { if let Ok(code) = s.parse::() { StatusCode::from_u32(code) @@ -125,8 +125,8 @@ impl From for Error { }) .unwrap_or(StatusCode::Unknown); - let msg = - get_metadata_value(&e, GREPTIME_ERROR_MSG).unwrap_or_else(|| e.message().to_string()); + let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG) + .unwrap_or_else(|| e.message().to_string()); Self::Server { code, msg } } diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index 9fd659fc65..aa3c915e84 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -19,7 +19,7 @@ pub mod format; pub mod mock; pub mod status_code; -pub const GREPTIME_ERROR_CODE: &str = "x-greptime-err-code"; -pub const GREPTIME_ERROR_MSG: &str = "x-greptime-err-msg"; +pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code"; +pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg"; pub use snafu; diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index a86ec82280..fcb6e82ccf 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -14,7 +14,7 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG}; +use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG}; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; use tonic::Status; @@ -117,7 +117,7 @@ impl From for Error { .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok()) } - let code = get_metadata_value(&e, GREPTIME_ERROR_CODE) + let code = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_CODE) .and_then(|s| { if let Ok(code) = s.parse::() { StatusCode::from_u32(code) @@ -127,8 +127,8 @@ impl From for Error { }) .unwrap_or(StatusCode::Internal); - let msg = - get_metadata_value(&e, GREPTIME_ERROR_MSG).unwrap_or_else(|| e.message().to_string()); + let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG) + .unwrap_or_else(|| e.message().to_string()); Self::MetaServer { code, msg } } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 70b4401c9a..5be38cee15 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -569,7 +569,7 @@ macro_rules! define_into_tonic_status { ($Error: ty) => { impl From<$Error> for tonic::Status { fn from(err: $Error) -> Self { - use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG}; + use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG}; use tonic::codegen::http::{HeaderMap, HeaderValue}; use tonic::metadata::MetadataMap; @@ -578,11 +578,14 @@ macro_rules! define_into_tonic_status { // If either of the status_code or error msg cannot convert to valid HTTP header value // (which is a very rare case), just ignore. Client will use Tonic status code and message. let status_code = err.status_code(); - headers.insert(GREPTIME_ERROR_CODE, HeaderValue::from(status_code as u32)); + headers.insert( + GREPTIME_DB_HEADER_ERROR_CODE, + HeaderValue::from(status_code as u32), + ); let root_error = err.output_msg(); if let Ok(err_msg) = HeaderValue::from_bytes(root_error.as_bytes()) { - let _ = headers.insert(GREPTIME_ERROR_MSG, err_msg); + let _ = headers.insert(GREPTIME_DB_HEADER_ERROR_MSG, err_msg); } let metadata = MetadataMap::from_headers(headers); diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index a390f406d7..1507834d81 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -12,43 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod authorize; -pub mod handler; -pub mod header; -pub mod influxdb; -pub mod mem_prof; -pub mod opentsdb; -pub mod otlp; -pub mod pprof; -pub mod prom_store; -pub mod prometheus; -pub mod script; - -#[cfg(feature = "dashboard")] -mod dashboard; -pub mod influxdb_result_v1; - use std::fmt::Display; use std::net::SocketAddr; use std::time::{Duration, Instant}; use aide::axum::{routing as apirouting, ApiRouter, IntoApiResponse}; use aide::openapi::{Info, OpenApi, Server as OpenAPIServer}; +use aide::OperationOutput; use async_trait::async_trait; use auth::UserProviderRef; use axum::error_handling::HandleErrorLayer; use axum::extract::{DefaultBodyLimit, MatchedPath}; use axum::http::Request; use axum::middleware::{self, Next}; -use axum::response::{Html, IntoResponse, Json}; +use axum::response::{Html, IntoResponse, Json, Response}; use axum::{routing, BoxError, Extension, Router}; use common_base::readable_size::ReadableSize; use common_base::Plugins; -use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_query::Output; -use common_recordbatch::{util, RecordBatch}; -use common_telemetry::logging::{debug, error, info}; +use common_recordbatch::RecordBatch; +use common_telemetry::logging::{error, info}; use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::data_type::DataType; @@ -66,6 +49,9 @@ use tower_http::trace::TraceLayer; use self::authorize::AuthState; use crate::configurator::ConfiguratorRef; use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu}; +use crate::http::csv_result::CsvResponse; +use crate::http::error_result::ErrorResponse; +use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::http::influxdb_result_v1::InfluxdbV1Response; use crate::http::prometheus::{ @@ -84,6 +70,25 @@ use crate::query_handler::{ }; use crate::server::Server; +pub mod authorize; +pub mod handler; +pub mod header; +pub mod influxdb; +pub mod mem_prof; +pub mod opentsdb; +pub mod otlp; +pub mod pprof; +pub mod prom_store; +pub mod prometheus; +pub mod script; + +pub mod csv_result; +#[cfg(feature = "dashboard")] +mod dashboard; +pub mod error_result; +pub mod greptime_result_v1; +pub mod influxdb_result_v1; + pub const HTTP_API_VERSION: &str = "v1"; pub const HTTP_API_PREFIX: &str = "/v1/"; /// Default http body limit (64M). @@ -239,129 +244,16 @@ impl TryFrom> for HttpRecordsOutput { #[derive(Serialize, Deserialize, Debug, JsonSchema, Eq, PartialEq)] #[serde(rename_all = "lowercase")] -pub enum JsonOutput { +pub enum GreptimeQueryOutput { AffectedRows(usize), Records(HttpRecordsOutput), } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] -pub struct GreptimedbV1Response { - code: u32, - #[serde(skip_serializing_if = "Option::is_none")] - error: Option, - #[serde(skip_serializing_if = "Vec::is_empty", default)] - output: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - execution_time_ms: Option, -} - -impl GreptimedbV1Response { - pub fn with_error(error: impl ErrorExt) -> Self { - let code = error.status_code(); - if code.should_log_error() { - error!(error; "Failed to handle HTTP request"); - } else { - debug!("Failed to handle HTTP request, err: {:?}", error); - } - - GreptimedbV1Response { - error: Some(error.output_msg()), - code: code as u32, - output: vec![], - execution_time_ms: None, - } - } - - fn with_error_message(err_msg: String, error_code: StatusCode) -> Self { - GreptimedbV1Response { - error: Some(err_msg), - code: error_code as u32, - output: vec![], - execution_time_ms: None, - } - } - - fn with_output(output: Vec) -> Self { - GreptimedbV1Response { - error: None, - code: StatusCode::Success as u32, - output, - execution_time_ms: None, - } - } - - fn with_execution_time(&mut self, execution_time: u64) { - self.execution_time_ms = Some(execution_time); - } - - /// Create a json response from query result - pub async fn from_output(outputs: Vec>) -> Self { - // TODO(sunng87): this api response structure cannot represent error - // well. It hides successful execution results from error response - let mut results = Vec::with_capacity(outputs.len()); - for out in outputs { - match out { - Ok(Output::AffectedRows(rows)) => { - results.push(JsonOutput::AffectedRows(rows)); - } - Ok(Output::Stream(stream)) => { - // TODO(sunng87): streaming response - match util::collect(stream).await { - Ok(rows) => match HttpRecordsOutput::try_from(rows) { - Ok(rows) => { - results.push(JsonOutput::Records(rows)); - } - Err(err) => { - return Self::with_error(err); - } - }, - - Err(e) => { - return Self::with_error(e); - } - } - } - Ok(Output::RecordBatches(rbs)) => match HttpRecordsOutput::try_from(rbs.take()) { - Ok(rows) => { - results.push(JsonOutput::Records(rows)); - } - Err(err) => { - return Self::with_error(err); - } - }, - Err(e) => { - return Self::with_error(e); - } - } - } - Self::with_output(results) - } - - pub fn code(&self) -> u32 { - self.code - } - - pub fn success(&self) -> bool { - self.code == (StatusCode::Success as u32) - } - - pub fn error(&self) -> Option<&String> { - self.error.as_ref() - } - - pub fn output(&self) -> &[JsonOutput] { - &self.output - } - - pub fn execution_time_ms(&self) -> Option { - self.execution_time_ms - } -} - /// It allows the results of SQL queries to be presented in different formats. -/// Currently, `greptimedb_v1` and `influxdb_v1` are supported. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] pub enum ResponseFormat { + Csv, + #[default] GreptimedbV1, InfluxdbV1, } @@ -369,11 +261,20 @@ pub enum ResponseFormat { impl ResponseFormat { pub fn parse(s: &str) -> Option { match s { + "csv" => Some(ResponseFormat::Csv), "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1), "influxdb_v1" => Some(ResponseFormat::InfluxdbV1), _ => None, } } + + pub fn as_str(&self) -> &'static str { + match self { + ResponseFormat::Csv => "csv", + ResponseFormat::GreptimedbV1 => "greptimedb_v1", + ResponseFormat::InfluxdbV1 => "influxdb_v1", + } + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -421,67 +322,60 @@ impl Display for Epoch { } #[derive(Serialize, Deserialize, Debug, JsonSchema)] -#[serde(tag = "type")] -pub enum JsonResponse { +pub enum HttpResponse { + Csv(CsvResponse), + Error(ErrorResponse), GreptimedbV1(GreptimedbV1Response), InfluxdbV1(InfluxdbV1Response), } -impl From for JsonResponse { +impl HttpResponse { + pub fn with_execution_time(self, execution_time: u64) -> Self { + match self { + HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(), + HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(), + HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(), + HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(), + } + } +} + +impl IntoResponse for HttpResponse { + fn into_response(self) -> Response { + match self { + HttpResponse::Csv(resp) => resp.into_response(), + HttpResponse::GreptimedbV1(resp) => resp.into_response(), + HttpResponse::InfluxdbV1(resp) => resp.into_response(), + HttpResponse::Error(resp) => resp.into_response(), + } + } +} + +impl OperationOutput for HttpResponse { + type Inner = Response; +} + +impl From for HttpResponse { + fn from(value: CsvResponse) -> Self { + HttpResponse::Csv(value) + } +} + +impl From for HttpResponse { + fn from(value: ErrorResponse) -> Self { + HttpResponse::Error(value) + } +} + +impl From for HttpResponse { fn from(value: GreptimedbV1Response) -> Self { - JsonResponse::GreptimedbV1(value) + HttpResponse::GreptimedbV1(value) } } -impl From for JsonResponse { +impl From for HttpResponse { fn from(value: InfluxdbV1Response) -> Self { - JsonResponse::InfluxdbV1(value) - } -} - -impl JsonResponse { - pub fn with_error(error: impl ErrorExt, response_format: ResponseFormat) -> Self { - match response_format { - ResponseFormat::GreptimedbV1 => GreptimedbV1Response::with_error(error).into(), - ResponseFormat::InfluxdbV1 => InfluxdbV1Response::with_error(error).into(), - } - } - - pub fn with_error_message( - err_msg: String, - error_code: StatusCode, - response_format: ResponseFormat, - ) -> Self { - match response_format { - ResponseFormat::GreptimedbV1 => { - GreptimedbV1Response::with_error_message(err_msg, error_code).into() - } - ResponseFormat::InfluxdbV1 => InfluxdbV1Response::with_error_message(err_msg).into(), - } - } - pub async fn from_output( - outputs: Vec>, - response_format: ResponseFormat, - epoch: Option, - ) -> Self { - match response_format { - ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await.into(), - ResponseFormat::InfluxdbV1 => { - InfluxdbV1Response::from_output(outputs, epoch).await.into() - } - } - } - - fn with_execution_time(mut self, execution_time: u128) -> Self { - match &mut self { - JsonResponse::GreptimedbV1(resp) => { - resp.with_execution_time(execution_time as u64); - } - JsonResponse::InfluxdbV1(resp) => { - resp.with_execution_time(execution_time as u64); - } - } - self + HttpResponse::InfluxdbV1(value) } } @@ -900,14 +794,13 @@ impl Server for HttpServer { } /// handle error middleware -async fn handle_error(err: BoxError) -> Json { +async fn handle_error(err: BoxError) -> Json { error!(err; "Unhandled internal error"); - - Json(JsonResponse::with_error_message( - format!("Unhandled internal error: {err}"), - StatusCode::Unexpected, + Json(HttpResponse::Error(ErrorResponse::from_error_message( ResponseFormat::GreptimedbV1, - )) + StatusCode::Unexpected, + format!("Unhandled internal error: {err}"), + ))) } #[cfg(test)] @@ -920,6 +813,7 @@ mod test { use axum::http::StatusCode; use axum::routing::get; use axum_test_helper::TestClient; + use common_query::Output; use common_recordbatch::RecordBatches; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; @@ -1051,20 +945,24 @@ mod test { ]; let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap(); - for format in [ResponseFormat::GreptimedbV1, ResponseFormat::InfluxdbV1] { + for format in [ + ResponseFormat::GreptimedbV1, + ResponseFormat::InfluxdbV1, + ResponseFormat::Csv, + ] { let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap(); - let json_resp = JsonResponse::from_output( - vec![Ok(Output::RecordBatches(recordbatches))], - format, - None, - ) - .await; + let outputs = vec![Ok(Output::RecordBatches(recordbatches))]; + let json_resp = match format { + ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, + ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await, + }; match json_resp { - JsonResponse::GreptimedbV1(json_resp) => { - let json_output = &json_resp.output[0]; - if let JsonOutput::Records(r) = json_output { + HttpResponse::GreptimedbV1(resp) => { + let json_output = &resp.output[0]; + if let GreptimeQueryOutput::Records(r) = json_output { assert_eq!(r.num_rows(), 4); assert_eq!(r.num_cols(), 2); let schema = r.schema.as_ref().unwrap(); @@ -1076,8 +974,8 @@ mod test { panic!("invalid output type"); } } - JsonResponse::InfluxdbV1(json_resp) => { - let json_output = &json_resp.results()[0]; + HttpResponse::InfluxdbV1(resp) => { + let json_output = &resp.results()[0]; assert_eq!(json_output.num_rows(), 4); assert_eq!(json_output.num_cols(), 2); assert_eq!(json_output.series[0].columns.clone()[0], "numbers"); @@ -1087,6 +985,21 @@ mod test { ); assert_eq!(json_output.series[0].values[0][1], serde_json::Value::Null); } + HttpResponse::Csv(resp) => { + let output = &resp.output()[0]; + if let GreptimeQueryOutput::Records(r) = output { + assert_eq!(r.num_rows(), 4); + assert_eq!(r.num_cols(), 2); + let schema = r.schema.as_ref().unwrap(); + assert_eq!(schema.column_schemas[0].name, "numbers"); + assert_eq!(schema.column_schemas[0].data_type, "UInt32"); + assert_eq!(r.rows[0][0], serde_json::Value::from(1)); + assert_eq!(r.rows[0][1], serde_json::Value::Null); + } else { + panic!("invalid output type"); + } + } + HttpResponse::Error(err) => unreachable!("{err:?}"), } } } diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs index 51040ba489..0d5908ea6b 100644 --- a/src/servers/src/http/authorize.rs +++ b/src/servers/src/http/authorize.rs @@ -17,7 +17,6 @@ use axum::extract::State; use axum::http::{self, Request, StatusCode}; use axum::middleware::Next; use axum::response::{IntoResponse, Response}; -use axum::Json; use base64::prelude::BASE64_STANDARD; use base64::Engine; use common_catalog::consts::DEFAULT_SCHEMA_NAME; @@ -30,11 +29,12 @@ use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; use super::header::GreptimeDbName; -use super::{JsonResponse, ResponseFormat, PUBLIC_APIS}; +use super::{ResponseFormat, PUBLIC_APIS}; use crate::error::{ self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu, NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu, }; +use crate::http::error_result::ErrorResponse; use crate::http::HTTP_API_PREFIX; /// AuthState is a holder state for [`UserProviderRef`] @@ -118,14 +118,12 @@ pub async fn check_http_auth( } fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse { - let format = if is_influxdb { + let ty = if is_influxdb { ResponseFormat::InfluxdbV1 } else { ResponseFormat::GreptimedbV1 }; - - let body = JsonResponse::with_error(err, format); - (StatusCode::UNAUTHORIZED, Json(body)) + (StatusCode::UNAUTHORIZED, ErrorResponse::from_error(ty, err)) } fn extract_catalog_and_schema(request: &Request) -> (&str, &str) { diff --git a/src/servers/src/http/csv_result.rs b/src/servers/src/http/csv_result.rs new file mode 100644 index 0000000000..7c26d055da --- /dev/null +++ b/src/servers/src/http/csv_result.rs @@ -0,0 +1,111 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Write; + +use axum::http::{header, HeaderValue}; +use axum::response::{IntoResponse, Response}; +use common_error::status_code::StatusCode; +use common_query::Output; +use itertools::Itertools; +use mime_guess::mime; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::http::error_result::ErrorResponse; +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; + +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct CsvResponse { + output: Vec, + execution_time_ms: u64, +} + +impl CsvResponse { + pub async fn from_output(outputs: Vec>) -> HttpResponse { + match handler::from_output(ResponseFormat::Csv, outputs).await { + Err(err) => HttpResponse::Error(err), + Ok(output) => { + if output.len() > 1 { + HttpResponse::Error(ErrorResponse::from_error_message( + ResponseFormat::Csv, + StatusCode::InvalidArguments, + "Multi-statements are not allowed".to_string(), + )) + } else { + HttpResponse::Csv(CsvResponse { + output, + execution_time_ms: 0, + }) + } + } + } + } + + pub fn output(&self) -> &[GreptimeQueryOutput] { + &self.output + } + + pub fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self + } + + pub fn execution_time_ms(&self) -> u64 { + self.execution_time_ms + } +} + +impl IntoResponse for CsvResponse { + fn into_response(mut self) -> Response { + debug_assert!( + self.output.len() <= 1, + "self.output has extra elements: {}", + self.output.len() + ); + + let execution_time = self.execution_time_ms; + let payload = match self.output.pop() { + None => "".to_string(), + Some(GreptimeQueryOutput::AffectedRows(n)) => { + format!("{n}\n") + } + Some(GreptimeQueryOutput::Records(records)) => { + let mut result = String::new(); + for row in records.rows { + let row = row.iter().map(|v| v.to_string()).join(","); + writeln!(result, "{row}").unwrap(); + } + result + } + }; + + let mut resp = ( + [( + header::CONTENT_TYPE, + HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()), + )], + payload, + ) + .into_response(); + resp.headers_mut() + .insert(GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static("CSV")); + resp.headers_mut().insert( + GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(execution_time), + ); + resp + } +} diff --git a/src/servers/src/http/error_result.rs b/src/servers/src/http/error_result.rs new file mode 100644 index 0000000000..629594e664 --- /dev/null +++ b/src/servers/src/http/error_result.rs @@ -0,0 +1,98 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use axum::http::HeaderValue; +use axum::response::{IntoResponse, Response}; +use axum::Json; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG}; +use common_telemetry::logging::{debug, error}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::ResponseFormat; + +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct ErrorResponse { + #[serde(skip)] + ty: ResponseFormat, + code: u32, + error: String, + execution_time_ms: u64, +} + +impl ErrorResponse { + pub fn from_error(ty: ResponseFormat, error: impl ErrorExt) -> Self { + let code = error.status_code(); + + if code.should_log_error() { + error!(error; "Failed to handle HTTP request"); + } else { + debug!("Failed to handle HTTP request, err: {:?}", error); + } + + Self::from_error_message(ty, code, error.output_msg()) + } + + pub fn from_error_message(ty: ResponseFormat, code: StatusCode, msg: String) -> Self { + ErrorResponse { + ty, + code: code as u32, + error: msg, + execution_time_ms: 0, + } + } + + pub fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self + } + + pub fn execution_time_ms(&self) -> u64 { + self.execution_time_ms + } + + pub fn code(&self) -> u32 { + self.code + } + + pub fn error(&self) -> &str { + &self.error + } +} + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + let ty = self.ty.as_str(); + let code = self.code; + let msg = self.error.clone(); + let execution_time = self.execution_time_ms; + let mut resp = Json(self).into_response(); + resp.headers_mut() + .insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code)); + resp.headers_mut().insert( + GREPTIME_DB_HEADER_ERROR_MSG, + HeaderValue::from_str(&msg).expect("malformed error msg"), + ); + resp.headers_mut() + .insert(GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static(ty)); + resp.headers_mut().insert( + GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(execution_time), + ); + resp + } +} diff --git a/src/servers/src/http/greptime_result_v1.rs b/src/servers/src/http/greptime_result_v1.rs new file mode 100644 index 0000000000..53e16948b7 --- /dev/null +++ b/src/servers/src/http/greptime_result_v1.rs @@ -0,0 +1,71 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use axum::response::{IntoResponse, Response}; +use axum::Json; +use common_query::Output; +use reqwest::header::HeaderValue; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; + +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct GreptimedbV1Response { + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) output: Vec, + pub(crate) execution_time_ms: u64, +} + +impl GreptimedbV1Response { + pub async fn from_output(outputs: Vec>) -> HttpResponse { + match handler::from_output(ResponseFormat::GreptimedbV1, outputs).await { + Ok(output) => HttpResponse::GreptimedbV1(Self { + output, + execution_time_ms: 0, + }), + Err(err) => HttpResponse::Error(err), + } + } + + pub fn output(&self) -> &[GreptimeQueryOutput] { + &self.output + } + + pub fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self + } + + pub fn execution_time_ms(&self) -> u64 { + self.execution_time_ms + } +} + +impl IntoResponse for GreptimedbV1Response { + fn into_response(self) -> Response { + let execution_time = self.execution_time_ms; + let mut resp = Json(self).into_response(); + resp.headers_mut().insert( + GREPTIME_DB_HEADER_FORMAT, + HeaderValue::from_static("greptimedb_v1"), + ); + resp.headers_mut().insert( + GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(execution_time), + ); + resp + } +} diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index f615e520cb..3bcea4595d 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -22,12 +22,21 @@ use axum::response::{IntoResponse, Response}; use axum::{Extension, Form}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; +use common_query::Output; +use common_recordbatch::util; use query::parser::PromQuery; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use session::context::QueryContextRef; -use crate::http::{ApiState, Epoch, GreptimeOptionsConfigState, JsonResponse, ResponseFormat}; +use crate::http::csv_result::CsvResponse; +use crate::http::error_result::ErrorResponse; +use crate::http::greptime_result_v1::GreptimedbV1Response; +use crate::http::influxdb_result_v1::InfluxdbV1Response; +use crate::http::{ + ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput, + HttpResponse, ResponseFormat, +}; use crate::metrics_handler::MetricsHandler; use crate::query_handler::sql::ServerSqlQueryHandlerRef; @@ -35,7 +44,7 @@ use crate::query_handler::sql::ServerSqlQueryHandlerRef; pub struct SqlQuery { pub db: Option, pub sql: Option, - // (Optional) result format: [`gerptimedb_v1`, `influxdb_v1`], + // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`], // the default value is `greptimedb_v1` pub format: Option, // Returns epoch timestamps with the specified precision. @@ -56,7 +65,7 @@ pub async fn sql( Query(query_params): Query, Extension(query_ctx): Extension, Form(form_params): Form, -) -> Json { +) -> HttpResponse { let sql_handler = &state.sql_handler; let start = Instant::now(); @@ -78,21 +87,82 @@ pub async fn sql( .with_label_values(&[db.as_str()]) .start_timer(); - let resp = if let Some(sql) = &sql { - if let Some(resp) = validate_schema(sql_handler.clone(), query_ctx.clone(), format).await { - return Json(resp); + let result = if let Some(sql) = &sql { + if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { + Err((status, msg)) + } else { + Ok(sql_handler.do_query(sql, query_ctx).await) } - - JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await, format, epoch).await } else { - JsonResponse::with_error_message( - "sql parameter is required.".to_string(), + Err(( StatusCode::InvalidArguments, - format, - ) + "sql parameter is required.".to_string(), + )) }; - Json(resp.with_execution_time(start.elapsed().as_millis())) + let outputs = match result { + Err((status, msg)) => { + return HttpResponse::Error( + ErrorResponse::from_error_message(format, status, msg) + .with_execution_time(start.elapsed().as_millis() as u64), + ); + } + Ok(outputs) => outputs, + }; + + let resp = match format { + ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, + ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, + }; + + resp.with_execution_time(start.elapsed().as_millis() as u64) +} + +/// Create a response from query result +pub async fn from_output( + ty: ResponseFormat, + outputs: Vec>, +) -> Result, ErrorResponse> { + // TODO(sunng87): this api response structure cannot represent error well. + // It hides successful execution results from error response + let mut results = Vec::with_capacity(outputs.len()); + for out in outputs { + match out { + Ok(Output::AffectedRows(rows)) => { + results.push(GreptimeQueryOutput::AffectedRows(rows)); + } + Ok(Output::Stream(stream)) => { + // TODO(sunng87): streaming response + match util::collect(stream).await { + Ok(rows) => match HttpRecordsOutput::try_from(rows) { + Ok(rows) => { + results.push(GreptimeQueryOutput::Records(rows)); + } + Err(err) => { + return Err(ErrorResponse::from_error(ty, err)); + } + }, + Err(err) => { + return Err(ErrorResponse::from_error(ty, err)); + } + } + } + Ok(Output::RecordBatches(rbs)) => match HttpRecordsOutput::try_from(rbs.take()) { + Ok(rows) => { + results.push(GreptimeQueryOutput::Records(rows)); + } + Err(err) => { + return Err(ErrorResponse::from_error(ty, err)); + } + }, + Err(err) => { + return Err(ErrorResponse::from_error(ty, err)); + } + } + } + + Ok(results) } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] @@ -121,7 +191,7 @@ pub async fn promql( State(state): State, Query(params): Query, Extension(query_ctx): Extension, -) -> Json { +) -> Response { let sql_handler = &state.sql_handler; let exec_start = Instant::now(); let db = query_ctx.get_db_string(); @@ -129,29 +199,23 @@ pub async fn promql( .with_label_values(&[db.as_str()]) .start_timer(); - if let Some(resp) = validate_schema( - sql_handler.clone(), - query_ctx.clone(), - ResponseFormat::GreptimedbV1, - ) - .await + let resp = if let Some((status, msg)) = + validate_schema(sql_handler.clone(), query_ctx.clone()).await { - return Json(resp); - } + let resp = ErrorResponse::from_error_message(ResponseFormat::GreptimedbV1, status, msg); + HttpResponse::Error(resp) + } else { + let prom_query = params.into(); + let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await; + GreptimedbV1Response::from_output(outputs).await + }; - let prom_query = params.into(); - let resp = JsonResponse::from_output( - sql_handler.do_promql_query(&prom_query, query_ctx).await, - ResponseFormat::GreptimedbV1, - None, - ) - .await; - - Json(resp.with_execution_time(exec_start.elapsed().as_millis())) + resp.with_execution_time(exec_start.elapsed().as_millis() as u64) + .into_response() } pub(crate) fn sql_docs(op: TransformOperation) -> TransformOperation { - op.response::<200, Json>() + op.response::<200, Json>() } /// Handler to export metrics @@ -222,26 +286,23 @@ pub async fn config(State(state): State) -> Response async fn validate_schema( sql_handler: ServerSqlQueryHandlerRef, query_ctx: QueryContextRef, - format: ResponseFormat, -) -> Option { +) -> Option<(StatusCode, String)> { match sql_handler .is_valid_schema(query_ctx.current_catalog(), query_ctx.current_schema()) .await { - Ok(false) => Some(JsonResponse::with_error_message( - format!("Database not found: {}", query_ctx.get_db_string()), + Ok(true) => None, + Ok(false) => Some(( StatusCode::DatabaseNotFound, - format, + format!("Database not found: {}", query_ctx.get_db_string()), )), - Err(e) => Some(JsonResponse::with_error_message( + Err(e) => Some(( + StatusCode::Internal, format!( "Error checking database: {}, {}", query_ctx.get_db_string(), e.output_msg(), ), - StatusCode::Internal, - format, )), - _ => None, } } diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index 04db669550..40bcfabf4b 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -14,13 +14,16 @@ use headers::{Header, HeaderName, HeaderValue}; -pub static GREPTIME_DB_NAME_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-db-name"); +pub const GREPTIME_DB_HEADER_FORMAT: &str = "x-greptime-format"; +pub const GREPTIME_DB_HEADER_EXECUTION_TIME: &str = "x-greptime-execution-time"; + +pub static GREPTIME_DB_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-name"); pub struct GreptimeDbName(Option); impl Header for GreptimeDbName { fn name() -> &'static HeaderName { - &GREPTIME_DB_NAME_HEADER_NAME + &GREPTIME_DB_HEADER_NAME } fn decode<'i, I>(values: &mut I) -> Result diff --git a/src/servers/src/http/influxdb_result_v1.rs b/src/servers/src/http/influxdb_result_v1.rs index 0b8c184ce7..cfcc8f1f94 100644 --- a/src/servers/src/http/influxdb_result_v1.rs +++ b/src/servers/src/http/influxdb_result_v1.rs @@ -12,17 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +use axum::http::HeaderValue; +use axum::response::{IntoResponse, Response}; +use axum::Json; use common_error::ext::ErrorExt; use common_query::Output; use common_recordbatch::{util, RecordBatch}; -use common_telemetry::{debug, error}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; use snafu::ResultExt; use crate::error::{Error, ToJsonSnafu}; -use crate::http::Epoch; +use crate::http::error_result::ErrorResponse; +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::{Epoch, HttpResponse, ResponseFormat}; #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct SqlQuery { @@ -125,55 +129,26 @@ impl InfluxdbOutput { #[derive(Serialize, Deserialize, Debug, JsonSchema)] pub struct InfluxdbV1Response { results: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - error: Option, - #[serde(skip_serializing_if = "Option::is_none")] - execution_time_ms: Option, + execution_time_ms: u64, } impl InfluxdbV1Response { - pub fn with_error(error: impl ErrorExt) -> Self { - let code = error.status_code(); - if code.should_log_error() { - error!(error; "Failed to handle HTTP request"); - } else { - debug!("Failed to handle HTTP request, err: {:?}", error); - } - - InfluxdbV1Response { - results: vec![], - error: Some(error.output_msg()), - execution_time_ms: None, - } - } - - pub fn with_error_message(err_msg: String) -> Self { - InfluxdbV1Response { - results: vec![], - error: Some(err_msg), - execution_time_ms: None, - } - } - - fn with_output(results: Vec) -> Self { - InfluxdbV1Response { - results, - error: None, - execution_time_ms: None, - } - } - - pub fn with_execution_time(&mut self, execution_time: u64) { - self.execution_time_ms = Some(execution_time); + pub fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self } /// Create a influxdb v1 response from query result pub async fn from_output( outputs: Vec>, epoch: Option, - ) -> Self { - // TODO(sunng87): this api response structure cannot represent error - // well. It hides successful execution results from error response + ) -> HttpResponse { + fn make_error_response(error: impl ErrorExt) -> HttpResponse { + HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::InfluxdbV1, error)) + } + + // TODO(sunng87): this api response structure cannot represent error well. + // It hides successful execution results from error response let mut results = Vec::with_capacity(outputs.len()); for (statement_id, out) in outputs.into_iter().enumerate() { let statement_id = statement_id as u32; @@ -195,12 +170,11 @@ impl InfluxdbV1Response { }); } Err(err) => { - return Self::with_error(err); + return make_error_response(err); } }, - - Err(e) => { - return Self::with_error(e); + Err(err) => { + return make_error_response(err); } } } @@ -213,31 +187,43 @@ impl InfluxdbV1Response { }); } Err(err) => { - return Self::with_error(err); + return make_error_response(err); } } } - Err(e) => { - return Self::with_error(e); + Err(err) => { + return make_error_response(err); } } } - Self::with_output(results) - } - pub fn success(&self) -> bool { - self.error.is_none() - } - - pub fn error(&self) -> Option<&String> { - self.error.as_ref() + HttpResponse::InfluxdbV1(InfluxdbV1Response { + results, + execution_time_ms: 0, + }) } pub fn results(&self) -> &[InfluxdbOutput] { &self.results } - pub fn execution_time_ms(&self) -> Option { + pub fn execution_time_ms(&self) -> u64 { self.execution_time_ms } } + +impl IntoResponse for InfluxdbV1Response { + fn into_response(self) -> Response { + let execution_time = self.execution_time_ms; + let mut resp = Json(self).into_response(); + resp.headers_mut().insert( + GREPTIME_DB_HEADER_FORMAT, + HeaderValue::from_static("influxdb_v1"), + ); + resp.headers_mut().insert( + GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(execution_time), + ); + resp + } +} diff --git a/src/servers/src/http/script.rs b/src/servers/src/http/script.rs index 741cc13171..445bc380cf 100644 --- a/src/servers/src/http/script.rs +++ b/src/servers/src/http/script.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::time::Instant; -use axum::extract::{Json, Query, RawBody, State}; +use axum::extract::{Query, RawBody, State}; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -25,18 +25,19 @@ use session::context::QueryContext; use snafu::ResultExt; use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu}; -use crate::http::{ApiState, GreptimedbV1Response, JsonResponse, ResponseFormat}; +use crate::http::error_result::ErrorResponse; +use crate::http::{ApiState, GreptimedbV1Response, HttpResponse, ResponseFormat}; macro_rules! json_err { ($e: expr) => {{ - return Json(JsonResponse::with_error($e, ResponseFormat::GreptimedbV1)); + return HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::GreptimedbV1, $e)); }}; ($msg: expr, $code: expr) => {{ - return Json(JsonResponse::with_error_message( - $msg.to_string(), - $code, + return HttpResponse::Error(ErrorResponse::from_error_message( ResponseFormat::GreptimedbV1, + $code, + $msg.to_string(), )); }}; } @@ -56,7 +57,7 @@ pub async fn scripts( State(state): State, Query(params): Query, RawBody(body): RawBody, -) -> Json { +) -> HttpResponse { if let Some(script_handler) = &state.script_handler { let catalog = params .catalog @@ -80,18 +81,16 @@ pub async fn scripts( // Safety: schema and name are already checked above. let query_ctx = QueryContext::with(&catalog, schema.unwrap()); - let body = match script_handler + match script_handler .insert_script(query_ctx, name.unwrap(), &script) .await { - Ok(()) => GreptimedbV1Response::with_output(vec![]).into(), + Ok(()) => GreptimedbV1Response::from_output(vec![]).await, Err(e) => json_err!( format!("Insert script error: {}", e.output_msg()), e.status_code() ), - }; - - Json(body) + } } else { json_err!( "Script execution not supported, missing script handler", @@ -114,7 +113,7 @@ pub struct ScriptQuery { pub async fn run_script( State(state): State, Query(params): Query, -) -> Json { +) -> HttpResponse { if let Some(script_handler) = &state.script_handler { let catalog = params .catalog @@ -137,10 +136,8 @@ pub async fn run_script( let output = script_handler .execute_script(query_ctx, name.unwrap(), params.params) .await; - let resp = - JsonResponse::from_output(vec![output], ResponseFormat::GreptimedbV1, None).await; - - Json(resp.with_execution_time(start.elapsed().as_millis())) + let resp = GreptimedbV1Response::from_output(vec![output]).await; + resp.with_execution_time(start.elapsed().as_millis() as u64) } else { json_err!( "Script execution not supported, missing script handler", diff --git a/src/servers/tests/http/authorize.rs b/src/servers/tests/http/authorize.rs index 97f1c9e2e8..675c291de3 100644 --- a/src/servers/tests/http/authorize.rs +++ b/src/servers/tests/http/authorize.rs @@ -49,7 +49,7 @@ async fn test_http_auth() { let mut resp = auth_res.unwrap_err(); assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); assert_eq!( - b"{\"type\":\"GreptimedbV1\",\"code\":7003,\"error\":\"Not found http or grpc authorization header\"}", + b"{\"code\":7003,\"error\":\"Not found http or grpc authorization header\",\"execution_time_ms\":0}", resp.data().await.unwrap().unwrap().as_ref() ); @@ -60,7 +60,7 @@ async fn test_http_auth() { let mut resp = auth_res.unwrap_err(); assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); assert_eq!( - b"{\"type\":\"GreptimedbV1\",\"code\":7000,\"error\":\"User not found, username: username\"}", + b"{\"code\":7000,\"error\":\"User not found, username: username\",\"execution_time_ms\":0}", resp.data().await.unwrap().unwrap().as_ref(), ); } @@ -95,7 +95,7 @@ async fn test_schema_validating() { let mut resp = result.unwrap_err(); assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); assert_eq!( - b"{\"type\":\"GreptimedbV1\",\"code\":7005,\"error\":\"Access denied for user 'greptime' to database 'greptime-wrong'\"}", + b"{\"code\":7005,\"error\":\"Access denied for user 'greptime' to database 'greptime-wrong'\",\"execution_time_ms\":0}", resp.data().await.unwrap().unwrap().as_ref() ); } @@ -113,7 +113,7 @@ async fn test_whitelist_no_auth() { let mut resp = auth_res.unwrap_err(); assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); assert_eq!( - b"{\"type\":\"GreptimedbV1\",\"code\":7003,\"error\":\"Not found http or grpc authorization header\"}", + b"{\"code\":7003,\"error\":\"Not found http or grpc authorization header\",\"execution_time_ms\":0}", resp.data().await.unwrap().unwrap().as_ref() ); @@ -134,6 +134,5 @@ fn mock_http_request( if let Some(auth_header) = auth_header { req = req.header(http::header::AUTHORIZATION, auth_header); } - Ok(req.body(()).unwrap()) } diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index e29953ed75..afcd44c262 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -16,12 +16,16 @@ use std::collections::HashMap; use axum::body::{Body, Bytes}; use axum::extract::{Json, Query, RawBody, State}; +use axum::http::header; +use axum::response::IntoResponse; use axum::Form; +use headers::HeaderValue; use http_body::combinators::UnsyncBoxBody; use hyper::Response; +use mime_guess::mime; use servers::http::{ handler as http_handler, script as script_handler, ApiState, GreptimeOptionsConfigState, - JsonOutput, JsonResponse, + GreptimeQueryOutput, HttpResponse, }; use servers::metrics_handler::MetricsHandler; use session::context::QueryContext; @@ -42,39 +46,26 @@ async fn test_sql_not_provided() { script_handler: None, }; - for format in ["greptimedb_v1", "influxdb_v1"] { + for format in ["greptimedb_v1", "influxdb_v1", "csv"] { let query = http_handler::SqlQuery { db: None, sql: None, format: Some(format.to_string()), epoch: None, }; - let Json(json) = http_handler::sql( + + let HttpResponse::Error(resp) = http_handler::sql( State(api_state.clone()), Query(query), axum::Extension(ctx.clone()), Form(http_handler::SqlQuery::default()), ) - .await; + .await + else { + unreachable!("must be error response") + }; - match json { - JsonResponse::GreptimedbV1(resp) => { - assert!(!resp.success()); - assert_eq!( - Some(&"sql parameter is required.".to_string()), - resp.error() - ); - assert!(resp.output().is_empty()); - } - JsonResponse::InfluxdbV1(resp) => { - assert!(!resp.success()); - assert_eq!( - Some(&"sql parameter is required.".to_string()), - resp.error() - ); - assert!(resp.results().is_empty()); - } - } + assert_eq!("sql parameter is required.", resp.error()); } } @@ -91,9 +82,9 @@ async fn test_sql_output_rows() { script_handler: None, }; - for format in ["greptimedb_v1", "influxdb_v1"] { + for format in ["greptimedb_v1", "influxdb_v1", "csv"] { let query = create_query(format); - let Json(json) = http_handler::sql( + let json = http_handler::sql( State(api_state.clone()), query, axum::Extension(ctx.clone()), @@ -102,16 +93,13 @@ async fn test_sql_output_rows() { .await; match json { - JsonResponse::GreptimedbV1(resp) => { - assert!(resp.success(), "{resp:?}"); - assert!(resp.error().is_none()); - match &resp.output()[0] { - JsonOutput::Records(records) => { - assert_eq!(1, records.num_rows()); - let json = serde_json::to_string_pretty(&records).unwrap(); - assert_eq!( - json, - r#"{ + HttpResponse::GreptimedbV1(resp) => match &resp.output()[0] { + GreptimeQueryOutput::Records(records) => { + assert_eq!(1, records.num_rows()); + let json = serde_json::to_string_pretty(&records).unwrap(); + assert_eq!( + json, + r#"{ "schema": { "column_schemas": [ { @@ -126,15 +114,11 @@ async fn test_sql_output_rows() { ] ] }"# - ); - } - _ => unreachable!(), + ); } - } - JsonResponse::InfluxdbV1(resp) => { - assert!(resp.success(), "{resp:?}"); - assert!(resp.error().is_none()); - + _ => unreachable!(), + }, + HttpResponse::InfluxdbV1(resp) => { let json = serde_json::to_string_pretty(&resp.results()).unwrap(); assert_eq!( json, @@ -158,6 +142,19 @@ async fn test_sql_output_rows() { ]"# ); } + HttpResponse::Csv(resp) => { + use http_body::Body as HttpBody; + let mut resp = resp.into_response(); + assert_eq!( + resp.headers().get(header::CONTENT_TYPE), + Some(HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref())).as_ref(), + ); + assert_eq!( + resp.body_mut().data().await.unwrap().unwrap(), + hyper::body::Bytes::from_static(b"4950\n"), + ); + } + _ => unreachable!(), } } } @@ -175,9 +172,9 @@ async fn test_sql_form() { script_handler: None, }; - for format in ["greptimedb_v1", "influxdb_v1"] { + for format in ["greptimedb_v1", "influxdb_v1", "csv"] { let form = create_form(format); - let Json(json) = http_handler::sql( + let json = http_handler::sql( State(api_state.clone()), Query(http_handler::SqlQuery::default()), axum::Extension(ctx.clone()), @@ -186,16 +183,13 @@ async fn test_sql_form() { .await; match json { - JsonResponse::GreptimedbV1(resp) => { - assert!(resp.success(), "{resp:?}"); - assert!(resp.error().is_none()); - match &resp.output()[0] { - JsonOutput::Records(records) => { - assert_eq!(1, records.num_rows()); - let json = serde_json::to_string_pretty(&records).unwrap(); - assert_eq!( - json, - r#"{ + HttpResponse::GreptimedbV1(resp) => match &resp.output()[0] { + GreptimeQueryOutput::Records(records) => { + assert_eq!(1, records.num_rows()); + let json = serde_json::to_string_pretty(&records).unwrap(); + assert_eq!( + json, + r#"{ "schema": { "column_schemas": [ { @@ -210,15 +204,11 @@ async fn test_sql_form() { ] ] }"# - ); - } - _ => unreachable!(), + ); } - } - JsonResponse::InfluxdbV1(resp) => { - assert!(resp.success(), "{resp:?}"); - assert!(resp.error().is_none()); - + _ => unreachable!(), + }, + HttpResponse::InfluxdbV1(resp) => { let json = serde_json::to_string_pretty(&resp.results()).unwrap(); assert_eq!( json, @@ -242,6 +232,19 @@ async fn test_sql_form() { ]"# ); } + HttpResponse::Csv(resp) => { + use http_body::Body as HttpBody; + let mut resp = resp.into_response(); + assert_eq!( + resp.headers().get(header::CONTENT_TYPE), + Some(HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref())).as_ref(), + ); + assert_eq!( + resp.body_mut().data().await.unwrap().unwrap(), + hyper::body::Bytes::from_static(b"4950\n"), + ); + } + _ => unreachable!(), } } } @@ -255,7 +258,7 @@ lazy_static::lazy_static! { async fn test_metrics() { TEST_METRIC.inc(); let stats = MetricsHandler; - let text = http_handler::metrics(axum::extract::State(stats), Query(HashMap::default())).await; + let text = http_handler::metrics(State(stats), Query(HashMap::default())).await; assert!(text.contains("test_metrics counter")); } @@ -266,7 +269,7 @@ async fn insert_script( ) { let body = RawBody(Body::from(script.clone())); let invalid_query = create_invalid_script_query(); - let Json(json) = script_handler::scripts( + let json = script_handler::scripts( State(ApiState { sql_handler: sql_handler.clone(), script_handler: Some(script_handler.clone()), @@ -275,16 +278,15 @@ async fn insert_script( body, ) .await; - let JsonResponse::GreptimedbV1(json) = json else { + let HttpResponse::Error(json) = json else { unreachable!() }; - assert!(!json.success(), "{json:?}"); - assert_eq!(json.error().unwrap(), "invalid schema"); + assert_eq!(json.error(), "invalid schema"); let body = RawBody(Body::from(script.clone())); let exec = create_script_query(); // Insert the script - let Json(json) = script_handler::scripts( + let json = script_handler::scripts( State(ApiState { sql_handler: sql_handler.clone(), script_handler: Some(script_handler.clone()), @@ -293,11 +295,9 @@ async fn insert_script( body, ) .await; - let JsonResponse::GreptimedbV1(json) = json else { + let HttpResponse::GreptimedbV1(json) = json else { unreachable!() }; - assert!(json.success(), "{json:?}"); - assert!(json.error().is_none()); assert!(json.output().is_empty()); } @@ -317,7 +317,7 @@ def test(n) -> vector[i64]: insert_script(script.clone(), script_handler.clone(), sql_handler.clone()).await; // Run the script let exec = create_script_query(); - let Json(json) = script_handler::run_script( + let json = script_handler::run_script( State(ApiState { sql_handler, script_handler: Some(script_handler), @@ -325,14 +325,11 @@ def test(n) -> vector[i64]: exec, ) .await; - let JsonResponse::GreptimedbV1(json) = json else { + let HttpResponse::GreptimedbV1(json) = json else { unreachable!() }; - assert!(json.success(), "{json:?}"); - assert!(json.error().is_none()); - match &json.output()[0] { - JsonOutput::Records(records) => { + GreptimeQueryOutput::Records(records) => { let json = serde_json::to_string_pretty(&records).unwrap(); assert_eq!(5, records.num_rows()); assert_eq!( @@ -387,7 +384,7 @@ def test(n, **params) -> vector[i64]: // Run the script let mut exec = create_script_query(); let _ = exec.0.params.insert("a".to_string(), "42".to_string()); - let Json(json) = script_handler::run_script( + let json = script_handler::run_script( State(ApiState { sql_handler, script_handler: Some(script_handler), @@ -395,14 +392,11 @@ def test(n, **params) -> vector[i64]: exec, ) .await; - let JsonResponse::GreptimedbV1(json) = json else { + let HttpResponse::GreptimedbV1(json) = json else { unreachable!() }; - assert!(json.success(), "{json:?}"); - assert!(json.error().is_none()); - match &json.output()[0] { - JsonOutput::Records(records) => { + GreptimeQueryOutput::Records(records) => { let json = serde_json::to_string_pretty(&records).unwrap(); assert_eq!(5, records.num_rows()); assert_eq!( diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 06a9193ce3..a1a0015fa6 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -26,6 +26,7 @@ use query::parser::PromQuery; use query::plan::LogicalPlan; use query::query_engine::DescribeResult; use servers::error::{Error, Result}; +use servers::http::header::GREPTIME_DB_HEADER_FORMAT; use servers::http::{HttpOptions, HttpServerBuilder}; use servers::influxdb::InfluxdbRequest; use servers::query_handler::grpc::GrpcQueryHandler; @@ -169,7 +170,11 @@ async fn test_influxdb_write() { .await; assert_eq!(result.status(), 401); assert_eq!( - "{\"type\":\"InfluxdbV1\",\"results\":[],\"error\":\"Username and password does not match, username: greptime\"}", + result.headers().get(GREPTIME_DB_HEADER_FORMAT).unwrap(), + "influxdb_v1", + ); + assert_eq!( + "{\"code\":7002,\"error\":\"Username and password does not match, username: greptime\",\"execution_time_ms\":0}", result.text().await ); @@ -181,7 +186,11 @@ async fn test_influxdb_write() { .await; assert_eq!(result.status(), 401); assert_eq!( - "{\"type\":\"InfluxdbV1\",\"results\":[],\"error\":\"Not found influx http authorization info\"}", + result.headers().get(GREPTIME_DB_HEADER_FORMAT).unwrap(), + "influxdb_v1", + ); + assert_eq!( + "{\"code\":7003,\"error\":\"Not found influx http authorization info\",\"execution_time_ms\":0}", result.text().await ); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 42843ac22e..8495c0b0f0 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -19,10 +19,12 @@ use axum::http::StatusCode; use axum_test_helper::TestClient; use common_error::status_code::StatusCode as ErrorCode; use serde_json::json; +use servers::http::error_result::ErrorResponse; +use servers::http::greptime_result_v1::GreptimedbV1Response; use servers::http::handler::HealthResponse; -use servers::http::influxdb_result_v1::InfluxdbOutput; +use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response}; use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse}; -use servers::http::{JsonOutput, JsonResponse}; +use servers::http::GreptimeQueryOutput; use tests_integration::test_util::{ setup_test_http_app, setup_test_http_app_with_frontend, setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend, @@ -123,13 +125,9 @@ pub async fn test_sql_api(store_type: StorageType) { let res = client.get("/v1/sql").send().await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; + let body = serde_json::from_str::(&res.text().await).unwrap(); assert_eq!(body.code(), 1004); - assert_eq!(body.error().unwrap(), "sql parameter is required."); - let _ = body.execution_time_ms().unwrap(); + assert_eq!(body.error(), "sql parameter is required."); let res = client .get("/v1/sql?sql=select * from numbers limit 10") @@ -137,18 +135,12 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert!(body.success()); - let _ = body.execution_time_ms().unwrap(); - + let body = serde_json::from_str::(&res.text().await).unwrap(); let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( output[0], - serde_json::from_value::(json!({ + serde_json::from_value::(json!({ "records" :{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]} })).unwrap() ); @@ -160,13 +152,7 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::InfluxdbV1(body) = body else { - unreachable!() - }; - assert!(body.success()); - let _ = body.execution_time_ms().unwrap(); - + let body = serde_json::from_str::(&res.text().await).unwrap(); let output = body.results(); assert_eq!(output.len(), 1); assert_eq!( @@ -190,18 +176,13 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert!(body.success()); - let _ = body.execution_time_ms().unwrap(); + let body = serde_json::from_str::(&res.text().await).unwrap(); let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( output[0], - serde_json::from_value::(json!({ + serde_json::from_value::(json!({ "records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host",66.6,1024.0,0]]} })).unwrap() ); @@ -213,18 +194,13 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert!(body.success()); - let _ = body.execution_time_ms().unwrap(); + let body = serde_json::from_str::(&res.text().await).unwrap(); let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( output[0], - serde_json::from_value::(json!({ + serde_json::from_value::(json!({ "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} })).unwrap() ); @@ -236,17 +212,12 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert!(body.success()); - let _ = body.execution_time_ms().unwrap(); + let body = serde_json::from_str::(&res.text().await).unwrap(); let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( output[0], - serde_json::from_value::(json!({ + serde_json::from_value::(json!({ "records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} })).unwrap() ); @@ -258,23 +229,18 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert!(body.success()); - let _ = body.execution_time_ms().unwrap(); + let body = serde_json::from_str::(&res.text().await).unwrap(); let outputs = body.output(); assert_eq!(outputs.len(), 2); assert_eq!( outputs[0], - serde_json::from_value::(json!({ + serde_json::from_value::(json!({ "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} })).unwrap() ); assert_eq!( outputs[1], - serde_json::from_value::(json!({ + serde_json::from_value::(json!({ "records":{"rows":[]} })) .unwrap() @@ -287,14 +253,9 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert!(!body.success()); - let _ = body.execution_time_ms().unwrap(); + let _body = serde_json::from_str::(&res.text().await).unwrap(); // TODO(shuiyisong): fix this when return source err msg to client side - // assert!(body.error().unwrap().contains("Table not found")); + // assert!(body.error().contains("Table not found")); // test database given let res = client @@ -303,17 +264,12 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert!(body.success()); - let _ = body.execution_time_ms().unwrap(); + let body = serde_json::from_str::(&res.text().await).unwrap(); let outputs = body.output(); assert_eq!(outputs.len(), 1); assert_eq!( outputs[0], - serde_json::from_value::(json!({ + serde_json::from_value::(json!({ "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} })).unwrap() ); @@ -324,10 +280,7 @@ pub async fn test_sql_api(store_type: StorageType) { .send() .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; + let body = serde_json::from_str::(&res.text().await).unwrap(); assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32); // test catalog-schema given @@ -337,17 +290,12 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert!(body.success()); - let _ = body.execution_time_ms().unwrap(); + let body = serde_json::from_str::(&res.text().await).unwrap(); let outputs = body.output(); assert_eq!(outputs.len(), 1); assert_eq!( outputs[0], - serde_json::from_value::(json!({ + serde_json::from_value::(json!({ "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} })).unwrap() ); @@ -358,10 +306,7 @@ pub async fn test_sql_api(store_type: StorageType) { .send() .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; + let body = serde_json::from_str::(&res.text().await).unwrap(); assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32); // test invalid schema @@ -370,10 +315,7 @@ pub async fn test_sql_api(store_type: StorageType) { .send() .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; + let body = serde_json::from_str::(&res.text().await).unwrap(); assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32); guard.remove_all().await; @@ -389,13 +331,7 @@ pub async fn test_prometheus_promql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert!(body.success()); - let _ = body.execution_time_ms().unwrap(); - + let _body = serde_json::from_str::(&res.text().await).unwrap(); guard.remove_all().await; } @@ -605,11 +541,7 @@ def test(n) -> vector[f64]: .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - assert_eq!(body.code(), 0); + let body = serde_json::from_str::(&res.text().await).unwrap(); assert!(body.output().is_empty()); // call script @@ -618,18 +550,12 @@ def test(n) -> vector[f64]: .send() .await; assert_eq!(res.status(), StatusCode::OK); - let body = serde_json::from_str::(&res.text().await).unwrap(); - let JsonResponse::GreptimedbV1(body) = body else { - unreachable!() - }; - - assert_eq!(body.code(), 0); - let _ = body.execution_time_ms().unwrap(); + let body = serde_json::from_str::(&res.text().await).unwrap(); let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( output[0], - serde_json::from_value::(json!({ + serde_json::from_value::(json!({ "records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]} })).unwrap() );