feat: grpc handler result (#2107)

* feat: grpc handler inner result

* feat: ext header, x-greptime-err-code, x-greptime-err-msg

* fix: sqlness case

* chore: by comment

* fix: convert status to Error
This commit is contained in:
JeremyHi
2023-08-15 17:34:00 +08:00
committed by GitHub
parent f5e44ba4cf
commit 24dc827ff9
10 changed files with 34 additions and 117 deletions

View File

@@ -13,11 +13,10 @@
// limitations under the License.
use std::any::Any;
use std::str::FromStr;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_error::{INNER_ERROR_CODE, INNER_ERROR_MSG};
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use snafu::{Location, Snafu};
use tonic::{Code, Status};
@@ -107,11 +106,17 @@ impl From<Status> for Error {
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
}
let code = get_metadata_value(&e, INNER_ERROR_CODE)
.and_then(|s| StatusCode::from_str(&s).ok())
let code = get_metadata_value(&e, GREPTIME_ERROR_CODE)
.and_then(|s| {
if let Ok(code) = s.parse::<u32>() {
StatusCode::from_u32(code)
} else {
None
}
})
.unwrap_or(StatusCode::Unknown);
let msg = get_metadata_value(&e, INNER_ERROR_MSG).unwrap_or(e.to_string());
let msg = get_metadata_value(&e, GREPTIME_ERROR_MSG).unwrap_or(e.to_string());
Self::Server { code, msg }
}

View File

@@ -17,7 +17,7 @@ pub mod format;
pub mod mock;
pub mod status_code;
pub const INNER_ERROR_CODE: &str = "INNER_ERROR_CODE";
pub const INNER_ERROR_MSG: &str = "INNER_ERROR_MSG";
pub const GREPTIME_ERROR_CODE: &str = "x-greptime-err-code";
pub const GREPTIME_ERROR_MSG: &str = "x-greptime-err-msg";
pub use snafu;

View File

@@ -22,7 +22,7 @@ use base64::DecodeError;
use catalog;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_error::{INNER_ERROR_CODE, INNER_ERROR_MSG};
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use common_telemetry::logging;
use datatypes::prelude::ConcreteDataType;
use query::parser::PromQuery;
@@ -462,12 +462,10 @@ impl From<Error> for tonic::Status {
// If either of the status_code or error msg cannot convert to valid HTTP header value
// (which is a very rare case), just ignore. Client will use Tonic status code and message.
let status_code = err.status_code();
if let Ok(code) = HeaderValue::from_bytes(status_code.to_string().as_bytes()) {
let _ = headers.insert(INNER_ERROR_CODE, code);
}
headers.insert(GREPTIME_ERROR_CODE, HeaderValue::from(status_code as u32));
let root_error = err.iter_chain().last().unwrap();
if let Ok(err_msg) = HeaderValue::from_bytes(root_error.to_string().as_bytes()) {
let _ = headers.insert(INNER_ERROR_MSG, err_msg);
let _ = headers.insert(GREPTIME_ERROR_MSG, err_msg);
}
let metadata = MetadataMap::from_headers(headers);

View File

@@ -18,7 +18,6 @@ use api::v1::greptime_database_server::GreptimeDatabase;
use api::v1::greptime_response::Response as RawResponse;
use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use futures::StreamExt;
@@ -45,8 +44,8 @@ impl GreptimeDatabase for DatabaseService {
) -> TonicResult<Response<GreptimeResponse>> {
let request = request.into_inner();
let output = self.handler.handle_request(request).await?;
let response = match output {
Ok(Output::AffectedRows(rows)) => GreptimeResponse {
let message = match output {
Output::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
@@ -55,20 +54,11 @@ impl GreptimeDatabase for DatabaseService {
}),
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => {
Output::Stream(_) | Output::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
Err(e) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: e.status_code() as _,
err_msg: e.to_string(),
}),
}),
response: None,
},
};
Ok(Response::new(response))
Ok(Response::new(message))
}
async fn handle_requests(
@@ -82,20 +72,15 @@ impl GreptimeDatabase for DatabaseService {
let request = request?;
let output = self.handler.handle_request(request).await?;
match output {
Ok(Output::AffectedRows(rows)) => affected_rows += rows,
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => {
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
}
Err(e) => {
// We directly convert it to a tonic error and fail immediately in stream.
return Err(e.into());
}
}
}
let response = GreptimeResponse {
let message = GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
@@ -106,6 +91,6 @@ impl GreptimeDatabase for DatabaseService {
value: affected_rows as u32,
})),
};
Ok(Response::new(response))
Ok(Response::new(message))
}
}

View File

@@ -89,7 +89,7 @@ impl FlightService for FlightHandler {
let request =
GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?;
let output = self.handler.handle_request(request).await??;
let output = self.handler.handle_request(request).await?;
let stream = to_flight_data_stream(output);
Ok(Response::new(stream))

View File

@@ -31,10 +31,7 @@ use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::error::Error::UnsupportedAuthScheme;
use crate::error::{
AuthSnafu, InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result as InternalResult,
};
use crate::grpc::TonicResult;
use crate::error::{AuthSnafu, InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result};
use crate::metrics::{
METRIC_AUTH_FAILURE, METRIC_CODE_LABEL, METRIC_DB_LABEL, METRIC_SERVER_GRPC_DB_REQUEST_TIMER,
METRIC_TYPE_LABEL,
@@ -60,23 +57,15 @@ impl GreptimeRequestHandler {
}
}
pub(crate) async fn handle_request(
&self,
request: GreptimeRequest,
) -> TonicResult<InternalResult<Output>> {
pub(crate) async fn handle_request(&self, request: GreptimeRequest) -> Result<Output> {
let query = request.request.context(InvalidQuerySnafu {
reason: "Expecting non-empty GreptimeRequest.",
})?;
let header = request.header.as_ref();
let query_ctx = create_query_context(header);
match self.auth(header, &query_ctx).await? {
Err(e) => return Ok(Err(e)),
Ok(user_info) => {
query_ctx.set_current_user(user_info);
}
};
let user_info = self.auth(header, &query_ctx).await?;
query_ctx.set_current_user(user_info);
let handler = self.handler.clone();
let request_type = request_type(&query);
@@ -102,20 +91,19 @@ impl GreptimeRequestHandler {
})
});
let output = handle.await.context(JoinTaskSnafu).map_err(|e| {
handle.await.context(JoinTaskSnafu).map_err(|e| {
timer.record(e.status_code());
e
})?;
Ok(output)
})?
}
async fn auth(
&self,
header: Option<&RequestHeader>,
query_ctx: &QueryContextRef,
) -> TonicResult<InternalResult<Option<UserInfoRef>>> {
) -> Result<Option<UserInfoRef>> {
let Some(user_provider) = self.user_provider.as_ref() else {
return Ok(Ok(None));
return Ok(None);
};
let auth_scheme = header
@@ -127,7 +115,7 @@ impl GreptimeRequestHandler {
})
.context(NotFoundAuthHeaderSnafu)?;
let res = match auth_scheme {
match auth_scheme {
AuthScheme::Basic(Basic { username, password }) => user_provider
.auth(
Identity::UserId(&username, None),
@@ -148,8 +136,7 @@ impl GreptimeRequestHandler {
&[(METRIC_CODE_LABEL, format!("{}", e.status_code()))]
);
e
});
Ok(res)
})
}
}

View File

@@ -1,43 +0,0 @@
CREATE TABLE test(i BIGINT TIME INDEX, j INTEGER, k INTEGER NOT NULL);
Affected Rows: 0
INSERT INTO test VALUES (1, 1, 11), (2, 2, 12);
Affected Rows: 2
SELECT * FROM test;
+---+---+----+
| i | j | k |
+---+---+----+
| 1 | 1 | 11 |
| 2 | 2 | 12 |
+---+---+----+
ALTER TABLE test DROP COLUMN j;
Affected Rows: 0
INSERT INTO test VALUES (3, NULL);
Error: 1004(InvalidArguments), Failed to insert value to table: greptime.public.test, source: Failed to operate table, source: Column k is not null but input has null
INSERT INTO test VALUES (3, 13);
Affected Rows: 1
SELECT * FROM test;
+---+----+
| i | k |
+---+----+
| 1 | 11 |
| 2 | 12 |
| 3 | 13 |
+---+----+
DROP TABLE test;
Affected Rows: 1

View File

@@ -1,15 +0,0 @@
CREATE TABLE test(i BIGINT TIME INDEX, j INTEGER, k INTEGER NOT NULL);
INSERT INTO test VALUES (1, 1, 11), (2, 2, 12);
SELECT * FROM test;
ALTER TABLE test DROP COLUMN j;
INSERT INTO test VALUES (3, NULL);
INSERT INTO test VALUES (3, 13);
SELECT * FROM test;
DROP TABLE test;