From 2ef0d06cdbd7ce434741b04ca466c0365cd03aa9 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Wed, 19 Jul 2023 19:27:49 +0800 Subject: [PATCH] feat: status_code in response header (#1982) * feat: status_code in response header * chore: parese grpc response * fix: sqlness failed * chore: fix sqlness --- src/client/src/database.rs | 28 ++++-------- src/client/src/lib.rs | 33 ++++++++++++++ src/client/src/stream_insert.rs | 20 +++------ src/common/error/src/status_code.rs | 38 ++++++++++++++++ src/servers/src/grpc/database.rs | 39 ++++++++++++++--- src/servers/src/grpc/flight.rs | 2 +- src/servers/src/grpc/handler.rs | 27 +++++++----- src/servers/src/grpc/prom_query_gateway.rs | 8 +++- .../alter/drop_col_not_null_next.result | 43 +++++++++++++++++++ .../alter/drop_col_not_null_next.sql | 0 .../alter/drop_col_not_null_next.result | 0 .../alter/drop_col_not_null_next.sql | 15 +++++++ 12 files changed, 199 insertions(+), 54 deletions(-) create mode 100644 tests/cases/distributed/alter/drop_col_not_null_next.result rename tests/cases/{standalone/common => distributed}/alter/drop_col_not_null_next.sql (100%) rename tests/cases/standalone/{common => }/alter/drop_col_not_null_next.result (100%) create mode 100644 tests/cases/standalone/alter/drop_col_not_null_next.sql diff --git a/src/client/src/database.rs b/src/client/src/database.rs index e2bfed224e..d04eda47f9 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -17,9 +17,9 @@ use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{ - greptime_response, AffectedRows, AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, - DdlRequest, DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, - PromRangeQuery, QueryRequest, RequestHeader, + AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequest, + DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest, + RequestHeader, }; use arrow_flight::{FlightData, Ticket}; use common_error::ext::{BoxedError, ErrorExt}; @@ -28,12 +28,10 @@ use common_query::Output; use common_telemetry::{logging, timer}; use futures_util::{TryFutureExt, TryStreamExt}; use prost::Message; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; -use crate::error::{ - ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu, -}; -use crate::{error, metrics, Client, Result, StreamInserter}; +use crate::error::{ConvertFlightDataSnafu, IllegalFlightMessagesSnafu, ServerSnafu}; +use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter}; #[derive(Clone, Debug, Default)] pub struct Database { @@ -142,16 +140,8 @@ impl Database { async fn handle(&self, request: Request) -> Result { let mut client = self.client.make_database_client()?.inner; let request = self.to_rpc_request(request); - let response = client - .handle(request) - .await? - .into_inner() - .response - .context(IllegalDatabaseResponseSnafu { - err_msg: "GreptimeResponse is empty", - })?; - let greptime_response::Response::AffectedRows(AffectedRows { value }) = response; - Ok(value) + let response = client.handle(request).await?.into_inner(); + from_grpc_response(response) } #[inline] @@ -264,7 +254,7 @@ impl Database { let e: error::Error = e.into(); let code = e.status_code(); let msg = e.to_string(); - error::ServerSnafu { code, msg } + ServerSnafu { code, msg } .fail::<()>() .map_err(BoxedError::new) .context(error::FlightGetSnafu { diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 0c1c2cd600..45ae26440b 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -21,9 +21,42 @@ mod metrics; mod stream_insert; pub use api; +use api::v1::greptime_response::Response; +use api::v1::{AffectedRows, GreptimeResponse}; pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::status_code::StatusCode; +use snafu::OptionExt; pub use self::client::Client; pub use self::database::Database; pub use self::error::{Error, Result}; pub use self::stream_insert::StreamInserter; +use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu}; + +pub fn from_grpc_response(response: GreptimeResponse) -> Result { + let header = response.header.context(IllegalDatabaseResponseSnafu { + err_msg: "missing header", + })?; + let status = header.status.context(IllegalDatabaseResponseSnafu { + err_msg: "missing status", + })?; + + if StatusCode::is_success(status.status_code) { + let res = response.response.context(IllegalDatabaseResponseSnafu { + err_msg: "missing response", + })?; + match res { + Response::AffectedRows(AffectedRows { value }) => Ok(value), + } + } else { + let status_code = + StatusCode::from_u32(status.status_code).context(IllegalDatabaseResponseSnafu { + err_msg: format!("invalid status: {:?}", status), + })?; + ServerSnafu { + code: status_code, + msg: status.err_msg, + } + .fail() + } +} diff --git a/src/client/src/stream_insert.rs b/src/client/src/stream_insert.rs index 943f9a0ad7..0701490101 100644 --- a/src/client/src/stream_insert.rs +++ b/src/client/src/stream_insert.rs @@ -15,17 +15,16 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient; use api::v1::greptime_request::Request; use api::v1::{ - greptime_response, AffectedRows, AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest, - InsertRequests, RequestHeader, + AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest, InsertRequests, RequestHeader, }; -use snafu::OptionExt; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tonic::{Response, Status}; -use crate::error::{self, IllegalDatabaseResponseSnafu, Result}; +use crate::error::{self, Result}; +use crate::from_grpc_response; /// A structure that provides some methods for streaming data insert. /// @@ -89,17 +88,8 @@ impl StreamInserter { drop(self.sender); let response = self.join.await.unwrap()?; - - let response = response - .into_inner() - .response - .context(IllegalDatabaseResponseSnafu { - err_msg: "GreptimeResponse is empty", - })?; - - let greptime_response::Response::AffectedRows(AffectedRows { value }) = response; - - Ok(value) + let response = response.into_inner(); + from_grpc_response(response) } fn to_rpc_request(&self, request: Request) -> GreptimeRequest { diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index c43f908048..81f8daa157 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -154,6 +154,44 @@ impl StatusCode { | StatusCode::AccessDenied => false, } } + + pub fn from_u32(value: u32) -> Option { + match value { + v if v == StatusCode::Success as u32 => Some(StatusCode::Success), + v if v == StatusCode::Unknown as u32 => Some(StatusCode::Unknown), + v if v == StatusCode::Unsupported as u32 => Some(StatusCode::Unsupported), + v if v == StatusCode::Unexpected as u32 => Some(StatusCode::Unexpected), + v if v == StatusCode::Internal as u32 => Some(StatusCode::Internal), + v if v == StatusCode::InvalidArguments as u32 => Some(StatusCode::InvalidArguments), + v if v == StatusCode::Cancelled as u32 => Some(StatusCode::Cancelled), + v if v == StatusCode::InvalidSyntax as u32 => Some(StatusCode::InvalidSyntax), + v if v == StatusCode::PlanQuery as u32 => Some(StatusCode::PlanQuery), + v if v == StatusCode::EngineExecuteQuery as u32 => Some(StatusCode::EngineExecuteQuery), + v if v == StatusCode::TableAlreadyExists as u32 => Some(StatusCode::TableAlreadyExists), + v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound), + v if v == StatusCode::TableColumnNotFound as u32 => { + Some(StatusCode::TableColumnNotFound) + } + v if v == StatusCode::TableColumnExists as u32 => Some(StatusCode::TableColumnExists), + v if v == StatusCode::DatabaseNotFound as u32 => Some(StatusCode::DatabaseNotFound), + v if v == StatusCode::StorageUnavailable as u32 => Some(StatusCode::StorageUnavailable), + v if v == StatusCode::RuntimeResourcesExhausted as u32 => { + Some(StatusCode::RuntimeResourcesExhausted) + } + v if v == StatusCode::RateLimited as u32 => Some(StatusCode::RateLimited), + v if v == StatusCode::UserNotFound as u32 => Some(StatusCode::UserNotFound), + v if v == StatusCode::UnsupportedPasswordType as u32 => { + Some(StatusCode::UnsupportedPasswordType) + } + v if v == StatusCode::UserPasswordMismatch as u32 => { + Some(StatusCode::UserPasswordMismatch) + } + v if v == StatusCode::AuthHeaderNotFound as u32 => Some(StatusCode::AuthHeaderNotFound), + v if v == StatusCode::InvalidAuthHeader as u32 => Some(StatusCode::InvalidAuthHeader), + v if v == StatusCode::AccessDenied as u32 => Some(StatusCode::AccessDenied), + _ => None, + } + } } impl fmt::Display for StatusCode { diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs index edf214bab0..adcc6273a5 100644 --- a/src/servers/src/grpc/database.rs +++ b/src/servers/src/grpc/database.rs @@ -16,8 +16,10 @@ use std::sync::Arc; use api::v1::greptime_database_server::GreptimeDatabase; use api::v1::greptime_response::Response as RawResponse; -use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse}; +use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader}; use async_trait::async_trait; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; use common_query::Output; use futures::StreamExt; use tonic::{Request, Response, Status, Streaming}; @@ -44,13 +46,27 @@ impl GreptimeDatabase for DatabaseService { let request = request.into_inner(); let output = self.handler.handle_request(request).await?; let response = match output { - Output::AffectedRows(rows) => GreptimeResponse { - header: None, + Ok(Output::AffectedRows(rows)) => GreptimeResponse { + header: Some(ResponseHeader { + status: Some(api::v1::Status { + status_code: StatusCode::Success as _, + ..Default::default() + }), + }), response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })), }, - Output::Stream(_) | Output::RecordBatches(_) => { + Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => { return Err(Status::unimplemented("GreptimeDatabase::Handle for query")); } + Err(e) => GreptimeResponse { + header: Some(ResponseHeader { + status: Some(api::v1::Status { + status_code: e.status_code() as _, + err_msg: e.to_string(), + }), + }), + response: None, + }, }; Ok(Response::new(response)) } @@ -66,17 +82,26 @@ impl GreptimeDatabase for DatabaseService { let request = request?; let output = self.handler.handle_request(request).await?; match output { - Output::AffectedRows(rows) => affected_rows += rows, - Output::Stream(_) | Output::RecordBatches(_) => { + Ok(Output::AffectedRows(rows)) => affected_rows += rows, + Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => { return Err(Status::unimplemented( "GreptimeDatabase::HandleRequests for query", )); } + Err(e) => { + // We directly convert it to a tonic error and fail immediately in stream. + return Err(e.into()); + } } } let response = GreptimeResponse { - header: None, + header: Some(ResponseHeader { + status: Some(api::v1::Status { + status_code: StatusCode::Success as _, + ..Default::default() + }), + }), response: Some(RawResponse::AffectedRows(AffectedRows { value: affected_rows as u32, })), diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 0b793d9855..826de00999 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -89,7 +89,7 @@ impl FlightService for FlightHandler { let request = GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?; - let output = self.handler.handle_request(request).await?; + let output = self.handler.handle_request(request).await??; let stream = to_flight_data_stream(output); Ok(Response::new(stream)) diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs index ea8234fe8d..1d4e80c157 100644 --- a/src/servers/src/grpc/handler.rs +++ b/src/servers/src/grpc/handler.rs @@ -26,11 +26,12 @@ use common_telemetry::logging; use metrics::{histogram, increment_counter}; use session::context::{QueryContext, QueryContextRef}; use snafu::{OptionExt, ResultExt}; -use tonic::Status; use crate::auth::{Identity, Password, UserProviderRef}; use crate::error::Error::UnsupportedAuthScheme; -use crate::error::{AuthSnafu, InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu}; +use crate::error::{ + AuthSnafu, InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result as InternalResult, +}; use crate::grpc::TonicResult; use crate::metrics::{ METRIC_AUTH_FAILURE, METRIC_CODE_LABEL, METRIC_SERVER_GRPC_DB_REQUEST_TIMER, @@ -57,7 +58,10 @@ impl GreptimeRequestHandler { } } - pub(crate) async fn handle_request(&self, request: GreptimeRequest) -> TonicResult { + pub(crate) async fn handle_request( + &self, + request: GreptimeRequest, + ) -> TonicResult> { let query = request.request.context(InvalidQuerySnafu { reason: "Expecting non-empty GreptimeRequest.", })?; @@ -65,7 +69,7 @@ impl GreptimeRequestHandler { let header = request.header.as_ref(); let query_ctx = create_query_context(header); - self.auth(header, &query_ctx).await?; + let _ = self.auth(header, &query_ctx).await?; let handler = self.handler.clone(); let request_type = request_type(&query); @@ -94,7 +98,7 @@ impl GreptimeRequestHandler { let output = handle.await.context(JoinTaskSnafu).map_err(|e| { timer.record(e.status_code()); e - })??; + })?; Ok(output) } @@ -102,8 +106,8 @@ impl GreptimeRequestHandler { &self, header: Option<&RequestHeader>, query_ctx: &QueryContextRef, - ) -> TonicResult<()> { - let Some(user_provider) = self.user_provider.as_ref() else { return Ok(()) }; + ) -> TonicResult> { + let Some(user_provider) = self.user_provider.as_ref() else { return Ok(Ok(())) }; let auth_scheme = header .and_then(|header| { @@ -114,7 +118,7 @@ impl GreptimeRequestHandler { }) .context(NotFoundAuthHeaderSnafu)?; - let _ = match auth_scheme { + let res = match auth_scheme { AuthScheme::Basic(Basic { username, password }) => user_provider .auth( Identity::UserId(&username, None), @@ -128,14 +132,15 @@ impl GreptimeRequestHandler { name: "Token AuthScheme".to_string(), }), } + .map(|_| ()) .map_err(|e| { increment_counter!( METRIC_AUTH_FAILURE, &[(METRIC_CODE_LABEL, format!("{}", e.status_code()))] ); - Status::unauthenticated(e.to_string()) - })?; - Ok(()) + e + }); + Ok(res) } } diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index eced6644ad..6a93acab0a 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -22,6 +22,7 @@ use api::v1::promql_request::Promql; use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader}; use async_trait::async_trait; use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; use common_telemetry::timer; use common_time::util::current_time_rfc3339; use promql_parser::parser::ValueType; @@ -80,7 +81,12 @@ impl PrometheusGateway for PrometheusGatewayService { let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes(); let response = Response::new(PromqlResponse { - header: Some(ResponseHeader { status: None }), + header: Some(ResponseHeader { + status: Some(api::v1::Status { + status_code: StatusCode::Success as _, + ..Default::default() + }), + }), body: json_bytes, }); Ok(response) diff --git a/tests/cases/distributed/alter/drop_col_not_null_next.result b/tests/cases/distributed/alter/drop_col_not_null_next.result new file mode 100644 index 0000000000..7e58c5e635 --- /dev/null +++ b/tests/cases/distributed/alter/drop_col_not_null_next.result @@ -0,0 +1,43 @@ +CREATE TABLE test(i BIGINT TIME INDEX, j INTEGER, k INTEGER NOT NULL); + +Affected Rows: 0 + +INSERT INTO test VALUES (1, 1, 11), (2, 2, 12); + +Affected Rows: 2 + +SELECT * FROM test; + ++---+---+----+ +| i | j | k | ++---+---+----+ +| 1 | 1 | 11 | +| 2 | 2 | 12 | ++---+---+----+ + +ALTER TABLE test DROP COLUMN j; + +Affected Rows: 0 + +INSERT INTO test VALUES (3, NULL); + +Error: 1004(InvalidArguments), Failed to insert value to table: greptime.public.test, source: Failed to operate table, source: Column k is not null but input has null + +INSERT INTO test VALUES (3, 13); + +Affected Rows: 1 + +SELECT * FROM test; + ++---+----+ +| i | k | ++---+----+ +| 1 | 11 | +| 2 | 12 | +| 3 | 13 | ++---+----+ + +DROP TABLE test; + +Affected Rows: 1 + diff --git a/tests/cases/standalone/common/alter/drop_col_not_null_next.sql b/tests/cases/distributed/alter/drop_col_not_null_next.sql similarity index 100% rename from tests/cases/standalone/common/alter/drop_col_not_null_next.sql rename to tests/cases/distributed/alter/drop_col_not_null_next.sql diff --git a/tests/cases/standalone/common/alter/drop_col_not_null_next.result b/tests/cases/standalone/alter/drop_col_not_null_next.result similarity index 100% rename from tests/cases/standalone/common/alter/drop_col_not_null_next.result rename to tests/cases/standalone/alter/drop_col_not_null_next.result diff --git a/tests/cases/standalone/alter/drop_col_not_null_next.sql b/tests/cases/standalone/alter/drop_col_not_null_next.sql new file mode 100644 index 0000000000..fcd438ea9a --- /dev/null +++ b/tests/cases/standalone/alter/drop_col_not_null_next.sql @@ -0,0 +1,15 @@ +CREATE TABLE test(i BIGINT TIME INDEX, j INTEGER, k INTEGER NOT NULL); + +INSERT INTO test VALUES (1, 1, 11), (2, 2, 12); + +SELECT * FROM test; + +ALTER TABLE test DROP COLUMN j; + +INSERT INTO test VALUES (3, NULL); + +INSERT INTO test VALUES (3, 13); + +SELECT * FROM test; + +DROP TABLE test;