feat: adapt methods from RegionEngine for MitoEngine (#2315)

* feat: adapt methods from RegionEngine for MitoEngine

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* minor fixes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-04 01:32:44 -05:00
parent 920763d7dd
commit 50fca2400e
9 changed files with 37 additions and 29 deletions

View File

@@ -57,6 +57,7 @@ pub enum StatusCode {
TableColumnExists = 4003,
DatabaseNotFound = 4004,
RegionNotFound = 4005,
RegionAlreadyExists = 4006,
// ====== End of catalog related status code =======
// ====== Begin of storage related status code =====
@@ -115,6 +116,7 @@ impl StatusCode {
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::RegionAlreadyExists
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
@@ -148,6 +150,7 @@ impl StatusCode {
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::RegionAlreadyExists
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
@@ -177,6 +180,9 @@ impl StatusCode {
v if v == StatusCode::TableAlreadyExists as u32 => Some(StatusCode::TableAlreadyExists),
v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound),
v if v == StatusCode::RegionNotFound as u32 => Some(StatusCode::RegionNotFound),
v if v == StatusCode::RegionAlreadyExists as u32 => {
Some(StatusCode::RegionAlreadyExists)
}
v if v == StatusCode::TableColumnNotFound as u32 => {
Some(StatusCode::TableColumnNotFound)
}

View File

@@ -73,22 +73,12 @@ impl MitoEngine {
self.inner.stop().await
}
/// Handle requests that modify a region.
pub async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
self.inner.handle_request(region_id, request).await
}
/// Returns true if the specific region exists.
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}
/// Handles the scan `request` and returns a [Scanner] for the `request`.
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
fn scan(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.inner.handle_query(region_id, request)
}
@@ -178,10 +168,14 @@ impl RegionEngine for MitoEngine {
/// Handle substrait query and return a stream of record batches
async fn handle_query(
&self,
_region_id: RegionId,
_request: ScanRequest,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
todo!()
self.scan(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
.map_err(BoxedError::new)
}
/// Retrieve region's metadata.

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::Error;
use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
@@ -71,7 +73,7 @@ async fn test_engine_create_existing_region() {
.await
.unwrap_err();
assert!(
matches!(err, Error::RegionExists { .. }),
matches!(err.status_code(), StatusCode::RegionAlreadyExists),
"unexpected err: {err}"
);
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use object_store::util::join_path;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDropRequest, RegionRequest};
use store_api::storage::RegionId;

View File

@@ -14,11 +14,13 @@
use std::collections::HashMap;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::Error;
use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
@@ -38,7 +40,7 @@ async fn test_engine_open_empty() {
.await
.unwrap_err();
assert!(
matches!(err, Error::RegionNotFound { .. }),
matches!(err.status_code(), StatusCode::RegionNotFound),
"unexpected err: {err}"
);
}

View File

@@ -19,6 +19,8 @@ use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{ColumnSchema, Row, Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{
@@ -28,7 +30,6 @@ use store_api::region_request::{
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::region::version::VersionControlData;
use crate::test_util::{CreateRequestBuilder, TestEnv};
@@ -54,7 +55,7 @@ async fn test_engine_new_stop() {
.await
.unwrap_err();
assert!(
matches!(err, Error::WorkerStopped { .. }),
matches!(err.status_code(), StatusCode::Internal),
"unexpected err: {err}"
);
}
@@ -181,7 +182,7 @@ async fn test_region_replay() {
assert_eq!(0, rows);
let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let scanner = engine.scan(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(42, batches.iter().map(|b| b.num_rows()).sum::<usize>());
@@ -222,8 +223,7 @@ async fn test_write_query_region() {
put_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
@@ -335,7 +335,7 @@ async fn test_put_delete() {
delete_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let scanner = engine.scan(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
@@ -393,7 +393,7 @@ async fn test_put_overwrite() {
put_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let scanner = engine.scan(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
@@ -444,7 +444,7 @@ async fn test_manual_flush() {
assert_eq!(0, rows);
let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -445,12 +445,12 @@ impl ErrorExt for Error {
| DecompressObject { .. }
| SerdeJson { .. }
| Utf8 { .. }
| RegionExists { .. }
| NewRecordBatch { .. }
| RegionNotFound { .. }
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionExists { .. } => StatusCode::RegionAlreadyExists,
InvalidScanIndex { .. }
| InvalidMeta { .. }
| InvalidSchema { .. }

View File

@@ -465,7 +465,9 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
| StatusCode::EngineExecuteQuery => Code::Internal,
StatusCode::InvalidArguments | StatusCode::InvalidSyntax => Code::InvalidArgument,
StatusCode::Cancelled => Code::Cancelled,
StatusCode::TableAlreadyExists | StatusCode::TableColumnExists => Code::AlreadyExists,
StatusCode::TableAlreadyExists
| StatusCode::TableColumnExists
| StatusCode::RegionAlreadyExists => Code::AlreadyExists,
StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::TableColumnNotFound