refactor: extract the common method for errors into tonic status (#6437)

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-07-02 10:57:30 +08:00
committed by GitHub
parent 6b90e2b6b4
commit 385f12a62e
6 changed files with 91 additions and 99 deletions

View File

@@ -31,7 +31,7 @@ use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use common_catalog::build_db_string;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::BoxedError;
use common_grpc::flight::do_put::DoPutResponse;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
@@ -48,7 +48,7 @@ use tonic::transport::Channel;
use crate::error::{
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu,
InvalidTonicMetadataValueSnafu, ServerSnafu,
InvalidTonicMetadataValueSnafu,
};
use crate::{error, from_grpc_response, Client, Result};
@@ -302,21 +302,16 @@ impl Database {
let response = client.mut_inner().do_get(request).await.or_else(|e| {
let tonic_code = e.code();
let e: Error = e.into();
let code = e.status_code();
let msg = e.to_string();
let error =
Err(BoxedError::new(ServerSnafu { code, msg }.build())).with_context(|_| {
FlightGetSnafu {
addr: client.addr().to_string(),
tonic_code,
}
});
error!(
"Failed to do Flight get, addr: {}, code: {}, source: {:?}",
client.addr(),
tonic_code,
error
e
);
let error = Err(BoxedError::new(e)).with_context(|_| FlightGetSnafu {
addr: client.addr().to_string(),
tonic_code,
});
error
})?;
@@ -446,8 +441,11 @@ mod tests {
use api::v1::auth_header::AuthScheme;
use api::v1::{AuthHeader, Basic};
use common_error::status_code::StatusCode;
use tonic::{Code, Status};
use super::*;
use crate::error::TonicSnafu;
#[test]
fn test_flight_ctx() {
@@ -470,4 +468,19 @@ mod tests {
})
)
}
#[test]
fn test_from_tonic_status() {
let expected = TonicSnafu {
code: StatusCode::Internal,
msg: "blabla".to_string(),
tonic_code: Code::Internal,
}
.build();
let status = Status::new(Code::Internal, "blabla");
let actual: Error = status.into();
assert_eq!(expected.to_string(), actual.to_string());
}
}

View File

@@ -14,13 +14,13 @@
use std::any::Any;
use common_error::define_from_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::{convert_tonic_code_to_status_code, StatusCode};
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{location, Location, Snafu};
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::{Code, Status};
use tonic::Code;
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -124,6 +124,15 @@ pub enum Error {
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("{}", msg))]
Tonic {
code: StatusCode,
msg: String,
tonic_code: Code,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -135,7 +144,7 @@ impl ErrorExt for Error {
| Error::MissingField { .. }
| Error::IllegalDatabaseResponse { .. } => StatusCode::Internal,
Error::Server { code, .. } => *code,
Error::Server { code, .. } | Error::Tonic { code, .. } => *code,
Error::FlightGet { source, .. }
| Error::RegionServer { source, .. }
| Error::FlowServer { source, .. } => source.status_code(),
@@ -153,34 +162,7 @@ impl ErrorExt for Error {
}
}
impl From<Status> for Error {
fn from(e: Status) -> Self {
fn get_metadata_value(e: &Status, key: &str) -> Option<String> {
e.metadata()
.get(key)
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
}
let code = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_CODE).and_then(|s| {
if let Ok(code) = s.parse::<u32>() {
StatusCode::from_u32(code)
} else {
None
}
});
let tonic_code = e.code();
let code = code.unwrap_or_else(|| convert_tonic_code_to_status_code(tonic_code));
let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());
Self::Server {
code,
msg,
location: location!(),
}
}
}
define_from_tonic_status!(Error, Tonic);
impl Error {
pub fn should_retry(&self) -> bool {

View File

@@ -21,7 +21,7 @@ use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
use async_stream::stream;
use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::BoxedError;
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::error::{self as meta_error, Result as MetaResult};
@@ -107,24 +107,18 @@ impl RegionRequester {
.mut_inner()
.do_get(ticket)
.await
.map_err(|e| {
.or_else(|e| {
let tonic_code = e.code();
let e: error::Error = e.into();
let code = e.status_code();
let msg = e.to_string();
let error = ServerSnafu { code, msg }
.fail::<()>()
.map_err(BoxedError::new)
.with_context(|_| FlightGetSnafu {
tonic_code,
addr: flight_client.addr().to_string(),
})
.unwrap_err();
error!(
e; "Failed to do Flight get, addr: {}, code: {}",
flight_client.addr(),
tonic_code
);
let error = Err(BoxedError::new(e)).with_context(|_| FlightGetSnafu {
addr: flight_client.addr().to_string(),
tonic_code,
});
error
})?;

View File

@@ -239,6 +239,48 @@ impl fmt::Display for StatusCode {
}
}
#[macro_export]
macro_rules! define_from_tonic_status {
($Error: ty, $Variant: ident) => {
impl From<tonic::Status> for $Error {
fn from(e: tonic::Status) -> Self {
use snafu::location;
fn metadata_value(e: &tonic::Status, key: &str) -> Option<String> {
e.metadata()
.get(key)
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
}
let code = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_CODE)
.and_then(|s| {
if let Ok(code) = s.parse::<u32>() {
StatusCode::from_u32(code)
} else {
None
}
})
.unwrap_or_else(|| match e.code() {
tonic::Code::Cancelled => StatusCode::Cancelled,
tonic::Code::DeadlineExceeded => StatusCode::DeadlineExceeded,
_ => StatusCode::Internal,
});
let msg = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());
// TODO(LFC): Make the error variant defined automatically.
Self::$Variant {
code,
msg,
tonic_code: e.code(),
location: location!(),
}
}
}
};
}
#[macro_export]
macro_rules! define_into_tonic_status {
($Error: ty) => {
@@ -315,15 +357,6 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
}
}
/// Converts tonic [Code] to [StatusCode].
pub fn convert_tonic_code_to_status_code(code: Code) -> StatusCode {
match code {
Code::Cancelled => StatusCode::Cancelled,
Code::DeadlineExceeded => StatusCode::DeadlineExceeded,
_ => StatusCode::Internal,
}
}
#[cfg(test)]
mod tests {
use strum::IntoEnumIterator;

View File

@@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::define_from_tonic_status;
use common_error::ext::ErrorExt;
use common_error::status_code::{convert_tonic_code_to_status_code, StatusCode};
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{location, Location, Snafu};
use tonic::Status;
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -161,33 +160,4 @@ impl Error {
}
}
// FIXME(dennis): partial duplicated with src/client/src/error.rs
impl From<Status> for Error {
fn from(e: Status) -> Self {
fn get_metadata_value(s: &Status, key: &str) -> Option<String> {
s.metadata()
.get(key)
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
}
let code = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_CODE).and_then(|s| {
if let Ok(code) = s.parse::<u32>() {
StatusCode::from_u32(code)
} else {
None
}
});
let tonic_code = e.code();
let code = code.unwrap_or_else(|| convert_tonic_code_to_status_code(tonic_code));
let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());
Self::MetaServer {
code,
msg,
tonic_code,
location: location!(),
}
}
}
define_from_tonic_status!(Error, MetaServer);

View File

@@ -93,7 +93,7 @@ pub struct MemtableStats {
impl MemtableStats {
/// Attaches the time range to the stats.
#[cfg(any(test, feature = "test"))]
pub(crate) fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
self.time_range = time_range;
self
}