feat(mito): preparation to implementing write (#2085)

* refactor: move request mod

* feat: add mutation

* feat: add handle_write mod

* feat: one mutation at a time

* feat: handle write requests

* feat: validate schema

* refactor: move schema check to write request

* feat: add convert value

* feat: fill default values

* chore: remove comments

* feat: remove code

* feat: remove code

* feat: buf requests

* style: fix clippy

* refactor: rename check functions

* chore: fix compile error

* chore: Revert "feat: remove code"

This reverts commit 6516597540.

* chore: Revert "feat: remove code"

This reverts commit 5f2b790a01.

* chore: upgrade greptime-proto

* chore: Update comment

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
Yingwen
2023-08-04 11:53:02 +09:00
committed by GitHub
parent ac81d3c74f
commit e5663a075f
12 changed files with 620 additions and 54 deletions

19
Cargo.lock generated
View File

@@ -211,7 +211,7 @@ dependencies = [
"common-error",
"common-time",
"datatypes",
"greptime-proto",
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=eeae2d0dfa8ee320a7b9e987b4631a6c1c732ebd)",
"prost",
"snafu",
"tonic 0.9.2",
@@ -4111,6 +4111,18 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec4b84931378004db60d168e2604bc3fb9735e9c#ec4b84931378004db60d168e2604bc3fb9735e9c"
dependencies = [
"prost",
"serde",
"serde_json",
"tonic 0.9.2",
"tonic-build",
]
[[package]]
name = "greptime-proto"
version = "0.1.0"
@@ -5498,6 +5510,7 @@ dependencies = [
"datafusion-common",
"datatypes",
"futures",
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec4b84931378004db60d168e2604bc3fb9735e9c)",
"lazy_static",
"log-store",
"metrics",
@@ -6982,7 +6995,7 @@ dependencies = [
"datafusion",
"datatypes",
"futures",
"greptime-proto",
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=eeae2d0dfa8ee320a7b9e987b4631a6c1c732ebd)",
"promql-parser",
"prost",
"query",
@@ -7252,7 +7265,7 @@ dependencies = [
"format_num",
"futures",
"futures-util",
"greptime-proto",
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=eeae2d0dfa8ee320a7b9e987b4631a6c1c732ebd)",
"humantime",
"metrics",
"num",

View File

@@ -32,6 +32,8 @@ datafusion.workspace = true
datafusion-common.workspace = true
datatypes = { path = "../datatypes" }
futures.workspace = true
# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged.
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec4b84931378004db60d168e2604bc3fb9735e9c" }
lazy_static = "1.4"
log-store = { path = "../log-store" }
metrics.workspace = true

View File

@@ -26,8 +26,9 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
pub use crate::request::CreateRequest;
use crate::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody};
use crate::request::{
CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest,
};
use crate::worker::WorkerGroup;
/// Region engine implementation for timeseries data.
@@ -84,6 +85,19 @@ impl MitoEngine {
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}
/// Write to a region.
pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
write_request.validate()?;
// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.
self.inner
.handle_request_body(RequestBody::Write(write_request))
.await
}
}
/// Inner struct of [MitoEngine].

View File

@@ -174,6 +174,37 @@ pub enum Error {
reason: String,
location: Location,
},
#[snafu(display("Invalid request to region {}, reason: {}", region_id, reason))]
InvalidRequest {
region_id: RegionId,
reason: String,
location: Location,
},
/// An error type to indicate that schema is changed and we need
/// to fill default values again.
#[snafu(display(
"Need to fill default value to column {} of region {}",
column,
region_id
))]
FillDefault {
region_id: RegionId,
column: String,
// The error is for retry purpose so we don't need a location.
},
#[snafu(display(
"Failed to create default value for column {} of region {}",
column,
region_id
))]
CreateDefault {
region_id: RegionId,
column: String,
source: datatypes::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -193,10 +224,13 @@ impl ErrorExt for Error {
| RegionExists { .. }
| NewRecordBatch { .. }
| RegionNotFound { .. }
| RegionCorrupted { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => {
StatusCode::InvalidArguments
}
| RegionCorrupted { .. }
| CreateDefault { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. }
| InvalidMeta { .. }
| InvalidSchema { .. }
| InvalidRequest { .. }
| FillDefault { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}

View File

@@ -31,6 +31,7 @@ pub mod manifest;
pub mod memtable;
#[allow(dead_code)]
pub mod metadata;
pub(crate) mod proto_util;
pub mod read;
#[allow(dead_code)]
mod region;

View File

@@ -310,14 +310,14 @@ impl ColumnMetadata {
}
/// The semantic type of one column
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum SemanticType {
/// Tag column, also is a part of primary key.
Tag,
Tag = 0,
/// A column that isn't a time index or part of primary key.
Field,
Field = 1,
/// Time index column.
Timestamp,
Timestamp = 2,
}
/// Fields skipped in serialization.

188
src/mito2/src/proto_util.rs Normal file
View File

@@ -0,0 +1,188 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to process protobuf messages.
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::{TimeType, TimestampType};
use datatypes::value::Value;
use greptime_proto::v1::{self, ColumnDataType};
use store_api::storage::OpType;
use crate::metadata::SemanticType;
/// Returns true if the pb semantic type is valid.
pub(crate) fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool {
type_value == semantic_type as i32
}
/// Returns true if the pb type value is valid.
pub(crate) fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool {
let Some(column_type) = ColumnDataType::from_i32(type_value) else {
return false;
};
is_column_type_eq(column_type, expect_type)
}
/// Convert value into proto's value.
pub(crate) fn to_proto_value(value: Value) -> Option<v1::Value> {
let proto_value = match value {
Value::Null => v1::Value { value: None },
Value::Boolean(v) => v1::Value {
value: Some(v1::value::Value::BoolValue(v)),
},
Value::UInt8(v) => v1::Value {
value: Some(v1::value::Value::U8Value(v.into())),
},
Value::UInt16(v) => v1::Value {
value: Some(v1::value::Value::U16Value(v.into())),
},
Value::UInt32(v) => v1::Value {
value: Some(v1::value::Value::U32Value(v)),
},
Value::UInt64(v) => v1::Value {
value: Some(v1::value::Value::U64Value(v)),
},
Value::Int8(v) => v1::Value {
value: Some(v1::value::Value::I8Value(v.into())),
},
Value::Int16(v) => v1::Value {
value: Some(v1::value::Value::I16Value(v.into())),
},
Value::Int32(v) => v1::Value {
value: Some(v1::value::Value::I32Value(v)),
},
Value::Int64(v) => v1::Value {
value: Some(v1::value::Value::I64Value(v)),
},
Value::Float32(v) => v1::Value {
value: Some(v1::value::Value::F32Value(*v)),
},
Value::Float64(v) => v1::Value {
value: Some(v1::value::Value::F64Value(*v)),
},
Value::String(v) => v1::Value {
value: Some(v1::value::Value::StringValue(v.as_utf8().to_string())),
},
Value::Binary(v) => v1::Value {
value: Some(v1::value::Value::BinaryValue(v.to_vec())),
},
Value::Date(v) => v1::Value {
value: Some(v1::value::Value::DateValue(v.val())),
},
Value::DateTime(v) => v1::Value {
value: Some(v1::value::Value::DatetimeValue(v.val())),
},
Value::Timestamp(v) => match v.unit() {
TimeUnit::Second => v1::Value {
value: Some(v1::value::Value::TsSecondValue(v.value())),
},
TimeUnit::Millisecond => v1::Value {
value: Some(v1::value::Value::TsMillisecondValue(v.value())),
},
TimeUnit::Microsecond => v1::Value {
value: Some(v1::value::Value::TsMicrosecondValue(v.value())),
},
TimeUnit::Nanosecond => v1::Value {
value: Some(v1::value::Value::TsNanosecondValue(v.value())),
},
},
Value::Time(v) => match v.unit() {
TimeUnit::Second => v1::Value {
value: Some(v1::value::Value::TimeSecondValue(v.value())),
},
TimeUnit::Millisecond => v1::Value {
value: Some(v1::value::Value::TimeMillisecondValue(v.value())),
},
TimeUnit::Microsecond => v1::Value {
value: Some(v1::value::Value::TimeMicrosecondValue(v.value())),
},
TimeUnit::Nanosecond => v1::Value {
value: Some(v1::value::Value::TimeNanosecondValue(v.value())),
},
},
Value::Interval(_) | Value::List(_) => return None,
};
Some(proto_value)
}
/// Convert [ConcreteDataType] to [ColumnDataType].
pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> {
let column_data_type = match data_type {
ConcreteDataType::Boolean(_) => ColumnDataType::Boolean,
ConcreteDataType::Int8(_) => ColumnDataType::Int8,
ConcreteDataType::Int16(_) => ColumnDataType::Int16,
ConcreteDataType::Int32(_) => ColumnDataType::Int32,
ConcreteDataType::Int64(_) => ColumnDataType::Int64,
ConcreteDataType::UInt8(_) => ColumnDataType::Uint8,
ConcreteDataType::UInt16(_) => ColumnDataType::Uint16,
ConcreteDataType::UInt32(_) => ColumnDataType::Uint32,
ConcreteDataType::UInt64(_) => ColumnDataType::Uint64,
ConcreteDataType::Float32(_) => ColumnDataType::Float32,
ConcreteDataType::Float64(_) => ColumnDataType::Float64,
ConcreteDataType::Binary(_) => ColumnDataType::Binary,
ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond,
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
ColumnDataType::TimestampMillisecond
}
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
ColumnDataType::TimestampMicrosecond
}
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
ColumnDataType::TimestampNanosecond
}
ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond,
ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond,
ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond,
ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond,
ConcreteDataType::Null(_)
| ConcreteDataType::Interval(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) => return None,
};
Some(column_data_type)
}
/// Convert semantic type to proto's semantic type
pub(crate) fn to_proto_semantic_type(semantic_type: SemanticType) -> v1::SemanticType {
match semantic_type {
SemanticType::Tag => v1::SemanticType::Tag,
SemanticType::Field => v1::SemanticType::Field,
SemanticType::Timestamp => v1::SemanticType::Timestamp,
}
}
/// Convert op type to proto's op type.
pub(crate) fn to_proto_op_type(op_type: OpType) -> v1::mito::OpType {
match op_type {
OpType::Delete => v1::mito::OpType::Delete,
OpType::Put => v1::mito::OpType::Put,
}
}
/// Returns true if the column type is equal to expected type.
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
if let Some(expect) = to_column_data_type(expect_type) {
column_type == expect
} else {
false
}
}

View File

@@ -15,7 +15,7 @@
//! Mito region.
pub(crate) mod opener;
mod version;
pub(crate) mod version;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::VersionControlRef;
use crate::region::version::{VersionControlRef, VersionRef};
/// Type to store region version.
pub type VersionNumber = u32;
@@ -56,6 +56,11 @@ impl MitoRegion {
Ok(())
}
/// Returns current version of the region.
pub(crate) fn version(&self) -> VersionRef {
self.version_control.current()
}
}
/// Regions indexed by ids.

View File

@@ -26,7 +26,6 @@
use std::sync::Arc;
use arc_swap::ArcSwap;
use store_api::manifest::ManifestVersion;
use store_api::storage::SequenceNumber;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
@@ -48,6 +47,11 @@ impl VersionControl {
version: ArcSwap::new(Arc::new(version)),
}
}
/// Returns current [Version].
pub(crate) fn current(&self) -> VersionRef {
self.version.load_full()
}
}
pub(crate) type VersionControlRef = Arc<VersionControl>;
@@ -59,21 +63,20 @@ pub(crate) struct Version {
///
/// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
/// metadata and reuse metadata when creating a new `Version`.
metadata: RegionMetadataRef,
pub(crate) metadata: RegionMetadataRef,
/// Mutable and immutable memtables.
///
/// Wrapped in Arc to make clone of `Version` much cheaper.
memtables: MemtableVersionRef,
pub(crate) memtables: MemtableVersionRef,
/// SSTs of the region.
ssts: SstVersionRef,
pub(crate) ssts: SstVersionRef,
/// Inclusive max sequence of flushed data.
flushed_sequence: SequenceNumber,
// TODO(yingwen): Remove this.
/// Current version of region manifest.
manifest_version: ManifestVersion,
pub(crate) flushed_sequence: SequenceNumber,
// TODO(yingwen): RegionOptions.
}
pub(crate) type VersionRef = Arc<Version>;
/// Version builder.
pub(crate) struct VersionBuilder {
metadata: RegionMetadataRef,
@@ -94,7 +97,6 @@ impl VersionBuilder {
memtables: Arc::new(MemtableVersion::new(self.mutable)),
ssts: Arc::new(SstVersion::new()),
flushed_sequence: 0,
manifest_version: 0,
}
}
}

View File

@@ -14,15 +14,22 @@
//! Worker requests.
use std::collections::HashMap;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use store_api::storage::{ColumnId, CompactionStrategy, RegionId};
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
use crate::error::Result;
use crate::metadata::ColumnMetadata;
use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result};
use crate::metadata::{ColumnMetadata, RegionMetadata};
use crate::proto_util::{
is_column_type_value_eq, is_semantic_type_eq, to_column_data_type, to_proto_semantic_type,
to_proto_value,
};
/// Options that affect the entire region.
///
@@ -84,9 +91,193 @@ pub struct CloseRequest {
/// Request to write a region.
#[derive(Debug)]
pub(crate) struct WriteRequest {
pub struct WriteRequest {
/// Region to write.
pub region_id: RegionId,
/// Type of the write request.
pub op_type: OpType,
/// Rows to write.
pub rows: Rows,
/// Map column name to column index in `rows`.
name_to_index: HashMap<String, usize>,
}
impl WriteRequest {
/// Returns a new request.
pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> WriteRequest {
let name_to_index = rows
.schema
.iter()
.enumerate()
.map(|(index, column)| (column.column_name.clone(), index))
.collect();
WriteRequest {
region_id,
op_type,
rows,
name_to_index,
}
}
/// Validate the request.
pub(crate) fn validate(&self) -> Result<()> {
// - checks whether the request is too large.
// - checks whether each row in rows has the same schema.
// - checks whether each column match the schema in Rows.
// - checks rows don't have duplicate columns.
unimplemented!()
}
/// Checks schema of rows.
///
/// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
/// error.
pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
let region_id = self.region_id;
// Index all columns in rows.
let mut rows_columns: HashMap<_, _> = self
.rows
.schema
.iter()
.map(|column| (&column.column_name, column))
.collect();
// Checks all columns in this region.
for column in &metadata.column_metadatas {
if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
// Check data type.
ensure!(
is_column_type_value_eq(input_col.datatype, &column.column_schema.data_type),
InvalidRequestSnafu {
region_id,
reason: format!(
"column {} expect type {:?}, given: {:?}({})",
column.column_schema.name,
column.column_schema.data_type,
ColumnDataType::from_i32(input_col.datatype),
input_col.datatype,
)
}
);
// Check semantic type.
ensure!(
is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
InvalidRequestSnafu {
region_id,
reason: format!(
"column {} has semantic type {:?}, given: {:?}({})",
column.column_schema.name,
column.semantic_type,
greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type),
input_col.semantic_type
),
}
);
} else {
// For columns not in rows, checks whether they have default value.
ensure!(
column.column_schema.is_nullable()
|| column.column_schema.default_constraint().is_some(),
InvalidRequestSnafu {
region_id,
reason: format!("missing column {}", column.column_schema.name),
}
);
return FillDefaultSnafu {
region_id,
column: &column.column_schema.name,
}
.fail();
}
}
// Checks all columns in rows exist in the region.
if !rows_columns.is_empty() {
let names: Vec<_> = rows_columns.into_keys().collect();
return InvalidRequestSnafu {
region_id,
reason: format!("unknown columns: {:?}", names),
}
.fail();
}
Ok(())
}
/// Try to fill missing columns.
///
/// Currently, our protobuf format might be inefficient when we need to fill lots of null
/// values.
pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
for column in &metadata.column_metadatas {
if !self.name_to_index.contains_key(&column.column_schema.name) {
self.fill_column(metadata.region_id, column)?;
}
}
Ok(())
}
/// Fill default value for specific `column`.
fn fill_column(&mut self, region_id: RegionId, column: &ColumnMetadata) -> Result<()> {
// Need to add a default value for this column.
let default_value = column
.column_schema
.create_default()
.context(CreateDefaultSnafu {
region_id,
column: &column.column_schema.name,
})?
// This column doesn't have default value.
.with_context(|| InvalidRequestSnafu {
region_id,
reason: format!(
"column {} does not have default value",
column.column_schema.name
),
})?;
// Convert default value into proto's value.
let proto_value = to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
region_id,
reason: format!(
"no protobuf type for default value of column {} ({:?})",
column.column_schema.name, column.column_schema.data_type
),
})?;
// Insert default value to each row.
for row in &mut self.rows.rows {
row.values.push(proto_value.clone());
}
// Insert column schema.
let datatype = to_column_data_type(&column.column_schema.data_type).with_context(|| {
InvalidRequestSnafu {
region_id,
reason: format!(
"no protobuf type for column {} ({:?})",
column.column_schema.name, column.column_schema.data_type
),
}
})?;
self.rows.schema.push(ColumnSchema {
column_name: column.column_schema.name.clone(),
datatype: datatype as i32,
semantic_type: to_proto_semantic_type(column.semantic_type) as i32,
});
Ok(())
}
}
/// Sender and write request.
pub(crate) struct SenderWriteRequest {
/// Result sender.
pub(crate) sender: Option<Sender<Result<()>>>,
pub(crate) request: WriteRequest,
}
/// Request sent to a worker
@@ -127,7 +318,6 @@ impl RegionRequest {
/// Body to carry actual region request.
#[derive(Debug)]
pub(crate) enum RequestBody {
// DML:
/// Write to a region.
Write(WriteRequest),
@@ -151,13 +341,19 @@ impl RequestBody {
}
}
/// Returns whether the request is a DDL (e.g. CREATE/OPEN/ALTER).
pub(crate) fn is_ddl(&self) -> bool {
/// Returns whether the request is a write request.
pub(crate) fn is_write(&self) -> bool {
matches!(self, RequestBody::Write(_))
}
/// Converts the request into a [WriteRequest].
///
/// # Panics
/// Panics if it isn't a [WriteRequest].
pub(crate) fn into_write_request(self) -> WriteRequest {
match self {
RequestBody::Write(_) => false,
RequestBody::Create(_) => true,
RequestBody::Open(_) => true,
RequestBody::Close(_) => true,
RequestBody::Write(req) => req,
other => panic!("expect write request, found {other:?}"),
}
}
}

View File

@@ -17,6 +17,7 @@
mod handle_close;
mod handle_create;
mod handle_open;
mod handle_write;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
@@ -37,7 +38,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::region::{RegionMap, RegionMapRef};
use crate::request::{RegionRequest, RequestBody, WorkerRequest};
use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest};
/// Identifier for a worker.
pub(crate) type WorkerId = u32;
@@ -322,15 +323,18 @@ impl<S> RegionWorkerLoop<S> {
///
/// `buffer` should be empty.
async fn handle_requests(&mut self, buffer: &mut RequestBuffer) {
let mut dml_requests = Vec::with_capacity(buffer.len());
let mut write_requests = Vec::with_capacity(buffer.len());
let mut ddl_requests = Vec::with_capacity(buffer.len());
for worker_req in buffer.drain(..) {
match worker_req {
WorkerRequest::Region(req) => {
if req.body.is_ddl() {
ddl_requests.push(req);
if req.body.is_write() {
write_requests.push(SenderWriteRequest {
sender: req.sender,
request: req.body.into_write_request(),
});
} else {
dml_requests.push(req);
ddl_requests.push(req);
}
}
// We receive a stop signal, but we still want to process remaining
@@ -342,24 +346,13 @@ impl<S> RegionWorkerLoop<S> {
}
}
// Handles all dml requests first. So we can alter regions without
// considering existing dml requests.
self.handle_dml_requests(dml_requests).await;
// Handles all write requests first. So we can alter regions without
// considering existing write requests.
self.handle_write_requests(write_requests).await;
self.handle_ddl_requests(ddl_requests).await;
}
/// Takes and handles all dml requests.
async fn handle_dml_requests(&mut self, write_requests: Vec<RegionRequest>) {
if write_requests.is_empty() {
return;
}
// Create a write context that holds meta and sequence.
unimplemented!()
}
/// Takes and handles all ddl requests.
async fn handle_ddl_requests(&mut self, ddl_requests: Vec<RegionRequest>) {
if ddl_requests.is_empty() {

View File

@@ -0,0 +1,118 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handling write requests.
use std::collections::{hash_map, HashMap};
use greptime_proto::v1::mito::Mutation;
use tokio::sync::oneshot::Sender;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::proto_util::to_proto_op_type;
use crate::region::version::VersionRef;
use crate::region::MitoRegionRef;
use crate::request::SenderWriteRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
/// Takes and handles all write requests.
pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec<SenderWriteRequest>) {
if write_requests.is_empty() {
return;
}
let mut region_ctxs = HashMap::new();
for sender_req in write_requests {
let region_id = sender_req.request.region_id;
// Checks whether the region exists.
if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
let Some(region) = self.regions.get_region(region_id) else {
// No such region.
send_result(sender_req.sender, RegionNotFoundSnafu {
region_id,
}.fail());
continue;
};
// Initialize the context.
e.insert(RegionWriteCtx::new(region));
}
// Safety: Now we ensure the region exists.
let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
// Checks whether request schema is compatible with region schema.
if let Err(e) = sender_req
.request
.check_schema(&region_ctx.version.metadata)
{
send_result(sender_req.sender, Err(e));
continue;
}
// Collect requests by region.
region_ctx.push_sender_request(sender_req);
}
todo!()
}
}
/// Send result to the request.
fn send_result(sender: Option<Sender<Result<()>>>, res: Result<()>) {
if let Some(sender) = sender {
// Ignore send result.
let _ = sender.send(res);
}
}
/// Context to keep region metadata and buffer write requests.
struct RegionWriteCtx {
/// Region to write.
region: MitoRegionRef,
/// Version of the region while creating the context.
version: VersionRef,
/// Valid mutations.
mutations: Vec<Mutation>,
/// Result senders.
///
/// The sender is 1:1 map to the mutation in `mutations`.
senders: Vec<Option<Sender<Result<()>>>>,
}
impl RegionWriteCtx {
/// Returns an empty context.
fn new(region: MitoRegionRef) -> RegionWriteCtx {
let version = region.version();
RegionWriteCtx {
region,
version,
mutations: Vec::new(),
senders: Vec::new(),
}
}
/// Push [SenderWriteRequest] to the context.
fn push_sender_request(&mut self, sender_req: SenderWriteRequest) {
self.mutations.push(Mutation {
op_type: to_proto_op_type(sender_req.request.op_type) as i32,
sequence: 0, // TODO(yingwen): Set sequence.
rows: Some(sender_req.request.rows),
});
self.senders.push(sender_req.sender);
}
}