feat: define region server and related requests (#2160)

* define region server and related requests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fill request body

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change mito2's request type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: bump greptime-proto to d9167cab (row insert/delete)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test compile

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove name_to_index

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* address cr comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finilise

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-08-15 14:27:27 +08:00
committed by GitHub
parent 69a2036cee
commit 2168970814
25 changed files with 476 additions and 190 deletions

1
Cargo.lock generated
View File

@@ -2645,6 +2645,7 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-time",
"dashmap",
"datafusion",
"datafusion-common",
"datafusion-expr",

View File

@@ -30,6 +30,7 @@ common-recordbatch = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
common-time = { workspace = true }
dashmap = "5.4"
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion.workspace = true

View File

@@ -19,7 +19,7 @@ use common_error::status_code::StatusCode;
use common_procedure::ProcedureId;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::RegionNumber;
use store_api::storage::{RegionId, RegionNumber};
use table::error::Error as TableError;
/// Business error of datanode.
@@ -482,6 +482,30 @@ pub enum Error {
violated: String,
location: Location,
},
#[snafu(display(
"Failed to handle request for region {}, source: {}, location: {}",
region_id,
source,
location
))]
HandleRegionRequest {
region_id: RegionId,
location: Location,
source: BoxedError,
},
#[snafu(display("RegionId {} not found, location: {}", region_id, location))]
RegionNotFound {
region_id: RegionId,
location: Location,
},
#[snafu(display("Region engine {} is not registered, location: {}", name, location))]
RegionEngineNotFound { name: String, location: Location },
#[snafu(display("Unsupported gRPC request, kind: {}, location: {}", kind, location))]
UnsupportedGrpcRequest { kind: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -559,7 +583,9 @@ impl ErrorExt for Error {
| MissingInsertBody { .. }
| ShutdownInstance { .. }
| CloseTableEngine { .. }
| JoinTask { .. } => StatusCode::Internal,
| JoinTask { .. }
| RegionNotFound { .. }
| RegionEngineNotFound { .. } => StatusCode::Internal,
StartServer { source, .. }
| ShutdownServer { source, .. }
@@ -570,7 +596,9 @@ impl ErrorExt for Error {
OpenLogStore { source, .. } => source.status_code(),
RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
MetaClientInit { source, .. } => source.status_code(),
TableIdProviderNotFound { .. } => StatusCode::Unsupported,
TableIdProviderNotFound { .. } | UnsupportedGrpcRequest { .. } => {
StatusCode::Unsupported
}
BumpTableId { source, .. } => source.status_code(),
ColumnDefaultValue { source, .. } => source.status_code(),
UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
@@ -581,6 +609,7 @@ impl ErrorExt for Error {
StartProcedureManager { source } | StopProcedureManager { source } => {
source.status_code()
}
HandleRegionRequest { source, .. } => source.status_code(),
}
}

View File

@@ -42,7 +42,7 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
self, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu, DeleteSnafu,
ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, JoinTaskSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu, UnsupportedGrpcRequestSnafu,
};
use crate::instance::Instance;
@@ -221,8 +221,10 @@ impl GrpcQueryHandler for Instance {
self.handle_query(query, ctx).await
}
Request::Ddl(request) => self.handle_ddl(request, ctx).await,
Request::RowInserts(_) => unreachable!(),
Request::RowDelete(_) => unreachable!(),
Request::RowInserts(_) | Request::RowDelete(_) => UnsupportedGrpcRequestSnafu {
kind: "row insert/delete",
}
.fail(),
}
}
}

View File

@@ -23,9 +23,9 @@ pub mod instance;
pub mod metrics;
#[cfg(any(test, feature = "testing"))]
mod mock;
pub mod region_server;
pub mod server;
pub mod sql;
mod store;
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,103 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_query::Output;
use common_telemetry::info;
use dashmap::DashMap;
use snafu::{OptionExt, ResultExt};
use store_api::region_engine::RegionEngineRef;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use crate::error::{
HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result,
};
#[derive(Default)]
pub struct RegionServer {
engines: HashMap<String, RegionEngineRef>,
region_map: DashMap<RegionId, RegionEngineRef>,
}
impl RegionServer {
pub fn new() -> Self {
Self::default()
}
pub fn register_engine(&mut self, engine: RegionEngineRef) {
let engine_name = engine.name();
self.engines.insert(engine_name.to_string(), engine);
}
pub async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
// TODO(ruihang): add some metrics
let region_change = match &request {
RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()),
RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()),
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
RegionRequest::Write(_)
| RegionRequest::Read(_)
| RegionRequest::Delete(_)
| RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_) => RegionChange::None,
};
let engine = match &region_change {
RegionChange::Register(engine_type) => self
.engines
.get(engine_type)
.with_context(|| RegionEngineNotFoundSnafu { name: engine_type })?
.clone(),
RegionChange::None | RegionChange::Deregisters => self
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?
.clone(),
};
let engine_type = engine.name();
let result = engine
.handle_request(region_id, request)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?;
match region_change {
RegionChange::None => {}
RegionChange::Register(_) => {
info!("Region {region_id} is registered to engine {engine_type}");
self.region_map.insert(region_id, engine);
}
RegionChange::Deregisters => {
info!("Region {region_id} is deregistered from engine {engine_type}");
self.region_map.remove(&region_id);
}
}
Ok(result)
}
}
enum RegionChange {
None,
Register(String),
Deregisters,
}

View File

@@ -64,9 +64,9 @@ use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, RequestDatanodeSnafu,
RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu, TableNotFoundSnafu,
TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::distributed::inserter::DistInserter;
@@ -677,6 +677,10 @@ impl GrpcQueryHandler for DistInstance {
match request {
Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
Request::Delete(request) => self.handle_dist_delete(request, ctx).await,
Request::RowInserts(_) | Request::RowDelete(_) => NotSupportedSnafu {
feat: "row insert/delete",
}
.fail(),
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}
@@ -713,8 +717,6 @@ impl GrpcQueryHandler for DistInstance {
}
}
}
Request::RowInserts(_) => unreachable!(),
Request::RowDelete(_) => unreachable!(),
}
}
}

View File

@@ -44,6 +44,12 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(_) | Request::RowDelete(_) => {
return NotSupportedSnafu {
feat: "row insert/delete",
}
.fail();
}
Request::Query(query_request) => {
let query = query_request.query.context(IncompleteGrpcResultSnafu {
err_msg: "Missing field 'QueryRequest.query'",
@@ -88,8 +94,6 @@ impl GrpcQueryHandler for Instance {
GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone())
.await?
}
Request::RowInserts(_) => unreachable!(),
Request::RowDelete(_) => unreachable!(),
};
let output = interceptor.post_execute(output, ctx)?;

View File

@@ -14,21 +14,22 @@
//! Mito region engine.
#[cfg(test)]
mod tests;
// TODO: migrate test to RegionRequest
// #[cfg(test)]
// mod tests;
use std::sync::Arc;
use common_query::Output;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::request::{
CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest,
};
use crate::error::{RecvSnafu, Result};
use crate::request::RegionTask;
use crate::worker::WorkerGroup;
/// Region engine implementation for timeseries data.
@@ -59,29 +60,13 @@ impl MitoEngine {
self.inner.stop().await
}
/// Creates a new region.
pub async fn create_region(&self, create_request: CreateRequest) -> Result<()> {
self.inner
.handle_request_body(RequestBody::Create(create_request))
.await
}
/// Opens an existing region.
///
/// Returns error if the region does not exist.
pub async fn open_region(&self, open_request: OpenRequest) -> Result<()> {
self.inner
.handle_request_body(RequestBody::Open(open_request))
.await
}
/// Closes a region.
///
/// Does nothing if the region is already closed.
pub async fn close_region(&self, close_request: CloseRequest) -> Result<()> {
self.inner
.handle_request_body(RequestBody::Close(close_request))
.await
pub async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
self.inner.handle_request(region_id, request).await?;
Ok(Output::AffectedRows(0))
}
/// Returns true if the specific region exists.
@@ -89,23 +74,22 @@ impl MitoEngine {
self.inner.workers.is_region_exists(region_id)
}
/// Write to a region.
pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> {
let region = self
.inner
.workers
.get_region(write_request.region_id)
.context(RegionNotFoundSnafu {
region_id: write_request.region_id,
})?;
let metadata = region.metadata();
// /// Write to a region.
// pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
// write_request.validate()?;
// RequestValidator::write_request(&write_request)?;
write_request.fill_missing_columns(&metadata)?;
// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.
self.inner
.handle_request_body(RequestBody::Write(write_request))
.await
}
// let metadata = region.metadata();
// write_request.fill_missing_columns(&metadata)?;
// self.inner
// .handle_request_body(RequestBody::Write(write_request))
// .await
// }
}
/// Inner struct of [MitoEngine].
@@ -131,9 +115,10 @@ impl EngineInner {
self.workers.stop().await
}
// TODO(yingwen): return `Output` instead of `Result<()>`.
/// Handles [RequestBody] and return its executed result.
async fn handle_request_body(&self, body: RequestBody) -> Result<()> {
let (request, receiver) = RegionRequest::from_body(body);
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<()> {
let (request, receiver) = RegionTask::from_request(region_id, request);
self.workers.submit_to_worker(request).await?;
receiver.await.context(RecvSnafu)?

View File

@@ -451,11 +451,12 @@ mod test {
use common_datasource::compression::CompressionType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use super::*;
use crate::manifest::action::RegionChange;
use crate::manifest::tests::utils::basic_region_metadata;
use crate::metadata::{ColumnMetadata, RegionMetadataBuilder};
use crate::metadata::RegionMetadataBuilder;
use crate::test_util::TestEnv;
#[tokio::test]

View File

@@ -15,9 +15,10 @@
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use crate::metadata::{RegionMetadata, RegionMetadataBuilder};
/// Build a basic region metadata for testing.
/// It contains three columns:

View File

@@ -191,10 +191,11 @@ mod tests {
use api::v1::ColumnDataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use super::*;
use crate::metadata::{ColumnMetadata, RegionMetadataBuilder};
use crate::metadata::RegionMetadataBuilder;
use crate::test_util::i64_value;
const TS_NAME: &str = "ts";

View File

@@ -19,10 +19,11 @@ use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::prelude::DataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::schema::{Schema, SchemaRef};
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::storage::{ColumnId, RegionId};
use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result, SerdeJsonSnafu};
@@ -143,7 +144,7 @@ impl RegionMetadata {
let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
for col in &self.column_metadatas {
// Validate each column.
col.validate()?;
Self::validate_column_metadata(col)?;
// Check whether column id is duplicated. We already check column name
// is unique in `Schema` so we only check column id here.
@@ -253,6 +254,26 @@ impl RegionMetadata {
Ok(())
}
/// Checks whether it is a valid column.
fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
if column_metadata.semantic_type == SemanticType::Timestamp {
ensure!(
column_metadata
.column_schema
.data_type
.is_timestamp_compatible(),
InvalidMetaSnafu {
reason: format!(
"{} is not timestamp compatible",
column_metadata.column_schema.name
),
}
);
}
Ok(())
}
}
/// Builder to build [RegionMetadata].
@@ -316,33 +337,6 @@ impl RegionMetadataBuilder {
}
}
/// Metadata of a column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
/// Schema of this column. Is the same as `column_schema` in [SchemaRef].
pub column_schema: ColumnSchema,
/// Semantic type of this column (e.g. tag or timestamp).
pub semantic_type: SemanticType,
/// Immutable and unique id of a region.
pub column_id: ColumnId,
}
impl ColumnMetadata {
/// Checks whether it is a valid column.
pub fn validate(&self) -> Result<()> {
if self.semantic_type == SemanticType::Timestamp {
ensure!(
self.column_schema.data_type.is_timestamp_compatible(),
InvalidMetaSnafu {
reason: format!("{} is not timestamp compatible", self.column_schema.name),
}
);
}
Ok(())
}
}
/// Fields skipped in serialization.
struct SkippedFields {
/// Last schema.
@@ -390,6 +384,7 @@ impl SkippedFields {
#[cfg(test)]
mod test {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use super::*;
@@ -450,7 +445,6 @@ mod test {
semantic_type: SemanticType::Timestamp,
column_id: 1,
};
col.validate().unwrap_err();
builder.push_column_metadata(col);
let err = builder.build().unwrap_err();

View File

@@ -24,12 +24,14 @@ use api::helper::{
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, Value};
use common_base::readable_size::ReadableSize;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::region_request::RegionRequest;
use store_api::storage::{ColumnId, CompactionStrategy, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result};
use crate::metadata::{ColumnMetadata, RegionMetadata};
use crate::metadata::RegionMetadata;
/// Options that affect the entire region.
///
@@ -350,7 +352,7 @@ pub(crate) struct SenderWriteRequest {
/// Request sent to a worker
pub(crate) enum WorkerRequest {
/// Region request.
Region(RegionRequest),
Region(RegionTask),
/// Notify a worker to stop.
Stop,
@@ -358,70 +360,47 @@ pub(crate) enum WorkerRequest {
/// Request to modify a region.
#[derive(Debug)]
pub(crate) struct RegionRequest {
pub(crate) struct RegionTask {
/// Sender to send result.
///
/// Now the result is a `Result<()>`, but we could replace the empty tuple
/// with an enum if we need to carry more information.
pub(crate) sender: Option<Sender<Result<()>>>,
/// Request body.
pub(crate) body: RequestBody,
pub(crate) request: RegionRequest,
/// Region identifier.
pub(crate) region_id: RegionId,
}
impl RegionRequest {
/// Creates a [RegionRequest] and a receiver from `body`.
pub(crate) fn from_body(body: RequestBody) -> (RegionRequest, Receiver<Result<()>>) {
impl RegionTask {
/// Creates a [RegionTask] and a receiver from [RegionRequest].
pub(crate) fn from_request(
region_id: RegionId,
request: RegionRequest,
) -> (RegionTask, Receiver<Result<()>>) {
let (sender, receiver) = oneshot::channel();
(
RegionRequest {
RegionTask {
sender: Some(sender),
body,
request,
region_id,
},
receiver,
)
}
}
/// Body to carry actual region request.
#[derive(Debug)]
pub(crate) enum RequestBody {
/// Write to a region.
Write(WriteRequest),
/// Mito Region Engine's request validator
pub(crate) struct RequestValidator;
// DDL:
/// Creates a new region.
Create(CreateRequest),
/// Opens an existing region.
Open(OpenRequest),
/// Closes a region.
Close(CloseRequest),
}
impl RequestBody {
/// Region id of this request.
pub(crate) fn region_id(&self) -> RegionId {
match self {
RequestBody::Write(req) => req.region_id,
RequestBody::Create(req) => req.region_id,
RequestBody::Open(req) => req.region_id,
RequestBody::Close(req) => req.region_id,
}
}
/// Returns whether the request is a write request.
pub(crate) fn is_write(&self) -> bool {
matches!(self, RequestBody::Write(_))
}
/// Converts the request into a [WriteRequest].
///
/// # Panics
/// Panics if it isn't a [WriteRequest].
pub(crate) fn into_write_request(self) -> WriteRequest {
match self {
RequestBody::Write(req) => req,
other => panic!("expect write request, found {other:?}"),
}
impl RequestValidator {
/// Validate the [WriteRequest].
pub fn write_request(_write_request: &WriteRequest) -> Result<()> {
// - checks whether the request is too large.
// - checks whether each row in rows has the same schema.
// - checks whether each column match the schema in Rows.
// - checks rows don't have duplicate columns.
unimplemented!()
}
}

View File

@@ -27,13 +27,14 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::error::Result;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::metadata::{ColumnMetadata, RegionMetadataRef};
use crate::metadata::RegionMetadataRef;
use crate::request::{CreateRequest, RegionOptions};
use crate::worker::WorkerGroup;

View File

@@ -30,6 +30,7 @@ use futures::future::try_join_all;
use object_store::ObjectStore;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex};
@@ -38,7 +39,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest};
use crate::request::{RegionTask, WorkerRequest};
use crate::wal::Wal;
/// Identifier for a worker.
@@ -122,10 +123,8 @@ impl WorkerGroup {
}
/// Submit a request to a worker in the group.
pub(crate) async fn submit_to_worker(&self, request: RegionRequest) -> Result<()> {
self.worker(request.body.region_id())
.submit_request(request)
.await
pub(crate) async fn submit_to_worker(&self, task: RegionTask) -> Result<()> {
self.worker(task.region_id).submit_request(task).await
}
/// Returns true if the specific region exists.
@@ -206,7 +205,7 @@ impl RegionWorker {
}
/// Submit request to background worker thread.
async fn submit_request(&self, request: RegionRequest) -> Result<()> {
async fn submit_request(&self, request: RegionTask) -> Result<()> {
ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
if self
.sender
@@ -336,18 +335,18 @@ impl<S: LogStore> RegionWorkerLoop<S> {
///
/// `buffer` should be empty.
async fn handle_requests(&mut self, buffer: &mut RequestBuffer) {
let mut write_requests = Vec::with_capacity(buffer.len());
let write_requests = Vec::with_capacity(buffer.len());
let mut ddl_requests = Vec::with_capacity(buffer.len());
for worker_req in buffer.drain(..) {
match worker_req {
WorkerRequest::Region(req) => {
if req.body.is_write() {
write_requests.push(SenderWriteRequest {
sender: req.sender,
request: req.body.into_write_request(),
});
WorkerRequest::Region(task) => {
if matches!(task.request, RegionRequest::Write(_)) {
// write_requests.push(SenderWriteRequest {
// sender: task.sender,
// request: task.request.into_write_request(),
// });
} else {
ddl_requests.push(req);
ddl_requests.push(task);
}
}
// We receive a stop signal, but we still want to process remaining
@@ -369,20 +368,26 @@ impl<S: LogStore> RegionWorkerLoop<S> {
impl<S> RegionWorkerLoop<S> {
/// Takes and handles all ddl requests.
async fn handle_ddl_requests(&mut self, ddl_requests: Vec<RegionRequest>) {
if ddl_requests.is_empty() {
async fn handle_ddl_requests(&mut self, ddl_tasks: Vec<RegionTask>) {
if ddl_tasks.is_empty() {
return;
}
for request in ddl_requests {
let res = match request.body {
RequestBody::Create(req) => self.handle_create_request(req).await,
RequestBody::Open(req) => self.handle_open_request(req).await,
RequestBody::Close(req) => self.handle_close_request(req).await,
RequestBody::Write(_) => unreachable!(),
for task in ddl_tasks {
let res: std::result::Result<(), crate::error::Error> = match task.request {
RegionRequest::Create(req) => self.handle_create_request(task.region_id, req).await,
RegionRequest::Open(req) => self.handle_open_request(task.region_id, req).await,
RegionRequest::Close(_) => self.handle_close_request(task.region_id).await,
RegionRequest::Write(_)
| RegionRequest::Read(_)
| RegionRequest::Delete(_)
| RegionRequest::Drop(_)
| RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_) => unreachable!(),
};
if let Some(sender) = request.sender {
if let Some(sender) = task.sender {
// Ignore send result.
let _ = sender.send(res);
}

View File

@@ -15,23 +15,23 @@
//! Handling close request.
use common_telemetry::info;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::request::CloseRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_close_request(&mut self, request: CloseRequest) -> Result<()> {
let Some(region) = self.regions.get_region(request.region_id) else {
pub(crate) async fn handle_close_request(&mut self, region_id: RegionId) -> Result<()> {
let Some(region) = self.regions.get_region(region_id) else {
return Ok(());
};
info!("Try to close region {}", request.region_id);
info!("Try to close region {}", region_id);
region.stop().await?;
self.regions.remove_region(request.region_id);
self.regions.remove_region(region_id);
info!("Region {} closed", request.region_id);
info!("Region {} closed", region_id);
Ok(())
}

View File

@@ -18,22 +18,25 @@ use std::sync::Arc;
use common_telemetry::info;
use snafu::ensure;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::RegionId;
use crate::error::{RegionExistsSnafu, Result};
use crate::metadata::{RegionMetadataBuilder, INIT_REGION_VERSION};
use crate::region::opener::RegionOpener;
use crate::request::CreateRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_create_request(&mut self, request: CreateRequest) -> Result<()> {
pub(crate) async fn handle_create_request(
&mut self,
region_id: RegionId,
request: RegionCreateRequest,
) -> Result<()> {
// Checks whether the table exists.
if self.regions.is_region_exists(request.region_id) {
if self.regions.is_region_exists(region_id) {
ensure!(
request.create_if_not_exists,
RegionExistsSnafu {
region_id: request.region_id,
}
RegionExistsSnafu { region_id }
);
// Region already exists.
@@ -41,7 +44,7 @@ impl<S> RegionWorkerLoop<S> {
}
// Convert the request into a RegionMetadata and validate it.
let mut builder = RegionMetadataBuilder::new(request.region_id, INIT_REGION_VERSION);
let mut builder = RegionMetadataBuilder::new(region_id, INIT_REGION_VERSION);
for column in request.column_metadatas {
builder.push_column_metadata(column);
}
@@ -50,7 +53,7 @@ impl<S> RegionWorkerLoop<S> {
// Create a MitoRegion from the RegionMetadata.
let region = RegionOpener::new(
request.region_id,
region_id,
self.memtable_builder.clone(),
self.object_store.clone(),
)

View File

@@ -17,23 +17,28 @@
use std::sync::Arc;
use common_telemetry::info;
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::region::opener::RegionOpener;
use crate::request::OpenRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_open_request(&mut self, request: OpenRequest) -> Result<()> {
if self.regions.is_region_exists(request.region_id) {
pub(crate) async fn handle_open_request(
&mut self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<()> {
if self.regions.is_region_exists(region_id) {
return Ok(());
}
info!("Try to open region {}", request.region_id);
info!("Try to open region {}", region_id);
// Open region from specific region dir.
let region = RegionOpener::new(
request.region_id,
region_id,
self.memtable_builder.clone(),
self.object_store.clone(),
)
@@ -41,7 +46,7 @@ impl<S> RegionWorkerLoop<S> {
.open(&self.config)
.await?;
info!("Region {} is opened", request.region_id);
info!("Region {} is opened", region_id);
// Insert the MitoRegion into the RegionMap.
self.regions.insert_region(Arc::new(region));

View File

@@ -159,7 +159,10 @@ impl GrpcQueryHandler for DummyInstance {
ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {
let output = match request {
Request::Inserts(_) | Request::Delete(_) => unimplemented!(),
Request::Inserts(_)
| Request::Delete(_)
| Request::RowInserts(_)
| Request::RowDelete(_) => unimplemented!(),
Request::Query(query_request) => {
let query = query_request.query.unwrap();
match query {
@@ -194,8 +197,6 @@ impl GrpcQueryHandler for DummyInstance {
}
}
Request::Ddl(_) => unimplemented!(),
Request::RowInserts(_) => unimplemented!(),
Request::RowDelete(_) => unimplemented!(),
};
Ok(output)
}

View File

@@ -5,7 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
api.workspace = true
api = { workspace = true }
async-trait.workspace = true
bytes = "1.1"
common-base = { workspace = true }

View File

@@ -17,4 +17,7 @@
pub mod logstore;
pub mod manifest;
pub mod metadata;
pub mod region_engine;
pub mod region_request;
pub mod storage;

View File

@@ -0,0 +1,30 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::SemanticType;
use datatypes::schema::ColumnSchema;
use serde::{Deserialize, Serialize};
use crate::storage::ColumnId;
/// Metadata of a column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
/// Schema of this column. Is the same as `column_schema` in [SchemaRef].
pub column_schema: ColumnSchema,
/// Semantic type of this column (e.g. tag or timestamp).
pub semantic_type: SemanticType,
/// Immutable and unique id of a region.
pub column_id: ColumnId,
}

View File

@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Region Engine's definition
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
use crate::region_request::RegionRequest;
use crate::storage::RegionId;
#[async_trait]
pub trait RegionEngine {
/// Name of this engine
fn name(&self) -> &str;
async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError>;
}
pub type RegionEngineRef = Arc<dyn RegionEngine>;

View File

@@ -0,0 +1,97 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::Rows;
use crate::metadata::ColumnMetadata;
use crate::storage::{AlterRequest, ColumnId, ScanRequest};
#[derive(Debug)]
pub enum RegionRequest {
Write(RegionWriteRequest),
Read(RegionReadRequest),
Delete(RegionDeleteRequest),
Create(RegionCreateRequest),
Drop(RegionDropRequest),
Open(RegionOpenRequest),
Close(RegionCloseRequest),
Alter(RegionAlterRequest),
Flush(RegionFlushRequest),
Compact(RegionCompactRequest),
}
/// Request to write a region.
#[derive(Debug)]
pub struct RegionWriteRequest {
/// Rows to write.
pub rows: Rows,
}
#[derive(Debug)]
pub struct RegionReadRequest {
pub request: ScanRequest,
}
#[derive(Debug)]
pub struct RegionDeleteRequest {
/// Rows to write.
pub rows: Rows,
}
#[derive(Debug)]
pub struct RegionCreateRequest {
/// Region engine name
pub engine: String,
/// Columns in this region.
pub column_metadatas: Vec<ColumnMetadata>,
/// Columns in the primary key.
pub primary_key: Vec<ColumnId>,
/// Create region if not exists.
pub create_if_not_exists: bool,
/// Options of the created region.
pub options: HashMap<String, String>,
/// Directory for region's data home. Usually is composed by catalog and table id
pub region_dir: String,
}
#[derive(Debug)]
pub struct RegionDropRequest {}
/// Open region request.
#[derive(Debug)]
pub struct RegionOpenRequest {
/// Region engine name
pub engine: String,
/// Data directory of the region.
pub region_dir: String,
/// Options of the created region.
pub options: HashMap<String, String>,
}
/// Close region request.
#[derive(Debug)]
pub struct RegionCloseRequest {}
#[derive(Debug)]
pub struct RegionAlterRequest {
pub request: AlterRequest,
}
#[derive(Debug)]
pub struct RegionFlushRequest {}
#[derive(Debug)]
pub struct RegionCompactRequest {}