feat: support CSV format in sql HTTP API (#3062)

* chore: fix typo

Signed-off-by: tison <wander4096@gmail.com>

* add csv format

Signed-off-by: tison <wander4096@gmail.com>

* flatten response

Signed-off-by: tison <wander4096@gmail.com>

* more flatten response

Signed-off-by: tison <wander4096@gmail.com>

* add CSV format

Signed-off-by: tison <wander4096@gmail.com>

* format InfluxdbV1Response

Signed-off-by: tison <wander4096@gmail.com>

* format ErrorResponse

Signed-off-by: tison <wander4096@gmail.com>

* propagate ErrorResponse to InfluxdbV1Response

Signed-off-by: tison <wander4096@gmail.com>

* format GreptimedbV1Response

Signed-off-by: tison <wander4096@gmail.com>

* format CsvResponse

Signed-off-by: tison <wander4096@gmail.com>

* impl IntoResponse for QueryResponse

Signed-off-by: tison <wander4096@gmail.com>

* promql

Signed-off-by: tison <wander4096@gmail.com>

* sql

Signed-off-by: tison <wander4096@gmail.com>

* compile

Signed-off-by: tison <wander4096@gmail.com>

* fixup aide

Signed-off-by: tison <wander4096@gmail.com>

* clear debt

Signed-off-by: tison <wander4096@gmail.com>

* fixup UT test_recordbatches_conversion

Signed-off-by: tison <wander4096@gmail.com>

* fixup IT cases

Signed-off-by: tison <wander4096@gmail.com>

* fixup more IT cases

Signed-off-by: tison <wander4096@gmail.com>

* fixup test-integration cases

Signed-off-by: tison <wander4096@gmail.com>

* update comment

Signed-off-by: tison <wander4096@gmail.com>

* fixup deserialize and most query < 1ms

Signed-off-by: tison <wander4096@gmail.com>

* fixup auth tests

Signed-off-by: tison <wander4096@gmail.com>

* fixup tests

Signed-off-by: tison <wander4096@gmail.com>

* fixup and align X-GreptimeDB headers

Signed-off-by: tison <wander4096@gmail.com>

* fixup compile

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
This commit is contained in:
tison
2024-01-08 18:54:27 +08:00
committed by GitHub
parent 8c58d3f85b
commit 316d843482
17 changed files with 709 additions and 540 deletions

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use tonic::{Code, Status};
@@ -115,7 +115,7 @@ impl From<Status> for Error {
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
}
let code = get_metadata_value(&e, GREPTIME_ERROR_CODE)
let code = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_CODE)
.and_then(|s| {
if let Ok(code) = s.parse::<u32>() {
StatusCode::from_u32(code)
@@ -125,8 +125,8 @@ impl From<Status> for Error {
})
.unwrap_or(StatusCode::Unknown);
let msg =
get_metadata_value(&e, GREPTIME_ERROR_MSG).unwrap_or_else(|| e.message().to_string());
let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());
Self::Server { code, msg }
}

View File

@@ -19,7 +19,7 @@ pub mod format;
pub mod mock;
pub mod status_code;
pub const GREPTIME_ERROR_CODE: &str = "x-greptime-err-code";
pub const GREPTIME_ERROR_MSG: &str = "x-greptime-err-msg";
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
pub use snafu;

View File

@@ -14,7 +14,7 @@
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use tonic::Status;
@@ -117,7 +117,7 @@ impl From<Status> for Error {
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
}
let code = get_metadata_value(&e, GREPTIME_ERROR_CODE)
let code = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_CODE)
.and_then(|s| {
if let Ok(code) = s.parse::<u32>() {
StatusCode::from_u32(code)
@@ -127,8 +127,8 @@ impl From<Status> for Error {
})
.unwrap_or(StatusCode::Internal);
let msg =
get_metadata_value(&e, GREPTIME_ERROR_MSG).unwrap_or_else(|| e.message().to_string());
let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());
Self::MetaServer { code, msg }
}

View File

@@ -569,7 +569,7 @@ macro_rules! define_into_tonic_status {
($Error: ty) => {
impl From<$Error> for tonic::Status {
fn from(err: $Error) -> Self {
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;
@@ -578,11 +578,14 @@ macro_rules! define_into_tonic_status {
// If either of the status_code or error msg cannot convert to valid HTTP header value
// (which is a very rare case), just ignore. Client will use Tonic status code and message.
let status_code = err.status_code();
headers.insert(GREPTIME_ERROR_CODE, HeaderValue::from(status_code as u32));
headers.insert(
GREPTIME_DB_HEADER_ERROR_CODE,
HeaderValue::from(status_code as u32),
);
let root_error = err.output_msg();
if let Ok(err_msg) = HeaderValue::from_bytes(root_error.as_bytes()) {
let _ = headers.insert(GREPTIME_ERROR_MSG, err_msg);
let _ = headers.insert(GREPTIME_DB_HEADER_ERROR_MSG, err_msg);
}
let metadata = MetadataMap::from_headers(headers);

View File

@@ -12,43 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod authorize;
pub mod handler;
pub mod header;
pub mod influxdb;
pub mod mem_prof;
pub mod opentsdb;
pub mod otlp;
pub mod pprof;
pub mod prom_store;
pub mod prometheus;
pub mod script;
#[cfg(feature = "dashboard")]
mod dashboard;
pub mod influxdb_result_v1;
use std::fmt::Display;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use aide::axum::{routing as apirouting, ApiRouter, IntoApiResponse};
use aide::openapi::{Info, OpenApi, Server as OpenAPIServer};
use aide::OperationOutput;
use async_trait::async_trait;
use auth::UserProviderRef;
use axum::error_handling::HandleErrorLayer;
use axum::extract::{DefaultBodyLimit, MatchedPath};
use axum::http::Request;
use axum::middleware::{self, Next};
use axum::response::{Html, IntoResponse, Json};
use axum::response::{Html, IntoResponse, Json, Response};
use axum::{routing, BoxError, Extension, Router};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use common_telemetry::logging::{debug, error, info};
use common_recordbatch::RecordBatch;
use common_telemetry::logging::{error, info};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::data_type::DataType;
@@ -66,6 +49,9 @@ use tower_http::trace::TraceLayer;
use self::authorize::AuthState;
use crate::configurator::ConfiguratorRef;
use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu};
use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::prometheus::{
@@ -84,6 +70,25 @@ use crate::query_handler::{
};
use crate::server::Server;
pub mod authorize;
pub mod handler;
pub mod header;
pub mod influxdb;
pub mod mem_prof;
pub mod opentsdb;
pub mod otlp;
pub mod pprof;
pub mod prom_store;
pub mod prometheus;
pub mod script;
pub mod csv_result;
#[cfg(feature = "dashboard")]
mod dashboard;
pub mod error_result;
pub mod greptime_result_v1;
pub mod influxdb_result_v1;
pub const HTTP_API_VERSION: &str = "v1";
pub const HTTP_API_PREFIX: &str = "/v1/";
/// Default http body limit (64M).
@@ -239,129 +244,16 @@ impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
#[derive(Serialize, Deserialize, Debug, JsonSchema, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum JsonOutput {
pub enum GreptimeQueryOutput {
AffectedRows(usize),
Records(HttpRecordsOutput),
}
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct GreptimedbV1Response {
code: u32,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
output: Vec<JsonOutput>,
#[serde(skip_serializing_if = "Option::is_none")]
execution_time_ms: Option<u64>,
}
impl GreptimedbV1Response {
pub fn with_error(error: impl ErrorExt) -> Self {
let code = error.status_code();
if code.should_log_error() {
error!(error; "Failed to handle HTTP request");
} else {
debug!("Failed to handle HTTP request, err: {:?}", error);
}
GreptimedbV1Response {
error: Some(error.output_msg()),
code: code as u32,
output: vec![],
execution_time_ms: None,
}
}
fn with_error_message(err_msg: String, error_code: StatusCode) -> Self {
GreptimedbV1Response {
error: Some(err_msg),
code: error_code as u32,
output: vec![],
execution_time_ms: None,
}
}
fn with_output(output: Vec<JsonOutput>) -> Self {
GreptimedbV1Response {
error: None,
code: StatusCode::Success as u32,
output,
execution_time_ms: None,
}
}
fn with_execution_time(&mut self, execution_time: u64) {
self.execution_time_ms = Some(execution_time);
}
/// Create a json response from query result
pub async fn from_output(outputs: Vec<Result<Output>>) -> Self {
// TODO(sunng87): this api response structure cannot represent error
// well. It hides successful execution results from error response
let mut results = Vec::with_capacity(outputs.len());
for out in outputs {
match out {
Ok(Output::AffectedRows(rows)) => {
results.push(JsonOutput::AffectedRows(rows));
}
Ok(Output::Stream(stream)) => {
// TODO(sunng87): streaming response
match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::try_from(rows) {
Ok(rows) => {
results.push(JsonOutput::Records(rows));
}
Err(err) => {
return Self::with_error(err);
}
},
Err(e) => {
return Self::with_error(e);
}
}
}
Ok(Output::RecordBatches(rbs)) => match HttpRecordsOutput::try_from(rbs.take()) {
Ok(rows) => {
results.push(JsonOutput::Records(rows));
}
Err(err) => {
return Self::with_error(err);
}
},
Err(e) => {
return Self::with_error(e);
}
}
}
Self::with_output(results)
}
pub fn code(&self) -> u32 {
self.code
}
pub fn success(&self) -> bool {
self.code == (StatusCode::Success as u32)
}
pub fn error(&self) -> Option<&String> {
self.error.as_ref()
}
pub fn output(&self) -> &[JsonOutput] {
&self.output
}
pub fn execution_time_ms(&self) -> Option<u64> {
self.execution_time_ms
}
}
/// It allows the results of SQL queries to be presented in different formats.
/// Currently, `greptimedb_v1` and `influxdb_v1` are supported.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseFormat {
Csv,
#[default]
GreptimedbV1,
InfluxdbV1,
}
@@ -369,11 +261,20 @@ pub enum ResponseFormat {
impl ResponseFormat {
pub fn parse(s: &str) -> Option<Self> {
match s {
"csv" => Some(ResponseFormat::Csv),
"greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
"influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
_ => None,
}
}
pub fn as_str(&self) -> &'static str {
match self {
ResponseFormat::Csv => "csv",
ResponseFormat::GreptimedbV1 => "greptimedb_v1",
ResponseFormat::InfluxdbV1 => "influxdb_v1",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -421,67 +322,60 @@ impl Display for Epoch {
}
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
#[serde(tag = "type")]
pub enum JsonResponse {
pub enum HttpResponse {
Csv(CsvResponse),
Error(ErrorResponse),
GreptimedbV1(GreptimedbV1Response),
InfluxdbV1(InfluxdbV1Response),
}
impl From<GreptimedbV1Response> for JsonResponse {
impl HttpResponse {
pub fn with_execution_time(self, execution_time: u64) -> Self {
match self {
HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
}
}
}
impl IntoResponse for HttpResponse {
fn into_response(self) -> Response {
match self {
HttpResponse::Csv(resp) => resp.into_response(),
HttpResponse::GreptimedbV1(resp) => resp.into_response(),
HttpResponse::InfluxdbV1(resp) => resp.into_response(),
HttpResponse::Error(resp) => resp.into_response(),
}
}
}
impl OperationOutput for HttpResponse {
type Inner = Response;
}
impl From<CsvResponse> for HttpResponse {
fn from(value: CsvResponse) -> Self {
HttpResponse::Csv(value)
}
}
impl From<ErrorResponse> for HttpResponse {
fn from(value: ErrorResponse) -> Self {
HttpResponse::Error(value)
}
}
impl From<GreptimedbV1Response> for HttpResponse {
fn from(value: GreptimedbV1Response) -> Self {
JsonResponse::GreptimedbV1(value)
HttpResponse::GreptimedbV1(value)
}
}
impl From<InfluxdbV1Response> for JsonResponse {
impl From<InfluxdbV1Response> for HttpResponse {
fn from(value: InfluxdbV1Response) -> Self {
JsonResponse::InfluxdbV1(value)
}
}
impl JsonResponse {
pub fn with_error(error: impl ErrorExt, response_format: ResponseFormat) -> Self {
match response_format {
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::with_error(error).into(),
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::with_error(error).into(),
}
}
pub fn with_error_message(
err_msg: String,
error_code: StatusCode,
response_format: ResponseFormat,
) -> Self {
match response_format {
ResponseFormat::GreptimedbV1 => {
GreptimedbV1Response::with_error_message(err_msg, error_code).into()
}
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::with_error_message(err_msg).into(),
}
}
pub async fn from_output(
outputs: Vec<Result<Output>>,
response_format: ResponseFormat,
epoch: Option<Epoch>,
) -> Self {
match response_format {
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await.into(),
ResponseFormat::InfluxdbV1 => {
InfluxdbV1Response::from_output(outputs, epoch).await.into()
}
}
}
fn with_execution_time(mut self, execution_time: u128) -> Self {
match &mut self {
JsonResponse::GreptimedbV1(resp) => {
resp.with_execution_time(execution_time as u64);
}
JsonResponse::InfluxdbV1(resp) => {
resp.with_execution_time(execution_time as u64);
}
}
self
HttpResponse::InfluxdbV1(value)
}
}
@@ -900,14 +794,13 @@ impl Server for HttpServer {
}
/// handle error middleware
async fn handle_error(err: BoxError) -> Json<JsonResponse> {
async fn handle_error(err: BoxError) -> Json<HttpResponse> {
error!(err; "Unhandled internal error");
Json(JsonResponse::with_error_message(
format!("Unhandled internal error: {err}"),
StatusCode::Unexpected,
Json(HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::GreptimedbV1,
))
StatusCode::Unexpected,
format!("Unhandled internal error: {err}"),
)))
}
#[cfg(test)]
@@ -920,6 +813,7 @@ mod test {
use axum::http::StatusCode;
use axum::routing::get;
use axum_test_helper::TestClient;
use common_query::Output;
use common_recordbatch::RecordBatches;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema};
@@ -1051,20 +945,24 @@ mod test {
];
let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
for format in [ResponseFormat::GreptimedbV1, ResponseFormat::InfluxdbV1] {
for format in [
ResponseFormat::GreptimedbV1,
ResponseFormat::InfluxdbV1,
ResponseFormat::Csv,
] {
let recordbatches =
RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
let json_resp = JsonResponse::from_output(
vec![Ok(Output::RecordBatches(recordbatches))],
format,
None,
)
.await;
let outputs = vec![Ok(Output::RecordBatches(recordbatches))];
let json_resp = match format {
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
};
match json_resp {
JsonResponse::GreptimedbV1(json_resp) => {
let json_output = &json_resp.output[0];
if let JsonOutput::Records(r) = json_output {
HttpResponse::GreptimedbV1(resp) => {
let json_output = &resp.output[0];
if let GreptimeQueryOutput::Records(r) = json_output {
assert_eq!(r.num_rows(), 4);
assert_eq!(r.num_cols(), 2);
let schema = r.schema.as_ref().unwrap();
@@ -1076,8 +974,8 @@ mod test {
panic!("invalid output type");
}
}
JsonResponse::InfluxdbV1(json_resp) => {
let json_output = &json_resp.results()[0];
HttpResponse::InfluxdbV1(resp) => {
let json_output = &resp.results()[0];
assert_eq!(json_output.num_rows(), 4);
assert_eq!(json_output.num_cols(), 2);
assert_eq!(json_output.series[0].columns.clone()[0], "numbers");
@@ -1087,6 +985,21 @@ mod test {
);
assert_eq!(json_output.series[0].values[0][1], serde_json::Value::Null);
}
HttpResponse::Csv(resp) => {
let output = &resp.output()[0];
if let GreptimeQueryOutput::Records(r) = output {
assert_eq!(r.num_rows(), 4);
assert_eq!(r.num_cols(), 2);
let schema = r.schema.as_ref().unwrap();
assert_eq!(schema.column_schemas[0].name, "numbers");
assert_eq!(schema.column_schemas[0].data_type, "UInt32");
assert_eq!(r.rows[0][0], serde_json::Value::from(1));
assert_eq!(r.rows[0][1], serde_json::Value::Null);
} else {
panic!("invalid output type");
}
}
HttpResponse::Error(err) => unreachable!("{err:?}"),
}
}
}

View File

@@ -17,7 +17,6 @@ use axum::extract::State;
use axum::http::{self, Request, StatusCode};
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use axum::Json;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
@@ -30,11 +29,12 @@ use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use super::header::GreptimeDbName;
use super::{JsonResponse, ResponseFormat, PUBLIC_APIS};
use super::{ResponseFormat, PUBLIC_APIS};
use crate::error::{
self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu,
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
};
use crate::http::error_result::ErrorResponse;
use crate::http::HTTP_API_PREFIX;
/// AuthState is a holder state for [`UserProviderRef`]
@@ -118,14 +118,12 @@ pub async fn check_http_auth<B>(
}
fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse {
let format = if is_influxdb {
let ty = if is_influxdb {
ResponseFormat::InfluxdbV1
} else {
ResponseFormat::GreptimedbV1
};
let body = JsonResponse::with_error(err, format);
(StatusCode::UNAUTHORIZED, Json(body))
(StatusCode::UNAUTHORIZED, ErrorResponse::from_error(ty, err))
}
fn extract_catalog_and_schema<B>(request: &Request<B>) -> (&str, &str) {

View File

@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Write;
use axum::http::{header, HeaderValue};
use axum::response::{IntoResponse, Response};
use common_error::status_code::StatusCode;
use common_query::Output;
use itertools::Itertools;
use mime_guess::mime;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct CsvResponse {
output: Vec<GreptimeQueryOutput>,
execution_time_ms: u64,
}
impl CsvResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::Csv, outputs).await {
Err(err) => HttpResponse::Error(err),
Ok(output) => {
if output.len() > 1 {
HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Csv,
StatusCode::InvalidArguments,
"Multi-statements are not allowed".to_string(),
))
} else {
HttpResponse::Csv(CsvResponse {
output,
execution_time_ms: 0,
})
}
}
}
}
pub fn output(&self) -> &[GreptimeQueryOutput] {
&self.output
}
pub fn with_execution_time(mut self, execution_time: u64) -> Self {
self.execution_time_ms = execution_time;
self
}
pub fn execution_time_ms(&self) -> u64 {
self.execution_time_ms
}
}
impl IntoResponse for CsvResponse {
fn into_response(mut self) -> Response {
debug_assert!(
self.output.len() <= 1,
"self.output has extra elements: {}",
self.output.len()
);
let execution_time = self.execution_time_ms;
let payload = match self.output.pop() {
None => "".to_string(),
Some(GreptimeQueryOutput::AffectedRows(n)) => {
format!("{n}\n")
}
Some(GreptimeQueryOutput::Records(records)) => {
let mut result = String::new();
for row in records.rows {
let row = row.iter().map(|v| v.to_string()).join(",");
writeln!(result, "{row}").unwrap();
}
result
}
};
let mut resp = (
[(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
)],
payload,
)
.into_response();
resp.headers_mut()
.insert(GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static("CSV"));
resp.headers_mut().insert(
GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
);
resp
}
}

View File

@@ -0,0 +1,98 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
use common_telemetry::logging::{debug, error};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::ResponseFormat;
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct ErrorResponse {
#[serde(skip)]
ty: ResponseFormat,
code: u32,
error: String,
execution_time_ms: u64,
}
impl ErrorResponse {
pub fn from_error(ty: ResponseFormat, error: impl ErrorExt) -> Self {
let code = error.status_code();
if code.should_log_error() {
error!(error; "Failed to handle HTTP request");
} else {
debug!("Failed to handle HTTP request, err: {:?}", error);
}
Self::from_error_message(ty, code, error.output_msg())
}
pub fn from_error_message(ty: ResponseFormat, code: StatusCode, msg: String) -> Self {
ErrorResponse {
ty,
code: code as u32,
error: msg,
execution_time_ms: 0,
}
}
pub fn with_execution_time(mut self, execution_time: u64) -> Self {
self.execution_time_ms = execution_time;
self
}
pub fn execution_time_ms(&self) -> u64 {
self.execution_time_ms
}
pub fn code(&self) -> u32 {
self.code
}
pub fn error(&self) -> &str {
&self.error
}
}
impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let ty = self.ty.as_str();
let code = self.code;
let msg = self.error.clone();
let execution_time = self.execution_time_ms;
let mut resp = Json(self).into_response();
resp.headers_mut()
.insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code));
resp.headers_mut().insert(
GREPTIME_DB_HEADER_ERROR_MSG,
HeaderValue::from_str(&msg).expect("malformed error msg"),
);
resp.headers_mut()
.insert(GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static(ty));
resp.headers_mut().insert(
GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
);
resp
}
}

View File

@@ -0,0 +1,71 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_query::Output;
use reqwest::header::HeaderValue;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct GreptimedbV1Response {
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) output: Vec<GreptimeQueryOutput>,
pub(crate) execution_time_ms: u64,
}
impl GreptimedbV1Response {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::GreptimedbV1, outputs).await {
Ok(output) => HttpResponse::GreptimedbV1(Self {
output,
execution_time_ms: 0,
}),
Err(err) => HttpResponse::Error(err),
}
}
pub fn output(&self) -> &[GreptimeQueryOutput] {
&self.output
}
pub fn with_execution_time(mut self, execution_time: u64) -> Self {
self.execution_time_ms = execution_time;
self
}
pub fn execution_time_ms(&self) -> u64 {
self.execution_time_ms
}
}
impl IntoResponse for GreptimedbV1Response {
fn into_response(self) -> Response {
let execution_time = self.execution_time_ms;
let mut resp = Json(self).into_response();
resp.headers_mut().insert(
GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("greptimedb_v1"),
);
resp.headers_mut().insert(
GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
);
resp
}
}

View File

@@ -22,12 +22,21 @@ use axum::response::{IntoResponse, Response};
use axum::{Extension, Form};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::util;
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use session::context::QueryContextRef;
use crate::http::{ApiState, Epoch, GreptimeOptionsConfigState, JsonResponse, ResponseFormat};
use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::{
ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
HttpResponse, ResponseFormat,
};
use crate::metrics_handler::MetricsHandler;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
@@ -35,7 +44,7 @@ use crate::query_handler::sql::ServerSqlQueryHandlerRef;
pub struct SqlQuery {
pub db: Option<String>,
pub sql: Option<String>,
// (Optional) result format: [`gerptimedb_v1`, `influxdb_v1`],
// (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`],
// the default value is `greptimedb_v1`
pub format: Option<String>,
// Returns epoch timestamps with the specified precision.
@@ -56,7 +65,7 @@ pub async fn sql(
Query(query_params): Query<SqlQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<SqlQuery>,
) -> Json<JsonResponse> {
) -> HttpResponse {
let sql_handler = &state.sql_handler;
let start = Instant::now();
@@ -78,21 +87,82 @@ pub async fn sql(
.with_label_values(&[db.as_str()])
.start_timer();
let resp = if let Some(sql) = &sql {
if let Some(resp) = validate_schema(sql_handler.clone(), query_ctx.clone(), format).await {
return Json(resp);
let result = if let Some(sql) = &sql {
if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
Err((status, msg))
} else {
Ok(sql_handler.do_query(sql, query_ctx).await)
}
JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await, format, epoch).await
} else {
JsonResponse::with_error_message(
"sql parameter is required.".to_string(),
Err((
StatusCode::InvalidArguments,
format,
)
"sql parameter is required.".to_string(),
))
};
Json(resp.with_execution_time(start.elapsed().as_millis()))
let outputs = match result {
Err((status, msg)) => {
return HttpResponse::Error(
ErrorResponse::from_error_message(format, status, msg)
.with_execution_time(start.elapsed().as_millis() as u64),
);
}
Ok(outputs) => outputs,
};
let resp = match format {
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
};
resp.with_execution_time(start.elapsed().as_millis() as u64)
}
/// Create a response from query result
pub async fn from_output(
ty: ResponseFormat,
outputs: Vec<crate::error::Result<Output>>,
) -> Result<Vec<GreptimeQueryOutput>, ErrorResponse> {
// TODO(sunng87): this api response structure cannot represent error well.
// It hides successful execution results from error response
let mut results = Vec::with_capacity(outputs.len());
for out in outputs {
match out {
Ok(Output::AffectedRows(rows)) => {
results.push(GreptimeQueryOutput::AffectedRows(rows));
}
Ok(Output::Stream(stream)) => {
// TODO(sunng87): streaming response
match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::try_from(rows) {
Ok(rows) => {
results.push(GreptimeQueryOutput::Records(rows));
}
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
}
},
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
}
}
}
Ok(Output::RecordBatches(rbs)) => match HttpRecordsOutput::try_from(rbs.take()) {
Ok(rows) => {
results.push(GreptimeQueryOutput::Records(rows));
}
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
}
},
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
}
}
}
Ok(results)
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
@@ -121,7 +191,7 @@ pub async fn promql(
State(state): State<ApiState>,
Query(params): Query<PromqlQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
) -> Json<JsonResponse> {
) -> Response {
let sql_handler = &state.sql_handler;
let exec_start = Instant::now();
let db = query_ctx.get_db_string();
@@ -129,29 +199,23 @@ pub async fn promql(
.with_label_values(&[db.as_str()])
.start_timer();
if let Some(resp) = validate_schema(
sql_handler.clone(),
query_ctx.clone(),
ResponseFormat::GreptimedbV1,
)
.await
let resp = if let Some((status, msg)) =
validate_schema(sql_handler.clone(), query_ctx.clone()).await
{
return Json(resp);
}
let resp = ErrorResponse::from_error_message(ResponseFormat::GreptimedbV1, status, msg);
HttpResponse::Error(resp)
} else {
let prom_query = params.into();
let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;
GreptimedbV1Response::from_output(outputs).await
};
let prom_query = params.into();
let resp = JsonResponse::from_output(
sql_handler.do_promql_query(&prom_query, query_ctx).await,
ResponseFormat::GreptimedbV1,
None,
)
.await;
Json(resp.with_execution_time(exec_start.elapsed().as_millis()))
resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
.into_response()
}
pub(crate) fn sql_docs(op: TransformOperation) -> TransformOperation {
op.response::<200, Json<JsonResponse>>()
op.response::<200, Json<HttpResponse>>()
}
/// Handler to export metrics
@@ -222,26 +286,23 @@ pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response
async fn validate_schema(
sql_handler: ServerSqlQueryHandlerRef,
query_ctx: QueryContextRef,
format: ResponseFormat,
) -> Option<JsonResponse> {
) -> Option<(StatusCode, String)> {
match sql_handler
.is_valid_schema(query_ctx.current_catalog(), query_ctx.current_schema())
.await
{
Ok(false) => Some(JsonResponse::with_error_message(
format!("Database not found: {}", query_ctx.get_db_string()),
Ok(true) => None,
Ok(false) => Some((
StatusCode::DatabaseNotFound,
format,
format!("Database not found: {}", query_ctx.get_db_string()),
)),
Err(e) => Some(JsonResponse::with_error_message(
Err(e) => Some((
StatusCode::Internal,
format!(
"Error checking database: {}, {}",
query_ctx.get_db_string(),
e.output_msg(),
),
StatusCode::Internal,
format,
)),
_ => None,
}
}

View File

@@ -14,13 +14,16 @@
use headers::{Header, HeaderName, HeaderValue};
pub static GREPTIME_DB_NAME_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-db-name");
pub const GREPTIME_DB_HEADER_FORMAT: &str = "x-greptime-format";
pub const GREPTIME_DB_HEADER_EXECUTION_TIME: &str = "x-greptime-execution-time";
pub static GREPTIME_DB_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-name");
pub struct GreptimeDbName(Option<String>);
impl Header for GreptimeDbName {
fn name() -> &'static HeaderName {
&GREPTIME_DB_NAME_HEADER_NAME
&GREPTIME_DB_HEADER_NAME
}
fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>

View File

@@ -12,17 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::ext::ErrorExt;
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use common_telemetry::{debug, error};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::ResultExt;
use crate::error::{Error, ToJsonSnafu};
use crate::http::Epoch;
use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{Epoch, HttpResponse, ResponseFormat};
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct SqlQuery {
@@ -125,55 +129,26 @@ impl InfluxdbOutput {
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct InfluxdbV1Response {
results: Vec<InfluxdbOutput>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
execution_time_ms: Option<u64>,
execution_time_ms: u64,
}
impl InfluxdbV1Response {
pub fn with_error(error: impl ErrorExt) -> Self {
let code = error.status_code();
if code.should_log_error() {
error!(error; "Failed to handle HTTP request");
} else {
debug!("Failed to handle HTTP request, err: {:?}", error);
}
InfluxdbV1Response {
results: vec![],
error: Some(error.output_msg()),
execution_time_ms: None,
}
}
pub fn with_error_message(err_msg: String) -> Self {
InfluxdbV1Response {
results: vec![],
error: Some(err_msg),
execution_time_ms: None,
}
}
fn with_output(results: Vec<InfluxdbOutput>) -> Self {
InfluxdbV1Response {
results,
error: None,
execution_time_ms: None,
}
}
pub fn with_execution_time(&mut self, execution_time: u64) {
self.execution_time_ms = Some(execution_time);
pub fn with_execution_time(mut self, execution_time: u64) -> Self {
self.execution_time_ms = execution_time;
self
}
/// Create a influxdb v1 response from query result
pub async fn from_output(
outputs: Vec<crate::error::Result<Output>>,
epoch: Option<Epoch>,
) -> Self {
// TODO(sunng87): this api response structure cannot represent error
// well. It hides successful execution results from error response
) -> HttpResponse {
fn make_error_response(error: impl ErrorExt) -> HttpResponse {
HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::InfluxdbV1, error))
}
// TODO(sunng87): this api response structure cannot represent error well.
// It hides successful execution results from error response
let mut results = Vec::with_capacity(outputs.len());
for (statement_id, out) in outputs.into_iter().enumerate() {
let statement_id = statement_id as u32;
@@ -195,12 +170,11 @@ impl InfluxdbV1Response {
});
}
Err(err) => {
return Self::with_error(err);
return make_error_response(err);
}
},
Err(e) => {
return Self::with_error(e);
Err(err) => {
return make_error_response(err);
}
}
}
@@ -213,31 +187,43 @@ impl InfluxdbV1Response {
});
}
Err(err) => {
return Self::with_error(err);
return make_error_response(err);
}
}
}
Err(e) => {
return Self::with_error(e);
Err(err) => {
return make_error_response(err);
}
}
}
Self::with_output(results)
}
pub fn success(&self) -> bool {
self.error.is_none()
}
pub fn error(&self) -> Option<&String> {
self.error.as_ref()
HttpResponse::InfluxdbV1(InfluxdbV1Response {
results,
execution_time_ms: 0,
})
}
pub fn results(&self) -> &[InfluxdbOutput] {
&self.results
}
pub fn execution_time_ms(&self) -> Option<u64> {
pub fn execution_time_ms(&self) -> u64 {
self.execution_time_ms
}
}
impl IntoResponse for InfluxdbV1Response {
fn into_response(self) -> Response {
let execution_time = self.execution_time_ms;
let mut resp = Json(self).into_response();
resp.headers_mut().insert(
GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("influxdb_v1"),
);
resp.headers_mut().insert(
GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
);
resp
}
}

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::time::Instant;
use axum::extract::{Json, Query, RawBody, State};
use axum::extract::{Query, RawBody, State};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
@@ -25,18 +25,19 @@ use session::context::QueryContext;
use snafu::ResultExt;
use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu};
use crate::http::{ApiState, GreptimedbV1Response, JsonResponse, ResponseFormat};
use crate::http::error_result::ErrorResponse;
use crate::http::{ApiState, GreptimedbV1Response, HttpResponse, ResponseFormat};
macro_rules! json_err {
($e: expr) => {{
return Json(JsonResponse::with_error($e, ResponseFormat::GreptimedbV1));
return HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::GreptimedbV1, $e));
}};
($msg: expr, $code: expr) => {{
return Json(JsonResponse::with_error_message(
$msg.to_string(),
$code,
return HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::GreptimedbV1,
$code,
$msg.to_string(),
));
}};
}
@@ -56,7 +57,7 @@ pub async fn scripts(
State(state): State<ApiState>,
Query(params): Query<ScriptQuery>,
RawBody(body): RawBody,
) -> Json<JsonResponse> {
) -> HttpResponse {
if let Some(script_handler) = &state.script_handler {
let catalog = params
.catalog
@@ -80,18 +81,16 @@ pub async fn scripts(
// Safety: schema and name are already checked above.
let query_ctx = QueryContext::with(&catalog, schema.unwrap());
let body = match script_handler
match script_handler
.insert_script(query_ctx, name.unwrap(), &script)
.await
{
Ok(()) => GreptimedbV1Response::with_output(vec![]).into(),
Ok(()) => GreptimedbV1Response::from_output(vec![]).await,
Err(e) => json_err!(
format!("Insert script error: {}", e.output_msg()),
e.status_code()
),
};
Json(body)
}
} else {
json_err!(
"Script execution not supported, missing script handler",
@@ -114,7 +113,7 @@ pub struct ScriptQuery {
pub async fn run_script(
State(state): State<ApiState>,
Query(params): Query<ScriptQuery>,
) -> Json<JsonResponse> {
) -> HttpResponse {
if let Some(script_handler) = &state.script_handler {
let catalog = params
.catalog
@@ -137,10 +136,8 @@ pub async fn run_script(
let output = script_handler
.execute_script(query_ctx, name.unwrap(), params.params)
.await;
let resp =
JsonResponse::from_output(vec![output], ResponseFormat::GreptimedbV1, None).await;
Json(resp.with_execution_time(start.elapsed().as_millis()))
let resp = GreptimedbV1Response::from_output(vec![output]).await;
resp.with_execution_time(start.elapsed().as_millis() as u64)
} else {
json_err!(
"Script execution not supported, missing script handler",

View File

@@ -49,7 +49,7 @@ async fn test_http_auth() {
let mut resp = auth_res.unwrap_err();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
assert_eq!(
b"{\"type\":\"GreptimedbV1\",\"code\":7003,\"error\":\"Not found http or grpc authorization header\"}",
b"{\"code\":7003,\"error\":\"Not found http or grpc authorization header\",\"execution_time_ms\":0}",
resp.data().await.unwrap().unwrap().as_ref()
);
@@ -60,7 +60,7 @@ async fn test_http_auth() {
let mut resp = auth_res.unwrap_err();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
assert_eq!(
b"{\"type\":\"GreptimedbV1\",\"code\":7000,\"error\":\"User not found, username: username\"}",
b"{\"code\":7000,\"error\":\"User not found, username: username\",\"execution_time_ms\":0}",
resp.data().await.unwrap().unwrap().as_ref(),
);
}
@@ -95,7 +95,7 @@ async fn test_schema_validating() {
let mut resp = result.unwrap_err();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
assert_eq!(
b"{\"type\":\"GreptimedbV1\",\"code\":7005,\"error\":\"Access denied for user 'greptime' to database 'greptime-wrong'\"}",
b"{\"code\":7005,\"error\":\"Access denied for user 'greptime' to database 'greptime-wrong'\",\"execution_time_ms\":0}",
resp.data().await.unwrap().unwrap().as_ref()
);
}
@@ -113,7 +113,7 @@ async fn test_whitelist_no_auth() {
let mut resp = auth_res.unwrap_err();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
assert_eq!(
b"{\"type\":\"GreptimedbV1\",\"code\":7003,\"error\":\"Not found http or grpc authorization header\"}",
b"{\"code\":7003,\"error\":\"Not found http or grpc authorization header\",\"execution_time_ms\":0}",
resp.data().await.unwrap().unwrap().as_ref()
);
@@ -134,6 +134,5 @@ fn mock_http_request(
if let Some(auth_header) = auth_header {
req = req.header(http::header::AUTHORIZATION, auth_header);
}
Ok(req.body(()).unwrap())
}

View File

@@ -16,12 +16,16 @@ use std::collections::HashMap;
use axum::body::{Body, Bytes};
use axum::extract::{Json, Query, RawBody, State};
use axum::http::header;
use axum::response::IntoResponse;
use axum::Form;
use headers::HeaderValue;
use http_body::combinators::UnsyncBoxBody;
use hyper::Response;
use mime_guess::mime;
use servers::http::{
handler as http_handler, script as script_handler, ApiState, GreptimeOptionsConfigState,
JsonOutput, JsonResponse,
GreptimeQueryOutput, HttpResponse,
};
use servers::metrics_handler::MetricsHandler;
use session::context::QueryContext;
@@ -42,39 +46,26 @@ async fn test_sql_not_provided() {
script_handler: None,
};
for format in ["greptimedb_v1", "influxdb_v1"] {
for format in ["greptimedb_v1", "influxdb_v1", "csv"] {
let query = http_handler::SqlQuery {
db: None,
sql: None,
format: Some(format.to_string()),
epoch: None,
};
let Json(json) = http_handler::sql(
let HttpResponse::Error(resp) = http_handler::sql(
State(api_state.clone()),
Query(query),
axum::Extension(ctx.clone()),
Form(http_handler::SqlQuery::default()),
)
.await;
.await
else {
unreachable!("must be error response")
};
match json {
JsonResponse::GreptimedbV1(resp) => {
assert!(!resp.success());
assert_eq!(
Some(&"sql parameter is required.".to_string()),
resp.error()
);
assert!(resp.output().is_empty());
}
JsonResponse::InfluxdbV1(resp) => {
assert!(!resp.success());
assert_eq!(
Some(&"sql parameter is required.".to_string()),
resp.error()
);
assert!(resp.results().is_empty());
}
}
assert_eq!("sql parameter is required.", resp.error());
}
}
@@ -91,9 +82,9 @@ async fn test_sql_output_rows() {
script_handler: None,
};
for format in ["greptimedb_v1", "influxdb_v1"] {
for format in ["greptimedb_v1", "influxdb_v1", "csv"] {
let query = create_query(format);
let Json(json) = http_handler::sql(
let json = http_handler::sql(
State(api_state.clone()),
query,
axum::Extension(ctx.clone()),
@@ -102,16 +93,13 @@ async fn test_sql_output_rows() {
.await;
match json {
JsonResponse::GreptimedbV1(resp) => {
assert!(resp.success(), "{resp:?}");
assert!(resp.error().is_none());
match &resp.output()[0] {
JsonOutput::Records(records) => {
assert_eq!(1, records.num_rows());
let json = serde_json::to_string_pretty(&records).unwrap();
assert_eq!(
json,
r#"{
HttpResponse::GreptimedbV1(resp) => match &resp.output()[0] {
GreptimeQueryOutput::Records(records) => {
assert_eq!(1, records.num_rows());
let json = serde_json::to_string_pretty(&records).unwrap();
assert_eq!(
json,
r#"{
"schema": {
"column_schemas": [
{
@@ -126,15 +114,11 @@ async fn test_sql_output_rows() {
]
]
}"#
);
}
_ => unreachable!(),
);
}
}
JsonResponse::InfluxdbV1(resp) => {
assert!(resp.success(), "{resp:?}");
assert!(resp.error().is_none());
_ => unreachable!(),
},
HttpResponse::InfluxdbV1(resp) => {
let json = serde_json::to_string_pretty(&resp.results()).unwrap();
assert_eq!(
json,
@@ -158,6 +142,19 @@ async fn test_sql_output_rows() {
]"#
);
}
HttpResponse::Csv(resp) => {
use http_body::Body as HttpBody;
let mut resp = resp.into_response();
assert_eq!(
resp.headers().get(header::CONTENT_TYPE),
Some(HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref())).as_ref(),
);
assert_eq!(
resp.body_mut().data().await.unwrap().unwrap(),
hyper::body::Bytes::from_static(b"4950\n"),
);
}
_ => unreachable!(),
}
}
}
@@ -175,9 +172,9 @@ async fn test_sql_form() {
script_handler: None,
};
for format in ["greptimedb_v1", "influxdb_v1"] {
for format in ["greptimedb_v1", "influxdb_v1", "csv"] {
let form = create_form(format);
let Json(json) = http_handler::sql(
let json = http_handler::sql(
State(api_state.clone()),
Query(http_handler::SqlQuery::default()),
axum::Extension(ctx.clone()),
@@ -186,16 +183,13 @@ async fn test_sql_form() {
.await;
match json {
JsonResponse::GreptimedbV1(resp) => {
assert!(resp.success(), "{resp:?}");
assert!(resp.error().is_none());
match &resp.output()[0] {
JsonOutput::Records(records) => {
assert_eq!(1, records.num_rows());
let json = serde_json::to_string_pretty(&records).unwrap();
assert_eq!(
json,
r#"{
HttpResponse::GreptimedbV1(resp) => match &resp.output()[0] {
GreptimeQueryOutput::Records(records) => {
assert_eq!(1, records.num_rows());
let json = serde_json::to_string_pretty(&records).unwrap();
assert_eq!(
json,
r#"{
"schema": {
"column_schemas": [
{
@@ -210,15 +204,11 @@ async fn test_sql_form() {
]
]
}"#
);
}
_ => unreachable!(),
);
}
}
JsonResponse::InfluxdbV1(resp) => {
assert!(resp.success(), "{resp:?}");
assert!(resp.error().is_none());
_ => unreachable!(),
},
HttpResponse::InfluxdbV1(resp) => {
let json = serde_json::to_string_pretty(&resp.results()).unwrap();
assert_eq!(
json,
@@ -242,6 +232,19 @@ async fn test_sql_form() {
]"#
);
}
HttpResponse::Csv(resp) => {
use http_body::Body as HttpBody;
let mut resp = resp.into_response();
assert_eq!(
resp.headers().get(header::CONTENT_TYPE),
Some(HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref())).as_ref(),
);
assert_eq!(
resp.body_mut().data().await.unwrap().unwrap(),
hyper::body::Bytes::from_static(b"4950\n"),
);
}
_ => unreachable!(),
}
}
}
@@ -255,7 +258,7 @@ lazy_static::lazy_static! {
async fn test_metrics() {
TEST_METRIC.inc();
let stats = MetricsHandler;
let text = http_handler::metrics(axum::extract::State(stats), Query(HashMap::default())).await;
let text = http_handler::metrics(State(stats), Query(HashMap::default())).await;
assert!(text.contains("test_metrics counter"));
}
@@ -266,7 +269,7 @@ async fn insert_script(
) {
let body = RawBody(Body::from(script.clone()));
let invalid_query = create_invalid_script_query();
let Json(json) = script_handler::scripts(
let json = script_handler::scripts(
State(ApiState {
sql_handler: sql_handler.clone(),
script_handler: Some(script_handler.clone()),
@@ -275,16 +278,15 @@ async fn insert_script(
body,
)
.await;
let JsonResponse::GreptimedbV1(json) = json else {
let HttpResponse::Error(json) = json else {
unreachable!()
};
assert!(!json.success(), "{json:?}");
assert_eq!(json.error().unwrap(), "invalid schema");
assert_eq!(json.error(), "invalid schema");
let body = RawBody(Body::from(script.clone()));
let exec = create_script_query();
// Insert the script
let Json(json) = script_handler::scripts(
let json = script_handler::scripts(
State(ApiState {
sql_handler: sql_handler.clone(),
script_handler: Some(script_handler.clone()),
@@ -293,11 +295,9 @@ async fn insert_script(
body,
)
.await;
let JsonResponse::GreptimedbV1(json) = json else {
let HttpResponse::GreptimedbV1(json) = json else {
unreachable!()
};
assert!(json.success(), "{json:?}");
assert!(json.error().is_none());
assert!(json.output().is_empty());
}
@@ -317,7 +317,7 @@ def test(n) -> vector[i64]:
insert_script(script.clone(), script_handler.clone(), sql_handler.clone()).await;
// Run the script
let exec = create_script_query();
let Json(json) = script_handler::run_script(
let json = script_handler::run_script(
State(ApiState {
sql_handler,
script_handler: Some(script_handler),
@@ -325,14 +325,11 @@ def test(n) -> vector[i64]:
exec,
)
.await;
let JsonResponse::GreptimedbV1(json) = json else {
let HttpResponse::GreptimedbV1(json) = json else {
unreachable!()
};
assert!(json.success(), "{json:?}");
assert!(json.error().is_none());
match &json.output()[0] {
JsonOutput::Records(records) => {
GreptimeQueryOutput::Records(records) => {
let json = serde_json::to_string_pretty(&records).unwrap();
assert_eq!(5, records.num_rows());
assert_eq!(
@@ -387,7 +384,7 @@ def test(n, **params) -> vector[i64]:
// Run the script
let mut exec = create_script_query();
let _ = exec.0.params.insert("a".to_string(), "42".to_string());
let Json(json) = script_handler::run_script(
let json = script_handler::run_script(
State(ApiState {
sql_handler,
script_handler: Some(script_handler),
@@ -395,14 +392,11 @@ def test(n, **params) -> vector[i64]:
exec,
)
.await;
let JsonResponse::GreptimedbV1(json) = json else {
let HttpResponse::GreptimedbV1(json) = json else {
unreachable!()
};
assert!(json.success(), "{json:?}");
assert!(json.error().is_none());
match &json.output()[0] {
JsonOutput::Records(records) => {
GreptimeQueryOutput::Records(records) => {
let json = serde_json::to_string_pretty(&records).unwrap();
assert_eq!(5, records.num_rows());
assert_eq!(

View File

@@ -26,6 +26,7 @@ use query::parser::PromQuery;
use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use servers::error::{Error, Result};
use servers::http::header::GREPTIME_DB_HEADER_FORMAT;
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::grpc::GrpcQueryHandler;
@@ -169,7 +170,11 @@ async fn test_influxdb_write() {
.await;
assert_eq!(result.status(), 401);
assert_eq!(
"{\"type\":\"InfluxdbV1\",\"results\":[],\"error\":\"Username and password does not match, username: greptime\"}",
result.headers().get(GREPTIME_DB_HEADER_FORMAT).unwrap(),
"influxdb_v1",
);
assert_eq!(
"{\"code\":7002,\"error\":\"Username and password does not match, username: greptime\",\"execution_time_ms\":0}",
result.text().await
);
@@ -181,7 +186,11 @@ async fn test_influxdb_write() {
.await;
assert_eq!(result.status(), 401);
assert_eq!(
"{\"type\":\"InfluxdbV1\",\"results\":[],\"error\":\"Not found influx http authorization info\"}",
result.headers().get(GREPTIME_DB_HEADER_FORMAT).unwrap(),
"influxdb_v1",
);
assert_eq!(
"{\"code\":7003,\"error\":\"Not found influx http authorization info\",\"execution_time_ms\":0}",
result.text().await
);

View File

@@ -19,10 +19,12 @@ use axum::http::StatusCode;
use axum_test_helper::TestClient;
use common_error::status_code::StatusCode as ErrorCode;
use serde_json::json;
use servers::http::error_result::ErrorResponse;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::handler::HealthResponse;
use servers::http::influxdb_result_v1::InfluxdbOutput;
use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::{JsonOutput, JsonResponse};
use servers::http::GreptimeQueryOutput;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend,
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
@@ -123,13 +125,9 @@ pub async fn test_sql_api(store_type: StorageType) {
let res = client.get("/v1/sql").send().await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
let body = serde_json::from_str::<ErrorResponse>(&res.text().await).unwrap();
assert_eq!(body.code(), 1004);
assert_eq!(body.error().unwrap(), "sql parameter is required.");
let _ = body.execution_time_ms().unwrap();
assert_eq!(body.error(), "sql parameter is required.");
let res = client
.get("/v1/sql?sql=select * from numbers limit 10")
@@ -137,18 +135,12 @@ pub async fn test_sql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert!(body.success());
let _ = body.execution_time_ms().unwrap();
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
let output = body.output();
assert_eq!(output.len(), 1);
assert_eq!(
output[0],
serde_json::from_value::<JsonOutput>(json!({
serde_json::from_value::<GreptimeQueryOutput>(json!({
"records" :{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}
})).unwrap()
);
@@ -160,13 +152,7 @@ pub async fn test_sql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::InfluxdbV1(body) = body else {
unreachable!()
};
assert!(body.success());
let _ = body.execution_time_ms().unwrap();
let body = serde_json::from_str::<InfluxdbV1Response>(&res.text().await).unwrap();
let output = body.results();
assert_eq!(output.len(), 1);
assert_eq!(
@@ -190,18 +176,13 @@ pub async fn test_sql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert!(body.success());
let _ = body.execution_time_ms().unwrap();
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
let output = body.output();
assert_eq!(output.len(), 1);
assert_eq!(
output[0],
serde_json::from_value::<JsonOutput>(json!({
serde_json::from_value::<GreptimeQueryOutput>(json!({
"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host",66.6,1024.0,0]]}
})).unwrap()
);
@@ -213,18 +194,13 @@ pub async fn test_sql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert!(body.success());
let _ = body.execution_time_ms().unwrap();
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
let output = body.output();
assert_eq!(output.len(), 1);
assert_eq!(
output[0],
serde_json::from_value::<JsonOutput>(json!({
serde_json::from_value::<GreptimeQueryOutput>(json!({
"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
})).unwrap()
);
@@ -236,17 +212,12 @@ pub async fn test_sql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert!(body.success());
let _ = body.execution_time_ms().unwrap();
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
let output = body.output();
assert_eq!(output.len(), 1);
assert_eq!(
output[0],
serde_json::from_value::<JsonOutput>(json!({
serde_json::from_value::<GreptimeQueryOutput>(json!({
"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
})).unwrap()
);
@@ -258,23 +229,18 @@ pub async fn test_sql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert!(body.success());
let _ = body.execution_time_ms().unwrap();
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
let outputs = body.output();
assert_eq!(outputs.len(), 2);
assert_eq!(
outputs[0],
serde_json::from_value::<JsonOutput>(json!({
serde_json::from_value::<GreptimeQueryOutput>(json!({
"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
})).unwrap()
);
assert_eq!(
outputs[1],
serde_json::from_value::<JsonOutput>(json!({
serde_json::from_value::<GreptimeQueryOutput>(json!({
"records":{"rows":[]}
}))
.unwrap()
@@ -287,14 +253,9 @@ pub async fn test_sql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert!(!body.success());
let _ = body.execution_time_ms().unwrap();
let _body = serde_json::from_str::<ErrorResponse>(&res.text().await).unwrap();
// TODO(shuiyisong): fix this when return source err msg to client side
// assert!(body.error().unwrap().contains("Table not found"));
// assert!(body.error().contains("Table not found"));
// test database given
let res = client
@@ -303,17 +264,12 @@ pub async fn test_sql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert!(body.success());
let _ = body.execution_time_ms().unwrap();
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
let outputs = body.output();
assert_eq!(outputs.len(), 1);
assert_eq!(
outputs[0],
serde_json::from_value::<JsonOutput>(json!({
serde_json::from_value::<GreptimeQueryOutput>(json!({
"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
})).unwrap()
);
@@ -324,10 +280,7 @@ pub async fn test_sql_api(store_type: StorageType) {
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
let body = serde_json::from_str::<ErrorResponse>(&res.text().await).unwrap();
assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32);
// test catalog-schema given
@@ -337,17 +290,12 @@ pub async fn test_sql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert!(body.success());
let _ = body.execution_time_ms().unwrap();
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
let outputs = body.output();
assert_eq!(outputs.len(), 1);
assert_eq!(
outputs[0],
serde_json::from_value::<JsonOutput>(json!({
serde_json::from_value::<GreptimeQueryOutput>(json!({
"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
})).unwrap()
);
@@ -358,10 +306,7 @@ pub async fn test_sql_api(store_type: StorageType) {
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
let body = serde_json::from_str::<ErrorResponse>(&res.text().await).unwrap();
assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32);
// test invalid schema
@@ -370,10 +315,7 @@ pub async fn test_sql_api(store_type: StorageType) {
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
let body = serde_json::from_str::<ErrorResponse>(&res.text().await).unwrap();
assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32);
guard.remove_all().await;
@@ -389,13 +331,7 @@ pub async fn test_prometheus_promql_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert!(body.success());
let _ = body.execution_time_ms().unwrap();
let _body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
guard.remove_all().await;
}
@@ -605,11 +541,7 @@ def test(n) -> vector[f64]:
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert_eq!(body.code(), 0);
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
assert!(body.output().is_empty());
// call script
@@ -618,18 +550,12 @@ def test(n) -> vector[f64]:
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
let JsonResponse::GreptimedbV1(body) = body else {
unreachable!()
};
assert_eq!(body.code(), 0);
let _ = body.execution_time_ms().unwrap();
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
let output = body.output();
assert_eq!(output.len(), 1);
assert_eq!(
output[0],
serde_json::from_value::<JsonOutput>(json!({
serde_json::from_value::<GreptimeQueryOutput>(json!({
"records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}
})).unwrap()
);