diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 8b66da0f9c..59cc0e2037 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -57,6 +57,7 @@ pub enum StatusCode { TableColumnExists = 4003, DatabaseNotFound = 4004, RegionNotFound = 4005, + RegionAlreadyExists = 4006, // ====== End of catalog related status code ======= // ====== Begin of storage related status code ===== @@ -115,6 +116,7 @@ impl StatusCode { | StatusCode::TableAlreadyExists | StatusCode::TableNotFound | StatusCode::RegionNotFound + | StatusCode::RegionAlreadyExists | StatusCode::TableColumnNotFound | StatusCode::TableColumnExists | StatusCode::DatabaseNotFound @@ -148,6 +150,7 @@ impl StatusCode { | StatusCode::TableAlreadyExists | StatusCode::TableNotFound | StatusCode::RegionNotFound + | StatusCode::RegionAlreadyExists | StatusCode::TableColumnNotFound | StatusCode::TableColumnExists | StatusCode::DatabaseNotFound @@ -177,6 +180,9 @@ impl StatusCode { v if v == StatusCode::TableAlreadyExists as u32 => Some(StatusCode::TableAlreadyExists), v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound), v if v == StatusCode::RegionNotFound as u32 => Some(StatusCode::RegionNotFound), + v if v == StatusCode::RegionAlreadyExists as u32 => { + Some(StatusCode::RegionAlreadyExists) + } v if v == StatusCode::TableColumnNotFound as u32 => { Some(StatusCode::TableColumnNotFound) } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 22760fa194..606c0522ab 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -73,22 +73,12 @@ impl MitoEngine { self.inner.stop().await } - /// Handle requests that modify a region. - pub async fn handle_request( - &self, - region_id: RegionId, - request: RegionRequest, - ) -> Result { - self.inner.handle_request(region_id, request).await - } - /// Returns true if the specific region exists. pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) } - /// Handles the scan `request` and returns a [Scanner] for the `request`. - fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result { + fn scan(&self, region_id: RegionId, request: ScanRequest) -> Result { self.inner.handle_query(region_id, request) } @@ -178,10 +168,14 @@ impl RegionEngine for MitoEngine { /// Handle substrait query and return a stream of record batches async fn handle_query( &self, - _region_id: RegionId, - _request: ScanRequest, + region_id: RegionId, + request: ScanRequest, ) -> std::result::Result { - todo!() + self.scan(region_id, request) + .map_err(BoxedError::new)? + .scan() + .await + .map_err(BoxedError::new) } /// Retrieve region's metadata. diff --git a/src/mito2/src/engine/close_test.rs b/src/mito2/src/engine/close_test.rs index 5ab27300a3..d6396e034a 100644 --- a/src/mito2/src/engine/close_test.rs +++ b/src/mito2/src/engine/close_test.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionCloseRequest, RegionRequest}; use store_api::storage::RegionId; diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index f4ad9fabad..2b9c8bae9e 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::error::Error; use crate::test_util::{CreateRequestBuilder, TestEnv}; #[tokio::test] @@ -71,7 +73,7 @@ async fn test_engine_create_existing_region() { .await .unwrap_err(); assert!( - matches!(err, Error::RegionExists { .. }), + matches!(err.status_code(), StatusCode::RegionAlreadyExists), "unexpected err: {err}" ); } diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index 1f6f372770..d42c37d05b 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -13,6 +13,7 @@ // limitations under the License. use object_store::util::join_path; +use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionDropRequest, RegionRequest}; use store_api::storage::RegionId; diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index e06ea9eca4..5d2e53c210 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -14,11 +14,13 @@ use std::collections::HashMap; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::error::Error; use crate::test_util::{CreateRequestBuilder, TestEnv}; #[tokio::test] @@ -38,7 +40,7 @@ async fn test_engine_open_empty() { .await .unwrap_err(); assert!( - matches!(err, Error::RegionNotFound { .. }), + matches!(err.status_code(), StatusCode::RegionNotFound), "unexpected err: {err}" ); } diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index 62457b65e5..25bfe00f68 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -19,6 +19,8 @@ use std::collections::HashMap; use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; use api::v1::{ColumnSchema, Row, Rows, SemanticType}; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; use store_api::metadata::ColumnMetadata; use store_api::region_request::{ @@ -28,7 +30,6 @@ use store_api::region_request::{ use store_api::storage::RegionId; use super::*; -use crate::error::Error; use crate::region::version::VersionControlData; use crate::test_util::{CreateRequestBuilder, TestEnv}; @@ -54,7 +55,7 @@ async fn test_engine_new_stop() { .await .unwrap_err(); assert!( - matches!(err, Error::WorkerStopped { .. }), + matches!(err.status_code(), StatusCode::Internal), "unexpected err: {err}" ); } @@ -181,7 +182,7 @@ async fn test_region_replay() { assert_eq!(0, rows); let request = ScanRequest::default(); - let scanner = engine.handle_query(region_id, request).unwrap(); + let scanner = engine.scan(region_id, request).unwrap(); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(42, batches.iter().map(|b| b.num_rows()).sum::()); @@ -222,8 +223,7 @@ async fn test_write_query_region() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let scanner = engine.handle_query(region_id, request).unwrap(); - let stream = scanner.scan().await.unwrap(); + let stream = engine.handle_query(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -335,7 +335,7 @@ async fn test_put_delete() { delete_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let scanner = engine.handle_query(region_id, request).unwrap(); + let scanner = engine.scan(region_id, request).unwrap(); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ @@ -393,7 +393,7 @@ async fn test_put_overwrite() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let scanner = engine.handle_query(region_id, request).unwrap(); + let scanner = engine.scan(region_id, request).unwrap(); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ @@ -444,7 +444,7 @@ async fn test_manual_flush() { assert_eq!(0, rows); let request = ScanRequest::default(); - let scanner = engine.handle_query(region_id, request).unwrap(); + let scanner = engine.scan(region_id, request).unwrap(); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1281381958..e3aa757c77 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -445,12 +445,12 @@ impl ErrorExt for Error { | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. } - | RegionExists { .. } | NewRecordBatch { .. } - | RegionNotFound { .. } | RegionCorrupted { .. } | CreateDefault { .. } | InvalidParquet { .. } => StatusCode::Unexpected, + RegionNotFound { .. } => StatusCode::RegionNotFound, + RegionExists { .. } => StatusCode::RegionAlreadyExists, InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index b6a7435ee5..f376d3a344 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -465,7 +465,9 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code { | StatusCode::EngineExecuteQuery => Code::Internal, StatusCode::InvalidArguments | StatusCode::InvalidSyntax => Code::InvalidArgument, StatusCode::Cancelled => Code::Cancelled, - StatusCode::TableAlreadyExists | StatusCode::TableColumnExists => Code::AlreadyExists, + StatusCode::TableAlreadyExists + | StatusCode::TableColumnExists + | StatusCode::RegionAlreadyExists => Code::AlreadyExists, StatusCode::TableNotFound | StatusCode::RegionNotFound | StatusCode::TableColumnNotFound