fix: some TODO in sqlness cases and refactor meta-client error (#2207)

* fix: some TODO in sqlness cases and refactor meta-client error

* fix: delete tests/cases/standalone/alter/drop_col_not_null_next.output
This commit is contained in:
dennis zhuang
2023-08-18 18:09:11 +08:00
committed by GitHub
parent 3150f4b22e
commit 272f649b22
19 changed files with 114 additions and 141 deletions

View File

@@ -18,6 +18,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_procedure::ProcedureId;
use serde_json::error::Error as JsonError;
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
use store_api::storage::{RegionId, RegionNumber};
use table::error::Error as TableError;
@@ -644,11 +645,7 @@ impl ErrorExt for Error {
}
}
// Converts this crate's `Error` into a gRPC `tonic::Status` so it can be
// returned to remote callers. The boxed error is attached via
// `Status::from_error`, which preserves it as the status source.
// NOTE(review): this hand-written impl is the code being replaced by the
// `define_into_tonic_status!(Error)` macro invocation in this change.
impl From<Error> for tonic::Status {
fn from(err: Error) -> Self {
tonic::Status::from_error(Box::new(err))
}
}
define_into_tonic_status!(Error);
#[cfg(test)]
mod tests {

View File

@@ -20,6 +20,7 @@ use common_error::status_code::StatusCode;
use datafusion::parquet;
use datatypes::arrow::error::ArrowError;
use datatypes::value::Value;
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
use store_api::storage::RegionNumber;
@@ -734,8 +735,4 @@ impl ErrorExt for Error {
}
}
// Converts this crate's `Error` into a gRPC `tonic::Status`, flattening
// everything to `Code::Internal` with the error's display string as the
// message — the original status code is lost in transit.
// NOTE(review): this hand-written impl is the code being replaced by the
// `define_into_tonic_status!(Error)` macro invocation in this change.
impl From<Error> for tonic::Status {
fn from(err: Error) -> Self {
tonic::Status::new(tonic::Code::Internal, err.to_string())
}
}
define_into_tonic_status!(Error);

View File

@@ -136,7 +136,7 @@ impl Inner {
let res = client
.submit_ddl_task(req.clone())
.await
.context(error::TonicStatusSnafu)?;
.map_err(error::Error::from)?;
let res = res.into_inner();

View File

@@ -78,7 +78,7 @@ impl HeartbeatStream {
/// Fetch the next message from this stream.
#[inline]
pub async fn message(&mut self) -> Result<Option<HeartbeatResponse>> {
let res = self.stream.message().await.context(error::TonicStatusSnafu);
let res = self.stream.message().await.map_err(error::Error::from);
if let Ok(Some(heartbeat)) = &res {
util::check_response_header(heartbeat.header.as_ref())
.context(InvalidResponseHeaderSnafu)?;
@@ -214,13 +214,13 @@ impl Inner {
let mut stream = leader
.heartbeat(receiver)
.await
.context(error::TonicStatusSnafu)?
.map_err(error::Error::from)?
.into_inner();
let res = stream
.message()
.await
.context(error::TonicStatusSnafu)?
.map_err(error::Error::from)?
.context(error::CreateHeartbeatStreamSnafu)?;
info!("Success to create heartbeat stream to server: {:#?}", res);

View File

@@ -128,7 +128,7 @@ impl Inner {
async fn lock(&self, mut req: LockRequest) -> Result<LockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client.lock(req).await.context(error::TonicStatusSnafu)?;
let res = client.lock(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
@@ -136,7 +136,7 @@ impl Inner {
async fn unlock(&self, mut req: UnlockRequest) -> Result<UnlockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client.unlock(req).await.context(error::TonicStatusSnafu)?;
let res = client.unlock(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}

View File

@@ -98,7 +98,7 @@ impl Inner {
async fn route(&self, mut req: RouteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client.route(req).await.context(error::TonicStatusSnafu)?;
let res = client.route(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}

View File

@@ -141,7 +141,7 @@ impl Inner {
async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client.range(req).await.context(error::TonicStatusSnafu)?;
let res = client.range(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
@@ -149,7 +149,7 @@ impl Inner {
async fn put(&self, mut req: PutRequest) -> Result<PutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client.put(req).await.context(error::TonicStatusSnafu)?;
let res = client.put(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
@@ -158,10 +158,7 @@ impl Inner {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client
.batch_get(req)
.await
.context(error::TonicStatusSnafu)?;
let res = client.batch_get(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
@@ -169,10 +166,7 @@ impl Inner {
async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client
.batch_put(req)
.await
.context(error::TonicStatusSnafu)?;
let res = client.batch_put(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
@@ -180,10 +174,7 @@ impl Inner {
async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client
.batch_delete(req)
.await
.context(error::TonicStatusSnafu)?;
let res = client.batch_delete(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
@@ -197,7 +188,7 @@ impl Inner {
let res = client
.compare_and_put(req)
.await
.context(error::TonicStatusSnafu)?;
.map_err(error::Error::from)?;
Ok(res.into_inner())
}
@@ -205,10 +196,7 @@ impl Inner {
async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client
.delete_range(req)
.await
.context(error::TonicStatusSnafu)?;
let res = client.delete_range(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
@@ -216,10 +204,7 @@ impl Inner {
async fn move_value(&self, mut req: MoveValueRequest) -> Result<MoveValueResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client
.move_value(req)
.await
.context(error::TonicStatusSnafu)?;
let res = client.move_value(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}

View File

@@ -14,7 +14,9 @@
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use snafu::{Location, Snafu};
use tonic::Status;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -22,11 +24,8 @@ pub enum Error {
#[snafu(display("Illegal GRPC client state: {}", err_msg))]
IllegalGrpcClientState { err_msg: String, location: Location },
#[snafu(display("Tonic internal error, source: {}", source))]
TonicStatus {
source: tonic::Status,
location: Location,
},
#[snafu(display("{}", msg))]
MetaServer { code: StatusCode, msg: String },
#[snafu(display("Failed to ask leader from all endpoints"))]
AskLeader { location: Location },
@@ -79,7 +78,6 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::IllegalGrpcClientState { .. }
| Error::TonicStatus { .. }
| Error::AskLeader { .. }
| Error::NoLeader { .. }
| Error::NotStarted { .. }
@@ -87,9 +85,37 @@ impl ErrorExt for Error {
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
Error::MetaServer { code, .. } => *code,
Error::InvalidResponseHeader { source, .. }
| Error::ConvertMetaRequest { source, .. }
| Error::ConvertMetaResponse { source, .. } => source.status_code(),
}
}
}
// FIXME(dennis): partial duplicated with src/client/src/error.rs
// Maps a gRPC `tonic::Status` returned by the meta server back into this
// crate's `Error::MetaServer` variant, recovering the original GreptimeDB
// status code and root error message from the response metadata when the
// server attached them (see `define_into_tonic_status!` on the server side).
impl From<Status> for Error {
fn from(e: Status) -> Self {
// Reads one metadata entry as a UTF-8 string; yields `None` when the
// key is absent or its bytes are not valid UTF-8.
fn get_metadata_value(s: &Status, key: &str) -> Option<String> {
s.metadata()
.get(key)
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
}
// Parse the numeric status code carried in GREPTIME_ERROR_CODE; fall
// back to `Internal` if the header is missing, non-numeric, or not a
// known code.
let code = get_metadata_value(&e, GREPTIME_ERROR_CODE)
.and_then(|s| {
if let Ok(code) = s.parse::<u32>() {
StatusCode::from_u32(code)
} else {
None
}
})
.unwrap_or(StatusCode::Internal);
// Prefer the server-provided root error message; otherwise use the
// tonic status message itself.
let msg =
get_metadata_value(&e, GREPTIME_ERROR_MSG).unwrap_or_else(|| e.message().to_string());
Self::MetaServer { code, msg }
}
}

View File

@@ -16,10 +16,10 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_meta::peer::Peer;
use common_runtime::JoinError;
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use tonic::Code;
use crate::pubsub::Message;
@@ -520,11 +520,7 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for tonic::Status {
fn from(err: Error) -> Self {
tonic::Status::new(Code::Internal, err.to_string())
}
}
define_into_tonic_status!(Error);
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
@@ -550,7 +546,6 @@ impl ErrorExt for Error {
| Error::SendShutdownSignal { .. }
| Error::ParseAddr { .. }
| Error::SchemaAlreadyExists { .. }
| Error::TableAlreadyExists { .. }
| Error::PusherNotFound { .. }
| Error::PushMessage { .. }
| Error::MailboxClosed { .. }
@@ -564,6 +559,7 @@ impl ErrorExt for Error {
| Error::PublishMessage { .. }
| Error::Join { .. }
| Error::Unsupported { .. } => StatusCode::Internal,
Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
| Error::MissingRequestHeader { .. }

View File

@@ -93,7 +93,11 @@ impl AlterTableProcedure {
ensure!(
!exist,
error::TableAlreadyExistsSnafu {
table_name: request.table_ref().to_string()
table_name: common_catalog::format_full_table_name(
&request.catalog_name,
&request.schema_name,
new_table_name,
),
}
)
}

View File

@@ -22,14 +22,11 @@ use base64::DecodeError;
use catalog;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use common_telemetry::logging;
use datatypes::prelude::ConcreteDataType;
use query::parser::PromQuery;
use serde_json::json;
use snafu::{ErrorCompat, Location, Snafu};
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;
use snafu::{Location, Snafu};
use tonic::Code;
#[derive(Debug, Snafu)]
@@ -429,7 +426,7 @@ impl ErrorExt for Error {
}
/// Returns the tonic [Code] of a [StatusCode].
fn status_to_tonic_code(status_code: StatusCode) -> Code {
pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
match status_code {
StatusCode::Success => Code::Ok,
StatusCode::Unknown => Code::Unknown,
@@ -455,24 +452,41 @@ fn status_to_tonic_code(status_code: StatusCode) -> Code {
}
}
impl From<Error> for tonic::Status {
fn from(err: Error) -> Self {
let mut headers = HeaderMap::<HeaderValue>::with_capacity(2);
#[macro_export]
macro_rules! define_into_tonic_status {
($Error: ty) => {
impl From<$Error> for tonic::Status {
fn from(err: $Error) -> Self {
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use snafu::ErrorCompat;
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;
// If either of the status_code or error msg cannot convert to valid HTTP header value
// (which is a very rare case), just ignore. Client will use Tonic status code and message.
let status_code = err.status_code();
headers.insert(GREPTIME_ERROR_CODE, HeaderValue::from(status_code as u32));
let root_error = err.iter_chain().last().unwrap();
if let Ok(err_msg) = HeaderValue::from_bytes(root_error.to_string().as_bytes()) {
let _ = headers.insert(GREPTIME_ERROR_MSG, err_msg);
let mut headers = HeaderMap::<HeaderValue>::with_capacity(2);
// If either of the status_code or error msg cannot convert to valid HTTP header value
// (which is a very rare case), just ignore. Client will use Tonic status code and message.
let status_code = err.status_code();
headers.insert(GREPTIME_ERROR_CODE, HeaderValue::from(status_code as u32));
let root_error = err.iter_chain().last().unwrap();
if let Ok(err_msg) = HeaderValue::from_bytes(root_error.to_string().as_bytes()) {
let _ = headers.insert(GREPTIME_ERROR_MSG, err_msg);
}
let metadata = MetadataMap::from_headers(headers);
tonic::Status::with_metadata(
$crate::error::status_to_tonic_code(status_code),
err.to_string(),
metadata,
)
}
}
let metadata = MetadataMap::from_headers(headers);
tonic::Status::with_metadata(status_to_tonic_code(status_code), err.to_string(), metadata)
}
};
}
define_into_tonic_status!(Error);
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::InternalIo { source: e }

View File

@@ -1,42 +0,0 @@
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
Affected Rows: 0
DESC TABLE t;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+-------+-------+------+---------+---------------+
| i | Int32 | YES | | FIELD |
| j | Int64 | NO | | TIME INDEX |
+-------+-------+------+---------+---------------+
INSERT INTO TABLE t VALUES (1, 1), (3, 3), (NULL, 4);
Affected Rows: 3
SELECT * from t;
+---+---+
| i | j |
+---+---+
| 1 | 1 |
| 3 | 3 |
| | 4 |
+---+---+
-- TODO(LFC): Port test cases from standalone env when distribute rename table is implemented (#723).
ALTER TABLE t RENAME new_table;
Affected Rows: 0
DROP TABLE t;
Error: 4001(TableNotFound), Table not found: greptime.public.t
-- TODO: this clause should succeed
-- SQLNESS REPLACE details.*
DROP TABLE new_table;
Affected Rows: 1

View File

@@ -1,16 +0,0 @@
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
DESC TABLE t;
INSERT INTO TABLE t VALUES (1, 1), (3, 3), (NULL, 4);
SELECT * from t;
-- TODO(LFC): Port test cases from standalone env when distribute rename table is implemented (#723).
ALTER TABLE t RENAME new_table;
DROP TABLE t;
-- TODO: this clause should succeed
-- SQLNESS REPLACE details.*
DROP TABLE new_table;

View File

@@ -36,10 +36,26 @@ SELECT * FROM test ORDER BY i NULLS LAST, j NULLS FIRST;
| | 1 | 2 |
+---+---+---+
-- TODO(ruihang): The following two SQL will fail under distributed mode with error
-- Error: 1003(Internal), status: Internal, message: "Failed to collect recordbatch, source: Failed to poll stream, source: Arrow error: Invalid argument error: batches[0] schema is different with argument schema.\n batches[0] schema: Schema { fields: [Field { name: \"i\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"j\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"t\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {\"greptime:time_index\": \"true\"} }], metadata: {\"greptime:version\": \"0\"} },\n argument schema: Schema { fields: [Field { name: \"i\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"j\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"t\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {\"greptime:time_index\": \"true\"} }], metadata: {} }\n ", details: [], metadata: MetadataMap { headers: {"inner_error_code": "Internal"} }
-- SELECT i, j, row_number() OVER (PARTITION BY i ORDER BY j NULLS FIRST) FROM test ORDER BY i NULLS FIRST, j NULLS FIRST;
-- SELECT i, j, row_number() OVER (PARTITION BY i ORDER BY j NULLS LAST) FROM test ORDER BY i NULLS FIRST, j NULLS FIRST;
SELECT i, j, row_number() OVER (PARTITION BY i ORDER BY j NULLS FIRST) FROM test ORDER BY i NULLS FIRST, j NULLS FIRST;
+---+---+------------------------------------------------------------------------------------------------------------------------+
| i | j | ROW_NUMBER() PARTITION BY [test.i] ORDER BY [test.j ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW |
+---+---+------------------------------------------------------------------------------------------------------------------------+
| | 1 | 1 |
| 1 | | 1 |
| 1 | 1 | 2 |
+---+---+------------------------------------------------------------------------------------------------------------------------+
SELECT i, j, row_number() OVER (PARTITION BY i ORDER BY j NULLS LAST) FROM test ORDER BY i NULLS FIRST, j NULLS FIRST;
+---+---+-----------------------------------------------------------------------------------------------------------------------+
| i | j | ROW_NUMBER() PARTITION BY [test.i] ORDER BY [test.j ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW |
+---+---+-----------------------------------------------------------------------------------------------------------------------+
| | 1 | 1 |
| 1 | | 2 |
| 1 | 1 | 1 |
+---+---+-----------------------------------------------------------------------------------------------------------------------+
SELECT * FROM test ORDER BY i NULLS FIRST, j NULLS LAST LIMIT 2;
+---+---+---+

View File

@@ -8,11 +8,9 @@ SELECT * FROM test ORDER BY i NULLS FIRST, j NULLS FIRST;
SELECT * FROM test ORDER BY i NULLS LAST, j NULLS FIRST;
-- TODO(ruihang): The following two SQL will fail under distributed mode with error
-- Error: 1003(Internal), status: Internal, message: "Failed to collect recordbatch, source: Failed to poll stream, source: Arrow error: Invalid argument error: batches[0] schema is different with argument schema.\n batches[0] schema: Schema { fields: [Field { name: \"i\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"j\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"t\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {\"greptime:time_index\": \"true\"} }], metadata: {\"greptime:version\": \"0\"} },\n argument schema: Schema { fields: [Field { name: \"i\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"j\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"t\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {\"greptime:time_index\": \"true\"} }], metadata: {} }\n ", details: [], metadata: MetadataMap { headers: {"inner_error_code": "Internal"} }
-- SELECT i, j, row_number() OVER (PARTITION BY i ORDER BY j NULLS FIRST) FROM test ORDER BY i NULLS FIRST, j NULLS FIRST;
SELECT i, j, row_number() OVER (PARTITION BY i ORDER BY j NULLS FIRST) FROM test ORDER BY i NULLS FIRST, j NULLS FIRST;
-- SELECT i, j, row_number() OVER (PARTITION BY i ORDER BY j NULLS LAST) FROM test ORDER BY i NULLS FIRST, j NULLS FIRST;
SELECT i, j, row_number() OVER (PARTITION BY i ORDER BY j NULLS LAST) FROM test ORDER BY i NULLS FIRST, j NULLS FIRST;
SELECT * FROM test ORDER BY i NULLS FIRST, j NULLS LAST LIMIT 2;

View File

@@ -125,5 +125,3 @@ DROP table test7;
DROP table test8;
DROP TABLE DirectReports;
-- TODO(LFC): Seems creating distributed table has some column schema related issues, look into "order_variable_size_payload" test case.