diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index b3953a2a1c..1aeacc9270 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -31,6 +31,7 @@ pub mod manifest; pub mod memtable; #[allow(dead_code)] pub mod metadata; +pub(crate) mod proto_util; pub mod read; #[allow(dead_code)] mod region; diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs new file mode 100644 index 0000000000..04bcedc8b6 --- /dev/null +++ b/src/mito2/src/proto_util.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities to process protobuf messages. + +use datatypes::prelude::ConcreteDataType; +use datatypes::types::{TimeType, TimestampType}; +use greptime_proto::v1::ColumnDataType; + +use crate::metadata::SemanticType; + +/// Returns true if the pb semantic type is valid. +pub(crate) fn check_semantic_type(type_value: i32, semantic_type: SemanticType) -> bool { + type_value == semantic_type as i32 +} + +/// Returns true if the pb type value is valid. +pub(crate) fn check_column_type(type_value: i32, expect_type: &ConcreteDataType) -> bool { + let Some(column_type) = ColumnDataType::from_i32(type_value) else { + return false; + }; + + is_column_type_eq(column_type, expect_type) +} + +/// Returns true if the column type is equal to exepcted type. +fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { + match (column_type, expect_type) { + (ColumnDataType::Boolean, ConcreteDataType::Boolean(_)) + | (ColumnDataType::Int8, ConcreteDataType::Int8(_)) + | (ColumnDataType::Int16, ConcreteDataType::Int16(_)) + | (ColumnDataType::Int32, ConcreteDataType::Int32(_)) + | (ColumnDataType::Int64, ConcreteDataType::Int64(_)) + | (ColumnDataType::Uint8, ConcreteDataType::UInt8(_)) + | (ColumnDataType::Uint16, ConcreteDataType::UInt16(_)) + | (ColumnDataType::Uint32, ConcreteDataType::UInt32(_)) + | (ColumnDataType::Uint64, ConcreteDataType::UInt64(_)) + | (ColumnDataType::Float32, ConcreteDataType::Float32(_)) + | (ColumnDataType::Float64, ConcreteDataType::Float64(_)) + | (ColumnDataType::Binary, ConcreteDataType::Binary(_)) + | (ColumnDataType::String, ConcreteDataType::String(_)) + | (ColumnDataType::Date, ConcreteDataType::Date(_)) + | (ColumnDataType::Datetime, ConcreteDataType::DateTime(_)) + | ( + ColumnDataType::TimestampSecond, + ConcreteDataType::Timestamp(TimestampType::Second(_)), + ) + | ( + ColumnDataType::TimestampMillisecond, + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)), + ) + | ( + ColumnDataType::TimestampMicrosecond, + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)), + ) + | ( + ColumnDataType::TimestampNanosecond, + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)), + ) + | (ColumnDataType::TimeSecond, ConcreteDataType::Time(TimeType::Second(_))) + | (ColumnDataType::TimeMillisecond, ConcreteDataType::Time(TimeType::Millisecond(_))) + | (ColumnDataType::TimeMicrosecond, ConcreteDataType::Time(TimeType::Microsecond(_))) + | (ColumnDataType::TimeNanosecond, ConcreteDataType::Time(TimeType::Nanosecond(_))) => true, + _ => false, + } +} diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index d1dfd29701..d80c06e711 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -14,16 +14,19 @@ //! Worker requests. +use std::collections::HashMap; use std::time::Duration; use common_base::readable_size::ReadableSize; -use greptime_proto::v1::Rows; +use greptime_proto::v1::{ColumnDataType, Rows}; +use snafu::ensure; use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; -use crate::error::Result; -use crate::metadata::ColumnMetadata; +use crate::error::{InvalidRequestSnafu, Result}; +use crate::metadata::{ColumnMetadata, RegionMetadata}; +use crate::proto_util::{check_column_type, check_semantic_type}; /// Options that affect the entire region. /// @@ -102,6 +105,75 @@ impl WriteRequest { // - checks rows don't have duplicate columns. unimplemented!() } + + /// Checks schema of rows. + pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> { + let region_id = self.region_id; + // Index all columns in rows. + let mut rows_columns: HashMap<_, _> = self + .rows + .schema + .iter() + .map(|column| (&column.column_name, column)) + .collect(); + + // Checks all columns in this region. + for column in &metadata.column_metadatas { + if let Some(input_col) = rows_columns.remove(&column.column_schema.name) { + // Check data type. + ensure!( + check_column_type(input_col.datatype, &column.column_schema.data_type), + InvalidRequestSnafu { + region_id, + reason: format!( + "Column {} expect type {:?}, given: {:?}({})", + column.column_schema.name, + column.column_schema.data_type, + ColumnDataType::from_i32(input_col.datatype), + input_col.datatype, + ) + } + ); + + // Check semantic type. + ensure!( + check_semantic_type(input_col.semantic_type, column.semantic_type), + InvalidRequestSnafu { + region_id, + reason: format!( + "Column {} has semantic type {:?}, given: {:?}({})", + column.column_schema.name, + column.semantic_type, + greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type), + input_col.semantic_type + ), + } + ); + } else { + // For columns not in rows, checks whether they are nullable. + ensure!( + column.column_schema.is_nullable() + || column.column_schema.default_constraint().is_some(), + InvalidRequestSnafu { + region_id, + reason: format!("Missing column {}", column.column_schema.name), + } + ); + } + } + + // Checks all columns in rows exist in the regino. + if !rows_columns.is_empty() { + let names: Vec<_> = rows_columns.into_keys().collect(); + return InvalidRequestSnafu { + region_id, + reason: format!("Unknown columns: {:?}", names), + } + .fail(); + } + + Ok(()) + } } /// Sender and write request. diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index e8bdb4f9fc..193e3c94eb 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -16,15 +16,10 @@ use std::collections::HashMap; -use datatypes::prelude::ConcreteDataType; -use datatypes::types::{TimeType, TimestampType}; use greptime_proto::v1::mito::Mutation; -use greptime_proto::v1::{ColumnDataType, Rows}; -use snafu::ensure; use tokio::sync::oneshot::Sender; -use crate::error::{InvalidRequestSnafu, RegionNotFoundSnafu, Result}; -use crate::metadata::SemanticType; +use crate::error::{RegionNotFoundSnafu, Result}; use crate::region::version::VersionRef; use crate::region::MitoRegionRef; use crate::request::SenderWriteRequest; @@ -59,17 +54,20 @@ impl RegionWorkerLoop { let region_ctx = region_ctxs.get_mut(®ion_id).unwrap(); // Checks request schema. - if let Err(e) = region_ctx.check_schema(&sender_req.request.rows) { + if let Err(e) = sender_req + .request + .check_schema(®ion_ctx.version.metadata) + { send_result(sender_req.sender, Err(e)); continue; } - // Push request. + // } // We need to check: // - region exists, if not, return error - // - check whether the schema is compatible with region schema + // - check whether the schema is compatible with region schema. We should fill default value at this time. // - collect rows by region // - get sequence for each row @@ -115,130 +113,6 @@ impl RegionWriteCtx { senders: Vec::new(), } } - - /// Checks schema of rows. - fn check_schema(&self, rows: &Rows) -> Result<()> { - let region_id = self.region.region_id; - // Index all columns in rows. - let mut rows_columns: HashMap<_, _> = rows - .schema - .iter() - .map(|column| (&column.column_name, column)) - .collect(); - - // Checks all columns in this region. - for column in &self.version.metadata.column_metadatas { - if let Some(input_col) = rows_columns.remove(&column.column_schema.name) { - // Check data type. - ensure!( - check_column_type(input_col.datatype, &column.column_schema.data_type), - InvalidRequestSnafu { - region_id, - reason: format!( - "Column {} expect type {:?}, given: {:?}({})", - column.column_schema.name, - column.column_schema.data_type, - ColumnDataType::from_i32(input_col.datatype), - input_col.datatype, - ) - } - ); - - // Check semantic type. - ensure!( - check_semantic_type(input_col.semantic_type, column.semantic_type), - InvalidRequestSnafu { - region_id, - reason: format!( - "Column {} has semantic type {:?}, given: {:?}({})", - column.column_schema.name, - column.semantic_type, - greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type), - input_col.semantic_type - ), - } - ); - } else { - // For columns not in rows, checks whether they are nullable. - ensure!( - column.column_schema.is_nullable() - || column.column_schema.default_constraint().is_some(), - InvalidRequestSnafu { - region_id, - reason: format!("Missing column {}", column.column_schema.name), - } - ); - } - } - - // Checks all columns in rows exist in the regino. - if !rows_columns.is_empty() { - let names: Vec<_> = rows_columns.into_keys().collect(); - return InvalidRequestSnafu { - region_id, - reason: format!("Unknown columns: {:?}", names), - } - .fail(); - } - - Ok(()) - } -} - -/// Returns true if the pb semantic type is valid. -fn check_semantic_type(type_value: i32, semantic_type: SemanticType) -> bool { - type_value == semantic_type as i32 -} - -/// Returns true if the pb type value is valid. -fn check_column_type(type_value: i32, expect_type: &ConcreteDataType) -> bool { - let Some(column_type) = ColumnDataType::from_i32(type_value) else { - return false; - }; - - is_column_type_eq(column_type, expect_type) -} - -/// Returns true if the column type is equal to exepcted type. -fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { - match (column_type, expect_type) { - (ColumnDataType::Boolean, ConcreteDataType::Boolean(_)) - | (ColumnDataType::Int8, ConcreteDataType::Int8(_)) - | (ColumnDataType::Int16, ConcreteDataType::Int16(_)) - | (ColumnDataType::Int32, ConcreteDataType::Int32(_)) - | (ColumnDataType::Int64, ConcreteDataType::Int64(_)) - | (ColumnDataType::Uint8, ConcreteDataType::UInt8(_)) - | (ColumnDataType::Uint16, ConcreteDataType::UInt16(_)) - | (ColumnDataType::Uint32, ConcreteDataType::UInt32(_)) - | (ColumnDataType::Uint64, ConcreteDataType::UInt64(_)) - | (ColumnDataType::Float32, ConcreteDataType::Float32(_)) - | (ColumnDataType::Float64, ConcreteDataType::Float64(_)) - | (ColumnDataType::Binary, ConcreteDataType::Binary(_)) - | (ColumnDataType::String, ConcreteDataType::String(_)) - | (ColumnDataType::Date, ConcreteDataType::Date(_)) - | (ColumnDataType::Datetime, ConcreteDataType::DateTime(_)) - | ( - ColumnDataType::TimestampSecond, - ConcreteDataType::Timestamp(TimestampType::Second(_)), - ) - | ( - ColumnDataType::TimestampMillisecond, - ConcreteDataType::Timestamp(TimestampType::Millisecond(_)), - ) - | ( - ColumnDataType::TimestampMicrosecond, - ConcreteDataType::Timestamp(TimestampType::Microsecond(_)), - ) - | ( - ColumnDataType::TimestampNanosecond, - ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)), - ) - | (ColumnDataType::TimeSecond, ConcreteDataType::Time(TimeType::Second(_))) - | (ColumnDataType::TimeMillisecond, ConcreteDataType::Time(TimeType::Millisecond(_))) - | (ColumnDataType::TimeMicrosecond, ConcreteDataType::Time(TimeType::Microsecond(_))) - | (ColumnDataType::TimeNanosecond, ConcreteDataType::Time(TimeType::Nanosecond(_))) => true, - _ => false, - } } // sender