refactor: move schema check to write request

This commit is contained in:
evenyag
2023-08-02 20:19:33 +08:00
parent da80423119
commit 4b0931aee0
4 changed files with 160 additions and 136 deletions

View File

@@ -31,6 +31,7 @@ pub mod manifest;
pub mod memtable;
#[allow(dead_code)]
pub mod metadata;
pub(crate) mod proto_util;
pub mod read;
#[allow(dead_code)]
mod region;

View File

@@ -0,0 +1,77 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to process protobuf messages.
use datatypes::prelude::ConcreteDataType;
use datatypes::types::{TimeType, TimestampType};
use greptime_proto::v1::ColumnDataType;
use crate::metadata::SemanticType;
/// Returns true if the pb semantic type is valid.
pub(crate) fn check_semantic_type(type_value: i32, semantic_type: SemanticType) -> bool {
type_value == semantic_type as i32
}
/// Returns true if the pb type value is valid.
pub(crate) fn check_column_type(type_value: i32, expect_type: &ConcreteDataType) -> bool {
let Some(column_type) = ColumnDataType::from_i32(type_value) else {
return false;
};
is_column_type_eq(column_type, expect_type)
}
/// Returns true if the column type is equal to exepcted type.
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
match (column_type, expect_type) {
(ColumnDataType::Boolean, ConcreteDataType::Boolean(_))
| (ColumnDataType::Int8, ConcreteDataType::Int8(_))
| (ColumnDataType::Int16, ConcreteDataType::Int16(_))
| (ColumnDataType::Int32, ConcreteDataType::Int32(_))
| (ColumnDataType::Int64, ConcreteDataType::Int64(_))
| (ColumnDataType::Uint8, ConcreteDataType::UInt8(_))
| (ColumnDataType::Uint16, ConcreteDataType::UInt16(_))
| (ColumnDataType::Uint32, ConcreteDataType::UInt32(_))
| (ColumnDataType::Uint64, ConcreteDataType::UInt64(_))
| (ColumnDataType::Float32, ConcreteDataType::Float32(_))
| (ColumnDataType::Float64, ConcreteDataType::Float64(_))
| (ColumnDataType::Binary, ConcreteDataType::Binary(_))
| (ColumnDataType::String, ConcreteDataType::String(_))
| (ColumnDataType::Date, ConcreteDataType::Date(_))
| (ColumnDataType::Datetime, ConcreteDataType::DateTime(_))
| (
ColumnDataType::TimestampSecond,
ConcreteDataType::Timestamp(TimestampType::Second(_)),
)
| (
ColumnDataType::TimestampMillisecond,
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)),
)
| (
ColumnDataType::TimestampMicrosecond,
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)),
)
| (
ColumnDataType::TimestampNanosecond,
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)),
)
| (ColumnDataType::TimeSecond, ConcreteDataType::Time(TimeType::Second(_)))
| (ColumnDataType::TimeMillisecond, ConcreteDataType::Time(TimeType::Millisecond(_)))
| (ColumnDataType::TimeMicrosecond, ConcreteDataType::Time(TimeType::Microsecond(_)))
| (ColumnDataType::TimeNanosecond, ConcreteDataType::Time(TimeType::Nanosecond(_))) => true,
_ => false,
}
}

View File

@@ -14,16 +14,19 @@
//! Worker requests.
use std::collections::HashMap;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use greptime_proto::v1::Rows;
use greptime_proto::v1::{ColumnDataType, Rows};
use snafu::ensure;
use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
use crate::error::Result;
use crate::metadata::ColumnMetadata;
use crate::error::{InvalidRequestSnafu, Result};
use crate::metadata::{ColumnMetadata, RegionMetadata};
use crate::proto_util::{check_column_type, check_semantic_type};
/// Options that affect the entire region.
///
@@ -102,6 +105,75 @@ impl WriteRequest {
// - checks rows don't have duplicate columns.
unimplemented!()
}
/// Checks schema of rows.
pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
let region_id = self.region_id;
// Index all columns in rows.
let mut rows_columns: HashMap<_, _> = self
.rows
.schema
.iter()
.map(|column| (&column.column_name, column))
.collect();
// Checks all columns in this region.
for column in &metadata.column_metadatas {
if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
// Check data type.
ensure!(
check_column_type(input_col.datatype, &column.column_schema.data_type),
InvalidRequestSnafu {
region_id,
reason: format!(
"Column {} expect type {:?}, given: {:?}({})",
column.column_schema.name,
column.column_schema.data_type,
ColumnDataType::from_i32(input_col.datatype),
input_col.datatype,
)
}
);
// Check semantic type.
ensure!(
check_semantic_type(input_col.semantic_type, column.semantic_type),
InvalidRequestSnafu {
region_id,
reason: format!(
"Column {} has semantic type {:?}, given: {:?}({})",
column.column_schema.name,
column.semantic_type,
greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type),
input_col.semantic_type
),
}
);
} else {
// For columns not in rows, checks whether they are nullable.
ensure!(
column.column_schema.is_nullable()
|| column.column_schema.default_constraint().is_some(),
InvalidRequestSnafu {
region_id,
reason: format!("Missing column {}", column.column_schema.name),
}
);
}
}
// Checks all columns in rows exist in the regino.
if !rows_columns.is_empty() {
let names: Vec<_> = rows_columns.into_keys().collect();
return InvalidRequestSnafu {
region_id,
reason: format!("Unknown columns: {:?}", names),
}
.fail();
}
Ok(())
}
}
/// Sender and write request.

View File

@@ -16,15 +16,10 @@
use std::collections::HashMap;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::{TimeType, TimestampType};
use greptime_proto::v1::mito::Mutation;
use greptime_proto::v1::{ColumnDataType, Rows};
use snafu::ensure;
use tokio::sync::oneshot::Sender;
use crate::error::{InvalidRequestSnafu, RegionNotFoundSnafu, Result};
use crate::metadata::SemanticType;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::region::version::VersionRef;
use crate::region::MitoRegionRef;
use crate::request::SenderWriteRequest;
@@ -59,17 +54,20 @@ impl<S> RegionWorkerLoop<S> {
let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
// Checks request schema.
if let Err(e) = region_ctx.check_schema(&sender_req.request.rows) {
if let Err(e) = sender_req
.request
.check_schema(&region_ctx.version.metadata)
{
send_result(sender_req.sender, Err(e));
continue;
}
// Push request.
//
}
// We need to check:
// - region exists, if not, return error
// - check whether the schema is compatible with region schema
// - check whether the schema is compatible with region schema. We should fill default value at this time.
// - collect rows by region
// - get sequence for each row
@@ -115,130 +113,6 @@ impl RegionWriteCtx {
senders: Vec::new(),
}
}
/// Checks schema of rows.
fn check_schema(&self, rows: &Rows) -> Result<()> {
let region_id = self.region.region_id;
// Index all columns in rows.
let mut rows_columns: HashMap<_, _> = rows
.schema
.iter()
.map(|column| (&column.column_name, column))
.collect();
// Checks all columns in this region.
for column in &self.version.metadata.column_metadatas {
if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
// Check data type.
ensure!(
check_column_type(input_col.datatype, &column.column_schema.data_type),
InvalidRequestSnafu {
region_id,
reason: format!(
"Column {} expect type {:?}, given: {:?}({})",
column.column_schema.name,
column.column_schema.data_type,
ColumnDataType::from_i32(input_col.datatype),
input_col.datatype,
)
}
);
// Check semantic type.
ensure!(
check_semantic_type(input_col.semantic_type, column.semantic_type),
InvalidRequestSnafu {
region_id,
reason: format!(
"Column {} has semantic type {:?}, given: {:?}({})",
column.column_schema.name,
column.semantic_type,
greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type),
input_col.semantic_type
),
}
);
} else {
// For columns not in rows, checks whether they are nullable.
ensure!(
column.column_schema.is_nullable()
|| column.column_schema.default_constraint().is_some(),
InvalidRequestSnafu {
region_id,
reason: format!("Missing column {}", column.column_schema.name),
}
);
}
}
// Checks all columns in rows exist in the regino.
if !rows_columns.is_empty() {
let names: Vec<_> = rows_columns.into_keys().collect();
return InvalidRequestSnafu {
region_id,
reason: format!("Unknown columns: {:?}", names),
}
.fail();
}
Ok(())
}
}
/// Returns true if the pb semantic type is valid.
fn check_semantic_type(type_value: i32, semantic_type: SemanticType) -> bool {
type_value == semantic_type as i32
}
/// Returns true if the pb type value is valid.
fn check_column_type(type_value: i32, expect_type: &ConcreteDataType) -> bool {
let Some(column_type) = ColumnDataType::from_i32(type_value) else {
return false;
};
is_column_type_eq(column_type, expect_type)
}
/// Returns true if the column type is equal to exepcted type.
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
match (column_type, expect_type) {
(ColumnDataType::Boolean, ConcreteDataType::Boolean(_))
| (ColumnDataType::Int8, ConcreteDataType::Int8(_))
| (ColumnDataType::Int16, ConcreteDataType::Int16(_))
| (ColumnDataType::Int32, ConcreteDataType::Int32(_))
| (ColumnDataType::Int64, ConcreteDataType::Int64(_))
| (ColumnDataType::Uint8, ConcreteDataType::UInt8(_))
| (ColumnDataType::Uint16, ConcreteDataType::UInt16(_))
| (ColumnDataType::Uint32, ConcreteDataType::UInt32(_))
| (ColumnDataType::Uint64, ConcreteDataType::UInt64(_))
| (ColumnDataType::Float32, ConcreteDataType::Float32(_))
| (ColumnDataType::Float64, ConcreteDataType::Float64(_))
| (ColumnDataType::Binary, ConcreteDataType::Binary(_))
| (ColumnDataType::String, ConcreteDataType::String(_))
| (ColumnDataType::Date, ConcreteDataType::Date(_))
| (ColumnDataType::Datetime, ConcreteDataType::DateTime(_))
| (
ColumnDataType::TimestampSecond,
ConcreteDataType::Timestamp(TimestampType::Second(_)),
)
| (
ColumnDataType::TimestampMillisecond,
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)),
)
| (
ColumnDataType::TimestampMicrosecond,
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)),
)
| (
ColumnDataType::TimestampNanosecond,
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)),
)
| (ColumnDataType::TimeSecond, ConcreteDataType::Time(TimeType::Second(_)))
| (ColumnDataType::TimeMillisecond, ConcreteDataType::Time(TimeType::Millisecond(_)))
| (ColumnDataType::TimeMicrosecond, ConcreteDataType::Time(TimeType::Microsecond(_)))
| (ColumnDataType::TimeNanosecond, ConcreteDataType::Time(TimeType::Nanosecond(_))) => true,
_ => false,
}
}
// sender