feat: validate schema

This commit is contained in:
evenyag
2023-08-02 19:34:09 +08:00
parent a737c3d12a
commit da80423119
10 changed files with 321 additions and 45 deletions

4
Cargo.lock generated
View File

@@ -4126,7 +4126,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5e422e518b6755c46ec8e6f26233c433ff016558#5e422e518b6755c46ec8e6f26233c433ff016558"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7855e1fcc806a795e5e714297fa65083b1110dce#7855e1fcc806a795e5e714297fa65083b1110dce"
dependencies = [
"prost",
"serde",
@@ -5510,7 +5510,7 @@ dependencies = [
"datafusion-common",
"datatypes",
"futures",
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5e422e518b6755c46ec8e6f26233c433ff016558)",
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7855e1fcc806a795e5e714297fa65083b1110dce)",
"lazy_static",
"log-store",
"metrics",

View File

@@ -32,8 +32,8 @@ datafusion.workspace = true
datafusion-common.workspace = true
datatypes = { path = "../datatypes" }
futures.workspace = true
# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/72 is merged.
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5e422e518b6755c46ec8e6f26233c433ff016558" }
# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged.
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7855e1fcc806a795e5e714297fa65083b1110dce" }
lazy_static = "1.4"
log-store = { path = "../log-store" }
metrics.workspace = true

View File

@@ -26,7 +26,9 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::request::{CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest};
use crate::request::{
CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest,
};
use crate::worker::WorkerGroup;
/// Region engine implementation for timeseries data.
@@ -88,7 +90,8 @@ impl MitoEngine {
pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
write_request.validate()?;
self.inner.handle_request_body(RequestBody::Write(write_request))
self.inner
.handle_request_body(RequestBody::Write(write_request))
.await
}
}

View File

@@ -174,6 +174,13 @@ pub enum Error {
reason: String,
location: Location,
},
#[snafu(display("Invalid request to region {}, reason: {}", region_id, reason))]
InvalidRequest {
region_id: RegionId,
reason: String,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -194,9 +201,10 @@ impl ErrorExt for Error {
| NewRecordBatch { .. }
| RegionNotFound { .. }
| RegionCorrupted { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => {
StatusCode::InvalidArguments
}
InvalidScanIndex { .. }
| InvalidMeta { .. }
| InvalidSchema { .. }
| InvalidRequest { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}

View File

@@ -221,6 +221,50 @@ impl RegionMetadata {
}
}
/// Fields skipped in serialization.
struct SkippedFields {
/// Last schema.
schema: SchemaRef,
/// Id of the time index column.
time_index: ColumnId,
/// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
id_to_index: HashMap<ColumnId, usize>,
}
impl SkippedFields {
/// Constructs skipped fields from `column_metadatas`.
fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
let column_schemas = column_metadatas
.iter()
.map(|column_metadata| column_metadata.column_schema.clone())
.collect();
let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
let time_index = column_metadatas
.iter()
.find_map(|col| {
if col.semantic_type == SemanticType::Timestamp {
Some(col.column_id)
} else {
None
}
})
.context(InvalidMetaSnafu {
reason: "time index not found",
})?;
let id_to_index = column_metadatas
.iter()
.enumerate()
.map(|(idx, col)| (col.column_id, idx))
.collect();
Ok(SkippedFields {
schema,
time_index,
id_to_index,
})
}
}
/// Builder to build [RegionMetadata].
pub struct RegionMetadataBuilder {
region_id: RegionId,
@@ -310,14 +354,14 @@ impl ColumnMetadata {
}
/// The semantic type of one column
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum SemanticType {
/// Tag column, also is a part of primary key.
Tag,
Tag = 0,
/// A column that isn't a time index or part of primary key.
Field,
Field = 1,
/// Time index column.
Timestamp,
Timestamp = 2,
}
/// Fields skipped in serialization.

View File

@@ -15,7 +15,7 @@
//! Mito region.
pub(crate) mod opener;
mod version;
pub(crate) mod version;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::VersionControlRef;
use crate::region::version::{VersionControlRef, VersionRef};
/// Type to store region version.
pub type VersionNumber = u32;
@@ -56,6 +56,11 @@ impl MitoRegion {
Ok(())
}
/// Returns current version of the region.
pub(crate) fn version(&self) -> VersionRef {
self.version_control.current()
}
}
/// Regions indexed by ids.

View File

@@ -26,7 +26,6 @@
use std::sync::Arc;
use arc_swap::ArcSwap;
use store_api::manifest::ManifestVersion;
use store_api::storage::SequenceNumber;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
@@ -48,6 +47,11 @@ impl VersionControl {
version: ArcSwap::new(Arc::new(version)),
}
}
/// Returns current [Version].
pub(crate) fn current(&self) -> VersionRef {
self.version.load_full()
}
}
pub(crate) type VersionControlRef = Arc<VersionControl>;
@@ -59,21 +63,20 @@ pub(crate) struct Version {
///
/// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
/// metadata and reuse metadata when creating a new `Version`.
metadata: RegionMetadataRef,
pub(crate) metadata: RegionMetadataRef,
/// Mutable and immutable memtables.
///
/// Wrapped in Arc to make clone of `Version` much cheaper.
memtables: MemtableVersionRef,
pub(crate) memtables: MemtableVersionRef,
/// SSTs of the region.
ssts: SstVersionRef,
pub(crate) ssts: SstVersionRef,
/// Inclusive max sequence of flushed data.
flushed_sequence: SequenceNumber,
// TODO(yingwen): Remove this.
/// Current version of region manifest.
manifest_version: ManifestVersion,
pub(crate) flushed_sequence: SequenceNumber,
// TODO(yingwen): RegionOptions.
}
pub(crate) type VersionRef = Arc<Version>;
/// Version builder.
pub(crate) struct VersionBuilder {
metadata: RegionMetadataRef,
@@ -94,7 +97,6 @@ impl VersionBuilder {
memtables: Arc::new(MemtableVersion::new(self.mutable)),
ssts: Arc::new(SstVersion::new()),
flushed_sequence: 0,
manifest_version: 0,
}
}
}

View File

@@ -83,33 +83,34 @@ pub struct CloseRequest {
pub region_id: RegionId,
}
/// Mutation to apply to a set of rows.
#[derive(Debug)]
pub struct Mutation {
/// Type of the mutation.
pub op_type: OpType,
/// Rows to write.
pub rows: Rows,
}
/// Request to write a region.
#[derive(Debug)]
pub struct WriteRequest {
/// Region to write.
pub region_id: RegionId,
/// Mutation to the region.
pub mutation: Mutation,
/// Type of the write request.
pub op_type: OpType,
/// Rows to write.
pub rows: Rows,
}
impl WriteRequest {
/// Validate the request.
pub(crate) fn validate(&self) -> Result<()> {
// 1. checks whether the request is too large.
// 2. checks whether each row in rows has the same schema.
// - checks whether the request is too large.
// - checks whether each row in rows has the same schema.
// - checks rows don't have duplicate columns.
unimplemented!()
}
}
/// Sender and write request.
pub(crate) struct SenderWriteRequest {
/// Result sender.
pub(crate) sender: Option<Sender<Result<()>>>,
pub(crate) request: WriteRequest,
}
/// Request sent to a worker
pub(crate) enum WorkerRequest {
/// Region request.
@@ -175,4 +176,15 @@ impl RequestBody {
pub(crate) fn is_write(&self) -> bool {
matches!(self, RequestBody::Write(_))
}
/// Converts the request into a [WriteRequest].
///
/// # Panics
/// Panics if it isn't a [WriteRequest].
pub(crate) fn into_write_request(self) -> WriteRequest {
match self {
RequestBody::Write(req) => req,
other => panic!("expect write request, found {other:?}"),
}
}
}

View File

@@ -38,7 +38,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::region::{RegionMap, RegionMapRef};
use crate::request::{RegionRequest, RequestBody, WorkerRequest};
use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest};
/// Identifier for a worker.
pub(crate) type WorkerId = u32;
@@ -329,7 +329,10 @@ impl<S> RegionWorkerLoop<S> {
match worker_req {
WorkerRequest::Region(req) => {
if req.body.is_write() {
write_requests.push(req);
write_requests.push(SenderWriteRequest {
sender: req.sender,
request: req.body.into_write_request(),
});
} else {
ddl_requests.push(req);
}

View File

@@ -14,18 +14,59 @@
//! Handling write requests.
use crate::{worker::RegionWorkerLoop, request::{RegionRequest}};
use std::collections::HashMap;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::{TimeType, TimestampType};
use greptime_proto::v1::mito::Mutation;
use greptime_proto::v1::{ColumnDataType, Rows};
use snafu::ensure;
use tokio::sync::oneshot::Sender;
use crate::error::{InvalidRequestSnafu, RegionNotFoundSnafu, Result};
use crate::metadata::SemanticType;
use crate::region::version::VersionRef;
use crate::region::MitoRegionRef;
use crate::request::SenderWriteRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
/// Takes and handles all write requests.
///
/// # Panics
/// Panics if `write_requests` contains a request whose body is a [WriteRequest].
pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec<RegionRequest>) {
pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec<SenderWriteRequest>) {
if write_requests.is_empty() {
return;
}
let mut region_ctxs = HashMap::new();
for sender_req in write_requests {
let region_id = sender_req.request.region_id;
// Checks whether the region exists.
if !region_ctxs.contains_key(&region_id) {
let Some(region) = self.regions.get_region(region_id) else {
// No such region.
send_result(sender_req.sender, RegionNotFoundSnafu {
region_id,
}.fail());
continue;
};
// Initialize the context.
region_ctxs.insert(region_id, RegionWriteCtx::new(region));
}
// Safety: Now we ensure the region exists.
let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
// Checks request schema.
if let Err(e) = region_ctx.check_schema(&sender_req.request.rows) {
send_result(sender_req.sender, Err(e));
continue;
}
// Push request.
}
// We need to check:
// - region exists, if not, return error
// - check whether the schema is compatible with region schema
@@ -41,7 +82,166 @@ impl<S> RegionWorkerLoop<S> {
}
}
/// Send result to the request.
fn send_result(sender: Option<Sender<Result<()>>>, res: Result<()>) {
if let Some(sender) = sender {
// Ignore send result.
let _ = sender.send(res);
}
}
/// Context to write to a region.
struct RegionWriteCtx {
/// Region to write.
region: MitoRegionRef,
/// Version of the region while creating the context.
version: VersionRef,
/// Valid mutations.
mutations: Vec<Mutation>,
/// Result senders.
///
/// The sender is 1:1 map to the mutation in `mutations`.
senders: Vec<Option<Sender<Result<()>>>>,
}
impl RegionWriteCtx {
/// Returns an empty context.
fn new(region: MitoRegionRef) -> RegionWriteCtx {
let version = region.version();
RegionWriteCtx {
region,
version,
mutations: Vec::new(),
senders: Vec::new(),
}
}
/// Checks schema of rows.
fn check_schema(&self, rows: &Rows) -> Result<()> {
let region_id = self.region.region_id;
// Index all columns in rows.
let mut rows_columns: HashMap<_, _> = rows
.schema
.iter()
.map(|column| (&column.column_name, column))
.collect();
// Checks all columns in this region.
for column in &self.version.metadata.column_metadatas {
if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
// Check data type.
ensure!(
check_column_type(input_col.datatype, &column.column_schema.data_type),
InvalidRequestSnafu {
region_id,
reason: format!(
"Column {} expect type {:?}, given: {:?}({})",
column.column_schema.name,
column.column_schema.data_type,
ColumnDataType::from_i32(input_col.datatype),
input_col.datatype,
)
}
);
// Check semantic type.
ensure!(
check_semantic_type(input_col.semantic_type, column.semantic_type),
InvalidRequestSnafu {
region_id,
reason: format!(
"Column {} has semantic type {:?}, given: {:?}({})",
column.column_schema.name,
column.semantic_type,
greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type),
input_col.semantic_type
),
}
);
} else {
// For columns not in rows, checks whether they are nullable.
ensure!(
column.column_schema.is_nullable()
|| column.column_schema.default_constraint().is_some(),
InvalidRequestSnafu {
region_id,
reason: format!("Missing column {}", column.column_schema.name),
}
);
}
}
// Checks all columns in rows exist in the regino.
if !rows_columns.is_empty() {
let names: Vec<_> = rows_columns.into_keys().collect();
return InvalidRequestSnafu {
region_id,
reason: format!("Unknown columns: {:?}", names),
}
.fail();
}
Ok(())
}
}
/// Returns true if the pb semantic type is valid.
fn check_semantic_type(type_value: i32, semantic_type: SemanticType) -> bool {
type_value == semantic_type as i32
}
/// Returns true if the pb type value is valid.
fn check_column_type(type_value: i32, expect_type: &ConcreteDataType) -> bool {
let Some(column_type) = ColumnDataType::from_i32(type_value) else {
return false;
};
is_column_type_eq(column_type, expect_type)
}
/// Returns true if the column type is equal to exepcted type.
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
match (column_type, expect_type) {
(ColumnDataType::Boolean, ConcreteDataType::Boolean(_))
| (ColumnDataType::Int8, ConcreteDataType::Int8(_))
| (ColumnDataType::Int16, ConcreteDataType::Int16(_))
| (ColumnDataType::Int32, ConcreteDataType::Int32(_))
| (ColumnDataType::Int64, ConcreteDataType::Int64(_))
| (ColumnDataType::Uint8, ConcreteDataType::UInt8(_))
| (ColumnDataType::Uint16, ConcreteDataType::UInt16(_))
| (ColumnDataType::Uint32, ConcreteDataType::UInt32(_))
| (ColumnDataType::Uint64, ConcreteDataType::UInt64(_))
| (ColumnDataType::Float32, ConcreteDataType::Float32(_))
| (ColumnDataType::Float64, ConcreteDataType::Float64(_))
| (ColumnDataType::Binary, ConcreteDataType::Binary(_))
| (ColumnDataType::String, ConcreteDataType::String(_))
| (ColumnDataType::Date, ConcreteDataType::Date(_))
| (ColumnDataType::Datetime, ConcreteDataType::DateTime(_))
| (
ColumnDataType::TimestampSecond,
ConcreteDataType::Timestamp(TimestampType::Second(_)),
)
| (
ColumnDataType::TimestampMillisecond,
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)),
)
| (
ColumnDataType::TimestampMicrosecond,
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)),
)
| (
ColumnDataType::TimestampNanosecond,
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)),
)
| (ColumnDataType::TimeSecond, ConcreteDataType::Time(TimeType::Second(_)))
| (ColumnDataType::TimeMillisecond, ConcreteDataType::Time(TimeType::Millisecond(_)))
| (ColumnDataType::TimeMicrosecond, ConcreteDataType::Time(TimeType::Microsecond(_)))
| (ColumnDataType::TimeNanosecond, ConcreteDataType::Time(TimeType::Nanosecond(_))) => true,
_ => false,
}
}
// sender
// pb write message
// region id
// rows
@@ -85,4 +285,3 @@ impl<S> RegionWorkerLoop<S> {
// self.requests.is_empty()
// }
// }