mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat: improve http error message (#2767)
* feat: improve http error message * feat: log http error * style: fix clippy * test: fix test
This commit is contained in:
@@ -382,6 +382,13 @@ pub enum Error {
|
||||
actual: i32,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert to json"))]
|
||||
ToJson {
|
||||
#[snafu(source)]
|
||||
error: serde_json::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -475,6 +482,8 @@ impl ErrorExt for Error {
|
||||
Metrics { source } => source.status_code(),
|
||||
|
||||
ConvertScalarValue { source, .. } => source.status_code(),
|
||||
|
||||
ToJson { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ use common_error::status_code::StatusCode;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::{util, RecordBatch};
|
||||
use common_telemetry::logging::{self, info};
|
||||
use common_telemetry::{debug, error};
|
||||
use datatypes::data_type::DataType;
|
||||
use futures::FutureExt;
|
||||
use schemars::JsonSchema;
|
||||
@@ -61,10 +62,10 @@ use tower::ServiceBuilder;
|
||||
use tower_http::auth::AsyncRequireAuthorizationLayer;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
use self::authorize::HttpAuth;
|
||||
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
|
||||
use crate::configurator::ConfiguratorRef;
|
||||
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
|
||||
use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu};
|
||||
use crate::http::authorize::HttpAuth;
|
||||
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
|
||||
use crate::http::prometheus::{
|
||||
format_query, instant_query, label_values_query, labels_query, range_query, series_query,
|
||||
};
|
||||
@@ -185,7 +186,7 @@ impl HttpRecordsOutput {
|
||||
}
|
||||
|
||||
impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
|
||||
type Error = String;
|
||||
type Error = Error;
|
||||
|
||||
// TODO(sunng87): use schema from recordstreams when #366 fixed
|
||||
fn try_from(
|
||||
@@ -218,8 +219,9 @@ impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
|
||||
for row in recordbatch.rows() {
|
||||
let value_row = row
|
||||
.into_iter()
|
||||
.map(|f| Value::try_from(f).map_err(|err| err.to_string()))
|
||||
.collect::<std::result::Result<Vec<Value>, _>>()?;
|
||||
.map(Value::try_from)
|
||||
.collect::<std::result::Result<Vec<Value>, _>>()
|
||||
.context(ToJsonSnafu)?;
|
||||
|
||||
rows.push(value_row);
|
||||
}
|
||||
@@ -252,9 +254,25 @@ pub struct JsonResponse {
|
||||
}
|
||||
|
||||
impl JsonResponse {
|
||||
pub fn with_error(error: String, error_code: StatusCode) -> Self {
|
||||
pub fn with_error(error: impl ErrorExt) -> Self {
|
||||
let code = error.status_code();
|
||||
if code.should_log_error() {
|
||||
error!(error; "Failed to handle HTTP request");
|
||||
} else {
|
||||
debug!("Failed to handle HTTP request, err: {:?}", error);
|
||||
}
|
||||
|
||||
JsonResponse {
|
||||
error: Some(error),
|
||||
error: Some(error.output_msg()),
|
||||
code: code as u32,
|
||||
output: None,
|
||||
execution_time_ms: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn with_error_message(err_msg: String, error_code: StatusCode) -> Self {
|
||||
JsonResponse {
|
||||
error: Some(err_msg),
|
||||
code: error_code as u32,
|
||||
output: None,
|
||||
execution_time_ms: None,
|
||||
@@ -293,12 +311,12 @@ impl JsonResponse {
|
||||
results.push(JsonOutput::Records(rows));
|
||||
}
|
||||
Err(err) => {
|
||||
return Self::with_error(err, StatusCode::Internal);
|
||||
return Self::with_error(err);
|
||||
}
|
||||
},
|
||||
|
||||
Err(e) => {
|
||||
return Self::with_error(e.output_msg(), e.status_code());
|
||||
return Self::with_error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -307,11 +325,11 @@ impl JsonResponse {
|
||||
results.push(JsonOutput::Records(rows));
|
||||
}
|
||||
Err(err) => {
|
||||
return Self::with_error(err, StatusCode::Internal);
|
||||
return Self::with_error(err);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
return Self::with_error(e.output_msg(), e.status_code());
|
||||
return Self::with_error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -756,7 +774,7 @@ impl Server for HttpServer {
|
||||
async fn handle_error(err: BoxError) -> Json<JsonResponse> {
|
||||
logging::error!("Unhandled internal error: {}", err);
|
||||
|
||||
Json(JsonResponse::with_error(
|
||||
Json(JsonResponse::with_error_message(
|
||||
format!("Unhandled internal error: {err}"),
|
||||
StatusCode::Unexpected,
|
||||
))
|
||||
|
||||
@@ -20,6 +20,7 @@ use aide::transform::TransformOperation;
|
||||
use axum::extract::{Json, Query, State};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Extension, Form};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use query::parser::PromQuery;
|
||||
use schemars::JsonSchema;
|
||||
@@ -60,7 +61,7 @@ pub async fn sql(
|
||||
|
||||
JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await).await
|
||||
} else {
|
||||
JsonResponse::with_error(
|
||||
JsonResponse::with_error_message(
|
||||
"sql parameter is required.".to_string(),
|
||||
StatusCode::InvalidArguments,
|
||||
)
|
||||
@@ -191,14 +192,15 @@ async fn validate_schema(
|
||||
.is_valid_schema(query_ctx.current_catalog(), query_ctx.current_schema())
|
||||
.await
|
||||
{
|
||||
Ok(false) => Some(JsonResponse::with_error(
|
||||
Ok(false) => Some(JsonResponse::with_error_message(
|
||||
format!("Database not found: {}", query_ctx.get_db_string()),
|
||||
StatusCode::DatabaseNotFound,
|
||||
)),
|
||||
Err(e) => Some(JsonResponse::with_error(
|
||||
Err(e) => Some(JsonResponse::with_error_message(
|
||||
format!(
|
||||
"Error checking database: {}, {e}",
|
||||
query_ctx.get_db_string()
|
||||
"Error checking database: {}, {}",
|
||||
query_ctx.get_db_string(),
|
||||
e.output_msg(),
|
||||
),
|
||||
StatusCode::Internal,
|
||||
)),
|
||||
|
||||
@@ -18,22 +18,22 @@ use std::time::Instant;
|
||||
use axum::extract::{Json, Query, RawBody, State};
|
||||
use common_catalog::consts::DEFAULT_CATALOG_NAME;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContext;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu};
|
||||
use crate::http::{ApiState, JsonResponse};
|
||||
|
||||
macro_rules! json_err {
|
||||
($e: expr) => {{
|
||||
return Json(JsonResponse::with_error(
|
||||
format!("Invalid argument: {}", $e),
|
||||
common_error::status_code::StatusCode::InvalidArguments,
|
||||
));
|
||||
return Json(JsonResponse::with_error($e));
|
||||
}};
|
||||
|
||||
($msg: expr, $code: expr) => {{
|
||||
return Json(JsonResponse::with_error($msg.to_string(), $code));
|
||||
return Json(JsonResponse::with_error_message($msg.to_string(), $code));
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -60,18 +60,19 @@ pub async fn scripts(
|
||||
let schema = params.db.as_ref();
|
||||
|
||||
if schema.is_none() || schema.unwrap().is_empty() {
|
||||
json_err!("invalid schema")
|
||||
json_err!("invalid schema", StatusCode::InvalidArguments)
|
||||
}
|
||||
|
||||
let name = params.name.as_ref();
|
||||
|
||||
if name.is_none() || name.unwrap().is_empty() {
|
||||
json_err!("invalid name");
|
||||
json_err!("invalid name", StatusCode::InvalidArguments);
|
||||
}
|
||||
|
||||
let bytes = unwrap_or_json_err!(hyper::body::to_bytes(body).await);
|
||||
let bytes = unwrap_or_json_err!(hyper::body::to_bytes(body).await.context(HyperSnafu));
|
||||
|
||||
let script = unwrap_or_json_err!(String::from_utf8(bytes.to_vec()));
|
||||
let script =
|
||||
unwrap_or_json_err!(String::from_utf8(bytes.to_vec()).context(InvalidUtf8ValueSnafu));
|
||||
|
||||
// Safety: schema and name are already checked above.
|
||||
let query_ctx = QueryContext::with(&catalog, schema.unwrap());
|
||||
@@ -80,12 +81,18 @@ pub async fn scripts(
|
||||
.await
|
||||
{
|
||||
Ok(()) => JsonResponse::with_output(None),
|
||||
Err(e) => json_err!(format!("Insert script error: {e}"), e.status_code()),
|
||||
Err(e) => json_err!(
|
||||
format!("Insert script error: {}", e.output_msg()),
|
||||
e.status_code()
|
||||
),
|
||||
};
|
||||
|
||||
Json(body)
|
||||
} else {
|
||||
json_err!("Script execution not supported, missing script handler");
|
||||
json_err!(
|
||||
"Script execution not supported, missing script handler",
|
||||
StatusCode::Unsupported
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,13 +119,13 @@ pub async fn run_script(
|
||||
let schema = params.db.as_ref();
|
||||
|
||||
if schema.is_none() || schema.unwrap().is_empty() {
|
||||
json_err!("invalid schema")
|
||||
json_err!("invalid schema", StatusCode::InvalidArguments)
|
||||
}
|
||||
|
||||
let name = params.name.as_ref();
|
||||
|
||||
if name.is_none() || name.unwrap().is_empty() {
|
||||
json_err!("invalid name");
|
||||
json_err!("invalid name", StatusCode::InvalidArguments);
|
||||
}
|
||||
|
||||
// Safety: schema and name are already checked above.
|
||||
@@ -130,6 +137,9 @@ pub async fn run_script(
|
||||
|
||||
Json(resp.with_execution_time(start.elapsed().as_millis()))
|
||||
} else {
|
||||
json_err!("Script execution not supported, missing script handler");
|
||||
json_err!(
|
||||
"Script execution not supported, missing script handler",
|
||||
StatusCode::Unsupported
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,7 +182,7 @@ async fn insert_script(
|
||||
)
|
||||
.await;
|
||||
assert!(!json.success(), "{json:?}");
|
||||
assert_eq!(json.error().unwrap(), "Invalid argument: invalid schema");
|
||||
assert_eq!(json.error().unwrap(), "invalid schema");
|
||||
|
||||
let body = RawBody(Body::from(script.clone()));
|
||||
let exec = create_script_query();
|
||||
|
||||
Reference in New Issue
Block a user