feat: status_code in response header (#1982)

* feat: status_code in response header

* chore: parese grpc response

* fix: sqlness failed

* chore: fix sqlness
This commit is contained in:
JeremyHi
2023-07-19 19:27:49 +08:00
committed by GitHub
parent 2e2a82689c
commit 2ef0d06cdb
12 changed files with 199 additions and 54 deletions

View File

@@ -17,9 +17,9 @@ use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
greptime_response, AffectedRows, AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr,
DdlRequest, DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests,
PromRangeQuery, QueryRequest, RequestHeader,
AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequest,
DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest,
RequestHeader,
};
use arrow_flight::{FlightData, Ticket};
use common_error::ext::{BoxedError, ErrorExt};
@@ -28,12 +28,10 @@ use common_query::Output;
use common_telemetry::{logging, timer};
use futures_util::{TryFutureExt, TryStreamExt};
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use crate::error::{
ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
};
use crate::{error, metrics, Client, Result, StreamInserter};
use crate::error::{ConvertFlightDataSnafu, IllegalFlightMessagesSnafu, ServerSnafu};
use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter};
#[derive(Clone, Debug, Default)]
pub struct Database {
@@ -142,16 +140,8 @@ impl Database {
async fn handle(&self, request: Request) -> Result<u32> {
let mut client = self.client.make_database_client()?.inner;
let request = self.to_rpc_request(request);
let response = client
.handle(request)
.await?
.into_inner()
.response
.context(IllegalDatabaseResponseSnafu {
err_msg: "GreptimeResponse is empty",
})?;
let greptime_response::Response::AffectedRows(AffectedRows { value }) = response;
Ok(value)
let response = client.handle(request).await?.into_inner();
from_grpc_response(response)
}
#[inline]
@@ -264,7 +254,7 @@ impl Database {
let e: error::Error = e.into();
let code = e.status_code();
let msg = e.to_string();
error::ServerSnafu { code, msg }
ServerSnafu { code, msg }
.fail::<()>()
.map_err(BoxedError::new)
.context(error::FlightGetSnafu {

View File

@@ -21,9 +21,42 @@ mod metrics;
mod stream_insert;
pub use api;
use api::v1::greptime_response::Response;
use api::v1::{AffectedRows, GreptimeResponse};
pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::status_code::StatusCode;
use snafu::OptionExt;
pub use self::client::Client;
pub use self::database::Database;
pub use self::error::{Error, Result};
pub use self::stream_insert::StreamInserter;
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};
pub fn from_grpc_response(response: GreptimeResponse) -> Result<u32> {
let header = response.header.context(IllegalDatabaseResponseSnafu {
err_msg: "missing header",
})?;
let status = header.status.context(IllegalDatabaseResponseSnafu {
err_msg: "missing status",
})?;
if StatusCode::is_success(status.status_code) {
let res = response.response.context(IllegalDatabaseResponseSnafu {
err_msg: "missing response",
})?;
match res {
Response::AffectedRows(AffectedRows { value }) => Ok(value),
}
} else {
let status_code =
StatusCode::from_u32(status.status_code).context(IllegalDatabaseResponseSnafu {
err_msg: format!("invalid status: {:?}", status),
})?;
ServerSnafu {
code: status_code,
msg: status.err_msg,
}
.fail()
}
}

View File

@@ -15,17 +15,16 @@
use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::{
greptime_response, AffectedRows, AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest,
InsertRequests, RequestHeader,
AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest, InsertRequests, RequestHeader,
};
use snafu::OptionExt;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tonic::{Response, Status};
use crate::error::{self, IllegalDatabaseResponseSnafu, Result};
use crate::error::{self, Result};
use crate::from_grpc_response;
/// A structure that provides some methods for streaming data insert.
///
@@ -89,17 +88,8 @@ impl StreamInserter {
drop(self.sender);
let response = self.join.await.unwrap()?;
let response = response
.into_inner()
.response
.context(IllegalDatabaseResponseSnafu {
err_msg: "GreptimeResponse is empty",
})?;
let greptime_response::Response::AffectedRows(AffectedRows { value }) = response;
Ok(value)
let response = response.into_inner();
from_grpc_response(response)
}
fn to_rpc_request(&self, request: Request) -> GreptimeRequest {

View File

@@ -154,6 +154,44 @@ impl StatusCode {
| StatusCode::AccessDenied => false,
}
}
pub fn from_u32(value: u32) -> Option<Self> {
match value {
v if v == StatusCode::Success as u32 => Some(StatusCode::Success),
v if v == StatusCode::Unknown as u32 => Some(StatusCode::Unknown),
v if v == StatusCode::Unsupported as u32 => Some(StatusCode::Unsupported),
v if v == StatusCode::Unexpected as u32 => Some(StatusCode::Unexpected),
v if v == StatusCode::Internal as u32 => Some(StatusCode::Internal),
v if v == StatusCode::InvalidArguments as u32 => Some(StatusCode::InvalidArguments),
v if v == StatusCode::Cancelled as u32 => Some(StatusCode::Cancelled),
v if v == StatusCode::InvalidSyntax as u32 => Some(StatusCode::InvalidSyntax),
v if v == StatusCode::PlanQuery as u32 => Some(StatusCode::PlanQuery),
v if v == StatusCode::EngineExecuteQuery as u32 => Some(StatusCode::EngineExecuteQuery),
v if v == StatusCode::TableAlreadyExists as u32 => Some(StatusCode::TableAlreadyExists),
v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound),
v if v == StatusCode::TableColumnNotFound as u32 => {
Some(StatusCode::TableColumnNotFound)
}
v if v == StatusCode::TableColumnExists as u32 => Some(StatusCode::TableColumnExists),
v if v == StatusCode::DatabaseNotFound as u32 => Some(StatusCode::DatabaseNotFound),
v if v == StatusCode::StorageUnavailable as u32 => Some(StatusCode::StorageUnavailable),
v if v == StatusCode::RuntimeResourcesExhausted as u32 => {
Some(StatusCode::RuntimeResourcesExhausted)
}
v if v == StatusCode::RateLimited as u32 => Some(StatusCode::RateLimited),
v if v == StatusCode::UserNotFound as u32 => Some(StatusCode::UserNotFound),
v if v == StatusCode::UnsupportedPasswordType as u32 => {
Some(StatusCode::UnsupportedPasswordType)
}
v if v == StatusCode::UserPasswordMismatch as u32 => {
Some(StatusCode::UserPasswordMismatch)
}
v if v == StatusCode::AuthHeaderNotFound as u32 => Some(StatusCode::AuthHeaderNotFound),
v if v == StatusCode::InvalidAuthHeader as u32 => Some(StatusCode::InvalidAuthHeader),
v if v == StatusCode::AccessDenied as u32 => Some(StatusCode::AccessDenied),
_ => None,
}
}
}
impl fmt::Display for StatusCode {

View File

@@ -16,8 +16,10 @@ use std::sync::Arc;
use api::v1::greptime_database_server::GreptimeDatabase;
use api::v1::greptime_response::Response as RawResponse;
use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse};
use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use futures::StreamExt;
use tonic::{Request, Response, Status, Streaming};
@@ -44,13 +46,27 @@ impl GreptimeDatabase for DatabaseService {
let request = request.into_inner();
let output = self.handler.handle_request(request).await?;
let response = match output {
Output::AffectedRows(rows) => GreptimeResponse {
header: None,
Ok(Output::AffectedRows(rows)) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
Output::Stream(_) | Output::RecordBatches(_) => {
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
Err(e) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: e.status_code() as _,
err_msg: e.to_string(),
}),
}),
response: None,
},
};
Ok(Response::new(response))
}
@@ -66,17 +82,26 @@ impl GreptimeDatabase for DatabaseService {
let request = request?;
let output = self.handler.handle_request(request).await?;
match output {
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
Ok(Output::AffectedRows(rows)) => affected_rows += rows,
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
}
Err(e) => {
// We directly convert it to a tonic error and fail immediately in stream.
return Err(e.into());
}
}
}
let response = GreptimeResponse {
header: None,
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),

View File

@@ -89,7 +89,7 @@ impl FlightService for FlightHandler {
let request =
GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?;
let output = self.handler.handle_request(request).await?;
let output = self.handler.handle_request(request).await??;
let stream = to_flight_data_stream(output);
Ok(Response::new(stream))

View File

@@ -26,11 +26,12 @@ use common_telemetry::logging;
use metrics::{histogram, increment_counter};
use session::context::{QueryContext, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use tonic::Status;
use crate::auth::{Identity, Password, UserProviderRef};
use crate::error::Error::UnsupportedAuthScheme;
use crate::error::{AuthSnafu, InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu};
use crate::error::{
AuthSnafu, InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result as InternalResult,
};
use crate::grpc::TonicResult;
use crate::metrics::{
METRIC_AUTH_FAILURE, METRIC_CODE_LABEL, METRIC_SERVER_GRPC_DB_REQUEST_TIMER,
@@ -57,7 +58,10 @@ impl GreptimeRequestHandler {
}
}
pub(crate) async fn handle_request(&self, request: GreptimeRequest) -> TonicResult<Output> {
pub(crate) async fn handle_request(
&self,
request: GreptimeRequest,
) -> TonicResult<InternalResult<Output>> {
let query = request.request.context(InvalidQuerySnafu {
reason: "Expecting non-empty GreptimeRequest.",
})?;
@@ -65,7 +69,7 @@ impl GreptimeRequestHandler {
let header = request.header.as_ref();
let query_ctx = create_query_context(header);
self.auth(header, &query_ctx).await?;
let _ = self.auth(header, &query_ctx).await?;
let handler = self.handler.clone();
let request_type = request_type(&query);
@@ -94,7 +98,7 @@ impl GreptimeRequestHandler {
let output = handle.await.context(JoinTaskSnafu).map_err(|e| {
timer.record(e.status_code());
e
})??;
})?;
Ok(output)
}
@@ -102,8 +106,8 @@ impl GreptimeRequestHandler {
&self,
header: Option<&RequestHeader>,
query_ctx: &QueryContextRef,
) -> TonicResult<()> {
let Some(user_provider) = self.user_provider.as_ref() else { return Ok(()) };
) -> TonicResult<InternalResult<()>> {
let Some(user_provider) = self.user_provider.as_ref() else { return Ok(Ok(())) };
let auth_scheme = header
.and_then(|header| {
@@ -114,7 +118,7 @@ impl GreptimeRequestHandler {
})
.context(NotFoundAuthHeaderSnafu)?;
let _ = match auth_scheme {
let res = match auth_scheme {
AuthScheme::Basic(Basic { username, password }) => user_provider
.auth(
Identity::UserId(&username, None),
@@ -128,14 +132,15 @@ impl GreptimeRequestHandler {
name: "Token AuthScheme".to_string(),
}),
}
.map(|_| ())
.map_err(|e| {
increment_counter!(
METRIC_AUTH_FAILURE,
&[(METRIC_CODE_LABEL, format!("{}", e.status_code()))]
);
Status::unauthenticated(e.to_string())
})?;
Ok(())
e
});
Ok(res)
}
}

View File

@@ -22,6 +22,7 @@ use api::v1::promql_request::Promql;
use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::timer;
use common_time::util::current_time_rfc3339;
use promql_parser::parser::ValueType;
@@ -80,7 +81,12 @@ impl PrometheusGateway for PrometheusGatewayService {
let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes();
let response = Response::new(PromqlResponse {
header: Some(ResponseHeader { status: None }),
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
body: json_bytes,
});
Ok(response)