From 2168970814e2f8968e68ff7ba5268ee2cc19ece8 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 15 Aug 2023 14:27:27 +0800 Subject: [PATCH] feat: define region server and related requests (#2160) * define region server and related requests Signed-off-by: Ruihang Xia * fill request body Signed-off-by: Ruihang Xia * change mito2's request type Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * chore: bump greptime-proto to d9167cab (row insert/delete) Signed-off-by: Ruihang Xia * fix test compile Signed-off-by: Ruihang Xia * remove name_to_index Signed-off-by: Ruihang Xia * address cr comments Signed-off-by: Ruihang Xia * finilise Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/datanode/Cargo.toml | 1 + src/datanode/src/error.rs | 35 +++++++- src/datanode/src/instance/grpc.rs | 8 +- src/datanode/src/lib.rs | 2 +- src/datanode/src/region_server.rs | 103 +++++++++++++++++++++++ src/frontend/src/instance/distributed.rs | 12 +-- src/frontend/src/instance/grpc.rs | 8 +- src/mito2/src/engine.rs | 79 +++++++---------- src/mito2/src/manifest/manager.rs | 3 +- src/mito2/src/manifest/tests/utils.rs | 3 +- src/mito2/src/memtable/key_values.rs | 3 +- src/mito2/src/metadata.rs | 54 ++++++------ src/mito2/src/request.rs | 75 ++++++----------- src/mito2/src/test_util.rs | 3 +- src/mito2/src/worker.rs | 51 ++++++----- src/mito2/src/worker/handle_close.rs | 12 +-- src/mito2/src/worker/handle_create.rs | 19 +++-- src/mito2/src/worker/handle_open.rs | 17 ++-- src/servers/tests/mod.rs | 7 +- src/store-api/Cargo.toml | 2 +- src/store-api/src/lib.rs | 3 + src/store-api/src/metadata.rs | 30 +++++++ src/store-api/src/region_engine.rs | 38 +++++++++ src/store-api/src/region_request.rs | 97 +++++++++++++++++++++ 25 files changed, 476 insertions(+), 190 deletions(-) create mode 100644 src/datanode/src/region_server.rs create mode 100644 src/store-api/src/metadata.rs create mode 100644 src/store-api/src/region_engine.rs create mode 100644 src/store-api/src/region_request.rs diff --git a/Cargo.lock b/Cargo.lock index 0a38354dfe..77c54cd392 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2645,6 +2645,7 @@ dependencies = [ "common-telemetry", "common-test-util", "common-time", + "dashmap", "datafusion", "datafusion-common", "datafusion-expr", diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 333d341b8f..5f6ac8ad7a 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -30,6 +30,7 @@ common-recordbatch = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } common-time = { workspace = true } +dashmap = "5.4" datafusion-common.workspace = true datafusion-expr.workspace = true datafusion.workspace = true diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 8db4b8b552..99467127da 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -19,7 +19,7 @@ use common_error::status_code::StatusCode; use common_procedure::ProcedureId; use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionId, RegionNumber}; use table::error::Error as TableError; /// Business error of datanode. 
@@ -482,6 +482,30 @@ pub enum Error { violated: String, location: Location, }, + + #[snafu(display( + "Failed to handle request for region {}, source: {}, location: {}", + region_id, + source, + location + ))] + HandleRegionRequest { + region_id: RegionId, + location: Location, + source: BoxedError, + }, + + #[snafu(display("RegionId {} not found, location: {}", region_id, location))] + RegionNotFound { + region_id: RegionId, + location: Location, + }, + + #[snafu(display("Region engine {} is not registered, location: {}", name, location))] + RegionEngineNotFound { name: String, location: Location }, + + #[snafu(display("Unsupported gRPC request, kind: {}, location: {}", kind, location))] + UnsupportedGrpcRequest { kind: String, location: Location }, } pub type Result<T> = std::result::Result<T, Error>; @@ -559,7 +583,9 @@ impl ErrorExt for Error { | MissingInsertBody { .. } | ShutdownInstance { .. } | CloseTableEngine { .. } - | JoinTask { .. } => StatusCode::Internal, + | JoinTask { .. } + | RegionNotFound { .. } + | RegionEngineNotFound { .. } => StatusCode::Internal, StartServer { source, .. } | ShutdownServer { source, .. } @@ -570,7 +596,9 @@ impl ErrorExt for Error { OpenLogStore { source, .. } => source.status_code(), RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted, MetaClientInit { source, .. } => source.status_code(), - TableIdProviderNotFound { .. } => StatusCode::Unsupported, + TableIdProviderNotFound { .. } | UnsupportedGrpcRequest { .. } => { + StatusCode::Unsupported + } BumpTableId { source, .. } => source.status_code(), ColumnDefaultValue { source, .. } => source.status_code(), UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, @@ -581,6 +609,7 @@ impl ErrorExt for Error { StartProcedureManager { source } | StopProcedureManager { source } => { source.status_code() } + HandleRegionRequest { source, ..
} => source.status_code(), } } diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index bb5b75cd95..0b8d3f0a46 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -42,7 +42,7 @@ use table::table::adapter::DfTableProviderAdapter; use crate::error::{ self, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu, DeleteSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, JoinTaskSnafu, - PlanStatementSnafu, Result, TableNotFoundSnafu, + PlanStatementSnafu, Result, TableNotFoundSnafu, UnsupportedGrpcRequestSnafu, }; use crate::instance::Instance; @@ -221,8 +221,10 @@ impl GrpcQueryHandler for Instance { self.handle_query(query, ctx).await } Request::Ddl(request) => self.handle_ddl(request, ctx).await, - Request::RowInserts(_) => unreachable!(), - Request::RowDelete(_) => unreachable!(), + Request::RowInserts(_) | Request::RowDelete(_) => UnsupportedGrpcRequestSnafu { + kind: "row insert/delete", + } + .fail(), } } } diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 24a010d317..9097d680c1 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -23,9 +23,9 @@ pub mod instance; pub mod metrics; #[cfg(any(test, feature = "testing"))] mod mock; +pub mod region_server; pub mod server; pub mod sql; mod store; - #[cfg(test)] mod tests; diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs new file mode 100644 index 0000000000..051cf56912 --- /dev/null +++ b/src/datanode/src/region_server.rs @@ -0,0 +1,103 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; + +use common_query::Output; +use common_telemetry::info; +use dashmap::DashMap; +use snafu::{OptionExt, ResultExt}; +use store_api::region_engine::RegionEngineRef; +use store_api::region_request::RegionRequest; +use store_api::storage::RegionId; + +use crate::error::{ + HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, +}; + +#[derive(Default)] +pub struct RegionServer { + engines: HashMap<String, RegionEngineRef>, + region_map: DashMap<RegionId, RegionEngineRef>, +} + +impl RegionServer { + pub fn new() -> Self { + Self::default() + } + + pub fn register_engine(&mut self, engine: RegionEngineRef) { + let engine_name = engine.name(); + self.engines.insert(engine_name.to_string(), engine); + } + + pub async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result<Output> { + // TODO(ruihang): add some metrics + + let region_change = match &request { + RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()), + RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()), + RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters, + RegionRequest::Write(_) + | RegionRequest::Read(_) + | RegionRequest::Delete(_) + | RegionRequest::Alter(_) + | RegionRequest::Flush(_) + | RegionRequest::Compact(_) => RegionChange::None, + }; + + let engine = match &region_change { + RegionChange::Register(engine_type) => self + .engines + .get(engine_type) + .with_context(|| RegionEngineNotFoundSnafu { name: engine_type })? + .clone(), + RegionChange::None | RegionChange::Deregisters => self + .region_map + .get(&region_id) + .with_context(|| RegionNotFoundSnafu { region_id })? + .clone(), + }; + let engine_type = engine.name(); + + let result = engine + .handle_request(region_id, request) + .await + .with_context(|_| HandleRegionRequestSnafu { region_id })?; + + match region_change { + RegionChange::None => {} + RegionChange::Register(_) => { + info!("Region {region_id} is registered to engine {engine_type}"); + self.region_map.insert(region_id, engine); + } + RegionChange::Deregisters => { + info!("Region {region_id} is deregistered from engine {engine_type}"); + self.region_map.remove(&region_id); + } + } + + Ok(result) + } +} + +enum RegionChange { + None, + Register(String), + Deregisters, +} diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 130805265f..99452fb9c7 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -64,9 +64,9 @@ use table::TableRef; use crate::catalog::FrontendCatalogManager; use crate::error::{ self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, - DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, RequestDatanodeSnafu, - RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu, TableNotFoundSnafu, - TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu, + DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu, + RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu, + TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu, }; use crate::expr_factory; use crate::instance::distributed::inserter::DistInserter; @@ -677,6 +677,10 @@ impl GrpcQueryHandler for DistInstance { match request { Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await, Request::Delete(request) => self.handle_dist_delete(request, ctx).await, +
Request::RowInserts(_) | Request::RowDelete(_) => NotSupportedSnafu { + feat: "row insert/delete", + } + .fail(), Request::Query(_) => { unreachable!("Query should have been handled directly in Frontend Instance!") } @@ -713,8 +717,6 @@ impl GrpcQueryHandler for DistInstance { } } } - Request::RowInserts(_) => unreachable!(), - Request::RowDelete(_) => unreachable!(), } } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 2f7dc71245..148a36ea0e 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -44,6 +44,12 @@ impl GrpcQueryHandler for Instance { let output = match request { Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, + Request::RowInserts(_) | Request::RowDelete(_) => { + return NotSupportedSnafu { + feat: "row insert/delete", + } + .fail(); + } Request::Query(query_request) => { let query = query_request.query.context(IncompleteGrpcResultSnafu { err_msg: "Missing field 'QueryRequest.query'", @@ -88,8 +94,6 @@ impl GrpcQueryHandler for Instance { GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone()) .await? } - Request::RowInserts(_) => unreachable!(), - Request::RowDelete(_) => unreachable!(), }; let output = interceptor.post_execute(output, ctx)?; diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 30e6deefd3..4b342318c4 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -14,21 +14,22 @@ //! Mito region engine. -#[cfg(test)] -mod tests; +// TODO: migrate test to RegionRequest +// #[cfg(test)] +// mod tests; use std::sync::Arc; +use common_query::Output; use object_store::ObjectStore; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use store_api::logstore::LogStore; +use store_api::region_request::RegionRequest; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result}; -use crate::request::{ - CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest, -}; +use crate::error::{RecvSnafu, Result}; +use crate::request::RegionTask; use crate::worker::WorkerGroup; /// Region engine implementation for timeseries data. @@ -59,29 +60,13 @@ impl MitoEngine { self.inner.stop().await } - /// Creates a new region. - pub async fn create_region(&self, create_request: CreateRequest) -> Result<()> { - self.inner - .handle_request_body(RequestBody::Create(create_request)) - .await - } - - /// Opens an existing region. - /// - /// Returns error if the region does not exist. - pub async fn open_region(&self, open_request: OpenRequest) -> Result<()> { - self.inner - .handle_request_body(RequestBody::Open(open_request)) - .await - } - - /// Closes a region. - /// - /// Does nothing if the region is already closed. - pub async fn close_region(&self, close_request: CloseRequest) -> Result<()> { - self.inner - .handle_request_body(RequestBody::Close(close_request)) - .await + pub async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result<Output> { + self.inner.handle_request(region_id, request).await?; + Ok(Output::AffectedRows(0)) } /// Returns true if the specific region exists. @@ -89,23 +74,22 @@ impl MitoEngine { self.inner.workers.is_region_exists(region_id) } - /// Write to a region.
- pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> { - let region = self - .inner - .workers - .get_region(write_request.region_id) - .context(RegionNotFoundSnafu { - region_id: write_request.region_id, - })?; - let metadata = region.metadata(); + // /// Write to a region. + // pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> { + // write_request.validate()?; + // RequestValidator::write_request(&write_request)?; - write_request.fill_missing_columns(&metadata)?; + // TODO(yingwen): Fill default values. + // We need to fill default values before writing it to WAL so we can get + // the same default value after reopening the region. - self.inner - .handle_request_body(RequestBody::Write(write_request)) - .await - } + // let metadata = region.metadata(); + + // write_request.fill_missing_columns(&metadata)?; + // self.inner + // .handle_request_body(RequestBody::Write(write_request)) + // .await + // } } /// Inner struct of [MitoEngine]. @@ -131,9 +115,10 @@ impl EngineInner { self.workers.stop().await } + // TODO(yingwen): return `Output` instead of `Result<()>`. /// Handles [RequestBody] and return its executed result. - async fn handle_request_body(&self, body: RequestBody) -> Result<()> { - let (request, receiver) = RegionRequest::from_body(body); + async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<()> { + let (request, receiver) = RegionTask::from_request(region_id, request); self.workers.submit_to_worker(request).await?; receiver.await.context(RecvSnafu)? diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index ebe57a8e6b..ebaa753802 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -451,11 +451,12 @@ mod test { use common_datasource::compression::CompressionType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; + use store_api::metadata::ColumnMetadata; use super::*; use crate::manifest::action::RegionChange; use crate::manifest::tests::utils::basic_region_metadata; - use crate::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use crate::metadata::RegionMetadataBuilder; use crate::test_util::TestEnv; #[tokio::test] diff --git a/src/mito2/src/manifest/tests/utils.rs b/src/mito2/src/manifest/tests/utils.rs index 7444f05119..2c4f96d507 100644 --- a/src/mito2/src/manifest/tests/utils.rs +++ b/src/mito2/src/manifest/tests/utils.rs @@ -15,9 +15,10 @@ use api::v1::SemanticType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; +use store_api::metadata::ColumnMetadata; use store_api::storage::RegionId; -use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; +use crate::metadata::{RegionMetadata, RegionMetadataBuilder}; /// Build a basic region metadata for testing. 
/// It contains three columns: diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index a91fbc4e7f..90a3f5e8bf 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -191,10 +191,11 @@ mod tests { use api::v1::ColumnDataType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; + use store_api::metadata::ColumnMetadata; use store_api::storage::RegionId; use super::*; - use crate::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use crate::metadata::RegionMetadataBuilder; use crate::test_util::i64_value; const TS_NAME: &str = "ts"; diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs index 06d5df3c89..05482d76ef 100644 --- a/src/mito2/src/metadata.rs +++ b/src/mito2/src/metadata.rs @@ -19,10 +19,11 @@ use std::sync::Arc; use api::v1::SemanticType; use datatypes::prelude::DataType; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::schema::{Schema, SchemaRef}; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metadata::ColumnMetadata; use store_api::storage::{ColumnId, RegionId}; use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result, SerdeJsonSnafu}; @@ -143,7 +144,7 @@ impl RegionMetadata { let mut id_names = HashMap::with_capacity(self.column_metadatas.len()); for col in &self.column_metadatas { // Validate each column. - col.validate()?; + Self::validate_column_metadata(col)?; // Check whether column id is duplicated. We already check column name // is unique in `Schema` so we only check column id here. @@ -253,6 +254,26 @@ impl RegionMetadata { Ok(()) } + + /// Checks whether it is a valid column. + fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> { + if column_metadata.semantic_type == SemanticType::Timestamp { + ensure!( + column_metadata + .column_schema + .data_type + .is_timestamp_compatible(), + InvalidMetaSnafu { + reason: format!( + "{} is not timestamp compatible", + column_metadata.column_schema.name + ), + } + ); + } + + Ok(()) + } } /// Builder to build [RegionMetadata]. @@ -316,33 +337,6 @@ impl RegionMetadataBuilder { } } -/// Metadata of a column. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct ColumnMetadata { - /// Schema of this column. Is the same as `column_schema` in [SchemaRef]. - pub column_schema: ColumnSchema, - /// Semantic type of this column (e.g. tag or timestamp). - pub semantic_type: SemanticType, - /// Immutable and unique id of a region. - pub column_id: ColumnId, -} - -impl ColumnMetadata { - /// Checks whether it is a valid column. - pub fn validate(&self) -> Result<()> { - if self.semantic_type == SemanticType::Timestamp { - ensure!( - self.column_schema.data_type.is_timestamp_compatible(), - InvalidMetaSnafu { - reason: format!("{} is not timestamp compatible", self.column_schema.name), - } - ); - } - - Ok(()) - } -} - /// Fields skipped in serialization. struct SkippedFields { /// Last schema. 
@@ -390,6 +384,7 @@ impl SkippedFields #[cfg(test)] mod test { use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; use super::*; @@ -450,7 +445,6 @@ mod test { semantic_type: SemanticType::Timestamp, column_id: 1, }; - col.validate().unwrap_err(); builder.push_column_metadata(col); let err = builder.build().unwrap_err(); diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index ec1ce0ac89..5cde0ccfb4 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -24,12 +24,14 @@ use api::helper::{ use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, Value}; use common_base::readable_size::ReadableSize; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metadata::ColumnMetadata; +use store_api::region_request::RegionRequest; use store_api::storage::{ColumnId, CompactionStrategy, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result}; -use crate::metadata::{ColumnMetadata, RegionMetadata}; +use crate::metadata::RegionMetadata; /// Options that affect the entire region. /// @@ -350,7 +352,7 @@ pub(crate) struct SenderWriteRequest { /// Request sent to a worker pub(crate) enum WorkerRequest { /// Region request. - Region(RegionRequest), + Region(RegionTask), /// Notify a worker to stop. Stop, @@ -358,70 +360,47 @@ pub(crate) enum WorkerRequest { /// Request to modify a region. #[derive(Debug)] -pub(crate) struct RegionRequest { +pub(crate) struct RegionTask { /// Sender to send result. /// /// Now the result is a `Result<()>`, but we could replace the empty tuple /// with an enum if we need to carry more information. pub(crate) sender: Option<Sender<Result<()>>>, /// Request body. - pub(crate) body: RequestBody, + pub(crate) request: RegionRequest, + /// Region identifier. + pub(crate) region_id: RegionId, } -impl RegionRequest { - /// Creates a [RegionRequest] and a receiver from `body`. - pub(crate) fn from_body(body: RequestBody) -> (RegionRequest, Receiver<Result<()>>) { +impl RegionTask { + /// Creates a [RegionTask] and a receiver from [RegionRequest]. + pub(crate) fn from_request( + region_id: RegionId, + request: RegionRequest, + ) -> (RegionTask, Receiver<Result<()>>) { let (sender, receiver) = oneshot::channel(); ( - RegionRequest { + RegionTask { sender: Some(sender), - body, + request, + region_id, }, receiver, ) } } -/// Body to carry actual region request. -#[derive(Debug)] -pub(crate) enum RequestBody { - /// Write to a region. - Write(WriteRequest), +/// Mito Region Engine's request validator +pub(crate) struct RequestValidator; - // DDL: - /// Creates a new region. - Create(CreateRequest), - /// Opens an existing region. - Open(OpenRequest), - /// Closes a region. - Close(CloseRequest), -} - -impl RequestBody { - /// Region id of this request. - pub(crate) fn region_id(&self) -> RegionId { - match self { - RequestBody::Write(req) => req.region_id, - RequestBody::Create(req) => req.region_id, - RequestBody::Open(req) => req.region_id, - RequestBody::Close(req) => req.region_id, - } - } - - /// Returns whether the request is a write request. - pub(crate) fn is_write(&self) -> bool { - matches!(self, RequestBody::Write(_)) - } - - /// Converts the request into a [WriteRequest]. - /// - /// # Panics - /// Panics if it isn't a [WriteRequest].
- pub(crate) fn into_write_request(self) -> WriteRequest { - match self { - RequestBody::Write(req) => req, - other => panic!("expect write request, found {other:?}"), - } +impl RequestValidator { + /// Validate the [WriteRequest]. + pub fn write_request(_write_request: &WriteRequest) -> Result<()> { + // - checks whether the request is too large. + // - checks whether each row in rows has the same schema. + // - checks whether each column match the schema in Rows. + // - checks rows don't have duplicate columns. + unimplemented!() } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 68265b28a0..369a027b10 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -27,13 +27,14 @@ use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; use object_store::services::Fs; use object_store::ObjectStore; +use store_api::metadata::ColumnMetadata; use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::engine::MitoEngine; use crate::error::Result; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; -use crate::metadata::{ColumnMetadata, RegionMetadataRef}; +use crate::metadata::RegionMetadataRef; use crate::request::{CreateRequest, RegionOptions}; use crate::worker::WorkerGroup; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index a26d657a16..cc1d6af254 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -30,6 +30,7 @@ use futures::future::try_join_all; use object_store::ObjectStore; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; +use store_api::region_request::RegionRequest; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, Mutex}; @@ -38,7 +39,7 @@ use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; -use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest}; +use crate::request::{RegionTask, WorkerRequest}; use crate::wal::Wal; /// Identifier for a worker. @@ -122,10 +123,8 @@ impl WorkerGroup { } /// Submit a request to a worker in the group. - pub(crate) async fn submit_to_worker(&self, request: RegionRequest) -> Result<()> { - self.worker(request.body.region_id()) - .submit_request(request) - .await + pub(crate) async fn submit_to_worker(&self, task: RegionTask) -> Result<()> { + self.worker(task.region_id).submit_request(task).await } /// Returns true if the specific region exists. @@ -206,7 +205,7 @@ impl RegionWorker { } /// Submit request to background worker thread. - async fn submit_request(&self, request: RegionRequest) -> Result<()> { + async fn submit_request(&self, request: RegionTask) -> Result<()> { ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id }); if self .sender @@ -336,18 +335,18 @@ impl RegionWorkerLoop { /// /// `buffer` should be empty. async fn handle_requests(&mut self, buffer: &mut RequestBuffer) { - let mut write_requests = Vec::with_capacity(buffer.len()); + let write_requests = Vec::with_capacity(buffer.len()); let mut ddl_requests = Vec::with_capacity(buffer.len()); for worker_req in buffer.drain(..) 
{ match worker_req { - WorkerRequest::Region(req) => { - if req.body.is_write() { - write_requests.push(SenderWriteRequest { - sender: req.sender, - request: req.body.into_write_request(), - }); + WorkerRequest::Region(task) => { + if matches!(task.request, RegionRequest::Write(_)) { + // write_requests.push(SenderWriteRequest { + // sender: task.sender, + // request: task.request.into_write_request(), + // }); } else { - ddl_requests.push(req); + ddl_requests.push(task); } } // We receive a stop signal, but we still want to process remaining @@ -369,20 +368,26 @@ impl RegionWorkerLoop { /// Takes and handles all ddl requests. - async fn handle_ddl_requests(&mut self, ddl_requests: Vec<RegionRequest>) { - if ddl_requests.is_empty() { + async fn handle_ddl_requests(&mut self, ddl_tasks: Vec<RegionTask>) { + if ddl_tasks.is_empty() { return; } - for request in ddl_requests { - let res = match request.body { - RequestBody::Create(req) => self.handle_create_request(req).await, - RequestBody::Open(req) => self.handle_open_request(req).await, - RequestBody::Close(req) => self.handle_close_request(req).await, - RequestBody::Write(_) => unreachable!(), + for task in ddl_tasks { + let res: std::result::Result<(), crate::error::Error> = match task.request { + RegionRequest::Create(req) => self.handle_create_request(task.region_id, req).await, + RegionRequest::Open(req) => self.handle_open_request(task.region_id, req).await, + RegionRequest::Close(_) => self.handle_close_request(task.region_id).await, + RegionRequest::Write(_) + | RegionRequest::Read(_) + | RegionRequest::Delete(_) + | RegionRequest::Drop(_) + | RegionRequest::Alter(_) + | RegionRequest::Flush(_) + | RegionRequest::Compact(_) => unreachable!(), }; - if let Some(sender) = request.sender { + if let Some(sender) = task.sender { // Ignore send result. let _ = sender.send(res); } diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index f08aa4ad59..b3aba7a767 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -15,23 +15,23 @@ //! Handling close request.
use common_telemetry::info; +use store_api::storage::RegionId; use crate::error::Result; -use crate::request::CloseRequest; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_close_request(&mut self, request: CloseRequest) -> Result<()> { - let Some(region) = self.regions.get_region(request.region_id) else { + pub(crate) async fn handle_close_request(&mut self, region_id: RegionId) -> Result<()> { + let Some(region) = self.regions.get_region(region_id) else { return Ok(()); }; - info!("Try to close region {}", request.region_id); + info!("Try to close region {}", region_id); region.stop().await?; - self.regions.remove_region(request.region_id); + self.regions.remove_region(region_id); - info!("Region {} closed", request.region_id); + info!("Region {} closed", region_id); Ok(()) } diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 75a2cfd3bf..ad2b67b1b9 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -18,22 +18,25 @@ use std::sync::Arc; use common_telemetry::info; use snafu::ensure; +use store_api::region_request::RegionCreateRequest; +use store_api::storage::RegionId; use crate::error::{RegionExistsSnafu, Result}; use crate::metadata::{RegionMetadataBuilder, INIT_REGION_VERSION}; use crate::region::opener::RegionOpener; -use crate::request::CreateRequest; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_create_request(&mut self, request: CreateRequest) -> Result<()> { + pub(crate) async fn handle_create_request( + &mut self, + region_id: RegionId, + request: RegionCreateRequest, + ) -> Result<()> { // Checks whether the table exists. - if self.regions.is_region_exists(request.region_id) { + if self.regions.is_region_exists(region_id) { ensure!( request.create_if_not_exists, - RegionExistsSnafu { - region_id: request.region_id, - } + RegionExistsSnafu { region_id } ); // Region already exists. @@ -41,7 +44,7 @@ impl RegionWorkerLoop { } // Convert the request into a RegionMetadata and validate it. - let mut builder = RegionMetadataBuilder::new(request.region_id, INIT_REGION_VERSION); + let mut builder = RegionMetadataBuilder::new(region_id, INIT_REGION_VERSION); for column in request.column_metadatas { builder.push_column_metadata(column); } @@ -50,7 +53,7 @@ impl RegionWorkerLoop { // Create a MitoRegion from the RegionMetadata. 
let region = RegionOpener::new( - request.region_id, + region_id, self.memtable_builder.clone(), self.object_store.clone(), ) diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 9cca66bc7b..eac4a9173f 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -17,23 +17,28 @@ use std::sync::Arc; use common_telemetry::info; +use store_api::region_request::RegionOpenRequest; +use store_api::storage::RegionId; use crate::error::Result; use crate::region::opener::RegionOpener; -use crate::request::OpenRequest; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_open_request(&mut self, request: OpenRequest) -> Result<()> { - if self.regions.is_region_exists(request.region_id) { + pub(crate) async fn handle_open_request( + &mut self, + region_id: RegionId, + request: RegionOpenRequest, + ) -> Result<()> { + if self.regions.is_region_exists(region_id) { return Ok(()); } - info!("Try to open region {}", request.region_id); + info!("Try to open region {}", region_id); // Open region from specific region dir. let region = RegionOpener::new( - request.region_id, + region_id, self.memtable_builder.clone(), self.object_store.clone(), ) @@ -41,7 +46,7 @@ impl RegionWorkerLoop { .open(&self.config) .await?; - info!("Region {} is opened", request.region_id); + info!("Region {} is opened", region_id); // Insert the MitoRegion into the RegionMap. self.regions.insert_region(Arc::new(region)); diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 0a8cb4101a..e0674204a1 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -159,7 +159,10 @@ impl GrpcQueryHandler for DummyInstance { ctx: QueryContextRef, ) -> std::result::Result { let output = match request { - Request::Inserts(_) | Request::Delete(_) => unimplemented!(), + Request::Inserts(_) + | Request::Delete(_) + | Request::RowInserts(_) + | Request::RowDelete(_) => unimplemented!(), Request::Query(query_request) => { let query = query_request.query.unwrap(); match query { @@ -194,8 +197,6 @@ impl GrpcQueryHandler for DummyInstance { } } Request::Ddl(_) => unimplemented!(), - Request::RowInserts(_) => unimplemented!(), - Request::RowDelete(_) => unimplemented!(), }; Ok(output) } diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index b0615d727e..ef35a8da59 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -5,7 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] -api.workspace = true +api = { workspace = true } async-trait.workspace = true bytes = "1.1" common-base = { workspace = true } diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs index 627248defe..21cf732db5 100644 --- a/src/store-api/src/lib.rs +++ b/src/store-api/src/lib.rs @@ -17,4 +17,7 @@ pub mod logstore; pub mod manifest; +pub mod metadata; +pub mod region_engine; +pub mod region_request; pub mod storage; diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs new file mode 100644 index 0000000000..12be8b33e1 --- /dev/null +++ b/src/store-api/src/metadata.rs @@ -0,0 +1,30 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::SemanticType; +use datatypes::schema::ColumnSchema; +use serde::{Deserialize, Serialize}; + +use crate::storage::ColumnId; + +/// Metadata of a column. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ColumnMetadata { + /// Schema of this column. Is the same as `column_schema` in [SchemaRef]. + pub column_schema: ColumnSchema, + /// Semantic type of this column (e.g. tag or timestamp). + pub semantic_type: SemanticType, + /// Immutable and unique id of this column. + pub column_id: ColumnId, +} diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs new file mode 100644 index 0000000000..e0e14ec3fa --- /dev/null +++ b/src/store-api/src/region_engine.rs @@ -0,0 +1,38 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Region Engine's definition + +use std::sync::Arc; + +use async_trait::async_trait; +use common_error::ext::BoxedError; +use common_query::Output; + +use crate::region_request::RegionRequest; +use crate::storage::RegionId; + +#[async_trait] +pub trait RegionEngine { + /// Name of this engine + fn name(&self) -> &str; + + async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result<Output, BoxedError>; +} + +pub type RegionEngineRef = Arc<dyn RegionEngine>; diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs new file mode 100644 index 0000000000..6d9bd23af8 --- /dev/null +++ b/src/store-api/src/region_request.rs @@ -0,0 +1,97 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use std::collections::HashMap; + +use api::v1::Rows; + +use crate::metadata::ColumnMetadata; +use crate::storage::{AlterRequest, ColumnId, ScanRequest}; + +#[derive(Debug)] +pub enum RegionRequest { + Write(RegionWriteRequest), + Read(RegionReadRequest), + Delete(RegionDeleteRequest), + Create(RegionCreateRequest), + Drop(RegionDropRequest), + Open(RegionOpenRequest), + Close(RegionCloseRequest), + Alter(RegionAlterRequest), + Flush(RegionFlushRequest), + Compact(RegionCompactRequest), +} + +/// Request to write a region. +#[derive(Debug)] +pub struct RegionWriteRequest { + /// Rows to write. + pub rows: Rows, +} + +#[derive(Debug)] +pub struct RegionReadRequest { + pub request: ScanRequest, +} + +#[derive(Debug)] +pub struct RegionDeleteRequest { + /// Rows to delete. + pub rows: Rows, +} + +#[derive(Debug)] +pub struct RegionCreateRequest { + /// Region engine name + pub engine: String, + /// Columns in this region. + pub column_metadatas: Vec<ColumnMetadata>, + /// Columns in the primary key. + pub primary_key: Vec<ColumnId>, + /// Create region if not exists. + pub create_if_not_exists: bool, + /// Options of the created region. + pub options: HashMap<String, String>, + /// Directory for the region's data home. Usually composed of the catalog and table id. + pub region_dir: String, +} + +#[derive(Debug)] +pub struct RegionDropRequest {} + +/// Open region request. +#[derive(Debug)] +pub struct RegionOpenRequest { + /// Region engine name + pub engine: String, + /// Data directory of the region. + pub region_dir: String, + /// Options of the created region. + pub options: HashMap<String, String>, +} + +/// Close region request. +#[derive(Debug)] +pub struct RegionCloseRequest {} + +#[derive(Debug)] +pub struct RegionAlterRequest { + pub request: AlterRequest, +} + +#[derive(Debug)] +pub struct RegionFlushRequest {} + +#[derive(Debug)] +pub struct RegionCompactRequest {}
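Usage sketch (not part of the patch): a minimal, hedged example of how the pieces introduced above are expected to fit together. A toy engine implements the new store_api::region_engine::RegionEngine trait, is registered with datanode::region_server::RegionServer, and is driven through a RegionRequest::Create. The NoopEngine type, the region_server_demo function, the sample table id / region number, and the region_dir value are made up for illustration; only the trait, the request structs, and the RegionServer methods come from this change.

// A minimal sketch of wiring a custom engine into the new RegionServer.
// Everything marked "hypothetical" below is invented for illustration only.
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
use datanode::region_server::RegionServer;
use store_api::region_engine::{RegionEngine, RegionEngineRef};
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::storage::RegionId;

/// Hypothetical do-nothing engine used only to exercise request routing.
struct NoopEngine;

#[async_trait]
impl RegionEngine for NoopEngine {
    fn name(&self) -> &str {
        "noop"
    }

    async fn handle_request(
        &self,
        _region_id: RegionId,
        _request: RegionRequest,
    ) -> Result<Output, BoxedError> {
        // A real engine would create/open/write the region here.
        Ok(Output::AffectedRows(0))
    }
}

/// Hypothetical driver showing the intended call sequence.
async fn region_server_demo() {
    // Engines are registered by name; Create/Open requests carry that name.
    let mut server = RegionServer::new();
    let engine: RegionEngineRef = Arc::new(NoopEngine);
    server.register_engine(engine);

    // Hypothetical identifiers: table id 1024, region number 1, made-up data dir.
    let region_id = RegionId::new(1024, 1);
    let create = RegionRequest::Create(RegionCreateRequest {
        engine: "noop".to_string(),
        column_metadatas: vec![],
        primary_key: vec![],
        create_if_not_exists: true,
        options: HashMap::new(),
        region_dir: "data/1024/1/".to_string(),
    });

    // A Create request is routed to the engine looked up by name; on success the
    // region id is recorded in the server's region map, so later Write/Read/Alter
    // requests for that region id go straight to the owning engine, and
    // Close/Drop remove the mapping again.
    let output = server
        .handle_request(region_id, create)
        .await
        .expect("create should succeed");
    assert!(matches!(output, Output::AffectedRows(_)));
}

The split mirrors the RegionChange bookkeeping in region_server.rs above: only Create/Open consult the engine registry, while every other request resolves through the region_map.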